brpc_hulu_pbrpc_protocol_unittest.cpp 11 KB


  1. // Baidu RPC - A framework to host and access services throughout Baidu.
  2. // Copyright (c) 2014 Baidu, Inc.
  3. // Date: Sun Jul 13 15:04:18 CST 2014
  4. #include <sys/ioctl.h>
  5. #include <sys/types.h>
  6. #include <sys/socket.h>
  7. #include <gtest/gtest.h>
  8. #include <gperftools/profiler.h>
  9. #include <gflags/gflags.h>
  10. #include <google/protobuf/descriptor.h>
  11. #include "butil/time.h"
  12. #include "butil/macros.h"
  13. #include "brpc/socket.h"
  14. #include "brpc/acceptor.h"
  15. #include "brpc/server.h"
  16. #include "brpc/policy/hulu_pbrpc_meta.pb.h"
  17. #include "brpc/policy/hulu_pbrpc_protocol.h"
  18. #include "brpc/policy/most_common_message.h"
  19. #include "brpc/controller.h"
  20. #include "echo.pb.h"
  21. int main(int argc, char* argv[]) {
  22. testing::InitGoogleTest(&argc, argv);
  23. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  24. return RUN_ALL_TESTS();
  25. }
  26. namespace {
  27. static const std::string EXP_REQUEST = "hello";
  28. static const std::string EXP_RESPONSE = "world";
  29. static const std::string MOCK_CREDENTIAL = "mock credential";
  30. static const std::string MOCK_USER = "mock user";
  31. class MyAuthenticator : public brpc::Authenticator {
  32. public:
  33. MyAuthenticator() {}
  34. int GenerateCredential(std::string* auth_str) const {
  35. *auth_str = MOCK_CREDENTIAL;
  36. return 0;
  37. }
  38. int VerifyCredential(const std::string& auth_str,
  39. const butil::EndPoint&,
  40. brpc::AuthContext* ctx) const {
  41. EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
  42. ctx->set_user(MOCK_USER);
  43. return 0;
  44. }
  45. };
  46. class MyEchoService : public ::test::EchoService {
  47. void Echo(::google::protobuf::RpcController* cntl_base,
  48. const ::test::EchoRequest* req,
  49. ::test::EchoResponse* res,
  50. ::google::protobuf::Closure* done) {
  51. brpc::Controller* cntl =
  52. static_cast<brpc::Controller*>(cntl_base);
  53. brpc::ClosureGuard done_guard(done);
  54. if (req->close_fd()) {
  55. cntl->CloseConnection("Close connection according to request");
  56. return;
  57. }
  58. if (cntl->auth_context()) {
  59. EXPECT_EQ(MOCK_USER, cntl->auth_context()->user());
  60. }
  61. EXPECT_EQ(EXP_REQUEST, req->message());
  62. if (!cntl->request_attachment().empty()) {
  63. EXPECT_EQ(EXP_REQUEST, cntl->request_attachment().to_string());
  64. cntl->response_attachment().append(EXP_RESPONSE);
  65. }
  66. res->set_message(EXP_RESPONSE);
  67. }
  68. };
  69. class HuluTest : public ::testing::Test{
  70. protected:
  71. HuluTest() {
  72. EXPECT_EQ(0, _server.AddService(
  73. &_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  74. // Hack: Regard `_server' as running
  75. _server._status = brpc::Server::RUNNING;
  76. _server._options.auth = &_auth;
  77. EXPECT_EQ(0, pipe(_pipe_fds));
  78. brpc::SocketId id;
  79. brpc::SocketOptions options;
  80. options.fd = _pipe_fds[1];
  81. EXPECT_EQ(0, brpc::Socket::Create(options, &id));
  82. EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
  83. };
  84. virtual ~HuluTest() {};
  85. virtual void SetUp() {};
  86. virtual void TearDown() {};
  87. void VerifyMessage(brpc::InputMessageBase* msg) {
  88. if (msg->_socket == NULL) {
  89. _socket->ReAddress(&msg->_socket);
  90. }
  91. msg->_arg = &_server;
  92. EXPECT_TRUE(brpc::policy::VerifyHuluRequest(msg));
  93. }
  94. void ProcessMessage(void (*process)(brpc::InputMessageBase*),
  95. brpc::InputMessageBase* msg, bool set_eof) {
  96. if (msg->_socket == NULL) {
  97. _socket.get()->ReAddress(&msg->_socket);
  98. }
  99. msg->_arg = &_server;
  100. _socket->PostponeEOF();
  101. if (set_eof) {
  102. _socket->SetEOF();
  103. }
  104. (*process)(msg);
  105. }
  106. brpc::policy::MostCommonMessage* MakeRequestMessage(
  107. const brpc::policy::HuluRpcRequestMeta& meta) {
  108. brpc::policy::MostCommonMessage* msg =
  109. brpc::policy::MostCommonMessage::Get();
  110. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
  111. EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));
  112. test::EchoRequest req;
  113. req.set_message(EXP_REQUEST);
  114. butil::IOBufAsZeroCopyOutputStream req_stream(&msg->payload);
  115. EXPECT_TRUE(req.SerializeToZeroCopyStream(&req_stream));
  116. return msg;
  117. }
  118. brpc::policy::MostCommonMessage* MakeResponseMessage(
  119. const brpc::policy::HuluRpcResponseMeta& meta) {
  120. brpc::policy::MostCommonMessage* msg =
  121. brpc::policy::MostCommonMessage::Get();
  122. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
  123. EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));
  124. test::EchoResponse res;
  125. res.set_message(EXP_RESPONSE);
  126. butil::IOBufAsZeroCopyOutputStream res_stream(&msg->payload);
  127. EXPECT_TRUE(res.SerializeToZeroCopyStream(&res_stream));
  128. return msg;
  129. }
  130. void CheckResponseCode(bool expect_empty, int expect_code) {
  131. int bytes_in_pipe = 0;
  132. ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
  133. if (expect_empty) {
  134. EXPECT_EQ(0, bytes_in_pipe);
  135. return;
  136. }
  137. EXPECT_GT(bytes_in_pipe, 0);
  138. butil::IOPortal buf;
  139. EXPECT_EQ((ssize_t)bytes_in_pipe,
  140. buf.append_from_file_descriptor(_pipe_fds[0], 1024));
  141. brpc::ParseResult pr = brpc::policy::ParseHuluMessage(&buf, NULL, false, NULL);
  142. EXPECT_EQ(brpc::PARSE_OK, pr.error());
  143. brpc::policy::MostCommonMessage* msg =
  144. static_cast<brpc::policy::MostCommonMessage*>(pr.message());
  145. brpc::policy::HuluRpcResponseMeta meta;
  146. butil::IOBufAsZeroCopyInputStream meta_stream(msg->meta);
  147. EXPECT_TRUE(meta.ParseFromZeroCopyStream(&meta_stream));
  148. EXPECT_EQ(expect_code, meta.error_code());
  149. }
  150. void TestHuluCompress(brpc::CompressType type) {
  151. butil::IOBuf request_buf;
  152. butil::IOBuf total_buf;
  153. brpc::Controller cntl;
  154. test::EchoRequest req;
  155. test::EchoResponse res;
  156. cntl._response = &res;
  157. req.set_message(EXP_REQUEST);
  158. cntl.set_request_compress_type(type);
  159. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  160. ASSERT_FALSE(cntl.Failed());
  161. brpc::policy::PackHuluRequest(
  162. &total_buf, NULL, cntl.call_id().value,
  163. test::EchoService::descriptor()->method(0),
  164. &cntl, request_buf, &_auth);
  165. ASSERT_FALSE(cntl.Failed());
  166. brpc::ParseResult req_pr =
  167. brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
  168. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  169. brpc::InputMessageBase* req_msg = req_pr.message();
  170. ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);
  171. CheckResponseCode(false, 0);
  172. }
  173. int _pipe_fds[2];
  174. brpc::SocketUniquePtr _socket;
  175. brpc::Server _server;
  176. MyEchoService _svc;
  177. MyAuthenticator _auth;
  178. };
  179. TEST_F(HuluTest, process_request_failed_socket) {
  180. brpc::policy::HuluRpcRequestMeta meta;
  181. meta.set_service_name("EchoService");
  182. meta.set_method_index(0);
  183. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  184. _socket->SetFailed();
  185. ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
  186. ASSERT_EQ(0ll, _server._nerror.get_value());
  187. CheckResponseCode(true, 0);
  188. }
  189. TEST_F(HuluTest, process_request_logoff) {
  190. brpc::policy::HuluRpcRequestMeta meta;
  191. meta.set_service_name("EchoService");
  192. meta.set_method_index(0);
  193. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  194. _server._status = brpc::Server::READY;
  195. ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
  196. ASSERT_EQ(1ll, _server._nerror.get_value());
  197. CheckResponseCode(false, brpc::ELOGOFF);
  198. }
  199. TEST_F(HuluTest, process_request_wrong_method) {
  200. brpc::policy::HuluRpcRequestMeta meta;
  201. meta.set_service_name("EchoService");
  202. meta.set_method_index(10);
  203. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  204. ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
  205. ASSERT_EQ(1ll, _server._nerror.get_value());
  206. CheckResponseCode(false, brpc::ENOMETHOD);
  207. }
  208. TEST_F(HuluTest, process_response_after_eof) {
  209. brpc::policy::HuluRpcResponseMeta meta;
  210. test::EchoResponse res;
  211. brpc::Controller cntl;
  212. meta.set_correlation_id(cntl.call_id().value);
  213. cntl._response = &res;
  214. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
  215. ProcessMessage(brpc::policy::ProcessHuluResponse, msg, true);
  216. ASSERT_EQ(EXP_RESPONSE, res.message());
  217. ASSERT_TRUE(_socket->Failed());
  218. }
  219. TEST_F(HuluTest, process_response_error_code) {
  220. const int ERROR_CODE = 12345;
  221. brpc::policy::HuluRpcResponseMeta meta;
  222. brpc::Controller cntl;
  223. meta.set_correlation_id(cntl.call_id().value);
  224. meta.set_error_code(ERROR_CODE);
  225. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
  226. ProcessMessage(brpc::policy::ProcessHuluResponse, msg, false);
  227. ASSERT_EQ(ERROR_CODE, cntl.ErrorCode());
  228. }
  229. TEST_F(HuluTest, complete_flow) {
  230. butil::IOBuf request_buf;
  231. butil::IOBuf total_buf;
  232. brpc::Controller cntl;
  233. test::EchoRequest req;
  234. test::EchoResponse res;
  235. cntl._response = &res;
  236. // Send request
  237. req.set_message(EXP_REQUEST);
  238. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  239. ASSERT_FALSE(cntl.Failed());
  240. cntl.request_attachment().append(EXP_REQUEST);
  241. brpc::policy::PackHuluRequest(
  242. &total_buf, NULL, cntl.call_id().value,
  243. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  244. ASSERT_FALSE(cntl.Failed());
  245. // Verify and handle request
  246. brpc::ParseResult req_pr =
  247. brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
  248. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  249. brpc::InputMessageBase* req_msg = req_pr.message();
  250. VerifyMessage(req_msg);
  251. ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);
  252. // Read response from pipe
  253. butil::IOPortal response_buf;
  254. response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
  255. brpc::ParseResult res_pr =
  256. brpc::policy::ParseHuluMessage(&response_buf, NULL, false, NULL);
  257. ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
  258. brpc::InputMessageBase* res_msg = res_pr.message();
  259. ProcessMessage(brpc::policy::ProcessHuluResponse, res_msg, false);
  260. ASSERT_FALSE(cntl.Failed());
  261. ASSERT_EQ(EXP_RESPONSE, res.message());
  262. }
  263. TEST_F(HuluTest, close_in_callback) {
  264. butil::IOBuf request_buf;
  265. butil::IOBuf total_buf;
  266. brpc::Controller cntl;
  267. test::EchoRequest req;
  268. // Send request
  269. req.set_message(EXP_REQUEST);
  270. req.set_close_fd(true);
  271. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  272. ASSERT_FALSE(cntl.Failed());
  273. brpc::policy::PackHuluRequest(
  274. &total_buf, NULL, cntl.call_id().value,
  275. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  276. ASSERT_FALSE(cntl.Failed());
  277. // Handle request
  278. brpc::ParseResult req_pr =
  279. brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
  280. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  281. brpc::InputMessageBase* req_msg = req_pr.message();
  282. ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);
  283. // Socket should be closed
  284. ASSERT_TRUE(_socket->Failed());
  285. }
  286. TEST_F(HuluTest, hulu_compress) {
  287. TestHuluCompress(brpc::COMPRESS_TYPE_SNAPPY);
  288. TestHuluCompress(brpc::COMPRESS_TYPE_GZIP);
  289. TestHuluCompress(brpc::COMPRESS_TYPE_ZLIB);
  290. }
  291. } //namespace