ServantHandle.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include "util/tc_thread_pool.h"
  17. #include "util/tc_timeprovider.h"
  18. #include "servant/ServantHandle.h"
  19. #include "servant/Application.h"
  20. #include "servant/ServantHelper.h"
  21. #include "servant/AppProtocol.h"
  22. #include "servant/BaseF.h"
  23. #include "servant/KeepAliveNodeF.h"
  24. #include "servant/Cookie.h"
  25. #include "servant/Application.h"
  26. // #ifdef TARS_OPENTRACKING
  27. // #include "servant/text_map_carrier.h"
  28. // #endif
  29. namespace tars
  30. {
  31. /////////////////////////////////////////////////////////////////////////
  32. //
  33. ServantHandle::ServantHandle(Application *application)
  34. : _application(application)
  35. {
  36. }
  37. ServantHandle::~ServantHandle()
  38. {
  39. auto it = _servants.begin();
  40. while(it != _servants.end())
  41. {
  42. try
  43. {
  44. it->second->destroy();
  45. }
  46. catch(exception &ex)
  47. {
  48. TLOGERROR("[ServantHandle::destroy error:" << ex.what() << "]" << endl);
  49. }
  50. catch(...)
  51. {
  52. TLOGERROR("[ServantHandle::destroy unknown exception error]" << endl);
  53. }
  54. ++it;
  55. }
  56. }
  57. void ServantHandle::handleAsyncResponse()
  58. {
  59. ReqMessagePtr resp;
  60. auto it = _servants.begin();
  61. while (it != _servants.end())
  62. {
  63. while (it->second->getResponseQueue().pop_front(resp))
  64. {
  65. try
  66. {
  67. if (resp->response->iRet == TARSSERVERSUCCESS)
  68. {
  69. it->second->doResponse(resp);
  70. }
  71. else if (resp->pObjectProxy == NULL)
  72. {
  73. it->second->doResponseNoRequest(resp);
  74. }
  75. else
  76. {
  77. it->second->doResponseException(resp);
  78. }
  79. }
  80. catch (exception& e)
  81. {
  82. TLOGERROR("[ServantHandle::doResponse ex:" << e.what() << "]" << endl);
  83. }
  84. catch (...)
  85. {
  86. TLOGERROR("[ServantHandle::doResponse error]" << endl);
  87. }
  88. }
  89. //业务处理附加的自有消息
  90. try
  91. {
  92. it->second->doCustomMessage(false);
  93. it->second->doCustomMessage();
  94. }
  95. catch (exception& e)
  96. {
  97. TLOGERROR("[ServantHandle::doCustemMessage ex:" << e.what() << "]" << endl);
  98. }
  99. catch (...)
  100. {
  101. TLOGERROR("[ServantHandle::doCustemMessage ex.]" << endl);
  102. }
  103. ++it;
  104. }
  105. }
  106. void ServantHandle::handleCustomMessage(bool bExpectIdle)
  107. {
  108. for (auto it = _servants.begin(); it != _servants.end(); it++)
  109. {
  110. //业务处理附加的自有消息
  111. try
  112. {
  113. it->second->doCustomMessage(bExpectIdle);
  114. it->second->doCustomMessage();
  115. }
  116. catch (exception& e)
  117. {
  118. TLOGERROR("[ServantHandle::doCustemMessage ex:" << e.what() << "]" << endl);
  119. }
  120. catch (...)
  121. {
  122. TLOGERROR("[ServantHandle::doCustemMessage ex.]" << endl);
  123. }
  124. }
  125. }
  126. bool ServantHandle::allFilterIsEmpty()
  127. {
  128. auto it = _servants.begin();
  129. while (it != _servants.end())
  130. {
  131. if (!it->second->getResponseQueue().empty())
  132. {
  133. return false;
  134. }
  135. ++it;
  136. }
  137. return true;
  138. }
  139. void ServantHandle::initialize()
  140. {
  141. if(TC_CoroutineScheduler::scheduler())
  142. {
  143. ServantProxyThreadData::getData()->_sched = TC_CoroutineScheduler::scheduler();
  144. }
  145. ServantPtr servant = _application->getServantHelper()->create(_bindAdapter->getName());
  146. if (servant)
  147. {
  148. _servants[servant->getName()] = servant;
  149. }
  150. else
  151. {
  152. TLOGERROR("[ServantHandle initialize createServant ret null, for adapter `" +_bindAdapter->getName() + "`]" << endl);
  153. cerr << "ServantHandle initialize createServant ret null, for adapter `" +_bindAdapter->getName() + "`]" << endl;
  154. RemoteNotify::getInstance()->report("initialize createServant error: no adapter:" + _bindAdapter->getName());
  155. TC_Common::msleep(100);
  156. exit(-1);
  157. }
  158. auto it = _servants.begin();
  159. if(it == _servants.end())
  160. {
  161. TLOGERROR("[initialize error: no servant exists]" << endl);
  162. RemoteNotify::getInstance()->report("initialize error: no servant exists.");
  163. TC_Common::msleep(100);
  164. exit(-1);
  165. }
  166. while(it != _servants.end())
  167. {
  168. try
  169. {
  170. it->second->setHandle(this);
  171. it->second->initialize();
  172. TLOGTARS("[" << it->second->getName() << " initialize]" << endl);
  173. }
  174. catch(exception &ex)
  175. {
  176. TLOGERROR("[initialize error:" << ex.what() << "]" << endl);
  177. RemoteNotify::getInstance()->report("initialize error:" + string(ex.what()));
  178. TC_Common::msleep(100);
  179. exit(-1);
  180. }
  181. catch(...)
  182. {
  183. TLOGERROR("[initialize unknown exception error]" << endl);
  184. RemoteNotify::getInstance()->report("initialize unknown exception error");
  185. TC_Common::msleep(100);
  186. exit(-1);
  187. }
  188. ++it;
  189. }
  190. }
  191. void ServantHandle::heartbeat()
  192. {
  193. time_t fcur = TNOW;
  194. if (abs(fcur - _bindAdapter->getHeartBeatTime()) > HEART_BEAT_INTERVAL)
  195. {
  196. _bindAdapter->setHeartBeatTime(fcur);
  197. TARS_KEEPALIVE(_bindAdapter->getName());
  198. //上报连接数 比率
  199. if (_bindAdapter->_pReportConRate)
  200. {
  201. _bindAdapter->_pReportConRate->report((int)(_bindAdapter->getNowConnection() * 1000 / _bindAdapter->getMaxConns()));
  202. }
  203. //有队列, 且队列长度>0才上报
  204. if (_bindAdapter->_pReportQueue)
  205. {
  206. _bindAdapter->_pReportQueue->report((int)_bindAdapter->getRecvBufferSize());
  207. }
  208. }
  209. }
  210. CurrentPtr ServantHandle::createCurrent(const shared_ptr<TC_EpollServer::RecvContext> &data)
  211. {
  212. CurrentPtr current = new Current(this);
  213. try
  214. {
  215. current->initialize(data);
  216. }
  217. catch (TarsDecodeException &ex)
  218. {
  219. TLOGERROR("[ServantHandle::handle request protocol decode error:" << ex.what() << "]" << endl);
  220. close(data);
  221. return NULL;
  222. }
  223. //只有TARS协议才处理
  224. if(current->getBindAdapter()->isTarsProtocol())
  225. {
  226. int64_t now = TNOWMS;
  227. //数据在队列中的时间超过了客户端等待的时间(TARS协议)
  228. if (current->_request.iTimeout > 0 && (now - data->recvTimeStamp()) > current->_request.iTimeout)
  229. {
  230. //上报超时数目
  231. if (data->adapter()->_pReportTimeoutNum)
  232. data->adapter()->_pReportTimeoutNum->report(1);
  233. TLOGERROR("[TARS][ServantHandle::handle queue timeout:"
  234. << current->_request.sServantName << ", func:"
  235. << current->_request.sFuncName << ", recv time:"
  236. << data->recvTimeStamp() << ", queue timeout:"
  237. << data->adapter()->getQueueTimeout() << ", timeout:"
  238. << current->_request.iTimeout << ", now:"
  239. << now << ", ip:" << data->ip() << ", port:" << data->port() << "]" << endl);
  240. current->sendResponse(TARSSERVERQUEUETIMEOUT);
  241. return NULL;
  242. }
  243. }
  244. return current;
  245. }
  246. CurrentPtr ServantHandle::createCloseCurrent(const shared_ptr<TC_EpollServer::RecvContext> &data)
  247. {
  248. CurrentPtr current = new Current(this);
  249. current->initializeClose(data);
  250. current->setReportStat(false);
  251. current->setCloseType(data->closeType());
  252. return current;
  253. }
  254. void ServantHandle::handleClose(const shared_ptr<TC_EpollServer::RecvContext> &data)
  255. {
  256. TLOGTARS("[ServantHandle::handleClose,adapter:" << data->adapter()->getName() << ",peer:" << data->ip() << ":" << data->port() << "]"<< endl);
  257. CurrentPtr current = createCloseCurrent(data);
  258. auto sit = _servants.find(current->getServantName());
  259. if (sit == _servants.end())
  260. {
  261. TLOGERROR("[TARS]ServantHandle::handleClose,adapter:" << data->adapter()->getName() << ",peer:" << data->ip() << ":" << data->port() << ", " << current->getServantName() << " not found" << endl);
  262. return;
  263. }
  264. try
  265. {
  266. //业务逻辑处理
  267. sit->second->doClose(current);
  268. }
  269. catch (exception& ex)
  270. {
  271. TLOGERROR("[TARS]ServantHandle::handleClose " << ex.what() << endl);
  272. return;
  273. }
  274. catch (...)
  275. {
  276. TLOGERROR("[TARS]ServantHandle::handleClose unknown error" << endl);
  277. return;
  278. }
  279. }
  280. void ServantHandle::handleTimeout(const shared_ptr<TC_EpollServer::RecvContext> &data)
  281. {
  282. CurrentPtr current = createCurrent(data);
  283. if (!current) return;
  284. //上报超时数目
  285. if(data->adapter()->_pReportTimeoutNum)
  286. data->adapter()->_pReportTimeoutNum->report(1);
  287. TLOGERROR("[ServantHandle::handleTimeout adapter '"
  288. << data->adapter()->getName()
  289. << "', recvtime:" << data->recvTimeStamp() << "|"
  290. << ", timeout:" << data->adapter()->getQueueTimeout()
  291. << ", id:" << current->getRequestId() << "]" << endl);
  292. if (current->getBindAdapter()->isTarsProtocol())
  293. {
  294. current->sendResponse(TARSSERVERQUEUETIMEOUT);
  295. }
  296. }
  297. void ServantHandle::handleOverload(const shared_ptr<TC_EpollServer::RecvContext> &data)
  298. {
  299. CurrentPtr current = createCurrent(data);
  300. if (!current) return;
  301. TLOGERROR("[ServantHandle::handleOverload adapter '"
  302. << data->adapter()->getName()
  303. << "',overload:-1,queue capacity:"
  304. << data->adapter()->getQueueCapacity()
  305. << ",id:" << current->getRequestId() << "]" << endl);
  306. if (current->getBindAdapter()->isTarsProtocol())
  307. {
  308. current->sendResponse(TARSSERVEROVERLOAD);
  309. }
  310. }
  311. void ServantHandle::handle(const shared_ptr<TC_EpollServer::RecvContext> &data)
  312. {
  313. CurrentPtr current = createCurrent(data);
  314. if (!current) return;
  315. if (current->getBindAdapter()->isTarsProtocol())
  316. {
  317. handleTarsProtocol(current);
  318. }
  319. else
  320. {
  321. handleNoTarsProtocol(current);
  322. }
  323. }
  324. // #ifdef TARS_OPENTRACKING
  325. // void ServantHandle::processTracking(const TarsCurrentPtr &current)
  326. // {
  327. // if(!(Application::getCommunicator()->_traceManager))
  328. // {
  329. // return;
  330. // }
  331. // ServantProxyThreadData * sptd = ServantProxyThreadData::getData();
  332. // assert(sptd);
  333. // if(!sptd)
  334. // {
  335. // return;
  336. // }
  337. // //提取packet中的span信息,更新为被调的span信息后设置到sptd->_trackInfoMap;
  338. // sptd->_trackInfoMap.clear();
  339. // if (IS_MSG_TYPE(current->getMessageType(), tars::TARSMESSAGETYPETRACK))
  340. // {
  341. // map<string, string>::const_iterator trackinfoIter = current->getRequestStatus().find(ServantProxy::STATUS_TRACK_KEY);
  342. // TLOGTARS("[TARS] servant got a tracking request, message_type set" << current->getMessageType() << endl);
  343. // if (trackinfoIter != current->getRequestStatus().end())
  344. // {
  345. // TLOGTARS("[TARS] servant got a tracking request, tracking key:" << trackinfoIter->second << endl);
  346. // string context = trackinfoIter->second;
  347. // char szBuffer[context.size() + 1];
  348. // memset(szBuffer, 0x00, context.size() + 1);
  349. // memcpy(szBuffer, context.c_str(), context.size());
  350. // std::unordered_map<std::string, std::string> text_map;
  351. // write_span_context(text_map, szBuffer);
  352. // TextMapCarrier carrier(text_map);
  353. // auto tracer = Application::getCommunicator()->_traceManager->_tracer;
  354. // auto span_context_maybe = tracer->Extract(carrier);
  355. // if(!span_context_maybe)
  356. // {
  357. // //error
  358. // TLOGERROR("[TARS] servant got a tracking request, but extract the span context fail");
  359. // return ;
  360. // }
  361. // string funcName = current->getFuncName();
  362. // auto child_span = tracer->StartSpan(funcName, {opentracing::ChildOf(span_context_maybe->get())});
  363. // //text_map.clear();
  364. // auto err = tracer->Inject(child_span->context(), carrier);
  365. // assert(err);
  366. // sptd->_trackInfoMap = text_map;
  367. // _spanMap[current->getRequestId()].reset(child_span.release());
  368. // return ;
  369. // }
  370. // }
  371. // return ;
  372. // }
  373. // void ServantHandle::finishTracking(int ret, const TarsCurrentPtr &current)
  374. // {
  375. // int requestId = current->getRequestId();
  376. // if(_spanMap.find(requestId) != _spanMap.end())
  377. // {
  378. // auto spanIter = _spanMap.find(requestId);
  379. // spanIter->second->SetTag("Retcode", ret);
  380. // spanIter->second->Finish();
  381. // _spanMap.erase(requestId);
  382. // }
  383. // }
  384. // #endif
  385. bool ServantHandle::processDye(const CurrentPtr &current, string& dyeingKey)
  386. {
  387. //当前线程的线程数据
  388. ServantProxyThreadData *sptd = ServantProxyThreadData::getData();
  389. if (sptd)
  390. {
  391. sptd->_data._dyeingKey = "";
  392. }
  393. //当前请求已经被染色, 需要打印染色日志
  394. map<string, string>::const_iterator dyeingIt = current->getRequestStatus().find(ServantProxy::STATUS_DYED_KEY);
  395. if (IS_MSG_TYPE(current->getMessageType(), tars::TARSMESSAGETYPEDYED))
  396. {
  397. TLOGTARS("[servant got a dyeing request, message_type set: " << current->getMessageType() << "]" << endl);
  398. if (dyeingIt != current->getRequestStatus().end())
  399. {
  400. TLOGTARS("[servant got a dyeing request, dyeing key: " << dyeingIt->second << "]" << endl);
  401. dyeingKey = dyeingIt->second;
  402. }
  403. return true;
  404. }
  405. //servant已经被染色, 开启染色日志
  406. if (_application->getServantHelper()->isDyeing())
  407. {
  408. map<string, string>::const_iterator dyeingKeyIt = current->getRequestStatus().find(ServantProxy::STATUS_GRID_KEY);
  409. if (dyeingKeyIt != current->getRequestStatus().end() &&
  410. _application->getServantHelper()->isDyeingReq(dyeingKeyIt->second, current->getServantName(), current->getFuncName()))
  411. {
  412. TLOGTARS("[TARS] dyeing servant got a dyeing req, key:" << dyeingKeyIt->second << endl);
  413. dyeingKey = dyeingKeyIt->second;
  414. return true;
  415. }
  416. }
  417. return false;
  418. }
  419. bool ServantHandle::processTrace(const CurrentPtr &current)
  420. {
  421. //当前线程的线程数据
  422. ServantProxyThreadData* sptd = ServantProxyThreadData::getData();
  423. if (sptd)
  424. {
  425. sptd->_traceCall = false;
  426. sptd->_traceContext.reset();
  427. }
  428. // 如果调用链需要追踪,需要初始化线程私有追踪参数
  429. map<string, string>::const_iterator traceIt = current->getRequestStatus().find(ServantProxy::STATUS_TRACE_KEY);
  430. if (IS_MSG_TYPE(current->getMessageType(), tars::TARSMESSAGETYPETRACE))
  431. {
  432. TLOGTARS("[TARS] servant got a trace request, message_type set " << current->getMessageType() << endl);
  433. if (traceIt != current->getRequestStatus().end())
  434. {
  435. TLOGTARS("[TARS] servant got a trace request, trace key:" << traceIt->second << endl);
  436. if (sptd->initTrace(traceIt->second))
  437. {
  438. sptd->_traceCall = true;
  439. return true;
  440. }
  441. else
  442. {
  443. TLOGTARS("[TARS] servant got a trace request, but trace key is error:" << traceIt->second << endl);
  444. }
  445. }
  446. }
  447. return false;
  448. }
  449. bool ServantHandle::processCookie(const CurrentPtr &current, map<string, string> &cookie)
  450. {
  451. const static string STATUS = "STATUS_";
  452. std::for_each(current->getRequestStatus().begin(), current->getRequestStatus().end(),[&](const map<string, string>::value_type& p){
  453. if(p.first.size() > STATUS.size() && TC_Port::strncasecmp(p.first.c_str(), STATUS.c_str(), STATUS.size()) == 0) {
  454. return;
  455. }
  456. cookie.insert(make_pair(p.first, p.second));
  457. });
  458. return !cookie.empty();
  459. }
  460. bool ServantHandle::checkValidSetInvoke(const CurrentPtr &current)
  461. {
  462. /*是否允许检查合法性*/
  463. if (ServerConfig::IsCheckSet == 0)
  464. {
  465. //不检查
  466. return true;
  467. }
  468. bool isSetInvoke = IS_MSG_TYPE(current->getMessageType(), tars::TARSMESSAGETYPESETNAME);
  469. //客户端按set规则调用且服务端启用set
  470. if (isSetInvoke && ClientConfig::SetOpen)
  471. {
  472. /**
  473. * 合法性规则:
  474. * 1 客户端set名称与服务端set在同一分组,eg, test.s.1 <-> test.s.1
  475. * 2 客户端set名称与服务端set在同一地区,eg, test.s.* <-> test.s.1 | test.s.2 | test.s.*
  476. * 3 客户端set名称与服务端set属于不同名称,eg,test1.s.1 <->test2.n.2
  477. * 4 1,2,3条件都不满足,则认为该调用不合法
  478. */
  479. map<string, string>::const_iterator setIt = current->getRequestStatus().find(ServantProxy::STATUS_SETNAME_VALUE);
  480. string sSetName("");
  481. if (setIt != current->getRequestStatus().end())
  482. {
  483. TLOGTARS("[servant got a setname request, setname key:" << setIt->second << "]" << endl);
  484. sSetName = setIt->second;
  485. if (ClientConfig::SetDivision == sSetName)
  486. {
  487. return true;
  488. }
  489. else
  490. {
  491. //属于同一地区是也属于合法调用
  492. string setArea1 = ClientConfig::SetDivision.substr(0,ClientConfig::SetDivision.find_last_of("."));
  493. string setArea2 = sSetName.substr(0,sSetName.find_last_of("."));
  494. if (setArea1 == setArea2)
  495. {
  496. return true;
  497. }
  498. else if (ClientConfig::SetDivision.substr(0,ClientConfig::SetDivision.find_first_of(".")) !=
  499. sSetName.substr(0,sSetName.find_first_of(".")))
  500. {
  501. //属于不同的set之间调用也属于合法
  502. return true;
  503. }
  504. else
  505. {
  506. TLOGERROR("[ServantHandle::checkValidSetInvoke|"
  507. << current->getIp() << "|"
  508. << current->getMessageType() << "|"
  509. << current->getServantName() << "|"
  510. << current->getFuncName() << "|client:"
  511. << ClientConfig::SetDivision << "|server:"
  512. << sSetName << "]" << endl);
  513. current->sendResponse(TARSINVOKEBYINVALIDESET);
  514. return false;
  515. }
  516. }
  517. }
  518. else
  519. {
  520. TLOGERROR("[ServantHandle::checkValidSetInvoke|"
  521. << current->getIp() << "|"
  522. << current->getMessageType() << "|"
  523. << current->getServantName() << "|"
  524. << current->getFuncName() << "|client:"
  525. << ClientConfig::SetDivision << "|server:"
  526. << sSetName << "]" << endl);
  527. current->sendResponse(TARSINVOKEBYINVALIDESET);
  528. return false;
  529. }
  530. }
  531. //没有按set规则调用
  532. return true;
  533. }
  534. void ServantHandle::handleTarsProtocol(const CurrentPtr &current)
  535. {
  536. TLOGTARS("[ServantHandle::handleTarsProtocol current:"
  537. << current->getIp() << "|"
  538. << current->getPort() << "|"
  539. << current->getMessageType() << "|"
  540. << current->getServantName() << "|"
  541. << current->getFuncName() << "|"
  542. << current->getRequestId() << "|"
  543. << TC_Common::tostr(current->getRequestStatus()) << "]"<<endl);
  544. //检查set调用合法性
  545. if (!checkValidSetInvoke(current))
  546. {
  547. return;
  548. }
  549. //处理染色消息
  550. string dyeingKey = "";
  551. TarsDyeingSwitch dyeSwitch;
  552. if (processDye(current, dyeingKey))
  553. {
  554. dyeSwitch.enableDyeing(dyeingKey);
  555. }
  556. processTrace(current);
  557. //处理cookie
  558. map<string, string> cookie;
  559. CookieOp cookieOp;
  560. if (processCookie(current, cookie))
  561. {
  562. cookieOp.setCookie(cookie);
  563. current->setCookie(cookie);
  564. }
  565. // processSample(current);
  566. auto sit = _servants.find(current->getServantName());
  567. if (sit == _servants.end())
  568. {
  569. current->sendResponse(TARSSERVERNOSERVANTERR);
  570. // #ifdef TARS_OPENTRACKING
  571. // finishTracking(TARSSERVERNOSERVANTERR, current);
  572. // #endif
  573. return;
  574. }
  575. int ret = TARSSERVERUNKNOWNERR;
  576. string sResultDesc = "";
  577. ResponsePacket response;
  578. try
  579. {
  580. //业务逻辑处理
  581. ret = sit->second->dispatch(current, response.sBuffer);
  582. }
  583. catch(TarsDecodeException &ex)
  584. {
  585. TLOGERROR("[ServantHandle::handleTarsProtocol " << ex.what() << "]" << endl);
  586. ret = TARSSERVERDECODEERR;
  587. sResultDesc = ex.what();
  588. }
  589. catch(TarsEncodeException &ex)
  590. {
  591. TLOGERROR("[ServantHandle::handleTarsProtocol " << ex.what() << "]" << endl);
  592. ret = TARSSERVERENCODEERR;
  593. sResultDesc = ex.what();
  594. }
  595. catch(exception &ex)
  596. {
  597. TLOGERROR("[ServantHandle::handleTarsProtocol " << ex.what() << "]" << endl);
  598. ret = TARSSERVERUNKNOWNERR;
  599. sResultDesc = ex.what();
  600. }
  601. catch(...)
  602. {
  603. TLOGERROR("[ServantHandle::handleTarsProtocol unknown error]" << endl);
  604. ret = TARSSERVERUNKNOWNERR;
  605. sResultDesc = "handleTarsProtocol unknown exception error";
  606. }
  607. if(ret == TARSSERVERNOFUNCERR)
  608. {
  609. ret = sit->second->doNoFunc(current);
  610. }
  611. //单向调用或者业务不需要同步返回
  612. if (current->isResponse())
  613. {
  614. current->sendResponse(ret, response, Current::TARS_STATUS(), sResultDesc);
  615. }
  616. #ifdef TARS_OPENTRACKING
  617. finishTracking(ret, current);
  618. #endif
  619. }
  620. void ServantHandle::handleNoTarsProtocol(const TarsCurrentPtr &current)
  621. {
  622. TLOGTARS("[ServantHandle::handleNoTarsProtocol current:"
  623. << current->getIp() << "|"
  624. << current->getPort() << "|"
  625. << current->getServantName() << "]" << endl);
  626. auto sit = _servants.find(current->getServantName());
  627. assert(sit != _servants.end());
  628. vector<char> buffer;
  629. try
  630. {
  631. //业务逻辑处理
  632. sit->second->dispatch(current, buffer);
  633. }
  634. catch(exception &ex)
  635. {
  636. TLOGERROR("[ServantHandle::handleNoTarsProtocol " << ex.what() << "]" << endl);
  637. }
  638. catch(...)
  639. {
  640. TLOGERROR("[ServantHandle::handleNoTarsProtocol unknown error]" << endl);
  641. }
  642. if (current->isResponse() && !buffer.empty())
  643. {
  644. current->sendResponse((const char *)(buffer.data()), (uint32_t)buffer.size());
  645. }
  646. }
  647. ////////////////////////////////////////////////////////////////////////////
  648. }