// TestRecvThread.cpp (4.0 KB)
  1. #include "TestRecvThread.h"
  2. #include <iostream>
  3. //#include <arpa/inet.h>
  4. /*
  5. 响应包解码函数,根据特定格式解码从服务端收到的数据,解析为ResponsePacket
  6. */
  7. static TC_NetWorkBuffer::PACKET_TYPE pushResponse(TC_NetWorkBuffer &in, ResponsePacket& rsp)
  8. {
  9. size_t len = sizeof(tars::Int32);
  10. if (in.getBufferLength() < len)
  11. {
  12. return TC_NetWorkBuffer::PACKET_LESS;
  13. }
  14. string header;
  15. in.getHeader(len, header);
  16. assert(header.size() == len);
  17. tars::Int32 iHeaderLen = 0;
  18. ::memcpy(&iHeaderLen, header.c_str(), sizeof(tars::Int32));
  19. iHeaderLen = ntohl(iHeaderLen);
  20. //做一下保护,长度大于M
  21. if (iHeaderLen > 100000 || iHeaderLen < (int)sizeof(unsigned int))
  22. {
  23. throw TarsDecodeException("packet length too long or too short,len:" + TC_Common::tostr(iHeaderLen));
  24. }
  25. //包没有接收全
  26. if (in.getBufferLength() < (uint32_t)iHeaderLen)
  27. {
  28. return TC_NetWorkBuffer::PACKET_LESS;
  29. }
  30. in.moveHeader(sizeof(iHeaderLen));
  31. tars::Int32 iRequestId = 0;
  32. string sRequestId;
  33. in.getHeader(sizeof(iRequestId), sRequestId);
  34. in.moveHeader(sizeof(iRequestId));
  35. rsp.iRequestId = ntohl(*((unsigned int *)(sRequestId.c_str())));
  36. len = iHeaderLen - sizeof(iHeaderLen) - sizeof(iRequestId);
  37. in.getHeader(len, rsp.sBuffer);
  38. in.moveHeader(len);
  39. return TC_NetWorkBuffer::PACKET_FULL;
  40. }
  41. /*
  42. 请求包编码函数,本函数的打包格式为
  43. 整个包长度(4字节)+iRequestId(4字节)+包内容
  44. */
  45. static shared_ptr<TC_NetWorkBuffer::Buffer> pushRequest(RequestPacket& request, TC_Transceiver *)
  46. {
  47. shared_ptr<TC_NetWorkBuffer::Buffer> buff = std::make_shared<TC_NetWorkBuffer::Buffer>();
  48. unsigned int net_bufflength = htonl(request.sBuffer.size()+8);
  49. unsigned char * bufflengthptr = (unsigned char*)(&net_bufflength);
  50. vector<char> buffer;
  51. buffer.resize(request.sBuffer.size()+8);
  52. memcpy(buffer.data(), bufflengthptr, sizeof(unsigned int));
  53. unsigned int netrequestId = htonl(request.iRequestId);
  54. unsigned char * netrequestIdptr = (unsigned char*)(&netrequestId);
  55. memcpy(buffer.data() + sizeof(unsigned int), netrequestIdptr, sizeof(unsigned int));
  56. memcpy(buffer.data() + sizeof(unsigned int) * 2, request.sBuffer.data(), request.sBuffer.size());
  57. buff->addBuffer(buffer);
  58. return buff;
  59. // sbuff->addBuffer(buffer);
  60. }
  61. static void printResult(int iRequestId, const string &sResponseStr)
  62. {
  63. cout << "request id: " << iRequestId << ", response str: " << sResponseStr << endl;
  64. }
  65. static void printPushInfo(const string &sResponseStr)
  66. {
  67. cout << "push message: " << sResponseStr << endl;
  68. }
  69. int TestPushCallBack::onDispatch(ReqMessagePtr msg)
  70. {
  71. if(msg->request.sFuncName == "printResult")
  72. {
  73. string sRet;
  74. sRet.assign(msg->response->sBuffer.data(), msg->response->sBuffer.size());
  75. printResult(msg->request.iRequestId, sRet);
  76. return 0;
  77. }
  78. else if(msg->response->iRequestId == 0)
  79. {
  80. string sRet;
  81. sRet.assign(msg->response->sBuffer.data(), msg->response->sBuffer.size());
  82. printPushInfo(sRet);
  83. return 0;
  84. }
  85. else
  86. {
  87. cout << "no match func!" <<endl;
  88. }
  89. return -3;
  90. }
  91. RecvThread::RecvThread(int second):_second(second), _bTerminate(false)
  92. {
  93. string sObjName = "TestApp.PushServer.TestPushServantObj@tcp -h 127.0.0.1 -t 60000 -p 9300";
  94. _prx = _comm.stringToProxy<ServantPrx>(sObjName);
  95. ProxyProtocol prot;
  96. prot.requestFunc = pushRequest;
  97. prot.responseFunc = pushResponse;
  98. _prx->tars_set_protocol(prot);
  99. }
  100. void RecvThread::run(void)
  101. {
  102. TestPushCallBackPtr cbPush = new TestPushCallBack();
  103. _prx->tars_set_push_callback(cbPush);
  104. string buf("heartbeat");
  105. time_t n = TNOW;
  106. while(!_bTerminate)
  107. {
  108. {
  109. try
  110. {
  111. TestPushCallBackPtr cb = new TestPushCallBack();
  112. _prx->rpc_call_async(_prx->tars_gen_requestid(), "printResult", buf.c_str(), buf.length(), cb);
  113. }
  114. catch(TarsException& e)
  115. {
  116. cout << "TarsException: " << e.what() << endl;
  117. }
  118. catch(...)
  119. {
  120. cout << "unknown exception" << endl;
  121. }
  122. }
  123. if(TNOW - n >= _second)
  124. {
  125. _bTerminate = true;
  126. break;
  127. }
  128. {
  129. TC_ThreadLock::Lock sync(*this);
  130. timedWait(500);
  131. }
  132. }
  133. }