Quellcode durchsuchen

Merge pull request #359 from holmes1412/master

Update RPCModule for concurrency issues.
xiehan vor 3 Monaten
Ursprung
Commit
cce3754e68

+ 21 - 0
src/module/rpc_filter.h

@@ -70,9 +70,20 @@ public:
 		this->module_type = module_type;
 	}
 
+	RPCFilter(const std::string name, enum RPCModuleType module_type)
+	{
+		size_t pos = name.find("::");
+		if (pos != std::string::npos)
+			this->filter_name = name.substr(0, pos) + "::";
+		else
+			this->filter_name = name + "::";
+		this->module_type = module_type;
+	}
+
 	virtual ~RPCFilter() { }
 
 	enum RPCModuleType get_module_type() const { return this->module_type; }
+	const std::string& get_name() const { return this->filter_name; }
 
 	virtual bool client_begin(SubTask *task, RPCModuleData& data)
 	{
@@ -91,8 +102,18 @@ public:
 		return true;
 	}
 
+	const std::string raw_var_name(const std::string& name) const
+	{
+		size_t pos = name.find("::");
+		if (pos != std::string::npos)
+			return name.substr(pos + 2);
+		else
+			return name;
+	}
+
 private:
 	enum RPCModuleType module_type;
+	std::string filter_name;
 };
 
 } // end namespace srpc

+ 145 - 62
src/module/rpc_metrics_filter.cc

@@ -61,6 +61,15 @@ RPCMetricsFilter::RPCMetricsFilter() :
 						 {{0.5, 0.05}, {0.9, 0.01}});
 }
 
+RPCMetricsFilter::RPCMetricsFilter(const std::string &name) :
+	RPCFilter(RPCModuleTypeMetrics)
+{
+	this->create_gauge(METRICS_REQUEST_COUNT, "total request count");
+	this->create_counter(METRICS_REQUEST_METHOD, "request method statistics");
+	this->create_summary(METRICS_REQUEST_LATENCY, "request latency nano seconds",
+						 {{0.5, 0.05}, {0.9, 0.01}});
+}
+
 bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
 {
 	this->gauge(METRICS_REQUEST_COUNT)->increase();
@@ -85,33 +94,38 @@ bool RPCMetricsFilter::server_end(SubTask *task, RPCModuleData& data)
 
 GaugeVar *RPCMetricsFilter::gauge(const std::string& name)
 {
+	std::string var_name = this->get_name() + name;
 	return RPCVarFactory::gauge(name);
 }
 
 CounterVar *RPCMetricsFilter::counter(const std::string& name)
 {
+	std::string var_name = this->get_name() + name;
 	return RPCVarFactory::counter(name);
 }
 
 HistogramVar *RPCMetricsFilter::histogram(const std::string& name)
 {
+	std::string var_name = this->get_name() + name;
 	return RPCVarFactory::histogram(name);
 }
 
 SummaryVar *RPCMetricsFilter::summary(const std::string& name)
 {
+	std::string var_name = this->get_name() + name;
 	return RPCVarFactory::summary(name);
 }
 
-GaugeVar *RPCMetricsFilter::create_gauge(const std::string& name,
+GaugeVar *RPCMetricsFilter::create_gauge(const std::string& str,
 										 const std::string& help)
 {
-	if (RPCVarFactory::check_name_format(name) == false)
+	if (RPCVarFactory::check_name_format(str) == false)
 	{
 		errno = EINVAL;
 		return NULL;
 	}
 
+	std::string name = this->get_name() + str;
 	this->mutex.lock();
 	const auto it = var_names.insert(name);
 	this->mutex.unlock();
@@ -127,15 +141,16 @@ GaugeVar *RPCMetricsFilter::create_gauge(const std::string& name,
 	return gauge;
 }
 
-CounterVar *RPCMetricsFilter::create_counter(const std::string& name,
+CounterVar *RPCMetricsFilter::create_counter(const std::string& str,
 											 const std::string& help)
 {
-	if (RPCVarFactory::check_name_format(name) == false)
+	if (RPCVarFactory::check_name_format(str) == false)
 	{
 		errno = EINVAL;
 		return NULL;
 	}
 
+	std::string name = this->get_name() + str;
 	this->mutex.lock();
 	const auto it = var_names.insert(name);
 	this->mutex.unlock();
@@ -151,16 +166,17 @@ CounterVar *RPCMetricsFilter::create_counter(const std::string& name,
 	return counter;
 }
 
-HistogramVar *RPCMetricsFilter::create_histogram(const std::string& name,
+HistogramVar *RPCMetricsFilter::create_histogram(const std::string& str,
 												 const std::string& help,
 												 const std::vector<double>& bucket)
 {
-	if (RPCVarFactory::check_name_format(name) == false)
+	if (RPCVarFactory::check_name_format(str) == false)
 	{
 		errno = EINVAL;
 		return NULL;
 	}
 
+	std::string name = this->get_name() + str;
 	this->mutex.lock();
 	const auto it = var_names.insert(name);
 	this->mutex.unlock();
@@ -176,16 +192,17 @@ HistogramVar *RPCMetricsFilter::create_histogram(const std::string& name,
 	return histogram;
 }
 
-SummaryVar *RPCMetricsFilter::create_summary(const std::string& name,
+SummaryVar *RPCMetricsFilter::create_summary(const std::string& str,
 											 const std::string& help,
 								const std::vector<struct Quantile>& quantile)
 {
-	if (RPCVarFactory::check_name_format(name) == false)
+	if (RPCVarFactory::check_name_format(str) == false)
 	{
 		errno = EINVAL;
 		return NULL;
 	}
 
+	std::string name = this->get_name() + str;
 	this->mutex.lock();
 	const auto it = var_names.insert(name);
 	this->mutex.unlock();
@@ -204,18 +221,19 @@ SummaryVar *RPCMetricsFilter::create_summary(const std::string& name,
 	return summary;
 }
 
-SummaryVar *RPCMetricsFilter::create_summary(const std::string& name,
+SummaryVar *RPCMetricsFilter::create_summary(const std::string& str,
 											 const std::string& help,
 								const std::vector<struct Quantile>& quantile,
 								const std::chrono::milliseconds max_age,
 								int age_bucket)
 {
-	if (RPCVarFactory::check_name_format(name) == false)
+	if (RPCVarFactory::check_name_format(str) == false)
 	{
 		errno = EINVAL;
 		return NULL;
 	}
 
+	std::string name = this->get_name() + str;
 	this->mutex.lock();
 	const auto it = var_names.insert(name);
 	this->mutex.unlock();
@@ -297,7 +315,7 @@ bool RPCMetricsPull::expose()
 		RPCVar *var = it->second;
 		this->report_output += "# HELP " + var->get_name() + " " +
 							   var->get_help() + "\n# TYPE " + var->get_name() +
-                               " " + var->get_type_str() + "\n";
+							   " " + var->get_type_str() + "\n";
 		output.clear();
 		var->collect(&this->collector);
 		this->report_output += output;
@@ -373,27 +391,44 @@ void RPCMetricsPull::Collector::collect_summary_end(RPCVar *summary,
 
 bool RPCFilterPolicy::report(size_t count)
 {
+	bool ret = false;
 	long long timestamp = GET_CURRENT_MS();
 
 	if (this->last_report_timestamp == 0)
 		this->last_report_timestamp = timestamp;
 
-	if (timestamp > this->last_report_timestamp +
-		(long long)this->report_interval ||
-		count >= this->report_threshold)
+	if ((timestamp > this->last_report_timestamp +
+		 (long long)this->report_interval ||
+		 count >= this->report_threshold) &&
+		this->reporting == false)
 	{
-		this->last_report_timestamp = timestamp;
-		return true;
+		this->mutex.lock();
+		if (this->reporting == false)
+		{
+			this->reporting = true;
+			this->last_report_timestamp = timestamp;
+			ret = true;
+		}
+		this->mutex.unlock();
 	}
 
-	return false;
+	return ret;
 }
 
-RPCMetricsOTel::RPCMetricsOTel(const std::string& url,
+void RPCFilterPolicy::set_reporting(bool flag)
+{
+	this->mutex.lock();
+	this->reporting = flag;
+	this->mutex.unlock();
+}
+
+RPCMetricsOTel::RPCMetricsOTel(const std::string &name,
+							   const std::string& url,
 							   unsigned int redirect_max,
 							   unsigned int retry_max,
 							   size_t report_threshold,
 							   size_t report_interval_msec) :
+	RPCMetricsFilter(name),
 	url(url + OTLP_METRICS_PATH),
 	redirect_max(redirect_max),
 	retry_max(retry_max),
@@ -402,7 +437,9 @@ RPCMetricsOTel::RPCMetricsOTel(const std::string& url,
 {
 }
 
-RPCMetricsOTel::RPCMetricsOTel(const std::string& url) :
+RPCMetricsOTel::RPCMetricsOTel(const std::string &name,
+							   const std::string& url) :
+	RPCMetricsFilter(name),
 	url(url + OTLP_METRICS_PATH),
 	redirect_max(OTLP_HTTP_REDIRECT_MAX),
 	retry_max(OTLP_HTTP_RETRY_MAX),
@@ -447,6 +484,8 @@ SubTask *RPCMetricsOTel::create(RPCModuleData& data)
 	KeyValue *attribute;
 	AnyValue *value;
 
+	this->report_counts = 0; // this is not very strict but acceptable
+
 	InstrumentationScope *scope = metrics->mutable_scope();
 	scope->set_name(this->scope_name);
 
@@ -483,7 +522,7 @@ SubTask *RPCMetricsOTel::create(RPCModuleData& data)
 
 //	fprintf(stderr, "[Metrics info to report]\n%s\n", req.DebugString().c_str());
 	req.SerializeToString(output);
-	this->report_counts = 0;
+	this->policy.set_reporting(false);
 
 	WFHttpTask *task = WFTaskFactory::create_http_task(this->url,
 													   this->redirect_max,
@@ -522,13 +561,15 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
 
 	this->reduce(tmp);
 
+	this->collector.set_current_nano(GET_CURRENT_NS());
+
 	for (it = tmp.begin(); it != tmp.end(); it++)
 	{
 		RPCVar *var = it->second;
 		Metric *m = metrics->add_metrics();
 		google::protobuf::Message *current_var;
 
-		m->set_name(var->get_name());
+		m->set_name(this->raw_var_name(var->get_name()));
 		m->set_description(var->get_help());
 
 		switch(var->get_type())
@@ -536,21 +577,21 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
 		default:
 		case VAR_GAUGE:
 			current_var = m->mutable_gauge();
+			this->collector.collect_gauge(var, current_var);
 			break;
 		case VAR_COUNTER:
 			current_var = m->mutable_sum();
+			this->collector.collect_counter(var, current_var);
 			break;
 		case VAR_HISTOGRAM:
 			current_var = m->mutable_histogram();
+			this->collector.collect_histogram(var, current_var);
 			break;
 		case VAR_SUMMARY:
 			current_var = m->mutable_summary();
+			this->collector.collect_summary(var, current_var);
 			break;
 		}
-
-		this->collector.set_current_message(current_var);
-		this->collector.set_current_nano(GET_CURRENT_NS());
-		var->collect(&this->collector);
 	}
 
 	for (it = tmp.begin(); it != tmp.end(); it++)
@@ -559,14 +600,31 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
 	return true;
 }
 
-void RPCMetricsOTel::Collector::collect_gauge(RPCVar *gauge, double data)
+void RPCMetricsOTel::Collector::collect_gauge(RPCVar *var,
+											  google::protobuf::Message *msg)
 {
-	Gauge *report_gauge = static_cast<Gauge *>(this->current_msg);
+	GaugeVar *gauge = (GaugeVar *)var;
+	Gauge *report_gauge = static_cast<Gauge *>(msg);
+
+	double data = gauge->get();
 	NumberDataPoint *data_points = report_gauge->add_data_points();
 	data_points->set_as_double(data);
 	data_points->set_time_unix_nano(this->current_timestamp_nano);
 }
 
+void RPCMetricsOTel::Collector::collect_counter(RPCVar *var,
+												google::protobuf::Message *msg)
+{
+	CounterVar *counter = (CounterVar *)var;
+
+	std::unordered_map<std::string, GaugeVar *> *data;
+	data = (std::unordered_map<std::string, GaugeVar *> *)counter->get_data();
+
+	for (auto it = data->begin(); it != data->end(); it++)
+		this->collect_counter_each(it->first, it->second->get(), msg);
+}
+
+
 void RPCMetricsOTel::Collector::add_counter_label(const std::string& label)
 {
 	const char *key;
@@ -597,11 +655,11 @@ void RPCMetricsOTel::Collector::add_counter_label(const std::string& label)
 	this->label_map.emplace(label, m);
 }
 
-void RPCMetricsOTel::Collector::collect_counter_each(RPCVar *counter,
-													 const std::string& label,
-													 double data)
+void RPCMetricsOTel::Collector::collect_counter_each(const std::string& label,
+													 double data,
+													 google::protobuf::Message *msg)
 {
-	Sum *report_sum = static_cast<Sum *>(this->current_msg);
+	Sum *report_sum = static_cast<Sum *>(msg);
 	NumberDataPoint *data_points = report_sum->add_data_points();
 	std::map<std::string, LABEL_MAP *>::iterator it = this->label_map.find(label);
 	std::string key;
@@ -625,60 +683,85 @@ void RPCMetricsOTel::Collector::collect_counter_each(RPCVar *counter,
 	data_points->set_time_unix_nano(this->current_timestamp_nano);
 }
 
-void RPCMetricsOTel::Collector::collect_histogram_begin(RPCVar *histogram)
+void RPCMetricsOTel::Collector::collect_histogram(RPCVar *var,
+												  google::protobuf::Message *msg)
 {
-	Histogram *report_histogram = static_cast<Histogram *>(this->current_msg);
+	HistogramVar *histogram = (HistogramVar *)var;
+	Histogram *report_histogram = static_cast<Histogram *>(msg);
+
+	const std::vector<size_t> *bucket_counts = histogram->get_bucket_counts();
+	const std::vector<double> *bucket_boundaries = histogram->get_bucket_boundaries();
+
 	HistogramDataPoint *data_points = report_histogram->add_data_points();
-	this->current_msg = data_points;
+
+	// begin
 	data_points->set_time_unix_nano(this->current_timestamp_nano);
+
+	size_t i = 0;
+	size_t current = 0;
+
+	for (; i < bucket_boundaries->size(); i++)
+	{
+		// current += this->bucket_counts[i];
+		current = bucket_counts->at(i);
+
+		this->collect_histogram_each(bucket_boundaries->at(i), current, data_points);
+	}
+
+	// current += this->bucket_counts[i];
+	current = bucket_counts->at(i);
+	this->collect_histogram_each(DBL_MAX, current, data_points);
+
+	// end
+	data_points->set_sum(histogram->get_sum());
+	data_points->set_count(histogram->get_count());
 }
 
-void RPCMetricsOTel::Collector::collect_histogram_each(RPCVar *histogram,
-													   double bucket_boundary,
-													   size_t current_count)
+void RPCMetricsOTel::Collector::collect_histogram_each(double bucket_boundary,
+													   size_t current_count,
+													   google::protobuf::Message *msg)
 {
-	HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(this->current_msg);
+	HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(msg);
 	data_points->add_bucket_counts(current_count);
 
 	if (bucket_boundary != DBL_MAX)
 		data_points->add_explicit_bounds(bucket_boundary);
 }
 
-void RPCMetricsOTel::Collector::collect_histogram_end(RPCVar *histogram,
-													  double sum,
-													  size_t count)
+void RPCMetricsOTel::Collector::collect_summary(RPCVar *var,
+												google::protobuf::Message *msg)
 {
-	HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(this->current_msg);
-	data_points->set_sum(sum);
-	data_points->set_count(count);
-}
+	SummaryVar *summary = (SummaryVar *)var;
+	Summary *report_summary = static_cast<Summary *>(msg);
+
+	const std::vector<struct Quantile>& quantiles = summary->get_quantiles();
+	const std::vector<double>& quantile_out = summary->get_quantile_out();
 
-void RPCMetricsOTel::Collector::collect_summary_begin(RPCVar *summary)
-{
-	Summary *report_summary = static_cast<Summary *>(this->current_msg);
 	SummaryDataPoint *data_points = report_summary->add_data_points();
-	this->current_msg = data_points;
+
+	// begin
 	data_points->set_time_unix_nano(this->current_timestamp_nano);
+	for (size_t i = 0; i < quantiles.size(); i++) // equavalant to summary->get_size()
+	{
+		this->collect_summary_each(quantiles[i].quantile, quantile_out[i],
+								   data_points);
+	}
+
+	// end
+	data_points->set_sum(summary->get_sum());
+	data_points->set_count(summary->get_count());
+	summary->clear_quantile_out();
 }
 
-void RPCMetricsOTel::Collector::collect_summary_each(RPCVar *summary,
-													 double quantile,
-													 double quantile_out)
+void RPCMetricsOTel::Collector::collect_summary_each(double quantile,
+													 double quantile_out,
+													 google::protobuf::Message *msg)
 {
-	SummaryDataPoint *data_points = static_cast<SummaryDataPoint *>(this->current_msg);
+	SummaryDataPoint *data_points = static_cast<SummaryDataPoint *>(msg);
 	SummaryDataPoint::ValueAtQuantile *vaq = data_points->add_quantile_values();
 	vaq->set_quantile(quantile);
 	vaq->set_value(quantile_out);
 }
 
-void RPCMetricsOTel::Collector::collect_summary_end(RPCVar *summary,
-													double sum,
-													size_t count)
-{
-	SummaryDataPoint *data_points = static_cast<SummaryDataPoint *>(this->current_msg);
-	data_points->set_sum(sum);
-	data_points->set_count(count);
-}
-
 } // end namespace srpc
 

+ 39 - 28
src/module/rpc_metrics_filter.h

@@ -69,6 +69,7 @@ public:
 
 public:
 	RPCMetricsFilter();
+	RPCMetricsFilter(const std::string &name);
 
 protected:
 	void reduce(std::unordered_map<std::string, RPCVar *>& out);
@@ -141,7 +142,8 @@ public:
 					size_t report_interval_msec) :
 		report_threshold(report_threshold),
 		report_interval(report_interval_msec),
-		last_report_timestamp(0)
+		last_report_timestamp(0),
+		reporting(false)
 	{ }
 	
 	void set_report_threshold(size_t threshold)
@@ -159,11 +161,14 @@ public:
 	}
 
 	bool report(size_t count);
+	void set_reporting(bool flag);
 
 private:
 	size_t report_threshold; // metrics to report at most
 	size_t report_interval;
 	long long last_report_timestamp;
+	bool reporting;
+	std::mutex mutex;
 };
 
 class RPCMetricsOTel : public RPCMetricsFilter
@@ -186,11 +191,11 @@ public:
 	void set_scope_name(const std::string& name) { this->scope_name = name; }
 
 public:
-	RPCMetricsOTel(const std::string& url);
+	RPCMetricsOTel(const std::string &name, const std::string &url);
 
-	RPCMetricsOTel(const std::string& url, unsigned int redirect_max,
-				   unsigned int retry_max, size_t report_threshold,
-				   size_t report_interval);
+	RPCMetricsOTel(const std::string &name, const std::string& url,
+				   unsigned int redirect_max, unsigned int retry_max,
+				   size_t report_threshold, size_t report_interval);
 
 private:
 	SubTask *create(RPCModuleData& data) override;
@@ -206,40 +211,44 @@ private:
 		Collector() { }
 		virtual ~Collector();
 
-		void set_current_message(google::protobuf::Message *var_msg)
-		{
-			this->current_msg = var_msg;
-		}
-
 		void set_current_nano(unsigned long long ns)
 		{
 			this->current_timestamp_nano = ns;
 		}
 
-		void collect_gauge(RPCVar *gauge, double data) override;
-
-		void collect_counter_each(RPCVar *counter, const std::string& label,
-								  double data) override;
-
-		void collect_histogram_begin(RPCVar *histogram) override;
-		void collect_histogram_each(RPCVar *histogram,
-									double bucket_boudary,
-									size_t current_count) override;
-		void collect_histogram_end(RPCVar *histogram, double sum,
-								   size_t count) override;
-
-		void collect_summary_begin(RPCVar *summary) override;
+		// new api : fill var into msg
+		void collect_gauge(RPCVar *gauge, google::protobuf::Message *msg);
+		void collect_counter(RPCVar *counter, google::protobuf::Message *msg);
+		void collect_histogram(RPCVar *histogram, google::protobuf::Message *msg);
+		void collect_summary(RPCVar *summary, google::protobuf::Message *msg);
+
+		void collect_counter_each(const std::string &label, double data,
+								  google::protobuf::Message *msg);
+		void collect_histogram_each(double bucket_boudary, size_t current_count,
+									google::protobuf::Message *msg);
+		void collect_summary_each(double quantile, double quantile_out,
+								  google::protobuf::Message *msg);
+
+		// deprecated api
+		void collect_gauge(RPCVar *gauge, double data) override {}
+		void collect_counter_each(RPCVar *counter, const std::string &label,
+								  double data) override {}
+
+		void collect_histogram_begin(RPCVar *histogram) override {}
+		void collect_histogram_each(RPCVar *histogram, double bucket_boudary,
+									size_t current_count) override {}
+		void collect_histogram_end(RPCVar *histogram, double sum, size_t count) override{}
+
+		void collect_summary_begin(RPCVar *summary) override {}
 		void collect_summary_each(RPCVar *summary, double quantile,
-								  double quantile_out) override;
-		void collect_summary_end(RPCVar *summary, double sum,
-								 size_t count) override;
+								  double quantile_out) override {}
+		void collect_summary_end(RPCVar *summary, double sum, size_t count) override {}
 
 	private:
 		void add_counter_label(const std::string& label);
 
 	private:
 		using LABEL_MAP = std::map<std::string, std::string>;
-		google::protobuf::Message *current_msg;
 		unsigned long long current_timestamp_nano;
 		std::map<std::string, LABEL_MAP *> label_map;
 	};
@@ -247,11 +256,13 @@ private:
 private:
 	bool expose(google::protobuf::Message *metrics);
 
+protected:
+	Collector collector;
+
 private:
 	std::string url;
 	int redirect_max;
 	int retry_max;
-	Collector collector;
 	RPCFilterPolicy policy;
 	std::atomic<size_t> report_counts;
 	std::map<std::string, std::string> attributes;

+ 17 - 0
src/var/rpc_var.cc

@@ -458,7 +458,10 @@ RPCVar *SummaryVar::create(bool with_data)
 
 void SummaryVar::observe(double value)
 {
+	RPCVarLocal *local = RPCVarLocal::get_instance();
+	local->mutex.lock();
 	this->quantile_values.insert(value);
+	local->mutex.unlock();
 }
 
 bool SummaryVar::reduce(const void *ptr, size_t sz)
@@ -572,6 +575,20 @@ void TimedGaugeVar::increase()
 		bucket.increase();
 }
 
+void TimedGaugeVar::set(double val)
+{
+	this->rotate();
+
+	for (auto &bucket : this->data_bucket)
+		bucket.set(val);
+}
+
+double TimedGaugeVar::get()
+{
+	GaugeVar &bucket = this->rotate();
+	return bucket.get();
+}
+
 const void *TimedGaugeVar::get_data()
 {
 	GaugeVar& bucket = this->rotate();

+ 19 - 0
src/var/rpc_var.h

@@ -291,6 +291,10 @@ public:
 
 	double get_sum() const { return this->sum; }
 	size_t get_count() const { return this->count; }
+	const std::vector<double> *get_bucket_boundaries() const
+	{
+		return &this->bucket_boundaries;
+	}
 	const std::vector<size_t> *get_bucket_counts() const
 	{
 		return &this->bucket_counts;
@@ -330,6 +334,18 @@ public:
 
 	void reset() override { /* no TimedSummary so no reset for Summary */}
 
+	const std::vector<struct Quantile>& get_quantiles() const
+	{
+		return this->quantiles;
+	}
+	const std::vector<double>& get_quantile_out() const
+	{
+		return this->quantile_out;
+	}
+
+	// only for clear stack variable after filled into protobuf or out_string
+	void clear_quantile_out() { this->quantile_out.clear(); }
+
 public:
 	SummaryVar(const std::string& name, const std::string& help,
 			   const std::vector<struct Quantile>& quantile,
@@ -402,6 +418,9 @@ public:
 				  Clock::duration duration, size_t bucket_num);
 	// for collect
 	void increase() override;
+	double get() override;
+	void set(double var) override;
+
 	// for reduce
 	const void *get_data() override;
 	RPCVar *create(bool with_data) override;

+ 1 - 0
tools/templates/common/config.json

@@ -47,6 +47,7 @@
     },
     {
       "filter": "opentelemetry",
+      "filter_name": "otel_reporter1",
       "address": "http://opentelemetry.com:4389",
       "redirect_max": 0,
       "retry_max": 1,

+ 3 - 1
tools/templates/config/config_full.cc

@@ -321,6 +321,7 @@ void RPCConfig::load_metrics()
             if (it.has("address") == false)
                 continue;
 
+            std::string name = it["filter_name"];
             std::string url = it["address"];
 
             unsigned int redirect_max = OTLP_HTTP_REDIRECT_MAX;
@@ -337,7 +338,8 @@ void RPCConfig::load_metrics()
             if (it.has("report_interval_ms"))
                 report_interval = it["report_interval_ms"];
 
-            RPCMetricsOTel *filter = new RPCMetricsOTel(url,
+            RPCMetricsOTel *filter = new RPCMetricsOTel(name,
+                                                        url,
                                                         redirect_max,
                                                         retry_max,
                                                         report_threshold,