CommunicatorEpoll.h 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TARS__COMMUNICATOR_EPOLL_H_
  17. #define __TARS__COMMUNICATOR_EPOLL_H_
  18. #include "util/tc_thread.h"
  19. #include "util/tc_thread_mutex.h"
  20. #include "util/tc_epoller.h"
  21. #include "util/tc_loop_queue.h"
  22. #include "servant/Message.h"
  23. #include "servant/EndpointInfo.h"
  24. #include "servant/StatReport.h"
  25. #include "servant/Communicator.h"
  26. #include <set>
  27. namespace tars
  28. {
  29. class Communicator;
  30. class ObjectProxy;
  31. class StatReport;
  32. class PropertyReport;
  33. ////////////////////////////////////////////////////////////////////////
  34. /**
  35. * 监听FD事件并触发注册的handle
  36. */
  37. struct FDInfo
  38. {
  39. /**
  40. * 构造函数
  41. */
  42. FDInfo()
  43. {
  44. }
  45. /**
  46. * 析构函数
  47. */
  48. ~FDInfo()
  49. {
  50. if (msgQueue)
  51. {
  52. ReqMessage *msg;
  53. while (msgQueue->pop_front(msg))
  54. {
  55. delete msg;
  56. }
  57. }
  58. }
  59. size_t iSeq;
  60. shared_ptr<ReqInfoQueue> msgQueue;
  61. TC_Epoller::NotifyInfo notify;
  62. bool autoDestroy = false;
  63. };
  64. ////////////////////////////////////////////////////////////////////////
  65. /**
  66. * 客户端网络处理的线程类
  67. */
  68. class CommunicatorEpoll : public TC_Thread, public enable_shared_from_this<CommunicatorEpoll>
  69. {
  70. public:
  71. /**
  72. * 构造函数
  73. * @param pCommunicator
  74. * @param netThreadSeq, 业务线程序号, 如果是公有网络通信器, 则为-1
  75. * @param isFirst, 是否是第一个公有网络通信器
  76. */
  77. CommunicatorEpoll(Communicator * pCommunicator, size_t netThreadSeq, bool isFirst = false);
  78. /**
  79. * 析构函数
  80. */
  81. virtual ~CommunicatorEpoll();
  82. /**
  83. * 获取 Communicator
  84. */
  85. inline Communicator * getCommunicator()
  86. {
  87. return _communicator;
  88. }
  89. /**
  90. * 获取 网络线程id
  91. */
  92. inline size_t getCommunicatorNetThreadSeq()
  93. {
  94. return _netThreadSeq;
  95. }
  96. /**
  97. * 获取 节点队列未发送请求的大小限制
  98. */
  99. inline size_t getNoSendQueueLimit()
  100. {
  101. return _noSendQueueLimit;
  102. }
  103. /*
  104. * 判断是否是第一个网络线程 主控写缓存的时候用到
  105. */
  106. inline bool isFirstNetThread()
  107. {
  108. return _isFirst;
  109. }
  110. /**
  111. * 获取epoller
  112. * @return
  113. */
  114. inline TC_Epoller* getEpoller() { return _epoller; }
  115. /**
  116. * 是否存在ObjectProxy了, 如果已经存在则创建
  117. * @param sObjectProxyName
  118. * @param setName
  119. * @return
  120. */
  121. ObjectProxy * hasObjectProxy(const string & sObjectProxyName,const string& setName="");
  122. /*
  123. * 获取本epoll的代理对象
  124. */
  125. ObjectProxy * createObjectProxy(ServantProxy *servantProxy, const string & sObjectProxyName,const string& setName="");
  126. /**
  127. * 循环监听网络事件
  128. */
  129. virtual void run();
  130. /**
  131. * 中止监听
  132. */
  133. void terminate();
  134. /**
  135. * 注册fd对应的处理handle
  136. * @param adapterProxy
  137. */
  138. void addFd(AdapterProxy* adapterProxy);
  139. /**
  140. * 通知事件过来
  141. * @param iSeq
  142. */
  143. void notify(size_t iSeq);
  144. /**
  145. * 主动更新ip list
  146. * @param active
  147. * @param inactive
  148. */
  149. void notifyUpdateEndpoints(ServantProxy *servantProxy, const set<EndpointInfo> & active,const set<EndpointInfo> & inactive);
  150. /**
  151. * 数据加入到异步线程队列里面
  152. * @return
  153. */
  154. inline void pushAsyncThreadQueue(ReqMessage * msg) { _communicator->pushAsyncThreadQueue(msg); }
  155. /**
  156. * set reconnect
  157. * @param time
  158. */
  159. inline void reConnect(int64_t ms, TC_Transceiver*p) { _reconnect[ms] = p; }
  160. /**
  161. * communicator resource desc
  162. * @return
  163. */
  164. void getResourcesInfo(ostringstream &desc);
  165. /**
  166. * 所有对象代理加载locator信息
  167. */
  168. int loadObjectLocator();
  169. /**
  170. * servant换成对应线程的objectproxy
  171. * @param servantProxy
  172. * @return
  173. */
  174. ObjectProxy* servantToObjectProxy(ServantProxy *servantProxy);
  175. /**
  176. * 是否是协程中的私有通信器
  177. * @return
  178. */
  179. inline bool isSchedCommunicatorEpoll() const { return !_public; }
  180. /**
  181. * 初始化notify
  182. */
  183. void initNotify(size_t iSeq, const shared_ptr<ReqInfoQueue> &msgQueue);
  184. /**
  185. * 直接通知
  186. */
  187. inline void handle(uint16_t reqQNo) { handleNotify(_notify[reqQNo]->notify.getEpollInfo()); }
  188. /**
  189. * 获取通知句柄(主要用于测试)
  190. * @return
  191. */
  192. FDInfo** getNotify() { return _notify; }
  193. protected:
  194. /**
  195. * 网络线程中处理CommunicatorEpoll退出的清理逻辑
  196. */
  197. void handleTerminate();
  198. /**
  199. * 通知CommunicatorEpoll退出
  200. */
  201. void notifyTerminate();
  202. /**
  203. * 网络线程中处理业务线程退出的清理逻辑
  204. */
  205. void handleServantThreadQuit(uint16_t iSeq);
  206. /**
  207. * 通知业务线程退出
  208. */
  209. void notifyServantThreadQuit(uint16_t iSeq);
  210. /**
  211. * 初始化
  212. * 如果在其他协程中, 并不自己run, 只需要调用该函数初始化epoller即可
  213. */
  214. void initializeEpoller();
  215. /**
  216. * 上报数据
  217. * @param pmStatMicMsg
  218. */
  219. void report(StatReport::MapStatMicMsg *pmStatMicMsg);
  220. /**
  221. * 弹出来统计数据
  222. * @param mStatMsg
  223. * @return
  224. */
  225. bool popStatMsg(StatReport::MapStatMicMsg* &mStatMsg);
  226. /**
  227. * 输入事件
  228. * @param pi
  229. */
  230. bool handleCloseImp(const shared_ptr<TC_Epoller::EpollInfo> &data);
  231. /**
  232. * 输入事件
  233. * @param pi
  234. */
  235. bool handleInputImp(const shared_ptr<TC_Epoller::EpollInfo> &data);
  236. /**
  237. * 输出事件
  238. * @param pi
  239. */
  240. bool handleOutputImp(const shared_ptr<TC_Epoller::EpollInfo> &data);
  241. /**
  242. * 处理notify
  243. */
  244. bool handleNotify(const shared_ptr<TC_Epoller::EpollInfo> & data);
  245. /**
  246. * 处理超时
  247. * @param pi
  248. */
  249. void doTimeout();
  250. /**
  251. * 处理tars_ping
  252. * @param pi
  253. */
  254. void doKeepAlive();
  255. /**
  256. * 处理stat
  257. * @param pi
  258. */
  259. void doStat();
  260. /**
  261. * reconnect
  262. */
  263. void doReconnect();
  264. /**
  265. * 根据序号 获取所有obj对象
  266. */
  267. inline ObjectProxy * getObjectProxy(size_t iNum)
  268. {
  269. TC_ThreadRLock lock(_vObjectMutex);
  270. assert(iNum < _objNum);
  271. return _vObjectProxys[iNum];
  272. }
  273. /**
  274. * 获取所有对象的个数,为了不加锁不用map
  275. */
  276. inline size_t getObjNum()
  277. {
  278. return _objNum;
  279. }
  280. /**
  281. * 需要上报的stat数据size
  282. * @return
  283. */
  284. inline size_t getReportSize() { return _statQueue.size(); }
  285. friend class StatReport;
  286. friend class AdapterProxy;
  287. friend class Communicator;
  288. friend class ServantProxy;
  289. friend class ServantProxyThreadData;
  290. protected:
  291. /*
  292. * 通信器
  293. */
  294. Communicator * _communicator;
  295. /**
  296. * 是否第一个网络线程
  297. */
  298. bool _isFirst = false;
  299. /**
  300. * 是否公有的网络线程
  301. */
  302. bool _public = false;
  303. /**
  304. * notify
  305. */
  306. FDInfo* _notify[MAX_CLIENT_NOTIFYEVENT_NUM];
  307. /**
  308. * schedule
  309. */
  310. shared_ptr<TC_CoroutineScheduler> _scheduler;
  311. /**
  312. * 独立的网络线程存在, 线程私有数据
  313. */
  314. ServantProxyThreadData *_pSptd = NULL;
  315. /*
  316. * epoll
  317. */
  318. TC_Epoller *_epoller = NULL;
  319. /**
  320. * lock
  321. */
  322. TC_ThreadRecMutex _objectMutex;
  323. /**
  324. * 保存已创建的objectproxy
  325. */
  326. unordered_map<string, ObjectProxy*> _objectProxys;
  327. /**
  328. * _vObjectProxys读写锁
  329. */
  330. TC_ThreadRWLocker _vObjectMutex;
  331. /**
  332. * 保存已经创建的obj 取的时候可以不用加锁
  333. */
  334. vector<ObjectProxy *> _vObjectProxys;
  335. /**
  336. * 读写锁
  337. */
  338. TC_ThreadRWLocker _servantMutex;
  339. /**
  340. * servant对应的objectProxy
  341. */
  342. unordered_map<ServantProxy*, ObjectProxy*> _servantObjectProxy;
  343. /*
  344. *保存已经创建obj的数量
  345. */
  346. size_t _objNum = 0;
  347. /*
  348. * 网络线程的id号
  349. * 私有网络线程: ServantProxyThreadData::_reqQNo, 从0开始计数
  350. */
  351. size_t _netThreadSeq;
  352. /*
  353. * 节点ip队列未发送请求的大小限制
  354. */
  355. size_t _noSendQueueLimit;
  356. /*
  357. * 超时的检查时间间隔
  358. */
  359. int64_t _timeoutCheckInterval;
  360. /**
  361. * auto reconnect TC_Transceiver
  362. */
  363. unordered_map<int64_t, TC_Transceiver*> _reconnect;
  364. /**
  365. * 统计数据
  366. */
  367. TC_LoopQueue<StatReport::MapStatMicMsg*> _statQueue;
  368. /**
  369. * 网络线程ID
  370. */
  371. std::thread::id _threadId;
  372. /**
  373. * 定时器的id
  374. */
  375. vector<int64_t> _timerIds;
  376. /**
  377. * 锁
  378. */
  379. std::mutex _mutex;
  380. };
  381. /////////////////////////////////////////////////////////////////////////////////////
  382. }
  383. #endif