redis.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #ifndef BRPC_REDIS_H
  18. #define BRPC_REDIS_H
  19. #include <google/protobuf/message.h>
  20. #include <unordered_map>
  21. #include <memory>
  22. #include "butil/iobuf.h"
  23. #include "butil/strings/string_piece.h"
  24. #include "butil/arena.h"
  25. #include "brpc/proto_base.pb.h"
  26. #include "brpc/redis_reply.h"
  27. #include "brpc/parse_result.h"
  28. #include "brpc/callback.h"
  29. #include "brpc/socket.h"
  30. namespace brpc {
  31. // Request to redis.
  32. // Notice that you can pipeline multiple commands in one request and sent
  33. // them to ONE redis-server together.
  34. // Example:
  35. // RedisRequest request;
  36. // request.AddCommand("PING");
  37. // RedisResponse response;
  38. // channel.CallMethod(&controller, &request, &response, NULL/*done*/);
  39. // if (!cntl.Failed()) {
  40. // LOG(INFO) << response.reply(0);
  41. // }
  42. class RedisRequest : public ::google::protobuf::Message {
  43. public:
  44. RedisRequest();
  45. virtual ~RedisRequest();
  46. RedisRequest(const RedisRequest& from);
  47. inline RedisRequest& operator=(const RedisRequest& from) {
  48. CopyFrom(from);
  49. return *this;
  50. }
  51. void Swap(RedisRequest* other);
  52. // Add a command with a va_list to this request. The conversion
  53. // specifiers are compatible with the ones used by hiredis, namely except
  54. // that %b stands for binary data, other specifiers are similar with printf.
  55. bool AddCommandV(const char* fmt, va_list args);
  56. // Concatenate components into a redis command, similarly with
  57. // redisCommandArgv() in hiredis.
  58. // Example:
  59. // butil::StringPiece components[] = { "set", "key", "value" };
  60. // request.AddCommandByComponents(components, arraysize(components));
  61. bool AddCommandByComponents(const butil::StringPiece* components, size_t n);
  62. // Add a command with variadic args to this request.
  63. // The reason that adding so many overloads rather than using ... is that
  64. // it's the only way to dispatch the AddCommand w/o args differently.
  65. bool AddCommand(const butil::StringPiece& command);
  66. template <typename A1>
  67. bool AddCommand(const char* format, A1 a1)
  68. { return AddCommandWithArgs(format, a1); }
  69. template <typename A1, typename A2>
  70. bool AddCommand(const char* format, A1 a1, A2 a2)
  71. { return AddCommandWithArgs(format, a1, a2); }
  72. template <typename A1, typename A2, typename A3>
  73. bool AddCommand(const char* format, A1 a1, A2 a2, A3 a3)
  74. { return AddCommandWithArgs(format, a1, a2, a3); }
  75. template <typename A1, typename A2, typename A3, typename A4>
  76. bool AddCommand(const char* format, A1 a1, A2 a2, A3 a3, A4 a4)
  77. { return AddCommandWithArgs(format, a1, a2, a3, a4); }
  78. template <typename A1, typename A2, typename A3, typename A4, typename A5>
  79. bool AddCommand(const char* format, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5)
  80. { return AddCommandWithArgs(format, a1, a2, a3, a4, a5); }
  81. template <typename A1, typename A2, typename A3, typename A4, typename A5, typename A6>
  82. bool AddCommand(const char* format, A1 a1, A2 a2, A3 a3, A4 a4, A5 a5, A6 a6)
  83. { return AddCommandWithArgs(format, a1, a2, a3, a4, a5, a6); }
  84. // Number of successfully added commands
  85. int command_size() const { return _ncommand; }
  86. // True if previous AddCommand[V] failed.
  87. bool has_error() const { return _has_error; }
  88. // Serialize the request into `buf'. Return true on success.
  89. bool SerializeTo(butil::IOBuf* buf) const;
  90. // Protobuf methods.
  91. RedisRequest* New() const;
  92. void CopyFrom(const ::google::protobuf::Message& from);
  93. void MergeFrom(const ::google::protobuf::Message& from);
  94. void CopyFrom(const RedisRequest& from);
  95. void MergeFrom(const RedisRequest& from);
  96. void Clear();
  97. bool IsInitialized() const;
  98. int ByteSize() const;
  99. bool MergePartialFromCodedStream(
  100. ::google::protobuf::io::CodedInputStream* input);
  101. void SerializeWithCachedSizes(
  102. ::google::protobuf::io::CodedOutputStream* output) const;
  103. ::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const;
  104. int GetCachedSize() const { return _cached_size_; }
  105. static const ::google::protobuf::Descriptor* descriptor();
  106. void Print(std::ostream&) const;
  107. protected:
  108. ::google::protobuf::Metadata GetMetadata() const override;
  109. private:
  110. void SharedCtor();
  111. void SharedDtor();
  112. void SetCachedSize(int size) const;
  113. bool AddCommandWithArgs(const char* fmt, ...);
  114. int _ncommand; // # of valid commands
  115. bool _has_error; // previous AddCommand had error
  116. butil::IOBuf _buf; // the serialized request.
  117. mutable int _cached_size_; // ByteSize
  118. };
  119. // Response from Redis.
  120. // Notice that a RedisResponse instance may contain multiple replies
  121. // due to pipelining.
  122. class RedisResponse : public ::google::protobuf::Message {
  123. public:
  124. RedisResponse();
  125. virtual ~RedisResponse();
  126. RedisResponse(const RedisResponse& from);
  127. inline RedisResponse& operator=(const RedisResponse& from) {
  128. CopyFrom(from);
  129. return *this;
  130. }
  131. void Swap(RedisResponse* other);
  132. // Number of replies in this response.
  133. // (May have more than one reply due to pipeline)
  134. int reply_size() const { return _nreply; }
  135. // Get index-th reply. If index is out-of-bound, nil reply is returned.
  136. const RedisReply& reply(int index) const {
  137. if (index < reply_size()) {
  138. return (index == 0 ? _first_reply : _other_replies[index - 1]);
  139. }
  140. static RedisReply redis_nil(NULL);
  141. return redis_nil;
  142. }
  143. // Parse and consume intact replies from the buf.
  144. // Returns PARSE_OK on success.
  145. // Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse.
  146. // Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
  147. ParseError ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count);
  148. // implements Message ----------------------------------------------
  149. RedisResponse* New() const;
  150. void CopyFrom(const ::google::protobuf::Message& from);
  151. void MergeFrom(const ::google::protobuf::Message& from);
  152. void CopyFrom(const RedisResponse& from);
  153. void MergeFrom(const RedisResponse& from);
  154. void Clear();
  155. bool IsInitialized() const;
  156. int ByteSize() const;
  157. bool MergePartialFromCodedStream(
  158. ::google::protobuf::io::CodedInputStream* input);
  159. void SerializeWithCachedSizes(
  160. ::google::protobuf::io::CodedOutputStream* output) const;
  161. ::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const;
  162. int GetCachedSize() const { return _cached_size_; }
  163. static const ::google::protobuf::Descriptor* descriptor();
  164. protected:
  165. ::google::protobuf::Metadata GetMetadata() const override;
  166. private:
  167. void SharedCtor();
  168. void SharedDtor();
  169. void SetCachedSize(int size) const;
  170. RedisReply _first_reply;
  171. RedisReply* _other_replies;
  172. butil::Arena _arena;
  173. int _nreply;
  174. mutable int _cached_size_;
  175. };
  176. std::ostream& operator<<(std::ostream& os, const RedisRequest&);
  177. std::ostream& operator<<(std::ostream& os, const RedisResponse&);
  178. class RedisCommandHandler;
  179. // Container of CommandHandlers.
  180. // Assign an instance to ServerOption.redis_service to enable redis support.
  181. class RedisService {
  182. public:
  183. virtual ~RedisService() {}
  184. // Call this function to register `handler` that can handle command `name`.
  185. bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
  186. // This function should not be touched by user and used by brpc deverloper only.
  187. RedisCommandHandler* FindCommandHandler(const std::string& name) const;
  188. private:
  189. typedef std::unordered_map<std::string, RedisCommandHandler*> CommandMap;
  190. CommandMap _command_map;
  191. };
  192. enum RedisCommandHandlerResult {
  193. REDIS_CMD_HANDLED = 0,
  194. REDIS_CMD_CONTINUE = 1,
  195. REDIS_CMD_BATCHED = 2,
  196. };
  197. // The Command handler for a redis request. User should impletement Run().
  198. class RedisCommandHandler {
  199. public:
  200. virtual ~RedisCommandHandler() {}
  201. // Once Server receives commands, it will first find the corresponding handlers and
  202. // call them sequentially(one by one) according to the order that requests arrive,
  203. // just like what redis-server does.
  204. // `args' is the array of request command. For example, "set somekey somevalue"
  205. // corresponds to args[0]=="set", args[1]=="somekey" and args[2]=="somevalue".
  206. // `output', which should be filled by user, is the content that sent to client side.
  207. // Read brpc/src/redis_reply.h for more usage.
  208. // `flush_batched' indicates whether the user should flush all the results of
  209. // batched commands. If user want to do some batch processing, user should buffer
  210. // the commands and return REDIS_CMD_BATCHED. Once `flush_batched' is true,
  211. // run all the commands, set `output' to be an array in which every element is the
  212. // result of batched commands and return REDIS_CMD_HANDLED.
  213. //
  214. // The return value should be REDIS_CMD_HANDLED for normal cases. If you want
  215. // to implement transaction, return REDIS_CMD_CONTINUE once server receives
  216. // an start marker and brpc will call MultiTransactionHandler() to new a transaction
  217. // handler that all the following commands are sent to this tranction handler until
  218. // it returns REDIS_CMD_HANDLED. Read the comment below.
  219. virtual RedisCommandHandlerResult Run(const std::vector<const char*>& args,
  220. brpc::RedisReply* output,
  221. bool flush_batched) = 0;
  222. // The Run() returns CONTINUE for "multi", which makes brpc call this method to
  223. // create a transaction_handler to process following commands until transaction_handler
  224. // returns OK. For example, for command "multi; set k1 v1; set k2 v2; set k3 v3;
  225. // exec":
  226. // 1) First command is "multi" and Run() should return REDIS_CMD_CONTINUE,
  227. // then brpc calls NewTransactionHandler() to new a transaction_handler.
  228. // 2) brpc calls transaction_handler.Run() with command "set k1 v1",
  229. // which should return CONTINUE.
  230. // 3) brpc calls transaction_handler.Run() with command "set k2 v2",
  231. // which should return CONTINUE.
  232. // 4) brpc calls transaction_handler.Run() with command "set k3 v3",
  233. // which should return CONTINUE.
  234. // 5) An ending marker(exec) is found in transaction_handler.Run(), user exeuctes all
  235. // the commands and return OK. This Transation is done.
  236. virtual RedisCommandHandler* NewTransactionHandler();
  237. };
  238. } // namespace brpc
  239. #endif // BRPC_REDIS_H