/*
 * Copyright [2021] JD.com, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
- #ifndef __CH_TASK_H__
- #define __CH_TASK_H__
- #include "../table/table_def.h"
- #include "section.h"
- #include "../field/field.h"
- #include "../packet/packet.h"
- #include "result.h"
- #include "dtc_error_code.h"
- #include "algorithm/timestamp.h"
- #include <sys/time.h>
- #include "../log/log.h"
- #include "buffer.h"
- #include "receiver.h"
- #include "../my/my_request.h"
- #include "../field/field_api.h"
/* Forward declarations: both types are only used by pointer/reference below. */
class NCRequest;
class ClientAgent;
/*
 * Outcome of a decode attempt, as reported to callers of do_decode().
 * The numeric values are the positions the enumerators had originally.
 */
enum DecodeResult {
    DecodeFatalError = 0, /* no response needed */
    DecodeDataError = 1,  /* response with error code */
    DecodeIdle = 2,       /* no data received */
    DecodeWaitData = 3,   /* partial decoding */
    DecodeDone = 4        /* full packet */
};
/*
 * Decoder-internal stage (internal use).
 * The numeric values are the positions the enumerators had originally.
 */
enum DecodeStage {
    DecodeStageFatalError = 0, /* same as result */
    DecodeStageDataError = 1,  /* same as result */
    DecodeStageIdle = 2,       /* same as result */
    DecodeStageWaitHeader = 3, /* partial header */
    DecodeStageWaitData = 4,   /* partial data */
    DecodeStageDiscard = 5,    /* error mode, discard remain data */
    DecodeStageDone = 6        /* full packet */
};
/* Role a job plays while its packet is being decoded. */
enum TaskRole {
    TaskRoleServer = 0,     /* server, for incoming thread */
    TaskRoleClient = 1,     /* client, reply from server */
    TaskRoleHelperReply = 2 /* helper, reply from server */
};
/*
 * Request category of a job.
 * The numeric values are the positions the enumerators had originally.
 */
enum TaskType {
    TaskTypeAdmin = 0,
    TaskTypeRead = 1,   /* Read Only operation */
    TaskTypeWrite = 2,  /* Modify data */
    TaskTypeCommit = 3, /* commit dirty data */
    TaskTypeWriteHbLog = 4,
    TaskTypeReadHbLog = 5,
    TaskTypeWriteLruHbLog = 6,
    TaskTypeRegisterHbLog = 7,
    TaskTypeQueryHbLogInfo = 8,
    TaskTypeHelperReloadConfig = 9
};
/* Hit flag value handed to DtcJob::set_result_hit_flag(). */
enum CHITFLAG {
    HIT_INIT = 0,   /* hit init flag */
    HIT_SUCCESS = 1 /* hit success flag */
};
- class DtcJob : public TableReference {
- public:
- static const DecodeResult stage2result[];
- DecodeResult get_decode_result(void)
- {
- return stage2result[stage];
- };
- static const uint32_t validcmds[];
- static const uint16_t cmd2type[];
- static const uint16_t validsections[][2];
- static const uint8_t validktype[DField::TotalType][DField::TotalType];
- static const uint8_t validxtype[DField::TotalOperation]
- [DField::TotalType][DField::TotalType];
- static const uint8_t validcomps[DField::TotalType]
- [DField::TotalComparison];
- protected:
- class BufferPool {
- // simple buffer pool, only keep 2 buffers
- public:
- char *ptr[2];
- int len[2];
- BufferPool()
- {
- ptr[0] = NULL;
- ptr[1] = NULL;
- }
- ~BufferPool()
- {
- FREE_IF(ptr[0]);
- FREE_IF(ptr[1]);
- }
- void Push(char *v)
- {
- ptr[ptr[0] ? 1 : 0] = v;
- }
- inline char *Allocate(int size, TaskRole role)
- {
- if (role == TaskRoleServer || role == TaskRoleClient) {
- CreateBuff(size, len[0], &ptr[0]);
- if (ptr[0] == NULL)
- return NULL;
- return ptr[0];
- } else {
- CreateBuff(size, len[1], &ptr[1]);
- if (ptr[1] == NULL)
- return NULL;
- return ptr[1];
- }
- }
- char *Clone(char *buff, int size, TaskRole role)
- {
- if (role != TaskRoleClient)
- return NULL;
- char *p;
- if (ptr[0])
- p = ptr[1] = (char *)MALLOC(size);
- else
- p = ptr[0] = (char *)MALLOC(size);
- if (p)
- memcpy(p, buff, size);
- return p;
- }
- };
- public:
- char *migratebuf;
- protected: // decoder informations
- DecodeStage stage;
- TaskRole role;
- BufferPool packetbuf;
- //don't use it except packet decoding
- DTCTableDefinition *dataTableDef;
- DTCTableDefinition *hotbackupTableDef;
- // used by replicate table definition
- DTCTableDefinition *replicateTableDef;
- protected: // packet info, read-only
- DTCFieldValue *updateInfo;
- DTCFieldValue *conditionInfo;
- DTCFieldSet *fieldList;
- public:
- ResultSet *result;
- MyRequest mr;
- public: // packet info, read-write
- DTCVersionInfo versionInfo;
- DTCRequestInfo requestInfo;
- DTCResultInfo resultInfo;
- FieldValueByName ui;
- FieldValueByName ci;
- protected: // working data
- uint64_t serialNr; /* derived from packet */
- uint64_t peerid;
- const DTCValue *key; /* derived from packet */
- const DTCValue *rkey; /* processing */
- /* resultWriter only create once in job entire life */
- ResultWriter *resultWriter; /* processing */
- int resultWriterReseted;
- uint8_t requestCode; /* derived from packet */
- uint8_t requestType; /* derived from packet */
- uint8_t requestFlags; /* derived from packet */
- uint8_t replyCode; /* processing */
- uint8_t replyFlags; /* derived from packet */
- enum { PFLAG_REMOTETABLE = 1,
- PFLAG_ALLROWS = 2,
- PFLAG_PASSTHRU = 4,
- PFLAG_ISHIT = 8,
- PFLAG_FETCHDATA = 0x10,
- PFLAG_ALLOWREMOTETABLE = 0x20,
- PFLAG_FIELDSETWITHKEY = 0x40,
- PFLAG_BLACKHOLED = 0x80,
- };
- uint8_t processFlags; /* processing */
- int8_t pac_version;
- protected:
- // return total packet size
- int decode_header_v1(const DTC_HEADER_V1 &in,
- DTC_HEADER_V1 *out = NULL);
- int validate_section(DTC_HEADER_V1 &header);
- void decode_request_v1(DTC_HEADER_V1 &header, char *p);
- void decode_request_v2(MyRequest *mr);
- int decode_field_value(char *d, int l, int m);
- int decode_field_set(char *d, int l);
- private:
- int8_t select_version(char *packetIn, int packetLen);
- ClientAgent* _client_owner;
- public:
- DtcJob(DTCTableDefinition *tdef = NULL, TaskRole r = TaskRoleServer,
- int final = 0)
- : TableReference(tdef), migratebuf(NULL),
- stage(final ? DecodeStageDataError : DecodeStageIdle),
- role(r), dataTableDef(tdef), hotbackupTableDef(NULL),
- replicateTableDef(NULL), updateInfo(NULL),
- conditionInfo(NULL), fieldList(NULL), result(NULL),
- serialNr(0), peerid(0),key(NULL), rkey(NULL), resultWriter(NULL),
- resultWriterReseted(0), requestCode(0), requestType(0),
- requestFlags(0), replyCode(0), replyFlags(0),
- processFlags(PFLAG_ALLROWS) , pac_version(0)
- {
- }
- virtual ~DtcJob(void)
- {
- DELETE(updateInfo);
- DELETE(conditionInfo);
- DELETE(fieldList);
- DELETE(resultWriter);
- DELETE(result);
- FREE_IF(migratebuf);
- }
- // linked clone
- inline DtcJob(const DtcJob &orig)
- {
- DtcJob();
- Copy(orig);
- }
- int decode_result_set(char *d, int l);
- // these Copy()... only apply to empty DtcJob
- // linked clone
- int Copy(const DtcJob &orig);
- // linked clone with replace key
- int Copy(const DtcJob &orig, const DTCValue *newkey);
- // replace row
- int Copy(const RowValue &);
- // internal API
- int Copy(NCRequest &rq, const DTCValue *key);
- inline void Clean()
- {
- TableReference::set_table_definition(NULL);
- if (updateInfo)
- updateInfo->Clean();
- if (conditionInfo)
- conditionInfo->Clean();
- if (fieldList)
- fieldList->Clean();
- if (result)
- result->Clean();
- versionInfo.Clean();
- requestInfo.Clean();
- resultInfo.Clean();
- //serialNr = 0;
- key = NULL;
- rkey = NULL;
- if (resultWriter) {
- resultWriter->Clean();
- resultWriterReseted = 0;
- }
- requestCode = 0;
- requestType = 0;
- requestFlags = 0;
- replyCode = 0;
- replyFlags = 0;
- processFlags = PFLAG_ALLROWS;
- }
- //////////// some API access request property
- inline void set_data_table(DTCTableDefinition *t)
- {
- TableReference::set_table_definition(t);
- dataTableDef = t;
- }
- inline void set_hotbackup_table(DTCTableDefinition *t)
- {
- hotbackupTableDef = t;
- }
- inline void set_replicate_table(DTCTableDefinition *t)
- {
- replicateTableDef = t;
- }
- inline DTCTableDefinition *get_replicate_table()
- {
- return replicateTableDef;
- }
- // This code has to value (not very usefull):
- // DRequest::ResultInfo --> result/error code/key only
- // DRequest::DTCResultSet --> result_code() >=0, with DTCResultSet
- // please use result_code() for detail error code
- int reply_code(void) const
- {
- return replyCode;
- }
- // Retrieve request key
- int has_request_key(void) const
- {
- return key != NULL;
- }
- const DTCValue *request_key(void) const
- {
- return key;
- }
- unsigned int int_key(void) const
- {
- return (unsigned int)(request_key()->u64);
- }
- void update_key(RowValue *r)
- {
- (*r)[0] = *request_key();
- }
- // only for test suit
- void set_request_condition(DTCFieldValue *cond)
- {
- conditionInfo = cond;
- }
- void set_request_key(DTCValue *val)
- {
- key = val;
- }
- const DTCFieldValue *request_condition(void) const
- {
- return conditionInfo;
- }
- const DTCFieldValue *request_operation(void) const
- {
- return updateInfo;
- }
- //for migrate
- void set_request_operation(DTCFieldValue *ui)
- {
- updateInfo = ui;
- }
- const DTCFieldSet *request_fields(void)
- {
- return fieldList;
- }
- void set_request_fields(const DTCFieldSet* p_dtc_field_set)
- {
- fieldList = p_dtc_field_set;
- }
- const uint64_t request_serial(void) const
- {
- return serialNr;
- }
- const uint64_t request_peerid(void) const
- {
- return peerid;
- }
- // result key
- const DTCValue *result_key(void) const
- {
- return rkey;
- }
- // only if insert w/o key
- void set_result_key(const DTCValue &v)
- {
- resultInfo.set_key(v);
- rkey = &v;
- }
- static int max_header_size(void)
- {
- return sizeof(DTC_HEADER_V1);
- }
- static int min_header_size(void)
- {
- return sizeof(DTC_HEADER_V1);
- }
- static int check_packet_size(const char *buf, int len);
- // Decode data from fd
- void decode_stream(SimpleReceiver &receiver);
- DecodeResult do_decode(SimpleReceiver &receiver)
- {
- decode_stream(receiver);
- return get_decode_result();
- }
- // Decode data from packet
- // type 0: clone packet
- // type 1: eat(keep&free) packet
- // type 2: use external packet
- void decode_packet_v1(char *packetIn, int packetLen, int type);
- void decode_packet_v2(char *packetIn, int packetLen, int type);
- void decode_mysql_packet(char *packetIn, int packetLen, int type);
- int build_field_type_r(int sql_type, char *field_name);
- int8_t get_pac_version() { return pac_version; }
- DecodeResult do_decode(char *packetIn, int packetLen, int type)
- {
- pac_version = select_version(packetIn, packetLen);
- if (pac_version == 1)
- decode_packet_v1(packetIn, packetLen, type);
- else if (pac_version == 2)
- decode_packet_v2(packetIn, packetLen, type);
- else if(pac_version == 0)
- decode_mysql_packet(packetIn, packetLen, type);
- return get_decode_result();
- }
- DecodeResult do_decode(const char *packetIn, int packetLen)
- {
- pac_version = select_version(packetIn, packetLen);
- if (pac_version == 1)
- decode_packet_v1((char *)packetIn, packetLen, 0);
- else if (pac_version == 2)
- decode_packet_v2((char *)packetIn, packetLen, 0);
- else if(pac_version == 0)
- decode_mysql_packet(packetIn, packetLen, 0);
- return get_decode_result();
- };
- inline void begin_stage()
- {
- stage = DecodeStageIdle;
- }
- // change role from TaskRoleServer to TaskRoleHelperReply
- // cleanup decode state, prepare reply from helper
- inline void prepare_decode_reply(void)
- {
- role = TaskRoleHelperReply;
- stage = DecodeStageIdle;
- }
- inline void set_role_as_server()
- {
- role = TaskRoleServer;
- }
- // set error code before Packet::encode_result();
- // err is positive errno
- void set_error(int err, const char *from, const char *msg)
- {
- resultInfo.set_error(err, from, msg);
- }
- void set_error_dup(int err, const char *from, const char *msg)
- {
- resultInfo.set_error_dup(err, from, msg);
- }
- // retrieve previous result code
- // >= 0 success
- // < 0 err, negative value of set_error()
- int result_code(void) const
- {
- return resultInfo.result_code();
- }
- int allow_remote_table(void) const
- {
- return processFlags & PFLAG_ALLOWREMOTETABLE;
- }
- void mark_allow_remote_table(void)
- {
- processFlags |= PFLAG_ALLOWREMOTETABLE;
- }
- void mark_has_remote_table(void)
- {
- processFlags |= PFLAG_REMOTETABLE;
- }
- DTCTableDefinition *remote_table_definition(void)
- {
- if (processFlags & PFLAG_REMOTETABLE)
- return table_definition();
- return NULL;
- }
- //////////// some API for request processing
- // Client Request Code
- int Role(void) const
- {
- return role;
- }
- int request_code(void) const
- {
- return requestCode;
- }
- void set_request_code(uint8_t code)
- {
- requestCode = code;
- }
- int request_type(void) const
- {
- return requestType;
- }
- void set_request_type(TaskType type)
- {
- requestType = type;
- }
- int flag_keep_alive(void) const
- {
- return requestFlags & DRequest::Flag::KeepAlive;
- }
- int flag_table_definition(void) const
- {
- return requestFlags & DRequest::Flag::NeedTableDefinition;
- }
- int flag_no_cache(void) const
- {
- return requestFlags & DRequest::Flag::no_cache;
- }
- int flag_no_result(void) const
- {
- return requestFlags & DRequest::Flag::NoResult;
- }
- int flag_no_next_server(void) const
- {
- return requestFlags & DRequest::Flag::no_next_server;
- }
- int flag_multi_key_val(void) const
- {
- return requestFlags & DRequest::Flag::MultiKeyValue;
- }
- int flag_multi_key_result(void) const
- {
- return replyFlags & DRequest::Flag::MultiKeyValue;
- }
- int flag_admin_table(void) const
- {
- return requestFlags & DRequest::Flag::admin_table;
- }
- int flag_pass_thru(void) const
- {
- return processFlags & PFLAG_PASSTHRU;
- }
- int flag_fetch_data(void) const
- {
- return processFlags & PFLAG_FETCHDATA;
- }
- int flag_is_hit(void) const
- {
- return processFlags & PFLAG_ISHIT;
- }
- int flag_field_set_with_key(void) const
- {
- return processFlags & PFLAG_FIELDSETWITHKEY;
- }
- int flag_black_hole(void) const
- {
- return processFlags & PFLAG_BLACKHOLED;
- }
- void mark_as_pass_thru(void)
- {
- processFlags |= PFLAG_PASSTHRU;
- }
- void mark_as_fetch_data(void)
- {
- processFlags |= PFLAG_FETCHDATA;
- }
- void mark_as_hit(void)
- {
- processFlags |= PFLAG_ISHIT;
- }
- void mark_field_set_with_key(void)
- {
- processFlags |= PFLAG_FIELDSETWITHKEY;
- }
- void mark_as_black_hole(void)
- {
- processFlags |= PFLAG_BLACKHOLED;
- }
- void set_result_hit_flag(CHITFLAG hitFlag)
- {
- resultInfo.set_hit_flag((uint32_t)hitFlag);
- }
- // API for key expire time
- int update_key_expire_time(int max);
- // this is count only request
- int count_only(void) const
- {
- return fieldList == NULL;
- }
- // this is non-contional request
- void clear_all_rows(void)
- {
- processFlags &= ~PFLAG_ALLROWS;
- }
- int all_rows(void) const
- {
- return processFlags & PFLAG_ALLROWS;
- }
- // apply insertValues, updateOperations to row
- int update_row(RowValue &row)
- {
- return updateInfo == NULL ? 0 : updateInfo->Update(row);
- }
- // checking the condition
- int compare_row(const RowValue &row, int iCmpFirstNRows = 256) const
- {
- return all_rows() ? 1 :
- conditionInfo->Compare(row, iCmpFirstNRows);
- }
- // prepare an DTCResultSet, for afterward operation
- int prepare_result(int start, int count);
- inline int prepare_result(void)
- {
- return prepare_result(requestInfo.limit_start(),
- requestInfo.limit_count());
- }
- inline int prepare_result_no_limit(void)
- {
- return prepare_result(0, 0);
- }
- inline void detach_result_in_result_writer()
- {
- if (resultWriter)
- resultWriter->detach_result();
- }
- // new a countonly DTCResultSet with 'nr' rows
- // No extra append_row() allowed
- int set_total_rows(unsigned int nr, int Force = 0)
- {
- if (!Force) {
- if (resultWriter || prepare_result() == 0)
- return resultWriter->set_rows(nr);
- } else {
- resultWriter->set_total_rows(nr);
- }
- return 0;
- }
- void add_total_rows(int n)
- {
- resultWriter->add_total_rows(n);
- }
- int in_range(unsigned int nr, unsigned int begin = 0) const
- {
- return resultWriter->in_range(nr, begin);
- }
- int result_full(void) const
- {
- return resultWriter && resultWriter->is_full();
- }
- // append_row, from row 'r'
- int append_row(const RowValue &r)
- {
- return resultWriter->append_row(r);
- }
- int append_row(const RowValue *r)
- {
- return r ? resultWriter->append_row(*r) : 0;
- }
- // Append Row from DTCResultSet with condition operation
- int append_result(ResultSet *rs);
- // Append All Row from DTCResultSet
- int pass_all_result(ResultSet *rs);
- // Merge all row from sub-job
- int merge_result(const DtcJob &job);
- // Get Encoded Result Packet
- ResultPacket *get_result_packet(void) const
- {
- return (ResultPacket *)resultWriter;
- }
- // Process Internal Results
- int process_internal_result(uint32_t ts = 0);
- void set_job_owner_client(ClientAgent* ca) {_client_owner = ca;}
- ClientAgent* get_job_owner_client() { return _client_owner;}
- };
- extern int packet_body_len_v1(DTC_HEADER_V1 &header);
- #endif
|