// client_cdf.cc
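//
// Benchmark client: drives echo RPCs against a benchmark server at a fixed
// aggregate QPS and prints the tail of the request-latency CDF plus summary
// statistics. A typical invocation might look like the following; the binary
// name is assumed from the file name and the host, port, and load figures are
// only illustrative:
//
//     ./client_cdf 127.0.0.1 8811 srpc pb 32 1024 50000
//
// i.e. 32 worker threads sending 1024-byte requests at a combined 50000 QPS
// for TEST_SECOND (20) seconds.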

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <string>
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include <mutex>
#include <algorithm>
#include "benchmark_pb.srpc.h"
#include "benchmark_thrift.srpc.h"

using namespace srpc;

#define TEST_SECOND 20

// Monotonic timestamps: nanoseconds for per-request latency, milliseconds for
// the overall run.
#define GET_CURRENT_NS std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()
#define GET_CURRENT_MS() std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()

std::atomic<int> query_count(0);
std::atomic<int> slow_count(0);
std::atomic<int> success_count(0);
std::atomic<int> error_count(0);
//std::atomic<int64_t> latency_sum(0);
std::vector<std::vector<int64_t>> latency_lists;
volatile bool stop_flag = false;
int PARALLEL_NUMBER;
std::string request_msg;
int QPS;
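
// Each worker thread runs one of the do_echo_* loops below. A worker paces
// itself so that all PARALLEL_NUMBER workers together issue roughly QPS
// requests per second, sends every 100th request to the server's slow_*
// method (counted in slow_count and excluded from the latency statistics),
// and appends each successful echo's round-trip time in nanoseconds to its
// own slot of latency_lists. The per-worker mutex only serializes the RPC
// callbacks, which may run concurrently on different handler threads.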
template<class CLIENT>
static void do_echo_pb(CLIENT *client, int idx)
{
    std::mutex mutex;
    auto& latency_list = latency_lists[idx];
    FixLengthPBMsg req;
    req.set_msg(request_msg);
    int usleep_gap = 1000000 / QPS * PARALLEL_NUMBER;

    while (!stop_flag)
    {
        int64_t ns_st = GET_CURRENT_NS;

        if (++query_count % 100 > 0)
        {
            client->echo_pb(&req, [ns_st, &latency_list, &mutex](EmptyPBMsg *response, RPCContext *ctx) {
                if (ctx->success())
                {
                    //printf("%s\n", ctx->get_remote_ip().c_str());
                    ++success_count;
                    //latency_sum += GET_CURRENT_NS - ns_st;
                    mutex.lock();
                    latency_list.emplace_back(GET_CURRENT_NS - ns_st);
                    mutex.unlock();
                }
                else
                {
                    printf("status[%d] error[%d] errmsg:%s\n",
                           ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
                    ++error_count;
                }
                //printf("echo done. seq_id=%d\n", ctx->get_task_seq());
            });
        }
        else
        {
            client->slow_pb(&req, [](EmptyPBMsg *response, RPCContext *ctx) {
                slow_count++;
                if (!ctx->success())
                    printf("status[%d] error[%d] errmsg:%s\n",
                           ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
            });
        }

        std::this_thread::sleep_for(std::chrono::microseconds(usleep_gap));
    }
}
template<class CLIENT>
static void do_echo_thrift(CLIENT *client, int idx)
{
    std::mutex mutex;
    auto& latency_list = latency_lists[idx];
    BenchmarkThrift::echo_thriftRequest req;
    req.msg = request_msg;
    BenchmarkThrift::slow_thriftRequest slow_req;
    slow_req.msg = request_msg;
    int usleep_gap = 1000000 / QPS * PARALLEL_NUMBER;

    while (!stop_flag)
    {
        int64_t ns_st = GET_CURRENT_NS;

        if (++query_count % 100 > 0)
        {
            client->echo_thrift(&req, [ns_st, &latency_list, &mutex](BenchmarkThrift::echo_thriftResponse *response, RPCContext *ctx) {
                if (ctx->success())
                {
                    //printf("%s\n", ctx->get_remote_ip().c_str());
                    ++success_count;
                    //latency_sum += GET_CURRENT_NS - ns_st;
                    mutex.lock();
                    latency_list.emplace_back(GET_CURRENT_NS - ns_st);
                    mutex.unlock();
                }
                else
                {
                    printf("status[%d] error[%d] errmsg:%s\n",
                           ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
                    ++error_count;
                }
                //printf("echo done. seq_id=%d\n", ctx->get_task_seq());
            });
        }
        else
        {
            client->slow_thrift(&slow_req, [](BenchmarkThrift::slow_thriftResponse *response, RPCContext *ctx) {
                slow_count++;
                if (!ctx->success())
                    printf("status[%d] error[%d] errmsg:%s\n",
                           ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
            });
        }

        std::this_thread::sleep_for(std::chrono::microseconds(usleep_gap));
    }
}
int main(int argc, char* argv[])
{
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    if (argc != 8)
    {
        fprintf(stderr, "Usage: %s <IP> <PORT> <srpc|brpc|thrift|srpc_http|thrift_http> "
                        "<pb|thrift> <PARALLEL_NUMBER> <REQUEST_BYTES> <QPS>\n", argv[0]);
        abort();
    }

    WFGlobalSettings setting = GLOBAL_SETTINGS_DEFAULT;
    setting.endpoint_params.max_connections = 2048;
    setting.poller_threads = 16;
    setting.handler_threads = 16;
    WORKFLOW_library_init(&setting);

    RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
    client_params.task_params.keep_alive_timeout = -1; // keep connections alive for the whole run
    client_params.host = argv[1];
    client_params.port = atoi(argv[2]);

    std::string server_type = argv[3];
    std::string idl_type = argv[4];
    PARALLEL_NUMBER = atoi(argv[5]);
    int REQUEST_BYTES = atoi(argv[6]);
    QPS = atoi(argv[7]);

    request_msg.resize(REQUEST_BYTES, 'r');
    //for (int i = 0; i < REQUEST_BYTES; i++)
    //    request_msg[i] = (unsigned char)(rand() % 256);

    latency_lists.resize(PARALLEL_NUMBER);
    std::vector<std::thread *> th;
    int64_t start = GET_CURRENT_MS();
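
    // Pick the generated client that matches the requested transport and IDL:
    // srpc/brpc/thrift speak their binary protocols, srpc_http/thrift_http go
    // over HTTP. Combinations without a generated client simply abort().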
    if (server_type == "srpc")
    {
        if (idl_type == "pb")
        {
            auto *client = new BenchmarkPB::SRPCClient(&client_params);
            for (int i = 0; i < PARALLEL_NUMBER; i++)
                th.push_back(new std::thread(do_echo_pb<BenchmarkPB::SRPCClient>, client, i));
        }
        else if (idl_type == "thrift")
        {
            auto *client = new BenchmarkThrift::SRPCClient(&client_params);
            for (int i = 0; i < PARALLEL_NUMBER; i++)
                th.push_back(new std::thread(do_echo_thrift<BenchmarkThrift::SRPCClient>, client, i));
        }
        else
            abort();
    }
    else if (server_type == "brpc")
    {
        auto *client = new BenchmarkPB::BRPCClient(&client_params);
        for (int i = 0; i < PARALLEL_NUMBER; i++)
        {
            if (idl_type == "pb")
                th.push_back(new std::thread(do_echo_pb<BenchmarkPB::BRPCClient>, client, i));
            else if (idl_type == "thrift")
                abort();
            else
                abort();
        }
    }
    else if (server_type == "thrift")
    {
        auto *client = new BenchmarkThrift::ThriftClient(&client_params);
        for (int i = 0; i < PARALLEL_NUMBER; i++)
        {
            if (idl_type == "pb")
                abort();
            else if (idl_type == "thrift")
                th.push_back(new std::thread(do_echo_thrift<BenchmarkThrift::ThriftClient>, client, i));
            else
                abort();
        }
    }
    else if (server_type == "srpc_http")
    {
        if (idl_type == "pb")
        {
            auto *client = new BenchmarkPB::SRPCHttpClient(&client_params);
            for (int i = 0; i < PARALLEL_NUMBER; i++)
                th.push_back(new std::thread(do_echo_pb<BenchmarkPB::SRPCHttpClient>, client, i));
        }
        else if (idl_type == "thrift")
        {
            auto *client = new BenchmarkThrift::SRPCHttpClient(&client_params);
            for (int i = 0; i < PARALLEL_NUMBER; i++)
                th.push_back(new std::thread(do_echo_thrift<BenchmarkThrift::SRPCHttpClient>, client, i));
        }
        else
            abort();
    }
    else if (server_type == "thrift_http")
    {
        auto *client = new BenchmarkThrift::ThriftHttpClient(&client_params);
        for (int i = 0; i < PARALLEL_NUMBER; i++)
        {
            if (idl_type == "pb")
                abort();
            else if (idl_type == "thrift")
                th.push_back(new std::thread(do_echo_thrift<BenchmarkThrift::ThriftHttpClient>, client, i));
            else
                abort();
        }
    }
    else
        abort();
    std::this_thread::sleep_for(std::chrono::seconds(TEST_SECOND));
    stop_flag = true;

    for (auto *t : th)
    {
        t->join();
        delete t;
    }
    int64_t end = GET_CURRENT_MS();
    int tot = query_count - slow_count;
    int s = success_count;
    int e = error_count;
    int64_t l = 0; //latency_sum;
    std::vector<int64_t> all_lc;

    for (const auto& list : latency_lists)
    {
        for (auto v : list)
        {
            //fprintf(stderr, "%lld\n", (long long int)v);
            l += v;
        }

        all_lc.insert(all_lc.end(), list.begin(), list.end());
    }
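
    // Print the tail of the latency CDF: for each quantile r from 0.950 to
    // 0.999, idx is computed as ceil(r * N) (the epsilon guards against
    // floating-point error), and the idx-th smallest latency is reported in
    // microseconds (samples are stored in nanoseconds).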
    std::sort(all_lc.begin(), all_lc.end());

    for (double r = 0.950; r <= 0.999; r += 0.001)
    {
        double d = r * all_lc.size();
        int idx = (int)(d + 1.0e-8);

        if (fabs(d - int(d)) > 1.0e-8)
            idx++;

        printf("%.3lf %lld\n", r, (long long int)all_lc[idx - 1] / 1000);
    }
    //printf("%.3lf %lld\n", 1.0, (long long int)all_lc[all_lc.size() - 1] / 1000);

    fprintf(stderr, "\nquery\t%d\ttimes, %d success, %d error.\n", tot, s, e);
    fprintf(stderr, "total\t%.3lf\tseconds\n", (end - start) / 1000.0);
    fprintf(stderr, "qps=%.0lf\n", tot * 1000.0 / (end - start));
    fprintf(stderr, "latency=%.0lfus\n", s > 0 ? l * 1.0 / s / 1000 : 0);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    google::protobuf::ShutdownProtobufLibrary();
    return 0;
}