brpc_redis_unittest.cpp 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <iostream>
  18. #include <unordered_map>
  19. #include <butil/time.h>
  20. #include <butil/logging.h>
  21. #include <brpc/redis.h>
  22. #include <brpc/channel.h>
  23. #include <brpc/policy/redis_authenticator.h>
  24. #include <brpc/server.h>
  25. #include <brpc/redis_command.h>
  26. #include <gtest/gtest.h>
  27. namespace brpc {
  28. DECLARE_int32(idle_timeout_second);
  29. }
  30. int main(int argc, char* argv[]) {
  31. brpc::FLAGS_idle_timeout_second = 0;
  32. testing::InitGoogleTest(&argc, argv);
  33. return RUN_ALL_TESTS();
  34. }
  35. namespace {
  36. static pthread_once_t download_redis_server_once = PTHREAD_ONCE_INIT;
  37. static pid_t g_redis_pid = -1;
  38. static void RemoveRedisServer() {
  39. if (g_redis_pid > 0) {
  40. puts("[Stopping redis-server]");
  41. char cmd[256];
  42. #if defined(BAIDU_INTERNAL)
  43. snprintf(cmd, sizeof(cmd), "kill %d; rm -rf redis_server_for_test", g_redis_pid);
  44. #else
  45. snprintf(cmd, sizeof(cmd), "kill %d", g_redis_pid);
  46. #endif
  47. CHECK(0 == system(cmd));
  48. // Wait for redis to stop
  49. usleep(50000);
  50. }
  51. }
  52. #define REDIS_SERVER_BIN "redis-server"
  53. #define REDIS_SERVER_PORT "6479"
  54. static void RunRedisServer() {
  55. #if defined(BAIDU_INTERNAL)
  56. puts("Downloading redis-server...");
  57. if (system("mkdir -p redis_server_for_test && cd redis_server_for_test && svn co https://svn.baidu.com/third-64/tags/redis/redis_2-6-14-100_PD_BL/bin") != 0) {
  58. puts("Fail to get redis-server from svn");
  59. return;
  60. }
  61. # undef REDIS_SERVER_BIN
  62. # define REDIS_SERVER_BIN "redis_server_for_test/bin/redis-server";
  63. #else
  64. if (system("which " REDIS_SERVER_BIN) != 0) {
  65. puts("Fail to find " REDIS_SERVER_BIN ", following tests will be skipped");
  66. return;
  67. }
  68. #endif
  69. atexit(RemoveRedisServer);
  70. g_redis_pid = fork();
  71. if (g_redis_pid < 0) {
  72. puts("Fail to fork");
  73. exit(1);
  74. } else if (g_redis_pid == 0) {
  75. puts("[Starting redis-server]");
  76. char* const argv[] = { (char*)REDIS_SERVER_BIN,
  77. (char*)"--port", (char*)REDIS_SERVER_PORT,
  78. NULL };
  79. unlink("dump.rdb");
  80. if (execvp(REDIS_SERVER_BIN, argv) < 0) {
  81. puts("Fail to run " REDIS_SERVER_BIN);
  82. exit(1);
  83. }
  84. }
  85. // Wait for redis to start.
  86. usleep(50000);
  87. }
  88. class RedisTest : public testing::Test {
  89. protected:
  90. RedisTest() {}
  91. void SetUp() {
  92. pthread_once(&download_redis_server_once, RunRedisServer);
  93. }
  94. void TearDown() {}
  95. };
  96. void AssertReplyEqual(const brpc::RedisReply& reply1,
  97. const brpc::RedisReply& reply2) {
  98. if (&reply1 == &reply2) {
  99. return;
  100. }
  101. CHECK_EQ(reply1.type(), reply2.type());
  102. switch (reply1.type()) {
  103. case brpc::REDIS_REPLY_ARRAY:
  104. ASSERT_EQ(reply1.size(), reply2.size());
  105. for (size_t j = 0; j < reply1.size(); ++j) {
  106. ASSERT_NE(&reply1[j], &reply2[j]); // from different arena
  107. AssertReplyEqual(reply1[j], reply2[j]);
  108. }
  109. break;
  110. case brpc::REDIS_REPLY_INTEGER:
  111. ASSERT_EQ(reply1.integer(), reply2.integer());
  112. break;
  113. case brpc::REDIS_REPLY_NIL:
  114. break;
  115. case brpc::REDIS_REPLY_STRING:
  116. // fall through
  117. case brpc::REDIS_REPLY_STATUS:
  118. ASSERT_NE(reply1.c_str(), reply2.c_str()); // from different arena
  119. ASSERT_EQ(reply1.data(), reply2.data());
  120. break;
  121. case brpc::REDIS_REPLY_ERROR:
  122. ASSERT_NE(reply1.error_message(), reply2.error_message()); // from different arena
  123. ASSERT_STREQ(reply1.error_message(), reply2.error_message());
  124. break;
  125. }
  126. }
  127. void AssertResponseEqual(const brpc::RedisResponse& r1,
  128. const brpc::RedisResponse& r2,
  129. int repeated_times = 1) {
  130. if (&r1 == &r2) {
  131. ASSERT_EQ(repeated_times, 1);
  132. return;
  133. }
  134. ASSERT_EQ(r2.reply_size()* repeated_times, r1.reply_size());
  135. for (int j = 0; j < repeated_times; ++j) {
  136. for (int i = 0; i < r2.reply_size(); ++i) {
  137. ASSERT_NE(&r2.reply(i), &r1.reply(j * r2.reply_size() + i));
  138. AssertReplyEqual(r2.reply(i), r1.reply(j * r2.reply_size() + i));
  139. }
  140. }
  141. }
  142. TEST_F(RedisTest, sanity) {
  143. if (g_redis_pid < 0) {
  144. puts("Skipped due to absence of redis-server");
  145. return;
  146. }
  147. brpc::ChannelOptions options;
  148. options.protocol = brpc::PROTOCOL_REDIS;
  149. brpc::Channel channel;
  150. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  151. brpc::RedisRequest request;
  152. brpc::RedisResponse response;
  153. brpc::Controller cntl;
  154. ASSERT_TRUE(request.AddCommand("get hello"));
  155. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  156. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  157. ASSERT_EQ(1, response.reply_size());
  158. ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type())
  159. << response;
  160. cntl.Reset();
  161. request.Clear();
  162. response.Clear();
  163. request.AddCommand("set hello world");
  164. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  165. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  166. ASSERT_EQ(1, response.reply_size());
  167. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
  168. ASSERT_EQ("OK", response.reply(0).data());
  169. cntl.Reset();
  170. request.Clear();
  171. response.Clear();
  172. ASSERT_TRUE(request.AddCommand("get hello"));
  173. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  174. ASSERT_FALSE(cntl.Failed());
  175. ASSERT_EQ(1, response.reply_size());
  176. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
  177. ASSERT_EQ("world", response.reply(0).data());
  178. cntl.Reset();
  179. request.Clear();
  180. response.Clear();
  181. request.AddCommand("set hello world2");
  182. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  183. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  184. ASSERT_EQ(1, response.reply_size());
  185. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
  186. ASSERT_EQ("OK", response.reply(0).data());
  187. cntl.Reset();
  188. request.Clear();
  189. response.Clear();
  190. ASSERT_TRUE(request.AddCommand("get hello"));
  191. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  192. ASSERT_FALSE(cntl.Failed());
  193. ASSERT_EQ(1, response.reply_size());
  194. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
  195. ASSERT_EQ("world2", response.reply(0).data());
  196. cntl.Reset();
  197. request.Clear();
  198. response.Clear();
  199. ASSERT_TRUE(request.AddCommand("del hello"));
  200. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  201. ASSERT_FALSE(cntl.Failed());
  202. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
  203. ASSERT_EQ(1, response.reply(0).integer());
  204. cntl.Reset();
  205. request.Clear();
  206. response.Clear();
  207. ASSERT_TRUE(request.AddCommand("get %s", "hello"));
  208. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  209. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  210. ASSERT_EQ(1, response.reply_size());
  211. ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
  212. }
  213. TEST_F(RedisTest, keys_with_spaces) {
  214. if (g_redis_pid < 0) {
  215. puts("Skipped due to absence of redis-server");
  216. return;
  217. }
  218. brpc::ChannelOptions options;
  219. options.protocol = brpc::PROTOCOL_REDIS;
  220. brpc::Channel channel;
  221. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  222. brpc::RedisRequest request;
  223. brpc::RedisResponse response;
  224. brpc::Controller cntl;
  225. cntl.Reset();
  226. request.Clear();
  227. response.Clear();
  228. ASSERT_TRUE(request.AddCommand("set %s 'he1 he1 da1'", "hello world"));
  229. ASSERT_TRUE(request.AddCommand("set 'hello2 world2' 'he2 he2 da2'"));
  230. ASSERT_TRUE(request.AddCommand("set \"hello3 world3\" \"he3 he3 da3\""));
  231. ASSERT_TRUE(request.AddCommand("get \"hello world\""));
  232. ASSERT_TRUE(request.AddCommand("get 'hello world'"));
  233. ASSERT_TRUE(request.AddCommand("get 'hello2 world2'"));
  234. ASSERT_TRUE(request.AddCommand("get 'hello3 world3'"));
  235. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  236. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  237. ASSERT_EQ(7, response.reply_size());
  238. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
  239. ASSERT_EQ("OK", response.reply(0).data());
  240. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
  241. ASSERT_EQ("OK", response.reply(1).data());
  242. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
  243. ASSERT_EQ("OK", response.reply(2).data());
  244. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
  245. ASSERT_EQ("he1 he1 da1", response.reply(3).data());
  246. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(4).type());
  247. ASSERT_EQ("he1 he1 da1", response.reply(4).data());
  248. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
  249. ASSERT_EQ("he2 he2 da2", response.reply(5).data());
  250. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(6).type());
  251. ASSERT_EQ("he3 he3 da3", response.reply(6).data());
  252. brpc::RedisResponse response2 = response;
  253. AssertResponseEqual(response2, response);
  254. response2.MergeFrom(response);
  255. AssertResponseEqual(response2, response, 2);
  256. }
  257. TEST_F(RedisTest, incr_and_decr) {
  258. if (g_redis_pid < 0) {
  259. puts("Skipped due to absence of redis-server");
  260. return;
  261. }
  262. brpc::ChannelOptions options;
  263. options.protocol = brpc::PROTOCOL_REDIS;
  264. brpc::Channel channel;
  265. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  266. brpc::RedisRequest request;
  267. brpc::RedisResponse response;
  268. brpc::Controller cntl;
  269. request.AddCommand("incr counter1");
  270. request.AddCommand("decr counter1");
  271. request.AddCommand("incrby counter1 %d", 10);
  272. request.AddCommand("decrby counter1 %d", 20);
  273. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  274. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  275. ASSERT_EQ(4, response.reply_size());
  276. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
  277. ASSERT_EQ(1, response.reply(0).integer());
  278. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(1).type());
  279. ASSERT_EQ(0, response.reply(1).integer());
  280. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(2).type());
  281. ASSERT_EQ(10, response.reply(2).integer());
  282. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
  283. ASSERT_EQ(-10, response.reply(3).integer());
  284. brpc::RedisResponse response2 = response;
  285. AssertResponseEqual(response2, response);
  286. response2.MergeFrom(response);
  287. AssertResponseEqual(response2, response, 2);
  288. }
  289. TEST_F(RedisTest, by_components) {
  290. if (g_redis_pid < 0) {
  291. puts("Skipped due to absence of redis-server");
  292. return;
  293. }
  294. brpc::ChannelOptions options;
  295. options.protocol = brpc::PROTOCOL_REDIS;
  296. brpc::Channel channel;
  297. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  298. brpc::RedisRequest request;
  299. brpc::RedisResponse response;
  300. brpc::Controller cntl;
  301. butil::StringPiece comp1[] = { "incr", "counter2" };
  302. butil::StringPiece comp2[] = { "decr", "counter2" };
  303. butil::StringPiece comp3[] = { "incrby", "counter2", "10" };
  304. butil::StringPiece comp4[] = { "decrby", "counter2", "20" };
  305. request.AddCommandByComponents(comp1, arraysize(comp1));
  306. request.AddCommandByComponents(comp2, arraysize(comp2));
  307. request.AddCommandByComponents(comp3, arraysize(comp3));
  308. request.AddCommandByComponents(comp4, arraysize(comp4));
  309. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  310. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  311. ASSERT_EQ(4, response.reply_size());
  312. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
  313. ASSERT_EQ(1, response.reply(0).integer());
  314. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(1).type());
  315. ASSERT_EQ(0, response.reply(1).integer());
  316. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(2).type());
  317. ASSERT_EQ(10, response.reply(2).integer());
  318. ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
  319. ASSERT_EQ(-10, response.reply(3).integer());
  320. brpc::RedisResponse response2 = response;
  321. AssertResponseEqual(response2, response);
  322. response2.MergeFrom(response);
  323. AssertResponseEqual(response2, response, 2);
  324. }
  325. static std::string GeneratePassword() {
  326. std::string result;
  327. result.reserve(12);
  328. for (size_t i = 0; i < result.capacity(); ++i) {
  329. result.push_back(butil::fast_rand_in('a', 'z'));
  330. }
  331. return result;
  332. }
  333. TEST_F(RedisTest, auth) {
  334. if (g_redis_pid < 0) {
  335. puts("Skipped due to absence of redis-server");
  336. return;
  337. }
  338. // generate a random password
  339. const std::string passwd1 = GeneratePassword();
  340. const std::string passwd2 = GeneratePassword();
  341. LOG(INFO) << "Generated passwd1=" << passwd1 << " passwd2=" << passwd2;
  342. // config auth
  343. {
  344. brpc::ChannelOptions options;
  345. options.protocol = brpc::PROTOCOL_REDIS;
  346. brpc::Channel channel;
  347. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  348. brpc::RedisRequest request;
  349. brpc::RedisResponse response;
  350. brpc::Controller cntl;
  351. request.AddCommand("set mykey %s", passwd1.c_str());
  352. request.AddCommand("config set requirepass %s", passwd1.c_str());
  353. request.AddCommand("auth %s", passwd1.c_str());
  354. request.AddCommand("get mykey");
  355. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  356. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  357. ASSERT_EQ(4, response.reply_size());
  358. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
  359. ASSERT_STREQ("OK", response.reply(0).c_str());
  360. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
  361. ASSERT_STREQ("OK", response.reply(1).c_str());
  362. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
  363. ASSERT_STREQ("OK", response.reply(2).c_str());
  364. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
  365. ASSERT_STREQ(passwd1.c_str(), response.reply(3).c_str());
  366. }
  367. // Auth failed
  368. {
  369. brpc::ChannelOptions options;
  370. options.protocol = brpc::PROTOCOL_REDIS;
  371. brpc::Channel channel;
  372. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  373. brpc::RedisRequest request;
  374. brpc::RedisResponse response;
  375. brpc::Controller cntl;
  376. request.AddCommand("get mykey");
  377. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  378. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  379. ASSERT_EQ(1, response.reply_size());
  380. ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(0).type());
  381. }
  382. // Auth with RedisAuthenticator and change to passwd2 (setting to empty
  383. // pass does not work on redis 6.0.6)
  384. {
  385. brpc::ChannelOptions options;
  386. options.protocol = brpc::PROTOCOL_REDIS;
  387. brpc::Channel channel;
  388. brpc::policy::RedisAuthenticator* auth =
  389. new brpc::policy::RedisAuthenticator(passwd1.c_str());
  390. options.auth = auth;
  391. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  392. brpc::RedisRequest request;
  393. brpc::RedisResponse response;
  394. brpc::Controller cntl;
  395. request.AddCommand("get mykey");
  396. request.AddCommand("config set requirepass %s", passwd2.c_str());
  397. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  398. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  399. ASSERT_EQ(2, response.reply_size());
  400. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
  401. ASSERT_STREQ(passwd1.c_str(), response.reply(0).c_str());
  402. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
  403. ASSERT_STREQ("OK", response.reply(1).c_str());
  404. }
  405. // Auth with passwd2
  406. {
  407. brpc::ChannelOptions options;
  408. options.protocol = brpc::PROTOCOL_REDIS;
  409. brpc::policy::RedisAuthenticator* auth =
  410. new brpc::policy::RedisAuthenticator(passwd2.c_str());
  411. options.auth = auth;
  412. brpc::Channel channel;
  413. ASSERT_EQ(0, channel.Init("0.0.0.0:" REDIS_SERVER_PORT, &options));
  414. brpc::RedisRequest request;
  415. brpc::RedisResponse response;
  416. brpc::Controller cntl;
  417. request.AddCommand("get mykey");
  418. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  419. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  420. ASSERT_EQ(1, response.reply_size());
  421. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type()) << response.reply(0);
  422. ASSERT_STREQ(passwd1.c_str(), response.reply(0).c_str());
  423. }
  424. }
  425. TEST_F(RedisTest, cmd_format) {
  426. if (g_redis_pid < 0) {
  427. puts("Skipped due to absence of redis-server");
  428. return;
  429. }
  430. brpc::RedisRequest request;
  431. // set empty string
  432. request.AddCommand("set a ''");
  433. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$0\r\n\r\n",
  434. request._buf.to_string().c_str());
  435. request.Clear();
  436. request.AddCommand("mset b '' c ''");
  437. ASSERT_STREQ("*5\r\n$4\r\nmset\r\n$1\r\nb\r\n$0\r\n\r\n$1\r\nc\r\n$0\r\n\r\n",
  438. request._buf.to_string().c_str());
  439. request.Clear();
  440. // set non-empty string
  441. request.AddCommand("set a 123");
  442. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$3\r\n123\r\n",
  443. request._buf.to_string().c_str());
  444. request.Clear();
  445. request.AddCommand("mset b '' c ccc");
  446. ASSERT_STREQ("*5\r\n$4\r\nmset\r\n$1\r\nb\r\n$0\r\n\r\n$1\r\nc\r\n$3\r\nccc\r\n",
  447. request._buf.to_string().c_str());
  448. request.Clear();
  449. request.AddCommand("get ''key value"); // == get <empty> key value
  450. ASSERT_STREQ("*4\r\n$3\r\nget\r\n$0\r\n\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
  451. request.Clear();
  452. request.AddCommand("get key'' value"); // == get key <empty> value
  453. ASSERT_STREQ("*4\r\n$3\r\nget\r\n$3\r\nkey\r\n$0\r\n\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
  454. request.Clear();
  455. request.AddCommand("get 'ext'key value "); // == get ext key value
  456. ASSERT_STREQ("*4\r\n$3\r\nget\r\n$3\r\next\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
  457. request.Clear();
  458. request.AddCommand(" get key'ext' value "); // == get key ext value
  459. ASSERT_STREQ("*4\r\n$3\r\nget\r\n$3\r\nkey\r\n$3\r\next\r\n$5\r\nvalue\r\n", request._buf.to_string().c_str());
  460. request.Clear();
  461. }
  462. TEST_F(RedisTest, quote_and_escape) {
  463. if (g_redis_pid < 0) {
  464. puts("Skipped due to absence of redis-server");
  465. return;
  466. }
  467. brpc::RedisRequest request;
  468. request.AddCommand("set a 'foo bar'");
  469. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$7\r\nfoo bar\r\n",
  470. request._buf.to_string().c_str());
  471. request.Clear();
  472. request.AddCommand("set a 'foo \\'bar'");
  473. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo 'bar\r\n",
  474. request._buf.to_string().c_str());
  475. request.Clear();
  476. request.AddCommand("set a 'foo \"bar'");
  477. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo \"bar\r\n",
  478. request._buf.to_string().c_str());
  479. request.Clear();
  480. request.AddCommand("set a 'foo \\\"bar'");
  481. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$9\r\nfoo \\\"bar\r\n",
  482. request._buf.to_string().c_str());
  483. request.Clear();
  484. request.AddCommand("set a \"foo 'bar\"");
  485. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo 'bar\r\n",
  486. request._buf.to_string().c_str());
  487. request.Clear();
  488. request.AddCommand("set a \"foo \\'bar\"");
  489. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$9\r\nfoo \\'bar\r\n",
  490. request._buf.to_string().c_str());
  491. request.Clear();
  492. request.AddCommand("set a \"foo \\\"bar\"");
  493. ASSERT_STREQ("*3\r\n$3\r\nset\r\n$1\r\na\r\n$8\r\nfoo \"bar\r\n",
  494. request._buf.to_string().c_str());
  495. request.Clear();
  496. }
  497. std::string GetCompleteCommand(const std::vector<butil::StringPiece>& commands) {
  498. std::string res;
  499. for (int i = 0; i < (int)commands.size(); ++i) {
  500. if (i != 0) {
  501. res.push_back(' ');
  502. }
  503. res.append(commands[i].data(), commands[i].size());
  504. }
  505. return res;
  506. }
  507. TEST_F(RedisTest, command_parser) {
  508. brpc::RedisCommandParser parser;
  509. butil::IOBuf buf;
  510. std::vector<butil::StringPiece> command_out;
  511. butil::Arena arena;
  512. {
  513. // parse from whole command
  514. std::string command = "set abc edc";
  515. ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
  516. ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
  517. ASSERT_TRUE(buf.empty());
  518. ASSERT_EQ(command, GetCompleteCommand(command_out));
  519. }
  520. {
  521. // simulate parsing from network
  522. int t = 100;
  523. std::string raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n");
  524. int size = raw_string.size();
  525. while (t--) {
  526. for (int i = 0; i < size; ++i) {
  527. buf.push_back(raw_string[i]);
  528. if (i == size - 1) {
  529. ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
  530. } else {
  531. if (butil::fast_rand_less_than(2) == 0) {
  532. ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
  533. parser.Consume(buf, &command_out, &arena));
  534. }
  535. }
  536. }
  537. ASSERT_TRUE(buf.empty());
  538. ASSERT_EQ(GetCompleteCommand(command_out), "set abc def");
  539. }
  540. }
  541. {
  542. // there is a non-string message in command and parse should fail
  543. buf.append("*3\r\n$3");
  544. ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out, &arena));
  545. ASSERT_EQ((int)buf.size(), 2); // left "$3"
  546. buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n");
  547. ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out, &arena));
  548. parser.Reset();
  549. }
  550. {
  551. // not array
  552. buf.append(":123456\r\n");
  553. ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
  554. parser.Reset();
  555. }
  556. {
  557. // not array
  558. buf.append("+Error\r\n");
  559. ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
  560. parser.Reset();
  561. }
  562. {
  563. // not array
  564. buf.append("+OK\r\n");
  565. ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
  566. parser.Reset();
  567. }
  568. {
  569. // not array
  570. buf.append("$5\r\nhello\r\n");
  571. ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
  572. parser.Reset();
  573. }
  574. }
  575. TEST_F(RedisTest, redis_reply_codec) {
  576. butil::Arena arena;
  577. // status
  578. {
  579. brpc::RedisReply r(&arena);
  580. butil::IOBuf buf;
  581. butil::IOBufAppender appender;
  582. r.SetStatus("OK");
  583. ASSERT_TRUE(r.SerializeTo(&appender));
  584. appender.move_to(buf);
  585. ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
  586. ASSERT_STREQ(r.c_str(), "OK");
  587. brpc::RedisReply r2(&arena);
  588. brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
  589. ASSERT_EQ(err, brpc::PARSE_OK);
  590. ASSERT_TRUE(r2.is_string());
  591. ASSERT_STREQ("OK", r2.c_str());
  592. }
  593. // error
  594. {
  595. brpc::RedisReply r(&arena);
  596. butil::IOBuf buf;
  597. butil::IOBufAppender appender;
  598. r.SetError("not exist \'key\'");
  599. ASSERT_TRUE(r.SerializeTo(&appender));
  600. appender.move_to(buf);
  601. ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
  602. brpc::RedisReply r2(&arena);
  603. brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
  604. ASSERT_EQ(err, brpc::PARSE_OK);
  605. ASSERT_TRUE(r2.is_error());
  606. ASSERT_STREQ("not exist \'key\'", r2.error_message());
  607. }
  608. // string
  609. {
  610. brpc::RedisReply r(&arena);
  611. butil::IOBuf buf;
  612. butil::IOBufAppender appender;
  613. r.SetNullString();
  614. ASSERT_TRUE(r.SerializeTo(&appender));
  615. appender.move_to(buf);
  616. ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
  617. brpc::RedisReply r2(&arena);
  618. brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
  619. ASSERT_EQ(err, brpc::PARSE_OK);
  620. ASSERT_TRUE(r2.is_nil());
  621. r.SetString("abcde'hello world");
  622. ASSERT_TRUE(r.SerializeTo(&appender));
  623. appender.move_to(buf);
  624. ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
  625. ASSERT_STREQ("abcde'hello world", r.c_str());
  626. r.FormatString("int:%d str:%s fp:%.2f", 123, "foobar", 3.21);
  627. ASSERT_TRUE(r.SerializeTo(&appender));
  628. appender.move_to(buf);
  629. ASSERT_STREQ(buf.to_string().c_str(), "$26\r\nint:123 str:foobar fp:3.21\r\n");
  630. ASSERT_STREQ("int:123 str:foobar fp:3.21", r.c_str());
  631. r.FormatString("verylongstring verylongstring verylongstring verylongstring int:%d str:%s fp:%.2f", 123, "foobar", 3.21);
  632. ASSERT_TRUE(r.SerializeTo(&appender));
  633. appender.move_to(buf);
  634. ASSERT_STREQ(buf.to_string().c_str(), "$86\r\nverylongstring verylongstring verylongstring verylongstring int:123 str:foobar fp:3.21\r\n");
  635. ASSERT_STREQ("verylongstring verylongstring verylongstring verylongstring int:123 str:foobar fp:3.21", r.c_str());
  636. brpc::RedisReply r3(&arena);
  637. err = r3.ConsumePartialIOBuf(buf);
  638. ASSERT_EQ(err, brpc::PARSE_OK);
  639. ASSERT_TRUE(r3.is_string());
  640. ASSERT_STREQ(r.c_str(), r3.c_str());
  641. }
  642. // integer
  643. {
  644. brpc::RedisReply r(&arena);
  645. butil::IOBuf buf;
  646. butil::IOBufAppender appender;
  647. int t = 2;
  648. int input[] = { -1, 1234567 };
  649. const char* output[] = { ":-1\r\n", ":1234567\r\n" };
  650. for (int i = 0; i < t; ++i) {
  651. r.SetInteger(input[i]);
  652. ASSERT_TRUE(r.SerializeTo(&appender));
  653. appender.move_to(buf);
  654. ASSERT_STREQ(buf.to_string().c_str(), output[i]);
  655. brpc::RedisReply r2(&arena);
  656. brpc::ParseError err = r2.ConsumePartialIOBuf(buf);
  657. ASSERT_EQ(err, brpc::PARSE_OK);
  658. ASSERT_TRUE(r2.is_integer());
  659. ASSERT_EQ(r2.integer(), input[i]);
  660. }
  661. }
  662. // array
  663. {
  664. brpc::RedisReply r(&arena);
  665. butil::IOBuf buf;
  666. butil::IOBufAppender appender;
  667. r.SetArray(3);
  668. brpc::RedisReply& sub_reply = r[0];
  669. sub_reply.SetArray(2);
  670. sub_reply[0].SetString("hello, it's me");
  671. sub_reply[1].SetInteger(422);
  672. r[1].SetString("To go over everything");
  673. r[2].SetInteger(1);
  674. ASSERT_TRUE(r[3].is_nil());
  675. ASSERT_TRUE(r.SerializeTo(&appender));
  676. appender.move_to(buf);
  677. ASSERT_STREQ(buf.to_string().c_str(),
  678. "*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n"
  679. "To go over everything\r\n:1\r\n");
  680. brpc::RedisReply r2(&arena);
  681. ASSERT_EQ(r2.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
  682. ASSERT_TRUE(r2.is_array());
  683. ASSERT_EQ(3ul, r2.size());
  684. ASSERT_TRUE(r2[0].is_array());
  685. ASSERT_EQ(2ul, r2[0].size());
  686. ASSERT_TRUE(r2[0][0].is_string());
  687. ASSERT_STREQ(r2[0][0].c_str(), "hello, it's me");
  688. ASSERT_TRUE(r2[0][1].is_integer());
  689. ASSERT_EQ(r2[0][1].integer(), 422);
  690. ASSERT_TRUE(r2[1].is_string());
  691. ASSERT_STREQ(r2[1].c_str(), "To go over everything");
  692. ASSERT_TRUE(r2[2].is_integer());
  693. ASSERT_EQ(1, r2[2].integer());
  694. // null array
  695. r.SetNullArray();
  696. ASSERT_TRUE(r.SerializeTo(&appender));
  697. appender.move_to(buf);
  698. ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
  699. ASSERT_EQ(r.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
  700. ASSERT_TRUE(r.is_nil());
  701. }
  702. // CopyFromDifferentArena
  703. {
  704. brpc::RedisReply r(&arena);
  705. r.SetArray(1);
  706. brpc::RedisReply& sub_reply = r[0];
  707. sub_reply.SetArray(2);
  708. sub_reply[0].SetString("hello, it's me");
  709. sub_reply[1].SetInteger(422);
  710. brpc::RedisReply r2(&arena);
  711. r2.CopyFromDifferentArena(r);
  712. ASSERT_TRUE(r2.is_array());
  713. ASSERT_EQ((int)r2[0].size(), 2);
  714. ASSERT_STREQ(r2[0][0].c_str(), sub_reply[0].c_str());
  715. ASSERT_EQ(r2[0][1].integer(), sub_reply[1].integer());
  716. }
  717. // SetXXX can be called multiple times.
  718. {
  719. brpc::RedisReply r(&arena);
  720. r.SetStatus("OK");
  721. ASSERT_TRUE(r.is_string());
  722. r.SetNullString();
  723. ASSERT_TRUE(r.is_nil());
  724. r.SetArray(2);
  725. ASSERT_TRUE(r.is_array());
  726. r.SetString("OK");
  727. ASSERT_TRUE(r.is_string());
  728. r.SetError("OK");
  729. ASSERT_TRUE(r.is_error());
  730. r.SetInteger(42);
  731. ASSERT_TRUE(r.is_integer());
  732. }
  733. }
  734. butil::Mutex s_mutex;
  735. std::unordered_map<std::string, std::string> m;
  736. std::unordered_map<std::string, int64_t> int_map;
  737. class RedisServiceImpl : public brpc::RedisService {
  738. public:
  739. RedisServiceImpl()
  740. : _batch_count(0) {}
  741. brpc::RedisCommandHandlerResult OnBatched(const std::vector<butil::StringPiece>& args,
  742. brpc::RedisReply* output, bool flush_batched) {
  743. if (_batched_command.empty() && flush_batched) {
  744. if (args[0] == "set") {
  745. DoSet(args[1].as_string(), args[2].as_string(), output);
  746. } else if (args[0] == "get") {
  747. DoGet(args[1].as_string(), output);
  748. }
  749. return brpc::REDIS_CMD_HANDLED;
  750. }
  751. std::vector<std::string> comm;
  752. for (int i = 0; i < (int)args.size(); ++i) {
  753. comm.push_back(args[i].as_string());
  754. }
  755. _batched_command.push_back(comm);
  756. if (flush_batched) {
  757. output->SetArray(_batched_command.size());
  758. for (int i = 0; i < (int)_batched_command.size(); ++i) {
  759. if (_batched_command[i][0] == "set") {
  760. DoSet(_batched_command[i][1], _batched_command[i][2], &(*output)[i]);
  761. } else if (_batched_command[i][0] == "get") {
  762. DoGet(_batched_command[i][1], &(*output)[i]);
  763. }
  764. }
  765. _batch_count++;
  766. _batched_command.clear();
  767. return brpc::REDIS_CMD_HANDLED;
  768. } else {
  769. return brpc::REDIS_CMD_BATCHED;
  770. }
  771. }
  772. void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
  773. m[key] = value;
  774. output->SetStatus("OK");
  775. }
  776. void DoGet(const std::string& key, brpc::RedisReply* output) {
  777. auto it = m.find(key);
  778. if (it != m.end()) {
  779. output->SetString(it->second);
  780. } else {
  781. output->SetNullString();
  782. }
  783. }
  784. std::vector<std::vector<std::string> > _batched_command;
  785. int _batch_count;
  786. };
  787. class SetCommandHandler : public brpc::RedisCommandHandler {
  788. public:
  789. SetCommandHandler(RedisServiceImpl* rs, bool batch_process = false)
  790. : _rs(rs)
  791. , _batch_process(batch_process) {}
  792. brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
  793. brpc::RedisReply* output,
  794. bool flush_batched) {
  795. if (args.size() < 3) {
  796. output->SetError("ERR wrong number of arguments for 'set' command");
  797. return brpc::REDIS_CMD_HANDLED;
  798. }
  799. if (_batch_process) {
  800. return _rs->OnBatched(args, output, flush_batched);
  801. } else {
  802. DoSet(args[1].as_string(), args[2].as_string(), output);
  803. return brpc::REDIS_CMD_HANDLED;
  804. }
  805. }
  806. void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
  807. m[key] = value;
  808. output->SetStatus("OK");
  809. }
  810. private:
  811. RedisServiceImpl* _rs;
  812. bool _batch_process;
  813. };
  814. class GetCommandHandler : public brpc::RedisCommandHandler {
  815. public:
  816. GetCommandHandler(RedisServiceImpl* rs, bool batch_process = false)
  817. : _rs(rs)
  818. , _batch_process(batch_process) {}
  819. brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
  820. brpc::RedisReply* output,
  821. bool flush_batched) {
  822. if (args.size() < 2) {
  823. output->SetError("ERR wrong number of arguments for 'get' command");
  824. return brpc::REDIS_CMD_HANDLED;
  825. }
  826. if (_batch_process) {
  827. return _rs->OnBatched(args, output, flush_batched);
  828. } else {
  829. DoGet(args[1].as_string(), output);
  830. return brpc::REDIS_CMD_HANDLED;
  831. }
  832. }
  833. void DoGet(const std::string& key, brpc::RedisReply* output) {
  834. auto it = m.find(key);
  835. if (it != m.end()) {
  836. output->SetString(it->second);
  837. } else {
  838. output->SetNullString();
  839. }
  840. }
  841. private:
  842. RedisServiceImpl* _rs;
  843. bool _batch_process;
  844. };
  845. class IncrCommandHandler : public brpc::RedisCommandHandler {
  846. public:
  847. IncrCommandHandler() {}
  848. brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
  849. brpc::RedisReply* output,
  850. bool flush_batched) {
  851. if (args.size() < 2) {
  852. output->SetError("ERR wrong number of arguments for 'incr' command");
  853. return brpc::REDIS_CMD_HANDLED;
  854. }
  855. int64_t value;
  856. s_mutex.lock();
  857. value = ++int_map[args[1].as_string()];
  858. s_mutex.unlock();
  859. output->SetInteger(value);
  860. return brpc::REDIS_CMD_HANDLED;
  861. }
  862. };
  863. TEST_F(RedisTest, server_sanity) {
  864. brpc::Server server;
  865. brpc::ServerOptions server_options;
  866. RedisServiceImpl* rsimpl = new RedisServiceImpl;
  867. GetCommandHandler *gh = new GetCommandHandler(rsimpl);
  868. SetCommandHandler *sh = new SetCommandHandler(rsimpl);
  869. IncrCommandHandler *ih = new IncrCommandHandler;
  870. rsimpl->AddCommandHandler("get", gh);
  871. rsimpl->AddCommandHandler("set", sh);
  872. rsimpl->AddCommandHandler("incr", ih);
  873. server_options.redis_service = rsimpl;
  874. brpc::PortRange pr(8081, 8900);
  875. ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
  876. brpc::ChannelOptions options;
  877. options.protocol = brpc::PROTOCOL_REDIS;
  878. brpc::Channel channel;
  879. ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
  880. brpc::RedisRequest request;
  881. brpc::RedisResponse response;
  882. brpc::Controller cntl;
  883. ASSERT_TRUE(request.AddCommand("get hello"));
  884. ASSERT_TRUE(request.AddCommand("get hello2"));
  885. ASSERT_TRUE(request.AddCommand("set key1 value1"));
  886. ASSERT_TRUE(request.AddCommand("get key1"));
  887. ASSERT_TRUE(request.AddCommand("set key2 value2"));
  888. ASSERT_TRUE(request.AddCommand("get key2"));
  889. ASSERT_TRUE(request.AddCommand("xxxcommand key2"));
  890. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  891. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  892. ASSERT_EQ(7, response.reply_size());
  893. ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
  894. ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
  895. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
  896. ASSERT_STREQ("OK", response.reply(2).c_str());
  897. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
  898. ASSERT_STREQ("value1", response.reply(3).c_str());
  899. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(4).type());
  900. ASSERT_STREQ("OK", response.reply(4).c_str());
  901. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
  902. ASSERT_STREQ("value2", response.reply(5).c_str());
  903. ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(6).type());
  904. ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command"));
  905. cntl.Reset();
  906. request.Clear();
  907. response.Clear();
  908. std::string value3("value3");
  909. value3.append(1, '\0');
  910. value3.append(1, 'a');
  911. std::vector<butil::StringPiece> pieces;
  912. pieces.push_back("set");
  913. pieces.push_back("key3");
  914. pieces.push_back(value3);
  915. ASSERT_TRUE(request.AddCommandByComponents(&pieces[0], pieces.size()));
  916. ASSERT_TRUE(request.AddCommand("set key4 \"\""));
  917. ASSERT_TRUE(request.AddCommand("get key3"));
  918. ASSERT_TRUE(request.AddCommand("get key4"));
  919. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  920. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  921. ASSERT_EQ(4, response.reply_size());
  922. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
  923. ASSERT_STREQ("OK", response.reply(0).c_str());
  924. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
  925. ASSERT_STREQ("OK", response.reply(1).c_str());
  926. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(2).type());
  927. ASSERT_STREQ("value3", response.reply(2).c_str());
  928. ASSERT_NE("value3", response.reply(2).data());
  929. ASSERT_EQ(value3, response.reply(2).data());
  930. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
  931. ASSERT_EQ("", response.reply(3).data());
  932. }
  933. void* incr_thread(void* arg) {
  934. brpc::Channel* c = static_cast<brpc::Channel*>(arg);
  935. for (int i = 0; i < 5000; ++i) {
  936. brpc::RedisRequest request;
  937. brpc::RedisResponse response;
  938. brpc::Controller cntl;
  939. EXPECT_TRUE(request.AddCommand("incr count"));
  940. c->CallMethod(NULL, &cntl, &request, &response, NULL);
  941. EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
  942. EXPECT_EQ(1, response.reply_size());
  943. EXPECT_TRUE(response.reply(0).is_integer());
  944. }
  945. return NULL;
  946. }
  947. TEST_F(RedisTest, server_concurrency) {
  948. int N = 10;
  949. brpc::Server server;
  950. brpc::ServerOptions server_options;
  951. RedisServiceImpl* rsimpl = new RedisServiceImpl;
  952. IncrCommandHandler *ih = new IncrCommandHandler;
  953. rsimpl->AddCommandHandler("incr", ih);
  954. server_options.redis_service = rsimpl;
  955. brpc::PortRange pr(8081, 8900);
  956. ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options));
  957. brpc::ChannelOptions options;
  958. options.protocol = brpc::PROTOCOL_REDIS;
  959. options.connection_type = "pooled";
  960. std::vector<bthread_t> bths;
  961. std::vector<brpc::Channel*> channels;
  962. for (int i = 0; i < N; ++i) {
  963. channels.push_back(new brpc::Channel);
  964. ASSERT_EQ(0, channels.back()->Init("127.0.0.1", server.listen_address().port, &options));
  965. bthread_t bth;
  966. ASSERT_EQ(bthread_start_background(&bth, NULL, incr_thread, channels.back()), 0);
  967. bths.push_back(bth);
  968. }
  969. for (int i = 0; i < N; ++i) {
  970. bthread_join(bths[i], NULL);
  971. delete channels[i];
  972. }
  973. ASSERT_EQ(int_map["count"], 10 * 5000LL);
  974. }
  975. class MultiCommandHandler : public brpc::RedisCommandHandler {
  976. public:
  977. MultiCommandHandler() {}
  978. brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
  979. brpc::RedisReply* output,
  980. bool flush_batched) {
  981. output->SetStatus("OK");
  982. return brpc::REDIS_CMD_CONTINUE;
  983. }
  984. RedisCommandHandler* NewTransactionHandler() override {
  985. return new MultiTransactionHandler;
  986. }
  987. class MultiTransactionHandler : public brpc::RedisCommandHandler {
  988. public:
  989. brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
  990. brpc::RedisReply* output,
  991. bool flush_batched) {
  992. if (args[0] == "multi") {
  993. output->SetError("ERR duplicate multi");
  994. return brpc::REDIS_CMD_CONTINUE;
  995. }
  996. if (args[0] != "exec") {
  997. std::vector<std::string> comm;
  998. for (int i = 0; i < (int)args.size(); ++i) {
  999. comm.push_back(args[i].as_string());
  1000. }
  1001. _commands.push_back(comm);
  1002. output->SetStatus("QUEUED");
  1003. return brpc::REDIS_CMD_CONTINUE;
  1004. }
  1005. output->SetArray(_commands.size());
  1006. s_mutex.lock();
  1007. for (size_t i = 0; i < _commands.size(); ++i) {
  1008. if (_commands[i][0] == "incr") {
  1009. int64_t value;
  1010. value = ++int_map[_commands[i][1]];
  1011. (*output)[i].SetInteger(value);
  1012. } else {
  1013. (*output)[i].SetStatus("unknown command");
  1014. }
  1015. }
  1016. s_mutex.unlock();
  1017. return brpc::REDIS_CMD_HANDLED;
  1018. }
  1019. private:
  1020. std::vector<std::vector<std::string> > _commands;
  1021. };
  1022. };
  1023. TEST_F(RedisTest, server_command_continue) {
  1024. brpc::Server server;
  1025. brpc::ServerOptions server_options;
  1026. RedisServiceImpl* rsimpl = new RedisServiceImpl;
  1027. rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
  1028. rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
  1029. rsimpl->AddCommandHandler("incr", new IncrCommandHandler);
  1030. rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
  1031. server_options.redis_service = rsimpl;
  1032. brpc::PortRange pr(8081, 8900);
  1033. ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
  1034. brpc::ChannelOptions options;
  1035. options.protocol = brpc::PROTOCOL_REDIS;
  1036. brpc::Channel channel;
  1037. ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
  1038. {
  1039. brpc::RedisRequest request;
  1040. brpc::RedisResponse response;
  1041. brpc::Controller cntl;
  1042. ASSERT_TRUE(request.AddCommand("set hello world"));
  1043. ASSERT_TRUE(request.AddCommand("get hello"));
  1044. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  1045. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  1046. ASSERT_EQ(2, response.reply_size());
  1047. ASSERT_STREQ("world", response.reply(1).c_str());
  1048. }
  1049. {
  1050. brpc::RedisRequest request;
  1051. brpc::RedisResponse response;
  1052. brpc::Controller cntl;
  1053. ASSERT_TRUE(request.AddCommand("multi"));
  1054. ASSERT_TRUE(request.AddCommand("mUltI"));
  1055. int count = 10;
  1056. for (int i = 0; i < count; ++i) {
  1057. ASSERT_TRUE(request.AddCommand("incr hello 1"));
  1058. }
  1059. ASSERT_TRUE(request.AddCommand("exec"));
  1060. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  1061. ASSERT_EQ(13, response.reply_size());
  1062. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  1063. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
  1064. ASSERT_STREQ("OK", response.reply(0).c_str());
  1065. ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(1).type());
  1066. for (int i = 2; i < count + 2; ++i) {
  1067. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(i).type());
  1068. ASSERT_STREQ("QUEUED", response.reply(i).c_str());
  1069. }
  1070. const brpc::RedisReply& m = response.reply(count + 2);
  1071. ASSERT_EQ(count, (int)m.size());
  1072. for (int i = 0; i < count; ++i) {
  1073. ASSERT_EQ(i+1, m[i].integer());
  1074. }
  1075. }
  1076. // After 'multi', normal requests should be successful
  1077. {
  1078. brpc::RedisRequest request;
  1079. brpc::RedisResponse response;
  1080. brpc::Controller cntl;
  1081. ASSERT_TRUE(request.AddCommand("get hello"));
  1082. ASSERT_TRUE(request.AddCommand("get hello2"));
  1083. ASSERT_TRUE(request.AddCommand("set key1 value1"));
  1084. ASSERT_TRUE(request.AddCommand("get key1"));
  1085. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  1086. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  1087. ASSERT_STREQ("world", response.reply(0).c_str());
  1088. ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
  1089. ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
  1090. ASSERT_STREQ("OK", response.reply(2).c_str());
  1091. ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
  1092. ASSERT_STREQ("value1", response.reply(3).c_str());
  1093. }
  1094. }
  1095. TEST_F(RedisTest, server_handle_pipeline) {
  1096. brpc::Server server;
  1097. brpc::ServerOptions server_options;
  1098. RedisServiceImpl* rsimpl = new RedisServiceImpl;
  1099. GetCommandHandler* getch = new GetCommandHandler(rsimpl, true);
  1100. SetCommandHandler* setch = new SetCommandHandler(rsimpl, true);
  1101. rsimpl->AddCommandHandler("get", getch);
  1102. rsimpl->AddCommandHandler("set", setch);
  1103. rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
  1104. server_options.redis_service = rsimpl;
  1105. brpc::PortRange pr(8081, 8900);
  1106. ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
  1107. brpc::ChannelOptions options;
  1108. options.protocol = brpc::PROTOCOL_REDIS;
  1109. brpc::Channel channel;
  1110. ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
  1111. brpc::RedisRequest request;
  1112. brpc::RedisResponse response;
  1113. brpc::Controller cntl;
  1114. ASSERT_TRUE(request.AddCommand("set key1 v1"));
  1115. ASSERT_TRUE(request.AddCommand("set key2 v2"));
  1116. ASSERT_TRUE(request.AddCommand("set key3 v3"));
  1117. ASSERT_TRUE(request.AddCommand("get hello"));
  1118. ASSERT_TRUE(request.AddCommand("get hello"));
  1119. ASSERT_TRUE(request.AddCommand("set key1 world"));
  1120. ASSERT_TRUE(request.AddCommand("set key2 world"));
  1121. ASSERT_TRUE(request.AddCommand("get key2"));
  1122. channel.CallMethod(NULL, &cntl, &request, &response, NULL);
  1123. ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
  1124. ASSERT_EQ(8, response.reply_size());
  1125. ASSERT_EQ(1, rsimpl->_batch_count);
  1126. ASSERT_TRUE(response.reply(7).is_string());
  1127. ASSERT_STREQ(response.reply(7).c_str(), "world");
  1128. }
  1129. } //namespace