main.cpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. #include "servant/Application.h"
  2. #include "util/tc_network_buffer.h"
  3. #include <iostream>
  4. using namespace std;
  5. using namespace tars;
  6. Communicator* _comm;
  7. static string sObjName = "TestApp.CustomServer.CustomServantObj@tcp -h 127.0.0.1 -t 60000 -p 9400";
  8. struct Param
  9. {
  10. int count;
  11. string call;
  12. int thread;
  13. int buffersize;
  14. int netthread;
  15. ServantPrx servantPrx;
  16. };
  17. Param param;
  18. std::atomic<int> callback_count(0);
  19. //The response packet decoding function decodes the data received from the server according to the specific format and resolves it to theResponsePacket
  20. static TC_NetWorkBuffer::PACKET_TYPE customResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp)
  21. {
  22. size_t len = sizeof(tars::Int32);
  23. if (in.getBufferLength() < len)
  24. {
  25. return TC_NetWorkBuffer::PACKET_LESS;
  26. }
  27. string header;
  28. in.getHeader(len, header);
  29. assert(header.size() == len);
  30. tars::Int32 iHeaderLen = 0;
  31. ::memcpy(&iHeaderLen, header.c_str(), sizeof(tars::Int32));
  32. iHeaderLen = ntohl(iHeaderLen);
  33. if (iHeaderLen > 100000 || iHeaderLen < (int)sizeof(unsigned int))
  34. {
  35. throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
  36. }
  37. if (in.getBufferLength() < (uint32_t)iHeaderLen)
  38. {
  39. return TC_NetWorkBuffer::PACKET_LESS;
  40. }
  41. in.moveHeader(sizeof(iHeaderLen));
  42. tars::Int32 iRequestId = 0;
  43. string sRequestId;
  44. in.getHeader(sizeof(iRequestId), sRequestId);
  45. in.moveHeader(sizeof(iRequestId));
  46. rsp.iRequestId = ntohl(*((unsigned int *)(sRequestId.c_str())));
  47. len = iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId);
  48. in.getHeader(len, rsp.sBuffer);
  49. in.moveHeader(len);
  50. return TC_NetWorkBuffer::PACKET_FULL;
  51. }
  52. /*
  53. Whole package length (4 bytes) + irequestid (4 bytes) + package content
  54. */
  55. static vector<char> customRequest(RequestPacket& request, Transceiver *)
  56. {
  57. unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
  58. unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
  59. vector<char> buffer;
  60. buffer.resize(request.sBuffer.size()+8);
  61. memcpy(buffer.data(), bufflengthptr, sizeof(unsigned int));
  62. unsigned int netrequestId = htonl(request.iRequestId);
  63. unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
  64. memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int));
  65. memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size());
  66. return buffer;
  67. }
  68. class CustomCallBack : public ServantProxyCallback
  69. {
  70. public:
  71. virtual int onDispatch(ReqMessagePtr msg)
  72. {
  73. if(msg->response->iRet != tars::TARSSERVERSUCCESS)
  74. {
  75. cout << "ret error:" << msg->response->iRet << endl;
  76. }
  77. else
  78. {
  79. // cout << "succ:" << string(msg->response->sBuffer.data(), msg->response->sBuffer.size()) << endl;
  80. }
  81. ++callback_count;
  82. return msg->response->iRet;
  83. }
  84. };
  85. typedef tars::TC_AutoPtr<CustomCallBack> CustomCallBackPtr;
  86. void syncCall(int c)
  87. {
  88. string buffer(param.buffersize, 'a');
  89. int64_t t = TC_Common::now2us();
  90. //发起远程调用
  91. for (int i = 0; i < c; ++i)
  92. {
  93. string r;
  94. try
  95. {
  96. ResponsePacket rsp;
  97. param.servantPrx->rpc_call(param.servantPrx->tars_gen_requestid(), "doCustomFunc", buffer.c_str(), buffer.length(), rsp);
  98. }
  99. catch(exception& e)
  100. {
  101. cout << "exception:" << e.what() << endl;
  102. }
  103. ++callback_count;
  104. }
  105. int64_t cost = TC_Common::now2us() - t;
  106. cout << "syncCall total:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  107. }
  108. void asyncCall(int c)
  109. {
  110. int64_t t = TC_Common::now2us();
  111. string buffer(param.buffersize, 'a');
  112. //发起远程调用
  113. for (int i = 0; i < c; ++i)
  114. {
  115. try
  116. {
  117. CustomCallBackPtr cb = new CustomCallBack();
  118. param.servantPrx->rpc_call_async(param.servantPrx->tars_gen_requestid(), "doCustomFunc", buffer.c_str(), buffer.length(), cb);
  119. }
  120. catch(exception& e)
  121. {
  122. cout << "exception:" << e.what() << endl;
  123. }
  124. }
  125. int64_t cost = TC_Common::now2us() - t;
  126. cout << "asyncCall send:" << cost << "us, avg:" << 1.*cost/c << "us" << endl;
  127. }
  128. int main(int argc, char *argv[])
  129. {
  130. try
  131. {
  132. if (argc < 6)
  133. {
  134. cout << "Usage:" << argv[0] << "--count=1000 --call=[sync|async] --thread=1 --buffersize=1000 --netthread=1" << endl;
  135. return 0;
  136. }
  137. TC_Option option;
  138. option.decode(argc, argv);
  139. param.count = TC_Common::strto<int>(option.getValue("count"));
  140. if(param.count <= 0) param.count = 1000;
  141. param.buffersize = TC_Common::strto<int>(option.getValue("buffersize"));
  142. if(param.buffersize <= 0) param.buffersize = 1000;
  143. param.call = option.getValue("call");
  144. if(param.call.empty()) param.call = "sync";
  145. param.thread = TC_Common::strto<int>(option.getValue("thread"));
  146. if(param.thread <= 0) param.thread = 1;
  147. param.netthread = TC_Common::strto<int>(option.getValue("netthread"));
  148. if(param.netthread <= 0) param.netthread = 1;
  149. _comm = new Communicator();
  150. _comm->setProperty("sendqueuelimit", "1000000");
  151. _comm->setProperty("asyncqueuecap", "1000000");
  152. _comm->setProperty("netthread", TC_Common::tostr(param.netthread));
  153. param.servantPrx = _comm->stringToProxy<ServantPrx>(sObjName);
  154. // LocalRollLogger::getInstance()->logger()->setLogLevel(6);
  155. ProxyProtocol prot;
  156. prot.requestFunc = customRequest;
  157. prot.responseFunc = customResponse;
  158. param.servantPrx->tars_set_protocol(prot);
  159. param.servantPrx->tars_connect_timeout(5000);
  160. param.servantPrx->tars_async_timeout(60*1000);
  161. int64_t start = TC_Common::now2us();
  162. std::function<void(int)> func;
  163. if (param.call == "sync")
  164. {
  165. func = syncCall;
  166. }
  167. else if (param.call == "async")
  168. {
  169. func = asyncCall;
  170. }
  171. else
  172. {
  173. cout << "no func, exits" << endl;
  174. exit(0);
  175. }
  176. vector<std::thread*> vt;
  177. for(int i = 0 ; i< param.thread; i++)
  178. {
  179. vt.push_back(new std::thread(func, param.count));
  180. }
  181. std::thread print([&]{while(callback_count != param.count * param.thread) {
  182. cout << "Custom:" << param.call << ": ----------finish count:" << callback_count << endl;
  183. std::this_thread::sleep_for(std::chrono::seconds(1));
  184. };});
  185. for(size_t i = 0 ; i< vt.size(); i++)
  186. {
  187. vt[i]->join();
  188. delete vt[i];
  189. }
  190. cout << "(pid:" << std::this_thread::get_id() << ")"
  191. << "(count:" << param.count << ")"
  192. << "(use ms:" << (TC_Common::now2us() - start)/1000 << ")"
  193. << endl;
  194. while(callback_count != param.count * param.thread) {
  195. std::this_thread::sleep_for(std::chrono::seconds(1));
  196. }
  197. print.join();
  198. cout << "----------finish count:" << callback_count << endl;
  199. }
  200. catch(exception &ex)
  201. {
  202. cout << ex.what() << endl;
  203. }
  204. cout << "main return." << endl;
  205. return 0;
  206. }