HttpProcessor.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #include "HttpProcessor.hpp"
  25. #include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
  26. #include "oatpp/core/data/stream/BufferStream.hpp"
  27. namespace oatpp { namespace web { namespace server {
  28. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  29. // Components
  30. HttpProcessor::Components::Components(const std::shared_ptr<HttpRouter>& pRouter,
  31. const std::shared_ptr<protocol::http::encoding::ProviderCollection>& pContentEncodingProviders,
  32. const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& pBodyDecoder,
  33. const std::shared_ptr<handler::ErrorHandler>& pErrorHandler,
  34. const RequestInterceptors& pRequestInterceptors,
  35. const ResponseInterceptors& pResponseInterceptors,
  36. const std::shared_ptr<Config>& pConfig)
  37. : router(pRouter)
  38. , contentEncodingProviders(pContentEncodingProviders)
  39. , bodyDecoder(pBodyDecoder)
  40. , errorHandler(pErrorHandler)
  41. , requestInterceptors(pRequestInterceptors)
  42. , responseInterceptors(pResponseInterceptors)
  43. , config(pConfig)
  44. {}
  45. HttpProcessor::Components::Components(const std::shared_ptr<HttpRouter>& pRouter)
  46. : Components(pRouter,
  47. nullptr,
  48. std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>(),
  49. handler::DefaultErrorHandler::createShared(),
  50. {},
  51. {},
  52. std::make_shared<Config>())
  53. {}
  54. HttpProcessor::Components::Components(const std::shared_ptr<HttpRouter>& pRouter, const std::shared_ptr<Config>& pConfig)
  55. : Components(pRouter,
  56. nullptr,
  57. std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>(),
  58. handler::DefaultErrorHandler::createShared(),
  59. {},
  60. {},
  61. pConfig)
  62. {}
  63. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  64. // Other
  65. HttpProcessor::ProcessingResources::ProcessingResources(const std::shared_ptr<Components>& pComponents,
  66. const provider::ResourceHandle<oatpp::data::stream::IOStream>& pConnection)
  67. : components(pComponents)
  68. , connection(pConnection)
  69. , headersInBuffer(components->config->headersInBufferInitial)
  70. , headersOutBuffer(components->config->headersOutBufferInitial)
  71. , headersReader(&headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
  72. , inStream(data::stream::InputStreamBufferedProxy::createShared(connection.object, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
  73. {}
  74. std::shared_ptr<protocol::http::outgoing::Response>
  75. HttpProcessor::processNextRequest(ProcessingResources& resources,
  76. const std::shared_ptr<protocol::http::incoming::Request>& request,
  77. ConnectionState& connectionState)
  78. {
  79. std::shared_ptr<protocol::http::outgoing::Response> response;
  80. try{
  81. for(auto& interceptor : resources.components->requestInterceptors) {
  82. response = interceptor->intercept(request);
  83. if(response) {
  84. return response;
  85. }
  86. }
  87. auto route = resources.components->router->getRoute(request->getStartingLine().method, request->getStartingLine().path);
  88. if(!route) {
  89. data::stream::BufferOutputStream ss;
  90. ss << "No mapping for HTTP-method: '" << request->getStartingLine().method.toString()
  91. << "', URL: '" << request->getStartingLine().path.toString() << "'";
  92. connectionState = ConnectionState::CLOSING;
  93. return resources.components->errorHandler->handleError(protocol::http::Status::CODE_404, ss.toString());
  94. }
  95. request->setPathVariables(route.getMatchMap());
  96. return route.getEndpoint()->handle(request);
  97. } catch (oatpp::web::protocol::http::HttpError& error) {
  98. response = resources.components->errorHandler->handleError(error.getInfo().status, error.getMessage(), error.getHeaders());
  99. connectionState = ConnectionState::CLOSING;
  100. } catch (std::exception& error) {
  101. response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
  102. connectionState = ConnectionState::CLOSING;
  103. } catch (...) {
  104. response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, "Unhandled Error");
  105. connectionState = ConnectionState::CLOSING;
  106. }
  107. return response;
  108. }
  109. HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResources& resources) {
  110. oatpp::web::protocol::http::HttpError::Info error;
  111. auto headersReadResult = resources.headersReader.readHeaders(resources.inStream.get(), error);
  112. if(error.ioStatus <= 0) {
  113. return ConnectionState::DEAD;
  114. }
  115. ConnectionState connectionState = ConnectionState::ALIVE;
  116. std::shared_ptr<protocol::http::incoming::Request> request;
  117. std::shared_ptr<protocol::http::outgoing::Response> response;
  118. if(error.status.code != 0) {
  119. response = resources.components->errorHandler->handleError(error.status, "Invalid Request Headers");
  120. connectionState = ConnectionState::CLOSING;
  121. } else {
  122. request = protocol::http::incoming::Request::createShared(resources.connection.object,
  123. headersReadResult.startingLine,
  124. headersReadResult.headers,
  125. resources.inStream,
  126. resources.components->bodyDecoder);
  127. response = processNextRequest(resources, request, connectionState);
  128. try {
  129. for (auto& interceptor : resources.components->responseInterceptors) {
  130. response = interceptor->intercept(request, response);
  131. if (!response) {
  132. response = resources.components->errorHandler->handleError(
  133. protocol::http::Status::CODE_500,
  134. "Response Interceptor returned an Invalid Response - 'null'"
  135. );
  136. connectionState = ConnectionState::CLOSING;
  137. }
  138. }
  139. } catch (...) {
  140. response = resources.components->errorHandler->handleError(
  141. protocol::http::Status::CODE_500,
  142. "Unhandled Error in Response Interceptor"
  143. );
  144. connectionState = ConnectionState::CLOSING;
  145. }
  146. response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
  147. protocol::http::utils::CommunicationUtils::considerConnectionState(request, response, connectionState);
  148. switch(connectionState) {
  149. case ConnectionState::ALIVE :
  150. response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
  151. break;
  152. case ConnectionState::CLOSING:
  153. case ConnectionState::DEAD:
  154. response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE);
  155. break;
  156. default:
  157. break;
  158. }
  159. }
  160. auto contentEncoderProvider =
  161. protocol::http::utils::CommunicationUtils::selectEncoder(request, resources.components->contentEncodingProviders);
  162. response->send(resources.connection.object.get(), &resources.headersOutBuffer, contentEncoderProvider.get());
  163. /* Delegate connection handling to another handler only after the response is sent to the client */
  164. if(connectionState == ConnectionState::DELEGATED) {
  165. auto handler = response->getConnectionUpgradeHandler();
  166. if(handler) {
  167. handler->handleConnection(resources.connection, response->getConnectionUpgradeParameters());
  168. connectionState = ConnectionState::DELEGATED;
  169. } else {
  170. OATPP_LOGW("[oatpp::web::server::HttpProcessor::processNextRequest()]", "Warning. ConnectionUpgradeHandler not set!");
  171. connectionState = ConnectionState::CLOSING;
  172. }
  173. }
  174. return connectionState;
  175. }
  176. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  177. // Task
  178. HttpProcessor::Task::Task(const std::shared_ptr<Components>& components,
  179. const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
  180. TaskProcessingListener* taskListener)
  181. : m_components(components)
  182. , m_connection(connection)
  183. , m_taskListener(taskListener)
  184. {
  185. m_taskListener->onTaskStart(m_connection);
  186. }
  187. HttpProcessor::Task::Task(HttpProcessor::Task &&other)
  188. : m_components(std::move(other.m_components))
  189. , m_connection(std::move(other.m_connection))
  190. , m_taskListener(other.m_taskListener)
  191. {
  192. other.m_taskListener = nullptr;
  193. }
  194. HttpProcessor::Task::~Task() {
  195. if (m_taskListener != nullptr) {
  196. m_taskListener->onTaskEnd(m_connection);
  197. }
  198. }
  199. HttpProcessor::Task &HttpProcessor::Task::operator=(HttpProcessor::Task &&other) {
  200. m_components = std::move(other.m_components);
  201. m_connection = std::move(other.m_connection);
  202. m_taskListener = other.m_taskListener;
  203. other.m_taskListener = nullptr;
  204. return *this;
  205. }
  206. void HttpProcessor::Task::run(){
  207. m_connection.object->initContexts();
  208. ProcessingResources resources(m_components, m_connection);
  209. ConnectionState connectionState;
  210. try {
  211. do {
  212. connectionState = HttpProcessor::processNextRequest(resources);
  213. } while (connectionState == ConnectionState::ALIVE);
  214. } catch (...) {
  215. // DO NOTHING
  216. }
  217. }
  218. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  219. // HttpProcessor::Coroutine
  220. HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& components,
  221. const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
  222. TaskProcessingListener* taskListener)
  223. : m_components(components)
  224. , m_connection(connection)
  225. , m_headersInBuffer(components->config->headersInBufferInitial)
  226. , m_headersReader(&m_headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
  227. , m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial))
  228. , m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection.object, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
  229. , m_connectionState(ConnectionState::ALIVE)
  230. , m_taskListener(taskListener)
  231. {
  232. m_taskListener->onTaskStart(m_connection);
  233. }
  234. HttpProcessor::Coroutine::~Coroutine() {
  235. m_taskListener->onTaskEnd(m_connection);
  236. }
  237. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
  238. return m_connection.object->initContextsAsync().next(yieldTo(&HttpProcessor::Coroutine::parseHeaders));
  239. }
  240. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseHeaders() {
  241. return m_headersReader.readHeadersAsync(m_inStream).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed);
  242. }
  243. oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHeadersReader::Result& headersReadResult) {
  244. m_currentRequest = protocol::http::incoming::Request::createShared(m_connection.object,
  245. headersReadResult.startingLine,
  246. headersReadResult.headers,
  247. m_inStream,
  248. m_components->bodyDecoder);
  249. for(auto& interceptor : m_components->requestInterceptors) {
  250. m_currentResponse = interceptor->intercept(m_currentRequest);
  251. if(m_currentResponse) {
  252. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  253. }
  254. }
  255. m_currentRoute = m_components->router->getRoute(headersReadResult.startingLine.method.toString(), headersReadResult.startingLine.path.toString());
  256. if(!m_currentRoute) {
  257. data::stream::BufferOutputStream ss;
  258. ss << "No mapping for HTTP-method: '" << headersReadResult.startingLine.method.toString()
  259. << "', URL: '" << headersReadResult.startingLine.path.toString() << "'";
  260. m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_404, ss.toString());
  261. m_connectionState = ConnectionState::CLOSING;
  262. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  263. }
  264. m_currentRequest->setPathVariables(m_currentRoute.getMatchMap());
  265. return yieldTo(&HttpProcessor::Coroutine::onRequestFormed);
  266. }
  267. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestFormed() {
  268. return m_currentRoute.getEndpoint()->handleAsync(m_currentRequest).callbackTo(&HttpProcessor::Coroutine::onResponse);
  269. }
  270. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std::shared_ptr<protocol::http::outgoing::Response>& response) {
  271. m_currentResponse = response;
  272. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  273. }
  274. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
  275. for(auto& interceptor : m_components->responseInterceptors) {
  276. m_currentResponse = interceptor->intercept(m_currentRequest, m_currentResponse);
  277. if(!m_currentResponse) {
  278. m_currentResponse = m_components->errorHandler->handleError(
  279. protocol::http::Status::CODE_500,
  280. "Response Interceptor returned an Invalid Response - 'null'"
  281. );
  282. }
  283. }
  284. m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
  285. oatpp::web::protocol::http::utils::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse, m_connectionState);
  286. switch(m_connectionState) {
  287. case ConnectionState::ALIVE :
  288. m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
  289. break;
  290. case ConnectionState::CLOSING:
  291. case ConnectionState::DEAD:
  292. m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE);
  293. break;
  294. default:
  295. break;
  296. }
  297. auto contentEncoderProvider =
  298. protocol::http::utils::CommunicationUtils::selectEncoder(m_currentRequest, m_components->contentEncodingProviders);
  299. return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection.object, m_headersOutBuffer, contentEncoderProvider)
  300. .next(yieldTo(&HttpProcessor::Coroutine::onRequestDone));
  301. }
  302. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() {
  303. switch (m_connectionState) {
  304. case ConnectionState::ALIVE:
  305. return yieldTo(&HttpProcessor::Coroutine::parseHeaders);
  306. /* Delegate connection handling to another handler only after the response is sent to the client */
  307. case ConnectionState::DELEGATED: {
  308. auto handler = m_currentResponse->getConnectionUpgradeHandler();
  309. if(handler) {
  310. handler->handleConnection(m_connection, m_currentResponse->getConnectionUpgradeParameters());
  311. m_connectionState = ConnectionState::DELEGATED;
  312. } else {
  313. OATPP_LOGW("[oatpp::web::server::HttpProcessor::Coroutine::onResponseFormed()]", "Warning. ConnectionUpgradeHandler not set!");
  314. m_connectionState = ConnectionState::CLOSING;
  315. }
  316. break;
  317. }
  318. default:
  319. break;
  320. }
  321. return finish();
  322. }
  323. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(Error* error) {
  324. if(error) {
  325. if(error->is<oatpp::AsyncIOError>()) {
  326. auto aioe = static_cast<oatpp::AsyncIOError*>(error);
  327. if(aioe->getCode() == oatpp::IOError::BROKEN_PIPE) {
  328. return aioe; // do not report BROKEN_PIPE error
  329. }
  330. }
  331. if(m_currentResponse) {
  332. //OATPP_LOGE("[oatpp::web::server::HttpProcessor::Coroutine::handleError()]", "Unhandled error. '%s'. Dropping connection", error->what());
  333. return error;
  334. }
  335. m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_500, error->what());
  336. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  337. }
  338. return error;
  339. }
  340. }}}