task_pendlist.cc 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "task_pendlist.h"
  17. #include "buffer_process_ask_chain.h"
  18. #include "log/log.h"
  19. DTC_USING_NAMESPACE
  20. TaskPendingList::TaskPendingList(JobAskInterface<DTCJobOperation> *o, int to)
  21. : _timeout(to), _timelist(0), _owner(o), _wakeup(0)
  22. {
  23. _timelist = _owner->owner->get_timer_list(_timeout);
  24. }
  25. TaskPendingList::~TaskPendingList()
  26. {
  27. std::list<slot_t>::iterator it;
  28. for (it = _pendlist.begin(); it != _pendlist.end(); ++it) {
  29. //把所有请求踢回客户端
  30. it->first->set_error(-ETIMEDOUT, __FUNCTION__,
  31. "object deconstruct");
  32. it->first->turn_around_job_answer();
  33. }
  34. }
  35. void TaskPendingList::add2_list(DTCJobOperation *job)
  36. {
  37. if (job) {
  38. if (_pendlist.empty())
  39. attach_timer(_timelist);
  40. _pendlist.push_back(std::make_pair(job, time(NULL)));
  41. }
  42. return;
  43. }
  44. // 唤醒队列中所有已经pending的task
  45. void TaskPendingList::Wakeup(void)
  46. {
  47. log4cplus_debug("TaskPendingList Wakeup");
  48. //唤醒所有task
  49. _wakeup = 1;
  50. attach_ready_timer(_owner->owner);
  51. return;
  52. }
  53. void TaskPendingList::job_timer_procedure(void)
  54. {
  55. log4cplus_debug("enter timer procedure");
  56. std::list<slot_t> copy;
  57. copy.swap(_pendlist);
  58. std::list<slot_t>::iterator it;
  59. if (_wakeup) {
  60. for (it = copy.begin(); it != copy.end(); ++it) {
  61. _owner->job_ask_procedure(it->first);
  62. }
  63. _wakeup = 0;
  64. } else {
  65. time_t now = time(NULL);
  66. for (it = copy.begin(); it != copy.end(); ++it) {
  67. //超时处理
  68. if (it->second + _timeout >= now) {
  69. _pendlist.push_back(*it);
  70. } else {
  71. it->first->set_error(-ETIMEDOUT, __FUNCTION__,
  72. "pending job is timedout");
  73. it->first->turn_around_job_answer();
  74. }
  75. }
  76. if (!_pendlist.empty())
  77. attach_timer(_timelist);
  78. }
  79. log4cplus_debug("leave timer procedure");
  80. return;
  81. }