rpc_replay.cpp 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <gflags/gflags.h>
  18. #include <butil/logging.h>
  19. #include <butil/time.h>
  20. #include <butil/macros.h>
  21. #include <butil/file_util.h>
  22. #include <bvar/bvar.h>
  23. #include <bthread/bthread.h>
  24. #include <brpc/channel.h>
  25. #include <brpc/server.h>
  26. #include <brpc/rpc_dump.h>
  27. #include <brpc/serialized_request.h>
  28. #include "info_thread.h"
  29. DEFINE_string(dir, "", "The directory of dumped requests");
  30. DEFINE_int32(times, 1, "Repeat replaying for so many times");
  31. DEFINE_int32(qps, 0, "Limit QPS if this flag is positive");
  32. DEFINE_int32(thread_num, 0, "Number of threads for replaying");
  33. DEFINE_bool(use_bthread, true, "Use bthread to replay");
  34. DEFINE_string(connection_type, "", "Connection type, choose automatically "
  35. "according to protocol by default");
  36. DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
  37. DEFINE_string(load_balancer, "", "The algorithm for load balancing");
  38. DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
  39. DEFINE_int32(max_retry, 3, "Maximum retry times");
  40. DEFINE_int32(dummy_port, 8899, "Port of dummy server(to monitor replaying)");
  41. bvar::LatencyRecorder g_latency_recorder("rpc_replay");
  42. bvar::Adder<int64_t> g_error_count("rpc_replay_error_count");
  43. bvar::Adder<int64_t> g_sent_count;
  44. // Include channels for all protocols that support both client and server.
  45. class ChannelGroup {
  46. public:
  47. int Init();
  48. ~ChannelGroup();
  49. // Get channel by protocol type.
  50. brpc::Channel* channel(brpc::ProtocolType type) {
  51. if ((size_t)type < _chans.size()) {
  52. return _chans[(size_t)type];
  53. }
  54. return NULL;
  55. }
  56. private:
  57. std::vector<brpc::Channel*> _chans;
  58. };
  59. int ChannelGroup::Init() {
  60. {
  61. // force global initialization of rpc.
  62. brpc::Channel dummy_channel;
  63. }
  64. std::vector<std::pair<brpc::ProtocolType, brpc::Protocol> > protocols;
  65. brpc::ListProtocols(&protocols);
  66. size_t max_protocol_size = 0;
  67. for (size_t i = 0; i < protocols.size(); ++i) {
  68. max_protocol_size = std::max(max_protocol_size,
  69. (size_t)protocols[i].first);
  70. }
  71. _chans.resize(max_protocol_size + 1);
  72. for (size_t i = 0; i < protocols.size(); ++i) {
  73. if (protocols[i].second.support_client() &&
  74. protocols[i].second.support_server()) {
  75. const brpc::ProtocolType prot = protocols[i].first;
  76. brpc::Channel* chan = new brpc::Channel;
  77. brpc::ChannelOptions options;
  78. options.protocol = prot;
  79. options.connection_type = FLAGS_connection_type;
  80. options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
  81. options.max_retry = FLAGS_max_retry;
  82. if (chan->Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),
  83. &options) != 0) {
  84. LOG(ERROR) << "Fail to initialize channel";
  85. return -1;
  86. }
  87. _chans[prot] = chan;
  88. }
  89. }
  90. return 0;
  91. }
  92. ChannelGroup::~ChannelGroup() {
  93. for (size_t i = 0; i < _chans.size(); ++i) {
  94. delete _chans[i];
  95. }
  96. _chans.clear();
  97. }
  98. static void handle_response(brpc::Controller* cntl, int64_t start_time,
  99. bool sleep_on_error/*note*/) {
  100. // TODO(gejun): some bthreads are starved when new bthreads are created
  101. // continuously, which happens when server is down and RPC keeps failing.
  102. // Sleep a while on error to avoid that now.
  103. const int64_t end_time = butil::gettimeofday_us();
  104. const int64_t elp = end_time - start_time;
  105. if (!cntl->Failed()) {
  106. g_latency_recorder << elp;
  107. } else {
  108. g_error_count << 1;
  109. if (sleep_on_error) {
  110. bthread_usleep(10000);
  111. }
  112. }
  113. delete cntl;
  114. }
  115. butil::atomic<int> g_thread_offset(0);
  116. static void* replay_thread(void* arg) {
  117. ChannelGroup* chan_group = static_cast<ChannelGroup*>(arg);
  118. const int thread_offset = g_thread_offset.fetch_add(1, butil::memory_order_relaxed);
  119. double req_rate = FLAGS_qps / (double)FLAGS_thread_num;
  120. brpc::SerializedRequest req;
  121. std::deque<int64_t> timeq;
  122. size_t MAX_QUEUE_SIZE = (size_t)req_rate;
  123. if (MAX_QUEUE_SIZE < 100) {
  124. MAX_QUEUE_SIZE = 100;
  125. } else if (MAX_QUEUE_SIZE > 2000) {
  126. MAX_QUEUE_SIZE = 2000;
  127. }
  128. timeq.push_back(butil::gettimeofday_us());
  129. for (int i = 0; !brpc::IsAskedToQuit() && i < FLAGS_times; ++i) {
  130. brpc::SampleIterator it(FLAGS_dir);
  131. int j = 0;
  132. for (brpc::SampledRequest* sample = it.Next();
  133. !brpc::IsAskedToQuit() && sample != NULL; sample = it.Next(), ++j) {
  134. std::unique_ptr<brpc::SampledRequest> sample_guard(sample);
  135. if ((j % FLAGS_thread_num) != thread_offset) {
  136. continue;
  137. }
  138. brpc::Channel* chan =
  139. chan_group->channel(sample->meta.protocol_type());
  140. if (chan == NULL) {
  141. LOG(ERROR) << "No channel on protocol="
  142. << sample->meta.protocol_type();
  143. continue;
  144. }
  145. brpc::Controller* cntl = new brpc::Controller;
  146. req.Clear();
  147. cntl->reset_sampled_request(sample_guard.release());
  148. if (sample->meta.attachment_size() > 0) {
  149. sample->request.cutn(
  150. &req.serialized_data(),
  151. sample->request.size() - sample->meta.attachment_size());
  152. cntl->request_attachment() = sample->request.movable();
  153. } else {
  154. req.serialized_data() = sample->request.movable();
  155. }
  156. g_sent_count << 1;
  157. const int64_t start_time = butil::gettimeofday_us();
  158. if (FLAGS_qps <= 0) {
  159. chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/,
  160. cntl, &req, NULL/*ignore response*/, NULL);
  161. handle_response(cntl, start_time, true);
  162. } else {
  163. google::protobuf::Closure* done =
  164. brpc::NewCallback(handle_response, cntl, start_time, false);
  165. chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/,
  166. cntl, &req, NULL/*ignore response*/, done);
  167. const int64_t end_time = butil::gettimeofday_us();
  168. int64_t expected_elp = 0;
  169. int64_t actual_elp = 0;
  170. timeq.push_back(end_time);
  171. if (timeq.size() > MAX_QUEUE_SIZE) {
  172. actual_elp = end_time - timeq.front();
  173. timeq.pop_front();
  174. expected_elp = (size_t)(1000000 * timeq.size() / req_rate);
  175. } else {
  176. actual_elp = end_time - timeq.front();
  177. expected_elp = (size_t)(1000000 * (timeq.size() - 1) / req_rate);
  178. }
  179. if (actual_elp < expected_elp) {
  180. bthread_usleep(expected_elp - actual_elp);
  181. }
  182. }
  183. }
  184. }
  185. return NULL;
  186. }
  187. int main(int argc, char* argv[]) {
  188. // Parse gflags. We recommend you to use gflags as well.
  189. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  190. if (FLAGS_dir.empty() ||
  191. !butil::DirectoryExists(butil::FilePath(FLAGS_dir))) {
  192. LOG(ERROR) << "--dir=<dir-of-dumped-files> is required";
  193. return -1;
  194. }
  195. if (FLAGS_dummy_port >= 0) {
  196. brpc::StartDummyServerAt(FLAGS_dummy_port);
  197. }
  198. ChannelGroup chan_group;
  199. if (chan_group.Init() != 0) {
  200. LOG(ERROR) << "Fail to init ChannelGroup";
  201. return -1;
  202. }
  203. if (FLAGS_thread_num <= 0) {
  204. if (FLAGS_qps <= 0) { // unlimited qps
  205. FLAGS_thread_num = 50;
  206. } else {
  207. FLAGS_thread_num = FLAGS_qps / 10000;
  208. if (FLAGS_thread_num < 1) {
  209. FLAGS_thread_num = 1;
  210. }
  211. if (FLAGS_thread_num > 50) {
  212. FLAGS_thread_num = 50;
  213. }
  214. }
  215. }
  216. std::vector<bthread_t> bids;
  217. std::vector<pthread_t> pids;
  218. if (!FLAGS_use_bthread) {
  219. pids.resize(FLAGS_thread_num);
  220. for (int i = 0; i < FLAGS_thread_num; ++i) {
  221. if (pthread_create(&pids[i], NULL, replay_thread, &chan_group) != 0) {
  222. LOG(ERROR) << "Fail to create pthread";
  223. return -1;
  224. }
  225. }
  226. } else {
  227. bids.resize(FLAGS_thread_num);
  228. for (int i = 0; i < FLAGS_thread_num; ++i) {
  229. if (bthread_start_background(
  230. &bids[i], NULL, replay_thread, &chan_group) != 0) {
  231. LOG(ERROR) << "Fail to create bthread";
  232. return -1;
  233. }
  234. }
  235. }
  236. brpc::InfoThread info_thr;
  237. brpc::InfoThreadOptions info_thr_opt;
  238. info_thr_opt.latency_recorder = &g_latency_recorder;
  239. info_thr_opt.error_count = &g_error_count;
  240. info_thr_opt.sent_count = &g_sent_count;
  241. if (!info_thr.start(info_thr_opt)) {
  242. LOG(ERROR) << "Fail to create info_thread";
  243. return -1;
  244. }
  245. for (int i = 0; i < FLAGS_thread_num; ++i) {
  246. if (!FLAGS_use_bthread) {
  247. pthread_join(pids[i], NULL);
  248. } else {
  249. bthread_join(bids[i], NULL);
  250. }
  251. }
  252. info_thr.stop();
  253. return 0;
  254. }