123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- /*
- Copyright (c) 2020 Sogou, Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- #include <stdio.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <fcntl.h>
- #include <string.h>
- #include <string>
- #include <gtest/gtest.h>
- #include "workflow/WFOperator.h"
- #include "workflow/WFFacilities.h"
- #include "test_pb.srpc.h"
- #include "test_thrift.srpc.h"
- using namespace srpc;
- using namespace unit;
- class ForceShutdown
- {
- public:
- ~ForceShutdown() { google::protobuf::ShutdownProtobufLibrary(); }
- } g_holder;
- class TestPBServiceImpl : public TestPB::Service
- {
- public:
- void Add(AddRequest *request, AddResponse *response, RPCContext *ctx) override
- {
- response->set_c(request->a() + request->b());
- }
- void Substr(SubstrRequest *request, SubstrResponse *response, RPCContext *ctx) override
- {
- if (request->has_length())
- response->set_str(std::string(request->str(), request->idx(), request->length()));
- else
- response->set_str(std::string(request->str(), request->idx()));
- }
- };
- class TestThriftServiceImpl : public TestThrift::Service
- {
- public:
- int32_t add(const int32_t a, const int32_t b) override
- {
- return a + b;
- }
- void substr(std::string& _return, const std::string& str, const int32_t idx, const int32_t length) override
- {
- if (length < 0)
- _return = std::string(str, idx);
- else
- _return = std::string(str, idx, length);
- }
- };
- template<class SERVER, class CLIENT>
- void test_pb(SERVER& server)
- {
- WFFacilities::WaitGroup wg(1);
- RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
- TestPBServiceImpl impl;
- server.add_service(&impl);
- EXPECT_TRUE(server.start("127.0.0.1", 9964) == 0) << "server start failed";
- client_params.host = "127.0.0.1";
- client_params.port = 9964;
- CLIENT client(&client_params);
- AddRequest req1;
- req1.set_a(123);
- req1.set_b(456);
- client.Add(&req1, [&client, &wg](AddResponse *response, RPCContext *ctx) {
- EXPECT_EQ(ctx->get_status_code(), RPCStatusOK);
- if (ctx->success())
- {
- EXPECT_EQ(response->c(), 123 + 456);
- SubstrRequest req2;
- req2.set_str("hello world!");
- req2.set_idx(6);
- client.Substr(&req2, [&wg](SubstrResponse *response, RPCContext *ctx) {
- EXPECT_EQ(ctx->get_status_code(), RPCStatusOK);
- EXPECT_TRUE(response->str() == "world!");
- wg.done();
- });
- }
- else
- {
- wg.done();
- }
- });
- wg.wait();
- AddResponse resp1;
- RPCSyncContext ctx1;
- client.Add(&req1, &resp1, &ctx1);
- EXPECT_EQ(ctx1.success, true);
- EXPECT_EQ(resp1.c(), 123 + 456);
- auto fr = client.async_Add(&req1);
- auto res = fr.get();
- EXPECT_EQ(res.second.success, true);
- EXPECT_EQ(res.first.c(), 123 + 456);
- server.stop();
- }
- template<class SERVER, class CLIENT>
- void test_thrift(SERVER& server)
- {
- WFFacilities::WaitGroup wg(1);
- RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
- TestThriftServiceImpl impl;
- server.add_service(&impl);
- EXPECT_TRUE(server.start("127.0.0.1", 9965) == 0) << "server start failed";
- client_params.host = "127.0.0.1";
- client_params.port = 9965;
- CLIENT client(&client_params);
- TestThrift::addRequest req1;
- req1.a = 123;
- req1.b = 456;
- client.add(&req1, [&client, &wg](TestThrift::addResponse *response, RPCContext *ctx) {
- EXPECT_EQ(ctx->get_status_code(), RPCStatusOK);
- if (ctx->success())
- {
- EXPECT_EQ(response->result, 123 + 456);
- TestThrift::substrRequest req2;
- req2.str = "hello world!";
- req2.idx = 6;
- req2.length = -1;
- client.substr(&req2, [&wg](TestThrift::substrResponse *response, RPCContext *ctx) {
- EXPECT_EQ(ctx->get_status_code(), RPCStatusOK);
- EXPECT_TRUE(response->result == "world!");
- wg.done();
- });
- }
- else
- {
- wg.done();
- }
- });
- wg.wait();
- int32_t c = client.add(123, 456);
- EXPECT_EQ(client.thrift_last_sync_success(), true);
- EXPECT_EQ(c, 123 + 456);
- client.send_add(123, 456);
- c = client.recv_add();
- EXPECT_EQ(client.thrift_last_sync_success(), true);
- EXPECT_EQ(c, 123 + 456);
- server.stop();
- }
- TEST(SRPC, unittest)
- {
- RPCServerParams server_params = RPC_SERVER_PARAMS_DEFAULT;
- SRPCServer server(&server_params);
- test_pb<SRPCServer, TestPB::SRPCClient>(server);
- test_thrift<SRPCServer, TestThrift::SRPCClient>(server);
- }
- TEST(SRPCHttp, unittest)
- {
- RPCServerParams server_params = RPC_SERVER_PARAMS_DEFAULT;
- SRPCHttpServer server(&server_params);
- test_pb<SRPCHttpServer, TestPB::SRPCHttpClient>(server);
- }
- TEST(BRPC, unittest)
- {
- RPCServerParams server_params = RPC_SERVER_PARAMS_DEFAULT;
- BRPCServer server(&server_params);
- test_pb<BRPCServer, TestPB::BRPCClient>(server);
- }
- TEST(Thrift, unittest)
- {
- RPCServerParams server_params = RPC_SERVER_PARAMS_DEFAULT;
- ThriftServer server(&server_params);
- test_thrift<ThriftServer, TestThrift::ThriftClient>(server);
- }
- TEST(ThriftHttp, unittest)
- {
- RPCServerParams server_params = RPC_SERVER_PARAMS_DEFAULT;
- ThriftHttpServer server(&server_params);
- test_thrift<ThriftHttpServer, TestThrift::ThriftHttpClient>(server);
- }
- TEST(SRPC_COMPRESS, unittest)
- {
- WFFacilities::WaitGroup wg(1);
- RPCServerParams server_params = RPC_SERVER_PARAMS_DEFAULT;
- RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
- SRPCServer server(&server_params);
- TestPBServiceImpl impl;
- server.add_service(&impl);
- EXPECT_TRUE(server.start("127.0.0.1", 9964) == 0) << "server start failed";
- client_params.host = "127.0.0.1";
- client_params.port = 9964;
- TestPB::SRPCClient client(&client_params);
- AddRequest req;
- req.set_a(123);
- req.set_b(456);
- auto&& cb = [](AddResponse *response, RPCContext *ctx) {
- EXPECT_EQ(ctx->get_status_code(), RPCStatusOK);
- EXPECT_EQ(ctx->success(), true);
- EXPECT_EQ(response->c(), 123 + 456);
- };
- auto *t1 = client.create_Add_task(cb);
- auto *t2 = client.create_Add_task(cb);
- auto *t3 = client.create_Add_task(cb);
- auto *t4 = client.create_Add_task(cb);
- t1->set_compress_type(RPCCompressSnappy);
- t2->set_compress_type(RPCCompressGzip);
- t3->set_compress_type(RPCCompressZlib);
- t4->set_compress_type(RPCCompressLz4);
- t1->serialize_input(&req);
- t2->serialize_input(&req);
- t3->serialize_input(&req);
- t4->serialize_input(&req);
- auto& par = *t1 * t2 * t3 * t4;
- par.set_callback([&wg](const ParallelWork *par) {
- wg.done();
- });
- par.start();
- wg.wait();
- server.stop();
- }
|