PipeTest.cpp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #include "PipeTest.hpp"
  25. #include "oatpp/network/virtual_/Pipe.hpp"
  26. #include "oatpp/core/data/stream/BufferStream.hpp"
  27. #include "oatpp-test/Checker.hpp"
  28. #include <iostream>
  29. #include <thread>
  30. namespace oatpp { namespace test { namespace network { namespace virtual_ {
  31. namespace {
  32. typedef oatpp::network::virtual_::Pipe Pipe;
  33. const char* DATA_CHUNK = "<0123456789/abcdefghijklmnopqrstuvwxyz/ABCDEFGHIJKLMNOPQRSTUVWXYZ>";
  34. const v_buff_size CHUNK_SIZE = std::strlen(DATA_CHUNK);
  35. class WriterTask : public oatpp::base::Countable {
  36. private:
  37. std::shared_ptr<Pipe> m_pipe;
  38. v_int64 m_chunksToTransfer;
  39. v_buff_size m_position = 0;
  40. v_buff_size m_transferedBytes = 0;
  41. public:
  42. WriterTask(const std::shared_ptr<Pipe>& pipe, v_int64 chunksToTransfer)
  43. : m_pipe(pipe)
  44. , m_chunksToTransfer(chunksToTransfer)
  45. {}
  46. void run() {
  47. while (m_transferedBytes < CHUNK_SIZE * m_chunksToTransfer) {
  48. auto res = m_pipe->getWriter()->writeSimple(&DATA_CHUNK[m_position], CHUNK_SIZE - m_position);
  49. if(res > 0) {
  50. m_transferedBytes += res;
  51. m_position += res;
  52. if(m_position == CHUNK_SIZE) {
  53. m_position = 0;
  54. }
  55. }
  56. }
  57. OATPP_LOGV("WriterTask", "sent %d bytes", m_transferedBytes);
  58. }
  59. };
  60. class ReaderTask : public oatpp::base::Countable {
  61. private:
  62. std::shared_ptr<oatpp::data::stream::BufferOutputStream> m_buffer;
  63. std::shared_ptr<Pipe> m_pipe;
  64. v_int64 m_chunksToTransfer;
  65. public:
  66. ReaderTask(const std::shared_ptr<oatpp::data::stream::BufferOutputStream> &buffer,
  67. const std::shared_ptr<Pipe>& pipe,
  68. v_int64 chunksToTransfer)
  69. : m_buffer(buffer)
  70. , m_pipe(pipe)
  71. , m_chunksToTransfer(chunksToTransfer)
  72. {}
  73. void run() {
  74. v_char8 readBuffer[256];
  75. while (m_buffer->getCurrentPosition() < CHUNK_SIZE * m_chunksToTransfer) {
  76. auto res = m_pipe->getReader()->readSimple(readBuffer, 256);
  77. if(res > 0) {
  78. m_buffer->writeSimple(readBuffer, res);
  79. }
  80. }
  81. OATPP_LOGV("ReaderTask", "sent %d bytes", m_buffer->getCurrentPosition());
  82. }
  83. };
  84. void runTransfer(const std::shared_ptr<Pipe>& pipe, v_int64 chunksToTransfer, bool writeNonBlock, bool readerNonBlock) {
  85. OATPP_LOGV("transfer", "writer-nb: %d, reader-nb: %d", writeNonBlock, readerNonBlock);
  86. auto buffer = std::make_shared<oatpp::data::stream::BufferOutputStream>();
  87. {
  88. oatpp::test::PerformanceChecker timer("timer");
  89. std::thread writerThread(&WriterTask::run, WriterTask(pipe, chunksToTransfer));
  90. std::thread readerThread(&ReaderTask::run, ReaderTask(buffer, pipe, chunksToTransfer));
  91. writerThread.join();
  92. readerThread.join();
  93. }
  94. OATPP_ASSERT(buffer->getCurrentPosition() == chunksToTransfer * CHUNK_SIZE);
  95. auto ruleBuffer = std::make_shared<oatpp::data::stream::BufferOutputStream>();
  96. for(v_int32 i = 0; i < chunksToTransfer; i ++) {
  97. ruleBuffer->writeSimple(DATA_CHUNK, CHUNK_SIZE);
  98. }
  99. auto str1 = buffer->toString();
  100. auto str2 = buffer->toString();
  101. OATPP_ASSERT(str1 == str2);
  102. }
  103. }
  104. void PipeTest::onRun() {
  105. auto pipe = Pipe::createShared();
  106. v_int64 chunkCount = oatpp::data::buffer::IOBuffer::BUFFER_SIZE * 10 / CHUNK_SIZE;
  107. runTransfer(pipe, chunkCount, false, false);
  108. runTransfer(pipe, chunkCount, true, false);
  109. runTransfer(pipe, chunkCount, false, true);
  110. runTransfer(pipe, chunkCount, true, true);
  111. }
  112. }}}}