main.cpp 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. 
  2. #include <iostream>
  3. #include "util/tc_http.h"
  4. #include "util/tc_common.h"
  5. #include "util/tc_clientsocket.h"
  6. #include "util/tc_thread_pool.h"
  7. // #include "tup/tup.h"
  8. #include "util/tc_timeprovider.h"
  9. #include "servant/Application.h"
  10. using namespace std;
  11. using namespace tars;
  12. // using namespace wup;
  13. Communicator* _comm;
  14. //static string httpObj = "TestApp.HttpServer.httpObj@tcp -h 127.0.0.1 -p 8081:tcp -h 127.0.0.1 -p 8082:tcp -h 127.0.0.1 -p 8083";
  15. static string httpObj = "TestApp.HttpServer.httpObj@tcp -h 127.0.0.1 -p 8081";
  16. //static string httpObj = "TestApp.HttpServer.httpObj@tcp -h 134.175.105.92 -p 8081";
  17. struct Param
  18. {
  19. int count;
  20. string call;
  21. int thread;
  22. ServantPrx servantPrx;
  23. };
  24. Param param;
  25. std::atomic<int> callback_count(0);
  26. void httpCall(int excut_num)
  27. {
  28. int64_t _iTime = TC_TimeProvider::getInstance()->getNowMs();
  29. // string sServer1("http://134.175.105.92:8081/");
  30. string sServer1("http://127.0.0.1:8081/");
  31. TC_HttpRequest stHttpReq;
  32. stHttpReq.setCacheControl("no-cache");
  33. // stHttpReq.setGetRequest(sServer1);
  34. TC_TCPClient client ;
  35. // client.init("127.0.0.1", 8081, 3000);
  36. client.init("127.0.0.1", 8082, 3000);
  37. int iRet = 0;
  38. for (int i = 0; i<excut_num; i++)
  39. {
  40. TC_HttpResponse stHttpRsp;
  41. stHttpReq.setPostRequest(sServer1, TC_Common::tostr(i), true);
  42. iRet = stHttpReq.doRequest(stHttpRsp, 3000);
  43. // iRet = stHttpReq.doRequest(client,stHttpRsp);
  44. if (iRet != 0)
  45. {
  46. cout <<"pthread id: " << TC_Thread::CURRENT_THREADID() << ", iRet:" << iRet <<endl;
  47. }
  48. ++callback_count;
  49. }
  50. cout << "httpCall, succ:" << param.count << "/" << excut_num << ", " << TC_TimeProvider::getInstance()->getNowMs() - _iTime <<"(ms)"<<endl;
  51. }
  52. void syncRpc(int c)
  53. {
  54. int64_t t = TC_Common::now2us();
  55. //发起远程调用
  56. for (int i = 0; i < c; ++i)
  57. {
  58. string buff = string("helloworld-") + TC_Common::tostr(i);
  59. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>();
  60. req->setPostRequest("http://tars.com/hello", buff, true);
  61. // req->setHeader("Connection", "keep-alive");
  62. shared_ptr<TC_HttpResponse> rsp;
  63. try
  64. {
  65. param.servantPrx->tars_hash(i)->http_call("hello", req, rsp);
  66. assert(req->getContent() == rsp->getContent());
  67. }
  68. catch(exception& e)
  69. {
  70. cout << "exception:" << e.what() << endl;
  71. }
  72. // sleep(1);
  73. // assert(req.use_count() == 1);
  74. // assert(rsp.use_count() == 1);
  75. ++callback_count;
  76. }
  77. int64_t cost = TC_Common::now2us() - t;
  78. cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  79. }
  80. struct TestHttpCallback : public HttpCallback
  81. {
  82. TestHttpCallback(const string &buff) : _buff(buff)
  83. {
  84. }
  85. virtual int onHttpResponse(const shared_ptr<TC_HttpResponse> &rsp)
  86. {
  87. callback_count++;
  88. assert(_buff == rsp->getContent());
  89. return 0;
  90. }
  91. virtual int onHttpResponseException(int expCode)
  92. {
  93. cout << "onHttpResponseException expCode:" << expCode << endl;
  94. callback_count++;
  95. return 0;
  96. }
  97. string _buff;
  98. };
  99. void asyncRpc(int c)
  100. {
  101. int64_t t = TC_Common::now2us();
  102. //发起远程调用
  103. for (int i = 0; i < c; ++i)
  104. {
  105. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>();
  106. string buff = string("helloworld-") + TC_Common::tostr(i);
  107. req->setPostRequest("http://tars.com/hello", buff, true);
  108. // req->setHeader("Connection", "keep-alive");
  109. HttpCallbackPtr p = new TestHttpCallback(buff);
  110. try
  111. {
  112. param.servantPrx->http_call_async("hello", req, p);
  113. }
  114. catch(exception& e)
  115. {
  116. cout << "exception:" << e.what() << endl;
  117. }
  118. if(i % 500 == 0)
  119. {
  120. TC_Common::msleep(100);
  121. }
  122. }
  123. int64_t cost = TC_Common::now2us() - t;
  124. cout << "asyncRpc2 send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  125. }
  126. void asyncRpcCallback(int c)
  127. {
  128. int64_t t = TC_Common::now2us();
  129. _comm->setServantCustomCallback(param.servantPrx->tars_name(), [](ReqMessagePtr msg) {
  130. msg->callback->dispatch(msg);
  131. });
  132. //发起远程调用
  133. for (int i = 0; i < c; ++i)
  134. {
  135. shared_ptr<TC_HttpRequest> req = std::make_shared<TC_HttpRequest>();
  136. string buff = string("helloworld-") + TC_Common::tostr(i);
  137. req->setPostRequest("http://tars.com/hello", buff, true);
  138. // req->setHeader("Connection", "keep-alive");
  139. HttpCallbackPtr p = new TestHttpCallback(buff);
  140. try
  141. {
  142. param.servantPrx->http_call_async("hello", req, p);
  143. }
  144. catch(exception& e)
  145. {
  146. cout << "exception:" << e.what() << endl;
  147. }
  148. if(i % 500 == 0)
  149. {
  150. TC_Common::msleep(100);
  151. }
  152. }
  153. int64_t cost = TC_Common::now2us() - t;
  154. cout << "asyncRpc2 send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  155. }
  156. int main(int argc, char *argv[])
  157. {
  158. try
  159. {
  160. if (argc < 4)
  161. {
  162. cout << "Usage:" << argv[0] << "--count=1000 --call=[base|sync|async|callback] --thread=1" << endl;
  163. return 0;
  164. }
  165. TC_Option option;
  166. option.decode(argc, argv);
  167. param.count = TC_Common::strto<int>(option.getValue("count"));
  168. if(param.count <= 0) param.count = 1000;
  169. param.call = option.getValue("call");
  170. if(param.call.empty()) param.call = "sync";
  171. param.thread = TC_Common::strto<int>(option.getValue("thread"));
  172. if(param.thread <= 0) param.thread = 1;
  173. _comm = new Communicator();
  174. // LocalRollLogger::getInstance()->logger()->setLogLevel(6);
  175. _comm->setProperty("sendqueuelimit", "1000000");
  176. _comm->setProperty("asyncqueuecap", "1000000");
  177. param.servantPrx = _comm->stringToProxy<ServantPrx>(httpObj);
  178. int64_t start = TC_Common::now2us();
  179. std::function<void(int)> func;
  180. if (param.call == "base")
  181. {
  182. func = httpCall;
  183. }
  184. else if (param.call == "sync")
  185. {
  186. func = syncRpc;
  187. }
  188. else if(param.call == "async")
  189. {
  190. func = asyncRpc;
  191. }
  192. else if(param.call == "callback")
  193. {
  194. func = asyncRpcCallback;
  195. }
  196. else
  197. {
  198. cout << "no func, exits" << endl;
  199. exit(0);
  200. }
  201. param.servantPrx->tars_connect_timeout(5000);
  202. param.servantPrx->tars_async_timeout(60*1000);
  203. param.servantPrx->tars_set_protocol(ServantProxy::PROTOCOL_HTTP1, 3);
  204. vector<std::thread*> vt;
  205. for(int i = 0 ; i< param.thread; i++)
  206. {
  207. vt.push_back(new std::thread(func, param.count));
  208. }
  209. std::thread print([&]{while(callback_count != param.count * param.thread) {
  210. cout << "Http:" << param.call << ": ----------finish count:" << callback_count << endl;
  211. std::this_thread::sleep_for(std::chrono::seconds(1));
  212. };});
  213. for(size_t i = 0 ; i< vt.size(); i++)
  214. {
  215. vt[i]->join();
  216. delete vt[i];
  217. }
  218. cout << "(pid:" << std::this_thread::get_id() << ")"
  219. << "(count:" << param.count << ")"
  220. << "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
  221. << endl;
  222. while(callback_count != param.count * param.thread) {
  223. std::this_thread::sleep_for(std::chrono::seconds(1));
  224. }
  225. print.join();
  226. cout << "----------finish count:" << callback_count << endl;
  227. delete _comm;
  228. _comm = NULL;
  229. }
  230. catch(exception &ex)
  231. {
  232. cout << ex.what() << endl;
  233. }
  234. cout << "main return." << endl;
  235. return 0;
  236. }