Quellcode durchsuchen

data_lifecycle: multi-tenancy (#120)

* data_lifecycle: modify delete sql

* data_lifecycle: add without=1

* data_lifecycle: life_cycle_table use mysql

* data_lifecycle: deal with null value

* data_lifecycle: deal with string field

* data_lifecycle: 多租户改造

* data_lifecycle: deal with last_invisible_time

* update config path

* fix bug

* data_lifecycle: read my.conf

* match dtc-conf

Co-authored-by: shzhulin3 <shzhulin3@jd.com>
Co-authored-by: Yang Shuang <yangshuang68@jd.com>
zhulin vor 1 Jahr
Ursprung
Commit
dc55975cdf

+ 1 - 0
conf/dtc.yaml

@@ -25,6 +25,7 @@ primary:
   db: &db mydb
   table: &table name
   layered.rule: '(a>1 and a<3) or uid = 0'
+  option.file.path: '../conf/my.conf'
   cache:
     field:
       - {name: &key uid, type: signed/unsigned/float/string/binary, size: 4}

BIN
src/data_lifecycle.zip


+ 16 - 34
src/data_lifecycle/data_conf.cc

@@ -52,39 +52,16 @@ uint32_t DataConf::Port(){
     return port_;
 }
 
-int DataConf::LoadConfig(int argc, char *argv[]){
+int DataConf::LoadConfig(const std::string& config_path){
     int c;
-    strcpy(table_file, "../conf/dtc-conf-1000.yaml");
-    strcpy(cache_file, "../conf/dtc-conf-1000.yaml");
-
-    while ((c = getopt(argc, argv, "df:t:hvV")) != -1) {
-        switch (c) {
-        case 'd':
-            background = 0;
-            break;
-        case 'f':
-            strncpy(cache_file, optarg, sizeof(cache_file) - 1);
-            break;
-        case 't':
-            strncpy(table_file, optarg, sizeof(table_file) - 1);
-            break;
-        case 'a':
-            strncpy(agent_file, optarg, sizeof(agent_file) - 1);
-            break;
-        case 'h':
-            show_usage();
-            return 0;
-        case '?':
-            show_usage();
-            return -1;
-        }
-    }
+    strcpy(table_file, config_path.c_str());
+    strcpy(cache_file, config_path.c_str());
 
     YAML::Node config;
     try {
-        config = YAML::LoadFile(cache_file);
+        config = YAML::LoadFile(config_path);
     } catch (const YAML::Exception &e) {
-        log4cplus_error("config file error:%s, %s\n", e.what(), cache_file);
+        log4cplus_error("config file error:%s, %s\n", e.what(), config_path.c_str());
         return DTC_CODE_LOAD_CONFIG_ERR;
     }
 
@@ -104,19 +81,20 @@ int DataConf::LoadConfig(int argc, char *argv[]){
     return 0;
 }
 
-int DataConf::ParseConfig(ConfigParam& config_param){
+int DataConf::ParseConfig(const std::string& config_path, ConfigParam& config_param){
     YAML::Node config;
     try {
-        config = YAML::LoadFile(cache_file);
+        config = YAML::LoadFile(config_path);
     } catch (const YAML::Exception &e) {
-        log4cplus_error("config file error:%s, %s\n", e.what(), cache_file);
+        log4cplus_error("config file error:%s, %s\n", e.what(), config_path.c_str());
         return DTC_CODE_LOAD_CONFIG_ERR;
     }
 
     YAML::Node node = config["data_lifecycle"]["single.query.count"];
     config_param.single_query_cnt_ = node? node.as<int>(): 10;    
     
-    node = config["data_lifecycle"]["rule.sql"];
+    //node = config["data_lifecycle"]["rule.sql"];
+    node = config["primary"]["layered.rule"];
     if(!node){
         log4cplus_error("rule.sql not defined.");
         return DTC_CODE_PARSE_CONFIG_ERR;
@@ -200,11 +178,15 @@ int DataConf::ParseConfig(ConfigParam& config_param){
         return DTC_CODE_PARSE_CONFIG_ERR;
     }
     config_param.table_name_ = node.as<string>();
+    config_param.port_ = port_;
+
+    node = config["primary"]["option.file.path"];
+    config_param.option_file = node? node.as<string>(): "";
 
     log4cplus_debug("single_query_cnt_: %d, data_rule: %s, operate_time_rule: %s, operate_type: %s, "
-        "life_cycle_table_name: %s, key_field_name: %s, table_name: %s, hot_database_name: %s",
+        "life_cycle_table_name: %s, key_field_name: %s, table_name: %s, hot_database_name: %s, cold_database_name: %s",
         config_param.single_query_cnt_, config_param.data_rule_.c_str(), config_param.operate_time_rule_.c_str(),
         config_param.operate_type_.c_str(), config_param.life_cycle_table_name_.c_str(), config_param.key_field_name_.c_str(),
-        config_param.table_name_.c_str(), config_param.hot_db_name_.c_str());
+        config_param.table_name_.c_str(), config_param.hot_db_name_.c_str(), config_param.cold_db_name_.c_str());
     return 0;
 }

+ 4 - 8
src/data_lifecycle/data_conf.h

@@ -22,20 +22,16 @@ public:
     std::string full_db_addr_;
     std::string full_db_user_;
     std::string full_db_pwd_;
+    uint32_t port_;
+    std::string option_file;
 };
 
 class DataConf{
 public:
     DataConf();
     ~DataConf();
-    static DataConf *Instance(){
-        return Singleton<DataConf>::instance();
-    }
-    static void Destroy(){
-        Singleton<DataConf>::destory();
-    }
-    int LoadConfig(int argc, char *argv[]);
-    int ParseConfig(ConfigParam& config_param);
+    int LoadConfig(const std::string& config_path);
+    int ParseConfig(const std::string& config_path, ConfigParam& config_param);
     bool ParseAgentConf(std::string path);
     uint32_t Port();
 private:

+ 101 - 7
src/data_lifecycle/data_manager.cc

@@ -4,13 +4,19 @@
 #include "croncpp.h"
 #include <unistd.h>
 #include <set>
+#include <ifaddrs.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <arpa/inet.h>
+
+#define default_option_file "../conf/my.conf"
 
 DataManager::DataManager(){
     next_process_time_ = 0;
     DBHost* db_host = new DBHost();
     memset(db_host, 0, sizeof(db_host));
     strcpy(db_host->Host, "127.0.0.1");
-    db_host->Port = DataConf::Instance()->Port();
+    db_host->Port = 3306;
     strcpy(db_host->User, "");
     strcpy(db_host->Password, "");
     db_conn_ = new CDBConn(db_host);
@@ -34,7 +40,7 @@ field_flag_vec_(config_param.field_flag_vec_){
     DBHost* db_host = new DBHost();
     memset(db_host, 0, sizeof(db_host));
     strcpy(db_host->Host, "127.0.0.1");
-    db_host->Port = DataConf::Instance()->Port();
+    db_host->Port = config_param.port_;
     strcpy(db_host->User, "root");
     strcpy(db_host->Password, "root");
     db_conn_ = new CDBConn(db_host);
@@ -50,6 +56,13 @@ field_flag_vec_(config_param.field_flag_vec_){
         full_db_host->Port = stoi(full_db_vec[1]);
         strcpy(full_db_host->User, config_param.full_db_user_.c_str());
         strcpy(full_db_host->Password, config_param.full_db_pwd_.c_str());
+        strcpy(full_db_host->OptionFile, "");
+        if(config_param.option_file.size() != 0){
+            strcpy(full_db_host->OptionFile, config_param.option_file.c_str());
+        } else {
+            strcpy(full_db_host->OptionFile, default_option_file);
+        }
+        printf("full_db_host->OptionFile: %s\n", full_db_host->OptionFile);
         full_db_conn_ = new CDBConn(full_db_host);
         if(NULL != full_db_host){
             delete full_db_host;
@@ -66,6 +79,31 @@ DataManager::~DataManager(){
     }
 }
 
+static std::string GetIp(){
+    struct ifaddrs * ifAddrStruct = NULL;
+    struct ifaddrs * ifAddrStruct1 = NULL;
+    void * tmpAddrPtr = NULL;
+
+    getifaddrs(&ifAddrStruct);
+    ifAddrStruct1 = ifAddrStruct;
+    std::string my_ip;
+
+    while (ifAddrStruct != NULL)
+    {
+        if (ifAddrStruct->ifa_addr->sa_family == AF_INET) {
+           tmpAddrPtr = &((struct sockaddr_in*)ifAddrStruct->ifa_addr)->sin_addr;
+           char addressBuffer[INET_ADDRSTRLEN];
+           inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
+           if(strcmp(ifAddrStruct->ifa_name, "eth0") == 0){
+               my_ip = addressBuffer;
+           }
+        }
+        ifAddrStruct=ifAddrStruct->ifa_next;
+    }
+    freeifaddrs(ifAddrStruct1);
+    return my_ip;
+}
+
 int DataManager::ConnectAgent(){
     return db_conn_->Open();
 }
@@ -115,8 +153,12 @@ int DataManager::DoTaskOnce(){
             printf("GetLastId error, ret: %d\n", ret);
             return DTC_CODE_MYSQL_QRY_ERR;
         }
+        if("" == last_invisible_time){
+            last_invisible_time = "1970-01-01 08:00:00";
+        }
         std::string query_sql = ConstructQuerySql(last_delete_id, last_invisible_time);
         std::vector<QueryInfo> query_info_vec;
+        //full_db_conn_->do_query(cold_db_name_.c_str(), "set names utf8");
         ret = DoQuery(query_sql, query_info_vec);
         if(0 != ret){
             printf("DoQuery error, ret: %d\n", ret);
@@ -135,12 +177,13 @@ int DataManager::DoTaskOnce(){
             last_delete_id_ = iter->id;
             last_invisible_time_ = iter->invisible_time;
             if(0 != ret){
-                UpdateLastDeleteId();
+                //UpdateLastDeleteId();
                 printf("DoDelete error, ret: %d\n", ret);
                 return DTC_CODE_MYSQL_DEL_ERR;
             }
         }
         UpdateLastDeleteId();
+        sleep(1);
     }
     return 0;
 }
@@ -152,7 +195,9 @@ void DataManager::SetTimeRule(const std::string& time_rule){
 int DataManager::GetLastId(uint64_t& last_delete_id, std::string& last_invisible_time){
     std::stringstream ss_sql;
     ss_sql << "select id,ip,last_id,last_update_time from " << life_cycle_table_name_
-            << " order by id desc limit 1";
+            << " where table_name='" << table_name_
+            << "' order by id desc limit 1";
+    log4cplus_debug("query sql: %s", ss_sql.str().c_str());
     int ret = full_db_conn_->do_query(cold_db_name_.c_str(), ss_sql.str().c_str());
     if(0 != ret){
         log4cplus_debug("query error, ret: %d, err msg: %s", ret, full_db_conn_->get_err_msg());
@@ -184,7 +229,7 @@ std::string DataManager::ConstructQuerySql(uint64_t last_delete_id, std::string
         }
     }
     ss_sql << " from " << table_name_
-        << " where (" << data_rule_
+        << " where not(" << data_rule_
         << ") and (invisible_time>'" << last_invisible_time
         << "' or (invisible_time='" << last_invisible_time
         << "' and id>" << last_delete_id
@@ -194,6 +239,8 @@ std::string DataManager::ConstructQuerySql(uint64_t last_delete_id, std::string
 }
 
 int DataManager::DoQuery(const std::string& query_sql, std::vector<QueryInfo>& query_info_vec){
+    printf("begin DoQuery\n");
+    ShowVariables();
     int ret = full_db_conn_->do_query(cold_db_name_.c_str(), query_sql.c_str());
     if(0 != ret){
         printf("query error, ret: %d, err msg: %s\n", ret, full_db_conn_->get_err_msg());
@@ -229,6 +276,13 @@ int DataManager::DoQuery(const std::string& query_sql, std::vector<QueryInfo>& q
     return 0;
 }
 
+void hextostring(char* str, int len){
+    for(int i = 0; i < len; i++){
+        printf("%02x", str[i]);
+    }
+    printf("\n");
+}
+
 std::set<std::string> DataManager::ConstructDeleteSql(const std::string& key){
     // delete根据key删除,并带上规则
     std::set<std::string> sql_set;
@@ -277,10 +331,11 @@ int DataManager::DoDelete(const std::string& delete_sql){
 }
 
 int DataManager::UpdateLastDeleteId(){
-    std::string local_ip;
+    std::string local_ip = GetIp();
     std::stringstream ss_sql;
-    ss_sql << "insert into " << life_cycle_table_name_
+    ss_sql << "replace into " << life_cycle_table_name_
         << " values(NULL,'" << local_ip
+        << "', '" << table_name_
         << "', " << last_delete_id_
         << ", '" << last_invisible_time_
         << "')";
@@ -292,6 +347,45 @@ int DataManager::UpdateLastDeleteId(){
     return 0;
 }
 
+int DataManager::ShowVariables(){
+    int ret = full_db_conn_->do_query(cold_db_name_.c_str(), "show variables like '%%char%%'");
+    if(0 != ret){
+        printf("query error, ret: %d, err msg: %s\n", ret, full_db_conn_->get_err_msg());
+        return ret;
+    }
+    if(0 == full_db_conn_->use_result()){
+        for (int i = 0; i < full_db_conn_->res_num; i++) {
+            ret = full_db_conn_->fetch_row();
+            if (ret != 0) {
+                full_db_conn_->free_result();
+                printf("db fetch row error: %s\n", full_db_conn_->get_err_msg());
+                return ret;
+            }
+            printf("%s: %s\n", full_db_conn_->Row[0], full_db_conn_->Row[1]);
+        }
+    }
+    return 0;
+}
+
+int DataManager::CreateTable(){
+    std::stringstream ss_sql;
+    ss_sql << "CREATE TABLE if not exists " << life_cycle_table_name_ << "("
+        << "`id` int(11) unsigned NOT NULL AUTO_INCREMENT,"
+        << "`ip` varchar(20) NOT NULL DEFAULT '0' COMMENT '执行清理操作的机器ip',"
+        << "`table_name` varchar(40) DEFAULT NULL,"
+        << "`last_id` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '上次删除的记录对应的id',"
+        << "`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '上次删除的记录对应的更新时间',"
+        << "PRIMARY KEY (`id`),"
+        << "UNIQUE (`table_name`)"
+        << ") ENGINE=InnoDB DEFAULT CHARSET=utf8";
+    int ret = full_db_conn_->do_query(cold_db_name_.c_str(), ss_sql.str().c_str());
+    if(0 != ret){
+        log4cplus_debug("create table error, ret: %d, err msg: %s", ret, full_db_conn_->get_err_msg());
+        return ret;
+    }
+    return 0;
+}
+
 std::set<std::string> DataManager::splitStr(const std::string& src, const std::string& separate_character)
 {
     std::set<std::string> strs;

+ 2 - 0
src/data_lifecycle/data_manager.h

@@ -41,6 +41,8 @@ public:
     virtual int UpdateLastDeleteId();
     std::set<std::string> splitStr(const std::string& src, const std::string& separate_character);
     std::vector<std::string> splitVecStr(const std::string& src, const std::string& separate_character);
+    int CreateTable();
+    int ShowVariables();
 private:
     std::string data_rule_; // example: status=0
     std::string operate_time_rule_; // example: 0 */5 * * * ?

+ 74 - 15
src/data_lifecycle/main.cc

@@ -7,36 +7,95 @@
 #include "dbconfig.h"
 #include "log.h"
 
+#include <sys/types.h>
+#include <dirent.h>
+#include <stdio.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <iostream>
+#include <thread>
+
 const char data_project_name[] = "data_lifecycle_manager";
 
-int main(int argc, char *argv[]){
-    init_proc_title(argc, argv);
-    set_proc_title("agent-data-lifecycle");
-    init_log4cplus();
-    if(DataConf::Instance()->LoadConfig(argc, argv) != 0){
-        log4cplus_error("load_config error.");
+int scan_file(const char* path, std::vector<std::string>& config_vec){
+    DIR *dir = opendir(path);
+    if(dir == NULL){
+        log4cplus_error("opendir error.");
         return DTC_CODE_LOAD_CONFIG_ERR;
     }
+    struct dirent *dirent;
+    while(dirent = readdir(dir)){
+        string dir_name = dirent->d_name;
+        string match_str="dtc-conf";
+        if(dir_name == "." || dir_name == ".." || dir_name == "dtc-conf-0.yaml"){
+            continue;
+        } else if(dir_name.size() > match_str.size() && dir_name.substr(0, match_str.size()) == match_str){
+            config_vec.push_back("../conf/" + dir_name);
+        }
+    }
+    return 0;
+}
+
+void thread_func(const std::string& config_path){
+    DataConf* p_data_conf = new DataConf();
+    if(p_data_conf->LoadConfig(config_path) != 0){
+        log4cplus_error("load_config error.");
+        return;
+    }
     ConfigParam config_param;
-    if(DataConf::Instance()->ParseConfig(config_param) != 0){
+    if(p_data_conf->ParseConfig(config_path, config_param) != 0){
         log4cplus_error("parse_config error.");
-        return DTC_CODE_PARSE_CONFIG_ERR;
-    }
-    log4cplus_info("%s v%s: starting....", data_project_name, version);
-    if(init_daemon() < 0){
-        log4cplus_error("init_daemon error.");
-        return DTC_CODE_INIT_DAEMON_ERR;
+        return;
     }
     DataManager* p_data_manager = new DataManager(config_param);
     if(0 != p_data_manager->ConnectAgent()){
         log4cplus_error("ConnectAgent error.");
-        return DTC_CODE_INIT_DAEMON_ERR;
+        return;
     }
     if(0 != p_data_manager->ConnectFullDB()){
         log4cplus_error("ConnectFullDB error.");
-        return DTC_CODE_INIT_DAEMON_ERR;
+        return;
+    }
+    if(0 != p_data_manager->CreateTable()){
+        log4cplus_error("CreateTable error.");
+        return;
     }
     p_data_manager->DoProcess();
+    if(NULL != p_data_manager){
+        delete p_data_manager;
+    }
+    if(NULL != p_data_conf){
+        delete p_data_conf;
+    }
+}
+
+int main(int argc, char *argv[]){
+    init_proc_title(argc, argv);
+    set_proc_title("agent-data-lifecycle");
+    init_log4cplus();
+    log4cplus_info("%s v%s: starting....", data_project_name, version);
+    if(init_daemon() < 0){
+        log4cplus_error("init_daemon error.");
+        return DTC_CODE_INIT_DAEMON_ERR;
+    }
+
+    std::vector<std::string> config_vec;
+    int ret = scan_file("../conf", config_vec);
+    if(0 != ret){
+        log4cplus_error("scan_file error.");
+        return DTC_CODE_LOAD_CONFIG_ERR;
+    }
+    std::vector<std::thread> thread_vec;
+    for(auto config_path : config_vec){
+        thread_vec.push_back(std::thread(thread_func, config_path));
+    }
+
+    for (auto vit = thread_vec.begin(); vit != thread_vec.end(); vit++) {
+        if (vit->joinable()) {
+            vit->join();
+        }
+    }
+
     /*if(NULL != p_data_manager){
         delete p_data_manager;
     }