brpc_load_balancer_unittest.cpp 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  #include <sys/types.h>
  #include <sys/socket.h>
  #include <algorithm>
  #include <map>
  #include <random>
  #include <gtest/gtest.h>
  #include "bthread/bthread.h"
  #include "butil/gperftools_profiler.h"
  #include "butil/time.h"
  #include "butil/fast_rand.h"
  #include "butil/containers/doubly_buffered_data.h"
  #include "brpc/describable.h"
  #include "brpc/socket.h"
  #include "butil/strings/string_number_conversions.h"
  #include "brpc/excluded_servers.h"
  #include "brpc/policy/weighted_round_robin_load_balancer.h"
  #include "brpc/policy/round_robin_load_balancer.h"
  #include "brpc/policy/weighted_randomized_load_balancer.h"
  #include "brpc/policy/randomized_load_balancer.h"
  #include "brpc/policy/locality_aware_load_balancer.h"
  #include "brpc/policy/consistent_hashing_load_balancer.h"
  #include "brpc/policy/hasher.h"
  #include "brpc/errno.pb.h"
  #include "echo.pb.h"
  #include "brpc/channel.h"
  #include "brpc/controller.h"
  #include "brpc/server.h"
  44. namespace brpc {
  45. DECLARE_int32(health_check_interval);
  46. DECLARE_int64(detect_available_server_interval_ms);
  47. namespace policy {
  48. extern uint32_t CRCHash32(const char *key, size_t len);
  49. extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len));
  50. }}
  51. namespace {
  // Seeds the C library PRNG (rand()) from the current wall-clock time.
  // Invoked through pthread_once() with the control variable below.
  void initialize_random() {
      const time_t now = time(0);
      srand(now);
  }

  // Once-flag handed to pthread_once() together with initialize_random().
  pthread_once_t initialize_random_control = PTHREAD_ONCE_INIT;
  56. class LoadBalancerTest : public ::testing::Test{
  57. protected:
  58. LoadBalancerTest(){
  59. pthread_once(&initialize_random_control, initialize_random);
  60. };
  61. virtual ~LoadBalancerTest(){};
  62. virtual void SetUp() {
  63. };
  64. virtual void TearDown() {
  65. };
  66. };
  // Running totals of TLS objects constructed and destroyed so far.
  size_t TLS_ctor = 0;
  size_t TLS_dtor = 0;

  // Instrumented type: every construction and destruction bumps the matching
  // global counter so tests can observe object lifetimes.
  struct TLS {
      TLS() { TLS_ctor += 1; }
      ~TLS() { TLS_dtor += 1; }
  };
  // Minimal payload type: a single integer that starts at zero.
  struct Foo {
      int x;

      Foo() : x(0) {}
  };
  81. bool AddN(Foo& f, int n) {
  82. f.x += n;
  83. return true;
  84. }
  85. TEST_F(LoadBalancerTest, doubly_buffered_data) {
  86. const size_t old_TLS_ctor = TLS_ctor;
  87. const size_t old_TLS_dtor = TLS_dtor;
  88. {
  89. butil::DoublyBufferedData<Foo, TLS> d2;
  90. butil::DoublyBufferedData<Foo, TLS>::ScopedPtr ptr;
  91. d2.Read(&ptr);
  92. ASSERT_EQ(old_TLS_ctor + 1, TLS_ctor);
  93. }
  94. ASSERT_EQ(old_TLS_ctor + 1, TLS_ctor);
  95. ASSERT_EQ(old_TLS_dtor + 1, TLS_dtor);
  96. butil::DoublyBufferedData<Foo> d;
  97. {
  98. butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
  99. ASSERT_EQ(0, d.Read(&ptr));
  100. ASSERT_EQ(0, ptr->x);
  101. }
  102. {
  103. butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
  104. ASSERT_EQ(0, d.Read(&ptr));
  105. ASSERT_EQ(0, ptr->x);
  106. }
  107. d.Modify(AddN, 10);
  108. {
  109. butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
  110. ASSERT_EQ(0, d.Read(&ptr));
  111. ASSERT_EQ(10, ptr->x);
  112. }
  113. }
  114. typedef brpc::policy::LocalityAwareLoadBalancer LALB;
  115. static void ValidateWeightTree(
  116. std::vector<LALB::ServerInfo> & weight_tree) {
  117. std::vector<int64_t> weight_sum;
  118. weight_sum.resize(weight_tree.size());
  119. for (ssize_t i = weight_tree.size() - 1; i >= 0; --i) {
  120. const size_t left_child = i * 2 + 1;
  121. const size_t right_child = i * 2 + 2;
  122. weight_sum[i] = weight_tree[i].weight->volatile_value();
  123. if (left_child < weight_sum.size()) {
  124. weight_sum[i] += weight_sum[left_child];
  125. }
  126. if (right_child < weight_sum.size()) {
  127. weight_sum[i] += weight_sum[right_child];
  128. }
  129. }
  130. for (size_t i = 0; i < weight_tree.size(); ++i) {
  131. const int64_t left = weight_tree[i].left->load(butil::memory_order_relaxed);
  132. size_t left_child = i * 2 + 1;
  133. if (left_child < weight_tree.size()) {
  134. ASSERT_EQ(weight_sum[left_child], left) << "i=" << i;
  135. } else {
  136. ASSERT_EQ(0, left);
  137. }
  138. }
  139. }
  140. static void ValidateLALB(LALB& lalb, size_t N) {
  141. LALB::Servers* d = lalb._db_servers._data;
  142. for (size_t R = 0; R < 2; ++R) {
  143. ASSERT_EQ(d[R].weight_tree.size(), N);
  144. ASSERT_EQ(d[R].server_map.size(), N);
  145. }
  146. ASSERT_EQ(lalb._left_weights.size(), N);
  147. int64_t total = 0;
  148. for (size_t i = 0; i < N; ++i) {
  149. ASSERT_EQ(d[0].weight_tree[i].server_id, d[1].weight_tree[i].server_id);
  150. ASSERT_EQ(d[0].weight_tree[i].weight, d[1].weight_tree[i].weight);
  151. for (size_t R = 0; R < 2; ++R) {
  152. ASSERT_EQ((int64_t*)d[R].weight_tree[i].left, &lalb._left_weights[i]);
  153. size_t* pindex = d[R].server_map.seek(d[R].weight_tree[i].server_id);
  154. ASSERT_TRUE(pindex != NULL && *pindex == i);
  155. }
  156. total += d[0].weight_tree[i].weight->volatile_value();
  157. }
  158. ValidateWeightTree(d[0].weight_tree);
  159. ASSERT_EQ(total, lalb._total.load());
  160. }
  161. TEST_F(LoadBalancerTest, la_sanity) {
  162. LALB lalb;
  163. ASSERT_EQ(0, lalb._total.load());
  164. std::vector<brpc::ServerId> ids;
  165. const size_t N = 256;
  166. size_t cur_count = 0;
  167. for (int REP = 0; REP < 5; ++REP) {
  168. const size_t before_adding = cur_count;
  169. for (; cur_count < N; ++cur_count) {
  170. char addr[32];
  171. snprintf(addr, sizeof(addr), "192.168.1.%d:8080", (int)cur_count);
  172. butil::EndPoint dummy;
  173. ASSERT_EQ(0, str2endpoint(addr, &dummy));
  174. brpc::ServerId id(8888);
  175. brpc::SocketOptions options;
  176. options.remote_side = dummy;
  177. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  178. ids.push_back(id);
  179. ASSERT_TRUE(lalb.AddServer(id));
  180. }
  181. std::cout << "Added " << cur_count - before_adding << std::endl;
  182. ValidateLALB(lalb, cur_count);
  183. const size_t before_removal = cur_count;
  184. std::random_shuffle(ids.begin(), ids.end());
  185. for (size_t i = 0; i < N / 2; ++i) {
  186. const brpc::ServerId id = ids.back();
  187. ids.pop_back();
  188. --cur_count;
  189. ASSERT_TRUE(lalb.RemoveServer(id)) << "i=" << i;
  190. ASSERT_EQ(0, brpc::Socket::SetFailed(id.id));
  191. }
  192. std::cout << "Removed " << before_removal - cur_count << std::endl;
  193. ValidateLALB(lalb, cur_count);
  194. }
  195. for (size_t i = 0; i < ids.size(); ++i) {
  196. ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
  197. }
  198. }
  199. typedef std::map<brpc::SocketId, int> CountMap;
  200. volatile bool global_stop = false;
  201. struct SelectArg {
  202. brpc::LoadBalancer *lb;
  203. uint32_t (*hash)(const void*, size_t);
  204. };
  205. void* select_server(void* arg) {
  206. SelectArg *sa = (SelectArg *)arg;
  207. brpc::LoadBalancer* c = sa->lb;
  208. brpc::SocketUniquePtr ptr;
  209. CountMap *selected_count = new CountMap;
  210. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  211. brpc::LoadBalancer::SelectOut out(&ptr);
  212. uint32_t rand_seed = rand();
  213. if (sa->hash) {
  214. uint32_t rd = ++rand_seed;
  215. in.has_request_code = true;
  216. in.request_code = sa->hash((const char *)&rd, sizeof(uint32_t));
  217. }
  218. int ret = 0;
  219. while (!global_stop && (ret = c->SelectServer(in, &out)) == 0) {
  220. if (sa->hash) {
  221. uint32_t rd = ++rand_seed;
  222. in.has_request_code = true;
  223. in.request_code = sa->hash((const char *)&rd, sizeof(uint32_t));
  224. }
  225. ++(*selected_count)[ptr->id()];
  226. }
  227. LOG_IF(INFO, ret != 0) << "select_server[" << pthread_self()
  228. << "] quits before of " << berror(ret);
  229. return selected_count;
  230. }
  231. brpc::SocketId recycled_sockets[1024];
  232. butil::atomic<size_t> nrecycle(0);
  233. class SaveRecycle : public brpc::SocketUser {
  234. void BeforeRecycle(brpc::Socket* s) {
  235. recycled_sockets[nrecycle.fetch_add(1, butil::memory_order_relaxed)] = s->id();
  236. delete this;
  237. }
  238. };
  239. TEST_F(LoadBalancerTest, update_while_selection) {
  240. for (size_t round = 0; round < 5; ++round) {
  241. brpc::LoadBalancer* lb = NULL;
  242. SelectArg sa = { NULL, NULL};
  243. bool is_lalb = false;
  244. if (round == 0) {
  245. lb = new brpc::policy::RoundRobinLoadBalancer;
  246. } else if (round == 1) {
  247. lb = new brpc::policy::RandomizedLoadBalancer;
  248. } else if (round == 2) {
  249. lb = new LALB;
  250. is_lalb = true;
  251. } else if (round == 3) {
  252. lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
  253. } else {
  254. lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::CONS_HASH_LB_MURMUR3);
  255. sa.hash = ::brpc::policy::MurmurHash32;
  256. }
  257. sa.lb = lb;
  258. // Accessing empty lb should result in error.
  259. brpc::SocketUniquePtr ptr;
  260. brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL };
  261. brpc::LoadBalancer::SelectOut out(&ptr);
  262. ASSERT_EQ(ENODATA, lb->SelectServer(in, &out));
  263. nrecycle = 0;
  264. global_stop = false;
  265. pthread_t th[8];
  266. std::vector<brpc::ServerId> ids;
  267. brpc::SocketId wrr_sid_logoff = -1;
  268. for (int i = 0; i < 256; ++i) {
  269. char addr[32];
  270. snprintf(addr, sizeof(addr), "192.%d.1.%d:8080", i, i);
  271. butil::EndPoint dummy;
  272. ASSERT_EQ(0, str2endpoint(addr, &dummy));
  273. brpc::ServerId id(8888);
  274. if (3 == round) {
  275. if (i < 255) {
  276. id.tag = "1";
  277. } else {
  278. id.tag = "200000000";
  279. }
  280. }
  281. brpc::SocketOptions options;
  282. options.remote_side = dummy;
  283. options.user = new SaveRecycle;
  284. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  285. ids.push_back(id);
  286. ASSERT_TRUE(lb->AddServer(id));
  287. if (round == 3 && i == 255) {
  288. wrr_sid_logoff = id.id;
  289. // In case of wrr, set 255th socket with huge weight logoff.
  290. brpc::SocketUniquePtr ptr;
  291. ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
  292. ptr->SetLogOff();
  293. }
  294. }
  295. std::cout << "Time " << butil::class_name_str(*lb) << " ..." << std::endl;
  296. butil::Timer tm;
  297. tm.start();
  298. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  299. ASSERT_EQ(0, pthread_create(&th[i], NULL, select_server, &sa));
  300. }
  301. std::vector<brpc::ServerId> removed;
  302. const size_t REP = 200;
  303. for (size_t k = 0; k < REP; ++k) {
  304. if (round != 3) {
  305. removed = ids;
  306. } else {
  307. removed.assign(ids.begin(), ids.begin() + 255);
  308. }
  309. std::random_shuffle(removed.begin(), removed.end());
  310. removed.pop_back();
  311. ASSERT_EQ(removed.size(), lb->RemoveServersInBatch(removed));
  312. ASSERT_EQ(removed.size(), lb->AddServersInBatch(removed));
  313. // // 1: Don't remove first server, otherwise select_server would quit.
  314. // for (size_t i = 1/*1*/; i < removed.size(); ++i) {
  315. // ASSERT_TRUE(lb->RemoveServer(removed[i]));
  316. // }
  317. // for (size_t i = 1; i < removed.size(); ++i) {
  318. // ASSERT_TRUE(lb->AddServer(removed[i]));
  319. // }
  320. if (is_lalb) {
  321. LALB* lalb = (LALB*)lb;
  322. ValidateLALB(*lalb, ids.size());
  323. ASSERT_GT(lalb->_total.load(), 0);
  324. }
  325. }
  326. global_stop = true;
  327. LOG(INFO) << "Stop all...";
  328. void* retval[ARRAY_SIZE(th)];
  329. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  330. ASSERT_EQ(0, pthread_join(th[i], &retval[i]));
  331. }
  332. tm.stop();
  333. CountMap total_count;
  334. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  335. CountMap* selected_count = (CountMap*)retval[i];
  336. size_t count = 0;
  337. for (CountMap::const_iterator it = selected_count->begin();
  338. it != selected_count->end(); ++it) {
  339. total_count[it->first] += it->second;
  340. count += it->second;
  341. }
  342. delete selected_count;
  343. std::cout << "thread " << i << " selected "
  344. << count * 1000000L / tm.u_elapsed() << " times/s"
  345. << std::endl;
  346. }
  347. size_t id_num = ids.size();
  348. if (round == 3) {
  349. // Do not include the logoff socket.
  350. id_num -= 1;
  351. }
  352. ASSERT_EQ(id_num, total_count.size());
  353. for (size_t i = 0; i < id_num; ++i) {
  354. ASSERT_NE(0, total_count[ids[i].id]) << "i=" << i;
  355. std::cout << i << "=" << total_count[ids[i].id] << " ";
  356. }
  357. std::cout << std::endl;
  358. for (size_t i = 0; i < id_num; ++i) {
  359. ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
  360. }
  361. ASSERT_EQ(ids.size(), nrecycle);
  362. brpc::SocketId id = -1;
  363. for (size_t i = 0; i < ids.size(); ++i) {
  364. id = recycled_sockets[i];
  365. if (id != wrr_sid_logoff) {
  366. ASSERT_EQ(1UL, total_count.erase(id));
  367. } else {
  368. ASSERT_EQ(0UL, total_count.erase(id));
  369. }
  370. }
  371. delete lb;
  372. }
  373. }
  374. TEST_F(LoadBalancerTest, fairness) {
  375. for (size_t round = 0; round < 6; ++round) {
  376. brpc::LoadBalancer* lb = NULL;
  377. SelectArg sa = { NULL, NULL};
  378. if (round == 0) {
  379. lb = new brpc::policy::RoundRobinLoadBalancer;
  380. } else if (round == 1) {
  381. lb = new brpc::policy::RandomizedLoadBalancer;
  382. } else if (round == 2) {
  383. lb = new LALB;
  384. } else if (3 == round || 4 == round) {
  385. lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
  386. } else {
  387. lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::CONS_HASH_LB_MURMUR3);
  388. sa.hash = brpc::policy::MurmurHash32;
  389. }
  390. sa.lb = lb;
  391. std::string lb_name = butil::class_name_str(*lb);
  392. // Remove namespace
  393. size_t ns_pos = lb_name.find_last_of(':');
  394. if (ns_pos != std::string::npos) {
  395. lb_name = lb_name.substr(ns_pos + 1);
  396. }
  397. nrecycle = 0;
  398. global_stop = false;
  399. pthread_t th[8];
  400. std::vector<brpc::ServerId> ids;
  401. for (int i = 0; i < 256; ++i) {
  402. char addr[32];
  403. snprintf(addr, sizeof(addr), "192.168.1.%d:8080", i);
  404. butil::EndPoint dummy;
  405. ASSERT_EQ(0, str2endpoint(addr, &dummy));
  406. brpc::ServerId id(8888);
  407. if (3 == round) {
  408. id.tag = "100";
  409. } else if (4 == round) {
  410. if ( i % 50 == 0) {
  411. id.tag = std::to_string(i*2 + butil::fast_rand_less_than(40) + 80);
  412. } else {
  413. id.tag = std::to_string(butil::fast_rand_less_than(40) + 80);
  414. }
  415. }
  416. brpc::SocketOptions options;
  417. options.remote_side = dummy;
  418. options.user = new SaveRecycle;
  419. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  420. ids.push_back(id);
  421. lb->AddServer(id);
  422. }
  423. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  424. ASSERT_EQ(0, pthread_create(&th[i], NULL, select_server, &sa));
  425. }
  426. bthread_usleep(10000);
  427. ProfilerStart((lb_name + ".prof").c_str());
  428. bthread_usleep(300000);
  429. ProfilerStop();
  430. global_stop = true;
  431. CountMap total_count;
  432. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  433. void* retval;
  434. ASSERT_EQ(0, pthread_join(th[i], &retval));
  435. CountMap* selected_count = (CountMap*)retval;
  436. ASSERT_TRUE(selected_count);
  437. int first_count = 0;
  438. for (CountMap::const_iterator it = selected_count->begin();
  439. it != selected_count->end(); ++it) {
  440. if (round == 0) {
  441. if (first_count == 0) {
  442. first_count = it->second;
  443. } else {
  444. // Load is not ensured to be fair inside each thread
  445. // ASSERT_LE(abs(first_count - it->second), 1);
  446. }
  447. }
  448. total_count[it->first] += it->second;
  449. }
  450. delete selected_count;
  451. }
  452. ASSERT_EQ(ids.size(), total_count.size());
  453. size_t count_sum = 0;
  454. size_t count_squared_sum = 0;
  455. std::cout << lb_name << ':' << '\n';
  456. if (round != 3 && round !=4) {
  457. for (size_t i = 0; i < ids.size(); ++i) {
  458. size_t count = total_count[ids[i].id];
  459. ASSERT_NE(0ul, count) << "i=" << i;
  460. std::cout << i << '=' << count << ' ';
  461. count_sum += count;
  462. count_squared_sum += count * count;
  463. }
  464. std::cout << '\n'
  465. << ": average=" << count_sum/ids.size()
  466. << " deviation=" << sqrt(count_squared_sum * ids.size()
  467. - count_sum * count_sum) / ids.size() << std::endl;
  468. } else { // for weighted round robin load balancer
  469. std::cout << "configured weight: " << std::endl;
  470. std::ostringstream os;
  471. brpc::DescribeOptions opt;
  472. lb->Describe(os, opt);
  473. std::cout << os.str() << std::endl;
  474. double scaling_count_sum = 0.0;
  475. double scaling_count_squared_sum = 0.0;
  476. for (size_t i = 0; i < ids.size(); ++i) {
  477. size_t count = total_count[ids[i].id];
  478. ASSERT_NE(0ul, count) << "i=" << i;
  479. std::cout << i << '=' << count << ' ';
  480. double scaling_count = static_cast<double>(count) / std::stoi(ids[i].tag);
  481. scaling_count_sum += scaling_count;
  482. scaling_count_squared_sum += scaling_count * scaling_count;
  483. }
  484. std::cout << '\n'
  485. << ": scaling average=" << scaling_count_sum/ids.size()
  486. << " scaling deviation=" << sqrt(scaling_count_squared_sum * ids.size()
  487. - scaling_count_sum * scaling_count_sum) / ids.size() << std::endl;
  488. }
  489. for (size_t i = 0; i < ids.size(); ++i) {
  490. ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
  491. }
  492. ASSERT_EQ(ids.size(), nrecycle);
  493. for (size_t i = 0; i < ids.size(); ++i) {
  494. ASSERT_EQ(1UL, total_count.erase(recycled_sockets[i]));
  495. }
  496. delete lb;
  497. }
  498. }
  499. TEST_F(LoadBalancerTest, consistent_hashing) {
  500. ::brpc::policy::HashFunc hashs[::brpc::policy::CONS_HASH_LB_LAST] = {
  501. ::brpc::policy::MurmurHash32,
  502. ::brpc::policy::MD5Hash32,
  503. ::brpc::policy::MD5Hash32
  504. // ::brpc::policy::CRCHash32 crc is a bad hash function in test
  505. };
  506. ::brpc::policy::ConsistentHashingLoadBalancerType hash_type[::brpc::policy::CONS_HASH_LB_LAST] = {
  507. ::brpc::policy::CONS_HASH_LB_MURMUR3,
  508. ::brpc::policy::CONS_HASH_LB_MD5,
  509. ::brpc::policy::CONS_HASH_LB_KETAMA
  510. };
  511. const char* servers[] = {
  512. "10.92.115.19:8833",
  513. "10.42.108.25:8833",
  514. "10.36.150.32:8833",
  515. "10.92.149.48:8833",
  516. "10.42.122.201:8833",
  517. };
  518. for (size_t round = 0; round < ARRAY_SIZE(hashs); ++round) {
  519. brpc::policy::ConsistentHashingLoadBalancer chlb(hash_type[round]);
  520. std::vector<brpc::ServerId> ids;
  521. std::vector<butil::EndPoint> addrs;
  522. for (int j = 0;j < 5; ++j)
  523. for (int i = 0; i < 5; ++i) {
  524. const char *addr = servers[i];
  525. //snprintf(addr, sizeof(addr), "192.168.1.%d:8080", i);
  526. butil::EndPoint dummy;
  527. ASSERT_EQ(0, str2endpoint(addr, &dummy));
  528. brpc::ServerId id(8888);
  529. brpc::SocketOptions options;
  530. options.remote_side = dummy;
  531. options.user = new SaveRecycle;
  532. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  533. ids.push_back(id);
  534. addrs.push_back(dummy);
  535. chlb.AddServer(id);
  536. }
  537. std::cout << chlb;
  538. for (int i = 0; i < 5; ++i) {
  539. std::vector<brpc::ServerId> empty;
  540. chlb.AddServersInBatch(empty);
  541. chlb.RemoveServersInBatch(empty);
  542. std::cout << chlb;
  543. }
  544. const size_t SELECT_TIMES = 1000000;
  545. std::map<butil::EndPoint, size_t> times;
  546. brpc::SocketUniquePtr ptr;
  547. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  548. ::brpc::LoadBalancer::SelectOut out(&ptr);
  549. for (size_t i = 0; i < SELECT_TIMES; ++i) {
  550. in.has_request_code = true;
  551. in.request_code = hashs[round]((const char *)&i, sizeof(i));
  552. chlb.SelectServer(in, &out);
  553. ++times[ptr->remote_side()];
  554. }
  555. std::map<butil::EndPoint, double> load_map;
  556. chlb.GetLoads(&load_map);
  557. ASSERT_EQ(times.size(), load_map.size());
  558. double load_sum = 0;;
  559. double load_sqr_sum = 0;
  560. for (size_t i = 0; i < addrs.size(); ++i) {
  561. double normalized_load =
  562. (double)times[addrs[i]] / SELECT_TIMES / load_map[addrs[i]];
  563. std::cout << i << '=' << normalized_load << ' ';
  564. load_sum += normalized_load;
  565. load_sqr_sum += normalized_load * normalized_load;
  566. }
  567. std::cout << '\n';
  568. std::cout << "average_normalized_load=" << load_sum / addrs.size()
  569. << " deviation="
  570. << sqrt(load_sqr_sum * addrs.size() - load_sum * load_sum) / addrs.size()
  571. << '\n';
  572. for (size_t i = 0; i < ids.size(); ++i) {
  573. ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
  574. }
  575. }
  576. }
  577. TEST_F(LoadBalancerTest, weighted_round_robin) {
  578. const char* servers[] = {
  579. "10.92.115.19:8831",
  580. "10.42.108.25:8832",
  581. "10.36.150.32:8833",
  582. "10.36.150.32:8899",
  583. "10.92.149.48:8834",
  584. "10.42.122.201:8835",
  585. "10.42.122.202:8836"
  586. };
  587. std::string weight[] = {"3", "2", "7", "200000000", "1ab", "-1", "0"};
  588. std::map<butil::EndPoint, int> configed_weight;
  589. brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
  590. // Add server to selected list. The server with invalid weight will be skipped.
  591. for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
  592. const char *addr = servers[i];
  593. butil::EndPoint dummy;
  594. ASSERT_EQ(0, str2endpoint(addr, &dummy));
  595. brpc::ServerId id(8888);
  596. brpc::SocketOptions options;
  597. options.remote_side = dummy;
  598. options.user = new SaveRecycle;
  599. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  600. id.tag = weight[i];
  601. if (i == 3) {
  602. brpc::SocketUniquePtr ptr;
  603. ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
  604. ptr->SetLogOff();
  605. }
  606. if ( i < 4 ) {
  607. int weight_num = 0;
  608. ASSERT_TRUE(butil::StringToInt(weight[i], &weight_num));
  609. configed_weight[dummy] = weight_num;
  610. EXPECT_TRUE(wrrlb.AddServer(id));
  611. } else {
  612. EXPECT_FALSE(wrrlb.AddServer(id));
  613. }
  614. }
  615. // Select the best server according to weight configured.
  616. // There are 3 valid servers with weight 3, 2 and 7 respectively.
  617. // We run SelectServer for 12 times. The result number of each server seleted should be
  618. // consistent with weight configured.
  619. std::map<butil::EndPoint, size_t> select_result;
  620. brpc::SocketUniquePtr ptr;
  621. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  622. brpc::LoadBalancer::SelectOut out(&ptr);
  623. int total_weight = 12;
  624. std::vector<butil::EndPoint> select_servers;
  625. for (int i = 0; i != total_weight; ++i) {
  626. EXPECT_EQ(0, wrrlb.SelectServer(in, &out));
  627. select_servers.emplace_back(ptr->remote_side());
  628. ++select_result[ptr->remote_side()];
  629. }
  630. for (const auto& s : select_servers) {
  631. std::cout << "1=" << s << ", ";
  632. }
  633. std::cout << std::endl;
  634. // Check whether slected result is consistent with expected.
  635. EXPECT_EQ((size_t)3, select_result.size());
  636. for (const auto& result : select_result) {
  637. std::cout << result.first << " result=" << result.second
  638. << " configured=" << configed_weight[result.first] << std::endl;
  639. EXPECT_EQ(result.second, (size_t)configed_weight[result.first]);
  640. }
  641. }
  642. TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
  643. const char* servers[] = {
  644. "10.92.115.19:8831",
  645. "10.42.108.25:8832",
  646. "10.36.150.32:8833"
  647. };
  648. std::string weight[] = {"200000000", "2", "600000"};
  649. std::map<butil::EndPoint, int> configed_weight;
  650. brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
  651. brpc::ExcludedServers* exclude = brpc::ExcludedServers::Create(3);
  652. for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
  653. const char *addr = servers[i];
  654. butil::EndPoint dummy;
  655. ASSERT_EQ(0, str2endpoint(addr, &dummy));
  656. brpc::ServerId id(8888);
  657. brpc::SocketOptions options;
  658. options.remote_side = dummy;
  659. options.user = new SaveRecycle;
  660. id.tag = weight[i];
  661. if (i < 2) {
  662. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  663. }
  664. EXPECT_TRUE(wrrlb.AddServer(id));
  665. if (i == 0) {
  666. exclude->Add(id.id);
  667. }
  668. if (i == 1) {
  669. brpc::SocketUniquePtr ptr;
  670. ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
  671. ptr->SetLogOff();
  672. }
  673. }
  674. // The first socket is excluded. The second socket is logfoff.
  675. // The third socket is invalid.
  676. brpc::SocketUniquePtr ptr;
  677. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude };
  678. brpc::LoadBalancer::SelectOut out(&ptr);
  679. EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out));
  680. brpc::ExcludedServers::Destroy(exclude);
  681. }
  682. TEST_F(LoadBalancerTest, weighted_randomized) {
  683. const char* servers[] = {
  684. "10.92.115.19:8831",
  685. "10.42.108.25:8832",
  686. "10.36.150.31:8833",
  687. "10.36.150.32:8899",
  688. "10.92.149.48:8834",
  689. "10.42.122.201:8835",
  690. "10.42.122.202:8836"
  691. };
  692. std::string weight[] = {"3", "2", "5", "10", "1ab", "-1", "0"};
  693. std::map<butil::EndPoint, int> configed_weight;
  694. uint64_t configed_weight_sum = 0;
  695. brpc::policy::WeightedRandomizedLoadBalancer wrlb;
  696. size_t valid_weight_num = 4;
  697. // Add server to selected list. The server with invalid weight will be skipped.
  698. for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
  699. const char *addr = servers[i];
  700. butil::EndPoint dummy;
  701. ASSERT_EQ(0, str2endpoint(addr, &dummy));
  702. brpc::ServerId id(8888);
  703. brpc::SocketOptions options;
  704. options.remote_side = dummy;
  705. options.user = new SaveRecycle;
  706. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  707. id.tag = weight[i];
  708. if (i < valid_weight_num) {
  709. int weight_num = 0;
  710. ASSERT_TRUE(butil::StringToInt(weight[i], &weight_num));
  711. configed_weight[dummy] = weight_num;
  712. configed_weight_sum += weight_num;
  713. EXPECT_TRUE(wrlb.AddServer(id));
  714. } else {
  715. EXPECT_FALSE(wrlb.AddServer(id));
  716. }
  717. }
  718. // Select the best server according to weight configured.
  719. // There are 4 valid servers with weight 3, 2, 5 and 10 respectively.
  720. // We run SelectServer for multiple times. The result number of each server seleted should be
  721. // weight randomized with weight configured.
  722. std::map<butil::EndPoint, size_t> select_result;
  723. brpc::SocketUniquePtr ptr;
  724. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  725. brpc::LoadBalancer::SelectOut out(&ptr);
  726. int run_times = configed_weight_sum * 10;
  727. std::vector<butil::EndPoint> select_servers;
  728. for (int i = 0; i < run_times; ++i) {
  729. EXPECT_EQ(0, wrlb.SelectServer(in, &out));
  730. select_servers.emplace_back(ptr->remote_side());
  731. ++select_result[ptr->remote_side()];
  732. }
  733. for (const auto& server : select_servers) {
  734. std::cout << "weight randomized=" << server << ", ";
  735. }
  736. std::cout << std::endl;
  737. // Check whether selected result is weight with expected.
  738. EXPECT_EQ(valid_weight_num, select_result.size());
  739. std::cout << "configed_weight_sum=" << configed_weight_sum << " run_times=" << run_times << std::endl;
  740. for (const auto& result : select_result) {
  741. double actual_rate = result.second * 1.0 / run_times;
  742. double expect_rate = configed_weight[result.first] * 1.0 / configed_weight_sum;
  743. std::cout << result.first << " weight=" << configed_weight[result.first]
  744. << " select_times=" << result.second
  745. << " actual_rate=" << actual_rate << " expect_rate=" << expect_rate
  746. << " expect_rate/2=" << expect_rate/2 << " expect_rate*2=" << expect_rate*2
  747. << std::endl;
  748. // actual_rate >= expect_rate / 2
  749. ASSERT_GE(actual_rate, expect_rate / 2);
  750. // actual_rate <= expect_rate * 2
  751. ASSERT_LE(actual_rate, expect_rate * 2);
  752. }
  753. }
  754. TEST_F(LoadBalancerTest, health_check_no_valid_server) {
  755. const char* servers[] = {
  756. "10.92.115.19:8832",
  757. "10.42.122.201:8833",
  758. };
  759. std::vector<brpc::LoadBalancer*> lbs;
  760. lbs.push_back(new brpc::policy::RoundRobinLoadBalancer);
  761. lbs.push_back(new brpc::policy::RandomizedLoadBalancer);
  762. lbs.push_back(new brpc::policy::WeightedRoundRobinLoadBalancer);
  763. for (int i = 0; i < (int)lbs.size(); ++i) {
  764. brpc::LoadBalancer* lb = lbs[i];
  765. std::vector<brpc::ServerId> ids;
  766. for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
  767. butil::EndPoint dummy;
  768. ASSERT_EQ(0, str2endpoint(servers[i], &dummy));
  769. brpc::ServerId id(8888);
  770. brpc::SocketOptions options;
  771. options.remote_side = dummy;
  772. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  773. id.tag = "50";
  774. ids.push_back(id);
  775. lb->AddServer(id);
  776. }
  777. // Without setting anything, the lb should work fine
  778. for (int i = 0; i < 4; ++i) {
  779. brpc::SocketUniquePtr ptr;
  780. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  781. brpc::LoadBalancer::SelectOut out(&ptr);
  782. ASSERT_EQ(0, lb->SelectServer(in, &out));
  783. }
  784. brpc::SocketUniquePtr ptr;
  785. ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
  786. ptr->_ninflight_app_health_check.store(1, butil::memory_order_relaxed);
  787. for (int i = 0; i < 4; ++i) {
  788. brpc::SocketUniquePtr ptr;
  789. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  790. brpc::LoadBalancer::SelectOut out(&ptr);
  791. ASSERT_EQ(0, lb->SelectServer(in, &out));
  792. // After putting server[0] into health check state, the only choice is servers[1]
  793. ASSERT_EQ(ptr->remote_side().port, 8833);
  794. }
  795. ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
  796. ptr->_ninflight_app_health_check.store(1, butil::memory_order_relaxed);
  797. for (int i = 0; i < 4; ++i) {
  798. brpc::SocketUniquePtr ptr;
  799. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  800. brpc::LoadBalancer::SelectOut out(&ptr);
  801. // There is no server available
  802. ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
  803. }
  804. ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
  805. ptr->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
  806. ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
  807. ptr->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
  808. // After reset health check state, the lb should work fine
  809. bool get_server1 = false;
  810. bool get_server2 = false;
  811. for (int i = 0; i < 20; ++i) {
  812. brpc::SocketUniquePtr ptr;
  813. brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
  814. brpc::LoadBalancer::SelectOut out(&ptr);
  815. ASSERT_EQ(0, lb->SelectServer(in, &out));
  816. if (ptr->remote_side().port == 8832) {
  817. get_server1 = true;
  818. } else {
  819. get_server2 = true;
  820. }
  821. }
  822. ASSERT_TRUE(get_server1 && get_server2);
  823. delete lb;
  824. }
  825. }
  826. TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
  827. const char* servers[] = {
  828. "10.92.115.19:8832",
  829. "10.42.122.201:8833",
  830. };
  831. brpc::LoadBalancer* lb = NULL;
  832. int rand = butil::fast_rand_less_than(2);
  833. if (rand == 0) {
  834. brpc::policy::RandomizedLoadBalancer rlb;
  835. lb = rlb.New("min_working_instances=2 hold_seconds=2");
  836. } else if (rand == 1) {
  837. brpc::policy::RoundRobinLoadBalancer rrlb;
  838. lb = rrlb.New("min_working_instances=2 hold_seconds=2");
  839. }
  840. brpc::SocketUniquePtr ptr[2];
  841. for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
  842. butil::EndPoint dummy;
  843. ASSERT_EQ(0, str2endpoint(servers[i], &dummy));
  844. brpc::SocketOptions options;
  845. options.remote_side = dummy;
  846. brpc::ServerId id(8888);
  847. id.tag = "50";
  848. ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
  849. ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr[i]));
  850. lb->AddServer(id);
  851. }
  852. brpc::SocketUniquePtr sptr;
  853. brpc::LoadBalancer::SelectIn in = { 0, false, true, 0u, NULL };
  854. brpc::LoadBalancer::SelectOut out(&sptr);
  855. ASSERT_EQ(0, lb->SelectServer(in, &out));
  856. ptr[0]->SetFailed();
  857. ptr[1]->SetFailed();
  858. ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
  859. // should reject all request since there is no available server
  860. for (int i = 0; i < 10; ++i) {
  861. ASSERT_EQ(brpc::EREJECT, lb->SelectServer(in, &out));
  862. }
  863. {
  864. brpc::SocketUniquePtr dummy_ptr;
  865. ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr));
  866. dummy_ptr->Revive();
  867. }
  868. bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000);
  869. // After one server is revived, the reject rate should be 50%
  870. int num_ereject = 0;
  871. int num_ok = 0;
  872. for (int i = 0; i < 100; ++i) {
  873. int rc = lb->SelectServer(in, &out);
  874. if (rc == brpc::EREJECT) {
  875. num_ereject++;
  876. } else if (rc == 0) {
  877. num_ok++;
  878. } else {
  879. ASSERT_TRUE(false);
  880. }
  881. }
  882. ASSERT_TRUE(abs(num_ereject - num_ok) < 30);
  883. bthread_usleep((2000 /* hold_seconds */ + 10) * 1000);
  884. // After enough waiting time, traffic should be sent to all available servers.
  885. for (int i = 0; i < 10; ++i) {
  886. ASSERT_EQ(0, lb->SelectServer(in, &out));
  887. }
  888. }
  889. class EchoServiceImpl : public test::EchoService {
  890. public:
  891. EchoServiceImpl()
  892. : _num_request(0) {}
  893. virtual ~EchoServiceImpl() {}
  894. virtual void Echo(google::protobuf::RpcController* cntl_base,
  895. const test::EchoRequest* req,
  896. test::EchoResponse* res,
  897. google::protobuf::Closure* done) {
  898. //brpc::Controller* cntl =
  899. // static_cast<brpc::Controller*>(cntl_base);
  900. brpc::ClosureGuard done_guard(done);
  901. int p = _num_request.fetch_add(1, butil::memory_order_relaxed);
  902. // concurrency in normal case is 50
  903. if (p < 70) {
  904. bthread_usleep(100 * 1000);
  905. _num_request.fetch_sub(1, butil::memory_order_relaxed);
  906. res->set_message("OK");
  907. } else {
  908. _num_request.fetch_sub(1, butil::memory_order_relaxed);
  909. bthread_usleep(1000 * 1000);
  910. }
  911. return;
  912. }
  913. butil::atomic<int> _num_request;
  914. };
  915. butil::atomic<int32_t> num_failed(0);
  916. butil::atomic<int32_t> num_reject(0);
  917. class Done : public google::protobuf::Closure {
  918. public:
  919. void Run() {
  920. if (cntl.Failed()) {
  921. num_failed.fetch_add(1, butil::memory_order_relaxed);
  922. if (cntl.ErrorCode() == brpc::EREJECT) {
  923. num_reject.fetch_add(1, butil::memory_order_relaxed);
  924. }
  925. }
  926. delete this;
  927. }
  928. brpc::Controller cntl;
  929. test::EchoRequest req;
  930. test::EchoResponse res;
  931. };
  932. TEST_F(LoadBalancerTest, invalid_lb_params) {
  933. const char* lb_algo[] = { "random:mi_working_instances=2 hold_seconds=2",
  934. "rr:min_working_instances=2 hold_secon=2" };
  935. brpc::Channel channel;
  936. brpc::ChannelOptions options;
  937. options.protocol = "http";
  938. ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
  939. lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
  940. &options), -1);
  941. }
  942. TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
  943. GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_size", "20");
  944. GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30");
  945. // Those two lines force the interval of first hc to 3s
  946. GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "3000");
  947. GFLAGS_NS::SetCommandLineOption("circuit_breaker_min_isolation_duration_ms", "3000");
  948. const char* lb_algo[] = { "random:min_working_instances=2 hold_seconds=2",
  949. "rr:min_working_instances=2 hold_seconds=2" };
  950. brpc::Channel channel;
  951. brpc::ChannelOptions options;
  952. options.protocol = "http";
  953. options.timeout_ms = 300;
  954. options.enable_circuit_breaker = true;
  955. // Disable retry to make health check happen one by one
  956. options.max_retry = 0;
  957. ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
  958. lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
  959. &options), 0);
  960. test::EchoRequest req;
  961. req.set_message("123");
  962. test::EchoResponse res;
  963. test::EchoService_Stub stub(&channel);
  964. {
  965. // trigger one server to health check
  966. brpc::Controller cntl;
  967. stub.Echo(&cntl, &req, &res, NULL);
  968. }
  969. // This sleep make one server revived 700ms earlier than the other server, which
  970. // can make the server down again if no request limit policy are applied here.
  971. bthread_usleep(700000);
  972. {
  973. // trigger the other server to health check
  974. brpc::Controller cntl;
  975. stub.Echo(&cntl, &req, &res, NULL);
  976. }
  977. butil::EndPoint point(butil::IP_ANY, 7777);
  978. brpc::Server server;
  979. EchoServiceImpl service;
  980. ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
  981. ASSERT_EQ(0, server.Start(point, NULL));
  982. butil::EndPoint point2(butil::IP_ANY, 7778);
  983. brpc::Server server2;
  984. EchoServiceImpl service2;
  985. ASSERT_EQ(0, server2.AddService(&service2, brpc::SERVER_DOESNT_OWN_SERVICE));
  986. ASSERT_EQ(0, server2.Start(point2, NULL));
  987. int64_t start_ms = butil::gettimeofday_ms();
  988. while ((butil::gettimeofday_ms() - start_ms) < 3500) {
  989. Done* done = new Done;
  990. done->req.set_message("123");
  991. stub.Echo(&done->cntl, &done->req, &done->res, done);
  992. bthread_usleep(1000);
  993. }
  994. // All error code should be equal to EREJECT, except when the situation
  995. // all servers are down, the very first call that trigger recovering would
  996. // fail with EHOSTDOWN instead of EREJECT. This is where the number 1 comes
  997. // in following ASSERT.
  998. ASSERT_TRUE(num_failed.load(butil::memory_order_relaxed) -
  999. num_reject.load(butil::memory_order_relaxed) == 1);
  1000. num_failed.store(0, butil::memory_order_relaxed);
  1001. // should recover now
  1002. for (int i = 0; i < 1000; ++i) {
  1003. Done* done = new Done;
  1004. done->req.set_message("123");
  1005. stub.Echo(&done->cntl, &done->req, &done->res, done);
  1006. bthread_usleep(1000);
  1007. }
  1008. bthread_usleep(500000 /* sleep longer than timeout of channel */);
  1009. ASSERT_EQ(0, num_failed.load(butil::memory_order_relaxed));
  1010. }
  1011. } //namespace