server.cc 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. #include <stdio.h>
  2. #include <signal.h>
  3. #include "benchmark_pb.srpc.h"
  4. #include "benchmark_thrift.srpc.h"
  5. #include "workflow/WFFacilities.h"
  6. using namespace srpc;
  7. std::atomic<int> query_count(0); // per_second
  8. std::atomic<long long> last_timestamp(0L);
  9. //volatile bool stop_flag = false;
  10. int max_qps = 0;
  11. long long total_count = 0;
  12. WFFacilities::WaitGroup wait_group(1);
  13. inline void collect_qps()
  14. {
  15. int64_t ms_timestamp = GET_CURRENT_MS;
  16. ++query_count;
  17. if (ms_timestamp / 1000 > last_timestamp)
  18. {
  19. last_timestamp = ms_timestamp / 1000;
  20. int count = query_count;
  21. query_count = 0;
  22. total_count += count;
  23. if (count > max_qps)
  24. max_qps = count;
  25. long long ts = ms_timestamp;
  26. fprintf(stdout, "TIMESTAMP(ms) = %llu QPS = %d\n", ts, count);
  27. }
  28. }
  29. class BenchmarkPBServiceImpl : public BenchmarkPB::Service
  30. {
  31. public:
  32. void echo_pb(FixLengthPBMsg *request, EmptyPBMsg *response,
  33. RPCContext *ctx) override
  34. {
  35. collect_qps();
  36. }
  37. void slow_pb(FixLengthPBMsg *request, EmptyPBMsg *response,
  38. RPCContext *ctx) override
  39. {
  40. auto *task = WFTaskFactory::create_timer_task(15000, nullptr);
  41. ctx->get_series()->push_back(task);
  42. }
  43. };
  44. class BenchmarkThriftServiceImpl : public BenchmarkThrift::Service
  45. {
  46. public:
  47. void echo_thrift(BenchmarkThrift::echo_thriftRequest *request,
  48. BenchmarkThrift::echo_thriftResponse *response,
  49. RPCContext *ctx) override
  50. {
  51. collect_qps();
  52. }
  53. void slow_thrift(BenchmarkThrift::slow_thriftRequest *request,
  54. BenchmarkThrift::slow_thriftResponse *response,
  55. RPCContext *ctx) override
  56. {
  57. auto *task = WFTaskFactory::create_timer_task(15000, nullptr);
  58. ctx->get_series()->push_back(task);
  59. }
  60. };
  61. static void sig_handler(int signo)
  62. {
  63. wait_group.done();
  64. }
  65. static void run_srpc_server(unsigned short port)
  66. {
  67. RPCServerParams params = RPC_SERVER_PARAMS_DEFAULT;
  68. params.max_connections = 2048;
  69. SRPCServer server(&params);
  70. BenchmarkPBServiceImpl pb_impl;
  71. BenchmarkThriftServiceImpl thrift_impl;
  72. server.add_service(&pb_impl);
  73. server.add_service(&thrift_impl);
  74. if (server.start(port) == 0)
  75. {
  76. wait_group.wait();
  77. server.stop();
  78. }
  79. else
  80. perror("server start");
  81. }
  82. template<class SERVER>
  83. static void run_pb_server(unsigned short port)
  84. {
  85. RPCServerParams params = RPC_SERVER_PARAMS_DEFAULT;
  86. params.max_connections = 2048;
  87. SERVER server(&params);
  88. BenchmarkPBServiceImpl pb_impl;
  89. server.add_service(&pb_impl);
  90. if (server.start(port) == 0)
  91. {
  92. wait_group.wait();
  93. server.stop();
  94. }
  95. else
  96. perror("server start");
  97. }
  98. template<class SERVER>
  99. static void run_thrift_server(unsigned short port)
  100. {
  101. RPCServerParams params = RPC_SERVER_PARAMS_DEFAULT;
  102. params.max_connections = 2048;
  103. SERVER server(&params);
  104. BenchmarkThriftServiceImpl thrift_impl;
  105. server.add_service(&thrift_impl);
  106. if (server.start(port) == 0)
  107. {
  108. wait_group.wait();
  109. server.stop();
  110. }
  111. else
  112. perror("server start");
  113. }
  114. int main(int argc, char* argv[])
  115. {
  116. GOOGLE_PROTOBUF_VERIFY_VERSION;
  117. if (argc != 3)
  118. {
  119. fprintf(stderr, "Usage: %s <PORT> <srpc|brpc|thrift>\n", argv[0]);
  120. abort();
  121. }
  122. signal(SIGINT, sig_handler);
  123. signal(SIGTERM, sig_handler);
  124. WFGlobalSettings my = GLOBAL_SETTINGS_DEFAULT;
  125. my.poller_threads = 16;
  126. my.handler_threads = 16;
  127. WORKFLOW_library_init(&my);
  128. unsigned short port = atoi(argv[1]);
  129. std::string server_type = argv[2];
  130. if (server_type == "srpc")
  131. run_srpc_server(port);
  132. else if (server_type == "brpc")
  133. run_pb_server<BRPCServer>(port);
  134. else if (server_type == "thrift")
  135. run_thrift_server<ThriftServer>(port);
  136. else if (server_type == "srpc_http")
  137. run_pb_server<SRPCHttpServer>(port);
  138. else if (server_type == "thrift_http")
  139. run_thrift_server<ThriftHttpServer>(port);
  140. else
  141. abort();
  142. fprintf(stdout, "\nTotal query: %llu max QPS: %d\n", total_count, max_qps);
  143. google::protobuf::ShutdownProtobufLibrary();
  144. return 0;
  145. }