瀏覽代碼

Introduce IOEventWorkerForeman to support full-duplex IO event mode

lganzzzo 5 年之前
父節點
當前提交
312fcae8a5

+ 4 - 0
src/oatpp/core/async/Coroutine.cpp

@@ -121,6 +121,10 @@ Action::IOEventType Action::getIOEventType() const {
   return m_data.ioData.ioEventType;
 }
 
+v_int32 Action::getIOEventCode() const {
+  return m_type | m_data.ioData.ioEventType;
+}
+
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 // CoroutineStarter
 

+ 44 - 2
src/oatpp/core/async/Coroutine.hpp

@@ -120,15 +120,51 @@ public:
     /**
      * IO event type READ.
      */
-    IO_EVENT_READ = 0,
+    IO_EVENT_READ = 256,
 
     /**
      * IO event type WRITE.
      */
-    IO_EVENT_WRITE = 1
+    IO_EVENT_WRITE = 512
 
   };
 
+  /**
+   * Convenience I/O Action Code.
+   * This code is applicable for Action of type TYPE_IO_WAIT only.
+   */
+  static constexpr const v_int32 CODE_IO_WAIT_READ = TYPE_IO_WAIT | IOEventType::IO_EVENT_READ;
+
+  /**
+   * Convenience I/O Action Code.
+   * This code is applicable for Action of type TYPE_IO_WAIT only.
+   */
+  static constexpr const v_int32 CODE_IO_WAIT_WRITE = TYPE_IO_WAIT | IOEventType::IO_EVENT_WRITE;
+
+  /**
+   * Convenience I/O Action Code.
+   * This code is applicable for Action of type TYPE_IO_WAIT only.
+   */
+  static constexpr const v_int32 CODE_IO_WAIT_RESCHEDULE = TYPE_IO_WAIT | IOEventType::IO_EVENT_READ | IOEventType::IO_EVENT_WRITE;
+
+  /**
+   * Convenience I/O Action Code.
+   * This code is applicable for Action of type TYPE_IO_REPEAT only.
+   */
+  static constexpr const v_int32 CODE_IO_REPEAT_READ = TYPE_IO_REPEAT | IOEventType::IO_EVENT_READ;
+
+  /**
+   * Convenience I/O Action Code.
+   * This code is applicable for Action of type TYPE_IO_REPEAT only.
+   */
+  static constexpr const v_int32 CODE_IO_REPEAT_WRITE = TYPE_IO_REPEAT | IOEventType::IO_EVENT_WRITE;
+
+  /**
+   * Convenience I/O Action Code.
+   * This code is applicable for Action of type TYPE_IO_REPEAT only.
+   */
+  static constexpr const v_int32 CODE_IO_REPEAT_RESCHEDULE = TYPE_IO_REPEAT | IOEventType::IO_EVENT_READ | IOEventType::IO_EVENT_WRITE;
+
 private:
 
   struct IOData {
@@ -267,6 +303,12 @@ public:
    */
   IOEventType getIOEventType() const;
 
+  /**
+   * Convenience method to get I/O Event code.
+   * @return - `getType() | getIOEventType()`.
+   */
+  v_int32 getIOEventCode() const;
+
   
 };
 

+ 1 - 1
src/oatpp/core/async/Executor.cpp

@@ -90,7 +90,7 @@ Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int3
 
   std::vector<std::shared_ptr<worker::Worker>> ioWorkers;
   for(v_int32 i = 0; i < ioWorkersCount; i++) {
-    ioWorkers.push_back(std::make_shared<worker::IOEventWorker>());
+    ioWorkers.push_back(std::make_shared<worker::IOEventWorkerForeman>());
   }
 
   linkWorkers(ioWorkers);

+ 54 - 1
src/oatpp/core/async/worker/IOEventWorker.hpp

@@ -54,6 +54,8 @@
 
 namespace oatpp { namespace async { namespace worker {
 
+class IOEventWorkerForeman; // FWD
+
 /**
  * Event-based implementation of I/O worker.
  * <ul>
@@ -65,6 +67,8 @@ class IOEventWorker : public Worker {
 private:
   static constexpr const v_int32 MAX_EVENTS = 10000;
 private:
+  IOEventWorkerForeman* m_foreman;
+  Action::IOEventType m_specialization;
   bool m_running;
   oatpp::collection::FastQueue<AbstractCoroutine> m_backlog;
   oatpp::concurrency::SpinLock m_backlogLock;
@@ -89,7 +93,7 @@ public:
   /**
    * Constructor.
    */
-  IOEventWorker();
+  IOEventWorker(IOEventWorkerForeman* foreman, Action::IOEventType specialization);
 
   /**
    * Virtual destructor.
@@ -130,6 +134,55 @@ public:
 
 };
 
+/**
+ * Class responsible to assign I/O tasks to specific &l:IOEventWorker; according to worker's "specialization". <br>
+ * Needed in order to support full-duplex I/O mode without duplicating file-descriptors.
+ */
+class IOEventWorkerForeman : public Worker {
+private:
+  IOEventWorker m_reader;
+  IOEventWorker m_writer;
+public:
+
+  /**
+   * Constructor.
+   */
+  IOEventWorkerForeman();
+
+  /**
+   * Virtual destructor.
+   */
+  ~IOEventWorkerForeman();
+
+  /**
+   * Push list of tasks to worker.
+   * @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::AbstractCoroutine;.
+   */
+  void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
+
+  /**
+   * Push one task to worker.
+   * @param task - &id:AbstractCoroutine;.
+   */
+  void pushOneTask(AbstractCoroutine* task) override;
+
+  /**
+ * Break run loop.
+ */
+  void stop() override;
+
+  /**
+   * Join all worker-threads.
+   */
+  void join() override;
+
+  /**
+   * Detach all worker-threads.
+   */
+  void detach() override;
+
+};
+
 }}}
 
 #endif //oatpp_async_worker_IOEventWorker_hpp

+ 91 - 1
src/oatpp/core/async/worker/IOEventWorker_common.cpp

@@ -28,8 +28,13 @@
 
 namespace oatpp { namespace async { namespace worker {
 
-IOEventWorker::IOEventWorker()
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// IOEventWorker
+
+IOEventWorker::IOEventWorker(IOEventWorkerForeman* foreman, Action::IOEventType specialization)
   : Worker(Type::IO)
+  , m_foreman(foreman)
+  , m_specialization(specialization)
   , m_running(true)
   , m_eventQueueHandle(-1)
   , m_wakeupTrigger(-1)
@@ -105,4 +110,89 @@ void IOEventWorker::detach() {
   m_thread.detach();
 }
 
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// IOEventWorkerForeman
+
+IOEventWorkerForeman::IOEventWorkerForeman()
+  : Worker(Type::IO)
+  , m_reader(this, Action::IOEventType::IO_EVENT_READ)
+  , m_writer(this, Action::IOEventType::IO_EVENT_WRITE)
+{}
+
+IOEventWorkerForeman::~IOEventWorkerForeman() {
+}
+
+void IOEventWorkerForeman::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
+
+  oatpp::collection::FastQueue<AbstractCoroutine> readerTasks;
+  oatpp::collection::FastQueue<AbstractCoroutine> writerTasks;
+
+  while(tasks.first != nullptr) {
+
+    AbstractCoroutine* coroutine = tasks.popFront();
+    auto& action = getCoroutineScheduledAction(coroutine);
+
+    switch(action.getIOEventType()) {
+
+      case Action::IOEventType::IO_EVENT_READ:
+        readerTasks.pushBack(coroutine);
+        break;
+
+      case Action::IOEventType::IO_EVENT_WRITE:
+        writerTasks.pushBack(coroutine);
+        break;
+
+      default:
+        throw std::runtime_error("[oatpp::async::worker::IOEventWorkerForeman::pushTasks()]: Error. Unknown Action Event Type.");
+
+    }
+
+  }
+
+  if(readerTasks.first != nullptr) {
+    m_reader.pushTasks(readerTasks);
+  }
+
+  if(writerTasks.first != nullptr) {
+    m_writer.pushTasks(writerTasks);
+  }
+
+}
+
+void IOEventWorkerForeman::pushOneTask(AbstractCoroutine* task) {
+
+  auto& action = getCoroutineScheduledAction(task);
+
+  switch(action.getIOEventType()) {
+
+    case Action::IOEventType::IO_EVENT_READ:
+      m_reader.pushOneTask(task);
+      break;
+
+    case Action::IOEventType::IO_EVENT_WRITE:
+      m_writer.pushOneTask(task);
+      break;
+
+    default:
+      throw std::runtime_error("[oatpp::async::worker::IOEventWorkerForeman::pushTasks()]: Error. Unknown Action Event Type.");
+
+  }
+
+}
+
+void IOEventWorkerForeman::stop() {
+  m_writer.stop();
+  m_reader.stop();
+}
+
+void IOEventWorkerForeman::join() {
+  m_reader.join();
+  m_writer.join();
+}
+
+void IOEventWorkerForeman::detach() {
+  m_reader.detach();
+  m_writer.detach();
+}
+
 }}}

+ 57 - 4
src/oatpp/core/async/worker/IOEventWorker_epoll.cpp

@@ -150,6 +150,8 @@ void IOEventWorker::waitEvents() {
     throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Event loop failed.");
   }
 
+  oatpp::collection::FastQueue<AbstractCoroutine> popQueue;
+
   for(v_int32 i = 0; i < eventsCount; i ++) {
 
     struct epoll_event* event = (struct epoll_event*)&m_outEvents[i * sizeof(struct epoll_event)];
@@ -168,23 +170,70 @@ void IOEventWorker::waitEvents() {
 
         Action action = coroutine->iterate();
 
-        switch(action.getType()) {
+        int res;
+
+        switch(action.getIOEventCode() | m_specialization) {
+
+          case Action::CODE_IO_WAIT_READ:
+            setCoroutineScheduledAction(coroutine, std::move(action));
+            setCoroutineEvent(coroutine, EPOLL_CTL_MOD, nullptr);
+            break;
+
+          case Action::CODE_IO_WAIT_WRITE:
+            setCoroutineScheduledAction(coroutine, std::move(action));
+            setCoroutineEvent(coroutine, EPOLL_CTL_MOD, nullptr);
+            break;
 
-          case Action::TYPE_IO_WAIT:
+          case Action::CODE_IO_REPEAT_READ:
             setCoroutineScheduledAction(coroutine, std::move(action));
             setCoroutineEvent(coroutine, EPOLL_CTL_MOD, nullptr);
             break;
 
-          case Action::TYPE_IO_REPEAT:
+          case Action::CODE_IO_REPEAT_WRITE:
             setCoroutineScheduledAction(coroutine, std::move(action));
             setCoroutineEvent(coroutine, EPOLL_CTL_MOD, nullptr);
             break;
 
+          case Action::CODE_IO_WAIT_RESCHEDULE:
+
+            res = epoll_ctl(m_eventQueueHandle, EPOLL_CTL_DEL, action.getIOHandle(), nullptr);
+            if(res == -1) {
+              OATPP_LOGD(
+                "[oatpp::async::worker::IOEventWorker::waitEvents()]",
+                "Error. Call to epoll_ctl failed. operation=%d, errno=%d. action_code=%d, worker_specialization=%d",
+                EPOLL_CTL_DEL, errno, action.getIOEventCode(), m_specialization
+              );
+              throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Call to epoll_ctl failed.");
+            }
+
+            setCoroutineScheduledAction(coroutine, std::move(action));
+            popQueue.pushBack(coroutine);
+
+            break;
+
+          case Action::CODE_IO_REPEAT_RESCHEDULE:
+
+            res = epoll_ctl(m_eventQueueHandle, EPOLL_CTL_DEL, action.getIOHandle(), nullptr);
+            if(res == -1) {
+              OATPP_LOGD(
+                "[oatpp::async::worker::IOEventWorker::waitEvents()]",
+                "Error. Call to epoll_ctl failed. operation=%d, errno=%d. action_code=%d, worker_specialization=%d",
+                EPOLL_CTL_DEL, errno, action.getIOEventCode(), m_specialization
+              );
+              throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Call to epoll_ctl failed.");
+            }
+
+            setCoroutineScheduledAction(coroutine, std::move(action));
+            popQueue.pushBack(coroutine);
+
+            break;
+
+
           default:
 
             auto& prevAction = getCoroutineScheduledAction(coroutine);
 
-            auto res = epoll_ctl(m_eventQueueHandle, EPOLL_CTL_DEL, prevAction.getIOHandle(), nullptr);
+            res = epoll_ctl(m_eventQueueHandle, EPOLL_CTL_DEL, prevAction.getIOHandle(), nullptr);
             if(res == -1) {
               OATPP_LOGD("[oatpp::async::worker::IOEventWorker::waitEvents()]", "Error. Call to epoll_ctl failed. operation=%d, errno=%d", EPOLL_CTL_DEL, errno);
               throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Call to epoll_ctl failed.");
@@ -201,6 +250,10 @@ void IOEventWorker::waitEvents() {
 
   }
 
+  if(popQueue.count > 0) {
+    m_foreman->pushTasks(popQueue);
+  }
+
 }
 
 }}}

+ 33 - 8
src/oatpp/core/async/worker/IOEventWorker_kqueue.cpp

@@ -145,7 +145,8 @@ void IOEventWorker::waitEvents() {
     throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Event loop failed.");
   }
 
-  oatpp::collection::FastQueue<AbstractCoroutine> m_repeatQueue;
+  oatpp::collection::FastQueue<AbstractCoroutine> repeatQueue;
+  oatpp::collection::FastQueue<AbstractCoroutine> popQueue;
 
   for(v_int32 i = 0; i < eventsCount; i ++) {
 
@@ -161,16 +162,36 @@ void IOEventWorker::waitEvents() {
 
       Action action = coroutine->iterate();
 
-      switch(action.getType()) {
+      switch(action.getIOEventCode() | m_specialization) {
 
-        case Action::TYPE_IO_WAIT:
+        case Action::CODE_IO_WAIT_READ:
           setCoroutineScheduledAction(coroutine, std::move(action));
-          m_repeatQueue.pushBack(coroutine);
+          repeatQueue.pushBack(coroutine);
           break;
 
-        case Action::TYPE_IO_REPEAT:
+        case Action::CODE_IO_WAIT_WRITE:
           setCoroutineScheduledAction(coroutine, std::move(action));
-          m_repeatQueue.pushBack(coroutine);
+          repeatQueue.pushBack(coroutine);
+          break;
+
+        case Action::CODE_IO_REPEAT_READ:
+          setCoroutineScheduledAction(coroutine, std::move(action));
+          repeatQueue.pushBack(coroutine);
+          break;
+
+        case Action::CODE_IO_REPEAT_WRITE:
+          setCoroutineScheduledAction(coroutine, std::move(action));
+          repeatQueue.pushBack(coroutine);
+          break;
+
+        case Action::CODE_IO_WAIT_RESCHEDULE:
+          setCoroutineScheduledAction(coroutine, std::move(action));
+          popQueue.pushBack(coroutine);
+          break;
+
+        case Action::CODE_IO_REPEAT_RESCHEDULE:
+          setCoroutineScheduledAction(coroutine, std::move(action));
+          popQueue.pushBack(coroutine);
           break;
 
         default:
@@ -183,13 +204,17 @@ void IOEventWorker::waitEvents() {
 
   }
 
-  if(m_repeatQueue.count > 0) {
+  if(repeatQueue.count > 0) {
     {
       std::lock_guard<oatpp::concurrency::SpinLock> lock(m_backlogLock);
-      oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_repeatQueue, m_backlog);
+      oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(repeatQueue, m_backlog);
     }
   }
 
+  if(popQueue.count > 0) {
+    m_foreman->pushTasks(popQueue);
+  }
+
 }
 
 }}}

+ 8 - 6
src/oatpp/core/data/stream/Stream.hpp

@@ -60,7 +60,7 @@ public:
   virtual ~OutputStream() = default;
 
   /**
-   * Write data to stream up to count bytes, and return number of bytes actually written
+   * Write data to stream up to count bytes, and return number of bytes actually written. <br>
    * It is a legal case if return result < count. Caller should handle this!
    * @param data - data to write.
    * @param count - number of bytes to write.
@@ -69,8 +69,9 @@ public:
   virtual data::v_io_size write(const void *data, data::v_io_size count) = 0;
 
   /**
-   * Implementation of OutputStream must suggest async actions for I/O results.
-   * Suggested Action is used for scheduling coroutines in async::Executor.
+   * Implementation of OutputStream must suggest async actions for I/O results. <br>
+   * Suggested Action is used for scheduling coroutines in async::Executor. <br>
+   * **Stream MUST always give the same file-handle if applicable**
    * @param ioResult - result of the call to &l:OutputStream::write ();.
    * @return - &id:oatpp::async::Action;.
    */
@@ -129,7 +130,7 @@ public:
   virtual ~InputStream() = default;
 
   /**
-   * Read data from stream up to count bytes, and return number of bytes actually read
+   * Read data from stream up to count bytes, and return number of bytes actually read. <br>
    * It is a legal case if return result < count. Caller should handle this!
    * @param data - buffer to read dat to.
    * @param count - size of the buffer.
@@ -138,8 +139,9 @@ public:
   virtual data::v_io_size read(void *data, data::v_io_size count) = 0;
 
   /**
-   * Implementation of InputStream must suggest async actions for I/O results.
-   * Suggested Action is used for scheduling coroutines in async::Executor.
+   * Implementation of InputStream must suggest async actions for I/O results. <br>
+   * Suggested Action is used for scheduling coroutines in async::Executor. <br>
+   * **Stream MUST always give the same file-handle if applicable**
    * @param ioResult - result of the call to &l:InputStream::read ();.
    * @return - &id:oatpp::async::Action;.
    */