rpc_trace_filter.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. #include <stdio.h>
  2. #include <limits.h>
  3. #include "workflow/WFTask.h"
  4. #include "workflow/HttpUtil.h"
  5. #include "rpc_trace_filter.h"
  6. #include "opentelemetry_trace.pb.h"
  7. namespace srpc
  8. {
  9. using namespace opentelemetry::proto::trace::v1;
  10. using namespace opentelemetry::proto::common::v1;
  11. using namespace opentelemetry::proto::resource::v1;
  12. static InstrumentationLibrarySpans *
  13. rpc_span_fill_pb_request(const RPCModuleData& data,
  14. const std::unordered_map<std::string, std::string>& attributes,
  15. TracesData *req)
  16. {
  17. ResourceSpans *rs = req->add_resource_spans();
  18. InstrumentationLibrarySpans *spans = rs->add_instrumentation_library_spans();
  19. Resource *resource = rs->mutable_resource();
  20. KeyValue *attribute;
  21. AnyValue *value;
  22. auto iter = data.find(OTLP_METHOD_NAME);
  23. if (iter != data.end())
  24. {
  25. attribute = resource->add_attributes();
  26. attribute->set_key(OTLP_METHOD_NAME);
  27. value = attribute->mutable_value();
  28. value->set_string_value(iter->second);
  29. }
  30. for (const auto& attr : attributes)
  31. {
  32. KeyValue *attribute = resource->add_attributes();
  33. attribute->set_key(attr.first);
  34. AnyValue *value = attribute->mutable_value();
  35. value->set_string_value(attr.second);
  36. }
  37. iter = data.find(OTLP_SERVICE_NAME); // if attributes also set service.name, data takes precedence
  38. if (iter != data.end())
  39. {
  40. attribute = resource->add_attributes();
  41. attribute->set_key(OTLP_SERVICE_NAME);
  42. value = attribute->mutable_value();
  43. value->set_string_value(iter->second);
  44. }
  45. return spans;
  46. }
  47. static void rpc_span_fill_pb_span(RPCModuleData& data,
  48. InstrumentationLibrarySpans *spans)
  49. {
  50. Span *span = spans->add_spans();
  51. Status *status = span->mutable_status();
  52. KeyValue *attribute;
  53. AnyValue *value;
  54. span->set_span_id(data[SRPC_SPAN_ID].c_str(), SRPC_SPANID_SIZE);
  55. span->set_trace_id(data[SRPC_TRACE_ID].c_str(), SRPC_TRACEID_SIZE);
  56. // name is required and specified in OpenTelemetry semantic conventions.
  57. auto iter = data.find(OTLP_METHOD_NAME);
  58. if (iter != data.end())
  59. span->set_name(data[OTLP_METHOD_NAME]); // for RPC
  60. else
  61. span->set_name(data[SRPC_HTTP_METHOD]); // for HTTP
  62. // refer to : trace/semantic_conventions/http/#status
  63. int http_status_code = 0;
  64. iter = data.find(SRPC_HTTP_STATUS_CODE);
  65. if (iter != data.end())
  66. http_status_code = atoi(data[SRPC_HTTP_STATUS_CODE].c_str());
  67. for (const auto& iter : data)
  68. {
  69. const std::string& key = iter.first;
  70. if (key.compare(SRPC_PARENT_SPAN_ID) == 0)
  71. {
  72. span->set_parent_span_id(iter.second);
  73. }
  74. else if (key.compare(SRPC_SPAN_KIND) == 0)
  75. {
  76. if (iter.second.compare(SRPC_SPAN_KIND_CLIENT) == 0)
  77. {
  78. span->set_kind(Span_SpanKind_SPAN_KIND_CLIENT);
  79. if (http_status_code >= 400)
  80. status->set_code(Status_StatusCode_STATUS_CODE_ERROR);
  81. }
  82. else if (iter.second.compare(SRPC_SPAN_KIND_SERVER) == 0)
  83. {
  84. span->set_kind(Span_SpanKind_SPAN_KIND_SERVER);
  85. if (http_status_code >= 500)
  86. status->set_code(Status_StatusCode_STATUS_CODE_ERROR);
  87. }
  88. }
  89. else if (key.compare(SRPC_START_TIMESTAMP) == 0)
  90. {
  91. span->set_start_time_unix_nano(atoll(data[SRPC_START_TIMESTAMP].data()));
  92. }
  93. else if (key.compare(SRPC_FINISH_TIMESTAMP) == 0)
  94. {
  95. span->set_end_time_unix_nano(atoll(data[SRPC_FINISH_TIMESTAMP].data()));
  96. }
  97. else if (key.compare(0, 5, "srpc.") != 0)
  98. {
  99. attribute= span->add_attributes();
  100. attribute->set_key(key);
  101. value = attribute->mutable_value();
  102. size_t len = key.length();
  103. if ((len > 4 && key.substr(len - 4).compare("port") == 0) ||
  104. (len > 5 && key.substr(len - 5).compare("count") == 0) ||
  105. (len > 6 && key.substr(len - 6).compare("length") == 0) ||
  106. key.compare(SRPC_HTTP_STATUS_CODE)== 0)
  107. {
  108. value->set_int_value(atoi(iter.second.c_str()));
  109. }
  110. else
  111. {
  112. value->set_string_value(iter.second);
  113. }
  114. }
  115. }
  116. }
  117. static size_t rpc_span_log_format(RPCModuleData& data, char *str, size_t len)
  118. {
  119. const uint64_t *trace_id = (const uint64_t *)data[SRPC_TRACE_ID].c_str();
  120. const uint64_t *span_id = (const uint64_t *)data[SRPC_SPAN_ID].c_str();
  121. char trace_id_buf[SRPC_TRACEID_SIZE * 2 + 1];
  122. char span_id_buf[SRPC_SPANID_SIZE * 2 + 1];
  123. TRACE_ID_BIN_TO_HEX(trace_id, trace_id_buf);
  124. SPAN_ID_BIN_TO_HEX(span_id, span_id_buf);
  125. size_t ret = snprintf(str, len, "trace_id: %s span_id: %s",
  126. trace_id_buf, span_id_buf);
  127. auto iter = data.find(SRPC_PARENT_SPAN_ID);
  128. if (iter != data.end())
  129. {
  130. char parent_span_id_buf[SRPC_SPANID_SIZE * 2 + 1];
  131. span_id = (const uint64_t *)iter->second.c_str();
  132. SPAN_ID_BIN_TO_HEX(span_id, parent_span_id_buf);
  133. ret += snprintf(str + ret, len - ret, " parent_span_id: %s",
  134. parent_span_id_buf);
  135. }
  136. ret += snprintf(str + ret, len - ret, " start_time: %s finish_time: %s"
  137. " duration: %s(ns)",
  138. data[SRPC_START_TIMESTAMP].c_str(),
  139. data[SRPC_FINISH_TIMESTAMP].c_str(),
  140. data[SRPC_DURATION].c_str());
  141. for (const auto& it : data)
  142. {
  143. if (strcmp(it.first.c_str(), SRPC_START_TIMESTAMP) == 0 ||
  144. strcmp(it.first.c_str(), SRPC_FINISH_TIMESTAMP) == 0 ||
  145. strcmp(it.first.c_str(), SRPC_DURATION) == 0 ||
  146. strcmp(it.first.c_str(), SRPC_TRACE_ID) == 0 ||
  147. strcmp(it.first.c_str(), SRPC_SPAN_ID) == 0 ||
  148. strcmp(it.first.c_str(), SRPC_PARENT_SPAN_ID) == 0)
  149. {
  150. continue;
  151. }
  152. if (strcmp(it.first.c_str(), SRPC_SPAN_LOG) == 0)
  153. {
  154. ret += snprintf(str + ret, len - ret,
  155. "\n%s trace_id: %s span_id: %s"
  156. " timestamp: %s %s",
  157. "[ANNOTATION]",
  158. trace_id_buf,
  159. span_id_buf,
  160. it.first.c_str() + strlen(SRPC_SPAN_LOG) + 1,
  161. it.second.c_str());
  162. }
  163. else
  164. {
  165. const char * key = it.first.c_str();
  166. if (it.first.compare(0, 5, "srpc.") == 0)
  167. key += 5;
  168. ret += snprintf(str + ret, len - ret, " %s: %s",
  169. key, it.second.c_str());
  170. }
  171. }
  172. return ret;
  173. }
  174. bool RPCTraceFilterPolicy::collect(RPCModuleData& span)
  175. {
  176. if (span.find(SRPC_TRACE_ID) == span.end())
  177. return false;
  178. long long timestamp = GET_CURRENT_MS();
  179. if (timestamp < this->last_collect_timestamp + this->stat_interval &&
  180. this->spans_interval_count < this->spans_per_interval &&
  181. this->spans_second_count < this->spans_per_sec)
  182. {
  183. this->spans_interval_count++;
  184. this->spans_second_count++;
  185. return true;
  186. }
  187. else if (timestamp >= this->last_collect_timestamp + this->stat_interval &&
  188. this->spans_per_sec)
  189. {
  190. this->spans_interval_count = 0;
  191. if (timestamp / 1000 > this->last_collect_timestamp / 1000) // next second
  192. this->spans_second_count = 0;
  193. this->last_collect_timestamp = timestamp;
  194. if (this->spans_second_count < this->spans_per_sec)
  195. {
  196. this->spans_second_count++;
  197. this->spans_interval_count++;
  198. return true;
  199. }
  200. }
  201. return false;
  202. }
  203. bool RPCTraceFilterPolicy::report(size_t count)
  204. {
  205. long long timestamp = GET_CURRENT_MS();
  206. if (this->last_report_timestamp == 0)
  207. this->last_report_timestamp = timestamp;
  208. if (timestamp > this->last_report_timestamp + (long long)this->report_interval ||
  209. count >= this->report_threshold)
  210. {
  211. this->last_report_timestamp = timestamp;
  212. return true;
  213. }
  214. return false;
  215. }
  216. void RPCTraceLogTask::dispatch()
  217. {
  218. char str[SPAN_LOG_MAX_LENGTH];
  219. rpc_span_log_format(this->span, str, SPAN_LOG_MAX_LENGTH);
  220. fprintf(stderr, "[SPAN_LOG] %s\n", str);
  221. this->subtask_done();
  222. }
  223. SubTask *RPCTraceRedis::create(RPCModuleData& span)
  224. {
  225. auto iter = span.find(SRPC_TRACE_ID);
  226. if (iter == span.end())
  227. return WFTaskFactory::create_empty_task();
  228. auto *task = WFTaskFactory::create_redis_task(this->redis_url,
  229. this->retry_max,
  230. nullptr);
  231. protocol::RedisRequest *req = task->get_req();
  232. char value[SPAN_LOG_MAX_LENGTH];
  233. value[0] = '0';
  234. rpc_span_log_format(span, value, SPAN_LOG_MAX_LENGTH);
  235. req->set_request("SET", { span[SRPC_TRACE_ID], value} );
  236. return task;
  237. }
  238. RPCTraceOpenTelemetry::RPCTraceOpenTelemetry(const std::string& url) :
  239. RPCFilter(RPCModuleTypeTrace),
  240. url(url + OTLP_TRACES_PATH),
  241. redirect_max(OTLP_HTTP_REDIRECT_MAX),
  242. retry_max(OTLP_HTTP_RETRY_MAX),
  243. filter_policy(SPANS_PER_SECOND_DEFAULT,
  244. RPC_REPORT_THREHOLD_DEFAULT,
  245. RPC_REPORT_INTERVAL_DEFAULT),
  246. report_status(false),
  247. report_span_count(0)
  248. {
  249. this->report_req = new TracesData;
  250. }
  251. RPCTraceOpenTelemetry::RPCTraceOpenTelemetry(const std::string& url,
  252. int redirect_max,
  253. int retry_max,
  254. size_t spans_per_second,
  255. size_t report_threshold,
  256. size_t report_interval) :
  257. RPCFilter(RPCModuleTypeTrace),
  258. url(url + OTLP_TRACES_PATH),
  259. redirect_max(redirect_max),
  260. retry_max(retry_max),
  261. filter_policy(spans_per_second, report_threshold, report_interval),
  262. report_status(false),
  263. report_span_count(0)
  264. {
  265. this->report_req = new TracesData;
  266. }
  267. RPCTraceOpenTelemetry::~RPCTraceOpenTelemetry()
  268. {
  269. delete this->report_req;
  270. }
  271. SubTask *RPCTraceOpenTelemetry::create(RPCModuleData& span)
  272. {
  273. std::string *output = new std::string;
  274. SubTask *next = NULL;
  275. TracesData *req = (TracesData *)this->report_req;
  276. this->mutex.lock();
  277. if (!this->report_status)
  278. next = WFTaskFactory::create_empty_task();
  279. else
  280. {
  281. req->SerializeToString(output);
  282. this->report_status = false;
  283. this->report_span_count = 0;
  284. req->clear_resource_spans();
  285. this->report_map.clear();
  286. }
  287. this->mutex.unlock();
  288. if (next)
  289. return next;
  290. WFHttpTask *task = WFTaskFactory::create_http_task(this->url,
  291. this->redirect_max,
  292. this->retry_max,
  293. [](WFHttpTask *task) {
  294. delete (std::string *)task->user_data;
  295. });
  296. protocol::HttpRequest *http_req = task->get_req();
  297. http_req->set_method(HttpMethodPost);
  298. http_req->add_header_pair("Content-Type", "application/x-protobuf");
  299. task->user_data = output;
  300. http_req->append_output_body_nocopy(output->c_str(), output->length());
  301. return task;
  302. }
  303. void RPCTraceOpenTelemetry::add_attributes(const std::string& key,
  304. const std::string& value)
  305. {
  306. this->mutex.lock();
  307. this->attributes.insert(std::make_pair(key, value));
  308. this->mutex.unlock();
  309. }
  310. size_t RPCTraceOpenTelemetry::clear_attributes()
  311. {
  312. size_t ret;
  313. this->mutex.lock();
  314. ret = this->attributes.size();
  315. this->attributes.clear();
  316. this->mutex.unlock();
  317. return ret;
  318. }
  319. bool RPCTraceOpenTelemetry::filter(RPCModuleData& data)
  320. {
  321. std::unordered_map<std::string, google::protobuf::Message *>::iterator it;
  322. InstrumentationLibrarySpans *spans;
  323. std::string service_name;
  324. bool ret;
  325. auto iter = data.find(OTLP_SERVICE_NAME);
  326. if (iter != data.end())
  327. {
  328. service_name = iter->second;
  329. }
  330. else // for HTTP
  331. {
  332. service_name = data[SRPC_COMPONENT] + std::string(".") +
  333. data[SRPC_HTTP_SCHEME];
  334. if (data.find(SRPC_SPAN_KIND_CLIENT) != data.end())
  335. service_name += ".client";
  336. else
  337. service_name += ".server";
  338. }
  339. this->mutex.lock();
  340. if (this->filter_policy.collect(data))
  341. {
  342. ++this->report_span_count;
  343. it = this->report_map.find(service_name);
  344. if (it == this->report_map.end())
  345. {
  346. spans = rpc_span_fill_pb_request(data, this->attributes,
  347. (TracesData *)this->report_req);
  348. this->report_map.insert({service_name, spans});
  349. }
  350. else
  351. spans = (InstrumentationLibrarySpans *)it->second;
  352. rpc_span_fill_pb_span(data, spans);
  353. }
  354. ret = this->filter_policy.report(this->report_span_count);
  355. if (ret)
  356. this->report_status = true;
  357. this->mutex.unlock();
  358. return ret;
  359. }
  360. } // end namespace srpc