weighted_randomized_load_balancer.cpp 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <algorithm>
  18. #include "butil/fast_rand.h"
  19. #include "brpc/socket.h"
  20. #include "brpc/policy/weighted_randomized_load_balancer.h"
  21. #include "butil/strings/string_number_conversions.h"
  22. namespace brpc {
  23. namespace policy {
  24. static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs, const WeightedRandomizedLoadBalancer::Server& rhs) {
  25. return (lhs.current_weight_sum < rhs.current_weight_sum);
  26. }
  27. bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
  28. if (bg.server_list.capacity() < 128) {
  29. bg.server_list.reserve(128);
  30. }
  31. uint32_t weight = 0;
  32. if (butil::StringToUint(id.tag, &weight) &&
  33. weight > 0) {
  34. bool insert_server =
  35. bg.server_map.emplace(id.id, bg.server_list.size()).second;
  36. if (insert_server) {
  37. uint64_t current_weight_sum = bg.weight_sum + weight;
  38. bg.server_list.emplace_back(id.id, weight, current_weight_sum);
  39. bg.weight_sum = current_weight_sum;
  40. return true;
  41. }
  42. } else {
  43. LOG(ERROR) << "Invalid weight is set: " << id.tag;
  44. }
  45. return false;
  46. }
  47. bool WeightedRandomizedLoadBalancer::Remove(Servers& bg, const ServerId& id) {
  48. typedef std::map<SocketId, size_t>::iterator MapIter_t;
  49. MapIter_t iter = bg.server_map.find(id.id);
  50. if (iter != bg.server_map.end()) {
  51. size_t index = iter->second;
  52. Server remove_server = bg.server_list[index];
  53. uint32_t weight_diff = bg.server_list.back().weight - remove_server.weight;
  54. bg.weight_sum -= remove_server.weight;
  55. bg.server_list[index] = bg.server_list.back();
  56. bg.server_list[index].current_weight_sum = remove_server.current_weight_sum + weight_diff;
  57. bg.server_map[bg.server_list[index].id] = index;
  58. bg.server_list.pop_back();
  59. bg.server_map.erase(iter);
  60. size_t n = bg.server_list.size();
  61. for (index++; index < n; index++) {
  62. bg.server_list[index].current_weight_sum += weight_diff;
  63. }
  64. return true;
  65. }
  66. return false;
  67. }
  68. size_t WeightedRandomizedLoadBalancer::BatchAdd(
  69. Servers& bg, const std::vector<ServerId>& servers) {
  70. size_t count = 0;
  71. for (size_t i = 0; i < servers.size(); ++i) {
  72. count += !!Add(bg, servers[i]);
  73. }
  74. return count;
  75. }
  76. size_t WeightedRandomizedLoadBalancer::BatchRemove(
  77. Servers& bg, const std::vector<ServerId>& servers) {
  78. size_t count = 0;
  79. for (size_t i = 0; i < servers.size(); ++i) {
  80. count += !!Remove(bg, servers[i]);
  81. }
  82. return count;
  83. }
  84. bool WeightedRandomizedLoadBalancer::AddServer(const ServerId& id) {
  85. return _db_servers.Modify(Add, id);
  86. }
  87. bool WeightedRandomizedLoadBalancer::RemoveServer(const ServerId& id) {
  88. return _db_servers.Modify(Remove, id);
  89. }
  90. size_t WeightedRandomizedLoadBalancer::AddServersInBatch(
  91. const std::vector<ServerId>& servers) {
  92. return _db_servers.Modify(BatchAdd, servers);
  93. }
  94. size_t WeightedRandomizedLoadBalancer::RemoveServersInBatch(
  95. const std::vector<ServerId>& servers) {
  96. return _db_servers.Modify(BatchRemove, servers);
  97. }
  98. int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
  99. butil::DoublyBufferedData<Servers>::ScopedPtr s;
  100. if (_db_servers.Read(&s) != 0) {
  101. return ENOMEM;
  102. }
  103. size_t n = s->server_list.size();
  104. if (n == 0) {
  105. return ENODATA;
  106. }
  107. uint64_t weight_sum = s->weight_sum;
  108. for (size_t i = 0; i < n; ++i) {
  109. uint64_t random_weight = butil::fast_rand_less_than(weight_sum);
  110. const Server random_server(0, 0, random_weight);
  111. const auto& server = std::lower_bound(s->server_list.begin(), s->server_list.end(), random_server, server_compare);
  112. const SocketId id = server->id;
  113. if (((i + 1) == n // always take last chance
  114. || !ExcludedServers::IsExcluded(in.excluded, id))
  115. && Socket::Address(id, out->ptr) == 0
  116. && (*out->ptr)->IsAvailable()) {
  117. // We found an available server
  118. return 0;
  119. }
  120. }
  121. // After we traversed the whole server list, there is still no
  122. // available server
  123. return EHOSTDOWN;
  124. }
  125. LoadBalancer* WeightedRandomizedLoadBalancer::New(
  126. const butil::StringPiece&) const {
  127. return new (std::nothrow) WeightedRandomizedLoadBalancer;
  128. }
  129. void WeightedRandomizedLoadBalancer::Destroy() {
  130. delete this;
  131. }
  132. void WeightedRandomizedLoadBalancer::Describe(
  133. std::ostream &os, const DescribeOptions& options) {
  134. if (!options.verbose) {
  135. os << "wr";
  136. return;
  137. }
  138. os << "WeightedRandomized{";
  139. butil::DoublyBufferedData<Servers>::ScopedPtr s;
  140. if (_db_servers.Read(&s) != 0) {
  141. os << "fail to read _db_servers";
  142. } else {
  143. os << "n=" << s->server_list.size() << ':';
  144. for (const auto& server : s->server_list) {
  145. os << ' ' << server.id << '(' << server.weight << ')';
  146. }
  147. }
  148. os << '}';
  149. }
  150. } // namespace policy
  151. } // namespace brpc