123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735 |
- /**
- * Tencent is pleased to support the open source community by making Tars available.
- *
- * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
- *
- * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * https://opensource.org/licenses/BSD-3-Clause
- *
- * Unless required by applicable law or agreed to in writing, software distributed
- * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
- #ifndef __TARS_ENDPOINT_MANAGER_H_
- #define __TARS_ENDPOINT_MANAGER_H_
- #include "servant/EndpointInfo.h"
- #include "servant/EndpointF.h"
- #include "servant/QueryF.h"
- #include "servant/AppProtocol.h"
- #include "util/tc_spin_lock.h"
- #include "util/tc_consistent_hash_new.h"
- namespace tars
- {
- ////////////////////////////////////////////////////////////////////////
- /*
- * 获取路由的方式
- */
- enum GetEndpointType
- {
- E_DEFAULT = 0,
- E_ALL = 1,
- E_SET = 2,
- E_STATION = 3
- };
- /*
- * 权重类型
- */
- enum EndpointWeightType
- {
- E_LOOP = 0,
- E_STATIC_WEIGHT = 1,
- };
- ////////////////////////////////////////////////////////////////////////
- /*
- * 路由请求与回调的实现类
- */
- class QueryEpBase : public QueryFPrxCallback
- {
- public:
- /*
- * 构造函数
- */
- QueryEpBase(Communicator * pComm, bool bFirstNetThread, bool bInterfaceReq);
- /*
- * 析构函数
- */
- virtual ~QueryEpBase(){}
- /*
- * 初始化
- */
- bool init(const string & sObjName, const string& setName, bool rootServant);
- /*
- * 获取所有节点信息的回调处理
- */
- void callback_findObjectById4All(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
- /*
- * 获取所有节点信息的异常回调处理
- */
- void callback_findObjectById4All_exception(tars::Int32 ret);
- /*
- * 获取所有节点信息的回调处理
- */
- void callback_findObjectById4Any(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
- /*
- * 获取所有节点信息的异常回调处理
- */
- void callback_findObjectById4Any_exception(tars::Int32 ret);
- /*
- * 按idc获取的节点信息的回调处理
- */
- void callback_findObjectByIdInSameGroup(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
- /*
- * 按idc分组获取的节点信息的异常回调处理
- */
- void callback_findObjectByIdInSameGroup_exception(tars::Int32 ret);
- /*
- * 按set获取的节点信息的回调处理
- */
- void callback_findObjectByIdInSameSet(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
- /*
- * 按set获取的节点信息的异常回调处理
- */
- void callback_findObjectByIdInSameSet_exception(tars::Int32 ret);
- /*
- * 按地区获取的节点信息的回调处理
- */
- void callback_findObjectByIdInSameStation(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
- /*
- * 按地区获取的节点信息的异常回调处理
- */
- void callback_findObjectByIdInSameStation_exception(tars::Int32 ret);
- /*
- * 从主控请求到数据了 通知更新ip列表信息
- */
- virtual void notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync) = 0;
- /*
- * 从主控请求到数据了 最开始调用主控 要通知
- */
- virtual void doNotify() = 0;
- /*
- * 设置主控的代理
- */
- int setLocatorPrx(QueryFPrx prx);
- /*
- * 是否直连后端
- */
- inline bool getDirectProxy() { return _direct; }
- protected:
- /*
- * 刷新主控
- */
- void refreshReg(GetEndpointType type,const string & sName);
- private:
-
- /*
- * 设置obj名字
- * 如果是直接连接,则从obj名字中提取ip列表信息
- * 如果是间接连接,则设置主控代理,并从缓存中加载相应的列表
- */
- void setObjName(const string & sObjName);
- // /*
- // * 解析endpoint
- // */
- // vector<string> sepEndpoint(const string& sEndpoints);
- /*
- * 从sEndpoints提取ip列表信息
- */
- void setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints);
- /*
- * 主控的请求的响应到了,做相应的处理
- */
- void doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync = false);
- /*
- * 请求主控异常,做相应的处理
- */
- void doEndpointsExp(int iRet);
- /*
- * 刷新ip列表信息到缓存文件
- */
- void setEndPointToCache(bool bInactive);
- /*
- * 更新外部地址
- */
- virtual void onUpdateOutter() {};
- protected:
- /*
- * 通信器
- */
- Communicator * _communicator;
- /*
- * 是否第一个客户端网络线程
- * 若是,会对ip列表信息的写缓存
- */
- bool _firstNetThread;
- /*
- * 是否主动请求ip列表信息的接口的请求
- * 比如按idc获取某个obj的ip列表信息
- */
- bool _interfaceReq;
- /*
- * 是否直连后端服务
- */
- bool _direct;
- /*
- * 请求的后端服务的Obj对象名称
- */
- string _objName;
- /*
- * 指定set调用的setid,默认为空
- * 如果有值,则说明是指定set调用
- */
- string _invokeSetId;
- /*
- * 框架的主控地址
- */
- string _locator;
- /*
- * 主控的路由代理
- */
- QueryFPrx _queryFPrx;
- /*
- * 数据是否有效,初始化的时候是无效的数据
- * 只有请求过主控或者从文件缓存加载的数据才是有效数据
- */
- bool _valid;
- /*
- * 权重类型
- */
- EndpointWeightType _weightType;
-
- /*
- * 活跃节点列表
- */
- set<EndpointInfo> _activeEndpoints;
- /*
- * 不活跃节点列表
- */
- set<EndpointInfo> _inactiveEndpoints;
- /**
- * 是否是root servant
- */
- bool _rootServant;
- protected:
- /////////以下是请求主控的策略信息/////////////////
- /*
- * 是否正在向请求主控服务的ip列表信息
- */
- bool _requestRegistry;
- /*
- * 请求主控的超时时间(绝对时间),单位毫秒
- * 防止请求超时或者失败,一直处理请求状态
- */
- int64_t _requestTimeout;
- /*
- * 请求主控的超时间隔,默认5s
- */
- int _timeoutInterval;
- /*
- * 下次请求主控的时间(绝对时间),单位毫秒
- */
- int64_t _refreshTime;
- /*
- * 正常请求主控的频率(有ip列表信息),单位毫秒
- * 默认60s一次
- */
- int _refreshInterval;
- /*
- * 主控返回的活跃ip列表为空情况下
- * 请求主控的频率,默认10s一次
- */
- int _activeEmptyInterval;
- /*
- * 请求主控失败的时间频率
- * 默认2s一次
- */
- int _failInterval;
- /*
- * 请求主控连续失败达到一定的次数后
- * 请求主控的时间频率,默认30s一次
- */
- int _manyFailInterval;
- /*
- * 连续请求失败的次数限制,默认3次
- */
- int _failTimesLimit;
- /*
- * 连续失败的次数
- */
- int _failTimes;
-
- };
- ////////////////////////////////////////////////////////////////////////
- /*
- * 框架内部的路由管理的实现类
- */
- class EndpointManager : public QueryEpBase
- {
- public:
- static const size_t iMinWeightLimit = 10;
- static const size_t iMaxWeightLimit = 100;
- public:
- /*
- * 构造函数
- */
- EndpointManager(ObjectProxy* pObjectProxy, Communicator* pComm, bool bFirstNetThread);
- /*
- * 析构函数
- */
- virtual ~EndpointManager();
- /*
- * 重写基类的实现
- */
- void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync = false);
- /**
- * 更新
- * @param active
- * @param inactive
- */
- void updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
- /**
- * 外部其他通信过来的更新
- * @param active
- * @param inactive
- */
- void updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
- /*
- * 重写基类的实现
- */
- void doNotify();
- /**
- * 根据请求策略从可用的服务列表选择一个服务节点
- */
- bool selectAdapterProxy(ReqMessage * msg, AdapterProxy * & pAdapterProxy);
- /**
- * 获取所有的服务节点
- */
- const vector<AdapterProxy*> & getAdapters()
- {
- return _vAllProxys;
- }
- private:
- virtual void onUpdateOutter();
- /*
- * 轮询选取一个结点
- */
- AdapterProxy * getNextValidProxy();
- /*
- * 根据hash值选取一个结点
- */
- AdapterProxy* getHashProxy(int64_t hashCode, bool bConsistentHash = false);
- /*
- * 根据hash值按取模方式,从正常节点中选取一个结点
- */
- AdapterProxy* getHashProxyForNormal(int64_t hashCode);
- /*
- * 根据hash值按一致性hash方式,从正常节点中选取一个结点
- */
- AdapterProxy* getConHashProxyForNormal(int64_t hashCode);
- /*
- * 根据hash值按取模方式,从静态权重节点中选取一个结点
- */
- AdapterProxy* getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache);
- /*
- * 根据hash值按一致性hash方式,从静态权重节点中选取一个结点
- */
- AdapterProxy* getConHashProxyForWeight(int64_t hashCode, bool bStatic);
- /*
- * 判断静态权重节点是否有变化
- */
- bool checkHashStaticWeightChange(bool bStatic);
- /*
- * 判断静态权重节点是否有变化
- */
- bool checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys);
- /*
- * 更新取模hash方法的静态权重节点信息
- */
- void updateHashProxyWeighted(bool bStatic);
- /*
- * 更新一致性hash方法的静态权重节点信息
- */
- void updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash);
- /*
- * 根据后端服务的权重值选取一个结点
- */
- AdapterProxy* getWeightedProxy(bool bStaticWeighted);
- /*
- * 根据后端服务的权重值选取一个结点
- */
- AdapterProxy* getWeightedForNormal(bool bStaticWeighted);
- /*
- * 根据各个节点的权重值,建立各个节点的调用数
- */
- void updateProxyWeighted();
- /*
- * 根据各个节点的静态权重值,建立各个节点的静态权重
- */
- void updateStaticWeighted();
- /*
- * 建立静态权重节点信息的缓存
- */
- void dispatchEndpointCache(const vector<int> &vWeight);
- private:
- /*
- * ObjectProxy
- */
- ObjectProxy * _objectProxy;
- /*
- * 活跃的结点
- */
- vector<AdapterProxy*> _activeProxys;
- /*
- * 一致性hash使用,保证强一致性
- * key:host
- */
- map<string, AdapterProxy*> _sortActivProxys;
- unordered_map<string, AdapterProxy*> _indexActiveProxys;
- /*
- * 部署的结点 包括活跃的和不活跃的
- */
- map<string,AdapterProxy*> _regProxys;
- vector<AdapterProxy*> _vRegProxys;
- /*
- * 所有曾经create的结点
- */
- map<string,AdapterProxy*> _allProxys;
- vector<AdapterProxy*> _vAllProxys;
- /*
- * 轮训访问_activeProxys的偏移
- */
- size_t _lastRoundPosition;
- /*
- * 节点信息是否有更新
- */
- bool _update;
- /*
- * 是否是第一次建立权重信息
- */
- bool _first;
- /**
- * 上次重新建立权重信息表的时间
- */
- int64_t _lastBuildWeightTime;
- /**
- * 负载值更新频率,单位毫秒
- */
- int32_t _updateWeightInterval;
- /**
- * 静态时,对应的节点路由选择
- */
- size_t _lastSWeightPosition;
- /**
- * 静态权重对应的节点路由缓存
- */
- vector<size_t> _staticRouterCache;
- /*
- * 静态权重的活跃节点
- */
- vector<AdapterProxy*> _activeWeightProxy;
- /*
- * hash静态权重的路由缓存
- */
- vector<size_t> _hashStaticRouterCache;
- /*
- * hash静态权重的缓存节点
- */
- vector<AdapterProxy*> _lastHashStaticProxys;
- /*
- * 一致性hash静态权重时使用
- */
- map<string, AdapterProxy*> _lastConHashWeightProxys;
- /*
- * 一致性hash静态权重时使用
- */
- TC_ConsistentHashNew _consistentHashWeight;
- /*
- * 一致性hash普通使用
- */
- map<string, AdapterProxy*> _lastConHashProxys;
- /*
- * 一致性hash普通使用
- */
- TC_ConsistentHashNew _consistentHash;
- struct OutterUpdate
- {
- set<EndpointInfo> active;
- set<EndpointInfo> inactive;
- };
- shared_ptr<OutterUpdate> _outterUpdate;
- };
- ////////////////////////////////////////////////////////////////////////
- /*
- * 对外按类型获取路由的实现类
- */
- class EndpointThread : public QueryEpBase
- {
- public:
- /*
- * 构造函数
- */
- EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sSetName, bool bFirstNetThread = false);
- /*
- * 析构函数
- */
- ~EndpointThread(){};
- /*
- * 用EndpointInfo存在可用与不可用的节点信息
- */
- void getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /*
- * 用TC_Endpoint存在可用与不可用的节点信息
- */
- void getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
- /*
- * 重写基类的实现
- */
- void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync);
- /*
- * 重写基类的实现
- */
- void doNotify()
- {
- }
- private:
- /*
- * 更新缓存的ip列表信息
- */
- void update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
- private:
- /*
- * 类型
- */
- GetEndpointType _type;
- /*
- * Obj名称
- */
- string _name;
- /*
- * 锁
- */
- TC_ThreadMutex _mutex;
- /*
- * 活跃的结点
- */
- vector<EndpointInfo> _activeEndPoint;
- vector<TC_Endpoint> _activeTCEndPoint;
- /*
- * 不活跃的结点
- */
- vector<EndpointInfo> _inactiveEndPoint;
- vector<TC_Endpoint> _inactiveTCEndPoint;
-
- };
- ////////////////////////////////////////////////////////////////////////
- /*
- * 对外获取路由请求的封装类
- */
- class EndpointManagerThread
- {
- public:
- /*
- * 构造函数
- */
- EndpointManagerThread(Communicator *pComm, const string &sObjName);
- /*
- * 析构函数
- */
- ~EndpointManagerThread();
- /*
- * 按idc获取可用与不可用的结点
- */
- void getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /*
- * 获取所有可用与不可用的结点
- */
- void getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /*
- * 根据set获取可用与不可用的结点
- */
- void getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /*
- * 根据地区获取可用与不可用的结点
- */
- void getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
- /*
- * 按idc获取可用与不可用的结点
- */
- void getTCEndpoint( vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
- /*
- * 获取所有可用与不可用的结点
- */
- void getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
- /*
- * 根据set获取可用与不可用的结点
- */
- void getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
- /*
- * 根据地区获取可用与不可用的结点
- */
- void getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
- protected:
- /*
- * 根据type创建相应的EndpointThread
- */
- EndpointThread * getEndpointThread(GetEndpointType type, const string & sName);
- private:
- /*
- * 通信器
- */
- Communicator * _communicator;
- /*
- * Obj名称
- */
- string _objName;
- /*
- * 锁
- */
- // TC_ThreadLock _mutex;
- TC_SpinLock _mutex;
- /*
- * 保存对象的map
- */
- map<string,EndpointThread*> _info;
- };
- ////////////////////////////////////////////////////////////////////////
- }
- #endif
|