Browse Source

rpc_module : support semantic conventions for HTTP spans

holmes1412 1 year ago
parent
commit
3350e21730

+ 10 - 4
CMakeLists_Headers.txt

@@ -21,10 +21,6 @@ set(SRC_HEADERS
 	src/module/rpc_filter.h
 	src/module/rpc_trace_filter.h
 	src/module/rpc_metrics_filter.h
-	src/http/http_task.h
-	src/http/http_module.h
-	src/http/http_client.h
-	src/http/http_server.h
 	src/rpc_basic.h
 	src/rpc_buffer.h
 	src/rpc_client.h
@@ -40,6 +36,16 @@ set(SRC_HEADERS
 	src/rpc_define.h
 )
 
+if (NOT WIN32)
+	set(SRC_HEADERS
+		${SRC_HEADERS}
+		src/http/http_task.h
+		src/http/http_module.h
+		src/http/http_client.h
+		src/http/http_server.h
+	)
+endif ()
+
 if (NOT SNAPPY_INSTALLED)
 	set(SNAPPY_HEADERS
 		third_party/snappy/snappy.h

+ 5 - 3
src/CMakeLists.txt

@@ -76,14 +76,17 @@ add_subdirectory(var)
 add_subdirectory(thrift)
 add_subdirectory(compress)
 add_subdirectory(message)
-add_subdirectory(http)
 
 add_dependencies(module LINK_HEADERS)
 add_dependencies(var LINK_HEADERS)
 add_dependencies(thrift LINK_HEADERS)
 add_dependencies(compress LINK_HEADERS)
 add_dependencies(message LINK_HEADERS)
-add_dependencies(http LINK_HEADERS)
+
+if (NOT WIN32)
+	add_subdirectory(http)
+	add_dependencies(http LINK_HEADERS)
+endif ()
 
 if (WIN32)
 	add_library(
@@ -94,7 +97,6 @@ if (WIN32)
 		$<TARGET_OBJECTS:thrift>
 		$<TARGET_OBJECTS:compress>
 		$<TARGET_OBJECTS:message>
-		$<TARGET_OBJECTS:http>
 	)
 
 	add_dependencies(${PROJECT_NAME} LINK_HEADERS)

+ 36 - 42
src/http/http_client.cc

@@ -21,33 +21,37 @@
 namespace srpc
 {
 
-HttpClient::HttpClient(const std::string& url) : 
-	params(HTTP_CLIENT_PARAMS_DEFAULT)
+WFHttpTask *HttpClient::create_http_task(const std::string& url,
+										 int redirect_max,
+										 int retry_max,
+										 http_callback_t callback)
 {
-//	TODO: make sure this is necessary
-	this->params.url = url;
-	this->init();
-}
-
-HttpClient::HttpClient(const ParsedURI& uri) :
-	params(HTTP_CLIENT_PARAMS_DEFAULT)
-{
-	this->params.uri = uri;
-	if (this->params.uri.scheme &&
-		strcasecmp(this->params.uri.scheme, "https") == 0)
+	std::list<RPCModule *> module;
+	for (int i = 0; i < SRPC_MODULE_MAX; i++)
 	{
-		this->params.is_ssl = true;
-//		this->params.port = HTTP_SSL_PORT_DEFAULT;
+		if (this->modules[i])
+			module.push_back(this->modules[i]);
 	}
-}
 
-HttpClient::HttpClient(const HttpClientParams *params) :
-	params(*params)
-{
-	this->init();
+	auto&& cb = std::bind(&HttpClient::callback, this, std::placeholders::_1);
+
+	HttpClientTask *task = new HttpClientTask(redirect_max, retry_max,
+											  cb, std::move(module));
+
+	ParsedURI uri;
+	URIParser::parse(url, uri);
+	task->init(std::move(uri));
+	task->set_keep_alive(HTTP_KEEPALIVE_DEFAULT);
+//	task->set_url(url);
+	task->user_callback_ = std::move(callback);
+
+	return task;
 }
 
-WFHttpTask *HttpClient::create_http_task(http_callback_t callback)
+WFHttpTask *HttpClient::create_http_task(const ParsedURI& uri,
+										 int redirect_max,
+										 int retry_max,
+										 http_callback_t callback)
 {
 	std::list<RPCModule *> module;
 	for (int i = 0; i < SRPC_MODULE_MAX; i++)
@@ -58,12 +62,13 @@ WFHttpTask *HttpClient::create_http_task(http_callback_t callback)
 
 	auto&& cb = std::bind(&HttpClient::callback, this, std::placeholders::_1);
 
-	HttpClientTask *task = new HttpClientTask(this->params.task_params.redirect_max,
-											  this->params.task_params.retry_max,
+	HttpClientTask *task = new HttpClientTask(redirect_max, retry_max,
 											  cb, std::move(module));
 
-	task->user_callback = std::move(callback);
-	this->task_init(task);
+	task->init(uri);
+	task->set_keep_alive(HTTP_KEEPALIVE_DEFAULT);
+//	task->set_url(url);
+	task->user_callback_ = std::move(callback);
 
 	return task;
 }
@@ -76,34 +81,23 @@ void HttpClient::init()
 		strcasecmp(this->params.uri.scheme, "https") == 0)
 	{
 		this->params.is_ssl = true;
-//		this->params.port = HTTP_SSL_PORT_DEFAULT;
 	}
 }
 
-void HttpClient::task_init(HttpClientTask *task)
-{
-	// set task by this->params;
-	task->init(this->params.uri);
-	task->set_transport_type(this->params.is_ssl ? TT_TCP_SSL : TT_TCP);
-	task->set_send_timeout(this->params.task_params.send_timeout);
-	task->set_receive_timeout(this->params.task_params.receive_timeout);
-	task->set_keep_alive(this->params.task_params.keep_alive_timeout);
-}
-
 void HttpClient::callback(WFHttpTask *task)
 {
-	RPCModuleData resp_data;
-	http_get_header_module_data(task->get_resp(), resp_data);
+	HttpClientTask *client_task = (HttpClientTask *)task;
+	RPCModuleData *resp_data = client_task->mutable_module_data();
+	http_get_header_module_data(task->get_resp(), *resp_data);
 	
 	for (int i = 0; i < SRPC_MODULE_MAX; i++)
 	{
 		if (this->modules[i])
-			this->modules[i]->client_task_end(task, resp_data);
+			this->modules[i]->client_task_end(task, *resp_data);
 	}
 
-	HttpClientTask *client_task = (HttpClientTask *)task;
-	if (client_task->user_callback)
-		client_task->user_callback(task);
+	if (client_task->user_callback_)
+		client_task->user_callback_(task);
 }
 
 void HttpClient::add_filter(RPCFilter *filter)

+ 12 - 6
src/http/http_client.h

@@ -70,13 +70,19 @@ static const struct HttpClientParams HTTP_CLIENT_PARAMS_DEFAULT =
 class HttpClient
 {
 public:
-	WFHttpTask *create_http_task(http_callback_t callback);
-	void add_filter(RPCFilter *filter);
+	HttpClient() { }
 
-public:
-	HttpClient(const std::string& url);
-	HttpClient(const ParsedURI& uri);
-	HttpClient(const HttpClientParams *params);
+	WFHttpTask *create_http_task(const std::string& url,
+								 int redirect_max,
+								 int retry_max,
+								 http_callback_t callback);
+
+	WFHttpTask *create_http_task(const ParsedURI& uri,
+								 int redirect_max,
+								 int retry_max,
+								 http_callback_t callback);
+
+	void add_filter(RPCFilter *filter);
 
 private:
 	void callback(WFHttpTask *task);

+ 90 - 15
src/http/http_module.cc

@@ -16,6 +16,13 @@
 
 #include "http_module.h"
 
+#ifdef _WIN32
+#include <workflow/PlatformSocket.h>
+#else
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#endif
+
 namespace srpc
 {
 
@@ -29,6 +36,16 @@ bool HttpTraceModule::client_begin(SubTask *task, RPCModuleData& data)
 	data[SRPC_COMPONENT] = SRPC_COMPONENT_SRPC;
 	data[SRPC_HTTP_METHOD] = req->get_method();
 
+	const void *body;
+	size_t body_len;
+	req->get_parsed_body(&body, &body_len);
+	data[SRPC_HTTP_REQ_LEN] = std::to_string(body_len);
+
+	data[SRPC_HTTP_PEER_NAME] = client_task->get_uri_host();
+	data[SRPC_HTTP_PEER_PORT] = client_task->get_uri_port();
+	data[SRPC_HTTP_SCHEME] = client_task->get_uri_scheme();
+	// TODO: data[SRPC_HTTP_CLIENT_URL] = client_task->get_url();
+
 	return true;
 }
 
@@ -36,15 +53,55 @@ bool HttpTraceModule::client_end(SubTask *task, RPCModuleData& data)
 {
 	TraceModule<HttpServerTask, HttpClientTask>::client_end(task, data);
 
-//	std::string ip;
-//	unsigned short port;
 	auto *client_task = static_cast<HttpClientTask *>(task);
 	auto *resp = client_task->get_resp();
 
-	if (client_task->get_state() == WFT_STATE_SUCCESS)
-		data[SRPC_HTTP_STATUS_CODE] = resp->get_status_code();
-	else
+	data[SRPC_STATE] = std::to_string(client_task->get_state());
+	if (client_task->get_state() != WFT_STATE_SUCCESS)
+	{
 		data[SRPC_ERROR] = client_task->get_error();
+		if (client_task->get_error() == ETIMEDOUT)
+		{
+			data[SRPC_TIMEOUT_REASON] =
+				std::to_string(client_task->get_timeout_reason());
+		}
+		return true;
+	}
+
+	data[SRPC_HTTP_STATUS_CODE] = resp->get_status_code();
+	data[SRPC_HTTP_RESEND_COUNT] = std::to_string(client_task->get_retry_times());
+
+	const void *body;
+	size_t body_len;
+	resp->get_parsed_body(&body, &body_len);
+	data[SRPC_HTTP_RESP_LEN] = std::to_string(body_len);
+
+	char addrstr[128];
+	struct sockaddr_storage addr;
+	socklen_t l = sizeof addr;
+	unsigned short port = 0;
+
+	client_task->get_peer_addr((struct sockaddr *)&addr, &l);
+	if (addr.ss_family == AF_INET)
+	{
+		data[SRPC_HTTP_SOCK_FAMILY] = "inet";
+
+		struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+		inet_ntop(AF_INET, &sin->sin_addr, addrstr, 128);
+		port = ntohs(sin->sin_port);
+	}
+	else if (addr.ss_family == AF_INET6)
+	{
+		data[SRPC_HTTP_SOCK_FAMILY] = "inet6";
+
+		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&addr;
+		inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, 128);
+		port = ntohs(sin6->sin6_port);
+	}
+	// else : Unknown
+
+	data[SRPC_HTTP_SOCK_ADDR] = addrstr;
+	data[SRPC_HTTP_SOCK_PORT] = std::to_string(port);
 
 	return true;
 }
@@ -59,8 +116,8 @@ bool HttpTraceModule::server_begin(SubTask *task, RPCModuleData& data)
 	data[SRPC_COMPONENT] = SRPC_COMPONENT_SRPC;
 	data[SRPC_HTTP_METHOD] = req->get_method();
 	data[SRPC_HTTP_TARGET] = req->get_request_uri();
-//	data[SRPC_HTTP_HOST_NAME] = server_task->hostname;
-//	data[SRPC_HTTP_HOST_PORT] = server_task->port;
+	data[SRPC_HTTP_SCHEME] = server_task->is_ssl() ? "https" : "http";
+	data[SRPC_HTTP_HOST_PORT] = std::to_string(server_task->listen_port());
 
 	char addrstr[128];
 	struct sockaddr_storage addr;
@@ -89,14 +146,29 @@ bool HttpTraceModule::server_begin(SubTask *task, RPCModuleData& data)
 	data[SRPC_HTTP_SOCK_ADDR] = addrstr;
 	data[SRPC_HTTP_SOCK_PORT] = std::to_string(port);
 
-/*
+	const void *body;
+	size_t body_len;
+	req->get_parsed_body(&body, &body_len);
+	data[SRPC_HTTP_REQ_LEN] = std::to_string(body_len);
+
+	std::string name;
+	std::string value;
 	protocol::HttpHeaderCursor req_cursor(req);
-	while (req_cursor.next(name, value))
+	int flag = 0;
+	while (req_cursor.next(name, value) && flag != 3)
 	{
-		if (name.casecmp("X-Forwarded-For") == 0)
+		if (strcasecmp(name.data(), "Host") == 0)
+		{
+			data[SRPC_HTTP_HOST_NAME] = value;
+			flag |= 1;
+		}
+		else if (strcasecmp(name.data(), "X-Forwarded-For") == 0)
+		{
 			data[SRPC_HTTP_CLIENT_IP] = value;
+			flag |= (1 << 1);
+		}
 	}
-*/
+
 	return true;
 }
 
@@ -104,13 +176,16 @@ bool HttpTraceModule::server_end(SubTask *task, RPCModuleData& data)
 {
 	TraceModule<HttpServerTask, HttpClientTask>::server_end(task, data);
 
-/*
 	auto *server_task = static_cast<HttpServerTask *>(task);
 	auto *resp = server_task->get_resp();
 
-	data[SRPC_STATE] = std::to_string(resp->get_status_code());
-	data[SRPC_ERROR] = std::to_string(resp->get_error());
-*/
+	data[SRPC_STATE] = std::to_string(server_task->get_state());
+	if (server_task->get_state() == WFT_STATE_SUCCESS)
+	{
+		data[SRPC_HTTP_STATUS_CODE] = resp->get_status_code();
+		data[SRPC_HTTP_RESP_LEN] = std::to_string(resp->get_output_body_size());
+	}
+
 	return true;
 }
 

+ 32 - 1
src/http/http_server.cc

@@ -18,6 +18,13 @@
 #include "http_task.h"
 #include "http_module.h"
 
+#ifdef _WIN32
+#include <workflow/PlatformSocket.h>
+#else
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#endif
+
 namespace srpc
 {
 
@@ -56,7 +63,7 @@ void HttpServer::add_filter(RPCFilter *filter)
 
 CommSession *HttpServer::new_session(long long seq, CommConnection *conn)
 {
-	WFHttpTask *task;
+	HttpServerTask *task;
 
 	std::list<RPCModule *> module;
 	for (int i = 0; i < SRPC_MODULE_MAX; i++)
@@ -69,9 +76,33 @@ CommSession *HttpServer::new_session(long long seq, CommConnection *conn)
 	task->set_keep_alive(this->params.keep_alive_timeout);
 	task->set_receive_timeout(this->params.receive_timeout);
 	task->get_req()->set_size_limit(this->params.request_size_limit);
+	task->set_is_ssl(this->get_ssl_ctx() ? true : false);
+	task->set_listen_port(this->get_listen_port());
 
 	return task;
 }
 
+unsigned short HttpServer::get_listen_port()
+{
+	if (this->listen_port == 0)
+	{
+		struct sockaddr_storage addr;
+		socklen_t l = sizeof addr;
+		this->get_listen_addr((struct sockaddr *)&addr, &l);
+		if (addr.ss_family == AF_INET)
+		{
+			struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+			this->listen_port = ntohs(sin->sin_port);
+		}
+		else // if (addr.ss_family == AF_INET6)
+		{
+			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&addr;
+			this->listen_port = ntohs(sin6->sin6_port);
+		}
+	}
+
+	return this->listen_port;
+}
+
 } // namespace srpc
 

+ 10 - 1
src/http/http_server.h

@@ -28,15 +28,24 @@ namespace srpc
 class HttpServer : public WFHttpServer
 {
 public:
-	HttpServer(http_process_t proc) : WFHttpServer(std::move(proc)) {}
+	HttpServer(http_process_t proc) :
+		WFHttpServer(std::move(proc)),
+		listen_port(0)
+	{
+	}
+
 	void add_filter(RPCFilter *filter);
 
 protected:
 	CommSession *new_session(long long seq, CommConnection *conn) override;
 
+private:
+	unsigned short get_listen_port();
+
 private:
 	std::mutex mutex;
 	RPCModule *modules[SRPC_MODULE_MAX] = { NULL };
+	unsigned short listen_port;
 };
 
 } // namespace srpc

+ 47 - 0
src/http/http_task.cc

@@ -15,6 +15,53 @@ using namespace protocol;
 
 #define HTTP_KEEPALIVE_MAX		(300 * 1000)
 
+HttpClientTask::HttpClientTask(int redirect_max,
+							   int retry_max,
+							   http_callback_t&& callback,
+							   std::list<RPCModule *>&& modules) :
+	WFComplexClientTask(retry_max, std::move(callback)),
+	redirect_max_(redirect_max),
+	redirect_count_(0),
+	modules_(std::move(modules))
+{
+	protocol::HttpRequest *client_req = this->get_req();
+
+	client_req->set_method(HttpMethodGet);
+	client_req->set_http_version("HTTP/1.1");
+}
+
+std::string HttpClientTask::get_uri_host() const
+{
+	if (uri_.state == URI_STATE_SUCCESS)
+		return uri_.host;
+
+	return "";
+}
+
+std::string HttpClientTask::get_uri_port() const
+{
+	if (uri_.state == URI_STATE_SUCCESS)
+		return uri_.port;
+
+	return "";
+}
+
+std::string HttpClientTask::get_url() const
+{
+	if (url_.empty())
+		return url_;
+
+	return ""; //TODO: fill with uri and other info
+}
+
+std::string HttpClientTask::get_uri_scheme() const
+{
+	if (uri_.state == URI_STATE_SUCCESS)
+		return uri_.scheme;
+
+	return "";
+}
+
 CommMessageOut *HttpClientTask::message_out()
 {
 	HttpRequest *req = this->get_req();

+ 17 - 13
src/http/http_task.h

@@ -38,21 +38,17 @@ public:
 	HttpClientTask(int redirect_max,
 				   int retry_max,
 				   http_callback_t&& callback,
-				   std::list<RPCModule *>&& modules) :
-		WFComplexClientTask(retry_max, std::move(callback)),
-		redirect_max_(redirect_max),
-		redirect_count_(0),
-		modules_(std::move(modules))
-	{
-		protocol::HttpRequest *client_req = this->get_req();
-
-		client_req->set_method(HttpMethodGet);
-		client_req->set_http_version("HTTP/1.1");
-	}
+				   std::list<RPCModule *>&& modules);
 
 	RPCModuleData *mutable_module_data() { return &module_data_; }
 	void set_module_data(RPCModuleData data) { module_data_ = std::move(data); }
+	int get_retry_times() const { return retry_times_; }
+	void set_url(std::string url) { this->url_ = std::move(url); }
 
+	std::string get_uri_host() const;
+	std::string get_uri_port() const;
+	std::string get_uri_scheme() const;
+	std::string get_url() const;
 /*
 	// similar to opentracing: log({{"event", "error"}, {"message", "application log"}});
 	void log(const RPCLogVector& fields);
@@ -77,13 +73,14 @@ protected:
 	void check_response();
 
 public:
-	http_callback_t user_callback;
+	http_callback_t user_callback_;
 
 private:
+	std::string url_;
 	int redirect_max_;
 	int redirect_count_;
 	RPCModuleData module_data_;
-	std::list<RPCModule *> modules_;
+	std::list<RPCModule *> modules_ = { NULL };
 };
 
 class HttpServerTask : public WFServerTask<protocol::HttpRequest,
@@ -99,6 +96,11 @@ public:
 		modules_(std::move(modules))
 	{}
 
+	void set_is_ssl(bool is_ssl) { this->is_ssl_ = is_ssl; }
+	void set_listen_port(unsigned short port) { this->listen_port_ = port; }
+	bool is_ssl() const { return this->is_ssl_; }
+	unsigned short listen_port() const { return this->listen_port_; }
+
 	class ModuleSeries : public WFServerTask<protocol::HttpRequest,
 											 protocol::HttpResponse>::Series
 	{
@@ -135,6 +137,8 @@ protected:
 	std::string req_keep_alive_;
 	RPCModuleData module_data_;
 	std::list<RPCModule *> modules_;
+	bool is_ssl_;
+	unsigned short listen_port_;
 };
 
 } // end namespace srpc

+ 0 - 1
src/module/CMakeLists.txt

@@ -7,7 +7,6 @@ set(PROTO_LIST
 	proto/opentelemetry_common.proto
 	proto/opentelemetry_resource.proto
 	proto/opentelemetry_trace.proto
-	proto/opentelemetry_trace_service.proto
 	proto/opentelemetry_metrics.proto
 	proto/opentelemetry_metrics_service.proto)
 

+ 21 - 0
src/module/proto/opentelemetry_trace.proto

@@ -20,6 +20,27 @@ import "opentelemetry_common.proto";
 import "opentelemetry_resource.proto";
 
 // A collection of InstrumentationLibrarySpans from a Resource.
+
+// TracesData represents the traces data that can be stored in a persistent storage,
+// OR can be embedded by other protocols that transfer OTLP traces data but do
+// not implement the OTLP protocol.
+//
+// The main difference between this message and collector protocol is that
+// in this message there will not be any "control" or "metadata" specific to
+// OTLP protocol.
+//
+// When new fields are added into this message, the OTLP request MUST be updated
+// as well.
+message TracesData {
+  // An array of ResourceSpans.
+  // For data coming from a single resource this array will typically contain
+  // one element. Intermediary nodes that receive data from multiple origins
+  // typically batch the data before forwarding further and in that case this
+  // array will contain multiple elements.
+  repeated ResourceSpans resource_spans = 1;
+}
+
+// A collection of ScopeSpans from a Resource.
 message ResourceSpans {
   // The resource for the spans in this message.
   // If this field is not set then no resource info is known.

+ 0 - 40
src/module/proto/opentelemetry_trace_service.proto

@@ -1,40 +0,0 @@
-// Copyright 2019, OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package opentelemetry.proto.collector.trace.v1;
-
-import "opentelemetry_trace.proto";
-
-// Service that can be used to push spans between one Application instrumented with
-// OpenTelemetry and a collector, or between a collector and a central collector (in this
-// case spans are sent/received to/from multiple Applications).
-service TraceService {
-  // For performance reasons, it is recommended to keep this RPC
-  // alive for the entire life of the application.
-  rpc Export(ExportTraceServiceRequest) returns (ExportTraceServiceResponse) {}
-}
-
-message ExportTraceServiceRequest {
-  // An array of ResourceSpans.
-  // For data coming from a single resource this array will typically contain one
-  // element. Intermediary nodes (such as OpenTelemetry Collector) that receive
-  // data from multiple origins typically batch the data before forwarding further and
-  // in that case this array will contain multiple elements.
-  repeated opentelemetry.proto.trace.v1.ResourceSpans resource_spans = 1;
-}
-
-message ExportTraceServiceResponse {
-}

+ 2 - 0
src/module/rpc_filter.h

@@ -29,6 +29,8 @@ static constexpr unsigned int	OTLP_HTTP_REDIRECT_MAX		= 0;
 static constexpr unsigned int	OTLP_HTTP_RETRY_MAX			= 1;
 static constexpr const char	   *OTLP_SERVICE_NAME			= "service.name";
 static constexpr const char	   *OTLP_METHOD_NAME			= "operation.name";
+static constexpr const char	   *SRPC_HTTP_METHOD			= "http.method";
+static constexpr const char	   *SRPC_HTTP_STATUS_CODE		= "http.status_code";
 static constexpr size_t			RPC_REPORT_THREHOLD_DEFAULT	= 100;
 static constexpr size_t			RPC_REPORT_INTERVAL_DEFAULT	= 1000; /* msec */
 

+ 1 - 13
src/module/rpc_module.h

@@ -36,19 +36,7 @@ static constexpr char const *SRPC_SPAN_MESSAGE		= "message";
 static constexpr char const *SRPC_START_TIMESTAMP	= "srpc.start_time";
 static constexpr char const *SRPC_FINISH_TIMESTAMP	= "srpc.finish_time";
 static constexpr char const *SRPC_DURATION			= "srpc.duration";
-
-static constexpr char const *SRPC_HTTP_METHOD		= "http.method";
-static constexpr char const *SRPC_HTTP_STATUS_CODE	= "http.status_code";
-static constexpr char const *SRPC_HTTP_SOCK_FAMILY	= "net.sock.family";
-static constexpr char const *SRPC_HTTP_SOCK_ADDR	= "net.sock.peer.addr";
-static constexpr char const *SRPC_HTTP_SOCK_PORT	= "net.sock.peer.port";
-static constexpr char const *SRPC_HTTP_REQ_LEN		= "http.request_content_length";
-static constexpr char const *SRPC_HTTP_RESP_LEN		= "http.response_content_length";
-
-// /users/12314/?q=ddds
-static constexpr char const *SRPC_HTTP_TARGET		= "http.target";
-// The IP address of the original client behind all proxies, from X-Forwarded-For
-static constexpr char const *SRPC_HTTP_CLIENT_IP	= "http.client_ip";
+static constexpr char const *SRPC_TIMEOUT_REASON	= "srpc.timeout_reason";
 
 //for SnowFlake: u_id = [timestamp][group][machine][sequence]
 static constexpr int SRPC_TIMESTAMP_BITS		= 38;

+ 103 - 33
src/module/rpc_trace_filter.cc

@@ -3,12 +3,11 @@
 #include "workflow/WFTask.h"
 #include "workflow/HttpUtil.h"
 #include "rpc_trace_filter.h"
-#include "opentelemetry_trace_service.pb.h"
+#include "opentelemetry_trace.pb.h"
 
 namespace srpc
 {
 
-using namespace opentelemetry::proto::collector::trace::v1;
 using namespace opentelemetry::proto::trace::v1;
 using namespace opentelemetry::proto::common::v1;
 using namespace opentelemetry::proto::resource::v1;
@@ -16,7 +15,7 @@ using namespace opentelemetry::proto::resource::v1;
 static InstrumentationLibrarySpans *
 rpc_span_fill_pb_request(const RPCModuleData& data,
 		const std::unordered_map<std::string, std::string>& attributes,
-		ExportTraceServiceRequest *req)
+		TracesData *req)
 {
 	ResourceSpans *rs = req->add_resource_spans();
 	InstrumentationLibrarySpans *spans = rs->add_instrumentation_library_spans();
@@ -57,26 +56,76 @@ static void rpc_span_fill_pb_span(RPCModuleData& data,
 								  InstrumentationLibrarySpans *spans)
 {
 	Span *span = spans->add_spans();
+	Status *status = span->mutable_status();
+	KeyValue *attribute;
+	AnyValue *value;
 
 	span->set_span_id(data[SRPC_SPAN_ID].c_str(), SRPC_SPANID_SIZE);
 	span->set_trace_id(data[SRPC_TRACE_ID].c_str(), SRPC_TRACEID_SIZE);
-	span->set_name(data[OTLP_METHOD_NAME]);
+
+	// name is required and specified in OpenTelemetry semantic conventions.
+	auto iter = data.find(OTLP_METHOD_NAME);
+	if (iter != data.end())
+		span->set_name(data[OTLP_METHOD_NAME]); // for RPC
+	else
+		span->set_name(data[SRPC_HTTP_METHOD]); // for HTTP
+
+	// refer to : trace/semantic_conventions/http/#status
+	int http_status_code = 0;
+	iter = data.find(SRPC_HTTP_STATUS_CODE);
+	if (iter != data.end())
+		http_status_code = atoi(data[SRPC_HTTP_STATUS_CODE].c_str());
 
 	for (const auto& iter : data)
 	{
-		if (iter.first.compare(SRPC_PARENT_SPAN_ID) == 0)
+		const std::string& key = iter.first;
+
+		if (key.compare(SRPC_PARENT_SPAN_ID) == 0)
+		{
 			span->set_parent_span_id(iter.second);
-		else if (iter.first.compare(SRPC_SPAN_KIND) == 0)
+		}
+		else if (key.compare(SRPC_SPAN_KIND) == 0)
 		{
 			if (iter.second.compare(SRPC_SPAN_KIND_CLIENT) == 0)
+			{
 				span->set_kind(Span_SpanKind_SPAN_KIND_CLIENT);
+				if (http_status_code >= 400)
+					status->set_code(Status_StatusCode_STATUS_CODE_ERROR);
+			}
 			else if (iter.second.compare(SRPC_SPAN_KIND_SERVER) == 0)
+			{
 				span->set_kind(Span_SpanKind_SPAN_KIND_SERVER);
+				if (http_status_code >= 500)
+					status->set_code(Status_StatusCode_STATUS_CODE_ERROR);
+			}
 		}
-		else if (iter.first.compare(SRPC_START_TIMESTAMP) == 0)
+		else if (key.compare(SRPC_START_TIMESTAMP) == 0)
+		{
 			span->set_start_time_unix_nano(atoll(data[SRPC_START_TIMESTAMP].data()));
-		else if (iter.first.compare(SRPC_FINISH_TIMESTAMP) == 0)
+		}
+		else if (key.compare(SRPC_FINISH_TIMESTAMP) == 0)
+		{
 			span->set_end_time_unix_nano(atoll(data[SRPC_FINISH_TIMESTAMP].data()));
+		}
+		else if (key.compare(0, 5, "srpc.") != 0)
+		{
+			attribute= span->add_attributes();
+			attribute->set_key(key);
+			value = attribute->mutable_value();
+
+			size_t len = key.length();
+			if ((len > 4 && key.substr(len - 4).compare("port") == 0) ||
+				(len > 5 && key.substr(len - 5).compare("count") == 0) ||
+				(len > 6 && key.substr(len - 6).compare("length") == 0) ||
+				key.compare(SRPC_HTTP_STATUS_CODE)== 0)
+			{
+				value->set_int_value(atoi(iter.second.c_str()));
+			}
+			else
+			{
+				value->set_string_value(iter.second);
+			}
+		}
 	}
 }
 
@@ -104,30 +153,26 @@ static size_t rpc_span_log_format(RPCModuleData& data, char *str, size_t len)
 						parent_span_id_buf);
 	}
 
-	if (data.find(OTLP_SERVICE_NAME) != data.end()) // for RPC
-	{
-		ret += snprintf(str + ret, len - ret, " service: %s method: %s",
-						data[OTLP_SERVICE_NAME].c_str(),
-						data[OTLP_METHOD_NAME].c_str());
-	}
-
-	// TODO : add some method for HTTP
-
 	ret += snprintf(str + ret, len - ret, " start_time: %s finish_time: %s"
-										  " duration: %s(ns)"
-										  " remote_ip: %s port: %s"
-										  " state: %s error: %s",
+										  " duration: %s(ns)",
 					data[SRPC_START_TIMESTAMP].c_str(),
 					data[SRPC_FINISH_TIMESTAMP].c_str(),
-					data[SRPC_DURATION].c_str(),
-					data[SRPC_REMOTE_IP].c_str(),
-					data[SRPC_REMOTE_PORT].c_str(),
-					data[SRPC_STATE].c_str(),
-					data[SRPC_ERROR].c_str());
+					data[SRPC_DURATION].c_str());
 
 	for (const auto& it : data)
 	{
-		if (strncmp(it.first.c_str(), SRPC_SPAN_LOG, strlen(SRPC_SPAN_LOG)) == 0)
+		if (strcmp(it.first.c_str(), SRPC_START_TIMESTAMP) == 0 ||
+			strcmp(it.first.c_str(), SRPC_FINISH_TIMESTAMP) == 0 ||
+			strcmp(it.first.c_str(), SRPC_DURATION) == 0 ||
+			strcmp(it.first.c_str(), SRPC_TRACE_ID) == 0 ||
+			strcmp(it.first.c_str(), SRPC_SPAN_ID) == 0 ||
+			strcmp(it.first.c_str(), SRPC_PARENT_SPAN_ID) == 0)
+		{
+			continue;
+		}
+
+		if (strcmp(it.first.c_str(), SRPC_SPAN_LOG) == 0)
+		{
 			ret += snprintf(str + ret, len - ret,
 							"\n%s trace_id: %s span_id: %s"
 							" timestamp: %s %s",
@@ -136,6 +181,15 @@ static size_t rpc_span_log_format(RPCModuleData& data, char *str, size_t len)
 							span_id_buf,
 							it.first.c_str() + strlen(SRPC_SPAN_LOG) + 1,
 							it.second.c_str());
+		}
+		else
+		{
+			const char * key = it.first.c_str();
+			if (it.first.compare(0, 5, "srpc.") == 0)
+				key += 5;
+			ret += snprintf(str + ret, len - ret, " %s: %s",
+							key, it.second.c_str());
+		}
 	}
 
 	return ret;
@@ -232,7 +286,7 @@ RPCTraceOpenTelemetry::RPCTraceOpenTelemetry(const std::string& url) :
 	report_status(false),
 	report_span_count(0)
 {
-	this->report_req = new ExportTraceServiceRequest;
+	this->report_req = new TracesData;
 }
 
 RPCTraceOpenTelemetry::RPCTraceOpenTelemetry(const std::string& url,
@@ -249,7 +303,7 @@ RPCTraceOpenTelemetry::RPCTraceOpenTelemetry(const std::string& url,
 	report_status(false),
 	report_span_count(0)
 {
-	this->report_req = new ExportTraceServiceRequest;
+	this->report_req = new TracesData;
 }
 
 RPCTraceOpenTelemetry::~RPCTraceOpenTelemetry()
@@ -261,7 +315,7 @@ SubTask *RPCTraceOpenTelemetry::create(RPCModuleData& span)
 {
 	std::string *output = new std::string;
 	SubTask *next = NULL;
-	ExportTraceServiceRequest *req = (ExportTraceServiceRequest *)this->report_req;
+	TracesData *req = (TracesData *)this->report_req;
 
 	this->mutex.lock();
 	if (!this->report_status)
@@ -320,18 +374,34 @@ bool RPCTraceOpenTelemetry::filter(RPCModuleData& data)
 {
 	std::unordered_map<std::string, google::protobuf::Message *>::iterator it;
 	InstrumentationLibrarySpans *spans;
+	std::string service_name;
 	bool ret;
-	const std::string& service_name = data[OTLP_SERVICE_NAME];
+
+	auto iter = data.find(OTLP_SERVICE_NAME);
+	if (iter != data.end())
+	{
+		service_name = iter->second;
+	}
+	else // for HTTP
+	{
+		service_name = data[SRPC_COMPONENT] + std::string(".") +
+					   data[SRPC_HTTP_SCHEME];
+
+		if (data.find(SRPC_SPAN_KIND_CLIENT) != data.end())
+			service_name += ".client";
+		else
+			service_name += ".server";
+	}
 
 	this->mutex.lock();
 	if (this->filter_policy.collect(data))
 	{
 		++this->report_span_count;
-		it = report_map.find(service_name);
-		if (it == report_map.end())
+		it = this->report_map.find(service_name);
+		if (it == this->report_map.end())
 		{
 			spans = rpc_span_fill_pb_request(data, this->attributes,
-							(ExportTraceServiceRequest *)this->report_req);
+											 (TracesData *)this->report_req);
 			this->report_map.insert({service_name, spans});
 		}
 		else

+ 22 - 0
src/module/rpc_trace_module.h

@@ -57,6 +57,28 @@ static constexpr char const *SRPC_SAMPLING_PRIO		= "srpc.sampling.priority";
 static constexpr char const *SRPC_DATA_TYPE			= "srpc.data.type";
 static constexpr char const *SRPC_COMPRESS_TYPE		= "srpc.compress.type";
 
+// for http
+static constexpr char const *SRPC_HTTP_SOCK_FAMILY	= "net.sock.family";
+static constexpr char const *SRPC_HTTP_SOCK_ADDR	= "net.sock.peer.addr";
+static constexpr char const *SRPC_HTTP_SOCK_PORT	= "net.sock.peer.port";
+static constexpr char const *SRPC_HTTP_REQ_LEN		= "http.request_content_length";
+static constexpr char const *SRPC_HTTP_RESP_LEN		= "http.response_content_length";
+
+// for http client
+static constexpr char const *SRPC_HTTP_CLIENT_URL	= "http.url";
+static constexpr char const *SRPC_HTTP_PEER_NAME	= "net.peer.name";
+static constexpr char const *SRPC_HTTP_PEER_PORT	= "net.peer.port";
+static constexpr char const *SRPC_HTTP_RESEND_COUNT	= "http.resend_count";
+
+// for http server
+static constexpr char const *SRPC_HTTP_SCHEME		= "http.scheme";
+static constexpr char const *SRPC_HTTP_HOST_NAME	= "net.host.name";
+static constexpr char const *SRPC_HTTP_HOST_PORT	= "net.host.port";
+// /users/12314/?q=ddds
+static constexpr char const *SRPC_HTTP_TARGET		= "http.target";
+// The IP address of the original client behind all proxies, from X-Forwarded-For
+static constexpr char const *SRPC_HTTP_CLIENT_IP	= "http.client_ip";
+
 // Basic TraceModule for generating general span data.
 // Each kind of network task can derived its own TraceModule.
 

+ 14 - 11
tutorial/CMakeLists.txt

@@ -210,15 +210,18 @@ foreach(src ${TUTORIAL_HELLOWORLD_LIST})
 	add_dependencies(${bin_name} TURORIAL_GEN)
 endforeach()
 
-set(TUTORIAL_HTTP_LIST
-	tutorial-17-http_server
-	tutorial-18-http_client
-)
+if (NOT WIN32)
+	set(TUTORIAL_HTTP_LIST
+		tutorial-17-http_server
+		tutorial-18-http_client
+	)
+
+	foreach(src ${TUTORIAL_HTTP_LIST})
+		string(REPLACE "-" ";" arr ${src})
+		list(GET arr -1 bin_name)
+		add_executable(${bin_name} ${src}.cc)
+		target_link_libraries(${bin_name} ${SRPC_LIB})
+		add_dependencies(${bin_name} TURORIAL_GEN)
+	endforeach()
+endif ()
 
-foreach(src ${TUTORIAL_HTTP_LIST})
-	string(REPLACE "-" ";" arr ${src})
-	list(GET arr -1 bin_name)
-	add_executable(${bin_name} ${src}.cc)
-	target_link_libraries(${bin_name} ${SRPC_LIB})
-	add_dependencies(${bin_name} TURORIAL_GEN)
-endforeach()

+ 3 - 1
tutorial/tutorial-17-http_server.cc

@@ -25,6 +25,7 @@ static WFFacilities::WaitGroup wait_group(1);
 
 srpc::RPCMetricsPull  exporter;
 srpc::RPCTraceDefault trace_log;
+//srpc::RPCTraceOpenTelemetry otel("http://127.0.0.1:8081");
 
 static void sig_handler(int signo)
 {
@@ -52,7 +53,8 @@ int main()
 	exporter.create_summary("echo_test_quantiles", "Test quantile",
 						  {{0.5, 0.05}, {0.9, 0.01}});
 
-	server.add_filter(&trace_log);	
+	server.add_filter(&trace_log);
+//	server.add_filter(&otel);
 	server.add_filter(&exporter);
 
 	if (server.start(1412) == 0)

+ 25 - 18
tutorial/tutorial-18-http_client.cc

@@ -1,11 +1,11 @@
 /*
-  Copyright (c) 2020 sogou, Inc.
+  Copyright (c) 2023 sogou, Inc.
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at
 
-      http://www.apache.org/licenses/LICENSE-2.0
+	  http://www.apache.org/licenses/LICENSE-2.0
 
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,35 +25,42 @@
 
 using namespace srpc;
 
+#define REDIRECT_MAX	5
+#define RETRY_MAX		2
+
 srpc::RPCTraceDefault trace_log;
 static WFFacilities::WaitGroup wait_group(1); 
 
 void callback(WFHttpTask *task)
 {
-    int state = task->get_state();
-    int error = task->get_error();
-    fprintf(stderr, "callback. state = %d error = %d\n", state, error);
-
-     if (state == WFT_STATE_SUCCESS) // print server response body
-     {
-        const void *body;
-        size_t body_len;
-
-        task->get_resp()->get_parsed_body(&body, &body_len);
-        fwrite(body, 1, body_len, stdout);
-        fflush(stdout);
-     }
+	int state = task->get_state();
+	int error = task->get_error();
+	fprintf(stderr, "callback. state = %d error = %d\n", state, error);
+
+	if (state == WFT_STATE_SUCCESS) // print server response body
+	{
+		const void *body;
+		size_t body_len;
+
+		task->get_resp()->get_parsed_body(&body, &body_len);
+		fwrite(body, 1, body_len, stdout);
+		fflush(stdout);
+		fprintf(stderr, "\nfinish print body. body_len = %zu\n", body_len);
+	}
 }
 
 int main()
 {
-	srpc::HttpClient client("http://127.0.0.1:1412");
+	srpc::HttpClient client;
 	client.add_filter(&trace_log);
 
-	WFHttpTask *task = client.create_http_task(callback);
+	WFHttpTask *task = client.create_http_task("http://127.0.0.1:1412",
+											   REDIRECT_MAX,
+											   RETRY_MAX,
+											   callback);
 	task->start();
-
 	wait_group.wait();
+
 	return 0;
 }