brpc_rtmp_unittest.cpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. // brpc - A framework to host and access services throughout Baidu.
  18. // Date: Fri May 20 15:52:22 CST 2016
  19. #include <sys/ioctl.h>
  20. #include <sys/types.h>
  21. #include <sys/socket.h>
  22. #include <gtest/gtest.h>
  23. #include <gflags/gflags.h>
  24. #include <google/protobuf/descriptor.h>
  25. #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
  26. #include "butil/time.h"
  27. #include "butil/macros.h"
  28. #include "brpc/socket.h"
  29. #include "brpc/acceptor.h"
  30. #include "brpc/server.h"
  31. #include "brpc/controller.h"
  32. #include "brpc/rtmp.h"
  33. #include "brpc/amf.h"
  34. int main(int argc, char* argv[]) {
  35. testing::InitGoogleTest(&argc, argv);
  36. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
  37. return RUN_ALL_TESTS();
  38. }
  39. class TestRtmpClientStream : public brpc::RtmpClientStream {
  40. public:
  41. TestRtmpClientStream()
  42. : _called_on_stop(0)
  43. , _called_on_first_message(0)
  44. , _nvideomsg(0)
  45. , _naudiomsg(0) {
  46. LOG(INFO) << __FUNCTION__;
  47. }
  48. ~TestRtmpClientStream() {
  49. LOG(INFO) << __FUNCTION__;
  50. assertions_on_stop();
  51. }
  52. void assertions_on_stop() {
  53. ASSERT_EQ(1, _called_on_stop);
  54. }
  55. void assertions_on_successful_play() {
  56. ASSERT_EQ(1, _called_on_first_message);
  57. ASSERT_LT(0, _nvideomsg);
  58. ASSERT_LT(0, _naudiomsg);
  59. }
  60. void assertions_on_failure() {
  61. ASSERT_EQ(0, _called_on_first_message);
  62. ASSERT_EQ(0, _nvideomsg);
  63. ASSERT_EQ(0, _naudiomsg);
  64. assertions_on_stop();
  65. }
  66. void OnFirstMessage() {
  67. ++_called_on_first_message;
  68. }
  69. void OnStop() {
  70. ++_called_on_stop;
  71. }
  72. void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
  73. ++_nvideomsg;
  74. // video data is ascii in UT, print it out.
  75. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  76. << ": Got " << *msg << " data=" << msg->data;
  77. }
  78. void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
  79. ++_naudiomsg;
  80. // audio data is ascii in UT, print it out.
  81. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  82. << ": Got " << *msg << " data=" << msg->data;
  83. }
  84. private:
  85. int _called_on_stop;
  86. int _called_on_first_message;
  87. int _nvideomsg;
  88. int _naudiomsg;
  89. };
  90. class TestRtmpRetryingClientStream
  91. : public brpc::RtmpRetryingClientStream {
  92. public:
  93. TestRtmpRetryingClientStream()
  94. : _called_on_stop(0)
  95. , _called_on_first_message(0)
  96. , _called_on_playable(0) {
  97. LOG(INFO) << __FUNCTION__;
  98. }
  99. ~TestRtmpRetryingClientStream() {
  100. LOG(INFO) << __FUNCTION__;
  101. assertions_on_stop();
  102. }
  103. void assertions_on_stop() {
  104. ASSERT_EQ(1, _called_on_stop);
  105. }
  106. void OnStop() {
  107. ++_called_on_stop;
  108. }
  109. void OnFirstMessage() {
  110. ++_called_on_first_message;
  111. }
  112. void OnPlayable() {
  113. ++_called_on_playable;
  114. }
  115. void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
  116. // video data is ascii in UT, print it out.
  117. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  118. << ": Got " << *msg << " data=" << msg->data;
  119. }
  120. void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
  121. // audio data is ascii in UT, print it out.
  122. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  123. << ": Got " << *msg << " data=" << msg->data;
  124. }
  125. private:
  126. int _called_on_stop;
  127. int _called_on_first_message;
  128. int _called_on_playable;
  129. };
  130. const char* UNEXIST_NAME = "unexist_stream";
  131. class PlayingDummyStream : public brpc::RtmpServerStream {
  132. public:
  133. enum State {
  134. STATE_UNPLAYING,
  135. STATE_PLAYING,
  136. STATE_STOPPED
  137. };
  138. PlayingDummyStream(int64_t sleep_ms)
  139. : _state(STATE_UNPLAYING), _sleep_ms(sleep_ms) {
  140. LOG(INFO) << __FUNCTION__ << "(" << this << ")";
  141. }
  142. ~PlayingDummyStream() {
  143. LOG(INFO) << __FUNCTION__ << "(" << this << ")";
  144. }
  145. void OnPlay(const brpc::RtmpPlayOptions& opt,
  146. butil::Status* status,
  147. google::protobuf::Closure* done) {
  148. brpc::ClosureGuard done_guard(done);
  149. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  150. << ": Got play{stream_name=" << opt.stream_name
  151. << " start=" << opt.start
  152. << " duration=" << opt.duration
  153. << " reset=" << opt.reset << '}';
  154. if (opt.stream_name == UNEXIST_NAME) {
  155. status->set_error(EPERM, "Unexist stream");
  156. return;
  157. }
  158. if (_sleep_ms > 0) {
  159. LOG(INFO) << "Sleep " << _sleep_ms
  160. << " ms before responding play request";
  161. bthread_usleep(_sleep_ms * 1000L);
  162. }
  163. int rc = bthread_start_background(&_play_thread, NULL,
  164. RunSendData, this);
  165. if (rc) {
  166. status->set_error(rc, "Fail to create thread");
  167. return;
  168. }
  169. State expected = STATE_UNPLAYING;
  170. if (!_state.compare_exchange_strong(expected, STATE_PLAYING)) {
  171. if (expected == STATE_STOPPED) {
  172. bthread_stop(_play_thread);
  173. bthread_join(_play_thread, NULL);
  174. } else {
  175. CHECK(false) << "Impossible";
  176. }
  177. }
  178. }
  179. void OnStop() {
  180. LOG(INFO) << "OnStop of PlayingDummyStream=" << this;
  181. if (_state.exchange(STATE_STOPPED) == STATE_PLAYING) {
  182. bthread_stop(_play_thread);
  183. bthread_join(_play_thread, NULL);
  184. }
  185. }
  186. void SendData();
  187. private:
  188. static void* RunSendData(void* arg) {
  189. ((PlayingDummyStream*)arg)->SendData();
  190. return NULL;
  191. }
  192. butil::atomic<State> _state;
  193. bthread_t _play_thread;
  194. int64_t _sleep_ms;
  195. };
  196. void PlayingDummyStream::SendData() {
  197. LOG(INFO) << "Enter SendData of PlayingDummyStream=" << this;
  198. brpc::RtmpVideoMessage vmsg;
  199. brpc::RtmpAudioMessage amsg;
  200. vmsg.timestamp = 1000;
  201. amsg.timestamp = 1000;
  202. for (int i = 0; !bthread_stopped(bthread_self()); ++i) {
  203. vmsg.timestamp += 20;
  204. amsg.timestamp += 20;
  205. vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
  206. vmsg.codec = brpc::FLV_VIDEO_AVC;
  207. vmsg.data.clear();
  208. vmsg.data.append(butil::string_printf("video_%d(ms_id=%u)",
  209. i, stream_id()));
  210. //failing to send is possible
  211. SendVideoMessage(vmsg);
  212. amsg.codec = brpc::FLV_AUDIO_AAC;
  213. amsg.rate = brpc::FLV_SOUND_RATE_44100HZ;
  214. amsg.bits = brpc::FLV_SOUND_16BIT;
  215. amsg.type = brpc::FLV_SOUND_STEREO;
  216. amsg.data.clear();
  217. amsg.data.append(butil::string_printf("audio_%d(ms_id=%u)",
  218. i, stream_id()));
  219. SendAudioMessage(amsg);
  220. bthread_usleep(1000000);
  221. }
  222. LOG(INFO) << "Quit SendData of PlayingDummyStream=" << this;
  223. }
  224. class PlayingDummyService : public brpc::RtmpService {
  225. public:
  226. PlayingDummyService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {}
  227. private:
  228. // Called to create a server-side stream.
  229. virtual brpc::RtmpServerStream* NewStream(
  230. const brpc::RtmpConnectRequest&) {
  231. return new PlayingDummyStream(_sleep_ms);
  232. }
  233. int64_t _sleep_ms;
  234. };
  235. class PublishStream : public brpc::RtmpServerStream {
  236. public:
  237. PublishStream(int64_t sleep_ms)
  238. : _sleep_ms(sleep_ms)
  239. , _called_on_stop(0)
  240. , _called_on_first_message(0)
  241. , _nvideomsg(0)
  242. , _naudiomsg(0) {
  243. LOG(INFO) << __FUNCTION__ << "(" << this << ")";
  244. }
  245. ~PublishStream() {
  246. LOG(INFO) << __FUNCTION__ << "(" << this << ")";
  247. assertions_on_stop();
  248. }
  249. void assertions_on_stop() {
  250. ASSERT_EQ(1, _called_on_stop);
  251. }
  252. void OnPublish(const std::string& stream_name,
  253. brpc::RtmpPublishType publish_type,
  254. butil::Status* status,
  255. google::protobuf::Closure* done) {
  256. brpc::ClosureGuard done_guard(done);
  257. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  258. << ": Got publish{stream_name=" << stream_name
  259. << " type=" << brpc::RtmpPublishType2Str(publish_type)
  260. << '}';
  261. if (stream_name == UNEXIST_NAME) {
  262. status->set_error(EPERM, "Unexist stream");
  263. return;
  264. }
  265. if (_sleep_ms > 0) {
  266. LOG(INFO) << "Sleep " << _sleep_ms
  267. << " ms before responding play request";
  268. bthread_usleep(_sleep_ms * 1000L);
  269. }
  270. }
  271. void OnFirstMessage() {
  272. ++_called_on_first_message;
  273. }
  274. void OnStop() {
  275. LOG(INFO) << "OnStop of PublishStream=" << this;
  276. ++_called_on_stop;
  277. }
  278. void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
  279. ++_nvideomsg;
  280. // video data is ascii in UT, print it out.
  281. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  282. << ": Got " << *msg << " data=" << msg->data;
  283. }
  284. void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
  285. ++_naudiomsg;
  286. // audio data is ascii in UT, print it out.
  287. LOG(INFO) << remote_side() << "|stream=" << stream_id()
  288. << ": Got " << *msg << " data=" << msg->data;
  289. }
  290. private:
  291. int64_t _sleep_ms;
  292. int _called_on_stop;
  293. int _called_on_first_message;
  294. int _nvideomsg;
  295. int _naudiomsg;
  296. };
  297. class PublishService : public brpc::RtmpService {
  298. public:
  299. PublishService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {
  300. pthread_mutex_init(&_mutex, NULL);
  301. }
  302. ~PublishService() {
  303. pthread_mutex_destroy(&_mutex);
  304. }
  305. void move_created_streams(
  306. std::vector<butil::intrusive_ptr<PublishStream> >* out) {
  307. out->clear();
  308. BAIDU_SCOPED_LOCK(_mutex);
  309. out->swap(_created_streams);
  310. }
  311. private:
  312. // Called to create a server-side stream.
  313. virtual brpc::RtmpServerStream* NewStream(
  314. const brpc::RtmpConnectRequest&) {
  315. PublishStream* stream = new PublishStream(_sleep_ms);
  316. {
  317. BAIDU_SCOPED_LOCK(_mutex);
  318. _created_streams.push_back(stream);
  319. }
  320. return stream;
  321. }
  322. int64_t _sleep_ms;
  323. pthread_mutex_t _mutex;
  324. std::vector<butil::intrusive_ptr<PublishStream> > _created_streams;
  325. };
  326. class RtmpSubStream : public brpc::RtmpClientStream {
  327. public:
  328. explicit RtmpSubStream(brpc::RtmpMessageHandler* mh)
  329. : _message_handler(mh) {}
  330. // @RtmpStreamBase
  331. void OnMetaData(brpc::RtmpMetaData*, const butil::StringPiece&);
  332. void OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg);
  333. void OnAudioMessage(brpc::RtmpAudioMessage* msg);
  334. void OnVideoMessage(brpc::RtmpVideoMessage* msg);
  335. void OnFirstMessage();
  336. void OnStop();
  337. private:
  338. std::unique_ptr<brpc::RtmpMessageHandler> _message_handler;
  339. };
  340. void RtmpSubStream::OnFirstMessage() {
  341. _message_handler->OnPlayable();
  342. }
  343. void RtmpSubStream::OnMetaData(brpc::RtmpMetaData* obj, const butil::StringPiece& name) {
  344. _message_handler->OnMetaData(obj, name);
  345. }
  346. void RtmpSubStream::OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg) {
  347. _message_handler->OnSharedObjectMessage(msg);
  348. }
  349. void RtmpSubStream::OnAudioMessage(brpc::RtmpAudioMessage* msg) {
  350. _message_handler->OnAudioMessage(msg);
  351. }
  352. void RtmpSubStream::OnVideoMessage(brpc::RtmpVideoMessage* msg) {
  353. _message_handler->OnVideoMessage(msg);
  354. }
  355. void RtmpSubStream::OnStop() {
  356. _message_handler->OnSubStreamStop(this);
  357. }
  358. class RtmpSubStreamCreator : public brpc::SubStreamCreator {
  359. public:
  360. RtmpSubStreamCreator(const brpc::RtmpClient* client);
  361. ~RtmpSubStreamCreator();
  362. // @SubStreamCreator
  363. void NewSubStream(brpc::RtmpMessageHandler* message_handler,
  364. butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream);
  365. void LaunchSubStream(brpc::RtmpStreamBase* sub_stream,
  366. brpc::RtmpRetryingClientStreamOptions* options);
  367. private:
  368. const brpc::RtmpClient* _client;
  369. };
  370. RtmpSubStreamCreator::RtmpSubStreamCreator(const brpc::RtmpClient* client)
  371. : _client(client) {}
  372. RtmpSubStreamCreator::~RtmpSubStreamCreator() {}
  373. void RtmpSubStreamCreator::NewSubStream(brpc::RtmpMessageHandler* message_handler,
  374. butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream) {
  375. if (sub_stream) {
  376. (*sub_stream).reset(new RtmpSubStream(message_handler));
  377. }
  378. return;
  379. }
  380. void RtmpSubStreamCreator::LaunchSubStream(
  381. brpc::RtmpStreamBase* sub_stream,
  382. brpc::RtmpRetryingClientStreamOptions* options) {
  383. brpc::RtmpClientStreamOptions client_options = *options;
  384. dynamic_cast<RtmpSubStream*>(sub_stream)->Init(_client, client_options);
  385. }
  386. TEST(RtmpTest, parse_rtmp_url) {
  387. butil::StringPiece host;
  388. butil::StringPiece vhost;
  389. butil::StringPiece port;
  390. butil::StringPiece app;
  391. butil::StringPiece stream_name;
  392. brpc::ParseRtmpURL("rtmp://HOST/APP/STREAM",
  393. &host, &vhost, &port, &app, &stream_name);
  394. ASSERT_EQ("HOST", host);
  395. ASSERT_TRUE(vhost.empty());
  396. ASSERT_EQ("1935", port);
  397. ASSERT_EQ("APP", app);
  398. ASSERT_EQ("STREAM", stream_name);
  399. brpc::ParseRtmpURL("HOST/APP/STREAM",
  400. &host, &vhost, &port, &app, &stream_name);
  401. ASSERT_EQ("HOST", host);
  402. ASSERT_TRUE(vhost.empty());
  403. ASSERT_EQ("1935", port);
  404. ASSERT_EQ("APP", app);
  405. ASSERT_EQ("STREAM", stream_name);
  406. brpc::ParseRtmpURL("rtmp://HOST:8765//APP?vhost=abc///STREAM?queries",
  407. &host, &vhost, &port, &app, &stream_name);
  408. ASSERT_EQ("HOST", host);
  409. ASSERT_EQ("abc", vhost);
  410. ASSERT_EQ("8765", port);
  411. ASSERT_EQ("APP", app);
  412. ASSERT_EQ("STREAM?queries", stream_name);
  413. brpc::ParseRtmpURL("HOST:8765//APP?vhost=abc///STREAM?queries",
  414. &host, &vhost, &port, &app, &stream_name);
  415. ASSERT_EQ("HOST", host);
  416. ASSERT_EQ("abc", vhost);
  417. ASSERT_EQ("8765", port);
  418. ASSERT_EQ("APP", app);
  419. ASSERT_EQ("STREAM?queries", stream_name);
  420. brpc::ParseRtmpURL("HOST:8765//APP?vhost=abc///STREAM?queries/",
  421. &host, &vhost, &port, &app, &stream_name);
  422. ASSERT_EQ("HOST", host);
  423. ASSERT_EQ("abc", vhost);
  424. ASSERT_EQ("8765", port);
  425. ASSERT_EQ("APP", app);
  426. ASSERT_EQ("STREAM?queries/", stream_name);
  427. brpc::ParseRtmpURL("HOST:8765/APP?vhost=abc",
  428. &host, &vhost, &port, &app, &stream_name);
  429. ASSERT_EQ("HOST", host);
  430. ASSERT_EQ("abc", vhost);
  431. ASSERT_EQ("8765", port);
  432. ASSERT_EQ("APP", app);
  433. ASSERT_TRUE(stream_name.empty());
  434. }
  435. TEST(RtmpTest, amf) {
  436. std::string req_buf;
  437. brpc::RtmpInfo info;
  438. brpc::AMFObject obj;
  439. std::string dummy = "_result";
  440. {
  441. google::protobuf::io::StringOutputStream zc_stream(&req_buf);
  442. brpc::AMFOutputStream ostream(&zc_stream);
  443. brpc::WriteAMFString(dummy, &ostream);
  444. brpc::WriteAMFUint32(17, &ostream);
  445. info.set_code("NetConnection.Connect"); // TODO
  446. info.set_level("error");
  447. info.set_description("heheda hello foobar");
  448. brpc::WriteAMFObject(info, &ostream);
  449. ASSERT_TRUE(ostream.good());
  450. obj.SetString("code", "foo");
  451. obj.SetString("level", "bar");
  452. obj.SetString("description", "heheda");
  453. brpc::WriteAMFObject(obj, &ostream);
  454. ASSERT_TRUE(ostream.good());
  455. }
  456. google::protobuf::io::ArrayInputStream zc_stream(req_buf.data(), req_buf.size());
  457. brpc::AMFInputStream istream(&zc_stream);
  458. std::string result;
  459. ASSERT_TRUE(brpc::ReadAMFString(&result, &istream));
  460. ASSERT_EQ(dummy, result);
  461. uint32_t num = 0;
  462. ASSERT_TRUE(brpc::ReadAMFUint32(&num, &istream));
  463. ASSERT_EQ(17u, num);
  464. brpc::RtmpInfo info2;
  465. ASSERT_TRUE(brpc::ReadAMFObject(&info2, &istream));
  466. ASSERT_EQ(info.code(), info2.code());
  467. ASSERT_EQ(info.level(), info2.level());
  468. ASSERT_EQ(info.description(), info2.description());
  469. brpc::RtmpInfo info3;
  470. ASSERT_TRUE(brpc::ReadAMFObject(&info3, &istream));
  471. ASSERT_EQ("foo", info3.code());
  472. ASSERT_EQ("bar", info3.level());
  473. ASSERT_EQ("heheda", info3.description());
  474. }
  475. TEST(RtmpTest, successfully_play_streams) {
  476. PlayingDummyService rtmp_service;
  477. brpc::Server server;
  478. brpc::ServerOptions server_opt;
  479. server_opt.rtmp_service = &rtmp_service;
  480. ASSERT_EQ(0, server.Start(8571, &server_opt));
  481. brpc::RtmpClientOptions rtmp_opt;
  482. rtmp_opt.app = "hello";
  483. rtmp_opt.swfUrl = "anything";
  484. rtmp_opt.tcUrl = "rtmp://heheda";
  485. brpc::RtmpClient rtmp_client;
  486. ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));
  487. // Create multiple streams.
  488. const int NSTREAM = 2;
  489. brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
  490. for (int i = 0; i < NSTREAM; ++i) {
  491. cstreams[i].reset(new TestRtmpClientStream);
  492. brpc::RtmpClientStreamOptions opt;
  493. opt.play_name = butil::string_printf("play_name_%d", i);
  494. //opt.publish_name = butil::string_printf("pub_name_%d", i);
  495. opt.wait_until_play_or_publish_is_sent = true;
  496. cstreams[i]->Init(&rtmp_client, opt);
  497. }
  498. sleep(5);
  499. for (int i = 0; i < NSTREAM; ++i) {
  500. cstreams[i]->assertions_on_successful_play();
  501. }
  502. LOG(INFO) << "Quiting program...";
  503. }
  504. TEST(RtmpTest, fail_to_play_streams) {
  505. PlayingDummyService rtmp_service;
  506. brpc::Server server;
  507. brpc::ServerOptions server_opt;
  508. server_opt.rtmp_service = &rtmp_service;
  509. ASSERT_EQ(0, server.Start(8571, &server_opt));
  510. brpc::RtmpClientOptions rtmp_opt;
  511. rtmp_opt.app = "hello";
  512. rtmp_opt.swfUrl = "anything";
  513. rtmp_opt.tcUrl = "rtmp://heheda";
  514. brpc::RtmpClient rtmp_client;
  515. ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));
  516. // Create multiple streams.
  517. const int NSTREAM = 2;
  518. brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
  519. for (int i = 0; i < NSTREAM; ++i) {
  520. cstreams[i].reset(new TestRtmpClientStream);
  521. brpc::RtmpClientStreamOptions opt;
  522. opt.play_name = UNEXIST_NAME;
  523. opt.wait_until_play_or_publish_is_sent = true;
  524. cstreams[i]->Init(&rtmp_client, opt);
  525. }
  526. sleep(1);
  527. for (int i = 0; i < NSTREAM; ++i) {
  528. cstreams[i]->assertions_on_failure();
  529. }
  530. LOG(INFO) << "Quiting program...";
  531. }
  532. TEST(RtmpTest, successfully_publish_streams) {
  533. PublishService rtmp_service;
  534. brpc::Server server;
  535. brpc::ServerOptions server_opt;
  536. server_opt.rtmp_service = &rtmp_service;
  537. ASSERT_EQ(0, server.Start(8571, &server_opt));
  538. brpc::RtmpClientOptions rtmp_opt;
  539. rtmp_opt.app = "hello";
  540. rtmp_opt.swfUrl = "anything";
  541. rtmp_opt.tcUrl = "rtmp://heheda";
  542. brpc::RtmpClient rtmp_client;
  543. ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));
  544. // Create multiple streams.
  545. const int NSTREAM = 2;
  546. brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
  547. for (int i = 0; i < NSTREAM; ++i) {
  548. cstreams[i].reset(new TestRtmpClientStream);
  549. brpc::RtmpClientStreamOptions opt;
  550. opt.publish_name = butil::string_printf("pub_name_%d", i);
  551. opt.wait_until_play_or_publish_is_sent = true;
  552. cstreams[i]->Init(&rtmp_client, opt);
  553. }
  554. const int REP = 5;
  555. for (int i = 0; i < REP; ++i) {
  556. brpc::RtmpVideoMessage vmsg;
  557. vmsg.timestamp = 1000 + i * 20;
  558. vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
  559. vmsg.codec = brpc::FLV_VIDEO_AVC;
  560. vmsg.data.append(butil::string_printf("video_%d", i));
  561. for (int j = 0; j < NSTREAM; j += 2) {
  562. ASSERT_EQ(0, cstreams[j]->SendVideoMessage(vmsg));
  563. }
  564. brpc::RtmpAudioMessage amsg;
  565. amsg.timestamp = 1000 + i * 20;
  566. amsg.codec = brpc::FLV_AUDIO_AAC;
  567. amsg.rate = brpc::FLV_SOUND_RATE_44100HZ;
  568. amsg.bits = brpc::FLV_SOUND_16BIT;
  569. amsg.type = brpc::FLV_SOUND_STEREO;
  570. amsg.data.append(butil::string_printf("audio_%d", i));
  571. for (int j = 1; j < NSTREAM; j += 2) {
  572. ASSERT_EQ(0, cstreams[j]->SendAudioMessage(amsg));
  573. }
  574. bthread_usleep(500000);
  575. }
  576. std::vector<butil::intrusive_ptr<PublishStream> > created_streams;
  577. rtmp_service.move_created_streams(&created_streams);
  578. ASSERT_EQ(NSTREAM, (int)created_streams.size());
  579. for (int i = 0; i < NSTREAM; ++i) {
  580. EXPECT_EQ(1, created_streams[i]->_called_on_first_message);
  581. }
  582. for (int j = 0; j < NSTREAM; j += 2) {
  583. ASSERT_EQ(REP, created_streams[j]->_nvideomsg);
  584. }
  585. for (int j = 1; j < NSTREAM; j += 2) {
  586. ASSERT_EQ(REP, created_streams[j]->_naudiomsg);
  587. }
  588. LOG(INFO) << "Quiting program...";
  589. }
  590. TEST(RtmpTest, failed_to_publish_streams) {
  591. PublishService rtmp_service;
  592. brpc::Server server;
  593. brpc::ServerOptions server_opt;
  594. server_opt.rtmp_service = &rtmp_service;
  595. ASSERT_EQ(0, server.Start(8575, &server_opt));
  596. brpc::RtmpClientOptions rtmp_opt;
  597. rtmp_opt.app = "hello";
  598. rtmp_opt.swfUrl = "anything";
  599. rtmp_opt.tcUrl = "rtmp://heheda";
  600. brpc::RtmpClient rtmp_client;
  601. ASSERT_EQ(0, rtmp_client.Init("localhost:8575", rtmp_opt));
  602. // Create multiple streams.
  603. const int NSTREAM = 2;
  604. brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
  605. for (int i = 0; i < NSTREAM; ++i) {
  606. cstreams[i].reset(new TestRtmpClientStream);
  607. brpc::RtmpClientStreamOptions opt;
  608. opt.publish_name = UNEXIST_NAME;
  609. opt.wait_until_play_or_publish_is_sent = true;
  610. cstreams[i]->Init(&rtmp_client, opt);
  611. }
  612. sleep(1);
  613. for (int i = 0; i < NSTREAM; ++i) {
  614. cstreams[i]->assertions_on_failure();
  615. }
  616. std::vector<butil::intrusive_ptr<PublishStream> > created_streams;
  617. rtmp_service.move_created_streams(&created_streams);
  618. ASSERT_EQ(NSTREAM, (int)created_streams.size());
  619. for (int i = 0; i < NSTREAM; ++i) {
  620. ASSERT_EQ(0, created_streams[i]->_called_on_first_message);
  621. ASSERT_EQ(0, created_streams[i]->_nvideomsg);
  622. ASSERT_EQ(0, created_streams[i]->_naudiomsg);
  623. }
  624. LOG(INFO) << "Quiting program...";
  625. }
  626. TEST(RtmpTest, failed_to_connect_client_streams) {
  627. brpc::RtmpClientOptions rtmp_opt;
  628. rtmp_opt.app = "hello";
  629. rtmp_opt.swfUrl = "anything";
  630. rtmp_opt.tcUrl = "rtmp://heheda";
  631. brpc::RtmpClient rtmp_client;
  632. ASSERT_EQ(0, rtmp_client.Init("localhost:8572", rtmp_opt));
  633. // Create multiple streams.
  634. const int NSTREAM = 2;
  635. brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
  636. for (int i = 0; i < NSTREAM; ++i) {
  637. cstreams[i].reset(new TestRtmpClientStream);
  638. brpc::RtmpClientStreamOptions opt;
  639. opt.play_name = butil::string_printf("play_name_%d", i);
  640. opt.wait_until_play_or_publish_is_sent = true;
  641. cstreams[i]->Init(&rtmp_client, opt);
  642. cstreams[i]->assertions_on_failure();
  643. }
  644. LOG(INFO) << "Quiting program...";
  645. }
  646. TEST(RtmpTest, destroy_client_streams_before_init) {
  647. brpc::RtmpClientOptions rtmp_opt;
  648. rtmp_opt.app = "hello";
  649. rtmp_opt.swfUrl = "anything";
  650. rtmp_opt.tcUrl = "rtmp://heheda";
  651. brpc::RtmpClient rtmp_client;
  652. ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt));
  653. // Create multiple streams.
  654. const int NSTREAM = 2;
  655. butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM];
  656. for (int i = 0; i < NSTREAM; ++i) {
  657. cstreams[i].reset(new TestRtmpClientStream);
  658. cstreams[i]->Destroy();
  659. ASSERT_EQ(1, cstreams[i]->_called_on_stop);
  660. ASSERT_EQ(brpc::RtmpClientStream::STATE_DESTROYING, cstreams[i]->_state);
  661. brpc::RtmpClientStreamOptions opt;
  662. opt.play_name = butil::string_printf("play_name_%d", i);
  663. opt.wait_until_play_or_publish_is_sent = true;
  664. cstreams[i]->Init(&rtmp_client, opt);
  665. cstreams[i]->assertions_on_failure();
  666. }
  667. LOG(INFO) << "Quiting program...";
  668. }
  669. TEST(RtmpTest, destroy_retrying_client_streams_before_init) {
  670. brpc::RtmpClientOptions rtmp_opt;
  671. rtmp_opt.app = "hello";
  672. rtmp_opt.swfUrl = "anything";
  673. rtmp_opt.tcUrl = "rtmp://heheda";
  674. brpc::RtmpClient rtmp_client;
  675. ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt));
  676. // Create multiple streams.
  677. const int NSTREAM = 2;
  678. butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
  679. for (int i = 0; i < NSTREAM; ++i) {
  680. cstreams[i].reset(new TestRtmpRetryingClientStream);
  681. cstreams[i]->Destroy();
  682. ASSERT_EQ(1, cstreams[i]->_called_on_stop);
  683. brpc::RtmpRetryingClientStreamOptions opt;
  684. opt.play_name = butil::string_printf("play_name_%d", i);
  685. brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
  686. cstreams[i]->Init(sc, opt);
  687. ASSERT_EQ(1, cstreams[i]->_called_on_stop);
  688. }
  689. LOG(INFO) << "Quiting program...";
  690. }
  691. TEST(RtmpTest, destroy_client_streams_during_creation) {
  692. PlayingDummyService rtmp_service(2000/*sleep 2s*/);
  693. brpc::Server server;
  694. brpc::ServerOptions server_opt;
  695. server_opt.rtmp_service = &rtmp_service;
  696. ASSERT_EQ(0, server.Start(8574, &server_opt));
  697. brpc::RtmpClientOptions rtmp_opt;
  698. rtmp_opt.app = "hello";
  699. rtmp_opt.swfUrl = "anything";
  700. rtmp_opt.tcUrl = "rtmp://heheda";
  701. brpc::RtmpClient rtmp_client;
  702. ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt));
  703. // Create multiple streams.
  704. const int NSTREAM = 2;
  705. butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM];
  706. for (int i = 0; i < NSTREAM; ++i) {
  707. cstreams[i].reset(new TestRtmpClientStream);
  708. brpc::RtmpClientStreamOptions opt;
  709. opt.play_name = butil::string_printf("play_name_%d", i);
  710. cstreams[i]->Init(&rtmp_client, opt);
  711. ASSERT_EQ(0, cstreams[i]->_called_on_stop);
  712. usleep(500*1000);
  713. ASSERT_EQ(0, cstreams[i]->_called_on_stop);
  714. cstreams[i]->Destroy();
  715. usleep(10*1000);
  716. ASSERT_EQ(1, cstreams[i]->_called_on_stop);
  717. }
  718. LOG(INFO) << "Quiting program...";
  719. }
  720. TEST(RtmpTest, destroy_retrying_client_streams_during_creation) {
  721. PlayingDummyService rtmp_service(2000/*sleep 2s*/);
  722. brpc::Server server;
  723. brpc::ServerOptions server_opt;
  724. server_opt.rtmp_service = &rtmp_service;
  725. ASSERT_EQ(0, server.Start(8574, &server_opt));
  726. brpc::RtmpClientOptions rtmp_opt;
  727. rtmp_opt.app = "hello";
  728. rtmp_opt.swfUrl = "anything";
  729. rtmp_opt.tcUrl = "rtmp://heheda";
  730. brpc::RtmpClient rtmp_client;
  731. ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt));
  732. // Create multiple streams.
  733. const int NSTREAM = 2;
  734. butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
  735. for (int i = 0; i < NSTREAM; ++i) {
  736. cstreams[i].reset(new TestRtmpRetryingClientStream);
  737. brpc::RtmpRetryingClientStreamOptions opt;
  738. opt.play_name = butil::string_printf("play_name_%d", i);
  739. brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
  740. cstreams[i]->Init(sc, opt);
  741. ASSERT_EQ(0, cstreams[i]->_called_on_stop);
  742. usleep(500*1000);
  743. ASSERT_EQ(0, cstreams[i]->_called_on_stop);
  744. cstreams[i]->Destroy();
  745. usleep(10*1000);
  746. ASSERT_EQ(1, cstreams[i]->_called_on_stop);
  747. }
  748. LOG(INFO) << "Quiting program...";
  749. }
  750. TEST(RtmpTest, retrying_stream) {
  751. PlayingDummyService rtmp_service;
  752. brpc::Server server;
  753. brpc::ServerOptions server_opt;
  754. server_opt.rtmp_service = &rtmp_service;
  755. ASSERT_EQ(0, server.Start(8576, &server_opt));
  756. brpc::RtmpClientOptions rtmp_opt;
  757. rtmp_opt.app = "hello";
  758. rtmp_opt.swfUrl = "anything";
  759. rtmp_opt.tcUrl = "rtmp://heheda";
  760. brpc::RtmpClient rtmp_client;
  761. ASSERT_EQ(0, rtmp_client.Init("localhost:8576", rtmp_opt));
  762. // Create multiple streams.
  763. const int NSTREAM = 2;
  764. brpc::DestroyingPtr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
  765. for (int i = 0; i < NSTREAM; ++i) {
  766. cstreams[i].reset(new TestRtmpRetryingClientStream);
  767. brpc::Controller cntl;
  768. brpc::RtmpRetryingClientStreamOptions opt;
  769. opt.play_name = butil::string_printf("name_%d", i);
  770. brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
  771. cstreams[i]->Init(sc, opt);
  772. }
  773. sleep(3);
  774. LOG(INFO) << "Stopping server";
  775. server.Stop(0);
  776. server.Join();
  777. LOG(INFO) << "Stopped server and sleep for a while";
  778. sleep(3);
  779. ASSERT_EQ(0, server.Start(8576, &server_opt));
  780. sleep(3);
  781. for (int i = 0; i < NSTREAM; ++i) {
  782. ASSERT_EQ(1, cstreams[i]->_called_on_first_message);
  783. ASSERT_EQ(2, cstreams[i]->_called_on_playable);
  784. }
  785. LOG(INFO) << "Quiting program...";
  786. }