// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // brpc - A framework to host and access services throughout Baidu. // Date: Sun Jul 13 15:04:18 CST 2014 #include #include #include // #include #include "butil/gperftools_profiler.h" #include "butil/time.h" #include "butil/macros.h" #include "butil/fd_utility.h" #include "butil/fd_guard.h" #include "butil/unix_socket.h" #include "brpc/acceptor.h" #include "brpc/policy/hulu_pbrpc_protocol.h" void EmptyProcessHuluRequest(brpc::InputMessageBase* msg_base) { brpc::DestroyingPtr a(msg_base); } int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); brpc::Protocol dummy_protocol = { brpc::policy::ParseHuluMessage, brpc::SerializeRequestDefault, brpc::policy::PackHuluRequest, EmptyProcessHuluRequest, EmptyProcessHuluRequest, NULL, NULL, NULL, brpc::CONNECTION_TYPE_ALL, "dummy_hulu" }; EXPECT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol)); return RUN_ALL_TESTS(); } class MessengerTest : public ::testing::Test{ protected: MessengerTest(){ }; virtual ~MessengerTest(){}; virtual void SetUp() { }; virtual void TearDown() { }; }; #define USE_UNIX_DOMAIN_SOCKET 1 const size_t NEPOLL = 1; const size_t NCLIENT = 6; const size_t NMESSAGE = 1024; const size_t MESSAGE_SIZE = 32; inline uint32_t fmix32 ( uint32_t h ) { h ^= h >> 16; h *= 0x85ebca6b; h ^= h >> 13; h *= 0xc2b2ae35; h ^= h >> 16; return h; } volatile bool client_stop = false; struct BAIDU_CACHELINE_ALIGNMENT ClientMeta { size_t times; size_t bytes; }; butil::atomic client_index(0); void* client_thread(void* arg) { ClientMeta* m = (ClientMeta*)arg; size_t offset = 0; m->times = 0; m->bytes = 0; const size_t buf_cap = NMESSAGE * MESSAGE_SIZE; char* buf = (char*)malloc(buf_cap); for (size_t i = 0; i < NMESSAGE; ++i) { memcpy(buf + i * MESSAGE_SIZE, "HULU", 4); // HULU use host byte order directly... *(uint32_t*)(buf + i * MESSAGE_SIZE + 4) = MESSAGE_SIZE - 12; *(uint32_t*)(buf + i * MESSAGE_SIZE + 8) = 4; } #ifdef USE_UNIX_DOMAIN_SOCKET const size_t id = client_index.fetch_add(1); char socket_name[64]; snprintf(socket_name, sizeof(socket_name), "input_messenger.socket%lu", (id % NEPOLL)); butil::fd_guard fd(butil::unix_socket_connect(socket_name)); if (fd < 0) { PLOG(FATAL) << "Fail to connect to " << socket_name; return NULL; } #else butil::EndPoint point(butil::IP_ANY, 7878); butil::fd_guard fd(butil::tcp_connect(point, NULL)); if (fd < 0) { PLOG(FATAL) << "Fail to connect to " << point; return NULL; } #endif while (!client_stop) { ssize_t n; if (offset == 0) { n = write(fd, buf, buf_cap); } else { iovec v[2]; v[0].iov_base = buf + offset; v[0].iov_len = buf_cap - offset; v[1].iov_base = buf; v[1].iov_len = offset; n = writev(fd, v, 2); } if (n < 0) { if (errno != EINTR) { PLOG(FATAL) << "Fail to write fd=" << fd; return NULL; } } else { ++m->times; m->bytes += n; offset += n; if (offset >= buf_cap) { offset -= buf_cap; } } } return NULL; } TEST_F(MessengerTest, dispatch_tasks) { client_stop = false; brpc::Acceptor messenger[NEPOLL]; pthread_t cth[NCLIENT]; ClientMeta* cm[NCLIENT]; const brpc::InputMessageHandler pairs[] = { { brpc::policy::ParseHuluMessage, EmptyProcessHuluRequest, NULL, NULL, "dummy_hulu" } }; for (size_t i = 0; i < NEPOLL; ++i) { #ifdef USE_UNIX_DOMAIN_SOCKET char buf[64]; snprintf(buf, sizeof(buf), "input_messenger.socket%lu", i); int listening_fd = butil::unix_socket_listen(buf); #else int listening_fd = tcp_listen(butil::EndPoint(butil::IP_ANY, 7878)); #endif ASSERT_TRUE(listening_fd > 0); butil::make_non_blocking(listening_fd); ASSERT_EQ(0, messenger[i].AddHandler(pairs[0])); ASSERT_EQ(0, messenger[i].StartAccept(listening_fd, -1, NULL)); } for (size_t i = 0; i < NCLIENT; ++i) { cm[i] = new ClientMeta; cm[i]->times = 0; cm[i]->bytes = 0; ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i])); } sleep(1); LOG(INFO) << "Begin to profile... (5 seconds)"; ProfilerStart("input_messenger.prof"); size_t start_client_bytes = 0; for (size_t i = 0; i < NCLIENT; ++i) { start_client_bytes += cm[i]->bytes; } butil::Timer tm; tm.start(); sleep(5); tm.stop(); ProfilerStop(); LOG(INFO) << "End profiling"; client_stop = true; size_t client_bytes = 0; for (size_t i = 0; i < NCLIENT; ++i) { client_bytes += cm[i]->bytes; } LOG(INFO) << "client_tp=" << (client_bytes - start_client_bytes) / (double)tm.u_elapsed() << "MB/s client_msg=" << (client_bytes - start_client_bytes) * 1000000L / (MESSAGE_SIZE * tm.u_elapsed()) << "/s"; for (size_t i = 0; i < NCLIENT; ++i) { pthread_join(cth[i], NULL); printf("joined client %lu\n", i); } for (size_t i = 0; i < NEPOLL; ++i) { messenger[i].StopAccept(0); } sleep(1); LOG(WARNING) << "begin to exit!!!!"; }