ChunkedBuffer.cpp 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #include "ChunkedBuffer.hpp"
  25. namespace oatpp { namespace data{ namespace stream {
  26. data::stream::DefaultInitializedContext ChunkedBuffer::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_INFINITE);
  27. const char* ChunkedBuffer::ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA";
  28. const char* const ChunkedBuffer::CHUNK_POOL_NAME = "ChunkedBuffer_Chunk_Pool";
  29. const v_buff_size ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT = 11;
  30. const v_buff_size ChunkedBuffer::CHUNK_ENTRY_SIZE =
  31. (1 << ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT);
  32. const v_buff_size ChunkedBuffer::CHUNK_CHUNK_SIZE = 32;
  33. ChunkedBuffer::ChunkedBuffer()
  34. : m_size(0)
  35. , m_chunkPos(0)
  36. , m_firstEntry(nullptr)
  37. , m_lastEntry(nullptr)
  38. , m_ioMode(IOMode::ASYNCHRONOUS)
  39. {}
  40. ChunkedBuffer::~ChunkedBuffer() {
  41. clear();
  42. }
  43. ChunkedBuffer::ChunkEntry* ChunkedBuffer::obtainNewEntry(){
  44. auto result = new ChunkEntry(getSegemntPool().obtain(), nullptr);
  45. if(m_firstEntry == nullptr) {
  46. m_firstEntry = result;
  47. } else {
  48. m_lastEntry->next = result;
  49. }
  50. m_lastEntry = result;
  51. return result;
  52. }
  53. void ChunkedBuffer::freeEntry(ChunkEntry* entry){
  54. oatpp::base::memory::MemoryPool::free(entry->chunk);
  55. delete entry;
  56. }
  57. v_io_size ChunkedBuffer::writeToEntry(ChunkEntry* entry,
  58. const void *data,
  59. v_buff_size count,
  60. v_buff_size& outChunkPos)
  61. {
  62. if(count >= CHUNK_ENTRY_SIZE){
  63. std::memcpy(entry->chunk, data, CHUNK_ENTRY_SIZE);
  64. outChunkPos = 0;
  65. return CHUNK_ENTRY_SIZE;
  66. } else {
  67. std::memcpy(entry->chunk, data, (size_t)count);
  68. outChunkPos = count;
  69. return count;
  70. }
  71. }
  72. v_io_size ChunkedBuffer::writeToEntryFrom(ChunkEntry* entry,
  73. v_buff_size inChunkPos,
  74. const void *data,
  75. v_buff_size count,
  76. v_buff_size& outChunkPos)
  77. {
  78. v_io_size spaceLeft = CHUNK_ENTRY_SIZE - inChunkPos;
  79. if(count >= spaceLeft){
  80. std::memcpy(&((p_char8) entry->chunk)[inChunkPos], data, (size_t)spaceLeft);
  81. outChunkPos = 0;
  82. return spaceLeft;
  83. } else {
  84. std::memcpy(&((p_char8) entry->chunk)[inChunkPos], data, (size_t)count);
  85. outChunkPos = inChunkPos + count;
  86. return count;
  87. }
  88. }
  89. ChunkedBuffer::ChunkEntry* ChunkedBuffer::getChunkForPosition(ChunkEntry* fromChunk,
  90. v_buff_size pos,
  91. v_buff_size& outChunkPos)
  92. {
  93. v_buff_size segIndex = pos >> CHUNK_ENTRY_SIZE_INDEX_SHIFT;
  94. outChunkPos = pos - (segIndex << CHUNK_ENTRY_SIZE_INDEX_SHIFT);
  95. auto curr = fromChunk;
  96. for(v_buff_size i = 0; i < segIndex; i++){
  97. curr = curr->next;
  98. }
  99. return curr;
  100. }
  101. v_io_size ChunkedBuffer::write(const void *data, v_buff_size count, async::Action& action){
  102. (void) action;
  103. if(count <= 0){
  104. return 0;
  105. }
  106. if(m_lastEntry == nullptr){
  107. obtainNewEntry();
  108. }
  109. ChunkEntry* entry = m_lastEntry;
  110. v_buff_size pos = 0;
  111. pos += writeToEntryFrom(entry, m_chunkPos, data, count, m_chunkPos);
  112. if(m_chunkPos == 0){
  113. entry = obtainNewEntry();
  114. }
  115. while (pos < count) {
  116. pos += writeToEntry(entry, &((p_char8) data)[pos], count - pos, m_chunkPos);
  117. if(m_chunkPos == 0){
  118. entry = obtainNewEntry();
  119. }
  120. }
  121. m_size += pos; // pos == count
  122. return count;
  123. }
  124. void ChunkedBuffer::setOutputStreamIOMode(IOMode ioMode) {
  125. m_ioMode = ioMode;
  126. }
  127. IOMode ChunkedBuffer::getOutputStreamIOMode() {
  128. return m_ioMode;
  129. }
  130. Context& ChunkedBuffer::getOutputStreamContext() {
  131. return DEFAULT_CONTEXT;
  132. }
  133. v_io_size ChunkedBuffer::readSubstring(void *buffer,
  134. v_buff_size pos,
  135. v_buff_size count)
  136. {
  137. if(pos < 0 || pos >= m_size){
  138. return 0;
  139. }
  140. v_buff_size countToRead;
  141. if(pos + count > m_size){
  142. countToRead = m_size - pos;
  143. } else {
  144. countToRead = count;
  145. }
  146. v_buff_size firstChunkPos;
  147. auto firstChunk = getChunkForPosition(m_firstEntry, pos, firstChunkPos);
  148. v_buff_size lastChunkPos;
  149. auto lastChunk = getChunkForPosition(firstChunk, firstChunkPos + countToRead, lastChunkPos);
  150. v_io_size bufferPos = 0;
  151. if(firstChunk != lastChunk){
  152. v_buff_size countToCopy = CHUNK_ENTRY_SIZE - firstChunkPos;
  153. std::memcpy(buffer, &((p_char8)firstChunk->chunk)[firstChunkPos], (size_t)countToCopy);
  154. bufferPos += countToCopy;
  155. auto curr = firstChunk->next;
  156. while (curr != lastChunk) {
  157. std::memcpy(&((p_char8)buffer)[bufferPos], curr->chunk, CHUNK_ENTRY_SIZE);
  158. bufferPos += CHUNK_ENTRY_SIZE;
  159. curr = curr->next;
  160. }
  161. std::memcpy(&((p_char8)buffer)[bufferPos], lastChunk->chunk, (size_t)lastChunkPos);
  162. } else {
  163. v_buff_size countToCopy = lastChunkPos - firstChunkPos;
  164. std::memcpy(buffer, &((p_char8)firstChunk->chunk)[firstChunkPos], (size_t)countToCopy);
  165. }
  166. return countToRead;
  167. }
  168. oatpp::String ChunkedBuffer::getSubstring(v_buff_size pos, v_buff_size count){
  169. auto str = oatpp::String((v_int32) count);
  170. readSubstring((p_char8)str->data(), pos, count);
  171. return str;
  172. }
  173. bool ChunkedBuffer::flushToStream(OutputStream* stream){
  174. v_io_size pos = m_size;
  175. auto curr = m_firstEntry;
  176. while (pos > 0) {
  177. if(pos > CHUNK_ENTRY_SIZE) {
  178. auto res = stream->writeExactSizeDataSimple(curr->chunk, CHUNK_ENTRY_SIZE);
  179. if(res != CHUNK_ENTRY_SIZE) {
  180. return false;
  181. }
  182. pos -= res;
  183. } else {
  184. auto res = stream->writeExactSizeDataSimple(curr->chunk, pos);
  185. if(res != pos) {
  186. return false;
  187. }
  188. pos -= res;
  189. }
  190. curr = curr->next;
  191. }
  192. return true;
  193. }
  194. oatpp::async::CoroutineStarter ChunkedBuffer::flushToStreamAsync(const std::shared_ptr<OutputStream>& stream) {
  195. class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
  196. private:
  197. std::shared_ptr<ChunkedBuffer> m_chunkedBuffer;
  198. std::shared_ptr<OutputStream> m_stream;
  199. ChunkEntry* m_currEntry;
  200. v_io_size m_bytesLeft;
  201. Action m_nextAction;
  202. data::buffer::InlineWriteData m_currData;
  203. bool m_needInit;
  204. public:
  205. FlushCoroutine(const std::shared_ptr<ChunkedBuffer>& chunkedBuffer,
  206. const std::shared_ptr<OutputStream>& stream)
  207. : m_chunkedBuffer(chunkedBuffer)
  208. , m_stream(stream)
  209. , m_currEntry(nullptr)
  210. , m_bytesLeft(0)
  211. , m_nextAction(Action::createActionByType(Action::TYPE_FINISH))
  212. , m_needInit(true)
  213. {}
  214. Action act() override {
  215. if (m_needInit) {
  216. m_needInit = false;
  217. m_currEntry = m_chunkedBuffer->m_firstEntry;
  218. m_bytesLeft = m_chunkedBuffer->m_size;
  219. }
  220. if(m_currEntry == nullptr) {
  221. return finish();
  222. }
  223. if(m_bytesLeft > CHUNK_ENTRY_SIZE) {
  224. m_currData.set(m_currEntry->chunk, CHUNK_ENTRY_SIZE);
  225. m_nextAction = yieldTo(&FlushCoroutine::act);
  226. m_currEntry = m_currEntry->next;
  227. m_bytesLeft -= m_currData.bytesLeft;
  228. return yieldTo(&FlushCoroutine::writeCurrData);
  229. } else {
  230. m_currData.set(m_currEntry->chunk, m_bytesLeft);
  231. m_nextAction = yieldTo(&FlushCoroutine::act);
  232. m_currEntry = m_currEntry->next;
  233. m_bytesLeft -= m_currData.bytesLeft;
  234. return yieldTo(&FlushCoroutine::writeCurrData);
  235. }
  236. }
  237. Action writeCurrData() {
  238. return m_stream->writeExactSizeDataAsyncInline(m_currData, Action::clone(m_nextAction));
  239. }
  240. };
  241. return FlushCoroutine::start(shared_from_this(), stream);
  242. }
  243. std::shared_ptr<ChunkedBuffer::Chunks> ChunkedBuffer::getChunks() {
  244. auto chunks = std::make_shared<Chunks>();
  245. auto curr = m_firstEntry;
  246. v_int32 count = 0;
  247. while (curr != nullptr) {
  248. chunks->push_back(Chunk::createShared(curr->chunk, curr->next
  249. ? CHUNK_ENTRY_SIZE
  250. : m_size - CHUNK_ENTRY_SIZE * count));
  251. ++count;
  252. curr = curr->next;
  253. }
  254. return chunks;
  255. }
  256. v_buff_size ChunkedBuffer::getSize(){
  257. return m_size;
  258. }
  259. void ChunkedBuffer::clear(){
  260. ChunkEntry* curr = m_firstEntry;
  261. while (curr != nullptr) {
  262. ChunkEntry* next = curr->next;
  263. freeEntry(curr);
  264. curr = next;
  265. }
  266. m_size = 0;
  267. m_chunkPos = 0;
  268. m_firstEntry = nullptr;
  269. m_lastEntry = nullptr;
  270. }
  271. }}}