HttpProcessor.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. /***************************************************************************
  2. *
  3. * Project _____ __ ____ _ _
  4. * ( _ ) /__\ (_ _)_| |_ _| |_
  5. * )(_)( /(__)\ )( (_ _)(_ _)
  6. * (_____)(__)(__)(__) |_| |_|
  7. *
  8. *
  9. * Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. *
  23. ***************************************************************************/
  24. #include "HttpProcessor.hpp"
  25. #include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
  26. #include "oatpp/core/data/stream/BufferStream.hpp"
  27. namespace oatpp { namespace web { namespace server {
  28. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  29. // Components
  30. HttpProcessor::Components::Components(const std::shared_ptr<HttpRouter>& pRouter,
  31. const std::shared_ptr<protocol::http::encoding::ProviderCollection>& pContentEncodingProviders,
  32. const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& pBodyDecoder,
  33. const std::shared_ptr<handler::ErrorHandler>& pErrorHandler,
  34. const RequestInterceptors& pRequestInterceptors,
  35. const ResponseInterceptors& pResponseInterceptors,
  36. const std::shared_ptr<Config>& pConfig)
  37. : router(pRouter)
  38. , contentEncodingProviders(pContentEncodingProviders)
  39. , bodyDecoder(pBodyDecoder)
  40. , errorHandler(pErrorHandler)
  41. , requestInterceptors(pRequestInterceptors)
  42. , responseInterceptors(pResponseInterceptors)
  43. , config(pConfig)
  44. {}
  45. HttpProcessor::Components::Components(const std::shared_ptr<HttpRouter>& pRouter)
  46. : Components(pRouter,
  47. nullptr,
  48. std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>(),
  49. handler::DefaultErrorHandler::createShared(),
  50. {},
  51. {},
  52. std::make_shared<Config>())
  53. {}
  54. HttpProcessor::Components::Components(const std::shared_ptr<HttpRouter>& pRouter, const std::shared_ptr<Config>& pConfig)
  55. : Components(pRouter,
  56. nullptr,
  57. std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>(),
  58. handler::DefaultErrorHandler::createShared(),
  59. {},
  60. {},
  61. pConfig)
  62. {}
  63. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  64. // Other
  65. HttpProcessor::ProcessingResources::ProcessingResources(const std::shared_ptr<Components>& pComponents,
  66. const provider::ResourceHandle<oatpp::data::stream::IOStream>& pConnection)
  67. : components(pComponents)
  68. , connection(pConnection)
  69. , headersInBuffer(components->config->headersInBufferInitial)
  70. , headersOutBuffer(components->config->headersOutBufferInitial)
  71. , headersReader(&headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
  72. , inStream(data::stream::InputStreamBufferedProxy::createShared(connection.object, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
  73. {}
  74. std::shared_ptr<protocol::http::outgoing::Response>
  75. HttpProcessor::processNextRequest(ProcessingResources& resources,
  76. const std::shared_ptr<protocol::http::incoming::Request>& request,
  77. ConnectionState& connectionState)
  78. {
  79. std::shared_ptr<protocol::http::outgoing::Response> response;
  80. try{
  81. for(auto& interceptor : resources.components->requestInterceptors) {
  82. response = interceptor->intercept(request);
  83. if(response) {
  84. return response;
  85. }
  86. }
  87. auto route = resources.components->router->getRoute(request->getStartingLine().method, request->getStartingLine().path);
  88. if(!route) {
  89. data::stream::BufferOutputStream ss;
  90. ss << "No mapping for HTTP-method: '" << request->getStartingLine().method.toString()
  91. << "', URL: '" << request->getStartingLine().path.toString() << "'";
  92. connectionState = ConnectionState::CLOSING;
  93. oatpp::web::protocol::http::HttpError error(protocol::http::Status::CODE_404, ss.toString());
  94. auto ptr = std::make_exception_ptr(error);
  95. return resources.components->errorHandler->handleError(ptr);
  96. }
  97. request->setPathVariables(route.getMatchMap());
  98. return route.getEndpoint()->handle(request);
  99. } catch (...) {
  100. response = resources.components->errorHandler->handleError(std::current_exception());
  101. connectionState = ConnectionState::CLOSING;
  102. }
  103. return response;
  104. }
  105. HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResources& resources) {
  106. oatpp::web::protocol::http::HttpError::Info error;
  107. auto headersReadResult = resources.headersReader.readHeaders(resources.inStream.get(), error);
  108. if(error.ioStatus <= 0) {
  109. return ConnectionState::DEAD;
  110. }
  111. ConnectionState connectionState = ConnectionState::ALIVE;
  112. std::shared_ptr<protocol::http::incoming::Request> request;
  113. std::shared_ptr<protocol::http::outgoing::Response> response;
  114. if(error.status.code != 0) {
  115. oatpp::web::protocol::http::HttpError httpError(error.status, "Invalid Request Headers");
  116. auto eptr = std::make_exception_ptr(httpError);
  117. response = resources.components->errorHandler->handleError(eptr);
  118. connectionState = ConnectionState::CLOSING;
  119. } else {
  120. request = protocol::http::incoming::Request::createShared(resources.connection.object,
  121. headersReadResult.startingLine,
  122. headersReadResult.headers,
  123. resources.inStream,
  124. resources.components->bodyDecoder);
  125. response = processNextRequest(resources, request, connectionState);
  126. try {
  127. for (auto& interceptor : resources.components->responseInterceptors) {
  128. response = interceptor->intercept(request, response);
  129. if (!response) {
  130. oatpp::web::protocol::http::HttpError httpError(protocol::http::Status::CODE_500, "Response Interceptor returned an Invalid Response - 'null'");
  131. auto eptr = std::make_exception_ptr(httpError);
  132. response = resources.components->errorHandler->handleError(eptr);
  133. connectionState = ConnectionState::CLOSING;
  134. }
  135. }
  136. } catch (...) {
  137. response = resources.components->errorHandler->handleError(std::current_exception());
  138. connectionState = ConnectionState::CLOSING;
  139. }
  140. response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
  141. protocol::http::utils::CommunicationUtils::considerConnectionState(request, response, connectionState);
  142. switch(connectionState) {
  143. case ConnectionState::ALIVE :
  144. response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
  145. break;
  146. case ConnectionState::CLOSING:
  147. case ConnectionState::DEAD:
  148. response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE);
  149. break;
  150. default:
  151. break;
  152. }
  153. }
  154. auto contentEncoderProvider =
  155. protocol::http::utils::CommunicationUtils::selectEncoder(request, resources.components->contentEncodingProviders);
  156. response->send(resources.connection.object.get(), &resources.headersOutBuffer, contentEncoderProvider.get());
  157. /* Delegate connection handling to another handler only after the response is sent to the client */
  158. if(connectionState == ConnectionState::DELEGATED) {
  159. auto handler = response->getConnectionUpgradeHandler();
  160. if(handler) {
  161. handler->handleConnection(resources.connection, response->getConnectionUpgradeParameters());
  162. connectionState = ConnectionState::DELEGATED;
  163. } else {
  164. OATPP_LOGW("[oatpp::web::server::HttpProcessor::processNextRequest()]", "Warning. ConnectionUpgradeHandler not set!");
  165. connectionState = ConnectionState::CLOSING;
  166. }
  167. }
  168. return connectionState;
  169. }
  170. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  171. // Task
  172. HttpProcessor::Task::Task(const std::shared_ptr<Components>& components,
  173. const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
  174. TaskProcessingListener* taskListener)
  175. : m_components(components)
  176. , m_connection(connection)
  177. , m_taskListener(taskListener)
  178. {
  179. m_taskListener->onTaskStart(m_connection);
  180. }
  181. HttpProcessor::Task::Task(HttpProcessor::Task &&other)
  182. : m_components(std::move(other.m_components))
  183. , m_connection(std::move(other.m_connection))
  184. , m_taskListener(other.m_taskListener)
  185. {
  186. other.m_taskListener = nullptr;
  187. }
  188. HttpProcessor::Task::~Task() {
  189. if (m_taskListener != nullptr) {
  190. m_taskListener->onTaskEnd(m_connection);
  191. }
  192. }
  193. HttpProcessor::Task &HttpProcessor::Task::operator=(HttpProcessor::Task &&other) {
  194. m_components = std::move(other.m_components);
  195. m_connection = std::move(other.m_connection);
  196. m_taskListener = other.m_taskListener;
  197. other.m_taskListener = nullptr;
  198. return *this;
  199. }
  200. void HttpProcessor::Task::run(){
  201. m_connection.object->initContexts();
  202. ProcessingResources resources(m_components, m_connection);
  203. ConnectionState connectionState;
  204. try {
  205. do {
  206. connectionState = HttpProcessor::processNextRequest(resources);
  207. } while (connectionState == ConnectionState::ALIVE);
  208. } catch (...) {
  209. // DO NOTHING
  210. }
  211. }
  212. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  213. // HttpProcessor::Coroutine
  214. HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& components,
  215. const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
  216. TaskProcessingListener* taskListener)
  217. : m_components(components)
  218. , m_connection(connection)
  219. , m_headersInBuffer(components->config->headersInBufferInitial)
  220. , m_headersReader(&m_headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
  221. , m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial))
  222. , m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection.object, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
  223. , m_connectionState(ConnectionState::ALIVE)
  224. , m_taskListener(taskListener)
  225. {
  226. m_taskListener->onTaskStart(m_connection);
  227. }
  228. HttpProcessor::Coroutine::~Coroutine() {
  229. m_taskListener->onTaskEnd(m_connection);
  230. }
  231. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
  232. return m_connection.object->initContextsAsync().next(yieldTo(&HttpProcessor::Coroutine::parseHeaders));
  233. }
  234. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseHeaders() {
  235. return m_headersReader.readHeadersAsync(m_inStream).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed);
  236. }
  237. oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHeadersReader::Result& headersReadResult) {
  238. m_currentRequest = protocol::http::incoming::Request::createShared(m_connection.object,
  239. headersReadResult.startingLine,
  240. headersReadResult.headers,
  241. m_inStream,
  242. m_components->bodyDecoder);
  243. for(auto& interceptor : m_components->requestInterceptors) {
  244. m_currentResponse = interceptor->intercept(m_currentRequest);
  245. if(m_currentResponse) {
  246. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  247. }
  248. }
  249. m_currentRoute = m_components->router->getRoute(headersReadResult.startingLine.method.toString(), headersReadResult.startingLine.path.toString());
  250. if(!m_currentRoute) {
  251. data::stream::BufferOutputStream ss;
  252. ss << "No mapping for HTTP-method: '" << headersReadResult.startingLine.method.toString()
  253. << "', URL: '" << headersReadResult.startingLine.path.toString() << "'";
  254. oatpp::web::protocol::http::HttpError error(protocol::http::Status::CODE_404, ss.toString());
  255. auto eptr = std::make_exception_ptr(error);
  256. m_currentResponse = m_components->errorHandler->handleError(eptr);
  257. m_connectionState = ConnectionState::CLOSING;
  258. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  259. }
  260. m_currentRequest->setPathVariables(m_currentRoute.getMatchMap());
  261. return yieldTo(&HttpProcessor::Coroutine::onRequestFormed);
  262. }
  263. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestFormed() {
  264. return m_currentRoute.getEndpoint()->handleAsync(m_currentRequest).callbackTo(&HttpProcessor::Coroutine::onResponse);
  265. }
  266. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std::shared_ptr<protocol::http::outgoing::Response>& response) {
  267. m_currentResponse = response;
  268. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  269. }
  270. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
  271. for(auto& interceptor : m_components->responseInterceptors) {
  272. m_currentResponse = interceptor->intercept(m_currentRequest, m_currentResponse);
  273. if(!m_currentResponse) {
  274. oatpp::web::protocol::http::HttpError error(protocol::http::Status::CODE_500, "Response Interceptor returned an Invalid Response - 'null'");
  275. auto eptr = std::make_exception_ptr(error);
  276. m_currentResponse = m_components->errorHandler->handleError(eptr);
  277. }
  278. }
  279. m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
  280. oatpp::web::protocol::http::utils::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse, m_connectionState);
  281. switch(m_connectionState) {
  282. case ConnectionState::ALIVE :
  283. m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
  284. break;
  285. case ConnectionState::CLOSING:
  286. case ConnectionState::DEAD:
  287. m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE);
  288. break;
  289. default:
  290. break;
  291. }
  292. auto contentEncoderProvider =
  293. protocol::http::utils::CommunicationUtils::selectEncoder(m_currentRequest, m_components->contentEncodingProviders);
  294. return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection.object, m_headersOutBuffer, contentEncoderProvider)
  295. .next(yieldTo(&HttpProcessor::Coroutine::onRequestDone));
  296. }
  297. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() {
  298. switch (m_connectionState) {
  299. case ConnectionState::ALIVE:
  300. return yieldTo(&HttpProcessor::Coroutine::parseHeaders);
  301. /* Delegate connection handling to another handler only after the response is sent to the client */
  302. case ConnectionState::DELEGATED: {
  303. auto handler = m_currentResponse->getConnectionUpgradeHandler();
  304. if(handler) {
  305. handler->handleConnection(m_connection, m_currentResponse->getConnectionUpgradeParameters());
  306. m_connectionState = ConnectionState::DELEGATED;
  307. } else {
  308. OATPP_LOGW("[oatpp::web::server::HttpProcessor::Coroutine::onResponseFormed()]", "Warning. ConnectionUpgradeHandler not set!");
  309. m_connectionState = ConnectionState::CLOSING;
  310. }
  311. break;
  312. }
  313. default:
  314. break;
  315. }
  316. return finish();
  317. }
  318. HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(Error* error) {
  319. if(error) {
  320. if(error->is<oatpp::AsyncIOError>()) {
  321. auto aioe = static_cast<oatpp::AsyncIOError*>(error);
  322. if(aioe->getCode() == oatpp::IOError::BROKEN_PIPE) {
  323. return aioe; // do not report BROKEN_PIPE error
  324. }
  325. }
  326. if(m_currentResponse) {
  327. //OATPP_LOGE("[oatpp::web::server::HttpProcessor::Coroutine::handleError()]", "Unhandled error. '%s'. Dropping connection", error->what());
  328. return error;
  329. }
  330. oatpp::web::protocol::http::HttpError httpError(protocol::http::Status::CODE_500, error->what());
  331. auto eptr = std::make_exception_ptr(httpError);
  332. m_currentResponse = m_components->errorHandler->handleError(eptr);
  333. return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
  334. }
  335. return error;
  336. }
  337. }}}