CommunicatorEpoll.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/CommunicatorEpoll.h"
  17. #include "servant/Communicator.h"
  18. #include "servant/Application.h"
  19. #include "servant/RemoteLogger.h"
  20. #include "servant/ObjectProxy.h"
  21. #include "servant/EndpointManager.h"
  22. using namespace std;
  23. namespace tars
  24. {
  25. #define MAX_STAT_QUEUE_SIZE 100000 //上报队列缓存大小
  26. CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq, bool isFirst)
  27. : _communicator(pCommunicator)
  28. , _isFirst(isFirst)
  29. , _netThreadSeq(netThreadSeq)
  30. , _noSendQueueLimit(1000)
  31. , _timeoutCheckInterval(100)
  32. , _statQueue(MAX_STAT_QUEUE_SIZE)
  33. {
  34. // LOG_CONSOLE_DEBUG << endl;
  35. //节点队列未发送请求的大小限制
  36. _noSendQueueLimit = TC_Common::strto<size_t>(pCommunicator->getProperty("sendqueuelimit", pCommunicator->getProperty("nosendqueuelimit", "100000")));
  37. if(_noSendQueueLimit < 1000)
  38. {
  39. _noSendQueueLimit = 1000;
  40. }
  41. //检查超时请求的时间间隔,单位:ms
  42. _timeoutCheckInterval = TC_Common::strto<int64_t>(pCommunicator->getProperty("timeoutcheckinterval", "1000"));
  43. if(_timeoutCheckInterval < 1)
  44. {
  45. _timeoutCheckInterval = 5;
  46. }
  47. for(size_t i = 0;i < MAX_CLIENT_NOTIFYEVENT_NUM;++i)
  48. {
  49. _notify[i] = NULL;
  50. }
  51. }
  52. CommunicatorEpoll::~CommunicatorEpoll()
  53. {
  54. // LOG_CONSOLE_DEBUG << endl;
  55. }
  56. void CommunicatorEpoll::handleServantThreadQuit(uint16_t iSeq)
  57. {
  58. assert(_threadId == this_thread::get_id());
  59. //在网络线程中处理的!
  60. if(_notify[iSeq])
  61. {
  62. _notify[iSeq]->autoDestroy = true;
  63. //通知网络线程, 网络线程好析构notify对象
  64. notify(iSeq);
  65. _notify[iSeq] = NULL;
  66. }
  67. }
  68. void CommunicatorEpoll::notifyServantThreadQuit(uint16_t iSeq)
  69. {
  70. if(_scheduler)
  71. {
  72. //CommunicatorEpoll还没有退出!
  73. if (_threadId == this_thread::get_id())
  74. {
  75. //同一个线程里面结束, 直接释放相关资源即可
  76. CommunicatorEpoll::handleServantThreadQuit(iSeq);
  77. }
  78. else
  79. {
  80. //等待数据都发送出去, 避免业务线程退出以后, 还有数据没有发送出去, 这里处理不够优雅!
  81. {
  82. std::lock_guard<std::mutex> lock(_mutex);
  83. //再做一次判断
  84. if (_scheduler)
  85. {
  86. //通知网络线程去释放资源!
  87. _epoller->asyncCallback(std::bind(&CommunicatorEpoll::handleServantThreadQuit, this, iSeq));
  88. }
  89. }
  90. }
  91. }
  92. }
  93. void CommunicatorEpoll::notifyTerminate()
  94. {
  95. if (_scheduler)
  96. {
  97. if(_threadId == this_thread::get_id())
  98. {
  99. //同一个线程里面结束, 直接释放相关资源即可
  100. CommunicatorEpoll::handleTerminate();
  101. }
  102. else
  103. {
  104. std::lock_guard<std::mutex> lock(_mutex);
  105. if (_scheduler)
  106. {
  107. //通知网络线程去释放资源!
  108. _epoller->syncCallback(std::bind(&CommunicatorEpoll::handleTerminate, this), 1000);
  109. // if (_scheduler)
  110. // {
  111. // _epoller->syncCallback(std::bind(&CommunicatorEpoll::handleTerminate, this), 1000);
  112. // LOG_CONSOLE_DEBUG << _scheduler.get() << endl;
  113. // }
  114. }
  115. }
  116. }
  117. }
  118. void CommunicatorEpoll::handleTerminate()
  119. {
  120. assert(_threadId == this_thread::get_id());
  121. if (_scheduler)
  122. {
  123. for (size_t i = 0; i < MAX_CLIENT_NOTIFYEVENT_NUM; ++i)
  124. {
  125. if (_notify[i])
  126. {
  127. delete _notify[i];
  128. }
  129. _notify[i] = NULL;
  130. }
  131. for (size_t i = 0; i < _vObjectProxys.size(); i++)
  132. {
  133. if (_vObjectProxys[i])
  134. {
  135. delete _vObjectProxys[i];
  136. _vObjectProxys[i] = NULL;
  137. }
  138. }
  139. _vObjectProxys.clear();
  140. //这里是否还是有临界情况!!??
  141. if(_epoller)
  142. {
  143. //定时任务都删除掉
  144. for_each(_timerIds.begin(), _timerIds.end(), [&](int64_t id)
  145. { _epoller->erase(id); });
  146. }
  147. _timerIds.clear();
  148. //独立启动的才需要释放协程调度器, 复用其他协程是不能停止调度器的!
  149. if(!this->isSchedCommunicatorEpoll())
  150. {
  151. _scheduler->terminate();
  152. assert(_pSptd);
  153. _pSptd->_sched.reset();
  154. ServantProxyThreadData::g_sp.reset();
  155. }
  156. _scheduler.reset();
  157. StatReport::MapStatMicMsg *pmStatMicMsg = NULL;
  158. while (_statQueue.pop_front(pmStatMicMsg))
  159. {
  160. assert(pmStatMicMsg != NULL);
  161. delete pmStatMicMsg;
  162. pmStatMicMsg = NULL;
  163. }
  164. }
  165. }
  166. void CommunicatorEpoll::terminate()
  167. {
  168. //通知网络线程退出, 不再执行任何操作
  169. notifyTerminate();
  170. }
  171. void CommunicatorEpoll::notifyUpdateEndpoints(ServantProxy *servantProxy, const set<EndpointInfo> & active,const set<EndpointInfo> & inactive)
  172. {
  173. CommunicatorEpoll *ce = this;
  174. _epoller->asyncCallback([=]()
  175. { servantProxy->onNotifyEndpoints(ce, active, inactive); });
  176. }
  177. int CommunicatorEpoll::loadObjectLocator()
  178. {
  179. assert(_threadId == this_thread::get_id());
  180. for (size_t i = 0; i < _objNum; i++)
  181. {
  182. _vObjectProxys[i]->loadLocator();
  183. }
  184. return 0;
  185. }
  186. ObjectProxy* CommunicatorEpoll::servantToObjectProxy(ServantProxy *servantProxy)
  187. {
  188. TC_ThreadRLock lock(_servantMutex);
  189. auto it = _servantObjectProxy.find(servantProxy);
  190. if( it != _servantObjectProxy.end())
  191. {
  192. //当前线程(CommunicatorEpoll)还没有创建出对应的ObjectProxy
  193. return it->second;
  194. }
  195. // assert(false);
  196. return NULL;
  197. }
  198. ObjectProxy * CommunicatorEpoll::hasObjectProxy(const string & sObjectProxyName,const string& setName)
  199. {
  200. TC_LockT<TC_ThreadRecMutex> lock(_objectMutex);
  201. string tmpObjName = sObjectProxyName + "!" + setName;
  202. auto it = _objectProxys.find(tmpObjName);
  203. if(it != _objectProxys.end())
  204. {
  205. return it->second;
  206. }
  207. return NULL;
  208. }
  209. ObjectProxy * CommunicatorEpoll::createObjectProxy(ServantProxy *servantProxy, const string & sObjectProxyName, const string& setName)
  210. {
  211. ObjectProxy * pObjectProxy;
  212. string tmpObjName = sObjectProxyName + "!" + setName;
  213. {
  214. TC_LockT<TC_ThreadRecMutex> lock(_objectMutex);
  215. auto it = _objectProxys.find(tmpObjName);
  216. if (it != _objectProxys.end())
  217. {
  218. return it->second;
  219. }
  220. pObjectProxy = new ObjectProxy(this, servantProxy, sObjectProxyName, setName);
  221. _objectProxys[tmpObjName] = pObjectProxy;
  222. }
  223. {
  224. TC_ThreadWLock lock(_vObjectMutex);
  225. _vObjectProxys.push_back(pObjectProxy);
  226. _objNum++;
  227. }
  228. {
  229. TC_ThreadWLock lock(_servantMutex);
  230. _servantObjectProxy[servantProxy] = pObjectProxy;
  231. }
  232. return pObjectProxy;
  233. }
  234. void CommunicatorEpoll::addFd(AdapterProxy* adapterProxy)
  235. {
  236. shared_ptr<TC_Epoller::EpollInfo> epollInfo = adapterProxy->trans()->getEpollInfo();
  237. epollInfo->cookie(adapterProxy);
  238. map<uint32_t, TC_Epoller::EpollInfo::EVENT_CALLBACK> callbacks;
  239. callbacks[EPOLLIN] = std::bind(&CommunicatorEpoll::handleInputImp, this, std::placeholders::_1);
  240. callbacks[EPOLLOUT] = std::bind(&CommunicatorEpoll::handleOutputImp, this, std::placeholders::_1);
  241. callbacks[EPOLLERR] = std::bind(&CommunicatorEpoll::handleCloseImp, this, std::placeholders::_1);
  242. epollInfo->registerCallback(callbacks, EPOLLIN|EPOLLOUT);
  243. }
  244. void CommunicatorEpoll::notify(size_t iSeq)
  245. {
  246. assert(_notify[iSeq] != NULL);
  247. // LOG_CONSOLE_DEBUG << "iSeq:" << iSeq << ", epollInfo:" << _notify[iSeq]->notify.getEpollInfo() << ", ce:" << this << endl;
  248. _notify[iSeq]->notify.getEpollInfo()->mod(EPOLLOUT);
  249. }
  250. void CommunicatorEpoll::initNotify(size_t iSeq, const shared_ptr<ReqInfoQueue> &msgQueue)
  251. {
  252. // if(_notify[iSeq] != NULL)
  253. // {
  254. // LOG_CONSOLE_DEBUG << "iSeq:" << iSeq << ", " << msgQueue.get() << ", " << _notify[iSeq]->msgQueue.get() << endl;
  255. // }
  256. // assert(_notify[iSeq] == NULL);
  257. if (_notify[iSeq] == NULL)
  258. {
  259. _notify[iSeq] = new FDInfo();
  260. _notify[iSeq]->msgQueue = msgQueue;
  261. _notify[iSeq]->iSeq = iSeq;
  262. _notify[iSeq]->notify.init(_epoller);
  263. _notify[iSeq]->notify.getEpollInfo()->cookie((void*)_notify[iSeq]);
  264. map<uint32_t, TC_Epoller::EpollInfo::EVENT_CALLBACK> callbacks;
  265. callbacks[EPOLLOUT] = std::bind(&CommunicatorEpoll::handleNotify, this, std::placeholders::_1);
  266. _notify[iSeq]->notify.getEpollInfo()->registerCallback(callbacks, EPOLLIN | EPOLLOUT);
  267. }
  268. else
  269. {
  270. _notify[iSeq]->msgQueue = msgQueue;
  271. }
  272. }
  273. bool CommunicatorEpoll::handleCloseImp(const shared_ptr<TC_Epoller::EpollInfo> &data)
  274. {
  275. assert(_threadId == this_thread::get_id());
  276. AdapterProxy* adapterProxy = (AdapterProxy*)data->cookie();
  277. adapterProxy->trans()->close();
  278. return false;
  279. }
  280. bool CommunicatorEpoll::handleInputImp(const shared_ptr<TC_Epoller::EpollInfo> &data)
  281. {
  282. assert(_threadId == this_thread::get_id());
  283. AdapterProxy* adapterProxy = (AdapterProxy*)data->cookie();
  284. try
  285. {
  286. adapterProxy->trans()->doResponse();
  287. }
  288. catch(const std::exception& e)
  289. {
  290. // LOG_CONSOLE_DEBUG << "[CommunicatorEpoll::handleInputImp] error:" << e.what() << endl;
  291. TLOGTARS("[CommunicatorEpoll::handleInputImp] error:" << e.what() << endl);
  292. adapterProxy->addConnExc(true);
  293. return false;
  294. }
  295. return true;
  296. }
  297. bool CommunicatorEpoll::handleOutputImp(const shared_ptr<TC_Epoller::EpollInfo> &data)
  298. {
  299. assert(_threadId == this_thread::get_id());
  300. // LOG_CONSOLE_DEBUG << endl;
  301. AdapterProxy* adapterProxy = (AdapterProxy*)data->cookie();
  302. try
  303. {
  304. adapterProxy->trans()->doRequest();
  305. }
  306. catch(const std::exception& e)
  307. {
  308. // LOG_CONSOLE_DEBUG << "[CommunicatorEpoll::handleOutputImp] error:" << e.what() << endl;
  309. TLOGTARS("[CommunicatorEpoll::handleOutputImp] error:" << e.what() << endl);
  310. adapterProxy->addConnExc(true);
  311. return false;
  312. }
  313. return true;
  314. }
  315. void CommunicatorEpoll::report(StatReport::MapStatMicMsg *pmStatMicMsg)
  316. {
  317. bool bFlag = _statQueue.push_back(pmStatMicMsg);
  318. if(!bFlag)
  319. {
  320. delete pmStatMicMsg;
  321. pmStatMicMsg = NULL;
  322. TLOGERROR("[StatReport::report: queue full]" << endl);
  323. }
  324. }
  325. bool CommunicatorEpoll::popStatMsg(StatReport::MapStatMicMsg* &mStatMsg)
  326. {
  327. return _statQueue.pop_front(mStatMsg);
  328. }
  329. void CommunicatorEpoll::doTimeout()
  330. {
  331. assert(_threadId == this_thread::get_id());
  332. for(size_t i = 0; i < getObjNum(); ++i)
  333. {
  334. getObjectProxy(i)->doTimeout();
  335. }
  336. }
  337. void CommunicatorEpoll::doKeepAlive()
  338. {
  339. assert(_threadId == this_thread::get_id());
  340. if (_communicator->getKeepAliveInterval() == 0)
  341. {
  342. return;
  343. }
  344. for(size_t i = 0; i < getObjNum(); ++i)
  345. {
  346. getObjectProxy(i)->doKeepAlive();
  347. }
  348. }
  349. void CommunicatorEpoll::doStat()
  350. {
  351. assert(_threadId == this_thread::get_id());
  352. {
  353. if(isFirstNetThread()) {
  354. _communicator->doStat();
  355. }
  356. StatReport::MapStatMicMsg* pmStatMicMsg = new StatReport::MapStatMicMsg();//(mStatMicMsg);
  357. for(size_t i = 0;i < getObjNum(); ++i)
  358. {
  359. getObjectProxy(i)->mergeStat(*pmStatMicMsg);
  360. }
  361. //有数据才上报
  362. if(!pmStatMicMsg->empty())
  363. {
  364. report(pmStatMicMsg);
  365. }
  366. else
  367. {
  368. delete pmStatMicMsg;
  369. pmStatMicMsg = NULL;
  370. }
  371. }
  372. }
  373. void CommunicatorEpoll::getResourcesInfo(ostringstream &desc)
  374. {
  375. assert(_threadId == this_thread::get_id());
  376. desc << TC_Common::outfill("index") << _netThreadSeq << endl;
  377. desc << TC_Common::outfill("stat size") << getReportSize() << endl;
  378. desc << TC_Common::outfill("obj num") << getObjNum() << endl;
  379. const static string TAB = " ";
  380. for(size_t i = 0; i < getObjNum(); ++i)
  381. {
  382. desc << TAB << OUT_LINE_TAB(1) << endl;
  383. desc << TAB << TC_Common::outfill("obj name") << getObjectProxy(i)->name() << endl;
  384. const vector<AdapterProxy*> &adapters = getObjectProxy(i)->getAdapters();
  385. for(auto adapter : adapters)
  386. {
  387. desc << TAB << TAB << OUT_LINE_TAB(2) << endl;
  388. desc << TAB << TAB << TC_Common::outfill("adapter") << adapter->endpoint().getEndpoint().toString() << endl;
  389. desc << TAB << TAB << TC_Common::outfill("recv size") << adapter->trans()->getRecvBuffer().getBufferLength() << endl;
  390. desc << TAB << TAB << TC_Common::outfill("send size") << adapter->trans()->getSendBuffer().getBufferLength() << endl;
  391. }
  392. }
  393. }
  394. void CommunicatorEpoll::doReconnect()
  395. {
  396. assert(_threadId == this_thread::get_id());
  397. int64_t iNow = TNOWMS;
  398. set<TC_Transceiver*> does;
  399. while(!_reconnect.empty())
  400. {
  401. auto it = _reconnect.begin();
  402. if(it->first > iNow)
  403. {
  404. return;
  405. }
  406. //一次循环同一个节点只尝试一次重试,以避免多次触发close,导致重连的间隔无效
  407. if (does.find(it->second) != does.end())
  408. {
  409. _reconnect.erase(it++);
  410. }
  411. else
  412. {
  413. does.insert(it->second);
  414. it->second->connect();
  415. _reconnect.erase(it++);
  416. }
  417. }
  418. }
  419. bool CommunicatorEpoll::handleNotify(const shared_ptr<TC_Epoller::EpollInfo> &data)
  420. {
  421. assert(_threadId == this_thread::get_id());
  422. // LOG_CONSOLE_DEBUG << endl;
  423. //队列有消息通知过来
  424. FDInfo *pFDInfo = (FDInfo*)data->cookie();
  425. ReqMessage * msg = NULL;
  426. size_t maxProcessCount = 0;
  427. try
  428. {
  429. while (pFDInfo->msgQueue->pop_front(msg))
  430. {
  431. msg->pObjectProxy->invoke(msg);
  432. if(++maxProcessCount > 1000)
  433. {
  434. //避免包太多的时候, 循环占用网路线程, 导致连接都建立不上, 一个包都无法发送出去
  435. data->mod(EPOLLOUT);
  436. TLOGTARS("[CommunicatorEpoll::handle max process count: " << maxProcessCount << ", fd:" << data->fd() << "]" << endl);
  437. break;
  438. }
  439. }
  440. if (pFDInfo->msgQueue->empty() && pFDInfo->autoDestroy)
  441. {
  442. // LOG_CONSOLE_DEBUG << "iSeq:" << pFDInfo->iSeq << ", fd:" << pFDInfo->notify.notifyFd() << endl;
  443. delete pFDInfo;
  444. return false;
  445. }
  446. }
  447. catch(exception & e)
  448. {
  449. TLOGERROR("[CommunicatorEpoll::handleNotify error: " << e.what() << "]"<<endl);
  450. }
  451. catch(...)
  452. {
  453. TLOGERROR("[CommunicatorEpoll::handleNotify error]" <<endl);
  454. }
  455. return true;
  456. }
  457. void CommunicatorEpoll::initializeEpoller()
  458. {
  459. _threadId = this_thread::get_id();
  460. _scheduler = TC_CoroutineScheduler::scheduler();
  461. assert(_scheduler);
  462. _epoller = _scheduler->getEpoller();
  463. auto id1 = _epoller->postRepeated(1000, false, std::bind(&CommunicatorEpoll::doReconnect, this));
  464. auto id2 = _epoller->postRepeated(1000 * 5, false, std::bind(&CommunicatorEpoll::doStat, this));
  465. auto id3 = _epoller->postRepeated(_timeoutCheckInterval, false, std::bind(&CommunicatorEpoll::doTimeout, this));
  466. _timerIds = { id1, id2, id3 };
  467. if (_communicator->getKeepAliveInterval() > 0) {
  468. auto id = _epoller->postRepeated(1000 * 2, false, std::bind(&CommunicatorEpoll::doKeepAlive, this));
  469. _timerIds.emplace_back(id);
  470. }
  471. }
  472. void CommunicatorEpoll::run()
  473. {
  474. //注意网络通信器是通过startCoroutine启动的, 因此就在协程中!
  475. _public = true;
  476. initializeEpoller();
  477. _pSptd = ServantProxyThreadData::getData();
  478. _pSptd->_sched = _scheduler;
  479. _netThreadSeq = _pSptd->_reqQNo;
  480. _epoller->setName("communicator-epoller-public-netseq:" + TC_Common::tostr(_netThreadSeq));
  481. //关联公有网络通信器
  482. auto info = _pSptd->addCommunicatorEpoll(shared_from_this());
  483. info->_communicator = this->_communicator;
  484. //当前线程处于网络线程中!
  485. _pSptd->_communicatorEpoll = this;
  486. _communicator->notifyCommunicatorEpollStart();
  487. }
  488. //////////////////////////////////////////////////////////////////////////////////
  489. }