123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- #include <sys/uio.h> // writev
- #include "butil/compat.h"
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <gtest/gtest.h>
- #include "butil/time.h"
- #include "butil/macros.h"
- #include "butil/scoped_lock.h"
- #include "butil/fd_utility.h"
- #include "butil/logging.h"
- #include "butil/gperftools_profiler.h"
- #include "bthread/bthread.h"
- #include "bthread/task_control.h"
- #include "bthread/task_group.h"
- #if defined(OS_MACOSX)
- #include <sys/types.h> // struct kevent
- #include <sys/event.h> // kevent(), kqueue()
- #endif
- #define RUN_EPOLL_IN_BTHREAD
- namespace bthread {
- extern TaskControl* global_task_control;
- int stop_and_join_epoll_threads();
- }
- namespace {
- volatile bool client_stop = false;
- volatile bool server_stop = false;
- struct BAIDU_CACHELINE_ALIGNMENT ClientMeta {
- int fd;
- size_t times;
- size_t bytes;
- };
- struct BAIDU_CACHELINE_ALIGNMENT SocketMeta {
- int fd;
- int epfd;
- butil::atomic<int> req;
- char* buf;
- size_t buf_cap;
- size_t bytes;
- size_t times;
- };
- struct EpollMeta {
- int epfd;
- int nthread;
- int nfold;
- };
- void* process_thread(void* arg) {
- SocketMeta* m = (SocketMeta*)arg;
- do {
- // Read all data.
- do {
- ssize_t n = read(m->fd, m->buf, m->buf_cap);
- if (n > 0) {
- m->bytes += n;
- ++m->times;
- if ((size_t)n < m->buf_cap) {
- break;
- }
- } else if (n < 0) {
- if (errno == EAGAIN) {
- break;
- } else if (errno == EINTR) {
- continue;
- } else {
- PLOG(FATAL) << "Fail to read fd=" << m->fd;
- return NULL;
- }
- } else {
- LOG(FATAL) << "Another end closed fd=" << m->fd;
- return NULL;
- }
- } while (1);
-
- if (m->req.exchange(0, butil::memory_order_release) == 1) {
- // no events during reading.
- break;
- }
- if (m->req.fetch_add(1, butil::memory_order_relaxed) != 0) {
- // someone else takes the fd.
- break;
- }
- } while (1);
- return NULL;
- }
- void* epoll_thread(void* arg) {
- EpollMeta* em = (EpollMeta*)arg;
- em->nthread = 0;
- em->nfold = 0;
- #if defined(OS_LINUX)
- epoll_event e[32];
- #elif defined(OS_MACOSX)
- struct kevent e[32];
- #endif
- while (!server_stop) {
- #if defined(OS_LINUX)
- const int n = epoll_wait(em->epfd, e, ARRAY_SIZE(e), -1);
- #elif defined(OS_MACOSX)
- const int n = kevent(em->epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
- #endif
- if (server_stop) {
- break;
- }
- if (n < 0) {
- if (EINTR == errno) {
- continue;
- }
- #if defined(OS_LINUX)
- PLOG(FATAL) << "Fail to epoll_wait";
- #elif defined(OS_MACOSX)
- PLOG(FATAL) << "Fail to kevent";
- #endif
- break;
- }
- for (int i = 0; i < n; ++i) {
- #if defined(OS_LINUX)
- SocketMeta* m = (SocketMeta*)e[i].data.ptr;
- #elif defined(OS_MACOSX)
- SocketMeta* m = (SocketMeta*)e[i].udata;
- #endif
- if (m->req.fetch_add(1, butil::memory_order_acquire) == 0) {
- bthread_t th;
- bthread_start_urgent(
- &th, &BTHREAD_ATTR_SMALL, process_thread, m);
- ++em->nthread;
- } else {
- ++em->nfold;
- }
- }
- }
- return NULL;
- }
- void* client_thread(void* arg) {
- ClientMeta* m = (ClientMeta*)arg;
- size_t offset = 0;
- m->times = 0;
- m->bytes = 0;
- const size_t buf_cap = 32768;
- char* buf = (char*)malloc(buf_cap);
- for (size_t i = 0; i < buf_cap/8; ++i) {
- ((uint64_t*)buf)[i] = i;
- }
- while (!client_stop) {
- ssize_t n;
- if (offset == 0) {
- n = write(m->fd, buf, buf_cap);
- } else {
- iovec v[2];
- v[0].iov_base = buf + offset;
- v[0].iov_len = buf_cap - offset;
- v[1].iov_base = buf;
- v[1].iov_len = offset;
- n = writev(m->fd, v, 2);
- }
- if (n < 0) {
- if (errno != EINTR) {
- PLOG(FATAL) << "Fail to write fd=" << m->fd;
- return NULL;
- }
- } else {
- ++m->times;
- m->bytes += n;
- offset += n;
- if (offset >= buf_cap) {
- offset -= buf_cap;
- }
- }
- }
- return NULL;
- }
- inline uint32_t fmix32 ( uint32_t h ) {
- h ^= h >> 16;
- h *= 0x85ebca6b;
- h ^= h >> 13;
- h *= 0xc2b2ae35;
- h ^= h >> 16;
- return h;
- }
- TEST(DispatcherTest, dispatch_tasks) {
- client_stop = false;
- server_stop = false;
- const size_t NEPOLL = 1;
- const size_t NCLIENT = 16;
- int epfd[NEPOLL];
- bthread_t eth[NEPOLL];
- EpollMeta* em[NEPOLL];
- int fds[2 * NCLIENT];
- pthread_t cth[NCLIENT];
- ClientMeta* cm[NCLIENT];
- SocketMeta* sm[NCLIENT];
- for (size_t i = 0; i < NEPOLL; ++i) {
- #if defined(OS_LINUX)
- epfd[i] = epoll_create(1024);
- #elif defined(OS_MACOSX)
- epfd[i] = kqueue();
- #endif
- ASSERT_GT(epfd[i], 0);
- }
-
- for (size_t i = 0; i < NCLIENT; ++i) {
- ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i));
- SocketMeta* m = new SocketMeta;
- m->fd = fds[i * 2];
- m->epfd = epfd[fmix32(i) % NEPOLL];
- m->req = 0;
- m->buf_cap = 32768;
- m->buf = (char*)malloc(m->buf_cap);
- m->bytes = 0;
- m->times = 0;
- ASSERT_EQ(0, butil::make_non_blocking(m->fd));
- sm[i] = m;
- #if defined(OS_LINUX)
- epoll_event evt = { (uint32_t)(EPOLLIN | EPOLLET), { m } };
- ASSERT_EQ(0, epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt));
- #elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, m);
- ASSERT_EQ(0, kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL));
- #endif
- cm[i] = new ClientMeta;
- cm[i]->fd = fds[i * 2 + 1];
- cm[i]->times = 0;
- cm[i]->bytes = 0;
- ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i]));
- }
-
- ProfilerStart("dispatcher.prof");
- butil::Timer tm;
- tm.start();
- for (size_t i = 0; i < NEPOLL; ++i) {
- EpollMeta *m = new EpollMeta;
- em[i] = m;
- m->epfd = epfd[i];
- #ifdef RUN_EPOLL_IN_BTHREAD
- ASSERT_EQ(0, bthread_start_background(ð[i], NULL, epoll_thread, m));
- #else
- ASSERT_EQ(0, pthread_create(ð[i], NULL, epoll_thread, m));
- #endif
- }
- sleep(5);
- tm.stop();
- ProfilerStop();
- size_t client_bytes = 0;
- size_t server_bytes = 0;
- for (size_t i = 0; i < NCLIENT; ++i) {
- client_bytes += cm[i]->bytes;
- server_bytes += sm[i]->bytes;
- }
- size_t all_nthread = 0, all_nfold = 0;
- for (size_t i = 0; i < NEPOLL; ++i) {
- all_nthread += em[i]->nthread;
- all_nfold += em[i]->nfold;
- }
- LOG(INFO) << "client_tp=" << client_bytes / (double)tm.u_elapsed()
- << "MB/s server_tp=" << server_bytes / (double)tm.u_elapsed()
- << "MB/s nthread=" << all_nthread << " nfold=" << all_nfold;
- client_stop = true;
- for (size_t i = 0; i < NCLIENT; ++i) {
- pthread_join(cth[i], NULL);
- }
- server_stop = true;
- for (size_t i = 0; i < NEPOLL; ++i) {
- #if defined(OS_LINUX)
- epoll_event evt = { EPOLLOUT, { NULL } };
- ASSERT_EQ(0, epoll_ctl(epfd[i], EPOLL_CTL_ADD, 0, &evt));
- #elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, 0, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL);
- ASSERT_EQ(0, kevent(epfd[i], &kqueue_event, 1, NULL, 0, NULL));
- #endif
- #ifdef RUN_EPOLL_IN_BTHREAD
- bthread_join(eth[i], NULL);
- #else
- pthread_join(eth[i], NULL);
- #endif
- }
- bthread::stop_and_join_epoll_threads();
- bthread_usleep(100000);
- }
- } // namespace
|