/*
 * Copyright [2021] JD.com, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <inttypes.h>
#include <byteswap.h>
#include "da_request.h"
#include "da_msg.h"
#include "da_util.h"
#include "da_response.h"
#include "da_server.h"
#include "da_event.h"
#include "da_conn.h"
#include "da_core.h"
#include "da_errno.h"
#include "da_stats.h"
#include "da_time.h"
#include "my/my_comm.h"

#define LAYER3_DEF "layer3"

/* defined later in this file; declared here because req_process() uses it */
static void req_forward(struct context *ctx, struct conn *c_conn,
			struct msg *msg);
void req_put(struct msg *msg)
{
	struct msg *pmsg; /* peer message (response) */

	ASSERT(msg != NULL);
	ASSERT(msg->request);

	pmsg = msg->peer;
	if (pmsg != NULL) {
		ASSERT(!pmsg->request && pmsg->peer == msg);
		msg->peer = NULL;
		pmsg->peer = NULL;
		rsp_put(pmsg);
	}
	msg_tmo_delete(msg);
	msg_put(msg);
}
static struct msg *req_get(struct conn *conn)
{
	struct msg *msg;

	msg = msg_get(conn, true);
	/* out of memory: close the client connection */
	if (msg == NULL) {
		conn->error = 1;
		conn->err = CONN_MSG_GET_ERR;
		log_error(
			"req enter, get msg from pool error, lack of memory");
	}
	return msg;
}
struct msg *req_recv_next(struct context *ctx, struct conn *conn, bool alloc)
{
	ASSERT(conn != NULL && conn->fd > 0);
	struct msg *msg;

	if (conn->eof) {
		msg = conn->rmsg;
		if (msg != NULL) {
			conn->rmsg = NULL;
			log_error("eof s %d discarding incomplete req %" PRIu64
				  " len "
				  "%" PRIu32 "",
				  conn->fd, msg->id, msg->mlen);
			req_put(msg);
		}
		/* no half-open connections: mark the connection done even if
		 * it is still active */
		//if (!conn->active(conn)) {
		conn->done = 1;
		log_debug("c %d active %d is done", conn->fd,
			  conn->active(conn));
		//}
		return NULL;
	}

	msg = conn->rmsg;
	if (msg != NULL) {
		ASSERT(msg->request);
		return msg;
	}

	if (!alloc) {
		return NULL;
	}

	msg = req_get(conn);
	if (msg != NULL) {
		conn->rmsg = msg;
	}
	return msg;
}
struct msg *req_send_next(struct context *ctx, struct conn *conn)
{
	int status;
	struct msg *msg, *nmsg; /* current and next message */

	if (conn->connecting) {
		server_connected(ctx, conn);
	}

	nmsg = TAILQ_FIRST(&conn->imsg_q);
	if (nmsg == NULL) {
		/* nothing to send as the server inq is empty */
		log_debug("del epoll out when to server send done");
		status = event_del_out(ctx->evb, conn);
		if (status != 0) {
			conn->error = 1;
			conn->err = CONN_EPOLL_DEL_ERR;
		}
		return NULL;
	}

	msg = conn->smsg;
	if (msg != NULL) {
		ASSERT(msg->request && !msg->done);
		nmsg = TAILQ_NEXT(msg, s_i_tqe);
	}
	conn->smsg = nmsg;
	if (nmsg == NULL) {
		return NULL;
	}

	ASSERT(nmsg->request && !nmsg->done);
	log_debug("send next req %" PRIu64 " len %" PRIu32 " type %d on "
		  "s %d",
		  nmsg->id, nmsg->mlen, nmsg->cmd, conn->fd);
	return nmsg;
}
/*
 * dequeue msg from the server send msgq and insert it into the msg search tree
 */
void req_send_done(struct context *ctx, struct conn *conn, struct msg *msg)
{
	ASSERT(conn->type & BACKWORK);
	ASSERT(msg != NULL && conn->smsg == NULL);
	ASSERT(msg->request && !msg->done);
	ASSERT(msg->owner != conn);

	log_debug("send done req %" PRIu64 " len %" PRIu32 " type %d on "
		  "s %d",
		  msg->id, msg->mlen, msg->cmd, conn->fd);

	/*struct cache_instance *ci;
	ci = conn->owner;
	if (ci->failure_num > 0)
	{
		ci->failure_num--;
	}*/

	/* dequeue the message (request) from server inq */
	conn->dequeue_inq(ctx, conn, msg);
	/* put msg into msg search tree */
	conn->en_msgtree(ctx, conn, msg);
}
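/*
 * Queue bookkeeping helpers.
 *
 * Client (FRONTWORK) connections keep requests in two queues:
 *   imsg_q - requests received and still being serviced
 *   omsg_q - requests whose responses are ready to be written back
 * Server (BACKWORK) connections keep outgoing requests in imsg_q and, once
 * sent, track them in a per-connection rbtree (msg_tree) keyed by msg id so
 * the response can be matched back to its request.
 */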
void req_client_enqueue_omsgq(struct context *ctx, struct conn *conn,
			      struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & FRONTWORK);

	TAILQ_INSERT_TAIL(&conn->omsg_q, msg, c_o_tqe);
	msg->cli_outq = 1;
}

void req_client_dequeue_omsgq(struct context *ctx, struct conn *conn,
			      struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & FRONTWORK);

	TAILQ_REMOVE(&conn->omsg_q, msg, c_o_tqe);
	msg->cli_outq = 0;
}

void req_client_enqueue_imsgq(struct context *ctx, struct conn *conn,
			      struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & FRONTWORK);

	TAILQ_INSERT_TAIL(&conn->imsg_q, msg, c_i_tqe);
	msg->cli_inq = 1;
}

void req_client_dequeue_imsgq(struct context *ctx, struct conn *conn,
			      struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & FRONTWORK);

	TAILQ_REMOVE(&conn->imsg_q, msg, c_i_tqe);
	msg->cli_inq = 0;
}

void req_server_enqueue_imsgq(struct context *ctx, struct conn *conn,
			      struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & BACKWORK);
	struct cache_instance *ci;

	msg_tmo_insert(msg, conn);
	TAILQ_INSERT_TAIL(&conn->imsg_q, msg, s_i_tqe);
	msg->sev_inq = 1;

	//TODO
	ci = conn->owner;
	stats_server_incr(ctx, ci, server_in_queue);
}

void req_server_dequeue_imsgq(struct context *ctx, struct conn *conn,
			      struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & BACKWORK);

	TAILQ_REMOVE(&conn->imsg_q, msg, s_i_tqe);
	msg->sev_inq = 0;

	//TODO
	struct cache_instance *ci;
	ci = conn->owner;
	stats_server_decr(ctx, ci, server_in_queue);
}
/*
 * insert a msg into a server msg rbtree
 */
void req_server_en_msgtree(struct context *ctx, struct conn *conn,
			   struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & BACKWORK);
	struct rbnode *node;
	struct cache_instance *ci;

	/* put msg into msg search tree */
	node = &msg->msg_rbe;
	node->key = msg->id;
	node->data = msg;
	rbtree_insert(&conn->msg_tree, node);
	msg->sev_msgtree = 1;

	//TODO
	ci = conn->owner;
	stats_server_incr(ctx, ci, server_in_tree);
}

/*
 * delete a msg from a server msg rbtree
 */
void req_server_de_msgtree(struct context *ctx, struct conn *conn,
			   struct msg *msg)
{
	ASSERT(msg->request);
	ASSERT(conn->type & BACKWORK);
	struct rbnode *node;

	/* remove msg from the msg search tree */
	node = &msg->msg_rbe;
	if (node->data == NULL) {
		return;
	}
	rbtree_delete(&conn->msg_tree, node);
	msg->sev_msgtree = 0;

	//TODO
	struct cache_instance *ci;
	ci = conn->owner;
	stats_server_decr(ctx, ci, server_in_tree);
}
static void req_forward_stats(struct context *ctx, struct cache_instance *ci,
			      struct msg *msg)
{
	ASSERT(msg->request);

	stats_server_incr(ctx, ci, server_requests);
	stats_server_incr_by(ctx, ci, server_request_bytes, msg->mlen);
}
/*
 * msg is not in any queue, so it can be replied to the client directly
 */
static void req_make_loopback(struct context *ctx, struct conn *conn,
			      struct msg *msg)
{
	ASSERT((conn->type & FRONTWORK) && !(conn->type & LISTENER));

	log_debug("loopback req %" PRIu64 " len %" PRIu32 " type %d from "
		  "c %d",
		  msg->id, msg->mlen, msg->cmd, conn->fd);

	msg->done = 1;
	if (conn->writecached == 0 && conn->connected == 1) {
		cache_send_event(conn);
	}
	/* insert msg into the client out msg queue */
	conn->enqueue_outq(ctx, conn, msg);
	return;
}
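/*
 * dtc_header_add() prepends a DTC v2 header (struct DTC_HEADER_V2), plus the
 * optional database name, to the request payload: a fresh mbuf is filled with
 * header + dbname + the original payload, the old mbuf is released, and
 * msg->mlen is updated. Returns 0 on success and a negative value when no
 * buffer is available.
 */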
int dtc_header_add(struct msg *msg, enum enum_agent_admin admin, char *dbname)
{
	struct DTC_HEADER_V2 dtc_header = { 0 };
	struct mbuf *mbuf = STAILQ_LAST(&msg->buf_q, mbuf, next);
	if (!mbuf)
		return -1;

	struct mbuf *new_buf = mbuf_get();
	if (new_buf == NULL)
		return -2;

	dtc_header.version = DA_PROTOCOL_VERSION;
	dtc_header.admin = admin;
#if __BYTE_ORDER == __BIG_ENDIAN
	dtc_header.id = bswap_64(msg->id);
#else
	dtc_header.id = msg->id;
#endif
	dtc_header.dbname_len =
		dbname && strlen(dbname) > 0 ? strlen(dbname) : 0;
	dtc_header.packet_len =
		mbuf_length(mbuf) + sizeof(dtc_header) + dtc_header.dbname_len;
	dtc_header.layer = msg->layer;

	mbuf_copy(new_buf, &dtc_header, sizeof(dtc_header));
	if (dbname && strlen(dbname) > 0)
		mbuf_copy(new_buf, dbname, dtc_header.dbname_len);
	mbuf_copy(new_buf, mbuf->start, mbuf_length(mbuf));
	log_debug("dtc_header.dbname_len: %d, %s", dtc_header.dbname_len,
		  dbname ? dbname : "");

	/* remember the old payload length before the mbuf is recycled */
	uint32_t org_len = mbuf_length(mbuf);
	mbuf_remove(&msg->buf_q, mbuf);
	mbuf_put(mbuf);
	mbuf_insert(&msg->buf_q, new_buf);
	msg->mlen = mbuf_length(new_buf);
	log_debug("msg->mlen:%d sizeof(dtc_header):%zu mbuf_length(mbuf):%d",
		  msg->mlen, sizeof(dtc_header), org_len);
	return 0;
}
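/*
 * req_process() drives the per-connection login state machine for a freshly
 * parsed request:
 *   - CONN_STAGE_LOGGING_IN / CONN_STAGE_SWITCH_NATIVE_PASSWD: answer the
 *     handshake locally and move the connection to CONN_STAGE_LOGGED_IN;
 *   - any other stage that is not CONN_STAGE_LOGGED_IN: reply with an error;
 *   - CONN_STAGE_LOGGED_IN: let my_do_command() classify the command and
 *     either forward it to a DTC server (NEXT_FORWARD) or answer it locally
 *     (NEXT_RSP_OK, NEXT_RSP_ERROR, NEXT_RSP_NULL).
 */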
void req_process(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
	if (c_conn->stage ==
	    CONN_STAGE_LOGGING_IN) /* this request is a login authentication */
	{
		if (0) { /* auth-switch path, currently disabled */
			c_conn->stage = CONN_STAGE_SWITCH_NATIVE_PASSWD;
			if (net_send_switch(msg, c_conn) < 0)
				return;
			req_make_loopback(ctx, c_conn, msg);
		} else {
			c_conn->stage = CONN_STAGE_LOGGED_IN;
			if (net_send_ok(msg, c_conn) <
			    0) /* default resp: login success */
				return;
			req_make_loopback(ctx, c_conn, msg);
		}
		return;
	}

	if (c_conn->stage == CONN_STAGE_SWITCH_NATIVE_PASSWD) {
		c_conn->stage = CONN_STAGE_LOGGED_IN;
		if (net_send_ok(msg, c_conn) <
		    0) /* default resp: login success */
			return;
		req_make_loopback(ctx, c_conn, msg);
		return;
	}

	if (c_conn->stage !=
	    CONN_STAGE_LOGGED_IN) /* not logged in yet, reply with an error */
	{
		log_error("login authentication went wrong.");
		if (net_send_error(msg, c_conn) < 0)
			return;
		req_make_loopback(ctx, c_conn, msg);
		return;
	}

	int oper = my_do_command(msg);
	switch (oper) {
	case NEXT_FORWARD:
		dtc_header_add(msg, CMD_NOP, c_conn->dbname);
		log_debug("FORWARD. msg len: %" PRIu32 ", msg id: %" PRIu64,
			  msg->mlen, msg->id);
		req_forward(ctx, c_conn, msg);
		break;
	case NEXT_RSP_OK:
		log_debug("RSP OK. msg len: %" PRIu32 ", msg id: %" PRIu64,
			  msg->mlen, msg->id);
		if (net_send_ok(msg, c_conn) < 0)
			return;
		req_make_loopback(ctx, c_conn, msg);
		break;
	case NEXT_RSP_ERROR:
		log_debug("RSP ERROR. msg len: %" PRIu32 ", msg id: %" PRIu64,
			  msg->mlen, msg->id);
		if (net_send_error(msg, c_conn) < 0)
			return;
		req_make_loopback(ctx, c_conn, msg);
		break;
	case NEXT_RSP_NULL:
		if (net_send_ok(msg, c_conn) < 0)
			return;
		req_make_loopback(ctx, c_conn, msg);
		break;
	default:
		log_error("my_do_command operation error:%d", oper);
	}
	return;
}
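/*
 * req_forward() routes a request to a backend DTC instance. The request is
 * first queued on the client connection so the response can be matched up
 * later; then a server pool is chosen - for layer 2/3 messages the pool whose
 * mid is 999, otherwise the pool owning the client connection - and a server
 * connection is taken from it. On any failure the request is marked done and
 * in error and, once all of its fragments are done, bounced straight back to
 * the client.
 */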
static void req_forward(struct context *ctx, struct conn *c_conn,
			struct msg *msg)
{
	struct conn *s_conn;
	struct server_pool *pool;
	struct cache_instance *ci;
	uint32_t i;
	struct server_pool *ss_pool = NULL;

	log_debug("req_forward entry");

	/* insert msg into the client imsgq while waiting for the response;
	 * it is queued here before going onto the backend worker's queue */
	c_conn->enqueue_inq(ctx, c_conn, msg);

	pool = c_conn->owner;
	if (msg->layer == 2 || msg->layer == 3) {
		/* layer 2/3 requests go to the pool whose mid is 999 */
		for (i = 0; i < array_n(&(ctx->pool)); i++) {
			ss_pool = (struct server_pool *)array_get(&(ctx->pool),
								  i);
			if (ss_pool->mid == 999)
				break;
			else
				ss_pool = NULL;
		}
		if (ss_pool == NULL) {
			log_debug("s_conn null");
			/* the client connection still exists, do not swallow
			 * the error */
			msg->done = 1;
			msg->error = 1;
			msg->err = MSG_REQ_FORWARD_ERR;
			if (msg->frag_owner != NULL) {
				msg->frag_owner->nfrag_done++;
			}
			if (req_done(c_conn, msg)) {
				rsp_forward(ctx, c_conn, msg);
			}
			stats_pool_incr(ctx, pool, forward_error);
			log_error("msg :%" PRIu64
				  " from c %d ,get s_conn fail!",
				  msg->id, c_conn->fd);
			return;
		}
		s_conn = server_pool_conn(ctx, ss_pool, msg);
	} else {
		s_conn = server_pool_conn(
			ctx, (struct server_pool *)c_conn->owner, msg);
	}

	if (s_conn == NULL) {
		log_debug("s_conn null");
		/* the client connection still exists, do not swallow the
		 * error */
		msg->done = 1;
		msg->error = 1;
		msg->err = MSG_REQ_FORWARD_ERR;
		if (msg->frag_owner != NULL) {
			msg->frag_owner->nfrag_done++;
		}
		if (req_done(c_conn, msg)) {
			rsp_forward(ctx, c_conn, msg);
		}
		stats_pool_incr(ctx, pool, forward_error);
		log_error("msg :%" PRIu64 " from c %d ,get s_conn fail!",
			  msg->id, c_conn->fd);
		return;
	}

	/* set the peer_conn of msg */
	msg->peer_conn = s_conn;
	if (s_conn->writecached == 0 && s_conn->connected == 1) {
		cache_send_event(s_conn);
	}
	/*
	 * insert msg into the server imsgq, to be sent to the dtc server
	 */
	s_conn->enqueue_inq(ctx, s_conn, msg);

	/* stats counter incr */
	if (msg->cmd == MSG_REQ_GET) {
		stats_pool_incr_by(ctx, (struct server_pool *)c_conn->owner,
				   pool_request_get_keys, msg->keyCount);
	}
	ci = s_conn->owner;
	req_forward_stats(ctx, ci, msg);

	log_debug("forward from c %d to s %d req %" PRIu64 " len %" PRIu32
		  " type %d",
		  c_conn->fd, s_conn->fd, msg->id, msg->mlen, msg->cmd);
}
/*
 * msg is not in any queue, so the error can be replied to the client directly
 */
static void req_make_error(struct context *ctx, struct conn *conn,
			   struct msg *msg, int msg_errno)
{
	ASSERT((conn->type & FRONTWORK) && !(conn->type & LISTENER));

	log_debug("forward req %" PRIu64 " len %" PRIu32 " type %d from "
		  "c %d failed, msg errno:%d, errmsg:%s",
		  msg->id, msg->mlen, msg->cmd, conn->fd, msg_errno,
		  GetMsgErrorCodeStr(msg_errno));

	msg->done = 1;
	msg->error = 1;
	msg->err = msg_errno;
	if (conn->writecached == 0 && conn->connected == 1) {
		cache_send_event(conn);
	}
	/* insert msg into the client out msg queue */
	conn->enqueue_outq(ctx, conn, msg);
	return;
}

static bool req_filter_empty(struct context *ctx, struct conn *conn,
			     struct msg *msg)
{
	ASSERT(conn->client && !conn->proxy);

	if (msg_empty(msg)) {
		ASSERT(conn->rmsg == NULL);
		log_debug("filter empty req %" PRIu64 " from c %d", msg->id,
			  conn->fd);
		req_put(msg);
		return true;
	}
	return false;
}
static void req_recv_done_stats(struct context *ctx, struct server_pool *pool,
				struct msg *msg)
{
	stats_pool_incr(ctx, pool, pool_requests);
	stats_pool_incr_by(ctx, pool, pool_request_bytes, msg->mlen);
}
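/*
 * req_recv_done() is called once a complete request has been read from a
 * client connection: the connection is switched to the next in-flight msg,
 * empty requests are dropped, pool stats are updated, the request is run
 * through msg->fragment(), and the (whole, unfragmented) request is handed to
 * req_process(). The fragment fan-out path below stays under #if 0 because
 * multi-request fragmentation is not supported yet.
 */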
void req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg,
		   struct msg *nmsg)
{
	int status;
	struct server_pool *pool;
	struct msg_tqh frag_msgq;
	struct msg *sub_msg;
	struct msg *tmsg; /* tmp next message */

	log_debug("req_recv_done entry.");

	ASSERT((conn->type & FRONTWORK) && !(conn->type & LISTENER));
	ASSERT(msg->request);
	ASSERT(msg->owner == conn);
	ASSERT(conn->rmsg == msg);
	ASSERT(nmsg == NULL || nmsg->request);

	/* switch to the msg being processed next */
	conn->rmsg = nmsg;

	if (req_filter_empty(ctx, conn, msg)) {
		ASSERT(conn->rmsg == NULL);
		log_debug("filter an empty msg: %" PRIu64 " in conn:%d",
			  msg->id, conn->fd);
		return;
	}

	pool = (struct server_pool *)conn->owner;
	conn->isvalid = 1;
	req_recv_done_stats(ctx, pool, msg);

	/* msg fragment */
	TAILQ_INIT(&frag_msgq);
	status = msg->fragment(msg, pool->ncontinuum, &frag_msgq);
	if (status < 0) {
		ASSERT(TAILQ_EMPTY(&frag_msgq));
		if (msg->err == MSG_FRAGMENT_ERR)
			stats_pool_incr(ctx, pool, fragment_error);
		else
			stats_pool_incr(ctx, pool, pool_withoutkey_req);
		log_debug("req_make_error");
		req_make_error(ctx, conn, msg, msg->err);
		return;
	}
	ASSERT(TAILQ_EMPTY(&frag_msgq));

	/* if no fragment happened */
	if (TAILQ_EMPTY(&frag_msgq)) {
		req_process(ctx, conn, msg);
		log_debug("req_recv_done leave.");
		return;
	}

#if 0 /* multiple (fragmented) requests are not supported yet */
	/*
	 * insert msg into the client in queue; it can be freed when the
	 * client closes the connection, so mark it done
	 */
	msg->done = 1;
	conn->enqueue_inq(ctx, conn, msg);
	for (sub_msg = TAILQ_FIRST(&frag_msgq); sub_msg != NULL; sub_msg = tmsg) {
		tmsg = TAILQ_NEXT(sub_msg, o_tqe);
		log_debug("req forward msg %"PRIu64"", sub_msg->id);
		TAILQ_REMOVE(&frag_msgq, sub_msg, o_tqe);
		req_forward(ctx, conn, sub_msg);
	}
	ASSERT(TAILQ_EMPTY(&frag_msgq));
#endif
	log_debug("req_recv_done leave.");
	return;
}
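/*
 * A fragmented request (frag_id != 0) counts as done only when every sibling
 * fragment queued on the same client connection is done. Once that is true,
 * fdone is set on all fragments so later req_done() calls can answer without
 * walking the queue again.
 */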
/*
 * whether req is done
 */
bool req_done(struct conn *c, struct msg *msg)
{
	int id, nfragment;
	struct msg *sub_msg, *temp_msg;

	if (!msg->done) {
		return false;
	}

	id = msg->frag_id;
	if (id == 0) {
		/* no fragmentation happened */
		return true;
	}

	if (msg->fdone) {
		return true;
	}

	log_debug("nfrag_done:%d,nfrag:%d", msg->frag_owner->nfrag_done,
		  msg->frag_owner->nfrag);
	if (msg->frag_owner->nfrag_done < msg->frag_owner->nfrag) {
		return false;
	}

	/*
	 * check all sub msgs
	 */
	for (sub_msg = msg, temp_msg = TAILQ_PREV(msg, msg_tqh, c_i_tqe);
	     temp_msg != NULL && temp_msg->frag_id == id; sub_msg = temp_msg,
	     temp_msg = TAILQ_PREV(temp_msg, msg_tqh, c_i_tqe)) {
		if (!(sub_msg->done)) {
			return false;
		}
	}
	for (sub_msg = msg, temp_msg = TAILQ_NEXT(msg, c_i_tqe);
	     temp_msg != NULL && temp_msg->frag_id == id;
	     sub_msg = temp_msg, temp_msg = TAILQ_NEXT(temp_msg, c_i_tqe)) {
		if (!(sub_msg->done)) {
			return false;
		}
	}

	msg->fdone = 1;
	nfragment = 0;
	/*
	 * check all sub msgs and set fdone
	 */
	for (sub_msg = msg, temp_msg = TAILQ_PREV(msg, msg_tqh, c_i_tqe);
	     temp_msg != NULL && temp_msg->frag_id == id; sub_msg = temp_msg,
	     temp_msg = TAILQ_PREV(temp_msg, msg_tqh, c_i_tqe)) {
		temp_msg->fdone = 1;
		nfragment++;
	}
	for (sub_msg = msg, temp_msg = TAILQ_NEXT(msg, c_i_tqe);
	     temp_msg != NULL && temp_msg->frag_id == id;
	     sub_msg = temp_msg, temp_msg = TAILQ_NEXT(temp_msg, c_i_tqe)) {
		temp_msg->fdone = 1;
		nfragment++;
	}
	ASSERT(msg->frag_owner->nfrag == nfragment);
	return true;
}
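/*
 * req_error() reports whether a finished request should be answered with an
 * error. For fragmented requests, an error in any sibling fragment marks the
 * whole group (ferror) so subsequent calls short-circuit.
 */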
bool req_error(struct conn *conn, struct msg *msg)
{
	struct msg *cmsg; /* current message */
	uint64_t id;
	uint32_t nfragment;

	ASSERT(msg->request && req_done(conn, msg));

	if (msg->error) {
		return true;
	}

	id = msg->frag_id;
	if (id == 0) {
		return false;
	}

	if (msg->ferror) {
		/* request has already been marked to be in error */
		return true;
	}

	for (cmsg = TAILQ_PREV(msg, msg_tqh, c_i_tqe);
	     cmsg != NULL && cmsg->frag_id == id;
	     cmsg = TAILQ_PREV(cmsg, msg_tqh, c_i_tqe)) {
		if (cmsg->error) {
			goto ferror;
		}
	}
	for (cmsg = TAILQ_NEXT(msg, c_i_tqe);
	     cmsg != NULL && cmsg->frag_id == id;
	     cmsg = TAILQ_NEXT(cmsg, c_i_tqe)) {
		if (cmsg->error) {
			goto ferror;
		}
	}
	return false;

ferror:
	/*
	 * Mark all fragments of the given request to be in error to speed up
	 * future req_error calls for any of fragments of this request
	 */
	msg->ferror = 1;
	nfragment = 1;
	for (cmsg = TAILQ_PREV(msg, msg_tqh, c_i_tqe);
	     cmsg != NULL && cmsg->frag_id == id;
	     cmsg = TAILQ_PREV(cmsg, msg_tqh, c_i_tqe)) {
		cmsg->ferror = 1;
		nfragment++;
	}
	for (cmsg = TAILQ_NEXT(msg, c_i_tqe);
	     cmsg != NULL && cmsg->frag_id == id;
	     cmsg = TAILQ_NEXT(cmsg, c_i_tqe)) {
		cmsg->ferror = 1;
		nfragment++;
	}
	log_debug("req from c %d with fid %" PRIu64 " and %" PRIu32 " "
		  "fragments is in error",
		  conn->fd, id, nfragment);
	return true;
}
void error_reply(struct msg *msg, struct conn *conn, struct context *ctx,
		 int errcode)
{
	if (net_send_error(msg, conn) < 0)
		return;
	req_make_loopback(ctx, conn, msg);
}