TestRecvThread.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #include "TestRecvThread.h"
  2. #include <iostream>
  3. #include <arpa/inet.h>
  4. /*
  5. 响应包解码函数,根据特定格式解码从服务端收到的数据,解析为ResponsePacket
  6. */
  7. static size_t pushResponse(const char* recvBuffer, size_t length, list<ResponsePacket>& done)
  8. {
  9. size_t pos = 0;
  10. while (pos < length)
  11. {
  12. unsigned int len = length - pos;
  13. if(len < sizeof(unsigned int))
  14. {
  15. break;
  16. }
  17. unsigned int iHeaderLen = ntohl(*(unsigned int*)(recvBuffer + pos));
  18. //做一下保护,长度大于M
  19. if (iHeaderLen > 100000 || iHeaderLen < sizeof(unsigned int))
  20. {
  21. throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
  22. }
  23. //包没有接收全
  24. if (len < iHeaderLen)
  25. {
  26. break;
  27. }
  28. else
  29. {
  30. ResponsePacket rsp;
  31. rsp.iRequestId = ntohl(*((unsigned int *)(recvBuffer + pos + sizeof(unsigned int))));
  32. rsp.sBuffer.resize(iHeaderLen - 2*sizeof(unsigned int));
  33. ::memcpy(&rsp.sBuffer[0], recvBuffer + pos + 2*sizeof(unsigned int), iHeaderLen - 2*sizeof(unsigned int));
  34. pos += iHeaderLen;
  35. done.push_back(rsp);
  36. }
  37. }
  38. return pos;
  39. }
  40. /*
  41. 请求包编码函数,本函数的打包格式为
  42. 整个包长度(字节)+iRequestId(字节)+包内容
  43. */
  44. static void pushRequest(const RequestPacket& request, string& buff)
  45. {
  46. unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
  47. unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
  48. buff = "";
  49. for (int i = 0; i<4; ++i)
  50. {
  51. buff += *bufflengthptr++;
  52. }
  53. unsigned int netrequestId = htonl(request.iRequestId);
  54. unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
  55. for (int i = 0; i<4; ++i)
  56. {
  57. buff += *netrequestIdptr++;
  58. }
  59. string tmp;
  60. tmp.assign((const char*)(&request.sBuffer[0]), request.sBuffer.size());
  61. buff+=tmp;
  62. }
  63. static void printResult(int iRequestId, const string &sResponseStr)
  64. {
  65. cout << "request id: " << iRequestId << endl;
  66. cout << "response str: " << sResponseStr << endl;
  67. }
  68. static void printPushInfo(const string &sResponseStr)
  69. {
  70. cout << "push message: " << sResponseStr << endl;
  71. }
  72. int TestPushCallBack::onDispatch(ReqMessagePtr msg)
  73. {
  74. if(msg->request.sFuncName == "printResult")
  75. {
  76. string sRet;
  77. cout << "sBuffer: " << msg->response.sBuffer.size() << endl;
  78. sRet.assign(&(msg->response.sBuffer[0]), msg->response.sBuffer.size());
  79. printResult(msg->request.iRequestId, sRet);
  80. return 0;
  81. }
  82. else if(msg->response.iRequestId == 0)
  83. {
  84. string sRet;
  85. sRet.assign(&(msg->response.sBuffer[0]), msg->response.sBuffer.size());
  86. printPushInfo(sRet);
  87. return 0;
  88. }
  89. else
  90. {
  91. cout << "no match func!" <<endl;
  92. }
  93. return -3;
  94. }
  95. RecvThread::RecvThread():_bTerminate(false)
  96. {
  97. string sObjName = "Test.TestPushServer.TestPushServantObj";
  98. string sObjHost = "tcp -h 10.120.129.226 -t 60000 -p 10099";
  99. _prx = _comm.stringToProxy<ServantPrx>(sObjName+"@"+sObjHost);
  100. ProxyProtocol prot;
  101. prot.requestFunc = pushRequest;
  102. prot.responseFunc = pushResponse;
  103. _prx->tars_set_protocol(prot);
  104. }
  105. void RecvThread::terminate()
  106. {
  107. _bTerminate = true;
  108. {
  109. tars::TC_ThreadLock::Lock sync(*this);
  110. notifyAll();
  111. }
  112. }
  113. void RecvThread::run(void)
  114. {
  115. TestPushCallBackPtr cbPush = new TestPushCallBack();
  116. _prx->tars_set_push_callback(cbPush);
  117. string buf("heartbeat");
  118. while(!_bTerminate)
  119. {
  120. {
  121. try
  122. {
  123. TestPushCallBackPtr cb = new TestPushCallBack();
  124. _prx->rpc_call_async(_prx->tars_gen_requestid(), "printResult", buf.c_str(), buf.length(), cb);
  125. }
  126. catch(TarsException& e)
  127. {
  128. cout << "TarsException: " << e.what() << endl;
  129. }
  130. catch(...)
  131. {
  132. cout << "unknown exception" << endl;
  133. }
  134. }
  135. {
  136. TC_ThreadLock::Lock sync(*this);
  137. timedWait(5000);
  138. }
  139. }
  140. }