Communicator.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "util/tc_file.h"
  17. #include "util/tc_epoller.h"
  18. #include "servant/Communicator.h"
  19. #include "servant/CommunicatorEpoll.h"
  20. #include "servant/Application.h"
  21. #include "servant/StatReport.h"
  22. #include "servant/RemoteLogger.h"
  23. #include "servant/ObjectProxy.h"
  24. namespace tars
  25. {
  26. //////////////////////////////////////////////////////////////////////////////////////////////
  27. //
  28. //string ClientConfig::LocalIp = "127.0.0.1";
  29. //
  30. //string ClientConfig::ModuleName = "unknown";
  31. //
  32. //set<string> ClientConfig::SetLocalIp;
  33. //
  34. //bool ClientConfig::SetOpen = false;
  35. //
  36. //string ClientConfig::SetDivision = "";
  37. //
  38. //string ClientConfig::TarsVersion = string(TARS_VERSION);
  39. //////////////////////////////////////////////////////////////////////////////////////////////
  40. Communicator::Communicator()
  41. : _initialized(false)
  42. , _terminating(false)
  43. , _appCache(this)
  44. , _statReport(NULL)
  45. , _timeoutLogFlag(true)
  46. , _keepAliveInterval(0)
  47. // #ifdef TARS_OPENTRACKING
  48. // , _traceManager(NULL)
  49. // #endif
  50. {
  51. #if TARGET_PLATFORM_WINDOWS
  52. WSADATA wsadata;
  53. WSAStartup(MAKEWORD(2, 2), &wsadata);
  54. #endif
  55. }
  56. Communicator::Communicator(TC_Config& conf, const string& domain/* = CONFIG_ROOT_PATH*/)
  57. : _initialized(false)
  58. , _terminating(false)
  59. , _appCache(this)
  60. , _timeoutLogFlag(true)
  61. , _keepAliveInterval(0)
  62. // #ifdef TARS_OPENTRACKING
  63. // , _traceManager(NULL)
  64. // #endif
  65. {
  66. setProperty(conf, domain);
  67. }
  68. Communicator::~Communicator()
  69. {
  70. // LOG_CONSOLE_DEBUG << endl;
  71. ServantProxyThreadData::deconstructor(this);
  72. terminate();
  73. #if TARGET_PLATFORM_WINDOWS
  74. WSACleanup();
  75. #endif
  76. }
  77. bool Communicator::isTerminating()
  78. {
  79. return _terminating;
  80. }
  81. map<string, string> Communicator::getServantProperty(const string &sObj)
  82. {
  83. TC_LockT<TC_ThreadRecMutex> lock(*this);
  84. auto it = _objInfo.find(sObj);
  85. if(it != _objInfo.end())
  86. {
  87. return it->second;
  88. }
  89. return map<string, string>();
  90. }
  91. void Communicator::setServantProperty(const string &sObj, const string& name, const string& value)
  92. {
  93. TC_LockT<TC_ThreadRecMutex> lock(*this);
  94. _objInfo[sObj][name] = value;
  95. }
  96. string Communicator::getServantProperty(const string &sObj, const string& name)
  97. {
  98. TC_LockT<TC_ThreadRecMutex> lock(*this);
  99. auto it = _objInfo.find(sObj);
  100. if(it != _objInfo.end())
  101. {
  102. auto vit = it->second.find(name);
  103. if(vit != it->second.end())
  104. {
  105. return vit->second;
  106. }
  107. }
  108. return "";
  109. }
  110. void Communicator::setTraceParam(const string& name)
  111. {
  112. if (!name.empty() && name != "trace_param_max_len")
  113. {
  114. return;
  115. }
  116. string defaultValue = "";
  117. defaultValue = getProperty("trace_param_max_len", defaultValue);
  118. if (!defaultValue.empty() && TC_Common::isdigit(defaultValue))
  119. {
  120. ServantProxyThreadData::setTraceParamMaxLen(TC_Common::strto<unsigned int>(defaultValue));
  121. }
  122. }
  123. shared_ptr<TC_OpenSSL> Communicator::newClientSSL(const string & objName)
  124. {
  125. #if TARS_SSL
  126. TC_LockT<TC_ThreadRecMutex> lock(*this);
  127. auto it = _objCtx.find(objName);
  128. if(it != _objCtx.end())
  129. {
  130. return TC_OpenSSL::newSSL(it->second);
  131. }
  132. if(!_ctx) {
  133. _ctx = TC_OpenSSL::newCtx("", "", "", false, "");
  134. }
  135. return TC_OpenSSL::newSSL(_ctx);
  136. #else
  137. return NULL;
  138. #endif
  139. }
  140. void Communicator::setProperty(TC_Config& conf, const string& domain/* = CONFIG_ROOT_PATH*/)
  141. {
  142. TC_LockT<TC_ThreadRecMutex> lock(*this);
  143. conf.getDomainMap(domain, _properties);
  144. setTraceParam();
  145. string defaultValue = "dft";
  146. if ((defaultValue == getProperty("enableset", defaultValue)) || (defaultValue == getProperty("setdivision", defaultValue)))
  147. {
  148. _properties["enableset"] = conf.get("/tars/application<enableset>", "n");
  149. _properties["setdivision"] = conf.get("/tars/application<setdivision>", "NULL");
  150. }
  151. vector<string> auths;
  152. if (conf.getDomainVector(CONFIG_ROOT_PATH, auths))
  153. {
  154. for(size_t i = 0; i < auths.size(); i++)
  155. {
  156. map<string, string> &data = _objInfo[auths[i]];
  157. data["accesskey"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<accesskey>");
  158. data["secretkey"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<secretkey>");
  159. data["ca"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<ca>");
  160. data["cert"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<cert>");
  161. data["key"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<key>");
  162. data["ciphers"] = conf.get(CONFIG_ROOT_PATH + "/" + auths[i] + "<ciphers>");
  163. #if TARS_SSL
  164. if(!data["ca"].empty())
  165. {
  166. shared_ptr<TC_OpenSSL::CTX> ctx = TC_OpenSSL::newCtx( data["ca"], data["cert"], data["key"], false, data["ciphers"]);
  167. if(!ctx)
  168. {
  169. TLOGERROR("[load obj:" << auths[i] << ", ssl error, ca:" << data["ca"] << endl);
  170. exit(-1);
  171. }
  172. _objCtx[auths[i]] = ctx;
  173. }
  174. #endif
  175. }
  176. }
  177. // initClientConfig();
  178. }
  179. void Communicator::notifyCommunicatorEpollStart()
  180. {
  181. ++_communicatorEpollStartNum;
  182. std::unique_lock<std::mutex> lock(_mutex);
  183. _cond.notify_one();
  184. }
  185. void Communicator::initialize()
  186. {
  187. TC_LockT<TC_ThreadRecMutex> lock(*this);
  188. //两次保护
  189. if (_initialized)
  190. return;
  191. _initialized = true;
  192. _sigId = TC_Port::registerCtrlC([&]{
  193. TC_Common::msleep(50);
  194. this->terminate();
  195. #if TARGET_PLATFORM_WINDOWS
  196. ExitProcess(0);
  197. #else
  198. exit(0);
  199. #endif
  200. });
  201. _clientConfig.TarsVersion = TARS_VERSION;
  202. _clientConfig.SetOpen = TC_Common::lower(getProperty("enableset", "n")) == "y" ? true : false;
  203. if (_clientConfig.SetOpen)
  204. {
  205. _clientConfig.SetDivision = getProperty("setdivision");
  206. vector<string> vtSetDivisions = TC_Common::sepstr<string>(_clientConfig.SetDivision, ".");
  207. string sWildCard = "*";
  208. if (vtSetDivisions.size() != 3
  209. || vtSetDivisions[0] == sWildCard
  210. || vtSetDivisions[1] == sWildCard)
  211. {
  212. //set分组名不对时默认没有打开set分组
  213. _clientConfig.SetOpen = false;
  214. setProperty("enableset", "n");
  215. TLOGERROR("[set division name error:" << _clientConfig.SetDivision << ", client failed to open set]" << endl);
  216. }
  217. }
  218. _clientConfig.LocalIp = getProperty("localip", "");
  219. if (_clientConfig.SetLocalIp.empty())
  220. {
  221. vector<string> v = TC_Socket::getLocalHosts();
  222. for (size_t i = 0; i < v.size(); i++)
  223. {
  224. if (v[i] != "127.0.0.1" && _clientConfig.LocalIp.empty())
  225. {
  226. _clientConfig.LocalIp = v[i];
  227. }
  228. _clientConfig.SetLocalIp.insert(v[i]);
  229. }
  230. }
  231. //缺省采用进程名称
  232. string exe = "";
  233. try
  234. {
  235. exe = TC_File::extractFileName(TC_File::getExePath());
  236. }
  237. catch (TC_File_Exception& ex)
  238. {
  239. //取失败则使用ip代替进程名
  240. exe = _clientConfig.LocalIp;
  241. }
  242. _clientConfig.ModuleName = getProperty("modulename", exe);
  243. #if TARS_SSL
  244. string ca = getProperty("ca");
  245. string cert = getProperty("cert");
  246. string key = getProperty("key");
  247. string ciphers = getProperty("ciphers");
  248. if(!ca.empty()) {
  249. _ctx = TC_OpenSSL::newCtx(ca, cert, key, false, ciphers);
  250. if (!_ctx)
  251. {
  252. TLOGERROR("load client ssl error, ca:" << ca << endl);
  253. exit(-1);
  254. }
  255. }
  256. #endif
  257. _servantProxyFactory = new ServantProxyFactory(this);
  258. //网络线程
  259. size_t clientThreadNum = TC_Common::strto<size_t>(getProperty("netthread", "1"));
  260. if (0 == clientThreadNum)
  261. {
  262. clientThreadNum = 1;
  263. }
  264. else if(MAX_CLIENT_THREAD_NUM < clientThreadNum)
  265. {
  266. clientThreadNum = MAX_CLIENT_THREAD_NUM;
  267. }
  268. //异步线程数
  269. _asyncThreadNum = TC_Common::strto<size_t>(getProperty("asyncthread", "3"));
  270. if(_asyncThreadNum == 0)
  271. {
  272. _asyncThreadNum = 3;
  273. }
  274. if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
  275. {
  276. _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
  277. }
  278. bool merge = TC_Common::strto<bool>(getProperty("mergenetasync", "0"));
  279. //异步队列的大小
  280. size_t iAsyncQueueCap = TC_Common::strto<size_t>(getProperty("asyncqueuecap", "100000"));
  281. if(iAsyncQueueCap < 10000)
  282. {
  283. iAsyncQueueCap = 10000;
  284. }
  285. //第一个通信器才去启动回调线程
  286. for (size_t i = 0; i < _asyncThreadNum; ++i) {
  287. _asyncThread.push_back(new AsyncProcThread(iAsyncQueueCap, merge));
  288. }
  289. //stat总是有对象, 保证getStat返回的对象总是有效
  290. _statReport = new StatReport(this);
  291. _keepAliveInterval = TC_Common::strto<int64_t>(getProperty("keep-alive-interval", "0"))/1000;
  292. if (_keepAliveInterval<5 && _keepAliveInterval!=0)
  293. {
  294. _keepAliveInterval = 5;
  295. }
  296. for (size_t i = 0; i < clientThreadNum; ++i)
  297. {
  298. _communicatorEpoll.push_back(std::make_shared<CommunicatorEpoll>(this, -1, i == 0));
  299. //以协程模式启动
  300. _communicatorEpoll.back()->setThreadName("communicator-epoll-" + TC_Common::tostr(i));
  301. _communicatorEpoll.back()->startCoroutine(3, 128*1024, false);
  302. }
  303. {
  304. std::unique_lock<std::mutex> lock(_mutex);
  305. _cond.wait(lock, [&]{ return _communicatorEpollStartNum == clientThreadNum; });
  306. }
  307. //异步队列数目上报
  308. _reportAsyncQueue= getStatReport()->createPropertyReport(_clientConfig.ModuleName + ".asyncqueue", PropertyReport::avg());
  309. //初始化统计上报接口
  310. string statObj = getProperty("stat", "");
  311. string propertyObj = getProperty("property", "");
  312. int iReportInterval = TC_Common::strto<int>(getProperty("report-interval", "60000"));
  313. int iReportTimeout = TC_Common::strto<int>(getProperty("report-timeout", "5000"));
  314. // int iSampleRate = TC_Common::strto<int>(getProperty("sample-rate", "1000"));
  315. // int iMaxSampleCount = TC_Common::strto<int>(getProperty("max-sample-count", "100"));
  316. int iMaxReportSize = TC_Common::strto<int>(getProperty("max-report-size", "1400"));
  317. _timeoutLogFlag = TC_Common::strto<bool>(getProperty("timeout-log-flag", "1"));
  318. _minTimeout = TC_Common::strto<int64_t>(getProperty("min-timeout", "100"));
  319. if(_minTimeout < 1)
  320. _minTimeout = 1;
  321. StatFPrx statPrx = NULL;
  322. if (!statObj.empty())
  323. {
  324. statPrx = stringToProxy<StatFPrx>(statObj);
  325. }
  326. //上报Property信息的代理
  327. PropertyFPrx propertyPrx = NULL;
  328. if (!propertyObj.empty())
  329. {
  330. propertyPrx = stringToProxy<PropertyFPrx>(propertyObj);
  331. }
  332. string sSetDivision = _clientConfig.SetOpen ? _clientConfig.SetDivision : "";
  333. _statReport->setReportInfo(statPrx, propertyPrx, _clientConfig.ModuleName, _clientConfig.LocalIp, sSetDivision, iReportInterval, 0, 0, iMaxReportSize, iReportTimeout);
  334. #if TARS_OPENTRACKING
  335. string collector_host = getProperty("collector_host", "");
  336. string collector_port = getProperty("collector_port", "");
  337. if(!collector_host.empty() && !collector_port.empty())
  338. {
  339. //init zipkin config
  340. zipkin::ZipkinOtTracerOptions options;
  341. options.service_name = _clientConfig.ModuleName;
  342. options.service_address = {zipkin::IpVersion::v4, _clientConfig.LocalIp};
  343. options.sample_rate = strtod(getProperty("sample_rate", "1.0").c_str(), NULL);
  344. options.collector_host = collector_host;
  345. options.collector_port = atoi(collector_port.c_str());
  346. _traceManager = new TraceManager(options);
  347. assert(_traceManager != NULL);
  348. }
  349. #endif
  350. }
  351. void Communicator::setProperty(const map<string, string>& properties)
  352. {
  353. TC_LockT<TC_ThreadRecMutex> lock(*this);
  354. _properties = properties;
  355. setTraceParam();
  356. }
  357. void Communicator::setProperty(const string& name, const string& value)
  358. {
  359. TC_LockT<TC_ThreadRecMutex> lock(*this);
  360. _properties[name] = value;
  361. setTraceParam(name);
  362. }
  363. string Communicator::getProperty(const string& name, const string& dft/* = ""*/)
  364. {
  365. TC_LockT<TC_ThreadRecMutex> lock(*this);
  366. map<string, string>::iterator it = _properties.find(name);
  367. if (it != _properties.end())
  368. {
  369. return it->second;
  370. }
  371. return dft;
  372. }
  373. vector<shared_ptr<CommunicatorEpoll>> Communicator::getAllCommunicatorEpoll()
  374. {
  375. vector<shared_ptr<CommunicatorEpoll>> communicatorEpolls = _communicatorEpoll;
  376. forEachSchedCommunicatorEpoll([&](const shared_ptr<CommunicatorEpoll> &c){
  377. communicatorEpolls.push_back(c);
  378. });
  379. return communicatorEpolls;
  380. }
  381. void Communicator::forEachSchedCommunicatorEpoll(std::function<void(const shared_ptr<CommunicatorEpoll> &)> func)
  382. {
  383. TC_LockT<TC_ThreadMutex> lock(_schedMutex);
  384. for(auto it : _schedCommunicatorEpoll)
  385. {
  386. func(it.second);
  387. }
  388. }
  389. shared_ptr<CommunicatorEpoll> Communicator::createSchedCommunicatorEpoll(size_t netThreadSeq, const shared_ptr<ReqInfoQueue> &reqInfoQueue)
  390. {
  391. shared_ptr<CommunicatorEpoll> communicatorEpoll = std::make_shared<CommunicatorEpoll>(this, netThreadSeq);
  392. communicatorEpoll->initializeEpoller();
  393. communicatorEpoll->initNotify(netThreadSeq, reqInfoQueue);
  394. {
  395. TC_LockT<TC_ThreadMutex> lock(_schedMutex);
  396. _schedCommunicatorEpoll.insert(std::make_pair(netThreadSeq, communicatorEpoll));
  397. }
  398. return communicatorEpoll;
  399. }
  400. void Communicator::eraseSchedCommunicatorEpoll(size_t netThreadSeq)
  401. {
  402. shared_ptr<CommunicatorEpoll> ce;
  403. {
  404. TC_LockT<TC_ThreadMutex> lock(_schedMutex);
  405. ce = _schedCommunicatorEpoll[netThreadSeq];
  406. _schedCommunicatorEpoll.erase(netThreadSeq);
  407. }
  408. if(ce)
  409. {
  410. ce->terminate();
  411. }
  412. }
  413. void Communicator::reloadLocator()
  414. {
  415. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  416. {
  417. _communicatorEpoll[i]->_epoller->asyncCallback(std::bind(&CommunicatorEpoll::loadObjectLocator, _communicatorEpoll[i].get()));
  418. }
  419. forEachSchedCommunicatorEpoll([](const shared_ptr<CommunicatorEpoll> &c){
  420. c->_epoller->asyncCallback(std::bind(&CommunicatorEpoll::loadObjectLocator, c.get()));
  421. });
  422. }
  423. int Communicator::reloadProperty(string & sResult)
  424. {
  425. // size_t num = getCommunicatorEpollNum();
  426. reloadLocator();
  427. int iReportInterval = TC_Common::strto<int>(getProperty("report-interval", "60000"));
  428. int iReportTimeout = TC_Common::strto<int>(getProperty("report-timeout", "5000"));
  429. int iMaxReportSize = TC_Common::strto<int>(getProperty("max-report-size", "1400"));
  430. string statObj = getProperty("stat", "");
  431. string propertyObj = getProperty("property", "");
  432. StatFPrx statPrx = NULL;
  433. if (!statObj.empty())
  434. {
  435. statPrx = stringToProxy<StatFPrx>(statObj);
  436. }
  437. PropertyFPrx propertyPrx = NULL;
  438. if (!propertyObj.empty())
  439. {
  440. propertyPrx = stringToProxy<PropertyFPrx>(propertyObj);
  441. }
  442. string sSetDivision = _clientConfig.SetOpen ? _clientConfig.SetDivision : "";
  443. _statReport->setReportInfo(statPrx, propertyPrx, _clientConfig.ModuleName, _clientConfig.LocalIp, sSetDivision, iReportInterval, 0, 0, iMaxReportSize, iReportTimeout);
  444. sResult = "locator=" + getProperty("locator", "") + "\r\n" +
  445. "stat=" + statObj + "\r\n" + "property=" + propertyObj + "\r\n" +
  446. "SetDivision=" + sSetDivision + "\r\n" +
  447. "report-interval=" + TC_Common::tostr(iReportInterval) + "\r\n" +
  448. "report-timeout=" + TC_Common::tostr(iReportTimeout) + "\r\n";
  449. return 0;
  450. }
  451. vector<TC_Endpoint> Communicator::getEndpoint(const string& objName)
  452. {
  453. ServantProxy * pServantProxy = getServantProxy(objName, "", true);
  454. return pServantProxy->getEndpoint();
  455. }
  456. vector<TC_Endpoint> Communicator::getEndpoint4All(const string& objName)
  457. {
  458. ServantProxy *pServantProxy = getServantProxy(objName, "", true);
  459. return pServantProxy->getEndpoint4All();
  460. }
  461. string Communicator::getResourcesInfo()
  462. {
  463. ostringstream os;
  464. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  465. {
  466. os << OUT_LINE << endl;
  467. _communicatorEpoll[i]->_epoller->syncCallback(std::bind(&CommunicatorEpoll::getResourcesInfo, _communicatorEpoll[i].get(), std::ref(os)));
  468. }
  469. forEachSchedCommunicatorEpoll([&](const shared_ptr<CommunicatorEpoll> & c){
  470. os << OUT_LINE << endl;
  471. c->_epoller->syncCallback(std::bind(&CommunicatorEpoll::getResourcesInfo, c.get(), std::ref(os)));
  472. });
  473. return os.str();
  474. }
  475. void Communicator::terminate()
  476. {
  477. {
  478. if (_terminating)
  479. return;
  480. TC_LockT<TC_ThreadRecMutex> lock(*this);
  481. _terminating = true;
  482. TC_Port::unregisterCtrlC(_sigId);
  483. if (_initialized)
  484. {
  485. //先要结束stat的, 这样后续结束可以把stat队列清空
  486. if (_statReport)
  487. {
  488. _statReport->terminate();
  489. _statReport->getThreadControl().join();
  490. }
  491. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  492. {
  493. if(_communicatorEpoll[i]) {
  494. _communicatorEpoll[i]->terminate();
  495. }
  496. }
  497. forEachSchedCommunicatorEpoll([](const shared_ptr<CommunicatorEpoll> &c)
  498. {
  499. c->terminate();
  500. });
  501. for(size_t i = 0;i < _asyncThreadNum; ++i)
  502. {
  503. _asyncThread[i]->terminate();
  504. }
  505. }
  506. }
  507. //把锁释放掉, 再来等待线程停止, 避免死锁
  508. //因为通信器线程运行过程中, 有可能加上上面那把锁
  509. if (_initialized)
  510. {
  511. // LOG_CONSOLE_DEBUG << endl;
  512. //停止掉异步线程
  513. for (size_t i = 0; i < _asyncThread.size(); ++i)
  514. {
  515. if (_asyncThread[i])
  516. {
  517. if (_asyncThread[i]->joinable())
  518. {
  519. _asyncThread[i]->join();
  520. }
  521. }
  522. }
  523. //停止掉网络线程
  524. for (size_t i = 0; i < _communicatorEpoll.size(); ++i)
  525. {
  526. if(_communicatorEpoll[i]->joinable())
  527. {
  528. _communicatorEpoll[i]->join();
  529. }
  530. }
  531. //都停止了, 再把异步线程析构掉(因为异步线程中会调用网络线程的数据)
  532. for (size_t i = 0; i < _asyncThread.size(); ++i)
  533. {
  534. delete _asyncThread[i];
  535. _asyncThread[i] = NULL;
  536. }
  537. _asyncThread.clear();
  538. if(_statReport)
  539. {
  540. delete _statReport;
  541. _statReport = NULL;
  542. }
  543. if(_servantProxyFactory)
  544. {
  545. delete _servantProxyFactory;
  546. _servantProxyFactory = NULL;
  547. }
  548. _communicatorEpoll.clear();
  549. _schedCommunicatorEpoll.clear();
  550. }
  551. }
  552. void Communicator::pushAsyncThreadQueue(ReqMessage * msg)
  553. {
  554. if (msg->pObjectProxy->getRootServantProxy()->_callback)
  555. {
  556. ReqMessagePtr msgPtr = msg;
  557. msg->pObjectProxy->getRootServantProxy()->_callback(msgPtr);
  558. }
  559. else if (msg->pObjectProxy->getRootServantProxy()->_callbackHash && msg->adapter )
  560. {
  561. //先不考虑每个线程队列数目不一致的情况
  562. _asyncThread[((uint32_t) msg->adapter->trans()->fd()) % _asyncThreadNum]->push_back(msg);
  563. }
  564. else
  565. {
  566. //先不考虑每个线程队列数目不一致的情况
  567. _asyncThread[(_asyncSeq++) % _asyncThreadNum]->push_back(msg);
  568. }
  569. }
  570. void Communicator::doStat()
  571. {
  572. //队列长度上报
  573. if (_reportAsyncQueue) {
  574. size_t n = 0;
  575. for (size_t i = 0; i < _asyncThread.size(); ++i)
  576. {
  577. n = n + _asyncThread[i]->getSize();
  578. }
  579. _reportAsyncQueue->report((int) n);
  580. }
  581. }
  582. ServantProxy* Communicator::getServantProxy(const string& objectName, const string& setName, bool rootServant)
  583. {
  584. Communicator::initialize();
  585. return _servantProxyFactory->getServantProxy(objectName, setName, rootServant);
  586. }
  587. StatReport* Communicator::getStatReport()
  588. {
  589. Communicator::initialize();
  590. return _statReport;
  591. }
  592. ServantProxyFactory* Communicator::servantProxyFactory()
  593. {
  594. return _servantProxyFactory;
  595. }
  596. ///////////////////////////////////////////////////////////////
  597. }