StatReport.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "servant/StatReport.h"
  17. #include "util/tc_common.h"
  18. #include "util/tc_timeprovider.h"
  19. #include "servant/RemoteLogger.h"
  20. #include "servant/Communicator.h"
  21. #include "servant/Application.h"
  22. #include <iostream>
  23. namespace tars
  24. {
  25. //////////////////////////////////////////////////////////////////
  26. //
  27. StatReport::StatReport(size_t iEpollNum)
  28. : _time(0)
  29. , _reportInterval(60000)
  30. , _reportTimeout(5000)
  31. , _maxReportSize(MAX_REPORT_SIZE)
  32. , _terminate(false)
  33. , _sampleRate(1)
  34. , _maxSampleCount(500)
  35. , _epollNum(iEpollNum)
  36. , _retValueNumLimit(10)
  37. {
  38. srand(time(NULL));
  39. for(size_t i = 0 ; i < _epollNum; i++)
  40. {
  41. _statMsg.push_back(new stat_queue(MAX_STAT_QUEUE_SIZE));
  42. }
  43. }
  44. StatReport::~StatReport()
  45. {
  46. if (isAlive())
  47. {
  48. terminate();
  49. getThreadControl().join();
  50. }
  51. for(size_t i = 0; i < _statMsg.size(); ++i)
  52. {
  53. if(_statMsg[i])
  54. {
  55. delete _statMsg[i];
  56. _statMsg[i] = NULL;
  57. }
  58. }
  59. }
  60. void StatReport::terminate()
  61. {
  62. Lock lock(*this);
  63. _terminate = true;
  64. notifyAll();
  65. }
  66. void StatReport::report(size_t iSeq,MapStatMicMsg * pmStatMicMsg)
  67. {
  68. assert(iSeq < _epollNum);
  69. bool bFlag = _statMsg[iSeq]->push_back(pmStatMicMsg);
  70. if(!bFlag)
  71. {
  72. delete pmStatMicMsg;
  73. pmStatMicMsg = NULL;
  74. TLOGERROR("[TARS][StatReport::report] queue full." << endl);
  75. }
  76. }
  77. void StatReport::setReportInfo(const StatFPrx& statPrx,
  78. const PropertyFPrx& propertyPrx,
  79. const string& strModuleName,
  80. const string& strModuleIp,
  81. const string& strSetDivision,
  82. int iReportInterval,
  83. int iSampleRate,
  84. unsigned int iMaxSampleCount,
  85. int iMaxReportSize,
  86. int iReportTimeout)
  87. {
  88. Lock lock(*this);
  89. _statPrx = statPrx;
  90. _propertyPrx = propertyPrx;
  91. //包头信息,trim&substr 防止超长导致udp包发送失败
  92. _moduleName = trimAndLimitStr(strModuleName, MAX_MASTER_NAME_LEN);
  93. _ip = trimAndLimitStr(strModuleIp, MAX_MASTER_IP_LEN);
  94. _time = TNOW;
  95. _reportInterval = iReportInterval < 10000 ? 10000 : iReportInterval;
  96. _reportTimeout = iReportTimeout < 5000 ? 5000 : iReportTimeout;
  97. _sampleRate = (iSampleRate < 1)?1: iSampleRate;
  98. _maxSampleCount = iMaxSampleCount>500?500:iMaxSampleCount;
  99. if ( iMaxReportSize < MIN_REPORT_SIZE || iMaxReportSize > MAX_REPORT_SIZE )
  100. {
  101. _maxReportSize = MAX_REPORT_SIZE;
  102. }
  103. else
  104. {
  105. _maxReportSize = iMaxReportSize;
  106. }
  107. vector<string> vtSetInfo = TC_Common::sepstr<string>(strSetDivision,".");
  108. if (vtSetInfo.size()!=3 ||(vtSetInfo[0]=="*"||vtSetInfo[1]=="*"))
  109. {
  110. _setArea= "";
  111. _setID = "";
  112. }
  113. else
  114. {
  115. _setName = vtSetInfo[0];
  116. _setArea = vtSetInfo[1];
  117. _setID = vtSetInfo[2];
  118. }
  119. //TLOGDEBUG("setReportInfo Division:" << strSetDivision << " " << _setName << " " << _setArea << " " << _setID << endl);
  120. resetStatInterv();
  121. if (!isAlive())
  122. {
  123. start();
  124. }
  125. }
  126. void StatReport::addStatInterv(int iInterv)
  127. {
  128. Lock lock(*this);
  129. _timePoint.push_back(iInterv);
  130. sort(_timePoint.begin(),_timePoint.end());
  131. unique(_timePoint.begin(),_timePoint.end());
  132. }
  133. void StatReport::getIntervCount(int time,StatMicMsgBody& body)
  134. {
  135. int iTimePoint = 0;
  136. bool bNeedInit = false;
  137. bool bGetIntev = false;
  138. if(body.intervalCount.size() == 0) //第一次需要将所有描点值初始化为0
  139. {
  140. bNeedInit = true;
  141. }
  142. for(int i =0;i<(int)_timePoint.size();i++)
  143. {
  144. iTimePoint = _timePoint[i];
  145. if(bGetIntev == false && time < iTimePoint)
  146. {
  147. bGetIntev = true;
  148. body.intervalCount[iTimePoint]++;
  149. if(bNeedInit == false)
  150. break;
  151. else
  152. continue;
  153. }
  154. if(bNeedInit == true)
  155. {
  156. body.intervalCount[iTimePoint] = 0;
  157. }
  158. }
  159. return;
  160. }
  161. void StatReport::resetStatInterv()
  162. {
  163. _timePoint.clear();
  164. _timePoint.push_back(5);
  165. _timePoint.push_back(10);
  166. _timePoint.push_back(50);
  167. _timePoint.push_back(100);
  168. _timePoint.push_back(200);
  169. _timePoint.push_back(500);
  170. _timePoint.push_back(1000);
  171. _timePoint.push_back(2000);
  172. _timePoint.push_back(3000);
  173. sort(_timePoint.begin(),_timePoint.end());
  174. unique(_timePoint.begin(),_timePoint.end());
  175. }
  176. string StatReport::trimAndLimitStr(const string& str, uint32_t limitlen)
  177. {
  178. static const string strTime = "\r\t";
  179. string ret = TC_Common::trim(str, strTime);
  180. if (ret.length() > limitlen)
  181. {
  182. ret.resize(limitlen);
  183. }
  184. return ret;
  185. }
  186. bool StatReport::divison2SetInfo(const string& str, vector<string>& vtSetInfo)
  187. {
  188. vtSetInfo = TC_Common::sepstr<string>(str,".");
  189. if (vtSetInfo.size() != 3 ||(vtSetInfo[0]=="*"||vtSetInfo[1]=="*"))
  190. {
  191. TLOGERROR(__FUNCTION__ << ":" << __LINE__ << "|bad set name [" << str << endl);
  192. return false;
  193. }
  194. return true;
  195. }
  196. /*
  197. tars.tarsstat to tarsstat
  198. */
  199. string StatReport::getServerName(string sModuleName)
  200. {
  201. string::size_type pos = sModuleName.find(".");
  202. if(pos != string::npos)
  203. {
  204. return sModuleName.substr(pos + 1); //+1:过滤.
  205. }
  206. return sModuleName;
  207. }
  208. void StatReport::report(const string& strModuleName,
  209. const string& setdivision,
  210. const string& strInterfaceName,
  211. const string& strModuleIp,
  212. uint16_t iPort,
  213. StatResult eResult,
  214. int iSptime,
  215. int iReturnValue,
  216. bool bFromClient)
  217. {
  218. //包头信息,trim&substr 防止超长导致udp包发送失败
  219. //masterIp为空服务端自己获取。
  220. StatMicMsgHead head;
  221. StatMicMsgBody body;
  222. string sMaterServerName = "";
  223. string sSlaveServerName = "";
  224. string appName = "";// 由setdivision生成
  225. if(bFromClient)
  226. {
  227. if (!_setName.empty())
  228. {
  229. head.masterName = _moduleName + "." + _setName + _setArea + _setID + "@" + ClientConfig::TarsVersion;
  230. }
  231. else
  232. {
  233. head.masterName = _moduleName + "@" + ClientConfig::TarsVersion;
  234. }
  235. if (!setdivision.empty()) //被调没有启用set分组,slavename保持原样
  236. {
  237. vector <string> vtSetInfo;
  238. if(divison2SetInfo(setdivision, vtSetInfo))
  239. {
  240. head.slaveSetName = vtSetInfo[0];
  241. head.slaveSetArea = vtSetInfo[1];
  242. head.slaveSetID = vtSetInfo[2];
  243. }
  244. head.slaveName = trimAndLimitStr(strModuleName, MAX_MASTER_NAME_LEN) + "." + head.slaveSetName + head.slaveSetArea + head.slaveSetID;
  245. }
  246. else
  247. {
  248. head.slaveName = trimAndLimitStr(strModuleName, MAX_MASTER_NAME_LEN);
  249. }
  250. head.masterIp = "";
  251. head.slaveIp = trimAndLimitStr(strModuleIp, MAX_MASTER_IP_LEN);
  252. }
  253. else
  254. {
  255. //被调上报,masterName没有set信息
  256. head.masterName = trimAndLimitStr(strModuleName, MAX_MASTER_NAME_LEN);
  257. head.masterIp = trimAndLimitStr(strModuleIp, MAX_MASTER_IP_LEN);
  258. if(_setName.empty()) //被调上报,slave的set信息为空
  259. {
  260. head.slaveName = _moduleName;//服务端version不需要上报
  261. }
  262. else
  263. {
  264. head.slaveName = _moduleName + "." + _setName + _setArea + _setID;
  265. }
  266. head.slaveIp = "";
  267. head.slaveSetName = _setName;
  268. head.slaveSetArea = _setArea;
  269. head.slaveSetID = _setID;
  270. }
  271. head.interfaceName = trimAndLimitStr(strInterfaceName, MAX_MASTER_NAME_LEN);
  272. head.slavePort = iPort;
  273. head.returnValue = iReturnValue;
  274. //包体信息.
  275. if (eResult == STAT_SUCC)
  276. {
  277. body.count = 1;
  278. body.totalRspTime = body.minRspTime = body.maxRspTime = iSptime;
  279. }
  280. else if (eResult == STAT_TIMEOUT)
  281. {
  282. body.timeoutCount = 1;
  283. }
  284. else
  285. {
  286. body.execCount = 1;
  287. }
  288. submit(head, body, bFromClient);
  289. }
  290. void StatReport::report(const string& strMasterName,
  291. const string& strMasterIp,
  292. const string& strSlaveName,
  293. const string& strSlaveIp,
  294. uint16_t iSlavePort,
  295. const string& strInterfaceName,
  296. StatResult eResult,
  297. int iSptime,
  298. int iReturnValue)
  299. {
  300. //包头信息,trim&substr 防止超长导致udp包发送失败
  301. //masterIp为空服务端自己获取。
  302. StatMicMsgHead head;
  303. StatMicMsgBody body;
  304. head.masterName = trimAndLimitStr(strMasterName + "@" + ClientConfig::TarsVersion, MAX_MASTER_NAME_LEN);
  305. head.masterIp = trimAndLimitStr(strMasterIp, MAX_MASTER_IP_LEN);
  306. head.slaveName = trimAndLimitStr(strSlaveName, MAX_MASTER_NAME_LEN);
  307. head.slaveIp = trimAndLimitStr(strSlaveIp, MAX_MASTER_IP_LEN);
  308. head.interfaceName = trimAndLimitStr(strInterfaceName, MAX_MASTER_NAME_LEN);
  309. head.slavePort = iSlavePort;
  310. head.returnValue = iReturnValue;
  311. //包体信息.
  312. if(eResult == STAT_SUCC)
  313. {
  314. body.count = 1;
  315. body.totalRspTime = body.minRspTime = body.maxRspTime = iSptime;
  316. }
  317. else if(eResult == STAT_TIMEOUT)
  318. {
  319. body.timeoutCount = 1;
  320. }
  321. else
  322. {
  323. body.execCount = 1;
  324. }
  325. submit(head, body, true);
  326. }
  327. //
  328. //string StatReport::sampleUnid()
  329. //{
  330. //
  331. // static atomic<int> g_id(rand());
  332. //
  333. // char s[14] = { 0 };
  334. // time_t t = TNOW;
  335. // int ip = inet_addr(_ip.c_str());
  336. // int thread = ++g_id;
  337. // static unsigned short n = 0;
  338. // ++n;
  339. // memcpy(s, &ip, 4);
  340. // memcpy(s + 4, &t, 4);
  341. // memcpy(s + 8, &thread, 4);
  342. // memcpy(s + 12, &n, 2);
  343. // return TC_Common::bin2str(string(s, 14));
  344. //}
  345. void StatReport::submit( StatMicMsgHead& head, StatMicMsgBody& body,bool bFromClient )
  346. {
  347. Lock lock(*this);
  348. MapStatMicMsg& msg = (bFromClient == true)?_statMicMsgClient:_statMicMsgServer;
  349. MapStatMicMsg::iterator it = msg.find( head );
  350. if ( it != msg.end() )
  351. {
  352. StatMicMsgBody& stBody = it->second;
  353. stBody.count += body.count;
  354. stBody.timeoutCount += body.timeoutCount;
  355. stBody.execCount += body.execCount;
  356. stBody.totalRspTime += body.totalRspTime;
  357. if ( stBody.maxRspTime < body.maxRspTime )
  358. {
  359. stBody.maxRspTime = body.maxRspTime;
  360. }
  361. //非0最小值
  362. if ( stBody.minRspTime == 0 ||(stBody.minRspTime > body.minRspTime && body.minRspTime != 0))
  363. {
  364. stBody.minRspTime = body.minRspTime;
  365. }
  366. getIntervCount(body.maxRspTime, stBody);
  367. }
  368. else
  369. {
  370. getIntervCount(body.maxRspTime, body);
  371. msg[head] = body;
  372. }
  373. }
  374. size_t StatReport::getQueueSize(size_t epollIndex)
  375. {
  376. if(epollIndex >= _statMsg.size())
  377. {
  378. return 0;
  379. }
  380. return _statMsg[epollIndex]->size();
  381. }
  382. //void StatReport::doSample(const string& strSlaveName,
  383. // const string& strInterfaceName,
  384. // const string& strSlaveIp,
  385. // map<string, string>& status)
  386. //{
  387. //}
  388. int StatReport::reportMicMsg(MapStatMicMsg& msg,bool bFromClient)
  389. {
  390. if (msg.empty()) return 0;
  391. try
  392. {
  393. int iLen = 0;
  394. MapStatMicMsg mTemp;
  395. MapStatMicMsg mStatMsg;
  396. mStatMsg.clear();
  397. mTemp.clear();
  398. {
  399. Lock lock(*this);
  400. msg.swap(mStatMsg);
  401. }
  402. TLOGTARS("[TARS][StatReport::reportMicMsg get size:" << mStatMsg.size()<<"]"<< endl);
  403. for(MapStatMicMsg::iterator it = mStatMsg.begin(); it != mStatMsg.end(); it++)
  404. {
  405. const StatMicMsgHead &head = it->first;
  406. int iTemLen = STAT_PROTOCOL_LEN +head.masterName.length() + head.slaveName.length() + head.interfaceName.length()
  407. + head.slaveSetName.length() + head.slaveSetArea.length() + head.slaveSetID.length();
  408. iLen = iLen + iTemLen;
  409. if(iLen > _maxReportSize) //不能超过udp 1472
  410. {
  411. if(_statPrx)
  412. {
  413. TLOGTARS("[TARS][StatReport::reportMicMsg send size:" << mTemp.size()<<"]"<< endl);
  414. _statPrx->tars_set_timeout(_reportTimeout)->async_reportMicMsg(NULL,mTemp,bFromClient, ServerConfig::Context);
  415. }
  416. iLen = iTemLen;
  417. mTemp.clear();
  418. }
  419. mTemp[head] = it->second;
  420. if(LOG->isNeedLog(LocalRollLogger::INFO_LOG))
  421. {
  422. ostringstream os;
  423. os.str("");
  424. head.displaySimple(os);
  425. os << " ";
  426. mTemp[head].displaySimple(os);
  427. TLOGTARS("[TARS][StatReport::reportMicMsg display:" << os.str() << endl);
  428. }
  429. }
  430. if(0 != (int)mTemp.size())
  431. {
  432. if(_statPrx)
  433. {
  434. TLOGTARS("[TARS][StatReport::reportMicMsg send size:" << mTemp.size()<<"]"<< endl);
  435. _statPrx->tars_set_timeout(_reportTimeout)->async_reportMicMsg(NULL,mTemp,bFromClient, ServerConfig::Context);
  436. }
  437. }
  438. return 0;
  439. }
  440. catch ( exception& e )
  441. {
  442. TLOGERROR("StatReport::report catch exception:" << e.what() << endl);
  443. }
  444. catch ( ... )
  445. {
  446. TLOGERROR("StatReport::report catch unkown exception" << endl);
  447. }
  448. return -1;
  449. }
  450. int StatReport::reportPropMsg()
  451. {
  452. try
  453. {
  454. MapStatPropMsg mStatMsg;
  455. {
  456. Lock lock(*this);
  457. for(map<string, PropertyReportPtr>::iterator it = _statPropMsg.begin(); it != _statPropMsg.end(); ++it)
  458. {
  459. StatPropMsgHead head;
  460. StatPropMsgBody body;
  461. if (!it->second->getMasterName().empty())
  462. {
  463. if (!_setName.empty())
  464. {
  465. head.moduleName = it->second->getMasterName() + "." + _setName + _setArea + _setID;
  466. }
  467. else
  468. {
  469. head.moduleName = it->second->getMasterName();
  470. }
  471. }
  472. else
  473. {
  474. if (!_setName.empty())
  475. {
  476. head.moduleName = _moduleName + "." + _setName + _setArea + _setID;
  477. }
  478. else
  479. {
  480. head.moduleName = _moduleName;
  481. }
  482. }
  483. head.ip = "";
  484. head.propertyName = it->first;
  485. head.setName = _setName;
  486. head.setArea = _setArea;
  487. head.setID = _setID;
  488. head.iPropertyVer = 2;
  489. vector<pair<string, string> > v = it->second->get();
  490. for(size_t i = 0; i < v.size(); i++)
  491. {
  492. bool bFlag = false;
  493. if(v[i].first == "Sum")
  494. {
  495. if(v[i].second != "0")
  496. bFlag = true;
  497. }
  498. else if(v[i].first == "Avg")
  499. {
  500. if(v[i].second != "0")
  501. bFlag = true;
  502. }
  503. else if(v[i].first == "Distr")
  504. {
  505. if(v[i].second != "")
  506. bFlag = true;
  507. }
  508. else if(v[i].first == "Max")
  509. {
  510. if(v[i].second != "-9999999")
  511. bFlag = true;
  512. }
  513. else if(v[i].first == "Min")
  514. {
  515. if(v[i].second != "0")
  516. bFlag = true;
  517. }
  518. else if(v[i].first == "Count")
  519. {
  520. if(v[i].second != "0")
  521. bFlag = true;
  522. }
  523. else
  524. {
  525. bFlag = true;
  526. }
  527. if(bFlag)
  528. {
  529. StatPropInfo sp;
  530. sp.policy = v[i].first;
  531. sp.value = v[i].second;
  532. body.vInfo.push_back(sp);
  533. }
  534. }
  535. mStatMsg[head] = body;
  536. if(LOG->isNeedLog(LocalRollLogger::INFO_LOG))
  537. {
  538. ostringstream os;
  539. os.str("");
  540. head.displaySimple(os);
  541. os << " ";
  542. mStatMsg[head].displaySimple(os);
  543. TLOGTARS("[TARS][StatReport::reportPropMsg display:" << os.str() << endl);
  544. }
  545. }
  546. }
  547. TLOGTARS("[TARS][StatReport::reportPropMsg get size:" << mStatMsg.size()<<"]"<< endl);
  548. int iLen = 0;
  549. MapStatPropMsg mTemp;
  550. for(MapStatPropMsg::iterator it = mStatMsg.begin(); it != mStatMsg.end(); it++)
  551. {
  552. const StatPropMsgHead &head = it->first;
  553. const StatPropMsgBody &body = it->second;
  554. int iTemLen = head.moduleName.length()+ head.ip.length() + head.propertyName.length() + head.setName.length() + head.setArea.length() + head.setID.length();
  555. for(size_t i = 0; i < body.vInfo.size(); i++)
  556. {
  557. iTemLen+=body.vInfo[i].policy.length();
  558. iTemLen+=body.vInfo[i].value.length();
  559. }
  560. iTemLen = PROPERTY_PROTOCOL_LEN + body.vInfo.size(); //
  561. iLen = iLen + iTemLen;
  562. if(iLen > _maxReportSize) //不能超过udp 1472
  563. {
  564. if(_propertyPrx)
  565. {
  566. TLOGTARS("[TARS][StatReport::reportPropMsg send size:" << mTemp.size()<<"]"<< endl);
  567. _propertyPrx->tars_set_timeout(_reportTimeout)->async_reportPropMsg(NULL,mTemp);
  568. }
  569. iLen = iTemLen;
  570. mTemp.clear();
  571. }
  572. mTemp[it->first] = it->second;
  573. }
  574. if(0 != (int)mTemp.size())
  575. {
  576. if(_propertyPrx)
  577. {
  578. TLOGTARS("[TARS][StatReport::reportPropMsg send size:" << mTemp.size()<< "]"<< endl);
  579. _propertyPrx->tars_set_timeout(_reportTimeout)->async_reportPropMsg(NULL,mTemp);
  580. }
  581. }
  582. return 0;
  583. }
  584. catch ( exception& e )
  585. {
  586. TLOGERROR("StatReport::reportPropMsg catch exception:" << e.what() << endl);
  587. }
  588. catch ( ... )
  589. {
  590. TLOGERROR("StatReport::reportPropMsg catch unkown exception" << endl);
  591. }
  592. return -1;
  593. }
  594. int StatReport::reportSampleMsg()
  595. {
  596. try
  597. {
  598. MMapStatSampleMsg mmStatSampleMsg;
  599. {
  600. Lock lock(*this);
  601. _statSampleMsg.swap(mmStatSampleMsg);
  602. }
  603. TLOGTARS("[TARS][StatReport::reportSampleMsg get size:" << mmStatSampleMsg.size()<<"]"<< endl);
  604. int iLen = 0;
  605. vector<StatSampleMsg> vTemp;
  606. for(MMapStatSampleMsg::const_iterator it = mmStatSampleMsg.begin() ;it != mmStatSampleMsg.end();++it)
  607. {
  608. StatSampleMsg sample = it->second;
  609. int iTemLen = STAT_PROTOCOL_LEN +sample.masterName.length() + sample.slaveName.length() + sample.interfaceName.length();
  610. iLen = iLen + iTemLen;
  611. if(iLen > _maxReportSize) //不能超过udp 1472
  612. {
  613. if(_statPrx)
  614. {
  615. TLOGTARS("[TARS][StatReport::reportSampleMsg send size:" << vTemp.size()<< "]"<< endl);
  616. _statPrx->tars_set_timeout(_reportTimeout)->async_reportSampleMsg(NULL,vTemp, ServerConfig::Context);
  617. }
  618. iLen = iTemLen;
  619. vTemp.clear();
  620. }
  621. vTemp.push_back(sample);
  622. }
  623. if(0 != (int)vTemp.size())
  624. {
  625. if(_statPrx)
  626. {
  627. TLOGTARS("[TARS][StatReport::reportSampleMsg send size:" << vTemp.size()<< "]"<< endl);
  628. _statPrx->tars_set_timeout(_reportTimeout)->async_reportSampleMsg(NULL,vTemp, ServerConfig::Context);
  629. }
  630. }
  631. return 0;
  632. }
  633. catch ( exception& e )
  634. {
  635. TLOGERROR("StatReport::reportSampleMsg catch exception:" << e.what() << endl);
  636. }
  637. catch ( ... )
  638. {
  639. TLOGERROR("StatReport::reportSampleMsg catch unkown exception" << endl);
  640. }
  641. return -1;
  642. }
  643. void StatReport::addMicMsg(MapStatMicMsg & old,MapStatMicMsg & add)
  644. {
  645. MapStatMicMsg::iterator iter;
  646. MapStatMicMsg::iterator iterOld;
  647. iter = add.begin();
  648. for(;iter != add.end();++iter)
  649. {
  650. iterOld = old.find(iter->first);
  651. if(iterOld == old.end())
  652. {
  653. //直接insert
  654. old.insert(make_pair(iter->first,iter->second));
  655. }
  656. else
  657. {
  658. //合并
  659. iterOld->second.count += iter->second.count;
  660. iterOld->second.timeoutCount += iter->second.timeoutCount;
  661. iterOld->second.execCount += iter->second.execCount;
  662. map<int,int>::iterator iterOldInt,iterInt;
  663. map<int,int> & mCount = iter->second.intervalCount;
  664. map<int,int> & mCountOld = iterOld->second.intervalCount;
  665. iterInt = mCount.begin();
  666. for(;iterInt != mCount.end();++iterInt)
  667. {
  668. iterOldInt = mCountOld.find(iterInt->first);
  669. if(iterOldInt == mCountOld.end())
  670. {
  671. mCountOld.insert(make_pair(iterInt->first,iterInt->second));
  672. }
  673. else
  674. {
  675. iterOldInt->second += iterInt->second;
  676. }
  677. }
  678. iterOld->second.totalRspTime += iter->second.totalRspTime;
  679. if(iterOld->second.maxRspTime < iter->second.maxRspTime)
  680. iterOld->second.maxRspTime = iter->second.maxRspTime;
  681. if(iterOld->second.minRspTime > iter->second.minRspTime)
  682. iterOld->second.minRspTime = iter->second.minRspTime;
  683. }
  684. }
  685. }
  686. void StatReport::run()
  687. {
  688. while(!_terminate)
  689. {
  690. {
  691. Lock lock(*this);
  692. if (_terminate)
  693. return;
  694. timedWait(1000);
  695. }
  696. try
  697. {
  698. time_t tNow = TNOW;
  699. if(tNow - _time > _reportInterval/1000)
  700. {
  701. reportMicMsg(_statMicMsgClient, true);
  702. reportMicMsg(_statMicMsgServer, false);
  703. MapStatMicMsg mStatMsg;
  704. for(size_t i = 0; i < _epollNum; ++i)
  705. {
  706. MapStatMicMsg * pStatMsg;
  707. while(_statMsg[i]->pop_front(pStatMsg))
  708. {
  709. addMicMsg(mStatMsg,*pStatMsg);
  710. delete pStatMsg;
  711. }
  712. }
  713. #if 0
  714. ostringstream os;
  715. MapStatMicMsg::iterator iter;
  716. iter = mStatMsg.begin();
  717. for(;iter != mStatMsg.end();++iter)
  718. {
  719. iter->first.display(os);
  720. os<<endl;
  721. iter->second.display(os);
  722. os<<endl<<endl;
  723. }
  724. TLOGDEBUG("StatReport::run() msg:"<<os.str()<<endl);
  725. #endif
  726. reportMicMsg(mStatMsg, true);
  727. reportPropMsg();
  728. reportSampleMsg();
  729. _time = tNow;
  730. }
  731. }
  732. catch ( exception& e )
  733. {
  734. TLOGERROR("StatReport::run catch exception:" << e.what() << endl);
  735. }
  736. catch ( ... )
  737. {
  738. TLOGERROR("StatReport::run catch unkown exception" << endl);
  739. }
  740. }
  741. }
  742. ////////////////////////////////////////////////////////////////
  743. }