brpc_public_pbrpc_protocol_unittest.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. // Baidu RPC - A framework to host and access services throughout Baidu.
  2. // Copyright (c) 2014 Baidu, Inc.
  3. // Date: Sun Jul 13 15:04:18 CST 2014
  4. #include <sys/ioctl.h>
  5. #include <sys/types.h>
  6. #include <sys/socket.h>
  7. #include <gtest/gtest.h>
  8. #include <gperftools/profiler.h>
  9. #include <gflags/gflags.h>
  10. #include <google/protobuf/descriptor.h>
  11. #include "butil/time.h"
  12. #include "butil/macros.h"
  13. #include "brpc/socket.h"
  14. #include "brpc/acceptor.h"
  15. #include "brpc/server.h"
  16. #include "brpc/policy/public_pbrpc_meta.pb.h"
  17. #include "brpc/policy/public_pbrpc_protocol.h"
  18. #include "brpc/policy/most_common_message.h"
  19. #include "brpc/controller.h"
  20. #include "echo.pb.h"
  21. int main(int argc, char* argv[]) {
  22. testing::InitGoogleTest(&argc, argv);
  23. google::ParseCommandLineFlags(&argc, &argv, true);
  24. return RUN_ALL_TESTS();
  25. }
  26. namespace {
  27. void* RunClosure(void* arg) {
  28. google::protobuf::Closure* done = (google::protobuf::Closure*)arg;
  29. done->Run();
  30. return NULL;
  31. }
  32. static const std::string EXP_REQUEST = "hello";
  33. static const std::string EXP_RESPONSE = "world";
  34. static const std::string MOCK_CREDENTIAL = "mock credential";
  35. static const std::string MOCK_USER = "mock user";
  36. class MyAuthenticator : public brpc::Authenticator {
  37. public:
  38. MyAuthenticator() {}
  39. int GenerateCredential(std::string* auth_str) const {
  40. *auth_str = MOCK_CREDENTIAL;
  41. return 0;
  42. }
  43. int VerifyCredential(const std::string& auth_str,
  44. const butil::EndPoint&,
  45. brpc::AuthContext* ctx) const {
  46. EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
  47. ctx->set_user(MOCK_USER);
  48. return 0;
  49. }
  50. };
  51. class MyEchoService : public ::test::EchoService {
  52. void Echo(::google::protobuf::RpcController* cntl_base,
  53. const ::test::EchoRequest* req,
  54. ::test::EchoResponse* res,
  55. ::google::protobuf::Closure* done) {
  56. brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
  57. brpc::ClosureGuard done_guard(done);
  58. if (req->close_fd()) {
  59. cntl->CloseConnection("Close connection according to request");
  60. return;
  61. }
  62. EXPECT_EQ(EXP_REQUEST, req->message());
  63. res->set_message(EXP_RESPONSE);
  64. }
  65. };
  66. class PublicPbrpcTest : public ::testing::Test{
  67. protected:
  68. PublicPbrpcTest() {
  69. EXPECT_EQ(0, _server.AddService(
  70. &_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  71. // Hack: Regard `_server' as running
  72. _server._status = brpc::Server::RUNNING;
  73. _server._options.nshead_service =
  74. new brpc::policy::PublicPbrpcServiceAdaptor;
  75. // public_pbrpc doesn't support authentication
  76. // _server._options.auth = &_auth;
  77. EXPECT_EQ(0, pipe(_pipe_fds));
  78. brpc::SocketId id;
  79. brpc::SocketOptions options;
  80. options.fd = _pipe_fds[1];
  81. EXPECT_EQ(0, brpc::Socket::Create(options, &id));
  82. EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
  83. };
  84. virtual ~PublicPbrpcTest() {};
  85. virtual void SetUp() {};
  86. virtual void TearDown() {};
  87. void VerifyMessage(brpc::InputMessageBase* msg) {
  88. if (msg->_socket == NULL) {
  89. _socket->ReAddress(&msg->_socket);
  90. }
  91. msg->_arg = &_server;
  92. EXPECT_TRUE(brpc::policy::VerifyNsheadRequest(msg));
  93. }
  94. void ProcessMessage(void (*process)(brpc::InputMessageBase*),
  95. brpc::InputMessageBase* msg, bool set_eof) {
  96. if (msg->_socket == NULL) {
  97. _socket->ReAddress(&msg->_socket);
  98. }
  99. msg->_arg = &_server;
  100. _socket->PostponeEOF();
  101. if (set_eof) {
  102. _socket->SetEOF();
  103. }
  104. (*process)(msg);
  105. }
  106. brpc::policy::MostCommonMessage* MakeRequestMessage(
  107. brpc::policy::PublicPbrpcRequest* meta) {
  108. brpc::policy::MostCommonMessage* msg =
  109. brpc::policy::MostCommonMessage::Get();
  110. brpc::nshead_t head;
  111. msg->meta.append(&head, sizeof(head));
  112. if (meta->requestbody_size() > 0) {
  113. test::EchoRequest req;
  114. req.set_message(EXP_REQUEST);
  115. EXPECT_TRUE(req.SerializeToString(
  116. meta->mutable_requestbody(0)->mutable_serialized_request()));
  117. }
  118. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->payload);
  119. EXPECT_TRUE(meta->SerializeToZeroCopyStream(&meta_stream));
  120. return msg;
  121. }
  122. brpc::policy::MostCommonMessage* MakeResponseMessage(
  123. brpc::policy::PublicPbrpcResponse* meta) {
  124. brpc::policy::MostCommonMessage* msg =
  125. brpc::policy::MostCommonMessage::Get();
  126. brpc::nshead_t head;
  127. msg->meta.append(&head, sizeof(head));
  128. if (meta->responsebody_size() > 0) {
  129. test::EchoResponse res;
  130. res.set_message(EXP_RESPONSE);
  131. EXPECT_TRUE(res.SerializeToString(
  132. meta->mutable_responsebody(0)->mutable_serialized_response()));
  133. }
  134. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->payload);
  135. EXPECT_TRUE(meta->SerializeToZeroCopyStream(&meta_stream));
  136. return msg;
  137. }
  138. void CheckResponseCode(bool expect_empty, int expect_code) {
  139. int bytes_in_pipe = 0;
  140. ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
  141. if (expect_empty) {
  142. EXPECT_EQ(0, bytes_in_pipe);
  143. return;
  144. }
  145. EXPECT_GT(bytes_in_pipe, 0);
  146. butil::IOPortal buf;
  147. EXPECT_EQ((ssize_t)bytes_in_pipe,
  148. buf.append_from_file_descriptor(_pipe_fds[0], 1024));
  149. brpc::ParseResult pr = brpc::policy::ParseNsheadMessage(&buf, NULL, false, NULL);
  150. EXPECT_EQ(brpc::PARSE_OK, pr.error());
  151. brpc::policy::MostCommonMessage* msg =
  152. static_cast<brpc::policy::MostCommonMessage*>(pr.message());
  153. brpc::policy::PublicPbrpcResponse meta;
  154. butil::IOBufAsZeroCopyInputStream meta_stream(msg->payload);
  155. EXPECT_TRUE(meta.ParseFromZeroCopyStream(&meta_stream));
  156. EXPECT_EQ(expect_code, meta.responsehead().code());
  157. }
  158. int _pipe_fds[2];
  159. brpc::SocketUniquePtr _socket;
  160. brpc::Server _server;
  161. MyEchoService _svc;
  162. MyAuthenticator _auth;
  163. };
  164. TEST_F(PublicPbrpcTest, process_request_failed_socket) {
  165. brpc::policy::PublicPbrpcRequest meta;
  166. brpc::policy::RequestBody* body = meta.add_requestbody();
  167. body->set_service("EchoService");
  168. body->set_method_id(0);
  169. body->set_id(0);
  170. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
  171. _socket->SetFailed();
  172. ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
  173. ASSERT_EQ(0ll, _server._nerror.get_value());
  174. CheckResponseCode(true, 0);
  175. }
  176. TEST_F(PublicPbrpcTest, process_request_logoff) {
  177. brpc::policy::PublicPbrpcRequest meta;
  178. brpc::policy::RequestBody* body = meta.add_requestbody();
  179. body->set_service("EchoService");
  180. body->set_method_id(0);
  181. body->set_id(0);
  182. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
  183. _server._status = brpc::Server::READY;
  184. ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
  185. ASSERT_EQ(1ll, _server._nerror.get_value());
  186. CheckResponseCode(false, brpc::ELOGOFF);
  187. }
  188. TEST_F(PublicPbrpcTest, process_request_wrong_method) {
  189. brpc::policy::PublicPbrpcRequest meta;
  190. brpc::policy::RequestBody* body = meta.add_requestbody();
  191. body->set_service("EchoService");
  192. body->set_method_id(10);
  193. body->set_id(0);
  194. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
  195. ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
  196. ASSERT_EQ(1ll, _server._nerror.get_value());
  197. ASSERT_FALSE(_socket->Failed());
  198. }
  199. TEST_F(PublicPbrpcTest, process_response_after_eof) {
  200. brpc::policy::PublicPbrpcResponse meta;
  201. test::EchoResponse res;
  202. brpc::Controller cntl;
  203. brpc::policy::ResponseBody* body = meta.add_responsebody();
  204. body->set_id(cntl.call_id().value);
  205. meta.mutable_responsehead()->set_code(0);
  206. cntl._response = &res;
  207. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(&meta);
  208. ProcessMessage(brpc::policy::ProcessPublicPbrpcResponse, msg, true);
  209. ASSERT_EQ(EXP_RESPONSE, res.message());
  210. ASSERT_TRUE(_socket->Failed());
  211. }
  212. TEST_F(PublicPbrpcTest, process_response_error_code) {
  213. const int ERROR_CODE = 12345;
  214. brpc::policy::PublicPbrpcResponse meta;
  215. brpc::Controller cntl;
  216. brpc::policy::ResponseBody* body = meta.add_responsebody();
  217. body->set_id(cntl.call_id().value);
  218. meta.mutable_responsehead()->set_code(ERROR_CODE);
  219. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(&meta);
  220. ProcessMessage(brpc::policy::ProcessPublicPbrpcResponse, msg, false);
  221. ASSERT_EQ(ERROR_CODE, cntl.ErrorCode());
  222. }
  223. TEST_F(PublicPbrpcTest, complete_flow) {
  224. butil::IOBuf request_buf;
  225. butil::IOBuf total_buf;
  226. brpc::Controller cntl;
  227. test::EchoRequest req;
  228. test::EchoResponse res;
  229. cntl._response = &res;
  230. // Send request
  231. req.set_message(EXP_REQUEST);
  232. cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
  233. brpc::policy::SerializePublicPbrpcRequest(&request_buf, &cntl, &req);
  234. ASSERT_FALSE(cntl.Failed());
  235. brpc::policy::PackPublicPbrpcRequest(
  236. &total_buf, NULL, cntl.call_id().value,
  237. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  238. ASSERT_FALSE(cntl.Failed());
  239. // Verify and handle request
  240. brpc::ParseResult req_pr =
  241. brpc::policy::ParseNsheadMessage(&total_buf, NULL, false, NULL);
  242. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  243. brpc::InputMessageBase* req_msg = req_pr.message();
  244. VerifyMessage(req_msg);
  245. ProcessMessage(brpc::policy::ProcessNsheadRequest, req_msg, false);
  246. // Read response from pipe
  247. butil::IOPortal response_buf;
  248. response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
  249. brpc::ParseResult res_pr =
  250. brpc::policy::ParseNsheadMessage(&response_buf, NULL, false, NULL);
  251. ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
  252. brpc::InputMessageBase* res_msg = res_pr.message();
  253. ProcessMessage(brpc::policy::ProcessPublicPbrpcResponse, res_msg, false);
  254. ASSERT_FALSE(cntl.Failed());
  255. ASSERT_EQ(EXP_RESPONSE, res.message());
  256. }
  257. TEST_F(PublicPbrpcTest, close_in_callback) {
  258. butil::IOBuf request_buf;
  259. butil::IOBuf total_buf;
  260. brpc::Controller cntl;
  261. test::EchoRequest req;
  262. // Send request
  263. req.set_message(EXP_REQUEST);
  264. req.set_close_fd(true);
  265. brpc::policy::SerializePublicPbrpcRequest(&request_buf, &cntl, &req);
  266. ASSERT_FALSE(cntl.Failed());
  267. brpc::policy::PackPublicPbrpcRequest(
  268. &total_buf, NULL, cntl.call_id().value,
  269. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  270. ASSERT_FALSE(cntl.Failed());
  271. // Handle request
  272. brpc::ParseResult req_pr =
  273. brpc::policy::ParseNsheadMessage(&total_buf, NULL, false, NULL);
  274. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  275. brpc::InputMessageBase* req_msg = req_pr.message();
  276. ProcessMessage(brpc::policy::ProcessNsheadRequest, req_msg, false);
  277. // Socket should be closed
  278. ASSERT_TRUE(_socket->Failed());
  279. }
  280. } //namespace