123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- // A server to receive EchoRequest and send back EchoResponse.
- #include <gflags/gflags.h>
- #include <butil/logging.h>
- #include <brpc/server.h>
- #include <butil/atomicops.h>
- #include <butil/time.h>
- #include <butil/logging.h>
- #include <json2pb/json_to_pb.h>
- #include <bthread/timer_thread.h>
- #include <bthread/bthread.h>
- #include <cstdlib>
- #include <fstream>
- #include "cl_test.pb.h"
- DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state "
- "(waiting for client to close connection before server stops)");
- DEFINE_int32(server_bthread_concurrency, 4,
- "Configuring the value of bthread_concurrency, For compute max qps, ");
- DEFINE_int32(server_sync_sleep_us, 2500,
- "Usleep time, each request will be executed once, For compute max qps");
- // max qps = 1000 / 2.5 * 4
- DEFINE_int32(control_server_port, 9000, "");
- DEFINE_int32(echo_port, 9001, "TCP Port of echo server");
- DEFINE_int32(cntl_port, 9000, "TCP Port of controller server");
- DEFINE_string(case_file, "", "File path for test_cases");
- DEFINE_int32(latency_change_interval_us, 50000, "Intervalt for server side changes the latency");
- DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency");
- DEFINE_bool(use_usleep, false,
- "EchoServer uses ::usleep or bthread_usleep to simulate latency "
- "when processing requests");
- bthread::TimerThread g_timer_thread;
- int cast_func(void* arg) {
- return *(int*)arg;
- }
- void DisplayStage(const test::Stage& stage) {
- std::string type;
- switch(stage.type()) {
- case test::FLUCTUATE:
- type = "Fluctuate";
- break;
- case test::SMOOTH:
- type = "Smooth";
- break;
- default:
- type = "Unknown";
- }
- std::stringstream ss;
- ss
- << "Stage:[" << stage.lower_bound() << ':'
- << stage.upper_bound() << "]"
- << " , Type:" << type;
- LOG(INFO) << ss.str();
- }
- butil::atomic<int> cnt(0);
- butil::atomic<int> atomic_sleep_time(0);
- bvar::PassiveStatus<int> atomic_sleep_time_bvar(cast_func, &atomic_sleep_time);
- namespace bthread {
- DECLARE_int32(bthread_concurrency);
- }
- void TimerTask(void* data);
- class EchoServiceImpl : public test::EchoService {
- public:
- EchoServiceImpl()
- : _stage_index(0)
- , _running_case(false) {
- };
- virtual ~EchoServiceImpl() {};
- void SetTestCase(const test::TestCase& test_case) {
- _test_case = test_case;
- _next_stage_start = _test_case.latency_stage_list(0).duration_sec() +
- butil::gettimeofday_s();
- _stage_index = 0;
- _running_case = false;
- DisplayStage(_test_case.latency_stage_list(_stage_index));
- }
- void StartTestCase() {
- CHECK(!_running_case);
- _running_case = true;
- UpdateLatency();
- }
- void StopTestCase() {
- _running_case = false;
- }
-
- void UpdateLatency() {
- if (!_running_case) {
- return;
- }
- ComputeLatency();
- g_timer_thread.schedule(TimerTask, (void*)this,
- butil::microseconds_from_now(FLAGS_latency_change_interval_us));
- }
- virtual void Echo(google::protobuf::RpcController* cntl_base,
- const test::NotifyRequest* request,
- test::NotifyResponse* response,
- google::protobuf::Closure* done) {
- brpc::ClosureGuard done_guard(done);
- response->set_message("hello");
- ::usleep(FLAGS_server_sync_sleep_us);
- if (FLAGS_use_usleep) {
- ::usleep(_latency.load(butil::memory_order_relaxed));
- } else {
- bthread_usleep(_latency.load(butil::memory_order_relaxed));
- }
- }
- void ComputeLatency() {
- if (_stage_index < _test_case.latency_stage_list_size() &&
- butil::gettimeofday_s() > _next_stage_start) {
- ++_stage_index;
- if (_stage_index < _test_case.latency_stage_list_size()) {
- _next_stage_start += _test_case.latency_stage_list(_stage_index).duration_sec();
- DisplayStage(_test_case.latency_stage_list(_stage_index));
- }
- }
- if (_stage_index == _test_case.latency_stage_list_size()) {
- const test::Stage& latency_stage =
- _test_case.latency_stage_list(_stage_index - 1);
- if (latency_stage.type() == test::ChangeType::FLUCTUATE) {
- _latency.store((latency_stage.lower_bound() + latency_stage.upper_bound()) / 2,
- butil::memory_order_relaxed);
- } else if (latency_stage.type() == test::ChangeType::SMOOTH) {
- _latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed);
- }
- return;
- }
-
- const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index);
- const int lower_bound = latency_stage.lower_bound();
- const int upper_bound = latency_stage.upper_bound();
- if (latency_stage.type() == test::FLUCTUATE) {
- _latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound,
- butil::memory_order_relaxed);
- } else if (latency_stage.type() == test::SMOOTH) {
- int latency = lower_bound + (upper_bound - lower_bound) /
- double(latency_stage.duration_sec()) *
- (latency_stage.duration_sec() - _next_stage_start +
- butil::gettimeofday_s());
- _latency.store(latency, butil::memory_order_relaxed);
- } else {
- LOG(FATAL) << "Wrong Type:" << latency_stage.type();
- }
- }
- private:
- int _stage_index;
- int _next_stage_start;
- butil::atomic<int> _latency;
- test::TestCase _test_case;
- bool _running_case;
- };
- void TimerTask(void* data) {
- EchoServiceImpl* echo_service = (EchoServiceImpl*)data;
- echo_service->UpdateLatency();
- }
- class ControlServiceImpl : public test::ControlService {
- public:
- ControlServiceImpl()
- : _case_index(0) {
- LoadCaseSet(FLAGS_case_file);
- _echo_service = new EchoServiceImpl;
- if (_server.AddService(_echo_service,
- brpc::SERVER_OWNS_SERVICE) != 0) {
- LOG(FATAL) << "Fail to add service";
- }
- g_timer_thread.start(NULL);
- }
- virtual ~ControlServiceImpl() {
- _echo_service->StopTestCase();
- g_timer_thread.stop_and_join();
- };
- virtual void Notify(google::protobuf::RpcController* cntl_base,
- const test::NotifyRequest* request,
- test::NotifyResponse* response,
- google::protobuf::Closure* done) {
- brpc::ClosureGuard done_guard(done);
- const std::string& message = request->message();
- LOG(INFO) << message;
- if (message == "ResetCaseSet") {
- _server.Stop(0);
- _server.Join();
- _echo_service->StopTestCase();
- LoadCaseSet(FLAGS_case_file);
- _case_index = 0;
- response->set_message("CaseSetReset");
- } else if (message == "StartCase") {
- CHECK(!_server.IsRunning()) << "Continuous StartCase";
- const test::TestCase& test_case = _case_set.test_case(_case_index++);
- _echo_service->SetTestCase(test_case);
- brpc::ServerOptions options;
- options.max_concurrency = FLAGS_server_max_concurrency;
- _server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency();
- _server.Start(FLAGS_echo_port, &options);
- _echo_service->StartTestCase();
- response->set_message("CaseStarted");
- } else if (message == "StopCase") {
- CHECK(_server.IsRunning()) << "Continuous StopCase";
- _server.Stop(0);
- _server.Join();
- _echo_service->StopTestCase();
- response->set_message("CaseStopped");
- } else {
- LOG(FATAL) << "Invalid message:" << message;
- response->set_message("Invalid Cntl Message");
- }
- }
- private:
- void LoadCaseSet(const std::string& file_path) {
- std::ifstream ifs(file_path.c_str(), std::ios::in);
- if (!ifs) {
- LOG(FATAL) << "Fail to open case set file: " << file_path;
- }
- std::string case_set_json((std::istreambuf_iterator<char>(ifs)),
- std::istreambuf_iterator<char>());
- test::TestCaseSet case_set;
- std::string err;
- if (!json2pb::JsonToProtoMessage(case_set_json, &case_set, &err)) {
- LOG(FATAL)
- << "Fail to trans case_set from json to protobuf message: "
- << err;
- }
- _case_set = case_set;
- ifs.close();
- }
- brpc::Server _server;
- EchoServiceImpl* _echo_service;
- test::TestCaseSet _case_set;
- int _case_index;
- };
- int main(int argc, char* argv[]) {
- // Parse gflags. We recommend you to use gflags as well.
- GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
- bthread::FLAGS_bthread_concurrency= FLAGS_server_bthread_concurrency;
- brpc::Server server;
- ControlServiceImpl control_service_impl;
- if (server.AddService(&control_service_impl,
- brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
- LOG(ERROR) << "Fail to add service";
- return -1;
- }
- if (server.Start(FLAGS_cntl_port, NULL) != 0) {
- LOG(ERROR) << "Fail to start EchoServer";
- return -1;
- }
- server.RunUntilAskedToQuit();
- return 0;
- }
|