main.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. * Author: Linjinming, linjinming@jd.com
  17. Qiulu, choulu@jd.com
  18. Yangshuang, yangshuang68@jd.com
  19. Zhulin, shzhulin3@jd.com
  20. Chenyujie, chenyujie28@jd.com
  21. Wuxinzhen, wuxinzhen1@jd.com
  22. Caopei, caopei11@jd.com
  23. */
  24. #include "main_supply.h"
  25. #include "../misc/dtc_code.h"
  26. using namespace ClusterConfig;
  27. const char project_name[] = "core";
  28. const char usage_argv[] = "";
  29. BufferProcessAskChain *g_buffer_process_ask_instance = NULL;
  30. BarrierAskAnswerChain *g_buffer_barrier_instance = NULL;
  31. BufferBypassAskChain *g_buffer_bypass_ask_instance = NULL;
  32. BarrierAskAnswerChain *g_connector_barrier_instance = NULL;
  33. SystemCommandAskChain *g_system_command_ask_instance = NULL;
  34. DataConnectorAskChain *g_data_connector_ask_instance = NULL;
  35. HotBackupAskChain *g_hot_backup_ask_instance = NULL;
  36. KeyRouteAskChain *g_key_route_ask_instance = NULL;
  37. AgentHubAskChain *g_agent_hub_ask_instance = NULL;
  38. JobHubAskChain *g_job_hub_ask_instance = NULL;
  39. RemoteDtcAskAnswerChain *g_remote_dtc_instance = NULL;
  40. BlackHoleAskChain *g_black_hole_ask_instance = NULL;
  41. AgentListenPool *agent_listener = NULL;
  42. ListenerPool *main_listener = NULL;
  43. //single thread mode only use below two thread instance.
  44. PollerBase *g_main_thread = NULL;
  45. PollerBase *g_hot_backup_thread = NULL;
  46. //below thread instance used in multi thread mode.
  47. //remote dispatcher,in order to migrate data to remote dtc.
  48. PollerBase *g_remote_thread = NULL;
  49. PollerBase *g_buffer_multi_thread = NULL;
  50. PollerBase *g_datasource_thread = NULL;
  51. PluginManager *main_plugin_mgr;
  52. int g_max_conn_cnt;
  53. int enable_plugin;
  54. int init_plugin;
  55. int cache_key;
  56. int g_datasource_mode;
  57. int async_update;
  58. int g_target_new_hash;
  59. int g_hash_changing;
  60. char cache_file[256] = CACHE_CONF_NAME;
  61. char table_file[256] = TABLE_CONF_NAME;
  62. extern DTCConfig *g_dtc_config;
  63. extern void init_task_executor(const char *, AgentListenPool *,
  64. JobAskInterface<DTCJobOperation> *);
  65. int collect_load_config(DbConfig *dbconfig);
  66. // second part of entry
  67. static int init_thread(void *dummy);
  68. // thread startp
  69. static int single_thread_mode_initiazation();
  70. static int multiple_thread_mode_initiazation();
  71. #ifdef _CORE_
  72. int main(int argc, char *argv[])
  73. {
  74. init_proc_title(argc, argv);
  75. init_log4cplus();
  76. if (load_entry_parameter(argc, argv) < 0)
  77. return DTC_CODE_FAILED;
  78. if (g_dtc_config->get_int_val("cache", "EnableCoreDump", 1))
  79. init_core_dump();
  80. if (init_config_info())
  81. return DTC_CODE_FAILED;
  82. if (init_statistics())
  83. return DTC_CODE_FAILED;
  84. if (init_cache_mode() < 0)
  85. return DTC_CODE_FAILED;
  86. if (init_daemon() < 0)
  87. return DTC_CODE_FAILED;
  88. Thread::set_auto_config_instance(
  89. g_dtc_config->get_auto_config_instance("cache"));
  90. log4cplus_debug("entry start dtc");
  91. if (start_dtc(init_thread, NULL) < 0)
  92. {
  93. log4cplus_debug("entry will exit");
  94. abort();
  95. return DTC_CODE_FAILED;
  96. }
  97. log4cplus_debug("entry start dtc finished.");
  98. if (init_thread(NULL))
  99. return DTC_CODE_FAILED;
  100. Logger::shutdown();
  101. return DTC_CODE_SUCCESS;
  102. }
  103. #endif
  104. //main thread initialization.
  105. static int init_thread(void *dummy)
  106. {
  107. int ret = DTC_CODE_SUCCESS;
  108. Thread *root_thread =
  109. new Thread("dtc-thread-root", Thread::ThreadTypeProcess);
  110. if (root_thread != NULL)
  111. root_thread->initialize_thread();
  112. if (daemon_set_fd_limit(g_dtc_config->get_int_val("cache", "MaxFdCount",
  113. 10240)) < 0)
  114. return DTC_CODE_FAILED;
  115. //start statistic thread.
  116. g_stat_mgr.start_background_thread();
  117. int thread_mode =
  118. g_dtc_config->get_int_val("cache", "UseSingleThread", 1);
  119. //choose mode for single/multiple thread.
  120. switch (thread_mode) {
  121. case SINGLE_THREAD_MODE:
  122. ret = single_thread_mode_initiazation();
  123. break;
  124. case MULTIPLE_THREAD_MODE:
  125. ret = multiple_thread_mode_initiazation();
  126. break;
  127. default:
  128. log4cplus_error("thread mode error:%d", thread_mode);
  129. }
  130. if (ret == DTC_CODE_SUCCESS) {
  131. init_task_executor(TableDefinitionManager::instance()
  132. ->get_cur_table_def()
  133. ->table_name(),
  134. agent_listener,
  135. g_system_command_ask_instance);
  136. log4cplus_info("--------%s-v%s BEGIN!--------", project_name,
  137. version);
  138. daemon_wait();
  139. } else {
  140. log4cplus_error(
  141. "thread initilization failed, now prepare to free resource and exit.");
  142. }
  143. log4cplus_info("--------%s-v%s free resource now --------",
  144. project_name, version);
  145. free_all_resource();
  146. return ret;
  147. }
  148. //chain of responsibility pattern.
  149. static int single_thread_mode_initiazation()
  150. {
  151. // 1. thread initilization.
  152. if (init_main_chain_thread())
  153. return DTC_CODE_FAILED;
  154. if (init_hotbackup_chain_thread())
  155. return DTC_CODE_FAILED;
  156. // 2. chain initilization.
  157. if (init_remote_dtc_chain(g_main_thread) < 0)
  158. return DTC_CODE_FAILED;
  159. if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  160. if (init_data_connector_ask_chain(g_main_thread) < 0)
  161. return DTC_CODE_FAILED;
  162. }
  163. if (g_datasource_mode != DTC_MODE_DATABASE_ONLY) {
  164. if (init_buffer_process_ask_chain(g_main_thread) < 0)
  165. return DTC_CODE_FAILED;
  166. }
  167. int max_barrier_count =
  168. g_dtc_config->get_int_val("cache", "MaxBarrierCount", 100000);
  169. int max_key_count =
  170. g_dtc_config->get_int_val("cache", "MaxKeyCount", 10000);
  171. g_buffer_barrier_instance =
  172. new BarrierAskAnswerChain(g_main_thread, max_barrier_count,
  173. max_key_count,
  174. BarrierAskAnswerChain::IN_FRONT);
  175. if (g_datasource_mode == DTC_MODE_DATABASE_ONLY) {
  176. g_buffer_bypass_ask_instance =
  177. new BufferBypassAskChain(g_main_thread);
  178. //Step 4 : barrier_cache bind bypass_unit
  179. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  180. g_buffer_bypass_ask_instance);
  181. //Step 5 : bypass_unit bind helper_unit
  182. g_buffer_bypass_ask_instance->get_main_chain()
  183. ->register_next_chain(g_data_connector_ask_instance);
  184. } else {
  185. g_key_route_ask_instance = new KeyRouteAskChain(
  186. g_main_thread, TableDefinitionManager::instance()
  187. ->get_cur_table_def()
  188. ->key_format());
  189. //ClusterConfig
  190. if (!check_and_create()) {
  191. log4cplus_error(
  192. "check_and_create cluster config error.");
  193. return DTC_CODE_FAILED;
  194. } else {
  195. log4cplus_debug("check_and_create cluster config ok.");
  196. }
  197. std::vector<ClusterNode> clusterConf;
  198. if (!parse_cluster_config(&clusterConf)) {
  199. log4cplus_error("parse_cluster_config error");
  200. return DTC_CODE_FAILED;
  201. } else {
  202. log4cplus_debug("parse_cluster_config ok");
  203. }
  204. g_key_route_ask_instance->init(clusterConf);
  205. if (g_key_route_ask_instance->load_node_state_if_any() != 0) {
  206. log4cplus_error("key route init error!");
  207. return DTC_CODE_FAILED;
  208. } else {
  209. log4cplus_debug("keyRoute->do_init ok");
  210. }
  211. //Setp 4 : barrier_cache bind key_route
  212. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  213. g_key_route_ask_instance);
  214. //Step 5 : key_route bind cache_process
  215. g_key_route_ask_instance->get_main_chain()->register_next_chain(
  216. g_buffer_process_ask_instance);
  217. //Step 5 : key_route bind remote_client
  218. g_key_route_ask_instance->get_remote_chain()
  219. ->register_next_chain(g_remote_dtc_instance);
  220. //Step 6 : cache_process bind remoteClinet
  221. g_buffer_process_ask_instance->get_remote_chain()
  222. ->register_next_chain(g_remote_dtc_instance);
  223. //Step 6 : cache_process bind hotback_process
  224. g_buffer_process_ask_instance->get_hotbackup_chain()
  225. ->register_next_chain(g_hot_backup_ask_instance);
  226. if (g_datasource_mode == DTC_MODE_CACHE_ONLY) {
  227. g_black_hole_ask_instance =
  228. new BlackHoleAskChain(g_main_thread);
  229. //Step 6 : cache_process bind hole
  230. g_buffer_process_ask_instance->get_main_chain()
  231. ->register_next_chain(
  232. g_black_hole_ask_instance);
  233. } else if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  234. if (g_buffer_process_ask_instance->update_mode() ||
  235. g_buffer_process_ask_instance->is_mem_dirty()) {
  236. g_connector_barrier_instance =
  237. new BarrierAskAnswerChain(
  238. g_main_thread,
  239. max_barrier_count,
  240. max_key_count,
  241. BarrierAskAnswerChain::IN_BACK);
  242. //Step 6 : cache_process bind barrier_helper
  243. g_buffer_process_ask_instance->get_main_chain()
  244. ->register_next_chain(
  245. g_connector_barrier_instance);
  246. //Step 7 : barrier_helper bind helper_unit
  247. g_connector_barrier_instance->get_main_chain()
  248. ->register_next_chain(
  249. g_data_connector_ask_instance);
  250. } else {
  251. //Step 6 : cache_process bind helper_unit
  252. g_buffer_process_ask_instance->get_main_chain()
  253. ->register_next_chain(
  254. g_data_connector_ask_instance);
  255. }
  256. } else {
  257. log4cplus_error("g_datasource_mode error:%d",
  258. g_datasource_mode);
  259. return DTC_CODE_FAILED;
  260. }
  261. }
  262. g_system_command_ask_instance =
  263. SystemCommandAskChain::get_instance(g_main_thread);
  264. if (NULL == g_system_command_ask_instance) {
  265. log4cplus_error(
  266. "create system command failed, errno[%d], msg[%s]",
  267. errno, strerror(errno));
  268. return DTC_CODE_FAILED;
  269. }
  270. //Step 3 : system_command_instance bind bar_cache
  271. g_system_command_ask_instance->get_main_chain()->register_next_chain(
  272. g_buffer_barrier_instance);
  273. g_job_hub_ask_instance = new JobHubAskChain(g_main_thread);
  274. //Step 2 : multi_plexer bind system_command_instance
  275. g_job_hub_ask_instance->get_main_chain()->register_next_chain(
  276. g_system_command_ask_instance);
  277. g_agent_hub_ask_instance = new AgentHubAskChain(g_main_thread);
  278. //Step 1 : agent_process bind multi_plexer
  279. g_agent_hub_ask_instance->get_main_chain()->register_next_chain(
  280. g_job_hub_ask_instance);
  281. //Step 0 : chain of responsibility entrance.
  282. agent_listener = new AgentListenPool();
  283. if (agent_listener->register_entrance_chain(
  284. g_dtc_config, g_agent_hub_ask_instance, g_main_thread) < 0)
  285. return DTC_CODE_FAILED;
  286. int open_cnt = stat_open_fd();
  287. g_max_conn_cnt =
  288. g_dtc_config->get_int_val("cache", "MaxFdCount", 10240) -
  289. open_cnt - 10; // reserve 10 fds
  290. if (g_max_conn_cnt < 0) {
  291. log4cplus_error("MaxFdCount should large than %d",
  292. open_cnt + 10);
  293. return DTC_CODE_FAILED;
  294. }
  295. // start thread....
  296. if (g_main_thread)
  297. g_main_thread->running_thread();
  298. if (g_hot_backup_thread)
  299. g_hot_backup_thread->running_thread();
  300. return DTC_CODE_SUCCESS;
  301. }
  302. static int multiple_thread_mode_initiazation()
  303. {
  304. if (init_hotbackup_chain_thread())
  305. return DTC_CODE_FAILED;
  306. if (init_data_connector_chain_thread() < 0)
  307. return DTC_CODE_FAILED;
  308. if (init_remote_dtc_chain_thread() < 0)
  309. return DTC_CODE_FAILED;
  310. if (g_datasource_mode == DTC_MODE_DATABASE_ONLY) {
  311. g_data_connector_ask_instance->disable_commit_group();
  312. } else if (init_buffer_process_ask_chain_thread() < 0) {
  313. return DTC_CODE_FAILED;
  314. }
  315. if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  316. g_data_connector_ask_instance->do_attach(g_datasource_thread);
  317. }
  318. int iMaxBarrierCount =
  319. g_dtc_config->get_int_val("cache", "MaxBarrierCount", 100000);
  320. int iMaxKeyCount =
  321. g_dtc_config->get_int_val("cache", "MaxKeyCount", 10000);
  322. g_buffer_barrier_instance = new BarrierAskAnswerChain(
  323. g_buffer_multi_thread ?: g_datasource_thread, iMaxBarrierCount,
  324. iMaxKeyCount, BarrierAskAnswerChain::IN_FRONT);
  325. if (g_datasource_mode == DTC_MODE_DATABASE_ONLY) {
  326. g_buffer_bypass_ask_instance =
  327. new BufferBypassAskChain(g_datasource_thread);
  328. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  329. g_buffer_bypass_ask_instance);
  330. g_buffer_bypass_ask_instance->get_main_chain()
  331. ->register_next_chain(g_data_connector_ask_instance);
  332. } else {
  333. g_key_route_ask_instance =
  334. new KeyRouteAskChain(g_buffer_multi_thread,
  335. TableDefinitionManager::instance()
  336. ->get_cur_table_def()
  337. ->key_format());
  338. if (!check_and_create()) {
  339. log4cplus_error("check_and_create error");
  340. return DTC_CODE_FAILED;
  341. } else {
  342. log4cplus_debug("check_and_create ok");
  343. }
  344. std::vector<ClusterNode> clusterConf;
  345. if (!parse_cluster_config(&clusterConf)) {
  346. log4cplus_error("parse_cluster_config error");
  347. return DTC_CODE_FAILED;
  348. } else {
  349. log4cplus_debug("parse_cluster_config ok");
  350. }
  351. g_key_route_ask_instance->init(clusterConf);
  352. if (g_key_route_ask_instance->load_node_state_if_any() != 0) {
  353. log4cplus_error("key route init error!");
  354. return DTC_CODE_FAILED;
  355. }
  356. log4cplus_debug("keyRoute->do_init ok");
  357. g_buffer_barrier_instance->get_main_chain()->register_next_chain(
  358. g_key_route_ask_instance);
  359. g_key_route_ask_instance->get_main_chain()->register_next_chain(
  360. g_buffer_process_ask_instance);
  361. g_key_route_ask_instance->get_remote_chain()
  362. ->register_next_chain(g_remote_dtc_instance);
  363. g_buffer_process_ask_instance->get_remote_chain()
  364. ->register_next_chain(g_remote_dtc_instance);
  365. g_buffer_process_ask_instance->get_hotbackup_chain()
  366. ->register_next_chain(g_hot_backup_ask_instance);
  367. if (g_datasource_mode == DTC_MODE_CACHE_ONLY) {
  368. g_black_hole_ask_instance =
  369. new BlackHoleAskChain(g_datasource_thread);
  370. g_buffer_process_ask_instance->get_main_chain()
  371. ->register_next_chain(
  372. g_black_hole_ask_instance);
  373. } else if (g_datasource_mode == DTC_MODE_DATABASE_ADDITION) {
  374. if (g_buffer_process_ask_instance->update_mode() ||
  375. g_buffer_process_ask_instance->is_mem_dirty()) {
  376. g_connector_barrier_instance =
  377. new BarrierAskAnswerChain(
  378. g_datasource_thread,
  379. iMaxBarrierCount, iMaxKeyCount,
  380. BarrierAskAnswerChain::IN_BACK);
  381. g_buffer_process_ask_instance->get_main_chain()
  382. ->register_next_chain(
  383. g_connector_barrier_instance);
  384. g_connector_barrier_instance->get_main_chain()
  385. ->register_next_chain(
  386. g_data_connector_ask_instance);
  387. } else {
  388. g_buffer_process_ask_instance->get_main_chain()
  389. ->register_next_chain(
  390. g_data_connector_ask_instance);
  391. }
  392. } else {
  393. log4cplus_error("g_datasource_mode error:%d",
  394. g_datasource_mode);
  395. return DTC_CODE_FAILED;
  396. }
  397. }
  398. g_system_command_ask_instance = SystemCommandAskChain::get_instance(
  399. g_buffer_multi_thread ?: g_datasource_thread);
  400. if (NULL == g_system_command_ask_instance) {
  401. log4cplus_error(
  402. "create system command failed, errno[%d], msg[%s]",
  403. errno, strerror(errno));
  404. return DTC_CODE_FAILED;
  405. }
  406. g_system_command_ask_instance->get_main_chain()->register_next_chain(
  407. g_buffer_barrier_instance);
  408. log4cplus_debug("bind server control ok");
  409. g_job_hub_ask_instance = new JobHubAskChain(
  410. g_buffer_multi_thread ?: g_datasource_thread);
  411. g_job_hub_ask_instance->get_main_chain()->register_next_chain(
  412. g_system_command_ask_instance);
  413. g_agent_hub_ask_instance = new AgentHubAskChain(
  414. g_buffer_multi_thread ?: g_datasource_thread);
  415. g_agent_hub_ask_instance->get_main_chain()->register_next_chain(
  416. g_job_hub_ask_instance);
  417. agent_listener = new AgentListenPool();
  418. if (agent_listener->register_entrance_chain_multi_thread(
  419. g_dtc_config, g_agent_hub_ask_instance) < 0)
  420. return DTC_CODE_FAILED;
  421. int open_cnt = stat_open_fd();
  422. g_max_conn_cnt =
  423. g_dtc_config->get_int_val("cache", "MaxFdCount", 10240) -
  424. open_cnt - 10; // reserve 10 fds
  425. if (g_max_conn_cnt < 0) {
  426. log4cplus_error("MaxFdCount should large than %d",
  427. open_cnt + 10);
  428. return DTC_CODE_FAILED;
  429. }
  430. if (g_datasource_thread)
  431. g_datasource_thread->running_thread();
  432. if (g_remote_thread)
  433. g_remote_thread->running_thread();
  434. if (g_hot_backup_thread)
  435. g_hot_backup_thread->running_thread();
  436. if (g_buffer_multi_thread)
  437. g_buffer_multi_thread->running_thread();
  438. agent_listener->running_all_threads();
  439. return DTC_CODE_SUCCESS;
  440. }