123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485 |
- /*
- * Copyright [2021] JD.com, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <stdarg.h>
- #include <limits.h>
- #include <errno.h>
- #include <unistd.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <map>
- #include <string>
- // local include files
- #include "mysql_operation.h"
- // common include files
- #include "protocol.h"
- #include "log/log.h"
- #include "proc_title.h"
- #include "table/table_def_manager.h"
- #include "daemon/daemon.h"
- // mysql include files
- #include "mysqld_error.h"
- // core include files
- #include "buffer/buffer_pond.h"
- #define MIN(x, y) ((x) <= (y) ? (x) : (y))
- ConnectorProcess::ConnectorProcess() : _lengths(0)
- {
- error_no = 0;
- left_quote = '`';
- right_quote = '`';
- title_prefix_size = 0;
- time(&last_access);
- ping_timeout = 9;
- proc_timeout = 0;
- strncpy(name, "helper", 6);
- }
- int ConnectorProcess::try_ping(void)
- {
- return db_conn.do_ping();
- }
- void ConnectorProcess::init_ping_timeout(void)
- {
- int64_t to = db_conn.get_variable("wait_timeout");
- log4cplus_debug("Server idle timeout %lld", (long long)to);
- if (to < 10)
- to = 10;
- else if (to > 600)
- to = 600;
- ping_timeout = to * 9 / 10;
- }
- int ConnectorProcess::config_db_by_struct(const DbConfig *cf)
- {
- if (cf == NULL)
- return -1;
- dbConfig = cf;
- return (0);
- }
- #define DIM(a) (sizeof(a) / sizeof(a[0]))
- static int get_field_type(const char *szType, int &i_type,
- unsigned int &ui_size)
- {
- unsigned int i;
- int iTmp;
- typedef struct {
- char m_szName[256];
- int m_iType;
- int m_uiSize;
- } CMysqlField;
- static CMysqlField astField[] = { { "tinyint", 1, 1 },
- { "smallint", 1, 2 },
- { "mediumint", 1, 4 },
- { "int", 1, 4 },
- { "bigint", 1, 8 },
- { "float", 3, 4 },
- { "double", 3, 8 },
- { "decimal", 3, 8 },
- { "datetime", 4, 20 },
- { "date", 4, 11 },
- { "timestamp", 4, 20 },
- { "time", 4, 11 },
- { "year", 4, 5 },
- { "varchar", 4, 255 },
- { "char", 4, 255 },
- { "varbinary", 5, 255 },
- { "binary", 5, 255 },
- { "tinyblob", 5, 255 },
- { "tinytext", 4, 255 },
- { "blob", 5, 65535 },
- { "text", 4, 65535 },
- { "mediumblob", 5, 16777215 },
- { "mediumtext", 4, 16777215 },
- { "longblob", 5, 4294967295U },
- { "longtext", 4, 4294967295U },
- { "enum", 4, 255 },
- { "set", 2, 8 } };
- for (i = 0; i < DIM(astField); i++) {
- if (strncasecmp(szType, astField[i].m_szName,
- strlen(astField[i].m_szName)) == 0) {
- i_type = astField[i].m_iType;
- ui_size = astField[i].m_uiSize;
- if (strncasecmp(szType, "varchar", 7) == 0) {
- if (sscanf(szType + 8, "%d", &iTmp) == 1)
- ui_size = iTmp;
- } else if (strncasecmp(szType, "char", 4) == 0) {
- if (sscanf(szType + 5, "%d", &iTmp) == 1)
- ui_size = iTmp;
- } else if (strncasecmp(szType, "varbinary", 9) == 0) {
- if (sscanf(szType + 10, "%d", &iTmp) == 1)
- ui_size = iTmp;
- } else if (strncasecmp(szType, "binary", 6) == 0) {
- if (sscanf(szType + 7, "%d", &iTmp) == 1)
- ui_size = iTmp;
- }
- if (i_type == 1 && strstr(szType, "unsigned") != NULL)
- i_type = 2;
- if (i_type == 3 && strstr(szType, "unsigned") != NULL)
- fprintf(stderr,
- "#warning: dtc not support unsigned double!\n");
- break;
- }
- }
- return (0);
- }
- int ConnectorProcess::check_table()
- {
- int Ret;
- int i;
- int i_field_num;
- char ach_field_name[256][256];
- snprintf(DBName, sizeof(DBName), dbConfig->dbFormat,
- dbConfig->mach[self_group_id].dbIdx[0]);
- snprintf(table_name, sizeof(table_name), dbConfig->tblFormat, 0);
- init_sql_buffer();
- sql_append_const("show columns from `");
- sql_append_string(table_name);
- sql_append_const("`");
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- Ret = db_conn.do_query(DBName, sql.c_str());
- log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
- if (Ret != 0) {
- log4cplus_warning("db query error: %s, pid: %d, group-id: %d",
- db_conn.get_err_msg(), getpid(),
- self_group_id);
- return (-1);
- }
- Ret = db_conn.use_result();
- if (Ret != 0) {
- log4cplus_warning("db user result error: %s",
- db_conn.get_err_msg());
- return (-2);
- }
- // 获取返回结果的各列位置
- int i_name_idx = 0, i_type_idx = 0;
- int i_null_idx = 0, i_key_idx = 0;
- int i_default_idx = 0, i_extra_idx = 0;
- unsigned int ui_num_fields = mysql_num_fields(db_conn.Res);
- MYSQL_FIELD *pst_fields = mysql_fetch_fields(db_conn.Res);
- for (i = 0; i < (int)ui_num_fields; i++) {
- if (strcasecmp("Field", pst_fields[i].name) == 0)
- i_name_idx = i;
- else if (strcasecmp("Type", pst_fields[i].name) == 0)
- i_type_idx = i;
- else if (strcasecmp("Null", pst_fields[i].name) == 0)
- i_null_idx = i;
- else if (strcasecmp("Key", pst_fields[i].name) == 0)
- i_key_idx = i;
- else if (strcasecmp("Default", pst_fields[i].name) == 0)
- i_default_idx = i;
- else if (strcasecmp("Extra", pst_fields[i].name) == 0)
- i_extra_idx = i;
- }
- int iFid;
- i_field_num = 0;
- memset(ach_field_name, 0, sizeof(ach_field_name));
- int uniq_fields_cnt_table = table_def->uniq_fields();
- for (i = 0; i < db_conn.res_num; i++) {
- Ret = db_conn.fetch_row();
- if (Ret != 0) {
- db_conn.free_result();
- log4cplus_warning("db fetch row error: %s",
- db_conn.get_err_msg());
- return (-3);
- }
- strncpy(ach_field_name[i_field_num], db_conn.Row[i_name_idx],
- 255);
- i_field_num++;
- iFid = table_def->field_id(db_conn.Row[i_name_idx]);
- if (iFid == -1) {
- log4cplus_debug("field[%s] not found in table.yaml",
- db_conn.Row[i_name_idx]);
- continue;
- }
- if (table_def->is_volatile(iFid)) {
- log4cplus_error(
- "field[name: `%s`] found in table.yaml and DB both, can't be Volatile",
- db_conn.Row[i_name_idx]);
- db_conn.free_result();
- return (-4);
- }
- if (table_def->is_timestamp(iFid)) {
- log4cplus_error(
- "in table.yaml, Field[name: `%s`]'s is timestamp, not support in DB mode",
- db_conn.Row[i_name_idx]);
- db_conn.free_result();
- return (-4);
- }
- //field type & size
- int i_type = -1;
- unsigned ui_size = 0;
- get_field_type(db_conn.Row[i_type_idx], i_type, ui_size);
- if (i_type != table_def->field_type(iFid)) {
- log4cplus_error(
- "in table.yaml, Field[name: `%s`]'s type incorrect. conf: %d, mysql:%d",
- db_conn.Row[i_name_idx],
- table_def->field_type(iFid), i_type);
- db_conn.free_result();
- return (-4);
- }
- if ((int)ui_size != table_def->field_size(iFid) &&
- !(ui_size >= (64 << 20) &&
- table_def->field_size(iFid) >= (64 << 20))) {
- log4cplus_error(
- "in table.yaml, Field[name: `%s`]'s size incorrect. conf: %d, mysql:%u",
- db_conn.Row[i_name_idx],
- table_def->field_size(iFid), ui_size);
- db_conn.free_result();
- return (-4);
- }
- if (db_conn.Row[i_extra_idx] != NULL &&
- strcasecmp("auto_increment", db_conn.Row[i_extra_idx]) ==
- 0) {
- if (table_def->auto_increment_field_id() != iFid) {
- log4cplus_error(
- "in table.yaml, Field[name: `%s`]'s default-value incorrect. conf: non-auto_increment, mysql:auto_increment",
- db_conn.Row[i_name_idx]);
- db_conn.free_result();
- return (-4);
- }
- }
- /*field should be uniq in table.yaml if configed primary in db */
- uint8_t *uniq_fields = table_def->uniq_fields_list();
- if (db_conn.Row[i_key_idx] != NULL &&
- (strcasecmp("PRI", db_conn.Row[i_key_idx]) == 0 ||
- strcasecmp("UNI", db_conn.Row[i_key_idx]) == 0)) {
- int j = 0;
- for (j = 0; j < table_def->uniq_fields(); j++) {
- if (uniq_fields[j] == iFid)
- break;
- }
- if (j >= table_def->uniq_fields()) {
- log4cplus_error(
- "in table.yaml, Field[name: `%s`] is primary in db, but not uniq in dtc",
- db_conn.Row[i_name_idx]);
- return -4;
- }
- uniq_fields_cnt_table--;
- }
- }
- /*field should be primary in db if configed uniq in table.yaml*/
- if (uniq_fields_cnt_table != 0) {
- log4cplus_error(
- "table.yaml have more uniq fields that not configed as primary in db");
- return -4;
- }
- for (int i = 0; i <= table_def->num_fields(); i++) {
- //bug fix volatile不在db中
- if (table_def->is_volatile(i))
- continue;
- const char *name = table_def->field_name(i);
- int j;
- for (j = 0; j < i_field_num; j++) {
- if (strcmp(ach_field_name[j], name) == 0)
- break;
- }
- if (j >= i_field_num) {
- log4cplus_error(
- "in table.yaml, Field[name: `%s`] not found in mysql",
- name);
- db_conn.free_result();
- return (-4);
- }
- }
- log4cplus_debug(
- "pid: %d, group-id: %d check table success, db: %s, sql: %s",
- getpid(), self_group_id, DBName, sql.c_str());
- db_conn.free_result();
- return (0);
- }
- int ConnectorProcess::machine_init(int GroupID, int r)
- {
- const char *p;
- // 初始化db配置信息
- if (dbConfig->machineCnt <= GroupID) {
- log4cplus_error(
- "parse config error, machineCnt[%d] <= GroupID[%d]",
- dbConfig->machineCnt, GroupID);
- return (-3);
- }
- typeof(&dbConfig->mach[0].role[0]) role =
- &dbConfig->mach[GroupID].role[r];
- memset(&db_host_conf, 0, sizeof(DBHost));
- p = strrchr(role->addr, ':');
- if (p == NULL) {
- strncpy(db_host_conf.Host, role->addr,
- sizeof(db_host_conf.Host) - 1);
- db_host_conf.Port = 0;
- } else {
- strncpy(db_host_conf.Host, role->addr,
- MIN(p - role->addr,
- (int)sizeof(db_host_conf.Host) - 1));
- db_host_conf.Port = atoi(p + 1);
- }
- strncpy(db_host_conf.User, role->user, sizeof(db_host_conf.User) - 1);
- strncpy(db_host_conf.Password, role->pass,
- sizeof(db_host_conf.Password) - 1);
- db_host_conf.ConnTimeout = proc_timeout;
- strncpy(db_host_conf.OptionFile, role->optfile,
- sizeof(db_host_conf.OptionFile) - 1);
- db_conn.do_config(&db_host_conf);
- if (db_conn.Open() != 0) {
- log4cplus_warning("connect db[%s] error: %s", db_host_conf.Host,
- db_conn.get_err_msg());
- return (-6);
- }
- log4cplus_debug("group-id: %d, pid: %d, db: %s, user: %s, pwd: %s",
- self_group_id, getpid(), db_host_conf.Host,
- db_host_conf.User, db_host_conf.Password);
- return (0);
- }
- int ConnectorProcess::do_init(int GroupID, const DbConfig *do_config,
- DTCTableDefinition *tdef, int slave)
- {
- int Ret;
- self_group_id = GroupID;
- table_def = tdef;
- Ret = config_db_by_struct(do_config);
- if (Ret != 0) {
- return (-1);
- }
- Ret = machine_init(GroupID, slave);
- if (Ret != 0) {
- return (-2);
- }
- return (0);
- }
- void ConnectorProcess::init_sql_buffer(void)
- {
- sql.clear();
- error_no = 0;
- }
- void ConnectorProcess::sql_append_string(const char *str, int len)
- {
- if (len == 0)
- len = strlen(str);
- if (sql.append(str, len) < 0) {
- error_no = -1;
- log4cplus_error("sql.append() error: %d, %m", sql.needed());
- }
- }
- /* 将字符串printf在原来字符串的后面,如果buffer不够大会自动重新分配buffer */
- void ConnectorProcess::sql_printf(const char *Format, ...)
- {
- va_list Arg;
- int Len;
- va_start(Arg, Format);
- Len = sql.vbprintf(Format, Arg);
- va_end(Arg);
- if (Len < 0) {
- error_no = -1;
- log4cplus_error("vsnprintf error: %d, %m", Len);
- }
- }
- void ConnectorProcess::sql_append_table(void)
- {
- sql_append_string(&left_quote, 1);
- sql_append_string(table_name);
- sql_append_string(&right_quote, 1);
- }
- void ConnectorProcess::sql_append_field(int fid)
- {
- sql_append_string(&left_quote, 1);
- sql_append_string(table_def->field_name(fid));
- sql_append_string(&right_quote, 1);
- }
- void ConnectorProcess::sql_append_comparator(uint8_t op)
- {
- // order is important
- static const char *const CompStr[] = { "=", "!=", "<", "<=", ">", ">=" };
- if (op >= DField::TotalComparison) {
- error_no = -1;
- log4cplus_error("unknow op: %d", op);
- } else {
- sql_append_string(CompStr[op]);
- }
- }
- void ConnectorProcess::init_table_name(const DTCValue *Key, int field_type)
- {
- log4cplus_info("line:%d" ,__LINE__);
- int dbid = 0, tableid = 0;
- uint64_t n;
- double f;
- if (Key != NULL && dbConfig->depoly != 0) {
- switch (field_type) {
- case DField::Signed:
- if (dbConfig->keyHashConfig.keyHashEnable) {
- n = dbConfig->keyHashConfig.keyHashFunction(
- (const char *)&(Key->s64),
- sizeof(Key->s64),
- dbConfig->keyHashConfig.keyHashLeftBegin,
- dbConfig->keyHashConfig
- .keyHashRightBegin);
- } else {
- if (Key->s64 >= 0)
- n = Key->s64;
- else if (Key->s64 == LONG_LONG_MIN)
- n = 0;
- else
- n = 0 - Key->s64;
- }
- log4cplus_info("div:%d , mod:%d" , dbConfig->dbDiv , dbConfig->dbMod);
- dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
- tableid = (n / dbConfig->tblDiv) % dbConfig->tblMod;
- break;
- case DField::Unsigned:
- if (dbConfig->keyHashConfig.keyHashEnable) {
- n = dbConfig->keyHashConfig.keyHashFunction(
- (const char *)&(Key->u64),
- sizeof(Key->u64),
- dbConfig->keyHashConfig.keyHashLeftBegin,
- dbConfig->keyHashConfig
- .keyHashRightBegin);
- } else {
- n = Key->u64;
- }
- dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
- tableid = (n / dbConfig->tblDiv) % dbConfig->tblMod;
- break;
- case DField::Float:
- if (dbConfig->keyHashConfig.keyHashEnable) {
- n = dbConfig->keyHashConfig.keyHashFunction(
- (const char *)&(Key->flt),
- sizeof(Key->flt),
- dbConfig->keyHashConfig.keyHashLeftBegin,
- dbConfig->keyHashConfig
- .keyHashRightBegin);
- dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
- tableid = (n / dbConfig->tblDiv) %
- dbConfig->tblMod;
- } else {
- if (Key->flt >= 0)
- f = Key->flt;
- else
- f = 0 - Key->flt;
- dbid = ((int)(f / dbConfig->dbDiv)) %
- dbConfig->dbMod;
- tableid = ((int)(f / dbConfig->tblDiv)) %
- dbConfig->tblMod;
- }
- break;
- case DField::String:
- case DField::Binary:
- if (dbConfig->keyHashConfig.keyHashEnable) {
- n = dbConfig->keyHashConfig.keyHashFunction(
- Key->bin.ptr, Key->bin.len,
- dbConfig->keyHashConfig.keyHashLeftBegin,
- dbConfig->keyHashConfig
- .keyHashRightBegin);
- dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
- tableid = (n / dbConfig->tblDiv) %
- dbConfig->tblMod;
- }
- break;
- }
- }
- log4cplus_info("line:%d" ,__LINE__);
- snprintf(DBName, sizeof(DBName), dbConfig->dbFormat, dbid);
- snprintf(table_name, sizeof(table_name), dbConfig->tblFormat, tableid);
- log4cplus_info("DBName:%s , table_name:%s" ,DBName , table_name);
- }
- int ConnectorProcess::select_field_concate(const DTCFieldSet *fs)
- {
- if (fs == NULL) {
- sql_append_const("COUNT(*)");
- } else {
- int i = 0;
- uint8_t mask[32];
- FIELD_ZERO(mask);
- fs->build_field_mask(mask);
- sql_append_field(0); // key
- for (i = 1; i < table_def->num_fields() + 1; i++) {
- sql_append_const(",");
- if (FIELD_ISSET(i, mask) == 0) {
- /* Missing field as 0 */
- sql_append_const("0");
- } else if (table_def->is_volatile(i) == 0) {
- sql_append_field(i);
- } else {
- // volatile field initialized as default value
- format_sql_value(table_def->default_value(i),
- table_def->field_type(i));
- }
- }
- }
- return 0;
- }
- std::string ConnectorProcess::value_to_str(const DTCValue *v, int fieldType)
- {
- if (v == NULL)
- return "NULL";
- char buf[32];
- std::string ret;
- switch (fieldType) {
- case DField::Signed:
- snprintf(buf, sizeof(buf), "%lld", (long long)v->s64);
- return buf;
- case DField::Unsigned:
- snprintf(buf, sizeof(buf), "%llu", (unsigned long long)v->u64);
- return buf;
- case DField::Float:
- snprintf(buf, sizeof(buf), "%f", v->flt);
- return buf;
- case DField::String:
- case DField::Binary:
- esc.clear();
- if (esc.expand(v->str.len * 2 + 1) < 0) {
- error_no = -1;
- log4cplus_error("realloc (size: %u) error: %m",
- v->str.len * 2 + 1);
- return "NULL";
- }
- db_conn.escape_string(esc.c_str(), v->str.ptr,
- v->str.len); // 先对字符串进行escape
- ret = '\'';
- ret += esc.c_str();
- ret += "\'";
- return ret;
- default:
- error_no = -1;
- log4cplus_error("unknown field type: %d", fieldType);
- return "UNKNOWN";
- }
- }
- inline int ConnectorProcess::format_sql_value(const DTCValue *Value,
- int iFieldType)
- {
- log4cplus_debug("format_sql_value iFieldType[%d]", iFieldType);
- if (Value == NULL) {
- sql_append_const("NULL");
- } else
- switch (iFieldType) {
- case DField::Signed:
- sql_printf("%lld", (long long)Value->s64);
- break;
- case DField::Unsigned:
- sql_printf("%llu", (unsigned long long)Value->u64);
- break;
- case DField::Float:
- sql_printf("'%f'", Value->flt);
- break;
- case DField::String:
- case DField::Binary:
- if (sql.append('\'') < 0)
- error_no = -1;
- if (!Value->str.is_empty()) {
- esc.clear();
- if (esc.expand(Value->str.len * 2 + 1) < 0) {
- error_no = -1;
- log4cplus_error(
- "realloc (size: %u) error: %m",
- Value->str.len * 2 + 1);
- //return(-1);
- return (0);
- }
- db_conn.escape_string(
- esc.c_str(), Value->str.ptr,
- Value->str.len); // 先对字符串进行escape
- if (sql.append(esc.c_str()) < 0)
- error_no = -1;
- }
- if (sql.append('\'') < 0)
- error_no = -1;
- break;
- default:;
- };
- return 0;
- }
- int ConnectorProcess::condition_concate(const DTCFieldValue *Condition)
- {
- int i;
- if (Condition == NULL)
- return (0);
- for (i = 0; i < Condition->num_fields(); i++) {
- if (table_def->is_volatile(i))
- return -1;
- sql_append_const(" AND ");
- sql_append_field(Condition->field_id(i));
- sql_append_comparator(Condition->field_operation(i));
- format_sql_value(Condition->field_value(i),
- Condition->field_type(i));
- }
- return 0;
- }
- inline int ConnectorProcess::set_default_value(int field_type, DTCValue &Value)
- {
- switch (field_type) {
- case DField::Signed:
- Value.s64 = 0;
- break;
- case DField::Unsigned:
- Value.u64 = 0;
- break;
- case DField::Float:
- Value.flt = 0.0;
- break;
- case DField::String:
- Value.str.len = 0;
- Value.str.ptr = 0;
- break;
- case DField::Binary:
- Value.bin.len = 0;
- Value.bin.ptr = 0;
- break;
- default:
- Value.s64 = 0;
- };
- return (0);
- }
- inline int ConnectorProcess::str_to_value(char *Str, int fieldid,
- int field_type, DTCValue &Value)
- {
- if (Str == NULL) {
- log4cplus_debug(
- "Str is NULL, field_type: %d. Check mysql table definition.",
- field_type);
- set_default_value(field_type, Value);
- return (0);
- }
- switch (field_type) {
- case DField::Signed:
- errno = 0;
- Value.s64 = strtoll(Str, NULL, 10);
- if (errno != 0)
- return (-1);
- break;
- case DField::Unsigned:
- errno = 0;
- Value.u64 = strtoull(Str, NULL, 10);
- if (errno != 0)
- return (-1);
- break;
- case DField::Float:
- errno = 0;
- Value.flt = strtod(Str, NULL);
- if (errno != 0)
- return (-1);
- break;
- case DField::String:
- Value.str.len = _lengths[fieldid];
- Value.str.ptr =
- Str; // 不重新new,要等这个value使用完后释放内存(如果Str是动态分配的)
- break;
- case DField::Binary:
- Value.bin.len = _lengths[fieldid];
- Value.bin.ptr = Str;
- break;
- default:
- log4cplus_error("field[%d] type[%d] invalid.", fieldid,
- field_type);
- break;
- }
- return (0);
- }
- int ConnectorProcess::save_row(RowValue *Row, DtcJob *Task)
- {
- int i, Ret;
- if (table_def->num_fields() < 0)
- return (-1);
- for (i = 1; i <= table_def->num_fields(); i++) {
- //db_conn.Row[0]是key的值,table_def->Field[0]也是key,
- //因此从1开始。结果Row也是从1开始的(不包括key)
- Ret = str_to_value(db_conn.Row[i], i, table_def->field_type(i),
- (*Row)[i]);
- if (Ret != 0) {
- log4cplus_error(
- "string[%s] conver to value[%d] error: %d, %m",
- db_conn.Row[i], table_def->field_type(i), Ret);
- return (-2);
- }
- }
- Task->update_key(Row);
- Ret = Task->append_row(Row);
- if (Ret < 0) {
- return (-3);
- }
- return (0);
- }
- int ConnectorProcess::process_statement_query(
- const DTCValue* key,
- std::string& s_sql)
- {
- // hash 计算key落在哪库哪表
- init_table_name(key, table_def->field_type(0));
- log4cplus_debug("db: %s, sql: %s", DBName, s_sql.c_str());
- // 分表时,需更更换表名
- if (dbConfig->depoly&2) {
- const char* p_table_name = table_def->table_name();
- if (NULL == p_table_name) {
- return -1;
- }
-
- int i_pos = s_sql.find(p_table_name);
- if (i_pos != std::string::npos) {
- int i_table_name_len = strlen(p_table_name);
- s_sql.replace(i_pos , i_table_name_len , table_name);
- }
- }
-
- // 重新选库,并查询
- int i_ret = db_conn.do_query(DBName, s_sql.c_str());
- if (i_ret != 0) {
- int i_err = db_conn.get_err_no();
- if (i_err != -ER_DUP_ENTRY) {
- log4cplus_warning("db query error: %s",
- db_conn.get_err_msg());
- } else {
- log4cplus_info("db query error: %s",
- db_conn.get_err_msg());
- return -ER_DUP_ENTRY;
- }
- }
- return i_ret;
- }
- int ConnectorProcess::process_select(DtcJob *Task)
- {
- log4cplus_info("line:%d" ,__LINE__);
- int Ret, i;
- RowValue *Row = NULL;
- int nRows;
- int haslimit =
- !Task->count_only() && (Task->requestInfo.limit_start() ||
- Task->requestInfo.limit_count());
- log4cplus_info("line:%d" ,__LINE__);
- set_title("SELECT...");
- init_sql_buffer();
- log4cplus_info("line:%d" ,__LINE__);
- if (Task == NULL)
- {
- log4cplus_info("line:%d" ,__LINE__);
- return 0;
- }
- if (table_def == NULL)
- {
- log4cplus_info("line:%d" ,__LINE__);
- return 0;
- }
- log4cplus_info("line:%d" ,__LINE__);
- init_table_name(Task->request_key(), table_def->field_type(0));
- log4cplus_info("line:%d" ,__LINE__);
- if (haslimit)
- sql_append_const("SELECT SQL_CALC_FOUND_ROWS ");
- else
- sql_append_const("SELECT ");
- select_field_concate(Task->request_fields()); // 总是SELECT所有字段
- sql_append_const(" FROM ");
- sql_append_table();
- log4cplus_info("line:%d" ,__LINE__);
- // condition
- sql_append_const(" WHERE ");
- sql_append_field(0);
- sql_append_const("=");
- format_sql_value(Task->request_key(), table_def->field_type(0));
- log4cplus_info("line:%d" ,__LINE__);
- if (condition_concate(Task->request_condition()) != 0) {
- Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
- "Volatile condition not allowed");
- return (-7);
- }
- log4cplus_info("line:%d" ,__LINE__);
- if (dbConfig->ordSql) {
- sql_append_const(" ");
- sql_append_string(dbConfig->ordSql);
- }
- if (Task->requestInfo.limit_count() > 0) {
- sql_printf(" LIMIT %u, %u", Task->requestInfo.limit_start(),
- Task->requestInfo.limit_count());
- }
- log4cplus_info("line:%d" ,__LINE__);
- if (error_no !=
- 0) { // 主要检查PrintfAppend是否发生过错误,这里统一检查一次
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
- log4cplus_error("error occur: %d", error_no);
- return (-1);
- }
- //bug fixed with count *
- Ret = Task->prepare_result_no_limit();
- if (Ret != 0) {
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
- "task prepare-result error");
- log4cplus_error("task prepare-result error: %d, %m", Ret);
- return (-2);
- }
- if (!Task->count_only()) {
- Row = new RowValue(table_def);
- if (Row == NULL) {
- Task->set_error(-ENOMEM, __FUNCTION__, "new row error");
- log4cplus_error("%s new RowValue error: %m", "");
- return (-3);
- }
- }
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- Ret = db_conn.do_query(DBName, sql.c_str());
- log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
- if (Ret != 0) {
- delete Row;
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db query error: %s, pid: %d, group-id: %d",
- db_conn.get_err_msg(), getpid(),
- self_group_id);
- return (-4);
- }
- Ret = db_conn.use_result();
- if (Ret != 0) {
- delete Row;
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db user result error: %s",
- db_conn.get_err_msg());
- return (-5);
- }
- nRows = db_conn.res_num;
- for (i = 0; i < db_conn.res_num; i++) {
- Ret = db_conn.fetch_row();
- if (Ret != 0) {
- delete Row;
- db_conn.free_result();
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db fetch row error: %s",
- db_conn.get_err_msg());
- return (-6);
- }
- //get field value length for the row
- _lengths = 0;
- _lengths = db_conn.get_lengths();
- if (0 == _lengths) {
- delete Row;
- db_conn.free_result();
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db fetch row length error: %s",
- db_conn.get_err_msg());
- return (-6);
- }
- // 将结果转换,并保存到task的result里
- if (Task->count_only()) {
- nRows = atoi(db_conn.Row[0]);
- //bug fixed return count *
- Task->set_total_rows(nRows);
- break;
- } else if ((Ret = save_row(Row, Task)) != 0) {
- delete Row;
- db_conn.free_result();
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
- "task append row error");
- log4cplus_error("task append row error: %d", Ret);
- return (-7);
- }
- }
- log4cplus_debug(
- "pid: %d, group-id: %d, result: %d row, db: %s, sql: %s",
- getpid(), self_group_id, nRows, DBName, sql.c_str());
- delete Row;
- db_conn.free_result();
- //bug fixed确认客户端带Limit限制
- if (haslimit) { // 获取总行数
- init_sql_buffer();
- sql_append_const("SELECT FOUND_ROWS() ");
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- Ret = db_conn.do_query(DBName, sql.c_str());
- log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
- if (Ret != 0) {
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning(
- "db query error: %s, pid: %d, group-id: %d",
- db_conn.get_err_msg(), getpid(), self_group_id);
- return (-4);
- }
- Ret = db_conn.use_result();
- if (Ret != 0) {
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db user result error: %s",
- db_conn.get_err_msg());
- return (-5);
- }
- Ret = db_conn.fetch_row();
- if (Ret != 0) {
- db_conn.free_result();
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db fetch row error: %s",
- db_conn.get_err_msg());
- return (-6);
- }
- unsigned long totalRows = strtoul(db_conn.Row[0], NULL, 0);
- if (totalRows == 0) {
- if (nRows != 0)
- totalRows =
- Task->requestInfo.limit_start() + nRows;
- else
- totalRows = 0;
- }
- Ret = Task->set_total_rows(totalRows, 1);
- log4cplus_debug("db: total-rows: %lu, ret: %d", totalRows, Ret);
- db_conn.free_result();
- }
- return (0);
- }
- int ConnectorProcess::update_field_concate(const DTCFieldValue *UpdateInfo)
- {
- int i;
- if (UpdateInfo == NULL)
- return (0);
- for (i = 0; i < UpdateInfo->num_fields(); i++) {
- const int fid = UpdateInfo->field_id(i);
- if (table_def->is_volatile(fid))
- continue;
- switch (UpdateInfo->field_operation(i)) {
- case DField::Set:
- if (i > 0)
- sql_append_const(",");
- sql_append_field(fid);
- sql_append_const("=");
- format_sql_value(UpdateInfo->field_value(i),
- UpdateInfo->field_type(i));
- break;
- case DField::Add:
- if (i > 0)
- sql_append_const(",");
- sql_append_field(fid);
- sql_append_const("=");
- sql_append_field(fid);
- sql_append_const("+");
- format_sql_value(UpdateInfo->field_value(i),
- UpdateInfo->field_type(i));
- break;
- default:
- break;
- };
- }
- return 0;
- }
- int ConnectorProcess::default_value_concate(const DTCFieldValue *UpdateInfo)
- {
- int i;
- uint8_t mask[32];
- FIELD_ZERO(mask);
- if (UpdateInfo)
- UpdateInfo->build_field_mask(mask);
- for (i = 1; i <= table_def->num_fields(); i++) {
- if (FIELD_ISSET(i, mask) || table_def->is_volatile(i))
- continue;
- sql_append_const(",");
- sql_append_field(i);
- sql_append_const("=");
- format_sql_value(table_def->default_value(i),
- table_def->field_type(i));
- }
- return 0;
- }
- int ConnectorProcess::process_insert(DtcJob *Task)
- {
- int Ret;
- set_title("INSERT...");
- init_sql_buffer();
- init_table_name(Task->request_key(), table_def->field_type(0));
- sql_append_const("INSERT INTO ");
- sql_append_table();
- sql_append_const(" SET ");
- std::map<std::string, std::string> fieldValues;
- if (Task->request_key()) {
- fieldValues[table_def->field_name(0)] = value_to_str(
- Task->request_key(), table_def->field_type(0));
- }
- if (Task->request_operation()) {
- const DTCFieldValue *updateInfo = Task->request_operation();
- for (int i = 0; i < updateInfo->num_fields(); ++i) {
- int fid = updateInfo->field_id(i);
- if (table_def->is_volatile(fid))
- continue;
- fieldValues[table_def->field_name(fid)] =
- value_to_str(updateInfo->field_value(i),
- updateInfo->field_type(i));
- }
- }
- for (int i = 1; i <= table_def->num_fields(); ++i) {
- if (table_def->is_volatile(i))
- continue;
- if (fieldValues.find(table_def->field_name(i)) !=
- fieldValues.end())
- continue;
- fieldValues[table_def->field_name(i)] = value_to_str(
- table_def->default_value(i), table_def->field_type(i));
- }
- for (std::map<std::string, std::string>::iterator iter =
- fieldValues.begin();
- iter != fieldValues.end(); ++iter) {
- sql_append_string(&left_quote, 1);
- sql_append_string(iter->first.c_str(), iter->first.length());
- sql_append_string(&right_quote, 1);
- sql_append_const("=");
- sql_append_string(iter->second.c_str(), iter->second.length());
- sql_append_const(",");
- }
- if (sql.at(-1) == ',')
- sql.trunc(-1);
- if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
- log4cplus_error("error occur: %d", error_no);
- return (-1);
- }
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- Ret = db_conn.do_query(DBName, sql.c_str());
- log4cplus_debug("INSERT %d %d", Ret, db_conn.get_raw_err_no());
- if (Ret != 0) {
- int err = db_conn.get_err_no();
- Task->set_error_dup(err, __FUNCTION__, db_conn.get_err_msg());
- if (err != -ER_DUP_ENTRY)
- log4cplus_warning("db query error: %s",
- db_conn.get_err_msg());
- else
- log4cplus_info("db query error: %s",
- db_conn.get_err_msg());
- return (-1);
- }
- Task->resultInfo.set_affected_rows(db_conn.affected_rows());
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- if (table_def->has_auto_increment()) {
- uint64_t id = db_conn.insert_id();
- if (id) {
- Task->resultInfo.set_insert_id(id);
- if (table_def->key_auto_increment())
- Task->resultInfo.set_key(id);
- }
- }
- return (0);
- }
- int ConnectorProcess::process_update(DtcJob *Task)
- {
- int Ret;
- if (Task->request_operation() == NULL) {
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
- "update field not found");
- return (-1);
- }
- if (Task->request_operation()->has_type_commit() == 0) {
- // pure volatile fields update, always succeed
- return (0);
- }
- set_title("UPDATE...");
- init_sql_buffer();
- init_table_name(Task->request_key(), table_def->field_type(0));
- sql_append_const("UPDATE ");
- sql_append_table();
- sql_append_const(" SET ");
- update_field_concate(Task->request_operation());
- // key
- sql_append_const(" WHERE ");
- sql_append_field(0);
- sql_append_const("=");
- format_sql_value(Task->request_key(), table_def->field_type(0));
- // condition
- if (condition_concate(Task->request_condition()) != 0) {
- Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
- "Volatile condition not allowed");
- return (-7);
- }
- if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
- log4cplus_error("error occur: %d", error_no);
- return (-1);
- }
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- Ret = db_conn.do_query(DBName, sql.c_str());
- log4cplus_debug("UPDATE %d %d", Ret, db_conn.get_raw_err_no());
- if (Ret != 0) {
- int err = db_conn.get_err_no();
- Task->set_error_dup(err, __FUNCTION__, db_conn.get_err_msg());
- if (err != -ER_DUP_ENTRY)
- log4cplus_warning("db query error: %s",
- db_conn.get_err_msg());
- else
- log4cplus_info("db query error: %s",
- db_conn.get_err_msg());
- return -1;
- }
- Task->resultInfo.set_affected_rows(db_conn.affected_rows());
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- return (0);
- }
- int ConnectorProcess::process_delete(DtcJob *Task)
- {
- int Ret;
- set_title("DELETE...");
- init_sql_buffer();
- init_table_name(Task->request_key(), table_def->field_type(0));
- sql_append_const("DELETE FROM ");
- sql_append_table();
- // key
- sql_append_const(" WHERE ");
- sql_append_field(0);
- sql_append_const("=");
- format_sql_value(Task->request_key(), table_def->field_type(0));
- // condition
- if (condition_concate(Task->request_condition()) != 0) {
- Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
- "Volatile condition not allowed");
- return (-7);
- }
- if (error_no !=
- 0) { // 主要检查PrintfAppend是否发生过错误,这里统一检查一次
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
- log4cplus_error("error occur: %d", error_no);
- return (-1);
- }
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- Ret = db_conn.do_query(DBName, sql.c_str());
- log4cplus_debug("DELETE %d %d", Ret, db_conn.get_raw_err_no());
- if (Ret != 0) {
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db query error: %s", db_conn.get_err_msg());
- return (-1);
- }
- Task->resultInfo.set_affected_rows(db_conn.affected_rows());
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- return (0);
- }
- int ConnectorProcess::do_process(DtcJob* Task)
- {
- log4cplus_info("line:%d" ,__LINE__);
- if (Task == NULL) {
- log4cplus_error("Task is NULL!%s", "");
- return (-1);
- }
- log4cplus_info("line:%d" ,__LINE__);
- table_def = TableDefinitionManager::instance()->get_cur_table_def();
- switch (Task->request_code()) {
- case DRequest::TYPE_PASS:
- case DRequest::Purge:
- case DRequest::Flush:
- return 0;
-
- case DRequest::Get:
- return process_select(Task);
- case DRequest::Insert:
- return process_insert(Task);
- case DRequest::Update:
- return process_update(Task);
- case DRequest::Delete:
- return process_delete(Task);
- case DRequest::Replace:
- return process_replace(Task);
- // case DRequest::ReloadConfig:
- // return process_reload_config(Task);
- default:
- Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
- "invalid request-code");
- return (-1);
- }
- }
- int ConnectorProcess::process_replace(DtcJob *Task)
- {
- int Ret;
- set_title("REPLACE...");
- init_sql_buffer();
- init_table_name(Task->request_key(), table_def->field_type(0));
- sql_append_const("REPLACE INTO ");
- sql_append_table();
- sql_append_const(" SET ");
- sql_append_field(0);
- sql_append_const("=");
- format_sql_value(Task->request_key(), table_def->field_type(0));
- sql_append_const(",");
- /* 补全缺失的默认值 */
- if (Task->request_operation())
- update_field_concate(Task->request_operation());
- else if (sql.at(-1) == ',') {
- sql.trunc(-1);
- }
- default_value_concate(Task->request_operation());
- if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
- log4cplus_error("error occur: %d", error_no);
- return (-1);
- }
- if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
- Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
- log4cplus_error("error occur: %d", error_no);
- return (-1);
- }
- log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
- Ret = db_conn.do_query(DBName, sql.c_str());
- log4cplus_debug("REPLACE %d %d", Ret, db_conn.get_raw_err_no());
- if (Ret != 0) {
- Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
- db_conn.get_err_msg());
- log4cplus_warning("db query error: %s", db_conn.get_err_msg());
- return (-1);
- }
- Task->resultInfo.set_affected_rows(db_conn.affected_rows());
- log4cplus_debug("%s",
- "ConnectorProcess::ProcessReplaceTask() successful.");
- return 0;
- }
- ConnectorProcess::~ConnectorProcess()
- {
- }
- void ConnectorProcess::init_title(int group, int role)
- {
- title_prefix_size = snprintf(name, sizeof(name), "connector%d%c", group,
- MACHINEROLESTRING[role]);
- memcpy(title, name, title_prefix_size);
- title[title_prefix_size++] = ':';
- title[title_prefix_size++] = ' ';
- title[title_prefix_size] = '\0';
- title[sizeof(title) - 1] = '\0';
- }
- void ConnectorProcess::set_title(const char *status)
- {
- strncpy(title + title_prefix_size, status,
- sizeof(title) - 1 - title_prefix_size);
- set_proc_title(title);
- }
- int ConnectorProcess::process_reload_config(DtcJob *Task)
- {
- int cacheKey = DbConfig::get_shm_id(g_dtc_config->get_config_node());;
- BlockProperties stInfo;
- BufferPond cachePool;
- memset(&stInfo, 0, sizeof(stInfo));
- stInfo.ipc_mem_key = cacheKey;
- stInfo.key_size = TableDefinitionManager::instance()
- ->get_cur_table_def()
- ->key_format();
- stInfo.read_only = 1;
- if (cachePool.cache_open(&stInfo)) {
- log4cplus_error("%s", cachePool.error());
- Task->set_error(-EC_RELOAD_CONFIG_FAILED, __FUNCTION__,
- "open cache error!");
- return -1;
- }
- cachePool.reload_table();
- log4cplus_error(
- "cmd notify work helper reload table, tableIdx : [%d], pid : [%d]",
- cachePool.shm_table_idx(), getpid());
- return 0;
- }
|