brpc_public_pbrpc_protocol_unittest.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <sys/ioctl.h>
  20. #include <sys/types.h>
  21. #include <sys/socket.h>
  22. #include <gtest/gtest.h>
  23. #include <gflags/gflags.h>
  24. #include <google/protobuf/descriptor.h>
  25. #include "butil/time.h"
  26. #include "butil/macros.h"
  27. #include "brpc/socket.h"
  28. #include "brpc/acceptor.h"
  29. #include "brpc/server.h"
  30. #include "brpc/policy/public_pbrpc_meta.pb.h"
  31. #include "brpc/policy/public_pbrpc_protocol.h"
  32. #include "brpc/policy/most_common_message.h"
  33. #include "brpc/controller.h"
  34. #include "echo.pb.h"
  35. int main(int argc, char* argv[]) {
  36. testing::InitGoogleTest(&argc, argv);
  37. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  38. return RUN_ALL_TESTS();
  39. }
  40. namespace {
  // Fixed payloads and identities shared by the mock service, the mock
  // authenticator and the test cases below.
  const std::string EXP_REQUEST("hello");
  const std::string EXP_RESPONSE("world");
  const std::string MOCK_CREDENTIAL("mock credential");
  const std::string MOCK_USER("mock user");
  45. class MyAuthenticator : public brpc::Authenticator {
  46. public:
  47. MyAuthenticator() {}
  48. int GenerateCredential(std::string* auth_str) const {
  49. *auth_str = MOCK_CREDENTIAL;
  50. return 0;
  51. }
  52. int VerifyCredential(const std::string& auth_str,
  53. const butil::EndPoint&,
  54. brpc::AuthContext* ctx) const {
  55. EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
  56. ctx->set_user(MOCK_USER);
  57. return 0;
  58. }
  59. };
  60. class MyEchoService : public ::test::EchoService {
  61. void Echo(::google::protobuf::RpcController* cntl_base,
  62. const ::test::EchoRequest* req,
  63. ::test::EchoResponse* res,
  64. ::google::protobuf::Closure* done) {
  65. brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
  66. brpc::ClosureGuard done_guard(done);
  67. if (req->close_fd()) {
  68. cntl->CloseConnection("Close connection according to request");
  69. return;
  70. }
  71. EXPECT_EQ(EXP_REQUEST, req->message());
  72. res->set_message(EXP_RESPONSE);
  73. }
  74. };
  75. class PublicPbrpcTest : public ::testing::Test{
  76. protected:
  77. PublicPbrpcTest() {
  78. EXPECT_EQ(0, _server.AddService(
  79. &_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  80. // Hack: Regard `_server' as running
  81. _server._status = brpc::Server::RUNNING;
  82. _server._options.nshead_service =
  83. new brpc::policy::PublicPbrpcServiceAdaptor;
  84. // public_pbrpc doesn't support authentication
  85. // _server._options.auth = &_auth;
  86. EXPECT_EQ(0, pipe(_pipe_fds));
  87. brpc::SocketId id;
  88. brpc::SocketOptions options;
  89. options.fd = _pipe_fds[1];
  90. EXPECT_EQ(0, brpc::Socket::Create(options, &id));
  91. EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
  92. };
  93. virtual ~PublicPbrpcTest() {};
  94. virtual void SetUp() {};
  95. virtual void TearDown() {};
  96. void VerifyMessage(brpc::InputMessageBase* msg) {
  97. if (msg->_socket == NULL) {
  98. _socket->ReAddress(&msg->_socket);
  99. }
  100. msg->_arg = &_server;
  101. EXPECT_TRUE(brpc::policy::VerifyNsheadRequest(msg));
  102. }
  103. void ProcessMessage(void (*process)(brpc::InputMessageBase*),
  104. brpc::InputMessageBase* msg, bool set_eof) {
  105. if (msg->_socket == NULL) {
  106. _socket->ReAddress(&msg->_socket);
  107. }
  108. msg->_arg = &_server;
  109. _socket->PostponeEOF();
  110. if (set_eof) {
  111. _socket->SetEOF();
  112. }
  113. (*process)(msg);
  114. }
  115. brpc::policy::MostCommonMessage* MakeRequestMessage(
  116. brpc::policy::PublicPbrpcRequest* meta) {
  117. brpc::policy::MostCommonMessage* msg =
  118. brpc::policy::MostCommonMessage::Get();
  119. brpc::nshead_t head;
  120. msg->meta.append(&head, sizeof(head));
  121. if (meta->requestbody_size() > 0) {
  122. test::EchoRequest req;
  123. req.set_message(EXP_REQUEST);
  124. EXPECT_TRUE(req.SerializeToString(
  125. meta->mutable_requestbody(0)->mutable_serialized_request()));
  126. }
  127. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->payload);
  128. EXPECT_TRUE(meta->SerializeToZeroCopyStream(&meta_stream));
  129. return msg;
  130. }
  131. brpc::policy::MostCommonMessage* MakeResponseMessage(
  132. brpc::policy::PublicPbrpcResponse* meta) {
  133. brpc::policy::MostCommonMessage* msg =
  134. brpc::policy::MostCommonMessage::Get();
  135. brpc::nshead_t head;
  136. msg->meta.append(&head, sizeof(head));
  137. if (meta->responsebody_size() > 0) {
  138. test::EchoResponse res;
  139. res.set_message(EXP_RESPONSE);
  140. EXPECT_TRUE(res.SerializeToString(
  141. meta->mutable_responsebody(0)->mutable_serialized_response()));
  142. }
  143. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->payload);
  144. EXPECT_TRUE(meta->SerializeToZeroCopyStream(&meta_stream));
  145. return msg;
  146. }
  147. void CheckResponseCode(bool expect_empty, int expect_code) {
  148. int bytes_in_pipe = 0;
  149. ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
  150. if (expect_empty) {
  151. EXPECT_EQ(0, bytes_in_pipe);
  152. return;
  153. }
  154. EXPECT_GT(bytes_in_pipe, 0);
  155. butil::IOPortal buf;
  156. EXPECT_EQ((ssize_t)bytes_in_pipe,
  157. buf.append_from_file_descriptor(_pipe_fds[0], 1024));
  158. brpc::ParseResult pr = brpc::policy::ParseNsheadMessage(&buf, NULL, false, NULL);
  159. EXPECT_EQ(brpc::PARSE_OK, pr.error());
  160. brpc::policy::MostCommonMessage* msg =
  161. static_cast<brpc::policy::MostCommonMessage*>(pr.message());
  162. brpc::policy::PublicPbrpcResponse meta;
  163. butil::IOBufAsZeroCopyInputStream meta_stream(msg->payload);
  164. EXPECT_TRUE(meta.ParseFromZeroCopyStream(&meta_stream));
  165. EXPECT_EQ(expect_code, meta.responsehead().code());
  166. }
  167. int _pipe_fds[2];
  168. brpc::SocketUniquePtr _socket;
  169. brpc::Server _server;
  170. MyEchoService _svc;
  171. MyAuthenticator _auth;
  172. };
  173. TEST_F(PublicPbrpcTest, process_request_failed_socket) {
  174. brpc::policy::PublicPbrpcRequest meta;
  175. brpc::policy::RequestBody* body = meta.add_requestbody();
  176. body->set_service("EchoService");
  177. body->set_method_id(0);
  178. body->set_id(0);
  179. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
  180. _socket->SetFailed();
  181. ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
  182. ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
  183. CheckResponseCode(true, 0);
  184. }
  185. TEST_F(PublicPbrpcTest, process_request_logoff) {
  186. brpc::policy::PublicPbrpcRequest meta;
  187. brpc::policy::RequestBody* body = meta.add_requestbody();
  188. body->set_service("EchoService");
  189. body->set_method_id(0);
  190. body->set_id(0);
  191. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
  192. _server._status = brpc::Server::READY;
  193. ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
  194. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  195. CheckResponseCode(false, brpc::ELOGOFF);
  196. }
  197. TEST_F(PublicPbrpcTest, process_request_wrong_method) {
  198. brpc::policy::PublicPbrpcRequest meta;
  199. brpc::policy::RequestBody* body = meta.add_requestbody();
  200. body->set_service("EchoService");
  201. body->set_method_id(10);
  202. body->set_id(0);
  203. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
  204. ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
  205. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  206. ASSERT_FALSE(_socket->Failed());
  207. }
  208. TEST_F(PublicPbrpcTest, process_response_after_eof) {
  209. brpc::policy::PublicPbrpcResponse meta;
  210. test::EchoResponse res;
  211. brpc::Controller cntl;
  212. brpc::policy::ResponseBody* body = meta.add_responsebody();
  213. body->set_id(cntl.call_id().value);
  214. meta.mutable_responsehead()->set_code(0);
  215. cntl._response = &res;
  216. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(&meta);
  217. ProcessMessage(brpc::policy::ProcessPublicPbrpcResponse, msg, true);
  218. ASSERT_EQ(EXP_RESPONSE, res.message());
  219. ASSERT_TRUE(_socket->Failed());
  220. }
  221. TEST_F(PublicPbrpcTest, process_response_error_code) {
  222. const int ERROR_CODE = 12345;
  223. brpc::policy::PublicPbrpcResponse meta;
  224. brpc::Controller cntl;
  225. brpc::policy::ResponseBody* body = meta.add_responsebody();
  226. body->set_id(cntl.call_id().value);
  227. meta.mutable_responsehead()->set_code(ERROR_CODE);
  228. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(&meta);
  229. ProcessMessage(brpc::policy::ProcessPublicPbrpcResponse, msg, false);
  230. ASSERT_EQ(ERROR_CODE, cntl.ErrorCode());
  231. }
  232. TEST_F(PublicPbrpcTest, complete_flow) {
  233. butil::IOBuf request_buf;
  234. butil::IOBuf total_buf;
  235. brpc::Controller cntl;
  236. test::EchoRequest req;
  237. test::EchoResponse res;
  238. cntl._response = &res;
  239. // Send request
  240. req.set_message(EXP_REQUEST);
  241. cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
  242. brpc::policy::SerializePublicPbrpcRequest(&request_buf, &cntl, &req);
  243. ASSERT_FALSE(cntl.Failed());
  244. brpc::policy::PackPublicPbrpcRequest(
  245. &total_buf, NULL, cntl.call_id().value,
  246. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  247. ASSERT_FALSE(cntl.Failed());
  248. // Verify and handle request
  249. brpc::ParseResult req_pr =
  250. brpc::policy::ParseNsheadMessage(&total_buf, NULL, false, NULL);
  251. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  252. brpc::InputMessageBase* req_msg = req_pr.message();
  253. VerifyMessage(req_msg);
  254. ProcessMessage(brpc::policy::ProcessNsheadRequest, req_msg, false);
  255. // Read response from pipe
  256. butil::IOPortal response_buf;
  257. response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
  258. brpc::ParseResult res_pr =
  259. brpc::policy::ParseNsheadMessage(&response_buf, NULL, false, NULL);
  260. ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
  261. brpc::InputMessageBase* res_msg = res_pr.message();
  262. ProcessMessage(brpc::policy::ProcessPublicPbrpcResponse, res_msg, false);
  263. ASSERT_FALSE(cntl.Failed());
  264. ASSERT_EQ(EXP_RESPONSE, res.message());
  265. }
  266. TEST_F(PublicPbrpcTest, close_in_callback) {
  267. butil::IOBuf request_buf;
  268. butil::IOBuf total_buf;
  269. brpc::Controller cntl;
  270. test::EchoRequest req;
  271. // Send request
  272. req.set_message(EXP_REQUEST);
  273. req.set_close_fd(true);
  274. brpc::policy::SerializePublicPbrpcRequest(&request_buf, &cntl, &req);
  275. ASSERT_FALSE(cntl.Failed());
  276. brpc::policy::PackPublicPbrpcRequest(
  277. &total_buf, NULL, cntl.call_id().value,
  278. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  279. ASSERT_FALSE(cntl.Failed());
  280. // Handle request
  281. brpc::ParseResult req_pr =
  282. brpc::policy::ParseNsheadMessage(&total_buf, NULL, false, NULL);
  283. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  284. brpc::InputMessageBase* req_msg = req_pr.message();
  285. ProcessMessage(brpc::policy::ProcessNsheadRequest, req_msg, false);
  286. // Socket should be closed
  287. ASSERT_TRUE(_socket->Failed());
  288. }
  289. } //namespace