123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578 |
- /*
- Copyright (c) 2020 Sogou, Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- #include <errno.h>
- #include <vector>
- #include <string>
- #include <workflow/HttpUtil.h>
- #include <workflow/StringUtil.h>
- #include "rpc_basic.h"
- #include "rpc_compress.h"
- #include "rpc_meta_brpc.pb.h"
- #include "rpc_message_brpc.h"
- #include "rpc_zero_copy_stream.h"
- namespace srpc
- {
// Error codes exchanged in the BRPC response meta. They are translated
// to and from srpc status codes by error_code_srpc_brpc() and
// error_code_brpc_srpc().
constexpr int BRPC_ENOSERVICE = 1001;   // service not found
constexpr int BRPC_ENOMETHOD = 1002;    // method not found
constexpr int BRPC_EREQUEST = 1003;     // request-side failure
constexpr int BRPC_EINTERNAL = 2001;    // fallback for any other srpc status
constexpr int BRPC_ERESPONSE = 2002;    // response-side failure
constexpr int BRPC_ELOGOFF = 2003;      // process terminated
- BRPCMessage::BRPCMessage()
- {
- this->nreceived = 0;
- this->meta_buf = NULL;
- this->meta_len = 0;
- this->message_len = 0;
- this->attachment_len = 0;
- memset(this->header, 0, sizeof (this->header));
- this->meta = new BrpcMeta();
- this->message = new RPCBuffer();
- this->attachment = NULL;
- }
- bool BRPCRequest::deserialize_meta()
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- if (meta->ParseFromArray(this->meta_buf, (int)this->meta_len))
- {
- if (meta->has_attachment_size())
- {
- this->attachment_len = meta->attachment_size();
- this->message_len -= this->attachment_len;
- this->attachment = new RPCBuffer();
- this->message->cut(this->message_len, this->attachment);
- }
- return true;
- }
- return false;
- }
- bool BRPCResponse::deserialize_meta()
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- int error;
- if (meta->ParseFromArray(this->meta_buf, (int)this->meta_len))
- {
- if (meta->has_attachment_size())
- {
- this->attachment_len = meta->attachment_size();
- this->message_len -= this->attachment_len;
- this->attachment = new RPCBuffer();
- this->message->cut(this->message_len, this->attachment);
- }
- this->srpc_status_code = RPCStatusOK;
- if (meta->has_response())
- {
- error = meta->mutable_response()->error_code();
- if (error != 0)
- {
- this->srpc_status_code = this->error_code_brpc_srpc(error);
- this->srpc_error_msg = meta->mutable_response()->error_text();
- }
- }
- return true;
- }
- return false;
- }
- int BRPCMessage::append(const void *buf, size_t *size, size_t size_limit)
- {
- uint32_t *p;
- size_t header_left, body_received, buf_len;
- if (this->nreceived < BRPC_HEADER_SIZE)
- {
- //receive header
- header_left = BRPC_HEADER_SIZE - this->nreceived;
- if (*size >= header_left)
- {
- //receive the whole header and ready to recieve body
- memcpy(this->header + this->nreceived, buf, header_left);
- this->nreceived += header_left;
- p = (uint32_t *)this->header + 1;
- buf_len = ntohl(*p); // payload_len
- p = (uint32_t *)this->header + 2;
- this->meta_len = ntohl(*p);
- this->message_len = buf_len - this->meta_len; // msg_len + attachment_len
- if (buf_len >= size_limit)
- {
- errno = EMSGSIZE;
- return -1;
- }
- else if (buf_len > 0)
- {
- if (*size - header_left > buf_len)
- *size = header_left + buf_len;
- this->meta_buf = new char[this->meta_len];
- // this->buf = new char[this->message_len];
- if (*size - header_left <= this->meta_len)
- {
- memcpy(this->meta_buf, (const char *)buf + header_left,
- *size - header_left);
- }
- else
- {
- memcpy(this->meta_buf, (const char *)buf + header_left,
- this->meta_len);
- // memcpy(this->buf,
- // (const char *)buf + header_left + this->meta_len,
- // *size - header_left - this->meta_len);
- this->message->append((const char *)buf + header_left + this->meta_len,
- *size - header_left - this->meta_len,
- BUFFER_MODE_COPY);
- }
- this->nreceived += *size - header_left;
- if (this->nreceived == BRPC_HEADER_SIZE + buf_len)
- return 1;
- else
- return 0;
- }
- else if (*size == header_left)
- {
- return 1; // means body_size == 0 and finish recieved header
- }
- else
- {
- // means buf_len < 0
- errno = EBADMSG;
- return -1;
- }
- }
- else
- {
- // only receive header
- memcpy(this->header + this->nreceived, buf, *size);
- this->nreceived += *size;
- return 0;
- }
- }
- else
- {
- // have already received the header and now is for body only
- body_received = this->nreceived - BRPC_HEADER_SIZE;
- buf_len = this->meta_len + this->message_len;
- if (body_received + *size > buf_len)
- *size = buf_len - body_received;
- if (body_received + *size <= this->meta_len)
- {
- memcpy(this->meta_buf + body_received, buf, *size);
- }
- else if (body_received < this->meta_len)
- {
- memcpy(this->meta_buf + body_received, buf,
- this->meta_len - body_received);
- if (body_received + *size > this->meta_len)// useless. always true
- // memcpy(this->buf, (const char *)buf + this->meta_len - body_received,
- // *size - this->meta_len + body_received);
- this->message->append((const char *)buf + this->meta_len - body_received,
- *size - this->meta_len + body_received,
- BUFFER_MODE_COPY);
- }
- else
- {
- // memcpy(this->buf + body_received - this->meta_len, buf, *size);
- this->message->append((const char *)buf, *size, BUFFER_MODE_COPY);
- }
- this->nreceived += *size;
- return this->nreceived == BRPC_HEADER_SIZE + buf_len;
- }
- }
- bool BRPCRequest::serialize_meta()
- {
- this->meta_len = meta->ByteSizeLong();
- this->meta_buf = new char[this->meta_len];
- return this->meta->SerializeToArray(this->meta_buf, (int)this->meta_len);
- }
- bool BRPCResponse::serialize_meta()
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- int error;
- if (this->srpc_status_code != RPCStatusOK)
- {
- error = this->error_code_srpc_brpc(this->srpc_status_code);
- meta->mutable_response()->set_error_code(error);
- meta->mutable_response()->set_error_text(this->srpc_error_msg);
- }
- this->meta_len = meta->ByteSizeLong();
- this->meta_buf = new char[this->meta_len];
- return meta->SerializeToArray(this->meta_buf, (int)this->meta_len);
- }
- int BRPCMessage::get_compress_type() const
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- return meta->compress_type();
- }
- void BRPCMessage::set_compress_type(int type)
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- meta->set_compress_type(type);
- }
- void BRPCMessage::set_attachment_nocopy(const char *attachment, size_t len)
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- this->attachment_len += len;
- meta->set_attachment_size(this->attachment_len);
- this->attachment = new RPCBuffer();
- this->attachment->append(attachment, len, BUFFER_MODE_NOCOPY);
- }
- bool BRPCMessage::get_attachment_nocopy(const char **attachment, size_t *len) const
- {
- size_t tmp_len = (size_t)-1;
- const void *tmp_buf;
- if (this->attachment == NULL ||
- this->attachment->fetch(&tmp_buf, &tmp_len) == false)
- {
- return false;
- }
- *attachment = (const char *)tmp_buf;
- *len = tmp_len;
- return true;
- }
- const std::string& BRPCRequest::get_service_name() const
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- return meta->mutable_request()->service_name();
- }
- const std::string& BRPCRequest::get_method_name() const
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- return meta->mutable_request()->method_name();
- }
- void BRPCRequest::set_service_name(const std::string& service_name)
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- meta->mutable_request()->set_service_name(service_name);
- }
- void BRPCRequest::set_method_name(const std::string& method_name)
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- meta->mutable_request()->set_method_name(method_name);
- }
- int64_t BRPCRequest::get_correlation_id() const
- {
- const BrpcMeta *meta = static_cast<const BrpcMeta *>(this->meta);
- if (meta->has_correlation_id())
- return meta->correlation_id();
- return -1;
- }
- int BRPCResponse::get_status_code() const
- {
- return this->srpc_status_code;
- }
- void BRPCResponse::set_status_code(int code)
- {
- this->srpc_status_code = code;
- if (code != RPCStatusOK)
- this->srpc_error_msg = this->get_errmsg();
- }
- int BRPCResponse::get_error() const
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- return meta->mutable_response()->error_code();
- }
- const char *BRPCResponse::get_errmsg() const
- {
- switch (this->srpc_status_code)
- {
- case RPCStatusOK:
- return "OK";
- case RPCStatusUndefined:
- return "Undefined Error";
- case RPCStatusServiceNotFound:
- return "Service Not Found";
- case RPCStatusMethodNotFound:
- return "Method Not Found";
- case RPCStatusMetaError:
- return "Meta Error";
- case RPCStatusReqCompressSizeInvalid:
- return "Request Compress-size Invalid";
- case RPCStatusReqDecompressSizeInvalid:
- return "Request Decompress-size Invalid";
- case RPCStatusReqCompressNotSupported:
- return "Request Compress Not Supported";
- case RPCStatusReqDecompressNotSupported:
- return "Request Decompress Not Supported";
- case RPCStatusReqCompressError:
- return "Request Compress Error";
- case RPCStatusReqDecompressError:
- return "Request Decompress Error";
- case RPCStatusReqSerializeError:
- return "Request Serialize Error";
- case RPCStatusReqDeserializeError:
- return "Request Deserialize Error";
- case RPCStatusRespCompressSizeInvalid:
- return "Response Compress-size Invalid";
- case RPCStatusRespDecompressSizeInvalid:
- return "Response Decompress-size Invalid";
- case RPCStatusRespCompressNotSupported:
- return "Response Compress Not Supported";
- case RPCStatusRespDecompressNotSupported:
- return "Response Decompress Not Supported";
- case RPCStatusRespCompressError:
- return "Response Compress Error";
- case RPCStatusRespDecompressError:
- return "Response Decompress Error";
- case RPCStatusRespSerializeError:
- return "Response Serialize Error";
- case RPCStatusRespDeserializeError:
- return "Response Deserialize Error";
- case RPCStatusIDLSerializeNotSupported:
- return "IDL Serialize Not Supported";
- case RPCStatusIDLDeserializeNotSupported:
- return "IDL Deserialize Not Supported";
- case RPCStatusURIInvalid:
- return "URI Invalid";
- case RPCStatusUpstreamFailed:
- return "Upstream Failed";
- case RPCStatusSystemError:
- return "System Error. Use get_error() to get errno";
- case RPCStatusSSLError:
- return "SSL Error. Use get_error() to get SSL-Error";
- case RPCStatusDNSError:
- return "DNS Error. Use get_error() to get GAI-Error";
- case RPCStatusProcessTerminated:
- return "Process Terminated";
- default:
- return "Unknown Error";
- }
- }
- void BRPCResponse::set_error(int error)
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- meta->mutable_response()->set_error_code(error);
- }
- void BRPCResponse::set_correlation_id(int64_t cid)
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- meta->set_correlation_id(cid);
- }
- int BRPCMessage::serialize(const ProtobufIDLMessage *pb_msg)
- {
- if (!pb_msg)
- return RPCStatusOK;
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- bool is_resp = !meta->has_request();
- int msg_len = pb_msg->ByteSizeLong();
- RPCOutputStream stream(this->message, pb_msg->ByteSizeLong());
- int ret = pb_msg->SerializeToZeroCopyStream(&stream) ? 0 : -1;
- if (ret < 0)
- return is_resp ? RPCStatusRespSerializeError : RPCStatusReqSerializeError;
- this->message_len = msg_len;
- return RPCStatusOK;
- }
- int BRPCMessage::deserialize(ProtobufIDLMessage *pb_msg)
- {
- const BrpcMeta *meta = static_cast<const BrpcMeta *>(this->meta);
- bool is_resp = !meta->has_request();
- RPCInputStream stream(this->message);
- if (pb_msg->ParseFromZeroCopyStream(&stream) == false)
- return is_resp ? RPCStatusRespDeserializeError : RPCStatusReqDeserializeError;
- return RPCStatusOK;
- }
- int BRPCMessage::compress()
- {
- BrpcMeta *meta = static_cast<BrpcMeta *>(this->meta);
- bool is_resp = !meta->has_request();
- int type = meta->compress_type();
- size_t buflen = this->message_len;
- int status_code = RPCStatusOK;
- if (buflen == 0)
- return status_code;
- if (type == RPCCompressNone)
- return status_code;
- static RPCCompressor *compressor = RPCCompressor::get_instance();
- int ret = compressor->lease_compressed_size(type, buflen);
- if (ret == -2)
- return is_resp ? RPCStatusReqCompressNotSupported : RPCStatusRespCompressNotSupported;
- else if (ret <= 0)
- return is_resp ? RPCStatusRespCompressSizeInvalid : RPCStatusReqCompressSizeInvalid;
- //buflen = ret;
- RPCBuffer *dst_buf = new RPCBuffer();
- ret = compressor->serialize_to_compressed(this->message, dst_buf, type);
- if (ret == -2)
- status_code = is_resp ? RPCStatusRespCompressNotSupported : RPCStatusReqCompressNotSupported;
- else if (ret == -1)
- status_code = is_resp ? RPCStatusRespCompressError : RPCStatusReqCompressError;
- else if (ret <= 0)
- status_code = is_resp ? RPCStatusRespCompressSizeInvalid : RPCStatusReqCompressSizeInvalid;
- else
- buflen = ret;
- if (status_code == RPCStatusOK)
- {
- delete this->message;
- this->message = dst_buf;
- this->message_len = buflen;
- } else {
- delete dst_buf;
- }
- return status_code;
- }
- int BRPCMessage::decompress()
- {
- const BrpcMeta *meta = static_cast<const BrpcMeta *>(this->meta);
- bool is_resp = !meta->has_request();
- int type = meta->compress_type();
- int status_code = RPCStatusOK;
- if (this->message_len == 0 || type == RPCCompressNone)
- return status_code;
- RPCBuffer *dst_buf = new RPCBuffer();
- static RPCCompressor *compressor = RPCCompressor::get_instance();
- int ret = compressor->parse_from_compressed(this->message, dst_buf, type);
- if (ret == -2)
- status_code = is_resp ? RPCStatusRespDecompressNotSupported : RPCStatusReqDecompressNotSupported;
- else if (ret == -1)
- status_code = is_resp ? RPCStatusRespDecompressError : RPCStatusReqDecompressError;
- else if (ret <= 0)
- status_code = is_resp ? RPCStatusRespDecompressSizeInvalid : RPCStatusReqDecompressSizeInvalid;
- if (status_code == RPCStatusOK)
- {
- delete this->message;
- this->message = dst_buf;
- this->message_len = ret;
- } else {
- delete dst_buf;
- }
- return status_code;
- }
- inline int BRPCMessage::error_code_srpc_brpc(int srpc_status_code) const
- {
- switch (srpc_status_code)
- {
- case RPCStatusServiceNotFound:
- return BRPC_ENOSERVICE;
- case RPCStatusMethodNotFound:
- return BRPC_ENOMETHOD;
- case RPCStatusMetaError:
- case RPCStatusReqCompressSizeInvalid:
- case RPCStatusReqDecompressSizeInvalid:
- case RPCStatusReqCompressNotSupported:
- case RPCStatusReqDecompressNotSupported:
- case RPCStatusReqCompressError:
- case RPCStatusReqDecompressError:
- case RPCStatusReqSerializeError:
- case RPCStatusReqDeserializeError:
- return BRPC_EREQUEST;
- case RPCStatusRespCompressSizeInvalid:
- case RPCStatusRespDecompressSizeInvalid:
- case RPCStatusRespCompressNotSupported:
- case RPCStatusRespDecompressNotSupported:
- case RPCStatusRespCompressError:
- case RPCStatusRespDecompressError:
- case RPCStatusRespSerializeError:
- case RPCStatusRespDeserializeError:
- return BRPC_ERESPONSE;
- case RPCStatusProcessTerminated:
- return BRPC_ELOGOFF;
- default:
- return BRPC_EINTERNAL;
- }
- }
- inline int BRPCMessage::error_code_brpc_srpc(int brpc_error_code) const
- {
- switch (brpc_error_code)
- {
- case BRPC_ENOSERVICE:
- return RPCStatusServiceNotFound;
- case BRPC_ENOMETHOD:
- return RPCStatusMethodNotFound;
- case BRPC_EREQUEST:
- return RPCStatusReqDeserializeError;
- case BRPC_ERESPONSE:
- return RPCStatusRespDeserializeError;
- case BRPC_ELOGOFF:
- return RPCStatusProcessTerminated;
- default:
- return RPCStatusSystemError;
- }
- }
- } // namespace srpc
|