Application.cpp 54 KB


  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #include <sstream>
  17. #include "util/tc_option.h"
  18. #include "util/tc_common.h"
  19. #include "servant/KeepAliveNodeF.h"
  20. #include "servant/Application.h"
  21. #include "servant/AppProtocol.h"
  22. #include "servant/AdminServant.h"
  23. #include "servant/ServantHandle.h"
  24. #include "servant/BaseF.h"
  25. #include "servant/AppCache.h"
  26. #include "servant/NotifyObserver.h"
  27. #include "servant/AuthLogic.h"
  28. #include "servant/CommunicatorFactory.h"
  29. #include <signal.h>
  30. #if TARGET_PLATFORM_LINUX
  31. #include <sys/resource.h>
  32. #endif
  33. #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
  34. #include <fcntl.h>
  35. #endif
  36. #if TARS_SSL
  37. #include "util/tc_openssl.h"
  38. #endif
  39. static TC_RollLogger __out__;
  40. #define NOTIFY_AND_WAIT(msg) { \
  41. RemoteNotify::getInstance()->report(msg); \
  42. std::this_thread::sleep_for(std::chrono::milliseconds(20)); \
  43. }
  44. namespace tars
  45. {
  46. std::string ServerConfig::TarsPath; //服务路径
  47. std::string ServerConfig::Application; //应用名称
  48. std::string ServerConfig::ServerName; //服务名称,一个服务名称含一个或多个服务标识
  49. std::string ServerConfig::LocalIp; //本机IP
  50. std::string ServerConfig::BasePath; //应用程序路径,用于保存远程系统配置的本地目录
  51. std::string ServerConfig::DataPath; //应用程序路径,用于本地数据
  52. std::string ServerConfig::Local; //本地套接字
  53. std::string ServerConfig::Node; //本机node地址
  54. std::string ServerConfig::Log; //日志中心地址
  55. std::string ServerConfig::Config; //配置中心地址
  56. std::string ServerConfig::Notify; //信息通知中心
  57. std::string ServerConfig::LogPath; //logpath
  58. int ServerConfig::LogSize; //log大小(字节)
  59. int ServerConfig::LogNum; //log个数()
  60. std::string ServerConfig::LogLevel; //log日志级别
  61. std::string ServerConfig::ConfigFile; //框架配置文件路径
  62. int ServerConfig::ReportFlow = 1; //是否服务端上报所有接口stat流量 0不上报 1上报 (用于非tars协议服务流量统计)
  63. int ServerConfig::IsCheckSet = 1; //是否对按照set规则调用进行合法性检查 0,不检查,1检查
  64. int ServerConfig::OpenCoroutine = 0; //是否启用协程处理方式
  65. size_t ServerConfig::CoroutineMemSize; //协程占用内存空间的最大大小
  66. uint32_t ServerConfig::CoroutineStackSize; //每个协程的栈大小(默认128k)
  67. bool ServerConfig::ManualListen = false; //手工启动监听端口
  68. int ServerConfig::NetThread = 1; //servernet thread
  69. bool ServerConfig::CloseCout = true;
  70. int ServerConfig::BackPacketLimit = 0;
  71. int ServerConfig::BackPacketMin = 1024;
  72. bool ServerConfig::CheckBindAdapter = true;
  73. #if TARS_SSL
  74. std::string ServerConfig::CA;
  75. std::string ServerConfig::Cert;
  76. std::string ServerConfig::Key;
  77. bool ServerConfig::VerifyClient = false;
  78. std::string ServerConfig::Ciphers;
  79. #endif
  80. map<string, string> ServerConfig::Context;
  81. #define SYNC_SERVER_CONFIG(x, y) \
  82. if(_serverConfig.x != serverConfig.x) \
  83. { \
  84. ServerConfig::y = _serverConfig.x; \
  85. } \
  86. if(ServerConfig::y != serverConfig.x) \
  87. { \
  88. _serverConfig.x = ServerConfig::y; \
  89. }
  90. ///////////////////////////////////////////////////////////////////////////////////////////
  91. CommunicatorPtr Application::_communicator = NULL;
  92. PropertyReportPtr g_pReportRspQueue;
  93. /**上报服务端发送队列大小的间隔时间**/
  94. #define REPORT_SEND_QUEUE_INTERVAL 10
  95. ///////////////////////////////////////////////////////////////////////////////////////////
  96. Application::Application()
  97. {
  98. _servantHelper = std::make_shared<ServantHelperManager>();
  99. _notifyObserver = std::make_shared<NotifyObserver>();
  100. setNotifyObserver(_notifyObserver);
  101. #if TARGET_PLATFORM_WINDOWS
  102. WSADATA wsadata;
  103. WSAStartup(MAKEWORD(2, 2), &wsadata);
  104. #endif
  105. }
  106. Application::~Application()
  107. {
  108. if (_epollServer)
  109. {
  110. _epollServer->terminate();
  111. _epollServer = NULL;
  112. }
  113. #if TARGET_PLATFORM_WINDOWS
  114. WSACleanup();
  115. #endif
  116. }
  117. void Application::ApplicationConfig::copyToStatic()
  118. {
  119. ServerConfig::TarsPath = tarsPath;
  120. ServerConfig::Application = application;
  121. ServerConfig::ServerName = serverName;
  122. ServerConfig::LocalIp = localIp;
  123. ServerConfig::BasePath = basePath;
  124. ServerConfig::DataPath = dataPath;
  125. ServerConfig::Local = local;
  126. ServerConfig::Node = node;
  127. ServerConfig::Log = log;
  128. ServerConfig::Config = config;
  129. ServerConfig::Notify = notify;
  130. ServerConfig::LogPath = logPath;
  131. ServerConfig::LogSize = logSize;
  132. ServerConfig::LogNum = logNum;
  133. ServerConfig::LogLevel = logLevel;
  134. ServerConfig::ConfigFile = configFile;
  135. ServerConfig::ReportFlow = reportFlow;
  136. ServerConfig::IsCheckSet = isCheckSet;
  137. ServerConfig::OpenCoroutine = openCoroutine;
  138. ServerConfig::CoroutineMemSize = coroutineMemSize;
  139. ServerConfig::CoroutineStackSize = coroutineStackSize;
  140. ServerConfig::ManualListen = manualListen;
  141. ServerConfig::NetThread = netThread;
  142. ServerConfig::CloseCout = closeCout;
  143. ServerConfig::BackPacketLimit = backPacketLimit;
  144. ServerConfig::BackPacketMin = backPacketMin;
  145. ServerConfig::CheckBindAdapter = checkBindAdapter;
  146. #if TARS_SSL
  147. std::string ServerConfig::CA = ca;
  148. std::string ServerConfig::Cert = cert;
  149. std::string ServerConfig::Key = key;
  150. bool ServerConfig::VerifyClient = verifyClient;
  151. std::string ServerConfig::Ciphers = ciphers;
  152. #endif
  153. }
  154. string Application::getTarsVersion()
  155. {
  156. return TARS_VERSION;
  157. }
  158. CommunicatorPtr& Application::getCommunicator()
  159. {
  160. return _communicator;
  161. }
  162. CommunicatorPtr Application::getThisCommunicator()
  163. {
  164. return _thisCommunicator;
  165. }
  166. void reportRspQueue(TC_EpollServer *epollServer)
  167. {
  168. if (!g_pReportRspQueue)
  169. return;
  170. static time_t iLastCheckTime = TNOW;
  171. time_t iNow = TNOW;
  172. if (iNow - iLastCheckTime > REPORT_SEND_QUEUE_INTERVAL)
  173. {
  174. iLastCheckTime = iNow;
  175. const vector<TC_EpollServer::BindAdapterPtr> &adapters = epollServer->getBindAdapters();
  176. size_t n = 0;
  177. for (size_t i = 0; i < adapters.size(); ++i)
  178. {
  179. n = n + adapters[i]->getSendBufferSize();
  180. }
  181. g_pReportRspQueue->report((int)n);
  182. }
  183. }
  184. void Application::manualListen()
  185. {
  186. vector<TC_EpollServer::BindAdapterPtr> v = getEpollServer()->getBindAdapters();
  187. for(auto &b : v)
  188. {
  189. b->manualListen();
  190. }
  191. }
  192. void Application::addServant(const std::string& id, std::function<tars::Servant*()> createServant)
  193. {
  194. _servantHelper->addServant(id, createServant, this, true);
  195. }
  196. void Application::waitForShutdown()
  197. {
  198. assert(_epollServer);
  199. _epollServer->setCallbackFunctor(reportRspQueue);
  200. _epollServer->setDestroyAppFunctor([&](TC_EpollServer *epollServer){
  201. this->destroyApp();
  202. NOTIFY_AND_WAIT("stop");
  203. });
  204. _epollServer->waitForShutdown();
  205. TC_Port::unregisterCtrlC(_ctrlCId);
  206. TC_Port::unregisterTerm(_termId);
  207. _epollServer = NULL;
  208. }
  209. void Application::waitForReady()
  210. {
  211. if(_epollServer)
  212. {
  213. _epollServer->waitForReady();
  214. }
  215. }
  216. void Application::terminate()
  217. {
  218. if (_epollServer && !_epollServer->isTerminate())
  219. {
  220. std::this_thread::sleep_for(std::chrono::milliseconds(100)); //稍微休息一下, 让当前处理包能够回复
  221. _epollServer->terminate();
  222. }
  223. }
  224. bool Application::cmdViewStatus(const string& command, const string& params, string& result)
  225. {
  226. TLOGTARS("Application::cmdViewStatus:" << command << " " << params << endl);
  227. ostringstream os;
  228. os << OUT_LINE_LONG << endl;
  229. os << TC_Common::outfill("[proxy config]:") << endl;
  230. outClient(os);
  231. os << OUT_LINE << "\n" << TC_Common::outfill("[server config]:") << endl;
  232. outServer(os);
  233. os << OUT_LINE << endl;
  234. outAllAdapter(os);
  235. result = os.str();
  236. return true;
  237. }
  238. bool Application::cmdCloseCoreDump(const string& command, const string& params, string& result)
  239. {
  240. #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
  241. struct rlimit tlimit;
  242. int ret = 0;
  243. ostringstream os;
  244. ret = getrlimit(RLIMIT_CORE, &tlimit);
  245. if (ret != 0)
  246. {
  247. TLOGERROR("error: "<<strerror(errno)<<endl);
  248. return false;
  249. }
  250. TLOGDEBUG("before :cur:" << tlimit.rlim_cur << ";max: " << tlimit.rlim_max << endl);
  251. os << (_serverConfig.application + "." + _serverConfig.serverName);
  252. os << "|before set:cur:" << tlimit.rlim_cur << ";max: " << tlimit.rlim_max;
  253. string param = TC_Common::lower(TC_Common::trim(params));
  254. bool bClose = (param == "yes") ? true : false;
  255. if (bClose)
  256. {
  257. tlimit.rlim_cur = 0;
  258. }
  259. else
  260. {
  261. tlimit.rlim_cur = tlimit.rlim_max;
  262. }
  263. ret = setrlimit(RLIMIT_CORE, &tlimit);
  264. if (ret != 0)
  265. {
  266. TLOGERROR("error: "<<strerror(errno)<<endl);
  267. return false;
  268. }
  269. ret = getrlimit(RLIMIT_CORE, &tlimit);
  270. if (ret != 0)
  271. {
  272. TLOGERROR("error: "<<strerror(errno)<<endl);
  273. return false;
  274. }
  275. TLOGDEBUG("after cur:" << tlimit.rlim_cur << ";max: " << tlimit.rlim_max << endl);
  276. os << "|after set cur:" << tlimit.rlim_cur << ";max: " << tlimit.rlim_max << endl;
  277. result = os.str();
  278. #else
  279. TLOGDEBUG("windows not support!");
  280. #endif
  281. return true;
  282. }
  283. bool Application::cmdSetLogLevel(const string& command, const string& params, string& result)
  284. {
  285. TLOGTARS("Application::cmdSetLogLevel:" << command << " " << params << endl);
  286. string level = TC_Common::trim(params);
  287. int ret = LocalRollLogger::getInstance()->logger()->setLogLevel(level);
  288. if (ret == 0)
  289. {
  290. _serverConfig.logLevel = TC_Common::upper(level);
  291. ServerConfig::LogLevel = _serverConfig.logLevel;
  292. result = "set log level [" + level + "] ok";
  293. _thisCommunicator->getAppCache()->set("logLevel", level);
  294. }
  295. else
  296. {
  297. result = "set log level [" + level + "] error";
  298. }
  299. return true;
  300. }
  301. bool Application::cmdEnableDayLog(const string& command, const string& params, string& result)
  302. {
  303. TLOGTARS("Application::cmdEnableDayLog:" << command << " " << params << endl);
  304. vector<string> vParams = TC_Common::sepstr<string>(TC_Common::trim(params), "|");
  305. size_t nNum = vParams.size();
  306. if (!(nNum == 2 || nNum == 3))
  307. {
  308. result = "usage: tars.enabledaylog {remote|local}|[logname]|{true|false}";
  309. return false;
  310. }
  311. if((vParams[0] != "local" && vParams[0] != "remote"))
  312. {
  313. result = "usage: tars.enabledaylog {remote|local}|[logname]|{true|false}";
  314. return false;
  315. }
  316. if(nNum == 2 && (vParams[1] != "true" && vParams[1] != "false"))
  317. {
  318. result = "usage: tars.enabledaylog {remote|local}|[logname]|{true|false}";
  319. return false;
  320. }
  321. if(nNum == 3 && (vParams[2] != "true" && vParams[2] != "false"))
  322. {
  323. result = "usage: tars.enabledaylog {remote|local}|[logname]|{true|false}";
  324. return false;
  325. }
  326. bool bEnable = true;
  327. string sFile;
  328. if (nNum == 2)
  329. {
  330. bEnable = (vParams[1] == "true") ? true : false;
  331. sFile = "";
  332. result = "set " + vParams[0] + " " + vParams[1] + " ok";
  333. }
  334. else if (nNum == 3)
  335. {
  336. bEnable = (vParams[2] == "true") ? true : false;
  337. sFile = vParams[1];
  338. result = "set " + vParams[0] + " " + vParams[1] + " " + vParams[2] + " ok";
  339. }
  340. if (vParams[0] == "local")
  341. {
  342. RemoteTimeLogger::getInstance()->enableLocal(sFile, bEnable);
  343. return true;
  344. }
  345. if (vParams[0] == "remote")
  346. {
  347. RemoteTimeLogger::getInstance()->enableRemote(sFile, bEnable);
  348. return true;
  349. }
  350. result = "usage: tars.enabledaylog {remote|local}|[logname]|{true|false}";
  351. return false;
  352. }
  353. bool Application::cmdLoadConfig(const string& command, const string& params, string& result)
  354. {
  355. TLOGTARS("Application::cmdLoadConfig:" << command << " " << params << endl);
  356. string filename = TC_Common::trim(params);
  357. if (_remoteConfigHelper->addConfig(filename, result, false))
  358. {
  359. RemoteNotify::getInstance()->report(result);
  360. return true;
  361. }
  362. RemoteNotify::getInstance()->report(result);
  363. return true;
  364. }
  365. bool Application::cmdConnections(const string& command, const string& params, string& result)
  366. {
  367. TLOGTARS("Application::cmdConnections:" << command << " " << params << endl);
  368. ostringstream os;
  369. os << OUT_LINE_LONG << endl;
  370. auto m = _epollServer->getListenSocketInfo();
  371. for (auto it = m.begin(); it != m.end(); ++it)
  372. {
  373. vector<TC_EpollServer::ConnStatus> v = it->second->getConnStatus();
  374. os << OUT_LINE << "\n" << TC_Common::outfill("[adapter:" + it->second->getName() + "] [connections:" + TC_Common::tostr(v.size()) + "]") << endl;
  375. os << TC_Common::outfill("conn-uid", ' ', 15)
  376. << TC_Common::outfill("ip:port", ' ', 25)
  377. << TC_Common::outfill("last-time", ' ', 25)
  378. << TC_Common::outfill("timeout", ' ', 10)
  379. << TC_Common::outfill("recvBufferSize", ' ', 30)
  380. << TC_Common::outfill("sendBufferSize", ' ', 30)
  381. << endl;
  382. for (size_t i = 0; i < v.size(); i++)
  383. {
  384. os << TC_Common::outfill(TC_Common::tostr<uint32_t>(v[i].uid), ' ', 15)
  385. << TC_Common::outfill(v[i].ip + ":" + TC_Common::tostr(v[i].port), ' ', 25)
  386. << TC_Common::outfill(TC_Common::tm2str(v[i].iLastRefreshTime, "%Y-%m-%d %H:%M:%S"), ' ', 25)
  387. << TC_Common::outfill(TC_Common::tostr(v[i].timeout), ' ', 10)
  388. << TC_Common::outfill(TC_Common::tostr(v[i].recvBufferSize), ' ', 30)
  389. << TC_Common::outfill(TC_Common::tostr(v[i].sendBufferSize), ' ', 30)
  390. << endl;
  391. }
  392. }
  393. os << OUT_LINE_LONG << endl;
  394. result = os.str();
  395. return true;
  396. }
  397. bool Application::cmdViewVersion(const string& command, const string& params, string& result)
  398. {
  399. result = "$" + string(TARS_VERSION) + "$";
  400. return true;
  401. }
  402. bool Application::cmdViewBuildID(const string& command, const string& params, string& result)
  403. {
  404. #define YEARSUF ((__DATE__ [9] - '0') * 10 + (__DATE__ [10] - '0'))
  405. #define MONTH (__DATE__ [2] == 'n' ? (__DATE__ [1] == 'a' ? 0 : 5) \
  406. : __DATE__ [2] == 'b' ? 1 \
  407. : __DATE__ [2] == 'r' ? (__DATE__ [0] == 'M' ? 2 : 3) \
  408. : __DATE__ [2] == 'y' ? 4 \
  409. : __DATE__ [2] == 'l' ? 6 \
  410. : __DATE__ [2] == 'g' ? 7 \
  411. : __DATE__ [2] == 'p' ? 8 \
  412. : __DATE__ [2] == 't' ? 9 \
  413. : __DATE__ [2] == 'v' ? 10 : 11)
  414. #define DAY ((__DATE__ [4] == ' ' ? 0 : __DATE__ [4] - '0') * 10 \
  415. + (__DATE__ [5] - '0'))
  416. #define TIMEINT ((((((__TIME__[0] - '0') * 10 + (__TIME__[1] - '0')) * 10 \
  417. + (__TIME__[3] - '0')) * 10 + (__TIME__[4] - '0')) * 10 \
  418. + (__TIME__[6] - '0')) * 10 + (__TIME__[7] - '0'))
  419. char buildTime[50] = {0};
  420. sprintf(buildTime, "%d.%02d%02d.%06d", YEARSUF, MONTH + 1, DAY, TIMEINT);
  421. result = "$" + _serverConfig.application + "." + _serverConfig.serverName + "-" + string(buildTime) + "$";
  422. return true;
  423. }
  424. bool Application::cmdLoadProperty(const string& command, const string& params, string& result)
  425. {
  426. try
  427. {
  428. TLOGTARS("Application::cmdLoadProperty:" << command << " " << params << endl);
  429. //重新解析配置文件
  430. _conf.parseFile(_serverConfig.configFile);
  431. string sResult = "";
  432. //加载通讯器属性
  433. _thisCommunicator->setProperty(_conf);
  434. _thisCommunicator->reloadProperty(sResult);
  435. //加载远程对象
  436. _serverConfig.log = _conf.get("/tars/application/server<log>");
  437. ServerConfig::Log = _serverConfig.log;
  438. RemoteTimeLogger::getInstance()->setLogInfo(_thisCommunicator, _serverConfig.log, _serverConfig.application, _serverConfig.serverName, _serverConfig.logPath,setDivision());
  439. _serverConfig.config = _conf.get("/tars/application/server<config>");
  440. ServerConfig::Config = _serverConfig.config;
  441. _remoteConfigHelper->setConfigInfo(_thisCommunicator, _serverConfig.config, _serverConfig.application, _serverConfig.serverName, _serverConfig.basePath,setDivision(), 5);
  442. _serverConfig.notify = _conf.get("/tars/application/server<notify>");
  443. ServerConfig::Notify = _serverConfig.notify;
  444. RemoteNotify::getInstance()->setNotifyInfo(_thisCommunicator, _serverConfig.notify, _serverConfig.application, _serverConfig.serverName, setDivision(), _serverConfig.localIp);
  445. result = "loaded config items:\r\n" + sResult +
  446. "log=" + _serverConfig.log + "\r\n" +
  447. "config=" + _serverConfig.config + "\r\n" +
  448. "notify=" + _serverConfig.notify + "\r\n";
  449. }
  450. catch (TC_Config_Exception & ex)
  451. {
  452. result = "load config " + _serverConfig.configFile + " error:" + ex.what();
  453. }
  454. catch (exception &ex)
  455. {
  456. result = ex.what();
  457. }
  458. return true;
  459. }
  460. bool Application::cmdViewAdminCommands(const string& command, const string& params, string& result)
  461. {
  462. TLOGTARS("Application::cmdViewAdminCommands:" << command << " " << params << endl);
  463. result = result + _notifyObserver->viewRegisterCommand();
  464. return true;
  465. }
  466. bool Application::cmdSetDyeing(const string& command, const string& params, string& result)
  467. {
  468. vector<string> vDyeingParams = TC_Common::sepstr<string>(params, " ");
  469. if (vDyeingParams.size() == 2 || vDyeingParams.size() == 3)
  470. {
  471. _servantHelper->setDyeing(vDyeingParams[0], vDyeingParams[1], vDyeingParams.size() == 3 ? vDyeingParams[2] : "");
  472. result = "DyeingKey=" + vDyeingParams[0] + "\r\n" +
  473. "DyeingServant=" + vDyeingParams[1] + "\r\n" +
  474. "DyeingInterface=" + (vDyeingParams.size() == 3 ? vDyeingParams[2] : "") + "\r\n";
  475. }
  476. else
  477. {
  478. result = "Invalid parameters.Should be: dyeingKey dyeingServant [dyeingInterface]";
  479. }
  480. return true;
  481. }
  482. bool Application::cmdCloseCout(const string& command, const string& params, string& result)
  483. {
  484. TLOGTARS("Application::cmdCloseCout:" << command << " " << params << endl);
  485. string s = TC_Common::lower(TC_Common::trim(params));
  486. if (s == "yes")
  487. {
  488. _thisCommunicator->getAppCache()->set("closeCout", "1");
  489. }
  490. else
  491. {
  492. _thisCommunicator->getAppCache()->set("closeCout", "0");
  493. }
  494. result = "set closeCout [" + s + "] ok";
  495. return true;
  496. }
  497. bool Application::cmdReloadLocator(const string& command, const string& params, string& result)
  498. {
  499. TLOGDEBUG("Application::cmdReloadLocator:" << command << " " << params << endl);
  500. string sPara = TC_Common::lower(TC_Common::trim(params));
  501. bool bSucc(true);
  502. if (sPara == "reload")
  503. {
  504. TLOGDEBUG(__FUNCTION__ << "|" << __LINE__ << "|conf file:" << _serverConfig.configFile << endl);
  505. TC_Config reloadConf;
  506. reloadConf.parseFile(_serverConfig.configFile);
  507. string sLocator = reloadConf.get("/tars/application/client/<locator>", "");
  508. TLOGDEBUG(__FUNCTION__ << "|" << __LINE__ << "|conf file:" << _serverConfig.configFile << "\n"
  509. << "|sLocator:" << sLocator << endl);
  510. if (sLocator.empty())
  511. {
  512. bSucc = false;
  513. result = "locator info is null.";
  514. }
  515. else
  516. {
  517. _thisCommunicator->setProperty("locator", sLocator);
  518. _thisCommunicator->reloadLocator();
  519. result = sLocator + " set succ.";
  520. }
  521. }
  522. else
  523. {
  524. result = "please input right paras.";
  525. bSucc = false;
  526. }
  527. return bSucc;
  528. }
  529. bool Application::cmdViewResource(const string& command, const string& params, string& result)
  530. {
  531. TLOGDEBUG("Application::cmdViewResource:" << command << " " << params << endl);
  532. ostringstream os;
  533. os << _thisCommunicator->getResourcesInfo() << endl;
  534. os << OUT_LINE << endl;
  535. vector<TC_EpollServer::BindAdapterPtr> adapters = _epollServer->getBindAdapters();
  536. for(auto adapter : adapters)
  537. {
  538. outAdapter(os, _servantHelper->getAdapterServant(adapter->getName()), adapter);
  539. os << TC_Common::outfill("recv-buffer-count") << adapter->getRecvBufferSize() << endl;
  540. os << TC_Common::outfill("send-buffer-count") << adapter->getSendBufferSize() << endl;
  541. }
  542. result += os.str();
  543. TLOGDEBUG("Application::cmdViewResource result:" << result << endl);
  544. return true;
  545. }
  546. void Application::outAllAdapter(ostream &os)
  547. {
  548. auto m = _epollServer->getListenSocketInfo();
  549. for (auto it = m.begin(); it != m.end(); ++it)
  550. {
  551. outAdapter(os, _servantHelper->getAdapterServant(it->second->getName()), it->second);
  552. os << OUT_LINE << endl;
  553. }
  554. }
  555. bool Application::addConfig(const string &filename)
  556. {
  557. string result;
  558. if (_remoteConfigHelper->addConfig(filename, result, false))
  559. {
  560. RemoteNotify::getInstance()->report(result);
  561. return true;
  562. }
  563. RemoteNotify::getInstance()->report(result);
  564. return true;
  565. }
  566. bool Application::addAppConfig(const string &filename)
  567. {
  568. string result = "";
  569. // true-只获取应用级别配置
  570. if (_remoteConfigHelper->addConfig(filename, result, true))
  571. {
  572. RemoteNotify::getInstance()->report(result);
  573. return true;
  574. }
  575. RemoteNotify::getInstance()->report(result);
  576. return true;
  577. }
  578. void Application::main(int argc, char *argv[])
  579. {
  580. TC_Option op;
  581. op.decode(argc, argv);
  582. main(op);
  583. }
  584. void Application::main(const TC_Option &option)
  585. {
  586. __out__.modFlag(0xfffff, false);
  587. //直接输出编译的TAF版本
  588. if (option.hasParam("version"))
  589. {
  590. __out__.debug() << "TARS:" << TARS_VERSION << endl;
  591. exit(0);
  592. }
  593. //加载配置文件
  594. _serverConfig.configFile = option.getValue("config");
  595. if (_serverConfig.configFile == "")
  596. {
  597. cerr << "start server with config, for example: exe --config=config.conf" << endl;
  598. exit(-1);
  599. }
  600. string config = TC_File::load2str(_serverConfig.configFile);
  601. __out__.debug() << "config:" << _serverConfig.configFile << endl;
  602. __out__.debug() << "config:" << config << endl;
  603. main(config);
  604. }
  605. void Application::main(const string &config)
  606. {
  607. try
  608. {
  609. #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
  610. TC_Common::ignorePipe();
  611. #endif
  612. __out__.modFlag(0xFFFF, false);
  613. //解析配置文件
  614. parseConfig(config);
  615. //初始化Proxy部分
  616. initializeClient();
  617. //初始化Server部分
  618. initializeServer();
  619. vector <TC_EpollServer::BindAdapterPtr> adapters;
  620. //绑定对象和端口
  621. bindAdapter(adapters);
  622. stringstream os;
  623. //输出所有adapter
  624. outAllAdapter(os);
  625. __out__.info() << os.str();
  626. __out__.info() << "\n" << TC_Common::outfill("[initialize server] ", '.') << " [Done]" << endl;
  627. __out__.info() << OUT_LINE_LONG << endl;
  628. {
  629. bool initing = true;
  630. std::mutex mtx;
  631. std::condition_variable cond;
  632. std::thread keepActiving([&]
  633. {
  634. do
  635. {
  636. //发送心跳给node, 表示正在启动
  637. // TARS_KEEPACTIVING;
  638. _nodeHelper->keepActiving();
  639. //等待initialize初始化完毕
  640. std::unique_lock<std::mutex> lock(mtx);
  641. cond.wait_for(lock, std::chrono::seconds(5), [&](){
  642. return !initing;
  643. });
  644. }while(initing);
  645. });
  646. try
  647. {
  648. //业务应用的初始化
  649. initialize();
  650. {
  651. std::unique_lock<std::mutex> lock(mtx);
  652. initing = false;
  653. cond.notify_all();
  654. }
  655. keepActiving.join();
  656. }
  657. catch (exception & ex)
  658. {
  659. keepActiving.detach();
  660. NOTIFY_AND_WAIT("exit: " + string(ex.what()));
  661. __out__.error() << "[init exception]:" << ex.what() << endl;
  662. exit(-1);
  663. }
  664. }
  665. //动态加载配置文件
  666. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_LOAD_CONFIG, Application::cmdLoadConfig);
  667. //动态设置滚动日志等级
  668. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_SET_LOG_LEVEL, Application::cmdSetLogLevel);
  669. //动态设置按天日志等级
  670. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_SET_DAYLOG_LEVEL, Application::cmdEnableDayLog);
  671. //查看服务状态
  672. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_VIEW_STATUS, Application::cmdViewStatus);
  673. //查看当前链接状态
  674. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_CONNECTIONS, Application::cmdConnections);
  675. //查看编译的TARS版本
  676. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_VIEW_VERSION, Application::cmdViewVersion);
  677. //查看服务buildid(编译时间)
  678. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_VIEW_BID, Application::cmdViewBuildID);
  679. //加载配置文件中的属性信息
  680. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_LOAD_PROPERTY, Application::cmdLoadProperty);
  681. //查看服务支持的管理命令
  682. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_VIEW_ADMIN_COMMANDS, Application::cmdViewAdminCommands);
  683. //设置染色信息
  684. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_SET_DYEING, Application::cmdSetDyeing);
  685. //设置服务的core limit
  686. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_CLOSE_CORE, Application::cmdCloseCoreDump);
  687. //设置是否标准输出
  688. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_CLOSE_COUT, Application::cmdCloseCout);
  689. //设置是否标准输出
  690. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_RELOAD_LOCATOR, Application::cmdReloadLocator);
  691. //设置是否标准输出
  692. TARS_ADD_ADMIN_CMD_PREFIX(TARS_CMD_RESOURCE, Application::cmdViewResource);
  693. //上报版本
  694. // TARS_REPORTVERSION(TARS_VERSION);
  695. _nodeHelper->reportVersion(TARS_VERSION);
  696. //发送心跳给node, 表示启动了
  697. // TARS_KEEPALIVE("");
  698. _nodeHelper->keepAlive("");
  699. //发送给notify表示服务启动了
  700. RemoteNotify::getInstance()->report("restart");
  701. //ctrl + c能够完美结束服务
  702. _ctrlCId = TC_Port::registerCtrlC([=]{
  703. this->terminate();
  704. #if TARGET_PLATFORM_WINDOWS
  705. ExitProcess(0);
  706. #endif
  707. });
  708. _termId = TC_Port::registerTerm([=]{
  709. this->terminate();
  710. #if TARGET_PLATFORM_WINDOWS
  711. ExitProcess(0);
  712. #endif
  713. });
  714. #if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
  715. if(_conf.get("/tars/application/server<closecout>", _thisCommunicator->getAppCache()->get("closeCout")) != "0")
  716. {
  717. // 重定向stdin、stdout、stderr
  718. int fd = open("/dev/null", O_RDWR );
  719. if (fd != -1)
  720. {
  721. dup2(fd, 0);
  722. dup2(fd, 1);
  723. dup2(fd, 2);
  724. }
  725. else
  726. {
  727. close(0);
  728. close(1);
  729. close(2);
  730. }
  731. }
  732. #endif
  733. }
  734. catch (exception &ex)
  735. {
  736. __out__.error() << "[Application]:" << ex.what() << endl;
  737. terminate();
  738. NOTIFY_AND_WAIT("exit: " + string(ex.what()));
  739. exit(-1);
  740. }
  741. //初始化完毕后, 日志再修改为异步
  742. LocalRollLogger::getInstance()->sync(false);
  743. }
  744. void Application::parseConfig(const string &config)
  745. {
  746. _conf.parseString(config);
  747. __out__.setLogLevel(_conf.get("/tars/application/server<start_output>", "DEBUG"));
  748. onParseConfig(_conf);
  749. }
  750. TC_EpollServer::BindAdapter::EOrder Application::parseOrder(const string &s)
  751. {
  752. vector<string> vtOrder = TC_Common::sepstr<string>(s, ";, \t", false);
  753. if (vtOrder.size() != 2)
  754. {
  755. cerr << "invalid order '" << TC_Common::tostr(vtOrder) << "'." << endl;
  756. exit(0);
  757. }
  758. if ((TC_Common::lower(vtOrder[0]) == "allow") && (TC_Common::lower(vtOrder[1]) == "deny"))
  759. {
  760. return TC_EpollServer::BindAdapter::ALLOW_DENY;
  761. }
  762. if ((TC_Common::lower(vtOrder[0]) == "deny") && (TC_Common::lower(vtOrder[1]) == "allow"))
  763. {
  764. return TC_EpollServer::BindAdapter::DENY_ALLOW;
  765. }
  766. cerr << "invalid order '" << TC_Common::tostr(vtOrder) << "'." << endl;
  767. exit(0);
  768. }
  769. void Application::initializeClient()
  770. {
  771. __out__.info() << "\n" << OUT_LINE_LONG << endl;
  772. if(CommunicatorFactory::getInstance()->hasCommunicator())
  773. {
  774. //已经存在缺省通信器了, 创建新的通信器
  775. _thisCommunicator = CommunicatorFactory::getInstance()->getCommunicator(_conf, "application-" + TC_Common::tostr(this));
  776. }
  777. else
  778. {
  779. //初始化缺省通信器
  780. _communicator = CommunicatorFactory::getInstance()->getCommunicator(_conf);
  781. _thisCommunicator = _communicator;
  782. }
  783. __out__.info() << TC_Common::outfill("[proxy config]:") << endl;
  784. //输出
  785. stringstream os;
  786. outClient(os);
  787. __out__.info() << os.str();
  788. }
  789. void Application::outClient(ostream &os)
  790. {
  791. os << OUT_LINE << "\n" << TC_Common::outfill("[load client]:") << endl;
  792. os << TC_Common::outfill("locator") << _thisCommunicator->getProperty("locator") << endl;
  793. os << TC_Common::outfill("sync-invoke-timeout") << _thisCommunicator->getProperty("sync-invoke-timeout") << endl;
  794. os << TC_Common::outfill("async-invoke-timeout") << _thisCommunicator->getProperty("async-invoke-timeout") << endl;
  795. os << TC_Common::outfill("refresh-endpoint-interval") << _thisCommunicator->getProperty("refresh-endpoint-interval") << endl;
  796. os << TC_Common::outfill("stat") << _thisCommunicator->getProperty("stat") << endl;
  797. os << TC_Common::outfill("property") << _thisCommunicator->getProperty("property") << endl;
  798. os << TC_Common::outfill("report-interval") << _thisCommunicator->getProperty("report-interval") << endl;
  799. os << TC_Common::outfill("keep-alive-interval") << _thisCommunicator->getProperty("keep-alive-interval") << endl;
  800. os << TC_Common::outfill("netthread") << _thisCommunicator->getProperty("netthread") << endl;
  801. os << TC_Common::outfill("asyncthread") << _thisCommunicator->getProperty("asyncthread") << endl;
  802. os << TC_Common::outfill("modulename") << _thisCommunicator->getProperty("modulename") << endl;
  803. os << TC_Common::outfill("enableset") << _thisCommunicator->getProperty("enableset") << endl;
  804. os << TC_Common::outfill("setdivision") << _thisCommunicator->getProperty("setdivision") << endl;
  805. }
  806. string Application::toDefault(const string &s, const string &sDefault)
  807. {
  808. if (s.empty())
  809. {
  810. return sDefault;
  811. }
  812. return s;
  813. }
  814. string Application::setDivision()
  815. {
  816. bool bEnableSet = TC_Common::lower(_conf.get("/tars/application<enableset>", "n"))=="y"?true:false;;
  817. string sSetDevision = bEnableSet?_conf.get("/tars/application<setdivision>", ""):"";
  818. return sSetDevision;
  819. }
  820. void Application::addServantProtocol(const string& servant, const TC_NetWorkBuffer::protocol_functor& protocol)
  821. {
  822. string adapterName = _servantHelper->getServantAdapter(servant);
  823. if (adapterName == "")
  824. {
  825. throw runtime_error("addServantProtocol fail, no found adapter for servant:" + servant);
  826. }
  827. getEpollServer()->getBindAdapter(adapterName)->setProtocol(protocol);
  828. }
  829. void Application::addAcceptCallback(const TC_EpollServer::accept_callback_functor& cb)
  830. {
  831. _acceptFuncs.push_back(cb);
  832. }
  833. void Application::onAccept(TC_EpollServer::Connection* cPtr)
  834. {
  835. for (size_t i = 0; i < _acceptFuncs.size(); ++i)
  836. {
  837. _acceptFuncs[i](cPtr);
  838. }
  839. }
  840. void Application::outServer(ostream &os)
  841. {
  842. os << TC_Common::outfill("Application(app)") << _serverConfig.application << endl;
  843. os << TC_Common::outfill("ServerName(server)") << _serverConfig.serverName << endl;
  844. os << TC_Common::outfill("BasePath(basepath)") << _serverConfig.basePath << endl;
  845. os << TC_Common::outfill("DataPath(datapath)") << _serverConfig.dataPath << endl;
  846. os << TC_Common::outfill("LocalIp(localip)") << _serverConfig.localIp << endl;
  847. os << TC_Common::outfill("Local(local)") << _serverConfig.local << endl;
  848. os << TC_Common::outfill("LogPath(logpath)") << _serverConfig.logPath << endl;
  849. os << TC_Common::outfill("LogSize(logsize)") << _serverConfig.logSize << endl;
  850. os << TC_Common::outfill("LogNum(lognum)") << _serverConfig.logNum << endl;
  851. os << TC_Common::outfill("LogLevel(loglevel)") << _serverConfig.logLevel << endl;
  852. os << TC_Common::outfill("Log(log)") << _serverConfig.log << endl;
  853. os << TC_Common::outfill("Node(node)") << _serverConfig.node << endl;
  854. os << TC_Common::outfill("Config(config)") << _serverConfig.config << endl;
  855. os << TC_Common::outfill("Notify(notify)") << _serverConfig.notify << endl;
  856. os << TC_Common::outfill("OpenCoroutine(opencoroutine)") << _serverConfig.openCoroutine << endl;
  857. os << TC_Common::outfill("CoroutineMemSize(coroutinememsize)") << _serverConfig.coroutineMemSize << endl;
  858. os << TC_Common::outfill("CoroutineStackSize(coroutinestack)") << _serverConfig.coroutineStackSize << endl;
  859. os << TC_Common::outfill("CloseCout(closecout)") << _serverConfig.closeCout << endl;
  860. os << TC_Common::outfill("NetThread(netthread)") << _serverConfig.netThread << endl;
  861. os << TC_Common::outfill("ManualListen(manuallisten)") << _serverConfig.manualListen << endl;
  862. os << TC_Common::outfill("ReportFlow(reportflow)") << _serverConfig.reportFlow<< endl;
  863. os << TC_Common::outfill("BackPacketLimit(backpacketlimit)") << _serverConfig.backPacketLimit<< endl;
  864. os << TC_Common::outfill("BackPacketMin(backpacketmin)") << _serverConfig.backPacketMin<< endl;
  865. #if TARS_SSL
  866. os << TC_Common::outfill("Ca(ca)") << _serverConfig.ca << endl;
  867. os << TC_Common::outfill("Cert(cert)") << _serverConfig.cert << endl;
  868. os << TC_Common::outfill("Key(key)") << _serverConfig.key << endl;
  869. os << TC_Common::outfill("VerifyClient(verifyclient)") << _serverConfig.verifyClient << endl;
  870. os << TC_Common::outfill("Ciphers(ciphers)") << _serverConfig.ciphers << endl;
  871. #endif
  872. }
  873. void Application::syncServerConfig(const ApplicationConfig &serverConfig)
  874. {
  875. SYNC_SERVER_CONFIG(tarsPath, TarsPath);
  876. SYNC_SERVER_CONFIG(application, Application);
  877. SYNC_SERVER_CONFIG(serverName, ServerName);
  878. SYNC_SERVER_CONFIG(localIp, LocalIp);
  879. SYNC_SERVER_CONFIG(basePath, BasePath);
  880. SYNC_SERVER_CONFIG(dataPath, DataPath);
  881. SYNC_SERVER_CONFIG(local, Local);
  882. SYNC_SERVER_CONFIG(node, Node);
  883. SYNC_SERVER_CONFIG(log, Log);
  884. SYNC_SERVER_CONFIG(config, Config);
  885. SYNC_SERVER_CONFIG(notify, Notify);
  886. SYNC_SERVER_CONFIG(logPath, LogPath);
  887. SYNC_SERVER_CONFIG(logSize, LogSize);
  888. SYNC_SERVER_CONFIG(logNum, LogNum);
  889. SYNC_SERVER_CONFIG(logLevel, LogLevel);
  890. SYNC_SERVER_CONFIG(configFile, ConfigFile);
  891. SYNC_SERVER_CONFIG(reportFlow, ReportFlow);
  892. SYNC_SERVER_CONFIG(isCheckSet, IsCheckSet);
  893. SYNC_SERVER_CONFIG(openCoroutine, OpenCoroutine);
  894. SYNC_SERVER_CONFIG(coroutineMemSize, CoroutineMemSize);
  895. SYNC_SERVER_CONFIG(coroutineStackSize, CoroutineStackSize);
  896. SYNC_SERVER_CONFIG(manualListen, ManualListen);
  897. SYNC_SERVER_CONFIG(netThread, NetThread);
  898. SYNC_SERVER_CONFIG(closeCout, CloseCout);
  899. SYNC_SERVER_CONFIG(backPacketLimit, BackPacketLimit);
  900. SYNC_SERVER_CONFIG(backPacketMin, BackPacketMin);
  901. SYNC_SERVER_CONFIG(checkBindAdapter, CheckBindAdapter);
  902. #if TARS_SSL
  903. SYNC_SERVER_CONFIG(ca, CA);
  904. SYNC_SERVER_CONFIG(cert, Cert);
  905. SYNC_SERVER_CONFIG(key, Key);
  906. SYNC_SERVER_CONFIG(verifyClient, VerifyClient);
  907. SYNC_SERVER_CONFIG(ciphers, Ciphers);
  908. #endif
  909. }
  910. void Application::initializeServer()
  911. {
  912. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[server config]:") << endl;
  913. _serverConfig.application = toDefault(_conf.get("/tars/application/server<app>"), "UNKNOWN");
  914. //缺省采用进程名称
  915. string exe = "";
  916. try
  917. {
  918. exe = TC_File::extractFileName(TC_File::getExePath());
  919. }
  920. catch (TC_File_Exception & ex)
  921. {
  922. //取失败则使用ip代替进程名
  923. exe = _conf.get("/tars/application/server<localip>");
  924. }
  925. _serverConfig.serverName = toDefault(_conf.get("/tars/application/server<server>"), exe);
  926. #if TARGET_PLATFORM_WINDOWS
  927. _serverConfig.basePath = TC_File::simplifyDirectory(_conf.get("/tars/application/server<basepath.win>")) + FILE_SEP;
  928. if (_serverConfig.basePath == FILE_SEP)
  929. {
  930. _serverConfig.basePath = TC_File::simplifyDirectory(toDefault(_conf.get("/tars/application/server<basepath>"), ".")) + FILE_SEP;
  931. }
  932. _serverConfig.dataPath = TC_File::simplifyDirectory(_conf.get("/tars/application/server<datapath.win>")) + FILE_SEP;
  933. if(_serverConfig.dataPath == FILE_SEP)
  934. {
  935. _serverConfig.dataPath = TC_File::simplifyDirectory(toDefault(_conf.get("/tars/application/server<datapath>"), ".")) + FILE_SEP;
  936. }
  937. _serverConfig.logPath = TC_File::simplifyDirectory(_conf.get("/tars/application/server<logpath.win>")) + FILE_SEP;
  938. if(_serverConfig.logPath == FILE_SEP)
  939. {
  940. _serverConfig.logPath = TC_File::simplifyDirectory(toDefault(_conf.get("/tars/application/server<logpath>"), ".")) + FILE_SEP;
  941. }
  942. #else
  943. _serverConfig.basePath = TC_File::simplifyDirectory(toDefault(_conf.get("/tars/application/server<basepath>"), ".")) + FILE_SEP;
  944. _serverConfig.dataPath = TC_File::simplifyDirectory(toDefault(_conf.get("/tars/application/server<datapath>"), ".")) + FILE_SEP;
  945. _serverConfig.logPath = TC_File::simplifyDirectory(toDefault(_conf.get("/tars/application/server<logpath>"), ".")) + FILE_SEP;
  946. #endif
  947. _serverConfig.tarsPath = TC_File::simplifyDirectory(_serverConfig.logPath + FILE_SEP + ".." + FILE_SEP) + FILE_SEP;
  948. _serverConfig.logSize = TC_Common::toSize(toDefault(_conf.get("/tars/application/server<logsize>"), "52428800"), 52428800);
  949. _serverConfig.logNum = TC_Common::strto<int>(toDefault(_conf.get("/tars/application/server<lognum>"), "10"));
  950. _serverConfig.localIp = _conf.get("/tars/application/server<localip>");
  951. _serverConfig.local = _conf.get("/tars/application/server<local>");
  952. _serverConfig.node = _conf.get("/tars/application/server<node>");
  953. _serverConfig.log = _conf.get("/tars/application/server<log>");
  954. _serverConfig.config = _conf.get("/tars/application/server<config>");
  955. _serverConfig.notify = _conf.get("/tars/application/server<notify>");
  956. _serverConfig.reportFlow = _conf.get("/tars/application/server<reportflow>")=="0"?0:1;
  957. _serverConfig.isCheckSet = _conf.get("/tars/application/server<checkset>","1")=="0"?0:1;
  958. _serverConfig.openCoroutine = TC_Common::strto<int>(toDefault(_conf.get("/tars/application/server<opencoroutine>"), "0"));
  959. _serverConfig.coroutineMemSize = TC_Common::toSize(toDefault(_conf.get("/tars/application/server<coroutinememsize>"), "1G"), 1024*1024*1024);
  960. _serverConfig.coroutineStackSize= (uint32_t)TC_Common::toSize(toDefault(_conf.get("/tars/application/server<coroutinestack>"), "128K"), 1024*128);
  961. _serverConfig.manualListen = _conf.get("/tars/application/server<manuallisten>", "0") == "0" ? false : true;
  962. _serverConfig.netThread = TC_Common::strto<int>(toDefault(_conf.get("/tars/application/server<netthread>"), "1"));
  963. _serverConfig.closeCout = _conf.get("/tars/application/server<closecout>","1")=="0"?0:1;
  964. _serverConfig.backPacketLimit = TC_Common::strto<int>(_conf.get("/tars/application/server<backpacketlimit>", TC_Common::tostr(100*1024*1024)));
  965. _serverConfig.backPacketMin = TC_Common::strto<int>(_conf.get("/tars/application/server<backpacketmin>", "1024"));
  966. _serverConfig.checkBindAdapter = _conf.get("/tars/application/server<checkbindadapter>","1")=="0"?false:true;
  967. #if TARS_SSL
  968. _serverConfig.ca = _conf.get("/tars/application/server<ca>");
  969. _serverConfig.cert = _conf.get("/tars/application/server<cert>");
  970. _serverConfig.key = _conf.get("/tars/application/server<key>");
  971. _serverConfig.verifyClient = _conf.get("/tars/application/server<verifyclient>","0")=="0"?false:true;
  972. _serverConfig.ciphers = _conf.get("/tars/application/server<ciphers>");
  973. if(!_serverConfig.Cert.empty()) {
  974. _ctx = TC_OpenSSL::newCtx(_serverConfig.ca, _serverConfig.cert, _serverConfig.key, _serverConfig.verifyClient, _serverConfig.ciphers);
  975. if (!_ctx) {
  976. TLOGERROR("[load server ssl error, ca:" << _serverConfig.ca << endl);
  977. exit(-1);
  978. }
  979. }
  980. #endif
  981. if (_serverConfig.localIp.empty())
  982. {
  983. vector<string> v = TC_Socket::getLocalHosts();
  984. _serverConfig.localIp = "127.0.0.1";
  985. //获取第一个非127.0.0.1的IP
  986. for(size_t i = 0; i < v.size(); i++)
  987. {
  988. if(v[i] != "127.0.0.1")
  989. {
  990. _serverConfig.localIp = v[i];
  991. break;
  992. }
  993. }
  994. }
  995. ServerConfig::Context["node_name"] = _serverConfig.localIp;
  996. //保存之前的
  997. ApplicationConfig serverConfig = _serverConfig;
  998. //配置文件copy到静态
  999. _serverConfig.copyToStatic();
  1000. //让业务有修改配置的机会
  1001. onServerConfig();
  1002. //检查变更
  1003. syncServerConfig(serverConfig);
  1004. ostringstream os;
  1005. //输出信息
  1006. outServer(os);
  1007. __out__.info() << os.str();
  1008. if (_serverConfig.netThread < 1)
  1009. {
  1010. _serverConfig.netThread = 1;
  1011. __out__.info() << OUT_LINE << "\nwarning:netThreadNum < 1." << endl;
  1012. }
  1013. //网络线程的配置数目不能15个
  1014. if (_serverConfig.netThread > 15)
  1015. {
  1016. _serverConfig.netThread = 15;
  1017. __out__.info() << OUT_LINE << "\nwarning:netThreadNum > 15." << endl;
  1018. }
  1019. if(_serverConfig.coroutineMemSize/_serverConfig.coroutineStackSize <= 0)
  1020. {
  1021. __out__.error() << OUT_LINE << "\nerror:coroutine paramter error: coroutinememsize/coroutinestack <= 0!." << endl;
  1022. exit(-1);
  1023. }
  1024. _epollServer = new TC_EpollServer();
  1025. _epollServer->setThreadNum(_serverConfig.netThread);
  1026. _epollServer->setOpenCoroutine((TC_EpollServer::SERVER_OPEN_COROUTINE)_serverConfig.openCoroutine);
  1027. _epollServer->setCoroutineStack(_serverConfig.coroutineMemSize/_serverConfig.coroutineStackSize, _serverConfig.coroutineStackSize);
  1028. _epollServer->setOnAccept(std::bind(&Application::onAccept, this, std::placeholders::_1));
  1029. //初始化服务是否对空链接进行超时检查
  1030. // bool bEnable = (_conf.get("/tars/application/server<emptyconcheck>","0")=="1")?true:false;
  1031. // _epollServer->enAntiEmptyConnAttack(bEnable);
  1032. _epollServer->setEmptyConnTimeout(TC_Common::strto<int>(toDefault(_conf.get("/tars/application/server<emptyconntimeout>"), "0")));
  1033. ///////////////////////////////////////////////////////////////////////////////////////////////////
  1034. //初始化本地文件cache
  1035. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[set file cache ]") << "OK" << endl;
  1036. _thisCommunicator->getAppCache()->setCacheInfo(_serverConfig.dataPath + _serverConfig.serverName + ".tarsdat", 0);
  1037. ///////////////////////////////////////////////////////////////////////////////////////////////////
  1038. //初始化本地Log
  1039. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[set roll logger] ") << "OK" << endl;
  1040. LocalRollLogger::getInstance()->setLogInfo(_serverConfig.application, _serverConfig.serverName, _serverConfig.logPath, _serverConfig.logSize, _serverConfig.logNum, _thisCommunicator, _serverConfig.log);
  1041. _epollServer->setLocalLogger(LocalRollLogger::getInstance()->logger());
  1042. //初始化是日志为同步
  1043. LocalRollLogger::getInstance()->sync(true);
  1044. //设置日志级别
  1045. string level = _thisCommunicator->getAppCache()->get("logLevel");
  1046. if(level.empty())
  1047. {
  1048. level = _conf.get("/tars/application/server<logLevel>","DEBUG");
  1049. }
  1050. _serverConfig.logLevel = TC_Common::upper(level);
  1051. LocalRollLogger::getInstance()->logger()->setLogLevel(_serverConfig.logLevel);
  1052. ///////////////////////////////////////////////////////////////////////////////////////////////////
  1053. //初始化到LogServer代理
  1054. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[set time logger] ") << "OK" << endl;
  1055. bool bLogStatReport = (_conf.get("/tars/application/server<logstatreport>", "0") == "1") ? true : false;
  1056. RemoteTimeLogger::getInstance()->setLogInfo(_thisCommunicator, _serverConfig.log, _serverConfig.application, _serverConfig.serverName, _serverConfig.logPath, setDivision(), bLogStatReport);
  1057. ///////////////////////////////////////////////////////////////////////////////////////////////////
  1058. //初始化到配置中心代理
  1059. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[set remote config] ") << "OK" << endl;
  1060. _remoteConfigHelper = std::make_shared<RemoteConfig>();
  1061. _remoteConfigHelper->setConfigInfo(_thisCommunicator, _serverConfig.config, _serverConfig.application, _serverConfig.serverName, _serverConfig.basePath,setDivision());
  1062. ///////////////////////////////////////////////////////////////////////////////////////////////////
  1063. //初始化到信息中心代理
  1064. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[set remote notify] ") << "OK" << endl;
  1065. RemoteNotify::getInstance()->setNotifyInfo(_thisCommunicator, _serverConfig.notify, _serverConfig.application, _serverConfig.serverName, setDivision(), _serverConfig.localIp);
  1066. ///////////////////////////////////////////////////////////////////////////////////////////////////
  1067. //初始化到Node的代理
  1068. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[set node proxy]") << "OK" << endl;
  1069. _nodeHelper = std::make_shared<KeepAliveNodeFHelper>();
  1070. _nodeHelper->setNodeInfo(_thisCommunicator, _serverConfig.node, _serverConfig.application, _serverConfig.serverName);
  1071. ///////////////////////////////////////////////////////////////////////////////////////////////////
  1072. //初始化管理对象
  1073. __out__.info() << OUT_LINE << "\n" << TC_Common::outfill("[set admin adapter]") << "OK" << endl;
  1074. if (!_serverConfig.local.empty())
  1075. {
  1076. _servantHelper->addServant("AdminObj", []{return new AdminServant();}, this);
  1077. string adminAdapter = "AdminAdapter";
  1078. _servantHelper->setAdapterServant(adminAdapter, "AdminObj");
  1079. TC_EpollServer::BindAdapterPtr lsPtr = _epollServer->createBindAdapter<ServantHandle>(adminAdapter, _serverConfig.local, 1, this);
  1080. setAdapter(lsPtr, adminAdapter);
  1081. lsPtr->setMaxConns(TC_EpollServer::BindAdapter::DEFAULT_MAX_CONN);
  1082. lsPtr->setQueueCapacity(TC_EpollServer::BindAdapter::DEFAULT_QUEUE_CAP);
  1083. lsPtr->setQueueTimeout(TC_EpollServer::BindAdapter::DEFAULT_QUEUE_TIMEOUT);
  1084. lsPtr->setProtocolName("tars");
  1085. lsPtr->setProtocol(AppProtocol::parse);
  1086. _epollServer->bind(lsPtr);
  1087. }
  1088. //队列取平均值
  1089. if(!_thisCommunicator->getProperty("property").empty())
  1090. {
  1091. string sRspQueue("");
  1092. sRspQueue += _serverConfig.application;
  1093. sRspQueue += ".";
  1094. sRspQueue += _serverConfig.serverName;
  1095. sRspQueue += ".sendrspqueue";
  1096. g_pReportRspQueue = _thisCommunicator->getStatReport()->createPropertyReport(sRspQueue, PropertyReport::avg());
  1097. }
  1098. TarsTimeLogger::getInstance()->enableLocal(TRACE_LOG_FILENAME, false);
  1099. }
  1100. void Application::setAdapter(TC_EpollServer::BindAdapterPtr& adapter, const string &name)
  1101. {
  1102. // 设置该obj的鉴权账号密码,只要一组就够了
  1103. {
  1104. std::string accKey = _conf.get("/tars/application/server/" + name + "<accesskey>");
  1105. std::string secretKey = _conf.get("/tars/application/server/" + name + "<secretkey>");
  1106. //注意这里必须用weak, 否则adapter最终释放不了!
  1107. weak_ptr<TC_EpollServer::BindAdapter> a = adapter;
  1108. adapter->setAkSkCallback(accKey, secretKey, std::bind(&tars::serverVerifyAuthCallback, std::placeholders::_1, std::placeholders::_2, a, _servantHelper->getAdapterServant(name)));
  1109. }
  1110. #if TARS_SSL
  1111. string cert = _conf.get("/tars/application/server/" + name + "<cert>");
  1112. if (!cert.empty())
  1113. {
  1114. string ca = _conf.get("/tars/application/server/" + name + "<ca>");
  1115. string key = _conf.get("/tars/application/server/" + name + "<key>");
  1116. bool verifyClient =
  1117. _conf.get("/tars/application/server/" + name + "<verifyclient>", "0") == "0" ? false : true;
  1118. string ciphers = _conf.get("/tars/application/server/" + name + "<ciphers>");
  1119. shared_ptr<TC_OpenSSL::CTX> ctx = TC_OpenSSL::newCtx(ca, cert, key, verifyClient, ciphers);
  1120. if (!ctx) {
  1121. TLOGERROR("load server ssl error, cert:" << cert << endl);
  1122. exit(-1);
  1123. }
  1124. adapter->setSSLCtx(ctx);
  1125. }
  1126. else
  1127. {
  1128. adapter->setSSLCtx(_ctx);
  1129. }
  1130. #endif
  1131. }
  1132. void Application::bindAdapter(vector<TC_EpollServer::BindAdapterPtr>& adapters)
  1133. {
  1134. string sPrefix = _serverConfig.application + "." + _serverConfig.serverName + ".";
  1135. vector<string> adapterName;
  1136. map<string, ServantHandle*> servantHandles;
  1137. if (_conf.getDomainVector("/tars/application/server", adapterName))
  1138. {
  1139. for (size_t i = 0; i < adapterName.size(); i++)
  1140. {
  1141. string servant = _conf.get("/tars/application/server/" + adapterName[i] + "<servant>");
  1142. checkServantNameValid(servant, sPrefix);
  1143. _servantHelper->setAdapterServant(adapterName[i], servant);
  1144. string sLastPath = "/tars/application/server/" + adapterName[i];
  1145. TC_Endpoint ep;
  1146. ep.parse(_conf[sLastPath + "<endpoint>"]);
  1147. if (ep.getHost() == "localip")
  1148. {
  1149. ep.setHost(_serverConfig.localIp);
  1150. }
  1151. TC_EpollServer::BindAdapterPtr bindAdapter = _epollServer->createBindAdapter<ServantHandle>(adapterName[i], _conf[sLastPath + "<endpoint>"], TC_Common::strto<int>(_conf.get(sLastPath + "<threads>", "1")), this);
  1152. //init auth & ssl
  1153. setAdapter(bindAdapter, adapterName[i]);
  1154. bindAdapter->setMaxConns(TC_Common::strto<int>(_conf.get(sLastPath + "<maxconns>", "128")));
  1155. bindAdapter->setOrder(parseOrder(_conf.get(sLastPath + "<order>", "allow,deny")));
  1156. bindAdapter->setAllow(TC_Common::sepstr<string>(_conf[sLastPath + "<allow>"], ";,", false));
  1157. bindAdapter->setDeny(TC_Common::sepstr<string>(_conf.get(sLastPath + "<deny>", ""), ";,", false));
  1158. bindAdapter->setQueueCapacity(TC_Common::strto<int>(_conf.get(sLastPath + "<queuecap>", "1024")));
  1159. bindAdapter->setQueueTimeout(TC_Common::strto<int>(_conf.get(sLastPath + "<queuetimeout>", "10000")));
  1160. bindAdapter->setProtocolName(_conf.get(sLastPath + "<protocol>", "tars"));
  1161. bindAdapter->setBackPacketBuffLimit(_serverConfig.backPacketLimit);
  1162. bindAdapter->setBackPacketBuffMin(_serverConfig.backPacketMin);
  1163. if (bindAdapter->isTarsProtocol())
  1164. {
  1165. bindAdapter->setProtocol(AppProtocol::parse);
  1166. }
  1167. //校验ssl正常初始化
  1168. #if TARS_SSL
  1169. if (bindAdapter->getEndpoint().isSSL() && (!(bindAdapter->getSSLCtx())))
  1170. {
  1171. __out__.error() << "load server ssl error, no cert config!" << bindAdapter->getEndpoint().toString() << endl;
  1172. exit(-1);
  1173. }
  1174. #endif
  1175. if(_serverConfig.manualListen) {
  1176. //手工监听
  1177. bindAdapter->enableManualListen();
  1178. }
  1179. _epollServer->bind(bindAdapter);
  1180. adapters.push_back(bindAdapter);
  1181. //队列取平均值
  1182. if(!_thisCommunicator->getProperty("property").empty())
  1183. {
  1184. PropertyReportPtr p;
  1185. p = _thisCommunicator->getStatReport()->createPropertyReport(bindAdapter->getName() + ".queue", PropertyReport::avg());
  1186. bindAdapter->_pReportQueue = p.get();
  1187. p = _thisCommunicator->getStatReport()->createPropertyReport(bindAdapter->getName() + ".connectRate", PropertyReport::avg());
  1188. bindAdapter->_pReportConRate = p.get();
  1189. p = _thisCommunicator->getStatReport()->createPropertyReport(bindAdapter->getName() + ".timeoutNum", PropertyReport::sum());
  1190. bindAdapter->_pReportTimeoutNum = p.get();
  1191. }
  1192. }
  1193. }
  1194. }
  1195. void Application::checkServantNameValid(const string& servant, const string& sPrefix)
  1196. {
  1197. if ((servant.length() <= sPrefix.length()) || (servant.substr(0, sPrefix.length()) != sPrefix))
  1198. {
  1199. ostringstream os;
  1200. os << "Servant '" << servant << "' error: must be start with '" << sPrefix << "'";
  1201. NOTIFY_AND_WAIT("exit: " + string(os.str()));
  1202. __out__.error() << os.str() << endl;
  1203. exit(-1);
  1204. }
  1205. }
  1206. void Application::outAdapter(ostream &os, const string &v, TC_EpollServer::BindAdapterPtr lsPtr)
  1207. {
  1208. os << TC_Common::outfill("name") << lsPtr->getName() << endl;
  1209. os << TC_Common::outfill("servant") << v << endl;
  1210. os << TC_Common::outfill("endpoint") << lsPtr->getEndpoint().toString() << endl;
  1211. os << TC_Common::outfill("maxconns") << lsPtr->getMaxConns() << endl;
  1212. os << TC_Common::outfill("queuecap") << lsPtr->getQueueCapacity() << endl;
  1213. os << TC_Common::outfill("queuetimeout") << lsPtr->getQueueTimeout() << "ms" << endl;
  1214. os << TC_Common::outfill("order") << (lsPtr->getOrder() == TC_EpollServer::BindAdapter::ALLOW_DENY ? "allow,deny" : "deny,allow") << endl;
  1215. os << TC_Common::outfill("allow") << TC_Common::tostr(lsPtr->getAllow()) << endl;
  1216. os << TC_Common::outfill("deny") << TC_Common::tostr(lsPtr->getDeny()) << endl;
  1217. os << TC_Common::outfill("connections") << lsPtr->getNowConnection() << endl;
  1218. os << TC_Common::outfill("protocol") << lsPtr->getProtocolName() << endl;
  1219. os << TC_Common::outfill("handlethread") << lsPtr->getHandleNum() << endl;
  1220. }
  1221. //////////////////////////////////////////////////////////////////////////////////////////////////
  1222. }