瀏覽代碼

180 supporting delete limit at delete sql in dtc (#200)

* Supporting limit at update/delete in dtc.

* return error if parse failed.

* purge at limit update.
Yang Shuang 1 年之前
父節點
當前提交
d991246589

+ 8 - 0
src/connector/mysql_operation.cc

@@ -1430,6 +1430,10 @@ int ConnectorProcess::process_update(DtcJob *Task)
         return (-7);
     }
 
+    if (Task->requestInfo.limit_count() > 0) {
+            sql_printf(" LIMIT %u", Task->requestInfo.limit_count());
+    }
+
     if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
         Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
         log4cplus_error("error occur: %d", error_no);
@@ -1482,6 +1486,10 @@ int ConnectorProcess::process_delete(DtcJob *Task)
         return (-7);
     }
 
+    if (Task->requestInfo.limit_count() > 0) {
+            sql_printf(" LIMIT %u", Task->requestInfo.limit_count());
+    }
+
     if (error_no !=
         0) { // 主要检查PrintfAppend是否发生过错误,这里统一检查一次
         Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");

+ 5 - 0
src/core/chain/buffer_process_ask_chain.cc

@@ -1986,6 +1986,11 @@ BufferResult BufferProcessAskChain::reply_connector_answer(DTCJobOperation &job)
 			buffer_purge_data(job);
 			return DTC_CODE_BUFFER_SUCCESS;
 		}
+		if (job.requestInfo.limit_count() > 0) {
+			log4cplus_debug("trigger purge from update.");
+			buffer_purge_data(job);
+			return DTC_CODE_BUFFER_SUCCESS;
+		}
 		return buffer_sync_update(job);
 	case DRequest::Delete:
 		if (job.flag_black_hole())

+ 19 - 6
src/libs/common/my/my_request.cc

@@ -96,8 +96,8 @@ bool MyRequest::load_sql()
 	}
 
 	log4cplus_debug("sql: %s", m_sql.c_str());
-	hsql::SQLParser::parse(m_sql, &m_result);
-	if (m_result.isValid()) {
+	bool r = hsql::SQLParser::parse(m_sql, &m_result);
+	if (r && m_result.isValid()) {
 		log4cplus_debug("load_sql success.");
 		return true;
 	} else {
@@ -270,11 +270,24 @@ uint32_t MyRequest::get_limit_start()
 uint32_t MyRequest::get_limit_count()
 {
 	int t = m_result.getStatement(0)->type();
-	if (t != hsql::StatementType::kStmtSelect) {
-		return 0;
+	LimitDescription* limit;
+	if (t == hsql::StatementType::kStmtSelect) {
+		hsql::SelectStatement *stmt = get_result()->getStatement(0);
+		limit = stmt->limit;
 	}
-	hsql::SelectStatement *stmt = get_result()->getStatement(0);
-	LimitDescription* limit = stmt->limit;
+	else if(t == hsql::StatementType::kStmtUpdate)
+	{
+		hsql::UpdateStatement *stmt = get_result()->getStatement(0);
+		limit = stmt->limit;
+	}
+	else if(t == hsql::StatementType::kStmtDelete)
+	{
+		hsql::DeleteStatement *stmt = get_result()->getStatement(0);
+		limit = stmt->limit;
+	}
+	else
+		return 0;
+	
 	if(limit)
 	{
 		if(limit->limit)

+ 2 - 2
src/rule/re_load.cc

@@ -58,8 +58,8 @@ int do_parse_rule(std::string rules)
     sql += rules;
     sql += ";";
     log4cplus_debug("rule sql: %s", sql.c_str());
-    hsql::SQLParser::parse(sql, &rule_ast);
-    if (rule_ast.isValid() && rule_ast.size() > 0)
+    bool r = hsql::SQLParser::parse(sql, &rule_ast);
+    if (r && rule_ast.isValid() && rule_ast.size() > 0)
     {
         return 0; 
     }

+ 6 - 1
src/rule/re_match.cc

@@ -21,7 +21,12 @@ int do_check_sql()
 int re_parse_sql(std::string sql, hsql::SQLParserResult* sql_ast)
 {
     log4cplus_debug("input sql parse start..");
-    hsql::SQLParser::parse(sql, sql_ast);
+    bool r = hsql::SQLParser::parse(sql, sql_ast);
+    if(r == false)
+    {
+        log4cplus_debug("parse failed, sql: %s", sql.c_str());
+        return -1; 
+    }
     if (!sql_ast->isValid() || sql_ast->size() <= 0)
     {
         log4cplus_debug("valid:%d, size:%d, %s", sql_ast->isValid(), sql_ast->size(), sql.c_str());