brpc_event_dispatcher_unittest.cpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <pthread.h>
  20. #include <sys/types.h>
  21. #include <sys/socket.h>
  22. #include <gtest/gtest.h>
  23. #include "butil/gperftools_profiler.h"
  24. #include "butil/time.h"
  25. #include "butil/macros.h"
  26. #include "butil/fd_utility.h"
  27. #include "brpc/event_dispatcher.h"
  28. #include "brpc/details/has_epollrdhup.h"
  29. class EventDispatcherTest : public ::testing::Test{
  30. protected:
  31. EventDispatcherTest(){
  32. };
  33. virtual ~EventDispatcherTest(){};
  34. virtual void SetUp() {
  35. };
  36. virtual void TearDown() {
  37. };
  38. };
  39. TEST_F(EventDispatcherTest, has_epollrdhup) {
  40. LOG(INFO) << brpc::has_epollrdhup;
  41. }
  42. TEST_F(EventDispatcherTest, versioned_ref) {
  43. butil::atomic<uint64_t> versioned_ref(2);
  44. versioned_ref.fetch_add(brpc::MakeVRef(0, -1),
  45. butil::memory_order_release);
  46. ASSERT_EQ(brpc::MakeVRef(1, 1), versioned_ref);
  47. }
  48. std::vector<int> err_fd;
  49. pthread_mutex_t err_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
  50. std::vector<int> rel_fd;
  51. pthread_mutex_t rel_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
  52. volatile bool client_stop = false;
  53. struct BAIDU_CACHELINE_ALIGNMENT ClientMeta {
  54. int fd;
  55. size_t times;
  56. size_t bytes;
  57. };
  58. struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public brpc::SocketUser {
  59. char* buf;
  60. size_t buf_cap;
  61. size_t bytes;
  62. size_t times;
  63. SocketExtra() {
  64. buf_cap = 32768;
  65. buf = (char*)malloc(buf_cap);
  66. bytes = 0;
  67. times = 0;
  68. }
  69. virtual void BeforeRecycle(brpc::Socket* m) {
  70. pthread_mutex_lock(&rel_fd_mutex);
  71. rel_fd.push_back(m->fd());
  72. pthread_mutex_unlock(&rel_fd_mutex);
  73. delete this;
  74. }
  75. static int OnEdgeTriggeredEventOnce(brpc::Socket* m) {
  76. SocketExtra* e = static_cast<SocketExtra*>(m->user());
  77. // Read all data.
  78. do {
  79. ssize_t n = read(m->fd(), e->buf, e->buf_cap);
  80. if (n == 0
  81. #ifdef BRPC_SOCKET_HAS_EOF
  82. || m->_eof
  83. #endif
  84. ) {
  85. pthread_mutex_lock(&err_fd_mutex);
  86. err_fd.push_back(m->fd());
  87. pthread_mutex_unlock(&err_fd_mutex);
  88. LOG(WARNING) << "Another end closed fd=" << m->fd();
  89. return -1;
  90. } else if (n > 0) {
  91. e->bytes += n;
  92. ++e->times;
  93. #ifdef BRPC_SOCKET_HAS_EOF
  94. if ((size_t)n < e->buf_cap && brpc::has_epollrdhup) {
  95. break;
  96. }
  97. #endif
  98. } else {
  99. if (errno == EAGAIN) {
  100. break;
  101. } else if (errno == EINTR) {
  102. continue;
  103. } else {
  104. PLOG(WARNING) << "Fail to read fd=" << m->fd();
  105. return -1;
  106. }
  107. }
  108. } while (1);
  109. return 0;
  110. }
  111. static void OnEdgeTriggeredEvents(brpc::Socket* m) {
  112. int progress = brpc::Socket::PROGRESS_INIT;
  113. do {
  114. if (OnEdgeTriggeredEventOnce(m) != 0) {
  115. m->SetFailed();
  116. return;
  117. }
  118. } while (m->MoreReadEvents(&progress));
  119. }
  120. };
  121. void* client_thread(void* arg) {
  122. ClientMeta* m = (ClientMeta*)arg;
  123. size_t offset = 0;
  124. m->times = 0;
  125. m->bytes = 0;
  126. const size_t buf_cap = 32768;
  127. char* buf = (char*)malloc(buf_cap);
  128. for (size_t i = 0; i < buf_cap/8; ++i) {
  129. ((uint64_t*)buf)[i] = i;
  130. }
  131. while (!client_stop) {
  132. ssize_t n;
  133. if (offset == 0) {
  134. n = write(m->fd, buf, buf_cap);
  135. } else {
  136. iovec v[2];
  137. v[0].iov_base = buf + offset;
  138. v[0].iov_len = buf_cap - offset;
  139. v[1].iov_base = buf;
  140. v[1].iov_len = offset;
  141. n = writev(m->fd, v, 2);
  142. }
  143. if (n < 0) {
  144. if (errno != EINTR) {
  145. PLOG(WARNING) << "Fail to write fd=" << m->fd;
  146. break;
  147. }
  148. } else {
  149. ++m->times;
  150. m->bytes += n;
  151. offset += n;
  152. if (offset >= buf_cap) {
  153. offset -= buf_cap;
  154. }
  155. }
  156. }
  157. EXPECT_EQ(0, close(m->fd));
  158. return NULL;
  159. }
  160. inline uint32_t fmix32 ( uint32_t h ) {
  161. h ^= h >> 16;
  162. h *= 0x85ebca6b;
  163. h ^= h >> 13;
  164. h *= 0xc2b2ae35;
  165. h ^= h >> 16;
  166. return h;
  167. }
  168. TEST_F(EventDispatcherTest, dispatch_tasks) {
  169. #ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM
  170. const butil::ResourcePoolInfo old_info =
  171. butil::describe_resources<brpc::Socket>();
  172. #endif
  173. client_stop = false;
  174. const size_t NCLIENT = 16;
  175. int fds[2 * NCLIENT];
  176. pthread_t cth[NCLIENT];
  177. ClientMeta* cm[NCLIENT];
  178. SocketExtra* sm[NCLIENT];
  179. for (size_t i = 0; i < NCLIENT; ++i) {
  180. ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i));
  181. sm[i] = new SocketExtra;
  182. const int fd = fds[i * 2];
  183. butil::make_non_blocking(fd);
  184. brpc::SocketId socket_id;
  185. brpc::SocketOptions options;
  186. options.fd = fd;
  187. options.user = sm[i];
  188. options.on_edge_triggered_events = SocketExtra::OnEdgeTriggeredEvents;
  189. ASSERT_EQ(0, brpc::Socket::Create(options, &socket_id));
  190. cm[i] = new ClientMeta;
  191. cm[i]->fd = fds[i * 2 + 1];
  192. cm[i]->times = 0;
  193. cm[i]->bytes = 0;
  194. ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i]));
  195. }
  196. LOG(INFO) << "Begin to profile... (5 seconds)";
  197. ProfilerStart("event_dispatcher.prof");
  198. butil::Timer tm;
  199. tm.start();
  200. sleep(5);
  201. tm.stop();
  202. ProfilerStop();
  203. LOG(INFO) << "End profiling";
  204. size_t client_bytes = 0;
  205. size_t server_bytes = 0;
  206. for (size_t i = 0; i < NCLIENT; ++i) {
  207. client_bytes += cm[i]->bytes;
  208. server_bytes += sm[i]->bytes;
  209. }
  210. LOG(INFO) << "client_tp=" << client_bytes / (double)tm.u_elapsed()
  211. << "MB/s server_tp=" << server_bytes / (double)tm.u_elapsed()
  212. << "MB/s";
  213. client_stop = true;
  214. for (size_t i = 0; i < NCLIENT; ++i) {
  215. pthread_join(cth[i], NULL);
  216. }
  217. sleep(1);
  218. std::vector<int> copy1, copy2;
  219. pthread_mutex_lock(&err_fd_mutex);
  220. copy1.swap(err_fd);
  221. pthread_mutex_unlock(&err_fd_mutex);
  222. pthread_mutex_lock(&rel_fd_mutex);
  223. copy2.swap(rel_fd);
  224. pthread_mutex_unlock(&rel_fd_mutex);
  225. std::sort(copy1.begin(), copy1.end());
  226. std::sort(copy2.begin(), copy2.end());
  227. ASSERT_EQ(copy1.size(), copy2.size());
  228. for (size_t i = 0; i < copy1.size(); ++i) {
  229. ASSERT_EQ(copy1[i], copy2[i]) << i;
  230. }
  231. ASSERT_EQ(NCLIENT, copy1.size());
  232. const butil::ResourcePoolInfo info
  233. = butil::describe_resources<brpc::Socket>();
  234. LOG(INFO) << info;
  235. #ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM
  236. ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num);
  237. #endif
  238. }