main.cpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "BServant.h"
  17. #include "servant/Communicator.h"
  18. #include "util/tc_thread.h"
  19. #include "util/tc_option.h"
  20. #include <iostream>
  21. using namespace std;
  22. using namespace Test;
  23. using namespace tars;
  24. Communicator* _comm;
  25. static string coroObj = "TestApp.BServer.BServantObj@tcp -h 127.0.0.1 -p 9200";
  26. struct Param
  27. {
  28. int count;
  29. string call;
  30. int thread;
  31. int buffersize;
  32. int netthread;
  33. BServantPrx pPrx;
  34. };
  35. Param param;
  36. std::atomic<int> callback_count(0);
  37. void serialCall(int c)
  38. {
  39. string buffer(param.buffersize, 'a');
  40. int64_t t = TC_Common::now2us();
  41. //发起远程调用
  42. for (int i = 0; i < c; ++i)
  43. {
  44. string r;
  45. try
  46. {
  47. param.pPrx->testCoroSerial(buffer, r);
  48. }
  49. catch(exception& e)
  50. {
  51. cout << "exception:" << e.what() << endl;
  52. }
  53. ++callback_count;
  54. }
  55. int64_t cost = TC_Common::now2us() - t;
  56. cout << "serialCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  57. }
  58. void parallelCall(int c)
  59. {
  60. string buffer(param.buffersize, 'a');
  61. int64_t t = TC_Common::now2us();
  62. //发起远程调用
  63. for (int i = 0; i < c; ++i)
  64. {
  65. string r;
  66. try
  67. {
  68. param.pPrx->testCoroParallel(buffer, r);
  69. }
  70. catch(exception& e)
  71. {
  72. cout << "exception:" << e.what() << endl;
  73. }
  74. ++callback_count;
  75. }
  76. int64_t cost = TC_Common::now2us() - t;
  77. cout << "parallelCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  78. }
  79. //
  80. //class Test1
  81. //{
  82. //public:
  83. // Test1();
  84. //
  85. // ~Test1();
  86. //
  87. // void queryResult(int iFlag, int iExecuteNum);
  88. //
  89. //private:
  90. // Communicator _comm;
  91. // BServantPrx _prx;
  92. //};
  93. //
  94. //Test1::Test1()
  95. //{
  96. // // _comm.setProperty("locator", "tars.tarsregistry.QueryObj@tcp -h 10.208.139.242 -p 17890 -t 10000");
  97. // // _comm.setProperty("stat", "tars.tarsstat.StatObj");
  98. // _prx = _comm.stringToProxy<BServantPrx>("TestApp.BServer.BServantObj@tcp -h 127.0.0.1 -p 9200");
  99. //}
  100. //
  101. //Test1::~Test1()
  102. //{
  103. //
  104. //}
  105. //
  106. //void Test1::queryResult(int iFlag, int iExecuteNum)
  107. //{
  108. // string sIn(10,'a');
  109. // string sOut("");
  110. //
  111. // time_t t = TC_Common::now2us();
  112. //
  113. // for(int i=0; i<iExecuteNum; i++)
  114. // {
  115. // // sOut = "";
  116. // try
  117. // {
  118. // int ret = -1;
  119. // if(iFlag == 0)
  120. // {
  121. // ret = _prx->testCoroSerial(sIn, sOut);
  122. // }
  123. // else
  124. // {
  125. // ret = _prx->testCoroParallel(sIn, sOut);
  126. // }
  127. //
  128. // assert(sIn == sOut);
  129. // // cout << ret << ", " << sIn << ", " << sOut << endl;
  130. // }
  131. // catch(TC_Exception &e)
  132. // {
  133. // cout << "pthread id: " << std::this_thread::get_id() << "id: " << i << "exception: " << e.what() << endl;
  134. // }
  135. // catch(...)
  136. // {
  137. // cout << "pthread id: " << std::this_thread::get_id() << "id: " << i << "unknown exception." << endl;
  138. // }
  139. // }
  140. //
  141. // int64_t cost = TC_Common::now2us() - t;
  142. // cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/iExecuteNum << "us" << endl;
  143. //}
  144. int main(int argc, char *argv[])
  145. {
  146. try
  147. {
  148. if (argc < 5)
  149. {
  150. cout << "Usage:" << argv[0] << " --count=1000 --call=[serial|parallel] --thread=1 --buffersize=1000 --netthread=1" << endl;
  151. return 0;
  152. }
  153. TC_Option option;
  154. option.decode(argc, argv);
  155. param.count = TC_Common::strto<int>(option.getValue("count"));
  156. if(param.count <= 0) param.count = 1000;
  157. param.buffersize = TC_Common::strto<int>(option.getValue("buffersize"));
  158. if(param.buffersize <= 0) param.buffersize = 1000;
  159. param.call = option.getValue("call");
  160. if(param.call.empty()) param.call = "sync";
  161. param.thread = TC_Common::strto<int>(option.getValue("thread"));
  162. if(param.thread <= 0) param.thread = 1;
  163. param.netthread = TC_Common::strto<int>(option.getValue("netthread"));
  164. if(param.netthread <= 0) param.netthread = 1;
  165. _comm = new Communicator();
  166. param.pPrx = _comm->stringToProxy<BServantPrx>(coroObj);
  167. // LocalRollLogger::getInstance()->logger()->setLogLevel(6);
  168. _comm->setProperty("sendqueuelimit", "1000000");
  169. _comm->setProperty("asyncqueuecap", "1000000");
  170. _comm->setProperty("netthread", TC_Common::tostr(param.netthread));
  171. int64_t start = TC_Common::now2us();
  172. std::function<void(int)> func;
  173. if (param.call == "serial")
  174. {
  175. func = serialCall;
  176. }
  177. else if (param.call == "parallel")
  178. {
  179. func = parallelCall;
  180. }
  181. param.pPrx->tars_connect_timeout(5000);
  182. param.pPrx->tars_async_timeout(60*1000);
  183. vector<std::thread*> vt;
  184. for(int i = 0 ; i< param.thread; i++)
  185. {
  186. vt.push_back(new std::thread(func, param.count));
  187. }
  188. std::thread print([&]{while(callback_count != param.count * param.thread) {
  189. cout << "coro:" << param.call << ": ----------finish count:" << callback_count << endl;
  190. std::this_thread::sleep_for(std::chrono::seconds(1));
  191. };});
  192. for(size_t i = 0 ; i< vt.size(); i++)
  193. {
  194. vt[i]->join();
  195. delete vt[i];
  196. }
  197. cout << "(pid:" << std::this_thread::get_id() << ")"
  198. << "(count:" << param.count << ")"
  199. << "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
  200. << endl;
  201. while(callback_count != param.count * param.thread) {
  202. std::this_thread::sleep_for(std::chrono::seconds(1));
  203. }
  204. print.join();
  205. cout << "----------finish count:" << callback_count << endl;
  206. }
  207. catch(exception &ex)
  208. {
  209. cout << ex.what() << endl;
  210. }
  211. cout << "main return." << endl;
  212. return 0;
  213. }