  /*
   * Copyright [2021] JD.com, Inc.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  #ifndef DA_MSG_H_
  #define DA_MSG_H_

  #include "compiler.h"
  #include "da_array.h"
  #include "da_buf.h"
  #include "da_queue.h"
  #include "da_rbtree.h"
  #include "da_string.h"
  #include <inttypes.h>
  #include <stdbool.h>
  #include <stdint.h>
  #include <stdlib.h>
  #include "my/my_com_data.h"
  #include "my/my_command.h"
  #include "my/my_comm.h"
  /* Forward declarations: this part of the header only needs pointers to
   * these types. */
  struct msg;
  struct conn;
  struct msg_tqh;
  struct context;

  /*
   * Callback types. struct msg stores one of each in its parser, fragment and
   * coalesce members.
   */

  /* Parser hook: receives the message and returns nothing. */
  typedef void (*msg_parse_t)(struct msg *msg);

  /*
   * Fragment hook: receives the message, a uint32_t argument and a message
   * tail queue, and returns an int. The meaning of the uint32_t argument and
   * of the return value is defined by the implementations, not by this header.
   */
  typedef int (*msg_fragment_t)(struct msg *msg, uint32_t, struct msg_tqh *);

  /* Coalesce hook ("message post-coalesce" in struct msg): receives the
   * message and returns an int. */
  typedef int (*msg_coalesce_t)(struct msg *r);
  /*
   * Result of a parse attempt on a message (the type of struct msg's
   * parse_res member). Values are assigned implicitly, starting at 0.
   */
  typedef enum msg_parse_result {
      MSG_PARSE_OK,     /* parsing succeeded */
      MSG_PARSE_ERROR,  /* parsing failed */
      MSG_PARSE_REPAIR, /* more to parse: repair parsed and unparsed data */
      MSG_PARSE_AGAIN,  /* input incomplete: parse again */
      /* NOTE(review): presumably "parse failed because no database is
       * selected" -- confirm against the parser. */
      MSG_PARSE_ERROR_NO_SELECTED_DB,
  } msg_parse_result_t;
  /*
   * Error numbers reported for database-selection problems. The values equal
   * the MySQL server error codes for the same conditions.
   */
  typedef enum msg_my_error {
      MY_ERR_NO_DB_SELECTED = 1046, /* no database selected */
      MY_ERR_UNKNOWN_DB = 1049,     /* unknown database */
  } msg_my_error_t;
  /*
   * X-macro listing every message type exactly once. Expanding it with a
   * one-argument macro applies that macro to each type name in order, so any
   * table built from it stays in step with the enum below.
   *
   * NOTE(review): the RSP_ / REQ_ prefixes presumably mark response and
   * request types -- confirm against the codec.
   */
  #define MSG_TYPE_CODEC(ACTION) \
      ACTION(NOP)                \
      ACTION(RSP_RESULTCODE)     \
      ACTION(RSP_RESULTSET)      \
      ACTION(REQ_SVRADMIN)       \
      ACTION(REQ_GET)            \
      ACTION(REQ_PURGE)          \
      ACTION(REQ_INSERT)         \
      ACTION(REQ_UPDATE)         \
      ACTION(REQ_DELETE)         \
      ACTION(REQ_REPLACE)        \
      ACTION(REQ_FLUSH)          \
      ACTION(REQ_INVALIDATE)     \
      ACTION(REQ_MONITOR)

  /* Build enum msg_type: each NAME in the list becomes the enumerator
   * MSG_NAME, numbered from 0 in list order. DEFINE_ACTION is a temporary
   * helper and is undefined again right after use. */
  #define DEFINE_ACTION(_name) MSG_##_name,
  typedef enum msg_type { MSG_TYPE_CODEC(DEFINE_ACTION) } msg_type_t;
  #undef DEFINE_ACTION
  /*
   * Location of one key inside a message buffer.
   * Both the start and the end position must lie within a single mbuf.
   */
  struct keypos {
      uint8_t *start; /* key start position */
      uint8_t *end;   /* key end position */
  };
  72. struct msg {
  73. TAILQ_ENTRY(msg) c_i_tqe; /*in client inmsg queue*/
  74. TAILQ_ENTRY(msg) c_o_tqe; /*in client omsg queue*/
  75. TAILQ_ENTRY(msg) s_i_tqe; /*in the server inmsg queue*/
  76. TAILQ_ENTRY(msg) o_tqe; /*in frag_msgq or send q*/
  77. uint64_t id; /* id for svr asyn operation*/
  78. int idx; /* index of server*/
  79. uint64_t serialnr; /* client serialnr*/
  80. msg_type_t cmd; /* msg type */
  81. struct string accesskey; /* accesskey for the msg*/
  82. uint64_t keytype; /* keytype of the primary key*/
  83. uint64_t keyCount; /* number of key count*/
  84. struct keypos keys[32]; /* array of keys */
  85. int64_t hitflag;
  86. struct conn *owner; /* message owner - client | server */
  87. struct conn *peer_conn; /* message peer(client | server) connection*/
  88. uint64_t peerid; /* id of msg peer*/
  89. struct msg *peer; /* msg peer*/
  90. int64_t affectrows; /* affect rows in result info*/
  91. uint64_t totalrows; /* total rows in result info*/
  92. uint64_t numrows; /* result rows*/
  93. uint32_t fieldnums; /* field nums*/
  94. int64_t resultcode; /* resultcode in result info*/
  95. struct msg *frag_owner; /* owner of fragment message */
  96. uint32_t nfrag; /* # fragment */
  97. uint32_t nfrag_done; /* # fragment done */
  98. uint64_t frag_id; /* id of fragmented message */
  99. struct msg
  100. *frag_seq[32]; /* sequence of fragment message, map from keys to
  101. fragments*/
  102. struct buf_stqh buf_q; /* buff list in msg*/
  103. uint32_t mlen; /* message length */
  104. uint64_t start_ts; /* start timestamp in usec */
  105. struct rbnode tmo_rbe; /* entry in time rbtree */
  106. struct rbnode msg_rbe; /* entry in backwork rbtree*/
  107. msg_parse_t parser; /* msg parse */
  108. msg_parse_result_t parse_res; /* msg parse result*/
  109. msg_fragment_t fragment; /* message fragment */
  110. msg_coalesce_t coalesce; /* message post-coalesce */
  111. struct mbuf *keyendbuf; /* the buf that end key located*/
  112. uint8_t *keyendpos;
  113. struct mbuf *keycountendbuf; /* the buf that keycount end*/
  114. uint8_t *keycountendpos; /* end position in buff*/
  115. struct mbuf *setsplitbuf;
  116. uint8_t *setsplitpos;
  117. uint64_t keycountstartlen; /* the length until the start of keycount*/
  118. uint64_t keycountlen; /* keycount len*/
  119. uint64_t keylen; /* all key len include length and key value*/
  120. uint8_t flags; /* header flags*/
  121. uint32_t *seclen; /* header 8 section len in msg header*/
  122. uint8_t cur_parse_id; /* current parse id*/
  123. uint64_t cur_parse_type; /* current parse type*/
  124. int cur_parse_lenth; /* current parse lenth*/
  125. int state; /* current parse state*/
  126. int field; /* current parse field*/
  127. int sec_parsed_len;
  128. int parsed_len; /* len has been parsed*/
  129. int subkeylen; /* parsing subkey length*/
  130. int subkeycount; /* parsing subkey count */
  131. uint8_t *pos; /* parser position marker */
  132. uint8_t *token; /* token marker */
  133. uint8_t pkt_nr; /* mysql sequence id */
  134. enum enum_server_command command; /* mysql request command type */
  135. enum enum_agent_admin admin;
  136. uint8_t layer;
  137. union COM_DATA data;
  138. int err; /* errno on error? */
  139. unsigned error : 1; /* error? */
  140. unsigned ferror : 1; /* error? */
  141. unsigned request : 1; /* request? or response? */
  142. unsigned done : 1; /* done? */
  143. unsigned fdone : 1; /* all fragments are done? */
  144. unsigned swallow : 1; /* swallow response? */
  145. unsigned cli_inq : 1; /*msg in client in msgq*/
  146. unsigned cli_outq : 1; /*msg in client out msgq*/
  147. unsigned sev_inq : 1; /*msg in server in msgq*/
  148. unsigned sev_msgtree : 1; /*msg in server in msg tree*/
  149. unsigned sending : 1; /*msg is sending*/
  150. };
  /* Declares struct msg_tqh, the head type for tail queues of struct msg
   * (assuming TAILQ_HEAD from da_queue.h follows the BSD queue macros). */
  TAILQ_HEAD(msg_tqh, msg);
  /*
   * Timeout bookkeeping; the definitions live outside this header.
   * NOTE(review): judging by struct msg's tmo_rbe member ("entry in time
   * rbtree") these presumably remove from, insert into, and return the minimum
   * of a timeout rbtree -- confirm in the implementation.
   */
  void msg_tmo_delete(struct msg *msg);
  void msg_tmo_insert(struct msg *msg, struct conn *conn);
  struct msg *msg_tmo_min(void);
  /*
   * Subsystem setup/teardown and message acquisition/release; the definitions
   * live outside this header, so confirm return-value conventions there.
   *
   * msg_init and msg_deinit are declared with (void). Before C23 an empty
   * parameter list declares a function with unspecified arguments, which
   * turns off argument checking at call sites.
   */
  int msg_init(void);

  /* Obtains a message for conn; request distinguishes request from response.
   * NOTE(review): presumably reflected in the message's request bit --
   * confirm. */
  struct msg *msg_get(struct conn *conn, bool request);

  int msg_deinit(void);

  /* Releases m. NOTE(review): presumably the counterpart of msg_get --
   * confirm. */
  void msg_put(struct msg *m);
  159. struct msg *msg_get_error(struct msg *smsg);
  160. void msg_dump(struct msg *msg, int level);
  161. struct string *msg_type_string(msg_type_t type);
  162. bool msg_empty(struct msg *msg);
  /* Receive and send entry points for a connection. The definitions live
   * outside this header; the meaning of the int result is not visible here. */
  int msg_recv(struct context *ctx, struct conn *conn);
  int msg_send(struct context *ctx, struct conn *conn);

  /* Returns a 64-bit fragment id (compare struct msg's frag_id member,
   * "id of fragmented message"). */
  uint64_t msg_gen_frag_id(void);
  /*
   * Buffer helpers and backend selection; the definitions live outside this
   * header.
   * NOTE(review): pos/len presumably describe a byte range inside the given
   * mbuf -- confirm in the implementation.
   */
  int msg_append_buf(struct msg *msg, struct mbuf *sbuf, uint8_t *pos, size_t len);
  struct mbuf *msg_insert_mem_bulk(struct msg *msg, struct mbuf *mbuf, uint8_t *pos, size_t len);

  /* Returns a 32-bit index computed for a key of keylen bytes.
   * NOTE(review): presumably the backend/server index the key maps to --
   * confirm. */
  uint32_t msg_backend_idx(struct msg *msg, uint8_t *key, uint32_t keylen);
  #endif /* DA_MSG_H_ */