tc_thread_queue.h 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TC_THREAD_QUEUE_H_
  17. #define __TC_THREAD_QUEUE_H_
  18. #include <deque>
  19. #include <vector>
  20. #include <cassert>
  21. #include <mutex>
  22. #include <condition_variable>
  23. using namespace std;
  24. namespace tars
  25. {
  26. /////////////////////////////////////////////////
  27. /**
  28. * @file tc_thread_queue.h
  29. * @brief 线程队列类.
  30. *
  31. * @author jarodruan@upchina.com
  32. */
  33. /////////////////////////////////////////////////
  34. /**
  35. * @brief 线程安全队列
  36. */
  37. template<typename T, typename D = deque<T> >
  38. class TC_ThreadQueue
  39. {
  40. public:
  41. TC_ThreadQueue():_size(0){};
  42. public:
  43. typedef D queue_type;
  44. /**
  45. * @brief 从头部获取数据, 没有数据则等待.
  46. *
  47. * @param t
  48. * @param millsecond(wait = true时才生效) 阻塞等待时间(ms)
  49. * 0 表示不阻塞
  50. * -1 永久等待
  51. * @param wait, 是否wait
  52. * @return bool: true, 获取了数据, false, 无数据
  53. */
  54. bool pop_front(T& t, size_t millsecond = 0, bool wait = true);
  55. /**
  56. * @brief 通知等待在队列上面的线程都醒过来
  57. */
  58. void notifyT();
  59. /**
  60. * @brief 放数据到队列后端.
  61. *
  62. * @param t
  63. */
  64. void push_back(const T& t, bool notify = true);
  65. /**
  66. * @brief 放数据到队列后端.
  67. *
  68. * @param vt
  69. */
  70. void push_back(const queue_type &qt, bool notify = true);
  71. /**
  72. * @brief 放数据到队列前端.
  73. *
  74. * @param t
  75. */
  76. void push_front(const T& t, bool notify = true);
  77. /**
  78. * @brief 放数据到队列前端.
  79. *
  80. * @param vt
  81. */
  82. void push_front(const queue_type &qt, bool notify = true);
  83. /**
  84. * @brief 交换数据
  85. *
  86. * @param q
  87. * @param millsecond(wait = true时才生效) 阻塞等待时间(ms)
  88. * 0 表示不阻塞
  89. * -1 如果为则永久等待
  90. * @param 是否等待有数据
  91. * @return 有数据返回true, 无数据返回false
  92. */
  93. bool swap(queue_type &q, size_t millsecond = 0, bool wait = true);
  94. /**
  95. * @brief 队列大小.
  96. *
  97. * @return size_t 队列大小
  98. */
  99. size_t size() const;
  100. /**
  101. * @brief 清空队列
  102. */
  103. void clear();
  104. /**
  105. * @brief 是否数据为空.
  106. *
  107. * @return bool 为空返回true,否则返回false
  108. */
  109. bool empty() const;
  110. protected:
  111. TC_ThreadQueue(const TC_ThreadQueue&) = delete;
  112. TC_ThreadQueue(TC_ThreadQueue&&) = delete;
  113. TC_ThreadQueue& operator=(const TC_ThreadQueue&) = delete;
  114. TC_ThreadQueue& operator=(TC_ThreadQueue&&) = delete;
  115. protected:
  116. /**
  117. * 队列
  118. */
  119. queue_type _queue;
  120. /**
  121. * 队列长度
  122. */
  123. size_t _size;
  124. //条件变量
  125. std::condition_variable _cond;
  126. //锁
  127. mutable std::mutex _mutex;
  128. };
  129. template<typename T, typename D> bool TC_ThreadQueue<T, D>::pop_front(T& t, size_t millsecond, bool wait)
  130. {
  131. if(wait) {
  132. std::unique_lock<std::mutex> lock(_mutex);
  133. if (_queue.empty()) {
  134. if (millsecond == 0) {
  135. return false;
  136. }
  137. if (millsecond == (size_t) -1) {
  138. _cond.wait(lock);
  139. }
  140. else {
  141. //超时了
  142. if (_cond.wait_for(lock, std::chrono::milliseconds(millsecond)) == std::cv_status::timeout) {
  143. return false;
  144. }
  145. }
  146. }
  147. if (_queue.empty()) {
  148. return false;
  149. }
  150. t = _queue.front();
  151. _queue.pop_front();
  152. assert(_size > 0);
  153. --_size;
  154. return true;
  155. }
  156. else
  157. {
  158. std::lock_guard<std::mutex> lock (_mutex);
  159. if (_queue.empty())
  160. {
  161. return false;
  162. }
  163. t = _queue.front();
  164. _queue.pop_front();
  165. assert(_size > 0);
  166. --_size;
  167. return true;
  168. }
  169. }
  170. template<typename T, typename D> void TC_ThreadQueue<T, D>::notifyT()
  171. {
  172. std::unique_lock<std::mutex> lock(_mutex);
  173. _cond.notify_all();
  174. }
  175. template<typename T, typename D> void TC_ThreadQueue<T, D>::push_back(const T& t, bool notify)
  176. {
  177. if(notify) {
  178. std::unique_lock<std::mutex> lock(_mutex);
  179. _queue.push_back(t);
  180. ++_size;
  181. _cond.notify_one();
  182. }
  183. else
  184. {
  185. std::lock_guard<std::mutex> lock (_mutex);
  186. _queue.push_back(t);
  187. ++_size;
  188. }
  189. }
  190. template<typename T, typename D> void TC_ThreadQueue<T, D>::push_back(const queue_type &qt, bool notify)
  191. {
  192. if(notify) {
  193. std::unique_lock<std::mutex> lock(_mutex);
  194. typename queue_type::const_iterator it = qt.begin();
  195. typename queue_type::const_iterator itEnd = qt.end();
  196. while (it != itEnd) {
  197. _queue.push_back(*it);
  198. ++it;
  199. ++_size;
  200. }
  201. _cond.notify_all();
  202. }
  203. else
  204. {
  205. std::lock_guard<std::mutex> lock (_mutex);
  206. typename queue_type::const_iterator it = qt.begin();
  207. typename queue_type::const_iterator itEnd = qt.end();
  208. while (it != itEnd) {
  209. _queue.push_back(*it);
  210. ++it;
  211. ++_size;
  212. }
  213. }
  214. }
  215. template<typename T, typename D> void TC_ThreadQueue<T, D>::push_front(const T& t, bool notify)
  216. {
  217. if(notify) {
  218. std::unique_lock<std::mutex> lock(_mutex);
  219. _cond.notify_one();
  220. _queue.push_front(t);
  221. ++_size;
  222. }
  223. else
  224. {
  225. std::lock_guard<std::mutex> lock (_mutex);
  226. _queue.push_front(t);
  227. ++_size;
  228. }
  229. }
  230. template<typename T, typename D> void TC_ThreadQueue<T, D>::push_front(const queue_type &qt, bool notify)
  231. {
  232. if(notify) {
  233. std::unique_lock<std::mutex> lock(_mutex);
  234. typename queue_type::const_iterator it = qt.begin();
  235. typename queue_type::const_iterator itEnd = qt.end();
  236. while (it != itEnd) {
  237. _queue.push_front(*it);
  238. ++it;
  239. ++_size;
  240. }
  241. _cond.notify_all();
  242. }
  243. else
  244. {
  245. std::lock_guard<std::mutex> lock (_mutex);
  246. typename queue_type::const_iterator it = qt.begin();
  247. typename queue_type::const_iterator itEnd = qt.end();
  248. while (it != itEnd) {
  249. _queue.push_front(*it);
  250. ++it;
  251. ++_size;
  252. }
  253. }
  254. }
  255. template<typename T, typename D> bool TC_ThreadQueue<T, D>::swap(queue_type &q, size_t millsecond, bool wait)
  256. {
  257. if(wait) {
  258. std::unique_lock<std::mutex> lock(_mutex);
  259. if (_queue.empty()) {
  260. if (millsecond == 0) {
  261. return false;
  262. }
  263. if (millsecond == (size_t) -1) {
  264. _cond.wait(lock);
  265. }
  266. else {
  267. //超时了
  268. if (_cond.wait_for(lock, std::chrono::milliseconds(millsecond)) == std::cv_status::timeout) {
  269. return false;
  270. }
  271. }
  272. }
  273. if (_queue.empty()) {
  274. return false;
  275. }
  276. q.swap(_queue);
  277. _size = _queue.size();
  278. return true;
  279. }
  280. else
  281. {
  282. std::lock_guard<std::mutex> lock (_mutex);
  283. if (_queue.empty()) {
  284. return false;
  285. }
  286. q.swap(_queue);
  287. _size = _queue.size();
  288. return true;
  289. }
  290. }
  291. template<typename T, typename D> size_t TC_ThreadQueue<T, D>::size() const
  292. {
  293. std::lock_guard<std::mutex> lock(_mutex);
  294. return _size;
  295. }
  296. template<typename T, typename D> void TC_ThreadQueue<T, D>::clear()
  297. {
  298. std::lock_guard<std::mutex> lock(_mutex);
  299. _queue.clear();
  300. _size = 0;
  301. }
  302. template<typename T, typename D> bool TC_ThreadQueue<T, D>::empty() const
  303. {
  304. std::lock_guard<std::mutex> lock(_mutex);
  305. return _queue.empty();
  306. }
  307. }
  308. #endif