123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #include "util/tc_port.h"
- #include "servant/EndpointManager.h"
- #include "servant/ObjectProxy.h"
- #include "servant/RemoteLogger.h"
- #include "servant/AppCache.h"
- #include "servant/Application.h"
- #include "servant/CommunicatorEpoll.h"
- namespace tars
- {
- std::mutex QueryEpBase::_mutex;
- QueryPushFImpPtr QueryEpBase::_queryCallback;
- void QueryPushFImp::replacePrx(QueryFPrx queryFPrx)
- {
- std::lock_guard<std::mutex> lock(_mutex);
- _queryFPrx = queryFPrx;
- for(auto it : _queryBase)
- {
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_registerQuery(NULL, it.first, _queryFPrx->tars_communicator()->clientConfig().ModuleName);
- }
- }
- void QueryPushFImp::registerQuery(const string &obj, QueryEpBase *pQueryBase)
- {
- {
- std::lock_guard<std::mutex> lock(_mutex);
- _queryBase[obj].insert(pQueryBase);
- }
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_registerQuery(NULL, obj, _queryFPrx->tars_communicator()->clientConfig().ModuleName);
- }
- void QueryPushFImp::onConnect(const TC_Endpoint& ep)
- {
- std::lock_guard<std::mutex> lock(_mutex);
- for(auto it : _queryBase)
- {
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_registerQuery(NULL, it.first, _queryFPrx->tars_communicator()->clientConfig().ModuleName);
- }
- }
- void QueryPushFImp::callback_onQuery(const std::string& obj)
- {
- // LOG_CONSOLE_DEBUG << obj << endl;
- std::lock_guard<std::mutex> lock(_mutex);
- auto it = _queryBase.find(obj);
- if(it != _queryBase.end())
- {
- for(auto &e : it->second)
- {
- e->resetRefreshTime();
- }
- }
- }
- /////////////////////////////////////////////////////////////////////////////
- QueryEpBase::QueryEpBase(Communicator * pComm, bool bFirstNetThread,bool bInterfaceReq)
- : _communicator(pComm)
- , _firstNetThread(bFirstNetThread)
- , _interfaceReq(bInterfaceReq)
- , _direct(false)
- , _objName("")
- , _invokeSetId("")
- , _locator("")
- , _valid(false)
- , _weightType(E_LOOP)
- , _rootServant(true)
- , _requestRegistry(false)
- , _requestTimeout(0)
- , _timeoutInterval(5*1000)
- , _refreshTime(0)
- , _refreshInterval(60*1000)
- , _activeEmptyInterval(10*1000)
- , _failInterval(2*1000)
- , _manyFailInterval(30*1000)
- , _failTimesLimit(3)
- , _failTimes(0)
- {
- _refreshInterval = TC_Common::strto<int>(_communicator->getProperty("refresh-endpoint-interval", "60*1000"));
- if(_refreshInterval < 5*1000)
- {
- _refreshInterval = 5 * 1000;
- }
- setNoDelete(true);
- }
- void QueryEpBase::callback_findObjectById4All(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
- {
- TLOGTARS("[callback_findObjectById4All _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectById4All_exception(Int32 ret)
- {
- TLOGERROR("[callback_findObjectById4All_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectById4Any(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
- {
- TLOGTARS("[callback_findObjectById4Any _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectById4Any_exception(Int32 ret)
- {
- TLOGERROR("[callback_findObjectById4Any_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameGroup(Int32 ret, const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp)
- {
- TLOGTARS("[callback_findObjectByIdInSameGroup _objName:" << _objName << "|ret:"<<ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameGroup_exception(Int32 ret)
- {
- TLOGERROR("[callback_findObjectByIdInSameGroup_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameSet( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> & inactiveEp)
- {
- TLOGTARS("[callback_findObjectByIdInSameSet _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameSet_exception( Int32 ret)
- {
- TLOGERROR("[callback_findObjectByIdInSameSet_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameStation( Int32 ret, const vector<EndpointF> &activeEp, const vector<EndpointF> &inactiveEp)
- {
- TLOGTARS("[callback_findObjectByIdInSameStation _objName:" << _objName << "|ret:" << ret
- << ",active:" << activeEp.size()
- << ",inactive:" << inactiveEp.size() << "]" << endl);
- doEndpoints(activeEp,inactiveEp,ret);
- }
- void QueryEpBase::callback_findObjectByIdInSameStation_exception( Int32 ret)
- {
- TLOGERROR("[callback_findObjectByIdInSameStation_exception _objName:" << _objName << "|ret:" << ret << "]" << endl);
- doEndpointsExp(ret);
- }
- int QueryEpBase::setLocatorPrx(QueryFPrx prx)
- {
- std::lock_guard<std::mutex> lock(_mutex);
- if(_queryFPrx && _queryFPrx.get() != prx.get())
- {
- prx->tars_set_push_callback(_queryFPrx->tars_get_push_callback());
- }
- _queryFPrx = prx;
- return 0;
- }
- void QueryEpBase::resetRefreshTime()
- {
- _refreshTime = 0;
- }
- bool QueryEpBase::init(const string & sObjName, const string& setName, bool rootServant)
- {
- _locator = _communicator->getProperty("locator");
- TLOGTARS("QueryEpBase::init sObjName:" << sObjName << ", sLocator:" << _locator << ", setName:" << setName << ", rootServant: " << rootServant << endl);
- _invokeSetId = setName;
- _rootServant = rootServant;
- setObjName(sObjName);
- return true;
- }
- void QueryEpBase::setObjName(const string & sObjName)
- {
- string::size_type pos = sObjName.find_first_of('@');
- string sEndpoints;
- string sInactiveEndpoints;
- if (pos != string::npos)
- {
- //[直接连接]指定服务的IP和端口列表
- _objName = sObjName.substr(0,pos);
- sEndpoints = sObjName.substr(pos + 1);
- pos = _objName.find_first_of("#");
- if(pos != string::npos)
- {
- _objName = _objName.substr(0, pos);
- }
- _direct = true;
- _valid = true;
- }
- else
- {
- //[间接连接]通过registry查询服务端的IP和端口列表
- _direct = false;
- _valid = false;
- _objName = sObjName;
- if(_locator.find_first_not_of('@') == string::npos)
- {
- TLOGERROR("[QueryEpBase::setObjName locator is not valid,_locator:" << _locator << "]" << endl);
- throw TarsRegistryException("locator is not valid,_locator:" + _locator);
- }
- pos = _objName.find_first_of("#");
- if(pos != string::npos)
- {
- _objName = _objName.substr(0, pos);
- }
- _queryFPrx = _communicator->stringToProxy<QueryFPrx>(_locator);
- if(this->_communicator->getProperty("open-query-push", "n") == "y")
- {
- {
- std::lock_guard<std::mutex> lock(_mutex);
- if (!_queryCallback)
- {
- _queryCallback = new QueryPushFImp(_queryFPrx);
- _queryFPrx->tars_set_push_callback(_queryCallback);
- }
- }
- _queryCallback->registerQuery(_objName, this);
- }
- string sLocatorKey = _locator;
- //如果启用set,则获取按set分组的缓存
- if(_communicator->clientConfig().SetOpen)
- {
- sLocatorKey += "_" + _communicator->clientConfig().SetDivision;
- }
- string objName = _objName + string(_invokeSetId.empty() ? "" : ":") + _invokeSetId;
- //[间接连接]第一次使用cache,如果是接口级请求则不从缓存读取
- if(!_interfaceReq)
- {
- sEndpoints = _communicator->getAppCache()->get(objName,sLocatorKey);
- sInactiveEndpoints = _communicator->getAppCache()->get("inactive_"+objName,sLocatorKey);
- }
- }
- setEndpoints(sEndpoints,_activeEndpoints);
- setEndpoints(sInactiveEndpoints,_inactiveEndpoints);
- if(!_activeEndpoints.empty())
- {
- _valid = true;
- }
- if((!_activeEndpoints.empty() || !_inactiveEndpoints.empty()))
- {
- //非直接指定端口, 且从cache中能查到服务端口的, 不需要通知所有ObjectProxy更新地址
- notifyEndpoints(_activeEndpoints,_inactiveEndpoints,true);
- }
- }
- void QueryEpBase::setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints)
- {
- if(sEndpoints == "")
- {
- return ;
- }
- bool bSameWeightType = true;
- bool bFirstWeightType = true;
- unsigned int iWeightType = 0;
- vector<string> vEndpoints = TC_Endpoint::sepEndpoint(sEndpoints);
- for (size_t i = 0; i < vEndpoints.size(); ++i)
- {
- try
- {
- TC_Endpoint ep(vEndpoints[i]);
- string sSetDivision;
- //解析set分组信息
- if (!_direct)
- {
- string sep = " -s ";
- size_t pos = vEndpoints[i].rfind(sep);
- if (pos != string::npos)
- {
- sSetDivision = TC_Common::trim(vEndpoints[i].substr(pos+sep.size()));
- size_t endPos = sSetDivision.find(" ");
- if (endPos != string::npos)
- {
- sSetDivision = sSetDivision.substr(0, endPos);
- }
- }
- }
- if(bFirstWeightType)
- {
- bFirstWeightType = false;
- iWeightType = ep.getWeightType();
- }
- else
- {
- if(ep.getWeightType() != iWeightType)
- {
- bSameWeightType = false;
- }
- }
- EndpointInfo epi(ep, sSetDivision);
- setEndpoints.insert(epi);
- }
- catch (exception &ex)
- {
- TLOGERROR("[QueryEpBase::setEndpoints parse error,objname:" << _objName << ",endpoint:" << vEndpoints[i] << "]" << endl);
- }
- }
- if(bSameWeightType)
- {
- if(iWeightType == 1)
- {
- _weightType = E_STATIC_WEIGHT;
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- void QueryEpBase::refreshReg(GetEndpointType type, const string & sName)
- {
- onUpdateOutter();
- if(_direct)
- {
- return;
- }
- // LOG_CONSOLE_DEBUG << this->_objName << ", _requestRegistry:" << _requestRegistry << ", _requestTimeout:" << _requestTimeout << ", _refreshTime:" << _refreshTime << endl;
- int64_t iNow = TNOWMS;
- //正在请求状态 而且请求超时了,或者第一次
- if(_requestRegistry && _requestTimeout < iNow)
- {
- doEndpointsExp(0);
- }
- //如果是间接连接,通过registry定时查询服务列表
- //正在请求状态 而且请求超时了
- //非请求状态 到了下一个刷新时间了
- if( (!_requestRegistry) && (_refreshTime <= iNow))
- {
- _requestRegistry = true;
- //一定时间不回调就算超时了
- _requestTimeout = iNow + _timeoutInterval;
- TLOGTARS("[QueryEpBase::refresh," << _objName << "]" <<endl);
- if(_valid && !_rootServant && _valid)
- {
- return;
- }
- //判断是同步调用还是异步调用
- //内部请求主控都是异步请求
- //接口请求主控第一次是同步请求
- bool bSync = (!_valid && _interfaceReq);
- //如果是异步且不是根servant(通过#1创建的servant, 不主动更新主控信息)
- if(!bSync && !_rootServant)
- return;
- try
- {
- if(bSync)
- {
- vector<EndpointF> activeEp;
- vector<EndpointF> inactiveEp;
- int iRet = 0;
- switch(type)
- {
- case E_ALL:
- {
- iRet = _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->findObjectById4Any(_objName,activeEp,inactiveEp, ServerConfig::Context);
- break;
- }
- case E_STATION:
- {
- iRet = _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->findObjectByIdInSameStation(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
- break;
- }
- case E_SET:
- {
- iRet = _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->findObjectByIdInSameSet(_objName,sName,activeEp,inactiveEp, ServerConfig::Context);
- break;
- }
- case E_DEFAULT:
- default:
- {
- if(_communicator->clientConfig().SetOpen || !_invokeSetId.empty())
- {
- //指定set调用时,指定set的优先级最高
- string setId = _invokeSetId.empty()?_communicator->clientConfig().SetDivision : _invokeSetId;
- iRet = _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->findObjectByIdInSameSet(_objName,setId,activeEp,inactiveEp, ServerConfig::Context);
- }
- else
- {
- iRet = _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->findObjectByIdInSameGroup(_objName,activeEp,inactiveEp, ServerConfig::Context);
- }
- break;
- }
- }
- doEndpoints(activeEp, inactiveEp, iRet, true);
- }
- else
- {
- switch(type)
- {
- case E_ALL:
- {
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_findObjectById4Any(this,_objName, ServerConfig::Context);
- break;
- }
- case E_STATION:
- {
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_findObjectByIdInSameStation(this,_objName,sName, ServerConfig::Context);
- break;
- }
- case E_SET:
- {
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_findObjectByIdInSameSet(this,_objName,sName, ServerConfig::Context);
- break;
- }
- case E_DEFAULT:
- default:
- {
- if(_communicator->clientConfig().SetOpen || !_invokeSetId.empty())
- {
- //指定set调用时,指定set的优先级最高
- string setId = _invokeSetId.empty()?_communicator->clientConfig().SetDivision:_invokeSetId;
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_findObjectByIdInSameSet(this,_objName,setId, ServerConfig::Context);
- }
- else
- {
- _queryFPrx->tars_hash((uint64_t)_queryFPrx.get())->async_findObjectByIdInSameGroup(this,_objName, ServerConfig::Context);
- }
- break;
- }
- }//end switch
- }
- }
- catch(TC_Exception & ex)
- {
- TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"exception:"<<ex.what() << "]"<<endl);
- doEndpointsExp(TARSSERVERUNKNOWNERR);
- }
- catch(...)
- {
- TLOGERROR("[QueryEpBase::refreshReg obj:"<<_objName<<"unknown exception]" <<endl);
- doEndpointsExp(TARSSERVERUNKNOWNERR);
- }
- }
- }
- void QueryEpBase::doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync)
- {
- if(iRet != 0)
- {
- doEndpointsExp(iRet);
- return ;
- }
- _failTimes = 0;
- _requestRegistry = false;
- int64_t iNow = TNOWMS;
- //有返回成功的结点,按照正常的频率
- //如果返回空列表或者返回失败 2s去刷新一次
- //接口请求主控的方式 不管是不是空都要去刷新
- if(activeEp.empty() && (!_interfaceReq) )
- {
- _refreshTime = iNow + _activeEmptyInterval;
- //如果registry返回Active服务列表为空,不做更新
- TLOGERROR("[QueryEpBase::doEndpoints, callback activeEps is empty,objname:"<< _objName << "]" << endl);
- return;
- }
- else
- {
- _refreshTime = iNow + _refreshInterval;
- }
- bool bNeedNotify = false;
- bool bSameWeightType = true;
- bool bFirstWeightType = true;
- int iWeightType = 0;
- set<string> sActiveEndpoints;
- set<string> sInactiveEndpoints;
- set<EndpointInfo> activeEps;
- set<EndpointInfo> inactiveEps;
- //生成active set 用于比较
- for (uint32_t i = 0; i < activeEp.size(); ++i)
- {
- if(bFirstWeightType)
- {
- bFirstWeightType = false;
- iWeightType = activeEp[i].weightType;
- }
- else
- {
- if(activeEp[i].weightType != iWeightType)
- {
- bSameWeightType = false;
- }
- }
- // taf istcp意思和这里枚举值对应
- activeEps.insert(EndpointInfo(activeEp[i]));
- }
- //生成inactive set 用于比较
- for (uint32_t i = 0; i < inactiveEp.size(); ++i)
- {
- // taf istcp意思和这里枚举值对应
- inactiveEps.insert(EndpointInfo(inactiveEp[i]));
- }
- if(bSameWeightType)
- {
- if(iWeightType == 1)
- {
- _weightType = E_STATIC_WEIGHT;
- }
- else
- {
- _weightType = E_LOOP;
- }
- }
- else
- {
- _weightType = E_LOOP;
- }
- if(activeEps != _activeEndpoints)
- {
- bNeedNotify = true;
- _activeEndpoints = activeEps;
- if(_firstNetThread)
- {
- setEndPointToCache(false);
- }
- }
- if(inactiveEps != _inactiveEndpoints)
- {
- bNeedNotify = true;
- _inactiveEndpoints = inactiveEps;
- if(_firstNetThread)
- {
- setEndPointToCache(true);
- }
- }
- if(bNeedNotify)
- {
- notifyEndpoints(_activeEndpoints,_inactiveEndpoints,bSync);
- }
- if(!_valid)
- {
- _valid = true;
- doNotify();
- }
- }
- void QueryEpBase::doEndpointsExp(int iRet)
- {
- _failTimes++;
- _requestRegistry = false;
- int64_t iNow = TNOWMS;
- //频率控制获取主控失败 2秒钟再更新
- _refreshTime = iNow + _failInterval;
- //获取主控连续失败3次就等30s再更新一次
- //连续失败 强制设成数据是有效的
- if(_failTimes > _failTimesLimit)
- {
- if(!_valid)
- {
- _valid = true;
- doNotify();
- }
- _refreshTime = iNow + _manyFailInterval;
- }
- }
- void QueryEpBase::setEndPointToCache(bool bInactive)
- {
- //如果是接口级请求则不缓存到文件
- if(_interfaceReq)
- {
- return;
- }
- string sEndpoints;
- set<EndpointInfo> doEndpoints;
- if(!bInactive)
- {
- doEndpoints = _activeEndpoints;
- }
- else
- {
- doEndpoints = _inactiveEndpoints;
- }
- set<EndpointInfo>::iterator iter;
- iter = doEndpoints.begin();
- for (; iter != doEndpoints.end(); ++iter)
- {
- //这里的超时时间 只是对服务端有效。这里的值无效。所以默认用3000了
- TC_Endpoint ep = iter->getEndpoint();
- if (!sEndpoints.empty())
- {
- sEndpoints += ":";
- }
- sEndpoints += ep.toString();
- if (!iter->setDivision().empty())
- {
- sEndpoints += " -s " + iter->setDivision();
- }
- }
- //如果启用set,则按set分组保存
- string sLocatorKey = _locator;
- if(_communicator->clientConfig().SetOpen)
- {
- sLocatorKey += "_" + _communicator->clientConfig().SetDivision;
- }
- string objName = _objName + string(_invokeSetId.empty()?"":":") + _invokeSetId;
- if(bInactive)
- {
- _communicator->getAppCache()->set("inactive_"+objName,sEndpoints,sLocatorKey);
- }
- else
- {
- _communicator->getAppCache()->set(objName,sEndpoints,sLocatorKey);
- }
- TLOGTARS("[setEndPointToCache,obj:" << _objName << ",invokeSetId:" << _invokeSetId << ",endpoint:" << sEndpoints << "]" << endl);
- }
- EndpointManager::EndpointManager(ObjectProxy * pObjectProxy, Communicator* pComm, bool bFirstNetThread)
- : QueryEpBase(pComm, bFirstNetThread, false)
- ,_objectProxy(pObjectProxy)
- ,_lastRoundPosition(0)
- ,_update(true)
- ,_updateWeightInterval(60)
- ,_lastSWeightPosition(0)
- ,_consistentHashWeight(E_TC_CONHASH_KETAMAHASH)
- ,_consistentHash(E_TC_CONHASH_KETAMAHASH)
- {
- setNetThreadProcess(true);
- }
- EndpointManager::~EndpointManager()
- {
- map<string,AdapterProxy*>::iterator iterAdapter;
- for(iterAdapter = _allProxys.begin();iterAdapter != _allProxys.end();iterAdapter++)
- {
- if(iterAdapter->second)
- {
- delete iterAdapter->second;
- iterAdapter->second = NULL;
- }
- }
- _allProxys.clear();
- }
- void EndpointManager::onUpdateOutter()
- {
- // LOG_CONSOLE_DEBUG << this->_objectProxy << ", valid:" << _valid << ", " << _outterUpdate.get() << endl;
- assert(this->_objectProxy->getCommunicatorEpoll()->getThreadId() == this_thread::get_id());
- lock_guard<mutex> l(_outterLocker);
- if(_outterUpdate)
- {
- shared_ptr<OutterUpdate> outterUpdate = _outterUpdate;
- updateEndpoints(outterUpdate->active, outterUpdate->inactive);
- _valid = true;
- _outterUpdate.reset();
- }
- }
- void EndpointManager::updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
- {
- // LOG_CONSOLE_DEBUG << this->_objectProxy << ", " << active.begin()->desc() << endl;
- //创新新对象, 避免线程冲突
- shared_ptr<OutterUpdate> outterUpdate = std::make_shared<OutterUpdate>();
- outterUpdate->active = active;
- outterUpdate->inactive = inactive;
- //更新时间
- _refreshTime = TNOWMS + _refreshInterval;
- lock_guard<mutex> l(_outterLocker);
- _outterUpdate = outterUpdate;
- }
- void EndpointManager::updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
- {
- TLOGTARS("[EndpointManager::updateEndpoints obj:" << this->_objName << ", active:" << active.size() << ", inactive size:" << inactive.size() << endl);
- pair<map<string,AdapterProxy*>::iterator,bool> result;
- _activeProxys.clear();
- _regProxys.clear();
- _indexActiveProxys.clear();
- _sortActivProxys.clear();
- if(!active.empty())
- {
- //先把服务都设置为非活跃
- for (auto iter = _allProxys.begin(); iter != _allProxys.end(); ++iter)
- {
- iter->second->setActiveInReg(false);
- }
- }
- //更新active
- for(auto iter = active.begin(); iter != active.end(); ++iter)
- {
- if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
- {
- continue;
- }
- // LOG_CONSOLE_DEBUG << std::this_thread::get_id() << ", allProxys size:" << _allProxys.size() << ", " << iter->cmpDesc() << endl;
- auto iterAdapter = _allProxys.find(iter->cmpDesc());
- if(iterAdapter == _allProxys.end())
- {
- AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
- result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
- iterAdapter = result.first;
- _vAllProxys.push_back(ap);
- }
- //该节点在主控的状态为active
- iterAdapter->second->setActiveInReg(true);
- _activeProxys.push_back(iterAdapter->second);
- _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
- const string &host = iterAdapter->second->endpoint().host();
- _indexActiveProxys.insert(make_pair(host, iterAdapter->second));
- _sortActivProxys.insert(make_pair(host, iterAdapter->second));
- //设置该节点的静态权重值
- iterAdapter->second->setWeight(iter->weight());
- }
- //更新inactive
- for(auto iter = inactive.begin(); iter != inactive.end(); ++iter)
- {
- if(!_direct && _weightType == E_STATIC_WEIGHT && iter->weight() <= 0)
- {
- continue;
- }
- auto iterAdapter = _allProxys.find(iter->cmpDesc());
- if(iterAdapter == _allProxys.end())
- {
- AdapterProxy* ap = new AdapterProxy(_objectProxy, *iter, _communicator);
- result = _allProxys.insert(make_pair(iter->cmpDesc(),ap));
- assert(result.second);
- iterAdapter = result.first;
- _vAllProxys.push_back(ap);
- }
- //该节点在主控的状态为inactive
- iterAdapter->second->setActiveInReg(false);
- _regProxys.insert(make_pair(iter->cmpDesc(),iterAdapter->second));
- //设置该节点的静态权重值
- iterAdapter->second->setWeight(iter->weight());
- }
- //_vRegProxys 需要按顺序来 重排
- _vRegProxys.clear();
- auto iterAdapter = _regProxys.begin();
- for(;iterAdapter != _regProxys.end();++iterAdapter)
- {
- _vRegProxys.push_back(iterAdapter->second);
- }
- _update = true;
- }
- void EndpointManager::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bNotify)
- {
- updateEndpoints(active, inactive);
- //丢给外层统一做
- _objectProxy->onNotifyEndpoints(active, inactive);
- }
- void EndpointManager::doNotify()
- {
- _objectProxy->doInvoke();
- }
- bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
- {
- pAdapterProxy = NULL;
- //刷新主控
- refreshReg(E_DEFAULT, "");
- //无效的数据 返回true
- if (!_valid)
- {
- return true;
- }
- //如果有hash,则先使用hash策略
- if (msg->data._hash)
- {
- pAdapterProxy = getHashProxy(msg->data._hashCode, msg->data._conHash);
- return false;
- }
-
- if(_weightType == E_STATIC_WEIGHT)
- {
- //权重模式
- bool bStaticWeighted = false;
- if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
- bStaticWeighted = true;
- pAdapterProxy = getWeightedProxy(bStaticWeighted);
- }
- else
- {
- //普通轮询模式
- pAdapterProxy = getNextValidProxy();
- }
- return false;
- }
- AdapterProxy * EndpointManager::getNextValidProxy()
- {
- if (_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getNextValidProxy activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
- return NULL;
- }
- vector<AdapterProxy*> conn;
- for(size_t i=0;i<_activeProxys.size();i++)
- {
- ++_lastRoundPosition;
- if(_lastRoundPosition >= _activeProxys.size())
- {
- _lastRoundPosition = 0;
- }
- if(_activeProxys[_lastRoundPosition]->checkActive(false))
- {
- return _activeProxys[_lastRoundPosition];
- }
- if(!_activeProxys[_lastRoundPosition]->isConnTimeout() && !_activeProxys[_lastRoundPosition]->isConnExc()) {
- conn.push_back(_activeProxys[_lastRoundPosition]);
- }
- }
- if(conn.size() > 0)
- {
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- AdapterProxy* EndpointManager::getHashProxy(uint32_t hashCode, bool bConsistentHash)
- {
- if(_weightType == E_STATIC_WEIGHT)
- {
- if(bConsistentHash)
- {
- return getConHashProxyForWeight(hashCode, true);
- }
- else
- {
- return getHashProxyForWeight(hashCode, true, _hashStaticRouterCache);
- }
- }
- else
- {
- if(bConsistentHash)
- {
- return getConHashProxyForNormal(hashCode);
- }
- else
- {
- return getHashProxyForNormal(hashCode);
- }
- }
- }
- AdapterProxy* EndpointManager::getHashProxyForWeight(uint32_t hashCode, bool bStatic, vector<size_t> &vRouterCache)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- if(checkHashStaticWeightChange(bStatic))
- {
- int64_t iBegin = TNOWMS;
- updateHashProxyWeighted(bStatic);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[EndpointManager::getHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- if(vRouterCache.size() > 0)
- {
- size_t hash = hashCode % vRouterCache.size();
- //这里做判断的原因是:32位系统下,如果hashCode为负值,hash经过上面的计算会是一个超大值,导致越界
- if(hash >= vRouterCache.size())
- {
- hash = hash % vRouterCache.size();
- }
- size_t iIndex = vRouterCache[hash];
- if(iIndex >= _vRegProxys.size())
- {
- iIndex = iIndex % _vRegProxys.size();
- }
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[iIndex]->isActiveInReg() && _vRegProxys[iIndex]->checkActive(true))
- {
- return _vRegProxys[iIndex];
- }
- else
- {
- TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << _vRegProxys[iIndex]->endpoint().desc() << endl);
- if(_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForWeight _activeEndpoints is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- do
- {
- hash = hashCode % thisHash.size();
- if (thisHash[hash]->checkActive(true))
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = hashCode % conn.size();
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }
- return getHashProxyForNormal(hashCode);
- }
- AdapterProxy* EndpointManager::getConHashProxyForWeight(uint32_t hashCode, bool bStatic)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForWeight _vRegProxys is empty], bStatic:" << bStatic << endl);
- return NULL;
- }
- if(checkConHashChange(bStatic, _lastConHashWeightProxys))
- {
- int64_t iBegin = TNOWMS;
- updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[EndpointManager::getConHashProxyForWeight update bStatic:" << bStatic << "|_objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- while(_consistentHashWeight.size() > 0)
- {
- string sNode;
- // 通过一致性hash取到对应的节点
- _consistentHashWeight.getNodeName(hashCode, sNode);
- auto it = _indexActiveProxys.find(sNode);
- // 节点不存在,可能是下线或者服务不可用
- if (it == _indexActiveProxys.end())
- {
- updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
- continue;
- }
- //被hash到的节点在主控是active的才走在流程
- if (it->second->isActiveInReg() && it->second->checkActive(true))
- {
- return it->second;
- }
- else
- {
- TLOGWARN("[EndpointManager::getHashProxyForWeight, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
- // 剔除节点再次hash
- if (!it->second->isActiveInReg())
- {
- // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
- _indexActiveProxys.erase(sNode);
- }
- // checkConHashChange里重新加回到_sortActivProxys重试
- _sortActivProxys.erase(sNode);
- updateConHashProxyWeighted(bStatic, _lastConHashWeightProxys, _consistentHashWeight);
- if (_indexActiveProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- }
- }
- return getHashProxyForNormal(hashCode);
- }
- bool EndpointManager::checkHashStaticWeightChange(bool bStatic)
- {
- if(bStatic)
- {
- if(_lastHashStaticProxys.size() != _vRegProxys.size())
- {
- return true;
- }
- for(size_t i = 0; i < _vRegProxys.size(); i++)
- {
- //解决服务权重更新时哈希表不更新的问题
- if((_lastHashStaticProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc()) || _vRegProxys[i]->checkWeightChanged(true))
- {
- return true;
- }
- }
- }
- return false;
- }
- bool EndpointManager::checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys)
- {
- // 将之前故障临时剔除的节点重新加回来重试
- if (_indexActiveProxys.size() != _sortActivProxys.size())
- {
- for (auto &it : _indexActiveProxys)
- {
- _sortActivProxys[it.first] = it.second;
- }
- }
- if(mLastConHashProxys.size() != _sortActivProxys.size())
- {
- return true;
- }
- auto itLast = mLastConHashProxys.begin();
- auto itSort = _sortActivProxys.begin();
- for (; itLast!=mLastConHashProxys.end() && itSort!=_sortActivProxys.end(); ++itLast,++itSort)
- {
- if (itLast->first != itSort->first)
- {
- return true;
- }
- //解决服务权重更新时一致性哈希环不更新的问题
- if(bStatic && itSort->second->checkWeightChanged(true))
- {
- return true;
- }
- }
- /*
- if(vLastConHashProxys.size() != _vRegProxys.size())
- {
- return true;
- }
- for(size_t i = 0; i < _vRegProxys.size(); i++)
- {
- if(vLastConHashProxys[i]->endpoint().desc() != _vRegProxys[i]->endpoint().desc())
- {
- return true;
- }
- //解决服务权重更新时一致性哈希环不更新的问题
- if(bStatic && _vRegProxys[i]->checkWeightChanged(true))
- {
- return true;
- }
- }
- */
- return false;
- }
- void EndpointManager::updateHashProxyWeighted(bool bStatic)
- {
- if(_vRegProxys.size() <= 0)
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- if(bStatic)
- {
- _lastHashStaticProxys = _vRegProxys;
- _hashStaticRouterCache.clear();
- }
- vector<AdapterProxy*> vRegProxys;
- vector<size_t> vIndex;
- for(size_t i = 0; i < _vRegProxys.size(); ++i)
- {
- if(_vRegProxys[i]->getWeight() > 0)
- {
- vRegProxys.push_back(_vRegProxys[i]);
- vIndex.push_back(i);
- }
- //防止多个服务节点权重同时更新时哈希表多次更新
- _vRegProxys[i]->resetWeightChanged();
- }
- if(vRegProxys.size() <= 0)
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- size_t iHashStaticWeightSize = vRegProxys.size();
- map<size_t, int> mIdToWeight;
- multimap<int, size_t> mWeightToId;
- size_t iMaxR = 0;
- size_t iMaxRouterR = 0;
- size_t iMaxWeight = vRegProxys[0]->getWeight();
- size_t iMinWeight = vRegProxys[0]->getWeight();
- size_t iTempWeight = 0;
- for(size_t i = 1;i < iHashStaticWeightSize; i++)
- {
- iTempWeight = vRegProxys[i]->getWeight();
-
- if(iTempWeight > iMaxWeight)
- {
- iMaxWeight = iTempWeight;
- }
- if(iTempWeight < iMinWeight)
- {
- iMinWeight = iTempWeight;
- }
- }
- if(iMinWeight > 0)
- {
- iMaxR = iMaxWeight / iMinWeight;
- if(iMaxR < iMinWeightLimit)
- iMaxR = iMinWeightLimit;
- if(iMaxR > iMaxWeightLimit)
- iMaxR = iMaxWeightLimit;
- }
- else
- {
- iMaxR = 1;
- iMaxWeight = 1;
- }
- for(size_t i = 0;i < iHashStaticWeightSize; i++)
- {
- int iWeight = (vRegProxys[i]->getWeight() * iMaxR) / iMaxWeight;
- if(iWeight > 0)
- {
- iMaxRouterR += iWeight;
- mIdToWeight.insert(map<size_t, int>::value_type(vIndex[i], iWeight));
- mWeightToId.insert(make_pair(iWeight, vIndex[i]));
- }
- else
- {
- if(bStatic)
- {
- _hashStaticRouterCache.push_back(vIndex[i]);
- }
- }
- TLOGTARS("EndpointManager::updateHashProxyWeighted bStatic:" << bStatic << "|_objName:" << _objName << "|endpoint:" << vRegProxys[i]->endpoint().desc() << "|iWeight:" << vRegProxys[i]->getWeight() << "|iWeightR:" << iWeight << "|iIndex:" << vIndex[i] << endl);
- }
- for(size_t i = 0; i < iMaxRouterR; i++)
- {
- bool bFirst = true;
- multimap<int, size_t> mulTemp;
- multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
- while(mIter != mWeightToId.rend())
- {
- if(bFirst)
- {
- bFirst = false;
- if(bStatic)
- {
- _hashStaticRouterCache.push_back(mIter->second);
- }
- mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
- }
- else
- {
- mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
- }
- mIter++;
- }
- mWeightToId.clear();
- mWeightToId.swap(mulTemp);
- }
- }
- void EndpointManager::updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash)
- {
- conHash.clear();
- if(_sortActivProxys.empty())
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted _indexActiveProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- mLastConHashProxys = _sortActivProxys;
- for (auto it = _sortActivProxys.begin(); it != _sortActivProxys.end(); ++it)
- {
- int iWeight = (bStatic ? (it->second->getWeight()) : 100);
- if(iWeight > 0)
- {
- iWeight = iWeight / 4;
- if(iWeight <= 0)
- {
- iWeight = 1;
- }
- // 同一服务有多个obj的情况
- // 同一hash值调用不同的obj会hash到不同的服务器
- // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
- // 一致性hash用host进行索引,不使用index,这里传0
- conHash.addNode(it->second->endpoint().host(), 0, iWeight);
- }
- //防止多个服务节点权重同时更新时一致性哈希环多次更新
- it->second->resetWeightChanged();
- }
- /*
- if(_vRegProxys.size() <= 0)
- {
- TLOGERROR("[EndpointManager::updateHashProxyWeighted _vRegProxys is empty], bStatic:" << bStatic << endl);
- return ;
- }
- vLastConHashProxys = _vRegProxys;
- conHash.clear();
- for(size_t i = 0; i < _vRegProxys.size(); ++i)
- {
- int iWeight = (bStatic ? (_vRegProxys[i]->getWeight()) : 100);
- if(iWeight > 0)
- {
- iWeight = iWeight / 4;
- if(iWeight <= 0)
- {
- iWeight = 1;
- }
- // 同一服务有多个obj的情况
- // 同一hash值调用不同的obj会hash到不同的服务器
- // 因为addNode会根据desc(ip+port)计算md5,导致顺序不一致
- conHash.addNode(_vRegProxys[i]->endpoint().host(), i, iWeight);
- }
- //防止多个服务节点权重同时更新时一致性哈希环多次更新
- _vRegProxys[i]->resetWeightChanged();
- }
- */
- conHash.sortNode();
- }
- AdapterProxy* EndpointManager::getHashProxyForNormal(uint32_t hashCode)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForNormal _vRegProxys is empty]" << endl);
- return NULL;
- }
- // 1 _vRegProxys从客户端启动之后,就不会再改变,除非有节点增加
- // 2 如果有增加节点,则_vRegProxys顺序会重新排序,之前的hash会改变
- // 3 节点下线后,需要下次启动客户端后,_vRegProxys内容才会生效
- size_t hash = hashCode % _vRegProxys.size();
- //被hash到的节点在主控是active的才走在流程
- if (_vRegProxys[hash]->isActiveInReg() && _vRegProxys[hash]->checkActive(true))
- {
- return _vRegProxys[hash];
- }
- else
- {
- // TLOGWARN("[EndpointManager::getHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << _vRegProxys[hash]->endpoint().desc() << endl);
- if(_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- //在active节点中再次hash
- vector<AdapterProxy*> thisHash = _activeProxys;
- vector<AdapterProxy*> conn;
- do
- {
- hash = hashCode % thisHash.size();
- if (thisHash[hash]->checkActive(true))
- {
- return thisHash[hash];
- }
- if(!thisHash[hash]->isConnTimeout() &&
- !thisHash[hash]->isConnExc())
- {
- conn.push_back(thisHash[hash]);
- }
- thisHash.erase(thisHash.begin() + hash);
- }
- while(!thisHash.empty());
- if(conn.size() > 0)
- {
- hash = hashCode % conn.size();
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy *adapterProxy = conn[hash];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }
- AdapterProxy* EndpointManager::getConHashProxyForNormal(uint32_t hashCode)
- {
- if(_vRegProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForNormal _vRegProxys is empty]" << endl);
- return NULL;
- }
- if(checkConHashChange(false, _lastConHashProxys))
- {
- int64_t iBegin = TNOWMS;
- updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
- int64_t iEnd = TNOWMS;
- TLOGTARS("[EndpointManager::getConHashProxyForNormal update _objName:" << _objName << "|timecost(ms):" << (iEnd - iBegin) << endl);
- }
- while(_consistentHash.size() > 0)
- {
- string sNode;
- // 通过一致性hash取到对应的节点
- _consistentHash.getNodeName(hashCode, sNode);
- auto it = _indexActiveProxys.find(sNode);
- // 节点不存在,可能是下线或者服务不可用
- if (it == _indexActiveProxys.end())
- {
- updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
- continue;
- }
- //被hash到的节点在主控是active的才走在流程
- if (it->second->isActiveInReg() && it->second->checkActive(true))
- {
- return it->second;
- }
- else
- {
- TLOGWARN("[EndpointManager::getConHashProxyForNormal, hash not active," << _objectProxy->name() << "@" << it->second->endpoint().desc() << endl);
- // 剔除节点再次hash
- if (!it->second->isActiveInReg())
- {
- // 如果在主控的注册状态不是active直接删除,如果状态有变更由updateEndpoints函数里重新添加
- _indexActiveProxys.erase(sNode);
- }
- // checkConHashChange里重新加回到_sortActivProxys重试
- _sortActivProxys.erase(sNode);
- updateConHashProxyWeighted(false, _lastConHashProxys, _consistentHash);
- if (_indexActiveProxys.empty())
- {
- TLOGERROR("[EndpointManager::getConHashProxyForNormal _activeEndpoints is empty]" << endl);
- return NULL;
- }
- }
- }
- return getHashProxyForNormal(hashCode);
- }
- AdapterProxy* EndpointManager::getWeightedProxy(bool bStaticWeighted)
- {
- return getWeightedForNormal(bStaticWeighted);
- }
- AdapterProxy* EndpointManager::getWeightedForNormal(bool bStaticWeighted)
- {
- if (_activeProxys.empty())
- {
- TLOGERROR("[EndpointManager::getWeightedForNormal activeEndpoints is empty][obj:"<<_objName<<"]" << endl);
- return NULL;
- }
- int64_t iNow = TNOW;
- if(_lastBuildWeightTime <= iNow)
- {
- updateProxyWeighted();
- if(!_first)
- {
- _lastBuildWeightTime = iNow + _updateWeightInterval;
- }
- else
- {
- _first = false;
- _lastBuildWeightTime = iNow + _updateWeightInterval + 5;
- }
- }
- bool bEmpty = false;
- int iActiveSize = _activeWeightProxy.size();
-
- if(iActiveSize > 0)
- {
- size_t iProxyIndex = 0;
- set<AdapterProxy*> sConn;
- if(_staticRouterCache.size() > 0)
- {
- for(size_t i = 0;i < _staticRouterCache.size(); i++)
- {
- ++_lastSWeightPosition;
- if(_lastSWeightPosition >= _staticRouterCache.size())
- _lastSWeightPosition = 0;
- iProxyIndex = _staticRouterCache[_lastSWeightPosition];
- if(_activeWeightProxy[iProxyIndex]->checkActive(false))
- {
- return _activeWeightProxy[iProxyIndex];
- }
- if(!_activeWeightProxy[iProxyIndex]->isConnTimeout() &&
- !_activeWeightProxy[iProxyIndex]->isConnExc())
- {
- sConn.insert(_activeWeightProxy[iProxyIndex]);
- }
- }
- }
- else
- {
- bEmpty = true;
- }
- if(!bEmpty)
- {
- if(sConn.size() > 0)
- {
- vector<AdapterProxy*> conn;
- set<AdapterProxy*>::iterator it_conn = sConn.begin();
- while(it_conn != sConn.end())
- {
- conn.push_back(*it_conn);
- ++it_conn;
- }
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeWeightProxy[((uint32_t)rand() % iActiveSize)];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- }
- vector<AdapterProxy*> conn;
- for(size_t i=0;i<_activeProxys.size();i++)
- {
- ++_lastRoundPosition;
- if(_lastRoundPosition >= _activeProxys.size())
- _lastRoundPosition = 0;
- if(_activeProxys[_lastRoundPosition]->checkActive(false))
- {
- return _activeProxys[_lastRoundPosition];
- }
- if(!_activeProxys[_lastRoundPosition]->isConnTimeout() &&
- !_activeProxys[_lastRoundPosition]->isConnExc())
- conn.push_back(_activeProxys[_lastRoundPosition]);
- }
- if(conn.size() > 0)
- {
- //都有问题, 随机选择一个没有connect超时或者链接异常的发送
- AdapterProxy * adapterProxy = conn[((uint32_t)rand() % conn.size())];
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- //所有adapter都有问题 选不到结点,随机找一个重试
- AdapterProxy * adapterProxy = _activeProxys[((uint32_t)rand() % _activeProxys.size())];
- adapterProxy->resetRetryTime(false);
- //该proxy可能已经被屏蔽,需重新连一次
- adapterProxy->checkActive(true);
- return adapterProxy;
- }
- void EndpointManager::updateProxyWeighted()
- {
- size_t iWeightProxySize = _activeProxys.size();
- if(iWeightProxySize <= 0)
- {
- TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", activeProxys.size() <= 0]" << endl);
- return ;
- }
- vector<AdapterProxy*> vProxy;
- for(size_t i = 0; i < _activeProxys.size(); ++i)
- {
- if(_activeProxys[i]->getWeight() > 0)
- {
- vProxy.push_back(_activeProxys[i]);
- }
- }
- iWeightProxySize = vProxy.size();
- if(iWeightProxySize <= 0)
- {
- TLOGERROR("[EndpointManager::updateProxyWeighted _objName:" << _objName << ", vProxy.size() <= 0]" << endl);
- return ;
- }
- if(_update)
- {
- _activeWeightProxy = vProxy;
- updateStaticWeighted();
- }
- _update = false;
- }
- void EndpointManager::updateStaticWeighted()
- {
- size_t iWeightProxySize = _activeWeightProxy.size();
- vector<int> vWeight;
- vWeight.resize(iWeightProxySize);
- for(size_t i = 0; i < iWeightProxySize; i++)
- {
- vWeight[i] = _activeWeightProxy[i]->getWeight();
- }
- dispatchEndpointCache(vWeight);
- }
- void EndpointManager::dispatchEndpointCache(const vector<int> &vWeight)
- {
- if(vWeight.size() <= 0)
- {
- TLOGERROR("[EndpointManager::dispatchEndpointCache vWeight.size() < 0]" << endl);
- return ;
- }
- size_t iWeightProxySize = vWeight.size();
- map<size_t, int> mIdToWeight;
- multimap<int, size_t> mWeightToId;
- size_t iMaxR = 0;
- size_t iMaxRouterR = 0;
- size_t iMaxWeight = 0;
- size_t iMinWeight = 0;
- size_t iTotalCapacty = 0;
- size_t iTempWeight = 0;
- for(size_t i = 0; i < vWeight.size(); ++i)
- {
- iTotalCapacty += vWeight[i];
- }
- _staticRouterCache.clear();
- _lastSWeightPosition = 0;
- _staticRouterCache.reserve(iTotalCapacty+100);
- iMaxWeight = vWeight[0];
- iMinWeight = vWeight[0];
- for(size_t i = 1;i < iWeightProxySize; i++)
- {
- iTempWeight = vWeight[i];
-
- if(iTempWeight > iMaxWeight)
- {
- iMaxWeight = iTempWeight;
- }
- if(iTempWeight < iMinWeight)
- {
- iMinWeight = iTempWeight;
- }
- }
- if(iMinWeight > 0)
- {
- iMaxR = iMaxWeight / iMinWeight;
- if(iMaxR < iMinWeightLimit)
- iMaxR = iMinWeightLimit;
- if(iMaxR > iMaxWeightLimit)
- iMaxR = iMaxWeightLimit;
- }
- else
- {
- iMaxR = 1;
- iMaxWeight = 1;
- }
- for(size_t i = 0;i < iWeightProxySize; i++)
- {
- int iWeight = (vWeight[i] * iMaxR) / iMaxWeight;
- if(iWeight > 0)
- {
- iMaxRouterR += iWeight;
- mIdToWeight.insert(map<size_t, int>::value_type(i, iWeight));
- mWeightToId.insert(make_pair(iWeight, i));
- }
- else
- {
- _staticRouterCache.push_back(i);
- }
-
- TLOGTARS("EndpointManager::dispatchEndpointCache _objName:" << _objName << "|endpoint:" << _activeWeightProxy[i]->endpoint().desc() << "|iWeightR:" << iWeight << endl);
- }
- for(size_t i = 0; i < iMaxRouterR; i++)
- {
- bool bFirst = true;
- multimap<int, size_t> mulTemp;
- multimap<int, size_t>::reverse_iterator mIter = mWeightToId.rbegin();
- while(mIter != mWeightToId.rend())
- {
- if(bFirst)
- {
- bFirst = false;
- _staticRouterCache.push_back(mIter->second);
- mulTemp.insert(make_pair((mIter->first - iMaxRouterR + mIdToWeight[mIter->second]), mIter->second));
- }
- else
- {
- mulTemp.insert(make_pair((mIter->first + mIdToWeight[mIter->second]), mIter->second));
- }
- mIter++;
- }
- mWeightToId.clear();
- mWeightToId.swap(mulTemp);
- }
- }
- /////////////////////////////////////////////////////////////////////////////
- EndpointThread::EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sName, bool bFirstNetThread)
- : QueryEpBase(pComm,bFirstNetThread,true)
- , _type(type)
- , _name(sName)
- {
- init(sObjName, "", true);
- }
- void EndpointThread::getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- //直连调用这个接口无效
- if(_direct)
- {
- return ;
- }
- {
- TC_LockT<TC_ThreadMutex> lock(_mutex);
- refreshReg(_type,_name);
-
- activeEndPoint = _activeEndPoint;
- inactiveEndPoint = _inactiveEndPoint;
- }
- }
- void EndpointThread::getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- //直连调用这个接口无效
- if(_direct)
- {
- return ;
- }
- {
-
- TC_LockT<TC_ThreadMutex> lock(_mutex);
-
- refreshReg(_type,_name);
-
- activeEndPoint = _activeTCEndPoint;
- inactiveEndPoint = _inactiveTCEndPoint;
- }
- }
- void EndpointThread::notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync)
- {
- if(!bSync)
- {
- TC_LockT<TC_ThreadMutex> lock(_mutex);
- update(active, inactive);
- }
- else
- {
- update(active, inactive);
- }
- }
- void EndpointThread::update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive)
- {
- _activeEndPoint.clear();
- _inactiveEndPoint.clear();
- _activeTCEndPoint.clear();
- _inactiveTCEndPoint.clear();
- set<EndpointInfo>::iterator iter= active.begin();
- for(;iter != active.end(); ++iter)
- {
- _activeTCEndPoint.push_back(iter->getEndpoint());
- _activeEndPoint.push_back(*iter);
- }
- iter = inactive.begin();
- for(;iter != inactive.end(); ++iter)
- {
- _inactiveTCEndPoint.push_back(iter->getEndpoint());
- _inactiveEndPoint.push_back(*iter);
- }
- }
- /////////////////////////////////////////////////////////////////////////////
- EndpointManagerThread::EndpointManagerThread(Communicator * pComm,const string & sObjName)
- :_communicator(pComm)
- ,_objName(sObjName)
- {
- }
- EndpointManagerThread::~EndpointManagerThread()
- {
- map<string,EndpointThread*>::iterator iter;
- for(iter=_info.begin();iter != _info.end();iter++)
- {
- if(iter->second)
- {
- delete iter->second;
- iter->second = NULL;
- }
- }
- }
- void EndpointManagerThread::getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_ALL,"");
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_SET,sName);
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_STATION,sName);
- pThread->getEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpoint(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_DEFAULT,"");
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_ALL,"");
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_SET,sName);
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- void EndpointManagerThread::getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint)
- {
- EndpointThread * pThread = getEndpointThread(E_STATION,sName);
- pThread->getTCEndpoints(activeEndPoint,inactiveEndPoint);
- }
- EndpointThread * EndpointManagerThread::getEndpointThread(GetEndpointType type,const string & sName)
- {
- string sAllName = TC_Common::tostr((int)type) + ":" + sName;
- {
- TC_RW_RLockT<TC_ThreadRWLocker> lock(_mutex);
- auto iter = _info.find(sAllName);
- if (iter != _info.end())
- {
- return iter->second;
- }
- }
- {
- TC_RW_WLockT<TC_ThreadRWLocker> lock(_mutex);
- auto iter = _info.find(sAllName);
- if (iter != _info.end())
- {
- return iter->second;
- }
- EndpointThread* pThread = new EndpointThread(_communicator, _objName, type, sName);
- _info[sAllName] = pThread;
- return pThread;
- }
- }
- }
|