doubly_buffered_data.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // Baidu RPC - A framework to host and access services throughout Baidu.
  2. // Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved
  3. // Author: Ge,Jun (gejun@baidu.com)
  4. // Date: Mon Sep 22 22:23:13 CST 2014
  5. #ifndef BASE_DOUBLY_BUFFERED_DATA_H
  6. #define BASE_DOUBLY_BUFFERED_DATA_H
  7. #include <vector> // std::vector
  8. #include <pthread.h>
  9. #include "base/scoped_lock.h"
  10. #include "base/thread_local.h"
  11. #include "base/logging.h"
  12. #include "base/macros.h"
  13. #include "base/type_traits.h"
  14. #include "base/errno.h"
  15. namespace base {
  16. // This data structure makes Read() almost lock-free by making Modify()
  17. // *much* slower. It's very suitable for implementing LoadBalancers which
  18. // have a lot of concurrent read-only ops from many threads and occasional
  19. // modifications of data. As a side effect, this data structure can store
  20. // a thread-local data for user.
  21. //
  22. // Read(): begin with a thread-local mutex locked then read the foreground
  23. // instance which will not be changed before the mutex is unlocked. Since the
  24. // mutex is only locked by Modify() with an empty critical section, the
  25. // function is almost lock-free.
  26. //
  27. // Modify(): Modify background instance which is not used by any Read(), flip
  28. // foreground and background, lock thread-local mutexes one by one to make
  29. // sure all existing Read() finish and later Read() see new foreground,
  30. // then modify background(foreground before flip) again.
  31. class Void { };
  32. template <typename T, typename TLS = Void>
  33. class DoublyBufferedData {
  34. class Wrapper;
  35. public:
  36. class ScopedPtr {
  37. friend class DoublyBufferedData;
  38. public:
  39. ScopedPtr() : _data(NULL), _w(NULL) {}
  40. ~ScopedPtr() {
  41. if (_w) {
  42. _w->EndRead();
  43. }
  44. }
  45. const T* get() const { return _data; }
  46. const T& operator*() const { return *_data; }
  47. const T* operator->() const { return _data; }
  48. TLS& tls() { return _w->user_tls(); }
  49. private:
  50. DISALLOW_COPY_AND_ASSIGN(ScopedPtr);
  51. const T* _data;
  52. Wrapper* _w;
  53. };
  54. DoublyBufferedData();
  55. ~DoublyBufferedData();
  56. // Put foreground instance into ptr. The instance will not be changed until
  57. // ptr is destructed.
  58. // This function is not blocked by Read() and Modify() in other threads.
  59. // Returns 0 on success, -1 otherwise.
  60. int Read(ScopedPtr* ptr);
  61. // Modify background and foreground instances. fn(T&, ...) will be called
  62. // twice. Modify() from different threads are exclusive from each other.
  63. // NOTE: Call same series of fn to different equivalent instances should
  64. // result in equivalent instances, otherwise foreground and background
  65. // instance will be inconsistent.
  66. template <typename Fn> size_t Modify(Fn& fn);
  67. template <typename Fn, typename Arg1> size_t Modify(Fn& fn, const Arg1&);
  68. template <typename Fn, typename Arg1, typename Arg2>
  69. size_t Modify(Fn& fn, const Arg1&, const Arg2&);
  70. // fn(T& background, const T& foreground, ...) will be called to background
  71. // and foreground instances respectively.
  72. template <typename Fn> size_t ModifyWithForeground(Fn& fn);
  73. template <typename Fn, typename Arg1>
  74. size_t ModifyWithForeground(Fn& fn, const Arg1&);
  75. template <typename Fn, typename Arg1, typename Arg2>
  76. size_t ModifyWithForeground(Fn& fn, const Arg1&, const Arg2&);
  77. private:
  78. template <typename Fn>
  79. struct WithFG0 {
  80. WithFG0(Fn& fn, T* data) : _fn(fn), _data(data) { }
  81. size_t operator()(T& bg) {
  82. return _fn(bg, (const T&)_data[&bg == _data]);
  83. }
  84. private:
  85. Fn& _fn;
  86. T* _data;
  87. };
  88. template <typename Fn, typename Arg1>
  89. struct WithFG1 {
  90. WithFG1(Fn& fn, T* data, const Arg1& arg1)
  91. : _fn(fn), _data(data), _arg1(arg1) {}
  92. size_t operator()(T& bg) {
  93. return _fn(bg, (const T&)_data[&bg == _data], _arg1);
  94. }
  95. private:
  96. Fn& _fn;
  97. T* _data;
  98. const Arg1& _arg1;
  99. };
  100. template <typename Fn, typename Arg1, typename Arg2>
  101. struct WithFG2 {
  102. WithFG2(Fn& fn, T* data, const Arg1& arg1, const Arg2& arg2)
  103. : _fn(fn), _data(data), _arg1(arg1), _arg2(arg2) {}
  104. size_t operator()(T& bg) {
  105. return _fn(bg, (const T&)_data[&bg == _data], _arg1, _arg2);
  106. }
  107. private:
  108. Fn& _fn;
  109. T* _data;
  110. const Arg1& _arg1;
  111. const Arg2& _arg2;
  112. };
  113. template <typename Fn, typename Arg1>
  114. struct Closure1 {
  115. Closure1(Fn& fn, const Arg1& arg1) : _fn(fn), _arg1(arg1) {}
  116. size_t operator()(T& bg) { return _fn(bg, _arg1); }
  117. private:
  118. Fn& _fn;
  119. const Arg1& _arg1;
  120. };
  121. template <typename Fn, typename Arg1, typename Arg2>
  122. struct Closure2 {
  123. Closure2(Fn& fn, const Arg1& arg1, const Arg2& arg2)
  124. : _fn(fn), _arg1(arg1), _arg2(arg2) {}
  125. size_t operator()(T& bg) { return _fn(bg, _arg1, _arg2); }
  126. private:
  127. Fn& _fn;
  128. const Arg1& _arg1;
  129. const Arg2& _arg2;
  130. };
  131. const T* UnsafeRead() const { return _data + _index; }
  132. Wrapper* AddWrapper();
  133. void RemoveWrapper(Wrapper*);
  134. // Foreground and background void.
  135. T _data[2];
  136. // Index of foreground instance.
  137. short _index;
  138. // Key to access thread-local wrappers.
  139. bool _created_key;
  140. pthread_key_t _wrapper_key;
  141. // All thread-local instances.
  142. std::vector<Wrapper*> _wrappers;
  143. // Sequence access to _wrappers.
  144. pthread_mutex_t _wrappers_mutex;
  145. // Sequence modifications.
  146. pthread_mutex_t _modify_mutex;
  147. };
  148. static const pthread_key_t INVALID_PTHREAD_KEY = (pthread_key_t)-1;
  149. template <typename T, typename TLS>
  150. class DoublyBufferedDataWrapperBase {
  151. public:
  152. TLS& user_tls() { return _user_tls; }
  153. protected:
  154. TLS _user_tls;
  155. };
  156. template <typename T>
  157. class DoublyBufferedDataWrapperBase<T, Void> {
  158. };
  159. template <typename T, typename TLS>
  160. class DoublyBufferedData<T, TLS>::Wrapper
  161. : public DoublyBufferedDataWrapperBase<T, TLS> {
  162. friend class DoublyBufferedData;
  163. public:
  164. explicit Wrapper(DoublyBufferedData* c) : _control(c) {
  165. pthread_mutex_init(&_mutex, NULL);
  166. }
  167. ~Wrapper() {
  168. if (_control != NULL) {
  169. _control->RemoveWrapper(this);
  170. }
  171. pthread_mutex_destroy(&_mutex);
  172. }
  173. // _mutex will be locked by the calling pthread and DoublyBufferedData.
  174. // Most of the time, no modifications are done, so the mutex is
  175. // uncontended and fast.
  176. inline void BeginRead() {
  177. pthread_mutex_lock(&_mutex);
  178. }
  179. inline void EndRead() {
  180. pthread_mutex_unlock(&_mutex);
  181. }
  182. inline void WaitReadDone() {
  183. BAIDU_SCOPED_LOCK(_mutex);
  184. }
  185. private:
  186. DoublyBufferedData* _control;
  187. pthread_mutex_t _mutex;
  188. };
  189. // Called when thread initializes thread-local wrapper.
  190. template <typename T, typename TLS>
  191. typename DoublyBufferedData<T, TLS>::Wrapper*
  192. DoublyBufferedData<T, TLS>::AddWrapper() {
  193. Wrapper* w = new (std::nothrow) Wrapper(this);
  194. if (NULL == w) {
  195. return NULL;
  196. }
  197. try {
  198. BAIDU_SCOPED_LOCK(_wrappers_mutex);
  199. _wrappers.push_back(w);
  200. } catch (std::exception& e) {
  201. return NULL;
  202. }
  203. return w;
  204. }
  205. // Called when thread quits.
  206. template <typename T, typename TLS>
  207. void DoublyBufferedData<T, TLS>::RemoveWrapper(
  208. typename DoublyBufferedData<T, TLS>::Wrapper* w) {
  209. if (NULL == w) {
  210. return;
  211. }
  212. BAIDU_SCOPED_LOCK(_wrappers_mutex);
  213. for (size_t i = 0; i < _wrappers.size(); ++i) {
  214. if (_wrappers[i] == w) {
  215. _wrappers[i] = _wrappers.back();
  216. _wrappers.pop_back();
  217. return;
  218. }
  219. }
  220. }
  221. template <typename T, typename TLS>
  222. DoublyBufferedData<T, TLS>::DoublyBufferedData()
  223. : _index(0)
  224. , _created_key(false)
  225. , _wrapper_key(0) {
  226. _wrappers.reserve(64);
  227. pthread_mutex_init(&_modify_mutex, NULL);
  228. pthread_mutex_init(&_wrappers_mutex, NULL);
  229. const int rc = pthread_key_create(&_wrapper_key,
  230. base::delete_object<Wrapper>);
  231. if (rc != 0) {
  232. LOG(FATAL) << "Fail to pthread_key_create: " << berror(rc);
  233. } else {
  234. _created_key = true;
  235. }
  236. // Initialize _data for some POD types. This is essential for pointer
  237. // types because they should be Read() as NULL before any Modify().
  238. if (is_integral<T>::value || is_floating_point<T>::value ||
  239. is_pointer<T>::value || is_member_function_pointer<T>::value) {
  240. _data[0] = T();
  241. _data[1] = T();
  242. }
  243. }
  244. template <typename T, typename TLS>
  245. DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
  246. // User is responsible for synchronizations between Read()/Modify() and
  247. // this function.
  248. if (_created_key) {
  249. pthread_key_delete(_wrapper_key);
  250. }
  251. {
  252. BAIDU_SCOPED_LOCK(_wrappers_mutex);
  253. for (size_t i = 0; i < _wrappers.size(); ++i) {
  254. _wrappers[i]->_control = NULL; // hack: disable removal.
  255. delete _wrappers[i];
  256. }
  257. _wrappers.clear();
  258. }
  259. pthread_mutex_destroy(&_modify_mutex);
  260. pthread_mutex_destroy(&_wrappers_mutex);
  261. }
  262. template <typename T, typename TLS>
  263. int DoublyBufferedData<T, TLS>::Read(
  264. typename DoublyBufferedData<T, TLS>::ScopedPtr* ptr) {
  265. if (BAIDU_UNLIKELY(!_created_key)) {
  266. return -1;
  267. }
  268. Wrapper* w = static_cast<Wrapper*>(pthread_getspecific(_wrapper_key));
  269. if (BAIDU_LIKELY(w != NULL)) {
  270. w->BeginRead();
  271. ptr->_data = UnsafeRead();
  272. ptr->_w = w;
  273. return 0;
  274. }
  275. w = AddWrapper();
  276. if (BAIDU_LIKELY(w != NULL)) {
  277. const int rc = pthread_setspecific(_wrapper_key, w);
  278. if (rc == 0) {
  279. w->BeginRead();
  280. ptr->_data = UnsafeRead();
  281. ptr->_w = w;
  282. return 0;
  283. }
  284. }
  285. return -1;
  286. }
  287. template <typename T, typename TLS>
  288. template <typename Fn>
  289. size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
  290. // _modify_mutex sequences modifications. Using a separate mutex rather
  291. // than _wrappers_mutex is to avoid blocking threads calling
  292. // AddWrapper() or RemoveWrapper() too long. Most of the time, modifications
  293. // are done by one thread, contention should be negligible.
  294. BAIDU_SCOPED_LOCK(_modify_mutex);
  295. // background instance is not accessed by other threads, being safe to
  296. // modify.
  297. const size_t ret = fn(_data[!_index]);
  298. if (!ret) {
  299. return 0;
  300. }
  301. // Publish, flip background and foreground.
  302. _index = !_index;
  303. // Wait until all threads finishes current reading. When they begin next
  304. // read, they should see updated _index.
  305. {
  306. BAIDU_SCOPED_LOCK(_wrappers_mutex);
  307. for (size_t i = 0; i < _wrappers.size(); ++i) {
  308. _wrappers[i]->WaitReadDone();
  309. }
  310. }
  311. const size_t ret2 = fn(_data[!_index]);
  312. CHECK_EQ(ret2, ret) << "index=" << _index;
  313. return ret2;
  314. }
  315. template <typename T, typename TLS>
  316. template <typename Fn, typename Arg1>
  317. size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn, const Arg1& arg1) {
  318. Closure1<Fn, Arg1> c(fn, arg1);
  319. return Modify(c);
  320. }
  321. template <typename T, typename TLS>
  322. template <typename Fn, typename Arg1, typename Arg2>
  323. size_t DoublyBufferedData<T, TLS>::Modify(
  324. Fn& fn, const Arg1& arg1, const Arg2& arg2) {
  325. Closure2<Fn, Arg1, Arg2> c(fn, arg1, arg2);
  326. return Modify(c);
  327. }
  328. template <typename T, typename TLS>
  329. template <typename Fn>
  330. size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn) {
  331. WithFG0<Fn> c(fn, _data);
  332. return Modify(c);
  333. }
  334. template <typename T, typename TLS>
  335. template <typename Fn, typename Arg1>
  336. size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn, const Arg1& arg1) {
  337. WithFG1<Fn, Arg1> c(fn, _data, arg1);
  338. return Modify(c);
  339. }
  340. template <typename T, typename TLS>
  341. template <typename Fn, typename Arg1, typename Arg2>
  342. size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(
  343. Fn& fn, const Arg1& arg1, const Arg2& arg2) {
  344. WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
  345. return Modify(c);
  346. }
  347. } // namespace base
  348. #endif // BASE_DOUBLY_BUFFERED_DATA_H