tc_squeue.h 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. /**
  2. * Tencent is pleased to support the open source community by making Tars available.
  3. *
  4. * Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
  5. *
  6. * Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
  7. * in compliance with the License. You may obtain a copy of the License at
  8. *
  9. * https://opensource.org/licenses/BSD-3-Clause
  10. *
  11. * Unless required by applicable law or agreed to in writing, software distributed
  12. * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
  13. * CONDITIONS OF ANY KIND, either express or implied. See the License for the
  14. * specific language governing permissions and limitations under the License.
  15. */
  16. #ifndef __TC_SQueue_H__
  17. #define __TC_SQueue_H__
  18. #include <fcntl.h>
  19. // #include <unistd.h>
  20. #include <errno.h>
  21. #include <stdexcept>
  22. #include <iostream>
  23. #include <assert.h>
  24. #include <iostream>
  25. #include <string>
  26. #include "util/tc_ex.h"
  27. /**
  28. * 结构化的queue,在一边读一边写的情况下可以不用加锁,是线程(进程)安全的
  29. * 如果多个同时读写,需要加锁
  30. * 增加一个数量标记,在一边读一边写的情况下不一定准确完全准确
  31. */
  32. namespace tars
  33. {
  34. /**
  35. * @brief 异常
  36. */
  37. struct TC_SQueue_Exception : public TC_Exception
  38. {
  39. TC_SQueue_Exception(const std::string &buffer) : TC_Exception(buffer){};
  40. ~TC_SQueue_Exception() throw(){};
  41. };
  42. class TC_SQueue
  43. {
  44. public:
  45. TC_SQueue() {_header = NULL;_data = NULL;}
  46. ~TC_SQueue() {_header = NULL;_data = NULL;}
  47. void attach(char* pBuf, size_t iBufSize)
  48. {
  49. if(iBufSize <= sizeof(Header)+MarkLen+ReserveLen) {
  50. throw TC_SQueue_Exception("TC_SQueue::attach fail:iBufSize is too small");
  51. }
  52. _header = (Header *)pBuf;
  53. _data = pBuf+sizeof(Header);
  54. if(_header->iBufSize != iBufSize - sizeof(Header))
  55. throw TC_SQueue_Exception("TC_SQueue::attach fail: iBufSize != iBufSize - sizeof(Header);");
  56. if(_header->iReserveLen != ReserveLen)
  57. throw TC_SQueue_Exception("TC_SQueue::attach fail: iReserveLen != ReserveLen");
  58. if(_header->iBegin >= _header->iBufSize)
  59. throw TC_SQueue_Exception("TC_SQueue::attach fail: iBegin > iBufSize - sizeof(Header);");
  60. if(_header->iEnd > iBufSize - sizeof(Header))
  61. throw TC_SQueue_Exception("TC_SQueue::attach fail: iEnd > iBufSize - sizeof(Header);");
  62. }
  63. void create(char* pBuf, size_t iBufSize)
  64. {
  65. if(iBufSize <= sizeof(Header)+MarkLen+ReserveLen) {
  66. throw TC_SQueue_Exception("TC_SQueue::create fail:iBufSize is too small");
  67. }
  68. _header = (Header *)pBuf;
  69. _data = pBuf+sizeof(Header);
  70. _header->iBufSize = iBufSize - sizeof(Header);
  71. _header->iReserveLen = ReserveLen;
  72. _header->iBegin = 0;
  73. _header->iEnd = 0;
  74. _header->iNum = 0;
  75. }
  76. bool pop(std::string& buffer)
  77. {
  78. size_t iEnd=_header->iEnd;
  79. size_t tmp_num;
  80. if(_header->iBegin == iEnd) {
  81. _header->iNum = 0;
  82. return false;
  83. }
  84. else if(_header->iBegin<iEnd) {
  85. assert(_header->iBegin+MarkLen < iEnd);
  86. size_t len = GetLen(_data+_header->iBegin);
  87. assert(_header->iBegin+MarkLen+len <= iEnd);
  88. buffer.assign(_data+_header->iBegin+MarkLen, len);
  89. _header->iBegin += len+MarkLen;
  90. tmp_num = _header->iNum;
  91. if(tmp_num > 0)
  92. _header->iNum = tmp_num-1;
  93. } else {
  94. // 被分段
  95. assert(iEnd+ReserveLen <= _header->iBegin);
  96. size_t len = 0;
  97. size_t new_begin = 0;
  98. char *data_from = NULL;
  99. char *data_to = NULL;
  100. assert(_header->iBegin < _header->iBufSize);
  101. // 长度字段也被分段
  102. if(_header->iBegin+MarkLen > _header->iBufSize) {
  103. char tmp[16];
  104. memcpy(tmp,_data+_header->iBegin,_header->iBufSize-_header->iBegin);
  105. memcpy(tmp+_header->iBufSize-_header->iBegin,_data,_header->iBegin+MarkLen-_header->iBufSize);
  106. len = GetLen(tmp);
  107. data_from = _data+(_header->iBegin+MarkLen-_header->iBufSize); //
  108. new_begin = _header->iBegin+MarkLen-_header->iBufSize+len;
  109. assert(new_begin <= iEnd);
  110. } else {
  111. len = GetLen(_data+_header->iBegin);
  112. data_from = _data+_header->iBegin+MarkLen;
  113. if(data_from == _data+_header->iBufSize) data_from = _data;
  114. if(_header->iBegin+MarkLen+len < _header->iBufSize) {
  115. new_begin = _header->iBegin+MarkLen+len;
  116. } else { // 数据被分段
  117. new_begin = _header->iBegin+MarkLen+len-_header->iBufSize;
  118. assert(new_begin <= iEnd);
  119. }
  120. }
  121. data_to = _data+new_begin;
  122. if(data_to > data_from) {
  123. assert(data_to - data_from == (long)len);
  124. buffer.assign(data_from, len);
  125. } else {
  126. buffer.assign(data_from, _data+_header->iBufSize-data_from);
  127. buffer.append(_data, data_to-_data);
  128. assert(_header->iBufSize-(data_from-data_to)== len);
  129. }
  130. _header->iBegin = new_begin;
  131. tmp_num = _header->iNum;
  132. if(tmp_num > 0)
  133. _header->iNum = tmp_num-1;
  134. }
  135. return true;
  136. }
  137. bool push(const std::string& buffer)
  138. {
  139. return push(buffer.c_str(), buffer.length());
  140. }
  141. // 写端使用
  142. bool push(const char *buffer, size_t len)
  143. {
  144. if(len == 0) return true;
  145. size_t iBegin = _header->iBegin;
  146. if(_header->iEnd == iBegin) {
  147. _header->iNum = 0;
  148. if(MarkLen+len+ReserveLen>_header->iBufSize)
  149. return false;
  150. } else if(_header->iEnd > iBegin) {
  151. assert(iBegin+MarkLen < _header->iEnd);
  152. if(_header->iBufSize - _header->iEnd + iBegin < MarkLen+len+ReserveLen)
  153. return false;
  154. } else {
  155. assert(_header->iEnd+ReserveLen <= iBegin);
  156. if(iBegin - _header->iEnd < MarkLen+len+ReserveLen)
  157. return false;
  158. }
  159. // 长度字段被分段
  160. if(_header->iEnd+MarkLen > _header->iBufSize) {
  161. char tmp[16]; SetLen(tmp,len);
  162. memcpy(_data+_header->iEnd,tmp,_header->iBufSize-_header->iEnd);
  163. memcpy(_data,tmp+_header->iBufSize-_header->iEnd,_header->iEnd+MarkLen-_header->iBufSize);
  164. memcpy(_data+_header->iEnd+MarkLen-_header->iBufSize,buffer,len);
  165. _header->iEnd = len+_header->iEnd+MarkLen-_header->iBufSize;
  166. assert(_header->iEnd+ReserveLen <= iBegin);
  167. _header->iNum++;
  168. }
  169. // 数据被分段
  170. else if(_header->iEnd+MarkLen+len > _header->iBufSize){
  171. SetLen(_data+_header->iEnd,len);
  172. memcpy(_data+_header->iEnd+MarkLen,buffer,_header->iBufSize-_header->iEnd-MarkLen);
  173. memcpy(_data,buffer+_header->iBufSize-_header->iEnd-MarkLen,len-(_header->iBufSize-_header->iEnd-MarkLen));
  174. _header->iEnd = len-(_header->iBufSize-_header->iEnd-MarkLen);
  175. assert(_header->iEnd+ReserveLen <= iBegin);
  176. _header->iNum++;
  177. } else {
  178. SetLen(_data+_header->iEnd,len);
  179. memcpy(_data+_header->iEnd+MarkLen,buffer,len);
  180. _header->iEnd = (_header->iEnd+MarkLen+len)%_header->iBufSize;
  181. _header->iNum++;
  182. }
  183. return true;
  184. }
  185. // 读端使用
  186. //bool isEmpty() const {size_t iEnd=_header->iEnd;return _header->iBegin == iEnd;}
  187. bool empty() const {size_t iEnd=_header->iEnd;return _header->iBegin == iEnd;}
  188. // 写端使用
  189. bool full(size_t len) const
  190. {
  191. size_t iBegin = _header->iBegin;
  192. if(len==0) return false;
  193. if(_header->iEnd == iBegin) {
  194. if(len+MarkLen+ReserveLen > _header->iBufSize) return true;
  195. return false;
  196. } else if(_header->iEnd > iBegin) {
  197. assert(iBegin+MarkLen < _header->iEnd);
  198. return _header->iBufSize - _header->iEnd + iBegin < MarkLen+len+ReserveLen;
  199. }
  200. assert(_header->iEnd+ReserveLen <= iBegin);
  201. return (iBegin - _header->iEnd < MarkLen+len+ReserveLen);
  202. }
  203. // 返回队列里的元素数量,不一定绝对准确,只能作为参考
  204. size_t size() const
  205. {
  206. if (empty()) {
  207. _header->iNum = 0;
  208. }
  209. return _header->iNum;
  210. }
  211. private:
  212. size_t GetLen(char *buf) {size_t u; memcpy((void *)&u,buf,MarkLen); return u;}
  213. void SetLen(char *buf, size_t u) {memcpy(buf,(void *)&u,MarkLen);}
  214. private:
  215. const static size_t ReserveLen = 8;
  216. const static size_t MarkLen = sizeof(size_t);
  217. struct Header
  218. {
  219. size_t iBufSize;
  220. size_t iReserveLen; // must be 8
  221. size_t iBegin;
  222. size_t iEnd;
  223. size_t iNum; // 增加一个数量标记,不一定准确
  224. };
  225. Header *_header;
  226. char *_data;
  227. };
  228. }
  229. #endif