// bthread_unittest.cpp
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
#include <execinfo.h>

#include <algorithm>
#include <iostream>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include "butil/time.h"
#include "butil/macros.h"
#include "butil/logging.h"
#include "butil/gperftools_profiler.h"
#include "bthread/bthread.h"
#include "bthread/unstable.h"
#include "bthread/task_meta.h"
  27. namespace {
  28. class BthreadTest : public ::testing::Test{
  29. protected:
  30. BthreadTest(){
  31. const int kNumCores = sysconf(_SC_NPROCESSORS_ONLN);
  32. if (kNumCores > 0) {
  33. bthread_setconcurrency(kNumCores);
  34. }
  35. };
  36. virtual ~BthreadTest(){};
  37. virtual void SetUp() {
  38. };
  39. virtual void TearDown() {
  40. };
  41. };
TEST_F(BthreadTest, sizeof_task_meta) {
    // Informational only: records the size of the per-bthread metadata in the
    // test log so footprint regressions are visible.
    LOG(INFO) << "sizeof(TaskMeta)=" << sizeof(bthread::TaskMeta);
}
  45. void* unrelated_pthread(void*) {
  46. LOG(INFO) << "I did not call any bthread function, "
  47. "I should begin and end without any problem";
  48. return (void*)(intptr_t)1;
  49. }
  50. TEST_F(BthreadTest, unrelated_pthread) {
  51. pthread_t th;
  52. ASSERT_EQ(0, pthread_create(&th, NULL, unrelated_pthread, NULL));
  53. void* ret = NULL;
  54. ASSERT_EQ(0, pthread_join(th, &ret));
  55. ASSERT_EQ(1, (intptr_t)ret);
  56. }
TEST_F(BthreadTest, attr_init_and_destroy) {
    // A default-initialized attr must destroy cleanly.
    bthread_attr_t attr;
    ASSERT_EQ(0, bthread_attr_init(&attr));
    ASSERT_EQ(0, bthread_attr_destroy(&attr));
}
// Contexts used by the fcontext sanity test: `fcm` is the caller (test)
// context, `fc` is the context created on a malloc'd stack.
bthread_fcontext_t fcm;
bthread_fcontext_t fc;
typedef std::pair<int,int> pair_t;
// fcontext entry point: receives a pair_t*, jumps back to `fcm` carrying the
// sum of the pair; when resumed with a new pair_t*, jumps back once more with
// the new sum. The exact jump order is what context_sanity verifies.
static void f(intptr_t param) {
    pair_t* p = (pair_t*)param;
    // The first jump suspends here; when the test re-enters this context,
    // bthread_jump_fcontext() returns the next parameter (another pair_t*).
    p = (pair_t*)bthread_jump_fcontext(&fc, fcm, (intptr_t)(p->first+p->second));
    bthread_jump_fcontext(&fc, fcm, (intptr_t)(p->first+p->second));
}
  70. TEST_F(BthreadTest, context_sanity) {
  71. fcm = NULL;
  72. std::size_t size(8192);
  73. void* sp = malloc(size);
  74. pair_t p(std::make_pair(2, 7));
  75. fc = bthread_make_fcontext((char*)sp + size, size, f);
  76. int res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
  77. std::cout << p.first << " + " << p.second << " == " << res << std::endl;
  78. p = std::make_pair(5, 6);
  79. res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
  80. std::cout << p.first << " + " << p.second << " == " << res << std::endl;
  81. }
TEST_F(BthreadTest, call_bthread_functions_before_tls_created) {
    // Calling bthread APIs from a pthread whose bthread TLS has not been set
    // up yet must degrade gracefully instead of crashing.
    ASSERT_EQ(0, bthread_usleep(1000));
    ASSERT_EQ(EINVAL, bthread_join(0, NULL));
    ASSERT_EQ(0UL, bthread_self());
}
// Shared flag telling the long-running worker bodies below to quit.
butil::atomic<bool> stop(false);
// Bthread body: naps 100ms, logging before and after.
void* sleep_for_awhile(void* arg) {
    LOG(INFO) << "sleep_for_awhile(" << arg << ")";
    bthread_usleep(100000L);
    LOG(INFO) << "sleep_for_awhile(" << arg << ") wakes up";
    return NULL;
}
// Bthread body: terminates itself via bthread_exit(); the trailing EXPECT
// proves control never reaches past the exit call.
void* just_exit(void* arg) {
    LOG(INFO) << "just_exit(" << arg << ")";
    bthread_exit(NULL);
    EXPECT_TRUE(false) << "just_exit(" << arg << ") should never be here";
    return NULL;
}
  100. void* repeated_sleep(void* arg) {
  101. for (size_t i = 0; !stop; ++i) {
  102. LOG(INFO) << "repeated_sleep(" << arg << ") i=" << i;
  103. bthread_usleep(1000000L);
  104. }
  105. return NULL;
  106. }
  107. void* spin_and_log(void* arg) {
  108. // This thread never yields CPU.
  109. butil::EveryManyUS every_1s(1000000L);
  110. size_t i = 0;
  111. while (!stop) {
  112. if (every_1s) {
  113. LOG(INFO) << "spin_and_log(" << arg << ")=" << i++;
  114. }
  115. }
  116. return NULL;
  117. }
// Minimal bthread body used as a spawn target.
void* do_nothing(void* arg) {
    LOG(INFO) << "do_nothing(" << arg << ")";
    return NULL;
}
  122. void* launcher(void* arg) {
  123. LOG(INFO) << "launcher(" << arg << ")";
  124. for (size_t i = 0; !stop; ++i) {
  125. bthread_t th;
  126. bthread_start_urgent(&th, NULL, do_nothing, (void*)i);
  127. bthread_usleep(1000000L);
  128. }
  129. return NULL;
  130. }
// Bthread body: sleeps 5s then raises `stop` for all the workers above.
void* stopper(void*) {
    // Need this thread to set `stop' to true. Reason: If spin_and_log (which
    // never yields CPU) is scheduled to main thread, main thread cannot get
    // to run again.
    bthread_usleep(5*1000000L);
    LOG(INFO) << "about to stop";
    stop = true;
    return NULL;
}
  140. void* misc(void* arg) {
  141. LOG(INFO) << "misc(" << arg << ")";
  142. bthread_t th[8];
  143. EXPECT_EQ(0, bthread_start_urgent(&th[0], NULL, sleep_for_awhile, (void*)2));
  144. EXPECT_EQ(0, bthread_start_urgent(&th[1], NULL, just_exit, (void*)3));
  145. EXPECT_EQ(0, bthread_start_urgent(&th[2], NULL, repeated_sleep, (void*)4));
  146. EXPECT_EQ(0, bthread_start_urgent(&th[3], NULL, repeated_sleep, (void*)68));
  147. EXPECT_EQ(0, bthread_start_urgent(&th[4], NULL, spin_and_log, (void*)5));
  148. EXPECT_EQ(0, bthread_start_urgent(&th[5], NULL, spin_and_log, (void*)85));
  149. EXPECT_EQ(0, bthread_start_urgent(&th[6], NULL, launcher, (void*)6));
  150. EXPECT_EQ(0, bthread_start_urgent(&th[7], NULL, stopper, NULL));
  151. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  152. EXPECT_EQ(0, bthread_join(th[i], NULL));
  153. }
  154. return NULL;
  155. }
TEST_F(BthreadTest, sanity) {
    // misc() fans out the whole zoo of worker bodies and joins them all.
    LOG(INFO) << "main thread " << pthread_self();
    bthread_t th1;
    ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, misc, (void*)1));
    LOG(INFO) << "back to main thread " << th1 << " " << pthread_self();
    ASSERT_EQ(0, bthread_join(th1, NULL));
}
  163. const size_t BT_SIZE = 64;
  164. void *bt_array[BT_SIZE];
  165. int bt_cnt;
  166. int do_bt (void) {
  167. bt_cnt = backtrace (bt_array, BT_SIZE);
  168. return 56;
  169. }
  170. int call_do_bt (void) {
  171. return do_bt () + 1;
  172. }
  173. void * tf (void*) {
  174. if (call_do_bt () != 57) {
  175. return (void *) 1L;
  176. }
  177. return NULL;
  178. }
  179. TEST_F(BthreadTest, backtrace) {
  180. bthread_t th;
  181. ASSERT_EQ(0, bthread_start_urgent(&th, NULL, tf, NULL));
  182. ASSERT_EQ(0, bthread_join (th, NULL));
  183. char **text = backtrace_symbols (bt_array, bt_cnt);
  184. ASSERT_TRUE(text);
  185. for (int i = 0; i < bt_cnt; ++i) {
  186. puts(text[i]);
  187. }
  188. }
// Bthread body: inside a bthread, bthread_self() must be non-zero.
void* show_self(void*) {
    EXPECT_NE(0ul, bthread_self());
    LOG(INFO) << "bthread_self=" << bthread_self();
    return NULL;
}
TEST_F(BthreadTest, bthread_self) {
    // Outside any bthread, bthread_self() is 0; inside one it is not.
    ASSERT_EQ(0ul, bthread_self());
    bthread_t bth;
    ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, show_self, NULL));
    ASSERT_EQ(0, bthread_join(bth, NULL));
}
// Bthread body: joining oneself must fail with EINVAL rather than deadlock.
void* join_self(void*) {
    EXPECT_EQ(EINVAL, bthread_join(bthread_self(), NULL));
    return NULL;
}
TEST_F(BthreadTest, bthread_join) {
    // Invalid tid
    ASSERT_EQ(EINVAL, bthread_join(0, NULL));
    // Unexisting tid
    ASSERT_EQ(EINVAL, bthread_join((bthread_t)-1, NULL));
    // Joining self
    bthread_t th;
    // NOTE(review): `th` is deliberately left unjoined — presumably bthreads
    // are reclaimed without a join; confirm against bthread docs.
    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, join_self, NULL));
}
// Bthread body: sets errno inside the bthread so the test can verify the
// spawner's errno is not clobbered across bthread_start_urgent().
void* change_errno(void* arg) {
    errno = (intptr_t)arg;
    return NULL;
}
TEST_F(BthreadTest, errno_not_changed) {
    // start_urgent runs change_errno before returning here; the spawner's
    // errno must still hold its own value afterwards.
    bthread_t th;
    errno = 1;
    bthread_start_urgent(&th, NULL, change_errno, (void*)(intptr_t)2);
    ASSERT_EQ(1, errno);
}
  223. static long sleep_in_adding_func = 0;
  224. void* adding_func(void* arg) {
  225. butil::atomic<size_t>* s = (butil::atomic<size_t>*)arg;
  226. if (sleep_in_adding_func > 0) {
  227. long t1 = 0;
  228. if (10000 == s->fetch_add(1)) {
  229. t1 = butil::cpuwide_time_us();
  230. }
  231. bthread_usleep(sleep_in_adding_func);
  232. if (t1) {
  233. LOG(INFO) << "elapse is " << butil::cpuwide_time_us() - t1 << "ns";
  234. }
  235. } else {
  236. s->fetch_add(1);
  237. }
  238. return NULL;
  239. }
TEST_F(BthreadTest, small_threads) {
    // Benchmarks creation of BTHREAD_ATTR_SMALL bthreads, once without a
    // sleep in the body (z == 0) and once with a 1us sleep (z == 1); round 2
    // of each run executes under the CPU profiler.
    for (size_t z = 0; z < 2; ++z) {
        sleep_in_adding_func = (z ? 1 : 0);
        char prof_name[32];
        if (sleep_in_adding_func) {
            snprintf(prof_name, sizeof(prof_name), "smallthread.prof");
        } else {
            snprintf(prof_name, sizeof(prof_name), "smallthread_nosleep.prof");
        }
        butil::atomic<size_t> s(0);
        size_t N = (sleep_in_adding_func ? 40000 : 100000);
        std::vector<bthread_t> th;
        th.reserve(N);
        butil::Timer tm;
        for (size_t j = 0; j < 3; ++j) {
            th.clear();
            if (j == 1) {
                ProfilerStart(prof_name);
            }
            tm.start();
            for (size_t i = 0; i < N; ++i) {
                bthread_t t1;
                ASSERT_EQ(0, bthread_start_urgent(
                    &t1, &BTHREAD_ATTR_SMALL, adding_func, &s));
                th.push_back(t1);
            }
            tm.stop();
            if (j == 1) {
                ProfilerStop();
            }
            for (size_t i = 0; i < N; ++i) {
                bthread_join(th[i], NULL);
            }
            LOG(INFO) << "[Round " << j + 1 << "] bthread_start_urgent takes "
            << tm.n_elapsed()/N << "ns, sum=" << s;
            // The counter accumulates across rounds: N more increments each.
            ASSERT_EQ(N * (j + 1), (size_t)s);
            // Check uniqueness of th
            std::sort(th.begin(), th.end());
            ASSERT_EQ(th.end(), std::unique(th.begin(), th.end()));
        }
    }
}
  282. void* bthread_starter(void* void_counter) {
  283. while (!stop.load(butil::memory_order_relaxed)) {
  284. bthread_t th;
  285. EXPECT_EQ(0, bthread_start_urgent(&th, NULL, adding_func, void_counter));
  286. }
  287. return NULL;
  288. }
// Counter padded to its own cache line so the concurrent writers in
// start_bthreads_frequently do not false-share.
struct BAIDU_CACHELINE_ALIGNMENT AlignedCounter {
    AlignedCounter() : value(0) {}
    butil::atomic<size_t> value;
};
  293. TEST_F(BthreadTest, start_bthreads_frequently) {
  294. sleep_in_adding_func = 0;
  295. char prof_name[32];
  296. snprintf(prof_name, sizeof(prof_name), "start_bthreads_frequently.prof");
  297. const int con = bthread_getconcurrency();
  298. ASSERT_GT(con, 0);
  299. AlignedCounter* counters = new AlignedCounter[con];
  300. bthread_t th[con];
  301. std::cout << "Perf with different parameters..." << std::endl;
  302. //ProfilerStart(prof_name);
  303. for (int cur_con = 1; cur_con <= con; ++cur_con) {
  304. stop = false;
  305. for (int i = 0; i < cur_con; ++i) {
  306. counters[i].value = 0;
  307. ASSERT_EQ(0, bthread_start_urgent(
  308. &th[i], NULL, bthread_starter, &counters[i].value));
  309. }
  310. butil::Timer tm;
  311. tm.start();
  312. bthread_usleep(200000L);
  313. stop = true;
  314. for (int i = 0; i < cur_con; ++i) {
  315. bthread_join(th[i], NULL);
  316. }
  317. tm.stop();
  318. size_t sum = 0;
  319. for (int i = 0; i < cur_con; ++i) {
  320. sum += counters[i].value * 1000 / tm.m_elapsed();
  321. }
  322. std::cout << sum << ",";
  323. }
  324. std::cout << std::endl;
  325. //ProfilerStop();
  326. delete [] counters;
  327. }
// Bthread body: stops the timer the spawner started, capturing the latency
// from bthread_start_* to the first instruction of the new bthread.
void* log_start_latency(void* void_arg) {
    butil::Timer* tm = static_cast<butil::Timer*>(void_arg);
    tm->stop();
    return NULL;
}
TEST_F(BthreadTest, start_latency_when_high_idle) {
    // Averages the start latency of urgent vs background creation over ~9900
    // iterations; the first 101 iterations are warm-up and are not counted.
    bool warmup = true;
    long elp1 = 0;
    long elp2 = 0;
    int REP = 0;
    for (int i = 0; i < 10000; ++i) {
        butil::Timer tm;
        tm.start();
        bthread_t th;
        bthread_start_urgent(&th, NULL, log_start_latency, &tm);
        bthread_join(th, NULL);
        bthread_t th2;
        butil::Timer tm2;
        tm2.start();
        bthread_start_background(&th2, NULL, log_start_latency, &tm2);
        bthread_join(th2, NULL);
        if (!warmup) {
            // Counted iteration: the timers were stopped inside the bthreads.
            ++REP;
            elp1 += tm.n_elapsed();
            elp2 += tm2.n_elapsed();
        } else if (i == 100) {
            warmup = false;
        }
    }
    LOG(INFO) << "start_urgent=" << elp1 / REP << "ns start_background="
    << elp2 / REP << "ns";
}
// Bthread body: sleeps for the number of microseconds encoded in `arg`.
void* sleep_for_awhile_with_sleep(void* arg) {
    bthread_usleep((intptr_t)arg);
    return NULL;
}
TEST_F(BthreadTest, stop_sleep) {
    // bthread_stop() must interrupt a 1s bthread_usleep early: total elapsed
    // time should be close to the 10ms waited before issuing the stop.
    bthread_t th;
    ASSERT_EQ(0, bthread_start_urgent(
        &th, NULL, sleep_for_awhile_with_sleep, (void*)1000000L));
    butil::Timer tm;
    tm.start();
    bthread_usleep(10000);
    ASSERT_EQ(0, bthread_stop(th));
    ASSERT_EQ(0, bthread_join(th, NULL));
    tm.stop();
    // 10ms +/- 10ms of scheduling slack.
    ASSERT_LE(labs(tm.m_elapsed() - 10), 10);
}
TEST_F(BthreadTest, bthread_exit) {
    // bthread_exit() must terminate the caller for urgent and background
    // bthreads, a plain pthread, and bthreads running in pthread mode.
    bthread_t th1;
    bthread_t th2;
    pthread_t th3;
    bthread_t th4;
    bthread_t th5;
    const bthread_attr_t attr = BTHREAD_ATTR_PTHREAD;
    ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, just_exit, NULL));
    ASSERT_EQ(0, bthread_start_background(&th2, NULL, just_exit, NULL));
    ASSERT_EQ(0, pthread_create(&th3, NULL, just_exit, NULL));
    EXPECT_EQ(0, bthread_start_urgent(&th4, &attr, just_exit, NULL));
    EXPECT_EQ(0, bthread_start_background(&th5, &attr, just_exit, NULL));
    ASSERT_EQ(0, bthread_join(th1, NULL));
    ASSERT_EQ(0, bthread_join(th2, NULL));
    ASSERT_EQ(0, pthread_join(th3, NULL));
    ASSERT_EQ(0, bthread_join(th4, NULL));
    ASSERT_EQ(0, bthread_join(th5, NULL));
}
TEST_F(BthreadTest, bthread_equal) {
    bthread_t th1;
    ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, do_nothing, NULL));
    bthread_t th2;
    ASSERT_EQ(0, bthread_start_urgent(&th2, NULL, do_nothing, NULL));
    // Distinct tids compare unequal...
    ASSERT_EQ(0, bthread_equal(th1, th2));
    bthread_t th3 = th2;
    // ...and a copied tid compares equal.
    ASSERT_EQ(1, bthread_equal(th3, th2));
    ASSERT_EQ(0, bthread_join(th1, NULL));
    ASSERT_EQ(0, bthread_join(th2, NULL));
}
// Bthread body: records the pthread it ran on into *run.
void* mark_run(void* run) {
    *static_cast<pthread_t*>(run) = pthread_self();
    return NULL;
}
// Verifies bthread_usleep() semantics. With pthread_task != NULL this body is
// expected to run in a BTHREAD_ATTR_PTHREAD bthread, where usleep must keep
// it on the same pthread; with NULL it runs as a normal bthread, where
// start_urgent runs mark_run first in the same pthread. A NOSIGNAL helper
// bthread is used to detect whether a context switch happened.
void* check_sleep(void* pthread_task) {
    EXPECT_TRUE(bthread_self() != 0);
    // Create a no-signal task that other worker will not steal. The task will be
    // run if current bthread does context switch.
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL | BTHREAD_NOSIGNAL;
    bthread_t th1;
    pthread_t run = 0;
    const pthread_t pid = pthread_self();
    EXPECT_EQ(0, bthread_start_urgent(&th1, &attr, mark_run, &run));
    if (pthread_task) {
        bthread_usleep(100000L);
        // due to NOSIGNAL, mark_run did not run.
        // FIXME: actually runs. someone is still stealing.
        // EXPECT_EQ((pthread_t)0, run);
        // bthread_usleep = usleep for BTHREAD_ATTR_PTHREAD
        EXPECT_EQ(pid, pthread_self());
        // schedule mark_run
        bthread_flush();
    } else {
        // start_urgent should jump to the new thread first, then back to
        // current thread.
        EXPECT_EQ(pid, run); // should run in the same pthread
    }
    EXPECT_EQ(0, bthread_join(th1, NULL));
    if (pthread_task) {
        EXPECT_EQ(pid, pthread_self());
        EXPECT_NE((pthread_t)0, run); // the mark_run should run.
    }
    return NULL;
}
TEST_F(BthreadTest, bthread_usleep) {
    // NOTE: May fail because worker threads may still be stealing tasks
    // after previous cases.
    usleep(10000);
    // pthread-mode bthread: usleep must not move it to another pthread.
    bthread_t th1;
    ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_PTHREAD,
                                      check_sleep, (void*)1));
    ASSERT_EQ(0, bthread_join(th1, NULL));
    // normal bthread: start_urgent switches to the callee immediately.
    bthread_t th2;
    ASSERT_EQ(0, bthread_start_urgent(&th2, NULL,
                                      check_sleep, (void*)0));
    ASSERT_EQ(0, bthread_join(th2, NULL));
}
// Minimal bthread body for the mass-creation test below.
void* dummy_thread(void*) {
    return NULL;
}
  455. TEST_F(BthreadTest, too_many_nosignal_threads) {
  456. for (size_t i = 0; i < 100000; ++i) {
  457. bthread_attr_t attr = BTHREAD_ATTR_NORMAL | BTHREAD_NOSIGNAL;
  458. bthread_t tid;
  459. ASSERT_EQ(0, bthread_start_urgent(&tid, &attr, dummy_thread, NULL));
  460. }
  461. }
// Bthread body: yields once and exits.
static void* yield_thread(void*) {
    bthread_yield();
    return NULL;
}
TEST_F(BthreadTest, yield_single_thread) {
    // Yielding must not deadlock even when there is nothing else to run.
    bthread_t tid;
    ASSERT_EQ(0, bthread_start_background(&tid, NULL, yield_thread, NULL));
    ASSERT_EQ(0, bthread_join(tid, NULL));
}
  471. } // namespace