brpc_socket_unittest.cpp 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <sys/types.h>
  20. #include <sys/socket.h>
  21. #include <fcntl.h> // F_GETFD
  22. #include <gtest/gtest.h>
  23. #include <gflags/gflags.h>
  24. #include "butil/gperftools_profiler.h"
  25. #include "butil/time.h"
  26. #include "butil/macros.h"
  27. #include "butil/fd_utility.h"
  28. #include "bthread/unstable.h"
  29. #include "bthread/task_control.h"
  30. #include "brpc/socket.h"
  31. #include "brpc/errno.pb.h"
  32. #include "brpc/acceptor.h"
  33. #include "brpc/policy/hulu_pbrpc_protocol.h"
  34. #include "brpc/policy/most_common_message.h"
  35. #include "brpc/policy/http_rpc_protocol.h"
  36. #include "brpc/nshead.h"
  37. #include "brpc/server.h"
  38. #include "brpc/channel.h"
  39. #include "brpc/controller.h"
  40. #include "health_check.pb.h"
  41. #if defined(OS_MACOSX)
  42. #include <sys/event.h>
  43. #endif
  44. #define CONNECT_IN_KEEPWRITE 1;
  45. namespace bthread {
  46. extern TaskControl* g_task_control;
  47. }
  48. namespace brpc {
  49. DECLARE_int32(health_check_interval);
  50. }
  51. void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base);
  52. int main(int argc, char* argv[]) {
  53. testing::InitGoogleTest(&argc, argv);
  54. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  55. brpc::Protocol dummy_protocol =
  56. { brpc::policy::ParseHuluMessage,
  57. brpc::SerializeRequestDefault,
  58. brpc::policy::PackHuluRequest,
  59. EchoProcessHuluRequest, EchoProcessHuluRequest,
  60. NULL, NULL, NULL,
  61. brpc::CONNECTION_TYPE_ALL, "dummy_hulu" };
  62. EXPECT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol));
  63. return RUN_ALL_TESTS();
  64. }
  65. struct WaitData {
  66. bthread_id_t id;
  67. int error_code;
  68. std::string error_text;
  69. WaitData() : id(INVALID_BTHREAD_ID), error_code(0) {}
  70. };
  71. int OnWaitIdReset(bthread_id_t id, void* data, int error_code,
  72. const std::string& error_text) {
  73. static_cast<WaitData*>(data)->id = id;
  74. static_cast<WaitData*>(data)->error_code = error_code;
  75. static_cast<WaitData*>(data)->error_text = error_text;
  76. return bthread_id_unlock_and_destroy(id);
  77. }
  78. class SocketTest : public ::testing::Test{
  79. protected:
  80. SocketTest(){
  81. };
  82. virtual ~SocketTest(){};
  83. virtual void SetUp() {
  84. };
  85. virtual void TearDown() {
  86. };
  87. };
  88. brpc::Socket* global_sock = NULL;
  89. class CheckRecycle : public brpc::SocketUser {
  90. void BeforeRecycle(brpc::Socket* s) {
  91. ASSERT_TRUE(global_sock);
  92. ASSERT_EQ(global_sock, s);
  93. global_sock = NULL;
  94. delete this;
  95. }
  96. };
  97. TEST_F(SocketTest, not_recycle_until_zero_nref) {
  98. std::cout << "sizeof(Socket)=" << sizeof(brpc::Socket) << std::endl;
  99. int fds[2];
  100. ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
  101. brpc::SocketId id = 8888;
  102. butil::EndPoint dummy;
  103. ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
  104. brpc::SocketOptions options;
  105. options.fd = fds[1];
  106. options.remote_side = dummy;
  107. options.user = new CheckRecycle;
  108. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  109. {
  110. brpc::SocketUniquePtr s;
  111. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  112. global_sock = s.get();
  113. ASSERT_TRUE(s.get());
  114. ASSERT_EQ(fds[1], s->fd());
  115. ASSERT_EQ(dummy, s->remote_side());
  116. ASSERT_EQ(id, s->id());
  117. ASSERT_EQ(0, s->SetFailed());
  118. ASSERT_EQ(s.get(), global_sock);
  119. }
  120. ASSERT_EQ((brpc::Socket*)NULL, global_sock);
  121. close(fds[0]);
  122. brpc::SocketUniquePtr ptr;
  123. ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
  124. }
  125. butil::atomic<int> winner_count(0);
  126. const int AUTH_ERR = -9;
  127. void* auth_fighter(void* arg) {
  128. bthread_usleep(10000);
  129. int auth_error = 0;
  130. brpc::Socket* s = (brpc::Socket*)arg;
  131. if (s->FightAuthentication(&auth_error) == 0) {
  132. winner_count.fetch_add(1);
  133. s->SetAuthentication(AUTH_ERR);
  134. } else {
  135. EXPECT_EQ(AUTH_ERR, auth_error);
  136. }
  137. return NULL;
  138. }
  139. TEST_F(SocketTest, authentication) {
  140. brpc::SocketId id;
  141. brpc::SocketOptions options;
  142. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  143. brpc::SocketUniquePtr s;
  144. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  145. bthread_t th[64];
  146. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  147. ASSERT_EQ(0, bthread_start_urgent(&th[i], NULL, auth_fighter, s.get()));
  148. }
  149. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  150. ASSERT_EQ(0, bthread_join(th[i], NULL));
  151. }
  152. // Only one fighter wins
  153. ASSERT_EQ(1, winner_count.load());
  154. // Fight after signal is OK
  155. int auth_error = 0;
  156. ASSERT_NE(0, s->FightAuthentication(&auth_error));
  157. ASSERT_EQ(AUTH_ERR, auth_error);
  158. // Socket has been `SetFailed' when authentication failed
  159. ASSERT_TRUE(brpc::Socket::Address(s->id(), NULL));
  160. }
  161. static butil::atomic<int> g_called_seq(1);
  162. class MyMessage : public brpc::SocketMessage {
  163. public:
  164. MyMessage(const char* str, size_t len, int* called = NULL)
  165. : _str(str), _len(len), _called(called) {}
  166. private:
  167. butil::Status AppendAndDestroySelf(butil::IOBuf* out_buf, brpc::Socket*) {
  168. out_buf->append(_str, _len);
  169. if (_called) {
  170. *_called = g_called_seq.fetch_add(1, butil::memory_order_relaxed);
  171. }
  172. delete this;
  173. return butil::Status::OK();
  174. };
  175. const char* _str;
  176. size_t _len;
  177. int* _called;
  178. };
  179. class MyErrorMessage : public brpc::SocketMessage {
  180. public:
  181. explicit MyErrorMessage(const butil::Status& st) : _status(st) {}
  182. private:
  183. butil::Status AppendAndDestroySelf(butil::IOBuf*, brpc::Socket*) {
  184. return _status;
  185. };
  186. butil::Status _status;
  187. };
  188. TEST_F(SocketTest, single_threaded_write) {
  189. int fds[2];
  190. ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
  191. brpc::SocketId id = 8888;
  192. butil::EndPoint dummy;
  193. ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
  194. brpc::SocketOptions options;
  195. options.fd = fds[1];
  196. options.remote_side = dummy;
  197. options.user = new CheckRecycle;
  198. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  199. {
  200. brpc::SocketUniquePtr s;
  201. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  202. global_sock = s.get();
  203. ASSERT_TRUE(s.get());
  204. ASSERT_EQ(fds[1], s->fd());
  205. ASSERT_EQ(dummy, s->remote_side());
  206. ASSERT_EQ(id, s->id());
  207. const int BATCH = 5;
  208. for (size_t i = 0; i < 20; ++i) {
  209. char buf[32 * BATCH];
  210. size_t len = snprintf(buf, sizeof(buf), "hello world! %lu", i);
  211. if (i % 4 == 0) {
  212. brpc::SocketMessagePtr<MyMessage> msg(new MyMessage(buf, len));
  213. ASSERT_EQ(0, s->Write(msg));
  214. } else if (i % 4 == 1) {
  215. brpc::SocketMessagePtr<MyErrorMessage> msg(
  216. new MyErrorMessage(butil::Status(EINVAL, "Invalid input")));
  217. bthread_id_t wait_id;
  218. WaitData data;
  219. ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset));
  220. brpc::Socket::WriteOptions wopt;
  221. wopt.id_wait = wait_id;
  222. ASSERT_EQ(0, s->Write(msg, &wopt));
  223. ASSERT_EQ(0, bthread_id_join(wait_id));
  224. ASSERT_EQ(wait_id.value, data.id.value);
  225. ASSERT_EQ(EINVAL, data.error_code);
  226. ASSERT_EQ("Invalid input", data.error_text);
  227. continue;
  228. } else if (i % 4 == 2) {
  229. int seq[BATCH] = {};
  230. brpc::SocketMessagePtr<MyMessage> msgs[BATCH];
  231. // re-print the buffer.
  232. len = 0;
  233. for (int j = 0; j < BATCH; ++j) {
  234. if (j % 2 == 0) {
  235. // Empty message, should be skipped.
  236. msgs[j].reset(new MyMessage(buf+len, 0, &seq[j]));
  237. } else {
  238. size_t sub_len = snprintf(
  239. buf+len, sizeof(buf)-len, "hello world! %lu.%d", i, j);
  240. msgs[j].reset(new MyMessage(buf+len, sub_len, &seq[j]));
  241. len += sub_len;
  242. }
  243. }
  244. for (size_t i = 0; i < BATCH; ++i) {
  245. ASSERT_EQ(0, s->Write(msgs[i]));
  246. }
  247. for (int j = 1; j < BATCH; ++j) {
  248. ASSERT_LT(seq[j-1], seq[j]) << "j=" << j;
  249. }
  250. } else {
  251. butil::IOBuf src;
  252. src.append(buf);
  253. ASSERT_EQ(len, src.length());
  254. ASSERT_EQ(0, s->Write(&src));
  255. ASSERT_TRUE(src.empty());
  256. }
  257. char dest[sizeof(buf)];
  258. ASSERT_EQ(len, (size_t)read(fds[0], dest, sizeof(dest)));
  259. ASSERT_EQ(0, memcmp(buf, dest, len));
  260. }
  261. ASSERT_EQ(0, s->SetFailed());
  262. }
  263. ASSERT_EQ((brpc::Socket*)NULL, global_sock);
  264. close(fds[0]);
  265. }
  266. void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base) {
  267. brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg(
  268. static_cast<brpc::policy::MostCommonMessage*>(msg_base));
  269. butil::IOBuf buf;
  270. buf.append(msg->meta);
  271. buf.append(msg->payload);
  272. ASSERT_EQ(0, msg->socket()->Write(&buf));
  273. }
  274. class MyConnect : public brpc::AppConnect {
  275. public:
  276. MyConnect() : _done(NULL), _data(NULL), _called_start_connect(false) {}
  277. void StartConnect(const brpc::Socket*,
  278. void (*done)(int err, void* data),
  279. void* data) {
  280. LOG(INFO) << "Start application-level connect";
  281. _done = done;
  282. _data = data;
  283. _called_start_connect = true;
  284. }
  285. void StopConnect(brpc::Socket*) {
  286. LOG(INFO) << "Stop application-level connect";
  287. }
  288. void MakeConnectDone() {
  289. _done(0, _data);
  290. }
  291. bool is_start_connect_called() const { return _called_start_connect; }
  292. private:
  293. void (*_done)(int err, void* data);
  294. void* _data;
  295. bool _called_start_connect;
  296. };
  297. TEST_F(SocketTest, single_threaded_connect_and_write) {
  298. // FIXME(gejun): Messenger has to be new otherwise quitting may crash.
  299. brpc::Acceptor* messenger = new brpc::Acceptor;
  300. const brpc::InputMessageHandler pairs[] = {
  301. { brpc::policy::ParseHuluMessage,
  302. EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
  303. };
  304. butil::EndPoint point(butil::IP_ANY, 7878);
  305. int listening_fd = tcp_listen(point);
  306. ASSERT_TRUE(listening_fd > 0);
  307. butil::make_non_blocking(listening_fd);
  308. ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
  309. ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL));
  310. brpc::SocketId id = 8888;
  311. brpc::SocketOptions options;
  312. options.remote_side = point;
  313. std::shared_ptr<MyConnect> my_connect = std::make_shared<MyConnect>();
  314. options.app_connect = my_connect;
  315. options.user = new CheckRecycle;
  316. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  317. {
  318. brpc::SocketUniquePtr s;
  319. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  320. global_sock = s.get();
  321. ASSERT_TRUE(s.get());
  322. ASSERT_EQ(-1, s->fd());
  323. ASSERT_EQ(point, s->remote_side());
  324. ASSERT_EQ(id, s->id());
  325. for (size_t i = 0; i < 20; ++i) {
  326. char buf[64];
  327. const size_t meta_len = 4;
  328. *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta";
  329. const size_t len = snprintf(buf + 12 + meta_len,
  330. sizeof(buf) - 12 - meta_len,
  331. "hello world! %lu", i);
  332. memcpy(buf, "HULU", 4);
  333. // HULU uses host byte order directly...
  334. *(uint32_t*)(buf + 4) = len + meta_len;
  335. *(uint32_t*)(buf + 8) = meta_len;
  336. int called = 0;
  337. if (i % 2 == 0) {
  338. brpc::SocketMessagePtr<MyMessage> msg(
  339. new MyMessage(buf, 12 + meta_len + len, &called));
  340. ASSERT_EQ(0, s->Write(msg));
  341. } else {
  342. butil::IOBuf src;
  343. src.append(buf, 12 + meta_len + len);
  344. ASSERT_EQ(12 + meta_len + len, src.length());
  345. ASSERT_EQ(0, s->Write(&src));
  346. ASSERT_TRUE(src.empty());
  347. }
  348. if (i == 0) {
  349. // connection needs to be established at first time.
  350. // Should be intentionally blocked in app_connect.
  351. bthread_usleep(10000);
  352. ASSERT_TRUE(my_connect->is_start_connect_called());
  353. ASSERT_LT(0, s->fd()); // already tcp connected
  354. ASSERT_EQ(0, called); // request is not serialized yet.
  355. my_connect->MakeConnectDone();
  356. ASSERT_LT(0, called); // serialized
  357. }
  358. int64_t start_time = butil::gettimeofday_us();
  359. while (s->fd() < 0) {
  360. bthread_usleep(1000);
  361. ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!";
  362. }
  363. #if defined(OS_LINUX)
  364. ASSERT_EQ(0, bthread_fd_wait(s->fd(), EPOLLIN));
  365. #elif defined(OS_MACOSX)
  366. ASSERT_EQ(0, bthread_fd_wait(s->fd(), EVFILT_READ));
  367. #endif
  368. char dest[sizeof(buf)];
  369. ASSERT_EQ(meta_len + len, (size_t)read(s->fd(), dest, sizeof(dest)));
  370. ASSERT_EQ(0, memcmp(buf + 12, dest, meta_len + len));
  371. }
  372. ASSERT_EQ(0, s->SetFailed());
  373. }
  374. ASSERT_EQ((brpc::Socket*)NULL, global_sock);
  375. // The id is invalid.
  376. brpc::SocketUniquePtr ptr;
  377. ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
  378. messenger->StopAccept(0);
  379. ASSERT_EQ(-1, messenger->listened_fd());
  380. ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD));
  381. ASSERT_EQ(EBADF, errno);
  382. }
  383. #define NUMBER_WIDTH 16
  384. struct WriterArg {
  385. size_t times;
  386. size_t offset;
  387. brpc::SocketId socket_id;
  388. };
  389. void* FailedWriter(void* void_arg) {
  390. WriterArg* arg = static_cast<WriterArg*>(void_arg);
  391. brpc::SocketUniquePtr sock;
  392. if (brpc::Socket::Address(arg->socket_id, &sock) < 0) {
  393. printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id);
  394. return NULL;
  395. }
  396. char buf[32];
  397. for (size_t i = 0; i < arg->times; ++i) {
  398. bthread_id_t id;
  399. EXPECT_EQ(0, bthread_id_create(&id, NULL, NULL));
  400. snprintf(buf, sizeof(buf), "%0" BAIDU_SYMBOLSTR(NUMBER_WIDTH) "lu",
  401. i + arg->offset);
  402. butil::IOBuf src;
  403. src.append(buf);
  404. brpc::Socket::WriteOptions wopt;
  405. wopt.id_wait = id;
  406. sock->Write(&src, &wopt);
  407. EXPECT_EQ(0, bthread_id_join(id));
  408. // Only the first connect can see ECONNREFUSED and then
  409. // calls `SetFailed' making others' error_code=EINVAL
  410. //EXPECT_EQ(ECONNREFUSED, error_code);
  411. }
  412. return NULL;
  413. }
  414. TEST_F(SocketTest, fail_to_connect) {
  415. const size_t REP = 10;
  416. butil::EndPoint point(butil::IP_ANY, 7563/*not listened*/);
  417. brpc::SocketId id = 8888;
  418. brpc::SocketOptions options;
  419. options.remote_side = point;
  420. options.user = new CheckRecycle;
  421. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  422. {
  423. brpc::SocketUniquePtr s;
  424. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  425. global_sock = s.get();
  426. ASSERT_TRUE(s.get());
  427. ASSERT_EQ(-1, s->fd());
  428. ASSERT_EQ(point, s->remote_side());
  429. ASSERT_EQ(id, s->id());
  430. pthread_t th[8];
  431. WriterArg args[ARRAY_SIZE(th)];
  432. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  433. args[i].times = REP;
  434. args[i].offset = i * REP;
  435. args[i].socket_id = id;
  436. ASSERT_EQ(0, pthread_create(&th[i], NULL, FailedWriter, &args[i]));
  437. }
  438. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  439. ASSERT_EQ(0, pthread_join(th[i], NULL));
  440. }
  441. ASSERT_EQ(-1, s->SetFailed()); // already SetFailed
  442. ASSERT_EQ(-1, s->fd());
  443. }
  444. // KeepWrite is possibly still running.
  445. int64_t start_time = butil::gettimeofday_us();
  446. while (global_sock != NULL) {
  447. bthread_usleep(1000);
  448. ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!";
  449. }
  450. ASSERT_EQ(-1, brpc::Socket::Status(id));
  451. // The id is invalid.
  452. brpc::SocketUniquePtr ptr;
  453. ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
  454. }
  455. TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
  456. brpc::SocketId id = 8888;
  457. butil::EndPoint point(butil::IP_ANY, 7584/*not listened*/);
  458. brpc::SocketOptions options;
  459. options.remote_side = point;
  460. options.user = new CheckRecycle;
  461. options.health_check_interval_s = 1/*s*/;
  462. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  463. {
  464. brpc::SocketUniquePtr s;
  465. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  466. global_sock = s.get();
  467. ASSERT_TRUE(s.get());
  468. ASSERT_EQ(-1, s->fd());
  469. ASSERT_EQ(point, s->remote_side());
  470. ASSERT_EQ(id, s->id());
  471. char buf[64];
  472. const size_t meta_len = 4;
  473. *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta";
  474. const size_t len = snprintf(buf + 12 + meta_len,
  475. sizeof(buf) - 12 - meta_len,
  476. "hello world!");
  477. memcpy(buf, "HULU", 4);
  478. // HULU uses host byte order directly...
  479. *(uint32_t*)(buf + 4) = len + meta_len;
  480. *(uint32_t*)(buf + 8) = meta_len;
  481. butil::IOBuf src;
  482. src.append(buf, 12 + meta_len + len);
  483. ASSERT_EQ(12 + meta_len + len, src.length());
  484. #ifdef CONNECT_IN_KEEPWRITE
  485. bthread_id_t wait_id;
  486. WaitData data;
  487. ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset));
  488. brpc::Socket::WriteOptions wopt;
  489. wopt.id_wait = wait_id;
  490. ASSERT_EQ(0, s->Write(&src, &wopt));
  491. ASSERT_EQ(0, bthread_id_join(wait_id));
  492. ASSERT_EQ(wait_id.value, data.id.value);
  493. ASSERT_EQ(ECONNREFUSED, data.error_code);
  494. ASSERT_TRUE(butil::StringPiece(data.error_text).starts_with(
  495. "Fail to connect "));
  496. #else
  497. ASSERT_EQ(-1, s->Write(&src));
  498. ASSERT_EQ(ECONNREFUSED, errno);
  499. #endif
  500. ASSERT_TRUE(src.empty());
  501. ASSERT_EQ(-1, s->fd());
  502. }
  503. // HealthCheckThread is possibly still running. Spin until global_sock
  504. // is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should
  505. // not spin until Socket::Status(id) becomes -1 and assert global_sock
  506. // to be NULL because invalidating id happens before calling BeforeRecycle.
  507. const int64_t start_time = butil::gettimeofday_us();
  508. while (global_sock != NULL) {
  509. bthread_usleep(1000);
  510. ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
  511. }
  512. ASSERT_EQ(-1, brpc::Socket::Status(id));
  513. }
  514. class HealthCheckTestServiceImpl : public test::HealthCheckTestService {
  515. public:
  516. HealthCheckTestServiceImpl()
  517. : _sleep_flag(true) {}
  518. virtual ~HealthCheckTestServiceImpl() {}
  519. virtual void default_method(google::protobuf::RpcController* cntl_base,
  520. const test::HealthCheckRequest* request,
  521. test::HealthCheckResponse* response,
  522. google::protobuf::Closure* done) {
  523. brpc::ClosureGuard done_guard(done);
  524. brpc::Controller* cntl = (brpc::Controller*)cntl_base;
  525. if (_sleep_flag) {
  526. bthread_usleep(510000 /* 510ms, a little bit longer than the default
  527. timeout of health check rpc */);
  528. }
  529. cntl->response_attachment().append("OK");
  530. }
  531. bool _sleep_flag;
  532. };
  533. TEST_F(SocketTest, app_level_health_check) {
  534. int old_health_check_interval = brpc::FLAGS_health_check_interval;
  535. GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
  536. GFLAGS_NS::SetCommandLineOption("health_check_interval", "1");
  537. butil::EndPoint point(butil::IP_ANY, 7777);
  538. brpc::ChannelOptions options;
  539. options.protocol = "http";
  540. options.max_retry = 0;
  541. brpc::Channel channel;
  542. ASSERT_EQ(0, channel.Init(point, &options));
  543. {
  544. brpc::Controller cntl;
  545. cntl.http_request().uri() = "/";
  546. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  547. EXPECT_TRUE(cntl.Failed());
  548. ASSERT_EQ(ECONNREFUSED, cntl.ErrorCode());
  549. }
  550. // 2s to make sure remote is connected by HealthCheckTask and enter the
  551. // sending-rpc state. Because the remote is not down, so hc rpc would keep
  552. // sending.
  553. int listening_fd = tcp_listen(point);
  554. bthread_usleep(2000000);
  555. // 2s to make sure HealthCheckTask find socket is failed and correct impl
  556. // should trigger next round of hc
  557. close(listening_fd);
  558. bthread_usleep(2000000);
  559. brpc::Server server;
  560. HealthCheckTestServiceImpl hc_service;
  561. ASSERT_EQ(0, server.AddService(&hc_service, brpc::SERVER_DOESNT_OWN_SERVICE));
  562. ASSERT_EQ(0, server.Start(point, NULL));
  563. for (int i = 0; i < 4; ++i) {
  564. // although ::connect would succeed, the stall in hc_service makes
  565. // the health check rpc fail.
  566. brpc::Controller cntl;
  567. cntl.http_request().uri() = "/";
  568. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  569. ASSERT_EQ(EHOSTDOWN, cntl.ErrorCode());
  570. bthread_usleep(1000000 /*1s*/);
  571. }
  572. hc_service._sleep_flag = false;
  573. bthread_usleep(2000000 /* a little bit longer than hc rpc timeout + hc interval */);
  574. // should recover now
  575. {
  576. brpc::Controller cntl;
  577. cntl.http_request().uri() = "/";
  578. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  579. ASSERT_FALSE(cntl.Failed());
  580. ASSERT_GT(cntl.response_attachment().size(), (size_t)0);
  581. }
  582. GFLAGS_NS::SetCommandLineOption("health_check_path", "");
  583. char hc_buf[8];
  584. snprintf(hc_buf, sizeof(hc_buf), "%d", old_health_check_interval);
  585. GFLAGS_NS::SetCommandLineOption("health_check_interval", hc_buf);
  586. }
  587. TEST_F(SocketTest, health_check) {
  588. // FIXME(gejun): Messenger has to be new otherwise quitting may crash.
  589. brpc::Acceptor* messenger = new brpc::Acceptor;
  590. brpc::SocketId id = 8888;
  591. butil::EndPoint point(butil::IP_ANY, 7878);
  592. const int kCheckInteval = 1;
  593. brpc::SocketOptions options;
  594. options.remote_side = point;
  595. options.user = new CheckRecycle;
  596. options.health_check_interval_s = kCheckInteval/*s*/;
  597. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  598. brpc::SocketUniquePtr s;
  599. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  600. global_sock = s.get();
  601. ASSERT_TRUE(s.get());
  602. ASSERT_EQ(-1, s->fd());
  603. ASSERT_EQ(point, s->remote_side());
  604. ASSERT_EQ(id, s->id());
  605. int32_t nref = -1;
  606. ASSERT_EQ(0, brpc::Socket::Status(id, &nref));
  607. ASSERT_EQ(2, nref);
  608. char buf[64];
  609. const size_t meta_len = 4;
  610. *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta";
  611. const size_t len = snprintf(buf + 12 + meta_len,
  612. sizeof(buf) - 12 - meta_len,
  613. "hello world!");
  614. memcpy(buf, "HULU", 4);
  615. // HULU uses host byte order directly...
  616. *(uint32_t*)(buf + 4) = len + meta_len;
  617. *(uint32_t*)(buf + 8) = meta_len;
  618. const bool use_my_message = (butil::fast_rand_less_than(2) == 0);
  619. brpc::SocketMessagePtr<MyMessage> msg;
  620. int appended_msg = 0;
  621. butil::IOBuf src;
  622. if (use_my_message) {
  623. LOG(INFO) << "Use MyMessage";
  624. msg.reset(new MyMessage(buf, 12 + meta_len + len, &appended_msg));
  625. } else {
  626. src.append(buf, 12 + meta_len + len);
  627. ASSERT_EQ(12 + meta_len + len, src.length());
  628. }
  629. #ifdef CONNECT_IN_KEEPWRITE
  630. bthread_id_t wait_id;
  631. WaitData data;
  632. ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset));
  633. brpc::Socket::WriteOptions wopt;
  634. wopt.id_wait = wait_id;
  635. if (use_my_message) {
  636. ASSERT_EQ(0, s->Write(msg, &wopt));
  637. } else {
  638. ASSERT_EQ(0, s->Write(&src, &wopt));
  639. }
  640. ASSERT_EQ(0, bthread_id_join(wait_id));
  641. ASSERT_EQ(wait_id.value, data.id.value);
  642. ASSERT_EQ(ECONNREFUSED, data.error_code);
  643. ASSERT_TRUE(butil::StringPiece(data.error_text).starts_with(
  644. "Fail to connect "));
  645. if (use_my_message) {
  646. ASSERT_TRUE(appended_msg);
  647. }
  648. #else
  649. if (use_my_message) {
  650. ASSERT_EQ(-1, s->Write(msg));
  651. } else {
  652. ASSERT_EQ(-1, s->Write(&src));
  653. }
  654. ASSERT_EQ(ECONNREFUSED, errno);
  655. #endif
  656. ASSERT_TRUE(src.empty());
  657. ASSERT_EQ(-1, s->fd());
  658. ASSERT_TRUE(global_sock);
  659. brpc::SocketUniquePtr invalid_ptr;
  660. ASSERT_EQ(-1, brpc::Socket::Address(id, &invalid_ptr));
  661. ASSERT_EQ(1, brpc::Socket::Status(id));
  662. const brpc::InputMessageHandler pairs[] = {
  663. { brpc::policy::ParseHuluMessage,
  664. EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
  665. };
  666. int listening_fd = tcp_listen(point);
  667. ASSERT_TRUE(listening_fd > 0);
  668. butil::make_non_blocking(listening_fd);
  669. ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
  670. ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL));
  671. int64_t start_time = butil::gettimeofday_us();
  672. nref = -1;
  673. while (brpc::Socket::Status(id, &nref) != 0) {
  674. bthread_usleep(1000);
  675. ASSERT_LT(butil::gettimeofday_us(),
  676. start_time + kCheckInteval * 1000000L + 100000L/*100ms*/);
  677. }
  678. //ASSERT_EQ(2, nref);
  679. ASSERT_TRUE(global_sock);
  680. int fd = 0;
  681. {
  682. brpc::SocketUniquePtr ptr;
  683. ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
  684. ASSERT_NE(0, ptr->fd());
  685. fd = ptr->fd();
  686. }
  687. // SetFailed again, should reconnect and succeed soon.
  688. ASSERT_EQ(0, s->SetFailed());
  689. ASSERT_EQ(fd, s->fd());
  690. start_time = butil::gettimeofday_us();
  691. while (brpc::Socket::Status(id) != 0) {
  692. bthread_usleep(1000);
  693. ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
  694. }
  695. ASSERT_TRUE(global_sock);
  696. {
  697. brpc::SocketUniquePtr ptr;
  698. ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
  699. ASSERT_NE(0, ptr->fd());
  700. }
  701. s.release()->Dereference();
  702. // Must stop messenger before SetFailed the id otherwise HealthCheckThread
  703. // still has chance to get reconnected and revive the id.
  704. messenger->StopAccept(0);
  705. ASSERT_EQ(-1, messenger->listened_fd());
  706. ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD));
  707. ASSERT_EQ(EBADF, errno);
  708. ASSERT_EQ(0, brpc::Socket::SetFailed(id));
  709. // HealthCheckThread is possibly still addressing the Socket.
  710. start_time = butil::gettimeofday_us();
  711. while (global_sock != NULL) {
  712. bthread_usleep(1000);
  713. ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
  714. }
  715. ASSERT_EQ(-1, brpc::Socket::Status(id));
  716. // The id is invalid.
  717. brpc::SocketUniquePtr ptr;
  718. ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
  719. }
  720. void* Writer(void* void_arg) {
  721. WriterArg* arg = static_cast<WriterArg*>(void_arg);
  722. brpc::SocketUniquePtr sock;
  723. if (brpc::Socket::Address(arg->socket_id, &sock) < 0) {
  724. printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id);
  725. return NULL;
  726. }
  727. char buf[32];
  728. for (size_t i = 0; i < arg->times; ++i) {
  729. snprintf(buf, sizeof(buf), "%0" BAIDU_SYMBOLSTR(NUMBER_WIDTH) "lu",
  730. i + arg->offset);
  731. butil::IOBuf src;
  732. src.append(buf);
  733. if (sock->Write(&src) != 0) {
  734. if (errno == brpc::EOVERCROWDED) {
  735. // The buf is full, sleep a while and retry.
  736. bthread_usleep(1000);
  737. --i;
  738. continue;
  739. }
  740. printf("Fail to write into SocketId=%" PRIu64 ", %s\n",
  741. arg->socket_id, berror());
  742. break;
  743. }
  744. }
  745. return NULL;
  746. }
  747. TEST_F(SocketTest, multi_threaded_write) {
  748. const size_t REP = 20000;
  749. int fds[2];
  750. for (int k = 0; k < 2; ++k) {
  751. printf("Round %d\n", k + 1);
  752. ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
  753. pthread_t th[8];
  754. WriterArg args[ARRAY_SIZE(th)];
  755. std::vector<size_t> result;
  756. result.reserve(ARRAY_SIZE(th) * REP);
  757. brpc::SocketId id = 8888;
  758. butil::EndPoint dummy;
  759. ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
  760. brpc::SocketOptions options;
  761. options.fd = fds[1];
  762. options.remote_side = dummy;
  763. options.user = new CheckRecycle;
  764. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  765. brpc::SocketUniquePtr s;
  766. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  767. s->_ssl_state = brpc::SSL_OFF;
  768. global_sock = s.get();
  769. ASSERT_TRUE(s.get());
  770. ASSERT_EQ(fds[1], s->fd());
  771. ASSERT_EQ(dummy, s->remote_side());
  772. ASSERT_EQ(id, s->id());
  773. butil::make_non_blocking(fds[0]);
  774. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  775. args[i].times = REP;
  776. args[i].offset = i * REP;
  777. args[i].socket_id = id;
  778. ASSERT_EQ(0, pthread_create(&th[i], NULL, Writer, &args[i]));
  779. }
  780. if (k == 1) {
  781. printf("sleep 100ms to block writers\n");
  782. bthread_usleep(100000);
  783. }
  784. butil::IOPortal dest;
  785. const int64_t start_time = butil::gettimeofday_us();
  786. for (;;) {
  787. ssize_t nr = dest.append_from_file_descriptor(fds[0], 32768);
  788. if (nr < 0) {
  789. if (errno == EINTR) {
  790. continue;
  791. }
  792. if (EAGAIN != errno) {
  793. ASSERT_EQ(EAGAIN, errno) << berror();
  794. }
  795. bthread_usleep(1000);
  796. if (butil::gettimeofday_us() >= start_time + 2000000L) {
  797. LOG(FATAL) << "Wait too long!";
  798. break;
  799. }
  800. continue;
  801. }
  802. while (dest.length() >= NUMBER_WIDTH) {
  803. char buf[NUMBER_WIDTH + 1];
  804. dest.copy_to(buf, NUMBER_WIDTH);
  805. buf[sizeof(buf)-1] = 0;
  806. result.push_back(strtol(buf, NULL, 10));
  807. dest.pop_front(NUMBER_WIDTH);
  808. }
  809. if (result.size() >= REP * ARRAY_SIZE(th)) {
  810. break;
  811. }
  812. }
  813. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  814. ASSERT_EQ(0, pthread_join(th[i], NULL));
  815. }
  816. ASSERT_TRUE(dest.empty());
  817. bthread::g_task_control->print_rq_sizes(std::cout);
  818. std::cout << std::endl;
  819. ASSERT_EQ(REP * ARRAY_SIZE(th), result.size())
  820. << "write_head=" << s->_write_head;
  821. std::sort(result.begin(), result.end());
  822. result.resize(std::unique(result.begin(),
  823. result.end()) - result.begin());
  824. ASSERT_EQ(REP * ARRAY_SIZE(th), result.size());
  825. ASSERT_EQ(0UL, *result.begin());
  826. ASSERT_EQ(REP * ARRAY_SIZE(th) - 1, *(result.end() - 1));
  827. ASSERT_EQ(0, s->SetFailed());
  828. s.release()->Dereference();
  829. ASSERT_EQ((brpc::Socket*)NULL, global_sock);
  830. close(fds[0]);
  831. }
  832. }
  833. void* FastWriter(void* void_arg) {
  834. WriterArg* arg = static_cast<WriterArg*>(void_arg);
  835. brpc::SocketUniquePtr sock;
  836. if (brpc::Socket::Address(arg->socket_id, &sock) < 0) {
  837. printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id);
  838. return NULL;
  839. }
  840. char buf[] = "hello reader side!";
  841. int64_t begin_ts = butil::cpuwide_time_us();
  842. int64_t nretry = 0;
  843. size_t c = 0;
  844. for (; c < arg->times; ++c) {
  845. butil::IOBuf src;
  846. src.append(buf, 16);
  847. if (sock->Write(&src) != 0) {
  848. if (errno == brpc::EOVERCROWDED) {
  849. // The buf is full, sleep a while and retry.
  850. bthread_usleep(1000);
  851. --c;
  852. ++nretry;
  853. continue;
  854. }
  855. printf("Fail to write into SocketId=%" PRIu64 ", %s\n",
  856. arg->socket_id, berror());
  857. break;
  858. }
  859. }
  860. int64_t end_ts = butil::cpuwide_time_us();
  861. int64_t total_time = end_ts - begin_ts;
  862. printf("total=%ld count=%ld nretry=%ld\n",
  863. (long)total_time * 1000/ c, (long)c, (long)nretry);
  864. return NULL;
  865. }
  866. struct ReaderArg {
  867. int fd;
  868. size_t nread;
  869. };
  870. void* reader(void* void_arg) {
  871. ReaderArg* arg = static_cast<ReaderArg*>(void_arg);
  872. const size_t LEN = 32768;
  873. char* buf = (char*)malloc(LEN);
  874. while (1) {
  875. ssize_t nr = read(arg->fd, buf, LEN);
  876. if (nr < 0) {
  877. printf("Fail to read, %m\n");
  878. return NULL;
  879. } else if (nr == 0) {
  880. printf("Far end closed\n");
  881. return NULL;
  882. }
  883. arg->nread += nr;
  884. }
  885. return NULL;
  886. }
  887. TEST_F(SocketTest, multi_threaded_write_perf) {
  888. const size_t REP = 1000000000;
  889. int fds[2];
  890. ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
  891. bthread_t th[3];
  892. WriterArg args[ARRAY_SIZE(th)];
  893. brpc::SocketId id = 8888;
  894. butil::EndPoint dummy;
  895. ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
  896. brpc::SocketOptions options;
  897. options.fd = fds[1];
  898. options.remote_side = dummy;
  899. options.user = new CheckRecycle;
  900. ASSERT_EQ(0, brpc::Socket::Create(options, &id));
  901. brpc::SocketUniquePtr s;
  902. ASSERT_EQ(0, brpc::Socket::Address(id, &s));
  903. s->_ssl_state = brpc::SSL_OFF;
  904. ASSERT_EQ(2, brpc::NRefOfVRef(s->_versioned_ref));
  905. global_sock = s.get();
  906. ASSERT_TRUE(s.get());
  907. ASSERT_EQ(fds[1], s->fd());
  908. ASSERT_EQ(dummy, s->remote_side());
  909. ASSERT_EQ(id, s->id());
  910. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  911. args[i].times = REP;
  912. args[i].offset = i * REP;
  913. args[i].socket_id = id;
  914. bthread_start_background(&th[i], NULL, FastWriter, &args[i]);
  915. }
  916. pthread_t rth;
  917. ReaderArg reader_arg = { fds[0], 0 };
  918. pthread_create(&rth, NULL, reader, &reader_arg);
  919. butil::Timer tm;
  920. ProfilerStart("write.prof");
  921. const uint64_t old_nread = reader_arg.nread;
  922. tm.start();
  923. sleep(2);
  924. tm.stop();
  925. const uint64_t new_nread = reader_arg.nread;
  926. ProfilerStop();
  927. printf("tp=%" PRIu64 "M/s\n", (new_nread - old_nread) / tm.u_elapsed());
  928. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  929. args[i].times = 0;
  930. }
  931. for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
  932. ASSERT_EQ(0, bthread_join(th[i], NULL));
  933. }
  934. ASSERT_EQ(0, s->SetFailed());
  935. s.release()->Dereference();
  936. pthread_join(rth, NULL);
  937. ASSERT_EQ((brpc::Socket*)NULL, global_sock);
  938. close(fds[0]);
  939. }