mysql_operation.cc 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. #include <string.h>
  20. #include <stdarg.h>
  21. #include <limits.h>
  22. #include <errno.h>
  23. #include <unistd.h>
  24. #include <netinet/in.h>
  25. #include <arpa/inet.h>
  26. #include <map>
  27. #include <string>
  28. #include <sstream>
  29. #include <string>
  30. #include <iostream>
  31. // local include files
  32. #include "mysql_operation.h"
  33. // common include files
  34. #include "protocol.h"
  35. #include "log/log.h"
  36. #include "proc_title.h"
  37. #include "table/table_def_manager.h"
  38. #include "daemon/daemon.h"
  39. // mysql include files
  40. #include "mysqld_error.h"
  41. // core include files
  42. #include "buffer/buffer_pond.h"
// Yields the smaller of x and y. This is a function-like macro, so each
// argument may be evaluated twice: do not pass expressions with side effects.
  43. #define MIN(x, y) ((x) <= (y) ? (x) : (y))
  44. ConnectorProcess::ConnectorProcess() : _lengths(0)
  45. {
  46. error_no = 0;
  47. left_quote = '`';
  48. right_quote = '`';
  49. title_prefix_size = 0;
  50. time(&last_access);
  51. ping_timeout = 9;
  52. proc_timeout = 0;
  53. strncpy(name, "helper", 6);
  54. }
  55. int ConnectorProcess::try_ping(void)
  56. {
  57. return db_conn.do_ping();
  58. }
  59. void ConnectorProcess::init_ping_timeout(void)
  60. {
  61. int64_t to = db_conn.get_variable("wait_timeout");
  62. log4cplus_debug("Server idle timeout %lld", (long long)to);
  63. if (to < 10)
  64. to = 10;
  65. else if (to > 600)
  66. to = 600;
  67. ping_timeout = to * 9 / 10;
  68. }
  69. int ConnectorProcess::config_db_by_struct(const DbConfig *cf)
  70. {
  71. if (cf == NULL)
  72. return -1;
  73. dbConfig = cf;
  74. return (0);
  75. }
// Column names that create_tab_if_not_exist() appends to every table it
// creates. check_table() skips these columns when it validates the MySQL
// schema against the table definition.
  76. static char HiddenMysqlFields[][32] = {
  77. "id",
  78. "invisible_time"
  79. };
// Element count of a true array. Not valid for pointers.
  80. #define DIM(a) (sizeof(a) / sizeof(a[0]))
  81. typedef struct {
  82. char m_szName[256];
  83. int m_iType;
  84. int m_uiSize;
  85. int CheckSizeInInteger(int size) {
  86. if (DField::Signed == m_iType ||
  87. DField::Unsigned == m_iType) {
  88. return (m_uiSize == size) ? 0 : -1;
  89. }
  90. return -2;
  91. };
  92. } CMysqlField , CDtcField;
// MySQL column type -> DTC field description. Consulted by
// get_field_type(const char *, int &, unsigned int &) when check_table()
// validates `show columns` output. Each row is
// { MySQL type keyword, DTC type code, size }.
// Lookup is a case-insensitive prefix comparison in table order, so a keyword
// that is a prefix of another one must come after it ("datetime" before
// "date", "timestamp" before "time").
// NOTE(review): the numeric type codes 1..5 presumably mirror the DField
// enum (Signed, Unsigned, Float, String, Binary) — confirm against its definition.
  93. static CMysqlField astField[] = { { "tinyint", 1, 1 },
  94. { "smallint", 1, 2 },
  95. { "mediumint", 1, 4 },
  96. { "int", 1, 4 },
  97. { "bigint", 1, 8 },
  98. { "float", 3, 4 },
  99. { "double", 3, 8 },
  100. { "decimal", 3, 8 },
  101. { "datetime", 4, 20 },
  102. { "date", 4, 11 },
  103. { "timestamp", 4, 20 },
  104. { "time", 4, 11 },
  105. { "year", 4, 5 },
  106. { "varchar", 4, 255 },
  107. { "char", 4, 255 },
  108. { "varbinary", 5, 255 },
  109. { "binary", 5, 255 },
  110. { "tinyblob", 5, 255 },
  111. { "tinytext", 4, 255 },
  112. { "blob", 5, 65535 },
  113. { "text", 4, 65535 },
  114. { "mediumblob", 5, 16777215 },
  115. { "mediumtext", 4, 16777215 },
  116. { "longblob", 5, 4294967295U },
  117. { "longtext", 4, 4294967295U },
  118. { "enum", 4, 255 },
  119. { "set", 2, 8 } };
// DTC field type -> MySQL column type. Consulted by
// get_field_type(std::string &, int, unsigned int) when
// create_tab_if_not_exist() builds the CREATE TABLE statement.
// Unsigned fields are looked up through the Signed rows; the caller appends
// " UNSIGNED " afterwards. For Float, String and Binary rows the size column
// is not compared (CheckSizeInInteger() returns -2), so the first row of that
// type always matches.
// NOTE(review): Binary maps to "varchar" here, while astField maps "varchar"
// to type code 4 and "varbinary" to 5 — confirm that a table created from
// this mapping still passes check_table() for Binary fields.
  120. /**
  121. * when m_iType is Signed or Unsigned , m_uiSize is useful
  122. * but when m_iType is Float , String or Binary , m_uiSize is workless
  123. */
  124. static CDtcField dtcFieldTab[] = {
  125. {"tinyint" , DField::Signed , 1},
  126. {"smallint" , DField::Signed , 2},
  127. {"mediumint" , DField::Signed , 3},
  128. {"int" , DField::Signed , 4},
  129. {"bigint" , DField::Signed , 8},
  130. {"float" , DField::Float , 4},
  131. {"varchar" , DField::String , 65535U},
  132. {"varchar" , DField::Binary , 65535U}
  133. };
  134. static int get_field_type(
  135. std::string& szType,
  136. int i_type,
  137. unsigned int ui_size)
  138. {
  139. int i = 0;
  140. for (; i < DIM(dtcFieldTab); i++) {
  141. int i_tmp_type = (DField::Unsigned == i_type) ? DField::Signed : i_type;
  142. if (dtcFieldTab[i].m_iType == i_tmp_type) {
  143. int i_ret = dtcFieldTab[i].CheckSizeInInteger(ui_size);
  144. if (i_ret == 0 || i_ret == -2) {
  145. szType = dtcFieldTab[i].m_szName;
  146. break;
  147. }
  148. }
  149. }
  150. if (i >= DIM(dtcFieldTab)) {
  151. log4cplus_error("dtc-yaml config field info has no responding mysql type, dtc type:%d , type size:%d",
  152. i_type , ui_size);
  153. return -1;
  154. }
  155. switch (i_type)
  156. {
  157. case DField::Unsigned:
  158. {
  159. szType.append(" UNSIGNED ");
  160. }
  161. break;
  162. case DField::String:
  163. case DField::Binary:
  164. {
  165. std::stringstream s_temp;
  166. s_temp << "(";
  167. s_temp << ui_size;
  168. s_temp << ")";
  169. szType.append(s_temp.str());
  170. }
  171. break;
  172. default:
  173. break;
  174. }
  175. return 0;
  176. }
  177. static int get_field_type(const char *szType, int &i_type,
  178. unsigned int &ui_size)
  179. {
  180. unsigned int i;
  181. int iTmp;
  182. for (i = 0; i < DIM(astField); i++) {
  183. if (strncasecmp(szType, astField[i].m_szName,
  184. strlen(astField[i].m_szName)) == 0) {
  185. i_type = astField[i].m_iType;
  186. ui_size = astField[i].m_uiSize;
  187. if (strncasecmp(szType, "varchar", 7) == 0) {
  188. if (sscanf(szType + 8, "%d", &iTmp) == 1)
  189. ui_size = iTmp;
  190. } else if (strncasecmp(szType, "char", 4) == 0) {
  191. if (sscanf(szType + 5, "%d", &iTmp) == 1)
  192. ui_size = iTmp;
  193. } else if (strncasecmp(szType, "varbinary", 9) == 0) {
  194. if (sscanf(szType + 10, "%d", &iTmp) == 1)
  195. ui_size = iTmp;
  196. } else if (strncasecmp(szType, "binary", 6) == 0) {
  197. if (sscanf(szType + 7, "%d", &iTmp) == 1)
  198. ui_size = iTmp;
  199. }
  200. if (i_type == 1 && strstr(szType, "unsigned") != NULL)
  201. i_type = 2;
  202. if (i_type == 3 && strstr(szType, "unsigned") != NULL)
  203. fprintf(stderr,
  204. "#warning: dtc not support unsigned double!\n");
  205. break;
  206. }
  207. }
  208. return (0);
  209. }
  210. static
  211. int ConnectorProcess::create_tab_if_not_exist()
  212. {
  213. snprintf(table_name, sizeof(table_name), dbConfig->tblFormat, 0);
  214. init_sql_buffer();
  215. sql_append_const("create table if not exists `");
  216. sql_append_string(table_name);
  217. sql_append_const("`");
  218. sql_append_const("(");
  219. std::string s_unique_key("");
  220. for (int i = 0; i <= table_def->num_fields(); i++) {
  221. int field_id = table_def->field_id(table_def->field_name(i));
  222. log4cplus_debug("field name:%s , id:%d , type:%d , size:%d , default:%d" ,
  223. table_def->field_name(i) ,
  224. field_id,
  225. table_def->field_type(i),
  226. table_def->field_size(i),
  227. table_def->has_default(i));
  228. bool is_primary_key = false;
  229. uint8_t* uniq_fields = table_def->uniq_fields_list();
  230. for (int j = 0; j < table_def->uniq_fields(); j++) {
  231. if (uniq_fields[j] == field_id) {
  232. s_unique_key.append("`");
  233. s_unique_key.append(table_def->field_name(i));
  234. s_unique_key.append("`,");
  235. is_primary_key = true;
  236. break;
  237. }
  238. }
  239. sql_append_const("`");
  240. sql_append_string(table_def->field_name(i));
  241. sql_append_const("` ");
  242. std::string stype = "";
  243. if (get_field_type(stype , table_def->field_type(i)
  244. ,table_def->field_size(i))) {
  245. return -1;
  246. }
  247. sql_append_string(stype.c_str());
  248. if (is_primary_key || !table_def->is_nullable(i)) {
  249. sql_append_string(" NOT NULL ");
  250. }
  251. if (table_def->has_default(i)) {
  252. sql_append_string("default ");
  253. format_sql_value(table_def->default_value(i), table_def->field_type(i));
  254. }
  255. sql_append_const(",");
  256. }
  257. sql_append_string("`id` INT(11) NOT NULL AUTO_INCREMENT,");
  258. sql_append_string("`invisible_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,");
  259. if (!s_unique_key.empty()) {
  260. sql_append_string("UNIQUE INDEX (");
  261. sql_append_string(s_unique_key.c_str() , s_unique_key.length() - 1);
  262. sql_append_const("),");
  263. }
  264. sql_append_string("PRIMARY KEY (`id`)");
  265. sql_append_string(")ENGINE=InnoDB DEFAULT CHARSET=");
  266. sql_append_string(db_conn.GetCharacSet().c_str());
  267. snprintf(DBName, sizeof(DBName), dbConfig->dbFormat,
  268. dbConfig->mach[self_group_id].dbIdx[0]);
  269. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  270. int i_ret = db_conn.do_query(DBName, sql.c_str());
  271. log4cplus_debug("create table ret:%d %d", i_ret, db_conn.get_raw_err_no());
  272. if (i_ret != 0) {
  273. log4cplus_warning("db query error: %s, pid: %d, group-id: %d",
  274. db_conn.get_err_msg(), getpid(),
  275. self_group_id);
  276. return -1;
  277. }
  278. return 0;
  279. }
  280. int ConnectorProcess::check_table()
  281. {
  282. int Ret;
  283. int i;
  284. int i_field_num;
  285. char ach_field_name[256][256];
  286. snprintf(DBName, sizeof(DBName), dbConfig->dbFormat,
  287. dbConfig->mach[self_group_id].dbIdx[0]);
  288. snprintf(table_name, sizeof(table_name), dbConfig->tblFormat, 0);
  289. init_sql_buffer();
  290. sql_append_const("show columns from `");
  291. sql_append_string(table_name);
  292. sql_append_const("`");
  293. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  294. Ret = db_conn.do_query(DBName, sql.c_str());
  295. log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
  296. if (Ret != 0) {
  297. log4cplus_warning("db query error: %s, pid: %d, group-id: %d",
  298. db_conn.get_err_msg(), getpid(),
  299. self_group_id);
  300. return (-1);
  301. }
  302. Ret = db_conn.use_result();
  303. if (Ret != 0) {
  304. log4cplus_warning("db user result error: %s",
  305. db_conn.get_err_msg());
  306. return (-2);
  307. }
  308. // 获取返回结果的各列位置
  309. int i_name_idx = 0, i_type_idx = 0;
  310. int i_null_idx = 0, i_key_idx = 0;
  311. int i_default_idx = 0, i_extra_idx = 0;
  312. unsigned int ui_num_fields = mysql_num_fields(db_conn.Res);
  313. MYSQL_FIELD *pst_fields = mysql_fetch_fields(db_conn.Res);
  314. for (i = 0; i < (int)ui_num_fields; i++) {
  315. if (strcasecmp("Field", pst_fields[i].name) == 0)
  316. i_name_idx = i;
  317. else if (strcasecmp("Type", pst_fields[i].name) == 0)
  318. i_type_idx = i;
  319. else if (strcasecmp("Null", pst_fields[i].name) == 0)
  320. i_null_idx = i;
  321. else if (strcasecmp("Key", pst_fields[i].name) == 0)
  322. i_key_idx = i;
  323. else if (strcasecmp("Default", pst_fields[i].name) == 0)
  324. i_default_idx = i;
  325. else if (strcasecmp("Extra", pst_fields[i].name) == 0)
  326. i_extra_idx = i;
  327. }
  328. int iFid;
  329. i_field_num = 0;
  330. memset(ach_field_name, 0, sizeof(ach_field_name));
  331. int uniq_fields_cnt_table = table_def->uniq_fields();
  332. for (i = 0; i < db_conn.res_num; i++) {
  333. Ret = db_conn.fetch_row();
  334. if (Ret != 0) {
  335. db_conn.free_result();
  336. log4cplus_warning("db fetch row error: %s",
  337. db_conn.get_err_msg());
  338. return (-3);
  339. }
  340. strncpy(ach_field_name[i_field_num], db_conn.Row[i_name_idx],
  341. 255);
  342. i_field_num++;
  343. int j = 0;
  344. for (; j < DIM(HiddenMysqlFields); j++) {
  345. if (!strncmp(db_conn.Row[i_name_idx] , HiddenMysqlFields[j] , 31)) {
  346. log4cplus_info("field:%s is no need check" , HiddenMysqlFields[j]);
  347. break;
  348. }
  349. }
  350. if (j < DIM(HiddenMysqlFields)) { continue; }
  351. iFid = table_def->field_id(db_conn.Row[i_name_idx]);
  352. if (iFid == -1) {
  353. log4cplus_debug("field[%s] not found in table.yaml",
  354. db_conn.Row[i_name_idx]);
  355. continue;
  356. }
  357. if (table_def->is_volatile(iFid)) {
  358. log4cplus_error(
  359. "field[name: `%s`] found in table.yaml and DB both, can't be Volatile",
  360. db_conn.Row[i_name_idx]);
  361. db_conn.free_result();
  362. return (-4);
  363. }
  364. if (table_def->is_timestamp(iFid)) {
  365. log4cplus_error(
  366. "in table.yaml, Field[name: `%s`]'s is timestamp, not support in DB mode",
  367. db_conn.Row[i_name_idx]);
  368. db_conn.free_result();
  369. return (-4);
  370. }
  371. //field type & size
  372. int i_type = -1;
  373. unsigned ui_size = 0;
  374. get_field_type(db_conn.Row[i_type_idx], i_type, ui_size);
  375. if (i_type != table_def->field_type(iFid)) {
  376. log4cplus_error(
  377. "in table.yaml, Field[name: `%s`]'s type incorrect. conf: %d, mysql:%d",
  378. db_conn.Row[i_name_idx],
  379. table_def->field_type(iFid), i_type);
  380. db_conn.free_result();
  381. return (-4);
  382. }
  383. if ((int)ui_size != table_def->field_size(iFid) &&
  384. !(ui_size >= (64 << 20) &&
  385. table_def->field_size(iFid) >= (64 << 20))) {
  386. log4cplus_error(
  387. "in table.yaml, Field[name: `%s`]'s size incorrect. conf: %d, mysql:%u",
  388. db_conn.Row[i_name_idx],
  389. table_def->field_size(iFid), ui_size);
  390. db_conn.free_result();
  391. return (-4);
  392. }
  393. if (db_conn.Row[i_extra_idx] != NULL &&
  394. strcasecmp("auto_increment", db_conn.Row[i_extra_idx]) ==
  395. 0) {
  396. if (table_def->auto_increment_field_id() != iFid) {
  397. log4cplus_error(
  398. "in table.yaml, Field[name: `%s`]'s default-value incorrect. conf: non-auto_increment, mysql:auto_increment",
  399. db_conn.Row[i_name_idx]);
  400. db_conn.free_result();
  401. return (-4);
  402. }
  403. }
  404. /*field should be uniq in table.yaml if configed primary in db */
  405. uint8_t *uniq_fields = table_def->uniq_fields_list();
  406. if (db_conn.Row[i_key_idx] != NULL &&
  407. (strcasecmp("PRI", db_conn.Row[i_key_idx]) == 0 ||
  408. strcasecmp("UNI", db_conn.Row[i_key_idx]) == 0)) {
  409. int j = 0;
  410. for (j = 0; j < table_def->uniq_fields(); j++) {
  411. if (uniq_fields[j] == iFid)
  412. break;
  413. }
  414. if (j >= table_def->uniq_fields()) {
  415. log4cplus_error(
  416. "in table.yaml, Field[name: `%s`] is primary in db, but not uniq in dtc",
  417. db_conn.Row[i_name_idx]);
  418. return -4;
  419. }
  420. uniq_fields_cnt_table--;
  421. }
  422. }
  423. /*field should be primary in db if configed uniq in table.yaml*/
  424. if (uniq_fields_cnt_table != 0) {
  425. log4cplus_error(
  426. "table.yaml have more uniq fields that not configed as primary in db");
  427. return -4;
  428. }
  429. for (int i = 0; i <= table_def->num_fields(); i++) {
  430. //bug fix volatile不在db中
  431. if (table_def->is_volatile(i))
  432. continue;
  433. const char *name = table_def->field_name(i);
  434. int j;
  435. for (j = 0; j < i_field_num; j++) {
  436. if (strcmp(ach_field_name[j], name) == 0)
  437. break;
  438. }
  439. if (j >= i_field_num) {
  440. log4cplus_error(
  441. "in table.yaml, Field[name: `%s`] not found in mysql",
  442. name);
  443. db_conn.free_result();
  444. return (-4);
  445. }
  446. }
  447. log4cplus_debug(
  448. "pid: %d, group-id: %d check table success, db: %s, sql: %s",
  449. getpid(), self_group_id, DBName, sql.c_str());
  450. db_conn.free_result();
  451. return (0);
  452. }
  453. int ConnectorProcess::machine_init(int GroupID, int r)
  454. {
  455. const char *p;
  456. // 初始化db配置信息
  457. if (dbConfig->machineCnt <= GroupID) {
  458. log4cplus_error(
  459. "parse config error, machineCnt[%d] <= GroupID[%d]",
  460. dbConfig->machineCnt, GroupID);
  461. return (-3);
  462. }
  463. typeof(&dbConfig->mach[0].role[0]) role =
  464. &dbConfig->mach[GroupID].role[r];
  465. memset(&db_host_conf, 0, sizeof(DBHost));
  466. p = strrchr(role->addr, ':');
  467. if (p == NULL) {
  468. strncpy(db_host_conf.Host, role->addr,
  469. sizeof(db_host_conf.Host) - 1);
  470. db_host_conf.Port = 0;
  471. } else {
  472. strncpy(db_host_conf.Host, role->addr,
  473. MIN(p - role->addr,
  474. (int)sizeof(db_host_conf.Host) - 1));
  475. db_host_conf.Port = atoi(p + 1);
  476. }
  477. strncpy(db_host_conf.User, role->user, sizeof(db_host_conf.User) - 1);
  478. strncpy(db_host_conf.Password, role->pass,
  479. sizeof(db_host_conf.Password) - 1);
  480. db_host_conf.ConnTimeout = proc_timeout;
  481. strncpy(db_host_conf.OptionFile, role->optfile,
  482. sizeof(db_host_conf.OptionFile) - 1);
  483. db_conn.do_config(&db_host_conf);
  484. if (db_conn.Open() != 0) {
  485. log4cplus_warning("connect db[%s] error: %s", db_host_conf.Host,
  486. db_conn.get_err_msg());
  487. return (-6);
  488. }
  489. log4cplus_debug("group-id: %d, pid: %d, db: %s, user: %s, pwd: %s , client charac_set:%s",
  490. self_group_id, getpid(), db_host_conf.Host,
  491. db_host_conf.User, db_host_conf.Password,
  492. db_conn.GetCharacSet().c_str());
  493. return (0);
  494. }
  495. int ConnectorProcess::do_init(int GroupID, const DbConfig *do_config,
  496. DTCTableDefinition *tdef, int slave)
  497. {
  498. int Ret;
  499. self_group_id = GroupID;
  500. table_def = tdef;
  501. Ret = config_db_by_struct(do_config);
  502. if (Ret != 0) {
  503. return (-1);
  504. }
  505. Ret = machine_init(GroupID, slave);
  506. if (Ret != 0) {
  507. return (-2);
  508. }
  509. return (0);
  510. }
  511. void ConnectorProcess::init_sql_buffer(void)
  512. {
  513. sql.clear();
  514. error_no = 0;
  515. }
  516. void ConnectorProcess::sql_append_string(const char *str, int len)
  517. {
  518. if (len == 0)
  519. len = strlen(str);
  520. if (sql.append(str, len) < 0) {
  521. error_no = -1;
  522. log4cplus_error("sql.append() error: %d, %m", sql.needed());
  523. }
  524. }
  525. /* 将字符串printf在原来字符串的后面,如果buffer不够大会自动重新分配buffer */
  526. void ConnectorProcess::sql_printf(const char *Format, ...)
  527. {
  528. va_list Arg;
  529. int Len;
  530. va_start(Arg, Format);
  531. Len = sql.vbprintf(Format, Arg);
  532. va_end(Arg);
  533. if (Len < 0) {
  534. error_no = -1;
  535. log4cplus_error("vsnprintf error: %d, %m", Len);
  536. }
  537. }
  538. void ConnectorProcess::sql_append_table(void)
  539. {
  540. sql_append_string(&left_quote, 1);
  541. sql_append_string(table_name);
  542. sql_append_string(&right_quote, 1);
  543. }
  544. void ConnectorProcess::sql_append_field(int fid)
  545. {
  546. sql_append_string(&left_quote, 1);
  547. sql_append_string(table_def->field_name(fid));
  548. sql_append_string(&right_quote, 1);
  549. }
  550. void ConnectorProcess::sql_append_comparator(uint8_t op)
  551. {
  552. // order is important
  553. static const char *const CompStr[] = { "=", "!=", "<", "<=", ">", ">=" };
  554. if (op >= DField::TotalComparison) {
  555. error_no = -1;
  556. log4cplus_error("unknow op: %d", op);
  557. } else {
  558. sql_append_string(CompStr[op]);
  559. }
  560. }
  561. void ConnectorProcess::init_table_name(const DTCValue *Key, int field_type)
  562. {
  563. int dbid = 0, tableid = 0;
  564. uint64_t n;
  565. double f;
  566. if (Key != NULL && dbConfig->depoly != 0) {
  567. switch (field_type) {
  568. case DField::Signed:
  569. if (dbConfig->keyHashConfig.keyHashEnable) {
  570. n = dbConfig->keyHashConfig.keyHashFunction(
  571. (const char *)&(Key->s64),
  572. sizeof(Key->s64),
  573. dbConfig->keyHashConfig.keyHashLeftBegin,
  574. dbConfig->keyHashConfig
  575. .keyHashRightBegin);
  576. } else {
  577. if (Key->s64 >= 0)
  578. n = Key->s64;
  579. else if (Key->s64 == LONG_LONG_MIN)
  580. n = 0;
  581. else
  582. n = 0 - Key->s64;
  583. }
  584. log4cplus_info("div:%d , mod:%d" , dbConfig->dbDiv , dbConfig->dbMod);
  585. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  586. tableid = (n / dbConfig->tblDiv) % dbConfig->tblMod;
  587. break;
  588. case DField::Unsigned:
  589. if (dbConfig->keyHashConfig.keyHashEnable) {
  590. n = dbConfig->keyHashConfig.keyHashFunction(
  591. (const char *)&(Key->u64),
  592. sizeof(Key->u64),
  593. dbConfig->keyHashConfig.keyHashLeftBegin,
  594. dbConfig->keyHashConfig
  595. .keyHashRightBegin);
  596. } else {
  597. n = Key->u64;
  598. }
  599. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  600. tableid = (n / dbConfig->tblDiv) % dbConfig->tblMod;
  601. break;
  602. case DField::Float:
  603. if (dbConfig->keyHashConfig.keyHashEnable) {
  604. n = dbConfig->keyHashConfig.keyHashFunction(
  605. (const char *)&(Key->flt),
  606. sizeof(Key->flt),
  607. dbConfig->keyHashConfig.keyHashLeftBegin,
  608. dbConfig->keyHashConfig
  609. .keyHashRightBegin);
  610. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  611. tableid = (n / dbConfig->tblDiv) %
  612. dbConfig->tblMod;
  613. } else {
  614. if (Key->flt >= 0)
  615. f = Key->flt;
  616. else
  617. f = 0 - Key->flt;
  618. dbid = ((int)(f / dbConfig->dbDiv)) %
  619. dbConfig->dbMod;
  620. tableid = ((int)(f / dbConfig->tblDiv)) %
  621. dbConfig->tblMod;
  622. }
  623. break;
  624. case DField::String:
  625. case DField::Binary:
  626. if (dbConfig->keyHashConfig.keyHashEnable) {
  627. n = dbConfig->keyHashConfig.keyHashFunction(
  628. Key->bin.ptr, Key->bin.len,
  629. dbConfig->keyHashConfig.keyHashLeftBegin,
  630. dbConfig->keyHashConfig
  631. .keyHashRightBegin);
  632. dbid = (n / dbConfig->dbDiv) % dbConfig->dbMod;
  633. tableid = (n / dbConfig->tblDiv) %
  634. dbConfig->tblMod;
  635. }
  636. break;
  637. }
  638. }
  639. snprintf(DBName, sizeof(DBName), dbConfig->dbFormat, dbid);
  640. snprintf(table_name, sizeof(table_name), dbConfig->tblFormat, tableid);
  641. log4cplus_debug("dbConfig->keyHashConfig.keyHashEnable:%d, left:%d, right:%d", dbConfig->keyHashConfig.keyHashEnable,
  642. dbConfig->keyHashConfig.keyHashLeftBegin, dbConfig->keyHashConfig.keyHashRightBegin);
  643. log4cplus_debug("key hash:%lld, %d, %d, %d, %d", n, dbConfig->dbDiv, dbConfig->dbMod, dbConfig->tblDiv, dbConfig->tblMod);
  644. log4cplus_info("DBName:%s , table_name:%s" ,DBName , table_name);
  645. }
  646. int ConnectorProcess::select_field_concate(const DTCFieldSet *fs)
  647. {
  648. if (fs == NULL) {
  649. sql_append_const("COUNT(*)");
  650. } else {
  651. int i = 0;
  652. uint8_t mask[32];
  653. FIELD_ZERO(mask);
  654. fs->build_field_mask(mask);
  655. sql_append_field(0); // key
  656. for (i = 1; i < table_def->num_fields() + 1; i++) {
  657. sql_append_const(",");
  658. if (FIELD_ISSET(i, mask) == 0) {
  659. /* Missing field as 0 */
  660. sql_append_const("0");
  661. } else if (table_def->is_volatile(i) == 0) {
  662. sql_append_field(i);
  663. } else {
  664. // volatile field initialized as default value
  665. format_sql_value(table_def->default_value(i),
  666. table_def->field_type(i));
  667. }
  668. }
  669. }
  670. return 0;
  671. }
  672. std::string ConnectorProcess::value_to_str(const DTCValue *v, int fieldType)
  673. {
  674. if (v == NULL)
  675. return "NULL";
  676. char buf[32];
  677. std::string ret;
  678. switch (fieldType) {
  679. case DField::Signed:
  680. snprintf(buf, sizeof(buf), "%lld", (long long)v->s64);
  681. return buf;
  682. case DField::Unsigned:
  683. snprintf(buf, sizeof(buf), "%llu", (unsigned long long)v->u64);
  684. return buf;
  685. case DField::Float:
  686. snprintf(buf, sizeof(buf), "%f", v->flt);
  687. return buf;
  688. case DField::String:
  689. case DField::Binary:
  690. esc.clear();
  691. if (esc.expand(v->str.len * 2 + 1) < 0) {
  692. error_no = -1;
  693. log4cplus_error("realloc (size: %u) error: %m",
  694. v->str.len * 2 + 1);
  695. return "NULL";
  696. }
  697. db_conn.escape_string(esc.c_str(), v->str.ptr,
  698. v->str.len); // 先对字符串进行escape
  699. ret = '\'';
  700. ret += v->str.ptr;
  701. ret += "\'";
  702. return ret;
  703. default:
  704. error_no = -1;
  705. log4cplus_error("unknown field type: %d", fieldType);
  706. return "UNKNOWN";
  707. }
  708. }
  709. inline int ConnectorProcess::format_sql_value(const DTCValue *Value,
  710. int iFieldType)
  711. {
  712. log4cplus_debug("format_sql_value iFieldType[%d]", iFieldType);
  713. if (Value == NULL) {
  714. sql_append_const("NULL");
  715. } else
  716. switch (iFieldType) {
  717. case DField::Signed:
  718. sql_printf("%lld", (long long)Value->s64);
  719. break;
  720. case DField::Unsigned:
  721. sql_printf("%llu", (unsigned long long)Value->u64);
  722. break;
  723. case DField::Float:
  724. sql_printf("'%f'", Value->flt);
  725. break;
  726. case DField::String:
  727. case DField::Binary:
  728. if (sql.append('\'') < 0)
  729. error_no = -1;
  730. if (!Value->str.is_empty()) {
  731. esc.clear();
  732. if (esc.expand(Value->str.len * 2 + 1) < 0) {
  733. error_no = -1;
  734. log4cplus_error(
  735. "realloc (size: %u) error: %m",
  736. Value->str.len * 2 + 1);
  737. //return(-1);
  738. return (0);
  739. }
  740. db_conn.escape_string(
  741. esc.c_str(), Value->str.ptr,
  742. Value->str.len); // 先对字符串进行escape
  743. if (sql.append(Value->str.ptr) < 0)
  744. error_no = -1;
  745. }
  746. if (sql.append('\'') < 0)
  747. error_no = -1;
  748. break;
  749. default:;
  750. };
  751. return 0;
  752. }
  753. int ConnectorProcess::condition_concate(const DTCFieldValue *Condition)
  754. {
  755. int i;
  756. if (Condition == NULL)
  757. return (0);
  758. for (i = 0; i < Condition->num_fields(); i++) {
  759. if (table_def->is_volatile(i))
  760. return -1;
  761. sql_append_const(" AND ");
  762. sql_append_field(Condition->field_id(i));
  763. sql_append_comparator(Condition->field_operation(i));
  764. format_sql_value(Condition->field_value(i),
  765. Condition->field_type(i));
  766. }
  767. return 0;
  768. }
  769. inline int ConnectorProcess::set_default_value(int field_type, DTCValue &Value)
  770. {
  771. switch (field_type) {
  772. case DField::Signed:
  773. Value.s64 = 0;
  774. break;
  775. case DField::Unsigned:
  776. Value.u64 = 0;
  777. break;
  778. case DField::Float:
  779. Value.flt = 0.0;
  780. break;
  781. case DField::String:
  782. Value.str.len = 0;
  783. Value.str.ptr = 0;
  784. break;
  785. case DField::Binary:
  786. Value.bin.len = 0;
  787. Value.bin.ptr = 0;
  788. break;
  789. default:
  790. Value.s64 = 0;
  791. };
  792. return (0);
  793. }
  794. inline int ConnectorProcess::str_to_value(char *Str, int fieldid,
  795. int field_type, DTCValue &Value)
  796. {
  797. if (Str == NULL) {
  798. log4cplus_debug(
  799. "Str is NULL, field_type: %d. Check mysql table definition.",
  800. field_type);
  801. set_default_value(field_type, Value);
  802. return (0);
  803. }
  804. switch (field_type) {
  805. case DField::Signed:
  806. errno = 0;
  807. Value.s64 = strtoll(Str, NULL, 10);
  808. if (errno != 0)
  809. return (-1);
  810. break;
  811. case DField::Unsigned:
  812. errno = 0;
  813. Value.u64 = strtoull(Str, NULL, 10);
  814. if (errno != 0)
  815. return (-1);
  816. break;
  817. case DField::Float:
  818. errno = 0;
  819. Value.flt = strtod(Str, NULL);
  820. if (errno != 0)
  821. return (-1);
  822. break;
  823. case DField::String:
  824. Value.str.len = _lengths[fieldid];
  825. Value.str.ptr =
  826. Str; // 不重新new,要等这个value使用完后释放内存(如果Str是动态分配的)
  827. break;
  828. case DField::Binary:
  829. Value.bin.len = _lengths[fieldid];
  830. Value.bin.ptr = Str;
  831. break;
  832. default:
  833. log4cplus_error("field[%d] type[%d] invalid.", fieldid,
  834. field_type);
  835. break;
  836. }
  837. return (0);
  838. }
  839. int ConnectorProcess::save_row(RowValue *Row, DtcJob *Task)
  840. {
  841. int i, Ret;
  842. if (table_def->num_fields() < 0)
  843. return (-1);
  844. for (i = 1; i <= table_def->num_fields(); i++) {
  845. //db_conn.Row[0]是key的值,table_def->Field[0]也是key,
  846. //因此从1开始。结果Row也是从1开始的(不包括key)
  847. Ret = str_to_value(db_conn.Row[i], i, table_def->field_type(i),
  848. (*Row)[i]);
  849. if (Ret != 0) {
  850. log4cplus_error(
  851. "string[%s] conver to value[%d] error: %d, %m",
  852. db_conn.Row[i], table_def->field_type(i), Ret);
  853. return (-2);
  854. }
  855. }
  856. Task->update_key(Row);
  857. Ret = Task->append_row(Row);
  858. if (Ret < 0) {
  859. return (-3);
  860. }
  861. return (0);
  862. }
  863. int ConnectorProcess::process_statement_query(
  864. const DTCValue* key,
  865. std::string& s_sql)
  866. {
  867. // hash 计算key落在哪库哪表
  868. init_table_name(key, table_def->field_type(0));
  869. log4cplus_debug("db: %s, sql: %s", DBName, s_sql.c_str());
  870. // 分表时,需更更换表名
  871. if (dbConfig->depoly&2) {
  872. const char* p_table_name = table_def->table_name();
  873. if (NULL == p_table_name) {
  874. return -1;
  875. }
  876. int i_pos = s_sql.find(p_table_name);
  877. if (i_pos != std::string::npos) {
  878. int i_table_name_len = strlen(p_table_name);
  879. s_sql.replace(i_pos , i_table_name_len , table_name);
  880. }
  881. }
  882. // 重新选库,并查询
  883. int i_ret = db_conn.do_query(DBName, s_sql.c_str());
  884. if (i_ret != 0) {
  885. int i_err = db_conn.get_err_no();
  886. if (i_err != -ER_DUP_ENTRY) {
  887. log4cplus_warning("db query error: %s",
  888. db_conn.get_err_msg());
  889. } else {
  890. log4cplus_info("db query error: %s",
  891. db_conn.get_err_msg());
  892. return -ER_DUP_ENTRY;
  893. }
  894. }
  895. return i_ret;
  896. }
  897. int ConnectorProcess::process_select(DtcJob *Task)
  898. {
  899. log4cplus_info("line:%d" ,__LINE__);
  900. int Ret, i;
  901. RowValue *Row = NULL;
  902. int nRows;
  903. int haslimit =
  904. !Task->count_only() && (Task->requestInfo.limit_start() ||
  905. Task->requestInfo.limit_count());
  906. log4cplus_info("line:%d" ,__LINE__);
  907. set_title("SELECT...");
  908. init_sql_buffer();
  909. log4cplus_info("line:%d" ,__LINE__);
  910. if (Task == NULL)
  911. {
  912. log4cplus_info("line:%d" ,__LINE__);
  913. return 0;
  914. }
  915. if (table_def == NULL)
  916. {
  917. log4cplus_info("line:%d" ,__LINE__);
  918. return 0;
  919. }
  920. log4cplus_info("line:%d" ,__LINE__);
  921. init_table_name(Task->request_key(), table_def->field_type(0));
  922. log4cplus_info("line:%d" ,__LINE__);
  923. if (haslimit)
  924. sql_append_const("SELECT SQL_CALC_FOUND_ROWS ");
  925. else
  926. sql_append_const("SELECT ");
  927. select_field_concate(Task->request_fields()); // 总是SELECT所有字段
  928. sql_append_const(" FROM ");
  929. sql_append_table();
  930. log4cplus_info("line:%d" ,__LINE__);
  931. // condition
  932. sql_append_const(" WHERE ");
  933. sql_append_field(0);
  934. sql_append_const("=");
  935. format_sql_value(Task->request_key(), table_def->field_type(0));
  936. log4cplus_info("line:%d" ,__LINE__);
  937. if (condition_concate(Task->request_condition()) != 0) {
  938. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  939. "Volatile condition not allowed");
  940. return (-7);
  941. }
  942. log4cplus_info("line:%d" ,__LINE__);
  943. if (dbConfig->ordSql) {
  944. sql_append_const(" ");
  945. sql_append_string(dbConfig->ordSql);
  946. }
  947. if (Task->requestInfo.limit_count() > 0) {
  948. sql_printf(" LIMIT %u, %u", Task->requestInfo.limit_start(),
  949. Task->requestInfo.limit_count());
  950. }
  951. log4cplus_info("line:%d" ,__LINE__);
  952. if (error_no !=
  953. 0) { // 主要检查PrintfAppend是否发生过错误,这里统一检查一次
  954. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  955. log4cplus_error("error occur: %d", error_no);
  956. return (-1);
  957. }
  958. //bug fixed with count *
  959. Ret = Task->prepare_result_no_limit();
  960. if (Ret != 0) {
  961. Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
  962. "task prepare-result error");
  963. log4cplus_error("task prepare-result error: %d, %m", Ret);
  964. return (-2);
  965. }
  966. if (!Task->count_only()) {
  967. Row = new RowValue(table_def);
  968. if (Row == NULL) {
  969. Task->set_error(-ENOMEM, __FUNCTION__, "new row error");
  970. log4cplus_error("%s new RowValue error: %m", "");
  971. return (-3);
  972. }
  973. }
  974. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  975. Ret = db_conn.do_query(DBName, sql.c_str());
  976. log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
  977. if (Ret != 0) {
  978. delete Row;
  979. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  980. db_conn.get_err_msg());
  981. log4cplus_warning("db query error: %s, pid: %d, group-id: %d",
  982. db_conn.get_err_msg(), getpid(),
  983. self_group_id);
  984. return (-4);
  985. }
  986. Ret = db_conn.use_result();
  987. if (Ret != 0) {
  988. delete Row;
  989. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  990. db_conn.get_err_msg());
  991. log4cplus_warning("db user result error: %s",
  992. db_conn.get_err_msg());
  993. return (-5);
  994. }
  995. nRows = db_conn.res_num;
  996. for (i = 0; i < db_conn.res_num; i++) {
  997. Ret = db_conn.fetch_row();
  998. if (Ret != 0) {
  999. delete Row;
  1000. db_conn.free_result();
  1001. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1002. db_conn.get_err_msg());
  1003. log4cplus_warning("db fetch row error: %s",
  1004. db_conn.get_err_msg());
  1005. return (-6);
  1006. }
  1007. //get field value length for the row
  1008. _lengths = 0;
  1009. _lengths = db_conn.get_lengths();
  1010. if (0 == _lengths) {
  1011. delete Row;
  1012. db_conn.free_result();
  1013. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1014. db_conn.get_err_msg());
  1015. log4cplus_warning("db fetch row length error: %s",
  1016. db_conn.get_err_msg());
  1017. return (-6);
  1018. }
  1019. // 将结果转换,并保存到task的result里
  1020. if (Task->count_only()) {
  1021. nRows = atoi(db_conn.Row[0]);
  1022. //bug fixed return count *
  1023. Task->set_total_rows(nRows);
  1024. break;
  1025. } else if ((Ret = save_row(Row, Task)) != 0) {
  1026. delete Row;
  1027. db_conn.free_result();
  1028. Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
  1029. "task append row error");
  1030. log4cplus_error("task append row error: %d", Ret);
  1031. return (-7);
  1032. }
  1033. }
  1034. log4cplus_debug(
  1035. "pid: %d, group-id: %d, result: %d row, db: %s, sql: %s",
  1036. getpid(), self_group_id, nRows, DBName, sql.c_str());
  1037. delete Row;
  1038. db_conn.free_result();
  1039. //bug fixed确认客户端带Limit限制
  1040. if (haslimit) { // 获取总行数
  1041. init_sql_buffer();
  1042. sql_append_const("SELECT FOUND_ROWS() ");
  1043. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1044. Ret = db_conn.do_query(DBName, sql.c_str());
  1045. log4cplus_debug("SELECT %d %d", Ret, db_conn.get_raw_err_no());
  1046. if (Ret != 0) {
  1047. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1048. db_conn.get_err_msg());
  1049. log4cplus_warning(
  1050. "db query error: %s, pid: %d, group-id: %d",
  1051. db_conn.get_err_msg(), getpid(), self_group_id);
  1052. return (-4);
  1053. }
  1054. Ret = db_conn.use_result();
  1055. if (Ret != 0) {
  1056. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1057. db_conn.get_err_msg());
  1058. log4cplus_warning("db user result error: %s",
  1059. db_conn.get_err_msg());
  1060. return (-5);
  1061. }
  1062. Ret = db_conn.fetch_row();
  1063. if (Ret != 0) {
  1064. db_conn.free_result();
  1065. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1066. db_conn.get_err_msg());
  1067. log4cplus_warning("db fetch row error: %s",
  1068. db_conn.get_err_msg());
  1069. return (-6);
  1070. }
  1071. unsigned long totalRows = strtoul(db_conn.Row[0], NULL, 0);
  1072. if (totalRows == 0) {
  1073. if (nRows != 0)
  1074. totalRows =
  1075. Task->requestInfo.limit_start() + nRows;
  1076. else
  1077. totalRows = 0;
  1078. }
  1079. Ret = Task->set_total_rows(totalRows, 1);
  1080. log4cplus_debug("db: total-rows: %lu, ret: %d", totalRows, Ret);
  1081. db_conn.free_result();
  1082. }
  1083. return (0);
  1084. }
  1085. int ConnectorProcess::update_field_concate(const DTCFieldValue *UpdateInfo)
  1086. {
  1087. int i;
  1088. if (UpdateInfo == NULL)
  1089. return (0);
  1090. for (i = 0; i < UpdateInfo->num_fields(); i++) {
  1091. const int fid = UpdateInfo->field_id(i);
  1092. if (table_def->is_volatile(fid))
  1093. continue;
  1094. switch (UpdateInfo->field_operation(i)) {
  1095. case DField::Set:
  1096. if (i > 0)
  1097. sql_append_const(",");
  1098. sql_append_field(fid);
  1099. sql_append_const("=");
  1100. format_sql_value(UpdateInfo->field_value(i),
  1101. UpdateInfo->field_type(i));
  1102. break;
  1103. case DField::Add:
  1104. if (i > 0)
  1105. sql_append_const(",");
  1106. sql_append_field(fid);
  1107. sql_append_const("=");
  1108. sql_append_field(fid);
  1109. sql_append_const("+");
  1110. format_sql_value(UpdateInfo->field_value(i),
  1111. UpdateInfo->field_type(i));
  1112. break;
  1113. default:
  1114. break;
  1115. };
  1116. }
  1117. return 0;
  1118. }
  1119. int ConnectorProcess::default_value_concate(const DTCFieldValue *UpdateInfo)
  1120. {
  1121. int i;
  1122. uint8_t mask[32];
  1123. FIELD_ZERO(mask);
  1124. if (UpdateInfo)
  1125. UpdateInfo->build_field_mask(mask);
  1126. for (i = 1; i <= table_def->num_fields(); i++) {
  1127. if (FIELD_ISSET(i, mask) || table_def->is_volatile(i))
  1128. continue;
  1129. sql_append_const(",");
  1130. sql_append_field(i);
  1131. sql_append_const("=");
  1132. format_sql_value(table_def->default_value(i),
  1133. table_def->field_type(i));
  1134. }
  1135. return 0;
  1136. }
  1137. int ConnectorProcess::process_insert(DtcJob *Task)
  1138. {
  1139. int Ret;
  1140. set_title("INSERT...");
  1141. init_sql_buffer();
  1142. init_table_name(Task->request_key(), table_def->field_type(0));
  1143. sql_append_const("INSERT INTO ");
  1144. sql_append_table();
  1145. sql_append_const(" SET ");
  1146. std::map<std::string, std::string> fieldValues;
  1147. if (Task->request_key()) {
  1148. fieldValues[table_def->field_name(0)] = value_to_str(
  1149. Task->request_key(), table_def->field_type(0));
  1150. }
  1151. if (Task->request_operation()) {
  1152. const DTCFieldValue *updateInfo = Task->request_operation();
  1153. for (int i = 0; i < updateInfo->num_fields(); ++i) {
  1154. int fid = updateInfo->field_id(i);
  1155. if (table_def->is_volatile(fid))
  1156. continue;
  1157. fieldValues[table_def->field_name(fid)] =
  1158. value_to_str(updateInfo->field_value(i),
  1159. updateInfo->field_type(i));
  1160. }
  1161. }
  1162. for (std::map<std::string, std::string>::iterator iter =
  1163. fieldValues.begin();
  1164. iter != fieldValues.end(); ++iter) {
  1165. sql_append_string(&left_quote, 1);
  1166. sql_append_string(iter->first.c_str(), iter->first.length());
  1167. sql_append_string(&right_quote, 1);
  1168. sql_append_const("=");
  1169. sql_append_string(iter->second.c_str(), iter->second.length());
  1170. sql_append_const(",");
  1171. }
  1172. if (sql.at(-1) == ',')
  1173. sql.trunc(-1);
  1174. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1175. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1176. log4cplus_error("error occur: %d", error_no);
  1177. return (-1);
  1178. }
  1179. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1180. Ret = db_conn.do_query(DBName, sql.c_str());
  1181. log4cplus_debug("INSERT %d %d", Ret, db_conn.get_raw_err_no());
  1182. if (Ret != 0) {
  1183. int err = db_conn.get_err_no();
  1184. Task->set_error_dup(err, __FUNCTION__, db_conn.get_err_msg());
  1185. if (err != -ER_DUP_ENTRY)
  1186. log4cplus_warning("db query error: %s",
  1187. db_conn.get_err_msg());
  1188. else
  1189. log4cplus_info("db query error: %s",
  1190. db_conn.get_err_msg());
  1191. return (-1);
  1192. }
  1193. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1194. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1195. if (table_def->has_auto_increment()) {
  1196. uint64_t id = db_conn.insert_id();
  1197. if (id) {
  1198. Task->resultInfo.set_insert_id(id);
  1199. if (table_def->key_auto_increment())
  1200. Task->resultInfo.set_key(id);
  1201. }
  1202. }
  1203. return (0);
  1204. }
  1205. int ConnectorProcess::process_update(DtcJob *Task)
  1206. {
  1207. int Ret;
  1208. if (Task->request_operation() == NULL) {
  1209. Task->set_error(-EC_ERROR_BASE, __FUNCTION__,
  1210. "update field not found");
  1211. return (-1);
  1212. }
  1213. if (Task->request_operation()->has_type_commit() == 0) {
  1214. // pure volatile fields update, always succeed
  1215. return (0);
  1216. }
  1217. set_title("UPDATE...");
  1218. init_sql_buffer();
  1219. init_table_name(Task->request_key(), table_def->field_type(0));
  1220. sql_append_const("UPDATE ");
  1221. sql_append_table();
  1222. sql_append_const(" SET ");
  1223. update_field_concate(Task->request_operation());
  1224. // key
  1225. sql_append_const(" WHERE ");
  1226. sql_append_field(0);
  1227. sql_append_const("=");
  1228. format_sql_value(Task->request_key(), table_def->field_type(0));
  1229. // condition
  1230. if (condition_concate(Task->request_condition()) != 0) {
  1231. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  1232. "Volatile condition not allowed");
  1233. return (-7);
  1234. }
  1235. if (Task->requestInfo.limit_count() > 0) {
  1236. sql_printf(" LIMIT %u", Task->requestInfo.limit_count());
  1237. }
  1238. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1239. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1240. log4cplus_error("error occur: %d", error_no);
  1241. return (-1);
  1242. }
  1243. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1244. Ret = db_conn.do_query(DBName, sql.c_str());
  1245. log4cplus_debug("UPDATE %d %d", Ret, db_conn.get_raw_err_no());
  1246. if (Ret != 0) {
  1247. int err = db_conn.get_err_no();
  1248. Task->set_error_dup(err, __FUNCTION__, db_conn.get_err_msg());
  1249. if (err != -ER_DUP_ENTRY)
  1250. log4cplus_warning("db query error: %s",
  1251. db_conn.get_err_msg());
  1252. else
  1253. log4cplus_info("db query error: %s",
  1254. db_conn.get_err_msg());
  1255. return -1;
  1256. }
  1257. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1258. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1259. return (0);
  1260. }
  1261. int ConnectorProcess::process_delete(DtcJob *Task)
  1262. {
  1263. int Ret;
  1264. set_title("DELETE...");
  1265. init_sql_buffer();
  1266. init_table_name(Task->request_key(), table_def->field_type(0));
  1267. sql_append_const("DELETE FROM ");
  1268. sql_append_table();
  1269. // key
  1270. sql_append_const(" WHERE ");
  1271. sql_append_field(0);
  1272. sql_append_const("=");
  1273. format_sql_value(Task->request_key(), table_def->field_type(0));
  1274. // condition
  1275. if (condition_concate(Task->request_condition()) != 0) {
  1276. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  1277. "Volatile condition not allowed");
  1278. return (-7);
  1279. }
  1280. if (Task->requestInfo.limit_count() > 0) {
  1281. sql_printf(" LIMIT %u", Task->requestInfo.limit_count());
  1282. }
  1283. if (error_no !=
  1284. 0) { // 主要检查PrintfAppend是否发生过错误,这里统一检查一次
  1285. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1286. log4cplus_error("error occur: %d", error_no);
  1287. return (-1);
  1288. }
  1289. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1290. Ret = db_conn.do_query(DBName, sql.c_str());
  1291. log4cplus_debug("DELETE %d %d", Ret, db_conn.get_raw_err_no());
  1292. if (Ret != 0) {
  1293. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1294. db_conn.get_err_msg());
  1295. log4cplus_warning("db query error: %s", db_conn.get_err_msg());
  1296. return (-1);
  1297. }
  1298. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1299. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1300. log4cplus_debug("affected row: %d", db_conn.affected_rows());
  1301. return (0);
  1302. }
  1303. int ConnectorProcess::do_process(DtcJob* Task)
  1304. {
  1305. log4cplus_info("line:%d" ,__LINE__);
  1306. if (Task == NULL) {
  1307. log4cplus_error("Task is NULL!%s", "");
  1308. return (-1);
  1309. }
  1310. log4cplus_info("line:%d" ,__LINE__);
  1311. table_def = TableDefinitionManager::instance()->get_cur_table_def();
  1312. switch (Task->request_code()) {
  1313. case DRequest::TYPE_PASS:
  1314. case DRequest::Purge:
  1315. case DRequest::Flush:
  1316. return 0;
  1317. case DRequest::Get:
  1318. return process_select(Task);
  1319. case DRequest::Insert:
  1320. return process_insert(Task);
  1321. case DRequest::Update:
  1322. return process_update(Task);
  1323. case DRequest::Delete:
  1324. return process_delete(Task);
  1325. case DRequest::Replace:
  1326. return process_replace(Task);
  1327. // case DRequest::ReloadConfig:
  1328. // return process_reload_config(Task);
  1329. default:
  1330. Task->set_error(-EC_BAD_COMMAND, __FUNCTION__,
  1331. "invalid request-code");
  1332. return (-1);
  1333. }
  1334. }
  1335. int ConnectorProcess::process_replace(DtcJob *Task)
  1336. {
  1337. int Ret;
  1338. set_title("REPLACE...");
  1339. init_sql_buffer();
  1340. init_table_name(Task->request_key(), table_def->field_type(0));
  1341. sql_append_const("REPLACE INTO ");
  1342. sql_append_table();
  1343. sql_append_const(" SET ");
  1344. sql_append_field(0);
  1345. sql_append_const("=");
  1346. format_sql_value(Task->request_key(), table_def->field_type(0));
  1347. sql_append_const(",");
  1348. /* 补全缺失的默认值 */
  1349. if (Task->request_operation())
  1350. update_field_concate(Task->request_operation());
  1351. else if (sql.at(-1) == ',') {
  1352. sql.trunc(-1);
  1353. }
  1354. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1355. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1356. log4cplus_error("error occur: %d", error_no);
  1357. return (-1);
  1358. }
  1359. if (error_no != 0) { // 主要检查PrintfAppend是否发生过错误
  1360. Task->set_error(-EC_ERROR_BASE, __FUNCTION__, "printf error");
  1361. log4cplus_error("error occur: %d", error_no);
  1362. return (-1);
  1363. }
  1364. log4cplus_debug("db: %s, sql: %s", DBName, sql.c_str());
  1365. Ret = db_conn.do_query(DBName, sql.c_str());
  1366. log4cplus_debug("REPLACE %d %d", Ret, db_conn.get_raw_err_no());
  1367. if (Ret != 0) {
  1368. Task->set_error_dup(db_conn.get_err_no(), __FUNCTION__,
  1369. db_conn.get_err_msg());
  1370. log4cplus_warning("db query error: %s", db_conn.get_err_msg());
  1371. return (-1);
  1372. }
  1373. Task->resultInfo.set_affected_rows(db_conn.affected_rows());
  1374. log4cplus_debug("%s",
  1375. "ConnectorProcess::ProcessReplaceTask() successful.");
  1376. return 0;
  1377. }
  1378. ConnectorProcess::~ConnectorProcess()
  1379. {
  1380. }
  1381. void ConnectorProcess::init_title(int group, int role)
  1382. {
  1383. title_prefix_size = snprintf(name, sizeof(name), "connector%d%c", group,
  1384. MACHINEROLESTRING[role]);
  1385. memcpy(title, name, title_prefix_size);
  1386. title[title_prefix_size++] = ':';
  1387. title[title_prefix_size++] = ' ';
  1388. title[title_prefix_size] = '\0';
  1389. title[sizeof(title) - 1] = '\0';
  1390. }
  1391. void ConnectorProcess::set_title(const char *status)
  1392. {
  1393. strncpy(title + title_prefix_size, status,
  1394. sizeof(title) - 1 - title_prefix_size);
  1395. set_proc_title(title);
  1396. }
  1397. int ConnectorProcess::process_reload_config(DtcJob *Task)
  1398. {
  1399. int cacheKey = DbConfig::get_shm_id(g_dtc_config->get_config_node());;
  1400. BlockProperties stInfo;
  1401. BufferPond cachePool;
  1402. memset(&stInfo, 0, sizeof(stInfo));
  1403. stInfo.ipc_mem_key = cacheKey;
  1404. stInfo.key_size = TableDefinitionManager::instance()
  1405. ->get_cur_table_def()
  1406. ->key_format();
  1407. stInfo.read_only = 1;
  1408. if (cachePool.cache_open(&stInfo)) {
  1409. log4cplus_error("%s", cachePool.error());
  1410. Task->set_error(-EC_RELOAD_CONFIG_FAILED, __FUNCTION__,
  1411. "open cache error!");
  1412. return -1;
  1413. }
  1414. cachePool.reload_table();
  1415. log4cplus_error(
  1416. "cmd notify work helper reload table, tableIdx : [%d], pid : [%d]",
  1417. cachePool.shm_table_idx(), getpid());
  1418. return 0;
  1419. }