mysql_operation.cc 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <string.h>
  20. #include <stdarg.h>
  21. #include <limits.h>
  22. #include <errno.h>
  23. #include <unistd.h>
  24. #include <netinet/in.h>
  25. #include <arpa/inet.h>
  26. #include <map>
  27. #include <string>
  28. // local include files
  29. #include "mysql_operation.h"
  30. // common include files
  31. #include "protocol.h"
  32. #include "log/log.h"
  33. #include "proc_title.h"
  34. #include "table/table_def_manager.h"
  35. #include "daemon/daemon.h"
  36. // mysql include files
  37. #include "mysqld_error.h"
  38. // core include files
  39. #include "buffer/buffer_pond.h"
  40. #define MIN(x, y) ((x) <= (y) ? (x) : (y))
  41. ConnectorProcess::ConnectorProcess() : _lengths(0)
  42. {
  43. error_no = 0;
  44. left_quote = '`';
  45. right_quote = '`';
  46. title_prefix_size = 0;
  47. time(&last_access);
  48. ping_timeout = 9;
  49. proc_timeout = 0;
  50. strncpy(name, "helper", 6);
  51. }
  52. int ConnectorProcess::try_ping(void)
  53. {
  54. return db_conn.do_ping();
  55. }
  56. void ConnectorProcess::init_ping_timeout(void)
  57. {
  58. int64_t to = db_conn.get_variable("wait_timeout");
  59. log4cplus_debug("Server idle timeout %lld", (long long)to);
  60. if (to < 10)
  61. to = 10;
  62. else if (to > 600)
  63. to = 600;
  64. ping_timeout = to * 9 / 10;
  65. }
  66. int ConnectorProcess::config_db_by_struct(const DbConfig *cf)
  67. {
  68. if (cf == NULL)
  69. return -1;
  70. dbConfig = cf;
  71. return (0);
  72. }
  73. #define DIM(a) (sizeof(a) / sizeof(a[0]))
  74. static int get_field_type(const char *szType, int &i_type,
  75. unsigned int &ui_size)
  76. {
  77. unsigned int i;
  78. int iTmp;
  79. typedef struct {
  80. char m_szName[256];
  81. int m_iType;
  82. int m_uiSize;
  83. } CMysqlField;
  84. static CMysqlField astField[] = { { "tinyint", 1, 1 },
  85. { "smallint", 1, 2 },
  86. { "mediumint", 1, 4 },
  87. { "int", 1, 4 },
  88. { "bigint", 1, 8 },
  89. { "float", 3, 4 },
  90. { "double", 3, 8 },
  91. { "decimal", 3, 8 },
  92. { "datetime", 4, 20 },
  93. { "date", 4, 11 },
  94. { "timestamp", 4, 20 },
  95. { "time", 4, 11 },
  96. { "year", 4, 5 },
  97. { "varchar", 4, 255 },
  98. { "char", 4, 255 },
  99. { "varbinary", 5, 255 },
  100. { "binary", 5, 255 },
  101. { "tinyblob", 5, 255 },
  102. { "tinytext", 4, 255 },
  103. { "blob", 5, 65535 },
  104. { "text", 4, 65535 },
  105. { "mediumblob", 5, 16777215 },
  106. { "mediumtext", 4, 16777215 },
  107. { "longblob", 5, 4294967295U },
  108. { "longtext", 4, 4294967295U },
  109. { "enum", 4, 255 },
  110. { "set", 2, 8 } };
  111. for (i = 0; i < DIM(astField); i++) {
  112. if (strncasecmp(szType, astField[i].m_szName,
  113. strlen(astField[i].m_szName)) == 0) {
  114. i_type = astField[i].m_iType;
  115. ui_size = astField[i].m_uiSize;
  116. if (strncasecmp(szType, "varchar", 7) == 0) {
  117. if (sscanf(szType + 8, "%d", &iTmp) == 1)
  118. ui_size = iTmp;
  119. } else if (strncasecmp(szType, "char", 4) == 0) {
  120. if (sscanf(szType + 5, "%d", &iTmp) == 1)
  121. ui_size = iTmp;
  122. } else if (strncasecmp(szType, "varbinary", 9) == 0) {
  123. if (sscanf(szType + 10, "%d", &iTmp) == 1)
  124. ui_size = iTmp;
  125. } else if (strncasecmp(szType, "binary", 6) == 0) {
  126. if (sscanf(szType + 7, "%d", &iTmp) == 1)
  127. ui_size = iTmp;
  128. }
  129. if (i_type == 1 && strstr(szType, "unsigned") != NULL)
  130. i_type = 2;
  131. if (i_type == 3 && strstr(szType, "unsigned") != NULL)
  132. fprintf(stderr,
  133. "#warning: dtc not support unsigned double!\n");
  134. break;
  135. }
  136. }
  137. return (0);
  138. }
  139. int ConnectorProcess::check_table()
  140. {
  141. int Ret;
  142. int i;
  143. int i_field_num;
  144. char ach_field_name[256][256];
  145. snprintf(DBName, sizeof(DBName), dbConfig->dbFormat,
  146. dbConfig->mach[self_group_id].dbIdx[0]);
  147. snprintf(table_name, sizeof(table_name), dbConfig->tblFormat, 0);
  148. init_sql_buffer();
  149. sql_append_const("show columns from `");
  150. sql_append_string(table_name);
  151. sql_append_const("`");
  152. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  153. Ret = db_conn.do_query(DBName, sql.c_str());
  154. log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
  155. if (Ret != 0) {
  156. log4cplus_warning("db query error: %s, pid: %d, group-id: %d",
  157. db_conn.get_err_msg(), getpid(),
  158. self_group_id);
  159. return (-1);
  160. }
  161. Ret = db_conn.use_result();
  162. if (Ret != 0) {
  163. log4cplus_warning("db user result error: %s",
  164. db_conn.get_err_msg());
  165. return (-2);
  166. }
  167. // 获取返回结果的各列位置
  168. int i_name_idx = 0, i_type_idx = 0;
  169. int i_null_idx = 0, i_key_idx = 0;
  170. int i_default_idx = 0, i_extra_idx = 0;
  171. unsigned int ui_num_fields = mysql_num_fields(db_conn.Res);
  172. MYSQL_FIELD *pst_fields = mysql_fetch_fields(db_conn.Res);
  173. for (i = 0; i < (int)ui_num_fields; i++) {
  174. if (strcasecmp("Field", pst_fields[i].name) == 0)
  175. i_name_idx = i;
  176. else if (strcasecmp("Type", pst_fields[i].name) == 0)
  177. i_type_idx = i;
  178. else if (strcasecmp("Null", pst_fields[i].name) == 0)
  179. i_null_idx = i;
  180. else if (strcasecmp("Key", pst_fields[i].name) == 0)
  181. i_key_idx = i;
  182. else if (strcasecmp("Default", pst_fields[i].name) == 0)
  183. i_default_idx = i;
  184. else if (strcasecmp("Extra", pst_fields[i].name) == 0)
  185. i_extra_idx = i;
  186. }
  187. int iFid;
  188. i_field_num = 0;
  189. memset(ach_field_name, 0, sizeof(ach_field_name));
  190. int uniq_fields_cnt_table = table_def->uniq_fields();
  191. for (i = 0; i < db_conn.res_num; i++) {
  192. Ret = db_conn.fetch_row();
  193. if (Ret != 0) {
  194. db_conn.free_result();
  195. log4cplus_warning("db fetch row error: %s",
  196. db_conn.get_err_msg());
  197. return (-3);
  198. }
  199. strncpy(ach_field_name[i_field_num], db_conn.Row[i_name_idx],
  200. 255);
  201. i_field_num++;
  202. iFid = table_def->field_id(db_conn.Row[i_name_idx]);
  203. if (iFid == -1) {
  204. log4cplus_debug("field[%s] not found in table.yaml",
  205. db_conn.Row[i_name_idx]);
  206. continue;
  207. }
  208. if (table_def->is_volatile(iFid)) {
  209. log4cplus_error(
  210. "field[name: `%s`] found in table.yaml and DB both, can't be Volatile",
  211. db_conn.Row[i_name_idx]);
  212. db_conn.free_result();
  213. return (-4);
  214. }
  215. if (table_def->is_timestamp(iFid)) {
  216. log4cplus_error(
  217. "in table.yaml, Field[name: `%s`]'s is timestamp, not support in DB mode",
  218. db_conn.Row[i_name_idx]);
  219. db_conn.free_result();
  220. return (-4);
  221. }
  222. //field type & size
  223. int i_type = -1;
  224. unsigned ui_size = 0;
  225. get_field_type(db_conn.Row[i_type_idx], i_type, ui_size);
  226. if (i_type != table_def->field_type(iFid)) {
  227. log4cplus_error(
  228. "in table.yaml, Field[name: `%s`]'s type incorrect. conf: %d, mysql:%d",
  229. db_conn.Row[i_name_idx],
  230. table_def->field_type(iFid), i_type);
  231. db_conn.free_result();
  232. return (-4);
  233. }
  234. if ((int)ui_size != table_def->field_size(iFid) &&
  235. !(ui_size >= (64 << 20) &&
  236. table_def->field_size(iFid) >= (64 << 20))) {
  237. log4cplus_error(
  238. "in table.yaml, Field[name: `%s`]'s size incorrect. conf: %d, mysql:%u",
  239. db_conn.Row[i_name_idx],
  240. table_def->field_size(iFid), ui_size);
  241. db_conn.free_result();
  242. return (-4);
  243. }
  244. if (db_conn.Row[i_extra_idx] != NULL &&
  245. strcasecmp("auto_increment", db_conn.Row[i_extra_idx]) ==
  246. 0) {
  247. if (table_def->auto_increment_field_id() != iFid) {
  248. log4cplus_error(
  249. "in table.yaml, Field[name: `%s`]'s default-value incorrect. conf: non-auto_increment, mysql:auto_increment",
  250. db_conn.Row[i_name_idx]);
  251. db_conn.free_result();
  252. return (-4);
  253. }
  254. }
  255. /*field should be uniq in table.yaml if configed primary in db */
  256. uint8_t *uniq_fields = table_def->uniq_fields_list();
  257. if (db_conn.Row[i_key_idx] != NULL &&
  258. (strcasecmp("PRI", db_conn.Row[i_key_idx]) == 0 ||
  259. strcasecmp("UNI", db_conn.Row[i_key_idx]) == 0)) {
  260. int j = 0;
  261. for (j = 0; j < table_def->uniq_fields(); j++) {
  262. if (uniq_fields[j] == iFid)
  263. break;
  264. }
  265. if (j >= table_def->uniq_fields()) {
  266. log4cplus_error(
  267. "in table.yaml, Field[name: `%s`] is primary in db, but not uniq in dtc",
  268. db_conn.Row[i_name_idx]);
  269. return -4;
  270. }
  271. uniq_fields_cnt_table--;
  272. }
  273. }
  274. /*field should be primary in db if configed uniq in table.yaml*/
  275. if (uniq_fields_cnt_table != 0) {
  276. log4cplus_error(
  277. "table.yaml have more uniq fields that not configed as primary in db");
  278. return -4;
  279. }
  280. for (int i = 0; i <= table_def->num_fields(); i++) {
  281. //bug fix volatile不在db中
  282. if (table_def->is_volatile(i))
  283. continue;
  284. const char *name = table_def->field_name(i);
  285. int j;
  286. for (j = 0; j < i_field_num; j++) {
  287. if (strcmp(ach_field_name[j], name) == 0)
  288. break;
  289. }
  290. if (j >= i_field_num) {
  291. log4cplus_error(
  292. "in table.yaml, Field[name: `%s`] not found in mysql",
  293. name);
  294. db_conn.free_result();
  295. return (-4);
  296. }
  297. }
  298. log4cplus_debug(
  299. "pid: %d, group-id: %d check table success, db: %s, sql: %s",
  300. getpid(), self_group_id, DBName, sql.c_str());
  301. db_conn.free_result();
  302. return (0);
  303. }
  304. int ConnectorProcess::machine_init(int GroupID, int r)
  305. {
  306. const char *p;
  307. // 初始化db配置信息
  308. if (dbConfig->machineCnt <= GroupID) {
  309. log4cplus_error(
  310. "parse config error, machineCnt[%d] <= GroupID[%d]",
  311. dbConfig->machineCnt, GroupID);
  312. return (-3);
  313. }
  314. typeof(&dbConfig->mach[0].role[0]) role =
  315. &dbConfig->mach[GroupID].role[r];
  316. memset(&db_host_conf, 0, sizeof(DBHost));
  317. p = strrchr(role->addr, ':');
  318. if (p == NULL) {
  319. strncpy(db_host_conf.Host, role->addr,
  320. sizeof(db_host_conf.Host) - 1);
  321. db_host_conf.Port = 0;
  322. } else {
  323. strncpy(db_host_conf.Host, role->addr,
  324. MIN(p - role->addr,
  325. (int)sizeof(db_host_conf.Host) - 1));
  326. db_host_conf.Port = atoi(p + 1);
  327. }
  328. strncpy(db_host_conf.User, role->user, sizeof(db_host_conf.User) - 1);
  329. strncpy(db_host_conf.Password, role->pass,
  330. sizeof(db_host_conf.Password) - 1);
  331. db_host_conf.ConnTimeout = proc_timeout;
  332. strncpy(db_host_conf.OptionFile, role->optfile,
  333. sizeof(db_host_conf.OptionFile) - 1);
  334. db_conn.do_config(&db_host_conf);
  335. if (db_conn.Open() != 0) {
  336. log4cplus_warning("connect db[%s] error: %s", db_host_conf.Host,
  337. db_conn.get_err_msg());
  338. return (-6);
  339. }
  340. log4cplus_debug("group-id: %d, pid: %d, db: %s, user: %s, pwd: %s",
  341. self_group_id, getpid(), db_host_conf.Host,
  342. db_host_conf.User, db_host_conf.Password);
  343. return (0);
  344. }
  345. int ConnectorProcess::do_init(int GroupID, const DbConfig *do_config,
  346. DTCTableDefinition *tdef, int slave)
  347. {
  348. int Ret;
  349. self_group_id = GroupID;
  350. table_def = tdef;
  351. Ret = config_db_by_struct(do_config);
  352. if (Ret != 0) {
  353. return (-1);
  354. }
  355. Ret = machine_init(GroupID, slave);
  356. if (Ret != 0) {
  357. return (-2);
  358. }
  359. return (0);
  360. }
  361. void ConnectorProcess::init_sql_buffer(void)
  362. {
  363. sql.clear();
  364. error_no = 0;
  365. }
  366. void ConnectorProcess::sql_append_string(const char *str, int len)
  367. {
  368. if (len == 0)
  369. len = strlen(str);
  370. if (sql.append(str, len) < 0) {
  371. error_no = -1;
  372. log4cplus_error("sql.append() error: %d, %m", sql.needed());
  373. }
  374. }
  375. /* 将字符串printf在原来字符串的后面,如果buffer不够大会自动重新分配buffer */
  376. void ConnectorProcess::sql_printf(const char *Format, ...)
  377. {
  378. va_list Arg;
  379. int Len;
  380. va_start(Arg, Format);
  381. Len = sql.vbprintf(Format, Arg);
  382. va_end(Arg);
  383. if (Len < 0) {
  384. error_no = -1;
  385. log4cplus_error("vsnprintf error: %d, %m", Len);
  386. }
  387. }
  388. void ConnectorProcess::sql_append_table(void)
  389. {
  390. sql_append_string(&left_quote, 1);
  391. sql_append_string(table_name);
  392. sql_append_string(&right_quote, 1);
  393. }
  394. void ConnectorProcess::sql_append_field(int fid)
  395. {
  396. sql_append_string(&left_quote, 1);
  397. sql_append_string(table_def->field_name(fid));
  398. sql_append_string(&right_quote, 1);
  399. }
  400. void ConnectorProcess::sql_append_comparator(uint8_t op)
  401. {
  402. // order is important
  403. static const char *const CompStr[] = { "=", "!=", "<", "<=", ">", ">=" };
  404. if (op >= DField::TotalComparison) {
  405. error_no = -1;
  406. log4cplus_error("unknow op: %d", op);
  407. } else {
  408. sql_append_string(CompStr[op]);
  409. }
  410. }
  411. void ConnectorProcess::init_table_name(const DTCValue *Key, int field_type)
  412. {
  413. log4cplus_info("line:%d" ,__LINE__);
  414. int dbid = 0, tableid = 0;
  415. uint64_t n;
  416. double f;
  417. if (Key != NULL && dbConfig->depoly != 0) {
  418. switch (field_type) {
  419. case DField::Signed:
  420. if (dbConfig->keyHashConfig.keyHashEnable) {
  421. n = dbConfig->keyHashConfig.keyHashFunction(
  422. (const char *)&(Key->s64),
  423. sizeof(Key->s64),
  424. dbConfig->keyHashConfig.keyHashLeftBegin,
  425. dbConfig->keyHashConfig
  426. .keyHashRightBegin);
  427. } else {
  428. if (Key->s64 >= 0)
  429. n = Key->s64;
  430. else if (Key->s64 == LONG_LONG_MIN)
  431. n = 0;
  432. else
  433. n = 0 - Key->s64;
  434. }
  435. log4cplus_info("div:%d , mod:%d" , dbConfig->dbDiv , dbConfig->dbMod);
  436. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  437. tableid = (n / dbConfig->tblDiv) % dbConfig->tblMod;
  438. break;
  439. case DField::Unsigned:
  440. if (dbConfig->keyHashConfig.keyHashEnable) {
  441. n = dbConfig->keyHashConfig.keyHashFunction(
  442. (const char *)&(Key->u64),
  443. sizeof(Key->u64),
  444. dbConfig->keyHashConfig.keyHashLeftBegin,
  445. dbConfig->keyHashConfig
  446. .keyHashRightBegin);
  447. } else {
  448. n = Key->u64;
  449. }
  450. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  451. tableid = (n / dbConfig->tblDiv) % dbConfig->tblMod;
  452. break;
  453. case DField::Float:
  454. if (dbConfig->keyHashConfig.keyHashEnable) {
  455. n = dbConfig->keyHashConfig.keyHashFunction(
  456. (const char *)&(Key->flt),
  457. sizeof(Key->flt),
  458. dbConfig->keyHashConfig.keyHashLeftBegin,
  459. dbConfig->keyHashConfig
  460. .keyHashRightBegin);
  461. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  462. tableid = (n / dbConfig->tblDiv) %
  463. dbConfig->tblMod;
  464. } else {
  465. if (Key->flt >= 0)
  466. f = Key->flt;
  467. else
  468. f = 0 - Key->flt;
  469. dbid = ((int)(f / dbConfig->dbDiv)) %
  470. dbConfig->dbMod;
  471. tableid = ((int)(f / dbConfig->tblDiv)) %
  472. dbConfig->tblMod;
  473. }
  474. break;
  475. case DField::String:
  476. case DField::Binary:
  477. if (dbConfig->keyHashConfig.keyHashEnable) {
  478. n = dbConfig->keyHashConfig.keyHashFunction(
  479. Key->bin.ptr, Key->bin.len,
  480. dbConfig->keyHashConfig.keyHashLeftBegin,
  481. dbConfig->keyHashConfig
  482. .keyHashRightBegin);
  483. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  484. tableid = (n / dbConfig->tblDiv) %
  485. dbConfig->tblMod;
  486. }
  487. break;
  488. }
  489. }
  490. log4cplus_info("line:%d" ,__LINE__);
  491. snprintf(DBName, sizeof(DBName), dbConfig->dbFormat, dbid);
  492. snprintf(table_name, sizeof(table_name), dbConfig->tblFormat, tableid);
  493. log4cplus_info("DBName:%s , table_name:%s" ,DBName , table_name);
  494. }
  495. int ConnectorProcess::select_field_concate(const DTCFieldSet *fs)
  496. {
  497. if (fs == NULL) {
  498. sql_append_const("COUNT(*)");
  499. } else {
  500. int i = 0;
  501. uint8_t mask[32];
  502. FIELD_ZERO(mask);
  503. fs->build_field_mask(mask);
  504. sql_append_field(0); // key
  505. for (i = 1; i < table_def->num_fields() + 1; i++) {
  506. sql_append_const(",");
  507. if (FIELD_ISSET(i, mask) == 0) {
  508. /* Missing field as 0 */
  509. sql_append_const("0");
  510. } else if (table_def->is_volatile(i) == 0) {
  511. sql_append_field(i);
  512. } else {
  513. // volatile field initialized as default value
  514. format_sql_value(table_def->default_value(i),
  515. table_def->field_type(i));
  516. }
  517. }
  518. }
  519. return 0;
  520. }
  521. std::string ConnectorProcess::value_to_str(const DTCValue *v, int fieldType)
  522. {
  523. if (v == NULL)
  524. return "NULL";
  525. char buf[32];
  526. std::string ret;
  527. switch (fieldType) {
  528. case DField::Signed:
  529. snprintf(buf, sizeof(buf), "%lld", (long long)v->s64);
  530. return buf;
  531. case DField::Unsigned:
  532. snprintf(buf, sizeof(buf), "%llu", (unsigned long long)v->u64);
  533. return buf;
  534. case DField::Float:
  535. snprintf(buf, sizeof(buf), "%f", v->flt);
  536. return buf;
  537. case DField::String:
  538. case DField::Binary:
  539. esc.clear();
  540. if (esc.expand(v->str.len * 2 + 1) < 0) {
  541. error_no = -1;
  542. log4cplus_error("realloc (size: %u) error: %m",
  543. v->str.len * 2 + 1);
  544. return "NULL";
  545. }
  546. db_conn.escape_string(esc.c_str(), v->str.ptr,
  547. v->str.len); // 先对字符串进行escape
  548. ret = '\'';
  549. ret += esc.c_str();
  550. ret += "\'";
  551. return ret;
  552. default:
  553. error_no = -1;
  554. log4cplus_error("unknown field type: %d", fieldType);
  555. return "UNKNOWN";
  556. }
  557. }
  558. inline int ConnectorProcess::format_sql_value(const DTCValue *Value,
  559. int iFieldType)
  560. {
  561. log4cplus_debug("format_sql_value iFieldType[%d]", iFieldType);
  562. if (Value == NULL) {
  563. sql_append_const("NULL");
  564. } else
  565. switch (iFieldType) {
  566. case DField::Signed:
  567. sql_printf("%lld", (long long)Value->s64);
  568. break;
  569. case DField::Unsigned:
  570. sql_printf("%llu", (unsigned long long)Value->u64);
  571. break;
  572. case DField::Float:
  573. sql_printf("'%f'", Value->flt);
  574. break;
  575. case DField::String:
  576. case DField::Binary:
  577. if (sql.append('\'') < 0)
  578. error_no = -1;
  579. if (!Value->str.is_empty()) {
  580. esc.clear();
  581. if (esc.expand(Value->str.len * 2 + 1) < 0) {
  582. error_no = -1;
  583. log4cplus_error(
  584. "realloc (size: %u) error: %m",
  585. Value->str.len * 2 + 1);
  586. //return(-1);
  587. return (0);
  588. }
  589. db_conn.escape_string(
  590. esc.c_str(), Value->str.ptr,
  591. Value->str.len); // 先对字符串进行escape
  592. if (sql.append(esc.c_str()) < 0)
  593. error_no = -1;
  594. }
  595. if (sql.append('\'') < 0)
  596. error_no = -1;
  597. break;
  598. default:;
  599. };
  600. return 0;
  601. }
  602. int ConnectorProcess::condition_concate(const DTCFieldValue *Condition)
  603. {
  604. int i;
  605. if (Condition == NULL)
  606. return (0);
  607. for (i = 0; i < Condition->num_fields(); i++) {
  608. if (table_def->is_volatile(i))
  609. return -1;
  610. sql_append_const(" AND ");
  611. sql_append_field(Condition->field_id(i));
  612. sql_append_comparator(Condition->field_operation(i));
  613. format_sql_value(Condition->field_value(i),
  614. Condition->field_type(i));
  615. }
  616. return 0;
  617. }
  618. inline int ConnectorProcess::set_default_value(int field_type, DTCValue &Value)
  619. {
  620. switch (field_type) {
  621. case DField::Signed:
  622. Value.s64 = 0;
  623. break;
  624. case DField::Unsigned:
  625. Value.u64 = 0;
  626. break;
  627. case DField::Float:
  628. Value.flt = 0.0;
  629. break;
  630. case DField::String:
  631. Value.str.len = 0;
  632. Value.str.ptr = 0;
  633. break;
  634. case DField::Binary:
  635. Value.bin.len = 0;
  636. Value.bin.ptr = 0;
  637. break;
  638. default:
  639. Value.s64 = 0;
  640. };
  641. return (0);
  642. }
  643. inline int ConnectorProcess::str_to_value(char *Str, int fieldid,
  644. int field_type, DTCValue &Value)
  645. {
  646. if (Str == NULL) {
  647. log4cplus_debug(
  648. "Str is NULL, field_type: %d. Check mysql table definition.",
  649. field_type);
  650. set_default_value(field_type, Value);
  651. return (0);
  652. }
  653. switch (field_type) {
  654. case DField::Signed:
  655. errno = 0;
  656. Value.s64 = strtoll(Str, NULL, 10);
  657. if (errno != 0)
  658. return (-1);
  659. break;
  660. case DField::Unsigned:
  661. errno = 0;
  662. Value.u64 = strtoull(Str, NULL, 10);
  663. if (errno != 0)
  664. return (-1);
  665. break;
  666. case DField::Float:
  667. errno = 0;
  668. Value.flt = strtod(Str, NULL);
  669. if (errno != 0)
  670. return (-1);
  671. break;
  672. case DField::String:
  673. Value.str.len = _lengths[fieldid];
  674. Value.str.ptr =
  675. Str; // 不重新new,要等这个value使用完后释放内存(如果Str是动态分配的)
  676. break;
  677. case DField::Binary:
  678. Value.bin.len = _lengths[fieldid];
  679. Value.bin.ptr = Str;
  680. break;
  681. default:
  682. log4cplus_error("field[%d] type[%d] invalid.", fieldid,
  683. field_type);
  684. break;
  685. }
  686. return (0);
  687. }
  688. int ConnectorProcess::save_row(RowValue *Row, DtcJob *Task)
  689. {
  690. int i, Ret;
  691. if (table_def->num_fields() < 0)
  692. return (-1);
  693. for (i = 1; i <= table_def->num_fields(); i++) {
  694. //db_conn.Row[0]是key的值,table_def->Field[0]也是key,
  695. //因此从1开始。结果Row也是从1开始的(不包括key)
  696. Ret = str_to_value(db_conn.Row[i], i, table_def->field_type(i),
  697. (*Row)[i]);
  698. if (Ret != 0) {
  699. log4cplus_error(
  700. "string[%s] conver to value[%d] error: %d, %m",
  701. db_conn.Row[i], table_def->field_type(i), Ret);
  702. return (-2);
  703. }
  704. }
  705. Task->update_key(Row);
  706. Ret = Task->append_row(Row);
  707. if (Ret < 0) {
  708. return (-3);
  709. }
  710. return (0);
  711. }
  712. int ConnectorProcess::process_statement_query(
  713. const DTCValue* key,
  714. std::string& s_sql)
  715. {
  716. // hash 计算key落在哪库哪表
  717. init_table_name(key, table_def->field_type(0));
  718. log4cplus_debug("db: %s, sql: %s", DBName, s_sql.c_str());
  719. // 分表时,需更更换表名
  720. if (dbConfig->depoly&2) {
  721. const char* p_table_name = table_def->table_name();
  722. if (NULL == p_table_name) {
  723. return -1;
  724. }
  725. int i_pos = s_sql.find(p_table_name);
  726. if (i_pos != std::string::npos) {
  727. int i_table_name_len = strlen(p_table_name);
  728. s_sql.replace(i_pos , i_table_name_len , table_name);
  729. }
  730. }
  731. // 重新选库,并查询
  732. int i_ret = db_conn.do_query(DBName, s_sql.c_str());
  733. if (i_ret != 0) {
  734. int i_err = db_conn.get_err_no();
  735. if (i_err != -ER_DUP_ENTRY) {
  736. log4cplus_warning("db query error: %s",
  737. db_conn.get_err_msg());
  738. } else {
  739. log4cplus_info("db query error: %s",
  740. db_conn.get_err_msg());
  741. return -ER_DUP_ENTRY;
  742. }
  743. }
  744. return i_ret;
  745. }
  746. int ConnectorProcess::process_select(DtcJob *Task)
  747. {
  748. log4cplus_info("line:%d" ,__LINE__);
  749. int Ret, i;
  750. RowValue *Row = NULL;
  751. int nRows;
  752. int haslimit =
  753. !Task->count_only() && (Task->requestInfo.limit_start() ||
  754. Task->requestInfo.limit_count());
  755. log4cplus_info("line:%d" ,__LINE__);
  756. set_title("SELECT...");
  757. init_sql_buffer();
  758. log4cplus_info("line:%d" ,__LINE__);
  759. if (Task == NULL)
  760. {
  761. log4cplus_info("line:%d" ,__LINE__);
  762. return 0;
  763. }
  764. if (table_def == NULL)
  765. {
  766. log4cplus_info("line:%d" ,__LINE__);
  767. return 0;
  768. }
  769. log4cplus_info("line:%d" ,__LINE__);
  770. init_table_name(Task->request_key(), table_def->field_type(0));
  771. log4cplus_info("line:%d" ,__LINE__);
  772. if (haslimit)
  773. sql_append_const("SELECT SQL_CALC_FOUND_ROWS ");
  774. else
  775. sql_append_const("SELECT ");
  776. select_field_concate(Task->request_fields()); // 总是SELECT所有字段
  777. sql_append_const(" FROM ");
  778. sql_append_table();
  779. log4cplus_info("line:%d" ,__LINE__);
  780. // condition
  781. sql_append_const(" WHERE ");
  782. sql_append_field(0);
  783. sql_append_const("=");
  784. format_sql_value(Task->request_key(), table_def->field_type(0));
  785. log4cplus_info("line:%d" ,__LINE__);
  786. if (condition_concate(Task->request_condition()) != 0) {
  787. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  788. "Volatile condition not allowed");
  789. return (-7);
  790. }
  791. log4cplus_info("line:%d" ,__LINE__);
  792. if (dbConfig->ordSql) {
  793. sql_append_const(" ");
  794. sql_append_string(dbConfig->ordSql);
  795. }
  796. if (Task->requestInfo.limit_count() > 0) {
  797. sql_printf(" LIMIT %u, %u", Task->requestInfo.limit_start(),
  798. Task->requestInfo.limit_count());
  799. }
  800. log4cplus_info("line:%d" ,__LINE__);
  801. if (error_no !=
  802. 0) { // 主要检查PrintfAppend是否发生过错误,这里统一检查一次
  803. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  804. log4cplus_error("error occur: %d", error_no);
  805. return (-1);
  806. }
  807. //bug fixed with count *
  808. Ret = Task->prepare_result_no_limit();
  809. if (Ret != 0) {
  810. Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
  811. "task prepare-result error");
  812. log4cplus_error("task prepare-result error: %d, %m", Ret);
  813. return (-2);
  814. }
  815. if (!Task->count_only()) {
  816. Row = new RowValue(table_def);
  817. if (Row == NULL) {
  818. Task->set_error(-ENOMEM, __FUNCTION__, "new row error");
  819. log4cplus_error("%s new RowValue error: %m", "");
  820. return (-3);
  821. }
  822. }
  823. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  824. Ret = db_conn.do_query(DBName, sql.c_str());
  825. log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
  826. if (Ret != 0) {
  827. delete Row;
  828. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  829. db_conn.get_err_msg());
  830. log4cplus_warning("db query error: %s, pid: %d, group-id: %d",
  831. db_conn.get_err_msg(), getpid(),
  832. self_group_id);
  833. return (-4);
  834. }
  835. Ret = db_conn.use_result();
  836. if (Ret != 0) {
  837. delete Row;
  838. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  839. db_conn.get_err_msg());
  840. log4cplus_warning("db user result error: %s",
  841. db_conn.get_err_msg());
  842. return (-5);
  843. }
  844. nRows = db_conn.res_num;
  845. for (i = 0; i < db_conn.res_num; i++) {
  846. Ret = db_conn.fetch_row();
  847. if (Ret != 0) {
  848. delete Row;
  849. db_conn.free_result();
  850. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  851. db_conn.get_err_msg());
  852. log4cplus_warning("db fetch row error: %s",
  853. db_conn.get_err_msg());
  854. return (-6);
  855. }
  856. //get field value length for the row
  857. _lengths = 0;
  858. _lengths = db_conn.get_lengths();
  859. if (0 == _lengths) {
  860. delete Row;
  861. db_conn.free_result();
  862. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  863. db_conn.get_err_msg());
  864. log4cplus_warning("db fetch row length error: %s",
  865. db_conn.get_err_msg());
  866. return (-6);
  867. }
  868. // 将结果转换,并保存到task的result里
  869. if (Task->count_only()) {
  870. nRows = atoi(db_conn.Row[0]);
  871. //bug fixed return count *
  872. Task->set_total_rows(nRows);
  873. break;
  874. } else if ((Ret = save_row(Row, Task)) != 0) {
  875. delete Row;
  876. db_conn.free_result();
  877. Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
  878. "task append row error");
  879. log4cplus_error("task append row error: %d", Ret);
  880. return (-7);
  881. }
  882. }
  883. log4cplus_debug(
  884. "pid: %d, group-id: %d, result: %d row, db: %s, sql: %s",
  885. getpid(), self_group_id, nRows, DBName, sql.c_str());
  886. delete Row;
  887. db_conn.free_result();
  888. //bug fixed确认客户端带Limit限制
  889. if (haslimit) { // 获取总行数
  890. init_sql_buffer();
  891. sql_append_const("SELECT FOUND_ROWS() ");
  892. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  893. Ret = db_conn.do_query(DBName, sql.c_str());
  894. log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
  895. if (Ret != 0) {
  896. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  897. db_conn.get_err_msg());
  898. log4cplus_warning(
  899. "db query error: %s, pid: %d, group-id: %d",
  900. db_conn.get_err_msg(), getpid(), self_group_id);
  901. return (-4);
  902. }
  903. Ret = db_conn.use_result();
  904. if (Ret != 0) {
  905. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  906. db_conn.get_err_msg());
  907. log4cplus_warning("db user result error: %s",
  908. db_conn.get_err_msg());
  909. return (-5);
  910. }
  911. Ret = db_conn.fetch_row();
  912. if (Ret != 0) {
  913. db_conn.free_result();
  914. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  915. db_conn.get_err_msg());
  916. log4cplus_warning("db fetch row error: %s",
  917. db_conn.get_err_msg());
  918. return (-6);
  919. }
  920. unsigned long totalRows = strtoul(db_conn.Row[0], NULL, 0);
  921. if (totalRows == 0) {
  922. if (nRows != 0)
  923. totalRows =
  924. Task->requestInfo.limit_start() + nRows;
  925. else
  926. totalRows = 0;
  927. }
  928. Ret = Task->set_total_rows(totalRows, 1);
  929. log4cplus_debug("db: total-rows: %lu, ret: %d", totalRows, Ret);
  930. db_conn.free_result();
  931. }
  932. return (0);
  933. }
  934. int ConnectorProcess::update_field_concate(const DTCFieldValue *UpdateInfo)
  935. {
  936. int i;
  937. if (UpdateInfo == NULL)
  938. return (0);
  939. for (i = 0; i < UpdateInfo->num_fields(); i++) {
  940. const int fid = UpdateInfo->field_id(i);
  941. if (table_def->is_volatile(fid))
  942. continue;
  943. switch (UpdateInfo->field_operation(i)) {
  944. case DField::Set:
  945. if (i > 0)
  946. sql_append_const(",");
  947. sql_append_field(fid);
  948. sql_append_const("=");
  949. format_sql_value(UpdateInfo->field_value(i),
  950. UpdateInfo->field_type(i));
  951. break;
  952. case DField::Add:
  953. if (i > 0)
  954. sql_append_const(",");
  955. sql_append_field(fid);
  956. sql_append_const("=");
  957. sql_append_field(fid);
  958. sql_append_const("+");
  959. format_sql_value(UpdateInfo->field_value(i),
  960. UpdateInfo->field_type(i));
  961. break;
  962. default:
  963. break;
  964. };
  965. }
  966. return 0;
  967. }
  968. int ConnectorProcess::default_value_concate(const DTCFieldValue *UpdateInfo)
  969. {
  970. int i;
  971. uint8_t mask[32];
  972. FIELD_ZERO(mask);
  973. if (UpdateInfo)
  974. UpdateInfo->build_field_mask(mask);
  975. for (i = 1; i <= table_def->num_fields(); i++) {
  976. if (FIELD_ISSET(i, mask) || table_def->is_volatile(i))
  977. continue;
  978. sql_append_const(",");
  979. sql_append_field(i);
  980. sql_append_const("=");
  981. format_sql_value(table_def->default_value(i),
  982. table_def->field_type(i));
  983. }
  984. return 0;
  985. }
  986. int ConnectorProcess::process_insert(DtcJob *Task)
  987. {
  988. int Ret;
  989. set_title("INSERT...");
  990. init_sql_buffer();
  991. init_table_name(Task->request_key(), table_def->field_type(0));
  992. sql_append_const("INSERT INTO ");
  993. sql_append_table();
  994. sql_append_const(" SET ");
  995. std::map<std::string, std::string> fieldValues;
  996. if (Task->request_key()) {
  997. fieldValues[table_def->field_name(0)] = value_to_str(
  998. Task->request_key(), table_def->field_type(0));
  999. }
  1000. if (Task->request_operation()) {
  1001. const DTCFieldValue *updateInfo = Task->request_operation();
  1002. for (int i = 0; i < updateInfo->num_fields(); ++i) {
  1003. int fid = updateInfo->field_id(i);
  1004. if (table_def->is_volatile(fid))
  1005. continue;
  1006. fieldValues[table_def->field_name(fid)] =
  1007. value_to_str(updateInfo->field_value(i),
  1008. updateInfo->field_type(i));
  1009. }
  1010. }
  1011. for (int i = 1; i <= table_def->num_fields(); ++i) {
  1012. if (table_def->is_volatile(i))
  1013. continue;
  1014. if (fieldValues.find(table_def->field_name(i)) !=
  1015. fieldValues.end())
  1016. continue;
  1017. fieldValues[table_def->field_name(i)] = value_to_str(
  1018. table_def->default_value(i), table_def->field_type(i));
  1019. }
  1020. for (std::map<std::string, std::string>::iterator iter =
  1021. fieldValues.begin();
  1022. iter != fieldValues.end(); ++iter) {
  1023. sql_append_string(&left_quote, 1);
  1024. sql_append_string(iter->first.c_str(), iter->first.length());
  1025. sql_append_string(&right_quote, 1);
  1026. sql_append_const("=");
  1027. sql_append_string(iter->second.c_str(), iter->second.length());
  1028. sql_append_const(",");
  1029. }
  1030. if (sql.at(-1) == ',')
  1031. sql.trunc(-1);
  1032. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1033. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1034. log4cplus_error("error occur: %d", error_no);
  1035. return (-1);
  1036. }
  1037. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1038. Ret = db_conn.do_query(DBName, sql.c_str());
  1039. log4cplus_debug("INSERT %d %d", Ret, db_conn.get_raw_err_no());
  1040. if (Ret != 0) {
  1041. int err = db_conn.get_err_no();
  1042. Task->set_error_dup(err, __FUNCTION__, db_conn.get_err_msg());
  1043. if (err != -ER_DUP_ENTRY)
  1044. log4cplus_warning("db query error: %s",
  1045. db_conn.get_err_msg());
  1046. else
  1047. log4cplus_info("db query error: %s",
  1048. db_conn.get_err_msg());
  1049. return (-1);
  1050. }
  1051. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1052. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1053. if (table_def->has_auto_increment()) {
  1054. uint64_t id = db_conn.insert_id();
  1055. if (id) {
  1056. Task->resultInfo.set_insert_id(id);
  1057. if (table_def->key_auto_increment())
  1058. Task->resultInfo.set_key(id);
  1059. }
  1060. }
  1061. return (0);
  1062. }
  1063. int ConnectorProcess::process_update(DtcJob *Task)
  1064. {
  1065. int Ret;
  1066. if (Task->request_operation() == NULL) {
  1067. Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
  1068. "update field not found");
  1069. return (-1);
  1070. }
  1071. if (Task->request_operation()->has_type_commit() == 0) {
  1072. // pure volatile fields update, always succeed
  1073. return (0);
  1074. }
  1075. set_title("UPDATE...");
  1076. init_sql_buffer();
  1077. init_table_name(Task->request_key(), table_def->field_type(0));
  1078. sql_append_const("UPDATE ");
  1079. sql_append_table();
  1080. sql_append_const(" SET ");
  1081. update_field_concate(Task->request_operation());
  1082. // key
  1083. sql_append_const(" WHERE ");
  1084. sql_append_field(0);
  1085. sql_append_const("=");
  1086. format_sql_value(Task->request_key(), table_def->field_type(0));
  1087. // condition
  1088. if (condition_concate(Task->request_condition()) != 0) {
  1089. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  1090. "Volatile condition not allowed");
  1091. return (-7);
  1092. }
  1093. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1094. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1095. log4cplus_error("error occur: %d", error_no);
  1096. return (-1);
  1097. }
  1098. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1099. Ret = db_conn.do_query(DBName, sql.c_str());
  1100. log4cplus_debug("UPDATE %d %d", Ret, db_conn.get_raw_err_no());
  1101. if (Ret != 0) {
  1102. int err = db_conn.get_err_no();
  1103. Task->set_error_dup(err, __FUNCTION__, db_conn.get_err_msg());
  1104. if (err != -ER_DUP_ENTRY)
  1105. log4cplus_warning("db query error: %s",
  1106. db_conn.get_err_msg());
  1107. else
  1108. log4cplus_info("db query error: %s",
  1109. db_conn.get_err_msg());
  1110. return -1;
  1111. }
  1112. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1113. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1114. return (0);
  1115. }
  1116. int ConnectorProcess::process_delete(DtcJob *Task)
  1117. {
  1118. int Ret;
  1119. set_title("DELETE...");
  1120. init_sql_buffer();
  1121. init_table_name(Task->request_key(), table_def->field_type(0));
  1122. sql_append_const("DELETE FROM ");
  1123. sql_append_table();
  1124. // key
  1125. sql_append_const(" WHERE ");
  1126. sql_append_field(0);
  1127. sql_append_const("=");
  1128. format_sql_value(Task->request_key(), table_def->field_type(0));
  1129. // condition
  1130. if (condition_concate(Task->request_condition()) != 0) {
  1131. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  1132. "Volatile condition not allowed");
  1133. return (-7);
  1134. }
  1135. if (error_no !=
  1136. 0) { // 主要检查PrintfAppend是否发生过错误,这里统一检查一次
  1137. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1138. log4cplus_error("error occur: %d", error_no);
  1139. return (-1);
  1140. }
  1141. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1142. Ret = db_conn.do_query(DBName, sql.c_str());
  1143. log4cplus_debug("DELETE %d %d", Ret, db_conn.get_raw_err_no());
  1144. if (Ret != 0) {
  1145. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1146. db_conn.get_err_msg());
  1147. log4cplus_warning("db query error: %s", db_conn.get_err_msg());
  1148. return (-1);
  1149. }
  1150. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1151. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1152. return (0);
  1153. }
  1154. int ConnectorProcess::do_process(DtcJob* Task)
  1155. {
  1156. log4cplus_info("line:%d" ,__LINE__);
  1157. if (Task == NULL) {
  1158. log4cplus_error("Task is NULL!%s", "");
  1159. return (-1);
  1160. }
  1161. log4cplus_info("line:%d" ,__LINE__);
  1162. table_def = TableDefinitionManager::instance()->get_cur_table_def();
  1163. switch (Task->request_code()) {
  1164. case DRequest::TYPE_PASS:
  1165. case DRequest::Purge:
  1166. case DRequest::Flush:
  1167. return 0;
  1168. case DRequest::Get:
  1169. return process_select(Task);
  1170. case DRequest::Insert:
  1171. return process_insert(Task);
  1172. case DRequest::Update:
  1173. return process_update(Task);
  1174. case DRequest::Delete:
  1175. return process_delete(Task);
  1176. case DRequest::Replace:
  1177. return process_replace(Task);
  1178. // case DRequest::ReloadConfig:
  1179. // return process_reload_config(Task);
  1180. default:
  1181. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  1182. "invalid request-code");
  1183. return (-1);
  1184. }
  1185. }
  1186. int ConnectorProcess::process_replace(DtcJob *Task)
  1187. {
  1188. int Ret;
  1189. set_title("REPLACE...");
  1190. init_sql_buffer();
  1191. init_table_name(Task->request_key(), table_def->field_type(0));
  1192. sql_append_const("REPLACE INTO ");
  1193. sql_append_table();
  1194. sql_append_const(" SET ");
  1195. sql_append_field(0);
  1196. sql_append_const("=");
  1197. format_sql_value(Task->request_key(), table_def->field_type(0));
  1198. sql_append_const(",");
  1199. /* 补全缺失的默认值 */
  1200. if (Task->request_operation())
  1201. update_field_concate(Task->request_operation());
  1202. else if (sql.at(-1) == ',') {
  1203. sql.trunc(-1);
  1204. }
  1205. default_value_concate(Task->request_operation());
  1206. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1207. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1208. log4cplus_error("error occur: %d", error_no);
  1209. return (-1);
  1210. }
  1211. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1212. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1213. log4cplus_error("error occur: %d", error_no);
  1214. return (-1);
  1215. }
  1216. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1217. Ret = db_conn.do_query(DBName, sql.c_str());
  1218. log4cplus_debug("REPLACE %d %d", Ret, db_conn.get_raw_err_no());
  1219. if (Ret != 0) {
  1220. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1221. db_conn.get_err_msg());
  1222. log4cplus_warning("db query error: %s", db_conn.get_err_msg());
  1223. return (-1);
  1224. }
  1225. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1226. log4cplus_debug("%s",
  1227. "ConnectorProcess::ProcessReplaceTask() successful.");
  1228. return 0;
  1229. }
  1230. ConnectorProcess::~ConnectorProcess()
  1231. {
  1232. }
  1233. void ConnectorProcess::init_title(int group, int role)
  1234. {
  1235. title_prefix_size = snprintf(name, sizeof(name), "connector%d%c", group,
  1236. MACHINEROLESTRING[role]);
  1237. memcpy(title, name, title_prefix_size);
  1238. title[title_prefix_size++] = ':';
  1239. title[title_prefix_size++] = ' ';
  1240. title[title_prefix_size] = '\0';
  1241. title[sizeof(title) - 1] = '\0';
  1242. }
  1243. void ConnectorProcess::set_title(const char *status)
  1244. {
  1245. strncpy(title + title_prefix_size, status,
  1246. sizeof(title) - 1 - title_prefix_size);
  1247. set_proc_title(title);
  1248. }
  1249. int ConnectorProcess::process_reload_config(DtcJob *Task)
  1250. {
  1251. int cacheKey = DbConfig::get_shm_id(g_dtc_config->get_config_node());;
  1252. BlockProperties stInfo;
  1253. BufferPond cachePool;
  1254. memset(&stInfo, 0, sizeof(stInfo));
  1255. stInfo.ipc_mem_key = cacheKey;
  1256. stInfo.key_size = TableDefinitionManager::instance()
  1257. ->get_cur_table_def()
  1258. ->key_format();
  1259. stInfo.read_only = 1;
  1260. if (cachePool.cache_open(&stInfo)) {
  1261. log4cplus_error("%s", cachePool.error());
  1262. Task->set_error(-EC_RELOAD_CONFIG_FAILED, __FUNCTION__,
  1263. "open cache error!");
  1264. return -1;
  1265. }
  1266. cachePool.reload_table();
  1267. log4cplus_error(
  1268. "cmd notify work helper reload table, tableIdx : [%d], pid : [%d]",
  1269. cachePool.shm_table_idx(), getpid());
  1270. return 0;
  1271. }