multi_request.cc 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "multi_request.h"
  17. #include "task/task_request.h"
  18. #include "task/task_multi_unit.h"
  19. #include "key_list.h"
  20. #include "mem_check.h"
  21. static MultiTaskReply multiTaskReply;
  22. MultiRequest::MultiRequest(JobHubAskChain *o, DTCJobOperation *job)
  23. : owner(o), wait(job), keyList(NULL), keyMask(NULL), doneReq(0),
  24. totalReq(0), subReq(0), firstPass(1), keyFields(0), internal(0)
  25. {
  26. }
  27. MultiRequest::~MultiRequest()
  28. {
  29. if (wait) {
  30. wait->turn_around_job_answer();
  31. wait = NULL;
  32. }
  33. if (internal == 0)
  34. FREE_IF(keyList);
  35. FREE_IF(keyMask);
  36. }
  37. void MultiTaskReply::job_answer_procedure(DTCJobOperation *job_operation)
  38. {
  39. MultiRequest *req = job_operation->OwnerInfo<MultiRequest>();
  40. if (req == NULL)
  41. delete job_operation;
  42. else
  43. req->complete_task(job_operation, job_operation->owner_index());
  44. }
  45. DTCValue *MultiRequest::get_key_value(int i)
  46. {
  47. return &keyList[i * keyFields];
  48. }
  49. void MultiRequest::set_key_completed(int i)
  50. {
  51. FD_SET(i, (fd_set *)keyMask);
  52. doneReq++;
  53. }
  54. int MultiRequest::is_key_completed(int i)
  55. {
  56. return FD_ISSET(i, (fd_set *)keyMask);
  57. }
  58. int MultiRequest::decode_key_list(void)
  59. {
  60. if (!wait->flag_multi_key_val()) // single job
  61. return 0;
  62. const DTCTableDefinition *table_definition_ = wait->table_definition();
  63. keyFields = table_definition_->key_fields();
  64. if (wait->internal_key_val_list()) {
  65. // embeded API
  66. totalReq = wait->internal_key_val_list()->KeyCount();
  67. // Q&D discard const here
  68. // this keyList member can't be const,
  69. // but actually readonly after init
  70. keyList =
  71. (DTCValue *)&wait->internal_key_val_list()->Value(0, 0);
  72. internal = 1;
  73. } else {
  74. // from network
  75. uint8_t fieldID[keyFields];
  76. Array keyNameList(*(wait->key_name_list()));
  77. Array keyValList(*(wait->key_val_list()));
  78. DTCBinary keyName;
  79. for (int i = 0; i < keyFields; i++) {
  80. if (keyNameList.Get(keyName) != 0) {
  81. log4cplus_error(
  82. "get key name[%d] error, key field count:%d",
  83. i, table_definition_->key_fields());
  84. return -1;
  85. }
  86. fieldID[i] = table_definition_->field_id(keyName.ptr);
  87. }
  88. if (keyNameList.Get(keyName) == 0) {
  89. log4cplus_error("bogus key name: %.*s", keyName.len,
  90. keyName.ptr);
  91. return -1;
  92. }
  93. totalReq = wait->versionInfo.get_tag(11)->u64;
  94. keyList = (DTCValue *)MALLOC(totalReq * keyFields *
  95. sizeof(DTCValue));
  96. for (int i = 0; i < totalReq; i++) {
  97. DTCValue *keyVal = get_key_value(i);
  98. for (int j = 0; j < keyFields; j++) {
  99. int fid = fieldID[j];
  100. switch (table_definition_->field_type(fid)) {
  101. case DField::Signed:
  102. case DField::Unsigned:
  103. if (keyValList.Get(keyVal[fid].u64) !=
  104. 0) {
  105. log4cplus_error(
  106. "get key value[%d][%d] error",
  107. i, j);
  108. return -2;
  109. }
  110. break;
  111. case DField::String:
  112. case DField::Binary:
  113. if (keyValList.Get(keyVal[fid].bin) !=
  114. 0) {
  115. log4cplus_error(
  116. "get key value[%d][%d] error",
  117. i, j);
  118. return -2;
  119. }
  120. break;
  121. default:
  122. log4cplus_error(
  123. "invalid key type[%d][%d]", i,
  124. j);
  125. return -3;
  126. }
  127. }
  128. }
  129. }
  130. // keyMask = (unsigned char *)CALLOC(1, (totalReq*keyFields+7)/8);
  131. // 8 bytes aligned Awaste some memory. FD_SET operate memory by 8bytes
  132. keyMask = (unsigned char *)CALLOC(
  133. 8, (((totalReq * keyFields + 7) / 8) + 7) / 8);
  134. return totalReq;
  135. }
  136. int MultiRequest::split_task(void)
  137. {
  138. log4cplus_debug("split_task begin, totalReq: %d", totalReq);
  139. for (int i = 0; i < totalReq; i++) {
  140. if (is_key_completed(i))
  141. continue;
  142. DTCValue *keyVal = get_key_value(i);
  143. DTCJobOperation *pJob = new DTCJobOperation;
  144. if (pJob == NULL) {
  145. log4cplus_error("%s: %m", "new job error");
  146. return -1;
  147. }
  148. if (pJob->Copy(*wait, keyVal) < 0) {
  149. log4cplus_error("copy job error: %s",
  150. pJob->resultInfo.error_message());
  151. delete pJob;
  152. return -1;
  153. }
  154. pJob->set_owner_info(this, i, wait->OwnerAddress());
  155. pJob->push_reply_dispatcher(&multiTaskReply);
  156. owner->push_task_queue(pJob);
  157. subReq++;
  158. }
  159. log4cplus_debug("split_task end, subReq: %d", subReq);
  160. return 0;
  161. }
  162. void MultiRequest::complete_task(DTCJobOperation *req, int index)
  163. {
  164. log4cplus_debug("MultiRequest::complete_task start, index: %d", index);
  165. if (wait) {
  166. if (wait->result_code() >= 0 && req->result_code() < 0) {
  167. wait->set_error_dup(req->resultInfo.result_code(),
  168. req->resultInfo.error_from(),
  169. req->resultInfo.error_message());
  170. }
  171. int ret;
  172. if ((ret = wait->merge_result(*req)) != 0) {
  173. wait->set_error(ret, "multi_request",
  174. "merge result error");
  175. }
  176. }
  177. delete req;
  178. set_key_completed(index);
  179. subReq--;
  180. // 注意,如果将CTaskMultiplexer放到cache线程执行,则会导致每split一个task,都是直接到cache_process执行完到这里;然后再split出第二个task。这会导致这一个判断逻辑有问题。
  181. // 目前CTaskMultiplexer是跟incoming线程绑在一起的,因此没有问题
  182. if (firstPass == 0 && subReq == 0) {
  183. complete_waiter();
  184. delete this;
  185. }
  186. log4cplus_debug("MultiRequest::complete_task end, subReq: %d", subReq);
  187. }
  188. void MultiRequest::complete_waiter(void)
  189. {
  190. if (wait) {
  191. wait->turn_around_job_answer();
  192. wait = 0;
  193. }
  194. }
  195. void MultiRequest::second_pass(int err)
  196. {
  197. firstPass = 0;
  198. if (subReq == 0) {
  199. // no sub-request present, complete whole request
  200. complete_waiter();
  201. delete this;
  202. } else if (err) {
  203. // mark all request is done except sub-requests
  204. doneReq = totalReq - subReq;
  205. complete_waiter();
  206. }
  207. return;
  208. }
  209. int DTCJobOperation::set_batch_cursor(int index)
  210. {
  211. int err = 0;
  212. MultiRequest *mreq = get_batch_key_list();
  213. if (mreq == NULL)
  214. return -1;
  215. if (index < 0 || index >= mreq->total_count()) {
  216. key = NULL;
  217. multi_key = NULL;
  218. return -1;
  219. } else {
  220. DTCValue *keyVal = mreq->get_key_value(index);
  221. int kf = table_definition()->key_fields();
  222. /* switch request_key() */
  223. key = &keyVal[0];
  224. if (kf > 1) {
  225. /* switch multi-fields key */
  226. multi_key = keyVal;
  227. }
  228. err = build_packed_key();
  229. if (err < 0) {
  230. log4cplus_error(
  231. "build packed key error, error from: %s, error message: %s",
  232. resultInfo.error_from(),
  233. resultInfo.error_message());
  234. return -1;
  235. }
  236. }
  237. return 0;
  238. }
  239. void DTCJobOperation::done_batch_cursor(int index)
  240. {
  241. MultiRequest *mreq = get_batch_key_list();
  242. if (mreq == NULL)
  243. return;
  244. mreq->set_key_completed(index);
  245. }