client.cc 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <thread>
  4. #include <string>
  5. #include <atomic>
  6. #include <chrono>
  7. #include "benchmark_pb.srpc.h"
  8. #include "benchmark_thrift.srpc.h"
  9. using namespace srpc;
  // Length of the measurement window in seconds (main() sleeps this long).
  #define TEST_SECOND 20

  // Current std::chrono::steady_clock reading as a nanosecond count.
  // The expansion is parenthesized so it stays a single operand when it is
  // used inside a larger expression such as `GET_CURRENT_NS - t0`.
  #define GET_CURRENT_NS \
      (std::chrono::duration_cast<std::chrono::nanoseconds>( \
          std::chrono::steady_clock::now().time_since_epoch()).count())

  // Statistics updated by the request helpers / completion callbacks and
  // read back by main() once the run is over.
  std::atomic<int> query_count(0);      // requests issued
  std::atomic<int> success_count(0);    // callbacks for which ctx->success() was true
  std::atomic<int> error_count(0);      // callbacks for which ctx->success() was false
  std::atomic<int64_t> latency_sum(0);  // summed latency of successful requests, in ns

  // Raised by main() when the measurement window ends; completion callbacks
  // check it before issuing the next request. This is an atomic rather than a
  // `volatile bool` because volatile provides no inter-thread synchronization.
  // NOTE(review): callbacks presumably run on the library's handler threads.
  std::atomic<bool> stop_flag(false);

  // Number of concurrent request chains, filled in by main() from argv[5].
  int PARALLEL_NUMBER;

  // Payload copied into every request, sized by main() from argv[6].
  std::string request_msg;
  19. template<class CLIENT>
  20. static void do_echo_pb(CLIENT *client)
  21. {
  22. FixLengthPBMsg req;
  23. req.set_msg(request_msg);
  24. int64_t ns_st = GET_CURRENT_NS;
  25. ++query_count;
  26. client->echo_pb(&req, [client, ns_st](EmptyPBMsg *response, RPCContext *ctx) {
  27. if (ctx->success())
  28. {
  29. //printf("%s\n", ctx->get_remote_ip().c_str());
  30. latency_sum += GET_CURRENT_NS - ns_st;
  31. ++success_count;
  32. }
  33. else
  34. {
  35. printf("status[%d] error[%d] errmsg:%s\n",
  36. ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  37. ++error_count;
  38. }
  39. if (!stop_flag)
  40. do_echo_pb<CLIENT>(client);
  41. //printf("echo done. seq_id=%d\n", ctx->get_task_seq());
  42. });
  43. }
  44. template<class CLIENT>
  45. static void do_echo_thrift(CLIENT *client)
  46. {
  47. BenchmarkThrift::echo_thriftRequest req;
  48. req.msg = request_msg;
  49. int64_t ns_st = GET_CURRENT_NS;
  50. ++query_count;
  51. client->echo_thrift(&req, [client, ns_st](BenchmarkThrift::echo_thriftResponse *response, RPCContext *ctx) {
  52. if (ctx->success())
  53. {
  54. //printf("%s\n", ctx->get_remote_ip().c_str());
  55. latency_sum += GET_CURRENT_NS - ns_st;
  56. ++success_count;
  57. }
  58. else
  59. {
  60. printf("status[%d] error[%d] errmsg:%s \n",
  61. ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  62. ++error_count;
  63. }
  64. if (!stop_flag)
  65. do_echo_thrift<CLIENT>(client);
  66. //printf("echo done. seq_id=%d\n", ctx->get_task_seq());
  67. });
  68. }
  69. int main(int argc, char* argv[])
  70. {
  71. GOOGLE_PROTOBUF_VERIFY_VERSION;
  72. if (argc != 7)
  73. {
  74. fprintf(stderr, "Usage: %s <IP> <PORT> <srpc|brpc|thrift> <pb|thrift> <PARALLEL_NUMBER> <REQUEST_BYTES>\n", argv[0]);
  75. abort();
  76. }
  77. WFGlobalSettings setting = GLOBAL_SETTINGS_DEFAULT;
  78. setting.endpoint_params.max_connections = 2048;
  79. setting.poller_threads = 16;
  80. setting.handler_threads = 16;
  81. WORKFLOW_library_init(&setting);
  82. RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
  83. client_params.task_params.keep_alive_timeout = -1;
  84. client_params.host = argv[1];
  85. client_params.port = atoi(argv[2]);
  86. std::string server_type = argv[3];
  87. std::string idl_type = argv[4];
  88. PARALLEL_NUMBER = atoi(argv[5]);
  89. int REQUEST_BYTES = atoi(argv[6]);
  90. request_msg.resize(REQUEST_BYTES, 'r');
  91. //for (int i = 0; i < REQUEST_BYTES; i++)
  92. // request_msg[i] = (unsigned char)(rand() % 256);
  93. int64_t start = GET_CURRENT_MS();
  94. if (server_type == "srpc")
  95. {
  96. if (idl_type == "pb")
  97. {
  98. auto *client = new BenchmarkPB::SRPCClient(&client_params);
  99. for (int i = 0; i < PARALLEL_NUMBER; i++)
  100. do_echo_pb(client);
  101. }
  102. else if (idl_type == "thrift")
  103. {
  104. auto *client = new BenchmarkThrift::SRPCClient(&client_params);
  105. for (int i = 0; i < PARALLEL_NUMBER; i++)
  106. do_echo_thrift(client);
  107. }
  108. else
  109. abort();
  110. }
  111. else if (server_type == "brpc")
  112. {
  113. auto *client = new BenchmarkPB::BRPCClient(&client_params);
  114. for (int i = 0; i < PARALLEL_NUMBER; i++)
  115. {
  116. if (idl_type == "pb")
  117. do_echo_pb(client);
  118. else if (idl_type == "thrift")
  119. abort();
  120. else
  121. abort();
  122. }
  123. }
  124. else if (server_type == "thrift")
  125. {
  126. auto *client = new BenchmarkThrift::ThriftClient(&client_params);
  127. for (int i = 0; i < PARALLEL_NUMBER; i++)
  128. {
  129. if (idl_type == "pb")
  130. abort();
  131. else if (idl_type == "thrift")
  132. do_echo_thrift(client);
  133. else
  134. abort();
  135. }
  136. }
  137. else if (server_type == "srpc_http")
  138. {
  139. if (idl_type == "pb")
  140. {
  141. auto *client = new BenchmarkPB::SRPCHttpClient(&client_params);
  142. for (int i = 0; i < PARALLEL_NUMBER; i++)
  143. do_echo_pb(client);
  144. }
  145. else if (idl_type == "thrift")
  146. {
  147. auto *client = new BenchmarkThrift::SRPCHttpClient(&client_params);
  148. for (int i = 0; i < PARALLEL_NUMBER; i++)
  149. do_echo_thrift(client);
  150. }
  151. else
  152. abort();
  153. }
  154. else if (server_type == "thrift_http")
  155. {
  156. auto *client = new BenchmarkThrift::ThriftHttpClient(&client_params);
  157. for (int i = 0; i < PARALLEL_NUMBER; i++)
  158. {
  159. if (idl_type == "pb")
  160. abort();
  161. else if (idl_type == "thrift")
  162. do_echo_thrift(client);
  163. else
  164. abort();
  165. }
  166. }
  167. else
  168. abort();
  169. std::this_thread::sleep_for(std::chrono::seconds(TEST_SECOND));
  170. stop_flag = true;
  171. int64_t end = GET_CURRENT_MS();
  172. int tot = query_count;
  173. int s = success_count;
  174. int e = error_count;
  175. int64_t l = latency_sum;
  176. fprintf(stderr, "\nquery\t%d\ttimes, %d success, %d error.\n", tot, s, e);
  177. fprintf(stderr, "total\t%.3lf\tseconds\n", (end - start) / 1000.0);
  178. fprintf(stderr, "qps=%.0lf\n", tot * 1000.0 / (end - start));
  179. fprintf(stderr, "latency=%.0lfus\n", s > 0 ? l * 1.0 / s / 1000 : 0);
  180. std::this_thread::sleep_for(std::chrono::seconds(1));
  181. google::protobuf::ShutdownProtobufLibrary();
  182. return 0;
  183. }