Communicator.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TARS_COMMUNICATOR_H_
  17. #define __TARS_COMMUNICATOR_H_
  18. #include "util/tc_thread.h"
  19. #include "util/tc_config.h"
  20. #include "util/tc_singleton.h"
  21. #include "servant/Global.h"
  22. #include "servant/ServantProxy.h"
  23. #include "servant/ServantProxyFactory.h"
  24. //#include "servant/ObjectProxyFactory.h"
  25. #include "servant/AsyncProcThread.h"
  26. // #include "servant/CommunicatorEpoll.h"
  27. #include "servant/StatReport.h"
  28. #include "servant/RemoteLogger.h"
  29. #include "util/tc_openssl.h"
  30. // #ifdef TARS_OPENTRACKING
  31. // #include "zipkin/opentracing.h"
  32. // #include "zipkin/tracer.h"
  33. // #include "zipkin/ip_address.h"
  34. // #endif
  35. //
  36. const static string CONFIG_ROOT_PATH = string("/tars/application/client");
  37. /**
  38. * 设计核心:
  39. * - 设计的核心是协程化, 如果本身就处于协程状态下, rpc网络通信就复现当前的协程调度器, 从而网络收发逻辑和rpc都在一个线程中处理, 减少线程切换, 降低延时!!!
  40. * - 如果发起rpc的线程不是协程, 则请求丢给了实际的网络线程处理, 和之前版本保持一致
  41. * - 结合到服务器端的模型, 这样设计的好处是, 如果都处于协程模式, 客户端和服务器可以复用相同的协程调度器, 从而保证服务器接受请求, 发起rpc, 接收rpc响应, 到回包给客户端, 都在一个线程中处理, 无线程切换逻辑!
  42. * - 从而大幅度降低了系统延时!
  43. *
  44. * 基本说明:
  45. * - 通信器Communicator是包含了所有客户端调用的资源, 原则上在调用生命周期都必须存在
  46. * - 每个通信器都包含多个CommunicatorEpoll, 这个是Communicator创建是构建出来的, 此时CommunicatorEpoll的个数和客户端网络线程相同, 至少有1个
  47. * - 把这种初始就生成的CommunicatorEpoll作为公有的, 后续可能会动态创建私有的CommunicatorEpoll
  48. * - 无论公有还是私有CommunicatorEpoll, 网络层都用的协程调度器中的epoller对象, 这样方便复用(尤其是针对私有CommunicatorEpoll)
  49. * - 私有CommunicatorEpoll并不是完整的网路线程, 它复用了业务线程中的协程调度器(这里业务线程可能是服务器端的业务线程/协程, 自带了协程调度器), 参见后续说明
  50. * - 公有和私有的CommunicatorEpoll都会存在于Communicator对象中
  51. * - 无论是公有还是私有的CommunicatorEpoll的生命周期由通信器来管理, 通信器析构时会被释放
  52. * - 一旦Communicator被释放, 它包含的所有资源都被释放了, 由它创建的ServantPrx, 都不能再进行网络通信!!!
  53. * - ServantProxy针对每个通信器而言, 是全局唯一的, 根据stringToProxy传入的obj来唯一确定
  54. * - 调用Communicator::stringToProxy, 构建ServantProxy时, 会调用每个CommunicatorEpoll创建对应的ObjectProxy
  55. * - 即每个ObjectProxy实例唯一对应了一个CommunicatorEpoll, 即代表了网络收发处理线程
  56. * - ObjectProxy的生命周期被CommunicatorEpoll管理, 当CommunicatorEpoll释放时, ObjectProxy会被释放
  57. * - 实际的rpc调用, 虽然调用的是ServantProxy, 但是实际会选择具体发送的ObjectProxy(即选择了CommunicatorEpoll) 和 发送队列
  58. * - 这个发送队列即: 在业务线程 和 CommunicatorEpoll 存在一个无锁的队列(限制长度, 每个元素是ReqMessage, 代表本次请求)
  59. * - 这个无锁队列, 被业务线程的私有数据管理, 当第一次使用某个CommunicatorEpoll时, 创建出来, 它的析构是复杂逻辑, 参考后续逻辑!
  60. * - 业务线程退出时, 会导致线程私有数据析构, 析构时发消息给CommunicatorEpoll, 在网络线程中释放资源
  61. * - 所以框架要求, 业务线程先释放, 才能释放框架的网路线程
  62. *
  63. * CommunicatorEpoll设计说明
  64. * 1 当业务线程处于协程中, 只使用私有CommunicatorEpoll(不使用的公用的)
  65. * - 基于当前协程的调度器TC_CoroutineScheduler, 全新创建CommunicatorEpoll
  66. * - 使用该CommunicatorEpoll来处理网络请求, 这样复用相同的调度器(该调度器可能是服务端线程)
  67. * - 这样所有网络请求都在同一个线程里面处理了
  68. * - 该调度器, 保存在线程私有对象中(和Communicator指针关联), 同时也保存在Communicator中
  69. * - 当Communicator对象析构时, 主动释放该CommunicatorEpoll
  70. * - 创建CommunicatorEpoll时, 注意需要clone所有的ObjectProxy(从公有CommunicatorEpoll中复制), 并选择返回对应的ObjectProxy
  71. * - 使用ObjectProxy来发送数据
  72. * - 这种场景下, 数据收发其实都在业务线程中处理了!!!
  73. * 2 当业务线程处于普通线程中(不存在协程调度器), 只使用公有CommunicatorEpoll
  74. * - 轮询选择公有的CommunicatorEpoll的, 注意此时不选择私有CommunicatorEpoll来发送数据, 降低系统的复杂度
  75. * - 轮询的计数器保持在线程私有数据中
  76. *
  77. * 析构问题处理
  78. * - 通信器是管理客户端资源的对象
  79. * - 通信器析构 以及 业务线程(发起到rpc调用)退出时, 如何处理相关资源释放的是重点需要考虑的
  80. * - 这里最核心的处理是 业务线程和CommunicatorEpoll之前有发送队列, 这个发送队列由业务线程私有数据保持, 它的问题在于:
  81. * 1 如果业务线程先退出, 将发送队列先析构, 但是如果此时网络线程仍在使用, 则会有问题
  82. * 2 如果网络线程先退出, 业务线程退出时, 拿到的CommunicatorEpoll指针可能有问题!
  83. * - 解决方案:
  84. * 1 CommunicatorEpoll使用shared_ptr, 业务线程私有数据中持有CommunicatorEpoll时, 采用weak_ptr, 这样业务线程退出时能感知CommunicatorEpoll是否还存在
  85. * 2 如果业务线程退出时, CommunicatorEpoll不存在了, 直接delete掉发送队列即可
  86. * 3 如果业务线程退出时, CommunicatorEpoll仍然存在, 发送通知给CommunicatorEpoll, 通知它业务线程退出
  87. * 4 发送队列使用shared_ptr被线程私有数据持有, 同时它作为weak_ptr被notify通知对象持有
  88. * 5 网络线程收到notify以后, 获取发送队列的weak_ptr, 转成shared_ptr以后才使用, 保证有效性, 如果转换后shared_ptr为NULL, 表示业务线程已经退出了, 此时可以不需要做任何处理
  89. * 6 如果notify对象释放时(CommunicatorEpoll析构), 会把发送队列中的数据清空, delete msg
  90. * 7 如果通信器先析构, 实际上会有一定的泄露(非常少), 线程私有变量中记录通信器信息的map不会删除记录(直到业务线程退出才会释放掉), 这里其实有资源泄露, 但是极少, 可以不管, 除非代码不断在构造和析构通信器!
  91. *
  92. * ObjectProxy创建的问题
  93. * - ServantProxy对象, 对每个服务而言, 是全局唯一的, 它背后对应的ObjectProxy, 是每个网络线程/协程都有一个, 即CommunicatorEpoll内部每个ServantProxy都对应了一个ObjectProxy
  94. * - 对于公有的CommunicatorEpoll, 它内部的ObjectProxy是stringToProxy时, 自动创建出来的
  95. * - 对于私有CommunicatorEpoll, 它内部的ObjectProxy是ServantProxy在invoke的时候创建出来的, 这样由于调用逻辑的原因, 私有CommunicatorEpoll内部拥有的ObjectProxy是不一样的!
  96. * - 私有CommunicatorEpoll内部ObjectProxy不一样, 导致了后需要更新ip list的机制不同
  97. *
  98. * ObjectProxy服务地址更新的问题
  99. * - 由于一个进程中CommunicatorEpoll可能会有多个(公有的+私有的), 从而会有多个ObjectProxy, 带来多次更新主控的问题
  100. * - 为了避免这种现象, 设计上目前只有公有CommunicatorEpoll且netThreadSeq==0的(isFirstNetThread), 才回主动更新主控
  101. * - 当第一个公有CommunicatorEpoll更新主控, 获取到服务的ip list之后, 会遍历所有CommunicatorEpoll, 通知所有CommunicatorEpoll里面对应的ObjectProxy去更新这个ip list
  102. * - 注意私有CommunicatorEpoll内部, 可能不存在这个ObjectProxy, 可能就不要更新ip list了, 需要特殊判断
  103. * - 私有CommunicatorEpoll中的ObjectProxy不会主动更新主控
  104. * -
  105. */
  106. namespace tars
  107. {
  108. class CommunicatorEpoll;
  109. class TC_OpenSSL;
  110. ////////////////////////////////////////////////////////////////////////
  111. /**
  112. * 客户端配置
  113. */
  114. struct ClientConfig
  115. {
  116. /**
  117. * 客户端IP地址
  118. */
  119. static string LocalIp;
  120. /**
  121. * 客户端模块名称
  122. */
  123. static string ModuleName;
  124. /**
  125. * 客户端所有的IP地址
  126. */
  127. static set<string> SetLocalIp;
  128. /**
  129. *客户端是否打开set分组
  130. */
  131. static bool SetOpen;
  132. /**
  133. *客户端set分组
  134. */
  135. static string SetDivision;
  136. /**
  137. * 客户端的版本号
  138. */
  139. static string TarsVersion;
  140. };
  141. ////////////////////////////////////////////////////////////////////////
  142. /**
  143. * 通信器,用于创建和维护客户端proxy
  144. */
  145. class SVT_DLL_API Communicator : public TC_HandleBase, public TC_ThreadRecMutex
  146. {
  147. public:
  148. typedef std::function<void(ReqMessagePtr)> custom_callback;
  149. /**
  150. * 构造函数
  151. */
  152. Communicator();
  153. /**
  154. * 采用配置构造
  155. * @param conf
  156. * @param path
  157. */
  158. Communicator(TC_Config& conf, const string& domain = CONFIG_ROOT_PATH);
  159. /**
  160. * 析够
  161. * 析够时自动接收相关线程
  162. */
  163. ~Communicator();
  164. public:
  165. /**
  166. * 生成代理
  167. * @param T
  168. * @param objectName
  169. * @param setName 指定set调用的setid
  170. * @return T
  171. */
  172. template<class T> T stringToProxy(const string& objectName, const string& setName = "")
  173. {
  174. T prx = NULL;
  175. stringToProxy<T>(objectName, prx, setName);
  176. return prx;
  177. }
  178. /**
  179. * 生成代理
  180. * @param T
  181. * @param objectName
  182. * @param setName 指定set调用的setid
  183. * @param proxy
  184. */
  185. template<class T> void stringToProxy(const string& objectName, T& proxy, const string& setName = "")
  186. {
  187. ServantProxy *pServantProxy = getServantProxy(objectName, setName);
  188. proxy = (typename T::element_type *)(pServantProxy);
  189. }
  190. /**
  191. * 获取公有网络线程个数
  192. * @return
  193. */
  194. inline size_t getCommunicatorEpollNum()
  195. {
  196. return _communicatorEpoll.size();
  197. }
  198. /*
  199. *获取公有网络线程的对象
  200. */
  201. inline const shared_ptr<CommunicatorEpoll> &getCommunicatorEpoll(size_t iNum)
  202. {
  203. assert(iNum < getCommunicatorEpollNum());
  204. return _communicatorEpoll[iNum];
  205. }
  206. /**
  207. * 获取所有的网络通信器(包括公有和私有的)
  208. * @return
  209. */
  210. vector<shared_ptr<CommunicatorEpoll>> getAllCommunicatorEpoll();
  211. /**
  212. * 获取属性
  213. * @param name
  214. * @param dft, 缺省值
  215. * @return string
  216. */
  217. string getProperty(const string& name, const string& dft = "");
  218. /**
  219. * 设置属性
  220. * @param properties
  221. */
  222. void setProperty(const map<string, string>& properties);
  223. /**
  224. * 设置某一个属性
  225. * @param name
  226. * @param value
  227. */
  228. void setProperty(const string& name, const string& value);
  229. /**
  230. * 设置属性
  231. * @param conf
  232. * @param path
  233. */
  234. void setProperty(TC_Config& conf, const string& domain = CONFIG_ROOT_PATH);
  235. /**
  236. * get servant property
  237. * @param sObj
  238. * @return
  239. */
  240. map<string, string> getServantProperty(const string &sObj);
  241. /**
  242. * set servant property
  243. * @param sObj
  244. * @return
  245. */
  246. void setServantProperty(const string &sObj, const string& name, const string& value);
  247. /**
  248. * get servant property
  249. * @param sObj
  250. * @return
  251. */
  252. string getServantProperty(const string &sObj, const string& name);
  253. /**
  254. * 上报统计
  255. * @return StatReport*
  256. */
  257. StatReport * getStatReport();
  258. /**
  259. * 重新加载属性
  260. */
  261. int reloadProperty(string & sResult);
  262. /*
  263. * 重新加载locator
  264. */
  265. void reloadLocator();
  266. /**
  267. * 获取obj对应可用ip port列表 如果启用分组,只返回同分组的服务端ip
  268. * @param sObjName
  269. * @return vector<TC_Endpoint>
  270. */
  271. vector<TC_Endpoint> getEndpoint(const string & objName);
  272. /**
  273. * 获取obj对应可用ip port列表 包括所有IDC的
  274. * @param sObjName
  275. * @return vector<TC_Endpoint>
  276. */
  277. vector<TC_Endpoint> getEndpoint4All(const string& objName);
  278. /**
  279. * 结束
  280. */
  281. void terminate();
  282. /**
  283. * 超时请求的回包回来后,是否打印超时的日志信息,AdapterProxy里用到
  284. */
  285. bool getTimeoutLogFlag() { return _timeoutLogFlag; }
  286. /**
  287. * 获取最小的超时时间
  288. */
  289. int64_t getMinTimeout() { return _minTimeout; }
  290. /**
  291. * get resource info
  292. * @return
  293. */
  294. string getResourcesInfo();
  295. /**
  296. * 是否析构中
  297. * @return bool
  298. */
  299. bool isTerminating();
  300. protected:
  301. /**
  302. * 初始化
  303. */
  304. void initialize();
  305. /**
  306. * 获取对象代理生成器
  307. * @return ServantProxyFactoryPtr
  308. */
  309. ServantProxyFactory * servantProxyFactory();
  310. /**
  311. * 获取通用对象
  312. * @param objectName
  313. * @param setName 指定set调用的setid
  314. * @return ServantPrx
  315. */
  316. ServantProxy * getServantProxy(const string& objectName,const string& setName="");
  317. /**
  318. * 数据加入到异步线程队列里面
  319. * @return
  320. */
  321. void pushAsyncThreadQueue(ReqMessage * msg);
  322. /**
  323. * 上报统计事件
  324. * @return
  325. */
  326. void doStat();
  327. /**
  328. * get openssl of trans
  329. * @param sObjName
  330. * @return vector<TC_Endpoint>
  331. */
  332. shared_ptr<TC_OpenSSL> newClientSSL(const string & objName);
  333. /**
  334. * 设置调用链控制参数
  335. * @param name: 参数名
  336. */
  337. void setTraceParam(const string& name = "");
  338. /**
  339. * 通信器启动
  340. */
  341. void notifyCommunicatorEpollStart();
  342. /**
  343. *
  344. * @param func
  345. */
  346. void forEachSchedCommunicatorEpoll(std::function<void(const shared_ptr<CommunicatorEpoll> &)> func);
  347. /**
  348. * 创建一个协程内的网络通信器
  349. * @return
  350. */
  351. shared_ptr<CommunicatorEpoll> createSchedCommunicatorEpoll(size_t netThreadSeq, const shared_ptr<ReqInfoQueue> &reqInfoQueue);
  352. /**
  353. * 删除协程内网络通信器
  354. * @param netThreadSeq
  355. */
  356. void eraseSchedCommunicatorEpoll(size_t netThreadSeq);
  357. /**
  358. * 框架内部需要直接访问通信器的类
  359. */
  360. friend class AdapterProxy;
  361. friend class ServantProxy;
  362. friend class ObjectProxy;
  363. friend class ServantProxyFactory;
  364. friend class ObjectProxyFactory;
  365. friend class AsyncProcThread;
  366. friend class CommunicatorEpoll;
  367. friend class ServantProxyThreadData;
  368. protected:
  369. /**
  370. * 是否初始化
  371. */
  372. bool _initialized;
  373. /**
  374. * 停止标志
  375. */
  376. bool _terminating;
  377. /**
  378. * 客户端的属性配置
  379. */
  380. map<string, string> _properties;
  381. /**
  382. * obj info
  383. */
  384. map<string, map<string, string>> _objInfo;
  385. /**
  386. * ServantProxy代码的工厂类
  387. */
  388. ServantProxyFactory* _servantProxyFactory;
  389. /*
  390. * 公有网络线程
  391. */
  392. vector<shared_ptr<CommunicatorEpoll>> _communicatorEpoll;//[MAX_CLIENT_THREAD_NUM];
  393. /**
  394. * 私有网络线程, 会动态变化
  395. */
  396. unordered_map<size_t, shared_ptr<CommunicatorEpoll>> _schedCommunicatorEpoll;
  397. /**
  398. * 操作通信器的锁
  399. */
  400. TC_SpinLock _schedMutex;
  401. /**
  402. * 锁
  403. */
  404. std::mutex _mutex;
  405. /**
  406. * 条件变量, 用来等待网络线程启动
  407. */
  408. std::condition_variable _cond;
  409. /**
  410. * 通信器启动个数
  411. */
  412. std::atomic<size_t> _communicatorEpollStartNum{0};
  413. /*
  414. * 上报类
  415. */
  416. StatReport * _statReport;
  417. /*
  418. * 超时请求的回包回来后,是否打印超时的日志信息,AdapterProxy里用到
  419. */
  420. bool _timeoutLogFlag;
  421. /*
  422. * 最小的超时时间
  423. */
  424. int64_t _minTimeout;
  425. /**
  426. * ssl ctx
  427. */
  428. shared_ptr<TC_OpenSSL::CTX> _ctx;
  429. /**
  430. * ssl
  431. */
  432. unordered_map<string, shared_ptr<TC_OpenSSL::CTX>> _objCtx;
  433. /*
  434. * 异步线程数组
  435. */
  436. //异步线程(跨通信器共享)
  437. vector<AsyncProcThread*> _asyncThread;
  438. /*
  439. * 异步队列的统计上报的对象
  440. */
  441. PropertyReportPtr _reportAsyncQueue;
  442. /*
  443. * 异步线程数目
  444. */
  445. size_t _asyncThreadNum;
  446. /*
  447. * 分发给异步线程的索引seq
  448. */
  449. size_t _asyncSeq = 0;
  450. /**
  451. * 注册事件
  452. */
  453. size_t _sigId = -1;
  454. // #ifdef TARS_OPENTRACKING
  455. // public:
  456. // struct TraceManager:public TC_HandleBase{
  457. // zipkin::ZipkinOtTracerOptions _zipkin_options;
  458. // std::shared_ptr<opentracing::Tracer> _tracer;
  459. // TraceManager(): _tracer(nullptr){}
  460. // TraceManager(zipkin::ZipkinOtTracerOptions& options):_zipkin_options(options){
  461. // _tracer = zipkin::makeZipkinOtTracer(options);
  462. // }
  463. // ~TraceManager(){if(_tracer != nullptr){_tracer->Close();}}
  464. // };
  465. // TC_AutoPtr<TraceManager> _traceManager;
  466. // #endif
  467. };
  468. ////////////////////////////////////////////////////////////////////////
  469. }
  470. #endif