rpc_press_impl.h 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #ifndef PBRPCPRESS_PBRPC_PRESS_H
  18. #define PBRPCPRESS_PBRPC_PRESS_H
  19. #include <stdio.h>
  20. #include <deque>
  21. #include <google/protobuf/compiler/importer.h>
  22. #include <google/protobuf/dynamic_message.h>
  23. #include <bvar/bvar.h>
  24. #include <brpc/channel.h>
  25. #include "info_thread.h"
  26. #include "pb_util.h"
  27. namespace pbrpcframework {
  28. class JsonUtil;
  29. struct PressOptions {
  30. std::string service; //service name (packet.rpcservice)
  31. std::string method; //method name (rpc service method)
  32. int server_type; // server type: 0 = hulu server, 1 = old pbrpc server, 2 = sofa server
  33. double test_req_rate; // 0 = no limit
  34. int test_thread_num;
  35. std::string input;
  36. std::string output;
  37. std::string host; // server's ip:port, used by hulu server and sofa server
  38. std::string channel; // server's channel, used by old pbrpc server
  39. //comcfg::Configure conf;
  40. std::string conf_dir;
  41. std::string conf_file;
  42. std::string connection_type; // connection type 0:SINGLE 1:POOLED 2:SHORT
  43. int connect_timeout_ms; // connection timeout in milliseconds
  44. int timeout_ms; // RPC timeout in milliseconds
  45. int max_retry; // Maximum retry times by RPC framework
  46. std::string protocol;
  47. int request_compress_type; // Snappy:1 Gzip:2 Zlib:3 LZ4:4 None:0
  48. int response_compress_type; // Snappy:1 Gzip:2 Zlib:3 LZ4:4 None:0
  49. int attachment_size; // Snappy:1 Gzip:2 Zlib:3 LZ4:4 None:0
  50. bool auth;// Enable Giano authentication
  51. std::string auth_group;
  52. std::string lb_policy; // "rr", "Policy of load balance rr ||random"
  53. std::string proto_file;
  54. std::string proto_includes;
  55. PressOptions() :
  56. server_type(0),
  57. test_req_rate(0),
  58. test_thread_num(1),
  59. connect_timeout_ms(1000),
  60. timeout_ms(1000),
  61. max_retry(3),
  62. request_compress_type(0),
  63. response_compress_type(0),
  64. attachment_size(0),
  65. auth(false)
  66. {}
  67. };
  68. class PressClient {
  69. public:
  70. PressClient(const PressOptions* options,
  71. google::protobuf::compiler::Importer* importer,
  72. google::protobuf::DynamicMessageFactory* factory) {
  73. _method_descriptor = NULL;
  74. _response_prototype = NULL;
  75. _options = options;
  76. _importer = importer;
  77. _factory = factory;
  78. }
  79. google::protobuf::Message* get_output_message() {
  80. return _response_prototype->New();
  81. }
  82. int init();
  83. void call_method(brpc::Controller* cntl,
  84. google::protobuf::Message* request,
  85. google::protobuf::Message* response,
  86. google::protobuf::Closure* done);
  87. public:
  88. brpc::Channel _rpc_client;
  89. std::string _attachment;
  90. const PressOptions* _options;
  91. const google::protobuf::MethodDescriptor* _method_descriptor;
  92. const google::protobuf::Message* _response_prototype;
  93. google::protobuf::compiler::Importer* _importer;
  94. google::protobuf::DynamicMessageFactory* _factory;
  95. };
  96. class RpcPress {
  97. public:
  98. RpcPress();
  99. ~RpcPress();
  100. int init(const PressOptions* options);
  101. int start();
  102. int stop();
  103. const PressOptions* options() { return &_options; }
  104. private:
  105. DISALLOW_COPY_AND_ASSIGN(RpcPress);
  106. bool new_pbrpc_press_client_by_client_type(int client_type);
  107. void sync_client();
  108. void handle_response(brpc::Controller* cntl,
  109. google::protobuf::Message* request,
  110. google::protobuf::Message* response,
  111. int64_t start_time_ns);
  112. static void* sync_call_thread(void* arg);
  113. bvar::LatencyRecorder _latency_recorder;
  114. bvar::Adder<int64_t> _error_count;
  115. bvar::Adder<int64_t> _sent_count;
  116. std::deque<google::protobuf::Message*> _msgs;
  117. PressClient* _pbrpc_client;
  118. PressOptions _options;
  119. bool _started;
  120. bool _stop;
  121. FILE* _output_json;
  122. google::protobuf::compiler::Importer* _importer;
  123. google::protobuf::DynamicMessageFactory _factory;
  124. std::vector<pthread_t> _ttid;
  125. brpc::InfoThread _info_thr;
  126. };
  127. }
  128. #endif // PBRPCPRESS_PBRPC_PRESS_H