Explorar el Código

Merge pull request #514 from oatpp/move_fastqueue

async: move FastQueue to async::utils::FastQueue.
Leonid Stryzhevskyi hace 2 años
padre
commit
ba27a57fff

+ 1 - 1
src/CMakeLists.txt

@@ -26,6 +26,7 @@ add_library(oatpp
         oatpp/codegen/DTO_define.hpp
         oatpp/codegen/DTO_undef.hpp
         oatpp/core/Types.hpp
+        oatpp/core/async/utils/FastQueue.hpp
         oatpp/core/async/Coroutine.cpp
         oatpp/core/async/Coroutine.hpp
         oatpp/core/async/CoroutineWaitList.cpp
@@ -63,7 +64,6 @@ add_library(oatpp
         oatpp/core/base/memory/MemoryPool.hpp
         oatpp/core/base/memory/ObjectPool.cpp
         oatpp/core/base/memory/ObjectPool.hpp
-        oatpp/core/collection/FastQueue.hpp
         oatpp/core/concurrency/SpinLock.cpp
         oatpp/core/concurrency/SpinLock.hpp
         oatpp/core/concurrency/Thread.cpp

+ 3 - 3
src/oatpp/core/async/Coroutine.hpp

@@ -28,9 +28,9 @@
 
 #include "./Error.hpp"
 
-#include "oatpp/core/IODefinitions.hpp"
+#include "oatpp/core/async/utils/FastQueue.hpp"
 
-#include "oatpp/core/collection/FastQueue.hpp"
+#include "oatpp/core/IODefinitions.hpp"
 #include "oatpp/core/base/memory/MemoryPool.hpp"
 #include "oatpp/core/base/Environment.hpp"
 
@@ -420,7 +420,7 @@ public:
  * This class manages coroutines processing state and a chain of coroutine calls.
  */
 class CoroutineHandle : public oatpp::base::Countable {
-  friend oatpp::collection::FastQueue<CoroutineHandle>;
+  friend utils::FastQueue<CoroutineHandle>;
   friend Processor;
   friend worker::Worker;
   friend CoroutineWaitList;

+ 2 - 2
src/oatpp/core/async/CoroutineWaitList.hpp

@@ -27,7 +27,7 @@
 #define oatpp_async_CoroutineWaitList_hpp
 
 #include "oatpp/core/async/Coroutine.hpp"
-#include "oatpp/core/collection/FastQueue.hpp"
+#include "oatpp/core/async/utils/FastQueue.hpp"
 
 #include "oatpp/core/concurrency/SpinLock.hpp"
 #include <map>
@@ -60,7 +60,7 @@ public:
     virtual void onNewItem(CoroutineWaitList& list) = 0;
   };
 private:
-  oatpp::collection::FastQueue<CoroutineHandle> m_list;
+  utils::FastQueue<CoroutineHandle> m_list;
   oatpp::concurrency::SpinLock m_lock;
   Listener* m_listener = nullptr;
   

+ 1 - 1
src/oatpp/core/async/Executor.cpp

@@ -52,7 +52,7 @@ void Executor::SubmissionProcessor::run() {
   
 }
 
-void Executor::SubmissionProcessor::pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) {
+void Executor::SubmissionProcessor::pushTasks(utils::FastQueue<CoroutineHandle>& tasks) {
   (void)tasks;
   throw std::runtime_error("[oatpp::async::Executor::SubmissionProcessor::pushTasks]: Error. This method does nothing.");
 }

+ 1 - 1
src/oatpp/core/async/Executor.hpp

@@ -63,7 +63,7 @@ private:
 
     oatpp::async::Processor& getProcessor();
 
-    void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) override;
+    void pushTasks(utils::FastQueue<CoroutineHandle>& tasks) override;
 
     void pushOneTask(CoroutineHandle* task) override;
 

+ 6 - 6
src/oatpp/core/async/Processor.cpp

@@ -67,12 +67,12 @@ void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) {
 
     case worker::Worker::Type::IO:
       m_ioWorkers.push_back(worker);
-      m_ioPopQueues.push_back(collection::FastQueue<CoroutineHandle>());
+      m_ioPopQueues.push_back(utils::FastQueue<CoroutineHandle>());
     break;
 
     case worker::Worker::Type::TIMER:
       m_timerWorkers.push_back(worker);
-      m_timerPopQueues.push_back(collection::FastQueue<CoroutineHandle>());
+      m_timerPopQueues.push_back(utils::FastQueue<CoroutineHandle>());
     break;
 
     default:
@@ -154,10 +154,10 @@ void Processor::pushOneTask(CoroutineHandle* coroutine) {
   m_taskCondition.notify_one();
 }
 
-void Processor::pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) {
+void Processor::pushTasks(utils::FastQueue<CoroutineHandle>& tasks) {
   {
     std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
-    collection::FastQueue<CoroutineHandle>::moveAll(tasks, m_pushList);
+    utils::FastQueue<CoroutineHandle>::moveAll(tasks, m_pushList);
   }
   m_taskCondition.notify_one();
 }
@@ -196,12 +196,12 @@ void Processor::consumeAllTasks() {
 
 void Processor::pushQueues() {
 
-  oatpp::collection::FastQueue<CoroutineHandle> tmpList;
+  utils::FastQueue<CoroutineHandle> tmpList;
 
   {
     std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
     consumeAllTasks();
-    oatpp::collection::FastQueue<CoroutineHandle>::moveAll(m_pushList, tmpList);
+    utils::FastQueue<CoroutineHandle>::moveAll(m_pushList, tmpList);
   }
 
   while(tmpList.first != nullptr) {

+ 7 - 7
src/oatpp/core/async/Processor.hpp

@@ -28,7 +28,7 @@
 
 #include "./Coroutine.hpp"
 #include "./CoroutineWaitList.hpp"
-#include "oatpp/core/collection/FastQueue.hpp"
+#include "oatpp/core/async/utils/FastQueue.hpp"
 
 #include <condition_variable>
 #include <list>
@@ -93,8 +93,8 @@ private:
   std::vector<std::shared_ptr<worker::Worker>> m_ioWorkers;
   std::vector<std::shared_ptr<worker::Worker>> m_timerWorkers;
 
-  std::vector<oatpp::collection::FastQueue<CoroutineHandle>> m_ioPopQueues;
-  std::vector<oatpp::collection::FastQueue<CoroutineHandle>> m_timerPopQueues;
+  std::vector<utils::FastQueue<CoroutineHandle>> m_ioPopQueues;
+  std::vector<utils::FastQueue<CoroutineHandle>> m_timerPopQueues;
 
   v_uint32 m_ioBalancer = 0;
   v_uint32 m_timerBalancer = 0;
@@ -104,11 +104,11 @@ private:
   oatpp::concurrency::SpinLock m_taskLock;
   std::condition_variable_any m_taskCondition;
   std::list<std::shared_ptr<TaskSubmission>> m_taskList;
-  oatpp::collection::FastQueue<CoroutineHandle> m_pushList;
+  utils::FastQueue<CoroutineHandle> m_pushList;
 
 private:
 
-  oatpp::collection::FastQueue<CoroutineHandle> m_queue;
+  utils::FastQueue<CoroutineHandle> m_queue;
 
 private:
 
@@ -154,9 +154,9 @@ public:
 
   /**
    * Push list of Coroutines back to processor.
-   * @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::CoroutineHandle; previously popped-out(rescheduled to coworker) from this processor.
+   * @param tasks - &id:oatpp::async::utils::FastQueue; of &id:oatpp::async::CoroutineHandle; previously popped-out(rescheduled to coworker) from this processor.
    */
-  void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks);
+  void pushTasks(utils::FastQueue<CoroutineHandle>& tasks);
 
   /**
    * Execute Coroutine.

+ 5 - 5
src/oatpp/core/collection/FastQueue.hpp → src/oatpp/core/async/utils/FastQueue.hpp

@@ -22,13 +22,13 @@
  *
  ***************************************************************************/
 
-#ifndef oatpp_collection_FastQueue_hpp
-#define oatpp_collection_FastQueue_hpp
+#ifndef oatpp_async_utils_FastQueue_hpp
+#define oatpp_async_utils_FastQueue_hpp
 
 #include "oatpp/core/concurrency/SpinLock.hpp"
 #include "oatpp/core/base/Environment.hpp"
 
-namespace oatpp { namespace collection {
+namespace oatpp { namespace async { namespace utils {
   
 template<typename T>
 class FastQueue {
@@ -179,6 +179,6 @@ public:
   
 };
   
-}}
+}}}
 
-#endif /* FastQueue_hpp */
+#endif /* oatpp_async_utils_FastQueue_hpp */

+ 5 - 5
src/oatpp/core/async/worker/IOEventWorker.hpp

@@ -75,7 +75,7 @@ private:
   IOEventWorkerForeman* m_foreman;
   Action::IOEventType m_specialization;
   std::atomic<bool> m_running;
-  oatpp::collection::FastQueue<CoroutineHandle> m_backlog;
+  utils::FastQueue<CoroutineHandle> m_backlog;
   oatpp::concurrency::SpinLock m_backlogLock;
 private:
   oatpp::v_io_handle m_eventQueueHandle;
@@ -108,9 +108,9 @@ public:
 
   /**
    * Push list of tasks to worker.
-   * @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::CoroutineHandle;.
+   * @param tasks - &id:oatpp::async::utils::FastQueue; of &id:oatpp::async::CoroutineHandle;.
    */
-  void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) override;
+  void pushTasks(utils::FastQueue<CoroutineHandle>& tasks) override;
 
   /**
    * Push one task to worker.
@@ -162,9 +162,9 @@ public:
 
   /**
    * Push list of tasks to worker.
-   * @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::CoroutineHandle;.
+   * @param tasks - &id:oatpp::async::utils::FastQueue; of &id:oatpp::async::CoroutineHandle;.
    */
-  void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) override;
+  void pushTasks(utils::FastQueue<CoroutineHandle>& tasks) override;
 
   /**
    * Push one task to worker.

+ 5 - 5
src/oatpp/core/async/worker/IOEventWorker_common.cpp

@@ -64,11 +64,11 @@ IOEventWorker::~IOEventWorker() {
 }
 
 
-void IOEventWorker::pushTasks(oatpp::collection::FastQueue<CoroutineHandle> &tasks) {
+void IOEventWorker::pushTasks(utils::FastQueue<CoroutineHandle> &tasks) {
   if (tasks.first != nullptr) {
     {
       std::lock_guard<oatpp::concurrency::SpinLock> guard(m_backlogLock);
-      oatpp::collection::FastQueue<CoroutineHandle>::moveAll(tasks, m_backlog);
+      utils::FastQueue<CoroutineHandle>::moveAll(tasks, m_backlog);
     }
     triggerWakeup();
   }
@@ -121,10 +121,10 @@ IOEventWorkerForeman::IOEventWorkerForeman()
 IOEventWorkerForeman::~IOEventWorkerForeman() {
 }
 
-void IOEventWorkerForeman::pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) {
+void IOEventWorkerForeman::pushTasks(utils::FastQueue<CoroutineHandle>& tasks) {
 
-  oatpp::collection::FastQueue<CoroutineHandle> readerTasks;
-  oatpp::collection::FastQueue<CoroutineHandle> writerTasks;
+  utils::FastQueue<CoroutineHandle> readerTasks;
+  utils::FastQueue<CoroutineHandle> writerTasks;
 
   while(tasks.first != nullptr) {
 

+ 1 - 1
src/oatpp/core/async/worker/IOEventWorker_epoll.cpp

@@ -169,7 +169,7 @@ void IOEventWorker::waitEvents() {
     throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Event loop failed.");
   }
 
-  oatpp::collection::FastQueue<CoroutineHandle> popQueue;
+  utils::FastQueue<CoroutineHandle> popQueue;
 
   for(v_int32 i = 0; i < eventsCount; i ++) {
 

+ 3 - 3
src/oatpp/core/async/worker/IOEventWorker_kqueue.cpp

@@ -168,8 +168,8 @@ void IOEventWorker::waitEvents() {
     throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Event loop failed.");
   }
 
-  oatpp::collection::FastQueue<CoroutineHandle> repeatQueue;
-  oatpp::collection::FastQueue<CoroutineHandle> popQueue;
+  utils::FastQueue<CoroutineHandle> repeatQueue;
+  utils::FastQueue<CoroutineHandle> popQueue;
 
   for(v_int32 i = 0; i < eventsCount; i ++) {
 
@@ -230,7 +230,7 @@ void IOEventWorker::waitEvents() {
   if(repeatQueue.count > 0) {
     {
       std::lock_guard<oatpp::concurrency::SpinLock> lock(m_backlogLock);
-      oatpp::collection::FastQueue<CoroutineHandle>::moveAll(repeatQueue, m_backlog);
+      utils::FastQueue<CoroutineHandle>::moveAll(repeatQueue, m_backlog);
     }
   }
 

+ 4 - 4
src/oatpp/core/async/worker/IOWorker.cpp

@@ -37,10 +37,10 @@ IOWorker::IOWorker()
   m_thread = std::thread(&IOWorker::run, this);
 }
 
-void IOWorker::pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) {
+void IOWorker::pushTasks(utils::FastQueue<CoroutineHandle>& tasks) {
   {
     std::lock_guard<oatpp::concurrency::SpinLock> guard(m_backlogLock);
-    oatpp::collection::FastQueue<CoroutineHandle>::moveAll(tasks, m_backlog);
+    utils::FastQueue<CoroutineHandle>::moveAll(tasks, m_backlog);
   }
   m_backlogCondition.notify_one();
 }
@@ -61,12 +61,12 @@ void IOWorker::consumeBacklog(bool blockToConsume) {
     while (m_backlog.first == nullptr && m_running) {
       m_backlogCondition.wait(lock);
     }
-    oatpp::collection::FastQueue<CoroutineHandle>::moveAll(m_backlog, m_queue);
+    utils::FastQueue<CoroutineHandle>::moveAll(m_backlog, m_queue);
   } else {
 
     std::unique_lock<oatpp::concurrency::SpinLock> lock(m_backlogLock, std::try_to_lock);
     if (lock.owns_lock()) {
-      oatpp::collection::FastQueue<CoroutineHandle>::moveAll(m_backlog, m_queue);
+      utils::FastQueue<CoroutineHandle>::moveAll(m_backlog, m_queue);
     }
 
   }

+ 4 - 4
src/oatpp/core/async/worker/IOWorker.hpp

@@ -41,8 +41,8 @@ namespace oatpp { namespace async { namespace worker {
 class IOWorker : public Worker {
 private:
   bool m_running;
-  oatpp::collection::FastQueue<CoroutineHandle> m_backlog;
-  oatpp::collection::FastQueue<CoroutineHandle> m_queue;
+  utils::FastQueue<CoroutineHandle> m_backlog;
+  utils::FastQueue<CoroutineHandle> m_queue;
   oatpp::concurrency::SpinLock m_backlogLock;
   std::condition_variable_any m_backlogCondition;
 private:
@@ -58,9 +58,9 @@ public:
 
   /**
   * Push list of tasks to worker.
-  * @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::CoroutineHandle;.
+  * @param tasks - &id:oatpp::async::utils::FastQueue; of &id:oatpp::async::CoroutineHandle;.
   */
-  void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) override;
+  void pushTasks(utils::FastQueue<CoroutineHandle>& tasks) override;
 
   /**
   * Push one task to worker.

+ 3 - 3
src/oatpp/core/async/worker/TimerWorker.cpp

@@ -38,10 +38,10 @@ TimerWorker::TimerWorker(const std::chrono::duration<v_int64, std::micro>& granu
   m_thread = std::thread(&TimerWorker::run, this);
 }
 
-void TimerWorker::pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) {
+void TimerWorker::pushTasks(utils::FastQueue<CoroutineHandle>& tasks) {
   {
     std::lock_guard<oatpp::concurrency::SpinLock> guard(m_backlogLock);
-    oatpp::collection::FastQueue<CoroutineHandle>::moveAll(tasks, m_backlog);
+    utils::FastQueue<CoroutineHandle>::moveAll(tasks, m_backlog);
   }
   m_backlogCondition.notify_one();
 }
@@ -52,7 +52,7 @@ void TimerWorker::consumeBacklog() {
   while (m_backlog.first == nullptr && m_queue.first == nullptr && m_running) {
     m_backlogCondition.wait(lock);
   }
-  oatpp::collection::FastQueue<CoroutineHandle>::moveAll(m_backlog, m_queue);
+  utils::FastQueue<CoroutineHandle>::moveAll(m_backlog, m_queue);
 
 }
 

+ 4 - 4
src/oatpp/core/async/worker/TimerWorker.hpp

@@ -41,8 +41,8 @@ namespace oatpp { namespace async { namespace worker {
 class TimerWorker : public Worker {
 private:
   std::atomic<bool> m_running;
-  oatpp::collection::FastQueue<CoroutineHandle> m_backlog;
-  oatpp::collection::FastQueue<CoroutineHandle> m_queue;
+  utils::FastQueue<CoroutineHandle> m_backlog;
+  utils::FastQueue<CoroutineHandle> m_queue;
   oatpp::concurrency::SpinLock m_backlogLock;
   std::condition_variable_any m_backlogCondition;
 private:
@@ -61,9 +61,9 @@ public:
 
   /**
    * Push list of tasks to worker.
-   * @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::CoroutineHandle;.
+   * @param tasks - &id:oatpp::aysnc::utils::FastQueue; of &id:oatpp::async::CoroutineHandle;.
    */
-  void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) override;
+  void pushTasks(utils::FastQueue<CoroutineHandle>& tasks) override;
 
   /**
    * Push one task to worker.

+ 2 - 2
src/oatpp/core/async/worker/Worker.hpp

@@ -87,9 +87,9 @@ public:
 
   /**
    * Push list of tasks to worker.
-   * @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::CoroutineHandle;.
+   * @param tasks - &id:oatpp::async::utils::FastQueue; of &id:oatpp::async::CoroutineHandle;.
    */
-  virtual void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) = 0;
+  virtual void pushTasks(utils::FastQueue<CoroutineHandle>& tasks) = 0;
 
   /**
    * Push one task to worker.