main.cpp 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include <iostream>
  17. #include "util/tc_http.h"
  18. #include "util/tc_option.h"
  19. #include "util/tc_common.h"
  20. #include "util/tc_clientsocket.h"
  21. #include "util/tc_thread_pool.h"
  22. #include "util/tc_timeprovider.h"
  23. #include "servant/Application.h"
  24. #include "helloworld.pb.h"
  25. #include "util/tc_grpc.h"
  26. using namespace std;
  27. using namespace tars;
  28. Communicator* _comm;
  29. string grpcObj = "TestApp.GrpcServer.GrpcObj@";
  30. struct Param
  31. {
  32. int count;
  33. string call;
  34. int thread;
  35. string domain;
  36. ServantPrx servant2Prx;
  37. };
  38. Param param;
  39. std::atomic<int> callback_count(0);
  40. void syncRpc2(int c)
  41. {
  42. int64_t t = TC_Common::now2us();
  43. //发起远程调用
  44. for (int i = 0; i < c; ++i)
  45. {
  46. shared_ptr<TC_HttpResponse> rsp;
  47. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>();
  48. helloworld::HelloRequest request;
  49. helloworld::HelloReply reply;
  50. request.set_name(string("world-") + TC_Common::tostr(i));
  51. string message = request.SerializeAsString();
  52. addGrpcPrefix(message, false);
  53. std::string url = "http://" + param.domain;
  54. req->setPostRequest(url, message, true);
  55. req->setHeader("content-type", "application/grpc");
  56. req->setHeader("te", "trailers");
  57. req->setHeader(":path", "/helloworld.Greeter/SayHello");
  58. std::string content;
  59. bool compressed;
  60. try
  61. {
  62. param.servant2Prx->http_call("SayHello", req, rsp);
  63. content = rsp->getContent();
  64. RemoveGrpcPrefix(content, &compressed);
  65. reply.ParseFromString(content);
  66. cout << "rsp content: " << reply.message() << endl;
  67. }
  68. catch (exception & e)
  69. {
  70. cout << "exception:" << e.what() << endl;
  71. }
  72. std::string rspMessage = "Hello " + request.name();
  73. assert(rspMessage == reply.message());
  74. assert(req.use_count() == 1);
  75. assert(rsp.use_count() == 1);
  76. ++callback_count;
  77. }
  78. int64_t cost = TC_Common::now2us() - t;
  79. cout << "syncRpc2 total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  80. }
  81. struct TestHttpCallback : public HttpCallback
  82. {
  83. TestHttpCallback(const string &buff) : _buff(buff)
  84. {
  85. }
  86. virtual int onHttpResponse(const shared_ptr<TC_HttpResponse> &rsp)
  87. {
  88. callback_count++;
  89. helloworld::HelloRequest request;
  90. helloworld::HelloReply reply;
  91. std::string content;
  92. bool compressed;
  93. content = rsp->getContent();
  94. RemoveGrpcPrefix(content, &compressed);
  95. RemoveGrpcPrefix(_buff, &compressed);
  96. request.ParseFromString(_buff);
  97. reply.ParseFromString(content);
  98. std::string rspMessage = "Hello " + request.name();
  99. assert(rspMessage == reply.message());
  100. return 0;
  101. }
  102. virtual int onHttpResponseException(int expCode)
  103. {
  104. cout << "onHttpResponseException expCode:" << expCode << endl;
  105. callback_count++;
  106. return 0;
  107. }
  108. string _buff;
  109. };
  110. void asyncRpc2(int c)
  111. {
  112. int64_t t = TC_Common::now2us();
  113. //发起远程调用
  114. for (int i = 0; i < c; ++i)
  115. {
  116. shared_ptr<TC_HttpResponse> rsp;
  117. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>();
  118. helloworld::HelloRequest request;
  119. helloworld::HelloReply reply;
  120. request.set_name(string("world-") + TC_Common::tostr(i));
  121. string message = request.SerializeAsString();
  122. addGrpcPrefix(message, false);
  123. std::string url = "http://" + param.domain;
  124. req->setPostRequest(url, message, true);
  125. req->setHeader("content-type", "application/grpc");
  126. req->setHeader(":path", "/helloworld.Greeter/SayHello");
  127. req->setHeader("te", "trailers");
  128. HttpCallbackPtr p = new TestHttpCallback(message);
  129. try
  130. {
  131. param.servant2Prx->http_call_async("hello", req, p);
  132. }
  133. catch(exception& e)
  134. {
  135. cout << "exception:" << e.what() << endl;
  136. }
  137. if(i % 500 == 0)
  138. {
  139. TC_Common::msleep(100);
  140. }
  141. }
  142. int64_t cost = TC_Common::now2us() - t;
  143. cout << "asyncRpc2 send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  144. }
  145. int main(int argc, char *argv[])
  146. {
  147. try
  148. {
  149. if (argc < 4)
  150. {
  151. cout << "Usage:" << argv[0] << " --domain=127.0.0.1:50051 --count=1000 --call=[sync|async] --thread=1" << endl;
  152. return 0;
  153. }
  154. TC_Option option;
  155. option.decode(argc, argv);
  156. param.domain = option.getValue("domain");
  157. if(param.domain.empty()) param.domain = "127.0.0.1:50051";
  158. param.count = TC_Common::strto<int>(option.getValue("count"));
  159. if(param.count <= 0) param.count = 1000;
  160. param.call = option.getValue("call");
  161. if(param.call.empty()) param.call = "sync";
  162. param.thread = TC_Common::strto<int>(option.getValue("thread"));
  163. if(param.thread <= 0) param.thread = 1;
  164. _comm = new Communicator();
  165. // TarsRollLogger::getInstance()->logger()->setLogLevel(6);
  166. _comm->setProperty("sendqueuelimit", "1000000");
  167. _comm->setProperty("asyncqueuecap", "1000000");
  168. string ip = param.domain.substr(0, param.domain.find(":"));
  169. string port = param.domain.substr(param.domain.find(":") + 1);
  170. grpcObj += "tcp -h " + ip + " -p " + port;
  171. std::cout << "grpcObj: " << grpcObj << std::endl;
  172. param.servant2Prx = _comm->stringToProxy<ServantPrx>(grpcObj);
  173. param.servant2Prx->tars_connect_timeout(5000);
  174. param.servant2Prx->tars_async_timeout(60*1000);
  175. param.servant2Prx->tars_set_protocol(ServantProxy::PROTOCOL_GRPC);
  176. int64_t start = TC_Common::now2us();
  177. std::function<void(int)> func;
  178. if (param.call == "sync")
  179. {
  180. func = syncRpc2;
  181. }
  182. else if(param.call == "async")
  183. {
  184. func = asyncRpc2;
  185. }
  186. else
  187. {
  188. cout << "no func, exits" << endl;
  189. exit(0);
  190. }
  191. vector<std::thread*> vt;
  192. for(int i = 0 ; i< param.thread; i++)
  193. {
  194. vt.push_back(new std::thread(func, param.count));
  195. }
  196. std::thread print([&]{while(callback_count != param.count * param.thread) {
  197. cout << "Grpc:" << param.call << ": ----------finish count:" << callback_count << endl;
  198. std::this_thread::sleep_for(std::chrono::seconds(1));
  199. };});
  200. for(size_t i = 0 ; i< vt.size(); i++)
  201. {
  202. vt[i]->join();
  203. delete vt[i];
  204. }
  205. cout << "(pid:" << std::this_thread::get_id() << ")"
  206. << "(count:" << param.count << ")"
  207. << "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
  208. << endl;
  209. while(callback_count != param.count * param.thread) {
  210. std::this_thread::sleep_for(std::chrono::seconds(1));
  211. }
  212. print.join();
  213. cout << "----------finish count:" << callback_count << endl;
  214. }
  215. catch(exception &ex)
  216. {
  217. cout << ex.what() << endl;
  218. }
  219. cout << "main return." << endl;
  220. return 0;
  221. }