bthread_dispatcher_unittest.cpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <sys/uio.h> // writev
  18. #include "butil/compat.h"
  19. #include <sys/types.h>
  20. #include <sys/socket.h>
  21. #include <gtest/gtest.h>
  22. #include "butil/time.h"
  23. #include "butil/macros.h"
  24. #include "butil/scoped_lock.h"
  25. #include "butil/fd_utility.h"
  26. #include "butil/logging.h"
  27. #include "butil/gperftools_profiler.h"
  28. #include "bthread/bthread.h"
  29. #include "bthread/task_control.h"
  30. #include "bthread/task_group.h"
  31. #if defined(OS_MACOSX)
  32. #include <sys/types.h> // struct kevent
  33. #include <sys/event.h> // kevent(), kqueue()
  34. #endif
// Run the dispatcher loops inside bthreads rather than pthreads
// (selects between the #ifdef RUN_EPOLL_IN_BTHREAD branches below).
#define RUN_EPOLL_IN_BTHREAD

namespace bthread {
// Internal bthread symbols, declared here because they are not exposed
// through the public headers included above.
extern TaskControl* global_task_control;
int stop_and_join_epoll_threads();
}
  40. namespace {
// Stop flags polled by the client/epoll threads. NOTE(review): volatile only
// forces reloads; it is not a synchronization primitive -- tolerable in this
// benchmark-style test, not a pattern for production code.
volatile bool client_stop = false;
volatile bool server_stop = false;

// Per-connection bookkeeping for a writer pthread (client side).
struct BAIDU_CACHELINE_ALIGNMENT ClientMeta {
    int fd;        // write end of the socketpair
    size_t times;  // number of successful write()/writev() calls
    size_t bytes;  // total bytes written
};

// Per-connection state shared between epoll_thread and process_thread.
// Cacheline-aligned to avoid false sharing between adjacent sockets.
struct BAIDU_CACHELINE_ALIGNMENT SocketMeta {
    int fd;    // read end of the socketpair (made non-blocking by the test)
    int epfd;  // epoll/kqueue fd this socket is registered with
    butil::atomic<int> req;  // pending readiness-event counter (see process_thread)
    char* buf;       // read buffer
    size_t buf_cap;  // capacity of buf in bytes
    size_t bytes;    // total bytes read
    size_t times;    // number of successful read() calls
};

// Per-dispatcher statistics filled in by epoll_thread.
struct EpollMeta {
    int epfd;     // epoll/kqueue fd to wait on
    int nthread;  // events that spawned a new reader bthread
    int nfold;    // events folded into an already-running reader
};
// Reader bthread started by epoll_thread when a fd turns readable.
// Drains the non-blocking fd, then uses SocketMeta::req as an event counter
// to decide whether it may retire or must drain again (the fd is registered
// edge-triggered, so a missed event would stall the connection forever).
void* process_thread(void* arg) {
    SocketMeta* m = (SocketMeta*)arg;
    do {
        // Read all data.
        do {
            ssize_t n = read(m->fd, m->buf, m->buf_cap);
            if (n > 0) {
                m->bytes += n;
                ++m->times;
                if ((size_t)n < m->buf_cap) {
                    // Short read: the kernel buffer is drained for now.
                    break;
                }
            } else if (n < 0) {
                if (errno == EAGAIN) {
                    break;
                } else if (errno == EINTR) {
                    continue;
                } else {
                    PLOG(FATAL) << "Fail to read fd=" << m->fd;
                    return NULL;
                }
            } else {
                // read() == 0 means the peer closed the connection.
                LOG(FATAL) << "Another end closed fd=" << m->fd;
                return NULL;
            }
        } while (1);
        // epoll_thread incremented req once per readiness event. If it is
        // still exactly 1 (our own wakeup), nothing new arrived while we
        // were reading and this thread can retire.
        if (m->req.exchange(0, butil::memory_order_release) == 1) {
            // no events during reading.
            break;
        }
        // More events arrived during the drain: re-claim the fd. If the
        // counter is already non-zero again, the dispatcher has handed the
        // fd to a newer thread (see the 0->1 test in epoll_thread).
        if (m->req.fetch_add(1, butil::memory_order_relaxed) != 0) {
            // someone else takes the fd.
            break;
        }
    } while (1);
    return NULL;
}
// Dispatcher loop: blocks on epoll/kqueue and hands every ready fd to a
// process_thread bthread. SocketMeta::req counts outstanding readiness
// events; only the 0 -> 1 transition spawns a new bthread (counted in
// nthread), any other event is "folded" into the reader that is already
// running (counted in nfold).
void* epoll_thread(void* arg) {
    EpollMeta* em = (EpollMeta*)arg;
    em->nthread = 0;
    em->nfold = 0;
#if defined(OS_LINUX)
    epoll_event e[32];
#elif defined(OS_MACOSX)
    struct kevent e[32];
#endif
    while (!server_stop) {
#if defined(OS_LINUX)
        const int n = epoll_wait(em->epfd, e, ARRAY_SIZE(e), -1);
#elif defined(OS_MACOSX)
        const int n = kevent(em->epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
#endif
        // Re-check after waking: the test sets server_stop and then
        // registers a dummy always-ready event to break the blocking wait.
        if (server_stop) {
            break;
        }
        if (n < 0) {
            if (EINTR == errno) {
                continue;
            }
#if defined(OS_LINUX)
            PLOG(FATAL) << "Fail to epoll_wait";
#elif defined(OS_MACOSX)
            PLOG(FATAL) << "Fail to kevent";
#endif
            break;
        }
        for (int i = 0; i < n; ++i) {
            // The SocketMeta pointer was stored as user data at
            // registration time in the test body.
#if defined(OS_LINUX)
            SocketMeta* m = (SocketMeta*)e[i].data.ptr;
#elif defined(OS_MACOSX)
            SocketMeta* m = (SocketMeta*)e[i].udata;
#endif
            // First event on an idle fd (0 -> 1): start a reader bthread.
            if (m->req.fetch_add(1, butil::memory_order_acquire) == 0) {
                bthread_t th;
                bthread_start_urgent(
                    &th, &BTHREAD_ATTR_SMALL, process_thread, m);
                ++em->nthread;
            } else {
                ++em->nfold;
            }
        }
    }
    return NULL;
}
  146. void* client_thread(void* arg) {
  147. ClientMeta* m = (ClientMeta*)arg;
  148. size_t offset = 0;
  149. m->times = 0;
  150. m->bytes = 0;
  151. const size_t buf_cap = 32768;
  152. char* buf = (char*)malloc(buf_cap);
  153. for (size_t i = 0; i < buf_cap/8; ++i) {
  154. ((uint64_t*)buf)[i] = i;
  155. }
  156. while (!client_stop) {
  157. ssize_t n;
  158. if (offset == 0) {
  159. n = write(m->fd, buf, buf_cap);
  160. } else {
  161. iovec v[2];
  162. v[0].iov_base = buf + offset;
  163. v[0].iov_len = buf_cap - offset;
  164. v[1].iov_base = buf;
  165. v[1].iov_len = offset;
  166. n = writev(m->fd, v, 2);
  167. }
  168. if (n < 0) {
  169. if (errno != EINTR) {
  170. PLOG(FATAL) << "Fail to write fd=" << m->fd;
  171. return NULL;
  172. }
  173. } else {
  174. ++m->times;
  175. m->bytes += n;
  176. offset += n;
  177. if (offset >= buf_cap) {
  178. offset -= buf_cap;
  179. }
  180. }
  181. }
  182. return NULL;
  183. }
  184. inline uint32_t fmix32 ( uint32_t h ) {
  185. h ^= h >> 16;
  186. h *= 0x85ebca6b;
  187. h ^= h >> 13;
  188. h *= 0xc2b2ae35;
  189. h ^= h >> 16;
  190. return h;
  191. }
// End-to-end throughput test: NCLIENT writer pthreads flood socketpairs
// while NEPOLL dispatcher threads (bthreads when RUN_EPOLL_IN_BTHREAD is
// defined) fan readable fds out to reader bthreads for 5 seconds; then
// throughput and dispatch statistics are logged and everything is torn down.
TEST(DispatcherTest, dispatch_tasks) {
    client_stop = false;
    server_stop = false;
    const size_t NEPOLL = 1;
    const size_t NCLIENT = 16;
    int epfd[NEPOLL];
    bthread_t eth[NEPOLL];  // NOTE(review): also used as pthread_t when
                            // RUN_EPOLL_IN_BTHREAD is off -- relies on both
                            // being integer-sized; confirm on new platforms.
    EpollMeta* em[NEPOLL];
    int fds[2 * NCLIENT];
    pthread_t cth[NCLIENT];
    ClientMeta* cm[NCLIENT];
    SocketMeta* sm[NCLIENT];
    for (size_t i = 0; i < NEPOLL; ++i) {
#if defined(OS_LINUX)
        epfd[i] = epoll_create(1024);
#elif defined(OS_MACOSX)
        epfd[i] = kqueue();
#endif
        ASSERT_GT(epfd[i], 0);
    }
    for (size_t i = 0; i < NCLIENT; ++i) {
        ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i));
        SocketMeta* m = new SocketMeta;
        m->fd = fds[i * 2];  // even index: server-side read end
        // Hash the client index so connections spread over the dispatchers.
        m->epfd = epfd[fmix32(i) % NEPOLL];
        m->req = 0;
        m->buf_cap = 32768;
        m->buf = (char*)malloc(m->buf_cap);
        m->bytes = 0;
        m->times = 0;
        ASSERT_EQ(0, butil::make_non_blocking(m->fd));
        sm[i] = m;
        // Register edge-triggered (EPOLLET / EV_CLEAR) with the SocketMeta
        // pointer as user data; process_thread's drain-then-recheck
        // protocol depends on edge semantics.
#if defined(OS_LINUX)
        epoll_event evt = { (uint32_t)(EPOLLIN | EPOLLET), { m } };
        ASSERT_EQ(0, epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt));
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, m);
        ASSERT_EQ(0, kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL));
#endif
        cm[i] = new ClientMeta;
        cm[i]->fd = fds[i * 2 + 1];  // odd index: client-side write end
        cm[i]->times = 0;
        cm[i]->bytes = 0;
        ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i]));
    }
    ProfilerStart("dispatcher.prof");
    butil::Timer tm;
    tm.start();
    for (size_t i = 0; i < NEPOLL; ++i) {
        EpollMeta *m = new EpollMeta;
        em[i] = m;
        m->epfd = epfd[i];
#ifdef RUN_EPOLL_IN_BTHREAD
        ASSERT_EQ(0, bthread_start_background(&eth[i], NULL, epoll_thread, m));
#else
        ASSERT_EQ(0, pthread_create(&eth[i], NULL, epoll_thread, m));
#endif
    }
    // Let the pipeline run for a fixed wall-clock interval.
    sleep(5);
    tm.stop();
    ProfilerStop();
    size_t client_bytes = 0;
    size_t server_bytes = 0;
    for (size_t i = 0; i < NCLIENT; ++i) {
        client_bytes += cm[i]->bytes;
        server_bytes += sm[i]->bytes;
    }
    // NOTE(review): the dispatcher threads are still running here, so these
    // counters are read racily; fine for a logged estimate.
    size_t all_nthread = 0, all_nfold = 0;
    for (size_t i = 0; i < NEPOLL; ++i) {
        all_nthread += em[i]->nthread;
        all_nfold += em[i]->nfold;
    }
    // bytes / microseconds == MB/s.
    LOG(INFO) << "client_tp=" << client_bytes / (double)tm.u_elapsed()
              << "MB/s server_tp=" << server_bytes / (double)tm.u_elapsed()
              << "MB/s nthread=" << all_nthread << " nfold=" << all_nfold;
    client_stop = true;
    for (size_t i = 0; i < NCLIENT; ++i) {
        pthread_join(cth[i], NULL);
    }
    server_stop = true;
    // Wake each dispatcher out of its blocking wait by registering fd 0 for
    // an always-ready write event; epoll_thread re-checks server_stop right
    // after waking and exits.
    for (size_t i = 0; i < NEPOLL; ++i) {
#if defined(OS_LINUX)
        epoll_event evt = { EPOLLOUT, { NULL } };
        ASSERT_EQ(0, epoll_ctl(epfd[i], EPOLL_CTL_ADD, 0, &evt));
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, 0, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL);
        ASSERT_EQ(0, kevent(epfd[i], &kqueue_event, 1, NULL, 0, NULL));
#endif
#ifdef RUN_EPOLL_IN_BTHREAD
        bthread_join(eth[i], NULL);
#else
        pthread_join(eth[i], NULL);
#endif
    }
    bthread::stop_and_join_epoll_threads();
    bthread_usleep(100000);
}
  291. } // namespace