rpc_metrics_filter.cc 23 KB


  1. /*
  2. Copyright (c) 2023 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #include <stdio.h>
  14. #include <cfloat>
  15. #include <string>
  16. #include <set>
  17. #include <vector>
  18. #include <mutex>
  19. #include <chrono>
  20. #include "workflow/HttpMessage.h"
  21. #include "workflow/HttpUtil.h"
  22. #include "workflow/WFTaskFactory.h"
  23. #include "workflow/WFHttpServer.h"
  24. #include "rpc_basic.h"
  25. #include "rpc_var.h"
  26. #include "rpc_metrics_filter.h"
  27. #include "opentelemetry_metrics_service.pb.h"
  28. namespace srpc
  29. {
  30. static constexpr size_t METRICS_DEFAULT_MAX_AGE = 60;
  31. static constexpr size_t METRICS_DEFAULT_AGE_BUCKET = 5;
  32. static constexpr const char *METRICS_PULL_METRICS_PATH = "/metrics";
  33. static constexpr const char *OTLP_METRICS_PATH = "/v1/metrics";
  34. static constexpr const char *METRICS_REQUEST_COUNT = "total_request_count";
  35. static constexpr const char *METRICS_REQUEST_METHOD = "total_request_method";
  36. static constexpr const char *METRICS_REQUEST_LATENCY = "total_request_latency";
  37. //static constexpr const char *METRICS_REQUEST_SIZE = "total_request_size";
  38. //static constexpr const char *METRICS_RESPONSE_SIZE = "total_response_size";
  39. using namespace opentelemetry::proto::collector::metrics::v1;
  40. using namespace opentelemetry::proto::metrics::v1;
  41. using namespace opentelemetry::proto::common::v1;
  42. using namespace opentelemetry::proto::resource::v1;
  43. RPCMetricsFilter::RPCMetricsFilter() :
  44. RPCFilter(RPCModuleTypeMetrics)
  45. {
  46. this->create_gauge(METRICS_REQUEST_COUNT, "total request count");
  47. this->create_counter(METRICS_REQUEST_METHOD, "request method statistics");
  48. // this->create_histogram(METRICS_REQUEST_SIZE, "total request body size",
  49. // {256, 512, 1024, 16384});
  50. // this->create_histogram(METRICS_RESPONSE_SIZE, "total response body size",
  51. // {256, 512, 1024, 16384});
  52. this->create_summary(METRICS_REQUEST_LATENCY, "request latency nano seconds",
  53. {{0.5, 0.05}, {0.9, 0.01}});
  54. }
  55. RPCMetricsFilter::RPCMetricsFilter(const std::string &name) :
  56. RPCFilter(RPCModuleTypeMetrics)
  57. {
  58. this->create_gauge(METRICS_REQUEST_COUNT, "total request count");
  59. this->create_counter(METRICS_REQUEST_METHOD, "request method statistics");
  60. this->create_summary(METRICS_REQUEST_LATENCY, "request latency nano seconds",
  61. {{0.5, 0.05}, {0.9, 0.01}});
  62. }
  63. bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
  64. {
  65. this->gauge(METRICS_REQUEST_COUNT)->increase();
  66. this->counter(METRICS_REQUEST_METHOD)->increase(
  67. {{"service", data[OTLP_SERVICE_NAME]},
  68. {"method", data[OTLP_METHOD_NAME] }});
  69. this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));
  70. return true;
  71. }
  72. bool RPCMetricsFilter::server_end(SubTask *task, RPCModuleData& data)
  73. {
  74. this->gauge(METRICS_REQUEST_COUNT)->increase();
  75. this->counter(METRICS_REQUEST_METHOD)->increase(
  76. {{"service", data[OTLP_SERVICE_NAME]},
  77. {"method", data[OTLP_METHOD_NAME] }});
  78. this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));
  79. return true;
  80. }
  81. GaugeVar *RPCMetricsFilter::gauge(const std::string& name)
  82. {
  83. std::string var_name = this->get_name() + name;
  84. return RPCVarFactory::gauge(name);
  85. }
  86. CounterVar *RPCMetricsFilter::counter(const std::string& name)
  87. {
  88. std::string var_name = this->get_name() + name;
  89. return RPCVarFactory::counter(name);
  90. }
  91. HistogramVar *RPCMetricsFilter::histogram(const std::string& name)
  92. {
  93. std::string var_name = this->get_name() + name;
  94. return RPCVarFactory::histogram(name);
  95. }
  96. SummaryVar *RPCMetricsFilter::summary(const std::string& name)
  97. {
  98. std::string var_name = this->get_name() + name;
  99. return RPCVarFactory::summary(name);
  100. }
  101. HistogramCounterVar *RPCMetricsFilter::histogram_counter(const std::string &name)
  102. {
  103. std::string var_name = this->get_name() + name;
  104. return RPCVarFactory::histogram_counter(var_name);
  105. }
  106. GaugeVar *RPCMetricsFilter::create_gauge(const std::string& str,
  107. const std::string& help)
  108. {
  109. if (RPCVarFactory::check_name_format(str) == false)
  110. {
  111. errno = EINVAL;
  112. return NULL;
  113. }
  114. std::string name = this->get_name() + str;
  115. this->mutex.lock();
  116. const auto it = var_names.insert(name);
  117. this->mutex.unlock();
  118. if (!it.second)
  119. {
  120. errno = EEXIST;
  121. return NULL;
  122. }
  123. GaugeVar *gauge = new GaugeVar(name, help);
  124. RPCVarLocal::get_instance()->add(name, gauge);
  125. return gauge;
  126. }
  127. CounterVar *RPCMetricsFilter::create_counter(const std::string& str,
  128. const std::string& help)
  129. {
  130. if (RPCVarFactory::check_name_format(str) == false)
  131. {
  132. errno = EINVAL;
  133. return NULL;
  134. }
  135. std::string name = this->get_name() + str;
  136. this->mutex.lock();
  137. const auto it = var_names.insert(name);
  138. this->mutex.unlock();
  139. if (!it.second)
  140. {
  141. errno = EEXIST;
  142. return NULL;
  143. }
  144. CounterVar *counter = new CounterVar(name, help);
  145. RPCVarLocal::get_instance()->add(name, counter);
  146. return counter;
  147. }
  148. HistogramVar *RPCMetricsFilter::create_histogram(const std::string& str,
  149. const std::string& help,
  150. const std::vector<double>& bucket)
  151. {
  152. if (RPCVarFactory::check_name_format(str) == false)
  153. {
  154. errno = EINVAL;
  155. return NULL;
  156. }
  157. std::string name = this->get_name() + str;
  158. this->mutex.lock();
  159. const auto it = var_names.insert(name);
  160. this->mutex.unlock();
  161. if (!it.second)
  162. {
  163. errno = EEXIST;
  164. return NULL;
  165. }
  166. HistogramVar *histogram = new HistogramVar(name, help, bucket);
  167. RPCVarLocal::get_instance()->add(name, histogram);
  168. return histogram;
  169. }
  170. SummaryVar *RPCMetricsFilter::create_summary(const std::string& str,
  171. const std::string& help,
  172. const std::vector<struct Quantile>& quantile)
  173. {
  174. if (RPCVarFactory::check_name_format(str) == false)
  175. {
  176. errno = EINVAL;
  177. return NULL;
  178. }
  179. std::string name = this->get_name() + str;
  180. this->mutex.lock();
  181. const auto it = var_names.insert(name);
  182. this->mutex.unlock();
  183. if (!it.second)
  184. {
  185. errno = EEXIST;
  186. return NULL;
  187. }
  188. SummaryVar *summary = new SummaryVar(name, help, quantile,
  189. std::chrono::seconds(METRICS_DEFAULT_MAX_AGE),
  190. METRICS_DEFAULT_AGE_BUCKET);
  191. RPCVarLocal::get_instance()->add(name, summary);
  192. return summary;
  193. }
  194. SummaryVar *RPCMetricsFilter::create_summary(const std::string& str,
  195. const std::string& help,
  196. const std::vector<struct Quantile>& quantile,
  197. const std::chrono::milliseconds max_age,
  198. int age_bucket)
  199. {
  200. if (RPCVarFactory::check_name_format(str) == false)
  201. {
  202. errno = EINVAL;
  203. return NULL;
  204. }
  205. std::string name = this->get_name() + str;
  206. this->mutex.lock();
  207. const auto it = var_names.insert(name);
  208. this->mutex.unlock();
  209. if (!it.second)
  210. {
  211. errno = EEXIST;
  212. return NULL;
  213. }
  214. SummaryVar *summary = new SummaryVar(name, help, quantile, max_age, age_bucket);
  215. RPCVarLocal::get_instance()->add(name, summary);
  216. return summary;
  217. }
  218. HistogramCounterVar *RPCMetricsFilter::create_histogram_counter(const std::string &str,
  219. const std::string &help,
  220. const std::vector<double> &bucket)
  221. {
  222. if (RPCVarFactory::check_name_format(str) == false)
  223. {
  224. errno = EINVAL;
  225. return NULL;
  226. }
  227. std::string name = this->get_name() + str;
  228. this->mutex.lock();
  229. const auto it = var_names.insert(name);
  230. this->mutex.unlock();
  231. if (!it.second)
  232. {
  233. errno = EEXIST;
  234. return NULL;
  235. }
  236. HistogramCounterVar *hc = new HistogramCounterVar(name, help, bucket);
  237. RPCVarLocal::get_instance()->add(name, hc);
  238. return hc;
  239. }
  240. void RPCMetricsFilter::reduce(std::unordered_map<std::string, RPCVar *>& out)
  241. {
  242. std::unordered_map<std::string, RPCVar *>::iterator it;
  243. RPCVarGlobal *global_var = RPCVarGlobal::get_instance();
  244. global_var->mutex.lock();
  245. for (RPCVarLocal *local : global_var->local_vars)
  246. {
  247. local->mutex.lock();
  248. for (it = local->vars.begin(); it != local->vars.end(); it++)
  249. {
  250. if (this->var_names.find(it->first) == this->var_names.end())
  251. continue;
  252. if (out.find(it->first) == out.end())
  253. out.insert(std::make_pair(it->first, it->second->create(true)));
  254. else
  255. out[it->first]->reduce(it->second->get_data(),
  256. it->second->get_size());
  257. }
  258. local->mutex.unlock();
  259. }
  260. global_var->mutex.unlock();
  261. }
  262. void RPCMetricsFilter::reset()
  263. {
  264. std::unordered_map<std::string, RPCVar *>::iterator it;
  265. RPCVarGlobal *global_var = RPCVarGlobal::get_instance();
  266. global_var->mutex.lock();
  267. for (RPCVarLocal *local : global_var->local_vars)
  268. {
  269. local->mutex.lock();
  270. for (it = local->vars.begin(); it != local->vars.end(); it++)
  271. {
  272. if (this->var_names.find(it->first) == this->var_names.end())
  273. continue;
  274. it->second->reset();
  275. }
  276. local->mutex.unlock();
  277. }
  278. global_var->mutex.unlock();
  279. }
  280. RPCMetricsPull::RPCMetricsPull() :
  281. collector(this->report_output),
  282. server(std::bind(&RPCMetricsPull::pull, this, std::placeholders::_1))
  283. {
  284. }
  285. bool RPCMetricsPull::init(unsigned short port)
  286. {
  287. return this->server.start(port) == 0;
  288. }
  289. void RPCMetricsPull::deinit()
  290. {
  291. this->server.stop();
  292. }
  293. void RPCMetricsPull::pull(WFHttpTask *task)
  294. {
  295. if (strcmp(task->get_req()->get_request_uri(), METRICS_PULL_METRICS_PATH))
  296. return;
  297. this->mutex.lock();
  298. this->expose();
  299. task->get_resp()->append_output_body(std::move(this->report_output));
  300. this->report_output.clear();
  301. this->mutex.unlock();
  302. }
  303. bool RPCMetricsPull::expose()
  304. {
  305. std::unordered_map<std::string, RPCVar *> tmp;
  306. std::unordered_map<std::string, RPCVar *>::iterator it;
  307. std::string output;
  308. this->reduce(tmp);
  309. for (it = tmp.begin(); it != tmp.end(); it++)
  310. {
  311. RPCVar *var = it->second;
  312. this->report_output += "# HELP " + var->get_name() + " " +
  313. var->get_help() + "\n# TYPE " + var->get_name() +
  314. " " + var->get_type_str() + "\n";
  315. output.clear();
  316. var->collect(&this->collector);
  317. this->report_output += output;
  318. }
  319. for (it = tmp.begin(); it != tmp.end(); it++)
  320. delete it->second;
  321. return true;
  322. }
  323. void RPCMetricsPull::Collector::collect_gauge(RPCVar *gauge, double data)
  324. {
  325. this->report_output += gauge->get_name() + " " + std::to_string(data) + "\n";
  326. }
  327. void RPCMetricsPull::Collector::collect_counter_each(RPCVar *counter,
  328. const std::string& label,
  329. double data)
  330. {
  331. this->report_output += counter->get_name() + "{" + label + "} " +
  332. std::to_string(data) + "\n";
  333. }
  334. void RPCMetricsPull::Collector::collect_histogram_each(RPCVar *histogram,
  335. double bucket_boundary,
  336. size_t current_count)
  337. {
  338. this->report_output += histogram->get_name();
  339. if (bucket_boundary != DBL_MAX)
  340. {
  341. this->report_output += "_bucket{le=\"" +
  342. std::to_string(bucket_boundary) + "\"} ";
  343. }
  344. else
  345. this->report_output += "_bucket{le=\"+Inf\"} ";
  346. this->report_output += std::to_string(current_count) + "\n";
  347. }
  348. void RPCMetricsPull::Collector::collect_histogram_end(RPCVar *histogram,
  349. double sum,
  350. size_t count)
  351. {
  352. this->report_output += histogram->get_name() + "_sum " +
  353. std::to_string(sum) + "\n";
  354. this->report_output += histogram->get_name() + "_count " +
  355. std::to_string(count) + "\n";
  356. }
  357. void RPCMetricsPull::Collector::collect_summary_each(RPCVar *summary,
  358. double quantile,
  359. double quantile_out)
  360. {
  361. this->report_output += summary->get_name() + "{quantile=\"" +
  362. std::to_string(quantile) + "\"} ";
  363. if (quantile_out == 0)
  364. this->report_output += "NaN";
  365. else
  366. this->report_output += std::to_string(quantile_out);
  367. this->report_output += "\n";
  368. }
  369. void RPCMetricsPull::Collector::collect_summary_end(RPCVar *summary,
  370. double sum,
  371. size_t count)
  372. {
  373. this->report_output += summary->get_name() + "_sum " +
  374. std::to_string(sum) + "\n";
  375. this->report_output += summary->get_name() + "_count " +
  376. std::to_string(count) + "\n";
  377. }
  378. bool RPCFilterPolicy::report(size_t count)
  379. {
  380. bool ret = false;
  381. long long timestamp = GET_CURRENT_MS();
  382. if (this->last_report_timestamp == 0)
  383. this->last_report_timestamp = timestamp;
  384. if ((timestamp > this->last_report_timestamp +
  385. (long long)this->report_interval ||
  386. count >= this->report_threshold) &&
  387. this->reporting == false)
  388. {
  389. this->mutex.lock();
  390. if (this->reporting == false)
  391. {
  392. this->reporting = true;
  393. this->last_report_timestamp = timestamp;
  394. ret = true;
  395. }
  396. this->mutex.unlock();
  397. }
  398. return ret;
  399. }
  400. void RPCFilterPolicy::set_reporting(bool flag)
  401. {
  402. this->mutex.lock();
  403. this->reporting = flag;
  404. this->mutex.unlock();
  405. }
  406. RPCMetricsOTel::RPCMetricsOTel(const std::string &name,
  407. const std::string& url,
  408. unsigned int redirect_max,
  409. unsigned int retry_max,
  410. size_t report_threshold,
  411. size_t report_interval_msec) :
  412. RPCMetricsFilter(name),
  413. url(url + OTLP_METRICS_PATH),
  414. redirect_max(redirect_max),
  415. retry_max(retry_max),
  416. policy(report_threshold, report_interval_msec),
  417. report_counts(0)
  418. {
  419. }
  420. RPCMetricsOTel::RPCMetricsOTel(const std::string &name,
  421. const std::string& url) :
  422. RPCMetricsFilter(name),
  423. url(url + OTLP_METRICS_PATH),
  424. redirect_max(OTLP_HTTP_REDIRECT_MAX),
  425. retry_max(OTLP_HTTP_RETRY_MAX),
  426. policy(RPC_REPORT_THREHOLD_DEFAULT, RPC_REPORT_INTERVAL_DEFAULT),
  427. report_counts(0)
  428. {
  429. }
  430. RPCMetricsOTel::Collector::~Collector()
  431. {
  432. for (const auto& kv : this->label_map)
  433. delete kv.second;
  434. }
  435. void RPCMetricsOTel::add_attributes(const std::string& key,
  436. const std::string& value)
  437. {
  438. this->mutex.lock();
  439. this->attributes.insert(std::make_pair(key, value));
  440. this->mutex.unlock();
  441. }
  442. size_t RPCMetricsOTel::clear_attributes()
  443. {
  444. size_t ret;
  445. this->mutex.lock();
  446. ret = this->attributes.size();
  447. this->attributes.clear();
  448. this->mutex.unlock();
  449. return ret;
  450. }
  451. SubTask *RPCMetricsOTel::create(RPCModuleData& data)
  452. {
  453. std::string *output = new std::string;
  454. ExportMetricsServiceRequest req;
  455. ResourceMetrics *rm = req.add_resource_metrics();
  456. Resource *resource = rm->mutable_resource();
  457. ScopeMetrics *metrics = rm->add_scope_metrics();
  458. KeyValue *attribute;
  459. AnyValue *value;
  460. this->report_counts = 0; // this is not very strict but acceptable
  461. InstrumentationScope *scope = metrics->mutable_scope();
  462. scope->set_name(this->scope_name);
  463. auto iter = data.find(OTLP_METHOD_NAME);
  464. if (iter != data.end())
  465. {
  466. attribute = resource->add_attributes();
  467. attribute->set_key(OTLP_METHOD_NAME);
  468. value = attribute->mutable_value();
  469. value->set_string_value(iter->second);
  470. }
  471. this->mutex.lock();
  472. for (const auto& attr : this->attributes)
  473. {
  474. KeyValue *attribute = resource->add_attributes();
  475. attribute->set_key(attr.first);
  476. AnyValue *value = attribute->mutable_value();
  477. value->set_string_value(attr.second);
  478. }
  479. this->mutex.unlock();
  480. iter = data.find(OTLP_SERVICE_NAME); // if attributes also set service.name, data takes precedence
  481. if (iter != data.end())
  482. {
  483. attribute = resource->add_attributes();
  484. attribute->set_key(OTLP_SERVICE_NAME);
  485. value = attribute->mutable_value();
  486. value->set_string_value(iter->second);
  487. }
  488. this->expose(metrics);
  489. // fprintf(stderr, "[Metrics info to report]\n%s\n", req.DebugString().c_str());
  490. req.SerializeToString(output);
  491. this->policy.set_reporting(false);
  492. WFHttpTask *task = WFTaskFactory::create_http_task(this->url,
  493. this->redirect_max,
  494. this->retry_max,
  495. [](WFHttpTask *task) {
  496. /*
  497. protocol::HttpResponse *resp = task->get_resp();
  498. fprintf(stderr, "[metrics report callback] state=%d error=%d\n",
  499. task->get_state(), task->get_error());
  500. if (task->get_state() == WFT_STATE_SUCCESS)
  501. {
  502. fprintf(stderr, "%s %s %s\r\n", resp->get_http_version(),
  503. resp->get_status_code(), resp->get_reason_phrase());
  504. }
  505. */
  506. delete (std::string *)task->user_data;
  507. });
  508. protocol::HttpRequest *http_req = task->get_req();
  509. http_req->set_method(HttpMethodPost);
  510. http_req->add_header_pair("Content-Type", "application/x-protobuf");
  511. task->user_data = output;
  512. http_req->append_output_body_nocopy(output->c_str(), output->length());
  513. return task;
  514. }
  515. bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
  516. {
  517. std::unordered_map<std::string, RPCVar *> tmp;
  518. std::unordered_map<std::string, RPCVar *>::iterator it;
  519. ScopeMetrics *metrics;
  520. Histogram *report_histogram;
  521. metrics = static_cast<ScopeMetrics *>(msg);
  522. this->reduce(tmp);
  523. this->collector.set_current_nano(GET_CURRENT_NS());
  524. for (it = tmp.begin(); it != tmp.end(); it++)
  525. {
  526. RPCVar *var = it->second;
  527. Metric *m = metrics->add_metrics();
  528. google::protobuf::Message *current_var;
  529. m->set_name(RPCFilter::raw_var_name(var->get_name()));
  530. m->set_description(var->get_help());
  531. switch(var->get_type())
  532. {
  533. default:
  534. case VAR_GAUGE:
  535. current_var = m->mutable_gauge();
  536. this->collector.collect_gauge(var, current_var);
  537. break;
  538. case VAR_COUNTER:
  539. current_var = m->mutable_sum();
  540. this->collector.collect_counter(var, current_var);
  541. break;
  542. case VAR_HISTOGRAM:
  543. report_histogram = m->mutable_histogram();
  544. current_var = report_histogram->add_data_points();
  545. this->collector.collect_histogram(var, current_var);
  546. break;
  547. case VAR_SUMMARY:
  548. current_var = m->mutable_summary();
  549. this->collector.collect_summary(var, current_var);
  550. break;
  551. case VAR_HISTOGRAM_COUNTER:
  552. // add multiple metrics inside
  553. this->collector.collect_histogram_counter(var, metrics);
  554. break;
  555. }
  556. }
  557. for (it = tmp.begin(); it != tmp.end(); it++)
  558. delete it->second;
  559. this->reset(); // reset by report interval
  560. return true;
  561. }
  562. void RPCMetricsOTel::Collector::collect_gauge(RPCVar *var,
  563. google::protobuf::Message *msg)
  564. {
  565. GaugeVar *gauge = (GaugeVar *)var;
  566. Gauge *report_gauge = static_cast<Gauge *>(msg);
  567. double data = gauge->get();
  568. NumberDataPoint *data_points = report_gauge->add_data_points();
  569. data_points->set_as_double(data);
  570. data_points->set_time_unix_nano(this->current_timestamp_nano);
  571. }
  572. void RPCMetricsOTel::Collector::collect_counter(RPCVar *var,
  573. google::protobuf::Message *msg)
  574. {
  575. CounterVar *counter = (CounterVar *)var;
  576. std::unordered_map<std::string, GaugeVar *> *data;
  577. data = (std::unordered_map<std::string, GaugeVar *> *)counter->get_map();
  578. for (auto it = data->begin(); it != data->end(); it++)
  579. this->collect_counter_each(it->first, it->second->get(), msg);
  580. }
  581. void RPCMetricsOTel::Collector::add_counter_label(const std::string& label)
  582. {
  583. const char *key;
  584. const char *value;
  585. size_t key_len;
  586. size_t lpos;
  587. size_t rpos;
  588. size_t pos = -1;
  589. LABEL_MAP *m = new LABEL_MAP();
  590. do {
  591. pos++;
  592. lpos = label.find_first_of('=', pos);
  593. key = label.data() + pos;
  594. key_len = lpos - pos;
  595. rpos = label.find_first_of(',', lpos + 1);
  596. pos = rpos;
  597. if (rpos == std::string::npos)
  598. rpos = label.length();
  599. value = label.data() + lpos + 2;
  600. // value_len = rpos - lpos - 3;
  601. m->emplace(std::string{key, key_len}, std::string{value, rpos - lpos - 3});
  602. } while (pos != std::string::npos);
  603. this->label_map.emplace(label, m);
  604. }
  605. void RPCMetricsOTel::Collector::collect_counter_each(const std::string& label,
  606. double data,
  607. google::protobuf::Message *msg)
  608. {
  609. Sum *report_sum = static_cast<Sum *>(msg);
  610. NumberDataPoint *data_points = report_sum->add_data_points();
  611. std::map<std::string, LABEL_MAP *>::iterator it;
  612. std::string key;
  613. std::string value;
  614. if (!label.empty())
  615. {
  616. if (it == this->label_map.end())
  617. {
  618. this->add_counter_label(label);
  619. it = this->label_map.find(label);
  620. }
  621. for (const auto& kv : *(it->second))
  622. {
  623. KeyValue *attribute = data_points->add_attributes();
  624. attribute->set_key(kv.first);
  625. AnyValue *value = attribute->mutable_value();
  626. value->set_string_value(kv.second);
  627. }
  628. }
  629. data_points->set_as_double(data);
  630. data_points->set_time_unix_nano(this->current_timestamp_nano);
  631. }
  632. void RPCMetricsOTel::Collector::collect_histogram(RPCVar *var,
  633. google::protobuf::Message *msg)
  634. {
  635. HistogramVar *histogram = (HistogramVar *)var;
  636. HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(msg);
  637. const std::vector<size_t> *bucket_counts = histogram->get_bucket_counts();
  638. const std::vector<double> *bucket_boundaries = histogram->get_bucket_boundaries();
  639. // begin
  640. data_points->set_time_unix_nano(this->current_timestamp_nano);
  641. size_t i = 0;
  642. size_t current = 0;
  643. for (; i < bucket_boundaries->size(); i++)
  644. {
  645. // current += this->bucket_counts[i];
  646. current = bucket_counts->at(i);
  647. this->collect_histogram_each(bucket_boundaries->at(i), current, data_points);
  648. }
  649. // current += this->bucket_counts[i];
  650. current = bucket_counts->at(i);
  651. this->collect_histogram_each(DBL_MAX, current, data_points);
  652. // end
  653. data_points->set_sum(histogram->get_sum());
  654. data_points->set_count(histogram->get_count());
  655. }
  656. void RPCMetricsOTel::Collector::collect_histogram_each(double bucket_boundary,
  657. size_t current_count,
  658. google::protobuf::Message *msg)
  659. {
  660. HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(msg);
  661. data_points->add_bucket_counts(current_count);
  662. if (bucket_boundary != DBL_MAX)
  663. data_points->add_explicit_bounds(bucket_boundary);
  664. }
  665. void RPCMetricsOTel::Collector::collect_summary(RPCVar *var,
  666. google::protobuf::Message *msg)
  667. {
  668. SummaryVar *summary = (SummaryVar *)var;
  669. Summary *report_summary = static_cast<Summary *>(msg);
  670. const std::vector<struct Quantile>& quantiles = summary->get_quantiles();
  671. const std::vector<double>& quantile_out = summary->get_quantile_out();
  672. SummaryDataPoint *data_points = report_summary->add_data_points();
  673. // begin
  674. data_points->set_time_unix_nano(this->current_timestamp_nano);
  675. for (size_t i = 0; i < quantiles.size(); i++) // equavalant to summary->get_size()
  676. {
  677. this->collect_summary_each(quantiles[i].quantile, quantile_out[i],
  678. data_points);
  679. }
  680. // end
  681. data_points->set_sum(summary->get_sum());
  682. data_points->set_count(summary->get_count());
  683. summary->clear_quantile_out();
  684. }
  685. void RPCMetricsOTel::Collector::collect_summary_each(double quantile,
  686. double quantile_out,
  687. google::protobuf::Message *msg)
  688. {
  689. SummaryDataPoint *data_points = static_cast<SummaryDataPoint *>(msg);
  690. SummaryDataPoint::ValueAtQuantile *vaq = data_points->add_quantile_values();
  691. vaq->set_quantile(quantile);
  692. vaq->set_value(quantile_out);
  693. }
  694. void RPCMetricsOTel::Collector::collect_histogram_counter(RPCVar *var,
  695. google::protobuf::Message *msg)
  696. {
  697. HistogramCounterVar *hc = (HistogramCounterVar *)var;
  698. ScopeMetrics *metrics = static_cast<ScopeMetrics *>(msg);
  699. const std::unordered_map<std::string, HistogramVar *> *data;
  700. data = static_cast<const std::unordered_map<std::string,
  701. HistogramVar *> *>(hc->get_map());
  702. std::map<std::string, LABEL_MAP *>::iterator m_it;
  703. std::string key;
  704. std::string value;
  705. Metric *m;
  706. Histogram *report_histogram;
  707. HistogramDataPoint *data_points;
  708. std::string label;
  709. for (auto it = data->begin(); it != data->end(); it++)
  710. {
  711. m = metrics->add_metrics();
  712. m->set_name(RPCFilter::raw_var_name(hc->get_name()));
  713. m->set_description(hc->get_help());
  714. report_histogram = m->mutable_histogram();
  715. report_histogram->set_aggregation_temporality(
  716. AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA);
  717. data_points = report_histogram->add_data_points();
  718. label = it->first;
  719. m_it = this->label_map.find(label);
  720. if (m_it == this->label_map.end())
  721. {
  722. this->add_counter_label(label);
  723. m_it = this->label_map.find(label);
  724. }
  725. for (const auto &kv : *(m_it->second))
  726. {
  727. KeyValue *attribute = data_points->add_attributes();
  728. attribute->set_key(kv.first);
  729. AnyValue *value = attribute->mutable_value();
  730. value->set_string_value(kv.second);
  731. }
  732. this->collect_histogram(it->second, data_points);
  733. }
  734. }
  735. } // end namespace srpc