parallel_channel.cpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include "bthread/bthread.h" // bthread_id_xx
  18. #include "bthread/unstable.h" // bthread_timer_add
  19. #include "butil/atomicops.h"
  20. #include "butil/time.h"
  21. #include "butil/macros.h"
  22. #include "brpc/details/controller_private_accessor.h"
  23. #include "brpc/parallel_channel.h"
  24. namespace brpc {
  25. ParallelChannelOptions::ParallelChannelOptions()
  26. : timeout_ms(500)
  27. , fail_limit(-1) {
  28. }
  29. DECLARE_bool(usercode_in_pthread);
  // No difference was observed when the memory is cached, so the cache is
  // only compiled in when BRPC_CACHE_PCHAN_MEM is defined.
  31. #ifdef BRPC_CACHE_PCHAN_MEM
  // One cached raw allocation: `ptr` is the buffer and `size` its byte count.
  // A size of 0 (with a NULL ptr) means nothing is cached.
  struct Memory {
    int size;
    void* ptr;
  };

  // Per-thread slot holding at most one buffer left behind by a destroyed
  // ParallelChannelDone so that the next Create() on this thread may reuse it.
  static __thread Memory tls_cached_pchan_mem = { 0, NULL };
  37. #endif
  38. class ParallelChannelDone : public google::protobuf::Closure {
  39. private:
  40. ParallelChannelDone(int fail_limit, int ndone, int nchan, int memsize,
  41. Controller* cntl, google::protobuf::Closure* user_done)
  42. : _fail_limit(fail_limit)
  43. , _ndone(ndone)
  44. , _nchan(nchan)
  45. , _memsize(memsize)
  46. , _current_fail(0)
  47. , _current_done(0)
  48. , _cntl(cntl)
  49. , _user_done(user_done)
  50. , _callmethod_bthread(INVALID_BTHREAD)
  51. , _callmethod_pthread(0) {
  52. }
  53. ~ParallelChannelDone() { }
  54. public:
  55. class SubDone : public google::protobuf::Closure {
  56. public:
  57. SubDone() : shared_data(NULL) {
  58. }
  59. ~SubDone() {
  60. // Can't delete request/response in ~SubCall because the
  61. // object is copyable.
  62. if (ap.flags & DELETE_REQUEST) {
  63. delete ap.request;
  64. }
  65. if (ap.flags & DELETE_RESPONSE) {
  66. delete ap.response;
  67. }
  68. }
  69. void Run() {
  70. shared_data->OnSubDoneRun(this);
  71. }
  72. ParallelChannelDone* shared_data;
  73. butil::intrusive_ptr<ResponseMerger> merger;
  74. SubCall ap;
  75. Controller cntl;
  76. };
  77. static ParallelChannelDone* Create(
  78. int fail_limit, int ndone, const SubCall* aps, int nchan,
  79. Controller* cntl, google::protobuf::Closure* user_done) {
  80. // We need to create the object in this way because _sub_done is
  81. // dynamically allocated.
  82. // The memory layout:
  83. // ParallelChannelDone
  84. // SubDone1 `
  85. // SubDone2 - ndone
  86. // ... /
  87. // SubDoneIndex1 `
  88. // SubDoneIndex2 - nchan, existing when nchan != ndone
  89. // ... /
  90. size_t req_size = offsetof(ParallelChannelDone, _sub_done) +
  91. sizeof(SubDone) * ndone;
  92. if (ndone != nchan) {
  93. req_size += sizeof(int) * nchan;
  94. }
  95. void* mem = NULL;
  96. int memsize = 0;
  97. #ifdef BRPC_CACHE_PCHAN_MEM
  98. Memory pchan_mem = tls_cached_pchan_mem;
  99. if (pchan_mem.size >= req_size) { // use tls if it's big enough
  100. mem = pchan_mem.ptr;
  101. memsize = pchan_mem.size;
  102. pchan_mem.size = 0;
  103. pchan_mem.ptr = NULL;
  104. tls_cached_pchan_mem = pchan_mem;
  105. } else {
  106. mem = malloc(req_size);
  107. memsize = req_size;
  108. if (BAIDU_UNLIKELY(NULL == mem)) {
  109. return NULL;
  110. }
  111. }
  112. #else
  113. mem = malloc(req_size);
  114. memsize = req_size;
  115. if (BAIDU_UNLIKELY(NULL == mem)) {
  116. return NULL;
  117. }
  118. #endif
  119. ParallelChannelDone* d = new (mem) ParallelChannelDone(
  120. fail_limit, ndone, nchan, memsize, cntl, user_done);
  121. // Apply client settings of _cntl to controllers of sub calls, except
  122. // timeout. If we let sub channel do their timeout separately, when
  123. // timeout happens, we get ETOOMANYFAILS rather than ERPCTIMEDOUT.
  124. Controller::ClientSettings settings;
  125. cntl->SaveClientSettings(&settings);
  126. settings.timeout_ms = -1;
  127. for (int i = 0; i < ndone; ++i) {
  128. new (d->sub_done(i)) SubDone;
  129. d->sub_done(i)->cntl.ApplyClientSettings(settings);
  130. d->sub_done(i)->cntl.allow_done_to_run_in_place();
  131. }
  132. // Setup the map for finding sub_done of i-th sub_channel
  133. if (ndone != nchan) {
  134. int done_index = 0;
  135. for (int i = 0; i < nchan; ++i) {
  136. if (aps[i].is_skip()) {
  137. d->sub_done_map(i) = -1;
  138. } else {
  139. d->sub_done_map(i) = done_index++;
  140. }
  141. }
  142. CHECK_EQ(ndone, done_index);
  143. }
  144. return d;
  145. }
  146. static void Destroy(ParallelChannelDone* d) {
  147. if (d != NULL) {
  148. for (int i = 0; i < d->_ndone; ++i) {
  149. d->sub_done(i)->~SubDone();
  150. }
  151. #ifdef BRPC_CACHE_PCHAN_MEM
  152. Memory pchan_mem = tls_cached_pchan_mem;
  153. if (pchan_mem.size != 0) {
  154. // free the memory if tls already has sth.
  155. d->~ParallelChannelDone();
  156. free(d);
  157. } else {
  158. pchan_mem.size = d->_memsize;
  159. pchan_mem.ptr = d;
  160. d->~ParallelChannelDone();
  161. tls_cached_pchan_mem = pchan_mem;
  162. }
  163. #else
  164. d->~ParallelChannelDone();
  165. free(d);
  166. #endif
  167. }
  168. }
  169. void Run() {
  170. const int ec = _cntl->ErrorCode();
  171. if (ec == EPCHANFINISH) {
  172. // all sub calls finished. Clear the error and we'll set
  173. // successfulness of _cntl in OnSubDoneRun().
  174. _cntl->_error_code = 0;
  175. _cntl->_error_text.clear();
  176. } else {
  177. CHECK(ECANCELED == ec || ERPCTIMEDOUT == ec) << "ec=" << ec;
  178. }
  179. OnSubDoneRun(NULL);
  180. }
  181. static void* RunOnComplete(void* arg) {
  182. static_cast<ParallelChannelDone*>(arg)->OnComplete();
  183. return NULL;
  184. }
  185. // For otherwhere to know if they're in the same thread.
  186. void SaveThreadInfoOfCallsite() {
  187. _callmethod_bthread = bthread_self();
  188. if (_callmethod_bthread == INVALID_BTHREAD) {
  189. _callmethod_pthread = pthread_self();
  190. }
  191. }
  192. bool IsSameThreadAsCallMethod() const {
  193. if (_callmethod_bthread != INVALID_BTHREAD) {
  194. return bthread_self() == _callmethod_bthread;
  195. }
  196. return pthread_self() == _callmethod_pthread;
  197. }
  198. void OnSubDoneRun(SubDone* fin) {
  199. if (fin != NULL) {
  200. // [ called from SubDone::Run() ]
  201. // Count failed sub calls, if fail_limit is reached, cancel others.
  202. if (fin->cntl.FailedInline() &&
  203. _current_fail.fetch_add(1, butil::memory_order_relaxed) + 1
  204. == _fail_limit) {
  205. for (int i = 0; i < _ndone; ++i) {
  206. SubDone* sd = sub_done(i);
  207. if (fin != sd) {
  208. bthread_id_error(sd->cntl.call_id(), ECANCELED);
  209. }
  210. }
  211. }
  212. // NOTE: Don't access any member after the fetch_add because
  213. // another thread may already go down and Destroy()-ed this object.
  214. const uint32_t saved_ndone = _ndone;
  215. const CallId saved_cid = _cntl->_correlation_id;
  216. // Add 1 to finished sub calls.
  217. // The release fence is matched with acquire fence below to
  218. // guarantee visibilities of all other variables.
  219. const uint32_t val =
  220. _current_done.fetch_add(1, butil::memory_order_release);
  221. // Lower 31 bits are number of finished sub calls. If caller is not
  222. // the last call that finishes, return.
  223. if ((val & 0x7fffffff) + 1 != saved_ndone) {
  224. return;
  225. }
  226. // If _cntl->call_id() is still there, stop it by sending a special
  227. // error(which will be cleared) and return.
  228. if (!(val & 0x80000000)) {
  229. bthread_id_error(saved_cid, EPCHANFINISH);
  230. return;
  231. }
  232. } else {
  233. // [ Called from this->Run() ]
  234. // We may cancel sub calls even if all sub calls finish because
  235. // of reading the value relaxly (and CPU cache is not sync yet).
  236. // It's OK and we have to, because sub_done can't be accessed
  237. // after fetch_or.
  238. uint32_t val = _current_done.load(butil::memory_order_relaxed);
  239. // Lower 31 bits are number of finished sub calls. Cancel sub calls
  240. // if not all of them finish.
  241. if ((val & 0x7fffffff) != (uint32_t)_ndone) {
  242. for (int i = 0; i < _ndone; ++i) {
  243. bthread_id_error(sub_done(i)->cntl.call_id(), ECANCELED);
  244. }
  245. }
  246. // NOTE: Don't access any member after the fetch_or because
  247. // another thread may already go down and Destroy()-ed this object.
  248. const int saved_ndone = _ndone;
  249. // Modify MSB to mark that this->Run() run.
  250. // The release fence is matched with acquire fence below to
  251. // guarantee visibilities of all other variables.
  252. val = _current_done.fetch_or(0x80000000, butil::memory_order_release);
  253. // If not all sub calls finish, return.
  254. if ((val & 0x7fffffff) != (uint32_t)saved_ndone) {
  255. return;
  256. }
  257. }
  258. butil::atomic_thread_fence(butil::memory_order_acquire);
  259. if (fin != NULL &&
  260. !_cntl->is_done_allowed_to_run_in_place() &&
  261. IsSameThreadAsCallMethod()) {
  262. // A sub channel's CallMethod calls a subdone directly, create a
  263. // thread to run OnComplete.
  264. bthread_t bh;
  265. bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
  266. BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  267. if (bthread_start_background(&bh, &attr, RunOnComplete, this) != 0) {
  268. LOG(FATAL) << "Fail to start bthread";
  269. OnComplete();
  270. }
  271. } else {
  272. OnComplete();
  273. }
  274. }
  275. void OnComplete() {
  276. // [ Rendezvous point ]
  277. // One and only one thread arrives here.
  278. // all call_id of sub calls are destroyed and call_id of _cntl is
  279. // still locked (because FLAGS_DESTROY_CID_IN_DONE is true);
  280. // Merge responses of successful calls if fail_limit is not reached.
  281. // nfailed may be increased during the merging.
  282. // NOTE: Don't forget to set "nfailed = _ndone" when the _cntl is set
  283. // to be failed since the RPC is still considered to be successful if
  284. // nfailed is less than fail_limit
  285. int nfailed = _current_fail.load(butil::memory_order_relaxed);
  286. if (nfailed < _fail_limit) {
  287. for (int i = 0; i < _ndone; ++i) {
  288. SubDone* sd = sub_done(i);
  289. google::protobuf::Message* sub_res = sd->cntl._response;
  290. if (!sd->cntl.FailedInline()) { // successful calls only.
  291. if (sd->merger == NULL) {
  292. try {
  293. _cntl->_response->MergeFrom(*sub_res);
  294. } catch (const std::exception& e) {
  295. nfailed = _ndone;
  296. _cntl->SetFailed(ERESPONSE, "%s", e.what());
  297. break;
  298. }
  299. } else {
  300. ResponseMerger::Result res =
  301. sd->merger->Merge(_cntl->_response, sub_res);
  302. switch (res) {
  303. case ResponseMerger::MERGED:
  304. break;
  305. case ResponseMerger::FAIL:
  306. ++nfailed;
  307. break;
  308. case ResponseMerger::FAIL_ALL:
  309. nfailed = _ndone;
  310. _cntl->SetFailed(
  311. ERESPONSE,
  312. "Fail to merge response of channel[%d]", i);
  313. break;
  314. }
  315. }
  316. }
  317. }
  318. }
  319. // Note: 1 <= _fail_limit <= _ndone.
  320. if (nfailed >= _fail_limit) {
  321. // If controller was already failed, don't change it.
  322. if (!_cntl->FailedInline()) {
  323. char buf[16];
  324. int unified_ec = ECANCELED;
  325. for (int i = 0; i < _ndone; ++i) {
  326. Controller* sub_cntl = &sub_done(i)->cntl;
  327. const int ec = sub_cntl->ErrorCode();
  328. if (ec != 0 && ec != ECANCELED) {
  329. if (unified_ec == ECANCELED) {
  330. unified_ec = ec;
  331. } else if (unified_ec != ec) {
  332. unified_ec = ETOOMANYFAILS;
  333. break;
  334. }
  335. }
  336. }
  337. _cntl->SetFailed(unified_ec, "%d/%d channels failed, fail_limit=%d",
  338. nfailed, _ndone, _fail_limit);
  339. for (int i = 0; i < _ndone; ++i) {
  340. Controller* sub_cntl = &sub_done(i)->cntl;
  341. if (sub_cntl->FailedInline()) {
  342. const int len = snprintf(buf, sizeof(buf), " [C%d]", i);
  343. _cntl->_error_text.append(buf, len);
  344. _cntl->_error_text.append(sub_cntl->_error_text);
  345. }
  346. }
  347. }
  348. } else {
  349. // Failed sub channels does not reach the limit, the RPC is
  350. // considered to be successful. For example, a RPC to a
  351. // ParallelChannel is canceled by user however enough sub calls
  352. // (> _ndone - fail_limit) already succeed before the canceling,
  353. // the RPC is still successful rather than ECANCELED.
  354. _cntl->_error_code = 0;
  355. _cntl->_error_text.clear();
  356. }
  357. google::protobuf::Closure* user_done = _user_done;
  358. const CallId saved_cid = _cntl->call_id();
  359. // NOTE: we don't destroy self here, controller destroys this done in
  360. // Reset() so that user can access sub controllers before Reset().
  361. if (user_done) {
  362. _cntl->OnRPCEnd(butil::gettimeofday_us());
  363. user_done->Run();
  364. }
  365. CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid));
  366. }
  367. int sub_done_size() const { return _ndone; }
  368. SubDone* sub_done(int i) { return &_sub_done[i]; }
  369. const SubDone* sub_done(int i) const { return &_sub_done[i]; }
  370. int& sub_done_map(int i) {
  371. return reinterpret_cast<int*>((_sub_done + _ndone))[i];
  372. }
  373. int sub_done_map(int i) const {
  374. return reinterpret_cast<const int*>((_sub_done + _ndone))[i];
  375. }
  376. int sub_channel_size() const { return _nchan; }
  377. // Different from sub_done(), sub_channel_controller returns NULL for
  378. // invalid accesses and never crashes.
  379. const Controller* sub_channel_controller(int i) const {
  380. if (i >= 0 && i < _nchan) {
  381. if (_nchan == _ndone) {
  382. return &sub_done(i)->cntl;
  383. }
  384. const int offset = sub_done_map(i);
  385. if (offset >= 0) {
  386. return &sub_done(offset)->cntl;
  387. }
  388. }
  389. return NULL;
  390. }
  391. private:
  392. int _fail_limit;
  393. int _ndone;
  394. int _nchan;
  395. #if defined(__clang__)
  396. int ALLOW_UNUSED _memsize;
  397. #else
  398. int _memsize;
  399. #endif
  400. butil::atomic<int> _current_fail;
  401. butil::atomic<uint32_t> _current_done;
  402. Controller* _cntl;
  403. google::protobuf::Closure* _user_done;
  404. bthread_t _callmethod_bthread;
  405. pthread_t _callmethod_pthread;
  406. SubDone _sub_done[0];
  407. };
  408. // Used in controller.cpp
  409. void DestroyParallelChannelDone(google::protobuf::Closure* c) {
  410. ParallelChannelDone::Destroy(static_cast<ParallelChannelDone*>(c));
  411. }
  412. const Controller* GetSubControllerOfParallelChannel(
  413. const google::protobuf::Closure* c, int i) {
  414. const ParallelChannelDone* d = static_cast<const ParallelChannelDone*>(c);
  415. return d->sub_channel_controller(i);
  416. }
  417. int ParallelChannel::Init(const ParallelChannelOptions* options) {
  418. if (options != NULL) {
  419. _options = *options;
  420. }
  421. return 0;
  422. }
  423. int ParallelChannel::AddChannel(ChannelBase* sub_channel,
  424. ChannelOwnership ownership,
  425. CallMapper* call_mapper,
  426. ResponseMerger* merger) {
  427. if (NULL == sub_channel) {
  428. LOG(ERROR) << "Param[sub_channel] is NULL";
  429. return -1;
  430. }
  431. if (_chans.capacity() == 0) {
  432. _chans.reserve(32);
  433. }
  434. SubChan sc;
  435. sc.chan = sub_channel;
  436. sc.ownership = ownership;
  437. sc.call_mapper = call_mapper;
  438. sc.merger = merger;
  439. _chans.push_back(sc);
  440. return 0;
  441. }
  442. struct SortByChannelPtr {
  443. bool operator()(const ParallelChannel::SubChan& c1,
  444. const ParallelChannel::SubChan& c2) const {
  445. return c1.chan < c2.chan;
  446. }
  447. };
  448. struct EqualChannelPtr {
  449. bool operator()(const ParallelChannel::SubChan& c1,
  450. const ParallelChannel::SubChan& c2) const {
  451. return c1.chan == c2.chan;
  452. }
  453. };
  454. void ParallelChannel::Reset() {
  455. // Removal of channels are a little complex because a channel may be
  456. // added multiple times.
  457. // Dereference call_mapper and mergers first.
  458. for (size_t i = 0; i < _chans.size(); ++i) {
  459. _chans[i].call_mapper.reset();
  460. _chans[i].merger.reset();
  461. }
  462. // Remove not own-ed channels.
  463. for (size_t i = 0; i < _chans.size();) {
  464. if (_chans[i].ownership != OWNS_CHANNEL) {
  465. _chans[i] = _chans.back();
  466. _chans.pop_back();
  467. } else {
  468. ++i;
  469. }
  470. }
  471. if (_chans.empty()) {
  472. return;
  473. }
  474. // Sort own-ed channels so that we can deduplicate them more efficiently.
  475. std::sort(_chans.begin(), _chans.end(), SortByChannelPtr());
  476. const size_t uniq_size =
  477. std::unique(_chans.begin(), _chans.end(), EqualChannelPtr())
  478. - _chans.begin();
  479. for (size_t i = 0; i < uniq_size; ++i) {
  480. CHECK_EQ(_chans[i].ownership, OWNS_CHANNEL);
  481. delete _chans[i].chan;
  482. }
  483. _chans.clear();
  484. }
  485. ParallelChannel::~ParallelChannel() {
  486. Reset();
  487. }
  488. static void HandleTimeout(void* arg) {
  489. bthread_id_t correlation_id = { (uint64_t)arg };
  490. bthread_id_error(correlation_id, ERPCTIMEDOUT);
  491. }
  492. void* ParallelChannel::RunDoneAndDestroy(void* arg) {
  493. Controller* c = static_cast<Controller*>(arg);
  494. // Move done out from the controller.
  495. google::protobuf::Closure* done = c->_done;
  496. c->_done = NULL;
  497. // Save call_id from the controller which may be deleted after Run().
  498. const bthread_id_t cid = c->call_id();
  499. done->Run();
  500. CHECK_EQ(0, bthread_id_unlock_and_destroy(cid));
  501. return NULL;
  502. }
  503. void ParallelChannel::CallMethod(
  504. const google::protobuf::MethodDescriptor* method,
  505. google::protobuf::RpcController* cntl_base,
  506. const google::protobuf::Message* request,
  507. google::protobuf::Message* response,
  508. google::protobuf::Closure* done) {
  509. Controller* cntl = static_cast<Controller*>(cntl_base);
  510. cntl->OnRPCBegin(butil::gettimeofday_us());
  511. // Make sure cntl->sub_count() always equal #sub-channels
  512. const int nchan = _chans.size();
  513. cntl->_pchan_sub_count = nchan;
  514. const CallId cid = cntl->call_id();
  515. const int rc = bthread_id_lock(cid, NULL);
  516. if (rc != 0) {
  517. CHECK_EQ(EINVAL, rc);
  518. if (!cntl->FailedInline()) {
  519. cntl->SetFailed(EINVAL, "Fail to lock call_id=%" PRId64, cid.value);
  520. }
  521. LOG_IF(ERROR, cntl->is_used_by_rpc())
  522. << "Controller=" << cntl << " was used by another RPC before. "
  523. "Did you forget to Reset() it before reuse?";
  524. // Have to run done in-place.
  525. // Read comment in CallMethod() in channel.cpp for details.
  526. if (done) {
  527. done->Run();
  528. }
  529. return;
  530. }
  531. cntl->set_used_by_rpc();
  532. ParallelChannelDone* d = NULL;
  533. int ndone = nchan;
  534. int fail_limit = 1;
  535. DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);
  536. if (cntl->FailedInline()) {
  537. // The call_id is cancelled before RPC.
  538. goto FAIL;
  539. }
  540. // we don't support http whose response is NULL.
  541. if (response == NULL) {
  542. cntl->SetFailed(EINVAL, "response must be non-NULL");
  543. goto FAIL;
  544. }
  545. if (nchan == 0) {
  546. cntl->SetFailed(EPERM, "No channels added");
  547. goto FAIL;
  548. }
  549. for (int i = 0; i < nchan; ++i) {
  550. SubChan& sub_chan = _chans[i];
  551. if (sub_chan.call_mapper != NULL) {
  552. aps[i] = sub_chan.call_mapper->Map(i, method, request, response);
  553. // Test is_skip first because it implies is_bad.
  554. if (aps[i].is_skip()) {
  555. --ndone;
  556. } else if (aps[i].is_bad()) {
  557. cntl->SetFailed(
  558. EREQUEST, "CallMapper of channel[%d] returns Bad()", i);
  559. goto FAIL;
  560. }
  561. } else {
  562. google::protobuf::Message* cur_res = response->New();
  563. if (cur_res == NULL) {
  564. cntl->SetFailed(ENOMEM, "Fail to new response");
  565. goto FAIL;
  566. }
  567. aps[i] = SubCall(method, request, cur_res, DELETE_RESPONSE);
  568. }
  569. }
  570. if (ndone <= 0) {
  571. cntl->SetFailed(ECANCELED, "Skipped all channels(%d)", nchan);
  572. goto FAIL;
  573. }
  574. if (_options.fail_limit < 0) {
  575. // Both Controller and ParallelChannel haven't set `fail_limit'
  576. fail_limit = ndone;
  577. } else {
  578. fail_limit = _options.fail_limit;
  579. if (fail_limit < 1) {
  580. fail_limit = 1;
  581. } else if (fail_limit > ndone) {
  582. fail_limit = ndone;
  583. }
  584. }
  585. d = ParallelChannelDone::Create(fail_limit, ndone, aps, nchan,
  586. cntl, done);
  587. if (NULL == d) {
  588. cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone");
  589. goto FAIL;
  590. }
  591. for (int i = 0, j = 0; i < nchan; ++i) {
  592. SubChan& sub_chan = _chans[i];
  593. if (!aps[i].is_skip()) {
  594. ParallelChannelDone::SubDone* sd = d->sub_done(j++);
  595. sd->ap = aps[i];
  596. sd->shared_data = d;
  597. sd->merger = sub_chan.merger;
  598. }
  599. }
  600. cntl->_response = response;
  601. cntl->_done = d;
  602. cntl->add_flag(Controller::FLAGS_DESTROY_CID_IN_DONE);
  603. if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
  604. cntl->set_timeout_ms(_options.timeout_ms);
  605. }
  606. if (cntl->timeout_ms() >= 0) {
  607. cntl->_deadline_us = cntl->timeout_ms() * 1000L + cntl->_begin_time_us;
  608. // Setup timer for RPC timetout
  609. const int rc = bthread_timer_add(
  610. &cntl->_timeout_id,
  611. butil::microseconds_to_timespec(cntl->_deadline_us),
  612. HandleTimeout, (void*)cid.value);
  613. if (rc != 0) {
  614. cntl->SetFailed(rc, "Fail to add timer");
  615. goto FAIL;
  616. }
  617. } else {
  618. cntl->_deadline_us = -1;
  619. }
  620. d->SaveThreadInfoOfCallsite();
  621. CHECK_EQ(0, bthread_id_unlock(cid));
  622. // Don't touch `cntl' and `d' again (for async RPC)
  623. for (int i = 0, j = 0; i < nchan; ++i) {
  624. if (!aps[i].is_skip()) {
  625. ParallelChannelDone::SubDone* sd = d->sub_done(j++);
  626. // Forward the attachment to each sub call
  627. sd->cntl.request_attachment().append(cntl->request_attachment());
  628. _chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
  629. sd->ap.request, sd->ap.response, sd);
  630. }
  631. // Although we can delete request (if delete_request is true) after
  632. // starting sub call, we leave it in ~SubCall(called when d is
  633. // Destroy()-ed) because we may need to check requests for debugging
  634. // purposes.
  635. }
  636. if (done == NULL) {
  637. Join(cid);
  638. cntl->OnRPCEnd(butil::gettimeofday_us());
  639. }
  640. return;
  641. FAIL:
  642. // The RPC was failed after locking call_id and before calling sub channels.
  643. if (d) {
  644. // Set the _done to NULL to make sure cntl->sub(any_index) is NULL.
  645. cntl->_done = NULL;
  646. ParallelChannelDone::Destroy(d);
  647. }
  648. if (done) {
  649. if (!cntl->is_done_allowed_to_run_in_place()) {
  650. bthread_t bh;
  651. bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
  652. BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
  653. // Hack: save done in cntl->_done to remove a malloc of args.
  654. cntl->_done = done;
  655. if (bthread_start_background(&bh, &attr, RunDoneAndDestroy, cntl) == 0) {
  656. return;
  657. }
  658. cntl->_done = NULL;
  659. LOG(FATAL) << "Fail to start bthread";
  660. }
  661. done->Run();
  662. }
  663. CHECK_EQ(0, bthread_id_unlock_and_destroy(cid));
  664. }
  665. int ParallelChannel::Weight() {
  666. if (_chans.empty()) {
  667. return 0;
  668. }
  669. int w = _chans[0].chan->Weight();
  670. for (size_t i = 1; i < _chans.size(); ++i) {
  671. const int w2 = _chans[i].chan->Weight();
  672. if (w2 < w) {
  673. w = w2;
  674. }
  675. }
  676. return w;
  677. }
  678. int ParallelChannel::CheckHealth() {
  679. if (_chans.empty()) {
  680. return -1;
  681. }
  682. int threshold = (int)_chans.size();
  683. if (_options.fail_limit > 0) {
  684. threshold -= _options.fail_limit;
  685. ++threshold;
  686. }
  687. if (threshold <= 0) {
  688. return 0;
  689. }
  690. int nhealthy = 0;
  691. for (size_t i = 0; i < _chans.size(); ++i) {
  692. nhealthy += (_chans[i].chan->CheckHealth() == 0);
  693. if (nhealthy >= threshold) {
  694. return 0;
  695. }
  696. }
  697. return -1;
  698. }
  699. void ParallelChannel::Describe(
  700. std::ostream& os, const DescribeOptions& options) const {
  701. os << "ParallelChannel[";
  702. if (!options.verbose) {
  703. os << _chans.size();
  704. } else {
  705. for (size_t i = 0; i < _chans.size(); ++i) {
  706. if (i != 0) {
  707. os << ' ';
  708. }
  709. os << *_chans[i].chan;
  710. }
  711. }
  712. os << "]";
  713. }
  714. } // namespace brpc