Bläddra i källkod

Merge pull request #166 from oatpp/thread_safe_LazyStringMap

LazyStringMap. Synchronize for thread-safety.
Leonid Stryzhevskyi 4 år sedan
förälder
incheckning
857315c01e

+ 90 - 8
src/oatpp/core/data/share/LazyStringMap.hpp

@@ -26,6 +26,7 @@
 #define oatpp_data_share_LazyStringMap_hpp
 
 #include "./MemoryLabel.hpp"
+#include "oatpp/core/concurrency/SpinLock.hpp"
 #include <unordered_map>
 
 namespace oatpp { namespace data { namespace share {
@@ -39,6 +40,7 @@ namespace oatpp { namespace data { namespace share {
 template<class Key>
 class LazyStringMap {
 private:
+  mutable concurrency::SpinLock m_lock;
   mutable bool m_fullyInitialized;
   std::unordered_map<Key, StringKeyLabel> m_map;
 public:
@@ -51,26 +53,61 @@ public:
   {}
 
   /**
-   * Default copy-constructor.
+   * Copy-constructor.
    * @param other
    */
-  LazyStringMap(const LazyStringMap& other) = default;
+  LazyStringMap(const LazyStringMap& other) {
+
+    std::lock_guard<concurrency::SpinLock> otherLock(other.m_lock);
+
+    m_fullyInitialized = other.m_fullyInitialized;
+    m_map = std::unordered_map<Key, StringKeyLabel>(other.m_map);
+
+  }
 
   /**
    * Move constructor.
    * @param other
    */
-  LazyStringMap(LazyStringMap&& other)
-    : m_fullyInitialized(other.m_fullyInitialized)
-    , m_map(std::move(other.m_map))
-  {}
+  LazyStringMap(LazyStringMap&& other) {
 
-  LazyStringMap& operator = (LazyStringMap& other) = default;
+    std::lock_guard<concurrency::SpinLock> otherLock(other.m_lock);
 
-  LazyStringMap& operator = (LazyStringMap&& other){
     m_fullyInitialized = other.m_fullyInitialized;
     m_map = std::move(other.m_map);
+
+  }
+
+  LazyStringMap& operator = (const LazyStringMap& other) {
+
+    if(this != &other) {
+
+      std::lock_guard<concurrency::SpinLock> thisLock(m_lock);
+      std::lock_guard<concurrency::SpinLock> otherLock(other.m_lock);
+
+      m_fullyInitialized = other.m_fullyInitialized;
+      m_map = std::unordered_map<Key, StringKeyLabel>(other.m_map);
+
+    }
+
     return *this;
+
+  }
+
+  LazyStringMap& operator = (LazyStringMap&& other) {
+
+    if(this != &other) {
+
+      std::lock_guard<concurrency::SpinLock> thisLock(m_lock);
+      std::lock_guard<concurrency::SpinLock> otherLock(other.m_lock);
+
+      m_fullyInitialized = other.m_fullyInitialized;
+      m_map = std::move(other.m_map);
+
+    }
+
+    return *this;
+
   }
 
   /**
@@ -79,6 +116,20 @@ public:
    * @param value
    */
   void put(const Key& key, const StringKeyLabel& value) {
+
+    std::lock_guard<concurrency::SpinLock> lock(m_lock);
+
+    m_map.insert({key, value});
+    m_fullyInitialized = false;
+
+  }
+
+  /**
+   * Put value to map. Not thread-safe.
+   * @param key
+   * @param value
+   */
+  void put_LockFree(const Key& key, const StringKeyLabel& value) {
     m_map.insert({key, value});
     m_fullyInitialized = false;
   }
@@ -91,6 +142,28 @@ public:
    */
   bool putIfNotExists(const Key& key, const StringKeyLabel& value) {
 
+    std::lock_guard<concurrency::SpinLock> lock(m_lock);
+
+    auto it = m_map.find(key);
+
+    if(it == m_map.end()) {
+      m_map.insert({key, value});
+      m_fullyInitialized = false;
+      return true;
+    }
+
+    return false;
+
+  }
+
+  /**
+   * Put value to map if not already exists. Not thread-safe.
+   * @param key
+   * @param value
+   * @return
+   */
+  bool putIfNotExists_LockFree(const Key& key, const StringKeyLabel& value) {
+
     auto it = m_map.find(key);
 
     if(it == m_map.end()) {
@@ -110,6 +183,8 @@ public:
    */
   oatpp::String get(const Key& key) const {
 
+    std::lock_guard<concurrency::SpinLock> lock(m_lock);
+
     auto it = m_map.find(key);
 
     if(it != m_map.end()) {
@@ -131,6 +206,8 @@ public:
   template<class T>
   T getAsMemoryLabel(const Key& key) const {
 
+    std::lock_guard<concurrency::SpinLock> lock(m_lock);
+
     auto it = m_map.find(key);
 
     if(it != m_map.end()) {
@@ -153,6 +230,8 @@ public:
   template<class T>
   T getAsMemoryLabel_Unsafe(const Key& key) const {
 
+    std::lock_guard<concurrency::SpinLock> lock(m_lock);
+
     auto it = m_map.find(key);
 
     if(it != m_map.end()) {
@@ -170,6 +249,8 @@ public:
    */
   const std::unordered_map<Key, StringKeyLabel>& getAll() const {
 
+    std::lock_guard<concurrency::SpinLock> lock(m_lock);
+
     if(!m_fullyInitialized) {
 
       for(auto& pair : m_map) {
@@ -197,6 +278,7 @@ public:
    * @return
    */
   v_int32 getSize() const {
+    std::lock_guard<concurrency::SpinLock> lock(m_lock);
     return (v_int32) m_map.size();
   }
 

+ 3 - 3
src/oatpp/network/Url.cpp

@@ -116,10 +116,10 @@ void Url::Parser::parseQueryParams(Url::Parameters& params, oatpp::parser::Caret
         caret.inc();
         auto valueLabel = caret.putLabel();
         caret.findChar('&');
-        params.put(StringKeyLabel(caret.getDataMemoryHandle(), nameLabel.getData(), nameLabel.getSize()),
-                   StringKeyLabel(caret.getDataMemoryHandle(), valueLabel.getData(), valueLabel.getSize()));
+        params.put_LockFree(StringKeyLabel(caret.getDataMemoryHandle(), nameLabel.getData(), nameLabel.getSize()),
+                            StringKeyLabel(caret.getDataMemoryHandle(), valueLabel.getData(), valueLabel.getSize()));
       } else {
-        params.put(StringKeyLabel(caret.getDataMemoryHandle(), nameLabel.getData(), nameLabel.getSize()), "");
+        params.put_LockFree(StringKeyLabel(caret.getDataMemoryHandle(), nameLabel.getData(), nameLabel.getSize()), "");
       }
     } while (caret.canContinueAtChar('&'));
 

+ 6 - 6
src/oatpp/network/server/SimpleTCPConnectionProvider.cpp

@@ -243,9 +243,9 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getE
     struct sockaddr_in* sockAddress = (struct sockaddr_in*) &clientAddress;
     inet_ntop(AF_INET, &sockAddress->sin_addr, strIp, INET_ADDRSTRLEN);
 
-    properties.put(ExtendedConnection::PROPERTY_PEER_ADDRESS, oatpp::String((const char*) strIp));
-    properties.put(ExtendedConnection::PROPERTY_PEER_ADDRESS_FORMAT, "ipv4");
-    properties.put(ExtendedConnection::PROPERTY_PEER_PORT, oatpp::utils::conversion::int32ToStr(sockAddress->sin_port));
+    properties.put_LockFree(ExtendedConnection::PROPERTY_PEER_ADDRESS, oatpp::String((const char*) strIp));
+    properties.put_LockFree(ExtendedConnection::PROPERTY_PEER_ADDRESS_FORMAT, "ipv4");
+    properties.put_LockFree(ExtendedConnection::PROPERTY_PEER_PORT, oatpp::utils::conversion::int32ToStr(sockAddress->sin_port));
 
   } else if (clientAddress.ss_family == AF_INET6) {
 
@@ -253,9 +253,9 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getE
     struct sockaddr_in6* sockAddress = (struct sockaddr_in6*) &clientAddress;
     inet_ntop(AF_INET6, &sockAddress->sin6_addr, strIp, INET6_ADDRSTRLEN);
 
-    properties.put(ExtendedConnection::PROPERTY_PEER_ADDRESS, oatpp::String((const char*) strIp));
-    properties.put(ExtendedConnection::PROPERTY_PEER_ADDRESS_FORMAT, "ipv6");
-    properties.put(ExtendedConnection::PROPERTY_PEER_PORT, oatpp::utils::conversion::int32ToStr(sockAddress->sin6_port));
+    properties.put_LockFree(ExtendedConnection::PROPERTY_PEER_ADDRESS, oatpp::String((const char*) strIp));
+    properties.put_LockFree(ExtendedConnection::PROPERTY_PEER_ADDRESS_FORMAT, "ipv6");
+    properties.put_LockFree(ExtendedConnection::PROPERTY_PEER_PORT, oatpp::utils::conversion::int32ToStr(sockAddress->sin6_port));
 
   } else {
 

+ 3 - 0
src/oatpp/parser/json/Beautifier.hpp

@@ -29,6 +29,9 @@
 
 namespace oatpp { namespace parser { namespace json {
 
+/**
+ * JSON output stream beautifier.
+ */
 class Beautifier : public oatpp::data::stream::ConsistentOutputStream {
 public:
   typedef oatpp::data::stream::ConsistentOutputStream ConsistentOutputStream;

+ 1 - 1
src/oatpp/web/client/ApiClient.cpp

@@ -118,7 +118,7 @@ oatpp::web::protocol::http::Headers ApiClient::convertParamsMap(const std::share
     auto curr = params->getFirstEntry();
     
     while (curr != nullptr) {
-      result.put(curr->getKey(), oatpp::utils::conversion::primitiveToStr(curr->getValue()));
+      result.put_LockFree(curr->getKey(), oatpp::utils::conversion::primitiveToStr(curr->getValue()));
       curr = curr->getNext();
     }
   }

+ 1 - 1
src/oatpp/web/protocol/http/Http.cpp

@@ -346,7 +346,7 @@ void Parser::parseOneHeader(Headers& headers,
     caret.skipChar(' ');
     v_buff_size valuePos0 = caret.getPosition();
     caret.findRN();
-    headers.put(name, oatpp::data::share::StringKeyLabel(headersText, &caret.getData()[valuePos0], caret.getPosition() - valuePos0));
+    headers.put_LockFree(name, oatpp::data::share::StringKeyLabel(headersText, &caret.getData()[valuePos0], caret.getPosition() - valuePos0));
     caret.skipRN();
   } else {
     error = Status::CODE_431;

+ 1 - 1
src/oatpp/web/protocol/http/outgoing/BufferBody.cpp

@@ -48,7 +48,7 @@ std::shared_ptr<BufferBody> BufferBody::createShared(const oatpp::String& buffer
 }
 
 void BufferBody::declareHeaders(Headers& headers) noexcept {
-  headers.put(oatpp::web::protocol::http::Header::CONTENT_LENGTH, oatpp::utils::conversion::int64ToStr(m_buffer->getSize()));
+  headers.put_LockFree(oatpp::web::protocol::http::Header::CONTENT_LENGTH, oatpp::utils::conversion::int64ToStr(m_buffer->getSize()));
 }
 
 void BufferBody::writeToStream(OutputStream* stream) noexcept {

+ 1 - 2
src/oatpp/web/protocol/http/outgoing/ChunkedBody.cpp

@@ -47,10 +47,9 @@ bool ChunkedBody::writeData(OutputStream* stream, const void* data, v_buff_size
 }
 
 void ChunkedBody::declareHeaders(Headers& headers) noexcept {
-  headers.put(oatpp::web::protocol::http::Header::TRANSFER_ENCODING, oatpp::web::protocol::http::Header::Value::TRANSFER_ENCODING_CHUNKED);
+  headers.put_LockFree(oatpp::web::protocol::http::Header::TRANSFER_ENCODING, oatpp::web::protocol::http::Header::Value::TRANSFER_ENCODING_CHUNKED);
 }
 
-
 void ChunkedBody::writeToStream(OutputStream* stream) noexcept {
 
   if(stream->getOutputStreamIOMode() != oatpp::data::stream::IOMode::BLOCKING) {

+ 1 - 1
src/oatpp/web/protocol/http/outgoing/ChunkedBufferBody.cpp

@@ -35,7 +35,7 @@ std::shared_ptr<ChunkedBufferBody> ChunkedBufferBody::createShared(const std::sh
 }
 
 void ChunkedBufferBody::declareHeaders(Headers& headers) noexcept {
-  headers.put(oatpp::web::protocol::http::Header::CONTENT_LENGTH, oatpp::utils::conversion::int64ToStr(m_buffer->getSize()));
+  headers.put_LockFree(oatpp::web::protocol::http::Header::CONTENT_LENGTH, oatpp::utils::conversion::int64ToStr(m_buffer->getSize()));
 }
 
 void ChunkedBufferBody::writeToStream(OutputStream* stream) noexcept {

+ 1 - 1
src/oatpp/web/protocol/http/outgoing/DtoBody.cpp

@@ -43,7 +43,7 @@ void DtoBody::declareHeaders(Headers& headers) noexcept {
     m_objectMapper->write(m_buffer, m_dto);
   }
   ChunkedBufferBody::declareHeaders(headers);
-  headers.putIfNotExists(Header::CONTENT_TYPE, m_objectMapper->getInfo().http_content_type);
+  headers.putIfNotExists_LockFree(Header::CONTENT_TYPE, m_objectMapper->getInfo().http_content_type);
 }
 
 v_buff_size DtoBody::getKnownSize() {

+ 2 - 2
src/oatpp/web/protocol/http/outgoing/MultipartBody.cpp

@@ -306,11 +306,11 @@ MultipartBody::MultipartBody(const std::shared_ptr<Multipart>& multipart, data::
 
 void MultipartBody::declareHeaders(Headers& headers) noexcept {
   if(m_multipart->getAllParts().empty()) {
-    headers.put(oatpp::web::protocol::http::Header::CONTENT_LENGTH, "0");
+    headers.put_LockFree(oatpp::web::protocol::http::Header::CONTENT_LENGTH, "0");
     return;
   }
   ChunkedBody::declareHeaders(headers);
-  headers.put(oatpp::web::protocol::http::Header::CONTENT_TYPE, "multipart/form-data; boundary=" + m_multipart->getBoundary());
+  headers.put_LockFree(oatpp::web::protocol::http::Header::CONTENT_TYPE, "multipart/form-data; boundary=" + m_multipart->getBoundary());
 }
 
 void MultipartBody::writeToStream(OutputStream* stream) noexcept {

+ 2 - 2
src/oatpp/web/protocol/http/outgoing/Request.cpp

@@ -73,7 +73,7 @@ void Request::send(data::stream::OutputStream* stream){
   if(m_body){
     m_body->declareHeaders(m_headers);
   } else {
-    m_headers.put(Header::CONTENT_LENGTH, "0");
+    m_headers.put_LockFree(Header::CONTENT_LENGTH, "0");
   }
 
   oatpp::data::stream::BufferOutputStream buffer;
@@ -130,7 +130,7 @@ oatpp::async::CoroutineStarter Request::sendAsync(std::shared_ptr<Request> _this
       if(m_this->m_body){
         m_this->m_body->declareHeaders(m_this->m_headers);
       } else {
-        m_this->m_headers.put(Header::CONTENT_LENGTH, "0");
+        m_this->m_headers.put_LockFree(Header::CONTENT_LENGTH, "0");
       }
       
       m_headersWriteBuffer->write(m_this->m_method.getData(), m_this->m_method.getSize());

+ 2 - 2
src/oatpp/web/protocol/http/outgoing/Response.cpp

@@ -78,7 +78,7 @@ void Response::send(data::stream::OutputStream* stream, oatpp::data::stream::Buf
   if(m_body){
     m_body->declareHeaders(m_headers);
   } else {
-    m_headers.put(Header::CONTENT_LENGTH, "0");
+    m_headers.put_LockFree(Header::CONTENT_LENGTH, "0");
   }
 
   headersWriteBuffer->setCurrentPosition(0);
@@ -136,7 +136,7 @@ oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptr<Respons
       if(m_this->m_body){
         m_this->m_body->declareHeaders(m_this->m_headers);
       } else {
-        m_this->m_headers.put(Header::CONTENT_LENGTH, "0");
+        m_this->m_headers.put_LockFree(Header::CONTENT_LENGTH, "0");
       }
 
       m_headersWriteBuffer->setCurrentPosition(0);

+ 1 - 1
src/oatpp/web/server/handler/AuthorizationHandler.cpp

@@ -42,7 +42,7 @@ void AuthorizationHandler::renderAuthenticateHeaderValue(ChunkedBuffer& stream)
 void AuthorizationHandler::addErrorResponseHeaders(Headers& headers) {
   ChunkedBuffer stream;
   renderAuthenticateHeaderValue(stream);
-  headers.put(protocol::http::Header::WWW_AUTHENTICATE, stream.toString());
+  headers.put_LockFree(protocol::http::Header::WWW_AUTHENTICATE, stream.toString());
 }
 
 oatpp::String AuthorizationHandler::getScheme() {