EndpointManager.cpp 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/EndpointManager.h"
  17. #include "servant/ObjectProxy.h"
  18. #include "servant/TarsLogger.h"
  19. #include "servant/AppCache.h"
  20. #include "servant/Application.h"
  21. #include "servant/StatReport.h"
  22. namespace tars
  23. {
  24. /////////////////////////////////////////////////////////////////////////////
  25. QueryEpBase::QueryEpBase(Communicator * pComm, bool bFirstNetThread,bool bInterfaceReq)
  26. : _communicator(pComm)
  27. , _firstNetThread(bFirstNetThread)
  28. , _interfaceReq(bInterfaceReq)
  29. , _direct(false)
  30. , _objName("")
  31. , _invokeSetId("")
  32. , _locator("")
  33. , _valid(false)
  34. , _weightType(E_LOOP)
  35. , _requestRegistry(false)
  36. , _requestTimeout(0)
  37. , _timeoutInterval(5*1000)
  38. , _refreshTime(0)
  39. , _refreshInterval(60*1000)
  40. , _activeEmptyInterval(10*1000)
  41. , _failInterval(2*1000)
  42. , _manyFailInterval(30*1000)
  43. , _failTimesLimit(3)
  44. , _failTimes(0)
  45. {
  46. setNoDelete(true);
  47. }
  48. void QueryEpBase::callback_findObjectById4All(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp)
  49. {
  50. TLOGTARS("[TARS][callback_findObjectById4All _objName:" << _objName << "|ret:" << ret
  51. << ",active:" << activeEp.size()
  52. << ",inactive:" << inactiveEp.size() << "]" << endl);
  53. doEndpoints(activeEp,inactiveEp,ret);
  54. }
  55. void QueryEpBase::callback_findObjectById4All_exception(tars::Int32 ret)
  56. {
  57. TLOGERROR("[TARS][callback_findObjectById4All_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  58. doEndpointsExp(ret);
  59. }
  60. void QueryEpBase::callback_findObjectById4Any(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp)
  61. {
  62. TLOGTARS("[TARS][callback_findObjectById4Any _objName:" << _objName << "|ret:" << ret
  63. << ",active:" << activeEp.size()
  64. << ",inactive:" << inactiveEp.size() << "]" << endl);
  65. doEndpoints(activeEp,inactiveEp,ret);
  66. }
  67. void QueryEpBase::callback_findObjectById4Any_exception(tars::Int32 ret)
  68. {
  69. TLOGERROR("[TARS][callback_findObjectById4Any_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  70. doEndpointsExp(ret);
  71. }
  72. void QueryEpBase::callback_findObjectByIdInSameGroup(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp)
  73. {
  74. TLOGTARS("[TARS][callback_findObjectByIdInSameGroup _objName:" << _objName << "|ret:"<<ret
  75. << ",active:" << activeEp.size()
  76. << ",inactive:" << inactiveEp.size() << "]" << endl);
  77. doEndpoints(activeEp,inactiveEp,ret);
  78. }
  79. void QueryEpBase::callback_findObjectByIdInSameGroup_exception(tars::Int32 ret)
  80. {
  81. TLOGERROR("[TARS][callback_findObjectByIdInSameGroup_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  82. doEndpointsExp(ret);
  83. }
  84. void QueryEpBase::callback_findObjectByIdInSameSet( Int32 ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> & inactiveEp)
  85. {
  86. TLOGTARS("[TARS][callback_findObjectByIdInSameSet _objName:" << _objName << "|ret:" << ret
  87. << ",active:" << activeEp.size()
  88. << ",inactive:" << inactiveEp.size() << "]" << endl);
  89. doEndpoints(activeEp,inactiveEp,ret);
  90. }
  91. void QueryEpBase::callback_findObjectByIdInSameSet_exception( Int32 ret)
  92. {
  93. TLOGERROR("[TARS][callback_findObjectByIdInSameSet_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  94. doEndpointsExp(ret);
  95. }
  96. void QueryEpBase::callback_findObjectByIdInSameStation( Int32 ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> &inactiveEp)
  97. {
  98. TLOGTARS("[TARS][callback_findObjectByIdInSameStation _objName:" << _objName << "|ret:" << ret
  99. << ",active:" << activeEp.size()
  100. << ",inactive:" << inactiveEp.size() << "]" << endl);
  101. doEndpoints(activeEp,inactiveEp,ret);
  102. }
  103. void QueryEpBase::callback_findObjectByIdInSameStation_exception( Int32 ret)
  104. {
  105. TLOGERROR("[TARS][callback_findObjectByIdInSameStation_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
  106. doEndpointsExp(ret);
  107. }
  108. int QueryEpBase::setLocatorPrx(QueryFPrx prx)
  109. {
  110. _queryFPrx = prx;
  111. return 0;
  112. }
  113. bool QueryEpBase::init(const string & sObjName,const string & sLocator,const string& setName)
  114. {
  115. TLOGTARS("[TARS][QueryEpBase::init sObjName:" << sObjName << ",sLocator:" << sLocator << ",setName:" << setName << "]" << endl);
  116. _locator = sLocator;
  117. _invokeSetId = setName;
  118. setObjName(sObjName);
  119. return true;
  120. }
  121. void QueryEpBase::setObjName(const string & sObjName)
  122. {
  123. string sEndpoints("");
  124. string sInactiveEndpoints("");
  125. string::size_type pos = sObjName.find_first_of('@');
  126. if (pos != string::npos)
  127. {
  128. //[直接连接]指定服务的IP和端口列表
  129. _objName = sObjName.substr(0,pos);
  130. sEndpoints = sObjName.substr(pos + 1);
  131. _direct = true;
  132. _valid = true;
  133. }
  134. else
  135. {
  136. //[间接连接]通过registry查询服务端的IP和端口列表
  137. _direct = false;
  138. _valid = false;
  139. _objName = sObjName;
  140. if(_locator.find_first_not_of('@') == string::npos)
  141. {
  142. TLOGERROR("[TARS][QueryEpBase::setObjName locator is not valid,_locator:" << _locator << "]" << endl);
  143. throw TarsRegistryException("locator is not valid,_locator:" + _locator);
  144. }
  145. _queryFPrx = _communicator->stringToProxy<QueryFPrx>(_locator);
  146. string sLocatorKey = _locator;
  147. //如果启用set,则获取按set分组的缓存
  148. if(ClientConfig::SetOpen)
  149. {
  150. sLocatorKey += "_" + ClientConfig::SetDivision;
  151. }
  152. string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
  153. //[间接连接]第一次使用cache,如果是接口级请求则不从缓存读取
  154. if(!_interfaceReq)
  155. {
  156. sEndpoints = AppCache::getInstance()->get(objName,sLocatorKey);
  157. sInactiveEndpoints = AppCache::getInstance()->get("inactive_" + objName, sLocatorKey);
  158. }
  159. }
  160. setEndpoints(sEndpoints, _activeEndpoints);
  161. setEndpoints(sInactiveEndpoints, _inactiveEndpoints);
  162. if(_activeEndpoints.size() > 0)
  163. {
  164. _valid = true;
  165. }
  166. if(_activeEndpoints.size() > 0 || _inactiveEndpoints.size() > 0)
  167. {
  168. notifyEndpoints(_activeEndpoints, _inactiveEndpoints, true);
  169. }
  170. }
  171. vector<string> QueryEpBase::sepEndpoint(const string& sEndpoints)
  172. {
  173. vector<string> vEndpoints;
  174. bool flag = false;
  175. string::size_type startPos = 0;
  176. string::size_type sepPos = 0;
  177. for(string::size_type pos = 0; pos < sEndpoints.size(); pos++)
  178. {
  179. if(sEndpoints[pos] == ':' && !flag )
  180. {
  181. sepPos = pos;
  182. flag = true;
  183. }
  184. else if(flag)
  185. {
  186. if(sEndpoints[pos] == ' ')
  187. {
  188. continue;
  189. }
  190. if(TC_Port::strncasecmp("tcp", (sEndpoints.c_str() + pos), 3) == 0 || TC_Port::strncasecmp("udp", (sEndpoints.c_str() + pos), 3) == 0)
  191. {
  192. string ep = TC_Common::trim(string(sEndpoints.c_str() + startPos, sepPos - startPos));
  193. if(!ep.empty()) {
  194. vEndpoints.push_back(ep);
  195. }
  196. startPos = pos;
  197. }
  198. flag = false;
  199. }
  200. }
  201. string ep = sEndpoints.substr(startPos);
  202. if(!ep.empty()) {
  203. vEndpoints.push_back(ep);
  204. }
  205. // vEndpoints.push_back(sEndpoints.substr(startPos));
  206. return vEndpoints;
  207. }
  208. void QueryEpBase::setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints)
  209. {
  210. if(sEndpoints == "")
  211. {
  212. return ;
  213. }
  214. bool bSameWeightType = true;
  215. bool bFirstWeightType = true;
  216. unsigned int iWeightType = 0;
  217. // vector<string> vEndpoints = TC_Common::sepstr<string>(sEndpoints, ":", false, isRealEndpoint);
  218. vector<string> vEndpoints = sepEndpoint(sEndpoints);
  219. for (size_t i = 0; i < vEndpoints.size(); ++i)
  220. {
  221. try
  222. {
  223. TC_Endpoint ep(vEndpoints[i]);
  224. EndpointInfo::EType type;
  225. if (ep.isSSL())
  226. type = EndpointInfo::SSL;
  227. else if (ep.isTcp())
  228. type = EndpointInfo::TCP;
  229. else
  230. type = EndpointInfo::UDP;
  231. string sSetDivision;
  232. //解析set分组信息
  233. {
  234. string sep = " -s ";
  235. size_t pos = vEndpoints[i].rfind(sep);
  236. if (pos != string::npos)
  237. {
  238. sSetDivision = TC_Common::trim(vEndpoints[i].substr(pos+sep.size()));
  239. size_t endPos = sSetDivision.find(" ");
  240. if (endPos != string::npos)
  241. {
  242. sSetDivision = sSetDivision.substr(0, endPos);
  243. }
  244. }
  245. }
  246. if(bFirstWeightType)
  247. {
  248. bFirstWeightType = false;
  249. iWeightType = ep.getWeightType();
  250. }
  251. else
  252. {
  253. if(ep.getWeightType() != iWeightType)
  254. {
  255. bSameWeightType = false;
  256. }
  257. }
  258. EndpointInfo epi(ep.getHost(), ep.getPort(), type, ep.getGrid(), sSetDivision, ep.getQos(), ep.getWeight(), ep.getWeightType(), ep.getAuthType());
  259. setEndpoints.insert(epi);
  260. }
  261. catch (...)
  262. {
  263. TLOGERROR("[TARS][QueryEpBase::setEndpoints parse error,objname:" << _objName << ",endpoint:" << vEndpoints[i] << "]" << endl);
  264. }
  265. }
  266. if(bSameWeightType)
  267. {
  268. if(iWeightType == 1)
  269. {
  270. _weightType = E_STATIC_WEIGHT;
  271. }
  272. else
  273. {
  274. _weightType = E_LOOP;
  275. }
  276. }
  277. else
  278. {
  279. _weightType = E_LOOP;
  280. }
  281. }
  282. void QueryEpBase::refreshReg(GetEndpointType type, const string & sName)
  283. {
  284. if(_direct)
  285. {
  286. return;
  287. }
  288. int64_t iNow = TNOWMS;
  289. //正在请求状态 而且请求超时了,或者第一次
  290. if(_requestRegistry && _requestTimeout < iNow)
  291. {
  292. doEndpointsExp(0);
  293. }
  294. //如果是间接连接,通过registry定时查询服务列表
  295. //正在请求状态 而且请求超时了
  296. //非请求状态 到了下一个刷新时间了
  297. if( (!_requestRegistry) && (_refreshTime <= iNow))
  298. {
  299. _requestRegistry = true;
  300. //一定时间不回调就算超时了
  301. _requestTimeout = iNow + _timeoutInterval;
  302. TLOGTARS("[TARS][QueryEpBase::refresh,"<<_objName<<"]"<<endl);
  303. //判断是同步调用还是异步调用
  304. //内部请求主控都是异步请求
  305. //接口请求主控第一次是同步请求
  306. bool bSync = (!_valid && _interfaceReq);
  307. try
  308. {
  309. if(bSync)
  310. {
  311. vector<tars::EndpointF> activeEp;
  312. vector<tars::EndpointF> inactiveEp;
  313. int iRet = 0;
  314. switch(type)
  315. {
  316. case E_ALL:
  317. {
  318. iRet = _queryFPrx->findObjectById4Any(_objName,activeEp,inactiveEp);
  319. break;
  320. }
  321. case E_STATION:
  322. {
  323. iRet = _queryFPrx->findObjectByIdInSameStation(_objName,sName,activeEp,inactiveEp);
  324. }
  325. case E_SET:
  326. {
  327. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,sName,activeEp,inactiveEp);
  328. break;
  329. }
  330. case E_DEFAULT:
  331. default:
  332. {
  333. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  334. {
  335. //指定set调用时,指定set的优先级最高
  336. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  337. iRet = _queryFPrx->findObjectByIdInSameSet(_objName,setId,activeEp,inactiveEp);
  338. }
  339. else
  340. {
  341. iRet = _queryFPrx->findObjectByIdInSameGroup(_objName,activeEp,inactiveEp);
  342. }
  343. break;
  344. }
  345. }
  346. doEndpoints(activeEp, inactiveEp, iRet, true);
  347. }
  348. else
  349. {
  350. switch(type)
  351. {
  352. case E_ALL:
  353. {
  354. _queryFPrx->async_findObjectById4Any(this,_objName);
  355. break;
  356. }
  357. case E_STATION:
  358. {
  359. _queryFPrx->async_findObjectByIdInSameStation(this,_objName,sName);
  360. break;
  361. }
  362. case E_SET:
  363. {
  364. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,sName);
  365. break;
  366. }
  367. case E_DEFAULT:
  368. default:
  369. {
  370. if(ClientConfig::SetOpen || !_invokeSetId.empty())
  371. {
  372. //指定set调用时,指定set的优先级最高
  373. string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
  374. _queryFPrx->async_findObjectByIdInSameSet(this,_objName,setId);
  375. }
  376. else
  377. {
  378. _queryFPrx->async_findObjectByIdInSameGroup(this,_objName);
  379. }
  380. break;
  381. }
  382. }//end switch
  383. }
  384. }
  385. catch(TC_Exception & ex)
  386. {
  387. TLOGERROR("[TARS]QueryEpBase::refreshReg obj:"<<_objName<<"exception:"<<ex.what()<<endl);
  388. doEndpointsExp(TARSSERVERUNKNOWNERR);
  389. }
  390. catch(...)
  391. {
  392. TLOGERROR("[TARS]QueryEpBase::refreshReg obj:"<<_objName<<"unknown exception:"<<endl);
  393. doEndpointsExp(TARSSERVERUNKNOWNERR);
  394. }
  395. }
  396. }
  397. void QueryEpBase::doEndpoints(const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp, int iRet, bool bSync)
  398. {
  399. if(iRet != 0)
  400. {
  401. doEndpointsExp(iRet);
  402. return ;
  403. }
  404. _failTimes = 0;
  405. _requestRegistry = false;
  406. int64_t iNow = TNOWMS;
  407. //有返回成功的结点,按照正常的频率
  408. //如果返回空列表或者返回失败 2s去刷新一次
  409. //接口请求主控的方式 不管是不是空都要去刷新
  410. if(activeEp.empty() && (!_interfaceReq) )
  411. {
  412. _refreshTime = iNow + _activeEmptyInterval;
  413. //如果registry返回Active服务列表为空,不做更新
  414. TLOGERROR("[TARS][QueryEpBase::doEndpoints, callback activeEps is empty,objname:"<< _objName << "]" << endl);
  415. return;
  416. }
  417. else
  418. {
  419. _refreshTime = iNow + _refreshInterval;
  420. }
  421. bool bNeedNotify = false;
  422. bool bSameWeightType = true;
  423. bool bFirstWeightType = true;
  424. int iWeightType = 0;
  425. set<string> sActiveEndpoints;
  426. set<string> sInactiveEndpoints;
  427. set<EndpointInfo> activeEps;
  428. set<EndpointInfo> inactiveEps;
  429. //生成active set 用于比较
  430. for (uint32_t i = 0; i < activeEp.size(); ++i)
  431. {
  432. if(bFirstWeightType)
  433. {
  434. bFirstWeightType = false;
  435. iWeightType = activeEp[i].weightType;
  436. }
  437. else
  438. {
  439. if(activeEp[i].weightType != iWeightType)
  440. {
  441. bSameWeightType = false;
  442. }
  443. }
  444. // tars istcp意思和这里枚举值对应
  445. EndpointInfo::EType type = EndpointInfo::EType(activeEp[i].istcp);
  446. EndpointInfo ep(activeEp[i].host, activeEp[i].port, type, activeEp[i].grid, activeEp[i].setId, activeEp[i].qos, activeEp[i].weight, activeEp[i].weightType, activeEp[i].authType);
  447. activeEps.insert(ep);
  448. }
  449. //生成inactive set 用于比较
  450. for (uint32_t i = 0; i < inactiveEp.size(); ++i)
  451. {
  452. // tars istcp意思和这里枚举值对应
  453. EndpointInfo::EType type = EndpointInfo::EType(inactiveEp[i].istcp);
  454. EndpointInfo ep(inactiveEp[i].host, inactiveEp[i].port, type, inactiveEp[i].grid, inactiveEp[i].setId, inactiveEp[i].qos, inactiveEp[i].weight, inactiveEp[i].weightType, inactiveEp[i].authType);
  455. inactiveEps.insert(ep);
  456. }
  457. if(bSameWeightType)
  458. {
  459. if(iWeightType == 1)
  460. {
  461. _weightType = E_STATIC_WEIGHT;
  462. }
  463. else
  464. {
  465. _weightType = E_LOOP;
  466. }
  467. }
  468. else
  469. {
  470. _weightType = E_LOOP;
  471. }
  472. if(activeEps != _activeEndpoints)
  473. {
  474. bNeedNotify = true;
  475. _activeEndpoints = activeEps;
  476. if(_firstNetThread)
  477. {
  478. setEndPointToCache(false);
  479. }
  480. }
  481. if(inactiveEps != _inactiveEndpoints)
  482. {
  483. bNeedNotify = true;
  484. _inactiveEndpoints = inactiveEps;
  485. if(_firstNetThread)
  486. {
  487. setEndPointToCache(true);
  488. }
  489. }
  490. if(bNeedNotify)
  491. {
  492. notifyEndpoints(_activeEndpoints, _inactiveEndpoints, bSync);
  493. }
  494. if(!_valid)
  495. {
  496. _valid = true;
  497. doNotify();
  498. }
  499. }
  500. void QueryEpBase::doEndpointsExp(int iRet)
  501. {
  502. _failTimes++;
  503. _requestRegistry = false;
  504. int64_t iNow = TNOWMS;
  505. //频率控制获取主控失败 2秒钟再更新
  506. _refreshTime = iNow + _failInterval;
  507. //获取主控连续失败3次就等30s再更新一次
  508. //连续失败 强制设成数据是有效的
  509. if(_failTimes > _failTimesLimit)
  510. {
  511. if(!_valid)
  512. {
  513. _valid = true;
  514. doNotify();
  515. }
  516. _refreshTime = iNow + _manyFailInterval;
  517. }
  518. }
  519. void QueryEpBase::setEndPointToCache(bool bInactive)
  520. {
  521. //如果是接口级请求则不缓存到文件
  522. if(_interfaceReq)
  523. {
  524. return;
  525. }
  526. string sEndpoints;
  527. set<EndpointInfo> doEndpoints;
  528. if(!bInactive)
  529. {
  530. doEndpoints = _activeEndpoints;
  531. }
  532. else
  533. {
  534. doEndpoints = _inactiveEndpoints;
  535. }
  536. set<EndpointInfo>::iterator iter;
  537. iter = doEndpoints.begin();
  538. for (; iter != doEndpoints.end(); ++iter)
  539. {
  540. //这里的超时时间 只是对服务端有效。这里的值无效。所以默认用3000了
  541. TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid(), iter->qos(), iter->weight(), iter->getWeightType());
  542. ep.setAuthType(iter->authType());
  543. if (!sEndpoints.empty())
  544. {
  545. sEndpoints += ":";
  546. }
  547. sEndpoints += ep.toString();
  548. if (!iter->setDivision().empty())
  549. {
  550. sEndpoints += " -s " + iter->setDivision();
  551. }
  552. }
  553. //如果启用set,则按set分组保存
  554. string sLocatorKey = _locator;
  555. if(ClientConfig::SetOpen)
  556. {
  557. sLocatorKey += "_" + ClientConfig::SetDivision;
  558. }
  559. string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
  560. if(bInactive)
  561. {
  562. AppCache::getInstance()->set("inactive_"+objName,sEndpoints,sLocatorKey);
  563. }
  564. else
  565. {
  566. AppCache::getInstance()->set(objName,sEndpoints,sLocatorKey);
  567. }
  568. TLOGTARS("[TARS][setEndPointToCache,obj:" << _objName << ",invokeSetId:" << _invokeSetId << ",endpoint:" << sEndpoints << "]" << endl);
  569. }
  570. /////////////////////////////////////////////////////////////////////////////
  571. EndpointManager::EndpointManager(ObjectProxy * pObjectProxy, Communicator* pComm, const string & sObjName, bool bFirstNetThread,const string& setName)
  572. : QueryEpBase(pComm, bFirstNetThread, false)
  573. ,_objectProxy(pObjectProxy)
  574. ,_lastRoundPosition(0)
  575. ,_update(true)
  576. ,_updateWeightInterval(60)
  577. ,_lastSWeightPosition(0)
  578. ,_consistentHashWeight(E_TC_CONHASH_KETAMAHASH)
  579. ,_consistentHash(E_TC_CONHASH_KETAMAHASH)
  580. {
  581. setNetThreadProcess(true);
  582. init(sObjName,_communicator->getProperty("locator"),setName);
  583. }
  584. EndpointManager::~EndpointManager()
  585. {
  586. map<string,AdapterProxy*>::iterator iterAdapter;
  587. for(iterAdapter = _allProxys.begin();iterAdapter != _allProxys.end();iterAdapter++)
  588. {
  589. if(iterAdapter->second)
  590. {
  591. delete iterAdapter->second;
  592. iterAdapter->second = NULL;
  593. }
  594. }
  595. }
  596. void EndpointManager::notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync)
  597. {
  598. set<EndpointInfo>::const_iterator iter;
  599. map<string,AdapterProxy*>::iterator iterAdapter;
  600. pair<map<string,AdapterProxy*>::iterator,bool> result;
  601. _activeProxys.clear();
  602. _regProxys.clear();
  603. //更新active
  604. iter = active.begin();
  605. for(;iter != active.end();++iter)
  606. {
  607. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  608. {
  609. continue;
  610. }
  611. iterAdapter = _allProxys.find(iter->desc());
  612. if(iterAdapter == _allProxys.end())
  613. {
  614. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  615. result = _allProxys.insert(make_pair(iter->desc(),ap));
  616. assert(result.second);
  617. iterAdapter = result.first;
  618. _vAllProxys.push_back(ap);
  619. }
  620. //该节点在主控的状态为active
  621. iterAdapter->second->setActiveInReg(true);
  622. _activeProxys.push_back(iterAdapter->second);
  623. _regProxys.insert(make_pair(iter->desc(),iterAdapter->second));
  624. //设置该节点的静态权重值
  625. iterAdapter->second->setWeight(iter->weight());
  626. }
  627. //更新inactive
  628. iter = inactive.begin();
  629. for(;iter != inactive.end();++iter)
  630. {
  631. if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
  632. {
  633. continue;
  634. }
  635. iterAdapter = _allProxys.find(iter->desc());
  636. if(iterAdapter == _allProxys.end())
  637. {
  638. AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
  639. result = _allProxys.insert(make_pair(iter->desc(),ap));
  640. assert(result.second);
  641. iterAdapter = result.first;
  642. _vAllProxys.push_back(ap);
  643. }
  644. //该节点在主控的状态为inactive
  645. iterAdapter->second->setActiveInReg(false);
  646. _regProxys.insert(make_pair(iter->desc(),iterAdapter->second));
  647. //设置该节点的静态权重值
  648. iterAdapter->second->setWeight(iter->weight());
  649. }
  650. //_vRegProxys 需要按顺序来 重排
  651. _vRegProxys.clear();
  652. iterAdapter = _regProxys.begin();
  653. for(;iterAdapter != _regProxys.end();++iterAdapter)
  654. {
  655. _vRegProxys.push_back(iterAdapter->second);
  656. }
  657. _update = true;
  658. }
  659. void EndpointManager::doNotify()
  660. {
  661. _objectProxy->doInvoke();
  662. }
  663. bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
  664. {
  665. pAdapterProxy = NULL;
  666. //刷新主控
  667. refreshReg(E_DEFAULT,"");
  668. //无效的数据 返回true
  669. if(!_valid)
  670. {
  671. return true;
  672. }
  673. //如果有hash,则先使用hash策略
  674. if (msg->bHash)
  675. {
  676. pAdapterProxy = getHashProxy(msg->iHashCode, msg->bConHash);
  677. return false;
  678. }
  679. if(_weightType == E_STATIC_WEIGHT)
  680. {
  681. //权重模式
  682. bool bStaticWeighted = false;
  683. if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
  684. bStaticWeighted = true;
  685. pAdapterProxy = getWeightedProxy(bStaticWeighted);
  686. }
  687. else
  688. {
  689. //普通轮询模式
  690. pAdapterProxy = getNextValidProxy();
  691. }
  692. return false;
  693. }
  694. AdapterProxy * EndpointManager::getNextValidProxy()
  695. {
  696. if (_activeProxys.empty())
  697. {
  698. TLOGERROR("[TARS][EndpointManager::getNextValidProxy activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  699. return NULL;
  700. }
  701. vector<AdapterProxy*> conn;
  702. for(size_t i=0;i<_activeProxys.size();i++)
  703. {
  704. ++_lastRoundPosition;
  705. if(_lastRoundPosition >= _activeProxys.size())
  706. _lastRoundPosition = 0;
  707. if(_activeProxys[_lastRoundPosition]->checkActive())
  708. {
  709. return _activeProxys[_lastRoundPosition];
  710. }
  711. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
  712. !_activeProxys[_lastRoundPosition]->isConnExc())
  713. conn.push_back(_activeProxys[_lastRoundPosition]);
  714. }
  715. if(conn.size() > 0)
  716. {
  717. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  718. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  719. //该proxy可能已经被屏蔽,需重新连一次
  720. adapterProxy->checkActive(true);
  721. return adapterProxy;
  722. }
  723. //所有adapter都有问题 选不到结点,随机找一个重试
  724. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  725. //该proxy可能已经被屏蔽,需重新连一次
  726. adapterProxy->checkActive(true);
  727. return NULL;
  728. }
  729. AdapterProxy* EndpointManager::getHashProxy(int64_t hashCode, bool bConsistentHash)
  730. {
  731. if(_weightType == E_STATIC_WEIGHT)
  732. {
  733. if(bConsistentHash)
  734. {
  735. return getConHashProxyForWeight(hashCode, true);
  736. }
  737. else
  738. {
  739. return getHashProxyForWeight(hashCode, true, _hashStaticRouterCache);
  740. }
  741. }
  742. else
  743. {
  744. if(bConsistentHash)
  745. {
  746. return getConHashProxyForNormal(hashCode);
  747. }
  748. else
  749. {
  750. return getHashProxyForNormal(hashCode);
  751. }
  752. }
  753. }
  754. AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache)
  755. {
  756. if(_vRegProxys.empty())
  757. {
  758. TLOGERROR("[TARS][EndpointManager::getHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  759. return NULL;
  760. }
  761. if(checkHashStaticWeightChange(bStatic))
  762. {
  763. int64_t iBegin = TNOWMS;
  764. updateHashProxyWeighted(bStatic);
  765. int64_t iEnd = TNOWMS;
  766. TLOGTARS("[TARS][EndpointManager::getHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  767. }
  768. if(vRouterCache.size() > 0)
  769. {
  770. size_t hash = ((int64_t)hashCode) % vRouterCache.size();
  771. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  772. if(hash >= vRouterCache.size())
  773. {
  774. hash = hash % vRouterCache.size();
  775. }
  776. size_t iIndex = vRouterCache[hash];
  777. if(iIndex >= _vRegProxys.size())
  778. {
  779. iIndex = iIndex % _vRegProxys.size();
  780. }
  781. //被hash到的节点在主控是active的才走在流程
  782. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive())
  783. {
  784. return _vRegProxys[iIndex];
  785. }
  786. else
  787. {
  788. if(_activeProxys.empty())
  789. {
  790. TLOGERROR("[TARS][EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
  791. return NULL;
  792. }
  793. //在active节点中再次hash
  794. vector<AdapterProxy*> thisHash = _activeProxys;
  795. vector<AdapterProxy*> conn;
  796. do
  797. {
  798. hash = ((int64_t)hashCode) % thisHash.size();
  799. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  800. if(hash >= thisHash.size())
  801. {
  802. hash = hash % thisHash.size();
  803. }
  804. if (thisHash[hash]->checkActive())
  805. {
  806. return thisHash[hash];
  807. }
  808. if(!thisHash[hash]->isConnTimeout() &&
  809. !thisHash[hash]->isConnExc())
  810. {
  811. conn.push_back(thisHash[hash]);
  812. }
  813. thisHash.erase(thisHash.begin() + hash);
  814. }
  815. while(!thisHash.empty());
  816. if(conn.size() > 0)
  817. {
  818. hash = ((int64_t)hashCode) % conn.size();
  819. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  820. if(hash >= conn.size())
  821. {
  822. hash = hash % conn.size();
  823. }
  824. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  825. AdapterProxy *adapterProxy = conn[hash];
  826. //该proxy可能已经被屏蔽,需重新连一次
  827. adapterProxy->checkActive(true);
  828. return adapterProxy;
  829. }
  830. //所有adapter都有问题 选不到结点,随机找一个重试
  831. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  832. //该proxy可能已经被屏蔽,需重新连一次
  833. adapterProxy->checkActive(true);
  834. return NULL;
  835. }
  836. }
  837. return getHashProxyForNormal(hashCode);
  838. }
  839. AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool bStatic)
  840. {
  841. if(_vRegProxys.empty())
  842. {
  843. TLOGERROR("[TARS][EndpointManager::getConHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
  844. return NULL;
  845. }
  846. if(checkConHashChange(bStatic, _lastConHashWeightProxys))
  847. {
  848. int64_t iBegin = TNOWMS;
  849. updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
  850. int64_t iEnd = TNOWMS;
  851. TLOGTARS("[TARS][EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  852. }
  853. if(_consistentHashWeight.size() > 0)
  854. {
  855. unsigned int iIndex = 0;
  856. // 通过一致性hash取到对应的节点
  857. _consistentHashWeight.getIndex(hashCode, iIndex);
  858. if(iIndex >= _vRegProxys.size())
  859. {
  860. iIndex = iIndex % _vRegProxys.size();
  861. }
  862. //被hash到的节点在主控是active的才走在流程
  863. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive())
  864. {
  865. return _vRegProxys[iIndex];
  866. }
  867. else
  868. {
  869. if(_activeProxys.empty())
  870. {
  871. TLOGERROR("[TARS][EndpointManager::getConHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
  872. return NULL;
  873. }
  874. //在active节点中再次hash
  875. vector<AdapterProxy*> thisHash = _activeProxys;
  876. vector<AdapterProxy*> conn;
  877. size_t hash = 0;
  878. do
  879. {
  880. hash = ((int64_t)hashCode) % thisHash.size();
  881. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  882. if(hash >= thisHash.size())
  883. {
  884. hash = hash % thisHash.size();
  885. }
  886. if (thisHash[hash]->checkActive())
  887. {
  888. return thisHash[hash];
  889. }
  890. if(!thisHash[hash]->isConnTimeout() && !thisHash[hash]->isConnExc())
  891. {
  892. conn.push_back(thisHash[hash]);
  893. }
  894. thisHash.erase(thisHash.begin() + hash);
  895. }
  896. while(!thisHash.empty());
  897. if(conn.size() > 0)
  898. {
  899. hash = ((int64_t)hashCode) % conn.size();
  900. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  901. if(hash >= conn.size())
  902. {
  903. hash = hash % conn.size();
  904. }
  905. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  906. AdapterProxy *adapterProxy = conn[hash];
  907. //该proxy可能已经被屏蔽,需重新连一次
  908. adapterProxy->checkActive(true);
  909. return adapterProxy;
  910. }
  911. //所有adapter都有问题 选不到结点,随机找一个重试
  912. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  913. //该proxy可能已经被屏蔽,需重新连一次
  914. adapterProxy->checkActive(true);
  915. return NULL;
  916. }
  917. }
  918. return getHashProxyForNormal(hashCode);
  919. }
  920. bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
  921. {
  922. if(bStatic)
  923. {
  924. if(_lastHashStaticProxys.size() != _vRegProxys.size())
  925. {
  926. return true;
  927. }
  928. for(size_t i = 0; i < _vRegProxys.size(); i++)
  929. {
  930. if(_lastHashStaticProxys[i]->getWeight() != _vRegProxys[i]->getWeight() || _lastHashStaticProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
  931. {
  932. return true;
  933. }
  934. }
  935. }
  936. return false;
  937. }
  938. bool EndpointManager::checkConHashChange(bool bStatic, const vector<AdapterProxy*> &vLastConHashProxys)
  939. {
  940. if(vLastConHashProxys.size() != _vRegProxys.size())
  941. {
  942. return true;
  943. }
  944. for(size_t i = 0; i < _vRegProxys.size(); i++)
  945. {
  946. if(bStatic)
  947. {
  948. if(vLastConHashProxys[i]->getWeight() != _vRegProxys[i]->getWeight() || vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
  949. {
  950. return true;
  951. }
  952. }
  953. else
  954. {
  955. if(vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
  956. {
  957. return true;
  958. }
  959. }
  960. }
  961. return false;
  962. }
  963. void EndpointManager::updateHashProxyWeighted(bool bStatic)
  964. {
  965. if(_vRegProxys.size() <= 0)
  966. {
  967. TLOGERROR("[TARS][EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  968. return ;
  969. }
  970. if(bStatic)
  971. {
  972. _lastHashStaticProxys = _vRegProxys;
  973. _hashStaticRouterCache.clear();
  974. }
  975. vector<AdapterProxy*> vRegProxys;
  976. vector<size_t> vIndex;
  977. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  978. {
  979. if(_vRegProxys[i]->getWeight() > 0)
  980. {
  981. vRegProxys.push_back(_vRegProxys[i]);
  982. vIndex.push_back(i);
  983. }
  984. }
  985. if(vRegProxys.size() <= 0)
  986. {
  987. TLOGERROR("[TARS][EndpointManager::updateHashProxyWeighted vRegProxys is empty], bStatic:" << bStatic << endl);
  988. return ;
  989. }
  990. size_t iHashStaticWeightSize = vRegProxys.size();
  991. map<size_t, int> mIdToWeight;
  992. multimap<int, size_t> mWeightToId;
  993. size_t iMaxR = 0;
  994. size_t iMaxRouterR = 0;
  995. size_t iMaxWeight = vRegProxys[0]->getWeight();
  996. size_t iMinWeight = vRegProxys[0]->getWeight();
  997. size_t iTempWeight = 0;
  998. for(size_t i = 1;i < iHashStaticWeightSize; i++)
  999. {
  1000. iTempWeight = vRegProxys[i]->getWeight();
  1001. if(iTempWeight > iMaxWeight)
  1002. {
  1003. iMaxWeight = iTempWeight;
  1004. }
  1005. if(iTempWeight < iMinWeight)
  1006. {
  1007. iMinWeight = iTempWeight;
  1008. }
  1009. }
  1010. if(iMinWeight > 0)
  1011. {
  1012. iMaxR = iMaxWeight / iMinWeight;
  1013. if(iMaxR < iMinWeightLimit)
  1014. iMaxR = iMinWeightLimit;
  1015. if(iMaxR > iMaxWeightLimit)
  1016. iMaxR = iMaxWeightLimit;
  1017. }
  1018. else
  1019. {
  1020. iMaxR = 1;
  1021. iMaxWeight = 1;
  1022. }
  1023. for(size_t i = 0;i < iHashStaticWeightSize; i++)
  1024. {
  1025. int iWeight = (vRegProxys[i]->getWeight() * iMaxR) / iMaxWeight;
  1026. if(iWeight > 0)
  1027. {
  1028. iMaxRouterR += iWeight;
  1029. mIdToWeight.insert(map<size_t, int>::value_type(vIndex[i], iWeight));
  1030. mWeightToId.insert(make_pair(iWeight, vIndex[i]));
  1031. }
  1032. else
  1033. {
  1034. if(bStatic)
  1035. {
  1036. _hashStaticRouterCache.push_back(vIndex[i]);
  1037. }
  1038. }
  1039. TLOGTARS("[TARS]EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
  1040. }
  1041. for(size_t i = 0; i < iMaxRouterR; i++)
  1042. {
  1043. bool bFirst = true;
  1044. multimap<int, size_t> mulTemp;
  1045. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1046. while(mIter != mWeightToId.rend())
  1047. {
  1048. if(bFirst)
  1049. {
  1050. bFirst = false;
  1051. if(bStatic)
  1052. {
  1053. _hashStaticRouterCache.push_back(mIter->second);
  1054. }
  1055. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1056. }
  1057. else
  1058. {
  1059. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1060. }
  1061. mIter++;
  1062. }
  1063. mWeightToId.clear();
  1064. mWeightToId.swap(mulTemp);
  1065. }
  1066. }
  1067. void EndpointManager::updateConHashProxyWeighted(bool bStatic, vector<AdapterProxy*> &vLastConHashProxys, TC_ConsistentHashNew &conHash)
  1068. {
  1069. if(_vRegProxys.size() <= 0)
  1070. {
  1071. TLOGERROR("[TARS][EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
  1072. return ;
  1073. }
  1074. vLastConHashProxys = _vRegProxys;
  1075. conHash.clear();
  1076. for(size_t i = 0; i < _vRegProxys.size(); ++i)
  1077. {
  1078. int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
  1079. if(iWeight > 0)
  1080. {
  1081. iWeight = iWeight / 4;
  1082. if(iWeight <= 0)
  1083. {
  1084. iWeight = 1;
  1085. }
  1086. conHash.addNode(_vRegProxys[i]->endpoint().desc(), i, iWeight);
  1087. }
  1088. }
  1089. conHash.sortNode();
  1090. }
  1091. AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode)
  1092. {
  1093. if(_vRegProxys.empty())
  1094. {
  1095. TLOGERROR("[TARS][EndpointManager::getHashProxyForNormal _vRegProxys is empty]" << endl);
  1096. return NULL;
  1097. }
  1098. // 1 _vRegProxys从客户端启动之后,就不会再改变,除非有节点增加
  1099. // 2 如果有增加节点,则_vRegProxys顺序会重新排序,之前的hash会改变
  1100. // 3 节点下线后,需要下次启动客户端后,_vRegProxys内容才会生效
  1101. size_t hash = ((int64_t)hashCode) % _vRegProxys.size();
  1102. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1103. if(hash >= _vRegProxys.size())
  1104. {
  1105. hash = hash % _vRegProxys.size();
  1106. }
  1107. //被hash到的节点在主控是active的才走在流程
  1108. if (_vRegProxys[hash]->isActiveInReg() && _vRegProxys[hash]->checkActive())
  1109. {
  1110. return _vRegProxys[hash];
  1111. }
  1112. else
  1113. {
  1114. if(_activeProxys.empty())
  1115. {
  1116. TLOGERROR("[TARS][EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
  1117. return NULL;
  1118. }
  1119. //在active节点中再次hash
  1120. vector<AdapterProxy*> thisHash = _activeProxys;
  1121. vector<AdapterProxy*> conn;
  1122. do
  1123. {
  1124. hash = ((int64_t)hashCode) % thisHash.size();
  1125. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1126. if(hash >= thisHash.size())
  1127. {
  1128. hash = hash % thisHash.size();
  1129. }
  1130. if (thisHash[hash]->checkActive())
  1131. {
  1132. return thisHash[hash];
  1133. }
  1134. if(!thisHash[hash]->isConnTimeout() &&
  1135. !thisHash[hash]->isConnExc())
  1136. {
  1137. conn.push_back(thisHash[hash]);
  1138. }
  1139. thisHash.erase(thisHash.begin() + hash);
  1140. }
  1141. while(!thisHash.empty());
  1142. if(conn.size() > 0)
  1143. {
  1144. hash = ((int64_t)hashCode) % conn.size();
  1145. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1146. if(hash >= conn.size())
  1147. {
  1148. hash = hash % conn.size();
  1149. }
  1150. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1151. AdapterProxy *adapterProxy = conn[hash];
  1152. //该proxy可能已经被屏蔽,需重新连一次
  1153. adapterProxy->checkActive(true);
  1154. return adapterProxy;
  1155. }
  1156. //所有adapter都有问题 选不到结点,随机找一个重试
  1157. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1158. //该proxy可能已经被屏蔽,需重新连一次
  1159. adapterProxy->checkActive(true);
  1160. return NULL;
  1161. }
  1162. }
  1163. AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
  1164. {
  1165. if(_vRegProxys.empty())
  1166. {
  1167. TLOGERROR("[TARS][EndpointManager::getConHashProxyForNormal _vRegProxys is empty]" << endl);
  1168. return NULL;
  1169. }
  1170. if(checkConHashChange(false, _lastConHashProxys))
  1171. {
  1172. int64_t iBegin = TNOWMS;
  1173. updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
  1174. int64_t iEnd = TNOWMS;
  1175. TLOGTARS("[TARS][EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
  1176. }
  1177. if(_consistentHash.size() > 0)
  1178. {
  1179. unsigned int iIndex = 0;
  1180. // 通过一致性hash取到对应的节点
  1181. _consistentHash.getIndex(hashCode, iIndex);
  1182. if(iIndex >= _vRegProxys.size())
  1183. {
  1184. iIndex = iIndex % _vRegProxys.size();
  1185. }
  1186. //被hash到的节点在主控是active的才走在流程
  1187. if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive())
  1188. {
  1189. return _vRegProxys[iIndex];
  1190. }
  1191. else
  1192. {
  1193. if(_activeProxys.empty())
  1194. {
  1195. TLOGERROR("[TARS][EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
  1196. return NULL;
  1197. }
  1198. //在active节点中再次hash
  1199. vector<AdapterProxy*> thisHash = _activeProxys;
  1200. vector<AdapterProxy*> conn;
  1201. size_t hash = 0;
  1202. do
  1203. {
  1204. hash = ((int64_t)hashCode) % thisHash.size();
  1205. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1206. if(hash >= thisHash.size())
  1207. {
  1208. hash = hash % thisHash.size();
  1209. }
  1210. if (thisHash[hash]->checkActive())
  1211. {
  1212. return thisHash[hash];
  1213. }
  1214. if(!thisHash[hash]->isConnTimeout() &&
  1215. !thisHash[hash]->isConnExc())
  1216. {
  1217. conn.push_back(thisHash[hash]);
  1218. }
  1219. thisHash.erase(thisHash.begin() + hash);
  1220. }
  1221. while(!thisHash.empty());
  1222. if(conn.size() > 0)
  1223. {
  1224. hash = ((int64_t)hashCode) % conn.size();
  1225. //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
  1226. if(hash >= conn.size())
  1227. {
  1228. hash = hash % conn.size();
  1229. }
  1230. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1231. AdapterProxy *adapterProxy = conn[hash];
  1232. //该proxy可能已经被屏蔽,需重新连一次
  1233. adapterProxy->checkActive(true);
  1234. return adapterProxy;
  1235. }
  1236. //所有adapter都有问题 选不到结点,随机找一个重试
  1237. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1238. //该proxy可能已经被屏蔽,需重新连一次
  1239. adapterProxy->checkActive(true);
  1240. return NULL;
  1241. }
  1242. }
  1243. return getHashProxyForNormal(hashCode);
  1244. }
  1245. AdapterProxy* EndpointManager::getWeightedProxy(bool bStaticWeighted)
  1246. {
  1247. return getWeightedForNormal(bStaticWeighted);
  1248. }
  1249. AdapterProxy* EndpointManager::getWeightedForNormal(bool bStaticWeighted)
  1250. {
  1251. if (_activeProxys.empty())
  1252. {
  1253. TLOGERROR("[TARS][EndpointManager::getWeightedForNormal activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
  1254. return NULL;
  1255. }
  1256. int64_t iNow = TNOW;
  1257. if(_lastBuildWeightTime <= iNow)
  1258. {
  1259. updateProxyWeighted();
  1260. if(!_first)
  1261. {
  1262. _lastBuildWeightTime = iNow + _updateWeightInterval;
  1263. }
  1264. else
  1265. {
  1266. _first = false;
  1267. _lastBuildWeightTime = iNow + _updateWeightInterval + 5;
  1268. }
  1269. }
  1270. bool bEmpty = false;
  1271. int iActiveSize = _activeWeightProxy.size();
  1272. if(iActiveSize > 0)
  1273. {
  1274. size_t iProxyIndex = 0;
  1275. set<AdapterProxy*> sConn;
  1276. if(_staticRouterCache.size() > 0)
  1277. {
  1278. for(size_t i = 0;i < _staticRouterCache.size(); i++)
  1279. {
  1280. ++_lastSWeightPosition;
  1281. if(_lastSWeightPosition >= _staticRouterCache.size())
  1282. _lastSWeightPosition = 0;
  1283. iProxyIndex = _staticRouterCache[_lastSWeightPosition];
  1284. if(_activeWeightProxy[iProxyIndex]->checkActive())
  1285. {
  1286. return _activeWeightProxy[iProxyIndex];
  1287. }
  1288. if(!_activeWeightProxy[iProxyIndex]->isConnTimeout() &&
  1289. !_activeWeightProxy[iProxyIndex]->isConnExc())
  1290. {
  1291. sConn.insert(_activeWeightProxy[iProxyIndex]);
  1292. }
  1293. }
  1294. }
  1295. else
  1296. {
  1297. bEmpty = true;
  1298. }
  1299. if(!bEmpty)
  1300. {
  1301. if(sConn.size() > 0)
  1302. {
  1303. vector<AdapterProxy*> conn;
  1304. set<AdapterProxy*>::iterator it_conn = sConn.begin();
  1305. while(it_conn != sConn.end())
  1306. {
  1307. conn.push_back(*it_conn);
  1308. ++it_conn;
  1309. }
  1310. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1311. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1312. //该proxy可能已经被屏蔽,需重新连一次
  1313. adapterProxy->checkActive(true);
  1314. return adapterProxy;
  1315. }
  1316. //所有adapter都有问题 选不到结点,随机找一个重试
  1317. AdapterProxy * adapterProxy = _activeWeightProxy[((uint32_t)rand() % iActiveSize)];
  1318. //该proxy可能已经被屏蔽,需重新连一次
  1319. adapterProxy->checkActive(true);
  1320. return NULL;
  1321. }
  1322. }
  1323. vector<AdapterProxy*> conn;
  1324. for(size_t i=0;i<_activeProxys.size();i++)
  1325. {
  1326. ++_lastRoundPosition;
  1327. if(_lastRoundPosition >= _activeProxys.size())
  1328. _lastRoundPosition = 0;
  1329. if(_activeProxys[_lastRoundPosition]->checkActive())
  1330. {
  1331. return _activeProxys[_lastRoundPosition];
  1332. }
  1333. if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
  1334. !_activeProxys[_lastRoundPosition]->isConnExc())
  1335. conn.push_back(_activeProxys[_lastRoundPosition]);
  1336. }
  1337. if(conn.size() > 0)
  1338. {
  1339. //都有问题, 随机选择一个没有connect超时或者链接异常的发送
  1340. AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
  1341. //该proxy可能已经被屏蔽,需重新连一次
  1342. adapterProxy->checkActive(true);
  1343. return adapterProxy;
  1344. }
  1345. //所有adapter都有问题 选不到结点,随机找一个重试
  1346. AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
  1347. //该proxy可能已经被屏蔽,需重新连一次
  1348. adapterProxy->checkActive(true);
  1349. return NULL;
  1350. }
  1351. void EndpointManager::updateProxyWeighted()
  1352. {
  1353. size_t iWeightProxySize = _activeProxys.size();
  1354. if(iWeightProxySize <= 0)
  1355. {
  1356. TLOGERROR("[TARS]EndpointManager::updateProxyWeighted _objName:" << _objName << "|_activeProxys.size() <= 0" << endl);
  1357. return ;
  1358. }
  1359. vector<AdapterProxy*> vProxy;
  1360. for(size_t i = 0; i < _activeProxys.size(); ++i)
  1361. {
  1362. if(_activeProxys[i]->getWeight() > 0)
  1363. {
  1364. vProxy.push_back(_activeProxys[i]);
  1365. }
  1366. }
  1367. iWeightProxySize = vProxy.size();
  1368. if(iWeightProxySize <= 0)
  1369. {
  1370. TLOGERROR("[TARS]EndpointManager::updateProxyWeighted _objName:" << _objName << "|vProxy.size() <= 0" << endl);
  1371. return ;
  1372. }
  1373. if(_update)
  1374. {
  1375. _activeWeightProxy = vProxy;
  1376. updateStaticWeighted();
  1377. }
  1378. _update = false;
  1379. }
  1380. void EndpointManager::updateStaticWeighted()
  1381. {
  1382. size_t iWeightProxySize = _activeWeightProxy.size();
  1383. vector<int> vWeight;
  1384. vWeight.resize(iWeightProxySize);
  1385. for(size_t i = 0; i < iWeightProxySize; i++)
  1386. {
  1387. vWeight[i] = _activeWeightProxy[i]->getWeight();
  1388. }
  1389. dispatchEndpointCache(vWeight);
  1390. }
  1391. void EndpointManager::dispatchEndpointCache(const vector<int> &vWeight)
  1392. {
  1393. if(vWeight.size() <= 0)
  1394. {
  1395. TLOGERROR("EndpointManager::dispatchEndpointCache vWeight.size() < 0" << endl);
  1396. return ;
  1397. }
  1398. size_t iWeightProxySize = vWeight.size();
  1399. map<size_t, int> mIdToWeight;
  1400. multimap<int, size_t> mWeightToId;
  1401. size_t iMaxR = 0;
  1402. size_t iMaxRouterR = 0;
  1403. size_t iMaxWeight = 0;
  1404. size_t iMinWeight = 0;
  1405. size_t iTotalCapacty = 0;
  1406. size_t iTempWeight = 0;
  1407. for(size_t i = 0; i < vWeight.size(); ++i)
  1408. {
  1409. iTotalCapacty += vWeight[i];
  1410. }
  1411. _staticRouterCache.clear();
  1412. _lastSWeightPosition = 0;
  1413. _staticRouterCache.reserve(iTotalCapacty+100);
  1414. iMaxWeight = vWeight[0];
  1415. iMinWeight = vWeight[0];
  1416. for(size_t i = 1;i < iWeightProxySize; i++)
  1417. {
  1418. iTempWeight = vWeight[i];
  1419. if(iTempWeight > iMaxWeight)
  1420. {
  1421. iMaxWeight = iTempWeight;
  1422. }
  1423. if(iTempWeight < iMinWeight)
  1424. {
  1425. iMinWeight = iTempWeight;
  1426. }
  1427. }
  1428. if(iMinWeight > 0)
  1429. {
  1430. iMaxR = iMaxWeight / iMinWeight;
  1431. if(iMaxR < iMinWeightLimit)
  1432. iMaxR = iMinWeightLimit;
  1433. if(iMaxR > iMaxWeightLimit)
  1434. iMaxR = iMaxWeightLimit;
  1435. }
  1436. else
  1437. {
  1438. iMaxR = 1;
  1439. iMaxWeight = 1;
  1440. }
  1441. for(size_t i = 0;i < iWeightProxySize; i++)
  1442. {
  1443. int iWeight = (vWeight[i] * iMaxR) / iMaxWeight;
  1444. if(iWeight > 0)
  1445. {
  1446. iMaxRouterR += iWeight;
  1447. mIdToWeight.insert(map<size_t, int>::value_type(i, iWeight));
  1448. mWeightToId.insert(make_pair(iWeight, i));
  1449. }
  1450. else
  1451. {
  1452. _staticRouterCache.push_back(i);
  1453. }
  1454. TLOGTARS("[TARS]EndpointManager::dispatchEndpointCache _objName:" << _objName << "|endpoint:" << _activeWeightProxy[i]->endpoint().desc() << "|iWeightR:" << iWeight << endl);
  1455. }
  1456. for(size_t i = 0; i < iMaxRouterR; i++)
  1457. {
  1458. bool bFirst = true;
  1459. multimap<int, size_t> mulTemp;
  1460. multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
  1461. while(mIter != mWeightToId.rend())
  1462. {
  1463. if(bFirst)
  1464. {
  1465. bFirst = false;
  1466. _staticRouterCache.push_back(mIter->second);
  1467. mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
  1468. }
  1469. else
  1470. {
  1471. mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
  1472. }
  1473. mIter++;
  1474. }
  1475. mWeightToId.clear();
  1476. mWeightToId.swap(mulTemp);
  1477. }
  1478. }
  1479. /////////////////////////////////////////////////////////////////////////////
  1480. EndpointThread::EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sName, bool bFirstNetThread)
  1481. : QueryEpBase(pComm,bFirstNetThread,true)
  1482. , _type(type)
  1483. , _name(sName)
  1484. {
  1485. init(sObjName,_communicator->getProperty("locator"));
  1486. }
  1487. void EndpointThread::getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1488. {
  1489. //直连调用这个接口无效
  1490. if(_direct)
  1491. {
  1492. return ;
  1493. }
  1494. {
  1495. TC_LockT<TC_SpinLock> lock(_mutex);
  1496. // TC_ThreadLock::Lock lock(_mutex);
  1497. refreshReg(_type,_name);
  1498. activeEndPoint = _activeEndPoint;
  1499. inactiveEndPoint = _inactiveEndPoint;
  1500. }
  1501. }
  1502. void EndpointThread::getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1503. {
  1504. //直连调用这个接口无效
  1505. if(_direct)
  1506. {
  1507. return ;
  1508. }
  1509. {
  1510. TC_LockT<TC_SpinLock> lock(_mutex);
  1511. refreshReg(_type,_name);
  1512. activeEndPoint = _activeTCEndPoint;
  1513. inactiveEndPoint = _inactiveTCEndPoint;
  1514. }
  1515. }
  1516. void EndpointThread::notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync)
  1517. {
  1518. if(!bSync)
  1519. {
  1520. TC_LockT<TC_SpinLock> lock(_mutex);
  1521. update(active, inactive);
  1522. }
  1523. else
  1524. {
  1525. update(active, inactive);
  1526. }
  1527. }
  1528. void EndpointThread::update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
  1529. {
  1530. _activeEndPoint.clear();
  1531. _inactiveEndPoint.clear();
  1532. _activeTCEndPoint.clear();
  1533. _inactiveTCEndPoint.clear();
  1534. set<EndpointInfo>::iterator iter= active.begin();
  1535. for(;iter != active.end(); ++iter)
  1536. {
  1537. TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1538. _activeTCEndPoint.push_back(ep);
  1539. _activeEndPoint.push_back(*iter);
  1540. }
  1541. iter = inactive.begin();
  1542. for(;iter != inactive.end(); ++iter)
  1543. {
  1544. TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
  1545. _inactiveTCEndPoint.push_back(ep);
  1546. _inactiveEndPoint.push_back(*iter);
  1547. }
  1548. }
  1549. /////////////////////////////////////////////////////////////////////////////
  1550. EndpointManagerThread::EndpointManagerThread(Communicator * pComm,const string & sObjName)
  1551. :_communicator(pComm)
  1552. ,_objName(sObjName)
  1553. {
  1554. }
  1555. EndpointManagerThread::~EndpointManagerThread()
  1556. {
  1557. map<string,EndpointThread*>::iterator iter;
  1558. for(iter=_info.begin();iter != _info.end();iter++)
  1559. {
  1560. if(iter->second)
  1561. {
  1562. delete iter->second;
  1563. iter->second = NULL;
  1564. }
  1565. }
  1566. }
  1567. void EndpointManagerThread::getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1568. {
  1569. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1570. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1571. }
  1572. void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1573. {
  1574. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1575. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1576. }
  1577. void EndpointManagerThread::getEndpointBySet(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1578. {
  1579. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1580. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1581. }
  1582. void EndpointManagerThread::getEndpointByStation(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
  1583. {
  1584. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1585. pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
  1586. }
  1587. void EndpointManagerThread::getTCEndpoint(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1588. {
  1589. EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
  1590. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1591. }
  1592. void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1593. {
  1594. EndpointThread * pThread = getEndpointThread(E_ALL,"");
  1595. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1596. }
  1597. void EndpointManagerThread::getTCEndpointBySet(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1598. {
  1599. EndpointThread * pThread = getEndpointThread(E_SET,sName);
  1600. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1601. }
  1602. void EndpointManagerThread::getTCEndpointByStation(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
  1603. {
  1604. EndpointThread * pThread = getEndpointThread(E_STATION,sName);
  1605. pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
  1606. }
  1607. EndpointThread * EndpointManagerThread::getEndpointThread(GetEndpointType type,const string & sName)
  1608. {
  1609. TC_LockT<TC_SpinLock> lock(_mutex);
  1610. string sAllName = TC_Common::tostr((int)type) + ":" + sName;
  1611. map<string,EndpointThread*>::iterator iter;
  1612. iter = _info.find(sAllName);
  1613. if(iter != _info.end())
  1614. {
  1615. return iter->second;
  1616. }
  1617. EndpointThread * pThread = new EndpointThread(_communicator, _objName, type, sName);
  1618. _info[sAllName] = pThread;
  1619. return pThread;
  1620. }
  1621. }