partition_channel.h 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #ifndef BRPC_PARTITION_CHANNEL_H
  18. #define BRPC_PARTITION_CHANNEL_H
  19. // To brpc developers: This is a header included by user, don't depend
  20. // on internal structures, use opaque pointers instead.
  21. #include "brpc/parallel_channel.h"
  22. #include "brpc/selective_channel.h" // For DynamicPartitionChannel
  23. namespace brpc {
  24. class NamingServiceThread;
  25. class PartitionChannelBase;
  // Identifies one partition kind among a fixed number of partition kinds.
  struct Partition {
      // Index of this partition kind, counting from 0.
      int index;

      // Total number of partition kinds. A single partition kind may be
      // served by more than one instance.
      int num_partition_kinds;
  };
  34. // Parse partition from a string tag which is often associated with servers
  35. // in NamingServices.
  36. class PartitionParser {
  37. public:
  38. virtual ~PartitionParser() {}
  39. // Implement this method to get partition `out' from string `tag'.
  40. // Returns true on success.
  41. virtual bool ParseFromTag(const std::string& tag, Partition* out) = 0;
  42. };
  43. // For customizing PartitionChannel.
  44. struct PartitionChannelOptions : public ChannelOptions {
  45. // Constructed with default values.
  46. PartitionChannelOptions();
  47. // Make RPC call stop soon (without waiting for the timeout) when failed
  48. // sub calls reached this number.
  49. // Default: number of sub channels, which means the RPC to ParallChannel
  50. // will not be canceled until all sub calls failed.
  51. int fail_limit;
  52. // Check comments on ParallelChannel.AddChannel in parallel_channel.h
  53. // Sub channels in PartitionChannel share the same mapper and merger.
  54. butil::intrusive_ptr<CallMapper> call_mapper;
  55. butil::intrusive_ptr<ResponseMerger> response_merger;
  56. };
  57. // PartitionChannel is a specialized ParallelChannel whose sub channels are
  58. // built from a NamingService which specifies partitioning information in
  59. // tags. This channel eases access to partitioned servers.
  60. class PartitionChannel : public ChannelBase {
  61. public:
  62. PartitionChannel();
  63. ~PartitionChannel();
  64. // Initialize this PartitionChannel with `num_partition_kinds' sub channels
  65. // sending requests to different partitions listed in `naming_service_url'.
  66. // `partition_parser' parses partition from tags associated with servers.
  67. // When this method succeeds, `partition_parser' is owned by this channel,
  68. // otherwise `partition_parser' is unmodified and can be used for other
  69. // usages.
  70. // For example:
  71. // num_partition_kinds = 3
  72. // partition_parser = parse N/M as Partition{index=N, num_partition_kinds=M}
  73. // naming_service = s1(tag=1/3) s2(tag=2/3) s3(tag=0/3) s4(tag=1/4) s5(tag=2/3)
  74. // load_balancer = rr
  75. // 3 sub channels(c0,c1,c2) will be created:
  76. // - c0 sends requests to s3 because the tag=0/3 means s3 is the first
  77. // partition kind in 3 kinds.
  78. // - c1 sends requests to s1 because the tag=1/3 means s1 is the second
  79. // partition kind in 3 kinds. s4(tag=1/4) is ignored because number of
  80. // partition kinds does not match.
  81. // - c2 sends requests to s2 and s5 because the tags=2/3 means they're
  82. // both the third partition kind in 3 kinds. s2 and s5 will be load-
  83. // balanced with "rr" algorithm.
  84. // / c0 -> s3 (rr)
  85. // request -> PartitionChannel -- c1 -> s1 (rr)
  86. // \ c2 -> s2, s5 (rr)
  87. int Init(int num_partition_kinds,
  88. PartitionParser* partition_parser,
  89. const char* naming_service_url,
  90. const char* load_balancer_name,
  91. const PartitionChannelOptions* options);
  92. // Access sub channels corresponding to partitions in parallel.
  93. void CallMethod(const google::protobuf::MethodDescriptor* method,
  94. google::protobuf::RpcController* controller,
  95. const google::protobuf::Message* request,
  96. google::protobuf::Message* response,
  97. google::protobuf::Closure* done);
  98. int partition_count() const;
  99. private:
  100. bool initialized() const { return _parser != NULL; }
  101. int CheckHealth();
  102. PartitionChannelBase* _pchan;
  103. butil::intrusive_ptr<NamingServiceThread> _nsthread_ptr;
  104. PartitionParser* _parser;
  105. };
  106. // As the name implies, this combo channel discovers differently partitioned
  107. // servers and builds sub PartitionChannels on-the-fly for different groups
  108. // of servers. When multiple partitioning methods co-exist, traffic is
  109. // splitted based on capacities, namely # of servers in groups. The main
  110. // purpose of this channel is to transit from one partitioning method to
  111. // another smoothly. For example, with proper deployment, servers can be
  112. // changed from M-partitions to N-partitions losslessly without changing the
  113. // client code.
  114. class DynamicPartitionChannel : public ChannelBase {
  115. public:
  116. DynamicPartitionChannel();
  117. ~DynamicPartitionChannel();
  118. // Unlike PartitionChannel, DynamicPartitionChannel does not need
  119. // `num_partition_kinds'. It discovers and groups differently partitioned
  120. // servers automatically.
  121. int Init(PartitionParser* partition_parser,
  122. const char* naming_service_url,
  123. const char* load_balancer_name,
  124. const PartitionChannelOptions* options);
  125. // Access partitions according to their capacities.
  126. void CallMethod(const google::protobuf::MethodDescriptor* method,
  127. google::protobuf::RpcController* controller,
  128. const google::protobuf::Message* request,
  129. google::protobuf::Message* response,
  130. google::protobuf::Closure* done);
  131. private:
  132. bool initialized() const { return _parser != NULL; }
  133. int CheckHealth() {
  134. return static_cast<ChannelBase*>(&_schan)->CheckHealth();
  135. }
  136. class Partitioner;
  137. SelectiveChannel _schan;
  138. Partitioner* _partitioner;
  139. butil::intrusive_ptr<NamingServiceThread> _nsthread_ptr;
  140. PartitionParser* _parser;
  141. };
  142. } // namespace brpc
  143. #endif // BRPC_PARTITION_CHANNEL_H