// hwc_sync_unit.cc
  #include "hwc_sync_unit.h"
  #include <new>
  #include <string>
  #include <sys/time.h>
  // local
  #include "comm.h"
  // connector
  #include "mysql_operation.h"
  // libs/api/cc_api/include/
  #include "dtcapi.h"
  #include "mysqld_error.h"
  11. HwcSync::HwcSync(DTC::Server* p_server)
  12. : i_limit_(1)
  13. , p_master_(p_server)
  14. , o_journal_id_(CComm::registor.JournalId())
  15. { }
  16. HwcSync::~HwcSync()
  17. { }
  18. int HwcSync::query_cold_server(
  19. DTCJobOperation* p_job,
  20. const DTCValue* key)
  21. {
  22. p_job->set_request_key(key);
  23. p_job->set_request_code(DRequest::Get);
  24. DTCFieldSet* p_dtc_field_set = p_job->request_fields();
  25. DELETE(p_dtc_field_set);
  26. DTCTableDefinition* p_dtc_tab_def = TableDefinitionManager::instance()->get_cur_table_def();
  27. #if FOR_DEBUG
  28. for (int i = 1; i < p_job->num_fields() + 1; i++) {
  29. log4cplus_info("field id:%d" , p_dtc_tab_def->raw_fields_list()[i]);
  30. }
  31. #endif
  32. // begin at 1 ,exclude key field id
  33. p_dtc_field_set = new DTCFieldSet(
  34. p_dtc_tab_def->raw_fields_list(),
  35. p_job->num_fields() + 1);
  36. p_job->set_request_fields(p_dtc_field_set);
  37. if (CComm::mysql_process_.do_process(p_job)) {
  38. return -1;
  39. }
  40. return p_job->process_internal_result();
  41. }
  42. void HwcSync::sql_statement_query(
  43. const DTCValue* p_key,
  44. std::string& s_sql)
  45. {
  46. uint32_t ui_count = 0;
  47. do {
  48. int i_ret = CComm::mysql_process_.process_statement_query(p_key , s_sql);
  49. if (-ER_DUP_ENTRY == i_ret || 0 == i_ret) {
  50. break;
  51. }
  52. uint64_t ui_interval = pow(2, ++ui_count);
  53. sleep(ui_interval);
  54. log4cplus_error("sql statement query fail sequence:%d" , ui_count);
  55. } while (true);
  56. }
  57. void HwcSync::decode_hotbin_result(
  58. ResultSet* o_hot_res,
  59. const HwcBinlogCont& o_hwc_bin)
  60. {
  61. DTCBinary o_raw_bin;
  62. o_raw_bin.len = o_hwc_bin.i_raw_len;
  63. o_raw_bin.ptr = o_hwc_bin.p_raw_val;
  64. o_hot_res->set_value_data(o_hwc_bin.i_raw_nums , o_raw_bin);
  65. }
  66. int HwcSync::get_current_time()
  67. {
  68. timeval now;
  69. gettimeofday(&now, NULL);
  70. return now.tv_sec;
  71. }
  72. int HwcSync::Run()
  73. {
  74. /* 先关闭连接,防止fd重路 */
  75. p_master_->Close();
  76. int i_sec = get_current_time() + 1;
  77. while (true) {
  78. usleep(1000000); // 1s
  79. if (get_current_time() >= i_sec) {
  80. if (CComm::registor.CheckMemoryCreateTime()) {
  81. log4cplus_error("detect share memory changed");
  82. }
  83. i_sec += 1;
  84. }
  85. DTC::SvrAdminRequest request_m(p_master_);
  86. request_m.SetAdminCode(DTC::GetUpdateKey);
  87. request_m.Need("type");
  88. request_m.Need("flag");
  89. request_m.Need("key");
  90. request_m.Need("value");
  91. request_m.SetHotBackupID((uint64_t)o_journal_id_);
  92. request_m.Limit(0, i_limit_);
  93. log4cplus_info("begin serial:%d , offset:%d" , o_journal_id_.serial , o_journal_id_.offset);
  94. DTC::Result result_m;
  95. int ret = request_m.Execute(result_m);
  96. log4cplus_warning("hwc server is aliving....., return:%d", ret);
  97. if (-DTC::EC_BAD_HOTBACKUP_JID == ret) {
  98. log4cplus_error("master report journalID is not match");
  99. }
  100. // 重试
  101. if (0 != ret) {
  102. log4cplus_warning("fetch key-list from master, limit[%d], ret=%d, err=%s",
  103. i_limit_, ret, result_m.ErrorMessage());
  104. usleep(100);
  105. continue;
  106. }
  107. // 写请求 插入 冷数据库
  108. for (int i = 0; i < result_m.NumRows(); ++i) {
  109. ret = result_m.FetchRow();
  110. if (ret < 0) {
  111. log4cplus_error("fetch key-list from master failed, limit[%d], ret=%d, err=%s",
  112. i_limit_, ret, result_m.ErrorMessage());
  113. // dtc可以运行失败
  114. return E_HWC_SYNC_DTC_ERROR;
  115. }
  116. int i_type = result_m.IntValue("type");
  117. if (i_type != DTCHotBackup::SYNC_NONE) {
  118. log4cplus_info("no sync none type , continue");
  119. break;
  120. }
  121. // key parse
  122. int i_key_size = 0;
  123. char* p_key = result_m.BinaryValue("key", i_key_size);
  124. DTCTableDefinition* p_dtc_tab_def = TableDefinitionManager::instance()->get_cur_table_def();
  125. DTCValue astKey[p_dtc_tab_def->key_fields()];// always single key
  126. TaskPackedKey::unpack_key(p_dtc_tab_def, p_key, astKey);
  127. int i_value_size = 0;
  128. char* p_value = (char *)result_m.BinaryValue("value", i_value_size);
  129. HwcBinlogCont o_hot_bin;
  130. bool b_ret = o_hot_bin.ParseFromString(p_value , i_value_size);
  131. if (!b_ret) {
  132. log4cplus_error("report alarm to manager");
  133. break;
  134. }
  135. std::string s_sql(o_hot_bin.p_sql , o_hot_bin.i_sql_len);
  136. log4cplus_info(" mysql cmd:%s , check flag:%d , row len:%d" ,
  137. s_sql.c_str(), o_hot_bin.i_check_flag
  138. , o_hot_bin.i_raw_len);
  139. if (0 == o_hot_bin.i_check_flag) {
  140. sql_statement_query(astKey, s_sql);
  141. break;
  142. } else if (1 == o_hot_bin.i_check_flag) {
  143. log4cplus_info("check: starting...");
  144. DTCJobOperation o_cold_job(p_dtc_tab_def);
  145. query_cold_server(&o_cold_job , astKey);
  146. ResultSet* p_cold_res = o_cold_job.result;
  147. if (!p_cold_res) {
  148. log4cplus_info("cold res is null");
  149. return E_HWC_SYNC_NORMAL_EXIT;
  150. }
  151. log4cplus_info("hot row num:%d ,cold row num:%d" ,
  152. o_hot_bin.i_raw_nums , p_cold_res->total_rows());
  153. if (o_hot_bin.i_raw_nums > p_cold_res->total_rows() ||
  154. o_hot_bin.i_raw_nums < p_cold_res->total_rows()) {
  155. sql_statement_query(astKey, s_sql);
  156. break;
  157. }
  158. uint8_t* p_fiedld_list = p_dtc_tab_def->raw_fields_list();
  159. DTCFieldSet o_dtc_field_set(p_fiedld_list , p_dtc_tab_def->num_fields() + 1);
  160. ResultSet p_hot_result(o_dtc_field_set , p_dtc_tab_def);
  161. decode_hotbin_result(&p_hot_result , o_hot_bin);
  162. for (int i = 0; i < p_hot_result.total_rows(); i++) {
  163. RowValue* p_hot_raw = p_hot_result.fetch_row();
  164. bool b_check = false;
  165. // 冷数据库为base,只要冷数据库中没有热的,就插入
  166. for (int j = 0; j < p_cold_res->total_rows(); j++) {
  167. RowValue* p_cold_raw = p_cold_res->fetch_row();
  168. if(p_hot_raw->Compare(*p_cold_raw ,
  169. p_fiedld_list ,
  170. p_dtc_tab_def->num_fields() + 1) == 0) {
  171. log4cplus_info("check: row data has been in cold table");
  172. b_check = true;
  173. break;
  174. }
  175. }
  176. if (!b_check) {
  177. // 对账失败,执行sql语句 ,容错逻辑
  178. log4cplus_info("check: need insert in cold table");
  179. sql_statement_query(astKey, s_sql);
  180. break;
  181. }
  182. p_cold_res->rewind();
  183. }
  184. log4cplus_info("check: finish");
  185. } else {
  186. log4cplus_error("illegal check flag");
  187. continue;
  188. }
  189. }
  190. // 成功,则更新控制文件中的journalID
  191. o_journal_id_ = (uint64_t)result_m.HotBackupID();
  192. log4cplus_info("end serial:%d , offset:%d" , o_journal_id_.serial , o_journal_id_.offset);
  193. CComm::registor.JournalId() = o_journal_id_;
  194. }
  195. return E_HWC_SYNC_NORMAL_EXIT;
  196. }
  //*************************** separator ***************************
  198. HwcSyncUnit::HwcSyncUnit()
  199. : p_hwc_sync_(NULL)
  200. { }
  201. HwcSyncUnit::~HwcSyncUnit()
  202. {
  203. DELETE(p_hwc_sync_);
  204. }
  205. bool HwcSyncUnit::Run(DTC::Server* m , int limit)
  206. {
  207. log4cplus_warning("hwc sync unit is start");
  208. if (NULL == p_hwc_sync_) {
  209. p_hwc_sync_ = new HwcSync(m);
  210. if (!p_hwc_sync_) {
  211. log4cplus_error("hwcsync is not complete, err: create HwcSync obj failed");
  212. return false;
  213. }
  214. }
  215. p_hwc_sync_->SetLimit(limit);
  216. int i_ret = p_hwc_sync_->Run();
  217. log4cplus_warning("hwcsync is stop , errorid:%d" , i_ret);
  218. return (i_ret != E_HWC_SYNC_NORMAL_EXIT);
  219. }