ChunkedBuffer.hpp 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #ifndef oatpp_data_stream_ChunkedBuffer_hpp
  25. #define oatpp_data_stream_ChunkedBuffer_hpp
  26. #include "Stream.hpp"
  27. #include "oatpp/core/collection/LinkedList.hpp"
  28. #include "oatpp/core/async/Coroutine.hpp"
  29. namespace oatpp { namespace data{ namespace stream {
  30. /**
  31. * Buffer wich can grow by chunks and implements &id:oatpp::data::stream::ConsistentOutputStream; interface.
  32. */
  33. class ChunkedBuffer : public oatpp::base::Countable, public ConsistentOutputStream, public std::enable_shared_from_this<ChunkedBuffer> {
  34. public:
  35. static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
  36. public:
  37. static const char* ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA;
  38. public:
  39. OBJECT_POOL(ChunkedBuffer_Pool, ChunkedBuffer, 32)
  40. SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Pool, ChunkedBuffer, 32)
  41. public:
  42. static const char* const CHUNK_POOL_NAME;
  43. static const v_buff_size CHUNK_ENTRY_SIZE_INDEX_SHIFT;
  44. static const v_buff_size CHUNK_ENTRY_SIZE;
  45. static const v_buff_size CHUNK_CHUNK_SIZE;
  46. static oatpp::base::memory::ThreadDistributedMemoryPool& getSegemntPool(){
  47. static std::once_flag flag;
  48. static oatpp::base::memory::ThreadDistributedMemoryPool *pool = nullptr;
  49. std::call_once(flag, []() {
  50. pool = new oatpp::base::memory::ThreadDistributedMemoryPool(CHUNK_POOL_NAME, CHUNK_ENTRY_SIZE, CHUNK_CHUNK_SIZE);
  51. });
  52. return *pool;
  53. }
  54. private:
  55. class ChunkEntry {
  56. public:
  57. OBJECT_POOL(ChunkedBuffer_ChunkEntry_Pool, ChunkEntry, 32)
  58. public:
  59. ChunkEntry(void* pChunk, ChunkEntry* pNext)
  60. : chunk(pChunk)
  61. , next(pNext)
  62. {}
  63. ~ChunkEntry(){
  64. }
  65. void* chunk;
  66. ChunkEntry* next;
  67. };
  68. public:
  69. class Chunk : public oatpp::base::Countable {
  70. public:
  71. OBJECT_POOL(ChunkedBuffer_Chunk_Pool, Chunk, 32)
  72. SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Chunk_Pool, Chunk, 32)
  73. public:
  74. Chunk(void* pData, v_buff_size pSize)
  75. : data(pData)
  76. , size(pSize)
  77. {}
  78. static std::shared_ptr<Chunk> createShared(void* data, v_buff_size size){
  79. return Shared_ChunkedBuffer_Chunk_Pool::allocateShared(data, size);
  80. }
  81. const void* data;
  82. const v_buff_size size;
  83. };
  84. public:
  85. typedef oatpp::collection::LinkedList<std::shared_ptr<Chunk>> Chunks;
  86. private:
  87. v_buff_size m_size;
  88. v_buff_size m_chunkPos;
  89. ChunkEntry* m_firstEntry;
  90. ChunkEntry* m_lastEntry;
  91. IOMode m_ioMode;
  92. private:
  93. ChunkEntry* obtainNewEntry();
  94. void freeEntry(ChunkEntry* entry);
  95. v_io_size writeToEntry(ChunkEntry* entry,
  96. const void *data,
  97. v_buff_size count,
  98. v_buff_size& outChunkPos);
  99. v_io_size writeToEntryFrom(ChunkEntry* entry,
  100. v_buff_size inChunkPos,
  101. const void *data,
  102. v_buff_size count,
  103. v_buff_size& outChunkPos);
  104. ChunkEntry* getChunkForPosition(ChunkEntry* fromChunk,
  105. v_buff_size pos,
  106. v_buff_size& outChunkPos);
  107. public:
  108. /**
  109. * Constructor.
  110. */
  111. ChunkedBuffer();
  112. /**
  113. * Virtual Destructor.
  114. */
  115. ~ChunkedBuffer();
  116. public:
  117. /**
  118. * Deleted copy constructor.
  119. */
  120. ChunkedBuffer(const ChunkedBuffer&) = delete;
  121. ChunkedBuffer& operator=(const ChunkedBuffer&) = delete;
  122. public:
  123. /**
  124. * Create shared ChunkedBuffer.
  125. * @return `std::shared_ptr` to ChunkedBuffer.
  126. */
  127. static std::shared_ptr<ChunkedBuffer> createShared(){
  128. return Shared_ChunkedBuffer_Pool::allocateShared();
  129. }
  130. /**
  131. * Write data to ChunkedBuffer. Implementation of &id:oatpp::data::stream::OutputStream::write; method.
  132. * @param data - data to write.
  133. * @param count - size of data in bytes.
  134. * @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then
  135. * caller MUST return this action on coroutine iteration.
  136. * @return - actual number of bytes written.
  137. */
  138. v_io_size write(const void *data, v_buff_size count, async::Action& action) override;
  139. /**
  140. * Set stream I/O mode.
  141. * @param ioMode
  142. */
  143. void setOutputStreamIOMode(IOMode ioMode) override;
  144. /**
  145. * Set stream I/O mode.
  146. * @return
  147. */
  148. IOMode getOutputStreamIOMode() override;
  149. /**
  150. * Get stream context.
  151. * @return - &id:oatpp::data::stream::Context;.
  152. */
  153. Context& getOutputStreamContext() override;
  154. /**
  155. * Read part of ChunkedBuffer to buffer.
  156. * @param buffer - buffer to write data to.
  157. * @param pos - starting position in ChunkedBuffer to read data from.
  158. * @param count - number of bytes to read.
  159. * @return - actual number of bytes read from ChunkedBuffer and written to buffer.
  160. */
  161. v_io_size readSubstring(void *buffer, v_buff_size pos, v_buff_size count);
  162. /**
  163. * Create &id:oatpp::String; from part of ChunkedBuffer.
  164. * @param pos - starting position in ChunkedBuffer.
  165. * @param count - size of bytes to write to substring.
  166. * @return - &id:oatpp::String;
  167. */
  168. oatpp::String getSubstring(v_buff_size pos, v_buff_size count);
  169. /**
  170. * Create &id:oatpp::String; from all data in ChunkedBuffer.
  171. * @return - &id:oatpp::String;
  172. */
  173. oatpp::String toString() {
  174. return getSubstring(0, m_size);
  175. }
  176. /**
  177. * Write all data from ChunkedBuffer to &id:oatpp::data::stream::OutputStream;.
  178. * ChunkedBuffer will not be cleared during this call!
  179. * @param stream - &id:oatpp::data::stream::OutputStream; stream to write all data to.
  180. * @return - `true` if no errors occured. **will be refactored to return actual amount of bytes flushed**.
  181. */
  182. bool flushToStream(OutputStream* stream);
  183. /**
  184. * Write all data from ChunkedBuffer to &id:oatpp::data::stream::OutputStream; in asynchronous manner.
  185. * @param stream - &id:oatpp::data::stream::OutputStream; stream to write all data to.
  186. * @return - &id:oatpp::async::CoroutineStarter;.
  187. */
  188. oatpp::async::CoroutineStarter flushToStreamAsync(const std::shared_ptr<OutputStream>& stream);
  189. std::shared_ptr<Chunks> getChunks();
  190. /**
  191. * Get number of bytes written to ChunkedBuffer.
  192. * @return - number of bytes written to ChunkedBuffer.
  193. */
  194. v_buff_size getSize();
  195. /**
  196. * Clear data in ChunkedBuffer.
  197. */
  198. void clear();
  199. };
  200. }}}
  201. #endif /* oatpp_data_stream_ChunkedBuffer_hpp */