TarsCurrent.cpp 11 KB


  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/TarsCurrent.h"
  17. #include "servant/ServantHandle.h"
  18. #include "servant/BaseF.h"
  19. #include "servant/Application.h"
  20. #include "tup/tup.h"
  21. #include <cerrno>
  22. namespace tars
  23. {
  24. //////////////////////////////////////////////////////////////////
  25. TarsCurrent::TarsCurrent(ServantHandle *pServantHandle)
  26. : _servantHandle(pServantHandle)
  27. // , _bindAdapter(NULL)
  28. // , _uid(0)
  29. // , _ip("NULL")
  30. // , _port(0)
  31. // , _fd(-1)
  32. , _response(true)
  33. // , _begintime(0)
  34. , _ret(0)
  35. , _reportStat(true)
  36. // , _closeType(-1)
  37. {
  38. }
  39. TarsCurrent::~TarsCurrent()
  40. {
  41. //TUP调用或单向调用,从服务端上报调用信息
  42. if(_reportStat)
  43. {
  44. if(_request.iVersion == TUPVERSION )
  45. {
  46. reportToStat("tup_client");
  47. }
  48. else if(_request.cPacketType == TARSONEWAY)
  49. {
  50. reportToStat("one_way_client");
  51. }
  52. else if(!_data->adapter()->isTarsProtocol() && ServerConfig::ReportFlow)
  53. {
  54. //非tars客户端 从服务端上报调用信息
  55. reportToStat("not_tars_client");
  56. }
  57. }
  58. }
  59. const string &TarsCurrent::getHostName() const
  60. {
  61. auto it = _request.context.find("node_name");
  62. if(it != _request.context.end())
  63. {
  64. return it->second;
  65. }
  66. return _data->ip();
  67. }
  68. const string &TarsCurrent::getIp() const
  69. {
  70. return _data->ip();
  71. }
  72. int TarsCurrent::getPort() const
  73. {
  74. return _data->port();
  75. // return _port;
  76. }
  77. uint32_t TarsCurrent::getUId() const
  78. {
  79. return _data->uid();
  80. // return _uid;
  81. }
  82. string TarsCurrent::getServantName() const
  83. {
  84. return _request.sServantName;
  85. }
  86. short TarsCurrent::getRequestVersion() const
  87. {
  88. return _request.iVersion;
  89. }
  90. map<string, string>& TarsCurrent::getContext()
  91. {
  92. return _request.context;
  93. }
  94. const map<string, string>& TarsCurrent::getRequestStatus() const
  95. {
  96. return _request.status;
  97. }
  98. string TarsCurrent::getFuncName() const
  99. {
  100. return _request.sFuncName;
  101. }
  102. uint32_t TarsCurrent::getRequestId() const
  103. {
  104. return _request.iRequestId;
  105. }
  106. char TarsCurrent::getPacketType() const
  107. {
  108. return _request.cPacketType;
  109. }
  110. tars::Int32 TarsCurrent::getMessageType() const
  111. {
  112. return _request.iMessageType;
  113. }
  114. struct timeval TarsCurrent::getRecvTime() const
  115. {
  116. timeval tm;
  117. tm.tv_sec = _data->recvTimeStamp()/1000;
  118. tm.tv_usec = (_data->recvTimeStamp()%1000)*1000;
  119. return tm;
  120. }
  121. void TarsCurrent::setReportStat(bool bReport)
  122. {
  123. _reportStat = bReport;
  124. }
  125. const vector<char>& TarsCurrent::getRequestBuffer() const
  126. {
  127. if (_data->adapter()->isTarsProtocol())
  128. {
  129. return _request.sBuffer;
  130. }
  131. else
  132. {
  133. return _data->buffer();
  134. }
  135. // return _request.sBuffer;
  136. }
  137. bool TarsCurrent::isResponse() const
  138. {
  139. return _response;
  140. }
  141. void TarsCurrent::setCloseType(int type)
  142. {
  143. _data->setCloseType(type);
  144. }
  145. int TarsCurrent::getCloseType() const
  146. {
  147. return _data->closeType();
  148. }
  149. void TarsCurrent::initialize(const shared_ptr<TC_EpollServer::RecvContext> &data)
  150. // void TarsCurrent::initialize(const TC_EpollServer::tagRecvData &stRecvData)
  151. {
  152. _data = data;
  153. _request.sServantName = ServantHelperManager::getInstance()->getAdapterServant(_data->adapter()->getName());
  154. if (_data->adapter()->isTarsProtocol())
  155. {
  156. initialize(_data->buffer());
  157. }
  158. // initialize(stRecvData, begintime);
  159. }
  160. void TarsCurrent::initializeClose(const shared_ptr<TC_EpollServer::RecvContext> &data)
  161. {
  162. _data = data;
  163. _request.sServantName = ServantHelperManager::getInstance()->getAdapterServant(_data->adapter()->getName());
  164. }
  165. void TarsCurrent::initialize(const vector<char>& sRecvBuffer)
  166. {
  167. TarsInputStream<BufferReader> is;
  168. is.setBuffer(sRecvBuffer.data(), sRecvBuffer.size());
  169. _request.readFrom(is);
  170. }
  171. // void TarsCurrent::initialize(const TC_EpollServer::tagRecvData &stRecvData, int64_t beginTime)
  172. // {
  173. // _ip = stRecvData.ip;
  174. // _port = stRecvData.port;
  175. // _uid = stRecvData.uid;
  176. // _fd = stRecvData.fd;
  177. // _bindAdapter = stRecvData.adapter.get();
  178. // _begintime = beginTime;
  179. // _request.sServantName = ServantHelperManager::getInstance()->getAdapterServant(stRecvData.adapter->getName());
  180. // if (_bindAdapter->isTarsProtocol())
  181. // {
  182. // initialize(stRecvData.buffer);
  183. // }
  184. // else
  185. // {
  186. // _request.sBuffer.reserve(stRecvData.buffer.length());
  187. // _request.sBuffer.resize(stRecvData.buffer.length());
  188. // ::memcpy(&_request.sBuffer[0], stRecvData.buffer.c_str(), stRecvData.buffer.length());
  189. // }
  190. // }
  191. // void TarsCurrent::initializeClose(const TC_EpollServer::tagRecvData &stRecvData)
  192. // {
  193. // _ip = stRecvData.ip;
  194. // _port = stRecvData.port;
  195. // _uid = stRecvData.uid;
  196. // _fd = stRecvData.fd;
  197. // _bindAdapter = stRecvData.adapter.get();
  198. // _request.sServantName = ServantHelperManager::getInstance()->getAdapterServant(stRecvData.adapter->getName());
  199. // _begintime = TNOWMS;
  200. // }
  201. // void TarsCurrent::initialize(const string &sRecvBuffer)
  202. // {
  203. // TarsInputStream<BufferReader> is;
  204. // is.setBuffer(sRecvBuffer.c_str(), sRecvBuffer.length());
  205. // _request.readFrom(is);
  206. // }
  207. void TarsCurrent::sendResponse(const char* buff, uint32_t len)
  208. {
  209. // _servantHandle->sendResponse(_uid, string(buff, len), _ip, _port, _fd);
  210. shared_ptr<TC_EpollServer::SendContext> send = _data->createSendContext();
  211. send->buffer()->assign(buff, len);
  212. _servantHandle->sendResponse(send);
  213. }
  214. void TarsCurrent::sendResponse(int iRet, const vector<char> &buff)
  215. {
  216. //单向调用不需要返回
  217. if (_request.cPacketType == TARSONEWAY)
  218. {
  219. return;
  220. }
  221. // ResponsePacket response;
  222. // response.sBuffer = buff;
  223. sendResponse(iRet, buff, TARS_STATUS(), "");
  224. }
  225. void TarsCurrent::sendResponse(int iRet)
  226. {
  227. // ResponsePacket response;
  228. sendResponse(iRet, vector<char>(), TARS_STATUS(), "");
  229. }
  230. void TarsCurrent::sendResponse(int iRet, tars::TarsOutputStream<tars::BufferWriterVector>& os)
  231. {
  232. // ResponsePacket response;
  233. // os.swap(response.sBuffer);
  234. sendResponse(iRet, os.getByteBuffer(), TARS_STATUS(), "");
  235. }
  236. void TarsCurrent::sendResponse(int iRet, tup::UniAttribute<tars::BufferWriterVector, tars::BufferReader>& attr)
  237. {
  238. ResponsePacket response;
  239. attr.encode(response.sBuffer);
  240. sendResponse(iRet, response.sBuffer, TARS_STATUS(), "");
  241. }
  242. void TarsCurrent::sendResponse(int iRet, const vector<char> &buffer, const map<string, string>& status, const string & sResultDesc)
  243. {
  244. _ret = iRet;
  245. //单向调用不需要返回
  246. if (_request.cPacketType == TARSONEWAY)
  247. {
  248. return;
  249. }
  250. shared_ptr<TC_EpollServer::SendContext> send = _data->createSendContext();
  251. tars::Int32 iHeaderLen = 0;
  252. TarsOutputStream<BufferWriterVector> os;
  253. //先预留4个字节长度
  254. os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen));
  255. if (_request.iVersion != TUPVERSION)
  256. {
  257. ResponsePacket response;
  258. response.iRequestId = _request.iRequestId;
  259. response.iMessageType = _request.iMessageType;
  260. response.cPacketType = TARSNORMAL;
  261. response.iVersion = _request.iVersion;
  262. response.status = status;
  263. response.sBuffer = std::move(buffer);
  264. response.sResultDesc = sResultDesc;
  265. response.context = _responseContext;
  266. response.iRet = iRet;
  267. TLOGTARS("[TARS]TarsCurrent::sendResponse :"
  268. << response.iMessageType << "|"
  269. << _request.sServantName << "|"
  270. << _request.sFuncName << "|"
  271. << response.iRequestId << endl);
  272. response.writeTo(os);
  273. }
  274. else
  275. {
  276. //tup回应包用请求包的结构(这里和新版本TAF是有区别的)
  277. RequestPacket response;
  278. response.iRequestId = _request.iRequestId;
  279. response.iMessageType = _request.iMessageType;
  280. response.cPacketType = TARSNORMAL;
  281. response.iVersion = _request.iVersion;
  282. response.status = status;
  283. response.context = _responseContext;
  284. response.sBuffer = std::move(buffer);
  285. response.sServantName = _request.sServantName;
  286. response.sFuncName = _request.sFuncName;
  287. //异常的情况下buffer可能为空,要保证有一个空UniAttribute的编码内容
  288. if(response.sBuffer.size() == 0)
  289. {
  290. tup::UniAttribute<> tarsAttr;
  291. tarsAttr.setVersion(_request.iVersion);
  292. tarsAttr.encode(response.sBuffer);
  293. }
  294. //iRet为0时,不记录在status里面,节省空间
  295. if(iRet != 0)
  296. {
  297. response.status[ServantProxy::STATUS_RESULT_CODE] = TC_Common::tostr(iRet);
  298. }
  299. //sResultDesc为空时,不记录在status里面,节省空间
  300. if(!sResultDesc.empty())
  301. {
  302. response.status[ServantProxy::STATUS_RESULT_DESC] = sResultDesc;
  303. }
  304. TLOGTARS("[TARS]TarsCurrent::sendResponse :"
  305. << response.iMessageType << "|"
  306. << _request.sServantName << "|"
  307. << _request.sFuncName << "|"
  308. << response.iRequestId << endl);
  309. response.writeTo(os);
  310. }
  311. assert(os.getLength() >= 4);
  312. iHeaderLen = htonl((int)(os.getLength()));
  313. memcpy(os.getByteBuffer().data(), (const char *)&iHeaderLen, sizeof(iHeaderLen));
  314. send->buffer()->swap(os.getByteBuffer());
  315. _servantHandle->sendResponse(send);
  316. }
  317. void TarsCurrent::close()
  318. {
  319. if (_servantHandle)
  320. {
  321. _servantHandle->close(_data);
  322. }
  323. }
  324. ServantHandle* TarsCurrent::getServantHandle()
  325. {
  326. return _servantHandle;
  327. }
  328. TC_EpollServer::BindAdapter* TarsCurrent::getBindAdapter()
  329. {
  330. return _data->adapter().get();
  331. }
  332. void TarsCurrent::reportToStat(const string& sObj)
  333. {
  334. StatReport* stat = Application::getCommunicator()->getStatReport();
  335. if(stat && stat->getStatPrx())
  336. {
  337. // int64_t endtime = TNOWMS;
  338. // int sptime = endtime - _begintime;
  339. //被调上报自己的set信息,set信息在setReportInfo设置
  340. // stat->report(sObj, "" , _request.sServantName, _data->ip(), 0, _request.sFuncName, (StatReport::StatResult)_ret, TNOWMS - _data->recvTimeStamp(), 0);
  341. stat->report(sObj, "", _request.sFuncName, _data->ip(), 0, (StatReport::StatResult)_ret, TNOWMS - _data->recvTimeStamp(), 0, false);
  342. }
  343. }
  344. ////////////////////////////////////////////////////////////////////////////
  345. }