AdapterProxy.cpp 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/AdapterProxy.h"
  17. #include "servant/Communicator.h"
  18. #include "servant/CommunicatorEpoll.h"
  19. #include "servant/StatReport.h"
  20. #include "servant/RemoteLogger.h"
  21. #include "tup/tup.h"
  22. #include "servant/StatF.h"
  23. // #ifdef TARS_OPENTRACKING
  24. // #include "servant/text_map_carrier.h"
  25. // #endif
  26. namespace tars
  27. {
  28. std::atomic<int> AdapterProxy::_idGen;
  29. AdapterProxy::AdapterProxy(ObjectProxy * pObjectProxy, const EndpointInfo &ep, Communicator* pCom)
  30. : _communicator(pCom)
  31. , _objectProxy(pObjectProxy)
  32. , _ep(ep)
  33. , _activeStateInReg(true)
  34. , _activeStatus(true)
  35. , _totalInvoke(0)
  36. , _timeoutInvoke(0)
  37. , _nextFinishInvokeTime(0)
  38. , _frequenceFailInvoke(0)
  39. , _frequenceFailTime(0)
  40. , _nextRetryTime(0)
  41. , _nextKeepAliveTime(0)
  42. //, _connTimeout(false)
  43. , _connExc(false)
  44. , _connExcCnt(0)
  45. , _timeoutLogFlag(true)
  46. , _noSendQueueLimit(100000)
  47. , _id((++_idGen))
  48. {
  49. _timeoutQueue.reset(new TC_TimeoutQueueNew<ReqMessage*>());
  50. if(pObjectProxy->getCommunicatorEpoll())
  51. {
  52. _noSendQueueLimit = pObjectProxy->getCommunicatorEpoll()->getNoSendQueueLimit();
  53. }
  54. if(_communicator)
  55. {
  56. _timeoutLogFlag = _communicator->getTimeoutLogFlag();
  57. }
  58. #if TARS_SSL
  59. if (ep.isSsl())
  60. {
  61. _trans.reset(new TC_SSLTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  62. }
  63. else if (ep.isTcp())
  64. {
  65. _trans.reset(new TC_TCPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  66. }
  67. else
  68. {
  69. _trans.reset(new TC_UDPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  70. }
  71. #else
  72. if (ep.isUdp())
  73. {
  74. _trans.reset(new TC_UDPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  75. }
  76. else
  77. {
  78. _trans.reset(new TC_TCPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  79. }
  80. #endif
  81. _trans->initializeClient(std::bind(&AdapterProxy::onCreateCallback, this, std::placeholders::_1),
  82. std::bind(&AdapterProxy::onCloseCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
  83. std::bind(&AdapterProxy::onConnectCallback, this, std::placeholders::_1),
  84. std::bind(&AdapterProxy::onRequestCallback, this, std::placeholders::_1),
  85. std::bind(&AdapterProxy::onParserCallback, this, std::placeholders::_1, std::placeholders::_2),
  86. std::bind(&AdapterProxy::onOpensslCallback, this, std::placeholders::_1),
  87. std::bind(&AdapterProxy::onCompletePackage, this, std::placeholders::_1));
  88. _trans->setClientAuthCallback(std::bind(&AdapterProxy::onSendAuthCallback, this, std::placeholders::_1),
  89. std::bind(&AdapterProxy::onVerifyAuthCallback, this, std::placeholders::_1, std::placeholders::_2));
  90. // if (!_endpoint.isTcp())
  91. // {
  92. // _checkTransInterval = 10; //udp端口10秒检查一次, 避免影响用户请求
  93. // }
  94. //初始化stat的head信息
  95. initStatHead();
  96. }
  97. AdapterProxy::~AdapterProxy()
  98. {
  99. }
  100. shared_ptr<TC_ProxyInfo> AdapterProxy::onCreateCallback(TC_Transceiver* trans)
  101. {
  102. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  103. _objectProxy->getCommunicatorEpoll()->addFd(this);
  104. trans->setConnTimeout(_objectProxy->getRootServantProxy()->tars_connect_timeout());
  105. if(_objectProxy->getRootServantProxy()->getProxyInfo())
  106. {
  107. return TC_ProxyInfo::createProxyInfo(*_objectProxy->getRootServantProxy()->getProxyInfo());
  108. }
  109. return NULL;
  110. }
  111. std::shared_ptr<TC_OpenSSL> AdapterProxy::onOpensslCallback(TC_Transceiver* trans)
  112. {
  113. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  114. return _objectProxy->getCommunicatorEpoll()->getCommunicator()->newClientSSL(_objectProxy->name());
  115. }
  116. void AdapterProxy::onCloseCallback(TC_Transceiver* trans, TC_Transceiver::CloseReason reason, const string &err)
  117. {
  118. if(_objectProxy->getRootServantProxy()->tars_get_push_callback())
  119. {
  120. _objectProxy->getRootServantProxy()->tars_get_push_callback()->onClose();
  121. }
  122. int millisecond =_objectProxy->reconnect();
  123. if (millisecond <= 0)
  124. {
  125. return;
  126. }
  127. if (_objectProxy->reconnectActiveInReg() && !isActiveInReg())
  128. {
  129. return;
  130. }
  131. _objectProxy->getCommunicatorEpoll()->reConnect(TNOWMS + millisecond, trans);
  132. TLOGERROR("[trans close:" << _objectProxy->name() << "," << trans->getConnectEndpoint().toString() << ", reconnect:" << millisecond << " ms]" << endl);
  133. }
  134. void AdapterProxy::onConnectCallback(TC_Transceiver* trans)
  135. {
  136. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  137. addConnExc(false);
  138. if(_objectProxy->getRootServantProxy()->tars_get_push_callback())
  139. {
  140. _objectProxy->getRootServantProxy()->tars_get_push_callback()->onConnect(trans->getConnectEndpoint());
  141. }
  142. _objectProxy->onConnect(this);
  143. }
  144. void AdapterProxy::onRequestCallback(TC_Transceiver* trans)
  145. {
  146. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  147. doInvoke();
  148. }
  149. TC_NetWorkBuffer::PACKET_TYPE AdapterProxy::onParserCallback(TC_NetWorkBuffer& buff, TC_Transceiver* trans)
  150. {
  151. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans<< endl;
  152. try
  153. {
  154. shared_ptr<ResponsePacket> rsp = std::make_shared<ResponsePacket>();
  155. TC_NetWorkBuffer::PACKET_TYPE ret = _objectProxy->getRootServantProxy()->tars_get_protocol().responseFunc(buff, *rsp.get());
  156. if(ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE)
  157. {
  158. finishInvoke(rsp);
  159. }
  160. return ret;
  161. }
  162. catch(exception &ex)
  163. {
  164. TLOG_ERROR(ex.what() << ", obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString()<< endl);
  165. }
  166. return TC_NetWorkBuffer::PACKET_ERR;
  167. }
  168. void AdapterProxy::onCompletePackage(TC_Transceiver* trans)
  169. {
  170. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  171. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  172. {
  173. _objectProxy->prepareConnection(this);
  174. if(!_timeoutQueue->sendListEmpty())
  175. {
  176. //并行连接模式, 继续发起连接, 建立连接后, 会自动doInvoke发包
  177. if(_trans->hasConnected())
  178. {
  179. doInvoke();
  180. }
  181. else
  182. {
  183. checkActive(true);
  184. }
  185. }
  186. }
  187. }
  188. shared_ptr<TC_NetWorkBuffer::Buffer> AdapterProxy::onSendAuthCallback(TC_Transceiver* trans)
  189. {
  190. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  191. // 走框架的AK/SK认证
  192. BasicAuthInfo info;
  193. info.sObjName = _objectProxy->name();
  194. info.sAccessKey = _objectProxy->getCommunicatorEpoll()->getCommunicator()->getServantProperty(_objectProxy->name(), "accesskey");
  195. info.sSecretKey = _objectProxy->getCommunicatorEpoll()->getCommunicator()->getServantProperty(_objectProxy->name(), "secretkey");
  196. // std::string out = tars::defaultCreateAuthReq(info);
  197. const int kAuthType = 0x40;
  198. RequestPacket request;
  199. request.sFuncName = "InnerAuthServer";
  200. request.sServantName = _objectProxy->name();
  201. request.iVersion = TARSVERSION;
  202. request.iRequestId = 1;
  203. request.cPacketType = TARSNORMAL;
  204. request.iMessageType = kAuthType;
  205. request.sBuffer = tars::defaultCreateAuthReq(info);//.assign(out.begin(), out.end());
  206. return _objectProxy->getRootServantProxy()->tars_get_protocol().requestFunc(request, trans);
  207. }
  208. TC_NetWorkBuffer::PACKET_TYPE AdapterProxy::onVerifyAuthCallback(TC_NetWorkBuffer &buff, TC_Transceiver*trans)
  209. {
  210. shared_ptr<ResponsePacket> rsp = std::make_shared<ResponsePacket>();
  211. const int kAuthType = 0x40;
  212. rsp->iMessageType = kAuthType;
  213. TC_NetWorkBuffer::PACKET_TYPE ret = _objectProxy->getRootServantProxy()->tars_get_protocol().responseFunc(buff, *rsp.get());
  214. if(ret == TC_NetWorkBuffer::PACKET_FULL)
  215. {
  216. // std::string ret(rsp->sBuffer.begin(), rsp->sBuffer.end());
  217. if(TC_Port::strncasecmp(rsp->sBuffer.data(), "AUTH_SUCC", rsp->sBuffer.size()) == 0)
  218. {
  219. return TC_NetWorkBuffer::PACKET_FULL;
  220. }
  221. return TC_NetWorkBuffer::PACKET_ERR;
  222. }
  223. return ret;
  224. }
  225. void AdapterProxy::initStatHead()
  226. {
  227. vector <string> vtSetInfo;
  228. auto clientConfig = this->_communicator->clientConfig();
  229. if(!clientConfig.SetDivision.empty() && StatReport::divison2SetInfo(clientConfig.SetDivision, vtSetInfo)) {
  230. //主调(client)启用set
  231. _statHead.masterName = StatReport::trimAndLimitStr(clientConfig.ModuleName + "." + vtSetInfo[0] + vtSetInfo[1] + vtSetInfo[2] + "@" + clientConfig.TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
  232. }
  233. else
  234. {
  235. _statHead.masterName = StatReport::trimAndLimitStr(clientConfig.ModuleName + "@" + clientConfig.TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
  236. }
  237. string sSlaveSet = _ep.setDivision();
  238. const string sSlaveName = getSlaveName(_objectProxy->name());
  239. if (!sSlaveSet.empty() && StatReport::divison2SetInfo(sSlaveSet, vtSetInfo)) //被调启用set
  240. {
  241. _statHead.slaveSetName = vtSetInfo[0];
  242. _statHead.slaveSetArea = vtSetInfo[1];
  243. _statHead.slaveSetID = vtSetInfo[2];
  244. _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName + "." + vtSetInfo[0] + vtSetInfo[1] + vtSetInfo[2], StatReport::MAX_MASTER_NAME_LEN);
  245. }
  246. else
  247. {
  248. _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName, StatReport::MAX_MASTER_NAME_LEN);
  249. }
  250. _statHead.slaveIp = StatReport::trimAndLimitStr(_ep.host(), StatReport::MAX_MASTER_IP_LEN);
  251. _statHead.slavePort = _ep.port();
  252. _statHead.returnValue = 0;
  253. }
  254. string AdapterProxy::getSlaveName(const string& sSlaveName)
  255. {
  256. string::size_type pos = sSlaveName.find(".");
  257. if (pos != string::npos)
  258. {
  259. pos = sSlaveName.find(".", pos + 1);
  260. if (pos != string::npos)
  261. {
  262. return sSlaveName.substr(0, pos);
  263. }
  264. }
  265. return sSlaveName;
  266. }
  267. void AdapterProxy::onConnect()
  268. {
  269. _objectProxy->onConnect(this);
  270. }
  271. int AdapterProxy::invoke_connection_serial(ReqMessage * msg)
  272. {
  273. assert(msg->eType != ReqMessage::ONE_WAY);
  274. msg->sReqData = _objectProxy->getRootServantProxy()->tars_get_protocol().requestFunc(msg->request, _trans.get());
  275. msg->request.iRequestId = _timeoutQueue->generateId();
  276. if(!_requestMsg && _timeoutQueue->sendListEmpty())
  277. {
  278. int ret = _trans->sendRequest(msg->sReqData);
  279. if(ret == TC_Transceiver::eRetOk || ret == TC_Transceiver::eRetFull)
  280. {
  281. TLOGTARS("[AdapterProxy::invoke_connection_serial push (send) obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString() << ", id: " << msg->request.iRequestId << endl);
  282. _requestMsg = msg;
  283. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
  284. if (!bFlag)
  285. {
  286. TLOGERROR("[AdapterProxy::invoke_connection_serial fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  287. _requestMsg = NULL;
  288. msg->eStatus = ReqMessage::REQ_EXC;
  289. finishInvoke(msg);
  290. }
  291. return 0;
  292. }
  293. else if(ret == TC_Transceiver::eRetError)
  294. {
  295. //发送出错了
  296. _requestMsg = NULL;
  297. msg->eStatus = ReqMessage::REQ_EXC;
  298. finishInvoke(msg);
  299. return -1;
  300. }
  301. }
  302. //数据没有发送
  303. TLOGTARS("[AdapterProxy::invoke_connection_serial push (no send) " << _objectProxy->name() << ", " << _trans->getConnectionString() << ",id " << msg->request.iRequestId << ", " << _requestMsg << endl);
  304. //数据没有发送
  305. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime, false);
  306. if (!bFlag)
  307. {
  308. TLOGERROR("[AdapterProxy::invoke_connection_serial fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ", id: " << msg->request.iRequestId << ", " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  309. msg->eStatus = ReqMessage::REQ_EXC;
  310. finishInvoke(msg);
  311. }
  312. return 0;
  313. }
  314. int AdapterProxy::invoke_connection_parallel(ReqMessage * msg)
  315. {
  316. msg->sReqData = _objectProxy->getRootServantProxy()->tars_get_protocol().requestFunc(msg->request, _trans.get());
  317. //当前队列是空的, 且是连接复用模式, 交给连接发送数据
  318. //连接连上 buffer不为空 发送数据成功
  319. if (_timeoutQueue->sendListEmpty())
  320. {
  321. int ret = _trans->sendRequest(msg->sReqData);
  322. if(ret == TC_Transceiver::eRetOk || ret == TC_Transceiver::eRetFull)
  323. {
  324. TLOGTARS("[AdapterProxy::invoke_connection_parallel push (send) obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString() << ", id: " << msg->request.iRequestId << endl);
  325. //请求发送成功了 处理采样
  326. //这个请求发送成功了。单向调用直接返回
  327. if (msg->eType == ReqMessage::ONE_WAY)
  328. {
  329. // #ifdef TARS_OPENTRACKING
  330. // finishTrack(msg);
  331. // #endif
  332. delete msg;
  333. msg = NULL;
  334. return 0;
  335. }
  336. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
  337. if (!bFlag)
  338. {
  339. TLOGERROR("[AdapterProxy::invoke_connection_parallel fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  340. msg->eStatus = ReqMessage::REQ_EXC;
  341. finishInvoke(msg);
  342. }
  343. return 0;
  344. }
  345. else if(ret == TC_Transceiver::eRetError)
  346. {
  347. TLOGTARS("[AdapterProxy::invoke_connection_parallel send request failed,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  348. //发送出错了
  349. msg->eStatus = ReqMessage::REQ_EXC;
  350. finishInvoke(msg);
  351. return -1;
  352. }
  353. }
  354. //没有发送数据
  355. TLOGTARS("[AdapterProxy::invoke_connection_parallel push (no send) " << _objectProxy->name() << ", " << _trans->getConnectionString() << ",id " << msg->request.iRequestId << endl);
  356. //之前还没有数据没发送 或者 请求发送失败了, 进队列
  357. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime, false);
  358. if (!bFlag)
  359. {
  360. TLOGERROR("[AdapterProxy::invoke_connection_parallel fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ", id: " << msg->request.iRequestId << ", " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  361. msg->eStatus = ReqMessage::REQ_EXC;
  362. finishInvoke(msg);
  363. }
  364. return 0;
  365. }
  366. int AdapterProxy::invoke(ReqMessage * msg)
  367. {
  368. assert(_trans != NULL);
  369. TLOGTARS("[AdapterProxy::invoke " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  370. //未发链表有长度限制
  371. if (_timeoutQueue->getSendListSize() > _noSendQueueLimit)
  372. {
  373. TLOGERROR("[AdapterProxy::invoke fail,ReqInfoQueue.size(" << _timeoutQueue->getSendListSize() << ") > " << _noSendQueueLimit << "," << _objectProxy->name() << "," << _trans->getConnectionString() << "]" << endl);
  374. msg->eStatus = ReqMessage::REQ_EXC;
  375. msg->response->iRet = tars::TARSSENDREQUESTERR;
  376. finishInvoke(msg);
  377. return 0;
  378. }
  379. //生成requestid
  380. //tars调用 而且 不是单向调用
  381. if (!msg->bFromRpc)
  382. {
  383. msg->request.iRequestId = _timeoutQueue->generateId();
  384. }
  385. // #ifdef TARS_OPENTRACKING
  386. // startTrack(msg);
  387. // #endif
  388. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  389. {
  390. return invoke_connection_serial(msg);
  391. }
  392. else
  393. {
  394. return invoke_connection_parallel(msg);
  395. }
  396. }
  397. void AdapterProxy::doInvoke_serial()
  398. {
  399. if(_requestMsg != NULL || _timeoutQueue->sendListEmpty())
  400. {
  401. //有请求 or 发送队列是空的, 当前请求不再发送 等epoll事件通知 在doRequest中发送
  402. TLOGTARS("[AdapterProxy::doInvoke_serial not send obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ", send size:" << _timeoutQueue->getSendListSize() << ", " << _requestMsg << endl);
  403. return;
  404. }
  405. ReqMessage * msg = NULL;
  406. _timeoutQueue->getSend(msg);
  407. int iRet = _trans->sendRequest(msg->sReqData);
  408. if(iRet == TC_Transceiver::eRetError)
  409. {
  410. _requestMsg = NULL;
  411. _timeoutQueue->popSend(true);
  412. msg->response->iRet = TARSSENDREQUESTERR;
  413. TLOGTARS("[AdapterProxy::doInvoke_serial sendRequest failed, obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
  414. finishInvoke(msg);
  415. }
  416. else if(iRet == TC_Transceiver::eRetOk || iRet == TC_Transceiver::eRetFull)
  417. {
  418. _requestMsg = msg;
  419. //从发送send 队列中清掉, 但是保留在定时队列中
  420. _timeoutQueue->popSend(false);
  421. }
  422. }
  423. void AdapterProxy::doInvoke_parallel()
  424. {
  425. while(!_timeoutQueue->sendListEmpty())
  426. {
  427. ReqMessage * msg = NULL;
  428. _timeoutQueue->getSend(msg);
  429. int iRet = _trans->sendRequest(msg->sReqData);
  430. //发送失败 or 没有发送
  431. if (iRet == TC_Transceiver::eRetError)
  432. {
  433. TLOGTARS("[AdapterProxy::doInvoke_parallel sendRequest failed, obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
  434. return;
  435. }
  436. if (iRet == TC_Transceiver::eRetNotSend)
  437. {
  438. TLOGTARS("[AdapterProxy::doInvoke_parallel sendRequest not send, obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
  439. return;
  440. }
  441. //发送完成,要从队列里面清掉
  442. _timeoutQueue->popSend(msg->eType == ReqMessage::ONE_WAY);
  443. if (msg->eType == ReqMessage::ONE_WAY)
  444. {
  445. delete msg;
  446. msg = NULL;
  447. }
  448. //发送buffer已经满了 要返回
  449. if (iRet == TC_Transceiver::eRetFull)
  450. {
  451. return;
  452. }
  453. }
  454. }
  455. void AdapterProxy::doInvoke()
  456. {
  457. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  458. {
  459. doInvoke_serial();
  460. }
  461. else
  462. {
  463. doInvoke_parallel();
  464. }
  465. }
  466. void AdapterProxy::finishInvoke(bool bFail)
  467. {
  468. TLOGTARS("[AdapterProxy::finishInvoke(bool), " << _objectProxy->name() << ", " << _trans->getConnectionString() << "," << bFail << "]" << endl);
  469. time_t now = TNOW;
  470. const CheckTimeoutInfo& info = _objectProxy->getRootServantProxy()->tars_check_timeout_info();
  471. //处于异常状态 已经屏蔽
  472. if (!_activeStatus)
  473. {
  474. if (!bFail)
  475. {
  476. //重试成功,恢复正常状态
  477. _activeStatus = true;
  478. //连续失败次数清零
  479. _frequenceFailInvoke = 0;
  480. _nextFinishInvokeTime = now + info.checkTimeoutInterval;
  481. _frequenceFailInvoke = 0;
  482. _totalInvoke = 1;
  483. _timeoutInvoke = 0;
  484. _trans->setIsConnTimeout(false);
  485. _connExc = false;
  486. TLOGTARS("[AdapterProxy::finishInvoke(bool), " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", retry ok]" << endl);
  487. }
  488. else
  489. {
  490. //结点已经屏蔽 过来失败的包不用处理
  491. TLOGTARS("[AdapterProxy::finishInvoke(bool), " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", retry fail]" << endl);
  492. }
  493. return;
  494. }
  495. ++_totalInvoke;
  496. if (bFail)
  497. {
  498. //调用失败
  499. //失败次数+1
  500. ++_timeoutInvoke;
  501. //连续失败时间间隔重新计算
  502. if (0 == _frequenceFailInvoke)
  503. {
  504. _frequenceFailTime = now + info.minFrequenceFailTime;
  505. }
  506. //连续失败次数加1
  507. _frequenceFailInvoke++;
  508. //检查是否到了连续失败次数,且至少在5s以上
  509. if (_frequenceFailInvoke >= info.frequenceFailInvoke && now >= _frequenceFailTime)
  510. {
  511. setInactive();
  512. _activeStatus = false;
  513. resetRetryTime();
  514. TLOGERROR("[AdapterProxy::finishInvoke(bool) objname:"<< _objectProxy->name()
  515. << ",desc:" << _trans->getConnectionString()
  516. << ",disable frequenceFail,freqtimeout:" << _frequenceFailInvoke
  517. << ",timeout:"<< _timeoutInvoke
  518. << ",total:" << _totalInvoke << endl);
  519. return ;
  520. }
  521. }
  522. else
  523. {
  524. _frequenceFailInvoke = 0;
  525. }
  526. //判断一段时间内的超时比例
  527. if (now > _nextFinishInvokeTime)
  528. {
  529. _nextFinishInvokeTime = now + info.checkTimeoutInterval;
  530. if (bFail && _timeoutInvoke >= info.minTimeoutInvoke && _timeoutInvoke >= info.radio * _totalInvoke)
  531. {
  532. setInactive();
  533. TLOGERROR("[AdapterProxy::finishInvoke(bool), "
  534. << _objectProxy->name() << "," << _trans->getConnectionString()
  535. << ",disable radioFail,freqtimeout:" << _frequenceFailInvoke
  536. << ",timeout:" << _timeoutInvoke
  537. << ",total:" << _totalInvoke << "] " << endl);
  538. }
  539. else
  540. {
  541. //每一分钟清空一次
  542. _totalInvoke = 0;
  543. _timeoutInvoke = 0;
  544. }
  545. }
  546. }
  547. void AdapterProxy::resetRetryTime(bool next)
  548. {
  549. if(next) {
  550. _nextRetryTime = TNOW + _objectProxy->getRootServantProxy()->tars_check_timeout_info().tryTimeInterval;
  551. }else {
  552. _nextRetryTime = TNOW;
  553. }
  554. }
  555. bool AdapterProxy::checkActive(bool connecting)
  556. {
  557. time_t now = TNOW;
  558. TLOGTARS("[AdapterProxy::checkActive,"
  559. << _objectProxy->name() << "," << _trans->getConnectionString() << ","
  560. << (_activeStatus ? "active" : "inactive")
  561. << (connecting ? ", connecting" : "")
  562. << ", freqtimeout:" << _frequenceFailInvoke
  563. << ", timeout:" << _timeoutInvoke
  564. << ", connExcCnt:" << _connExcCnt
  565. << ", total:" << _totalInvoke << "]" << endl);
  566. //失效且没有到下次重试时间, 直接返回不可用
  567. if ((!_activeStatus) && (now < _nextRetryTime))
  568. {
  569. TLOGTARS("[AdapterProxy::checkActive,not reach retry time ," << _objectProxy->name() << ","
  570. << _trans->getConnectionString() << endl);
  571. return false;
  572. }
  573. if (!_activeStatus)
  574. {
  575. resetRetryTime();
  576. }
  577. //连接没有建立或者连接无效, 重新建立连接
  578. if (!_trans->isValid())
  579. {
  580. try
  581. {
  582. _trans->connect();
  583. }
  584. catch (exception & ex)
  585. {
  586. _activeStatus = false;
  587. _trans->close();
  588. TLOGERROR("[AdapterProxy::checkActive connect obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ", ex:" << ex.what() << endl);
  589. }
  590. }
  591. if(connecting && _activeStatus) {
  592. //hash模式, 且是第一次连接(_activeStatus=true, 即没有失败过), 返回已经连接或者正在连接的, 这样保证第一次hash不会错且连接挂过以后, 不会马上就使用, 直到连接成功才使用!
  593. return (_trans->hasConnected() || _trans->isConnecting());
  594. }
  595. else {
  596. return _trans->hasConnected();
  597. }
  598. }
  599. void AdapterProxy::onSetInactive()
  600. {
  601. _activeStatus = false;
  602. resetRetryTime();
  603. //需要关闭连接
  604. _trans->close();
  605. }
  606. //屏蔽结点
  607. void AdapterProxy::setInactive()
  608. {
  609. onSetInactive();
  610. _objectProxy->getRootServantProxy()->onSetInactive(_ep);
  611. TLOGTARS("[AdapterProxy::setInactive, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", inactive]" << endl);
  612. }
  613. void AdapterProxy::finishInvoke_serial(shared_ptr<ResponsePacket> & rsp)
  614. {
  615. TLOGTARS("[AdapterProxy::finishInvoke_serial, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", id:" << rsp->iRequestId << "]" << endl);
  616. if (!_requestMsg)
  617. {
  618. if(_timeoutLogFlag)
  619. {
  620. TLOGERROR("[AdapterProxy::finishInvoke_serial,"
  621. << _objectProxy->name()
  622. << ", get req-ptr NULL,may be timeout,id:"
  623. << rsp->iRequestId << ",desc:" << _trans->getConnectionString() << "]" << endl);
  624. }
  625. return;
  626. }
  627. ReqMessage * msg = _requestMsg;
  628. //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了,因此可能会core,在erase中要加判断
  629. //获取请求信息
  630. bool retErase = _timeoutQueue->erase(_requestMsg->request.iRequestId, msg);
  631. assert(retErase);
  632. assert(_requestMsg == msg);
  633. assert(msg->eType != ReqMessage::ONE_WAY);
  634. assert(msg->eStatus == ReqMessage::REQ_REQ);
  635. _requestMsg = NULL;
  636. msg->eStatus = ReqMessage::REQ_RSP;
  637. msg->response = rsp;
  638. finishInvoke(msg);
  639. }
  640. void AdapterProxy::finishInvoke_parallel(shared_ptr<ResponsePacket> & rsp)
  641. {
  642. TLOGTARS("[AdapterProxy::finishInvoke_parallel, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", id:" << rsp->iRequestId << "]" << endl);
  643. ReqMessage * msg = NULL;
  644. if (rsp->iRequestId == 0)
  645. {
  646. //requestid 为0 是push消息, push callback is null
  647. if (!_objectProxy->getRootServantProxy()->tars_get_push_callback())
  648. {
  649. TLOGERROR("[AdapterProxy::finishInvoke(BasePacket), request id is 0, pushcallback is null, " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  650. throw TarsDecodeException("request id is 0, pushcallback is null, obj: " + _objectProxy->name() + ", desc: " + _trans->getConnectionString());
  651. }
  652. msg = new ReqMessage();
  653. msg->eStatus = ReqMessage::REQ_RSP;
  654. msg->eType = ReqMessage::ASYNC_CALL;
  655. msg->bFromRpc = true;
  656. msg->bPush = true;
  657. msg->proxy = _objectProxy->getServantProxy();
  658. msg->pObjectProxy = _objectProxy;
  659. msg->adapter = this;
  660. msg->callback = _objectProxy->getRootServantProxy()->tars_get_push_callback();
  661. }
  662. else
  663. {
  664. //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了,因此可能会core,在erase中要加判断
  665. //获取请求信息
  666. bool retErase = _timeoutQueue->erase(rsp->iRequestId, msg);
  667. //找不到此id信息
  668. if (!retErase)
  669. {
  670. if (_timeoutLogFlag)
  671. {
  672. TLOGERROR("[AdapterProxy::finishInvoke_parallel,"
  673. << _objectProxy->name()
  674. << ",get req-ptr NULL,may be timeout,id:"
  675. << rsp->iRequestId << ",desc:" << _trans->getConnectionString() << "]" << endl);
  676. }
  677. return ;
  678. }
  679. assert(msg->eStatus == ReqMessage::REQ_REQ);
  680. msg->eStatus = ReqMessage::REQ_RSP;
  681. }
  682. msg->response = rsp;
  683. finishInvoke(msg);
  684. }
  685. void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket> & rsp)
  686. {
  687. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  688. {
  689. finishInvoke_serial(rsp);
  690. }
  691. else
  692. {
  693. finishInvoke_parallel(rsp);
  694. }
  695. }
  696. void AdapterProxy::finishInvoke(ReqMessage * msg)
  697. {
  698. // assert(msg->eStatus != ReqMessage::REQ_REQ);
  699. TLOGTARS("[AdapterProxy::finishInvokeMsg " << _objectProxy->name() << ", " << _trans->getConnectionString() << " ,id:" << msg->response->iRequestId << "]" << endl);
  700. // #ifdef TARS_OPENTRACKING
  701. // finishTrack(msg);
  702. // #endif
  703. //单向调用
  704. if (msg->eType == ReqMessage::ONE_WAY)
  705. {
  706. TLOGTARS("[AdapterProxy::finishInvokeMsg " << _objectProxy->name() << ", " << _trans->getConnectionString()
  707. << " ,id:" << msg->response->iRequestId
  708. << " ,one way call]" << endl);
  709. delete msg;
  710. msg = NULL;
  711. return ;
  712. }
  713. //stat 上报调用统计
  714. stat(msg);
  715. //超时屏蔽统计,异常不算超时统计
  716. if (msg->eStatus != ReqMessage::REQ_EXC && !msg->bPush)
  717. {
  718. finishInvoke(msg->response->iRet != TARSSERVERSUCCESS);
  719. }
  720. //同步调用,唤醒ServantProxy线程
  721. if (msg->eType == ReqMessage::SYNC_CALL)
  722. {
  723. if (!msg->sched)
  724. {
  725. assert(msg->pMonitor);
  726. msg->pMonitor->notify();
  727. }
  728. else
  729. {
  730. msg->sched->put(msg->iCoroId);
  731. }
  732. return ;
  733. }
  734. //异步调用
  735. if (msg->eType == ReqMessage::ASYNC_CALL)
  736. {
  737. if(!msg->sched)
  738. {
  739. if (msg->callback->getNetThreadProcess())
  740. {
  741. //如果是本线程的回调,直接本线程处理
  742. //比如获取endpoint
  743. ReqMessagePtr msgPtr = msg;
  744. try
  745. {
  746. msg->callback->dispatch(msgPtr);
  747. }
  748. catch (exception & e)
  749. {
  750. TLOGERROR("[AdapterProxy::finishInvoke(ReqMessage) exp:" << e.what() << " ,line:" << __LINE__ << "]" << endl);
  751. }
  752. catch (...)
  753. {
  754. TLOGERROR("[AdapterProxy::finishInvoke(ReqMessage) exp:unknown line:|" << __LINE__ << "]" << endl);
  755. }
  756. }
  757. else
  758. {
  759. //异步回调,放入回调处理线程中
  760. _objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
  761. }
  762. }
  763. else
  764. {
  765. CoroParallelBasePtr ptr = msg->callback->getCoroParallelBasePtr();
  766. if (ptr)
  767. {
  768. ptr->insert(msg);
  769. if (ptr->checkAllReqReturn())
  770. {
  771. msg->sched->put(msg->iCoroId);
  772. }
  773. }
  774. else
  775. {
  776. TLOGERROR("[AdapterProxy::finishInvoke(ReqMessage) coro parallel callback error,obj:" << _objectProxy->name() << ",endpoint:" << _trans->getConnectionString() << " ,id:" << msg->response->iRequestId << "]" << endl);
  777. delete msg;
  778. msg = NULL;
  779. }
  780. }
  781. return;
  782. }
  783. assert(false);
  784. return;
  785. }
  786. void AdapterProxy::doTimeout()
  787. {
  788. ReqMessage * msg;
  789. while (_timeoutQueue->timeout(msg))
  790. {
  791. TLOGTARS("[AdapterProxy::doTimeout, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", id:" << msg->request.iRequestId << ", status:" << msg->eStatus << "]" << endl);
  792. // assert(msg->eStatus == ReqMessage::REQ_REQ);
  793. if(msg == _requestMsg)
  794. {
  795. _requestMsg = NULL;
  796. //timeout, close
  797. _trans->close();
  798. }
  799. msg->eStatus = ReqMessage::REQ_TIME;
  800. //有可能是单向调用超时了
  801. if (msg->eType == ReqMessage::ONE_WAY)
  802. {
  803. delete msg;
  804. msg = NULL;
  805. continue;
  806. }
  807. //如果是异步调用超时
  808. if (msg->eType == ReqMessage::ASYNC_CALL)
  809. {
  810. //_connExcCnt大于0说明是网络连接异常引起的超时
  811. msg->response->iRet = (_connExcCnt > 0 ? TARSPROXYCONNECTERR : TARSASYNCCALLTIMEOUT);
  812. }
  813. finishInvoke(msg);
  814. }
  815. }
  816. void AdapterProxy::doKeepAlive()
  817. {
  818. if (!checkActive(false))
  819. {
  820. return;
  821. }
  822. time_t now = TNOW;
  823. if (now < _nextKeepAliveTime)
  824. {
  825. return;
  826. }
  827. _nextKeepAliveTime = now + _communicator->getKeepAliveInterval();
  828. TLOGTARS("[AdapterProxy::doKeepAlive, " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  829. ReqMessage *msg = new ReqMessage();
  830. msg->init(ReqMessage::ONE_WAY, _objectProxy->getServantProxy());
  831. msg->callback = NULL;
  832. msg->request.iVersion = TARSVERSION;
  833. msg->request.cPacketType = TARSONEWAY;
  834. msg->request.sFuncName = "tars_ping";
  835. msg->request.sServantName = _objectProxy->name();
  836. msg->request.iTimeout = ServantProxy::DEFAULT_ASYNCTIMEOUT;
  837. msg->proxy = _objectProxy->getServantProxy();
  838. msg->response->iRet = TARSSERVERUNKNOWNERR;
  839. //调用发起时间
  840. msg->iBeginTime = TNOWMS;
  841. msg->pObjectProxy = _objectProxy;
  842. invoke(msg);
  843. }
  844. // #ifdef TARS_OPENTRACKING
  845. // void AdapterProxy::startTrack(ReqMessage * msg)
  846. // {
  847. // if(!_communicator->_traceManager)
  848. // {
  849. // TLOGTARS("tracer info is null, just return" << endl);
  850. // return;
  851. // }
  852. // string functionName = msg->request.sFuncName;
  853. // std::unique_ptr<opentracing::Span> span;
  854. // if(msg->trackInfoMap.empty()) //start a new track
  855. // {
  856. // //std::chrono::time_point<std::chrono::system_clock> t1 = std::chrono::system_clock::now();
  857. // // _communicator->_traceManager->_tracer->StartSpan(functionName, {opentracing::StartTimestamp(t1)});
  858. // span = _communicator->_traceManager->_tracer->StartSpan(functionName);
  859. // }else{
  860. // TextMapCarrier carrier1(msg->trackInfoMap);
  861. // auto span_context_maybe = _communicator->_traceManager->_tracer->Extract(carrier1);
  862. // assert(span_context_maybe);
  863. // //std::chrono::time_point<std::chrono::system_clock> t1 = std::chrono::system_clock::now();
  864. // //_communicator->_traceManager->_tracer->StartSpan(functionName, {opentracing::ChildOf(span_context_maybe->get()), opentracing::StartTimestamp(t1)});
  865. // span = _communicator->_traceManager->_tracer->StartSpan(functionName, {opentracing::ChildOf(span_context_maybe->get())});
  866. // }
  867. // //将调用链信息注入到request的status中
  868. // std::unordered_map<std::string, std::string> text_map;
  869. // TextMapCarrier carrier(text_map);
  870. // auto err = _communicator->_traceManager->_tracer->Inject(span->context(), carrier);
  871. // assert(err);
  872. // std::string contxt = read_span_context(text_map);
  873. // _spanMap[msg->request.iRequestId].reset(span.release());
  874. // //_spanMap.insert(std::move(make_pair(msg->request.iRequestId, std::move(span))));
  875. // msg->request.status[ServantProxy::STATUS_TRACK_KEY] = contxt;
  876. // SET_MSG_TYPE(msg->request.iMessageType, tars::TARSMESSAGETYPETRACK);
  877. // }
  878. // void AdapterProxy::finishTrack(ReqMessage * msg)
  879. // {
  880. // map<int,std::unique_ptr<opentracing::Span>>::iterator spanIter = _spanMap.find(msg->request.iRequestId);
  881. // //report span info to zipkin collector
  882. // if(spanIter != _spanMap.end())
  883. // {
  884. // if(msg->eType == ReqMessage::ONE_WAY)
  885. // {
  886. // spanIter->second->SetTag("Retcode", 0);
  887. // }
  888. // else
  889. // {
  890. // spanIter->second->SetTag("Retcode",msg->response->iRet);
  891. // }
  892. // spanIter->second->Finish();
  893. // _spanMap.erase(msg->response->iRequestId);
  894. // }
  895. // }
  896. // #endif
  897. void AdapterProxy::stat(ReqMessage * msg)
  898. {
  899. if (msg->bPush)
  900. {
  901. return ;
  902. }
  903. StatMicMsgBody body;
  904. // int64_t sptime = 0;
  905. msg->iEndTime = TNOWMS;
  906. //包体信息.
  907. if(msg->eStatus == ReqMessage::REQ_RSP && TARSSERVERSUCCESS == msg->response->iRet)
  908. {
  909. body.count = 1;
  910. int64_t sptime = (msg->iEndTime >= msg->iBeginTime) ? (msg->iEndTime - msg->iBeginTime) : 10000;
  911. body.totalRspTime = body.minRspTime = body.maxRspTime = sptime;
  912. }
  913. else if (msg->eStatus == ReqMessage::REQ_TIME)
  914. {
  915. body.timeoutCount = 1;
  916. }
  917. else
  918. {
  919. body.execCount = 1;
  920. }
  921. auto it = _statBody.find(msg->request.sFuncName);
  922. if (it != _statBody.end())
  923. {
  924. merge(body, it->second);
  925. }
  926. else
  927. {
  928. _communicator->getStatReport()->getIntervCount(body.maxRspTime, body);
  929. _statBody[msg->request.sFuncName] = body;
  930. }
  931. }
  932. void AdapterProxy::merge(const StatMicMsgBody& inBody, StatMicMsgBody& outBody/*out*/)
  933. {
  934. outBody.count += inBody.count;
  935. outBody.timeoutCount += inBody.timeoutCount;
  936. outBody.execCount += inBody.execCount;
  937. outBody.totalRspTime += inBody.totalRspTime;
  938. if (outBody.maxRspTime < inBody.maxRspTime )
  939. {
  940. outBody.maxRspTime = inBody.maxRspTime;
  941. }
  942. //非0最小值
  943. if (outBody.minRspTime == 0 || (outBody.minRspTime > inBody.minRspTime && inBody.minRspTime != 0))
  944. {
  945. outBody.minRspTime = inBody.minRspTime;
  946. }
  947. _communicator->getStatReport()->getIntervCount(inBody.maxRspTime, outBody);
  948. }
  949. void AdapterProxy::mergeStat(map<StatMicMsgHead, StatMicMsgBody> & mStatMicMsg)
  950. {
  951. auto iter = _statBody.begin();
  952. for (; iter != _statBody.end(); ++iter)
  953. {
  954. _statHead.interfaceName = iter->first;
  955. //有数据就放到map里面
  956. if (iter->second.count != 0
  957. || iter->second.timeoutCount != 0
  958. || iter->second.execCount != 0)
  959. {
  960. //判断是否已经有相同的数据了,需要汇总
  961. auto it = mStatMicMsg.find(_statHead);
  962. if (it != mStatMicMsg.end())
  963. {
  964. merge(iter->second, it->second);
  965. }
  966. else
  967. {
  968. mStatMicMsg[_statHead] = iter->second;
  969. }
  970. }
  971. }
  972. //清空数据
  973. _statBody.clear();
  974. }
  975. void AdapterProxy::addConnExc(bool bExc)
  976. {
  977. if (bExc)
  978. {
  979. if(!_connExc && ++_connExcCnt >= _objectProxy->getRootServantProxy()->tars_check_timeout_info().maxConnectExc)
  980. {
  981. // TLOGERROR("[AdapterProxy::addConnExc ep: " << _trans->getConnectionString() << " connect exception! (connect error)]" << endl);
  982. setInactive();
  983. _connExc = true;
  984. }
  985. }
  986. else
  987. {
  988. // if (_connExc)
  989. // {
  990. // TLOGERROR("[AdapterProxy::addConnExc ep: " << _trans->getConnectionString() << " connect exception change to succ]" << endl);
  991. // }
  992. _connExc = false;
  993. _connExcCnt = 0;
  994. if (!_activeStatus)
  995. {
  996. _activeStatus = true;
  997. }
  998. }
  999. }
  1000. }