123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- #include "butil/compat.h"
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <sys/utsname.h> // uname
- #include <fcntl.h>
- #include <gtest/gtest.h>
- #include <pthread.h>
- #include "butil/gperftools_profiler.h"
- #include "butil/time.h"
- #include "butil/macros.h"
- #include "butil/fd_utility.h"
- #include "butil/logging.h"
- #include "bthread/task_control.h"
- #include "bthread/task_group.h"
- #include "bthread/interrupt_pthread.h"
- #include "bthread/bthread.h"
- #include "bthread/unstable.h"
- #if defined(OS_MACOSX)
- #include <sys/types.h> // struct kevent
- #include <sys/event.h> // kevent(), kqueue()
- #endif
- #ifndef NDEBUG
- namespace bthread {
- extern butil::atomic<int> break_nums;
- extern TaskControl* global_task_control;
- int stop_and_join_epoll_threads();
- }
- #endif
- namespace {
- TEST(FDTest, read_kernel_version) {
- utsname name;
- uname(&name);
- std::cout << "sysname=" << name.sysname << std::endl
- << "nodename=" << name.nodename << std::endl
- << "release=" << name.release << std::endl
- << "version=" << name.version << std::endl
- << "machine=" << name.machine << std::endl;
- }
- #define RUN_CLIENT_IN_BTHREAD 1
- //#define USE_BLOCKING_EPOLL 1
- //#define RUN_EPOLL_IN_BTHREAD 1
- //#define CREATE_THREAD_TO_PROCESS 1
- volatile bool stop = false;
- struct SocketMeta {
- int fd;
- int epfd;
- };
- struct BAIDU_CACHELINE_ALIGNMENT ClientMeta {
- int fd;
- size_t count;
- size_t times;
- };
- struct EpollMeta {
- int epfd;
- };
- const size_t NCLIENT = 30;
- void* process_thread(void* arg) {
- SocketMeta* m = (SocketMeta*)arg;
- size_t count;
- //printf("begin to process fd=%d\n", m->fd);
- ssize_t n = read(m->fd, &count, sizeof(count));
- if (n != sizeof(count)) {
- LOG(FATAL) << "Should not happen in this test";
- return NULL;
- }
- count += NCLIENT;
- //printf("write result=%lu to fd=%d\n", count, m->fd);
- if (write(m->fd, &count, sizeof(count)) != sizeof(count)) {
- LOG(FATAL) << "Should not happen in this test";
- return NULL;
- }
- #ifdef CREATE_THREAD_TO_PROCESS
- # if defined(OS_LINUX)
- epoll_event evt = { EPOLLIN | EPOLLONESHOT, { m } };
- if (epoll_ctl(m->epfd, EPOLL_CTL_MOD, m->fd, &evt) < 0) {
- epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt);
- }
- # elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT,
- 0, 0, m);
- kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL);
- # endif
- #endif
- return NULL;
- }
- void* epoll_thread(void* arg) {
- bthread_usleep(1);
- EpollMeta* m = (EpollMeta*)arg;
- const int epfd = m->epfd;
- #if defined(OS_LINUX)
- epoll_event e[32];
- #elif defined(OS_MACOSX)
- struct kevent e[32];
- #endif
- while (!stop) {
- #if defined(OS_LINUX)
- # ifndef USE_BLOCKING_EPOLL
- const int n = epoll_wait(epfd, e, ARRAY_SIZE(e), 0);
- if (stop) {
- break;
- }
- if (n == 0) {
- bthread_fd_wait(epfd, EPOLLIN);
- continue;
- }
- # else
- const int n = epoll_wait(epfd, e, ARRAY_SIZE(e), -1);
- if (stop) {
- break;
- }
- if (n == 0) {
- continue;
- }
- # endif
- #elif defined(OS_MACOSX)
- const int n = kevent(epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
- if (stop) {
- break;
- }
- if (n == 0) {
- continue;
- }
- #endif
- if (n < 0) {
- if (EINTR == errno) {
- continue;
- }
- #if defined(OS_LINUX)
- PLOG(FATAL) << "Fail to epoll_wait";
- #elif defined(OS_MACOSX)
- PLOG(FATAL) << "Fail to kevent";
- #endif
- break;
- }
- #ifdef CREATE_THREAD_TO_PROCESS
- bthread_fvec vec[n];
- for (int i = 0; i < n; ++i) {
- vec[i].fn = process_thread;
- # if defined(OS_LINUX)
- vec[i].arg = e[i].data.ptr;
- # elif defined(OS_MACOSX)
- vec[i].arg = e[i].udata;
- # endif
- }
- bthread_t tid[n];
- bthread_startv(tid, vec, n, &BTHREAD_ATTR_SMALL);
- #else
- for (int i = 0; i < n; ++i) {
- # if defined(OS_LINUX)
- process_thread(e[i].data.ptr);
- # elif defined(OS_MACOSX)
- process_thread(e[i].udata);
- # endif
- }
- #endif
- }
- return NULL;
- }
- void* client_thread(void* arg) {
- ClientMeta* m = (ClientMeta*)arg;
- for (size_t i = 0; i < m->times; ++i) {
- if (write(m->fd, &m->count, sizeof(m->count)) != sizeof(m->count)) {
- LOG(FATAL) << "Should not happen in this test";
- return NULL;
- }
- #ifdef RUN_CLIENT_IN_BTHREAD
- ssize_t rc;
- do {
- # if defined(OS_LINUX)
- const int wait_rc = bthread_fd_wait(m->fd, EPOLLIN);
- # elif defined(OS_MACOSX)
- const int wait_rc = bthread_fd_wait(m->fd, EVFILT_READ);
- # endif
- EXPECT_EQ(0, wait_rc) << berror();
- rc = read(m->fd, &m->count, sizeof(m->count));
- } while (rc < 0 && errno == EAGAIN);
- #else
- ssize_t rc = read(m->fd, &m->count, sizeof(m->count));
- #endif
- if (rc != sizeof(m->count)) {
- PLOG(FATAL) << "Should not happen in this test, rc=" << rc;
- return NULL;
- }
- }
- return NULL;
- }
- inline uint32_t fmix32 ( uint32_t h ) {
- h ^= h >> 16;
- h *= 0x85ebca6b;
- h ^= h >> 13;
- h *= 0xc2b2ae35;
- h ^= h >> 16;
- return h;
- }
- // Disable temporarily due to epoll's bug. The bug is fixed by
- // a kernel patch that lots of machines currently don't have
- TEST(FDTest, ping_pong) {
- #ifndef NDEBUG
- bthread::break_nums = 0;
- #endif
- const size_t REP = 30000;
- const size_t NEPOLL = 2;
- int epfd[NEPOLL];
- #ifdef RUN_EPOLL_IN_BTHREAD
- bthread_t eth[NEPOLL];
- #else
- pthread_t eth[NEPOLL];
- #endif
- int fds[2 * NCLIENT];
- #ifdef RUN_CLIENT_IN_BTHREAD
- bthread_t cth[NCLIENT];
- #else
- pthread_t cth[NCLIENT];
- #endif
- ClientMeta* cm[NCLIENT];
- for (size_t i = 0; i < NEPOLL; ++i) {
- #if defined(OS_LINUX)
- epfd[i] = epoll_create(1024);
- #elif defined(OS_MACOSX)
- epfd[i] = kqueue();
- #endif
- ASSERT_GT(epfd[i], 0);
- }
-
- for (size_t i = 0; i < NCLIENT; ++i) {
- ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i));
- //printf("Created fd=%d,%d i=%lu\n", fds[2*i], fds[2*i+1], i);
- SocketMeta* m = new SocketMeta;
- m->fd = fds[i * 2];
- m->epfd = epfd[fmix32(i) % NEPOLL];
- ASSERT_EQ(0, fcntl(m->fd, F_SETFL, fcntl(m->fd, F_GETFL, 0) | O_NONBLOCK));
- #ifdef CREATE_THREAD_TO_PROCESS
- # if defined(OS_LINUX)
- epoll_event evt = { EPOLLIN | EPOLLONESHOT, { m } };
- # elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT,
- 0, 0, m);
- # endif
- #else
- # if defined(OS_LINUX)
- epoll_event evt = { EPOLLIN, { m } };
- # elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, m);
- # endif
- #endif
- #if defined(OS_LINUX)
- ASSERT_EQ(0, epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt));
- #elif defined(OS_MACOSX)
- ASSERT_EQ(0, kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL));
- #endif
- cm[i] = new ClientMeta;
- cm[i]->fd = fds[i * 2 + 1];
- cm[i]->count = i;
- cm[i]->times = REP;
- #ifdef RUN_CLIENT_IN_BTHREAD
- butil::make_non_blocking(cm[i]->fd);
- ASSERT_EQ(0, bthread_start_urgent(&cth[i], NULL, client_thread, cm[i]));
- #else
- ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i]));
- #endif
- }
- ProfilerStart("ping_pong.prof");
- butil::Timer tm;
- tm.start();
- for (size_t i = 0; i < NEPOLL; ++i) {
- EpollMeta *em = new EpollMeta;
- em->epfd = epfd[i];
- #ifdef RUN_EPOLL_IN_BTHREAD
- ASSERT_EQ(0, bthread_start_urgent(ð[i], epoll_thread, em, NULL);
- #else
- ASSERT_EQ(0, pthread_create(ð[i], NULL, epoll_thread, em));
- #endif
- }
- for (size_t i = 0; i < NCLIENT; ++i) {
- #ifdef RUN_CLIENT_IN_BTHREAD
- bthread_join(cth[i], NULL);
- #else
- pthread_join(cth[i], NULL);
- #endif
- ASSERT_EQ(i + REP * NCLIENT, cm[i]->count);
- }
- tm.stop();
- ProfilerStop();
- LOG(INFO) << "tid=" << REP*NCLIENT*1000000L/tm.u_elapsed();
- stop = true;
- for (size_t i = 0; i < NEPOLL; ++i) {
- #if defined(OS_LINUX)
- epoll_event evt = { EPOLLOUT, { NULL } };
- ASSERT_EQ(0, epoll_ctl(epfd[i], EPOLL_CTL_ADD, 0, &evt));
- #elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, 0, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL);
- ASSERT_EQ(0, kevent(epfd[i], &kqueue_event, 1, NULL, 0, NULL));
- #endif
- #ifdef RUN_EPOLL_IN_BTHREAD
- bthread_join(eth[i], NULL);
- #else
- pthread_join(eth[i], NULL);
- #endif
- }
- //bthread::stop_and_join_epoll_threads();
- bthread_usleep(100000);
- #ifndef NDEBUG
- std::cout << "break_nums=" << bthread::break_nums << std::endl;
- #endif
- }
- TEST(FDTest, mod_closed_fd) {
- #if defined(OS_LINUX)
- // Conclusion:
- // If fd is never added into epoll, MOD returns ENOENT
- // If fd is inside epoll and valid, MOD returns 0
- // If fd is closed and not-reused, MOD returns EBADF
- // If fd is closed and reused, MOD returns ENOENT again
-
- const int epfd = epoll_create(1024);
- int new_fd[2];
- int fd[2];
- ASSERT_EQ(0, pipe(fd));
- epoll_event e = { EPOLLIN, { NULL } };
- errno = 0;
- ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e));
- ASSERT_EQ(ENOENT, errno);
- ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_ADD, fd[0], &e));
- // mod after add
- ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e));
- // mod after mod
- ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e));
- ASSERT_EQ(0, close(fd[0]));
- ASSERT_EQ(0, close(fd[1]));
- errno = 0;
- ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e));
- ASSERT_EQ(EBADF, errno) << berror();
- ASSERT_EQ(0, pipe(new_fd));
- ASSERT_EQ(fd[0], new_fd[0]);
- ASSERT_EQ(fd[1], new_fd[1]);
-
- errno = 0;
- ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e));
- ASSERT_EQ(ENOENT, errno) << berror();
-
- ASSERT_EQ(0, close(epfd));
- #endif
- }
- TEST(FDTest, add_existing_fd) {
- #if defined(OS_LINUX)
- const int epfd = epoll_create(1024);
- epoll_event e = { EPOLLIN, { NULL } };
- ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &e));
- errno = 0;
- ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &e));
- ASSERT_EQ(EEXIST, errno);
- ASSERT_EQ(0, close(epfd));
- #endif
- }
- void* epoll_waiter(void* arg) {
- #if defined(OS_LINUX)
- epoll_event e;
- if (1 == epoll_wait((int)(intptr_t)arg, &e, 1, -1)) {
- std::cout << e.events << std::endl;
- }
- #elif defined(OS_MACOSX)
- struct kevent e;
- if (1 == kevent((int)(intptr_t)arg, NULL, 0, &e, 1, NULL)) {
- std::cout << e.flags << std::endl;
- }
- #endif
- std::cout << pthread_self() << " quits" << std::endl;
- return NULL;
- }
- TEST(FDTest, interrupt_pthread) {
- #if defined(OS_LINUX)
- const int epfd = epoll_create(1024);
- #elif defined(OS_MACOSX)
- const int epfd = kqueue();
- #endif
- pthread_t th, th2;
- ASSERT_EQ(0, pthread_create(&th, NULL, epoll_waiter, (void*)(intptr_t)epfd));
- ASSERT_EQ(0, pthread_create(&th2, NULL, epoll_waiter, (void*)(intptr_t)epfd));
- bthread_usleep(100000L);
- std::cout << "wake up " << th << std::endl;
- bthread::interrupt_pthread(th);
- bthread_usleep(100000L);
- std::cout << "wake up " << th2 << std::endl;
- bthread::interrupt_pthread(th2);
- pthread_join(th, NULL);
- pthread_join(th2, NULL);
- }
- void* close_the_fd(void* arg) {
- bthread_usleep(10000/*10ms*/);
- EXPECT_EQ(0, bthread_close(*(int*)arg));
- return NULL;
- }
- TEST(FDTest, invalid_epoll_events) {
- errno = 0;
- #if defined(OS_LINUX)
- ASSERT_EQ(-1, bthread_fd_wait(-1, EPOLLIN));
- #elif defined(OS_MACOSX)
- ASSERT_EQ(-1, bthread_fd_wait(-1, EVFILT_READ));
- #endif
- ASSERT_EQ(EINVAL, errno);
- errno = 0;
- #if defined(OS_LINUX)
- ASSERT_EQ(-1, bthread_fd_timedwait(-1, EPOLLIN, NULL));
- #elif defined(OS_MACOSX)
- ASSERT_EQ(-1, bthread_fd_timedwait(-1, EVFILT_READ, NULL));
- #endif
- ASSERT_EQ(EINVAL, errno);
- int fds[2];
- ASSERT_EQ(0, pipe(fds));
- #if defined(OS_LINUX)
- ASSERT_EQ(-1, bthread_fd_wait(fds[0], EPOLLET));
- ASSERT_EQ(EINVAL, errno);
- #endif
- bthread_t th;
- ASSERT_EQ(0, bthread_start_urgent(&th, NULL, close_the_fd, &fds[1]));
- butil::Timer tm;
- tm.start();
- #if defined(OS_LINUX)
- ASSERT_EQ(0, bthread_fd_wait(fds[0], EPOLLIN | EPOLLET));
- #elif defined(OS_MACOSX)
- ASSERT_EQ(0, bthread_fd_wait(fds[0], EVFILT_READ));
- #endif
- tm.stop();
- ASSERT_LT(tm.m_elapsed(), 20);
- ASSERT_EQ(0, bthread_join(th, NULL));
- ASSERT_EQ(0, bthread_close(fds[0]));
- }
- void* wait_for_the_fd(void* arg) {
- timespec ts = butil::milliseconds_from_now(50);
- #if defined(OS_LINUX)
- bthread_fd_timedwait(*(int*)arg, EPOLLIN, &ts);
- #elif defined(OS_MACOSX)
- bthread_fd_timedwait(*(int*)arg, EVFILT_READ, &ts);
- #endif
- return NULL;
- }
- TEST(FDTest, timeout) {
- int fds[2];
- ASSERT_EQ(0, pipe(fds));
- pthread_t th;
- ASSERT_EQ(0, pthread_create(&th, NULL, wait_for_the_fd, &fds[0]));
- bthread_t bth;
- ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, wait_for_the_fd, &fds[0]));
- butil::Timer tm;
- tm.start();
- ASSERT_EQ(0, pthread_join(th, NULL));
- ASSERT_EQ(0, bthread_join(bth, NULL));
- tm.stop();
- ASSERT_LT(tm.m_elapsed(), 80);
- ASSERT_EQ(0, bthread_close(fds[0]));
- ASSERT_EQ(0, bthread_close(fds[1]));
- }
- TEST(FDTest, close_should_wakeup_waiter) {
- int fds[2];
- ASSERT_EQ(0, pipe(fds));
- bthread_t bth;
- ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, wait_for_the_fd, &fds[0]));
- butil::Timer tm;
- tm.start();
- ASSERT_EQ(0, bthread_close(fds[0]));
- ASSERT_EQ(0, bthread_join(bth, NULL));
- tm.stop();
- ASSERT_LT(tm.m_elapsed(), 5);
- // Launch again, should quit soon due to EBADF
- #if defined(OS_LINUX)
- ASSERT_EQ(-1, bthread_fd_timedwait(fds[0], EPOLLIN, NULL));
- #elif defined(OS_MACOSX)
- ASSERT_EQ(-1, bthread_fd_timedwait(fds[0], EVFILT_READ, NULL));
- #endif
- ASSERT_EQ(EBADF, errno);
- ASSERT_EQ(0, bthread_close(fds[1]));
- }
- TEST(FDTest, close_definitely_invalid) {
- int ec = 0;
- ASSERT_EQ(-1, close(-1));
- ec = errno;
- ASSERT_EQ(-1, bthread_close(-1));
- ASSERT_EQ(ec, errno);
- }
- TEST(FDTest, bthread_close_fd_which_did_not_call_bthread_functions) {
- int fds[2];
- ASSERT_EQ(0, pipe(fds));
- ASSERT_EQ(0, bthread_close(fds[0]));
- ASSERT_EQ(0, bthread_close(fds[1]));
- }
- TEST(FDTest, double_close) {
- int fds[2];
- ASSERT_EQ(0, pipe(fds));
- ASSERT_EQ(0, close(fds[0]));
- int ec = 0;
- ASSERT_EQ(-1, close(fds[0]));
- ec = errno;
- ASSERT_EQ(0, bthread_close(fds[1]));
- ASSERT_EQ(-1, bthread_close(fds[1]));
- ASSERT_EQ(ec, errno);
- }
- } // namespace
|