rpc_message_srpc.cc 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026
  1. /*
  2. Copyright (c) 2020 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #include <errno.h>
  14. #include <vector>
  15. #include <string>
  16. #include <google/protobuf/util/json_util.h>
  17. #include <google/protobuf/util/type_resolver_util.h>
  18. #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
  19. #include <workflow/HttpUtil.h>
  20. #include <workflow/StringUtil.h>
  21. #include "rpc_basic.h"
  22. #include "rpc_compress.h"
  23. #include "rpc_meta.pb.h"
  24. #include "rpc_message_srpc.h"
  25. #include "rpc_zero_copy_stream.h"
  26. #include "rpc_module.h"
  27. #include "rpc_trace_module.h"
  28. namespace srpc
  29. {
  // Names of the HTTP headers used to carry SRPC meta information.
  // Every member is an immutable header-name string.
  struct SRPCHttpHeadersString
  {
      // Header naming the compression applied to the body.
      const std::string RPCCompressType{"Content-Encoding"};
      // Header carrying the body size before compression.
      const std::string OriginSize{"Origin-Size"};
      // Header carrying the size of the body as sent
      // (the member keeps its historical spelling).
      const std::string CompressdSize{"Content-Length"};
      // Header naming the serialization format of the body.
      const std::string DataType{"Content-Type"};
      // Header carrying the SRPC status code.
      const std::string SRPCStatus{"SRPC-Status"};
      // Header carrying the SRPC error number.
      const std::string SRPCError{"SRPC-Error"};
  };
  // Ordering functor that compares two strings while ignoring letter case.
  // Returns true when lhs sorts strictly before rhs.
  struct CaseCmp
  {
      bool operator()(const std::string& lhs, const std::string& rhs) const
      {
          const int order = strcasecmp(lhs.c_str(), rhs.c_str());
          return order < 0;
      }
  };
  46. static const struct SRPCHttpHeadersString SRPCHttpHeaders;
  47. static const std::map<const std::string, int, CaseCmp> SRPCHttpHeadersCode =
  48. {
  49. {SRPCHttpHeaders.RPCCompressType, 1},
  50. {SRPCHttpHeaders.OriginSize, 2},
  51. {SRPCHttpHeaders.CompressdSize, 3},
  52. {SRPCHttpHeaders.DataType, 4},
  53. {SRPCHttpHeaders.SRPCStatus, 5},
  54. {SRPCHttpHeaders.SRPCError, 6}
  55. };
  // Content-Type values; the position of each entry is the numeric data
  // type it stands for when the table is indexed or searched.
  static const std::vector<std::string> RPCDataTypeString =
  {
      "application/x-protobuf",   // index 0
      "application/x-thrift",     // index 1
      "application/json",         // index 2
  };

  // Content-Encoding values; the position of each entry is the numeric
  // compress type it stands for.
  static const std::vector<std::string> RPCRPCCompressTypeString =
  {
      "identity",                 // index 0
      "x-snappy",                 // index 1
      "gzip",                     // index 2
      "deflate",                  // index 3
      "x-lz4",                    // index 4
  };

  // Prefix placed in front of a message's full name to form its type URL.
  static constexpr const char *kTypePrefix{"type.googleapis.com"};
  71. class ResolverInstance
  72. {
  73. public:
  74. static google::protobuf::util::TypeResolver *get_resolver()
  75. {
  76. static ResolverInstance kInstance;
  77. return kInstance.resolver_;
  78. }
  79. private:
  80. ResolverInstance()
  81. {
  82. resolver_ = google::protobuf::util::NewTypeResolverForDescriptorPool(
  83. kTypePrefix, google::protobuf::DescriptorPool::generated_pool());
  84. }
  85. ~ResolverInstance() { delete resolver_; }
  86. google::protobuf::util::TypeResolver *resolver_;
  87. };
  88. static inline std::string GetTypeUrl(const ProtobufIDLMessage *pb_msg)
  89. {
  90. return std::string(kTypePrefix) + "/" + pb_msg->GetDescriptor()->full_name();
  91. }
  92. SRPCMessage::SRPCMessage()
  93. {
  94. this->nreceived = 0;
  95. this->meta_buf = NULL;
  96. this->meta_len = 0;
  97. this->message_len = 0;
  98. memset(this->header, 0, sizeof (this->header));
  99. this->meta = new RPCMeta();
  100. this->buf = new RPCBuffer();
  101. }
  102. int SRPCMessage::append(const void *buf, size_t *size, size_t size_limit)
  103. {
  104. uint32_t *p;
  105. size_t header_left, body_received, buf_len;
  106. if (this->nreceived < SRPC_HEADER_SIZE)
  107. {
  108. //receive header
  109. header_left = SRPC_HEADER_SIZE - this->nreceived;
  110. if (*size >= header_left)
  111. {
  112. //receive the whole header and ready to recieve body
  113. memcpy(this->header + this->nreceived, buf, header_left);
  114. this->nreceived += header_left;
  115. p = (uint32_t *)this->header + 1;
  116. this->meta_len = ntohl(*p);
  117. p = (uint32_t *)this->header + 2;
  118. this->message_len = ntohl(*p);
  119. buf_len = this->meta_len + this->message_len;
  120. if (buf_len >= size_limit)
  121. {
  122. errno = EMSGSIZE;
  123. return -1;
  124. }
  125. else if (buf_len > 0)
  126. {
  127. if (*size - header_left > buf_len)
  128. *size = header_left + buf_len;
  129. this->meta_buf = new char[this->meta_len];
  130. if (*size - header_left <= this->meta_len)
  131. {
  132. memcpy(this->meta_buf, (const char *)buf + header_left,
  133. *size - header_left);
  134. } else {
  135. memcpy(this->meta_buf, (const char *)buf + header_left,
  136. this->meta_len);
  137. this->buf->append((const char *)buf + header_left + this->meta_len,
  138. *size - header_left - this->meta_len,
  139. BUFFER_MODE_COPY);
  140. }
  141. this->nreceived += *size - header_left;
  142. if (this->nreceived == SRPC_HEADER_SIZE + buf_len)
  143. return 1;
  144. else
  145. return 0;
  146. }
  147. else if (*size == header_left)
  148. {
  149. return 1; // means body_size == 0 and finish recieved header
  150. }
  151. else
  152. {
  153. // means buf_len < 0
  154. errno = EBADMSG;
  155. return -1;
  156. }
  157. }
  158. else
  159. {
  160. // only receive header
  161. memcpy(this->header + this->nreceived, buf, *size);
  162. this->nreceived += *size;
  163. return 0;
  164. }
  165. }
  166. else
  167. {
  168. // have already received the header and now is for body only
  169. body_received = this->nreceived - SRPC_HEADER_SIZE;
  170. buf_len = this->meta_len + this->message_len;
  171. if (body_received + *size > buf_len)
  172. *size = buf_len - body_received;
  173. if (body_received + *size <= this->meta_len)
  174. {
  175. // 100 + 3 <= 106
  176. memcpy(this->meta_buf + body_received, buf, *size);
  177. } else if (body_received < this->meta_len) {
  178. // 100 + ? > 106, 100 < 106
  179. memcpy(this->meta_buf + body_received, buf,
  180. this->meta_len - body_received);
  181. if (body_received + *size > this->meta_len)// useless. always true
  182. // 100 + 10 > 106
  183. this->buf->append((const char *)buf + this->meta_len - body_received,
  184. *size - this->meta_len + body_received,
  185. BUFFER_MODE_COPY);
  186. } else {
  187. // 110 > 106
  188. this->buf->append((const char *)buf, *size, BUFFER_MODE_COPY);
  189. }
  190. this->nreceived += *size;
  191. return this->nreceived == SRPC_HEADER_SIZE + buf_len;
  192. }
  193. }
  194. int SRPCMessage::get_compress_type() const
  195. {
  196. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  197. return meta->compress_type();
  198. }
  199. int SRPCMessage::get_data_type() const
  200. {
  201. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  202. return meta->data_type();
  203. }
  204. void SRPCMessage::set_compress_type(int type)
  205. {
  206. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  207. meta->set_compress_type(type);
  208. }
  209. void SRPCMessage::set_data_type(int type)
  210. {
  211. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  212. meta->set_data_type(type);
  213. }
  214. void SRPCMessage::set_attachment_nocopy(const char *attachment, size_t len)
  215. {
  216. //TODO:
  217. }
  218. bool SRPCMessage::get_attachment_nocopy(const char **attachment, size_t *len) const
  219. {
  220. //TODO:
  221. return false;
  222. }
  223. bool SRPCMessage::set_meta_module_data(const RPCModuleData& data)
  224. {
  225. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  226. RPCMetaKeyValue *meta_kv;
  227. for (const auto& kv : data)
  228. {
  229. meta_kv = meta->add_trans_info();
  230. if (kv.first == SRPC_TRACE_ID)
  231. {
  232. meta_kv->set_key(SRPC_TRACE_ID);
  233. meta_kv->set_bytes_value(kv.second.c_str(), SRPC_TRACEID_SIZE);
  234. }
  235. else if (kv.first == SRPC_SPAN_ID)
  236. {
  237. meta_kv->set_key(SRPC_SPAN_ID);
  238. meta_kv->set_bytes_value(kv.second.c_str(), SRPC_SPANID_SIZE);
  239. }
  240. else
  241. {
  242. meta_kv->set_key(kv.first);
  243. meta_kv->set_bytes_value(kv.second);
  244. }
  245. }
  246. return true;
  247. }
  248. bool SRPCMessage::get_meta_module_data(RPCModuleData& data) const
  249. {
  250. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  251. RPCMetaKeyValue *meta_kv;
  252. for (int i = 0; i < meta->trans_info_size(); i++)
  253. {
  254. meta_kv = meta->mutable_trans_info(i);
  255. if (meta_kv->key() == SRPC_TRACE_ID)
  256. data[SRPC_TRACE_ID] = meta_kv->bytes_value();
  257. else if (meta_kv->key() == SRPC_SPAN_ID)
  258. data[SRPC_SPAN_ID] = meta_kv->bytes_value();
  259. else if (meta_kv->key() == SRPC_PARENT_SPAN_ID)
  260. data[SRPC_PARENT_SPAN_ID] = meta_kv->bytes_value();
  261. else
  262. data[meta_kv->key()] = meta_kv->bytes_value();
  263. }
  264. return true;
  265. }
  266. const std::string& SRPCRequest::get_service_name() const
  267. {
  268. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  269. return meta->mutable_request()->service_name();
  270. }
  271. const std::string& SRPCRequest::get_method_name() const
  272. {
  273. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  274. return meta->mutable_request()->method_name();
  275. }
  276. void SRPCRequest::set_service_name(const std::string& service_name)
  277. {
  278. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  279. meta->mutable_request()->set_service_name(service_name);
  280. }
  281. void SRPCRequest::set_method_name(const std::string& method_name)
  282. {
  283. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  284. meta->mutable_request()->set_method_name(method_name);
  285. }
  286. int SRPCResponse::get_status_code() const
  287. {
  288. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  289. return meta->mutable_response()->status_code();
  290. }
  291. void SRPCResponse::set_status_code(int code)
  292. {
  293. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  294. meta->mutable_response()->set_status_code(code);
  295. }
  296. int SRPCResponse::get_error() const
  297. {
  298. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  299. return meta->mutable_response()->error();
  300. }
  301. const char *SRPCResponse::get_errmsg() const
  302. {
  303. switch (this->get_status_code())
  304. {
  305. case RPCStatusOK:
  306. return "OK";
  307. case RPCStatusUndefined:
  308. return "Undefined Error";
  309. case RPCStatusServiceNotFound:
  310. return "Service Not Found";
  311. case RPCStatusMethodNotFound:
  312. return "Method Not Found";
  313. case RPCStatusMetaError:
  314. return "Meta Error";
  315. case RPCStatusReqCompressSizeInvalid:
  316. return "Request Compress-size Invalid";
  317. case RPCStatusReqDecompressSizeInvalid:
  318. return "Request Decompress-size Invalid";
  319. case RPCStatusReqCompressNotSupported:
  320. return "Request Compress Not Supported";
  321. case RPCStatusReqDecompressNotSupported:
  322. return "Request Decompress Not Supported";
  323. case RPCStatusReqCompressError:
  324. return "Request Compress Error";
  325. case RPCStatusReqDecompressError:
  326. return "Request Decompress Error";
  327. case RPCStatusReqSerializeError:
  328. return "Request Serialize Error";
  329. case RPCStatusReqDeserializeError:
  330. return "Request Deserialize Error";
  331. case RPCStatusRespCompressSizeInvalid:
  332. return "Response Compress-size Invalid";
  333. case RPCStatusRespDecompressSizeInvalid:
  334. return "Response Decompress-size Invalid";
  335. case RPCStatusRespCompressNotSupported:
  336. return "Response Compress Not Supported";
  337. case RPCStatusRespDecompressNotSupported:
  338. return "Response Decompress Not Supported";
  339. case RPCStatusRespCompressError:
  340. return "Response Compress Error";
  341. case RPCStatusRespDecompressError:
  342. return "Response Decompress Error";
  343. case RPCStatusRespSerializeError:
  344. return "Response Serialize Error";
  345. case RPCStatusRespDeserializeError:
  346. return "Response Deserialize Error";
  347. case RPCStatusIDLSerializeNotSupported:
  348. return "IDL Serialize Not Supported";
  349. case RPCStatusIDLDeserializeNotSupported:
  350. return "IDL Deserialize Not Supported";
  351. case RPCStatusURIInvalid:
  352. return "URI Invalid";
  353. case RPCStatusUpstreamFailed:
  354. return "Upstream Failed";
  355. case RPCStatusSystemError:
  356. return "System Error. Use get_error() to get errno";
  357. case RPCStatusSSLError:
  358. return "SSL Error. Use get_error() to get SSL-Error";
  359. case RPCStatusDNSError:
  360. return "DNS Error. Use get_error() to get GAI-Error";
  361. case RPCStatusProcessTerminated:
  362. return "Process Terminated";
  363. default:
  364. return "Unknown Error";
  365. }
  366. }
  367. void SRPCResponse::set_error(int error)
  368. {
  369. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  370. meta->mutable_response()->set_error(error);
  371. }
  372. int SRPCMessage::serialize(const ProtobufIDLMessage *pb_msg)
  373. {
  374. using namespace google::protobuf;
  375. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  376. bool is_resp = !meta->has_request();
  377. int data_type = meta->data_type();
  378. int ret;
  379. if (!pb_msg)
  380. return RPCStatusOK;
  381. RPCOutputStream output_stream(this->buf, pb_msg->ByteSizeLong());
  382. if (data_type == RPCDataProtobuf)
  383. {
  384. ret = pb_msg->SerializeToZeroCopyStream(&output_stream) ? 0 : -1;
  385. this->message_len = this->buf->size();
  386. }
  387. else if (data_type == RPCDataJson)
  388. {
  389. std::string binary_input = pb_msg->SerializeAsString();
  390. io::ArrayInputStream input_stream(binary_input.data(),
  391. (int)binary_input.size());
  392. const auto *pool = pb_msg->GetDescriptor()->file()->pool();
  393. auto *resolver = (pool == DescriptorPool::generated_pool() ?
  394. ResolverInstance::get_resolver() :
  395. util::NewTypeResolverForDescriptorPool(kTypePrefix, pool));
  396. util::JsonOptions options;
  397. options.add_whitespace = this->get_json_add_whitespace();
  398. options.always_print_enums_as_ints = this->get_json_enums_as_ints();
  399. options.preserve_proto_field_names = this->get_json_preserve_names();
  400. options.always_print_primitive_fields = this->get_json_print_primitive();
  401. ret = BinaryToJsonStream(resolver, GetTypeUrl(pb_msg), &input_stream,
  402. &output_stream, options).ok() ? 0 : -1;
  403. if (pool != DescriptorPool::generated_pool())
  404. delete resolver;
  405. this->message_len = this->buf->size();
  406. }
  407. else
  408. ret = -1;
  409. if (ret < 0)
  410. return is_resp ? RPCStatusRespSerializeError :
  411. RPCStatusReqSerializeError;
  412. return RPCStatusOK;
  413. }
  414. int SRPCMessage::deserialize(ProtobufIDLMessage *pb_msg)
  415. {
  416. using namespace google::protobuf;
  417. const RPCMeta *meta = static_cast<const RPCMeta *>(this->meta);
  418. bool is_resp = !meta->has_request();
  419. int data_type = meta->data_type();
  420. int ret;
  421. RPCInputStream input_stream(this->buf);
  422. if (data_type == RPCDataProtobuf)
  423. ret = pb_msg->ParseFromZeroCopyStream(&input_stream) ? 0 : -1;
  424. else if (data_type == RPCDataJson)
  425. {
  426. std::string binary_output;
  427. io::StringOutputStream output_stream(&binary_output);
  428. const auto *pool = pb_msg->GetDescriptor()->file()->pool();
  429. auto *resolver = (pool == DescriptorPool::generated_pool() ?
  430. ResolverInstance::get_resolver() :
  431. util::NewTypeResolverForDescriptorPool(kTypePrefix, pool));
  432. if (JsonToBinaryStream(resolver, GetTypeUrl(pb_msg),
  433. &input_stream, &output_stream).ok())
  434. {
  435. ret = pb_msg->ParseFromString(binary_output) ? 0 : -1;
  436. }
  437. else
  438. ret = -1;
  439. if (pool != DescriptorPool::generated_pool())
  440. delete resolver;
  441. }
  442. else
  443. ret = -1;
  444. if (ret < 0)
  445. return is_resp ? RPCStatusRespDeserializeError :
  446. RPCStatusReqDeserializeError;
  447. return RPCStatusOK;
  448. }
  449. int SRPCMessage::serialize(const ThriftIDLMessage *thrift_msg)
  450. {
  451. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  452. bool is_resp = !meta->has_request();
  453. int data_type = meta->data_type();
  454. int ret;
  455. if (!thrift_msg)
  456. return RPCStatusOK;
  457. ThriftBuffer thrift_buffer(this->buf);
  458. if (data_type == RPCDataThrift)
  459. ret = thrift_msg->descriptor->writer(thrift_msg, &thrift_buffer) ? 0 : -1;
  460. else if (data_type == RPCDataJson)
  461. ret = thrift_msg->descriptor->json_writer(thrift_msg, &thrift_buffer) ? 0 : -1;
  462. else
  463. ret = -1;
  464. if (ret < 0)
  465. return is_resp ? RPCStatusRespSerializeError :
  466. RPCStatusReqSerializeError;
  467. this->message_len = this->buf->size();
  468. return RPCStatusOK;
  469. }
  470. int SRPCMessage::deserialize(ThriftIDLMessage *thrift_msg)
  471. {
  472. const RPCMeta *meta = static_cast<const RPCMeta *>(this->meta);
  473. bool is_resp = !meta->has_request();
  474. int data_type = meta->data_type();
  475. int ret;
  476. if (this->buf->size() == 0 || this->message_len == 0)
  477. return is_resp ? RPCStatusRespDeserializeError
  478. : RPCStatusReqDeserializeError;
  479. ThriftBuffer thrift_buffer(this->buf);
  480. if (data_type == RPCDataThrift)
  481. ret = thrift_msg->descriptor->reader(&thrift_buffer, thrift_msg) ? 0 : 1;
  482. else if (data_type == RPCDataJson)
  483. ret = thrift_msg->descriptor->json_reader(&thrift_buffer, thrift_msg) ? 0 : 1;
  484. else
  485. ret = -1;
  486. if (ret < 0)
  487. return is_resp ? RPCStatusRespDeserializeError
  488. : RPCStatusReqDeserializeError;
  489. return RPCStatusOK;
  490. }
  491. int SRPCMessage::compress()
  492. {
  493. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  494. bool is_resp = !meta->has_request();
  495. int type = meta->compress_type();
  496. size_t buflen = this->message_len;
  497. int origin_size;
  498. int status_code = RPCStatusOK;
  499. if (buflen == 0)
  500. {
  501. if (type != RPCCompressNone)
  502. {
  503. meta->set_origin_size(0);
  504. meta->set_compressed_size(0);
  505. }
  506. return status_code;
  507. }
  508. if (type == RPCCompressNone)
  509. return status_code;
  510. if (buflen > 0x7FFFFFFF)
  511. {
  512. return is_resp ? RPCStatusRespCompressSizeInvalid
  513. : RPCStatusReqCompressSizeInvalid;
  514. }
  515. origin_size = (int)buflen;
  516. static RPCCompressor *compressor = RPCCompressor::get_instance();
  517. int ret = compressor->lease_compressed_size(type, buflen);
  518. if (ret == -2)
  519. {
  520. return is_resp ? RPCStatusReqCompressNotSupported
  521. : RPCStatusRespCompressNotSupported;
  522. }
  523. else if (ret <= 0)
  524. {
  525. return is_resp ? RPCStatusRespCompressSizeInvalid
  526. : RPCStatusReqCompressSizeInvalid;
  527. }
  528. buflen = ret;
  529. if (this->buf->size() != this->message_len)
  530. return is_resp ? RPCStatusRespCompressError : RPCStatusReqCompressError;
  531. RPCBuffer *dst_buf = new RPCBuffer();
  532. ret = compressor->serialize_to_compressed(this->buf, dst_buf, type);
  533. if (ret == -2)
  534. {
  535. status_code = is_resp ? RPCStatusRespCompressNotSupported
  536. : RPCStatusReqCompressNotSupported;
  537. }
  538. else if (ret == -1)
  539. {
  540. status_code = is_resp ? RPCStatusRespCompressError
  541. : RPCStatusReqCompressError;
  542. }
  543. else if (ret <= 0)
  544. {
  545. status_code = is_resp ? RPCStatusRespCompressSizeInvalid
  546. : RPCStatusReqCompressSizeInvalid;
  547. }
  548. else
  549. {
  550. meta->set_origin_size(origin_size);
  551. meta->set_compressed_size(ret);
  552. buflen = ret;
  553. }
  554. if (status_code == RPCStatusOK)
  555. {
  556. delete this->buf;
  557. this->buf = dst_buf;
  558. this->message_len = buflen;
  559. }
  560. else
  561. delete dst_buf;
  562. return status_code;
  563. }
  564. int SRPCMessage::decompress()
  565. {
  566. const RPCMeta *meta = static_cast<const RPCMeta *>(this->meta);
  567. bool is_resp = !meta->has_request();
  568. int type = meta->compress_type();
  569. int status_code = RPCStatusOK;
  570. if (this->message_len == 0 || type == RPCCompressNone)
  571. return status_code;
  572. if (meta->compressed_size() == 0)
  573. {
  574. return is_resp ? RPCStatusRespDecompressSizeInvalid
  575. : RPCStatusReqDecompressSizeInvalid;
  576. }
  577. if (this->buf->size() != (size_t)meta->compressed_size())
  578. return is_resp ? RPCStatusRespCompressError : RPCStatusReqCompressError;
  579. RPCBuffer *dst_buf = new RPCBuffer();
  580. static RPCCompressor *compressor = RPCCompressor::get_instance();
  581. int ret = compressor->parse_from_compressed(this->buf, dst_buf, type);
  582. if (ret == -2)
  583. {
  584. status_code = is_resp ? RPCStatusRespDecompressNotSupported
  585. : RPCStatusReqDecompressNotSupported;
  586. }
  587. else if (ret == -1)
  588. {
  589. status_code = is_resp ? RPCStatusRespDecompressError
  590. : RPCStatusReqDecompressError;
  591. }
  592. else if (ret <= 0 || (meta->has_origin_size() && ret != meta->origin_size()))
  593. {
  594. status_code = is_resp ? RPCStatusRespDecompressSizeInvalid
  595. : RPCStatusReqDecompressSizeInvalid;
  596. }
  597. if (status_code == RPCStatusOK)
  598. {
  599. delete this->buf;
  600. this->buf = dst_buf;
  601. this->message_len = ret;
  602. }
  603. else
  604. delete dst_buf;
  605. return status_code;
  606. }
  607. static bool __deserialize_meta_http(const char *request_uri,
  608. protocol::HttpMessage *http_msg,
  609. SRPCMessage *srpc_msg,
  610. ProtobufIDLMessage *pb_msg)
  611. {
  612. RPCMeta *meta = static_cast<RPCMeta *>(pb_msg);
  613. protocol::HttpHeaderCursor header_cursor(http_msg);
  614. std::string key, value;
  615. while (header_cursor.next(key, value))
  616. {
  617. const auto it = SRPCHttpHeadersCode.find(key);
  618. if (it != SRPCHttpHeadersCode.cend())
  619. {
  620. switch (it->second)
  621. {
  622. case 1:
  623. for (size_t i = 0; i < RPCRPCCompressTypeString.size(); i++)
  624. {
  625. if (strcasecmp(RPCRPCCompressTypeString[i].c_str(),
  626. value.c_str()) == 0)
  627. {
  628. meta->set_compress_type(i);
  629. break;
  630. }
  631. }
  632. break;
  633. case 2:
  634. meta->set_origin_size(atoi(value.c_str()));
  635. break;
  636. case 4:
  637. for (size_t i = 0; i < RPCDataTypeString.size(); i++)
  638. {
  639. if (strcasecmp(RPCDataTypeString[i].c_str(),
  640. value.c_str()) == 0)
  641. {
  642. meta->set_data_type(i);
  643. break;
  644. }
  645. }
  646. break;
  647. default:
  648. continue;
  649. }
  650. }
  651. }
  652. if (request_uri && request_uri[0] == '/')
  653. {
  654. std::string str = request_uri + 1;
  655. auto pos = str.find_first_of("?#");
  656. if (pos != std::string::npos)
  657. str.erase(pos);
  658. if (!str.empty() && str.back() == '/')
  659. str.pop_back();
  660. for (char& c : str)
  661. {
  662. if (c == '/')
  663. c = '.';
  664. }
  665. pos = str.find_last_of('.');
  666. if (pos != std::string::npos)
  667. {
  668. meta->mutable_request()->set_service_name(str.substr(0, pos));
  669. meta->mutable_request()->set_method_name(str.substr(pos + 1));
  670. }
  671. }
  672. const void *ptr;
  673. size_t len;
  674. http_msg->get_parsed_body(&ptr, &len);
  675. if (len > 0x7FFFFFFF)
  676. return false;
  677. protocol::HttpChunkCursor chunk_cursor(http_msg);
  678. RPCBuffer *buf = srpc_msg->get_buffer();
  679. size_t msg_len = 0;
  680. while (chunk_cursor.next(&ptr, &len))
  681. {
  682. buf->append((const char *)ptr, len, BUFFER_MODE_NOCOPY);
  683. msg_len += len;
  684. }
  685. srpc_msg->set_message_len(msg_len);
  686. if (!meta->has_data_type())
  687. meta->set_data_type(RPCDataJson);
  688. if (!meta->has_compress_type())
  689. meta->set_compress_type(RPCCompressNone);
  690. if (meta->compress_type() == RPCCompressNone)
  691. {
  692. if (msg_len == 0 && meta->data_type() == RPCDataJson)
  693. {
  694. buf->append("{}", 2, BUFFER_MODE_NOCOPY);
  695. srpc_msg->set_message_len(2);
  696. }
  697. }
  698. else
  699. meta->set_compressed_size(msg_len);
  700. return true;
  701. }
  702. bool SRPCHttpRequest::serialize_meta()
  703. {
  704. if (this->buf->size() > 0x7FFFFFFF)
  705. return false;
  706. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  707. int data_type = meta->data_type();
  708. int compress_type = meta->compress_type();
  709. set_http_version("HTTP/1.1");
  710. set_method("POST");
  711. set_request_uri("/" + meta->mutable_request()->service_name() +
  712. "/" + meta->mutable_request()->method_name());
  713. //set header
  714. set_header_pair(SRPCHttpHeaders.DataType,
  715. RPCDataTypeString[data_type]);
  716. set_header_pair(SRPCHttpHeaders.RPCCompressType,
  717. RPCRPCCompressTypeString[compress_type]);
  718. if (compress_type != RPCCompressNone)
  719. {
  720. set_header_pair(SRPCHttpHeaders.CompressdSize,
  721. std::to_string(meta->compressed_size()));
  722. set_header_pair(SRPCHttpHeaders.OriginSize,
  723. std::to_string(meta->origin_size()));
  724. } else {
  725. set_header_pair("Content-Length", std::to_string(this->message_len));
  726. }
  727. set_header_pair("Connection", "Keep-Alive");
  728. const void *buffer;
  729. size_t buflen;
  730. //append body
  731. while (buflen = this->buf->fetch(&buffer), buffer && buflen > 0)
  732. this->append_output_body_nocopy(buffer, buflen);
  733. return true;
  734. }
  735. bool SRPCHttpRequest::deserialize_meta()
  736. {
  737. const char *request_uri = this->get_request_uri();
  738. return __deserialize_meta_http(request_uri, this, this, this->meta);
  739. }
  740. bool SRPCHttpResponse::serialize_meta()
  741. {
  742. if (this->buf->size() > 0x7FFFFFFF)
  743. return false;
  744. RPCMeta *meta = static_cast<RPCMeta *>(this->meta);
  745. int data_type = meta->data_type();
  746. int compress_type = meta->compress_type();
  747. int rpc_status_code = this->get_status_code();
  748. const char *http_status_code = this->protocol::HttpResponse::get_status_code();
  749. set_http_version("HTTP/1.1");
  750. if (rpc_status_code == RPCStatusOK)
  751. {
  752. if (http_status_code)
  753. protocol::HttpUtil::set_response_status(this, atoi(http_status_code));
  754. else
  755. protocol::HttpUtil::set_response_status(this, HttpStatusOK);
  756. }
  757. else if (rpc_status_code == RPCStatusServiceNotFound
  758. || rpc_status_code == RPCStatusMethodNotFound
  759. || rpc_status_code == RPCStatusMetaError
  760. || rpc_status_code == RPCStatusURIInvalid)
  761. {
  762. protocol::HttpUtil::set_response_status(this, HttpStatusBadRequest);
  763. }
  764. else if (rpc_status_code == RPCStatusRespCompressNotSupported
  765. || rpc_status_code == RPCStatusRespDecompressNotSupported
  766. || rpc_status_code == RPCStatusIDLSerializeNotSupported
  767. || rpc_status_code == RPCStatusIDLDeserializeNotSupported)
  768. {
  769. protocol::HttpUtil::set_response_status(this, HttpStatusNotImplemented);
  770. }
  771. else if (rpc_status_code == RPCStatusUpstreamFailed)
  772. {
  773. protocol::HttpUtil::set_response_status(this,
  774. HttpStatusServiceUnavailable);
  775. }
  776. else
  777. {
  778. protocol::HttpUtil::set_response_status(this,
  779. HttpStatusInternalServerError);
  780. }
  781. //set header
  782. set_header_pair(SRPCHttpHeaders.SRPCStatus,
  783. std::to_string(meta->mutable_response()->status_code()));
  784. set_header_pair(SRPCHttpHeaders.SRPCError,
  785. std::to_string(meta->mutable_response()->error()));
  786. set_header_pair(SRPCHttpHeaders.DataType,
  787. RPCDataTypeString[data_type]);
  788. set_header_pair(SRPCHttpHeaders.RPCCompressType,
  789. RPCRPCCompressTypeString[compress_type]);
  790. if (compress_type != RPCCompressNone)
  791. {
  792. set_header_pair(SRPCHttpHeaders.CompressdSize,
  793. std::to_string(meta->compressed_size()));
  794. set_header_pair(SRPCHttpHeaders.OriginSize,
  795. std::to_string(meta->origin_size()));
  796. } else {
  797. set_header_pair("Content-Length", std::to_string(this->message_len));
  798. }
  799. set_header_pair("Connection", "Keep-Alive");
  800. const void *buffer;
  801. size_t buflen;
  802. //append body
  803. while (buflen = this->buf->fetch(&buffer), buffer && buflen > 0)
  804. this->append_output_body_nocopy(buffer, buflen);
  805. return true;
  806. }
  807. bool SRPCHttpResponse::deserialize_meta()
  808. {
  809. return __deserialize_meta_http(NULL, this, this, this->meta);
  810. }
  811. bool SRPCHttpRequest::set_meta_module_data(const RPCModuleData& data)
  812. {
  813. return http_set_header_module_data(data, this);
  814. }
  815. bool SRPCHttpRequest::get_meta_module_data(RPCModuleData& data) const
  816. {
  817. return http_get_header_module_data(this, data);
  818. }
  819. bool SRPCHttpResponse::set_meta_module_data(const RPCModuleData& data)
  820. {
  821. return http_set_header_module_data(data, this);
  822. }
  823. bool SRPCHttpResponse::get_meta_module_data(RPCModuleData& data) const
  824. {
  825. return http_get_header_module_data(this, data);
  826. }
  827. bool SRPCHttpRequest::set_http_header(const std::string& name,
  828. const std::string& value)
  829. {
  830. return this->protocol::HttpMessage::set_header_pair(name, value);
  831. }
  832. bool SRPCHttpRequest::add_http_header(const std::string& name,
  833. const std::string& value)
  834. {
  835. return this->protocol::HttpMessage::add_header_pair(name, value);
  836. }
  837. bool SRPCHttpRequest::get_http_header(const std::string& name,
  838. std::string& value) const
  839. {
  840. protocol::HttpHeaderCursor cursor(this);
  841. return cursor.find(name, value);
  842. }
  843. bool SRPCHttpResponse::set_http_header(const std::string& name,
  844. const std::string& value)
  845. {
  846. return this->protocol::HttpMessage::set_header_pair(name, value);
  847. }
  848. bool SRPCHttpResponse::add_http_header(const std::string& name,
  849. const std::string& value)
  850. {
  851. return this->protocol::HttpMessage::add_header_pair(name, value);
  852. }
  853. bool SRPCHttpResponse::get_http_header(const std::string& name,
  854. std::string& value) const
  855. {
  856. protocol::HttpHeaderCursor cursor(this);
  857. return cursor.find(name, value);
  858. }
  859. } // namespace srpc