EndpointManager.cpp 65 KB


  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "util/tc_port.h"
  17. #include "servant/EndpointManager.h"
  18. #include "servant/ObjectProxy.h"
  19. #include "servant/RemoteLogger.h"
  20. #include "servant/AppCache.h"
  21. #include "servant/Application.h"
  22. #include "servant/CommunicatorEpoll.h"
  23. #include "servant/StatReport.h"
  24. namespace tars
  25. {
  26. /////////////////////////////////////////////////////////////////////////////
  27. QueryEpBase::QueryEpBase(Communicator * pComm, bool bFirstNetThread,bool bInterfaceReq)
  28. : _communicator(pComm)
  29. , _firstNetThread(bFirstNetThread)
  30. , _interfaceReq(bInterfaceReq)
  31. , _direct(false)
  32. , _objName("")
  33. , _invokeSetId("")
  34. , _locator("")
  35. , _valid(false)
  36. , _weightType(E_LOOP)
  37. , _rootServant(true)
  38. , _requestRegistry(false)
  39. , _requestTimeout(0)
  40. , _timeoutInterval(5*1000)
  41. , _refreshTime(0)
  42. , _refreshInterval(60*1000)
  43. , _activeEmptyInterval(10*1000)
  44. , _failInterval(2*1000)
  45. , _manyFailInterval(30*1000)
  46. , _failTimesLimit(3)
  47. , _failTimes(0)
  48. {
  49. _refreshInterval = TC_Common::strto<int>(_communicator->getProperty("refresh-endpoint-interval", "60*1000"));
  50. if(_refreshInterval < 5*1000)
  51. {
  52. _refreshInterval = 5 * 1000;
  53. }
  54. setNoDelete(true);
  55. }
  56. void QueryEpBase::callback_findObjectById4All(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  57. {
  58. TLOGTARS("[callback_findObjectById4All _objName:" << _objName << "|ret:" << ret
  59. << ",active:" << activeEp.size()
  60. << ",inactive:" << inactiveEp.size() << "]" << endl);
  61. doEndpoints(activeEp,inactiveEp,ret);
  62. }
  63. void QueryEpBase::callback_findObjectById4All_exception(Int32 ret)
  64. {
  65. TLOGERROR("[callback_findObjectById4All_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  66. doEndpointsExp(ret);
  67. }
  68. void QueryEpBase::callback_findObjectById4Any(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  69. {
  70. TLOGTARS("[callback_findObjectById4Any _objName:" << _objName << "|ret:" << ret
  71. << ",active:" << activeEp.size()
  72. << ",inactive:" << inactiveEp.size() << "]" << endl);
  73. doEndpoints(activeEp,inactiveEp,ret);
  74. }
  75. void QueryEpBase::callback_findObjectById4Any_exception(Int32 ret)
  76. {
  77. TLOGERROR("[callback_findObjectById4Any_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  78. doEndpointsExp(ret);
  79. }
  80. void QueryEpBase::callback_findObjectByIdInSameGroup(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  81. {
  82. TLOGTARS("[callback_findObjectByIdInSameGroup _objName:" << _objName << "|ret:"<<ret
  83. << ",active:" << activeEp.size()
  84. << ",inactive:" << inactiveEp.size() << "]" << endl);
  85. doEndpoints(activeEp,inactiveEp,ret);
  86. }
  87. void QueryEpBase::callback_findObjectByIdInSameGroup_exception(Int32 ret)
  88. {
  89. TLOGERROR("[callback_findObjectByIdInSameGroup_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  90. doEndpointsExp(ret);
  91. }
  92. void QueryEpBase::callback_findObjectByIdInSameSet( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> & inactiveEp)
  93. {
  94. TLOGTARS("[callback_findObjectByIdInSameSet _objName:" << _objName << "|ret:" << ret
  95. << ",active:" << activeEp.size()
  96. << ",inactive:" << inactiveEp.size() << "]" << endl);
  97. doEndpoints(activeEp,inactiveEp,ret);
  98. }
  99. void QueryEpBase::callback_findObjectByIdInSameSet_exception( Int32 ret)
  100. {
  101. TLOGERROR("[callback_findObjectByIdInSameSet_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  102. doEndpointsExp(ret);
  103. }
  104. void QueryEpBase::callback_findObjectByIdInSameStation( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> &inactiveEp)
  105. {
  106. TLOGTARS("[callback_findObjectByIdInSameStation _objName:" << _objName << "|ret:" << ret
  107. << ",active:" << activeEp.size()
  108. << ",inactive:" << inactiveEp.size() << "]" << endl);
  109. doEndpoints(activeEp,inactiveEp,ret);
  110. }
  111. void QueryEpBase::callback_findObjectByIdInSameStation_exception( Int32 ret)
  112. {
  113. TLOGERROR("[callback_findObjectByIdInSameStation_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  114. doEndpointsExp(ret);
  115. }
  116. int QueryEpBase::setLocatorPrx(QueryFPrx prx)
  117. {
  118. _queryFPrx = prx;
  119. return 0;
  120. }
  121. bool QueryEpBase::init(const string & sObjName, const string& setName, bool rootServant)
  122. {
  123. _locator = _communicator->getProperty("locator");
  124. TLOGTARS("QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl);
  125. // LOG_CONSOLE_DEBUG << "QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl;
  126. _invokeSetId = setName;
  127. _rootServant = rootServant;
  128. setObjName(sObjName);
  129. return true;
  130. }
  131. void QueryEpBase::setObjName(const string & sObjName)
  132. {
  133. string::size_type pos = sObjName.find_first_of('@');
  134. string sEndpoints;
  135. string sInactiveEndpoints;
  136. if (pos != string::npos)
  137. {
  138. //[直接连接]指定服务的IP和端口列表
  139. _objName = sObjName.substr(0,pos);
  140. sEndpoints = sObjName.substr(pos + 1);
  141. pos = _objName.find_first_of("#");
  142. if(pos != string::npos)
  143. {
  144. _objName = _objName.substr(0, pos);
  145. }
  146. _direct = true;
  147. _valid = true;
  148. }
  149. else
  150. {
  151. //[间接连接]通过registry查询服务端的IP和端口列表
  152. _direct = false;
  153. _valid = false;
  154. _objName = sObjName;
  155. if(_locator.find_first_not_of('@') == string::npos)
  156. {
  157. TLOGERROR("[QueryEpBase::setObjName locator is not valid,_locator:" << _locator << "]" << endl);
  158. throw TarsRegistryException("locator is not valid,_locator:" + _locator);
  159. }
  160. pos = _objName.find_first_of("#");
  161. if(pos != string::npos)
  162. {
  163. _objName = _objName.substr(0, pos);
  164. }
  165. _queryFPrx = _communicator->stringToProxy<QueryFPrx>(_locator);
  166. string sLocatorKey = _locator;
  167. //如果启用set,则获取按set分组的缓存
  168. if(ClientConfig::SetOpen)
  169. {
  170. sLocatorKey += "_" + ClientConfig::SetDivision;
  171. }
  172. string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
  173. //[间接连接]第一次使用cache,如果是接口级请求则不从缓存读取
  174. if(!_interfaceReq)
  175. {
  176. sEndpoints = AppCache::getInstance()->get(objName,sLocatorKey);
  177. sInactiveEndpoints = AppCache::getInstance()->get("inactive_"+objName,sLocatorKey);
  178. }
  179. }
  180. setEndpoints(sEndpoints,_activeEndpoints);
  181. setEndpoints(sInactiveEndpoints,_inactiveEndpoints);
  182. if(!_activeEndpoints.empty())
  183. {
  184. _valid = true;
  185. }
  186. if((!_activeEndpoints.empty() || !_inactiveEndpoints.empty()))
  187. {
  188. //非直接指定端口, 且从cache中能查到服务端口的, 不需要通知所有ObjectProxy更新地址
  189. notifyEndpoints(_activeEndpoints,_inactiveEndpoints,true);
  190. }
  191. }
  192. //
  193. //vector<string> QueryEpBase::sepEndpoint(const string& sEndpoints)
  194. //{
  195. // vector<string> vEndpoints;
  196. // bool flag = false;
  197. // string::size_type startPos = 0;
  198. // string::size_type sepPos = 0;
  199. // for(string::size_type pos = 0; pos < sEndpoints.size(); pos++)
  200. // {
  201. // if(sEndpoints[pos] == ':' && !flag )
  202. // {
  203. // sepPos = pos;
  204. // flag = true;
  205. // }
  206. // else if(flag)
  207. // {
  208. // if(sEndpoints[pos] == ' ')
  209. // {
  210. // continue;
  211. // }
  212. //
  213. // if(TC_Port::strncasecmp("tcp", (sEndpoints.c_str() + pos), 3) == 0
  214. // || TC_Port::strncasecmp("udp", (sEndpoints.c_str() + pos), 3) == 0
  215. // || TC_Port::strncasecmp("ssl", (sEndpoints.c_str() + pos), 3) == 0)
  216. // {
  217. // string ep = TC_Common::trim(string(sEndpoints.c_str() + startPos, sepPos - startPos));
  218. // if(!ep.empty()) {
  219. // vEndpoints.push_back(ep);
  220. // }
  221. // startPos = pos;
  222. // }
  223. //
  224. // flag = false;
  225. // }
  226. // }
  227. //
  228. // string ep = sEndpoints.substr(startPos);
  229. //
  230. // if(!ep.empty()) {
  231. // vEndpoints.push_back(ep);
  232. // }
  233. //
  234. //// vEndpoints.push_back(sEndpoints.substr(startPos));
  235. //
  236. // return vEndpoints;
  237. //}
  238. void QueryEpBase::setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints)
  239. {
  240. if(sEndpoints == "")
  241. {
  242. return ;
  243. }
  244. bool bSameWeightType = true;
  245. bool bFirstWeightType = true;
  246. unsigned int iWeightType = 0;
  247. vector<string> vEndpoints = TC_Endpoint::sepEndpoint(sEndpoints);
  248. for (size_t i = 0; i < vEndpoints.size(); ++i)
  249. {
  250. try
  251. {
  252. TC_Endpoint ep(vEndpoints[i]);
  253. string sSetDivision;
  254. //解析set分组信息
  255. if (!_direct)
  256. {
  257. string sep = " -s ";
  258. size_t pos = vEndpoints[i].rfind(sep);
  259. if (pos != string::npos)
  260. {
  261. sSetDivision = TC_Common::trim(vEndpoints[i].substr(pos+sep.size()));
  262. size_t endPos = sSetDivision.find(" ");
  263. if (endPos != string::npos)
  264. {
  265. sSetDivision = sSetDivision.substr(0, endPos);
  266. }
  267. }
  268. }
  269. if(bFirstWeightType)
  270. {
  271. bFirstWeightType = false;
  272. iWeightType = ep.getWeightType();
  273. }
  274. else
  275. {
  276. if(ep.getWeightType() != iWeightType)
  277. {
  278. bSameWeightType = false;
  279. }
  280. }
  281. EndpointInfo epi(ep, sSetDivision);
  282. setEndpoints.insert(epi);
  283. }
  284. catch (exception &ex)
  285. {
  286. TLOGERROR("[QueryEpBase::setEndpoints parse error,objname:" << _objName << ",endpoint:" << vEndpoints[i] << "]" << endl);
  287. }
  288. }
  289. if(bSameWeightType)
  290. {
  291. if(iWeightType == 1)
  292. {
  293. _weightType = E_STATIC_WEIGHT;
  294. }
  295. else
  296. {
  297. _weightType = E_LOOP;
  298. }
  299. }
  300. else
  301. {
  302. _weightType = E_LOOP;
  303. }
  304. }
  305. void QueryEpBase::refreshReg(GetEndpointType type, const string & sName)
  306. {
  307. onUpdateOutter();
  308. if(_direct)
  309. {
  310. return;
  311. }
  312. int64_t iNow = TNOWMS;
  313. //正在请求状态 而且请求超时了,或者第一次
  314. if(_requestRegistry && _requestTimeout < iNow)
  315. {
  316. doEndpointsExp(0);
  317. }
  318. //如果是间接连接,通过registry定时查询服务列表
  319. //正在请求状态 而且请求超时了
  320. //非请求状态 到了下一个刷新时间了
  321. if( (!_requestRegistry) && (_refreshTime <= iNow))
  322. {
  323. _requestRegistry = true;
  324. //一定时间不回调就算超时了
  325. _requestTimeout = iNow + _timeoutInterval;
  326. TLOGTARS("[QueryEpBase::refresh," << _objName << "]" <<endl);
  327. if(_valid && !_rootServant)
  328. {
  329. return;
  330. }
  331. //判断是同步调用还是异步调用
  332. //内部请求主控都是异步请求
  333. //接口请求主控第一次是同步请求
  334. bool bSync = (!_valid && _interfaceReq);
  335. //如果是异步且不是根servant(通过#1创建的servant, 不主动更新主控信息)
  336. if(!bSync && !_rootServant)
  337. return;
  338. try
  339. {
  340. if(bSync)
  341. {
  342. vector<EndpointF> activeEp;
  343. vector<EndpointF> inactiveEp;
  344. int iRet = 0;
  345. switch(type)
  346. {
  347. case E_ALL:
  348. {
  349. iRet = _queryFPrx->findObjectById4Any(_objName,activeEp,inactiveEp, ServerConfig::Context);
  350. break;
  351. }
  352. case E_STATION:
  353. {
  354. iRet = _queryFPrx->findObjectByIdInSameStation(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
  355. break;
  356. }
  357. case E_SET:
  358. {
  359. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
  360. break;
  361. }
  362. case E_DEFAULT:
  363. default:
  364. {
  365. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  366. {
  367. //指定set调用时,指定set的优先级最高
  368. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  369. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,setId,activeEp,inactiveEp, ServerConfig::Context);
  370. }
  371. else
  372. {
  373. iRet = _queryFPrx->findObjectByIdInSameGroup(_objName,activeEp,inactiveEp, ServerConfig::Context);
  374. }
  375. break;
  376. }
  377. }
  378. doEndpoints(activeEp, inactiveEp, iRet, true);
  379. }
  380. else
  381. {
  382. switch(type)
  383. {
  384. case E_ALL:
  385. {
  386. _queryFPrx->async_findObjectById4Any(this,_objName, ServerConfig::Context);
  387. break;
  388. }
  389. case E_STATION:
  390. {
  391. _queryFPrx->async_findObjectByIdInSameStation(this,_objName,sName, ServerConfig::Context);
  392. break;
  393. }
  394. case E_SET:
  395. {
  396. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,sName, ServerConfig::Context);
  397. break;
  398. }
  399. case E_DEFAULT:
  400. default:
  401. {
  402. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  403. {
  404. //指定set调用时,指定set的优先级最高
  405. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  406. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,setId, ServerConfig::Context);
  407. }
  408. else
  409. {
  410. _queryFPrx->async_findObjectByIdInSameGroup(this,_objName, ServerConfig::Context);
  411. }
  412. break;
  413. }
  414. }//end switch
  415. }
  416. }
  417. catch(TC_Exception & ex)
  418. {
  419. TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"exception:"<<ex.what() << "]"<<endl);
  420. doEndpointsExp(TARSSERVERUNKNOWNERR);
  421. }
  422. catch(...)
  423. {
  424. TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"unknown exception]" <<endl);
  425. doEndpointsExp(TARSSERVERUNKNOWNERR);
  426. }
  427. }
  428. }
  429. void QueryEpBase::doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync)
  430. {
  431. if(iRet != 0)
  432. {
  433. doEndpointsExp(iRet);
  434. return ;
  435. }
  436. _failTimes = 0;
  437. _requestRegistry = false;
  438. int64_t iNow = TNOWMS;
  439. //有返回成功的结点,按照正常的频率
  440. //如果返回空列表或者返回失败 2s去刷新一次
  441. //接口请求主控的方式 不管是不是空都要去刷新
  442. if(activeEp.empty() && (!_interfaceReq) )
  443. {
  444. _refreshTime = iNow + _activeEmptyInterval;
  445. //如果registry返回Active服务列表为空,不做更新
  446. TLOGERROR("[QueryEpBase::doEndpoints, callback activeEps is empty,objname:"<< _objName << "]" << endl);
  447. return;
  448. }
  449. else
  450. {
  451. _refreshTime = iNow + _refreshInterval;
  452. }
  453. bool bNeedNotify = false;
  454. bool bSameWeightType = true;
  455. bool bFirstWeightType = true;
  456. int iWeightType = 0;
  457. set<string> sActiveEndpoints;
  458. set<string> sInactiveEndpoints;
  459. set<EndpointInfo> activeEps;
  460. set<EndpointInfo> inactiveEps;
  461. //生成active set 用于比较
  462. for (uint32_t i = 0; i < activeEp.size(); ++i)
  463. {
  464. if(bFirstWeightType)
  465. {
  466. bFirstWeightType = false;
  467. iWeightType = activeEp[i].weightType;
  468. }
  469. else
  470. {
  471. if(activeEp[i].weightType != iWeightType)
  472. {
  473. bSameWeightType = false;
  474. }
  475. }
  476. // taf istcp意思和这里枚举值对应
  477. activeEps.insert(EndpointInfo(activeEp[i]));
  478. }
  479. //生成inactive set 用于比较
  480. for (uint32_t i = 0; i < inactiveEp.size(); ++i)
  481. {
  482. // taf istcp意思和这里枚举值对应
  483. inactiveEps.insert(EndpointInfo(inactiveEp[i]));
  484. }
  485. if(bSameWeightType)
  486. {
  487. if(iWeightType == 1)
  488. {
  489. _weightType = E_STATIC_WEIGHT;
  490. }
  491. else
  492. {
  493. _weightType = E_LOOP;
  494. }
  495. }
  496. else
  497. {
  498. _weightType = E_LOOP;
  499. }
  500. if(activeEps != _activeEndpoints)
  501. {
  502. bNeedNotify = true;
  503. _activeEndpoints = activeEps;
  504. if(_firstNetThread)
  505. {
  506. setEndPointToCache(false);
  507. }
  508. }
  509. if(inactiveEps != _inactiveEndpoints)
  510. {
  511. bNeedNotify = true;
  512. _inactiveEndpoints = inactiveEps;
  513. if(_firstNetThread)
  514. {
  515. setEndPointToCache(true);
  516. }
  517. }
  518. if(bNeedNotify)
  519. {
  520. notifyEndpoints(_activeEndpoints,_inactiveEndpoints,bSync);
  521. }
  522. if(!_valid)
  523. {
  524. _valid = true;
  525. doNotify();
  526. }
  527. }
  528. void QueryEpBase::doEndpointsExp(int iRet)
  529. {
  530. _failTimes++;
  531. _requestRegistry = false;
  532. int64_t iNow = TNOWMS;
  533. //频率控制获取主控失败 2秒钟再更新
  534. _refreshTime = iNow + _failInterval;
  535. //获取主控连续失败3次就等30s再更新一次
  536. //连续失败 强制设成数据是有效的
  537. if(_failTimes > _failTimesLimit)
  538. {
  539. if(!_valid)
  540. {
  541. _valid = true;
  542. doNotify();
  543. }
  544. _refreshTime = iNow + _manyFailInterval;
  545. }
  546. }
  547. void QueryEpBase::setEndPointToCache(bool bInactive)
  548. {
  549. //如果是接口级请求则不缓存到文件
  550. if(_interfaceReq)
  551. {
  552. return;
  553. }
  554. string sEndpoints;
  555. set<EndpointInfo> doEndpoints;
  556. if(!bInactive)
  557. {
  558. doEndpoints = _activeEndpoints;
  559. }
  560. else
  561. {
  562. doEndpoints = _inactiveEndpoints;
  563. }
  564. set<EndpointInfo>::iterator iter;
  565. iter = doEndpoints.begin();
  566. for (; iter != doEndpoints.end(); ++iter)
  567. {
  568. //这里的超时时间 只是对服务端有效。这里的值无效。所以默认用3000了
  569. TC_Endpoint ep = iter->getEndpoint();
  570. if (!sEndpoints.empty())
  571. {
  572. sEndpoints += ":";
  573. }
  574. sEndpoints += ep.toString();
  575. if (!iter->setDivision().empty())
  576. {
  577. sEndpoints += " -s " + iter->setDivision();
  578. }
  579. }
  580. //如果启用set,则按set分组保存
  581. string sLocatorKey = _locator;
  582. if(ClientConfig::SetOpen)
  583. {
  584. sLocatorKey += "_" + ClientConfig::SetDivision;
  585. }
  586. string objName = _objName + string(_invokeSetId.empty()?"":":") + _invokeSetId;
  587. if(bInactive)
  588. {
  589. AppCache::getInstance()->set("inactive_"+objName,sEndpoints,sLocatorKey);
  590. }
  591. else
  592. {
  593. AppCache::getInstance()->set(objName,sEndpoints,sLocatorKey);
  594. }
  595. TLOGTARS("[setEndPointToCache,obj:" << _objName << ",invokeSetId:" << _invokeSetId << ",endpoint:" << sEndpoints << "]" << endl);
  596. }
  597. EndpointManager::EndpointManager(ObjectProxy * pObjectProxy, Communicator* pComm, bool bFirstNetThread)
  598. : QueryEpBase(pComm, bFirstNetThread, false)
  599. ,_objectProxy(pObjectProxy)
  600. ,_lastRoundPosition(0)
  601. ,_update(true)
  602. ,_updateWeightInterval(60)
  603. ,_lastSWeightPosition(0)
  604. ,_consistentHashWeight(E_TC_CONHASH_KETAMAHASH)
  605. ,_consistentHash(E_TC_CONHASH_KETAMAHASH)
  606. {
  607. setNetThreadProcess(true);
  608. }
  609. EndpointManager::~EndpointManager()
  610. {
  611. map<string,AdapterProxy*>::iterator iterAdapter;
  612. for(iterAdapter = _allProxys.begin();iterAdapter != _allProxys.end();iterAdapter++)
  613. {
  614. if(iterAdapter->second)
  615. {
  616. delete iterAdapter->second;
  617. iterAdapter->second = NULL;
  618. }
  619. }
  620. _allProxys.clear();
  621. }
  622. void EndpointManager::onUpdateOutter()
  623. {
  624. // LOG_CONSOLE_DEBUG << this->_objectProxy << ", valid:" << _valid << ", " << _outterUpdate.get() << endl;
  625. if(_outterUpdate)
  626. {
  627. shared_ptr<OutterUpdate> outterUpdate = _outterUpdate;
  628. updateEndpoints(outterUpdate->active, outterUpdate->inactive);
  629. _valid = true;
  630. _outterUpdate.reset();
  631. }
  632. }
  633. void EndpointManager::updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  634. {
  635. // LOG_CONSOLE_DEBUG << this->_objectProxy << ", " << active.begin()->desc() << endl;
  636. //创新新对象, 避免线程冲突
  637. _outterUpdate = std::make_shared<OutterUpdate>();
  638. _outterUpdate->active = active;
  639. _outterUpdate->inactive = inactive;
  640. //更新时间
  641. _refreshTime = TNOWMS + _refreshInterval;
  642. // updateEndpoints(active, inactive);
  643. }
  644. void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  645. {
  646. TLOGTARS("[EndpointManager::updateEndpoints obj:" << this->_objName << ", active:" << active.size() << ", inactive size:" << inactive.size() << endl);
  647. pair<map<string,AdapterProxy*>::iterator,bool> result;
  648. _activeProxys.clear();
  649. _regProxys.clear();
  650. _indexActiveProxys.clear();
  651. _sortActivProxys.clear();
  652. if(!active.empty())
  653. {
  654. //先把服务都设置为非活跃
  655. for (auto iter = _allProxys.begin(); iter != _allProxys.end(); ++iter)
  656. {
  657. iter->second->setActiveInReg(false);
  658. }
  659. }
  660. //更新active
  661. for(auto iter = active.begin(); iter != active.end(); ++iter)
  662. {
  663. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  664. {
  665. continue;
  666. }
  667. // LOG_CONSOLE_DEBUG << std::this_thread::get_id() << ", allProxys size:" << _allProxys.size() << ", " << iter->cmpDesc() << endl;
  668. auto iterAdapter = _allProxys.find(iter->cmpDesc());
  669. if(iterAdapter == _allProxys.end())
  670. {
  671. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  672. result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
  673. iterAdapter = result.first;
  674. _vAllProxys.push_back(ap);
  675. }
  676. //该节点在主控的状态为active
  677. iterAdapter->second->setActiveInReg(true);
  678. _activeProxys.push_back(iterAdapter->second);
  679. _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
  680. const string &host = iterAdapter->second->endpoint().host();
  681. _indexActiveProxys.insert(make_pair(host, iterAdapter->second));
  682. _sortActivProxys.insert(make_pair(host, iterAdapter->second));
  683. //设置该节点的静态权重值
  684. iterAdapter->second->setWeight(iter->weight());
  685. }
  686. //更新inactive
  687. for(auto iter = inactive.begin(); iter != inactive.end(); ++iter)
  688. {
  689. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  690. {
  691. continue;
  692. }
  693. auto iterAdapter = _allProxys.find(iter->cmpDesc());
  694. if(iterAdapter == _allProxys.end())
  695. {
  696. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  697. result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
  698. assert(result.second);
  699. iterAdapter = result.first;
  700. _vAllProxys.push_back(ap);
  701. }
  702. //该节点在主控的状态为inactive
  703. iterAdapter->second->setActiveInReg(false);
  704. _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
  705. //设置该节点的静态权重值
  706. iterAdapter->second->setWeight(iter->weight());
  707. }
  708. //_vRegProxys 需要按顺序来 重排
  709. _vRegProxys.clear();
  710. auto iterAdapter = _regProxys.begin();
  711. for(;iterAdapter != _regProxys.end();++iterAdapter)
  712. {
  713. _vRegProxys.push_back(iterAdapter->second);
  714. }
  715. _update = true;
  716. }
  717. void EndpointManager::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bNotify)
  718. {
  719. updateEndpoints(active, inactive);
  720. //丢给外层统一做
  721. _objectProxy->onNotifyEndpoints(active, inactive);
  722. }
  723. void EndpointManager::doNotify()
  724. {
  725. _objectProxy->doInvoke();
  726. }
  727. bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
  728. {
  729. pAdapterProxy = NULL;
  730. //刷新主控
  731. refreshReg(E_DEFAULT, "");
  732. //无效的数据 返回true
  733. if (!_valid)
  734. {
  735. return true;
  736. }
  737. //如果有hash,则先使用hash策略
  738. if (msg->data._hash)
  739. {
  740. pAdapterProxy = getHashProxy(msg->data._hashCode, msg->data._conHash);
  741. return false;
  742. }
  743. if(_weightType == E_STATIC_WEIGHT)
  744. {
  745. //权重模式
  746. bool bStaticWeighted = false;
  747. if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
  748. bStaticWeighted = true;
  749. pAdapterProxy = getWeightedProxy(bStaticWeighted);
  750. }
  751. else
  752. {
  753. //普通轮询模式
  754. pAdapterProxy = getNextValidProxy();
  755. }
  756. return false;
  757. }
  758. AdapterProxy * EndpointManager::getNextValidProxy()
  759. {
  760. if (_activeProxys.empty())
  761. {
  762. TLOGERROR("[EndpointManager::getNextValidProxy activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  763. return NULL;
  764. }
  765. vector<AdapterProxy*> conn;
  766. for(size_t i=0;i<_activeProxys.size();i++)
  767. {
  768. ++_lastRoundPosition;
  769. if(_lastRoundPosition >= _activeProxys.size())
  770. {
  771. _lastRoundPosition = 0;
  772. }
  773. if(_activeProxys[_lastRoundPosition]->checkActive(false))
  774. {
  775. return _activeProxys[_lastRoundPosition];
  776. }
  777. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() && !_activeProxys[_lastRoundPosition]->isConnExc()) {
  778. conn.push_back(_activeProxys[_lastRoundPosition]);
  779. }
  780. }
  781. if(conn.size() > 0)
  782. {
  783. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  784. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  785. //该proxy可能已经被屏蔽,需重新连一次
  786. adapterProxy->checkActive(true);
  787. return adapterProxy;
  788. }
  789. //所有adapter都有问题 选不到结点,随机找一个重试
  790. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  791. adapterProxy->resetRetryTime(false);
  792. //该proxy可能已经被屏蔽,需重新连一次
  793. adapterProxy->checkActive(true);
  794. return adapterProxy;
  795. }
  796. AdapterProxy* EndpointManager::getHashProxy(int64_t hashCode, bool bConsistentHash)
  797. {
  798. if(_weightType == E_STATIC_WEIGHT)
  799. {
  800. if(bConsistentHash)
  801. {
  802. return getConHashProxyForWeight(hashCode, true);
  803. }
  804. else
  805. {
  806. return getHashProxyForWeight(hashCode, true, _hashStaticRouterCache);
  807. }
  808. }
  809. else
  810. {
  811. if(bConsistentHash)
  812. {
  813. return getConHashProxyForNormal(hashCode);
  814. }
  815. else
  816. {
  817. return getHashProxyForNormal(hashCode);
  818. }
  819. }
  820. }
  821. AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache)
  822. {
  823. if(_vRegProxys.empty())
  824. {
  825. TLOGERROR("[EndpointManager::getHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  826. return NULL;
  827. }
  828. if(checkHashStaticWeightChange(bStatic))
  829. {
  830. int64_t iBegin = TNOWMS;
  831. updateHashProxyWeighted(bStatic);
  832. int64_t iEnd = TNOWMS;
  833. TLOGTARS("[EndpointManager::getHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  834. }
  835. if(vRouterCache.size() > 0)
  836. {
  837. size_t hash = ((int64_t)hashCode) % vRouterCache.size();
  838. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  839. if(hash >= vRouterCache.size())
  840. {
  841. hash = hash % vRouterCache.size();
  842. }
  843. size_t iIndex = vRouterCache[hash];
  844. if(iIndex >= _vRegProxys.size())
  845. {
  846. iIndex = iIndex % _vRegProxys.size();
  847. }
  848. //被hash到的节点在主控是active的才走在流程
  849. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
  850. {
  851. return _vRegProxys[iIndex];
  852. }
  853. else
  854. {
  855. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl);
  856. if(_activeProxys.empty())
  857. {
  858. TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
  859. return NULL;
  860. }
  861. //在active节点中再次hash
  862. vector<AdapterProxy*> thisHash = _activeProxys;
  863. vector<AdapterProxy*> conn;
  864. do
  865. {
  866. hash = ((int64_t)hashCode) % thisHash.size();
  867. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  868. if(hash >= thisHash.size())
  869. {
  870. hash = hash % thisHash.size();
  871. }
  872. if (thisHash[hash]->checkActive(true))
  873. {
  874. return thisHash[hash];
  875. }
  876. if(!thisHash[hash]->isConnTimeout() &&
  877. !thisHash[hash]->isConnExc())
  878. {
  879. conn.push_back(thisHash[hash]);
  880. }
  881. thisHash.erase(thisHash.begin() + hash);
  882. }
  883. while(!thisHash.empty());
  884. if(conn.size() > 0)
  885. {
  886. hash = ((int64_t)hashCode) % conn.size();
  887. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  888. if(hash >= conn.size())
  889. {
  890. hash = hash % conn.size();
  891. }
  892. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  893. AdapterProxy *adapterProxy = conn[hash];
  894. //该proxy可能已经被屏蔽,需重新连一次
  895. adapterProxy->checkActive(true);
  896. return adapterProxy;
  897. }
  898. //所有adapter都有问题 选不到结点,随机找一个重试
  899. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  900. adapterProxy->resetRetryTime(false);
  901. //该proxy可能已经被屏蔽,需重新连一次
  902. adapterProxy->checkActive(true);
  903. return adapterProxy;
  904. }
  905. }
  906. return getHashProxyForNormal(hashCode);
  907. }
  908. AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool bStatic)
  909. {
  910. if(_vRegProxys.empty())
  911. {
  912. TLOGERROR("[EndpointManager::getConHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  913. return NULL;
  914. }
  915. if(checkConHashChange(bStatic, _lastConHashWeightProxys))
  916. {
  917. int64_t iBegin = TNOWMS;
  918. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  919. int64_t iEnd = TNOWMS;
  920. TLOGTARS("[EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  921. }
  922. while(_consistentHashWeight.size() > 0)
  923. {
  924. string sNode;
  925. // 通过一致性hash取到对应的节点
  926. _consistentHashWeight.getNodeName(hashCode, sNode);
  927. auto it = _indexActiveProxys.find(sNode);
  928. // 节点不存在,可能是下线或者服务不可用
  929. if (it == _indexActiveProxys.end())
  930. {
  931. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  932. continue;
  933. }
  934. //被hash到的节点在主控是active的才走在流程
  935. if (it->second->isActiveInReg() && it->second->checkActive(true))
  936. {
  937. return it->second;
  938. }
  939. else
  940. {
  941. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
  942. // 剔除节点再次hash
  943. if (!it->second->isActiveInReg())
  944. {
  945. // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
  946. _indexActiveProxys.erase(sNode);
  947. }
  948. // checkConHashChange里重新加回到_sortActivProxys重试
  949. _sortActivProxys.erase(sNode);
  950. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  951. if (_indexActiveProxys.empty())
  952. {
  953. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  954. return NULL;
  955. }
  956. }
  957. }
  958. /*if(_consistentHashWeight.size() > 0)
  959. {
  960. unsigned int iIndex = 0;
  961. // 通过一致性hash取到对应的节点
  962. _consistentHashWeight.getIndex(hashCode, iIndex);
  963. if(iIndex >= _vRegProxys.size())
  964. {
  965. iIndex = iIndex % _vRegProxys.size();
  966. }
  967. //被hash到的节点在主控是active的才走在流程
  968. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
  969. {
  970. return _vRegProxys[iIndex];
  971. }
  972. else
  973. {
  974. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
  975. if(_activeProxys.empty())
  976. {
  977. TLOGERROR("[EndpointManager::getConHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
  978. return NULL;
  979. }
  980. //在active节点中再次hash
  981. vector<AdapterProxy*> thisHash = _activeProxys;
  982. vector<AdapterProxy*> conn;
  983. size_t hash = 0;
  984. do
  985. {
  986. hash = ((int64_t)hashCode) % thisHash.size();
  987. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  988. if(hash >= thisHash.size())
  989. {
  990. hash = hash % thisHash.size();
  991. }
  992. if (thisHash[hash]->checkActive(false))
  993. {
  994. return thisHash[hash];
  995. }
  996. if(!thisHash[hash]->isConnTimeout() && !thisHash[hash]->isConnExc())
  997. {
  998. conn.push_back(thisHash[hash]);
  999. }
  1000. thisHash.erase(thisHash.begin() + hash);
  1001. }
  1002. while(!thisHash.empty());
  1003. if(conn.size() > 0)
  1004. {
  1005. hash = ((int64_t)hashCode) % conn.size();
  1006. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1007. if(hash >= conn.size())
  1008. {
  1009. hash = hash % conn.size();
  1010. }
  1011. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1012. AdapterProxy *adapterProxy = conn[hash];
  1013. //该proxy可能已经被屏蔽,需重新连一次
  1014. adapterProxy->checkActive(true);
  1015. return adapterProxy;
  1016. }
  1017. //所有adapter都有问题 选不到结点,随机找一个重试
  1018. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1019. adapterProxy->resetRetryTime(false);
  1020. //该proxy可能已经被屏蔽,需重新连一次
  1021. adapterProxy->checkActive(true);
  1022. return adapterProxy;
  1023. }
  1024. }*/
  1025. return getHashProxyForNormal(hashCode);
  1026. }
  1027. bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
  1028. {
  1029. if(bStatic)
  1030. {
  1031. if(_lastHashStaticProxys.size() != _vRegProxys.size())
  1032. {
  1033. return true;
  1034. }
  1035. for(size_t i = 0; i < _vRegProxys.size(); i++)
  1036. {
  1037. //解决服务权重更新时哈希表不更新的问题
  1038. if((_lastHashStaticProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc()) || _vRegProxys[i]->checkWeightChanged(true))
  1039. {
  1040. return true;
  1041. }
  1042. }
  1043. }
  1044. return false;
  1045. }
  1046. bool EndpointManager::checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys)
  1047. {
  1048. // 将之前故障临时剔除的节点重新加回来重试
  1049. if (_indexActiveProxys.size() != _sortActivProxys.size())
  1050. {
  1051. for (auto &it : _indexActiveProxys)
  1052. {
  1053. _sortActivProxys[it.first] = it.second;
  1054. }
  1055. }
  1056. if(mLastConHashProxys.size() != _sortActivProxys.size())
  1057. {
  1058. return true;
  1059. }
  1060. auto itLast = mLastConHashProxys.begin();
  1061. auto itSort = _sortActivProxys.begin();
  1062. for (; itLast!=mLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
  1063. {
  1064. if (itLast->first != itSort->first)
  1065. {
  1066. return true;
  1067. }
  1068. //解决服务权重更新时一致性哈希环不更新的问题
  1069. if(bStatic && itSort->second->checkWeightChanged(true))
  1070. {
  1071. return true;
  1072. }
  1073. }
  1074. /*
  1075. if(vLastConHashProxys.size() != _vRegProxys.size())
  1076. {
  1077. return true;
  1078. }
  1079. for(size_t i = 0; i < _vRegProxys.size(); i++)
  1080. {
  1081. if(vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
  1082. {
  1083. return true;
  1084. }
  1085. //解决服务权重更新时一致性哈希环不更新的问题
  1086. if(bStatic && _vRegProxys[i]->checkWeightChanged(true))
  1087. {
  1088. return true;
  1089. }
  1090. }
  1091. */
  1092. return false;
  1093. }
  1094. void EndpointManager::updateHashProxyWeighted(bool bStatic)
  1095. {
  1096. if(_vRegProxys.size() <= 0)
  1097. {
  1098. TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  1099. return ;
  1100. }
  1101. if(bStatic)
  1102. {
  1103. _lastHashStaticProxys = _vRegProxys;
  1104. _hashStaticRouterCache.clear();
  1105. }
  1106. vector<AdapterProxy*> vRegProxys;
  1107. vector<size_t> vIndex;
  1108. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  1109. {
  1110. if(_vRegProxys[i]->getWeight() > 0)
  1111. {
  1112. vRegProxys.push_back(_vRegProxys[i]);
  1113. vIndex.push_back(i);
  1114. }
  1115. //防止多个服务节点权重同时更新时哈希表多次更新
  1116. _vRegProxys[i]->resetWeightChanged();
  1117. }
  1118. if(vRegProxys.size() <= 0)
  1119. {
  1120. TLOGERROR("[EndpointManager::updateHashProxyWeighted vRegProxys is empty], bStatic:" << bStatic << endl);
  1121. return ;
  1122. }
  1123. size_t iHashStaticWeightSize = vRegProxys.size();
  1124. map<size_t, int> mIdToWeight;
  1125. multimap<int, size_t> mWeightToId;
  1126. size_t iMaxR = 0;
  1127. size_t iMaxRouterR = 0;
  1128. size_t iMaxWeight = vRegProxys[0]->getWeight();
  1129. size_t iMinWeight = vRegProxys[0]->getWeight();
  1130. size_t iTempWeight = 0;
  1131. for(size_t i = 1;i < iHashStaticWeightSize; i++)
  1132. {
  1133. iTempWeight = vRegProxys[i]->getWeight();
  1134. if(iTempWeight > iMaxWeight)
  1135. {
  1136. iMaxWeight = iTempWeight;
  1137. }
  1138. if(iTempWeight < iMinWeight)
  1139. {
  1140. iMinWeight = iTempWeight;
  1141. }
  1142. }
  1143. if(iMinWeight > 0)
  1144. {
  1145. iMaxR = iMaxWeight / iMinWeight;
  1146. if(iMaxR < iMinWeightLimit)
  1147. iMaxR = iMinWeightLimit;
  1148. if(iMaxR > iMaxWeightLimit)
  1149. iMaxR = iMaxWeightLimit;
  1150. }
  1151. else
  1152. {
  1153. iMaxR = 1;
  1154. iMaxWeight = 1;
  1155. }
  1156. for(size_t i = 0;i < iHashStaticWeightSize; i++)
  1157. {
  1158. int iWeight = (vRegProxys[i]->getWeight() * iMaxR) / iMaxWeight;
  1159. if(iWeight > 0)
  1160. {
  1161. iMaxRouterR += iWeight;
  1162. mIdToWeight.insert(map<size_t, int>::value_type(vIndex[i], iWeight));
  1163. mWeightToId.insert(make_pair(iWeight, vIndex[i]));
  1164. }
  1165. else
  1166. {
  1167. if(bStatic)
  1168. {
  1169. _hashStaticRouterCache.push_back(vIndex[i]);
  1170. }
  1171. }
  1172. TLOGTARS("EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
  1173. }
  1174. for(size_t i = 0; i < iMaxRouterR; i++)
  1175. {
  1176. bool bFirst = true;
  1177. multimap<int, size_t> mulTemp;
  1178. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1179. while(mIter != mWeightToId.rend())
  1180. {
  1181. if(bFirst)
  1182. {
  1183. bFirst = false;
  1184. if(bStatic)
  1185. {
  1186. _hashStaticRouterCache.push_back(mIter->second);
  1187. }
  1188. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1189. }
  1190. else
  1191. {
  1192. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1193. }
  1194. mIter++;
  1195. }
  1196. mWeightToId.clear();
  1197. mWeightToId.swap(mulTemp);
  1198. }
  1199. }
  1200. void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash)
  1201. {
  1202. conHash.clear();
  1203. if(_sortActivProxys.empty())
  1204. {
  1205. TLOGERROR("[EndpointManager::updateHashProxyWeighted _indexActiveProxys is empty], bStatic:" << bStatic << endl);
  1206. return ;
  1207. }
  1208. mLastConHashProxys = _sortActivProxys;
  1209. for (auto it = _sortActivProxys.begin(); it != _sortActivProxys.end(); ++it)
  1210. {
  1211. int iWeight = (bStatic ? (it->second->getWeight()) : 100);
  1212. if(iWeight > 0)
  1213. {
  1214. iWeight = iWeight / 4;
  1215. if(iWeight <= 0)
  1216. {
  1217. iWeight = 1;
  1218. }
  1219. // 同一服务有多个obj的情况
  1220. // 同一hash值调用不同的obj会hash到不同的服务器
  1221. // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
  1222. // 一致性hash用host进行索引,不使用index,这里传0
  1223. conHash.addNode(it->second->endpoint().host(), 0, iWeight);
  1224. }
  1225. //防止多个服务节点权重同时更新时一致性哈希环多次更新
  1226. it->second->resetWeightChanged();
  1227. }
  1228. /*
  1229. if(_vRegProxys.size() <= 0)
  1230. {
  1231. TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  1232. return ;
  1233. }
  1234. vLastConHashProxys = _vRegProxys;
  1235. conHash.clear();
  1236. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  1237. {
  1238. int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
  1239. if(iWeight > 0)
  1240. {
  1241. iWeight = iWeight / 4;
  1242. if(iWeight <= 0)
  1243. {
  1244. iWeight = 1;
  1245. }
  1246. // 同一服务有多个obj的情况
  1247. // 同一hash值调用不同的obj会hash到不同的服务器
  1248. // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
  1249. conHash.addNode(_vRegProxys[i]->endpoint().host(), i, iWeight);
  1250. }
  1251. //防止多个服务节点权重同时更新时一致性哈希环多次更新
  1252. _vRegProxys[i]->resetWeightChanged();
  1253. }
  1254. */
  1255. conHash.sortNode();
  1256. }
  1257. AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode)
  1258. {
  1259. if(_vRegProxys.empty())
  1260. {
  1261. TLOGERROR("[EndpointManager::getHashProxyForNormal _vRegProxys is empty]" << endl);
  1262. return NULL;
  1263. }
  1264. // 1 _vRegProxys从客户端启动之后,就不会再改变,除非有节点增加
  1265. // 2 如果有增加节点,则_vRegProxys顺序会重新排序,之前的hash会改变
  1266. // 3 节点下线后,需要下次启动客户端后,_vRegProxys内容才会生效
  1267. size_t hash = ((int64_t)hashCode) % _vRegProxys.size();
  1268. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1269. if(hash >= _vRegProxys.size())
  1270. {
  1271. hash = hash % _vRegProxys.size();
  1272. }
  1273. //被hash到的节点在主控是active的才走在流程
  1274. if (_vRegProxys[hash]->isActiveInReg() && _vRegProxys[hash]->checkActive(true))
  1275. {
  1276. return _vRegProxys[hash];
  1277. }
  1278. else
  1279. {
  1280. TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl);
  1281. if(_activeProxys.empty())
  1282. {
  1283. TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
  1284. return NULL;
  1285. }
  1286. //在active节点中再次hash
  1287. vector<AdapterProxy*> thisHash = _activeProxys;
  1288. vector<AdapterProxy*> conn;
  1289. do
  1290. {
  1291. hash = ((int64_t)hashCode) % thisHash.size();
  1292. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1293. if(hash >= thisHash.size())
  1294. {
  1295. hash = hash % thisHash.size();
  1296. }
  1297. if (thisHash[hash]->checkActive(true))
  1298. {
  1299. return thisHash[hash];
  1300. }
  1301. if(!thisHash[hash]->isConnTimeout() &&
  1302. !thisHash[hash]->isConnExc())
  1303. {
  1304. conn.push_back(thisHash[hash]);
  1305. }
  1306. thisHash.erase(thisHash.begin() + hash);
  1307. }
  1308. while(!thisHash.empty());
  1309. if(conn.size() > 0)
  1310. {
  1311. hash = ((int64_t)hashCode) % conn.size();
  1312. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1313. if(hash >= conn.size())
  1314. {
  1315. hash = hash % conn.size();
  1316. }
  1317. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1318. AdapterProxy *adapterProxy = conn[hash];
  1319. //该proxy可能已经被屏蔽,需重新连一次
  1320. adapterProxy->checkActive(true);
  1321. return adapterProxy;
  1322. }
  1323. //所有adapter都有问题 选不到结点,随机找一个重试
  1324. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1325. adapterProxy->resetRetryTime(false);
  1326. //该proxy可能已经被屏蔽,需重新连一次
  1327. adapterProxy->checkActive(true);
  1328. return adapterProxy;
  1329. }
  1330. }
  1331. AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
  1332. {
  1333. if(_vRegProxys.empty())
  1334. {
  1335. TLOGERROR("[EndpointManager::getConHashProxyForNormal _vRegProxys is empty]" << endl);
  1336. return NULL;
  1337. }
  1338. if(checkConHashChange(false, _lastConHashProxys))
  1339. {
  1340. int64_t iBegin = TNOWMS;
  1341. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1342. int64_t iEnd = TNOWMS;
  1343. TLOGTARS("[EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  1344. }
  1345. while(_consistentHash.size() > 0)
  1346. {
  1347. string sNode;
  1348. // 通过一致性hash取到对应的节点
  1349. _consistentHash.getNodeName(hashCode, sNode);
  1350. auto it = _indexActiveProxys.find(sNode);
  1351. // 节点不存在,可能是下线或者服务不可用
  1352. if (it == _indexActiveProxys.end())
  1353. {
  1354. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1355. continue;
  1356. }
  1357. //被hash到的节点在主控是active的才走在流程
  1358. if (it->second->isActiveInReg() && it->second->checkActive(true))
  1359. {
  1360. return it->second;
  1361. }
  1362. else
  1363. {
  1364. TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
  1365. // 剔除节点再次hash
  1366. if (!it->second->isActiveInReg())
  1367. {
  1368. // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
  1369. _indexActiveProxys.erase(sNode);
  1370. }
  1371. // checkConHashChange里重新加回到_sortActivProxys重试
  1372. _sortActivProxys.erase(sNode);
  1373. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1374. if (_indexActiveProxys.empty())
  1375. {
  1376. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  1377. return NULL;
  1378. }
  1379. }
  1380. }
  1381. /*
  1382. if(_consistentHash.size() > 0)
  1383. {
  1384. unsigned int iIndex = 0;
  1385. // 通过一致性hash取到对应的节点
  1386. _consistentHash.getIndex(hashCode, iIndex);
  1387. if(iIndex >= _vRegProxys.size())
  1388. {
  1389. iIndex = iIndex % _vRegProxys.size();
  1390. }
  1391. //被hash到的节点在主控是active的才走在流程
  1392. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
  1393. {
  1394. return _vRegProxys[iIndex];
  1395. }
  1396. else
  1397. {
  1398. TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
  1399. if(_activeProxys.empty())
  1400. {
  1401. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  1402. return NULL;
  1403. }
  1404. //在active节点中再次hash
  1405. vector<AdapterProxy*> thisHash = _activeProxys;
  1406. vector<AdapterProxy*> conn;
  1407. size_t hash = 0;
  1408. do
  1409. {
  1410. hash = ((int64_t)hashCode) % thisHash.size();
  1411. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1412. if(hash >= thisHash.size())
  1413. {
  1414. hash = hash % thisHash.size();
  1415. }
  1416. if (thisHash[hash]->checkActive(true))
  1417. {
  1418. return thisHash[hash];
  1419. }
  1420. if(!thisHash[hash]->isConnTimeout() &&
  1421. !thisHash[hash]->isConnExc())
  1422. {
  1423. conn.push_back(thisHash[hash]);
  1424. }
  1425. thisHash.erase(thisHash.begin() + hash);
  1426. }
  1427. while(!thisHash.empty());
  1428. if(conn.size() > 0)
  1429. {
  1430. hash = ((int64_t)hashCode) % conn.size();
  1431. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1432. if(hash >= conn.size())
  1433. {
  1434. hash = hash % conn.size();
  1435. }
  1436. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1437. AdapterProxy *adapterProxy = conn[hash];
  1438. //该proxy可能已经被屏蔽,需重新连一次
  1439. adapterProxy->checkActive(true);
  1440. return adapterProxy;
  1441. }
  1442. //所有adapter都有问题 选不到结点,随机找一个重试
  1443. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1444. adapterProxy->resetRetryTime(false);
  1445. //该proxy可能已经被屏蔽,需重新连一次
  1446. adapterProxy->checkActive(true);
  1447. return adapterProxy;
  1448. }
  1449. }
  1450. */
  1451. return getHashProxyForNormal(hashCode);
  1452. }
  1453. AdapterProxy* EndpointManager::getWeightedProxy(bool bStaticWeighted)
  1454. {
  1455. return getWeightedForNormal(bStaticWeighted);
  1456. }
  1457. AdapterProxy* EndpointManager::getWeightedForNormal(bool bStaticWeighted)
  1458. {
  1459. if (_activeProxys.empty())
  1460. {
  1461. TLOGERROR("[EndpointManager::getWeightedForNormal activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  1462. return NULL;
  1463. }
  1464. int64_t iNow = TNOW;
  1465. if(_lastBuildWeightTime <= iNow)
  1466. {
  1467. updateProxyWeighted();
  1468. if(!_first)
  1469. {
  1470. _lastBuildWeightTime = iNow + _updateWeightInterval;
  1471. }
  1472. else
  1473. {
  1474. _first = false;
  1475. _lastBuildWeightTime = iNow + _updateWeightInterval + 5;
  1476. }
  1477. }
  1478. bool bEmpty = false;
  1479. int iActiveSize = _activeWeightProxy.size();
  1480. if(iActiveSize > 0)
  1481. {
  1482. size_t iProxyIndex = 0;
  1483. set<AdapterProxy*> sConn;
  1484. if(_staticRouterCache.size() > 0)
  1485. {
  1486. for(size_t i = 0;i < _staticRouterCache.size(); i++)
  1487. {
  1488. ++_lastSWeightPosition;
  1489. if(_lastSWeightPosition >= _staticRouterCache.size())
  1490. _lastSWeightPosition = 0;
  1491. iProxyIndex = _staticRouterCache[_lastSWeightPosition];
  1492. if(_activeWeightProxy[iProxyIndex]->checkActive(false))
  1493. {
  1494. return _activeWeightProxy[iProxyIndex];
  1495. }
  1496. if(!_activeWeightProxy[iProxyIndex]->isConnTimeout() &&
  1497. !_activeWeightProxy[iProxyIndex]->isConnExc())
  1498. {
  1499. sConn.insert(_activeWeightProxy[iProxyIndex]);
  1500. }
  1501. }
  1502. }
  1503. else
  1504. {
  1505. bEmpty = true;
  1506. }
  1507. if(!bEmpty)
  1508. {
  1509. if(sConn.size() > 0)
  1510. {
  1511. vector<AdapterProxy*> conn;
  1512. set<AdapterProxy*>::iterator it_conn = sConn.begin();
  1513. while(it_conn != sConn.end())
  1514. {
  1515. conn.push_back(*it_conn);
  1516. ++it_conn;
  1517. }
  1518. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1519. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1520. //该proxy可能已经被屏蔽,需重新连一次
  1521. adapterProxy->checkActive(true);
  1522. return adapterProxy;
  1523. }
  1524. //所有adapter都有问题 选不到结点,随机找一个重试
  1525. AdapterProxy * adapterProxy = _activeWeightProxy[((uint32_t)rand() % iActiveSize)];
  1526. adapterProxy->resetRetryTime(false);
  1527. //该proxy可能已经被屏蔽,需重新连一次
  1528. adapterProxy->checkActive(true);
  1529. return adapterProxy;
  1530. }
  1531. }
  1532. vector<AdapterProxy*> conn;
  1533. for(size_t i=0;i<_activeProxys.size();i++)
  1534. {
  1535. ++_lastRoundPosition;
  1536. if(_lastRoundPosition >= _activeProxys.size())
  1537. _lastRoundPosition = 0;
  1538. if(_activeProxys[_lastRoundPosition]->checkActive(false))
  1539. {
  1540. return _activeProxys[_lastRoundPosition];
  1541. }
  1542. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
  1543. !_activeProxys[_lastRoundPosition]->isConnExc())
  1544. conn.push_back(_activeProxys[_lastRoundPosition]);
  1545. }
  1546. if(conn.size() > 0)
  1547. {
  1548. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1549. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1550. //该proxy可能已经被屏蔽,需重新连一次
  1551. adapterProxy->checkActive(true);
  1552. return adapterProxy;
  1553. }
  1554. //所有adapter都有问题 选不到结点,随机找一个重试
  1555. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1556. adapterProxy->resetRetryTime(false);
  1557. //该proxy可能已经被屏蔽,需重新连一次
  1558. adapterProxy->checkActive(true);
  1559. return adapterProxy;
  1560. }
  1561. void EndpointManager::updateProxyWeighted()
  1562. {
  1563. size_t iWeightProxySize = _activeProxys.size();
  1564. if(iWeightProxySize <= 0)
  1565. {
  1566. TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", activeProxys.size() <= 0]" << endl);
  1567. return ;
  1568. }
  1569. vector<AdapterProxy*> vProxy;
  1570. for(size_t i = 0; i < _activeProxys.size(); ++i)
  1571. {
  1572. if(_activeProxys[i]->getWeight() > 0)
  1573. {
  1574. vProxy.push_back(_activeProxys[i]);
  1575. }
  1576. }
  1577. iWeightProxySize = vProxy.size();
  1578. if(iWeightProxySize <= 0)
  1579. {
  1580. TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", vProxy.size() <= 0]" << endl);
  1581. return ;
  1582. }
  1583. if(_update)
  1584. {
  1585. _activeWeightProxy = vProxy;
  1586. updateStaticWeighted();
  1587. }
  1588. _update = false;
  1589. }
  1590. void EndpointManager::updateStaticWeighted()
  1591. {
  1592. size_t iWeightProxySize = _activeWeightProxy.size();
  1593. vector<int> vWeight;
  1594. vWeight.resize(iWeightProxySize);
  1595. for(size_t i = 0; i < iWeightProxySize; i++)
  1596. {
  1597. vWeight[i] = _activeWeightProxy[i]->getWeight();
  1598. }
  1599. dispatchEndpointCache(vWeight);
  1600. }
  1601. void EndpointManager::dispatchEndpointCache(const vector<int> &vWeight)
  1602. {
  1603. if(vWeight.size() <= 0)
  1604. {
  1605. TLOGERROR("[EndpointManager::dispatchEndpointCache vWeight.size() < 0]" << endl);
  1606. return ;
  1607. }
  1608. size_t iWeightProxySize = vWeight.size();
  1609. map<size_t, int> mIdToWeight;
  1610. multimap<int, size_t> mWeightToId;
  1611. size_t iMaxR = 0;
  1612. size_t iMaxRouterR = 0;
  1613. size_t iMaxWeight = 0;
  1614. size_t iMinWeight = 0;
  1615. size_t iTotalCapacty = 0;
  1616. size_t iTempWeight = 0;
  1617. for(size_t i = 0; i < vWeight.size(); ++i)
  1618. {
  1619. iTotalCapacty += vWeight[i];
  1620. }
  1621. _staticRouterCache.clear();
  1622. _lastSWeightPosition = 0;
  1623. _staticRouterCache.reserve(iTotalCapacty+100);
  1624. iMaxWeight = vWeight[0];
  1625. iMinWeight = vWeight[0];
  1626. for(size_t i = 1;i < iWeightProxySize; i++)
  1627. {
  1628. iTempWeight = vWeight[i];
  1629. if(iTempWeight > iMaxWeight)
  1630. {
  1631. iMaxWeight = iTempWeight;
  1632. }
  1633. if(iTempWeight < iMinWeight)
  1634. {
  1635. iMinWeight = iTempWeight;
  1636. }
  1637. }
  1638. if(iMinWeight > 0)
  1639. {
  1640. iMaxR = iMaxWeight / iMinWeight;
  1641. if(iMaxR < iMinWeightLimit)
  1642. iMaxR = iMinWeightLimit;
  1643. if(iMaxR > iMaxWeightLimit)
  1644. iMaxR = iMaxWeightLimit;
  1645. }
  1646. else
  1647. {
  1648. iMaxR = 1;
  1649. iMaxWeight = 1;
  1650. }
  1651. for(size_t i = 0;i < iWeightProxySize; i++)
  1652. {
  1653. int iWeight = (vWeight[i] * iMaxR) / iMaxWeight;
  1654. if(iWeight > 0)
  1655. {
  1656. iMaxRouterR += iWeight;
  1657. mIdToWeight.insert(map<size_t, int>::value_type(i, iWeight));
  1658. mWeightToId.insert(make_pair(iWeight, i));
  1659. }
  1660. else
  1661. {
  1662. _staticRouterCache.push_back(i);
  1663. }
  1664. TLOGTARS("EndpointManager::dispatchEndpointCache _objName:" << _objName << "|endpoint:" << _activeWeightProxy[i]->endpoint().desc() << "|iWeightR:" << iWeight << endl);
  1665. }
  1666. for(size_t i = 0; i < iMaxRouterR; i++)
  1667. {
  1668. bool bFirst = true;
  1669. multimap<int, size_t> mulTemp;
  1670. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1671. while(mIter != mWeightToId.rend())
  1672. {
  1673. if(bFirst)
  1674. {
  1675. bFirst = false;
  1676. _staticRouterCache.push_back(mIter->second);
  1677. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1678. }
  1679. else
  1680. {
  1681. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1682. }
  1683. mIter++;
  1684. }
  1685. mWeightToId.clear();
  1686. mWeightToId.swap(mulTemp);
  1687. }
  1688. }
  1689. /////////////////////////////////////////////////////////////////////////////
  1690. EndpointThread::EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sName, bool bFirstNetThread)
  1691. : QueryEpBase(pComm,bFirstNetThread,true)
  1692. , _type(type)
  1693. , _name(sName)
  1694. {
  1695. init(sObjName, "", true);
  1696. }
  1697. void EndpointThread::getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1698. {
  1699. //直连调用这个接口无效
  1700. if(_direct)
  1701. {
  1702. return ;
  1703. }
  1704. {
  1705. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1706. refreshReg(_type,_name);
  1707. activeEndPoint = _activeEndPoint;
  1708. inactiveEndPoint = _inactiveEndPoint;
  1709. }
  1710. }
  1711. void EndpointThread::getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1712. {
  1713. //直连调用这个接口无效
  1714. if(_direct)
  1715. {
  1716. return ;
  1717. }
  1718. {
  1719. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1720. refreshReg(_type,_name);
  1721. activeEndPoint = _activeTCEndPoint;
  1722. inactiveEndPoint = _inactiveTCEndPoint;
  1723. }
  1724. }
  1725. void EndpointThread::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync)
  1726. {
  1727. if(!bSync)
  1728. {
  1729. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1730. update(active, inactive);
  1731. }
  1732. else
  1733. {
  1734. update(active, inactive);
  1735. }
  1736. }
  1737. void EndpointThread::update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  1738. {
  1739. _activeEndPoint.clear();
  1740. _inactiveEndPoint.clear();
  1741. _activeTCEndPoint.clear();
  1742. _inactiveTCEndPoint.clear();
  1743. set<EndpointInfo>::iterator iter= active.begin();
  1744. for(;iter != active.end(); ++iter)
  1745. {
  1746. // TC_Endpoint ep = (iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1747. _activeTCEndPoint.push_back(iter->getEndpoint());
  1748. _activeEndPoint.push_back(*iter);
  1749. }
  1750. iter = inactive.begin();
  1751. for(;iter != inactive.end(); ++iter)
  1752. {
  1753. // TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1754. _inactiveTCEndPoint.push_back(iter->getEndpoint());
  1755. _inactiveEndPoint.push_back(*iter);
  1756. }
  1757. }
  1758. /////////////////////////////////////////////////////////////////////////////
  1759. EndpointManagerThread::EndpointManagerThread(Communicator * pComm,const string & sObjName)
  1760. :_communicator(pComm)
  1761. ,_objName(sObjName)
  1762. {
  1763. }
  1764. EndpointManagerThread::~EndpointManagerThread()
  1765. {
  1766. map<string,EndpointThread*>::iterator iter;
  1767. for(iter=_info.begin();iter != _info.end();iter++)
  1768. {
  1769. if(iter->second)
  1770. {
  1771. delete iter->second;
  1772. iter->second = NULL;
  1773. }
  1774. }
  1775. }
  1776. void EndpointManagerThread::getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1777. {
  1778. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1779. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1780. }
  1781. void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1782. {
  1783. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1784. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1785. }
  1786. void EndpointManagerThread::getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1787. {
  1788. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1789. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1790. }
  1791. void EndpointManagerThread::getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1792. {
  1793. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1794. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1795. }
  1796. void EndpointManagerThread::getTCEndpoint(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1797. {
  1798. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1799. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1800. }
  1801. void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1802. {
  1803. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1804. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1805. }
  1806. void EndpointManagerThread::getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1807. {
  1808. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1809. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1810. }
  1811. void EndpointManagerThread::getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1812. {
  1813. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1814. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1815. }
  1816. EndpointThread * EndpointManagerThread::getEndpointThread(GetEndpointType type,const string & sName)
  1817. {
  1818. TC_LockT<TC_SpinLock> lock(_mutex);
  1819. string sAllName = TC_Common::tostr((int)type) + ":" + sName;
  1820. map<string,EndpointThread*>::iterator iter;
  1821. iter = _info.find(sAllName);
  1822. if(iter != _info.end())
  1823. {
  1824. return iter->second;
  1825. }
  1826. EndpointThread * pThread = new EndpointThread(_communicator, _objName, type, sName);
  1827. _info[sAllName] = pThread;
  1828. return pThread;
  1829. }
  1830. }