brpc_sofa_pbrpc_protocol_unittest.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Sun Jul 13 15:04:18 CST 2014
  19. #include <sys/ioctl.h>
  20. #include <sys/types.h>
  21. #include <sys/socket.h>
  22. #include <gtest/gtest.h>
  23. #include <gflags/gflags.h>
  24. #include <google/protobuf/descriptor.h>
  25. #include "butil/time.h"
  26. #include "butil/macros.h"
  27. #include "brpc/socket.h"
  28. #include "brpc/acceptor.h"
  29. #include "brpc/server.h"
  30. #include "brpc/policy/sofa_pbrpc_meta.pb.h"
  31. #include "brpc/policy/sofa_pbrpc_protocol.h"
  32. #include "brpc/policy/most_common_message.h"
  33. #include "brpc/controller.h"
  34. #include "echo.pb.h"
  35. int main(int argc, char* argv[]) {
  36. testing::InitGoogleTest(&argc, argv);
  37. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  38. return RUN_ALL_TESTS();
  39. }
  40. namespace {
  // Payload that the echo requests built in this file carry.
  static const std::string EXP_REQUEST("hello");
  // Payload that MyEchoService puts into its responses.
  static const std::string EXP_RESPONSE("world");
  // Credential string produced and expected by MyAuthenticator.
  static const std::string MOCK_CREDENTIAL("mock credential");
  // User name MyAuthenticator stores into the auth context.
  static const std::string MOCK_USER("mock user");
  45. class MyAuthenticator : public brpc::Authenticator {
  46. public:
  47. MyAuthenticator() {}
  48. int GenerateCredential(std::string* auth_str) const {
  49. *auth_str = MOCK_CREDENTIAL;
  50. return 0;
  51. }
  52. int VerifyCredential(const std::string& auth_str,
  53. const butil::EndPoint&,
  54. brpc::AuthContext* ctx) const {
  55. EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
  56. ctx->set_user(MOCK_USER);
  57. return 0;
  58. }
  59. };
  60. class MyEchoService : public ::test::EchoService {
  61. void Echo(::google::protobuf::RpcController* cntl_base,
  62. const ::test::EchoRequest* req,
  63. ::test::EchoResponse* res,
  64. ::google::protobuf::Closure* done) {
  65. brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
  66. brpc::ClosureGuard done_guard(done);
  67. if (req->close_fd()) {
  68. cntl->CloseConnection("Close connection according to request");
  69. return;
  70. }
  71. EXPECT_EQ(EXP_REQUEST, req->message());
  72. res->set_message(EXP_RESPONSE);
  73. }
  74. };
  75. class SofaTest : public ::testing::Test{
  76. protected:
  77. SofaTest() {
  78. EXPECT_EQ(0, _server.AddService(
  79. &_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
  80. // Hack: Regard `_server' as running
  81. _server._status = brpc::Server::RUNNING;
  82. // Sofa doesn't support authentication
  83. // _server._options.auth = &_auth;
  84. EXPECT_EQ(0, pipe(_pipe_fds));
  85. brpc::SocketId id;
  86. brpc::SocketOptions options;
  87. options.fd = _pipe_fds[1];
  88. EXPECT_EQ(0, brpc::Socket::Create(options, &id));
  89. EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
  90. };
  91. virtual ~SofaTest() {};
  92. virtual void SetUp() {};
  93. virtual void TearDown() {};
  94. void VerifyMessage(brpc::InputMessageBase* msg) {
  95. if (msg->_socket == NULL) {
  96. _socket->ReAddress(&msg->_socket);
  97. }
  98. msg->_arg = &_server;
  99. EXPECT_TRUE(brpc::policy::VerifySofaRequest(msg));
  100. }
  101. void ProcessMessage(void (*process)(brpc::InputMessageBase*),
  102. brpc::InputMessageBase* msg, bool set_eof) {
  103. if (msg->_socket == NULL) {
  104. _socket->ReAddress(&msg->_socket);
  105. }
  106. msg->_arg = &_server;
  107. _socket->PostponeEOF();
  108. if (set_eof) {
  109. _socket->SetEOF();
  110. }
  111. (*process)(msg);
  112. }
  113. brpc::policy::MostCommonMessage* MakeRequestMessage(
  114. const brpc::policy::SofaRpcMeta& meta) {
  115. brpc::policy::MostCommonMessage* msg =
  116. brpc::policy::MostCommonMessage::Get();
  117. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
  118. EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));
  119. test::EchoRequest req;
  120. req.set_message(EXP_REQUEST);
  121. butil::IOBufAsZeroCopyOutputStream req_stream(&msg->payload);
  122. EXPECT_TRUE(req.SerializeToZeroCopyStream(&req_stream));
  123. return msg;
  124. }
  125. brpc::policy::MostCommonMessage* MakeResponseMessage(
  126. const brpc::policy::SofaRpcMeta& meta) {
  127. brpc::policy::MostCommonMessage* msg =
  128. brpc::policy::MostCommonMessage::Get();
  129. butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
  130. EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));
  131. test::EchoResponse res;
  132. res.set_message(EXP_RESPONSE);
  133. butil::IOBufAsZeroCopyOutputStream res_stream(&msg->payload);
  134. EXPECT_TRUE(res.SerializeToZeroCopyStream(&res_stream));
  135. return msg;
  136. }
  137. void CheckResponseCode(bool expect_empty, int expect_code) {
  138. int bytes_in_pipe = 0;
  139. ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
  140. if (expect_empty) {
  141. EXPECT_EQ(0, bytes_in_pipe);
  142. return;
  143. }
  144. EXPECT_GT(bytes_in_pipe, 0);
  145. butil::IOPortal buf;
  146. EXPECT_EQ((ssize_t)bytes_in_pipe,
  147. buf.append_from_file_descriptor(_pipe_fds[0], 1024));
  148. brpc::ParseResult pr = brpc::policy::ParseSofaMessage(&buf, NULL, false, NULL);
  149. EXPECT_EQ(brpc::PARSE_OK, pr.error());
  150. brpc::policy::MostCommonMessage* msg =
  151. static_cast<brpc::policy::MostCommonMessage*>(pr.message());
  152. brpc::policy::SofaRpcMeta meta;
  153. butil::IOBufAsZeroCopyInputStream meta_stream(msg->meta);
  154. EXPECT_TRUE(meta.ParseFromZeroCopyStream(&meta_stream));
  155. EXPECT_EQ(expect_code, meta.error_code());
  156. }
  157. void TestSofaCompress(brpc::CompressType type) {
  158. butil::IOBuf request_buf;
  159. butil::IOBuf total_buf;
  160. brpc::Controller cntl;
  161. test::EchoRequest req;
  162. test::EchoResponse res;
  163. cntl._response = &res;
  164. req.set_message(EXP_REQUEST);
  165. cntl.set_request_compress_type(type);
  166. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  167. ASSERT_FALSE(cntl.Failed());
  168. brpc::policy::PackSofaRequest(
  169. &total_buf, NULL, cntl.call_id().value,
  170. test::EchoService::descriptor()->method(0),
  171. &cntl, request_buf, &_auth);
  172. ASSERT_FALSE(cntl.Failed());
  173. brpc::ParseResult req_pr =
  174. brpc::policy::ParseSofaMessage(&total_buf, NULL, false, NULL);
  175. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  176. brpc::InputMessageBase* req_msg = req_pr.message();
  177. ProcessMessage(brpc::policy::ProcessSofaRequest, req_msg, false);
  178. CheckResponseCode(false, 0);
  179. }
  180. int _pipe_fds[2];
  181. brpc::SocketUniquePtr _socket;
  182. brpc::Server _server;
  183. MyEchoService _svc;
  184. MyAuthenticator _auth;
  185. };
  186. TEST_F(SofaTest, process_request_failed_socket) {
  187. brpc::policy::SofaRpcMeta meta;
  188. meta.set_type(brpc::policy::SofaRpcMeta::REQUEST);
  189. meta.set_sequence_id(0);
  190. meta.set_method("EchoService.Echo");
  191. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  192. _socket->SetFailed();
  193. ProcessMessage(brpc::policy::ProcessSofaRequest, msg, false);
  194. ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
  195. CheckResponseCode(true, 0);
  196. }
  197. TEST_F(SofaTest, process_request_logoff) {
  198. brpc::policy::SofaRpcMeta meta;
  199. meta.set_type(brpc::policy::SofaRpcMeta::REQUEST);
  200. meta.set_sequence_id(0);
  201. meta.set_method("EchoService.Echo");
  202. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  203. _server._status = brpc::Server::READY;
  204. ProcessMessage(brpc::policy::ProcessSofaRequest, msg, false);
  205. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  206. CheckResponseCode(false, brpc::ELOGOFF);
  207. }
  208. TEST_F(SofaTest, process_request_wrong_method) {
  209. brpc::policy::SofaRpcMeta meta;
  210. meta.set_type(brpc::policy::SofaRpcMeta::REQUEST);
  211. meta.set_sequence_id(0);
  212. meta.set_method("EchoService.NO_SUCH_METHOD");
  213. brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
  214. ProcessMessage(brpc::policy::ProcessSofaRequest, msg, false);
  215. ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
  216. CheckResponseCode(false, brpc::ENOMETHOD);
  217. }
  218. TEST_F(SofaTest, process_response_after_eof) {
  219. brpc::policy::SofaRpcMeta meta;
  220. test::EchoResponse res;
  221. brpc::Controller cntl;
  222. meta.set_type(brpc::policy::SofaRpcMeta::RESPONSE);
  223. meta.set_sequence_id(cntl.call_id().value);
  224. cntl._response = &res;
  225. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
  226. ProcessMessage(brpc::policy::ProcessSofaResponse, msg, true);
  227. ASSERT_EQ(EXP_RESPONSE, res.message());
  228. ASSERT_TRUE(_socket->Failed());
  229. }
  230. TEST_F(SofaTest, process_response_error_code) {
  231. const int ERROR_CODE = 12345;
  232. brpc::policy::SofaRpcMeta meta;
  233. brpc::Controller cntl;
  234. meta.set_type(brpc::policy::SofaRpcMeta::RESPONSE);
  235. meta.set_sequence_id(cntl.call_id().value);
  236. meta.set_error_code(ERROR_CODE);
  237. brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
  238. ProcessMessage(brpc::policy::ProcessSofaResponse, msg, false);
  239. ASSERT_EQ(ERROR_CODE, cntl.ErrorCode());
  240. }
  241. TEST_F(SofaTest, complete_flow) {
  242. butil::IOBuf request_buf;
  243. butil::IOBuf total_buf;
  244. brpc::Controller cntl;
  245. test::EchoRequest req;
  246. test::EchoResponse res;
  247. cntl._response = &res;
  248. // Send request
  249. req.set_message(EXP_REQUEST);
  250. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  251. ASSERT_FALSE(cntl.Failed());
  252. brpc::policy::PackSofaRequest(
  253. &total_buf, NULL, cntl.call_id().value,
  254. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  255. ASSERT_FALSE(cntl.Failed());
  256. // Verify and handle request
  257. brpc::ParseResult req_pr =
  258. brpc::policy::ParseSofaMessage(&total_buf, NULL, false, NULL);
  259. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  260. brpc::InputMessageBase* req_msg = req_pr.message();
  261. VerifyMessage(req_msg);
  262. ProcessMessage(brpc::policy::ProcessSofaRequest, req_msg, false);
  263. // Read response from pipe
  264. butil::IOPortal response_buf;
  265. response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
  266. brpc::ParseResult res_pr =
  267. brpc::policy::ParseSofaMessage(&response_buf, NULL, false, NULL);
  268. ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
  269. brpc::InputMessageBase* res_msg = res_pr.message();
  270. ProcessMessage(brpc::policy::ProcessSofaResponse, res_msg, false);
  271. ASSERT_FALSE(cntl.Failed());
  272. ASSERT_EQ(EXP_RESPONSE, res.message());
  273. }
  274. TEST_F(SofaTest, close_in_callback) {
  275. butil::IOBuf request_buf;
  276. butil::IOBuf total_buf;
  277. brpc::Controller cntl;
  278. test::EchoRequest req;
  279. // Send request
  280. req.set_message(EXP_REQUEST);
  281. req.set_close_fd(true);
  282. brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
  283. ASSERT_FALSE(cntl.Failed());
  284. brpc::policy::PackSofaRequest(
  285. &total_buf, NULL, cntl.call_id().value,
  286. test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
  287. ASSERT_FALSE(cntl.Failed());
  288. // Handle request
  289. brpc::ParseResult req_pr =
  290. brpc::policy::ParseSofaMessage(&total_buf, NULL, false, NULL);
  291. ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
  292. brpc::InputMessageBase* req_msg = req_pr.message();
  293. ProcessMessage(brpc::policy::ProcessSofaRequest, req_msg, false);
  294. // Socket should be closed
  295. ASSERT_TRUE(_socket->Failed());
  296. }
  297. TEST_F(SofaTest, sofa_compress) {
  298. TestSofaCompress(brpc::COMPRESS_TYPE_SNAPPY);
  299. TestSofaCompress(brpc::COMPRESS_TYPE_GZIP);
  300. TestSofaCompress(brpc::COMPRESS_TYPE_ZLIB);
  301. }
  302. } //namespace