jmem_queue.h 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef _JMEM_QUEUE_H
  17. #define _JMEM_QUEUE_H
  18. #include "util/tc_mem_queue.h"
  19. #include "jmem/jmem_policy.h"
  20. #include "tup/Tars.h"
  21. namespace tars
  22. {
  23. /**
  24. * 基于Tars协议的内存循环队列
  25. * 编解码出错则抛出TarsDecodeException和TarsEncodeException
  26. * 可以对锁策略和存储策略进行组合, 例如:
  27. * 基于信号量锁, 文件存储的队列:
  28. * TarsQueue<Test::QueueElement, SemLockPolicy, FileStorePolicy>
  29. * 基于信号量锁, 共享内存存储的队列
  30. * TarsQueue<Test::QueueElement, SemLockPolicy, ShmStorePolicy>
  31. * 基于线程锁, 内存存储的队列
  32. * TarsQueue<Test::QueueElement, ThreadLockPolicy, MemStorePolicy>
  33. *
  34. * 使用上, 不同的组合, 初始化函数不完全一样
  35. * 初始化函数有:
  36. * SemLockPolicy::initLock(key_t)
  37. * ShmStorePolicy::initStore(key_t, size_t)
  38. * FileStorePolicy::initStore(const char *file, size_t)
  39. * 等, 具体参见jmem_policy.h
  40. */
  41. template<typename T,
  42. typename LockPolicy,
  43. template<class,class> class StorePolicy>
  44. class TarsQueue : public StorePolicy<TC_MemQueue, LockPolicy>
  45. {
  46. public:
  47. /**
  48. * 弹出一个元素
  49. * @param t
  50. *
  51. * @return bool,true:成功, false:无元素
  52. */
  53. bool pop_front(T &t)
  54. {
  55. string s;
  56. {
  57. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  58. if(!this->_t.pop_front(s))
  59. {
  60. return false;
  61. }
  62. }
  63. tars::TarsInputStream<BufferReader> is;
  64. is.setBuffer(s.c_str(), s.length());
  65. t.readFrom(is);
  66. return true;
  67. }
  68. /**
  69. * 插入到队列
  70. * @param t
  71. *
  72. * @return bool, ture:成功, false:队列满
  73. */
  74. bool push_back(const T &t)
  75. {
  76. tars::TarsOutputStream<BufferWriter> os;
  77. t.writeTo(os);
  78. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  79. return this->_t.push_back(os.getBuffer(), os.getLength());
  80. }
  81. /**
  82. * 是否满了
  83. * @param t
  84. *
  85. * @return bool
  86. */
  87. bool isFull(const T &t)
  88. {
  89. tars::TarsOutputStream<BufferWriter> os;
  90. t.writeTo(os);
  91. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  92. return this->_t.isFull(os.getLength());
  93. }
  94. /**
  95. * 获取空闲的空间大小
  96. */
  97. size_t getFreeSize()
  98. {
  99. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  100. return this->_t.getFreeSize();
  101. }
  102. /**
  103. * 队列是否满
  104. * @param : iSize, 输入数据块长度
  105. * @return bool , true:满, false: 非满
  106. */
  107. bool isFull(size_t iSize)
  108. {
  109. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  110. return this->_t.isFull(iSize);
  111. }
  112. /**
  113. * 队列是否空
  114. * @return bool , true: 满, false: 非满
  115. */
  116. bool isEmpty()
  117. {
  118. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  119. return this->_t.isEmpty();
  120. }
  121. /**
  122. * 队列中元素个数, 不加锁的情况下不保证一定正确
  123. * @return size_t, 元素个数
  124. */
  125. size_t elementCount()
  126. {
  127. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  128. return this->_t.elementCount();
  129. }
  130. /**
  131. * 队列长度(字节), 小于总存储区长度(总存储区长度包含了控制快)
  132. * @return size_t : 队列长度
  133. */
  134. size_t queueSize()
  135. {
  136. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  137. return this->_t.queueSize();
  138. }
  139. /**
  140. * 共享内存长度
  141. * @return size_t : 共享内存长度
  142. */
  143. size_t memSize() const
  144. {
  145. TC_LockT<typename LockPolicy::Mutex> lock(LockPolicy::mutex());
  146. return this->_t.memSize();
  147. };
  148. };
  149. }
  150. #endif