brpc_channel_unittest.cpp 93 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <sys/types.h>
  20. #include <sys/socket.h>
  21. #include <gtest/gtest.h>
  22. #include <gflags/gflags.h>
  23. #include <google/protobuf/descriptor.h>
  24. #include "butil/time.h"
  25. #include "butil/macros.h"
  26. #include "butil/logging.h"
  27. #include "butil/files/temp_file.h"
  28. #include "brpc/socket.h"
  29. #include "brpc/acceptor.h"
  30. #include "brpc/server.h"
  31. #include "brpc/policy/baidu_rpc_protocol.h"
  32. #include "brpc/policy/baidu_rpc_meta.pb.h"
  33. #include "brpc/policy/most_common_message.h"
  34. #include "brpc/channel.h"
  35. #include "brpc/details/load_balancer_with_naming.h"
  36. #include "brpc/parallel_channel.h"
  37. #include "brpc/selective_channel.h"
  38. #include "brpc/socket_map.h"
  39. #include "brpc/controller.h"
  40. #include "echo.pb.h"
  41. #include "brpc/options.pb.h"
  42. namespace brpc {
  43. DECLARE_int32(idle_timeout_second);
  44. DECLARE_int32(max_connection_pool_size);
  45. class Server;
  46. class MethodStatus;
  47. namespace policy {
  48. void SendRpcResponse(int64_t correlation_id, Controller* cntl,
  49. const google::protobuf::Message* req,
  50. const google::protobuf::Message* res,
  51. const Server* server_raw, MethodStatus *, int64_t);
  52. } // policy
  53. } // brpc
  54. int main(int argc, char* argv[]) {
  55. brpc::FLAGS_idle_timeout_second = 0;
  56. brpc::FLAGS_max_connection_pool_size = 0;
  57. testing::InitGoogleTest(&argc, argv);
  58. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  59. return RUN_ALL_TESTS();
  60. }
  61. namespace {
  62. void* RunClosure(void* arg) {
  63. google::protobuf::Closure* done = (google::protobuf::Closure*)arg;
  64. done->Run();
  65. return NULL;
  66. }
  67. class DeleteOnlyOnceChannel : public brpc::Channel {
  68. public:
  69. DeleteOnlyOnceChannel() : _c(1) {
  70. }
  71. ~DeleteOnlyOnceChannel() {
  72. if (_c.fetch_sub(1) != 1) {
  73. LOG(ERROR) << "Delete more than once!";
  74. abort();
  75. }
  76. }
  77. private:
  78. butil::atomic<int> _c;
  79. };
  80. static std::string MOCK_CREDENTIAL = "mock credential";
  81. static std::string MOCK_CONTEXT = "mock context";
  82. class MyAuthenticator : public brpc::Authenticator {
  83. public:
  84. MyAuthenticator() : count(0) {}
  85. int GenerateCredential(std::string* auth_str) const {
  86. *auth_str = MOCK_CREDENTIAL;
  87. count.fetch_add(1, butil::memory_order_relaxed);
  88. return 0;
  89. }
  90. int VerifyCredential(const std::string&,
  91. const butil::EndPoint&,
  92. brpc::AuthContext* ctx) const {
  93. ctx->set_user(MOCK_CONTEXT);
  94. ctx->set_group(MOCK_CONTEXT);
  95. ctx->set_roles(MOCK_CONTEXT);
  96. ctx->set_starter(MOCK_CONTEXT);
  97. ctx->set_is_service(true);
  98. return 0;
  99. }
  100. mutable butil::atomic<int32_t> count;
  101. };
  102. static bool VerifyMyRequest(const brpc::InputMessageBase* msg_base) {
  103. const brpc::policy::MostCommonMessage* msg =
  104. static_cast<const brpc::policy::MostCommonMessage*>(msg_base);
  105. brpc::Socket* ptr = msg->socket();
  106. brpc::policy::RpcMeta meta;
  107. butil::IOBufAsZeroCopyInputStream wrapper(msg->meta);
  108. EXPECT_TRUE(meta.ParseFromZeroCopyStream(&wrapper));
  109. if (meta.has_authentication_data()) {
  110. // Credential MUST only appear in the first packet
  111. EXPECT_TRUE(NULL == ptr->auth_context());
  112. EXPECT_EQ(meta.authentication_data(), MOCK_CREDENTIAL);
  113. MyAuthenticator authenticator;
  114. return authenticator.VerifyCredential(
  115. "", butil::EndPoint(), ptr->mutable_auth_context()) == 0;
  116. }
  117. return true;
  118. }
  119. class MyEchoService : public ::test::EchoService {
  120. void Echo(google::protobuf::RpcController* cntl_base,
  121. const ::test::EchoRequest* req,
  122. ::test::EchoResponse* res,
  123. google::protobuf::Closure* done) {
  124. brpc::Controller* cntl =
  125. static_cast<brpc::Controller*>(cntl_base);
  126. brpc::ClosureGuard done_guard(done);
  127. if (req->server_fail()) {
  128. cntl->SetFailed(req->server_fail(), "Server fail1");
  129. cntl->SetFailed(req->server_fail(), "Server fail2");
  130. return;
  131. }
  132. if (req->close_fd()) {
  133. LOG(INFO) << "close fd...";
  134. cntl->CloseConnection("Close connection according to request");
  135. return;
  136. }
  137. if (req->sleep_us() > 0) {
  138. LOG(INFO) << "sleep " << req->sleep_us() << "us...";
  139. bthread_usleep(req->sleep_us());
  140. }
  141. res->set_message("received " + req->message());
  142. if (req->code() != 0) {
  143. res->add_code_list(req->code());
  144. }
  145. res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
  146. }
  147. };
  148. pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT;
  149. class ChannelTest : public ::testing::Test{
  150. protected:
  151. ChannelTest()
  152. : _ep(butil::IP_ANY, 8787)
  153. , _close_fd_once(false) {
  154. pthread_once(&register_mock_protocol, register_protocol);
  155. const brpc::InputMessageHandler pairs[] = {
  156. { brpc::policy::ParseRpcMessage,
  157. ProcessRpcRequest, VerifyMyRequest, this, "baidu_std" }
  158. };
  159. EXPECT_EQ(0, _messenger.AddHandler(pairs[0]));
  160. EXPECT_EQ(0, _server_list.save(butil::endpoint2str(_ep).c_str()));
  161. _naming_url = std::string("File://") + _server_list.fname();
  162. };
  163. virtual ~ChannelTest(){};
  164. virtual void SetUp() {
  165. };
  166. virtual void TearDown() {
  167. StopAndJoin();
  168. };
  169. static void register_protocol() {
  170. brpc::Protocol dummy_protocol =
  171. { brpc::policy::ParseRpcMessage,
  172. brpc::SerializeRequestDefault,
  173. brpc::policy::PackRpcRequest,
  174. NULL, ProcessRpcRequest,
  175. VerifyMyRequest, NULL, NULL,
  176. brpc::CONNECTION_TYPE_ALL, "baidu_std" };
  177. ASSERT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol));
  178. }
  179. static void ProcessRpcRequest(brpc::InputMessageBase* msg_base) {
  180. brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg(
  181. static_cast<brpc::policy::MostCommonMessage*>(msg_base));
  182. brpc::SocketUniquePtr ptr(msg->ReleaseSocket());
  183. const brpc::AuthContext* auth = ptr->auth_context();
  184. if (auth) {
  185. EXPECT_EQ(MOCK_CONTEXT, auth->user());
  186. EXPECT_EQ(MOCK_CONTEXT, auth->group());
  187. EXPECT_EQ(MOCK_CONTEXT, auth->roles());
  188. EXPECT_EQ(MOCK_CONTEXT, auth->starter());
  189. EXPECT_TRUE(auth->is_service());
  190. }
  191. ChannelTest* ts = (ChannelTest*)msg_base->arg();
  192. if (ts->_close_fd_once) {
  193. ts->_close_fd_once = false;
  194. ptr->SetFailed();
  195. return;
  196. }
  197. brpc::policy::RpcMeta meta;
  198. butil::IOBufAsZeroCopyInputStream wrapper(msg->meta);
  199. EXPECT_TRUE(meta.ParseFromZeroCopyStream(&wrapper));
  200. const brpc::policy::RpcRequestMeta& req_meta = meta.request();
  201. ASSERT_EQ(ts->_svc.descriptor()->full_name(), req_meta.service_name());
  202. const google::protobuf::MethodDescriptor* method =
  203. ts->_svc.descriptor()->FindMethodByName(req_meta.method_name());
  204. google::protobuf::Message* req =
  205. ts->_svc.GetRequestPrototype(method).New();
  206. if (meta.attachment_size() != 0) {
  207. butil::IOBuf req_buf;
  208. msg->payload.cutn(&req_buf, msg->payload.size() - meta.attachment_size());
  209. butil::IOBufAsZeroCopyInputStream wrapper2(req_buf);
  210. EXPECT_TRUE(req->ParseFromZeroCopyStream(&wrapper2));
  211. } else {
  212. butil::IOBufAsZeroCopyInputStream wrapper2(msg->payload);
  213. EXPECT_TRUE(req->ParseFromZeroCopyStream(&wrapper2));
  214. }
  215. brpc::Controller* cntl = new brpc::Controller();
  216. cntl->_current_call.peer_id = ptr->id();
  217. cntl->_current_call.sending_sock.reset(ptr.release());
  218. cntl->_server = &ts->_dummy;
  219. google::protobuf::Message* res =
  220. ts->_svc.GetResponsePrototype(method).New();
  221. google::protobuf::Closure* done =
  222. brpc::NewCallback<
  223. int64_t, brpc::Controller*,
  224. const google::protobuf::Message*,
  225. const google::protobuf::Message*,
  226. const brpc::Server*,
  227. brpc::MethodStatus*, int64_t>(
  228. &brpc::policy::SendRpcResponse,
  229. meta.correlation_id(), cntl, NULL, res,
  230. &ts->_dummy, NULL, -1);
  231. ts->_svc.CallMethod(method, cntl, req, res, done);
  232. }
  233. int StartAccept(butil::EndPoint ep) {
  234. int listening_fd = -1;
  235. while ((listening_fd = tcp_listen(ep)) < 0) {
  236. if (errno == EADDRINUSE) {
  237. bthread_usleep(1000);
  238. } else {
  239. return -1;
  240. }
  241. }
  242. if (_messenger.StartAccept(listening_fd, -1, NULL) != 0) {
  243. return -1;
  244. }
  245. return 0;
  246. }
  247. void StopAndJoin() {
  248. _messenger.StopAccept(0);
  249. _messenger.Join();
  250. }
  251. void SetUpChannel(brpc::Channel* channel,
  252. bool single_server,
  253. bool short_connection,
  254. const brpc::Authenticator* auth = NULL,
  255. std::string connection_group = std::string()) {
  256. brpc::ChannelOptions opt;
  257. if (short_connection) {
  258. opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
  259. }
  260. opt.auth = auth;
  261. opt.max_retry = 0;
  262. opt.connection_group = connection_group;
  263. if (single_server) {
  264. EXPECT_EQ(0, channel->Init(_ep, &opt));
  265. } else {
  266. EXPECT_EQ(0, channel->Init(_naming_url.c_str(), "rR", &opt));
  267. }
  268. }
  269. void CallMethod(brpc::ChannelBase* channel,
  270. brpc::Controller* cntl,
  271. test::EchoRequest* req, test::EchoResponse* res,
  272. bool async, bool destroy = false) {
  273. google::protobuf::Closure* done = NULL;
  274. brpc::CallId sync_id = { 0 };
  275. if (async) {
  276. sync_id = cntl->call_id();
  277. done = brpc::DoNothing();
  278. }
  279. ::test::EchoService::Stub(channel).Echo(cntl, req, res, done);
  280. if (async) {
  281. if (destroy) {
  282. delete channel;
  283. }
  284. // Callback MUST be called for once and only once
  285. bthread_id_join(sync_id);
  286. }
  287. }
  288. void CallMethod(brpc::ChannelBase* channel,
  289. brpc::Controller* cntl,
  290. test::ComboRequest* req, test::ComboResponse* res,
  291. bool async, bool destroy = false) {
  292. google::protobuf::Closure* done = NULL;
  293. brpc::CallId sync_id = { 0 };
  294. if (async) {
  295. sync_id = cntl->call_id();
  296. done = brpc::DoNothing();
  297. }
  298. ::test::EchoService::Stub(channel).ComboEcho(cntl, req, res, done);
  299. if (async) {
  300. if (destroy) {
  301. delete channel;
  302. }
  303. // Callback MUST be called for once and only once
  304. bthread_id_join(sync_id);
  305. }
  306. }
  307. void TestConnectionFailed(bool single_server, bool async,
  308. bool short_connection) {
  309. std::cout << " *** single=" << single_server
  310. << " async=" << async
  311. << " short=" << short_connection << std::endl;
  312. brpc::Channel channel;
  313. SetUpChannel(&channel, single_server, short_connection);
  314. brpc::Controller cntl;
  315. test::EchoRequest req;
  316. test::EchoResponse res;
  317. req.set_message(__FUNCTION__);
  318. CallMethod(&channel, &cntl, &req, &res, async);
  319. EXPECT_EQ(ECONNREFUSED, cntl.ErrorCode()) << cntl.ErrorText();
  320. }
  321. void TestConnectionFailedParallel(bool single_server, bool async,
  322. bool short_connection) {
  323. std::cout << " *** single=" << single_server
  324. << " async=" << async
  325. << " short=" << short_connection << std::endl;
  326. const size_t NCHANS = 8;
  327. brpc::Channel subchans[NCHANS];
  328. brpc::ParallelChannel channel;
  329. for (size_t i = 0; i < NCHANS; ++i) {
  330. SetUpChannel(&subchans[i], single_server, short_connection);
  331. ASSERT_EQ(0, channel.AddChannel(
  332. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  333. NULL, NULL));
  334. }
  335. brpc::Controller cntl;
  336. test::EchoRequest req;
  337. test::EchoResponse res;
  338. req.set_message(__FUNCTION__);
  339. CallMethod(&channel, &cntl, &req, &res, async);
  340. EXPECT_TRUE(brpc::ETOOMANYFAILS == cntl.ErrorCode() ||
  341. ECONNREFUSED == cntl.ErrorCode()) << cntl.ErrorText();
  342. LOG(INFO) << cntl.ErrorText();
  343. }
  344. void TestConnectionFailedSelective(bool single_server, bool async,
  345. bool short_connection) {
  346. std::cout << " *** single=" << single_server
  347. << " async=" << async
  348. << " short=" << short_connection << std::endl;
  349. const size_t NCHANS = 8;
  350. brpc::SelectiveChannel channel;
  351. brpc::ChannelOptions options;
  352. options.max_retry = 0;
  353. ASSERT_EQ(0, channel.Init("rr", &options));
  354. for (size_t i = 0; i < NCHANS; ++i) {
  355. brpc::Channel* subchan = new brpc::Channel;
  356. SetUpChannel(subchan, single_server, short_connection);
  357. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  358. }
  359. brpc::Controller cntl;
  360. test::EchoRequest req;
  361. test::EchoResponse res;
  362. req.set_message(__FUNCTION__);
  363. CallMethod(&channel, &cntl, &req, &res, async);
  364. EXPECT_EQ(ECONNREFUSED, cntl.ErrorCode()) << cntl.ErrorText();
  365. ASSERT_EQ(1, cntl.sub_count());
  366. EXPECT_EQ(ECONNREFUSED, cntl.sub(0)->ErrorCode())
  367. << cntl.sub(0)->ErrorText();
  368. LOG(INFO) << cntl.ErrorText();
  369. }
  370. void TestSuccess(bool single_server, bool async, bool short_connection) {
  371. std::cout << " *** single=" << single_server
  372. << " async=" << async
  373. << " short=" << short_connection << std::endl;
  374. ASSERT_EQ(0, StartAccept(_ep));
  375. brpc::Channel channel;
  376. SetUpChannel(&channel, single_server, short_connection);
  377. brpc::Controller cntl;
  378. test::EchoRequest req;
  379. test::EchoResponse res;
  380. req.set_message(__FUNCTION__);
  381. CallMethod(&channel, &cntl, &req, &res, async);
  382. EXPECT_EQ(0, cntl.ErrorCode())
  383. << single_server << ", " << async << ", " << short_connection;
  384. const uint64_t receiving_socket_id = res.receiving_socket_id();
  385. EXPECT_EQ(0, cntl.sub_count());
  386. EXPECT_TRUE(NULL == cntl.sub(-1));
  387. EXPECT_TRUE(NULL == cntl.sub(0));
  388. EXPECT_TRUE(NULL == cntl.sub(1));
  389. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  390. if (short_connection) {
  391. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  392. const int64_t start_time = butil::gettimeofday_us();
  393. while (_messenger.ConnectionCount() != 0) {
  394. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  395. bthread_usleep(1000);
  396. }
  397. } else {
  398. EXPECT_GE(1ul, _messenger.ConnectionCount());
  399. }
  400. if (single_server && !short_connection) {
  401. // Reuse the connection
  402. brpc::Channel channel2;
  403. SetUpChannel(&channel2, single_server, short_connection);
  404. cntl.Reset();
  405. req.Clear();
  406. res.Clear();
  407. req.set_message(__FUNCTION__);
  408. CallMethod(&channel2, &cntl, &req, &res, async);
  409. EXPECT_EQ(0, cntl.ErrorCode())
  410. << single_server << ", " << async << ", " << short_connection;
  411. EXPECT_EQ(receiving_socket_id, res.receiving_socket_id());
  412. // A different connection_group does not reuse the connection
  413. brpc::Channel channel3;
  414. SetUpChannel(&channel3, single_server, short_connection,
  415. NULL, "another_group");
  416. cntl.Reset();
  417. req.Clear();
  418. res.Clear();
  419. req.set_message(__FUNCTION__);
  420. CallMethod(&channel3, &cntl, &req, &res, async);
  421. EXPECT_EQ(0, cntl.ErrorCode())
  422. << single_server << ", " << async << ", " << short_connection;
  423. const uint64_t receiving_socket_id2 = res.receiving_socket_id();
  424. EXPECT_NE(receiving_socket_id, receiving_socket_id2);
  425. // Channel in the same connection_group reuses the connection
  426. // note that the leading/trailing spaces should be trimed.
  427. brpc::Channel channel4;
  428. SetUpChannel(&channel4, single_server, short_connection,
  429. NULL, " another_group ");
  430. cntl.Reset();
  431. req.Clear();
  432. res.Clear();
  433. req.set_message(__FUNCTION__);
  434. CallMethod(&channel4, &cntl, &req, &res, async);
  435. EXPECT_EQ(0, cntl.ErrorCode())
  436. << single_server << ", " << async << ", " << short_connection;
  437. EXPECT_EQ(receiving_socket_id2, res.receiving_socket_id());
  438. }
  439. StopAndJoin();
  440. }
  441. class SetCode : public brpc::CallMapper {
  442. public:
  443. brpc::SubCall Map(
  444. int channel_index,
  445. const google::protobuf::MethodDescriptor* method,
  446. const google::protobuf::Message* req_base,
  447. google::protobuf::Message* response) {
  448. test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
  449. req->set_code(channel_index + 1/*non-zero*/);
  450. return brpc::SubCall(method, req, response->New(),
  451. brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
  452. }
  453. };
  454. class SetCodeOnEven : public SetCode {
  455. public:
  456. brpc::SubCall Map(
  457. int channel_index,
  458. const google::protobuf::MethodDescriptor* method,
  459. const google::protobuf::Message* req_base,
  460. google::protobuf::Message* response) {
  461. if (channel_index % 2) {
  462. return brpc::SubCall::Skip();
  463. }
  464. return SetCode::Map(channel_index, method, req_base, response);
  465. }
  466. };
  467. class GetReqAndAddRes : public brpc::CallMapper {
  468. brpc::SubCall Map(
  469. int channel_index,
  470. const google::protobuf::MethodDescriptor* method,
  471. const google::protobuf::Message* req_base,
  472. google::protobuf::Message* res_base) {
  473. const test::ComboRequest* req =
  474. dynamic_cast<const test::ComboRequest*>(req_base);
  475. test::ComboResponse* res = dynamic_cast<test::ComboResponse*>(res_base);
  476. if (method->name() != "ComboEcho" ||
  477. res == NULL || req == NULL ||
  478. req->requests_size() <= channel_index) {
  479. return brpc::SubCall::Bad();
  480. }
  481. return brpc::SubCall(::test::EchoService::descriptor()->method(0),
  482. &req->requests(channel_index),
  483. res->add_responses(), 0);
  484. }
  485. };
  486. class MergeNothing : public brpc::ResponseMerger {
  487. Result Merge(google::protobuf::Message* /*response*/,
  488. const google::protobuf::Message* /*sub_response*/) {
  489. return brpc::ResponseMerger::MERGED;
  490. }
  491. };
  492. void TestSuccessParallel(bool single_server, bool async, bool short_connection) {
  493. std::cout << " *** single=" << single_server
  494. << " async=" << async
  495. << " short=" << short_connection << std::endl;
  496. ASSERT_EQ(0, StartAccept(_ep));
  497. const size_t NCHANS = 8;
  498. brpc::Channel subchans[NCHANS];
  499. brpc::ParallelChannel channel;
  500. for (size_t i = 0; i < NCHANS; ++i) {
  501. SetUpChannel(&subchans[i], single_server, short_connection);
  502. ASSERT_EQ(0, channel.AddChannel(
  503. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  504. new SetCode, NULL));
  505. }
  506. brpc::Controller cntl;
  507. test::EchoRequest req;
  508. test::EchoResponse res;
  509. req.set_message(__FUNCTION__);
  510. req.set_code(23);
  511. CallMethod(&channel, &cntl, &req, &res, async);
  512. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  513. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  514. for (int i = 0; i < cntl.sub_count(); ++i) {
  515. EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
  516. }
  517. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  518. ASSERT_EQ(NCHANS, (size_t)res.code_list_size());
  519. for (size_t i = 0; i < NCHANS; ++i) {
  520. ASSERT_EQ((int)i+1, res.code_list(i));
  521. }
  522. if (short_connection) {
  523. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  524. const int64_t start_time = butil::gettimeofday_us();
  525. while (_messenger.ConnectionCount() != 0) {
  526. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  527. bthread_usleep(1000);
  528. }
  529. } else {
  530. EXPECT_GE(1ul, _messenger.ConnectionCount());
  531. }
  532. StopAndJoin();
  533. }
  534. void TestSuccessDuplicatedParallel(
  535. bool single_server, bool async, bool short_connection) {
  536. std::cout << " *** single=" << single_server
  537. << " async=" << async
  538. << " short=" << short_connection << std::endl;
  539. ASSERT_EQ(0, StartAccept(_ep));
  540. const size_t NCHANS = 8;
  541. brpc::Channel* subchan = new DeleteOnlyOnceChannel;
  542. SetUpChannel(subchan, single_server, short_connection);
  543. brpc::ParallelChannel channel;
  544. // Share the CallMapper and ResponseMerger should be fine because
  545. // they're intrusively shared.
  546. SetCode* set_code = new SetCode;
  547. for (size_t i = 0; i < NCHANS; ++i) {
  548. ASSERT_EQ(0, channel.AddChannel(
  549. subchan,
  550. // subchan should be deleted (for only once)
  551. ((i % 2) ? brpc::DOESNT_OWN_CHANNEL : brpc::OWNS_CHANNEL),
  552. set_code, NULL));
  553. }
  554. ASSERT_EQ((int)NCHANS, set_code->ref_count());
  555. brpc::Controller cntl;
  556. test::EchoRequest req;
  557. test::EchoResponse res;
  558. req.set_message(__FUNCTION__);
  559. req.set_code(23);
  560. CallMethod(&channel, &cntl, &req, &res, async);
  561. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  562. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  563. for (int i = 0; i < cntl.sub_count(); ++i) {
  564. EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
  565. }
  566. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  567. ASSERT_EQ(NCHANS, (size_t)res.code_list_size());
  568. for (size_t i = 0; i < NCHANS; ++i) {
  569. ASSERT_EQ((int)i+1, res.code_list(i));
  570. }
  571. if (short_connection) {
  572. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  573. const int64_t start_time = butil::gettimeofday_us();
  574. while (_messenger.ConnectionCount() != 0) {
  575. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  576. bthread_usleep(1000);
  577. }
  578. } else {
  579. EXPECT_GE(1ul, _messenger.ConnectionCount());
  580. }
  581. StopAndJoin();
  582. }
  583. void TestSuccessSelective(bool single_server, bool async, bool short_connection) {
  584. std::cout << " *** single=" << single_server
  585. << " async=" << async
  586. << " short=" << short_connection << std::endl;
  587. const size_t NCHANS = 8;
  588. ASSERT_EQ(0, StartAccept(_ep));
  589. brpc::SelectiveChannel channel;
  590. brpc::ChannelOptions options;
  591. options.max_retry = 0;
  592. ASSERT_EQ(0, channel.Init("rr", &options));
  593. for (size_t i = 0; i < NCHANS; ++i) {
  594. brpc::Channel* subchan = new brpc::Channel;
  595. SetUpChannel(subchan, single_server, short_connection);
  596. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  597. }
  598. brpc::Controller cntl;
  599. test::EchoRequest req;
  600. test::EchoResponse res;
  601. req.set_message(__FUNCTION__);
  602. req.set_code(23);
  603. CallMethod(&channel, &cntl, &req, &res, async);
  604. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  605. EXPECT_EQ(1, cntl.sub_count());
  606. ASSERT_EQ(0, cntl.sub(0)->ErrorCode());
  607. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  608. ASSERT_EQ(1, res.code_list_size());
  609. ASSERT_EQ(req.code(), res.code_list(0));
  610. ASSERT_EQ(_ep, cntl.remote_side());
  611. if (short_connection) {
  612. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  613. const int64_t start_time = butil::gettimeofday_us();
  614. while (_messenger.ConnectionCount() != 0) {
  615. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  616. bthread_usleep(1000);
  617. }
  618. } else {
  619. EXPECT_GE(1ul, _messenger.ConnectionCount());
  620. }
  621. StopAndJoin();
  622. }
  623. void TestSkipParallel(bool single_server, bool async, bool short_connection) {
  624. std::cout << " *** single=" << single_server
  625. << " async=" << async
  626. << " short=" << short_connection << std::endl;
  627. ASSERT_EQ(0, StartAccept(_ep));
  628. const size_t NCHANS = 8;
  629. brpc::Channel subchans[NCHANS];
  630. brpc::ParallelChannel channel;
  631. for (size_t i = 0; i < NCHANS; ++i) {
  632. SetUpChannel(&subchans[i], single_server, short_connection);
  633. ASSERT_EQ(0, channel.AddChannel(
  634. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  635. new SetCodeOnEven, NULL));
  636. }
  637. brpc::Controller cntl;
  638. test::EchoRequest req;
  639. test::EchoResponse res;
  640. req.set_message(__FUNCTION__);
  641. req.set_code(23);
  642. CallMethod(&channel, &cntl, &req, &res, async);
  643. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  644. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  645. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  646. for (int i = 0; i < cntl.sub_count(); ++i) {
  647. if (i % 2) {
  648. EXPECT_TRUE(NULL == cntl.sub(i)) << "i=" << i;
  649. } else {
  650. EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
  651. }
  652. }
  653. ASSERT_EQ(NCHANS / 2, (size_t)res.code_list_size());
  654. for (int i = 0; i < res.code_list_size(); ++i) {
  655. ASSERT_EQ(i*2 + 1, res.code_list(i));
  656. }
  657. if (short_connection) {
  658. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  659. const int64_t start_time = butil::gettimeofday_us();
  660. while (_messenger.ConnectionCount() != 0) {
  661. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  662. bthread_usleep(1000);
  663. }
  664. } else {
  665. EXPECT_GE(1ul, _messenger.ConnectionCount());
  666. }
  667. StopAndJoin();
  668. }
  669. void TestSuccessParallel2(bool single_server, bool async, bool short_connection) {
  670. std::cout << " *** single=" << single_server
  671. << " async=" << async
  672. << " short=" << short_connection << std::endl;
  673. ASSERT_EQ(0, StartAccept(_ep));
  674. const size_t NCHANS = 8;
  675. brpc::Channel subchans[NCHANS];
  676. brpc::ParallelChannel channel;
  677. for (size_t i = 0; i < NCHANS; ++i) {
  678. SetUpChannel(&subchans[i], single_server, short_connection);
  679. ASSERT_EQ(0, channel.AddChannel(
  680. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  681. new GetReqAndAddRes, new MergeNothing));
  682. }
  683. brpc::Controller cntl;
  684. test::ComboRequest req;
  685. test::ComboResponse res;
  686. CallMethod(&channel, &cntl, &req, &res, false);
  687. ASSERT_TRUE(cntl.Failed()); // req does not have .requests
  688. ASSERT_EQ(brpc::EREQUEST, cntl.ErrorCode());
  689. for (size_t i = 0; i < NCHANS; ++i) {
  690. ::test::EchoRequest* sub_req = req.add_requests();
  691. sub_req->set_message(butil::string_printf("hello_%llu", (long long)i));
  692. sub_req->set_code(i + 1);
  693. }
  694. // non-parallel channel does not work.
  695. cntl.Reset();
  696. CallMethod(&subchans[0], &cntl, &req, &res, false);
  697. ASSERT_TRUE(cntl.Failed());
  698. ASSERT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
  699. ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method ComboEcho() not implemented."));
  700. // do the rpc call.
  701. cntl.Reset();
  702. CallMethod(&channel, &cntl, &req, &res, async);
  703. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  704. ASSERT_GT(cntl.latency_us(), 0);
  705. ASSERT_EQ((int)NCHANS, res.responses_size());
  706. for (int i = 0; i < res.responses_size(); ++i) {
  707. EXPECT_EQ(butil::string_printf("received hello_%d", i),
  708. res.responses(i).message());
  709. ASSERT_EQ(1, res.responses(i).code_list_size());
  710. EXPECT_EQ(i + 1, res.responses(i).code_list(0));
  711. }
  712. if (short_connection) {
  713. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  714. const int64_t start_time = butil::gettimeofday_us();
  715. while (_messenger.ConnectionCount() != 0) {
  716. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  717. bthread_usleep(1000);
  718. }
  719. } else {
  720. EXPECT_GE(1ul, _messenger.ConnectionCount());
  721. }
  722. StopAndJoin();
  723. }
  724. struct CancelerArg {
  725. int64_t sleep_before_cancel_us;
  726. brpc::CallId cid;
  727. };
  728. static void* Canceler(void* void_arg) {
  729. CancelerArg* arg = static_cast<CancelerArg*>(void_arg);
  730. if (arg->sleep_before_cancel_us > 0) {
  731. bthread_usleep(arg->sleep_before_cancel_us);
  732. }
  733. LOG(INFO) << "Start to cancel cid=" << arg->cid.value;
  734. brpc::StartCancel(arg->cid);
  735. return NULL;
  736. }
  737. void CancelBeforeCallMethod(
  738. bool single_server, bool async, bool short_connection) {
  739. std::cout << " *** single=" << single_server
  740. << " async=" << async
  741. << " short=" << short_connection << std::endl;
  742. ASSERT_EQ(0, StartAccept(_ep));
  743. brpc::Channel channel;
  744. SetUpChannel(&channel, single_server, short_connection);
  745. brpc::Controller cntl;
  746. test::EchoRequest req;
  747. test::EchoResponse res;
  748. req.set_message(__FUNCTION__);
  749. const brpc::CallId cid = cntl.call_id();
  750. ASSERT_TRUE(cid.value != 0);
  751. brpc::StartCancel(cid);
  752. CallMethod(&channel, &cntl, &req, &res, async);
  753. EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
  754. StopAndJoin();
  755. }
  756. void CancelBeforeCallMethodParallel(
  757. bool single_server, bool async, bool short_connection) {
  758. std::cout << " *** single=" << single_server
  759. << " async=" << async
  760. << " short=" << short_connection << std::endl;
  761. ASSERT_EQ(0, StartAccept(_ep));
  762. const size_t NCHANS = 8;
  763. brpc::Channel subchans[NCHANS];
  764. brpc::ParallelChannel channel;
  765. for (size_t i = 0; i < NCHANS; ++i) {
  766. SetUpChannel(&subchans[i], single_server, short_connection);
  767. ASSERT_EQ(0, channel.AddChannel(
  768. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  769. NULL, NULL));
  770. }
  771. brpc::Controller cntl;
  772. test::EchoRequest req;
  773. test::EchoResponse res;
  774. req.set_message(__FUNCTION__);
  775. const brpc::CallId cid = cntl.call_id();
  776. ASSERT_TRUE(cid.value != 0);
  777. brpc::StartCancel(cid);
  778. CallMethod(&channel, &cntl, &req, &res, async);
  779. EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
  780. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  781. EXPECT_TRUE(NULL == cntl.sub(1));
  782. EXPECT_TRUE(NULL == cntl.sub(0));
  783. StopAndJoin();
  784. }
  785. void CancelBeforeCallMethodSelective(
  786. bool single_server, bool async, bool short_connection) {
  787. std::cout << " *** single=" << single_server
  788. << " async=" << async
  789. << " short=" << short_connection << std::endl;
  790. ASSERT_EQ(0, StartAccept(_ep));
  791. const size_t NCHANS = 8;
  792. brpc::SelectiveChannel channel;
  793. ASSERT_EQ(0, channel.Init("rr", NULL));
  794. for (size_t i = 0; i < NCHANS; ++i) {
  795. brpc::Channel* subchan = new brpc::Channel;
  796. SetUpChannel(subchan, single_server, short_connection);
  797. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  798. }
  799. brpc::Controller cntl;
  800. test::EchoRequest req;
  801. test::EchoResponse res;
  802. req.set_message(__FUNCTION__);
  803. const brpc::CallId cid = cntl.call_id();
  804. ASSERT_TRUE(cid.value != 0);
  805. brpc::StartCancel(cid);
  806. CallMethod(&channel, &cntl, &req, &res, async);
  807. EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
  808. StopAndJoin();
  809. }
  810. void CancelDuringCallMethod(
  811. bool single_server, bool async, bool short_connection) {
  812. std::cout << " *** single=" << single_server
  813. << " async=" << async
  814. << " short=" << short_connection << std::endl;
  815. ASSERT_EQ(0, StartAccept(_ep));
  816. brpc::Channel channel;
  817. SetUpChannel(&channel, single_server, short_connection);
  818. brpc::Controller cntl;
  819. test::EchoRequest req;
  820. test::EchoResponse res;
  821. req.set_message(__FUNCTION__);
  822. const brpc::CallId cid = cntl.call_id();
  823. ASSERT_TRUE(cid.value != 0);
  824. pthread_t th;
  825. CancelerArg carg = { 10000, cid };
  826. ASSERT_EQ(0, pthread_create(&th, NULL, Canceler, &carg));
  827. req.set_sleep_us(carg.sleep_before_cancel_us * 2);
  828. butil::Timer tm;
  829. tm.start();
  830. CallMethod(&channel, &cntl, &req, &res, async);
  831. tm.stop();
  832. EXPECT_LT(labs(tm.u_elapsed() - carg.sleep_before_cancel_us), 10000);
  833. ASSERT_EQ(0, pthread_join(th, NULL));
  834. EXPECT_EQ(ECANCELED, cntl.ErrorCode());
  835. EXPECT_EQ(0, cntl.sub_count());
  836. EXPECT_TRUE(NULL == cntl.sub(1));
  837. EXPECT_TRUE(NULL == cntl.sub(0));
  838. StopAndJoin();
  839. }
  840. void CancelDuringCallMethodParallel(
  841. bool single_server, bool async, bool short_connection) {
  842. std::cout << " *** single=" << single_server
  843. << " async=" << async
  844. << " short=" << short_connection << std::endl;
  845. ASSERT_EQ(0, StartAccept(_ep));
  846. const size_t NCHANS = 8;
  847. brpc::Channel subchans[NCHANS];
  848. brpc::ParallelChannel channel;
  849. for (size_t i = 0; i < NCHANS; ++i) {
  850. SetUpChannel(&subchans[i], single_server, short_connection);
  851. ASSERT_EQ(0, channel.AddChannel(
  852. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  853. NULL, NULL));
  854. }
  855. brpc::Controller cntl;
  856. test::EchoRequest req;
  857. test::EchoResponse res;
  858. req.set_message(__FUNCTION__);
  859. const brpc::CallId cid = cntl.call_id();
  860. ASSERT_TRUE(cid.value != 0);
  861. pthread_t th;
  862. CancelerArg carg = { 10000, cid };
  863. ASSERT_EQ(0, pthread_create(&th, NULL, Canceler, &carg));
  864. req.set_sleep_us(carg.sleep_before_cancel_us * 2);
  865. butil::Timer tm;
  866. tm.start();
  867. CallMethod(&channel, &cntl, &req, &res, async);
  868. tm.stop();
  869. EXPECT_LT(labs(tm.u_elapsed() - carg.sleep_before_cancel_us), 10000);
  870. ASSERT_EQ(0, pthread_join(th, NULL));
  871. EXPECT_EQ(ECANCELED, cntl.ErrorCode());
  872. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  873. for (int i = 0; i < cntl.sub_count(); ++i) {
  874. EXPECT_EQ(ECANCELED, cntl.sub(i)->ErrorCode()) << "i=" << i;
  875. }
  876. EXPECT_LT(labs(cntl.latency_us() - carg.sleep_before_cancel_us), 10000);
  877. StopAndJoin();
  878. }
  879. void CancelDuringCallMethodSelective(
  880. bool single_server, bool async, bool short_connection) {
  881. std::cout << " *** single=" << single_server
  882. << " async=" << async
  883. << " short=" << short_connection << std::endl;
  884. ASSERT_EQ(0, StartAccept(_ep));
  885. const size_t NCHANS = 8;
  886. brpc::SelectiveChannel channel;
  887. ASSERT_EQ(0, channel.Init("rr", NULL));
  888. for (size_t i = 0; i < NCHANS; ++i) {
  889. brpc::Channel* subchan = new brpc::Channel;
  890. SetUpChannel(subchan, single_server, short_connection);
  891. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  892. }
  893. brpc::Controller cntl;
  894. test::EchoRequest req;
  895. test::EchoResponse res;
  896. req.set_message(__FUNCTION__);
  897. const brpc::CallId cid = cntl.call_id();
  898. ASSERT_TRUE(cid.value != 0);
  899. pthread_t th;
  900. CancelerArg carg = { 10000, cid };
  901. ASSERT_EQ(0, pthread_create(&th, NULL, Canceler, &carg));
  902. req.set_sleep_us(carg.sleep_before_cancel_us * 2);
  903. butil::Timer tm;
  904. tm.start();
  905. CallMethod(&channel, &cntl, &req, &res, async);
  906. tm.stop();
  907. EXPECT_LT(labs(tm.u_elapsed() - carg.sleep_before_cancel_us), 10000);
  908. ASSERT_EQ(0, pthread_join(th, NULL));
  909. EXPECT_EQ(ECANCELED, cntl.ErrorCode());
  910. EXPECT_EQ(1, cntl.sub_count());
  911. EXPECT_EQ(ECANCELED, cntl.sub(0)->ErrorCode());
  912. StopAndJoin();
  913. }
  914. void CancelAfterCallMethod(
  915. bool single_server, bool async, bool short_connection) {
  916. std::cout << " *** single=" << single_server
  917. << " async=" << async
  918. << " short=" << short_connection << std::endl;
  919. ASSERT_EQ(0, StartAccept(_ep));
  920. brpc::Channel channel;
  921. SetUpChannel(&channel, single_server, short_connection);
  922. brpc::Controller cntl;
  923. test::EchoRequest req;
  924. test::EchoResponse res;
  925. req.set_message(__FUNCTION__);
  926. const brpc::CallId cid = cntl.call_id();
  927. ASSERT_TRUE(cid.value != 0);
  928. CallMethod(&channel, &cntl, &req, &res, async);
  929. EXPECT_EQ(0, cntl.ErrorCode());
  930. EXPECT_EQ(0, cntl.sub_count());
  931. ASSERT_EQ(EINVAL, bthread_id_error(cid, ECANCELED));
  932. StopAndJoin();
  933. }
  934. void CancelAfterCallMethodParallel(
  935. bool single_server, bool async, bool short_connection) {
  936. std::cout << " *** single=" << single_server
  937. << " async=" << async
  938. << " short=" << short_connection << std::endl;
  939. ASSERT_EQ(0, StartAccept(_ep));
  940. const size_t NCHANS = 8;
  941. brpc::Channel subchans[NCHANS];
  942. brpc::ParallelChannel channel;
  943. for (size_t i = 0; i < NCHANS; ++i) {
  944. SetUpChannel(&subchans[i], single_server, short_connection);
  945. ASSERT_EQ(0, channel.AddChannel(
  946. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  947. NULL, NULL));
  948. }
  949. brpc::Controller cntl;
  950. test::EchoRequest req;
  951. test::EchoResponse res;
  952. req.set_message(__FUNCTION__);
  953. const brpc::CallId cid = cntl.call_id();
  954. ASSERT_TRUE(cid.value != 0);
  955. CallMethod(&channel, &cntl, &req, &res, async);
  956. EXPECT_EQ(0, cntl.ErrorCode());
  957. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  958. for (int i = 0; i < cntl.sub_count(); ++i) {
  959. EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
  960. }
  961. ASSERT_EQ(EINVAL, bthread_id_error(cid, ECANCELED));
  962. StopAndJoin();
  963. }
  964. void TestAttachment(bool async, bool short_connection) {
  965. ASSERT_EQ(0, StartAccept(_ep));
  966. brpc::Channel channel;
  967. SetUpChannel(&channel, true, short_connection);
  968. brpc::Controller cntl;
  969. cntl.request_attachment().append("attachment");
  970. test::EchoRequest req;
  971. test::EchoResponse res;
  972. req.set_message(__FUNCTION__);
  973. CallMethod(&channel, &cntl, &req, &res, async);
  974. EXPECT_EQ(0, cntl.ErrorCode()) << short_connection;
  975. EXPECT_FALSE(cntl.request_attachment().empty())
  976. << ", " << async << ", " << short_connection;
  977. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  978. if (short_connection) {
  979. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  980. const int64_t start_time = butil::gettimeofday_us();
  981. while (_messenger.ConnectionCount() != 0) {
  982. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  983. bthread_usleep(1000);
  984. }
  985. } else {
  986. EXPECT_GE(1ul, _messenger.ConnectionCount());
  987. }
  988. StopAndJoin();
  989. }
  990. void TestRequestNotInit(bool single_server, bool async,
  991. bool short_connection) {
  992. std::cout << " *** single=" << single_server
  993. << " async=" << async
  994. << " short=" << short_connection << std::endl;
  995. ASSERT_EQ(0, StartAccept(_ep));
  996. brpc::Channel channel;
  997. SetUpChannel(&channel, single_server, short_connection);
  998. brpc::Controller cntl;
  999. test::EchoRequest req;
  1000. test::EchoResponse res;
  1001. CallMethod(&channel, &cntl, &req, &res, async);
  1002. EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
  1003. StopAndJoin();
  1004. }
  1005. void TestRequestNotInitParallel(bool single_server, bool async,
  1006. bool short_connection) {
  1007. std::cout << " *** single=" << single_server
  1008. << " async=" << async
  1009. << " short=" << short_connection << std::endl;
  1010. ASSERT_EQ(0, StartAccept(_ep));
  1011. const size_t NCHANS = 8;
  1012. brpc::Channel subchans[NCHANS];
  1013. brpc::ParallelChannel channel;
  1014. for (size_t i = 0; i < NCHANS; ++i) {
  1015. SetUpChannel(&subchans[i], single_server, short_connection);
  1016. ASSERT_EQ(0, channel.AddChannel(
  1017. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  1018. NULL, NULL));
  1019. }
  1020. brpc::Controller cntl;
  1021. test::EchoRequest req;
  1022. test::EchoResponse res;
  1023. CallMethod(&channel, &cntl, &req, &res, async);
  1024. EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
  1025. LOG(WARNING) << cntl.ErrorText();
  1026. StopAndJoin();
  1027. }
  1028. void TestRequestNotInitSelective(bool single_server, bool async,
  1029. bool short_connection) {
  1030. std::cout << " *** single=" << single_server
  1031. << " async=" << async
  1032. << " short=" << short_connection << std::endl;
  1033. ASSERT_EQ(0, StartAccept(_ep));
  1034. const size_t NCHANS = 8;
  1035. brpc::SelectiveChannel channel;
  1036. ASSERT_EQ(0, channel.Init("rr", NULL));
  1037. for (size_t i = 0; i < NCHANS; ++i) {
  1038. brpc::Channel* subchan = new brpc::Channel;
  1039. SetUpChannel(subchan, single_server, short_connection);
  1040. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  1041. }
  1042. brpc::Controller cntl;
  1043. test::EchoRequest req;
  1044. test::EchoResponse res;
  1045. CallMethod(&channel, &cntl, &req, &res, async);
  1046. EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
  1047. LOG(WARNING) << cntl.ErrorText();
  1048. ASSERT_EQ(1, cntl.sub_count());
  1049. ASSERT_EQ(brpc::EREQUEST, cntl.sub(0)->ErrorCode());
  1050. StopAndJoin();
  1051. }
  1052. void TestRPCTimeout(bool single_server, bool async, bool short_connection) {
  1053. std::cout << " *** single=" << single_server
  1054. << " async=" << async
  1055. << " short=" << short_connection << std::endl;
  1056. ASSERT_EQ(0, StartAccept(_ep));
  1057. brpc::Channel channel;
  1058. SetUpChannel(&channel, single_server, short_connection);
  1059. brpc::Controller cntl;
  1060. test::EchoRequest req;
  1061. test::EchoResponse res;
  1062. req.set_message(__FUNCTION__);
  1063. req.set_sleep_us(70000); // 70ms
  1064. cntl.set_timeout_ms(17);
  1065. butil::Timer tm;
  1066. tm.start();
  1067. CallMethod(&channel, &cntl, &req, &res, async);
  1068. tm.stop();
  1069. EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
  1070. EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 15);
  1071. StopAndJoin();
  1072. }
  1073. void TestRPCTimeoutParallel(
  1074. bool single_server, bool async, bool short_connection) {
  1075. std::cout << " *** single=" << single_server
  1076. << " async=" << async
  1077. << " short=" << short_connection << std::endl;
  1078. ASSERT_EQ(0, StartAccept(_ep));
  1079. const size_t NCHANS = 8;
  1080. brpc::Channel subchans[NCHANS];
  1081. brpc::ParallelChannel channel;
  1082. for (size_t i = 0; i < NCHANS; ++i) {
  1083. SetUpChannel(&subchans[i], single_server, short_connection);
  1084. ASSERT_EQ(0, channel.AddChannel(
  1085. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  1086. NULL, NULL));
  1087. }
  1088. brpc::Controller cntl;
  1089. test::EchoRequest req;
  1090. test::EchoResponse res;
  1091. req.set_message(__FUNCTION__);
  1092. cntl.set_timeout_ms(17);
  1093. req.set_sleep_us(70000); // 70ms
  1094. butil::Timer tm;
  1095. tm.start();
  1096. CallMethod(&channel, &cntl, &req, &res, async);
  1097. tm.stop();
  1098. EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
  1099. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  1100. for (int i = 0; i < cntl.sub_count(); ++i) {
  1101. EXPECT_EQ(ECANCELED, cntl.sub(i)->ErrorCode()) << "i=" << i;
  1102. }
  1103. EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 15);
  1104. StopAndJoin();
  1105. }
  1106. class MakeTheRequestTimeout : public brpc::CallMapper {
  1107. public:
  1108. brpc::SubCall Map(
  1109. int /*channel_index*/,
  1110. const google::protobuf::MethodDescriptor* method,
  1111. const google::protobuf::Message* req_base,
  1112. google::protobuf::Message* response) {
  1113. test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
  1114. req->set_sleep_us(70000); // 70ms
  1115. return brpc::SubCall(method, req, response->New(),
  1116. brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
  1117. }
  1118. };
  1119. void TimeoutStillChecksSubChannelsParallel(
  1120. bool single_server, bool async, bool short_connection) {
  1121. std::cout << " *** single=" << single_server
  1122. << " async=" << async
  1123. << " short=" << short_connection << std::endl;
  1124. ASSERT_EQ(0, StartAccept(_ep));
  1125. const size_t NCHANS = 8;
  1126. brpc::Channel subchans[NCHANS];
  1127. brpc::ParallelChannel channel;
  1128. for (size_t i = 0; i < NCHANS; ++i) {
  1129. SetUpChannel(&subchans[i], single_server, short_connection);
  1130. ASSERT_EQ(0, channel.AddChannel(
  1131. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  1132. ((i % 2) ? new MakeTheRequestTimeout : NULL), NULL));
  1133. }
  1134. brpc::Controller cntl;
  1135. test::EchoRequest req;
  1136. test::EchoResponse res;
  1137. req.set_message(__FUNCTION__);
  1138. cntl.set_timeout_ms(30);
  1139. butil::Timer tm;
  1140. tm.start();
  1141. CallMethod(&channel, &cntl, &req, &res, async);
  1142. tm.stop();
  1143. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1144. EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
  1145. for (int i = 0; i < cntl.sub_count(); ++i) {
  1146. if (i % 2) {
  1147. EXPECT_EQ(ECANCELED, cntl.sub(i)->ErrorCode());
  1148. } else {
  1149. EXPECT_EQ(0, cntl.sub(i)->ErrorCode());
  1150. }
  1151. }
  1152. EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 15);
  1153. StopAndJoin();
  1154. }
  1155. void TestRPCTimeoutSelective(
  1156. bool single_server, bool async, bool short_connection) {
  1157. std::cout << " *** single=" << single_server
  1158. << " async=" << async
  1159. << " short=" << short_connection << std::endl;
  1160. ASSERT_EQ(0, StartAccept(_ep));
  1161. const size_t NCHANS = 8;
  1162. brpc::SelectiveChannel channel;
  1163. ASSERT_EQ(0, channel.Init("rr", NULL));
  1164. for (size_t i = 0; i < NCHANS; ++i) {
  1165. brpc::Channel* subchan = new brpc::Channel;
  1166. SetUpChannel(subchan, single_server, short_connection);
  1167. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  1168. }
  1169. brpc::Controller cntl;
  1170. test::EchoRequest req;
  1171. test::EchoResponse res;
  1172. req.set_message(__FUNCTION__);
  1173. cntl.set_timeout_ms(17);
  1174. req.set_sleep_us(70000); // 70ms
  1175. butil::Timer tm;
  1176. tm.start();
  1177. CallMethod(&channel, &cntl, &req, &res, async);
  1178. tm.stop();
  1179. EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
  1180. EXPECT_EQ(1, cntl.sub_count());
  1181. EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.sub(0)->ErrorCode());
  1182. EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 15);
  1183. StopAndJoin();
  1184. }
  1185. void TestCloseFD(bool single_server, bool async, bool short_connection) {
  1186. std::cout << " *** single=" << single_server
  1187. << " async=" << async
  1188. << " short=" << short_connection << std::endl;
  1189. ASSERT_EQ(0, StartAccept(_ep));
  1190. brpc::Channel channel;
  1191. SetUpChannel(&channel, single_server, short_connection);
  1192. brpc::Controller cntl;
  1193. test::EchoRequest req;
  1194. test::EchoResponse res;
  1195. req.set_message(__FUNCTION__);
  1196. req.set_close_fd(true);
  1197. CallMethod(&channel, &cntl, &req, &res, async);
  1198. EXPECT_EQ(brpc::EEOF, cntl.ErrorCode()) << cntl.ErrorText();
  1199. StopAndJoin();
  1200. }
  1201. void TestCloseFDParallel(bool single_server, bool async, bool short_connection) {
  1202. std::cout << " *** single=" << single_server
  1203. << " async=" << async
  1204. << " short=" << short_connection << std::endl;
  1205. ASSERT_EQ(0, StartAccept(_ep));
  1206. const size_t NCHANS = 8;
  1207. brpc::Channel subchans[NCHANS];
  1208. brpc::ParallelChannel channel;
  1209. for (size_t i = 0; i < NCHANS; ++i) {
  1210. SetUpChannel(&subchans[i], single_server, short_connection);
  1211. ASSERT_EQ(0, channel.AddChannel(
  1212. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  1213. NULL, NULL));
  1214. }
  1215. brpc::Controller cntl;
  1216. test::EchoRequest req;
  1217. test::EchoResponse res;
  1218. req.set_message(__FUNCTION__);
  1219. req.set_close_fd(true);
  1220. CallMethod(&channel, &cntl, &req, &res, async);
  1221. EXPECT_TRUE(brpc::EEOF == cntl.ErrorCode() ||
  1222. brpc::ETOOMANYFAILS == cntl.ErrorCode() ||
  1223. ECONNRESET == cntl.ErrorCode()) << cntl.ErrorText();
  1224. StopAndJoin();
  1225. }
  1226. void TestCloseFDSelective(bool single_server, bool async, bool short_connection) {
  1227. std::cout << " *** single=" << single_server
  1228. << " async=" << async
  1229. << " short=" << short_connection << std::endl;
  1230. ASSERT_EQ(0, StartAccept(_ep));
  1231. const size_t NCHANS = 8;
  1232. brpc::SelectiveChannel channel;
  1233. brpc::ChannelOptions options;
  1234. options.max_retry = 0;
  1235. ASSERT_EQ(0, channel.Init("rr", &options));
  1236. for (size_t i = 0; i < NCHANS; ++i) {
  1237. brpc::Channel* subchan = new brpc::Channel;
  1238. SetUpChannel(subchan, single_server, short_connection);
  1239. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  1240. }
  1241. brpc::Controller cntl;
  1242. test::EchoRequest req;
  1243. test::EchoResponse res;
  1244. req.set_message(__FUNCTION__);
  1245. req.set_close_fd(true);
  1246. CallMethod(&channel, &cntl, &req, &res, async);
  1247. EXPECT_EQ(brpc::EEOF, cntl.ErrorCode()) << cntl.ErrorText();
  1248. ASSERT_EQ(1, cntl.sub_count());
  1249. ASSERT_EQ(brpc::EEOF, cntl.sub(0)->ErrorCode());
  1250. StopAndJoin();
  1251. }
  1252. void TestServerFail(bool single_server, bool async, bool short_connection) {
  1253. std::cout << " *** single=" << single_server
  1254. << " async=" << async
  1255. << " short=" << short_connection << std::endl;
  1256. ASSERT_EQ(0, StartAccept(_ep));
  1257. brpc::Channel channel;
  1258. SetUpChannel(&channel, single_server, short_connection);
  1259. brpc::Controller cntl;
  1260. test::EchoRequest req;
  1261. test::EchoResponse res;
  1262. req.set_message(__FUNCTION__);
  1263. req.set_server_fail(brpc::EINTERNAL);
  1264. CallMethod(&channel, &cntl, &req, &res, async);
  1265. EXPECT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
  1266. StopAndJoin();
  1267. }
  1268. void TestServerFailParallel(bool single_server, bool async, bool short_connection) {
  1269. std::cout << " *** single=" << single_server
  1270. << " async=" << async
  1271. << " short=" << short_connection << std::endl;
  1272. ASSERT_EQ(0, StartAccept(_ep));
  1273. const size_t NCHANS = 8;
  1274. brpc::Channel subchans[NCHANS];
  1275. brpc::ParallelChannel channel;
  1276. for (size_t i = 0; i < NCHANS; ++i) {
  1277. SetUpChannel(&subchans[i], single_server, short_connection);
  1278. ASSERT_EQ(0, channel.AddChannel(
  1279. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  1280. NULL, NULL));
  1281. }
  1282. brpc::Controller cntl;
  1283. test::EchoRequest req;
  1284. test::EchoResponse res;
  1285. req.set_message(__FUNCTION__);
  1286. req.set_server_fail(brpc::EINTERNAL);
  1287. CallMethod(&channel, &cntl, &req, &res, async);
  1288. EXPECT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
  1289. LOG(INFO) << cntl.ErrorText();
  1290. StopAndJoin();
  1291. }
  1292. void TestServerFailSelective(bool single_server, bool async, bool short_connection) {
  1293. std::cout << " *** single=" << single_server
  1294. << " async=" << async
  1295. << " short=" << short_connection << std::endl;
  1296. ASSERT_EQ(0, StartAccept(_ep));
  1297. const size_t NCHANS = 5;
  1298. brpc::SelectiveChannel channel;
  1299. ASSERT_EQ(0, channel.Init("rr", NULL));
  1300. for (size_t i = 0; i < NCHANS; ++i) {
  1301. brpc::Channel* subchan = new brpc::Channel;
  1302. SetUpChannel(subchan, single_server, short_connection);
  1303. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  1304. }
  1305. brpc::Controller cntl;
  1306. test::EchoRequest req;
  1307. test::EchoResponse res;
  1308. req.set_message(__FUNCTION__);
  1309. req.set_server_fail(brpc::EINTERNAL);
  1310. CallMethod(&channel, &cntl, &req, &res, async);
  1311. EXPECT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
  1312. ASSERT_EQ(1, cntl.sub_count());
  1313. ASSERT_EQ(brpc::EINTERNAL, cntl.sub(0)->ErrorCode());
  1314. LOG(INFO) << cntl.ErrorText();
  1315. StopAndJoin();
  1316. }
  1317. void TestDestroyChannel(bool single_server, bool short_connection) {
  1318. std::cout << "*** single=" << single_server
  1319. << ", short=" << short_connection << std::endl;
  1320. ASSERT_EQ(0, StartAccept(_ep));
  1321. brpc::Channel* channel = new brpc::Channel();
  1322. SetUpChannel(channel, single_server, short_connection);
  1323. brpc::Controller cntl;
  1324. test::EchoRequest req;
  1325. test::EchoResponse res;
  1326. req.set_message(__FUNCTION__);
  1327. req.set_sleep_us(10000);
  1328. CallMethod(channel, &cntl, &req, &res, true, true/*destroy*/);
  1329. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1330. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  1331. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  1332. const int64_t start_time = butil::gettimeofday_us();
  1333. while (_messenger.ConnectionCount() != 0) {
  1334. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  1335. bthread_usleep(1000);
  1336. }
  1337. StopAndJoin();
  1338. }
  1339. void TestDestroyChannelParallel(bool single_server, bool short_connection) {
  1340. std::cout << "*** single=" << single_server
  1341. << ", short=" << short_connection << std::endl;
  1342. const size_t NCHANS = 5;
  1343. ASSERT_EQ(0, StartAccept(_ep));
  1344. brpc::ParallelChannel* channel = new brpc::ParallelChannel;
  1345. for (size_t i = 0; i < NCHANS; ++i) {
  1346. brpc::Channel* subchan = new brpc::Channel();
  1347. SetUpChannel(subchan, single_server, short_connection);
  1348. ASSERT_EQ(0, channel->AddChannel(
  1349. subchan, brpc::OWNS_CHANNEL, NULL, NULL));
  1350. }
  1351. brpc::Controller cntl;
  1352. test::EchoRequest req;
  1353. test::EchoResponse res;
  1354. req.set_sleep_us(10000);
  1355. req.set_message(__FUNCTION__);
  1356. CallMethod(channel, &cntl, &req, &res, true, true/*destroy*/);
  1357. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1358. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  1359. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  1360. const int64_t start_time = butil::gettimeofday_us();
  1361. while (_messenger.ConnectionCount() != 0) {
  1362. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  1363. bthread_usleep(1000);
  1364. }
  1365. StopAndJoin();
  1366. }
  1367. void TestDestroyChannelSelective(bool single_server, bool short_connection) {
  1368. std::cout << "*** single=" << single_server
  1369. << ", short=" << short_connection << std::endl;
  1370. const size_t NCHANS = 5;
  1371. ASSERT_EQ(0, StartAccept(_ep));
  1372. brpc::SelectiveChannel* channel = new brpc::SelectiveChannel;
  1373. ASSERT_EQ(0, channel->Init("rr", NULL));
  1374. for (size_t i = 0; i < NCHANS; ++i) {
  1375. brpc::Channel* subchan = new brpc::Channel();
  1376. SetUpChannel(subchan, single_server, short_connection);
  1377. ASSERT_EQ(0, channel->AddChannel(subchan, NULL));
  1378. }
  1379. brpc::Controller cntl;
  1380. test::EchoRequest req;
  1381. test::EchoResponse res;
  1382. req.set_sleep_us(10000);
  1383. req.set_message(__FUNCTION__);
  1384. CallMethod(channel, &cntl, &req, &res, true, true/*destroy*/);
  1385. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1386. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  1387. ASSERT_EQ(_ep, cntl.remote_side());
  1388. ASSERT_EQ(1, cntl.sub_count());
  1389. ASSERT_EQ(0, cntl.sub(0)->ErrorCode());
  1390. // Sleep to let `_messenger' detect `Socket' being `SetFailed'
  1391. const int64_t start_time = butil::gettimeofday_us();
  1392. while (_messenger.ConnectionCount() != 0) {
  1393. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  1394. bthread_usleep(1000);
  1395. }
  1396. StopAndJoin();
  1397. }
  1398. void RPCThread(brpc::ChannelBase* channel, bool async) {
  1399. brpc::Controller cntl;
  1400. test::EchoRequest req;
  1401. test::EchoResponse res;
  1402. req.set_message(__FUNCTION__);
  1403. CallMethod(channel, &cntl, &req, &res, async);
  1404. ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1405. EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
  1406. }
  1407. void RPCThread(brpc::ChannelBase* channel, bool async, int count) {
  1408. brpc::Controller cntl;
  1409. for (int i = 0; i < count; ++i) {
  1410. test::EchoRequest req;
  1411. test::EchoResponse res;
  1412. req.set_message(__FUNCTION__);
  1413. CallMethod(channel, &cntl, &req, &res, async);
  1414. ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1415. ASSERT_EQ("received " + std::string(__FUNCTION__), res.message());
  1416. cntl.Reset();
  1417. }
  1418. }
  1419. void RPCThread(bool single_server, bool async, bool short_connection,
  1420. const brpc::Authenticator* auth, int count) {
  1421. brpc::Channel channel;
  1422. SetUpChannel(&channel, single_server, short_connection, auth);
  1423. brpc::Controller cntl;
  1424. for (int i = 0; i < count; ++i) {
  1425. test::EchoRequest req;
  1426. test::EchoResponse res;
  1427. req.set_message(__FUNCTION__);
  1428. CallMethod(&channel, &cntl, &req, &res, async);
  1429. ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1430. ASSERT_EQ("received " + std::string(__FUNCTION__), res.message());
  1431. cntl.Reset();
  1432. }
  1433. }
  1434. void TestAuthentication(bool single_server,
  1435. bool async, bool short_connection) {
  1436. std::cout << " *** single=" << single_server
  1437. << " async=" << async
  1438. << " short=" << short_connection << std::endl;
  1439. ASSERT_EQ(0, StartAccept(_ep));
  1440. MyAuthenticator auth;
  1441. brpc::Channel channel;
  1442. SetUpChannel(&channel, single_server, short_connection, &auth);
  1443. const int NUM = 10;
  1444. pthread_t tids[NUM];
  1445. for (int i = 0; i < NUM; ++i) {
  1446. google::protobuf::Closure* thrd_func =
  1447. brpc::NewCallback(
  1448. this, &ChannelTest::RPCThread, (brpc::ChannelBase*)&channel, async);
  1449. EXPECT_EQ(0, pthread_create(&tids[i], NULL,
  1450. RunClosure, thrd_func));
  1451. }
  1452. for (int i = 0; i < NUM; ++i) {
  1453. pthread_join(tids[i], NULL);
  1454. }
  1455. if (short_connection) {
  1456. EXPECT_EQ(NUM, auth.count.load());
  1457. } else {
  1458. EXPECT_EQ(1, auth.count.load());
  1459. }
  1460. StopAndJoin();
  1461. }
  1462. void TestAuthenticationParallel(bool single_server,
  1463. bool async, bool short_connection) {
  1464. std::cout << " *** single=" << single_server
  1465. << " async=" << async
  1466. << " short=" << short_connection << std::endl;
  1467. ASSERT_EQ(0, StartAccept(_ep));
  1468. MyAuthenticator auth;
  1469. const int NCHANS = 5;
  1470. brpc::Channel subchans[NCHANS];
  1471. brpc::ParallelChannel channel;
  1472. for (int i = 0; i < NCHANS; ++i) {
  1473. SetUpChannel(&subchans[i], single_server, short_connection, &auth);
  1474. ASSERT_EQ(0, channel.AddChannel(
  1475. &subchans[i], brpc::DOESNT_OWN_CHANNEL,
  1476. NULL, NULL));
  1477. }
  1478. const int NUM = 10;
  1479. pthread_t tids[NUM];
  1480. for (int i = 0; i < NUM; ++i) {
  1481. google::protobuf::Closure* thrd_func =
  1482. brpc::NewCallback(
  1483. this, &ChannelTest::RPCThread, (brpc::ChannelBase*)&channel, async);
  1484. EXPECT_EQ(0, pthread_create(&tids[i], NULL,
  1485. RunClosure, thrd_func));
  1486. }
  1487. for (int i = 0; i < NUM; ++i) {
  1488. pthread_join(tids[i], NULL);
  1489. }
  1490. if (short_connection) {
  1491. EXPECT_EQ(NUM * NCHANS, auth.count.load());
  1492. } else {
  1493. EXPECT_EQ(1, auth.count.load());
  1494. }
  1495. StopAndJoin();
  1496. }
  1497. void TestAuthenticationSelective(bool single_server,
  1498. bool async, bool short_connection) {
  1499. std::cout << " *** single=" << single_server
  1500. << " async=" << async
  1501. << " short=" << short_connection << std::endl;
  1502. ASSERT_EQ(0, StartAccept(_ep));
  1503. MyAuthenticator auth;
  1504. const size_t NCHANS = 5;
  1505. brpc::SelectiveChannel channel;
  1506. ASSERT_EQ(0, channel.Init("rr", NULL));
  1507. for (size_t i = 0; i < NCHANS; ++i) {
  1508. brpc::Channel* subchan = new brpc::Channel;
  1509. SetUpChannel(subchan, single_server, short_connection, &auth);
  1510. ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
  1511. }
  1512. const int NUM = 10;
  1513. pthread_t tids[NUM];
  1514. for (int i = 0; i < NUM; ++i) {
  1515. google::protobuf::Closure* thrd_func =
  1516. brpc::NewCallback(
  1517. this, &ChannelTest::RPCThread, (brpc::ChannelBase*)&channel, async);
  1518. EXPECT_EQ(0, pthread_create(&tids[i], NULL,
  1519. RunClosure, thrd_func));
  1520. }
  1521. for (int i = 0; i < NUM; ++i) {
  1522. pthread_join(tids[i], NULL);
  1523. }
  1524. if (short_connection) {
  1525. EXPECT_EQ(NUM, auth.count.load());
  1526. } else {
  1527. EXPECT_EQ(1, auth.count.load());
  1528. }
  1529. StopAndJoin();
  1530. }
  1531. void TestRetry(bool single_server, bool async, bool short_connection) {
  1532. std::cout << " *** single=" << single_server
  1533. << " async=" << async
  1534. << " short=" << short_connection << std::endl;
  1535. ASSERT_EQ(0, StartAccept(_ep));
  1536. brpc::Channel channel;
  1537. SetUpChannel(&channel, single_server, short_connection);
  1538. const int RETRY_NUM = 3;
  1539. test::EchoRequest req;
  1540. test::EchoResponse res;
  1541. brpc::Controller cntl;
  1542. req.set_message(__FUNCTION__);
  1543. // No retry when timeout
  1544. cntl.set_max_retry(RETRY_NUM);
  1545. cntl.set_timeout_ms(10); // 10ms
  1546. req.set_sleep_us(70000); // 70ms
  1547. CallMethod(&channel, &cntl, &req, &res, async);
  1548. EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
  1549. EXPECT_EQ(0, cntl.retried_count());
  1550. bthread_usleep(100000); // wait for the sleep task to finish
  1551. // Retry when connection broken
  1552. cntl.Reset();
  1553. cntl.set_max_retry(RETRY_NUM);
  1554. _close_fd_once = true;
  1555. req.set_sleep_us(0);
  1556. CallMethod(&channel, &cntl, &req, &res, async);
  1557. if (short_connection) {
  1558. // Always succeed
  1559. EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
  1560. EXPECT_EQ(1, cntl.retried_count());
  1561. const int64_t start_time = butil::gettimeofday_us();
  1562. while (_messenger.ConnectionCount() != 0) {
  1563. EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
  1564. bthread_usleep(1000);
  1565. }
  1566. } else {
  1567. // May fail if health checker can't revive in time
  1568. if (cntl.Failed()) {
  1569. EXPECT_EQ(EHOSTDOWN, cntl.ErrorCode()) << single_server << ", " << async;
  1570. EXPECT_EQ(RETRY_NUM, cntl.retried_count());
  1571. } else {
  1572. EXPECT_TRUE(cntl.retried_count() > 0);
  1573. }
  1574. }
  1575. StopAndJoin();
  1576. bthread_usleep(100000); // wait for stop
  1577. // Retry when connection failed
  1578. cntl.Reset();
  1579. cntl.set_max_retry(RETRY_NUM);
  1580. CallMethod(&channel, &cntl, &req, &res, async);
  1581. EXPECT_EQ(EHOSTDOWN, cntl.ErrorCode());
  1582. EXPECT_EQ(RETRY_NUM, cntl.retried_count());
  1583. }
  1584. void TestRetryOtherServer(bool async, bool short_connection) {
  1585. ASSERT_EQ(0, StartAccept(_ep));
  1586. brpc::Channel channel;
  1587. brpc::ChannelOptions opt;
  1588. if (short_connection) {
  1589. opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
  1590. }
  1591. butil::TempFile server_list;
  1592. EXPECT_EQ(0, server_list.save_format(
  1593. "127.0.0.1:100\n"
  1594. "127.0.0.1:200\n"
  1595. "%s", endpoint2str(_ep).c_str()));
  1596. std::string naming_url = std::string("fIle://")
  1597. + server_list.fname();
  1598. EXPECT_EQ(0, channel.Init(naming_url.c_str(), "RR", &opt));
  1599. const int RETRY_NUM = 3;
  1600. test::EchoRequest req;
  1601. test::EchoResponse res;
  1602. brpc::Controller cntl;
  1603. req.set_message(__FUNCTION__);
  1604. cntl.set_max_retry(RETRY_NUM);
  1605. CallMethod(&channel, &cntl, &req, &res, async);
  1606. EXPECT_EQ(0, cntl.ErrorCode()) << async << ", " << short_connection;
  1607. StopAndJoin();
  1608. }
  1609. butil::EndPoint _ep;
  1610. butil::TempFile _server_list;
  1611. std::string _naming_url;
  1612. brpc::Acceptor _messenger;
  1613. // Dummy server for `Server::AddError'
  1614. brpc::Server _dummy;
  1615. std::string _mock_fail_str;
  1616. bool _close_fd_once;
  1617. MyEchoService _svc;
  1618. };
  1619. class MyShared : public brpc::SharedObject {
  1620. public:
  1621. MyShared() { ++ nctor; }
  1622. MyShared(const MyShared&) : brpc::SharedObject() { ++ nctor; }
  1623. ~MyShared() { ++ ndtor; }
  1624. static int nctor;
  1625. static int ndtor;
  1626. };
  1627. int MyShared::nctor = 0;
  1628. int MyShared::ndtor = 0;
  1629. TEST_F(ChannelTest, intrusive_ptr_sanity) {
  1630. MyShared::nctor = 0;
  1631. MyShared::ndtor = 0;
  1632. {
  1633. MyShared* s1 = new MyShared;
  1634. ASSERT_EQ(0, s1->ref_count());
  1635. butil::intrusive_ptr<MyShared> p1 = s1;
  1636. ASSERT_EQ(1, p1->ref_count());
  1637. {
  1638. butil::intrusive_ptr<MyShared> p2 = s1;
  1639. ASSERT_EQ(2, p2->ref_count());
  1640. ASSERT_EQ(2, p1->ref_count());
  1641. }
  1642. ASSERT_EQ(1, p1->ref_count());
  1643. }
  1644. ASSERT_EQ(1, MyShared::nctor);
  1645. ASSERT_EQ(1, MyShared::ndtor);
  1646. }
  1647. TEST_F(ChannelTest, init_as_single_server) {
  1648. {
  1649. brpc::Channel channel;
  1650. ASSERT_EQ(-1, channel.Init("127.0.0.1:12345:asdf", NULL));
  1651. ASSERT_EQ(-1, channel.Init("127.0.0.1:99999", NULL));
  1652. ASSERT_EQ(0, channel.Init("127.0.0.1:8888", NULL));
  1653. }
  1654. {
  1655. brpc::Channel channel;
  1656. ASSERT_EQ(-1, channel.Init("127.0.0.1asdf", 12345, NULL));
  1657. ASSERT_EQ(-1, channel.Init("127.0.0.1", 99999, NULL));
  1658. ASSERT_EQ(0, channel.Init("127.0.0.1", 8888, NULL));
  1659. }
  1660. butil::EndPoint ep;
  1661. brpc::Channel channel;
  1662. ASSERT_EQ(0, str2endpoint("127.0.0.1:8888", &ep));
  1663. ASSERT_EQ(0, channel.Init(ep, NULL));
  1664. ASSERT_TRUE(channel.SingleServer());
  1665. ASSERT_EQ(ep, channel._server_address);
  1666. brpc::SocketId id;
  1667. ASSERT_EQ(0, brpc::SocketMapFind(brpc::SocketMapKey(ep), &id));
  1668. ASSERT_EQ(id, channel._server_id);
  1669. const int NUM = 10;
  1670. brpc::Channel channels[NUM];
  1671. for (int i = 0; i < 10; ++i) {
  1672. ASSERT_EQ(0, channels[i].Init(ep, NULL));
  1673. // Share the same server socket
  1674. ASSERT_EQ(id, channels[i]._server_id);
  1675. }
  1676. }
  1677. TEST_F(ChannelTest, init_using_unknown_naming_service) {
  1678. brpc::Channel channel;
  1679. ASSERT_EQ(-1, channel.Init("unknown://unknown", "unknown", NULL));
  1680. }
  1681. TEST_F(ChannelTest, init_using_unexist_fns) {
  1682. brpc::Channel channel;
  1683. ASSERT_EQ(-1, channel.Init("fiLe://no_such_file", "rr", NULL));
  1684. }
  1685. TEST_F(ChannelTest, init_using_empty_fns) {
  1686. brpc::ChannelOptions opt;
  1687. opt.succeed_without_server = false;
  1688. brpc::Channel channel;
  1689. butil::TempFile server_list;
  1690. ASSERT_EQ(0, server_list.save(""));
  1691. std::string naming_url = std::string("file://") + server_list.fname();
  1692. // empty file list results in error.
  1693. ASSERT_EQ(-1, channel.Init(naming_url.c_str(), "rr", &opt));
  1694. ASSERT_EQ(0, server_list.save("blahblah"));
  1695. // No valid address.
  1696. ASSERT_EQ(-1, channel.Init(naming_url.c_str(), "rr", NULL));
  1697. }
  1698. TEST_F(ChannelTest, init_using_empty_lns) {
  1699. brpc::ChannelOptions opt;
  1700. opt.succeed_without_server = false;
  1701. brpc::Channel channel;
  1702. ASSERT_EQ(-1, channel.Init("list:// ", "rr", &opt));
  1703. ASSERT_EQ(-1, channel.Init("list://", "rr", &opt));
  1704. ASSERT_EQ(-1, channel.Init("list://blahblah", "rr", &opt));
  1705. }
  1706. TEST_F(ChannelTest, init_using_naming_service) {
  1707. brpc::Channel* channel = new brpc::Channel();
  1708. butil::TempFile server_list;
  1709. ASSERT_EQ(0, server_list.save("127.0.0.1:8888"));
  1710. std::string naming_url = std::string("filE://") + server_list.fname();
  1711. // Rr are intended to test case-insensitivity.
  1712. ASSERT_EQ(0, channel->Init(naming_url.c_str(), "Rr", NULL));
  1713. ASSERT_FALSE(channel->SingleServer());
  1714. brpc::LoadBalancerWithNaming* lb =
  1715. dynamic_cast<brpc::LoadBalancerWithNaming*>(channel->_lb.get());
  1716. ASSERT_TRUE(lb != NULL);
  1717. brpc::NamingServiceThread* ns = lb->_nsthread_ptr.get();
  1718. {
  1719. const int NUM = 10;
  1720. brpc::Channel channels[NUM];
  1721. for (int i = 0; i < NUM; ++i) {
  1722. // Share the same naming thread
  1723. ASSERT_EQ(0, channels[i].Init(naming_url.c_str(), "rr", NULL));
  1724. brpc::LoadBalancerWithNaming* lb2 =
  1725. dynamic_cast<brpc::LoadBalancerWithNaming*>(channels[i]._lb.get());
  1726. ASSERT_TRUE(lb2 != NULL);
  1727. ASSERT_EQ(ns, lb2->_nsthread_ptr.get());
  1728. }
  1729. }
  1730. // `lb' should be valid even if `channel' has destroyed
  1731. // since we hold another reference to it
  1732. butil::intrusive_ptr<brpc::SharedLoadBalancer>
  1733. another_ctx = channel->_lb;
  1734. delete channel;
  1735. ASSERT_EQ(lb, another_ctx.get());
  1736. ASSERT_EQ(1, another_ctx->_nref.load());
  1737. // `lb' should be destroyed after
  1738. }
  1739. TEST_F(ChannelTest, connection_failed) {
  1740. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1741. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1742. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1743. TestConnectionFailed(i, j, k);
  1744. }
  1745. }
  1746. }
  1747. }
  1748. TEST_F(ChannelTest, empty_parallel_channel) {
  1749. brpc::ParallelChannel channel;
  1750. brpc::Controller cntl;
  1751. test::EchoRequest req;
  1752. test::EchoResponse res;
  1753. req.set_message(__FUNCTION__);
  1754. CallMethod(&channel, &cntl, &req, &res, false);
  1755. EXPECT_EQ(EPERM, cntl.ErrorCode()) << cntl.ErrorText();
  1756. }
  1757. TEST_F(ChannelTest, empty_selective_channel) {
  1758. brpc::SelectiveChannel channel;
  1759. ASSERT_EQ(0, channel.Init("rr", NULL));
  1760. brpc::Controller cntl;
  1761. test::EchoRequest req;
  1762. test::EchoResponse res;
  1763. req.set_message(__FUNCTION__);
  1764. CallMethod(&channel, &cntl, &req, &res, false);
  1765. EXPECT_EQ(ENODATA, cntl.ErrorCode()) << cntl.ErrorText();
  1766. }
  1767. class BadCall : public brpc::CallMapper {
  1768. brpc::SubCall Map(int,
  1769. const google::protobuf::MethodDescriptor*,
  1770. const google::protobuf::Message*,
  1771. google::protobuf::Message*) {
  1772. return brpc::SubCall::Bad();
  1773. }
  1774. };
  1775. TEST_F(ChannelTest, returns_bad_parallel) {
  1776. const size_t NCHANS = 5;
  1777. brpc::ParallelChannel channel;
  1778. for (size_t i = 0; i < NCHANS; ++i) {
  1779. brpc::Channel* subchan = new brpc::Channel();
  1780. SetUpChannel(subchan, true, false);
  1781. ASSERT_EQ(0, channel.AddChannel(
  1782. subchan, brpc::OWNS_CHANNEL, new BadCall, NULL));
  1783. }
  1784. brpc::Controller cntl;
  1785. test::EchoRequest req;
  1786. test::EchoResponse res;
  1787. req.set_message(__FUNCTION__);
  1788. CallMethod(&channel, &cntl, &req, &res, false);
  1789. EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
  1790. }
  1791. class SkipCall : public brpc::CallMapper {
  1792. brpc::SubCall Map(int,
  1793. const google::protobuf::MethodDescriptor*,
  1794. const google::protobuf::Message*,
  1795. google::protobuf::Message*) {
  1796. return brpc::SubCall::Skip();
  1797. }
  1798. };
  1799. TEST_F(ChannelTest, skip_all_channels) {
  1800. const size_t NCHANS = 5;
  1801. brpc::ParallelChannel channel;
  1802. for (size_t i = 0; i < NCHANS; ++i) {
  1803. brpc::Channel* subchan = new brpc::Channel();
  1804. SetUpChannel(subchan, true, false);
  1805. ASSERT_EQ(0, channel.AddChannel(
  1806. subchan, brpc::OWNS_CHANNEL, new SkipCall, NULL));
  1807. }
  1808. brpc::Controller cntl;
  1809. test::EchoRequest req;
  1810. test::EchoResponse res;
  1811. req.set_message(__FUNCTION__);
  1812. CallMethod(&channel, &cntl, &req, &res, false);
  1813. EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
  1814. EXPECT_EQ((int)NCHANS, cntl.sub_count());
  1815. for (int i = 0; i < cntl.sub_count(); ++i) {
  1816. EXPECT_TRUE(NULL == cntl.sub(i)) << "i=" << i;
  1817. }
  1818. }
  1819. TEST_F(ChannelTest, connection_failed_parallel) {
  1820. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1821. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1822. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1823. TestConnectionFailedParallel(i, j, k);
  1824. }
  1825. }
  1826. }
  1827. }
  1828. TEST_F(ChannelTest, connection_failed_selective) {
  1829. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1830. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1831. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1832. TestConnectionFailedSelective(i, j, k);
  1833. }
  1834. }
  1835. }
  1836. }
  1837. TEST_F(ChannelTest, success) {
  1838. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1839. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1840. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1841. TestSuccess(i, j, k);
  1842. }
  1843. }
  1844. }
  1845. }
  1846. TEST_F(ChannelTest, success_parallel) {
  1847. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1848. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1849. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1850. TestSuccessParallel(i, j, k);
  1851. }
  1852. }
  1853. }
  1854. }
  1855. TEST_F(ChannelTest, success_duplicated_parallel) {
  1856. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1857. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1858. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1859. TestSuccessDuplicatedParallel(i, j, k);
  1860. }
  1861. }
  1862. }
  1863. }
  1864. TEST_F(ChannelTest, success_selective) {
  1865. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1866. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1867. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1868. TestSuccessSelective(i, j, k);
  1869. }
  1870. }
  1871. }
  1872. }
  1873. TEST_F(ChannelTest, skip_parallel) {
  1874. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1875. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1876. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1877. TestSkipParallel(i, j, k);
  1878. }
  1879. }
  1880. }
  1881. }
  1882. TEST_F(ChannelTest, success_parallel2) {
  1883. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1884. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1885. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1886. TestSuccessParallel2(i, j, k);
  1887. }
  1888. }
  1889. }
  1890. }
  1891. TEST_F(ChannelTest, cancel_before_callmethod) {
  1892. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1893. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1894. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1895. CancelBeforeCallMethod(i, j, k);
  1896. }
  1897. }
  1898. }
  1899. }
  1900. TEST_F(ChannelTest, cancel_before_callmethod_parallel) {
  1901. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1902. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1903. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1904. CancelBeforeCallMethodParallel(i, j, k);
  1905. }
  1906. }
  1907. }
  1908. }
  1909. TEST_F(ChannelTest, cancel_before_callmethod_selective) {
  1910. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1911. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1912. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1913. CancelBeforeCallMethodSelective(i, j, k);
  1914. }
  1915. }
  1916. }
  1917. }
  1918. TEST_F(ChannelTest, cancel_during_callmethod) {
  1919. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1920. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1921. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1922. CancelDuringCallMethod(i, j, k);
  1923. }
  1924. }
  1925. }
  1926. }
  1927. TEST_F(ChannelTest, cancel_during_callmethod_parallel) {
  1928. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1929. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1930. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1931. CancelDuringCallMethodParallel(i, j, k);
  1932. }
  1933. }
  1934. }
  1935. }
  1936. TEST_F(ChannelTest, cancel_during_callmethod_selective) {
  1937. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1938. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1939. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1940. CancelDuringCallMethodSelective(i, j, k);
  1941. }
  1942. }
  1943. }
  1944. }
  1945. TEST_F(ChannelTest, cancel_after_callmethod) {
  1946. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1947. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1948. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1949. CancelAfterCallMethod(i, j, k);
  1950. }
  1951. }
  1952. }
  1953. }
  1954. TEST_F(ChannelTest, cancel_after_callmethod_parallel) {
  1955. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1956. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1957. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1958. CancelAfterCallMethodParallel(i, j, k);
  1959. }
  1960. }
  1961. }
  1962. }
  1963. TEST_F(ChannelTest, request_not_init) {
  1964. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1965. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1966. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1967. TestRequestNotInit(i, j, k);
  1968. }
  1969. }
  1970. }
  1971. }
  1972. TEST_F(ChannelTest, request_not_init_parallel) {
  1973. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1974. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1975. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1976. TestRequestNotInitParallel(i, j, k);
  1977. }
  1978. }
  1979. }
  1980. }
  1981. TEST_F(ChannelTest, request_not_init_selective) {
  1982. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1983. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1984. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1985. TestRequestNotInitSelective(i, j, k);
  1986. }
  1987. }
  1988. }
  1989. }
  1990. TEST_F(ChannelTest, timeout) {
  1991. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  1992. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  1993. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  1994. TestRPCTimeout(i, j, k);
  1995. }
  1996. }
  1997. }
  1998. }
  1999. TEST_F(ChannelTest, timeout_parallel) {
  2000. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2001. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2002. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2003. TestRPCTimeoutParallel(i, j, k);
  2004. }
  2005. }
  2006. }
  2007. }
  2008. TEST_F(ChannelTest, timeout_still_checks_sub_channels_parallel) {
  2009. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2010. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2011. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2012. TimeoutStillChecksSubChannelsParallel(i, j, k);
  2013. }
  2014. }
  2015. }
  2016. }
  2017. TEST_F(ChannelTest, timeout_selective) {
  2018. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2019. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2020. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2021. TestRPCTimeoutSelective(i, j, k);
  2022. }
  2023. }
  2024. }
  2025. }
  2026. TEST_F(ChannelTest, close_fd) {
  2027. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2028. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2029. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2030. TestCloseFD(i, j, k);
  2031. }
  2032. }
  2033. }
  2034. }
  2035. TEST_F(ChannelTest, close_fd_parallel) {
  2036. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2037. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2038. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2039. TestCloseFDParallel(i, j, k);
  2040. }
  2041. }
  2042. }
  2043. }
  2044. TEST_F(ChannelTest, close_fd_selective) {
  2045. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2046. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2047. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2048. TestCloseFDSelective(i, j, k);
  2049. }
  2050. }
  2051. }
  2052. }
  2053. TEST_F(ChannelTest, server_fail) {
  2054. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2055. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2056. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2057. TestServerFail(i, j, k);
  2058. }
  2059. }
  2060. }
  2061. }
  2062. TEST_F(ChannelTest, server_fail_parallel) {
  2063. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2064. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2065. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2066. TestServerFailParallel(i, j, k);
  2067. }
  2068. }
  2069. }
  2070. }
  2071. TEST_F(ChannelTest, server_fail_selective) {
  2072. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2073. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2074. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2075. TestServerFailSelective(i, j, k);
  2076. }
  2077. }
  2078. }
  2079. }
  2080. TEST_F(ChannelTest, authentication) {
  2081. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2082. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2083. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2084. TestAuthentication(i, j, k);
  2085. }
  2086. }
  2087. }
  2088. }
  2089. TEST_F(ChannelTest, authentication_parallel) {
  2090. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2091. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2092. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2093. TestAuthenticationParallel(i, j, k);
  2094. }
  2095. }
  2096. }
  2097. }
  2098. TEST_F(ChannelTest, authentication_selective) {
  2099. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2100. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2101. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2102. TestAuthenticationSelective(i, j, k);
  2103. }
  2104. }
  2105. }
  2106. }
  2107. TEST_F(ChannelTest, retry) {
  2108. for (int i = 0; i <= 1; ++i) { // Flag SingleServer
  2109. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2110. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2111. TestRetry(i, j, k);
  2112. }
  2113. }
  2114. }
  2115. }
  2116. TEST_F(ChannelTest, retry_other_servers) {
  2117. for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
  2118. for (int k = 0; k <=1; ++k) { // Flag ShortConnection
  2119. TestRetryOtherServer(j, k);
  2120. }
  2121. }
  2122. }
  2123. TEST_F(ChannelTest, multiple_threads_single_channel) {
  2124. srand(time(NULL));
  2125. ASSERT_EQ(0, StartAccept(_ep));
  2126. MyAuthenticator auth;
  2127. const int NUM = 10;
  2128. const int COUNT = 10000;
  2129. pthread_t tids[NUM];
  2130. // Cause massive connect/close log if setting to true
  2131. bool short_connection = false;
  2132. for (int single_server = 0; single_server <= 1; ++single_server) {
  2133. for (int need_auth = 0; need_auth <= 1; ++need_auth) {
  2134. for (int async = 0; async <= 1; ++async) {
  2135. std::cout << " *** short=" << short_connection
  2136. << " single=" << single_server
  2137. << " auth=" << need_auth
  2138. << " async=" << async << std::endl;
  2139. brpc::Channel channel;
  2140. SetUpChannel(&channel, single_server,
  2141. short_connection, (need_auth ? &auth : NULL));
  2142. for (int i = 0; i < NUM; ++i) {
  2143. google::protobuf::Closure* thrd_func =
  2144. brpc::NewCallback(
  2145. this, &ChannelTest::RPCThread,
  2146. (brpc::ChannelBase*)&channel,
  2147. (bool)async, COUNT);
  2148. EXPECT_EQ(0, pthread_create(&tids[i], NULL,
  2149. RunClosure, thrd_func));
  2150. }
  2151. for (int i = 0; i < NUM; ++i) {
  2152. pthread_join(tids[i], NULL);
  2153. }
  2154. }
  2155. }
  2156. }
  2157. }
  2158. TEST_F(ChannelTest, multiple_threads_multiple_channels) {
  2159. srand(time(NULL));
  2160. ASSERT_EQ(0, StartAccept(_ep));
  2161. MyAuthenticator auth;
  2162. const int NUM = 10;
  2163. const int COUNT = 10000;
  2164. pthread_t tids[NUM];
  2165. // Cause massive connect/close log if setting to true
  2166. bool short_connection = false;
  2167. for (int single_server = 0; single_server <= 1; ++single_server) {
  2168. for (int need_auth = 0; need_auth <= 1; ++need_auth) {
  2169. for (int async = 0; async <= 1; ++async) {
  2170. std::cout << " *** short=" << short_connection
  2171. << " single=" << single_server
  2172. << " auth=" << need_auth
  2173. << " async=" << async << std::endl;
  2174. for (int i = 0; i < NUM; ++i) {
  2175. google::protobuf::Closure* thrd_func =
  2176. brpc::NewCallback<
  2177. ChannelTest, ChannelTest*,
  2178. bool, bool, bool, const brpc::Authenticator*, int>
  2179. (this, &ChannelTest::RPCThread, single_server,
  2180. async, short_connection, (need_auth ? &auth : NULL), COUNT);
  2181. EXPECT_EQ(0, pthread_create(&tids[i], NULL,
  2182. RunClosure, thrd_func));
  2183. }
  2184. for (int i = 0; i < NUM; ++i) {
  2185. pthread_join(tids[i], NULL);
  2186. }
  2187. }
  2188. }
  2189. }
  2190. }
  2191. TEST_F(ChannelTest, clear_attachment_after_retry) {
  2192. for (int j = 0; j <= 1; ++j) {
  2193. for (int k = 0; k <= 1; ++k) {
  2194. TestAttachment(j, k);
  2195. }
  2196. }
  2197. }
  2198. TEST_F(ChannelTest, destroy_channel) {
  2199. for (int i = 0; i <= 1; ++i) {
  2200. for (int j = 0; j <= 1; ++j) {
  2201. TestDestroyChannel(i, j);
  2202. }
  2203. }
  2204. }
  2205. TEST_F(ChannelTest, destroy_channel_parallel) {
  2206. for (int i = 0; i <= 1; ++i) {
  2207. for (int j = 0; j <= 1; ++j) {
  2208. TestDestroyChannelParallel(i, j);
  2209. }
  2210. }
  2211. }
  2212. TEST_F(ChannelTest, destroy_channel_selective) {
  2213. for (int i = 0; i <= 1; ++i) {
  2214. for (int j = 0; j <= 1; ++j) {
  2215. TestDestroyChannelSelective(i, j);
  2216. }
  2217. }
  2218. }
  2219. TEST_F(ChannelTest, sizeof) {
  2220. LOG(INFO) << "Size of Channel is " << sizeof(brpc::Channel)
  2221. << ", Size of ParallelChannel is " << sizeof(brpc::ParallelChannel)
  2222. << ", Size of Controller is " << sizeof(brpc::Controller)
  2223. << ", Size of vector is " << sizeof(std::vector<brpc::Controller>);
  2224. }
  2225. brpc::Channel g_chan;
  2226. TEST_F(ChannelTest, global_channel_should_quit_successfully) {
  2227. g_chan.Init("bns://qa-pbrpc.SAT.tjyx", "rr", NULL);
  2228. }
  2229. TEST_F(ChannelTest, unused_call_id) {
  2230. {
  2231. brpc::Controller cntl;
  2232. }
  2233. {
  2234. brpc::Controller cntl;
  2235. cntl.Reset();
  2236. }
  2237. brpc::CallId cid1 = { 0 };
  2238. {
  2239. brpc::Controller cntl;
  2240. cid1 = cntl.call_id();
  2241. }
  2242. ASSERT_EQ(EINVAL, bthread_id_error(cid1, ECANCELED));
  2243. {
  2244. brpc::CallId cid2 = { 0 };
  2245. brpc::Controller cntl;
  2246. cid2 = cntl.call_id();
  2247. cntl.Reset();
  2248. ASSERT_EQ(EINVAL, bthread_id_error(cid2, ECANCELED));
  2249. }
  2250. }
  2251. TEST_F(ChannelTest, adaptive_connection_type) {
  2252. brpc::AdaptiveConnectionType ctype;
  2253. ASSERT_EQ(brpc::CONNECTION_TYPE_UNKNOWN, ctype);
  2254. ASSERT_FALSE(ctype.has_error());
  2255. ASSERT_STREQ("unknown", ctype.name());
  2256. ctype = brpc::CONNECTION_TYPE_SINGLE;
  2257. ASSERT_EQ(brpc::CONNECTION_TYPE_SINGLE, ctype);
  2258. ASSERT_STREQ("single", ctype.name());
  2259. ctype = "shorT";
  2260. ASSERT_EQ(brpc::CONNECTION_TYPE_SHORT, ctype);
  2261. ASSERT_STREQ("short", ctype.name());
  2262. ctype = "PooLed";
  2263. ASSERT_EQ(brpc::CONNECTION_TYPE_POOLED, ctype);
  2264. ASSERT_STREQ("pooled", ctype.name());
  2265. ctype = "SINGLE";
  2266. ASSERT_EQ(brpc::CONNECTION_TYPE_SINGLE, ctype);
  2267. ASSERT_FALSE(ctype.has_error());
  2268. ASSERT_STREQ("single", ctype.name());
  2269. ctype = "blah";
  2270. ASSERT_EQ(brpc::CONNECTION_TYPE_UNKNOWN, ctype);
  2271. ASSERT_TRUE(ctype.has_error());
  2272. ASSERT_STREQ("unknown", ctype.name());
  2273. ctype = "single";
  2274. ASSERT_EQ(brpc::CONNECTION_TYPE_SINGLE, ctype);
  2275. ASSERT_FALSE(ctype.has_error());
  2276. ASSERT_STREQ("single", ctype.name());
  2277. }
  2278. TEST_F(ChannelTest, adaptive_protocol_type) {
  2279. brpc::AdaptiveProtocolType ptype;
  2280. ASSERT_EQ(brpc::PROTOCOL_UNKNOWN, ptype);
  2281. ASSERT_STREQ("unknown", ptype.name());
  2282. ASSERT_FALSE(ptype.has_param());
  2283. ASSERT_EQ("", ptype.param());
  2284. ptype = brpc::PROTOCOL_HTTP;
  2285. ASSERT_EQ(brpc::PROTOCOL_HTTP, ptype);
  2286. ASSERT_STREQ("http", ptype.name());
  2287. ASSERT_FALSE(ptype.has_param());
  2288. ASSERT_EQ("", ptype.param());
  2289. ptype = "http:xyz ";
  2290. ASSERT_EQ(brpc::PROTOCOL_HTTP, ptype);
  2291. ASSERT_STREQ("http", ptype.name());
  2292. ASSERT_TRUE(ptype.has_param());
  2293. ASSERT_EQ("xyz ", ptype.param());
  2294. ptype = "HuLu_pbRPC";
  2295. ASSERT_EQ(brpc::PROTOCOL_HULU_PBRPC, ptype);
  2296. ASSERT_STREQ("hulu_pbrpc", ptype.name());
  2297. ASSERT_FALSE(ptype.has_param());
  2298. ASSERT_EQ("", ptype.param());
  2299. ptype = "blah";
  2300. ASSERT_EQ(brpc::PROTOCOL_UNKNOWN, ptype);
  2301. ASSERT_STREQ("blah", ptype.name());
  2302. ASSERT_FALSE(ptype.has_param());
  2303. ASSERT_EQ("", ptype.param());
  2304. ptype = "Baidu_STD";
  2305. ASSERT_EQ(brpc::PROTOCOL_BAIDU_STD, ptype);
  2306. ASSERT_STREQ("baidu_std", ptype.name());
  2307. ASSERT_FALSE(ptype.has_param());
  2308. ASSERT_EQ("", ptype.param());
  2309. }
  2310. } //namespace