main.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. #include "servant/Application.h"
  2. #include "util/tc_network_buffer.h"
  3. #include <iostream>
  4. using namespace std;
  5. using namespace tars;
  6. Communicator* _comm;
  7. static string sObjName = "TestApp.CustomServer.CustomServantObj@tcp -h 127.0.0.1 -t 60000 -p 9400";
  8. struct Param
  9. {
  10. int count;
  11. string call;
  12. int thread;
  13. int buffersize;
  14. int netthread;
  15. ServantPrx servantPrx;
  16. };
  17. Param param;
  18. std::atomic<int> callback_count(0);
  19. //The response packet decoding function decodes the data received from the server according to the specific format and resolves it to theResponsePacket
  20. static TC_NetWorkBuffer::PACKET_TYPE customResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp)
  21. {
  22. size_t len = sizeof(tars::Int32);
  23. if (in.getBufferLength() < len)
  24. {
  25. return TC_NetWorkBuffer::PACKET_LESS;
  26. }
  27. string header;
  28. in.getHeader(len, header);
  29. assert(header.size() == len);
  30. tars::Int32 iHeaderLen = 0;
  31. ::memcpy(&iHeaderLen, header.c_str(), sizeof(tars::Int32));
  32. iHeaderLen = ntohl(iHeaderLen);
  33. if (iHeaderLen > 100000 || iHeaderLen < (int)sizeof(unsigned int))
  34. {
  35. throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
  36. }
  37. if (in.getBufferLength() < (uint32_t)iHeaderLen)
  38. {
  39. return TC_NetWorkBuffer::PACKET_LESS;
  40. }
  41. in.moveHeader(sizeof(iHeaderLen));
  42. tars::Int32 iRequestId = 0;
  43. string sRequestId;
  44. in.getHeader(sizeof(iRequestId), sRequestId);
  45. in.moveHeader(sizeof(iRequestId));
  46. rsp.iRequestId = ntohl(*((unsigned int *)(sRequestId.c_str())));
  47. len = iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId);
  48. in.getHeader(len, rsp.sBuffer);
  49. in.moveHeader(len);
  50. return TC_NetWorkBuffer::PACKET_FULL;
  51. }
  52. /*
  53. Whole package length (4 bytes) + irequestid (4 bytes) + package content
  54. */
  55. static shared_ptr<TC_NetWorkBuffer::Buffer> customRequest(RequestPacket& request, TC_Transceiver *)
  56. {
  57. shared_ptr<TC_NetWorkBuffer::Buffer> buff = std::make_shared<TC_NetWorkBuffer::Buffer>();
  58. unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
  59. unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
  60. vector<char> buffer;
  61. buffer.resize(request.sBuffer.size()+8);
  62. memcpy(buffer.data(), bufflengthptr, sizeof(unsigned int));
  63. unsigned int netrequestId = htonl(request.iRequestId);
  64. unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
  65. memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int));
  66. memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size());
  67. buff->addBuffer(buffer);
  68. return buff;
  69. }
  70. class CustomCallBack : public ServantProxyCallback
  71. {
  72. public:
  73. virtual int onDispatch(ReqMessagePtr msg)
  74. {
  75. if(msg->response->iRet != tars::TARSSERVERSUCCESS)
  76. {
  77. cout << "ret error:" << msg->response->iRet << endl;
  78. }
  79. else
  80. {
  81. // cout << "succ:" << string(msg->response->sBuffer.data(), msg->response->sBuffer.size()) << endl;
  82. }
  83. ++callback_count;
  84. return msg->response->iRet;
  85. }
  86. };
  87. typedef tars::TC_AutoPtr<CustomCallBack> CustomCallBackPtr;
  88. void syncCall(int c)
  89. {
  90. string buffer(param.buffersize, 'a');
  91. int64_t t = TC_Common::now2us();
  92. //发起远程调用
  93. for (int i = 0; i < c; ++i)
  94. {
  95. string r;
  96. try
  97. {
  98. ResponsePacket rsp;
  99. param.servantPrx->rpc_call(param.servantPrx->tars_gen_requestid(), "doCustomFunc", buffer.c_str(), buffer.length(), rsp);
  100. }
  101. catch(exception& e)
  102. {
  103. cout << "exception:" << e.what() << endl;
  104. }
  105. ++callback_count;
  106. }
  107. int64_t cost = TC_Common::now2us() - t;
  108. cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  109. }
  110. void asyncCall(int c)
  111. {
  112. int64_t t = TC_Common::now2us();
  113. string buffer(param.buffersize, 'a');
  114. //发起远程调用
  115. for (int i = 0; i < c; ++i)
  116. {
  117. try
  118. {
  119. CustomCallBackPtr cb = new CustomCallBack();
  120. param.servantPrx->rpc_call_async(param.servantPrx->tars_gen_requestid(), "doCustomFunc", buffer.c_str(), buffer.length(), cb);
  121. }
  122. catch(exception& e)
  123. {
  124. cout << "exception:" << e.what() << endl;
  125. }
  126. }
  127. int64_t cost = TC_Common::now2us() - t;
  128. cout << "asyncCall send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  129. }
  130. int main(int argc, char *argv[])
  131. {
  132. try
  133. {
  134. if (argc < 6)
  135. {
  136. cout << "Usage:" << argv[0] << "--count=1000 --call=[sync|async] --thread=1 --buffersize=1000 --netthread=1" << endl;
  137. return 0;
  138. }
  139. TC_Option option;
  140. option.decode(argc, argv);
  141. param.count = TC_Common::strto<int>(option.getValue("count"));
  142. if(param.count <= 0) param.count = 1000;
  143. param.buffersize = TC_Common::strto<int>(option.getValue("buffersize"));
  144. if(param.buffersize <= 0) param.buffersize = 1000;
  145. param.call = option.getValue("call");
  146. if(param.call.empty()) param.call = "sync";
  147. param.thread = TC_Common::strto<int>(option.getValue("thread"));
  148. if(param.thread <= 0) param.thread = 1;
  149. param.netthread = TC_Common::strto<int>(option.getValue("netthread"));
  150. if(param.netthread <= 0) param.netthread = 1;
  151. _comm = new Communicator();
  152. _comm->setProperty("sendqueuelimit", "1000000");
  153. _comm->setProperty("asyncqueuecap", "1000000");
  154. _comm->setProperty("netthread", TC_Common::tostr(param.netthread));
  155. param.servantPrx = _comm->stringToProxy<ServantPrx>(sObjName);
  156. // LocalRollLogger::getInstance()->logger()->setLogLevel(6);
  157. ProxyProtocol prot;
  158. prot.requestFunc = customRequest;
  159. prot.responseFunc = customResponse;
  160. param.servantPrx->tars_set_protocol(prot);
  161. param.servantPrx->tars_connect_timeout(5000);
  162. param.servantPrx->tars_async_timeout(60*1000);
  163. int64_t start = TC_Common::now2us();
  164. std::function<void(int)> func;
  165. if (param.call == "sync")
  166. {
  167. func = syncCall;
  168. }
  169. else if (param.call == "async")
  170. {
  171. func = asyncCall;
  172. }
  173. else
  174. {
  175. cout << "no func, exits" << endl;
  176. exit(0);
  177. }
  178. vector<std::thread*> vt;
  179. for(int i = 0 ; i< param.thread; i++)
  180. {
  181. vt.push_back(new std::thread(func, param.count));
  182. }
  183. std::thread print([&]{while(callback_count != param.count * param.thread) {
  184. cout << "Custom:" << param.call << ": ----------finish count:" << callback_count << endl;
  185. std::this_thread::sleep_for(std::chrono::seconds(1));
  186. };});
  187. for(size_t i = 0 ; i< vt.size(); i++)
  188. {
  189. vt[i]->join();
  190. delete vt[i];
  191. }
  192. cout << "(pid:" << std::this_thread::get_id() << ")"
  193. << "(count:" << param.count << ")"
  194. << "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
  195. << endl;
  196. while(callback_count != param.count * param.thread) {
  197. std::this_thread::sleep_for(std::chrono::seconds(1));
  198. }
  199. print.join();
  200. cout << "----------finish count:" << callback_count << endl;
  201. }
  202. catch(exception &ex)
  203. {
  204. cout << ex.what() << endl;
  205. }
  206. cout << "main return." << endl;
  207. return 0;
  208. }