1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #include "util/tc_port.h"
- #include "servant/EndpointManager.h"
- #include "servant/ObjectProxy.h"
- #include "servant/RemoteLogger.h"
- #include "servant/AppCache.h"
- #include "servant/Application.h"
- #include "servant/CommunicatorEpoll.h"
- #include "servant/StatReport.h"
- namespace tars
- {
- /////////////////////////////////////////////////////////////////////////////
- QueryEpBase::QueryEpBase(Communicator * pComm, bool bFirstNetThread,bool bInterfaceReq)
- : _communicator(pComm)
- , _firstNetThread(bFirstNetThread)
- , _interfaceReq(bInterfaceReq)
- , _direct(false)
- , _objName("")
- , _invokeSetId("")
- , _locator("")
- , _valid(false)
- , _weightType(E_LOOP)
- , _rootServant(true)
- , _requestRegistry(false)
- , _requestTimeout(0)
- , _timeoutInterval(5*1000)
- , _refreshTime(0)
- , _refreshInterval(60*1000)
- , _activeEmptyInterval(10*1000)
- , _failInterval(2*1000)
- , _manyFailInterval(30*1000)
- , _failTimesLimit(3)
- , _failTimes(0)
- {
- _refreshInterval = TC_Common::strto<int>(_communicator->getProperty("refresh-endpoint-interval", "60*1000"));
- if(_refreshInterval < 5*1000)
- {
- _refreshInterval = 5 * 1000;
- }
- setNoDelete(true);
- }
- void QueryEpBase::callback_findObjectById4All(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
- {
- TLOGTARS("[callback_findObjectById4All _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectById4All_exception(Int32 ret)
- {
- TLOGERROR("[callback_findObjectById4All_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectById4Any(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
- {
- TLOGTARS("[callback_findObjectById4Any _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectById4Any_exception(Int32 ret)
- {
- TLOGERROR("[callback_findObjectById4Any_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameGroup(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
- {
- TLOGTARS("[callback_findObjectByIdInSameGroup _objName:" << _objName << "|ret:"<<ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameGroup_exception(Int32 ret)
- {
- TLOGERROR("[callback_findObjectByIdInSameGroup_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameSet( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> & inactiveEp)
- {
- TLOGTARS("[callback_findObjectByIdInSameSet _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameSet_exception( Int32 ret)
- {
- TLOGERROR("[callback_findObjectByIdInSameSet_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameStation( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> &inactiveEp)
- {
- TLOGTARS("[callback_findObjectByIdInSameStation _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameStation_exception( Int32 ret)
- {
- TLOGERROR("[callback_findObjectByIdInSameStation_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- int QueryEpBase::setLocatorPrx(QueryFPrx prx)
- {
- _queryFPrx = prx;
- return 0;
- }
- bool QueryEpBase::init(const string & sObjName, const string& setName, bool rootServant)
- {
- _locator = _communicator->getProperty("locator");
- TLOGTARS("QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl);
- // LOG_CONSOLE_DEBUG << "QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl;
- _invokeSetId = setName;
- _rootServant = rootServant;
- setObjName(sObjName);
- return true;
- }
- void QueryEpBase::setObjName(const string & sObjName)
- {
- string::size_type pos = sObjName.find_first_of('@');
- string sEndpoints;
- string sInactiveEndpoints;
- if (pos != string::npos)
- {
- //[直接连接]指定服务的IP和端口列表
- _objName = sObjName.substr(0,pos);
- sEndpoints = sObjName.substr(pos + 1);
- pos = _objName.find_first_of("#");
- if(pos != string::npos)
- {
- _objName = _objName.substr(0, pos);
- }
- _direct = true;
- _valid = true;
- }
- else
- {
- //[间接连接]通过registry查询服务端的IP和端口列表
- _direct = false;
- _valid = false;
- _objName = sObjName;
- if(_locator.find_first_not_of('@') == string::npos)
- {
- TLOGERROR("[QueryEpBase::setObjName locator is not valid,_locator:" << _locator << "]" << endl);
- throw TarsRegistryException("locator is not valid,_locator:" + _locator);
- }
- pos = _objName.find_first_of("#");
- if(pos != string::npos)
- {
- _objName = _objName.substr(0, pos);
- }
- _queryFPrx = _communicator->stringToProxy<QueryFPrx>(_locator);
- string sLocatorKey = _locator;
- //如果启用set,则获取按set分组的缓存
- if(ClientConfig::SetOpen)
- {
- sLocatorKey += "_" + ClientConfig::SetDivision;
- }
- string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
- //[间接连接]第一次使用cache,如果是接口级请求则不从缓存读取
- if(!_interfaceReq)
- {
- sEndpoints = AppCache::getInstance()->get(objName,sLocatorKey);
- sInactiveEndpoints = AppCache::getInstance()->get("inactive_"+objName,sLocatorKey);
- }
- }
- setEndpoints(sEndpoints,_activeEndpoints);
- setEndpoints(sInactiveEndpoints,_inactiveEndpoints);
- if(!_activeEndpoints.empty())
- {
- _valid = true;
- }
- if((!_activeEndpoints.empty() || !_inactiveEndpoints.empty()))
- {
- //非直接指定端口, 且从cache中能查到服务端口的, 不需要通知所有ObjectProxy更新地址
- notifyEndpoints(_activeEndpoints,_inactiveEndpoints,true);
- }
- }
- //
- //vector<string> QueryEpBase::sepEndpoint(const string& sEndpoints)
- //{
- // vector<string> vEndpoints;
- // bool flag = false;
- // string::size_type startPos = 0;
- // string::size_type sepPos = 0;
- // for(string::size_type pos = 0; pos < sEndpoints.size(); pos++)
- // {
- // if(sEndpoints[pos] == ':' && !flag )
- // {
- // sepPos = pos;
- // flag = true;
- // }
- // else if(flag)
- // {
- // if(sEndpoints[pos] == ' ')
- // {
- // continue;
- // }
- //
- // if(TC_Port::strncasecmp("tcp", (sEndpoints.c_str() + pos), 3) == 0
- // || TC_Port::strncasecmp("udp", (sEndpoints.c_str() + pos), 3) == 0
- // || TC_Port::strncasecmp("ssl", (sEndpoints.c_str() + pos), 3) == 0)
- // {
- // string ep = TC_Common::trim(string(sEndpoints.c_str() + startPos, sepPos - startPos));
- // if(!ep.empty()) {
- // vEndpoints.push_back(ep);
- // }
- // startPos = pos;
- // }
- //
- // flag = false;
- // }
- // }
- //
- // string ep = sEndpoints.substr(startPos);
- //
- // if(!ep.empty()) {
- // vEndpoints.push_back(ep);
- // }
- //
- //// vEndpoints.push_back(sEndpoints.substr(startPos));
- //
- // return vEndpoints;
- //}
- void QueryEpBase::setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints)
- {
- if(sEndpoints == "")
- {
- return ;
- }
- bool bSameWeightType = true;
- bool bFirstWeightType = true;
- unsigned int iWeightType = 0;
- vector<string> vEndpoints = TC_Endpoint::sepEndpoint(sEndpoints);
- for (size_t i = 0; i < vEndpoints.size(); ++i)
- {
- try
- {
- TC_Endpoint ep(vEndpoints[i]);
- string sSetDivision;
- //解析set分组信息
- if (!_direct)
- {
- string sep = " -s ";
- size_t pos = vEndpoints[i].rfind(sep);
- if (pos != string::npos)
- {
- sSetDivision = TC_Common::trim(vEndpoints[i].substr(pos+sep.size()));
- size_t endPos = sSetDivision.find(" ");
- if (endPos != string::npos)
- {
- sSetDivision = sSetDivision.substr(0, endPos);
- }
- }
- }
- if(bFirstWeightType)
- {
- bFirstWeightType = false;
- iWeightType = ep.getWeightType();
- }
- else
- {
- if(ep.getWeightType() != iWeightType)
- {
- bSameWeightType = false;
- }
- }
- EndpointInfo epi(ep, sSetDivision);
- setEndpoints.insert(epi);
- }
- catch (exception &ex)
- {
- TLOGERROR("[QueryEpBase::setEndpoints parse error,objname:" << _objName << ",endpoint:" << vEndpoints[i] << "]" << endl);
- }
- }
- if(bSameWeightType)
- {
- if(iWeightType == 1)
- {
- _weightType = E_STATIC_WEIGHT;
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- void QueryEpBase::refreshReg(GetEndpointType type, const string & sName)
- {
- onUpdateOutter();
- if(_direct)
- {
- return;
- }
- int64_t iNow = TNOWMS;
- //正在请求状态 而且请求超时了,或者第一次
- if(_requestRegistry && _requestTimeout < iNow)
- {
- doEndpointsExp(0);
- }
- //如果是间接连接,通过registry定时查询服务列表
- //正在请求状态 而且请求超时了
- //非请求状态 到了下一个刷新时间了
- if( (!_requestRegistry) && (_refreshTime <= iNow))
- {
- _requestRegistry = true;
- //一定时间不回调就算超时了
- _requestTimeout = iNow + _timeoutInterval;
- TLOGTARS("[QueryEpBase::refresh," << _objName << "]" <<endl);
- if(_valid && !_rootServant)
- {
- return;
- }
- //判断是同步调用还是异步调用
- //内部请求主控都是异步请求
- //接口请求主控第一次是同步请求
- bool bSync = (!_valid && _interfaceReq);
- //如果是异步且不是根servant(通过#1创建的servant, 不主动更新主控信息)
- if(!bSync && !_rootServant)
- return;
- try
- {
- if(bSync)
- {
- vector<EndpointF> activeEp;
- vector<EndpointF> inactiveEp;
- int iRet = 0;
- switch(type)
- {
- case E_ALL:
- {
- iRet = _queryFPrx->findObjectById4Any(_objName,activeEp,inactiveEp, ServerConfig::Context);
- break;
- }
- case E_STATION:
- {
- iRet = _queryFPrx->findObjectByIdInSameStation(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
- break;
- }
- case E_SET:
- {
- iRet = _queryFPrx->findObjectByIdInSameSet(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
- break;
- }
- case E_DEFAULT:
- default:
- {
- if(ClientConfig::SetOpen || !_invokeSetId.empty())
- {
- //指定set调用时,指定set的优先级最高
- string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
- iRet = _queryFPrx->findObjectByIdInSameSet(_objName,setId,activeEp,inactiveEp, ServerConfig::Context);
- }
- else
- {
- iRet = _queryFPrx->findObjectByIdInSameGroup(_objName,activeEp,inactiveEp, ServerConfig::Context);
- }
- break;
- }
- }
- doEndpoints(activeEp, inactiveEp, iRet, true);
- }
- else
- {
- switch(type)
- {
- case E_ALL:
- {
- _queryFPrx->async_findObjectById4Any(this,_objName, ServerConfig::Context);
- break;
- }
- case E_STATION:
- {
- _queryFPrx->async_findObjectByIdInSameStation(this,_objName,sName, ServerConfig::Context);
- break;
- }
- case E_SET:
- {
- _queryFPrx->async_findObjectByIdInSameSet(this,_objName,sName, ServerConfig::Context);
- break;
- }
- case E_DEFAULT:
- default:
- {
- if(ClientConfig::SetOpen || !_invokeSetId.empty())
- {
- //指定set调用时,指定set的优先级最高
- string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
- _queryFPrx->async_findObjectByIdInSameSet(this,_objName,setId, ServerConfig::Context);
- }
- else
- {
- _queryFPrx->async_findObjectByIdInSameGroup(this,_objName, ServerConfig::Context);
- }
- break;
- }
- }//end switch
- }
- }
- catch(TC_Exception & ex)
- {
- TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"exception:"<<ex.what() << "]"<<endl);
- doEndpointsExp(TARSSERVERUNKNOWNERR);
- }
- catch(...)
- {
- TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"unknown exception]" <<endl);
- doEndpointsExp(TARSSERVERUNKNOWNERR);
- }
- }
- }
- void QueryEpBase::doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync)
- {
- if(iRet != 0)
- {
- doEndpointsExp(iRet);
- return ;
- }
- _failTimes = 0;
- _requestRegistry = false;
- int64_t iNow = TNOWMS;
- //有返回成功的结点,按照正常的频率
- //如果返回空列表或者返回失败 2s去刷新一次
- //接口请求主控的方式 不管是不是空都要去刷新
- if(activeEp.empty() && (!_interfaceReq) )
- {
- _refreshTime = iNow + _activeEmptyInterval;
- //如果registry返回Active服务列表为空,不做更新
- TLOGERROR("[QueryEpBase::doEndpoints, callback activeEps is empty,objname:"<< _objName << "]" << endl);
- return;
- }
- else
- {
- _refreshTime = iNow + _refreshInterval;
- }
- bool bNeedNotify = false;
- bool bSameWeightType = true;
- bool bFirstWeightType = true;
- int iWeightType = 0;
- set<string> sActiveEndpoints;
- set<string> sInactiveEndpoints;
- set<EndpointInfo> activeEps;
- set<EndpointInfo> inactiveEps;
- //生成active set 用于比较
- for (uint32_t i = 0; i < activeEp.size(); ++i)
- {
- if(bFirstWeightType)
- {
- bFirstWeightType = false;
- iWeightType = activeEp[i].weightType;
- }
- else
- {
- if(activeEp[i].weightType != iWeightType)
- {
- bSameWeightType = false;
- }
- }
- // taf istcp意思和这里枚举值对应
- activeEps.insert(EndpointInfo(activeEp[i]));
- }
- //生成inactive set 用于比较
- for (uint32_t i = 0; i < inactiveEp.size(); ++i)
- {
- // taf istcp意思和这里枚举值对应
- inactiveEps.insert(EndpointInfo(inactiveEp[i]));
- }
- if(bSameWeightType)
- {
- if(iWeightType == 1)
- {
- _weightType = E_STATIC_WEIGHT;
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- else
- {
- _weightType = E_LOOP;
- }
- if(activeEps != _activeEndpoints)
- {
- bNeedNotify = true;
- _activeEndpoints = activeEps;
- if(_firstNetThread)
- {
- setEndPointToCache(false);
- }
- }
- if(inactiveEps != _inactiveEndpoints)
- {
- bNeedNotify = true;
- _inactiveEndpoints = inactiveEps;
- if(_firstNetThread)
- {
- setEndPointToCache(true);
- }
- }
- if(bNeedNotify)
- {
- notifyEndpoints(_activeEndpoints,_inactiveEndpoints,bSync);
- }
- if(!_valid)
- {
- _valid = true;
- doNotify();
- }
- }
- void QueryEpBase::doEndpointsExp(int iRet)
- {
- _failTimes++;
- _requestRegistry = false;
- int64_t iNow = TNOWMS;
- //频率控制获取主控失败 2秒钟再更新
- _refreshTime = iNow + _failInterval;
- //获取主控连续失败3次就等30s再更新一次
- //连续失败 强制设成数据是有效的
- if(_failTimes > _failTimesLimit)
- {
- if(!_valid)
- {
- _valid = true;
- doNotify();
- }
- _refreshTime = iNow + _manyFailInterval;
- }
- }
- void QueryEpBase::setEndPointToCache(bool bInactive)
- {
- //如果是接口级请求则不缓存到文件
- if(_interfaceReq)
- {
- return;
- }
- string sEndpoints;
- set<EndpointInfo> doEndpoints;
- if(!bInactive)
- {
- doEndpoints = _activeEndpoints;
- }
- else
- {
- doEndpoints = _inactiveEndpoints;
- }
- set<EndpointInfo>::iterator iter;
- iter = doEndpoints.begin();
- for (; iter != doEndpoints.end(); ++iter)
- {
- //这里的超时时间 只是对服务端有效。这里的值无效。所以默认用3000了
- TC_Endpoint ep = iter->getEndpoint();
- if (!sEndpoints.empty())
- {
- sEndpoints += ":";
- }
- sEndpoints += ep.toString();
- if (!iter->setDivision().empty())
- {
- sEndpoints += " -s " + iter->setDivision();
- }
- }
- //如果启用set,则按set分组保存
- string sLocatorKey = _locator;
- if(ClientConfig::SetOpen)
- {
- sLocatorKey += "_" + ClientConfig::SetDivision;
- }
- string objName = _objName + string(_invokeSetId.empty()?"":":") + _invokeSetId;
- if(bInactive)
- {
- AppCache::getInstance()->set("inactive_"+objName,sEndpoints,sLocatorKey);
- }
- else
- {
- AppCache::getInstance()->set(objName,sEndpoints,sLocatorKey);
- }
- TLOGTARS("[setEndPointToCache,obj:" << _objName << ",invokeSetId:" << _invokeSetId << ",endpoint:" << sEndpoints << "]" << endl);
- }
- EndpointManager::EndpointManager(ObjectProxy * pObjectProxy, Communicator* pComm, bool bFirstNetThread)
- : QueryEpBase(pComm, bFirstNetThread, false)
- ,_objectProxy(pObjectProxy)
- ,_lastRoundPosition(0)
- ,_update(true)
- ,_updateWeightInterval(60)
- ,_lastSWeightPosition(0)
- ,_consistentHashWeight(E_TC_CONHASH_KETAMAHASH)
- ,_consistentHash(E_TC_CONHASH_KETAMAHASH)
- {
- setNetThreadProcess(true);
- }
- EndpointManager::~EndpointManager()
- {
- map<string,AdapterProxy*>::iterator iterAdapter;
- for(iterAdapter = _allProxys.begin();iterAdapter != _allProxys.end();iterAdapter++)
- {
- if(iterAdapter->second)
- {
- delete iterAdapter->second;
- iterAdapter->second = NULL;
- }
- }
- _allProxys.clear();
- }
- void EndpointManager::onUpdateOutter()
- {
- // LOG_CONSOLE_DEBUG << this->_objectProxy << ", valid:" << _valid << ", " << _outterUpdate.get() << endl;
- if(_outterUpdate)
- {
- shared_ptr<OutterUpdate> outterUpdate = _outterUpdate;
- updateEndpoints(outterUpdate->active, outterUpdate->inactive);
- _valid = true;
- _outterUpdate.reset();
- }
- }
- void EndpointManager::updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
- {
- // LOG_CONSOLE_DEBUG << this->_objectProxy << ", " << active.begin()->desc() << endl;
- //创新新对象, 避免线程冲突
- _outterUpdate = std::make_shared<OutterUpdate>();
- _outterUpdate->active = active;
- _outterUpdate->inactive = inactive;
- //更新时间
- _refreshTime = TNOWMS + _refreshInterval;
- // updateEndpoints(active, inactive);
- }
- void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
- {
- TLOGTARS("[EndpointManager::updateEndpoints obj:" << this->_objName << ", active:" << active.size() << ", inactive size:" << inactive.size() << endl);
- pair<map<string,AdapterProxy*>::iterator,bool> result;
- _activeProxys.clear();
- _regProxys.clear();
- _indexActiveProxys.clear();
- _sortActivProxys.clear();
- if(!active.empty())
- {
- //先把服务都设置为非活跃
- for (auto iter = _allProxys.begin(); iter != _allProxys.end(); ++iter)
- {
- iter->second->setActiveInReg(false);
- }
- }
- //更新active
- for(auto iter = active.begin(); iter != active.end(); ++iter)
- {
- if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
- {
- continue;
- }
- // LOG_CONSOLE_DEBUG << std::this_thread::get_id() << ", allProxys size:" << _allProxys.size() << ", " << iter->cmpDesc() << endl;
- auto iterAdapter = _allProxys.find(iter->cmpDesc());
- if(iterAdapter == _allProxys.end())
- {
- AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
- result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
- iterAdapter = result.first;
- _vAllProxys.push_back(ap);
- }
- //该节点在主控的状态为active
- iterAdapter->second->setActiveInReg(true);
- _activeProxys.push_back(iterAdapter->second);
- _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
- const string &host = iterAdapter->second->endpoint().host();
- _indexActiveProxys.insert(make_pair(host, iterAdapter->second));
- _sortActivProxys.insert(make_pair(host, iterAdapter->second));
- //设置该节点的静态权重值
- iterAdapter->second->setWeight(iter->weight());
- }
- //更新inactive
- for(auto iter = inactive.begin(); iter != inactive.end(); ++iter)
- {
- if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
- {
- continue;
- }
- auto iterAdapter = _allProxys.find(iter->cmpDesc());
- if(iterAdapter == _allProxys.end())
- {
- AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
- result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
- assert(result.second);
- iterAdapter = result.first;
- _vAllProxys.push_back(ap);
- }
- //该节点在主控的状态为inactive
- iterAdapter->second->setActiveInReg(false);
- _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
- //设置该节点的静态权重值
- iterAdapter->second->setWeight(iter->weight());
- }
- //_vRegProxys 需要按顺序来 重排
- _vRegProxys.clear();
- auto iterAdapter = _regProxys.begin();
- for(;iterAdapter != _regProxys.end();++iterAdapter)
- {
- _vRegProxys.push_back(iterAdapter->second);
- }
- _update = true;
- }
- void EndpointManager::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bNotify)
- {
- updateEndpoints(active, inactive);
- //丢给外层统一做
- _objectProxy->onNotifyEndpoints(active, inactive);
- }
- void EndpointManager::doNotify()
- {
- _objectProxy->doInvoke();
- }
- bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
- {
- pAdapterProxy = NULL;
- //刷新主控
- refreshReg(E_DEFAULT, "");
- //无效的数据 返回true
- if (!_valid)
- {
- return true;
- }
- //如果有hash,则先使用hash策略
- if (msg->data._hash)
- {
- pAdapterProxy = getHashProxy(msg->data._hashCode, msg->data._conHash);
- return false;
- }
-
- if(_weightType == E_STATIC_WEIGHT)
- {
- //权重模式
- bool bStaticWeighted = false;
- if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
- bStaticWeighted = true;
- pAdapterProxy = getWeightedProxy(bStaticWeighted);
- }
- else
- {
- //普通轮询模式
- pAdapterProxy = getNextValidProxy();
- }
- return false;
- }
- AdapterProxy * EndpointManager::getNextValidProxy()
- {
- if (_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getNextValidProxy activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
- return NULL;
- }
- vector<AdapterProxy*> conn;
- for(size_t i=0;i<_activeProxys.size();i++)
- {
- ++_lastRoundPosition;
- if(_lastRoundPosition >= _activeProxys.size())
- {
- _lastRoundPosition = 0;
- }
- if(_activeProxys[_lastRoundPosition]->checkActive(false))
- {
- return _activeProxys[_lastRoundPosition];
- }
- if(!_activeProxys[_lastRoundPosition]->isConnTimeout() && !_activeProxys[_lastRoundPosition]->isConnExc()) {
- conn.push_back(_activeProxys[_lastRoundPosition]);
- }
- }
- if(conn.size() > 0)
- {
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- AdapterProxy* EndpointManager::getHashProxy(int64_t hashCode, bool bConsistentHash)
- {
- if(_weightType == E_STATIC_WEIGHT)
- {
- if(bConsistentHash)
- {
- return getConHashProxyForWeight(hashCode, true);
- }
- else
- {
- return getHashProxyForWeight(hashCode, true, _hashStaticRouterCache);
- }
- }
- else
- {
- if(bConsistentHash)
- {
- return getConHashProxyForNormal(hashCode);
- }
- else
- {
- return getHashProxyForNormal(hashCode);
- }
- }
- }
- AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- if(checkHashStaticWeightChange(bStatic))
- {
- int64_t iBegin = TNOWMS;
- updateHashProxyWeighted(bStatic);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[EndpointManager::getHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- if(vRouterCache.size() > 0)
- {
- size_t hash = ((int64_t)hashCode) % vRouterCache.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= vRouterCache.size())
- {
- hash = hash % vRouterCache.size();
- }
- size_t iIndex = vRouterCache[hash];
- if(iIndex >= _vRegProxys.size())
- {
- iIndex = iIndex % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
- {
- return _vRegProxys[iIndex];
- }
- else
- {
- TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl);
- if(_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive(true))
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }
- return getHashProxyForNormal(hashCode);
- }
- AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool bStatic)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- if(checkConHashChange(bStatic, _lastConHashWeightProxys))
- {
- int64_t iBegin = TNOWMS;
- updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- while(_consistentHashWeight.size() > 0)
- {
- string sNode;
- // 通过一致性hash取到对应的节点
- _consistentHashWeight.getNodeName(hashCode, sNode);
- auto it = _indexActiveProxys.find(sNode);
- // 节点不存在,可能是下线或者服务不可用
- if (it == _indexActiveProxys.end())
- {
- updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
- continue;
- }
- //被hash到的节点在主控是active的才走在流程
- if (it->second->isActiveInReg() && it->second->checkActive(true))
- {
- return it->second;
- }
- else
- {
- TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
- // 剔除节点再次hash
- if (!it->second->isActiveInReg())
- {
- // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
- _indexActiveProxys.erase(sNode);
- }
- // checkConHashChange里重新加回到_sortActivProxys重试
- _sortActivProxys.erase(sNode);
- updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
- if (_indexActiveProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- }
- }
- /*if(_consistentHashWeight.size() > 0)
- {
- unsigned int iIndex = 0;
- // 通过一致性hash取到对应的节点
- _consistentHashWeight.getIndex(hashCode, iIndex);
- if(iIndex >= _vRegProxys.size())
- {
- iIndex = iIndex % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
- {
- return _vRegProxys[iIndex];
- }
- else
- {
- TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
- if(_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- size_t hash = 0;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive(false))
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() && !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }*/
- return getHashProxyForNormal(hashCode);
- }
- bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
- {
- if(bStatic)
- {
- if(_lastHashStaticProxys.size() != _vRegProxys.size())
- {
- return true;
- }
- for(size_t i = 0; i < _vRegProxys.size(); i++)
- {
- //解决服务权重更新时哈希表不更新的问题
- if((_lastHashStaticProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc()) || _vRegProxys[i]->checkWeightChanged(true))
- {
- return true;
- }
- }
- }
- return false;
- }
- bool EndpointManager::checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys)
- {
- // 将之前故障临时剔除的节点重新加回来重试
- if (_indexActiveProxys.size() != _sortActivProxys.size())
- {
- for (auto &it : _indexActiveProxys)
- {
- _sortActivProxys[it.first] = it.second;
- }
- }
- if(mLastConHashProxys.size() != _sortActivProxys.size())
- {
- return true;
- }
- auto itLast = mLastConHashProxys.begin();
- auto itSort = _sortActivProxys.begin();
- for (; itLast!=mLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
- {
- if (itLast->first != itSort->first)
- {
- return true;
- }
- //解决服务权重更新时一致性哈希环不更新的问题
- if(bStatic && itSort->second->checkWeightChanged(true))
- {
- return true;
- }
- }
- /*
- if(vLastConHashProxys.size() != _vRegProxys.size())
- {
- return true;
- }
- for(size_t i = 0; i < _vRegProxys.size(); i++)
- {
- if(vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
- {
- return true;
- }
- //解决服务权重更新时一致性哈希环不更新的问题
- if(bStatic && _vRegProxys[i]->checkWeightChanged(true))
- {
- return true;
- }
- }
- */
- return false;
- }
- void EndpointManager::updateHashProxyWeighted(bool bStatic)
- {
- if(_vRegProxys.size() <= 0)
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- if(bStatic)
- {
- _lastHashStaticProxys = _vRegProxys;
- _hashStaticRouterCache.clear();
- }
- vector<AdapterProxy*> vRegProxys;
- vector<size_t> vIndex;
- for(size_t i = 0; i < _vRegProxys.size(); ++i)
- {
- if(_vRegProxys[i]->getWeight() > 0)
- {
- vRegProxys.push_back(_vRegProxys[i]);
- vIndex.push_back(i);
- }
- //防止多个服务节点权重同时更新时哈希表多次更新
- _vRegProxys[i]->resetWeightChanged();
- }
- if(vRegProxys.size() <= 0)
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- size_t iHashStaticWeightSize = vRegProxys.size();
- map<size_t, int> mIdToWeight;
- multimap<int, size_t> mWeightToId;
- size_t iMaxR = 0;
- size_t iMaxRouterR = 0;
- size_t iMaxWeight = vRegProxys[0]->getWeight();
- size_t iMinWeight = vRegProxys[0]->getWeight();
- size_t iTempWeight = 0;
- for(size_t i = 1;i < iHashStaticWeightSize; i++)
- {
- iTempWeight = vRegProxys[i]->getWeight();
-
- if(iTempWeight > iMaxWeight)
- {
- iMaxWeight = iTempWeight;
- }
- if(iTempWeight < iMinWeight)
- {
- iMinWeight = iTempWeight;
- }
- }
- if(iMinWeight > 0)
- {
- iMaxR = iMaxWeight / iMinWeight;
- if(iMaxR < iMinWeightLimit)
- iMaxR = iMinWeightLimit;
- if(iMaxR > iMaxWeightLimit)
- iMaxR = iMaxWeightLimit;
- }
- else
- {
- iMaxR = 1;
- iMaxWeight = 1;
- }
- for(size_t i = 0;i < iHashStaticWeightSize; i++)
- {
- int iWeight = (vRegProxys[i]->getWeight() * iMaxR) / iMaxWeight;
- if(iWeight > 0)
- {
- iMaxRouterR += iWeight;
- mIdToWeight.insert(map<size_t, int>::value_type(vIndex[i], iWeight));
- mWeightToId.insert(make_pair(iWeight, vIndex[i]));
- }
- else
- {
- if(bStatic)
- {
- _hashStaticRouterCache.push_back(vIndex[i]);
- }
- }
- TLOGTARS("EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
- }
- for(size_t i = 0; i < iMaxRouterR; i++)
- {
- bool bFirst = true;
- multimap<int, size_t> mulTemp;
- multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
- while(mIter != mWeightToId.rend())
- {
- if(bFirst)
- {
- bFirst = false;
- if(bStatic)
- {
- _hashStaticRouterCache.push_back(mIter->second);
- }
- mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
- }
- else
- {
- mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
- }
- mIter++;
- }
- mWeightToId.clear();
- mWeightToId.swap(mulTemp);
- }
- }
- void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash)
- {
- conHash.clear();
- if(_sortActivProxys.empty())
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted _indexActiveProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- mLastConHashProxys = _sortActivProxys;
- for (auto it = _sortActivProxys.begin(); it != _sortActivProxys.end(); ++it)
- {
- int iWeight = (bStatic ? (it->second->getWeight()) : 100);
- if(iWeight > 0)
- {
- iWeight = iWeight / 4;
- if(iWeight <= 0)
- {
- iWeight = 1;
- }
- // 同一服务有多个obj的情况
- // 同一hash值调用不同的obj会hash到不同的服务器
- // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
- // 一致性hash用host进行索引,不使用index,这里传0
- conHash.addNode(it->second->endpoint().host(), 0, iWeight);
- }
- //防止多个服务节点权重同时更新时一致性哈希环多次更新
- it->second->resetWeightChanged();
- }
- /*
- if(_vRegProxys.size() <= 0)
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- vLastConHashProxys = _vRegProxys;
- conHash.clear();
- for(size_t i = 0; i < _vRegProxys.size(); ++i)
- {
- int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
- if(iWeight > 0)
- {
- iWeight = iWeight / 4;
- if(iWeight <= 0)
- {
- iWeight = 1;
- }
- // 同一服务有多个obj的情况
- // 同一hash值调用不同的obj会hash到不同的服务器
- // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
- conHash.addNode(_vRegProxys[i]->endpoint().host(), i, iWeight);
- }
- //防止多个服务节点权重同时更新时一致性哈希环多次更新
- _vRegProxys[i]->resetWeightChanged();
- }
- */
- conHash.sortNode();
- }
- AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForNormal _vRegProxys is empty]" << endl);
- return NULL;
- }
- // 1 _vRegProxys从客户端启动之后,就不会再改变,除非有节点增加
- // 2 如果有增加节点,则_vRegProxys顺序会重新排序,之前的hash会改变
- // 3 节点下线后,需要下次启动客户端后,_vRegProxys内容才会生效
- size_t hash = ((int64_t)hashCode) % _vRegProxys.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= _vRegProxys.size())
- {
- hash = hash % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[hash]->isActiveInReg() && _vRegProxys[hash]->checkActive(true))
- {
- return _vRegProxys[hash];
- }
- else
- {
- TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl);
- if(_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive(true))
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }
- AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForNormal _vRegProxys is empty]" << endl);
- return NULL;
- }
- if(checkConHashChange(false, _lastConHashProxys))
- {
- int64_t iBegin = TNOWMS;
- updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- while(_consistentHash.size() > 0)
- {
- string sNode;
- // 通过一致性hash取到对应的节点
- _consistentHash.getNodeName(hashCode, sNode);
- auto it = _indexActiveProxys.find(sNode);
- // 节点不存在,可能是下线或者服务不可用
- if (it == _indexActiveProxys.end())
- {
- updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
- continue;
- }
- //被hash到的节点在主控是active的才走在流程
- if (it->second->isActiveInReg() && it->second->checkActive(true))
- {
- return it->second;
- }
- else
- {
- TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
- // 剔除节点再次hash
- if (!it->second->isActiveInReg())
- {
- // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
- _indexActiveProxys.erase(sNode);
- }
- // checkConHashChange里重新加回到_sortActivProxys重试
- _sortActivProxys.erase(sNode);
- updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
- if (_indexActiveProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- }
- }
- /*
- if(_consistentHash.size() > 0)
- {
- unsigned int iIndex = 0;
- // 通过一致性hash取到对应的节点
- _consistentHash.getIndex(hashCode, iIndex);
- if(iIndex >= _vRegProxys.size())
- {
- iIndex = iIndex % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
- {
- return _vRegProxys[iIndex];
- }
- else
- {
- TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "," << _vRegProxys[iIndex]->getTransceiver()->getEndpointInfo().desc() << endl);
- if(_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- size_t hash = 0;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive(true))
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }
- */
- return getHashProxyForNormal(hashCode);
- }
- AdapterProxy* EndpointManager::getWeightedProxy(bool bStaticWeighted)
- {
- return getWeightedForNormal(bStaticWeighted);
- }
- AdapterProxy* EndpointManager::getWeightedForNormal(bool bStaticWeighted)
- {
- if (_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getWeightedForNormal activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
- return NULL;
- }
- int64_t iNow = TNOW;
- if(_lastBuildWeightTime <= iNow)
- {
- updateProxyWeighted();
- if(!_first)
- {
- _lastBuildWeightTime = iNow + _updateWeightInterval;
- }
- else
- {
- _first = false;
- _lastBuildWeightTime = iNow + _updateWeightInterval + 5;
- }
- }
- bool bEmpty = false;
- int iActiveSize = _activeWeightProxy.size();
-
- if(iActiveSize > 0)
- {
- size_t iProxyIndex = 0;
- set<AdapterProxy*> sConn;
- if(_staticRouterCache.size() > 0)
- {
- for(size_t i = 0;i < _staticRouterCache.size(); i++)
- {
- ++_lastSWeightPosition;
- if(_lastSWeightPosition >= _staticRouterCache.size())
- _lastSWeightPosition = 0;
- iProxyIndex = _staticRouterCache[_lastSWeightPosition];
- if(_activeWeightProxy[iProxyIndex]->checkActive(false))
- {
- return _activeWeightProxy[iProxyIndex];
- }
- if(!_activeWeightProxy[iProxyIndex]->isConnTimeout() &&
- !_activeWeightProxy[iProxyIndex]->isConnExc())
- {
- sConn.insert(_activeWeightProxy[iProxyIndex]);
- }
- }
- }
- else
- {
- bEmpty = true;
- }
- if(!bEmpty)
- {
- if(sConn.size() > 0)
- {
- vector<AdapterProxy*> conn;
- set<AdapterProxy*>::iterator it_conn = sConn.begin();
- while(it_conn != sConn.end())
- {
- conn.push_back(*it_conn);
- ++it_conn;
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeWeightProxy[((uint32_t)rand() % iActiveSize)];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }
- vector<AdapterProxy*> conn;
- for(size_t i=0;i<_activeProxys.size();i++)
- {
- ++_lastRoundPosition;
- if(_lastRoundPosition >= _activeProxys.size())
- _lastRoundPosition = 0;
- if(_activeProxys[_lastRoundPosition]->checkActive(false))
- {
- return _activeProxys[_lastRoundPosition];
- }
- if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
- !_activeProxys[_lastRoundPosition]->isConnExc())
- conn.push_back(_activeProxys[_lastRoundPosition]);
- }
- if(conn.size() > 0)
- {
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- void EndpointManager::updateProxyWeighted()
- {
- size_t iWeightProxySize = _activeProxys.size();
- if(iWeightProxySize <= 0)
- {
- TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", activeProxys.size() <= 0]" << endl);
- return ;
- }
- vector<AdapterProxy*> vProxy;
- for(size_t i = 0; i < _activeProxys.size(); ++i)
- {
- if(_activeProxys[i]->getWeight() > 0)
- {
- vProxy.push_back(_activeProxys[i]);
- }
- }
- iWeightProxySize = vProxy.size();
- if(iWeightProxySize <= 0)
- {
- TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", vProxy.size() <= 0]" << endl);
- return ;
- }
- if(_update)
- {
- _activeWeightProxy = vProxy;
- updateStaticWeighted();
- }
- _update = false;
- }
- void EndpointManager::updateStaticWeighted()
- {
- size_t iWeightProxySize = _activeWeightProxy.size();
- vector<int> vWeight;
- vWeight.resize(iWeightProxySize);
- for(size_t i = 0; i < iWeightProxySize; i++)
- {
- vWeight[i] = _activeWeightProxy[i]->getWeight();
- }
- dispatchEndpointCache(vWeight);
- }
- void EndpointManager::dispatchEndpointCache(const vector<int> &vWeight)
- {
- if(vWeight.size() <= 0)
- {
- TLOGERROR("[EndpointManager::dispatchEndpointCache vWeight.size() < 0]" << endl);
- return ;
- }
- size_t iWeightProxySize = vWeight.size();
- map<size_t, int> mIdToWeight;
- multimap<int, size_t> mWeightToId;
- size_t iMaxR = 0;
- size_t iMaxRouterR = 0;
- size_t iMaxWeight = 0;
- size_t iMinWeight = 0;
- size_t iTotalCapacty = 0;
- size_t iTempWeight = 0;
- for(size_t i = 0; i < vWeight.size(); ++i)
- {
- iTotalCapacty += vWeight[i];
- }
- _staticRouterCache.clear();
- _lastSWeightPosition = 0;
- _staticRouterCache.reserve(iTotalCapacty+100);
- iMaxWeight = vWeight[0];
- iMinWeight = vWeight[0];
- for(size_t i = 1;i < iWeightProxySize; i++)
- {
- iTempWeight = vWeight[i];
-
- if(iTempWeight > iMaxWeight)
- {
- iMaxWeight = iTempWeight;
- }
- if(iTempWeight < iMinWeight)
- {
- iMinWeight = iTempWeight;
- }
- }
- if(iMinWeight > 0)
- {
- iMaxR = iMaxWeight / iMinWeight;
- if(iMaxR < iMinWeightLimit)
- iMaxR = iMinWeightLimit;
- if(iMaxR > iMaxWeightLimit)
- iMaxR = iMaxWeightLimit;
- }
- else
- {
- iMaxR = 1;
- iMaxWeight = 1;
- }
- for(size_t i = 0;i < iWeightProxySize; i++)
- {
- int iWeight = (vWeight[i] * iMaxR) / iMaxWeight;
- if(iWeight > 0)
- {
- iMaxRouterR += iWeight;
- mIdToWeight.insert(map<size_t, int>::value_type(i, iWeight));
- mWeightToId.insert(make_pair(iWeight, i));
- }
- else
- {
- _staticRouterCache.push_back(i);
- }
-
- TLOGTARS("EndpointManager::dispatchEndpointCache _objName:" << _objName << "|endpoint:" << _activeWeightProxy[i]->endpoint().desc() << "|iWeightR:" << iWeight << endl);
- }
- for(size_t i = 0; i < iMaxRouterR; i++)
- {
- bool bFirst = true;
- multimap<int, size_t> mulTemp;
- multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
- while(mIter != mWeightToId.rend())
- {
- if(bFirst)
- {
- bFirst = false;
- _staticRouterCache.push_back(mIter->second);
- mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
- }
- else
- {
- mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
- }
- mIter++;
- }
- mWeightToId.clear();
- mWeightToId.swap(mulTemp);
- }
- }
- /////////////////////////////////////////////////////////////////////////////
- EndpointThread::EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sName, bool bFirstNetThread)
- : QueryEpBase(pComm,bFirstNetThread,true)
- , _type(type)
- , _name(sName)
- {
- init(sObjName, "", true);
- }
- void EndpointThread::getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- //直连调用这个接口无效
- if(_direct)
- {
- return ;
- }
- {
- TC_LockT<TC_ThreadMutex> lock(_mutex);
- refreshReg(_type,_name);
-
- activeEndPoint = _activeEndPoint;
- inactiveEndPoint = _inactiveEndPoint;
- }
- }
- void EndpointThread::getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- //直连调用这个接口无效
- if(_direct)
- {
- return ;
- }
- {
-
- TC_LockT<TC_ThreadMutex> lock(_mutex);
-
- refreshReg(_type,_name);
-
- activeEndPoint = _activeTCEndPoint;
- inactiveEndPoint = _inactiveTCEndPoint;
- }
- }
- void EndpointThread::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync)
- {
- if(!bSync)
- {
- TC_LockT<TC_ThreadMutex> lock(_mutex);
- update(active, inactive);
- }
- else
- {
- update(active, inactive);
- }
- }
- void EndpointThread::update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
- {
- _activeEndPoint.clear();
- _inactiveEndPoint.clear();
- _activeTCEndPoint.clear();
- _inactiveTCEndPoint.clear();
- set<EndpointInfo>::iterator iter= active.begin();
- for(;iter != active.end(); ++iter)
- {
- // TC_Endpoint ep = (iter->host(), iter->port(), 3000, iter->type(), iter->grid());
- _activeTCEndPoint.push_back(iter->getEndpoint());
- _activeEndPoint.push_back(*iter);
- }
- iter = inactive.begin();
- for(;iter != inactive.end(); ++iter)
- {
- // TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
- _inactiveTCEndPoint.push_back(iter->getEndpoint());
- _inactiveEndPoint.push_back(*iter);
- }
- }
- /////////////////////////////////////////////////////////////////////////////
- EndpointManagerThread::EndpointManagerThread(Communicator * pComm,const string & sObjName)
- :_communicator(pComm)
- ,_objName(sObjName)
- {
- }
- EndpointManagerThread::~EndpointManagerThread()
- {
- map<string,EndpointThread*>::iterator iter;
- for(iter=_info.begin();iter != _info.end();iter++)
- {
- if(iter->second)
- {
- delete iter->second;
- iter->second = NULL;
- }
- }
- }
- void EndpointManagerThread::getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_ALL,"");
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_SET,sName);
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_STATION,sName);
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpoint(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_ALL,"");
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_SET,sName);
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_STATION,sName);
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- EndpointThread * EndpointManagerThread::getEndpointThread(GetEndpointType type,const string & sName)
- {
- TC_LockT<TC_SpinLock> lock(_mutex);
- string sAllName = TC_Common::tostr((int)type) + ":" + sName;
- map<string,EndpointThread*>::iterator iter;
- iter = _info.find(sAllName);
- if(iter != _info.end())
- {
- return iter->second;
- }
- EndpointThread * pThread = new EndpointThread(_communicator, _objName, type, sName);
- _info[sAllName] = pThread;
- return pThread;
- }
- }
|