tree_data_process.cc 19 KB


  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <stdio.h>
  17. #include <stdlib.h>
  18. #include <string.h>
  19. #include "tree_data_process.h"
  20. #include "global.h"
  21. #include "log/log.h"
  22. #include "sys_malloc.h"
  23. DTC_USING_NAMESPACE
  24. TreeDataProcess::TreeDataProcess(MallocBase *pstMalloc,
  25. DTCTableDefinition *p_table_definition_,
  26. BufferPond *pstPool,
  27. const UpdateMode *pstUpdateMode)
  28. : m_stTreeData(pstMalloc), p_table_(p_table_definition_),
  29. p_mallocator_(pstMalloc), p_buffer_pond_(pstPool)
  30. {
  31. memcpy(&update_mode_, pstUpdateMode, sizeof(update_mode_));
  32. nodeSizeLimit = 0;
  33. history_rowsize = g_stat_mgr.get_sample(ROW_SIZE_HISTORY_STAT);
  34. }
  35. TreeDataProcess::~TreeDataProcess()
  36. {
  37. }
  38. int TreeDataProcess::get_expire_time(DTCTableDefinition *t, Node *p_node,
  39. uint32_t &expire)
  40. {
  41. int iRet = 0;
  42. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  43. if (iRet != 0) {
  44. snprintf(err_message_, sizeof(err_message_),
  45. "attach data error");
  46. log4cplus_error("tree-data attach[handle:" UINT64FMT
  47. "] error: %d,%s",
  48. p_node->vd_handle(), iRet,
  49. m_stTreeData.get_err_msg());
  50. return (iRet);
  51. }
  52. iRet = m_stTreeData.get_expire_time(t, expire);
  53. if (iRet != 0) {
  54. log4cplus_error("tree data get expire time error: %d", iRet);
  55. return iRet;
  56. }
  57. return 0;
  58. }
  59. int TreeDataProcess::do_replace_all(Node *p_node, RawData *new_data)
  60. {
  61. int iRet;
  62. log4cplus_debug("Replace TreeData start ");
  63. rows_count_ = 0;
  64. dirty_rows_count_ = 0;
  65. TreeData tmpTreeData(p_mallocator_);
  66. iRet = tmpTreeData.do_init(new_data->key());
  67. if (iRet == EC_NO_MEM) {
  68. if (p_buffer_pond_->try_purge_size(tmpTreeData.need_size(),
  69. *p_node) == 0)
  70. iRet = tmpTreeData.do_init(new_data->key());
  71. }
  72. if (iRet != 0) {
  73. snprintf(err_message_, sizeof(err_message_),
  74. "root-data init error: %s", tmpTreeData.get_err_msg());
  75. tmpTreeData.destory();
  76. return (-2);
  77. }
  78. iRet = tmpTreeData.copy_tree_all(new_data);
  79. if (iRet == EC_NO_MEM) {
  80. if (p_buffer_pond_->try_purge_size(tmpTreeData.need_size(),
  81. *p_node) == 0)
  82. iRet = tmpTreeData.copy_tree_all(new_data);
  83. }
  84. if (iRet != 0) {
  85. snprintf(err_message_, sizeof(err_message_),
  86. "root-data init error: %s", tmpTreeData.get_err_msg());
  87. tmpTreeData.destory();
  88. return (-2);
  89. }
  90. if (p_node->vd_handle() != INVALID_HANDLE)
  91. destroy_data(p_node);
  92. p_node->vd_handle() = tmpTreeData.get_handle();
  93. if (tmpTreeData.total_rows() > 0) {
  94. history_rowsize.push(tmpTreeData.total_rows());
  95. }
  96. return (0);
  97. }
  98. int TreeDataProcess::do_append(DTCJobOperation &job_op, Node *p_node,
  99. RawData *affected_data, bool isDirty,
  100. bool setrows)
  101. {
  102. int iRet;
  103. DTCTableDefinition *stpNodeTab, *stpTaskTab;
  104. RowValue *stpNodeRow, *stpTaskRow;
  105. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  106. if (iRet != 0) {
  107. snprintf(err_message_, sizeof(err_message_),
  108. "attach data error");
  109. log4cplus_error("tree-data attach[handle:" UINT64FMT
  110. "] error: %d,%s",
  111. p_node->vd_handle(), iRet,
  112. m_stTreeData.get_err_msg());
  113. return (iRet);
  114. }
  115. stpNodeTab = m_stTreeData.get_node_table_def();
  116. stpTaskTab = job_op.table_definition();
  117. RowValue stTaskRow(stpTaskTab);
  118. RowValue stNodeRow(stpNodeTab);
  119. stpTaskRow = &stTaskRow;
  120. stpTaskRow->default_value();
  121. job_op.update_row(*stpTaskRow);
  122. if (stpTaskTab->auto_increment_field_id() >= stpTaskTab->key_fields() &&
  123. job_op.resultInfo.insert_id()) {
  124. const int iFieldID = stpTaskTab->auto_increment_field_id();
  125. const uint64_t iVal = job_op.resultInfo.insert_id();
  126. stpTaskRow->field_value(iFieldID)->Set(iVal);
  127. }
  128. if (stpNodeTab == stpTaskTab) {
  129. stpNodeRow = stpTaskRow;
  130. } else {
  131. stpNodeRow = &stNodeRow;
  132. stpNodeRow->default_value();
  133. stpNodeRow->Copy(stpTaskRow);
  134. }
  135. log4cplus_debug("AppendTreeData start! ");
  136. rows_count_ = 0;
  137. dirty_rows_count_ = 0;
  138. unsigned int uiTotalRows = m_stTreeData.total_rows();
  139. if (uiTotalRows > 0) {
  140. if ((isDirty || setrows) &&
  141. job_op.table_definition()->key_as_uniq_field()) {
  142. snprintf(err_message_, sizeof(err_message_),
  143. "duplicate key error");
  144. return (-1062);
  145. }
  146. if (setrows &&
  147. job_op.table_definition()->key_part_of_uniq_field()) {
  148. iRet = m_stTreeData.compare_tree_data(stpNodeRow);
  149. if (iRet < 0) {
  150. log4cplus_error(
  151. "tree-data decode row error: %d,%s",
  152. iRet, m_stTreeData.get_err_msg());
  153. return iRet;
  154. } else if (iRet == 0) {
  155. snprintf(err_message_, sizeof(err_message_),
  156. "duplicate key error");
  157. return (-1062);
  158. }
  159. }
  160. }
  161. // insert clean row
  162. iRet = m_stTreeData.insert_row(*stpNodeRow, KeyCompare, isDirty);
  163. if (iRet == EC_NO_MEM) {
  164. if (p_buffer_pond_->try_purge_size(m_stTreeData.need_size(),
  165. *p_node) == 0)
  166. iRet = m_stTreeData.insert_row(*stpNodeRow, KeyCompare,
  167. isDirty);
  168. }
  169. if (iRet != EC_NO_MEM)
  170. p_node->vd_handle() = m_stTreeData.get_handle();
  171. if (iRet != 0) {
  172. snprintf(err_message_, sizeof(err_message_),
  173. "tree-data insert row error: %s,%d",
  174. m_stTreeData.get_err_msg(), iRet);
  175. /*标记加入黑名单*/
  176. job_op.push_black_list_size(m_stTreeData.need_size());
  177. return (-2);
  178. }
  179. if (job_op.resultInfo.affected_rows() == 0 || setrows == true)
  180. job_op.resultInfo.set_affected_rows(1);
  181. rows_count_++;
  182. if (isDirty)
  183. dirty_rows_count_++;
  184. history_rowsize.push(m_stTreeData.total_rows());
  185. return (0);
  186. }
  187. int TreeDataProcess::do_get(DTCJobOperation &job_op, Node *p_node)
  188. {
  189. int iRet;
  190. log4cplus_debug("Get TreeData start! ");
  191. rows_count_ = 0;
  192. dirty_rows_count_ = 0;
  193. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  194. if (iRet != 0) {
  195. snprintf(err_message_, sizeof(err_message_),
  196. "attach data error");
  197. log4cplus_error("tree-data attach[handle:" UINT64FMT
  198. "] error: %d,%s",
  199. p_node->vd_handle(), iRet,
  200. m_stTreeData.get_err_msg());
  201. return (-1);
  202. }
  203. iRet = m_stTreeData.get_tree_data(job_op);
  204. if (iRet != 0) {
  205. snprintf(err_message_, sizeof(err_message_),
  206. "get tree data error");
  207. log4cplus_error("tree-data get[handle:" UINT64FMT
  208. "] error: %d,%s",
  209. p_node->vd_handle(), iRet,
  210. m_stTreeData.get_err_msg());
  211. return iRet;
  212. }
  213. /*更新访问时间和查找操作计数*/
  214. log4cplus_debug("node[id:%u] ,Get Count is %d", p_node->node_id(),
  215. m_stTreeData.total_rows());
  216. return (0);
  217. }
  218. int TreeDataProcess::expand_node(DTCJobOperation &job_op, Node *p_node)
  219. {
  220. return 0;
  221. }
  222. int TreeDataProcess::get_dirty_row_count(DTCJobOperation &job_op, Node *p_node)
  223. {
  224. int iRet = m_stTreeData.do_attach(p_node->vd_handle());
  225. if (iRet != 0) {
  226. snprintf(err_message_, sizeof(err_message_),
  227. "attach data error");
  228. log4cplus_error("tree-data attach[handle:" UINT64FMT
  229. "] error: %d,%s",
  230. p_node->vd_handle(), iRet,
  231. m_stTreeData.get_err_msg());
  232. return (-1);
  233. }
  234. return m_stTreeData.get_dirty_row_count();
  235. }
  236. int TreeDataProcess::attach_data(Node *p_node, RawData *affected_data)
  237. {
  238. int iRet;
  239. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  240. if (iRet != 0) {
  241. log4cplus_error("tree-data attach[handle:" UINT64FMT
  242. "] error: %d,%s",
  243. p_node->vd_handle(), iRet,
  244. m_stTreeData.get_err_msg());
  245. return (-1);
  246. }
  247. if (affected_data != NULL) {
  248. iRet = affected_data->do_init(m_stTreeData.key(), 0);
  249. if (iRet != 0) {
  250. log4cplus_error("tree-data init error: %d,%s", iRet,
  251. affected_data->get_err_msg());
  252. return (-2);
  253. }
  254. }
  255. return (0);
  256. }
  257. int TreeDataProcess::get_node_all_rows_count(Node *p_node, RawData *pstRows)
  258. {
  259. int iRet = 0;
  260. rows_count_ = 0;
  261. dirty_rows_count_ = 0;
  262. iRet = attach_data(p_node, pstRows);
  263. if (iRet != 0) {
  264. log4cplus_error("attach data error: %d", iRet);
  265. return (-1);
  266. }
  267. iRet = m_stTreeData.copy_raw_all(pstRows);
  268. if (iRet != 0) {
  269. log4cplus_error("copy data error: %d,%s", iRet,
  270. m_stTreeData.get_err_msg());
  271. return (-2);
  272. }
  273. return (0);
  274. }
  275. int TreeDataProcess::do_delete(DTCJobOperation &job_op, Node *p_node,
  276. RawData *affected_data)
  277. {
  278. int iRet;
  279. log4cplus_debug("Delete TreeData start! ");
  280. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  281. if (iRet != 0) {
  282. snprintf(err_message_, sizeof(err_message_),
  283. "attach data error");
  284. log4cplus_error("tree-data attach[handle:" UINT64FMT
  285. "] error: %d,%s",
  286. p_node->vd_handle(), iRet,
  287. m_stTreeData.get_err_msg());
  288. return (-1);
  289. }
  290. int start = m_stTreeData.total_rows();
  291. iRet = m_stTreeData.delete_tree_data(job_op);
  292. if (iRet != 0) {
  293. snprintf(err_message_, sizeof(err_message_),
  294. "get tree data error");
  295. log4cplus_error("tree-data get[handle:" UINT64FMT
  296. "] error: %d,%s",
  297. p_node->vd_handle(), iRet,
  298. m_stTreeData.get_err_msg());
  299. return iRet;
  300. }
  301. int iAffectRows = start - m_stTreeData.total_rows();
  302. if (iAffectRows > 0) {
  303. if (job_op.resultInfo.affected_rows() == 0 ||
  304. (job_op.request_condition() &&
  305. job_op.request_condition()->has_type_timestamp())) {
  306. job_op.resultInfo.set_affected_rows(iAffectRows);
  307. }
  308. }
  309. rows_count_ = m_stTreeData.get_increase_row_count();
  310. dirty_rows_count_ = m_stTreeData.get_increase_dirty_row_count();
  311. log4cplus_debug("node[id:%u] ,Get Count is %d", p_node->node_id(),
  312. m_stTreeData.total_rows());
  313. return (0);
  314. }
  315. int TreeDataProcess::do_replace_all(DTCJobOperation &job_op, Node *p_node)
  316. {
  317. log4cplus_debug("do_replace_all start! ");
  318. DTCTableDefinition *stpNodeTab, *stpTaskTab;
  319. RowValue *stpNodeRow;
  320. int iRet;
  321. int try_purge_count = 0;
  322. uint64_t all_rows_size = 0;
  323. int laid = job_op.flag_no_cache() || job_op.count_only() ?
  324. -1 :
  325. job_op.table_definition()->lastacc_field_id();
  326. int matchedCount = 0;
  327. int limitStart = 0;
  328. int limitStop = 0x10000000;
  329. stpTaskTab = job_op.table_definition();
  330. if (DTCColExpand::instance()->is_expanding())
  331. stpNodeTab =
  332. TableDefinitionManager::instance()->get_new_table_def();
  333. else
  334. stpNodeTab =
  335. TableDefinitionManager::instance()->get_cur_table_def();
  336. RowValue stNodeRow(stpNodeTab);
  337. stpNodeRow = &stNodeRow;
  338. stpNodeRow->default_value();
  339. if (laid > 0 && job_op.requestInfo.limit_count() > 0) {
  340. limitStart = job_op.requestInfo.limit_start();
  341. if (job_op.requestInfo.limit_start() > 0x10000000) {
  342. laid = -1;
  343. } else if (job_op.requestInfo.limit_count() < 0x10000000) {
  344. limitStop =
  345. limitStart + job_op.requestInfo.limit_count();
  346. }
  347. }
  348. rows_count_ = 0;
  349. dirty_rows_count_ = 0;
  350. if (p_node->vd_handle() != INVALID_HANDLE) {
  351. iRet = destroy_data(p_node);
  352. if (iRet != 0)
  353. return (-1);
  354. }
  355. iRet = m_stTreeData.do_init(job_op.packed_key());
  356. if (iRet == EC_NO_MEM) {
  357. if (p_buffer_pond_->try_purge_size(m_stTreeData.need_size(),
  358. *p_node) == 0)
  359. iRet = m_stTreeData.do_init(p_table_->key_fields() - 1,
  360. p_table_->key_format(),
  361. job_op.packed_key());
  362. }
  363. if (iRet != EC_NO_MEM)
  364. p_node->vd_handle() = m_stTreeData.get_handle();
  365. if (iRet != 0) {
  366. snprintf(err_message_, sizeof(err_message_),
  367. "raw-data init error: %s", m_stTreeData.get_err_msg());
  368. /*标记加入黑名单*/
  369. job_op.push_black_list_size(m_stTreeData.need_size());
  370. p_buffer_pond_->purge_node(job_op.packed_key(), *p_node);
  371. return (-2);
  372. }
  373. if (job_op.result != NULL) {
  374. ResultSet *pstResultSet = job_op.result;
  375. for (int i = 0; i < pstResultSet->total_rows(); i++) {
  376. RowValue *pstRow = pstResultSet->_fetch_row();
  377. if (pstRow == NULL) {
  378. log4cplus_debug("%s!",
  379. "call fetch_row func error");
  380. p_buffer_pond_->purge_node(job_op.packed_key(),
  381. *p_node);
  382. m_stTreeData.destory();
  383. return (-3);
  384. }
  385. if (laid > 0 && job_op.compare_row(*pstRow)) {
  386. if (matchedCount >= limitStart &&
  387. matchedCount < limitStop) {
  388. (*pstRow)[laid].s64 =
  389. job_op.Timestamp();
  390. }
  391. matchedCount++;
  392. }
  393. if (stpTaskTab != stpNodeTab) {
  394. stpNodeRow->Copy(pstRow);
  395. } else {
  396. stpNodeRow = pstRow;
  397. }
  398. /* 插入当前行 */
  399. iRet = m_stTreeData.insert_row(*stpNodeRow, KeyCompare,
  400. false);
  401. /* 如果内存空间不足,尝试扩大最多两次 */
  402. if (iRet == EC_NO_MEM) {
  403. if (try_purge_count >= 2) {
  404. goto ERROR_PROCESS;
  405. }
  406. /* 尝试次数 */
  407. ++try_purge_count;
  408. if (p_buffer_pond_->try_purge_size(
  409. m_stTreeData.need_size(),
  410. *p_node) == 0)
  411. iRet = m_stTreeData.insert_row(
  412. *stpNodeRow, KeyCompare, false);
  413. }
  414. if (iRet != EC_NO_MEM)
  415. p_node->vd_handle() = m_stTreeData.get_handle();
  416. /* 当前行操作成功 */
  417. if (0 == iRet)
  418. continue;
  419. ERROR_PROCESS:
  420. snprintf(
  421. err_message_, sizeof(err_message_),
  422. "raw-data insert row error: ret=%d,err=%s, cnt=%d",
  423. iRet, m_stTreeData.get_err_msg(),
  424. try_purge_count);
  425. /*标记加入黑名单*/
  426. job_op.push_black_list_size(all_rows_size);
  427. p_buffer_pond_->purge_node(job_op.packed_key(),
  428. *p_node);
  429. m_stTreeData.destory();
  430. return (-4);
  431. }
  432. rows_count_ += pstResultSet->total_rows();
  433. }
  434. history_rowsize.push(m_stTreeData.total_rows());
  435. return (0);
  436. }
  437. int TreeDataProcess::do_replace(DTCJobOperation &job_op, Node *p_node,
  438. RawData *affected_data, bool async,
  439. bool setrows = false)
  440. {
  441. int iRet;
  442. log4cplus_debug("Replace TreeData start! ");
  443. rows_count_ = 0;
  444. dirty_rows_count_ = 0;
  445. if (p_node) {
  446. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  447. if (iRet != 0) {
  448. log4cplus_error("attach tree data error: %d", iRet);
  449. return (iRet);
  450. }
  451. } else {
  452. iRet = m_stTreeData.do_init(job_op.packed_key());
  453. if (iRet == EC_NO_MEM) {
  454. if (p_buffer_pond_->try_purge_size(
  455. m_stTreeData.need_size(), *p_node) == 0)
  456. iRet = m_stTreeData.do_init(
  457. job_op.packed_key());
  458. }
  459. if (iRet != 0) {
  460. log4cplus_error("tree-data replace[handle:" UINT64FMT
  461. "] error: %d,%s",
  462. p_node->vd_handle(), iRet,
  463. m_stTreeData.get_err_msg());
  464. return iRet;
  465. }
  466. p_node->vd_handle() = m_stTreeData.get_handle();
  467. }
  468. unsigned char uchRowFlags;
  469. iRet = m_stTreeData.replace_tree_data(job_op, p_node, affected_data,
  470. async, uchRowFlags, setrows);
  471. if (iRet == EC_NO_MEM) {
  472. if (p_buffer_pond_->try_purge_size(m_stTreeData.need_size(),
  473. *p_node) == 0)
  474. iRet = m_stTreeData.replace_tree_data(
  475. job_op, p_node, affected_data, async,
  476. uchRowFlags, setrows);
  477. }
  478. if (iRet != 0) {
  479. log4cplus_error("tree-data replace[handle:" UINT64FMT
  480. "] error: %d,%s",
  481. p_node->vd_handle(), iRet,
  482. m_stTreeData.get_err_msg());
  483. return iRet;
  484. }
  485. if (uchRowFlags & OPER_DIRTY)
  486. dirty_rows_count_--;
  487. if (async)
  488. dirty_rows_count_++;
  489. uint64_t ullAffectedRows = m_stTreeData.get_affectedrows();
  490. if (ullAffectedRows == 0) //insert
  491. {
  492. DTCTableDefinition *stpTaskTab;
  493. RowValue *stpNewRow;
  494. stpTaskTab = job_op.table_definition();
  495. RowValue stNewRow(stpTaskTab);
  496. stNewRow.default_value();
  497. stpNewRow = &stNewRow;
  498. job_op.update_row(*stpNewRow); //获取Replace的行
  499. iRet = m_stTreeData.insert_row(*stpNewRow, KeyCompare,
  500. async); // 加进cache
  501. if (iRet == EC_NO_MEM) {
  502. if (p_buffer_pond_->try_purge_size(
  503. m_stTreeData.need_size(), *p_node) == 0)
  504. iRet = m_stTreeData.insert_row(
  505. *stpNewRow, KeyCompare, async);
  506. }
  507. if (iRet != EC_NO_MEM)
  508. p_node->vd_handle() = m_stTreeData.get_handle();
  509. if (iRet != 0) {
  510. snprintf(err_message_, sizeof(err_message_),
  511. "raw-data replace row error: %d, %s", iRet,
  512. m_stTreeData.get_err_msg());
  513. /*标记加入黑名单*/
  514. job_op.push_black_list_size(m_stTreeData.need_size());
  515. return (-3);
  516. }
  517. rows_count_++;
  518. ullAffectedRows++;
  519. if (async)
  520. dirty_rows_count_++;
  521. }
  522. if (async == true || setrows == true) {
  523. job_op.resultInfo.set_affected_rows(ullAffectedRows);
  524. } else if (ullAffectedRows != job_op.resultInfo.affected_rows()) {
  525. //如果cache更新纪录数和helper更新的纪录数不相等
  526. log4cplus_debug(
  527. "unequal affected rows, cache[%lld], helper[%lld]",
  528. (long long)ullAffectedRows,
  529. (long long)job_op.resultInfo.affected_rows());
  530. }
  531. return 0;
  532. }
  533. int TreeDataProcess::do_update(DTCJobOperation &job_op, Node *p_node,
  534. RawData *affected_data, bool async,
  535. bool setrows = false)
  536. {
  537. int iRet;
  538. log4cplus_debug("Update TreeData start! ");
  539. rows_count_ = 0;
  540. dirty_rows_count_ = 0;
  541. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  542. if (iRet != 0) {
  543. log4cplus_error("attach tree data error: %d", iRet);
  544. return (iRet);
  545. }
  546. m_stTreeData.set_affected_rows(0);
  547. iRet = m_stTreeData.update_tree_data(job_op, p_node, affected_data,
  548. async, setrows);
  549. if (iRet == EC_NO_MEM) {
  550. if (p_buffer_pond_->try_purge_size(m_stTreeData.need_size(),
  551. *p_node) == 0)
  552. iRet = m_stTreeData.update_tree_data(
  553. job_op, p_node, affected_data, async, setrows);
  554. }
  555. if (iRet != 0) {
  556. log4cplus_error("tree-data update[handle:" UINT64FMT
  557. "] error: %d,%s",
  558. p_node->vd_handle(), iRet,
  559. m_stTreeData.get_err_msg());
  560. return iRet;
  561. }
  562. uint64_t ullAffectedRows = m_stTreeData.get_affectedrows();
  563. dirty_rows_count_ = m_stTreeData.get_increase_dirty_row_count();
  564. if (async == true || setrows == true) {
  565. job_op.resultInfo.set_affected_rows(ullAffectedRows);
  566. } else if (ullAffectedRows != job_op.resultInfo.affected_rows()) {
  567. //如果cache更新纪录数和helper更新的纪录数不相等
  568. log4cplus_debug(
  569. "unequal affected rows, cache[%lld], helper[%lld]",
  570. (long long)ullAffectedRows,
  571. (long long)job_op.resultInfo.affected_rows());
  572. }
  573. return (0);
  574. }
  575. int TreeDataProcess::do_flush(DTCFlushRequest *flush_req, Node *p_node,
  576. unsigned int &affected_count)
  577. {
  578. int iRet;
  579. log4cplus_debug("do_flush start! ");
  580. rows_count_ = 0;
  581. dirty_rows_count_ = 0;
  582. iRet = m_stTreeData.do_attach(p_node->vd_handle());
  583. if (iRet != 0) {
  584. snprintf(err_message_, sizeof(err_message_),
  585. "attach data error");
  586. log4cplus_error("tree-data attach[handle:" UINT64FMT
  587. "] error: %d,%s",
  588. p_node->vd_handle(), iRet,
  589. m_stTreeData.get_err_msg());
  590. return (-1);
  591. }
  592. iRet = m_stTreeData.flush_tree_data(flush_req, p_node, affected_count);
  593. if (iRet != 0) {
  594. snprintf(err_message_, sizeof(err_message_),
  595. "flush tree data error");
  596. log4cplus_error("tree-data flush[handle:" UINT64FMT
  597. "] error: %d,%s",
  598. p_node->vd_handle(), iRet,
  599. m_stTreeData.get_err_msg());
  600. return iRet;
  601. }
  602. dirty_rows_count_ = m_stTreeData.get_increase_dirty_row_count();
  603. return (0);
  604. }
  605. int TreeDataProcess::do_purge(DTCFlushRequest *flush_req, Node *p_node,
  606. unsigned int &affected_count)
  607. {
  608. int iRet;
  609. log4cplus_debug("do_purge start! ");
  610. iRet = do_flush(flush_req, p_node, affected_count);
  611. if (iRet != 0) {
  612. return (iRet);
  613. }
  614. rows_count_ = 0LL - m_stTreeData.total_rows();
  615. return 0;
  616. }
  617. int TreeDataProcess::destroy_data(Node *p_node)
  618. {
  619. if (p_node->vd_handle() == INVALID_HANDLE)
  620. return 0;
  621. TreeData treeData(p_mallocator_);
  622. treeData.do_attach(p_node->vd_handle());
  623. treeData.destory();
  624. p_node->vd_handle() = INVALID_HANDLE;
  625. return 0;
  626. }