http_rpc_protocol.cpp 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <google/protobuf/descriptor.h> // MethodDescriptor
  18. #include <gflags/gflags.h>
  19. #include <json2pb/pb_to_json.h> // ProtoMessageToJson
  20. #include <json2pb/json_to_pb.h> // JsonToProtoMessage
  21. #include "butil/unique_ptr.h" // std::unique_ptr
  22. #include "butil/string_splitter.h" // StringMultiSplitter
  23. #include "butil/string_printf.h"
  24. #include "butil/time.h"
  25. #include "butil/sys_byteorder.h"
  26. #include "brpc/compress.h"
  27. #include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
  28. #include "brpc/controller.h" // Controller
  29. #include "brpc/server.h" // Server
  30. #include "brpc/details/server_private_accessor.h"
  31. #include "brpc/span.h"
  32. #include "brpc/socket.h" // Socket
  33. #include "brpc/http_status_code.h" // HTTP_STATUS_*
  34. #include "brpc/details/controller_private_accessor.h"
  35. #include "brpc/builtin/index_service.h" // IndexService
  36. #include "brpc/policy/gzip_compress.h"
  37. #include "brpc/policy/http2_rpc_protocol.h"
  38. #include "brpc/details/usercode_backup_pool.h"
  39. #include "brpc/grpc.h"
  // C-linkage declaration of a bthread function; it is only declared here,
  // the definition lives outside this file (presumably in bthread — confirm).
  40. extern "C" {
  41. void bthread_assign_data(void* data);
  42. }
  43. namespace brpc {
  // Forward declarations of http_parser helpers whose definitions are in
  // another translation unit (presumably the HTTP message parser — confirm).
  44. int is_failed_after_queries(const http_parser* parser);
  45. int is_failed_after_http_version(const http_parser* parser);
  // DECLARE_* only declares the gflags; they are defined elsewhere.
  // FLAGS_http_verbose gates PrintMessage() in PackHttpRequest(), and
  // FLAGS_http_verbose_max_body_length bounds the body printed by PrintMessage().
  46. DECLARE_bool(http_verbose);
  47. DECLARE_int32(http_verbose_max_body_length);
  48. // Defined in grpc.cpp
  49. int64_t ConvertGrpcTimeoutToUS(const std::string* grpc_timeout);
  50. namespace policy {
  51. DEFINE_int32(http_max_error_length, 2048, "Max printed length of a http error");
  52. DEFINE_int32(http_body_compress_threshold, 512, "Not compress http body when "
  53. "it's less than so many bytes.");
  54. DEFINE_string(http_header_of_user_ip, "", "http requests sent by proxies may "
  55. "set the client ip in http headers. When this flag is non-empty, "
  56. "brpc will read ip:port from the specified header for "
  57. "authorization and set Controller::remote_side()");
  58. DEFINE_bool(pb_enum_as_number, false, "[Not recommended] Convert enums in "
  59. "protobuf to json as numbers, affecting both client-side and "
  60. "server-side");
  61. // Read user address from the header specified by -http_header_of_user_ip
  62. static bool GetUserAddressFromHeaderImpl(const HttpHeader& headers,
  63. butil::EndPoint* user_addr) {
  64. const std::string* user_addr_str =
  65. headers.GetHeader(FLAGS_http_header_of_user_ip);
  66. if (user_addr_str == NULL) {
  67. return false;
  68. }
  69. if (user_addr_str->find(':') == std::string::npos) {
  70. if (butil::str2ip(user_addr_str->c_str(), &user_addr->ip) != 0) {
  71. LOG(WARNING) << "Fail to parse ip from " << *user_addr_str;
  72. return false;
  73. }
  74. user_addr->port = 0;
  75. } else {
  76. if (butil::str2endpoint(user_addr_str->c_str(), user_addr) != 0) {
  77. LOG(WARNING) << "Fail to parse ip:port from " << *user_addr_str;
  78. return false;
  79. }
  80. }
  81. return true;
  82. }
  83. inline bool GetUserAddressFromHeader(const HttpHeader& headers,
  84. butil::EndPoint* user_addr) {
  85. if (FLAGS_http_header_of_user_ip.empty()) {
  86. return false;
  87. }
  88. return GetUserAddressFromHeaderImpl(headers, user_addr);
  89. }
  90. CommonStrings::CommonStrings()
  91. : ACCEPT("accept")
  92. , DEFAULT_ACCEPT("*/*")
  93. , USER_AGENT("user-agent")
  94. , DEFAULT_USER_AGENT("brpc/1.0 curl/7.0")
  95. , CONTENT_TYPE("content-type")
  96. , CONTENT_TYPE_TEXT("text/plain")
  97. , CONTENT_TYPE_JSON("application/json")
  98. , CONTENT_TYPE_PROTO("application/proto")
  99. , ERROR_CODE("x-bd-error-code")
  100. , AUTHORIZATION("authorization")
  101. , ACCEPT_ENCODING("accept-encoding")
  102. , CONTENT_ENCODING("content-encoding")
  103. , GZIP("gzip")
  104. , CONNECTION("connection")
  105. , KEEP_ALIVE("keep-alive")
  106. , CLOSE("close")
  107. , LOG_ID("log-id")
  108. , DEFAULT_METHOD("default_method")
  109. , NO_METHOD("no_method")
  110. , H2_SCHEME(":scheme")
  111. , H2_SCHEME_HTTP("http")
  112. , H2_SCHEME_HTTPS("https")
  113. , H2_AUTHORITY(":authority")
  114. , H2_PATH(":path")
  115. , H2_STATUS(":status")
  116. , STATUS_200("200")
  117. , H2_METHOD(":method")
  118. , METHOD_GET("GET")
  119. , METHOD_POST("POST")
  120. , TE("te")
  121. , TRAILERS("trailers")
  122. , GRPC_ENCODING("grpc-encoding")
  123. , GRPC_ACCEPT_ENCODING("grpc-accept-encoding")
  124. , GRPC_ACCEPT_ENCODING_VALUE("identity,gzip")
  125. , GRPC_STATUS("grpc-status")
  126. , GRPC_MESSAGE("grpc-message")
  127. , GRPC_TIMEOUT("grpc-timeout")
  128. {}
  // Process-wide CommonStrings instance. It is allocated once by
  // CreateCommonStrings(); nothing in this code deletes it.
  129. static CommonStrings* common = NULL;
  130. static pthread_once_t g_common_strings_once = PTHREAD_ONCE_INIT;
  131. static void CreateCommonStrings() {
  132. common = new CommonStrings;
  133. }
  134. // Called in global.cpp
  // Creates `common' exactly once via pthread_once, so repeated or
  // concurrent calls are harmless. Returns pthread_once()'s result.
  135. int InitCommonStrings() {
  136. return pthread_once(&g_common_strings_once, CreateCommonStrings);
  137. }
  // Triggers creation during this file's static initialization so that
  // `common' is normally ready before it is used.
  138. static const int ALLOW_UNUSED force_creation_of_common = InitCommonStrings();
  // Returns the shared instance (NULL if InitCommonStrings() has not run).
  139. const CommonStrings* get_common_strings() { return common; }
  140. HttpContentType ParseContentType(butil::StringPiece ct, bool* is_grpc_ct) {
  141. // According to http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.7
  142. // media-type = type "/" subtype *( ";" parameter )
  143. // type = token
  144. // subtype = token
  145. const butil::StringPiece prefix = "application/";
  146. if (!ct.starts_with(prefix)) {
  147. return HTTP_CONTENT_OTHERS;
  148. }
  149. ct.remove_prefix(prefix.size());
  150. if (ct.starts_with("grpc")) {
  151. if (ct.size() == (size_t)4 || ct[4] == ';') {
  152. if (is_grpc_ct) {
  153. *is_grpc_ct = true;
  154. }
  155. // assume that the default content type for grpc is proto.
  156. return HTTP_CONTENT_PROTO;
  157. } else if (ct[4] == '+') {
  158. ct.remove_prefix(5);
  159. if (is_grpc_ct) {
  160. *is_grpc_ct = true;
  161. }
  162. }
  163. // else don't change ct. Note that "grpcfoo" is a valid but non-grpc
  164. // content-type in the sense of format.
  165. }
  166. HttpContentType type = HTTP_CONTENT_OTHERS;
  167. if (ct.starts_with("json")) {
  168. type = HTTP_CONTENT_JSON;
  169. ct.remove_prefix(4);
  170. } else if (ct.starts_with("proto")) {
  171. type = HTTP_CONTENT_PROTO;
  172. ct.remove_prefix(5);
  173. } else {
  174. return HTTP_CONTENT_OTHERS;
  175. }
  176. return (ct.empty() || ct.front() == ';') ? type : HTTP_CONTENT_OTHERS;
  177. }
  178. static void PrintMessage(const butil::IOBuf& inbuf,
  179. bool request_or_response,
  180. bool has_content) {
  181. butil::IOBuf buf1 = inbuf;
  182. butil::IOBuf buf2;
  183. char str[48];
  184. if (request_or_response) {
  185. snprintf(str, sizeof(str), "[ HTTP REQUEST @%s ]", butil::my_ip_cstr());
  186. } else {
  187. snprintf(str, sizeof(str), "[ HTTP RESPONSE @%s ]", butil::my_ip_cstr());
  188. }
  189. buf2.append(str);
  190. size_t last_size;
  191. do {
  192. buf2.append("\r\n> ");
  193. last_size = buf2.size();
  194. } while (buf1.cut_until(&buf2, "\r\n") == 0);
  195. if (buf2.size() == last_size) {
  196. buf2.pop_back(2); // remove "> "
  197. }
  198. if (!has_content) {
  199. LOG(INFO) << '\n' << buf2 << buf1;
  200. } else {
  201. LOG(INFO) << '\n' << buf2 << butil::ToPrintableString(buf1, FLAGS_http_verbose_max_body_length);
  202. }
  203. }
  204. static void AddGrpcPrefix(butil::IOBuf* body, bool compressed) {
  205. char buf[5];
  206. buf[0] = (compressed ? 1 : 0);
  207. *(uint32_t*)(buf + 1) = butil::HostToNet32(body->size());
  208. butil::IOBuf tmp_buf;
  209. tmp_buf.append(buf, sizeof(buf));
  210. tmp_buf.append(butil::IOBuf::Movable(*body));
  211. body->swap(tmp_buf);
  212. }
  213. static bool RemoveGrpcPrefix(butil::IOBuf* body, bool* compressed) {
  214. if (body->empty()) {
  215. *compressed = false;
  216. return true;
  217. }
  218. const size_t sz = body->size();
  219. if (sz < (size_t)5) {
  220. return false;
  221. }
  222. char buf[5];
  223. body->cutn(buf, sizeof(buf));
  224. *compressed = buf[0];
  225. const size_t message_length = butil::NetToHost32(*(uint32_t*)(buf + 1));
  226. return (message_length + 5 == sz);
  227. }
  // Client-side handler for a parsed HTTP/1.x or HTTP/2 response. `msg' is an
  // HttpContext, viewed as an H2StreamContext when the header says HTTP/2.
  // It finds the correlation id and locks it to obtain the waiting
  // Controller. It then moves the response header into cntl->http_response()
  // and either fills cntl->response() / response_attachment() or marks the
  // RPC failed. Finally it destroys `msg' and passes the outcome to
  // ControllerPrivateAccessor::OnResponse().
  228. void ProcessHttpResponse(InputMessageBase* msg) {
  229. const int64_t start_parse_us = butil::cpuwide_time_us();
  230. DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
  231. Socket* socket = imsg_guard->socket();
  // HTTP/2 keeps the correlation id per stream. HTTP/1.x keeps it on the
  // socket, where PackHttpRequest() stores it.
  232. uint64_t cid_value;
  233. const bool is_http2 = imsg_guard->header().is_http2();
  234. if (is_http2) {
  235. H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
  236. cid_value = h2_sctx->correlation_id();
  237. } else {
  238. cid_value = socket->correlation_id();
  239. }
  // A zero id means no correlation id was recorded; drop the response.
  240. if (cid_value == 0) {
  241. LOG(WARNING) << "Fail to find correlation_id from " << *socket;
  242. return;
  243. }
  244. const bthread_id_t cid = { cid_value };
  245. Controller* cntl = NULL;
  // Lock the id; on success `cntl' is the Controller associated with it.
  246. const int rc = bthread_id_lock(cid, (void**)&cntl);
  // EINVAL/EPERM are not logged; presumably the call already ended
  // (e.g. timed out or was canceled) — confirm against bthread_id semantics.
  247. if (rc != 0) {
  248. LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
  249. << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
  250. return;
  251. }
  252. ControllerPrivateAccessor accessor(cntl);
  253. Span* span = accessor.span();
  254. if (span) {
  255. span->set_base_real_us(msg->base_real_us());
  256. span->set_received_us(msg->received_us());
  257. // TODO: changing when imsg_guard->read_body_progressively() is true
  258. span->set_response_size(imsg_guard->parsed_length());
  259. span->set_start_parse_us(start_parse_us);
  260. }
  // Take over the parsed header; `res_body' still refers to the body in msg.
  261. HttpHeader* res_header = &cntl->http_response();
  262. res_header->Swap(imsg_guard->header());
  263. butil::IOBuf& res_body = imsg_guard->body();
  264. CHECK(cntl->response_attachment().empty());
  // Error code before this response is handled; passed to OnResponse() below.
  265. const int saved_error = cntl->ErrorCode();
  266. bool is_grpc_ct = false;
  267. const HttpContentType content_type =
  268. ParseContentType(res_header->content_type(), &is_grpc_ct);
  // gRPC framing only applies to HTTP/2 responses with a grpc content-type.
  269. const bool is_grpc = (is_http2 && is_grpc_ct);
  270. bool grpc_compressed = false; // only valid when is_grpc is true.
  // do/while(0): every failure `break's out to the common cleanup at the end.
  271. do {
  272. if (!is_http2) {
  273. // If header has "Connection: close", close the connection.
  274. const std::string* conn_cmd = res_header->GetHeader(common->CONNECTION);
  275. if (conn_cmd != NULL && 0 == strcasecmp(conn_cmd->c_str(), "close")) {
  276. // Server asked to close the connection.
  277. if (imsg_guard->read_body_progressively()) {
  278. // Close the socket when reading completes.
  279. socket->read_will_be_progressive(CONNECTION_TYPE_SHORT);
  280. } else {
  281. socket->SetFailed();
  282. }
  283. }
  // gRPC: strip the 5-byte message prefix, then turn a non-OK grpc-status
  // into a failed RPC. The error text is the percent-decoded grpc-message
  // if present, otherwise the status name.
  284. } else if (is_grpc) {
  285. if (!RemoveGrpcPrefix(&res_body, &grpc_compressed)) {
  286. cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
  287. break;
  288. }
  289. const std::string* grpc_status = res_header->GetHeader(common->GRPC_STATUS);
  290. if (grpc_status) {
  291. // TODO: More strict parsing
  292. GrpcStatus status = (GrpcStatus)strtol(grpc_status->data(), NULL, 10);
  293. if (status != GRPC_OK) {
  294. const std::string* grpc_message =
  295. res_header->GetHeader(common->GRPC_MESSAGE);
  296. if (grpc_message) {
  297. std::string message_decoded;
  298. PercentDecode(*grpc_message, &message_decoded);
  299. cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
  300. message_decoded.c_str());
  301. } else {
  302. cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
  303. GrpcStatusToString(status));
  304. }
  305. break;
  306. }
  307. }
  308. }
  // Progressive body: it is exposed as a readable progressive attachment
  // instead of being parsed here. A non-2xx status fails the RPC with a
  // capped copy of the body. A pb response with fields cannot be filled
  // from such a body, so it also fails.
  309. if (imsg_guard->read_body_progressively()) {
  310. // Set RPA if needed
  311. accessor.set_readable_progressive_attachment(imsg_guard.get());
  312. const int sc = res_header->status_code();
  313. if (sc < 200 || sc >= 300) {
  314. // Even if the body is for streaming purpose, a non-OK status
  315. // code indicates that the body is probably the error text
  316. // which is helpful for debugging.
  317. // content may be binary data, so the size limit is a must.
  318. std::string body_str;
  319. res_body.copy_to(
  320. &body_str, std::min((int)res_body.size(),
  321. FLAGS_http_max_error_length));
  322. cntl->SetFailed(EHTTP, "HTTP/%d.%d %d %s: %.*s",
  323. res_header->major_version(),
  324. res_header->minor_version(),
  325. static_cast<int>(res_header->status_code()),
  326. res_header->reason_phrase(),
  327. (int)body_str.size(), body_str.c_str());
  328. } else if (cntl->response() != NULL &&
  329. cntl->response()->GetDescriptor()->field_count() != 0) {
  330. cntl->SetFailed(ERESPONSE, "A protobuf response can't be parsed"
  331. " from progressively-read HTTP body");
  332. }
  333. break;
  334. }
  335. // Fail RPC if status code is an error in http sense.
  336. // ErrorCode of RPC is unified to EHTTP.
  337. const int sc = res_header->status_code();
  338. if (sc < 200 || sc >= 300) {
  339. std::string err = butil::string_printf(
  340. "HTTP/%d.%d %d %s",
  341. res_header->major_version(),
  342. res_header->minor_version(),
  343. static_cast<int>(res_header->status_code()),
  344. res_header->reason_phrase());
  345. if (!res_body.empty()) {
  346. // Use content as error text if it's present. Notice that
  347. // content may be binary data, so the size limit is a must.
  348. err.append(": ");
  349. res_body.append_to(
  350. &err, std::min((int)res_body.size(),
  351. FLAGS_http_max_error_length));
  352. }
  353. cntl->SetFailed(EHTTP, "%s", err.c_str());
  354. if (cntl->response() == NULL ||
  355. cntl->response()->GetDescriptor()->field_count() == 0) {
  356. // A http call. Http users may need the body(containing a html,
  357. // json etc) even if the http call was failed. This is different
  358. // from protobuf services where responses are undefined when RPC
  359. // was failed.
  360. cntl->response_attachment().swap(res_body);
  361. }
  362. break;
  363. }
  364. if (cntl->response() == NULL ||
  365. cntl->response()->GetDescriptor()->field_count() == 0) {
  366. // a http call, content is the "real response".
  367. cntl->response_attachment().swap(res_body);
  368. break;
  369. }
  // Pick the encoding header. gRPC consults grpc-encoding only when the
  // message's compressed flag is set; plain HTTP uses content-encoding.
  // Only gzip is decoded; any other encoding is left as-is.
  370. const std::string* encoding = NULL;
  371. if (is_grpc) {
  372. if (grpc_compressed) {
  373. encoding = res_header->GetHeader(common->GRPC_ENCODING);
  374. if (encoding == NULL) {
  375. cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding'"
  376. " in compressed gRPC response");
  377. break;
  378. }
  379. }
  380. } else {
  381. encoding = res_header->GetHeader(common->CONTENT_ENCODING);
  382. }
  383. if (encoding != NULL && *encoding == common->GZIP) {
  384. TRACEPRINTF("Decompressing response=%lu",
  385. (unsigned long)res_body.size());
  386. butil::IOBuf uncompressed;
  387. if (!policy::GzipDecompress(res_body, &uncompressed)) {
  388. cntl->SetFailed(ERESPONSE, "Fail to un-gzip response body");
  389. break;
  390. }
  391. res_body.swap(uncompressed);
  392. }
  // Parse the body into cntl->response() according to the response
  // content-type; any other content-type is an error for a pb response.
  393. if (content_type == HTTP_CONTENT_PROTO) {
  394. if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
  395. cntl->SetFailed(ERESPONSE, "Fail to parse content");
  396. break;
  397. }
  398. } else if (content_type == HTTP_CONTENT_JSON) {
  399. // message body is json
  400. butil::IOBufAsZeroCopyInputStream wrapper(res_body);
  401. std::string err;
  402. json2pb::Json2PbOptions options;
  403. options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
  404. if (!json2pb::JsonToProtoMessage(&wrapper, cntl->response(), options, &err)) {
  405. cntl->SetFailed(ERESPONSE, "Fail to parse content, %s", err.c_str());
  406. break;
  407. }
  408. } else {
  409. cntl->SetFailed(ERESPONSE,
  410. "Unknown content-type=%s when response is not NULL",
  411. res_header->content_type().c_str());
  412. break;
  413. }
  414. } while (0);
  // Destroy the input message before finishing the call.
  415. // Unlocks correlation_id inside. Revert controller's
  416. // error code if the version check of `cid' fails
  417. imsg_guard.reset();
  418. accessor.OnResponse(cid, saved_error);
  419. }
  420. void SerializeHttpRequest(butil::IOBuf* /*not used*/,
  421. Controller* cntl,
  422. const google::protobuf::Message* pbreq) {
  423. HttpHeader& hreq = cntl->http_request();
  424. const bool is_http2 = (cntl->request_protocol() == PROTOCOL_H2);
  425. bool is_grpc = false;
  426. ControllerPrivateAccessor accessor(cntl);
  427. if (!accessor.protocol_param().empty() && hreq.content_type().empty()) {
  428. const std::string& param = accessor.protocol_param();
  429. if (param.find('/') == std::string::npos) {
  430. std::string& s = hreq.mutable_content_type();
  431. s.reserve(12 + param.size());
  432. s.append("application/");
  433. s.append(param);
  434. } else {
  435. hreq.set_content_type(param);
  436. }
  437. }
  438. if (pbreq != NULL) {
  439. // If request is not NULL, message body will be serialized proto/json,
  440. if (!pbreq->IsInitialized()) {
  441. return cntl->SetFailed(
  442. EREQUEST, "Missing required fields in request: %s",
  443. pbreq->InitializationErrorString().c_str());
  444. }
  445. if (!cntl->request_attachment().empty()) {
  446. return cntl->SetFailed(EREQUEST, "request_attachment must be empty "
  447. "when request is not NULL");
  448. }
  449. HttpContentType content_type = HTTP_CONTENT_OTHERS;
  450. if (hreq.content_type().empty()) {
  451. // Set content-type if user did not.
  452. // Note that http1.x defaults to json and h2 defaults to pb.
  453. if (is_http2) {
  454. content_type = HTTP_CONTENT_PROTO;
  455. hreq.set_content_type(common->CONTENT_TYPE_PROTO);
  456. } else {
  457. content_type = HTTP_CONTENT_JSON;
  458. hreq.set_content_type(common->CONTENT_TYPE_JSON);
  459. }
  460. } else {
  461. bool is_grpc_ct = false;
  462. content_type = ParseContentType(hreq.content_type(),
  463. &is_grpc_ct);
  464. is_grpc = (is_http2 && is_grpc_ct);
  465. }
  466. butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->request_attachment());
  467. if (content_type == HTTP_CONTENT_PROTO) {
  468. // Serialize content as protobuf
  469. if (!pbreq->SerializeToZeroCopyStream(&wrapper)) {
  470. cntl->request_attachment().clear();
  471. return cntl->SetFailed(EREQUEST, "Fail to serialize %s",
  472. pbreq->GetTypeName().c_str());
  473. }
  474. } else if (content_type == HTTP_CONTENT_JSON) {
  475. std::string err;
  476. json2pb::Pb2JsonOptions opt;
  477. opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
  478. opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
  479. opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
  480. opt.enum_option = (FLAGS_pb_enum_as_number
  481. ? json2pb::OUTPUT_ENUM_BY_NUMBER
  482. : json2pb::OUTPUT_ENUM_BY_NAME);
  483. if (!json2pb::ProtoMessageToJson(*pbreq, &wrapper, opt, &err)) {
  484. cntl->request_attachment().clear();
  485. return cntl->SetFailed(
  486. EREQUEST, "Fail to convert request to json, %s", err.c_str());
  487. }
  488. } else {
  489. return cntl->SetFailed(
  490. EREQUEST, "Cannot serialize pb request according to content_type=%s",
  491. hreq.content_type().c_str());
  492. }
  493. } else {
  494. // Use request_attachment.
  495. // TODO: Checking required fields of http header.
  496. }
  497. // Make RPC fail if uri() is not OK (previous SetHttpURL/operator= failed)
  498. if (!hreq.uri().status().ok()) {
  499. return cntl->SetFailed(EREQUEST, "%s",
  500. hreq.uri().status().error_cstr());
  501. }
  502. bool grpc_compressed = false;
  503. if (cntl->request_compress_type() != COMPRESS_TYPE_NONE) {
  504. if (cntl->request_compress_type() != COMPRESS_TYPE_GZIP) {
  505. return cntl->SetFailed(EREQUEST, "http does not support %s",
  506. CompressTypeToCStr(cntl->request_compress_type()));
  507. }
  508. const size_t request_size = cntl->request_attachment().size();
  509. if (request_size >= (size_t)FLAGS_http_body_compress_threshold) {
  510. TRACEPRINTF("Compressing request=%lu", (unsigned long)request_size);
  511. butil::IOBuf compressed;
  512. if (GzipCompress(cntl->request_attachment(), &compressed, NULL)) {
  513. cntl->request_attachment().swap(compressed);
  514. if (is_grpc) {
  515. grpc_compressed = true;
  516. hreq.SetHeader(common->GRPC_ENCODING, common->GZIP);
  517. } else {
  518. hreq.SetHeader(common->CONTENT_ENCODING, common->GZIP);
  519. }
  520. } else {
  521. cntl->SetFailed("Fail to gzip the request body, skip compressing");
  522. }
  523. }
  524. }
  525. // Fill log-id if user set it.
  526. if (cntl->has_log_id()) {
  527. hreq.SetHeader(common->LOG_ID,
  528. butil::string_printf(
  529. "%llu", (unsigned long long)cntl->log_id()));
  530. }
  531. if (!is_http2) {
  532. // HTTP before 1.1 needs to set keep-alive explicitly.
  533. if (hreq.before_http_1_1() &&
  534. cntl->connection_type() != CONNECTION_TYPE_SHORT &&
  535. hreq.GetHeader(common->CONNECTION) == NULL) {
  536. hreq.SetHeader(common->CONNECTION, common->KEEP_ALIVE);
  537. }
  538. } else {
  539. cntl->set_stream_creator(get_h2_global_stream_creator());
  540. if (is_grpc) {
  541. /*
  542. hreq.SetHeader(common->GRPC_ACCEPT_ENCODING,
  543. common->GRPC_ACCEPT_ENCODING_VALUE);
  544. */
  545. // TODO: do we need this?
  546. hreq.SetHeader(common->TE, common->TRAILERS);
  547. if (cntl->timeout_ms() >= 0) {
  548. hreq.SetHeader(common->GRPC_TIMEOUT,
  549. butil::string_printf("%" PRId64, cntl->timeout_ms()));
  550. }
  551. // Append compressed and length before body
  552. AddGrpcPrefix(&cntl->request_attachment(), grpc_compressed);
  553. }
  554. }
  555. // Set url to /ServiceName/MethodName when we're about to call protobuf
  556. // services (indicated by non-NULL method).
  557. const google::protobuf::MethodDescriptor* method = cntl->method();
  558. if (method != NULL) {
  559. hreq.set_method(HTTP_METHOD_POST);
  560. std::string path;
  561. path.reserve(2 + method->service()->full_name().size()
  562. + method->name().size());
  563. path.push_back('/');
  564. path.append(method->service()->full_name());
  565. path.push_back('/');
  566. path.append(method->name());
  567. hreq.uri().set_path(path);
  568. }
  569. Span* span = accessor.span();
  570. if (span) {
  571. hreq.SetHeader("x-bd-trace-id", butil::string_printf(
  572. "%llu", (unsigned long long)span->trace_id()));
  573. hreq.SetHeader("x-bd-span-id", butil::string_printf(
  574. "%llu", (unsigned long long)span->span_id()));
  575. hreq.SetHeader("x-bd-parent-span-id", butil::string_printf(
  576. "%llu", (unsigned long long)span->parent_span_id()));
  577. }
  578. }
  579. void PackHttpRequest(butil::IOBuf* buf,
  580. SocketMessage**,
  581. uint64_t correlation_id,
  582. const google::protobuf::MethodDescriptor*,
  583. Controller* cntl,
  584. const butil::IOBuf& /*unused*/,
  585. const Authenticator* auth) {
  586. if (cntl->connection_type() == CONNECTION_TYPE_SINGLE) {
  587. return cntl->SetFailed(EREQUEST, "http can't work with CONNECTION_TYPE_SINGLE");
  588. }
  589. ControllerPrivateAccessor accessor(cntl);
  590. HttpHeader* header = &cntl->http_request();
  591. if (auth != NULL && header->GetHeader(common->AUTHORIZATION) == NULL) {
  592. std::string auth_data;
  593. if (auth->GenerateCredential(&auth_data) != 0) {
  594. return cntl->SetFailed(EREQUEST, "Fail to GenerateCredential");
  595. }
  596. header->SetHeader(common->AUTHORIZATION, auth_data);
  597. }
  598. // Store `correlation_id' into Socket since http server
  599. // may not echo back this field. But we send it anyway.
  600. accessor.get_sending_socket()->set_correlation_id(correlation_id);
  601. MakeRawHttpRequest(buf, header, cntl->remote_side(),
  602. &cntl->request_attachment());
  603. if (FLAGS_http_verbose) {
  604. PrintMessage(*buf, true, true);
  605. }
  606. }
  607. inline bool SupportGzip(Controller* cntl) {
  608. const std::string* encodings =
  609. cntl->http_request().GetHeader(common->ACCEPT_ENCODING);
  610. return (encodings && encodings->find(common->GZIP) != std::string::npos);
  611. }
  612. class HttpResponseSender {
  613. friend class HttpResponseSenderAsDone;
  614. public:
  615. HttpResponseSender()
  616. : _method_status(NULL), _received_us(0), _h2_stream_id(-1) {}
  617. HttpResponseSender(Controller* cntl/*own*/)
  618. : _cntl(cntl), _method_status(NULL), _received_us(0), _h2_stream_id(-1) {}
  619. HttpResponseSender(HttpResponseSender&& s)
  620. : _cntl(std::move(s._cntl))
  621. , _req(std::move(s._req))
  622. , _res(std::move(s._res))
  623. , _method_status(std::move(s._method_status))
  624. , _received_us(s._received_us)
  625. , _h2_stream_id(s._h2_stream_id) {
  626. }
  627. ~HttpResponseSender();
  628. void own_request(google::protobuf::Message* req) { _req.reset(req); }
  629. void own_response(google::protobuf::Message* res) { _res.reset(res); }
  630. void set_method_status(MethodStatus* ms) { _method_status = ms; }
  631. void set_received_us(int64_t t) { _received_us = t; }
  632. void set_h2_stream_id(int id) { _h2_stream_id = id; }
  633. private:
  634. std::unique_ptr<Controller, LogErrorTextAndDelete> _cntl;
  635. std::unique_ptr<google::protobuf::Message> _req;
  636. std::unique_ptr<google::protobuf::Message> _res;
  637. MethodStatus* _method_status;
  638. int64_t _received_us;
  639. int _h2_stream_id;
  640. };
  641. class HttpResponseSenderAsDone : public google::protobuf::Closure {
  642. public:
  643. HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
  644. void Run() override { delete this; }
  645. private:
  646. HttpResponseSender _sender;
  647. };
  648. HttpResponseSender::~HttpResponseSender() {
  649. Controller* cntl = _cntl.get();
  650. if (cntl == NULL) {
  651. return;
  652. }
  653. ControllerPrivateAccessor accessor(cntl);
  654. Span* span = accessor.span();
  655. if (span) {
  656. span->set_start_send_us(butil::cpuwide_time_us());
  657. }
  658. ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
  659. Socket* socket = accessor.get_sending_socket();
  660. const google::protobuf::Message* res = _res.get();
  661. if (cntl->IsCloseConnection()) {
  662. socket->SetFailed();
  663. return;
  664. }
  665. const HttpHeader* req_header = &cntl->http_request();
  666. HttpHeader* res_header = &cntl->http_response();
  667. res_header->set_version(req_header->major_version(),
  668. req_header->minor_version());
  669. const std::string* content_type_str = &res_header->content_type();
  670. if (content_type_str->empty()) {
  671. // Use request's content_type if response's is not set.
  672. content_type_str = &req_header->content_type();
  673. res_header->set_content_type(*content_type_str);
  674. }
  675. // Notice that HTTP1 can have a header named `grpc-encoding' as well
  676. // which should be treated as an user-defined header and ignored by
  677. // the framework.
  678. bool is_grpc_ct = false;
  679. const HttpContentType content_type = ParseContentType(*content_type_str, &is_grpc_ct);
  680. const bool is_http2 = req_header->is_http2();
  681. const bool is_grpc = (is_http2 && is_grpc_ct);
  682. // Convert response to json/proto if needed.
  683. // Notice: Not check res->IsInitialized() which should be checked in the
  684. // conversion function.
  685. if (res != NULL &&
  686. cntl->response_attachment().empty() &&
  687. // ^ user did not fill the body yet.
  688. res->GetDescriptor()->field_count() > 0 &&
  689. // ^ a pb service
  690. !cntl->Failed()) {
  691. // ^ pb response in failed RPC is undefined, no need to convert.
  692. butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->response_attachment());
  693. if (content_type == HTTP_CONTENT_PROTO) {
  694. if (!res->SerializeToZeroCopyStream(&wrapper)) {
  695. cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
  696. }
  697. } else {
  698. std::string err;
  699. json2pb::Pb2JsonOptions opt;
  700. opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
  701. opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
  702. opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
  703. opt.enum_option = (FLAGS_pb_enum_as_number
  704. ? json2pb::OUTPUT_ENUM_BY_NUMBER
  705. : json2pb::OUTPUT_ENUM_BY_NAME);
  706. if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
  707. cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
  708. }
  709. }
  710. }
  711. // In HTTP 0.9, the server always closes the connection after sending the
  712. // response. The client must close its end of the connection after
  713. // receiving the response.
  714. // In HTTP 1.0, the server always closes the connection after sending the
  715. // response UNLESS the client sent a Connection: keep-alive request header
  716. // and the server sent a Connection: keep-alive response header. If no
  717. // such response header exists, the client must close its end of the
  718. // connection after receiving the response.
  719. // In HTTP 1.1, the server does not close the connection after sending
  720. // the response UNLESS the client sent a Connection: close request header,
  721. // or the server sent a Connection: close response header. If such a
  722. // response header exists, the client must close its end of the connection
  723. // after receiving the response.
  724. if (!is_http2) {
  725. const std::string* res_conn = res_header->GetHeader(common->CONNECTION);
  726. if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) {
  727. const std::string* req_conn =
  728. req_header->GetHeader(common->CONNECTION);
  729. if (req_header->before_http_1_1()) {
  730. if (req_conn != NULL &&
  731. strcasecmp(req_conn->c_str(), "keep-alive") == 0) {
  732. res_header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
  733. }
  734. } else {
  735. if (req_conn != NULL &&
  736. strcasecmp(req_conn->c_str(), "close") == 0) {
  737. res_header->SetHeader(common->CONNECTION, common->CLOSE);
  738. }
  739. }
  740. } // else user explicitly set Connection:close, clients of
  741. // HTTP 1.1/1.0/0.9 should all close the connection.
  742. } else if (is_grpc) {
  743. // status code is always 200 according to grpc protocol
  744. res_header->set_status_code(HTTP_STATUS_OK);
  745. }
  746. bool grpc_compressed = false;
  747. if (cntl->Failed()) {
  748. cntl->response_attachment().clear();
  749. if (!is_grpc) {
  750. // Set status-code with default value(converted from error code)
  751. // if user did not set it.
  752. if (res_header->status_code() == HTTP_STATUS_OK) {
  753. res_header->set_status_code(ErrorCodeToStatusCode(cntl->ErrorCode()));
  754. }
  755. // Fill ErrorCode into header
  756. res_header->SetHeader(common->ERROR_CODE,
  757. butil::string_printf("%d", cntl->ErrorCode()));
  758. // Fill body with ErrorText.
  759. // user may compress the output and change content-encoding. However
  760. // body is error-text right now, remove the header.
  761. res_header->RemoveHeader(common->CONTENT_ENCODING);
  762. res_header->set_content_type(common->CONTENT_TYPE_TEXT);
  763. cntl->response_attachment().append(cntl->ErrorText());
  764. }
  765. } else if (cntl->has_progressive_writer()) {
  766. // Transfer-Encoding is supported since HTTP/1.1
  767. if (res_header->major_version() < 2 && !res_header->before_http_1_1()) {
  768. res_header->SetHeader("Transfer-Encoding", "chunked");
  769. }
  770. if (!cntl->response_attachment().empty()) {
  771. LOG(ERROR) << "response_attachment(size="
  772. << cntl->response_attachment().size() << ") will be"
  773. " ignored when CreateProgressiveAttachment() was called";
  774. }
  775. // not set_content to enable chunked mode.
  776. } else if (cntl->response_compress_type() == COMPRESS_TYPE_GZIP) {
  777. const size_t response_size = cntl->response_attachment().size();
  778. if (response_size >= (size_t)FLAGS_http_body_compress_threshold
  779. && (is_http2 || SupportGzip(cntl))) {
  780. TRACEPRINTF("Compressing response=%lu", (unsigned long)response_size);
  781. butil::IOBuf tmpbuf;
  782. if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) {
  783. cntl->response_attachment().swap(tmpbuf);
  784. if (is_grpc) {
  785. grpc_compressed = true;
  786. res_header->SetHeader(common->GRPC_ENCODING, common->GZIP);
  787. } else {
  788. res_header->SetHeader(common->CONTENT_ENCODING, common->GZIP);
  789. }
  790. } else {
  791. LOG(ERROR) << "Fail to gzip the http response, skip compression.";
  792. }
  793. }
  794. } else {
  795. // TODO(gejun): Support snappy (grpc)
  796. LOG_IF(ERROR, cntl->response_compress_type() != COMPRESS_TYPE_NONE)
  797. << "Unknown compress_type=" << cntl->response_compress_type()
  798. << ", skip compression.";
  799. }
  800. int rc = -1;
  801. // Have the risk of unlimited pending responses, in which case, tell
  802. // users to set max_concurrency.
  803. Socket::WriteOptions wopt;
  804. wopt.ignore_eovercrowded = true;
  805. if (is_http2) {
  806. if (is_grpc) {
  807. // Append compressed and length before body
  808. AddGrpcPrefix(&cntl->response_attachment(), grpc_compressed);
  809. }
  810. SocketMessagePtr<H2UnsentResponse> h2_response(
  811. H2UnsentResponse::New(cntl, _h2_stream_id, is_grpc));
  812. if (h2_response == NULL) {
  813. LOG(ERROR) << "Fail to make http2 response";
  814. errno = EINVAL;
  815. rc = -1;
  816. } else {
  817. if (FLAGS_http_verbose) {
  818. LOG(INFO) << '\n' << *h2_response;
  819. }
  820. if (span) {
  821. span->set_response_size(h2_response->EstimatedByteSize());
  822. }
  823. rc = socket->Write(h2_response, &wopt);
  824. }
  825. } else {
  826. butil::IOBuf* content = NULL;
  827. if (cntl->Failed() || !cntl->has_progressive_writer()) {
  828. content = &cntl->response_attachment();
  829. }
  830. butil::IOBuf res_buf;
  831. MakeRawHttpResponse(&res_buf, res_header, content);
  832. if (FLAGS_http_verbose) {
  833. PrintMessage(res_buf, false, !!content);
  834. }
  835. if (span) {
  836. span->set_response_size(res_buf.size());
  837. }
  838. rc = socket->Write(&res_buf, &wopt);
  839. }
  840. if (rc != 0) {
  841. // EPIPE is common in pooled connections + backup requests.
  842. const int errcode = errno;
  843. PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
  844. cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
  845. return;
  846. }
  847. if (span) {
  848. // TODO: this is not sent
  849. span->set_sent_us(butil::cpuwide_time_us());
  850. }
  851. }
  852. // Normalize the sub string of `uri_path' covered by `splitter' and
  853. // put it into `unresolved_path'
  854. static void FillUnresolvedPath(std::string* unresolved_path,
  855. const std::string& uri_path,
  856. butil::StringSplitter& splitter) {
  857. if (unresolved_path == NULL) {
  858. return;
  859. }
  860. if (!splitter) {
  861. unresolved_path->clear();
  862. return;
  863. }
  864. // Normalize unresolve_path.
  865. const size_t path_len =
  866. uri_path.c_str() + uri_path.size() - splitter.field();
  867. unresolved_path->reserve(path_len);
  868. unresolved_path->clear();
  869. for (butil::StringSplitter slash_sp(
  870. splitter.field(), splitter.field() + path_len, '/');
  871. slash_sp != NULL; ++slash_sp) {
  872. if (!unresolved_path->empty()) {
  873. unresolved_path->push_back('/');
  874. }
  875. unresolved_path->append(slash_sp.field(), slash_sp.length());
  876. }
  877. }
  878. inline const Server::MethodProperty*
  879. FindMethodPropertyByURIImpl(const std::string& uri_path, const Server* server,
  880. std::string* unresolved_path) {
  881. ServerPrivateAccessor wrapper(server);
  882. butil::StringSplitter splitter(uri_path.c_str(), '/');
  883. // Show index page for empty URI
  884. if (NULL == splitter) {
  885. return wrapper.FindMethodPropertyByFullName(
  886. IndexService::descriptor()->full_name(), common->DEFAULT_METHOD);
  887. }
  888. butil::StringPiece service_name(splitter.field(), splitter.length());
  889. const bool full_service_name =
  890. (service_name.find('.') != butil::StringPiece::npos);
  891. const Server::ServiceProperty* const sp =
  892. (full_service_name ?
  893. wrapper.FindServicePropertyByFullName(service_name) :
  894. wrapper.FindServicePropertyByName(service_name));
  895. if (NULL == sp) {
  896. // normal for urls matching _global_restful_map
  897. return NULL;
  898. }
  899. // Find restful methods by uri.
  900. if (sp->restful_map) {
  901. ++splitter;
  902. butil::StringPiece left_path;
  903. if (splitter) {
  904. // The -1 is for including /, always safe because of ++splitter
  905. left_path.set(splitter.field() - 1, uri_path.c_str() +
  906. uri_path.size() - splitter.field() + 1);
  907. }
  908. return sp->restful_map->FindMethodProperty(left_path, unresolved_path);
  909. }
  910. if (!full_service_name) {
  911. // Change to service's fullname.
  912. service_name = sp->service->GetDescriptor()->full_name();
  913. }
  914. // Regard URI as [service_name]/[method_name]
  915. const Server::MethodProperty* mp = NULL;
  916. butil::StringPiece method_name;
  917. if (++splitter != NULL) {
  918. method_name.set(splitter.field(), splitter.length());
  919. // Copy splitter rather than modifying it directly since it's used
  920. // in later branches.
  921. mp = wrapper.FindMethodPropertyByFullName(service_name, method_name);
  922. if (mp) {
  923. ++splitter; // skip method name
  924. FillUnresolvedPath(unresolved_path, uri_path, splitter);
  925. return mp;
  926. }
  927. }
  928. // Try [service_name]/default_method
  929. mp = wrapper.FindMethodPropertyByFullName(service_name, common->DEFAULT_METHOD);
  930. if (mp) {
  931. FillUnresolvedPath(unresolved_path, uri_path, splitter);
  932. return mp;
  933. }
  934. // Call BadMethodService::no_method for service_name-only URL.
  935. if (method_name.empty()) {
  936. return wrapper.FindMethodPropertyByFullName(
  937. BadMethodService::descriptor()->full_name(), common->NO_METHOD);
  938. }
  939. // Called an existing service w/o default_method with an unknown method.
  940. return NULL;
  941. }
  942. // Used in UT, don't be static
  943. const Server::MethodProperty*
  944. FindMethodPropertyByURI(const std::string& uri_path, const Server* server,
  945. std::string* unresolved_path) {
  946. const Server::MethodProperty* mp =
  947. FindMethodPropertyByURIImpl(uri_path, server, unresolved_path);
  948. if (mp != NULL) {
  949. if (mp->http_url != NULL) {
  950. // the restful method is accessed from its
  951. // default url (SERVICE/METHOD) which should be rejected.
  952. return NULL;
  953. }
  954. return mp;
  955. }
  956. // uri_path cannot match any methods with exact service_name. Match
  957. // the fuzzy patterns in global restful map which often matches
  958. // extension names. Say "*.txt => get_text_file, *.mp4 => download_mp4".
  959. ServerPrivateAccessor accessor(server);
  960. if (accessor.global_restful_map()) {
  961. return accessor.global_restful_map()->FindMethodProperty(
  962. uri_path, unresolved_path);
  963. }
  964. return NULL;
  965. }
  966. ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
  967. bool read_eof, const void* /*arg*/) {
  968. HttpContext* http_imsg =
  969. static_cast<HttpContext*>(socket->parsing_context());
  970. if (http_imsg == NULL) {
  971. if (read_eof || source->empty()) {
  972. // 1. read_eof: Read EOF after intact HTTP messages, a common case.
  973. // Notice that errors except NOT_ENOUGH_DATA can't be returned
  974. // otherwise the Socket will be SetFailed() and messages just
  975. // in ProcessHttpXXX() may be dropped.
  976. // 2. source->empty(): also common, InputMessage tries parse
  977. // handlers until error is met. If a message was consumed,
  978. // source is likely to be empty.
  979. return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
  980. }
  981. http_imsg = new (std::nothrow) HttpContext(socket->is_read_progressive());
  982. if (http_imsg == NULL) {
  983. LOG(FATAL) << "Fail to new HttpContext";
  984. return MakeParseError(PARSE_ERROR_NO_RESOURCE);
  985. }
  986. // Parsing http is costly, parsing an incomplete http message from the
  987. // beginning repeatedly should be avoided, otherwise the cost may reach
  988. // O(n^2) in the worst case. Save incomplete http messages in sockets
  989. // to prevent re-parsing. The message will be released when it is
  990. // completed or destroyed along with the socket.
  991. socket->reset_parsing_context(http_imsg);
  992. }
  993. ssize_t rc = 0;
  994. if (read_eof) {
  995. // Send EOF to HttpContext, check comments in http_message.h
  996. rc = http_imsg->ParseFromArray(NULL, 0);
  997. } else {
  998. // Empty `source' is sliently ignored and 0 is returned, check
  999. // comments in http_message.h
  1000. rc = http_imsg->ParseFromIOBuf(*source);
  1001. }
  1002. if (http_imsg->is_stage2()) {
  1003. // The header part is already parsed as an intact HTTP message
  1004. // to the ProcessHttpXXX. Here parses the body part.
  1005. if (rc >= 0) {
  1006. source->pop_front(rc);
  1007. if (http_imsg->Completed()) {
  1008. // Already returned the message before, don't return again.
  1009. CHECK_EQ(http_imsg, socket->release_parsing_context());
  1010. // NOTE: calling http_imsg->Destroy() is wrong which can only
  1011. // be called from ProcessHttpXXX
  1012. http_imsg->RemoveOneRefForStage2();
  1013. socket->OnProgressiveReadCompleted();
  1014. return MakeMessage(NULL);
  1015. } else {
  1016. return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
  1017. }
  1018. } else {
  1019. // Fail to parse the body. Since headers were parsed successfully,
  1020. // the message is assumed to be HTTP, stop trying other protocols.
  1021. const char* err = http_errno_description(
  1022. HTTP_PARSER_ERRNO(&http_imsg->parser()));
  1023. return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG, err);
  1024. }
  1025. } else if (rc >= 0) {
  1026. // Normal or stage1 of progressive-read http message.
  1027. source->pop_front(rc);
  1028. if (http_imsg->Completed()) {
  1029. CHECK_EQ(http_imsg, socket->release_parsing_context());
  1030. const ParseResult result = MakeMessage(http_imsg);
  1031. if (socket->is_read_progressive()) {
  1032. socket->OnProgressiveReadCompleted();
  1033. }
  1034. return result;
  1035. } else if (socket->is_read_progressive() &&
  1036. http_imsg->stage() >= HTTP_ON_HEADERS_COMPLELE) {
  1037. // header part of a progressively-read http message is complete,
  1038. // go on to ProcessHttpXXX w/o waiting for full body.
  1039. http_imsg->AddOneRefForStage2(); // released when body is fully read
  1040. return MakeMessage(http_imsg);
  1041. } else {
  1042. return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
  1043. }
  1044. } else if (!socket->CreatedByConnect()) {
  1045. // Note: If the parser fails at query-string/fragment/the-following
  1046. // -"HTTP/x.y", the message is very likely to be in http format (not
  1047. // other protocols registered after http). We send 400 back to client
  1048. // which is more informational than just closing the connection (may
  1049. // cause OP's alarms if the remote side is baidu's nginx). To do this,
  1050. // We make InputMessenger do nothing by cheating it with
  1051. // PARSE_ERROR_NOT_ENOUGH_DATA and remove the addtitional ref of the
  1052. // socket so that it will be recycled when the response is written.
  1053. // We can't use SetFailed which interrupts the writing.
  1054. // Tricky: Socket::ReleaseAdditionalReference() does not remove the
  1055. // internal fd from epoll thus we can still get EPOLLIN and read
  1056. // in more data. If the second read happens, parsing_context()
  1057. // should return the same InputMessage that we see now because we
  1058. // don't reset_parsing_context(NULL) in this branch, and following
  1059. // ParseFromXXX should return -1 immediately because of the non-zero
  1060. // parser.http_errno, and ReleaseAdditionalReference() here should
  1061. // return -1 to prevent us from sending another 400.
  1062. if (is_failed_after_queries(&http_imsg->parser())) {
  1063. int rc = socket->ReleaseAdditionalReference();
  1064. if (rc < 0) {
  1065. // Already released, leave the socket to be recycled
  1066. // by itself.
  1067. return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
  1068. } else if (rc > 0) {
  1069. LOG(ERROR) << "Impossible: Recycled!";
  1070. return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
  1071. }
  1072. // Send 400 back.
  1073. butil::IOBuf bad_req;
  1074. HttpHeader header;
  1075. header.set_status_code(HTTP_STATUS_BAD_REQUEST);
  1076. MakeRawHttpRequest(&bad_req, &header, socket->remote_side(), NULL);
  1077. Socket::WriteOptions wopt;
  1078. wopt.ignore_eovercrowded = true;
  1079. socket->Write(&bad_req, &wopt);
  1080. return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
  1081. } else {
  1082. return MakeParseError(PARSE_ERROR_TRY_OTHERS);
  1083. }
  1084. } else {
  1085. if (is_failed_after_http_version(&http_imsg->parser())) {
  1086. return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG,
  1087. "invalid http response");
  1088. }
  1089. return MakeParseError(PARSE_ERROR_TRY_OTHERS);
  1090. }
  1091. }
  1092. bool VerifyHttpRequest(const InputMessageBase* msg) {
  1093. Server* server = (Server*)msg->arg();
  1094. Socket* socket = msg->socket();
  1095. HttpContext* http_request = (HttpContext*)msg;
  1096. const Authenticator* auth = server->options().auth;
  1097. if (NULL == auth) {
  1098. // Fast pass
  1099. return true;
  1100. }
  1101. const Server::MethodProperty* mp = FindMethodPropertyByURI(
  1102. http_request->header().uri().path(), server, NULL);
  1103. if (mp != NULL &&
  1104. mp->is_builtin_service &&
  1105. mp->service->GetDescriptor() != BadMethodService::descriptor()) {
  1106. // BuiltinService doesn't need authentication
  1107. // TODO: Fix backdoor that sends BuiltinService at first
  1108. // and then sends other requests without authentication
  1109. return true;
  1110. }
  1111. const std::string *authorization
  1112. = http_request->header().GetHeader("Authorization");
  1113. if (authorization == NULL) {
  1114. return false;
  1115. }
  1116. butil::EndPoint user_addr;
  1117. if (!GetUserAddressFromHeader(http_request->header(), &user_addr)) {
  1118. user_addr = socket->remote_side();
  1119. }
  1120. return auth->VerifyCredential(*authorization, user_addr,
  1121. socket->mutable_auth_context()) == 0;
  1122. }
  1123. // Defined in baidu_rpc_protocol.cpp
  1124. void EndRunningCallMethodInPool(
  1125. ::google::protobuf::Service* service,
  1126. const ::google::protobuf::MethodDescriptor* method,
  1127. ::google::protobuf::RpcController* controller,
  1128. const ::google::protobuf::Message* request,
  1129. ::google::protobuf::Message* response,
  1130. ::google::protobuf::Closure* done);
  1131. void ProcessHttpRequest(InputMessageBase *msg) {
  1132. const int64_t start_parse_us = butil::cpuwide_time_us();
  1133. DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
  1134. SocketUniquePtr socket_guard(imsg_guard->ReleaseSocket());
  1135. Socket* socket = socket_guard.get();
  1136. const Server* server = static_cast<const Server*>(msg->arg());
  1137. ScopedNonServiceError non_service_error(server);
  1138. Controller* cntl = new (std::nothrow) Controller;
  1139. if (NULL == cntl) {
  1140. LOG(FATAL) << "Fail to new Controller";
  1141. return;
  1142. }
  1143. HttpResponseSender resp_sender(cntl);
  1144. resp_sender.set_received_us(msg->received_us());
  1145. const bool is_http2 = imsg_guard->header().is_http2();
  1146. if (is_http2) {
  1147. H2StreamContext* h2_sctx = static_cast<H2StreamContext*>(msg);
  1148. resp_sender.set_h2_stream_id(h2_sctx->stream_id());
  1149. }
  1150. ControllerPrivateAccessor accessor(cntl);
  1151. HttpHeader& req_header = cntl->http_request();
  1152. imsg_guard->header().Swap(req_header);
  1153. butil::IOBuf& req_body = imsg_guard->body();
  1154. butil::EndPoint user_addr;
  1155. if (!GetUserAddressFromHeader(req_header, &user_addr)) {
  1156. user_addr = socket->remote_side();
  1157. }
  1158. ServerPrivateAccessor server_accessor(server);
  1159. const bool security_mode = server->options().security_mode() &&
  1160. socket->user() == server_accessor.acceptor();
  1161. accessor.set_server(server)
  1162. .set_security_mode(security_mode)
  1163. .set_peer_id(socket->id())
  1164. .set_remote_side(user_addr)
  1165. .set_local_side(socket->local_side())
  1166. .set_auth_context(socket->auth_context())
  1167. .set_request_protocol(PROTOCOL_HTTP)
  1168. .set_begin_time_us(msg->received_us())
  1169. .move_in_server_receiving_sock(socket_guard);
  1170. // Read log-id. errno may be set when input to strtoull overflows.
  1171. // atoi/atol/atoll don't support 64-bit integer and can't be used.
  1172. const std::string* log_id_str = req_header.GetHeader(common->LOG_ID);
  1173. if (log_id_str) {
  1174. char* logid_end = NULL;
  1175. errno = 0;
  1176. uint64_t logid = strtoull(log_id_str->c_str(), &logid_end, 10);
  1177. if (*logid_end || errno) {
  1178. LOG(ERROR) << "Invalid " << common->LOG_ID << '='
  1179. << *log_id_str << " in http request";
  1180. } else {
  1181. cntl->set_log_id(logid);
  1182. }
  1183. }
  1184. // Tag the bthread with this server's key for
  1185. // thread_local_data().
  1186. if (server->thread_local_options().thread_local_data_factory) {
  1187. bthread_assign_data((void*)&server->thread_local_options());
  1188. }
  1189. Span* span = NULL;
  1190. const std::string& path = req_header.uri().path();
  1191. const std::string* trace_id_str = req_header.GetHeader("x-bd-trace-id");
  1192. if (IsTraceable(trace_id_str)) {
  1193. uint64_t trace_id = 0;
  1194. if (trace_id_str) {
  1195. trace_id = strtoull(trace_id_str->c_str(), NULL, 10);
  1196. }
  1197. uint64_t span_id = 0;
  1198. const std::string* span_id_str = req_header.GetHeader("x-bd-span-id");
  1199. if (span_id_str) {
  1200. span_id = strtoull(span_id_str->c_str(), NULL, 10);
  1201. }
  1202. uint64_t parent_span_id = 0;
  1203. const std::string* parent_span_id_str =
  1204. req_header.GetHeader("x-bd-parent-span-id");
  1205. if (parent_span_id_str) {
  1206. parent_span_id = strtoull(parent_span_id_str->c_str(), NULL, 10);
  1207. }
  1208. span = Span::CreateServerSpan(
  1209. path, trace_id, span_id, parent_span_id, msg->base_real_us());
  1210. accessor.set_span(span);
  1211. span->set_log_id(cntl->log_id());
  1212. span->set_remote_side(user_addr);
  1213. span->set_received_us(msg->received_us());
  1214. span->set_start_parse_us(start_parse_us);
  1215. span->set_protocol(is_http2 ? PROTOCOL_H2 : PROTOCOL_HTTP);
  1216. span->set_request_size(imsg_guard->parsed_length());
  1217. }
  1218. if (!server->IsRunning()) {
  1219. cntl->SetFailed(ELOGOFF, "Server is stopping");
  1220. return;
  1221. }
  1222. if (server->options().http_master_service) {
  1223. // If http_master_service is on, just call it.
  1224. google::protobuf::Service* svc = server->options().http_master_service;
  1225. const google::protobuf::MethodDescriptor* md =
  1226. svc->GetDescriptor()->FindMethodByName(common->DEFAULT_METHOD);
  1227. if (md == NULL) {
  1228. cntl->SetFailed(ENOMETHOD, "No default_method in http_master_service");
  1229. return;
  1230. }
  1231. accessor.set_method(md);
  1232. cntl->request_attachment().swap(req_body);
  1233. google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
  1234. if (span) {
  1235. span->ResetServerSpanName(md->full_name());
  1236. span->set_start_callback_us(butil::cpuwide_time_us());
  1237. span->AsParent();
  1238. }
  1239. // `cntl', `req' and `res' will be deleted inside `done'
  1240. return svc->CallMethod(md, cntl, NULL, NULL, done);
  1241. }
  1242. const Server::MethodProperty* const sp =
  1243. FindMethodPropertyByURI(path, server, &req_header._unresolved_path);
  1244. if (NULL == sp) {
  1245. if (security_mode) {
  1246. std::string escape_path;
  1247. WebEscape(path, &escape_path);
  1248. cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", escape_path.c_str());
  1249. } else {
  1250. cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
  1251. }
  1252. return;
  1253. } else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
  1254. BadMethodRequest breq;
  1255. BadMethodResponse bres;
  1256. butil::StringSplitter split(path.c_str(), '/');
  1257. breq.set_service_name(std::string(split.field(), split.length()));
  1258. sp->service->CallMethod(sp->method, cntl, &breq, &bres, NULL);
  1259. return;
  1260. }
  1261. // Switch to service-specific error.
  1262. non_service_error.release();
  1263. MethodStatus* method_status = sp->status;
  1264. resp_sender.set_method_status(method_status);
  1265. if (method_status) {
  1266. int rejected_cc = 0;
  1267. if (!method_status->OnRequested(&rejected_cc)) {
  1268. cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
  1269. sp->method->full_name().c_str(), rejected_cc);
  1270. return;
  1271. }
  1272. }
  1273. if (span) {
  1274. span->ResetServerSpanName(sp->method->full_name());
  1275. }
  1276. // NOTE: accesses to builtin services are not counted as part of
  1277. // concurrency, therefore are not limited by ServerOptions.max_concurrency.
  1278. if (!sp->is_builtin_service && !sp->params.is_tabbed) {
  1279. if (socket->is_overcrowded()) {
  1280. cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
  1281. butil::endpoint2str(socket->remote_side()).c_str());
  1282. return;
  1283. }
  1284. if (!server_accessor.AddConcurrency(cntl)) {
  1285. cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
  1286. server->options().max_concurrency);
  1287. return;
  1288. }
  1289. if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
  1290. cntl->SetFailed(ELIMIT, "Too many user code to run when"
  1291. " -usercode_in_pthread is on");
  1292. return;
  1293. }
  1294. } else if (security_mode) {
  1295. cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
  1296. "ServerOptions.internal_port=%d instead if you're in"
  1297. " internal network", server->options().internal_port);
  1298. return;
  1299. }
  1300. google::protobuf::Service* svc = sp->service;
  1301. const google::protobuf::MethodDescriptor* method = sp->method;
  1302. accessor.set_method(method);
  1303. google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
  1304. resp_sender.own_request(req);
  1305. google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
  1306. resp_sender.own_response(res);
  1307. if (__builtin_expect(!req || !res, 0)) {
  1308. PLOG(FATAL) << "Fail to new req or res";
  1309. cntl->SetFailed("Fail to new req or res");
  1310. return;
  1311. }
  1312. if (sp->params.allow_http_body_to_pb &&
  1313. method->input_type()->field_count() > 0) {
  1314. // A protobuf service. No matter if Content-type is set to
  1315. // applcation/json or body is empty, we have to treat body as a json
  1316. // and try to convert it to pb, which guarantees that a protobuf
  1317. // service is always accessed with valid requests.
  1318. if (req_body.empty()) {
  1319. // Treat empty body specially since parsing it results in error
  1320. if (!req->IsInitialized()) {
  1321. cntl->SetFailed(EREQUEST, "%s needs to be created from a"
  1322. " non-empty json, it has required fields.",
  1323. req->GetDescriptor()->full_name().c_str());
  1324. return;
  1325. } // else all fields of the request are optional.
  1326. } else {
  1327. bool is_grpc_ct = false;
  1328. const HttpContentType content_type =
  1329. ParseContentType(req_header.content_type(), &is_grpc_ct);
  1330. const std::string* encoding = NULL;
  1331. if (is_http2) {
  1332. if (is_grpc_ct) {
  1333. bool grpc_compressed = false;
  1334. if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
  1335. cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
  1336. return;
  1337. }
  1338. if (grpc_compressed) {
  1339. encoding = req_header.GetHeader(common->GRPC_ENCODING);
  1340. if (encoding == NULL) {
  1341. cntl->SetFailed(
  1342. EREQUEST, "Fail to find header `grpc-encoding'"
  1343. " in compressed gRPC request");
  1344. return;
  1345. }
  1346. }
  1347. int64_t timeout_value_us =
  1348. ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
  1349. if (timeout_value_us >= 0) {
  1350. accessor.set_deadline_us(
  1351. butil::gettimeofday_us() + timeout_value_us);
  1352. }
  1353. }
  1354. } else {
  1355. encoding = req_header.GetHeader(common->CONTENT_ENCODING);
  1356. }
  1357. if (encoding != NULL && *encoding == common->GZIP) {
  1358. TRACEPRINTF("Decompressing request=%lu",
  1359. (unsigned long)req_body.size());
  1360. butil::IOBuf uncompressed;
  1361. if (!policy::GzipDecompress(req_body, &uncompressed)) {
  1362. cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
  1363. return;
  1364. }
  1365. req_body.swap(uncompressed);
  1366. }
  1367. if (content_type == HTTP_CONTENT_PROTO) {
  1368. if (!ParsePbFromIOBuf(req, req_body)) {
  1369. cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
  1370. req->GetDescriptor()->full_name().c_str());
  1371. return;
  1372. }
  1373. } else {
  1374. butil::IOBufAsZeroCopyInputStream wrapper(req_body);
  1375. std::string err;
  1376. json2pb::Json2PbOptions options;
  1377. options.base64_to_bytes = sp->params.pb_bytes_to_base64;
  1378. cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
  1379. if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
  1380. cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
  1381. req->GetDescriptor()->full_name().c_str(), err.c_str());
  1382. return;
  1383. }
  1384. }
  1385. }
  1386. } else {
  1387. // A http server, just keep content as it is.
  1388. cntl->request_attachment().swap(req_body);
  1389. }
  1390. google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
  1391. imsg_guard.reset(); // optional, just release resourse ASAP
  1392. if (span) {
  1393. span->set_start_callback_us(butil::cpuwide_time_us());
  1394. span->AsParent();
  1395. }
  1396. if (!FLAGS_usercode_in_pthread) {
  1397. return svc->CallMethod(method, cntl, req, res, done);
  1398. }
  1399. if (BeginRunningUserCode()) {
  1400. svc->CallMethod(method, cntl, req, res, done);
  1401. return EndRunningUserCodeInPlace();
  1402. } else {
  1403. return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
  1404. }
  1405. }
  1406. bool ParseHttpServerAddress(butil::EndPoint* point, const char* server_addr_and_port) {
  1407. std::string scheme;
  1408. std::string host;
  1409. int port = -1;
  1410. if (ParseURL(server_addr_and_port, &scheme, &host, &port) != 0) {
  1411. LOG(ERROR) << "Invalid address=`" << server_addr_and_port << '\'';
  1412. return false;
  1413. }
  1414. if (scheme.empty() || scheme == "http") {
  1415. if (port < 0) {
  1416. port = 80;
  1417. }
  1418. } else if (scheme == "https") {
  1419. if (port < 0) {
  1420. port = 443;
  1421. }
  1422. } else {
  1423. LOG(ERROR) << "Invalid scheme=`" << scheme << '\'';
  1424. return false;
  1425. }
  1426. if (str2endpoint(host.c_str(), port, point) != 0 &&
  1427. hostname2endpoint(host.c_str(), port, point) != 0) {
  1428. LOG(ERROR) << "Invalid host=" << host << " port=" << port;
  1429. return false;
  1430. }
  1431. return true;
  1432. }
  1433. const std::string& GetHttpMethodName(
  1434. const google::protobuf::MethodDescriptor*,
  1435. const Controller* cntl) {
  1436. return cntl->http_request().uri().path();
  1437. }
  1438. } // namespace policy
  1439. } // namespace brpc