123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- #include <map>
- #include <gtest/gtest.h>
- #include "butil/atomicops.h"
- #include "butil/time.h"
- #include "butil/macros.h"
- #include "butil/scoped_lock.h"
- #include "butil/gperftools_profiler.h"
- #include "bthread/bthread.h"
- #include "bthread/condition_variable.h"
- #include "bthread/stack.h"
- namespace {
- struct Arg {
- bthread_mutex_t m;
- bthread_cond_t c;
- };
- pthread_mutex_t wake_mutex = PTHREAD_MUTEX_INITIALIZER;
- long signal_start_time = 0;
- std::vector<bthread_t> wake_tid;
- std::vector<long> wake_time;
- volatile bool stop = false;
- const long SIGNAL_INTERVAL_US = 10000;
- void* signaler(void* void_arg) {
- Arg* a = (Arg*)void_arg;
- signal_start_time = butil::gettimeofday_us();
- while (!stop) {
- bthread_usleep(SIGNAL_INTERVAL_US);
- bthread_cond_signal(&a->c);
- }
- return NULL;
- }
- void* waiter(void* void_arg) {
- Arg* a = (Arg*)void_arg;
- bthread_mutex_lock(&a->m);
- while (!stop) {
- bthread_cond_wait(&a->c, &a->m);
-
- BAIDU_SCOPED_LOCK(wake_mutex);
- wake_tid.push_back(bthread_self());
- wake_time.push_back(butil::gettimeofday_us());
- }
- bthread_mutex_unlock(&a->m);
- return NULL;
- }
- TEST(CondTest, sanity) {
- Arg a;
- ASSERT_EQ(0, bthread_mutex_init(&a.m, NULL));
- ASSERT_EQ(0, bthread_cond_init(&a.c, NULL));
- // has no effect
- ASSERT_EQ(0, bthread_cond_signal(&a.c));
- stop = false;
- wake_tid.resize(1024);
- wake_tid.clear();
- wake_time.resize(1024);
- wake_time.clear();
-
- bthread_t wth[8];
- const size_t NW = ARRAY_SIZE(wth);
- for (size_t i = 0; i < NW; ++i) {
- ASSERT_EQ(0, bthread_start_urgent(&wth[i], NULL, waiter, &a));
- }
-
- bthread_t sth;
- ASSERT_EQ(0, bthread_start_urgent(&sth, NULL, signaler, &a));
- bthread_usleep(SIGNAL_INTERVAL_US * 200);
- pthread_mutex_lock(&wake_mutex);
- const size_t nbeforestop = wake_time.size();
- pthread_mutex_unlock(&wake_mutex);
-
- stop = true;
- for (size_t i = 0; i < NW; ++i) {
- bthread_cond_signal(&a.c);
- }
-
- bthread_join(sth, NULL);
- for (size_t i = 0; i < NW; ++i) {
- bthread_join(wth[i], NULL);
- }
- printf("wake up for %lu times\n", wake_tid.size());
- // Check timing
- long square_sum = 0;
- for (size_t i = 0; i < nbeforestop; ++i) {
- long last_time = (i ? wake_time[i-1] : signal_start_time);
- long delta = wake_time[i] - last_time - SIGNAL_INTERVAL_US;
- EXPECT_GT(wake_time[i], last_time);
- square_sum += delta * delta;
- EXPECT_LT(labs(delta), 10000L) << "error[" << i << "]=" << delta << "="
- << wake_time[i] << " - " << last_time;
- }
- printf("Average error is %fus\n", sqrt(square_sum / std::max(nbeforestop, 1UL)));
- // Check fairness
- std::map<bthread_t, int> count;
- for (size_t i = 0; i < wake_tid.size(); ++i) {
- ++count[wake_tid[i]];
- }
- EXPECT_EQ(NW, count.size());
- int avg_count = (int)(wake_tid.size() / count.size());
- for (std::map<bthread_t, int>::iterator
- it = count.begin(); it != count.end(); ++it) {
- ASSERT_LE(abs(it->second - avg_count), 1)
- << "bthread=" << it->first
- << " count=" << it->second
- << " avg=" << avg_count;
- printf("%" PRId64 " wakes up %d times\n", it->first, it->second);
- }
- bthread_cond_destroy(&a.c);
- bthread_mutex_destroy(&a.m);
- }
- struct WrapperArg {
- bthread::Mutex mutex;
- bthread::ConditionVariable cond;
- };
- void* cv_signaler(void* void_arg) {
- WrapperArg* a = (WrapperArg*)void_arg;
- signal_start_time = butil::gettimeofday_us();
- while (!stop) {
- bthread_usleep(SIGNAL_INTERVAL_US);
- a->cond.notify_one();
- }
- return NULL;
- }
- void* cv_bmutex_waiter(void* void_arg) {
- WrapperArg* a = (WrapperArg*)void_arg;
- std::unique_lock<bthread_mutex_t> lck(*a->mutex.native_handler());
- while (!stop) {
- a->cond.wait(lck);
- }
- return NULL;
- }
- void* cv_mutex_waiter(void* void_arg) {
- WrapperArg* a = (WrapperArg*)void_arg;
- std::unique_lock<bthread::Mutex> lck(a->mutex);
- while (!stop) {
- a->cond.wait(lck);
- }
- return NULL;
- }
- #define COND_IN_PTHREAD
- #ifndef COND_IN_PTHREAD
- #define pthread_join bthread_join
- #define pthread_create bthread_start_urgent
- #endif
- TEST(CondTest, cpp_wrapper) {
- stop = false;
- bthread::ConditionVariable cond;
- pthread_t bmutex_waiter_threads[8];
- pthread_t mutex_waiter_threads[8];
- pthread_t signal_thread;
- WrapperArg a;
- for (size_t i = 0; i < ARRAY_SIZE(bmutex_waiter_threads); ++i) {
- ASSERT_EQ(0, pthread_create(&bmutex_waiter_threads[i], NULL,
- cv_bmutex_waiter, &a));
- ASSERT_EQ(0, pthread_create(&mutex_waiter_threads[i], NULL,
- cv_mutex_waiter, &a));
- }
- ASSERT_EQ(0, pthread_create(&signal_thread, NULL, cv_signaler, &a));
- bthread_usleep(100L * 1000);
- {
- BAIDU_SCOPED_LOCK(a.mutex);
- stop = true;
- }
- pthread_join(signal_thread, NULL);
- a.cond.notify_all();
- for (size_t i = 0; i < ARRAY_SIZE(bmutex_waiter_threads); ++i) {
- pthread_join(bmutex_waiter_threads[i], NULL);
- pthread_join(mutex_waiter_threads[i], NULL);
- }
- }
- #ifndef COND_IN_PTHREAD
- #undef pthread_join
- #undef pthread_create
- #endif
- class Signal {
- protected:
- Signal() : _signal(0) {}
- void notify() {
- BAIDU_SCOPED_LOCK(_m);
- ++_signal;
- _c.notify_one();
- }
- int wait(int old_signal) {
- std::unique_lock<bthread::Mutex> lck(_m);
- while (_signal == old_signal) {
- _c.wait(lck);
- }
- return _signal;
- }
- private:
- bthread::Mutex _m;
- bthread::ConditionVariable _c;
- int _signal;
- };
- struct PingPongArg {
- bool stopped;
- Signal sig1;
- Signal sig2;
- butil::atomic<int> nthread;
- butil::atomic<long> total_count;
- };
- void *ping_pong_thread(void* arg) {
- PingPongArg* a = (PingPongArg*)arg;
- long local_count = 0;
- bool odd = (a->nthread.fetch_add(1)) % 2;
- int old_signal = 0;
- while (!a->stopped) {
- if (odd) {
- a->sig1.notify();
- old_signal = a->sig2.wait(old_signal);
- } else {
- old_signal = a->sig1.wait(old_signal);
- a->sig2.notify();
- }
- ++local_count;
- }
- a->total_count.fetch_add(local_count);
- return NULL;
- }
- TEST(CondTest, ping_pong) {
- PingPongArg arg;
- arg.stopped = false;
- arg.nthread = 0;
- bthread_t threads[2];
- ProfilerStart("cond.prof");
- for (int i = 0; i < 2; ++i) {
- ASSERT_EQ(0, bthread_start_urgent(&threads[i], NULL, ping_pong_thread, &arg));
- }
- usleep(1000 * 1000);
- arg.stopped = true;
- arg.sig1.notify();
- arg.sig2.notify();
- for (int i = 0; i < 2; ++i) {
- ASSERT_EQ(0, bthread_join(threads[i], NULL));
- }
- ProfilerStop();
- LOG(INFO) << "total_count=" << arg.total_count.load();
- }
- struct BroadcastArg {
- bthread::ConditionVariable wait_cond;
- bthread::ConditionVariable broadcast_cond;
- bthread::Mutex mutex;
- int nwaiter;
- int cur_waiter;
- int rounds;
- int sig;
- };
- void* wait_thread(void* arg) {
- BroadcastArg* ba = (BroadcastArg*)arg;
- std::unique_lock<bthread::Mutex> lck(ba->mutex);
- while (ba->rounds > 0) {
- const int saved_round = ba->rounds;
- ++ba->cur_waiter;
- while (saved_round == ba->rounds) {
- if (ba->cur_waiter >= ba->nwaiter) {
- ba->broadcast_cond.notify_one();
- }
- ba->wait_cond.wait(lck);
- }
- }
- return NULL;
- }
- void* broadcast_thread(void* arg) {
- BroadcastArg* ba = (BroadcastArg*)arg;
- //int local_round = 0;
- while (ba->rounds > 0) {
- std::unique_lock<bthread::Mutex> lck(ba->mutex);
- while (ba->cur_waiter < ba->nwaiter) {
- ba->broadcast_cond.wait(lck);
- }
- ba->cur_waiter = 0;
- --ba->rounds;
- ba->wait_cond.notify_all();
- }
- return NULL;
- }
- void* disturb_thread(void* arg) {
- BroadcastArg* ba = (BroadcastArg*)arg;
- std::unique_lock<bthread::Mutex> lck(ba->mutex);
- while (ba->rounds > 0) {
- lck.unlock();
- lck.lock();
- }
- return NULL;
- }
- TEST(CondTest, mixed_usage) {
- BroadcastArg ba;
- ba.nwaiter = 0;
- ba.cur_waiter = 0;
- ba.rounds = 30000;
- const int NTHREADS = 10;
- ba.nwaiter = NTHREADS * 2;
- bthread_t normal_threads[NTHREADS];
- for (int i = 0; i < NTHREADS; ++i) {
- ASSERT_EQ(0, bthread_start_urgent(&normal_threads[i], NULL, wait_thread, &ba));
- }
- pthread_t pthreads[NTHREADS];
- for (int i = 0; i < NTHREADS; ++i) {
- ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
- wait_thread, &ba));
- }
- pthread_t broadcast;
- pthread_t disturb;
- ASSERT_EQ(0, pthread_create(&broadcast, NULL, broadcast_thread, &ba));
- ASSERT_EQ(0, pthread_create(&disturb, NULL, disturb_thread, &ba));
- for (int i = 0; i < NTHREADS; ++i) {
- bthread_join(normal_threads[i], NULL);
- pthread_join(pthreads[i], NULL);
- }
- pthread_join(broadcast, NULL);
- pthread_join(disturb, NULL);
- }
- class BthreadCond {
- public:
- BthreadCond() {
- bthread_cond_init(&_cond, NULL);
- bthread_mutex_init(&_mutex, NULL);
- _count = 1;
- }
- ~BthreadCond() {
- bthread_mutex_destroy(&_mutex);
- bthread_cond_destroy(&_cond);
- }
- void Init(int count = 1) {
- _count = count;
- }
- int Signal() {
- int ret = 0;
- bthread_mutex_lock(&_mutex);
- _count --;
- bthread_cond_signal(&_cond);
- bthread_mutex_unlock(&_mutex);
- return ret;
- }
- int Wait() {
- int ret = 0;
- bthread_mutex_lock(&_mutex);
- while (_count > 0) {
- ret = bthread_cond_wait(&_cond, &_mutex);
- }
- bthread_mutex_unlock(&_mutex);
- return ret;
- }
- private:
- int _count;
- bthread_cond_t _cond;
- bthread_mutex_t _mutex;
- };
- volatile bool g_stop = false;
- bool started_wait = false;
- bool ended_wait = false;
- void* usleep_thread(void *) {
- while (!g_stop) {
- bthread_usleep(1000L * 1000L);
- }
- return NULL;
- }
- void* wait_cond_thread(void* arg) {
- BthreadCond* c = (BthreadCond*)arg;
- started_wait = true;
- c->Wait();
- ended_wait = true;
- return NULL;
- }
- static void launch_many_bthreads() {
- g_stop = false;
- bthread_t tid;
- BthreadCond c;
- c.Init();
- butil::Timer tm;
- bthread_start_urgent(&tid, &BTHREAD_ATTR_PTHREAD, wait_cond_thread, &c);
- std::vector<bthread_t> tids;
- tids.reserve(32768);
- tm.start();
- for (size_t i = 0; i < 32768; ++i) {
- bthread_t t0;
- ASSERT_EQ(0, bthread_start_background(&t0, NULL, usleep_thread, NULL));
- tids.push_back(t0);
- }
- tm.stop();
- LOG(INFO) << "Creating bthreads took " << tm.u_elapsed() << " us";
- usleep(3 * 1000 * 1000L);
- c.Signal();
- g_stop = true;
- bthread_join(tid, NULL);
- for (size_t i = 0; i < tids.size(); ++i) {
- LOG_EVERY_SECOND(INFO) << "Joined " << i << " threads";
- bthread_join(tids[i], NULL);
- }
- LOG_EVERY_SECOND(INFO) << "Joined " << tids.size() << " threads";
- }
- TEST(CondTest, too_many_bthreads_from_pthread) {
- launch_many_bthreads();
- }
- static void* run_launch_many_bthreads(void*) {
- launch_many_bthreads();
- return NULL;
- }
- TEST(CondTest, too_many_bthreads_from_bthread) {
- bthread_t th;
- ASSERT_EQ(0, bthread_start_urgent(&th, NULL, run_launch_many_bthreads, NULL));
- bthread_join(th, NULL);
- }
- } // namespace
|