Communicator.h 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TARS_COMMUNICATOR_H_
  17. #define __TARS_COMMUNICATOR_H_
  18. #include "util/tc_thread.h"
  19. #include "util/tc_config.h"
  20. #include "util/tc_singleton.h"
  21. #include "servant/Global.h"
  22. #include "servant/ServantProxy.h"
  23. #include "servant/ServantProxyFactory.h"
  24. #include "servant/ObjectProxyFactory.h"
  25. #include "servant/AsyncProcThread.h"
  26. #include "servant/CommunicatorEpoll.h"
  27. #include "servant/RemoteLogger.h"
  28. #ifdef TARS_OPENTRACKING
  29. #include "zipkin/opentracing.h"
  30. #include "zipkin/tracer.h"
  31. #include "zipkin/ip_address.h"
  32. #endif
  33. #define CONFIG_ROOT_PATH "/tars/application/client"
  34. //
  35. //struct ssl_ctx_st;
  36. //typedef struct ssl_ctx_st SSL_CTX;
  37. namespace tars
  38. {
  39. ////////////////////////////////////////////////////////////////////////
  40. /**
  41. * 客户端配置
  42. */
  43. struct ClientConfig
  44. {
  45. /**
  46. * 客户端IP地址
  47. */
  48. static string LocalIp;
  49. /**
  50. * 客户端模块名称
  51. */
  52. static string ModuleName;
  53. /**
  54. * 客户端所有的IP地址
  55. */
  56. static set<string> SetLocalIp;
  57. /**
  58. *客户端是否打开set分组
  59. */
  60. static bool SetOpen;
  61. /**
  62. *客户端set分组
  63. */
  64. static string SetDivision;
  65. /**
  66. * 客户端的tars版本号
  67. */
  68. static string TarsVersion;
  69. };
  70. ////////////////////////////////////////////////////////////////////////
  71. /**
  72. * 通信器,用于创建和维护客户端proxy
  73. */
  74. class SVT_DLL_API Communicator : public TC_HandleBase, public TC_ThreadRecMutex
  75. {
  76. public:
  77. typedef std::function<void(ReqMessagePtr)> custom_callback;
  78. /**
  79. * 构造函数
  80. */
  81. Communicator();
  82. /**
  83. * 采用配置构造
  84. * @param conf
  85. * @param path
  86. */
  87. Communicator(TC_Config& conf, const string& domain = CONFIG_ROOT_PATH);
  88. /**
  89. * 析够
  90. * 析够时自动接收相关线程
  91. */
  92. ~Communicator();
  93. public:
  94. /**
  95. * 生成代理
  96. * @param T
  97. * @param objectName
  98. * @param setName 指定set调用的setid
  99. * @return T
  100. */
  101. template<class T> T stringToProxy(const string& objectName, const string& setName = "")
  102. {
  103. T prx = NULL;
  104. stringToProxy<T>(objectName, prx,setName);
  105. return prx;
  106. }
  107. /**
  108. * 生成代理
  109. * @param T
  110. * @param objectName
  111. * @param setName 指定set调用的setid
  112. * @param proxy
  113. */
  114. template<class T> void stringToProxy(const string& objectName, T& proxy,const string& setName="")
  115. {
  116. ServantProxy * pServantProxy = getServantProxy(objectName,setName);
  117. proxy = (typename T::element_type*)(pServantProxy);
  118. }
  119. /*
  120. *获取客户端网络线程的个数
  121. */
  122. inline size_t getClientThreadNum()
  123. {
  124. return _clientThreadNum;
  125. }
  126. /*
  127. *获取客户端网络线程的对象
  128. */
  129. inline CommunicatorEpoll * getCommunicatorEpoll(size_t iNum)
  130. {
  131. assert(iNum<_clientThreadNum);
  132. return _communicatorEpoll[iNum];
  133. }
  134. /**
  135. * 获取属性
  136. * @param name
  137. * @param dft, 缺省值
  138. * @return string
  139. */
  140. string getProperty(const string& name, const string& dft = "");
  141. /**
  142. * 设置属性
  143. * @param properties
  144. */
  145. void setProperty(const map<string, string>& properties);
  146. /**
  147. * 设置某一个属性
  148. * @param name
  149. * @param value
  150. */
  151. void setProperty(const string& name, const string& value);
  152. /**
  153. * 设置属性
  154. * @param conf
  155. * @param path
  156. */
  157. void setProperty(TC_Config& conf, const string& domain = CONFIG_ROOT_PATH);
  158. /**
  159. * get servant property
  160. * @param sObj
  161. * @return
  162. */
  163. map<string, string> getServantProperty(const string &sObj);
  164. /**
  165. * set servant property
  166. * @param sObj
  167. * @return
  168. */
  169. void setServantProperty(const string &sObj, const string& name, const string& value);
  170. /**
  171. * get servant property
  172. * @param sObj
  173. * @return
  174. */
  175. string getServantProperty(const string &sObj, const string& name);
  176. /**
  177. * 设置自动回调对象
  178. */
  179. void setServantCustomCallback(const string &sObj, custom_callback callback);
  180. /**
  181. * 上报统计
  182. * @return StatReport*
  183. */
  184. StatReport * getStatReport();
  185. /**
  186. * 重新加载属性
  187. */
  188. int reloadProperty(string & sResult);
  189. /*
  190. * 重新加载locator
  191. */
  192. void reloadLocator();
  193. /**
  194. * 获取obj对应可用ip port列表 如果启用分组,只返回同分组的服务端ip
  195. * @param sObjName
  196. * @return vector<TC_Endpoint>
  197. */
  198. vector<TC_Endpoint> getEndpoint(const string & objName);
  199. /**
  200. * 获取obj对应可用ip port列表 包括所有IDC的
  201. * @param sObjName
  202. * @return vector<TC_Endpoint>
  203. */
  204. vector<TC_Endpoint> getEndpoint4All(const string& objName);
  205. /**
  206. * 结束
  207. */
  208. void terminate();
  209. /**
  210. * 超时请求的回包回来后,是否打印超时的日志信息,AdapterProxy里用到
  211. */
  212. bool getTimeoutLogFlag() { return _timeoutLogFlag; }
  213. /**
  214. * 获取最小的超时时间
  215. */
  216. int64_t getMinTimeout() { return _minTimeout; }
  217. /**
  218. * get resource info
  219. * @return
  220. */
  221. string getResourcesInfo();
  222. protected:
  223. /**
  224. * 初始化
  225. */
  226. void initialize();
  227. /**
  228. * 是否析构中
  229. * @return bool
  230. */
  231. bool isTerminating();
  232. /**
  233. * 获取对象代理生成器
  234. * @return ServantProxyFactoryPtr
  235. */
  236. ServantProxyFactory * servantProxyFactory();
  237. /**
  238. * 获取通用对象
  239. * @param objectName
  240. * @param setName 指定set调用的setid
  241. * @return ServantPrx
  242. */
  243. ServantProxy * getServantProxy(const string& objectName,const string& setName="");
  244. /**
  245. * 数据加入到异步线程队列里面
  246. * @return
  247. */
  248. void pushAsyncThreadQueue(ReqMessage * msg);
  249. /**
  250. * 上报统计事件
  251. * @return
  252. */
  253. void doStat();
  254. #if TARS_SSL
  255. /**
  256. * get openssl of trans
  257. * @param sObjName
  258. * @return vector<TC_Endpoint>
  259. */
  260. shared_ptr<TC_OpenSSL> newClientSSL(const string & objName);
  261. #endif
  262. /**
  263. * 框架内部需要直接访问通信器的类
  264. */
  265. friend class AdapterProxy;
  266. friend class ServantProxy;
  267. friend class ObjectProxy;
  268. friend class ServantProxyFactory;
  269. friend class ObjectProxyFactory;
  270. friend class AsyncProcThread;
  271. friend class CommunicatorEpoll;
  272. friend class Transceiver;
  273. protected:
  274. /**
  275. * 是否初始化
  276. */
  277. bool _initialized;
  278. /**
  279. * 停止标志
  280. */
  281. bool _terminating;
  282. /**
  283. * 客户端的属性配置
  284. */
  285. map<string, string> _properties;
  286. /**
  287. * obj info
  288. */
  289. map<string, map<string, string>> _objInfo;
  290. /**
  291. * ServantProxy代码的工厂类
  292. */
  293. ServantProxyFactory* _servantProxyFactory;
  294. /*
  295. * 网络线程数组
  296. */
  297. CommunicatorEpoll * _communicatorEpoll[MAX_CLIENT_THREAD_NUM];
  298. /*
  299. * 网络线程数目
  300. */
  301. size_t _clientThreadNum;
  302. /*
  303. * 上报类
  304. */
  305. StatReport * _statReport;
  306. /*
  307. * 超时请求的回包回来后,是否打印超时的日志信息,AdapterProxy里用到
  308. */
  309. bool _timeoutLogFlag;
  310. /*
  311. * 最小的超时时间
  312. */
  313. int64_t _minTimeout;
  314. #if TARS_SSL
  315. /**
  316. * ssl ctx
  317. */
  318. shared_ptr<TC_OpenSSL::CTX> _ctx;
  319. /**
  320. * ssl
  321. */
  322. unordered_map<string, shared_ptr<TC_OpenSSL::CTX>> _objCtx;
  323. #endif
  324. /**
  325. *
  326. */
  327. TC_SpinLock _callbackLock;
  328. /**
  329. * callback
  330. */
  331. unordered_map<string, custom_callback> _callback;
  332. /*
  333. * 异步线程数组
  334. */
  335. //异步线程(跨通信器共享)
  336. vector<AsyncProcThread*> _asyncThread;//[MAX_THREAD_NUM];
  337. /*
  338. * 异步队列的统计上报的对象
  339. */
  340. PropertyReportPtr _reportAsyncQueue;
  341. /*
  342. * 异步线程数目
  343. */
  344. size_t _asyncThreadNum;
  345. /*
  346. * 分发给异步线程的索引seq
  347. */
  348. size_t _asyncSeq = 0;
  349. #ifdef TARS_OPENTRACKING
  350. public:
  351. struct TraceManager:public TC_HandleBase{
  352. zipkin::ZipkinOtTracerOptions _zipkin_options;
  353. std::shared_ptr<opentracing::Tracer> _tracer;
  354. TraceManager(): _tracer(nullptr){}
  355. TraceManager(zipkin::ZipkinOtTracerOptions& options):_zipkin_options(options){
  356. _tracer = zipkin::makeZipkinOtTracer(options);
  357. }
  358. ~TraceManager(){if(_tracer != nullptr){_tracer->Close();}}
  359. };
  360. TC_AutoPtr<TraceManager> _traceManager;
  361. #endif
  362. };
  363. ////////////////////////////////////////////////////////////////////////
  364. }
  365. #endif