tc_timeout_queue.h 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TC_TIMEOUT_QUEUE_H
  17. #define __TC_TIMEOUT_QUEUE_H
  18. #include <deque>
  19. #include <iostream>
  20. #include <list>
  21. #include <cassert>
  22. #include <unordered_map>
  23. #include <chrono>
  24. #include "util/tc_autoptr.h"
  25. #include <mutex>
  26. #include <functional>
  27. using namespace std;
  28. namespace tars
  29. {
  30. /////////////////////////////////////////////////
  31. /**
  32. * @file tc_timeout_queue.h
  33. * @brief 超时队列(模板元素只能是智能指针, 且继承TC_HandleBase).
  34. *
  35. * @author coonzhang@tencent.com
  36. */
  37. /////////////////////////////////////////////////
  38. template<class T>
  39. class TC_TimeoutQueue: public TC_HandleBase
  40. {
  41. public:
  42. struct PtrInfo;
  43. struct NodeInfo;
  44. // typedef map<uint32_t, PtrInfo> data_type;
  45. typedef unordered_map<uint32_t, PtrInfo> data_type;
  46. typedef list<NodeInfo> time_type;
  47. // typedef TC_Functor<void, TL::TYPELIST_1(T&)> data_functor;
  48. typedef std::function<void(T&)> data_functor;
  49. struct PtrInfo
  50. {
  51. T ptr;
  52. typename time_type::iterator timeIter;
  53. };
  54. struct NodeInfo
  55. {
  56. bool hasPoped;
  57. int64_t createTime;
  58. typename data_type::iterator dataIter;
  59. };
  60. /**
  61. * @brief 超时队列,缺省5s超时.
  62. *
  63. * @param timeout 超时设定时间
  64. * @param size
  65. */
  66. TC_TimeoutQueue(int timeout = 5 * 1000) : _uniqId(0), _timeout(timeout)
  67. {
  68. _firstNoPopIter = _time.end();
  69. // _data.resize(size);
  70. }
  71. /**
  72. * @brief 产生该队列的下一个ID
  73. */
  74. uint32_t generateId();
  75. /**
  76. * @brief 获取指定id的数据.
  77. *
  78. * @param id 指定的数据的id
  79. * @return T 指定id的数据
  80. */
  81. T get(uint32_t uniqId, bool bErase = true);
  82. /**
  83. * @brief, 获取数据并更新时间链, 从而能够不超时
  84. * @param uniqId
  85. * @return T 指定id的数据
  86. */
  87. T getAndRefresh(uint32_t uniqId);
  88. /**
  89. * @brief 删除.
  90. *
  91. * @param uniqId 要删除的数据的id
  92. * @return T 被删除的数据
  93. */
  94. T erase(uint32_t uniqId);
  95. /**
  96. * @brief 设置消息到队列尾端.
  97. *
  98. * @param ptr 要插入到队列尾端的消息
  99. * @return uint32_t id号
  100. */
  101. bool push(const T& ptr, uint32_t uniqId);
  102. /**
  103. * @brief 超时删除数据
  104. */
  105. void timeout();
  106. /**
  107. * @brief 删除超时的数据,并用df对数据做处理
  108. */
  109. void timeout(data_functor &df);
  110. /**
  111. * @brief 取出队列头部的消息.
  112. *
  113. * @return T 队列头部的消息
  114. */
  115. T pop();
  116. /**
  117. * @brief 取出队列头部的消息(减少一次copy).
  118. *
  119. * @param t
  120. */
  121. bool pop(T &t);
  122. /**
  123. * @brief 交换数据.
  124. *
  125. * @param q
  126. * @return bool
  127. */
  128. bool swap(deque<T> &q);
  129. /**
  130. * @brief 设置超时时间(毫秒).
  131. *
  132. * @param timeout
  133. */
  134. void setTimeout(time_t timeout) { _timeout = timeout; }
  135. /**
  136. * @brief 获取超时时间
  137. * @return [description]
  138. */
  139. time_t getTimeout() const { return _timeout; }
  140. /**
  141. * @brief 队列中的数据.
  142. *
  143. * @return size_t
  144. */
  145. size_t size() const { std::lock_guard<std::mutex> lock(_mutex); return _data.size(); }
  146. protected:
  147. uint32_t _uniqId;
  148. time_t _timeout;
  149. data_type _data;
  150. time_type _time;
  151. typename time_type::iterator _firstNoPopIter;
  152. mutable std::mutex _mutex;
  153. };
  154. template<typename T> T TC_TimeoutQueue<T>::get(uint32_t uniqId, bool bErase)
  155. {
  156. std::lock_guard<std::mutex> lock(_mutex);
  157. typename data_type::iterator it = _data.find(uniqId);
  158. if (it == _data.end())
  159. {
  160. return NULL;
  161. }
  162. T ptr = it->second.ptr;
  163. if (bErase)
  164. {
  165. if (_firstNoPopIter == it->second.timeIter)
  166. {
  167. ++_firstNoPopIter;
  168. }
  169. _time.erase(it->second.timeIter);
  170. _data.erase(it);
  171. }
  172. return ptr;
  173. }
  174. template<typename T> T TC_TimeoutQueue<T>::getAndRefresh(uint32_t uniqId)
  175. {
  176. std::lock_guard<std::mutex> lock(_mutex);
  177. typename data_type::iterator it = _data.find(uniqId);
  178. if (it == _data.end())
  179. {
  180. return NULL;
  181. }
  182. T ptr = it->second.ptr;
  183. //从时间队列中删除
  184. if (_firstNoPopIter == it->second.timeIter)
  185. {
  186. ++_firstNoPopIter;
  187. }
  188. _time.erase(it->second.timeIter);
  189. //再插入到时间队列末尾
  190. NodeInfo ni;
  191. ni.createTime = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
  192. ni.dataIter = it;
  193. ni.hasPoped = false;
  194. _time.push_back(ni);
  195. typename time_type::iterator tmp = _time.end();
  196. --tmp;
  197. it->second.timeIter = tmp;
  198. if (_firstNoPopIter == _time.end())
  199. {
  200. _firstNoPopIter = tmp;
  201. }
  202. return ptr;
  203. }
  204. template<typename T> uint32_t TC_TimeoutQueue<T>::generateId()
  205. {
  206. std::lock_guard<std::mutex> lock(_mutex);
  207. while (++_uniqId == 0);
  208. return _uniqId;
  209. }
  210. template<typename T> bool TC_TimeoutQueue<T>::push(const T& ptr, uint32_t uniqId)
  211. {
  212. std::lock_guard<std::mutex> lock(_mutex);
  213. PtrInfo pi;
  214. pi.ptr = ptr;
  215. pair<typename data_type::iterator, bool> result;
  216. result = _data.insert(make_pair(uniqId, pi));
  217. if (result.second == false) return false;
  218. typename data_type::iterator it = result.first;
  219. NodeInfo ni;
  220. ni.createTime = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
  221. ni.dataIter = it;
  222. ni.hasPoped = false;
  223. _time.push_back(ni);
  224. typename time_type::iterator tmp = _time.end();
  225. --tmp;
  226. it->second.timeIter = tmp;
  227. if (_firstNoPopIter == _time.end())
  228. {
  229. _firstNoPopIter = tmp;
  230. }
  231. return true;
  232. }
  233. template<typename T> void TC_TimeoutQueue<T>::timeout()
  234. {
  235. // struct timeval tv;
  236. // TC_TimeProvider::getInstance()->getNow(&tv);
  237. auto ms = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
  238. while (true)
  239. {
  240. std::lock_guard<std::mutex> lock(_mutex);
  241. typename time_type::iterator it = _time.begin();
  242. if (it != _time.end() && ms - it->createTime > _timeout)
  243. {
  244. _data.erase(it->dataIter);
  245. if (_firstNoPopIter == it)
  246. {
  247. ++_firstNoPopIter;
  248. }
  249. _time.erase(it);
  250. }
  251. else
  252. {
  253. break;
  254. }
  255. }
  256. }
  257. template<typename T> void TC_TimeoutQueue<T>::timeout(data_functor &df)
  258. {
  259. // struct timeval tv;
  260. // TC_TimeProvider::getInstance()->getNow(&tv);
  261. //
  262. auto ms = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
  263. while (true)
  264. {
  265. T ptr;
  266. {
  267. std::lock_guard<std::mutex> lock(_mutex);
  268. typename time_type::iterator it = _time.begin();
  269. if (it != _time.end() && ms - it->createTime > _timeout)
  270. {
  271. ptr = (*it->dataIter).second.ptr;
  272. _data.erase(it->dataIter);
  273. if (_firstNoPopIter == it)
  274. {
  275. _firstNoPopIter++;
  276. }
  277. _time.erase(it);
  278. }
  279. else
  280. {
  281. break;
  282. }
  283. }
  284. try { df(ptr); } catch (...) { }
  285. }
  286. }
  287. template<typename T> T TC_TimeoutQueue<T>::erase(uint32_t uniqId)
  288. {
  289. std::lock_guard<std::mutex> lock(_mutex);
  290. typename data_type::iterator it = _data.find(uniqId);
  291. if (it == _data.end())
  292. {
  293. return NULL;
  294. }
  295. T ptr = it->second.ptr;
  296. if (_firstNoPopIter == it->second.timeIter)
  297. {
  298. _firstNoPopIter++;
  299. }
  300. _time.erase(it->second.timeIter);
  301. _data.erase(it);
  302. return ptr;
  303. }
  304. template<typename T> T TC_TimeoutQueue<T>::pop()
  305. {
  306. T ptr;
  307. return pop(ptr) ? ptr : NULL;
  308. }
  309. template<typename T> bool TC_TimeoutQueue<T>::pop(T &ptr)
  310. {
  311. std::lock_guard<std::mutex> lock(_mutex);
  312. if (_time.empty())
  313. {
  314. return false;
  315. }
  316. typename time_type::iterator it = _time.begin();
  317. if (it->hasPoped == true)
  318. {
  319. it = _firstNoPopIter;
  320. }
  321. if (it == _time.end())
  322. {
  323. return false;
  324. }
  325. assert(it->hasPoped == false);
  326. ptr = it->dataIter->second.ptr;
  327. it->hasPoped = true;
  328. _firstNoPopIter = it;
  329. ++_firstNoPopIter;
  330. return true;
  331. }
  332. template<typename T> bool TC_TimeoutQueue<T>::swap(deque<T> &q)
  333. {
  334. std::lock_guard<std::mutex> lock(_mutex);
  335. if (_time.empty())
  336. {
  337. return false;
  338. }
  339. typename time_type::iterator it = _time.begin();
  340. while (it != _time.end())
  341. {
  342. if (it->hasPoped == true)
  343. {
  344. it = _firstNoPopIter;
  345. }
  346. if (it == _time.end())
  347. {
  348. break;
  349. }
  350. assert(it->hasPoped == false);
  351. T ptr = it->dataIter->second.ptr;
  352. it->hasPoped = true;
  353. _firstNoPopIter = it;
  354. ++_firstNoPopIter;
  355. q.push_back(ptr);
  356. ++it;
  357. }
  358. if (q.empty())
  359. {
  360. return false;
  361. }
  362. return true;
  363. }
  364. /////////////////////////////////////////////////////////////////
  365. }
  366. #endif