123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- #include <gtest/gtest.h>
- #include "butil/atomicops.h"
- #include "butil/time.h"
- #include "butil/macros.h"
- #include "butil/logging.h"
- #include "bthread/butex.h"
- #include "bthread/task_control.h"
- #include "bthread/task_group.h"
- #include "bthread/bthread.h"
- #include "bthread/unstable.h"
- namespace bthread {
- extern butil::atomic<TaskControl*> g_task_control;
- inline TaskControl* get_task_control() {
- return g_task_control.load(butil::memory_order_consume);
- }
- } // namespace bthread
- namespace {
- TEST(ButexTest, wait_on_already_timedout_butex) {
- uint32_t* butex = bthread::butex_create_checked<uint32_t>();
- ASSERT_TRUE(butex);
- timespec now;
- ASSERT_EQ(0, clock_gettime(CLOCK_REALTIME, &now));
- *butex = 1;
- ASSERT_EQ(-1, bthread::butex_wait(butex, 1, &now));
- ASSERT_EQ(ETIMEDOUT, errno);
- }
- void* sleeper(void* arg) {
- bthread_usleep((uint64_t)arg);
- return NULL;
- }
- void* joiner(void* arg) {
- const long t1 = butil::gettimeofday_us();
- for (bthread_t* th = (bthread_t*)arg; *th; ++th) {
- if (0 != bthread_join(*th, NULL)) {
- LOG(FATAL) << "fail to join thread_" << th - (bthread_t*)arg;
- }
- long elp = butil::gettimeofday_us() - t1;
- EXPECT_LE(labs(elp - (th - (bthread_t*)arg + 1) * 100000L), 15000L)
- << "timeout when joining thread_" << th - (bthread_t*)arg;
- LOG(INFO) << "Joined thread " << *th << " at " << elp << "us ["
- << bthread_self() << "]";
- }
- for (bthread_t* th = (bthread_t*)arg; *th; ++th) {
- EXPECT_EQ(0, bthread_join(*th, NULL));
- }
- return NULL;
- }
// Layout probe used by the with_or_without_array_zero test: a single 64-bit
// field followed by a zero-length trailing array.
struct A {
    uint64_t a;
    // Zero-length array (a compiler extension, not standard C++).
    char dummy[0];
};
// Layout probe used by the with_or_without_array_zero test: the same 64-bit
// field as A, without any trailing array.
struct B {
    uint64_t a;
};
- TEST(ButexTest, with_or_without_array_zero) {
- ASSERT_EQ(sizeof(B), sizeof(A));
- }
- TEST(ButexTest, join) {
- const size_t N = 6;
- const size_t M = 6;
- bthread_t th[N+1];
- bthread_t jth[M];
- pthread_t pth[M];
- for (size_t i = 0; i < N; ++i) {
- bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
- ASSERT_EQ(0, bthread_start_urgent(
- &th[i], &attr, sleeper,
- (void*)(100000L/*100ms*/ * (i + 1))));
- }
- th[N] = 0; // joiner will join tids in `th' until seeing 0.
- for (size_t i = 0; i < M; ++i) {
- ASSERT_EQ(0, bthread_start_urgent(&jth[i], NULL, joiner, th));
- }
- for (size_t i = 0; i < M; ++i) {
- ASSERT_EQ(0, pthread_create(&pth[i], NULL, joiner, th));
- }
-
- for (size_t i = 0; i < M; ++i) {
- ASSERT_EQ(0, bthread_join(jth[i], NULL))
- << "i=" << i << " error=" << berror();
- }
- for (size_t i = 0; i < M; ++i) {
- ASSERT_EQ(0, pthread_join(pth[i], NULL));
- }
- }
- struct WaiterArg {
- int expected_result;
- int expected_value;
- butil::atomic<int> *butex;
- const timespec *ptimeout;
- };
- void* waiter(void* arg) {
- WaiterArg * wa = (WaiterArg*)arg;
- const long t1 = butil::gettimeofday_us();
- const int rc = bthread::butex_wait(
- wa->butex, wa->expected_value, wa->ptimeout);
- const long t2 = butil::gettimeofday_us();
- if (rc == 0) {
- EXPECT_EQ(wa->expected_result, 0) << bthread_self();
- } else {
- EXPECT_EQ(wa->expected_result, errno) << bthread_self();
- }
- LOG(INFO) << "after wait, time=" << (t2-t1) << "us";
- return NULL;
- }
- TEST(ButexTest, sanity) {
- const size_t N = 5;
- WaiterArg args[N * 4];
- pthread_t t1, t2;
- butil::atomic<int>* b1 =
- bthread::butex_create_checked<butil::atomic<int> >();
- ASSERT_TRUE(b1);
- bthread::butex_destroy(b1);
-
- b1 = bthread::butex_create_checked<butil::atomic<int> >();
- *b1 = 1;
- ASSERT_EQ(0, bthread::butex_wake(b1));
- WaiterArg *unmatched_arg = new WaiterArg;
- unmatched_arg->expected_value = *b1 + 1;
- unmatched_arg->expected_result = EWOULDBLOCK;
- unmatched_arg->butex = b1;
- unmatched_arg->ptimeout = NULL;
- pthread_create(&t2, NULL, waiter, unmatched_arg);
- bthread_t th;
- ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, unmatched_arg));
- const timespec abstime = butil::seconds_from_now(1);
- for (size_t i = 0; i < 4*N; ++i) {
- args[i].expected_value = *b1;
- args[i].butex = b1;
- if ((i % 2) == 0) {
- args[i].expected_result = 0;
- args[i].ptimeout = NULL;
- } else {
- args[i].expected_result = ETIMEDOUT;
- args[i].ptimeout = &abstime;
- }
- if (i < 2*N) {
- pthread_create(&t1, NULL, waiter, &args[i]);
- } else {
- ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, &args[i]));
- }
- }
-
- sleep(2);
- for (size_t i = 0; i < 2*N; ++i) {
- ASSERT_EQ(1, bthread::butex_wake(b1));
- }
- ASSERT_EQ(0, bthread::butex_wake(b1));
- sleep(1);
- bthread::butex_destroy(b1);
- }
// Argument bundle consumed by `wait_butex`. Member order matters: the tests
// brace-initialize it positionally.
struct ButexWaitArg {
    // The butex to wait on.
    int* butex;
    // Value passed to butex_wait as the expected content of the butex.
    int expected_val;
    // Relative timeout of the wait, in milliseconds.
    long wait_msec;
    // 0 when the wait is expected to succeed, otherwise the errno expected
    // after the wait fails.
    int error_code;
};
- void* wait_butex(void* void_arg) {
- ButexWaitArg* arg = static_cast<ButexWaitArg*>(void_arg);
- const timespec ts = butil::milliseconds_from_now(arg->wait_msec);
- int rc = bthread::butex_wait(arg->butex, arg->expected_val, &ts);
- int saved_errno = errno;
- if (arg->error_code) {
- EXPECT_EQ(-1, rc);
- EXPECT_EQ(arg->error_code, saved_errno);
- } else {
- EXPECT_EQ(0, rc);
- }
- return NULL;
- }
- TEST(ButexTest, wait_without_stop) {
- int* butex = bthread::butex_create_checked<int>();
- *butex = 7;
- butil::Timer tm;
- const long WAIT_MSEC = 500;
- for (int i = 0; i < 2; ++i) {
- const bthread_attr_t attr =
- (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
- ButexWaitArg arg = { butex, *butex, WAIT_MSEC, ETIMEDOUT };
- bthread_t th;
-
- tm.start();
- ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg));
- ASSERT_EQ(0, bthread_join(th, NULL));
- tm.stop();
-
- ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 250);
- }
- bthread::butex_destroy(butex);
- }
- TEST(ButexTest, stop_after_running) {
- int* butex = bthread::butex_create_checked<int>();
- *butex = 7;
- butil::Timer tm;
- const long WAIT_MSEC = 500;
- const long SLEEP_MSEC = 10;
- for (int i = 0; i < 2; ++i) {
- const bthread_attr_t attr =
- (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
- bthread_t th;
- ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
- tm.start();
- ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg));
- ASSERT_EQ(0, bthread_usleep(SLEEP_MSEC * 1000L));
- ASSERT_EQ(0, bthread_stop(th));
- ASSERT_EQ(0, bthread_join(th, NULL));
- tm.stop();
- ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 25);
- // ASSERT_TRUE(bthread::get_task_control()->
- // timer_thread()._idset.empty());
- ASSERT_EQ(EINVAL, bthread_stop(th));
- }
- bthread::butex_destroy(butex);
- }
- TEST(ButexTest, stop_before_running) {
- int* butex = bthread::butex_create_checked<int>();
- *butex = 7;
- butil::Timer tm;
- const long WAIT_MSEC = 500;
- for (int i = 0; i < 2; ++i) {
- const bthread_attr_t attr =
- (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
- bthread_t th;
- ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
-
- tm.start();
- ASSERT_EQ(0, bthread_start_background(&th, &attr, wait_butex, &arg));
- ASSERT_EQ(0, bthread_stop(th));
- bthread_flush();
- ASSERT_EQ(0, bthread_join(th, NULL));
- tm.stop();
-
- ASSERT_LT(tm.m_elapsed(), 5);
- // ASSERT_TRUE(bthread::get_task_control()->
- // timer_thread()._idset.empty());
- ASSERT_EQ(EINVAL, bthread_stop(th));
- }
- bthread::butex_destroy(butex);
- }
- void* join_the_waiter(void* arg) {
- EXPECT_EQ(0, bthread_join((bthread_t)arg, NULL));
- return NULL;
- }
- TEST(ButexTest, join_cant_be_wakeup) {
- const long WAIT_MSEC = 100;
- int* butex = bthread::butex_create_checked<int>();
- *butex = 7;
- butil::Timer tm;
- ButexWaitArg arg = { butex, *butex, 1000, EINTR };
- for (int i = 0; i < 2; ++i) {
- const bthread_attr_t attr =
- (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
- tm.start();
- bthread_t th, th2;
- ASSERT_EQ(0, bthread_start_urgent(&th, NULL, wait_butex, &arg));
- ASSERT_EQ(0, bthread_start_urgent(&th2, &attr, join_the_waiter, (void*)th));
- ASSERT_EQ(0, bthread_stop(th2));
- ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L));
- ASSERT_TRUE(bthread::TaskGroup::exists(th));
- ASSERT_TRUE(bthread::TaskGroup::exists(th2));
- ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L));
- ASSERT_EQ(0, bthread_stop(th));
- ASSERT_EQ(0, bthread_join(th2, NULL));
- ASSERT_EQ(0, bthread_join(th, NULL));
- tm.stop();
- ASSERT_LT(tm.m_elapsed(), WAIT_MSEC + 15);
- ASSERT_EQ(EINVAL, bthread_stop(th));
- ASSERT_EQ(EINVAL, bthread_stop(th2));
- }
- bthread::butex_destroy(butex);
- }
- TEST(ButexTest, stop_after_slept) {
- butil::Timer tm;
- const long SLEEP_MSEC = 100;
- const long WAIT_MSEC = 10;
-
- for (int i = 0; i < 2; ++i) {
- const bthread_attr_t attr =
- (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
- tm.start();
- bthread_t th;
- ASSERT_EQ(0, bthread_start_urgent(
- &th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L)));
- ASSERT_EQ(0, bthread_usleep(WAIT_MSEC * 1000L));
- ASSERT_EQ(0, bthread_stop(th));
- ASSERT_EQ(0, bthread_join(th, NULL));
- tm.stop();
- if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
- ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15);
- } else {
- ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 15);
- }
- // ASSERT_TRUE(bthread::get_task_control()->
- // timer_thread()._idset.empty());
- ASSERT_EQ(EINVAL, bthread_stop(th));
- }
- }
- TEST(ButexTest, stop_just_when_sleeping) {
- butil::Timer tm;
- const long SLEEP_MSEC = 100;
-
- for (int i = 0; i < 2; ++i) {
- const bthread_attr_t attr =
- (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
- tm.start();
- bthread_t th;
- ASSERT_EQ(0, bthread_start_urgent(
- &th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L)));
- ASSERT_EQ(0, bthread_stop(th));
- ASSERT_EQ(0, bthread_join(th, NULL));
- tm.stop();
- if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
- ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15);
- } else {
- ASSERT_LT(tm.m_elapsed(), 15);
- }
- // ASSERT_TRUE(bthread::get_task_control()->
- // timer_thread()._idset.empty());
- ASSERT_EQ(EINVAL, bthread_stop(th));
- }
- }
- TEST(ButexTest, stop_before_sleeping) {
- butil::Timer tm;
- const long SLEEP_MSEC = 100;
- for (int i = 0; i < 2; ++i) {
- bthread_t th;
- const bthread_attr_t attr =
- (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
-
- tm.start();
- ASSERT_EQ(0, bthread_start_background(&th, &attr, sleeper,
- (void*)(SLEEP_MSEC*1000L)));
- ASSERT_EQ(0, bthread_stop(th));
- bthread_flush();
- ASSERT_EQ(0, bthread_join(th, NULL));
- tm.stop();
- if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
- ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 10);
- } else {
- ASSERT_LT(tm.m_elapsed(), 10);
- }
- // ASSERT_TRUE(bthread::get_task_control()->
- // timer_thread()._idset.empty());
- ASSERT_EQ(EINVAL, bthread_stop(th));
- }
- }
- } // namespace
|