remote_dtc_ask_answer_chain.cc 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "connector/connector_group.h"
  17. #include "stat_dtc_def.h"
  18. #include "remote_dtc_ask_answer_chain.h"
  19. RemoteDtcAskAnswerChain::RemoteDtcAskAnswerChain(PollerBase *owner,
  20. int clientPerGroup)
  21. : JobAskInterface<DTCJobOperation>(owner), m_recv_timer_list(0),
  22. m_conn_timer_list(0), m_retry_timer_list(0),
  23. m_client_per_group(clientPerGroup)
  24. {
  25. }
  26. RemoteDtcAskAnswerChain::~RemoteDtcAskAnswerChain()
  27. {
  28. for (HelperMapType::iterator iter = m_groups.begin();
  29. iter != m_groups.end(); ++iter) {
  30. delete iter->second.group;
  31. }
  32. m_groups.clear();
  33. }
  34. void RemoteDtcAskAnswerChain::job_ask_procedure(DTCJobOperation *t)
  35. {
  36. log4cplus_debug("enter job_ask_procedure");
  37. log4cplus_debug("t->remote_addr: %s", t->remote_addr());
  38. HelperMapType::iterator iter = m_groups.find(t->remote_addr());
  39. if (iter == m_groups.end()) {
  40. ConnectorGroup *g = new ConnectorGroup(
  41. t->remote_addr(), /* sock path */
  42. t->remote_addr(), /* name */
  43. m_client_per_group, /* helper client count */
  44. m_client_per_group * 32 /* queue size */,
  45. DTC_FORWARD_USEC_ALL);
  46. g->set_timer_handler(m_recv_timer_list, m_conn_timer_list,
  47. m_retry_timer_list);
  48. g->do_attach(owner, &m_task_queue_allocator);
  49. helper_group hg = { g, 0 };
  50. m_groups[t->remote_addr()] = hg;
  51. iter = m_groups.find(t->remote_addr());
  52. }
  53. t->push_reply_dispatcher(this);
  54. iter->second.group->job_ask_procedure(t);
  55. iter->second.used = 1;
  56. log4cplus_debug("leave job_ask_procedure");
  57. }
  58. void RemoteDtcAskAnswerChain::job_answer_procedure(DTCJobOperation *t)
  59. {
  60. if (t->result_code() == 0) {
  61. log4cplus_debug(
  62. "reply from remote dtc success,append result start ");
  63. if (t->result) {
  64. t->prepare_result();
  65. int iRet = t->pass_all_result(t->result);
  66. if (iRet < 0) {
  67. log4cplus_info("job append_result error: %d",
  68. iRet);
  69. t->set_error(iRet, "RemoteDtcAskAnswerChain",
  70. "append_result() error");
  71. t->turn_around_job_answer();
  72. return;
  73. }
  74. }
  75. t->turn_around_job_answer();
  76. return;
  77. } else {
  78. log4cplus_debug("reply from remote dtc error:%d",
  79. t->result_code());
  80. t->turn_around_job_answer();
  81. return;
  82. }
  83. }
  84. void RemoteDtcAskAnswerChain::set_timer_handler(TimerList *recv,
  85. TimerList *conn,
  86. TimerList *retry)
  87. {
  88. m_recv_timer_list = recv;
  89. m_conn_timer_list = conn;
  90. m_retry_timer_list = retry;
  91. attach_timer(m_retry_timer_list);
  92. }
  93. void RemoteDtcAskAnswerChain::job_timer_procedure()
  94. {
  95. log4cplus_debug("enter timer procedure");
  96. for (HelperMapType::iterator i = m_groups.begin();
  97. i != m_groups.end();) {
  98. if (i->second.used == 0) {
  99. delete i->second.group;
  100. m_groups.erase(i++);
  101. } else {
  102. i->second.used = 0;
  103. ++i;
  104. }
  105. }
  106. log4cplus_debug("leave timer procedure");
  107. }