Browse Source

Add bthread/parking_lot.h bthread/remote_task_queue.h, fix .gitignore

Change-Id: I6e3172633ed8230a522241b67764c4278f805c7a
gejun 6 years ago
parent
commit
ad903bbbeb
3 changed files with 130 additions and 0 deletions
  1. 6 0
      .gitignore
  2. 56 0
      bthread/parking_lot.h
  3. 68 0
      bthread/remote_task_queue.h

+ 6 - 0
.gitignore

@@ -1,9 +1,15 @@
+#ignore all files without extension
 *
 !*.*
+!*/
+
 *.o
 *.a
+*.log
 *.pb.cc
 *.pb.h
 *.prof
 /output
 /test/output
+#ignore hidden files
+.*

+ 56 - 0
bthread/parking_lot.h

@@ -0,0 +1,56 @@
+// bthread - A M:N threading library to make applications more concurrent.
+// Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved
+
+// Author: chenzhangyi01@baidu.com, gejun@baidu.com
+// Date: 2017/07/27 23:07:06
+
+#ifndef  PUBLIC_BTHREAD_PARKING_LOT_H
+#define  PUBLIC_BTHREAD_PARKING_LOT_H
+
+#include "base/atomicops.h"
+#include "bthread/sys_futex.h"
+
+namespace bthread {
+
+// Park idle workers.
+class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
+public:
+    struct State {
+        bool stopped() const { return val & 1; }
+        int val;
+    };
+
+    ParkingLot() : _pending_signal(0) {}
+
+    // Wake up at most `num_task' workers.
+    // Returns #workers woken up.
+    int signal(int num_task) {
+        _pending_signal.fetch_add((num_task << 1), base::memory_order_release);
+        return futex_wake_private(&_pending_signal, num_task);
+    }
+
+    // Get a state for later wait().
+    State get_state() {
+        const State st = {_pending_signal.load(base::memory_order_acquire)};
+        return st;
+    }
+
+    // Wait for tasks.
+    // If the `expected_state' does not match, wait() may finish directly.
+    void wait(const State& expected_state) {
+        futex_wait_private(&_pending_signal, expected_state.val, NULL);
+    }
+
+    // Wakeup suspended wait() and make them unwaitable ever. 
+    void stop() {
+        _pending_signal.fetch_or(1);
+        futex_wake_private(&_pending_signal, 10000);
+    }
+private:
+    // higher 31 bits for signalling, MLB for stopping.
+    base::atomic<int> _pending_signal;
+};
+
+}  // namespace bthread
+
+#endif  //PUBLIC_BTHREAD_PARKING_LOT_H

+ 68 - 0
bthread/remote_task_queue.h

@@ -0,0 +1,68 @@
+// bthread - A M:N threading library to make applications more concurrent.
+// Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved
+
+// Author: Ge,Jun (gejun@baidu.com)
+// Date: Sun, 22 Jan 2017
+
+#ifndef BAIDU_BTHREAD_REMOTE_TASK_QUEUE_H
+#define BAIDU_BTHREAD_REMOTE_TASK_QUEUE_H
+
+#include "base/containers/bounded_queue.h"
+#include "base/macros.h"
+
+namespace bthread {
+
+class TaskGroup;
+
+// A queue for storing bthreads created by non-workers. Since non-workers
+// randomly choose a TaskGroup to push which distributes the contentions,
+// this queue is simply implemented as a queue protected with a lock.
+// The function names should be self-explanatory.
+class RemoteTaskQueue {
+public:
+    RemoteTaskQueue() {}
+
+    int init(size_t cap) {
+        const size_t memsize = sizeof(bthread_t) * cap;
+        void* q_mem = malloc(memsize);
+        if (q_mem == NULL) {
+            return -1;
+        }
+        base::BoundedQueue<bthread_t> q(q_mem, memsize, base::OWNS_STORAGE);
+        _tasks.swap(q);
+        return 0;
+    }
+
+    bool pop(bthread_t* task) {
+        if (_tasks.empty()) {
+            return false;
+        }
+        _mutex.lock();
+        const bool result = _tasks.pop(task);
+        _mutex.unlock();
+        return result;
+    }
+
+    bool push(bthread_t task) {
+        _mutex.lock();
+        const bool res = push_locked(task);
+        _mutex.unlock();
+        return res;
+    }
+
+    bool push_locked(bthread_t task) {
+        return _tasks.push(task);
+    }
+
+    size_t capacity() const { return _tasks.capacity(); }
+    
+private:
+friend class TaskGroup;
+    DISALLOW_COPY_AND_ASSIGN(RemoteTaskQueue);
+    base::BoundedQueue<bthread_t> _tasks;
+    base::Mutex _mutex;
+};
+
+}  // namespace bthread
+
+#endif  // BAIDU_BTHREAD_REMOTE_TASK_QUEUE_H