Преглед на файлове

rpc_var : fix concurrent bug when CounterVar will loop data

holmes1412 преди 4 месеца
родител
ревизия
709d3ced9b
променени са 3 файла, в които са добавени 43 реда и са изтрити 8 реда
  1. 4 4
      src/module/rpc_metrics_filter.cc
  2. 38 4
      src/var/rpc_var.cc
  3. 1 0
      src/var/rpc_var.h

+ 4 - 4
src/module/rpc_metrics_filter.cc

@@ -64,9 +64,9 @@ RPCMetricsFilter::RPCMetricsFilter() :
 bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
 {
 	this->gauge(METRICS_REQUEST_COUNT)->increase();
-	this->counter(METRICS_REQUEST_METHOD)->add(
+	this->counter(METRICS_REQUEST_METHOD)->increase(
 				{{"service", data[OTLP_SERVICE_NAME]},
-				 {"method",  data[OTLP_METHOD_NAME] }})->increase();
+				 {"method",  data[OTLP_METHOD_NAME] }});
 	this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));
 
 	return true;
@@ -75,9 +75,9 @@ bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
 bool RPCMetricsFilter::server_end(SubTask *task, RPCModuleData& data)
 {
 	this->gauge(METRICS_REQUEST_COUNT)->increase();
-	this->counter(METRICS_REQUEST_METHOD)->add(
+	this->counter(METRICS_REQUEST_METHOD)->increase(
 				{{"service", data[OTLP_SERVICE_NAME]},
-				 {"method",  data[OTLP_METHOD_NAME] }})->increase();
+				 {"method",  data[OTLP_METHOD_NAME] }});
 	this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));
 
 	return true;

+ 38 - 4
src/var/rpc_var.cc

@@ -171,14 +171,14 @@ RPCVar *RPCVarGlobal::find(const std::string& name)
 	for (size_t i = 0; i < global_var->local_vars.size() && !ret; i++)
 	{
 		local = global_var->local_vars[i];
-		for (it = local->vars.begin(); it != local->vars.end(); it++)
+		local->mutex.lock();
+
+		for (it = local->vars.begin(); it != local->vars.end() && !ret; it++)
 		{
 			if (!name.compare(it->second->get_name()))
-			{
 				ret = it->second;
-				break;
-			}
 		}
+		local->mutex.unlock();
 	}
 
 	global_var->mutex.unlock();
@@ -203,6 +203,8 @@ CounterVar::~CounterVar()
 		delete it->second;
 }
 
+// Caution :
+//    make sure local->mutex.lock() before CounterVar::create(true)
 RPCVar *CounterVar::create(bool with_data)
 {
 	CounterVar *var = new CounterVar(this->name, this->help);
@@ -232,6 +234,9 @@ bool CounterVar::label_to_str(const LABEL_MAP& labels, std::string& str)
 	return true;
 }
 
+// [deprecate]
+//    This cannot guarantee the GaugeVar still exists
+//    because global will counter->reset() and delete the internal GaugeVar
 GaugeVar *CounterVar::add(const LABEL_MAP& labels)
 {
 	std::string label_str;
@@ -253,6 +258,35 @@ GaugeVar *CounterVar::add(const LABEL_MAP& labels)
 	return var;
 }
 
+void CounterVar::increase(const LABEL_MAP& labels)
+{
+	std::string label_str;
+	GaugeVar *var;
+
+	if (!this->label_to_str(labels, label_str))
+		return;
+
+	RPCVarLocal *local = RPCVarLocal::get_instance();
+	local->mutex.lock(); // against reset() and delete GaugeVar
+
+	auto it = this->data.find(label_str);
+
+	if (it == this->data.end())
+	{
+		var = new GaugeVar(label_str, "");
+		this->data.insert(std::make_pair(label_str, var));
+	}
+	else
+		var = it->second;
+
+	var->increase();
+	local->mutex.unlock();
+
+	return;
+}
+
+// Caution :
+//    make sure local->mutex.lock() before CounterVar::reduce()
 bool CounterVar::reduce(const void *ptr, size_t)
 {
 	std::unordered_map<std::string, GaugeVar *> *data;

+ 1 - 0
src/var/rpc_var.h

@@ -249,6 +249,7 @@ class CounterVar : public RPCVar
 public:
 	using LABEL_MAP = std::map<std::string, std::string>;
 	GaugeVar *add(const LABEL_MAP& labels);
+	void increase(const LABEL_MAP& labels);
 
 	RPCVar *create(bool with_data) override;
 	bool reduce(const void *ptr, size_t sz) override;