fd.cpp

// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Thu Aug 7 18:56:27 CST 2014

#include "butil/compat.h"
#include <new>                        // std::nothrow
#include <sys/poll.h>                 // poll()
#if defined(OS_MACOSX)
#include <sys/types.h>                // struct kevent
#include <sys/event.h>                // kevent(), kqueue()
#endif
#include "butil/atomicops.h"
#include "butil/time.h"
#include "butil/fd_utility.h"         // make_non_blocking
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"   // fmix32
#include "bthread/butex.h"            // butex_*
#include "bthread/task_group.h"       // TaskGroup
#include "bthread/bthread.h"          // bthread_start_urgent

// Implement bthread functions on file descriptors

namespace bthread {

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;

template <typename T, size_t NBLOCK, size_t BLOCK_SIZE>
class LazyArray {
    struct Block {
        butil::atomic<T> items[BLOCK_SIZE];
    };

public:
    LazyArray() {
        memset(_blocks, 0, sizeof(butil::atomic<Block*>) * NBLOCK);
    }

    butil::atomic<T>* get_or_new(size_t index) {
        const size_t block_index = index / BLOCK_SIZE;
        if (block_index >= NBLOCK) {
            return NULL;
        }
        const size_t block_offset = index - block_index * BLOCK_SIZE;
        Block* b = _blocks[block_index].load(butil::memory_order_consume);
        if (b != NULL) {
            return b->items + block_offset;
        }
        b = new (std::nothrow) Block;
        if (NULL == b) {
            b = _blocks[block_index].load(butil::memory_order_consume);
            return (b ? b->items + block_offset : NULL);
        }
        // Set items to default value of T.
        std::fill(b->items, b->items + BLOCK_SIZE, T());
        Block* expected = NULL;
        if (_blocks[block_index].compare_exchange_strong(
                expected, b, butil::memory_order_release,
                butil::memory_order_consume)) {
            return b->items + block_offset;
        }
        delete b;
        return expected->items + block_offset;
    }

    butil::atomic<T>* get(size_t index) const {
        const size_t block_index = index / BLOCK_SIZE;
        if (__builtin_expect(block_index < NBLOCK, 1)) {
            const size_t block_offset = index - block_index * BLOCK_SIZE;
            Block* const b = _blocks[block_index].load(butil::memory_order_consume);
            if (__builtin_expect(b != NULL, 1)) {
                return b->items + block_offset;
            }
        }
        return NULL;
    }

private:
    butil::atomic<Block*> _blocks[NBLOCK];
};
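
// A minimal usage sketch of LazyArray (illustrative comment only; the sizes
// below are arbitrary). Blocks are allocated lazily on first access, so slots
// for sparse indices (file descriptors, in this file) cost nothing until touched:
//
//   LazyArray<int, 16/*NBLOCK*/, 1024/*BLOCK_SIZE*/> arr;
//   butil::atomic<int>* slot = arr.get_or_new(42);  // allocates block 0 lazily
//   if (slot != NULL) {
//       slot->store(1, butil::memory_order_relaxed);
//   }
//   butil::atomic<int>* same = arr.get(42);         // same slot, no allocation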

typedef butil::atomic<int> EpollButex;
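
// Sentinel stored in fd_butexes while fd_close() is in progress; fd_wait()
// spins (via sched_yield) until the real butex pointer is restored.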
static EpollButex* const CLOSING_GUARD = (EpollButex*)(intptr_t)-1L;

#ifndef NDEBUG
butil::static_atomic<int> break_nums = BUTIL_STATIC_ATOMIC_INIT(0);
#endif

// Able to address 262144 * 256 = 67108864 file descriptors, which should be enough.
LazyArray<EpollButex*, 262144/*NBLOCK*/, 256/*BLOCK_SIZE*/> fd_butexes;

static const int BTHREAD_DEFAULT_EPOLL_SIZE = 65536;

class EpollThread {
public:
    EpollThread()
        : _epfd(-1)
        , _stop(false)
        , _tid(0) {
    }

    int start(int epoll_size) {
        if (started()) {
            return -1;
        }
        _start_mutex.lock();
        // Double check
        if (started()) {
            _start_mutex.unlock();
            return -1;
        }
#if defined(OS_LINUX)
        _epfd = epoll_create(epoll_size);
#elif defined(OS_MACOSX)
        _epfd = kqueue();
#endif
        _start_mutex.unlock();
        if (_epfd < 0) {
            PLOG(FATAL) << "Fail to epoll_create/kqueue";
            return -1;
        }
        if (bthread_start_background(
                &_tid, NULL, EpollThread::run_this, this) != 0) {
            close(_epfd);
            _epfd = -1;
            LOG(FATAL) << "Fail to create epoll bthread";
            return -1;
        }
        return 0;
    }

    // Note: This function does not wake up suspended fd_wait. This is fine
    // since stop_and_join is only called at program termination
    // (g_task_control.stop()); suspended bthreads do not block the quitting
    // of worker pthreads or the completion of g_task_control.stop().
    int stop_and_join() {
        if (!started()) {
            return 0;
        }
        // No matter what this function returns, _epfd is set to -1 (making
        // started() false) so that a later stop_and_join() does not enter
        // again.
        const int saved_epfd = _epfd;
        _epfd = -1;

        // epoll_wait cannot be woken up by closing _epfd. We wake up
        // epoll_wait by adding an fd that continuously triggers EPOLLOUT.
        // Visibility of _stop: the constant EPOLLOUT wake-ups force
        // epoll_wait to eventually observe _stop == true.
        _stop = true;
        int closing_epoll_pipe[2];
        if (pipe(closing_epoll_pipe)) {
            PLOG(FATAL) << "Fail to create closing_epoll_pipe";
            return -1;
        }
#if defined(OS_LINUX)
        epoll_event evt = { EPOLLOUT, { NULL } };
        if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
                      closing_epoll_pipe[1], &evt) < 0) {
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
               0, 0, NULL);
        if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
#endif
            PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
                        << saved_epfd;
            return -1;
        }

        const int rc = bthread_join(_tid, NULL);
        if (rc) {
            LOG(FATAL) << "Fail to join EpollThread, " << berror(rc);
            return -1;
        }
        close(closing_epoll_pipe[0]);
        close(closing_epoll_pipe[1]);
        close(saved_epfd);
        return 0;
    }
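
    // Suspend the calling bthread until `events' happen on `fd' or `abstime'
    // is reached. Returns 0 when woken up, -1 with errno set on error or timeout.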
    int fd_wait(int fd, unsigned events, const timespec* abstime) {
        butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
        if (NULL == p) {
            errno = ENOMEM;
            return -1;
        }

        EpollButex* butex = p->load(butil::memory_order_consume);
        if (NULL == butex) {
            // It is rare to wait on one file descriptor from multiple threads
            // simultaneously. Creating the singleton with optimistic locking
            // here saves a mutex for each butex.
            butex = butex_create_checked<EpollButex>();
            butex->store(0, butil::memory_order_relaxed);
            EpollButex* expected = NULL;
            if (!p->compare_exchange_strong(expected, butex,
                                            butil::memory_order_release,
                                            butil::memory_order_consume)) {
                butex_destroy(butex);
                butex = expected;
            }
        }

        while (butex == CLOSING_GUARD) {  // bthread_close() is running.
            if (sched_yield() < 0) {
                return -1;
            }
            butex = p->load(butil::memory_order_consume);
        }
        // Save the value of the butex before adding fd to epoll because the
        // butex may be changed before butex_wait. No memory fence needed
        // because EPOLL_CTL_MOD and EPOLL_CTL_ADD imply a release fence.
        const int expected_val = butex->load(butil::memory_order_relaxed);

#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        epoll_event evt = { events | EPOLLONESHOT, { butex } };
        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
            if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
                errno != EEXIST) {
                PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
                return -1;
            }
        }
# else
        epoll_event evt;
        evt.events = events;
        evt.data.fd = fd;
        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
            errno != EEXIST) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
            return -1;
        }
# endif
#elif defined(OS_MACOSX)
        struct kevent kqueue_event;
        EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
               0, 0, butex);
        if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
            PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd;
            return -1;
        }
#endif
        if (butex_wait(butex, expected_val, abstime) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return -1;
        }
        return 0;
    }
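
    // Close `fd' and wake up bthreads suspended in fd_wait() on it. The slot
    // in fd_butexes is temporarily set to CLOSING_GUARD so that a concurrent
    // close fails with EBADF and concurrent fd_wait() calls spin until the
    // close completes.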
    int fd_close(int fd) {
        if (fd < 0) {
            // Consistent with what close(-1) returns.
            errno = EBADF;
            return -1;
        }
        butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd);
        if (NULL == pbutex) {
            // bthread_fd functions were never called on this fd, close directly.
            return close(fd);
        }
        EpollButex* butex = pbutex->exchange(
            CLOSING_GUARD, butil::memory_order_relaxed);
        if (butex == CLOSING_GUARD) {
            // Concurrent double close detected.
            errno = EBADF;
            return -1;
        }
        if (butex != NULL) {
            butex->fetch_add(1, butil::memory_order_relaxed);
            butex_wake_all(butex);
        }
#if defined(OS_LINUX)
        epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
#elif defined(OS_MACOSX)
        struct kevent evt;
        EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
        EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
        kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
        const int rc = close(fd);
        pbutex->exchange(butex, butil::memory_order_relaxed);
        return rc;
    }

    bool started() const {
        return _epfd >= 0;
    }

private:
    static void* run_this(void* arg) {
        return static_cast<EpollThread*>(arg)->run();
    }

    void* run() {
        const int initial_epfd = _epfd;
        const size_t MAX_EVENTS = 32;
#if defined(OS_LINUX)
        epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
#elif defined(OS_MACOSX)
        typedef struct kevent KEVENT;
        struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
#endif
        if (NULL == e) {
            LOG(FATAL) << "Fail to new epoll_event";
            return NULL;
        }

#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
        DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower.";
# endif
#endif

        while (!_stop) {
            const int epfd = _epfd;
#if defined(OS_LINUX)
            const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
#elif defined(OS_MACOSX)
            const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
#endif
            if (_stop) {
                break;
            }
            if (n < 0) {
                if (errno == EINTR) {
#ifndef NDEBUG
                    break_nums.fetch_add(1, butil::memory_order_relaxed);
                    int* p = &errno;
                    const char* b = berror();
                    const char* b2 = berror(errno);
                    DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", "
                                << errno << " " << p << " " << b << " " << b2;
#endif
                    continue;
                }
                PLOG(INFO) << "Fail to epoll epfd=" << epfd;
                break;
            }
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
            for (int i = 0; i < n; ++i) {
                epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
            }
# endif
#endif
            for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
                EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
# else
                butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
                EpollButex* butex = pbutex ?
                    pbutex->load(butil::memory_order_consume) : NULL;
# endif
#elif defined(OS_MACOSX)
                EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
#endif
                if (butex != NULL && butex != CLOSING_GUARD) {
                    butex->fetch_add(1, butil::memory_order_relaxed);
                    butex_wake_all(butex);
                }
            }
        }

        delete [] e;
        DLOG(INFO) << "EpollThread=" << _tid << "(epfd="
                   << initial_epfd << ") is about to stop";
        return NULL;
    }

    int _epfd;
    bool _stop;
    bthread_t _tid;
    butil::Mutex _start_mutex;
};

EpollThread epoll_thread[BTHREAD_EPOLL_THREAD_NUM];
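
// Map an fd to one of the epoll threads by hashing, and lazily start that
// thread on first use.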
static inline EpollThread& get_epoll_thread(int fd) {
    if (BTHREAD_EPOLL_THREAD_NUM == 1UL) {
        EpollThread& et = epoll_thread[0];
        et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
        return et;
    }
    EpollThread& et = epoll_thread[butil::fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM];
    et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
    return et;
}

// TODO(zhujiashun): change name
int stop_and_join_epoll_threads() {
    // Returns -1 if any epoll thread failed to stop.
    int rc = 0;
    for (size_t i = 0; i < BTHREAD_EPOLL_THREAD_NUM; ++i) {
        if (epoll_thread[i].stop_and_join() < 0) {
            rc = -1;
        }
    }
    return rc;
}

#if defined(OS_LINUX)
short epoll_to_poll_events(uint32_t epoll_events) {
    // Most POLL* and EPOLL* constants have the same values.
    short poll_events = (epoll_events &
                         (EPOLLIN | EPOLLPRI | EPOLLOUT |
                          EPOLLRDNORM | EPOLLRDBAND |
                          EPOLLWRNORM | EPOLLWRBAND |
                          EPOLLMSG | EPOLLERR | EPOLLHUP));
    CHECK_EQ((uint32_t)poll_events, epoll_events);
    return poll_events;
}
#elif defined(OS_MACOSX)
short kqueue_to_poll_events(uint32_t kqueue_events) {
    // TODO: add more values?
    short poll_events = 0;
    if (kqueue_events == EVFILT_READ) {
        poll_events |= POLLIN;
    }
    if (kqueue_events == EVFILT_WRITE) {
        poll_events |= POLLOUT;
    }
    return poll_events;
}
#endif

// For pthreads.
int pthread_fd_wait(int fd, unsigned events,
                    const timespec* abstime) {
    int diff_ms = -1;
    if (abstime) {
        timespec now;
#ifdef __MACH__  // OS X does not have clock_gettime, use clock_get_time
        clock_serv_t cclock;
        mach_timespec_t mts;
        host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
        clock_get_time(cclock, &mts);
        mach_port_deallocate(mach_task_self(), cclock);
        now.tv_sec = mts.tv_sec;
        now.tv_nsec = mts.tv_nsec;
#else
        clock_gettime(CLOCK_REALTIME, &now);
#endif
        int64_t now_us = butil::timespec_to_microseconds(now);
        int64_t abstime_us = butil::timespec_to_microseconds(*abstime);
        if (abstime_us <= now_us) {
            errno = ETIMEDOUT;
            return -1;
        }
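        // Round up to whole milliseconds so that poll() does not return
        // before abstime.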
        diff_ms = (abstime_us - now_us + 999L) / 1000L;
    }
#if defined(OS_LINUX)
    const short poll_events =
        bthread::epoll_to_poll_events(events);
#elif defined(OS_MACOSX)
    const short poll_events =
        bthread::kqueue_to_poll_events(events);
#endif
    if (poll_events == 0) {
        errno = EINVAL;
        return -1;
    }
    pollfd ufds = { fd, poll_events, 0 };
    const int rc = poll(&ufds, 1, diff_ms);
    if (rc < 0) {
        return -1;
    }
    if (rc == 0) {
        errno = ETIMEDOUT;
        return -1;
    }
    if (ufds.revents & POLLNVAL) {
        errno = EBADF;
        return -1;
    }
    return 0;
}

} // namespace bthread

extern "C" {
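
// Suspend the caller until `events' happen on `fd'. Falls back to poll()
// when the caller is not running in a bthread.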
int bthread_fd_wait(int fd, unsigned events) {
    if (fd < 0) {
        errno = EINVAL;
        return -1;
    }
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        return bthread::get_epoll_thread(fd).fd_wait(
            fd, events, NULL);
    }
    return bthread::pthread_fd_wait(fd, events, NULL);
}

int bthread_fd_timedwait(int fd, unsigned events,
                         const timespec* abstime) {
    if (NULL == abstime) {
        return bthread_fd_wait(fd, events);
    }
    if (fd < 0) {
        errno = EINVAL;
        return -1;
    }
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL != g && !g->is_current_pthread_task()) {
        return bthread::get_epoll_thread(fd).fd_wait(
            fd, events, abstime);
    }
    return bthread::pthread_fd_wait(fd, events, abstime);
}

int bthread_connect(int sockfd, const sockaddr* serv_addr,
                    socklen_t addrlen) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return ::connect(sockfd, serv_addr, addrlen);
    }
    // FIXME: Scoped non-blocking?
    butil::make_non_blocking(sockfd);
    const int rc = connect(sockfd, serv_addr, addrlen);
    if (rc == 0 || errno != EINPROGRESS) {
        return rc;
    }
#if defined(OS_LINUX)
    if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) {
#elif defined(OS_MACOSX)
    if (bthread_fd_wait(sockfd, EVFILT_WRITE) < 0) {
#endif
        return -1;
    }
    int err;
    socklen_t errlen = sizeof(err);
    if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
        PLOG(FATAL) << "Fail to getsockopt";
        return -1;
    }
    if (err != 0) {
        CHECK(err != EINPROGRESS);
        errno = err;
        return -1;
    }
    return 0;
}
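
// A minimal caller-side sketch (illustrative only; `addr' and the error
// handling are assumptions, not part of this file). Called from a bthread,
// the connect suspends only the calling bthread, not the worker pthread:
//
//   sockaddr_in addr;  // filled in by the caller
//   int fd = socket(AF_INET, SOCK_STREAM, 0);
//   if (fd < 0 ||
//       bthread_connect(fd, (const sockaddr*)&addr, sizeof(addr)) != 0) {
//       PLOG(ERROR) << "Fail to connect";
//   }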

// This does not wake pthreads calling bthread_fd_*wait.
int bthread_close(int fd) {
    return bthread::get_epoll_thread(fd).fd_close(fd);
}

} // extern "C"