Pārlūkot izejas kodu

Add HTTP server/client with AOP modules, including trace and metrics.

holmes1412 1 gadu atpakaļ
vecāks
revīzija
13c81f0e1d

+ 4 - 0
CMakeLists_Headers.txt

@@ -21,6 +21,10 @@ set(SRC_HEADERS
 	src/module/rpc_filter.h
 	src/module/rpc_trace_filter.h
 	src/module/rpc_metrics_filter.h
+	src/http/http_task.h
+	src/http/http_module.h
+	src/http/http_client.h
+	src/http/http_server.h
 	src/rpc_basic.h
 	src/rpc_buffer.h
 	src/rpc_client.h

+ 5 - 1
src/CMakeLists.txt

@@ -76,13 +76,14 @@ add_subdirectory(var)
 add_subdirectory(thrift)
 add_subdirectory(compress)
 add_subdirectory(message)
+add_subdirectory(http)
 
 add_dependencies(module LINK_HEADERS)
 add_dependencies(var LINK_HEADERS)
 add_dependencies(thrift LINK_HEADERS)
 add_dependencies(compress LINK_HEADERS)
 add_dependencies(message LINK_HEADERS)
-
+add_dependencies(http LINK_HEADERS)
 
 if (WIN32)
 	add_library(
@@ -93,6 +94,7 @@ if (WIN32)
 		$<TARGET_OBJECTS:thrift>
 		$<TARGET_OBJECTS:compress>
 		$<TARGET_OBJECTS:message>
+		$<TARGET_OBJECTS:http>
 	)
 
 	add_dependencies(${PROJECT_NAME} LINK_HEADERS)
@@ -124,6 +126,7 @@ else ()
 		$<TARGET_OBJECTS:thrift>
 		$<TARGET_OBJECTS:compress>
 		$<TARGET_OBJECTS:message>
+		$<TARGET_OBJECTS:http>
 	)
 
 	add_library(
@@ -134,6 +137,7 @@ else ()
 		$<TARGET_OBJECTS:thrift>
 		$<TARGET_OBJECTS:compress>
 		$<TARGET_OBJECTS:message>
+		$<TARGET_OBJECTS:http>
 	)
 
 	target_link_libraries(${SHARED_LIB_NAME} 

+ 14 - 0
src/http/CMakeLists.txt

@@ -0,0 +1,14 @@
+cmake_minimum_required(VERSION 3.6)
+project(http)
+
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+set(SRC
+	http_module.cc
+	http_task.cc
+	http_client.cc
+	http_server.cc
+)
+
+add_library(${PROJECT_NAME} OBJECT ${SRC})
+

+ 143 - 0
src/http/http_client.cc

@@ -0,0 +1,143 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include "http_client.h"
+#include "http_task.h"
+#include "http_module.h"
+
+namespace srpc
+{
+
+HttpClient::HttpClient(const std::string& url) : 
+	params(HTTP_CLIENT_PARAMS_DEFAULT)
+{
+//	TODO: make sure this is necessary
+	this->params.url = url;
+	this->init();
+}
+
+HttpClient::HttpClient(const ParsedURI& uri) :
+	params(HTTP_CLIENT_PARAMS_DEFAULT)
+{
+	this->params.uri = uri;
+	if (this->params.uri.scheme &&
+		strcasecmp(this->params.uri.scheme, "https") == 0)
+	{
+		this->params.is_ssl = true;
+//		this->params.port = HTTP_SSL_PORT_DEFAULT;
+	}
+}
+
+HttpClient::HttpClient(const HttpClientParams *params) :
+	params(*params)
+{
+	this->init();
+}
+
+WFHttpTask *HttpClient::create_http_task(http_callback_t callback)
+{
+	std::list<RPCModule *> module;
+	for (int i = 0; i < SRPC_MODULE_MAX; i++)
+	{
+		if (this->modules[i])
+			module.push_back(this->modules[i]);
+	}
+
+	auto&& cb = std::bind(&HttpClient::callback, this, std::placeholders::_1);
+
+	HttpClientTask *task = new HttpClientTask(this->params.task_params.redirect_max,
+											  this->params.task_params.retry_max,
+											  cb, std::move(module));
+
+	task->user_callback = std::move(callback);
+	this->task_init(task);
+
+	return task;
+}
+
+void HttpClient::init()
+{
+	URIParser::parse(this->params.url, this->params.uri);
+
+	if (this->params.uri.scheme &&
+		strcasecmp(this->params.uri.scheme, "https") == 0)
+	{
+		this->params.is_ssl = true;
+//		this->params.port = HTTP_SSL_PORT_DEFAULT;
+	}
+}
+
+void HttpClient::task_init(HttpClientTask *task)
+{
+	// set task by this->params;
+	task->init(this->params.uri);
+	task->set_transport_type(this->params.is_ssl ? TT_TCP_SSL : TT_TCP);
+	task->set_send_timeout(this->params.task_params.send_timeout);
+	task->set_receive_timeout(this->params.task_params.receive_timeout);
+	task->set_keep_alive(this->params.task_params.keep_alive_timeout);
+}
+
+void HttpClient::callback(WFHttpTask *task)
+{
+	RPCModuleData resp_data;
+	http_get_header_module_data(task->get_resp(), resp_data);
+	
+	for (int i = 0; i < SRPC_MODULE_MAX; i++)
+	{
+		if (this->modules[i])
+			this->modules[i]->client_task_end(task, resp_data);
+	}
+
+	HttpClientTask *client_task = (HttpClientTask *)task;
+	if (client_task->user_callback)
+		client_task->user_callback(task);
+}
+
+void HttpClient::add_filter(RPCFilter *filter)
+{
+	int type = filter->get_module_type();
+
+	this->mutex.lock();
+	if (type < SRPC_MODULE_MAX && type >= 0)
+	{
+		RPCModule *module = this->modules[type];
+
+		if (!module)
+		{
+			switch (type)
+			{
+			case RPCModuleTypeTrace:
+				module = new HttpTraceModule();
+				break;
+			case RPCModuleTypeMetrics:
+				module = new HttpMetricsModule();
+				break;
+			default:
+				break;
+			}
+			this->modules[type] = module;
+		}
+
+		if (module)
+			module->add_filter(filter);
+	}
+
+	this->mutex.unlock();
+	return;
+}
+
+} // end namespace srpc
+

+ 98 - 0
src/http/http_client.h

@@ -0,0 +1,98 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#ifndef __RPC_HTTP_CLIENT_H__
+#define __RPC_HTTP_CLIENT_H__
+
+#include "workflow/WFTask.h"
+#include "workflow/HttpMessage.h"
+#include "workflow/WFTaskFactory.h"
+#include "rpc_options.h"
+#include "rpc_basic.h"
+#include "http_task.h"
+
+namespace srpc
+{
+
+#define HTTP_REDIRECT_DEFAULT	2
+#define HTTP_RETRY_DEFAULT		5
+#define HTTP_KEEPALIVE_DEFAULT	(60 * 1000)
+
+struct HttpTaskParams
+{
+	int send_timeout;
+	int receive_timeout;
+	int watch_timeout;
+	int keep_alive_timeout;
+	int redirect_max;
+	int retry_max;
+};
+
+struct HttpClientParams
+{
+	HttpTaskParams task_params;
+	bool is_ssl;
+	std::string url; // can be empty
+	ParsedURI uri;
+};
+
+static constexpr struct HttpTaskParams HTTP_TASK_PARAMS_DEFAULT =
+{
+/*	.send_timeout		=	*/	-1,
+/*	.receive_timeout	=	*/	-1,
+/*	.watch_timeout		=	*/	0,
+/*	.keep_alive_timeout	=	*/	HTTP_KEEPALIVE_DEFAULT,
+/*	.retry_max			=	*/	HTTP_REDIRECT_DEFAULT,
+/*	.redirect_max		=	*/	HTTP_RETRY_DEFAULT,
+};
+
+static const struct HttpClientParams HTTP_CLIENT_PARAMS_DEFAULT = 
+{
+/*	.task_params		=	*/	HTTP_TASK_PARAMS_DEFAULT,
+/*	.is_ssl				=	*/	false,
+/*	.url				=	*/	"",
+/*	.uri				=	*/
+};
+
+class HttpClient
+{
+public:
+	WFHttpTask *create_http_task(http_callback_t callback);
+	void add_filter(RPCFilter *filter);
+
+public:
+	HttpClient(const std::string& url);
+	HttpClient(const ParsedURI& uri);
+	HttpClient(const HttpClientParams *params);
+
+private:
+	void callback(WFHttpTask *task);
+	void init();
+	void task_init(HttpClientTask *task);
+
+protected:
+	HttpClientParams params;
+	ParsedURI uri;
+
+private:
+	std::mutex mutex;
+	RPCModule *modules[SRPC_MODULE_MAX] = { 0 };
+};
+
+} // end namespace srpc
+
+#endif
+

+ 148 - 0
src/http/http_module.cc

@@ -0,0 +1,148 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include "http_module.h"
+
+namespace srpc
+{
+
+bool HttpTraceModule::client_begin(SubTask *task, RPCModuleData& data)
+{
+	TraceModule<HttpServerTask, HttpClientTask>::client_begin(task, data);
+
+	auto *client_task = static_cast<HttpClientTask *>(task);
+	auto *req = client_task->get_req();
+
+	data[SRPC_COMPONENT] = SRPC_COMPONENT_SRPC;
+	data[SRPC_HTTP_METHOD] = req->get_method();
+
+	return true;
+}
+
+bool HttpTraceModule::client_end(SubTask *task, RPCModuleData& data)
+{
+	TraceModule<HttpServerTask, HttpClientTask>::client_end(task, data);
+
+//	std::string ip;
+//	unsigned short port;
+	auto *client_task = static_cast<HttpClientTask *>(task);
+	auto *resp = client_task->get_resp();
+
+	if (client_task->get_state() == WFT_STATE_SUCCESS)
+		data[SRPC_HTTP_STATUS_CODE] = resp->get_status_code();
+	else
+		data[SRPC_ERROR] = client_task->get_error();
+
+	return true;
+}
+
+bool HttpTraceModule::server_begin(SubTask *task, RPCModuleData& data)
+{
+	TraceModule<HttpServerTask, HttpClientTask>::server_begin(task, data);
+
+	auto *server_task = static_cast<HttpServerTask *>(task);
+	auto *req = server_task->get_req();
+
+	data[SRPC_COMPONENT] = SRPC_COMPONENT_SRPC;
+	data[SRPC_HTTP_METHOD] = req->get_method();
+	data[SRPC_HTTP_TARGET] = req->get_request_uri();
+//	data[SRPC_HTTP_HOST_NAME] = server_task->hostname;
+//	data[SRPC_HTTP_HOST_PORT] = server_task->port;
+
+	char addrstr[128];
+	struct sockaddr_storage addr;
+	socklen_t l = sizeof addr;
+	unsigned short port = 0;
+
+	server_task->get_peer_addr((struct sockaddr *)&addr, &l);
+	if (addr.ss_family == AF_INET)
+	{
+		data[SRPC_HTTP_SOCK_FAMILY] = "inet";
+
+		struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+		inet_ntop(AF_INET, &sin->sin_addr, addrstr, 128);
+		port = ntohs(sin->sin_port);
+	}
+	else if (addr.ss_family == AF_INET6)
+	{
+		data[SRPC_HTTP_SOCK_FAMILY] = "inet6";
+
+		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&addr;
+		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, 128);
+		port = ntohs(sin6->sin6_port);
+	}
+	// else : Unknown
+
+	data[SRPC_HTTP_SOCK_ADDR] = addrstr;
+	data[SRPC_HTTP_SOCK_PORT] = std::to_string(port);
+
+/*
+	protocol::HttpHeaderCursor req_cursor(req);
+	while (req_cursor.next(name, value))
+	{
+		if (name.casecmp("X-Forwarded-For") == 0)
+			data[SRPC_HTTP_CLIENT_IP] = value;
+	}
+*/
+	return true;
+}
+
+bool HttpTraceModule::server_end(SubTask *task, RPCModuleData& data)
+{
+	TraceModule<HttpServerTask, HttpClientTask>::server_end(task, data);
+
+/*
+	auto *server_task = static_cast<HttpServerTask *>(task);
+	auto *resp = server_task->get_resp();
+
+	data[SRPC_STATE] = std::to_string(resp->get_status_code());
+	data[SRPC_ERROR] = std::to_string(resp->get_error());
+*/
+	return true;
+}
+
+bool HttpMetricsModule::client_begin(SubTask *task, RPCModuleData& data)
+{
+	MetricsModule<HttpServerTask, HttpClientTask>::client_begin(task, data);
+
+/*
+	auto *client_task = static_cast<HttpClientTask *>(task);
+	auto *req = client_task->get_req();
+
+	data[OTLP_SERVICE_NAME] = req->get_service_name();
+	data[OTLP_METHOD_NAME] = req->get_method_name();
+*/
+
+	return true;
+}
+
+bool HttpMetricsModule::server_begin(SubTask *task, RPCModuleData& data)
+{
+	MetricsModule<HttpServerTask, HttpClientTask>::server_begin(task, data);
+
+/*
+	auto *server_task = static_cast<HttpServerTask *>(task);
+	auto *req = server_task->get_req();
+
+	data[OTLP_SERVICE_NAME] = req->get_service_name();
+	data[OTLP_METHOD_NAME] = req->get_method_name();
+*/
+
+	return true;
+}
+
+} // end namespace srpc
+

+ 48 - 0
src/http/http_module.h

@@ -0,0 +1,48 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#ifndef __RPC_HTTP_MODULE_H__
+#define __RPC_HTTP_MODULE_H__
+
+#include "rpc_basic.h"
+#include "rpc_module.h"
+#include "rpc_trace_module.h"
+#include "rpc_metrics_module.h"
+#include "http_task.h"
+
+namespace srpc
+{
+
+class HttpTraceModule : public TraceModule<HttpServerTask, HttpClientTask>
+{
+public:
+	bool client_begin(SubTask *task, RPCModuleData& data) override;
+	bool client_end(SubTask *task, RPCModuleData& data) override;
+	bool server_begin(SubTask *task, RPCModuleData& data) override;
+	bool server_end(SubTask *task, RPCModuleData& data) override;
+};
+
+class HttpMetricsModule : public MetricsModule<HttpServerTask, HttpClientTask>
+{
+public:
+	bool client_begin(SubTask *task, RPCModuleData& data) override;
+	bool server_begin(SubTask *task, RPCModuleData& data) override;
+};
+
+} // end namespace srpc
+
+#endif
+

+ 77 - 0
src/http/http_server.cc

@@ -0,0 +1,77 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include "http_server.h"
+#include "http_task.h"
+#include "http_module.h"
+
+namespace srpc
+{
+
+void HttpServer::add_filter(RPCFilter *filter)
+{
+	int type = filter->get_module_type();
+
+	this->mutex.lock();
+	if (type < SRPC_MODULE_MAX && type >= 0)
+	{
+		RPCModule *module = this->modules[type];
+
+		if (!module)
+		{
+			switch (type)
+			{
+			case RPCModuleTypeTrace:
+				module = new HttpTraceModule();
+				break;
+			case RPCModuleTypeMetrics:
+				module = new HttpMetricsModule();
+				break;
+			default:
+				break;
+			}
+			this->modules[type] = module;
+		}
+
+		if (module)
+			module->add_filter(filter);
+	}
+
+	this->mutex.unlock();
+	return;
+}
+
+CommSession *HttpServer::new_session(long long seq, CommConnection *conn)
+{
+	WFHttpTask *task;
+
+	std::list<RPCModule *> module;
+	for (int i = 0; i < SRPC_MODULE_MAX; i++)
+	{
+		if (this->modules[i])
+			module.push_back(this->modules[i]);
+	}
+
+	task = new HttpServerTask(this, this->process, std::move(module));
+	task->set_keep_alive(this->params.keep_alive_timeout);
+	task->set_receive_timeout(this->params.receive_timeout);
+	task->get_req()->set_size_limit(this->params.request_size_limit);
+
+	return task;
+}
+
+} // namespace srpc
+

+ 45 - 0
src/http/http_server.h

@@ -0,0 +1,45 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#ifndef __RPC_HTTP_SERVER_H__
+#define __RPC_HTTP_SERVER_H__
+
+#include "workflow/WFHttpServer.h"
+#include "rpc_basic.h"
+#include "rpc_filter.h"
+#include "rpc_module.h"
+
+namespace srpc
+{
+
+class HttpServer : public WFHttpServer
+{
+public:
+	HttpServer(http_process_t proc) : WFHttpServer(std::move(proc)) {}
+	void add_filter(RPCFilter *filter);
+
+protected:
+	CommSession *new_session(long long seq, CommConnection *conn) override;
+
+private:
+	std::mutex mutex;
+	RPCModule *modules[SRPC_MODULE_MAX] = { NULL };
+};
+
+} // namespace srpc
+
+#endif
+

+ 495 - 0
src/http/http_task.cc

@@ -0,0 +1,495 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <string>
+#include "workflow/WFTaskError.h"
+#include "workflow/WFTaskFactory.h"
+#include "workflow/HttpUtil.h"
+#include "workflow/StringUtil.h"
+#include "rpc_module.h"
+#include "http_task.h"
+
+namespace srpc
+{
+using namespace protocol;
+
+#define HTTP_KEEPALIVE_MAX		(300 * 1000)
+
+CommMessageOut *HttpClientTask::message_out()
+{
+	HttpRequest *req = this->get_req();
+	struct HttpMessageHeader header;
+	bool is_alive;
+	HttpServerTask::ModuleSeries *series;
+	RPCModuleData *data = NULL;
+
+	// prepare module_data from series to request
+	series = dynamic_cast<HttpServerTask::ModuleSeries *>(series_of(this));
+
+	if (series)
+	{
+		data = series->get_module_data();
+		if (data != NULL)
+			this->set_module_data(*data);
+	}
+
+	data = this->mutable_module_data();
+
+	for (auto *module : modules_)
+		module->client_task_begin(this, *data);
+
+	http_set_header_module_data(*data, req);
+
+	// from ComplexHttpTask::message_out()
+	if (!req->is_chunked() && !req->has_content_length_header())
+	{
+		size_t body_size = req->get_output_body_size();
+		const char *method = req->get_method();
+
+		if (body_size != 0 || strcmp(method, "POST") == 0 ||
+							  strcmp(method, "PUT") == 0)
+		{
+			char buf[32];
+			header.name = "Content-Length";
+			header.name_len = strlen("Content-Length");
+			header.value = buf;
+			header.value_len = sprintf(buf, "%zu", body_size);
+			req->add_header(&header);
+		}
+	}
+
+	if (req->has_connection_header())
+		is_alive = req->is_keep_alive();
+	else
+	{
+		header.name = "Connection";
+		header.name_len = strlen("Connection");
+		is_alive = (this->keep_alive_timeo != 0);
+		if (is_alive)
+		{
+			header.value = "Keep-Alive";
+			header.value_len = strlen("Keep-Alive");
+		}
+		else
+		{
+			header.value = "close";
+			header.value_len = strlen("close");
+		}
+
+		req->add_header(&header);
+	}
+
+	if (!is_alive)
+		this->keep_alive_timeo = 0;
+	else if (req->has_keep_alive_header())
+	{
+		HttpHeaderCursor req_cursor(req);
+
+		//req---Connection: Keep-Alive
+		//req---Keep-Alive: timeout=0,max=100
+		header.name = "Keep-Alive";
+		header.name_len = strlen("Keep-Alive");
+		if (req_cursor.find(&header))
+		{
+			std::string keep_alive((const char *)header.value, header.value_len);
+			std::vector<std::string> params = StringUtil::split(keep_alive, ',');
+
+			for (const auto& kv : params)
+			{
+				std::vector<std::string> arr = StringUtil::split(kv, '=');
+				if (arr.size() < 2)
+					arr.emplace_back("0");
+
+				std::string key = StringUtil::strip(arr[0]);
+				std::string val = StringUtil::strip(arr[1]);
+				if (strcasecmp(key.c_str(), "timeout") == 0)
+				{
+					this->keep_alive_timeo = 1000 * atoi(val.c_str());
+					break;
+				}
+			}
+		}
+
+		if ((unsigned int)this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
+			this->keep_alive_timeo = HTTP_KEEPALIVE_MAX;
+	}
+
+	return this->WFComplexClientTask::message_out();
+}
+
+CommMessageIn *HttpClientTask::message_in()
+{
+	HttpResponse *resp = this->get_resp();
+
+	if (strcmp(this->get_req()->get_method(), HttpMethodHead) == 0)
+		resp->parse_zero_body();
+
+	return this->WFComplexClientTask::message_in();
+}
+
+int HttpClientTask::keep_alive_timeout()
+{
+	return this->resp.is_keep_alive() ? this->keep_alive_timeo : 0;
+}
+
+void HttpClientTask::set_empty_request()
+{
+	HttpRequest *client_req = this->get_req();
+	client_req->set_request_uri("/");
+	client_req->set_header_pair("Host", "");
+}
+
+void HttpClientTask::init_failed()
+{
+	this->set_empty_request();
+}
+
+bool HttpClientTask::init_success()
+{
+	HttpRequest *client_req = this->get_req();
+	std::string request_uri;
+	std::string header_host;
+	bool is_ssl;
+
+	if (uri_.scheme && strcasecmp(uri_.scheme, "http") == 0)
+		is_ssl = false;
+	else if (uri_.scheme && strcasecmp(uri_.scheme, "https") == 0)
+		is_ssl = true;
+	else
+	{
+		this->state = WFT_STATE_TASK_ERROR;
+		this->error = WFT_ERR_URI_SCHEME_INVALID;
+		this->set_empty_request();
+		return false;
+	}
+
+	//todo http+unix
+	//https://stackoverflow.com/questions/26964595/whats-the-correct-way-to-use-a-unix-domain-socket-in-requests-framework
+	//https://stackoverflow.com/questions/27037990/connecting-to-postgres-via-database-url-and-unix-socket-in-rails
+
+	if (uri_.path && uri_.path[0])
+		request_uri = uri_.path;
+	else
+		request_uri = "/";
+
+	if (uri_.query && uri_.query[0])
+	{
+		request_uri += "?";
+		request_uri += uri_.query;
+	}
+
+	if (uri_.host && uri_.host[0])
+		header_host = uri_.host;
+
+	if (uri_.port && uri_.port[0])
+	{
+		int port = atoi(uri_.port);
+
+		if (is_ssl)
+		{
+			if (port != 443)
+			{
+				header_host += ":";
+				header_host += uri_.port;
+			}
+		}
+		else
+		{
+			if (port != 80)
+			{
+				header_host += ":";
+				header_host += uri_.port;
+			}
+		}
+	}
+
+	this->WFComplexClientTask::set_transport_type(is_ssl ? TT_TCP_SSL : TT_TCP);
+	client_req->set_request_uri(request_uri.c_str());
+	client_req->set_header_pair("Host", header_host.c_str());
+	return true;
+}
+
+bool HttpClientTask::redirect_url(HttpResponse *client_resp, ParsedURI& uri)
+{
+	if (redirect_count_ < redirect_max_)
+	{
+		redirect_count_++;
+		std::string url;
+		HttpHeaderCursor cursor(client_resp);
+
+		if (!cursor.find("Location", url) || url.empty())
+		{
+			this->state = WFT_STATE_TASK_ERROR;
+			this->error = WFT_ERR_HTTP_BAD_REDIRECT_HEADER;
+			return false;
+		}
+
+		if (url[0] == '/')
+		{
+			if (url[1] != '/')
+			{
+				if (uri.port)
+					url = ':' + (uri.port + url);
+
+				url = "//" + (uri.host + url);
+			}
+
+			url = uri.scheme + (':' + url);
+		}
+
+		URIParser::parse(url, uri);
+		return true;
+	}
+
+	return false;
+}
+
+bool HttpClientTask::need_redirect(ParsedURI& uri)
+{
+	HttpRequest *client_req = this->get_req();
+	HttpResponse *client_resp = this->get_resp();
+	const char *status_code_str = client_resp->get_status_code();
+	const char *method = client_req->get_method();
+
+	if (!status_code_str || !method)
+		return false;
+
+	int status_code = atoi(status_code_str);
+
+	switch (status_code)
+	{
+	case 301:
+	case 302:
+	case 303:
+		if (redirect_url(client_resp, uri))
+		{
+			if (strcasecmp(method, HttpMethodGet) != 0 &&
+				strcasecmp(method, HttpMethodHead) != 0)
+			{
+				client_req->set_method(HttpMethodGet);
+			}
+
+			return true;
+		}
+		else
+			break;
+
+	case 307:
+	case 308:
+		if (redirect_url(client_resp, uri))
+			return true;
+		else
+			break;
+
+	default:
+		break;
+	}
+
+	return false;
+}
+
+void HttpClientTask::check_response()
+{
+	HttpResponse *resp = this->get_resp();
+
+	resp->end_parsing();
+	if (this->state == WFT_STATE_SYS_ERROR && this->error == ECONNRESET)
+	{
+		/* Servers can end the message by closing the connection. */
+		if (resp->is_header_complete() &&
+			!resp->is_keep_alive() &&
+			!resp->is_chunked() &&
+			!resp->has_content_length_header())
+		{
+			this->state = WFT_STATE_SUCCESS;
+			this->error = 0;
+		}
+	}
+}
+
+bool HttpClientTask::finish_once()
+{
+	if (this->state != WFT_STATE_SUCCESS)
+		this->check_response();
+
+	if (this->state == WFT_STATE_SUCCESS)
+	{
+		if (this->need_redirect(uri_))
+			this->set_redirect(uri_);
+		else if (this->state != WFT_STATE_SUCCESS)
+			this->disable_retry();
+	}
+
+	return true;
+}
+
+/**********Server**********/
+
+void HttpServerTask::handle(int state, int error)
+{
+	if (state == WFT_STATE_TOREPLY)
+	{
+		HttpRequest *req = this->get_req();
+
+		// from WFHttpServerTask::handle()
+		req_is_alive_ = req->is_keep_alive();
+		if (req_is_alive_ && req->has_keep_alive_header())
+		{
+			HttpHeaderCursor req_cursor(req);
+			struct HttpMessageHeader header;
+
+			header.name = "Keep-Alive";
+			header.name_len = strlen("Keep-Alive");
+			req_has_keep_alive_header_ = req_cursor.find(&header);
+			if (req_has_keep_alive_header_)
+			{
+				req_keep_alive_.assign((const char *)header.value,
+										header.value_len);
+			}
+		}
+
+		this->state = WFT_STATE_TOREPLY;
+		this->target = this->get_target();
+
+		// fill module data from request to series
+		ModuleSeries *series = new ModuleSeries(this);
+
+		http_get_header_module_data(req, this->module_data_);
+		for (auto *module : this->modules_)
+		{
+			if (module)
+				module->server_task_begin(this, this->module_data_);
+		}
+
+		series->set_module_data(this->mutable_module_data());
+		series->start();
+	}
+	else if (this->state == WFT_STATE_TOREPLY)
+	{
+		this->state = state;
+		this->error = error;
+		if (error == ETIMEDOUT)
+			this->timeout_reason = TOR_TRANSMIT_TIMEOUT;
+
+		// prepare module_data from series to response
+		for (auto *module : modules_)
+			module->server_task_end(this, this->module_data_);
+
+		http_set_header_module_data(this->module_data_, this->get_resp());
+
+		this->subtask_done();
+	}
+	else
+		delete this;
+}
+
+CommMessageOut *HttpServerTask::message_out()
+{
+	HttpResponse *resp = this->get_resp();
+	struct HttpMessageHeader header;
+
+	if (!resp->get_http_version())
+		resp->set_http_version("HTTP/1.1");
+
+	const char *status_code_str = resp->get_status_code();
+	if (!status_code_str || !resp->get_reason_phrase())
+	{
+		int status_code;
+
+		if (status_code_str)
+			status_code = atoi(status_code_str);
+		else
+			status_code = HttpStatusOK;
+
+		HttpUtil::set_response_status(resp, status_code);
+	}
+
+	if (!resp->is_chunked() && !resp->has_content_length_header())
+	{
+		char buf[32];
+		header.name = "Content-Length";
+		header.name_len = strlen("Content-Length");
+		header.value = buf;
+		header.value_len = sprintf(buf, "%zu", resp->get_output_body_size());
+		resp->add_header(&header);
+	}
+
+	bool is_alive;
+
+	if (resp->has_connection_header())
+		is_alive = resp->is_keep_alive();
+	else
+		is_alive = req_is_alive_;
+
+	if (!is_alive)
+		this->keep_alive_timeo = 0;
+	else
+	{
+		//req---Connection: Keep-Alive
+		//req---Keep-Alive: timeout=5,max=100
+
+		if (req_has_keep_alive_header_)
+		{
+			int flag = 0;
+			std::vector<std::string> params = StringUtil::split(req_keep_alive_, ',');
+
+			for (const auto& kv : params)
+			{
+				std::vector<std::string> arr = StringUtil::split(kv, '=');
+				if (arr.size() < 2)
+					arr.emplace_back("0");
+
+				std::string key = StringUtil::strip(arr[0]);
+				std::string val = StringUtil::strip(arr[1]);
+				if (!(flag & 1) && strcasecmp(key.c_str(), "timeout") == 0)
+				{
+					flag |= 1;
+					// keep_alive_timeo = 5000ms when Keep-Alive: timeout=5
+					this->keep_alive_timeo = 1000 * atoi(val.c_str());
+					if (flag == 3)
+						break;
+				}
+				else if (!(flag & 2) && strcasecmp(key.c_str(), "max") == 0)
+				{
+					flag |= 2;
+					if (this->get_seq() >= atoi(val.c_str()))
+					{
+						this->keep_alive_timeo = 0;
+						break;
+					}
+
+					if (flag == 3)
+						break;
+				}
+			}
+		}
+
+		if ((unsigned int)this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
+			this->keep_alive_timeo = HTTP_KEEPALIVE_MAX;
+		//if (this->keep_alive_timeo < 0 || this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
+
+	}
+
+	if (!resp->has_connection_header())
+	{
+		header.name = "Connection";
+		header.name_len = 10;
+		if (this->keep_alive_timeo == 0)
+		{
+			header.value = "close";
+			header.value_len = 5;
+		}
+		else
+		{
+			header.value = "Keep-Alive";
+			header.value_len = 10;
+		}
+
+		resp->add_header(&header);
+	}
+
+	return this->WFServerTask::message_out();
+}
+
+} // end namespace srpc
+

+ 143 - 0
src/http/http_task.h

@@ -0,0 +1,143 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#ifndef __RPC_HTTP_TASK_H__
+#define __RPC_HTTP_TASK_H__
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <string>
+#include "workflow/HttpUtil.h"
+#include "workflow/WFTaskFactory.h"
+#include "workflow/WFGlobal.h"
+#include "rpc_module.h"
+
+namespace srpc
+{
+
+// copy part of workflow/src/factory/HttpTaskImpl.cc
+
+class HttpClientTask : public WFComplexClientTask<protocol::HttpRequest,
+												  protocol::HttpResponse>
+{
+public:
+	HttpClientTask(int redirect_max,
+				   int retry_max,
+				   http_callback_t&& callback,
+				   std::list<RPCModule *>&& modules) :
+		WFComplexClientTask(retry_max, std::move(callback)),
+		redirect_max_(redirect_max),
+		redirect_count_(0),
+		modules_(std::move(modules))
+	{
+		protocol::HttpRequest *client_req = this->get_req();
+
+		client_req->set_method(HttpMethodGet);
+		client_req->set_http_version("HTTP/1.1");
+	}
+
+	RPCModuleData *mutable_module_data() { return &module_data_; }
+	void set_module_data(RPCModuleData data) { module_data_ = std::move(data); }
+
+/*
+	// similar to opentracing: log({{"event", "error"}, {"message", "application log"}});
+	void log(const RPCLogVector& fields);
+
+	// Baggage Items, which are just key:value pairs that cross process boundaries
+	void add_baggage(const std::string& key, const std::string& value);
+	bool get_baggage(const std::string& key, std::string& value);
+*/
+
+protected:
+	virtual CommMessageOut *message_out();
+	virtual CommMessageIn *message_in();
+	virtual int keep_alive_timeout();
+	virtual bool init_success();
+	virtual void init_failed();
+	virtual bool finish_once();
+
+protected:
+	bool need_redirect(ParsedURI& uri);
+	bool redirect_url(protocol::HttpResponse *client_resp, ParsedURI& uri);
+	void set_empty_request();
+	void check_response();
+
+public:
+	http_callback_t user_callback;
+
+private:
+	int redirect_max_;
+	int redirect_count_;
+	RPCModuleData module_data_;
+	std::list<RPCModule *> modules_;
+};
+
+class HttpServerTask : public WFServerTask<protocol::HttpRequest,
+										   protocol::HttpResponse>
+{
+public:
+	HttpServerTask(CommService *service,
+				   std::function<void (WFHttpTask *)>& process,
+				   std::list<RPCModule *>&& modules) :
+		WFServerTask(service, WFGlobal::get_scheduler(), process),
+		req_is_alive_(false),
+		req_has_keep_alive_header_(false),
+		modules_(std::move(modules))
+	{}
+
+	class ModuleSeries : public WFServerTask<protocol::HttpRequest,
+											 protocol::HttpResponse>::Series
+	{
+	public:
+		ModuleSeries(WFServerTask<protocol::HttpRequest,
+								  protocol::HttpResponse> *task) :
+			WFServerTask<protocol::HttpRequest,
+						 protocol::HttpResponse>::Series(task),
+			module_data_(NULL)
+		{}
+
+		RPCModuleData *get_module_data() { return module_data_; }
+		void set_module_data(RPCModuleData *data) { module_data_ = data; }
+		bool has_module_data() const { return !!module_data_; }
+		void clear_module_data() { module_data_ = NULL; }
+
+	private:
+		RPCModuleData *module_data_;
+	};
+
+//TODO:
+//	bool get_remote(std::string& ip, unsigned short *port) const;
+
+	RPCModuleData *mutable_module_data() { return &module_data_; }
+	void set_module_data(RPCModuleData data) { module_data_ = std::move(data); }
+
+protected:
+	virtual void handle(int state, int error);
+	virtual CommMessageOut *message_out();
+
+protected:
+	bool req_is_alive_;
+	bool req_has_keep_alive_header_;
+	std::string req_keep_alive_;
+	RPCModuleData module_data_;
+	std::list<RPCModule *> modules_;
+};
+
+} // end namespace srpc
+
+#endif
+

+ 15 - 0
src/module/rpc_module.h

@@ -27,6 +27,8 @@
 namespace srpc
 {
 
+static constexpr char const *SRPC_COMPONENT_SRPC	= "srpc.srpc";
+
 static constexpr char const *SRPC_SPAN_LOG			= "srpc.log";
 static constexpr char const *SRPC_SPAN_EVENT		= "event";
 static constexpr char const *SRPC_SPAN_MESSAGE		= "message";
@@ -35,6 +37,19 @@ static constexpr char const *SRPC_START_TIMESTAMP	= "srpc.start_time";
 static constexpr char const *SRPC_FINISH_TIMESTAMP	= "srpc.finish_time";
 static constexpr char const *SRPC_DURATION			= "srpc.duration";
 
+static constexpr char const *SRPC_HTTP_METHOD		= "http.method";
+static constexpr char const *SRPC_HTTP_STATUS_CODE	= "http.status_code";
+static constexpr char const *SRPC_HTTP_SOCK_FAMILY	= "net.sock.family";
+static constexpr char const *SRPC_HTTP_SOCK_ADDR	= "net.sock.peer.addr";
+static constexpr char const *SRPC_HTTP_SOCK_PORT	= "net.sock.peer.port";
+static constexpr char const *SRPC_HTTP_REQ_LEN		= "http.request_content_length";
+static constexpr char const *SRPC_HTTP_RESP_LEN		= "http.response_content_length";
+
+// /users/12314/?q=ddds
+static constexpr char const *SRPC_HTTP_TARGET		= "http.target";
+// The IP address of the original client behind all proxies, from X-Forwarded-For
+static constexpr char const *SRPC_HTTP_CLIENT_IP	= "http.client_ip";
+
 //for SnowFlake: u_id = [timestamp][group][machine][sequence]
 static constexpr int SRPC_TIMESTAMP_BITS		= 38;
 static constexpr int SRPC_GROUP_BITS			= 4;

+ 19 - 23
src/module/rpc_trace_filter.cc

@@ -90,13 +90,8 @@ static size_t rpc_span_log_format(RPCModuleData& data, char *str, size_t len)
 	TRACE_ID_BIN_TO_HEX(trace_id, trace_id_buf);
 	SPAN_ID_BIN_TO_HEX(span_id, span_id_buf);
 
-	size_t ret = snprintf(str, len, "trace_id: %s span_id: %s service: %s"
-									" method: %s start_time: %s",
-						  trace_id_buf,
-						  span_id_buf,
-						  data[OTLP_SERVICE_NAME].c_str(),
-						  data[OTLP_METHOD_NAME].c_str(),
-						  data[SRPC_START_TIMESTAMP].c_str());
+	size_t ret = snprintf(str, len, "trace_id: %s span_id: %s",
+						  trace_id_buf, span_id_buf);
 
 	auto iter = data.find(SRPC_PARENT_SPAN_ID);
 	if (iter != data.end())
@@ -109,25 +104,26 @@ static size_t rpc_span_log_format(RPCModuleData& data, char *str, size_t len)
 						parent_span_id_buf);
 	}
 
-	iter = data.find(SRPC_FINISH_TIMESTAMP);
-	if (iter != data.end())
+	if (data.find(OTLP_SERVICE_NAME) != data.end()) // for RPC
 	{
-		ret += snprintf(str + ret, len - ret, " finish_time: %s",
-						iter->second.c_str());
+		ret += snprintf(str + ret, len - ret, " service: %s method: %s",
+						data[OTLP_SERVICE_NAME].c_str(),
+						data[OTLP_METHOD_NAME].c_str());
 	}
 
-	iter = data.find(SRPC_DURATION);
-	if (iter != data.end())
-	{
-		ret += snprintf(str + ret, len - ret, " duration: %s(ns)"
-											  " remote_ip: %s port: %s"
-											  " state: %s error: %s",
-						iter->second.c_str(),
-						data[SRPC_REMOTE_IP].c_str(),
-						data[SRPC_REMOTE_PORT].c_str(),
-						data[SRPC_STATE].c_str(),
-						data[SRPC_ERROR].c_str());
-	}
+	// TODO : add some method for HTTP
+
+	ret += snprintf(str + ret, len - ret, " start_time: %s finish_time: %s"
+										  " duration: %s(ns)"
+										  " remote_ip: %s port: %s"
+										  " state: %s error: %s",
+					data[SRPC_START_TIMESTAMP].c_str(),
+					data[SRPC_FINISH_TIMESTAMP].c_str(),
+					data[SRPC_DURATION].c_str(),
+					data[SRPC_REMOTE_IP].c_str(),
+					data[SRPC_REMOTE_PORT].c_str(),
+					data[SRPC_STATE].c_str(),
+					data[SRPC_ERROR].c_str());
 
 	for (const auto& it : data)
 	{

+ 0 - 3
src/module/rpc_trace_module.h

@@ -53,9 +53,6 @@ static constexpr char const *SRPC_REMOTE_IP			= "srpc.peer.ip";
 static constexpr char const *SRPC_REMOTE_PORT		= "srpc.peer.port";
 static constexpr char const *SRPC_SAMPLING_PRIO		= "srpc.sampling.priority";
 
-// for srpc.component
-static constexpr char const *SRPC_COMPONENT_SRPC	= "srpc.srpc";
-
 // for ext tags
 static constexpr char const *SRPC_DATA_TYPE			= "srpc.data.type";
 static constexpr char const *SRPC_COMPRESS_TYPE		= "srpc.compress.type";

+ 12 - 0
tutorial/CMakeLists.txt

@@ -210,3 +210,15 @@ foreach(src ${TUTORIAL_HELLOWORLD_LIST})
 	add_dependencies(${bin_name} TURORIAL_GEN)
 endforeach()
 
+set(TUTORIAL_HTTP_LIST
+	tutorial-17-http_server
+	tutorial-18-http_client
+)
+
+foreach(src ${TUTORIAL_HTTP_LIST})
+	string(REPLACE "-" ";" arr ${src})
+	list(GET arr -1 bin_name)
+	add_executable(${bin_name} ${src}.cc)
+	target_link_libraries(${bin_name} ${SRPC_LIB})
+	add_dependencies(${bin_name} TURORIAL_GEN)
+endforeach()

+ 69 - 0
tutorial/tutorial-17-http_server.cc

@@ -0,0 +1,69 @@
+/*
+  Copyright (c) 2023 sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include <signal.h>
+#include "workflow/WFFacilities.h"
+#include "srpc/http_server.h"
+#include "srpc/rpc_types.h"
+#include "srpc/rpc_metrics_filter.h"
+#include "srpc/rpc_trace_filter.h"
+
+static WFFacilities::WaitGroup wait_group(1);
+
+srpc::RPCMetricsPull  exporter;
+srpc::RPCTraceDefault trace_log;
+
+static void sig_handler(int signo)
+{
+	wait_group.done();
+}
+
+void process(WFHttpTask *task)
+{
+    fprintf(stderr, "http server get request_uri: %s\n",
+            task->get_req()->get_request_uri());
+
+    task->get_resp()->append_output_body("<html>Hello from server!</html>");
+}
+
+int main()
+{
+	signal(SIGINT, sig_handler);
+	signal(SIGTERM, sig_handler);
+
+	srpc::HttpServer server(process);
+
+	exporter.init(8080); /* export port for prometheus */
+	exporter.create_histogram("echo_request_size", "Echo request size",
+							{1, 10, 100});
+	exporter.create_summary("echo_test_quantiles", "Test quantile",
+						  {{0.5, 0.05}, {0.9, 0.01}});
+
+	server.add_filter(&trace_log);	
+	server.add_filter(&exporter);
+
+	if (server.start(1412) == 0)
+	{
+		wait_group.wait();
+		server.stop();
+	}
+	else
+		perror("server start");
+
+	exporter.deinit();
+	return 0;
+}
+

+ 59 - 0
tutorial/tutorial-18-http_client.cc

@@ -0,0 +1,59 @@
+/*
+  Copyright (c) 2020 sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include <stdio.h>
+#include "workflow/WFFacilities.h"
+#include "echo_pb.srpc.h"
+#include "srpc/http_client.h"
+#include "srpc/rpc_types.h"
+#include "srpc/rpc_types.h"
+#include "srpc/rpc_metrics_filter.h"
+#include "srpc/rpc_trace_filter.h"
+
+using namespace srpc;
+
+srpc::RPCTraceDefault trace_log;
+static WFFacilities::WaitGroup wait_group(1); 
+
+void callback(WFHttpTask *task)
+{
+    int state = task->get_state();
+    int error = task->get_error();
+    fprintf(stderr, "callback. state = %d error = %d\n", state, error);
+
+     if (state == WFT_STATE_SUCCESS) // print server response body
+     {
+        const void *body;
+        size_t body_len;
+
+        task->get_resp()->get_parsed_body(&body, &body_len);
+        fwrite(body, 1, body_len, stdout);
+        fflush(stdout);
+     }
+}
+
+int main()
+{
+	srpc::HttpClient client("http://127.0.0.1:1412");
+	client.add_filter(&trace_log);
+
+	WFHttpTask *task = client.create_http_task(callback);
+	task->start();
+
+	wait_group.wait();
+	return 0;
+}
+