ChunkedBuffer.hpp 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #ifndef oatpp_data_stream_ChunkedBuffer_hpp
  25. #define oatpp_data_stream_ChunkedBuffer_hpp
  26. #include "Stream.hpp"
  27. #include "oatpp/core/collection/LinkedList.hpp"
  28. #include "oatpp/core/async/Coroutine.hpp"
  29. namespace oatpp { namespace data{ namespace stream {
  30. /**
  31. * Buffer wich can grow by chunks and implements &id:oatpp::data::stream::ConsistentOutputStream; interface.
  32. */
  33. class ChunkedBuffer : public oatpp::base::Countable, public ConsistentOutputStream, public std::enable_shared_from_this<ChunkedBuffer> {
  34. public:
  35. static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
  36. public:
  37. static const char* ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA;
  38. public:
  39. OBJECT_POOL(ChunkedBuffer_Pool, ChunkedBuffer, 32)
  40. SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Pool, ChunkedBuffer, 32)
  41. public:
  42. static const char* const CHUNK_POOL_NAME;
  43. static const v_buff_size CHUNK_ENTRY_SIZE_INDEX_SHIFT;
  44. static const v_buff_size CHUNK_ENTRY_SIZE;
  45. static const v_buff_size CHUNK_CHUNK_SIZE;
  46. static oatpp::base::memory::ThreadDistributedMemoryPool& getSegemntPool() {
  47. static auto pool = new oatpp::base::memory::ThreadDistributedMemoryPool(CHUNK_POOL_NAME, CHUNK_ENTRY_SIZE, CHUNK_CHUNK_SIZE);
  48. return *pool;
  49. }
  50. private:
  51. class ChunkEntry {
  52. public:
  53. OBJECT_POOL(ChunkedBuffer_ChunkEntry_Pool, ChunkEntry, 32)
  54. public:
  55. ChunkEntry(void* pChunk, ChunkEntry* pNext)
  56. : chunk(pChunk)
  57. , next(pNext)
  58. {}
  59. ~ChunkEntry(){
  60. }
  61. void* chunk;
  62. ChunkEntry* next;
  63. };
  64. public:
  65. class Chunk : public oatpp::base::Countable {
  66. public:
  67. OBJECT_POOL(ChunkedBuffer_Chunk_Pool, Chunk, 32)
  68. SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Chunk_Pool, Chunk, 32)
  69. public:
  70. Chunk(void* pData, v_buff_size pSize)
  71. : data(pData)
  72. , size(pSize)
  73. {}
  74. static std::shared_ptr<Chunk> createShared(void* data, v_buff_size size){
  75. return Shared_ChunkedBuffer_Chunk_Pool::allocateShared(data, size);
  76. }
  77. const void* data;
  78. const v_buff_size size;
  79. };
  80. public:
  81. typedef oatpp::collection::LinkedList<std::shared_ptr<Chunk>> Chunks;
  82. private:
  83. v_buff_size m_size;
  84. v_buff_size m_chunkPos;
  85. ChunkEntry* m_firstEntry;
  86. ChunkEntry* m_lastEntry;
  87. IOMode m_ioMode;
  88. private:
  89. ChunkEntry* obtainNewEntry();
  90. void freeEntry(ChunkEntry* entry);
  91. v_io_size writeToEntry(ChunkEntry* entry,
  92. const void *data,
  93. v_buff_size count,
  94. v_buff_size& outChunkPos);
  95. v_io_size writeToEntryFrom(ChunkEntry* entry,
  96. v_buff_size inChunkPos,
  97. const void *data,
  98. v_buff_size count,
  99. v_buff_size& outChunkPos);
  100. ChunkEntry* getChunkForPosition(ChunkEntry* fromChunk,
  101. v_buff_size pos,
  102. v_buff_size& outChunkPos);
  103. public:
  104. /**
  105. * Constructor.
  106. */
  107. ChunkedBuffer();
  108. /**
  109. * Virtual Destructor.
  110. */
  111. ~ChunkedBuffer();
  112. public:
  113. /**
  114. * Deleted copy constructor.
  115. */
  116. ChunkedBuffer(const ChunkedBuffer&) = delete;
  117. ChunkedBuffer& operator=(const ChunkedBuffer&) = delete;
  118. public:
  119. /**
  120. * Create shared ChunkedBuffer.
  121. * @return `std::shared_ptr` to ChunkedBuffer.
  122. */
  123. static std::shared_ptr<ChunkedBuffer> createShared(){
  124. return Shared_ChunkedBuffer_Pool::allocateShared();
  125. }
  126. /**
  127. * Write data to ChunkedBuffer. Implementation of &id:oatpp::data::stream::OutputStream::write; method.
  128. * @param data - data to write.
  129. * @param count - size of data in bytes.
  130. * @param action - async specific action. If action is NOT &id:oatpp::async::Action::TYPE_NONE;, then
  131. * caller MUST return this action on coroutine iteration.
  132. * @return - actual number of bytes written.
  133. */
  134. v_io_size write(const void *data, v_buff_size count, async::Action& action) override;
  135. /**
  136. * Set stream I/O mode.
  137. * @param ioMode
  138. */
  139. void setOutputStreamIOMode(IOMode ioMode) override;
  140. /**
  141. * Set stream I/O mode.
  142. * @return
  143. */
  144. IOMode getOutputStreamIOMode() override;
  145. /**
  146. * Get stream context.
  147. * @return - &id:oatpp::data::stream::Context;.
  148. */
  149. Context& getOutputStreamContext() override;
  150. /**
  151. * Read part of ChunkedBuffer to buffer.
  152. * @param buffer - buffer to write data to.
  153. * @param pos - starting position in ChunkedBuffer to read data from.
  154. * @param count - number of bytes to read.
  155. * @return - actual number of bytes read from ChunkedBuffer and written to buffer.
  156. */
  157. v_io_size readSubstring(void *buffer, v_buff_size pos, v_buff_size count);
  158. /**
  159. * Create &id:oatpp::String; from part of ChunkedBuffer.
  160. * @param pos - starting position in ChunkedBuffer.
  161. * @param count - size of bytes to write to substring.
  162. * @return - &id:oatpp::String;
  163. */
  164. oatpp::String getSubstring(v_buff_size pos, v_buff_size count);
  165. /**
  166. * Create &id:oatpp::String; from all data in ChunkedBuffer.
  167. * @return - &id:oatpp::String;
  168. */
  169. oatpp::String toString() {
  170. return getSubstring(0, m_size);
  171. }
  172. /**
  173. * Write all data from ChunkedBuffer to &id:oatpp::data::stream::OutputStream;.
  174. * ChunkedBuffer will not be cleared during this call!
  175. * @param stream - &id:oatpp::data::stream::OutputStream; stream to write all data to.
  176. * @return - `true` if no errors occured. **will be refactored to return actual amount of bytes flushed**.
  177. */
  178. bool flushToStream(OutputStream* stream);
  179. /**
  180. * Write all data from ChunkedBuffer to &id:oatpp::data::stream::OutputStream; in asynchronous manner.
  181. * @param stream - &id:oatpp::data::stream::OutputStream; stream to write all data to.
  182. * @return - &id:oatpp::async::CoroutineStarter;.
  183. */
  184. oatpp::async::CoroutineStarter flushToStreamAsync(const std::shared_ptr<OutputStream>& stream);
  185. std::shared_ptr<Chunks> getChunks();
  186. /**
  187. * Get number of bytes written to ChunkedBuffer.
  188. * @return - number of bytes written to ChunkedBuffer.
  189. */
  190. v_buff_size getSize();
  191. /**
  192. * Clear data in ChunkedBuffer.
  193. */
  194. void clear();
  195. };
  196. }}}
  197. #endif /* oatpp_data_stream_ChunkedBuffer_hpp */