Преглед изворни кода

Layer show dbname requirement change (#75)

* async-connector: display layered db/table enhance.

* rule: fix bug.

* rule: fix bug.

* core: fix bug.

* tests: open s2.

* docker: fix bug.

* docker: update

* tests: add agent case.

* tests: close case.

* agent: move get key from accept to init conn.

* agent: multi processes developing.

* conf-gen-utils: fix bugs.

* watchdog: get conf file from dtc.

* async-connector: add custom conf path param.

* rule: add custom conf file path.

* core: change conf dir to ../conf

* docker: fix bug.

* dockerfile: update

* connector: fix bug.

* stat: relative path.

* data: relative path.

* agent-watchdog: rename from dtc to agent-watchdog.

* workflows: update.

* core: conf  relative.

* watchdog: conf file named by Mid

* agent: choose sid from sql.

* dtc: multi instances.

* agent: fix bugs.
Yang Shuang пре 1 година
родитељ
комит
0392b0f342
51 измењених фајлова са 914 додато и 431 уклоњено
  1. 20 16
      .github/workflows/build-and-test.yml
  2. 3 2
      conf/dtc.yaml
  3. 18 19
      dockerfiles/devel/dockerfile
  4. 1 0
      dockerfiles/devel/dtc.cacheonly.yaml
  5. 52 0
      dockerfiles/devel/dtc.dbaddition.ext.yaml
  6. 2 1
      dockerfiles/devel/dtc.dbaddition.s1.yaml
  7. 2 1
      dockerfiles/devel/dtc.dbaddition.s2.yaml
  8. 2 1
      dockerfiles/devel/dtc.dbaddition.s3.yaml
  9. 1 1
      dockerfiles/devel/dtc.layered.yaml
  10. 2 2
      dockerfiles/devel/run.sh
  11. 7 5
      src/agent-watchdog/CMakeLists.txt
  12. 18 4
      src/agent-watchdog/async_conn_entry.cc
  13. 3 1
      src/agent-watchdog/daemons.cc
  14. 204 0
      src/agent-watchdog/main.cc
  15. 2 2
      src/agent-watchdog/sharding_entry.cc
  16. 1 1
      src/agent/CMakeLists.txt
  17. 14 0
      src/agent/common/da_string.c
  18. 4 0
      src/agent/da.c
  19. 1 0
      src/agent/da_conn.c
  20. 0 2
      src/agent/da_listener.c
  21. 27 1
      src/agent/da_msg.c
  22. 2 0
      src/agent/da_msg.h
  23. 33 31
      src/agent/da_request.c
  24. 0 2
      src/agent/da_request.h
  25. 0 57
      src/agent/da_response.c
  26. 2 1
      src/agent/da_server.c
  27. 0 1
      src/agent/my/my_comm.h
  28. 136 32
      src/agent/my/my_parse.c
  29. 1 1
      src/agent/my/my_parse.h
  30. 2 2
      src/agent/my/my_protocol_classic.c
  31. 2 4
      src/complex/cm_load.cc
  32. 1 1
      src/complex/cm_load.h
  33. 14 2
      src/complex/main.cc
  34. 2 169
      src/complex/transaction_task.cc
  35. 1 1
      src/libs/common/config/dbconfig.cc
  36. 4 4
      src/libs/common/daemon/daemon.cc
  37. 2 2
      src/libs/common/dtc_global.h
  38. 1 1
      src/libs/common/key/key_route_ask_chain.cc
  39. 1 1
      src/libs/common/key/key_route_ask_chain.h
  40. 1 1
      src/libs/common/my/my_request.cc
  41. 3 9
      src/libs/common/packet/packet_server.cc
  42. 1 1
      src/libs/common/protocol.h
  43. 4 1
      src/libs/common/task/task_base.cc
  44. 1 1
      src/libs/stat/stat_dtc.h
  45. 38 0
      src/rule/re_cache.cc
  46. 2 1
      src/rule/re_cache.h
  47. 3 3
      src/rule/re_load.cc
  48. 96 12
      src/rule/rule.cc
  49. 4 1
      src/rule/rule.h
  50. 57 30
      src/utils/conf-gen-utils.cc
  51. 116 0
      tests/init.ext.sql

+ 20 - 16
.github/workflows/build-and-test.yml

@@ -44,7 +44,7 @@ jobs:
           cd ${{github.workspace}}
           cp build/src/core/core dockerfiles/devel/
           cp build/src/agent/dtcagent dockerfiles/devel/
-          cp build/src/agent-watchdog/dtc dockerfiles/devel/
+          cp build/src/agent-watchdog/agent-watchdog dockerfiles/devel/
           cp build/src/complex/async-connector dockerfiles/devel/
           cp build/src/connector/connector dockerfiles/devel/
           cp build/src/data_lifecycle/data-lifecycle-manager dockerfiles/devel/
@@ -92,10 +92,10 @@ jobs:
         ports:
           - 20015:20015
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name dtc
         env:
-          DTC_BIN: dtc
+          DTC_BIN: agent-watchdog
           DTC_ARGV: -c
     steps:
       - uses: actions/checkout@v3
@@ -136,20 +136,21 @@ jobs:
         ports:
           - 20015:20015
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name dtc
         env:
-          DTC_BIN: dtc
+          DTC_BIN: agent-watchdog
           DTC_ARGV: -c        
       agent:
         image: docker.io/kfysck/devel:all
         ports:
           - 12001:12001
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name agent
         env:
-          DTC_BIN: dtcagent
+          DTC_BIN: agent-watchdog
+          DTC_ARGV: -a
     steps:
       - uses: actions/checkout@v3
       
@@ -185,6 +186,7 @@ jobs:
 
 # Testing Layered Storage.
   test-agent-layered:
+    if: false
     needs: LAYERED
     runs-on: ubuntu-latest
     services:
@@ -199,10 +201,10 @@ jobs:
         ports:
           - 12001:12001
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name agent
         env:
-          DTC_BIN: dtc
+          DTC_BIN: agent-watchdog
           DTC_ARGV: -ayc
     steps:
       - uses: actions/checkout@v3
@@ -268,10 +270,10 @@ jobs:
         ports:
           - 20015:20015
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name dtc
         env:
-          DTC_BIN: dtc
+          DTC_BIN: agent-watchdog
           DTC_ARGV: -c
     steps:
       - uses: actions/checkout@v3
@@ -326,7 +328,7 @@ jobs:
         ports:
           - 12001:12001
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name agent
         env:
           DTC_BIN: dtcagent       
@@ -335,10 +337,10 @@ jobs:
         ports:
           - 20015:20015
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name dtc
         env:
-          DTC_BIN: dtc
+          DTC_BIN: agent-watchdog
           DTC_ARGV: -c
     steps:
       - uses: actions/checkout@v3
@@ -380,7 +382,6 @@ jobs:
 
 # Testing Cache with Datasource Scene 2 (Single DB and Multi Table)
   test-dtc-ds-S-db-M-table:
-    if: false
     needs: CORE
     runs-on: ubuntu-latest
     services:
@@ -395,8 +396,11 @@ jobs:
         ports:
           - 20015:20015
         volumes:
-          - /usr/local/etc:/etc/dtc/
+          - /usr/local/etc:/usr/local/dtc/conf/
         options: --name dtc
+        env:
+          DTC_BIN: agent-watchdog
+          DTC_ARGV: -c
     steps:
       - uses: actions/checkout@v3
       

+ 3 - 2
conf/dtc.yaml

@@ -22,6 +22,7 @@ connection: &connection
   pwd: password 
 
 primary:
+  db: &db mydb
   table: &table name
   layered.rule: '(a>1 and a<3) or uid = 0'
   cache:
@@ -33,7 +34,7 @@ primary:
       - {name: age, type: signed, size: 4}
   hot:
     logic:
-      {db: &db layer2, table: *table, connection: *connection}
+      {db: mydb_layer2, table: *table, connection: *connection}
     real:
       - {addr: mysql-01.local:3306, user: username, pwd: password, db: {prefix: &prefix [*db, _], start: 0, last: 10}}
       - {addr: mysql-02.local:3306, user: username, pwd: password, db: {prefix: *prefix, start: 11, last: 25}}
@@ -43,7 +44,7 @@ primary:
       table: {prefix: [*table, _], start: 0, last: 9}
   full:
     logic:
-      {db: layer3, table: *table, connection: *connection}
+      {db: mydb_layer3, table: *table, connection: *connection}
     real:
       - {addr: 127.0.0.1:3306, user: username, pwd: password, db: layer3}
 

+ 18 - 19
dockerfiles/devel/dockerfile

@@ -1,26 +1,28 @@
 FROM ubuntu
 
 ARG basepath=/usr/local/dtc
-ARG confpath=/etc/dtc
+ARG binpath=$basepath/bin
+ARG confpath=$basepath/conf
 ARG logpath=/var/log/dtc
 
 RUN mkdir -p $basepath
 RUN mkdir -p $basepath/data
 RUN mkdir -p $basepath/stat
 RUN mkdir -p $basepath/log
+RUN mkdir -p $binpath
 RUN mkdir -p $confpath
 RUN mkdir -p $logpath
 
-COPY core $basepath/core
-COPY dtcagent $basepath/dtcagent
-COPY dtc $basepath/dtc
-COPY async-connector $basepath/async-connector
-COPY connector $basepath/connector
-COPY data-lifecycle-manager $basepath/data-lifecycle-manager
-COPY hwcserver $basepath/hwcserver
-COPY librule.so $basepath/librule.so
-COPY libsqlparser.so $basepath/libsqlparser.so
-COPY conf-gen-utils $basepath/conf-gen-utils
+COPY core $binpath/core
+COPY dtcagent $binpath/dtcagent
+COPY agent-watchdog $binpath/agent-watchdog
+COPY async-connector $binpath/async-connector
+COPY connector $binpath/connector
+COPY data-lifecycle-manager $binpath/data-lifecycle-manager
+COPY hwcserver $binpath/hwcserver
+COPY librule.so $binpath/librule.so
+COPY libsqlparser.so $binpath/libsqlparser.so
+COPY conf-gen-utils $binpath/conf-gen-utils
 
 COPY librule.so /usr/local/lib/librule.so
 COPY libsqlparser.so /usr/local/lib/libsqlparser.so
@@ -35,16 +37,13 @@ COPY mysql-connector-java-5.1.49.jar $basepath/sharding/lib/mysql-connector-java
 
 COPY run.sh $basepath/run.sh
 
-RUN chmod +x $basepath/core
-RUN chmod +x $basepath/dtcagent
-RUN chmod +x $basepath/connector
+RUN chmod +x $binpath/core
+RUN chmod +x $binpath/dtcagent
+RUN chmod +x $binpath/connector
 RUN chmod +x $basepath/run.sh
-RUN chmod +x $basepath/conf-gen-utils
-RUN chmod +x $basepath/async-connector
+RUN chmod +x $binpath/conf-gen-utils
+RUN chmod +x $binpath/async-connector
 
 ENV LD_LIBRARY_PATH=:/usr/local/lib
 
 CMD ["/usr/local/dtc/run.sh"]
-
-#COPY dtc.yaml $confpath/dtc.yaml
-#COPY log4cplus.conf $confpath/log4cplus.conf

+ 1 - 0
dockerfiles/devel/dtc.cacheonly.yaml

@@ -8,6 +8,7 @@ props:
   shm.mem.size: 100 #MB
 
 primary:
+  db: dtc
   table: &table opensource
   cache:
     field:

+ 52 - 0
dockerfiles/devel/dtc.dbaddition.ext.yaml

@@ -0,0 +1,52 @@
+# 
+# DTC configure file. v2
+# Cache with Datasource Test cases.
+# Scene 2: Single DB Sharding Table
+# table name: opensource_0 ... opensource_9
+# total: 10 tables.
+#
+props:
+  log.level: debug
+  listener.port.dtc: 20015
+  shm.mem.size: 100 #MB
+
+connection: &connection
+  addr: 127.0.0.1:3307
+  user: username
+  pwd: password 
+
+primary:
+  db: &db dtc
+  table: &table opensource
+  cache:
+    field:
+      - {name: &key uid, type: signed, size: 4}
+      - {name: name, type: string, size: 50}
+      - {name: city, type: string, size: 50}
+      - {name: sex, type: signed, size: 4}
+      - {name: age, type: signed, size: 4}
+  hot:
+    logic:
+      {db: *db, table: *table, connection: *connection}
+    real:
+      - {addr: mysql:3306, user: root, pwd: 123456, db: layer2}
+    sharding:
+      key: *key
+      table: {prefix: [*table, _], start: 0, last: 9}
+
+extension:
+  - logic:
+      {db: ext1, table: monday, connection: *connection}
+    real:
+      - {addr: mysql:3306, user: root, pwd: 123456, db: january}
+  - logic:
+      {db: ext2, table: tuesday, connection: *connection}
+    real:
+      - {addr: mysql:3306, user: root, pwd: 123456, db: weekdays}
+  - logic:
+      {db: ext2, table: &sunday sunday, connection: *connection}
+    real:
+      - {addr: mysql:3306, user: root, pwd: 123456, db: weekdays}
+    sharding:
+      key: day
+      table: {prefix: [*sunday, _], start: 0, last: 2}      

+ 2 - 1
dockerfiles/devel/dtc.dbaddition.s1.yaml

@@ -14,6 +14,7 @@ connection: &connection
   pwd: password 
 
 primary:
+  db: &db dtc
   table: &table opensource
   cache:
     field:
@@ -24,6 +25,6 @@ primary:
       - {name: age, type: signed, size: 4}
   hot:
     logic:
-      {db: &db layer2, table: *table, connection: *connection}
+      {db: *db, table: *table, connection: *connection}
     real:
       - {addr: mysql:3306, user: root, pwd: 123456, db: layer2}

+ 2 - 1
dockerfiles/devel/dtc.dbaddition.s2.yaml

@@ -16,6 +16,7 @@ connection: &connection
   pwd: password 
 
 primary:
+  db: &db dtc
   table: &table opensource
   cache:
     field:
@@ -26,7 +27,7 @@ primary:
       - {name: age, type: signed, size: 4}
   hot:
     logic:
-      {db: &db layer2, table: *table, connection: *connection}
+      {db: *db, table: *table, connection: *connection}
     real:
       - {addr: mysql:3306, user: root, pwd: 123456, db: layer2}
     sharding:

+ 2 - 1
dockerfiles/devel/dtc.dbaddition.s3.yaml

@@ -14,6 +14,7 @@ connection: &connection
   pwd: password 
 
 primary:
+  db: &db dtc
   table: &table opensource
   cache:
     field:
@@ -24,7 +25,7 @@ primary:
       - {name: age, type: signed, size: 4}
   hot:
     logic:
-      {db: &db layer2, table: *table, connection: *connection}
+      {db: *db, table: *table, connection: *connection}
     real:
       - {addr: mysql-01.local:3306, user: username, pwd: password, db: single}
     sharding:

+ 1 - 1
dockerfiles/devel/dtc.layered.yaml

@@ -24,7 +24,7 @@ primary:
       - {name: age, type: signed, size: 4}
   hot:
     logic:
-      {db: &db layer2, table: *table, connection: *connection}
+      {db: layer2, table: *table, connection: *connection}
     real:
       - {addr: mysql:3306, user: root, pwd: 123456, db: layer2}
   full:

+ 2 - 2
dockerfiles/devel/run.sh

@@ -4,9 +4,9 @@ sleep_count=0
 
 while [ $sleep_count -le 100 ]
 do
-    if [ -f "/etc/dtc/dtc.yaml" ]; then 
+    if [ -f "/usr/local/dtc/conf/dtc.yaml" ]; then 
         echo "Start running process: "$DTC_BIN","$DTC_ARGV
-        cd /usr/local/dtc/
+        cd /usr/local/dtc/bin/
         ./$DTC_BIN $DTC_ARGV
         break
     else

+ 7 - 5
src/agent-watchdog/CMakeLists.txt

@@ -4,13 +4,15 @@ INCLUDE_DIRECTORIES(
     ${PROJECT_SOURCE_DIR}/src/libs/stat
     ${PROJECT_SOURCE_DIR}/src/core
     ${PROJECT_SOURCE_DIR}/src/libs/log4cplus/include
-    ${PROJECT_SOURCE_DIR}/src/libs/yaml-cpp/include)
+    ${PROJECT_SOURCE_DIR}/src/libs/yaml-cpp/include
+    ${PROJECT_SOURCE_DIR}/src/libs/mxml/include)
     
 LINK_DIRECTORIES(
     ${PROJECT_SOURCE_DIR}/src/libs/log4cplus/libs
     ${PROJECT_SOURCE_DIR}/src/libs/yaml-cpp/libs
     ${PROJECT_SOURCE_DIR}/build/src/libs/common
-    ${PROJECT_SOURCE_DIR}/build/src/libs/stat)
+    ${PROJECT_SOURCE_DIR}/build/src/libs/stat
+    ${PROJECT_SOURCE_DIR}/src/libs/mxml/libs)
 
 include(../utils.cmake)
 
@@ -27,7 +29,7 @@ LINK_LIBRARIES(dl)
 ADD_DEFINITIONS("-g -fPIC -fpermissive -std=gnu++11")
 ADD_DEFINITIONS(-Wno-builtin-macro-redefined)
 
-ADD_EXECUTABLE(dtc ${SRC_LIST1})
+ADD_EXECUTABLE(agent-watchdog ${SRC_LIST1})
 
-TARGET_LINK_LIBRARIES(dtc libstat.a libcommon.a libyaml-cpp.a liblog4cplus.a)
-redefine_file_macro(dtc)
+TARGET_LINK_LIBRARIES(agent-watchdog libstat.a libcommon.a libyaml-cpp.a liblog4cplus.a mxml)
+redefine_file_macro(agent-watchdog)

+ 18 - 4
src/agent-watchdog/async_conn_entry.cc

@@ -16,8 +16,11 @@
 */
 #include "async_conn_entry.h"
 #include <unistd.h>
+#include "log.h"
 
+#define ROOT_PATH "/etc/dtc/"
 const char *fulldata_name = "async-connector";
+extern std::map<std::string, std::string> map_dtc_conf; //key:value --> dtc addr:conf file name
 
 AsyncConnEntry::AsyncConnEntry(WatchDog *watchdog, int sec)
 	: WatchDogDaemon(watchdog, sec)
@@ -31,9 +34,20 @@ AsyncConnEntry::~AsyncConnEntry(void)
 
 void AsyncConnEntry::exec()
 {
-	char *argv[2];
+	std::map<std::string, std::string>::iterator it;
+	for(it = map_dtc_conf.begin(); it != map_dtc_conf.end(); it++)
+	{
+		std::string addr = (*it).first;
+		std::string filename = (*it).second;
 
-	argv[1] = NULL;
-	argv[0] = (char *)fulldata_name;
-	execv(argv[0], argv);
+		std::string filepath = string(ROOT_PATH) + filename;
+		log4cplus_debug("filepath:%s", filepath.c_str());
+
+		char *argv[3];
+
+		argv[2] = NULL;
+		argv[0] = (char *)fulldata_name;
+		argv[1] = (char *)filepath.c_str();
+		execv(argv[0], argv);
+	}
 }

+ 3 - 1
src/agent-watchdog/daemons.cc

@@ -86,7 +86,9 @@ void close_sharding()
 {
 	char* p[1];
 	p[0] = NULL;
-	execv("./../../sharding/bin/stop.sh", p);
+	execv("/usr/local/agent/sharding/bin/stop.sh", p);
+
+	system("/usr/local/agent/sharding/bin/stop.sh");
 }
 
 void WatchDog::run_loop()

+ 204 - 0
src/agent-watchdog/main.cc

@@ -10,9 +10,14 @@
 #include "core_entry.h"
 #include "agent_entry.h"
 #include "proc_title.h"
+#include "mxml.h"
+#include "../agent/my/my_comm.h"
 
 extern char cache_file[256];
 extern char table_file[256];
+#define ROOT_PATH "/etc/dtc/"
+char agent_file[256] = "/etc/dtc/agent.xml";
+std::map<std::string, std::string> map_dtc_conf; //key:value --> dtc addr:conf file name
 
 #define DA_VERSION_MAJOR	1
 #define DA_VERSION_MINOR	0
@@ -224,6 +229,196 @@ int init_watchdog()
 	return 0;
 }
 
+bool ParseAgentConf(std::string path){
+    FILE *fp = fopen(path.c_str(), "r");
+    if (fp == NULL) {
+        log4cplus_error("conf: failed to open configuration '%s': %s", path.c_str(), strerror(errno));
+        return false;
+    }
+    mxml_node_t* tree = mxmlLoadFile(NULL, fp, MXML_TEXT_CALLBACK);
+    if (tree == NULL) {
+        log4cplus_error("mxmlLoadFile error, file: %s", path.c_str());
+        return false;
+    }
+    fclose(fp);
+
+	mxml_node_t *poolnode, *servernode, *instancenode, *lognode;
+
+	for (poolnode = mxmlFindElement(tree, tree, "MODULE",
+	NULL, NULL, MXML_DESCEND); poolnode != NULL;
+			poolnode = mxmlFindElement(poolnode, tree, "MODULE",
+			NULL, NULL, MXML_DESCEND)) 
+	{
+		
+		for (servernode = mxmlFindElement(poolnode, poolnode, "CACHESHARDING",
+		NULL, NULL, MXML_DESCEND); servernode != NULL; servernode =
+				mxmlFindElement(servernode, poolnode, "CACHESHARDING",
+				NULL, NULL, MXML_DESCEND)) 
+		{
+			char *nodev = (char *)mxmlElementGetAttr(servernode, "ShardingName");
+			if (nodev == NULL) {
+				return false;
+			}
+			if(strcmp(nodev, "complex") == 0)
+				continue;
+
+			for (instancenode = mxmlFindElement(servernode, servernode, "INSTANCE",
+				NULL, NULL, MXML_DESCEND); instancenode != NULL; instancenode =
+				mxmlFindElement(instancenode, servernode, "INSTANCE",
+					NULL, NULL, MXML_DESCEND)) 
+				{
+			
+					char *argment = (char *)mxmlElementGetAttr(instancenode, "Enable");
+					if (argment == NULL) {
+						return false;
+					}
+					if(strcmp(argment, "true") != 0)
+						continue;
+
+					argment = (char *)mxmlElementGetAttr(instancenode, "Addr");
+					if (argment == NULL) {
+						return false;
+					}
+
+					std::string listen_on = argment;
+					log4cplus_debug("addr:%s %s", argment, (char *)mxmlElementGetAttr(poolnode, "Mid"));
+					std::string::size_type pos = listen_on.find_last_of(":");
+					if(pos == std::string::npos){
+						log4cplus_error("string find error, file: %s", path.c_str());
+						return false;
+					}
+					std::string addr = listen_on.substr(0, pos);
+					char filename[250] = {0};
+					sprintf(filename, "dtc-conf-%d.yaml", atoi((char *)mxmlElementGetAttr(poolnode, "Mid")));
+					map_dtc_conf[addr] = filename;
+
+				}
+		}
+	}
+
+    mxmlDelete(tree);
+    
+    return true;
+}
+
+std::string send_select_dtcyaml(const char* serverIp, int port)
+{
+	log4cplus_debug("server ip:%s, port:%d", serverIp, port);
+    sockaddr_in sendSockAddr;   
+    bzero((char*)&sendSockAddr, sizeof(sendSockAddr)); 
+    sendSockAddr.sin_family = AF_INET; 
+    sendSockAddr.sin_addr.s_addr = inet_addr(serverIp);
+    sendSockAddr.sin_port = htons(port);
+    int clientSd = socket(AF_INET, SOCK_STREAM, 0);
+    int status = connect(clientSd,
+                         (sockaddr*) &sendSockAddr, sizeof(sendSockAddr));
+    if(status < 0)
+    {
+        log4cplus_error("Error connecting to socket!");
+		return "";
+    }
+    log4cplus_debug("Connected to the server!");
+    int bytesRead = 0, bytesWritten = 0;
+	char greeting[500] = {0};
+	bytesRead = recv(clientSd, (char*)&greeting, sizeof(greeting), 0);
+	log4cplus_debug("greeting msg len:%d", bytesRead);
+	char send_msg[1024] = {0};
+	int len = 0;
+	struct DTC_HEADER_V2 send_header = {0};
+	int cmd_len = strlen("select dtcyaml");
+	send_header.version = 2;
+	send_header.packet_len = MYSQL_HEADER_SIZE + 3 + cmd_len + sizeof(send_header);
+	memcpy(send_msg, &send_header, sizeof(struct DTC_HEADER_V2));
+	len += sizeof(struct DTC_HEADER_V2);
+	uint8_t send_mysql_header[MYSQL_HEADER_SIZE] = {0};
+	int_conv_3(send_mysql_header, (uint)cmd_len+3);
+	send_mysql_header[3] = 0; //pkt_nr;
+	memcpy(send_msg + sizeof(struct DTC_HEADER_V2), send_mysql_header, MYSQL_HEADER_SIZE);
+	len += MYSQL_HEADER_SIZE;
+	send_msg[sizeof(struct DTC_HEADER_V2) + MYSQL_HEADER_SIZE] = 0x03; //query
+	send_msg[sizeof(struct DTC_HEADER_V2) + MYSQL_HEADER_SIZE + 1] = 0x0;
+	send_msg[sizeof(struct DTC_HEADER_V2) + MYSQL_HEADER_SIZE + 2] = 0x01;
+	len += 3;
+	memcpy(send_msg + sizeof(struct DTC_HEADER_V2) + MYSQL_HEADER_SIZE + 3, "select dtcyaml", cmd_len);
+	len += cmd_len;
+	bytesWritten = send(clientSd, send_msg, len, 0);
+	log4cplus_debug("Awaiting server response..., sent len:%d", len);
+	struct DTC_HEADER_V2 header = {0};
+	bytesRead = recv(clientSd, (char*)&header, sizeof(header), 0);
+	log4cplus_debug("bytesRead: %d, packet len: %d, dbname len:%d ver: %d admin: %d, layer: %d, id: %d", 
+			bytesRead, header.packet_len, header.dbname_len, header.version, header.admin, header.layer, header.id);
+	std::string conf_str = "";
+	if(header.packet_len > 0 && header.packet_len <= 65535)
+	{
+		int ilen = header.packet_len - sizeof(header) + 1;
+		char* msg = new char[ilen];
+		memset(msg, 0, ilen);
+		log4cplus_debug("ilen: %d", ilen);
+		bytesRead = recv(clientSd, (char*)msg, ilen - 1, 0);
+		log4cplus_debug("bytesRead: %d", bytesRead);
+		conf_str = msg;
+		delete msg;
+	}
+	else
+	{
+		log4cplus_error("header.packet_len: %d error", header.packet_len);
+	}
+
+    close(clientSd);
+    log4cplus_debug("Bytes written: %d, Bytes read: %d, Connection closed", bytesWritten, bytesRead);
+    return conf_str;
+}
+
+int get_all_dtc_confs()
+{
+	//load xml
+	if(false == ParseAgentConf(agent_file)){
+        log4cplus_error("DataConf ParseConf error.");
+        return -1;
+    }
+
+	//get each dtc instance conn
+	int success_num = 0;
+	std::map<std::string, std::string>::iterator it;
+	for(it = map_dtc_conf.begin(); it != map_dtc_conf.end(); it++)
+	{
+		std::string addr = (*it).first;
+		std::string filename = (*it).second;
+
+		//TODO: send select dtcyaml
+		char* content = NULL;
+		int content_len = 0;
+		log4cplus_debug("addr:%s", addr.c_str());
+		std::string str = send_select_dtcyaml((addr.substr(0, addr.find(':'))).c_str(), atoi((addr.substr(addr.find(':')+1)).c_str()));
+		content = str.c_str();
+		content_len = str.length();
+		log4cplus_debug("content:%s", content);
+
+		//save conf file and rename.
+		if(content != NULL && content_len > 0)
+		{
+			std::string filepath = string(ROOT_PATH) + filename;
+			log4cplus_debug("filepath:%s", filepath.c_str());
+			FILE *fp = fopen(filepath.c_str(), "w");
+			if (fp == NULL)
+			{
+				log4cplus_error("open file %s error", filepath.c_str());
+				continue;
+			}
+
+			fprintf(fp, "%s", content);
+
+			fclose(fp);
+			success_num++;
+		}
+	}
+
+	if(success_num > 0)
+		return 0;
+	else
+		return -2;
+}
+
 
 int main(int argc, char* argv[])
 {
@@ -249,6 +444,15 @@ int main(int argc, char* argv[])
 		exit(0);
 	}
 
+	if(load_agent || load_sharding || load_asyncconn)
+	{
+		if(get_all_dtc_confs() < 0)
+		{
+			log4cplus_error("get dtc conf files failed, process exit right now.");
+			exit(0);
+		}
+	}
+
 	if (load_sharding || load_all) {
 		if(start_sharding(wdog, delay) < 0)
 			log4cplus_error("start sharding failed.");

+ 2 - 2
src/agent-watchdog/sharding_entry.cc

@@ -34,11 +34,11 @@ void ShardingEntry::exec()
 {
 	char *argv[4];
 
-	int ret = system("/usr/local/dtc/conf-gen-utils");
+	int ret = system("/usr/local/agent/bin/conf-gen-utils");
 	if(ret == 0)
 	{
 		set_proc_title("agent_sharding");
-		argv[0] = (char *)"/usr/local/dtc/sharding/bin/start.sh";
+		argv[0] = (char *)"/usr/local/agent/sharding/bin/start.sh";
 		argv[1] = "3307";
 		argv[2] = "/etc/dtc";
 		argv[3] = NULL;

+ 1 - 1
src/agent/CMakeLists.txt

@@ -39,7 +39,7 @@ LINK_LIBRARIES(pthread)
 LINK_LIBRARIES(librule.so)
 
 ADD_DEFINITIONS ("-D_GNU_SOURCE")
-ADD_DEFINITIONS(-Wno-builtin-macro-redefined)
+ADD_DEFINITIONS(-Wno-builtin-macro-redefined -g)
 
 ADD_EXECUTABLE(dtcagent ${SRC_LIST})
 

+ 14 - 0
src/agent/common/da_string.c

@@ -123,6 +123,20 @@ int string_compare(const struct string *s1, const struct string *s2)
 	return da_strncmp(s1->data, s2->data, s1->len);
 }
 
+int string_compare_nonsentive(const struct string *s1, const struct string *s2)
+{
+	struct string tmp1, tmp2;
+	if (s1->len != s2->len) {
+		return s1->len > s2->len ? 1 : -1;
+	}
+	
+	string_copy(&tmp1, s1, s1->len);
+	string_copy(&tmp2, s2, s2->len);
+	string_upper(&tmp1);
+	string_upper(&tmp2);
+	return da_strncmp(&tmp1.data, &tmp2.data, &tmp1.len);
+}
+
 static const char *_safe_check_longlong(const char *fmt, int32_t *have_longlong)
 {
 	*have_longlong = false;

+ 4 - 0
src/agent/da.c

@@ -560,6 +560,7 @@ static void da_post_run(struct instance *dai) {
 static void da_run(struct instance *dai) {
 	int status;
 	struct context *ctx;
+	struct conn* c = NULL;
 
 	ctx = core_start(dai);
 	if (ctx == NULL) {
@@ -588,8 +589,11 @@ static void da_run(struct instance *dai) {
 			_set_remote_log_config_((char *)dai->ctx->cf->stCL.remote_log_ip.data, dai->ctx->cf->stCL.remote_log_port, dai->ctx->cf->localip, iPort, iBid, dai->ctx->cf->stCL.remote_log_switch);
 		}
 	}
+	else
+		return ;
 
 	tv_update_date(0, 1);
+
 	/* run rabbit run */
 	while (!da_stop) {
 		status = core_loop(dai->ctx);

+ 1 - 0
src/agent/da_conn.c

@@ -98,6 +98,7 @@ static struct conn *_conn_get() {
 	c->error=0;
 	c->writecached=0;
 	c->isvalid = 0;
+	memset(c->dbname, 0, 250);
 
 	ntotal_conn++;
 	ncurr_conn++;

+ 0 - 2
src/agent/da_listener.c

@@ -383,8 +383,6 @@ static int listener_accept(struct context *ctx, struct conn *l)
 	}
 	c->enqueue_outq(ctx, c, smsg);
 
-	//init dtc key info
-	request_dtc_key_define(ctx, c);
 	return 0;
 }
 

+ 27 - 1
src/agent/da_msg.c

@@ -214,6 +214,7 @@ static struct msg *_msg_get() {
 	m->sending = 0;
 
 	m->pkt_nr = 0;
+	m->mid = 0;
 
 	return m;
 }
@@ -549,8 +550,33 @@ struct mbuf *msg_insert_mem_bulk(struct msg *msg,struct mbuf *mbuf,uint8_t *pos,
 uint32_t msg_backend_idx(struct msg *msg, uint8_t *key, uint32_t keylen) {
 	struct conn *conn = msg->owner;
 	struct server_pool *pool = conn->owner;
+	uint32_t i ;
+	struct server_pool *temp_pool = NULL;
+	struct context *ctx = pool->ctx;
+	log_debug("msg backend idx entry");
+	for(i = 0 ; i < array_n(&(ctx->pool)) ; i ++){
+		struct string tmp1, tmp2;
+		log_debug("AAAAAAAAAAA 111111111");
+		temp_pool = (struct server_pool *)array_get(&(ctx->pool), i);
+		string_copy(&tmp2, temp_pool->name.data, temp_pool->name.len);
+		string_upper(&tmp2);		
+		log_debug("AAAAAAAAAAA 222222222222: %s", tmp2.data);
+		log_debug("AAAAAAAAAAA 333333333333: %s", msg->table_name.data);
+
+		//if(string_compare(&tmp2, &msg->table_name) == 0)
+		if(msg->mid == temp_pool->mid)
+			break;
+		else
+			temp_pool = NULL;			
+	}
 
-	return server_pool_idx(pool, key, keylen);
+	if(temp_pool)
+		return server_pool_idx(temp_pool, key, keylen);
+	else
+	{
+		temp_pool = (struct server_pool *)array_get(&(ctx->pool), 0);
+		return server_pool_idx(temp_pool, NULL, 0);
+	}
 }
 
 static int msg_send_chain(struct context *ctx, struct conn *conn,

+ 2 - 0
src/agent/da_msg.h

@@ -153,6 +153,8 @@ struct msg {
 	enum enum_server_command command; /* mysql request command type */
 	enum enum_agent_admin admin;
 	uint8_t layer;
+	int mid;
+	struct string table_name;
 	union COM_DATA data;
 
 	int err; /* errno on error? */

+ 33 - 31
src/agent/da_request.c

@@ -29,9 +29,6 @@
 #include "da_time.h"
 #include "my/my_comm.h"
 
-extern char g_dtc_key[DTC_KEY_MAX];
-extern int g_dtc_key_type;
-
 void req_put(struct msg *msg)
 {
 	struct msg *pmsg; /* peer message (response) */
@@ -450,16 +447,47 @@ static void req_forward(struct context *ctx, struct conn *c_conn,
 	struct conn *s_conn;
 	struct server_pool *pool;
 	struct cache_instance *ci;
+	uint32_t i;
+	struct server_pool *temp_pool = NULL;
 	log_debug("req_forward entry");
 
 	/*insert msg to client imsgq,waiting for
 	 *the response,first add to backworker's queue
 	 */
 	c_conn->enqueue_inq(ctx, c_conn, msg);
-
 	pool = c_conn->owner;
+
+	for(i = 0 ; i < array_n(&(ctx->pool)) ; i ++){
+		log_debug("AAAAAAAAAA 111111, len: %d, my table name: %s", msg->table_name.len, msg->table_name.data);
+		temp_pool = (struct server_pool *)array_get(&(ctx->pool), i);
+		log_debug("AAAAAAAAAA 22222222 :MSG->MID: %d, TEMP_POOL->MID: %d", msg->mid, temp_pool->mid);
+		if(msg->mid == 0)
+			break;
+		if(msg->mid == temp_pool->mid)
+			break;
+		else
+			temp_pool = NULL;
+	}
+	if(temp_pool == NULL){
+		log_debug("s_conn null");
+		//client connection is still exist,no swallow
+		msg->done = 1;
+		msg->error = 1;
+		msg->err = MSG_REQ_FORWARD_ERR;
+		if (msg->frag_owner != NULL) {
+			msg->frag_owner->nfrag_done++;
+		}
+		if (req_done(c_conn, msg)) {
+			rsp_forward(ctx, c_conn, msg);
+		}
+		stats_pool_incr(ctx, pool, forward_error);
+		log_error("msg :%" PRIu64 " from c %d ,get s_conn fail!",
+			  msg->id, c_conn->fd);
+		return;
+	}
+
 	s_conn =
-		server_pool_conn(ctx, (struct server_pool *)c_conn->owner, msg);
+		server_pool_conn(ctx, temp_pool, msg);
 	if (s_conn == NULL) {
 		log_debug("s_conn null");
 		//client connection is still exist,no swallow
@@ -758,32 +786,6 @@ ferror:
 	return true;
 }
 
-void request_dtc_key_define(struct context *ctx, struct conn *c)
-{
-	struct msg *msg = NULL;
-	int ret = 0;
-	if (g_dtc_key_type != -1)
-		return;
-
-	msg = net_send_desc_dtctable(c);
-	if (msg == NULL) {
-		log_error("error code:%d", ret);
-		return;
-	}
-	ret = dtc_header_add(msg, CMD_KEY_DEFINE, NULL);
-	if (ret) {
-		log_error("error code:%d %p", ret, msg);
-		return;
-	}
-
-	uint64_t rkey = 1;
-	msg->idx = msg_backend_idx(msg, (uint8_t *)&rkey, sizeof(uint64_t));
-	log_debug(
-		"request process will forward to dtc. msg len: %d, msg id: %d, ret:%d, idx:%d",
-		msg->mlen, msg->id, ret, msg->idx);
-	req_forward(ctx, c, msg);
-}
-
 void error_reply(struct msg *msg, struct conn *conn, struct context *ctx, int errcode)
 {
 	if (net_send_error(msg, conn) < 0)

+ 0 - 2
src/agent/da_request.h

@@ -52,6 +52,4 @@ static void req_forward(struct context *ctx, struct conn *c_conn,
 
 void error_reply(struct msg *msg, struct conn *conn, struct context *ctx, int errcode);
 
-void request_dtc_key_define(struct context *ctx, struct conn *c);
-
 #endif /* DA_REQUEST_H_ */

+ 0 - 57
src/agent/da_response.c

@@ -29,9 +29,6 @@
 #include "da_top_percentile.h"
 #include "my/my_comm.h"
 
-extern char g_dtc_key[DTC_KEY_MAX];
-extern int g_dtc_key_type;
-
 void rsp_put(struct msg *msg) {
 	ASSERT(!msg->request);
 	ASSERT(msg->peer == NULL);
@@ -160,51 +157,6 @@ int dtc_header_remove(struct msg* msg)
 	return 0;
 }
 
-int key_define_handle(struct msg* msg)
-{
-	struct mbuf* mbuf = STAILQ_LAST(&msg->buf_q, mbuf, next);
-	int buf_len = 0;
-
-	log_debug("key_define_handle entry.");
-
-	if(!mbuf)
-		return -1;
-
-	buf_len = mbuf->last - mbuf->start;
-	if(buf_len < sizeof(struct DTC_HEADER_V2))
-		return -3;
-
-	uint8_t* pos = mbuf->start + sizeof(struct DTC_HEADER_V2);
-	uint8_t type = *pos;
-	uint8_t key_len;
-	int i = 0;
-	g_dtc_key_type = type;
-	pos++;
-
-	key_len = *pos;
-	pos++;
-	if(key_len > DTC_KEY_MAX || key_len <= 0)
-		return -2;
-	
-	if(mbuf->last - pos < key_len)
-	{
-		log_debug("key len:%d %d", mbuf->last - pos, key_len);
-		return -4;
-	}
-
-	memcpy(g_dtc_key, pos, key_len);
-	g_dtc_key[key_len] = '\0';
-
-	for(i = 0; i < key_len; i++)
-		g_dtc_key[i] = upper(g_dtc_key[i]);
-	
-	log_info("dtc key:%s", g_dtc_key);
-
-	log_debug("key_define_handle leave.");
-
-	return 0;
-}
-
 void rsp_recv_done(struct context *ctx, struct conn *conn, struct msg *msg,
 		struct msg *nmsg) {
 	log_debug("rsp_recv_done entry.");
@@ -298,15 +250,6 @@ void rsp_recv_done(struct context *ctx, struct conn *conn, struct msg *msg,
 				dtc_header_remove(msg);
 				rsp_forward(ctx, c_conn, req);
 				break;
-			case CMD_KEY_DEFINE:	// get key define config from dtc server.
-				ret = key_define_handle(msg);
-				if(ret < 0)
-				{
-					log_error("get dtc key error:%d", ret);
-				}
-				c_conn->dequeue_inq(ctx, c_conn, req);
-				req_put(req);
-				break;
 			default:
 				log_error("msg admin error:%d", msg->admin);
 		}

+ 2 - 1
src/agent/da_server.c

@@ -115,7 +115,7 @@ uint32_t server_pool_idx(struct server_pool *pool, uint8_t *key,
 
 	if(key == NULL && keylen == 0)
 	{
-		log_debug("server_pool_idx 2\n");
+		log_debug("server_pool_idx 2: %d\n", array_n(&pool->server));
 		return array_n(&pool->server) - 1;
 
 		hash = server_pool_hash(pool, key, keylen);
@@ -141,6 +141,7 @@ static struct server *server_pool_server(struct server_pool *pool,
 	uint32_t idx;
 
 	idx = msg->idx;
+	log_debug("server pool idx: %d", idx);
 	server = array_get(&pool->server, idx);
 
 	return server;

+ 0 - 1
src/agent/my/my_comm.h

@@ -16,7 +16,6 @@
 
 #ifndef _MY_COMM_H
 #define _MY_COMM_H
-#include "da_string.h"
 #include <stdint.h>
 #include <stdlib.h>
 #include <string.h>

+ 136 - 32
src/agent/my/my_parse.c

@@ -23,6 +23,7 @@
 #include "../da_buf.h"
 #include "../da_util.h"
 #include "../da_errno.h"
+#include "../da_server.h"
 #include "../da_time.h"
 #include "../da_core.h"
 #include "my_comm.h"
@@ -63,9 +64,6 @@ enum codestate {
 	ST_VALUE = 2,
 };
 
-char g_dtc_key[DTC_KEY_MAX] = { 0 };
-int g_dtc_key_type = -1;
-
 /*
  * parse request msg
  */
@@ -106,8 +104,6 @@ void my_parse_req(struct msg *r)
 		}
 
 		if (r->owner->stage == CONN_STAGE_LOGGED_IN) {
-			r->keytype = g_dtc_key_type;
-
 			rc = my_get_command(p, input_packet_length, r,
 					    &command);
 			if (rc) {
@@ -128,7 +124,7 @@ void my_parse_req(struct msg *r)
 				r->cmd = MSG_REQ_SVRADMIN;
 			}
 		}
-
+log_debug("AAAAAAAAA 444444444444");
 		p += input_packet_length;
 
 		goto success;
@@ -218,7 +214,7 @@ int my_get_command(uint8_t *input_raw_packet, uint32_t input_packet_length,
 		   struct msg *r, enum enum_server_command *cmd)
 {
 	*cmd = (enum enum_server_command)(uchar)input_raw_packet[0];
-
+	log_debug("cmd: %d", *cmd);
 	if (*cmd >= COM_END)
 		*cmd = COM_END; // Wrong command
 
@@ -512,11 +508,84 @@ bool check_cmd_select(struct string *str)
 	return false;
 }
 
+int get_mid_by_dbname(const char* dbname, const char* sql, struct msg* r)
+{
+	int mid = 0;
+	struct context* ctx = NULL;
+	struct conn *c_conn = NULL;
+	int sql_len = 0;
+	c_conn = r->owner;
+	ctx = conn_to_ctx(c_conn);
+	if(dbname && strlen(dbname) > 0)
+	{
+		char* cmp_dbname[250] = {0};
+		sprintf(cmp_dbname, "%s.", dbname);
+		struct array *pool = &(ctx->pool);
+		int i;
+		for (i = 0; i < array_n(pool); i++) {
+			struct server_pool *p = (struct server_pool *)array_get(pool, i);
+			if(string_empty(&p->name))
+				continue;
+			log_info("server pool module name: %s, cmp dbname: %s", p->name.data, cmp_dbname);
+			if(da_strncmp(p->name.data, cmp_dbname, strlen(cmp_dbname)) == 0)
+			{
+				mid = p->mid;
+			}
+		}
+	}
+
+	if(sql)
+	{
+		sql_len = strlen(sql);
+		if(sql_len > 0)
+		{
+			struct array *pool = &(ctx->pool);
+			int i, j;
+			for (i = 0; i < array_n(pool); i++) {
+				struct string cmp_name; 
+				struct server_pool *p = (struct server_pool *)array_get(pool, i);
+				if(string_empty(&p->name))
+					continue;
+				
+				string_copy(&cmp_name, p->name.data, p->name.len);
+				string_upper(&cmp_name);
+				for(j = 0; j < sql_len; j++)
+				{
+					if(sql_len - j > cmp_name.len && da_strncmp(sql + j, cmp_name.data, cmp_name.len) == 0)
+					{
+						mid = p->mid;
+					}
+				}
+				log_info("server pool module name: %s, cmp sql: %s", cmp_name.data, sql);
+			}
+		}
+		
+	}
+
+	log_info("mid result: %d", mid);
+	return mid;
+}
+
+void get_tablename(struct msg* r, uint8_t* sql, int sql_len)
+{
+	char tablename[260] = {0};
+	if(sql == NULL || sql_len <= 0)
+		return ;
+
+	log_debug("AAAAAAAAA 555555555555");
+	int ret = sql_parse_table(sql, &tablename);
+	if(ret > 0)
+	{
+		log_debug("AAAAAAAAA 666666666666");
+		string_copy(&r->table_name, tablename, strlen(tablename));
+	}
+	log_debug("AAAAAAAAA 77777777777 %s", tablename);
+}
+
 int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
-		     int *end_offset, const char* dbname)
+		     int *end_offset, const char* dbname, struct msg* r)
 {
 	int i = 0;
-	int dtc_key_len = da_strlen(g_dtc_key);
 	struct string str;
 	int ret = 0;
 	int layer = 0;
@@ -529,10 +598,47 @@ int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
 	if (!string_upper(&str))
 		return -9;
 
-	log_debug("sql: %s, key: %s", str.data, g_dtc_key);
+	log_debug("sql: %s", str.data);
+	if(dbname && strlen(dbname))
+	{
+		log_debug("dbname len:%d, dbname: %s", strlen(dbname), dbname);
+	}
+
+	int mid = get_mid_by_dbname(dbname, str.data, r);
+	char conf_path[260] = {0};
+	if(mid != 0)
+	{
+		sprintf(conf_path, "/etc/dtc/dtc-conf-%d.yaml", mid);
+		r->mid = mid;
+	}
+
+	get_tablename(r, str.data, str.len);
+	if(r->table_name.len > 0)
+		log_debug("table name: %s", r->table_name.data);
+
+	char* res = NULL;
+	char strkey[260] = {0};
+	memset(strkey, 0, 260);
+	if(strlen(conf_path) > 0)
+	{
+		res = rule_get_key(conf_path);
+		if(res == NULL)
+		{
+			ret = -5;
+			goto done;
+		}
+		else
+		{
+			strcpy(strkey, res);
+			log_debug("strkey: %s", strkey);
+		}
+	}
+
+	r->keytype = rule_get_key_type(conf_path);
+	log_debug("strkey type: %d", r->keytype);
 
 	//agent sql route, rule engine
-	layer = rule_sql_match(str.data, g_dtc_key, dbname);
+	layer = rule_sql_match(str.data, dbname, strlen(conf_path) > 0 ? conf_path : NULL);
 	log_debug("rule layer: %d", layer);
 
 	if(layer != 1)
@@ -555,38 +661,34 @@ int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
 	}
 
 	for (; i < str.len; i++) {
-		if (str.len - i >= dtc_key_len) {
+		if (str.len - i >= strlen(strkey)) {
 			log_debug(
-				"str.len:%d i:%d dtc_key_len:%d str.data + i:%s g_dtc_key:%c",
-				str.len, i, dtc_key_len, str.data + i,
-				g_dtc_key);
-			if (da_strncmp(str.data + i, g_dtc_key, dtc_key_len) ==
-			    0) {
+				"key: %s, key len:%d, str.len:%d i:%d dtc_key_len:%d str.data + i:%s ", strkey, strlen(strkey),
+				str.len, i, strlen(strkey), str.data + i);
+			if (da_strncmp(str.data + i, strkey, strlen(strkey)) == 0) 
+			{
 				int j;
-				for (j = i + dtc_key_len; j < str.len; j++) {
-					if (str.data[j] == '=') {
+				for (j = i + strlen(strkey); j < str.len; j++) 
+				{
+					if (str.data[j] == '=') 
+					{
 						j++;
 						//strip space.
-						while (j < str.len &&
-						       str.data[j] == ' ') {
+						while (j < str.len && str.data[j] == ' ') 
+						{
 							j++;
 						}
 
-						if (j < str.len) {
+						if (j < str.len) 
+						{
 							*start_offset = j;
 
 							int k = 0;
 							for (k = j; k < str.len;
 							     k++) {
-								if (sql[k + 1] ==
-									    ' ' ||
-								    sql[k + 1] ==
-									    ';' ||
-								    k + 1 ==
-									    str.len) {
-									*end_offset =
-										k +
-										1;
+								if (sql[k + 1] == ' ' || sql[k + 1] == ';' || k + 1 == str.len) 
+								{
+									*end_offset = k + 1;
 									ret = layer;
 									goto done;
 								}
@@ -594,7 +696,9 @@ int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
 
 							ret = -4;
 							goto done;
-						} else {
+						} 
+						else 
+						{
 							ret = -5;
 							goto done;
 						}

+ 1 - 1
src/agent/my/my_parse.h

@@ -35,6 +35,6 @@ int my_do_command(struct context *ctx, struct conn *c_conn, struct msg *msg);
 int my_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
 
 int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
-		     int *end_offset, const char* dbname);
+		     int *end_offset, const char* dbname, struct msg* r);
 
 #endif /* _MY_PARSE_H_ */

+ 2 - 2
src/agent/my/my_protocol_classic.c

@@ -18,7 +18,7 @@ bool parse_packet(uchar *input_raw_packet, int input_packet_length,
 	switch (cmd) {
 	case COM_INIT_DB: {
 		uint8_t *p = input_raw_packet;
-		log_debug("len: %d", input_packet_length);
+		log_debug("COM_INIT_DB len: %d", input_packet_length);
 
 		if (*p == 0x0) {
 			log_debug("len: %d", input_packet_length);
@@ -62,7 +62,7 @@ bool parse_packet(uchar *input_raw_packet, int input_packet_length,
 		log_debug("len: %d", input_packet_length);
 
 		int layer = my_get_route_key(p, input_packet_length,
-					   &start_offset, &end_offset, r->owner->dbname);
+					   &start_offset, &end_offset, r->owner->dbname, r);
 		//if(layer <= 0 || layer > 3)
 		//	layer = 3;
 

+ 2 - 4
src/complex/cm_load.cc

@@ -19,8 +19,6 @@
 #include "cm_conn.h"
 #include "log.h"
 
-#define CACHE_CONF_NAME "/etc/dtc/dtc.yaml"
-
 using namespace std;
 
 #define STRCPY(d,s) do{ strncpy(d, s, sizeof(d)-1); d[sizeof(d)-1]=0; }while(0)
@@ -60,11 +58,11 @@ ConfigHelper::~ConfigHelper ()
 {
 }
 
-bool ConfigHelper::load_dtc_config()
+bool ConfigHelper::load_dtc_config(std::string conf_file)
 {
 	try 
 	{
-        dtc = YAML::LoadFile(CACHE_CONF_NAME);
+        dtc = YAML::LoadFile(conf_file);
 		if(dtc.IsNull())
 		{
 			log4cplus_error("dtc null");

+ 1 - 1
src/complex/cm_load.h

@@ -27,7 +27,7 @@ public:
     std::string GetStringValue(const char* key, std::string default_value = "");
 	std::vector<int> GetIntArray(const char* key);
 
-	bool load_dtc_config();
+	bool load_dtc_config(std::string conf_file);
 	
 	bool load_hot_inst_info();
 	bool load_full_inst_info();

+ 14 - 2
src/complex/main.cc

@@ -33,6 +33,8 @@ static PollerBase* workerThread;
 CTransactionGroup* FullDBGroup = NULL;
 CTransactionGroup* HotDBGroup = NULL;
 
+std::string conf_path = "/etc/dtc/dtc.yaml";
+
 static int start_main_thread()
 {
 	workerThread = new PollerBase("async-connector");
@@ -44,7 +46,10 @@ static int start_main_thread()
 	agentListener = new CAgentListenPkg();
 
 	agentProcess->BindDispatcher(netserverProcess);	
-	if(agentListener->Bind("*:2002/tcp", agentProcess) < 0)
+	char portstr[250] = {0};
+	sprintf(portstr, "*:%d/tcp", g_config.get_conf()["props"]["listener.port.async"].as<int>());
+	log4cplus_debug("portstr:%s", portstr);
+	if(agentListener->Bind(portstr, agentProcess) < 0)
 		return -1;
 		
 	workerThread->RunningThread();
@@ -85,7 +90,7 @@ int GetIdxVal(const char *key,
 int init_config(void)
 {
 	//load cold and hot db config.
-	if(g_config.load_dtc_config() == false)
+	if(g_config.load_dtc_config(conf_path) == false)
 	{
 		log4cplus_error("load db config error");
 		return -1;
@@ -168,11 +173,18 @@ int start_db_thread_group(DBHost* dbconfig, std::string level)
 
 int main(int argc, char* argv[])
 {
+	// ./complex [conf file]
 	int ret = 0;
 
 	init_log4cplus();
 	log4cplus_info("async-connector main entry.");
 
+	if(argc == 2)
+	{
+		conf_path = argv[1];
+		log4cplus_info("custom conf path: %s", conf_path.c_str());
+	}
+
     if(init_config() == -1)
 		return -1;
 

+ 2 - 169
src/complex/transaction_task.cc

@@ -423,7 +423,7 @@ CBufferChain *encode_row_data(MysqlConn* dbconn, CBufferChain *bc, uint8_t &pkt_
 std::string build_dtc_table_type(std::string real_tbname, std::string real_dbname)
 {
 	YAML::Node dtc = g_config.get_conf();
-	if(real_dbname == "dtc")
+	if(real_dbname == dtc["primary"]["hot"]["real"][0]["db"].as<std::string>())
 	{
 		if(dtc["primary"]["full"])
 			return "LAYERED TALBE";
@@ -433,16 +433,6 @@ std::string build_dtc_table_type(std::string real_tbname, std::string real_dbnam
 			return "SINGLE TABLE";
 	}
 
-	if(dtc["primary"]["hot"]["real"][0]["db"].as<std::string>() == real_dbname)
-	{
-		std::string dst_tbname;
-		std::string tbtype;
-		dst_tbname = dtc["primary"]["hot"]["logic"]["table"].as<std::string>();
-
-		if(dst_tbname == real_tbname)
-			return dtc["primary"]["hot"]["sharding"] ? "SHARDING TALBE" : "SINGLE TABLE";
-	}
-
 	int ext_count = dtc["extension"].size();
 	for(int i = 0; i < ext_count; i++)
 	{
@@ -548,90 +538,9 @@ CBufferChain *encode_show_db_row_data(MysqlConn* dbconn, CBufferChain *bc, uint8
 		nbc = nbc->nextBuffer;
 	}
 
-	//add dtc cache layer1 info.
-	if(dbconn->field_num == 1)
-	{
-		//calc current row len
-		int row_len = 0;
-		YAML::Node dtc = g_config.get_conf();
-		std::string tbname = "dtc";
-
-		for (int j = 0; j < dbconn->field_num; j++) {
-			row_len++; //first byte for result len
-			row_len += tbname.length();
-		}
-
-		//alloc new buffer to store row data.
-		int packet_len = sizeof(CBufferChain) +
-					sizeof(MYSQL_HEADER_SIZE) + row_len;
-		CBufferChain *nbuff = (CBufferChain *)MALLOC(packet_len);
-		if (nbuff == NULL) {
-			return NULL;
-		}
-		nbuff->totalBytes = packet_len - sizeof(CBufferChain);
-		nbuff->usedBytes = sizeof(MYSQL_HEADER_SIZE) + row_len;
-		nbuff->nextBuffer = NULL;
-
-		char *r = nbuff->data;
-		encode_mysql_header(nbuff, row_len, pkt_nr++);
-		int offset = 0;
-		offset += sizeof(MYSQL_HEADER_SIZE);
-
-		//copy fields content
-		for (int j = 0; j < dbconn->field_num; j++) {
-			*(r + offset) = tbname.length();
-			offset++;
-			memcpy(r + offset, tbname.c_str(), tbname.length());
-			offset += tbname.length();
-		}
-
-		nbc->nextBuffer = nbuff;
-		nbc = nbc->nextBuffer;
-	}
-
 	return nbc;
 }
 
-CBufferChain *encode_show_tables_dtc(CTaskRequest *request, std::string dbname)
-{
-	uint8_t pkt_nr = request->get_seq_num();
-	pkt_nr++;
-
-	//calc current row len
-	int row_len = 0;
-	YAML::Node dtc = g_config.get_conf();
-	std::string tbname = dtc["primary"]["table"].as<std::string>();
-	std::string tbtypestr = build_dtc_table_type(tbname, dbname);
-
-	row_len++; //first byte for result len
-	row_len += tbtypestr.length();
-
-	//alloc new buffer to store row data.
-	int packet_len = sizeof(CBufferChain) +
-				sizeof(MYSQL_HEADER_SIZE) + row_len;
-
-	CBufferChain *nbuff = (CBufferChain *)MALLOC(packet_len);
-	if (nbuff == NULL) {
-		return NULL;
-	}
-	nbuff->totalBytes = packet_len - sizeof(CBufferChain);
-	nbuff->usedBytes = sizeof(MYSQL_HEADER_SIZE) + row_len;
-	nbuff->nextBuffer = NULL;
-
-	char *r = nbuff->data;
-	encode_mysql_header(nbuff, row_len, pkt_nr);
-	int offset = 0;
-	offset += sizeof(MYSQL_HEADER_SIZE);
-
-	//copy fields content
-	*(r + offset) = tbtypestr.length();
-	offset++;
-	memcpy(r + offset, tbtypestr.c_str(), tbtypestr.length());
-	offset += tbtypestr.length();
-
-	return nbuff;
-}
-
 CBufferChain *encode_show_tables_row_data(MysqlConn* dbconn, CBufferChain *bc, uint8_t &pkt_nr, std::string dbname)
 {
 	CBufferChain *nbc = bc;
@@ -716,67 +625,6 @@ CBufferChain *encode_show_tables_row_data(MysqlConn* dbconn, CBufferChain *bc, u
 		nbc = nbc->nextBuffer;
 	}
 
-#if 0
-	//add dtc cache layer1 info.
-	if(dbconn->field_num == 2)
-	{
-		//calc current row len
-		int row_len = 0;
-		YAML::Node dtc = g_config.get_conf();
-		std::string tbname = dtc["primary"]["table"].as<std::string>();
-		std::string tbtypestr = build_dtc_table_type(tbname, "@@DTC_LAYER1_CACHE@@");
-
-		for (int j = 0; j < dbconn->field_num; j++) {
-			if(j == 1) 	// table type field
-			{
-				row_len++; //first byte for result len
-				row_len += tbtypestr.length();
-			}
-			else if(j == 0)
-			{
-				row_len++; //first byte for result len
-				row_len += tbname.length();
-			}
-		}
-
-		//alloc new buffer to store row data.
-		int packet_len = sizeof(CBufferChain) +
-					sizeof(MYSQL_HEADER_SIZE) + row_len;
-		CBufferChain *nbuff = (CBufferChain *)MALLOC(packet_len);
-		if (nbuff == NULL) {
-			return NULL;
-		}
-		nbuff->totalBytes = packet_len - sizeof(CBufferChain);
-		nbuff->usedBytes = sizeof(MYSQL_HEADER_SIZE) + row_len;
-		nbuff->nextBuffer = NULL;
-
-		char *r = nbuff->data;
-		encode_mysql_header(nbuff, row_len, pkt_nr++);
-		int offset = 0;
-		offset += sizeof(MYSQL_HEADER_SIZE);
-
-		//copy fields content
-		for (int j = 0; j < dbconn->field_num; j++) {
-			if(j == 1) 	// table type field
-			{
-				*(r + offset) = tbtypestr.length();
-				offset++;
-				memcpy(r + offset, tbtypestr.c_str(), tbtypestr.length());
-				offset += tbtypestr.length();
-			}
-			else if(j == 0)
-			{
-				*(r + offset) = tbname.length();
-				offset++;
-				memcpy(r + offset, tbname.c_str(), tbname.length());
-				offset += tbname.length();
-			}
-		}
-
-		nbc->nextBuffer = nbuff;
-		nbc = nbc->nextBuffer;
-	}
-#endif
 	return nbc;
 }
 
@@ -868,10 +716,7 @@ CBufferChain* TransactionTask::encode_mysql_protocol(CTaskRequest *request)
 
 	CBufferChain *prow = NULL;
 	if(request->cmd == QUERY_CMD_SHOW_TABLES)
-	{
 		prow = encode_show_tables_row_data(m_DBConn, pos, pkt_nr, request->get_dbname());
-	}
-		
 	else if(request->cmd == QUERY_CMD_SHOW_DB)
 		prow = encode_show_db_row_data(m_DBConn, pos, pkt_nr);
 	else
@@ -907,7 +752,7 @@ int TransactionTask::request_db_query(std::string request_sql, CTaskRequest *req
 
 	std::string db = request->get_dbname();
 	int ret = 0;	
-	if(db.length() == 0 || db == std::string("dtc"))
+	if(db.length() == 0)
 		ret = m_DBConn->Query(m_Sql.c_str());
 	else
 		ret = m_DBConn->Query(db.c_str(), m_Sql.c_str());
@@ -951,18 +796,6 @@ int TransactionTask::request_db_query(std::string request_sql, CTaskRequest *req
 			request->cmd = QUERY_CMD_NORMAL;
 		}
 
-		if(db == std::string("dtc"))
-		{
-			CBufferChain* rba = NULL;
-			if(request->cmd == QUERY_CMD_SHOW_TABLES)
-			{
-				rba = encode_mysql_ok(request, 0); //encode_show_tables_dtc(request, request->get_dbname());
-				if(rba)
-					request->set_buffer_chain(rba);
-				return 0;
-			}
-		}
-
 		ret = m_DBConn->UseResult();
 		if (0 != ret) {
 			log4cplus_error("can not use result,sql[%s]", m_Sql.c_str());

+ 1 - 1
src/libs/common/config/dbconfig.cc

@@ -430,7 +430,7 @@ int DbConfig::get_dtc_config(YAML::Node dtc_config, DTCConfig* raw, int i_server
         }
     }
     dstype = 0;
-    checkTable = 1;
+    checkTable = 0;
 
     //TODO: string key supporting.
     // key-hash dll

+ 4 - 4
src/libs/common/daemon/daemon.cc

@@ -42,8 +42,8 @@ int background = 1;
 const char stat_project_name[] = "daemon";
 const char stat_usage_argv[] = "";
 
-#define TABLE_CONF_NAME "/etc/dtc/dtc.yaml"
-#define CACHE_CONF_NAME "/etc/dtc/dtc.yaml"
+#define TABLE_CONF_NAME "../conf/dtc.yaml"
+#define CACHE_CONF_NAME "../conf/dtc.yaml"
 
 char d_cache_file[256] = CACHE_CONF_NAME;
 char d_table_file[256] = TABLE_CONF_NAME;
@@ -106,8 +106,8 @@ int load_entry_parameter(int argc, char **argv)
 
 	//init_log("dtcd");
 	log4cplus_info("%s v%s: starting....", stat_project_name, version);
-	strcpy(d_table_file, "/etc/dtc/dtc.yaml");
-	strcpy(d_cache_file, "/etc/dtc/dtc.yaml");
+	//strcpy(d_table_file, "/etc/dtc/dtc.yaml");
+	//strcpy(d_cache_file, "/etc/dtc/dtc.yaml");
 	g_dtc_config = new DTCConfig;
 	//load config file and copy it to ../stat
 	if (g_dtc_config->load_yaml_file(d_table_file, true) == -1)

+ 2 - 2
src/libs/common/dtc_global.h

@@ -17,8 +17,8 @@
 #define _DTC_GLOBAL_H_
 #include "algorithm/non_copyable.h"
 
-#define TABLE_CONF_NAME "/etc/dtc/dtc.yaml"
-#define CACHE_CONF_NAME "/etc/dtc/dtc.yaml"
+#define TABLE_CONF_NAME "../conf/dtc.yaml"
+#define CACHE_CONF_NAME "../conf/dtc.yaml"
 #define ALARM_CONF_FILE "/etc/dtc/dtcalarm.conf"
 
 class DTCGlobal : private noncopyable {

+ 1 - 1
src/libs/common/key/key_route_ask_chain.cc

@@ -613,7 +613,7 @@ bool KeyRouteAskChain::migration_inprogress()
 	return false;
 }
 
-static const char *state_file_name = "/var/log/dtc/data/cluster.stat";
+static const char *state_file_name = "../data/cluster.stat";
 
 void KeyRouteAskChain::save_state_to_file()
 {

+ 1 - 1
src/libs/common/key/key_route_ask_chain.h

@@ -78,7 +78,7 @@ class KeyRouteAskChain : public JobAskInterface<DTCJobOperation> {
 
 	std::string key_list_file_name(const std::string &name)
 	{
-		return "/var/log/dtc/data/" + name + ".migrated";
+		return "../data/" + name + ".migrated";
 	}
 
 	std::string select_node(const char *key);

+ 1 - 1
src/libs/common/my/my_request.cc

@@ -114,7 +114,7 @@ bool MyRequest::check_packet_info()
 {
 	if (this->raw == NULL || this->raw_len <= 0) {
 		log4cplus_error(
-			"check packet info error:%p %dset packet info first please",
+			"check packet info error:%p %d, set packet info first please",
 			this->raw, this->raw_len);
 		return false;
 	} else

+ 3 - 9
src/libs/common/packet/packet_server.cc

@@ -45,8 +45,8 @@ enum enum_select_types {
 
 const MetaSelections meta_selections[] = {
 	{"select dtctables" , E_SELECT_DTC_TABLES 	, NULL},
-	{"select dtcyaml" 	, E_SELECT_DTC_YAML 	, "/etc/dtc/dtc.yaml"},
-	{"select tableyaml" , E_SELECT_TABLE_YAML 	, "/etc/dtc/dtc.yaml"}
+	{"select dtcyaml" 	, E_SELECT_DTC_YAML 	, "../conf/dtc.yaml"},
+	{"select tableyaml" , E_SELECT_TABLE_YAML 	, "../conf/dtc.yaml"}
 };
 
 enum enum_field_types { MYSQL_TYPE_DECIMAL, MYSQL_TYPE_TINY,
@@ -1263,9 +1263,7 @@ int Packet::yaml_config_result(DtcJob *job , const char* p_filename)
 	char* p_buf = NULL;
 	int i_len = 0;
 	int i_ret = load_table(p_filename , p_buf , i_len);
-	if (p_buf != NULL) {
-		log4cplus_debug("p_filename:%s , buflen:%d" , p_filename , i_len);
-	}
+	log4cplus_debug("p_filename:%s , buflen:%d" , p_filename , i_len);
     
 	if (i_ret != 0) { return -EFAULT; }
 
@@ -1366,10 +1364,6 @@ int Packet::encode_result_v2(DtcJob &job, int mtu, uint32_t ts)
 		break;
 	}
 
-	// if (is_desc_tables(&job)) {
-	// 	return desc_tables_result(&job);
-	// }
-
 	// rp指向返回数据集
 	ResultPacket *rp =
 		job.result_code() >= 0 ? job.get_result_packet() : NULL;

+ 1 - 1
src/libs/common/protocol.h

@@ -137,7 +137,7 @@ struct DTC_HEADER_V2 {
 	uint8_t version;
 	uint8_t admin;
 	uint8_t layer;
-	uint8_t reserved[1];
+	uint8_t dbname_len;
 	uint32_t packet_len;
 	uint64_t id;
 };

+ 4 - 1
src/libs/common/task/task_base.cc

@@ -295,11 +295,12 @@ void DtcJob::decode_packet_v2(char *packetIn, int packetLen, int type)
 	}
 
 	peerid = header->id;
+	int dbname_len = header->dbname_len;
 
 	//offset DTC Header.
 	p = p + sizeof(DTC_HEADER_V2);
 
-	mr.set_packet_info(p, packetLen - sizeof(DTC_HEADER_V2));
+	mr.set_packet_info(p + dbname_len, packetLen - sizeof(DTC_HEADER_V2) - dbname_len);
 
 	struct timeval tv1, tv2;
 	gettimeofday(&tv1, NULL);
@@ -812,6 +813,7 @@ void DtcJob::decode_request_v1(DTC_HEADER_V1 &header, char *p)
 		}
 
 		if (role == TaskRoleServer) {
+#if 0
 			/* local storage no need to check table, because it always set it to "@HOT_BACKUP", checking tablename */
 			if (requestCode != DRequest::Replicate &&
 			    !is_same_table(versionInfo.table_name())) {
@@ -825,6 +827,7 @@ void DtcJob::decode_request_v1(DTC_HEADER_V1 &header, char *p)
 					versionInfo.table_name().ptr,
 					table_name());
 			}
+#endif			
 
 			/* check table hash */
 			if (requestCode != DRequest::Replicate &&

+ 1 - 1
src/libs/stat/stat_dtc.h

@@ -20,7 +20,7 @@
 #include "stat_thread.h"
 #include "stat_dtc_def.h"
 
-#define STATIDX "/usr/local/dtc/stat/dtc.stat.idx"
+#define STATIDX "../stat/dtc.stat.idx"
 extern StatThread g_stat_mgr;
 extern int init_statistics(void);
 

+ 38 - 0
src/rule/re_cache.cc

@@ -148,6 +148,44 @@ std::string get_schema(SQLParserResult* sql_ast)
     return "";
 }
 
+std::string get_table_name(SQLParserResult* sql_ast)
+{
+    StatementType t = sql_ast->getStatement(0)->type();
+    if(t == kStmtSelect)
+    {
+        const SelectStatement* stmt = (const SelectStatement*)(sql_ast->getStatement(0));
+        TableRef* table = stmt->fromTable;
+        if(table)
+        {
+            return std::string(table->getName());
+        }
+    }
+    else if(t == kStmtInsert)
+    {
+        const InsertStatement* stmt = (const InsertStatement*)(sql_ast->getStatement(0));
+        return std::string(stmt->tableName);
+    }
+    else if(t == kStmtUpdate)
+    {
+        const UpdateStatement* stmt = (const UpdateStatement*)(sql_ast->getStatement(0));
+        TableRef* table = stmt->table;
+        if(table)
+        {
+            return std::string(table->getName());
+        }
+    }
+    else if(t == kStmtDelete)
+    {
+        const DeleteStatement* stmt = (const DeleteStatement*)(sql_ast->getStatement(0));
+        if(stmt)
+        {
+            return std::string(stmt->tableName);
+        }
+    }
+  
+    return "";
+}
+
 bool is_dtc_adapt_type(SQLParserResult* sql_ast)
 {
     StatementType t = sql_ast->getStatement(0)->type();

+ 2 - 1
src/rule/re_cache.h

@@ -2,4 +2,5 @@
 #include "../libs/hsql/include/util/sqlhelper.h"
 
 bool re_is_cache_sql(hsql::SQLParserResult* sql_ast, std::string key);
-std::string get_schema(hsql::SQLParserResult* sql_ast);
+std::string get_schema(hsql::SQLParserResult* sql_ast);
+std::string get_table_name(hsql::SQLParserResult* sql_ast);

+ 3 - 3
src/rule/re_load.cc

@@ -5,7 +5,7 @@
 #include <iostream>
 #include "re_comm.h"
 
-#define CACHE_CONF_NAME "/etc/dtc/dtc.yaml"
+std::string conf_file = "/etc/dtc/dtc.yaml";
 
 using namespace hsql;
 hsql::SQLParserResult rule_ast;
@@ -17,7 +17,7 @@ std::string do_get_rule()
 {
     YAML::Node config;
     try {
-        config = YAML::LoadFile(CACHE_CONF_NAME);
+        config = YAML::LoadFile(conf_file);
 	} catch (const YAML::Exception &e) {
 		log4cplus_error("config file error:%s\n", e.what());
 		return "";
@@ -165,7 +165,7 @@ extern "C" int re_load_table_key(char* key)
 {
     YAML::Node config;
     try {
-        config = YAML::LoadFile(CACHE_CONF_NAME);
+        config = YAML::LoadFile(conf_file);
 	} catch (const YAML::Exception &e) {
 		log4cplus_error("config file error:%s\n", e.what());
 		return -1;

+ 96 - 12
src/rule/rule.cc

@@ -17,23 +17,95 @@
 using namespace std;
 
 extern vector<vector<hsql::Expr*> > expr_rules;
+extern std::string conf_file;
 
-extern "C" int rule_sql_match(const char* szsql, const char* szkey, const char* dbname)
+std::string get_key_info(std::string conf)
 {
-    if(!szsql || !szkey)
+    YAML::Node config;
+    try {
+        config = YAML::LoadFile(conf);
+	} catch (const YAML::Exception &e) {
+		log4cplus_error("config file error:%s\n", e.what());
+		return "";
+	}
+
+    YAML::Node node = config["primary"]["cache"]["field"][0]["name"];
+    if(node)
+    {
+        std::string keystr = node.as<string>();
+        transform(keystr.begin(),keystr.end(),keystr.begin(),::toupper);
+        return keystr;
+    }
+    
+    return "";
+}
+
+extern "C" const char* rule_get_key(const char* conf)
+{
+    std::string strkey = get_key_info(conf);
+    printf("222222222222, conf file: %s\n", conf);
+    printf("key len: %d, key: %s\n", strkey.length(), strkey.c_str());
+    if(strkey.length() > 0)
+        return strkey.c_str();
+    else
+        return NULL;
+}
+
+extern "C" int rule_get_key_type(const char* conf)
+{
+    YAML::Node config;
+    if(conf == NULL)
+        return -1;
+
+    try {
+        config = YAML::LoadFile(conf);
+	} catch (const YAML::Exception &e) {
+		log4cplus_error("config file error:%s\n", e.what());
+		return -1;
+	}
+
+    YAML::Node node = config["primary"]["cache"]["field"][0]["type"];
+    if(node)
+    {
+        std::string str = node.as<string>();
+        if(str == "signed")
+            return 1;
+        else if(str == "unsigned")
+            return 2;
+        else if(str == "float")
+            return 3;
+        else if(str == "string")
+            return 4;
+        else if(str == "binary")
+            return 5;
+        else   
+            return -1;
+    }
+    return -1;
+}
+
+extern "C" int rule_sql_match(const char* szsql, const char* dbname, const char* conf)
+{
+    if(!szsql)
         return -1;
         
     std::string key = "";
     std::string sql = szsql;
+    bool flag = false;
 
-    key = szkey;
-    if(key.length() == 0)
-        return -1;
+    init_log4cplus();
 
-    cout<<"key: "<<key<<endl;
-    cout<<"sql: "<<sql<<endl;
+    if(conf)
+    {
+        conf_file = std::string(conf);
+        flag = true;
 
-    init_log4cplus();
+        key = get_key_info(conf_file);
+        if(key.length() == 0)
+            return -1;
+    }
+
+    log4cplus_debug("key len: %d, key: %s, sql len: %d, sql: %s, dbname len: %d, dbname: %s", key.length(), key.c_str(), sql.length(), sql.c_str(), strlen(dbname), std::string(dbname).c_str());
 
     if(sql == "show databases" || sql == "SHOW DATABASES" || sql == "select database()" || sql == "SELECT DATABASE()")
     {
@@ -48,13 +120,13 @@ extern "C" int rule_sql_match(const char* szsql, const char* szkey, const char*
             return 3;
     }
 
-    if(dbname != NULL && strlen(dbname) > 0 && 
-        std::string(dbname) != SPECIFIC_L1_SCHEMA && 
-        std::string(dbname) != SPECIFIC_L2_SCHEMA)
+    log4cplus_debug("#############dbname:%s", dbname);
+    if(dbname != NULL && strlen(dbname) > 0 && flag == false)
     {
+        log4cplus_debug("#############111111111111");
         return 3;
     }
-
+    log4cplus_debug("#############22222222222");
     int ret = re_load_rule();
     if(ret != 0)
     {
@@ -95,3 +167,15 @@ extern "C" int rule_sql_match(const char* szsql, const char* szkey, const char*
     return 3;
 }
 
+extern "C" int sql_parse_table(const char* szsql, char* out)
+{
+    hsql::SQLParserResult sql_ast;
+    if(re_parse_sql(szsql, &sql_ast) != 0)
+        return -1;
+
+    std::string tablename = get_table_name(&sql_ast);
+    if(tablename.length() > 0)
+        strcpy(out, tablename.c_str());
+
+    return tablename.length();
+}

+ 4 - 1
src/rule/rule.h

@@ -3,8 +3,11 @@
 extern "C"{
 #endif
 
-    int rule_sql_match(const char* szsql, const char* szkey, const char* dbname);
+    int rule_sql_match(const char* szsql, const char* dbname, const char* conf);
     int re_load_table_key(char* key);
+    int sql_parse_table(const char* szsql, char* out);
+    int rule_get_key_type(const char* conf);
+    const char* rule_get_key(const char* conf);
 
 #ifdef __cplusplus    
 }

+ 57 - 30
src/utils/conf-gen-utils.cc

@@ -3,48 +3,36 @@
 #include "log.h"
 #include "yaml-cpp/yaml.h"
 #include <vector>
+#include <sys/types.h>
+#include <dirent.h>
 #include <map>
 
 using namespace std;
 #define ROOT_PATH "/etc/dtc/"
 
-char conf_file[256] = {0};
-YAML::Node dtc_config;
+char conf_dir[256] = {0};
 std::map<std::string, std::vector<YAML::Node>> dbmap;
 std::string get_merge_string(YAML::Node node);
-std::map<std::string, std::string> algorithm;
 
 int load_dtc_config(int argc, char *argv[])
 {
     int c;
-    strcpy(conf_file, ROOT_PATH);
-    strcat(conf_file, "dtc.yaml");
+    strcpy(conf_dir, ROOT_PATH);
 
     while ((c = getopt(argc, argv, "c:")) != -1) {
         switch (c) {
         case 'c':
-            log4cplus_info("conf file:%s", optarg);
-            strncpy(conf_file, optarg, sizeof(conf_file) - 1);
+            log4cplus_info("conf dir:%s", optarg);
+            strncpy(conf_dir, optarg, sizeof(conf_dir) - 1);
             break;
         }
     }
 
-    try {
-        log4cplus_info("loading file: %s", conf_file);
-        dtc_config = YAML::LoadFile(conf_file);
-	} catch (const YAML::Exception &e) {
-		log4cplus_error("config file error:%s, %s\n", e.what(), conf_file);
-		return -1;
-	}
-
-    if(!dtc_config)
-        return -1;
-
     log4cplus_info("loading conf file successfully.");
     return 0;
 }
 
-int load_node_to_map()
+int load_node_to_map(YAML::Node dtc_config)
 {
     log4cplus_info("loading node to map.");
     std::vector<YAML::Node> vec;
@@ -200,6 +188,7 @@ int yaml_dump_sharding_rule(FILE *fp, std::vector<YAML::Node> vec)
     fprintf(fp, "  tables:\n");
     std::string binding_table = "";
     std::vector<YAML::Node>::iterator vt;
+    std::map<std::string, std::string> algorithm;
     for(vt = vec.begin(); vt != vec.end(); vt++)
     {
         YAML::Node node = *vt;
@@ -256,19 +245,19 @@ int yaml_dump_sharding_rule(FILE *fp, std::vector<YAML::Node> vec)
         }
     }
 
-    fprintf(fp, "  shardingAlgorithms:\n");
-    std::map<std::string, std::string>::iterator it;
-    for (it = algorithm.begin(); it != algorithm.end(); it++) 
+    if(algorithm.size() > 0)
     {
-        fprintf(fp, "    %s:\n", (*it).first.c_str());
-        fprintf(fp, "      type: INLINE\n");
-        fprintf(fp, "      props:\n");
-        fprintf(fp, "        algorithm-expression: %s\n", (*it).second.c_str());
+        fprintf(fp, "  shardingAlgorithms:\n");
+        std::map<std::string, std::string>::iterator it;
+        for (it = algorithm.begin(); it != algorithm.end(); it++) 
+        {
+            fprintf(fp, "    %s:\n", (*it).first.c_str());
+            fprintf(fp, "      type: INLINE\n");
+            fprintf(fp, "      props:\n");
+            fprintf(fp, "        algorithm-expression: %s\n", (*it).second.c_str());
+        }
     }
 
-    fprintf(fp, "  bindingTables:\n");
-    fprintf(fp, "    - %s\n", binding_table.c_str());
-
     return 0;
 }
 
@@ -337,7 +326,45 @@ int main(int argc, char* argv[])
     if(load_dtc_config(argc, argv) < 0)
         return 0;
 
-    load_node_to_map();
+    char prefix[260] = {0};
+    strcpy(prefix, "dtc-conf-");
+
+    DIR *dir = opendir(conf_dir);
+	if (!dir)
+		return -1;
+
+	struct dirent *drt = readdir(dir);
+	if (!drt) {
+		closedir(dir);
+		return -2;
+	}
+
+	int l = strlen(prefix);
+	uint32_t v = 0;
+	int found = 0;
+
+	for (; drt; drt = readdir(dir)) 
+    {
+        log4cplus_info("d_name: %s", drt->d_name);
+		int n = strncmp(drt->d_name, prefix, l);
+		if (n == 0) {
+			YAML::Node dtc_config;
+            char filepath[260] = {0};
+            try {
+                sprintf(filepath, "%s%s", conf_dir, drt->d_name);
+                log4cplus_info("loading file: %s", filepath);
+                dtc_config = YAML::LoadFile(filepath);
+            } catch (const YAML::Exception &e) {
+                log4cplus_error("config file error:%s, %s\n", e.what(), filepath);
+                return -1;
+            }
+
+            if(!dtc_config)
+                return -1;
+
+            load_node_to_map(dtc_config);
+		}
+	}   
 
     delete_all_old_yaml();
 

+ 116 - 0
tests/init.ext.sql

@@ -0,0 +1,116 @@
+CREATE TABLE `opensource_0` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_1` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_2` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_3` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_4` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_5` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_6` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_7` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_8` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `opensource_9` (
+  `uid` int(11) ,
+  `name` varchar(50),
+  `city` varchar(50),
+  `sex` int(11) ,
+  `age` int(11)
+)DEFAULT CHARSET=utf8;
+
+create database january;
+create database weekdays;
+
+use january;
+
+CREATE TABLE `monday` (
+  `day` int(11) ,
+  `weekaday` varchar(50),
+  `holiday` varchar(50)
+)DEFAULT CHARSET=utf8;
+
+use weekdays;
+
+CREATE TABLE `tuesday` (
+  `day` int(11) ,
+  `weekaday` varchar(50),
+  `holiday` varchar(50)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `sunday_0` (
+  `day` int(11) ,
+  `weekaday` varchar(50),
+  `holiday` varchar(50)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `sunday_1` (
+  `day` int(11) ,
+  `weekaday` varchar(50),
+  `holiday` varchar(50)
+)DEFAULT CHARSET=utf8;
+
+CREATE TABLE `sunday_2` (
+  `day` int(11) ,
+  `weekaday` varchar(50),
+  `holiday` varchar(50)
+)DEFAULT CHARSET=utf8;