db_main.cc 15 KB


/*
 * =====================================================================================
 *
 *       Filename:  db_main.cc
 *
 *    Description:  Entry point of the rocksdb_helper process. Parses the machine id
 *                  and listen address, opens the RocksDB connection, then accepts
 *                  connections and serves decoded requests on them.
 *
 *        Version:  1.0
 *        Created:  09/08/2020 10:02:05 PM
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Norton, yangshuang68@jd.com
 *        Company:  JD.com, Inc.
 *
 * =====================================================================================
 */
  18. #if 1
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. #include <unistd.h>
  22. #include <sys/socket.h>
  23. #include <sys/stat.h>
  24. #include <sys/un.h>
  25. #include <sys/wait.h>
  26. #include <fcntl.h>
  27. #include <sched.h>
  28. #include <assert.h>
  29. #include <thread>
  30. #include <dtc_global.h>
  31. #include <version.h>
  32. #include <proc_title.h>
  33. #include <log.h>
  34. #include <config.h>
  35. #include "table_def_manager.h"
  36. #include <daemon.h>
  37. #include <listener.h>
  38. #include <net_addr.h>
  39. #include <unix_socket.h>
  40. #include <watchdog_listener.h>
  41. #include "dtcutils.h"
  42. #include "rocksdb_conn.h"
  43. #include "db_process_rocks.h"
  44. #include "rocksdb_direct_process.h"
  45. #include "rocksdb_replication.h"
  46. extern void _set_remote_log_config_(const char *addr, int port, int businessid);
  47. const char progname[] = "rocksdb_helper";
  48. const char usage_argv[] = "machId addr [port]";
  49. char cacheFile[256] = CACHE_CONF_NAME;
  50. char tableFile[256] = TABLE_CONF_NAME;
  51. HelperProcessBase* helperProc;
  52. static unsigned int procTimeout;
  53. static RocksDBConn *gRocksdbConn;
  54. std::string gRocksdbPath = "../rocksdb_data";
  55. std::string gRocksDirectAccessPath = "/tmp/domain_socket/";
  56. static RocksdbDirectProcess *gRocksdbDirectProcess;
  57. int targetNewHash;
  58. int hashChanging;
  59. static int sync_decode(DTCTask *task, int netfd, HelperProcessBase *helperProc)
  60. {
  61. SimpleReceiver receiver(netfd);
  62. int code;
  63. do
  64. {
  65. code = task->Decode(receiver);
  66. if (code == DecodeFatalError)
  67. {
  68. if (errno != 0)
  69. log_notice("decode fatal error, fd=%d, %m", netfd);
  70. log_info("decode error!!!!!");
  71. return -1;
  72. }
  73. if (code == DecodeDataError)
  74. {
  75. if (task->result_code() == 0 || task->result_code() == -EC_EXTRA_SECTION_DATA) // -EC_EXTRA_SECTION_DATA verify package
  76. return 0;
  77. log_notice("decode error, fd=%d, %d", netfd, task->result_code());
  78. return -1;
  79. }
  80. helperProc->set_title("Receiving...");
  81. } while (!stop && code != DecodeDone);
  82. if (task->result_code() < 0)
  83. {
  84. log_notice("register result, fd=%d, %d", netfd, task->result_code());
  85. return -1;
  86. }
  87. return 0;
  88. }
  89. static int sync_send(Packet *reply, int netfd)
  90. {
  91. int code;
  92. do
  93. {
  94. code = reply->Send(netfd);
  95. if (code == SendResultError)
  96. {
  97. log_notice("send error, fd=%d, %m", netfd);
  98. return -1;
  99. }
  100. } while (!stop && code != SendResultDone);
  101. return 0;
  102. }
  103. static void alarm_handler(int signo)
  104. {
  105. if (background == 0 && getppid() == 1)
  106. exit(0);
  107. alarm(10);
  108. }
  109. static int accept_connection(int fd)
  110. {
  111. helperProc->set_title("listener");
  112. signal(SIGALRM, alarm_handler);
  113. while (!stop)
  114. {
  115. alarm(10);
  116. int newfd;
  117. if ((newfd = accept(fd, NULL, 0)) >= 0)
  118. {
  119. alarm(0);
  120. return newfd;
  121. }
  122. if (newfd < 0 && errno == EINVAL)
  123. {
  124. if (getppid() == (pid_t)1)
  125. {
  126. log_error("dtc father process not exist. helper[%d] exit now.", getpid());
  127. exit(0);
  128. }
  129. log_info("parent process close the connection!");
  130. usleep(10000);
  131. }
  132. }
  133. exit(0);
  134. }
  135. static void proc_timeout_handler(int signo)
  136. {
  137. log_error("mysql process timeout(more than %u seconds), helper[pid: %d] exit now.", procTimeout, getpid());
  138. exit(-1);
  139. }
  140. #ifdef __DEBUG__
  141. static void inline simulate_helper_delay(void)
  142. {
  143. char *env = getenv("ENABLE_SIMULATE_DTC_HELPER_DELAY_SECOND");
  144. if (env && env[0] != 0)
  145. {
  146. unsigned delay_sec = atoi(env);
  147. if (delay_sec > 5)
  148. delay_sec = 5;
  149. log_debug("simulate dtc helper delay second[%d s]", delay_sec);
  150. sleep(delay_sec);
  151. }
  152. return;
  153. }
  154. #endif
  155. struct THelperProcParameter
  156. {
  157. int netfd;
  158. int gid;
  159. int role;
  160. };
  161. static int helper_proc_run(struct THelperProcParameter *args)
  162. {
  163. // close listen fd
  164. close(0);
  165. open("/dev/null", O_RDONLY);
  166. helperProc->set_title("Initializing...");
  167. if (procTimeout > 0)
  168. signal(SIGALRM, proc_timeout_handler);
  169. alarm(procTimeout);
  170. if (helperProc->Init(args->gid, dbConfig, TableDefinitionManager::Instance()->get_cur_table_def(), args->role) != 0)
  171. {
  172. log_error("%s", "helper process init failed");
  173. exit(-1);
  174. }
  175. helperProc->init_ping_timeout();
  176. alarm(0);
  177. _set_remote_log_config_(gConfig->get_str_val("cache", "RemoteLogAddr"),
  178. gConfig->get_int_val("cache", "RemoteLogPort", 0),
  179. dtc::utils::get_bid());
  180. _set_remote_log_fd_();
  181. hashChanging = gConfig->get_int_val("cache", "HashChanging", 0);
  182. targetNewHash = gConfig->get_int_val("cache", "TargetNewHash", 0);
  183. unsigned int timeout;
  184. while (!stop)
  185. {
  186. helperProc->set_title("Waiting...");
  187. DTCTask *task = new DTCTask(TableDefinitionManager::Instance()->get_cur_table_def());
  188. if (sync_decode(task, args->netfd, helperProc) < 0)
  189. {
  190. log_info("sync decode failed!");
  191. delete task;
  192. break;
  193. }
  194. if (task->result_code() == 0)
  195. {
  196. switch (task->request_code())
  197. {
  198. case DRequest::Insert:
  199. case DRequest::Update:
  200. case DRequest::Delete:
  201. case DRequest::Replace:
  202. case DRequest::ReloadConfig:
  203. case DRequest::Replicate:
  204. case DRequest::LocalMigrate:
  205. timeout = 2 * procTimeout;
  206. default:
  207. timeout = procTimeout;
  208. }
  209. alarm(timeout);
  210. #ifdef __DEBUG__
  211. simulate_helper_delay();
  212. #endif
  213. helperProc->process_task(task);
  214. alarm(0);
  215. }
  216. helperProc->set_title("Sending...");
  217. Packet *reply = new Packet;
  218. reply->encode_result(task);
  219. if (sync_send(reply, args->netfd) < 0)
  220. {
  221. delete reply;
  222. delete task;
  223. break;
  224. log_info("sync send failed!");
  225. }
  226. delete reply;
  227. delete task;
  228. }
  229. close(args->netfd);
  230. helperProc->set_title("Exiting...");
  231. delete helperProc;
  232. daemon_cleanup();
  233. #if MEMCHECK
  234. log_info("%s v%s: stopped", progname, version);
  235. dump_non_delete();
  236. log_debug("memory allocated %lu virtual %lu", count_alloc_size(), count_virtual_size());
  237. #endif
  238. log_info("helper exit!");
  239. exit(0);
  240. return 0;
  241. }
  242. static int helper_proc_run_rocks(struct THelperProcParameter args)
  243. {
  244. log_info("xx77xx11 test multiple thread model! threadId:%d, fd:%d", std::this_thread::get_id(), args.netfd);
  245. open("/dev/null", O_RDONLY);
  246. helperProc->set_title("Initializing...");
  247. if (procTimeout > 0)
  248. signal(SIGALRM, proc_timeout_handler);
  249. alarm(procTimeout);
  250. // helperProc->init_ping_timeout();
  251. alarm(0);
  252. _set_remote_log_config_(gConfig->get_str_val("cache", "RemoteLogAddr"),
  253. gConfig->get_int_val("cache", "RemoteLogPort", 0),
  254. dtc::utils::get_bid());
  255. _set_remote_log_fd_();
  256. hashChanging = gConfig->get_int_val("cache", "HashChanging", 0);
  257. targetNewHash = gConfig->get_int_val("cache", "TargetNewHash", 0);
  258. unsigned int timeout;
  259. while (!stop)
  260. {
  261. helperProc->set_title("Waiting...");
  262. DTCTask *task = new DTCTask(TableDefinitionManager::Instance()->get_cur_table_def());
  263. if (sync_decode(task, args.netfd, helperProc) < 0)
  264. {
  265. log_info("sync decode failed!");
  266. delete task;
  267. break;
  268. }
  269. log_info("recieve request, threadId:%d", std::this_thread::get_id());
  270. if (task->result_code() == 0)
  271. {
  272. switch (task->request_code())
  273. {
  274. case DRequest::Insert:
  275. case DRequest::Update:
  276. case DRequest::Delete:
  277. case DRequest::Replace:
  278. case DRequest::ReloadConfig:
  279. case DRequest::Replicate:
  280. case DRequest::LocalMigrate:
  281. timeout = 2 * procTimeout;
  282. default:
  283. timeout = procTimeout;
  284. }
  285. alarm(timeout);
  286. #ifdef __DEBUG__
  287. simulate_helper_delay();
  288. #endif
  289. helperProc->process_task(task);
  290. alarm(0);
  291. }
  292. helperProc->set_title("Sending...");
  293. Packet *reply = new Packet;
  294. reply->encode_result(task);
  295. if (sync_send(reply, args.netfd) < 0)
  296. {
  297. delete reply;
  298. delete task;
  299. break;
  300. log_info("sync send failed!");
  301. }
  302. delete reply;
  303. delete task;
  304. }
  305. close(args.netfd);
  306. helperProc->set_title("Exiting...");
  307. daemon_cleanup();
  308. #if MEMCHECK
  309. log_info("%s v%s: stopped", progname, version);
  310. dump_non_delete();
  311. log_debug("memory allocated %lu virtual %lu", count_alloc_size(), count_virtual_size());
  312. #endif
  313. log_info("helper exit!");
  314. return 0;
  315. }
  316. // check version base on DB type
  317. int check_db_version(void)
  318. {
  319. // dbConfig->dstype = 1;
  320. switch (dbConfig->dstype)
  321. {
  322. case 0:
  323. default:
  324. case 2:
  325. {
  326. log_debug("no need to check rocksdb!");
  327. break;
  328. }
  329. }
  330. return -1;
  331. }
  332. int check_db_table(int gid, int role)
  333. {
  334. HelperProcessBase *helper;
  335. switch (dbConfig->dstype)
  336. {
  337. case 0:
  338. default:
  339. case 2:
  340. // no table concept in rocksdb, no need to check
  341. log_error("no need to check table in rocksdb storage!");
  342. return 0;
  343. }
  344. if (procTimeout > 1)
  345. {
  346. helper->set_proc_timeout(procTimeout - 1);
  347. signal(SIGALRM, proc_timeout_handler);
  348. }
  349. alarm(procTimeout);
  350. log_debug("begin initialize gauss db");
  351. if (helper->Init(gid, dbConfig, TableDefinitionManager::Instance()->get_cur_table_def(), role) != 0)
  352. {
  353. log_error("%s", "helper process init failed");
  354. delete helper;
  355. alarm(0);
  356. return (-1);
  357. }
  358. if (helper->check_table() != 0)
  359. {
  360. delete helper;
  361. alarm(0);
  362. return (-2);
  363. }
  364. alarm(0);
  365. delete helper;
  366. return (0);
  367. }
  368. int create_rocks_domain_dir()
  369. {
  370. // create domain socket directory
  371. int ret = access(gRocksDirectAccessPath.c_str(), F_OK);
  372. if (ret != 0)
  373. {
  374. int err = errno;
  375. if (errno == ENOENT)
  376. {
  377. // create log dir
  378. if (mkdir(gRocksDirectAccessPath.c_str(), 0755) != 0)
  379. {
  380. log_error("create rocksdb domain socket dir failed! path:%s, errno:%d", gRocksDirectAccessPath.c_str(), errno);
  381. return -1;
  382. }
  383. }
  384. else
  385. {
  386. log_error("access rocksdb domain socket dir failed!, path:%s, errno:%d", gRocksDirectAccessPath.c_str(), errno);
  387. return -1;
  388. }
  389. }
  390. return 0;
  391. }
  392. // process rocksdb direct access
  393. int rocks_direct_access_proc()
  394. {
  395. log_error("Rocksdb direct access channel open!");
  396. std::string socketPath = gRocksDirectAccessPath;
  397. std::string dtcDeployAddr = dbConfig->cfgObj->get_str_val("cache", "BindAddr");
  398. SocketAddress sockAddr;
  399. const char *strRet = sockAddr.set_address(dtcDeployAddr.c_str());
  400. if (strRet)
  401. {
  402. log_error("parse dtc bind addr failed, errmsg:%s", strRet);
  403. return -1;
  404. }
  405. int dtcDeployPort;
  406. switch (sockAddr.addr->sa_family)
  407. {
  408. case AF_INET:
  409. dtcDeployPort = ntohs(sockAddr.in4->sin_port);
  410. break;
  411. case AF_INET6:
  412. dtcDeployPort = ntohs(sockAddr.in6->sin6_port);
  413. break;
  414. default:
  415. log_error("unsupport addr type! addr:%s, type:%d", dtcDeployAddr.c_str(), sockAddr.addr->sa_family);
  416. return -1;
  417. }
  418. assert(dtcDeployPort > 0);
  419. socketPath.append("rocks_direct_").append(std::to_string(dtcDeployPort)).append(".sock");
  420. gRocksdbDirectProcess = new RocksdbDirectProcess(socketPath, helperProc);
  421. if (!gRocksdbDirectProcess)
  422. {
  423. log_error("create RocksdbDirectProcess failed!");
  424. return -1;
  425. }
  426. int ret = gRocksdbDirectProcess->init();
  427. if (ret != 0)
  428. return -1;
  429. return gRocksdbDirectProcess->run_process();
  430. }
  431. int main(int argc, char **argv)
  432. {
  433. init_proc_title(argc, argv);
  434. if (dtc_daemon_init(argc, argv) < 0)
  435. return -1;
  436. check_db_version();
  437. argc -= optind;
  438. argv += optind;
  439. struct THelperProcParameter helperArgs = {0, 0, 0};
  440. char *addr = NULL;
  441. if (argc > 0)
  442. {
  443. char *p;
  444. helperArgs.gid = strtol(argv[0], &p, 0);
  445. if (*p == '\0' || *p == MACHINEROLESTRING[0])
  446. helperArgs.role = 0;
  447. else if (*p == MACHINEROLESTRING[1])
  448. helperArgs.role = 1;
  449. else
  450. {
  451. log_error("Bad machine id: %s", argv[0]);
  452. return -1;
  453. }
  454. }
  455. if (argc != 2 && argc != 3)
  456. {
  457. show_usage();
  458. return -1;
  459. }
  460. int usematch = gConfig->get_int_val("cache", "UseMatchedAsAffectedRows", 1);
  461. int backlog = gConfig->get_int_val("cache", "MaxListenCount", 256);
  462. int helperTimeout = gConfig->get_int_val("cache", "HelperTimeout", 30);
  463. if (helperTimeout > 1)
  464. procTimeout = helperTimeout - 1;
  465. else
  466. procTimeout = 0;
  467. addr = argv[1];
  468. log_error("helper listen addr:%s", addr);
  469. if (dbConfig->checkTable && check_db_table(helperArgs.gid, helperArgs.role) != 0)
  470. {
  471. return -1;
  472. }
  473. int fd = -1;
  474. if (!strcmp(addr, "-"))
  475. fd = 0;
  476. else
  477. {
  478. if (strcasecmp(gConfig->get_str_val("cache", "CacheShmKey") ?: "", "none") != 0)
  479. {
  480. log_warning("standalone %s need CacheShmKey set to NONE", progname);
  481. return -1;
  482. }
  483. SocketAddress sockaddr;
  484. const char *err = sockaddr.set_address(addr, argc == 2 ? NULL : argv[2]);
  485. if (err)
  486. {
  487. log_warning("host %s port %s: %s", addr, argc == 2 ? "NULL" : argv[2], err);
  488. return -1;
  489. }
  490. if (sockaddr.socket_type() != SOCK_STREAM)
  491. {
  492. log_warning("standalone %s don't support UDP protocol", progname);
  493. return -1;
  494. }
  495. fd = sock_bind(&sockaddr, backlog);
  496. if (fd < 0)
  497. return -1;
  498. }
  499. log_info("helper listen fd:%d", fd);
  500. log_debug("If you want to simulate db busy,"
  501. "you can set \"ENABLE_SIMULATE_DTC_HELPER_DELAY_SECOND=second\" before dtc startup");
  502. daemon_start();
  503. // create helper instance base on database type
  504. switch (dbConfig->dstype)
  505. {
  506. default:
  507. case 2:
  508. {
  509. // rocksdb
  510. gRocksdbConn = RocksDBConn::Instance();
  511. helperProc = new RocksdbProcess(gRocksdbConn);
  512. assert(helperProc);
  513. int ret = helperProc->Init(helperArgs.gid, dbConfig, TableDefinitionManager::Instance()->get_cur_table_def(), helperArgs.role);
  514. if (ret != 0)
  515. {
  516. log_error("%s", "helper process init failed");
  517. return -1;
  518. }
  519. gRocksdbConn->set_key_type(TableDefinitionManager::Instance()->get_cur_table_def()->key_type());
  520. ret = gRocksdbConn->Open(gRocksdbPath);
  521. assert(ret == 0);
  522. // start direct rocksdb access channel
  523. ret = create_rocks_domain_dir();
  524. if (ret != 0)
  525. return -1;
  526. ret = rocks_direct_access_proc();
  527. if (ret != 0) return -1;
  528. // start master replication listener
  529. ret = helperProc->startReplListener();
  530. if (ret != 0) return -1;
  531. break;
  532. }
  533. }
  534. if (usematch)
  535. helperProc->use_matched_rows();
  536. #if HAS_LOGAPI
  537. helperProc->logapi.Init(
  538. gConfig->get_int_val("LogApi", "MessageId", 0),
  539. gConfig->get_int_val("LogApi", "CallerId", 0),
  540. gConfig->get_int_val("LogApi", "TargetId", 0),
  541. gConfig->get_int_val("LogApi", "InterfaceId", 0));
  542. #endif
  543. helperProc->init_title(helperArgs.gid, helperArgs.role);
  544. if (procTimeout > 1)
  545. helperProc->set_proc_timeout(procTimeout - 1);
  546. while (!stop)
  547. {
  548. helperArgs.netfd = accept_connection(fd);
  549. char buf[16];
  550. memset(buf, 0, 16);
  551. buf[0] = WATCHDOG_INPUT_OBJECT;
  552. log_info("xx77xx11 procName:%s", helperProc->Name());
  553. snprintf(buf + 1, 15, "%s", helperProc->Name());
  554. log_info("fork child helper! fd:%d", helperArgs.netfd);
  555. if (dbConfig->dstype != 2)
  556. {
  557. watch_dog_fork(buf, (int (*)(void *))helper_proc_run, (void *)&helperArgs);
  558. close(helperArgs.netfd);
  559. }
  560. else
  561. {
  562. // rocksdb use multiple thread mode
  563. std::thread runner(helper_proc_run_rocks, helperArgs);
  564. runner.detach();
  565. }
  566. }
  567. /* close global rocksdb connection */
  568. if (gRocksdbConn)
  569. {
  570. int ret = gRocksdbConn->Close();
  571. if (ret != 0)
  572. {
  573. log_error("close rocksdb connection failed, rockspath:%s", gRocksdbPath.c_str());
  574. }
  575. }
  576. log_info("helper main process exist!");
  577. if (fd > 0 && addr && addr[0] == '/')
  578. unlink(addr);
  579. return 0;
  580. }
  581. #endif