12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- #include <iostream>
- #include <unordered_map>
- #include <butil/time.h>
- #include <butil/logging.h>
- #include <brpc/redis.h>
- #include <brpc/channel.h>
- #include <brpc/policy/redis_authenticator.h>
- #include <brpc/server.h>
- #include <brpc/redis_command.h>
- #include <gtest/gtest.h>
- namespace brpc {
- DECLARE_int32(idle_timeout_second);
- }
- int main(int argc, char* argv[]) {
- brpc::FLAGS_idle_timeout_second = 0;
- testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
- }
- namespace {
- static pthread_once_t download_redis_server_once = PTHREAD_ONCE_INIT;
- static pid_t g_redis_pid = -1;
- static void RemoveRedisServer() {
- if (g_redis_pid > 0) {
- puts("[Stopping redis-server]");
- char cmd[256];
- #if defined(BAIDU_INTERNAL)
- snprintf(cmd, sizeof(cmd), "kill %d; rm -rf redis_server_for_test", g_redis_pid);
- #else
- snprintf(cmd, sizeof(cmd), "kill %d", g_redis_pid);
- #endif
- CHECK(0 == system(cmd));
- // Wait for redis to stop
- usleep(50000);
- }
- }
- #define REDIS_SERVER_BIN "redis-server"
- #define REDIS_SERVER_PORT "6479"
- static void RunRedisServer() {
- #if defined(BAIDU_INTERNAL)
- puts("Downloading redis-server...");
- if (system("mkdir -p redis_server_for_test && cd redis_server_for_test && svn co https://svn.baidu.com/third-64/tags/redis/redis_2-6-14-100_PD_BL/bin") != 0) {
- puts("Fail to get redis-server from svn");
- return;
- }
- # undef REDIS_SERVER_BIN
- # define REDIS_SERVER_BIN "redis_server_for_test/bin/redis-server";
- #else
- if (system("which " REDIS_SERVER_BIN) != 0) {
- puts("Fail to find " REDIS_SERVER_BIN ", following tests will be skipped");
- return;
- }
- #endif
- atexit(RemoveRedisServer);
- g_redis_pid = fork();
- if (g_redis_pid < 0) {
- puts("Fail to fork");
- exit(1);
- } else if (g_redis_pid == 0) {
- puts("[Starting redis-server]");
- char* const argv[] = { (char*)REDIS_SERVER_BIN,
- (char*)"--port", (char*)REDIS_SERVER_PORT,
- NULL };
- unlink("dump.rdb");
- if (execvp(REDIS_SERVER_BIN, argv) < 0) {
- puts("Fail to run " REDIS_SERVER_BIN);
- exit(1);
- }
- }
- // Wait for redis to start.
- usleep(50000);
- }
- class RedisTest : public testing::Test {
- protected:
- RedisTest() {}
- void SetUp() {
- pthread_once(&download_redis_server_once, RunRedisServer);
- }
- void TearDown() {}
- };
- void AssertReplyEqual(const brpc::RedisReply& reply1,
- const brpc::RedisReply& reply2) {
- if (&reply1 == &reply2) {
- return;
- }
- CHECK_EQ(reply1.type(), reply2.type());
- switch (reply1.type()) {
- case brpc::REDIS_REPLY_ARRAY:
- ASSERT_EQ(reply1.size(), reply2.size());
- for (size_t j = 0; j < reply1.size(); ++j) {
- ASSERT_NE(&reply1[j], &reply2[j]); // from different arena
- AssertReplyEqual(reply1[j], reply2[j]);
- }
- break;
- case brpc::REDIS_REPLY_INTEGER:
- ASSERT_EQ(reply1.integer(), reply2.integer());
- break;
- case brpc::REDIS_REPLY_NIL:
- break;
- case brpc::REDIS_REPLY_STRING:
- // fall through
- case brpc::REDIS_REPLY_STATUS:
- ASSERT_NE(reply1.c_str(), reply2.c_str()); // from different arena
- ASSERT_EQ(reply1.data(), reply2.data());
- break;
- case brpc::REDIS_REPLY_ERROR:
- ASSERT_NE(reply1.error_message(), reply2.error_message()); // from different arena
- ASSERT_STREQ(reply1.error_message(), reply2.error_message());
- break;
- }
- }
- void AssertResponseEqual(const brpc::RedisResponse& r1,
- const brpc::RedisResponse& r2,
- int repeated_times = 1) {
- if (&r1 == &r2) {
- ASSERT_EQ(repeated_times, 1);
- return;
- }
- ASSERT_EQ(r2.reply_size()* repeated_times, r1.reply_size());
- for (int j = 0; j < repeated_times; ++j) {
- for (int i = 0; i < r2.reply_size(); ++i) {
- ASSERT_NE(&r2.reply(i), &r1.reply(j * r2.reply_size() + i));
- AssertReplyEqual(r2.reply(i), r1.reply(j * r2.reply_size() + i));
- }
- }
- }
- TEST_F(RedisTest, sanity) {
- if (g_redis_pid < 0) {
- puts("Skipped due to absence of redis-server");
- return;
- }
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("get hello"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type())
- << response;
- cntl.Reset();
- request.Clear();
- response.Clear();
- request.AddCommand("set hello world");
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
- ASSERT_EQ("OK", response.reply(0).data());
- cntl.Reset();
- request.Clear();
- response.Clear();
- ASSERT_TRUE(request.AddCommand("get hello"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed());
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
- ASSERT_EQ("world", response.reply(0).data());
- cntl.Reset();
- request.Clear();
- response.Clear();
- request.AddCommand("set hello world2");
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
- ASSERT_EQ("OK", response.reply(0).data());
-
- cntl.Reset();
- request.Clear();
- response.Clear();
- ASSERT_TRUE(request.AddCommand("get hello"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed());
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
- ASSERT_EQ("world2", response.reply(0).data());
- cntl.Reset();
- request.Clear();
- response.Clear();
- ASSERT_TRUE(request.AddCommand("del hello"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
- ASSERT_EQ(1, response.reply(0).integer());
- cntl.Reset();
- request.Clear();
- response.Clear();
- ASSERT_TRUE(request.AddCommand("get %s", "hello"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
- }
- TEST_F(RedisTest, keys_with_spaces) {
- if (g_redis_pid < 0) {
- puts("Skipped due to absence of redis-server");
- return;
- }
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
-
- cntl.Reset();
- request.Clear();
- response.Clear();
- ASSERT_TRUE(request.AddCommand("set %s 'he1 he1 da1'", "hello world"));
- ASSERT_TRUE(request.AddCommand("set 'hello2 world2' 'he2 he2 da2'"));
- ASSERT_TRUE(request.AddCommand("set \"hello3 world3\" \"he3 he3 da3\""));
- ASSERT_TRUE(request.AddCommand("get \"hello world\""));
- ASSERT_TRUE(request.AddCommand("get 'hello world'"));
- ASSERT_TRUE(request.AddCommand("get 'hello2 world2'"));
- ASSERT_TRUE(request.AddCommand("get 'hello3 world3'"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(7, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
- ASSERT_EQ("OK", response.reply(0).data());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
- ASSERT_EQ("OK", response.reply(1).data());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
- ASSERT_EQ("OK", response.reply(2).data());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
- ASSERT_EQ("he1 he1 da1", response.reply(3).data());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(4).type());
- ASSERT_EQ("he1 he1 da1", response.reply(4).data());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
- ASSERT_EQ("he2 he2 da2", response.reply(5).data());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(6).type());
- ASSERT_EQ("he3 he3 da3", response.reply(6).data());
- brpc::RedisResponse response2 = response;
- AssertResponseEqual(response2, response);
- response2.MergeFrom(response);
- AssertResponseEqual(response2, response, 2);
- }
- TEST_F(RedisTest, incr_and_decr) {
- if (g_redis_pid < 0) {
- puts("Skipped due to absence of redis-server");
- return;
- }
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- request.AddCommand("incr counter1");
- request.AddCommand("decr counter1");
- request.AddCommand("incrby counter1 %d", 10);
- request.AddCommand("decrby counter1 %d", 20);
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(4, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
- ASSERT_EQ(1, response.reply(0).integer());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(1).type());
- ASSERT_EQ(0, response.reply(1).integer());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(2).type());
- ASSERT_EQ(10, response.reply(2).integer());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
- ASSERT_EQ(-10, response.reply(3).integer());
- brpc::RedisResponse response2 = response;
- AssertResponseEqual(response2, response);
- response2.MergeFrom(response);
- AssertResponseEqual(response2, response, 2);
- }
- TEST_F(RedisTest, by_components) {
- if (g_redis_pid < 0) {
- puts("Skipped due to absence of redis-server");
- return;
- }
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- butil::StringPiece comp1[] = { "incr", "counter2" };
- butil::StringPiece comp2[] = { "decr", "counter2" };
- butil::StringPiece comp3[] = { "incrby", "counter2", "10" };
- butil::StringPiece comp4[] = { "decrby", "counter2", "20" };
- request.AddCommandByComponents(comp1, arraysize(comp1));
- request.AddCommandByComponents(comp2, arraysize(comp2));
- request.AddCommandByComponents(comp3, arraysize(comp3));
- request.AddCommandByComponents(comp4, arraysize(comp4));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(4, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
- ASSERT_EQ(1, response.reply(0).integer());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(1).type());
- ASSERT_EQ(0, response.reply(1).integer());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(2).type());
- ASSERT_EQ(10, response.reply(2).integer());
- ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
- ASSERT_EQ(-10, response.reply(3).integer());
- brpc::RedisResponse response2 = response;
- AssertResponseEqual(response2, response);
- response2.MergeFrom(response);
- AssertResponseEqual(response2, response, 2);
- }
- static std::string GeneratePassword() {
- std::string result;
- result.reserve(12);
- for (size_t i = 0; i < result.capacity(); ++i) {
- result.push_back(butil::fast_rand_in('a', 'z'));
- }
- return result;
- }
- TEST_F(RedisTest, auth) {
- if (g_redis_pid < 0) {
- puts("Skipped due to absence of redis-server");
- return;
- }
- // generate a random password
- const std::string passwd1 = GeneratePassword();
- const std::string passwd2 = GeneratePassword();
- LOG(INFO) << "Generated passwd1=" << passwd1 << " passwd2=" << passwd2;
- // config auth
- {
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- request.AddCommand("set mykey %s", passwd1.c_str());
- request.AddCommand("config set requirepass %s", passwd1.c_str());
- request.AddCommand("auth %s", passwd1.c_str());
- request.AddCommand("get mykey");
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(4, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
- ASSERT_STREQ("OK", response.reply(0).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
- ASSERT_STREQ("OK", response.reply(1).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
- ASSERT_STREQ("OK", response.reply(2).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
- ASSERT_STREQ(passwd1.c_str(), response.reply(3).c_str());
- }
- // Auth failed
- {
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- request.AddCommand("get mykey");
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(0).type());
- }
- // Auth with RedisAuthenticator and change to passwd2 (setting to empty
- // pass does not work on redis 6.0.6)
- {
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- brpc::policy::RedisAuthenticator* auth =
- new brpc::policy::RedisAuthenticator(passwd1.c_str());
- options.auth = auth;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- request.AddCommand("get mykey");
- request.AddCommand("config set requirepass %s", passwd2.c_str());
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(2, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
- ASSERT_STREQ(passwd1.c_str(), response.reply(0).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
- ASSERT_STREQ("OK", response.reply(1).c_str());
- }
- // Auth with passwd2
- {
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::policy::RedisAuthenticator* auth =
- new brpc::policy::RedisAuthenticator(passwd2.c_str());
- options.auth = auth;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- request.AddCommand("get mykey");
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type()) << response.reply(0);
- ASSERT_STREQ(passwd1.c_str(), response.reply(0).c_str());
- }
- }
- TEST_F(RedisTest, cmd_format) {
- if (g_redis_pid < 0) {
- puts("Skipped due to absence of redis-server");
- return;
- }
- brpc::RedisRequest request;
- // set empty string
- request.AddCommand("set a ''");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$0\r\n\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("mset b '' c ''");
- ASSERT_STREQ("*5\r\n$4\r\nmset\r\n$1\r\nb\r\n$0\r\n\r\n$1\r\nc\r\n$0\r\n\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- // set non-empty string
- request.AddCommand("set a 123");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$3\r\n123\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("mset b '' c ccc");
- ASSERT_STREQ("*5\r\n$4\r\nmset\r\n$1\r\nb\r\n$0\r\n\r\n$1\r\nc\r\n$3\r\nccc\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("get ''key value"); // == get <empty> key value
- ASSERT_STREQ("*4\r\n$3\r\nget\r\n$0\r\n\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("get key'' value"); // == get key <empty> value
- ASSERT_STREQ("*4\r\n$3\r\nget\r\n$3\r\nkey\r\n$0\r\n\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("get 'ext'key value "); // == get ext key value
- ASSERT_STREQ("*4\r\n$3\r\nget\r\n$3\r\next\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
- request.Clear();
-
- request.AddCommand(" get key'ext' value "); // == get key ext value
- ASSERT_STREQ("*4\r\n$3\r\nget\r\n$3\r\nkey\r\n$3\r\next\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
- request.Clear();
- }
- TEST_F(RedisTest, quote_and_escape) {
- if (g_redis_pid < 0) {
- puts("Skipped due to absence of redis-server");
- return;
- }
- brpc::RedisRequest request;
- request.AddCommand("set a 'foo bar'");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$7\r\nfoo bar\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("set a 'foo \\'bar'");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo 'bar\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("set a 'foo \"bar'");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo \"bar\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("set a 'foo \\\"bar'");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$9\r\nfoo \\\"bar\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("set a \"foo 'bar\"");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo 'bar\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("set a \"foo \\'bar\"");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$9\r\nfoo \\'bar\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- request.AddCommand("set a \"foo \\\"bar\"");
- ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo \"bar\r\n",
- request._buf.to_string().c_str());
- request.Clear();
- }
- std::string GetCompleteCommand(const std::vector<butil::StringPiece>& commands) {
- std::string res;
- for (int i = 0; i < (int)commands.size(); ++i) {
- if (i != 0) {
- res.push_back(' ');
- }
- res.append(commands[i].data(), commands[i].size());
- }
- return res;
- }
- TEST_F(RedisTest, command_parser) {
- brpc::RedisCommandParser parser;
- butil::IOBuf buf;
- std::vector<butil::StringPiece> command_out;
- butil::Arena arena;
- {
- // parse from whole command
- std::string command = "set abc edc";
- ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
- ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
- ASSERT_TRUE(buf.empty());
- ASSERT_EQ(command, GetCompleteCommand(command_out));
- }
- {
- // simulate parsing from network
- int t = 100;
- std::string raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n");
- int size = raw_string.size();
- while (t--) {
- for (int i = 0; i < size; ++i) {
- buf.push_back(raw_string[i]);
- if (i == size - 1) {
- ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
- } else {
- if (butil::fast_rand_less_than(2) == 0) {
- ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
- parser.Consume(buf, &command_out, &arena));
- }
- }
- }
- ASSERT_TRUE(buf.empty());
- ASSERT_EQ(GetCompleteCommand(command_out), "set abc def");
- }
- }
- {
- // there is a non-string message in command and parse should fail
- buf.append("*3\r\n$3");
- ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out, &arena));
- ASSERT_EQ((int)buf.size(), 2); // left "$3"
- buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n");
- ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out, &arena));
- parser.Reset();
- }
- {
- // not array
- buf.append(":123456\r\n");
- ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
- parser.Reset();
- }
- {
- // not array
- buf.append("+Error\r\n");
- ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
- parser.Reset();
- }
- {
- // not array
- buf.append("+OK\r\n");
- ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
- parser.Reset();
- }
- {
- // not array
- buf.append("$5\r\nhello\r\n");
- ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
- parser.Reset();
- }
- }
- TEST_F(RedisTest, redis_reply_codec) {
- butil::Arena arena;
- // status
- {
- brpc::RedisReply r(&arena);
- butil::IOBuf buf;
- butil::IOBufAppender appender;
- r.SetStatus("OK");
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
- ASSERT_STREQ(r.c_str(), "OK");
- brpc::RedisReply r2(&arena);
- brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
- ASSERT_EQ(err, brpc::PARSE_OK);
- ASSERT_TRUE(r2.is_string());
- ASSERT_STREQ("OK", r2.c_str());
- }
- // error
- {
- brpc::RedisReply r(&arena);
- butil::IOBuf buf;
- butil::IOBufAppender appender;
- r.SetError("not exist \'key\'");
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
- brpc::RedisReply r2(&arena);
- brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
- ASSERT_EQ(err, brpc::PARSE_OK);
- ASSERT_TRUE(r2.is_error());
- ASSERT_STREQ("not exist \'key\'", r2.error_message());
- }
- // string
- {
- brpc::RedisReply r(&arena);
- butil::IOBuf buf;
- butil::IOBufAppender appender;
- r.SetNullString();
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
-
- brpc::RedisReply r2(&arena);
- brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
- ASSERT_EQ(err, brpc::PARSE_OK);
- ASSERT_TRUE(r2.is_nil());
- r.SetString("abcde'hello world");
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
- ASSERT_STREQ("abcde'hello world", r.c_str());
- r.FormatString("int:%d str:%s fp:%.2f", 123, "foobar", 3.21);
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), "$26\r\nint:123 str:foobar fp:3.21\r\n");
- ASSERT_STREQ("int:123 str:foobar fp:3.21", r.c_str());
- r.FormatString("verylongstring verylongstring verylongstring verylongstring int:%d str:%s fp:%.2f", 123, "foobar", 3.21);
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), "$86\r\nverylongstring verylongstring verylongstring verylongstring int:123 str:foobar fp:3.21\r\n");
- ASSERT_STREQ("verylongstring verylongstring verylongstring verylongstring int:123 str:foobar fp:3.21", r.c_str());
-
- brpc::RedisReply r3(&arena);
- err = r3.ConsumePartialIOBuf(buf);
- ASSERT_EQ(err, brpc::PARSE_OK);
- ASSERT_TRUE(r3.is_string());
- ASSERT_STREQ(r.c_str(), r3.c_str());
- }
- // integer
- {
- brpc::RedisReply r(&arena);
- butil::IOBuf buf;
- butil::IOBufAppender appender;
- int t = 2;
- int input[] = { -1, 1234567 };
- const char* output[] = { ":-1\r\n", ":1234567\r\n" };
- for (int i = 0; i < t; ++i) {
- r.SetInteger(input[i]);
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), output[i]);
- brpc::RedisReply r2(&arena);
- brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
- ASSERT_EQ(err, brpc::PARSE_OK);
- ASSERT_TRUE(r2.is_integer());
- ASSERT_EQ(r2.integer(), input[i]);
- }
- }
- // array
- {
- brpc::RedisReply r(&arena);
- butil::IOBuf buf;
- butil::IOBufAppender appender;
- r.SetArray(3);
- brpc::RedisReply& sub_reply = r[0];
- sub_reply.SetArray(2);
- sub_reply[0].SetString("hello, it's me");
- sub_reply[1].SetInteger(422);
- r[1].SetString("To go over everything");
- r[2].SetInteger(1);
- ASSERT_TRUE(r[3].is_nil());
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(),
- "*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n"
- "To go over everything\r\n:1\r\n");
- brpc::RedisReply r2(&arena);
- ASSERT_EQ(r2.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
- ASSERT_TRUE(r2.is_array());
- ASSERT_EQ(3ul, r2.size());
- ASSERT_TRUE(r2[0].is_array());
- ASSERT_EQ(2ul, r2[0].size());
- ASSERT_TRUE(r2[0][0].is_string());
- ASSERT_STREQ(r2[0][0].c_str(), "hello, it's me");
- ASSERT_TRUE(r2[0][1].is_integer());
- ASSERT_EQ(r2[0][1].integer(), 422);
- ASSERT_TRUE(r2[1].is_string());
- ASSERT_STREQ(r2[1].c_str(), "To go over everything");
- ASSERT_TRUE(r2[2].is_integer());
- ASSERT_EQ(1, r2[2].integer());
- // null array
- r.SetNullArray();
- ASSERT_TRUE(r.SerializeTo(&appender));
- appender.move_to(buf);
- ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
- ASSERT_EQ(r.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
- ASSERT_TRUE(r.is_nil());
- }
- // CopyFromDifferentArena
- {
- brpc::RedisReply r(&arena);
- r.SetArray(1);
- brpc::RedisReply& sub_reply = r[0];
- sub_reply.SetArray(2);
- sub_reply[0].SetString("hello, it's me");
- sub_reply[1].SetInteger(422);
- brpc::RedisReply r2(&arena);
- r2.CopyFromDifferentArena(r);
- ASSERT_TRUE(r2.is_array());
- ASSERT_EQ((int)r2[0].size(), 2);
- ASSERT_STREQ(r2[0][0].c_str(), sub_reply[0].c_str());
- ASSERT_EQ(r2[0][1].integer(), sub_reply[1].integer());
- }
- // SetXXX can be called multiple times.
- {
- brpc::RedisReply r(&arena);
- r.SetStatus("OK");
- ASSERT_TRUE(r.is_string());
- r.SetNullString();
- ASSERT_TRUE(r.is_nil());
- r.SetArray(2);
- ASSERT_TRUE(r.is_array());
- r.SetString("OK");
- ASSERT_TRUE(r.is_string());
- r.SetError("OK");
- ASSERT_TRUE(r.is_error());
- r.SetInteger(42);
- ASSERT_TRUE(r.is_integer());
- }
- }
- butil::Mutex s_mutex;
- std::unordered_map<std::string, std::string> m;
- std::unordered_map<std::string, int64_t> int_map;
- class RedisServiceImpl : public brpc::RedisService {
- public:
- RedisServiceImpl()
- : _batch_count(0) {}
- brpc::RedisCommandHandlerResult OnBatched(const std::vector<butil::StringPiece>& args,
- brpc::RedisReply* output, bool flush_batched) {
- if (_batched_command.empty() && flush_batched) {
- if (args[0] == "set") {
- DoSet(args[1].as_string(), args[2].as_string(), output);
- } else if (args[0] == "get") {
- DoGet(args[1].as_string(), output);
- }
- return brpc::REDIS_CMD_HANDLED;
- }
- std::vector<std::string> comm;
- for (int i = 0; i < (int)args.size(); ++i) {
- comm.push_back(args[i].as_string());
- }
- _batched_command.push_back(comm);
- if (flush_batched) {
- output->SetArray(_batched_command.size());
- for (int i = 0; i < (int)_batched_command.size(); ++i) {
- if (_batched_command[i][0] == "set") {
- DoSet(_batched_command[i][1], _batched_command[i][2], &(*output)[i]);
- } else if (_batched_command[i][0] == "get") {
- DoGet(_batched_command[i][1], &(*output)[i]);
- }
- }
- _batch_count++;
- _batched_command.clear();
- return brpc::REDIS_CMD_HANDLED;
- } else {
- return brpc::REDIS_CMD_BATCHED;
- }
- }
- void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
- m[key] = value;
- output->SetStatus("OK");
- }
- void DoGet(const std::string& key, brpc::RedisReply* output) {
- auto it = m.find(key);
- if (it != m.end()) {
- output->SetString(it->second);
- } else {
- output->SetNullString();
- }
- }
- std::vector<std::vector<std::string> > _batched_command;
- int _batch_count;
- };
- class SetCommandHandler : public brpc::RedisCommandHandler {
- public:
- SetCommandHandler(RedisServiceImpl* rs, bool batch_process = false)
- : _rs(rs)
- , _batch_process(batch_process) {}
- brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
- brpc::RedisReply* output,
- bool flush_batched) {
- if (args.size() < 3) {
- output->SetError("ERR wrong number of arguments for 'set' command");
- return brpc::REDIS_CMD_HANDLED;
- }
- if (_batch_process) {
- return _rs->OnBatched(args, output, flush_batched);
- } else {
- DoSet(args[1].as_string(), args[2].as_string(), output);
- return brpc::REDIS_CMD_HANDLED;
- }
- }
- void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
- m[key] = value;
- output->SetStatus("OK");
- }
- private:
- RedisServiceImpl* _rs;
- bool _batch_process;
- };
- class GetCommandHandler : public brpc::RedisCommandHandler {
- public:
- GetCommandHandler(RedisServiceImpl* rs, bool batch_process = false)
- : _rs(rs)
- , _batch_process(batch_process) {}
- brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
- brpc::RedisReply* output,
- bool flush_batched) {
- if (args.size() < 2) {
- output->SetError("ERR wrong number of arguments for 'get' command");
- return brpc::REDIS_CMD_HANDLED;
- }
- if (_batch_process) {
- return _rs->OnBatched(args, output, flush_batched);
- } else {
- DoGet(args[1].as_string(), output);
- return brpc::REDIS_CMD_HANDLED;
- }
- }
- void DoGet(const std::string& key, brpc::RedisReply* output) {
- auto it = m.find(key);
- if (it != m.end()) {
- output->SetString(it->second);
- } else {
- output->SetNullString();
- }
- }
- private:
- RedisServiceImpl* _rs;
- bool _batch_process;
- };
- class IncrCommandHandler : public brpc::RedisCommandHandler {
- public:
- IncrCommandHandler() {}
- brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
- brpc::RedisReply* output,
- bool flush_batched) {
- if (args.size() < 2) {
- output->SetError("ERR wrong number of arguments for 'incr' command");
- return brpc::REDIS_CMD_HANDLED;
- }
- int64_t value;
- s_mutex.lock();
- value = ++int_map[args[1].as_string()];
- s_mutex.unlock();
- output->SetInteger(value);
- return brpc::REDIS_CMD_HANDLED;
- }
- };
- TEST_F(RedisTest, server_sanity) {
- brpc::Server server;
- brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
- GetCommandHandler *gh = new GetCommandHandler(rsimpl);
- SetCommandHandler *sh = new SetCommandHandler(rsimpl);
- IncrCommandHandler *ih = new IncrCommandHandler;
- rsimpl->AddCommandHandler("get", gh);
- rsimpl->AddCommandHandler("set", sh);
- rsimpl->AddCommandHandler("incr", ih);
- server_options.redis_service = rsimpl;
- brpc::PortRange pr(8081, 8900);
- ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("get hello"));
- ASSERT_TRUE(request.AddCommand("get hello2"));
- ASSERT_TRUE(request.AddCommand("set key1 value1"));
- ASSERT_TRUE(request.AddCommand("get key1"));
- ASSERT_TRUE(request.AddCommand("set key2 value2"));
- ASSERT_TRUE(request.AddCommand("get key2"));
- ASSERT_TRUE(request.AddCommand("xxxcommand key2"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(7, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
- ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
- ASSERT_STREQ("OK", response.reply(2).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
- ASSERT_STREQ("value1", response.reply(3).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(4).type());
- ASSERT_STREQ("OK", response.reply(4).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
- ASSERT_STREQ("value2", response.reply(5).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(6).type());
- ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command"));
- cntl.Reset();
- request.Clear();
- response.Clear();
- std::string value3("value3");
- value3.append(1, '\0');
- value3.append(1, 'a');
- std::vector<butil::StringPiece> pieces;
- pieces.push_back("set");
- pieces.push_back("key3");
- pieces.push_back(value3);
- ASSERT_TRUE(request.AddCommandByComponents(&pieces[0], pieces.size()));
- ASSERT_TRUE(request.AddCommand("set key4 \"\""));
- ASSERT_TRUE(request.AddCommand("get key3"));
- ASSERT_TRUE(request.AddCommand("get key4"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(4, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
- ASSERT_STREQ("OK", response.reply(0).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
- ASSERT_STREQ("OK", response.reply(1).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(2).type());
- ASSERT_STREQ("value3", response.reply(2).c_str());
- ASSERT_NE("value3", response.reply(2).data());
- ASSERT_EQ(value3, response.reply(2).data());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
- ASSERT_EQ("", response.reply(3).data());
- }
- void* incr_thread(void* arg) {
- brpc::Channel* c = static_cast<brpc::Channel*>(arg);
- for (int i = 0; i < 5000; ++i) {
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- EXPECT_TRUE(request.AddCommand("incr count"));
- c->CallMethod(NULL, &cntl, &request, &response, NULL);
- EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
- EXPECT_EQ(1, response.reply_size());
- EXPECT_TRUE(response.reply(0).is_integer());
- }
- return NULL;
- }
- TEST_F(RedisTest, server_concurrency) {
- int N = 10;
- brpc::Server server;
- brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
- IncrCommandHandler *ih = new IncrCommandHandler;
- rsimpl->AddCommandHandler("incr", ih);
- server_options.redis_service = rsimpl;
- brpc::PortRange pr(8081, 8900);
- ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options));
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- options.connection_type = "pooled";
- std::vector<bthread_t> bths;
- std::vector<brpc::Channel*> channels;
- for (int i = 0; i < N; ++i) {
- channels.push_back(new brpc::Channel);
- ASSERT_EQ(0, channels.back()->Init("127.0.0.1", server.listen_address().port, &options));
- bthread_t bth;
- ASSERT_EQ(bthread_start_background(&bth, NULL, incr_thread, channels.back()), 0);
- bths.push_back(bth);
- }
- for (int i = 0; i < N; ++i) {
- bthread_join(bths[i], NULL);
- delete channels[i];
- }
- ASSERT_EQ(int_map["count"], 10 * 5000LL);
- }
- class MultiCommandHandler : public brpc::RedisCommandHandler {
- public:
- MultiCommandHandler() {}
- brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
- brpc::RedisReply* output,
- bool flush_batched) {
- output->SetStatus("OK");
- return brpc::REDIS_CMD_CONTINUE;
- }
- RedisCommandHandler* NewTransactionHandler() override {
- return new MultiTransactionHandler;
- }
- class MultiTransactionHandler : public brpc::RedisCommandHandler {
- public:
- brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
- brpc::RedisReply* output,
- bool flush_batched) {
- if (args[0] == "multi") {
- output->SetError("ERR duplicate multi");
- return brpc::REDIS_CMD_CONTINUE;
- }
- if (args[0] != "exec") {
- std::vector<std::string> comm;
- for (int i = 0; i < (int)args.size(); ++i) {
- comm.push_back(args[i].as_string());
- }
- _commands.push_back(comm);
- output->SetStatus("QUEUED");
- return brpc::REDIS_CMD_CONTINUE;
- }
- output->SetArray(_commands.size());
- s_mutex.lock();
- for (size_t i = 0; i < _commands.size(); ++i) {
- if (_commands[i][0] == "incr") {
- int64_t value;
- value = ++int_map[_commands[i][1]];
- (*output)[i].SetInteger(value);
- } else {
- (*output)[i].SetStatus("unknown command");
- }
- }
- s_mutex.unlock();
- return brpc::REDIS_CMD_HANDLED;
- }
- private:
- std::vector<std::vector<std::string> > _commands;
- };
- };
- TEST_F(RedisTest, server_command_continue) {
- brpc::Server server;
- brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
- rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
- rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
- rsimpl->AddCommandHandler("incr", new IncrCommandHandler);
- rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
- server_options.redis_service = rsimpl;
- brpc::PortRange pr(8081, 8900);
- ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
- {
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("set hello world"));
- ASSERT_TRUE(request.AddCommand("get hello"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(2, response.reply_size());
- ASSERT_STREQ("world", response.reply(1).c_str());
- }
- {
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("multi"));
- ASSERT_TRUE(request.AddCommand("mUltI"));
- int count = 10;
- for (int i = 0; i < count; ++i) {
- ASSERT_TRUE(request.AddCommand("incr hello 1"));
- }
- ASSERT_TRUE(request.AddCommand("exec"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_EQ(13, response.reply_size());
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
- ASSERT_STREQ("OK", response.reply(0).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(1).type());
- for (int i = 2; i < count + 2; ++i) {
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(i).type());
- ASSERT_STREQ("QUEUED", response.reply(i).c_str());
- }
- const brpc::RedisReply& m = response.reply(count + 2);
- ASSERT_EQ(count, (int)m.size());
- for (int i = 0; i < count; ++i) {
- ASSERT_EQ(i+1, m[i].integer());
- }
- }
- // After 'multi', normal requests should be successful
- {
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("get hello"));
- ASSERT_TRUE(request.AddCommand("get hello2"));
- ASSERT_TRUE(request.AddCommand("set key1 value1"));
- ASSERT_TRUE(request.AddCommand("get key1"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_STREQ("world", response.reply(0).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
- ASSERT_STREQ("OK", response.reply(2).c_str());
- ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
- ASSERT_STREQ("value1", response.reply(3).c_str());
- }
- }
- TEST_F(RedisTest, server_handle_pipeline) {
- brpc::Server server;
- brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
- GetCommandHandler* getch = new GetCommandHandler(rsimpl, true);
- SetCommandHandler* setch = new SetCommandHandler(rsimpl, true);
- rsimpl->AddCommandHandler("get", getch);
- rsimpl->AddCommandHandler("set", setch);
- rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
- server_options.redis_service = rsimpl;
- brpc::PortRange pr(8081, 8900);
- ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_REDIS;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
- brpc::RedisRequest request;
- brpc::RedisResponse response;
- brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("set key1 v1"));
- ASSERT_TRUE(request.AddCommand("set key2 v2"));
- ASSERT_TRUE(request.AddCommand("set key3 v3"));
- ASSERT_TRUE(request.AddCommand("get hello"));
- ASSERT_TRUE(request.AddCommand("get hello"));
- ASSERT_TRUE(request.AddCommand("set key1 world"));
- ASSERT_TRUE(request.AddCommand("set key2 world"));
- ASSERT_TRUE(request.AddCommand("get key2"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(8, response.reply_size());
- ASSERT_EQ(1, rsimpl->_batch_count);
- ASSERT_TRUE(response.reply(7).is_string());
- ASSERT_STREQ(response.reply(7).c_str(), "world");
- }
- } //namespace
|