|
@@ -61,6 +61,15 @@ RPCMetricsFilter::RPCMetricsFilter() :
|
|
|
{{0.5, 0.05}, {0.9, 0.01}});
|
|
|
}
|
|
|
|
|
|
+RPCMetricsFilter::RPCMetricsFilter(const std::string &name) :
|
|
|
+ RPCFilter(RPCModuleTypeMetrics)
|
|
|
+{
|
|
|
+ this->create_gauge(METRICS_REQUEST_COUNT, "total request count");
|
|
|
+ this->create_counter(METRICS_REQUEST_METHOD, "request method statistics");
|
|
|
+ this->create_summary(METRICS_REQUEST_LATENCY, "request latency nano seconds",
|
|
|
+ {{0.5, 0.05}, {0.9, 0.01}});
|
|
|
+}
|
|
|
+
|
|
|
bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
|
|
|
{
|
|
|
this->gauge(METRICS_REQUEST_COUNT)->increase();
|
|
@@ -85,33 +94,38 @@ bool RPCMetricsFilter::server_end(SubTask *task, RPCModuleData& data)
|
|
|
|
|
|
GaugeVar *RPCMetricsFilter::gauge(const std::string& name)
|
|
|
{
|
|
|
+ std::string var_name = this->get_name() + name;
|
|
|
return RPCVarFactory::gauge(name);
|
|
|
}
|
|
|
|
|
|
CounterVar *RPCMetricsFilter::counter(const std::string& name)
|
|
|
{
|
|
|
+ std::string var_name = this->get_name() + name;
|
|
|
return RPCVarFactory::counter(name);
|
|
|
}
|
|
|
|
|
|
HistogramVar *RPCMetricsFilter::histogram(const std::string& name)
|
|
|
{
|
|
|
+ std::string var_name = this->get_name() + name;
|
|
|
return RPCVarFactory::histogram(name);
|
|
|
}
|
|
|
|
|
|
SummaryVar *RPCMetricsFilter::summary(const std::string& name)
|
|
|
{
|
|
|
+ std::string var_name = this->get_name() + name;
|
|
|
return RPCVarFactory::summary(name);
|
|
|
}
|
|
|
|
|
|
-GaugeVar *RPCMetricsFilter::create_gauge(const std::string& name,
|
|
|
+GaugeVar *RPCMetricsFilter::create_gauge(const std::string& str,
|
|
|
const std::string& help)
|
|
|
{
|
|
|
- if (RPCVarFactory::check_name_format(name) == false)
|
|
|
+ if (RPCVarFactory::check_name_format(str) == false)
|
|
|
{
|
|
|
errno = EINVAL;
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+ std::string name = this->get_name() + str;
|
|
|
this->mutex.lock();
|
|
|
const auto it = var_names.insert(name);
|
|
|
this->mutex.unlock();
|
|
@@ -127,15 +141,16 @@ GaugeVar *RPCMetricsFilter::create_gauge(const std::string& name,
|
|
|
return gauge;
|
|
|
}
|
|
|
|
|
|
-CounterVar *RPCMetricsFilter::create_counter(const std::string& name,
|
|
|
+CounterVar *RPCMetricsFilter::create_counter(const std::string& str,
|
|
|
const std::string& help)
|
|
|
{
|
|
|
- if (RPCVarFactory::check_name_format(name) == false)
|
|
|
+ if (RPCVarFactory::check_name_format(str) == false)
|
|
|
{
|
|
|
errno = EINVAL;
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+ std::string name = this->get_name() + str;
|
|
|
this->mutex.lock();
|
|
|
const auto it = var_names.insert(name);
|
|
|
this->mutex.unlock();
|
|
@@ -151,16 +166,17 @@ CounterVar *RPCMetricsFilter::create_counter(const std::string& name,
|
|
|
return counter;
|
|
|
}
|
|
|
|
|
|
-HistogramVar *RPCMetricsFilter::create_histogram(const std::string& name,
|
|
|
+HistogramVar *RPCMetricsFilter::create_histogram(const std::string& str,
|
|
|
const std::string& help,
|
|
|
const std::vector<double>& bucket)
|
|
|
{
|
|
|
- if (RPCVarFactory::check_name_format(name) == false)
|
|
|
+ if (RPCVarFactory::check_name_format(str) == false)
|
|
|
{
|
|
|
errno = EINVAL;
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+ std::string name = this->get_name() + str;
|
|
|
this->mutex.lock();
|
|
|
const auto it = var_names.insert(name);
|
|
|
this->mutex.unlock();
|
|
@@ -176,16 +192,17 @@ HistogramVar *RPCMetricsFilter::create_histogram(const std::string& name,
|
|
|
return histogram;
|
|
|
}
|
|
|
|
|
|
-SummaryVar *RPCMetricsFilter::create_summary(const std::string& name,
|
|
|
+SummaryVar *RPCMetricsFilter::create_summary(const std::string& str,
|
|
|
const std::string& help,
|
|
|
const std::vector<struct Quantile>& quantile)
|
|
|
{
|
|
|
- if (RPCVarFactory::check_name_format(name) == false)
|
|
|
+ if (RPCVarFactory::check_name_format(str) == false)
|
|
|
{
|
|
|
errno = EINVAL;
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+ std::string name = this->get_name() + str;
|
|
|
this->mutex.lock();
|
|
|
const auto it = var_names.insert(name);
|
|
|
this->mutex.unlock();
|
|
@@ -204,18 +221,19 @@ SummaryVar *RPCMetricsFilter::create_summary(const std::string& name,
|
|
|
return summary;
|
|
|
}
|
|
|
|
|
|
-SummaryVar *RPCMetricsFilter::create_summary(const std::string& name,
|
|
|
+SummaryVar *RPCMetricsFilter::create_summary(const std::string& str,
|
|
|
const std::string& help,
|
|
|
const std::vector<struct Quantile>& quantile,
|
|
|
const std::chrono::milliseconds max_age,
|
|
|
int age_bucket)
|
|
|
{
|
|
|
- if (RPCVarFactory::check_name_format(name) == false)
|
|
|
+ if (RPCVarFactory::check_name_format(str) == false)
|
|
|
{
|
|
|
errno = EINVAL;
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
+ std::string name = this->get_name() + str;
|
|
|
this->mutex.lock();
|
|
|
const auto it = var_names.insert(name);
|
|
|
this->mutex.unlock();
|
|
@@ -297,7 +315,7 @@ bool RPCMetricsPull::expose()
|
|
|
RPCVar *var = it->second;
|
|
|
this->report_output += "# HELP " + var->get_name() + " " +
|
|
|
var->get_help() + "\n# TYPE " + var->get_name() +
|
|
|
- " " + var->get_type_str() + "\n";
|
|
|
+ " " + var->get_type_str() + "\n";
|
|
|
output.clear();
|
|
|
var->collect(&this->collector);
|
|
|
this->report_output += output;
|
|
@@ -373,27 +391,44 @@ void RPCMetricsPull::Collector::collect_summary_end(RPCVar *summary,
|
|
|
|
|
|
bool RPCFilterPolicy::report(size_t count)
|
|
|
{
|
|
|
+ bool ret = false;
|
|
|
long long timestamp = GET_CURRENT_MS();
|
|
|
|
|
|
if (this->last_report_timestamp == 0)
|
|
|
this->last_report_timestamp = timestamp;
|
|
|
|
|
|
- if (timestamp > this->last_report_timestamp +
|
|
|
- (long long)this->report_interval ||
|
|
|
- count >= this->report_threshold)
|
|
|
+ if ((timestamp > this->last_report_timestamp +
|
|
|
+ (long long)this->report_interval ||
|
|
|
+ count >= this->report_threshold) &&
|
|
|
+ this->reporting == false)
|
|
|
{
|
|
|
- this->last_report_timestamp = timestamp;
|
|
|
- return true;
|
|
|
+ this->mutex.lock();
|
|
|
+ if (this->reporting == false)
|
|
|
+ {
|
|
|
+ this->reporting = true;
|
|
|
+ this->last_report_timestamp = timestamp;
|
|
|
+ ret = true;
|
|
|
+ }
|
|
|
+ this->mutex.unlock();
|
|
|
}
|
|
|
|
|
|
- return false;
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
-RPCMetricsOTel::RPCMetricsOTel(const std::string& url,
|
|
|
+void RPCFilterPolicy::set_reporting(bool flag)
|
|
|
+{
|
|
|
+ this->mutex.lock();
|
|
|
+ this->reporting = flag;
|
|
|
+ this->mutex.unlock();
|
|
|
+}
|
|
|
+
|
|
|
+RPCMetricsOTel::RPCMetricsOTel(const std::string &name,
|
|
|
+ const std::string& url,
|
|
|
unsigned int redirect_max,
|
|
|
unsigned int retry_max,
|
|
|
size_t report_threshold,
|
|
|
size_t report_interval_msec) :
|
|
|
+ RPCMetricsFilter(name),
|
|
|
url(url + OTLP_METRICS_PATH),
|
|
|
redirect_max(redirect_max),
|
|
|
retry_max(retry_max),
|
|
@@ -402,7 +437,9 @@ RPCMetricsOTel::RPCMetricsOTel(const std::string& url,
|
|
|
{
|
|
|
}
|
|
|
|
|
|
-RPCMetricsOTel::RPCMetricsOTel(const std::string& url) :
|
|
|
+RPCMetricsOTel::RPCMetricsOTel(const std::string &name,
|
|
|
+ const std::string& url) :
|
|
|
+ RPCMetricsFilter(name),
|
|
|
url(url + OTLP_METRICS_PATH),
|
|
|
redirect_max(OTLP_HTTP_REDIRECT_MAX),
|
|
|
retry_max(OTLP_HTTP_RETRY_MAX),
|
|
@@ -447,6 +484,8 @@ SubTask *RPCMetricsOTel::create(RPCModuleData& data)
|
|
|
KeyValue *attribute;
|
|
|
AnyValue *value;
|
|
|
|
|
|
+ this->report_counts = 0; // this is not very strict but acceptable
|
|
|
+
|
|
|
InstrumentationScope *scope = metrics->mutable_scope();
|
|
|
scope->set_name(this->scope_name);
|
|
|
|
|
@@ -483,7 +522,7 @@ SubTask *RPCMetricsOTel::create(RPCModuleData& data)
|
|
|
|
|
|
// fprintf(stderr, "[Metrics info to report]\n%s\n", req.DebugString().c_str());
|
|
|
req.SerializeToString(output);
|
|
|
- this->report_counts = 0;
|
|
|
+ this->policy.set_reporting(false);
|
|
|
|
|
|
WFHttpTask *task = WFTaskFactory::create_http_task(this->url,
|
|
|
this->redirect_max,
|
|
@@ -522,13 +561,15 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
|
|
|
|
|
|
this->reduce(tmp);
|
|
|
|
|
|
+ this->collector.set_current_nano(GET_CURRENT_NS());
|
|
|
+
|
|
|
for (it = tmp.begin(); it != tmp.end(); it++)
|
|
|
{
|
|
|
RPCVar *var = it->second;
|
|
|
Metric *m = metrics->add_metrics();
|
|
|
google::protobuf::Message *current_var;
|
|
|
|
|
|
- m->set_name(var->get_name());
|
|
|
+ m->set_name(this->raw_var_name(var->get_name()));
|
|
|
m->set_description(var->get_help());
|
|
|
|
|
|
switch(var->get_type())
|
|
@@ -536,21 +577,21 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
|
|
|
default:
|
|
|
case VAR_GAUGE:
|
|
|
current_var = m->mutable_gauge();
|
|
|
+ this->collector.collect_gauge(var, current_var);
|
|
|
break;
|
|
|
case VAR_COUNTER:
|
|
|
current_var = m->mutable_sum();
|
|
|
+ this->collector.collect_counter(var, current_var);
|
|
|
break;
|
|
|
case VAR_HISTOGRAM:
|
|
|
current_var = m->mutable_histogram();
|
|
|
+ this->collector.collect_histogram(var, current_var);
|
|
|
break;
|
|
|
case VAR_SUMMARY:
|
|
|
current_var = m->mutable_summary();
|
|
|
+ this->collector.collect_summary(var, current_var);
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
- this->collector.set_current_message(current_var);
|
|
|
- this->collector.set_current_nano(GET_CURRENT_NS());
|
|
|
- var->collect(&this->collector);
|
|
|
}
|
|
|
|
|
|
for (it = tmp.begin(); it != tmp.end(); it++)
|
|
@@ -559,14 +600,31 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_gauge(RPCVar *gauge, double data)
|
|
|
+void RPCMetricsOTel::Collector::collect_gauge(RPCVar *var,
|
|
|
+ google::protobuf::Message *msg)
|
|
|
{
|
|
|
- Gauge *report_gauge = static_cast<Gauge *>(this->current_msg);
|
|
|
+ GaugeVar *gauge = (GaugeVar *)var;
|
|
|
+ Gauge *report_gauge = static_cast<Gauge *>(msg);
|
|
|
+
|
|
|
+ double data = gauge->get();
|
|
|
NumberDataPoint *data_points = report_gauge->add_data_points();
|
|
|
data_points->set_as_double(data);
|
|
|
data_points->set_time_unix_nano(this->current_timestamp_nano);
|
|
|
}
|
|
|
|
|
|
+void RPCMetricsOTel::Collector::collect_counter(RPCVar *var,
|
|
|
+ google::protobuf::Message *msg)
|
|
|
+{
|
|
|
+ CounterVar *counter = (CounterVar *)var;
|
|
|
+
|
|
|
+ std::unordered_map<std::string, GaugeVar *> *data;
|
|
|
+ data = (std::unordered_map<std::string, GaugeVar *> *)counter->get_data();
|
|
|
+
|
|
|
+ for (auto it = data->begin(); it != data->end(); it++)
|
|
|
+ this->collect_counter_each(it->first, it->second->get(), msg);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
void RPCMetricsOTel::Collector::add_counter_label(const std::string& label)
|
|
|
{
|
|
|
const char *key;
|
|
@@ -597,11 +655,11 @@ void RPCMetricsOTel::Collector::add_counter_label(const std::string& label)
|
|
|
this->label_map.emplace(label, m);
|
|
|
}
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_counter_each(RPCVar *counter,
|
|
|
- const std::string& label,
|
|
|
- double data)
|
|
|
+void RPCMetricsOTel::Collector::collect_counter_each(const std::string& label,
|
|
|
+ double data,
|
|
|
+ google::protobuf::Message *msg)
|
|
|
{
|
|
|
- Sum *report_sum = static_cast<Sum *>(this->current_msg);
|
|
|
+ Sum *report_sum = static_cast<Sum *>(msg);
|
|
|
NumberDataPoint *data_points = report_sum->add_data_points();
|
|
|
std::map<std::string, LABEL_MAP *>::iterator it = this->label_map.find(label);
|
|
|
std::string key;
|
|
@@ -625,60 +683,85 @@ void RPCMetricsOTel::Collector::collect_counter_each(RPCVar *counter,
|
|
|
data_points->set_time_unix_nano(this->current_timestamp_nano);
|
|
|
}
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_histogram_begin(RPCVar *histogram)
|
|
|
+void RPCMetricsOTel::Collector::collect_histogram(RPCVar *var,
|
|
|
+ google::protobuf::Message *msg)
|
|
|
{
|
|
|
- Histogram *report_histogram = static_cast<Histogram *>(this->current_msg);
|
|
|
+ HistogramVar *histogram = (HistogramVar *)var;
|
|
|
+ Histogram *report_histogram = static_cast<Histogram *>(msg);
|
|
|
+
|
|
|
+ const std::vector<size_t> *bucket_counts = histogram->get_bucket_counts();
|
|
|
+ const std::vector<double> *bucket_boundaries = histogram->get_bucket_boundaries();
|
|
|
+
|
|
|
HistogramDataPoint *data_points = report_histogram->add_data_points();
|
|
|
- this->current_msg = data_points;
|
|
|
+
|
|
|
+ // begin
|
|
|
data_points->set_time_unix_nano(this->current_timestamp_nano);
|
|
|
+
|
|
|
+ size_t i = 0;
|
|
|
+ size_t current = 0;
|
|
|
+
|
|
|
+ for (; i < bucket_boundaries->size(); i++)
|
|
|
+ {
|
|
|
+ // current += this->bucket_counts[i];
|
|
|
+ current = bucket_counts->at(i);
|
|
|
+
|
|
|
+ this->collect_histogram_each(bucket_boundaries->at(i), current, data_points);
|
|
|
+ }
|
|
|
+
|
|
|
+ // current += this->bucket_counts[i];
|
|
|
+ current = bucket_counts->at(i);
|
|
|
+ this->collect_histogram_each(DBL_MAX, current, data_points);
|
|
|
+
|
|
|
+ // end
|
|
|
+ data_points->set_sum(histogram->get_sum());
|
|
|
+ data_points->set_count(histogram->get_count());
|
|
|
}
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_histogram_each(RPCVar *histogram,
|
|
|
- double bucket_boundary,
|
|
|
- size_t current_count)
|
|
|
+void RPCMetricsOTel::Collector::collect_histogram_each(double bucket_boundary,
|
|
|
+ size_t current_count,
|
|
|
+ google::protobuf::Message *msg)
|
|
|
{
|
|
|
- HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(this->current_msg);
|
|
|
+ HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(msg);
|
|
|
data_points->add_bucket_counts(current_count);
|
|
|
|
|
|
if (bucket_boundary != DBL_MAX)
|
|
|
data_points->add_explicit_bounds(bucket_boundary);
|
|
|
}
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_histogram_end(RPCVar *histogram,
|
|
|
- double sum,
|
|
|
- size_t count)
|
|
|
+void RPCMetricsOTel::Collector::collect_summary(RPCVar *var,
|
|
|
+ google::protobuf::Message *msg)
|
|
|
{
|
|
|
- HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(this->current_msg);
|
|
|
- data_points->set_sum(sum);
|
|
|
- data_points->set_count(count);
|
|
|
-}
|
|
|
+ SummaryVar *summary = (SummaryVar *)var;
|
|
|
+ Summary *report_summary = static_cast<Summary *>(msg);
|
|
|
+
|
|
|
+ const std::vector<struct Quantile>& quantiles = summary->get_quantiles();
|
|
|
+ const std::vector<double>& quantile_out = summary->get_quantile_out();
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_summary_begin(RPCVar *summary)
|
|
|
-{
|
|
|
- Summary *report_summary = static_cast<Summary *>(this->current_msg);
|
|
|
SummaryDataPoint *data_points = report_summary->add_data_points();
|
|
|
- this->current_msg = data_points;
|
|
|
+
|
|
|
+ // begin
|
|
|
data_points->set_time_unix_nano(this->current_timestamp_nano);
|
|
|
+ for (size_t i = 0; i < quantiles.size(); i++) // equavalant to summary->get_size()
|
|
|
+ {
|
|
|
+ this->collect_summary_each(quantiles[i].quantile, quantile_out[i],
|
|
|
+ data_points);
|
|
|
+ }
|
|
|
+
|
|
|
+ // end
|
|
|
+ data_points->set_sum(summary->get_sum());
|
|
|
+ data_points->set_count(summary->get_count());
|
|
|
+ summary->clear_quantile_out();
|
|
|
}
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_summary_each(RPCVar *summary,
|
|
|
- double quantile,
|
|
|
- double quantile_out)
|
|
|
+void RPCMetricsOTel::Collector::collect_summary_each(double quantile,
|
|
|
+ double quantile_out,
|
|
|
+ google::protobuf::Message *msg)
|
|
|
{
|
|
|
- SummaryDataPoint *data_points = static_cast<SummaryDataPoint *>(this->current_msg);
|
|
|
+ SummaryDataPoint *data_points = static_cast<SummaryDataPoint *>(msg);
|
|
|
SummaryDataPoint::ValueAtQuantile *vaq = data_points->add_quantile_values();
|
|
|
vaq->set_quantile(quantile);
|
|
|
vaq->set_value(quantile_out);
|
|
|
}
|
|
|
|
|
|
-void RPCMetricsOTel::Collector::collect_summary_end(RPCVar *summary,
|
|
|
- double sum,
|
|
|
- size_t count)
|
|
|
-{
|
|
|
- SummaryDataPoint *data_points = static_cast<SummaryDataPoint *>(this->current_msg);
|
|
|
- data_points->set_sum(sum);
|
|
|
- data_points->set_count(count);
|
|
|
-}
|
|
|
-
|
|
|
} // end namespace srpc
|
|
|
|