event_dispatcher.cpp

// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Authors: Ge,Jun (gejun@baidu.com)
//          Rujie Jiang (jiangrujie@baidu.com)

#include <gflags/gflags.h>                              // DEFINE_int32
#include "butil/compat.h"
#include "butil/fd_utility.h"                           // make_close_on_exec
#include "butil/logging.h"                              // LOG
#include "butil/third_party/murmurhash3/murmurhash3.h"  // fmix32
#include "bthread/bthread.h"                            // bthread_start_background
#include "brpc/event_dispatcher.h"
#ifdef BRPC_SOCKET_HAS_EOF
#include "brpc/details/has_epollrdhup.h"
#endif
#include "brpc/reloadable_flags.h"
#if defined(OS_MACOSX)
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#endif

namespace brpc {

DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");

DEFINE_bool(usercode_in_pthread, false,
            "Call user's callback in pthreads, use bthreads otherwise");

EventDispatcher::EventDispatcher()
    : _epfd(-1)
    , _stop(false)
    , _tid(0)
    , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
{
#if defined(OS_LINUX)
    _epfd = epoll_create(1024 * 1024);
    if (_epfd < 0) {
        PLOG(FATAL) << "Fail to create epoll";
        return;
    }
#elif defined(OS_MACOSX)
    _epfd = kqueue();
    if (_epfd < 0) {
        PLOG(FATAL) << "Fail to create kqueue";
        return;
    }
#endif
    CHECK_EQ(0, butil::make_close_on_exec(_epfd));

    _wakeup_fds[0] = -1;
    _wakeup_fds[1] = -1;
    if (pipe(_wakeup_fds) != 0) {
        PLOG(FATAL) << "Fail to create pipe";
        return;
    }
}

EventDispatcher::~EventDispatcher() {
    Stop();
    Join();
    if (_epfd >= 0) {
        close(_epfd);
        _epfd = -1;
    }
    if (_wakeup_fds[0] > 0) {
        close(_wakeup_fds[0]);
        close(_wakeup_fds[1]);
    }
}

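// Start the polling bthread. `consumer_thread_attr' (falling back to
// BTHREAD_ATTR_NORMAL when NULL) is also reused later for the bthreads that
// consume events.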
int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
    if (_epfd < 0) {
#if defined(OS_LINUX)
        LOG(FATAL) << "epoll was not created";
#elif defined(OS_MACOSX)
        LOG(FATAL) << "kqueue was not created";
#endif
        return -1;
    }

    if (_tid != 0) {
        LOG(FATAL) << "Already started this dispatcher(" << this
                   << ") in bthread=" << _tid;
        return -1;
    }

    // Set _consumer_thread_attr before creating the epoll/kqueue thread to
    // make sure everything looks sane to that thread.
    _consumer_thread_attr = (consumer_thread_attr ?
                             *consumer_thread_attr : BTHREAD_ATTR_NORMAL);

    // The polling thread uses the same attr as consumer threads (NORMAL right
    // now). Previously we used a small stack (32KB), which could overflow
    // when an older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
    // is also a potential issue for consumer threads, using the same attr
    // is a reasonable solution.
    int rc = bthread_start_background(
        &_tid, &_consumer_thread_attr, RunThis, this);
    if (rc) {
        LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
        return -1;
    }
    return 0;
}

bool EventDispatcher::Running() const {
    return !_stop && _epfd >= 0 && _tid != 0;
}

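// Ask the polling thread to quit: registering the always-writable end of the
// wakeup pipe makes the blocking epoll_wait/kevent return, after which the
// Run() loop observes _stop and breaks.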
void EventDispatcher::Stop() {
    _stop = true;

    if (_epfd >= 0) {
#if defined(OS_LINUX)
        epoll_event evt = { EPOLLOUT, { NULL } };
        epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
               0, 0, NULL);
        kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
#endif
    }
}

void EventDispatcher::Join() {
    if (_tid) {
        bthread_join(_tid, NULL);
        _tid = 0;
    }
}

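// Register edge-triggered write events of `fd', keeping read events as well
// when `pollin' is true. `socket_id' is stored as the user data and comes
// back from epoll_wait/kevent in Run().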
int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
    if (_epfd < 0) {
        errno = EINVAL;
        return -1;
    }

#if defined(OS_LINUX)
    epoll_event evt;
    evt.data.u64 = socket_id;
    evt.events = EPOLLOUT | EPOLLET;
#ifdef BRPC_SOCKET_HAS_EOF
    evt.events |= has_epollrdhup;
#endif
    if (pollin) {
        evt.events |= EPOLLIN;
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            // This fd has been removed from epoll via `RemoveConsumer',
            // in which case errno will be ENOENT
            return -1;
        }
    } else {
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
            return -1;
        }
    }
#elif defined(OS_MACOSX)
    struct kevent evt;
    //TODO(zhujiashun): add EV_EOF
    EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
           0, 0, (void*)socket_id);
    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
        return -1;
    }
    if (pollin) {
        EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
               0, 0, (void*)socket_id);
        if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
            return -1;
        }
    }
#endif
    return 0;
}

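// Undo AddEpollOut(): stop polling write events of `fd'. Read events are kept
// (re-registered on macOS) when `pollin' is true; otherwise the fd is dropped
// from the poller entirely.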
int EventDispatcher::RemoveEpollOut(SocketId socket_id,
                                    int fd, bool pollin) {
#if defined(OS_LINUX)
    if (pollin) {
        epoll_event evt;
        evt.data.u64 = socket_id;
        evt.events = EPOLLIN | EPOLLET;
#ifdef BRPC_SOCKET_HAS_EOF
        evt.events |= has_epollrdhup;
#endif
        return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
    } else {
        return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
    }
#elif defined(OS_MACOSX)
    struct kevent evt;
    EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
        return -1;
    }
    if (pollin) {
        EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
               0, 0, (void*)socket_id);
        return kevent(_epfd, &evt, 1, NULL, 0, NULL);
    }
    return 0;
#endif
    return -1;
}

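// Register edge-triggered read events of `fd' so that incoming data is
// dispatched to the Socket identified by `socket_id'.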
int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
    if (_epfd < 0) {
        errno = EINVAL;
        return -1;
    }
#if defined(OS_LINUX)
    epoll_event evt;
    evt.events = EPOLLIN | EPOLLET;
    evt.data.u64 = socket_id;
#ifdef BRPC_SOCKET_HAS_EOF
    evt.events |= has_epollrdhup;
#endif
    return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
#elif defined(OS_MACOSX)
    struct kevent evt;
    EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
           0, 0, (void*)socket_id);
    return kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
    return -1;
}

int EventDispatcher::RemoveConsumer(int fd) {
    if (fd < 0) {
        return -1;
    }
    // Remove the consumer from the dispatcher before closing the fd because
    // if the process was forked and the fd is not marked as close-on-exec,
    // closing it does not drop the fd's reference count to 0 and thus does
    // not remove the fd from epoll. Worse, the fd becomes unremovable from
    // epoll afterwards! If the fd were level-triggered and had data left,
    // epoll_wait would keep returning events of the fd continuously, making
    // the program behave abnormally.
#if defined(OS_LINUX)
    if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
        PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
        return -1;
    }
#elif defined(OS_MACOSX)
    struct kevent evt;
    EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
    kevent(_epfd, &evt, 1, NULL, 0, NULL);
    EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
    kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
    return 0;
}

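// Entry point of the polling bthread: loop on epoll_wait/kevent, dispatching
// read events to Socket::StartInputEvent and write events to
// Socket::HandleEpollOut, until _stop is observed.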
void* EventDispatcher::RunThis(void* arg) {
    ((EventDispatcher*)arg)->Run();
    return NULL;
}

void EventDispatcher::Run() {
    while (!_stop) {
#if defined(OS_LINUX)
        epoll_event e[32];
#ifdef BRPC_ADDITIONAL_EPOLL
        // Performance downgrades in examples.
        int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
        if (n == 0) {
            n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
        }
#else
        const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
#endif
#elif defined(OS_MACOSX)
        struct kevent e[32];
        int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
#endif
        if (_stop) {
            // epoll_ctl/epoll_wait should have some sort of memory fencing
            // guaranteeing that we (after epoll_wait) see _stop set before
            // epoll_ctl.
            break;
        }
        if (n < 0) {
            if (EINTR == errno) {
                // We've checked _stop, no wake-up will be missed.
                continue;
            }
#if defined(OS_LINUX)
            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
#elif defined(OS_MACOSX)
            PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
#endif
            break;
        }
        for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
#ifdef BRPC_SOCKET_HAS_EOF
                || (e[i].events & has_epollrdhup)
#endif
                ) {
                // We don't care about the return value.
                Socket::StartInputEvent(e[i].data.u64, e[i].events,
                                        _consumer_thread_attr);
            }
#elif defined(OS_MACOSX)
            if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
                // We don't care about the return value.
                Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
                                        _consumer_thread_attr);
            }
#endif
        }
        for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                // We don't care about the return value.
                Socket::HandleEpollOut(e[i].data.u64);
            }
#elif defined(OS_MACOSX)
            if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
                // We don't care about the return value.
                Socket::HandleEpollOut((SocketId)e[i].udata);
            }
#endif
        }
    }
}

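// Global dispatchers shared by all sockets: created lazily via pthread_once,
// stopped and joined at process exit.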
static EventDispatcher* g_edisp = NULL;
static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;

static void StopAndJoinGlobalDispatchers() {
    for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
        g_edisp[i].Stop();
        g_edisp[i].Join();
    }
}

void InitializeGlobalDispatchers() {
    g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
    for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
        const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
            BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
        CHECK_EQ(0, g_edisp[i].Start(&attr));
    }
    // This atexit will be run before g_task_control.stop() because the above
    // Start() initializes g_task_control by creating bthreads (to run
    // epoll/kqueue).
    CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}

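// Map `fd' to one of the global dispatchers. fmix32 spreads fds evenly and
// keeps the mapping stable, so all events of one fd are always handled by
// the same dispatcher.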
EventDispatcher& GetGlobalEventDispatcher(int fd) {
    pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
    if (FLAGS_event_dispatcher_num == 1) {
        return g_edisp[0];
    }
    int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
    return g_edisp[index];
}

} // namespace brpc