Communicator.cpp 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "util/tc_file.h"
  17. #include "util/tc_epoller.h"
  18. #include "servant/Communicator.h"
  19. #include "servant/CommunicatorEpoll.h"
  20. #include "servant/Application.h"
  21. #include "servant/StatReport.h"
  22. #include "servant/RemoteLogger.h"
  23. #include "servant/ObjectProxy.h"
  24. namespace tars
  25. {
  26. //////////////////////////////////////////////////////////////////////////////////////////////
  27. string ClientConfig::LocalIp = "127.0.0.1";
  28. string ClientConfig::ModuleName = "unknown";
  29. set<string> ClientConfig::SetLocalIp;
  30. bool ClientConfig::SetOpen = false;
  31. string ClientConfig::SetDivision = "";
  32. string ClientConfig::TarsVersion = string(TARS_VERSION);
  33. //////////////////////////////////////////////////////////////////////////////////////////////
  34. Communicator::Communicator()
  35. : _initialized(false)
  36. , _terminating(false)
  37. , _statReport(NULL)
  38. , _timeoutLogFlag(true)
  39. , _keepAliveInterval(0)
  40. // #ifdef TARS_OPENTRACKING
  41. // , _traceManager(NULL)
  42. // #endif
  43. {
  44. #if TARGET_PLATFORM_WINDOWS
  45. WSADATA wsadata;
  46. WSAStartup(MAKEWORD(2, 2), &wsadata);
  47. #endif
  48. }
  49. Communicator::Communicator(TC_Config& conf, const string& domain/* = CONFIG_ROOT_PATH*/)
  50. : _initialized(false)
  51. , _terminating(false)
  52. , _timeoutLogFlag(true)
  53. , _keepAliveInterval(0)
  54. // #ifdef TARS_OPENTRACKING
  55. // , _traceManager(NULL)
  56. // #endif
  57. {
  58. setProperty(conf, domain);
  59. }
  60. Communicator::~Communicator()
  61. {
  62. // LOG_CONSOLE_DEBUG << endl;
  63. ServantProxyThreadData::deconstructor(this);
  64. terminate();
  65. #if TARGET_PLATFORM_WINDOWS
  66. WSACleanup();
  67. #endif
  68. }
  69. bool Communicator::isTerminating()
  70. {
  71. return _terminating;
  72. }
  73. map<string, string> Communicator::getServantProperty(const string &sObj)
  74. {
  75. TC_LockT<TC_ThreadRecMutex> lock(*this);
  76. auto it = _objInfo.find(sObj);
  77. if(it != _objInfo.end())
  78. {
  79. return it->second;
  80. }
  81. return map<string, string>();
  82. }
  83. void Communicator::setServantProperty(const string &sObj, const string& name, const string& value)
  84. {
  85. TC_LockT<TC_ThreadRecMutex> lock(*this);
  86. _objInfo[sObj][name] = value;
  87. }
  88. string Communicator::getServantProperty(const string &sObj, const string& name)
  89. {
  90. TC_LockT<TC_ThreadRecMutex> lock(*this);
  91. auto it = _objInfo.find(sObj);
  92. if(it != _objInfo.end())
  93. {
  94. auto vit = it->second.find(name);
  95. if(vit != it->second.end())
  96. {
  97. return vit->second;
  98. }
  99. }
  100. return "";
  101. }
  102. void Communicator::setTraceParam(const string& name)
  103. {
  104. if (!name.empty() && name != "trace_param_max_len")
  105. {
  106. return;
  107. }
  108. string defaultValue = "";
  109. defaultValue = getProperty("trace_param_max_len", defaultValue);
  110. if (!defaultValue.empty() && TC_Common::isdigit(defaultValue))
  111. {
  112. ServantProxyThreadData::setTraceParamMaxLen(TC_Common::strto<unsigned int>(defaultValue));
  113. }
  114. }
  115. shared_ptr<TC_OpenSSL> Communicator::newClientSSL(const string & objName)
  116. {
  117. #if TARS_SSL
  118. TC_LockT<TC_ThreadRecMutex> lock(*this);
  119. auto it = _objCtx.find(objName);
  120. if(it != _objCtx.end())
  121. {
  122. return TC_OpenSSL::newSSL(it->second);
  123. }
  124. if(!_ctx) {
  125. _ctx = TC_OpenSSL::newCtx("", "", "", false, "");
  126. }
  127. return TC_OpenSSL::newSSL(_ctx);
  128. #else
  129. return NULL;
  130. #endif
  131. }
  132. void Communicator::setProperty(TC_Config& conf, const string& domain/* = CONFIG_ROOT_PATH*/)
  133. {
  134. TC_LockT<TC_ThreadRecMutex> lock(*this);
  135. conf.getDomainMap(domain, _properties);
  136. setTraceParam();
  137. string defaultValue = "dft";
  138. if ((defaultValue == getProperty("enableset", defaultValue)) || (defaultValue == getProperty("setdivision", defaultValue)))
  139. {
  140. _properties["enableset"] = conf.get("/tars/application<enableset>", "n");
  141. _properties["setdivision"] = conf.get("/tars/application<setdivision>", "NULL");
  142. }
  143. vector<string> auths;
  144. if (conf.getDomainVector(CONFIG_ROOT_PATH, auths))
  145. {
  146. for(size_t i = 0; i < auths.size(); i++)
  147. {
  148. map<string, string> &data = _objInfo[auths[i]];
  149. data["accesskey"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<accesskey>");
  150. data["secretkey"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<secretkey>");
  151. data["ca"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<ca>");
  152. data["cert"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<cert>");
  153. data["key"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<key>");
  154. data["ciphers"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<ciphers>");
  155. #if TARS_SSL
  156. if(!data["ca"].empty())
  157. {
  158. shared_ptr<TC_OpenSSL::CTX> ctx = TC_OpenSSL::newCtx( data["ca"], data["cert"], data["key"], false, data["ciphers"]);
  159. if(!ctx)
  160. {
  161. TLOGERROR("[load obj:" << auths[i] << ", ssl error, ca:" << data["ca"] << endl);
  162. exit(-1);
  163. }
  164. _objCtx[auths[i]] = ctx;
  165. }
  166. #endif
  167. }
  168. }
  169. // initClientConfig();
  170. }
  171. void Communicator::notifyCommunicatorEpollStart()
  172. {
  173. ++_communicatorEpollStartNum;
  174. std::unique_lock<std::mutex> lock(_mutex);
  175. _cond.notify_one();
  176. }
  177. void Communicator::initialize()
  178. {
  179. TC_LockT<TC_ThreadRecMutex> lock(*this);
  180. //两次保护
  181. if (_initialized)
  182. return;
  183. _initialized = true;
  184. _sigId = TC_Port::registerCtrlC([&]{
  185. TC_Common::msleep(50);
  186. this->terminate();
  187. #if TARGET_PLATFORM_WINDOWS
  188. ExitProcess(0);
  189. #else
  190. exit(0);
  191. #endif
  192. });
  193. ClientConfig::TarsVersion = TARS_VERSION;
  194. ClientConfig::SetOpen = TC_Common::lower(getProperty("enableset", "n")) == "y" ? true : false;
  195. if (ClientConfig::SetOpen)
  196. {
  197. ClientConfig::SetDivision = getProperty("setdivision");
  198. vector<string> vtSetDivisions = TC_Common::sepstr<string>(ClientConfig::SetDivision, ".");
  199. string sWildCard = "*";
  200. if (vtSetDivisions.size() != 3
  201. || vtSetDivisions[0] == sWildCard
  202. || vtSetDivisions[1] == sWildCard)
  203. {
  204. //set分组名不对时默认没有打开set分组
  205. ClientConfig::SetOpen = false;
  206. setProperty("enableset", "n");
  207. TLOGERROR("[set division name error:" << ClientConfig::SetDivision << ", client failed to open set]" << endl);
  208. }
  209. }
  210. ClientConfig::LocalIp = getProperty("localip", "");
  211. if (ClientConfig::SetLocalIp.empty())
  212. {
  213. vector<string> v = TC_Socket::getLocalHosts();
  214. for (size_t i = 0; i < v.size(); i++)
  215. {
  216. if (v[i] != "127.0.0.1" && ClientConfig::LocalIp.empty())
  217. {
  218. ClientConfig::LocalIp = v[i];
  219. }
  220. ClientConfig::SetLocalIp.insert(v[i]);
  221. }
  222. }
  223. //缺省采用进程名称
  224. string exe = "";
  225. try
  226. {
  227. exe = TC_File::extractFileName(TC_File::getExePath());
  228. }
  229. catch (TC_File_Exception& ex)
  230. {
  231. //取失败则使用ip代替进程名
  232. exe = ClientConfig::LocalIp;
  233. }
  234. ClientConfig::ModuleName = getProperty("modulename", exe);
  235. #if TARS_SSL
  236. string ca = getProperty("ca");
  237. string cert = getProperty("cert");
  238. string key = getProperty("key");
  239. string ciphers = getProperty("ciphers");
  240. if(!ca.empty()) {
  241. _ctx = TC_OpenSSL::newCtx(ca, cert, key, false, ciphers);
  242. if (!_ctx)
  243. {
  244. TLOGERROR("load client ssl error, ca:" << ca << endl);
  245. exit(-1);
  246. }
  247. }
  248. #endif
  249. _servantProxyFactory = new ServantProxyFactory(this);
  250. //网络线程
  251. size_t clientThreadNum = TC_Common::strto<size_t>(getProperty("netthread", "1"));
  252. if (0 == clientThreadNum)
  253. {
  254. clientThreadNum = 1;
  255. }
  256. else if(MAX_CLIENT_THREAD_NUM < clientThreadNum)
  257. {
  258. clientThreadNum = MAX_CLIENT_THREAD_NUM;
  259. }
  260. //异步线程数
  261. _asyncThreadNum = TC_Common::strto<size_t>(getProperty("asyncthread", "3"));
  262. if(_asyncThreadNum == 0)
  263. {
  264. _asyncThreadNum = 3;
  265. }
  266. if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
  267. {
  268. _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
  269. }
  270. bool merge = TC_Common::strto<bool>(getProperty("mergenetasync", "0"));
  271. //异步队列的大小
  272. size_t iAsyncQueueCap = TC_Common::strto<size_t>(getProperty("asyncqueuecap", "100000"));
  273. if(iAsyncQueueCap < 10000)
  274. {
  275. iAsyncQueueCap = 10000;
  276. }
  277. //第一个通信器才去启动回调线程
  278. for (size_t i = 0; i < _asyncThreadNum; ++i) {
  279. _asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge));
  280. }
  281. //stat总是有对象, 保证getStat返回的对象总是有效
  282. _statReport = new StatReport(this);
  283. _keepAliveInterval = TC_Common::strto<int64_t>(getProperty("keep-alive-interval", "0"))/1000;
  284. if (_keepAliveInterval<5 && _keepAliveInterval!=0)
  285. {
  286. _keepAliveInterval = 5;
  287. }
  288. for (size_t i = 0; i < clientThreadNum; ++i)
  289. {
  290. _communicatorEpoll.push_back(std::make_shared<CommunicatorEpoll>(this, -1, i == 0));
  291. //以协程模式启动
  292. _communicatorEpoll.back()->setThreadName("communicator-epoll-" + TC_Common::tostr(i));
  293. _communicatorEpoll.back()->startCoroutine(3, 128*1024, false);
  294. }
  295. {
  296. std::unique_lock<std::mutex> lock(_mutex);
  297. _cond.wait(lock, [&]{ return _communicatorEpollStartNum == clientThreadNum; });
  298. }
  299. //异步队列数目上报
  300. _reportAsyncQueue= getStatReport()->createPropertyReport(ClientConfig::ModuleName + ".asyncqueue", PropertyReport::avg());
  301. //初始化统计上报接口
  302. string statObj = getProperty("stat", "");
  303. string propertyObj = getProperty("property", "");
  304. int iReportInterval = TC_Common::strto<int>(getProperty("report-interval", "60000"));
  305. int iReportTimeout = TC_Common::strto<int>(getProperty("report-timeout", "5000"));
  306. // int iSampleRate = TC_Common::strto<int>(getProperty("sample-rate", "1000"));
  307. // int iMaxSampleCount = TC_Common::strto<int>(getProperty("max-sample-count", "100"));
  308. int iMaxReportSize = TC_Common::strto<int>(getProperty("max-report-size", "1400"));
  309. _timeoutLogFlag = TC_Common::strto<bool>(getProperty("timeout-log-flag", "1"));
  310. _minTimeout = TC_Common::strto<int64_t>(getProperty("min-timeout", "100"));
  311. if(_minTimeout < 1)
  312. _minTimeout = 1;
  313. StatFPrx statPrx = NULL;
  314. if (!statObj.empty())
  315. {
  316. statPrx = stringToProxy<StatFPrx>(statObj);
  317. }
  318. //上报Property信息的代理
  319. PropertyFPrx propertyPrx = NULL;
  320. if (!propertyObj.empty())
  321. {
  322. propertyPrx = stringToProxy<PropertyFPrx>(propertyObj);
  323. }
  324. string sSetDivision = ClientConfig::SetOpen ? ClientConfig::SetDivision : "";
  325. _statReport->setReportInfo(statPrx, propertyPrx, ClientConfig::ModuleName, ClientConfig::LocalIp, sSetDivision, iReportInterval, 0, 0, iMaxReportSize, iReportTimeout);
  326. #if TARS_OPENTRACKING
  327. string collector_host = getProperty("collector_host", "");
  328. string collector_port = getProperty("collector_port", "");
  329. if(!collector_host.empty() && !collector_port.empty())
  330. {
  331. //init zipkin config
  332. zipkin::ZipkinOtTracerOptions options;
  333. options.service_name = ClientConfig::ModuleName;
  334. options.service_address = {zipkin::IpVersion::v4, ClientConfig::LocalIp};
  335. options.sample_rate = strtod(getProperty("sample_rate", "1.0").c_str(), NULL);
  336. options.collector_host = collector_host;
  337. options.collector_port = atoi(collector_port.c_str());
  338. _traceManager = new TraceManager(options);
  339. assert(_traceManager != NULL);
  340. }
  341. #endif
  342. }
  343. void Communicator::setProperty(const map<string, string>& properties)
  344. {
  345. TC_LockT<TC_ThreadRecMutex> lock(*this);
  346. _properties = properties;
  347. setTraceParam();
  348. }
  349. void Communicator::setProperty(const string& name, const string& value)
  350. {
  351. TC_LockT<TC_ThreadRecMutex> lock(*this);
  352. _properties[name] = value;
  353. setTraceParam(name);
  354. }
  355. string Communicator::getProperty(const string& name, const string& dft/* = ""*/)
  356. {
  357. TC_LockT<TC_ThreadRecMutex> lock(*this);
  358. map<string, string>::iterator it = _properties.find(name);
  359. if (it != _properties.end())
  360. {
  361. return it->second;
  362. }
  363. return dft;
  364. }
  365. vector<shared_ptr<CommunicatorEpoll>> Communicator::getAllCommunicatorEpoll()
  366. {
  367. vector<shared_ptr<CommunicatorEpoll>> communicatorEpolls = _communicatorEpoll;
  368. forEachSchedCommunicatorEpoll([&](const shared_ptr<CommunicatorEpoll> &c){
  369. communicatorEpolls.push_back(c);
  370. });
  371. return communicatorEpolls;
  372. }
  373. void Communicator::forEachSchedCommunicatorEpoll(std::function<void(const shared_ptr<CommunicatorEpoll> &)> func)
  374. {
  375. TC_LockT<TC_SpinLock> lock(_schedMutex);
  376. for(auto it : _schedCommunicatorEpoll)
  377. {
  378. func(it.second);
  379. }
  380. }
  381. shared_ptr<CommunicatorEpoll> Communicator::createSchedCommunicatorEpoll(size_t netThreadSeq, const shared_ptr<ReqInfoQueue> &reqInfoQueue)
  382. {
  383. shared_ptr<CommunicatorEpoll> communicatorEpoll = std::make_shared<CommunicatorEpoll>(this, netThreadSeq);
  384. communicatorEpoll->initializeEpoller();
  385. communicatorEpoll->initNotify(netThreadSeq, reqInfoQueue);
  386. {
  387. TC_LockT<TC_SpinLock> lock(_schedMutex);
  388. _schedCommunicatorEpoll.insert(std::make_pair(netThreadSeq, communicatorEpoll));
  389. }
  390. return communicatorEpoll;
  391. }
  392. void Communicator::eraseSchedCommunicatorEpoll(size_t netThreadSeq)
  393. {
  394. shared_ptr<CommunicatorEpoll> ce;
  395. {
  396. TC_LockT<TC_SpinLock> lock(_schedMutex);
  397. ce = _schedCommunicatorEpoll[netThreadSeq];
  398. _schedCommunicatorEpoll.erase(netThreadSeq);
  399. }
  400. if(ce)
  401. {
  402. ce->terminate();
  403. }
  404. }
  405. void Communicator::reloadLocator()
  406. {
  407. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  408. {
  409. _communicatorEpoll[i]->_epoller->syncCallback(std::bind(&CommunicatorEpoll::loadObjectLocator, _communicatorEpoll[i].get()));
  410. // _communicatorEpoll[i]->loadObjectLocator();
  411. }
  412. forEachSchedCommunicatorEpoll([](const shared_ptr<CommunicatorEpoll> &c){
  413. c->_epoller->syncCallback(std::bind(&CommunicatorEpoll::loadObjectLocator, c.get()));
  414. // c->loadObjectLocator();
  415. });
  416. }
  417. int Communicator::reloadProperty(string & sResult)
  418. {
  419. // size_t num = getCommunicatorEpollNum();
  420. reloadLocator();
  421. int iReportInterval = TC_Common::strto<int>(getProperty("report-interval", "60000"));
  422. int iReportTimeout = TC_Common::strto<int>(getProperty("report-timeout", "5000"));
  423. int iMaxReportSize = TC_Common::strto<int>(getProperty("max-report-size", "1400"));
  424. string statObj = getProperty("stat", "");
  425. string propertyObj = getProperty("property", "");
  426. StatFPrx statPrx = NULL;
  427. if (!statObj.empty())
  428. {
  429. statPrx = stringToProxy<StatFPrx>(statObj);
  430. }
  431. PropertyFPrx propertyPrx = NULL;
  432. if (!propertyObj.empty())
  433. {
  434. propertyPrx = stringToProxy<PropertyFPrx>(propertyObj);
  435. }
  436. string sSetDivision = ClientConfig::SetOpen ? ClientConfig::SetDivision : "";
  437. _statReport->setReportInfo(statPrx, propertyPrx, ClientConfig::ModuleName, ClientConfig::LocalIp, sSetDivision, iReportInterval, 0, 0, iMaxReportSize, iReportTimeout);
  438. sResult = "locator=" + getProperty("locator", "") + "\r\n" +
  439. "stat=" + statObj + "\r\n" + "property=" + propertyObj + "\r\n" +
  440. "SetDivision=" + sSetDivision + "\r\n" +
  441. "report-interval=" + TC_Common::tostr(iReportInterval) + "\r\n" +
  442. "report-timeout=" + TC_Common::tostr(iReportTimeout) + "\r\n";
  443. return 0;
  444. }
  445. vector<TC_Endpoint> Communicator::getEndpoint(const string& objName)
  446. {
  447. ServantProxy * pServantProxy = getServantProxy(objName, "", true);
  448. return pServantProxy->getEndpoint();
  449. }
  450. vector<TC_Endpoint> Communicator::getEndpoint4All(const string& objName)
  451. {
  452. ServantProxy *pServantProxy = getServantProxy(objName, "", true);
  453. return pServantProxy->getEndpoint4All();
  454. }
  455. string Communicator::getResourcesInfo()
  456. {
  457. ostringstream os;
  458. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  459. {
  460. os << OUT_LINE << endl;
  461. _communicatorEpoll[i]->_epoller->syncCallback(std::bind(&CommunicatorEpoll::getResourcesInfo, _communicatorEpoll[i].get(), std::ref(os)));
  462. }
  463. forEachSchedCommunicatorEpoll([&](const shared_ptr<CommunicatorEpoll> & c){
  464. os << OUT_LINE << endl;
  465. c->_epoller->syncCallback(std::bind(&CommunicatorEpoll::getResourcesInfo, c.get(), std::ref(os)));
  466. });
  467. return os.str();
  468. }
  469. void Communicator::terminate()
  470. {
  471. {
  472. if (_terminating)
  473. return;
  474. TC_LockT<TC_ThreadRecMutex> lock(*this);
  475. _terminating = true;
  476. TC_Port::unregisterCtrlC(_sigId);
  477. if (_initialized)
  478. {
  479. //先要结束stat的, 这样后续结束可以把stat队列清空
  480. if (_statReport)
  481. {
  482. _statReport->terminate();
  483. _statReport->getThreadControl().join();
  484. }
  485. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  486. {
  487. if(_communicatorEpoll[i]) {
  488. _communicatorEpoll[i]->terminate();
  489. }
  490. }
  491. forEachSchedCommunicatorEpoll([](const shared_ptr<CommunicatorEpoll> &c)
  492. {
  493. c->terminate();
  494. });
  495. for(size_t i = 0;i < _asyncThreadNum; ++i)
  496. {
  497. _asyncThread[i]->terminate();
  498. }
  499. }
  500. }
  501. //把锁释放掉, 再来等待线程停止, 避免死锁
  502. //因为通信器线程运行过程中, 有可能加上上面那把锁
  503. if (_initialized)
  504. {
  505. // LOG_CONSOLE_DEBUG << endl;
  506. //停止掉异步线程
  507. for (size_t i = 0; i < _asyncThread.size(); ++i)
  508. {
  509. if (_asyncThread[i])
  510. {
  511. if (_asyncThread[i]->joinable())
  512. {
  513. _asyncThread[i]->join();
  514. }
  515. }
  516. }
  517. //停止掉网络线程
  518. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  519. {
  520. if(_communicatorEpoll[i]->joinable())
  521. {
  522. _communicatorEpoll[i]->join();
  523. }
  524. }
  525. //都停止了, 再把异步线程析构掉(因为异步线程中会调用网络线程的数据)
  526. for (size_t i = 0; i < _asyncThread.size(); ++i)
  527. {
  528. delete _asyncThread[i];
  529. _asyncThread[i] = NULL;
  530. }
  531. _asyncThread.clear();
  532. if(_statReport)
  533. {
  534. delete _statReport;
  535. _statReport = NULL;
  536. }
  537. if(_servantProxyFactory)
  538. {
  539. delete _servantProxyFactory;
  540. _servantProxyFactory = NULL;
  541. }
  542. _communicatorEpoll.clear();
  543. _schedCommunicatorEpoll.clear();
  544. }
  545. }
  546. void Communicator::pushAsyncThreadQueue(ReqMessage * msg)
  547. {
  548. if (msg->pObjectProxy->getRootServantProxy()->_callback)
  549. {
  550. ReqMessagePtr msgPtr = msg;
  551. msg->pObjectProxy->getRootServantProxy()->_callback(msgPtr);
  552. }
  553. else if (msg->pObjectProxy->getRootServantProxy()->_callbackHash && msg->adapter )
  554. {
  555. //先不考虑每个线程队列数目不一致的情况
  556. _asyncThread[((uint32_t) msg->adapter->trans()->fd()) % _asyncThreadNum]->push_back(msg);
  557. }
  558. else
  559. {
  560. //先不考虑每个线程队列数目不一致的情况
  561. _asyncThread[(_asyncSeq++) % _asyncThreadNum]->push_back(msg);
  562. }
  563. }
  564. void Communicator::doStat()
  565. {
  566. //队列长度上报
  567. if (_reportAsyncQueue) {
  568. size_t n = 0;
  569. for (size_t i = 0; i < _asyncThread.size(); ++i)
  570. {
  571. n = n + _asyncThread[i]->getSize();
  572. }
  573. _reportAsyncQueue->report((int) n);
  574. }
  575. }
  576. ServantProxy* Communicator::getServantProxy(const string& objectName, const string& setName, bool rootServant)
  577. {
  578. Communicator::initialize();
  579. return _servantProxyFactory->getServantProxy(objectName, setName, rootServant);
  580. }
  581. StatReport* Communicator::getStatReport()
  582. {
  583. Communicator::initialize();
  584. return _statReport;
  585. }
  586. ServantProxyFactory* Communicator::servantProxyFactory()
  587. {
  588. return _servantProxyFactory;
  589. }
  590. ///////////////////////////////////////////////////////////////
  591. }