EndpointManager.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TARS_ENDPOINT_MANAGER_H_
  17. #define __TARS_ENDPOINT_MANAGER_H_
  18. #include "servant/EndpointInfo.h"
  19. #include "servant/EndpointF.h"
  20. #include "servant/QueryF.h"
  21. #include "servant/AppProtocol.h"
  22. #include "util/tc_spin_lock.h"
  23. #include "util/tc_consistent_hash_new.h"
  24. namespace tars
  25. {
  26. ////////////////////////////////////////////////////////////////////////
  27. /*
  28. * 获取路由的方式
  29. */
  30. enum GetEndpointType
  31. {
  32. E_DEFAULT = 0,
  33. E_ALL = 1,
  34. E_SET = 2,
  35. E_STATION = 3
  36. };
  37. /*
  38. * 权重类型
  39. */
  40. enum EndpointWeightType
  41. {
  42. E_LOOP = 0,
  43. E_STATIC_WEIGHT = 1,
  44. };
  45. ////////////////////////////////////////////////////////////////////////
  46. /*
  47. * 路由请求与回调的实现类
  48. */
  49. class QueryEpBase : public QueryFPrxCallback
  50. {
  51. public:
  52. /*
  53. * 构造函数
  54. */
  55. QueryEpBase(Communicator * pComm, bool bFirstNetThread, bool bInterfaceReq);
  56. /*
  57. * 析构函数
  58. */
  59. virtual ~QueryEpBase(){}
  60. /*
  61. * 初始化
  62. */
  63. bool init(const string & sObjName, const string& setName = "");
  64. /*
  65. * 获取所有节点信息的回调处理
  66. */
  67. void callback_findObjectById4All(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  68. /*
  69. * 获取所有节点信息的异常回调处理
  70. */
  71. void callback_findObjectById4All_exception(tars::Int32 ret);
  72. /*
  73. * 获取所有节点信息的回调处理
  74. */
  75. void callback_findObjectById4Any(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  76. /*
  77. * 获取所有节点信息的异常回调处理
  78. */
  79. void callback_findObjectById4Any_exception(tars::Int32 ret);
  80. /*
  81. * 按idc获取的节点信息的回调处理
  82. */
  83. void callback_findObjectByIdInSameGroup(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  84. /*
  85. * 按idc分组获取的节点信息的异常回调处理
  86. */
  87. void callback_findObjectByIdInSameGroup_exception(tars::Int32 ret);
  88. /*
  89. * 按set获取的节点信息的回调处理
  90. */
  91. void callback_findObjectByIdInSameSet(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  92. /*
  93. * 按set获取的节点信息的异常回调处理
  94. */
  95. void callback_findObjectByIdInSameSet_exception(tars::Int32 ret);
  96. /*
  97. * 按地区获取的节点信息的回调处理
  98. */
  99. void callback_findObjectByIdInSameStation(tars::Int32 ret, const vector<tars::EndpointF>& activeEp, const vector<tars::EndpointF>& inactiveEp);
  100. /*
  101. * 按地区获取的节点信息的异常回调处理
  102. */
  103. void callback_findObjectByIdInSameStation_exception(tars::Int32 ret);
  104. /*
  105. * 从主控请求到数据了 通知更新ip列表信息
  106. */
  107. virtual void notifyEndpoints(const set<EndpointInfo> & active,const set<EndpointInfo> & inactive,bool bSync) = 0;
  108. /*
  109. * 从主控请求到数据了 最开始调用主控 要通知
  110. */
  111. virtual void doNotify() = 0;
  112. /*
  113. * 设置主控的代理
  114. */
  115. int setLocatorPrx(QueryFPrx prx);
  116. /*
  117. * 是否直连后端
  118. */
  119. inline bool getDirectProxy() { return _direct; }
  120. protected:
  121. /*
  122. * 刷新主控
  123. */
  124. void refreshReg(GetEndpointType type,const string & sName);
  125. private:
  126. /*
  127. * 设置obj名字
  128. * 如果是直接连接,则从obj名字中提取ip列表信息
  129. * 如果是间接连接,则设置主控代理,并从缓存中加载相应的列表
  130. */
  131. void setObjName(const string & sObjName);
  132. /*
  133. * 解析endpoint
  134. */
  135. vector<string> sepEndpoint(const string& sEndpoints);
  136. /*
  137. * 从sEndpoints提取ip列表信息
  138. */
  139. void setEndpoints(const string & sEndpoints, set<EndpointInfo> & setEndpoints);
  140. /*
  141. * 主控的请求的响应到了,做相应的处理
  142. */
  143. void doEndpoints(const vector<EndpointF>& activeEp, const vector<EndpointF>& inactiveEp, int iRet, bool bSync = false);
  144. /*
  145. * 请求主控异常,做相应的处理
  146. */
  147. void doEndpointsExp(int iRet);
  148. /*
  149. * 刷新ip列表信息到缓存文件
  150. */
  151. void setEndPointToCache(bool bInactive);
  152. /*
  153. * 更新外部地址
  154. */
  155. virtual void onUpdateOutter() {};
  156. protected:
  157. /*
  158. * 通信器
  159. */
  160. Communicator * _communicator;
  161. /*
  162. * 是否第一个客户端网络线程
  163. * 若是,会对ip列表信息的写缓存
  164. */
  165. bool _firstNetThread;
  166. /*
  167. * 是否主动请求ip列表信息的接口的请求
  168. * 比如按idc获取某个obj的ip列表信息
  169. */
  170. bool _interfaceReq;
  171. /*
  172. * 是否直连后端服务
  173. */
  174. bool _direct;
  175. /*
  176. * 请求的后端服务的Obj对象名称
  177. */
  178. string _objName;
  179. /*
  180. * 指定set调用的setid,默认为空
  181. * 如果有值,则说明是指定set调用
  182. */
  183. string _invokeSetId;
  184. /*
  185. * 框架的主控地址
  186. */
  187. string _locator;
  188. /*
  189. * 主控的路由代理
  190. */
  191. QueryFPrx _queryFPrx;
  192. /*
  193. * 数据是否有效,初始化的时候是无效的数据
  194. * 只有请求过主控或者从文件缓存加载的数据才是有效数据
  195. */
  196. bool _valid;
  197. /*
  198. * 权重类型
  199. */
  200. EndpointWeightType _weightType;
  201. /*
  202. * 活跃节点列表
  203. */
  204. set<EndpointInfo> _activeEndpoints;
  205. /*
  206. * 不活跃节点列表
  207. */
  208. set<EndpointInfo> _inactiveEndpoints;
  209. /**
  210. * 是否是root servant
  211. */
  212. bool _rootServant;
  213. protected:
  214. /////////以下是请求主控的策略信息/////////////////
  215. /*
  216. * 是否正在向请求主控服务的ip列表信息
  217. */
  218. bool _requestRegistry;
  219. /*
  220. * 请求主控的超时时间(绝对时间),单位毫秒
  221. * 防止请求超时或者失败,一直处理请求状态
  222. */
  223. int64_t _requestTimeout;
  224. /*
  225. * 请求主控的超时间隔,默认5s
  226. */
  227. int _timeoutInterval;
  228. /*
  229. * 下次请求主控的时间(绝对时间),单位毫秒
  230. */
  231. int64_t _refreshTime;
  232. /*
  233. * 正常请求主控的频率(有ip列表信息),单位毫秒
  234. * 默认60s一次
  235. */
  236. int _refreshInterval;
  237. /*
  238. * 主控返回的活跃ip列表为空情况下
  239. * 请求主控的频率,默认10s一次
  240. */
  241. int _activeEmptyInterval;
  242. /*
  243. * 请求主控失败的时间频率
  244. * 默认2s一次
  245. */
  246. int _failInterval;
  247. /*
  248. * 请求主控连续失败达到一定的次数后
  249. * 请求主控的时间频率,默认30s一次
  250. */
  251. int _manyFailInterval;
  252. /*
  253. * 连续请求失败的次数限制,默认3次
  254. */
  255. int _failTimesLimit;
  256. /*
  257. * 连续失败的次数
  258. */
  259. int _failTimes;
  260. };
  261. ////////////////////////////////////////////////////////////////////////
  262. /*
  263. * 框架内部的路由管理的实现类
  264. */
  265. class EndpointManager : public QueryEpBase
  266. {
  267. public:
  268. static const size_t iMinWeightLimit = 10;
  269. static const size_t iMaxWeightLimit = 100;
  270. public:
  271. /*
  272. * 构造函数
  273. */
  274. EndpointManager(ObjectProxy* pObjectProxy, Communicator* pComm, bool bFirstNetThread);
  275. /*
  276. * 析构函数
  277. */
  278. virtual ~EndpointManager();
  279. /*
  280. * 重写基类的实现
  281. */
  282. void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync = false);
  283. /**
  284. * 更新
  285. * @param active
  286. * @param inactive
  287. */
  288. void updateEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  289. /**
  290. * 外部其他通信过来的更新
  291. * @param active
  292. * @param inactive
  293. */
  294. void updateEndpointsOutter(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  295. /*
  296. * 重写基类的实现
  297. */
  298. void doNotify();
  299. /**
  300. * 根据请求策略从可用的服务列表选择一个服务节点
  301. */
  302. bool selectAdapterProxy(ReqMessage * msg, AdapterProxy * & pAdapterProxy);
  303. /**
  304. * 获取所有的服务节点
  305. */
  306. const vector<AdapterProxy*> & getAdapters()
  307. {
  308. return _vAllProxys;
  309. }
  310. private:
  311. virtual void onUpdateOutter();
  312. /*
  313. * 轮询选取一个结点
  314. */
  315. AdapterProxy * getNextValidProxy();
  316. /*
  317. * 根据hash值选取一个结点
  318. */
  319. AdapterProxy* getHashProxy(int64_t hashCode, bool bConsistentHash = false);
  320. /*
  321. * 根据hash值按取模方式,从正常节点中选取一个结点
  322. */
  323. AdapterProxy* getHashProxyForNormal(int64_t hashCode);
  324. /*
  325. * 根据hash值按一致性hash方式,从正常节点中选取一个结点
  326. */
  327. AdapterProxy* getConHashProxyForNormal(int64_t hashCode);
  328. /*
  329. * 根据hash值按取模方式,从静态权重节点中选取一个结点
  330. */
  331. AdapterProxy* getHashProxyForWeight(int64_t hashCode, bool bStatic, vector<size_t> &vRouterCache);
  332. /*
  333. * 根据hash值按一致性hash方式,从静态权重节点中选取一个结点
  334. */
  335. AdapterProxy* getConHashProxyForWeight(int64_t hashCode, bool bStatic);
  336. /*
  337. * 判断静态权重节点是否有变化
  338. */
  339. bool checkHashStaticWeightChange(bool bStatic);
  340. /*
  341. * 判断静态权重节点是否有变化
  342. */
  343. bool checkConHashChange(bool bStatic, const vector<AdapterProxy*> &vLastConHashProxys);
  344. /*
  345. * 更新取模hash方法的静态权重节点信息
  346. */
  347. void updateHashProxyWeighted(bool bStatic);
  348. /*
  349. * 更新一致性hash方法的静态权重节点信息
  350. */
  351. void updateConHashProxyWeighted(bool bStatic, vector<AdapterProxy*> &vLastConHashProxys, TC_ConsistentHashNew &conHash);
  352. /*
  353. * 根据后端服务的权重值选取一个结点
  354. */
  355. AdapterProxy* getWeightedProxy(bool bStaticWeighted);
  356. /*
  357. * 根据后端服务的权重值选取一个结点
  358. */
  359. AdapterProxy* getWeightedForNormal(bool bStaticWeighted);
  360. /*
  361. * 根据各个节点的权重值,建立各个节点的调用数
  362. */
  363. void updateProxyWeighted();
  364. /*
  365. * 根据各个节点的静态权重值,建立各个节点的静态权重
  366. */
  367. void updateStaticWeighted();
  368. /*
  369. * 建立静态权重节点信息的缓存
  370. */
  371. void dispatchEndpointCache(const vector<int> &vWeight);
  372. private:
  373. /*
  374. * ObjectProxy
  375. */
  376. ObjectProxy * _objectProxy;
  377. /*
  378. * 活跃的结点
  379. */
  380. vector<AdapterProxy*> _activeProxys;
  381. /*
  382. * 部署的结点 包括活跃的和不活跃的
  383. */
  384. map<string,AdapterProxy*> _regProxys;
  385. vector<AdapterProxy*> _vRegProxys;
  386. /*
  387. * 所有曾经create的结点
  388. */
  389. map<string,AdapterProxy*> _allProxys;
  390. vector<AdapterProxy*> _vAllProxys;
  391. /*
  392. * 轮训访问_activeProxys的偏移
  393. */
  394. size_t _lastRoundPosition;
  395. /*
  396. * 节点信息是否有更新
  397. */
  398. bool _update;
  399. /*
  400. * 是否是第一次建立权重信息
  401. */
  402. bool _first;
  403. /**
  404. * 上次重新建立权重信息表的时间
  405. */
  406. int64_t _lastBuildWeightTime;
  407. /**
  408. * 负载值更新频率,单位毫秒
  409. */
  410. int32_t _updateWeightInterval;
  411. /**
  412. * 静态时,对应的节点路由选择
  413. */
  414. size_t _lastSWeightPosition;
  415. /**
  416. * 静态权重对应的节点路由缓存
  417. */
  418. vector<size_t> _staticRouterCache;
  419. /*
  420. * 静态权重的活跃节点
  421. */
  422. vector<AdapterProxy*> _activeWeightProxy;
  423. /*
  424. * hash静态权重的路由缓存
  425. */
  426. vector<size_t> _hashStaticRouterCache;
  427. /*
  428. * hash静态权重的缓存节点
  429. */
  430. vector<AdapterProxy*> _lastHashStaticProxys;
  431. /*
  432. * 一致性hash静态权重时使用
  433. */
  434. vector<AdapterProxy*> _lastConHashWeightProxys;
  435. /*
  436. * 一致性hash静态权重时使用
  437. */
  438. TC_ConsistentHashNew _consistentHashWeight;
  439. /*
  440. * 一致性hash普通使用
  441. */
  442. vector<AdapterProxy*> _lastConHashProxys;
  443. /*
  444. * 一致性hash普通使用
  445. */
  446. TC_ConsistentHashNew _consistentHash;
  447. struct OutterUpdate
  448. {
  449. set<EndpointInfo> active;
  450. set<EndpointInfo> inactive;
  451. };
  452. shared_ptr<OutterUpdate> _outterUpdate;
  453. };
  454. ////////////////////////////////////////////////////////////////////////
  455. /*
  456. * 对外按类型获取路由的实现类
  457. */
  458. class EndpointThread : public QueryEpBase
  459. {
  460. public:
  461. /*
  462. * 构造函数
  463. */
  464. EndpointThread(Communicator* pComm, const string & sObjName, GetEndpointType type, const string & sSetName, bool bFirstNetThread = false);
  465. /*
  466. * 析构函数
  467. */
  468. ~EndpointThread(){};
  469. /*
  470. * 用EndpointInfo存在可用与不可用的节点信息
  471. */
  472. void getEndpoints(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  473. /*
  474. * 用TC_Endpoint存在可用与不可用的节点信息
  475. */
  476. void getTCEndpoints(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  477. /*
  478. * 重写基类的实现
  479. */
  480. void notifyEndpoints(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive, bool bSync);
  481. /*
  482. * 重写基类的实现
  483. */
  484. void doNotify()
  485. {
  486. }
  487. private:
  488. /*
  489. * 更新缓存的ip列表信息
  490. */
  491. void update(const set<EndpointInfo> & active, const set<EndpointInfo> & inactive);
  492. private:
  493. /*
  494. * 类型
  495. */
  496. GetEndpointType _type;
  497. /*
  498. * Obj名称
  499. */
  500. string _name;
  501. /*
  502. * 锁
  503. */
  504. TC_ThreadMutex _mutex;
  505. /*
  506. * 活跃的结点
  507. */
  508. vector<EndpointInfo> _activeEndPoint;
  509. vector<TC_Endpoint> _activeTCEndPoint;
  510. /*
  511. * 不活跃的结点
  512. */
  513. vector<EndpointInfo> _inactiveEndPoint;
  514. vector<TC_Endpoint> _inactiveTCEndPoint;
  515. };
  516. ////////////////////////////////////////////////////////////////////////
  517. /*
  518. * 对外获取路由请求的封装类
  519. */
  520. class EndpointManagerThread
  521. {
  522. public:
  523. /*
  524. * 构造函数
  525. */
  526. EndpointManagerThread(Communicator *pComm, const string &sObjName);
  527. /*
  528. * 析构函数
  529. */
  530. ~EndpointManagerThread();
  531. /*
  532. * 按idc获取可用与不可用的结点
  533. */
  534. void getEndpoint(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  535. /*
  536. * 获取所有可用与不可用的结点
  537. */
  538. void getEndpointByAll(vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  539. /*
  540. * 根据set获取可用与不可用的结点
  541. */
  542. void getEndpointBySet(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  543. /*
  544. * 根据地区获取可用与不可用的结点
  545. */
  546. void getEndpointByStation(const string sName, vector<EndpointInfo> &activeEndPoint, vector<EndpointInfo> &inactiveEndPoint);
  547. /*
  548. * 按idc获取可用与不可用的结点
  549. */
  550. void getTCEndpoint( vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  551. /*
  552. * 获取所有可用与不可用的结点
  553. */
  554. void getTCEndpointByAll(vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  555. /*
  556. * 根据set获取可用与不可用的结点
  557. */
  558. void getTCEndpointBySet(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  559. /*
  560. * 根据地区获取可用与不可用的结点
  561. */
  562. void getTCEndpointByStation(const string sName, vector<TC_Endpoint> &activeEndPoint, vector<TC_Endpoint> &inactiveEndPoint);
  563. protected:
  564. /*
  565. * 根据type创建相应的EndpointThread
  566. */
  567. EndpointThread * getEndpointThread(GetEndpointType type, const string & sName);
  568. private:
  569. /*
  570. * 通信器
  571. */
  572. Communicator * _communicator;
  573. /*
  574. * Obj名称
  575. */
  576. string _objName;
  577. /*
  578. * 锁
  579. */
  580. // TC_ThreadLock _mutex;
  581. TC_SpinLock _mutex;
  582. /*
  583. * 保存对象的map
  584. */
  585. map<string,EndpointThread*> _info;
  586. };
  587. ////////////////////////////////////////////////////////////////////////
  588. }
  589. #endif