server.cc 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. #include <stdio.h>
  2. #include <signal.h>
  3. #include "benchmark_pb.srpc.h"
  4. #include "benchmark_thrift.srpc.h"
  5. #include "workflow/WFFacilities.h"
  6. #ifdef _WIN32
  7. #include "workflow/PlatformSocket.h"
  8. #else
  9. #include <unistd.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <netinet/in.h>
  13. #include <arpa/inet.h>
  14. #endif
  15. using namespace srpc;
  16. std::atomic<int> query_count(0); // per_second
  17. std::atomic<long long> last_timestamp(0L);
  18. //volatile bool stop_flag = false;
  19. int max_qps = 0;
  20. long long total_count = 0;
  21. WFFacilities::WaitGroup wait_group(1);
  22. inline void collect_qps()
  23. {
  24. int64_t ms_timestamp = GET_CURRENT_MS();
  25. ++query_count;
  26. if (ms_timestamp / 1000 > last_timestamp)
  27. {
  28. last_timestamp = ms_timestamp / 1000;
  29. int count = query_count;
  30. query_count = 0;
  31. total_count += count;
  32. if (count > max_qps)
  33. max_qps = count;
  34. long long ts = ms_timestamp;
  35. fprintf(stdout, "TIMESTAMP(ms) = %llu QPS = %d\n", ts, count);
  36. }
  37. }
  38. class BenchmarkPBServiceImpl : public BenchmarkPB::Service
  39. {
  40. public:
  41. void echo_pb(FixLengthPBMsg *request, EmptyPBMsg *response,
  42. RPCContext *ctx) override
  43. {
  44. collect_qps();
  45. }
  46. void slow_pb(FixLengthPBMsg *request, EmptyPBMsg *response,
  47. RPCContext *ctx) override
  48. {
  49. auto *task = WFTaskFactory::create_timer_task(15000, nullptr);
  50. ctx->get_series()->push_back(task);
  51. }
  52. };
  53. class BenchmarkThriftServiceImpl : public BenchmarkThrift::Service
  54. {
  55. public:
  56. void echo_thrift(BenchmarkThrift::echo_thriftRequest *request,
  57. BenchmarkThrift::echo_thriftResponse *response,
  58. RPCContext *ctx) override
  59. {
  60. collect_qps();
  61. }
  62. void slow_thrift(BenchmarkThrift::slow_thriftRequest *request,
  63. BenchmarkThrift::slow_thriftResponse *response,
  64. RPCContext *ctx) override
  65. {
  66. auto *task = WFTaskFactory::create_timer_task(15000, nullptr);
  67. ctx->get_series()->push_back(task);
  68. }
  69. };
  70. static void sig_handler(int signo)
  71. {
  72. wait_group.done();
  73. }
  74. static inline int create_bind_socket(unsigned short port)
  75. {
  76. int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  77. if (sockfd >= 0)
  78. {
  79. struct sockaddr_in sin = { };
  80. sin.sin_family = AF_INET;
  81. sin.sin_port = htons(port);
  82. sin.sin_addr.s_addr = htonl(INADDR_ANY);
  83. if (bind(sockfd, (struct sockaddr *)&sin, sizeof sin) >= 0)
  84. return sockfd;
  85. close(sockfd);
  86. }
  87. return -1;
  88. }
  89. static void run_srpc_server(unsigned short port, int proc_num)
  90. {
  91. RPCServerParams params = RPC_SERVER_PARAMS_DEFAULT;
  92. params.max_connections = 2048;
  93. SRPCServer server(&params);
  94. BenchmarkPBServiceImpl pb_impl;
  95. BenchmarkThriftServiceImpl thrift_impl;
  96. server.add_service(&pb_impl);
  97. server.add_service(&thrift_impl);
  98. int sockfd = create_bind_socket(port);
  99. if (sockfd < 0)
  100. {
  101. perror("create socket");
  102. exit(1);
  103. }
  104. while ((proc_num /= 2) != 0)
  105. fork();
  106. if (server.serve(sockfd) == 0)
  107. {
  108. wait_group.wait();
  109. server.stop();
  110. }
  111. else
  112. perror("server start");
  113. close(sockfd);
  114. }
  115. template<class SERVER>
  116. static void run_pb_server(unsigned short port, int proc_num)
  117. {
  118. RPCServerParams params = RPC_SERVER_PARAMS_DEFAULT;
  119. params.max_connections = 2048;
  120. SERVER server(&params);
  121. BenchmarkPBServiceImpl pb_impl;
  122. server.add_service(&pb_impl);
  123. int sockfd = create_bind_socket(port);
  124. if (sockfd < 0)
  125. {
  126. perror("create socket");
  127. exit(1);
  128. }
  129. while ((proc_num /= 2) != 0)
  130. fork();
  131. if (server.serve(sockfd) == 0)
  132. {
  133. wait_group.wait();
  134. server.stop();
  135. }
  136. else
  137. perror("server start");
  138. close(sockfd);
  139. }
  140. template<class SERVER>
  141. static void run_thrift_server(unsigned short port, int proc_num)
  142. {
  143. RPCServerParams params = RPC_SERVER_PARAMS_DEFAULT;
  144. params.max_connections = 2048;
  145. SERVER server(&params);
  146. BenchmarkThriftServiceImpl thrift_impl;
  147. server.add_service(&thrift_impl);
  148. int sockfd = create_bind_socket(port);
  149. if (sockfd < 0)
  150. {
  151. perror("create socket");
  152. exit(1);
  153. }
  154. while ((proc_num /= 2) != 0)
  155. fork();
  156. if (server.serve(sockfd) == 0)
  157. {
  158. wait_group.wait();
  159. server.stop();
  160. }
  161. else
  162. perror("server start");
  163. close(sockfd);
  164. }
  165. int main(int argc, char* argv[])
  166. {
  167. GOOGLE_PROTOBUF_VERIFY_VERSION;
  168. int proc_num = 1;
  169. if (argc == 4)
  170. {
  171. proc_num = atoi(argv[3]);
  172. if (proc_num != 1 && proc_num != 2 && proc_num != 4 && proc_num != 8 && proc_num != 16)
  173. {
  174. fprintf(stderr, "Usage: %s <PORT> <srpc|brpc|thrift> [proc num (1/2/4/8/16)]\n", argv[0]);
  175. abort();
  176. }
  177. }
  178. else if (argc != 3)
  179. {
  180. fprintf(stderr, "Usage: %s <PORT> <srpc|brpc|thrift> [proc num (1/2/4/8/16)]\n", argv[0]);
  181. abort();
  182. }
  183. signal(SIGINT, sig_handler);
  184. signal(SIGTERM, sig_handler);
  185. WFGlobalSettings my = GLOBAL_SETTINGS_DEFAULT;
  186. my.poller_threads = 16;
  187. my.handler_threads = 16;
  188. WORKFLOW_library_init(&my);
  189. unsigned short port = atoi(argv[1]);
  190. std::string server_type = argv[2];
  191. if (server_type == "srpc")
  192. run_srpc_server(port, proc_num);
  193. else if (server_type == "brpc")
  194. run_pb_server<BRPCServer>(port, proc_num);
  195. else if (server_type == "thrift")
  196. run_thrift_server<ThriftServer>(port, proc_num);
  197. else if (server_type == "srpc_http")
  198. run_pb_server<SRPCHttpServer>(port, proc_num);
  199. else if (server_type == "thrift_http")
  200. run_thrift_server<ThriftHttpServer>(port, proc_num);
  201. else
  202. abort();
  203. fprintf(stdout, "\nTotal query: %llu max QPS: %d\n", total_count, max_qps);
  204. google::protobuf::ShutdownProtobufLibrary();
  205. return 0;
  206. }