connector.cc 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. #include <stdio.h>
  18. #include <unistd.h>
  19. #include <sys/socket.h>
  20. #include <sys/stat.h>
  21. #include <sys/un.h>
  22. #include <sys/wait.h>
  23. #include <fcntl.h>
  24. #include <sched.h>
  25. // local include files
  26. #include "mysql_operation.h"
  27. // common include files
  28. #include "dtc_global.h"
  29. #include "version.h"
  30. #include "proc_title.h"
  31. #include "dtcutils.h"
  32. #include "log/log.h"
  33. #include "config/config.h"
  34. #include "daemon/daemon.h"
  35. #include "listener/listener.h"
  36. #include "socket/socket_addr.h"
  37. #include "socket/unix_socket.h"
  38. // core include files
  39. #include "buffer/buffer_pond.h"
  40. // daemons include files
  41. #include "daemon_listener.h"
// Program name used in this file's log messages.
const char progname[] = "connector";
// ConnectorProcess instance created in main() before the accept loop;
// helper_proc_run() uses it and deletes it before the helper exits.
static ConnectorProcess *conn_proc;
// Per-operation time limit in seconds passed to alarm(); main() sets it to
// HelperTimeout - 1, or 0 (no alarm) when HelperTimeout <= 1.
static unsigned int proc_timeout;
// Loaded from [cache] TargetNewHash / HashChanging in helper_proc_run().
// They are not static, so presumably code outside this file reads them — verify.
int target_new_hash;
int hash_changing;
  47. static int sync_decode(DtcJob *task, int netfd, ConnectorProcess *conn_proc)
  48. {
  49. SimpleReceiver receiver(netfd);
  50. int code;
  51. do {
  52. code = task->do_decode(receiver);
  53. if (code == DecodeFatalError) {
  54. if (errno != 0)
  55. log4cplus_info("decode fatal error, fd=%d, %m",
  56. netfd);
  57. return -1;
  58. }
  59. if (code == DecodeDataError) {
  60. if (task->result_code() == 0 ||
  61. task->result_code() ==
  62. -EC_EXTRA_SECTION_DATA) // -EC_EXTRA_SECTION_DATA verify package
  63. return 0;
  64. log4cplus_info("decode error, fd=%d, %d", netfd,
  65. task->result_code());
  66. return -1;
  67. }
  68. conn_proc->set_title("Receiving...");
  69. } while (!stop && code != DecodeDone);
  70. if (task->result_code() < 0) {
  71. log4cplus_info("register result, fd=%d, %d", netfd,
  72. task->result_code());
  73. return -1;
  74. }
  75. return 0;
  76. }
  77. static int sync_send(Packet *reply, int netfd)
  78. {
  79. int code;
  80. do {
  81. code = reply->Send(netfd);
  82. if (code == SendResultError) {
  83. log4cplus_info("send error, fd=%d, %m", netfd);
  84. return -1;
  85. }
  86. } while (!stop && code != SendResultDone);
  87. return 0;
  88. }
  89. static void alarm_handler(int signo)
  90. {
  91. if (background == 0 && getppid() == 1)
  92. exit(0);
  93. alarm(10);
  94. }
  95. static int accept_connection(int fd)
  96. {
  97. conn_proc->set_title("listener");
  98. signal(SIGALRM, alarm_handler);
  99. while (!stop) {
  100. alarm(10);
  101. int newfd;
  102. if ((newfd = accept(fd, NULL, 0)) >= 0) {
  103. alarm(0);
  104. return newfd;
  105. }
  106. if (newfd < 0 && errno == EINVAL) {
  107. if (getppid() == (pid_t)1) {
  108. log4cplus_error(
  109. "dtc parent process not exist. helper[%d] exit now.",
  110. getpid());
  111. exit(0);
  112. }
  113. usleep(10000);
  114. }
  115. }
  116. exit(0);
  117. }
  118. static void proc_timeout_handler(int signo)
  119. {
  120. log4cplus_error(
  121. "mysql process timeout(more than %u seconds), helper[pid: %d] exit now.",
  122. proc_timeout, getpid());
  123. exit(-1);
  124. }
  125. #ifdef __DEBUG__
  126. static void inline simulate_helper_delay(void)
  127. {
  128. char *env = getenv("ENABLE_SIMULATE_DTC_HELPER_DELAY_SECOND");
  129. if (env && env[0] != 0) {
  130. unsigned delay_sec = atoi(env);
  131. if (delay_sec > 5)
  132. delay_sec = 5;
  133. log4cplus_debug("simulate dtc helper delay second[%d s]",
  134. delay_sec);
  135. sleep(delay_sec);
  136. }
  137. return;
  138. }
  139. #endif
  140. struct HelperParameter {
  141. int netfd;
  142. int gid;
  143. int role;
  144. };
  145. static int helper_proc_run(struct HelperParameter *args)
  146. {
  147. // close listen fd
  148. close(0);
  149. open("/dev/null", O_RDONLY);
  150. conn_proc->set_title("Initializing...");
  151. if (proc_timeout > 0)
  152. signal(SIGALRM, proc_timeout_handler);
  153. alarm(proc_timeout);
  154. if (conn_proc->do_init(
  155. args->gid, dbConfig,
  156. TableDefinitionManager::instance()->get_cur_table_def(),
  157. args->role) != 0) {
  158. log4cplus_error("%s", "helper process init failed");
  159. exit(-1);
  160. }
  161. conn_proc->init_ping_timeout();
  162. alarm(0);
  163. hash_changing = g_dtc_config->get_int_val("cache", "HashChanging", 0);
  164. target_new_hash =
  165. g_dtc_config->get_int_val("cache", "TargetNewHash", 0);
  166. unsigned int timeout;
  167. while (!stop) {
  168. conn_proc->set_title("Waiting...");
  169. DtcJob *task = new DtcJob(
  170. TableDefinitionManager::instance()->get_cur_table_def());
  171. if (sync_decode(task, args->netfd, conn_proc) < 0) {
  172. delete task;
  173. break;
  174. }
  175. if (task->result_code() == 0) {
  176. switch (task->request_code()) {
  177. case DRequest::Insert:
  178. case DRequest::Update:
  179. case DRequest::Delete:
  180. case DRequest::Replace:
  181. case DRequest::ReloadConfig:
  182. timeout = 2 * proc_timeout;
  183. default:
  184. timeout = proc_timeout;
  185. }
  186. alarm(timeout);
  187. #ifdef __DEBUG__
  188. simulate_helper_delay();
  189. #endif
  190. conn_proc->do_process(task);
  191. alarm(0);
  192. }
  193. conn_proc->set_title("Sending...");
  194. Packet *reply = new Packet;
  195. reply->encode_result(task);
  196. if (sync_send(reply, args->netfd) < 0) {
  197. delete reply;
  198. delete task;
  199. break;
  200. }
  201. delete reply;
  202. delete task;
  203. }
  204. close(args->netfd);
  205. conn_proc->set_title("Exiting...");
  206. delete conn_proc;
  207. daemon_cleanup();
  208. #if MEMCHECK
  209. log4cplus_info("%s v%s: stopped", progname, version);
  210. dump_non_delete();
  211. log4cplus_debug("memory allocated %lu virtual %lu", count_alloc_size(),
  212. count_virtual_size());
  213. #endif
  214. exit(0);
  215. return 0;
  216. }
  217. int check_db_version(void)
  218. {
  219. int ver = CDBConn::get_client_version();
  220. if (ver == MYSQL_VERSION_ID)
  221. return 0;
  222. log4cplus_warning(
  223. "MySql version mismatch: header=%d.%d.%d lib=%d.%d.%d",
  224. MYSQL_VERSION_ID / 10000, (MYSQL_VERSION_ID / 100) % 100,
  225. MYSQL_VERSION_ID % 100, ver / 10000, (ver / 100) % 100,
  226. ver % 100);
  227. return -1;
  228. }
  229. int check_db_table(int gid, int role)
  230. {
  231. ConnectorProcess *helper = new ConnectorProcess();
  232. if (proc_timeout > 1) {
  233. helper->set_proc_timeout(proc_timeout - 1);
  234. signal(SIGALRM, proc_timeout_handler);
  235. }
  236. alarm(proc_timeout);
  237. if (helper->do_init(
  238. gid, dbConfig,
  239. TableDefinitionManager::instance()->get_cur_table_def(),
  240. role) != 0) {
  241. log4cplus_error("%s", "helper process init failed");
  242. delete helper;
  243. alarm(0);
  244. return (-1);
  245. }
  246. if (helper->check_table() != 0) {
  247. delete helper;
  248. alarm(0);
  249. return (-2);
  250. }
  251. alarm(0);
  252. delete helper;
  253. return (0);
  254. }
  255. int main(int argc, char **argv)
  256. {
  257. init_proc_title(argc, argv);
  258. init_log4cplus();
  259. if (load_entry_parameter(argc, argv) < 0)
  260. return -1;
  261. check_db_version();
  262. argc -= optind;
  263. argv += optind;
  264. struct HelperParameter helperArgs = { 0, 0, 0 };
  265. char *addr = NULL;
  266. if (argc > 0) {
  267. char *p;
  268. helperArgs.gid = strtol(argv[0], &p, 0);
  269. if (*p == '\0' || *p == MACHINEROLESTRING[0])
  270. helperArgs.role = 0;
  271. else if (*p == MACHINEROLESTRING[1])
  272. helperArgs.role = 1;
  273. else {
  274. log4cplus_error("Bad machine id: %s", argv[0]);
  275. return -1;
  276. }
  277. }
  278. if (argc != 2 && argc != 3) {
  279. show_usage();
  280. return -1;
  281. }
  282. int usematch = g_dtc_config->get_int_val("cache",
  283. "UseMatchedAsAffectedRows", 1);
  284. int backlog = g_dtc_config->get_int_val("cache", "MaxListenCount", 256);
  285. int helperTimeout =
  286. g_dtc_config->get_int_val("cache", "HelperTimeout", 30);
  287. if (helperTimeout > 1)
  288. proc_timeout = helperTimeout - 1;
  289. else
  290. proc_timeout = 0;
  291. addr = argv[1];
  292. if (check_db_table(helperArgs.gid, helperArgs.role) != 0) {
  293. return -1;
  294. }
  295. int fd = -1;
  296. if (!strcmp(addr, "-"))
  297. fd = 0;
  298. else {
  299. std::string dtcid = g_dtc_config->get_config_node()["props"]["listener.port.dtc"].as<std::string>();
  300. if(dtcid != "none")
  301. {
  302. log4cplus_warning(
  303. "standalone %s need DTCID set to NONE",
  304. progname);
  305. return -1;
  306. }
  307. SocketAddress sockaddr;
  308. const char *err =
  309. sockaddr.set_address(addr, argc == 2 ? NULL : argv[2]);
  310. if (err) {
  311. log4cplus_warning("host %s port %s: %s", addr,
  312. argc == 2 ? "NULL" : argv[2], err);
  313. return -1;
  314. }
  315. if (sockaddr.socket_type() != SOCK_STREAM) {
  316. log4cplus_warning(
  317. "standalone %s don't support UDP protocol",
  318. progname);
  319. return -1;
  320. }
  321. fd = socket_bind(&sockaddr, backlog);
  322. if (fd < 0)
  323. return -1;
  324. }
  325. log4cplus_debug(
  326. "If you want to simulate db busy,"
  327. "you can set \"ENABLE_SIMULATE_DTC_HELPER_DELAY_SECOND=second\" before dtc startup");
  328. init_daemon();
  329. conn_proc = new ConnectorProcess();
  330. if (usematch)
  331. conn_proc->use_matched_rows();
  332. #if HAS_LOGAPI
  333. conn_proc->logapi.do_init(
  334. g_dtc_config->get_int_val("LogApi", "MessageId", 0),
  335. g_dtc_config->get_int_val("LogApi", "CallerId", 0),
  336. g_dtc_config->get_int_val("LogApi", "TargetId", 0),
  337. g_dtc_config->get_int_val("LogApi", "InterfaceId", 0));
  338. #endif
  339. conn_proc->init_title(helperArgs.gid, helperArgs.role);
  340. if (proc_timeout > 1)
  341. conn_proc->set_proc_timeout(proc_timeout - 1);
  342. while (!stop) {
  343. helperArgs.netfd = accept_connection(fd);
  344. char buf[16];
  345. memset(buf, 0, 16);
  346. buf[0] = WATCHDOG_INPUT_OBJECT;
  347. snprintf(buf + 1, 15, "%s", conn_proc->get_name());
  348. watch_dog_fork(buf, (int (*)(void *))helper_proc_run,
  349. (void *)&helperArgs);
  350. close(helperArgs.netfd);
  351. }
  352. if (fd > 0 && addr && addr[0] == '/')
  353. unlink(addr);
  354. return 0;
  355. }