rpc_message_thrift.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. /*
  2. Copyright (c) 2020 sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __RPC_MESSAGE_THRIFT_H__
  14. #define __RPC_MESSAGE_THRIFT_H__
  15. #include <string>
  16. #include <workflow/HttpMessage.h>
  17. #include "rpc_message.h"
  18. #include "rpc_thrift_idl.h"
  19. namespace srpc
  20. {
  21. class ThriftException : public ThriftIDLMessage
  22. {
  23. public:
  24. std::string message;
  25. int32_t type;
  26. public:
  27. struct ISSET
  28. {
  29. bool message = true;
  30. bool type = true;
  31. }__isset;
  32. ThriftException()
  33. {
  34. this->elements = ThriftElementsImpl<ThriftException>::get_elements_instance();
  35. this->descriptor = ThriftDescriptorImpl<ThriftException, TDT_STRUCT,
  36. void, void>::get_instance();
  37. }
  38. static void StaticElementsImpl(std::list<struct_element> *elements)
  39. {
  40. const ThriftException *st = (const ThriftException *)0;
  41. const char *base = (const char *)st;
  42. using subtype_1 = ThriftDescriptorImpl<std::string, TDT_STRING, void, void>;
  43. using subtype_2 = ThriftDescriptorImpl<int32_t, TDT_I32, void, void>;
  44. elements->push_back({subtype_1::get_instance(), "message",
  45. (const char *)(&st->__isset.message) - base,
  46. (const char *)(&st->message) - base, 1});
  47. elements->push_back({subtype_2::get_instance(), "type",
  48. (const char *)(&st->__isset.type) - base,
  49. (const char *)(&st->type) - base, 2});
  50. }
  51. };
  52. class ThriftMessage : public RPCMessage
  53. {
  54. public:
  55. ThriftMessage() : TBuffer_(&buf_) { }
  56. virtual ~ThriftMessage() { }
  57. //copy constructor
  58. ThriftMessage(const ThriftMessage&) = delete;
  59. //copy operator
  60. ThriftMessage& operator= (const ThriftMessage&) = delete;
  61. //move constructor
  62. ThriftMessage(ThriftMessage&&) = delete;
  63. //move operator
  64. ThriftMessage& operator= (ThriftMessage&&) = delete;
  65. public:
  66. int get_compress_type() const override { return RPCCompressNone; }
  67. int get_data_type() const override { return RPCDataThrift; }
  68. void set_compress_type(int type) override {}
  69. void set_data_type(int type) override {}
  70. void set_attachment_nocopy(const char *attachment, size_t len) { }
  71. bool get_attachment_nocopy(const char **attachment, size_t *len) const
  72. {
  73. return false;
  74. }
  75. public:
  76. int serialize(const ThriftIDLMessage *thrift_msg) override;
  77. int deserialize(ThriftIDLMessage *thrift_msg) override;
  78. int compress() override { return RPCStatusOK; }
  79. int decompress() override { return RPCStatusOK; }
  80. bool get_meta_module_data(RPCModuleData& data) const override { return false; }
  81. bool set_meta_module_data(const RPCModuleData& data) override { return false; }
  82. public:
  83. const ThriftMeta *get_meta() const { return &TBuffer_.meta; }
  84. ThriftMeta *get_meta() { return &TBuffer_.meta; }
  85. protected:
  86. int encode(struct iovec vectors[], int max, size_t size_limit);
  87. int append(const void *buf, size_t *size, size_t size_limit);
  88. RPCBuffer buf_;
  89. ThriftBuffer TBuffer_;
  90. };
  91. class ThriftRequest : public ThriftMessage
  92. {
  93. public:
  94. bool serialize_meta() { return TBuffer_.writeMessageBegin(); }
  95. bool deserialize_meta() { return TBuffer_.readMessageBegin(); }
  96. public:
  97. const std::string& get_service_name() const { return TBuffer_.meta.method_name; }
  98. const std::string& get_method_name() const { return TBuffer_.meta.method_name; }
  99. void set_service_name(const std::string& service_name) { }
  100. void set_method_name(const std::string& method_name)
  101. {
  102. TBuffer_.meta.method_name = method_name;
  103. }
  104. void set_seqid(long long seqid) { TBuffer_.meta.seqid = (int)seqid; }
  105. };
  106. class ThriftResponse : public ThriftMessage
  107. {
  108. public:
  109. bool serialize_meta();
  110. bool deserialize_meta();
  111. bool get_meta_module_data(RPCModuleData& data) const override { return false; }
  112. bool set_meta_module_data(const RPCModuleData& data) override { return false; }
  113. public:
  114. int get_status_code() const { return status_code_; }
  115. int get_error() const { return error_; }
  116. const char *get_errmsg() const;
  117. void set_status_code(int code)
  118. {
  119. status_code_ = code;
  120. }
  121. void set_error(int error) { error_ = error; }
  122. protected:
  123. int status_code_ = RPCStatusOK;
  124. int error_ = TET_UNKNOWN;
  125. std::string errmsg_;
  126. };
  127. class ThriftStdRequest : public protocol::ProtocolMessage, public RPCRequest,
  128. public ThriftRequest
  129. {
  130. public:
  131. int encode(struct iovec vectors[], int max) override
  132. {
  133. return this->ThriftRequest::encode(vectors, max, this->size_limit);
  134. }
  135. int append(const void *buf, size_t *size) override
  136. {
  137. return this->ThriftRequest::append(buf, size, this->size_limit);
  138. }
  139. public:
  140. bool serialize_meta() override
  141. {
  142. return this->ThriftRequest::serialize_meta();
  143. }
  144. bool deserialize_meta() override
  145. {
  146. return this->ThriftRequest::deserialize_meta();
  147. }
  148. public:
  149. const std::string& get_service_name() const override
  150. {
  151. return this->ThriftRequest::get_service_name();
  152. }
  153. const std::string& get_method_name() const override
  154. {
  155. return this->ThriftRequest::get_method_name();
  156. }
  157. void set_service_name(const std::string& service_name) override
  158. {
  159. return this->ThriftRequest::set_service_name(service_name);
  160. }
  161. void set_method_name(const std::string& method_name) override
  162. {
  163. return this->ThriftRequest::set_method_name(method_name);
  164. }
  165. void set_seqid(long long seqid) override
  166. {
  167. this->ThriftRequest::set_seqid(seqid);
  168. }
  169. bool set_meta_module_data(const RPCModuleData& data) override
  170. {
  171. return this->ThriftMessage::set_meta_module_data(data);
  172. }
  173. bool get_meta_module_data(RPCModuleData& data) const override
  174. {
  175. return this->ThriftMessage::get_meta_module_data(data);
  176. }
  177. public:
  178. ThriftStdRequest() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  179. };
  180. class ThriftStdResponse : public protocol::ProtocolMessage, public RPCResponse,
  181. public ThriftResponse
  182. {
  183. public:
  184. int encode(struct iovec vectors[], int max) override
  185. {
  186. return this->ThriftResponse::encode(vectors, max, this->size_limit);
  187. }
  188. int append(const void *buf, size_t *size) override
  189. {
  190. return this->ThriftResponse::append(buf, size, this->size_limit);
  191. }
  192. public:
  193. bool serialize_meta() override
  194. {
  195. return this->ThriftResponse::serialize_meta();
  196. }
  197. bool deserialize_meta() override
  198. {
  199. return this->ThriftResponse::deserialize_meta();
  200. }
  201. public:
  202. int get_status_code() const override
  203. {
  204. return this->ThriftResponse::get_status_code();
  205. }
  206. int get_error() const override
  207. {
  208. return this->ThriftResponse::get_error();
  209. }
  210. const char *get_errmsg() const override
  211. {
  212. return this->ThriftResponse::get_errmsg();
  213. }
  214. void set_status_code(int code) override
  215. {
  216. return this->ThriftResponse::set_status_code(code);
  217. }
  218. void set_error(int error) override
  219. {
  220. return this->ThriftResponse::set_error(error);
  221. }
  222. bool set_meta_module_data(const RPCModuleData& data) override
  223. {
  224. return this->ThriftMessage::set_meta_module_data(data);
  225. }
  226. bool get_meta_module_data(RPCModuleData& data) const override
  227. {
  228. return this->ThriftMessage::get_meta_module_data(data);
  229. }
  230. public:
  231. ThriftStdResponse() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  232. };
  233. class ThriftHttpRequest : public protocol::HttpRequest, public RPCRequest,
  234. public ThriftRequest
  235. {
  236. public:
  237. bool serialize_meta() override;
  238. bool deserialize_meta() override;
  239. public:
  240. const std::string& get_service_name() const override
  241. {
  242. return this->ThriftRequest::get_service_name();
  243. }
  244. const std::string& get_method_name() const override
  245. {
  246. return this->ThriftRequest::get_method_name();
  247. }
  248. void set_service_name(const std::string& service_name) override
  249. {
  250. return this->ThriftRequest::set_service_name(service_name);
  251. }
  252. void set_method_name(const std::string& method_name) override
  253. {
  254. return this->ThriftRequest::set_method_name(method_name);
  255. }
  256. void set_seqid(long long seqid) override
  257. {
  258. this->ThriftRequest::set_seqid(seqid);
  259. }
  260. bool set_meta_module_data(const RPCModuleData& data) override
  261. {
  262. return this->ThriftMessage::set_meta_module_data(data);
  263. }
  264. bool get_meta_module_data(RPCModuleData& data) const override
  265. {
  266. return this->ThriftMessage::get_meta_module_data(data);
  267. }
  268. bool set_http_header(const std::string& name,
  269. const std::string& value) override;
  270. bool add_http_header(const std::string& name,
  271. const std::string& value) override;
  272. bool get_http_header(const std::string& name,
  273. std::string& value) const override;
  274. public:
  275. ThriftHttpRequest() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  276. };
  277. class ThriftHttpResponse : public protocol::HttpResponse, public RPCResponse,
  278. public ThriftResponse
  279. {
  280. public:
  281. bool serialize_meta() override;
  282. bool deserialize_meta() override;
  283. public:
  284. int get_status_code() const override
  285. {
  286. return this->ThriftResponse::get_status_code();
  287. }
  288. int get_error() const override
  289. {
  290. return this->ThriftResponse::get_error();
  291. }
  292. const char *get_errmsg() const override
  293. {
  294. return this->ThriftResponse::get_errmsg();
  295. }
  296. void set_status_code(int code) override
  297. {
  298. return this->ThriftResponse::set_status_code(code);
  299. }
  300. void set_error(int error) override
  301. {
  302. return this->ThriftResponse::set_error(error);
  303. }
  304. bool set_http_code(int code) override
  305. {
  306. return this->protocol::HttpResponse::set_status_code(std::to_string(code));
  307. }
  308. bool set_meta_module_data(const RPCModuleData& data) override
  309. {
  310. return this->ThriftMessage::set_meta_module_data(data);
  311. }
  312. bool get_meta_module_data(RPCModuleData& data) const override
  313. {
  314. return this->ThriftMessage::get_meta_module_data(data);
  315. }
  316. bool set_http_header(const std::string& name,
  317. const std::string& value) override;
  318. bool add_http_header(const std::string& name,
  319. const std::string& value) override;
  320. bool get_http_header(const std::string& name,
  321. std::string& value) const override;
  322. public:
  323. ThriftHttpResponse() { this->size_limit = RPC_BODY_SIZE_LIMIT; }
  324. };
  // ---- Inline definitions of ThriftMessage member functions ----
  327. inline int ThriftMessage::serialize(const ThriftIDLMessage *thrift_msg)
  328. {
  329. if (thrift_msg)
  330. {
  331. if (!thrift_msg->descriptor->writer(thrift_msg, &TBuffer_))
  332. {
  333. return TBuffer_.meta.message_type == TMT_CALL ?
  334. RPCStatusReqSerializeError :
  335. RPCStatusRespSerializeError;
  336. }
  337. }
  338. return RPCStatusOK;
  339. }
  340. inline int ThriftMessage::deserialize(ThriftIDLMessage *thrift_msg)
  341. {
  342. if (thrift_msg)
  343. {
  344. if (!thrift_msg->descriptor->reader(&TBuffer_, thrift_msg))
  345. {
  346. return TBuffer_.meta.message_type == TMT_CALL ?
  347. RPCStatusReqDeserializeError :
  348. RPCStatusRespDeserializeError;
  349. }
  350. }
  351. return RPCStatusOK;
  352. }
  353. inline int ThriftMessage::encode(struct iovec vectors[], int max,
  354. size_t size_limit)
  355. {
  356. size_t sz = TBuffer_.meta.writebuf.size() + buf_.size();
  357. if (sz > 0x7FFFFFFF)
  358. {
  359. errno = EOVERFLOW;
  360. return -1;
  361. }
  362. if (sz > 0)
  363. {
  364. TBuffer_.framesize = ntohl((int32_t)sz);
  365. vectors[0].iov_base = (char *)&TBuffer_.framesize;
  366. vectors[0].iov_len = sizeof (int32_t);
  367. vectors[1].iov_base = const_cast<char *>(TBuffer_.meta.writebuf.c_str());
  368. vectors[1].iov_len = TBuffer_.meta.writebuf.size();
  369. int ret = buf_.encode(vectors + 2, max - 2);
  370. return ret < 0 ? ret : 2 + ret;
  371. }
  372. return 0;
  373. }
  374. } // namespace srpc
  375. #endif