data_connector_ask_chain.h 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef __DATA_CONNECTOR_ASK_CHAIN__
  17. #define __DATA_CONNECTOR_ASK_CHAIN__
  18. #include "config/dbconfig.h"
  19. #include "poll/poller_base.h"
  20. #include "request/request_base.h"
  21. #include "../daemons/daemon_listener.h"
  22. #include <stat_dtc.h>
  23. class ConnectorGroup;
  24. class TimerList;
  25. class KeyHelper;
  26. enum { MASTER_READ_GROUP_COLUMN = 0,
  27. MASTER_WRITE_GROUP_COLUMN = 1,
  28. MASTER_COMMIT_GROUP_COLUMN = 2,
  29. SLAVE_READ_GROUP_COLUMN = 3,
  30. };
  31. class DataConnectorAskChain : public JobAskInterface<DTCJobOperation> {
  32. public:
  33. DataConnectorAskChain();
  34. ~DataConnectorAskChain();
  35. void BindHbLogDispatcher(JobAskInterface<DTCJobOperation>* p_task_dispatcher) {
  36. p_task_dispatcher_ = p_task_dispatcher;
  37. };
  38. int load_config(struct DbConfig *cfg, int ks, int idx = 0);
  39. int renew_config(struct DbConfig *cfg);
  40. void collect_notify_helper_reload_config(DTCJobOperation *job);
  41. int build_master_group_mapping(int idx = 0);
  42. int build_helper_object(int idx = 0);
  43. int notify_watch_dog(StartHelperPara *para);
  44. int Cleanup();
  45. int Cleanup2();
  46. int start_listener(DTCJobOperation *job);
  47. bool is_commit_full(DTCJobOperation *job);
  48. bool Dispatch(DTCJobOperation *t);
  49. int do_attach(PollerBase *thread, int idx = 0);
  50. void set_timer_handler(TimerList *recv, TimerList *conn,
  51. TimerList *retry, int idx = 0);
  52. int disable_commit_group(int idx = 0);
  53. DbConfig *get_db_config(DTCJobOperation *job);
  54. int migrate_db(DTCJobOperation *t);
  55. int switch_db(DTCJobOperation *t);
  56. int has_dummy_machine(void) const
  57. {
  58. return hasDummyMachine;
  59. }
  60. private:
  61. virtual void job_ask_procedure(DTCJobOperation *);
  62. ConnectorGroup *select_group(DTCJobOperation *t);
  63. void stat_helper_group_queue_count(ConnectorGroup **group,
  64. unsigned group_count);
  65. void stat_helper_group_cur_max_queue_count(int iRequestType);
  66. int get_queue_cur_max_count(int iColumn);
  67. private:
  68. struct DbConfig *dbConfig[2];
  69. int hasDummyMachine;
  70. ConnectorGroup **groups[2];
  71. #define GMAP_NONE -1
  72. #define GMAP_DUMMY -2
  73. #define GROUP_DUMMY ((ConnectorGroup *)-2)
  74. #define GROUP_READONLY ((ConnectorGroup *)-3)
  75. short *groupMap[2];
  76. JobAnswerInterface<DTCJobOperation> *guardReply;
  77. LinkQueue<DTCJobOperation *>::allocator task_queue_allocator;
  78. TimerList *recvList;
  79. TimerList *connList;
  80. TimerList *retryList;
  81. std::vector<int> newDb;
  82. std::map<int, int> new2old;
  83. int tableNo;
  84. JobAskInterface<DTCJobOperation>* p_task_dispatcher_;
  85. public:
  86. KeyHelper *guard;
  87. private:
  88. StatCounter statQueueCurCount; /*所有组当前总的队列大小*/
  89. StatCounter statQueueMaxCount; /*所有组配置总的队列大小*/
  90. StatCounter statReadQueueCurMaxCount; /*所有机器所有主读组当前最大的队列大小*/
  91. StatCounter statWriteQueueMaxCount; /*所有机器所有写组当前最大的队列大小*/
  92. StatCounter statCommitQueueCurMaxCount; /*所有机器所有提交组当前最大的队列大小*/
  93. StatCounter statSlaveReadQueueMaxCount; /*所有机器所有备读组当前最大的队列大小*/
  94. };
  95. #endif