http_task.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. #include <stdlib.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. #include <string>
  5. #include "workflow/WFTaskError.h"
  6. #include "workflow/WFTaskFactory.h"
  7. #include "workflow/HttpUtil.h"
  8. #include "workflow/StringUtil.h"
  9. #include "rpc_module.h"
  10. #include "http_task.h"
  11. namespace srpc
  12. {
  13. using namespace protocol;
  // Upper bound applied to keep_alive_timeo by both message_out() implementations
  // in this file. The value is in milliseconds: 300 * 1000 ms = 300 s.
  14. #define HTTP_KEEPALIVE_MAX (300 * 1000)
  15. HttpClientTask::HttpClientTask(int redirect_max,
  16. int retry_max,
  17. http_callback_t&& callback,
  18. std::list<RPCModule *>&& modules) :
  19. WFComplexClientTask(retry_max, std::move(callback)),
  20. redirect_max_(redirect_max),
  21. redirect_count_(0),
  22. modules_(std::move(modules))
  23. {
  24. protocol::HttpRequest *client_req = this->get_req();
  25. client_req->set_method(HttpMethodGet);
  26. client_req->set_http_version("HTTP/1.1");
  27. }
  28. std::string HttpClientTask::get_uri_host() const
  29. {
  30. if (uri_.state == URI_STATE_SUCCESS)
  31. return uri_.host;
  32. return "";
  33. }
  34. std::string HttpClientTask::get_uri_port() const
  35. {
  36. if (uri_.state == URI_STATE_SUCCESS)
  37. return uri_.port;
  38. return "";
  39. }
  40. std::string HttpClientTask::get_url() const
  41. {
  42. if (url_.empty())
  43. return url_;
  44. return ""; //TODO: fill with uri and other info
  45. }
  46. std::string HttpClientTask::get_uri_scheme() const
  47. {
  48. if (uri_.state == URI_STATE_SUCCESS)
  49. return uri_.scheme;
  50. return "";
  51. }
  52. CommMessageOut *HttpClientTask::message_out()
  53. {
  54. HttpRequest *req = this->get_req();
  55. struct HttpMessageHeader header;
  56. bool is_alive;
  57. HttpServerTask::ModuleSeries *series;
  58. RPCModuleData *data = NULL;
  59. // prepare module_data from series to request
  60. series = dynamic_cast<HttpServerTask::ModuleSeries *>(series_of(this));
  61. if (series)
  62. {
  63. data = series->get_module_data();
  64. if (data != NULL)
  65. this->set_module_data(*data);
  66. }
  67. data = this->mutable_module_data();
  68. for (auto *module : modules_)
  69. module->client_task_begin(this, *data);
  70. http_set_header_module_data(*data, req);
  71. // from ComplexHttpTask::message_out()
  72. if (!req->is_chunked() && !req->has_content_length_header())
  73. {
  74. size_t body_size = req->get_output_body_size();
  75. const char *method = req->get_method();
  76. if (body_size != 0 || strcmp(method, "POST") == 0 ||
  77. strcmp(method, "PUT") == 0)
  78. {
  79. char buf[32];
  80. header.name = "Content-Length";
  81. header.name_len = strlen("Content-Length");
  82. header.value = buf;
  83. header.value_len = sprintf(buf, "%zu", body_size);
  84. req->add_header(&header);
  85. }
  86. }
  87. if (req->has_connection_header())
  88. is_alive = req->is_keep_alive();
  89. else
  90. {
  91. header.name = "Connection";
  92. header.name_len = strlen("Connection");
  93. is_alive = (this->keep_alive_timeo != 0);
  94. if (is_alive)
  95. {
  96. header.value = "Keep-Alive";
  97. header.value_len = strlen("Keep-Alive");
  98. }
  99. else
  100. {
  101. header.value = "close";
  102. header.value_len = strlen("close");
  103. }
  104. req->add_header(&header);
  105. }
  106. if (!is_alive)
  107. this->keep_alive_timeo = 0;
  108. else if (req->has_keep_alive_header())
  109. {
  110. HttpHeaderCursor req_cursor(req);
  111. //req---Connection: Keep-Alive
  112. //req---Keep-Alive: timeout=0,max=100
  113. header.name = "Keep-Alive";
  114. header.name_len = strlen("Keep-Alive");
  115. if (req_cursor.find(&header))
  116. {
  117. std::string keep_alive((const char *)header.value, header.value_len);
  118. std::vector<std::string> params = StringUtil::split(keep_alive, ',');
  119. for (const auto& kv : params)
  120. {
  121. std::vector<std::string> arr = StringUtil::split(kv, '=');
  122. if (arr.size() < 2)
  123. arr.emplace_back("0");
  124. std::string key = StringUtil::strip(arr[0]);
  125. std::string val = StringUtil::strip(arr[1]);
  126. if (strcasecmp(key.c_str(), "timeout") == 0)
  127. {
  128. this->keep_alive_timeo = 1000 * atoi(val.c_str());
  129. break;
  130. }
  131. }
  132. }
  133. if ((unsigned int)this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
  134. this->keep_alive_timeo = HTTP_KEEPALIVE_MAX;
  135. }
  136. return this->WFComplexClientTask::message_out();
  137. }
  138. CommMessageIn *HttpClientTask::message_in()
  139. {
  140. HttpResponse *resp = this->get_resp();
  141. if (strcmp(this->get_req()->get_method(), HttpMethodHead) == 0)
  142. resp->parse_zero_body();
  143. return this->WFComplexClientTask::message_in();
  144. }
  145. int HttpClientTask::keep_alive_timeout()
  146. {
  147. return this->resp.is_keep_alive() ? this->keep_alive_timeo : 0;
  148. }
  149. void HttpClientTask::set_empty_request()
  150. {
  151. HttpRequest *client_req = this->get_req();
  152. client_req->set_request_uri("/");
  153. client_req->set_header_pair("Host", "");
  154. }
  155. void HttpClientTask::init_failed()
  156. {
  157. this->set_empty_request();
  158. }
  159. bool HttpClientTask::init_success()
  160. {
  161. HttpRequest *client_req = this->get_req();
  162. std::string request_uri;
  163. std::string header_host;
  164. bool is_ssl;
  165. if (uri_.scheme && strcasecmp(uri_.scheme, "http") == 0)
  166. is_ssl = false;
  167. else if (uri_.scheme && strcasecmp(uri_.scheme, "https") == 0)
  168. is_ssl = true;
  169. else
  170. {
  171. this->state = WFT_STATE_TASK_ERROR;
  172. this->error = WFT_ERR_URI_SCHEME_INVALID;
  173. this->set_empty_request();
  174. return false;
  175. }
  176. //todo http+unix
  177. //https://stackoverflow.com/questions/26964595/whats-the-correct-way-to-use-a-unix-domain-socket-in-requests-framework
  178. //https://stackoverflow.com/questions/27037990/connecting-to-postgres-via-database-url-and-unix-socket-in-rails
  179. if (uri_.path && uri_.path[0])
  180. request_uri = uri_.path;
  181. else
  182. request_uri = "/";
  183. if (uri_.query && uri_.query[0])
  184. {
  185. request_uri += "?";
  186. request_uri += uri_.query;
  187. }
  188. if (uri_.host && uri_.host[0])
  189. header_host = uri_.host;
  190. if (uri_.port && uri_.port[0])
  191. {
  192. int port = atoi(uri_.port);
  193. if (is_ssl)
  194. {
  195. if (port != 443)
  196. {
  197. header_host += ":";
  198. header_host += uri_.port;
  199. }
  200. }
  201. else
  202. {
  203. if (port != 80)
  204. {
  205. header_host += ":";
  206. header_host += uri_.port;
  207. }
  208. }
  209. }
  210. this->WFComplexClientTask::set_transport_type(is_ssl ? TT_TCP_SSL : TT_TCP);
  211. client_req->set_request_uri(request_uri.c_str());
  212. client_req->set_header_pair("Host", header_host.c_str());
  213. return true;
  214. }
  215. bool HttpClientTask::redirect_url(HttpResponse *client_resp, ParsedURI& uri)
  216. {
  217. if (redirect_count_ < redirect_max_)
  218. {
  219. redirect_count_++;
  220. std::string url;
  221. HttpHeaderCursor cursor(client_resp);
  222. if (!cursor.find("Location", url) || url.empty())
  223. {
  224. this->state = WFT_STATE_TASK_ERROR;
  225. this->error = WFT_ERR_HTTP_BAD_REDIRECT_HEADER;
  226. return false;
  227. }
  228. if (url[0] == '/')
  229. {
  230. if (url[1] != '/')
  231. {
  232. if (uri.port)
  233. url = ':' + (uri.port + url);
  234. url = "//" + (uri.host + url);
  235. }
  236. url = uri.scheme + (':' + url);
  237. }
  238. URIParser::parse(url, uri);
  239. return true;
  240. }
  241. return false;
  242. }
  243. bool HttpClientTask::need_redirect(ParsedURI& uri)
  244. {
  245. HttpRequest *client_req = this->get_req();
  246. HttpResponse *client_resp = this->get_resp();
  247. const char *status_code_str = client_resp->get_status_code();
  248. const char *method = client_req->get_method();
  249. if (!status_code_str || !method)
  250. return false;
  251. int status_code = atoi(status_code_str);
  252. switch (status_code)
  253. {
  254. case 301:
  255. case 302:
  256. case 303:
  257. if (redirect_url(client_resp, uri))
  258. {
  259. if (strcasecmp(method, HttpMethodGet) != 0 &&
  260. strcasecmp(method, HttpMethodHead) != 0)
  261. {
  262. client_req->set_method(HttpMethodGet);
  263. }
  264. return true;
  265. }
  266. else
  267. break;
  268. case 307:
  269. case 308:
  270. if (redirect_url(client_resp, uri))
  271. return true;
  272. else
  273. break;
  274. default:
  275. break;
  276. }
  277. return false;
  278. }
  279. void HttpClientTask::check_response()
  280. {
  281. HttpResponse *resp = this->get_resp();
  282. resp->end_parsing();
  283. if (this->state == WFT_STATE_SYS_ERROR && this->error == ECONNRESET)
  284. {
  285. /* Servers can end the message by closing the connection. */
  286. if (resp->is_header_complete() &&
  287. !resp->is_keep_alive() &&
  288. !resp->is_chunked() &&
  289. !resp->has_content_length_header())
  290. {
  291. this->state = WFT_STATE_SUCCESS;
  292. this->error = 0;
  293. }
  294. }
  295. }
  296. bool HttpClientTask::finish_once()
  297. {
  298. if (this->state != WFT_STATE_SUCCESS)
  299. this->check_response();
  300. if (this->state == WFT_STATE_SUCCESS)
  301. {
  302. if (this->need_redirect(uri_))
  303. this->set_redirect(uri_);
  304. else if (this->state != WFT_STATE_SUCCESS)
  305. this->disable_retry();
  306. }
  307. return true;
  308. }
  309. /**********Server**********/
  310. void HttpServerTask::handle(int state, int error)
  311. {
  312. if (state == WFT_STATE_TOREPLY)
  313. {
  314. HttpRequest *req = this->get_req();
  315. // from WFHttpServerTask::handle()
  316. req_is_alive_ = req->is_keep_alive();
  317. if (req_is_alive_ && req->has_keep_alive_header())
  318. {
  319. HttpHeaderCursor req_cursor(req);
  320. struct HttpMessageHeader header;
  321. header.name = "Keep-Alive";
  322. header.name_len = strlen("Keep-Alive");
  323. req_has_keep_alive_header_ = req_cursor.find(&header);
  324. if (req_has_keep_alive_header_)
  325. {
  326. req_keep_alive_.assign((const char *)header.value,
  327. header.value_len);
  328. }
  329. }
  330. this->state = WFT_STATE_TOREPLY;
  331. this->target = this->get_target();
  332. // fill module data from request to series
  333. ModuleSeries *series = new ModuleSeries(this);
  334. http_get_header_module_data(req, this->module_data_);
  335. for (auto *module : this->modules_)
  336. {
  337. if (module)
  338. module->server_task_begin(this, this->module_data_);
  339. }
  340. series->set_module_data(this->mutable_module_data());
  341. series->start();
  342. }
  343. else if (this->state == WFT_STATE_TOREPLY)
  344. {
  345. this->state = state;
  346. this->error = error;
  347. if (error == ETIMEDOUT)
  348. this->timeout_reason = TOR_TRANSMIT_TIMEOUT;
  349. // prepare module_data from series to response
  350. for (auto *module : modules_)
  351. module->server_task_end(this, this->module_data_);
  352. http_set_header_module_data(this->module_data_, this->get_resp());
  353. this->subtask_done();
  354. }
  355. else
  356. delete this;
  357. }
  358. CommMessageOut *HttpServerTask::message_out()
  359. {
  360. HttpResponse *resp = this->get_resp();
  361. struct HttpMessageHeader header;
  362. if (!resp->get_http_version())
  363. resp->set_http_version("HTTP/1.1");
  364. const char *status_code_str = resp->get_status_code();
  365. if (!status_code_str || !resp->get_reason_phrase())
  366. {
  367. int status_code;
  368. if (status_code_str)
  369. status_code = atoi(status_code_str);
  370. else
  371. status_code = HttpStatusOK;
  372. HttpUtil::set_response_status(resp, status_code);
  373. }
  374. if (!resp->is_chunked() && !resp->has_content_length_header())
  375. {
  376. char buf[32];
  377. header.name = "Content-Length";
  378. header.name_len = strlen("Content-Length");
  379. header.value = buf;
  380. header.value_len = sprintf(buf, "%zu", resp->get_output_body_size());
  381. resp->add_header(&header);
  382. }
  383. bool is_alive;
  384. if (resp->has_connection_header())
  385. is_alive = resp->is_keep_alive();
  386. else
  387. is_alive = req_is_alive_;
  388. if (!is_alive)
  389. this->keep_alive_timeo = 0;
  390. else
  391. {
  392. //req---Connection: Keep-Alive
  393. //req---Keep-Alive: timeout=5,max=100
  394. if (req_has_keep_alive_header_)
  395. {
  396. int flag = 0;
  397. std::vector<std::string> params = StringUtil::split(req_keep_alive_, ',');
  398. for (const auto& kv : params)
  399. {
  400. std::vector<std::string> arr = StringUtil::split(kv, '=');
  401. if (arr.size() < 2)
  402. arr.emplace_back("0");
  403. std::string key = StringUtil::strip(arr[0]);
  404. std::string val = StringUtil::strip(arr[1]);
  405. if (!(flag & 1) && strcasecmp(key.c_str(), "timeout") == 0)
  406. {
  407. flag |= 1;
  408. // keep_alive_timeo = 5000ms when Keep-Alive: timeout=5
  409. this->keep_alive_timeo = 1000 * atoi(val.c_str());
  410. if (flag == 3)
  411. break;
  412. }
  413. else if (!(flag & 2) && strcasecmp(key.c_str(), "max") == 0)
  414. {
  415. flag |= 2;
  416. if (this->get_seq() >= atoi(val.c_str()))
  417. {
  418. this->keep_alive_timeo = 0;
  419. break;
  420. }
  421. if (flag == 3)
  422. break;
  423. }
  424. }
  425. }
  426. if ((unsigned int)this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
  427. this->keep_alive_timeo = HTTP_KEEPALIVE_MAX;
  428. //if (this->keep_alive_timeo < 0 || this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
  429. }
  430. if (!resp->has_connection_header())
  431. {
  432. header.name = "Connection";
  433. header.name_len = 10;
  434. if (this->keep_alive_timeo == 0)
  435. {
  436. header.value = "close";
  437. header.value_len = 5;
  438. }
  439. else
  440. {
  441. header.value = "Keep-Alive";
  442. header.value_len = 10;
  443. }
  444. resp->add_header(&header);
  445. }
  446. return this->WFServerTask::message_out();
  447. }
  448. } // end namespace srpc