// File: brpc_input_messenger_unittest.cpp
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <sys/types.h>
  20. #include <sys/socket.h>
  21. #include <netdb.h> //
  22. #include <gtest/gtest.h>
  23. #include "butil/gperftools_profiler.h"
  24. #include "butil/time.h"
  25. #include "butil/macros.h"
  26. #include "butil/fd_utility.h"
  27. #include "butil/fd_guard.h"
  28. #include "butil/unix_socket.h"
  29. #include "brpc/acceptor.h"
  30. #include "brpc/policy/hulu_pbrpc_protocol.h"
  31. void EmptyProcessHuluRequest(brpc::InputMessageBase* msg_base) {
  32. brpc::DestroyingPtr<brpc::InputMessageBase> a(msg_base);
  33. }
  34. int main(int argc, char* argv[]) {
  35. testing::InitGoogleTest(&argc, argv);
  36. brpc::Protocol dummy_protocol =
  37. { brpc::policy::ParseHuluMessage,
  38. brpc::SerializeRequestDefault,
  39. brpc::policy::PackHuluRequest,
  40. EmptyProcessHuluRequest, EmptyProcessHuluRequest,
  41. NULL, NULL, NULL,
  42. brpc::CONNECTION_TYPE_ALL, "dummy_hulu" };
  43. EXPECT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol));
  44. return RUN_ALL_TESTS();
  45. }
  46. class MessengerTest : public ::testing::Test{
  47. protected:
  48. MessengerTest(){
  49. };
  50. virtual ~MessengerTest(){};
  51. virtual void SetUp() {
  52. };
  53. virtual void TearDown() {
  54. };
  55. };
  56. #define USE_UNIX_DOMAIN_SOCKET 1
  57. const size_t NEPOLL = 1;
  58. const size_t NCLIENT = 6;
  59. const size_t NMESSAGE = 1024;
  60. const size_t MESSAGE_SIZE = 32;
  61. inline uint32_t fmix32 ( uint32_t h ) {
  62. h ^= h >> 16;
  63. h *= 0x85ebca6b;
  64. h ^= h >> 13;
  65. h *= 0xc2b2ae35;
  66. h ^= h >> 16;
  67. return h;
  68. }
  69. volatile bool client_stop = false;
  70. struct BAIDU_CACHELINE_ALIGNMENT ClientMeta {
  71. size_t times;
  72. size_t bytes;
  73. };
  74. butil::atomic<size_t> client_index(0);
  75. void* client_thread(void* arg) {
  76. ClientMeta* m = (ClientMeta*)arg;
  77. size_t offset = 0;
  78. m->times = 0;
  79. m->bytes = 0;
  80. const size_t buf_cap = NMESSAGE * MESSAGE_SIZE;
  81. char* buf = (char*)malloc(buf_cap);
  82. for (size_t i = 0; i < NMESSAGE; ++i) {
  83. memcpy(buf + i * MESSAGE_SIZE, "HULU", 4);
  84. // HULU use host byte order directly...
  85. *(uint32_t*)(buf + i * MESSAGE_SIZE + 4) = MESSAGE_SIZE - 12;
  86. *(uint32_t*)(buf + i * MESSAGE_SIZE + 8) = 4;
  87. }
  88. #ifdef USE_UNIX_DOMAIN_SOCKET
  89. const size_t id = client_index.fetch_add(1);
  90. char socket_name[64];
  91. snprintf(socket_name, sizeof(socket_name), "input_messenger.socket%lu",
  92. (id % NEPOLL));
  93. butil::fd_guard fd(butil::unix_socket_connect(socket_name));
  94. if (fd < 0) {
  95. PLOG(FATAL) << "Fail to connect to " << socket_name;
  96. return NULL;
  97. }
  98. #else
  99. butil::EndPoint point(butil::IP_ANY, 7878);
  100. butil::fd_guard fd(butil::tcp_connect(point, NULL));
  101. if (fd < 0) {
  102. PLOG(FATAL) << "Fail to connect to " << point;
  103. return NULL;
  104. }
  105. #endif
  106. while (!client_stop) {
  107. ssize_t n;
  108. if (offset == 0) {
  109. n = write(fd, buf, buf_cap);
  110. } else {
  111. iovec v[2];
  112. v[0].iov_base = buf + offset;
  113. v[0].iov_len = buf_cap - offset;
  114. v[1].iov_base = buf;
  115. v[1].iov_len = offset;
  116. n = writev(fd, v, 2);
  117. }
  118. if (n < 0) {
  119. if (errno != EINTR) {
  120. PLOG(FATAL) << "Fail to write fd=" << fd;
  121. return NULL;
  122. }
  123. } else {
  124. ++m->times;
  125. m->bytes += n;
  126. offset += n;
  127. if (offset >= buf_cap) {
  128. offset -= buf_cap;
  129. }
  130. }
  131. }
  132. return NULL;
  133. }
  134. TEST_F(MessengerTest, dispatch_tasks) {
  135. client_stop = false;
  136. brpc::Acceptor messenger[NEPOLL];
  137. pthread_t cth[NCLIENT];
  138. ClientMeta* cm[NCLIENT];
  139. const brpc::InputMessageHandler pairs[] = {
  140. { brpc::policy::ParseHuluMessage,
  141. EmptyProcessHuluRequest, NULL, NULL, "dummy_hulu" }
  142. };
  143. for (size_t i = 0; i < NEPOLL; ++i) {
  144. #ifdef USE_UNIX_DOMAIN_SOCKET
  145. char buf[64];
  146. snprintf(buf, sizeof(buf), "input_messenger.socket%lu", i);
  147. int listening_fd = butil::unix_socket_listen(buf);
  148. #else
  149. int listening_fd = tcp_listen(butil::EndPoint(butil::IP_ANY, 7878));
  150. #endif
  151. ASSERT_TRUE(listening_fd > 0);
  152. butil::make_non_blocking(listening_fd);
  153. ASSERT_EQ(0, messenger[i].AddHandler(pairs[0]));
  154. ASSERT_EQ(0, messenger[i].StartAccept(listening_fd, -1, NULL));
  155. }
  156. for (size_t i = 0; i < NCLIENT; ++i) {
  157. cm[i] = new ClientMeta;
  158. cm[i]->times = 0;
  159. cm[i]->bytes = 0;
  160. ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i]));
  161. }
  162. sleep(1);
  163. LOG(INFO) << "Begin to profile... (5 seconds)";
  164. ProfilerStart("input_messenger.prof");
  165. size_t start_client_bytes = 0;
  166. for (size_t i = 0; i < NCLIENT; ++i) {
  167. start_client_bytes += cm[i]->bytes;
  168. }
  169. butil::Timer tm;
  170. tm.start();
  171. sleep(5);
  172. tm.stop();
  173. ProfilerStop();
  174. LOG(INFO) << "End profiling";
  175. client_stop = true;
  176. size_t client_bytes = 0;
  177. for (size_t i = 0; i < NCLIENT; ++i) {
  178. client_bytes += cm[i]->bytes;
  179. }
  180. LOG(INFO) << "client_tp=" << (client_bytes - start_client_bytes) / (double)tm.u_elapsed()
  181. << "MB/s client_msg="
  182. << (client_bytes - start_client_bytes) * 1000000L / (MESSAGE_SIZE * tm.u_elapsed())
  183. << "/s";
  184. for (size_t i = 0; i < NCLIENT; ++i) {
  185. pthread_join(cth[i], NULL);
  186. printf("joined client %lu\n", i);
  187. }
  188. for (size_t i = 0; i < NEPOLL; ++i) {
  189. messenger[i].StopAccept(0);
  190. }
  191. sleep(1);
  192. LOG(WARNING) << "begin to exit!!!!";
  193. }