server.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // A server to receive EchoRequest and send back EchoResponse.
  18. #include <gflags/gflags.h>
  19. #include <butil/logging.h>
  20. #include <brpc/server.h>
  21. #include <butil/atomicops.h>
  22. #include <butil/time.h>
  23. #include <butil/logging.h>
  24. #include <json2pb/json_to_pb.h>
  25. #include <bthread/timer_thread.h>
  26. #include <bthread/bthread.h>
  27. #include <cstdlib>
  28. #include <fstream>
  29. #include "cl_test.pb.h"
  30. DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state "
  31. "(waiting for client to close connection before server stops)");
  32. DEFINE_int32(server_bthread_concurrency, 4,
  33. "Configuring the value of bthread_concurrency, For compute max qps, ");
  34. DEFINE_int32(server_sync_sleep_us, 2500,
  35. "Usleep time, each request will be executed once, For compute max qps");
  36. // max qps = 1000 / 2.5 * 4
  37. DEFINE_int32(control_server_port, 9000, "");
  38. DEFINE_int32(echo_port, 9001, "TCP Port of echo server");
  39. DEFINE_int32(cntl_port, 9000, "TCP Port of controller server");
  40. DEFINE_string(case_file, "", "File path for test_cases");
  41. DEFINE_int32(latency_change_interval_us, 50000, "Intervalt for server side changes the latency");
  42. DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency");
  43. DEFINE_bool(use_usleep, false,
  44. "EchoServer uses ::usleep or bthread_usleep to simulate latency "
  45. "when processing requests");
  46. bthread::TimerThread g_timer_thread;
  // Getter with the `int (*)(void*)` shape: returns the int stored at `arg`.
  // `arg` must be non-null and point to readable storage laid out as an int.
  // NOTE(review): the only caller in this file passes the address of a
  // butil::atomic<int>; reading it through an int* assumes the atomic has the
  // same representation as a plain int -- confirm, or load through the atomic.
  int cast_func(void* arg) {
      const int* value_ptr = static_cast<const int*>(arg);
      return *value_ptr;
  }
  50. void DisplayStage(const test::Stage& stage) {
  51. std::string type;
  52. switch(stage.type()) {
  53. case test::FLUCTUATE:
  54. type = "Fluctuate";
  55. break;
  56. case test::SMOOTH:
  57. type = "Smooth";
  58. break;
  59. default:
  60. type = "Unknown";
  61. }
  62. std::stringstream ss;
  63. ss
  64. << "Stage:[" << stage.lower_bound() << ':'
  65. << stage.upper_bound() << "]"
  66. << " , Type:" << type;
  67. LOG(INFO) << ss.str();
  68. }
  69. butil::atomic<int> cnt(0);
  70. butil::atomic<int> atomic_sleep_time(0);
  71. bvar::PassiveStatus<int> atomic_sleep_time_bvar(cast_func, &atomic_sleep_time);
  72. namespace bthread {
  73. DECLARE_int32(bthread_concurrency);
  74. }
  75. void TimerTask(void* data);
  76. class EchoServiceImpl : public test::EchoService {
  77. public:
  78. EchoServiceImpl()
  79. : _stage_index(0)
  80. , _running_case(false) {
  81. };
  82. virtual ~EchoServiceImpl() {};
  83. void SetTestCase(const test::TestCase& test_case) {
  84. _test_case = test_case;
  85. _next_stage_start = _test_case.latency_stage_list(0).duration_sec() +
  86. butil::gettimeofday_s();
  87. _stage_index = 0;
  88. _running_case = false;
  89. DisplayStage(_test_case.latency_stage_list(_stage_index));
  90. }
  91. void StartTestCase() {
  92. CHECK(!_running_case);
  93. _running_case = true;
  94. UpdateLatency();
  95. }
  96. void StopTestCase() {
  97. _running_case = false;
  98. }
  99. void UpdateLatency() {
  100. if (!_running_case) {
  101. return;
  102. }
  103. ComputeLatency();
  104. g_timer_thread.schedule(TimerTask, (void*)this,
  105. butil::microseconds_from_now(FLAGS_latency_change_interval_us));
  106. }
  107. virtual void Echo(google::protobuf::RpcController* cntl_base,
  108. const test::NotifyRequest* request,
  109. test::NotifyResponse* response,
  110. google::protobuf::Closure* done) {
  111. brpc::ClosureGuard done_guard(done);
  112. response->set_message("hello");
  113. ::usleep(FLAGS_server_sync_sleep_us);
  114. if (FLAGS_use_usleep) {
  115. ::usleep(_latency.load(butil::memory_order_relaxed));
  116. } else {
  117. bthread_usleep(_latency.load(butil::memory_order_relaxed));
  118. }
  119. }
  120. void ComputeLatency() {
  121. if (_stage_index < _test_case.latency_stage_list_size() &&
  122. butil::gettimeofday_s() > _next_stage_start) {
  123. ++_stage_index;
  124. if (_stage_index < _test_case.latency_stage_list_size()) {
  125. _next_stage_start += _test_case.latency_stage_list(_stage_index).duration_sec();
  126. DisplayStage(_test_case.latency_stage_list(_stage_index));
  127. }
  128. }
  129. if (_stage_index == _test_case.latency_stage_list_size()) {
  130. const test::Stage& latency_stage =
  131. _test_case.latency_stage_list(_stage_index - 1);
  132. if (latency_stage.type() == test::ChangeType::FLUCTUATE) {
  133. _latency.store((latency_stage.lower_bound() + latency_stage.upper_bound()) / 2,
  134. butil::memory_order_relaxed);
  135. } else if (latency_stage.type() == test::ChangeType::SMOOTH) {
  136. _latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed);
  137. }
  138. return;
  139. }
  140. const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index);
  141. const int lower_bound = latency_stage.lower_bound();
  142. const int upper_bound = latency_stage.upper_bound();
  143. if (latency_stage.type() == test::FLUCTUATE) {
  144. _latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound,
  145. butil::memory_order_relaxed);
  146. } else if (latency_stage.type() == test::SMOOTH) {
  147. int latency = lower_bound + (upper_bound - lower_bound) /
  148. double(latency_stage.duration_sec()) *
  149. (latency_stage.duration_sec() - _next_stage_start +
  150. butil::gettimeofday_s());
  151. _latency.store(latency, butil::memory_order_relaxed);
  152. } else {
  153. LOG(FATAL) << "Wrong Type:" << latency_stage.type();
  154. }
  155. }
  156. private:
  157. int _stage_index;
  158. int _next_stage_start;
  159. butil::atomic<int> _latency;
  160. test::TestCase _test_case;
  161. bool _running_case;
  162. };
  163. void TimerTask(void* data) {
  164. EchoServiceImpl* echo_service = (EchoServiceImpl*)data;
  165. echo_service->UpdateLatency();
  166. }
  167. class ControlServiceImpl : public test::ControlService {
  168. public:
  169. ControlServiceImpl()
  170. : _case_index(0) {
  171. LoadCaseSet(FLAGS_case_file);
  172. _echo_service = new EchoServiceImpl;
  173. if (_server.AddService(_echo_service,
  174. brpc::SERVER_OWNS_SERVICE) != 0) {
  175. LOG(FATAL) << "Fail to add service";
  176. }
  177. g_timer_thread.start(NULL);
  178. }
  179. virtual ~ControlServiceImpl() {
  180. _echo_service->StopTestCase();
  181. g_timer_thread.stop_and_join();
  182. };
  183. virtual void Notify(google::protobuf::RpcController* cntl_base,
  184. const test::NotifyRequest* request,
  185. test::NotifyResponse* response,
  186. google::protobuf::Closure* done) {
  187. brpc::ClosureGuard done_guard(done);
  188. const std::string& message = request->message();
  189. LOG(INFO) << message;
  190. if (message == "ResetCaseSet") {
  191. _server.Stop(0);
  192. _server.Join();
  193. _echo_service->StopTestCase();
  194. LoadCaseSet(FLAGS_case_file);
  195. _case_index = 0;
  196. response->set_message("CaseSetReset");
  197. } else if (message == "StartCase") {
  198. CHECK(!_server.IsRunning()) << "Continuous StartCase";
  199. const test::TestCase& test_case = _case_set.test_case(_case_index++);
  200. _echo_service->SetTestCase(test_case);
  201. brpc::ServerOptions options;
  202. options.max_concurrency = FLAGS_server_max_concurrency;
  203. _server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency();
  204. _server.Start(FLAGS_echo_port, &options);
  205. _echo_service->StartTestCase();
  206. response->set_message("CaseStarted");
  207. } else if (message == "StopCase") {
  208. CHECK(_server.IsRunning()) << "Continuous StopCase";
  209. _server.Stop(0);
  210. _server.Join();
  211. _echo_service->StopTestCase();
  212. response->set_message("CaseStopped");
  213. } else {
  214. LOG(FATAL) << "Invalid message:" << message;
  215. response->set_message("Invalid Cntl Message");
  216. }
  217. }
  218. private:
  219. void LoadCaseSet(const std::string& file_path) {
  220. std::ifstream ifs(file_path.c_str(), std::ios::in);
  221. if (!ifs) {
  222. LOG(FATAL) << "Fail to open case set file: " << file_path;
  223. }
  224. std::string case_set_json((std::istreambuf_iterator<char>(ifs)),
  225. std::istreambuf_iterator<char>());
  226. test::TestCaseSet case_set;
  227. std::string err;
  228. if (!json2pb::JsonToProtoMessage(case_set_json, &case_set, &err)) {
  229. LOG(FATAL)
  230. << "Fail to trans case_set from json to protobuf message: "
  231. << err;
  232. }
  233. _case_set = case_set;
  234. ifs.close();
  235. }
  236. brpc::Server _server;
  237. EchoServiceImpl* _echo_service;
  238. test::TestCaseSet _case_set;
  239. int _case_index;
  240. };
  241. int main(int argc, char* argv[]) {
  242. // Parse gflags. We recommend you to use gflags as well.
  243. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  244. bthread::FLAGS_bthread_concurrency= FLAGS_server_bthread_concurrency;
  245. brpc::Server server;
  246. ControlServiceImpl control_service_impl;
  247. if (server.AddService(&control_service_impl,
  248. brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
  249. LOG(ERROR) << "Fail to add service";
  250. return -1;
  251. }
  252. if (server.Start(FLAGS_cntl_port, NULL) != 0) {
  253. LOG(ERROR) << "Fail to start EchoServer";
  254. return -1;
  255. }
  256. server.RunUntilAskedToQuit();
  257. return 0;
  258. }