brpc_hulu_pbrpc_protocol_unittest.cpp 12 KB


  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <sys/ioctl.h>
  20. #include <sys/types.h>
  21. #include <sys/socket.h>
  22. #include <gtest/gtest.h>
  23. #include <gflags/gflags.h>
  24. #include <google/protobuf/descriptor.h>
  25. #include "butil/time.h"
  26. #include "butil/macros.h"
  27. #include "butil/gperftools_profiler.h"
  28. #include "brpc/socket.h"
  29. #include "brpc/acceptor.h"
  30. #include "brpc/server.h"
  31. #include "brpc/policy/hulu_pbrpc_meta.pb.h"
  32. #include "brpc/policy/hulu_pbrpc_protocol.h"
  33. #include "brpc/policy/most_common_message.h"
  34. #include "brpc/controller.h"
  35. #include "echo.pb.h"
  36. int main(int argc, char* argv[]) {
  37. testing::InitGoogleTest(&argc, argv);
  38. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  39. return RUN_ALL_TESTS();
  40. }
  41. namespace {
  // Fixed payloads and identities shared by the mock service, the mock
  // authenticator and the test cases below.
  static const std::string EXP_REQUEST("hello");
  static const std::string EXP_RESPONSE("world");
  static const std::string MOCK_CREDENTIAL("mock credential");
  static const std::string MOCK_USER("mock user");
  46. class MyAuthenticator : public brpc::Authenticator {
  47. public:
  48. MyAuthenticator() {}
  49. int GenerateCredential(std::string* auth_str) const {
  50. *auth_str = MOCK_CREDENTIAL;
  51. return 0;
  52. }
  53. int VerifyCredential(const std::string& auth_str,
  54. const butil::EndPoint&,
  55. brpc::AuthContext* ctx) const {
  56. EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
  57. ctx->set_user(MOCK_USER);
  58. return 0;
  59. }
  60. };
  61. class MyEchoService : public ::test::EchoService {
  62. void Echo(::google::protobuf::RpcController* cntl_base,
  63. const ::test::EchoRequest* req,
  64. ::test::EchoResponse* res,
  65. ::google::protobuf::Closure* done) {
  66. brpc::Controller* cntl =
  67. static_cast<brpc::Controller*>(cntl_base);
  68. brpc::ClosureGuard done_guard(done);
  69. if (req->close_fd()) {
  70. cntl->CloseConnection("Close connection according to request");
  71. return;
  72. }
  73. if (cntl->auth_context()) {
  74. EXPECT_EQ(MOCK_USER, cntl->auth_context()->user());
  75. }
  76. EXPECT_EQ(EXP_REQUEST, req->message());
  77. if (!cntl->request_attachment().empty()) {
  78. EXPECT_EQ(EXP_REQUEST, cntl->request_attachment().to_string());
  79. cntl->response_attachment().append(EXP_RESPONSE);
  80. }
  81. res->set_message(EXP_RESPONSE);
  82. }
  83. };
  84. class HuluTest : public ::testing::Test{
  85. protected:
  86. HuluTest() {
  87. EXPECT_EQ(0, _server.AddService(
  88. &_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  89. // Hack: Regard `_server' as running
  90. _server._status = brpc::Server::RUNNING;
  91. _server._options.auth = &_auth;
  92. EXPECT_EQ(0, pipe(_pipe_fds));
  93. brpc::SocketId id;
  94. brpc::SocketOptions options;
  95. options.fd = _pipe_fds[1];
  96. EXPECT_EQ(0, brpc::Socket::Create(options, &id));
  97. EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
  98. };
  99. virtual ~HuluTest() {};
  100. virtual void SetUp() {};
  101. virtual void TearDown() {};
  102. void VerifyMessage(brpc::InputMessageBase* msg) {
  103. if (msg->_socket == NULL) {
  104. _socket->ReAddress(&msg->_socket);
  105. }
  106. msg->_arg = &_server;
  107. EXPECT_TRUE(brpc::policy::VerifyHuluRequest(msg));
  108. }
  109. void ProcessMessage(void (*process)(brpc::InputMessageBase*),
  110. brpc::InputMessageBase* msg, bool set_eof) {
  111. if (msg->_socket == NULL) {
  112. _socket.get()->ReAddress(&msg->_socket);
  113. }
  114. msg->_arg = &_server;
  115. _socket->PostponeEOF();
  116. if (set_eof) {
  117. _socket->SetEOF();
  118. }
  119. (*process)(msg);
  120. }
  121. brpc::policy::MostCommonMessage* MakeRequestMessage(
  122. const brpc::policy::HuluRpcRequestMeta& meta) {
  123. brpc::policy::MostCommonMessage* msg =
  124. brpc::policy::MostCommonMessage::Get();
  125. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
  126. EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));
  127. test::EchoRequest req;
  128. req.set_message(EXP_REQUEST);
  129. butil::IOBufAsZeroCopyOutputStream req_stream(&msg->payload);
  130. EXPECT_TRUE(req.SerializeToZeroCopyStream(&req_stream));
  131. return msg;
  132. }
  133. brpc::policy::MostCommonMessage* MakeResponseMessage(
  134. const brpc::policy::HuluRpcResponseMeta& meta) {
  135. brpc::policy::MostCommonMessage* msg =
  136. brpc::policy::MostCommonMessage::Get();
  137. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
  138. EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));
  139. test::EchoResponse res;
  140. res.set_message(EXP_RESPONSE);
  141. butil::IOBufAsZeroCopyOutputStream res_stream(&msg->payload);
  142. EXPECT_TRUE(res.SerializeToZeroCopyStream(&res_stream));
  143. return msg;
  144. }
  145. void CheckResponseCode(bool expect_empty, int expect_code) {
  146. int bytes_in_pipe = 0;
  147. ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
  148. if (expect_empty) {
  149. EXPECT_EQ(0, bytes_in_pipe);
  150. return;
  151. }
  152. EXPECT_GT(bytes_in_pipe, 0);
  153. butil::IOPortal buf;
  154. EXPECT_EQ((ssize_t)bytes_in_pipe,
  155. buf.append_from_file_descriptor(_pipe_fds[0], 1024));
  156. brpc::ParseResult pr = brpc::policy::ParseHuluMessage(&buf, NULL, false, NULL);
  157. EXPECT_EQ(brpc::PARSE_OK, pr.error());
  158. brpc::policy::MostCommonMessage* msg =
  159. static_cast<brpc::policy::MostCommonMessage*>(pr.message());
  160. brpc::policy::HuluRpcResponseMeta meta;
  161. butil::IOBufAsZeroCopyInputStream meta_stream(msg->meta);
  162. EXPECT_TRUE(meta.ParseFromZeroCopyStream(&meta_stream));
  163. EXPECT_EQ(expect_code, meta.error_code());
  164. }
  165. void TestHuluCompress(brpc::CompressType type) {
  166. butil::IOBuf request_buf;
  167. butil::IOBuf total_buf;
  168. brpc::Controller cntl;
  169. test::EchoRequest req;
  170. test::EchoResponse res;
  171. cntl._response = &res;
  172. req.set_message(EXP_REQUEST);
  173. cntl.set_request_compress_type(type);
  174. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  175. ASSERT_FALSE(cntl.Failed());
  176. brpc::policy::PackHuluRequest(
  177. &total_buf, NULL, cntl.call_id().value,
  178. test::EchoService::descriptor()->method(0),
  179. &cntl, request_buf, &_auth);
  180. ASSERT_FALSE(cntl.Failed());
  181. brpc::ParseResult req_pr =
  182. brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
  183. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  184. brpc::InputMessageBase* req_msg = req_pr.message();
  185. ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);
  186. CheckResponseCode(false, 0);
  187. }
  188. int _pipe_fds[2];
  189. brpc::SocketUniquePtr _socket;
  190. brpc::Server _server;
  191. MyEchoService _svc;
  192. MyAuthenticator _auth;
  193. };
  194. TEST_F(HuluTest, process_request_failed_socket) {
  195. brpc::policy::HuluRpcRequestMeta meta;
  196. meta.set_service_name("EchoService");
  197. meta.set_method_index(0);
  198. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  199. _socket->SetFailed();
  200. ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
  201. ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
  202. CheckResponseCode(true, 0);
  203. }
  204. TEST_F(HuluTest, process_request_logoff) {
  205. brpc::policy::HuluRpcRequestMeta meta;
  206. meta.set_service_name("EchoService");
  207. meta.set_method_index(0);
  208. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  209. _server._status = brpc::Server::READY;
  210. ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
  211. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  212. CheckResponseCode(false, brpc::ELOGOFF);
  213. }
  214. TEST_F(HuluTest, process_request_wrong_method) {
  215. brpc::policy::HuluRpcRequestMeta meta;
  216. meta.set_service_name("EchoService");
  217. meta.set_method_index(10);
  218. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  219. ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
  220. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  221. CheckResponseCode(false, brpc::ENOMETHOD);
  222. }
  223. TEST_F(HuluTest, process_response_after_eof) {
  224. brpc::policy::HuluRpcResponseMeta meta;
  225. test::EchoResponse res;
  226. brpc::Controller cntl;
  227. meta.set_correlation_id(cntl.call_id().value);
  228. cntl._response = &res;
  229. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
  230. ProcessMessage(brpc::policy::ProcessHuluResponse, msg, true);
  231. ASSERT_EQ(EXP_RESPONSE, res.message());
  232. ASSERT_TRUE(_socket->Failed());
  233. }
  234. TEST_F(HuluTest, process_response_error_code) {
  235. const int ERROR_CODE = 12345;
  236. brpc::policy::HuluRpcResponseMeta meta;
  237. brpc::Controller cntl;
  238. meta.set_correlation_id(cntl.call_id().value);
  239. meta.set_error_code(ERROR_CODE);
  240. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
  241. ProcessMessage(brpc::policy::ProcessHuluResponse, msg, false);
  242. ASSERT_EQ(ERROR_CODE, cntl.ErrorCode());
  243. }
  244. TEST_F(HuluTest, complete_flow) {
  245. butil::IOBuf request_buf;
  246. butil::IOBuf total_buf;
  247. brpc::Controller cntl;
  248. test::EchoRequest req;
  249. test::EchoResponse res;
  250. cntl._response = &res;
  251. // Send request
  252. req.set_message(EXP_REQUEST);
  253. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  254. ASSERT_FALSE(cntl.Failed());
  255. cntl.request_attachment().append(EXP_REQUEST);
  256. brpc::policy::PackHuluRequest(
  257. &total_buf, NULL, cntl.call_id().value,
  258. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  259. ASSERT_FALSE(cntl.Failed());
  260. // Verify and handle request
  261. brpc::ParseResult req_pr =
  262. brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
  263. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  264. brpc::InputMessageBase* req_msg = req_pr.message();
  265. VerifyMessage(req_msg);
  266. ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);
  267. // Read response from pipe
  268. butil::IOPortal response_buf;
  269. response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
  270. brpc::ParseResult res_pr =
  271. brpc::policy::ParseHuluMessage(&response_buf, NULL, false, NULL);
  272. ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
  273. brpc::InputMessageBase* res_msg = res_pr.message();
  274. ProcessMessage(brpc::policy::ProcessHuluResponse, res_msg, false);
  275. ASSERT_FALSE(cntl.Failed());
  276. ASSERT_EQ(EXP_RESPONSE, res.message());
  277. }
  278. TEST_F(HuluTest, close_in_callback) {
  279. butil::IOBuf request_buf;
  280. butil::IOBuf total_buf;
  281. brpc::Controller cntl;
  282. test::EchoRequest req;
  283. // Send request
  284. req.set_message(EXP_REQUEST);
  285. req.set_close_fd(true);
  286. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  287. ASSERT_FALSE(cntl.Failed());
  288. brpc::policy::PackHuluRequest(
  289. &total_buf, NULL, cntl.call_id().value,
  290. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  291. ASSERT_FALSE(cntl.Failed());
  292. // Handle request
  293. brpc::ParseResult req_pr =
  294. brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
  295. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  296. brpc::InputMessageBase* req_msg = req_pr.message();
  297. ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);
  298. // Socket should be closed
  299. ASSERT_TRUE(_socket->Failed());
  300. }
  301. TEST_F(HuluTest, hulu_compress) {
  302. TestHuluCompress(brpc::COMPRESS_TYPE_SNAPPY);
  303. TestHuluCompress(brpc::COMPRESS_TYPE_GZIP);
  304. TestHuluCompress(brpc::COMPRESS_TYPE_ZLIB);
  305. }
  306. } //namespace