channel.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <inttypes.h>
  18. #include <google/protobuf/descriptor.h>
  19. #include <gflags/gflags.h>
  20. #include "butil/time.h" // milliseconds_from_now
  21. #include "butil/logging.h"
  22. #include "butil/third_party/murmurhash3/murmurhash3.h"
  23. #include "butil/strings/string_util.h"
  24. #include "bthread/unstable.h" // bthread_timer_add
  25. #include "brpc/socket_map.h" // SocketMapInsert
  26. #include "brpc/compress.h"
  27. #include "brpc/global.h"
  28. #include "brpc/span.h"
  29. #include "brpc/details/load_balancer_with_naming.h"
  30. #include "brpc/controller.h"
  31. #include "brpc/channel.h"
  32. #include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
  33. #include "brpc/policy/esp_authenticator.h"
  34. namespace brpc {
  35. DECLARE_bool(enable_rpcz);
  36. DECLARE_bool(usercode_in_pthread);
  37. ChannelOptions::ChannelOptions()
  38. : connect_timeout_ms(200)
  39. , timeout_ms(500)
  40. , backup_request_ms(-1)
  41. , max_retry(3)
  42. , enable_circuit_breaker(false)
  43. , protocol(PROTOCOL_BAIDU_STD)
  44. , connection_type(CONNECTION_TYPE_UNKNOWN)
  45. , succeed_without_server(true)
  46. , log_succeed_without_server(true)
  47. , auth(NULL)
  48. , retry_policy(NULL)
  49. , ns_filter(NULL)
  50. {}
  51. ChannelSSLOptions* ChannelOptions::mutable_ssl_options() {
  52. if (!_ssl_options) {
  53. _ssl_options.reset(new ChannelSSLOptions);
  54. }
  55. return _ssl_options.get();
  56. }
  57. static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
  58. if (opt.auth == NULL &&
  59. !opt.has_ssl_options() &&
  60. opt.connection_group.empty()) {
  61. // Returning zeroized result by default is more intuitive for users.
  62. return ChannelSignature();
  63. }
  64. uint32_t seed = 0;
  65. std::string buf;
  66. buf.reserve(1024);
  67. butil::MurmurHash3_x64_128_Context mm_ctx;
  68. do {
  69. buf.clear();
  70. butil::MurmurHash3_x64_128_Init(&mm_ctx, seed);
  71. if (!opt.connection_group.empty()) {
  72. buf.append("|conng=");
  73. buf.append(opt.connection_group);
  74. }
  75. if (opt.auth) {
  76. buf.append("|auth=");
  77. buf.append((char*)&opt.auth, sizeof(opt.auth));
  78. }
  79. if (opt.has_ssl_options()) {
  80. const ChannelSSLOptions& ssl = opt.ssl_options();
  81. buf.push_back('|');
  82. buf.append(ssl.ciphers);
  83. buf.push_back('|');
  84. buf.append(ssl.protocols);
  85. buf.push_back('|');
  86. buf.append(ssl.sni_name);
  87. const VerifyOptions& verify = ssl.verify;
  88. buf.push_back('|');
  89. buf.append((char*)&verify.verify_depth, sizeof(verify.verify_depth));
  90. buf.push_back('|');
  91. buf.append(verify.ca_file_path);
  92. } else {
  93. // All disabled ChannelSSLOptions are the same
  94. }
  95. butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
  96. buf.clear();
  97. if (opt.has_ssl_options()) {
  98. const CertInfo& cert = opt.ssl_options().client_cert;
  99. if (!cert.certificate.empty()) {
  100. // Certificate may be too long (PEM string) to fit into `buf'
  101. butil::MurmurHash3_x64_128_Update(
  102. &mm_ctx, cert.certificate.data(), cert.certificate.size());
  103. butil::MurmurHash3_x64_128_Update(
  104. &mm_ctx, cert.private_key.data(), cert.private_key.size());
  105. }
  106. }
  107. // sni_filters has no effect in ChannelSSLOptions
  108. ChannelSignature result;
  109. butil::MurmurHash3_x64_128_Final(result.data, &mm_ctx);
  110. if (result != ChannelSignature()) {
  111. // the empty result is reserved for default case and cannot
  112. // be used, increment the seed and retry.
  113. return result;
  114. }
  115. ++seed;
  116. } while (true);
  117. }
  118. Channel::Channel(ProfilerLinker)
  119. : _server_id(INVALID_SOCKET_ID)
  120. , _serialize_request(NULL)
  121. , _pack_request(NULL)
  122. , _get_method_name(NULL)
  123. , _preferred_index(-1) {
  124. }
  125. Channel::~Channel() {
  126. if (_server_id != INVALID_SOCKET_ID) {
  127. const ChannelSignature sig = ComputeChannelSignature(_options);
  128. SocketMapRemove(SocketMapKey(_server_address, sig));
  129. }
  130. }
  131. int Channel::InitChannelOptions(const ChannelOptions* options) {
  132. if (options) { // Override default options if user provided one.
  133. _options = *options;
  134. }
  135. const Protocol* protocol = FindProtocol(_options.protocol);
  136. if (NULL == protocol || !protocol->support_client()) {
  137. LOG(ERROR) << "Channel does not support the protocol";
  138. return -1;
  139. }
  140. _serialize_request = protocol->serialize_request;
  141. _pack_request = protocol->pack_request;
  142. _get_method_name = protocol->get_method_name;
  143. // Check connection_type
  144. if (_options.connection_type == CONNECTION_TYPE_UNKNOWN) {
  145. // Save has_error which will be overriden in later assignments to
  146. // connection_type.
  147. const bool has_error = _options.connection_type.has_error();
  148. if (protocol->supported_connection_type & CONNECTION_TYPE_SINGLE) {
  149. _options.connection_type = CONNECTION_TYPE_SINGLE;
  150. } else if (protocol->supported_connection_type & CONNECTION_TYPE_POOLED) {
  151. _options.connection_type = CONNECTION_TYPE_POOLED;
  152. } else {
  153. _options.connection_type = CONNECTION_TYPE_SHORT;
  154. }
  155. if (has_error) {
  156. LOG(ERROR) << "Channel=" << this << " chose connection_type="
  157. << _options.connection_type.name() << " for protocol="
  158. << _options.protocol.name();
  159. }
  160. } else {
  161. if (!(_options.connection_type & protocol->supported_connection_type)) {
  162. LOG(ERROR) << protocol->name << " does not support connection_type="
  163. << ConnectionTypeToString(_options.connection_type);
  164. return -1;
  165. }
  166. }
  167. _preferred_index = get_client_side_messenger()->FindProtocolIndex(_options.protocol);
  168. if (_preferred_index < 0) {
  169. LOG(ERROR) << "Fail to get index for protocol="
  170. << _options.protocol.name();
  171. return -1;
  172. }
  173. if (_options.protocol == PROTOCOL_ESP) {
  174. if (_options.auth == NULL) {
  175. _options.auth = policy::global_esp_authenticator();
  176. }
  177. }
  178. // Normalize connection_group
  179. std::string& cg = _options.connection_group;
  180. if (!cg.empty() && (::isspace(cg.front()) || ::isspace(cg.back()))) {
  181. butil::TrimWhitespace(cg, butil::TRIM_ALL, &cg);
  182. }
  183. return 0;
  184. }
  185. int Channel::Init(const char* server_addr_and_port,
  186. const ChannelOptions* options) {
  187. GlobalInitializeOrDie();
  188. butil::EndPoint point;
  189. const AdaptiveProtocolType& ptype = (options ? options->protocol : _options.protocol);
  190. const Protocol* protocol = FindProtocol(ptype);
  191. if (protocol == NULL || !protocol->support_client()) {
  192. LOG(ERROR) << "Channel does not support the protocol";
  193. return -1;
  194. }
  195. if (protocol->parse_server_address != NULL) {
  196. if (!protocol->parse_server_address(&point, server_addr_and_port)) {
  197. LOG(ERROR) << "Fail to parse address=`" << server_addr_and_port << '\'';
  198. return -1;
  199. }
  200. } else {
  201. if (str2endpoint(server_addr_and_port, &point) != 0 &&
  202. hostname2endpoint(server_addr_and_port, &point) != 0) {
  203. // Many users called the wrong Init(). Print some log to save
  204. // our troubleshooting time.
  205. if (strstr(server_addr_and_port, "://")) {
  206. LOG(ERROR) << "Invalid address=`" << server_addr_and_port
  207. << "'. Use Init(naming_service_name, "
  208. "load_balancer_name, options) instead.";
  209. } else {
  210. LOG(ERROR) << "Invalid address=`" << server_addr_and_port << '\'';
  211. }
  212. return -1;
  213. }
  214. }
  215. return InitSingle(point, server_addr_and_port, options);
  216. }
  217. int Channel::Init(const char* server_addr, int port,
  218. const ChannelOptions* options) {
  219. GlobalInitializeOrDie();
  220. butil::EndPoint point;
  221. const AdaptiveProtocolType& ptype = (options ? options->protocol : _options.protocol);
  222. const Protocol* protocol = FindProtocol(ptype);
  223. if (protocol == NULL || !protocol->support_client()) {
  224. LOG(ERROR) << "Channel does not support the protocol";
  225. return -1;
  226. }
  227. if (protocol->parse_server_address != NULL) {
  228. if (!protocol->parse_server_address(&point, server_addr)) {
  229. LOG(ERROR) << "Fail to parse address=`" << server_addr << '\'';
  230. return -1;
  231. }
  232. point.port = port;
  233. } else {
  234. if (str2endpoint(server_addr, port, &point) != 0 &&
  235. hostname2endpoint(server_addr, port, &point) != 0) {
  236. LOG(ERROR) << "Invalid address=`" << server_addr << '\'';
  237. return -1;
  238. }
  239. }
  240. return InitSingle(point, server_addr, options);
  241. }
  242. static int CreateSocketSSLContext(const ChannelOptions& options,
  243. std::shared_ptr<SocketSSLContext>* ssl_ctx) {
  244. if (options.has_ssl_options()) {
  245. SSL_CTX* raw_ctx = CreateClientSSLContext(options.ssl_options());
  246. if (!raw_ctx) {
  247. LOG(ERROR) << "Fail to CreateClientSSLContext";
  248. return -1;
  249. }
  250. *ssl_ctx = std::make_shared<SocketSSLContext>();
  251. (*ssl_ctx)->raw_ctx = raw_ctx;
  252. (*ssl_ctx)->sni_name = options.ssl_options().sni_name;
  253. } else {
  254. (*ssl_ctx) = NULL;
  255. }
  256. return 0;
  257. }
  258. int Channel::Init(butil::EndPoint server_addr_and_port,
  259. const ChannelOptions* options) {
  260. return InitSingle(server_addr_and_port, "", options);
  261. }
  262. int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
  263. const char* raw_server_address,
  264. const ChannelOptions* options) {
  265. GlobalInitializeOrDie();
  266. if (InitChannelOptions(options) != 0) {
  267. return -1;
  268. }
  269. if (_options.protocol == brpc::PROTOCOL_HTTP &&
  270. ::strncmp(raw_server_address, "https://", 8) == 0) {
  271. if (_options.mutable_ssl_options()->sni_name.empty()) {
  272. ParseURL(raw_server_address,
  273. NULL, &_options.mutable_ssl_options()->sni_name, NULL);
  274. }
  275. }
  276. const int port = server_addr_and_port.port;
  277. if (port < 0 || port > 65535) {
  278. LOG(ERROR) << "Invalid port=" << port;
  279. return -1;
  280. }
  281. _server_address = server_addr_and_port;
  282. const ChannelSignature sig = ComputeChannelSignature(_options);
  283. std::shared_ptr<SocketSSLContext> ssl_ctx;
  284. if (CreateSocketSSLContext(_options, &ssl_ctx) != 0) {
  285. return -1;
  286. }
  287. if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
  288. &_server_id, ssl_ctx) != 0) {
  289. LOG(ERROR) << "Fail to insert into SocketMap";
  290. return -1;
  291. }
  292. return 0;
  293. }
  294. int Channel::Init(const char* ns_url,
  295. const char* lb_name,
  296. const ChannelOptions* options) {
  297. if (lb_name == NULL || *lb_name == '\0') {
  298. // Treat ns_url as server_addr_and_port
  299. return Init(ns_url, options);
  300. }
  301. GlobalInitializeOrDie();
  302. if (InitChannelOptions(options) != 0) {
  303. return -1;
  304. }
  305. if (_options.protocol == brpc::PROTOCOL_HTTP &&
  306. ::strncmp(ns_url, "https://", 8) == 0) {
  307. if (_options.mutable_ssl_options()->sni_name.empty()) {
  308. ParseURL(ns_url,
  309. NULL, &_options.mutable_ssl_options()->sni_name, NULL);
  310. }
  311. }
  312. LoadBalancerWithNaming* lb = new (std::nothrow) LoadBalancerWithNaming;
  313. if (NULL == lb) {
  314. LOG(FATAL) << "Fail to new LoadBalancerWithNaming";
  315. return -1;
  316. }
  317. GetNamingServiceThreadOptions ns_opt;
  318. ns_opt.succeed_without_server = _options.succeed_without_server;
  319. ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
  320. ns_opt.channel_signature = ComputeChannelSignature(_options);
  321. if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {
  322. return -1;
  323. }
  324. if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) {
  325. LOG(ERROR) << "Fail to initialize LoadBalancerWithNaming";
  326. delete lb;
  327. return -1;
  328. }
  329. _lb.reset(lb);
  330. return 0;
  331. }
  332. static void HandleTimeout(void* arg) {
  333. bthread_id_t correlation_id = { (uint64_t)arg };
  334. bthread_id_error(correlation_id, ERPCTIMEDOUT);
  335. }
  336. static void HandleBackupRequest(void* arg) {
  337. bthread_id_t correlation_id = { (uint64_t)arg };
  338. bthread_id_error(correlation_id, EBACKUPREQUEST);
  339. }
  340. void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
  341. google::protobuf::RpcController* controller_base,
  342. const google::protobuf::Message* request,
  343. google::protobuf::Message* response,
  344. google::protobuf::Closure* done) {
  345. const int64_t start_send_real_us = butil::gettimeofday_us();
  346. Controller* cntl = static_cast<Controller*>(controller_base);
  347. cntl->OnRPCBegin(start_send_real_us);
  348. // Override max_retry first to reset the range of correlation_id
  349. if (cntl->max_retry() == UNSET_MAGIC_NUM) {
  350. cntl->set_max_retry(_options.max_retry);
  351. }
  352. if (cntl->max_retry() < 0) {
  353. // this is important because #max_retry decides #versions allocated
  354. // in correlation_id. negative max_retry causes undefined behavior.
  355. cntl->set_max_retry(0);
  356. }
  357. // HTTP needs this field to be set before any SetFailed()
  358. cntl->_request_protocol = _options.protocol;
  359. if (_options.protocol.has_param()) {
  360. CHECK(cntl->protocol_param().empty());
  361. cntl->protocol_param() = _options.protocol.param();
  362. }
  363. cntl->_preferred_index = _preferred_index;
  364. cntl->_retry_policy = _options.retry_policy;
  365. if (_options.enable_circuit_breaker) {
  366. cntl->add_flag(Controller::FLAGS_ENABLED_CIRCUIT_BREAKER);
  367. }
  368. const CallId correlation_id = cntl->call_id();
  369. const int rc = bthread_id_lock_and_reset_range(
  370. correlation_id, NULL, 2 + cntl->max_retry());
  371. if (rc != 0) {
  372. CHECK_EQ(EINVAL, rc);
  373. if (!cntl->FailedInline()) {
  374. cntl->SetFailed(EINVAL, "Fail to lock call_id=%" PRId64,
  375. correlation_id.value);
  376. }
  377. LOG_IF(ERROR, cntl->is_used_by_rpc())
  378. << "Controller=" << cntl << " was used by another RPC before. "
  379. "Did you forget to Reset() it before reuse?";
  380. // Have to run done in-place. If the done runs in another thread,
  381. // Join() on this RPC is no-op and probably ends earlier than running
  382. // the callback and releases resources used in the callback.
  383. // Since this branch is only entered by wrongly-used RPC, the
  384. // potentially introduced deadlock(caused by locking RPC and done with
  385. // the same non-recursive lock) is acceptable and removable by fixing
  386. // user's code.
  387. if (done) {
  388. done->Run();
  389. }
  390. return;
  391. }
  392. cntl->set_used_by_rpc();
  393. if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
  394. const int64_t start_send_us = butil::cpuwide_time_us();
  395. const std::string* method_name = NULL;
  396. if (_get_method_name) {
  397. method_name = &_get_method_name(method, cntl);
  398. } else if (method) {
  399. method_name = &method->full_name();
  400. } else {
  401. const static std::string NULL_METHOD_STR = "null-method";
  402. method_name = &NULL_METHOD_STR;
  403. }
  404. Span* span = Span::CreateClientSpan(
  405. *method_name, start_send_real_us - start_send_us);
  406. span->set_log_id(cntl->log_id());
  407. span->set_base_cid(correlation_id);
  408. span->set_protocol(_options.protocol);
  409. span->set_start_send_us(start_send_us);
  410. cntl->_span = span;
  411. }
  412. // Override some options if they haven't been set by Controller
  413. if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
  414. cntl->set_timeout_ms(_options.timeout_ms);
  415. }
  416. // Since connection is shared extensively amongst channels and RPC,
  417. // overriding connect_timeout_ms does not make sense, just use the
  418. // one in ChannelOptions
  419. cntl->_connect_timeout_ms = _options.connect_timeout_ms;
  420. if (cntl->backup_request_ms() == UNSET_MAGIC_NUM) {
  421. cntl->set_backup_request_ms(_options.backup_request_ms);
  422. }
  423. if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
  424. cntl->set_connection_type(_options.connection_type);
  425. }
  426. cntl->_response = response;
  427. cntl->_done = done;
  428. cntl->_pack_request = _pack_request;
  429. cntl->_method = method;
  430. cntl->_auth = _options.auth;
  431. if (SingleServer()) {
  432. cntl->_single_server_id = _server_id;
  433. cntl->_remote_side = _server_address;
  434. }
  435. // Share the lb with controller.
  436. cntl->_lb = _lb;
  437. // Ensure that serialize_request is done before pack_request in all
  438. // possible executions, including:
  439. // HandleSendFailed => OnVersionedRPCReturned => IssueRPC(pack_request)
  440. _serialize_request(&cntl->_request_buf, cntl, request);
  441. if (cntl->FailedInline()) {
  442. // Handle failures caused by serialize_request, and these error_codes
  443. // should be excluded from the retry_policy.
  444. return cntl->HandleSendFailed();
  445. }
  446. if (FLAGS_usercode_in_pthread &&
  447. done != NULL &&
  448. TooManyUserCode()) {
  449. cntl->SetFailed(ELIMIT, "Too many user code to run when "
  450. "-usercode_in_pthread is on");
  451. return cntl->HandleSendFailed();
  452. }
  453. if (cntl->_request_stream != INVALID_STREAM_ID) {
  454. // Currently we cannot handle retry and backup request correctly
  455. cntl->set_max_retry(0);
  456. cntl->set_backup_request_ms(-1);
  457. }
  458. if (cntl->backup_request_ms() >= 0 &&
  459. (cntl->backup_request_ms() < cntl->timeout_ms() ||
  460. cntl->timeout_ms() < 0)) {
  461. // Setup timer for backup request. When it occurs, we'll setup a
  462. // timer of timeout_ms before sending backup request.
  463. // _deadline_us is for truncating _connect_timeout_ms and resetting
  464. // timer when EBACKUPREQUEST occurs.
  465. if (cntl->timeout_ms() < 0) {
  466. cntl->_deadline_us = -1;
  467. } else {
  468. cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
  469. }
  470. const int rc = bthread_timer_add(
  471. &cntl->_timeout_id,
  472. butil::microseconds_to_timespec(
  473. cntl->backup_request_ms() * 1000L + start_send_real_us),
  474. HandleBackupRequest, (void*)correlation_id.value);
  475. if (BAIDU_UNLIKELY(rc != 0)) {
  476. cntl->SetFailed(rc, "Fail to add timer for backup request");
  477. return cntl->HandleSendFailed();
  478. }
  479. } else if (cntl->timeout_ms() >= 0) {
  480. // Setup timer for RPC timetout
  481. // _deadline_us is for truncating _connect_timeout_ms
  482. cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
  483. const int rc = bthread_timer_add(
  484. &cntl->_timeout_id,
  485. butil::microseconds_to_timespec(cntl->_deadline_us),
  486. HandleTimeout, (void*)correlation_id.value);
  487. if (BAIDU_UNLIKELY(rc != 0)) {
  488. cntl->SetFailed(rc, "Fail to add timer for timeout");
  489. return cntl->HandleSendFailed();
  490. }
  491. } else {
  492. cntl->_deadline_us = -1;
  493. }
  494. cntl->IssueRPC(start_send_real_us);
  495. if (done == NULL) {
  496. // MUST wait for response when sending synchronous RPC. It will
  497. // be woken up by callback when RPC finishes (succeeds or still
  498. // fails after retry)
  499. Join(correlation_id);
  500. if (cntl->_span) {
  501. cntl->SubmitSpan();
  502. }
  503. cntl->OnRPCEnd(butil::gettimeofday_us());
  504. }
  505. }
  506. void Channel::Describe(std::ostream& os, const DescribeOptions& opt) const {
  507. os << "Channel[";
  508. if (SingleServer()) {
  509. os << _server_address;
  510. } else {
  511. _lb->Describe(os, opt);
  512. }
  513. os << "]";
  514. }
  515. int Channel::Weight() {
  516. return (_lb ? _lb->Weight() : 0);
  517. }
  518. int Channel::CheckHealth() {
  519. if (_lb == NULL) {
  520. SocketUniquePtr ptr;
  521. if (Socket::Address(_server_id, &ptr) == 0 && ptr->IsAvailable()) {
  522. return 0;
  523. }
  524. return -1;
  525. } else {
  526. SocketUniquePtr tmp_sock;
  527. LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL };
  528. LoadBalancer::SelectOut sel_out(&tmp_sock);
  529. return _lb->SelectServer(sel_in, &sel_out);
  530. }
  531. }
  532. } // namespace brpc