123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- #include <stdio.h>
- #include <limits.h>
- #include "workflow/WFTask.h"
- #include "workflow/HttpUtil.h"
- #include "rpc_trace_filter.h"
- #include "opentelemetry_trace.pb.h"
- namespace srpc
- {
- using namespace opentelemetry::proto::trace::v1;
- using namespace opentelemetry::proto::common::v1;
- using namespace opentelemetry::proto::resource::v1;
- static InstrumentationLibrarySpans *
- rpc_span_fill_pb_request(const RPCModuleData& data,
- const std::unordered_map<std::string, std::string>& attributes,
- TracesData *req)
- {
- ResourceSpans *rs = req->add_resource_spans();
- InstrumentationLibrarySpans *spans = rs->add_instrumentation_library_spans();
- Resource *resource = rs->mutable_resource();
- KeyValue *attribute;
- AnyValue *value;
- auto iter = data.find(OTLP_METHOD_NAME);
- if (iter != data.end())
- {
- attribute = resource->add_attributes();
- attribute->set_key(OTLP_METHOD_NAME);
- value = attribute->mutable_value();
- value->set_string_value(iter->second);
- }
- for (const auto& attr : attributes)
- {
- KeyValue *attribute = resource->add_attributes();
- attribute->set_key(attr.first);
- AnyValue *value = attribute->mutable_value();
- value->set_string_value(attr.second);
- }
- iter = data.find(OTLP_SERVICE_NAME); // if attributes also set service.name, data takes precedence
- if (iter != data.end())
- {
- attribute = resource->add_attributes();
- attribute->set_key(OTLP_SERVICE_NAME);
- value = attribute->mutable_value();
- value->set_string_value(iter->second);
- }
- return spans;
- }
- static void rpc_span_fill_pb_span(RPCModuleData& data,
- InstrumentationLibrarySpans *spans)
- {
- Span *span = spans->add_spans();
- Status *status = span->mutable_status();
- KeyValue *attribute;
- AnyValue *value;
- span->set_span_id(data[SRPC_SPAN_ID].c_str(), SRPC_SPANID_SIZE);
- span->set_trace_id(data[SRPC_TRACE_ID].c_str(), SRPC_TRACEID_SIZE);
- // name is required and specified in OpenTelemetry semantic conventions.
- auto iter = data.find(OTLP_METHOD_NAME);
- if (iter != data.end())
- span->set_name(data[OTLP_METHOD_NAME]); // for RPC
- else
- span->set_name(data[SRPC_HTTP_METHOD]); // for HTTP
- // refer to : trace/semantic_conventions/http/#status
- int http_status_code = 0;
- iter = data.find(SRPC_HTTP_STATUS_CODE);
- if (iter != data.end())
- http_status_code = atoi(data[SRPC_HTTP_STATUS_CODE].c_str());
- for (const auto& iter : data)
- {
- const std::string& key = iter.first;
- if (key.compare(SRPC_PARENT_SPAN_ID) == 0)
- {
- span->set_parent_span_id(iter.second);
- }
- else if (key.compare(SRPC_SPAN_KIND) == 0)
- {
- if (iter.second.compare(SRPC_SPAN_KIND_CLIENT) == 0)
- {
- span->set_kind(Span_SpanKind_SPAN_KIND_CLIENT);
- if (http_status_code >= 400)
- status->set_code(Status_StatusCode_STATUS_CODE_ERROR);
- }
- else if (iter.second.compare(SRPC_SPAN_KIND_SERVER) == 0)
- {
- span->set_kind(Span_SpanKind_SPAN_KIND_SERVER);
- if (http_status_code >= 500)
- status->set_code(Status_StatusCode_STATUS_CODE_ERROR);
- }
- }
- else if (key.compare(SRPC_START_TIMESTAMP) == 0)
- {
- span->set_start_time_unix_nano(atoll(data[SRPC_START_TIMESTAMP].data()));
- }
- else if (key.compare(SRPC_FINISH_TIMESTAMP) == 0)
- {
- span->set_end_time_unix_nano(atoll(data[SRPC_FINISH_TIMESTAMP].data()));
- }
- else if (key.compare(0, 5, "srpc.") != 0)
- {
- attribute= span->add_attributes();
- attribute->set_key(key);
- value = attribute->mutable_value();
- size_t len = key.length();
- if ((len > 4 && key.substr(len - 4).compare("port") == 0) ||
- (len > 5 && key.substr(len - 5).compare("count") == 0) ||
- (len > 6 && key.substr(len - 6).compare("length") == 0) ||
- key.compare(SRPC_HTTP_STATUS_CODE)== 0)
- {
- value->set_int_value(atoi(iter.second.c_str()));
- }
- else
- {
- value->set_string_value(iter.second);
- }
- }
- }
- }
- static size_t rpc_span_log_format(RPCModuleData& data, char *str, size_t len)
- {
- const uint64_t *trace_id = (const uint64_t *)data[SRPC_TRACE_ID].c_str();
- const uint64_t *span_id = (const uint64_t *)data[SRPC_SPAN_ID].c_str();
- char trace_id_buf[SRPC_TRACEID_SIZE * 2 + 1];
- char span_id_buf[SRPC_SPANID_SIZE * 2 + 1];
- TRACE_ID_BIN_TO_HEX(trace_id, trace_id_buf);
- SPAN_ID_BIN_TO_HEX(span_id, span_id_buf);
- size_t ret = snprintf(str, len, "trace_id: %s span_id: %s",
- trace_id_buf, span_id_buf);
- auto iter = data.find(SRPC_PARENT_SPAN_ID);
- if (iter != data.end())
- {
- char parent_span_id_buf[SRPC_SPANID_SIZE * 2 + 1];
- span_id = (const uint64_t *)iter->second.c_str();
- SPAN_ID_BIN_TO_HEX(span_id, parent_span_id_buf);
- ret += snprintf(str + ret, len - ret, " parent_span_id: %s",
- parent_span_id_buf);
- }
- ret += snprintf(str + ret, len - ret, " start_time: %s finish_time: %s"
- " duration: %s(ns)",
- data[SRPC_START_TIMESTAMP].c_str(),
- data[SRPC_FINISH_TIMESTAMP].c_str(),
- data[SRPC_DURATION].c_str());
- for (const auto& it : data)
- {
- if (strcmp(it.first.c_str(), SRPC_START_TIMESTAMP) == 0 ||
- strcmp(it.first.c_str(), SRPC_FINISH_TIMESTAMP) == 0 ||
- strcmp(it.first.c_str(), SRPC_DURATION) == 0 ||
- strcmp(it.first.c_str(), SRPC_TRACE_ID) == 0 ||
- strcmp(it.first.c_str(), SRPC_SPAN_ID) == 0 ||
- strcmp(it.first.c_str(), SRPC_PARENT_SPAN_ID) == 0)
- {
- continue;
- }
- if (strcmp(it.first.c_str(), SRPC_SPAN_LOG) == 0)
- {
- ret += snprintf(str + ret, len - ret,
- "\n%s trace_id: %s span_id: %s"
- " timestamp: %s %s",
- "[ANNOTATION]",
- trace_id_buf,
- span_id_buf,
- it.first.c_str() + strlen(SRPC_SPAN_LOG) + 1,
- it.second.c_str());
- }
- else
- {
- const char * key = it.first.c_str();
- if (it.first.compare(0, 5, "srpc.") == 0)
- key += 5;
- ret += snprintf(str + ret, len - ret, " %s: %s",
- key, it.second.c_str());
- }
- }
- return ret;
- }
- bool RPCTraceFilterPolicy::collect(RPCModuleData& span)
- {
- if (span.find(SRPC_TRACE_ID) == span.end())
- return false;
- long long timestamp = GET_CURRENT_MS();
- if (timestamp < this->last_collect_timestamp + this->stat_interval &&
- this->spans_interval_count < this->spans_per_interval &&
- this->spans_second_count < this->spans_per_sec)
- {
- this->spans_interval_count++;
- this->spans_second_count++;
- return true;
- }
- else if (timestamp >= this->last_collect_timestamp + this->stat_interval &&
- this->spans_per_sec)
- {
- this->spans_interval_count = 0;
- if (timestamp / 1000 > this->last_collect_timestamp / 1000) // next second
- this->spans_second_count = 0;
- this->last_collect_timestamp = timestamp;
- if (this->spans_second_count < this->spans_per_sec)
- {
- this->spans_second_count++;
- this->spans_interval_count++;
- return true;
- }
- }
- return false;
- }
- bool RPCTraceFilterPolicy::report(size_t count)
- {
- long long timestamp = GET_CURRENT_MS();
- if (this->last_report_timestamp == 0)
- this->last_report_timestamp = timestamp;
- if (timestamp > this->last_report_timestamp + (long long)this->report_interval ||
- count >= this->report_threshold)
- {
- this->last_report_timestamp = timestamp;
- return true;
- }
- return false;
- }
- void RPCTraceLogTask::dispatch()
- {
- char str[SPAN_LOG_MAX_LENGTH];
- rpc_span_log_format(this->span, str, SPAN_LOG_MAX_LENGTH);
- fprintf(stderr, "[SPAN_LOG] %s\n", str);
- this->subtask_done();
- }
- SubTask *RPCTraceRedis::create(RPCModuleData& span)
- {
- auto iter = span.find(SRPC_TRACE_ID);
- if (iter == span.end())
- return WFTaskFactory::create_empty_task();
- auto *task = WFTaskFactory::create_redis_task(this->redis_url,
- this->retry_max,
- nullptr);
- protocol::RedisRequest *req = task->get_req();
- char value[SPAN_LOG_MAX_LENGTH];
- value[0] = '0';
- rpc_span_log_format(span, value, SPAN_LOG_MAX_LENGTH);
- req->set_request("SET", { span[SRPC_TRACE_ID], value} );
- return task;
- }
- RPCTraceOpenTelemetry::RPCTraceOpenTelemetry(const std::string& url) :
- RPCFilter(RPCModuleTypeTrace),
- url(url + OTLP_TRACES_PATH),
- redirect_max(OTLP_HTTP_REDIRECT_MAX),
- retry_max(OTLP_HTTP_RETRY_MAX),
- filter_policy(SPANS_PER_SECOND_DEFAULT,
- RPC_REPORT_THREHOLD_DEFAULT,
- RPC_REPORT_INTERVAL_DEFAULT),
- report_status(false),
- report_span_count(0)
- {
- this->report_req = new TracesData;
- }
- RPCTraceOpenTelemetry::RPCTraceOpenTelemetry(const std::string& url,
- int redirect_max,
- int retry_max,
- size_t spans_per_second,
- size_t report_threshold,
- size_t report_interval) :
- RPCFilter(RPCModuleTypeTrace),
- url(url + OTLP_TRACES_PATH),
- redirect_max(redirect_max),
- retry_max(retry_max),
- filter_policy(spans_per_second, report_threshold, report_interval),
- report_status(false),
- report_span_count(0)
- {
- this->report_req = new TracesData;
- }
- RPCTraceOpenTelemetry::~RPCTraceOpenTelemetry()
- {
- delete this->report_req;
- }
- SubTask *RPCTraceOpenTelemetry::create(RPCModuleData& span)
- {
- std::string *output = new std::string;
- SubTask *next = NULL;
- TracesData *req = (TracesData *)this->report_req;
- this->mutex.lock();
- if (!this->report_status)
- next = WFTaskFactory::create_empty_task();
- else
- {
- req->SerializeToString(output);
- this->report_status = false;
- this->report_span_count = 0;
- req->clear_resource_spans();
- this->report_map.clear();
- }
- this->mutex.unlock();
- if (next)
- return next;
- WFHttpTask *task = WFTaskFactory::create_http_task(this->url,
- this->redirect_max,
- this->retry_max,
- [](WFHttpTask *task) {
- delete (std::string *)task->user_data;
- });
- protocol::HttpRequest *http_req = task->get_req();
- http_req->set_method(HttpMethodPost);
- http_req->add_header_pair("Content-Type", "application/x-protobuf");
- task->user_data = output;
- http_req->append_output_body_nocopy(output->c_str(), output->length());
- return task;
- }
- void RPCTraceOpenTelemetry::add_attributes(const std::string& key,
- const std::string& value)
- {
- this->mutex.lock();
- this->attributes.insert(std::make_pair(key, value));
- this->mutex.unlock();
- }
- size_t RPCTraceOpenTelemetry::clear_attributes()
- {
- size_t ret;
- this->mutex.lock();
- ret = this->attributes.size();
- this->attributes.clear();
- this->mutex.unlock();
- return ret;
- }
- bool RPCTraceOpenTelemetry::filter(RPCModuleData& data)
- {
- std::unordered_map<std::string, google::protobuf::Message *>::iterator it;
- InstrumentationLibrarySpans *spans;
- std::string service_name;
- bool ret;
- auto iter = data.find(OTLP_SERVICE_NAME);
- if (iter != data.end())
- {
- service_name = iter->second;
- }
- else // for HTTP
- {
- service_name = data[SRPC_COMPONENT] + std::string(".") +
- data[SRPC_HTTP_SCHEME];
- if (data.find(SRPC_SPAN_KIND_CLIENT) != data.end())
- service_name += ".client";
- else
- service_name += ".server";
- }
- this->mutex.lock();
- if (this->filter_policy.collect(data))
- {
- ++this->report_span_count;
- it = this->report_map.find(service_name);
- if (it == this->report_map.end())
- {
- spans = rpc_span_fill_pb_request(data, this->attributes,
- (TracesData *)this->report_req);
- this->report_map.insert({service_name, spans});
- }
- else
- spans = (InstrumentationLibrarySpans *)it->second;
- rpc_span_fill_pb_span(data, spans);
- }
- ret = this->filter_policy.report(this->report_span_count);
- if (ret)
- this->report_status = true;
- this->mutex.unlock();
- return ret;
- }
- } // end namespace srpc
|