Current.cpp 11 KB


  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/Current.h"
  17. #include "servant/ServantHandle.h"
  18. #include "servant/BaseF.h"
  19. #include "servant/Application.h"
  20. #include "tup/tup.h"
  21. #include <cerrno>
  22. namespace tars
  23. {
  24. //////////////////////////////////////////////////////////////////
  25. Current::Current(ServantHandle *pServantHandle)
  26. : _servantHandle(pServantHandle)
  27. , _response(true)
  28. , _ret(0)
  29. , _reportStat(true)
  30. , _traceCall(false)
  31. {
  32. }
  33. Current::~Current()
  34. {
  35. //TUP调用或单向调用,从服务端上报调用信息
  36. if(_reportStat)
  37. {
  38. if(_request.iVersion == TUPVERSION )
  39. {
  40. reportToStat("tup_client");
  41. }
  42. else if(_request.cPacketType == TARSONEWAY)
  43. {
  44. reportToStat("one_way_client");
  45. }
  46. else if(_request.cPacketType == TARSNORMAL)
  47. {
  48. reportToStat("stat_from_server");
  49. }
  50. else if (!_isTars && _servantHandle)
  51. {
  52. if(_servantHandle->getApplication()->applicationConfig().reportFlow)
  53. {
  54. //非tars客户端 从服务端上报调用信息
  55. reportToStat("not_tars_client");
  56. }
  57. }
  58. }
  59. }
  60. const string &Current::getHostName() const
  61. {
  62. auto it = _request.context.find("node_name");
  63. if(it != _request.context.end() && !(it->second.empty()) )
  64. {
  65. return it->second;
  66. }
  67. return _data->ip();
  68. }
  69. const string &Current::getIp() const
  70. {
  71. return _data->ip();
  72. }
  73. int Current::getPort() const
  74. {
  75. return _data->port();
  76. }
  77. uint32_t Current::getUId() const
  78. {
  79. return _data->uid();
  80. }
  81. string Current::getServantName() const
  82. {
  83. return _request.sServantName;
  84. }
  85. short Current::getRequestVersion() const
  86. {
  87. return _request.iVersion;
  88. }
  89. map<string, string>& Current::getContext()
  90. {
  91. return _request.context;
  92. }
  93. const map<string, string>& Current::getRequestStatus() const
  94. {
  95. return _request.status;
  96. }
  97. string Current::getFuncName() const
  98. {
  99. return _request.sFuncName;
  100. }
  101. uint32_t Current::getRequestId() const
  102. {
  103. return _request.iRequestId;
  104. }
  105. char Current::getPacketType() const
  106. {
  107. return _request.cPacketType;
  108. }
  109. int Current::getMessageType() const
  110. {
  111. return _request.iMessageType;
  112. }
  113. struct timeval Current::getRecvTime() const
  114. {
  115. timeval tm;
  116. tm.tv_sec = _data->recvTimeStamp() / 1000;
  117. tm.tv_usec = (_data->recvTimeStamp() % 1000) * 1000;
  118. return tm;
  119. }
  120. void Current::setReportStat(bool bReport)
  121. {
  122. _reportStat = bReport;
  123. }
  124. const vector<char>& Current::getRequestBuffer() const
  125. {
  126. if (_isTars)
  127. {
  128. return _request.sBuffer;
  129. }
  130. else
  131. {
  132. return _data->buffer();
  133. }
  134. }
  135. bool Current::isResponse() const
  136. {
  137. return _response;
  138. }
  139. void Current::setCloseType(int type)
  140. {
  141. _data->setCloseType(type);
  142. }
  143. int Current::getCloseType() const
  144. {
  145. return _data->closeType();
  146. }
  147. void Current::initialize(const shared_ptr<TC_EpollServer::RecvContext> &data)
  148. {
  149. _data = data;
  150. Application *application = (Application*)this->_servantHandle->getApplication();
  151. _request.sServantName = application->getServantHelper()->getAdapterServant(_data->adapter()->getName());
  152. _isTars = _data->adapter()->isTarsProtocol();
  153. if (_isTars)
  154. {
  155. initialize(_data->buffer());
  156. }
  157. }
  158. void Current::initializeClose(const shared_ptr<TC_EpollServer::RecvContext> &data)
  159. {
  160. _data = data;
  161. Application *application = (Application*)this->_servantHandle->getApplication();
  162. _request.sServantName = application->getServantHelper()->getAdapterServant(_data->adapter()->getName());
  163. }
  164. void Current::initialize(const vector<char>& sRecvBuffer)
  165. {
  166. TarsInputStream<BufferReader> is;
  167. is.setBuffer(sRecvBuffer.data(), sRecvBuffer.size());
  168. _request.readFrom(is);
  169. }
  170. void Current::sendResponse(const char *buff, uint32_t len)
  171. {
  172. shared_ptr<TC_EpollServer::SendContext> send = _data->createSendContext();
  173. // send->buffer()->assign(buff, len);
  174. send->buffer()->addBuffer(buff, len);
  175. _servantHandle->sendResponse(send);
  176. }
  177. void Current::sendResponse(int iRet, const vector<char> &buff)
  178. {
  179. //单向调用不需要返回
  180. if (_request.cPacketType == TARSONEWAY)
  181. {
  182. return;
  183. }
  184. ResponsePacket response;
  185. response.sBuffer = buff;
  186. response.iRet = iRet;
  187. sendResponse(response);
  188. }
  189. void Current::sendResponse(int iRet, const string &buff)
  190. {
  191. //单向调用不需要返回
  192. if (_request.cPacketType == TARSONEWAY)
  193. {
  194. return;
  195. }
  196. ResponsePacket response;
  197. response.iRet = iRet;
  198. response.sBuffer.assign(buff.begin(), buff.end());
  199. sendResponse(response);
  200. }
  201. void Current::sendResponse(int iRet)
  202. {
  203. ResponsePacket response;
  204. response.iRet = iRet;
  205. sendResponse(response);
  206. }
  207. void Current::sendResponse(int iRet, tars::TarsOutputStream<tars::BufferWriterVector>& os)
  208. {
  209. ResponsePacket response;
  210. response.iRet = iRet;
  211. os.swap(response.sBuffer);
  212. sendResponse(response);
  213. }
  214. void Current::sendResponse(int iRet, tup::UniAttribute<tars::BufferWriterVector, tars::BufferReader>& attr)
  215. {
  216. ResponsePacket response;
  217. response.iRet = iRet;
  218. attr.encode(response.sBuffer);
  219. sendResponse(response);
  220. }
  221. void Current::sendResponse(ResponsePacket &response)
  222. {
  223. _ret = response.iRet;
  224. //单向调用不需要返回
  225. if (_request.cPacketType == TARSONEWAY)
  226. {
  227. return;
  228. }
  229. shared_ptr<TC_EpollServer::SendContext> send = _data->createSendContext();
  230. Int32 iHeaderLen = 0;
  231. TarsOutputStream<BufferWriter> os;
  232. //先预留4个字节长度
  233. os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen));
  234. if (_request.iVersion != TUPVERSION)
  235. {
  236. response.iRequestId = _request.iRequestId;
  237. response.cPacketType = TARSNORMAL;
  238. response.iMessageType = _request.iMessageType;
  239. response.iVersion = _request.iVersion;
  240. // for(auto e: status)
  241. // {
  242. // response.status.emplace(e);
  243. // }
  244. response.context = _responseContext;
  245. response.status = _responseStatus;
  246. // response.sResultDesc = sResultDesc;
  247. // response.iRet = iRet;
  248. TLOGTARS("Current::sendResponse :"
  249. << response.iMessageType << "|"
  250. << _request.sServantName << "|"
  251. << _request.sFuncName << "|"
  252. << response.iRequestId << endl);
  253. response.writeTo(os);
  254. }
  255. else
  256. {
  257. //tup回应包用请求包的结构(这里和新版本TARS是有区别的)
  258. RequestPacket tupResponse;
  259. tupResponse.iRequestId = _request.iRequestId;
  260. tupResponse.iMessageType = _request.iMessageType;
  261. tupResponse.cPacketType = TARSNORMAL;
  262. tupResponse.iVersion = _request.iVersion;
  263. // for(auto e: status)
  264. // {
  265. // response.status.emplace(e);
  266. // }
  267. tupResponse.status = _responseStatus;
  268. tupResponse.context = _responseContext;
  269. tupResponse.sBuffer.swap(response.sBuffer);
  270. tupResponse.sServantName = _request.sServantName;
  271. tupResponse.sFuncName = _request.sFuncName;
  272. //异常的情况下buffer可能为空,要保证有一个空UniAttribute的编码内容
  273. if(tupResponse.sBuffer.size() == 0)
  274. {
  275. tup::UniAttribute<> tarsAttr;
  276. tarsAttr.setVersion(_request.iVersion);
  277. tarsAttr.encode(response.sBuffer);
  278. }
  279. //iRet为0时,不记录在status里面,节省空间
  280. if(response.iRet != 0)
  281. {
  282. tupResponse.status[ServantProxy::STATUS_RESULT_CODE] = TC_Common::tostr(response.iRet);
  283. }
  284. //sResultDesc为空时,不记录在status里面,节省空间
  285. if(!response.sResultDesc.empty())
  286. {
  287. tupResponse.status[ServantProxy::STATUS_RESULT_DESC] = response.sResultDesc;
  288. }
  289. TLOGTARS("Current::sendResponse :"
  290. << tupResponse.iMessageType << "|"
  291. << _request.sServantName << "|"
  292. << _request.sFuncName << "|"
  293. << tupResponse.iRequestId << endl);
  294. tupResponse.writeTo(os);
  295. }
  296. assert(os.getLength() >= 4);
  297. iHeaderLen = htonl((int)(os.getLength()));
  298. memcpy((void*)os.getBuffer(), (const char *)&iHeaderLen, sizeof(iHeaderLen));
  299. send->setBuffer(ProxyProtocol::toBuffer(os));
  300. _servantHandle->sendResponse(send);
  301. }
  302. void Current::sendPushResponse(int iRet, const string &funcName, TarsOutputStream<BufferWriterVector>& oss, const map<string, string> &context)
  303. {
  304. ResponsePacket response;
  305. oss.swap(response.sBuffer);
  306. response.iRet = iRet;
  307. response.iRequestId = 0;
  308. response.cPacketType = TARSNORMAL;
  309. response.iMessageType = 0;
  310. response.iVersion = TARSVERSION;
  311. response.context = context;
  312. response.status["TARS_FUNC"] = funcName;
  313. shared_ptr<TC_EpollServer::SendContext> send = _data->createSendContext();
  314. Int32 iHeaderLen = 0;
  315. TarsOutputStream<BufferWriter> os;
  316. //先预留4个字节长度
  317. os.writeBuf((const char *)&iHeaderLen, sizeof(iHeaderLen));
  318. response.writeTo(os);
  319. assert(os.getLength() >= 4);
  320. iHeaderLen = htonl((int)(os.getLength()));
  321. memcpy((void*)os.getBuffer(), (const char *)&iHeaderLen, sizeof(iHeaderLen));
  322. send->setBuffer(ProxyProtocol::toBuffer(os));
  323. _servantHandle->sendResponse(send);
  324. }
  325. void Current::close()
  326. {
  327. if (_servantHandle)
  328. {
  329. _servantHandle->close(_data);
  330. }
  331. }
  332. ServantHandle* Current::getServantHandle()
  333. {
  334. return _servantHandle;
  335. }
  336. TC_EpollServer::BindAdapter* Current::getBindAdapter()
  337. {
  338. return _data->adapter().get();
  339. }
  340. void Current::reportToStat(const string& sObj)
  341. {
  342. if(this->_servantHandle)
  343. {
  344. auto _comm = this->_servantHandle->getApplication()->getThisCommunicator();
  345. StatReport* stat = _comm->getStatReport();
  346. if (stat && stat->getStatPrx())
  347. {
  348. stat->report(sObj, _request.sFuncName, _data->ip(), 0, (StatReport::StatResult)_ret,
  349. TNOWMS - _data->recvTimeStamp(), 0, false);
  350. }
  351. }
  352. }
  353. void Current::setTrace(bool traceCall, const string& traceKey)
  354. {
  355. _traceCall = traceCall;
  356. _traceKey = traceKey;
  357. }
  358. bool Current::isTraced() const
  359. {
  360. return _traceCall;
  361. }
  362. string Current::getTraceKey() const
  363. {
  364. return _traceKey;
  365. }
  366. bool Current::connectionExists() const
  367. {
  368. return _data->connectionExists();
  369. }
  370. const string &Current::moduleName() const
  371. {
  372. return _servantHandle->getApplication()->getThisCommunicator()->clientConfig().ModuleName;
  373. }
  374. ////////////////////////////////////////////////////////////////////////////
  375. }