12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- // brpc - A framework to host and access services throughout Baidu.
- // Date: Sun Jul 13 15:04:18 CST 2014
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <fcntl.h> // F_GETFD
- #include <gtest/gtest.h>
- #include <gflags/gflags.h>
- #include "butil/gperftools_profiler.h"
- #include "butil/time.h"
- #include "butil/macros.h"
- #include "butil/fd_utility.h"
- #include "bthread/unstable.h"
- #include "bthread/task_control.h"
- #include "brpc/socket.h"
- #include "brpc/errno.pb.h"
- #include "brpc/acceptor.h"
- #include "brpc/policy/hulu_pbrpc_protocol.h"
- #include "brpc/policy/most_common_message.h"
- #include "brpc/policy/http_rpc_protocol.h"
- #include "brpc/nshead.h"
- #include "brpc/server.h"
- #include "brpc/channel.h"
- #include "brpc/controller.h"
- #include "health_check.pb.h"
- #if defined(OS_MACOSX)
- #include <sys/event.h>
- #endif
- #define CONNECT_IN_KEEPWRITE 1;
- namespace bthread {
- extern TaskControl* g_task_control;
- }
- namespace brpc {
- DECLARE_int32(health_check_interval);
- }
- void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base);
- int main(int argc, char* argv[]) {
- testing::InitGoogleTest(&argc, argv);
- GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
- brpc::Protocol dummy_protocol =
- { brpc::policy::ParseHuluMessage,
- brpc::SerializeRequestDefault,
- brpc::policy::PackHuluRequest,
- EchoProcessHuluRequest, EchoProcessHuluRequest,
- NULL, NULL, NULL,
- brpc::CONNECTION_TYPE_ALL, "dummy_hulu" };
- EXPECT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol));
- return RUN_ALL_TESTS();
- }
- struct WaitData {
- bthread_id_t id;
- int error_code;
- std::string error_text;
- WaitData() : id(INVALID_BTHREAD_ID), error_code(0) {}
- };
- int OnWaitIdReset(bthread_id_t id, void* data, int error_code,
- const std::string& error_text) {
- static_cast<WaitData*>(data)->id = id;
- static_cast<WaitData*>(data)->error_code = error_code;
- static_cast<WaitData*>(data)->error_text = error_text;
- return bthread_id_unlock_and_destroy(id);
- }
- class SocketTest : public ::testing::Test{
- protected:
- SocketTest(){
- };
- virtual ~SocketTest(){};
- virtual void SetUp() {
- };
- virtual void TearDown() {
- };
- };
- brpc::Socket* global_sock = NULL;
- class CheckRecycle : public brpc::SocketUser {
- void BeforeRecycle(brpc::Socket* s) {
- ASSERT_TRUE(global_sock);
- ASSERT_EQ(global_sock, s);
- global_sock = NULL;
- delete this;
- }
- };
- TEST_F(SocketTest, not_recycle_until_zero_nref) {
- std::cout << "sizeof(Socket)=" << sizeof(brpc::Socket) << std::endl;
- int fds[2];
- ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
- brpc::SocketId id = 8888;
- butil::EndPoint dummy;
- ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
- brpc::SocketOptions options;
- options.fd = fds[1];
- options.remote_side = dummy;
- options.user = new CheckRecycle;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- {
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(fds[1], s->fd());
- ASSERT_EQ(dummy, s->remote_side());
- ASSERT_EQ(id, s->id());
- ASSERT_EQ(0, s->SetFailed());
- ASSERT_EQ(s.get(), global_sock);
- }
- ASSERT_EQ((brpc::Socket*)NULL, global_sock);
- close(fds[0]);
- brpc::SocketUniquePtr ptr;
- ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
- }
- butil::atomic<int> winner_count(0);
- const int AUTH_ERR = -9;
- void* auth_fighter(void* arg) {
- bthread_usleep(10000);
- int auth_error = 0;
- brpc::Socket* s = (brpc::Socket*)arg;
- if (s->FightAuthentication(&auth_error) == 0) {
- winner_count.fetch_add(1);
- s->SetAuthentication(AUTH_ERR);
- } else {
- EXPECT_EQ(AUTH_ERR, auth_error);
- }
- return NULL;
- }
- TEST_F(SocketTest, authentication) {
- brpc::SocketId id;
- brpc::SocketOptions options;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
-
- bthread_t th[64];
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- ASSERT_EQ(0, bthread_start_urgent(&th[i], NULL, auth_fighter, s.get()));
- }
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- ASSERT_EQ(0, bthread_join(th[i], NULL));
- }
- // Only one fighter wins
- ASSERT_EQ(1, winner_count.load());
- // Fight after signal is OK
- int auth_error = 0;
- ASSERT_NE(0, s->FightAuthentication(&auth_error));
- ASSERT_EQ(AUTH_ERR, auth_error);
- // Socket has been `SetFailed' when authentication failed
- ASSERT_TRUE(brpc::Socket::Address(s->id(), NULL));
- }
- static butil::atomic<int> g_called_seq(1);
- class MyMessage : public brpc::SocketMessage {
- public:
- MyMessage(const char* str, size_t len, int* called = NULL)
- : _str(str), _len(len), _called(called) {}
- private:
- butil::Status AppendAndDestroySelf(butil::IOBuf* out_buf, brpc::Socket*) {
- out_buf->append(_str, _len);
- if (_called) {
- *_called = g_called_seq.fetch_add(1, butil::memory_order_relaxed);
- }
- delete this;
- return butil::Status::OK();
- };
- const char* _str;
- size_t _len;
- int* _called;
- };
- class MyErrorMessage : public brpc::SocketMessage {
- public:
- explicit MyErrorMessage(const butil::Status& st) : _status(st) {}
- private:
- butil::Status AppendAndDestroySelf(butil::IOBuf*, brpc::Socket*) {
- return _status;
- };
- butil::Status _status;
- };
- TEST_F(SocketTest, single_threaded_write) {
- int fds[2];
- ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
- brpc::SocketId id = 8888;
- butil::EndPoint dummy;
- ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
- brpc::SocketOptions options;
- options.fd = fds[1];
- options.remote_side = dummy;
- options.user = new CheckRecycle;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- {
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(fds[1], s->fd());
- ASSERT_EQ(dummy, s->remote_side());
- ASSERT_EQ(id, s->id());
- const int BATCH = 5;
- for (size_t i = 0; i < 20; ++i) {
- char buf[32 * BATCH];
- size_t len = snprintf(buf, sizeof(buf), "hello world! %lu", i);
- if (i % 4 == 0) {
- brpc::SocketMessagePtr<MyMessage> msg(new MyMessage(buf, len));
- ASSERT_EQ(0, s->Write(msg));
- } else if (i % 4 == 1) {
- brpc::SocketMessagePtr<MyErrorMessage> msg(
- new MyErrorMessage(butil::Status(EINVAL, "Invalid input")));
- bthread_id_t wait_id;
- WaitData data;
- ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset));
- brpc::Socket::WriteOptions wopt;
- wopt.id_wait = wait_id;
- ASSERT_EQ(0, s->Write(msg, &wopt));
- ASSERT_EQ(0, bthread_id_join(wait_id));
- ASSERT_EQ(wait_id.value, data.id.value);
- ASSERT_EQ(EINVAL, data.error_code);
- ASSERT_EQ("Invalid input", data.error_text);
- continue;
- } else if (i % 4 == 2) {
- int seq[BATCH] = {};
- brpc::SocketMessagePtr<MyMessage> msgs[BATCH];
- // re-print the buffer.
- len = 0;
- for (int j = 0; j < BATCH; ++j) {
- if (j % 2 == 0) {
- // Empty message, should be skipped.
- msgs[j].reset(new MyMessage(buf+len, 0, &seq[j]));
- } else {
- size_t sub_len = snprintf(
- buf+len, sizeof(buf)-len, "hello world! %lu.%d", i, j);
- msgs[j].reset(new MyMessage(buf+len, sub_len, &seq[j]));
- len += sub_len;
- }
- }
- for (size_t i = 0; i < BATCH; ++i) {
- ASSERT_EQ(0, s->Write(msgs[i]));
- }
- for (int j = 1; j < BATCH; ++j) {
- ASSERT_LT(seq[j-1], seq[j]) << "j=" << j;
- }
- } else {
- butil::IOBuf src;
- src.append(buf);
- ASSERT_EQ(len, src.length());
- ASSERT_EQ(0, s->Write(&src));
- ASSERT_TRUE(src.empty());
- }
- char dest[sizeof(buf)];
- ASSERT_EQ(len, (size_t)read(fds[0], dest, sizeof(dest)));
- ASSERT_EQ(0, memcmp(buf, dest, len));
- }
- ASSERT_EQ(0, s->SetFailed());
- }
- ASSERT_EQ((brpc::Socket*)NULL, global_sock);
- close(fds[0]);
- }
- void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base) {
- brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg(
- static_cast<brpc::policy::MostCommonMessage*>(msg_base));
- butil::IOBuf buf;
- buf.append(msg->meta);
- buf.append(msg->payload);
- ASSERT_EQ(0, msg->socket()->Write(&buf));
- }
- class MyConnect : public brpc::AppConnect {
- public:
- MyConnect() : _done(NULL), _data(NULL), _called_start_connect(false) {}
- void StartConnect(const brpc::Socket*,
- void (*done)(int err, void* data),
- void* data) {
- LOG(INFO) << "Start application-level connect";
- _done = done;
- _data = data;
- _called_start_connect = true;
- }
- void StopConnect(brpc::Socket*) {
- LOG(INFO) << "Stop application-level connect";
- }
- void MakeConnectDone() {
- _done(0, _data);
- }
- bool is_start_connect_called() const { return _called_start_connect; }
- private:
- void (*_done)(int err, void* data);
- void* _data;
- bool _called_start_connect;
- };
- TEST_F(SocketTest, single_threaded_connect_and_write) {
- // FIXME(gejun): Messenger has to be new otherwise quitting may crash.
- brpc::Acceptor* messenger = new brpc::Acceptor;
- const brpc::InputMessageHandler pairs[] = {
- { brpc::policy::ParseHuluMessage,
- EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
- };
- butil::EndPoint point(butil::IP_ANY, 7878);
- int listening_fd = tcp_listen(point);
- ASSERT_TRUE(listening_fd > 0);
- butil::make_non_blocking(listening_fd);
- ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
- ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL));
- brpc::SocketId id = 8888;
- brpc::SocketOptions options;
- options.remote_side = point;
- std::shared_ptr<MyConnect> my_connect = std::make_shared<MyConnect>();
- options.app_connect = my_connect;
- options.user = new CheckRecycle;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- {
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(-1, s->fd());
- ASSERT_EQ(point, s->remote_side());
- ASSERT_EQ(id, s->id());
- for (size_t i = 0; i < 20; ++i) {
- char buf[64];
- const size_t meta_len = 4;
- *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta";
- const size_t len = snprintf(buf + 12 + meta_len,
- sizeof(buf) - 12 - meta_len,
- "hello world! %lu", i);
- memcpy(buf, "HULU", 4);
- // HULU uses host byte order directly...
- *(uint32_t*)(buf + 4) = len + meta_len;
- *(uint32_t*)(buf + 8) = meta_len;
- int called = 0;
- if (i % 2 == 0) {
- brpc::SocketMessagePtr<MyMessage> msg(
- new MyMessage(buf, 12 + meta_len + len, &called));
- ASSERT_EQ(0, s->Write(msg));
- } else {
- butil::IOBuf src;
- src.append(buf, 12 + meta_len + len);
- ASSERT_EQ(12 + meta_len + len, src.length());
- ASSERT_EQ(0, s->Write(&src));
- ASSERT_TRUE(src.empty());
- }
- if (i == 0) {
- // connection needs to be established at first time.
- // Should be intentionally blocked in app_connect.
- bthread_usleep(10000);
- ASSERT_TRUE(my_connect->is_start_connect_called());
- ASSERT_LT(0, s->fd()); // already tcp connected
- ASSERT_EQ(0, called); // request is not serialized yet.
- my_connect->MakeConnectDone();
- ASSERT_LT(0, called); // serialized
- }
- int64_t start_time = butil::gettimeofday_us();
- while (s->fd() < 0) {
- bthread_usleep(1000);
- ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!";
- }
- #if defined(OS_LINUX)
- ASSERT_EQ(0, bthread_fd_wait(s->fd(), EPOLLIN));
- #elif defined(OS_MACOSX)
- ASSERT_EQ(0, bthread_fd_wait(s->fd(), EVFILT_READ));
- #endif
- char dest[sizeof(buf)];
- ASSERT_EQ(meta_len + len, (size_t)read(s->fd(), dest, sizeof(dest)));
- ASSERT_EQ(0, memcmp(buf + 12, dest, meta_len + len));
- }
- ASSERT_EQ(0, s->SetFailed());
- }
- ASSERT_EQ((brpc::Socket*)NULL, global_sock);
- // The id is invalid.
- brpc::SocketUniquePtr ptr;
- ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
- messenger->StopAccept(0);
- ASSERT_EQ(-1, messenger->listened_fd());
- ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD));
- ASSERT_EQ(EBADF, errno);
- }
- #define NUMBER_WIDTH 16
- struct WriterArg {
- size_t times;
- size_t offset;
- brpc::SocketId socket_id;
- };
- void* FailedWriter(void* void_arg) {
- WriterArg* arg = static_cast<WriterArg*>(void_arg);
- brpc::SocketUniquePtr sock;
- if (brpc::Socket::Address(arg->socket_id, &sock) < 0) {
- printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id);
- return NULL;
- }
- char buf[32];
- for (size_t i = 0; i < arg->times; ++i) {
- bthread_id_t id;
- EXPECT_EQ(0, bthread_id_create(&id, NULL, NULL));
- snprintf(buf, sizeof(buf), "%0" BAIDU_SYMBOLSTR(NUMBER_WIDTH) "lu",
- i + arg->offset);
- butil::IOBuf src;
- src.append(buf);
- brpc::Socket::WriteOptions wopt;
- wopt.id_wait = id;
- sock->Write(&src, &wopt);
- EXPECT_EQ(0, bthread_id_join(id));
- // Only the first connect can see ECONNREFUSED and then
- // calls `SetFailed' making others' error_code=EINVAL
- //EXPECT_EQ(ECONNREFUSED, error_code);
- }
- return NULL;
- }
- TEST_F(SocketTest, fail_to_connect) {
- const size_t REP = 10;
- butil::EndPoint point(butil::IP_ANY, 7563/*not listened*/);
- brpc::SocketId id = 8888;
- brpc::SocketOptions options;
- options.remote_side = point;
- options.user = new CheckRecycle;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- {
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(-1, s->fd());
- ASSERT_EQ(point, s->remote_side());
- ASSERT_EQ(id, s->id());
- pthread_t th[8];
- WriterArg args[ARRAY_SIZE(th)];
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- args[i].times = REP;
- args[i].offset = i * REP;
- args[i].socket_id = id;
- ASSERT_EQ(0, pthread_create(&th[i], NULL, FailedWriter, &args[i]));
- }
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- ASSERT_EQ(0, pthread_join(th[i], NULL));
- }
- ASSERT_EQ(-1, s->SetFailed()); // already SetFailed
- ASSERT_EQ(-1, s->fd());
- }
- // KeepWrite is possibly still running.
- int64_t start_time = butil::gettimeofday_us();
- while (global_sock != NULL) {
- bthread_usleep(1000);
- ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!";
- }
- ASSERT_EQ(-1, brpc::Socket::Status(id));
- // The id is invalid.
- brpc::SocketUniquePtr ptr;
- ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
- }
- TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
- brpc::SocketId id = 8888;
- butil::EndPoint point(butil::IP_ANY, 7584/*not listened*/);
- brpc::SocketOptions options;
- options.remote_side = point;
- options.user = new CheckRecycle;
- options.health_check_interval_s = 1/*s*/;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- {
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(-1, s->fd());
- ASSERT_EQ(point, s->remote_side());
- ASSERT_EQ(id, s->id());
- char buf[64];
- const size_t meta_len = 4;
- *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta";
- const size_t len = snprintf(buf + 12 + meta_len,
- sizeof(buf) - 12 - meta_len,
- "hello world!");
- memcpy(buf, "HULU", 4);
- // HULU uses host byte order directly...
- *(uint32_t*)(buf + 4) = len + meta_len;
- *(uint32_t*)(buf + 8) = meta_len;
- butil::IOBuf src;
- src.append(buf, 12 + meta_len + len);
- ASSERT_EQ(12 + meta_len + len, src.length());
- #ifdef CONNECT_IN_KEEPWRITE
- bthread_id_t wait_id;
- WaitData data;
- ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset));
- brpc::Socket::WriteOptions wopt;
- wopt.id_wait = wait_id;
- ASSERT_EQ(0, s->Write(&src, &wopt));
- ASSERT_EQ(0, bthread_id_join(wait_id));
- ASSERT_EQ(wait_id.value, data.id.value);
- ASSERT_EQ(ECONNREFUSED, data.error_code);
- ASSERT_TRUE(butil::StringPiece(data.error_text).starts_with(
- "Fail to connect "));
- #else
- ASSERT_EQ(-1, s->Write(&src));
- ASSERT_EQ(ECONNREFUSED, errno);
- #endif
- ASSERT_TRUE(src.empty());
- ASSERT_EQ(-1, s->fd());
- }
- // HealthCheckThread is possibly still running. Spin until global_sock
- // is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should
- // not spin until Socket::Status(id) becomes -1 and assert global_sock
- // to be NULL because invalidating id happens before calling BeforeRecycle.
- const int64_t start_time = butil::gettimeofday_us();
- while (global_sock != NULL) {
- bthread_usleep(1000);
- ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
- }
- ASSERT_EQ(-1, brpc::Socket::Status(id));
- }
- class HealthCheckTestServiceImpl : public test::HealthCheckTestService {
- public:
- HealthCheckTestServiceImpl()
- : _sleep_flag(true) {}
- virtual ~HealthCheckTestServiceImpl() {}
- virtual void default_method(google::protobuf::RpcController* cntl_base,
- const test::HealthCheckRequest* request,
- test::HealthCheckResponse* response,
- google::protobuf::Closure* done) {
- brpc::ClosureGuard done_guard(done);
- brpc::Controller* cntl = (brpc::Controller*)cntl_base;
- if (_sleep_flag) {
- bthread_usleep(510000 /* 510ms, a little bit longer than the default
- timeout of health check rpc */);
- }
- cntl->response_attachment().append("OK");
- }
- bool _sleep_flag;
- };
- TEST_F(SocketTest, app_level_health_check) {
- int old_health_check_interval = brpc::FLAGS_health_check_interval;
- GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
- GFLAGS_NS::SetCommandLineOption("health_check_interval", "1");
- butil::EndPoint point(butil::IP_ANY, 7777);
- brpc::ChannelOptions options;
- options.protocol = "http";
- options.max_retry = 0;
- brpc::Channel channel;
- ASSERT_EQ(0, channel.Init(point, &options));
- {
- brpc::Controller cntl;
- cntl.http_request().uri() = "/";
- channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
- EXPECT_TRUE(cntl.Failed());
- ASSERT_EQ(ECONNREFUSED, cntl.ErrorCode());
- }
- // 2s to make sure remote is connected by HealthCheckTask and enter the
- // sending-rpc state. Because the remote is not down, so hc rpc would keep
- // sending.
- int listening_fd = tcp_listen(point);
- bthread_usleep(2000000);
- // 2s to make sure HealthCheckTask find socket is failed and correct impl
- // should trigger next round of hc
- close(listening_fd);
- bthread_usleep(2000000);
-
- brpc::Server server;
- HealthCheckTestServiceImpl hc_service;
- ASSERT_EQ(0, server.AddService(&hc_service, brpc::SERVER_DOESNT_OWN_SERVICE));
- ASSERT_EQ(0, server.Start(point, NULL));
- for (int i = 0; i < 4; ++i) {
- // although ::connect would succeed, the stall in hc_service makes
- // the health check rpc fail.
- brpc::Controller cntl;
- cntl.http_request().uri() = "/";
- channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
- ASSERT_EQ(EHOSTDOWN, cntl.ErrorCode());
- bthread_usleep(1000000 /*1s*/);
- }
- hc_service._sleep_flag = false;
- bthread_usleep(2000000 /* a little bit longer than hc rpc timeout + hc interval */);
- // should recover now
- {
- brpc::Controller cntl;
- cntl.http_request().uri() = "/";
- channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
- ASSERT_FALSE(cntl.Failed());
- ASSERT_GT(cntl.response_attachment().size(), (size_t)0);
- }
- GFLAGS_NS::SetCommandLineOption("health_check_path", "");
- char hc_buf[8];
- snprintf(hc_buf, sizeof(hc_buf), "%d", old_health_check_interval);
- GFLAGS_NS::SetCommandLineOption("health_check_interval", hc_buf);
- }
- TEST_F(SocketTest, health_check) {
- // FIXME(gejun): Messenger has to be new otherwise quitting may crash.
- brpc::Acceptor* messenger = new brpc::Acceptor;
- brpc::SocketId id = 8888;
- butil::EndPoint point(butil::IP_ANY, 7878);
- const int kCheckInteval = 1;
- brpc::SocketOptions options;
- options.remote_side = point;
- options.user = new CheckRecycle;
- options.health_check_interval_s = kCheckInteval/*s*/;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
-
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(-1, s->fd());
- ASSERT_EQ(point, s->remote_side());
- ASSERT_EQ(id, s->id());
- int32_t nref = -1;
- ASSERT_EQ(0, brpc::Socket::Status(id, &nref));
- ASSERT_EQ(2, nref);
- char buf[64];
- const size_t meta_len = 4;
- *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta";
- const size_t len = snprintf(buf + 12 + meta_len,
- sizeof(buf) - 12 - meta_len,
- "hello world!");
- memcpy(buf, "HULU", 4);
- // HULU uses host byte order directly...
- *(uint32_t*)(buf + 4) = len + meta_len;
- *(uint32_t*)(buf + 8) = meta_len;
- const bool use_my_message = (butil::fast_rand_less_than(2) == 0);
- brpc::SocketMessagePtr<MyMessage> msg;
- int appended_msg = 0;
- butil::IOBuf src;
- if (use_my_message) {
- LOG(INFO) << "Use MyMessage";
- msg.reset(new MyMessage(buf, 12 + meta_len + len, &appended_msg));
- } else {
- src.append(buf, 12 + meta_len + len);
- ASSERT_EQ(12 + meta_len + len, src.length());
- }
- #ifdef CONNECT_IN_KEEPWRITE
- bthread_id_t wait_id;
- WaitData data;
- ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset));
- brpc::Socket::WriteOptions wopt;
- wopt.id_wait = wait_id;
- if (use_my_message) {
- ASSERT_EQ(0, s->Write(msg, &wopt));
- } else {
- ASSERT_EQ(0, s->Write(&src, &wopt));
- }
- ASSERT_EQ(0, bthread_id_join(wait_id));
- ASSERT_EQ(wait_id.value, data.id.value);
- ASSERT_EQ(ECONNREFUSED, data.error_code);
- ASSERT_TRUE(butil::StringPiece(data.error_text).starts_with(
- "Fail to connect "));
- if (use_my_message) {
- ASSERT_TRUE(appended_msg);
- }
- #else
- if (use_my_message) {
- ASSERT_EQ(-1, s->Write(msg));
- } else {
- ASSERT_EQ(-1, s->Write(&src));
- }
- ASSERT_EQ(ECONNREFUSED, errno);
- #endif
- ASSERT_TRUE(src.empty());
- ASSERT_EQ(-1, s->fd());
- ASSERT_TRUE(global_sock);
- brpc::SocketUniquePtr invalid_ptr;
- ASSERT_EQ(-1, brpc::Socket::Address(id, &invalid_ptr));
- ASSERT_EQ(1, brpc::Socket::Status(id));
- const brpc::InputMessageHandler pairs[] = {
- { brpc::policy::ParseHuluMessage,
- EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
- };
- int listening_fd = tcp_listen(point);
- ASSERT_TRUE(listening_fd > 0);
- butil::make_non_blocking(listening_fd);
- ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
- ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL));
- int64_t start_time = butil::gettimeofday_us();
- nref = -1;
- while (brpc::Socket::Status(id, &nref) != 0) {
- bthread_usleep(1000);
- ASSERT_LT(butil::gettimeofday_us(),
- start_time + kCheckInteval * 1000000L + 100000L/*100ms*/);
- }
- //ASSERT_EQ(2, nref);
- ASSERT_TRUE(global_sock);
- int fd = 0;
- {
- brpc::SocketUniquePtr ptr;
- ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
- ASSERT_NE(0, ptr->fd());
- fd = ptr->fd();
- }
- // SetFailed again, should reconnect and succeed soon.
- ASSERT_EQ(0, s->SetFailed());
- ASSERT_EQ(fd, s->fd());
- start_time = butil::gettimeofday_us();
- while (brpc::Socket::Status(id) != 0) {
- bthread_usleep(1000);
- ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
- }
- ASSERT_TRUE(global_sock);
- {
- brpc::SocketUniquePtr ptr;
- ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
- ASSERT_NE(0, ptr->fd());
- }
- s.release()->Dereference();
- // Must stop messenger before SetFailed the id otherwise HealthCheckThread
- // still has chance to get reconnected and revive the id.
- messenger->StopAccept(0);
- ASSERT_EQ(-1, messenger->listened_fd());
- ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD));
- ASSERT_EQ(EBADF, errno);
- ASSERT_EQ(0, brpc::Socket::SetFailed(id));
- // HealthCheckThread is possibly still addressing the Socket.
- start_time = butil::gettimeofday_us();
- while (global_sock != NULL) {
- bthread_usleep(1000);
- ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
- }
- ASSERT_EQ(-1, brpc::Socket::Status(id));
- // The id is invalid.
- brpc::SocketUniquePtr ptr;
- ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
- }
- void* Writer(void* void_arg) {
- WriterArg* arg = static_cast<WriterArg*>(void_arg);
- brpc::SocketUniquePtr sock;
- if (brpc::Socket::Address(arg->socket_id, &sock) < 0) {
- printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id);
- return NULL;
- }
- char buf[32];
- for (size_t i = 0; i < arg->times; ++i) {
- snprintf(buf, sizeof(buf), "%0" BAIDU_SYMBOLSTR(NUMBER_WIDTH) "lu",
- i + arg->offset);
- butil::IOBuf src;
- src.append(buf);
- if (sock->Write(&src) != 0) {
- if (errno == brpc::EOVERCROWDED) {
- // The buf is full, sleep a while and retry.
- bthread_usleep(1000);
- --i;
- continue;
- }
- printf("Fail to write into SocketId=%" PRIu64 ", %s\n",
- arg->socket_id, berror());
- break;
- }
- }
- return NULL;
- }
- TEST_F(SocketTest, multi_threaded_write) {
- const size_t REP = 20000;
- int fds[2];
- for (int k = 0; k < 2; ++k) {
- printf("Round %d\n", k + 1);
- ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
- pthread_t th[8];
- WriterArg args[ARRAY_SIZE(th)];
- std::vector<size_t> result;
- result.reserve(ARRAY_SIZE(th) * REP);
- brpc::SocketId id = 8888;
- butil::EndPoint dummy;
- ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
- brpc::SocketOptions options;
- options.fd = fds[1];
- options.remote_side = dummy;
- options.user = new CheckRecycle;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- s->_ssl_state = brpc::SSL_OFF;
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(fds[1], s->fd());
- ASSERT_EQ(dummy, s->remote_side());
- ASSERT_EQ(id, s->id());
- butil::make_non_blocking(fds[0]);
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- args[i].times = REP;
- args[i].offset = i * REP;
- args[i].socket_id = id;
- ASSERT_EQ(0, pthread_create(&th[i], NULL, Writer, &args[i]));
- }
- if (k == 1) {
- printf("sleep 100ms to block writers\n");
- bthread_usleep(100000);
- }
-
- butil::IOPortal dest;
- const int64_t start_time = butil::gettimeofday_us();
- for (;;) {
- ssize_t nr = dest.append_from_file_descriptor(fds[0], 32768);
- if (nr < 0) {
- if (errno == EINTR) {
- continue;
- }
- if (EAGAIN != errno) {
- ASSERT_EQ(EAGAIN, errno) << berror();
- }
- bthread_usleep(1000);
- if (butil::gettimeofday_us() >= start_time + 2000000L) {
- LOG(FATAL) << "Wait too long!";
- break;
- }
- continue;
- }
- while (dest.length() >= NUMBER_WIDTH) {
- char buf[NUMBER_WIDTH + 1];
- dest.copy_to(buf, NUMBER_WIDTH);
- buf[sizeof(buf)-1] = 0;
- result.push_back(strtol(buf, NULL, 10));
- dest.pop_front(NUMBER_WIDTH);
- }
- if (result.size() >= REP * ARRAY_SIZE(th)) {
- break;
- }
- }
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- ASSERT_EQ(0, pthread_join(th[i], NULL));
- }
- ASSERT_TRUE(dest.empty());
- bthread::g_task_control->print_rq_sizes(std::cout);
- std::cout << std::endl;
- ASSERT_EQ(REP * ARRAY_SIZE(th), result.size())
- << "write_head=" << s->_write_head;
- std::sort(result.begin(), result.end());
- result.resize(std::unique(result.begin(),
- result.end()) - result.begin());
- ASSERT_EQ(REP * ARRAY_SIZE(th), result.size());
- ASSERT_EQ(0UL, *result.begin());
- ASSERT_EQ(REP * ARRAY_SIZE(th) - 1, *(result.end() - 1));
- ASSERT_EQ(0, s->SetFailed());
- s.release()->Dereference();
- ASSERT_EQ((brpc::Socket*)NULL, global_sock);
- close(fds[0]);
- }
- }
- void* FastWriter(void* void_arg) {
- WriterArg* arg = static_cast<WriterArg*>(void_arg);
- brpc::SocketUniquePtr sock;
- if (brpc::Socket::Address(arg->socket_id, &sock) < 0) {
- printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id);
- return NULL;
- }
- char buf[] = "hello reader side!";
- int64_t begin_ts = butil::cpuwide_time_us();
- int64_t nretry = 0;
- size_t c = 0;
- for (; c < arg->times; ++c) {
- butil::IOBuf src;
- src.append(buf, 16);
- if (sock->Write(&src) != 0) {
- if (errno == brpc::EOVERCROWDED) {
- // The buf is full, sleep a while and retry.
- bthread_usleep(1000);
- --c;
- ++nretry;
- continue;
- }
- printf("Fail to write into SocketId=%" PRIu64 ", %s\n",
- arg->socket_id, berror());
- break;
- }
- }
- int64_t end_ts = butil::cpuwide_time_us();
- int64_t total_time = end_ts - begin_ts;
- printf("total=%ld count=%ld nretry=%ld\n",
- (long)total_time * 1000/ c, (long)c, (long)nretry);
- return NULL;
- }
- struct ReaderArg {
- int fd;
- size_t nread;
- };
- void* reader(void* void_arg) {
- ReaderArg* arg = static_cast<ReaderArg*>(void_arg);
- const size_t LEN = 32768;
- char* buf = (char*)malloc(LEN);
- while (1) {
- ssize_t nr = read(arg->fd, buf, LEN);
- if (nr < 0) {
- printf("Fail to read, %m\n");
- return NULL;
- } else if (nr == 0) {
- printf("Far end closed\n");
- return NULL;
- }
- arg->nread += nr;
- }
- return NULL;
- }
- TEST_F(SocketTest, multi_threaded_write_perf) {
- const size_t REP = 1000000000;
- int fds[2];
- ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
- bthread_t th[3];
- WriterArg args[ARRAY_SIZE(th)];
- brpc::SocketId id = 8888;
- butil::EndPoint dummy;
- ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy));
- brpc::SocketOptions options;
- options.fd = fds[1];
- options.remote_side = dummy;
- options.user = new CheckRecycle;
- ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- s->_ssl_state = brpc::SSL_OFF;
- ASSERT_EQ(2, brpc::NRefOfVRef(s->_versioned_ref));
- global_sock = s.get();
- ASSERT_TRUE(s.get());
- ASSERT_EQ(fds[1], s->fd());
- ASSERT_EQ(dummy, s->remote_side());
- ASSERT_EQ(id, s->id());
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- args[i].times = REP;
- args[i].offset = i * REP;
- args[i].socket_id = id;
- bthread_start_background(&th[i], NULL, FastWriter, &args[i]);
- }
- pthread_t rth;
- ReaderArg reader_arg = { fds[0], 0 };
- pthread_create(&rth, NULL, reader, &reader_arg);
- butil::Timer tm;
- ProfilerStart("write.prof");
- const uint64_t old_nread = reader_arg.nread;
- tm.start();
- sleep(2);
- tm.stop();
- const uint64_t new_nread = reader_arg.nread;
- ProfilerStop();
- printf("tp=%" PRIu64 "M/s\n", (new_nread - old_nread) / tm.u_elapsed());
-
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- args[i].times = 0;
- }
- for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
- ASSERT_EQ(0, bthread_join(th[i], NULL));
- }
- ASSERT_EQ(0, s->SetFailed());
- s.release()->Dereference();
- pthread_join(rth, NULL);
- ASSERT_EQ((brpc::Socket*)NULL, global_sock);
- close(fds[0]);
- }
|