packet_server.cc 41 KB


  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <stdint.h>
  17. #include <errno.h>
  18. #include <unistd.h>
  19. #include <fcntl.h>
  20. #include <stdio.h>
  21. #include <sys/socket.h>
  22. #include <new>
  23. #include "version.h"
  24. #include "packet.h"
  25. #include "../table/table_def.h"
  26. #include "../decode/decode.h"
  27. #include "../task/task_request.h"
  28. #include "../log/log.h"
  29. //#include "mysql/field_types.h"
  30. struct MetaSelections{
  31. const char* p_req_string;
  32. int i_select_type;
  33. const char* p_val;
  34. };
  35. enum enum_select_types {
  36. E_SELECT_NONE,
  37. E_SELECT_DTC_TABLES,
  38. E_SELECT_DTC_YAML,
  39. E_SELECT_TABLE_YAML
  40. };
  41. const MetaSelections meta_selections[] = {
  42. {"select dtctables" , E_SELECT_DTC_TABLES , NULL},
  43. {"select dtcyaml" , E_SELECT_DTC_YAML , "/etc/dtc/dtc.yaml"},
  44. {"select tableyaml" , E_SELECT_TABLE_YAML , "/etc/dtc/dtc.yaml"}
  45. };
  46. enum enum_field_types { MYSQL_TYPE_DECIMAL, MYSQL_TYPE_TINY,
  47. MYSQL_TYPE_SHORT, MYSQL_TYPE_LONG,
  48. MYSQL_TYPE_FLOAT, MYSQL_TYPE_DOUBLE,
  49. MYSQL_TYPE_NULL, MYSQL_TYPE_TIMESTAMP,
  50. MYSQL_TYPE_LONGLONG,MYSQL_TYPE_INT24,
  51. MYSQL_TYPE_DATE, MYSQL_TYPE_TIME,
  52. MYSQL_TYPE_DATETIME, MYSQL_TYPE_YEAR,
  53. MYSQL_TYPE_NEWDATE, MYSQL_TYPE_VARCHAR,
  54. MYSQL_TYPE_BIT,
  55. MYSQL_TYPE_TIMESTAMP2,
  56. MYSQL_TYPE_DATETIME2,
  57. MYSQL_TYPE_TIME2,
  58. MYSQL_TYPE_NEWDECIMAL=246,
  59. MYSQL_TYPE_ENUM=247,
  60. MYSQL_TYPE_SET=248,
  61. MYSQL_TYPE_TINY_BLOB=249,
  62. MYSQL_TYPE_MEDIUM_BLOB=250,
  63. MYSQL_TYPE_LONG_BLOB=251,
  64. MYSQL_TYPE_BLOB=252,
  65. MYSQL_TYPE_VAR_STRING=253,
  66. MYSQL_TYPE_STRING=254,
  67. MYSQL_TYPE_GEOMETRY=255
  68. };
  69. /* not yet pollized*/
  70. int Packet::encode_detect(const DTCTableDefinition *tdef, int sn)
  71. {
  72. DTC_HEADER_V1 header;
  73. header.version = 1;
  74. header.scts = 8;
  75. header.flags = DRequest::Flag::KeepAlive;
  76. header.cmd = DRequest::Get;
  77. DTCVersionInfo vi;
  78. // tablename & hash
  79. vi.set_table_name(tdef->table_name());
  80. vi.set_table_hash(tdef->table_hash());
  81. vi.set_serial_nr(sn);
  82. // app version
  83. vi.set_tag(5, "dtcd");
  84. // lib version
  85. vi.set_tag(6, "ctlib-v" DTC_VERSION);
  86. vi.set_tag(9, tdef->field_type(0));
  87. DTCRequestInfo ri;
  88. // key
  89. ri.set_key(DTCValue::Make(0));
  90. // ri.set_timeout(30);
  91. // field set
  92. char fs[4] = { 1, 0, 0, char(0xFF) };
  93. /* calculate version info */
  94. header.len[DRequest::Section::VersionInfo] =
  95. encoded_bytes_simple_section(vi, DField::None);
  96. /* no table definition */
  97. header.len[DRequest::Section::table_definition] = 0;
  98. /* encode request info */
  99. header.len[DRequest::Section::RequestInfo] =
  100. encoded_bytes_simple_section(ri, tdef->key_type());
  101. /* no result info */
  102. header.len[DRequest::Section::ResultInfo] = 0;
  103. /* encode update info */
  104. header.len[DRequest::Section::UpdateInfo] = 0;
  105. /* encode condition info */
  106. header.len[DRequest::Section::ConditionInfo] = 0;
  107. /* full set */
  108. header.len[DRequest::Section::FieldSet] = 4;
  109. /* no result set */
  110. header.len[DRequest::Section::DTCResultSet] = 0;
  111. bytes = encode_header_v1(header);
  112. const int len = bytes;
  113. /* exist and large enough, use. else free and malloc */
  114. int total_len = sizeof(BufferChain) + sizeof(struct iovec) + len;
  115. if (buf == NULL) {
  116. buf = (BufferChain *)MALLOC(total_len);
  117. if (buf == NULL) {
  118. return -ENOMEM;
  119. }
  120. buf->totalBytes = total_len - sizeof(BufferChain);
  121. } else if (buf &&
  122. buf->totalBytes < (int)(total_len - sizeof(BufferChain))) {
  123. FREE_IF(buf);
  124. buf = (BufferChain *)MALLOC(total_len);
  125. if (buf == NULL) {
  126. return -ENOMEM;
  127. }
  128. buf->totalBytes = total_len - sizeof(BufferChain);
  129. }
  130. /* usedBtytes never used for Packet's buf */
  131. buf->nextBuffer = NULL;
  132. v = (struct iovec *)buf->data;
  133. nv = 1;
  134. char *p = buf->data + sizeof(struct iovec);
  135. v->iov_base = p;
  136. v->iov_len = len;
  137. memcpy(p, &header, sizeof(header));
  138. p += sizeof(header);
  139. p = encode_simple_section(p, vi, DField::None);
  140. p = encode_simple_section(p, ri, tdef->key_type());
  141. // encode field set
  142. memcpy(p, fs, 4);
  143. p += 4;
  144. if (p - (char *)v->iov_base != len)
  145. fprintf(stderr, "%s(%d): BAD ENCODER len=%ld must=%d\n",
  146. __FILE__, __LINE__, (long)(p - (char *)v->iov_base),
  147. len);
  148. return 0;
  149. }
  150. int Packet::encode_reload_config(const DTCTableDefinition *tdef, int sn)
  151. {
  152. DTC_HEADER_V1 header;
  153. header.version = 1;
  154. header.scts = 8;
  155. header.flags = DRequest::Flag::KeepAlive;
  156. header.cmd = DRequest::ReloadConfig;
  157. DTCVersionInfo vi;
  158. // tablename & hash
  159. vi.set_table_name(tdef->table_name());
  160. vi.set_table_hash(tdef->table_hash());
  161. vi.set_serial_nr(sn);
  162. // app version
  163. vi.set_tag(5, "dtcd");
  164. // lib version
  165. vi.set_tag(6, "ctlib-v" DTC_VERSION);
  166. vi.set_tag(9, tdef->field_type(0));
  167. DTCRequestInfo ri;
  168. // key
  169. ri.set_key(DTCValue::Make(0));
  170. // ri.set_timeout(30);
  171. // field set
  172. char fs[4] = { 1, 0, 0, char(0xFF) };
  173. /* calculate version info */
  174. header.len[DRequest::Section::VersionInfo] =
  175. encoded_bytes_simple_section(vi, DField::None);
  176. /* no table definition */
  177. header.len[DRequest::Section::table_definition] = 0;
  178. /* encode request info */
  179. header.len[DRequest::Section::RequestInfo] =
  180. encoded_bytes_simple_section(ri, tdef->key_type());
  181. /* no result info */
  182. header.len[DRequest::Section::ResultInfo] = 0;
  183. /* encode update info */
  184. header.len[DRequest::Section::UpdateInfo] = 0;
  185. /* encode condition info */
  186. header.len[DRequest::Section::ConditionInfo] = 0;
  187. /* full set */
  188. header.len[DRequest::Section::FieldSet] = 4;
  189. /* no result set */
  190. header.len[DRequest::Section::DTCResultSet] = 0;
  191. bytes = encode_header_v1(header);
  192. const int len = bytes;
  193. /* pool, exist and large enough, use. else free and malloc */
  194. int total_len = sizeof(BufferChain) + sizeof(struct iovec) + len;
  195. if (buf == NULL) {
  196. buf = (BufferChain *)MALLOC(total_len);
  197. if (buf == NULL) {
  198. return -ENOMEM;
  199. }
  200. buf->totalBytes = total_len - sizeof(BufferChain);
  201. } else if (buf &&
  202. buf->totalBytes < (int)(total_len - sizeof(BufferChain))) {
  203. FREE_IF(buf);
  204. buf = (BufferChain *)MALLOC(total_len);
  205. if (buf == NULL) {
  206. return -ENOMEM;
  207. }
  208. buf->totalBytes = total_len - sizeof(BufferChain);
  209. }
  210. /* usedBtytes never used for Packet's buf */
  211. buf->nextBuffer = NULL;
  212. v = (struct iovec *)buf->data;
  213. nv = 1;
  214. char *p = buf->data + sizeof(struct iovec);
  215. v->iov_base = p;
  216. v->iov_len = len;
  217. memcpy(p, &header, sizeof(header));
  218. p += sizeof(header);
  219. p = encode_simple_section(p, vi, DField::None);
  220. p = encode_simple_section(p, ri, tdef->key_type());
  221. // encode field set
  222. memcpy(p, fs, 4);
  223. p += 4;
  224. if (p - (char *)v->iov_base != len)
  225. fprintf(stderr, "%s(%d): BAD ENCODER len=%ld must=%d\n",
  226. __FILE__, __LINE__, (long)(p - (char *)v->iov_base),
  227. len);
  228. return 0;
  229. }
  230. static char *EncodeBinary(char *p, const char *src, int len)
  231. {
  232. if (len)
  233. memcpy(p, src, len);
  234. return p + len;
  235. }
  236. static char *EncodeBinary(char *p, const DTCBinary &b)
  237. {
  238. return EncodeBinary(p, b.ptr, b.len);
  239. }
  240. int Packet::encode_fetch_data(DTCJobOperation &job)
  241. {
  242. const DTCTableDefinition *tdef = job.table_definition();
  243. DTC_HEADER_V1 header;
  244. header.version = 1;
  245. header.scts = 8;
  246. header.flags = DRequest::Flag::KeepAlive;
  247. header.cmd = job.flag_fetch_data() ? DRequest::Get : job.request_code();
  248. // save & remove limit information
  249. uint32_t limitStart = job.requestInfo.limit_start();
  250. uint32_t limitCount = job.requestInfo.limit_count();
  251. if (job.request_code() != DRequest::Replicate) {
  252. job.requestInfo.set_limit_start(0);
  253. job.requestInfo.set_limit_count(0);
  254. }
  255. /* calculate version info */
  256. header.len[DRequest::Section::VersionInfo] =
  257. encoded_bytes_simple_section(job.versionInfo, DField::None);
  258. /* no table definition */
  259. header.len[DRequest::Section::table_definition] = 0;
  260. /* encode request info */
  261. header.len[DRequest::Section::RequestInfo] =
  262. encoded_bytes_simple_section(job.requestInfo, tdef->key_type());
  263. /* no result info */
  264. header.len[DRequest::Section::ResultInfo] = 0;
  265. /* no update info */
  266. header.len[DRequest::Section::UpdateInfo] = 0;
  267. /* encode condition info */
  268. header.len[DRequest::Section::ConditionInfo] = encoded_bytes_multi_key(
  269. job.multi_key_array(), job.table_definition());
  270. /* full set */
  271. header.len[DRequest::Section::FieldSet] =
  272. tdef->packed_field_set(job.flag_field_set_with_key()).len;
  273. /* no result set */
  274. header.len[DRequest::Section::DTCResultSet] = 0;
  275. bytes = encode_header_v1(header);
  276. const int len = bytes;
  277. /* pool, exist and large enough, use. else free and malloc */
  278. int total_len = sizeof(BufferChain) + sizeof(struct iovec) + len;
  279. if (buf == NULL) {
  280. buf = (BufferChain *)MALLOC(total_len);
  281. if (buf == NULL) {
  282. return -ENOMEM;
  283. }
  284. buf->totalBytes = total_len - sizeof(BufferChain);
  285. } else if (buf &&
  286. buf->totalBytes < total_len - (int)sizeof(BufferChain)) {
  287. FREE_IF(buf);
  288. buf = (BufferChain *)MALLOC(total_len);
  289. if (buf == NULL) {
  290. return -ENOMEM;
  291. }
  292. buf->totalBytes = total_len - sizeof(BufferChain);
  293. }
  294. buf->nextBuffer = NULL;
  295. v = (struct iovec *)buf->data;
  296. nv = 1;
  297. char *p = buf->data + sizeof(struct iovec);
  298. v->iov_base = p;
  299. v->iov_len = len;
  300. memcpy(p, &header, sizeof(header));
  301. p += sizeof(header);
  302. p = encode_simple_section(p, job.versionInfo, DField::None);
  303. p = encode_simple_section(p, job.requestInfo, tdef->key_type());
  304. // restore limit info
  305. job.requestInfo.set_limit_start(limitStart);
  306. job.requestInfo.set_limit_count(limitCount);
  307. p = encode_multi_key(p, job.multi_key_array(), job.table_definition());
  308. p = EncodeBinary(p,
  309. tdef->packed_field_set(job.flag_field_set_with_key()));
  310. if (p - (char *)v->iov_base != len)
  311. fprintf(stderr, "%s(%d): BAD ENCODER len=%ld must=%d\n",
  312. __FILE__, __LINE__, (long)(p - (char *)v->iov_base),
  313. len);
  314. return 0;
  315. }
  316. int Packet::encode_pass_thru(DtcJob &job)
  317. {
  318. const DTCTableDefinition *tdef = job.table_definition();
  319. DTC_HEADER_V1 header;
  320. header.version = 1;
  321. header.scts = 8;
  322. header.flags = DRequest::Flag::KeepAlive;
  323. header.cmd = job.request_code();
  324. /* calculate version info */
  325. header.len[DRequest::Section::VersionInfo] =
  326. encoded_bytes_simple_section(job.versionInfo, DField::None);
  327. /* no table definition */
  328. header.len[DRequest::Section::table_definition] = 0;
  329. /* encode request info */
  330. header.len[DRequest::Section::RequestInfo] =
  331. encoded_bytes_simple_section(job.requestInfo, tdef->key_type());
  332. /* no result info */
  333. header.len[DRequest::Section::ResultInfo] = 0;
  334. /* encode update info */
  335. header.len[DRequest::Section::UpdateInfo] =
  336. job.request_operation() ?
  337. encoded_bytes_field_value(*job.request_operation()) :
  338. 0;
  339. /* encode condition info */
  340. header.len[DRequest::Section::ConditionInfo] =
  341. job.request_condition() ?
  342. encoded_bytes_field_value(*job.request_condition()) :
  343. 0;
  344. /* full set */
  345. header.len[DRequest::Section::FieldSet] =
  346. job.request_fields() ?
  347. encoded_bytes_field_set(*job.request_fields()) :
  348. 0;
  349. /* no result set */
  350. header.len[DRequest::Section::DTCResultSet] = 0;
  351. bytes = encode_header_v1(header);
  352. const int len = bytes;
  353. /* pool, exist and large enough, use. else free and malloc */
  354. int total_len = sizeof(BufferChain) + sizeof(struct iovec) + len;
  355. if (buf == NULL) {
  356. buf = (BufferChain *)MALLOC(total_len);
  357. if (buf == NULL) {
  358. return -ENOMEM;
  359. }
  360. buf->totalBytes = total_len - sizeof(BufferChain);
  361. } else if (buf &&
  362. buf->totalBytes < total_len - (int)sizeof(BufferChain)) {
  363. FREE_IF(buf);
  364. buf = (BufferChain *)MALLOC(total_len);
  365. if (buf == NULL) {
  366. return -ENOMEM;
  367. }
  368. buf->totalBytes = total_len - sizeof(BufferChain);
  369. }
  370. buf->nextBuffer = NULL;
  371. v = (struct iovec *)buf->data;
  372. nv = 1;
  373. char *p = buf->data + sizeof(struct iovec);
  374. v->iov_base = p;
  375. v->iov_len = len;
  376. memcpy(p, &header, sizeof(header));
  377. p += sizeof(header);
  378. p = encode_simple_section(p, job.versionInfo, DField::None);
  379. p = encode_simple_section(p, job.requestInfo, tdef->key_type());
  380. if (job.request_operation())
  381. p = encode_field_value(p, *job.request_operation());
  382. if (job.request_condition())
  383. p = encode_field_value(p, *job.request_condition());
  384. if (job.request_fields())
  385. p = encode_field_set(p, *job.request_fields());
  386. if (p - (char *)v->iov_base != len)
  387. fprintf(stderr, "%s(%d): BAD ENCODER len=%ld must=%d\n",
  388. __FILE__, __LINE__, (long)(p - (char *)v->iov_base),
  389. len);
  390. return 0;
  391. }
  392. int Packet::encode_forward_request(DTCJobOperation &job)
  393. {
  394. if (job.flag_pass_thru())
  395. return encode_pass_thru(job);
  396. if (job.flag_fetch_data())
  397. return encode_fetch_data(job);
  398. if (job.request_code() == DRequest::Get ||
  399. job.request_code() == DRequest::Replicate)
  400. return encode_fetch_data(job);
  401. return encode_pass_thru(job);
  402. }
  403. int Packet::encode_result(DtcJob &job, int mtu, uint32_t ts)
  404. {
  405. const DTCTableDefinition *tdef = job.table_definition();
  406. // rp指向返回数据集
  407. ResultPacket *rp =
  408. job.result_code() >= 0 ? job.get_result_packet() : NULL;
  409. BufferChain *rb = NULL;
  410. int nrp = 0, lrp = 0, off = 0;
  411. if (mtu <= 0) {
  412. mtu = MAXPACKETSIZE;
  413. }
  414. /* rp may exist but no result */
  415. if (rp && (rp->numRows || rp->totalRows)) {
  416. //rb指向数据结果集缓冲区起始位置
  417. rb = rp->bc;
  418. if (rb)
  419. rb->Count(nrp, lrp);
  420. off = 5 - encoded_bytes_length(rp->numRows);
  421. encode_length(rb->data + off, rp->numRows);
  422. lrp -= off;
  423. job.resultInfo.set_total_rows(rp->totalRows);
  424. } else {
  425. if (rp && rp->totalRows == 0 && rp->bc) {
  426. FREE(rp->bc);
  427. rp->bc = NULL;
  428. }
  429. job.resultInfo.set_total_rows(0);
  430. if (job.result_code() == 0) {
  431. job.set_error(0, NULL, NULL);
  432. }
  433. //任务出现错误的时候,可能结果集里面还有值,此时需要将结果集的buffer释放掉
  434. else if (job.result_code() < 0) {
  435. ResultPacket *resultPacket = job.get_result_packet();
  436. if (resultPacket) {
  437. if (resultPacket->bc) {
  438. FREE(resultPacket->bc);
  439. resultPacket->bc = NULL;
  440. }
  441. }
  442. }
  443. }
  444. if (ts) {
  445. job.resultInfo.set_time_info(ts);
  446. }
  447. job.versionInfo.set_serial_nr(job.request_serial());
  448. job.versionInfo.set_tag(6, "ctlib-v" DTC_VERSION);
  449. if (job.result_key() == NULL && job.request_key() != NULL)
  450. job.set_result_key(*job.request_key());
  451. DTC_HEADER_V1 header;
  452. header.version = 1;
  453. header.scts = 8;
  454. header.flags = DRequest::Flag::KeepAlive | job.flag_multi_key_val();
  455. /* rp may exist but no result */
  456. header.cmd = (rp && (rp->numRows || rp->totalRows)) ?
  457. DRequest::DTCResultSet :
  458. DRequest::result_code;
  459. /* calculate version info */
  460. header.len[DRequest::Section::VersionInfo] =
  461. encoded_bytes_simple_section(job.versionInfo, DField::None);
  462. /* copy table definition */
  463. header.len[DRequest::Section::table_definition] =
  464. job.flag_table_definition() ? tdef->packed_definition().len : 0;
  465. /* no request info */
  466. header.len[DRequest::Section::RequestInfo] = 0;
  467. /* calculate result info */
  468. header.len[DRequest::Section::ResultInfo] =
  469. encoded_bytes_simple_section(job.resultInfo,
  470. tdef->field_type(0));
  471. /* no update info */
  472. header.len[DRequest::Section::UpdateInfo] = 0;
  473. /* no condition info */
  474. header.len[DRequest::Section::ConditionInfo] = 0;
  475. /* no field set */
  476. header.len[DRequest::Section::FieldSet] = 0;
  477. /* copy result set */
  478. header.len[DRequest::Section::DTCResultSet] = lrp;
  479. bytes = encode_header_v1(header);
  480. if (bytes > mtu) {
  481. /* clear result set */
  482. nrp = 0;
  483. lrp = 0;
  484. rb = NULL;
  485. rp = NULL;
  486. /* set message size error */
  487. job.set_error(
  488. -EMSGSIZE, "encode_result",
  489. "encoded result exceed the maximum network packet size");
  490. /* re-encode resultinfo */
  491. header.len[DRequest::Section::ResultInfo] =
  492. encoded_bytes_simple_section(job.resultInfo,
  493. tdef->field_type(0));
  494. header.cmd = DRequest::result_code;
  495. header.len[DRequest::Section::DTCResultSet] = 0;
  496. /* FIXME: only work in LITTLE ENDIAN machine */
  497. bytes = encode_header_v1(header);
  498. }
  499. //non-result packet len
  500. const int len = bytes - lrp;
  501. /* pool, exist and large enough, use. else free and malloc */
  502. int total_len =
  503. sizeof(BufferChain) + sizeof(struct iovec) * (nrp + 1) + len;
  504. if (buf == NULL) {
  505. buf = (BufferChain *)MALLOC(total_len);
  506. if (buf == NULL) {
  507. return -ENOMEM;
  508. }
  509. buf->totalBytes = total_len - sizeof(BufferChain);
  510. } else if (buf &&
  511. buf->totalBytes < total_len - (int)sizeof(BufferChain)) {
  512. FREE_IF(buf);
  513. buf = (BufferChain *)MALLOC(total_len);
  514. if (buf == NULL) {
  515. return -ENOMEM;
  516. }
  517. buf->totalBytes = total_len - sizeof(BufferChain);
  518. }
  519. //发送实际数据集
  520. buf->nextBuffer = nrp ? rb : NULL;
  521. v = (struct iovec *)buf->data;
  522. char *p = buf->data + sizeof(struct iovec) * (nrp + 1);
  523. v->iov_base = p;
  524. v->iov_len = len;
  525. nv = nrp + 1;
  526. for (int i = 1; i <= nrp; i++, rb = rb->nextBuffer) {
  527. v[i].iov_base = rb->data + off;
  528. v[i].iov_len = rb->usedBytes - off;
  529. off = 0;
  530. }
  531. memcpy(p, &header, sizeof(header));
  532. p += sizeof(header);
  533. p = encode_simple_section(p, job.versionInfo, DField::None);
  534. if (job.flag_table_definition())
  535. p = EncodeBinary(p, tdef->packed_definition());
  536. p = encode_simple_section(p, job.resultInfo, tdef->field_type(0));
  537. if (p - (char *)v->iov_base != len)
  538. fprintf(stderr, "%s(%d): BAD ENCODER len=%ld must=%d\n",
  539. __FILE__, __LINE__, (long)(p - (char *)v->iov_base),
  540. len);
  541. return 0;
  542. }
  543. void encode_mysql_header(BufferChain *r, int len, uint8_t pkt_num)
  544. {
  545. //Packet Lenght + Packet Number
  546. char t[3];
  547. int_conv_3(t, len);
  548. memcpy(r->data, t, 3);
  549. *(r->data + 3) = pkt_num;
  550. }
  551. int encode_my_fileds_info(BufferChain **bc, uint8_t& pkt_num, uint8_t fields_num)
  552. {
  553. int packet_len = sizeof(BufferChain) + sizeof(MYSQL_HEADER_SIZE) +
  554. sizeof(fields_num);
  555. *bc = (BufferChain *)MALLOC(packet_len);
  556. BufferChain *r = *bc;
  557. if (r == NULL) {
  558. return -ENOMEM;
  559. }
  560. r->totalBytes = packet_len - sizeof(BufferChain);
  561. encode_mysql_header(r, 1, pkt_num++);
  562. *(r->data + sizeof(MYSQL_HEADER_SIZE)) = fields_num;
  563. r->usedBytes = 5;
  564. r->nextBuffer = NULL;
  565. return 0;
  566. }
  567. struct my_result_set_field {
  568. std::string catalog;
  569. std::string database;
  570. std::string table;
  571. std::string original_table;
  572. std::string name;
  573. std::string original_name;
  574. uint16_t charset_number;
  575. uint32_t length;
  576. uchar type;
  577. uint16_t flags;
  578. uchar decimals;
  579. uint16_t reverse;
  580. };
  581. int encode_set_field(char *buf, my_result_set_field *sf)
  582. {
  583. int len = 0;
  584. char *p = buf;
  585. len = sf->catalog.length();
  586. *p++ = (uint8_t)len;
  587. if (len > 0) {
  588. memcpy(p, sf->catalog.c_str(), len);
  589. p += len;
  590. }
  591. len = sf->database.length();
  592. *p++ = (uint8_t)len;
  593. if (len > 0) {
  594. memcpy(p, sf->database.c_str(), len);
  595. p += len;
  596. }
  597. len = sf->table.length();
  598. *p++ = (uint8_t)len;
  599. if (len > 0) {
  600. memcpy(p, sf->table.c_str(), len);
  601. p += len;
  602. }
  603. len = sf->original_table.length();
  604. *p++ = (uint8_t)len;
  605. if (len > 0) {
  606. memcpy(p, sf->original_table.c_str(), len);
  607. p += len;
  608. }
  609. len = sf->name.length();
  610. *p++ = (uint8_t)len;
  611. if (len > 0) {
  612. memcpy(p, sf->name.c_str(), len);
  613. p += len;
  614. }
  615. len = sf->original_name.length();
  616. *p++ = (uint8_t)len;
  617. if (len > 0) {
  618. memcpy(p, sf->original_name.c_str(), len);
  619. p += len;
  620. }
  621. //charset number
  622. *p++ = 0x0c;
  623. int2store_big_endian(p, sf->charset_number);
  624. p += sizeof(sf->charset_number);
  625. //length
  626. int4store_big_endian(p, sf->length);
  627. p += sizeof(sf->length);
  628. //type
  629. *p = sf->type;
  630. p += sizeof(sf->type);
  631. //flags
  632. int2store_big_endian(p, sf->flags);
  633. p += sizeof(sf->flags);
  634. //decimals
  635. *p = sf->decimals;
  636. p += sizeof(sf->decimals);
  637. //reverse
  638. *p = sf->reverse;
  639. p += sizeof(sf->reverse);
  640. return p - buf;
  641. }
  642. int calc_field_def(my_result_set_field *sf)
  643. {
  644. int len = 0;
  645. len++;
  646. len += sf->catalog.length();
  647. len++;
  648. len += sf->database.length();
  649. len++;
  650. len += sf->table.length();
  651. len++;
  652. len += sf->original_table.length();
  653. len++;
  654. len += sf->name.length();
  655. len++;
  656. len += sf->original_name.length();
  657. //charset number
  658. len++; //0x0c
  659. len += sizeof(sf->charset_number);
  660. //length
  661. len += sizeof(sf->length);
  662. //type
  663. len += sizeof(sf->type);
  664. //flag
  665. len += sizeof(sf->flags);
  666. //decimals
  667. len += sizeof(sf->decimals);
  668. //reverse
  669. len += sizeof(sf->reverse);
  670. log4cplus_debug("sf len:%d", len);
  671. return len;
  672. }
  673. int build_field_type(int type)
  674. {
  675. switch (type) {
  676. case DField::Signed:
  677. return MYSQL_TYPE_LONG;
  678. case DField::Unsigned:
  679. return MYSQL_TYPE_LONG;
  680. case DField::Float:
  681. return MYSQL_TYPE_FLOAT;
  682. case DField::String:
  683. return MYSQL_TYPE_VAR_STRING;
  684. case DField::Binary:
  685. return MYSQL_TYPE_VAR_STRING;
  686. }
  687. }
  688. uint16_t build_charset(int type)
  689. {
  690. switch (type) {
  691. case MYSQL_TYPE_VAR_STRING:
  692. return 0xff; //utf8mb4
  693. break;
  694. case MYSQL_TYPE_LONG:
  695. case MYSQL_TYPE_FLOAT:
  696. default:
  697. return 0x3f; //binary COLLATE binary
  698. }
  699. }
  700. uint16_t build_length(int type)
  701. {
  702. switch (type) {
  703. case MYSQL_TYPE_VAR_STRING:
  704. return 200;
  705. break;
  706. case MYSQL_TYPE_LONG:
  707. case MYSQL_TYPE_FLOAT:
  708. default:
  709. return 11;
  710. }
  711. }
  712. BufferChain *encode_field_def(DtcJob *job, BufferChain *bc, uint8_t& pkt_num)
  713. {
  714. const DTCTableDefinition *tdef = job->table_definition();
  715. BufferChain *nbc = bc;
  716. BufferChain *r = NULL;
  717. std::vector<std::string> need = job->mr.get_need_array();
  718. for (int i = 0; i < need.size(); i++) {
  719. my_result_set_field sf;
  720. sf.type = build_field_type(
  721. job->field_type(job->field_id(need[i].c_str())));
  722. sf.charset_number = build_charset(sf.type);
  723. sf.database = "dtc";
  724. sf.length = build_length(sf.type);
  725. sf.catalog = "def";
  726. sf.table = job->table_name();
  727. sf.original_table = job->table_name();
  728. sf.name = need[i];
  729. sf.original_name = need[i];
  730. sf.decimals = 0x00;
  731. sf.flags = 0x0;
  732. sf.reverse = 0x0000;
  733. int packet_len = sizeof(BufferChain) + calc_field_def(&sf) +
  734. sizeof(MYSQL_HEADER_SIZE);
  735. r = (BufferChain *)MALLOC(packet_len);
  736. if (r == NULL) {
  737. return NULL;
  738. }
  739. memset(r, 0, packet_len);
  740. r->totalBytes = packet_len - sizeof(BufferChain);
  741. int set_len = encode_set_field(
  742. r->data + sizeof(MYSQL_HEADER_SIZE), &sf);
  743. log4cplus_debug("set_len:%d", set_len);
  744. r->usedBytes = sizeof(MYSQL_HEADER_SIZE) + set_len;
  745. r->nextBuffer = NULL;
  746. encode_mysql_header(r, set_len, pkt_num++);
  747. nbc->nextBuffer = r;
  748. nbc = nbc->nextBuffer;
  749. }
  750. return nbc;
  751. }
  752. #pragma pack(1)
  753. struct my_result_set_eof {
  754. uchar eof;
  755. uint16_t warning;
  756. uint16_t server_status;
  757. uint16_t reverse;
  758. };
  759. #pragma pack()
  760. BufferChain *encode_eof(BufferChain *bc, uint8_t pkt_nr)
  761. {
  762. BufferChain *nbc = bc;
  763. my_result_set_eof eof;
  764. eof.eof = 0xfe;
  765. eof.warning = 0;
  766. eof.server_status = 0x0022;
  767. eof.reverse = 0;
  768. int packet_len =
  769. sizeof(BufferChain) + sizeof(eof) + sizeof(MYSQL_HEADER_SIZE);
  770. BufferChain *r = (BufferChain *)MALLOC(packet_len);
  771. if (r == NULL) {
  772. return NULL;
  773. }
  774. r->totalBytes = packet_len - sizeof(BufferChain);
  775. memcpy(r->data + sizeof(MYSQL_HEADER_SIZE), &eof, sizeof(eof));
  776. r->usedBytes = sizeof(MYSQL_HEADER_SIZE) + sizeof(eof);
  777. r->nextBuffer = NULL;
  778. encode_mysql_header(r, sizeof(eof), pkt_nr);
  779. nbc->nextBuffer = r;
  780. nbc = nbc->nextBuffer;
  781. return nbc;
  782. }
  783. BufferChain *encode_row_data(DtcJob *job, BufferChain *bc, uint8_t &pkt_nr)
  784. {
  785. ResultSet *pstResultSet = job->result;
  786. int count = 0;
  787. BufferChain *nbc = bc;
  788. std::vector<std::string> result_field = job->mr.get_need_array();
  789. const DTCTableDefinition *tdef = job->table_definition();
  790. if (pstResultSet == NULL)
  791. return NULL;
  792. for (int i = 0; i < pstResultSet->total_rows(); i++) {
  793. char buf[32] = { 0 };
  794. RowValue *pstRow = pstResultSet->_fetch_row();
  795. if (pstRow == NULL) {
  796. log4cplus_info("%s!", "call FetchRow func error");
  797. continue;
  798. }
  799. //calc current row len
  800. int row_len = 0;
  801. for (int j = 0; j < result_field.size(); j++) {
  802. int id = tdef->field_id(result_field[j].c_str());
  803. DTCValue* v;
  804. if (0 == id) {
  805. v = job->request_key();
  806. } else {
  807. v = pstRow->field_value(id);
  808. }
  809. int field_type = pstRow->field_type(id);
  810. switch (field_type) {
  811. case DField::Signed: {
  812. row_len++; //first byte for result len
  813. snprintf(buf, sizeof(buf), "%lld",
  814. (long long)v->s64);
  815. row_len += strlen(buf);
  816. break;
  817. }
  818. case DField::Unsigned: {
  819. row_len++; //first byte for result len
  820. snprintf(buf, sizeof(buf), "%llu",
  821. (unsigned long long)v->u64);
  822. row_len += strlen(buf);
  823. break;
  824. }
  825. case DField::Float: {
  826. row_len++; //first byte for result len
  827. snprintf(buf, sizeof(buf), "%f", v->flt);
  828. row_len += strlen(buf);
  829. break;
  830. }
  831. case DField::String:
  832. case DField::Binary: {
  833. row_len++;
  834. row_len += v->str.len;
  835. break;
  836. }
  837. default:
  838. break;
  839. }
  840. }
  841. //alloc new buffer to store row data.
  842. int packet_len = sizeof(BufferChain) +
  843. sizeof(MYSQL_HEADER_SIZE) + row_len;
  844. BufferChain *nbuff = (BufferChain *)MALLOC(packet_len);
  845. if (nbuff == NULL) {
  846. return NULL;
  847. }
  848. nbuff->totalBytes = packet_len - sizeof(BufferChain);
  849. nbuff->usedBytes = sizeof(MYSQL_HEADER_SIZE) + row_len;
  850. nbuff->nextBuffer = NULL;
  851. char *r = nbuff->data;
  852. encode_mysql_header(nbuff, row_len, pkt_nr++);
  853. int offset = 0;
  854. offset += sizeof(MYSQL_HEADER_SIZE);
  855. //copy fields content
  856. for (int j = 0; j < result_field.size(); j++) {
  857. int id = tdef->field_id(result_field[j].c_str());
  858. DTCValue* v;
  859. if (0 == id) {
  860. v = job->request_key();
  861. } else {
  862. v = pstRow->field_value(id);
  863. }
  864. int field_type = pstRow->field_type(id);
  865. int num_len = 0;
  866. switch (field_type) {
  867. case DField::Signed: {
  868. snprintf(buf, sizeof(buf), "%lld",
  869. (long long)v->s64);
  870. num_len = strlen(buf);
  871. *(r + offset) = (uint8_t)num_len;
  872. offset++;
  873. memcpy(r + offset, buf, num_len);
  874. offset += num_len;
  875. break;
  876. }
  877. case DField::Unsigned: {
  878. snprintf(buf, sizeof(buf), "%llu",
  879. (unsigned long long)v->u64);
  880. num_len = strlen(buf);
  881. *(r + offset) = (uint8_t)strlen(buf);
  882. offset++;
  883. memcpy(r + offset, buf, num_len);
  884. offset += num_len;
  885. break;
  886. }
  887. case DField::Float: {
  888. snprintf(buf, sizeof(buf), "%f", v->flt);
  889. num_len = strlen(buf);
  890. *(r + offset) = (uint8_t)strlen(buf);
  891. offset++;
  892. memcpy(r + offset, buf, num_len);
  893. offset += num_len;
  894. break;
  895. }
  896. case DField::String: {
  897. *(r + offset) = (uint8_t)v->str.len;
  898. offset++;
  899. memcpy(r + offset, v->str.ptr, v->str.len);
  900. offset += v->str.len;
  901. break;
  902. }
  903. case DField::Binary: {
  904. *(r + offset) = (uint8_t)v->bin.len;
  905. offset++;
  906. memcpy(r + offset, v->bin.ptr, v->bin.len);
  907. offset += v->bin.len;
  908. break;
  909. }
  910. default:
  911. break;
  912. }
  913. }
  914. nbc->nextBuffer = nbuff;
  915. nbc = nbc->nextBuffer;
  916. }
  917. return nbc;
  918. }
  919. BufferChain *Packet::encode_mysql_ok(DtcJob *job, int affected_rows)
  920. {
  921. BufferChain *bc = NULL;
  922. BufferChain *pos = NULL;
  923. uint8_t buf[7] = {0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00};
  924. int2store_big_endian(buf+ 1, affected_rows);
  925. uint8_t pkt_nr = job->mr.get_pkt_nr();
  926. pkt_nr++;
  927. int packet_len = sizeof(BufferChain) + sizeof(MYSQL_HEADER_SIZE) +
  928. sizeof(buf);
  929. bc = (BufferChain *)MALLOC(packet_len);
  930. BufferChain *r = bc;
  931. if (r == NULL) {
  932. return -ENOMEM;
  933. }
  934. r->totalBytes = packet_len - sizeof(BufferChain);
  935. encode_mysql_header(r, sizeof(buf), pkt_nr);
  936. memcpy(r->data + sizeof(MYSQL_HEADER_SIZE), buf, sizeof(buf));
  937. r->usedBytes = sizeof(buf) + sizeof(MYSQL_HEADER_SIZE);
  938. r->nextBuffer = NULL;
  939. return bc;
  940. }
  941. BufferChain *Packet::encode_mysql_protocol(DtcJob *job)
  942. {
  943. BufferChain *bc = NULL;
  944. BufferChain *pos = NULL;
  945. uint8_t pkt_nr = job->mr.get_pkt_nr();
  946. pkt_nr++;
  947. int ret = encode_my_fileds_info(&bc, pkt_nr,
  948. job->mr.get_need_num_fields());
  949. if (ret < 0)
  950. return NULL;
  951. pos = encode_field_def(job, bc, pkt_nr);
  952. if (!pos)
  953. return NULL;
  954. //Different MYSQL Version.
  955. //pos = encode_eof(pos, ++pkt_nr);
  956. //if (!pos)
  957. // return NULL;
  958. BufferChain *prow = encode_row_data(job, pos, pkt_nr);
  959. if (prow) {
  960. pos = prow;
  961. }
  962. pos = encode_eof(pos, pkt_nr);
  963. if (!pos)
  964. return NULL;
  965. return bc;
  966. }
  967. int net_send_ok(int affectedRow)
  968. {
  969. uint8_t buf[100] = { 0x00, (uint8_t)affectedRow, 0x00, 0x02, 0x00, 0x00,
  970. 0x00 };
  971. }
  972. int is_desc_tables(DtcJob *job , char*& p_filepath)
  973. {
  974. std::string sql = job->mr.get_sql();
  975. if (sql.empty()) {
  976. sql = std::string(job->mr.raw , job->mr.raw_len);
  977. }
  978. log4cplus_debug("req sql:%s" , sql.c_str());
  979. uint32_t ui_size = (sizeof(meta_selections) / sizeof(MetaSelections));
  980. for (int i = 0; i < ui_size; i++) {
  981. if (sql == string(meta_selections[i].p_req_string)) {
  982. p_filepath = meta_selections[i].p_val;
  983. return meta_selections[i].i_select_type;
  984. }
  985. }
  986. return E_SELECT_NONE;
  987. }
  988. int Packet::greeting_result()
  989. {
  990. log4cplus_debug("greeting_result entry.");
  991. uint8_t greeting_info[78] = {0x4a, 0x00, 0x00, 0x00, 0x0a, 0x38, 0x2e, 0x30, 0x2e, 0x32, 0x36, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x6f, 0x3c, 0x36, 0x36, 0x03, 0x68, 0x38, 0x46, 0x00, 0xff, 0xf7, 0xff, 0x02, 0x00, 0xff, 0x8f, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x6a, 0x2a, 0x0a, 0x60, 0x5c, 0x68, 0x50, 0x34, 0x6b, 0x0e, 0x27, 0x73, 0x00, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x5f, 0x6e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x00};
  992. nv = 1;
  993. int content_len = 78;
  994. int packet_len = sizeof(BufferChain) + sizeof(struct iovec) +
  995. content_len + 2;
  996. if (buf == NULL) {
  997. buf = (BufferChain *)MALLOC(packet_len);
  998. if (buf == NULL) {
  999. return -ENOMEM;
  1000. }
  1001. buf->totalBytes = packet_len - sizeof(BufferChain);
  1002. buf->nextBuffer = NULL;
  1003. } else if (buf &&
  1004. packet_len - (int)sizeof(BufferChain) > buf->totalBytes) {
  1005. FREE_IF(buf);
  1006. buf = (BufferChain *)MALLOC(packet_len);
  1007. if (buf == NULL) {
  1008. return -ENOMEM;
  1009. }
  1010. buf->totalBytes = packet_len - sizeof(BufferChain);
  1011. buf->nextBuffer = NULL;
  1012. }
  1013. char *p = buf->data + sizeof(struct iovec);
  1014. v = (struct iovec *)buf->data;
  1015. v->iov_base = p;
  1016. v->iov_len = content_len;
  1017. memcpy(p, greeting_info, content_len);
  1018. log4cplus_debug("greeting_result leave.");
  1019. return 0;
  1020. }
  1021. int Packet::desc_tables_result(DtcJob *job)
  1022. {
  1023. log4cplus_debug("desc_tables_result entry.");
  1024. const DTCTableDefinition *tdef = job->table_definition();
  1025. DTC_HEADER_V2 header = { 0 };
  1026. nv = 1;
  1027. int content_len = strlen(tdef->field_name(0));
  1028. int packet_len = sizeof(BufferChain) + sizeof(struct iovec) +
  1029. sizeof(header) + content_len + 2;
  1030. header.version = 2;
  1031. header.id = job->request_peerid();
  1032. header.packet_len = packet_len;
  1033. header.admin = CMD_KEY_DEFINE;
  1034. if (buf == NULL) {
  1035. buf = (BufferChain *)MALLOC(packet_len);
  1036. if (buf == NULL) {
  1037. return -ENOMEM;
  1038. }
  1039. buf->totalBytes = packet_len - sizeof(BufferChain);
  1040. buf->nextBuffer = NULL;
  1041. } else if (buf &&
  1042. packet_len - (int)sizeof(BufferChain) > buf->totalBytes) {
  1043. FREE_IF(buf);
  1044. buf = (BufferChain *)MALLOC(packet_len);
  1045. if (buf == NULL) {
  1046. return -ENOMEM;
  1047. }
  1048. buf->totalBytes = packet_len - sizeof(BufferChain);
  1049. buf->nextBuffer = NULL;
  1050. }
  1051. char *p = buf->data + sizeof(struct iovec);
  1052. v = (struct iovec *)buf->data;
  1053. v->iov_base = p;
  1054. v->iov_len = sizeof(header) + content_len + 2;
  1055. memcpy(p, &header, sizeof(header));
  1056. p += sizeof(header);
  1057. *p = (uint8_t)tdef->field_type(0);
  1058. p++;
  1059. *p = (uint8_t)content_len;
  1060. p++;
  1061. memcpy(p, tdef->field_name(0), content_len);
  1062. log4cplus_debug("desc_tables_result leave.");
  1063. return 0;
  1064. }
  1065. int Packet::yaml_config_result(DtcJob *job , const char* p_filename)
  1066. {
  1067. log4cplus_debug("yaml_config_result entry.");
  1068. nv = 1;
  1069. char* p_buf = NULL;
  1070. int i_len = 0;
  1071. int i_ret = load_table(p_filename , p_buf , i_len);
  1072. if (p_buf != NULL) {
  1073. log4cplus_debug("p_filename:%s , buflen:%d" , p_filename , i_len);
  1074. }
  1075. if (i_ret != 0) { return -EFAULT; }
  1076. int send_len = sizeof(DTC_HEADER_V2) + i_len;
  1077. int packet_len = sizeof(BufferChain) + sizeof(iovec) + send_len;
  1078. DTC_HEADER_V2 header = { 0 };
  1079. header.version = 2;
  1080. header.id = job->request_peerid();
  1081. header.packet_len = send_len;
  1082. header.admin = CMD_KEY_DEFINE;
  1083. if (buf == NULL) {
  1084. buf = (BufferChain *)MALLOC(packet_len);
  1085. if (buf == NULL) {
  1086. return -ENOMEM;
  1087. }
  1088. buf->totalBytes = packet_len - sizeof(BufferChain);
  1089. buf->nextBuffer = NULL;
  1090. } else if (buf &&
  1091. packet_len - (int)sizeof(BufferChain) > buf->totalBytes) {
  1092. FREE_IF(buf);
  1093. buf = (BufferChain *)MALLOC(packet_len);
  1094. if (buf == NULL) {
  1095. return -ENOMEM;
  1096. }
  1097. buf->totalBytes = packet_len - sizeof(BufferChain);
  1098. buf->nextBuffer = NULL;
  1099. }
  1100. char *p = buf->data + sizeof(struct iovec);
  1101. v = (struct iovec *)buf->data;
  1102. v->iov_base = p;
  1103. v->iov_len = send_len;
  1104. memcpy(p, &header, sizeof(header));
  1105. p += sizeof(header);
  1106. memcpy(p, p_buf, i_len);
  1107. FREE_CLEAR(p_buf);
  1108. log4cplus_debug("yaml_config_result leave.");
  1109. return 0;
  1110. };
  1111. int Packet::load_table(const char* p_filename, char*& file , int& i_length)
  1112. {
  1113. int fd = -1;
  1114. if (!p_filename
  1115. || p_filename[0] == '\0'
  1116. || (fd = open(p_filename, O_RDONLY)) < 0) {
  1117. log4cplus_error("open config file error");
  1118. return -1;
  1119. }
  1120. printf("open file:%s\n", p_filename);
  1121. lseek(fd, 0L, SEEK_SET);
  1122. i_length = lseek(fd, 0L, SEEK_END);
  1123. lseek(fd, 0L, SEEK_SET);
  1124. // Attention: memory init here ,need release outside
  1125. file = (char *)MALLOC(i_length + 1);
  1126. int readlen = read(fd, file, i_length);
  1127. if (readlen < 0 || readlen == 0)
  1128. return -1;
  1129. file[i_length] = '\0';
  1130. close(fd);
  1131. i_length++; // add finish flag length
  1132. log4cplus_debug("read file to buf, len: %d", i_length);
  1133. return 0;
  1134. };
  1135. int Packet::encode_result_v2(DtcJob &job, int mtu, uint32_t ts)
  1136. {
  1137. log4cplus_debug("encode_result_v2 entry.");
  1138. const DTCTableDefinition *tdef = job.table_definition();
  1139. char* p_file_path = NULL;
  1140. switch (is_desc_tables(&job , p_file_path))
  1141. {
  1142. case E_SELECT_DTC_TABLES:
  1143. {
  1144. return desc_tables_result(&job);
  1145. }
  1146. break;
  1147. case E_SELECT_DTC_YAML:
  1148. {
  1149. return yaml_config_result(&job , p_file_path);
  1150. }
  1151. break;
  1152. case E_SELECT_TABLE_YAML:
  1153. {
  1154. return yaml_config_result(&job , p_file_path);
  1155. }
  1156. break;
  1157. case E_SELECT_NONE:
  1158. default:
  1159. break;
  1160. }
  1161. // if (is_desc_tables(&job)) {
  1162. // return desc_tables_result(&job);
  1163. // }
  1164. // rp指向返回数据集
  1165. ResultPacket *rp =
  1166. job.result_code() >= 0 ? job.get_result_packet() : NULL;
  1167. log4cplus_info("result code:%d" , job.result_code());
  1168. BufferChain *rb = NULL;
  1169. int nrp = 0, lrp = 0, off = 0;
  1170. bool bok = false;
  1171. if (mtu <= 0) {
  1172. mtu = MAXPACKETSIZE;
  1173. }
  1174. /* rp may exist but no result */
  1175. if (rp && (rp->numRows || rp->totalRows)) {
  1176. //rb指向数据结果集缓冲区起始位置
  1177. log4cplus_info("line:%d" ,__LINE__);
  1178. rb = rp->bc;
  1179. if (rb)
  1180. rb->Count(nrp, lrp);
  1181. off = 5 - encoded_bytes_length(rp->numRows);
  1182. encode_length(rb->data + off, rp->numRows);
  1183. lrp -= off;
  1184. job.resultInfo.set_total_rows(rp->totalRows);
  1185. } else {
  1186. log4cplus_info("line:%d" ,__LINE__);
  1187. nrp = 1;
  1188. bok = true;
  1189. if (rp && rp->totalRows == 0 && rp->bc) {
  1190. FREE(rp->bc);
  1191. rp->bc = NULL;
  1192. }
  1193. job.resultInfo.set_total_rows(0);
  1194. if (job.result_code() == 0) {
  1195. job.set_error(0, NULL, NULL);
  1196. }
  1197. //任务出现错误的时候,可能结果集里面还有值,此时需要将结果集的buffer释放掉
  1198. else if (job.result_code() < 0) {
  1199. ResultPacket *resultPacket = job.get_result_packet();
  1200. if (resultPacket) {
  1201. if (resultPacket->bc) {
  1202. FREE(resultPacket->bc);
  1203. resultPacket->bc = NULL;
  1204. }
  1205. }
  1206. }
  1207. }
  1208. if (ts) {
  1209. job.resultInfo.set_time_info(ts);
  1210. }
  1211. job.versionInfo.set_serial_nr(job.request_peerid() + 1);
  1212. if (job.result_key() == NULL && job.request_key() != NULL)
  1213. job.set_result_key(*job.request_key());
  1214. //转换内容包
  1215. int err = job.decode_result_set(rb->data + off, lrp);
  1216. if (err) {
  1217. log4cplus_debug("decode result null: %d", err);
  1218. } else {
  1219. log4cplus_debug("decode_result_set success");
  1220. }
  1221. DTC_HEADER_V2 dtc_header = { 0 };
  1222. dtc_header.version = 2;
  1223. dtc_header.id = job.request_peerid();
  1224. log4cplus_info("dtc_header.id:%d , job.request_serial():%d" , dtc_header.id , job.request_peerid());
  1225. dtc_header.packet_len = 0;
  1226. dtc_header.admin = CMD_NOP;
  1227. if(bok == false)
  1228. {
  1229. nrp = 1 /*fields count info*/ +
  1230. job.mr.get_need_array().size() /*fields def*/ + 0 /*eof*/ +
  1231. (job.result ? job.result->total_rows() : 0) /*row data*/ +
  1232. 1 /*eof*/;
  1233. }
  1234. log4cplus_info("line:%d" ,__LINE__);
  1235. /* pool, exist and large enough, use. else free and malloc */
  1236. int first_packet_len = sizeof(BufferChain) +
  1237. sizeof(struct iovec) * (nrp + 1) +
  1238. sizeof(dtc_header);
  1239. if (buf == NULL) {
  1240. buf = (BufferChain *)MALLOC(first_packet_len);
  1241. if (buf == NULL) {
  1242. return -ENOMEM;
  1243. }
  1244. buf->totalBytes = first_packet_len - sizeof(BufferChain);
  1245. buf->nextBuffer = NULL;
  1246. } else if (buf && first_packet_len - (int)sizeof(BufferChain) >
  1247. buf->totalBytes) {
  1248. FREE_IF(buf);
  1249. buf = (BufferChain *)MALLOC(first_packet_len);
  1250. if (buf == NULL) {
  1251. return -ENOMEM;
  1252. }
  1253. buf->totalBytes = first_packet_len - sizeof(BufferChain);
  1254. buf->nextBuffer = NULL;
  1255. }
  1256. //设置要发送的第一个包
  1257. char *p = buf->data + sizeof(struct iovec) * (nrp + 1);
  1258. v = (struct iovec *)buf->data;
  1259. v->iov_base = p;
  1260. v->iov_len = sizeof(dtc_header);
  1261. nv = nrp + 1;
  1262. buf->usedBytes = sizeof(struct iovec) * (nrp + 1) + sizeof(dtc_header);
  1263. //修改第一个包的内容
  1264. memcpy(p, &dtc_header, sizeof(dtc_header));
  1265. p += sizeof(dtc_header);
  1266. if (p - (char *)v->iov_base != sizeof(dtc_header))
  1267. fprintf(stderr, "%s(%d): BAD ENCODER len=%ld must=%d\n",
  1268. __FILE__, __LINE__, (long)(p - (char *)v->iov_base),
  1269. sizeof(dtc_header));
  1270. rb = NULL;
  1271. if(bok == true)
  1272. rb = encode_mysql_ok(&job, job.resultInfo.affected_rows());
  1273. else
  1274. rb = encode_mysql_protocol(&job);
  1275. if (!rb)
  1276. return -3;
  1277. buf->nextBuffer = rb;
  1278. for (int i = 1; i <= nrp; i++, rb = rb->nextBuffer) {
  1279. v[i].iov_base = rb->data;
  1280. v[i].iov_len = rb->usedBytes;
  1281. }
  1282. log4cplus_debug("encode_result_v2 leave.");
  1283. return 0;
  1284. }
  1285. int Packet::encode_result_mysql(DtcJob &job, int mtu, uint32_t ts)
  1286. {
  1287. log4cplus_debug("encode_result_mysql entry.");
  1288. const DTCTableDefinition *tdef = job.table_definition();
  1289. // rp指向返回数据集
  1290. ResultPacket *rp =
  1291. job.result_code() >= 0 ? job.get_result_packet() : NULL;
  1292. log4cplus_info("result code:%d" , job.result_code());
  1293. BufferChain *rb = NULL;
  1294. int nrp = 0, lrp = 0, off = 0;
  1295. bool bok = false;
  1296. if (mtu <= 0) {
  1297. mtu = MAXPACKETSIZE;
  1298. }
  1299. /* rp may exist but no result */
  1300. if (rp && (rp->numRows || rp->totalRows)) {
  1301. //rb指向数据结果集缓冲区起始位置
  1302. log4cplus_info("line:%d" ,__LINE__);
  1303. rb = rp->bc;
  1304. if (rb)
  1305. rb->Count(nrp, lrp);
  1306. off = 5 - encoded_bytes_length(rp->numRows);
  1307. encode_length(rb->data + off, rp->numRows);
  1308. lrp -= off;
  1309. job.resultInfo.set_total_rows(rp->totalRows);
  1310. } else {
  1311. log4cplus_info("line:%d" ,__LINE__);
  1312. nrp = 1;
  1313. bok = true;
  1314. if (rp && rp->totalRows == 0 && rp->bc) {
  1315. FREE(rp->bc);
  1316. rp->bc = NULL;
  1317. }
  1318. job.resultInfo.set_total_rows(0);
  1319. if (job.result_code() == 0) {
  1320. job.set_error(0, NULL, NULL);
  1321. }
  1322. //任务出现错误的时候,可能结果集里面还有值,此时需要将结果集的buffer释放掉
  1323. else if (job.result_code() < 0) {
  1324. ResultPacket *resultPacket = job.get_result_packet();
  1325. if (resultPacket) {
  1326. if (resultPacket->bc) {
  1327. FREE(resultPacket->bc);
  1328. resultPacket->bc = NULL;
  1329. }
  1330. }
  1331. }
  1332. }
  1333. if (ts) {
  1334. job.resultInfo.set_time_info(ts);
  1335. }
  1336. job.versionInfo.set_serial_nr(job.request_peerid() + 1);
  1337. if (job.result_key() == NULL && job.request_key() != NULL)
  1338. job.set_result_key(*job.request_key());
  1339. //转换内容包
  1340. int err = job.decode_result_set(rb->data + off, lrp);
  1341. if (err) {
  1342. log4cplus_debug("decode result null: %d", err);
  1343. } else {
  1344. log4cplus_debug("decode_result_set success");
  1345. }
  1346. if(bok == false)
  1347. {
  1348. nrp = 1 /*fields count info*/ +
  1349. job.mr.get_need_array().size() /*fields def*/ + 0 /*eof*/ +
  1350. (job.result ? job.result->total_rows() : 0) /*row data*/ +
  1351. 1 /*eof*/;
  1352. }
  1353. log4cplus_info("line:%d" ,__LINE__);
  1354. /* pool, exist and large enough, use. else free and malloc */
  1355. int first_packet_len = sizeof(BufferChain) +
  1356. sizeof(struct iovec) * nrp ;
  1357. if (buf == NULL) {
  1358. buf = (BufferChain *)MALLOC(first_packet_len);
  1359. if (buf == NULL) {
  1360. return -ENOMEM;
  1361. }
  1362. buf->totalBytes = first_packet_len - sizeof(BufferChain);
  1363. buf->nextBuffer = NULL;
  1364. } else if (buf && first_packet_len - (int)sizeof(BufferChain) >
  1365. buf->totalBytes) {
  1366. FREE_IF(buf);
  1367. buf = (BufferChain *)MALLOC(first_packet_len);
  1368. if (buf == NULL) {
  1369. return -ENOMEM;
  1370. }
  1371. buf->totalBytes = first_packet_len - sizeof(BufferChain);
  1372. buf->nextBuffer = NULL;
  1373. }
  1374. //设置要发送的第一个包
  1375. v = (struct iovec *)buf->data;
  1376. nv = nrp ;
  1377. buf->usedBytes = sizeof(struct iovec) * (nrp);
  1378. rb = NULL;
  1379. if(bok == true)
  1380. rb = encode_mysql_ok(&job, job.resultInfo.affected_rows());
  1381. else
  1382. rb = encode_mysql_protocol(&job);
  1383. if (!rb)
  1384. return -3;
  1385. buf->nextBuffer = rb;
  1386. for (int i = 0; i < nrp; i++, rb = rb->nextBuffer) {
  1387. v[i].iov_base = rb->data;
  1388. v[i].iov_len = rb->usedBytes;
  1389. }
  1390. log4cplus_debug("encode_result_mysql leave.");
  1391. return 0;
  1392. }
  1393. int Packet::encode_result(DTCJobOperation &job, int mtu)
  1394. {
  1395. log4cplus_debug("encode_result entry.");
  1396. if (1 == job.get_pac_version()) {
  1397. return encode_result((DtcJob &)job, mtu, job.Timestamp());
  1398. } else if (2 == job.get_pac_version()) {
  1399. return encode_result_v2((DtcJob &)job, mtu, job.Timestamp());
  1400. } else if (0 == job.get_pac_version()) {
  1401. return encode_result_mysql((DtcJob &)job, mtu, job.Timestamp());
  1402. } else {
  1403. log4cplus_error("illegal packet version");
  1404. return -1;
  1405. }
  1406. log4cplus_debug("encode_result leave.");
  1407. }
  1408. void Packet::free_result_buff()
  1409. {
  1410. if (!buf)
  1411. return;
  1412. BufferChain *resbuff = buf->nextBuffer;
  1413. buf->nextBuffer = NULL;
  1414. while (resbuff) {
  1415. char *p = (char *)resbuff;
  1416. resbuff = resbuff->nextBuffer;
  1417. FREE(p);
  1418. }
  1419. }
  1420. int Packet::Bytes(void)
  1421. {
  1422. int sendbytes = 0;
  1423. for (int i = 0; i < nv; i++) {
  1424. sendbytes += v[i].iov_len;
  1425. }
  1426. return sendbytes;
  1427. }