123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485 |
- // bthread - A M:N threading library to make applications more concurrent.
- // Copyright (c) 2014 Baidu, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // Author: Ge,Jun (gejun@baidu.com)
- // Date: Sun Aug 3 12:46:15 CST 2014
- #include <pthread.h>
- #include "butil/macros.h"
- #include "butil/atomicops.h"
- #include "bvar/passive_status.h"
- #include "bthread/errno.h" // EAGAIN
- #include "bthread/task_group.h" // TaskGroup
- // Implement bthread_key_t related functions
- namespace bthread {
- class KeyTable;
- // defined in task_group.cpp
- extern __thread TaskGroup* tls_task_group;
- extern __thread LocalStorage tls_bls;
- static __thread bool tls_ever_created_keytable = false;
// Thread-specific data is kept in a two-level array. The top-level array
// contains at most KEY_1STLEVEL_SIZE pointers to dynamically allocated
// second-level arrays, each holding at most KEY_2NDLEVEL_SIZE data slots.
// Many applications occupy just one or two second-level arrays, so this
// mechanism keeps the memory footprint small and lets us raise
// KEY_1STLEVEL_SIZE more freely. The tradeoff is one additional memory
// indirection, which is negligible most of the time.

// Number of second-level arrays one KeyTable can point to.
// Notice that we're trying to make the memory of second level and first
// level both 256 bytes to make memory allocator happier.
// NOTE(review): each second-level slot stores a version next to the pointer,
// so the 256-byte figure may no longer hold for that level -- confirm.
static const uint32_t KEY_1STLEVEL_SIZE = 31;

// Number of data slots in one second-level array.
static const uint32_t KEY_2NDLEVEL_SIZE = 32;

// Max tls in one thread, currently the value is 992 which should be enough
// for most projects throughout baidu.
static const uint32_t KEYS_MAX = KEY_1STLEVEL_SIZE * KEY_2NDLEVEL_SIZE;
// Process-wide description of one key slot: its current version and the
// destructor applied to values stored under it.
struct KeyInfo {
    // Version of the key currently occupying this slot. Keys handed out by
    // bthread_key_create2() always carry a non-zero version, and
    // bthread_key_delete() advances it so that older keys stop matching.
    uint32_t version;

    // Invoked as dtor(value, dtor_args) when a KeyTable clears a non-NULL
    // value whose version matches. NULL means values are not destroyed.
    void (*dtor)(void*, const void*);

    // Opaque second argument forwarded to dtor.
    const void* dtor_args;
};
- static KeyInfo s_key_info[KEYS_MAX] = {};
- // For allocating keys.
- static pthread_mutex_t s_key_mutex = PTHREAD_MUTEX_INITIALIZER;
- static size_t nfreekey = 0;
- static size_t nkey = 0;
- static uint32_t s_free_keys[KEYS_MAX];
- // Stats.
- static butil::static_atomic<size_t> nkeytable = BUTIL_STATIC_ATOMIC_INIT(0);
- static butil::static_atomic<size_t> nsubkeytable = BUTIL_STATIC_ATOMIC_INIT(0);
- // The second-level array.
- // Align with cacheline to avoid false sharing.
- class BAIDU_CACHELINE_ALIGNMENT SubKeyTable {
- public:
- SubKeyTable() {
- memset(_data, 0, sizeof(_data));
- nsubkeytable.fetch_add(1, butil::memory_order_relaxed);
- }
- // NOTE: Call clear first.
- ~SubKeyTable() {
- nsubkeytable.fetch_sub(1, butil::memory_order_relaxed);
- }
- void clear(uint32_t offset) {
- for (uint32_t i = 0; i < KEY_2NDLEVEL_SIZE; ++i) {
- void* p = _data[i].ptr;
- if (p) {
- // Set the position to NULL before calling dtor which may set
- // the position again.
- _data[i].ptr = NULL;
-
- KeyInfo info = bthread::s_key_info[offset + i];
- if (info.dtor && _data[i].version == info.version) {
- info.dtor(p, info.dtor_args);
- }
- }
- }
- }
- bool cleared() const {
- // We need to iterate again to check if every slot is empty. An
- // alternative is remember if set_data() was called during clear.
- for (uint32_t i = 0; i < KEY_2NDLEVEL_SIZE; ++i) {
- if (_data[i].ptr) {
- return false;
- }
- }
- return true;
- }
- inline void* get_data(uint32_t index, uint32_t version) const {
- if (_data[index].version == version) {
- return _data[index].ptr;
- }
- return NULL;
- }
- inline void set_data(uint32_t index, uint32_t version, void* data) {
- _data[index].version = version;
- _data[index].ptr = data;
- }
- private:
- struct Data {
- uint32_t version;
- void* ptr;
- };
- Data _data[KEY_2NDLEVEL_SIZE];
- };
- // The first-level array.
- // Align with cacheline to avoid false sharing.
- class BAIDU_CACHELINE_ALIGNMENT KeyTable {
- public:
- KeyTable() : next(NULL) {
- memset(_subs, 0, sizeof(_subs));
- nkeytable.fetch_add(1, butil::memory_order_relaxed);
- }
- ~KeyTable() {
- nkeytable.fetch_sub(1, butil::memory_order_relaxed);
- for (int ntry = 0; ntry < PTHREAD_DESTRUCTOR_ITERATIONS; ++ntry) {
- for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
- if (_subs[i]) {
- _subs[i]->clear(i * KEY_2NDLEVEL_SIZE);
- }
- }
- bool all_cleared = true;
- for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
- if (_subs[i] != NULL && !_subs[i]->cleared()) {
- all_cleared = false;
- break;
- }
- }
- if (all_cleared) {
- for (uint32_t i = 0; i < KEY_1STLEVEL_SIZE; ++i) {
- delete _subs[i];
- }
- return;
- }
- }
- LOG(ERROR) << "Fail to destroy all objects in KeyTable[" << this << ']';
- }
- inline void* get_data(bthread_key_t key) const {
- const uint32_t subidx = key.index / KEY_2NDLEVEL_SIZE;
- if (subidx < KEY_1STLEVEL_SIZE) {
- const SubKeyTable* sub_kt = _subs[subidx];
- if (sub_kt) {
- return sub_kt->get_data(
- key.index - subidx * KEY_2NDLEVEL_SIZE, key.version);
- }
- }
- return NULL;
- }
- inline int set_data(bthread_key_t key, void* data) {
- const uint32_t subidx = key.index / KEY_2NDLEVEL_SIZE;
- if (subidx < KEY_1STLEVEL_SIZE &&
- key.version == s_key_info[key.index].version) {
- SubKeyTable* sub_kt = _subs[subidx];
- if (sub_kt == NULL) {
- sub_kt = new (std::nothrow) SubKeyTable;
- if (NULL == sub_kt) {
- return ENOMEM;
- }
- _subs[subidx] = sub_kt;
- }
- sub_kt->set_data(key.index - subidx * KEY_2NDLEVEL_SIZE,
- key.version, data);
- return 0;
- }
- CHECK(false) << "bthread_setspecific is called on invalid " << key;
- return EINVAL;
- }
- public:
- KeyTable* next;
- private:
- SubKeyTable* _subs[KEY_1STLEVEL_SIZE];
- };
- static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
- if (pool != NULL && pool->free_keytables) {
- BAIDU_SCOPED_LOCK(pool->mutex);
- KeyTable* p = (KeyTable*)pool->free_keytables;
- if (p) {
- pool->free_keytables = p->next;
- return p;
- }
- }
- return NULL;
- }
- // Referenced in task_group.cpp, must be extern.
- // Caller of this function must hold the KeyTable
- void return_keytable(bthread_keytable_pool_t* pool, KeyTable* kt) {
- if (NULL == kt) {
- return;
- }
- if (pool == NULL) {
- delete kt;
- return;
- }
- std::unique_lock<pthread_mutex_t> mu(pool->mutex);
- if (pool->destroyed) {
- mu.unlock();
- delete kt;
- return;
- }
- kt->next = (KeyTable*)pool->free_keytables;
- pool->free_keytables = kt;
- }
- static void cleanup_pthread() {
- KeyTable* kt = tls_bls.keytable;
- if (kt) {
- delete kt;
- // After deletion: tls may be set during deletion.
- tls_bls.keytable = NULL;
- }
- }
// Adapter giving a one-argument destructor the two-argument signature stored
// in KeyInfo: `arg' is the one-argument function itself and is called with
// `data'.
static void arg_as_dtor(void* data, const void* arg) {
    typedef void (*KeyDtor)(void*);
    KeyDtor const user_dtor = (KeyDtor)arg;
    user_dtor(data);
}
- static int get_key_count(void*) {
- BAIDU_SCOPED_LOCK(bthread::s_key_mutex);
- return (int)nkey - (int)nfreekey;
- }
- static size_t get_keytable_count(void*) {
- return nkeytable.load(butil::memory_order_relaxed);
- }
- static size_t get_keytable_memory(void*) {
- const size_t n = nkeytable.load(butil::memory_order_relaxed);
- const size_t nsub = nsubkeytable.load(butil::memory_order_relaxed);
- return n * sizeof(KeyTable) + nsub * sizeof(SubKeyTable);
- }
- static bvar::PassiveStatus<int> s_bthread_key_count(
- "bthread_key_count", get_key_count, NULL);
- static bvar::PassiveStatus<size_t> s_bthread_keytable_count(
- "bthread_keytable_count", get_keytable_count, NULL);
- static bvar::PassiveStatus<size_t> s_bthread_keytable_memory(
- "bthread_keytable_memory", get_keytable_memory, NULL);
- } // namespace bthread
- extern "C" {
- int bthread_keytable_pool_init(bthread_keytable_pool_t* pool) {
- if (pool == NULL) {
- LOG(ERROR) << "Param[pool] is NULL";
- return EINVAL;
- }
- pthread_mutex_init(&pool->mutex, NULL);
- pool->free_keytables = NULL;
- pool->destroyed = 0;
- return 0;
- }
- int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) {
- if (pool == NULL) {
- LOG(ERROR) << "Param[pool] is NULL";
- return EINVAL;
- }
- bthread::KeyTable* saved_free_keytables = NULL;
- {
- BAIDU_SCOPED_LOCK(pool->mutex);
- if (pool->free_keytables) {
- saved_free_keytables = (bthread::KeyTable*)pool->free_keytables;
- pool->free_keytables = NULL;
- }
- pool->destroyed = 1;
- }
- // Cheat get/setspecific and destroy the keytables.
- bthread::TaskGroup* const g = bthread::tls_task_group;
- bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
- while (saved_free_keytables) {
- bthread::KeyTable* kt = saved_free_keytables;
- saved_free_keytables = kt->next;
- bthread::tls_bls.keytable = kt;
- if (g) {
- g->current_task()->local_storage.keytable = kt;
- }
- delete kt;
- if (old_kt == kt) {
- old_kt = NULL;
- }
- }
- bthread::tls_bls.keytable = old_kt;
- if (g) {
- g->current_task()->local_storage.keytable = old_kt;
- }
- // TODO: return_keytable may race with this function, we don't destroy
- // the mutex right now.
- // pthread_mutex_destroy(&pool->mutex);
- return 0;
- }
- int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
- bthread_keytable_pool_stat_t* stat) {
- if (pool == NULL || stat == NULL) {
- LOG(ERROR) << "Param[pool] or Param[stat] is NULL";
- return EINVAL;
- }
- std::unique_lock<pthread_mutex_t> mu(pool->mutex);
- size_t count = 0;
- bthread::KeyTable* p = (bthread::KeyTable*)pool->free_keytables;
- for (; p; p = p->next, ++count) {}
- stat->nfree = count;
- return 0;
- }
- // TODO: this is not strict `reserve' because we only check #free.
- // Currently there's no way to track KeyTables that may be returned
- // to the pool in future.
- void bthread_keytable_pool_reserve(bthread_keytable_pool_t* pool,
- size_t nfree,
- bthread_key_t key,
- void* ctor(const void*),
- const void* ctor_args) {
- if (pool == NULL) {
- LOG(ERROR) << "Param[pool] is NULL";
- return;
- }
- bthread_keytable_pool_stat_t stat;
- if (bthread_keytable_pool_getstat(pool, &stat) != 0) {
- LOG(ERROR) << "Fail to getstat of pool=" << pool;
- return;
- }
- for (size_t i = stat.nfree; i < nfree; ++i) {
- bthread::KeyTable* kt = new (std::nothrow) bthread::KeyTable;
- if (kt == NULL) {
- break;
- }
- void* data = ctor(ctor_args);
- if (data) {
- kt->set_data(key, data);
- } // else append kt w/o data.
- std::unique_lock<pthread_mutex_t> mu(pool->mutex);
- if (pool->destroyed) {
- mu.unlock();
- delete kt;
- break;
- }
- kt->next = (bthread::KeyTable*)pool->free_keytables;
- pool->free_keytables = kt;
- if (data == NULL) {
- break;
- }
- }
- }
- int bthread_key_create2(bthread_key_t* key,
- void (*dtor)(void*, const void*),
- const void* dtor_args) __THROW {
- uint32_t index = 0;
- {
- BAIDU_SCOPED_LOCK(bthread::s_key_mutex);
- if (bthread::nfreekey > 0) {
- index = bthread::s_free_keys[--bthread::nfreekey];
- } else if (bthread::nkey < bthread::KEYS_MAX) {
- index = bthread::nkey++;
- } else {
- return EAGAIN; // what pthread_key_create returns in this case.
- }
- }
- bthread::s_key_info[index].dtor = dtor;
- bthread::s_key_info[index].dtor_args = dtor_args;
- key->index = index;
- key->version = bthread::s_key_info[index].version;
- if (key->version == 0) {
- ++bthread::s_key_info[index].version;
- ++key->version;
- }
- return 0;
- }
- int bthread_key_create(bthread_key_t* key, void (*dtor)(void*)) __THROW {
- if (dtor == NULL) {
- return bthread_key_create2(key, NULL, NULL);
- } else {
- return bthread_key_create2(key, bthread::arg_as_dtor, (const void*)dtor);
- }
- }
- int bthread_key_delete(bthread_key_t key) __THROW {
- if (key.index < bthread::KEYS_MAX &&
- key.version == bthread::s_key_info[key.index].version) {
- BAIDU_SCOPED_LOCK(bthread::s_key_mutex);
- if (key.version == bthread::s_key_info[key.index].version) {
- if (++bthread::s_key_info[key.index].version == 0) {
- ++bthread::s_key_info[key.index].version;
- }
- bthread::s_key_info[key.index].dtor = NULL;
- bthread::s_key_info[key.index].dtor_args = NULL;
- bthread::s_free_keys[bthread::nfreekey++] = key.index;
- return 0;
- }
- }
- CHECK(false) << "bthread_key_delete is called on invalid " << key;
- return EINVAL;
- }
- // NOTE: Can't borrow_keytable in bthread_setspecific, otherwise following
- // memory leak may occur:
- // -> bthread_getspecific fails to borrow_keytable and returns NULL.
- // -> bthread_setspecific succeeds to borrow_keytable and overwrites old data
- // at the position with newly created data, the old data is leaked.
- int bthread_setspecific(bthread_key_t key, void* data) __THROW {
- bthread::KeyTable* kt = bthread::tls_bls.keytable;
- if (NULL == kt) {
- kt = new (std::nothrow) bthread::KeyTable;
- if (NULL == kt) {
- return ENOMEM;
- }
- bthread::tls_bls.keytable = kt;
- bthread::TaskGroup* const g = bthread::tls_task_group;
- if (g) {
- g->current_task()->local_storage.keytable = kt;
- }
- if (!bthread::tls_ever_created_keytable) {
- bthread::tls_ever_created_keytable = true;
- CHECK_EQ(0, butil::thread_atexit(bthread::cleanup_pthread));
- }
- }
- return kt->set_data(key, data);
- }
- void* bthread_getspecific(bthread_key_t key) __THROW {
- bthread::KeyTable* kt = bthread::tls_bls.keytable;
- if (kt) {
- return kt->get_data(key);
- }
- bthread::TaskGroup* const g = bthread::tls_task_group;
- if (g) {
- bthread::TaskMeta* const task = g->current_task();
- kt = bthread::borrow_keytable(task->attr.keytable_pool);
- if (kt) {
- g->current_task()->local_storage.keytable = kt;
- bthread::tls_bls.keytable = kt;
- return kt->get_data(key);
- }
- }
- return NULL;
- }
- void bthread_assign_data(void* data) __THROW {
- bthread::tls_bls.assigned_data = data;
- bthread::TaskGroup* const g = bthread::tls_task_group;
- if (g) {
- g->current_task()->local_storage.assigned_data = data;
- }
- }
- void* bthread_get_assigned_data() __THROW {
- return bthread::tls_bls.assigned_data;
- }
- } // extern "C"
|