EndpointManager.cpp 57 KB


  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "util/tc_port.h"
  17. #include "servant/EndpointManager.h"
  18. #include "servant/ObjectProxy.h"
  19. #include "servant/RemoteLogger.h"
  20. #include "servant/AppCache.h"
  21. #include "servant/Application.h"
  22. #include "servant/CommunicatorEpoll.h"
  23. #include "servant/StatReport.h"
  24. namespace tars
  25. {
  26. /////////////////////////////////////////////////////////////////////////////
  27. QueryEpBase::QueryEpBase(Communicator * pComm, bool bFirstNetThread,bool bInterfaceReq)
  28. : _communicator(pComm)
  29. , _firstNetThread(bFirstNetThread)
  30. , _interfaceReq(bInterfaceReq)
  31. , _direct(false)
  32. , _objName("")
  33. , _invokeSetId("")
  34. , _locator("")
  35. , _valid(false)
  36. , _weightType(E_LOOP)
  37. , _rootServant(true)
  38. , _requestRegistry(false)
  39. , _requestTimeout(0)
  40. , _timeoutInterval(5*1000)
  41. , _refreshTime(0)
  42. , _refreshInterval(60*1000)
  43. , _activeEmptyInterval(10*1000)
  44. , _failInterval(2*1000)
  45. , _manyFailInterval(30*1000)
  46. , _failTimesLimit(3)
  47. , _failTimes(0)
  48. {
  49. _refreshInterval = TC_Common::strto<int>(_communicator->getProperty("refresh-endpoint-interval", "60*1000"));
  50. if(_refreshInterval < 5*1000)
  51. {
  52. _refreshInterval = 5 * 1000;
  53. }
  54. setNoDelete(true);
  55. }
  56. void QueryEpBase::callback_findObjectById4All(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  57. {
  58. TLOGTARS("[callback_findObjectById4All _objName:" << _objName << "|ret:" << ret
  59. << ",active:" << activeEp.size()
  60. << ",inactive:" << inactiveEp.size() << "]" << endl);
  61. doEndpoints(activeEp,inactiveEp,ret);
  62. }
  63. void QueryEpBase::callback_findObjectById4All_exception(Int32 ret)
  64. {
  65. TLOGERROR("[callback_findObjectById4All_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  66. doEndpointsExp(ret);
  67. }
  68. void QueryEpBase::callback_findObjectById4Any(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  69. {
  70. TLOGTARS("[callback_findObjectById4Any _objName:" << _objName << "|ret:" << ret
  71. << ",active:" << activeEp.size()
  72. << ",inactive:" << inactiveEp.size() << "]" << endl);
  73. doEndpoints(activeEp,inactiveEp,ret);
  74. }
  75. void QueryEpBase::callback_findObjectById4Any_exception(Int32 ret)
  76. {
  77. TLOGERROR("[callback_findObjectById4Any_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  78. doEndpointsExp(ret);
  79. }
  80. void QueryEpBase::callback_findObjectByIdInSameGroup(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
  81. {
  82. TLOGTARS("[callback_findObjectByIdInSameGroup _objName:" << _objName << "|ret:"<<ret
  83. << ",active:" << activeEp.size()
  84. << ",inactive:" << inactiveEp.size() << "]" << endl);
  85. doEndpoints(activeEp,inactiveEp,ret);
  86. }
  87. void QueryEpBase::callback_findObjectByIdInSameGroup_exception(Int32 ret)
  88. {
  89. TLOGERROR("[callback_findObjectByIdInSameGroup_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  90. doEndpointsExp(ret);
  91. }
  92. void QueryEpBase::callback_findObjectByIdInSameSet( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> & inactiveEp)
  93. {
  94. TLOGTARS("[callback_findObjectByIdInSameSet _objName:" << _objName << "|ret:" << ret
  95. << ",active:" << activeEp.size()
  96. << ",inactive:" << inactiveEp.size() << "]" << endl);
  97. doEndpoints(activeEp,inactiveEp,ret);
  98. }
  99. void QueryEpBase::callback_findObjectByIdInSameSet_exception( Int32 ret)
  100. {
  101. TLOGERROR("[callback_findObjectByIdInSameSet_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  102. doEndpointsExp(ret);
  103. }
  104. void QueryEpBase::callback_findObjectByIdInSameStation( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> &inactiveEp)
  105. {
  106. TLOGTARS("[callback_findObjectByIdInSameStation _objName:" << _objName << "|ret:" << ret
  107. << ",active:" << activeEp.size()
  108. << ",inactive:" << inactiveEp.size() << "]" << endl);
  109. doEndpoints(activeEp,inactiveEp,ret);
  110. }
  111. void QueryEpBase::callback_findObjectByIdInSameStation_exception( Int32 ret)
  112. {
  113. TLOGERROR("[callback_findObjectByIdInSameStation_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  114. doEndpointsExp(ret);
  115. }
  116. int QueryEpBase::setLocatorPrx(QueryFPrx prx)
  117. {
  118. _queryFPrx = prx;
  119. return 0;
  120. }
  121. bool QueryEpBase::init(const string & sObjName, const string& setName, bool rootServant)
  122. {
  123. _locator = _communicator->getProperty("locator");
  124. TLOGTARS("QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl);
  125. // LOG_CONSOLE_DEBUG << "QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl;
  126. _invokeSetId = setName;
  127. _rootServant = rootServant;
  128. setObjName(sObjName);
  129. return true;
  130. }
  131. void QueryEpBase::setObjName(const string & sObjName)
  132. {
  133. string::size_type pos = sObjName.find_first_of('@');
  134. string sEndpoints;
  135. string sInactiveEndpoints;
  136. if (pos != string::npos)
  137. {
  138. //[直接连接]指定服务的IP和端口列表
  139. _objName = sObjName.substr(0,pos);
  140. sEndpoints = sObjName.substr(pos + 1);
  141. pos = _objName.find_first_of("#");
  142. if(pos != string::npos)
  143. {
  144. _objName = _objName.substr(0, pos);
  145. }
  146. _direct = true;
  147. _valid = true;
  148. }
  149. else
  150. {
  151. //[间接连接]通过registry查询服务端的IP和端口列表
  152. _direct = false;
  153. _valid = false;
  154. _objName = sObjName;
  155. if(_locator.find_first_not_of('@') == string::npos)
  156. {
  157. TLOGERROR("[QueryEpBase::setObjName locator is not valid,_locator:" << _locator << "]" << endl);
  158. throw TarsRegistryException("locator is not valid,_locator:" + _locator);
  159. }
  160. pos = _objName.find_first_of("#");
  161. if(pos != string::npos)
  162. {
  163. _objName = _objName.substr(0, pos);
  164. }
  165. _queryFPrx = _communicator->stringToProxy<QueryFPrx>(_locator);
  166. string sLocatorKey = _locator;
  167. //如果启用set,则获取按set分组的缓存
  168. if(ClientConfig::SetOpen)
  169. {
  170. sLocatorKey += "_" + ClientConfig::SetDivision;
  171. }
  172. string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
  173. //[间接连接]第一次使用cache,如果是接口级请求则不从缓存读取
  174. if(!_interfaceReq)
  175. {
  176. sEndpoints = AppCache::getInstance()->get(objName,sLocatorKey);
  177. sInactiveEndpoints = AppCache::getInstance()->get("inactive_"+objName,sLocatorKey);
  178. }
  179. }
  180. setEndpoints(sEndpoints,_activeEndpoints);
  181. setEndpoints(sInactiveEndpoints,_inactiveEndpoints);
  182. if(!_activeEndpoints.empty())
  183. {
  184. _valid = true;
  185. }
  186. if((!_activeEndpoints.empty() || !_inactiveEndpoints.empty()))
  187. {
  188. //非直接指定端口, 且从cache中能查到服务端口的, 不需要通知所有ObjectProxy更新地址
  189. notifyEndpoints(_activeEndpoints,_inactiveEndpoints,true);
  190. }
  191. }
  192. void QueryEpBase::setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints)
  193. {
  194. if(sEndpoints == "")
  195. {
  196. return ;
  197. }
  198. bool bSameWeightType = true;
  199. bool bFirstWeightType = true;
  200. unsigned int iWeightType = 0;
  201. vector<string> vEndpoints = TC_Endpoint::sepEndpoint(sEndpoints);
  202. for (size_t i = 0; i < vEndpoints.size(); ++i)
  203. {
  204. try
  205. {
  206. TC_Endpoint ep(vEndpoints[i]);
  207. string sSetDivision;
  208. //解析set分组信息
  209. if (!_direct)
  210. {
  211. string sep = " -s ";
  212. size_t pos = vEndpoints[i].rfind(sep);
  213. if (pos != string::npos)
  214. {
  215. sSetDivision = TC_Common::trim(vEndpoints[i].substr(pos+sep.size()));
  216. size_t endPos = sSetDivision.find(" ");
  217. if (endPos != string::npos)
  218. {
  219. sSetDivision = sSetDivision.substr(0, endPos);
  220. }
  221. }
  222. }
  223. if(bFirstWeightType)
  224. {
  225. bFirstWeightType = false;
  226. iWeightType = ep.getWeightType();
  227. }
  228. else
  229. {
  230. if(ep.getWeightType() != iWeightType)
  231. {
  232. bSameWeightType = false;
  233. }
  234. }
  235. EndpointInfo epi(ep, sSetDivision);
  236. setEndpoints.insert(epi);
  237. }
  238. catch (exception &ex)
  239. {
  240. TLOGERROR("[QueryEpBase::setEndpoints parse error,objname:" << _objName << ",endpoint:" << vEndpoints[i] << "]" << endl);
  241. }
  242. }
  243. if(bSameWeightType)
  244. {
  245. if(iWeightType == 1)
  246. {
  247. _weightType = E_STATIC_WEIGHT;
  248. }
  249. else
  250. {
  251. _weightType = E_LOOP;
  252. }
  253. }
  254. else
  255. {
  256. _weightType = E_LOOP;
  257. }
  258. }
  259. void QueryEpBase::refreshReg(GetEndpointType type, const string & sName)
  260. {
  261. onUpdateOutter();
  262. if(_direct)
  263. {
  264. return;
  265. }
  266. int64_t iNow = TNOWMS;
  267. //正在请求状态 而且请求超时了,或者第一次
  268. if(_requestRegistry && _requestTimeout < iNow)
  269. {
  270. doEndpointsExp(0);
  271. }
  272. //如果是间接连接,通过registry定时查询服务列表
  273. //正在请求状态 而且请求超时了
  274. //非请求状态 到了下一个刷新时间了
  275. if( (!_requestRegistry) && (_refreshTime <= iNow))
  276. {
  277. _requestRegistry = true;
  278. //一定时间不回调就算超时了
  279. _requestTimeout = iNow + _timeoutInterval;
  280. TLOGTARS("[QueryEpBase::refresh," << _objName << "]" <<endl);
  281. if(_valid && !_rootServant)
  282. {
  283. return;
  284. }
  285. //判断是同步调用还是异步调用
  286. //内部请求主控都是异步请求
  287. //接口请求主控第一次是同步请求
  288. bool bSync = (!_valid && _interfaceReq);
  289. //如果是异步且不是根servant(通过#1创建的servant, 不主动更新主控信息)
  290. if(!bSync && !_rootServant)
  291. return;
  292. try
  293. {
  294. if(bSync)
  295. {
  296. vector<EndpointF> activeEp;
  297. vector<EndpointF> inactiveEp;
  298. int iRet = 0;
  299. switch(type)
  300. {
  301. case E_ALL:
  302. {
  303. iRet = _queryFPrx->findObjectById4Any(_objName,activeEp,inactiveEp, ServerConfig::Context);
  304. break;
  305. }
  306. case E_STATION:
  307. {
  308. iRet = _queryFPrx->findObjectByIdInSameStation(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
  309. break;
  310. }
  311. case E_SET:
  312. {
  313. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
  314. break;
  315. }
  316. case E_DEFAULT:
  317. default:
  318. {
  319. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  320. {
  321. //指定set调用时,指定set的优先级最高
  322. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  323. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,setId,activeEp,inactiveEp, ServerConfig::Context);
  324. }
  325. else
  326. {
  327. iRet = _queryFPrx->findObjectByIdInSameGroup(_objName,activeEp,inactiveEp, ServerConfig::Context);
  328. }
  329. break;
  330. }
  331. }
  332. doEndpoints(activeEp, inactiveEp, iRet, true);
  333. }
  334. else
  335. {
  336. switch(type)
  337. {
  338. case E_ALL:
  339. {
  340. _queryFPrx->async_findObjectById4Any(this,_objName, ServerConfig::Context);
  341. break;
  342. }
  343. case E_STATION:
  344. {
  345. _queryFPrx->async_findObjectByIdInSameStation(this,_objName,sName, ServerConfig::Context);
  346. break;
  347. }
  348. case E_SET:
  349. {
  350. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,sName, ServerConfig::Context);
  351. break;
  352. }
  353. case E_DEFAULT:
  354. default:
  355. {
  356. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  357. {
  358. //指定set调用时,指定set的优先级最高
  359. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  360. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,setId, ServerConfig::Context);
  361. }
  362. else
  363. {
  364. _queryFPrx->async_findObjectByIdInSameGroup(this,_objName, ServerConfig::Context);
  365. }
  366. break;
  367. }
  368. }//end switch
  369. }
  370. }
  371. catch(TC_Exception & ex)
  372. {
  373. TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"exception:"<<ex.what() << "]"<<endl);
  374. doEndpointsExp(TARSSERVERUNKNOWNERR);
  375. }
  376. catch(...)
  377. {
  378. TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"unknown exception]" <<endl);
  379. doEndpointsExp(TARSSERVERUNKNOWNERR);
  380. }
  381. }
  382. }
  383. void QueryEpBase::doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync)
  384. {
  385. if(iRet != 0)
  386. {
  387. doEndpointsExp(iRet);
  388. return ;
  389. }
  390. _failTimes = 0;
  391. _requestRegistry = false;
  392. int64_t iNow = TNOWMS;
  393. //有返回成功的结点,按照正常的频率
  394. //如果返回空列表或者返回失败 2s去刷新一次
  395. //接口请求主控的方式 不管是不是空都要去刷新
  396. if(activeEp.empty() && (!_interfaceReq) )
  397. {
  398. _refreshTime = iNow + _activeEmptyInterval;
  399. //如果registry返回Active服务列表为空,不做更新
  400. TLOGERROR("[QueryEpBase::doEndpoints, callback activeEps is empty,objname:"<< _objName << "]" << endl);
  401. return;
  402. }
  403. else
  404. {
  405. _refreshTime = iNow + _refreshInterval;
  406. }
  407. bool bNeedNotify = false;
  408. bool bSameWeightType = true;
  409. bool bFirstWeightType = true;
  410. int iWeightType = 0;
  411. set<string> sActiveEndpoints;
  412. set<string> sInactiveEndpoints;
  413. set<EndpointInfo> activeEps;
  414. set<EndpointInfo> inactiveEps;
  415. //生成active set 用于比较
  416. for (uint32_t i = 0; i < activeEp.size(); ++i)
  417. {
  418. if(bFirstWeightType)
  419. {
  420. bFirstWeightType = false;
  421. iWeightType = activeEp[i].weightType;
  422. }
  423. else
  424. {
  425. if(activeEp[i].weightType != iWeightType)
  426. {
  427. bSameWeightType = false;
  428. }
  429. }
  430. // taf istcp意思和这里枚举值对应
  431. activeEps.insert(EndpointInfo(activeEp[i]));
  432. }
  433. //生成inactive set 用于比较
  434. for (uint32_t i = 0; i < inactiveEp.size(); ++i)
  435. {
  436. // taf istcp意思和这里枚举值对应
  437. inactiveEps.insert(EndpointInfo(inactiveEp[i]));
  438. }
  439. if(bSameWeightType)
  440. {
  441. if(iWeightType == 1)
  442. {
  443. _weightType = E_STATIC_WEIGHT;
  444. }
  445. else
  446. {
  447. _weightType = E_LOOP;
  448. }
  449. }
  450. else
  451. {
  452. _weightType = E_LOOP;
  453. }
  454. if(activeEps != _activeEndpoints)
  455. {
  456. bNeedNotify = true;
  457. _activeEndpoints = activeEps;
  458. if(_firstNetThread)
  459. {
  460. setEndPointToCache(false);
  461. }
  462. }
  463. if(inactiveEps != _inactiveEndpoints)
  464. {
  465. bNeedNotify = true;
  466. _inactiveEndpoints = inactiveEps;
  467. if(_firstNetThread)
  468. {
  469. setEndPointToCache(true);
  470. }
  471. }
  472. if(bNeedNotify)
  473. {
  474. notifyEndpoints(_activeEndpoints,_inactiveEndpoints,bSync);
  475. }
  476. if(!_valid)
  477. {
  478. _valid = true;
  479. doNotify();
  480. }
  481. }
  482. void QueryEpBase::doEndpointsExp(int iRet)
  483. {
  484. _failTimes++;
  485. _requestRegistry = false;
  486. int64_t iNow = TNOWMS;
  487. //频率控制获取主控失败 2秒钟再更新
  488. _refreshTime = iNow + _failInterval;
  489. //获取主控连续失败3次就等30s再更新一次
  490. //连续失败 强制设成数据是有效的
  491. if(_failTimes > _failTimesLimit)
  492. {
  493. if(!_valid)
  494. {
  495. _valid = true;
  496. doNotify();
  497. }
  498. _refreshTime = iNow + _manyFailInterval;
  499. }
  500. }
  501. void QueryEpBase::setEndPointToCache(bool bInactive)
  502. {
  503. //如果是接口级请求则不缓存到文件
  504. if(_interfaceReq)
  505. {
  506. return;
  507. }
  508. string sEndpoints;
  509. set<EndpointInfo> doEndpoints;
  510. if(!bInactive)
  511. {
  512. doEndpoints = _activeEndpoints;
  513. }
  514. else
  515. {
  516. doEndpoints = _inactiveEndpoints;
  517. }
  518. set<EndpointInfo>::iterator iter;
  519. iter = doEndpoints.begin();
  520. for (; iter != doEndpoints.end(); ++iter)
  521. {
  522. //这里的超时时间 只是对服务端有效。这里的值无效。所以默认用3000了
  523. TC_Endpoint ep = iter->getEndpoint();
  524. if (!sEndpoints.empty())
  525. {
  526. sEndpoints += ":";
  527. }
  528. sEndpoints += ep.toString();
  529. if (!iter->setDivision().empty())
  530. {
  531. sEndpoints += " -s " + iter->setDivision();
  532. }
  533. }
  534. //如果启用set,则按set分组保存
  535. string sLocatorKey = _locator;
  536. if(ClientConfig::SetOpen)
  537. {
  538. sLocatorKey += "_" + ClientConfig::SetDivision;
  539. }
  540. string objName = _objName + string(_invokeSetId.empty()?"":":") + _invokeSetId;
  541. if(bInactive)
  542. {
  543. AppCache::getInstance()->set("inactive_"+objName,sEndpoints,sLocatorKey);
  544. }
  545. else
  546. {
  547. AppCache::getInstance()->set(objName,sEndpoints,sLocatorKey);
  548. }
  549. TLOGTARS("[setEndPointToCache,obj:" << _objName << ",invokeSetId:" << _invokeSetId << ",endpoint:" << sEndpoints << "]" << endl);
  550. }
  551. EndpointManager::EndpointManager(ObjectProxy * pObjectProxy, Communicator* pComm, bool bFirstNetThread)
  552. : QueryEpBase(pComm, bFirstNetThread, false)
  553. ,_objectProxy(pObjectProxy)
  554. ,_lastRoundPosition(0)
  555. ,_update(true)
  556. ,_updateWeightInterval(60)
  557. ,_lastSWeightPosition(0)
  558. ,_consistentHashWeight(E_TC_CONHASH_KETAMAHASH)
  559. ,_consistentHash(E_TC_CONHASH_KETAMAHASH)
  560. {
  561. setNetThreadProcess(true);
  562. }
  563. EndpointManager::~EndpointManager()
  564. {
  565. map<string,AdapterProxy*>::iterator iterAdapter;
  566. for(iterAdapter = _allProxys.begin();iterAdapter != _allProxys.end();iterAdapter++)
  567. {
  568. if(iterAdapter->second)
  569. {
  570. delete iterAdapter->second;
  571. iterAdapter->second = NULL;
  572. }
  573. }
  574. _allProxys.clear();
  575. }
  576. void EndpointManager::onUpdateOutter()
  577. {
  578. // LOG_CONSOLE_DEBUG << this->_objectProxy << ", valid:" << _valid << ", " << _outterUpdate.get() << endl;
  579. assert(this->_objectProxy->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
  580. lock_guard<mutex> l(_outterLocker);
  581. if(_outterUpdate)
  582. {
  583. shared_ptr<OutterUpdate> outterUpdate = _outterUpdate;
  584. updateEndpoints(outterUpdate->active, outterUpdate->inactive);
  585. _valid = true;
  586. _outterUpdate.reset();
  587. }
  588. }
  589. void EndpointManager::updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  590. {
  591. // LOG_CONSOLE_DEBUG << this->_objectProxy << ", " << active.begin()->desc() << endl;
  592. //创新新对象, 避免线程冲突
  593. shared_ptr<OutterUpdate> outterUpdate = std::make_shared<OutterUpdate>();
  594. outterUpdate->active = active;
  595. outterUpdate->inactive = inactive;
  596. //更新时间
  597. _refreshTime = TNOWMS + _refreshInterval;
  598. lock_guard<mutex> l(_outterLocker);
  599. _outterUpdate = outterUpdate;
  600. }
  601. void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  602. {
  603. TLOGTARS("[EndpointManager::updateEndpoints obj:" << this->_objName << ", active:" << active.size() << ", inactive size:" << inactive.size() << endl);
  604. pair<map<string,AdapterProxy*>::iterator,bool> result;
  605. _activeProxys.clear();
  606. _regProxys.clear();
  607. _indexActiveProxys.clear();
  608. _sortActivProxys.clear();
  609. if(!active.empty())
  610. {
  611. //先把服务都设置为非活跃
  612. for (auto iter = _allProxys.begin(); iter != _allProxys.end(); ++iter)
  613. {
  614. iter->second->setActiveInReg(false);
  615. }
  616. }
  617. //更新active
  618. for(auto iter = active.begin(); iter != active.end(); ++iter)
  619. {
  620. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  621. {
  622. continue;
  623. }
  624. // LOG_CONSOLE_DEBUG << std::this_thread::get_id() << ", allProxys size:" << _allProxys.size() << ", " << iter->cmpDesc() << endl;
  625. auto iterAdapter = _allProxys.find(iter->cmpDesc());
  626. if(iterAdapter == _allProxys.end())
  627. {
  628. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  629. result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
  630. iterAdapter = result.first;
  631. _vAllProxys.push_back(ap);
  632. }
  633. //该节点在主控的状态为active
  634. iterAdapter->second->setActiveInReg(true);
  635. _activeProxys.push_back(iterAdapter->second);
  636. _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
  637. const string &host = iterAdapter->second->endpoint().host();
  638. _indexActiveProxys.insert(make_pair(host, iterAdapter->second));
  639. _sortActivProxys.insert(make_pair(host, iterAdapter->second));
  640. //设置该节点的静态权重值
  641. iterAdapter->second->setWeight(iter->weight());
  642. }
  643. //更新inactive
  644. for(auto iter = inactive.begin(); iter != inactive.end(); ++iter)
  645. {
  646. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  647. {
  648. continue;
  649. }
  650. auto iterAdapter = _allProxys.find(iter->cmpDesc());
  651. if(iterAdapter == _allProxys.end())
  652. {
  653. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  654. result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
  655. assert(result.second);
  656. iterAdapter = result.first;
  657. _vAllProxys.push_back(ap);
  658. }
  659. //该节点在主控的状态为inactive
  660. iterAdapter->second->setActiveInReg(false);
  661. _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
  662. //设置该节点的静态权重值
  663. iterAdapter->second->setWeight(iter->weight());
  664. }
  665. //_vRegProxys 需要按顺序来 重排
  666. _vRegProxys.clear();
  667. auto iterAdapter = _regProxys.begin();
  668. for(;iterAdapter != _regProxys.end();++iterAdapter)
  669. {
  670. _vRegProxys.push_back(iterAdapter->second);
  671. }
  672. _update = true;
  673. }
  674. void EndpointManager::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bNotify)
  675. {
  676. updateEndpoints(active, inactive);
  677. //丢给外层统一做
  678. _objectProxy->onNotifyEndpoints(active, inactive);
  679. }
  680. void EndpointManager::doNotify()
  681. {
  682. _objectProxy->doInvoke();
  683. }
  684. bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
  685. {
  686. pAdapterProxy = NULL;
  687. //刷新主控
  688. refreshReg(E_DEFAULT, "");
  689. //无效的数据 返回true
  690. if (!_valid)
  691. {
  692. return true;
  693. }
  694. //如果有hash,则先使用hash策略
  695. if (msg->data._hash)
  696. {
  697. pAdapterProxy = getHashProxy(msg->data._hashCode, msg->data._conHash);
  698. return false;
  699. }
  700. if(_weightType == E_STATIC_WEIGHT)
  701. {
  702. //权重模式
  703. bool bStaticWeighted = false;
  704. if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
  705. bStaticWeighted = true;
  706. pAdapterProxy = getWeightedProxy(bStaticWeighted);
  707. }
  708. else
  709. {
  710. //普通轮询模式
  711. pAdapterProxy = getNextValidProxy();
  712. }
  713. return false;
  714. }
  715. AdapterProxy * EndpointManager::getNextValidProxy()
  716. {
  717. if (_activeProxys.empty())
  718. {
  719. TLOGERROR("[EndpointManager::getNextValidProxy activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  720. return NULL;
  721. }
  722. vector<AdapterProxy*> conn;
  723. for(size_t i=0;i<_activeProxys.size();i++)
  724. {
  725. ++_lastRoundPosition;
  726. if(_lastRoundPosition >= _activeProxys.size())
  727. {
  728. _lastRoundPosition = 0;
  729. }
  730. if(_activeProxys[_lastRoundPosition]->checkActive(false))
  731. {
  732. return _activeProxys[_lastRoundPosition];
  733. }
  734. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() && !_activeProxys[_lastRoundPosition]->isConnExc()) {
  735. conn.push_back(_activeProxys[_lastRoundPosition]);
  736. }
  737. }
  738. if(conn.size() > 0)
  739. {
  740. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  741. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  742. //该proxy可能已经被屏蔽,需重新连一次
  743. adapterProxy->checkActive(true);
  744. return adapterProxy;
  745. }
  746. //所有adapter都有问题 选不到结点,随机找一个重试
  747. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  748. adapterProxy->resetRetryTime(false);
  749. //该proxy可能已经被屏蔽,需重新连一次
  750. adapterProxy->checkActive(true);
  751. return adapterProxy;
  752. }
  753. AdapterProxy* EndpointManager::getHashProxy(uint32_t hashCode, bool bConsistentHash)
  754. {
  755. if(_weightType == E_STATIC_WEIGHT)
  756. {
  757. if(bConsistentHash)
  758. {
  759. return getConHashProxyForWeight(hashCode, true);
  760. }
  761. else
  762. {
  763. return getHashProxyForWeight(hashCode, true, _hashStaticRouterCache);
  764. }
  765. }
  766. else
  767. {
  768. if(bConsistentHash)
  769. {
  770. return getConHashProxyForNormal(hashCode);
  771. }
  772. else
  773. {
  774. return getHashProxyForNormal(hashCode);
  775. }
  776. }
  777. }
  778. AdapterProxy* EndpointManager::getHashProxyForWeight(uint32_t hashCode, bool bStatic, vector<size_t> &vRouterCache)
  779. {
  780. if(_vRegProxys.empty())
  781. {
  782. TLOGERROR("[EndpointManager::getHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  783. return NULL;
  784. }
  785. if(checkHashStaticWeightChange(bStatic))
  786. {
  787. int64_t iBegin = TNOWMS;
  788. updateHashProxyWeighted(bStatic);
  789. int64_t iEnd = TNOWMS;
  790. TLOGTARS("[EndpointManager::getHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  791. }
  792. if(vRouterCache.size() > 0)
  793. {
  794. size_t hash = hashCode % vRouterCache.size();
  795. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  796. if(hash >= vRouterCache.size())
  797. {
  798. hash = hash % vRouterCache.size();
  799. }
  800. size_t iIndex = vRouterCache[hash];
  801. if(iIndex >= _vRegProxys.size())
  802. {
  803. iIndex = iIndex % _vRegProxys.size();
  804. }
  805. //被hash到的节点在主控是active的才走在流程
  806. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
  807. {
  808. return _vRegProxys[iIndex];
  809. }
  810. else
  811. {
  812. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl);
  813. if(_activeProxys.empty())
  814. {
  815. TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
  816. return NULL;
  817. }
  818. //在active节点中再次hash
  819. vector<AdapterProxy*> thisHash = _activeProxys;
  820. vector<AdapterProxy*> conn;
  821. do
  822. {
  823. hash = hashCode % thisHash.size();
  824. if (thisHash[hash]->checkActive(true))
  825. {
  826. return thisHash[hash];
  827. }
  828. if(!thisHash[hash]->isConnTimeout() &&
  829. !thisHash[hash]->isConnExc())
  830. {
  831. conn.push_back(thisHash[hash]);
  832. }
  833. thisHash.erase(thisHash.begin() + hash);
  834. }
  835. while(!thisHash.empty());
  836. if(conn.size() > 0)
  837. {
  838. hash = hashCode % conn.size();
  839. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  840. AdapterProxy *adapterProxy = conn[hash];
  841. //该proxy可能已经被屏蔽,需重新连一次
  842. adapterProxy->checkActive(true);
  843. return adapterProxy;
  844. }
  845. //所有adapter都有问题 选不到结点,随机找一个重试
  846. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  847. adapterProxy->resetRetryTime(false);
  848. //该proxy可能已经被屏蔽,需重新连一次
  849. adapterProxy->checkActive(true);
  850. return adapterProxy;
  851. }
  852. }
  853. return getHashProxyForNormal(hashCode);
  854. }
  855. AdapterProxy* EndpointManager::getConHashProxyForWeight(uint32_t hashCode, bool bStatic)
  856. {
  857. if(_vRegProxys.empty())
  858. {
  859. TLOGERROR("[EndpointManager::getConHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  860. return NULL;
  861. }
  862. if(checkConHashChange(bStatic, _lastConHashWeightProxys))
  863. {
  864. int64_t iBegin = TNOWMS;
  865. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  866. int64_t iEnd = TNOWMS;
  867. TLOGTARS("[EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  868. }
  869. while(_consistentHashWeight.size() > 0)
  870. {
  871. string sNode;
  872. // 通过一致性hash取到对应的节点
  873. _consistentHashWeight.getNodeName(hashCode, sNode);
  874. auto it = _indexActiveProxys.find(sNode);
  875. // 节点不存在,可能是下线或者服务不可用
  876. if (it == _indexActiveProxys.end())
  877. {
  878. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  879. continue;
  880. }
  881. //被hash到的节点在主控是active的才走在流程
  882. if (it->second->isActiveInReg() && it->second->checkActive(true))
  883. {
  884. return it->second;
  885. }
  886. else
  887. {
  888. TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
  889. // 剔除节点再次hash
  890. if (!it->second->isActiveInReg())
  891. {
  892. // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
  893. _indexActiveProxys.erase(sNode);
  894. }
  895. // checkConHashChange里重新加回到_sortActivProxys重试
  896. _sortActivProxys.erase(sNode);
  897. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  898. if (_indexActiveProxys.empty())
  899. {
  900. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  901. return NULL;
  902. }
  903. }
  904. }
  905. return getHashProxyForNormal(hashCode);
  906. }
  907. bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
  908. {
  909. if(bStatic)
  910. {
  911. if(_lastHashStaticProxys.size() != _vRegProxys.size())
  912. {
  913. return true;
  914. }
  915. for(size_t i = 0; i < _vRegProxys.size(); i++)
  916. {
  917. //解决服务权重更新时哈希表不更新的问题
  918. if((_lastHashStaticProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc()) || _vRegProxys[i]->checkWeightChanged(true))
  919. {
  920. return true;
  921. }
  922. }
  923. }
  924. return false;
  925. }
  926. bool EndpointManager::checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys)
  927. {
  928. // 将之前故障临时剔除的节点重新加回来重试
  929. if (_indexActiveProxys.size() != _sortActivProxys.size())
  930. {
  931. for (auto &it : _indexActiveProxys)
  932. {
  933. _sortActivProxys[it.first] = it.second;
  934. }
  935. }
  936. if(mLastConHashProxys.size() != _sortActivProxys.size())
  937. {
  938. return true;
  939. }
  940. auto itLast = mLastConHashProxys.begin();
  941. auto itSort = _sortActivProxys.begin();
  942. for (; itLast!=mLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
  943. {
  944. if (itLast->first != itSort->first)
  945. {
  946. return true;
  947. }
  948. //解决服务权重更新时一致性哈希环不更新的问题
  949. if(bStatic && itSort->second->checkWeightChanged(true))
  950. {
  951. return true;
  952. }
  953. }
  954. /*
  955. if(vLastConHashProxys.size() != _vRegProxys.size())
  956. {
  957. return true;
  958. }
  959. for(size_t i = 0; i < _vRegProxys.size(); i++)
  960. {
  961. if(vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
  962. {
  963. return true;
  964. }
  965. //解决服务权重更新时一致性哈希环不更新的问题
  966. if(bStatic && _vRegProxys[i]->checkWeightChanged(true))
  967. {
  968. return true;
  969. }
  970. }
  971. */
  972. return false;
  973. }
  974. void EndpointManager::updateHashProxyWeighted(bool bStatic)
  975. {
  976. if(_vRegProxys.size() <= 0)
  977. {
  978. TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  979. return ;
  980. }
  981. if(bStatic)
  982. {
  983. _lastHashStaticProxys = _vRegProxys;
  984. _hashStaticRouterCache.clear();
  985. }
  986. vector<AdapterProxy*> vRegProxys;
  987. vector<size_t> vIndex;
  988. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  989. {
  990. if(_vRegProxys[i]->getWeight() > 0)
  991. {
  992. vRegProxys.push_back(_vRegProxys[i]);
  993. vIndex.push_back(i);
  994. }
  995. //防止多个服务节点权重同时更新时哈希表多次更新
  996. _vRegProxys[i]->resetWeightChanged();
  997. }
  998. if(vRegProxys.size() <= 0)
  999. {
  1000. TLOGERROR("[EndpointManager::updateHashProxyWeighted vRegProxys is empty], bStatic:" << bStatic << endl);
  1001. return ;
  1002. }
  1003. size_t iHashStaticWeightSize = vRegProxys.size();
  1004. map<size_t, int> mIdToWeight;
  1005. multimap<int, size_t> mWeightToId;
  1006. size_t iMaxR = 0;
  1007. size_t iMaxRouterR = 0;
  1008. size_t iMaxWeight = vRegProxys[0]->getWeight();
  1009. size_t iMinWeight = vRegProxys[0]->getWeight();
  1010. size_t iTempWeight = 0;
  1011. for(size_t i = 1;i < iHashStaticWeightSize; i++)
  1012. {
  1013. iTempWeight = vRegProxys[i]->getWeight();
  1014. if(iTempWeight > iMaxWeight)
  1015. {
  1016. iMaxWeight = iTempWeight;
  1017. }
  1018. if(iTempWeight < iMinWeight)
  1019. {
  1020. iMinWeight = iTempWeight;
  1021. }
  1022. }
  1023. if(iMinWeight > 0)
  1024. {
  1025. iMaxR = iMaxWeight / iMinWeight;
  1026. if(iMaxR < iMinWeightLimit)
  1027. iMaxR = iMinWeightLimit;
  1028. if(iMaxR > iMaxWeightLimit)
  1029. iMaxR = iMaxWeightLimit;
  1030. }
  1031. else
  1032. {
  1033. iMaxR = 1;
  1034. iMaxWeight = 1;
  1035. }
  1036. for(size_t i = 0;i < iHashStaticWeightSize; i++)
  1037. {
  1038. int iWeight = (vRegProxys[i]->getWeight() * iMaxR) / iMaxWeight;
  1039. if(iWeight > 0)
  1040. {
  1041. iMaxRouterR += iWeight;
  1042. mIdToWeight.insert(map<size_t, int>::value_type(vIndex[i], iWeight));
  1043. mWeightToId.insert(make_pair(iWeight, vIndex[i]));
  1044. }
  1045. else
  1046. {
  1047. if(bStatic)
  1048. {
  1049. _hashStaticRouterCache.push_back(vIndex[i]);
  1050. }
  1051. }
  1052. TLOGTARS("EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
  1053. }
  1054. for(size_t i = 0; i < iMaxRouterR; i++)
  1055. {
  1056. bool bFirst = true;
  1057. multimap<int, size_t> mulTemp;
  1058. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1059. while(mIter != mWeightToId.rend())
  1060. {
  1061. if(bFirst)
  1062. {
  1063. bFirst = false;
  1064. if(bStatic)
  1065. {
  1066. _hashStaticRouterCache.push_back(mIter->second);
  1067. }
  1068. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1069. }
  1070. else
  1071. {
  1072. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1073. }
  1074. mIter++;
  1075. }
  1076. mWeightToId.clear();
  1077. mWeightToId.swap(mulTemp);
  1078. }
  1079. }
  1080. void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash)
  1081. {
  1082. conHash.clear();
  1083. if(_sortActivProxys.empty())
  1084. {
  1085. TLOGERROR("[EndpointManager::updateHashProxyWeighted _indexActiveProxys is empty], bStatic:" << bStatic << endl);
  1086. return ;
  1087. }
  1088. mLastConHashProxys = _sortActivProxys;
  1089. for (auto it = _sortActivProxys.begin(); it != _sortActivProxys.end(); ++it)
  1090. {
  1091. int iWeight = (bStatic ? (it->second->getWeight()) : 100);
  1092. if(iWeight > 0)
  1093. {
  1094. iWeight = iWeight / 4;
  1095. if(iWeight <= 0)
  1096. {
  1097. iWeight = 1;
  1098. }
  1099. // 同一服务有多个obj的情况
  1100. // 同一hash值调用不同的obj会hash到不同的服务器
  1101. // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
  1102. // 一致性hash用host进行索引,不使用index,这里传0
  1103. conHash.addNode(it->second->endpoint().host(), 0, iWeight);
  1104. }
  1105. //防止多个服务节点权重同时更新时一致性哈希环多次更新
  1106. it->second->resetWeightChanged();
  1107. }
  1108. /*
  1109. if(_vRegProxys.size() <= 0)
  1110. {
  1111. TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  1112. return ;
  1113. }
  1114. vLastConHashProxys = _vRegProxys;
  1115. conHash.clear();
  1116. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  1117. {
  1118. int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
  1119. if(iWeight > 0)
  1120. {
  1121. iWeight = iWeight / 4;
  1122. if(iWeight <= 0)
  1123. {
  1124. iWeight = 1;
  1125. }
  1126. // 同一服务有多个obj的情况
  1127. // 同一hash值调用不同的obj会hash到不同的服务器
  1128. // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
  1129. conHash.addNode(_vRegProxys[i]->endpoint().host(), i, iWeight);
  1130. }
  1131. //防止多个服务节点权重同时更新时一致性哈希环多次更新
  1132. _vRegProxys[i]->resetWeightChanged();
  1133. }
  1134. */
  1135. conHash.sortNode();
  1136. }
  1137. AdapterProxy* EndpointManager::getHashProxyForNormal(uint32_t hashCode)
  1138. {
  1139. if(_vRegProxys.empty())
  1140. {
  1141. TLOGERROR("[EndpointManager::getHashProxyForNormal _vRegProxys is empty]" << endl);
  1142. return NULL;
  1143. }
  1144. // 1 _vRegProxys从客户端启动之后,就不会再改变,除非有节点增加
  1145. // 2 如果有增加节点,则_vRegProxys顺序会重新排序,之前的hash会改变
  1146. // 3 节点下线后,需要下次启动客户端后,_vRegProxys内容才会生效
  1147. size_t hash = hashCode % _vRegProxys.size();
  1148. //被hash到的节点在主控是active的才走在流程
  1149. if (_vRegProxys[hash]->isActiveInReg() && _vRegProxys[hash]->checkActive(true))
  1150. {
  1151. return _vRegProxys[hash];
  1152. }
  1153. else
  1154. {
  1155. TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl);
  1156. if(_activeProxys.empty())
  1157. {
  1158. TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
  1159. return NULL;
  1160. }
  1161. //在active节点中再次hash
  1162. vector<AdapterProxy*> thisHash = _activeProxys;
  1163. vector<AdapterProxy*> conn;
  1164. do
  1165. {
  1166. hash = hashCode % thisHash.size();
  1167. if (thisHash[hash]->checkActive(true))
  1168. {
  1169. return thisHash[hash];
  1170. }
  1171. if(!thisHash[hash]->isConnTimeout() &&
  1172. !thisHash[hash]->isConnExc())
  1173. {
  1174. conn.push_back(thisHash[hash]);
  1175. }
  1176. thisHash.erase(thisHash.begin() + hash);
  1177. }
  1178. while(!thisHash.empty());
  1179. if(conn.size() > 0)
  1180. {
  1181. hash = hashCode % conn.size();
  1182. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1183. AdapterProxy *adapterProxy = conn[hash];
  1184. //该proxy可能已经被屏蔽,需重新连一次
  1185. adapterProxy->checkActive(true);
  1186. return adapterProxy;
  1187. }
  1188. //所有adapter都有问题 选不到结点,随机找一个重试
  1189. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1190. adapterProxy->resetRetryTime(false);
  1191. //该proxy可能已经被屏蔽,需重新连一次
  1192. adapterProxy->checkActive(true);
  1193. return adapterProxy;
  1194. }
  1195. }
  1196. AdapterProxy* EndpointManager::getConHashProxyForNormal(uint32_t hashCode)
  1197. {
  1198. if(_vRegProxys.empty())
  1199. {
  1200. TLOGERROR("[EndpointManager::getConHashProxyForNormal _vRegProxys is empty]" << endl);
  1201. return NULL;
  1202. }
  1203. if(checkConHashChange(false, _lastConHashProxys))
  1204. {
  1205. int64_t iBegin = TNOWMS;
  1206. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1207. int64_t iEnd = TNOWMS;
  1208. TLOGTARS("[EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  1209. }
  1210. while(_consistentHash.size() > 0)
  1211. {
  1212. string sNode;
  1213. // 通过一致性hash取到对应的节点
  1214. _consistentHash.getNodeName(hashCode, sNode);
  1215. auto it = _indexActiveProxys.find(sNode);
  1216. // 节点不存在,可能是下线或者服务不可用
  1217. if (it == _indexActiveProxys.end())
  1218. {
  1219. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1220. continue;
  1221. }
  1222. //被hash到的节点在主控是active的才走在流程
  1223. if (it->second->isActiveInReg() && it->second->checkActive(true))
  1224. {
  1225. return it->second;
  1226. }
  1227. else
  1228. {
  1229. TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
  1230. // 剔除节点再次hash
  1231. if (!it->second->isActiveInReg())
  1232. {
  1233. // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
  1234. _indexActiveProxys.erase(sNode);
  1235. }
  1236. // checkConHashChange里重新加回到_sortActivProxys重试
  1237. _sortActivProxys.erase(sNode);
  1238. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1239. if (_indexActiveProxys.empty())
  1240. {
  1241. TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  1242. return NULL;
  1243. }
  1244. }
  1245. }
  1246. return getHashProxyForNormal(hashCode);
  1247. }
  1248. AdapterProxy* EndpointManager::getWeightedProxy(bool bStaticWeighted)
  1249. {
  1250. return getWeightedForNormal(bStaticWeighted);
  1251. }
  1252. AdapterProxy* EndpointManager::getWeightedForNormal(bool bStaticWeighted)
  1253. {
  1254. if (_activeProxys.empty())
  1255. {
  1256. TLOGERROR("[EndpointManager::getWeightedForNormal activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  1257. return NULL;
  1258. }
  1259. int64_t iNow = TNOW;
  1260. if(_lastBuildWeightTime <= iNow)
  1261. {
  1262. updateProxyWeighted();
  1263. if(!_first)
  1264. {
  1265. _lastBuildWeightTime = iNow + _updateWeightInterval;
  1266. }
  1267. else
  1268. {
  1269. _first = false;
  1270. _lastBuildWeightTime = iNow + _updateWeightInterval + 5;
  1271. }
  1272. }
  1273. bool bEmpty = false;
  1274. int iActiveSize = _activeWeightProxy.size();
  1275. if(iActiveSize > 0)
  1276. {
  1277. size_t iProxyIndex = 0;
  1278. set<AdapterProxy*> sConn;
  1279. if(_staticRouterCache.size() > 0)
  1280. {
  1281. for(size_t i = 0;i < _staticRouterCache.size(); i++)
  1282. {
  1283. ++_lastSWeightPosition;
  1284. if(_lastSWeightPosition >= _staticRouterCache.size())
  1285. _lastSWeightPosition = 0;
  1286. iProxyIndex = _staticRouterCache[_lastSWeightPosition];
  1287. if(_activeWeightProxy[iProxyIndex]->checkActive(false))
  1288. {
  1289. return _activeWeightProxy[iProxyIndex];
  1290. }
  1291. if(!_activeWeightProxy[iProxyIndex]->isConnTimeout() &&
  1292. !_activeWeightProxy[iProxyIndex]->isConnExc())
  1293. {
  1294. sConn.insert(_activeWeightProxy[iProxyIndex]);
  1295. }
  1296. }
  1297. }
  1298. else
  1299. {
  1300. bEmpty = true;
  1301. }
  1302. if(!bEmpty)
  1303. {
  1304. if(sConn.size() > 0)
  1305. {
  1306. vector<AdapterProxy*> conn;
  1307. set<AdapterProxy*>::iterator it_conn = sConn.begin();
  1308. while(it_conn != sConn.end())
  1309. {
  1310. conn.push_back(*it_conn);
  1311. ++it_conn;
  1312. }
  1313. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1314. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1315. //该proxy可能已经被屏蔽,需重新连一次
  1316. adapterProxy->checkActive(true);
  1317. return adapterProxy;
  1318. }
  1319. //所有adapter都有问题 选不到结点,随机找一个重试
  1320. AdapterProxy * adapterProxy = _activeWeightProxy[((uint32_t)rand() % iActiveSize)];
  1321. adapterProxy->resetRetryTime(false);
  1322. //该proxy可能已经被屏蔽,需重新连一次
  1323. adapterProxy->checkActive(true);
  1324. return adapterProxy;
  1325. }
  1326. }
  1327. vector<AdapterProxy*> conn;
  1328. for(size_t i=0;i<_activeProxys.size();i++)
  1329. {
  1330. ++_lastRoundPosition;
  1331. if(_lastRoundPosition >= _activeProxys.size())
  1332. _lastRoundPosition = 0;
  1333. if(_activeProxys[_lastRoundPosition]->checkActive(false))
  1334. {
  1335. return _activeProxys[_lastRoundPosition];
  1336. }
  1337. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
  1338. !_activeProxys[_lastRoundPosition]->isConnExc())
  1339. conn.push_back(_activeProxys[_lastRoundPosition]);
  1340. }
  1341. if(conn.size() > 0)
  1342. {
  1343. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1344. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1345. //该proxy可能已经被屏蔽,需重新连一次
  1346. adapterProxy->checkActive(true);
  1347. return adapterProxy;
  1348. }
  1349. //所有adapter都有问题 选不到结点,随机找一个重试
  1350. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1351. adapterProxy->resetRetryTime(false);
  1352. //该proxy可能已经被屏蔽,需重新连一次
  1353. adapterProxy->checkActive(true);
  1354. return adapterProxy;
  1355. }
  1356. void EndpointManager::updateProxyWeighted()
  1357. {
  1358. size_t iWeightProxySize = _activeProxys.size();
  1359. if(iWeightProxySize <= 0)
  1360. {
  1361. TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", activeProxys.size() <= 0]" << endl);
  1362. return ;
  1363. }
  1364. vector<AdapterProxy*> vProxy;
  1365. for(size_t i = 0; i < _activeProxys.size(); ++i)
  1366. {
  1367. if(_activeProxys[i]->getWeight() > 0)
  1368. {
  1369. vProxy.push_back(_activeProxys[i]);
  1370. }
  1371. }
  1372. iWeightProxySize = vProxy.size();
  1373. if(iWeightProxySize <= 0)
  1374. {
  1375. TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", vProxy.size() <= 0]" << endl);
  1376. return ;
  1377. }
  1378. if(_update)
  1379. {
  1380. _activeWeightProxy = vProxy;
  1381. updateStaticWeighted();
  1382. }
  1383. _update = false;
  1384. }
  1385. void EndpointManager::updateStaticWeighted()
  1386. {
  1387. size_t iWeightProxySize = _activeWeightProxy.size();
  1388. vector<int> vWeight;
  1389. vWeight.resize(iWeightProxySize);
  1390. for(size_t i = 0; i < iWeightProxySize; i++)
  1391. {
  1392. vWeight[i] = _activeWeightProxy[i]->getWeight();
  1393. }
  1394. dispatchEndpointCache(vWeight);
  1395. }
  1396. void EndpointManager::dispatchEndpointCache(const vector<int> &vWeight)
  1397. {
  1398. if(vWeight.size() <= 0)
  1399. {
  1400. TLOGERROR("[EndpointManager::dispatchEndpointCache vWeight.size() < 0]" << endl);
  1401. return ;
  1402. }
  1403. size_t iWeightProxySize = vWeight.size();
  1404. map<size_t, int> mIdToWeight;
  1405. multimap<int, size_t> mWeightToId;
  1406. size_t iMaxR = 0;
  1407. size_t iMaxRouterR = 0;
  1408. size_t iMaxWeight = 0;
  1409. size_t iMinWeight = 0;
  1410. size_t iTotalCapacty = 0;
  1411. size_t iTempWeight = 0;
  1412. for(size_t i = 0; i < vWeight.size(); ++i)
  1413. {
  1414. iTotalCapacty += vWeight[i];
  1415. }
  1416. _staticRouterCache.clear();
  1417. _lastSWeightPosition = 0;
  1418. _staticRouterCache.reserve(iTotalCapacty+100);
  1419. iMaxWeight = vWeight[0];
  1420. iMinWeight = vWeight[0];
  1421. for(size_t i = 1;i < iWeightProxySize; i++)
  1422. {
  1423. iTempWeight = vWeight[i];
  1424. if(iTempWeight > iMaxWeight)
  1425. {
  1426. iMaxWeight = iTempWeight;
  1427. }
  1428. if(iTempWeight < iMinWeight)
  1429. {
  1430. iMinWeight = iTempWeight;
  1431. }
  1432. }
  1433. if(iMinWeight > 0)
  1434. {
  1435. iMaxR = iMaxWeight / iMinWeight;
  1436. if(iMaxR < iMinWeightLimit)
  1437. iMaxR = iMinWeightLimit;
  1438. if(iMaxR > iMaxWeightLimit)
  1439. iMaxR = iMaxWeightLimit;
  1440. }
  1441. else
  1442. {
  1443. iMaxR = 1;
  1444. iMaxWeight = 1;
  1445. }
  1446. for(size_t i = 0;i < iWeightProxySize; i++)
  1447. {
  1448. int iWeight = (vWeight[i] * iMaxR) / iMaxWeight;
  1449. if(iWeight > 0)
  1450. {
  1451. iMaxRouterR += iWeight;
  1452. mIdToWeight.insert(map<size_t, int>::value_type(i, iWeight));
  1453. mWeightToId.insert(make_pair(iWeight, i));
  1454. }
  1455. else
  1456. {
  1457. _staticRouterCache.push_back(i);
  1458. }
  1459. TLOGTARS("EndpointManager::dispatchEndpointCache _objName:" << _objName << "|endpoint:" << _activeWeightProxy[i]->endpoint().desc() << "|iWeightR:" << iWeight << endl);
  1460. }
  1461. for(size_t i = 0; i < iMaxRouterR; i++)
  1462. {
  1463. bool bFirst = true;
  1464. multimap<int, size_t> mulTemp;
  1465. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1466. while(mIter != mWeightToId.rend())
  1467. {
  1468. if(bFirst)
  1469. {
  1470. bFirst = false;
  1471. _staticRouterCache.push_back(mIter->second);
  1472. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1473. }
  1474. else
  1475. {
  1476. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1477. }
  1478. mIter++;
  1479. }
  1480. mWeightToId.clear();
  1481. mWeightToId.swap(mulTemp);
  1482. }
  1483. }
  1484. /////////////////////////////////////////////////////////////////////////////
  1485. EndpointThread::EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sName, bool bFirstNetThread)
  1486. : QueryEpBase(pComm,bFirstNetThread,true)
  1487. , _type(type)
  1488. , _name(sName)
  1489. {
  1490. init(sObjName, "", true);
  1491. }
  1492. void EndpointThread::getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1493. {
  1494. //直连调用这个接口无效
  1495. if(_direct)
  1496. {
  1497. return ;
  1498. }
  1499. {
  1500. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1501. refreshReg(_type,_name);
  1502. activeEndPoint = _activeEndPoint;
  1503. inactiveEndPoint = _inactiveEndPoint;
  1504. }
  1505. }
  1506. void EndpointThread::getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1507. {
  1508. //直连调用这个接口无效
  1509. if(_direct)
  1510. {
  1511. return ;
  1512. }
  1513. {
  1514. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1515. refreshReg(_type,_name);
  1516. activeEndPoint = _activeTCEndPoint;
  1517. inactiveEndPoint = _inactiveTCEndPoint;
  1518. }
  1519. }
  1520. void EndpointThread::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync)
  1521. {
  1522. if(!bSync)
  1523. {
  1524. TC_LockT<TC_ThreadMutex> lock(_mutex);
  1525. update(active, inactive);
  1526. }
  1527. else
  1528. {
  1529. update(active, inactive);
  1530. }
  1531. }
  1532. void EndpointThread::update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  1533. {
  1534. _activeEndPoint.clear();
  1535. _inactiveEndPoint.clear();
  1536. _activeTCEndPoint.clear();
  1537. _inactiveTCEndPoint.clear();
  1538. set<EndpointInfo>::iterator iter= active.begin();
  1539. for(;iter != active.end(); ++iter)
  1540. {
  1541. // TC_Endpoint ep = (iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1542. _activeTCEndPoint.push_back(iter->getEndpoint());
  1543. _activeEndPoint.push_back(*iter);
  1544. }
  1545. iter = inactive.begin();
  1546. for(;iter != inactive.end(); ++iter)
  1547. {
  1548. // TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1549. _inactiveTCEndPoint.push_back(iter->getEndpoint());
  1550. _inactiveEndPoint.push_back(*iter);
  1551. }
  1552. }
  1553. /////////////////////////////////////////////////////////////////////////////
  1554. EndpointManagerThread::EndpointManagerThread(Communicator * pComm,const string & sObjName)
  1555. :_communicator(pComm)
  1556. ,_objName(sObjName)
  1557. {
  1558. }
  1559. EndpointManagerThread::~EndpointManagerThread()
  1560. {
  1561. map<string,EndpointThread*>::iterator iter;
  1562. for(iter=_info.begin();iter != _info.end();iter++)
  1563. {
  1564. if(iter->second)
  1565. {
  1566. delete iter->second;
  1567. iter->second = NULL;
  1568. }
  1569. }
  1570. }
  1571. void EndpointManagerThread::getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1572. {
  1573. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1574. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1575. }
  1576. void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1577. {
  1578. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1579. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1580. }
  1581. void EndpointManagerThread::getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1582. {
  1583. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1584. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1585. }
  1586. void EndpointManagerThread::getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1587. {
  1588. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1589. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1590. }
  1591. void EndpointManagerThread::getTCEndpoint(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1592. {
  1593. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1594. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1595. }
  1596. void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1597. {
  1598. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1599. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1600. }
  1601. void EndpointManagerThread::getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1602. {
  1603. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1604. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1605. }
  1606. void EndpointManagerThread::getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1607. {
  1608. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1609. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1610. }
  1611. EndpointThread * EndpointManagerThread::getEndpointThread(GetEndpointType type,const string & sName)
  1612. {
  1613. TC_LockT<TC_SpinLock> lock(_mutex);
  1614. string sAllName = TC_Common::tostr((int)type) + ":" + sName;
  1615. map<string,EndpointThread*>::iterator iter;
  1616. iter = _info.find(sAllName);
  1617. if(iter != _info.end())
  1618. {
  1619. return iter->second;
  1620. }
  1621. EndpointThread * pThread = new EndpointThread(_communicator, _objName, type, sName);
  1622. _info[sAllName] = pThread;
  1623. return pThread;
  1624. }
  1625. }