data_connector_ask_chain.cc 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <algorithm>
  17. #include "list/list.h"
  18. #include "config/dbconfig.h"
  19. #include "connector/connector_group.h"
  20. #include "data_connector_ask_chain.h"
  21. #include "request/request_base.h"
  22. #include "task/task_request.h"
  23. #include "log/log.h"
  24. #include "key/key_helper.h"
  25. #include "stat_dtc.h"
  26. #include "protocol.h"
  27. #include "dtc_global.h"
  28. #include "listener/listener.h"
  29. #include "helper.h"
  30. #include "socket/unix_socket.h"
  31. extern const char *connector_name[];
/*
 * Answer-side interceptor installed on write jobs (see job_ask_procedure):
 * when a write completes successfully, it records the key in the owner's
 * "guard" KeyHelper so that select_group() keeps routing subsequent reads
 * of that key to the master role until the guard entry expires.
 */
class GuardNotify : public JobAnswerInterface<DTCJobOperation> {
    public:
	GuardNotify(DataConnectorAskChain *o) : owner(o)
	{
	}
	~GuardNotify()
	{
	}
	virtual void job_answer_procedure(DTCJobOperation *);

    private:
	DataConnectorAskChain *owner; /* back-pointer to the chain; not owned */
};
  44. void GuardNotify::job_answer_procedure(DTCJobOperation *job)
  45. {
  46. log4cplus_debug("enter job_ask_procedure");
  47. if (job->result_code() >= 0)
  48. owner->guard->add_key(job->barrier_key(), job->packed_key());
  49. job->turn_around_job_answer();
  50. log4cplus_debug("leave job_ask_procedure");
  51. }
/*
 * Construct an empty ask chain: both config slots (0 = active, 1 = the
 * migration target) start NULL, and the queue statistics counters are
 * registered with the global stat manager.
 */
DataConnectorAskChain::DataConnectorAskChain()
	: JobAskInterface<DTCJobOperation>(NULL), hasDummyMachine(0),
	  guardReply(NULL), tableNo(0), guard(NULL)
	  , p_task_dispatcher_(NULL)
{
	dbConfig[0] = NULL;
	dbConfig[1] = NULL;
	groupMap[0] = NULL;
	groupMap[1] = NULL;
	groups[0] = NULL;
	groups[1] = NULL;
	/* Aggregate queue stats over all groups; still meaningful, kept for now. */
	statQueueCurCount = g_stat_mgr.get_stat_int_counter(CUR_QUEUE_COUNT);
	statQueueMaxCount = g_stat_mgr.get_stat_int_counter(MAX_QUEUE_COUNT);
	/* Four per-group-type "largest queue length" stats, used for alarm
	 * monitoring (see stat_helper_group_cur_max_queue_count). */
	statReadQueueCurMaxCount = g_stat_mgr.get_stat_int_counter(
		HELPER_READ_GROUR_CUR_QUEUE_MAX_SIZE);
	statWriteQueueMaxCount = g_stat_mgr.get_stat_int_counter(
		HELPER_WRITE_GROUR_CUR_QUEUE_MAX_SIZE);
	statCommitQueueCurMaxCount = g_stat_mgr.get_stat_int_counter(
		HELPER_COMMIT_GROUR_CUR_QUEUE_MAX_SIZE);
	statSlaveReadQueueMaxCount = g_stat_mgr.get_stat_int_counter(
		HELPER_SLAVE_READ_GROUR_CUR_QUEUE_MAX_SIZE);
}
/*
 * Destructor releases only the ACTIVE table's resources (slot 0); the
 * migration-side slot 1 is torn down separately by Cleanup2().
 */
DataConnectorAskChain::~DataConnectorAskChain()
{
	if (groups[0]) {
		for (int i = 0;
		     i < dbConfig[0]->machineCnt * GROUPS_PER_MACHINE; i++)
			DELETE(groups[0][i]);
		FREE_CLEAR(groups[0]);
	}
	FREE_CLEAR(groupMap[0]);
	DELETE(guard);
	DELETE(guardReply);
}
  88. ConnectorGroup *DataConnectorAskChain::select_group(DTCJobOperation *job)
  89. {
  90. const DTCValue *key = job->request_key();
  91. uint64_t uk;
  92. /* key-hash disable */
  93. if (dbConfig[0]->keyHashConfig.keyHashEnable == 0 || key == NULL) {
  94. if (NULL == key)
  95. uk = 0;
  96. else if (key->s64 < 0)
  97. uk = 0 - key->s64;
  98. else
  99. uk = key->s64;
  100. } else {
  101. switch (job->field_type(0)) {
  102. case DField::Signed:
  103. case DField::Unsigned:
  104. uk = dbConfig[0]->keyHashConfig.keyHashFunction(
  105. (const char *)&(key->u64), sizeof(key->u64),
  106. dbConfig[0]->keyHashConfig.keyHashLeftBegin,
  107. dbConfig[0]->keyHashConfig.keyHashRightBegin);
  108. break;
  109. case DField::String:
  110. case DField::Binary:
  111. uk = dbConfig[0]->keyHashConfig.keyHashFunction(
  112. key->bin.ptr, key->bin.len,
  113. dbConfig[0]->keyHashConfig.keyHashLeftBegin,
  114. dbConfig[0]->keyHashConfig.keyHashRightBegin);
  115. break;
  116. default:
  117. uk = 0;
  118. }
  119. }
  120. if (dbConfig[1]) {
  121. int idx = uk / dbConfig[1]->dbDiv % dbConfig[1]->dbMod;
  122. int machineId = groupMap[1][idx];
  123. ConnectorGroup *ptr = groups[1][machineId * GROUPS_PER_MACHINE];
  124. if (ptr != NULL && job->request_code() != DRequest::Get)
  125. return GROUP_READONLY;
  126. }
  127. int idx = uk / dbConfig[0]->dbDiv % dbConfig[0]->dbMod;
  128. int machineId = groupMap[0][idx];
  129. if (machineId == GMAP_NONE)
  130. return NULL;
  131. if (machineId == GMAP_DUMMY)
  132. return GROUP_DUMMY;
  133. ConnectorGroup **ptr = &groups[0][machineId * GROUPS_PER_MACHINE];
  134. if (job->request_code() == DRequest::Get && ptr[GROUPS_PER_ROLE] &&
  135. false == guard->in_set(job->barrier_key(), job->packed_key())) {
  136. int role = 0;
  137. switch (dbConfig[0]->mach[machineId].mode) {
  138. case BY_SLAVE:
  139. role = 1;
  140. break;
  141. case BY_DB:
  142. role = (uk / dbConfig[0]->dbDiv) & 1;
  143. case BY_TABLE:
  144. role = (uk / dbConfig[0]->tblDiv) & 1;
  145. case BY_KEY:
  146. role = job->barrier_key() & 1;
  147. }
  148. return ptr[role * GROUPS_PER_ROLE];
  149. }
  150. int g = job->request_type();
  151. while (--g >= 0) {
  152. if (ptr[g] != NULL) {
  153. return ptr[g];
  154. }
  155. }
  156. return NULL;
  157. }
  158. bool DataConnectorAskChain::is_commit_full(DTCJobOperation *job)
  159. {
  160. if (job->request_code() != DRequest::Replace)
  161. return false;
  162. ConnectorGroup *helperGroup = select_group(job);
  163. if (helperGroup == NULL || helperGroup == GROUP_DUMMY ||
  164. helperGroup == GROUP_READONLY)
  165. return false;
  166. if (helperGroup->queue_full()) {
  167. log4cplus_warning("NO FREE COMMIT QUEUE SLOT");
  168. helperGroup->dump_state();
  169. }
  170. return helperGroup->queue_full() ? true : false;
  171. }
  172. int DataConnectorAskChain::Cleanup()
  173. {
  174. newDb.clear();
  175. new2old.clear();
  176. return 0;
  177. }
  178. int DataConnectorAskChain::Cleanup2()
  179. {
  180. if (groups[1]) {
  181. for (int i = 0; i < dbConfig[1]->machineCnt; ++i) {
  182. std::vector<int>::iterator it =
  183. find(newDb.begin(), newDb.end(), i);
  184. if (it != newDb.end()) {
  185. for (int j = 0; j < GROUPS_PER_MACHINE; ++j) {
  186. DELETE(groups[1][j]);
  187. }
  188. }
  189. }
  190. FREE_CLEAR(groups[1]);
  191. }
  192. FREE_CLEAR(groupMap[1]);
  193. if (dbConfig[1]) {
  194. dbConfig[1]->destory();
  195. dbConfig[1] = NULL;
  196. }
  197. return 0;
  198. }
/*
 * Allocate the groups[idx] array (machineCnt * GROUPS_PER_MACHINE slots)
 * and create a ConnectorGroup for every configured (machine, group) pair.
 *
 * For idx == 1 (migration target) only machines listed in newDb get
 * groups here; old machines' group pointers are copied over later in
 * switch_db().
 *
 * Returns 0 on success, -1 on duplicate call or allocation failure.
 */
int DataConnectorAskChain::build_helper_object(int idx)
{
	if (groups[idx] != NULL) {
		log4cplus_error("groups[%d] exists", idx);
		return -1;
	}
	groups[idx] = (ConnectorGroup **)CALLOC(sizeof(ConnectorGroup *),
						dbConfig[idx]->machineCnt *
							GROUPS_PER_MACHINE);
	if (!groups[idx]) {
		log4cplus_error("malloc failed, %m");
		return -1;
	}
	/* hot-write-cache flag comes from the "primary: hot" yaml node */
	DTCConfig *p_dtc_conf = dbConfig[idx]->cfgObj;
	int i_has_hwc = 0;
	if (p_dtc_conf) {
		if (p_dtc_conf->get_config_node()["primary"]["hot"])
			i_has_hwc = 1;
	}
	log4cplus_info("enable hwc:%d", i_has_hwc);
	/* build helper object */
	for (int i = 0; i < dbConfig[idx]->machineCnt; i++) {
		if (dbConfig[idx]->mach[i].helperType == DUMMY_HELPER)
			continue;
		if (idx == 1 &&
		    find(newDb.begin(), newDb.end(), i) == newDb.end()) {
			// if not new db mach, just continue, copy old mach when switch
			continue;
		}
		for (int j = 0; j < GROUPS_PER_MACHINE; j++) {
			/* skip group slots with no worker processes configured */
			if (dbConfig[idx]->mach[i].gprocs[j] == 0)
				continue;
			log4cplus_debug("start worker sequence: %d", j);
			/* group name: "<machine><role-letter><index-within-role>" */
			char name[24];
			snprintf(name, sizeof(name), "%d%c%d", i,
				 MACHINEROLESTRING[j / GROUPS_PER_ROLE],
				 j % GROUPS_PER_ROLE);
			groups[idx][i * GROUPS_PER_MACHINE + j] =
				new ConnectorGroup(
					dbConfig[idx]
						->mach[i]
						.role[j / GROUPS_PER_ROLE]
						.path,
					name, dbConfig[idx]->mach[i].gprocs[j],
					dbConfig[idx]->mach[i].gqueues[j],
					DTC_SQL_USEC_ALL,
					i_has_hwc);
			/* non-first-role groups fall back to the machine's
			 * slot-0 group when they cannot serve a request */
			if (j >= GROUPS_PER_ROLE)
				groups[idx][i * GROUPS_PER_MACHINE + j]
					->fallback =
					groups[idx][i * GROUPS_PER_MACHINE];
			log4cplus_debug("start worker %s", name);
		}
	}
	return 0;
}
/*
 * Build groupMap[idx]: a db-index -> machine-id table covering
 * database_max_count entries. Dummy machines map to GMAP_DUMMY; every db
 * index must be claimed by exactly one machine, otherwise the config is
 * rejected.
 *
 * Returns 0 on success, -1 on duplicate call, allocation failure,
 * duplicate db ownership, or an unmapped db index.
 */
int DataConnectorAskChain::build_master_group_mapping(int idx)
{
	if (groupMap[idx] != NULL) {
		log4cplus_error("groupMap[%d] exist", idx);
		return -1;
	}
	groupMap[idx] = (short *)MALLOC(sizeof(short) *
					dbConfig[idx]->database_max_count);
	if (groupMap[idx] == NULL) {
		log4cplus_error("malloc error for groupMap[%d]", idx);
		return -1;
	}
	/* start with every db index unowned */
	for (int i = 0; i < dbConfig[idx]->database_max_count; i++)
		groupMap[idx][i] = GMAP_NONE;
	log4cplus_info("machine cnt:%d", dbConfig[idx]->machineCnt);
	/* build master group mapping */
	for (int i = 0; i < dbConfig[idx]->machineCnt; i++) {
		int gm_id = i;
		log4cplus_info("helper type:%d", dbConfig[idx]->mach[i].helperType);
		if (dbConfig[idx]->mach[i].helperType == DUMMY_HELPER) {
			gm_id = GMAP_DUMMY;
			hasDummyMachine = 1;
		} else if (dbConfig[idx]->mach[i].procs == 0) {
			/* machine with no worker processes claims nothing */
			log4cplus_error("procs=0 at idx:%d, i: %d", idx, i);
			continue;
		}
		log4cplus_info("mach[%d].dbCnt: %d", i, dbConfig[idx]->mach[i].dbCnt);
		for (int j = 0; j < dbConfig[idx]->mach[i].dbCnt; j++) {
			const int db = dbConfig[idx]->mach[i].dbIdx[j];
			/* >= 0 means some earlier machine already owns db */
			if (groupMap[idx][db] >= 0) {
				log4cplus_error(
					"duplicate machine, db %d machine %d %d",
					db, groupMap[idx][db] + 1, i + 1);
				return -1;
			}
			groupMap[idx][db] = gm_id;
		}
	}
	/* completeness check: every db index must be owned or dummy */
	for (int i = 0; i < dbConfig[idx]->database_max_count; ++i) {
		log4cplus_info("database_max_count:%d, idx: %d", dbConfig[idx]->database_max_count, idx);
		if (groupMap[idx][i] == GMAP_NONE) {
			log4cplus_error(
				"db completeness check error, db %d not found",
				i);
			return -1;
		}
	}
	return 0;
}
/*
 * Parse the table.yaml text carried in a migrate-db job (row field 3) into
 * a freshly-loaded DbConfig. On any failure the job's error is set and
 * NULL is returned.
 *
 * Ownership: the returned DbConfig is the caller's to destroy (callers use
 * ->destory()).
 */
DbConfig *DataConnectorAskChain::get_db_config(DTCJobOperation *job)
{
	RowValue row(job->table_definition());
	DTCConfig *config = NULL;
	DbConfig *newdb = NULL;
	// parse db config
	if (!job->request_operation()) {
		log4cplus_error("table.yaml not found when migrate db");
		job->set_error(-EC_DATA_NEEDED, "group collect",
			       "migrate db need table.yaml");
		return NULL;
	}
	job->update_row(row);
	log4cplus_debug("strlen: %ld, row[3].bin.ptr: %s",
			strlen(row[3].bin.ptr), row[3].bin.ptr);
	char *buf = row[3].bin.ptr;
	config = new DTCConfig();
	if (config->load_yaml_buffer(buf) !=
	    0) {
		log4cplus_error(
			"table.yaml illeagl when migrate db, parse error");
		job->set_error(-EC_ERR_MIGRATEDB_ILLEGAL, "group collect",
			       "table.yaml illegal, parse error");
		delete config;
		return NULL;
	}
	if ((newdb = DbConfig::Load(config)) == NULL) {
		/* NOTE(review): unlike the parse-error path above, `config`
		 * is not deleted here — this leaks unless DbConfig::Load()
		 * frees it on failure; confirm ownership contract. */
		log4cplus_error(
			"table.yaml illeagl when migrate db, load error");
		job->set_error(-EC_ERR_MIGRATEDB_ILLEGAL, "group collect",
			       "table.yaml illegal, load error");
		return NULL;
	}
	return newdb;
}
/*
 * Admin command MigrateDB: validate the candidate table.yaml carried in
 * the job, persist it, spawn helper listeners/workers for the NEW machines
 * and park them in slot 1 (read-only until MigrateDBSwitch).
 *
 * Return codes:
 *    0 - success, or duplicate request while already migrating
 *   -2 - request rejected; no partial state was created
 *   -1 - failed mid-flight; the caller (job_ask_procedure) runs
 *        Cleanup2() + Cleanup() to roll back
 */
int DataConnectorAskChain::migrate_db(DTCJobOperation *job)
{
	int ret = 0;
	DbConfig *newDbConfig = get_db_config(job);
	if (newDbConfig == NULL)
		return -2;
	/* already migrating: accept an identical re-send, reject a new one */
	if (dbConfig[1]) {
		bool same = dbConfig[1]->Compare(newDbConfig, true);
		newDbConfig->destory();
		if (!same) {
			log4cplus_error("new table.yaml when migrating db");
			job->set_error(-EC_ERR_MIGRATEDB_MIGRATING,
				       "group collect",
				       "new table.yaml when migrating db");
			return -2;
		}
		log4cplus_info("duplicate table.yaml when migrating db");
		job->set_error(-EC_ERR_MIGRATEDB_DUPLICATE, "group collect",
			       "duplicate table.yaml when migrating db");
		return 0;
	}
	// check are others fields same
	if (!newDbConfig->Compare(dbConfig[0], false)) {
		newDbConfig->destory();
		log4cplus_error("new table.yaml does not match old one");
		job->set_error(-EC_ERR_MIGRATEDB_DISTINCT, "group collect",
			       "new table.yaml does not match old one");
		return -2;
	}
	// set read only on new db
	dbConfig[1] = newDbConfig;
	// find new db
	dbConfig[1]->find_new_mach(dbConfig[0], newDb, new2old);
	log4cplus_debug("found %ld new db machine", newDb.size());
	if (newDb.size() == 0) {
		log4cplus_error(
			"table.yaml does not contain new db when migrate db");
		job->set_error(-EC_DATA_NEEDED, "group collect",
			       "table.yaml does not contain new db");
		return -1;
	}
	// check db completeness of new db config
	if (build_master_group_mapping(1) != 0) {
		log4cplus_error("table.yaml db mapping is not complete");
		job->set_error(-EC_DATA_NEEDED, "group collect",
			       "table.yaml db mapping is not complete");
		return -1;
	}
	// save new table.yaml as table%d.conf
	char tableName[64];
	snprintf(tableName, 64, "/etc/dtc/dtc%d.conf", tableNo);
	log4cplus_debug("table.yaml: %s", tableName);
	if (dbConfig[1]->cfgObj->Dump(tableName, true) != 0) {
		log4cplus_error("save table.yaml as table2.conf error");
		job->set_error(-EC_SERVER_ERROR, "group collect",
			       "save table.yaml as table2.conf error");
		return -1;
	}
	// start listener, connect db, check access, start worker
	if ((ret = start_listener(job)) != 0)
		return ret;
	++tableNo;
	// start worker and create class member variable
	if (build_helper_object(1) != 0) {
		log4cplus_error("verify connect error: %m");
		job->set_error(-EC_ERR_MIGRATEDB_HELPER, "group collect",
			       "start helper worker error");
		return -1;
	}
	// disable commit as none async
	disable_commit_group(1);
	/* hook the new groups up to the same timer lists as the active table */
	set_timer_handler(recvList, connList, retryList, 1);
	return 0;
}
/*
 * Admin command MigrateDBSwitch: promote the migration config (slot 1) to
 * active (slot 0). The job must carry the same table.yaml that started
 * the migration. After the swap, group pointers for machines that were
 * NOT new are copied from the old table (via new2old) so existing helper
 * connections are reused, then the old arrays/config are released.
 *
 * Returns 0 on success, -2 on rejection (not migrating / config mismatch).
 */
int DataConnectorAskChain::switch_db(DTCJobOperation *job)
{
	if (!dbConfig[1]) {
		log4cplus_info("migrate db not start");
		job->set_error(-EC_ERR_MIGRATEDB_NOT_START, "group collect",
			       "migrate db not start");
		return -2;
	}
	DbConfig *newDbConfig = get_db_config(job);
	if (newDbConfig == NULL)
		return -2;
	// check is table same
	bool same = newDbConfig->Compare(dbConfig[1], true);
	newDbConfig->destory();
	if (!same) {
		log4cplus_error("switch db with different table.yaml");
		job->set_error(-EC_ERR_MIGRATEDB_DISTINCT, "group collect",
			       "switch db with different table.yaml");
		return -2;
	}
	// start worker helper
	do_attach(NULL, 1);
	// switch to new, unset read only
	std::swap(dbConfig[0], dbConfig[1]);
	std::swap(groups[0], groups[1]);
	std::swap(groupMap[0], groupMap[1]);
	// copy old client
	for (int i = 0; i < dbConfig[0]->machineCnt; ++i) {
		if (dbConfig[0]->mach[i].helperType == DUMMY_HELPER)
			continue;
		/* new machines already have their own freshly-built groups */
		if (find(newDb.begin(), newDb.end(), i) != newDb.end())
			continue;
		/* reuse the old table's group row for this machine */
		memmove(groups[0] + i * GROUPS_PER_MACHINE,
			groups[1] + new2old[i] * GROUPS_PER_MACHINE,
			sizeof(ConnectorGroup *) * GROUPS_PER_MACHINE);
		log4cplus_debug("copy old client ptr: %p",
				*(groups[0] + i * GROUPS_PER_MACHINE));
	}
	// release old (arrays only; surviving groups are now owned by slot 0)
	FREE_CLEAR(groupMap[1]);
	FREE_CLEAR(groups[1]);
	dbConfig[1]->destory();
	dbConfig[1] = NULL;
	// write conf file
	dbConfig[0]->cfgObj->Dump("/etc/dtc/dtc.yaml", false);
	Cleanup();
	return 0;
}
  462. int DataConnectorAskChain::notify_watch_dog(StartHelperPara *para)
  463. {
  464. char buf[16];
  465. if (sizeof(*para) > 15)
  466. return -1;
  467. char *env = getenv(ENV_WATCHDOG_SOCKET_FD);
  468. int fd = env == NULL ? -1 : atoi(env);
  469. if (fd > 0) {
  470. memset(buf, 0, 16);
  471. buf[0] = WATCHDOG_INPUT_HELPER;
  472. log4cplus_debug("sizeof(*para): %ld", sizeof(*para));
  473. memcpy(buf + 1, para, sizeof(*para));
  474. send(fd, buf, sizeof(buf), 0);
  475. return 0;
  476. } else {
  477. return -2;
  478. }
  479. }
/*
 * For every NEW machine of the migration config, ask the watchdog to start
 * one helper listener per role that has at least one worker process
 * configured. Machines whose helper type is DTC_HELPER or below need no
 * listener and are skipped.
 *
 * Returns 0 on success, -1 if the watchdog could not be notified.
 */
int DataConnectorAskChain::start_listener(DTCJobOperation *job)
{
	int ret = 0;
	log4cplus_debug("starting new db listener...");
	int nh = 0; /* number of listeners requested */
	dbConfig[1]->set_helper_path(getppid());
	for (std::vector<int>::iterator it = newDb.begin(); it != newDb.end();
	     ++it) {
		// start listener
		HELPERTYPE t = dbConfig[1]->mach[*it].helperType;
		log4cplus_debug("helper type = %d", t);
		if (DTC_HELPER >= t)
			continue;
		for (int r = 0; r < ROLES_PER_MACHINE; ++r) {
			/* n = total worker processes configured for role r */
			int i, n = 0;
			for (i = 0;
			     i < GROUPS_PER_ROLE &&
			     (r * GROUPS_PER_ROLE + i) < GROUPS_PER_MACHINE;
			     ++i)
				n += dbConfig[1]
					     ->mach[*it]
					     .gprocs[r * GROUPS_PER_ROLE + i];
			if (n <= 0)
				continue;
			StartHelperPara para;
			para.type = t;
			para.backlog = n + 1;
			para.mach = *it;
			para.role = r;
			para.conf = DBHELPER_TABLE_NEW;
			para.num = tableNo;
			if ((ret = notify_watch_dog(&para)) < 0) {
				log4cplus_error(
					"notify daemons error for group %d role %d, ret: %d",
					*it, r, ret);
				return -1;
			}
			++nh;
		}
	}
	log4cplus_info("%d helper listener started", nh);
	return 0;
}
/*
 * Chain entry point. Dispatch order:
 *   1. helper reload-config broadcast jobs;
 *   2. admin commands (MigrateDB / MigrateDBSwitch);
 *   3. normal requests, routed via select_group():
 *        NULL           -> key out of range error
 *        GROUP_DUMMY    -> black-hole the job
 *        GROUP_READONLY -> reject non-read during migration
 *        real group     -> forward (writes get the guard reply hook)
 * Every path answers the job; queue statistics are refreshed at the end.
 */
void DataConnectorAskChain::job_ask_procedure(DTCJobOperation *job)
{
	if (DRequest::ReloadConfig == job->request_code() &&
	    TaskTypeHelperReloadConfig == job->request_type()) {
		collect_notify_helper_reload_config(job);
		return;
	}
	int ret = 0;
	if (job->request_code() == DRequest::TYPE_SYSTEM_COMMAND) {
		switch (job->requestInfo.admin_code()) {
		case DRequest::SystemCommand::MigrateDB:
			log4cplus_debug(
				"GroupCollect::job_ask_procedure DRequest::TYPE_SYSTEM_COMMAND::MigrateDB");
			ret = migrate_db(job);
			/* -1 = failed mid-flight: roll back migration state
			 * (-2 rejections created no state) */
			if (ret == -1) {
				Cleanup2();
				Cleanup();
			}
			job->turn_around_job_answer();
			return;
		case DRequest::SystemCommand::MigrateDBSwitch:
			log4cplus_debug(
				"GroupCollect::job_ask_procedure DRequest::TYPE_SYSTEM_COMMAND::MigrateDBSwitch");
			ret = switch_db(job);
			job->turn_around_job_answer();
			return;
		default:
			// this should not happen
			log4cplus_error("unknown admin code: %d",
					job->requestInfo.admin_code());
			job->set_error(-EC_SERVER_ERROR, "helper collect",
				       "unkown admin code");
			job->turn_around_job_answer();
			return;
		}
	}
	ConnectorGroup *helperGroup = select_group(job);
	if (helperGroup == NULL) {
		log4cplus_error("Key not belong to this server");
		job->set_error(-EC_OUT_OF_KEY_RANGE,
			       "GroupCollect::job_ask_procedure",
			       "Key not belong to this server");
		job->turn_around_job_answer();
	} else if (helperGroup == GROUP_DUMMY) {
		job->mark_as_black_hole();
		job->turn_around_job_answer();
	} else if (helperGroup == GROUP_READONLY) {
		log4cplus_debug(
			"try to do non read op on a key which belongs db which is migrating");
		job->set_error(
			-EC_SERVER_ERROR, "helper collect",
			"try to do non read op on a key which belongs db which is migrating");
		job->turn_around_job_answer();
	} else {
		/* successful writes must be recorded by the guard so reads
		 * stick to the master (see GuardNotify) */
		if (job->request_type() == TaskTypeWrite && guardReply != NULL)
			job->push_reply_dispatcher(guardReply);
		helperGroup->job_ask_procedure(job);
	}
	stat_helper_group_queue_count(groups[0], dbConfig[0]->machineCnt *
						 GROUPS_PER_MACHINE);
	stat_helper_group_cur_max_queue_count(job->request_type());
}
/*
 * Initial configuration load: install cfg, build the db->machine mapping
 * and the helper groups, and create the slave-read guard when configured.
 *
 * Returns 0 on success, or the failing builder's negative/-1 result.
 *
 * NOTE(review): cfg is stored into dbConfig[0] while the mapping/objects
 * are built for `idx` — this appears to assume idx == 0 at startup;
 * confirm before calling with any other idx.
 */
int DataConnectorAskChain::load_config(DbConfig *cfg, int keysize, int idx)
{
	dbConfig[0] = cfg;
	int ret = 0;
	if ((ret = build_master_group_mapping(idx)) != 0) {
		log4cplus_error("build master group map error, ret: %d", ret);
		return ret;
	}
	if ((ret = build_helper_object(idx)) != 0) {
		log4cplus_error("build helper object error, ret: %d", ret);
		return ret;
	}
	/* slave guard: pins recently-written keys to the master for reads */
	if (dbConfig[0]->slaveGuard) {
		guard = new KeyHelper(keysize, dbConfig[0]->slaveGuard);
		guardReply = new GuardNotify(this);
	}
	return 0;
}
  603. int DataConnectorAskChain::renew_config(struct DbConfig *cfg)
  604. {
  605. dbConfig[1] = cfg;
  606. std::swap(dbConfig[0], dbConfig[1]);
  607. dbConfig[1]->destory();
  608. dbConfig[1] = NULL;
  609. return 0;
  610. }
  611. int DataConnectorAskChain::do_attach(PollerBase *thread, int idx)
  612. {
  613. if (idx == 0)
  614. JobAskInterface<DTCJobOperation>::attach_thread(thread);
  615. for (int i = 0;
  616. i < dbConfig[idx]->machineCnt * GROUPS_PER_MACHINE;
  617. i++) {
  618. if (groups[idx][i]) {
  619. groups[idx][i]->do_attach(owner, &task_queue_allocator);
  620. assert(p_task_dispatcher_ != NULL);
  621. groups[idx][i]->BindHbLogDispatcher(p_task_dispatcher_);
  622. }
  623. }
  624. return 0;
  625. }
  626. void DataConnectorAskChain::set_timer_handler(TimerList *recv, TimerList *conn,
  627. TimerList *retry, int idx)
  628. {
  629. if (idx == 0) {
  630. recvList = recv;
  631. connList = conn;
  632. retryList = retry;
  633. }
  634. for (int i = 0; i < dbConfig[idx]->machineCnt; i++) {
  635. if (dbConfig[idx]->mach[i].helperType == DUMMY_HELPER)
  636. continue;
  637. for (int j = 0; j < GROUPS_PER_MACHINE; j++) {
  638. if (dbConfig[idx]->mach[i].gprocs[j] == 0)
  639. continue;
  640. if (groups[idx][i * GROUPS_PER_MACHINE + j])
  641. groups[idx][i * GROUPS_PER_MACHINE + j]
  642. ->set_timer_handler(recv, conn, retry);
  643. }
  644. }
  645. }
  646. int DataConnectorAskChain::disable_commit_group(int idx)
  647. {
  648. if (groups[idx] == NULL)
  649. return 0;
  650. for (int i = 2; i < dbConfig[idx]->machineCnt * GROUPS_PER_MACHINE;
  651. i += GROUPS_PER_MACHINE) {
  652. DELETE(groups[idx][i]);
  653. }
  654. return 0;
  655. }
  656. void DataConnectorAskChain::stat_helper_group_queue_count(
  657. ConnectorGroup **groups, unsigned group_count)
  658. {
  659. unsigned total_queue_count = 0;
  660. unsigned total_queue_max_count = 0;
  661. for (unsigned i = 0; i < group_count; ++i) {
  662. if (groups[i]) {
  663. total_queue_count += groups[i]->queue_count();
  664. total_queue_max_count += groups[i]->queue_max_count();
  665. }
  666. }
  667. statQueueCurCount = total_queue_count;
  668. statQueueMaxCount = total_queue_max_count;
  669. return;
  670. }
  671. int DataConnectorAskChain::get_queue_cur_max_count(int iColumn)
  672. {
  673. int max_count = 0;
  674. if ((iColumn < 0) || (iColumn >= GROUPS_PER_MACHINE)) {
  675. return max_count;
  676. }
  677. for (int row = 0; row < dbConfig[0]->machineCnt; row++) {
  678. /*read组是在group矩阵的第一列*/
  679. ConnectorGroup *readGroup =
  680. groups[0][GROUPS_PER_MACHINE * row + iColumn];
  681. if (NULL == readGroup) {
  682. continue;
  683. }
  684. if (readGroup->queue_count() > max_count) {
  685. max_count = readGroup->queue_count();
  686. log4cplus_debug("the group queue max_count is %d ",
  687. max_count);
  688. }
  689. }
  690. return max_count;
  691. }
  692. /*传入请求类型,每次只根据请求类型统计响应的值*/
  693. void DataConnectorAskChain::stat_helper_group_cur_max_queue_count(
  694. int iRequestType)
  695. {
  696. /*根据请求类型分辨不出来是主读还是备读(和Workload配置有关),只好同时即统计主读组又统计备读组了*/
  697. /*除非遍历group矩阵里的指针值和selectgroup后的group指针比较,然后再对比矩阵列,这个更复杂*/
  698. if (TaskTypeRead == iRequestType) {
  699. statReadQueueCurMaxCount =
  700. get_queue_cur_max_count(MASTER_READ_GROUP_COLUMN);
  701. statSlaveReadQueueMaxCount =
  702. get_queue_cur_max_count(SLAVE_READ_GROUP_COLUMN);
  703. }
  704. if (TaskTypeWrite == iRequestType) {
  705. statWriteQueueMaxCount =
  706. get_queue_cur_max_count(MASTER_WRITE_GROUP_COLUMN);
  707. }
  708. if (TaskTypeCommit == iRequestType) {
  709. statCommitQueueCurMaxCount =
  710. get_queue_cur_max_count(MASTER_COMMIT_GROUP_COLUMN);
  711. }
  712. }
/*
 * Broadcast a reload-config job to every real helper group of the active
 * table. If no usable group exists at all, the job is answered with an
 * error instead.
 *
 * NOTE(review): the same job object is pushed to every group's
 * job_ask_procedure before the final turn_around_job_answer() — this
 * relies on how reload jobs are (not) queued by the groups; confirm.
 */
void DataConnectorAskChain::collect_notify_helper_reload_config(
	DTCJobOperation *job)
{
	unsigned int uiGroupNum = 0;
	unsigned int uiNullGroupNum = 0;
	for (int machineid = 0; machineid < dbConfig[0]->machineCnt;
	     ++machineid) {
		ConnectorGroup **ptr =
			&groups[0][machineid * GROUPS_PER_MACHINE];
		for (int groupid = 0; groupid < GROUPS_PER_MACHINE; ++groupid) {
			++uiGroupNum;
			ConnectorGroup *pHelperGroup = ptr[groupid];
			/* defensive: sentinel values are normally only
			 * returned by select_group(), not stored here */
			if (NULL == pHelperGroup ||
			    GROUP_DUMMY == pHelperGroup ||
			    GROUP_READONLY == pHelperGroup) {
				++uiNullGroupNum;
				continue;
			}
			pHelperGroup->job_ask_procedure(job);
		}
	}
	if (uiGroupNum == uiNullGroupNum) {
		log4cplus_error(
			"not have available helpergroup, please check!");
		job->set_error(-EC_NOT_HAVE_AVAILABLE_HELPERGROUP,
			       "helper collect",
			       "not have available helpergroup");
	}
	/* NOTE(review): completion is logged at error level — presumably
	 * for operator visibility; confirm intended severity. */
	log4cplus_error(
		"groupcollect notify work helper reload config finished!");
	job->turn_around_job_answer();
}