tc_timeout_queue_new.h 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TC_TIMEOUT_QUEUE_NEW_H
  17. #define __TC_TIMEOUT_QUEUE_NEW_H
  #include <atomic>
  #include <cassert>
  #include <cstddef>
  #include <cstdint>
  #include <functional>
  #include <iostream>
  #include <list>
  #include <map>
  // #include <ext/hash_map>
  #include <unordered_map>
  #include <utility>
  #include "util/tc_autoptr.h"
  #include "util/tc_monitor.h"
  // #include "util/tc_functor.h"
  #include "util/tc_timeprovider.h"
  29. using namespace std;
  30. namespace tars
  31. {
  32. /////////////////////////////////////////////////
  33. /**
  34. * @file tc_timeout_queue_new.h
  35. * @brief 超时队列, 没有锁, 非线程安全.
  36. *
  37. */
  38. /////////////////////////////////////////////////
  /**
   * @brief Timeout queue indexed three ways.
   *
   * Every entry pushed into the queue is stored once in _data (keyed by its
   * unique id) and referenced from:
   *   - _time, a multimap ordered by the `timeout` value given to push(), which
   *     the timeout() overloads walk from the smallest key upwards;
   *   - _send, a list holding only the entries pushed with hasSend == false,
   *     consumed through getSend()/popSend().
   *
   * The containers are accessed without any locking (the file header describes
   * the class as not thread-safe); only the id counter is a std::atomic.
   *
   * @tparam T payload type; it is default-constructed and copy-assigned by the
   *           member functions of this class.
   */
  template<class T>
  class TC_TimeoutQueueNew
  {
  public:
      struct PtrInfo;
      struct NodeInfo;
      struct SendInfo;

      // Names are std::-qualified so the class does not depend on a
      // using-directive being in effect where the header is included.
      typedef std::unordered_map<uint32_t, PtrInfo> data_type;
      typedef std::multimap<int64_t, NodeInfo>      time_type;
      typedef std::list<SendInfo>                   send_type;
      typedef std::function<void(T&)>               data_functor;

      /** Record stored in _data for one queued entry. */
      struct PtrInfo
      {
          T ptr;                                  // the payload itself
          bool hasSend;                           // false while the entry is still linked in _send
          typename time_type::iterator timeIter;  // position of this entry in _time
          typename send_type::iterator sendIter;  // position in _send; only set when hasSend is false
      };

      /** Value type of _time: points back at the owning _data record. */
      struct NodeInfo
      {
          typename data_type::iterator dataIter;
      };

      /** Element type of _send: points back at the owning _data record. */
      struct SendInfo
      {
          typename data_type::iterator dataIter;
      };

      /**
       * @brief Creates an empty queue.
       *
       * @param timeout accepted for interface compatibility only; the value is
       *                not stored or used (the expiry key is supplied per entry
       *                to push()).
       * @param size    number of entries to reserve space for in the id map.
       */
      TC_TimeoutQueueNew(int timeout = 5*1000, size_t size = 100) : _uniqId(0)
      {
          (void)timeout;  // intentionally unused, see above
          _data.reserve(size);
      }

      /**
       * @brief Produces the next id of this queue.
       */
      uint32_t generateId();

      /**
       * @brief Tells whether the pending-send list is empty.
       */
      bool sendListEmpty() const
      {
          return _send.empty();
      }

      /**
       * @brief Fetches the next entry that still has to be sent.
       */
      bool getSend(T & t);

      /**
       * @brief Removes the entry that has just been sent from the pending-send list.
       */
      void popSend(bool del = false);

      /**
       * @brief Returns the number of entries in the pending-send list.
       */
      size_t getSendListSize() const
      {
          return _send.size();
      }

      /**
       * @brief Looks up the entry with the given id.
       *
       * @param uniqId id of the entry
       * @param t      receives the payload of the entry
       * @param bErase whether the entry is removed from the queue as well
       * @return bool  whether an entry with that id was found
       */
      bool get(uint32_t uniqId, T & t,bool bErase = true);

      /**
       * @brief Removes the entry with the given id.
       *
       * @param uniqId id of the entry to remove
       * @param t      receives the payload of the removed entry
       * @return bool  whether an entry with that id was found
       */
      bool erase(uint32_t uniqId, T & t);

      /**
       * @brief Adds an entry to the queue.
       *
       * @param ptr     payload to store
       * @param uniqId  id of the entry
       * @param timeout key of the entry in the expiry index
       * @param hasSend false to additionally queue the entry in the pending-send list
       * @return true on success, false on failure
       */
      bool push(T& ptr, uint32_t uniqId,int64_t timeout,bool hasSend = true);

      /**
       * @brief Removes the entries that have timed out.
       */
      void timeout();

      /**
       * @brief Removes one timed-out entry and hands its payload back.
       */
      bool timeout(T & t);

      /**
       * @brief Removes the timed-out entries and passes each payload to df.
       */
      void timeout(data_functor &df);

      /**
       * @brief Number of entries in the queue.
       *
       * @return size_t
       */
      size_t size() const { return _data.size(); }

  protected:
      std::atomic<uint32_t> _uniqId;  // last id handed out by generateId()
      data_type             _data;    // id -> entry
      time_type             _time;    // expiry key -> entry
      send_type             _send;    // entries still waiting to be sent
  };
  150. template<typename T> bool TC_TimeoutQueueNew<T>::getSend(T & t)
  151. {
  152. //链表为空返回失败
  153. if(_send.empty())
  154. {
  155. return false;
  156. }
  157. SendInfo & stSendInfo = _send.back();
  158. assert(!stSendInfo.dataIter->second.hasSend);
  159. t = stSendInfo.dataIter->second.ptr;
  160. return true;
  161. }
  162. template<typename T> void TC_TimeoutQueueNew<T>::popSend(bool del)
  163. {
  164. assert(!_send.empty());
  165. SendInfo stSendInfo;
  166. stSendInfo = _send.back();
  167. stSendInfo.dataIter->second.hasSend = true;
  168. if(del)
  169. {
  170. _time.erase(stSendInfo.dataIter->second.timeIter);
  171. _data.erase(stSendInfo.dataIter);
  172. }
  173. _send.pop_back();
  174. }
  175. template<typename T> bool TC_TimeoutQueueNew<T>::get(uint32_t uniqId, T & t, bool bErase)
  176. {
  177. typename data_type::iterator it = _data.find(uniqId);
  178. if(it == _data.end())
  179. {
  180. return false;
  181. }
  182. t = it->second.ptr;
  183. if(bErase)
  184. {
  185. _time.erase(it->second.timeIter);
  186. if(!it->second.hasSend)
  187. {
  188. _send.erase(it->second.sendIter);
  189. }
  190. _data.erase(it);
  191. }
  192. return true;
  193. }
  194. template<typename T> uint32_t TC_TimeoutQueueNew<T>::generateId()
  195. {
  196. uint32_t i = ++_uniqId;
  197. if(i == 0) {
  198. i = ++_uniqId;
  199. }
  200. return i;
  201. }
  202. template<typename T> bool TC_TimeoutQueueNew<T>::push(T& ptr, uint32_t uniqId,int64_t timeout,bool hasSend)
  203. {
  204. PtrInfo pi;
  205. pi.ptr = ptr;
  206. pi.hasSend = hasSend;
  207. pair<typename data_type::iterator, bool> result;
  208. result = _data.insert(make_pair(uniqId, pi));
  209. if (result.second == false) return false;
  210. NodeInfo stNodeInfo;
  211. stNodeInfo.dataIter = result.first;
  212. result.first->second.timeIter = _time.insert(make_pair(timeout,stNodeInfo));
  213. //没有发送放到list队列里面
  214. if(!hasSend)
  215. {
  216. SendInfo stSendInfo;
  217. stSendInfo.dataIter = result.first;
  218. _send.push_front(stSendInfo);
  219. result.first->second.sendIter = _send.begin();
  220. }
  221. return true;
  222. }
  223. template<typename T> void TC_TimeoutQueueNew<T>::timeout()
  224. {
  225. int64_t iNow = TNOWMS;
  226. while(true)
  227. {
  228. typename time_type::iterator it = _time.begin();
  229. if(_time.end() == it || it->first>iNow)
  230. break;
  231. if(!it->second.dataIter->second.hasSend)
  232. {
  233. _send.erase(it->second.dataIter->second.sendIter);
  234. }
  235. _data.erase(it->second.dataIter);
  236. _time.erase(it);
  237. }
  238. }
  239. template<typename T> bool TC_TimeoutQueueNew<T>::timeout(T & t)
  240. {
  241. int64_t iNow = TNOWMS;
  242. if(_time.empty())
  243. return false;
  244. typename time_type::iterator it = _time.begin();
  245. if(it->first>iNow)
  246. return false;
  247. t=it->second.dataIter->second.ptr;
  248. if(!it->second.dataIter->second.hasSend)
  249. {
  250. _send.erase(it->second.dataIter->second.sendIter);
  251. }
  252. _data.erase(it->second.dataIter);
  253. _time.erase(it);
  254. return true;
  255. }
  256. template<typename T> void TC_TimeoutQueueNew<T>::timeout(data_functor &df)
  257. {
  258. while(true)
  259. {
  260. int64_t iNow = TNOWMS;
  261. T ptr;
  262. typename time_type::iterator it = _time.begin();
  263. if(_time.end() == it || it->first>iNow)
  264. break;
  265. ptr=it->second->second.ptr;
  266. if(!it->second.dataIter->second.hasSend)
  267. {
  268. _send.erase(it->second.dataIter->second.sendIter);
  269. }
  270. _data.erase(it->second.dataIter);
  271. _time.erase(it);
  272. try { df(ptr); } catch(...) { }
  273. }
  274. }
  275. template<typename T> bool TC_TimeoutQueueNew<T>::erase(uint32_t uniqId, T & t)
  276. {
  277. typename data_type::iterator it = _data.find(uniqId);
  278. if(it == _data.end())
  279. return false;
  280. t = it->second.ptr;
  281. if(!it->second.hasSend)
  282. {
  283. _send.erase(it->second.sendIter);
  284. }
  285. _time.erase(it->second.timeIter);
  286. _data.erase(it);
  287. return true;
  288. }
  289. /////////////////////////////////////////////////////////////////
  290. }
  291. #endif