bthread_butex_unittest.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <gtest/gtest.h>
  18. #include "butil/atomicops.h"
  19. #include "butil/time.h"
  20. #include "butil/macros.h"
  21. #include "butil/logging.h"
  22. #include "bthread/butex.h"
  23. #include "bthread/task_control.h"
  24. #include "bthread/task_group.h"
  25. #include "bthread/bthread.h"
  26. #include "bthread/unstable.h"
  27. namespace bthread {
  28. extern butil::atomic<TaskControl*> g_task_control;
  29. inline TaskControl* get_task_control() {
  30. return g_task_control.load(butil::memory_order_consume);
  31. }
  32. } // namespace bthread
  33. namespace {
  34. TEST(ButexTest, wait_on_already_timedout_butex) {
  35. uint32_t* butex = bthread::butex_create_checked<uint32_t>();
  36. ASSERT_TRUE(butex);
  37. timespec now;
  38. ASSERT_EQ(0, clock_gettime(CLOCK_REALTIME, &now));
  39. *butex = 1;
  40. ASSERT_EQ(-1, bthread::butex_wait(butex, 1, &now));
  41. ASSERT_EQ(ETIMEDOUT, errno);
  42. }
  43. void* sleeper(void* arg) {
  44. bthread_usleep((uint64_t)arg);
  45. return NULL;
  46. }
  47. void* joiner(void* arg) {
  48. const long t1 = butil::gettimeofday_us();
  49. for (bthread_t* th = (bthread_t*)arg; *th; ++th) {
  50. if (0 != bthread_join(*th, NULL)) {
  51. LOG(FATAL) << "fail to join thread_" << th - (bthread_t*)arg;
  52. }
  53. long elp = butil::gettimeofday_us() - t1;
  54. EXPECT_LE(labs(elp - (th - (bthread_t*)arg + 1) * 100000L), 15000L)
  55. << "timeout when joining thread_" << th - (bthread_t*)arg;
  56. LOG(INFO) << "Joined thread " << *th << " at " << elp << "us ["
  57. << bthread_self() << "]";
  58. }
  59. for (bthread_t* th = (bthread_t*)arg; *th; ++th) {
  60. EXPECT_EQ(0, bthread_join(*th, NULL));
  61. }
  62. return NULL;
  63. }
  64. struct A {
  65. uint64_t a;
  66. char dummy[0];
  67. };
  68. struct B {
  69. uint64_t a;
  70. };
  71. TEST(ButexTest, with_or_without_array_zero) {
  72. ASSERT_EQ(sizeof(B), sizeof(A));
  73. }
  74. TEST(ButexTest, join) {
  75. const size_t N = 6;
  76. const size_t M = 6;
  77. bthread_t th[N+1];
  78. bthread_t jth[M];
  79. pthread_t pth[M];
  80. for (size_t i = 0; i < N; ++i) {
  81. bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  82. ASSERT_EQ(0, bthread_start_urgent(
  83. &th[i], &attr, sleeper,
  84. (void*)(100000L/*100ms*/ * (i + 1))));
  85. }
  86. th[N] = 0; // joiner will join tids in `th' until seeing 0.
  87. for (size_t i = 0; i < M; ++i) {
  88. ASSERT_EQ(0, bthread_start_urgent(&jth[i], NULL, joiner, th));
  89. }
  90. for (size_t i = 0; i < M; ++i) {
  91. ASSERT_EQ(0, pthread_create(&pth[i], NULL, joiner, th));
  92. }
  93. for (size_t i = 0; i < M; ++i) {
  94. ASSERT_EQ(0, bthread_join(jth[i], NULL))
  95. << "i=" << i << " error=" << berror();
  96. }
  97. for (size_t i = 0; i < M; ++i) {
  98. ASSERT_EQ(0, pthread_join(pth[i], NULL));
  99. }
  100. }
  101. struct WaiterArg {
  102. int expected_result;
  103. int expected_value;
  104. butil::atomic<int> *butex;
  105. const timespec *ptimeout;
  106. };
  107. void* waiter(void* arg) {
  108. WaiterArg * wa = (WaiterArg*)arg;
  109. const long t1 = butil::gettimeofday_us();
  110. const int rc = bthread::butex_wait(
  111. wa->butex, wa->expected_value, wa->ptimeout);
  112. const long t2 = butil::gettimeofday_us();
  113. if (rc == 0) {
  114. EXPECT_EQ(wa->expected_result, 0) << bthread_self();
  115. } else {
  116. EXPECT_EQ(wa->expected_result, errno) << bthread_self();
  117. }
  118. LOG(INFO) << "after wait, time=" << (t2-t1) << "us";
  119. return NULL;
  120. }
  121. TEST(ButexTest, sanity) {
  122. const size_t N = 5;
  123. WaiterArg args[N * 4];
  124. pthread_t t1, t2;
  125. butil::atomic<int>* b1 =
  126. bthread::butex_create_checked<butil::atomic<int> >();
  127. ASSERT_TRUE(b1);
  128. bthread::butex_destroy(b1);
  129. b1 = bthread::butex_create_checked<butil::atomic<int> >();
  130. *b1 = 1;
  131. ASSERT_EQ(0, bthread::butex_wake(b1));
  132. WaiterArg *unmatched_arg = new WaiterArg;
  133. unmatched_arg->expected_value = *b1 + 1;
  134. unmatched_arg->expected_result = EWOULDBLOCK;
  135. unmatched_arg->butex = b1;
  136. unmatched_arg->ptimeout = NULL;
  137. pthread_create(&t2, NULL, waiter, unmatched_arg);
  138. bthread_t th;
  139. ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, unmatched_arg));
  140. const timespec abstime = butil::seconds_from_now(1);
  141. for (size_t i = 0; i < 4*N; ++i) {
  142. args[i].expected_value = *b1;
  143. args[i].butex = b1;
  144. if ((i % 2) == 0) {
  145. args[i].expected_result = 0;
  146. args[i].ptimeout = NULL;
  147. } else {
  148. args[i].expected_result = ETIMEDOUT;
  149. args[i].ptimeout = &abstime;
  150. }
  151. if (i < 2*N) {
  152. pthread_create(&t1, NULL, waiter, &args[i]);
  153. } else {
  154. ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, &args[i]));
  155. }
  156. }
  157. sleep(2);
  158. for (size_t i = 0; i < 2*N; ++i) {
  159. ASSERT_EQ(1, bthread::butex_wake(b1));
  160. }
  161. ASSERT_EQ(0, bthread::butex_wake(b1));
  162. sleep(1);
  163. bthread::butex_destroy(b1);
  164. }
  165. struct ButexWaitArg {
  166. int* butex;
  167. int expected_val;
  168. long wait_msec;
  169. int error_code;
  170. };
  171. void* wait_butex(void* void_arg) {
  172. ButexWaitArg* arg = static_cast<ButexWaitArg*>(void_arg);
  173. const timespec ts = butil::milliseconds_from_now(arg->wait_msec);
  174. int rc = bthread::butex_wait(arg->butex, arg->expected_val, &ts);
  175. int saved_errno = errno;
  176. if (arg->error_code) {
  177. EXPECT_EQ(-1, rc);
  178. EXPECT_EQ(arg->error_code, saved_errno);
  179. } else {
  180. EXPECT_EQ(0, rc);
  181. }
  182. return NULL;
  183. }
  184. TEST(ButexTest, wait_without_stop) {
  185. int* butex = bthread::butex_create_checked<int>();
  186. *butex = 7;
  187. butil::Timer tm;
  188. const long WAIT_MSEC = 500;
  189. for (int i = 0; i < 2; ++i) {
  190. const bthread_attr_t attr =
  191. (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  192. ButexWaitArg arg = { butex, *butex, WAIT_MSEC, ETIMEDOUT };
  193. bthread_t th;
  194. tm.start();
  195. ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg));
  196. ASSERT_EQ(0, bthread_join(th, NULL));
  197. tm.stop();
  198. ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 250);
  199. }
  200. bthread::butex_destroy(butex);
  201. }
  202. TEST(ButexTest, stop_after_running) {
  203. int* butex = bthread::butex_create_checked<int>();
  204. *butex = 7;
  205. butil::Timer tm;
  206. const long WAIT_MSEC = 500;
  207. const long SLEEP_MSEC = 10;
  208. for (int i = 0; i < 2; ++i) {
  209. const bthread_attr_t attr =
  210. (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  211. bthread_t th;
  212. ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
  213. tm.start();
  214. ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg));
  215. ASSERT_EQ(0, bthread_usleep(SLEEP_MSEC * 1000L));
  216. ASSERT_EQ(0, bthread_stop(th));
  217. ASSERT_EQ(0, bthread_join(th, NULL));
  218. tm.stop();
  219. ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 25);
  220. // ASSERT_TRUE(bthread::get_task_control()->
  221. // timer_thread()._idset.empty());
  222. ASSERT_EQ(EINVAL, bthread_stop(th));
  223. }
  224. bthread::butex_destroy(butex);
  225. }
  226. TEST(ButexTest, stop_before_running) {
  227. int* butex = bthread::butex_create_checked<int>();
  228. *butex = 7;
  229. butil::Timer tm;
  230. const long WAIT_MSEC = 500;
  231. for (int i = 0; i < 2; ++i) {
  232. const bthread_attr_t attr =
  233. (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
  234. bthread_t th;
  235. ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
  236. tm.start();
  237. ASSERT_EQ(0, bthread_start_background(&th, &attr, wait_butex, &arg));
  238. ASSERT_EQ(0, bthread_stop(th));
  239. bthread_flush();
  240. ASSERT_EQ(0, bthread_join(th, NULL));
  241. tm.stop();
  242. ASSERT_LT(tm.m_elapsed(), 5);
  243. // ASSERT_TRUE(bthread::get_task_control()->
  244. // timer_thread()._idset.empty());
  245. ASSERT_EQ(EINVAL, bthread_stop(th));
  246. }
  247. bthread::butex_destroy(butex);
  248. }
  249. void* join_the_waiter(void* arg) {
  250. EXPECT_EQ(0, bthread_join((bthread_t)arg, NULL));
  251. return NULL;
  252. }
  253. TEST(ButexTest, join_cant_be_wakeup) {
  254. const long WAIT_MSEC = 100;
  255. int* butex = bthread::butex_create_checked<int>();
  256. *butex = 7;
  257. butil::Timer tm;
  258. ButexWaitArg arg = { butex, *butex, 1000, EINTR };
  259. for (int i = 0; i < 2; ++i) {
  260. const bthread_attr_t attr =
  261. (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  262. tm.start();
  263. bthread_t th, th2;
  264. ASSERT_EQ(0, bthread_start_urgent(&th, NULL, wait_butex, &arg));
  265. ASSERT_EQ(0, bthread_start_urgent(&th2, &attr, join_the_waiter, (void*)th));
  266. ASSERT_EQ(0, bthread_stop(th2));
  267. ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L));
  268. ASSERT_TRUE(bthread::TaskGroup::exists(th));
  269. ASSERT_TRUE(bthread::TaskGroup::exists(th2));
  270. ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L));
  271. ASSERT_EQ(0, bthread_stop(th));
  272. ASSERT_EQ(0, bthread_join(th2, NULL));
  273. ASSERT_EQ(0, bthread_join(th, NULL));
  274. tm.stop();
  275. ASSERT_LT(tm.m_elapsed(), WAIT_MSEC + 15);
  276. ASSERT_EQ(EINVAL, bthread_stop(th));
  277. ASSERT_EQ(EINVAL, bthread_stop(th2));
  278. }
  279. bthread::butex_destroy(butex);
  280. }
  281. TEST(ButexTest, stop_after_slept) {
  282. butil::Timer tm;
  283. const long SLEEP_MSEC = 100;
  284. const long WAIT_MSEC = 10;
  285. for (int i = 0; i < 2; ++i) {
  286. const bthread_attr_t attr =
  287. (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  288. tm.start();
  289. bthread_t th;
  290. ASSERT_EQ(0, bthread_start_urgent(
  291. &th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L)));
  292. ASSERT_EQ(0, bthread_usleep(WAIT_MSEC * 1000L));
  293. ASSERT_EQ(0, bthread_stop(th));
  294. ASSERT_EQ(0, bthread_join(th, NULL));
  295. tm.stop();
  296. if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
  297. ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15);
  298. } else {
  299. ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 15);
  300. }
  301. // ASSERT_TRUE(bthread::get_task_control()->
  302. // timer_thread()._idset.empty());
  303. ASSERT_EQ(EINVAL, bthread_stop(th));
  304. }
  305. }
  306. TEST(ButexTest, stop_just_when_sleeping) {
  307. butil::Timer tm;
  308. const long SLEEP_MSEC = 100;
  309. for (int i = 0; i < 2; ++i) {
  310. const bthread_attr_t attr =
  311. (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  312. tm.start();
  313. bthread_t th;
  314. ASSERT_EQ(0, bthread_start_urgent(
  315. &th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L)));
  316. ASSERT_EQ(0, bthread_stop(th));
  317. ASSERT_EQ(0, bthread_join(th, NULL));
  318. tm.stop();
  319. if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
  320. ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15);
  321. } else {
  322. ASSERT_LT(tm.m_elapsed(), 15);
  323. }
  324. // ASSERT_TRUE(bthread::get_task_control()->
  325. // timer_thread()._idset.empty());
  326. ASSERT_EQ(EINVAL, bthread_stop(th));
  327. }
  328. }
  329. TEST(ButexTest, stop_before_sleeping) {
  330. butil::Timer tm;
  331. const long SLEEP_MSEC = 100;
  332. for (int i = 0; i < 2; ++i) {
  333. bthread_t th;
  334. const bthread_attr_t attr =
  335. (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
  336. tm.start();
  337. ASSERT_EQ(0, bthread_start_background(&th, &attr, sleeper,
  338. (void*)(SLEEP_MSEC*1000L)));
  339. ASSERT_EQ(0, bthread_stop(th));
  340. bthread_flush();
  341. ASSERT_EQ(0, bthread_join(th, NULL));
  342. tm.stop();
  343. if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
  344. ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 10);
  345. } else {
  346. ASSERT_LT(tm.m_elapsed(), 10);
  347. }
  348. // ASSERT_TRUE(bthread::get_task_control()->
  349. // timer_thread()._idset.empty());
  350. ASSERT_EQ(EINVAL, bthread_stop(th));
  351. }
  352. }
  353. } // namespace