Bläddra i källkod

rpc_var : add RPCTimeWindow and TimedGaugeVar; add UnitTest;

liyingxin 1 år sedan
förälder
incheckning
83751b9e8d
5 ändrade filer med 260 tillägg och 21 borttagningar
  1. 3 0
      .github/workflows/ci.yml
  2. 61 4
      src/var/rpc_var.cc
  3. 92 12
      src/var/rpc_var.h
  4. 12 5
      test/CMakeLists.txt
  5. 92 0
      test/var_unittest.cc

+ 3 - 0
.github/workflows/ci.yml

@@ -28,6 +28,9 @@ jobs:
     - name: make tutorial
       run: make tutorial -j4
 
+    - name: make check
+      run: make check -j4
+
     - name: make install
       run: sudo make install
   test:

+ 61 - 4
src/var/rpc_var.cc

@@ -89,15 +89,26 @@ RPCVarLocal::~RPCVarLocal()
 {
 	RPCVarGlobal *global_var = RPCVarGlobal::get_instance();
 
-	global_var->del(this);
+	global_var->remove(this);
 	global_var->dup(this->vars);
 
 	for (auto it = this->vars.begin(); it != this->vars.end(); it++)
 		delete it->second;
 }
 
+RPCVarGlobal::~RPCVarGlobal()
+{
+	this->finished = true;
+
+	for (auto& local : this->local_vars)
+		delete local;
+}
+
 void RPCVarGlobal::dup(const std::unordered_map<std::string, RPCVar *>& vars)
 {
+	if (this->finished == true)
+		return;
+
 	RPCVarLocal *local;
 
 	this->mutex.lock();
@@ -105,6 +116,7 @@ void RPCVarGlobal::dup(const std::unordered_map<std::string, RPCVar *>& vars)
 		local = NULL;
 	else
 		local = this->local_vars[0];
+
 	this->mutex.unlock();
 
 	if (local == NULL)
@@ -130,7 +142,7 @@ void RPCVarGlobal::dup(const std::unordered_map<std::string, RPCVar *>& vars)
 	local->mutex.unlock();
 }
 
-void RPCVarGlobal::del(const RPCVarLocal *var)
+void RPCVarGlobal::remove(const RPCVarLocal *var)
 {
 	this->mutex.lock();
 	for (size_t i = 0; i < this->local_vars.size(); i++)
@@ -197,8 +209,7 @@ RPCVar *CounterVar::create(bool with_data)
 
 	if (with_data)
 	{
-		for (auto it = this->data.begin();
-			 it != this->data.end(); it++)
+		for (auto it = this->data.begin(); it != this->data.end(); it++)
 		{
 			var->data.insert(std::make_pair(it->first,
 							(GaugeVar *)it->second->create(true)));
@@ -496,5 +507,51 @@ void SummaryVar::collect(RPCVarCollector *collector)
 	this->quantile_out.clear();
 }
 
+TimedGaugeVar::TimedGaugeVar(const std::string& name, const std::string& help,
+							 Clock::duration duration, size_t bucket_num) :
+	GaugeVar(name, help),
+	RPCTimeWindow<GaugeVar>(duration, bucket_num)
+{
+	for (size_t i = 0; i < bucket_num; i ++)
+		this->data_bucket.push_back(GaugeVar(name, help));
+}
+
+void TimedGaugeVar::increase()
+{
+	this->rotate();
+
+	for (auto& bucket : this->data_bucket)
+		bucket.increase();
+}
+
+const void *TimedGaugeVar::get_data()
+{
+	GaugeVar& bucket = this->rotate();
+	return bucket.get_data();
+}
+
+RPCVar *TimedGaugeVar::create(bool with_data)
+{
+	GaugeVar& current_bucket = this->rotate();
+	RPCVar *var;
+
+	if (with_data) // for reduce
+	{
+		GaugeVar *gauge = new GaugeVar(this->get_name(), this->get_help());
+		gauge->set(*(double *)current_bucket.get_data());
+		var = gauge;
+	}
+	else // for dup into other threads
+	{
+		TimedGaugeVar *timed_gauge = new TimedGaugeVar(this->get_name(),
+													   this->get_help(),
+													   this->duration,
+													   this->bucket_num);
+		var = timed_gauge;
+	}
+
+	return var;
+}
+
 } // end namespace srpc
 

+ 92 - 12
src/var/rpc_var.h

@@ -23,6 +23,7 @@
 #include <vector>
 #include <map>
 #include <unordered_map>
+#include <chrono>
 #include "time_window_quantiles.h"
 
 namespace srpc
@@ -76,6 +77,7 @@ public:
 	static RPCVar *var(const std::string& name);
 	static bool check_name_format(const std::string& name);
 };
+
 class RPCVarGlobal
 {
 public:
@@ -85,23 +87,30 @@ public:
 		return &kInstance;
 	}
 
-	void add(RPCVarLocal *var)
+	void add(RPCVarLocal *local)
 	{
 		this->mutex.lock();
-		this->local_vars.push_back(var);
+		this->local_vars.push_back(local);
 		this->mutex.unlock();
 	}
 
-	void del(const RPCVarLocal *var);
 	RPCVar *find(const std::string& name);
+
+	// following APIs are used when thread exit and its ~RPCVarLocal()
+
+	// remove a RPCVarLocal from Glolba->local_vars
+	void remove(const RPCVarLocal *var);
+	// duplicate the vars into global existing RPCVarLocal
 	void dup(const std::unordered_map<std::string, RPCVar *>& vars);
 
 private:
-	RPCVarGlobal() { }
+	RPCVarGlobal() { this->finished = false; }
+	~RPCVarGlobal();
 
 public:
 	std::mutex mutex;
 	std::vector<RPCVarLocal *> local_vars;
+	bool finished;
 //	friend class RPCVarFactory;
 };
 
@@ -173,8 +182,9 @@ public:
 	virtual RPCVar *create(bool with_data) = 0;
 	virtual bool reduce(const void *ptr, size_t sz) = 0;
 	virtual size_t get_size() const = 0;
-	virtual const void *get_data() const = 0;
+	virtual const void *get_data() = 0;
 	virtual void collect(RPCVarCollector *collector) = 0;
+	virtual void reset() = 0;
 
 public:
 	RPCVar(const std::string& name, const std::string& help, RPCVarType type) :
@@ -199,12 +209,12 @@ protected:
 class GaugeVar : public RPCVar
 {
 public:
-	void increase() { ++this->data; }
-	void decrease() { --this->data; }
+	virtual void increase() { ++this->data; }
+	virtual void decrease() { --this->data; }
 	size_t get_size() const override { return sizeof(double); }
-	const void *get_data() const override { return &this->data; }
+	const void *get_data() override { return &this->data; }
 
-	virtual double get() const { return this->data; }
+	virtual double get() { return this->data; }
 	virtual void set(double var) { this->data = var; }
 
 	RPCVar *create(bool with_data) override;
@@ -221,6 +231,8 @@ public:
 		collector->collect_gauge(this, this->data);
 	}
 
+	void reset() override { this->data = 0; }
+
 public:
 	GaugeVar(const std::string& name, const std::string& help) :
 		RPCVar(name, help, VAR_GAUGE)
@@ -243,10 +255,12 @@ public:
 	void collect(RPCVarCollector *collector) override;
 
 	size_t get_size() const override { return this->data.size(); }
-	const void *get_data() const override { return &this->data; }
+	const void *get_data() override { return &this->data; }
 
 	static bool label_to_str(const LABEL_MAP& labels, std::string& str);
 
+	void reset() override { this->data.clear(); }
+
 public:
 	CounterVar(const std::string& name, const std::string& help) :
 		RPCVar(name, help, VAR_COUNTER)
@@ -254,6 +268,7 @@ public:
 	}
 
 	~CounterVar();
+
 private:
 	std::unordered_map<std::string, GaugeVar *> data;
 };
@@ -271,7 +286,7 @@ public:
 	void collect(RPCVarCollector *collector) override;
 
 	size_t get_size() const override { return this->bucket_counts.size(); }
-	const void *get_data() const override { return this; }
+	const void *get_data() override { return this; }
 
 	double get_sum() const { return this->sum; }
 	size_t get_count() const { return this->count; }
@@ -280,6 +295,8 @@ public:
 		return &this->bucket_counts;
 	}
 
+	void reset() override { this->bucket_counts.clear(); }
+
 public:
 	HistogramVar(const std::string& name, const std::string& help,
 				 const std::vector<double>& bucket);
@@ -301,7 +318,7 @@ public:
 	void collect(RPCVarCollector *collector) override;
 
 	size_t get_size() const override { return this->quantiles.size(); }
-	const void *get_data() const override { return this; }
+	const void *get_data() override { return this; }
 
 	double get_sum() const { return this->sum; }
 	size_t get_count() const { return this->count; }
@@ -310,6 +327,8 @@ public:
 		return &this->quantile_values;
 	}
 
+	void reset() override { /* no TimedSummary so no reset for Summary */}
+
 public:
 	SummaryVar(const std::string& name, const std::string& help,
 			   const std::vector<struct Quantile>& quantile,
@@ -326,6 +345,67 @@ private:
 	std::vector<double> quantile_out;
 };
 
+template<typename VAR>
+class RPCTimeWindow
+{
+public:
+	using Clock = std::chrono::steady_clock;
+	RPCTimeWindow(Clock::duration duration, size_t bucket_num);
+
+protected:
+	VAR& rotate();
+
+protected:
+	std::vector<VAR> data_bucket;
+	Clock::duration duration;
+	size_t bucket_num;
+
+private:
+	size_t current_bucket;
+	Clock::time_point last_rotation;
+	Clock::duration rotation_interval;
+};
+
+template<typename VAR>
+RPCTimeWindow<VAR>::RPCTimeWindow(Clock::duration duration, size_t bucket_num) :
+	duration(duration),
+	bucket_num(bucket_num),
+	rotation_interval(duration / bucket_num)
+{
+	this->current_bucket = 0;
+	this->last_rotation = Clock::now();
+}
+
+template<typename VAR>
+VAR& RPCTimeWindow<VAR>::rotate()
+{
+	auto delta = Clock::now() - this->last_rotation;
+
+	while (delta > this->rotation_interval)
+	{
+		this->data_bucket[this->current_bucket].reset();
+		if (++this->current_bucket >= this->data_bucket.size())
+			this->current_bucket = 0;
+
+		delta -= this->rotation_interval;
+		this->last_rotation += this->rotation_interval;
+	}
+
+	return this->data_bucket[this->current_bucket];
+}
+
+class TimedGaugeVar : public GaugeVar, RPCTimeWindow<GaugeVar>
+{
+public:
+	TimedGaugeVar(const std::string& name, const std::string& help,
+				  Clock::duration duration, size_t bucket_num);
+	// for collect
+	void increase() override;
+	// for reduce
+	const void *get_data() override;
+	RPCVar *create(bool with_data) override;
+};
+
 } // end namespace srpc
 
 #endif

+ 12 - 5
test/CMakeLists.txt

@@ -93,10 +93,6 @@ else ()
 	set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=${CXX_STD} -fno-exceptions")
 endif ()
 
-set(TEST_LIST
-	unittest
-)
-
 protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS test_pb.proto)
 
 add_custom_target(
@@ -158,6 +154,10 @@ else ()
 		)
 endif ()
 
+set(TEST_LIST unittest)
+set(BASIC_TEST var_unittest)
+set(ALL_TEST var_unittest unittest)
+
 foreach(src ${TEST_LIST})
 	add_executable(${src} EXCLUDE_FROM_ALL ${src}.cc ${PROTO_SRCS} ${PROTO_HDRS})
 	target_link_libraries(${src} ${SRPC_LIB} ${GTEST_LIB})
@@ -166,10 +166,17 @@ foreach(src ${TEST_LIST})
 	add_dependencies(check ${src})
 endforeach()
 
+foreach(src ${BASIC_TEST})
+	add_executable(${src} EXCLUDE_FROM_ALL ${src}.cc)
+	target_link_libraries(${src} ${SRPC_LIB} ${GTEST_LIB})
+	add_test(${src} ${src})
+	add_dependencies(check ${src})
+endforeach()
+
 if (WIN32)
 	set(memcheck nothing)
 elseif (NOT ${CMAKE_MEMORYCHECK_COMMAND} STREQUAL "CMAKE_MEMORYCHECK_COMMAND-NOTFOUND")
-	foreach(src ${TEST_LIST})
+	foreach(src ${ALL_TEST})
 		add_test(${src}-memory-check ${memcheck_command} ./${src})
 	endforeach()
 endif ()

+ 92 - 0
test/var_unittest.cc

@@ -0,0 +1,92 @@
+/*
+  Copyright (c) 2023 Sogou, Inc.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <string.h>
+#include <string>
+#include <mutex>
+#include <condition_variable>
+#include <chrono>
+#include <gtest/gtest.h>
+#include "srpc/rpc_var.h"
+
+using namespace srpc;
+
+RPCVar *get_and_reduce(std::string&& var_name)
+{
+	RPCVar *result = NULL;
+	RPCVarGlobal *global_var = RPCVarGlobal::get_instance();
+	std::unordered_map<std::string, RPCVar *>::iterator it;
+
+	global_var->mutex.lock();
+	for (RPCVarLocal *local : global_var->local_vars)
+	{
+		local->mutex.lock();
+		it = local->vars.find(var_name);
+		if (it != local->vars.end())
+		{
+			if (result == NULL)
+				result = it->second->create(true);
+			else
+				result->reduce(it->second->get_data(), it->second->get_size());
+		}
+		local->mutex.unlock();
+	}
+	global_var->mutex.unlock();
+
+	return result;
+}
+
+TEST(var_unittest, GaugeVar)
+{
+	GaugeVar *req = new GaugeVar("req", "total req");
+	RPCVarLocal::get_instance()->add("req", req);
+
+	RPCVarFactory::gauge("req")->increase();
+
+	GaugeVar *gauge = (GaugeVar *)get_and_reduce("req");
+	EXPECT_EQ(gauge->get(), 1.0);
+	delete gauge;
+}
+
+TEST(var_unittest, TimedGaugeVar)
+{
+	TimedGaugeVar *qps = new TimedGaugeVar("qps", "req per second",
+										   std::chrono::seconds(1), 4);
+	RPCVarLocal::get_instance()->add("qps", qps);
+
+	RPCVarFactory::gauge("qps")->increase();
+	RPCVarFactory::gauge("qps")->increase();
+
+	GaugeVar *gauge = (GaugeVar *)get_and_reduce("qps");
+	EXPECT_EQ(gauge->get(), 2.0);
+	delete gauge;
+
+	usleep(500000);
+	RPCVarFactory::gauge("qps")->increase();
+	gauge = (GaugeVar *)get_and_reduce("qps");
+	EXPECT_EQ(gauge->get(), 3.0);
+	delete gauge;
+
+	usleep(500000);
+	gauge = (GaugeVar *)get_and_reduce("qps");
+	EXPECT_EQ(gauge->get(), 1.0);
+	delete gauge;
+}
+