Explorar el Código

Fix online bugs (#229)

* fix bugs.

* timeout.

* add read timeout.

* fix result size more than 251 bug.

* rule test tool parameter.
Yang Shuang hace 1 año
padre
commit
aaba35239f

+ 7 - 0
src/connector/database_connection.cc

@@ -71,6 +71,7 @@ CDBConn::CDBConn(const DBHost *Host)
     STRCPY(DBConfig.User, Host->User);
     STRCPY(DBConfig.Password, Host->Password);
     DBConfig.ConnTimeout = Host->ConnTimeout;
+    DBConfig.ReadTimeout = Host->ReadTimeout;
     STRCPY(DBConfig.OptionFile, Host->OptionFile);
 
     if (mysql_init(&Mysql) == NULL) {
@@ -149,6 +150,12 @@ int CDBConn::Connect(const char *DBName)
             mysql_options(&Mysql, MYSQL_OPT_CONNECT_TIMEOUT,
                       (const char *)&(DBConfig.ConnTimeout));
         }
+        if(DBConfig.ReadTimeout != 0){
+			mysql_options(&Mysql, MYSQL_OPT_READ_TIMEOUT, (const char *)&(DBConfig.ReadTimeout));
+		}
+        if(DBConfig.WriteTimeout != 0){
+			mysql_options(&Mysql, MYSQL_OPT_WRITE_TIMEOUT, (const char *)&(DBConfig.WriteTimeout));
+		}        
         int isunix = DBConfig.Host[0] == '/';
         if (DBConfig.OptionFile[0] != '\0' &&
             mysql_options(&Mysql, MYSQL_READ_DEFAULT_FILE,

+ 2 - 0
src/connector/database_connection.h

@@ -34,6 +34,8 @@ struct DBHost {
 	char User[64];
 	char Password[128];
 	unsigned int ConnTimeout;
+	unsigned int ReadTimeout;
+	unsigned int WriteTimeout;
 	char OptionFile[256];
 };
 

+ 3 - 2
src/connector/mysql_operation.cc

@@ -798,7 +798,7 @@ std::string ConnectorProcess::value_to_str(const DTCValue *v, int fieldType)
         db_conn.escape_string(esc.c_str(), v->str.ptr,
                       v->str.len); // 先对字符串进行escape
         ret = '\'';
-        ret += esc.c_str();
+        ret += v->str.ptr;
         ret += "\'";
         return ret;
     default:
@@ -846,7 +846,7 @@ inline int ConnectorProcess::format_sql_value(const DTCValue *Value,
                 db_conn.escape_string(
                     esc.c_str(), Value->str.ptr,
                     Value->str.len); // 先对字符串进行escape
-                if (sql.append(esc.c_str()) < 0)
+                if (sql.append(Value->str.ptr) < 0)
                     error_no = -1;
             }
             if (sql.append('\'') < 0)
@@ -1513,6 +1513,7 @@ int ConnectorProcess::process_delete(DtcJob *Task)
 
     Task->resultInfo.set_affected_rows(db_conn.affected_rows());
     log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
+    log4cplus_debug("affected row: %d", db_conn.affected_rows());
 
     return (0);
 }

+ 13 - 4
src/data_lifecycle/data_manager.cc

@@ -19,6 +19,8 @@ DataManager::DataManager(){
     db_host->Port = 3306;
     strcpy(db_host->User, "");
     strcpy(db_host->Password, "");
+    db_host->ReadTimeout = 1;
+    db_host->WriteTimeout = 1;
     db_conn_ = new CDBConn(db_host);
     if(NULL != db_host){
         delete db_host;
@@ -43,6 +45,8 @@ field_flag_vec_(config_param.field_flag_vec_){
     db_host->Port = config_param.port_;
     strcpy(db_host->User, "root");
     strcpy(db_host->Password, "root");
+    db_host->ReadTimeout = 1;
+    db_host->WriteTimeout = 1;
     db_conn_ = new CDBConn(db_host);
     if(NULL != db_host){
         delete db_host;
@@ -173,12 +177,12 @@ int DataManager::DoTaskOnce(){
             // 如果执行失败,更新last_id,并退出循环
             std::string sql_set = ConstructDeleteSql(iter->field_info);
             ret = DoDelete(sql_set);
-            printf("DoDelete ret: %d\n", ret);
+            log4cplus_debug("DoDelete ret: %d\n", ret);
             last_delete_id_ = iter->id;
             last_invisible_time_ = iter->invisible_time;
             if(0 != ret){
                 //UpdateLastDeleteId();
-                printf("DoDelete error, ret: %d\n", ret);
+                log4cplus_debug("DoDelete error, ret: %d\n", ret);
                 return DTC_CODE_MYSQL_DEL_ERR;
             }
         }
@@ -240,7 +244,7 @@ std::string DataManager::ConstructQuerySql(uint64_t last_delete_id, std::string
 
 int DataManager::DoQuery(const std::string& query_sql, std::vector<QueryInfo>& query_info_vec){
     printf("begin DoQuery\n");
-    ShowVariables();
+
     int ret = full_db_conn_->do_query(cold_db_name_.c_str(), query_sql.c_str());
     if(0 != ret){
         printf("query error, ret: %d, err msg: %s\n", ret, full_db_conn_->get_err_msg());
@@ -310,7 +314,10 @@ std::string DataManager::ConstructDeleteSql(const std::vector<std::string>& key_
     ss_sql << "delete from " << table_name_ << " where ";
     for(int i = 0; i < field_vec_.size(); i++){
         if(field_flag_vec_[i] == 1){
-            ss_sql << field_vec_[i] << " = '" << key_vec[i] << "'";
+            char* esc = new char[key_vec[i].length()*2];
+            db_conn_->escape_string(esc, key_vec[i].c_str());
+            ss_sql << field_vec_[i] << " = '" << esc << "'";
+            delete []esc;
         } else {
             ss_sql << field_vec_[i] << " = " << key_vec[i];
         }
@@ -327,6 +334,8 @@ int DataManager::DoDelete(const std::string& delete_sql){
         log4cplus_debug("DoDelete error, ret: %d, err msg: %s, delete_sql: %s", ret, db_conn_->get_err_msg(), delete_sql.c_str());
         return ret;
     }
+    int affected = db_conn_->affected_rows();
+    log4cplus_debug("affected row: %d", affected);
     return 0;
 }
 

+ 0 - 1
src/data_lifecycle/main.cc

@@ -58,7 +58,6 @@ void thread_func(const std::string& config_path){
     }
     if(0 != p_data_manager->CreateTable()){
         log4cplus_error("CreateTable error.");
-        return;
     }
     p_data_manager->DoProcess();
     if(NULL != p_data_manager){

+ 0 - 1
src/hwcserver/hwc_sync_unit.cc

@@ -91,7 +91,6 @@ int HwcSync::Run()
     p_master_->Close();
     int i_sec = get_current_time() + 1;
     while (true) {
-        usleep(1000000); // 1s
         if (get_current_time() >= i_sec) {
             if (CComm::registor.CheckMemoryCreateTime()) {
                 log4cplus_error("detect share memory changed");

+ 12 - 0
src/libs/common/my/my_comm.h

@@ -86,4 +86,16 @@ static inline void int4store_big_endian(uchar *T, uint32 A)
 	*(T + 3) = (uchar)(A >> 24);
 }
 
+static inline void int8store_big_endian(uchar *T, ulonglong A) {
+  uint def_temp = (uint)A, def_temp2 = (uint)(A >> 32);
+  int4store_big_endian(T, def_temp);
+  int4store_big_endian(T + 4, def_temp2);
+}
+
+static inline void int3store(uchar *T, uint A) {
+  *(T) = (uchar)(A);
+  *(T + 1) = (uchar)(A >> 8);
+  *(T + 2) = (uchar)(A >> 16);
+}
+
 #endif /* _MY_COMM_H */

+ 52 - 7
src/libs/common/packet/packet_server.cc

@@ -933,6 +933,51 @@ BufferChain *encode_eof(BufferChain *bc, uint8_t &pkt_nr)
 	return nbc;
 }
 
+char* net_store_length(char *pkg, ulonglong length)
+{
+	uchar *packet=(uchar*) pkg;
+	if (length < (ulonglong) (251))
+	{
+		*packet=(uchar) length;
+		return (char*) packet+1;
+	}
+	/* 251 is reserved for NULL */
+	if (length < (ulonglong) (65536))
+	{ 
+		*packet++=252;
+		int2store_big_endian(packet,(uint) length);
+		return (char*) packet+2;
+	}
+	if (length < (ulonglong) (16777216))
+	{
+		*packet++=253;
+		int3store(packet,(ulong) length);
+		return (char*) packet+3;
+	}
+	*packet++=254;
+	int8store_big_endian(packet,length);
+	return (char*) packet+8;
+}
+
+int net_store_offset_num(ulonglong length)
+{
+	if (length < 251)
+	{
+		return 1;
+	}
+	/* 251 is reserved for NULL */
+	if (length < 65536)
+	{ 
+		return 3;
+	}
+	if (length < 16777216)
+	{
+		return 4;
+	}
+
+	return 9;
+}
+
 BufferChain *encode_row_data(DtcJob *job, BufferChain *bc, uint8_t &pkt_nr)
 {
 	ResultSet *pstResultSet = job->result;
@@ -986,7 +1031,7 @@ BufferChain *encode_row_data(DtcJob *job, BufferChain *bc, uint8_t &pkt_nr)
 			}
 			case DField::String:
 			case DField::Binary: {
-				row_len++;
+				row_len += net_store_offset_num(v->str.len);
 				row_len += v->str.len;
 				break;
 			}
@@ -1053,16 +1098,16 @@ BufferChain *encode_row_data(DtcJob *job, BufferChain *bc, uint8_t &pkt_nr)
 				break;
 			}
 			case DField::String: {
-				*(r + offset) = (uint8_t)v->str.len;
-				offset++;
-				memcpy(r + offset, v->str.ptr, v->str.len);
+				char* pos = net_store_length(r + offset, v->str.len);
+				offset += net_store_offset_num(v->str.len);
+				memcpy(pos, v->str.ptr, v->str.len);
 				offset += v->str.len;
 				break;
 			}
 			case DField::Binary: {
-				*(r + offset) = (uint8_t)v->bin.len;
-				offset++;
-				memcpy(r + offset, v->bin.ptr, v->bin.len);
+				char* pos = net_store_length(r + offset, v->bin.len);
+				offset += net_store_offset_num(v->bin.len);
+				memcpy(pos, v->bin.ptr, v->bin.len);
 				offset += v->bin.len;
 				break;
 			}

+ 3 - 2
src/rule/re_cache.cc

@@ -218,7 +218,8 @@ int check_dtc_key(hsql::Expr* rule, std::string key)
     }
     else if(rule->isType(kExprOperator) && rule->opType == kOpEquals)
     {
-        if(rule->expr->getName() == key)
+        log4cplus_debug("key string: %s %s", std::string(rule->expr->getName()).c_str(), key.c_str());
+        if(strcasecmp(std::string(rule->expr->getName()).c_str(), key.c_str()) == 0)
             return 1;
     }
 
@@ -277,7 +278,7 @@ bool re_is_cache_sql(SQLParserResult* sql_ast, std::string key)
 
         for(int i = 0; i < stmt->columns->size(); i++)
         {
-            if(std::string(stmt->columns->at(i)) == key)
+            if(strcasecmp(std::string(stmt->columns->at(i)).c_str(), key.c_str()) == 0)
                 return true;
         }
     }

+ 1 - 0
src/rule/re_function.cc

@@ -111,6 +111,7 @@ std::string fun_date_sub(std::vector<Expr*>* elist)
         }
     }
 
+    log4cplus_debug("timestring: %s, type: %d", timestring, (*elist)[1]->datetimeField);
     return timestring;
 }
 

+ 17 - 16
src/rule/rule.cc

@@ -486,6 +486,20 @@ extern "C" int rule_sql_match(const char* szsql, const char* osql, const char* d
 
     log4cplus_debug("input sql: %s", osql);
 
+    std::string db_dot_name = get_table_with_db(dbsession, szsql);
+    if(db_dot_name.length() > 0 && g_map_dtc_yaml.count(db_dot_name) > 0)
+    {
+        dtc_key = get_key_info(g_map_dtc_yaml[db_dot_name]);
+        if(dtc_key.length() == 0)
+        {
+            log4cplus_error("get dtc_key from yaml:%s failed.", db_dot_name.c_str());
+            return -1;
+        }
+        strcpy(out_dtckey, dtc_key.c_str());
+        *out_keytype = rule_get_key_type(g_map_dtc_yaml[db_dot_name]);
+    }
+    log4cplus_debug("dtc key len: %d, key: %s, dbname len: %d, dbname: %s", dtc_key.length(), dtc_key.c_str(), strlen(dbsession), std::string(dbsession).c_str());
+
     if(sql.find("WITHOUT@@") != sql.npos)
     {
         //L1: DTC cache.
@@ -494,7 +508,7 @@ extern "C" int rule_sql_match(const char* szsql, const char* osql, const char* d
     }
 
     hsql::SQLParserResult sql_ast;
-    if(re_parse_sql(sql, &sql_ast) != 0)
+    if(re_parse_sql(osql, &sql_ast) != 0)
     {
         log4cplus_debug("layered: error, parse sql failed.");
         return -1;
@@ -540,20 +554,6 @@ extern "C" int rule_sql_match(const char* szsql, const char* osql, const char* d
         }
     }
 
-    std::string db_dot_name = get_table_with_db(dbsession, szsql);
-    if(db_dot_name.length() > 0 && g_map_dtc_yaml.count(db_dot_name) > 0)
-    {
-        dtc_key = get_key_info(g_map_dtc_yaml[db_dot_name]);
-        if(dtc_key.length() == 0)
-        {
-            log4cplus_error("get dtc_key from yaml:%s failed.", db_dot_name.c_str());
-            return -1;
-        }
-        strcpy(out_dtckey, dtc_key.c_str());
-        *out_keytype = rule_get_key_type(g_map_dtc_yaml[db_dot_name]);
-    }
-        log4cplus_debug("dtc key len: %d, key: %s, dbname len: %d, dbname: %s", dtc_key.length(), dtc_key.c_str(), strlen(dbsession), std::string(dbsession).c_str());
-
     log4cplus_debug("Is dtc instance: %d %d %d", is_dtc_instance(dtc_key), exist_session_db(dbsession), exist_sql_db(&sql_ast));
     if((exist_session_db(dbsession) || (exist_sql_db(&sql_ast))) && !is_dtc_instance(dtc_key))
     {
@@ -597,7 +597,7 @@ extern "C" int rule_sql_match(const char* szsql, const char* osql, const char* d
             log4cplus_debug("name: %s, type: %d", stmt->columns->at(i), stmt->values->at(i)->type);
             if(stmt->values->at(i)->type == hsql::ExprType::kExprLiteralInt)
             {
-                sprintf(sztmp, "%d", stmt->values->at(i)->ival);
+                sprintf(sztmp, "%lld", stmt->values->at(i)->ival);
                 tempsql += sztmp;
             }
             else if(stmt->values->at(i)->type == hsql::ExprType::kExprLiteralFloat)
@@ -625,6 +625,7 @@ extern "C" int rule_sql_match(const char* szsql, const char* osql, const char* d
     ret = re_match_sql(&sql_ast, expr_rules, ast);  //rule match
     if(ret == 0 || is_update_delete_type(&sql_ast))
     {
+        log4cplus_debug("dtc key: %s", dtc_key.c_str());
         if(re_is_cache_sql(&sql_ast, dtc_key))  //if exist dtc key.
         {
             //L1: DTC cache.