bthread_cond_unittest.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
#include <atomic>
#include <map>
#include <gtest/gtest.h>
#include "butil/atomicops.h"
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/scoped_lock.h"
#include "butil/gperftools_profiler.h"
#include "bthread/bthread.h"
#include "bthread/condition_variable.h"
#include "bthread/stack.h"
  27. namespace {
  28. struct Arg {
  29. bthread_mutex_t m;
  30. bthread_cond_t c;
  31. };
  32. pthread_mutex_t wake_mutex = PTHREAD_MUTEX_INITIALIZER;
  33. long signal_start_time = 0;
  34. std::vector<bthread_t> wake_tid;
  35. std::vector<long> wake_time;
  36. volatile bool stop = false;
  37. const long SIGNAL_INTERVAL_US = 10000;
  38. void* signaler(void* void_arg) {
  39. Arg* a = (Arg*)void_arg;
  40. signal_start_time = butil::gettimeofday_us();
  41. while (!stop) {
  42. bthread_usleep(SIGNAL_INTERVAL_US);
  43. bthread_cond_signal(&a->c);
  44. }
  45. return NULL;
  46. }
  47. void* waiter(void* void_arg) {
  48. Arg* a = (Arg*)void_arg;
  49. bthread_mutex_lock(&a->m);
  50. while (!stop) {
  51. bthread_cond_wait(&a->c, &a->m);
  52. BAIDU_SCOPED_LOCK(wake_mutex);
  53. wake_tid.push_back(bthread_self());
  54. wake_time.push_back(butil::gettimeofday_us());
  55. }
  56. bthread_mutex_unlock(&a->m);
  57. return NULL;
  58. }
  59. TEST(CondTest, sanity) {
  60. Arg a;
  61. ASSERT_EQ(0, bthread_mutex_init(&a.m, NULL));
  62. ASSERT_EQ(0, bthread_cond_init(&a.c, NULL));
  63. // has no effect
  64. ASSERT_EQ(0, bthread_cond_signal(&a.c));
  65. stop = false;
  66. wake_tid.resize(1024);
  67. wake_tid.clear();
  68. wake_time.resize(1024);
  69. wake_time.clear();
  70. bthread_t wth[8];
  71. const size_t NW = ARRAY_SIZE(wth);
  72. for (size_t i = 0; i < NW; ++i) {
  73. ASSERT_EQ(0, bthread_start_urgent(&wth[i], NULL, waiter, &a));
  74. }
  75. bthread_t sth;
  76. ASSERT_EQ(0, bthread_start_urgent(&sth, NULL, signaler, &a));
  77. bthread_usleep(SIGNAL_INTERVAL_US * 200);
  78. pthread_mutex_lock(&wake_mutex);
  79. const size_t nbeforestop = wake_time.size();
  80. pthread_mutex_unlock(&wake_mutex);
  81. stop = true;
  82. for (size_t i = 0; i < NW; ++i) {
  83. bthread_cond_signal(&a.c);
  84. }
  85. bthread_join(sth, NULL);
  86. for (size_t i = 0; i < NW; ++i) {
  87. bthread_join(wth[i], NULL);
  88. }
  89. printf("wake up for %lu times\n", wake_tid.size());
  90. // Check timing
  91. long square_sum = 0;
  92. for (size_t i = 0; i < nbeforestop; ++i) {
  93. long last_time = (i ? wake_time[i-1] : signal_start_time);
  94. long delta = wake_time[i] - last_time - SIGNAL_INTERVAL_US;
  95. EXPECT_GT(wake_time[i], last_time);
  96. square_sum += delta * delta;
  97. EXPECT_LT(labs(delta), 10000L) << "error[" << i << "]=" << delta << "="
  98. << wake_time[i] << " - " << last_time;
  99. }
  100. printf("Average error is %fus\n", sqrt(square_sum / std::max(nbeforestop, 1UL)));
  101. // Check fairness
  102. std::map<bthread_t, int> count;
  103. for (size_t i = 0; i < wake_tid.size(); ++i) {
  104. ++count[wake_tid[i]];
  105. }
  106. EXPECT_EQ(NW, count.size());
  107. int avg_count = (int)(wake_tid.size() / count.size());
  108. for (std::map<bthread_t, int>::iterator
  109. it = count.begin(); it != count.end(); ++it) {
  110. ASSERT_LE(abs(it->second - avg_count), 1)
  111. << "bthread=" << it->first
  112. << " count=" << it->second
  113. << " avg=" << avg_count;
  114. printf("%" PRId64 " wakes up %d times\n", it->first, it->second);
  115. }
  116. bthread_cond_destroy(&a.c);
  117. bthread_mutex_destroy(&a.m);
  118. }
  119. struct WrapperArg {
  120. bthread::Mutex mutex;
  121. bthread::ConditionVariable cond;
  122. };
  123. void* cv_signaler(void* void_arg) {
  124. WrapperArg* a = (WrapperArg*)void_arg;
  125. signal_start_time = butil::gettimeofday_us();
  126. while (!stop) {
  127. bthread_usleep(SIGNAL_INTERVAL_US);
  128. a->cond.notify_one();
  129. }
  130. return NULL;
  131. }
  132. void* cv_bmutex_waiter(void* void_arg) {
  133. WrapperArg* a = (WrapperArg*)void_arg;
  134. std::unique_lock<bthread_mutex_t> lck(*a->mutex.native_handler());
  135. while (!stop) {
  136. a->cond.wait(lck);
  137. }
  138. return NULL;
  139. }
  140. void* cv_mutex_waiter(void* void_arg) {
  141. WrapperArg* a = (WrapperArg*)void_arg;
  142. std::unique_lock<bthread::Mutex> lck(a->mutex);
  143. while (!stop) {
  144. a->cond.wait(lck);
  145. }
  146. return NULL;
  147. }
  148. #define COND_IN_PTHREAD
  149. #ifndef COND_IN_PTHREAD
  150. #define pthread_join bthread_join
  151. #define pthread_create bthread_start_urgent
  152. #endif
  153. TEST(CondTest, cpp_wrapper) {
  154. stop = false;
  155. bthread::ConditionVariable cond;
  156. pthread_t bmutex_waiter_threads[8];
  157. pthread_t mutex_waiter_threads[8];
  158. pthread_t signal_thread;
  159. WrapperArg a;
  160. for (size_t i = 0; i < ARRAY_SIZE(bmutex_waiter_threads); ++i) {
  161. ASSERT_EQ(0, pthread_create(&bmutex_waiter_threads[i], NULL,
  162. cv_bmutex_waiter, &a));
  163. ASSERT_EQ(0, pthread_create(&mutex_waiter_threads[i], NULL,
  164. cv_mutex_waiter, &a));
  165. }
  166. ASSERT_EQ(0, pthread_create(&signal_thread, NULL, cv_signaler, &a));
  167. bthread_usleep(100L * 1000);
  168. {
  169. BAIDU_SCOPED_LOCK(a.mutex);
  170. stop = true;
  171. }
  172. pthread_join(signal_thread, NULL);
  173. a.cond.notify_all();
  174. for (size_t i = 0; i < ARRAY_SIZE(bmutex_waiter_threads); ++i) {
  175. pthread_join(bmutex_waiter_threads[i], NULL);
  176. pthread_join(mutex_waiter_threads[i], NULL);
  177. }
  178. }
  179. #ifndef COND_IN_PTHREAD
  180. #undef pthread_join
  181. #undef pthread_create
  182. #endif
  183. class Signal {
  184. protected:
  185. Signal() : _signal(0) {}
  186. void notify() {
  187. BAIDU_SCOPED_LOCK(_m);
  188. ++_signal;
  189. _c.notify_one();
  190. }
  191. int wait(int old_signal) {
  192. std::unique_lock<bthread::Mutex> lck(_m);
  193. while (_signal == old_signal) {
  194. _c.wait(lck);
  195. }
  196. return _signal;
  197. }
  198. private:
  199. bthread::Mutex _m;
  200. bthread::ConditionVariable _c;
  201. int _signal;
  202. };
  203. struct PingPongArg {
  204. bool stopped;
  205. Signal sig1;
  206. Signal sig2;
  207. butil::atomic<int> nthread;
  208. butil::atomic<long> total_count;
  209. };
  210. void *ping_pong_thread(void* arg) {
  211. PingPongArg* a = (PingPongArg*)arg;
  212. long local_count = 0;
  213. bool odd = (a->nthread.fetch_add(1)) % 2;
  214. int old_signal = 0;
  215. while (!a->stopped) {
  216. if (odd) {
  217. a->sig1.notify();
  218. old_signal = a->sig2.wait(old_signal);
  219. } else {
  220. old_signal = a->sig1.wait(old_signal);
  221. a->sig2.notify();
  222. }
  223. ++local_count;
  224. }
  225. a->total_count.fetch_add(local_count);
  226. return NULL;
  227. }
  228. TEST(CondTest, ping_pong) {
  229. PingPongArg arg;
  230. arg.stopped = false;
  231. arg.nthread = 0;
  232. bthread_t threads[2];
  233. ProfilerStart("cond.prof");
  234. for (int i = 0; i < 2; ++i) {
  235. ASSERT_EQ(0, bthread_start_urgent(&threads[i], NULL, ping_pong_thread, &arg));
  236. }
  237. usleep(1000 * 1000);
  238. arg.stopped = true;
  239. arg.sig1.notify();
  240. arg.sig2.notify();
  241. for (int i = 0; i < 2; ++i) {
  242. ASSERT_EQ(0, bthread_join(threads[i], NULL));
  243. }
  244. ProfilerStop();
  245. LOG(INFO) << "total_count=" << arg.total_count.load();
  246. }
  247. struct BroadcastArg {
  248. bthread::ConditionVariable wait_cond;
  249. bthread::ConditionVariable broadcast_cond;
  250. bthread::Mutex mutex;
  251. int nwaiter;
  252. int cur_waiter;
  253. int rounds;
  254. int sig;
  255. };
  256. void* wait_thread(void* arg) {
  257. BroadcastArg* ba = (BroadcastArg*)arg;
  258. std::unique_lock<bthread::Mutex> lck(ba->mutex);
  259. while (ba->rounds > 0) {
  260. const int saved_round = ba->rounds;
  261. ++ba->cur_waiter;
  262. while (saved_round == ba->rounds) {
  263. if (ba->cur_waiter >= ba->nwaiter) {
  264. ba->broadcast_cond.notify_one();
  265. }
  266. ba->wait_cond.wait(lck);
  267. }
  268. }
  269. return NULL;
  270. }
  271. void* broadcast_thread(void* arg) {
  272. BroadcastArg* ba = (BroadcastArg*)arg;
  273. //int local_round = 0;
  274. while (ba->rounds > 0) {
  275. std::unique_lock<bthread::Mutex> lck(ba->mutex);
  276. while (ba->cur_waiter < ba->nwaiter) {
  277. ba->broadcast_cond.wait(lck);
  278. }
  279. ba->cur_waiter = 0;
  280. --ba->rounds;
  281. ba->wait_cond.notify_all();
  282. }
  283. return NULL;
  284. }
  285. void* disturb_thread(void* arg) {
  286. BroadcastArg* ba = (BroadcastArg*)arg;
  287. std::unique_lock<bthread::Mutex> lck(ba->mutex);
  288. while (ba->rounds > 0) {
  289. lck.unlock();
  290. lck.lock();
  291. }
  292. return NULL;
  293. }
  294. TEST(CondTest, mixed_usage) {
  295. BroadcastArg ba;
  296. ba.nwaiter = 0;
  297. ba.cur_waiter = 0;
  298. ba.rounds = 30000;
  299. const int NTHREADS = 10;
  300. ba.nwaiter = NTHREADS * 2;
  301. bthread_t normal_threads[NTHREADS];
  302. for (int i = 0; i < NTHREADS; ++i) {
  303. ASSERT_EQ(0, bthread_start_urgent(&normal_threads[i], NULL, wait_thread, &ba));
  304. }
  305. pthread_t pthreads[NTHREADS];
  306. for (int i = 0; i < NTHREADS; ++i) {
  307. ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
  308. wait_thread, &ba));
  309. }
  310. pthread_t broadcast;
  311. pthread_t disturb;
  312. ASSERT_EQ(0, pthread_create(&broadcast, NULL, broadcast_thread, &ba));
  313. ASSERT_EQ(0, pthread_create(&disturb, NULL, disturb_thread, &ba));
  314. for (int i = 0; i < NTHREADS; ++i) {
  315. bthread_join(normal_threads[i], NULL);
  316. pthread_join(pthreads[i], NULL);
  317. }
  318. pthread_join(broadcast, NULL);
  319. pthread_join(disturb, NULL);
  320. }
  321. class BthreadCond {
  322. public:
  323. BthreadCond() {
  324. bthread_cond_init(&_cond, NULL);
  325. bthread_mutex_init(&_mutex, NULL);
  326. _count = 1;
  327. }
  328. ~BthreadCond() {
  329. bthread_mutex_destroy(&_mutex);
  330. bthread_cond_destroy(&_cond);
  331. }
  332. void Init(int count = 1) {
  333. _count = count;
  334. }
  335. int Signal() {
  336. int ret = 0;
  337. bthread_mutex_lock(&_mutex);
  338. _count --;
  339. bthread_cond_signal(&_cond);
  340. bthread_mutex_unlock(&_mutex);
  341. return ret;
  342. }
  343. int Wait() {
  344. int ret = 0;
  345. bthread_mutex_lock(&_mutex);
  346. while (_count > 0) {
  347. ret = bthread_cond_wait(&_cond, &_mutex);
  348. }
  349. bthread_mutex_unlock(&_mutex);
  350. return ret;
  351. }
  352. private:
  353. int _count;
  354. bthread_cond_t _cond;
  355. bthread_mutex_t _mutex;
  356. };
  357. volatile bool g_stop = false;
  358. bool started_wait = false;
  359. bool ended_wait = false;
  360. void* usleep_thread(void *) {
  361. while (!g_stop) {
  362. bthread_usleep(1000L * 1000L);
  363. }
  364. return NULL;
  365. }
  366. void* wait_cond_thread(void* arg) {
  367. BthreadCond* c = (BthreadCond*)arg;
  368. started_wait = true;
  369. c->Wait();
  370. ended_wait = true;
  371. return NULL;
  372. }
  373. static void launch_many_bthreads() {
  374. g_stop = false;
  375. bthread_t tid;
  376. BthreadCond c;
  377. c.Init();
  378. butil::Timer tm;
  379. bthread_start_urgent(&tid, &BTHREAD_ATTR_PTHREAD, wait_cond_thread, &c);
  380. std::vector<bthread_t> tids;
  381. tids.reserve(32768);
  382. tm.start();
  383. for (size_t i = 0; i < 32768; ++i) {
  384. bthread_t t0;
  385. ASSERT_EQ(0, bthread_start_background(&t0, NULL, usleep_thread, NULL));
  386. tids.push_back(t0);
  387. }
  388. tm.stop();
  389. LOG(INFO) << "Creating bthreads took " << tm.u_elapsed() << " us";
  390. usleep(3 * 1000 * 1000L);
  391. c.Signal();
  392. g_stop = true;
  393. bthread_join(tid, NULL);
  394. for (size_t i = 0; i < tids.size(); ++i) {
  395. LOG_EVERY_SECOND(INFO) << "Joined " << i << " threads";
  396. bthread_join(tids[i], NULL);
  397. }
  398. LOG_EVERY_SECOND(INFO) << "Joined " << tids.size() << " threads";
  399. }
  400. TEST(CondTest, too_many_bthreads_from_pthread) {
  401. launch_many_bthreads();
  402. }
  403. static void* run_launch_many_bthreads(void*) {
  404. launch_many_bthreads();
  405. return NULL;
  406. }
  407. TEST(CondTest, too_many_bthreads_from_bthread) {
  408. bthread_t th;
  409. ASSERT_EQ(0, bthread_start_urgent(&th, NULL, run_launch_many_bthreads, NULL));
  410. bthread_join(th, NULL);
  411. }
  412. } // namespace