123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #include "servant/EndpointManager.h"
- #include "servant/ObjectProxy.h"
- #include "servant/TarsLogger.h"
- #include "servant/AppCache.h"
- #include "servant/Application.h"
- #include "servant/StatReport.h"
- namespace tars
- {
- /////////////////////////////////////////////////////////////////////////////
- QueryEpBase::QueryEpBase(Communicator * pComm, bool bFirstNetThread,bool bInterfaceReq)
- : _communicator(pComm)
- , _firstNetThread(bFirstNetThread)
- , _interfaceReq(bInterfaceReq)
- , _direct(false)
- , _objName("")
- , _invokeSetId("")
- , _locator("")
- , _valid(false)
- , _weightType(E_LOOP)
- , _requestRegistry(false)
- , _requestTimeout(0)
- , _timeoutInterval(5*1000)
- , _refreshTime(0)
- , _refreshInterval(60*1000)
- , _activeEmptyInterval(10*1000)
- , _failInterval(2*1000)
- , _manyFailInterval(30*1000)
- , _failTimesLimit(3)
- , _failTimes(0)
- {
- setNoDelete(true);
- }
- void QueryEpBase::callback_findObjectById4All(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp)
- {
- TLOGTARS("[TARS][callback_findObjectById4All _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectById4All_exception(tars::Int32 ret)
- {
- TLOGERROR("[TARS][callback_findObjectById4All_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectById4Any(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp)
- {
- TLOGTARS("[TARS][callback_findObjectById4Any _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectById4Any_exception(tars::Int32 ret)
- {
- TLOGERROR("[TARS][callback_findObjectById4Any_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameGroup(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp)
- {
- TLOGTARS("[TARS][callback_findObjectByIdInSameGroup _objName:" << _objName << "|ret:"<<ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameGroup_exception(tars::Int32 ret)
- {
- TLOGERROR("[TARS][callback_findObjectByIdInSameGroup_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameSet( Int32 ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> & inactiveEp)
- {
- TLOGTARS("[TARS][callback_findObjectByIdInSameSet _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameSet_exception( Int32 ret)
- {
- TLOGERROR("[TARS][callback_findObjectByIdInSameSet_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameStation( Int32 ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> &inactiveEp)
- {
- TLOGTARS("[TARS][callback_findObjectByIdInSameStation _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameStation_exception( Int32 ret)
- {
- TLOGERROR("[TARS][callback_findObjectByIdInSameStation_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- int QueryEpBase::setLocatorPrx(QueryFPrx prx)
- {
- _queryFPrx = prx;
- return 0;
- }
- bool QueryEpBase::init(const string & sObjName,const string & sLocator,const string& setName)
- {
- TLOGTARS("[TARS][QueryEpBase::init sObjName:" << sObjName << ",sLocator:" << sLocator << ",setName:" << setName << "]" << endl);
- _locator = sLocator;
- _invokeSetId = setName;
- setObjName(sObjName);
- return true;
- }
- void QueryEpBase::setObjName(const string & sObjName)
- {
- string sEndpoints("");
- string sInactiveEndpoints("");
- string::size_type pos = sObjName.find_first_of('@');
- if (pos != string::npos)
- {
- //[直接连接]指定服务的IP和端口列表
- _objName = sObjName.substr(0,pos);
- sEndpoints = sObjName.substr(pos + 1);
- _direct = true;
- _valid = true;
- }
- else
- {
- //[间接连接]通过registry查询服务端的IP和端口列表
- _direct = false;
- _valid = false;
- _objName = sObjName;
- if(_locator.find_first_not_of('@') == string::npos)
- {
- TLOGERROR("[TARS][QueryEpBase::setObjName locator is not valid,_locator:" << _locator << "]" << endl);
- throw TarsRegistryException("locator is not valid,_locator:" + _locator);
- }
- _queryFPrx = _communicator->stringToProxy<QueryFPrx>(_locator);
- string sLocatorKey = _locator;
- //如果启用set,则获取按set分组的缓存
- if(ClientConfig::SetOpen)
- {
- sLocatorKey += "_" + ClientConfig::SetDivision;
- }
- string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
- //[间接连接]第一次使用cache,如果是接口级请求则不从缓存读取
- if(!_interfaceReq)
- {
- sEndpoints = AppCache::getInstance()->get(objName,sLocatorKey);
- sInactiveEndpoints = AppCache::getInstance()->get("inactive_" + objName, sLocatorKey);
- }
- }
- setEndpoints(sEndpoints, _activeEndpoints);
- setEndpoints(sInactiveEndpoints, _inactiveEndpoints);
- if(_activeEndpoints.size() > 0)
- {
- _valid = true;
- }
- if(_activeEndpoints.size() > 0 || _inactiveEndpoints.size() > 0)
- {
- notifyEndpoints(_activeEndpoints, _inactiveEndpoints, true);
- }
- }
- vector<string> QueryEpBase::sepEndpoint(const string& sEndpoints)
- {
- vector<string> vEndpoints;
- bool flag = false;
- string::size_type startPos = 0;
- string::size_type sepPos = 0;
- for(string::size_type pos = 0; pos < sEndpoints.size(); pos++)
- {
- if(sEndpoints[pos] == ':' && !flag )
- {
- sepPos = pos;
- flag = true;
- }
- else if(flag)
- {
- if(sEndpoints[pos] == ' ')
- {
- continue;
- }
- if(TC_Port::strncasecmp("tcp", (sEndpoints.c_str() + pos), 3) == 0 || TC_Port::strncasecmp("udp", (sEndpoints.c_str() + pos), 3) == 0)
- {
- string ep = TC_Common::trim(string(sEndpoints.c_str() + startPos, sepPos - startPos));
- if(!ep.empty()) {
- vEndpoints.push_back(ep);
- }
- startPos = pos;
- }
- flag = false;
- }
- }
- string ep = sEndpoints.substr(startPos);
- if(!ep.empty()) {
- vEndpoints.push_back(ep);
- }
- // vEndpoints.push_back(sEndpoints.substr(startPos));
- return vEndpoints;
- }
- void QueryEpBase::setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints)
- {
- if(sEndpoints == "")
- {
- return ;
- }
- bool bSameWeightType = true;
- bool bFirstWeightType = true;
- unsigned int iWeightType = 0;
- // vector<string> vEndpoints = TC_Common::sepstr<string>(sEndpoints, ":", false, isRealEndpoint);
- vector<string> vEndpoints = sepEndpoint(sEndpoints);
- for (size_t i = 0; i < vEndpoints.size(); ++i)
- {
- try
- {
- TC_Endpoint ep(vEndpoints[i]);
- EndpointInfo::EType type;
- if (ep.isSSL())
- type = EndpointInfo::SSL;
- else if (ep.isTcp())
- type = EndpointInfo::TCP;
- else
- type = EndpointInfo::UDP;
- string sSetDivision;
- //解析set分组信息
- {
- string sep = " -s ";
- size_t pos = vEndpoints[i].rfind(sep);
- if (pos != string::npos)
- {
- sSetDivision = TC_Common::trim(vEndpoints[i].substr(pos+sep.size()));
- size_t endPos = sSetDivision.find(" ");
- if (endPos != string::npos)
- {
- sSetDivision = sSetDivision.substr(0, endPos);
- }
- }
- }
- if(bFirstWeightType)
- {
- bFirstWeightType = false;
- iWeightType = ep.getWeightType();
- }
- else
- {
- if(ep.getWeightType() != iWeightType)
- {
- bSameWeightType = false;
- }
- }
- EndpointInfo epi(ep.getHost(), ep.getPort(), type, ep.getGrid(), sSetDivision, ep.getQos(), ep.getWeight(), ep.getWeightType(), ep.getAuthType());
- setEndpoints.insert(epi);
- }
- catch (...)
- {
- TLOGERROR("[TARS][QueryEpBase::setEndpoints parse error,objname:" << _objName << ",endpoint:" << vEndpoints[i] << "]" << endl);
- }
- }
- if(bSameWeightType)
- {
- if(iWeightType == 1)
- {
- _weightType = E_STATIC_WEIGHT;
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- void QueryEpBase::refreshReg(GetEndpointType type, const string & sName)
- {
- if(_direct)
- {
- return;
- }
- int64_t iNow = TNOWMS;
- //正在请求状态 而且请求超时了,或者第一次
- if(_requestRegistry && _requestTimeout < iNow)
- {
- doEndpointsExp(0);
- }
- //如果是间接连接,通过registry定时查询服务列表
- //正在请求状态 而且请求超时了
- //非请求状态 到了下一个刷新时间了
- if( (!_requestRegistry) && (_refreshTime <= iNow))
- {
- _requestRegistry = true;
- //一定时间不回调就算超时了
- _requestTimeout = iNow + _timeoutInterval;
- TLOGTARS("[TARS][QueryEpBase::refresh,"<<_objName<<"]"<<endl);
- //判断是同步调用还是异步调用
- //内部请求主控都是异步请求
- //接口请求主控第一次是同步请求
- bool bSync = (!_valid && _interfaceReq);
- try
- {
- if(bSync)
- {
- vector<tars::EndpointF> activeEp;
- vector<tars::EndpointF> inactiveEp;
- int iRet = 0;
- switch(type)
- {
- case E_ALL:
- {
- iRet = _queryFPrx->findObjectById4Any(_objName,activeEp,inactiveEp);
- break;
- }
- case E_STATION:
- {
- iRet = _queryFPrx->findObjectByIdInSameStation(_objName,sName,activeEp,inactiveEp);
- }
- case E_SET:
- {
- iRet = _queryFPrx->findObjectByIdInSameSet(_objName,sName,activeEp,inactiveEp);
- break;
- }
- case E_DEFAULT:
- default:
- {
- if(ClientConfig::SetOpen || !_invokeSetId.empty())
- {
- //指定set调用时,指定set的优先级最高
- string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
- iRet = _queryFPrx->findObjectByIdInSameSet(_objName,setId,activeEp,inactiveEp);
- }
- else
- {
- iRet = _queryFPrx->findObjectByIdInSameGroup(_objName,activeEp,inactiveEp);
- }
- break;
- }
- }
- doEndpoints(activeEp, inactiveEp, iRet, true);
- }
- else
- {
- switch(type)
- {
- case E_ALL:
- {
- _queryFPrx->async_findObjectById4Any(this,_objName);
- break;
- }
- case E_STATION:
- {
- _queryFPrx->async_findObjectByIdInSameStation(this,_objName,sName);
- break;
- }
- case E_SET:
- {
- _queryFPrx->async_findObjectByIdInSameSet(this,_objName,sName);
- break;
- }
- case E_DEFAULT:
- default:
- {
- if(ClientConfig::SetOpen || !_invokeSetId.empty())
- {
- //指定set调用时,指定set的优先级最高
- string setId = _invokeSetId.empty()?ClientConfig::SetDivision:_invokeSetId;
- _queryFPrx->async_findObjectByIdInSameSet(this,_objName,setId);
- }
- else
- {
- _queryFPrx->async_findObjectByIdInSameGroup(this,_objName);
- }
- break;
- }
- }//end switch
- }
- }
- catch(TC_Exception & ex)
- {
- TLOGERROR("[TARS]QueryEpBase::refreshReg obj:"<<_objName<<"exception:"<<ex.what()<<endl);
- doEndpointsExp(TARSSERVERUNKNOWNERR);
- }
- catch(...)
- {
- TLOGERROR("[TARS]QueryEpBase::refreshReg obj:"<<_objName<<"unknown exception:"<<endl);
- doEndpointsExp(TARSSERVERUNKNOWNERR);
- }
- }
- }
- void QueryEpBase::doEndpoints(const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp, int iRet, bool bSync)
- {
- if(iRet != 0)
- {
- doEndpointsExp(iRet);
- return ;
- }
- _failTimes = 0;
- _requestRegistry = false;
- int64_t iNow = TNOWMS;
- //有返回成功的结点,按照正常的频率
- //如果返回空列表或者返回失败 2s去刷新一次
- //接口请求主控的方式 不管是不是空都要去刷新
- if(activeEp.empty() && (!_interfaceReq) )
- {
- _refreshTime = iNow + _activeEmptyInterval;
- //如果registry返回Active服务列表为空,不做更新
- TLOGERROR("[TARS][QueryEpBase::doEndpoints, callback activeEps is empty,objname:"<< _objName << "]" << endl);
- return;
- }
- else
- {
- _refreshTime = iNow + _refreshInterval;
- }
- bool bNeedNotify = false;
- bool bSameWeightType = true;
- bool bFirstWeightType = true;
- int iWeightType = 0;
- set<string> sActiveEndpoints;
- set<string> sInactiveEndpoints;
- set<EndpointInfo> activeEps;
- set<EndpointInfo> inactiveEps;
- //生成active set 用于比较
- for (uint32_t i = 0; i < activeEp.size(); ++i)
- {
- if(bFirstWeightType)
- {
- bFirstWeightType = false;
- iWeightType = activeEp[i].weightType;
- }
- else
- {
- if(activeEp[i].weightType != iWeightType)
- {
- bSameWeightType = false;
- }
- }
- // tars istcp意思和这里枚举值对应
- EndpointInfo::EType type = EndpointInfo::EType(activeEp[i].istcp);
- EndpointInfo ep(activeEp[i].host, activeEp[i].port, type, activeEp[i].grid, activeEp[i].setId, activeEp[i].qos, activeEp[i].weight, activeEp[i].weightType, activeEp[i].authType);
- activeEps.insert(ep);
- }
- //生成inactive set 用于比较
- for (uint32_t i = 0; i < inactiveEp.size(); ++i)
- {
- // tars istcp意思和这里枚举值对应
- EndpointInfo::EType type = EndpointInfo::EType(inactiveEp[i].istcp);
- EndpointInfo ep(inactiveEp[i].host, inactiveEp[i].port, type, inactiveEp[i].grid, inactiveEp[i].setId, inactiveEp[i].qos, inactiveEp[i].weight, inactiveEp[i].weightType, inactiveEp[i].authType);
- inactiveEps.insert(ep);
- }
- if(bSameWeightType)
- {
- if(iWeightType == 1)
- {
- _weightType = E_STATIC_WEIGHT;
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- else
- {
- _weightType = E_LOOP;
- }
- if(activeEps != _activeEndpoints)
- {
- bNeedNotify = true;
- _activeEndpoints = activeEps;
- if(_firstNetThread)
- {
- setEndPointToCache(false);
- }
- }
- if(inactiveEps != _inactiveEndpoints)
- {
- bNeedNotify = true;
- _inactiveEndpoints = inactiveEps;
- if(_firstNetThread)
- {
- setEndPointToCache(true);
- }
- }
- if(bNeedNotify)
- {
- notifyEndpoints(_activeEndpoints, _inactiveEndpoints, bSync);
- }
- if(!_valid)
- {
- _valid = true;
- doNotify();
- }
- }
- void QueryEpBase::doEndpointsExp(int iRet)
- {
- _failTimes++;
- _requestRegistry = false;
- int64_t iNow = TNOWMS;
- //频率控制获取主控失败 2秒钟再更新
- _refreshTime = iNow + _failInterval;
- //获取主控连续失败3次就等30s再更新一次
- //连续失败 强制设成数据是有效的
- if(_failTimes > _failTimesLimit)
- {
- if(!_valid)
- {
- _valid = true;
- doNotify();
- }
- _refreshTime = iNow + _manyFailInterval;
- }
- }
- void QueryEpBase::setEndPointToCache(bool bInactive)
- {
- //如果是接口级请求则不缓存到文件
- if(_interfaceReq)
- {
- return;
- }
- string sEndpoints;
- set<EndpointInfo> doEndpoints;
- if(!bInactive)
- {
- doEndpoints = _activeEndpoints;
- }
- else
- {
- doEndpoints = _inactiveEndpoints;
- }
- set<EndpointInfo>::iterator iter;
- iter = doEndpoints.begin();
- for (; iter != doEndpoints.end(); ++iter)
- {
- //这里的超时时间 只是对服务端有效。这里的值无效。所以默认用3000了
- TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid(), iter->qos(), iter->weight(), iter->getWeightType());
- ep.setAuthType(iter->authType());
- if (!sEndpoints.empty())
- {
- sEndpoints += ":";
- }
- sEndpoints += ep.toString();
- if (!iter->setDivision().empty())
- {
- sEndpoints += " -s " + iter->setDivision();
- }
- }
- //如果启用set,则按set分组保存
- string sLocatorKey = _locator;
- if(ClientConfig::SetOpen)
- {
- sLocatorKey += "_" + ClientConfig::SetDivision;
- }
- string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
- if(bInactive)
- {
- AppCache::getInstance()->set("inactive_"+objName,sEndpoints,sLocatorKey);
- }
- else
- {
- AppCache::getInstance()->set(objName,sEndpoints,sLocatorKey);
- }
- TLOGTARS("[TARS][setEndPointToCache,obj:" << _objName << ",invokeSetId:" << _invokeSetId << ",endpoint:" << sEndpoints << "]" << endl);
- }
- /////////////////////////////////////////////////////////////////////////////
- EndpointManager::EndpointManager(ObjectProxy * pObjectProxy, Communicator* pComm, const string & sObjName, bool bFirstNetThread,const string& setName)
- : QueryEpBase(pComm, bFirstNetThread, false)
- ,_objectProxy(pObjectProxy)
- ,_lastRoundPosition(0)
- ,_update(true)
- ,_updateWeightInterval(60)
- ,_lastSWeightPosition(0)
- ,_consistentHashWeight(E_TC_CONHASH_KETAMAHASH)
- ,_consistentHash(E_TC_CONHASH_KETAMAHASH)
- {
- setNetThreadProcess(true);
- init(sObjName,_communicator->getProperty("locator"),setName);
- }
- EndpointManager::~EndpointManager()
- {
- map<string,AdapterProxy*>::iterator iterAdapter;
- for(iterAdapter = _allProxys.begin();iterAdapter != _allProxys.end();iterAdapter++)
- {
- if(iterAdapter->second)
- {
- delete iterAdapter->second;
- iterAdapter->second = NULL;
- }
- }
- }
- void EndpointManager::notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync)
- {
- set<EndpointInfo>::const_iterator iter;
- map<string,AdapterProxy*>::iterator iterAdapter;
- pair<map<string,AdapterProxy*>::iterator,bool> result;
- _activeProxys.clear();
- _regProxys.clear();
- //更新active
- iter = active.begin();
- for(;iter != active.end();++iter)
- {
- if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
- {
- continue;
- }
- iterAdapter = _allProxys.find(iter->desc());
- if(iterAdapter == _allProxys.end())
- {
- AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
- result = _allProxys.insert(make_pair(iter->desc(),ap));
- assert(result.second);
- iterAdapter = result.first;
- _vAllProxys.push_back(ap);
- }
- //该节点在主控的状态为active
- iterAdapter->second->setActiveInReg(true);
- _activeProxys.push_back(iterAdapter->second);
- _regProxys.insert(make_pair(iter->desc(),iterAdapter->second));
- //设置该节点的静态权重值
- iterAdapter->second->setWeight(iter->weight());
- }
- //更新inactive
- iter = inactive.begin();
- for(;iter != inactive.end();++iter)
- {
- if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
- {
- continue;
- }
- iterAdapter = _allProxys.find(iter->desc());
- if(iterAdapter == _allProxys.end())
- {
- AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
- result = _allProxys.insert(make_pair(iter->desc(),ap));
- assert(result.second);
- iterAdapter = result.first;
- _vAllProxys.push_back(ap);
- }
- //该节点在主控的状态为inactive
- iterAdapter->second->setActiveInReg(false);
- _regProxys.insert(make_pair(iter->desc(),iterAdapter->second));
- //设置该节点的静态权重值
- iterAdapter->second->setWeight(iter->weight());
- }
- //_vRegProxys 需要按顺序来 重排
- _vRegProxys.clear();
- iterAdapter = _regProxys.begin();
- for(;iterAdapter != _regProxys.end();++iterAdapter)
- {
- _vRegProxys.push_back(iterAdapter->second);
- }
- _update = true;
- }
- void EndpointManager::doNotify()
- {
- _objectProxy->doInvoke();
- }
- bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
- {
- pAdapterProxy = NULL;
- //刷新主控
- refreshReg(E_DEFAULT,"");
- //无效的数据 返回true
- if(!_valid)
- {
- return true;
- }
- //如果有hash,则先使用hash策略
- if (msg->bHash)
- {
- pAdapterProxy = getHashProxy(msg->iHashCode, msg->bConHash);
- return false;
- }
-
- if(_weightType == E_STATIC_WEIGHT)
- {
- //权重模式
- bool bStaticWeighted = false;
- if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
- bStaticWeighted = true;
- pAdapterProxy = getWeightedProxy(bStaticWeighted);
- }
- else
- {
- //普通轮询模式
- pAdapterProxy = getNextValidProxy();
- }
- return false;
- }
- AdapterProxy * EndpointManager::getNextValidProxy()
- {
- if (_activeProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getNextValidProxy activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
- return NULL;
- }
- vector<AdapterProxy*> conn;
- for(size_t i=0;i<_activeProxys.size();i++)
- {
- ++_lastRoundPosition;
- if(_lastRoundPosition >= _activeProxys.size())
- _lastRoundPosition = 0;
- if(_activeProxys[_lastRoundPosition]->checkActive())
- {
- return _activeProxys[_lastRoundPosition];
- }
- if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
- !_activeProxys[_lastRoundPosition]->isConnExc())
- conn.push_back(_activeProxys[_lastRoundPosition]);
- }
- if(conn.size() > 0)
- {
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return NULL;
- }
- AdapterProxy* EndpointManager::getHashProxy(int64_t hashCode, bool bConsistentHash)
- {
- if(_weightType == E_STATIC_WEIGHT)
- {
- if(bConsistentHash)
- {
- return getConHashProxyForWeight(hashCode, true);
- }
- else
- {
- return getHashProxyForWeight(hashCode, true, _hashStaticRouterCache);
- }
- }
- else
- {
- if(bConsistentHash)
- {
- return getConHashProxyForNormal(hashCode);
- }
- else
- {
- return getHashProxyForNormal(hashCode);
- }
- }
- }
- AdapterProxy* EndpointManager::getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- if(checkHashStaticWeightChange(bStatic))
- {
- int64_t iBegin = TNOWMS;
- updateHashProxyWeighted(bStatic);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[TARS][EndpointManager::getHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- if(vRouterCache.size() > 0)
- {
- size_t hash = ((int64_t)hashCode) % vRouterCache.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= vRouterCache.size())
- {
- hash = hash % vRouterCache.size();
- }
- size_t iIndex = vRouterCache[hash];
- if(iIndex >= _vRegProxys.size())
- {
- iIndex = iIndex % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive())
- {
- return _vRegProxys[iIndex];
- }
- else
- {
- if(_activeProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive())
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return NULL;
- }
- }
- return getHashProxyForNormal(hashCode);
- }
- AdapterProxy* EndpointManager::getConHashProxyForWeight(int64_t hashCode, bool bStatic)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getConHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- if(checkConHashChange(bStatic, _lastConHashWeightProxys))
- {
- int64_t iBegin = TNOWMS;
- updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[TARS][EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- if(_consistentHashWeight.size() > 0)
- {
- unsigned int iIndex = 0;
- // 通过一致性hash取到对应的节点
- _consistentHashWeight.getIndex(hashCode, iIndex);
- if(iIndex >= _vRegProxys.size())
- {
- iIndex = iIndex % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive())
- {
- return _vRegProxys[iIndex];
- }
- else
- {
- if(_activeProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getConHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- size_t hash = 0;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive())
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() && !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return NULL;
- }
- }
- return getHashProxyForNormal(hashCode);
- }
- bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
- {
- if(bStatic)
- {
- if(_lastHashStaticProxys.size() != _vRegProxys.size())
- {
- return true;
- }
- for(size_t i = 0; i < _vRegProxys.size(); i++)
- {
- if(_lastHashStaticProxys[i]->getWeight() != _vRegProxys[i]->getWeight() || _lastHashStaticProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
- {
- return true;
- }
- }
- }
- return false;
- }
- bool EndpointManager::checkConHashChange(bool bStatic, const vector<AdapterProxy*> &vLastConHashProxys)
- {
- if(vLastConHashProxys.size() != _vRegProxys.size())
- {
- return true;
- }
- for(size_t i = 0; i < _vRegProxys.size(); i++)
- {
- if(bStatic)
- {
- if(vLastConHashProxys[i]->getWeight() != _vRegProxys[i]->getWeight() || vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
- {
- return true;
- }
- }
- else
- {
- if(vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
- {
- return true;
- }
- }
- }
- return false;
- }
- void EndpointManager::updateHashProxyWeighted(bool bStatic)
- {
- if(_vRegProxys.size() <= 0)
- {
- TLOGERROR("[TARS][EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- if(bStatic)
- {
- _lastHashStaticProxys = _vRegProxys;
- _hashStaticRouterCache.clear();
- }
- vector<AdapterProxy*> vRegProxys;
- vector<size_t> vIndex;
- for(size_t i = 0; i < _vRegProxys.size(); ++i)
- {
- if(_vRegProxys[i]->getWeight() > 0)
- {
- vRegProxys.push_back(_vRegProxys[i]);
- vIndex.push_back(i);
- }
- }
- if(vRegProxys.size() <= 0)
- {
- TLOGERROR("[TARS][EndpointManager::updateHashProxyWeighted vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- size_t iHashStaticWeightSize = vRegProxys.size();
- map<size_t, int> mIdToWeight;
- multimap<int, size_t> mWeightToId;
- size_t iMaxR = 0;
- size_t iMaxRouterR = 0;
- size_t iMaxWeight = vRegProxys[0]->getWeight();
- size_t iMinWeight = vRegProxys[0]->getWeight();
- size_t iTempWeight = 0;
- for(size_t i = 1;i < iHashStaticWeightSize; i++)
- {
- iTempWeight = vRegProxys[i]->getWeight();
-
- if(iTempWeight > iMaxWeight)
- {
- iMaxWeight = iTempWeight;
- }
- if(iTempWeight < iMinWeight)
- {
- iMinWeight = iTempWeight;
- }
- }
- if(iMinWeight > 0)
- {
- iMaxR = iMaxWeight / iMinWeight;
- if(iMaxR < iMinWeightLimit)
- iMaxR = iMinWeightLimit;
- if(iMaxR > iMaxWeightLimit)
- iMaxR = iMaxWeightLimit;
- }
- else
- {
- iMaxR = 1;
- iMaxWeight = 1;
- }
- for(size_t i = 0;i < iHashStaticWeightSize; i++)
- {
- int iWeight = (vRegProxys[i]->getWeight() * iMaxR) / iMaxWeight;
- if(iWeight > 0)
- {
- iMaxRouterR += iWeight;
- mIdToWeight.insert(map<size_t, int>::value_type(vIndex[i], iWeight));
- mWeightToId.insert(make_pair(iWeight, vIndex[i]));
- }
- else
- {
- if(bStatic)
- {
- _hashStaticRouterCache.push_back(vIndex[i]);
- }
- }
-
- TLOGTARS("[TARS]EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
- }
- for(size_t i = 0; i < iMaxRouterR; i++)
- {
- bool bFirst = true;
- multimap<int, size_t> mulTemp;
- multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
- while(mIter != mWeightToId.rend())
- {
- if(bFirst)
- {
- bFirst = false;
- if(bStatic)
- {
- _hashStaticRouterCache.push_back(mIter->second);
- }
- mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
- }
- else
- {
- mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
- }
- mIter++;
- }
- mWeightToId.clear();
- mWeightToId.swap(mulTemp);
- }
- }
- void EndpointManager::updateConHashProxyWeighted(bool bStatic, vector<AdapterProxy*> &vLastConHashProxys, TC_ConsistentHashNew &conHash)
- {
- if(_vRegProxys.size() <= 0)
- {
- TLOGERROR("[TARS][EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- vLastConHashProxys = _vRegProxys;
- conHash.clear();
- for(size_t i = 0; i < _vRegProxys.size(); ++i)
- {
- int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
- if(iWeight > 0)
- {
- iWeight = iWeight / 4;
- if(iWeight <= 0)
- {
- iWeight = 1;
- }
- conHash.addNode(_vRegProxys[i]->endpoint().desc(), i, iWeight);
- }
- }
- conHash.sortNode();
- }
- AdapterProxy* EndpointManager::getHashProxyForNormal(int64_t hashCode)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getHashProxyForNormal _vRegProxys is empty]" << endl);
- return NULL;
- }
- // 1 _vRegProxys从客户端启动之后,就不会再改变,除非有节点增加
- // 2 如果有增加节点,则_vRegProxys顺序会重新排序,之前的hash会改变
- // 3 节点下线后,需要下次启动客户端后,_vRegProxys内容才会生效
- size_t hash = ((int64_t)hashCode) % _vRegProxys.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= _vRegProxys.size())
- {
- hash = hash % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[hash]->isActiveInReg() && _vRegProxys[hash]->checkActive())
- {
- return _vRegProxys[hash];
- }
- else
- {
- if(_activeProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive())
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return NULL;
- }
- }
- AdapterProxy* EndpointManager::getConHashProxyForNormal(int64_t hashCode)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getConHashProxyForNormal _vRegProxys is empty]" << endl);
- return NULL;
- }
- if(checkConHashChange(false, _lastConHashProxys))
- {
- int64_t iBegin = TNOWMS;
- updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[TARS][EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- if(_consistentHash.size() > 0)
- {
- unsigned int iIndex = 0;
- // 通过一致性hash取到对应的节点
- _consistentHash.getIndex(hashCode, iIndex);
- if(iIndex >= _vRegProxys.size())
- {
- iIndex = iIndex % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive())
- {
- return _vRegProxys[iIndex];
- }
- else
- {
- if(_activeProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- size_t hash = 0;
- do
- {
- hash = ((int64_t)hashCode) % thisHash.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= thisHash.size())
- {
- hash = hash % thisHash.size();
- }
- if (thisHash[hash]->checkActive())
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = ((int64_t)hashCode) % conn.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= conn.size())
- {
- hash = hash % conn.size();
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return NULL;
- }
- }
- return getHashProxyForNormal(hashCode);
- }
- AdapterProxy* EndpointManager::getWeightedProxy(bool bStaticWeighted)
- {
- return getWeightedForNormal(bStaticWeighted);
- }
- AdapterProxy* EndpointManager::getWeightedForNormal(bool bStaticWeighted)
- {
- if (_activeProxys.empty())
- {
- TLOGERROR("[TARS][EndpointManager::getWeightedForNormal activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
- return NULL;
- }
- int64_t iNow = TNOW;
- if(_lastBuildWeightTime <= iNow)
- {
- updateProxyWeighted();
- if(!_first)
- {
- _lastBuildWeightTime = iNow + _updateWeightInterval;
- }
- else
- {
- _first = false;
- _lastBuildWeightTime = iNow + _updateWeightInterval + 5;
- }
- }
- bool bEmpty = false;
- int iActiveSize = _activeWeightProxy.size();
-
- if(iActiveSize > 0)
- {
- size_t iProxyIndex = 0;
- set<AdapterProxy*> sConn;
- if(_staticRouterCache.size() > 0)
- {
- for(size_t i = 0;i < _staticRouterCache.size(); i++)
- {
- ++_lastSWeightPosition;
- if(_lastSWeightPosition >= _staticRouterCache.size())
- _lastSWeightPosition = 0;
- iProxyIndex = _staticRouterCache[_lastSWeightPosition];
- if(_activeWeightProxy[iProxyIndex]->checkActive())
- {
- return _activeWeightProxy[iProxyIndex];
- }
- if(!_activeWeightProxy[iProxyIndex]->isConnTimeout() &&
- !_activeWeightProxy[iProxyIndex]->isConnExc())
- {
- sConn.insert(_activeWeightProxy[iProxyIndex]);
- }
- }
- }
- else
- {
- bEmpty = true;
- }
- if(!bEmpty)
- {
- if(sConn.size() > 0)
- {
- vector<AdapterProxy*> conn;
- set<AdapterProxy*>::iterator it_conn = sConn.begin();
- while(it_conn != sConn.end())
- {
- conn.push_back(*it_conn);
- ++it_conn;
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeWeightProxy[((uint32_t)rand() % iActiveSize)];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return NULL;
- }
- }
- vector<AdapterProxy*> conn;
- for(size_t i=0;i<_activeProxys.size();i++)
- {
- ++_lastRoundPosition;
- if(_lastRoundPosition >= _activeProxys.size())
- _lastRoundPosition = 0;
- if(_activeProxys[_lastRoundPosition]->checkActive())
- {
- return _activeProxys[_lastRoundPosition];
- }
- if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
- !_activeProxys[_lastRoundPosition]->isConnExc())
- conn.push_back(_activeProxys[_lastRoundPosition]);
- }
- if(conn.size() > 0)
- {
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return NULL;
- }
- void EndpointManager::updateProxyWeighted()
- {
- size_t iWeightProxySize = _activeProxys.size();
- if(iWeightProxySize <= 0)
- {
- TLOGERROR("[TARS]EndpointManager::updateProxyWeighted _objName:" << _objName << "|_activeProxys.size() <= 0" << endl);
- return ;
- }
- vector<AdapterProxy*> vProxy;
- for(size_t i = 0; i < _activeProxys.size(); ++i)
- {
- if(_activeProxys[i]->getWeight() > 0)
- {
- vProxy.push_back(_activeProxys[i]);
- }
- }
- iWeightProxySize = vProxy.size();
- if(iWeightProxySize <= 0)
- {
- TLOGERROR("[TARS]EndpointManager::updateProxyWeighted _objName:" << _objName << "|vProxy.size() <= 0" << endl);
- return ;
- }
- if(_update)
- {
- _activeWeightProxy = vProxy;
- updateStaticWeighted();
- }
- _update = false;
- }
- void EndpointManager::updateStaticWeighted()
- {
- size_t iWeightProxySize = _activeWeightProxy.size();
- vector<int> vWeight;
- vWeight.resize(iWeightProxySize);
- for(size_t i = 0; i < iWeightProxySize; i++)
- {
- vWeight[i] = _activeWeightProxy[i]->getWeight();
- }
- dispatchEndpointCache(vWeight);
- }
- void EndpointManager::dispatchEndpointCache(const vector<int> &vWeight)
- {
- if(vWeight.size() <= 0)
- {
- TLOGERROR("EndpointManager::dispatchEndpointCache vWeight.size() < 0" << endl);
- return ;
- }
- size_t iWeightProxySize = vWeight.size();
- map<size_t, int> mIdToWeight;
- multimap<int, size_t> mWeightToId;
- size_t iMaxR = 0;
- size_t iMaxRouterR = 0;
- size_t iMaxWeight = 0;
- size_t iMinWeight = 0;
- size_t iTotalCapacty = 0;
- size_t iTempWeight = 0;
- for(size_t i = 0; i < vWeight.size(); ++i)
- {
- iTotalCapacty += vWeight[i];
- }
- _staticRouterCache.clear();
- _lastSWeightPosition = 0;
- _staticRouterCache.reserve(iTotalCapacty+100);
- iMaxWeight = vWeight[0];
- iMinWeight = vWeight[0];
- for(size_t i = 1;i < iWeightProxySize; i++)
- {
- iTempWeight = vWeight[i];
-
- if(iTempWeight > iMaxWeight)
- {
- iMaxWeight = iTempWeight;
- }
- if(iTempWeight < iMinWeight)
- {
- iMinWeight = iTempWeight;
- }
- }
- if(iMinWeight > 0)
- {
- iMaxR = iMaxWeight / iMinWeight;
- if(iMaxR < iMinWeightLimit)
- iMaxR = iMinWeightLimit;
- if(iMaxR > iMaxWeightLimit)
- iMaxR = iMaxWeightLimit;
- }
- else
- {
- iMaxR = 1;
- iMaxWeight = 1;
- }
- for(size_t i = 0;i < iWeightProxySize; i++)
- {
- int iWeight = (vWeight[i] * iMaxR) / iMaxWeight;
- if(iWeight > 0)
- {
- iMaxRouterR += iWeight;
- mIdToWeight.insert(map<size_t, int>::value_type(i, iWeight));
- mWeightToId.insert(make_pair(iWeight, i));
- }
- else
- {
- _staticRouterCache.push_back(i);
- }
-
- TLOGTARS("[TARS]EndpointManager::dispatchEndpointCache _objName:" << _objName << "|endpoint:" << _activeWeightProxy[i]->endpoint().desc() << "|iWeightR:" << iWeight << endl);
- }
- for(size_t i = 0; i < iMaxRouterR; i++)
- {
- bool bFirst = true;
- multimap<int, size_t> mulTemp;
- multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
- while(mIter != mWeightToId.rend())
- {
- if(bFirst)
- {
- bFirst = false;
- _staticRouterCache.push_back(mIter->second);
- mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
- }
- else
- {
- mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
- }
- mIter++;
- }
- mWeightToId.clear();
- mWeightToId.swap(mulTemp);
- }
- }
- /////////////////////////////////////////////////////////////////////////////
- EndpointThread::EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sName, bool bFirstNetThread)
- : QueryEpBase(pComm,bFirstNetThread,true)
- , _type(type)
- , _name(sName)
- {
- init(sObjName,_communicator->getProperty("locator"));
- }
- void EndpointThread::getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- //直连调用这个接口无效
- if(_direct)
- {
- return ;
- }
- {
- TC_LockT<TC_SpinLock> lock(_mutex);
- // TC_ThreadLock::Lock lock(_mutex);
-
- refreshReg(_type,_name);
-
- activeEndPoint = _activeEndPoint;
- inactiveEndPoint = _inactiveEndPoint;
- }
- }
- void EndpointThread::getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- //直连调用这个接口无效
- if(_direct)
- {
- return ;
- }
- {
-
- TC_LockT<TC_SpinLock> lock(_mutex);
-
- refreshReg(_type,_name);
-
- activeEndPoint = _activeTCEndPoint;
- inactiveEndPoint = _inactiveTCEndPoint;
- }
- }
- void EndpointThread::notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync)
- {
- if(!bSync)
- {
- TC_LockT<TC_SpinLock> lock(_mutex);
- update(active, inactive);
- }
- else
- {
- update(active, inactive);
- }
- }
- void EndpointThread::update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
- {
- _activeEndPoint.clear();
- _inactiveEndPoint.clear();
- _activeTCEndPoint.clear();
- _inactiveTCEndPoint.clear();
- set<EndpointInfo>::iterator iter= active.begin();
- for(;iter != active.end(); ++iter)
- {
- TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
- _activeTCEndPoint.push_back(ep);
- _activeEndPoint.push_back(*iter);
- }
- iter = inactive.begin();
- for(;iter != inactive.end(); ++iter)
- {
- TC_Endpoint ep(iter->host(), iter->port(), 3000, iter->type(), iter->grid());
- _inactiveTCEndPoint.push_back(ep);
- _inactiveEndPoint.push_back(*iter);
- }
- }
- /////////////////////////////////////////////////////////////////////////////
- EndpointManagerThread::EndpointManagerThread(Communicator * pComm,const string & sObjName)
- :_communicator(pComm)
- ,_objName(sObjName)
- {
- }
- EndpointManagerThread::~EndpointManagerThread()
- {
- map<string,EndpointThread*>::iterator iter;
- for(iter=_info.begin();iter != _info.end();iter++)
- {
- if(iter->second)
- {
- delete iter->second;
- iter->second = NULL;
- }
- }
- }
- void EndpointManagerThread::getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_ALL,"");
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointBySet(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_SET,sName);
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointByStation(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_STATION,sName);
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpoint(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_ALL,"");
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointBySet(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_SET,sName);
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointByStation(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_STATION,sName);
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- EndpointThread * EndpointManagerThread::getEndpointThread(GetEndpointType type,const string & sName)
- {
- TC_LockT<TC_SpinLock> lock(_mutex);
- string sAllName = TC_Common::tostr((int)type) + ":" + sName;
- map<string,EndpointThread*>::iterator iter;
- iter = _info.find(sAllName);
- if(iter != _info.end())
- {
- return iter->second;
- }
- EndpointThread * pThread = new EndpointThread(_communicator, _objName, type, sName);
- _info[sAllName] = pThread;
- return pThread;
- }
- }
|