EndpointManager.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TARS_ENDPOINT_MANAGER_H_
  17. #define __TARS_ENDPOINT_MANAGER_H_
  18. #include "servant/EndpointInfo.h"
  19. #include "servant/EndpointF.h"
  20. #include "servant/QueryF.h"
  21. #include "servant/AppProtocol.h"
  22. #include "util/tc_spin_lock.h"
  23. #include "util/tc_consistent_hash_new.h"
  24. namespace tars
  25. {
  26. ////////////////////////////////////////////////////////////////////////
  27. /*
  28. * 获取路由的方式
  29. */
  30. enum GetEndpointType
  31. {
  32. E_DEFAULT = 0,
  33. E_ALL = 1,
  34. E_SET = 2,
  35. E_STATION = 3
  36. };
  37. /*
  38. * 权重类型
  39. */
  40. enum EndpointWeightType
  41. {
  42. E_LOOP = 0,
  43. E_STATIC_WEIGHT = 1,
  44. };
  45. ////////////////////////////////////////////////////////////////////////
  46. /*
  47. * 路由请求与回调的实现类
  48. */
  49. class QueryEpBase : public QueryFPrxCallback
  50. {
  51. public:
  52. /*
  53. * 构造函数
  54. */
  55. QueryEpBase(Communicator * pComm, bool bFirstNetThread, bool bInterfaceReq);
  56. /*
  57. * 析构函数
  58. */
  59. virtual ~QueryEpBase(){}
  60. /*
  61. * 初始化
  62. */
  63. bool init(const string & sObjName, const string & sLocator, const string& setName = "");
  64. /*
  65. * 获取所有节点信息的回调处理
  66. */
  67. void callback_findObjectById4All(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  68. /*
  69. * 获取所有节点信息的异常回调处理
  70. */
  71. void callback_findObjectById4All_exception(tars::Int32 ret);
  72. /*
  73. * 获取所有节点信息的回调处理
  74. */
  75. void callback_findObjectById4Any(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  76. /*
  77. * 获取所有节点信息的异常回调处理
  78. */
  79. void callback_findObjectById4Any_exception(tars::Int32 ret);
  80. /*
  81. * 按idc获取的节点信息的回调处理
  82. */
  83. void callback_findObjectByIdInSameGroup(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  84. /*
  85. * 按idc分组获取的节点信息的异常回调处理
  86. */
  87. void callback_findObjectByIdInSameGroup_exception(tars::Int32 ret);
  88. /*
  89. * 按set获取的节点信息的回调处理
  90. */
  91. void callback_findObjectByIdInSameSet(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  92. /*
  93. * 按set获取的节点信息的异常回调处理
  94. */
  95. void callback_findObjectByIdInSameSet_exception(tars::Int32 ret);
  96. /*
  97. * 按地区获取的节点信息的回调处理
  98. */
  99. void callback_findObjectByIdInSameStation(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  100. /*
  101. * 按地区获取的节点信息的异常回调处理
  102. */
  103. void callback_findObjectByIdInSameStation_exception(tars::Int32 ret);
  104. /*
  105. * 从主控请求到数据了 通知更新ip列表信息
  106. */
  107. virtual void notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync) = 0;
  108. /*
  109. * 从主控请求到数据了 最开始调用主控 要通知
  110. */
  111. virtual void doNotify() = 0;
  112. /*
  113. * 设置主控的代理
  114. */
  115. int setLocatorPrx(QueryFPrx prx);
  116. /*
  117. * 是否直连后端
  118. */
  119. inline bool getDirectProxy() { return _direct; }
  120. protected:
  121. /*
  122. * 刷新主控
  123. */
  124. void refreshReg(GetEndpointType type,const string & sName);
  125. private:
  126. /*
  127. * 设置obj名字
  128. * 如果是直接连接,则从obj名字中提取ip列表信息
  129. * 如果是间接连接,则设置主控代理,并从缓存中加载相应的列表
  130. */
  131. void setObjName(const string & sObjName);
  132. /*
  133. * 解析endpoint
  134. */
  135. vector<string> sepEndpoint(const string& sEndpoints);
  136. /*
  137. * 从sEndpoints提取ip列表信息
  138. */
  139. void setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints);
  140. /*
  141. * 主控的请求的响应到了,做相应的处理
  142. */
  143. void doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync = false);
  144. /*
  145. * 请求主控异常,做相应的处理
  146. */
  147. void doEndpointsExp(int iRet);
  148. /*
  149. * 刷新ip列表信息到缓存文件
  150. */
  151. void setEndPointToCache(bool bInactive);
  152. protected:
  153. /*
  154. * 通信器
  155. */
  156. Communicator * _communicator;
  157. /*
  158. * 是否第一个客户端网络线程
  159. * 若是,会对ip列表信息的写缓存
  160. */
  161. bool _firstNetThread;
  162. /*
  163. * 是否主动请求ip列表信息的接口的请求
  164. * 比如按idc获取某个obj的ip列表信息
  165. */
  166. bool _interfaceReq;
  167. /*
  168. * 是否直连后端服务
  169. */
  170. bool _direct;
  171. /*
  172. * 请求的后端服务的Obj对象名称
  173. */
  174. string _objName;
  175. /*
  176. * 指定set调用的setid,默认为空
  177. * 如果有值,则说明是指定set调用
  178. */
  179. string _invokeSetId;
  180. /*
  181. * 框架的主控地址
  182. */
  183. string _locator;
  184. /*
  185. * 主控的路由代理
  186. */
  187. QueryFPrx _queryFPrx;
  188. /*
  189. * 数据是否有效,初始化的时候是无效的数据
  190. * 只有请求过主控或者从文件缓存加载的数据才是有效数据
  191. */
  192. bool _valid;
  193. /*
  194. * 权重类型
  195. */
  196. EndpointWeightType _weightType;
  197. /*
  198. * 活跃节点列表
  199. */
  200. set<EndpointInfo> _activeEndpoints;
  201. /*
  202. * 不活跃节点列表
  203. */
  204. set<EndpointInfo> _inactiveEndpoints;
  205. /**
  206. * 是否是root servant
  207. */
  208. bool _rootServant;
  209. private:
  210. /////////以下是请求主控的策略信息/////////////////
  211. /*
  212. * 是否正在向请求主控服务的ip列表信息
  213. */
  214. bool _requestRegistry;
  215. /*
  216. * 请求主控的超时时间(绝对时间),单位毫秒
  217. * 防止请求超时或者失败,一直处理请求状态
  218. */
  219. int64_t _requestTimeout;
  220. /*
  221. * 请求主控的超时间隔,默认5s
  222. */
  223. int _timeoutInterval;
  224. /*
  225. * 下次请求主控的时间(绝对时间),单位毫秒
  226. */
  227. int64_t _refreshTime;
  228. /*
  229. * 正常请求主控的频率(有ip列表信息),单位毫秒
  230. * 默认60s一次
  231. */
  232. int _refreshInterval;
  233. /*
  234. * 主控返回的活跃ip列表为空情况下
  235. * 请求主控的频率,默认10s一次
  236. */
  237. int _activeEmptyInterval;
  238. /*
  239. * 请求主控失败的时间频率
  240. * 默认2s一次
  241. */
  242. int _failInterval;
  243. /*
  244. * 请求主控连续失败达到一定的次数后
  245. * 请求主控的时间频率,默认30s一次
  246. */
  247. int _manyFailInterval;
  248. /*
  249. * 连续请求失败的次数限制,默认3次
  250. */
  251. int _failTimesLimit;
  252. /*
  253. * 连续失败的次数
  254. */
  255. int _failTimes;
  256. };
  257. ////////////////////////////////////////////////////////////////////////
  258. /*
  259. * 框架内部的路由管理的实现类
  260. */
  261. class EndpointManager : public QueryEpBase
  262. {
  263. public:
  264. static const size_t iMinWeightLimit = 10;
  265. static const size_t iMaxWeightLimit = 100;
  266. public:
  267. /*
  268. * 构造函数
  269. */
  270. EndpointManager(ObjectProxy* pObjectProxy, Communicator* pComm, const string & sObjName, bool bFirstNetThread, const string& setName = "");
  271. /*
  272. * 析构函数
  273. */
  274. virtual ~EndpointManager();
  275. /*
  276. * 重写基类的实现
  277. */
  278. void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync = false);
  279. /**
  280. * 更新
  281. * @param active
  282. * @param inactive
  283. */
  284. void updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  285. /*
  286. * 重写基类的实现
  287. */
  288. void doNotify();
  289. /**
  290. * 根据请求策略从可用的服务列表选择一个服务节点
  291. */
  292. bool selectAdapterProxy(ReqMessage * msg, AdapterProxy * & pAdapterProxy, bool onlyCheck);
  293. /**
  294. * 获取所有的服务节点
  295. */
  296. const vector<AdapterProxy*> & getAdapters()
  297. {
  298. return _vAllProxys;
  299. }
  300. private:
  301. /*
  302. * 轮询选取一个结点
  303. */
  304. AdapterProxy * getNextValidProxy(bool onlyCheck);
  305. /*
  306. * 根据hash值选取一个结点
  307. */
  308. AdapterProxy* getHashProxy(int64_t hashCode, bool bConsistentHash = false, bool onlyCheck = false);
  309. /*
  310. * 根据hash值按取模方式,从正常节点中选取一个结点
  311. */
  312. AdapterProxy* getHashProxyForNormal(int64_t hashCode, bool onlyCheck);
  313. /*
  314. * 根据hash值按一致性hash方式,从正常节点中选取一个结点
  315. */
  316. AdapterProxy* getConHashProxyForNormal(int64_t hashCode, bool onlyCheck);
  317. /*
  318. * 根据hash值按取模方式,从静态权重节点中选取一个结点
  319. */
  320. AdapterProxy* getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache, bool onlyCheck);
  321. /*
  322. * 根据hash值按一致性hash方式,从静态权重节点中选取一个结点
  323. */
  324. AdapterProxy* getConHashProxyForWeight(int64_t hashCode, bool bStatic, bool onlyCheck);
  325. /*
  326. * 判断静态权重节点是否有变化
  327. */
  328. bool checkHashStaticWeightChange(bool bStatic);
  329. /*
  330. * 判断静态权重节点是否有变化
  331. */
  332. bool checkConHashChange(bool bStatic, const vector<AdapterProxy*> &vLastConHashProxys);
  333. /*
  334. * 更新取模hash方法的静态权重节点信息
  335. */
  336. void updateHashProxyWeighted(bool bStatic);
  337. /*
  338. * 更新一致性hash方法的静态权重节点信息
  339. */
  340. void updateConHashProxyWeighted(bool bStatic, vector<AdapterProxy*> &vLastConHashProxys, TC_ConsistentHashNew &conHash);
  341. /*
  342. * 根据后端服务的权重值选取一个结点
  343. */
  344. AdapterProxy* getWeightedProxy(bool bStaticWeighted, bool onlyCheck);
  345. /*
  346. * 根据后端服务的权重值选取一个结点
  347. */
  348. AdapterProxy* getWeightedForNormal(bool bStaticWeighted, bool onlyCheck);
  349. /*
  350. * 根据各个节点的权重值,建立各个节点的调用数
  351. */
  352. void updateProxyWeighted();
  353. /*
  354. * 根据各个节点的静态权重值,建立各个节点的静态权重
  355. */
  356. void updateStaticWeighted();
  357. /*
  358. * 建立静态权重节点信息的缓存
  359. */
  360. void dispatchEndpointCache(const vector<int> &vWeight);
  361. private:
  362. /*
  363. * ObjectProxy
  364. */
  365. ObjectProxy * _objectProxy;
  366. /*
  367. * 活跃的结点
  368. */
  369. vector<AdapterProxy*> _activeProxys;
  370. /*
  371. * 部署的结点 包括活跃的和不活跃的
  372. */
  373. map<string,AdapterProxy*> _regProxys;
  374. vector<AdapterProxy*> _vRegProxys;
  375. /*
  376. * 所有曾经create的结点
  377. */
  378. map<string,AdapterProxy*> _allProxys;
  379. vector<AdapterProxy*> _vAllProxys;
  380. /*
  381. * 轮训访问_activeProxys的偏移
  382. */
  383. size_t _lastRoundPosition;
  384. /*
  385. * 节点信息是否有更新
  386. */
  387. bool _update;
  388. /*
  389. * 是否是第一次建立权重信息
  390. */
  391. bool _first;
  392. /**
  393. * 上次重新建立权重信息表的时间
  394. */
  395. int64_t _lastBuildWeightTime;
  396. /**
  397. * 负载值更新频率,单位毫秒
  398. */
  399. int32_t _updateWeightInterval;
  400. /**
  401. * 静态时,对应的节点路由选择
  402. */
  403. size_t _lastSWeightPosition;
  404. /**
  405. * 静态权重对应的节点路由缓存
  406. */
  407. vector<size_t> _staticRouterCache;
  408. /*
  409. * 静态权重的活跃节点
  410. */
  411. vector<AdapterProxy*> _activeWeightProxy;
  412. /*
  413. * hash静态权重的路由缓存
  414. */
  415. vector<size_t> _hashStaticRouterCache;
  416. /*
  417. * hash静态权重的缓存节点
  418. */
  419. vector<AdapterProxy*> _lastHashStaticProxys;
  420. /*
  421. * 一致性hash静态权重时使用
  422. */
  423. vector<AdapterProxy*> _lastConHashWeightProxys;
  424. /*
  425. * 一致性hash静态权重时使用
  426. */
  427. TC_ConsistentHashNew _consistentHashWeight;
  428. /*
  429. * 一致性hash普通使用
  430. */
  431. vector<AdapterProxy*> _lastConHashProxys;
  432. /*
  433. * 一致性hash普通使用
  434. */
  435. TC_ConsistentHashNew _consistentHash;
  436. };
  437. ////////////////////////////////////////////////////////////////////////
  438. /*
  439. * 对外按类型获取路由的实现类
  440. */
  441. class EndpointThread : public QueryEpBase
  442. {
  443. public:
  444. /*
  445. * 构造函数
  446. */
  447. EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sSetName, bool bFirstNetThread = false);
  448. /*
  449. * 析构函数
  450. */
  451. ~EndpointThread(){};
  452. /*
  453. * 用EndpointInfo存在可用与不可用的节点信息
  454. */
  455. void getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  456. /*
  457. * 用TC_Endpoint存在可用与不可用的节点信息
  458. */
  459. void getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  460. /*
  461. * 重写基类的实现
  462. */
  463. void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync);
  464. /*
  465. * 重写基类的实现
  466. */
  467. void doNotify()
  468. {
  469. }
  470. private:
  471. /*
  472. * 更新缓存的ip列表信息
  473. */
  474. void update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  475. private:
  476. /*
  477. * 类型
  478. */
  479. GetEndpointType _type;
  480. /*
  481. * Obj名称
  482. */
  483. string _name;
  484. /*
  485. * 锁
  486. */
  487. // TC_ThreadLock _mutex;
  488. TC_SpinLock _mutex;
  489. /*
  490. * 活跃的结点
  491. */
  492. vector<EndpointInfo> _activeEndPoint;
  493. vector<TC_Endpoint> _activeTCEndPoint;
  494. /*
  495. * 不活跃的结点
  496. */
  497. vector<EndpointInfo> _inactiveEndPoint;
  498. vector<TC_Endpoint> _inactiveTCEndPoint;
  499. };
  500. ////////////////////////////////////////////////////////////////////////
  501. /*
  502. * 对外获取路由请求的封装类
  503. */
  504. class EndpointManagerThread
  505. {
  506. public:
  507. /*
  508. * 构造函数
  509. */
  510. EndpointManagerThread(Communicator *pComm, const string &sObjName);
  511. /*
  512. * 析构函数
  513. */
  514. ~EndpointManagerThread();
  515. /*
  516. * 按idc获取可用与不可用的结点
  517. */
  518. void getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  519. /*
  520. * 获取所有可用与不可用的结点
  521. */
  522. void getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  523. /*
  524. * 根据set获取可用与不可用的结点
  525. */
  526. void getEndpointBySet(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  527. /*
  528. * 根据地区获取可用与不可用的结点
  529. */
  530. void getEndpointByStation(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  531. /*
  532. * 按idc获取可用与不可用的结点
  533. */
  534. void getTCEndpoint( vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  535. /*
  536. * 获取所有可用与不可用的结点
  537. */
  538. void getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  539. /*
  540. * 根据set获取可用与不可用的结点
  541. */
  542. void getTCEndpointBySet(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  543. /*
  544. * 根据地区获取可用与不可用的结点
  545. */
  546. void getTCEndpointByStation(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  547. protected:
  548. /*
  549. * 根据type创建相应的EndpointThread
  550. */
  551. EndpointThread * getEndpointThread(GetEndpointType type, const string & sName);
  552. private:
  553. /*
  554. * 通信器
  555. */
  556. Communicator * _communicator;
  557. /*
  558. * Obj名称
  559. */
  560. string _objName;
  561. /*
  562. * 锁
  563. */
  564. // TC_ThreadLock _mutex;
  565. TC_SpinLock _mutex;
  566. /*
  567. * 保存对象的map
  568. */
  569. map<string,EndpointThread*> _info;
  570. };
  571. ////////////////////////////////////////////////////////////////////////
  572. }
  573. #endif