main.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. * Author: Linjinming, linjinming@jd.com
  17. Qiulu, choulu@jd.com
  18. Yangshuang, yangshuang68@jd.com
  19. Zhulin, shzhulin3@jd.com
  20. Chenyujie, chenyujie28@jd.com
  21. Wuxinzhen, wuxinzhen1@jd.com
  22. Caopei, caopei11@jd.com
  23. */
  24. #include "main_supply.h"
  25. #include "../misc/dtc_code.h"
  26. using namespace ClusterConfig;
  27. const char project_name[] = "dtcd";
  28. const char usage_argv[] = "";
  29. BufferProcessAskChain *g_buffer_process_ask_instance = NULL;
  30. BarrierAskAnswerChain *g_buffer_barrier_instance = NULL;
  31. BufferBypassAskChain *g_buffer_bypass_ask_instance = NULL;
  32. BarrierAskAnswerChain *g_connector_barrier_instance = NULL;
  33. SystemCommandAskChain *g_system_command_ask_instance = NULL;
  34. DataConnectorAskChain *g_data_connector_ask_instance = NULL;
  35. HotBackupAskChain *g_hot_backup_ask_instance = NULL;
  36. KeyRouteAskChain *g_key_route_ask_instance = NULL;
  37. AgentHubAskChain *g_agent_hub_ask_instance = NULL;
  38. JobHubAskChain *g_job_hub_ask_instance = NULL;
  39. RemoteDtcAskAnswerChain *g_remote_dtc_instance = NULL;
  40. BlackHoleAskChain *g_black_hole_ask_instance = NULL;
  41. AgentListenPool *agent_listener = NULL;
  42. ListenerPool *main_listener = NULL;
  43. //single thread mode only use below two thread instance.
  44. PollerBase *g_main_thread = NULL;
  45. PollerBase *g_hot_backup_thread = NULL;
  46. //below thread instance used in multi thread mode.
  47. //remote dispatcher,in order to migrate data to remote dtc.
  48. PollerBase *g_remote_thread = NULL;
  49. PollerBase *g_buffer_multi_thread = NULL;
  50. PollerBase *g_datasource_thread = NULL;
  51. PluginManager *main_plugin_mgr;
  52. int g_max_conn_cnt;
  53. int enable_plugin;
  54. int init_plugin;
  55. int cache_key;
  56. int g_datasource_mode;
  57. int async_update;
  58. int g_target_new_hash;
  59. int g_hash_changing;
  60. char cache_file[256] = CACHE_CONF_NAME;
  61. char table_file[256] = TABLE_CONF_NAME;
  62. extern DTCConfig *g_dtc_config;
  63. extern void init_task_executor(const char *, AgentListenPool *,
  64. JobAskInterface<DTCJobOperation> *);
  65. int collect_load_config(DbConfig *dbconfig);
  66. // second part of entry
  67. static int init_thread(void *dummy);
  68. // thread startp
  69. static int single_thread_mode_initiazation();
  70. static int multiple_thread_mode_initiazation();
  71. #ifdef _CORE_
  72. int main(int argc, char *argv[])
  73. {
  74. init_proc_title(argc, argv);
  75. init_log4cplus();
  76. if (load_entry_parameter(argc, argv) < 0)
  77. return DTC_CODE_FAILED;
  78. if (g_dtc_config->get_int_val("cache", "EnableCoreDump", 1))
  79. init_core_dump();
  80. if (init_config_info())
  81. return DTC_CODE_FAILED;
  82. if (init_statistics())
  83. return DTC_CODE_FAILED;
  84. if (init_cache_mode() < 0)
  85. return DTC_CODE_FAILED;
  86. if (init_daemon() < 0)
  87. return DTC_CODE_FAILED;
  88. Thread::set_auto_config_instance(
  89. g_dtc_config->get_auto_config_instance("cache"));
  90. if (start_dtc(init_thread, NULL) < 0)
  91. return DTC_CODE_FAILED;
  92. if (init_thread(NULL))
  93. return DTC_CODE_FAILED;
  94. Logger::shutdown();
  95. return DTC_CODE_SUCCESS;
  96. }
  97. #endif
  98. //main thread initialization.
  99. static int init_thread(void *dummy)
  100. {
  101. int ret = DTC_CODE_SUCCESS;
  102. Thread *root_thread =
  103. new Thread("dtc-thread-root", Thread::ThreadTypeProcess);
  104. if (root_thread != NULL)
  105. root_thread->initialize_thread();
  106. if (daemon_set_fd_limit(g_dtc_config->get_int_val("cache", "MaxFdCount",
  107. 10240)) < 0)
  108. return DTC_CODE_FAILED;
  109. //start statistic thread.
  110. g_stat_mgr.start_background_thread();
  111. int thread_mode =
  112. g_dtc_config->get_int_val("cache", "UseSingleThread", 1);
  113. //choose mode for single/multiple thread.
  114. switch (thread_mode) {
  115. case SINGLE_THREAD_MODE:
  116. ret = single_thread_mode_initiazation();
  117. break;
  118. case MULTIPLE_THREAD_MODE:
  119. ret = multiple_thread_mode_initiazation();
  120. break;
  121. default:
  122. log4cplus_error("thread mode error:%d", thread_mode);
  123. }
  124. if (ret == DTC_CODE_SUCCESS) {
  125. init_task_executor(TableDefinitionManager::instance()
  126. ->get_cur_table_def()
  127. ->table_name(),
  128. agent_listener,
  129. g_system_command_ask_instance);
  130. log4cplus_info("--------%s-v%s BEGIN!--------", project_name,
  131. version);
  132. daemon_wait();
  133. } else {
  134. log4cplus_error(
  135. "thread initilization failed, now prepare to free resource and exit.");
  136. }
  137. log4cplus_info("--------%s-v%s free resource now --------",
  138. project_name, version);
  139. free_all_resource();
  140. return ret;
  141. }
  142. //chain of responsibility pattern.
  143. static int single_thread_mode_initiazation()
  144. {
  145. // 1. thread initilization.
  146. if (init_main_chain_thread())
  147. return DTC_CODE_FAILED;
  148. if (init_hotbackup_chain_thread())
  149. return DTC_CODE_FAILED;
  150. // 2. chain initilization.
  151. if (init_remote_dtc_chain(g_main_thread) < 0)
  152. return DTC_CODE_FAILED;
  153. if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  154. if (init_data_connector_ask_chain(g_main_thread) < 0)
  155. return DTC_CODE_FAILED;
  156. }
  157. if (g_datasource_mode != DTC_MODE_DATABASE_ONLY) {
  158. if (init_buffer_process_ask_chain(g_main_thread) < 0)
  159. return DTC_CODE_FAILED;
  160. }
  161. int max_barrier_count =
  162. g_dtc_config->get_int_val("cache", "MaxBarrierCount", 100000);
  163. int max_key_count =
  164. g_dtc_config->get_int_val("cache", "MaxKeyCount", 10000);
  165. g_buffer_barrier_instance =
  166. new BarrierAskAnswerChain(g_main_thread, max_barrier_count,
  167. max_key_count,
  168. BarrierAskAnswerChain::IN_FRONT);
  169. if (g_datasource_mode == DTC_MODE_DATABASE_ONLY) {
  170. g_buffer_bypass_ask_instance =
  171. new BufferBypassAskChain(g_main_thread);
  172. //Step 4 : barrier_cache bind bypass_unit
  173. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  174. g_buffer_bypass_ask_instance);
  175. //Step 5 : bypass_unit bind helper_unit
  176. g_buffer_bypass_ask_instance->get_main_chain()
  177. ->register_next_chain(g_data_connector_ask_instance);
  178. } else {
  179. g_key_route_ask_instance = new KeyRouteAskChain(
  180. g_main_thread, TableDefinitionManager::instance()
  181. ->get_cur_table_def()
  182. ->key_format());
  183. //ClusterConfig
  184. if (!check_and_create()) {
  185. log4cplus_error(
  186. "check_and_create cluster config error.");
  187. return DTC_CODE_FAILED;
  188. } else {
  189. log4cplus_debug("check_and_create cluster config ok.");
  190. }
  191. std::vector<ClusterNode> clusterConf;
  192. if (!parse_cluster_config(&clusterConf)) {
  193. log4cplus_error("parse_cluster_config error");
  194. return DTC_CODE_FAILED;
  195. } else {
  196. log4cplus_debug("parse_cluster_config ok");
  197. }
  198. g_key_route_ask_instance->init(clusterConf);
  199. if (g_key_route_ask_instance->load_node_state_if_any() != 0) {
  200. log4cplus_error("key route init error!");
  201. return DTC_CODE_FAILED;
  202. } else {
  203. log4cplus_debug("keyRoute->do_init ok");
  204. }
  205. //Setp 4 : barrier_cache bind key_route
  206. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  207. g_key_route_ask_instance);
  208. //Step 5 : key_route bind cache_process
  209. g_key_route_ask_instance->get_main_chain()->register_next_chain(
  210. g_buffer_process_ask_instance);
  211. //Step 5 : key_route bind remote_client
  212. g_key_route_ask_instance->get_remote_chain()
  213. ->register_next_chain(g_remote_dtc_instance);
  214. //Step 6 : cache_process bind remoteClinet
  215. g_buffer_process_ask_instance->get_remote_chain()
  216. ->register_next_chain(g_remote_dtc_instance);
  217. //Step 6 : cache_process bind hotback_process
  218. g_buffer_process_ask_instance->get_hotbackup_chain()
  219. ->register_next_chain(g_hot_backup_ask_instance);
  220. if (g_datasource_mode == DTC_MODE_CACHE_ONLY) {
  221. g_black_hole_ask_instance =
  222. new BlackHoleAskChain(g_main_thread);
  223. //Step 6 : cache_process bind hole
  224. g_buffer_process_ask_instance->get_main_chain()
  225. ->register_next_chain(
  226. g_black_hole_ask_instance);
  227. } else if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  228. if (g_buffer_process_ask_instance->update_mode() ||
  229. g_buffer_process_ask_instance->is_mem_dirty()) {
  230. g_connector_barrier_instance =
  231. new BarrierAskAnswerChain(
  232. g_main_thread,
  233. max_barrier_count,
  234. max_key_count,
  235. BarrierAskAnswerChain::IN_BACK);
  236. //Step 6 : cache_process bind barrier_helper
  237. g_buffer_process_ask_instance->get_main_chain()
  238. ->register_next_chain(
  239. g_connector_barrier_instance);
  240. //Step 7 : barrier_helper bind helper_unit
  241. g_connector_barrier_instance->get_main_chain()
  242. ->register_next_chain(
  243. g_data_connector_ask_instance);
  244. } else {
  245. //Step 6 : cache_process bind helper_unit
  246. g_buffer_process_ask_instance->get_main_chain()
  247. ->register_next_chain(
  248. g_data_connector_ask_instance);
  249. }
  250. } else {
  251. log4cplus_error("g_datasource_mode error:%d",
  252. g_datasource_mode);
  253. return DTC_CODE_FAILED;
  254. }
  255. }
  256. g_system_command_ask_instance =
  257. SystemCommandAskChain::get_instance(g_main_thread);
  258. if (NULL == g_system_command_ask_instance) {
  259. log4cplus_error(
  260. "create system command failed, errno[%d], msg[%s]",
  261. errno, strerror(errno));
  262. return DTC_CODE_FAILED;
  263. }
  264. //Step 3 : system_command_instance bind bar_cache
  265. g_system_command_ask_instance->get_main_chain()->register_next_chain(
  266. g_buffer_barrier_instance);
  267. g_job_hub_ask_instance = new JobHubAskChain(g_main_thread);
  268. //Step 2 : multi_plexer bind system_command_instance
  269. g_job_hub_ask_instance->get_main_chain()->register_next_chain(
  270. g_system_command_ask_instance);
  271. g_agent_hub_ask_instance = new AgentHubAskChain(g_main_thread);
  272. //Step 1 : agent_process bind multi_plexer
  273. g_agent_hub_ask_instance->get_main_chain()->register_next_chain(
  274. g_job_hub_ask_instance);
  275. //Step 0 : chain of responsibility entrance.
  276. agent_listener = new AgentListenPool();
  277. if (agent_listener->register_entrance_chain(
  278. g_dtc_config, g_agent_hub_ask_instance, g_main_thread) < 0)
  279. return DTC_CODE_FAILED;
  280. int open_cnt = stat_open_fd();
  281. g_max_conn_cnt =
  282. g_dtc_config->get_int_val("cache", "MaxFdCount", 10240) -
  283. open_cnt - 10; // reserve 10 fds
  284. if (g_max_conn_cnt < 0) {
  285. log4cplus_error("MaxFdCount should large than %d",
  286. open_cnt + 10);
  287. return DTC_CODE_FAILED;
  288. }
  289. // start thread....
  290. if (g_main_thread)
  291. g_main_thread->running_thread();
  292. if (g_hot_backup_thread)
  293. g_hot_backup_thread->running_thread();
  294. return DTC_CODE_SUCCESS;
  295. }
  296. static int multiple_thread_mode_initiazation()
  297. {
  298. if (init_hotbackup_chain_thread())
  299. return DTC_CODE_FAILED;
  300. if (init_data_connector_chain_thread() < 0)
  301. return DTC_CODE_FAILED;
  302. if (init_remote_dtc_chain_thread() < 0)
  303. return DTC_CODE_FAILED;
  304. if (g_datasource_mode == DTC_MODE_DATABASE_ONLY) {
  305. g_data_connector_ask_instance->disable_commit_group();
  306. } else if (init_buffer_process_ask_chain_thread() < 0) {
  307. return DTC_CODE_FAILED;
  308. }
  309. if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  310. g_data_connector_ask_instance->do_attach(g_datasource_thread);
  311. }
  312. int iMaxBarrierCount =
  313. g_dtc_config->get_int_val("cache", "MaxBarrierCount", 100000);
  314. int iMaxKeyCount =
  315. g_dtc_config->get_int_val("cache", "MaxKeyCount", 10000);
  316. g_buffer_barrier_instance = new BarrierAskAnswerChain(
  317. g_buffer_multi_thread ?: g_datasource_thread, iMaxBarrierCount,
  318. iMaxKeyCount, BarrierAskAnswerChain::IN_FRONT);
  319. if (g_datasource_mode == DTC_MODE_DATABASE_ONLY) {
  320. g_buffer_bypass_ask_instance =
  321. new BufferBypassAskChain(g_datasource_thread);
  322. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  323. g_buffer_bypass_ask_instance);
  324. g_buffer_bypass_ask_instance->get_main_chain()
  325. ->register_next_chain(g_data_connector_ask_instance);
  326. } else {
  327. g_key_route_ask_instance =
  328. new KeyRouteAskChain(g_buffer_multi_thread,
  329. TableDefinitionManager::instance()
  330. ->get_cur_table_def()
  331. ->key_format());
  332. if (!check_and_create()) {
  333. log4cplus_error("check_and_create error");
  334. return DTC_CODE_FAILED;
  335. } else {
  336. log4cplus_debug("check_and_create ok");
  337. }
  338. std::vector<ClusterNode> clusterConf;
  339. if (!parse_cluster_config(&clusterConf)) {
  340. log4cplus_error("parse_cluster_config error");
  341. return DTC_CODE_FAILED;
  342. } else {
  343. log4cplus_debug("parse_cluster_config ok");
  344. }
  345. g_key_route_ask_instance->init(clusterConf);
  346. if (g_key_route_ask_instance->load_node_state_if_any() != 0) {
  347. log4cplus_error("key route init error!");
  348. return DTC_CODE_FAILED;
  349. }
  350. log4cplus_debug("keyRoute->do_init ok");
  351. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  352. g_key_route_ask_instance);
  353. g_key_route_ask_instance->get_main_chain()->register_next_chain(
  354. g_buffer_process_ask_instance);
  355. g_key_route_ask_instance->get_remote_chain()
  356. ->register_next_chain(g_remote_dtc_instance);
  357. g_buffer_process_ask_instance->get_remote_chain()
  358. ->register_next_chain(g_remote_dtc_instance);
  359. g_buffer_process_ask_instance->get_hotbackup_chain()
  360. ->register_next_chain(g_hot_backup_ask_instance);
  361. if (g_datasource_mode == DTC_MODE_CACHE_ONLY) {
  362. g_black_hole_ask_instance =
  363. new BlackHoleAskChain(g_datasource_thread);
  364. g_buffer_process_ask_instance->get_main_chain()
  365. ->register_next_chain(
  366. g_black_hole_ask_instance);
  367. } else if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  368. if (g_buffer_process_ask_instance->update_mode() ||
  369. g_buffer_process_ask_instance->is_mem_dirty()) {
  370. g_connector_barrier_instance =
  371. new BarrierAskAnswerChain(
  372. g_datasource_thread,
  373. iMaxBarrierCount, iMaxKeyCount,
  374. BarrierAskAnswerChain::IN_BACK);
  375. g_buffer_process_ask_instance->get_main_chain()
  376. ->register_next_chain(
  377. g_connector_barrier_instance);
  378. g_connector_barrier_instance->get_main_chain()
  379. ->register_next_chain(
  380. g_data_connector_ask_instance);
  381. } else {
  382. g_buffer_process_ask_instance->get_main_chain()
  383. ->register_next_chain(
  384. g_data_connector_ask_instance);
  385. }
  386. } else {
  387. log4cplus_error("g_datasource_mode error:%d",
  388. g_datasource_mode);
  389. return DTC_CODE_FAILED;
  390. }
  391. }
  392. g_system_command_ask_instance = SystemCommandAskChain::get_instance(
  393. g_buffer_multi_thread ?: g_datasource_thread);
  394. if (NULL == g_system_command_ask_instance) {
  395. log4cplus_error(
  396. "create system command failed, errno[%d], msg[%s]",
  397. errno, strerror(errno));
  398. return DTC_CODE_FAILED;
  399. }
  400. g_system_command_ask_instance->get_main_chain()->register_next_chain(
  401. g_buffer_barrier_instance);
  402. log4cplus_debug("bind server control ok");
  403. g_job_hub_ask_instance = new JobHubAskChain(
  404. g_buffer_multi_thread ?: g_datasource_thread);
  405. g_job_hub_ask_instance->get_main_chain()->register_next_chain(
  406. g_system_command_ask_instance);
  407. g_agent_hub_ask_instance = new AgentHubAskChain(
  408. g_buffer_multi_thread ?: g_datasource_thread);
  409. g_agent_hub_ask_instance->get_main_chain()->register_next_chain(
  410. g_job_hub_ask_instance);
  411. agent_listener = new AgentListenPool();
  412. if (agent_listener->register_entrance_chain_multi_thread(
  413. g_dtc_config, g_agent_hub_ask_instance) < 0)
  414. return DTC_CODE_FAILED;
  415. int open_cnt = stat_open_fd();
  416. g_max_conn_cnt =
  417. g_dtc_config->get_int_val("cache", "MaxFdCount", 10240) -
  418. open_cnt - 10; // reserve 10 fds
  419. if (g_max_conn_cnt < 0) {
  420. log4cplus_error("MaxFdCount should large than %d",
  421. open_cnt + 10);
  422. return DTC_CODE_FAILED;
  423. }
  424. if (g_datasource_thread)
  425. g_datasource_thread->running_thread();
  426. if (g_remote_thread)
  427. g_remote_thread->running_thread();
  428. if (g_hot_backup_thread)
  429. g_hot_backup_thread->running_thread();
  430. if (g_buffer_multi_thread)
  431. g_buffer_multi_thread->running_thread();
  432. agent_listener->running_all_threads();
  433. return DTC_CODE_SUCCESS;
  434. }