// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include #include #include #include #include #include #include "butil/compat.h" #include "butil/time.h" #include "butil/macros.h" #include "butil/errno.h" #include #include #include "bthread/bthread.h" #include "butil/atomicops.h" namespace { DEFINE_int32(thread_num, 1, "#pairs of threads doing ping pong"); DEFINE_bool(loop, false, "run until ctrl-C is pressed"); DEFINE_bool(use_futex, false, "use futex instead of pipe"); DEFINE_bool(use_butex, false, "use butex instead of pipe"); void ALLOW_UNUSED (*ignore_sigpipe)(int) = signal(SIGPIPE, SIG_IGN); volatile bool stop = false; void quit_handler(int) { stop = true; } struct BAIDU_CACHELINE_ALIGNMENT AlignedIntWrapper { int value; }; struct BAIDU_CACHELINE_ALIGNMENT PlayerArg { int read_fd; int write_fd; int* wait_addr; int* wake_addr; long counter; long wakeup; }; void* pipe_player(void* void_arg) { PlayerArg* arg = static_cast(void_arg); char dummy = '\0'; while (1) { ssize_t nr = read(arg->read_fd, &dummy, 1); if (nr <= 0) { if (nr == 0) { printf("[%" PRIu64 "] EOF\n", pthread_numeric_id()); break; } if (errno != EINTR) { printf("[%" PRIu64 "] bad read, %m\n", pthread_numeric_id()); break; } continue; } if (1L != write(arg->write_fd, &dummy, 1)) { printf("[%" PRIu64 "] bad write, %m\n", pthread_numeric_id()); break; } ++arg->counter; } return NULL; } static const int INITIAL_FUTEX_VALUE = 0; void* futex_player(void* void_arg) { PlayerArg* arg = static_cast(void_arg); int counter = INITIAL_FUTEX_VALUE; while (!stop) { int rc = bthread::futex_wait_private(arg->wait_addr, counter, NULL); ++counter; ++*arg->wake_addr; bthread::futex_wake_private(arg->wake_addr, 1); ++arg->counter; arg->wakeup += (rc == 0); } return NULL; } void* butex_player(void* void_arg) { PlayerArg* arg = static_cast(void_arg); int counter = INITIAL_FUTEX_VALUE; while (!stop) { int rc = bthread::butex_wait(arg->wait_addr, counter, NULL); ++counter; ++*arg->wake_addr; bthread::butex_wake(arg->wake_addr); ++arg->counter; arg->wakeup += (rc == 0); } return NULL; } TEST(PingPongTest, ping_pong) { signal(SIGINT, quit_handler); stop = false; PlayerArg* args[FLAGS_thread_num]; for (int i = 0; i < FLAGS_thread_num; ++i) { int pipe1[2]; int pipe2[2]; if (!FLAGS_use_futex && !FLAGS_use_butex) { ASSERT_EQ(0, pipe(pipe1)); ASSERT_EQ(0, pipe(pipe2)); } PlayerArg* arg1 = new PlayerArg; if (!FLAGS_use_futex && !FLAGS_use_butex) { arg1->read_fd = pipe1[0]; arg1->write_fd = pipe2[1]; } else if (FLAGS_use_futex) { AlignedIntWrapper* w1 = new AlignedIntWrapper; w1->value = INITIAL_FUTEX_VALUE; AlignedIntWrapper* w2 = new AlignedIntWrapper; w2->value = INITIAL_FUTEX_VALUE; arg1->wait_addr = &w1->value; arg1->wake_addr = &w2->value; } else if (FLAGS_use_butex) { arg1->wait_addr = bthread::butex_create_checked(); *arg1->wait_addr = INITIAL_FUTEX_VALUE; arg1->wake_addr = bthread::butex_create_checked(); *arg1->wake_addr = INITIAL_FUTEX_VALUE; } else { ASSERT_TRUE(false); } arg1->counter = 0; arg1->wakeup = 0; args[i] = arg1; PlayerArg* arg2 = new PlayerArg; if (!FLAGS_use_futex && !FLAGS_use_butex) { arg2->read_fd = pipe2[0]; arg2->write_fd = pipe1[1]; } else { arg2->wait_addr = arg1->wake_addr; arg2->wake_addr = arg1->wait_addr; } arg2->counter = 0; arg2->wakeup = 0; pthread_t th1, th2; bthread_t bth1, bth2; if (!FLAGS_use_futex && !FLAGS_use_butex) { ASSERT_EQ(0, pthread_create(&th1, NULL, pipe_player, arg1)); ASSERT_EQ(0, pthread_create(&th2, NULL, pipe_player, arg2)); } else if (FLAGS_use_futex) { ASSERT_EQ(0, pthread_create(&th1, NULL, futex_player, arg1)); ASSERT_EQ(0, pthread_create(&th2, NULL, futex_player, arg2)); } else if (FLAGS_use_butex) { ASSERT_EQ(0, bthread_start_background(&bth1, NULL, butex_player, arg1)); ASSERT_EQ(0, bthread_start_background(&bth2, NULL, butex_player, arg2)); } else { ASSERT_TRUE(false); } if (!FLAGS_use_futex && !FLAGS_use_butex) { // send the seed data. unsigned char seed = 255; ASSERT_EQ(1L, write(pipe1[1], &seed, 1)); } else if (FLAGS_use_futex) { ++*arg1->wait_addr; bthread::futex_wake_private(arg1->wait_addr, 1); } else if (FLAGS_use_butex) { ++*arg1->wait_addr; bthread::butex_wake(arg1->wait_addr); } else { ASSERT_TRUE(false); } } long last_counter = 0; long last_wakeup = 0; while (!stop) { butil::Timer tm; tm.start(); sleep(1); tm.stop(); long cur_counter = 0; long cur_wakeup = 0; for (int i = 0; i < FLAGS_thread_num; ++i) { cur_counter += args[i]->counter; cur_wakeup += args[i]->wakeup; } if (FLAGS_use_futex || FLAGS_use_butex) { printf("pingpong-ed %" PRId64 "/s, wakeup=%" PRId64 "/s\n", (cur_counter - last_counter) * 1000L / tm.m_elapsed(), (cur_wakeup - last_wakeup) * 1000L / tm.m_elapsed()); } else { printf("pingpong-ed %" PRId64 "/s\n", (cur_counter - last_counter) * 1000L / tm.m_elapsed()); } last_counter = cur_counter; last_wakeup = cur_wakeup; if (!FLAGS_loop) { break; } } stop = true; // Program quits, Let resource leak. } } // namespace