helper.cc 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. #include <sys/un.h>
  18. #include "socket/unix_socket.h"
  19. #include "config/dbconfig.h"
  20. #include "thread/thread.h"
  21. #include "helper.h"
  22. #include "daemon/daemon.h"
  23. #include "log/log.h"
  24. #include "dtc_global.h"
  25. #include <sstream>
  26. #include <unistd.h>
  27. WatchDogHelper::WatchDogHelper(WatchDog *watchdog, int sec, const char *path,
  28. int machine_conf, int role, int backlog,
  29. int type, int conf, int num)
  30. : WatchDogDaemon(watchdog, sec)
  31. {
  32. std::stringstream oss;
  33. oss << "helper" << machine_conf << MACHINEROLESTRING[role];
  34. memcpy(watchdog_object_name_, oss.str().c_str(), oss.str().length());
  35. path_ = path;
  36. backlog_ = backlog;
  37. type_ = type;
  38. conf_ = conf;
  39. num_ = num;
  40. }
  41. WatchDogHelper::~WatchDogHelper(void)
  42. {
  43. }
  44. const char *connector_name[] = {
  45. NULL, NULL, "connector", "custom_connector", "custom_connector",
  46. };
  47. void WatchDogHelper::exec()
  48. {
  49. struct sockaddr_un unaddr;
  50. int len = init_unix_socket_address(&unaddr, path_);
  51. int listenfd = socket(unaddr.sun_family, SOCK_STREAM, 0);
  52. bind(listenfd, (sockaddr *)&unaddr, len);
  53. int i_ret = listen(listenfd, backlog_);
  54. if (i_ret == 0) {
  55. log4cplus_info("backlog_:%d" , backlog_);
  56. }
  57. /* relocate listenfd to stdin */
  58. dup2(listenfd, 0);
  59. close(listenfd);
  60. log4cplus_info("exec path:%s" , path_);
  61. char *argv[9];
  62. int argc = 0;
  63. argv[argc++] = NULL;
  64. argv[argc++] = (char *)"-d";
  65. if (strcmp(daemons_cache_file, CACHE_CONF_NAME)) {
  66. argv[argc++] = (char *)"-f";
  67. argv[argc++] = daemons_cache_file;
  68. }
  69. if (conf_ == DBHELPER_TABLE_NEW) {
  70. argv[argc++] = (char *)"-t";// 4
  71. char tableName[64];
  72. snprintf(tableName, 64, "../conf/dtc%d.yaml", num_);
  73. argv[argc++] = tableName; // 5
  74. } else if (conf_ == DBHELPER_TABLE_ORIGIN &&
  75. strcmp(daemons_table_file, TABLE_CONF_NAME)) {
  76. argv[argc++] = (char *)"-t";
  77. argv[argc++] = daemons_table_file;
  78. }
  79. argv[argc++] = watchdog_object_name_ + 6; // 6 : 0m
  80. argv[argc++] = (char *)"-"; // 7
  81. argv[argc++] = NULL; // 8
  82. Thread *helperThread =
  83. new Thread(watchdog_object_name_, Thread::ThreadTypeProcess);
  84. helperThread->initialize_thread();
  85. char filedir[260] = {0};
  86. char filepath[260] = {0};
  87. char fn[260] = {0};
  88. snprintf(fn, sizeof(fn), "/proc/%d/exe", getpid());
  89. int rv = readlink(fn, filedir, sizeof(filedir) - 1);
  90. if(rv > 0)
  91. {
  92. filedir[rv] = '\0';
  93. std::string str = filedir;
  94. rv = str.rfind('/');
  95. strcpy(filedir, str.substr(0, rv).c_str());
  96. }
  97. sprintf(filepath, "%s/%s", filedir, connector_name[type_]);
  98. log4cplus_info("connector path:%s", filepath);
  99. argv[0] = filepath;
  100. execv(argv[0], argv);
  101. log4cplus_error("helper[%s] execv error: %m", argv[0]);
  102. }
  103. int WatchDogHelper::verify()
  104. {
  105. struct sockaddr_un unaddr;
  106. int len = init_unix_socket_address(&unaddr, path_);
  107. log4cplus_info("verify path:%s." , path_);
  108. /* delay 100ms and verify socket */
  109. //usleep(5000 * 1000);
  110. sleep(2);
  111. int s = socket(unaddr.sun_family, SOCK_STREAM, 0);
  112. if (connect(s, (sockaddr *)&unaddr, len) < 0) {
  113. close(s);
  114. log4cplus_error("verify connect: %m");
  115. return -1;
  116. }
  117. log4cplus_info("verify success.");
  118. close(s);
  119. return watchdog_object_pid_;
  120. }