bthread_futex_unittest.cpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <stdlib.h>
  18. #include <unistd.h>
  19. #include <stdio.h>
  20. #include <signal.h>
  21. #include <gtest/gtest.h>
  22. #include "butil/time.h"
  23. #include "butil/macros.h"
  24. #include "butil/errno.h"
  25. #include <limits.h> // INT_MAX
  26. #include "butil/atomicops.h"
  27. #include "bthread/bthread.h"
  28. #include <bthread/sys_futex.h>
  29. #include <bthread/processor.h>
  30. namespace {
  31. volatile bool stop = false;
  32. butil::atomic<int> nthread(0);
  33. void* read_thread(void* arg) {
  34. butil::atomic<int>* m = (butil::atomic<int>*)arg;
  35. int njob = 0;
  36. while (!stop) {
  37. int x;
  38. while (!stop && (x = *m) != 0) {
  39. if (x > 0) {
  40. while ((x = m->fetch_sub(1)) > 0) {
  41. ++njob;
  42. const long start = butil::cpuwide_time_ns();
  43. while (butil::cpuwide_time_ns() < start + 10000) {
  44. }
  45. if (stop) {
  46. return new int(njob);
  47. }
  48. }
  49. m->fetch_add(1);
  50. } else {
  51. cpu_relax();
  52. }
  53. }
  54. ++nthread;
  55. bthread::futex_wait_private(m/*lock1*/, 0/*consumed_njob*/, NULL);
  56. --nthread;
  57. }
  58. return new int(njob);
  59. }
  60. TEST(FutexTest, rdlock_performance) {
  61. const size_t N = 100000;
  62. butil::atomic<int> lock1(0);
  63. pthread_t rth[8];
  64. for (size_t i = 0; i < ARRAY_SIZE(rth); ++i) {
  65. ASSERT_EQ(0, pthread_create(&rth[i], NULL, read_thread, &lock1));
  66. }
  67. const int64_t t1 = butil::cpuwide_time_ns();
  68. for (size_t i = 0; i < N; ++i) {
  69. if (nthread) {
  70. lock1.fetch_add(1);
  71. bthread::futex_wake_private(&lock1, 1);
  72. } else {
  73. lock1.fetch_add(1);
  74. if (nthread) {
  75. bthread::futex_wake_private(&lock1, 1);
  76. }
  77. }
  78. }
  79. const int64_t t2 = butil::cpuwide_time_ns();
  80. bthread_usleep(3000000);
  81. stop = true;
  82. for (int i = 0; i < 10; ++i) {
  83. bthread::futex_wake_private(&lock1, INT_MAX);
  84. sched_yield();
  85. }
  86. int njob = 0;
  87. int* res;
  88. for (size_t i = 0; i < ARRAY_SIZE(rth); ++i) {
  89. pthread_join(rth[i], (void**)&res);
  90. njob += *res;
  91. delete res;
  92. }
  93. printf("wake %lu times, %" PRId64 "ns each, lock1=%d njob=%d\n",
  94. N, (t2-t1)/N, lock1.load(), njob);
  95. ASSERT_EQ(N, (size_t)(lock1.load() + njob));
  96. }
  97. TEST(FutexTest, futex_wake_before_wait) {
  98. int lock1 = 0;
  99. timespec timeout = { 1, 0 };
  100. ASSERT_EQ(0, bthread::futex_wake_private(&lock1, INT_MAX));
  101. ASSERT_EQ(-1, bthread::futex_wait_private(&lock1, 0, &timeout));
  102. ASSERT_EQ(ETIMEDOUT, errno);
  103. }
  104. void* dummy_waiter(void* lock) {
  105. bthread::futex_wait_private(lock, 0, NULL);
  106. return NULL;
  107. }
  108. TEST(FutexTest, futex_wake_many_waiters_perf) {
  109. int lock1 = 0;
  110. size_t N = 0;
  111. pthread_t th;
  112. for (; N < 1000 && !pthread_create(&th, NULL, dummy_waiter, &lock1); ++N) {}
  113. sleep(1);
  114. int nwakeup = 0;
  115. butil::Timer tm;
  116. tm.start();
  117. for (size_t i = 0; i < N; ++i) {
  118. nwakeup += bthread::futex_wake_private(&lock1, 1);
  119. }
  120. tm.stop();
  121. printf("N=%lu, futex_wake a thread = %" PRId64 "ns\n", N, tm.n_elapsed() / N);
  122. ASSERT_EQ(N, (size_t)nwakeup);
  123. sleep(2);
  124. const size_t REP = 10000;
  125. nwakeup = 0;
  126. tm.start();
  127. for (size_t i = 0; i < REP; ++i) {
  128. nwakeup += bthread::futex_wake_private(&lock1, 1);
  129. }
  130. tm.stop();
  131. ASSERT_EQ(0, nwakeup);
  132. printf("futex_wake nop = %" PRId64 "ns\n", tm.n_elapsed() / REP);
  133. }
  134. butil::atomic<int> nevent(0);
  135. void* waker(void* lock) {
  136. bthread_usleep(10000);
  137. const size_t REP = 100000;
  138. int nwakeup = 0;
  139. butil::Timer tm;
  140. tm.start();
  141. for (size_t i = 0; i < REP; ++i) {
  142. nwakeup += bthread::futex_wake_private(lock, 1);
  143. }
  144. tm.stop();
  145. EXPECT_EQ(0, nwakeup);
  146. printf("futex_wake nop = %" PRId64 "ns\n", tm.n_elapsed() / REP);
  147. return NULL;
  148. }
  149. void* batch_waker(void* lock) {
  150. bthread_usleep(10000);
  151. const size_t REP = 100000;
  152. int nwakeup = 0;
  153. butil::Timer tm;
  154. tm.start();
  155. for (size_t i = 0; i < REP; ++i) {
  156. if (nevent.fetch_add(1, butil::memory_order_relaxed) == 0) {
  157. nwakeup += bthread::futex_wake_private(lock, 1);
  158. int expected = 1;
  159. while (1) {
  160. int last_expected = expected;
  161. if (nevent.compare_exchange_strong(expected, 0, butil::memory_order_relaxed)) {
  162. break;
  163. }
  164. nwakeup += bthread::futex_wake_private(lock, expected - last_expected);
  165. }
  166. }
  167. }
  168. tm.stop();
  169. EXPECT_EQ(0, nwakeup);
  170. printf("futex_wake nop = %" PRId64 "ns\n", tm.n_elapsed() / REP);
  171. return NULL;
  172. }
  173. TEST(FutexTest, many_futex_wake_nop_perf) {
  174. pthread_t th[8];
  175. int lock1;
  176. std::cout << "[Direct wake]" << std::endl;
  177. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  178. ASSERT_EQ(0, pthread_create(&th[i], NULL, waker, &lock1));
  179. }
  180. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  181. ASSERT_EQ(0, pthread_join(th[i], NULL));
  182. }
  183. std::cout << "[Batch wake]" << std::endl;
  184. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  185. ASSERT_EQ(0, pthread_create(&th[i], NULL, batch_waker, &lock1));
  186. }
  187. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  188. ASSERT_EQ(0, pthread_join(th[i], NULL));
  189. }
  190. }
  191. } // namespace