EndpointManager.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TARS_ENDPOINT_MANAGER_H_
  17. #define __TARS_ENDPOINT_MANAGER_H_
  18. #include "servant/EndpointInfo.h"
  19. #include "servant/EndpointF.h"
  20. #include "servant/QueryF.h"
  21. #include "servant/AppProtocol.h"
  22. #include "util/tc_spin_lock.h"
  23. #include "util/tc_consistent_hash_new.h"
  24. namespace tars
  25. {
  26. ////////////////////////////////////////////////////////////////////////
  27. /*
  28. * 获取路由的方式
  29. */
  30. enum GetEndpointType
  31. {
  32. E_DEFAULT = 0,
  33. E_ALL = 1,
  34. E_SET = 2,
  35. E_STATION = 3
  36. };
  37. /*
  38. * 权重类型
  39. */
  40. enum EndpointWeightType
  41. {
  42. E_LOOP = 0,
  43. E_STATIC_WEIGHT = 1,
  44. };
  45. ////////////////////////////////////////////////////////////////////////
  46. /*
  47. * 路由请求与回调的实现类
  48. */
  49. class QueryEpBase : public QueryFPrxCallback
  50. {
  51. public:
  52. /*
  53. * 构造函数
  54. */
  55. QueryEpBase(Communicator * pComm, bool bFirstNetThread, bool bInterfaceReq);
  56. /*
  57. * 析构函数
  58. */
  59. virtual ~QueryEpBase(){}
  60. /*
  61. * 初始化
  62. */
  63. bool init(const string & sObjName, const string& setName, bool rootServant);
  64. /*
  65. * 获取所有节点信息的回调处理
  66. */
  67. void callback_findObjectById4All(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  68. /*
  69. * 获取所有节点信息的异常回调处理
  70. */
  71. void callback_findObjectById4All_exception(tars::Int32 ret);
  72. /*
  73. * 获取所有节点信息的回调处理
  74. */
  75. void callback_findObjectById4Any(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  76. /*
  77. * 获取所有节点信息的异常回调处理
  78. */
  79. void callback_findObjectById4Any_exception(tars::Int32 ret);
  80. /*
  81. * 按idc获取的节点信息的回调处理
  82. */
  83. void callback_findObjectByIdInSameGroup(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  84. /*
  85. * 按idc分组获取的节点信息的异常回调处理
  86. */
  87. void callback_findObjectByIdInSameGroup_exception(tars::Int32 ret);
  88. /*
  89. * 按set获取的节点信息的回调处理
  90. */
  91. void callback_findObjectByIdInSameSet(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  92. /*
  93. * 按set获取的节点信息的异常回调处理
  94. */
  95. void callback_findObjectByIdInSameSet_exception(tars::Int32 ret);
  96. /*
  97. * 按地区获取的节点信息的回调处理
  98. */
  99. void callback_findObjectByIdInSameStation(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  100. /*
  101. * 按地区获取的节点信息的异常回调处理
  102. */
  103. void callback_findObjectByIdInSameStation_exception(tars::Int32 ret);
  104. /*
  105. * 从主控请求到数据了 通知更新ip列表信息
  106. */
  107. virtual void notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync) = 0;
  108. /*
  109. * 从主控请求到数据了 最开始调用主控 要通知
  110. */
  111. virtual void doNotify() = 0;
  112. /*
  113. * 设置主控的代理
  114. */
  115. int setLocatorPrx(QueryFPrx prx);
  116. /*
  117. * 是否直连后端
  118. */
  119. inline bool getDirectProxy() { return _direct; }
  120. protected:
  121. /*
  122. * 刷新主控
  123. */
  124. void refreshReg(GetEndpointType type,const string & sName);
  125. private:
  126. /*
  127. * 设置obj名字
  128. * 如果是直接连接,则从obj名字中提取ip列表信息
  129. * 如果是间接连接,则设置主控代理,并从缓存中加载相应的列表
  130. */
  131. void setObjName(const string & sObjName);
  132. // /*
  133. // * 解析endpoint
  134. // */
  135. // vector<string> sepEndpoint(const string& sEndpoints);
  136. /*
  137. * 从sEndpoints提取ip列表信息
  138. */
  139. void setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints);
  140. /*
  141. * 主控的请求的响应到了,做相应的处理
  142. */
  143. void doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync = false);
  144. /*
  145. * 请求主控异常,做相应的处理
  146. */
  147. void doEndpointsExp(int iRet);
  148. /*
  149. * 刷新ip列表信息到缓存文件
  150. */
  151. void setEndPointToCache(bool bInactive);
  152. /*
  153. * 更新外部地址
  154. */
  155. virtual void onUpdateOutter() {};
  156. protected:
  157. /*
  158. * 通信器
  159. */
  160. Communicator * _communicator;
  161. /*
  162. * 是否第一个客户端网络线程
  163. * 若是,会对ip列表信息的写缓存
  164. */
  165. bool _firstNetThread;
  166. /*
  167. * 是否主动请求ip列表信息的接口的请求
  168. * 比如按idc获取某个obj的ip列表信息
  169. */
  170. bool _interfaceReq;
  171. /*
  172. * 是否直连后端服务
  173. */
  174. bool _direct;
  175. /*
  176. * 请求的后端服务的Obj对象名称
  177. */
  178. string _objName;
  179. /*
  180. * 指定set调用的setid,默认为空
  181. * 如果有值,则说明是指定set调用
  182. */
  183. string _invokeSetId;
  184. /*
  185. * 框架的主控地址
  186. */
  187. string _locator;
  188. /*
  189. * 主控的路由代理
  190. */
  191. QueryFPrx _queryFPrx;
  192. /*
  193. * 数据是否有效,初始化的时候是无效的数据
  194. * 只有请求过主控或者从文件缓存加载的数据才是有效数据
  195. */
  196. bool _valid;
  197. /*
  198. * 权重类型
  199. */
  200. EndpointWeightType _weightType;
  201. /*
  202. * 活跃节点列表
  203. */
  204. set<EndpointInfo> _activeEndpoints;
  205. /*
  206. * 不活跃节点列表
  207. */
  208. set<EndpointInfo> _inactiveEndpoints;
  209. /**
  210. * 是否是root servant
  211. */
  212. bool _rootServant;
  213. protected:
  214. /////////以下是请求主控的策略信息/////////////////
  215. /*
  216. * 是否正在向请求主控服务的ip列表信息
  217. */
  218. bool _requestRegistry;
  219. /*
  220. * 请求主控的超时时间(绝对时间),单位毫秒
  221. * 防止请求超时或者失败,一直处理请求状态
  222. */
  223. int64_t _requestTimeout;
  224. /*
  225. * 请求主控的超时间隔,默认5s
  226. */
  227. int _timeoutInterval;
  228. /*
  229. * 下次请求主控的时间(绝对时间),单位毫秒
  230. */
  231. int64_t _refreshTime;
  232. /*
  233. * 正常请求主控的频率(有ip列表信息),单位毫秒
  234. * 默认60s一次
  235. */
  236. int _refreshInterval;
  237. /*
  238. * 主控返回的活跃ip列表为空情况下
  239. * 请求主控的频率,默认10s一次
  240. */
  241. int _activeEmptyInterval;
  242. /*
  243. * 请求主控失败的时间频率
  244. * 默认2s一次
  245. */
  246. int _failInterval;
  247. /*
  248. * 请求主控连续失败达到一定的次数后
  249. * 请求主控的时间频率,默认30s一次
  250. */
  251. int _manyFailInterval;
  252. /*
  253. * 连续请求失败的次数限制,默认3次
  254. */
  255. int _failTimesLimit;
  256. /*
  257. * 连续失败的次数
  258. */
  259. int _failTimes;
  260. };
  261. ////////////////////////////////////////////////////////////////////////
  262. /*
  263. * 框架内部的路由管理的实现类
  264. */
  265. class EndpointManager : public QueryEpBase
  266. {
  267. public:
  268. static const size_t iMinWeightLimit = 10;
  269. static const size_t iMaxWeightLimit = 100;
  270. public:
  271. /*
  272. * 构造函数
  273. */
  274. EndpointManager(ObjectProxy* pObjectProxy, Communicator* pComm, bool bFirstNetThread);
  275. /*
  276. * 析构函数
  277. */
  278. virtual ~EndpointManager();
  279. /*
  280. * 重写基类的实现
  281. */
  282. void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync = false);
  283. /**
  284. * 更新
  285. * @param active
  286. * @param inactive
  287. */
  288. void updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  289. /**
  290. * 外部其他通信过来的更新
  291. * @param active
  292. * @param inactive
  293. */
  294. void updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  295. /*
  296. * 重写基类的实现
  297. */
  298. void doNotify();
  299. /**
  300. * 根据请求策略从可用的服务列表选择一个服务节点
  301. */
  302. bool selectAdapterProxy(ReqMessage * msg, AdapterProxy * & pAdapterProxy);
  303. /**
  304. * 获取所有的服务节点
  305. */
  306. const vector<AdapterProxy*> & getAdapters()
  307. {
  308. return _vAllProxys;
  309. }
  310. private:
  311. virtual void onUpdateOutter();
  312. /*
  313. * 轮询选取一个结点
  314. */
  315. AdapterProxy * getNextValidProxy();
  316. /*
  317. * 根据hash值选取一个结点
  318. */
  319. AdapterProxy* getHashProxy(int64_t hashCode, bool bConsistentHash = false);
  320. /*
  321. * 根据hash值按取模方式,从正常节点中选取一个结点
  322. */
  323. AdapterProxy* getHashProxyForNormal(int64_t hashCode);
  324. /*
  325. * 根据hash值按一致性hash方式,从正常节点中选取一个结点
  326. */
  327. AdapterProxy* getConHashProxyForNormal(int64_t hashCode);
  328. /*
  329. * 根据hash值按取模方式,从静态权重节点中选取一个结点
  330. */
  331. AdapterProxy* getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache);
  332. /*
  333. * 根据hash值按一致性hash方式,从静态权重节点中选取一个结点
  334. */
  335. AdapterProxy* getConHashProxyForWeight(int64_t hashCode, bool bStatic);
  336. /*
  337. * 判断静态权重节点是否有变化
  338. */
  339. bool checkHashStaticWeightChange(bool bStatic);
  340. /*
  341. * 判断静态权重节点是否有变化
  342. */
  343. bool checkConHashChange(bool bStatic, const map<string, AdapterProxy*> &mLastConHashProxys);
  344. /*
  345. * 更新取模hash方法的静态权重节点信息
  346. */
  347. void updateHashProxyWeighted(bool bStatic);
  348. /*
  349. * 更新一致性hash方法的静态权重节点信息
  350. */
  351. void updateConHashProxyWeighted(bool bStatic, map<string, AdapterProxy*> &mLastConHashProxys, TC_ConsistentHashNew &conHash);
  352. /*
  353. * 根据后端服务的权重值选取一个结点
  354. */
  355. AdapterProxy* getWeightedProxy(bool bStaticWeighted);
  356. /*
  357. * 根据后端服务的权重值选取一个结点
  358. */
  359. AdapterProxy* getWeightedForNormal(bool bStaticWeighted);
  360. /*
  361. * 根据各个节点的权重值,建立各个节点的调用数
  362. */
  363. void updateProxyWeighted();
  364. /*
  365. * 根据各个节点的静态权重值,建立各个节点的静态权重
  366. */
  367. void updateStaticWeighted();
  368. /*
  369. * 建立静态权重节点信息的缓存
  370. */
  371. void dispatchEndpointCache(const vector<int> &vWeight);
  372. private:
  373. /*
  374. * ObjectProxy
  375. */
  376. ObjectProxy * _objectProxy;
  377. /*
  378. * 活跃的结点
  379. */
  380. vector<AdapterProxy*> _activeProxys;
  381. /*
  382. * 一致性hash使用,保证强一致性
  383. * key:host
  384. */
  385. map<string, AdapterProxy*> _sortActivProxys;
  386. unordered_map<string, AdapterProxy*> _indexActiveProxys;
  387. /*
  388. * 部署的结点 包括活跃的和不活跃的
  389. */
  390. map<string,AdapterProxy*> _regProxys;
  391. vector<AdapterProxy*> _vRegProxys;
  392. /*
  393. * 所有曾经create的结点
  394. */
  395. map<string,AdapterProxy*> _allProxys;
  396. vector<AdapterProxy*> _vAllProxys;
  397. /*
  398. * 轮训访问_activeProxys的偏移
  399. */
  400. size_t _lastRoundPosition;
  401. /*
  402. * 节点信息是否有更新
  403. */
  404. bool _update;
  405. /*
  406. * 是否是第一次建立权重信息
  407. */
  408. bool _first;
  409. /**
  410. * 上次重新建立权重信息表的时间
  411. */
  412. int64_t _lastBuildWeightTime;
  413. /**
  414. * 负载值更新频率,单位毫秒
  415. */
  416. int32_t _updateWeightInterval;
  417. /**
  418. * 静态时,对应的节点路由选择
  419. */
  420. size_t _lastSWeightPosition;
  421. /**
  422. * 静态权重对应的节点路由缓存
  423. */
  424. vector<size_t> _staticRouterCache;
  425. /*
  426. * 静态权重的活跃节点
  427. */
  428. vector<AdapterProxy*> _activeWeightProxy;
  429. /*
  430. * hash静态权重的路由缓存
  431. */
  432. vector<size_t> _hashStaticRouterCache;
  433. /*
  434. * hash静态权重的缓存节点
  435. */
  436. vector<AdapterProxy*> _lastHashStaticProxys;
  437. /*
  438. * 一致性hash静态权重时使用
  439. */
  440. map<string, AdapterProxy*> _lastConHashWeightProxys;
  441. /*
  442. * 一致性hash静态权重时使用
  443. */
  444. TC_ConsistentHashNew _consistentHashWeight;
  445. /*
  446. * 一致性hash普通使用
  447. */
  448. map<string, AdapterProxy*> _lastConHashProxys;
  449. /*
  450. * 一致性hash普通使用
  451. */
  452. TC_ConsistentHashNew _consistentHash;
  453. struct OutterUpdate
  454. {
  455. set<EndpointInfo> active;
  456. set<EndpointInfo> inactive;
  457. };
  458. shared_ptr<OutterUpdate> _outterUpdate;
  459. };
  460. ////////////////////////////////////////////////////////////////////////
  461. /*
  462. * 对外按类型获取路由的实现类
  463. */
  464. class EndpointThread : public QueryEpBase
  465. {
  466. public:
  467. /*
  468. * 构造函数
  469. */
  470. EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sSetName, bool bFirstNetThread = false);
  471. /*
  472. * 析构函数
  473. */
  474. ~EndpointThread(){};
  475. /*
  476. * 用EndpointInfo存在可用与不可用的节点信息
  477. */
  478. void getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  479. /*
  480. * 用TC_Endpoint存在可用与不可用的节点信息
  481. */
  482. void getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  483. /*
  484. * 重写基类的实现
  485. */
  486. void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync);
  487. /*
  488. * 重写基类的实现
  489. */
  490. void doNotify()
  491. {
  492. }
  493. private:
  494. /*
  495. * 更新缓存的ip列表信息
  496. */
  497. void update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  498. private:
  499. /*
  500. * 类型
  501. */
  502. GetEndpointType _type;
  503. /*
  504. * Obj名称
  505. */
  506. string _name;
  507. /*
  508. * 锁
  509. */
  510. TC_ThreadMutex _mutex;
  511. /*
  512. * 活跃的结点
  513. */
  514. vector<EndpointInfo> _activeEndPoint;
  515. vector<TC_Endpoint> _activeTCEndPoint;
  516. /*
  517. * 不活跃的结点
  518. */
  519. vector<EndpointInfo> _inactiveEndPoint;
  520. vector<TC_Endpoint> _inactiveTCEndPoint;
  521. };
  522. ////////////////////////////////////////////////////////////////////////
  523. /*
  524. * 对外获取路由请求的封装类
  525. */
  526. class EndpointManagerThread
  527. {
  528. public:
  529. /*
  530. * 构造函数
  531. */
  532. EndpointManagerThread(Communicator *pComm, const string &sObjName);
  533. /*
  534. * 析构函数
  535. */
  536. ~EndpointManagerThread();
  537. /*
  538. * 按idc获取可用与不可用的结点
  539. */
  540. void getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  541. /*
  542. * 获取所有可用与不可用的结点
  543. */
  544. void getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  545. /*
  546. * 根据set获取可用与不可用的结点
  547. */
  548. void getEndpointBySet(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  549. /*
  550. * 根据地区获取可用与不可用的结点
  551. */
  552. void getEndpointByStation(const string &sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  553. /*
  554. * 按idc获取可用与不可用的结点
  555. */
  556. void getTCEndpoint( vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  557. /*
  558. * 获取所有可用与不可用的结点
  559. */
  560. void getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  561. /*
  562. * 根据set获取可用与不可用的结点
  563. */
  564. void getTCEndpointBySet(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  565. /*
  566. * 根据地区获取可用与不可用的结点
  567. */
  568. void getTCEndpointByStation(const string &sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  569. protected:
  570. /*
  571. * 根据type创建相应的EndpointThread
  572. */
  573. EndpointThread * getEndpointThread(GetEndpointType type, const string & sName);
  574. private:
  575. /*
  576. * 通信器
  577. */
  578. Communicator * _communicator;
  579. /*
  580. * Obj名称
  581. */
  582. string _objName;
  583. /*
  584. * 锁
  585. */
  586. // TC_ThreadLock _mutex;
  587. TC_SpinLock _mutex;
  588. /*
  589. * 保存对象的map
  590. */
  591. map<string,EndpointThread*> _info;
  592. };
  593. ////////////////////////////////////////////////////////////////////////
  594. }
  595. #endif