1
0

rpc_message_srpc.h 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. /*
  2. Copyright (c) 2020 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __RPC_MESSAGE_SRPC_H__
  14. #define __RPC_MESSAGE_SRPC_H__
  15. #ifdef _WIN32
  16. #include <workflow/PlatformSocket.h>
  17. #else
  18. #include <arpa/inet.h>
  19. #endif
  20. #include <workflow/HttpMessage.h>
  21. #include "rpc_message.h"
  22. #include "rpc_basic.h"
  23. #include "rpc_thrift_idl.h"
  24. #include "rpc_buffer.h"
  25. namespace srpc
  26. {
  // Size in bytes of the fixed SRPC header buffer (see SRPCMessage::header).
  27. static constexpr int SRPC_HEADER_SIZE = 16;
  28. // define srpc protocol
  29. class SRPCMessage : public RPCMessage
  30. {
  31. public:
  32. SRPCMessage();
  33. virtual ~SRPCMessage();
  34. int encode(struct iovec vectors[], int max, size_t size_limit);
  35. int append(const void *buf, size_t *size, size_t size_limit);
  36. bool serialize_meta();
  37. bool deserialize_meta();
  38. int get_compress_type() const override;
  39. int get_data_type() const override;
  40. void set_compress_type(int type) override;
  41. void set_data_type(int type) override;
  42. void set_attachment_nocopy(const char *attachment, size_t len);
  43. bool get_attachment_nocopy(const char **attachment, size_t *len) const;
  44. bool set_meta_module_data(const RPCModuleData& data) override;
  45. bool get_meta_module_data(RPCModuleData& data) const override;
  46. public:
  47. using RPCMessage::serialize;
  48. using RPCMessage::deserialize;
  49. int serialize(const ProtobufIDLMessage *pb_msg) override;
  50. int deserialize(ProtobufIDLMessage *pb_msg) override;
  51. int serialize(const ThriftIDLMessage *thrift_msg) override;
  52. int deserialize(ThriftIDLMessage *thrift_msg) override;
  53. int compress() override;
  54. int decompress() override;
  55. public:
  56. RPCBuffer *get_buffer() const { return this->buf; }
  57. size_t get_message_len() const { return this->message_len; }
  58. void set_message_len(size_t len) { this->message_len = len; }
  59. protected:
  60. void init_meta();
  61. // "SRPC" + META_LEN + MESSAGE_LEN + RESERVED
  62. char header[SRPC_HEADER_SIZE];
  63. RPCBuffer *buf;
  64. char *meta_buf;
  65. size_t nreceived;
  66. size_t meta_len;
  67. size_t message_len;
  68. ProtobufIDLMessage *meta;
  69. };
  70. class SRPCRequest : public SRPCMessage
  71. {
  72. public:
  73. const std::string& get_service_name() const;
  74. const std::string& get_method_name() const;
  75. void set_service_name(const std::string& service_name);
  76. void set_method_name(const std::string& method_name);
  77. };
  78. class SRPCResponse : public SRPCMessage
  79. {
  80. public:
  81. int get_status_code() const;
  82. int get_error() const;
  83. const char *get_errmsg() const;
  84. void set_status_code(int code);
  85. void set_error(int error);
  86. };
  87. class SRPCStdRequest : public protocol::ProtocolMessage, public RPCRequest, public SRPCRequest
  88. {
  89. public:
  90. int encode(struct iovec vectors[], int max) override
  91. {
  92. return this->SRPCRequest::encode(vectors, max, this->size_limit);
  93. }
  94. int append(const void *buf, size_t *size) override
  95. {
  96. return this->SRPCRequest::append(buf, size, this->size_limit);
  97. }
  98. public:
  99. bool serialize_meta() override
  100. {
  101. return this->SRPCRequest::serialize_meta();
  102. }
  103. bool deserialize_meta() override
  104. {
  105. return this->SRPCRequest::deserialize_meta();
  106. }
  107. public:
  108. const std::string& get_service_name() const override
  109. {
  110. return this->SRPCRequest::get_service_name();
  111. }
  112. const std::string& get_method_name() const override
  113. {
  114. return this->SRPCRequest::get_method_name();
  115. }
  116. void set_service_name(const std::string& service_name) override
  117. {
  118. return this->SRPCRequest::set_service_name(service_name);
  119. }
  120. void set_method_name(const std::string& method_name) override
  121. {
  122. return this->SRPCRequest::set_method_name(method_name);
  123. }
  124. bool set_meta_module_data(const RPCModuleData& data) override
  125. {
  126. return this->SRPCMessage::set_meta_module_data(data);
  127. }
  128. bool get_meta_module_data(RPCModuleData& data) const override
  129. {
  130. return this->SRPCMessage::get_meta_module_data(data);
  131. }
  132. public:
  133. SRPCStdRequest() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  134. };
  135. class SRPCStdResponse : public protocol::ProtocolMessage, public RPCResponse, public SRPCResponse
  136. {
  137. public:
  138. int encode(struct iovec vectors[], int max) override
  139. {
  140. return this->SRPCResponse::encode(vectors, max, this->size_limit);
  141. }
  142. int append(const void *buf, size_t *size) override
  143. {
  144. return this->SRPCResponse::append(buf, size, this->size_limit);
  145. }
  146. public:
  147. bool serialize_meta() override
  148. {
  149. return this->SRPCResponse::serialize_meta();
  150. }
  151. bool deserialize_meta() override
  152. {
  153. return this->SRPCResponse::deserialize_meta();
  154. }
  155. public:
  156. int get_status_code() const override
  157. {
  158. return this->SRPCResponse::get_status_code();
  159. }
  160. int get_error() const override
  161. {
  162. return this->SRPCResponse::get_error();
  163. }
  164. const char *get_errmsg() const override
  165. {
  166. return this->SRPCResponse::get_errmsg();
  167. }
  168. void set_status_code(int code) override
  169. {
  170. return this->SRPCResponse::set_status_code(code);
  171. }
  172. void set_error(int error) override
  173. {
  174. return this->SRPCResponse::set_error(error);
  175. }
  176. bool set_meta_module_data(const RPCModuleData& data) override
  177. {
  178. return this->SRPCMessage::set_meta_module_data(data);
  179. }
  180. bool get_meta_module_data(RPCModuleData& data) const override
  181. {
  182. return this->SRPCMessage::get_meta_module_data(data);
  183. }
  184. public:
  185. SRPCStdResponse() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  186. };
  187. class SRPCHttpRequest : public protocol::HttpRequest, public RPCRequest, public SRPCRequest
  188. {
  189. public:
  190. bool serialize_meta() override;
  191. bool deserialize_meta() override;
  192. public:
  193. const std::string& get_service_name() const override
  194. {
  195. return this->SRPCRequest::get_service_name();
  196. }
  197. const std::string& get_method_name() const override
  198. {
  199. return this->SRPCRequest::get_method_name();
  200. }
  201. void set_service_name(const std::string& service_name) override
  202. {
  203. return this->SRPCRequest::set_service_name(service_name);
  204. }
  205. void set_method_name(const std::string& method_name) override
  206. {
  207. return this->SRPCRequest::set_method_name(method_name);
  208. }
  209. bool set_meta_module_data(const RPCModuleData& data) override;
  210. bool get_meta_module_data(RPCModuleData& data) const override;
  211. bool set_http_header(const std::string& name,
  212. const std::string& value) override;
  213. bool add_http_header(const std::string& name,
  214. const std::string& value) override;
  215. bool get_http_header(const std::string& name,
  216. std::string& value) const override;
  217. public:
  218. SRPCHttpRequest() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  219. };
  220. class SRPCHttpResponse : public protocol::HttpResponse, public RPCResponse, public SRPCResponse
  221. {
  222. public:
  223. bool serialize_meta() override;
  224. bool deserialize_meta() override;
  225. public:
  226. int get_status_code() const override
  227. {
  228. return this->SRPCResponse::get_status_code();
  229. }
  230. int get_error() const override
  231. {
  232. return this->SRPCResponse::get_error();
  233. }
  234. const char *get_errmsg() const override
  235. {
  236. return this->SRPCResponse::get_errmsg();
  237. }
  238. void set_status_code(int code) override
  239. {
  240. return this->SRPCResponse::set_status_code(code);
  241. }
  242. void set_error(int error) override
  243. {
  244. return this->SRPCResponse::set_error(error);
  245. }
  246. bool set_http_code(int code) override
  247. {
  248. return this->protocol::HttpResponse::set_status_code(std::to_string(code));
  249. }
  250. bool set_meta_module_data(const RPCModuleData& data) override;
  251. bool get_meta_module_data(RPCModuleData& data) const override;
  252. bool set_http_header(const std::string& name,
  253. const std::string& value) override;
  254. bool add_http_header(const std::string& name,
  255. const std::string& value) override;
  256. bool get_http_header(const std::string& name,
  257. std::string& value) const override;
  258. public:
  259. SRPCHttpResponse() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  260. };
  261. ////////
  262. // Inline definitions of SRPCMessage member functions
  263. inline SRPCMessage::~SRPCMessage()
  264. {
  265. delete []this->meta_buf;
  266. delete this->meta;
  267. delete this->buf;
  268. }
  269. inline int SRPCMessage::encode(struct iovec vectors[], int max, size_t size_limit)
  270. {
  271. if (this->message_len > 0x7FFFFFFF)
  272. {
  273. errno = EOVERFLOW;
  274. return -1;
  275. }
  276. char *p = this->header;
  277. memcpy(p, "SRPC", 4);
  278. p += 4;
  279. *(uint32_t *)(p) = htonl((uint32_t)this->meta_len);
  280. p += 4;
  281. *(uint32_t *)(p) = htonl((uint32_t)this->message_len);
  282. vectors[0].iov_base = this->header;
  283. vectors[0].iov_len = SRPC_HEADER_SIZE;
  284. vectors[1].iov_base = this->meta_buf;
  285. vectors[1].iov_len = this->meta_len;
  286. int ret = this->buf->encode(vectors + 2, max - 2);
  287. return ret < 0 ? ret : ret + 2;
  288. }
  289. inline bool SRPCMessage::serialize_meta()
  290. {
  291. this->meta_len = this->meta->ByteSizeLong();
  292. this->meta_buf = new char[this->meta_len];
  293. return this->meta->SerializeToArray(this->meta_buf, (int)this->meta_len);
  294. }
  295. inline bool SRPCMessage::deserialize_meta()
  296. {
  297. return this->meta->ParseFromArray(this->meta_buf, (int)this->meta_len);
  298. }
  299. } // namespace srpc
  300. #endif