EndpointManager.cpp 64 KB


  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "util/tc_port.h"
  17. #include "servant/EndpointManager.h"
  18. #include "servant/ObjectProxy.h"
  19. #include "servant/RemoteLogger.h"
  20. #include "servant/AppCache.h"
  21. #include "servant/Application.h"
  22. #include "servant/CommunicatorEpoll.h"
  23. #include "servant/StatReport.h"
  24. namespace tars
  25. {
  26. /////////////////////////////////////////////////////////////////////////////
  27. QueryEpBase::QueryEpBase(Communicator * pComm, bool bFirstNetThread,bool bInterfaceReq)
  28. : _communicator(pComm)
  29. , _firstNetThread(bFirstNetThread)
  30. , _interfaceReq(bInterfaceReq)
  31. , _direct(false)
  32. , _objName("")
  33. , _invokeSetId("")
  34. , _locator("")
  35. , _valid(false)
  36. , _weightType(E_LOOP)
  37. , _rootServant(true)
  38. , _requestRegistry(false)
  39. , _requestTimeout(0)
  40. , _timeoutInterval(5*1000)
  41. , _refreshTime(0)
  42. , _refreshInterval(60*1000)
  43. , _activeEmptyInterval(10*1000)
  44. , _failInterval(2*1000)
  45. , _manyFailInterval(30*1000)
  46. , _failTimesLimit(3)
  47. , _failTimes(0)
  48. {
  49. _refreshInterval = TC_Common::strto<int>(_communicator->getProperty("refresh-endpoint-interval", "60*1000"));
  50. if(_refreshInterval < 5*1000)
  51. {
  52. _refreshInterval = 5 * 1000;
  53. }
  54. setNoDelete(true);
  55. }
  56. void QueryEpBase::callback_findObjectById4All(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  57. {
  58. TLOGTARS("[callback_findObjectById4All _objName:" << _objName << "|ret:" << ret
  59. << ",active:" << activeEp.size()
  60. << ",inactive:" << inactiveEp.size() << "]" << endl);
  61. doEndpoints(activeEp,inactiveEp,ret);
  62. }
  63. void QueryEpBase::callback_findObjectById4All_exception(Int32 ret)
  64. {
  65. TLOGERROR("[callback_findObjectById4All_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  66. doEndpointsExp(ret);
  67. }
  68. void QueryEpBase::callback_findObjectById4Any(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  69. {
  70. TLOGTARS("[callback_findObjectById4Any _objName:" << _objName << "|ret:" << ret
  71. << ",active:" << activeEp.size()
  72. << ",inactive:" << inactiveEp.size() << "]" << endl);
  73. doEndpoints(activeEp,inactiveEp,ret);
  74. }
  75. void QueryEpBase::callback_findObjectById4Any_exception(Int32 ret)
  76. {
  77. TLOGERROR("[callback_findObjectById4Any_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  78. doEndpointsExp(ret);
  79. }
  80. void QueryEpBase::callback_findObjectByIdInSameGroup(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  81. {
  82. TLOGTARS("[callback_findObjectByIdInSameGroup _objName:" << _objName << "|ret:"<<ret
  83. << ",active:" << activeEp.size()
  84. << ",inactive:" << inactiveEp.size() << "]" << endl);
  85. doEndpoints(activeEp,inactiveEp,ret);
  86. }
  87. void QueryEpBase::callback_findObjectByIdInSameGroup_exception(Int32 ret)
  88. {
  89. TLOGERROR("[callback_findObjectByIdInSameGroup_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  90. doEndpointsExp(ret);
  91. }
  92. void QueryEpBase::callback_findObjectByIdInSameSet( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> & inactiveEp)
  93. {
  94. TLOGTARS("[callback_findObjectByIdInSameSet _objName:" << _objName << "|ret:" << ret
  95. << ",active:" << activeEp.size()
  96. << ",inactive:" << inactiveEp.size() << "]" << endl);
  97. doEndpoints(activeEp,inactiveEp,ret);
  98. }
  99. void QueryEpBase::callback_findObjectByIdInSameSet_exception( Int32 ret)
  100. {
  101. TLOGERROR("[callback_findObjectByIdInSameSet_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  102. doEndpointsExp(ret);
  103. }
  104. void QueryEpBase::callback_findObjectByIdInSameStation( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> &inactiveEp)
  105. {
  106. TLOGTARS("[callback_findObjectByIdInSameStation _objName:" << _objName << "|ret:" << ret
  107. << ",active:" << activeEp.size()
  108. << ",inactive:" << inactiveEp.size() << "]" << endl);
  109. doEndpoints(activeEp,inactiveEp,ret);
  110. }
  111. void QueryEpBase::callback_findObjectByIdInSameStation_exception( Int32 ret)
  112. {
  113. TLOGERROR("[callback_findObjectByIdInSameStation_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  114. doEndpointsExp(ret);
  115. }
  116. int QueryEpBase::setLocatorPrx(QueryFPrx prx)
  117. {
  118. _queryFPrx = prx;
  119. return 0;
  120. }
  121. bool QueryEpBase::init(const string & sObjName, const string& setName, bool rootServant)
  122. {
  123. _locator = _communicator->getProperty("locator");
  124. TLOGTARS("QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl);
  125. // LOG_CONSOLE_DEBUG << "QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl;
  126. _invokeSetId = setName;
  127. _rootServant = rootServant;
  128. setObjName(sObjName);
  129. return true;
  130. }
  131. void QueryEpBase::setObjName(const string & sObjName)
  132. {
  133. string::size_type pos = sObjName.find_first_of('@');
  134. string sEndpoints;
  135. string sInactiveEndpoints;
  136. if (pos != string::npos)
  137. {
  138. //[直接连接]指定服务的IP和端口列表
  139. _objName = sObjName.substr(0,pos);
  140. sEndpoints = sObjName.substr(pos + 1);
  141. pos = _objName.find_first_of("#");
  142. if(pos != string::npos)
  143. {
  144. _objName = _objName.substr(0, pos);
  145. }
  146. _direct = true;
  147. _valid = true;
  148. }
  149. else
  150. {
  151. //[间接连接]通过registry查询服务端的IP和端口列表
  152. _direct = false;
  153. _valid = false;
  154. _objName = sObjName;
  155. if(_locator.find_first_not_of('@') == string::npos)
  156. {
  157. TLOGERROR("[QueryEpBase::setObjName locator is not valid,_locator:" << _locator << "]" << endl);
  158. throw TarsRegistryException("locator is not valid,_locator:" + _locator);
  159. }
  160. pos = _objName.find_first_of("#");
  161. if(pos != string::npos)
  162. {
  163. _objName = _objName.substr(0, pos);
  164. }
  165. _queryFPrx = _communicator->stringToProxy<QueryFPrx>(_locator);
  166. string sLocatorKey = _locator;
  167. //如果启用set,则获取按set分组的缓存
  168. if(ClientConfig::SetOpen)
  169. {
  170. sLocatorKey += "_" + ClientConfig::SetDivision;
  171. }
  172. string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
  173. //[间接连接]第一次使用cache,如果是接口级请求则不从缓存读取
  174. if(!_interfaceReq)
  175. {
  176. sEndpoints = AppCache::getInstance()->get(objName,sLocatorKey);
  177. sInactiveEndpoints = AppCache::getInstance()->get("inactive_"+objName,sLocatorKey);
  178. }
  179. }
  180. setEndpoints(sEndpoints,_activeEndpoints);
  181. setEndpoints(sInactiveEndpoints,_inactiveEndpoints);
  182. if(!_activeEndpoints.empty())
  183. {
  184. _valid = true;
  185. }
  186. if((!_activeEndpoints.empty() || !_inactiveEndpoints.empty()))
  187. {
  188. //非直接指定端口, 且从cache中能查到服务端口的, 不需要通知所有ObjectProxy更新地址
  189. notifyEndpoints(_activeEndpoints,_inactiveEndpoints,true);
  190. }
  191. }
  192. void QueryEpBase::setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints)
  193. {
  194. if(sEndpoints == "")
  195. {
  196. return ;
  197. }
  198. bool bSameWeightType = true;
  199. bool bFirstWeightType = true;
  200. unsigned int iWeightType = 0;
  201. vector<string> vEndpoints = TC_Endpoint::sepEndpoint(sEndpoints);
  202. for (size_t i = 0; i < vEndpoints.size(); ++i)
  203. {
  204. try
  205. {
  206. TC_Endpoint ep(vEndpoints[i]);
  207. string sSetDivision;
  208. //解析set分组信息
  209. if (!_direct)
  210. {
  211. string sep = " -s ";
  212. size_t pos = vEndpoints[i].rfind(sep);
  213. if (pos != string::npos)
  214. {
  215. sSetDivision = TC_Common::trim(vEndpoints[i].substr(pos+sep.size()));
  216. size_t endPos = sSetDivision.find(" ");
  217. if (endPos != string::npos)
  218. {
  219. sSetDivision = sSetDivision.substr(0, endPos);
  220. }
  221. }
  222. }
  223. if(bFirstWeightType)
  224. {
  225. bFirstWeightType = false;
  226. iWeightType = ep.getWeightType();
  227. }
  228. else
  229. {
  230. if(ep.getWeightType() != iWeightType)
  231. {
  232. bSameWeightType = false;
  233. }
  234. }
  235. EndpointInfo epi(ep, sSetDivision);
  236. setEndpoints.insert(epi);
  237. }
  238. catch (exception &ex)
  239. {
  240. TLOGERROR("[QueryEpBase::setEndpoints parse error,objname:" << _objName << ",endpoint:" << vEndpoints[i] << "]" << endl);
  241. }
  242. }
  243. if(bSameWeightType)
  244. {
  245. if(iWeightType == 1)
  246. {
  247. _weightType = E_STATIC_WEIGHT;
  248. }
  249. else
  250. {
  251. _weightType = E_LOOP;
  252. }
  253. }
  254. else
  255. {
  256. _weightType = E_LOOP;
  257. }
  258. }
  259. void QueryEpBase::refreshReg(GetEndpointType type, const string & sName)
  260. {
  261. onUpdateOutter();
  262. if(_direct)
  263. {
  264. return;
  265. }
  266. int64_t iNow = TNOWMS;
  267. //正在请求状态 而且请求超时了,或者第一次
  268. if(_requestRegistry && _requestTimeout < iNow)
  269. {
  270. doEndpointsExp(0);
  271. }
  272. //如果是间接连接,通过registry定时查询服务列表
  273. //正在请求状态 而且请求超时了
  274. //非请求状态 到了下一个刷新时间了
  275. if( (!_requestRegistry) && (_refreshTime <= iNow))
  276. {
  277. _requestRegistry = true;
  278. //一定时间不回调就算超时了
  279. _requestTimeout = iNow + _timeoutInterval;
  280. TLOGTARS("[QueryEpBase::refresh," << _objName << "]" <<endl);
  281. if(_valid && !_rootServant)
  282. {
  283. return;
  284. }
  285. //判断是同步调用还是异步调用
  286. //内部请求主控都是异步请求
  287. //接口请求主控第一次是同步请求
  288. bool bSync = (!_valid && _interfaceReq);
  289. //如果是异步且不是根servant(通过#1创建的servant, 不主动更新主控信息)
  290. if(!bSync && !_rootServant)
  291. return;
  292. try
  293. {
  294. if(bSync)
  295. {
  296. vector<EndpointF> activeEp;
  297. vector<EndpointF> inactiveEp;
  298. int iRet = 0;
  299. switch(type)
  300. {
  301. case E_ALL:
  302. {
  303. iRet = _queryFPrx->findObjectById4Any(_objName,activeEp,inactiveEp, ServerConfig::Context);
  304. break;
  305. }
  306. case E_STATION:
  307. {
  308. iRet = _queryFPrx->findObjectByIdInSameStation(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
  309. break;
  310. }
  311. case E_SET:
  312. {
  313. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
  314. break;
  315. }
  316. case E_DEFAULT:
  317. default:
  318. {
  319. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  320. {
  321. //指定set调用时,指定set的优先级最高
  322. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  323. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,setId,activeEp,inactiveEp, ServerConfig::Context);
  324. }
  325. else
  326. {
  327. iRet = _queryFPrx->findObjectByIdInSameGroup(_objName,activeEp,inactiveEp, ServerConfig::Context);
  328. }
  329. break;
  330. }
  331. }
  332. doEndpoints(activeEp, inactiveEp, iRet, true);
  333. }
  334. else
  335. {
  336. switch(type)
  337. {
  338. case E_ALL:
  339. {
  340. _queryFPrx->async_findObjectById4Any(this,_objName, ServerConfig::Context);
  341. break;
  342. }
  343. case E_STATION:
  344. {
  345. _queryFPrx->async_findObjectByIdInSameStation(this,_objName,sName, ServerConfig::Context);
  346. break;
  347. }
  348. case E_SET:
  349. {
  350. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,sName, ServerConfig::Context);
  351. break;
  352. }
  353. case E_DEFAULT:
  354. default:
  355. {
  356. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  357. {
  358. //指定set调用时,指定set的优先级最高
  359. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  360. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,setId, ServerConfig::Context);
  361. }
  362. else
  363. {
  364. _queryFPrx->async_findObjectByIdInSameGroup(this,_objName, ServerConfig::Context);
  365. }
  366. break;
  367. }
  368. }//end switch
  369. }
  370. }
  371. catch(TC_Exception & ex)
  372. {
  373. TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"exception:"<<ex.what() << "]"<<endl);
  374. doEndpointsExp(TARSSERVERUNKNOWNERR);
  375. }
  376. catch(...)
  377. {
  378. TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"unknown exception]" <<endl);
  379. doEndpointsExp(TARSSERVERUNKNOWNERR);
  380. }
  381. }
  382. }
  383. void QueryEpBase::doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync)
  384. {
  385. if(iRet != 0)
  386. {
  387. doEndpointsExp(iRet);
  388. return ;
  389. }
  390. _failTimes = 0;
  391. _requestRegistry = false;
  392. int64_t iNow = TNOWMS;
  393. //有返回成功的结点,按照正常的频率
  394. //如果返回空列表或者返回失败 2s去刷新一次
  395. //接口请求主控的方式 不管是不是空都要去刷新
  396. if(activeEp.empty() && (!_interfaceReq) )
  397. {
  398. _refreshTime = iNow + _activeEmptyInterval;
  399. //如果registry返回Active服务列表为空,不做更新
  400. TLOGERROR("[QueryEpBase::doEndpoints, callback activeEps is empty,objname:"<< _objName << "]" << endl);
  401. return;
  402. }
  403. else
  404. {
  405. _refreshTime = iNow + _refreshInterval;
  406. }
  407. bool bNeedNotify = false;
  408. bool bSameWeightType = true;
  409. bool bFirstWeightType = true;
  410. int iWeightType = 0;
  411. set<string> sActiveEndpoints;
  412. set<string> sInactiveEndpoints;
  413. set<EndpointInfo> activeEps;
  414. set<EndpointInfo> inactiveEps;
  415. //生成active set 用于比较
  416. for (uint32_t i = 0; i < activeEp.size(); ++i)
  417. {
  418. if(bFirstWeightType)
  419. {
  420. bFirstWeightType = false;
  421. iWeightType = activeEp[i].weightType;
  422. }
  423. else
  424. {
  425. if(activeEp[i].weightType != iWeightType)
  426. {
  427. bSameWeightType = false;
  428. }
  429. }
  430. // taf istcp意思和这里枚举值对应
  431. activeEps.insert(EndpointInfo(activeEp[i]));
  432. }
  433. //生成inactive set 用于比较
  434. for (uint32_t i = 0; i < inactiveEp.size(); ++i)
  435. {
  436. // taf istcp意思和这里枚举值对应
  437. inactiveEps.insert(EndpointInfo(inactiveEp[i]));
  438. }
  439. if(bSameWeightType)
  440. {
  441. if(iWeightType == 1)
  442. {
  443. _weightType = E_STATIC_WEIGHT;
  444. }
  445. else
  446. {
  447. _weightType = E_LOOP;
  448. }
  449. }
  450. else
  451. {
  452. _weightType = E_LOOP;
  453. }
  454. if(activeEps != _activeEndpoints)
  455. {
  456. bNeedNotify = true;
  457. _activeEndpoints = activeEps;
  458. if(_firstNetThread)
  459. {
  460. setEndPointToCache(false);
  461. }
  462. }
  463. if(inactiveEps != _inactiveEndpoints)
  464. {
  465. bNeedNotify = true;
  466. _inactiveEndpoints = inactiveEps;
  467. if(_firstNetThread)
  468. {
  469. setEndPointToCache(true);
  470. }
  471. }
  472. if(bNeedNotify)
  473. {
  474. notifyEndpoints(_activeEndpoints,_inactiveEndpoints,bSync);
  475. }
  476. if(!_valid)
  477. {
  478. _valid = true;
  479. doNotify();
  480. }
  481. }
  482. void QueryEpBase::doEndpointsExp(int iRet)
  483. {
  484. _failTimes++;
  485. _requestRegistry = false;
  486. int64_t iNow = TNOWMS;
  487. //频率控制获取主控失败 2秒钟再更新
  488. _refreshTime = iNow + _failInterval;
  489. //获取主控连续失败3次就等30s再更新一次
  490. //连续失败 强制设成数据是有效的
  491. if(_failTimes > _failTimesLimit)
  492. {
  493. if(!_valid)
  494. {
  495. _valid = true;
  496. doNotify();
  497. }
  498. _refreshTime = iNow + _manyFailInterval;
  499. }
  500. }
  501. void QueryEpBase::setEndPointToCache(bool bInactive)
  502. {
  503. //如果是接口级请求则不缓存到文件
  504. if(_interfaceReq)
  505. {
  506. return;
  507. }
  508. string sEndpoints;
  509. set<EndpointInfo> doEndpoints;
  510. if(!bInactive)
  511. {
  512. doEndpoints = _activeEndpoints;
  513. }
  514. else
  515. {
  516. doEndpoints = _inactiveEndpoints;
  517. }
  518. set<EndpointInfo>::iterator iter;
  519. iter = doEndpoints.begin();
  520. for (; iter != doEndpoints.end(); ++iter)
  521. {
  522. //这里的超时时间 只是对服务端有效。这里的值无效。所以默认用3000了
  523. TC_Endpoint ep = iter->getEndpoint();
  524. if (!sEndpoints.empty())
  525. {
  526. sEndpoints += ":";
  527. }
  528. sEndpoints += ep.toString();
  529. if (!iter->setDivision().empty())
  530. {
  531. sEndpoints += " -s " + iter->setDivision();
  532. }
  533. }
  534. //如果启用set,则按set分组保存
  535. string sLocatorKey = _locator;
  536. if(ClientConfig::SetOpen)
  537. {
  538. sLocatorKey += "_" + ClientConfig::SetDivision;
  539. }
  540. string objName = _objName + string(_invokeSetId.empty()?"":":") + _invokeSetId;
  541. if(bInactive)
  542. {
  543. AppCache::getInstance()->set("inactive_"+objName,sEndpoints,sLocatorKey);
  544. }
  545. else
  546. {
  547. AppCache::getInstance()->set(objName,sEndpoints,sLocatorKey);
  548. }
  549. TLOGTARS("[setEndPointToCache,obj:" << _objName << ",invokeSetId:" << _invokeSetId << ",endpoint:" << sEndpoints << "]" << endl);
  550. }
  551. EndpointManager::EndpointManager(ObjectProxy * pObjectProxy, Communicator* pComm, bool bFirstNetThread)
  552. : QueryEpBase(pComm, bFirstNetThread, false)
  553. ,_objectProxy(pObjectProxy)
  554. ,_lastRoundPosition(0)
  555. ,_update(true)
  556. ,_updateWeightInterval(60)
  557. ,_lastSWeightPosition(0)
  558. ,_consistentHashWeight(E_TC_CONHASH_KETAMAHASH)
  559. ,_consistentHash(E_TC_CONHASH_KETAMAHASH)
  560. {
  561. setNetThreadProcess(true);
  562. }
  563. EndpointManager::~EndpointManager()
  564. {
  565. map<string,AdapterProxy*>::iterator iterAdapter;
  566. for(iterAdapter = _allProxys.begin();iterAdapter != _allProxys.end();iterAdapter++)
  567. {
  568. if(iterAdapter->second)
  569. {
  570. delete iterAdapter->second;
  571. iterAdapter->second = NULL;
  572. }
  573. }
  574. _allProxys.clear();
  575. }
  576. void EndpointManager::onUpdateOutter()
  577. {
  578. // LOG_CONSOLE_DEBUG << this->_objectProxy << ", valid:" << _valid << ", " << _outterUpdate.get() << endl;
  579. assert(this->_objectProxy->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
  580. lock_guard<mutex> l(_outterLocker);
  581. if(_outterUpdate)
  582. {
  583. shared_ptr<OutterUpdate> outterUpdate = _outterUpdate;
  584. updateEndpoints(outterUpdate->active, outterUpdate->inactive);
  585. _valid = true;
  586. _outterUpdate.reset();
  587. }
  588. }
  589. void EndpointManager::updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  590. {
  591. // LOG_CONSOLE_DEBUG << this->_objectProxy << ", " << active.begin()->desc() << endl;
  592. //创新新对象, 避免线程冲突
  593. shared_ptr<OutterUpdate> outterUpdate = std::make_shared<OutterUpdate>();
  594. outterUpdate->active = active;
  595. outterUpdate->inactive = inactive;
  596. //更新时间
  597. _refreshTime = TNOWMS + _refreshInterval;
  598. lock_guard<mutex> l(_outterLocker);
  599. _outterUpdate = outterUpdate;
  600. }
  601. void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  602. {
  603. TLOGTARS("[EndpointManager::updateEndpoints obj:" << this->_objName << ", active:" << active.size() << ", inactive size:" << inactive.size() << endl);
  604. pair<map<string,AdapterProxy*>::iterator,bool> result;
  605. _activeProxys.clear();
  606. _regProxys.clear();
  607. _indexActiveProxys.clear();
  608. _sortActivProxys.clear();
  609. if(!active.empty())
  610. {
  611. //先把服务都设置为非活跃
  612. for (auto iter = _allProxys.begin(); iter != _allProxys.end(); ++iter)
  613. {
  614. iter->second->setActiveInReg(false);
  615. }
  616. }
  617. //更新active
  618. for(auto iter = active.begin(); iter != active.end(); ++iter)
  619. {
  620. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  621. {
  622. continue;
  623. }
  624. // LOG_CONSOLE_DEBUG << std::this_thread::get_id() << ", allProxys size:" << _allProxys.size() << ", " << iter->cmpDesc() << endl;
  625. auto iterAdapter = _allProxys.find(iter->cmpDesc());
  626. if(iterAdapter == _allProxys.end())
  627. {
  628. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  629. result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
  630. iterAdapter = result.first;
  631. _vAllProxys.push_back(ap);
  632. }
  633. //该节点在主控的状态为active
  634. iterAdapter->second->setActiveInReg(true);
  635. _activeProxys.push_back(iterAdapter->second);
  636. _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
  637. const string &host = iterAdapter->second->endpoint().host();
  638. _indexActiveProxys.insert(make_pair(host, iterAdapter->second));
  639. _sortActivProxys.insert(make_pair(host, iterAdapter->second));
  640. //设置该节点的静态权重值
  641. iterAdapter->second->setWeight(iter->weight());
  642. }
  643. //更新inactive
  644. for(auto iter = inactive.begin(); iter != inactive.end(); ++iter)
  645. {
  646. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  647. {
  648. continue;
  649. }
  650. auto iterAdapter = _allProxys.find(iter->cmpDesc());
  651. if(iterAdapter == _allProxys.end())
  652. {
  653. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  654. result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
  655. assert(result.second);
  656. iterAdapter = result.first;
  657. _vAllProxys.push_back(ap);
  658. }
  659. //该节点在主控的状态为inactive
  660. iterAdapter->second->setActiveInReg(false);
  661. _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
  662. //设置该节点的静态权重值
  663. iterAdapter->second->setWeight(iter->weight());
  664. }
  665. //_vRegProxys 需要按顺序来 重排
  666. _vRegProxys.clear();
  667. auto iterAdapter = _regProxys.begin();
  668. for(;iterAdapter != _regProxys.end();++iterAdapter)
  669. {
  670. _vRegProxys.push_back(iterAdapter->second);
  671. }
  672. _update = true;
  673. }
  674. void EndpointManager::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bNotify)
  675. {
  676. updateEndpoints(active, inactive);
  677. //丢给外层统一做
  678. _objectProxy->onNotifyEndpoints(active, inactive);
  679. }
  680. void EndpointManager::doNotify()
  681. {
  682. _objectProxy->doInvoke();
  683. }
  684. bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
  685. {
  686. pAdapterProxy = NULL;
  687. //刷新主控
  688. refreshReg(E_DEFAULT, "");
  689. //无效的数据 返回true
  690. if (!_valid)
  691. {
  692. return true;
  693. }
  694. //如果有hash,则先使用hash策略
  695. if (msg->data._hash)
  696. {
  697. pAdapterProxy = getHashProxy(msg->data._hashCode, msg->data._conHash);
  698. return false;
  699. }
  700. if(_weightType == E_STATIC_WEIGHT)
  701. {
  702. //权重模式
  703. bool bStaticWeighted = false;
  704. if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
  705. bStaticWeighted = true;
  706. pAdapterProxy = getWeightedProxy(bStaticWeighted);
  707. }
  708. else
  709. {
  710. //普通轮询模式
  711. pAdapterProxy = getNextValidProxy();
  712. }
  713. return false;
  714. }
  715. AdapterProxy * EndpointManager::getNextValidProxy()
  716. {
  717. if (_activeProxys.empty())
  718. {
  719. TLOGERROR("[EndpointManager::getNextValidProxy activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  720. return NULL;
  721. }
  722. vector<AdapterProxy*> conn;
  723. for(size_t i=0;i<_activeProxys.size();i++)
  724. {
  725. ++_lastRoundPosition;
  726. if(_lastRoundPosition >= _activeProxys.size())
  727. {
  728. _lastRoundPosition = 0;
  729. }
  730. if(_activeProxys[_lastRoundPosition]->checkActive(false))
  731. {
  732. return _activeProxys[_lastRoundPosition];
  733. }
  734. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() && !_activeProxys[_lastRoundPosition]->isConnExc()) {
  735. conn.push_back(_activeProxys[_lastRoundPosition]);
  736. }
  737. }
  738. if(conn.size() > 0)
  739. {
  740. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  741. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  742. //该proxy可能已经被屏蔽,需重新连一次
  743. adapterProxy->checkActive(true);
  744. return adapterProxy;
  745. }
  746. //所有adapter都有问题 选不到结点,随机找一个重试
  747. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  748. adapterProxy->resetRetryTime(false);
  749. //该proxy可能已经被屏蔽,需重新连一次
  750. adapterProxy->checkActive(true);
  751. return adapterProxy;
  752. }
  753. AdapterProxy* EndpointManager::getHashProxy(int64_t hashCode, bool bConsistentHash)
  754. {
  755. if(_weightType == E_STATIC_WEIGHT)
  756. {
  757. if(bConsistentHash)
  758. {
  759. return getConHashProxyForWeight(hashCode, true);
  760. }
  761. else
  762. {
  763. return getHashProxyForWeight(hashCode, true, _hashStaticRouterCache);
  764. }
  765. }
  766. else
  767. {
  768. if(bConsistentHash)
  769. {
  770. return getConHashProxyForNormal(hashCode);
  771. }
  772. else
  773. {
  774. return getHashProxyForNormal(hashCode);
  775. }
  776. }
  777. }
  778. AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache)
  779. {
  780. if(_vRegProxys.empty())
  781. {
  782. TLOGERROR("[EndpointManager::getHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  783. return NULL;
  784. }
  785. if(checkHashStaticWeightChange(bStatic))
  786. {
  787. int64_t iBegin = TNOWMS;
  788. updateHashProxyWeighted(bStatic);
  789. int64_t iEnd = TNOWMS;
  790. TLOGTARS("[EndpointManager::getHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  791. }
  792. if(vRouterCache.size() > 0)
  793. {
  794. size_t hash = ((int64_t)hashCode) % vRouterCache.size();
  795. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  796. if(hash >= vRouterCache.size())
  797. {
  798. hash = hash % vRouterCache.size();
  799. }
  800. size_t iIndex = vRouterCache[hash];
  801. if(iIndex >= _vRegProxys.size())
  802. {
  803. iIndex = iIndex % _vRegProxys.size();
  804. }
  805. //被hash到的节点在主控是active的才走在流程
  806. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
  807. {
  808. return _vRegProxys[iIndex];
  809. }
  810. else
  811. {
  812. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl);
  813. if(_activeProxys.empty())
  814. {
  815. TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
  816. return NULL;
  817. }
  818. //在active节点中再次hash
  819. vector<AdapterProxy*> thisHash = _activeProxys;
  820. vector<AdapterProxy*> conn;
  821. do
  822. {
  823. hash = ((int64_t)hashCode) % thisHash.size();
  824. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  825. if(hash >= thisHash.size())
  826. {
  827. hash = hash % thisHash.size();
  828. }
  829. if (thisHash[hash]->checkActive(true))
  830. {
  831. return thisHash[hash];
  832. }
  833. if(!thisHash[hash]->isConnTimeout() &&
  834. !thisHash[hash]->isConnExc())
  835. {
  836. conn.push_back(thisHash[hash]);
  837. }
  838. thisHash.erase(thisHash.begin() + hash);
  839. }
  840. while(!thisHash.empty());
  841. if(conn.size() > 0)
  842. {
  843. hash = ((int64_t)hashCode) % conn.size();
  844. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  845. if(hash >= conn.size())
  846. {
  847. hash = hash % conn.size();
  848. }
  849. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  850. AdapterProxy *adapterProxy = conn[hash];
  851. //该proxy可能已经被屏蔽,需重新连一次
  852. adapterProxy->checkActive(true);
  853. return adapterProxy;
  854. }
  855. //所有adapter都有问题 选不到结点,随机找一个重试
  856. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  857. adapterProxy->resetRetryTime(false);
  858. //该proxy可能已经被屏蔽,需重新连一次
  859. adapterProxy->checkActive(true);
  860. return adapterProxy;
  861. }
  862. }
  863. return getHashProxyForNormal(hashCode);
  864. }
  865. AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool bStatic)
  866. {
  867. if(_vRegProxys.empty())
  868. {
  869. TLOGERROR("[EndpointManager::getConHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  870. return NULL;
  871. }
  872. if(checkConHashChange(bStatic, _lastConHashWeightProxys))
  873. {
  874. int64_t iBegin = TNOWMS;
  875. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  876. int64_t iEnd = TNOWMS;
  877. TLOGTARS("[EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  878. }
  879. while(_consistentHashWeight.size() > 0)
  880. {
  881. string sNode;
  882. // 通过一致性hash取到对应的节点
  883. _consistentHashWeight.getNodeName(hashCode, sNode);
  884. auto it = _indexActiveProxys.find(sNode);
  885. // 节点不存在,可能是下线或者服务不可用
  886. if (it == _indexActiveProxys.end())
  887. {
  888. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  889. continue;
  890. }
  891. //被hash到的节点在主控是active的才走在流程
  892. if (it->second->isActiveInReg() && it->second->checkActive(true))
  893. {
  894. return it->second;
  895. }
  896. else
  897. {
  898. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
  899. // 剔除节点再次hash
  900. if (!it->second->isActiveInReg())
  901. {
  902. // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
  903. _indexActiveProxys.erase(sNode);
  904. }
  905. // checkConHashChange里重新加回到_sortActivProxys重试
  906. _sortActivProxys.erase(sNode);
  907. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  908. if (_indexActiveProxys.empty())
  909. {
  910. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  911. return NULL;
  912. }
  913. }
  914. }
  915. /*if(_consistentHashWeight.size() > 0)
  916. {
  917. unsigned int iIndex = 0;
  918. // 通过一致性hash取到对应的节点
  919. _consistentHashWeight.getIndex(hashCode, iIndex);
  920. if(iIndex >= _vRegProxys.size())
  921. {
  922. iIndex = iIndex % _vRegProxys.size();
  923. }
  924. //被hash到的节点在主控是active的才走在流程
  925. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
  926. {
  927. return _vRegProxys[iIndex];
  928. }
  929. else
  930. {
  931. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
  932. if(_activeProxys.empty())
  933. {
  934. TLOGERROR("[EndpointManager::getConHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
  935. return NULL;
  936. }
  937. //在active节点中再次hash
  938. vector<AdapterProxy*> thisHash = _activeProxys;
  939. vector<AdapterProxy*> conn;
  940. size_t hash = 0;
  941. do
  942. {
  943. hash = ((int64_t)hashCode) % thisHash.size();
  944. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  945. if(hash >= thisHash.size())
  946. {
  947. hash = hash % thisHash.size();
  948. }
  949. if (thisHash[hash]->checkActive(false))
  950. {
  951. return thisHash[hash];
  952. }
  953. if(!thisHash[hash]->isConnTimeout() && !thisHash[hash]->isConnExc())
  954. {
  955. conn.push_back(thisHash[hash]);
  956. }
  957. thisHash.erase(thisHash.begin() + hash);
  958. }
  959. while(!thisHash.empty());
  960. if(conn.size() > 0)
  961. {
  962. hash = ((int64_t)hashCode) % conn.size();
  963. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  964. if(hash >= conn.size())
  965. {
  966. hash = hash % conn.size();
  967. }
  968. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  969. AdapterProxy *adapterProxy = conn[hash];
  970. //该proxy可能已经被屏蔽,需重新连一次
  971. adapterProxy->checkActive(true);
  972. return adapterProxy;
  973. }
  974. //所有adapter都有问题 选不到结点,随机找一个重试
  975. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  976. adapterProxy->resetRetryTime(false);
  977. //该proxy可能已经被屏蔽,需重新连一次
  978. adapterProxy->checkActive(true);
  979. return adapterProxy;
  980. }
  981. }*/
  982. return getHashProxyForNormal(hashCode);
  983. }
  984. bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
  985. {
  986. if(bStatic)
  987. {
  988. if(_lastHashStaticProxys.size() != _vRegProxys.size())
  989. {
  990. return true;
  991. }
  992. for(size_t i = 0; i < _vRegProxys.size(); i++)
  993. {
  994. //解决服务权重更新时哈希表不更新的问题
  995. if((_lastHashStaticProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc()) || _vRegProxys[i]->checkWeightChanged(true))
  996. {
  997. return true;
  998. }
  999. }
  1000. }
  1001. return false;
  1002. }
  1003. bool EndpointManager::checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys)
  1004. {
  1005. // 将之前故障临时剔除的节点重新加回来重试
  1006. if (_indexActiveProxys.size() != _sortActivProxys.size())
  1007. {
  1008. for (auto &it : _indexActiveProxys)
  1009. {
  1010. _sortActivProxys[it.first] = it.second;
  1011. }
  1012. }
  1013. if(mLastConHashProxys.size() != _sortActivProxys.size())
  1014. {
  1015. return true;
  1016. }
  1017. auto itLast = mLastConHashProxys.begin();
  1018. auto itSort = _sortActivProxys.begin();
  1019. for (; itLast!=mLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
  1020. {
  1021. if (itLast->first != itSort->first)
  1022. {
  1023. return true;
  1024. }
  1025. //解决服务权重更新时一致性哈希环不更新的问题
  1026. if(bStatic && itSort->second->checkWeightChanged(true))
  1027. {
  1028. return true;
  1029. }
  1030. }
  1031. /*
  1032. if(vLastConHashProxys.size() != _vRegProxys.size())
  1033. {
  1034. return true;
  1035. }
  1036. for(size_t i = 0; i < _vRegProxys.size(); i++)
  1037. {
  1038. if(vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
  1039. {
  1040. return true;
  1041. }
  1042. //解决服务权重更新时一致性哈希环不更新的问题
  1043. if(bStatic && _vRegProxys[i]->checkWeightChanged(true))
  1044. {
  1045. return true;
  1046. }
  1047. }
  1048. */
  1049. return false;
  1050. }
  1051. void EndpointManager::updateHashProxyWeighted(bool bStatic)
  1052. {
  1053. if(_vRegProxys.size() <= 0)
  1054. {
  1055. TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  1056. return ;
  1057. }
  1058. if(bStatic)
  1059. {
  1060. _lastHashStaticProxys = _vRegProxys;
  1061. _hashStaticRouterCache.clear();
  1062. }
  1063. vector<AdapterProxy*> vRegProxys;
  1064. vector<size_t> vIndex;
  1065. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  1066. {
  1067. if(_vRegProxys[i]->getWeight() > 0)
  1068. {
  1069. vRegProxys.push_back(_vRegProxys[i]);
  1070. vIndex.push_back(i);
  1071. }
  1072. //防止多个服务节点权重同时更新时哈希表多次更新
  1073. _vRegProxys[i]->resetWeightChanged();
  1074. }
  1075. if(vRegProxys.size() <= 0)
  1076. {
  1077. TLOGERROR("[EndpointManager::updateHashProxyWeighted vRegProxys is empty], bStatic:" << bStatic << endl);
  1078. return ;
  1079. }
  1080. size_t iHashStaticWeightSize = vRegProxys.size();
  1081. map<size_t, int> mIdToWeight;
  1082. multimap<int, size_t> mWeightToId;
  1083. size_t iMaxR = 0;
  1084. size_t iMaxRouterR = 0;
  1085. size_t iMaxWeight = vRegProxys[0]->getWeight();
  1086. size_t iMinWeight = vRegProxys[0]->getWeight();
  1087. size_t iTempWeight = 0;
  1088. for(size_t i = 1;i < iHashStaticWeightSize; i++)
  1089. {
  1090. iTempWeight = vRegProxys[i]->getWeight();
  1091. if(iTempWeight > iMaxWeight)
  1092. {
  1093. iMaxWeight = iTempWeight;
  1094. }
  1095. if(iTempWeight < iMinWeight)
  1096. {
  1097. iMinWeight = iTempWeight;
  1098. }
  1099. }
  1100. if(iMinWeight > 0)
  1101. {
  1102. iMaxR = iMaxWeight / iMinWeight;
  1103. if(iMaxR < iMinWeightLimit)
  1104. iMaxR = iMinWeightLimit;
  1105. if(iMaxR > iMaxWeightLimit)
  1106. iMaxR = iMaxWeightLimit;
  1107. }
  1108. else
  1109. {
  1110. iMaxR = 1;
  1111. iMaxWeight = 1;
  1112. }
  1113. for(size_t i = 0;i < iHashStaticWeightSize; i++)
  1114. {
  1115. int iWeight = (vRegProxys[i]->getWeight() * iMaxR) / iMaxWeight;
  1116. if(iWeight > 0)
  1117. {
  1118. iMaxRouterR += iWeight;
  1119. mIdToWeight.insert(map<size_t, int>::value_type(vIndex[i], iWeight));
  1120. mWeightToId.insert(make_pair(iWeight, vIndex[i]));
  1121. }
  1122. else
  1123. {
  1124. if(bStatic)
  1125. {
  1126. _hashStaticRouterCache.push_back(vIndex[i]);
  1127. }
  1128. }
  1129. TLOGTARS("EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
  1130. }
  1131. for(size_t i = 0; i < iMaxRouterR; i++)
  1132. {
  1133. bool bFirst = true;
  1134. multimap<int, size_t> mulTemp;
  1135. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1136. while(mIter != mWeightToId.rend())
  1137. {
  1138. if(bFirst)
  1139. {
  1140. bFirst = false;
  1141. if(bStatic)
  1142. {
  1143. _hashStaticRouterCache.push_back(mIter->second);
  1144. }
  1145. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1146. }
  1147. else
  1148. {
  1149. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1150. }
  1151. mIter++;
  1152. }
  1153. mWeightToId.clear();
  1154. mWeightToId.swap(mulTemp);
  1155. }
  1156. }
  1157. void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash)
  1158. {
  1159. conHash.clear();
  1160. if(_sortActivProxys.empty())
  1161. {
  1162. TLOGERROR("[EndpointManager::updateHashProxyWeighted _indexActiveProxys is empty], bStatic:" << bStatic << endl);
  1163. return ;
  1164. }
  1165. mLastConHashProxys = _sortActivProxys;
  1166. for (auto it = _sortActivProxys.begin(); it != _sortActivProxys.end(); ++it)
  1167. {
  1168. int iWeight = (bStatic ? (it->second->getWeight()) : 100);
  1169. if(iWeight > 0)
  1170. {
  1171. iWeight = iWeight / 4;
  1172. if(iWeight <= 0)
  1173. {
  1174. iWeight = 1;
  1175. }
  1176. // 同一服务有多个obj的情况
  1177. // 同一hash值调用不同的obj会hash到不同的服务器
  1178. // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
  1179. // 一致性hash用host进行索引,不使用index,这里传0
  1180. conHash.addNode(it->second->endpoint().host(), 0, iWeight);
  1181. }
  1182. //防止多个服务节点权重同时更新时一致性哈希环多次更新
  1183. it->second->resetWeightChanged();
  1184. }
  1185. /*
  1186. if(_vRegProxys.size() <= 0)
  1187. {
  1188. TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  1189. return ;
  1190. }
  1191. vLastConHashProxys = _vRegProxys;
  1192. conHash.clear();
  1193. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  1194. {
  1195. int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
  1196. if(iWeight > 0)
  1197. {
  1198. iWeight = iWeight / 4;
  1199. if(iWeight <= 0)
  1200. {
  1201. iWeight = 1;
  1202. }
  1203. // 同一服务有多个obj的情况
  1204. // 同一hash值调用不同的obj会hash到不同的服务器
  1205. // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
  1206. conHash.addNode(_vRegProxys[i]->endpoint().host(), i, iWeight);
  1207. }
  1208. //防止多个服务节点权重同时更新时一致性哈希环多次更新
  1209. _vRegProxys[i]->resetWeightChanged();
  1210. }
  1211. */
  1212. conHash.sortNode();
  1213. }
  1214. AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode)
  1215. {
  1216. if(_vRegProxys.empty())
  1217. {
  1218. TLOGERROR("[EndpointManager::getHashProxyForNormal _vRegProxys is empty]" << endl);
  1219. return NULL;
  1220. }
  1221. // 1 _vRegProxys从客户端启动之后,就不会再改变,除非有节点增加
  1222. // 2 如果有增加节点,则_vRegProxys顺序会重新排序,之前的hash会改变
  1223. // 3 节点下线后,需要下次启动客户端后,_vRegProxys内容才会生效
  1224. size_t hash = ((int64_t)hashCode) % _vRegProxys.size();
  1225. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1226. if(hash >= _vRegProxys.size())
  1227. {
  1228. hash = hash % _vRegProxys.size();
  1229. }
  1230. //被hash到的节点在主控是active的才走在流程
  1231. if (_vRegProxys[hash]->isActiveInReg() && _vRegProxys[hash]->checkActive(true))
  1232. {
  1233. return _vRegProxys[hash];
  1234. }
  1235. else
  1236. {
  1237. TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl);
  1238. if(_activeProxys.empty())
  1239. {
  1240. TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
  1241. return NULL;
  1242. }
  1243. //在active节点中再次hash
  1244. vector<AdapterProxy*> thisHash = _activeProxys;
  1245. vector<AdapterProxy*> conn;
  1246. do
  1247. {
  1248. hash = ((int64_t)hashCode) % thisHash.size();
  1249. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1250. if(hash >= thisHash.size())
  1251. {
  1252. hash = hash % thisHash.size();
  1253. }
  1254. if (thisHash[hash]->checkActive(true))
  1255. {
  1256. return thisHash[hash];
  1257. }
  1258. if(!thisHash[hash]->isConnTimeout() &&
  1259. !thisHash[hash]->isConnExc())
  1260. {
  1261. conn.push_back(thisHash[hash]);
  1262. }
  1263. thisHash.erase(thisHash.begin() + hash);
  1264. }
  1265. while(!thisHash.empty());
  1266. if(conn.size() > 0)
  1267. {
  1268. hash = ((int64_t)hashCode) % conn.size();
  1269. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1270. if(hash >= conn.size())
  1271. {
  1272. hash = hash % conn.size();
  1273. }
  1274. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1275. AdapterProxy *adapterProxy = conn[hash];
  1276. //该proxy可能已经被屏蔽,需重新连一次
  1277. adapterProxy->checkActive(true);
  1278. return adapterProxy;
  1279. }
  1280. //所有adapter都有问题 选不到结点,随机找一个重试
  1281. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1282. adapterProxy->resetRetryTime(false);
  1283. //该proxy可能已经被屏蔽,需重新连一次
  1284. adapterProxy->checkActive(true);
  1285. return adapterProxy;
  1286. }
  1287. }
  1288. AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
  1289. {
  1290. if(_vRegProxys.empty())
  1291. {
  1292. TLOGERROR("[EndpointManager::getConHashProxyForNormal _vRegProxys is empty]" << endl);
  1293. return NULL;
  1294. }
  1295. if(checkConHashChange(false, _lastConHashProxys))
  1296. {
  1297. int64_t iBegin = TNOWMS;
  1298. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1299. int64_t iEnd = TNOWMS;
  1300. TLOGTARS("[EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  1301. }
  1302. while(_consistentHash.size() > 0)
  1303. {
  1304. string sNode;
  1305. // 通过一致性hash取到对应的节点
  1306. _consistentHash.getNodeName(hashCode, sNode);
  1307. auto it = _indexActiveProxys.find(sNode);
  1308. // 节点不存在,可能是下线或者服务不可用
  1309. if (it == _indexActiveProxys.end())
  1310. {
  1311. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1312. continue;
  1313. }
  1314. //被hash到的节点在主控是active的才走在流程
  1315. if (it->second->isActiveInReg() && it->second->checkActive(true))
  1316. {
  1317. return it->second;
  1318. }
  1319. else
  1320. {
  1321. TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
  1322. // 剔除节点再次hash
  1323. if (!it->second->isActiveInReg())
  1324. {
  1325. // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
  1326. _indexActiveProxys.erase(sNode);
  1327. }
  1328. // checkConHashChange里重新加回到_sortActivProxys重试
  1329. _sortActivProxys.erase(sNode);
  1330. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1331. if (_indexActiveProxys.empty())
  1332. {
  1333. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  1334. return NULL;
  1335. }
  1336. }
  1337. }
  1338. /*
  1339. if(_consistentHash.size() > 0)
  1340. {
  1341. unsigned int iIndex = 0;
  1342. // 通过一致性hash取到对应的节点
  1343. _consistentHash.getIndex(hashCode, iIndex);
  1344. if(iIndex >= _vRegProxys.size())
  1345. {
  1346. iIndex = iIndex % _vRegProxys.size();
  1347. }
  1348. //被hash到的节点在主控是active的才走在流程
  1349. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
  1350. {
  1351. return _vRegProxys[iIndex];
  1352. }
  1353. else
  1354. {
  1355. TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
  1356. if(_activeProxys.empty())
  1357. {
  1358. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  1359. return NULL;
  1360. }
  1361. //在active节点中再次hash
  1362. vector<AdapterProxy*> thisHash = _activeProxys;
  1363. vector<AdapterProxy*> conn;
  1364. size_t hash = 0;
  1365. do
  1366. {
  1367. hash = ((int64_t)hashCode) % thisHash.size();
  1368. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1369. if(hash >= thisHash.size())
  1370. {
  1371. hash = hash % thisHash.size();
  1372. }
  1373. if (thisHash[hash]->checkActive(true))
  1374. {
  1375. return thisHash[hash];
  1376. }
  1377. if(!thisHash[hash]->isConnTimeout() &&
  1378. !thisHash[hash]->isConnExc())
  1379. {
  1380. conn.push_back(thisHash[hash]);
  1381. }
  1382. thisHash.erase(thisHash.begin() + hash);
  1383. }
  1384. while(!thisHash.empty());
  1385. if(conn.size() > 0)
  1386. {
  1387. hash = ((int64_t)hashCode) % conn.size();
  1388. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1389. if(hash >= conn.size())
  1390. {
  1391. hash = hash % conn.size();
  1392. }
  1393. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1394. AdapterProxy *adapterProxy = conn[hash];
  1395. //该proxy可能已经被屏蔽,需重新连一次
  1396. adapterProxy->checkActive(true);
  1397. return adapterProxy;
  1398. }
  1399. //所有adapter都有问题 选不到结点,随机找一个重试
  1400. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1401. adapterProxy->resetRetryTime(false);
  1402. //该proxy可能已经被屏蔽,需重新连一次
  1403. adapterProxy->checkActive(true);
  1404. return adapterProxy;
  1405. }
  1406. }
  1407. */
  1408. return getHashProxyForNormal(hashCode);
  1409. }
  1410. AdapterProxy* EndpointManager::getWeightedProxy(bool bStaticWeighted)
  1411. {
  1412. return getWeightedForNormal(bStaticWeighted);
  1413. }
  1414. AdapterProxy* EndpointManager::getWeightedForNormal(bool bStaticWeighted)
  1415. {
  1416. if (_activeProxys.empty())
  1417. {
  1418. TLOGERROR("[EndpointManager::getWeightedForNormal activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  1419. return NULL;
  1420. }
  1421. int64_t iNow = TNOW;
  1422. if(_lastBuildWeightTime <= iNow)
  1423. {
  1424. updateProxyWeighted();
  1425. if(!_first)
  1426. {
  1427. _lastBuildWeightTime = iNow + _updateWeightInterval;
  1428. }
  1429. else
  1430. {
  1431. _first = false;
  1432. _lastBuildWeightTime = iNow + _updateWeightInterval + 5;
  1433. }
  1434. }
  1435. bool bEmpty = false;
  1436. int iActiveSize = _activeWeightProxy.size();
  1437. if(iActiveSize > 0)
  1438. {
  1439. size_t iProxyIndex = 0;
  1440. set<AdapterProxy*> sConn;
  1441. if(_staticRouterCache.size() > 0)
  1442. {
  1443. for(size_t i = 0;i < _staticRouterCache.size(); i++)
  1444. {
  1445. ++_lastSWeightPosition;
  1446. if(_lastSWeightPosition >= _staticRouterCache.size())
  1447. _lastSWeightPosition = 0;
  1448. iProxyIndex = _staticRouterCache[_lastSWeightPosition];
  1449. if(_activeWeightProxy[iProxyIndex]->checkActive(false))
  1450. {
  1451. return _activeWeightProxy[iProxyIndex];
  1452. }
  1453. if(!_activeWeightProxy[iProxyIndex]->isConnTimeout() &&
  1454. !_activeWeightProxy[iProxyIndex]->isConnExc())
  1455. {
  1456. sConn.insert(_activeWeightProxy[iProxyIndex]);
  1457. }
  1458. }
  1459. }
  1460. else
  1461. {
  1462. bEmpty = true;
  1463. }
  1464. if(!bEmpty)
  1465. {
  1466. if(sConn.size() > 0)
  1467. {
  1468. vector<AdapterProxy*> conn;
  1469. set<AdapterProxy*>::iterator it_conn = sConn.begin();
  1470. while(it_conn != sConn.end())
  1471. {
  1472. conn.push_back(*it_conn);
  1473. ++it_conn;
  1474. }
  1475. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1476. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1477. //该proxy可能已经被屏蔽,需重新连一次
  1478. adapterProxy->checkActive(true);
  1479. return adapterProxy;
  1480. }
  1481. //所有adapter都有问题 选不到结点,随机找一个重试
  1482. AdapterProxy * adapterProxy = _activeWeightProxy[((uint32_t)rand() % iActiveSize)];
  1483. adapterProxy->resetRetryTime(false);
  1484. //该proxy可能已经被屏蔽,需重新连一次
  1485. adapterProxy->checkActive(true);
  1486. return adapterProxy;
  1487. }
  1488. }
  1489. vector<AdapterProxy*> conn;
  1490. for(size_t i=0;i<_activeProxys.size();i++)
  1491. {
  1492. ++_lastRoundPosition;
  1493. if(_lastRoundPosition >= _activeProxys.size())
  1494. _lastRoundPosition = 0;
  1495. if(_activeProxys[_lastRoundPosition]->checkActive(false))
  1496. {
  1497. return _activeProxys[_lastRoundPosition];
  1498. }
  1499. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
  1500. !_activeProxys[_lastRoundPosition]->isConnExc())
  1501. conn.push_back(_activeProxys[_lastRoundPosition]);
  1502. }
  1503. if(conn.size() > 0)
  1504. {
  1505. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1506. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1507. //该proxy可能已经被屏蔽,需重新连一次
  1508. adapterProxy->checkActive(true);
  1509. return adapterProxy;
  1510. }
  1511. //所有adapter都有问题 选不到结点,随机找一个重试
  1512. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1513. adapterProxy->resetRetryTime(false);
  1514. //该proxy可能已经被屏蔽,需重新连一次
  1515. adapterProxy->checkActive(true);
  1516. return adapterProxy;
  1517. }
  1518. void EndpointManager::updateProxyWeighted()
  1519. {
  1520. size_t iWeightProxySize = _activeProxys.size();
  1521. if(iWeightProxySize <= 0)
  1522. {
  1523. TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", activeProxys.size() <= 0]" << endl);
  1524. return ;
  1525. }
  1526. vector<AdapterProxy*> vProxy;
  1527. for(size_t i = 0; i < _activeProxys.size(); ++i)
  1528. {
  1529. if(_activeProxys[i]->getWeight() > 0)
  1530. {
  1531. vProxy.push_back(_activeProxys[i]);
  1532. }
  1533. }
  1534. iWeightProxySize = vProxy.size();
  1535. if(iWeightProxySize <= 0)
  1536. {
  1537. TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", vProxy.size() <= 0]" << endl);
  1538. return ;
  1539. }
  1540. if(_update)
  1541. {
  1542. _activeWeightProxy = vProxy;
  1543. updateStaticWeighted();
  1544. }
  1545. _update = false;
  1546. }
  1547. void EndpointManager::updateStaticWeighted()
  1548. {
  1549. size_t iWeightProxySize = _activeWeightProxy.size();
  1550. vector<int> vWeight;
  1551. vWeight.resize(iWeightProxySize);
  1552. for(size_t i = 0; i < iWeightProxySize; i++)
  1553. {
  1554. vWeight[i] = _activeWeightProxy[i]->getWeight();
  1555. }
  1556. dispatchEndpointCache(vWeight);
  1557. }
  1558. void EndpointManager::dispatchEndpointCache(const vector<int> &vWeight)
  1559. {
  1560. if(vWeight.size() <= 0)
  1561. {
  1562. TLOGERROR("[EndpointManager::dispatchEndpointCache vWeight.size() < 0]" << endl);
  1563. return ;
  1564. }
  1565. size_t iWeightProxySize = vWeight.size();
  1566. map<size_t, int> mIdToWeight;
  1567. multimap<int, size_t> mWeightToId;
  1568. size_t iMaxR = 0;
  1569. size_t iMaxRouterR = 0;
  1570. size_t iMaxWeight = 0;
  1571. size_t iMinWeight = 0;
  1572. size_t iTotalCapacty = 0;
  1573. size_t iTempWeight = 0;
  1574. for(size_t i = 0; i < vWeight.size(); ++i)
  1575. {
  1576. iTotalCapacty += vWeight[i];
  1577. }
  1578. _staticRouterCache.clear();
  1579. _lastSWeightPosition = 0;
  1580. _staticRouterCache.reserve(iTotalCapacty+100);
  1581. iMaxWeight = vWeight[0];
  1582. iMinWeight = vWeight[0];
  1583. for(size_t i = 1;i < iWeightProxySize; i++)
  1584. {
  1585. iTempWeight = vWeight[i];
  1586. if(iTempWeight > iMaxWeight)
  1587. {
  1588. iMaxWeight = iTempWeight;
  1589. }
  1590. if(iTempWeight < iMinWeight)
  1591. {
  1592. iMinWeight = iTempWeight;
  1593. }
  1594. }
  1595. if(iMinWeight > 0)
  1596. {
  1597. iMaxR = iMaxWeight / iMinWeight;
  1598. if(iMaxR < iMinWeightLimit)
  1599. iMaxR = iMinWeightLimit;
  1600. if(iMaxR > iMaxWeightLimit)
  1601. iMaxR = iMaxWeightLimit;
  1602. }
  1603. else
  1604. {
  1605. iMaxR = 1;
  1606. iMaxWeight = 1;
  1607. }
  1608. for(size_t i = 0;i < iWeightProxySize; i++)
  1609. {
  1610. int iWeight = (vWeight[i] * iMaxR) / iMaxWeight;
  1611. if(iWeight > 0)
  1612. {
  1613. iMaxRouterR += iWeight;
  1614. mIdToWeight.insert(map<size_t, int>::value_type(i, iWeight));
  1615. mWeightToId.insert(make_pair(iWeight, i));
  1616. }
  1617. else
  1618. {
  1619. _staticRouterCache.push_back(i);
  1620. }
  1621. TLOGTARS("EndpointManager::dispatchEndpointCache _objName:" << _objName << "|endpoint:" << _activeWeightProxy[i]->endpoint().desc() << "|iWeightR:" << iWeight << endl);
  1622. }
  1623. for(size_t i = 0; i < iMaxRouterR; i++)
  1624. {
  1625. bool bFirst = true;
  1626. multimap<int, size_t> mulTemp;
  1627. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1628. while(mIter != mWeightToId.rend())
  1629. {
  1630. if(bFirst)
  1631. {
  1632. bFirst = false;
  1633. _staticRouterCache.push_back(mIter->second);
  1634. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1635. }
  1636. else
  1637. {
  1638. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1639. }
  1640. mIter++;
  1641. }
  1642. mWeightToId.clear();
  1643. mWeightToId.swap(mulTemp);
  1644. }
  1645. }
  1646. /////////////////////////////////////////////////////////////////////////////
  1647. EndpointThread::EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sName, bool bFirstNetThread)
  1648. : QueryEpBase(pComm,bFirstNetThread,true)
  1649. , _type(type)
  1650. , _name(sName)
  1651. {
  1652. init(sObjName, "", true);
  1653. }
  1654. void EndpointThread::getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1655. {
  1656. //直连调用这个接口无效
  1657. if(_direct)
  1658. {
  1659. return ;
  1660. }
  1661. {
  1662. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1663. refreshReg(_type,_name);
  1664. activeEndPoint = _activeEndPoint;
  1665. inactiveEndPoint = _inactiveEndPoint;
  1666. }
  1667. }
  1668. void EndpointThread::getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1669. {
  1670. //直连调用这个接口无效
  1671. if(_direct)
  1672. {
  1673. return ;
  1674. }
  1675. {
  1676. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1677. refreshReg(_type,_name);
  1678. activeEndPoint = _activeTCEndPoint;
  1679. inactiveEndPoint = _inactiveTCEndPoint;
  1680. }
  1681. }
  1682. void EndpointThread::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync)
  1683. {
  1684. if(!bSync)
  1685. {
  1686. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1687. update(active, inactive);
  1688. }
  1689. else
  1690. {
  1691. update(active, inactive);
  1692. }
  1693. }
  1694. void EndpointThread::update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  1695. {
  1696. _activeEndPoint.clear();
  1697. _inactiveEndPoint.clear();
  1698. _activeTCEndPoint.clear();
  1699. _inactiveTCEndPoint.clear();
  1700. set<EndpointInfo>::iterator iter= active.begin();
  1701. for(;iter != active.end(); ++iter)
  1702. {
  1703. // TC_Endpoint ep = (iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1704. _activeTCEndPoint.push_back(iter->getEndpoint());
  1705. _activeEndPoint.push_back(*iter);
  1706. }
  1707. iter = inactive.begin();
  1708. for(;iter != inactive.end(); ++iter)
  1709. {
  1710. // TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1711. _inactiveTCEndPoint.push_back(iter->getEndpoint());
  1712. _inactiveEndPoint.push_back(*iter);
  1713. }
  1714. }
  1715. /////////////////////////////////////////////////////////////////////////////
  1716. EndpointManagerThread::EndpointManagerThread(Communicator * pComm,const string & sObjName)
  1717. :_communicator(pComm)
  1718. ,_objName(sObjName)
  1719. {
  1720. }
  1721. EndpointManagerThread::~EndpointManagerThread()
  1722. {
  1723. map<string,EndpointThread*>::iterator iter;
  1724. for(iter=_info.begin();iter != _info.end();iter++)
  1725. {
  1726. if(iter->second)
  1727. {
  1728. delete iter->second;
  1729. iter->second = NULL;
  1730. }
  1731. }
  1732. }
  1733. void EndpointManagerThread::getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1734. {
  1735. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1736. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1737. }
  1738. void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1739. {
  1740. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1741. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1742. }
  1743. void EndpointManagerThread::getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1744. {
  1745. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1746. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1747. }
  1748. void EndpointManagerThread::getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1749. {
  1750. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1751. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1752. }
  1753. void EndpointManagerThread::getTCEndpoint(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1754. {
  1755. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1756. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1757. }
  1758. void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1759. {
  1760. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1761. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1762. }
  1763. void EndpointManagerThread::getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1764. {
  1765. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1766. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1767. }
  1768. void EndpointManagerThread::getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1769. {
  1770. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1771. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1772. }
  1773. EndpointThread * EndpointManagerThread::getEndpointThread(GetEndpointType type,const string & sName)
  1774. {
  1775. TC_LockT<TC_SpinLock> lock(_mutex);
  1776. string sAllName = TC_Common::tostr((int)type) + ":" + sName;
  1777. map<string,EndpointThread*>::iterator iter;
  1778. iter = _info.find(sAllName);
  1779. if(iter != _info.end())
  1780. {
  1781. return iter->second;
  1782. }
  1783. EndpointThread * pThread = new EndpointThread(_communicator, _objName, type, sName);
  1784. _info[sAllName] = pThread;
  1785. return pThread;
  1786. }
  1787. }