task_base.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef __CH_TASK_H__
  17. #define __CH_TASK_H__
  18. #include "../table/table_def.h"
  19. #include "section.h"
  20. #include "../field/field.h"
  21. #include "../packet/packet.h"
  22. #include "result.h"
  23. #include "dtc_error_code.h"
  24. #include "algorithm/timestamp.h"
  25. #include <sys/time.h>
  26. #include "../log/log.h"
  27. #include "buffer.h"
  28. #include "receiver.h"
  29. #include "../my/my_request.h"
  30. #include "../field/field_api.h"
  31. class NCRequest;
  32. class ClientAgent;
  33. enum DecodeResult {
  34. DecodeFatalError, // no response needed
  35. DecodeDataError, // response with error code
  36. DecodeIdle, // no data received
  37. DecodeWaitData, // partial decoding
  38. DecodeDone // full packet
  39. };
  40. enum DecodeStage { // internal use
  41. DecodeStageFatalError, // same as result
  42. DecodeStageDataError, // same as result
  43. DecodeStageIdle, // same as result
  44. DecodeStageWaitHeader, // partial header
  45. DecodeStageWaitData, // partial data
  46. DecodeStageDiscard, // error mode, discard remain data
  47. DecodeStageDone // full packet
  48. };
  49. enum TaskRole {
  50. TaskRoleServer = 0, /* server, for incoming thread */
  51. TaskRoleClient, /* client, reply from server */
  52. TaskRoleHelperReply, /* helper, reply from server */
  53. };
  54. enum TaskType {
  55. TaskTypeAdmin = 0,
  56. TaskTypeRead, /* Read Only operation */
  57. TaskTypeWrite, /* Modify data */
  58. TaskTypeCommit, /* commit dirty data */
  59. TaskTypeWriteHbLog,
  60. TaskTypeReadHbLog,
  61. TaskTypeWriteLruHbLog,
  62. TaskTypeRegisterHbLog,
  63. TaskTypeQueryHbLogInfo,
  64. TaskTypeHelperReloadConfig,
  65. };
  66. enum CHITFLAG {
  67. HIT_INIT = 0, // hit init flag
  68. HIT_SUCCESS = 1, // hit success flag
  69. };
  70. class DtcJob : public TableReference {
  71. public:
  72. static const DecodeResult stage2result[];
  73. DecodeResult get_decode_result(void)
  74. {
  75. return stage2result[stage];
  76. };
  77. static const uint32_t validcmds[];
  78. static const uint16_t cmd2type[];
  79. static const uint16_t validsections[][2];
  80. static const uint8_t validktype[DField::TotalType][DField::TotalType];
  81. static const uint8_t validxtype[DField::TotalOperation]
  82. [DField::TotalType][DField::TotalType];
  83. static const uint8_t validcomps[DField::TotalType]
  84. [DField::TotalComparison];
  85. protected:
  86. class BufferPool {
  87. // simple buffer pool, only keep 2 buffers
  88. public:
  89. char *ptr[2];
  90. int len[2];
  91. BufferPool()
  92. {
  93. ptr[0] = NULL;
  94. ptr[1] = NULL;
  95. }
  96. ~BufferPool()
  97. {
  98. FREE_IF(ptr[0]);
  99. FREE_IF(ptr[1]);
  100. }
  101. void Push(char *v)
  102. {
  103. ptr[ptr[0] ? 1 : 0] = v;
  104. }
  105. inline char *Allocate(int size, TaskRole role)
  106. {
  107. if (role == TaskRoleServer || role == TaskRoleClient) {
  108. CreateBuff(size, len[0], &ptr[0]);
  109. if (ptr[0] == NULL)
  110. return NULL;
  111. return ptr[0];
  112. } else {
  113. CreateBuff(size, len[1], &ptr[1]);
  114. if (ptr[1] == NULL)
  115. return NULL;
  116. return ptr[1];
  117. }
  118. }
  119. char *Clone(char *buff, int size, TaskRole role)
  120. {
  121. if (role != TaskRoleClient)
  122. return NULL;
  123. char *p;
  124. if (ptr[0])
  125. p = ptr[1] = (char *)MALLOC(size);
  126. else
  127. p = ptr[0] = (char *)MALLOC(size);
  128. if (p)
  129. memcpy(p, buff, size);
  130. return p;
  131. }
  132. };
  133. public:
  134. char *migratebuf;
  135. protected: // decoder informations
  136. DecodeStage stage;
  137. TaskRole role;
  138. BufferPool packetbuf;
  139. //don't use it except packet decoding
  140. DTCTableDefinition *dataTableDef;
  141. DTCTableDefinition *hotbackupTableDef;
  142. // used by replicate table definition
  143. DTCTableDefinition *replicateTableDef;
  144. protected: // packet info, read-only
  145. DTCFieldValue *updateInfo;
  146. DTCFieldValue *conditionInfo;
  147. DTCFieldSet *fieldList;
  148. public:
  149. ResultSet *result;
  150. MyRequest mr;
  151. public: // packet info, read-write
  152. DTCVersionInfo versionInfo;
  153. DTCRequestInfo requestInfo;
  154. DTCResultInfo resultInfo;
  155. FieldValueByName ui;
  156. FieldValueByName ci;
  157. protected: // working data
  158. uint64_t serialNr; /* derived from packet */
  159. uint64_t peerid;
  160. const DTCValue *key; /* derived from packet */
  161. const DTCValue *rkey; /* processing */
  162. /* resultWriter only create once in job entire life */
  163. ResultWriter *resultWriter; /* processing */
  164. int resultWriterReseted;
  165. uint8_t requestCode; /* derived from packet */
  166. uint8_t requestType; /* derived from packet */
  167. uint8_t requestFlags; /* derived from packet */
  168. uint8_t replyCode; /* processing */
  169. uint8_t replyFlags; /* derived from packet */
  170. enum { PFLAG_REMOTETABLE = 1,
  171. PFLAG_ALLROWS = 2,
  172. PFLAG_PASSTHRU = 4,
  173. PFLAG_ISHIT = 8,
  174. PFLAG_FETCHDATA = 0x10,
  175. PFLAG_ALLOWREMOTETABLE = 0x20,
  176. PFLAG_FIELDSETWITHKEY = 0x40,
  177. PFLAG_BLACKHOLED = 0x80,
  178. };
  179. uint8_t processFlags; /* processing */
  180. int8_t pac_version;
  181. protected:
  182. // return total packet size
  183. int decode_header_v1(const DTC_HEADER_V1 &in,
  184. DTC_HEADER_V1 *out = NULL);
  185. int validate_section(DTC_HEADER_V1 &header);
  186. void decode_request_v1(DTC_HEADER_V1 &header, char *p);
  187. void decode_request_v2(MyRequest *mr);
  188. int decode_field_value(char *d, int l, int m);
  189. int decode_field_set(char *d, int l);
  190. private:
  191. int8_t select_version(char *packetIn, int packetLen);
  192. ClientAgent* _client_owner;
  193. public:
  194. DtcJob(DTCTableDefinition *tdef = NULL, TaskRole r = TaskRoleServer,
  195. int final = 0)
  196. : TableReference(tdef), migratebuf(NULL),
  197. stage(final ? DecodeStageDataError : DecodeStageIdle),
  198. role(r), dataTableDef(tdef), hotbackupTableDef(NULL),
  199. replicateTableDef(NULL), updateInfo(NULL),
  200. conditionInfo(NULL), fieldList(NULL), result(NULL),
  201. serialNr(0), peerid(0),key(NULL), rkey(NULL), resultWriter(NULL),
  202. resultWriterReseted(0), requestCode(0), requestType(0),
  203. requestFlags(0), replyCode(0), replyFlags(0),
  204. processFlags(PFLAG_ALLROWS) , pac_version(0)
  205. {
  206. }
  207. virtual ~DtcJob(void)
  208. {
  209. DELETE(updateInfo);
  210. DELETE(conditionInfo);
  211. DELETE(fieldList);
  212. DELETE(resultWriter);
  213. DELETE(result);
  214. FREE_IF(migratebuf);
  215. }
  216. // linked clone
  217. inline DtcJob(const DtcJob &orig)
  218. {
  219. DtcJob();
  220. Copy(orig);
  221. }
  222. int decode_result_set(char *d, int l);
  223. // these Copy()... only apply to empty DtcJob
  224. // linked clone
  225. int Copy(const DtcJob &orig);
  226. // linked clone with replace key
  227. int Copy(const DtcJob &orig, const DTCValue *newkey);
  228. // replace row
  229. int Copy(const RowValue &);
  230. // internal API
  231. int Copy(NCRequest &rq, const DTCValue *key);
  232. inline void Clean()
  233. {
  234. TableReference::set_table_definition(NULL);
  235. if (updateInfo)
  236. updateInfo->Clean();
  237. if (conditionInfo)
  238. conditionInfo->Clean();
  239. if (fieldList)
  240. fieldList->Clean();
  241. if (result)
  242. result->Clean();
  243. versionInfo.Clean();
  244. requestInfo.Clean();
  245. resultInfo.Clean();
  246. //serialNr = 0;
  247. key = NULL;
  248. rkey = NULL;
  249. if (resultWriter) {
  250. resultWriter->Clean();
  251. resultWriterReseted = 0;
  252. }
  253. requestCode = 0;
  254. requestType = 0;
  255. requestFlags = 0;
  256. replyCode = 0;
  257. replyFlags = 0;
  258. processFlags = PFLAG_ALLROWS;
  259. }
  260. //////////// some API access request property
  261. inline void set_data_table(DTCTableDefinition *t)
  262. {
  263. TableReference::set_table_definition(t);
  264. dataTableDef = t;
  265. }
  266. inline void set_hotbackup_table(DTCTableDefinition *t)
  267. {
  268. hotbackupTableDef = t;
  269. }
  270. inline void set_replicate_table(DTCTableDefinition *t)
  271. {
  272. replicateTableDef = t;
  273. }
  274. inline DTCTableDefinition *get_replicate_table()
  275. {
  276. return replicateTableDef;
  277. }
  278. // This code has to value (not very usefull):
  279. // DRequest::ResultInfo --> result/error code/key only
  280. // DRequest::DTCResultSet --> result_code() >=0, with DTCResultSet
  281. // please use result_code() for detail error code
  282. int reply_code(void) const
  283. {
  284. return replyCode;
  285. }
  286. // Retrieve request key
  287. int has_request_key(void) const
  288. {
  289. return key != NULL;
  290. }
  291. const DTCValue *request_key(void) const
  292. {
  293. return key;
  294. }
  295. unsigned int int_key(void) const
  296. {
  297. return (unsigned int)(request_key()->u64);
  298. }
  299. void update_key(RowValue *r)
  300. {
  301. (*r)[0] = *request_key();
  302. }
  303. // only for test suit
  304. void set_request_condition(DTCFieldValue *cond)
  305. {
  306. conditionInfo = cond;
  307. }
  308. void set_request_key(DTCValue *val)
  309. {
  310. key = val;
  311. }
  312. const DTCFieldValue *request_condition(void) const
  313. {
  314. return conditionInfo;
  315. }
  316. const DTCFieldValue *request_operation(void) const
  317. {
  318. return updateInfo;
  319. }
  320. //for migrate
  321. void set_request_operation(DTCFieldValue *ui)
  322. {
  323. updateInfo = ui;
  324. }
  325. const DTCFieldSet *request_fields(void)
  326. {
  327. return fieldList;
  328. }
  329. void set_request_fields(const DTCFieldSet* p_dtc_field_set)
  330. {
  331. fieldList = p_dtc_field_set;
  332. }
  333. const uint64_t request_serial(void) const
  334. {
  335. return serialNr;
  336. }
  337. const uint64_t request_peerid(void) const
  338. {
  339. return peerid;
  340. }
  341. // result key
  342. const DTCValue *result_key(void) const
  343. {
  344. return rkey;
  345. }
  346. // only if insert w/o key
  347. void set_result_key(const DTCValue &v)
  348. {
  349. resultInfo.set_key(v);
  350. rkey = &v;
  351. }
  352. static int max_header_size(void)
  353. {
  354. return sizeof(DTC_HEADER_V1);
  355. }
  356. static int min_header_size(void)
  357. {
  358. return sizeof(DTC_HEADER_V1);
  359. }
  360. static int check_packet_size(const char *buf, int len);
  361. // Decode data from fd
  362. void decode_stream(SimpleReceiver &receiver);
  363. DecodeResult do_decode(SimpleReceiver &receiver)
  364. {
  365. decode_stream(receiver);
  366. return get_decode_result();
  367. }
  368. // Decode data from packet
  369. // type 0: clone packet
  370. // type 1: eat(keep&free) packet
  371. // type 2: use external packet
  372. void decode_packet_v1(char *packetIn, int packetLen, int type);
  373. void decode_packet_v2(char *packetIn, int packetLen, int type);
  374. void decode_mysql_packet(char *packetIn, int packetLen, int type);
  375. int build_field_type_r(int sql_type, char *field_name);
  376. int8_t get_pac_version() { return pac_version; }
  377. DecodeResult do_decode(char *packetIn, int packetLen, int type)
  378. {
  379. pac_version = select_version(packetIn, packetLen);
  380. if (pac_version == 1)
  381. decode_packet_v1(packetIn, packetLen, type);
  382. else if (pac_version == 2)
  383. decode_packet_v2(packetIn, packetLen, type);
  384. else if(pac_version == 0)
  385. decode_mysql_packet(packetIn, packetLen, type);
  386. return get_decode_result();
  387. }
  388. DecodeResult do_decode(const char *packetIn, int packetLen)
  389. {
  390. pac_version = select_version(packetIn, packetLen);
  391. if (pac_version == 1)
  392. decode_packet_v1((char *)packetIn, packetLen, 0);
  393. else if (pac_version == 2)
  394. decode_packet_v2((char *)packetIn, packetLen, 0);
  395. else if(pac_version == 0)
  396. decode_mysql_packet(packetIn, packetLen, 0);
  397. return get_decode_result();
  398. };
  399. inline void begin_stage()
  400. {
  401. stage = DecodeStageIdle;
  402. }
  403. // change role from TaskRoleServer to TaskRoleHelperReply
  404. // cleanup decode state, prepare reply from helper
  405. inline void prepare_decode_reply(void)
  406. {
  407. role = TaskRoleHelperReply;
  408. stage = DecodeStageIdle;
  409. }
  410. inline void set_role_as_server()
  411. {
  412. role = TaskRoleServer;
  413. }
  414. // set error code before Packet::encode_result();
  415. // err is positive errno
  416. void set_error(int err, const char *from, const char *msg)
  417. {
  418. resultInfo.set_error(err, from, msg);
  419. }
  420. void set_error_dup(int err, const char *from, const char *msg)
  421. {
  422. resultInfo.set_error_dup(err, from, msg);
  423. }
  424. // retrieve previous result code
  425. // >= 0 success
  426. // < 0 err, negative value of set_error()
  427. int result_code(void) const
  428. {
  429. return resultInfo.result_code();
  430. }
  431. int allow_remote_table(void) const
  432. {
  433. return processFlags & PFLAG_ALLOWREMOTETABLE;
  434. }
  435. void mark_allow_remote_table(void)
  436. {
  437. processFlags |= PFLAG_ALLOWREMOTETABLE;
  438. }
  439. void mark_has_remote_table(void)
  440. {
  441. processFlags |= PFLAG_REMOTETABLE;
  442. }
  443. DTCTableDefinition *remote_table_definition(void)
  444. {
  445. if (processFlags & PFLAG_REMOTETABLE)
  446. return table_definition();
  447. return NULL;
  448. }
  449. //////////// some API for request processing
  450. // Client Request Code
  451. int Role(void) const
  452. {
  453. return role;
  454. }
  455. int request_code(void) const
  456. {
  457. return requestCode;
  458. }
  459. void set_request_code(uint8_t code)
  460. {
  461. requestCode = code;
  462. }
  463. int request_type(void) const
  464. {
  465. return requestType;
  466. }
  467. void set_request_type(TaskType type)
  468. {
  469. requestType = type;
  470. }
  471. int flag_keep_alive(void) const
  472. {
  473. return requestFlags & DRequest::Flag::KeepAlive;
  474. }
  475. int flag_table_definition(void) const
  476. {
  477. return requestFlags & DRequest::Flag::NeedTableDefinition;
  478. }
  479. int flag_no_cache(void) const
  480. {
  481. return requestFlags & DRequest::Flag::no_cache;
  482. }
  483. int flag_no_result(void) const
  484. {
  485. return requestFlags & DRequest::Flag::NoResult;
  486. }
  487. int flag_no_next_server(void) const
  488. {
  489. return requestFlags & DRequest::Flag::no_next_server;
  490. }
  491. int flag_multi_key_val(void) const
  492. {
  493. return requestFlags & DRequest::Flag::MultiKeyValue;
  494. }
  495. int flag_multi_key_result(void) const
  496. {
  497. return replyFlags & DRequest::Flag::MultiKeyValue;
  498. }
  499. int flag_admin_table(void) const
  500. {
  501. return requestFlags & DRequest::Flag::admin_table;
  502. }
  503. int flag_pass_thru(void) const
  504. {
  505. return processFlags & PFLAG_PASSTHRU;
  506. }
  507. int flag_fetch_data(void) const
  508. {
  509. return processFlags & PFLAG_FETCHDATA;
  510. }
  511. int flag_is_hit(void) const
  512. {
  513. return processFlags & PFLAG_ISHIT;
  514. }
  515. int flag_field_set_with_key(void) const
  516. {
  517. return processFlags & PFLAG_FIELDSETWITHKEY;
  518. }
  519. int flag_black_hole(void) const
  520. {
  521. return processFlags & PFLAG_BLACKHOLED;
  522. }
  523. void mark_as_pass_thru(void)
  524. {
  525. processFlags |= PFLAG_PASSTHRU;
  526. }
  527. void mark_as_fetch_data(void)
  528. {
  529. processFlags |= PFLAG_FETCHDATA;
  530. }
  531. void mark_as_hit(void)
  532. {
  533. processFlags |= PFLAG_ISHIT;
  534. }
  535. void mark_field_set_with_key(void)
  536. {
  537. processFlags |= PFLAG_FIELDSETWITHKEY;
  538. }
  539. void mark_as_black_hole(void)
  540. {
  541. processFlags |= PFLAG_BLACKHOLED;
  542. }
  543. void set_result_hit_flag(CHITFLAG hitFlag)
  544. {
  545. resultInfo.set_hit_flag((uint32_t)hitFlag);
  546. }
  547. // API for key expire time
  548. int update_key_expire_time(int max);
  549. // this is count only request
  550. int count_only(void) const
  551. {
  552. return fieldList == NULL;
  553. }
  554. // this is non-contional request
  555. void clear_all_rows(void)
  556. {
  557. processFlags &= ~PFLAG_ALLROWS;
  558. }
  559. int all_rows(void) const
  560. {
  561. return processFlags & PFLAG_ALLROWS;
  562. }
  563. // apply insertValues, updateOperations to row
  564. int update_row(RowValue &row)
  565. {
  566. return updateInfo == NULL ? 0 : updateInfo->Update(row);
  567. }
  568. // checking the condition
  569. int compare_row(const RowValue &row, int iCmpFirstNRows = 256) const
  570. {
  571. return all_rows() ? 1 :
  572. conditionInfo->Compare(row, iCmpFirstNRows);
  573. }
  574. // prepare an DTCResultSet, for afterward operation
  575. int prepare_result(int start, int count);
  576. inline int prepare_result(void)
  577. {
  578. return prepare_result(requestInfo.limit_start(),
  579. requestInfo.limit_count());
  580. }
  581. inline int prepare_result_no_limit(void)
  582. {
  583. return prepare_result(0, 0);
  584. }
  585. inline void detach_result_in_result_writer()
  586. {
  587. if (resultWriter)
  588. resultWriter->detach_result();
  589. }
  590. // new a countonly DTCResultSet with 'nr' rows
  591. // No extra append_row() allowed
  592. int set_total_rows(unsigned int nr, int Force = 0)
  593. {
  594. if (!Force) {
  595. if (resultWriter || prepare_result() == 0)
  596. return resultWriter->set_rows(nr);
  597. } else {
  598. resultWriter->set_total_rows(nr);
  599. }
  600. return 0;
  601. }
  602. void add_total_rows(int n)
  603. {
  604. resultWriter->add_total_rows(n);
  605. }
  606. int in_range(unsigned int nr, unsigned int begin = 0) const
  607. {
  608. return resultWriter->in_range(nr, begin);
  609. }
  610. int result_full(void) const
  611. {
  612. return resultWriter && resultWriter->is_full();
  613. }
  614. // append_row, from row 'r'
  615. int append_row(const RowValue &r)
  616. {
  617. return resultWriter->append_row(r);
  618. }
  619. int append_row(const RowValue *r)
  620. {
  621. return r ? resultWriter->append_row(*r) : 0;
  622. }
  623. // Append Row from DTCResultSet with condition operation
  624. int append_result(ResultSet *rs);
  625. // Append All Row from DTCResultSet
  626. int pass_all_result(ResultSet *rs);
  627. // Merge all row from sub-job
  628. int merge_result(const DtcJob &job);
  629. // Get Encoded Result Packet
  630. ResultPacket *get_result_packet(void) const
  631. {
  632. return (ResultPacket *)resultWriter;
  633. }
  634. // Process Internal Results
  635. int process_internal_result(uint32_t ts = 0);
  636. void set_job_owner_client(ClientAgent* ca) {_client_owner = ca;}
  637. ClientAgent* get_job_owner_client() { return _client_owner;}
  638. };
  639. extern int packet_body_len_v1(DTC_HEADER_V1 &header);
  640. #endif