ConnectionMonitorTest.cpp 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #include "ConnectionMonitorTest.hpp"
  25. #include "oatpp/web/client/HttpRequestExecutor.hpp"
  26. #include "oatpp/web/server/AsyncHttpConnectionHandler.hpp"
  27. #include "oatpp/web/server/HttpConnectionHandler.hpp"
  28. #include "oatpp/web/server/HttpRouter.hpp"
  29. #include "oatpp/web/protocol/http/outgoing/StreamingBody.hpp"
  30. #include "oatpp/network/monitor/ConnectionMonitor.hpp"
  31. #include "oatpp/network/monitor/ConnectionMaxAgeChecker.hpp"
  32. #include "oatpp/network/Server.hpp"
  33. #include "oatpp/network/tcp/client/ConnectionProvider.hpp"
  34. #include "oatpp/network/tcp/server/ConnectionProvider.hpp"
  35. #include <thread>
  36. namespace oatpp { namespace test { namespace network { namespace monitor {
  37. namespace {
  38. class ReadCallback : public oatpp::data::stream::ReadCallback {
  39. public:
  40. v_io_size read(void *buffer, v_buff_size count, async::Action &action) override {
  41. OATPP_LOGI("TEST", "read(...)")
  42. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  43. char* data = (char*) buffer;
  44. data[0] = 'A';
  45. return 1;
  46. }
  47. };
  48. class StreamingHandler : public oatpp::web::server::HttpRequestHandler {
  49. public:
  50. std::shared_ptr<OutgoingResponse> handle(const std::shared_ptr<IncomingRequest>& request) override {
  51. auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
  52. (std::make_shared<ReadCallback>());
  53. return OutgoingResponse::createShared(Status::CODE_200, body);
  54. }
  55. };
  56. class AsyncStreamingHandler : public oatpp::web::server::HttpRequestHandler {
  57. public:
  58. oatpp::async::CoroutineStarterForResult<const std::shared_ptr<OutgoingResponse>&>
  59. handleAsync(const std::shared_ptr<IncomingRequest>& request) {
  60. class StreamCoroutine : public oatpp::async::CoroutineWithResult<StreamCoroutine, const std::shared_ptr<OutgoingResponse>&> {
  61. public:
  62. Action act() override {
  63. auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
  64. (std::make_shared<ReadCallback>());
  65. return _return(OutgoingResponse::createShared(Status::CODE_200, body));
  66. }
  67. };
  68. return StreamCoroutine::startForResult();
  69. }
  70. };
  71. std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::network::monitor::ConnectionMonitor>& monitor) {
  72. auto router = oatpp::web::server::HttpRouter::createShared();
  73. router->route("GET", "/stream", std::make_shared<StreamingHandler>());
  74. auto connectionHandler = oatpp::web::server::HttpConnectionHandler::createShared(router);
  75. auto server = std::make_shared<oatpp::network::Server>(monitor, connectionHandler);
  76. std::thread t([server, connectionHandler]{
  77. server->run();
  78. OATPP_LOGD("TEST", "server stopped");
  79. connectionHandler->stop();
  80. OATPP_LOGD("TEST", "connectionHandler stopped");
  81. });
  82. t.detach();
  83. return server;
  84. }
  85. std::shared_ptr<oatpp::network::Server> runAsyncServer(const std::shared_ptr<oatpp::network::monitor::ConnectionMonitor>& monitor) {
  86. auto router = oatpp::web::server::HttpRouter::createShared();
  87. router->route("GET", "/stream", std::make_shared<AsyncStreamingHandler>());
  88. auto executor = std::make_shared<oatpp::async::Executor>();
  89. auto connectionHandler = oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, executor);
  90. auto server = std::make_shared<oatpp::network::Server>(monitor, connectionHandler);
  91. std::thread t([server, connectionHandler, executor]{
  92. server->run();
  93. OATPP_LOGD("TEST_ASYNC", "server stopped");
  94. connectionHandler->stop();
  95. OATPP_LOGD("TEST_ASYNC", "connectionHandler stopped");
  96. executor->waitTasksFinished();
  97. executor->stop();
  98. executor->join();
  99. OATPP_LOGD("TEST_ASYNC", "executor stopped");
  100. });
  101. t.detach();
  102. return server;
  103. }
  104. void runClient() {
  105. auto connectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared(
  106. {"localhost", 8000});
  107. oatpp::web::client::HttpRequestExecutor executor(connectionProvider);
  108. auto response = executor.execute("GET", "/stream", oatpp::web::protocol::http::Headers({}), nullptr, nullptr);
  109. OATPP_ASSERT(response->getStatusCode() == 200);
  110. auto data = response->readBodyToString();
  111. OATPP_ASSERT(data)
  112. OATPP_LOGD("TEST", "data->size() == %d", data->size())
  113. OATPP_ASSERT(data->size() < 110) // it should be less than 100. But we put 110 for redundancy
  114. }
  115. void runAsyncClient() {
  116. class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
  117. private:
  118. std::shared_ptr<oatpp::web::client::HttpRequestExecutor> m_executor;
  119. std::shared_ptr<oatpp::network::monitor::ConnectionMonitor> m_monitor;
  120. public:
  121. ClientCoroutine() {
  122. auto connectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared(
  123. {"localhost", 8000});
  124. m_monitor = std::make_shared<oatpp::network::monitor::ConnectionMonitor>(connectionProvider);
  125. m_monitor->addMetricsChecker(
  126. std::make_shared<oatpp::network::monitor::ConnectionMaxAgeChecker>(
  127. std::chrono::seconds(5)
  128. )
  129. );
  130. m_executor = oatpp::web::client::HttpRequestExecutor::createShared(m_monitor);
  131. }
  132. Action act() override {
  133. return m_executor->executeAsync("GET", "/stream", oatpp::web::protocol::http::Headers({}), nullptr, nullptr)
  134. .callbackTo(&ClientCoroutine::onResponse);
  135. }
  136. Action onResponse(const std::shared_ptr<oatpp::web::protocol::http::incoming::Response>& response) {
  137. OATPP_ASSERT(response->getStatusCode() == 200);
  138. return response->readBodyToStringAsync().callbackTo(&ClientCoroutine::onBody);
  139. }
  140. Action onBody(const oatpp::String& data) {
  141. OATPP_ASSERT(data)
  142. OATPP_LOGD("TEST", "data->size() == %d", data->size())
  143. OATPP_ASSERT(data->size() < 60) // it should be less than 50. But we put 60 for redundancy
  144. m_monitor->stop();
  145. return finish();
  146. }
  147. };
  148. auto executor = std::make_shared<oatpp::async::Executor>();
  149. executor->execute<ClientCoroutine>();
  150. executor->waitTasksFinished();
  151. OATPP_LOGD("TEST_ASYNC_CLIENT", "task finished")
  152. executor->stop();
  153. OATPP_LOGD("TEST_ASYNC_CLIENT", "executor stopped")
  154. executor->join();
  155. OATPP_LOGD("TEST_ASYNC_CLIENT", "done")
  156. }
  157. }
  158. void ConnectionMonitorTest::onRun() {
  159. auto connectionProvider = oatpp::network::tcp::server::ConnectionProvider::createShared(
  160. {"localhost", 8000});
  161. auto monitor = std::make_shared<oatpp::network::monitor::ConnectionMonitor>(connectionProvider);
  162. monitor->addMetricsChecker(
  163. std::make_shared<oatpp::network::monitor::ConnectionMaxAgeChecker>(
  164. std::chrono::seconds(10)
  165. )
  166. );
  167. {
  168. OATPP_LOGD(TAG, "run simple API test")
  169. auto server = runServer(monitor);
  170. runClient();
  171. server->stop();
  172. std::this_thread::sleep_for(std::chrono::seconds(5));
  173. }
  174. {
  175. OATPP_LOGD(TAG, "run Async API test")
  176. auto server = runAsyncServer(monitor);
  177. runClient();
  178. server->stop();
  179. std::this_thread::sleep_for(std::chrono::seconds(5));
  180. }
  181. {
  182. OATPP_LOGD(TAG, "run Async Client test")
  183. auto server = runServer(monitor);
  184. runAsyncClient();
  185. server->stop();
  186. std::this_thread::sleep_for(std::chrono::seconds(5));
  187. }
  188. monitor->stop();
  189. }
  190. }}}}