brpc_http_rpc_protocol_unittest.cpp 57 KB


  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <sys/ioctl.h>
  20. #include <sys/types.h>
  21. #include <sys/socket.h>
  22. #include <gtest/gtest.h>
  23. #include <gflags/gflags.h>
  24. #include <google/protobuf/descriptor.h>
  25. #include "butil/time.h"
  26. #include "butil/macros.h"
  27. #include "butil/files/scoped_file.h"
  28. #include "butil/fd_guard.h"
  29. #include "brpc/socket.h"
  30. #include "brpc/acceptor.h"
  31. #include "brpc/server.h"
  32. #include "brpc/channel.h"
  33. #include "brpc/policy/most_common_message.h"
  34. #include "brpc/controller.h"
  35. #include "echo.pb.h"
  36. #include "brpc/policy/http_rpc_protocol.h"
  37. #include "brpc/policy/http2_rpc_protocol.h"
  38. #include "json2pb/pb_to_json.h"
  39. #include "json2pb/json_to_pb.h"
  40. #include "brpc/details/method_status.h"
  41. int main(int argc, char* argv[]) {
  42. testing::InitGoogleTest(&argc, argv);
  43. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  44. if (GFLAGS_NS::SetCommandLineOption("socket_max_unwritten_bytes", "2000000").empty()) {
  45. std::cerr << "Fail to set -socket_max_unwritten_bytes" << std::endl;
  46. return -1;
  47. }
  48. if (GFLAGS_NS::SetCommandLineOption("crash_on_fatal_log", "true").empty()) {
  49. std::cerr << "Fail to set -crash_on_fatal_log" << std::endl;
  50. return -1;
  51. }
  52. return RUN_ALL_TESTS();
  53. }
  54. namespace {
  55. static const std::string EXP_REQUEST = "hello";
  56. static const std::string EXP_RESPONSE = "world";
  57. static const std::string MOCK_CREDENTIAL = "mock credential";
  58. static const std::string MOCK_USER = "mock user";
  59. class MyAuthenticator : public brpc::Authenticator {
  60. public:
  61. MyAuthenticator() {}
  62. int GenerateCredential(std::string* auth_str) const {
  63. *auth_str = MOCK_CREDENTIAL;
  64. return 0;
  65. }
  66. int VerifyCredential(const std::string& auth_str,
  67. const butil::EndPoint&,
  68. brpc::AuthContext* ctx) const {
  69. EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
  70. ctx->set_user(MOCK_USER);
  71. return 0;
  72. }
  73. };
  74. class MyEchoService : public ::test::EchoService {
  75. public:
  76. void Echo(::google::protobuf::RpcController* cntl_base,
  77. const ::test::EchoRequest* req,
  78. ::test::EchoResponse* res,
  79. ::google::protobuf::Closure* done) {
  80. brpc::ClosureGuard done_guard(done);
  81. brpc::Controller* cntl =
  82. static_cast<brpc::Controller*>(cntl_base);
  83. const std::string* sleep_ms_str =
  84. cntl->http_request().uri().GetQuery("sleep_ms");
  85. if (sleep_ms_str) {
  86. bthread_usleep(strtol(sleep_ms_str->data(), NULL, 10) * 1000);
  87. }
  88. res->set_message(EXP_RESPONSE);
  89. }
  90. };
  91. class HttpTest : public ::testing::Test{
  92. protected:
  93. HttpTest() {
  94. EXPECT_EQ(0, _server.AddBuiltinServices());
  95. EXPECT_EQ(0, _server.AddService(
  96. &_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  97. // Hack: Regard `_server' as running
  98. _server._status = brpc::Server::RUNNING;
  99. _server._options.auth = &_auth;
  100. EXPECT_EQ(0, pipe(_pipe_fds));
  101. brpc::SocketId id;
  102. brpc::SocketOptions options;
  103. options.fd = _pipe_fds[1];
  104. EXPECT_EQ(0, brpc::Socket::Create(options, &id));
  105. EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
  106. brpc::SocketOptions h2_client_options;
  107. h2_client_options.user = brpc::get_client_side_messenger();
  108. h2_client_options.fd = _pipe_fds[1];
  109. EXPECT_EQ(0, brpc::Socket::Create(h2_client_options, &id));
  110. EXPECT_EQ(0, brpc::Socket::Address(id, &_h2_client_sock));
  111. };
  112. virtual ~HttpTest() {};
  113. virtual void SetUp() {};
  114. virtual void TearDown() {};
  115. void VerifyMessage(brpc::InputMessageBase* msg, bool expect) {
  116. if (msg->_socket == NULL) {
  117. _socket->ReAddress(&msg->_socket);
  118. }
  119. msg->_arg = &_server;
  120. EXPECT_EQ(expect, brpc::policy::VerifyHttpRequest(msg));
  121. }
  122. void ProcessMessage(void (*process)(brpc::InputMessageBase*),
  123. brpc::InputMessageBase* msg, bool set_eof) {
  124. if (msg->_socket == NULL) {
  125. _socket->ReAddress(&msg->_socket);
  126. }
  127. msg->_arg = &_server;
  128. _socket->PostponeEOF();
  129. if (set_eof) {
  130. _socket->SetEOF();
  131. }
  132. (*process)(msg);
  133. }
  134. brpc::policy::HttpContext* MakePostRequestMessage(const std::string& path) {
  135. brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
  136. msg->header().uri().set_path(path);
  137. msg->header().set_content_type("application/json");
  138. msg->header().set_method(brpc::HTTP_METHOD_POST);
  139. test::EchoRequest req;
  140. req.set_message(EXP_REQUEST);
  141. butil::IOBufAsZeroCopyOutputStream req_stream(&msg->body());
  142. EXPECT_TRUE(json2pb::ProtoMessageToJson(req, &req_stream, NULL));
  143. return msg;
  144. }
  145. brpc::policy::HttpContext* MakeGetRequestMessage(const std::string& path) {
  146. brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
  147. msg->header().uri().set_path(path);
  148. msg->header().set_method(brpc::HTTP_METHOD_GET);
  149. return msg;
  150. }
  151. brpc::policy::HttpContext* MakeResponseMessage(int code) {
  152. brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
  153. msg->header().set_status_code(code);
  154. msg->header().set_content_type("application/json");
  155. test::EchoResponse res;
  156. res.set_message(EXP_RESPONSE);
  157. butil::IOBufAsZeroCopyOutputStream res_stream(&msg->body());
  158. EXPECT_TRUE(json2pb::ProtoMessageToJson(res, &res_stream, NULL));
  159. return msg;
  160. }
  161. void CheckResponseCode(bool expect_empty, int expect_code) {
  162. int bytes_in_pipe = 0;
  163. ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
  164. if (expect_empty) {
  165. EXPECT_EQ(0, bytes_in_pipe);
  166. return;
  167. }
  168. EXPECT_GT(bytes_in_pipe, 0);
  169. butil::IOPortal buf;
  170. EXPECT_EQ((ssize_t)bytes_in_pipe,
  171. buf.append_from_file_descriptor(_pipe_fds[0], 1024));
  172. brpc::ParseResult pr =
  173. brpc::policy::ParseHttpMessage(&buf, _socket.get(), false, NULL);
  174. EXPECT_EQ(brpc::PARSE_OK, pr.error());
  175. brpc::policy::HttpContext* msg =
  176. static_cast<brpc::policy::HttpContext*>(pr.message());
  177. EXPECT_EQ(expect_code, msg->header().status_code());
  178. msg->Destroy();
  179. }
  180. void MakeH2EchoRequestBuf(butil::IOBuf* out, brpc::Controller* cntl, int* h2_stream_id) {
  181. butil::IOBuf request_buf;
  182. test::EchoRequest req;
  183. req.set_message(EXP_REQUEST);
  184. cntl->http_request().set_method(brpc::HTTP_METHOD_POST);
  185. brpc::policy::SerializeHttpRequest(&request_buf, cntl, &req);
  186. ASSERT_FALSE(cntl->Failed());
  187. brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(cntl);
  188. cntl->_current_call.stream_user_data = h2_req;
  189. brpc::SocketMessage* socket_message = NULL;
  190. brpc::policy::PackH2Request(NULL, &socket_message, cntl->call_id().value,
  191. NULL, cntl, request_buf, NULL);
  192. butil::Status st = socket_message->AppendAndDestroySelf(out, _h2_client_sock.get());
  193. ASSERT_TRUE(st.ok());
  194. *h2_stream_id = h2_req->_stream_id;
  195. }
  196. void MakeH2EchoResponseBuf(butil::IOBuf* out, int h2_stream_id) {
  197. brpc::Controller cntl;
  198. test::EchoResponse res;
  199. res.set_message(EXP_RESPONSE);
  200. cntl.http_request().set_content_type("application/proto");
  201. {
  202. butil::IOBufAsZeroCopyOutputStream wrapper(&cntl.response_attachment());
  203. EXPECT_TRUE(res.SerializeToZeroCopyStream(&wrapper));
  204. }
  205. brpc::policy::H2UnsentResponse* h2_res = brpc::policy::H2UnsentResponse::New(&cntl, h2_stream_id, false /*is grpc*/);
  206. butil::Status st = h2_res->AppendAndDestroySelf(out, _h2_client_sock.get());
  207. ASSERT_TRUE(st.ok());
  208. }
  209. int _pipe_fds[2];
  210. brpc::SocketUniquePtr _socket;
  211. brpc::SocketUniquePtr _h2_client_sock;
  212. brpc::Server _server;
  213. MyEchoService _svc;
  214. MyAuthenticator _auth;
  215. };
  216. TEST_F(HttpTest, indenting_ostream) {
  217. std::ostringstream os1;
  218. brpc::IndentingOStream is1(os1, 2);
  219. brpc::IndentingOStream is2(is1, 2);
  220. os1 << "begin1\nhello" << std::endl << "world\nend1" << std::endl;
  221. is1 << "begin2\nhello" << std::endl << "world\nend2" << std::endl;
  222. is2 << "begin3\nhello" << std::endl << "world\nend3" << std::endl;
  223. ASSERT_EQ(
  224. "begin1\nhello\nworld\nend1\nbegin2\n hello\n world\n end2\n"
  225. " begin3\n hello\n world\n end3\n",
  226. os1.str());
  227. }
  228. TEST_F(HttpTest, parse_http_address) {
  229. const std::string EXP_HOSTNAME = "www.baidu.com:9876";
  230. butil::EndPoint EXP_ENDPOINT;
  231. {
  232. std::string url = "https://" + EXP_HOSTNAME;
  233. EXPECT_TRUE(brpc::policy::ParseHttpServerAddress(&EXP_ENDPOINT, url.c_str()));
  234. }
  235. {
  236. butil::EndPoint ep;
  237. std::string url = "http://" +
  238. std::string(endpoint2str(EXP_ENDPOINT).c_str());
  239. EXPECT_TRUE(brpc::policy::ParseHttpServerAddress(&ep, url.c_str()));
  240. EXPECT_EQ(EXP_ENDPOINT, ep);
  241. }
  242. {
  243. butil::EndPoint ep;
  244. std::string url = "https://" +
  245. std::string(butil::ip2str(EXP_ENDPOINT.ip).c_str());
  246. EXPECT_TRUE(brpc::policy::ParseHttpServerAddress(&ep, url.c_str()));
  247. EXPECT_EQ(EXP_ENDPOINT.ip, ep.ip);
  248. EXPECT_EQ(443, ep.port);
  249. }
  250. {
  251. butil::EndPoint ep;
  252. EXPECT_FALSE(brpc::policy::ParseHttpServerAddress(&ep, "invalid_url"));
  253. }
  254. {
  255. butil::EndPoint ep;
  256. EXPECT_FALSE(brpc::policy::ParseHttpServerAddress(
  257. &ep, "https://no.such.machine:9090"));
  258. }
  259. }
  260. TEST_F(HttpTest, verify_request) {
  261. {
  262. brpc::policy::HttpContext* msg =
  263. MakePostRequestMessage("/EchoService/Echo");
  264. VerifyMessage(msg, false);
  265. msg->Destroy();
  266. }
  267. {
  268. brpc::policy::HttpContext* msg = MakeGetRequestMessage("/status");
  269. VerifyMessage(msg, true);
  270. msg->Destroy();
  271. }
  272. {
  273. brpc::policy::HttpContext* msg =
  274. MakePostRequestMessage("/EchoService/Echo");
  275. _socket->SetFailed();
  276. VerifyMessage(msg, false);
  277. msg->Destroy();
  278. }
  279. }
  280. TEST_F(HttpTest, process_request_failed_socket) {
  281. brpc::policy::HttpContext* msg = MakePostRequestMessage("/EchoService/Echo");
  282. _socket->SetFailed();
  283. ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
  284. ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
  285. CheckResponseCode(true, 0);
  286. }
  287. TEST_F(HttpTest, reject_get_to_pb_services_with_required_fields) {
  288. brpc::policy::HttpContext* msg = MakeGetRequestMessage("/EchoService/Echo");
  289. _server._status = brpc::Server::RUNNING;
  290. ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
  291. ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
  292. const brpc::Server::MethodProperty* mp =
  293. _server.FindMethodPropertyByFullName("test.EchoService.Echo");
  294. ASSERT_TRUE(mp);
  295. ASSERT_TRUE(mp->status);
  296. ASSERT_EQ(1ll, mp->status->_nerror_bvar.get_value());
  297. CheckResponseCode(false, brpc::HTTP_STATUS_BAD_REQUEST);
  298. }
  299. TEST_F(HttpTest, process_request_logoff) {
  300. brpc::policy::HttpContext* msg = MakePostRequestMessage("/EchoService/Echo");
  301. _server._status = brpc::Server::READY;
  302. ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
  303. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  304. CheckResponseCode(false, brpc::HTTP_STATUS_SERVICE_UNAVAILABLE);
  305. }
  306. TEST_F(HttpTest, process_request_wrong_method) {
  307. brpc::policy::HttpContext* msg = MakePostRequestMessage("/NO_SUCH_METHOD");
  308. ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
  309. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  310. CheckResponseCode(false, brpc::HTTP_STATUS_NOT_FOUND);
  311. }
  312. TEST_F(HttpTest, process_response_after_eof) {
  313. test::EchoResponse res;
  314. brpc::Controller cntl;
  315. cntl._response = &res;
  316. brpc::policy::HttpContext* msg =
  317. MakeResponseMessage(brpc::HTTP_STATUS_OK);
  318. _socket->set_correlation_id(cntl.call_id().value);
  319. ProcessMessage(brpc::policy::ProcessHttpResponse, msg, true);
  320. ASSERT_EQ(EXP_RESPONSE, res.message());
  321. ASSERT_TRUE(_socket->Failed());
  322. }
  323. TEST_F(HttpTest, process_response_error_code) {
  324. {
  325. brpc::Controller cntl;
  326. _socket->set_correlation_id(cntl.call_id().value);
  327. brpc::policy::HttpContext* msg =
  328. MakeResponseMessage(brpc::HTTP_STATUS_CONTINUE);
  329. ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
  330. ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
  331. ASSERT_EQ(brpc::HTTP_STATUS_CONTINUE, cntl.http_response().status_code());
  332. }
  333. {
  334. brpc::Controller cntl;
  335. _socket->set_correlation_id(cntl.call_id().value);
  336. brpc::policy::HttpContext* msg =
  337. MakeResponseMessage(brpc::HTTP_STATUS_TEMPORARY_REDIRECT);
  338. ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
  339. ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
  340. ASSERT_EQ(brpc::HTTP_STATUS_TEMPORARY_REDIRECT,
  341. cntl.http_response().status_code());
  342. }
  343. {
  344. brpc::Controller cntl;
  345. _socket->set_correlation_id(cntl.call_id().value);
  346. brpc::policy::HttpContext* msg =
  347. MakeResponseMessage(brpc::HTTP_STATUS_BAD_REQUEST);
  348. ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
  349. ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
  350. ASSERT_EQ(brpc::HTTP_STATUS_BAD_REQUEST,
  351. cntl.http_response().status_code());
  352. }
  353. {
  354. brpc::Controller cntl;
  355. _socket->set_correlation_id(cntl.call_id().value);
  356. brpc::policy::HttpContext* msg = MakeResponseMessage(12345);
  357. ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
  358. ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
  359. ASSERT_EQ(12345, cntl.http_response().status_code());
  360. }
  361. }
  362. TEST_F(HttpTest, complete_flow) {
  363. butil::IOBuf request_buf;
  364. butil::IOBuf total_buf;
  365. brpc::Controller cntl;
  366. test::EchoRequest req;
  367. test::EchoResponse res;
  368. cntl._response = &res;
  369. cntl._connection_type = brpc::CONNECTION_TYPE_SHORT;
  370. cntl._method = test::EchoService::descriptor()->method(0);
  371. ASSERT_EQ(0, brpc::Socket::Address(_socket->id(), &cntl._current_call.sending_sock));
  372. // Send request
  373. req.set_message(EXP_REQUEST);
  374. brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req);
  375. ASSERT_FALSE(cntl.Failed());
  376. brpc::policy::PackHttpRequest(
  377. &total_buf, NULL, cntl.call_id().value,
  378. cntl._method, &cntl, request_buf, &_auth);
  379. ASSERT_FALSE(cntl.Failed());
  380. // Verify and handle request
  381. brpc::ParseResult req_pr =
  382. brpc::policy::ParseHttpMessage(&total_buf, _socket.get(), false, NULL);
  383. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  384. brpc::InputMessageBase* req_msg = req_pr.message();
  385. VerifyMessage(req_msg, true);
  386. ProcessMessage(brpc::policy::ProcessHttpRequest, req_msg, false);
  387. // Read response from pipe
  388. butil::IOPortal response_buf;
  389. response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
  390. brpc::ParseResult res_pr =
  391. brpc::policy::ParseHttpMessage(&response_buf, _socket.get(), false, NULL);
  392. ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
  393. brpc::InputMessageBase* res_msg = res_pr.message();
  394. ProcessMessage(brpc::policy::ProcessHttpResponse, res_msg, false);
  395. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  396. ASSERT_EQ(EXP_RESPONSE, res.message());
  397. }
  398. TEST_F(HttpTest, chunked_uploading) {
  399. const int port = 8923;
  400. brpc::Server server;
  401. EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  402. EXPECT_EQ(0, server.Start(port, NULL));
  403. // Send request via curl using chunked encoding
  404. const std::string req = "{\"message\":\"hello\"}";
  405. const std::string res_fname = "curl.out";
  406. std::string cmd;
  407. butil::string_printf(&cmd, "curl -X POST -d '%s' -H 'Transfer-Encoding:chunked' "
  408. "-H 'Content-Type:application/json' -o %s "
  409. "http://localhost:%d/EchoService/Echo",
  410. req.c_str(), res_fname.c_str(), port);
  411. ASSERT_EQ(0, system(cmd.c_str()));
  412. // Check response
  413. const std::string exp_res = "{\"message\":\"world\"}";
  414. butil::ScopedFILE fp(res_fname.c_str(), "r");
  415. char buf[128];
  416. ASSERT_TRUE(fgets(buf, sizeof(buf), fp));
  417. EXPECT_EQ(exp_res, std::string(buf));
  418. }
  419. enum DonePlace {
  420. DONE_BEFORE_CREATE_PA = 0,
  421. DONE_AFTER_CREATE_PA_BEFORE_DESTROY_PA,
  422. DONE_AFTER_DESTROY_PA,
  423. };
  424. // For writing into PA.
  425. const char PA_DATA[] = "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()_=-+";
  426. const size_t PA_DATA_LEN = sizeof(PA_DATA) - 1/*not count the ending zero*/;
  427. static void CopyPAPrefixedWithSeqNo(char* buf, uint64_t seq_no) {
  428. memcpy(buf, PA_DATA, PA_DATA_LEN);
  429. *(uint64_t*)buf = seq_no;
  430. }
  431. class DownloadServiceImpl : public ::test::DownloadService {
  432. public:
  433. DownloadServiceImpl(DonePlace done_place = DONE_BEFORE_CREATE_PA,
  434. size_t num_repeat = 1)
  435. : _done_place(done_place)
  436. , _nrep(num_repeat)
  437. , _nwritten(0)
  438. , _ever_full(false)
  439. , _last_errno(0) {}
  440. void Download(::google::protobuf::RpcController* cntl_base,
  441. const ::test::HttpRequest*,
  442. ::test::HttpResponse*,
  443. ::google::protobuf::Closure* done) {
  444. brpc::ClosureGuard done_guard(done);
  445. brpc::Controller* cntl =
  446. static_cast<brpc::Controller*>(cntl_base);
  447. cntl->http_response().set_content_type("text/plain");
  448. brpc::StopStyle stop_style = (_nrep == std::numeric_limits<size_t>::max()
  449. ? brpc::FORCE_STOP : brpc::WAIT_FOR_STOP);
  450. butil::intrusive_ptr<brpc::ProgressiveAttachment> pa
  451. = cntl->CreateProgressiveAttachment(stop_style);
  452. if (pa == NULL) {
  453. cntl->SetFailed("The socket was just failed");
  454. return;
  455. }
  456. if (_done_place == DONE_BEFORE_CREATE_PA) {
  457. done_guard.reset(NULL);
  458. }
  459. ASSERT_GT(PA_DATA_LEN, 8u); // long enough to hold a 64-bit decimal.
  460. char buf[PA_DATA_LEN];
  461. for (size_t c = 0; c < _nrep;) {
  462. CopyPAPrefixedWithSeqNo(buf, c);
  463. if (pa->Write(buf, sizeof(buf)) != 0) {
  464. if (errno == brpc::EOVERCROWDED) {
  465. LOG_EVERY_SECOND(INFO) << "full pa=" << pa.get();
  466. _ever_full = true;
  467. bthread_usleep(10000);
  468. continue;
  469. } else {
  470. _last_errno = errno;
  471. break;
  472. }
  473. } else {
  474. _nwritten += PA_DATA_LEN;
  475. }
  476. ++c;
  477. }
  478. if (_done_place == DONE_AFTER_CREATE_PA_BEFORE_DESTROY_PA) {
  479. done_guard.reset(NULL);
  480. }
  481. LOG(INFO) << "Destroy pa=" << pa.get();
  482. pa.reset(NULL);
  483. if (_done_place == DONE_AFTER_DESTROY_PA) {
  484. done_guard.reset(NULL);
  485. }
  486. }
  487. void DownloadFailed(::google::protobuf::RpcController* cntl_base,
  488. const ::test::HttpRequest*,
  489. ::test::HttpResponse*,
  490. ::google::protobuf::Closure* done) {
  491. brpc::ClosureGuard done_guard(done);
  492. brpc::Controller* cntl =
  493. static_cast<brpc::Controller*>(cntl_base);
  494. cntl->http_response().set_content_type("text/plain");
  495. brpc::StopStyle stop_style = (_nrep == std::numeric_limits<size_t>::max()
  496. ? brpc::FORCE_STOP : brpc::WAIT_FOR_STOP);
  497. butil::intrusive_ptr<brpc::ProgressiveAttachment> pa
  498. = cntl->CreateProgressiveAttachment(stop_style);
  499. if (pa == NULL) {
  500. cntl->SetFailed("The socket was just failed");
  501. return;
  502. }
  503. char buf[PA_DATA_LEN];
  504. while (true) {
  505. if (pa->Write(buf, sizeof(buf)) != 0) {
  506. if (errno == brpc::EOVERCROWDED) {
  507. LOG_EVERY_SECOND(INFO) << "full pa=" << pa.get();
  508. bthread_usleep(10000);
  509. continue;
  510. } else {
  511. _last_errno = errno;
  512. break;
  513. }
  514. }
  515. break;
  516. }
  517. // The remote client will not receive the data written to the
  518. // progressive attachment when the controller failed.
  519. cntl->SetFailed("Intentionally set controller failed");
  520. done_guard.reset(NULL);
  521. // Return value of Write after controller has failed should
  522. // be less than zero.
  523. CHECK_LT(pa->Write(buf, sizeof(buf)), 0);
  524. CHECK_EQ(errno, ECANCELED);
  525. }
  526. void set_done_place(DonePlace done_place) { _done_place = done_place; }
  527. size_t written_bytes() const { return _nwritten; }
  528. bool ever_full() const { return _ever_full; }
  529. int last_errno() const { return _last_errno; }
  530. private:
  531. DonePlace _done_place;
  532. size_t _nrep;
  533. size_t _nwritten;
  534. bool _ever_full;
  535. int _last_errno;
  536. };
  537. TEST_F(HttpTest, read_chunked_response_normally) {
  538. const int port = 8923;
  539. brpc::Server server;
  540. DownloadServiceImpl svc;
  541. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  542. EXPECT_EQ(0, server.Start(port, NULL));
  543. for (int i = 0; i < 3; ++i) {
  544. svc.set_done_place((DonePlace)i);
  545. brpc::Channel channel;
  546. brpc::ChannelOptions options;
  547. options.protocol = brpc::PROTOCOL_HTTP;
  548. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  549. brpc::Controller cntl;
  550. cntl.http_request().uri() = "/DownloadService/Download";
  551. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  552. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  553. std::string expected(PA_DATA_LEN, 0);
  554. CopyPAPrefixedWithSeqNo(&expected[0], 0);
  555. ASSERT_EQ(expected, cntl.response_attachment());
  556. }
  557. }
  558. TEST_F(HttpTest, read_failed_chunked_response) {
  559. const int port = 8923;
  560. brpc::Server server;
  561. DownloadServiceImpl svc;
  562. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  563. EXPECT_EQ(0, server.Start(port, NULL));
  564. brpc::Channel channel;
  565. brpc::ChannelOptions options;
  566. options.protocol = brpc::PROTOCOL_HTTP;
  567. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  568. brpc::Controller cntl;
  569. cntl.http_request().uri() = "/DownloadService/DownloadFailed";
  570. cntl.response_will_be_read_progressively();
  571. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  572. EXPECT_TRUE(cntl.response_attachment().empty());
  573. ASSERT_TRUE(cntl.Failed());
  574. ASSERT_NE(cntl.ErrorText().find("HTTP/1.1 500 Internal Server Error"),
  575. std::string::npos) << cntl.ErrorText();
  576. ASSERT_NE(cntl.ErrorText().find("Intentionally set controller failed"),
  577. std::string::npos) << cntl.ErrorText();
  578. ASSERT_EQ(0, svc.last_errno());
  579. }
  580. class ReadBody : public brpc::ProgressiveReader,
  581. public brpc::SharedObject {
  582. public:
  583. ReadBody()
  584. : _nread(0)
  585. , _ncount(0)
  586. , _destroyed(false) {
  587. butil::intrusive_ptr<ReadBody>(this).detach(); // ref
  588. }
  589. butil::Status OnReadOnePart(const void* data, size_t length) {
  590. _nread += length;
  591. while (length > 0) {
  592. size_t nappend = std::min(_buf.size() + length, PA_DATA_LEN) - _buf.size();
  593. _buf.append((const char*)data, nappend);
  594. data = (const char*)data + nappend;
  595. length -= nappend;
  596. if (_buf.size() >= PA_DATA_LEN) {
  597. EXPECT_EQ(PA_DATA_LEN, _buf.size());
  598. char expected[PA_DATA_LEN];
  599. CopyPAPrefixedWithSeqNo(expected, _ncount++);
  600. EXPECT_EQ(0, memcmp(expected, _buf.data(), PA_DATA_LEN))
  601. << "ncount=" << _ncount;
  602. _buf.clear();
  603. }
  604. }
  605. return butil::Status::OK();
  606. }
  607. void OnEndOfMessage(const butil::Status& st) {
  608. butil::intrusive_ptr<ReadBody>(this, false); // deref
  609. ASSERT_LT(_buf.size(), PA_DATA_LEN);
  610. ASSERT_EQ(0, memcmp(_buf.data(), PA_DATA, _buf.size()));
  611. _destroyed = true;
  612. _destroying_st = st;
  613. LOG(INFO) << "Destroy ReadBody=" << this << ", " << st;
  614. }
  615. bool destroyed() const { return _destroyed; }
  616. const butil::Status& destroying_status() const { return _destroying_st; }
  617. size_t read_bytes() const { return _nread; }
  618. private:
  619. std::string _buf;
  620. size_t _nread;
  621. size_t _ncount;
  622. bool _destroyed;
  623. butil::Status _destroying_st;
  624. };
  625. static const int GENERAL_DELAY_US = 300000; // 0.3s
  626. TEST_F(HttpTest, read_long_body_progressively) {
  627. butil::intrusive_ptr<ReadBody> reader;
  628. {
  629. const int port = 8923;
  630. brpc::Server server;
  631. DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
  632. std::numeric_limits<size_t>::max());
  633. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  634. EXPECT_EQ(0, server.Start(port, NULL));
  635. {
  636. brpc::Channel channel;
  637. brpc::ChannelOptions options;
  638. options.protocol = brpc::PROTOCOL_HTTP;
  639. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  640. {
  641. brpc::Controller cntl;
  642. cntl.response_will_be_read_progressively();
  643. cntl.http_request().uri() = "/DownloadService/Download";
  644. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  645. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  646. ASSERT_TRUE(cntl.response_attachment().empty());
  647. reader.reset(new ReadBody);
  648. cntl.ReadProgressiveAttachmentBy(reader.get());
  649. size_t last_read = 0;
  650. for (size_t i = 0; i < 3; ++i) {
  651. sleep(1);
  652. size_t current_read = reader->read_bytes();
  653. LOG(INFO) << "read=" << current_read - last_read
  654. << " total=" << current_read;
  655. last_read = current_read;
  656. }
  657. // Read something in past N seconds.
  658. ASSERT_GT(last_read, (size_t)100000);
  659. }
  660. // the socket still holds a ref.
  661. ASSERT_FALSE(reader->destroyed());
  662. }
  663. // Wait for recycling of the main socket.
  664. usleep(GENERAL_DELAY_US);
  665. // even if the main socket is recycled, the pooled socket for
  666. // receiving data is not affected.
  667. ASSERT_FALSE(reader->destroyed());
  668. }
  669. // Wait for close of the connection due to server's stopping.
  670. usleep(GENERAL_DELAY_US);
  671. ASSERT_TRUE(reader->destroyed());
  672. ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
  673. }
  674. TEST_F(HttpTest, read_short_body_progressively) {
  675. butil::intrusive_ptr<ReadBody> reader;
  676. const int port = 8923;
  677. brpc::Server server;
  678. const int NREP = 10000;
  679. DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA, NREP);
  680. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  681. EXPECT_EQ(0, server.Start(port, NULL));
  682. {
  683. brpc::Channel channel;
  684. brpc::ChannelOptions options;
  685. options.protocol = brpc::PROTOCOL_HTTP;
  686. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  687. {
  688. brpc::Controller cntl;
  689. cntl.response_will_be_read_progressively();
  690. cntl.http_request().uri() = "/DownloadService/Download";
  691. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  692. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  693. ASSERT_TRUE(cntl.response_attachment().empty());
  694. reader.reset(new ReadBody);
  695. cntl.ReadProgressiveAttachmentBy(reader.get());
  696. size_t last_read = 0;
  697. for (size_t i = 0; i < 3; ++i) {
  698. sleep(1);
  699. size_t current_read = reader->read_bytes();
  700. LOG(INFO) << "read=" << current_read - last_read
  701. << " total=" << current_read;
  702. last_read = current_read;
  703. }
  704. ASSERT_EQ(NREP * PA_DATA_LEN, svc.written_bytes());
  705. ASSERT_EQ(NREP * PA_DATA_LEN, last_read);
  706. }
  707. ASSERT_TRUE(reader->destroyed());
  708. ASSERT_EQ(0, reader->destroying_status().error_code());
  709. }
  710. }
  711. TEST_F(HttpTest, read_progressively_after_cntl_destroys) {
  712. butil::intrusive_ptr<ReadBody> reader;
  713. {
  714. const int port = 8923;
  715. brpc::Server server;
  716. DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
  717. std::numeric_limits<size_t>::max());
  718. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  719. EXPECT_EQ(0, server.Start(port, NULL));
  720. {
  721. brpc::Channel channel;
  722. brpc::ChannelOptions options;
  723. options.protocol = brpc::PROTOCOL_HTTP;
  724. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  725. {
  726. brpc::Controller cntl;
  727. cntl.response_will_be_read_progressively();
  728. cntl.http_request().uri() = "/DownloadService/Download";
  729. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  730. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  731. ASSERT_TRUE(cntl.response_attachment().empty());
  732. reader.reset(new ReadBody);
  733. cntl.ReadProgressiveAttachmentBy(reader.get());
  734. }
  735. size_t last_read = 0;
  736. for (size_t i = 0; i < 3; ++i) {
  737. sleep(1);
  738. size_t current_read = reader->read_bytes();
  739. LOG(INFO) << "read=" << current_read - last_read
  740. << " total=" << current_read;
  741. last_read = current_read;
  742. }
  743. // Read something in past N seconds.
  744. ASSERT_GT(last_read, (size_t)100000);
  745. ASSERT_FALSE(reader->destroyed());
  746. }
  747. // Wait for recycling of the main socket.
  748. usleep(GENERAL_DELAY_US);
  749. ASSERT_FALSE(reader->destroyed());
  750. }
  751. // Wait for close of the connection due to server's stopping.
  752. usleep(GENERAL_DELAY_US);
  753. ASSERT_TRUE(reader->destroyed());
  754. ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
  755. }
  756. TEST_F(HttpTest, read_progressively_after_long_delay) {
  757. butil::intrusive_ptr<ReadBody> reader;
  758. {
  759. const int port = 8923;
  760. brpc::Server server;
  761. DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
  762. std::numeric_limits<size_t>::max());
  763. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  764. EXPECT_EQ(0, server.Start(port, NULL));
  765. {
  766. brpc::Channel channel;
  767. brpc::ChannelOptions options;
  768. options.protocol = brpc::PROTOCOL_HTTP;
  769. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  770. {
  771. brpc::Controller cntl;
  772. cntl.response_will_be_read_progressively();
  773. cntl.http_request().uri() = "/DownloadService/Download";
  774. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  775. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  776. ASSERT_TRUE(cntl.response_attachment().empty());
  777. LOG(INFO) << "Sleep 3 seconds to make PA at server-side full";
  778. sleep(3);
  779. EXPECT_TRUE(svc.ever_full());
  780. ASSERT_EQ(0, svc.last_errno());
  781. reader.reset(new ReadBody);
  782. cntl.ReadProgressiveAttachmentBy(reader.get());
  783. size_t last_read = 0;
  784. for (size_t i = 0; i < 3; ++i) {
  785. sleep(1);
  786. size_t current_read = reader->read_bytes();
  787. LOG(INFO) << "read=" << current_read - last_read
  788. << " total=" << current_read;
  789. last_read = current_read;
  790. }
  791. // Read something in past N seconds.
  792. ASSERT_GT(last_read, (size_t)100000);
  793. }
  794. ASSERT_FALSE(reader->destroyed());
  795. }
  796. // Wait for recycling of the main socket.
  797. usleep(GENERAL_DELAY_US);
  798. ASSERT_FALSE(reader->destroyed());
  799. }
  800. // Wait for close of the connection due to server's stopping.
  801. usleep(GENERAL_DELAY_US);
  802. ASSERT_TRUE(reader->destroyed());
  803. ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
  804. }
  805. TEST_F(HttpTest, skip_progressive_reading) {
  806. const int port = 8923;
  807. brpc::Server server;
  808. DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
  809. std::numeric_limits<size_t>::max());
  810. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  811. EXPECT_EQ(0, server.Start(port, NULL));
  812. brpc::Channel channel;
  813. brpc::ChannelOptions options;
  814. options.protocol = brpc::PROTOCOL_HTTP;
  815. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  816. {
  817. brpc::Controller cntl;
  818. cntl.response_will_be_read_progressively();
  819. cntl.http_request().uri() = "/DownloadService/Download";
  820. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  821. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  822. ASSERT_TRUE(cntl.response_attachment().empty());
  823. }
  824. const size_t old_written_bytes = svc.written_bytes();
  825. LOG(INFO) << "Sleep 3 seconds after destroy of Controller";
  826. sleep(3);
  827. const size_t new_written_bytes = svc.written_bytes();
  828. ASSERT_EQ(0, svc.last_errno());
  829. LOG(INFO) << "Server still wrote " << new_written_bytes - old_written_bytes;
  830. // The server side still wrote things.
  831. ASSERT_GT(new_written_bytes - old_written_bytes, (size_t)100000);
  832. }
  833. class AlwaysFailRead : public brpc::ProgressiveReader {
  834. public:
  835. // @ProgressiveReader
  836. butil::Status OnReadOnePart(const void* /*data*/, size_t /*length*/) {
  837. return butil::Status(-1, "intended fail at %s:%d", __FILE__, __LINE__);
  838. }
  839. void OnEndOfMessage(const butil::Status& st) {
  840. LOG(INFO) << "Destroy " << this << ": " << st;
  841. delete this;
  842. }
  843. };
  844. TEST_F(HttpTest, failed_on_read_one_part) {
  845. const int port = 8923;
  846. brpc::Server server;
  847. DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
  848. std::numeric_limits<size_t>::max());
  849. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  850. EXPECT_EQ(0, server.Start(port, NULL));
  851. brpc::Channel channel;
  852. brpc::ChannelOptions options;
  853. options.protocol = brpc::PROTOCOL_HTTP;
  854. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  855. {
  856. brpc::Controller cntl;
  857. cntl.response_will_be_read_progressively();
  858. cntl.http_request().uri() = "/DownloadService/Download";
  859. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  860. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  861. ASSERT_TRUE(cntl.response_attachment().empty());
  862. cntl.ReadProgressiveAttachmentBy(new AlwaysFailRead);
  863. }
  864. LOG(INFO) << "Sleep 1 second";
  865. sleep(1);
  866. ASSERT_NE(0, svc.last_errno());
  867. }
  868. TEST_F(HttpTest, broken_socket_stops_progressive_reading) {
  869. butil::intrusive_ptr<ReadBody> reader;
  870. const int port = 8923;
  871. brpc::Server server;
  872. DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
  873. std::numeric_limits<size_t>::max());
  874. EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  875. EXPECT_EQ(0, server.Start(port, NULL));
  876. brpc::Channel channel;
  877. brpc::ChannelOptions options;
  878. options.protocol = brpc::PROTOCOL_HTTP;
  879. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  880. {
  881. brpc::Controller cntl;
  882. cntl.response_will_be_read_progressively();
  883. cntl.http_request().uri() = "/DownloadService/Download";
  884. channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  885. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  886. ASSERT_TRUE(cntl.response_attachment().empty());
  887. reader.reset(new ReadBody);
  888. cntl.ReadProgressiveAttachmentBy(reader.get());
  889. size_t last_read = 0;
  890. for (size_t i = 0; i < 3; ++i) {
  891. sleep(1);
  892. size_t current_read = reader->read_bytes();
  893. LOG(INFO) << "read=" << current_read - last_read
  894. << " total=" << current_read;
  895. last_read = current_read;
  896. }
  897. // Read something in past N seconds.
  898. ASSERT_GT(last_read, (size_t)100000);
  899. }
  900. // the socket still holds a ref.
  901. ASSERT_FALSE(reader->destroyed());
  902. LOG(INFO) << "Stopping the server";
  903. server.Stop(0);
  904. server.Join();
  905. // Wait for error reporting from the socket.
  906. usleep(GENERAL_DELAY_US);
  907. ASSERT_TRUE(reader->destroyed());
  908. ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
  909. }
  910. TEST_F(HttpTest, http2_sanity) {
  911. const int port = 8923;
  912. brpc::Server server;
  913. EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  914. EXPECT_EQ(0, server.Start(port, NULL));
  915. brpc::Channel channel;
  916. brpc::ChannelOptions options;
  917. options.protocol = "h2";
  918. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  919. // Check that the first request with size larger than the default window can
  920. // be sent out, when remote settings are not received.
  921. brpc::Controller cntl;
  922. test::EchoRequest big_req;
  923. test::EchoResponse res;
  924. std::string message(2 * 1024 * 1024 /* 2M */, 'x');
  925. big_req.set_message(message);
  926. cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  927. cntl.http_request().uri() = "/EchoService/Echo";
  928. channel.CallMethod(NULL, &cntl, &big_req, &res, NULL);
  929. ASSERT_FALSE(cntl.Failed());
  930. ASSERT_EQ(EXP_RESPONSE, res.message());
  931. // socket replacement when streamId runs out, the initial streamId is a special
  932. // value set in ctor of H2Context so that the number 15000 is enough to run out
  933. // of stream.
  934. test::EchoRequest req;
  935. req.set_message(EXP_REQUEST);
  936. for (int i = 0; i < 15000; ++i) {
  937. brpc::Controller cntl;
  938. cntl.http_request().set_content_type("application/json");
  939. cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  940. cntl.http_request().uri() = "/EchoService/Echo";
  941. channel.CallMethod(NULL, &cntl, &req, &res, NULL);
  942. ASSERT_FALSE(cntl.Failed());
  943. ASSERT_EQ(EXP_RESPONSE, res.message());
  944. }
  945. // check connection window size
  946. brpc::SocketUniquePtr main_ptr;
  947. brpc::SocketUniquePtr agent_ptr;
  948. EXPECT_EQ(brpc::Socket::Address(channel._server_id, &main_ptr), 0);
  949. EXPECT_EQ(main_ptr->GetAgentSocket(&agent_ptr, NULL), 0);
  950. brpc::policy::H2Context* ctx = static_cast<brpc::policy::H2Context*>(agent_ptr->parsing_context());
  951. ASSERT_GT(ctx->_remote_window_left.load(butil::memory_order_relaxed),
  952. brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / 2);
  953. }
  954. TEST_F(HttpTest, http2_ping) {
  955. // This test injects PING frames before and after header and data.
  956. brpc::Controller cntl;
  957. // Prepare request
  958. butil::IOBuf req_out;
  959. int h2_stream_id = 0;
  960. MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
  961. // Prepare response
  962. butil::IOBuf res_out;
  963. char pingbuf[9 /*FRAME_HEAD_SIZE*/ + 8 /*Opaque Data*/];
  964. brpc::policy::SerializeFrameHead(pingbuf, 8, brpc::policy::H2_FRAME_PING, 0, 0);
  965. // insert ping before header and data
  966. res_out.append(pingbuf, sizeof(pingbuf));
  967. MakeH2EchoResponseBuf(&res_out, h2_stream_id);
  968. // insert ping after header and data
  969. res_out.append(pingbuf, sizeof(pingbuf));
  970. // parse response
  971. brpc::ParseResult res_pr =
  972. brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
  973. ASSERT_TRUE(res_pr.is_ok());
  974. // process response
  975. ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
  976. ASSERT_FALSE(cntl.Failed());
  977. }
  978. inline void SaveUint32(void* out, uint32_t v) {
  979. uint8_t* p = (uint8_t*)out;
  980. p[0] = (v >> 24) & 0xFF;
  981. p[1] = (v >> 16) & 0xFF;
  982. p[2] = (v >> 8) & 0xFF;
  983. p[3] = v & 0xFF;
  984. }
  985. TEST_F(HttpTest, http2_rst_before_header) {
  986. brpc::Controller cntl;
  987. // Prepare request
  988. butil::IOBuf req_out;
  989. int h2_stream_id = 0;
  990. MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
  991. // Prepare response
  992. butil::IOBuf res_out;
  993. char rstbuf[9 /*FRAME_HEAD_SIZE*/ + 4];
  994. brpc::policy::SerializeFrameHead(rstbuf, 4, brpc::policy::H2_FRAME_RST_STREAM, 0, h2_stream_id);
  995. SaveUint32(rstbuf + 9, brpc::H2_INTERNAL_ERROR);
  996. res_out.append(rstbuf, sizeof(rstbuf));
  997. MakeH2EchoResponseBuf(&res_out, h2_stream_id);
  998. // parse response
  999. brpc::ParseResult res_pr =
  1000. brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
  1001. ASSERT_TRUE(res_pr.is_ok());
  1002. // process response
  1003. ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
  1004. ASSERT_TRUE(cntl.Failed());
  1005. ASSERT_TRUE(cntl.ErrorCode() == brpc::EHTTP);
  1006. ASSERT_TRUE(cntl.http_response().status_code() == brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR);
  1007. }
  1008. TEST_F(HttpTest, http2_rst_after_header_and_data) {
  1009. brpc::Controller cntl;
  1010. // Prepare request
  1011. butil::IOBuf req_out;
  1012. int h2_stream_id = 0;
  1013. MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
  1014. // Prepare response
  1015. butil::IOBuf res_out;
  1016. MakeH2EchoResponseBuf(&res_out, h2_stream_id);
  1017. char rstbuf[9 /*FRAME_HEAD_SIZE*/ + 4];
  1018. brpc::policy::SerializeFrameHead(rstbuf, 4, brpc::policy::H2_FRAME_RST_STREAM, 0, h2_stream_id);
  1019. SaveUint32(rstbuf + 9, brpc::H2_INTERNAL_ERROR);
  1020. res_out.append(rstbuf, sizeof(rstbuf));
  1021. // parse response
  1022. brpc::ParseResult res_pr =
  1023. brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
  1024. ASSERT_TRUE(res_pr.is_ok());
  1025. // process response
  1026. ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
  1027. ASSERT_FALSE(cntl.Failed());
  1028. ASSERT_TRUE(cntl.http_response().status_code() == brpc::HTTP_STATUS_OK);
  1029. }
  1030. TEST_F(HttpTest, http2_window_used_up) {
  1031. brpc::Controller cntl;
  1032. butil::IOBuf request_buf;
  1033. test::EchoRequest req;
  1034. // longer message to trigger using up window size sooner
  1035. req.set_message("FLOW_CONTROL_FLOW_CONTROL");
  1036. cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  1037. cntl.http_request().set_content_type("application/proto");
  1038. brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req);
  1039. char settingsbuf[brpc::policy::FRAME_HEAD_SIZE + 36];
  1040. brpc::H2Settings h2_settings;
  1041. const size_t nb = brpc::policy::SerializeH2Settings(h2_settings, settingsbuf + brpc::policy::FRAME_HEAD_SIZE);
  1042. brpc::policy::SerializeFrameHead(settingsbuf, nb, brpc::policy::H2_FRAME_SETTINGS, 0, 0);
  1043. butil::IOBuf buf;
  1044. buf.append(settingsbuf, brpc::policy::FRAME_HEAD_SIZE + nb);
  1045. brpc::policy::ParseH2Message(&buf, _h2_client_sock.get(), false, NULL);
  1046. int nsuc = brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / cntl.request_attachment().size();
  1047. for (int i = 0; i <= nsuc; i++) {
  1048. brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl);
  1049. cntl._current_call.stream_user_data = h2_req;
  1050. brpc::SocketMessage* socket_message = NULL;
  1051. brpc::policy::PackH2Request(NULL, &socket_message, cntl.call_id().value,
  1052. NULL, &cntl, request_buf, NULL);
  1053. butil::IOBuf dummy;
  1054. butil::Status st = socket_message->AppendAndDestroySelf(&dummy, _h2_client_sock.get());
  1055. if (i == nsuc) {
  1056. // the last message should fail according to flow control policy.
  1057. ASSERT_FALSE(st.ok());
  1058. ASSERT_TRUE(st.error_code() == brpc::ELIMIT);
  1059. ASSERT_TRUE(butil::StringPiece(st.error_str()).starts_with("remote_window_left is not enough"));
  1060. } else {
  1061. ASSERT_TRUE(st.ok());
  1062. }
  1063. h2_req->DestroyStreamUserData(_h2_client_sock, &cntl, 0, false);
  1064. }
  1065. }
  1066. TEST_F(HttpTest, http2_settings) {
  1067. char settingsbuf[brpc::policy::FRAME_HEAD_SIZE + 36];
  1068. brpc::H2Settings h2_settings;
  1069. h2_settings.header_table_size = 8192;
  1070. h2_settings.max_concurrent_streams = 1024;
  1071. h2_settings.stream_window_size= (1u << 29) - 1;
  1072. const size_t nb = brpc::policy::SerializeH2Settings(h2_settings, settingsbuf + brpc::policy::FRAME_HEAD_SIZE);
  1073. brpc::policy::SerializeFrameHead(settingsbuf, nb, brpc::policy::H2_FRAME_SETTINGS, 0, 0);
  1074. butil::IOBuf buf;
  1075. buf.append(settingsbuf, brpc::policy::FRAME_HEAD_SIZE + nb);
  1076. brpc::policy::H2Context* ctx = new brpc::policy::H2Context(_socket.get(), NULL);
  1077. CHECK_EQ(ctx->Init(), 0);
  1078. _socket->initialize_parsing_context(&ctx);
  1079. ctx->_conn_state = brpc::policy::H2_CONNECTION_READY;
  1080. // parse settings
  1081. brpc::policy::ParseH2Message(&buf, _socket.get(), false, NULL);
  1082. butil::IOPortal response_buf;
  1083. CHECK_EQ(response_buf.append_from_file_descriptor(_pipe_fds[0], 1024),
  1084. (ssize_t)brpc::policy::FRAME_HEAD_SIZE);
  1085. brpc::policy::H2FrameHead frame_head;
  1086. butil::IOBufBytesIterator it(response_buf);
  1087. ctx->ConsumeFrameHead(it, &frame_head);
  1088. CHECK_EQ(frame_head.type, brpc::policy::H2_FRAME_SETTINGS);
  1089. CHECK_EQ(frame_head.flags, 0x01 /* H2_FLAGS_ACK */);
  1090. CHECK_EQ(frame_head.stream_id, 0);
  1091. ASSERT_TRUE(ctx->_remote_settings.header_table_size == 8192);
  1092. ASSERT_TRUE(ctx->_remote_settings.max_concurrent_streams == 1024);
  1093. ASSERT_TRUE(ctx->_remote_settings.stream_window_size == (1u << 29) - 1);
  1094. }
  1095. TEST_F(HttpTest, http2_invalid_settings) {
  1096. {
  1097. brpc::Server server;
  1098. brpc::ServerOptions options;
  1099. options.h2_settings.stream_window_size = brpc::H2Settings::MAX_WINDOW_SIZE + 1;
  1100. ASSERT_EQ(-1, server.Start("127.0.0.1:8924", &options));
  1101. }
  1102. {
  1103. brpc::Server server;
  1104. brpc::ServerOptions options;
  1105. options.h2_settings.max_frame_size =
  1106. brpc::H2Settings::DEFAULT_MAX_FRAME_SIZE - 1;
  1107. ASSERT_EQ(-1, server.Start("127.0.0.1:8924", &options));
  1108. }
  1109. {
  1110. brpc::Server server;
  1111. brpc::ServerOptions options;
  1112. options.h2_settings.max_frame_size =
  1113. brpc::H2Settings::MAX_OF_MAX_FRAME_SIZE + 1;
  1114. ASSERT_EQ(-1, server.Start("127.0.0.1:8924", &options));
  1115. }
  1116. }
  1117. TEST_F(HttpTest, http2_not_closing_socket_when_rpc_timeout) {
  1118. const int port = 8923;
  1119. brpc::Server server;
  1120. EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  1121. EXPECT_EQ(0, server.Start(port, NULL));
  1122. brpc::Channel channel;
  1123. brpc::ChannelOptions options;
  1124. options.protocol = "h2";
  1125. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  1126. test::EchoRequest req;
  1127. test::EchoResponse res;
  1128. req.set_message(EXP_REQUEST);
  1129. {
  1130. // make a successful call to create the connection first
  1131. brpc::Controller cntl;
  1132. cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  1133. cntl.http_request().uri() = "/EchoService/Echo";
  1134. channel.CallMethod(NULL, &cntl, &req, &res, NULL);
  1135. ASSERT_FALSE(cntl.Failed());
  1136. ASSERT_EQ(EXP_RESPONSE, res.message());
  1137. }
  1138. brpc::SocketUniquePtr main_ptr;
  1139. EXPECT_EQ(brpc::Socket::Address(channel._server_id, &main_ptr), 0);
  1140. brpc::SocketId agent_id = main_ptr->_agent_socket_id.load(butil::memory_order_relaxed);
  1141. for (int i = 0; i < 4; i++) {
  1142. brpc::Controller cntl;
  1143. cntl.set_timeout_ms(50);
  1144. cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  1145. cntl.http_request().uri() = "/EchoService/Echo?sleep_ms=300";
  1146. channel.CallMethod(NULL, &cntl, &req, &res, NULL);
  1147. ASSERT_TRUE(cntl.Failed());
  1148. brpc::SocketUniquePtr ptr;
  1149. brpc::SocketId id = main_ptr->_agent_socket_id.load(butil::memory_order_relaxed);
  1150. EXPECT_EQ(id, agent_id);
  1151. }
  1152. {
  1153. // make a successful call again to make sure agent_socket not changing
  1154. brpc::Controller cntl;
  1155. cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  1156. cntl.http_request().uri() = "/EchoService/Echo";
  1157. channel.CallMethod(NULL, &cntl, &req, &res, NULL);
  1158. ASSERT_FALSE(cntl.Failed());
  1159. ASSERT_EQ(EXP_RESPONSE, res.message());
  1160. brpc::SocketUniquePtr ptr;
  1161. brpc::SocketId id = main_ptr->_agent_socket_id.load(butil::memory_order_relaxed);
  1162. EXPECT_EQ(id, agent_id);
  1163. }
  1164. }
  1165. TEST_F(HttpTest, http2_header_after_data) {
  1166. brpc::Controller cntl;
  1167. // Prepare request
  1168. butil::IOBuf req_out;
  1169. int h2_stream_id = 0;
  1170. MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
  1171. // Prepare response to res_out
  1172. butil::IOBuf res_out;
  1173. {
  1174. butil::IOBuf data_buf;
  1175. test::EchoResponse res;
  1176. res.set_message(EXP_RESPONSE);
  1177. {
  1178. butil::IOBufAsZeroCopyOutputStream wrapper(&data_buf);
  1179. EXPECT_TRUE(res.SerializeToZeroCopyStream(&wrapper));
  1180. }
  1181. brpc::policy::H2Context* ctx =
  1182. static_cast<brpc::policy::H2Context*>(_h2_client_sock->parsing_context());
  1183. brpc::HPacker& hpacker = ctx->hpacker();
  1184. butil::IOBufAppender header1_appender;
  1185. brpc::HPackOptions options;
  1186. options.encode_name = false; /* disable huffman encoding */
  1187. options.encode_value = false;
  1188. {
  1189. brpc::HPacker::Header header(":status", "200");
  1190. hpacker.Encode(&header1_appender, header, options);
  1191. }
  1192. {
  1193. brpc::HPacker::Header header("content-length",
  1194. butil::string_printf("%llu", (unsigned long long)data_buf.size()));
  1195. hpacker.Encode(&header1_appender, header, options);
  1196. }
  1197. {
  1198. brpc::HPacker::Header header(":status", "200");
  1199. hpacker.Encode(&header1_appender, header, options);
  1200. }
  1201. {
  1202. brpc::HPacker::Header header("content-type", "application/proto");
  1203. hpacker.Encode(&header1_appender, header, options);
  1204. }
  1205. {
  1206. brpc::HPacker::Header header("user-defined1", "a");
  1207. hpacker.Encode(&header1_appender, header, options);
  1208. }
  1209. butil::IOBuf header1;
  1210. header1_appender.move_to(header1);
  1211. char headbuf[brpc::policy::FRAME_HEAD_SIZE];
  1212. brpc::policy::SerializeFrameHead(headbuf, header1.size(),
  1213. brpc::policy::H2_FRAME_HEADERS, 0, h2_stream_id);
  1214. // append header1
  1215. res_out.append(headbuf, sizeof(headbuf));
  1216. res_out.append(butil::IOBuf::Movable(header1));
  1217. brpc::policy::SerializeFrameHead(headbuf, data_buf.size(),
  1218. brpc::policy::H2_FRAME_DATA, 0, h2_stream_id);
  1219. // append data
  1220. res_out.append(headbuf, sizeof(headbuf));
  1221. res_out.append(butil::IOBuf::Movable(data_buf));
  1222. butil::IOBufAppender header2_appender;
  1223. {
  1224. brpc::HPacker::Header header("user-defined1", "overwrite-a");
  1225. hpacker.Encode(&header2_appender, header, options);
  1226. }
  1227. {
  1228. brpc::HPacker::Header header("user-defined2", "b");
  1229. hpacker.Encode(&header2_appender, header, options);
  1230. }
  1231. butil::IOBuf header2;
  1232. header2_appender.move_to(header2);
  1233. brpc::policy::SerializeFrameHead(headbuf, header2.size(),
  1234. brpc::policy::H2_FRAME_HEADERS, 0x05/* end header and stream */,
  1235. h2_stream_id);
  1236. // append header2
  1237. res_out.append(headbuf, sizeof(headbuf));
  1238. res_out.append(butil::IOBuf::Movable(header2));
  1239. }
  1240. // parse response
  1241. brpc::ParseResult res_pr =
  1242. brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
  1243. ASSERT_TRUE(res_pr.is_ok());
  1244. // process response
  1245. ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
  1246. ASSERT_FALSE(cntl.Failed());
  1247. brpc::HttpHeader& res_header = cntl.http_response();
  1248. ASSERT_EQ(res_header.content_type(), "application/proto");
  1249. // Check overlapped header is overwritten by the latter.
  1250. const std::string* user_defined1 = res_header.GetHeader("user-defined1");
  1251. ASSERT_EQ(*user_defined1, "overwrite-a");
  1252. const std::string* user_defined2 = res_header.GetHeader("user-defined2");
  1253. ASSERT_EQ(*user_defined2, "b");
  1254. }
  1255. TEST_F(HttpTest, http2_goaway_sanity) {
  1256. brpc::Controller cntl;
  1257. // Prepare request
  1258. butil::IOBuf req_out;
  1259. int h2_stream_id = 0;
  1260. MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
  1261. // Prepare response
  1262. butil::IOBuf res_out;
  1263. MakeH2EchoResponseBuf(&res_out, h2_stream_id);
  1264. // append goaway
  1265. char goawaybuf[9 /*FRAME_HEAD_SIZE*/ + 8];
  1266. brpc::policy::SerializeFrameHead(goawaybuf, 8, brpc::policy::H2_FRAME_GOAWAY, 0, 0);
  1267. SaveUint32(goawaybuf + 9, 0x7fffd8ef /*last stream id*/);
  1268. SaveUint32(goawaybuf + 13, brpc::H2_NO_ERROR);
  1269. res_out.append(goawaybuf, sizeof(goawaybuf));
  1270. // parse response
  1271. brpc::ParseResult res_pr =
  1272. brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
  1273. ASSERT_TRUE(res_pr.is_ok());
  1274. // process response
  1275. ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
  1276. ASSERT_TRUE(!cntl.Failed());
  1277. // parse GOAWAY
  1278. res_pr = brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
  1279. ASSERT_EQ(res_pr.error(), brpc::PARSE_ERROR_NOT_ENOUGH_DATA);
  1280. // Since GOAWAY has been received, the next request should fail
  1281. brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl);
  1282. cntl._current_call.stream_user_data = h2_req;
  1283. brpc::SocketMessage* socket_message = NULL;
  1284. brpc::policy::PackH2Request(NULL, &socket_message, cntl.call_id().value,
  1285. NULL, &cntl, butil::IOBuf(), NULL);
  1286. butil::IOBuf dummy;
  1287. butil::Status st = socket_message->AppendAndDestroySelf(&dummy, _h2_client_sock.get());
  1288. ASSERT_EQ(st.error_code(), brpc::ELOGOFF);
  1289. ASSERT_TRUE(st.error_data().ends_with("the connection just issued GOAWAY"));
  1290. }
  1291. class AfterRecevingGoAway : public ::google::protobuf::Closure {
  1292. public:
  1293. void Run() {
  1294. ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
  1295. delete this;
  1296. }
  1297. brpc::Controller cntl;
  1298. };
  1299. TEST_F(HttpTest, http2_handle_goaway_streams) {
  1300. const butil::EndPoint ep(butil::IP_ANY, 5961);
  1301. butil::fd_guard listenfd(butil::tcp_listen(ep));
  1302. ASSERT_GT(listenfd, 0);
  1303. brpc::Channel channel;
  1304. brpc::ChannelOptions options;
  1305. options.protocol = brpc::PROTOCOL_H2;
  1306. ASSERT_EQ(0, channel.Init(ep, &options));
  1307. int req_size = 10;
  1308. std::vector<brpc::CallId> ids(req_size);
  1309. for (int i = 0; i < req_size; i++) {
  1310. AfterRecevingGoAway* done = new AfterRecevingGoAway;
  1311. brpc::Controller& cntl = done->cntl;
  1312. ids.push_back(cntl.call_id());
  1313. cntl.set_timeout_ms(-1);
  1314. cntl.http_request().uri() = "/it-doesnt-matter";
  1315. channel.CallMethod(NULL, &cntl, NULL, NULL, done);
  1316. }
  1317. int servfd = accept(listenfd, NULL, NULL);
  1318. ASSERT_GT(servfd, 0);
  1319. // Sleep for a while to make sure that server has received all data.
  1320. bthread_usleep(2000);
  1321. char goawaybuf[brpc::policy::FRAME_HEAD_SIZE + 8];
  1322. SerializeFrameHead(goawaybuf, 8, brpc::policy::H2_FRAME_GOAWAY, 0, 0);
  1323. SaveUint32(goawaybuf + brpc::policy::FRAME_HEAD_SIZE, 0);
  1324. SaveUint32(goawaybuf + brpc::policy::FRAME_HEAD_SIZE + 4, 0);
  1325. ASSERT_EQ((ssize_t)brpc::policy::FRAME_HEAD_SIZE + 8, ::write(servfd, goawaybuf, brpc::policy::FRAME_HEAD_SIZE + 8));
  1326. // After receving GOAWAY, the callbacks in client should be run correctly.
  1327. for (int i = 0; i < req_size; i++) {
  1328. brpc::Join(ids[i]);
  1329. }
  1330. }
  1331. TEST_F(HttpTest, spring_protobuf_content_type) {
  1332. const int port = 8923;
  1333. brpc::Server server;
  1334. EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  1335. EXPECT_EQ(0, server.Start(port, nullptr));
  1336. brpc::Channel channel;
  1337. brpc::ChannelOptions options;
  1338. options.protocol = "http";
  1339. ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
  1340. brpc::Controller cntl;
  1341. test::EchoRequest req;
  1342. test::EchoResponse res;
  1343. req.set_message(EXP_REQUEST);
  1344. cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  1345. cntl.http_request().uri() = "/EchoService/Echo";
  1346. cntl.http_request().set_content_type("application/x-protobuf");
  1347. cntl.request_attachment().append(req.SerializeAsString());
  1348. channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
  1349. ASSERT_FALSE(cntl.Failed());
  1350. ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
  1351. ASSERT_TRUE(res.ParseFromString(cntl.response_attachment().to_string()));
  1352. ASSERT_EQ(EXP_RESPONSE, res.message());
  1353. brpc::Controller cntl2;
  1354. test::EchoService_Stub stub(&channel);
  1355. req.set_message(EXP_REQUEST);
  1356. res.Clear();
  1357. cntl2.http_request().set_content_type("application/x-protobuf");
  1358. stub.Echo(&cntl2, &req, &res, nullptr);
  1359. ASSERT_FALSE(cntl.Failed());
  1360. ASSERT_EQ(EXP_RESPONSE, res.message());
  1361. ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
  1362. }
  1363. } //namespace