AdapterProxy.cpp 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/AdapterProxy.h"
  17. #include "servant/Communicator.h"
  18. #include "servant/CommunicatorEpoll.h"
  19. #include "servant/StatReport.h"
  20. #include "servant/RemoteLogger.h"
  21. #include "tup/tup.h"
  22. #include "servant/StatF.h"
  23. // #ifdef TARS_OPENTRACKING
  24. // #include "servant/text_map_carrier.h"
  25. // #endif
  26. namespace tars
  27. {
  28. std::atomic<int> AdapterProxy::_idGen;
  29. AdapterProxy::AdapterProxy(ObjectProxy * pObjectProxy, const EndpointInfo &ep, Communicator* pCom)
  30. : _communicator(pCom)
  31. , _objectProxy(pObjectProxy)
  32. , _ep(ep)
  33. , _activeStateInReg(true)
  34. , _activeStatus(true)
  35. , _totalInvoke(0)
  36. , _timeoutInvoke(0)
  37. , _nextFinishInvokeTime(0)
  38. , _frequenceFailInvoke(0)
  39. , _frequenceFailTime(0)
  40. , _nextRetryTime(0)
  41. , _nextKeepAliveTime(0)
  42. //, _connTimeout(false)
  43. , _connExc(false)
  44. , _connExcCnt(0)
  45. , _timeoutLogFlag(true)
  46. , _noSendQueueLimit(100000)
  47. , _id((++_idGen))
  48. {
  49. _timeoutQueue.reset(new TC_TimeoutQueueNew<ReqMessage*>());
  50. if(pObjectProxy->getCommunicatorEpoll())
  51. {
  52. _noSendQueueLimit = pObjectProxy->getCommunicatorEpoll()->getNoSendQueueLimit();
  53. }
  54. if(_communicator)
  55. {
  56. _timeoutLogFlag = _communicator->getTimeoutLogFlag();
  57. }
  58. #if TARS_SSL
  59. if (ep.isSsl())
  60. {
  61. _trans.reset(new TC_SSLTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  62. }
  63. else if (ep.isTcp())
  64. {
  65. _trans.reset(new TC_TCPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  66. }
  67. else
  68. {
  69. _trans.reset(new TC_UDPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  70. }
  71. #else
  72. if (ep.isUdp())
  73. {
  74. _trans.reset(new TC_UDPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  75. }
  76. else
  77. {
  78. _trans.reset(new TC_TCPTransceiver(pObjectProxy->getCommunicatorEpoll()->getEpoller(), ep.getEndpoint()));
  79. }
  80. #endif
  81. _trans->initializeClient(std::bind(&AdapterProxy::onCreateCallback, this, std::placeholders::_1),
  82. std::bind(&AdapterProxy::onCloseCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
  83. std::bind(&AdapterProxy::onConnectCallback, this, std::placeholders::_1),
  84. std::bind(&AdapterProxy::onRequestCallback, this, std::placeholders::_1),
  85. std::bind(&AdapterProxy::onParserCallback, this, std::placeholders::_1, std::placeholders::_2),
  86. std::bind(&AdapterProxy::onOpensslCallback, this, std::placeholders::_1),
  87. std::bind(&AdapterProxy::onCompletePackage, this, std::placeholders::_1));
  88. _trans->setClientAuthCallback(std::bind(&AdapterProxy::onSendAuthCallback, this, std::placeholders::_1),
  89. std::bind(&AdapterProxy::onVerifyAuthCallback, this, std::placeholders::_1, std::placeholders::_2));
  90. // if (!_endpoint.isTcp())
  91. // {
  92. // _checkTransInterval = 10; //udp端口10秒检查一次, 避免影响用户请求
  93. // }
  94. //初始化stat的head信息
  95. initStatHead();
  96. }
  97. AdapterProxy::~AdapterProxy()
  98. {
  99. }
  100. shared_ptr<TC_ProxyInfo> AdapterProxy::onCreateCallback(TC_Transceiver* trans)
  101. {
  102. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  103. _objectProxy->getCommunicatorEpoll()->addFd(this);
  104. trans->setConnTimeout(_objectProxy->getRootServantProxy()->tars_connect_timeout());
  105. if(_objectProxy->getRootServantProxy()->getProxyInfo())
  106. {
  107. return TC_ProxyInfo::createProxyInfo(*_objectProxy->getRootServantProxy()->getProxyInfo());
  108. }
  109. return NULL;
  110. }
  111. std::shared_ptr<TC_OpenSSL> AdapterProxy::onOpensslCallback(TC_Transceiver* trans)
  112. {
  113. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  114. return _objectProxy->getCommunicatorEpoll()->getCommunicator()->newClientSSL(_objectProxy->name());
  115. }
  116. void AdapterProxy::onCloseCallback(TC_Transceiver* trans, TC_Transceiver::CloseReason reason, const string &err)
  117. {
  118. if(_objectProxy->getRootServantProxy()->tars_get_push_callback())
  119. {
  120. _objectProxy->getRootServantProxy()->tars_get_push_callback()->onClose();
  121. }
  122. int second =_objectProxy->reconnect();
  123. if(second > 0)
  124. {
  125. _objectProxy->getCommunicatorEpoll()->reConnect(TNOWMS + second * 1000, trans);
  126. TLOGERROR("[trans close:" << _objectProxy->name() << "," << trans->getConnectEndpoint().toString() << ", reconnect:" << second << "]" << endl);
  127. }
  128. }
  129. void AdapterProxy::onConnectCallback(TC_Transceiver* trans)
  130. {
  131. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  132. addConnExc(false);
  133. if(_objectProxy->getRootServantProxy()->tars_get_push_callback())
  134. {
  135. _objectProxy->getRootServantProxy()->tars_get_push_callback()->onConnect(trans->getConnectEndpoint());
  136. }
  137. _objectProxy->onConnect(this);
  138. }
  139. void AdapterProxy::onRequestCallback(TC_Transceiver* trans)
  140. {
  141. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  142. doInvoke();
  143. }
  144. TC_NetWorkBuffer::PACKET_TYPE AdapterProxy::onParserCallback(TC_NetWorkBuffer& buff, TC_Transceiver* trans)
  145. {
  146. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans<< endl;
  147. try
  148. {
  149. shared_ptr<ResponsePacket> rsp = std::make_shared<ResponsePacket>();
  150. TC_NetWorkBuffer::PACKET_TYPE ret = _objectProxy->getRootServantProxy()->tars_get_protocol().responseFunc(buff, *rsp.get());
  151. if(ret == TC_NetWorkBuffer::PACKET_FULL || ret == TC_NetWorkBuffer::PACKET_FULL_CLOSE)
  152. {
  153. finishInvoke(rsp);
  154. }
  155. return ret;
  156. }
  157. catch(exception &ex)
  158. {
  159. TLOG_ERROR(ex.what() << ", obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString()<< endl);
  160. }
  161. return TC_NetWorkBuffer::PACKET_ERR;
  162. }
  163. void AdapterProxy::onCompletePackage(TC_Transceiver* trans)
  164. {
  165. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  166. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  167. {
  168. _objectProxy->prepareConnection(this);
  169. if(!_timeoutQueue->sendListEmpty())
  170. {
  171. //并行连接模式, 继续发起连接, 建立连接后, 会自动doInvoke发包
  172. if(_trans->hasConnected())
  173. {
  174. doInvoke();
  175. }
  176. else
  177. {
  178. checkActive(true);
  179. }
  180. }
  181. }
  182. }
  183. shared_ptr<TC_NetWorkBuffer::Buffer> AdapterProxy::onSendAuthCallback(TC_Transceiver* trans)
  184. {
  185. // LOG_CONSOLE_DEBUG << "fd:" << trans->fd() << ", " << trans << endl;
  186. // 走框架的AK/SK认证
  187. BasicAuthInfo info;
  188. info.sObjName = _objectProxy->name();
  189. info.sAccessKey = _objectProxy->getCommunicatorEpoll()->getCommunicator()->getServantProperty(_objectProxy->name(), "accesskey");
  190. info.sSecretKey = _objectProxy->getCommunicatorEpoll()->getCommunicator()->getServantProperty(_objectProxy->name(), "secretkey");
  191. // std::string out = tars::defaultCreateAuthReq(info);
  192. const int kAuthType = 0x40;
  193. RequestPacket request;
  194. request.sFuncName = "InnerAuthServer";
  195. request.sServantName = "authServant";
  196. request.iVersion = TARSVERSION;
  197. request.iRequestId = 1;
  198. request.cPacketType = TARSNORMAL;
  199. request.iMessageType = kAuthType;
  200. request.sBuffer = tars::defaultCreateAuthReq(info);//.assign(out.begin(), out.end());
  201. return _objectProxy->getRootServantProxy()->tars_get_protocol().requestFunc(request, trans);
  202. }
  203. TC_NetWorkBuffer::PACKET_TYPE AdapterProxy::onVerifyAuthCallback(TC_NetWorkBuffer &buff, TC_Transceiver*trans)
  204. {
  205. shared_ptr<ResponsePacket> rsp = std::make_shared<ResponsePacket>();
  206. TC_NetWorkBuffer::PACKET_TYPE ret = _objectProxy->getRootServantProxy()->tars_get_protocol().responseFunc(buff, *rsp.get());
  207. if(ret == TC_NetWorkBuffer::PACKET_FULL)
  208. {
  209. // std::string ret(rsp->sBuffer.begin(), rsp->sBuffer.end());
  210. if(TC_Port::strncasecmp(rsp->sBuffer.data(), "AUTH_SUCC", rsp->sBuffer.size()) == 0)
  211. {
  212. return TC_NetWorkBuffer::PACKET_FULL;
  213. }
  214. return TC_NetWorkBuffer::PACKET_ERR;
  215. }
  216. return ret;
  217. }
  218. void AdapterProxy::initStatHead()
  219. {
  220. vector <string> vtSetInfo;
  221. if(!ClientConfig::SetDivision.empty() && StatReport::divison2SetInfo(ClientConfig::SetDivision, vtSetInfo)) {
  222. //主调(client)启用set
  223. _statHead.masterName = StatReport::trimAndLimitStr(ClientConfig::ModuleName + "." + vtSetInfo[0] + vtSetInfo[1] + vtSetInfo[2] + "@" + ClientConfig::TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
  224. }
  225. else
  226. {
  227. _statHead.masterName = StatReport::trimAndLimitStr(ClientConfig::ModuleName + "@" + ClientConfig::TarsVersion, StatReport::MAX_MASTER_NAME_LEN);
  228. }
  229. string sSlaveSet = _ep.setDivision();
  230. const string sSlaveName = getSlaveName(_objectProxy->name());
  231. if (!sSlaveSet.empty() && StatReport::divison2SetInfo(sSlaveSet, vtSetInfo)) //被调启用set
  232. {
  233. _statHead.slaveSetName = vtSetInfo[0];
  234. _statHead.slaveSetArea = vtSetInfo[1];
  235. _statHead.slaveSetID = vtSetInfo[2];
  236. _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName + "." + vtSetInfo[0] + vtSetInfo[1] + vtSetInfo[2], StatReport::MAX_MASTER_NAME_LEN);
  237. }
  238. else
  239. {
  240. _statHead.slaveName = StatReport::trimAndLimitStr(sSlaveName, StatReport::MAX_MASTER_NAME_LEN);
  241. }
  242. _statHead.slaveIp = StatReport::trimAndLimitStr(_ep.host(), StatReport::MAX_MASTER_IP_LEN);
  243. _statHead.slavePort = _ep.port();
  244. _statHead.returnValue = 0;
  245. }
  246. string AdapterProxy::getSlaveName(const string& sSlaveName)
  247. {
  248. string::size_type pos = sSlaveName.find(".");
  249. if (pos != string::npos)
  250. {
  251. pos = sSlaveName.find(".", pos + 1);
  252. if (pos != string::npos)
  253. {
  254. return sSlaveName.substr(0, pos);
  255. }
  256. }
  257. return sSlaveName;
  258. }
  259. void AdapterProxy::onConnect()
  260. {
  261. _objectProxy->onConnect(this);
  262. }
  263. int AdapterProxy::invoke_connection_serial(ReqMessage * msg)
  264. {
  265. assert(msg->eType != ReqMessage::ONE_WAY);
  266. msg->sReqData = _objectProxy->getRootServantProxy()->tars_get_protocol().requestFunc(msg->request, _trans.get());
  267. msg->request.iRequestId = _timeoutQueue->generateId();
  268. if(!_requestMsg && _timeoutQueue->sendListEmpty())
  269. {
  270. int ret = _trans->sendRequest(msg->sReqData);
  271. if(ret == TC_Transceiver::eRetOk || ret == TC_Transceiver::eRetFull)
  272. {
  273. TLOGTARS("[AdapterProxy::invoke_connection_serial push (send) obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString() << ", id: " << msg->request.iRequestId << endl);
  274. _requestMsg = msg;
  275. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
  276. if (!bFlag)
  277. {
  278. TLOGERROR("[AdapterProxy::invoke_connection_serial fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  279. _requestMsg = NULL;
  280. msg->eStatus = ReqMessage::REQ_EXC;
  281. finishInvoke(msg);
  282. }
  283. return 0;
  284. }
  285. else if(ret == TC_Transceiver::eRetError)
  286. {
  287. //发送出错了
  288. _requestMsg = NULL;
  289. msg->eStatus = ReqMessage::REQ_EXC;
  290. finishInvoke(msg);
  291. return -1;
  292. }
  293. }
  294. //数据没有发送
  295. TLOGTARS("[AdapterProxy::invoke_connection_serial push (no send) " << _objectProxy->name() << ", " << _trans->getConnectionString() << ",id " << msg->request.iRequestId << ", " << _requestMsg << endl);
  296. //数据没有发送
  297. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime, false);
  298. if (!bFlag)
  299. {
  300. TLOGERROR("[AdapterProxy::invoke_connection_serial fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ", id: " << msg->request.iRequestId << ", " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  301. msg->eStatus = ReqMessage::REQ_EXC;
  302. finishInvoke(msg);
  303. }
  304. return 0;
  305. }
  306. int AdapterProxy::invoke_connection_parallel(ReqMessage * msg)
  307. {
  308. msg->sReqData = _objectProxy->getRootServantProxy()->tars_get_protocol().requestFunc(msg->request, _trans.get());
  309. //当前队列是空的, 且是连接复用模式, 交给连接发送数据
  310. //连接连上 buffer不为空 发送数据成功
  311. if (_timeoutQueue->sendListEmpty())
  312. {
  313. int ret = _trans->sendRequest(msg->sReqData);
  314. if(ret == TC_Transceiver::eRetOk || ret == TC_Transceiver::eRetFull)
  315. {
  316. TLOGTARS("[AdapterProxy::invoke_connection_parallel push (send) obj: " << _objectProxy->name() << ", desc:" << _trans->getConnectionString() << ", id: " << msg->request.iRequestId << endl);
  317. //请求发送成功了 处理采样
  318. //这个请求发送成功了。单向调用直接返回
  319. if (msg->eType == ReqMessage::ONE_WAY)
  320. {
  321. // #ifdef TARS_OPENTRACKING
  322. // finishTrack(msg);
  323. // #endif
  324. delete msg;
  325. msg = NULL;
  326. return 0;
  327. }
  328. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
  329. if (!bFlag)
  330. {
  331. TLOGERROR("[AdapterProxy::invoke_connection_parallel fail1 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  332. msg->eStatus = ReqMessage::REQ_EXC;
  333. finishInvoke(msg);
  334. }
  335. return 0;
  336. }
  337. else if(ret == TC_Transceiver::eRetError)
  338. {
  339. TLOGTARS("[AdapterProxy::invoke_connection_parallel send request failed,queue size:" << _timeoutQueue->size() << ",id: " << msg->request.iRequestId << "," << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  340. //发送出错了
  341. msg->eStatus = ReqMessage::REQ_EXC;
  342. finishInvoke(msg);
  343. return -1;
  344. }
  345. }
  346. //没有发送数据
  347. TLOGTARS("[AdapterProxy::invoke_connection_parallel push (no send) " << _objectProxy->name() << ", " << _trans->getConnectionString() << ",id " << msg->request.iRequestId << endl);
  348. //之前还没有数据没发送 或者 请求发送失败了, 进队列
  349. bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime, false);
  350. if (!bFlag)
  351. {
  352. TLOGERROR("[AdapterProxy::invoke_connection_parallel fail2 : insert timeout queue fail,queue size:" << _timeoutQueue->size() << ", id: " << msg->request.iRequestId << ", " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  353. msg->eStatus = ReqMessage::REQ_EXC;
  354. finishInvoke(msg);
  355. }
  356. return 0;
  357. }
  358. int AdapterProxy::invoke(ReqMessage * msg)
  359. {
  360. assert(_trans != NULL);
  361. TLOGTARS("[AdapterProxy::invoke " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  362. //未发链表有长度限制
  363. if (_timeoutQueue->getSendListSize() > _noSendQueueLimit)
  364. {
  365. TLOGERROR("[AdapterProxy::invoke fail,ReqInfoQueue.size(" << _timeoutQueue->getSendListSize() << ") > " << _noSendQueueLimit << "," << _objectProxy->name() << "," << _trans->getConnectionString() << "]" << endl);
  366. msg->eStatus = ReqMessage::REQ_EXC;
  367. msg->response->iRet = tars::TARSSENDREQUESTERR;
  368. finishInvoke(msg);
  369. return 0;
  370. }
  371. //生成requestid
  372. //tars调用 而且 不是单向调用
  373. if (!msg->bFromRpc)
  374. {
  375. msg->request.iRequestId = _timeoutQueue->generateId();
  376. }
  377. // #ifdef TARS_OPENTRACKING
  378. // startTrack(msg);
  379. // #endif
  380. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  381. {
  382. return invoke_connection_serial(msg);
  383. }
  384. else
  385. {
  386. return invoke_connection_parallel(msg);
  387. }
  388. }
  389. void AdapterProxy::doInvoke_serial()
  390. {
  391. if(_requestMsg != NULL || _timeoutQueue->sendListEmpty())
  392. {
  393. //有请求 or 发送队列是空的, 当前请求不再发送 等epoll事件通知 在doRequest中发送
  394. TLOGTARS("[AdapterProxy::doInvoke_serial not send obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ", send size:" << _timeoutQueue->getSendListSize() << ", " << _requestMsg << endl);
  395. return;
  396. }
  397. ReqMessage * msg = NULL;
  398. _timeoutQueue->getSend(msg);
  399. int iRet = _trans->sendRequest(msg->sReqData);
  400. if(iRet == TC_Transceiver::eRetError)
  401. {
  402. _requestMsg = NULL;
  403. _timeoutQueue->popSend(true);
  404. msg->response->iRet = TARSSENDREQUESTERR;
  405. TLOGTARS("[AdapterProxy::doInvoke_serial sendRequest failed, obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
  406. finishInvoke(msg);
  407. }
  408. else if(iRet == TC_Transceiver::eRetOk || iRet == TC_Transceiver::eRetFull)
  409. {
  410. _requestMsg = msg;
  411. //从发送send 队列中清掉, 但是保留在定时队列中
  412. _timeoutQueue->popSend(false);
  413. }
  414. }
  415. void AdapterProxy::doInvoke_parallel()
  416. {
  417. while(!_timeoutQueue->sendListEmpty())
  418. {
  419. ReqMessage * msg = NULL;
  420. _timeoutQueue->getSend(msg);
  421. int iRet = _trans->sendRequest(msg->sReqData);
  422. //发送失败 or 没有发送
  423. if (iRet == TC_Transceiver::eRetError)
  424. {
  425. TLOGTARS("[AdapterProxy::doInvoke_parallel sendRequest failed, obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
  426. return;
  427. }
  428. if (iRet == TC_Transceiver::eRetNotSend)
  429. {
  430. TLOGTARS("[AdapterProxy::doInvoke_parallel sendRequest not send, obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ",id:" << msg->request.iRequestId << ", ret:" << iRet << endl);
  431. return;
  432. }
  433. //发送完成,要从队列里面清掉
  434. _timeoutQueue->popSend(msg->eType == ReqMessage::ONE_WAY);
  435. if (msg->eType == ReqMessage::ONE_WAY)
  436. {
  437. delete msg;
  438. msg = NULL;
  439. }
  440. //发送buffer已经满了 要返回
  441. if (iRet == TC_Transceiver::eRetFull)
  442. {
  443. return;
  444. }
  445. }
  446. }
  447. void AdapterProxy::doInvoke()
  448. {
  449. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  450. {
  451. doInvoke_serial();
  452. }
  453. else
  454. {
  455. doInvoke_parallel();
  456. }
  457. }
  458. void AdapterProxy::finishInvoke(bool bFail)
  459. {
  460. TLOGTARS("[AdapterProxy::finishInvoke(bool), " << _objectProxy->name() << ", " << _trans->getConnectionString() << "," << bFail << "]" << endl);
  461. time_t now = TNOW;
  462. const CheckTimeoutInfo& info = _objectProxy->getRootServantProxy()->tars_check_timeout_info();
  463. //处于异常状态 已经屏蔽
  464. if (!_activeStatus)
  465. {
  466. if (!bFail)
  467. {
  468. //重试成功,恢复正常状态
  469. _activeStatus = true;
  470. //连续失败次数清零
  471. _frequenceFailInvoke = 0;
  472. _nextFinishInvokeTime = now + info.checkTimeoutInterval;
  473. _frequenceFailInvoke = 0;
  474. _totalInvoke = 1;
  475. _timeoutInvoke = 0;
  476. _trans->setIsConnTimeout(false);
  477. _connExc = false;
  478. TLOGTARS("[AdapterProxy::finishInvoke(bool), " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", retry ok]" << endl);
  479. }
  480. else
  481. {
  482. //结点已经屏蔽 过来失败的包不用处理
  483. TLOGTARS("[AdapterProxy::finishInvoke(bool), " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", retry fail]" << endl);
  484. }
  485. return;
  486. }
  487. ++_totalInvoke;
  488. if (bFail)
  489. {
  490. //调用失败
  491. //失败次数+1
  492. ++_timeoutInvoke;
  493. //连续失败时间间隔重新计算
  494. if (0 == _frequenceFailInvoke)
  495. {
  496. _frequenceFailTime = now + info.minFrequenceFailTime;
  497. }
  498. //连续失败次数加1
  499. _frequenceFailInvoke++;
  500. //检查是否到了连续失败次数,且至少在5s以上
  501. if (_frequenceFailInvoke >= info.frequenceFailInvoke && now >= _frequenceFailTime)
  502. {
  503. setInactive();
  504. _activeStatus = false;
  505. resetRetryTime();
  506. TLOGERROR("[AdapterProxy::finishInvoke(bool) objname:"<< _objectProxy->name()
  507. << ",desc:" << _trans->getConnectionString()
  508. << ",disable frequenceFail,freqtimeout:" << _frequenceFailInvoke
  509. << ",timeout:"<< _timeoutInvoke
  510. << ",total:" << _totalInvoke << endl);
  511. return ;
  512. }
  513. }
  514. else
  515. {
  516. _frequenceFailInvoke = 0;
  517. }
  518. //判断一段时间内的超时比例
  519. if (now > _nextFinishInvokeTime)
  520. {
  521. _nextFinishInvokeTime = now + info.checkTimeoutInterval;
  522. if (bFail && _timeoutInvoke >= info.minTimeoutInvoke && _timeoutInvoke >= info.radio * _totalInvoke)
  523. {
  524. setInactive();
  525. TLOGERROR("[AdapterProxy::finishInvoke(bool), "
  526. << _objectProxy->name() << "," << _trans->getConnectionString()
  527. << ",disable radioFail,freqtimeout:" << _frequenceFailInvoke
  528. << ",timeout:" << _timeoutInvoke
  529. << ",total:" << _totalInvoke << "] " << endl);
  530. }
  531. else
  532. {
  533. //每一分钟清空一次
  534. _totalInvoke = 0;
  535. _timeoutInvoke = 0;
  536. }
  537. }
  538. }
  539. void AdapterProxy::resetRetryTime(bool next)
  540. {
  541. if(next) {
  542. _nextRetryTime = TNOW + _objectProxy->getRootServantProxy()->tars_check_timeout_info().tryTimeInterval;
  543. }else {
  544. _nextRetryTime = TNOW;
  545. }
  546. }
  547. bool AdapterProxy::checkActive(bool connecting)
  548. {
  549. time_t now = TNOW;
  550. TLOGTARS("[AdapterProxy::checkActive,"
  551. << _objectProxy->name() << "," << _trans->getConnectionString() << ","
  552. << (_activeStatus ? "active" : "inactive")
  553. << (connecting ? ", connecting" : "")
  554. << ", freqtimeout:" << _frequenceFailInvoke
  555. << ", timeout:" << _timeoutInvoke
  556. << ", connExcCnt:" << _connExcCnt
  557. << ", total:" << _totalInvoke << "]" << endl);
  558. //失效且没有到下次重试时间, 直接返回不可用
  559. if ((!_activeStatus) && (now < _nextRetryTime))
  560. {
  561. TLOGTARS("[AdapterProxy::checkActive,not reach retry time ," << _objectProxy->name() << ","
  562. << _trans->getConnectionString() << endl);
  563. return false;
  564. }
  565. if (!_activeStatus)
  566. {
  567. resetRetryTime();
  568. }
  569. //连接没有建立或者连接无效, 重新建立连接
  570. if (!_trans->isValid())
  571. {
  572. try
  573. {
  574. _trans->connect();
  575. }
  576. catch (exception & ex)
  577. {
  578. _activeStatus = false;
  579. _trans->close();
  580. TLOGERROR("[AdapterProxy::checkActive connect obj:" << _objectProxy->name() << ",desc:" << _trans->getConnectionString() << ", ex:" << ex.what() << endl);
  581. }
  582. }
  583. if(connecting && _activeStatus) {
  584. //hash模式, 且是第一次连接(_activeStatus=true, 即没有失败过), 返回已经连接或者正在连接的, 这样保证第一次hash不会错且连接挂过以后, 不会马上就使用, 直到连接成功才使用!
  585. return (_trans->hasConnected() || _trans->isConnecting());
  586. }
  587. else {
  588. return _trans->hasConnected();
  589. }
  590. }
  591. void AdapterProxy::onSetInactive()
  592. {
  593. _activeStatus = false;
  594. resetRetryTime();
  595. //需要关闭连接
  596. _trans->close();
  597. }
  598. //屏蔽结点
  599. void AdapterProxy::setInactive()
  600. {
  601. onSetInactive();
  602. _objectProxy->getRootServantProxy()->onSetInactive(_ep);
  603. TLOGTARS("[AdapterProxy::setInactive, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", inactive]" << endl);
  604. }
  605. void AdapterProxy::finishInvoke_serial(shared_ptr<ResponsePacket> & rsp)
  606. {
  607. TLOGTARS("[AdapterProxy::finishInvoke_serial, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", id:" << rsp->iRequestId << "]" << endl);
  608. if (!_requestMsg)
  609. {
  610. if(_timeoutLogFlag)
  611. {
  612. TLOGERROR("[AdapterProxy::finishInvoke_serial,"
  613. << _objectProxy->name()
  614. << ", get req-ptr NULL,may be timeout,id:"
  615. << rsp->iRequestId << ",desc:" << _trans->getConnectionString() << "]" << endl);
  616. }
  617. return;
  618. }
  619. ReqMessage * msg = _requestMsg;
  620. //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了,因此可能会core,在erase中要加判断
  621. //获取请求信息
  622. bool retErase = _timeoutQueue->erase(_requestMsg->request.iRequestId, msg);
  623. assert(retErase);
  624. assert(_requestMsg == msg);
  625. assert(msg->eType != ReqMessage::ONE_WAY);
  626. assert(msg->eStatus == ReqMessage::REQ_REQ);
  627. _requestMsg = NULL;
  628. msg->eStatus = ReqMessage::REQ_RSP;
  629. msg->response = rsp;
  630. finishInvoke(msg);
  631. }
  632. void AdapterProxy::finishInvoke_parallel(shared_ptr<ResponsePacket> & rsp)
  633. {
  634. TLOGTARS("[AdapterProxy::finishInvoke_parallel, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", id:" << rsp->iRequestId << "]" << endl);
  635. ReqMessage * msg = NULL;
  636. if (rsp->iRequestId == 0)
  637. {
  638. //requestid 为0 是push消息, push callback is null
  639. if (!_objectProxy->getRootServantProxy()->tars_get_push_callback())
  640. {
  641. TLOGERROR("[AdapterProxy::finishInvoke(BasePacket), request id is 0, pushcallback is null, " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  642. throw TarsDecodeException("request id is 0, pushcallback is null, obj: " + _objectProxy->name() + ", desc: " + _trans->getConnectionString());
  643. }
  644. msg = new ReqMessage();
  645. msg->eStatus = ReqMessage::REQ_RSP;
  646. msg->eType = ReqMessage::ASYNC_CALL;
  647. msg->bFromRpc = true;
  648. msg->bPush = true;
  649. msg->proxy = _objectProxy->getServantProxy();
  650. msg->pObjectProxy = _objectProxy;
  651. msg->adapter = this;
  652. msg->callback = _objectProxy->getRootServantProxy()->tars_get_push_callback();
  653. }
  654. else
  655. {
  656. //这里的队列中的发送链表中的数据可能已经在timeout的时候删除了,因此可能会core,在erase中要加判断
  657. //获取请求信息
  658. bool retErase = _timeoutQueue->erase(rsp->iRequestId, msg);
  659. //找不到此id信息
  660. if (!retErase)
  661. {
  662. if (_timeoutLogFlag)
  663. {
  664. TLOGERROR("[AdapterProxy::finishInvoke_parallel,"
  665. << _objectProxy->name()
  666. << ",get req-ptr NULL,may be timeout,id:"
  667. << rsp->iRequestId << ",desc:" << _trans->getConnectionString() << "]" << endl);
  668. }
  669. return ;
  670. }
  671. assert(msg->eStatus == ReqMessage::REQ_REQ);
  672. msg->eStatus = ReqMessage::REQ_RSP;
  673. }
  674. msg->response = rsp;
  675. finishInvoke(msg);
  676. }
  677. void AdapterProxy::finishInvoke(shared_ptr<ResponsePacket> & rsp)
  678. {
  679. if(_objectProxy->getRootServantProxy()->tars_connection_serial() > 0)
  680. {
  681. finishInvoke_serial(rsp);
  682. }
  683. else
  684. {
  685. finishInvoke_parallel(rsp);
  686. }
  687. }
  688. void AdapterProxy::finishInvoke(ReqMessage * msg)
  689. {
  690. // assert(msg->eStatus != ReqMessage::REQ_REQ);
  691. TLOGTARS("[AdapterProxy::finishInvokeMsg " << _objectProxy->name() << ", " << _trans->getConnectionString() << " ,id:" << msg->response->iRequestId << "]" << endl);
  692. // #ifdef TARS_OPENTRACKING
  693. // finishTrack(msg);
  694. // #endif
  695. //单向调用
  696. if (msg->eType == ReqMessage::ONE_WAY)
  697. {
  698. TLOGTARS("[AdapterProxy::finishInvokeMsg " << _objectProxy->name() << ", " << _trans->getConnectionString()
  699. << " ,id:" << msg->response->iRequestId
  700. << " ,one way call]" << endl);
  701. delete msg;
  702. msg = NULL;
  703. return ;
  704. }
  705. //stat 上报调用统计
  706. stat(msg);
  707. //超时屏蔽统计,异常不算超时统计
  708. if (msg->eStatus != ReqMessage::REQ_EXC && !msg->bPush)
  709. {
  710. finishInvoke(msg->response->iRet != TARSSERVERSUCCESS);
  711. }
  712. //同步调用,唤醒ServantProxy线程
  713. if (msg->eType == ReqMessage::SYNC_CALL)
  714. {
  715. if (!msg->sched)
  716. {
  717. assert(msg->pMonitor);
  718. msg->pMonitor->notify();
  719. }
  720. else
  721. {
  722. msg->sched->put(msg->iCoroId);
  723. }
  724. return ;
  725. }
  726. //异步调用
  727. if (msg->eType == ReqMessage::ASYNC_CALL)
  728. {
  729. if(!msg->sched)
  730. {
  731. if (msg->callback->getNetThreadProcess())
  732. {
  733. //如果是本线程的回调,直接本线程处理
  734. //比如获取endpoint
  735. ReqMessagePtr msgPtr = msg;
  736. try
  737. {
  738. msg->callback->dispatch(msgPtr);
  739. }
  740. catch (exception & e)
  741. {
  742. TLOGERROR("[AdapterProxy::finishInvoke(ReqMessage) exp:" << e.what() << " ,line:" << __LINE__ << "]" << endl);
  743. }
  744. catch (...)
  745. {
  746. TLOGERROR("[AdapterProxy::finishInvoke(ReqMessage) exp:unknown line:|" << __LINE__ << "]" << endl);
  747. }
  748. }
  749. else
  750. {
  751. //异步回调,放入回调处理线程中
  752. _objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
  753. }
  754. }
  755. else
  756. {
  757. CoroParallelBasePtr ptr = msg->callback->getCoroParallelBasePtr();
  758. if (ptr)
  759. {
  760. ptr->insert(msg);
  761. if (ptr->checkAllReqReturn())
  762. {
  763. msg->sched->put(msg->iCoroId);
  764. }
  765. }
  766. else
  767. {
  768. TLOGERROR("[AdapterProxy::finishInvoke(ReqMessage) coro parallel callback error,obj:" << _objectProxy->name() << ",endpoint:" << _trans->getConnectionString() << " ,id:" << msg->response->iRequestId << "]" << endl);
  769. delete msg;
  770. msg = NULL;
  771. }
  772. }
  773. return;
  774. }
  775. assert(false);
  776. return;
  777. }
  778. void AdapterProxy::doTimeout()
  779. {
  780. ReqMessage * msg;
  781. while (_timeoutQueue->timeout(msg))
  782. {
  783. TLOGTARS("[AdapterProxy::doTimeout, " << _objectProxy->name() << ", " << _trans->getConnectionString() << ", id:" << msg->request.iRequestId << ", status:" << msg->eStatus << "]" << endl);
  784. // assert(msg->eStatus == ReqMessage::REQ_REQ);
  785. if(msg == _requestMsg)
  786. {
  787. _requestMsg = NULL;
  788. //timeout, close
  789. _trans->close();
  790. }
  791. msg->eStatus = ReqMessage::REQ_TIME;
  792. //有可能是单向调用超时了
  793. if (msg->eType == ReqMessage::ONE_WAY)
  794. {
  795. delete msg;
  796. msg = NULL;
  797. continue;
  798. }
  799. //如果是异步调用超时
  800. if (msg->eType == ReqMessage::ASYNC_CALL)
  801. {
  802. //_connExcCnt大于0说明是网络连接异常引起的超时
  803. msg->response->iRet = (_connExcCnt > 0 ? TARSPROXYCONNECTERR : TARSASYNCCALLTIMEOUT);
  804. }
  805. finishInvoke(msg);
  806. }
  807. }
  808. void AdapterProxy::doKeepAlive()
  809. {
  810. if (!checkActive(false))
  811. {
  812. return;
  813. }
  814. time_t now = TNOW;
  815. if (now < _nextKeepAliveTime)
  816. {
  817. return;
  818. }
  819. _nextKeepAliveTime = now + _communicator->getKeepAliveInterval();
  820. TLOGTARS("[AdapterProxy::doKeepAlive, " << _objectProxy->name() << ", " << _trans->getConnectionString() << "]" << endl);
  821. ReqMessage *msg = new ReqMessage();
  822. msg->init(ReqMessage::ONE_WAY, _objectProxy->getServantProxy());
  823. msg->callback = NULL;
  824. msg->request.iVersion = TARSVERSION;
  825. msg->request.cPacketType = TARSONEWAY;
  826. msg->request.sFuncName = "tars_ping";
  827. msg->request.sServantName = _objectProxy->name();
  828. msg->request.iTimeout = ServantProxy::DEFAULT_ASYNCTIMEOUT;
  829. msg->proxy = _objectProxy->getServantProxy();
  830. msg->response->iRet = TARSSERVERUNKNOWNERR;
  831. //调用发起时间
  832. msg->iBeginTime = TNOWMS;
  833. msg->pObjectProxy = _objectProxy;
  834. invoke(msg);
  835. }
  836. // #ifdef TARS_OPENTRACKING
  837. // void AdapterProxy::startTrack(ReqMessage * msg)
  838. // {
  839. // if(!_communicator->_traceManager)
  840. // {
  841. // TLOGTARS("tracer info is null, just return" << endl);
  842. // return;
  843. // }
  844. // string functionName = msg->request.sFuncName;
  845. // std::unique_ptr<opentracing::Span> span;
  846. // if(msg->trackInfoMap.empty()) //start a new track
  847. // {
  848. // //std::chrono::time_point<std::chrono::system_clock> t1 = std::chrono::system_clock::now();
  849. // // _communicator->_traceManager->_tracer->StartSpan(functionName, {opentracing::StartTimestamp(t1)});
  850. // span = _communicator->_traceManager->_tracer->StartSpan(functionName);
  851. // }else{
  852. // TextMapCarrier carrier1(msg->trackInfoMap);
  853. // auto span_context_maybe = _communicator->_traceManager->_tracer->Extract(carrier1);
  854. // assert(span_context_maybe);
  855. // //std::chrono::time_point<std::chrono::system_clock> t1 = std::chrono::system_clock::now();
  856. // //_communicator->_traceManager->_tracer->StartSpan(functionName, {opentracing::ChildOf(span_context_maybe->get()), opentracing::StartTimestamp(t1)});
  857. // span = _communicator->_traceManager->_tracer->StartSpan(functionName, {opentracing::ChildOf(span_context_maybe->get())});
  858. // }
  859. // //将调用链信息注入到request的status中
  860. // std::unordered_map<std::string, std::string> text_map;
  861. // TextMapCarrier carrier(text_map);
  862. // auto err = _communicator->_traceManager->_tracer->Inject(span->context(), carrier);
  863. // assert(err);
  864. // std::string contxt = read_span_context(text_map);
  865. // _spanMap[msg->request.iRequestId].reset(span.release());
  866. // //_spanMap.insert(std::move(make_pair(msg->request.iRequestId, std::move(span))));
  867. // msg->request.status[ServantProxy::STATUS_TRACK_KEY] = contxt;
  868. // SET_MSG_TYPE(msg->request.iMessageType, tars::TARSMESSAGETYPETRACK);
  869. // }
  870. // void AdapterProxy::finishTrack(ReqMessage * msg)
  871. // {
  872. // map<int,std::unique_ptr<opentracing::Span>>::iterator spanIter = _spanMap.find(msg->request.iRequestId);
  873. // //report span info to zipkin collector
  874. // if(spanIter != _spanMap.end())
  875. // {
  876. // if(msg->eType == ReqMessage::ONE_WAY)
  877. // {
  878. // spanIter->second->SetTag("Retcode", 0);
  879. // }
  880. // else
  881. // {
  882. // spanIter->second->SetTag("Retcode",msg->response->iRet);
  883. // }
  884. // spanIter->second->Finish();
  885. // _spanMap.erase(msg->response->iRequestId);
  886. // }
  887. // }
  888. // #endif
  889. void AdapterProxy::stat(ReqMessage * msg)
  890. {
  891. if (msg->bPush)
  892. {
  893. return ;
  894. }
  895. StatMicMsgBody body;
  896. // int64_t sptime = 0;
  897. msg->iEndTime = TNOWMS;
  898. //包体信息.
  899. if(msg->eStatus == ReqMessage::REQ_RSP && TARSSERVERSUCCESS == msg->response->iRet)
  900. {
  901. body.count = 1;
  902. int64_t sptime = (msg->iEndTime >= msg->iBeginTime) ? (msg->iEndTime - msg->iBeginTime) : 10000;
  903. body.totalRspTime = body.minRspTime = body.maxRspTime = sptime;
  904. }
  905. else if (msg->eStatus == ReqMessage::REQ_TIME)
  906. {
  907. body.timeoutCount = 1;
  908. }
  909. else
  910. {
  911. body.execCount = 1;
  912. }
  913. auto it = _statBody.find(msg->request.sFuncName);
  914. if (it != _statBody.end())
  915. {
  916. merge(body, it->second);
  917. }
  918. else
  919. {
  920. _communicator->getStatReport()->getIntervCount(body.maxRspTime, body);
  921. _statBody[msg->request.sFuncName] = body;
  922. }
  923. }
  924. void AdapterProxy::merge(const StatMicMsgBody& inBody, StatMicMsgBody& outBody/*out*/)
  925. {
  926. outBody.count += inBody.count;
  927. outBody.timeoutCount += inBody.timeoutCount;
  928. outBody.execCount += inBody.execCount;
  929. outBody.totalRspTime += inBody.totalRspTime;
  930. if (outBody.maxRspTime < inBody.maxRspTime )
  931. {
  932. outBody.maxRspTime = inBody.maxRspTime;
  933. }
  934. //非0最小值
  935. if (outBody.minRspTime == 0 || (outBody.minRspTime > inBody.minRspTime && inBody.minRspTime != 0))
  936. {
  937. outBody.minRspTime = inBody.minRspTime;
  938. }
  939. _communicator->getStatReport()->getIntervCount(inBody.maxRspTime, outBody);
  940. }
  941. void AdapterProxy::mergeStat(map<StatMicMsgHead, StatMicMsgBody> & mStatMicMsg)
  942. {
  943. auto iter = _statBody.begin();
  944. for (; iter != _statBody.end(); ++iter)
  945. {
  946. _statHead.interfaceName = iter->first;
  947. //有数据就放到map里面
  948. if (iter->second.count != 0
  949. || iter->second.timeoutCount != 0
  950. || iter->second.execCount != 0)
  951. {
  952. //判断是否已经有相同的数据了,需要汇总
  953. auto it = mStatMicMsg.find(_statHead);
  954. if (it != mStatMicMsg.end())
  955. {
  956. merge(iter->second, it->second);
  957. }
  958. else
  959. {
  960. mStatMicMsg[_statHead] = iter->second;
  961. }
  962. }
  963. }
  964. //清空数据
  965. _statBody.clear();
  966. }
  967. void AdapterProxy::addConnExc(bool bExc)
  968. {
  969. if (bExc)
  970. {
  971. if(!_connExc && ++_connExcCnt >= _objectProxy->getRootServantProxy()->tars_check_timeout_info().maxConnectExc)
  972. {
  973. // TLOGERROR("[AdapterProxy::addConnExc ep: " << _trans->getConnectionString() << " connect exception! (connect error)]" << endl);
  974. setInactive();
  975. _connExc = true;
  976. }
  977. }
  978. else
  979. {
  980. // if (_connExc)
  981. // {
  982. // TLOGERROR("[AdapterProxy::addConnExc ep: " << _trans->getConnectionString() << " connect exception change to succ]" << endl);
  983. // }
  984. _connExc = false;
  985. _connExcCnt = 0;
  986. if (!_activeStatus)
  987. {
  988. _activeStatus = true;
  989. }
  990. }
  991. }
  992. }