my_parse.c 15 KB


  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <string.h>
#include "da_protocal.h"
#include "../da_msg.h"
#include "../da_conn.h"
#include "../da_request.h"
#include "../da_buf.h"
#include "../da_util.h"
#include "../da_errno.h"
#include "../da_server.h"
#include "../da_time.h"
#include "../da_core.h"
#include "my_comm.h"
#include "my_command.h"
  31. #define MYSQL_HEADER_SIZE 4
  32. #define MAXPACKETSIZE (64 << 20)
  33. #define MultiKeyValue 32
  34. #define __FLTFMT__ "%LA"
  35. #define CONVERT_NULL_TO_EMPTY_STRING 1
  36. enum fieldtype {
  37. None = 0,
  38. Signed = 1,
  39. Unsigned = 2,
  40. Float = 3,
  41. String = 4,
  42. Binary = 5,
  43. TotalType,
  44. };
  45. enum codefield {
  46. FD_HEADER = 0,
  47. FD_VERSION = 1,
  48. FD_TABLEDEFINE = 2,
  49. FD_REQUEST = 3,
  50. FD_RESULTINFO = 4,
  51. FD_UPDATEINFO = 5,
  52. FD_CONDITIONINFO = 6,
  53. FD_FIELDSET = 7,
  54. FD_RESULTSET = 8,
  55. FD_TOTALFIELD = 9,
  56. };
  57. enum codestate {
  58. ST_ID = 0,
  59. ST_LENTH = 1,
  60. ST_VALUE = 2,
  61. };
  62. /*
  63. * parse request msg
  64. */
  65. void my_parse_req(struct msg *r)
  66. {
  67. struct mbuf *b;
  68. uint8_t *p;
  69. uint32_t input_packet_length = 0;
  70. enum enum_server_command command;
  71. int rc;
  72. log_debug("my_parse_req entry.");
  73. b = STAILQ_LAST(&r->buf_q, mbuf, next);
  74. p = r->pos;
  75. if (p < b->last) {
  76. if (b->last - p < MYSQL_HEADER_SIZE) {
  77. log_error(
  78. "receive size small than package header. id:%d",
  79. r->id);
  80. p = b->last;
  81. goto end;
  82. }
  83. input_packet_length = uint_trans_3(p);
  84. log_debug("uint_trans_3:0x%x 0x%x 0x%x, len:%d", p[0], p[1], p[2],
  85. input_packet_length);
  86. p += 3;
  87. r->pkt_nr = (uint8_t)(*p); // mysql sequence id
  88. p++;
  89. log_debug("pkt_nr:%d, packet len:%d", r->pkt_nr,
  90. input_packet_length);
  91. if (p + input_packet_length > b->last) {
  92. p = b->last;
  93. goto end;
  94. }
  95. if (r->owner->stage == CONN_STAGE_LOGGED_IN) {
  96. rc = my_get_command(p, input_packet_length, r,
  97. &command);
  98. if (rc) {
  99. if (rc < 0) {
  100. if(rc == -6)
  101. {
  102. r->parse_res = MSG_PARSE_ERROR_NO_SELECTED_DB;
  103. goto custom;
  104. }
  105. log_error("parse command error:%d\n",
  106. rc);
  107. goto error;
  108. }
  109. r->command = command;
  110. r->keyCount = 1;
  111. r->cmd = MSG_REQ_SVRADMIN;
  112. }
  113. }
  114. log_debug("AAAAAAAAA 444444444444");
  115. p += input_packet_length;
  116. goto success;
  117. }
  118. end:
  119. ASSERT(p == b->last);
  120. r->pos = p;
  121. if (b->last == b->end) {
  122. r->parse_res = MSG_PARSE_REPAIR;
  123. } else {
  124. r->parse_res = MSG_PARSE_AGAIN;
  125. }
  126. return;
  127. custom:
  128. r->pos = b->last;
  129. log_debug("parse msg:%" PRIu64 " error!", r->id);
  130. errno = EINVAL;
  131. log_debug("my_parse_req leave.");
  132. return ;
  133. error:
  134. r->pos = b->last;
  135. r->parse_res = MSG_PARSE_ERROR;
  136. log_debug("parse msg:%" PRIu64 " error!", r->id);
  137. errno = EINVAL;
  138. log_debug("my_parse_req leave.");
  139. return;
  140. success:
  141. r->pos = p;
  142. r->parse_res = MSG_PARSE_OK;
  143. log_debug("parse msg:%" PRIu64 " success!", r->id);
  144. log_debug("my_parse_req leave.");
  145. return;
  146. }
  147. void my_parse_rsp(struct msg *r)
  148. {
  149. struct mbuf *b;
  150. uint8_t *p;
  151. int ret;
  152. log_debug("my_parse_rsp entry.");
  153. b = STAILQ_LAST(&r->buf_q, mbuf, next);
  154. p = r->pos;
  155. r->token = NULL;
  156. if (p < b->last) {
  157. if (b->last - p <
  158. sizeof(struct DTC_HEADER_V2) + MYSQL_HEADER_SIZE) {
  159. log_error(
  160. "receive size small than package header. id:%d",
  161. r->id);
  162. p = b->last;
  163. r->parse_res = MSG_PARSE_ERROR;
  164. errno = EINVAL;
  165. return;
  166. }
  167. r->peerid = ((struct DTC_HEADER_V2 *)p)->id;
  168. r->admin = ((struct DTC_HEADER_V2 *)p)->admin;
  169. p = p + sizeof(struct DTC_HEADER_V2);
  170. r->pkt_nr = (uint8_t)(p[3]); // mysql sequence id
  171. log_debug("pkt_nr:%d, peerid:%d, id:%d, admin:%d", r->pkt_nr,
  172. r->peerid, r->id, r->admin);
  173. p = p + MYSQL_HEADER_SIZE;
  174. p = b->last;
  175. r->pos = p;
  176. r->parse_res = MSG_PARSE_OK;
  177. log_debug("parse msg:%" PRIu64 " success!", r->id);
  178. } else {
  179. r->parse_res = MSG_PARSE_ERROR;
  180. log_debug("parse msg:%" PRIu64 " error!", r->id);
  181. errno = EINVAL;
  182. }
  183. log_debug("my_parse_rsp leave.");
  184. return;
  185. }
  186. int my_get_command(uint8_t *input_raw_packet, uint32_t input_packet_length,
  187. struct msg *r, enum enum_server_command *cmd)
  188. {
  189. *cmd = (enum enum_server_command)(uchar)input_raw_packet[0];
  190. log_debug("cmd: %d", *cmd);
  191. if (*cmd >= COM_END)
  192. *cmd = COM_END; // Wrong command
  193. if (parse_packet(input_raw_packet + 1, input_packet_length - 1, r,
  194. *cmd))
  195. return 1;
  196. return -1;
  197. }
  198. int my_do_command(struct msg *msg)
  199. {
  200. int rc = NEXT_RSP_ERROR;
  201. switch (msg->command) {
  202. case COM_INIT_DB:{
  203. rc = NEXT_RSP_OK;
  204. break;
  205. }
  206. case COM_REGISTER_SLAVE:
  207. case COM_RESET_CONNECTION:
  208. case COM_CLONE:
  209. case COM_CHANGE_USER: {
  210. rc = NEXT_RSP_OK;
  211. break;
  212. }
  213. case COM_STMT_EXECUTE:
  214. case COM_STMT_FETCH:
  215. case COM_STMT_SEND_LONG_DATA:
  216. case COM_STMT_PREPARE:
  217. case COM_STMT_CLOSE:
  218. case COM_STMT_RESET: {
  219. rc = NEXT_RSP_ERROR;
  220. break;
  221. }
  222. case COM_QUERY: {
  223. log_debug("COM_QUERY, admin: %d", msg->admin);
  224. if (msg->admin == CMD_SQL_PASS_OK)
  225. rc = NEXT_RSP_OK;
  226. else if (msg->admin == CMD_SQL_PASS_NULL)
  227. rc = NEXT_RSP_NULL;
  228. else
  229. rc = NEXT_FORWARD;
  230. break;
  231. }
  232. case COM_FIELD_LIST: // This isn't actually needed
  233. case COM_QUIT:
  234. case COM_BINLOG_DUMP_GTID:
  235. case COM_BINLOG_DUMP:
  236. case COM_REFRESH:
  237. case COM_STATISTICS:
  238. case COM_PING:
  239. case COM_PROCESS_INFO:
  240. case COM_PROCESS_KILL:
  241. case COM_SET_OPTION:
  242. case COM_DEBUG:
  243. rc = NEXT_RSP_OK;
  244. break;
  245. case COM_SLEEP:
  246. case COM_CONNECT: // Impossible here
  247. case COM_TIME: // Impossible from client
  248. case COM_DELAYED_INSERT: // INSERT DELAYED has been removed.
  249. case COM_END:
  250. default:
  251. log_error("error command:%d", msg->command);
  252. rc = NEXT_RSP_ERROR;
  253. break;
  254. }
  255. return rc;
  256. }
  257. static int dtc_decode_value(enum fieldtype type, int lenth, uint8_t *p,
  258. CValue *val)
  259. {
  260. uint8_t *q;
  261. switch (type) {
  262. case None:
  263. break;
  264. case Signed:
  265. case Unsigned:
  266. if (lenth == 0 || lenth > 8) {
  267. goto decode_value_error;
  268. }
  269. q = (uint8_t *)p + 1;
  270. int64_t s64;
  271. s64 = *(int8_t *)p;
  272. switch (lenth) {
  273. case 8:
  274. s64 = (s64 << 8) | *q++;
  275. case 7:
  276. s64 = (s64 << 8) | *q++;
  277. case 6:
  278. s64 = (s64 << 8) | *q++;
  279. case 5:
  280. s64 = (s64 << 8) | *q++;
  281. case 4:
  282. s64 = (s64 << 8) | *q++;
  283. case 3:
  284. s64 = (s64 << 8) | *q++;
  285. case 2:
  286. s64 = (s64 << 8) | *q++;
  287. }
  288. val->s64 = s64;
  289. break;
  290. case Float:
  291. if (lenth < 3)
  292. goto decode_value_error;
  293. if (p[lenth - 1] != '\0')
  294. goto decode_value_error;
  295. if (!strcmp((char *)p, "NAN"))
  296. val->flt = NAN;
  297. else if (!strcmp((char *)p, "INF"))
  298. val->flt = INFINITY;
  299. else if (!strcmp((char *)p, "-INF"))
  300. val->flt = -INFINITY;
  301. else {
  302. long double ldf;
  303. if (sscanf((char *)p, __FLTFMT__, &ldf) != 1)
  304. goto decode_value_error;
  305. val->flt = ldf;
  306. }
  307. break;
  308. case String:
  309. case Binary:
  310. if (lenth == 0) {
  311. #if CONVERT_NULL_TO_EMPTY_STRING
  312. val->str.data = p;
  313. val->str.len = 0;
  314. #else
  315. val->str.data = NULL;
  316. val->str.len = 0;
  317. #endif
  318. } else {
  319. if (p[lenth - 1] != '\0')
  320. goto decode_value_error;
  321. val->str.data = p;
  322. val->str.len = lenth - 1;
  323. }
  324. break;
  325. default:
  326. goto decode_value_error;
  327. }
  328. return 0;
  329. decode_value_error:
  330. return -1;
  331. }
  332. static uint64_t randomHashSeed = 1;
  333. int my_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq)
  334. {
  335. int status, i;
  336. struct keypos *temp_kpos;
  337. CValue val;
  338. log_debug("key count:%d, cmd:%d", r->keyCount, r->cmd);
  339. if (r->cmd == MSG_NOP || r->admin != CMD_NOP) {
  340. uint64_t randomkey = randomHashSeed++;
  341. r->idx = msg_backend_idx(r, (uint8_t *)&randomkey,
  342. sizeof(uint64_t));
  343. return 0;
  344. }
  345. else if(r->layer == 2 || r->layer == 3) {
  346. r->idx = msg_backend_idx(r, NULL, 0);
  347. return 0;
  348. } else {
  349. if (r->keyCount == 0) {
  350. log_error(" request msg id: %" PRIu64
  351. " request without key",
  352. r->id);
  353. r->err = MSG_NOKEY_ERR;
  354. r->error = 1;
  355. return -1;
  356. } else if (r->keyCount == 1) {
  357. temp_kpos = &r->keys[0];
  358. switch (r->keytype) {
  359. case Signed:
  360. case Unsigned: {
  361. status = dtc_decode_value(
  362. Unsigned,
  363. temp_kpos->end - temp_kpos->start,
  364. temp_kpos->start, &val);
  365. if (status < 0) {
  366. log_error("decode value:%d", status);
  367. return -1;
  368. }
  369. log_debug("val.u64:%d", val.u64);
  370. r->idx = msg_backend_idx(r, (uint8_t *)&val.u64,
  371. sizeof(uint64_t));
  372. log_debug("r->idx:%d", r->idx);
  373. break;
  374. }
  375. case String: {
  376. int len = temp_kpos->end - temp_kpos->start;
  377. char temp[len + 1];
  378. *temp = len;
  379. for (i = 1; i < len + 1; i++) {
  380. temp[i] = lower((
  381. char)(temp_kpos->start)[i - 1]);
  382. }
  383. r->idx = msg_backend_idx(r, (uint8_t *)temp,
  384. len + 1);
  385. log_debug(
  386. "debug,len :%d the packet key is %u '%s' the hash key(r->idx): %d ",
  387. len, *temp, temp + 1, r->idx);
  388. break;
  389. }
  390. case Binary: {
  391. int len = temp_kpos->end - temp_kpos->start;
  392. char temp[len + 1];
  393. *temp = len;
  394. memcpy(temp + 1, temp_kpos->start, len);
  395. r->idx = msg_backend_idx(r, (uint8_t *)temp,
  396. len + 1);
  397. log_debug(
  398. "debug,len :%d the packet key is %u '%s' the hash key(r->idx): %d ",
  399. len, *temp, temp + 1, r->idx);
  400. break;
  401. }
  402. }
  403. log_debug("return status:%d", status);
  404. return status;
  405. } else {
  406. if (r->cmd == MSG_REQ_GET) {
  407. //status = dtc_fragment_get(r, ncontinuum, frag_msgq);
  408. //GET is not supported
  409. status = -1;
  410. } else if (r->cmd == MSG_REQ_UPDATE) {
  411. //MSET is not supported
  412. status = -1;
  413. } else if (r->cmd == MSG_REQ_DELETE) {
  414. //MDEL is not supported
  415. status = -1;
  416. } else {
  417. //other multi operation is not supported
  418. status = -1;
  419. }
  420. return status;
  421. }
  422. }
  423. }
  424. int _check_condition(struct string *str)
  425. {
  426. int i;
  427. char *condition = " WHERE ";
  428. char *insert_char = "INSERT INTO ";
  429. if (string_empty(str))
  430. return -1;
  431. for (i = 0; i < str->len; i++) {
  432. if (da_strncmp(str->data + i, condition,
  433. da_strlen(condition)) == 0 &&
  434. i + da_strlen(condition) < str->len)
  435. return i + da_strlen(condition);
  436. //if(da_strncmp(str->data+i, insert_char, da_strlen(insert_char)) == 0 && i + da_strlen(insert_char) < str->len)
  437. // return i + da_strlen(insert_char);
  438. }
  439. return -2;
  440. }
  441. bool check_cmd_operation(struct string *str)
  442. {
  443. int i = 0;
  444. char *condition_1 = "SELECT ";
  445. char *condition_2 = "DELETE FROM ";
  446. char *condition_3 = "UPDATE ";
  447. char *condition_4 = "INSERT INTO ";
  448. if (string_empty(str))
  449. return false;
  450. if (da_strncmp(str->data + i, condition_1, da_strlen(condition_1)) !=
  451. 0 &&
  452. da_strncmp(str->data + i, condition_2, da_strlen(condition_2)) !=
  453. 0 &&
  454. da_strncmp(str->data + i, condition_3, da_strlen(condition_3)) !=
  455. 0 &&
  456. da_strncmp(str->data + i, condition_4, da_strlen(condition_4)) != 0)
  457. return true;
  458. else
  459. return false;
  460. }
  461. bool check_cmd_select(struct string *str)
  462. {
  463. return false;
  464. }
  465. int get_mid_by_dbname(const char* dbname, const char* sql, struct msg* r)
  466. {
  467. int mid = 0;
  468. struct context* ctx = NULL;
  469. struct conn *c_conn = NULL;
  470. int sql_len = 0;
  471. c_conn = r->owner;
  472. ctx = conn_to_ctx(c_conn);
  473. if(dbname && strlen(dbname) > 0)
  474. {
  475. char* cmp_dbname[250] = {0};
  476. sprintf(cmp_dbname, "%s.", dbname);
  477. struct array *pool = &(ctx->pool);
  478. int i;
  479. for (i = 0; i < array_n(pool); i++) {
  480. struct server_pool *p = (struct server_pool *)array_get(pool, i);
  481. if(string_empty(&p->name))
  482. continue;
  483. log_info("server pool module name: %s, cmp dbname: %s", p->name.data, cmp_dbname);
  484. if(da_strncmp(p->name.data, cmp_dbname, strlen(cmp_dbname)) == 0)
  485. {
  486. mid = p->mid;
  487. }
  488. }
  489. }
  490. if(sql)
  491. {
  492. sql_len = strlen(sql);
  493. if(sql_len > 0)
  494. {
  495. struct array *pool = &(ctx->pool);
  496. int i, j;
  497. for (i = 0; i < array_n(pool); i++) {
  498. struct string cmp_name;
  499. struct server_pool *p = (struct server_pool *)array_get(pool, i);
  500. if(string_empty(&p->name))
  501. continue;
  502. string_copy(&cmp_name, p->name.data, p->name.len);
  503. string_upper(&cmp_name);
  504. for(j = 0; j < sql_len; j++)
  505. {
  506. if(sql_len - j > cmp_name.len && da_strncmp(sql + j, cmp_name.data, cmp_name.len) == 0)
  507. {
  508. mid = p->mid;
  509. }
  510. }
  511. log_info("server pool module name: %s, cmp sql: %s", cmp_name.data, sql);
  512. }
  513. }
  514. }
  515. log_info("mid result: %d", mid);
  516. return mid;
  517. }
  518. void get_tablename(struct msg* r, uint8_t* sql, int sql_len)
  519. {
  520. char tablename[260] = {0};
  521. if(sql == NULL || sql_len <= 0)
  522. return ;
  523. log_debug("AAAAAAAAA 555555555555");
  524. int ret = sql_parse_table(sql, &tablename);
  525. if(ret > 0)
  526. {
  527. log_debug("AAAAAAAAA 666666666666");
  528. string_copy(&r->table_name, tablename, strlen(tablename));
  529. }
  530. log_debug("AAAAAAAAA 77777777777 %s", tablename);
  531. }
  532. int my_get_route_key(uint8_t *sql, int sql_len, int *start_offset,
  533. int *end_offset, const char* dbname, struct msg* r)
  534. {
  535. int i = 0;
  536. struct string str;
  537. int ret = 0;
  538. int layer = 0;
  539. string_init(&str);
  540. string_copy(&str, sql, sql_len);
  541. if (string_empty(&str))
  542. return -1;
  543. if (!string_upper(&str))
  544. return -9;
  545. log_debug("sql: %s", str.data);
  546. if(dbname && strlen(dbname))
  547. {
  548. log_debug("dbname len:%d, dbname: %s", strlen(dbname), dbname);
  549. }
  550. int mid = get_mid_by_dbname(dbname, str.data, r);
  551. char conf_path[260] = {0};
  552. if(mid != 0)
  553. {
  554. sprintf(conf_path, "/etc/dtc/dtc-conf-%d.yaml", mid);
  555. r->mid = mid;
  556. }
  557. get_tablename(r, str.data, str.len);
  558. if(r->table_name.len > 0)
  559. log_debug("table name: %s", r->table_name.data);
  560. char* res = NULL;
  561. char strkey[260] = {0};
  562. memset(strkey, 0, 260);
  563. if(strlen(conf_path) > 0)
  564. {
  565. res = rule_get_key(conf_path);
  566. if(res == NULL)
  567. {
  568. ret = -5;
  569. goto done;
  570. }
  571. else
  572. {
  573. strcpy(strkey, res);
  574. log_debug("strkey: %s", strkey);
  575. }
  576. }
  577. r->keytype = rule_get_key_type(conf_path);
  578. log_debug("strkey type: %d", r->keytype);
  579. //agent sql route, rule engine
  580. layer = rule_sql_match(str.data, dbname, strlen(conf_path) > 0 ? conf_path : NULL);
  581. log_debug("rule layer: %d", layer);
  582. if(layer != 1)
  583. {
  584. ret = layer;
  585. goto done;
  586. }
  587. if (check_cmd_operation(&str))
  588. return -2;
  589. if (check_cmd_select(&str))
  590. return -2;
  591. i = _check_condition(&str);
  592. if (i < 0) {
  593. ret = -2;
  594. log_error("check condition error code:%d", i);
  595. goto done;
  596. }
  597. for (; i < str.len; i++) {
  598. if (str.len - i >= strlen(strkey)) {
  599. log_debug(
  600. "key: %s, key len:%d, str.len:%d i:%d dtc_key_len:%d str.data + i:%s ", strkey, strlen(strkey),
  601. str.len, i, strlen(strkey), str.data + i);
  602. if (da_strncmp(str.data + i, strkey, strlen(strkey)) == 0)
  603. {
  604. int j;
  605. for (j = i + strlen(strkey); j < str.len; j++)
  606. {
  607. if (str.data[j] == '=')
  608. {
  609. j++;
  610. //strip space.
  611. while (j < str.len && str.data[j] == ' ')
  612. {
  613. j++;
  614. }
  615. if (j < str.len)
  616. {
  617. *start_offset = j;
  618. int k = 0;
  619. for (k = j; k < str.len;
  620. k++) {
  621. if (sql[k + 1] == ' ' || sql[k + 1] == ';' || k + 1 == str.len)
  622. {
  623. *end_offset = k + 1;
  624. ret = layer;
  625. goto done;
  626. }
  627. }
  628. ret = -4;
  629. goto done;
  630. }
  631. else
  632. {
  633. ret = -5;
  634. goto done;
  635. }
  636. }
  637. }
  638. ret = -2;
  639. goto done;
  640. }
  641. }
  642. }
  643. ret = -3;
  644. goto done;
  645. done:
  646. string_deinit(&str);
  647. return ret;
  648. }