12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865 |
- // Licensed to the Apache Software Foundation (ASF) under one
- // or more contributor license agreements. See the NOTICE file
- // distributed with this work for additional information
- // regarding copyright ownership. The ASF licenses this file
- // to you under the Apache License, Version 2.0 (the
- // "License"); you may not use this file except in compliance
- // with the License. You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- // brpc - A framework to host and access services throughout Baidu.
- // Date: Fri May 20 15:52:22 CST 2016
- #include <sys/ioctl.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <gtest/gtest.h>
- #include <gflags/gflags.h>
- #include <google/protobuf/descriptor.h>
- #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
- #include "butil/time.h"
- #include "butil/macros.h"
- #include "brpc/socket.h"
- #include "brpc/acceptor.h"
- #include "brpc/server.h"
- #include "brpc/controller.h"
- #include "brpc/rtmp.h"
- #include "brpc/amf.h"
- // Entry point of the test binary: initializes gtest, lets gflags parse the
- // command line, then runs every registered test and returns its result.
- int main(int argc, char* argv[]) {
-     testing::InitGoogleTest(&argc, argv);
-     GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
-     const int exit_code = RUN_ALL_TESTS();
-     return exit_code;
- }
- // Client stream for the tests: counts the callbacks it receives and offers
- // assertion helpers over those counters.
- class TestRtmpClientStream : public brpc::RtmpClientStream {
- public:
-     TestRtmpClientStream()
-         : _called_on_stop(0)
-         , _called_on_first_message(0)
-         , _nvideomsg(0)
-         , _naudiomsg(0) {
-         LOG(INFO) << __FUNCTION__;
-     }
-     // OnStop() must have been called exactly once by the time of destruction.
-     ~TestRtmpClientStream() {
-         LOG(INFO) << __FUNCTION__;
-         assertions_on_stop();
-     }
-
-     // ---- Stream callbacks: each one bumps its counter. ----
-     void OnStop() {
-         _called_on_stop += 1;
-     }
-     void OnFirstMessage() {
-         _called_on_first_message += 1;
-     }
-     void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
-         _naudiomsg += 1;
-         // Payloads in this UT are ascii, so they can be logged directly.
-         const brpc::RtmpAudioMessage& audio = *msg;
-         LOG(INFO) << remote_side() << "|stream=" << stream_id()
-                   << ": Got " << audio << " data=" << audio.data;
-     }
-     void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
-         _nvideomsg += 1;
-         // Payloads in this UT are ascii, so they can be logged directly.
-         const brpc::RtmpVideoMessage& video = *msg;
-         LOG(INFO) << remote_side() << "|stream=" << stream_id()
-                   << ": Got " << video << " data=" << video.data;
-     }
-
-     // ---- Assertion helpers used by the tests. ----
-     // The stream was stopped exactly once.
-     void assertions_on_stop() {
-         ASSERT_EQ(1, _called_on_stop);
-     }
-     // The first-message callback fired once and both media kinds arrived.
-     void assertions_on_successful_play() {
-         ASSERT_EQ(1, _called_on_first_message);
-         ASSERT_LT(0, _nvideomsg);
-         ASSERT_LT(0, _naudiomsg);
-     }
-     // Nothing was ever received and the stream was stopped exactly once.
-     void assertions_on_failure() {
-         ASSERT_EQ(0, _called_on_first_message);
-         ASSERT_EQ(0, _nvideomsg);
-         ASSERT_EQ(0, _naudiomsg);
-         assertions_on_stop();
-     }
- private:
-     int _called_on_stop;           // number of OnStop() calls
-     int _called_on_first_message;  // number of OnFirstMessage() calls
-     int _nvideomsg;                // number of OnVideoMessage() calls
-     int _naudiomsg;                // number of OnAudioMessage() calls
- };
- // Retrying client stream for the tests: counts stop / first-message /
- // playable callbacks and logs every media message it receives.
- class TestRtmpRetryingClientStream
-     : public brpc::RtmpRetryingClientStream {
- public:
-     TestRtmpRetryingClientStream()
-         : _called_on_stop(0)
-         , _called_on_first_message(0)
-         , _called_on_playable(0) {
-         LOG(INFO) << __FUNCTION__;
-     }
-     // OnStop() must have been called exactly once by the time of destruction.
-     ~TestRtmpRetryingClientStream() {
-         LOG(INFO) << __FUNCTION__;
-         assertions_on_stop();
-     }
-     void assertions_on_stop() {
-         ASSERT_EQ(1, _called_on_stop);
-     }
-
-     // ---- Stream callbacks. ----
-     void OnPlayable() {
-         _called_on_playable += 1;
-     }
-     void OnFirstMessage() {
-         _called_on_first_message += 1;
-     }
-     void OnStop() {
-         _called_on_stop += 1;
-     }
-     void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
-         // Payloads in this UT are ascii, so they can be logged directly.
-         const brpc::RtmpAudioMessage& audio = *msg;
-         LOG(INFO) << remote_side() << "|stream=" << stream_id()
-                   << ": Got " << audio << " data=" << audio.data;
-     }
-     void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
-         // Payloads in this UT are ascii, so they can be logged directly.
-         const brpc::RtmpVideoMessage& video = *msg;
-         LOG(INFO) << remote_side() << "|stream=" << stream_id()
-                   << ": Got " << video << " data=" << video.data;
-     }
- private:
-     int _called_on_stop;           // number of OnStop() calls
-     int _called_on_first_message;  // number of OnFirstMessage() calls
-     int _called_on_playable;       // number of OnPlayable() calls
- };
- const char* UNEXIST_NAME = "unexist_stream"; // Stream name that the dummy server streams in this file reject with EPERM in OnPlay/OnPublish.
- class PlayingDummyStream : public brpc::RtmpServerStream { // Server-side stream that, once played, sends fake video/audio from a background bthread until it is stopped.
- public:
- enum State { // Lifecycle of the sending thread, stored atomically in _state.
- STATE_UNPLAYING, // initial state: no sending thread has been accepted yet
- STATE_PLAYING, // OnPlay started the sending thread and published it
- STATE_STOPPED // OnStop has run
- };
- PlayingDummyStream(int64_t sleep_ms) // sleep_ms: delay in milliseconds applied in OnPlay before sending starts (no delay when <= 0)
- : _state(STATE_UNPLAYING), _sleep_ms(sleep_ms) {
- LOG(INFO) << __FUNCTION__ << "(" << this << ")";
- }
- ~PlayingDummyStream() {
- LOG(INFO) << __FUNCTION__ << "(" << this << ")";
- }
- void OnPlay(const brpc::RtmpPlayOptions& opt, // Handles a play request: rejects UNEXIST_NAME, optionally sleeps, then starts the sending thread.
- butil::Status* status, // receives the error on rejection or thread-creation failure
- google::protobuf::Closure* done) {
- brpc::ClosureGuard done_guard(done); // NOTE(review): presumably runs `done` on every return path - confirm against brpc::ClosureGuard
- LOG(INFO) << remote_side() << "|stream=" << stream_id()
- << ": Got play{stream_name=" << opt.stream_name
- << " start=" << opt.start
- << " duration=" << opt.duration
- << " reset=" << opt.reset << '}';
- if (opt.stream_name == UNEXIST_NAME) { // the reserved name is always refused
- status->set_error(EPERM, "Unexist stream");
- return;
- }
- if (_sleep_ms > 0) { // simulated slow server, used by the destroy-during-creation tests
- LOG(INFO) << "Sleep " << _sleep_ms
- << " ms before responding play request";
- bthread_usleep(_sleep_ms * 1000L);
- }
- int rc = bthread_start_background(&_play_thread, NULL, // the thread is started before _state is switched below
- RunSendData, this);
- if (rc) {
- status->set_error(rc, "Fail to create thread");
- return;
- }
- State expected = STATE_UNPLAYING;
- if (!_state.compare_exchange_strong(expected, STATE_PLAYING)) { // publish the thread; fails if OnStop already changed the state
- if (expected == STATE_STOPPED) { // OnStop ran first and did not see PLAYING, so the thread just started is stopped and joined here
- bthread_stop(_play_thread);
- bthread_join(_play_thread, NULL);
- } else {
- CHECK(false) << "Impossible";
- }
- }
- }
- void OnStop() { // Marks the stream stopped; stops and joins the sending thread only if OnPlay had already published it.
- LOG(INFO) << "OnStop of PlayingDummyStream=" << this;
- if (_state.exchange(STATE_STOPPED) == STATE_PLAYING) {
- bthread_stop(_play_thread);
- bthread_join(_play_thread, NULL);
- }
- }
- void SendData(); // Body of the sending thread, defined out of line.
- 
- private:
- static void* RunSendData(void* arg) { // bthread entry point: arg is the PlayingDummyStream passed by OnPlay
- ((PlayingDummyStream*)arg)->SendData();
- return NULL;
- }
- butil::atomic<State> _state; // current lifecycle state
- bthread_t _play_thread; // written by bthread_start_background in OnPlay; not initialized by the constructor
- int64_t _sleep_ms; // delay before sending starts, in milliseconds
- };
- // Body of the sending bthread: until this bthread is asked to stop, sends one
- // video and one audio message per iteration, then sleeps for one second.
- // Timestamps start at 1000 and advance by 20 per iteration; payloads are
- // ascii strings carrying the iteration number and the stream id.
- void PlayingDummyStream::SendData() {
-     LOG(INFO) << "Enter SendData of PlayingDummyStream=" << this;
-     brpc::RtmpVideoMessage video;
-     brpc::RtmpAudioMessage audio;
-     video.timestamp = 1000;
-     audio.timestamp = 1000;
-     int seq = 0;
-     while (!bthread_stopped(bthread_self())) {
-         video.timestamp += 20;
-         audio.timestamp += 20;
-
-         video.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
-         video.codec = brpc::FLV_VIDEO_AVC;
-         video.data.clear();
-         video.data.append(
-             butil::string_printf("video_%d(ms_id=%u)", seq, stream_id()));
-         // The return value is deliberately ignored: sending may fail.
-         SendVideoMessage(video);
-
-         audio.codec = brpc::FLV_AUDIO_AAC;
-         audio.rate = brpc::FLV_SOUND_RATE_44100HZ;
-         audio.bits = brpc::FLV_SOUND_16BIT;
-         audio.type = brpc::FLV_SOUND_STEREO;
-         audio.data.clear();
-         audio.data.append(
-             butil::string_printf("audio_%d(ms_id=%u)", seq, stream_id()));
-         SendAudioMessage(audio);
-
-         bthread_usleep(1000000);
-         ++seq;
-     }
-     LOG(INFO) << "Quit SendData of PlayingDummyStream=" << this;
- }
- // RTMP service whose streams are PlayingDummyStream instances.
- // sleep_ms is handed to every stream it creates (default: 0).
- class PlayingDummyService : public brpc::RtmpService {
- public:
-     PlayingDummyService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {}
- private:
-     // Creates the server-side stream for a new connection; the connect
-     // request itself is not inspected.
-     virtual brpc::RtmpServerStream* NewStream(
-         const brpc::RtmpConnectRequest&) {
-         PlayingDummyStream* const created = new PlayingDummyStream(_sleep_ms);
-         return created;
-     }
-     int64_t _sleep_ms;  // forwarded to each PlayingDummyStream
- };
- // Server-side stream for the publish tests: accepts publish requests (except
- // for UNEXIST_NAME) and counts the callbacks/messages it receives so tests can
- // inspect them afterwards.
- class PublishStream : public brpc::RtmpServerStream {
- public:
-     // sleep_ms: delay in milliseconds applied in OnPublish before the request
-     // is answered (no delay when <= 0).
-     PublishStream(int64_t sleep_ms)
-         : _sleep_ms(sleep_ms)
-         , _called_on_stop(0)
-         , _called_on_first_message(0)
-         , _nvideomsg(0)
-         , _naudiomsg(0) {
-         LOG(INFO) << __FUNCTION__ << "(" << this << ")";
-     }
-     // OnStop() must have been called exactly once by the time of destruction.
-     ~PublishStream() {
-         LOG(INFO) << __FUNCTION__ << "(" << this << ")";
-         assertions_on_stop();
-     }
-     void assertions_on_stop() {
-         ASSERT_EQ(1, _called_on_stop);
-     }
-     // Handles a publish request. UNEXIST_NAME is rejected with EPERM through
-     // `status`; any other name is accepted after the optional delay.
-     void OnPublish(const std::string& stream_name,
-                    brpc::RtmpPublishType publish_type,
-                    butil::Status* status,
-                    google::protobuf::Closure* done) {
-         brpc::ClosureGuard done_guard(done);
-         LOG(INFO) << remote_side() << "|stream=" << stream_id()
-                   << ": Got publish{stream_name=" << stream_name
-                   << " type=" << brpc::RtmpPublishType2Str(publish_type)
-                   << '}';
-         if (stream_name == UNEXIST_NAME) {
-             status->set_error(EPERM, "Unexist stream");
-             return;
-         }
-         if (_sleep_ms > 0) {
-             // This is the publish path; the message used to say "play request".
-             LOG(INFO) << "Sleep " << _sleep_ms
-                       << " ms before responding publish request";
-             bthread_usleep(_sleep_ms * 1000L);
-         }
-     }
-     void OnFirstMessage() {
-         ++_called_on_first_message;
-     }
-     void OnStop() {
-         LOG(INFO) << "OnStop of PublishStream=" << this;
-         ++_called_on_stop;
-     }
-     void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
-         ++_nvideomsg;
-         // video data is ascii in UT, print it out.
-         LOG(INFO) << remote_side() << "|stream=" << stream_id()
-                   << ": Got " << *msg << " data=" << msg->data;
-     }
-     void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
-         ++_naudiomsg;
-         // audio data is ascii in UT, print it out.
-         LOG(INFO) << remote_side() << "|stream=" << stream_id()
-                   << ": Got " << *msg << " data=" << msg->data;
-     }
- private:
-     int64_t _sleep_ms;             // delay before answering a publish, in ms
-     int _called_on_stop;           // number of OnStop() calls
-     int _called_on_first_message;  // number of OnFirstMessage() calls
-     int _nvideomsg;                // number of OnVideoMessage() calls
-     int _naudiomsg;                // number of OnAudioMessage() calls
- };
- // RTMP service that creates PublishStream objects and keeps a reference to
- // each of them so tests can inspect the streams afterwards.
- class PublishService : public brpc::RtmpService {
- public:
-     PublishService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {
-         pthread_mutex_init(&_mutex, NULL);
-     }
-     ~PublishService() {
-         pthread_mutex_destroy(&_mutex);
-     }
-     // Hands every stream created so far over to `out` (whose previous content
-     // is discarded) and leaves the internal list empty.
-     void move_created_streams(
-         std::vector<butil::intrusive_ptr<PublishStream> >* out) {
-         out->clear();
-         BAIDU_SCOPED_LOCK(_mutex);
-         _created_streams.swap(*out);
-     }
- private:
-     // Creates the server-side stream for a new connection and records it
-     // under the mutex before returning it.
-     virtual brpc::RtmpServerStream* NewStream(
-         const brpc::RtmpConnectRequest&) {
-         PublishStream* const created = new PublishStream(_sleep_ms);
-         {
-             BAIDU_SCOPED_LOCK(_mutex);
-             _created_streams.push_back(
-                 butil::intrusive_ptr<PublishStream>(created));
-         }
-         return created;
-     }
-     int64_t _sleep_ms;       // forwarded to each PublishStream
-     pthread_mutex_t _mutex;  // guards _created_streams
-     std::vector<butil::intrusive_ptr<PublishStream> > _created_streams;
- };
- // Client sub-stream that forwards its stream callbacks to a
- // RtmpMessageHandler, which it holds in a unique_ptr.
- class RtmpSubStream : public brpc::RtmpClientStream {
- public:
-     explicit RtmpSubStream(brpc::RtmpMessageHandler* mh)
-         : _message_handler(mh) {}
-     // Callbacks of RtmpStreamBase; all are defined out of line.
-     void OnFirstMessage();
-     void OnStop();
-     void OnMetaData(brpc::RtmpMetaData*, const butil::StringPiece&);
-     void OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg);
-     void OnVideoMessage(brpc::RtmpVideoMessage* msg);
-     void OnAudioMessage(brpc::RtmpAudioMessage* msg);
- private:
-     std::unique_ptr<brpc::RtmpMessageHandler> _message_handler;
- };
- // The first message of the sub-stream is reported to the handler as "playable".
- void RtmpSubStream::OnFirstMessage() {
-     brpc::RtmpMessageHandler* const handler = _message_handler.get();
-     handler->OnPlayable();
- }
- // Passes metadata and its name through to the message handler unchanged.
- void RtmpSubStream::OnMetaData(brpc::RtmpMetaData* obj,
-                                const butil::StringPiece& name) {
-     brpc::RtmpMessageHandler* const handler = _message_handler.get();
-     handler->OnMetaData(obj, name);
- }
- // Passes a shared-object message through to the message handler unchanged.
- void RtmpSubStream::OnSharedObjectMessage(
-     brpc::RtmpSharedObjectMessage* msg) {
-     brpc::RtmpMessageHandler* const handler = _message_handler.get();
-     handler->OnSharedObjectMessage(msg);
- }
- // Passes an audio message through to the message handler unchanged.
- void RtmpSubStream::OnAudioMessage(brpc::RtmpAudioMessage* msg) {
-     brpc::RtmpMessageHandler* const handler = _message_handler.get();
-     handler->OnAudioMessage(msg);
- }
- // Passes a video message through to the message handler unchanged.
- void RtmpSubStream::OnVideoMessage(brpc::RtmpVideoMessage* msg) {
-     brpc::RtmpMessageHandler* const handler = _message_handler.get();
-     handler->OnVideoMessage(msg);
- }
- // Tells the message handler that this sub-stream has stopped.
- void RtmpSubStream::OnStop() {
-     brpc::RtmpMessageHandler* const handler = _message_handler.get();
-     handler->OnSubStreamStop(this);
- }
- // SubStreamCreator used by the retrying-stream tests: creates RtmpSubStream
- // objects and initializes them against a given RtmpClient.
- class RtmpSubStreamCreator : public brpc::SubStreamCreator {
- public:
-     RtmpSubStreamCreator(const brpc::RtmpClient* client);
-     ~RtmpSubStreamCreator();
-
-     // Implementation of the SubStreamCreator interface.
-     // Initializes `sub_stream` on the stored client with `options`.
-     void LaunchSubStream(brpc::RtmpStreamBase* sub_stream,
-                          brpc::RtmpRetryingClientStreamOptions* options);
-     // Stores a new RtmpSubStream wrapping `message_handler` in `*sub_stream`.
-     void NewSubStream(brpc::RtmpMessageHandler* message_handler,
-                       butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream);
- private:
-     const brpc::RtmpClient* _client;  // client used by LaunchSubStream
- };
- // Stores `client` as-is; it is used later by LaunchSubStream().
- RtmpSubStreamCreator::RtmpSubStreamCreator(const brpc::RtmpClient* client)
-     : _client(client) {
- }
- // Nothing to release here.
- RtmpSubStreamCreator::~RtmpSubStreamCreator() {
- }
-
- // Creates a RtmpSubStream around `message_handler` and stores it in
- // `*sub_stream`. Does nothing when `sub_stream` is NULL; `message_handler`
- // is left untouched in that case.
- void RtmpSubStreamCreator::NewSubStream(
-     brpc::RtmpMessageHandler* message_handler,
-     butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream) {
-     if (sub_stream == NULL) {
-         return;
-     }
-     sub_stream->reset(new RtmpSubStream(message_handler));
- }
- // Initializes `sub_stream` on the stored client, using the
- // RtmpClientStreamOptions part of `options`.
- // `sub_stream` is expected to be a RtmpSubStream created by NewSubStream();
- // any other stream type (or NULL) is reported and ignored instead of being
- // dereferenced through a failed dynamic_cast.
- void RtmpSubStreamCreator::LaunchSubStream(
-     brpc::RtmpStreamBase* sub_stream,
-     brpc::RtmpRetryingClientStreamOptions* options) {
-     RtmpSubStream* const stream = dynamic_cast<RtmpSubStream*>(sub_stream);
-     if (stream == NULL) {
-         LOG(ERROR) << "sub_stream=" << sub_stream << " is not a RtmpSubStream";
-         return;
-     }
-     // Copy only the base-class options; Init() takes RtmpClientStreamOptions.
-     brpc::RtmpClientStreamOptions client_options = *options;
-     stream->Init(_client, client_options);
- }
- // Checks brpc::ParseRtmpURL on URLs with and without scheme, port, vhost,
- // repeated slashes and stream name. The output pieces are reused across
- // cases, so a field expected to be empty must be reset by the parser.
- TEST(RtmpTest, parse_rtmp_url) {
-     // One row per URL; a NULL expectation means "must come back empty".
-     struct UrlCase {
-         const char* url;
-         const char* host;
-         const char* vhost;
-         const char* port;
-         const char* app;
-         const char* stream_name;
-     };
-     const UrlCase cases[] = {
-         { "rtmp://HOST/APP/STREAM",
-           "HOST", NULL, "1935", "APP", "STREAM" },
-         { "HOST/APP/STREAM",
-           "HOST", NULL, "1935", "APP", "STREAM" },
-         { "rtmp://HOST:8765//APP?vhost=abc///STREAM?queries",
-           "HOST", "abc", "8765", "APP", "STREAM?queries" },
-         { "HOST:8765//APP?vhost=abc///STREAM?queries",
-           "HOST", "abc", "8765", "APP", "STREAM?queries" },
-         { "HOST:8765//APP?vhost=abc///STREAM?queries/",
-           "HOST", "abc", "8765", "APP", "STREAM?queries/" },
-         { "HOST:8765/APP?vhost=abc",
-           "HOST", "abc", "8765", "APP", NULL },
-     };
-     butil::StringPiece host;
-     butil::StringPiece vhost;
-     butil::StringPiece port;
-     butil::StringPiece app;
-     butil::StringPiece stream_name;
-     for (size_t k = 0; k < sizeof(cases) / sizeof(cases[0]); ++k) {
-         const UrlCase& c = cases[k];
-         brpc::ParseRtmpURL(c.url, &host, &vhost, &port, &app, &stream_name);
-         ASSERT_EQ(c.host, host);
-         if (c.vhost == NULL) {
-             ASSERT_TRUE(vhost.empty());
-         } else {
-             ASSERT_EQ(c.vhost, vhost);
-         }
-         ASSERT_EQ(c.port, port);
-         ASSERT_EQ(c.app, app);
-         if (c.stream_name == NULL) {
-             ASSERT_TRUE(stream_name.empty());
-         } else {
-             ASSERT_EQ(c.stream_name, stream_name);
-         }
-     }
- }
- // AMF round trip: writes a string, a uint32, a RtmpInfo message and a generic
- // AMFObject into a buffer, then reads them back in the same order. The
- // AMFObject is read back into a RtmpInfo using the same field names.
- TEST(RtmpTest, amf) {
-     std::string encoded;
-     brpc::RtmpInfo written_info;
-     brpc::AMFObject written_obj;
-     std::string command_name = "_result";
-     {
-         // Scoped so that the output streams are destroyed before `encoded`
-         // is read below.
-         google::protobuf::io::StringOutputStream raw_out(&encoded);
-         brpc::AMFOutputStream amf_out(&raw_out);
-         brpc::WriteAMFString(command_name, &amf_out);
-         brpc::WriteAMFUint32(17, &amf_out);
-
-         written_info.set_code("NetConnection.Connect"); // TODO
-         written_info.set_level("error");
-         written_info.set_description("heheda hello foobar");
-         brpc::WriteAMFObject(written_info, &amf_out);
-         ASSERT_TRUE(amf_out.good());
-
-         written_obj.SetString("code", "foo");
-         written_obj.SetString("level", "bar");
-         written_obj.SetString("description", "heheda");
-         brpc::WriteAMFObject(written_obj, &amf_out);
-         ASSERT_TRUE(amf_out.good());
-     }
-     google::protobuf::io::ArrayInputStream raw_in(encoded.data(), encoded.size());
-     brpc::AMFInputStream amf_in(&raw_in);
-
-     std::string decoded_name;
-     ASSERT_TRUE(brpc::ReadAMFString(&decoded_name, &amf_in));
-     ASSERT_EQ(command_name, decoded_name);
-
-     uint32_t decoded_num = 0;
-     ASSERT_TRUE(brpc::ReadAMFUint32(&decoded_num, &amf_in));
-     ASSERT_EQ(17u, decoded_num);
-
-     brpc::RtmpInfo decoded_info;
-     ASSERT_TRUE(brpc::ReadAMFObject(&decoded_info, &amf_in));
-     ASSERT_EQ(written_info.code(), decoded_info.code());
-     ASSERT_EQ(written_info.level(), decoded_info.level());
-     ASSERT_EQ(written_info.description(), decoded_info.description());
-
-     brpc::RtmpInfo decoded_obj;
-     ASSERT_TRUE(brpc::ReadAMFObject(&decoded_obj, &amf_in));
-     ASSERT_EQ("foo", decoded_obj.code());
-     ASSERT_EQ("bar", decoded_obj.level());
-     ASSERT_EQ("heheda", decoded_obj.description());
- }
- // Plays NSTREAM streams against a local PlayingDummyService on port 8571 and,
- // after 5 seconds, expects each stream to have seen its first message plus at
- // least one video and one audio message.
- TEST(RtmpTest, successfully_play_streams) {
-     PlayingDummyService service;
-     brpc::Server server;
-     brpc::ServerOptions server_options;
-     server_options.rtmp_service = &service;
-     ASSERT_EQ(0, server.Start(8571, &server_options));
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8571", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     brpc::DestroyingPtr<TestRtmpClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpClientStream);
-         brpc::RtmpClientStreamOptions stream_options;
-         stream_options.play_name = butil::string_printf("play_name_%d", idx);
-         stream_options.wait_until_play_or_publish_is_sent = true;
-         streams[idx]->Init(&client, stream_options);
-     }
-     // Give the server-side sending threads time to push some messages.
-     sleep(5);
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx]->assertions_on_successful_play();
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Asks a local PlayingDummyService to play UNEXIST_NAME, which the server
- // rejects; after 1 second each stream must have received nothing and must
- // have been stopped exactly once.
- TEST(RtmpTest, fail_to_play_streams) {
-     PlayingDummyService service;
-     brpc::Server server;
-     brpc::ServerOptions server_options;
-     server_options.rtmp_service = &service;
-     ASSERT_EQ(0, server.Start(8571, &server_options));
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8571", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     brpc::DestroyingPtr<TestRtmpClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpClientStream);
-         brpc::RtmpClientStreamOptions stream_options;
-         stream_options.play_name = UNEXIST_NAME;
-         stream_options.wait_until_play_or_publish_is_sent = true;
-         streams[idx]->Init(&client, stream_options);
-     }
-     sleep(1);
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx]->assertions_on_failure();
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Publishes to a local PublishService on port 8571: even-indexed streams send
- // REP video messages, odd-indexed streams send REP audio messages. Afterwards
- // the server-side streams must report the matching counts.
- TEST(RtmpTest, successfully_publish_streams) {
-     PublishService service;
-     brpc::Server server;
-     brpc::ServerOptions server_options;
-     server_options.rtmp_service = &service;
-     ASSERT_EQ(0, server.Start(8571, &server_options));
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8571", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     brpc::DestroyingPtr<TestRtmpClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpClientStream);
-         brpc::RtmpClientStreamOptions stream_options;
-         stream_options.publish_name = butil::string_printf("pub_name_%d", idx);
-         stream_options.wait_until_play_or_publish_is_sent = true;
-         streams[idx]->Init(&client, stream_options);
-     }
-     const int REP = 5;
-     for (int round = 0; round < REP; ++round) {
-         // Video goes to the even-indexed streams.
-         brpc::RtmpVideoMessage video;
-         video.timestamp = 1000 + round * 20;
-         video.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
-         video.codec = brpc::FLV_VIDEO_AVC;
-         video.data.append(butil::string_printf("video_%d", round));
-         for (int even = 0; even < NSTREAM; even += 2) {
-             ASSERT_EQ(0, streams[even]->SendVideoMessage(video));
-         }
-         // Audio goes to the odd-indexed streams.
-         brpc::RtmpAudioMessage audio;
-         audio.timestamp = 1000 + round * 20;
-         audio.codec = brpc::FLV_AUDIO_AAC;
-         audio.rate = brpc::FLV_SOUND_RATE_44100HZ;
-         audio.bits = brpc::FLV_SOUND_16BIT;
-         audio.type = brpc::FLV_SOUND_STEREO;
-         audio.data.append(butil::string_printf("audio_%d", round));
-         for (int odd = 1; odd < NSTREAM; odd += 2) {
-             ASSERT_EQ(0, streams[odd]->SendAudioMessage(audio));
-         }
-         bthread_usleep(500000);
-     }
-     std::vector<butil::intrusive_ptr<PublishStream> > server_streams;
-     service.move_created_streams(&server_streams);
-     ASSERT_EQ(NSTREAM, (int)server_streams.size());
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         EXPECT_EQ(1, server_streams[idx]->_called_on_first_message);
-     }
-     for (int even = 0; even < NSTREAM; even += 2) {
-         ASSERT_EQ(REP, server_streams[even]->_nvideomsg);
-     }
-     for (int odd = 1; odd < NSTREAM; odd += 2) {
-         ASSERT_EQ(REP, server_streams[odd]->_naudiomsg);
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Publishes UNEXIST_NAME to a local PublishService on port 8575, which the
- // server rejects. Both the client streams and the server-side streams must
- // end up without any message received.
- TEST(RtmpTest, failed_to_publish_streams) {
-     PublishService service;
-     brpc::Server server;
-     brpc::ServerOptions server_options;
-     server_options.rtmp_service = &service;
-     ASSERT_EQ(0, server.Start(8575, &server_options));
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8575", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     brpc::DestroyingPtr<TestRtmpClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpClientStream);
-         brpc::RtmpClientStreamOptions stream_options;
-         stream_options.publish_name = UNEXIST_NAME;
-         stream_options.wait_until_play_or_publish_is_sent = true;
-         streams[idx]->Init(&client, stream_options);
-     }
-     sleep(1);
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx]->assertions_on_failure();
-     }
-     std::vector<butil::intrusive_ptr<PublishStream> > server_streams;
-     service.move_created_streams(&server_streams);
-     ASSERT_EQ(NSTREAM, (int)server_streams.size());
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         ASSERT_EQ(0, server_streams[idx]->_called_on_first_message);
-         ASSERT_EQ(0, server_streams[idx]->_nvideomsg);
-         ASSERT_EQ(0, server_streams[idx]->_naudiomsg);
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // No server is started by this test, so streams created against
- // localhost:8572 are expected to be in the failed state right after Init().
- TEST(RtmpTest, failed_to_connect_client_streams) {
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8572", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     brpc::DestroyingPtr<TestRtmpClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpClientStream);
-         brpc::RtmpClientStreamOptions stream_options;
-         stream_options.play_name = butil::string_printf("play_name_%d", idx);
-         stream_options.wait_until_play_or_publish_is_sent = true;
-         streams[idx]->Init(&client, stream_options);
-         // Checked immediately, without waiting.
-         streams[idx]->assertions_on_failure();
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Destroy() before Init(): the stream must report OnStop once and be in
- // STATE_DESTROYING, and a later Init() must not revive it.
- TEST(RtmpTest, destroy_client_streams_before_init) {
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8573", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     butil::intrusive_ptr<TestRtmpClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpClientStream);
-         streams[idx]->Destroy();
-         ASSERT_EQ(1, streams[idx]->_called_on_stop);
-         ASSERT_EQ(brpc::RtmpClientStream::STATE_DESTROYING, streams[idx]->_state);
-         // Init() on an already destroyed stream.
-         brpc::RtmpClientStreamOptions stream_options;
-         stream_options.play_name = butil::string_printf("play_name_%d", idx);
-         stream_options.wait_until_play_or_publish_is_sent = true;
-         streams[idx]->Init(&client, stream_options);
-         streams[idx]->assertions_on_failure();
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Destroy() before Init() on retrying streams: OnStop must have been called
- // once after Destroy() and still exactly once after the later Init().
- TEST(RtmpTest, destroy_retrying_client_streams_before_init) {
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8573", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     butil::intrusive_ptr<TestRtmpRetryingClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpRetryingClientStream);
-         streams[idx]->Destroy();
-         ASSERT_EQ(1, streams[idx]->_called_on_stop);
-         // Init() on an already destroyed stream.
-         brpc::RtmpRetryingClientStreamOptions stream_options;
-         stream_options.play_name = butil::string_printf("play_name_%d", idx);
-         // NOTE(review): the creator is never deleted here; presumably Init()
-         // takes ownership of it - confirm against RtmpRetryingClientStream.
-         brpc::SubStreamCreator* creator = new RtmpSubStreamCreator(&client);
-         streams[idx]->Init(creator, stream_options);
-         ASSERT_EQ(1, streams[idx]->_called_on_stop);
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Destroys client streams while the server is still sleeping in OnPlay (the
- // service delays its play response by 2 seconds). OnStop must not fire before
- // Destroy() and must have fired exactly once shortly after it.
- TEST(RtmpTest, destroy_client_streams_during_creation) {
-     PlayingDummyService service(2000/*sleep 2s*/);
-     brpc::Server server;
-     brpc::ServerOptions server_options;
-     server_options.rtmp_service = &service;
-     ASSERT_EQ(0, server.Start(8574, &server_options));
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8574", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     butil::intrusive_ptr<TestRtmpClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpClientStream);
-         brpc::RtmpClientStreamOptions stream_options;
-         stream_options.play_name = butil::string_printf("play_name_%d", idx);
-         streams[idx]->Init(&client, stream_options);
-         ASSERT_EQ(0, streams[idx]->_called_on_stop);
-         // Still within the server's 2s delay after this wait.
-         usleep(500*1000);
-         ASSERT_EQ(0, streams[idx]->_called_on_stop);
-         streams[idx]->Destroy();
-         usleep(10*1000);
-         ASSERT_EQ(1, streams[idx]->_called_on_stop);
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Same scenario as the plain client-stream variant, but with retrying
- // streams: they are destroyed while the server is still sleeping in OnPlay
- // (2 second delay). OnStop must fire exactly once, and only after Destroy().
- TEST(RtmpTest, destroy_retrying_client_streams_during_creation) {
-     PlayingDummyService service(2000/*sleep 2s*/);
-     brpc::Server server;
-     brpc::ServerOptions server_options;
-     server_options.rtmp_service = &service;
-     ASSERT_EQ(0, server.Start(8574, &server_options));
-     brpc::RtmpClientOptions client_options;
-     client_options.app = "hello";
-     client_options.swfUrl = "anything";
-     client_options.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient client;
-     ASSERT_EQ(0, client.Init("localhost:8574", client_options));
-     // Several streams share the same client.
-     const int NSTREAM = 2;
-     butil::intrusive_ptr<TestRtmpRetryingClientStream> streams[NSTREAM];
-     for (int idx = 0; idx < NSTREAM; ++idx) {
-         streams[idx].reset(new TestRtmpRetryingClientStream);
-         brpc::RtmpRetryingClientStreamOptions stream_options;
-         stream_options.play_name = butil::string_printf("play_name_%d", idx);
-         brpc::SubStreamCreator* creator = new RtmpSubStreamCreator(&client);
-         streams[idx]->Init(creator, stream_options);
-         ASSERT_EQ(0, streams[idx]->_called_on_stop);
-         // Still within the server's 2s delay after this wait.
-         usleep(500*1000);
-         ASSERT_EQ(0, streams[idx]->_called_on_stop);
-         streams[idx]->Destroy();
-         usleep(10*1000);
-         ASSERT_EQ(1, streams[idx]->_called_on_stop);
-     }
-     LOG(INFO) << "Quiting program...";
- }
- // Retrying streams across a server restart: the streams play for 3 seconds,
- // the server is stopped and joined, stays down for 3 seconds, and is started
- // again on the same port. After 3 more seconds each stream must have seen
- // OnFirstMessage once and OnPlayable twice (once per server lifetime).
- TEST(RtmpTest, retrying_stream) {
-     PlayingDummyService rtmp_service;
-     brpc::Server server;
-     brpc::ServerOptions server_opt;
-     server_opt.rtmp_service = &rtmp_service;
-     ASSERT_EQ(0, server.Start(8576, &server_opt));
-     brpc::RtmpClientOptions rtmp_opt;
-     rtmp_opt.app = "hello";
-     rtmp_opt.swfUrl = "anything";
-     rtmp_opt.tcUrl = "rtmp://heheda";
-     brpc::RtmpClient rtmp_client;
-     ASSERT_EQ(0, rtmp_client.Init("localhost:8576", rtmp_opt));
-     // Create multiple streams.
-     const int NSTREAM = 2;
-     brpc::DestroyingPtr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
-     for (int i = 0; i < NSTREAM; ++i) {
-         cstreams[i].reset(new TestRtmpRetryingClientStream);
-         // (An unused brpc::Controller local was removed from this loop.)
-         brpc::RtmpRetryingClientStreamOptions opt;
-         opt.play_name = butil::string_printf("name_%d", i);
-         brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
-         cstreams[i]->Init(sc, opt);
-     }
-     sleep(3);
-     LOG(INFO) << "Stopping server";
-     server.Stop(0);
-     server.Join();
-     LOG(INFO) << "Stopped server and sleep for a while";
-     sleep(3);
-     // Bring the server back on the same port so the streams can reconnect.
-     ASSERT_EQ(0, server.Start(8576, &server_opt));
-     sleep(3);
-     for (int i = 0; i < NSTREAM; ++i) {
-         ASSERT_EQ(1, cstreams[i]->_called_on_first_message);
-         ASSERT_EQ(2, cstreams[i]->_called_on_playable);
-     }
-     LOG(INFO) << "Quiting program...";
- }
|