// bthread_ping_pong_unittest.cpp (7.0 KB)

  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  #include <stdlib.h>
  #include <unistd.h>
  #include <stdio.h>
  #include <signal.h>
  #include <stdint.h>
  #include <atomic>
  #include <vector>
  #include <gflags/gflags.h>
  #include <gtest/gtest.h>
  #include "butil/compat.h"
  #include "butil/time.h"
  #include "butil/macros.h"
  #include "butil/errno.h"
  #include <bthread/sys_futex.h>
  #include <bthread/butex.h>
  #include "bthread/bthread.h"
  #include "butil/atomicops.h"
  31. namespace {
  32. DEFINE_int32(thread_num, 1, "#pairs of threads doing ping pong");
  33. DEFINE_bool(loop, false, "run until ctrl-C is pressed");
  34. DEFINE_bool(use_futex, false, "use futex instead of pipe");
  35. DEFINE_bool(use_butex, false, "use butex instead of pipe");
  36. void ALLOW_UNUSED (*ignore_sigpipe)(int) = signal(SIGPIPE, SIG_IGN);
  // Stop request shared by the SIGINT handler, the test body and the player
  // loops. It is an atomic rather than a `volatile bool` because it is written
  // by one thread (or a signal handler) and read concurrently by others;
  // volatile gives no inter-thread guarantees. Plain assignment and
  // `while (!stop)` keep working through std::atomic's conversion operators.
  // NOTE(review): signal-handler safety relies on atomic<bool> being
  // lock-free, which holds on mainstream targets -- confirm for exotic ones.
  std::atomic<bool> stop(false);

  // Signal handler (installed for SIGINT by the test): requests shutdown by
  // raising `stop`. The signal number is ignored.
  void quit_handler(int) {
      stop = true;
  }
  41. struct BAIDU_CACHELINE_ALIGNMENT AlignedIntWrapper {
  42. int value;
  43. };
  44. struct BAIDU_CACHELINE_ALIGNMENT PlayerArg {
  45. int read_fd;
  46. int write_fd;
  47. int* wait_addr;
  48. int* wake_addr;
  49. long counter;
  50. long wakeup;
  51. };
  52. void* pipe_player(void* void_arg) {
  53. PlayerArg* arg = static_cast<PlayerArg*>(void_arg);
  54. char dummy = '\0';
  55. while (1) {
  56. ssize_t nr = read(arg->read_fd, &dummy, 1);
  57. if (nr <= 0) {
  58. if (nr == 0) {
  59. printf("[%" PRIu64 "] EOF\n", pthread_numeric_id());
  60. break;
  61. }
  62. if (errno != EINTR) {
  63. printf("[%" PRIu64 "] bad read, %m\n", pthread_numeric_id());
  64. break;
  65. }
  66. continue;
  67. }
  68. if (1L != write(arg->write_fd, &dummy, 1)) {
  69. printf("[%" PRIu64 "] bad write, %m\n", pthread_numeric_id());
  70. break;
  71. }
  72. ++arg->counter;
  73. }
  74. return NULL;
  75. }
  76. static const int INITIAL_FUTEX_VALUE = 0;
  77. void* futex_player(void* void_arg) {
  78. PlayerArg* arg = static_cast<PlayerArg*>(void_arg);
  79. int counter = INITIAL_FUTEX_VALUE;
  80. while (!stop) {
  81. int rc = bthread::futex_wait_private(arg->wait_addr, counter, NULL);
  82. ++counter;
  83. ++*arg->wake_addr;
  84. bthread::futex_wake_private(arg->wake_addr, 1);
  85. ++arg->counter;
  86. arg->wakeup += (rc == 0);
  87. }
  88. return NULL;
  89. }
  90. void* butex_player(void* void_arg) {
  91. PlayerArg* arg = static_cast<PlayerArg*>(void_arg);
  92. int counter = INITIAL_FUTEX_VALUE;
  93. while (!stop) {
  94. int rc = bthread::butex_wait(arg->wait_addr, counter, NULL);
  95. ++counter;
  96. ++*arg->wake_addr;
  97. bthread::butex_wake(arg->wake_addr);
  98. ++arg->counter;
  99. arg->wakeup += (rc == 0);
  100. }
  101. return NULL;
  102. }
  103. TEST(PingPongTest, ping_pong) {
  104. signal(SIGINT, quit_handler);
  105. stop = false;
  106. PlayerArg* args[FLAGS_thread_num];
  107. for (int i = 0; i < FLAGS_thread_num; ++i) {
  108. int pipe1[2];
  109. int pipe2[2];
  110. if (!FLAGS_use_futex && !FLAGS_use_butex) {
  111. ASSERT_EQ(0, pipe(pipe1));
  112. ASSERT_EQ(0, pipe(pipe2));
  113. }
  114. PlayerArg* arg1 = new PlayerArg;
  115. if (!FLAGS_use_futex && !FLAGS_use_butex) {
  116. arg1->read_fd = pipe1[0];
  117. arg1->write_fd = pipe2[1];
  118. } else if (FLAGS_use_futex) {
  119. AlignedIntWrapper* w1 = new AlignedIntWrapper;
  120. w1->value = INITIAL_FUTEX_VALUE;
  121. AlignedIntWrapper* w2 = new AlignedIntWrapper;
  122. w2->value = INITIAL_FUTEX_VALUE;
  123. arg1->wait_addr = &w1->value;
  124. arg1->wake_addr = &w2->value;
  125. } else if (FLAGS_use_butex) {
  126. arg1->wait_addr = bthread::butex_create_checked<int>();
  127. *arg1->wait_addr = INITIAL_FUTEX_VALUE;
  128. arg1->wake_addr = bthread::butex_create_checked<int>();
  129. *arg1->wake_addr = INITIAL_FUTEX_VALUE;
  130. } else {
  131. ASSERT_TRUE(false);
  132. }
  133. arg1->counter = 0;
  134. arg1->wakeup = 0;
  135. args[i] = arg1;
  136. PlayerArg* arg2 = new PlayerArg;
  137. if (!FLAGS_use_futex && !FLAGS_use_butex) {
  138. arg2->read_fd = pipe2[0];
  139. arg2->write_fd = pipe1[1];
  140. } else {
  141. arg2->wait_addr = arg1->wake_addr;
  142. arg2->wake_addr = arg1->wait_addr;
  143. }
  144. arg2->counter = 0;
  145. arg2->wakeup = 0;
  146. pthread_t th1, th2;
  147. bthread_t bth1, bth2;
  148. if (!FLAGS_use_futex && !FLAGS_use_butex) {
  149. ASSERT_EQ(0, pthread_create(&th1, NULL, pipe_player, arg1));
  150. ASSERT_EQ(0, pthread_create(&th2, NULL, pipe_player, arg2));
  151. } else if (FLAGS_use_futex) {
  152. ASSERT_EQ(0, pthread_create(&th1, NULL, futex_player, arg1));
  153. ASSERT_EQ(0, pthread_create(&th2, NULL, futex_player, arg2));
  154. } else if (FLAGS_use_butex) {
  155. ASSERT_EQ(0, bthread_start_background(&bth1, NULL, butex_player, arg1));
  156. ASSERT_EQ(0, bthread_start_background(&bth2, NULL, butex_player, arg2));
  157. } else {
  158. ASSERT_TRUE(false);
  159. }
  160. if (!FLAGS_use_futex && !FLAGS_use_butex) {
  161. // send the seed data.
  162. unsigned char seed = 255;
  163. ASSERT_EQ(1L, write(pipe1[1], &seed, 1));
  164. } else if (FLAGS_use_futex) {
  165. ++*arg1->wait_addr;
  166. bthread::futex_wake_private(arg1->wait_addr, 1);
  167. } else if (FLAGS_use_butex) {
  168. ++*arg1->wait_addr;
  169. bthread::butex_wake(arg1->wait_addr);
  170. } else {
  171. ASSERT_TRUE(false);
  172. }
  173. }
  174. long last_counter = 0;
  175. long last_wakeup = 0;
  176. while (!stop) {
  177. butil::Timer tm;
  178. tm.start();
  179. sleep(1);
  180. tm.stop();
  181. long cur_counter = 0;
  182. long cur_wakeup = 0;
  183. for (int i = 0; i < FLAGS_thread_num; ++i) {
  184. cur_counter += args[i]->counter;
  185. cur_wakeup += args[i]->wakeup;
  186. }
  187. if (FLAGS_use_futex || FLAGS_use_butex) {
  188. printf("pingpong-ed %" PRId64 "/s, wakeup=%" PRId64 "/s\n",
  189. (cur_counter - last_counter) * 1000L / tm.m_elapsed(),
  190. (cur_wakeup - last_wakeup) * 1000L / tm.m_elapsed());
  191. } else {
  192. printf("pingpong-ed %" PRId64 "/s\n",
  193. (cur_counter - last_counter) * 1000L / tm.m_elapsed());
  194. }
  195. last_counter = cur_counter;
  196. last_wakeup = cur_wakeup;
  197. if (!FLAGS_loop) {
  198. break;
  199. }
  200. }
  201. stop = true;
  202. // Program quits, Let resource leak.
  203. }
  204. } // namespace