
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - An M:N threading library to make applications more concurrent.

// Date: Thu Aug 7 18:56:27 CST 2014

#include "butil/compat.h"
#include <new>                                   // std::nothrow
#include <sys/poll.h>                            // poll()
#if defined(OS_MACOSX)
#include <sys/types.h>                           // struct kevent
#include <sys/event.h>                           // kevent(), kqueue()
#endif
#include "butil/atomicops.h"
#include "butil/time.h"
#include "butil/fd_utility.h"                    // make_non_blocking
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"  // fmix32
#include "bthread/butex.h"                       // butex_*
#include "bthread/task_group.h"                  // TaskGroup
#include "bthread/bthread.h"                     // bthread_start_urgent

// Implement bthread functions on file descriptors

namespace bthread {

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
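
// A lock-free, fixed-capacity two-level array indexed by fd: blocks of
// BLOCK_SIZE atomics are allocated on first access (get_or_new) and never
// freed, so pointers returned by get()/get_or_new() stay valid for the
// rest of the process.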
template <typename T, size_t NBLOCK, size_t BLOCK_SIZE>
class LazyArray {
    struct Block {
        butil::atomic<T> items[BLOCK_SIZE];
    };

public:
    LazyArray() {
        memset(_blocks, 0, sizeof(butil::atomic<Block*>) * NBLOCK);
    }

    butil::atomic<T>* get_or_new(size_t index) {
        const size_t block_index = index / BLOCK_SIZE;
        if (block_index >= NBLOCK) {
            return NULL;
        }
        const size_t block_offset = index - block_index * BLOCK_SIZE;
        Block* b = _blocks[block_index].load(butil::memory_order_consume);
        if (b != NULL) {
            return b->items + block_offset;
        }
        b = new (std::nothrow) Block;
        if (NULL == b) {
            // Allocation failed; another thread may have installed a block
            // in the meantime.
            b = _blocks[block_index].load(butil::memory_order_consume);
            return (b ? b->items + block_offset : NULL);
        }
        // Set items to default value of T.
        std::fill(b->items, b->items + BLOCK_SIZE, T());
        Block* expected = NULL;
        if (_blocks[block_index].compare_exchange_strong(
                expected, b, butil::memory_order_release,
                butil::memory_order_consume)) {
            return b->items + block_offset;
        }
        // Another thread installed its block first; discard ours and use it.
        delete b;
        return expected->items + block_offset;
    }

    butil::atomic<T>* get(size_t index) const {
        const size_t block_index = index / BLOCK_SIZE;
        if (__builtin_expect(block_index < NBLOCK, 1)) {
            const size_t block_offset = index - block_index * BLOCK_SIZE;
            Block* const b = _blocks[block_index].load(butil::memory_order_consume);
            if (__builtin_expect(b != NULL, 1)) {
                return b->items + block_offset;
            }
        }
        return NULL;
    }

private:
    butil::atomic<Block*> _blocks[NBLOCK];
};

typedef butil::atomic<int> EpollButex;
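
// Sentinel stored into fd_butexes while fd_close() is closing the fd;
// fd_wait() spins (sched_yield) until the sentinel is replaced again.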
static EpollButex* const CLOSING_GUARD = (EpollButex*)(intptr_t)-1L;

#ifndef NDEBUG
butil::static_atomic<int> break_nums = BUTIL_STATIC_ATOMIC_INIT(0);
#endif

// Able to address 67108864 file descriptors, should be enough.
LazyArray<EpollButex*, 262144/*NBLOCK*/, 256/*BLOCK_SIZE*/> fd_butexes;

static const int BTHREAD_DEFAULT_EPOLL_SIZE = 65536;
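
// A background bthread that blocks in epoll_wait() (kevent() on Mac) and
// wakes the butex associated with each fd that becomes ready.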
class EpollThread {
public:
    EpollThread()
        : _epfd(-1)
        , _stop(false)
        , _tid(0) {
    }

    int start(int epoll_size) {
        if (started()) {
            return -1;
        }
        _start_mutex.lock();
        // Double check
        if (started()) {
            _start_mutex.unlock();
            return -1;
        }
#if defined(OS_LINUX)
        _epfd = epoll_create(epoll_size);
#elif defined(OS_MACOSX)
        _epfd = kqueue();
#endif
        _start_mutex.unlock();
        if (_epfd < 0) {
            PLOG(FATAL) << "Fail to epoll_create/kqueue";
            return -1;
        }
        if (bthread_start_background(
                &_tid, NULL, EpollThread::run_this, this) != 0) {
            close(_epfd);
            _epfd = -1;
            LOG(FATAL) << "Fail to create epoll bthread";
            return -1;
        }
        return 0;
    }

    // Note: This function does not wake up suspended fd_wait. This is fine
    // since stop_and_join() is only called at program termination
    // (g_task_control.stop()); suspended bthreads block neither the quitting
    // of worker pthreads nor the completion of g_task_control.stop().
    int stop_and_join() {
        if (!started()) {
            return 0;
        }
        // No matter what this function returns, _epfd is set to -1 (making
        // started() false) to prevent later calls to stop_and_join() from
        // entering again.
        const int saved_epfd = _epfd;
        _epfd = -1;

        // epoll_wait cannot be woken up by closing _epfd. We wake up
        // epoll_wait by adding a fd that continuously triggers EPOLLOUT.
        // Visibility of _stop: the constant EPOLLOUT forces epoll_wait to
        // eventually see _stop == true.
        _stop = true;
        int closing_epoll_pipe[2];
        if (pipe(closing_epoll_pipe)) {
            PLOG(FATAL) << "Fail to create closing_epoll_pipe";
            return -1;
        }
#if defined(OS_LINUX)
        epoll_event evt = { EPOLLOUT, { NULL } };
        if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
                      closing_epoll_pipe[1], &evt) < 0) {
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE,
               EV_ADD | EV_ENABLE, 0, 0, NULL);
        if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
#endif
            PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
                        << saved_epfd;
            return -1;
        }

        const int rc = bthread_join(_tid, NULL);
        if (rc) {
            LOG(FATAL) << "Fail to join EpollThread, " << berror(rc);
            return -1;
        }
        close(closing_epoll_pipe[0]);
        close(closing_epoll_pipe[1]);
        close(saved_epfd);
        return 0;
    }
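
    // Suspend the calling bthread until one of `events` occurs on `fd` or
    // `abstime` is reached (NULL waits indefinitely). Returns 0 when woken,
    // -1 with errno set on error (including ETIMEDOUT).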
    int fd_wait(int fd, unsigned events, const timespec* abstime) {
        butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
        if (NULL == p) {
            errno = ENOMEM;
            return -1;
        }

        EpollButex* butex = p->load(butil::memory_order_consume);
        if (NULL == butex) {
            // It is rare to wait on one file descriptor from multiple
            // threads simultaneously. Creating the singleton with optimistic
            // locking here saves a mutex for each butex.
            butex = butex_create_checked<EpollButex>();
            butex->store(0, butil::memory_order_relaxed);
            EpollButex* expected = NULL;
            if (!p->compare_exchange_strong(expected, butex,
                                            butil::memory_order_release,
                                            butil::memory_order_consume)) {
                butex_destroy(butex);
                butex = expected;
            }
        }

        while (butex == CLOSING_GUARD) {  // bthread_close() is running.
            if (sched_yield() < 0) {
                return -1;
            }
            butex = p->load(butil::memory_order_consume);
        }
        // Save the value of the butex before adding fd to epoll, because the
        // butex may be changed before butex_wait. No memory fence needed
        // because EPOLL_CTL_MOD and EPOLL_CTL_ADD shall have release fences.
        const int expected_val = butex->load(butil::memory_order_relaxed);

#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        epoll_event evt = { events | EPOLLONESHOT, { butex } };
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
                errno != EEXIST) {
                PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
                return -1;
            }
        }
# else
        epoll_event evt;
        evt.events = events;
        evt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
            errno != EEXIST) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
            return -1;
        }
# endif
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
               0, 0, butex);
        if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd;
            return -1;
        }
#endif
        if (butex_wait(butex, expected_val, abstime) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return -1;
        }
        return 0;
    }
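
    // Close `fd` and wake all bthreads suspended in fd_wait() on it; the
    // CLOSING_GUARD sentinel keeps concurrent waiters and closers out while
    // the close is in progress.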
    int fd_close(int fd) {
        if (fd < 0) {
            // what close(-1) returns
            errno = EBADF;
            return -1;
        }
        butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd);
        if (NULL == pbutex) {
            // Did not call bthread_fd functions, close directly.
            return close(fd);
        }
        EpollButex* butex = pbutex->exchange(
            CLOSING_GUARD, butil::memory_order_relaxed);
        if (butex == CLOSING_GUARD) {
            // concurrent double close detected.
            errno = EBADF;
            return -1;
        }
        if (butex != NULL) {
            butex->fetch_add(1, butil::memory_order_relaxed);
            butex_wake_all(butex);
        }
#if defined(OS_LINUX)
        epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
#elif defined(OS_MACOSX)
        struct kevent evt;
        EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
        EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
        const int rc = close(fd);
        pbutex->exchange(butex, butil::memory_order_relaxed);
        return rc;
    }

    bool started() const {
        return _epfd >= 0;
    }

private:
    static void* run_this(void* arg) {
        return static_cast<EpollThread*>(arg)->run();
    }
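
    // The event loop: block in epoll_wait/kevent, then bump and wake the
    // butex attached to each ready fd so the suspended fd_wait() calls
    // return.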
    void* run() {
        const int initial_epfd = _epfd;
        const size_t MAX_EVENTS = 32;
#if defined(OS_LINUX)
        epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
#elif defined(OS_MACOSX)
        typedef struct kevent KEVENT;
        struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
#endif
        if (NULL == e) {
            LOG(FATAL) << "Fail to new epoll_event";
            return NULL;
        }

#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower.";
# endif
#endif

        while (!_stop) {
            const int epfd = _epfd;
#if defined(OS_LINUX)
            const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
#elif defined(OS_MACOSX)
            const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
#endif
            if (_stop) {
                break;
            }
            if (n < 0) {
                if (errno == EINTR) {
#ifndef NDEBUG
                    break_nums.fetch_add(1, butil::memory_order_relaxed);
                    int* p = &errno;
                    const char* b = berror();
                    const char* b2 = berror(errno);
                    DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", "
                                << errno << " " << p << " " << b << " " << b2;
#endif
                    continue;
                }
                PLOG(INFO) << "Fail to epoll epfd=" << epfd;
                break;
            }
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
            // The fd is still in the epoll set; remove it so that the next
            // fd_wait re-adds it (DEL+ADD emulates EPOLLONESHOT).
            for (int i = 0; i < n; ++i) {
                epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
            }
# endif
#endif
            for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
                EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
# else
                butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
                EpollButex* butex = pbutex ?
                    pbutex->load(butil::memory_order_consume) : NULL;
# endif
#elif defined(OS_MACOSX)
                EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
#endif
                if (butex != NULL && butex != CLOSING_GUARD) {
                    butex->fetch_add(1, butil::memory_order_relaxed);
                    butex_wake_all(butex);
                }
            }
        }

        delete [] e;
        DLOG(INFO) << "EpollThread=" << _tid << "(epfd="
                   << initial_epfd << ") is about to stop";
        return NULL;
    }

    int _epfd;
    bool _stop;
    bthread_t _tid;
    butil::Mutex _start_mutex;
};

EpollThread epoll_thread[BTHREAD_EPOLL_THREAD_NUM];
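
// Pick the EpollThread responsible for `fd` (hash-distributed when there is
// more than one) and lazily start it.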
static inline EpollThread& get_epoll_thread(int fd) {
    if (BTHREAD_EPOLL_THREAD_NUM == 1UL) {
        EpollThread& et = epoll_thread[0];
        et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
        return et;
    }

    EpollThread& et = epoll_thread[butil::fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM];
    et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
    return et;
}

//TODO(zhujiashun): change name
int stop_and_join_epoll_threads() {
    // Returns -1 if any epoll thread failed to stop.
    int rc = 0;
    for (size_t i = 0; i < BTHREAD_EPOLL_THREAD_NUM; ++i) {
        if (epoll_thread[i].stop_and_join() < 0) {
            rc = -1;
        }
    }
    return rc;
}

#if defined(OS_LINUX)
short epoll_to_poll_events(uint32_t epoll_events) {
    // Most POLL* and EPOLL* constants have the same values.
    short poll_events = (epoll_events &
                         (EPOLLIN | EPOLLPRI | EPOLLOUT |
                          EPOLLRDNORM | EPOLLRDBAND |
                          EPOLLWRNORM | EPOLLWRBAND |
                          EPOLLMSG | EPOLLERR | EPOLLHUP));
    CHECK_EQ((uint32_t)poll_events, epoll_events);
    return poll_events;
}
#elif defined(OS_MACOSX)
static short kqueue_to_poll_events(int kqueue_events) {
    //TODO: add more values?
    short poll_events = 0;
    if (kqueue_events == EVFILT_READ) {
        poll_events |= POLLIN;
    }
    if (kqueue_events == EVFILT_WRITE) {
        poll_events |= POLLOUT;
    }
    return poll_events;
}
#endif

// For pthreads: wait with a plain poll() instead of going through the
// epoll thread.
int pthread_fd_wait(int fd, unsigned events,
                    const timespec* abstime) {
    int diff_ms = -1;
    if (abstime) {
        timespec now;
        clock_gettime(CLOCK_REALTIME, &now);
        int64_t now_us = butil::timespec_to_microseconds(now);
        int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
        if (abstime_us <= now_us) {
            errno = ETIMEDOUT;
            return -1;
        }
        // Round up to the next millisecond so we never wake early.
        diff_ms = (abstime_us - now_us + 999L) / 1000L;
    }
#if defined(OS_LINUX)
    const short poll_events = bthread::epoll_to_poll_events(events);
#elif defined(OS_MACOSX)
    const short poll_events = bthread::kqueue_to_poll_events(events);
#endif
    if (poll_events == 0) {
        errno = EINVAL;
        return -1;
    }
    pollfd ufds = { fd, poll_events, 0 };
    const int rc = poll(&ufds, 1, diff_ms);
    if (rc < 0) {
        return -1;
    }
    if (rc == 0) {
        errno = ETIMEDOUT;
        return -1;
    }
    if (ufds.revents & POLLNVAL) {
        errno = EBADF;
        return -1;
    }
    return 0;
}

} // namespace bthread

extern "C" {

int bthread_fd_wait(int fd, unsigned events) {
    if (fd < 0) {
        errno = EINVAL;
        return -1;
    }
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        return bthread::get_epoll_thread(fd).fd_wait(fd, events, NULL);
    }
    return bthread::pthread_fd_wait(fd, events, NULL);
}

int bthread_fd_timedwait(int fd, unsigned events,
                         const timespec* abstime) {
    if (NULL == abstime) {
        return bthread_fd_wait(fd, events);
    }
    if (fd < 0) {
        errno = EINVAL;
        return -1;
    }
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        return bthread::get_epoll_thread(fd).fd_wait(fd, events, abstime);
    }
    return bthread::pthread_fd_wait(fd, events, abstime);
}

int bthread_connect(int sockfd, const sockaddr* serv_addr,
                    socklen_t addrlen) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return ::connect(sockfd, serv_addr, addrlen);
    }
    // FIXME: Scoped non-blocking?
    butil::make_non_blocking(sockfd);
    const int rc = connect(sockfd, serv_addr, addrlen);
    if (rc == 0 || errno != EINPROGRESS) {
        return rc;
    }
#if defined(OS_LINUX)
    if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) {
#elif defined(OS_MACOSX)
    if (bthread_fd_wait(sockfd, EVFILT_WRITE) < 0) {
#endif
        return -1;
    }
    int err;
    socklen_t errlen = sizeof(err);
    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
        PLOG(FATAL) << "Fail to getsockopt";
        return -1;
    }
    if (err != 0) {
        CHECK(err != EINPROGRESS);
        errno = err;
        return -1;
    }
    return 0;
}

// This does not wake pthreads calling bthread_fd_*wait.
int bthread_close(int fd) {
    return bthread::get_epoll_thread(fd).fd_close(fd);
}

} // extern "C"
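
// Usage sketch (illustrative only, not part of this file): a bthread reading
// from a non-blocking `fd` without blocking its worker pthread.
//
//   char buf[1024];
//   ssize_t nr;
//   while ((nr = read(fd, buf, sizeof(buf))) < 0 && errno == EAGAIN) {
//       if (bthread_fd_wait(fd, EPOLLIN) < 0) {  // suspends only this bthread
//           break;  // e.g. interrupted or fd closed via bthread_close()
//       }
//   }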