rpc_message_brpc.h 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. /*
  2. Copyright (c) 2020 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __RPC_MESSAGE_BRPC_H__
  14. #define __RPC_MESSAGE_BRPC_H__
  15. #ifdef _WIN32
  16. #include <workflow/PlatformSocket.h>
  17. #else
  18. #include <arpa/inet.h>
  19. #endif
  20. #include "rpc_message.h"
  21. #include "rpc_basic.h"
  22. namespace srpc
  23. {
  24. static constexpr int BRPC_HEADER_SIZE = 12;
  25. class BRPCMessage : public RPCMessage
  26. {
  27. public:
  28. BRPCMessage();
  29. virtual ~BRPCMessage();
  30. int encode(struct iovec vectors[], int max, size_t size_limit);
  31. int append(const void *buf, size_t *size, size_t size_limit);
  32. int get_compress_type() const override;
  33. void set_compress_type(int type) override;
  34. bool get_attachment_nocopy(const char **attachment, size_t *len) const;
  35. void set_attachment_nocopy(const char *attachment, size_t len);
  36. int get_data_type() const override { return RPCDataProtobuf; }
  37. void set_data_type(int type) override { }
  38. //Different data type is not supported in BrpcMeta.
  39. bool get_meta_module_data(RPCModuleData& data) const override { return false; }
  40. bool set_meta_module_data(const RPCModuleData& data) override { return false; }
  41. public:
  42. using RPCMessage::serialize;
  43. using RPCMessage::deserialize;
  44. int serialize(const ProtobufIDLMessage *pb_msg) override;
  45. int deserialize(ProtobufIDLMessage *pb_msg) override;
  46. int compress() override;
  47. int decompress() override;
  48. protected:
  49. // "PRPC" + PAYLOAD_SIZE + META_SIZE
  50. char header[BRPC_HEADER_SIZE];
  51. size_t nreceived;
  52. size_t meta_len;
  53. size_t message_len;
  54. size_t attachment_len;
  55. char *meta_buf;
  56. RPCBuffer *message;
  57. RPCBuffer *attachment;
  58. ProtobufIDLMessage *meta;
  59. protected:
  60. int error_code_srpc_brpc(int srpc_status_code) const;
  61. int error_code_brpc_srpc(int brpc_error_code) const;
  62. };
  63. class BRPCRequest : public BRPCMessage
  64. {
  65. public:
  66. bool serialize_meta();
  67. bool deserialize_meta();
  68. const std::string& get_service_name() const;
  69. const std::string& get_method_name() const;
  70. void set_service_name(const std::string& service_name);
  71. void set_method_name(const std::string& method_name);
  72. int64_t get_correlation_id() const;
  73. };
  74. class BRPCResponse : public BRPCMessage
  75. {
  76. public:
  77. bool serialize_meta();
  78. bool deserialize_meta();
  79. int get_status_code() const;
  80. int get_error() const;
  81. const char *get_errmsg() const;
  82. void set_status_code(int code);
  83. void set_error(int error);
  84. void set_correlation_id(int64_t cid);
  85. protected:
  86. int srpc_status_code = RPCStatusOK;
  87. std::string srpc_error_msg;
  88. };
  89. class BRPCStdRequest : public protocol::ProtocolMessage, public RPCRequest, public BRPCRequest
  90. {
  91. public:
  92. int encode(struct iovec vectors[], int max) override
  93. {
  94. return this->BRPCRequest::encode(vectors, max, this->size_limit);
  95. }
  96. int append(const void *buf, size_t *size) override
  97. {
  98. return this->BRPCRequest::append(buf, size, this->size_limit);
  99. }
  100. public:
  101. bool serialize_meta() override
  102. {
  103. return this->BRPCRequest::serialize_meta();
  104. }
  105. bool deserialize_meta() override
  106. {
  107. return this->BRPCRequest::deserialize_meta();
  108. }
  109. public:
  110. const std::string& get_service_name() const override
  111. {
  112. return this->BRPCRequest::get_service_name();
  113. }
  114. const std::string& get_method_name() const override
  115. {
  116. return this->BRPCRequest::get_method_name();
  117. }
  118. void set_service_name(const std::string& service_name) override
  119. {
  120. return this->BRPCRequest::set_service_name(service_name);
  121. }
  122. void set_method_name(const std::string& method_name) override
  123. {
  124. return this->BRPCRequest::set_method_name(method_name);
  125. }
  126. bool set_meta_module_data(const RPCModuleData& data) override
  127. {
  128. return this->BRPCMessage::set_meta_module_data(data);
  129. }
  130. bool get_meta_module_data(RPCModuleData& data) const override
  131. {
  132. return this->BRPCMessage::get_meta_module_data(data);
  133. }
  134. public:
  135. BRPCStdRequest() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  136. };
  137. class BRPCStdResponse : public protocol::ProtocolMessage, public RPCResponse, public BRPCResponse
  138. {
  139. public:
  140. int encode(struct iovec vectors[], int max) override
  141. {
  142. return this->BRPCResponse::encode(vectors, max, this->size_limit);
  143. }
  144. int append(const void *buf, size_t *size) override
  145. {
  146. return this->BRPCResponse::append(buf, size, this->size_limit);
  147. }
  148. public:
  149. bool serialize_meta() override
  150. {
  151. return this->BRPCResponse::serialize_meta();
  152. }
  153. bool deserialize_meta() override
  154. {
  155. return this->BRPCResponse::deserialize_meta();
  156. }
  157. public:
  158. int get_status_code() const override
  159. {
  160. return this->BRPCResponse::get_status_code();
  161. }
  162. int get_error() const override
  163. {
  164. return this->BRPCResponse::get_error();
  165. }
  166. const char *get_errmsg() const override
  167. {
  168. return this->BRPCResponse::get_errmsg();
  169. }
  170. void set_status_code(int code) override
  171. {
  172. return this->BRPCResponse::set_status_code(code);
  173. }
  174. void set_error(int error) override
  175. {
  176. return this->BRPCResponse::set_error(error);
  177. }
  178. bool set_meta_module_data(const RPCModuleData& data) override
  179. {
  180. return this->BRPCMessage::set_meta_module_data(data);
  181. }
  182. bool get_meta_module_data(RPCModuleData& data) const override
  183. {
  184. return this->BRPCMessage::get_meta_module_data(data);
  185. }
  186. public:
  187. BRPCStdResponse() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  188. };
  189. ////////
  190. // inl
  191. inline BRPCMessage::~BRPCMessage()
  192. {
  193. delete this->message;
  194. delete this->attachment;
  195. delete []this->meta_buf;
  196. delete this->meta;
  197. }
  198. inline int BRPCMessage::encode(struct iovec vectors[], int max, size_t size_limit)
  199. {
  200. size_t sz = this->meta_len + this->message_len + this->attachment_len;
  201. if (sz > 0x7FFFFFFF)
  202. {
  203. errno = EOVERFLOW;
  204. return -1;
  205. }
  206. int ret;
  207. int total;
  208. char *p = this->header;
  209. memcpy(p, "PRPC", 4);
  210. p += 4;
  211. *(uint32_t *)(p) = htonl((uint32_t)sz);
  212. p += 4;
  213. *(uint32_t *)(p) = htonl((uint32_t)this->meta_len);
  214. vectors[0].iov_base = this->header;
  215. vectors[0].iov_len = BRPC_HEADER_SIZE;
  216. vectors[1].iov_base = this->meta_buf;
  217. vectors[1].iov_len = this->meta_len;
  218. ret = this->message->encode(vectors + 2, max - 2);
  219. if (ret < 0)
  220. return ret;
  221. total = ret;
  222. if (this->attachment_len)
  223. {
  224. ret = this->attachment->encode(vectors + 2 + ret, max - 2 - ret);
  225. if (ret < 0)
  226. return ret;
  227. total += ret;
  228. }
  229. return 2 + total;
  230. }
  231. } // namespace srpc
  232. #endif