rpc_message_brpc.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. /*
  2. Copyright (c) 2020 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #include <errno.h>
  14. #include <vector>
  15. #include <string>
  16. #include <workflow/HttpUtil.h>
  17. #include <workflow/StringUtil.h>
  18. #include "rpc_basic.h"
  19. #include "rpc_compress.h"
  20. #include "rpc_meta_brpc.pb.h"
  21. #include "rpc_message_brpc.h"
  22. #include "rpc_zero_copy_stream.h"
  23. namespace srpc
  24. {
  // Error codes written to / read from the brpc response meta's error_code
  // field. The srpc <-> brpc translation lives in error_code_srpc_brpc() and
  // error_code_brpc_srpc().
  // NOTE(review): values presumably mirror brpc's own errno definitions — confirm.

  // Request-side failures (1xxx).
  static constexpr int BRPC_ENOSERVICE = 1001;  // service not found
  static constexpr int BRPC_ENOMETHOD  = 1002;  // method not found
  static constexpr int BRPC_EREQUEST   = 1003;  // request/meta could not be processed

  // Response-side failures (2xxx).
  static constexpr int BRPC_EINTERNAL  = 2001;  // fallback for every unmapped status
  static constexpr int BRPC_ERESPONSE  = 2002;  // response could not be processed
  static constexpr int BRPC_ELOGOFF    = 2003;  // process terminated
  31. BRPCMessage::BRPCMessage()
  32. {
  33. this->nreceived = 0;
  34. this->meta_buf = NULL;
  35. this->meta_len = 0;
  36. this->message_len = 0;
  37. this->attachment_len = 0;
  38. memset(this->header, 0, sizeof (this->header));
  39. this->meta = new BrpcMeta();
  40. this->message = new RPCBuffer();
  41. this->attachment = NULL;
  42. }
  43. bool BRPCRequest::deserialize_meta()
  44. {
  45. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  46. if (meta->ParseFromArray(this->meta_buf, (int)this->meta_len))
  47. {
  48. if (meta->has_attachment_size())
  49. {
  50. this->attachment_len = meta->attachment_size();
  51. this->message_len -= this->attachment_len;
  52. this->attachment = new RPCBuffer();
  53. this->message->cut(this->message_len, this->attachment);
  54. }
  55. return true;
  56. }
  57. return false;
  58. }
  59. bool BRPCResponse::deserialize_meta()
  60. {
  61. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  62. int error;
  63. if (meta->ParseFromArray(this->meta_buf, (int)this->meta_len))
  64. {
  65. if (meta->has_attachment_size())
  66. {
  67. this->attachment_len = meta->attachment_size();
  68. this->message_len -= this->attachment_len;
  69. this->attachment = new RPCBuffer();
  70. this->message->cut(this->message_len, this->attachment);
  71. }
  72. this->srpc_status_code = RPCStatusOK;
  73. if (meta->has_response())
  74. {
  75. error = meta->mutable_response()->error_code();
  76. if (error != 0)
  77. {
  78. this->srpc_status_code = this->error_code_brpc_srpc(error);
  79. this->srpc_error_msg = meta->mutable_response()->error_text();
  80. }
  81. }
  82. return true;
  83. }
  84. return false;
  85. }
  86. int BRPCMessage::append(const void *buf, size_t *size, size_t size_limit)
  87. {
  88. uint32_t *p;
  89. size_t header_left, body_received, buf_len;
  90. if (this->nreceived < BRPC_HEADER_SIZE)
  91. {
  92. //receive header
  93. header_left = BRPC_HEADER_SIZE - this->nreceived;
  94. if (*size >= header_left)
  95. {
  96. //receive the whole header and ready to recieve body
  97. memcpy(this->header + this->nreceived, buf, header_left);
  98. this->nreceived += header_left;
  99. p = (uint32_t *)this->header + 1;
  100. buf_len = ntohl(*p); // payload_len
  101. p = (uint32_t *)this->header + 2;
  102. this->meta_len = ntohl(*p);
  103. this->message_len = buf_len - this->meta_len; // msg_len + attachment_len
  104. if (buf_len >= size_limit)
  105. {
  106. errno = EMSGSIZE;
  107. return -1;
  108. }
  109. else if (buf_len > 0)
  110. {
  111. if (*size - header_left > buf_len)
  112. *size = header_left + buf_len;
  113. this->meta_buf = new char[this->meta_len];
  114. // this->buf = new char[this->message_len];
  115. if (*size - header_left <= this->meta_len)
  116. {
  117. memcpy(this->meta_buf, (const char *)buf + header_left,
  118. *size - header_left);
  119. }
  120. else
  121. {
  122. memcpy(this->meta_buf, (const char *)buf + header_left,
  123. this->meta_len);
  124. // memcpy(this->buf,
  125. // (const char *)buf + header_left + this->meta_len,
  126. // *size - header_left - this->meta_len);
  127. this->message->append((const char *)buf + header_left + this->meta_len,
  128. *size - header_left - this->meta_len,
  129. BUFFER_MODE_COPY);
  130. }
  131. this->nreceived += *size - header_left;
  132. if (this->nreceived == BRPC_HEADER_SIZE + buf_len)
  133. return 1;
  134. else
  135. return 0;
  136. }
  137. else if (*size == header_left)
  138. {
  139. return 1; // means body_size == 0 and finish recieved header
  140. }
  141. else
  142. {
  143. // means buf_len < 0
  144. errno = EBADMSG;
  145. return -1;
  146. }
  147. }
  148. else
  149. {
  150. // only receive header
  151. memcpy(this->header + this->nreceived, buf, *size);
  152. this->nreceived += *size;
  153. return 0;
  154. }
  155. }
  156. else
  157. {
  158. // have already received the header and now is for body only
  159. body_received = this->nreceived - BRPC_HEADER_SIZE;
  160. buf_len = this->meta_len + this->message_len;
  161. if (body_received + *size > buf_len)
  162. *size = buf_len - body_received;
  163. if (body_received + *size <= this->meta_len)
  164. {
  165. memcpy(this->meta_buf + body_received, buf, *size);
  166. }
  167. else if (body_received < this->meta_len)
  168. {
  169. memcpy(this->meta_buf + body_received, buf,
  170. this->meta_len - body_received);
  171. if (body_received + *size > this->meta_len)// useless. always true
  172. // memcpy(this->buf, (const char *)buf + this->meta_len - body_received,
  173. // *size - this->meta_len + body_received);
  174. this->message->append((const char *)buf + this->meta_len - body_received,
  175. *size - this->meta_len + body_received,
  176. BUFFER_MODE_COPY);
  177. }
  178. else
  179. {
  180. // memcpy(this->buf + body_received - this->meta_len, buf, *size);
  181. this->message->append((const char *)buf, *size, BUFFER_MODE_COPY);
  182. }
  183. this->nreceived += *size;
  184. return this->nreceived == BRPC_HEADER_SIZE + buf_len;
  185. }
  186. }
  187. bool BRPCRequest::serialize_meta()
  188. {
  189. this->meta_len = meta->ByteSizeLong();
  190. this->meta_buf = new char[this->meta_len];
  191. return this->meta->SerializeToArray(this->meta_buf, (int)this->meta_len);
  192. }
  193. bool BRPCResponse::serialize_meta()
  194. {
  195. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  196. int error;
  197. if (this->srpc_status_code != RPCStatusOK)
  198. {
  199. error = this->error_code_srpc_brpc(this->srpc_status_code);
  200. meta->mutable_response()->set_error_code(error);
  201. meta->mutable_response()->set_error_text(this->srpc_error_msg);
  202. }
  203. this->meta_len = meta->ByteSizeLong();
  204. this->meta_buf = new char[this->meta_len];
  205. return meta->SerializeToArray(this->meta_buf, (int)this->meta_len);
  206. }
  207. int BRPCMessage::get_compress_type() const
  208. {
  209. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  210. return meta->compress_type();
  211. }
  212. void BRPCMessage::set_compress_type(int type)
  213. {
  214. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  215. meta->set_compress_type(type);
  216. }
  217. void BRPCMessage::set_attachment_nocopy(const char *attachment, size_t len)
  218. {
  219. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  220. this->attachment_len += len;
  221. meta->set_attachment_size(this->attachment_len);
  222. this->attachment = new RPCBuffer();
  223. this->attachment->append(attachment, len, BUFFER_MODE_NOCOPY);
  224. }
  225. bool BRPCMessage::get_attachment_nocopy(const char **attachment, size_t *len) const
  226. {
  227. size_t tmp_len = (size_t)-1;
  228. const void *tmp_buf;
  229. if (this->attachment == NULL ||
  230. this->attachment->fetch(&tmp_buf, &tmp_len) == false)
  231. {
  232. return false;
  233. }
  234. *attachment = (const char *)tmp_buf;
  235. *len = tmp_len;
  236. return true;
  237. }
  238. const std::string& BRPCRequest::get_service_name() const
  239. {
  240. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  241. return meta->mutable_request()->service_name();
  242. }
  243. const std::string& BRPCRequest::get_method_name() const
  244. {
  245. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  246. return meta->mutable_request()->method_name();
  247. }
  248. void BRPCRequest::set_service_name(const std::string& service_name)
  249. {
  250. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  251. meta->mutable_request()->set_service_name(service_name);
  252. }
  253. void BRPCRequest::set_method_name(const std::string& method_name)
  254. {
  255. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  256. meta->mutable_request()->set_method_name(method_name);
  257. }
  258. int64_t BRPCRequest::get_correlation_id() const
  259. {
  260. const BrpcMeta *meta = static_cast<const BrpcMeta *>(this->meta);
  261. if (meta->has_correlation_id())
  262. return meta->correlation_id();
  263. return -1;
  264. }
  265. int BRPCResponse::get_status_code() const
  266. {
  267. return this->srpc_status_code;
  268. }
  269. void BRPCResponse::set_status_code(int code)
  270. {
  271. this->srpc_status_code = code;
  272. if (code != RPCStatusOK)
  273. this->srpc_error_msg = this->get_errmsg();
  274. }
  275. int BRPCResponse::get_error() const
  276. {
  277. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  278. return meta->mutable_response()->error_code();
  279. }
  280. const char *BRPCResponse::get_errmsg() const
  281. {
  282. switch (this->srpc_status_code)
  283. {
  284. case RPCStatusOK:
  285. return "OK";
  286. case RPCStatusUndefined:
  287. return "Undefined Error";
  288. case RPCStatusServiceNotFound:
  289. return "Service Not Found";
  290. case RPCStatusMethodNotFound:
  291. return "Method Not Found";
  292. case RPCStatusMetaError:
  293. return "Meta Error";
  294. case RPCStatusReqCompressSizeInvalid:
  295. return "Request Compress-size Invalid";
  296. case RPCStatusReqDecompressSizeInvalid:
  297. return "Request Decompress-size Invalid";
  298. case RPCStatusReqCompressNotSupported:
  299. return "Request Compress Not Supported";
  300. case RPCStatusReqDecompressNotSupported:
  301. return "Request Decompress Not Supported";
  302. case RPCStatusReqCompressError:
  303. return "Request Compress Error";
  304. case RPCStatusReqDecompressError:
  305. return "Request Decompress Error";
  306. case RPCStatusReqSerializeError:
  307. return "Request Serialize Error";
  308. case RPCStatusReqDeserializeError:
  309. return "Request Deserialize Error";
  310. case RPCStatusRespCompressSizeInvalid:
  311. return "Response Compress-size Invalid";
  312. case RPCStatusRespDecompressSizeInvalid:
  313. return "Response Decompress-size Invalid";
  314. case RPCStatusRespCompressNotSupported:
  315. return "Response Compress Not Supported";
  316. case RPCStatusRespDecompressNotSupported:
  317. return "Response Decompress Not Supported";
  318. case RPCStatusRespCompressError:
  319. return "Response Compress Error";
  320. case RPCStatusRespDecompressError:
  321. return "Response Decompress Error";
  322. case RPCStatusRespSerializeError:
  323. return "Response Serialize Error";
  324. case RPCStatusRespDeserializeError:
  325. return "Response Deserialize Error";
  326. case RPCStatusIDLSerializeNotSupported:
  327. return "IDL Serialize Not Supported";
  328. case RPCStatusIDLDeserializeNotSupported:
  329. return "IDL Deserialize Not Supported";
  330. case RPCStatusURIInvalid:
  331. return "URI Invalid";
  332. case RPCStatusUpstreamFailed:
  333. return "Upstream Failed";
  334. case RPCStatusSystemError:
  335. return "System Error. Use get_error() to get errno";
  336. case RPCStatusSSLError:
  337. return "SSL Error. Use get_error() to get SSL-Error";
  338. case RPCStatusDNSError:
  339. return "DNS Error. Use get_error() to get GAI-Error";
  340. case RPCStatusProcessTerminated:
  341. return "Process Terminated";
  342. default:
  343. return "Unknown Error";
  344. }
  345. }
  346. void BRPCResponse::set_error(int error)
  347. {
  348. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  349. meta->mutable_response()->set_error_code(error);
  350. }
  351. void BRPCResponse::set_correlation_id(int64_t cid)
  352. {
  353. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  354. meta->set_correlation_id(cid);
  355. }
  356. int BRPCMessage::serialize(const ProtobufIDLMessage *pb_msg)
  357. {
  358. if (!pb_msg)
  359. return RPCStatusOK;
  360. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  361. bool is_resp = !meta->has_request();
  362. int msg_len = pb_msg->ByteSizeLong();
  363. RPCOutputStream stream(this->message, pb_msg->ByteSizeLong());
  364. int ret = pb_msg->SerializeToZeroCopyStream(&stream) ? 0 : -1;
  365. if (ret < 0)
  366. return is_resp ? RPCStatusRespSerializeError : RPCStatusReqSerializeError;
  367. this->message_len = msg_len;
  368. return RPCStatusOK;
  369. }
  370. int BRPCMessage::deserialize(ProtobufIDLMessage *pb_msg)
  371. {
  372. const BrpcMeta *meta = static_cast<const BrpcMeta *>(this->meta);
  373. bool is_resp = !meta->has_request();
  374. RPCInputStream stream(this->message);
  375. if (pb_msg->ParseFromZeroCopyStream(&stream) == false)
  376. return is_resp ? RPCStatusRespDeserializeError : RPCStatusReqDeserializeError;
  377. return RPCStatusOK;
  378. }
  379. int BRPCMessage::compress()
  380. {
  381. BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
  382. bool is_resp = !meta->has_request();
  383. int type = meta->compress_type();
  384. size_t buflen = this->message_len;
  385. int status_code = RPCStatusOK;
  386. if (buflen == 0)
  387. return status_code;
  388. if (type == RPCCompressNone)
  389. return status_code;
  390. static RPCCompressor *compressor = RPCCompressor::get_instance();
  391. int ret = compressor->lease_compressed_size(type, buflen);
  392. if (ret == -2)
  393. return is_resp ? RPCStatusReqCompressNotSupported : RPCStatusRespCompressNotSupported;
  394. else if (ret <= 0)
  395. return is_resp ? RPCStatusRespCompressSizeInvalid : RPCStatusReqCompressSizeInvalid;
  396. //buflen = ret;
  397. RPCBuffer *dst_buf = new RPCBuffer();
  398. ret = compressor->serialize_to_compressed(this->message, dst_buf, type);
  399. if (ret == -2)
  400. status_code = is_resp ? RPCStatusRespCompressNotSupported : RPCStatusReqCompressNotSupported;
  401. else if (ret == -1)
  402. status_code = is_resp ? RPCStatusRespCompressError : RPCStatusReqCompressError;
  403. else if (ret <= 0)
  404. status_code = is_resp ? RPCStatusRespCompressSizeInvalid : RPCStatusReqCompressSizeInvalid;
  405. else
  406. buflen = ret;
  407. if (status_code == RPCStatusOK)
  408. {
  409. delete this->message;
  410. this->message = dst_buf;
  411. this->message_len = buflen;
  412. } else {
  413. delete dst_buf;
  414. }
  415. return status_code;
  416. }
  417. int BRPCMessage::decompress()
  418. {
  419. const BrpcMeta *meta = static_cast<const BrpcMeta *>(this->meta);
  420. bool is_resp = !meta->has_request();
  421. int type = meta->compress_type();
  422. int status_code = RPCStatusOK;
  423. if (this->message_len == 0 || type == RPCCompressNone)
  424. return status_code;
  425. RPCBuffer *dst_buf = new RPCBuffer();
  426. static RPCCompressor *compressor = RPCCompressor::get_instance();
  427. int ret = compressor->parse_from_compressed(this->message, dst_buf, type);
  428. if (ret == -2)
  429. status_code = is_resp ? RPCStatusRespDecompressNotSupported : RPCStatusReqDecompressNotSupported;
  430. else if (ret == -1)
  431. status_code = is_resp ? RPCStatusRespDecompressError : RPCStatusReqDecompressError;
  432. else if (ret <= 0)
  433. status_code = is_resp ? RPCStatusRespDecompressSizeInvalid : RPCStatusReqDecompressSizeInvalid;
  434. if (status_code == RPCStatusOK)
  435. {
  436. delete this->message;
  437. this->message = dst_buf;
  438. this->message_len = ret;
  439. } else {
  440. delete dst_buf;
  441. }
  442. return status_code;
  443. }
  444. inline int BRPCMessage::error_code_srpc_brpc(int srpc_status_code) const
  445. {
  446. switch (srpc_status_code)
  447. {
  448. case RPCStatusServiceNotFound:
  449. return BRPC_ENOSERVICE;
  450. case RPCStatusMethodNotFound:
  451. return BRPC_ENOMETHOD;
  452. case RPCStatusMetaError:
  453. case RPCStatusReqCompressSizeInvalid:
  454. case RPCStatusReqDecompressSizeInvalid:
  455. case RPCStatusReqCompressNotSupported:
  456. case RPCStatusReqDecompressNotSupported:
  457. case RPCStatusReqCompressError:
  458. case RPCStatusReqDecompressError:
  459. case RPCStatusReqSerializeError:
  460. case RPCStatusReqDeserializeError:
  461. return BRPC_EREQUEST;
  462. case RPCStatusRespCompressSizeInvalid:
  463. case RPCStatusRespDecompressSizeInvalid:
  464. case RPCStatusRespCompressNotSupported:
  465. case RPCStatusRespDecompressNotSupported:
  466. case RPCStatusRespCompressError:
  467. case RPCStatusRespDecompressError:
  468. case RPCStatusRespSerializeError:
  469. case RPCStatusRespDeserializeError:
  470. return BRPC_ERESPONSE;
  471. case RPCStatusProcessTerminated:
  472. return BRPC_ELOGOFF;
  473. default:
  474. return BRPC_EINTERNAL;
  475. }
  476. }
  477. inline int BRPCMessage::error_code_brpc_srpc(int brpc_error_code) const
  478. {
  479. switch (brpc_error_code)
  480. {
  481. case BRPC_ENOSERVICE:
  482. return RPCStatusServiceNotFound;
  483. case BRPC_ENOMETHOD:
  484. return RPCStatusMethodNotFound;
  485. case BRPC_EREQUEST:
  486. return RPCStatusReqDeserializeError;
  487. case BRPC_ERESPONSE:
  488. return RPCStatusRespDeserializeError;
  489. case BRPC_ELOGOFF:
  490. return RPCStatusProcessTerminated;
  491. default:
  492. return RPCStatusSystemError;
  493. }
  494. }
  495. } // namespace srpc