Current.cpp 8.8 KB


  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/Current.h"
  17. #include "servant/ServantHandle.h"
  18. #include "servant/BaseF.h"
  19. #include "servant/Application.h"
  20. #include "tup/tup.h"
  21. #include <cerrno>
  22. namespace tars
  23. {
  24. //////////////////////////////////////////////////////////////////
  25. Current::Current(ServantHandle *pServantHandle)
  26. : _servantHandle(pServantHandle)
  27. , _response(true)
  28. , _ret(0)
  29. , _reportStat(true)
  30. {
  31. }
  32. Current::~Current()
  33. {
  34. //TUP调用或单向调用,从服务端上报调用信息
  35. if(_reportStat)
  36. {
  37. if(_request.iVersion == TUPVERSION )
  38. {
  39. reportToStat("tup_client");
  40. }
  41. else if(_request.cPacketType == TARSONEWAY)
  42. {
  43. reportToStat("one_way_client");
  44. }
  45. else if(!_data->adapter()->isTarsProtocol() && ServerConfig::ReportFlow)
  46. {
  47. //非tars客户端 从服务端上报调用信息
  48. reportToStat("not_tars_client");
  49. }
  50. }
  51. }
  52. const string &Current::getHostName() const
  53. {
  54. auto it = _request.context.find("node_name");
  55. if(it != _request.context.end())
  56. {
  57. return it->second;
  58. }
  59. return _data->ip();
  60. }
  61. const string &Current::getIp() const
  62. {
  63. return _data->ip();
  64. }
  65. int Current::getPort() const
  66. {
  67. return _data->port();
  68. }
  69. uint32_t Current::getUId() const
  70. {
  71. return _data->uid();
  72. }
  73. string Current::getServantName() const
  74. {
  75. return _request.sServantName;
  76. }
  77. short Current::getRequestVersion() const
  78. {
  79. return _request.iVersion;
  80. }
  81. map<string, string>& Current::getContext()
  82. {
  83. return _request.context;
  84. }
  85. const map<string, string>& Current::getRequestStatus() const
  86. {
  87. return _request.status;
  88. }
  89. string Current::getFuncName() const
  90. {
  91. return _request.sFuncName;
  92. }
  93. uint32_t Current::getRequestId() const
  94. {
  95. return _request.iRequestId;
  96. }
  97. char Current::getPacketType() const
  98. {
  99. return _request.cPacketType;
  100. }
  101. int Current::getMessageType() const
  102. {
  103. return _request.iMessageType;
  104. }
  105. struct timeval Current::getRecvTime() const
  106. {
  107. timeval tm;
  108. tm.tv_sec = _data->recvTimeStamp() / 1000;
  109. tm.tv_usec = (_data->recvTimeStamp() % 1000) * 1000;
  110. return tm;
  111. }
  112. void Current::setReportStat(bool bReport)
  113. {
  114. _reportStat = bReport;
  115. }
  116. const vector<char>& Current::getRequestBuffer() const
  117. {
  118. if (_data->adapter()->isTarsProtocol())
  119. {
  120. return _request.sBuffer;
  121. }
  122. else
  123. {
  124. return _data->buffer();
  125. }
  126. }
  127. bool Current::isResponse() const
  128. {
  129. return _response;
  130. }
  131. void Current::setCloseType(int type)
  132. {
  133. _data->setCloseType(type);
  134. }
  135. int Current::getCloseType() const
  136. {
  137. return _data->closeType();
  138. }
  139. void Current::initialize(const shared_ptr<TC_EpollServer::RecvContext> &data)
  140. {
  141. _data = data;
  142. Application *application = (Application*)this->_servantHandle->getApplication();
  143. _request.sServantName = application->getServantHelper()->getAdapterServant(_data->adapter()->getName());
  144. // _request.sServantName = ServantHelperManager::getInstance()->getAdapterServant(_data->adapter()->getName());
  145. if (_data->adapter()->isTarsProtocol())
  146. {
  147. initialize(_data->buffer());
  148. }
  149. }
  150. void Current::initializeClose(const shared_ptr<TC_EpollServer::RecvContext> &data)
  151. {
  152. _data = data;
  153. Application *application = (Application*)this->_servantHandle->getApplication();
  154. _request.sServantName = application->getServantHelper()->getAdapterServant(_data->adapter()->getName());
  155. }
  156. void Current::initialize(const vector<char>& sRecvBuffer)
  157. {
  158. TarsInputStream<BufferReader> is;
  159. is.setBuffer(sRecvBuffer.data(), sRecvBuffer.size());
  160. _request.readFrom(is);
  161. }
  162. void Current::sendResponse(const char *buff, uint32_t len)
  163. {
  164. shared_ptr<TC_EpollServer::SendContext> send = _data->createSendContext();
  165. send->buffer()->assign(buff, len);
  166. _servantHandle->sendResponse(send);
  167. }
  168. void Current::sendResponse(int iRet, const vector<char> &buff)
  169. {
  170. //单向调用不需要返回
  171. if (_request.cPacketType == TARSONEWAY)
  172. {
  173. return;
  174. }
  175. ResponsePacket response;
  176. response.sBuffer = buff;
  177. sendResponse(iRet, response, TARS_STATUS(), "");
  178. }
  179. void Current::sendResponse(int iRet)
  180. {
  181. ResponsePacket response;
  182. sendResponse(iRet, response, TARS_STATUS(), "");
  183. }
  184. void Current::sendResponse(int iRet, tars::TarsOutputStream<tars::BufferWriterVector>& os)
  185. {
  186. ResponsePacket response;
  187. os.swap(response.sBuffer);
  188. sendResponse(iRet, response, TARS_STATUS(), "");
  189. }
  190. void Current::sendResponse(int iRet, tup::UniAttribute<tars::BufferWriterVector, tars::BufferReader>& attr)
  191. {
  192. ResponsePacket response;
  193. attr.encode(response.sBuffer);
  194. sendResponse(iRet, response, TARS_STATUS(), "");
  195. }
  196. void Current::sendResponse(int iRet, ResponsePacket &response, const map<string, string>& status, const string & sResultDesc)
  197. {
  198. _ret = iRet;
  199. //单向调用不需要返回
  200. if (_request.cPacketType == TARSONEWAY)
  201. {
  202. return;
  203. }
  204. shared_ptr<TC_EpollServer::SendContext> send = _data->createSendContext();
  205. Int32 iHeaderLen = 0;
  206. TarsOutputStream<BufferWriterVector> os;
  207. //先预留4个字节长度
  208. os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen));
  209. if (_request.iVersion != TUPVERSION)
  210. {
  211. response.iRequestId = _request.iRequestId;
  212. response.cPacketType = TARSNORMAL;
  213. response.iMessageType = _request.iMessageType;
  214. response.iVersion = _request.iVersion;
  215. response.status = status;
  216. response.context = _responseContext;
  217. // response.sBuffer = std::move(buffer);
  218. response.sResultDesc = sResultDesc;
  219. response.iRet = iRet;
  220. TLOGTARS("Current::sendResponse :"
  221. << response.iMessageType << "|"
  222. << _request.sServantName << "|"
  223. << _request.sFuncName << "|"
  224. << response.iRequestId << endl);
  225. response.writeTo(os);
  226. }
  227. else
  228. {
  229. //tup回应包用请求包的结构(这里和新版本TAF是有区别的)
  230. RequestPacket tupResponse;
  231. tupResponse.iRequestId = _request.iRequestId;
  232. tupResponse.iMessageType = _request.iMessageType;
  233. tupResponse.cPacketType = TARSNORMAL;
  234. tupResponse.iVersion = _request.iVersion;
  235. tupResponse.status = status;
  236. tupResponse.context = _responseContext;
  237. tupResponse.sBuffer.swap(response.sBuffer);
  238. tupResponse.sServantName = _request.sServantName;
  239. tupResponse.sFuncName = _request.sFuncName;
  240. //异常的情况下buffer可能为空,要保证有一个空UniAttribute的编码内容
  241. if(tupResponse.sBuffer.size() == 0)
  242. {
  243. tup::UniAttribute<> tarsAttr;
  244. tarsAttr.setVersion(_request.iVersion);
  245. tarsAttr.encode(response.sBuffer);
  246. }
  247. //iRet为0时,不记录在status里面,节省空间
  248. if(iRet != 0)
  249. {
  250. tupResponse.status[ServantProxy::STATUS_RESULT_CODE] = TC_Common::tostr(iRet);
  251. }
  252. //sResultDesc为空时,不记录在status里面,节省空间
  253. if(!sResultDesc.empty())
  254. {
  255. tupResponse.status[ServantProxy::STATUS_RESULT_DESC] = sResultDesc;
  256. }
  257. TLOGTARS("Current::sendResponse :"
  258. << tupResponse.iMessageType << "|"
  259. << _request.sServantName << "|"
  260. << _request.sFuncName << "|"
  261. << tupResponse.iRequestId << endl);
  262. tupResponse.writeTo(os);
  263. }
  264. assert(os.getLength() >= 4);
  265. iHeaderLen = htonl((int)(os.getLength()));
  266. memcpy(os.getByteBuffer().data(), (const char *)&iHeaderLen, sizeof(iHeaderLen));
  267. send->buffer()->swap(os.getByteBuffer());
  268. _servantHandle->sendResponse(send);
  269. }
  270. void Current::close()
  271. {
  272. if (_servantHandle)
  273. {
  274. _servantHandle->close(_data);
  275. }
  276. }
  277. ServantHandle* Current::getServantHandle()
  278. {
  279. return _servantHandle;
  280. }
  281. TC_EpollServer::BindAdapter* Current::getBindAdapter()
  282. {
  283. return _data->adapter().get();
  284. }
  285. void Current::reportToStat(const string& sObj)
  286. {
  287. StatReport* stat = Application::getCommunicator()->getStatReport();
  288. if(stat && stat->getStatPrx())
  289. {
  290. stat->report(sObj, "", _request.sFuncName, _data->ip(), 0, (StatReport::StatResult)_ret, TNOWMS - _data->recvTimeStamp(), 0, false);
  291. }
  292. }
  293. ////////////////////////////////////////////////////////////////////////////
  294. }