BufferStream.cpp 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #include "BufferStream.hpp"
  25. #include "oatpp/core/utils/Binary.hpp"
  26. namespace oatpp { namespace data{ namespace stream {
  27. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  28. // BufferOutputStream
  29. data::stream::DefaultInitializedContext BufferOutputStream::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_INFINITE);
  30. BufferOutputStream::BufferOutputStream(v_buff_size initialCapacity, const std::shared_ptr<void>& captureData)
  31. : m_data(new v_char8[initialCapacity])
  32. , m_capacity(initialCapacity)
  33. , m_position(0)
  34. , m_maxCapacity(-1)
  35. , m_ioMode(IOMode::ASYNCHRONOUS)
  36. , m_capturedData(captureData)
  37. {}
  38. BufferOutputStream::~BufferOutputStream() {
  39. m_capturedData.reset(); // reset capture data before deleting data.
  40. delete [] m_data;
  41. }
  42. v_io_size BufferOutputStream::write(const void *data, v_buff_size count, async::Action& action) {
  43. (void) action;
  44. reserveBytesUpfront(count);
  45. std::memcpy(m_data + m_position, data, count);
  46. m_position += count;
  47. return count;
  48. }
  49. void BufferOutputStream::setOutputStreamIOMode(IOMode ioMode) {
  50. m_ioMode = ioMode;
  51. }
  52. IOMode BufferOutputStream::getOutputStreamIOMode() {
  53. return m_ioMode;
  54. }
  55. Context& BufferOutputStream::getOutputStreamContext() {
  56. return DEFAULT_CONTEXT;
  57. }
  58. void BufferOutputStream::reserveBytesUpfront(v_buff_size count) {
  59. v_buff_size capacityNeeded = m_position + count;
  60. if(capacityNeeded > m_capacity) {
  61. v_buff_size newCapacity = utils::Binary::nextP2(capacityNeeded);
  62. if(newCapacity < 0 || (m_maxCapacity > 0 && newCapacity > m_maxCapacity)) {
  63. newCapacity = m_maxCapacity;
  64. }
  65. if(newCapacity < capacityNeeded) {
  66. throw std::runtime_error("[oatpp::data::stream::BufferOutputStream::reserveBytesUpfront()]: Error. Unable to allocate requested memory.");
  67. }
  68. p_char8 newData = new v_char8[newCapacity];
  69. std::memcpy(newData, m_data, m_position);
  70. delete [] m_data;
  71. m_data = newData;
  72. m_capacity = newCapacity;
  73. }
  74. }
  75. p_char8 BufferOutputStream::getData() {
  76. return m_data;
  77. }
  78. v_buff_size BufferOutputStream::getCapacity() {
  79. return m_capacity;
  80. }
  81. v_buff_size BufferOutputStream::getCurrentPosition() {
  82. return m_position;
  83. }
  84. void BufferOutputStream::setCurrentPosition(v_buff_size position) {
  85. m_position = position;
  86. }
  87. void BufferOutputStream::reset(v_buff_size initialCapacity) {
  88. delete [] m_data;
  89. m_data = new v_char8[initialCapacity];
  90. m_capacity = initialCapacity;
  91. m_position = 0;
  92. }
  93. oatpp::String BufferOutputStream::toString() {
  94. return oatpp::String((const char*) m_data, m_position);
  95. }
  96. oatpp::String BufferOutputStream::getSubstring(v_buff_size pos, v_buff_size count) {
  97. if(pos + count <= m_position) {
  98. return oatpp::String((const char *) (m_data + pos), count);
  99. } else {
  100. return oatpp::String((const char *) (m_data + pos), m_position - pos);
  101. }
  102. }
  103. oatpp::v_io_size BufferOutputStream::flushToStream(OutputStream* stream) {
  104. return stream->writeExactSizeDataSimple(m_data, m_position);
  105. }
  106. oatpp::async::CoroutineStarter BufferOutputStream::flushToStreamAsync(const std::shared_ptr<BufferOutputStream>& _this, const std::shared_ptr<OutputStream>& stream) {
  107. class WriteDataCoroutine : public oatpp::async::Coroutine<WriteDataCoroutine> {
  108. private:
  109. std::shared_ptr<BufferOutputStream> m_this;
  110. std::shared_ptr<oatpp::data::stream::OutputStream> m_stream;
  111. data::buffer::InlineWriteData m_inlineData;
  112. public:
  113. WriteDataCoroutine(const std::shared_ptr<BufferOutputStream>& _this,
  114. const std::shared_ptr<oatpp::data::stream::OutputStream>& stream)
  115. : m_this(_this)
  116. , m_stream(stream)
  117. {}
  118. Action act() override {
  119. if(m_inlineData.currBufferPtr == nullptr) {
  120. m_inlineData.currBufferPtr = m_this->m_data;
  121. m_inlineData.bytesLeft = m_this->m_position;
  122. }
  123. return m_stream.get()->writeExactSizeDataAsyncInline(m_inlineData, finish());
  124. }
  125. };
  126. return WriteDataCoroutine::start(_this, stream);
  127. }
  128. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  129. // BufferInputStream
  130. data::stream::DefaultInitializedContext BufferInputStream::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_FINITE);
  131. BufferInputStream::BufferInputStream(const std::shared_ptr<std::string>& memoryHandle,
  132. const void* data,
  133. v_buff_size size,
  134. const std::shared_ptr<void>& captureData)
  135. : m_memoryHandle(memoryHandle)
  136. , m_data((p_char8) data)
  137. , m_size(size)
  138. , m_position(0)
  139. , m_ioMode(IOMode::ASYNCHRONOUS)
  140. , m_capturedData(captureData)
  141. {}
  142. BufferInputStream::BufferInputStream(const oatpp::String& data, const std::shared_ptr<void>& captureData)
  143. : BufferInputStream(data.getPtr(), (p_char8) data->data(), data->size(), captureData)
  144. {}
  145. void BufferInputStream::reset(const std::shared_ptr<std::string>& memoryHandle,
  146. p_char8 data,
  147. v_buff_size size,
  148. const std::shared_ptr<void>& captureData)
  149. {
  150. m_memoryHandle = memoryHandle;
  151. m_data = data;
  152. m_size = size;
  153. m_position = 0;
  154. m_capturedData = captureData;
  155. }
  156. void BufferInputStream::reset() {
  157. m_memoryHandle = nullptr;
  158. m_data = nullptr;
  159. m_size = 0;
  160. m_position = 0;
  161. m_capturedData.reset();
  162. }
  163. v_io_size BufferInputStream::read(void *data, v_buff_size count, async::Action& action) {
  164. (void) action;
  165. v_buff_size desiredAmount = count;
  166. if(desiredAmount > m_size - m_position) {
  167. desiredAmount = m_size - m_position;
  168. }
  169. std::memcpy(data, &m_data[m_position], desiredAmount);
  170. m_position += desiredAmount;
  171. return desiredAmount;
  172. }
  173. void BufferInputStream::setInputStreamIOMode(IOMode ioMode) {
  174. m_ioMode = ioMode;
  175. }
  176. IOMode BufferInputStream::getInputStreamIOMode() {
  177. return m_ioMode;
  178. }
  179. Context& BufferInputStream::getInputStreamContext() {
  180. return DEFAULT_CONTEXT;
  181. }
  182. std::shared_ptr<std::string> BufferInputStream::getDataMemoryHandle() {
  183. return m_memoryHandle;
  184. }
  185. p_char8 BufferInputStream::getData() {
  186. return m_data;
  187. }
  188. v_buff_size BufferInputStream::getDataSize() {
  189. return m_size;
  190. }
  191. v_buff_size BufferInputStream::getCurrentPosition() {
  192. return m_position;
  193. }
  194. void BufferInputStream::setCurrentPosition(v_buff_size position) {
  195. m_position = position;
  196. }
  197. v_io_size BufferInputStream::peek(void *data, v_buff_size count, async::Action &action) {
  198. (void) action;
  199. v_buff_size desiredAmount = count;
  200. if(desiredAmount > m_size - m_position) {
  201. desiredAmount = m_size - m_position;
  202. }
  203. std::memcpy(data, &m_data[m_position], desiredAmount);
  204. return desiredAmount;
  205. }
  206. v_io_size BufferInputStream::availableToRead() const {
  207. return m_size - m_position;
  208. }
  209. v_io_size BufferInputStream::commitReadOffset(v_buff_size count) {
  210. if(count > m_size - m_position) {
  211. count = m_size - m_position;
  212. }
  213. m_position += count;
  214. return count;
  215. }
  216. }}}