ConnectionPoolTest.cpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #include "ConnectionPoolTest.hpp"
  25. #include "oatpp/network/ConnectionPool.hpp"
  26. #include "oatpp/core/async/Executor.hpp"
  27. namespace oatpp { namespace test { namespace network {
  28. namespace {
  29. typedef oatpp::provider::Pool<
  30. oatpp::network::ConnectionProvider,
  31. oatpp::data::stream::IOStream,
  32. oatpp::network::ConnectionAcquisitionProxy
  33. > ConnectionPool;
  34. class StubStream : public oatpp::data::stream::IOStream, public oatpp::base::Countable {
  35. public:
  36. v_io_size write(const void *buff, v_buff_size count, async::Action& actions) override {
  37. throw std::runtime_error("It's a stub!");
  38. }
  39. v_io_size read(void *buff, v_buff_size count, async::Action& action) override {
  40. throw std::runtime_error("It's a stub!");
  41. }
  42. void setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) override {
  43. throw std::runtime_error("It's a stub!");
  44. }
  45. oatpp::data::stream::IOMode getOutputStreamIOMode() override {
  46. throw std::runtime_error("It's a stub!");
  47. }
  48. oatpp::data::stream::Context& getOutputStreamContext() override {
  49. throw std::runtime_error("It's a stub!");
  50. }
  51. void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override {
  52. throw std::runtime_error("It's a stub!");
  53. }
  54. oatpp::data::stream::IOMode getInputStreamIOMode() override {
  55. throw std::runtime_error("It's a stub!");
  56. }
  57. oatpp::data::stream::Context& getInputStreamContext() override {
  58. throw std::runtime_error("It's a stub!");
  59. }
  60. };
  61. class StubStreamProvider : public oatpp::network::ConnectionProvider {
  62. private:
  63. class Invalidator : public oatpp::provider::Invalidator<oatpp::data::stream::IOStream> {
  64. public:
  65. void invalidate(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) override {
  66. (void)connection;
  67. // DO Nothing.
  68. }
  69. };
  70. private:
  71. std::shared_ptr<Invalidator> m_invalidator = std::make_shared<Invalidator>();
  72. public:
  73. StubStreamProvider()
  74. : counter(0)
  75. {}
  76. std::atomic<v_int64> counter;
  77. oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream> get() override {
  78. ++ counter;
  79. return oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>(
  80. std::make_shared<StubStream>(),
  81. m_invalidator
  82. );
  83. }
  84. oatpp::async::CoroutineStarterForResult<const oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>&> getAsync() override {
  85. class ConnectionCoroutine : public oatpp::async::CoroutineWithResult<ConnectionCoroutine, const oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>&> {
  86. private:
  87. std::shared_ptr<Invalidator> m_invalidator;
  88. public:
  89. ConnectionCoroutine(const std::shared_ptr<Invalidator>& invalidator)
  90. : m_invalidator(invalidator)
  91. {}
  92. Action act() override {
  93. return _return(oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>(
  94. std::make_shared<StubStream>(),
  95. m_invalidator
  96. ));
  97. }
  98. };
  99. ++ counter;
  100. return ConnectionCoroutine::startForResult(m_invalidator);
  101. }
  102. void stop() override {
  103. // DO NOTHING
  104. }
  105. };
  106. class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
  107. private:
  108. std::shared_ptr<ConnectionPool> m_pool;
  109. oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream> m_connection;
  110. v_int32 m_repeats;
  111. bool m_invalidate;
  112. public:
  113. ClientCoroutine(const std::shared_ptr<ConnectionPool>& pool, bool invalidate)
  114. : m_pool(pool)
  115. , m_repeats(0)
  116. , m_invalidate(invalidate)
  117. {}
  118. Action act() override {
  119. return m_pool->getAsync().callbackTo(&ClientCoroutine::onConnection);
  120. }
  121. Action onConnection(const oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>& connection) {
  122. m_connection = connection;
  123. return yieldTo(&ClientCoroutine::useConnection);
  124. }
  125. Action useConnection() {
  126. if(m_repeats < 1) {
  127. m_repeats ++;
  128. return waitFor(std::chrono::milliseconds(100)).next(yieldTo(&ClientCoroutine::useConnection));
  129. }
  130. if(m_invalidate) {
  131. m_connection.invalidator->invalidate(m_connection.object);
  132. }
  133. return finish();
  134. }
  135. };
  136. void clientMethod(std::shared_ptr<ConnectionPool> pool, bool invalidate) {
  137. auto connection = pool->get();
  138. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  139. if(invalidate) {
  140. connection.invalidator->invalidate(connection.object);
  141. }
  142. }
  143. }
  144. void ConnectionPoolTest::onRun() {
  145. oatpp::async::Executor executor(1, 1, 1);
  146. auto connectionProvider = std::make_shared<StubStreamProvider>();
  147. auto pool = ConnectionPool::createShared(connectionProvider, 10 /* maxConnections */, std::chrono::seconds(10) /* maxConnectionTTL */);
  148. std::list<std::thread> threads;
  149. for(v_int32 i = 0; i < 100; i ++ ) {
  150. threads.push_back(std::thread(clientMethod, pool, false));
  151. executor.execute<ClientCoroutine>(pool, false);
  152. }
  153. for(std::thread& thread : threads) {
  154. thread.join();
  155. }
  156. executor.waitTasksFinished();
  157. OATPP_LOGD(TAG, "connections_counter=%d", connectionProvider->counter.load());
  158. OATPP_ASSERT(connectionProvider->counter <= 10);
  159. pool->stop();
  160. executor.stop();
  161. executor.join();
  162. /* wait pool cleanup task exit */
  163. std::this_thread::sleep_for(std::chrono::milliseconds(200));
  164. }
  165. }}}