client.cc 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. #include <string>
  2. #include <stdio.h>
  3. #include <unistd.h>
  4. #include <signal.h>
  5. #include <atomic>
  6. #include <chrono>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include "echo.srpc.h"
  10. #include "msg.srpc.h"
  11. using namespace srpc;
  12. using namespace example;
  13. #define PARALLEL_NUM 20
  14. #define GET_CURRENT_MS std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()
  15. #define GET_CURRENT_NS std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()
  16. std::atomic<int> query_count(0);
  17. std::atomic<int> success_count(0);
  18. std::atomic<int> error_count(0);
  19. std::atomic<int64_t> latency_sum(0);
  20. int64_t start, end;
  21. volatile bool stop_flag = false;
  22. EchoRequest g_echo_req;
  23. ExampleThrift::MessageRequest g_msg_req;
  24. template<class CLIENT>
  25. static void do_echo(CLIENT *client)
  26. {
  27. EchoRequest req;
  28. req.set_message("Hello, sogou rpc!");
  29. req.set_name("Jeff Dean");
  30. int64_t ns_st = GET_CURRENT_NS;
  31. client->Echo(&req, [client, ns_st](EchoResponse *response, RPCContext *ctx) {
  32. if (!stop_flag)
  33. do_echo<CLIENT>(client);
  34. ++query_count;
  35. if (ctx->success())
  36. {
  37. // printf("%s\n", response->message().c_str());
  38. // printf("%s\n", ctx->get_remote_ip().c_str());
  39. ++success_count;
  40. latency_sum += GET_CURRENT_NS - ns_st;
  41. }
  42. else
  43. {
  44. printf("status[%d] error[%d] errmsg:%s\n", ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  45. ++error_count;
  46. }
  47. //printf("echo done. seq_id=%d\n", ctx->get_task_seq());
  48. });
  49. }
  50. template<class CLIENT>
  51. static void do_msg(CLIENT *client)
  52. {
  53. ExampleThrift::MessageRequest req;
  54. req.message = "Hello, sogou rpc!";
  55. req.name = "Jeff Dean";
  56. int64_t ns_st = GET_CURRENT_NS;
  57. client->Message(&req, [client, ns_st](ExampleThrift::MessageResponse *response, RPCContext *ctx) {
  58. if (!stop_flag)
  59. do_msg<CLIENT>(client);
  60. ++query_count;
  61. if (ctx->success())
  62. {
  63. //printf("%s\n", response->result.message.c_str());
  64. //printf("%s\n", ctx->get_remote_ip().c_str());
  65. ++success_count;
  66. latency_sum += GET_CURRENT_NS - ns_st;
  67. }
  68. else
  69. {
  70. printf("status[%d] error[%d] errmsg:%s \n", ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  71. ++error_count;
  72. }
  73. //printf("echo done. seq_id=%d\n", ctx->get_task_seq());
  74. });
  75. }
  76. void sig_handler(int signo)
  77. {
  78. stop_flag = true;
  79. end = GET_CURRENT_MS;
  80. int tot = query_count;
  81. int s = success_count;
  82. int e = error_count;
  83. int64_t l = latency_sum;
  84. fprintf(stderr, "\nquery\t%d\ttimes, %d success, %d error.\n", tot, s, e);
  85. fprintf(stderr, "total\t%.3lf\tseconds\n", (end - start) / 1000.0);
  86. fprintf(stderr, "qps=%.0lf\n", tot * 1000.0 / (end - start));
  87. fprintf(stderr, "latency=%.0lfus\n", s > 0 ? l * 1.0 / s / 1000 : 0);
  88. }
  89. int main(int argc, char* argv[])
  90. {
  91. GOOGLE_PROTOBUF_VERIFY_VERSION;
  92. if (argc < 4 || argc > 5)
  93. {
  94. fprintf(stderr, "Usage: %s <srpc|brpc|thrift> <echo|msg> <IP> <PORT> or %s <srpc|brpc|thrift> <echo|msg> <URL>\n",
  95. argv[0], argv[0]);
  96. return 0;
  97. }
  98. signal(SIGINT, sig_handler);
  99. signal(SIGTERM, sig_handler);
  100. WFGlobalSettings setting = GLOBAL_SETTINGS_DEFAULT;
  101. setting.endpoint_params.max_connections = 1000;
  102. setting.poller_threads = 16;
  103. setting.handler_threads = 16;
  104. WORKFLOW_library_init(&setting);
  105. RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
  106. client_params.task_params.keep_alive_timeout = -1;
  107. // client_params.task_params.data_type = RPCDataJson;
  108. // client_params.task_params.compress_type = RPCCompressGzip;
  109. // UpstreamManager::upstream_create_weighted_random("echo_server", true);
  110. // UpstreamManager::upstream_add_server("echo_server", "sostest12.web.gd.ted:1412");
  111. // UpstreamManager::upstream_add_server("echo_server", "sostest11.web.gd.ted:1412");
  112. // UpstreamManager::upstream_add_server("echo_server", "sostest11.web.gd.ted:1411");
  113. std::string server_type = argv[1];
  114. std::string service_name = argv[2];
  115. if (argc == 4)
  116. {
  117. // client_params.url = "http://echo_server";//for upstream
  118. client_params.url = argv[3];
  119. } else {
  120. client_params.host = argv[3];
  121. client_params.port = atoi(argv[4]);
  122. }
  123. start = GET_CURRENT_MS;
  124. if (server_type == "srpc")
  125. {
  126. if (service_name == "echo")
  127. {
  128. ExamplePB::SRPCClient client(&client_params);
  129. for (int i = 0; i < PARALLEL_NUM; i++)
  130. do_echo(&client);
  131. }
  132. else if (service_name == "msg")
  133. {
  134. ExampleThrift::SRPCClient client(&client_params);
  135. for (int i = 0; i < PARALLEL_NUM; i++)
  136. do_msg(&client);
  137. }
  138. else
  139. abort();
  140. }
  141. else if (server_type == "brpc")
  142. {
  143. ExamplePB::BRPCClient client(&client_params);
  144. for (int i = 0; i < PARALLEL_NUM; i++)
  145. {
  146. if (service_name == "echo")
  147. do_echo(&client);
  148. else if (service_name == "msg")
  149. abort();
  150. else
  151. abort();
  152. }
  153. }
  154. else if (server_type == "thrift")
  155. {
  156. ExampleThrift::ThriftClient client(&client_params);
  157. for (int i = 0; i < PARALLEL_NUM; i++)
  158. {
  159. if (service_name == "echo")
  160. abort();
  161. else if (service_name == "msg")
  162. do_msg(&client);
  163. else
  164. abort();
  165. }
  166. }
  167. else if (server_type == "srpc_http")
  168. {
  169. if (service_name == "echo")
  170. {
  171. ExamplePB::SRPCHttpClient client(&client_params);
  172. for (int i = 0; i < PARALLEL_NUM; i++)
  173. do_echo(&client);
  174. }
  175. else if (service_name == "msg")
  176. {
  177. ExampleThrift::SRPCHttpClient client(&client_params);
  178. for (int i = 0; i < PARALLEL_NUM; i++)
  179. do_msg(&client);
  180. }
  181. else
  182. abort();
  183. }
  184. else if (server_type == "thrift_http")
  185. {
  186. ExampleThrift::ThriftHttpClient client(&client_params);
  187. for (int i = 0; i < PARALLEL_NUM; i++)
  188. {
  189. if (service_name == "echo")
  190. abort();
  191. else if (service_name == "msg")
  192. do_msg(&client);
  193. else
  194. abort();
  195. }
  196. }
  197. else
  198. abort();
  199. pause();
  200. sleep(2);
  201. google::protobuf::ShutdownProtobufLibrary();
  202. return 0;
  203. }