request_threading.h 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /*
  2. * Copyright [2021] JD.com, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef __H_DTC_REQUEST_THREAD_H__
  17. #define __H_DTC_REQUEST_THREAD_H__
  18. #include "queue/mtpqueue.h"
  19. #include "queue/wait_queue.h"
  20. #include "../request/request_base_all.h"
  21. template <typename T> class ThreadingOutputDispatcher {
  22. private: // internal implementation
  23. class InternalTaskDispatcher
  24. : public ThreadingPipeQueue<T *, InternalTaskDispatcher> {
  25. public:
  26. JobAskInterface<T> *proc;
  27. public:
  28. InternalTaskDispatcher() : proc(0)
  29. {
  30. }
  31. virtual ~InternalTaskDispatcher()
  32. {
  33. }
  34. void job_ask_procedure(T *p)
  35. {
  36. proc->job_ask_procedure(p);
  37. };
  38. };
  39. class InternalReplyDispatcher : public JobAnswerInterface<T>,
  40. public threading_wait_queue<T *> {
  41. public:
  42. InternalReplyDispatcher *freenext;
  43. InternalReplyDispatcher *allnext;
  44. public:
  45. InternalReplyDispatcher() : freenext(0), allnext(0)
  46. {
  47. }
  48. virtual ~InternalReplyDispatcher()
  49. {
  50. }
  51. virtual void job_answer_procedure(T *p)
  52. {
  53. this->Push(p);
  54. };
  55. };
  56. private:
  57. InternalTaskDispatcher incQueue;
  58. pthread_mutex_t lock;
  59. // lock management, protect free_list and allList
  60. inline void Lock(void)
  61. {
  62. pthread_mutex_lock(&lock);
  63. }
  64. inline void Unlock(void)
  65. {
  66. pthread_mutex_unlock(&lock);
  67. }
  68. InternalReplyDispatcher *volatile free_list;
  69. InternalReplyDispatcher *volatile allList;
  70. volatile int stopping;
  71. public:
  72. ThreadingOutputDispatcher() : free_list(0), allList(0), stopping(0)
  73. {
  74. pthread_mutex_init(&lock, NULL);
  75. }
  76. ~ThreadingOutputDispatcher()
  77. {
  78. InternalReplyDispatcher *q;
  79. while (allList) {
  80. q = allList;
  81. allList = q->allnext;
  82. delete q;
  83. }
  84. pthread_mutex_destroy(&lock);
  85. }
  86. void Stop(void)
  87. {
  88. stopping = 1;
  89. InternalReplyDispatcher *p;
  90. for (p = allList; p; p = p->allnext)
  91. p->Stop(NULL);
  92. }
  93. int Stopping(void)
  94. {
  95. return stopping;
  96. }
  97. int do_execute(T *p)
  98. {
  99. InternalReplyDispatcher *q;
  100. // aborted without side-effect
  101. if (Stopping())
  102. return -1;
  103. /* freelist被别的线程在lock锁住的时候被别的线程置成了NULL */
  104. Lock();
  105. if (free_list) {
  106. q = free_list;
  107. free_list = q->freenext;
  108. q->Clear();
  109. q->freenext = NULL;
  110. } else {
  111. q = new InternalReplyDispatcher();
  112. q->allnext = allList;
  113. allList = q;
  114. }
  115. Unlock();
  116. p->push_reply_dispatcher(q);
  117. incQueue.Push(p);
  118. // has side effect
  119. if (q->Pop() == NULL) {
  120. // q leaked by purpose
  121. // because an pending reply didn't Popped
  122. return -2;
  123. }
  124. Lock();
  125. q->freenext = free_list;
  126. free_list = q;
  127. Unlock();
  128. return 0;
  129. }
  130. int register_next_chain(JobAskInterface<T> *to)
  131. {
  132. log4cplus_debug("Create register_next_chain to thread %s",
  133. to->get_owner_thread()->Name());
  134. incQueue.proc = to;
  135. return incQueue.attach_poller(to->get_owner_thread());
  136. }
  137. };
  138. #endif