hwc_sync_unit.cc 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. #include "hwc_sync_unit.h"
  2. #include <string>
  3. #include <sys/time.h>
  4. // local
  5. #include "comm.h"
  6. // connector
  7. #include "mysql_operation.h"
  8. // libs/api/cc_api/include/
  9. #include "dtcapi.h"
  10. #include "mysqld_error.h"
  11. HwcSync::HwcSync(DTC::Server* p_server)
  12. : i_limit_(1)
  13. , p_master_(p_server)
  14. , o_journal_id_(CComm::registor.JournalId())
  15. { }
  16. HwcSync::~HwcSync()
  17. { }
  18. int HwcSync::query_cold_server(
  19. DTCJobOperation* p_job,
  20. const DTCValue* key)
  21. {
  22. p_job->set_request_key(key);
  23. p_job->set_request_code(DRequest::Get);
  24. DTCFieldSet* p_dtc_field_set = p_job->request_fields();
  25. DELETE(p_dtc_field_set);
  26. DTCTableDefinition* p_dtc_tab_def = TableDefinitionManager::instance()->get_cur_table_def();
  27. #if FOR_DEBUG
  28. for (int i = 1; i < p_job->num_fields() + 1; i++) {
  29. log4cplus_info("field id:%d" , p_dtc_tab_def->raw_fields_list()[i]);
  30. }
  31. #endif
  32. // begin at 1 ,exclude key field id
  33. p_dtc_field_set = new DTCFieldSet(
  34. p_dtc_tab_def->raw_fields_list(),
  35. p_job->num_fields() + 1);
  36. p_job->set_request_fields(p_dtc_field_set);
  37. if (CComm::mysql_process_.do_process(p_job)) {
  38. return -1;
  39. }
  40. return p_job->process_internal_result();
  41. }
  42. void HwcSync::sql_statement_query(
  43. const DTCValue* p_key,
  44. std::string& s_sql)
  45. {
  46. uint32_t ui_count = 0;
  47. do {
  48. int i_ret = CComm::mysql_process_.process_statement_query(p_key , s_sql);
  49. if (-ER_DUP_ENTRY == i_ret || 0 == i_ret) {
  50. break;
  51. }
  52. uint64_t ui_interval = pow(2, ++ui_count);
  53. sleep(ui_interval);
  54. log4cplus_error("sql statement query fail sequence:%d" , ui_count);
  55. } while (true);
  56. }
  57. void HwcSync::decode_hotbin_result(
  58. ResultSet* o_hot_res,
  59. const HwcBinlogCont& o_hwc_bin)
  60. {
  61. DTCBinary o_raw_bin;
  62. o_raw_bin.len = o_hwc_bin.i_raw_len;
  63. o_raw_bin.ptr = o_hwc_bin.p_raw_val;
  64. o_hot_res->set_value_data(o_hwc_bin.i_raw_nums , o_raw_bin);
  65. }
  66. int HwcSync::get_current_time()
  67. {
  68. timeval now;
  69. gettimeofday(&now, NULL);
  70. return now.tv_sec;
  71. }
  72. int HwcSync::Run()
  73. {
  74. /* 先关闭连接,防止fd重路 */
  75. p_master_->Close();
  76. int i_sec = get_current_time() + 1;
  77. while (true) {
  78. if (get_current_time() >= i_sec) {
  79. if (CComm::registor.CheckMemoryCreateTime()) {
  80. log4cplus_error("detect share memory changed");
  81. }
  82. i_sec += 1;
  83. }
  84. DTC::SvrAdminRequest request_m(p_master_);
  85. request_m.SetAdminCode(DTC::GetUpdateKey);
  86. request_m.Need("type");
  87. request_m.Need("flag");
  88. request_m.Need("key");
  89. request_m.Need("value");
  90. request_m.SetHotBackupID((uint64_t)o_journal_id_);
  91. request_m.Limit(0, i_limit_);
  92. log4cplus_info("begin serial:%d , offset:%d" , o_journal_id_.serial , o_journal_id_.offset);
  93. DTC::Result result_m;
  94. int ret = request_m.Execute(result_m);
  95. log4cplus_warning("hwc server is aliving....., return:%d", ret);
  96. if (-DTC::EC_BAD_HOTBACKUP_JID == ret) {
  97. log4cplus_error("master report journalID is not match");
  98. }
  99. // 重试
  100. if (0 != ret) {
  101. log4cplus_warning("fetch key-list from master, limit[%d], ret=%d, err=%s",
  102. i_limit_, ret, result_m.ErrorMessage());
  103. usleep(100);
  104. continue;
  105. }
  106. // 写请求 插入 冷数据库
  107. for (int i = 0; i < result_m.NumRows(); ++i) {
  108. ret = result_m.FetchRow();
  109. if (ret < 0) {
  110. log4cplus_error("fetch key-list from master failed, limit[%d], ret=%d, err=%s",
  111. i_limit_, ret, result_m.ErrorMessage());
  112. // dtc可以运行失败
  113. return E_HWC_SYNC_DTC_ERROR;
  114. }
  115. int i_type = result_m.IntValue("type");
  116. if (i_type != DTCHotBackup::SYNC_NONE) {
  117. log4cplus_info("no sync none type , continue");
  118. break;
  119. }
  120. // key parse
  121. int i_key_size = 0;
  122. char* p_key = result_m.BinaryValue("key", i_key_size);
  123. DTCTableDefinition* p_dtc_tab_def = TableDefinitionManager::instance()->get_cur_table_def();
  124. DTCValue astKey[p_dtc_tab_def->key_fields()];// always single key
  125. TaskPackedKey::unpack_key(p_dtc_tab_def, p_key, astKey);
  126. int i_value_size = 0;
  127. char* p_value = (char *)result_m.BinaryValue("value", i_value_size);
  128. HwcBinlogCont o_hot_bin;
  129. bool b_ret = o_hot_bin.ParseFromString(p_value , i_value_size);
  130. if (!b_ret) {
  131. log4cplus_error("report alarm to manager");
  132. break;
  133. }
  134. std::string s_sql(o_hot_bin.p_sql , o_hot_bin.i_sql_len);
  135. log4cplus_info(" mysql cmd:%s , check flag:%d , row len:%d" ,
  136. s_sql.c_str(), o_hot_bin.i_check_flag
  137. , o_hot_bin.i_raw_len);
  138. if (0 == o_hot_bin.i_check_flag) {
  139. sql_statement_query(astKey, s_sql);
  140. break;
  141. } else if (1 == o_hot_bin.i_check_flag) {
  142. log4cplus_info("check: starting...");
  143. DTCJobOperation o_cold_job(p_dtc_tab_def);
  144. query_cold_server(&o_cold_job , astKey);
  145. ResultSet* p_cold_res = o_cold_job.result;
  146. if (!p_cold_res) {
  147. log4cplus_info("cold res is null");
  148. return E_HWC_SYNC_NORMAL_EXIT;
  149. }
  150. log4cplus_info("hot row num:%d ,cold row num:%d" ,
  151. o_hot_bin.i_raw_nums , p_cold_res->total_rows());
  152. if (o_hot_bin.i_raw_nums > p_cold_res->total_rows() ||
  153. o_hot_bin.i_raw_nums < p_cold_res->total_rows()) {
  154. sql_statement_query(astKey, s_sql);
  155. break;
  156. }
  157. uint8_t* p_fiedld_list = p_dtc_tab_def->raw_fields_list();
  158. DTCFieldSet o_dtc_field_set(p_fiedld_list , p_dtc_tab_def->num_fields() + 1);
  159. ResultSet p_hot_result(o_dtc_field_set , p_dtc_tab_def);
  160. decode_hotbin_result(&p_hot_result , o_hot_bin);
  161. for (int i = 0; i < p_hot_result.total_rows(); i++) {
  162. RowValue* p_hot_raw = p_hot_result.fetch_row();
  163. bool b_check = false;
  164. // 冷数据库为base,只要冷数据库中没有热的,就插入
  165. for (int j = 0; j < p_cold_res->total_rows(); j++) {
  166. RowValue* p_cold_raw = p_cold_res->fetch_row();
  167. if(p_hot_raw->Compare(*p_cold_raw ,
  168. p_fiedld_list ,
  169. p_dtc_tab_def->num_fields() + 1) == 0) {
  170. log4cplus_info("check: row data has been in cold table");
  171. b_check = true;
  172. break;
  173. }
  174. }
  175. if (!b_check) {
  176. // 对账失败,执行sql语句 ,容错逻辑
  177. log4cplus_info("check: need insert in cold table");
  178. sql_statement_query(astKey, s_sql);
  179. break;
  180. }
  181. p_cold_res->rewind();
  182. }
  183. log4cplus_info("check: finish");
  184. } else {
  185. log4cplus_error("illegal check flag");
  186. continue;
  187. }
  188. }
  189. // 成功,则更新控制文件中的journalID
  190. o_journal_id_ = (uint64_t)result_m.HotBackupID();
  191. log4cplus_info("end serial:%d , offset:%d" , o_journal_id_.serial , o_journal_id_.offset);
  192. CComm::registor.JournalId() = o_journal_id_;
  193. }
  194. return E_HWC_SYNC_NORMAL_EXIT;
  195. }
  196. //***************************分割线***************************
  197. HwcSyncUnit::HwcSyncUnit()
  198. : p_hwc_sync_(NULL)
  199. { }
  200. HwcSyncUnit::~HwcSyncUnit()
  201. {
  202. DELETE(p_hwc_sync_);
  203. }
  204. bool HwcSyncUnit::Run(DTC::Server* m , int limit)
  205. {
  206. log4cplus_warning("hwc sync unit is start");
  207. if (NULL == p_hwc_sync_) {
  208. p_hwc_sync_ = new HwcSync(m);
  209. if (!p_hwc_sync_) {
  210. log4cplus_error("hwcsync is not complete, err: create HwcSync obj failed");
  211. return false;
  212. }
  213. }
  214. p_hwc_sync_->SetLimit(limit);
  215. int i_ret = p_hwc_sync_->Run();
  216. log4cplus_warning("hwcsync is stop , errorid:%d" , i_ret);
  217. return (i_ret != E_HWC_SYNC_NORMAL_EXIT);
  218. }