123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- // Baidu RPC - A framework to host and access services throughout Baidu.
- // Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved
- // Author: Ge,Jun (gejun@baidu.com)
- // Date: Mon Sep 22 22:23:13 CST 2014
- #ifndef BASE_DOUBLY_BUFFERED_DATA_H
- #define BASE_DOUBLY_BUFFERED_DATA_H
- #include <vector> // std::vector
- #include <pthread.h>
- #include "base/scoped_lock.h"
- #include "base/thread_local.h"
- #include "base/logging.h"
- #include "base/macros.h"
- #include "base/type_traits.h"
- #include "base/errno.h"
- namespace base {
- // This data structure makes Read() almost lock-free by making Modify()
- // *much* slower. It's very suitable for implementing LoadBalancers which
- // have a lot of concurrent read-only ops from many threads and occasional
- // modifications of data. As a side effect, this data structure can store
- // a thread-local data for user.
- //
- // Read(): begin with a thread-local mutex locked then read the foreground
- // instance which will not be changed before the mutex is unlocked. Since the
- // mutex is only locked by Modify() with an empty critical section, the
- // function is almost lock-free.
- //
- // Modify(): Modify background instance which is not used by any Read(), flip
- // foreground and background, lock thread-local mutexes one by one to make
- // sure all existing Read() finish and later Read() see new foreground,
- // then modify background(foreground before flip) again.
- class Void { };
- template <typename T, typename TLS = Void>
- class DoublyBufferedData {
- class Wrapper;
- public:
- class ScopedPtr {
- friend class DoublyBufferedData;
- public:
- ScopedPtr() : _data(NULL), _w(NULL) {}
- ~ScopedPtr() {
- if (_w) {
- _w->EndRead();
- }
- }
- const T* get() const { return _data; }
- const T& operator*() const { return *_data; }
- const T* operator->() const { return _data; }
- TLS& tls() { return _w->user_tls(); }
-
- private:
- DISALLOW_COPY_AND_ASSIGN(ScopedPtr);
- const T* _data;
- Wrapper* _w;
- };
-
- DoublyBufferedData();
- ~DoublyBufferedData();
- // Put foreground instance into ptr. The instance will not be changed until
- // ptr is destructed.
- // This function is not blocked by Read() and Modify() in other threads.
- // Returns 0 on success, -1 otherwise.
- int Read(ScopedPtr* ptr);
- // Modify background and foreground instances. fn(T&, ...) will be called
- // twice. Modify() from different threads are exclusive from each other.
- // NOTE: Call same series of fn to different equivalent instances should
- // result in equivalent instances, otherwise foreground and background
- // instance will be inconsistent.
- template <typename Fn> size_t Modify(Fn& fn);
- template <typename Fn, typename Arg1> size_t Modify(Fn& fn, const Arg1&);
- template <typename Fn, typename Arg1, typename Arg2>
- size_t Modify(Fn& fn, const Arg1&, const Arg2&);
- // fn(T& background, const T& foreground, ...) will be called to background
- // and foreground instances respectively.
- template <typename Fn> size_t ModifyWithForeground(Fn& fn);
- template <typename Fn, typename Arg1>
- size_t ModifyWithForeground(Fn& fn, const Arg1&);
- template <typename Fn, typename Arg1, typename Arg2>
- size_t ModifyWithForeground(Fn& fn, const Arg1&, const Arg2&);
-
- private:
- template <typename Fn>
- struct WithFG0 {
- WithFG0(Fn& fn, T* data) : _fn(fn), _data(data) { }
- size_t operator()(T& bg) {
- return _fn(bg, (const T&)_data[&bg == _data]);
- }
- private:
- Fn& _fn;
- T* _data;
- };
- template <typename Fn, typename Arg1>
- struct WithFG1 {
- WithFG1(Fn& fn, T* data, const Arg1& arg1)
- : _fn(fn), _data(data), _arg1(arg1) {}
- size_t operator()(T& bg) {
- return _fn(bg, (const T&)_data[&bg == _data], _arg1);
- }
- private:
- Fn& _fn;
- T* _data;
- const Arg1& _arg1;
- };
- template <typename Fn, typename Arg1, typename Arg2>
- struct WithFG2 {
- WithFG2(Fn& fn, T* data, const Arg1& arg1, const Arg2& arg2)
- : _fn(fn), _data(data), _arg1(arg1), _arg2(arg2) {}
- size_t operator()(T& bg) {
- return _fn(bg, (const T&)_data[&bg == _data], _arg1, _arg2);
- }
- private:
- Fn& _fn;
- T* _data;
- const Arg1& _arg1;
- const Arg2& _arg2;
- };
- template <typename Fn, typename Arg1>
- struct Closure1 {
- Closure1(Fn& fn, const Arg1& arg1) : _fn(fn), _arg1(arg1) {}
- size_t operator()(T& bg) { return _fn(bg, _arg1); }
- private:
- Fn& _fn;
- const Arg1& _arg1;
- };
- template <typename Fn, typename Arg1, typename Arg2>
- struct Closure2 {
- Closure2(Fn& fn, const Arg1& arg1, const Arg2& arg2)
- : _fn(fn), _arg1(arg1), _arg2(arg2) {}
- size_t operator()(T& bg) { return _fn(bg, _arg1, _arg2); }
- private:
- Fn& _fn;
- const Arg1& _arg1;
- const Arg2& _arg2;
- };
- const T* UnsafeRead() const { return _data + _index; }
- Wrapper* AddWrapper();
- void RemoveWrapper(Wrapper*);
- // Foreground and background void.
- T _data[2];
- // Index of foreground instance.
- short _index;
- // Key to access thread-local wrappers.
- bool _created_key;
- pthread_key_t _wrapper_key;
- // All thread-local instances.
- std::vector<Wrapper*> _wrappers;
- // Sequence access to _wrappers.
- pthread_mutex_t _wrappers_mutex;
- // Sequence modifications.
- pthread_mutex_t _modify_mutex;
- };
- static const pthread_key_t INVALID_PTHREAD_KEY = (pthread_key_t)-1;
- template <typename T, typename TLS>
- class DoublyBufferedDataWrapperBase {
- public:
- TLS& user_tls() { return _user_tls; }
- protected:
- TLS _user_tls;
- };
- template <typename T>
- class DoublyBufferedDataWrapperBase<T, Void> {
- };
- template <typename T, typename TLS>
- class DoublyBufferedData<T, TLS>::Wrapper
- : public DoublyBufferedDataWrapperBase<T, TLS> {
- friend class DoublyBufferedData;
- public:
- explicit Wrapper(DoublyBufferedData* c) : _control(c) {
- pthread_mutex_init(&_mutex, NULL);
- }
-
- ~Wrapper() {
- if (_control != NULL) {
- _control->RemoveWrapper(this);
- }
- pthread_mutex_destroy(&_mutex);
- }
- // _mutex will be locked by the calling pthread and DoublyBufferedData.
- // Most of the time, no modifications are done, so the mutex is
- // uncontended and fast.
- inline void BeginRead() {
- pthread_mutex_lock(&_mutex);
- }
- inline void EndRead() {
- pthread_mutex_unlock(&_mutex);
- }
- inline void WaitReadDone() {
- BAIDU_SCOPED_LOCK(_mutex);
- }
-
- private:
- DoublyBufferedData* _control;
- pthread_mutex_t _mutex;
- };
- // Called when thread initializes thread-local wrapper.
- template <typename T, typename TLS>
- typename DoublyBufferedData<T, TLS>::Wrapper*
- DoublyBufferedData<T, TLS>::AddWrapper() {
- Wrapper* w = new (std::nothrow) Wrapper(this);
- if (NULL == w) {
- return NULL;
- }
- try {
- BAIDU_SCOPED_LOCK(_wrappers_mutex);
- _wrappers.push_back(w);
- } catch (std::exception& e) {
- return NULL;
- }
- return w;
- }
- // Called when thread quits.
- template <typename T, typename TLS>
- void DoublyBufferedData<T, TLS>::RemoveWrapper(
- typename DoublyBufferedData<T, TLS>::Wrapper* w) {
- if (NULL == w) {
- return;
- }
- BAIDU_SCOPED_LOCK(_wrappers_mutex);
- for (size_t i = 0; i < _wrappers.size(); ++i) {
- if (_wrappers[i] == w) {
- _wrappers[i] = _wrappers.back();
- _wrappers.pop_back();
- return;
- }
- }
- }
- template <typename T, typename TLS>
- DoublyBufferedData<T, TLS>::DoublyBufferedData()
- : _index(0)
- , _created_key(false)
- , _wrapper_key(0) {
- _wrappers.reserve(64);
- pthread_mutex_init(&_modify_mutex, NULL);
- pthread_mutex_init(&_wrappers_mutex, NULL);
- const int rc = pthread_key_create(&_wrapper_key,
- base::delete_object<Wrapper>);
- if (rc != 0) {
- LOG(FATAL) << "Fail to pthread_key_create: " << berror(rc);
- } else {
- _created_key = true;
- }
- // Initialize _data for some POD types. This is essential for pointer
- // types because they should be Read() as NULL before any Modify().
- if (is_integral<T>::value || is_floating_point<T>::value ||
- is_pointer<T>::value || is_member_function_pointer<T>::value) {
- _data[0] = T();
- _data[1] = T();
- }
- }
- template <typename T, typename TLS>
- DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
- // User is responsible for synchronizations between Read()/Modify() and
- // this function.
- if (_created_key) {
- pthread_key_delete(_wrapper_key);
- }
-
- {
- BAIDU_SCOPED_LOCK(_wrappers_mutex);
- for (size_t i = 0; i < _wrappers.size(); ++i) {
- _wrappers[i]->_control = NULL; // hack: disable removal.
- delete _wrappers[i];
- }
- _wrappers.clear();
- }
- pthread_mutex_destroy(&_modify_mutex);
- pthread_mutex_destroy(&_wrappers_mutex);
- }
- template <typename T, typename TLS>
- int DoublyBufferedData<T, TLS>::Read(
- typename DoublyBufferedData<T, TLS>::ScopedPtr* ptr) {
- if (BAIDU_UNLIKELY(!_created_key)) {
- return -1;
- }
- Wrapper* w = static_cast<Wrapper*>(pthread_getspecific(_wrapper_key));
- if (BAIDU_LIKELY(w != NULL)) {
- w->BeginRead();
- ptr->_data = UnsafeRead();
- ptr->_w = w;
- return 0;
- }
- w = AddWrapper();
- if (BAIDU_LIKELY(w != NULL)) {
- const int rc = pthread_setspecific(_wrapper_key, w);
- if (rc == 0) {
- w->BeginRead();
- ptr->_data = UnsafeRead();
- ptr->_w = w;
- return 0;
- }
- }
- return -1;
- }
- template <typename T, typename TLS>
- template <typename Fn>
- size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
- // _modify_mutex sequences modifications. Using a separate mutex rather
- // than _wrappers_mutex is to avoid blocking threads calling
- // AddWrapper() or RemoveWrapper() too long. Most of the time, modifications
- // are done by one thread, contention should be negligible.
- BAIDU_SCOPED_LOCK(_modify_mutex);
- // background instance is not accessed by other threads, being safe to
- // modify.
- const size_t ret = fn(_data[!_index]);
- if (!ret) {
- return 0;
- }
- // Publish, flip background and foreground.
- _index = !_index;
-
- // Wait until all threads finishes current reading. When they begin next
- // read, they should see updated _index.
- {
- BAIDU_SCOPED_LOCK(_wrappers_mutex);
- for (size_t i = 0; i < _wrappers.size(); ++i) {
- _wrappers[i]->WaitReadDone();
- }
- }
- const size_t ret2 = fn(_data[!_index]);
- CHECK_EQ(ret2, ret) << "index=" << _index;
- return ret2;
- }
- template <typename T, typename TLS>
- template <typename Fn, typename Arg1>
- size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn, const Arg1& arg1) {
- Closure1<Fn, Arg1> c(fn, arg1);
- return Modify(c);
- }
- template <typename T, typename TLS>
- template <typename Fn, typename Arg1, typename Arg2>
- size_t DoublyBufferedData<T, TLS>::Modify(
- Fn& fn, const Arg1& arg1, const Arg2& arg2) {
- Closure2<Fn, Arg1, Arg2> c(fn, arg1, arg2);
- return Modify(c);
- }
- template <typename T, typename TLS>
- template <typename Fn>
- size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn) {
- WithFG0<Fn> c(fn, _data);
- return Modify(c);
- }
- template <typename T, typename TLS>
- template <typename Fn, typename Arg1>
- size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn, const Arg1& arg1) {
- WithFG1<Fn, Arg1> c(fn, _data, arg1);
- return Modify(c);
- }
- template <typename T, typename TLS>
- template <typename Fn, typename Arg1, typename Arg2>
- size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(
- Fn& fn, const Arg1& arg1, const Arg2& arg2) {
- WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
- return Modify(c);
- }
- } // namespace base
- #endif // BASE_DOUBLY_BUFFERED_DATA_H
|