rpc_message_trpc.cc 39 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569
  1. /*
  2. Copyright (c) 2021 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #include <errno.h>
  14. #include <vector>
  15. #include <string>
  16. #include <google/protobuf/util/json_util.h>
  17. #include <google/protobuf/util/type_resolver_util.h>
  18. #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
  19. #include <workflow/HttpUtil.h>
  20. #include <workflow/StringUtil.h>
  21. #include <workflow/json_parser.h>
  22. #include "rpc_message_trpc.h"
  23. #include "rpc_meta_trpc.pb.h"
  24. #include "rpc_basic.h"
  25. #include "rpc_compress.h"
  26. #include "rpc_zero_copy_stream.h"
  27. #include "rpc_module.h"
  28. namespace srpc
  29. {
  30. namespace TRPCHttpHeaders
  31. {
  32. const std::string CompressType = "Content-Encoding";
  33. const std::string DataType = "Content-Type";
  34. const std::string CallType = "trpc-call-type";
  35. const std::string RequestId = "trpc-request-id";
  36. const std::string Timeout = "trpc-timeout";
  37. const std::string Caller = "trpc-caller";
  38. const std::string Callee = "trpc-callee";
  39. const std::string Func = "trpc-func";
  40. const std::string Ret = "trpc-ret";
  41. const std::string FuncRet = "trpc-func-ret";
  42. const std::string ErrorMsg = "trpc-error-msg";
  43. const std::string MessageType = "trpc-message-type";
  44. const std::string TransInfo = "trpc-trans-info";
  45. const std::string SRPCStatus = "SRPC-Status";
  46. const std::string SRPCError = "SRPC-Error";
  47. }
  48. namespace TRPCHttpHeadersCode
  49. {
  50. enum
  51. {
  52. Unknown = 0,
  53. CompressType,
  54. DataType,
  55. CallType,
  56. RequestId,
  57. Timeout,
  58. Caller,
  59. Callee,
  60. Func,
  61. Ret,
  62. FuncRet,
  63. ErrorMsg,
  64. MessageType,
  65. TransInfo,
  66. SRPCStatus,
  67. SRPCError
  68. };
  69. }
  70. struct CaseCmp
  71. {
  72. bool operator()(const std::string& lhs, const std::string& rhs) const
  73. {
  74. return strcasecmp(lhs.c_str(), rhs.c_str()) < 0;
  75. }
  76. };
  77. static int GetHttpHeadersCode(const std::string& header)
  78. {
  79. static const std::map<std::string, int, CaseCmp> M =
  80. {
  81. {TRPCHttpHeaders::CompressType, TRPCHttpHeadersCode::CompressType},
  82. {TRPCHttpHeaders::DataType, TRPCHttpHeadersCode::DataType},
  83. {TRPCHttpHeaders::CallType, TRPCHttpHeadersCode::CallType},
  84. {TRPCHttpHeaders::RequestId, TRPCHttpHeadersCode::RequestId},
  85. {TRPCHttpHeaders::Timeout, TRPCHttpHeadersCode::Timeout},
  86. {TRPCHttpHeaders::Caller, TRPCHttpHeadersCode::Caller},
  87. {TRPCHttpHeaders::Callee, TRPCHttpHeadersCode::Callee},
  88. {TRPCHttpHeaders::Func, TRPCHttpHeadersCode::Func},
  89. {TRPCHttpHeaders::Ret, TRPCHttpHeadersCode::Ret},
  90. {TRPCHttpHeaders::FuncRet, TRPCHttpHeadersCode::FuncRet},
  91. {TRPCHttpHeaders::ErrorMsg, TRPCHttpHeadersCode::ErrorMsg},
  92. {TRPCHttpHeaders::MessageType, TRPCHttpHeadersCode::MessageType},
  93. {TRPCHttpHeaders::TransInfo, TRPCHttpHeadersCode::TransInfo},
  94. {TRPCHttpHeaders::SRPCStatus, TRPCHttpHeadersCode::SRPCStatus},
  95. {TRPCHttpHeaders::SRPCError, TRPCHttpHeadersCode::SRPCError}
  96. };
  97. auto it = M.find(header);
  98. return it == M.end() ? TRPCHttpHeadersCode::Unknown : it->second;
  99. }
  100. static int GetHttpDataType(const std::string &type)
  101. {
  102. static const std::unordered_map<std::string, int> M =
  103. {
  104. {"application/json", RPCDataJson},
  105. {"application/x-protobuf", RPCDataProtobuf},
  106. {"application/protobuf", RPCDataProtobuf},
  107. {"application/pb", RPCDataProtobuf},
  108. {"application/proto", RPCDataProtobuf}
  109. };
  110. auto it = M.find(type);
  111. return it == M.end() ? RPCDataUndefined : it->second;
  112. }
  113. static std::string GetHttpDataTypeStr(int type)
  114. {
  115. switch (type)
  116. {
  117. case RPCDataJson:
  118. return "application/json";
  119. case RPCDataProtobuf:
  120. return "application/proto";
  121. }
  122. return "";
  123. }
  124. static int GetHttpCompressType(const std::string &type)
  125. {
  126. static const std::unordered_map<std::string, int> M =
  127. {
  128. {"identity", RPCCompressNone},
  129. {"x-snappy", RPCCompressSnappy},
  130. {"gzip", RPCCompressGzip},
  131. {"deflate", RPCCompressZlib},
  132. {"x-lz4", RPCCompressLz4}
  133. };
  134. auto it = M.find(type);
  135. return it == M.end() ? RPCCompressNone : it->second;
  136. }
  137. static std::string GetHttpCompressTypeStr(int type)
  138. {
  139. switch (type)
  140. {
  141. case RPCCompressNone:
  142. return "identity";
  143. case RPCCompressSnappy:
  144. return "x-snappy";
  145. case RPCCompressGzip:
  146. return "gzip";
  147. case RPCCompressZlib:
  148. return "deflate";
  149. case RPCCompressLz4:
  150. return "x-lz4";
  151. }
  152. return "";
  153. }
  154. static int Base64Encode(const std::string& in, std::string& out)
  155. {
  156. size_t len = in.length();
  157. size_t base64_len = (len + 2) / 3 * 4;
  158. const unsigned char *f = (const unsigned char *)in.c_str();
  159. out.resize(base64_len + 1);
  160. EVP_EncodeBlock((unsigned char *)&out[0], f, len);
  161. out.pop_back();
  162. return 0;
  163. }
  164. static int Base64Decode(const char *in, size_t len, std::string& out)
  165. {
  166. size_t origin_len = (len + 3) / 4 * 3;
  167. const unsigned char *f = (const unsigned char *)in;
  168. int ret;
  169. int zeros = 0;
  170. out.resize(origin_len);
  171. ret = EVP_DecodeBlock((unsigned char *)&out[0], f, len);
  172. if (ret < 0)
  173. return ret;
  174. while (len != 0 && in[--len] == '=')
  175. zeros++;
  176. if (zeros > 2)
  177. return -1;
  178. out.resize(ret - zeros);
  179. return 0;
  180. }
  181. static std::string JsonEscape(const std::string& in)
  182. {
  183. std::string out;
  184. out.reserve(in.size());
  185. for (char c : in)
  186. {
  187. switch (c)
  188. {
  189. case '\"':
  190. out.append("\\\"");
  191. break;
  192. case '\\':
  193. out.append("\\\\");
  194. break;
  195. case '/':
  196. out.append("\\/");
  197. break;
  198. case '\b':
  199. out.append("\\b");
  200. break;
  201. case '\f':
  202. out.append("\\f");
  203. break;
  204. case '\n':
  205. out.append("\\n");
  206. break;
  207. case '\r':
  208. out.append("\\r");
  209. break;
  210. case '\t':
  211. out.append("\\t");
  212. break;
  213. default:
  214. out.push_back(c);
  215. break;
  216. }
  217. }
  218. return out;
  219. }
  220. using TransInfoMap = ::google::protobuf::Map<::std::string, ::std::string>;
  221. static int DecodeTransInfo(const std::string& content, TransInfoMap& map)
  222. {
  223. json_value_t *json;
  224. json_object_t *obj;
  225. const json_value_t *value;
  226. const char *name, *str;
  227. size_t len;
  228. std::string decoded;
  229. int errno_bak = errno;
  230. errno = EBADMSG;
  231. json = json_value_parse(content.c_str());
  232. if (json == nullptr)
  233. return -1;
  234. errno = errno_bak;
  235. if (json_value_type(json) != JSON_VALUE_OBJECT)
  236. {
  237. json_value_destroy(json);
  238. return -1;
  239. }
  240. obj = json_value_object(json);
  241. json_object_for_each(name, value, obj)
  242. {
  243. str = json_value_string(value);
  244. if (!name || !str)
  245. continue;
  246. len = strlen(str);
  247. if (Base64Decode(str, len, decoded) == 0)
  248. map[name].assign(std::move(decoded));
  249. else
  250. map[name].assign(str, len);
  251. }
  252. json_value_destroy(json);
  253. return 0;
  254. }
  255. static std::string EncodeTransInfo(const TransInfoMap& map)
  256. {
  257. std::string s;
  258. std::string encoded;
  259. s.append("{");
  260. for (const auto& kv : map)
  261. {
  262. Base64Encode(kv.second, encoded);
  263. s.append("\"").append(JsonEscape(kv.first)).append("\":");
  264. s.append("\"").append(encoded).append("\",");
  265. }
  266. if (s.back() == ',')
  267. s.back() = '}';
  268. else
  269. s.push_back('}');
  270. return s;
  271. }
  272. static constexpr const char *kTypePrefix = "type.googleapis.com";
  273. class ResolverInstance
  274. {
  275. public:
  276. static google::protobuf::util::TypeResolver *get_resolver()
  277. {
  278. static ResolverInstance kInstance;
  279. return kInstance.resolver_;
  280. }
  281. private:
  282. ResolverInstance()
  283. {
  284. resolver_ = google::protobuf::util::NewTypeResolverForDescriptorPool(
  285. kTypePrefix, google::protobuf::DescriptorPool::generated_pool());
  286. }
  287. ~ResolverInstance() { delete resolver_; }
  288. google::protobuf::util::TypeResolver *resolver_;
  289. };
  290. static inline std::string GetTypeUrl(const ProtobufIDLMessage *pb_msg)
  291. {
  292. return std::string(kTypePrefix) + "/" + pb_msg->GetDescriptor()->full_name();
  293. }
  294. TRPCMessage::TRPCMessage()
  295. {
  296. this->nreceived = 0;
  297. this->meta_buf = NULL;
  298. this->meta_len = 0;
  299. this->message_len = 0;
  300. memset(this->header, 0, TRPC_HEADER_SIZE);
  301. this->message = new RPCBuffer();
  302. }
  303. TRPCMessage::~TRPCMessage()
  304. {
  305. delete this->message;
  306. delete []this->meta_buf;
  307. delete this->meta;
  308. }
  309. TRPCRequest::TRPCRequest()
  310. {
  311. this->meta = new RequestProtocol();
  312. }
  313. TRPCResponse::TRPCResponse()
  314. {
  315. this->meta = new ResponseProtocol();
  316. }
  317. int TRPCMessage::encode(struct iovec vectors[], int max, size_t size_limit)
  318. {
  319. size_t sz = TRPC_HEADER_SIZE + this->meta_len + this->message_len;
  320. if (sz > 0x7FFFFFFF)
  321. {
  322. errno = EOVERFLOW;
  323. return -1;
  324. }
  325. int ret;
  326. char *p = this->header;
  327. *(uint16_t *)p = ntohs((uint16_t)TrpcMagic::TRPC_MAGIC_VALUE);
  328. p += 2;
  329. p += 1; // TrpcDataFrameType
  330. p += 1; // TrpcDataFrameState
  331. *(uint32_t *)(p) = htonl((uint32_t)sz);
  332. p += 4;
  333. *(uint16_t *)(p) = htons((uint16_t)this->meta_len);
  334. // 2: stream_id + 4 : reserved
  335. vectors[0].iov_base = this->header;
  336. vectors[0].iov_len = TRPC_HEADER_SIZE;
  337. vectors[1].iov_base = this->meta_buf;
  338. vectors[1].iov_len = this->meta_len;
  339. ret = this->message->encode(vectors + 2, max - 2);
  340. if (ret < 0)
  341. return ret;
  342. return 2 + ret;
  343. }
  344. int TRPCMessage::append(const void *buf, size_t *size, size_t size_limit)
  345. {
  346. uint32_t *p;
  347. uint16_t *sp;
  348. size_t header_left, body_received, buf_len;
  349. if (this->nreceived < TRPC_HEADER_SIZE)
  350. {
  351. //receive header
  352. header_left = TRPC_HEADER_SIZE - this->nreceived;
  353. if (*size >= header_left)
  354. {
  355. //receive the whole header and ready to recieve body
  356. memcpy(this->header + this->nreceived, buf, header_left);
  357. this->nreceived += header_left;
  358. sp = (uint16_t *)this->header;
  359. uint16_t magic_value = ntohs(*sp);
  360. if (magic_value != TrpcMagic::TRPC_MAGIC_VALUE ||
  361. this->header[2] || this->header[3])
  362. {
  363. errno = EBADMSG;
  364. return -1;
  365. }
  366. p = (uint32_t *)this->header + 1;
  367. buf_len = ntohl(*p);
  368. sp = (uint16_t *)this->header + 4;
  369. this->meta_len = ntohs(*sp);
  370. this->message_len = buf_len - TRPC_HEADER_SIZE - this->meta_len;
  371. buf_len -= TRPC_HEADER_SIZE;
  372. if (buf_len >= size_limit)
  373. {
  374. errno = EMSGSIZE;
  375. return -1;
  376. }
  377. else if (buf_len > 0)
  378. {
  379. if (*size - header_left > buf_len)
  380. *size = header_left + buf_len;
  381. this->meta_buf = new char[this->meta_len];
  382. if (*size - header_left <= this->meta_len)
  383. {
  384. memcpy(this->meta_buf, (const char *)buf + header_left,
  385. *size - header_left);
  386. } else {
  387. memcpy(this->meta_buf, (const char *)buf + header_left,
  388. this->meta_len);
  389. this->message->append((const char *)buf + header_left + this->meta_len,
  390. *size - header_left - this->meta_len,
  391. BUFFER_MODE_COPY);
  392. }
  393. this->nreceived += *size - header_left;
  394. if (this->nreceived == TRPC_HEADER_SIZE + buf_len)
  395. return 1;
  396. else
  397. return 0;
  398. }
  399. else if (*size == header_left)
  400. {
  401. return 1; // means body_size == 0 and finish recieved header
  402. }
  403. else
  404. {
  405. // means buf_len < 0
  406. errno = EBADMSG;
  407. return -1;
  408. }
  409. }
  410. else
  411. {
  412. // only receive header
  413. memcpy(this->header + this->nreceived, buf, *size);
  414. this->nreceived += *size;
  415. return 0;
  416. }
  417. }
  418. else
  419. {
  420. // have already received the header and now is for body only
  421. body_received = this->nreceived - TRPC_HEADER_SIZE;
  422. buf_len = this->meta_len + this->message_len;
  423. if (body_received + *size > buf_len)
  424. *size = buf_len - body_received;
  425. if (body_received + *size <= this->meta_len)
  426. {
  427. memcpy(this->meta_buf + body_received, buf, *size);
  428. } else if (body_received < this->meta_len) {
  429. memcpy(this->meta_buf + body_received, buf,
  430. this->meta_len - body_received);
  431. if (body_received + *size > this->meta_len)// useless. always true
  432. this->message->append((const char *)buf + this->meta_len - body_received,
  433. *size - this->meta_len + body_received,
  434. BUFFER_MODE_COPY);
  435. } else {
  436. this->message->append((const char *)buf, *size, BUFFER_MODE_COPY);
  437. }
  438. this->nreceived += *size;
  439. return this->nreceived == TRPC_HEADER_SIZE + buf_len;
  440. }
  441. }
  442. bool TRPCRequest::deserialize_meta()
  443. {
  444. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  445. if (!meta->ParseFromArray(this->meta_buf, (int)this->meta_len))
  446. return false;
  447. if (meta->version() != TrpcProtoVersion::TRPC_PROTO_V1 ||
  448. meta->call_type() != TrpcCallType::TRPC_UNARY_CALL ||
  449. meta->content_type() != TrpcContentEncodeType::TRPC_PROTO_ENCODE)
  450. {
  451. // this->trpc_error = ST_ERR_UNSUPPORTED_PROTO_TYPE;
  452. return false;
  453. }
  454. // this->timeout = meta->timeout();
  455. return true;
  456. }
  457. bool TRPCResponse::deserialize_meta()
  458. {
  459. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  460. if (!meta->ParseFromArray(this->meta_buf, (int)this->meta_len))
  461. return false;
  462. this->srpc_status_code = this->status_code_trpc_srpc(meta->ret());
  463. if (!meta->error_msg().empty())
  464. this->srpc_error_msg = meta->error_msg().data();
  465. return true;
  466. }
  467. bool TRPCRequest::serialize_meta()
  468. {
  469. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  470. meta->set_content_type(TrpcContentEncodeType::TRPC_PROTO_ENCODE);
  471. meta->set_version(TrpcProtoVersion::TRPC_PROTO_V1);
  472. meta->set_call_type(TrpcCallType::TRPC_UNARY_CALL);
  473. this->meta_len = meta->ByteSizeLong();
  474. this->meta_buf = new char[this->meta_len];
  475. return this->meta->SerializeToArray(this->meta_buf, (int)this->meta_len);
  476. }
  477. bool TRPCResponse::serialize_meta()
  478. {
  479. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  480. meta->set_version(TrpcProtoVersion::TRPC_PROTO_V1);
  481. meta->set_call_type(TrpcCallType::TRPC_UNARY_CALL);
  482. meta->set_ret(this->status_code_srpc_trpc(this->srpc_status_code));
  483. // meta->set_error_msg(this->error_msg_srpc_trpc(this->srpc_status_code));
  484. meta->set_error_msg(this->srpc_error_msg);
  485. this->meta_len = meta->ByteSizeLong();
  486. this->meta_buf = new char[this->meta_len];
  487. return meta->SerializeToArray(this->meta_buf, (int)this->meta_len);
  488. }
  489. inline int TRPCMessage::compress_type_trpc_srpc(int trpc_content_encoding) const
  490. {
  491. switch (trpc_content_encoding)
  492. {
  493. case TrpcCompressType::TRPC_DEFAULT_COMPRESS :
  494. return RPCCompressNone;
  495. case TrpcCompressType::TRPC_GZIP_COMPRESS :
  496. return RPCCompressGzip;
  497. case TrpcCompressType::TRPC_SNAPPY_COMPRESS :
  498. return RPCCompressSnappy;
  499. default :
  500. return -1;
  501. }
  502. }
  503. inline int TRPCMessage::compress_type_srpc_trpc(int srpc_compress_type) const
  504. {
  505. switch (srpc_compress_type)
  506. {
  507. case RPCCompressNone :
  508. return TrpcCompressType::TRPC_DEFAULT_COMPRESS;
  509. case RPCCompressGzip :
  510. return TrpcCompressType::TRPC_GZIP_COMPRESS;
  511. case RPCCompressSnappy :
  512. return TrpcCompressType::TRPC_SNAPPY_COMPRESS;
  513. case RPCCompressZlib :
  514. return TrpcCompressType::TRPC_ZLIB_COMPRESS;
  515. case RPCCompressLz4 :
  516. return TrpcCompressType::TRPC_LZ4_COMPRESS;
  517. default :
  518. return -1;
  519. }
  520. }
  521. inline int TRPCMessage::data_type_trpc_srpc(int trpc_content_type) const
  522. {
  523. switch (trpc_content_type)
  524. {
  525. case TrpcContentEncodeType::TRPC_PROTO_ENCODE :
  526. return RPCDataProtobuf;
  527. case TrpcContentEncodeType::TRPC_JSON_ENCODE :
  528. return RPCDataJson;
  529. default :
  530. return -1;
  531. }
  532. }
  533. inline int TRPCMessage::data_type_srpc_trpc(int srpc_data_type) const
  534. {
  535. switch (srpc_data_type)
  536. {
  537. case RPCDataProtobuf :
  538. return TrpcContentEncodeType::TRPC_PROTO_ENCODE;
  539. case RPCDataJson :
  540. return TrpcContentEncodeType::TRPC_JSON_ENCODE;
  541. default :
  542. return -1;
  543. }
  544. }
  545. inline int TRPCMessage::status_code_srpc_trpc(int srpc_status_code) const
  546. {
  547. switch (srpc_status_code)
  548. {
  549. case RPCStatusOK:
  550. return TrpcRetCode::TRPC_INVOKE_SUCCESS;
  551. case RPCStatusServiceNotFound:
  552. return TrpcRetCode::TRPC_SERVER_NOSERVICE_ERR;
  553. case RPCStatusMethodNotFound:
  554. return TrpcRetCode::TRPC_SERVER_NOFUNC_ERR;
  555. case RPCStatusURIInvalid:
  556. case RPCStatusMetaError:
  557. case RPCStatusReqCompressSizeInvalid:
  558. case RPCStatusReqCompressNotSupported:
  559. case RPCStatusReqCompressError:
  560. case RPCStatusReqSerializeError:
  561. return TrpcRetCode::TRPC_CLIENT_ENCODE_ERR;
  562. case RPCStatusReqDecompressSizeInvalid:
  563. case RPCStatusReqDecompressNotSupported:
  564. case RPCStatusReqDecompressError:
  565. case RPCStatusReqDeserializeError:
  566. return TrpcRetCode::TRPC_SERVER_DECODE_ERR;
  567. case RPCStatusRespCompressSizeInvalid:
  568. case RPCStatusRespCompressNotSupported:
  569. case RPCStatusRespCompressError:
  570. case RPCStatusRespSerializeError:
  571. return TrpcRetCode::TRPC_SERVER_ENCODE_ERR;
  572. case RPCStatusRespDecompressSizeInvalid:
  573. case RPCStatusRespDecompressNotSupported:
  574. case RPCStatusRespDecompressError:
  575. case RPCStatusRespDeserializeError:
  576. return TrpcRetCode::TRPC_CLIENT_DECODE_ERR;
  577. case RPCStatusUpstreamFailed:
  578. case RPCStatusDNSError:
  579. return TrpcRetCode::TRPC_CLIENT_ROUTER_ERR;
  580. case RPCStatusSystemError:
  581. return TrpcRetCode::TRPC_SERVER_SYSTEM_ERR;
  582. // return TrpcRetCode::TRPC_CLINET_NETWORK_ERR;
  583. default:
  584. return TrpcRetCode::TRPC_INVOKE_UNKNOWN_ERR;
  585. }
  586. }
  587. inline int TRPCMessage::status_code_trpc_srpc(int trpc_ret_code) const
  588. {
  589. switch (trpc_ret_code)
  590. {
  591. case TrpcRetCode::TRPC_INVOKE_SUCCESS:
  592. return RPCStatusOK;
  593. case TrpcRetCode::TRPC_SERVER_NOSERVICE_ERR:
  594. return RPCStatusServiceNotFound;
  595. case TrpcRetCode::TRPC_SERVER_NOFUNC_ERR:
  596. return RPCStatusMethodNotFound;
  597. case TrpcRetCode::TRPC_CLIENT_ENCODE_ERR:
  598. return RPCStatusReqSerializeError;
  599. case TrpcRetCode::TRPC_SERVER_DECODE_ERR:
  600. return RPCStatusReqDeserializeError;
  601. case TrpcRetCode::TRPC_SERVER_ENCODE_ERR:
  602. return RPCStatusRespSerializeError;
  603. case TrpcRetCode::TRPC_CLIENT_DECODE_ERR:
  604. return RPCStatusRespDeserializeError;
  605. case TrpcRetCode::TRPC_CLIENT_ROUTER_ERR:
  606. return RPCStatusUpstreamFailed;
  607. // return RPCStatusDNSError;
  608. default:
  609. return RPCStatusSystemError;
  610. }
  611. }
  612. const char *TRPCMessage::error_msg_srpc_trpc(int srpc_status_code) const
  613. {
  614. return "";//TODO
  615. }
  616. int TRPCRequest::get_compress_type() const
  617. {
  618. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  619. return this->compress_type_trpc_srpc(meta->content_encoding());
  620. }
  621. void TRPCRequest::set_compress_type(int type)
  622. {
  623. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  624. meta->set_content_encoding(this->compress_type_srpc_trpc(type));
  625. }
  626. int TRPCRequest::get_data_type() const
  627. {
  628. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  629. return this->data_type_trpc_srpc(meta->content_type());
  630. }
  631. void TRPCRequest::set_data_type(int type)
  632. {
  633. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  634. meta->set_content_type(this->data_type_srpc_trpc(type));
  635. }
  636. void TRPCRequest::set_request_id(int32_t req_id)
  637. {
  638. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  639. meta->set_request_id(req_id);
  640. }
  641. int32_t TRPCRequest::get_request_id() const
  642. {
  643. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  644. return meta->request_id();
  645. }
  646. const std::string& TRPCRequest::get_service_name() const
  647. {
  648. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  649. return meta->callee();
  650. }
  651. const std::string& TRPCRequest::get_method_name() const
  652. {
  653. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  654. return meta->func();
  655. }
  656. // like p.service_name
  657. void TRPCRequest::set_service_name(const std::string& service_name)
  658. {
  659. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  660. meta->set_callee(service_name); // useless in transfer protocol
  661. }
  662. // like /p.service_name/method_name
  663. void TRPCRequest::set_method_name(const std::string& method_name)
  664. {
  665. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  666. meta->set_func(method_name); // use this prefix as service_name for route
  667. }
  668. void TRPCRequest::set_callee_timeout(int timeout)
  669. {
  670. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  671. meta->set_timeout(timeout);
  672. }
  673. void TRPCRequest::set_caller_name(const std::string& caller_name)
  674. {
  675. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  676. meta->set_caller(caller_name);
  677. }
  678. const std::string& TRPCRequest::get_caller_name() const
  679. {
  680. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  681. return meta->caller();
  682. }
  683. int TRPCResponse::get_compress_type() const
  684. {
  685. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  686. return this->compress_type_trpc_srpc(meta->content_encoding());
  687. }
  688. int TRPCResponse::get_data_type() const
  689. {
  690. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  691. return this->data_type_trpc_srpc(meta->content_type());
  692. }
  693. void TRPCResponse::set_compress_type(int type)
  694. {
  695. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  696. meta->set_content_encoding(this->compress_type_srpc_trpc(type));
  697. }
  698. void TRPCResponse::set_data_type(int type)
  699. {
  700. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  701. meta->set_content_type(this->data_type_srpc_trpc(type));
  702. }
  703. void TRPCResponse::set_request_id(int32_t req_id)
  704. {
  705. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  706. meta->set_request_id(req_id);
  707. }
  708. int32_t TRPCResponse::get_request_id() const
  709. {
  710. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  711. return meta->request_id();
  712. }
  713. int TRPCResponse::get_status_code() const
  714. {
  715. return this->srpc_status_code;
  716. }
  717. void TRPCResponse::set_status_code(int code)
  718. {
  719. this->srpc_status_code = code;
  720. if (code != RPCStatusOK)
  721. this->srpc_error_msg = this->get_errmsg();
  722. }
  723. int TRPCResponse::get_error() const
  724. {
  725. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  726. return meta->ret();//TODO
  727. }
  728. void TRPCResponse::set_error(int error)
  729. {
  730. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  731. meta->set_ret(error);
  732. }
  733. const char *TRPCResponse::get_errmsg() const
  734. {
  735. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  736. return meta->error_msg().c_str();
  737. }
  738. int TRPCMessage::serialize(const ProtobufIDLMessage *pb_msg)
  739. {
  740. if (!pb_msg) //TODO: make sure trpc is OK to send a NULL user pb_msg
  741. return RPCStatusOK;
  742. using namespace google::protobuf;
  743. ResponseProtocol *meta = dynamic_cast<ResponseProtocol *>(this->meta);
  744. bool is_resp = (meta != NULL);
  745. int data_type = this->get_data_type();
  746. RPCOutputStream output_stream(this->message, pb_msg->ByteSizeLong());
  747. int ret;
  748. if (data_type == RPCDataProtobuf)
  749. ret = pb_msg->SerializeToZeroCopyStream(&output_stream) ? 0 : -1;
  750. else if (data_type == RPCDataJson)
  751. {
  752. std::string binary_input = pb_msg->SerializeAsString();
  753. io::ArrayInputStream input_stream(binary_input.data(),
  754. (int)binary_input.size());
  755. const auto *pool = pb_msg->GetDescriptor()->file()->pool();
  756. auto *resolver = (pool == DescriptorPool::generated_pool() ?
  757. ResolverInstance::get_resolver() :
  758. util::NewTypeResolverForDescriptorPool(kTypePrefix, pool));
  759. util::JsonOptions options;
  760. options.add_whitespace = this->get_json_add_whitespace();
  761. options.always_print_enums_as_ints = this->get_json_enums_as_ints();
  762. options.preserve_proto_field_names = this->get_json_preserve_names();
  763. options.always_print_primitive_fields = this->get_json_print_primitive();
  764. ret = BinaryToJsonStream(resolver, GetTypeUrl(pb_msg), &input_stream,
  765. &output_stream, options).ok() ? 0 : -1;
  766. if (pool != DescriptorPool::generated_pool())
  767. delete resolver;
  768. }
  769. else
  770. ret = -1;
  771. this->message_len = this->message->size();
  772. if (ret < 0)
  773. return is_resp ? RPCStatusRespSerializeError :
  774. RPCStatusReqSerializeError;
  775. return RPCStatusOK;
  776. }
  777. int TRPCMessage::deserialize(ProtobufIDLMessage *pb_msg)
  778. {
  779. using namespace google::protobuf;
  780. ResponseProtocol *meta = dynamic_cast<ResponseProtocol *>(this->meta);
  781. bool is_resp = (meta != NULL);
  782. int data_type = this->get_data_type();
  783. int ret;
  784. RPCInputStream input_stream(this->message);
  785. if (data_type == RPCDataProtobuf)
  786. ret = pb_msg->ParseFromZeroCopyStream(&input_stream) ? 0 : -1;
  787. else if (data_type == RPCDataJson)
  788. {
  789. std::string binary_output;
  790. io::StringOutputStream output_stream(&binary_output);
  791. const auto *pool = pb_msg->GetDescriptor()->file()->pool();
  792. auto *resolver = (pool == DescriptorPool::generated_pool() ?
  793. ResolverInstance::get_resolver() :
  794. util::NewTypeResolverForDescriptorPool(kTypePrefix, pool));
  795. if (JsonToBinaryStream(resolver, GetTypeUrl(pb_msg),
  796. &input_stream, &output_stream).ok())
  797. {
  798. ret = pb_msg->ParseFromString(binary_output) ? 0 : -1;
  799. }
  800. else
  801. ret = -1;
  802. if (pool != DescriptorPool::generated_pool())
  803. delete resolver;
  804. }
  805. else
  806. ret = -1;
  807. if (ret < 0)
  808. return is_resp ? RPCStatusRespDeserializeError :
  809. RPCStatusReqDeserializeError;
  810. return RPCStatusOK;
  811. }
  812. int TRPCMessage::compress()
  813. {
  814. ResponseProtocol *meta = dynamic_cast<ResponseProtocol *>(this->meta);
  815. bool is_resp = (meta != NULL);
  816. int type = this->get_compress_type();
  817. size_t buflen = this->message_len;
  818. int status_code = RPCStatusOK;
  819. if (buflen == 0)
  820. return status_code;
  821. if (type == RPCCompressNone)
  822. return status_code;
  823. static RPCCompressor *compressor = RPCCompressor::get_instance();
  824. int ret = compressor->lease_compressed_size(type, buflen);
  825. if (ret == -2)
  826. return is_resp ? RPCStatusReqCompressNotSupported :
  827. RPCStatusRespCompressNotSupported;
  828. else if (ret <= 0)
  829. return is_resp ? RPCStatusRespCompressSizeInvalid :
  830. RPCStatusReqCompressSizeInvalid;
  831. //buflen = ret;
  832. RPCBuffer *dst_buf = new RPCBuffer();
  833. ret = compressor->serialize_to_compressed(this->message, dst_buf, type);
  834. if (ret == -2)
  835. {
  836. status_code = is_resp ? RPCStatusRespCompressNotSupported :
  837. RPCStatusReqCompressNotSupported;
  838. }
  839. else if (ret == -1)
  840. {
  841. status_code = is_resp ? RPCStatusRespCompressError :
  842. RPCStatusReqCompressError;
  843. }
  844. else if (ret <= 0)
  845. {
  846. status_code = is_resp ? RPCStatusRespCompressSizeInvalid :
  847. RPCStatusReqCompressSizeInvalid;
  848. }
  849. else
  850. buflen = ret;
  851. if (status_code == RPCStatusOK)
  852. {
  853. delete this->message;
  854. this->message = dst_buf;
  855. this->message_len = buflen;
  856. }
  857. else
  858. delete dst_buf;
  859. return status_code;
  860. }
  861. int TRPCMessage::decompress()
  862. {
  863. ResponseProtocol *meta = dynamic_cast<ResponseProtocol *>(this->meta);
  864. bool is_resp = (meta != NULL);
  865. int type = this->get_compress_type();
  866. int status_code = RPCStatusOK;
  867. if (this->message_len == 0 || type == RPCCompressNone)
  868. return status_code;
  869. RPCBuffer *dst_buf = new RPCBuffer();
  870. static RPCCompressor *compressor = RPCCompressor::get_instance();
  871. int ret = compressor->parse_from_compressed(this->message, dst_buf, type);
  872. if (ret == -2)
  873. {
  874. status_code = is_resp ? RPCStatusRespDecompressNotSupported :
  875. RPCStatusReqDecompressNotSupported;
  876. }
  877. else if (ret == -1)
  878. {
  879. status_code = is_resp ? RPCStatusRespDecompressError :
  880. RPCStatusReqDecompressError;
  881. }
  882. else if (ret <= 0)
  883. {
  884. status_code = is_resp ? RPCStatusRespDecompressSizeInvalid :
  885. RPCStatusReqDecompressSizeInvalid;
  886. }
  887. if (status_code == RPCStatusOK)
  888. {
  889. delete this->message;
  890. this->message = dst_buf;
  891. this->message_len = ret;
  892. }
  893. else
  894. delete dst_buf;
  895. return status_code;
  896. }
  897. static std::string set_trace_parent(std::string& trace, std::string& span,
  898. const RPCModuleData& data)
  899. {
  900. std::string str = "00-"; // set version
  901. char trace_id_buf[SRPC_TRACEID_SIZE * 2 + 1];
  902. TRACE_ID_BIN_TO_HEX((uint64_t *)trace.data(), trace_id_buf);
  903. str.append(trace_id_buf);
  904. str.append("-");
  905. char span_id_buf[SRPC_SPANID_SIZE * 2 + 1];
  906. SPAN_ID_BIN_TO_HEX((uint64_t *)span.data(), span_id_buf);
  907. str.append(span_id_buf);
  908. str.append("-");
  909. str.append("01"); // set traceflag : sampled
  910. return str;
  911. }
  912. bool TRPCRequest::set_meta_module_data(const RPCModuleData& data)
  913. {
  914. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  915. std::string trace;
  916. std::string span;
  917. int flag = 0;
  918. for (const auto & pair : data)
  919. {
  920. if (pair.first.compare(SRPC_TRACE_ID) == 0)
  921. {
  922. trace = pair.second;
  923. flag |= 1;
  924. }
  925. else if (pair.first.compare(SRPC_SPAN_ID) == 0)
  926. {
  927. span = pair.second;
  928. flag |= (1 << 1);
  929. }
  930. else
  931. meta->mutable_trans_info()->insert({pair.first, pair.second});
  932. }
  933. if (flag == 3)
  934. meta->mutable_trans_info()->insert({OTLP_TRACE_PARENT,
  935. set_trace_parent(trace, span, data)});
  936. return true;
  937. }
  938. static bool get_trace_parent(const std::string& str, RPCModuleData& data)
  939. {
  940. // only support version = 00 : "00-" trace-id "-" parent-id "-" trace-flags
  941. size_t begin = OTLP_TRACE_VERSION_SIZE;
  942. if (str.length() < 55 || str.substr(0, begin).compare("00") != 0)
  943. return false;
  944. // data[OTLP_TRACE_VERSION] = "00";
  945. uint64_t trace[2];
  946. begin += 1;
  947. TRACE_ID_HEX_TO_BIN(str.substr(begin, SRPC_TRACEID_SIZE * 2).data(), trace);
  948. data[SRPC_TRACE_ID] = std::string((char *)trace, SRPC_TRACEID_SIZE);
  949. uint64_t span[1];
  950. begin += SRPC_TRACEID_SIZE * 2 + 1;
  951. SPAN_ID_HEX_TO_BIN(str.substr(begin, SRPC_SPANID_SIZE * 2).data(), span);
  952. data[SRPC_SPAN_ID] = std::string((char *)span, SRPC_SPANID_SIZE);
  953. // begin += SRPC_SPANID_SIZE + 1;
  954. // data[OTLP_TRACE_FLAG] = str.substr(begin);
  955. return true;
  956. }
  957. bool TRPCRequest::get_meta_module_data(RPCModuleData& data) const
  958. {
  959. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  960. for (const auto & pair : meta->trans_info())
  961. {
  962. if (pair.first.compare(OTLP_TRACE_PARENT) == 0)
  963. get_trace_parent(pair.second, data);
  964. else if (pair.first.compare(OTLP_TRACE_STATE) == 0)
  965. ;// TODO: support tracestate
  966. else
  967. data.insert(pair);
  968. }
  969. return true;
  970. }
  971. bool TRPCRequest::trim_method_prefix()
  972. {
  973. RequestProtocol *meta = static_cast<RequestProtocol *>(this->meta);
  974. std::string *func = meta->mutable_func();
  975. if ((*func)[0] == '/')
  976. {
  977. size_t pos = func->find('/', 1);
  978. if (pos != std::string::npos)
  979. {
  980. // callee is not guaranteed being set as service_name in transfer protocol
  981. // so server need to get from func and set on callee for local use
  982. meta->set_callee(func->substr(1, pos - 1));
  983. meta->set_func(func->substr(pos + 1, func->length()));
  984. return true;
  985. }
  986. }
  987. meta->set_callee(""); // make it fail in find_service()
  988. return false;
  989. }
  990. bool TRPCResponse::set_meta_module_data(const RPCModuleData& data)
  991. {
  992. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  993. for (const auto & pair : data)
  994. meta->mutable_trans_info()->insert({pair.first, pair.second});
  995. return true;
  996. }
  997. bool TRPCResponse::get_meta_module_data(RPCModuleData& data) const
  998. {
  999. ResponseProtocol *meta = static_cast<ResponseProtocol *>(this->meta);
  1000. for (const auto & pair : meta->trans_info())
  1001. data.insert(pair);
  1002. return true;
  1003. }
  1004. bool TRPCHttpRequest::serialize_meta()
  1005. {
  1006. if (this->message->size() > 0x7FFFFFFF)
  1007. return false;
  1008. int data_type = this->get_data_type();
  1009. int compress_type = this->get_compress_type();
  1010. std::string uri("/");
  1011. uri += this->get_service_name();
  1012. uri += "/";
  1013. uri += this->get_method_name();
  1014. set_http_version("HTTP/1.1");
  1015. set_method("POST");
  1016. set_request_uri(uri);
  1017. //set header
  1018. set_header_pair(TRPCHttpHeaders::DataType,
  1019. GetHttpDataTypeStr(data_type));
  1020. set_header_pair(TRPCHttpHeaders::CompressType,
  1021. GetHttpCompressTypeStr(compress_type));
  1022. set_header_pair("Connection", "Keep-Alive");
  1023. set_header_pair("Content-Length", std::to_string(this->message_len));
  1024. set_header_pair(TRPCHttpHeaders::CallType, "0");
  1025. set_header_pair(TRPCHttpHeaders::RequestId,
  1026. std::to_string(this->get_request_id()));
  1027. set_header_pair(TRPCHttpHeaders::Callee, this->get_method_name());
  1028. set_header_pair(TRPCHttpHeaders::Func, this->get_service_name());
  1029. set_header_pair(TRPCHttpHeaders::Caller, this->get_caller_name());
  1030. auto *req_meta = (RequestProtocol *)this->meta;
  1031. set_header_pair(TRPCHttpHeaders::TransInfo,
  1032. EncodeTransInfo(req_meta->trans_info()));
  1033. const void *buffer;
  1034. size_t buflen;
  1035. while (buflen = this->message->fetch(&buffer), buffer && buflen > 0)
  1036. this->append_output_body_nocopy(buffer, buflen);
  1037. return true;
  1038. }
  1039. bool TRPCHttpRequest::deserialize_meta()
  1040. {
  1041. const char *request_uri = this->get_request_uri();
  1042. protocol::HttpHeaderCursor header_cursor(this);
  1043. auto *meta = (RequestProtocol *)this->meta;
  1044. std::string key, value;
  1045. this->set_data_type(RPCDataJson);
  1046. this->set_compress_type(RPCCompressNone);
  1047. while (header_cursor.next(key, value))
  1048. {
  1049. switch (GetHttpHeadersCode(key))
  1050. {
  1051. case TRPCHttpHeadersCode::DataType:
  1052. this->set_data_type(GetHttpDataType(value));
  1053. break;
  1054. case TRPCHttpHeadersCode::CompressType:
  1055. this->set_compress_type(GetHttpCompressType(value));
  1056. break;
  1057. case TRPCHttpHeadersCode::CallType:
  1058. meta->set_call_type(std::atoi(value.c_str()));
  1059. break;
  1060. case TRPCHttpHeadersCode::RequestId:
  1061. meta->set_request_id(std::atoi(value.c_str()));
  1062. break;
  1063. case TRPCHttpHeadersCode::Timeout:
  1064. meta->set_timeout(std::atoi(value.c_str()));
  1065. break;
  1066. case TRPCHttpHeadersCode::Caller:
  1067. meta->set_caller(value);
  1068. break;
  1069. case TRPCHttpHeadersCode::Callee:
  1070. meta->set_callee(value);
  1071. break;
  1072. case TRPCHttpHeadersCode::MessageType:
  1073. meta->set_message_type(std::atoi(value.c_str()));
  1074. break;
  1075. case TRPCHttpHeadersCode::TransInfo:
  1076. DecodeTransInfo(value, *meta->mutable_trans_info());
  1077. break;
  1078. default:
  1079. break;
  1080. }
  1081. }
  1082. if (request_uri && request_uri[0] == '/')
  1083. {
  1084. std::string str = request_uri + 1;
  1085. auto pos = str.find_first_of("?#");
  1086. if (pos != std::string::npos)
  1087. str.erase(pos);
  1088. if (!str.empty() && str.back() == '/')
  1089. str.pop_back();
  1090. pos = str.find_last_of('/');
  1091. if (pos != std::string::npos)
  1092. {
  1093. this->set_service_name(str.substr(0, pos));
  1094. this->set_method_name(str.substr(pos + 1));
  1095. }
  1096. }
  1097. const void *ptr;
  1098. size_t len;
  1099. this->get_parsed_body(&ptr, &len);
  1100. if (len > 0x7FFFFFFF)
  1101. return false;
  1102. protocol::HttpChunkCursor chunk_cursor(this);
  1103. RPCBuffer *buf = this->get_buffer();
  1104. size_t msg_len = 0;
  1105. while (chunk_cursor.next(&ptr, &len))
  1106. {
  1107. msg_len += len;
  1108. buf->append((const char *)ptr, len, BUFFER_MODE_NOCOPY);
  1109. }
  1110. if (this->get_compress_type() == RPCCompressNone &&
  1111. msg_len == 0 && this->get_data_type() == RPCDataJson)
  1112. {
  1113. buf->append("{}", 2, BUFFER_MODE_NOCOPY);
  1114. msg_len = 2;
  1115. }
  1116. this->message_len = msg_len;
  1117. return true;
  1118. }
  1119. bool TRPCHttpResponse::serialize_meta()
  1120. {
  1121. if (this->message->size() > 0x7FFFFFFF)
  1122. return false;
  1123. auto *meta = (ResponseProtocol *)this->meta;
  1124. int data_type = this->get_data_type();
  1125. int compress_type = this->get_compress_type();
  1126. int rpc_status_code = this->get_status_code();
  1127. int rpc_error = this->get_error();
  1128. const char *http_status_code = this->protocol::HttpResponse::get_status_code();
  1129. set_http_version("HTTP/1.1");
  1130. if (rpc_status_code == RPCStatusOK)
  1131. {
  1132. if (http_status_code)
  1133. protocol::HttpUtil::set_response_status(this, atoi(http_status_code));
  1134. else
  1135. protocol::HttpUtil::set_response_status(this, HttpStatusOK);
  1136. }
  1137. else if (rpc_status_code == RPCStatusServiceNotFound
  1138. || rpc_status_code == RPCStatusMethodNotFound
  1139. || rpc_status_code == RPCStatusMetaError
  1140. || rpc_status_code == RPCStatusURIInvalid)
  1141. {
  1142. protocol::HttpUtil::set_response_status(this, HttpStatusBadRequest);
  1143. }
  1144. else if (rpc_status_code == RPCStatusRespCompressNotSupported
  1145. || rpc_status_code == RPCStatusRespDecompressNotSupported
  1146. || rpc_status_code == RPCStatusIDLSerializeNotSupported
  1147. || rpc_status_code == RPCStatusIDLDeserializeNotSupported)
  1148. {
  1149. protocol::HttpUtil::set_response_status(this, HttpStatusNotImplemented);
  1150. }
  1151. else if (rpc_status_code == RPCStatusUpstreamFailed)
  1152. {
  1153. protocol::HttpUtil::set_response_status(this,
  1154. HttpStatusServiceUnavailable);
  1155. }
  1156. else
  1157. {
  1158. protocol::HttpUtil::set_response_status(this,
  1159. HttpStatusInternalServerError);
  1160. }
  1161. //set header
  1162. set_header_pair(TRPCHttpHeaders::SRPCStatus,
  1163. std::to_string(rpc_status_code));
  1164. set_header_pair(TRPCHttpHeaders::SRPCError,
  1165. std::to_string(rpc_error));
  1166. set_header_pair(TRPCHttpHeaders::DataType,
  1167. GetHttpDataTypeStr(data_type));
  1168. set_header_pair(TRPCHttpHeaders::CompressType,
  1169. GetHttpCompressTypeStr(compress_type));
  1170. set_header_pair(TRPCHttpHeaders::CallType,
  1171. std::to_string(meta->call_type()));
  1172. set_header_pair(TRPCHttpHeaders::RequestId,
  1173. std::to_string(meta->request_id()));
  1174. set_header_pair(TRPCHttpHeaders::Ret,
  1175. std::to_string(meta->ret()));
  1176. set_header_pair(TRPCHttpHeaders::FuncRet,
  1177. std::to_string(meta->func_ret()));
  1178. set_header_pair(TRPCHttpHeaders::ErrorMsg,
  1179. meta->error_msg());
  1180. set_header_pair(TRPCHttpHeaders::MessageType,
  1181. std::to_string(meta->message_type()));
  1182. set_header_pair("Content-Length", std::to_string(this->message_len));
  1183. set_header_pair("Connection", "Keep-Alive");
  1184. set_header_pair(TRPCHttpHeaders::TransInfo,
  1185. EncodeTransInfo(meta->trans_info()));
  1186. const void *buffer;
  1187. size_t buflen;
  1188. while (buflen = this->message->fetch(&buffer), buffer && buflen > 0)
  1189. this->append_output_body_nocopy(buffer, buflen);
  1190. return true;
  1191. }
  1192. bool TRPCHttpResponse::deserialize_meta()
  1193. {
  1194. protocol::HttpHeaderCursor header_cursor(this);
  1195. auto *meta = (ResponseProtocol *)this->meta;
  1196. std::string key, value;
  1197. this->set_data_type(RPCDataJson);
  1198. this->set_compress_type(RPCCompressNone);
  1199. while (header_cursor.next(key, value))
  1200. {
  1201. switch (GetHttpHeadersCode(key))
  1202. {
  1203. case TRPCHttpHeadersCode::DataType:
  1204. this->set_data_type(GetHttpDataType(value));
  1205. break;
  1206. case TRPCHttpHeadersCode::CompressType:
  1207. this->set_compress_type(GetHttpCompressType(value));
  1208. break;
  1209. case TRPCHttpHeadersCode::CallType:
  1210. meta->set_call_type(std::atoi(value.c_str()));
  1211. break;
  1212. case TRPCHttpHeadersCode::RequestId:
  1213. meta->set_request_id(std::atoi(value.c_str()));
  1214. break;
  1215. case TRPCHttpHeadersCode::Ret:
  1216. meta->set_ret(std::atoi(value.c_str()));
  1217. break;
  1218. case TRPCHttpHeadersCode::FuncRet:
  1219. meta->set_func_ret(std::atoi(value.c_str()));
  1220. break;
  1221. case TRPCHttpHeadersCode::ErrorMsg:
  1222. meta->set_error_msg(value);
  1223. break;
  1224. case TRPCHttpHeadersCode::MessageType:
  1225. meta->set_message_type(std::atoi(value.c_str()));
  1226. break;
  1227. case TRPCHttpHeadersCode::TransInfo:
  1228. DecodeTransInfo(value, *meta->mutable_trans_info());
  1229. break;
  1230. default:
  1231. break;
  1232. }
  1233. }
  1234. const void *ptr;
  1235. size_t len;
  1236. this->get_parsed_body(&ptr, &len);
  1237. if (len > 0x7FFFFFFF)
  1238. return false;
  1239. protocol::HttpChunkCursor chunk_cursor(this);
  1240. RPCBuffer *buf = this->get_buffer();
  1241. size_t msg_len = 0;
  1242. while (chunk_cursor.next(&ptr, &len))
  1243. {
  1244. buf->append((const char *)ptr, len, BUFFER_MODE_NOCOPY);
  1245. msg_len += len;
  1246. }
  1247. this->message_len = msg_len;
  1248. return true;
  1249. }
  1250. bool TRPCHttpRequest::set_meta_module_data(const RPCModuleData& data)
  1251. {
  1252. return this->TRPCRequest::set_meta_module_data(data);
  1253. }
  1254. bool TRPCHttpRequest::get_meta_module_data(RPCModuleData& data) const
  1255. {
  1256. return this->TRPCRequest::get_meta_module_data(data);
  1257. }
  1258. bool TRPCHttpResponse::set_meta_module_data(const RPCModuleData& data)
  1259. {
  1260. return this->TRPCResponse::set_meta_module_data(data);
  1261. }
  1262. bool TRPCHttpResponse::get_meta_module_data(RPCModuleData& data) const
  1263. {
  1264. return this->TRPCResponse::get_meta_module_data(data);
  1265. }
  1266. bool TRPCHttpRequest::set_http_header(const std::string& name,
  1267. const std::string& value)
  1268. {
  1269. return this->protocol::HttpMessage::set_header_pair(name, value);
  1270. }
  1271. bool TRPCHttpRequest::add_http_header(const std::string& name,
  1272. const std::string& value)
  1273. {
  1274. return this->protocol::HttpMessage::add_header_pair(name, value);
  1275. }
  1276. bool TRPCHttpRequest::get_http_header(const std::string& name,
  1277. std::string& value) const
  1278. {
  1279. protocol::HttpHeaderCursor cursor(this);
  1280. return cursor.find(name, value);
  1281. }
  1282. bool TRPCHttpResponse::set_http_header(const std::string& name,
  1283. const std::string& value)
  1284. {
  1285. return this->protocol::HttpMessage::set_header_pair(name, value);
  1286. }
  1287. bool TRPCHttpResponse::add_http_header(const std::string& name,
  1288. const std::string& value)
  1289. {
  1290. return this->protocol::HttpMessage::add_header_pair(name, value);
  1291. }
  1292. bool TRPCHttpResponse::get_http_header(const std::string& name,
  1293. std::string& value) const
  1294. {
  1295. protocol::HttpHeaderCursor cursor(this);
  1296. return cursor.find(name, value);
  1297. }
  1298. } // namespace srpc