rtmp.cpp 99 KB

  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <gflags/gflags.h>
  18. #include <google/protobuf/io/zero_copy_stream_impl_lite.h> // StringOutputStream
  19. #include "bthread/bthread.h" // bthread_id_xx
  20. #include "bthread/unstable.h" // bthread_timer_del
  21. #include "brpc/log.h"
  22. #include "brpc/callback.h" // Closure
  23. #include "brpc/channel.h" // Channel
  24. #include "brpc/socket_map.h" // SocketMap
  25. #include "brpc/socket.h" // Socket
  26. #include "brpc/policy/rtmp_protocol.h" // policy::*
  27. #include "brpc/rtmp.h"
  28. #include "brpc/details/rtmp_utils.h"
  29. namespace brpc {
  30. DEFINE_bool(rtmp_server_close_connection_on_error, true,
  31. "Close the client connection on play/publish errors, clients setting"
  32. " RtmpConnectRequest.stream_multiplexing to true are not affected"
  33. " by this flag");
  34. struct RtmpBvars {
  35. bvar::Adder<int> client_count;
  36. bvar::Adder<int> client_stream_count;
  37. bvar::Adder<int> retrying_client_stream_count;
  38. bvar::Adder<int> server_stream_count;
  39. RtmpBvars()
  40. : client_count("rtmp_client_count")
  41. , client_stream_count("rtmp_client_stream_count")
  42. , retrying_client_stream_count("rtmp_retrying_client_stream_count")
  43. , server_stream_count("rtmp_server_stream_count") {
  44. }
  45. };
  46. inline RtmpBvars* get_rtmp_bvars() {
  47. return butil::get_leaky_singleton<RtmpBvars>();
  48. }
  49. namespace policy {
  50. int SendC0C1(int fd, bool* is_simple_handshake);
  51. int WriteWithoutOvercrowded(Socket*, SocketMessagePtr<>& msg);
  52. }
  53. FlvWriter::FlvWriter(butil::IOBuf* buf)
  54. : _write_header(false), _buf(buf), _options() {
  55. }
  56. FlvWriter::FlvWriter(butil::IOBuf* buf, const FlvWriterOptions& options)
  57. : _write_header(false), _buf(buf), _options(options) {
  58. }
  59. butil::Status FlvWriter::Write(const RtmpVideoMessage& msg) {
  60. char buf[32];
  61. char* p = buf;
  62. if (!_write_header) {
  63. _write_header = true;
  64. const char flags_bit = static_cast<char>(_options.flv_content_type);
  65. const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
  66. memcpy(p, header, sizeof(header));
  67. p += sizeof(header);
  68. policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
  69. }
  70. // FLV tag
  71. *p++ = FLV_TAG_VIDEO;
  72. policy::WriteBigEndian3Bytes(&p, msg.size());
  73. policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF));
  74. *p++ = (msg.timestamp >> 24) & 0xFF;
  75. policy::WriteBigEndian3Bytes(&p, 0); // StreamID
  76. // header of VIDEODATA
  77. *p++ = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF);
  78. _buf->append(buf, p - buf);
  79. _buf->append(msg.data);
  80. // PreviousTagSize
  81. p = buf;
  82. policy::WriteBigEndian4Bytes(&p, 11 + msg.size());
  83. _buf->append(buf, p - buf);
  84. return butil::Status::OK();
  85. }
  86. butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) {
  87. char buf[32];
  88. char* p = buf;
  89. if (!_write_header) {
  90. _write_header = true;
  91. const char flags_bit = static_cast<char>(_options.flv_content_type);
  92. const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
  93. memcpy(p, header, sizeof(header));
  94. p += sizeof(header);
  95. policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
  96. }
  97. // FLV tag
  98. *p++ = FLV_TAG_AUDIO;
  99. policy::WriteBigEndian3Bytes(&p, msg.size());
  100. policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF));
  101. *p++ = (msg.timestamp >> 24) & 0xFF;
  102. policy::WriteBigEndian3Bytes(&p, 0); // StreamID
  103. // header of AUDIODATA
  104. *p++ = ((msg.codec & 0xF) << 4)
  105. | ((msg.rate & 0x3) << 2)
  106. | ((msg.bits & 0x1) << 1)
  107. | (msg.type & 0x1);
  108. _buf->append(buf, p - buf);
  109. _buf->append(msg.data);
  110. // PreviousTagSize
  111. p = buf;
  112. policy::WriteBigEndian4Bytes(&p, 11 + msg.size());
  113. _buf->append(buf, p - buf);
  114. return butil::Status::OK();
  115. }
  116. butil::Status FlvWriter::WriteScriptData(const butil::IOBuf& req_buf, uint32_t timestamp) {
  117. char buf[32];
  118. char* p = buf;
  119. if (!_write_header) {
  120. _write_header = true;
  121. const char flags_bit = static_cast<char>(_options.flv_content_type);
  122. const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
  123. memcpy(p, header, sizeof(header));
  124. p += sizeof(header);
  125. policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
  126. }
  127. // FLV tag
  128. *p++ = FLV_TAG_SCRIPT_DATA;
  129. policy::WriteBigEndian3Bytes(&p, req_buf.size());
  130. policy::WriteBigEndian3Bytes(&p, (timestamp & 0xFFFFFF));
  131. *p++ = (timestamp >> 24) & 0xFF;
  132. policy::WriteBigEndian3Bytes(&p, 0); // StreamID
  133. _buf->append(buf, p - buf);
  134. _buf->append(req_buf);
  135. // PreviousTagSize
  136. p = buf;
  137. policy::WriteBigEndian4Bytes(&p, 11 + req_buf.size());
  138. _buf->append(buf, p - buf);
  139. return butil::Status::OK();
  140. }
  141. butil::Status FlvWriter::Write(const RtmpCuePoint& cuepoint) {
  142. butil::IOBuf req_buf;
  143. {
  144. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  145. AMFOutputStream ostream(&zc_stream);
  146. WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
  147. WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
  148. WriteAMFObject(cuepoint.data, &ostream);
  149. if (!ostream.good()) {
  150. return butil::Status(EINVAL, "Fail to serialize cuepoint");
  151. }
  152. }
  153. return WriteScriptData(req_buf, cuepoint.timestamp);
  154. }
  155. butil::Status FlvWriter::Write(const RtmpMetaData& metadata) {
  156. butil::IOBuf req_buf;
  157. {
  158. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  159. AMFOutputStream ostream(&zc_stream);
  160. WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream);
  161. WriteAMFObject(metadata.data, &ostream);
  162. if (!ostream.good()) {
  163. return butil::Status(EINVAL, "Fail to serialize metadata");
  164. }
  165. }
  166. return WriteScriptData(req_buf, metadata.timestamp);
  167. }
  168. FlvReader::FlvReader(butil::IOBuf* buf)
  169. : _read_header(false), _buf(buf) {
  170. }
  171. butil::Status FlvReader::ReadHeader() {
  172. if (!_read_header) {
  173. // 9 is the size of FlvHeader, which is usually composed of
  174. // { 'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 0x09 }.
  175. char header_buf[9 + 4/* PreviousTagSize0 */];
  176. const char* p = (const char*)_buf->fetch(header_buf, sizeof(header_buf));
  177. if (p == NULL) {
  178. return butil::Status(EAGAIN, "Fail to read, not enough data");
  179. }
  180. const char flv_header_signature[3] = { 'F', 'L', 'V' };
  181. if (memcmp(p, flv_header_signature, sizeof(flv_header_signature)) != 0) {
  182. LOG(FATAL) << "Fail to parse FLV header";
  183. return butil::Status(EINVAL, "Fail to parse FLV header");
  184. }
  185. _buf->pop_front(sizeof(header_buf));
  186. _read_header = true;
  187. }
  188. return butil::Status::OK();
  189. }
  190. butil::Status FlvReader::PeekMessageType(FlvTagType* type_out) {
  191. butil::Status st = ReadHeader();
  192. if (!st.ok()) {
  193. return st;
  194. }
  195. const char* p = (const char*)_buf->fetch1();
  196. if (p == NULL) {
  197. return butil::Status(EAGAIN, "Fail to read, not enough data");
  198. }
  199. FlvTagType type = (FlvTagType)*p;
  200. if (type != FLV_TAG_AUDIO && type != FLV_TAG_VIDEO &&
  201. type != FLV_TAG_SCRIPT_DATA) {
  202. return butil::Status(EINVAL, "Fail to parse FLV tag");
  203. }
  204. if (type_out) {
  205. *type_out = type;
  206. }
  207. return butil::Status::OK();
  208. }
  209. butil::Status FlvReader::Read(RtmpVideoMessage* msg) {
  210. char tags[11];
  211. const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
  212. if (p == NULL) {
  213. return butil::Status(EAGAIN, "Fail to read, not enough data");
  214. }
  215. if (*p != FLV_TAG_VIDEO) {
  216. return butil::Status(EINVAL, "Fail to parse RtmpVideoMessage");
  217. }
  218. uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
  219. uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
  220. timestamp |= (*(p + 7) << 24);
  221. if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
  222. return butil::Status(EAGAIN, "Fail to read, not enough data");
  223. }
  224. _buf->pop_front(11);
  225. char first_byte = 0;
  226. CHECK(_buf->cut1(&first_byte));
  227. msg->timestamp = timestamp;
  228. msg->frame_type = (FlvVideoFrameType)((first_byte >> 4) & 0xF);
  229. msg->codec = (FlvVideoCodec)(first_byte & 0xF);
  230. // TODO(zhujiashun): check the validation of frame_type and codec
  231. _buf->cutn(&msg->data, msg_size - 1);
  232. _buf->pop_front(4/* PreviousTagSize0 */);
  233. return butil::Status::OK();
  234. }
  235. butil::Status FlvReader::Read(RtmpAudioMessage* msg) {
  236. char tags[11];
  237. const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
  238. if (p == NULL) {
  239. return butil::Status(EAGAIN, "Fail to read, not enough data");
  240. }
  241. if (*p != FLV_TAG_AUDIO) {
  242. return butil::Status(EINVAL, "Fail to parse RtmpAudioMessage");
  243. }
  244. uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
  245. uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
  246. timestamp |= (*(p + 7) << 24);
  247. if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
  248. return butil::Status(EAGAIN, "Fail to read, not enough data");
  249. }
  250. _buf->pop_front(11);
  251. char first_byte = 0;
  252. CHECK(_buf->cut1(&first_byte));
  253. msg->timestamp = timestamp;
  254. msg->codec = (FlvAudioCodec)((first_byte >> 4) & 0xF);
  255. msg->rate = (FlvSoundRate)((first_byte >> 2) & 0x3);
  256. msg->bits = (FlvSoundBits)((first_byte >> 1) & 0x1);
  257. msg->type = (FlvSoundType)(first_byte & 0x1);
  258. _buf->cutn(&msg->data, msg_size - 1);
  259. _buf->pop_front(4/* PreviousTagSize0 */);
  260. return butil::Status::OK();
  261. }
  262. butil::Status FlvReader::Read(RtmpMetaData* msg, std::string* name) {
  263. char tags[11];
  264. const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
  265. if (p == NULL) {
  266. return butil::Status(EAGAIN, "Fail to read, not enough data");
  267. }
  268. if (*p != FLV_TAG_SCRIPT_DATA) {
  269. return butil::Status(EINVAL, "Fail to parse RtmpScriptMessage");
  270. }
  271. uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
  272. uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
  273. timestamp |= (*(p + 7) << 24);
  274. if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
  275. return butil::Status(EAGAIN, "Fail to read, not enough data");
  276. }
  277. _buf->pop_front(11);
  278. butil::IOBuf req_buf;
  279. _buf->cutn(&req_buf, msg_size);
  280. _buf->pop_front(4/* PreviousTagSize0 */);
  281. {
  282. butil::IOBufAsZeroCopyInputStream zc_stream(req_buf);
  283. AMFInputStream istream(&zc_stream);
  284. if (!ReadAMFString(name, &istream)) {
  285. return butil::Status(EINVAL, "Fail to read AMF string");
  286. }
  287. if (!ReadAMFObject(&msg->data, &istream)) {
  288. return butil::Status(EINVAL, "Fail to read AMF object");
  289. }
  290. }
  291. msg->timestamp = timestamp;
  292. return butil::Status::OK();
  293. }
  294. const char* FlvVideoFrameType2Str(FlvVideoFrameType t) {
  295. switch (t) {
  296. case FLV_VIDEO_FRAME_KEYFRAME: return "keyframe";
  297. case FLV_VIDEO_FRAME_INTERFRAME: return "interframe";
  298. case FLV_VIDEO_FRAME_DISPOSABLE_INTERFRAME: return "disposable interframe";
  299. case FLV_VIDEO_FRAME_GENERATED_KEYFRAME: return "generated keyframe";
  300. case FLV_VIDEO_FRAME_INFOFRAME: return "info/command frame";
  301. }
  302. return "Unknown FlvVideoFrameType";
  303. }
  304. const char* FlvVideoCodec2Str(FlvVideoCodec id) {
  305. switch (id) {
  306. case FLV_VIDEO_JPEG: return "JPEG";
  307. case FLV_VIDEO_SORENSON_H263: return "Sorenson H.263";
  308. case FLV_VIDEO_SCREEN_VIDEO: return "Screen video";
  309. case FLV_VIDEO_ON2_VP6: return "On2 VP6";
  311. return "On2 VP6 with alpha channel";
  312. case FLV_VIDEO_SCREEN_VIDEO_V2: return "Screen video version 2";
  313. case FLV_VIDEO_AVC: return "AVC";
  314. case FLV_VIDEO_HEVC: return "H.265";
  315. }
  316. return "Unknown FlvVideoCodec";
  317. }
  318. const char* FlvAudioCodec2Str(FlvAudioCodec codec) {
  319. switch (codec) {
  321. return "Linear PCM, platform endian";
  322. case FLV_AUDIO_ADPCM: return "ADPCM";
  323. case FLV_AUDIO_MP3: return "MP3";
  325. return "Linear PCM, little endian";
  327. return "Nellymoser 16-kHz mono";
  329. return "Nellymoser 8-kHz mono";
  331. return "Nellymoser";
  333. return "G.711 A-law logarithmic PCM";
  335. return "G.711 mu-law logarithmic PCM";
  337. return "reserved";
  338. case FLV_AUDIO_AAC: return "AAC";
  339. case FLV_AUDIO_SPEEX: return "Speex";
  340. case FLV_AUDIO_MP3_8KHZ: return "MP3 8-Khz";
  342. return "Device-specific sound";
  343. }
  344. return "Unknown FlvAudioCodec";
  345. }
  346. const char* FlvSoundRate2Str(FlvSoundRate rate) {
  347. switch (rate) {
  348. case FLV_SOUND_RATE_5512HZ: return "5512";
  349. case FLV_SOUND_RATE_11025HZ: return "11025";
  350. case FLV_SOUND_RATE_22050HZ: return "22050";
  351. case FLV_SOUND_RATE_44100HZ: return "44100";
  352. }
  353. return "Unknown FlvSoundRate";
  354. }
  355. const char* FlvSoundBits2Str(FlvSoundBits size) {
  356. switch (size) {
  357. case FLV_SOUND_8BIT: return "8";
  358. case FLV_SOUND_16BIT: return "16";
  359. }
  360. return "Unknown FlvSoundBits";
  361. }
  362. const char* FlvSoundType2Str(FlvSoundType t) {
  363. switch (t) {
  364. case FLV_SOUND_MONO: return "mono";
  365. case FLV_SOUND_STEREO: return "stereo";
  366. }
  367. return "Unknown FlvSoundType";
  368. }
  369. std::ostream& operator<<(std::ostream& os, const RtmpAudioMessage& msg) {
  370. return os << "AudioMessage{timestamp=" << msg.timestamp
  371. << " codec=" << FlvAudioCodec2Str(msg.codec)
  372. << " rate=" << FlvSoundRate2Str(msg.rate)
  373. << " bits=" << FlvSoundBits2Str(msg.bits)
  374. << " type=" << FlvSoundType2Str(msg.type)
  375. << " data=" << butil::ToPrintable(msg.data) << '}';
  376. }
  377. std::ostream& operator<<(std::ostream& os, const RtmpVideoMessage& msg) {
  378. return os << "VideoMessage{timestamp=" << msg.timestamp
  379. << " type=" << FlvVideoFrameType2Str(msg.frame_type)
  380. << " codec=" << FlvVideoCodec2Str(msg.codec)
  381. << " data=" << butil::ToPrintable(msg.data) << '}';
  382. }
  383. butil::Status RtmpAACMessage::Create(const RtmpAudioMessage& msg) {
  384. if (msg.codec != FLV_AUDIO_AAC) {
  385. return butil::Status(EINVAL, "codec=%s is not AAC",
  386. FlvAudioCodec2Str(msg.codec));
  387. }
  388. const uint8_t* p = (const uint8_t*)msg.data.fetch1();
  389. if (p == NULL) {
  390. return butil::Status(EINVAL, "Not enough data in AudioMessage");
  391. }
  392. if (*p > FLV_AAC_PACKET_RAW) {
  393. return butil::Status(EINVAL, "Invalid AAC packet_type=%d", (int)*p);
  394. }
  395. this->timestamp = msg.timestamp;
  396. this->rate = msg.rate;
  397. this->bits = msg.bits;
  398. this->type = msg.type;
  399. this->packet_type = (FlvAACPacketType)*p;
  400. msg.data.append_to(&data, msg.data.size() - 1, 1);
  401. return butil::Status::OK();
  402. }
  403. AudioSpecificConfig::AudioSpecificConfig()
  404. : aac_object(AAC_OBJECT_UNKNOWN)
  405. , aac_sample_rate(0)
  406. , aac_channels(0) {
  407. }
  408. butil::Status AudioSpecificConfig::Create(const butil::IOBuf& buf) {
  409. if (buf.size() < 2u) {
  410. return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short",
  411. (uint64_t)buf.size());
  412. }
  413. char tmpbuf[2];
  414. buf.copy_to(tmpbuf, arraysize(tmpbuf));
  415. return Create(tmpbuf, arraysize(tmpbuf));
  416. }
  417. butil::Status AudioSpecificConfig::Create(const void* data, size_t len) {
  418. if (len < 2u) {
  419. return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short", (uint64_t)len);
  420. }
  421. uint8_t profile_ObjectType = ((const char*)data)[0];
  422. uint8_t samplingFrequencyIndex = ((const char*)data)[1];
  423. aac_channels = (samplingFrequencyIndex >> 3) & 0x0f;
  424. aac_sample_rate = ((profile_ObjectType << 1) & 0x0e) | ((samplingFrequencyIndex >> 7) & 0x01);
  425. aac_object = (AACObjectType)((profile_ObjectType >> 3) & 0x1f);
  426. if (aac_object == AAC_OBJECT_UNKNOWN) {
  427. return butil::Status(EINVAL, "Invalid object type");
  428. }
  429. return butil::Status::OK();
  430. }
  431. bool RtmpAudioMessage::IsAACSequenceHeader() const {
  432. if (codec != FLV_AUDIO_AAC) {
  433. return false;
  434. }
  435. const uint8_t* p = (const uint8_t*)data.fetch1();
  436. if (p == NULL) {
  437. return false;
  438. }
  440. }
  441. butil::Status RtmpAVCMessage::Create(const RtmpVideoMessage& msg) {
  442. if (msg.codec != FLV_VIDEO_AVC) {
  443. return butil::Status(EINVAL, "codec=%s is not AVC",
  444. FlvVideoCodec2Str(msg.codec));
  445. }
  446. uint8_t buf[4];
  447. const uint8_t* p = (const uint8_t*)msg.data.fetch(buf, sizeof(buf));
  448. if (p == NULL) {
  449. return butil::Status(EINVAL, "Not enough data in VideoMessage");
  450. }
  452. return butil::Status(EINVAL, "Invalid AVC packet_type=%d", (int)*p);
  453. }
  454. this->timestamp = msg.timestamp;
  455. this->frame_type = msg.frame_type;
  456. this->packet_type = (FlvAVCPacketType)*p;
  457. this->composition_time = policy::ReadBigEndian3Bytes(p + 1);
  458. msg.data.append_to(&data, msg.data.size() - 4, 4);
  459. return butil::Status::OK();
  460. }
  461. bool RtmpVideoMessage::IsAVCSequenceHeader() const {
  462. if (codec != FLV_VIDEO_AVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) {
  463. return false;
  464. }
  465. const uint8_t* p = (const uint8_t*)data.fetch1();
  466. if (p == NULL) {
  467. return false;
  468. }
  470. }
  471. bool RtmpVideoMessage::IsHEVCSequenceHeader() const {
  472. if (codec != FLV_VIDEO_HEVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) {
  473. return false;
  474. }
  475. const uint8_t* p = (const uint8_t*)data.fetch1();
  476. if (p == NULL) {
  477. return false;
  478. }
  480. }
  481. const char* AVCProfile2Str(AVCProfile p) {
  482. switch (p) {
  483. case AVC_PROFILE_BASELINE: return "Baseline";
  484. case AVC_PROFILE_CONSTRAINED_BASELINE: return "ConstrainedBaseline";
  485. case AVC_PROFILE_MAIN: return "Main";
  486. case AVC_PROFILE_EXTENDED: return "Extended";
  487. case AVC_PROFILE_HIGH: return "High";
  488. case AVC_PROFILE_HIGH10: return "High10";
  489. case AVC_PROFILE_HIGH10_INTRA: return "High10Intra";
  490. case AVC_PROFILE_HIGH422: return "High422";
  491. case AVC_PROFILE_HIGH422_INTRA: return "High422Intra";
  492. case AVC_PROFILE_HIGH444: return "High444";
  493. case AVC_PROFILE_HIGH444_PREDICTIVE: return "High444Predictive";
  494. case AVC_PROFILE_HIGH444_INTRA: return "High444Intra";
  495. }
  496. return "Unknown";
  497. }
  498. AVCDecoderConfigurationRecord::AVCDecoderConfigurationRecord()
  499. : width(0)
  500. , height(0)
  501. , avc_profile((AVCProfile)0)
  502. , avc_level((AVCLevel)0)
  503. , length_size_minus1(-1) {
  504. }
  505. std::ostream& operator<<(std::ostream& os,
  506. const AVCDecoderConfigurationRecord& r) {
  507. os << "{profile=" << AVCProfile2Str(r.avc_profile)
  508. << " level=" << (int)r.avc_level
  509. << " length_size_minus1=" << (int)r.length_size_minus1
  510. << " width=" << r.width
  511. << " height=" << r.height
  512. << " sps=[";
  513. for (size_t i = 0; i < r.sps_list.size(); ++i) {
  514. if (i) {
  515. os << ' ';
  516. }
  517. os << r.sps_list[i].size();
  518. }
  519. os << "] pps=[";
  520. for (size_t i = 0; i < r.pps_list.size(); ++i) {
  521. if (i) {
  522. os << ' ';
  523. }
  524. os << r.pps_list[i].size();
  525. }
  526. os << "]}";
  527. return os;
  528. }
  529. butil::Status AVCDecoderConfigurationRecord::Create(const butil::IOBuf& buf) {
  530. // the buf should be short generally, copy it out to continuous memory
  531. // to simplify parsing.
  532. DEFINE_SMALL_ARRAY(char, cont_buf, buf.size(), 64);
  533. buf.copy_to(cont_buf, buf.size());
  534. return Create(cont_buf, buf.size());
  535. }
  536. butil::Status AVCDecoderConfigurationRecord::Create(const void* data, size_t len) {
  537. butil::StringPiece buf((const char*)data, len);
  538. if (buf.size() < 6) {
  539. return butil::Status(EINVAL, "Length=%lu is not long enough",
  540. (unsigned long)buf.size());
  541. }
  542. // skip configurationVersion at buf[0]
  543. avc_profile = (AVCProfile)buf[1];
  544. // skip profile_compatibility at buf[2]
  545. avc_level = (AVCLevel)buf[3];
  546. // Syntax, H.264-AVC-ISO_IEC_14496-15.pdf, page 16
  547. // AVC decoder configuration record
  548. // Semantics
  549. // The value of this field shall be one of 0, 1, or 3 corresponding to a
  550. // length encoded with 1, 2, or 4 bytes, respectively.
  551. length_size_minus1 = buf[4] & 0x03;
  552. if (length_size_minus1 == 2) {
  553. return butil::Status(EINVAL, "lengthSizeMinusOne should never be 2");
  554. }
  555. // Parsing SPS
  556. const int num_sps = (int)(buf[5] & 0x1f);
  557. buf.remove_prefix(6);
  558. sps_list.clear();
  559. sps_list.reserve(num_sps);
  560. for (int i = 0; i < num_sps; ++i) {
  561. if (buf.size() < 2) {
  562. return butil::Status(EINVAL, "Not enough data to decode SPS-length");
  563. }
  564. const uint16_t sps_length = policy::ReadBigEndian2Bytes(buf.data());
  565. if (buf.size() < 2u + sps_length) {
  566. return butil::Status(EINVAL, "Not enough data to decode SPS");
  567. }
  568. if (sps_length > 0) {
  569. butil::Status st = ParseSPS(buf.data() + 2, sps_length);
  570. if (!st.ok()) {
  571. return st;
  572. }
  573. sps_list.push_back(buf.substr(2, sps_length).as_string());
  574. }
  575. buf.remove_prefix(2 + sps_length);
  576. }
  577. // Parsing PPS
  578. pps_list.clear();
  579. if (buf.empty()) {
  580. return butil::Status(EINVAL, "Not enough data to decode PPS");
  581. }
  582. const int num_pps = (int)buf[0];
  583. buf.remove_prefix(1);
  584. for (int i = 0; i < num_pps; ++i) {
  585. if (buf.size() < 2) {
  586. return butil::Status(EINVAL, "Not enough data to decode PPS-length");
  587. }
  588. const uint16_t pps_length = policy::ReadBigEndian2Bytes(buf.data());
  589. if (buf.size() < 2u + pps_length) {
  590. return butil::Status(EINVAL, "Not enough data to decode PPS");
  591. }
  592. if (pps_length > 0) {
  593. pps_list.push_back(buf.substr(2, pps_length).as_string());
  594. }
  595. buf.remove_prefix(2 + pps_length);
  596. }
  597. return butil::Status::OK();
  598. }
  599. butil::Status AVCDecoderConfigurationRecord::ParseSPS(
  600. const butil::StringPiece& buf, size_t sps_length) {
  601. // for NALU, 7.3.1 NAL unit syntax
  602. // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61.
  603. if (buf.empty()) {
  604. return butil::Status(EINVAL, "SPS is empty");
  605. }
  606. const int8_t nutv = buf[0];
  607. const int8_t forbidden_zero_bit = (nutv >> 7) & 0x01;
  608. if (forbidden_zero_bit) {
  609. return butil::Status(EINVAL, "forbidden_zero_bit shall equal 0");
  610. }
  611. // nal_ref_idc not equal to 0 specifies that the content of the NAL unit
  612. // contains:
  613. // a sequence parameter set
  614. // or a picture parameter set
  615. // or a slice of a reference picture
  616. // or a slice data partition of a reference picture.
  617. int8_t nal_ref_idc = (nutv >> 5) & 0x03;
  618. if (!nal_ref_idc) {
  619. return butil::Status(EINVAL, "nal_ref_idc is 0");
  620. }
  621. // 7.4.1 NAL unit semantics
  622. // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61.
  623. // nal_unit_type specifies the type of RBSP data structure contained in
  624. // the NAL unit as specified in Table 7-1.
  625. const AVCNaluType nal_unit_type = (AVCNaluType)(nutv & 0x1f);
  626. if (nal_unit_type != AVC_NALU_SPS) {
  627. return butil::Status(EINVAL, "nal_unit_type is not %d", (int)AVC_NALU_SPS);
  628. }
  629. // Extract the rbsp from sps.
  630. DEFINE_SMALL_ARRAY(char, rbsp, sps_length - 1, 64);
  631. buf.copy(rbsp, sps_length - 1, 1);
  632. size_t rbsp_len = 0;
  633. for (size_t i = 1; i < sps_length; ++i) {
  634. // XX 00 00 03 XX, the 03 byte should be dropped.
  635. if (!(i >= 3 && buf[i - 2] == 0 && buf[i - 1] == 0 && buf[i] == 3)) {
  636. rbsp[rbsp_len++] = buf[i];
  637. }
  638. }
  639. // for SPS, Sequence parameter set data syntax
  640. // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 62.
  641. if (rbsp_len < 3) {
  642. return butil::Status(EINVAL, "rbsp must be at least 3 bytes");
  643. }
  644. // Decode rbsp.
  645. const char* p = rbsp;
  646. uint8_t profile_idc = *p++;
  647. if (!profile_idc) {
  648. return butil::Status(EINVAL, "profile_idc is 0");
  649. }
  650. int8_t flags = *p++;
  651. if (flags & 0x03) {
  652. return butil::Status(EINVAL, "Invalid flags=%d", (int)flags);
  653. }
  654. uint8_t level_idc = *p++;
  655. if (!level_idc) {
  656. return butil::Status(EINVAL, "level_idc is 0");
  657. }
  658. BitStream bs(p, rbsp + rbsp_len - p);
  659. int32_t seq_parameter_set_id = -1;
  660. if (avc_nalu_read_uev(&bs, &seq_parameter_set_id) != 0) {
  661. return butil::Status(EINVAL, "Fail to read seq_parameter_set_id");
  662. }
  663. if (seq_parameter_set_id < 0) {
  664. return butil::Status(EINVAL, "Invalid seq_parameter_set_id=%d",
  665. (int)seq_parameter_set_id);
  666. }
  667. int32_t chroma_format_idc = -1;
  668. if (profile_idc == 100 || profile_idc == 110 || profile_idc == 122 ||
  669. profile_idc == 244 || profile_idc == 44 || profile_idc == 83 ||
  670. profile_idc == 86 || profile_idc == 118 || profile_idc == 128) {
  671. if (avc_nalu_read_uev(&bs, &chroma_format_idc) != 0) {
  672. return butil::Status(EINVAL, "Fail to read chroma_format_idc");
  673. }
  674. if (chroma_format_idc == 3) {
  675. int8_t separate_colour_plane_flag = -1;
  676. if (avc_nalu_read_bit(&bs, &separate_colour_plane_flag) != 0) {
  677. return butil::Status(EINVAL, "Fail to read separate_colour_plane_flag");
  678. }
  679. }
  680. int32_t bit_depth_luma_minus8 = -1;
  681. if (avc_nalu_read_uev(&bs, &bit_depth_luma_minus8) != 0) {
  682. return butil::Status(EINVAL, "Fail to read bit_depth_luma_minus8");
  683. }
  684. int32_t bit_depth_chroma_minus8 = -1;
  685. if (avc_nalu_read_uev(&bs, &bit_depth_chroma_minus8) != 0) {
  686. return butil::Status(EINVAL, "Fail to read bit_depth_chroma_minus8");
  687. }
  688. int8_t qpprime_y_zero_transform_bypass_flag = -1;
  689. if (avc_nalu_read_bit(&bs, &qpprime_y_zero_transform_bypass_flag) != 0) {
  690. return butil::Status(EINVAL, "Fail to read qpprime_y_zero_transform_bypass_flag");
  691. }
  692. int8_t seq_scaling_matrix_present_flag = -1;
  693. if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag) != 0) {
  694. return butil::Status(EINVAL, "Fail to read seq_scaling_matrix_present_flag");
  695. }
  696. if (seq_scaling_matrix_present_flag) {
  697. int nb_scmpfs = (chroma_format_idc != 3 ? 8 : 12);
  698. for (int i = 0; i < nb_scmpfs; i++) {
  699. int8_t seq_scaling_matrix_present_flag_i = -1;
  700. if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag_i)) {
  701. return butil::Status(EINVAL, "Fail to read seq_scaling_"
  702. "matrix_present_flag[%d]", i);
  703. }
  704. if (seq_scaling_matrix_present_flag_i) {
  705. return butil::Status(EINVAL, "Invalid seq_scaling_matrix_"
  706. "present_flag[%d]=%d nb_scmpfs=%d",
  707. i, (int)seq_scaling_matrix_present_flag_i,
  708. nb_scmpfs);
  709. }
  710. }
  711. }
  712. }
  713. int32_t log2_max_frame_num_minus4 = -1;
  714. if (avc_nalu_read_uev(&bs, &log2_max_frame_num_minus4) != 0) {
  715. return butil::Status(EINVAL, "Fail to read log2_max_frame_num_minus4");
  716. }
  717. int32_t pic_order_cnt_type = -1;
  718. if (avc_nalu_read_uev(&bs, &pic_order_cnt_type) != 0) {
  719. return butil::Status(EINVAL, "Fail to read pic_order_cnt_type");
  720. }
  721. if (pic_order_cnt_type == 0) {
  722. int32_t log2_max_pic_order_cnt_lsb_minus4 = -1;
  723. if (avc_nalu_read_uev(&bs, &log2_max_pic_order_cnt_lsb_minus4) != 0) {
  724. return butil::Status(EINVAL, "Fail to read log2_max_pic_order_cnt_lsb_minus4");
  725. }
  726. } else if (pic_order_cnt_type == 1) {
  727. int8_t delta_pic_order_always_zero_flag = -1;
  728. if (avc_nalu_read_bit(&bs, &delta_pic_order_always_zero_flag) != 0) {
  729. return butil::Status(EINVAL, "Fail to read delta_pic_order_always_zero_flag");
  730. }
  731. int32_t offset_for_non_ref_pic = -1;
  732. if (avc_nalu_read_uev(&bs, &offset_for_non_ref_pic) != 0) {
  733. return butil::Status(EINVAL, "Fail to read offset_for_non_ref_pic");
  734. }
  735. int32_t offset_for_top_to_bottom_field = -1;
  736. if (avc_nalu_read_uev(&bs, &offset_for_top_to_bottom_field) != 0) {
  737. return butil::Status(EINVAL, "Fail to read offset_for_top_to_bottom_field");
  738. }
  739. int32_t num_ref_frames_in_pic_order_cnt_cycle = -1;
  740. if (avc_nalu_read_uev(&bs, &num_ref_frames_in_pic_order_cnt_cycle) != 0) {
  741. return butil::Status(EINVAL, "Fail to read num_ref_frames_in_pic_order_cnt_cycle");
  742. }
  743. if (num_ref_frames_in_pic_order_cnt_cycle) {
  744. return butil::Status(EINVAL, "Invalid num_ref_frames_in_pic_order_cnt_cycle=%d",
  745. num_ref_frames_in_pic_order_cnt_cycle);
  746. }
  747. }
  748. int32_t max_num_ref_frames = -1;
  749. if (avc_nalu_read_uev(&bs, &max_num_ref_frames) != 0) {
  750. return butil::Status(EINVAL, "Fail to read max_num_ref_frames");
  751. }
  752. int8_t gaps_in_frame_num_value_allowed_flag = -1;
  753. if (avc_nalu_read_bit(&bs, &gaps_in_frame_num_value_allowed_flag) != 0) {
  754. return butil::Status(EINVAL, "Fail to read gaps_in_frame_num_value_allowed_flag");
  755. }
  756. int32_t pic_width_in_mbs_minus1 = -1;
  757. if (avc_nalu_read_uev(&bs, &pic_width_in_mbs_minus1) != 0) {
  758. return butil::Status(EINVAL, "Fail to read pic_width_in_mbs_minus1");
  759. }
  760. int32_t pic_height_in_map_units_minus1 = -1;
  761. if (avc_nalu_read_uev(&bs, &pic_height_in_map_units_minus1) != 0) {
  762. return butil::Status(EINVAL, "Fail to read pic_height_in_map_units_minus1");
  763. }
  764. width = (int)(pic_width_in_mbs_minus1 + 1) * 16;
  765. height = (int)(pic_height_in_map_units_minus1 + 1) * 16;
  766. return butil::Status::OK();
  767. }
  768. static bool find_avc_annexb_nalu_start_code(const butil::IOBuf& buf,
  769. size_t* start_code_length) {
  770. size_t consecutive_zero_count = 0;
  771. for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) {
  772. char c = *it;
  773. if (c == 0) {
  774. ++consecutive_zero_count;
  775. } else if (c == 1) {
  776. if (consecutive_zero_count >= 2) {
  777. if (start_code_length) {
  778. *start_code_length = consecutive_zero_count + 1;
  779. }
  780. return true;
  781. }
  782. return false;
  783. } else {
  784. return false;
  785. }
  786. }
  787. return false;
  788. }
  789. static void find_avc_annexb_nalu_stop_code(const butil::IOBuf& buf,
  790. size_t* nalu_length_out,
  791. size_t* stop_code_length) {
  792. size_t nalu_length = 0;
  793. size_t consecutive_zero_count = 0;
  794. for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) {
  795. unsigned char c = (unsigned char)*it;
  796. if (c > 1) { // most frequent
  797. ++nalu_length;
  798. consecutive_zero_count = 0;
  799. continue;
  800. }
  801. if (c == 0) {
  802. ++consecutive_zero_count;
  803. } else { // c == 1
  804. if (consecutive_zero_count >= 2) {
  805. if (nalu_length_out) {
  806. *nalu_length_out = nalu_length;
  807. }
  808. if (stop_code_length) {
  809. *stop_code_length = consecutive_zero_count + 1;
  810. }
  811. return;
  812. }
  813. ++nalu_length;
  814. consecutive_zero_count = 0;
  815. }
  816. }
  817. if (nalu_length_out) {
  818. *nalu_length_out = nalu_length + consecutive_zero_count;
  819. }
  820. if (stop_code_length) {
  821. *stop_code_length = 0;
  822. }
  823. }
  824. AVCNaluIterator::AVCNaluIterator(butil::IOBuf* data, uint32_t length_size_minus1,
  825. AVCNaluFormat* format)
  826. : _data(data)
  827. , _format(format)
  828. , _length_size_minus1(length_size_minus1)
  829. , _nalu_type(AVC_NALU_EMPTY) {
  830. if (_data) {
  831. ++*this;
  832. }
  833. }
  834. AVCNaluIterator::~AVCNaluIterator() {
  835. }
  836. void AVCNaluIterator::operator++() {
  837. if (*_format == AVC_NALU_FORMAT_ANNEXB) {
  838. if (!next_as_annexb()) {
  839. return set_end();
  840. }
  841. } else if (*_format == AVC_NALU_FORMAT_IBMF) {
  842. if (!next_as_ibmf()) {
  843. return set_end();
  844. }
  845. } else {
  846. size_t start_code_length = 0;
  847. if (find_avc_annexb_nalu_start_code(*_data, &start_code_length) &&
  848. _data->size() > start_code_length) {
  849. if (start_code_length > 0) {
  850. _data->pop_front(start_code_length);
  851. }
  852. *_format = AVC_NALU_FORMAT_ANNEXB;
  853. if (!next_as_annexb()) {
  854. return set_end();
  855. }
  856. } else if (next_as_ibmf()) {
  857. *_format = AVC_NALU_FORMAT_IBMF;
  858. } else {
  859. set_end();
  860. }
  861. }
  862. }
  863. bool AVCNaluIterator::next_as_annexb() {
  864. if (_data->empty()) {
  865. return false;
  866. }
  867. size_t nalu_length = 0;
  868. size_t stop_code_length = 0;
  869. find_avc_annexb_nalu_stop_code(*_data, &nalu_length, &stop_code_length);
  870. _cur_nalu.clear();
  871. _nalu_type = AVC_NALU_EMPTY;
  872. if (nalu_length) {
  873. _data->cutn(&_cur_nalu, nalu_length);
  874. const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1();
  875. _nalu_type = (AVCNaluType)(byte0 & 0x1f);
  876. }
  877. if (stop_code_length) {
  878. _data->pop_front(stop_code_length);
  879. }
  880. return true;
  881. }
  882. bool AVCNaluIterator::next_as_ibmf() {
  883. // The value of this field shall be one of 0, 1, or 3 corresponding to a
  884. // length encoded with 1, 2, or 4 bytes, respectively.
  885. CHECK_NE(_length_size_minus1, 2u);
  886. if (_data->empty()) {
  887. return false;
  888. }
  889. if (_data->size() < _length_size_minus1 + 1) {
  890. LOG(ERROR) << "Not enough data to decode length of NALU";
  891. return false;
  892. }
  893. int32_t nalu_length = 0;
  894. char buf[4];
  895. if (_length_size_minus1 == 3) {
  896. _data->copy_to(buf, 4);
  897. nalu_length = policy::ReadBigEndian4Bytes(buf);
  898. } else if (_length_size_minus1 == 1) {
  899. _data->copy_to(buf, 2);
  900. nalu_length = policy::ReadBigEndian2Bytes(buf);
  901. } else {
  902. _data->copy_to(buf, 1);
  903. nalu_length = *buf;
  904. }
  905. // maybe stream is invalid format.
  906. // see: https://github.com/ossrs/srs/issues/183
  907. if (nalu_length < 0) {
  908. LOG(ERROR) << "Invalid nalu_length=" << nalu_length;
  909. return false;
  910. }
  911. if (_data->size() < _length_size_minus1 + 1 + nalu_length) {
  912. LOG(ERROR) << "Not enough data to decode NALU";
  913. return false;
  914. }
  915. _data->pop_front(_length_size_minus1 + 1);
  916. _cur_nalu.clear();
  917. _nalu_type = AVC_NALU_EMPTY;
  918. if (nalu_length) {
  919. _data->cutn(&_cur_nalu, nalu_length);
  920. const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1();
  921. _nalu_type = (AVCNaluType)(byte0 & 0x1f);
  922. }
  923. return true;
  924. }
  925. RtmpClientOptions::RtmpClientOptions()
  926. : fpad(false)
  927. , audioCodecs((RtmpAudioCodec)3575) // Copy from SRS
  928. , videoCodecs((RtmpVideoCodec)252) // Copy from SRS
  930. , timeout_ms(1000)
  931. , connect_timeout_ms(500)
  932. , buffer_length_ms(1000)
  933. , chunk_size(policy::RTMP_DEFAULT_CHUNK_SIZE)
  934. , window_ack_size(policy::RTMP_DEFAULT_WINDOW_ACK_SIZE)
  935. , simplified_rtmp(false) {
  936. }
  937. // Shared by RtmpClient and RtmpClientStream(s)
  938. class RtmpClientImpl : public SharedObject {
  939. friend class RtmpClientStream;
  940. public:
  941. RtmpClientImpl() {
  942. get_rtmp_bvars()->client_count << 1;
  943. }
  944. ~RtmpClientImpl() {
  945. get_rtmp_bvars()->client_count << -1;
  946. RPC_VLOG << "Destroying RtmpClientImpl=" << this;
  947. }
  948. // Specify the servers to connect.
  949. int Init(butil::EndPoint server_addr_and_port,
  950. const RtmpClientOptions& options);
  951. int Init(const char* server_addr_and_port,
  952. const RtmpClientOptions& options);
  953. int Init(const char* server_addr, int port,
  954. const RtmpClientOptions& options);
  955. int Init(const char* naming_service_url,
  956. const char* load_balancer_name,
  957. const RtmpClientOptions& options);
  958. const RtmpClientOptions& options() const { return _connect_options; }
  959. SocketMap& socket_map() { return _socket_map; }
  960. int CreateSocket(const butil::EndPoint& pt, SocketId* id);
  961. private:
  962. DISALLOW_COPY_AND_ASSIGN(RtmpClientImpl);
  963. int CommonInit(const RtmpClientOptions& options);
  964. Channel _chan;
  965. RtmpClientOptions _connect_options;
  966. SocketMap _socket_map;
  967. };
  968. class RtmpConnect : public AppConnect {
  969. public:
  970. // @AppConnect
  971. void StartConnect(const Socket* s, void (*done)(int, void*), void* data) override;
  972. void StopConnect(Socket* s) override;
  973. };
  974. void RtmpConnect::StartConnect(
  975. const Socket* s, void (*done)(int, void*), void* data) {
  976. RPC_VLOG << "Establish rtmp-level connection on " << *s;
  977. policy::RtmpContext* ctx =
  978. static_cast<policy::RtmpContext*>(s->parsing_context());
  979. if (ctx == NULL) {
  980. LOG(FATAL) << "RtmpContext of " << *s << " is NULL";
  981. return done(EINVAL, data);
  982. }
  983. const RtmpClientOptions* _client_options = ctx->client_options();
  984. if (_client_options && _client_options->simplified_rtmp) {
  985. ctx->set_simplified_rtmp(true);
  986. if (ctx->SendConnectRequest(s->remote_side(), s->fd(), true) != 0) {
  987. LOG(ERROR) << s->remote_side() << ": Fail to send simple connect";
  988. return done(EINVAL, data);
  989. }
  990. ctx->SetState(s->remote_side(), policy::RtmpContext::STATE_RECEIVED_S2);
  991. ctx->set_create_stream_with_play_or_publish(true);
  992. return done(0, data);
  993. }
  994. // Save to callback to call when RTMP connect is done.
  995. ctx->SetConnectCallback(done, data);
  996. // Initiate the rtmp handshake.
  997. bool is_simple_handshake = false;
  998. if (policy::SendC0C1(s->fd(), &is_simple_handshake) != 0) {
  999. LOG(ERROR) << s->remote_side() << ": Fail to send C0 C1";
  1000. return done(EINVAL, data);
  1001. }
  1002. if (is_simple_handshake) {
  1003. ctx->only_check_simple_s0s1();
  1004. }
  1005. }
  1006. void RtmpConnect::StopConnect(Socket* s) {
  1007. policy::RtmpContext* ctx =
  1008. static_cast<policy::RtmpContext*>(s->parsing_context());
  1009. if (ctx == NULL) {
  1010. LOG(FATAL) << "RtmpContext of " << *s << " is NULL";
  1011. } else {
  1012. ctx->OnConnected(EFAILEDSOCKET);
  1013. }
  1014. }
  1015. class RtmpSocketCreator : public SocketCreator {
  1016. public:
  1017. RtmpSocketCreator(const RtmpClientOptions& connect_options)
  1018. : _connect_options(connect_options) {
  1019. }
  1020. int CreateSocket(const SocketOptions& opt, SocketId* id) {
  1021. SocketOptions sock_opt = opt;
  1022. sock_opt.app_connect = std::make_shared<RtmpConnect>();
  1023. sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
  1024. return get_client_side_messenger()->Create(sock_opt, id);
  1025. }
  1026. private:
  1027. RtmpClientOptions _connect_options;
  1028. };
  1029. int RtmpClientImpl::CreateSocket(const butil::EndPoint& pt, SocketId* id) {
  1030. SocketOptions sock_opt;
  1031. sock_opt.remote_side = pt;
  1032. sock_opt.app_connect = std::make_shared<RtmpConnect>();
  1033. sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
  1034. return get_client_side_messenger()->Create(sock_opt, id);
  1035. }
  1036. int RtmpClientImpl::CommonInit(const RtmpClientOptions& options) {
  1037. _connect_options = options;
  1038. SocketMapOptions sm_options;
  1039. sm_options.socket_creator = new RtmpSocketCreator(_connect_options);
  1040. if (_socket_map.Init(sm_options) != 0) {
  1041. LOG(ERROR) << "Fail to init _socket_map";
  1042. return -1;
  1043. }
  1044. return 0;
  1045. }
  1046. int RtmpClientImpl::Init(butil::EndPoint server_addr_and_port,
  1047. const RtmpClientOptions& options) {
  1048. if (CommonInit(options) != 0) {
  1049. return -1;
  1050. }
  1051. ChannelOptions copts;
  1052. copts.connect_timeout_ms = options.connect_timeout_ms;
  1053. copts.timeout_ms = options.timeout_ms;
  1054. copts.protocol = PROTOCOL_RTMP;
  1055. return _chan.Init(server_addr_and_port, &copts);
  1056. }
  1057. int RtmpClientImpl::Init(const char* server_addr_and_port,
  1058. const RtmpClientOptions& options) {
  1059. if (CommonInit(options) != 0) {
  1060. return -1;
  1061. }
  1062. ChannelOptions copts;
  1063. copts.connect_timeout_ms = options.connect_timeout_ms;
  1064. copts.timeout_ms = options.timeout_ms;
  1065. copts.protocol = PROTOCOL_RTMP;
  1066. return _chan.Init(server_addr_and_port, &copts);
  1067. }
  1068. int RtmpClientImpl::Init(const char* server_addr, int port,
  1069. const RtmpClientOptions& options) {
  1070. if (CommonInit(options) != 0) {
  1071. return -1;
  1072. }
  1073. ChannelOptions copts;
  1074. copts.connect_timeout_ms = options.connect_timeout_ms;
  1075. copts.timeout_ms = options.timeout_ms;
  1076. copts.protocol = PROTOCOL_RTMP;
  1077. return _chan.Init(server_addr, port, &copts);
  1078. }
  1079. int RtmpClientImpl::Init(const char* naming_service_url,
  1080. const char* load_balancer_name,
  1081. const RtmpClientOptions& options) {
  1082. if (CommonInit(options) != 0) {
  1083. return -1;
  1084. }
  1085. ChannelOptions copts;
  1086. copts.connect_timeout_ms = options.connect_timeout_ms;
  1087. copts.timeout_ms = options.timeout_ms;
  1088. copts.protocol = PROTOCOL_RTMP;
  1089. return _chan.Init(naming_service_url, load_balancer_name, &copts);
  1090. }
  1091. RtmpClient::RtmpClient() {}
  1092. RtmpClient::~RtmpClient() {}
  1093. RtmpClient::RtmpClient(const RtmpClient& rhs) : _impl(rhs._impl) {}
  1094. RtmpClient& RtmpClient::operator=(const RtmpClient& rhs) {
  1095. _impl = rhs._impl;
  1096. return *this;
  1097. }
  1098. const RtmpClientOptions& RtmpClient::options() const {
  1099. if (_impl) {
  1100. return _impl->options();
  1101. } else {
  1102. static RtmpClientOptions dft_opt;
  1103. return dft_opt;
  1104. }
  1105. }
  1106. int RtmpClient::Init(butil::EndPoint server_addr_and_port,
  1107. const RtmpClientOptions& options) {
  1108. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1109. if (tmp == NULL) {
  1110. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1111. return -1;
  1112. }
  1113. if (tmp->Init(server_addr_and_port, options) != 0) {
  1114. return -1;
  1115. }
  1116. tmp.swap(_impl);
  1117. return 0;
  1118. }
  1119. int RtmpClient::Init(const char* server_addr_and_port,
  1120. const RtmpClientOptions& options) {
  1121. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1122. if (tmp == NULL) {
  1123. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1124. return -1;
  1125. }
  1126. if (tmp->Init(server_addr_and_port, options) != 0) {
  1127. return -1;
  1128. }
  1129. tmp.swap(_impl);
  1130. return 0;
  1131. }
  1132. int RtmpClient::Init(const char* server_addr, int port,
  1133. const RtmpClientOptions& options) {
  1134. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1135. if (tmp == NULL) {
  1136. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1137. return -1;
  1138. }
  1139. if (tmp->Init(server_addr, port, options) != 0) {
  1140. return -1;
  1141. }
  1142. tmp.swap(_impl);
  1143. return 0;
  1144. }
  1145. int RtmpClient::Init(const char* naming_service_url,
  1146. const char* load_balancer_name,
  1147. const RtmpClientOptions& options) {
  1148. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1149. if (tmp == NULL) {
  1150. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1151. return -1;
  1152. }
  1153. if (tmp->Init(naming_service_url, load_balancer_name, options) != 0) {
  1154. return -1;
  1155. }
  1156. tmp.swap(_impl);
  1157. return 0;
  1158. }
  1159. bool RtmpClient::initialized() const { return _impl != NULL; }
  1160. RtmpStreamBase::RtmpStreamBase(bool is_client)
  1161. : _is_client(is_client)
  1162. , _paused(false)
  1163. , _stopped(false)
  1164. , _processing_msg(false)
  1165. , _has_data_ever(false)
  1166. , _message_stream_id(0)
  1167. , _chunk_stream_id(0)
  1168. , _create_realtime_us(butil::gettimeofday_us())
  1169. , _is_server_accepted(false) {
  1170. }
  1171. RtmpStreamBase::~RtmpStreamBase() {
  1172. }
  1173. void RtmpStreamBase::Destroy() {
  1174. return;
  1175. }
  1176. int RtmpStreamBase::SendMessage(uint32_t timestamp,
  1177. uint8_t message_type,
  1178. const butil::IOBuf& body) {
  1179. if (_rtmpsock == NULL) {
  1180. errno = EPERM;
  1181. return -1;
  1182. }
  1183. if (_chunk_stream_id == 0) {
  1184. LOG(ERROR) << "SendXXXMessage can't be called before play() is received";
  1185. errno = EPERM;
  1186. return -1;
  1187. }
  1188. SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage);
  1189. msg->header.timestamp = timestamp;
  1190. msg->header.message_length = body.size();
  1191. msg->header.message_type = message_type;
  1192. msg->header.stream_id = _message_stream_id;
  1193. msg->chunk_stream_id = _chunk_stream_id;
  1194. msg->body = body;
  1195. return _rtmpsock->Write(msg);
  1196. }
  1197. int RtmpStreamBase::SendControlMessage(
  1198. uint8_t message_type, const void* body, size_t size) {
  1199. if (_rtmpsock == NULL) {
  1200. errno = EPERM;
  1201. return -1;
  1202. }
  1203. SocketMessagePtr<policy::RtmpUnsentMessage> msg(
  1204. policy::MakeUnsentControlMessage(message_type, body, size));
  1205. return _rtmpsock->Write(msg);
  1206. }
  1207. int RtmpStreamBase::SendCuePoint(const RtmpCuePoint& cuepoint) {
  1208. butil::IOBuf req_buf;
  1209. {
  1210. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1211. AMFOutputStream ostream(&zc_stream);
  1212. WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
  1213. WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
  1214. WriteAMFObject(cuepoint.data, &ostream);
  1215. if (!ostream.good()) {
  1216. LOG(ERROR) << "Fail to serialize cuepoint";
  1217. return -1;
  1218. }
  1219. }
  1220. return SendMessage(cuepoint.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
  1221. }
  1222. int RtmpStreamBase::SendMetaData(const RtmpMetaData& metadata,
  1223. const butil::StringPiece& name) {
  1224. butil::IOBuf req_buf;
  1225. {
  1226. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1227. AMFOutputStream ostream(&zc_stream);
  1228. WriteAMFString(name, &ostream);
  1229. WriteAMFObject(metadata.data, &ostream);
  1230. if (!ostream.good()) {
  1231. LOG(ERROR) << "Fail to serialize metadata";
  1232. return -1;
  1233. }
  1234. }
  1235. return SendMessage(metadata.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
  1236. }
  1237. int RtmpStreamBase::SendSharedObjectMessage(const RtmpSharedObjectMessage&) {
  1238. CHECK(false) << "Not supported yet";
  1239. return -1;
  1240. }
  1241. int RtmpStreamBase::SendAudioMessage(const RtmpAudioMessage& msg) {
  1242. if (_rtmpsock == NULL) {
  1243. errno = EPERM;
  1244. return -1;
  1245. }
  1246. if (_chunk_stream_id == 0) {
  1247. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1248. errno = EPERM;
  1249. return -1;
  1250. }
  1251. if (_paused) {
  1252. errno = EPERM;
  1253. return -1;
  1254. }
  1255. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1256. msg2->header.timestamp = msg.timestamp;
  1257. msg2->header.message_length = msg.size();
  1258. msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO;
  1259. msg2->header.stream_id = _message_stream_id;
  1260. msg2->chunk_stream_id = _chunk_stream_id;
  1261. // Make audio header.
  1262. const char audio_head =
  1263. ((msg.codec & 0xF) << 4)
  1264. | ((msg.rate & 0x3) << 2)
  1265. | ((msg.bits & 0x1) << 1)
  1266. | (msg.type & 0x1);
  1267. msg2->body.push_back(audio_head);
  1268. msg2->body.append(msg.data);
  1269. return _rtmpsock->Write(msg2);
  1270. }
  1271. int RtmpStreamBase::SendAACMessage(const RtmpAACMessage& msg) {
  1272. if (_rtmpsock == NULL) {
  1273. errno = EPERM;
  1274. return -1;
  1275. }
  1276. if (_chunk_stream_id == 0) {
  1277. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1278. errno = EPERM;
  1279. return -1;
  1280. }
  1281. if (_paused) {
  1282. errno = EPERM;
  1283. return -1;
  1284. }
  1285. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1286. msg2->header.timestamp = msg.timestamp;
  1287. msg2->header.message_length = msg.size();
  1288. msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO;
  1289. msg2->header.stream_id = _message_stream_id;
  1290. msg2->chunk_stream_id = _chunk_stream_id;
  1291. // Make audio header.
  1292. char aac_head[2];
  1293. aac_head[0] = ((FLV_AUDIO_AAC & 0xF) << 4)
  1294. | ((msg.rate & 0x3) << 2)
  1295. | ((msg.bits & 0x1) << 1)
  1296. | (msg.type & 0x1);
  1297. aac_head[1] = (FlvAACPacketType)msg.packet_type;
  1298. msg2->body.append(aac_head, sizeof(aac_head));
  1299. msg2->body.append(msg.data);
  1300. return _rtmpsock->Write(msg2);
  1301. }
  1302. int RtmpStreamBase::SendUserMessage(void*) {
  1303. CHECK(false) << "You should implement your own SendUserMessage";
  1304. return 0;
  1305. }
  1306. int RtmpStreamBase::SendVideoMessage(const RtmpVideoMessage& msg) {
  1307. if (_rtmpsock == NULL) {
  1308. errno = EPERM;
  1309. return -1;
  1310. }
  1311. if (_chunk_stream_id == 0) {
  1312. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1313. errno = EPERM;
  1314. return -1;
  1315. }
  1316. if (!policy::is_video_frame_type_valid(msg.frame_type)) {
  1317. LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type;
  1318. }
  1319. if (!policy::is_video_codec_valid(msg.codec)) {
  1320. LOG(WARNING) << "Invalid codec=" << (int)msg.codec;
  1321. }
  1322. if (_paused) {
  1323. errno = EPERM;
  1324. return -1;
  1325. }
  1326. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1327. msg2->header.timestamp = msg.timestamp;
  1328. msg2->header.message_length = msg.size();
  1329. msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO;
  1330. msg2->header.stream_id = _message_stream_id;
  1331. msg2->chunk_stream_id = _chunk_stream_id;
  1332. // Make video header
  1333. const char video_head = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF);
  1334. msg2->body.push_back(video_head);
  1335. msg2->body.append(msg.data);
  1336. return _rtmpsock->Write(msg2);
  1337. }
  1338. int RtmpStreamBase::SendAVCMessage(const RtmpAVCMessage& msg) {
  1339. if (_rtmpsock == NULL) {
  1340. errno = EPERM;
  1341. return -1;
  1342. }
  1343. if (_chunk_stream_id == 0) {
  1344. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1345. errno = EPERM;
  1346. return -1;
  1347. }
  1348. if (!policy::is_video_frame_type_valid(msg.frame_type)) {
  1349. LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type;
  1350. }
  1351. if (_paused) {
  1352. errno = EPERM;
  1353. return -1;
  1354. }
  1355. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1356. msg2->header.timestamp = msg.timestamp;
  1357. msg2->header.message_length = msg.size();
  1358. msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO;
  1359. msg2->header.stream_id = _message_stream_id;
  1360. msg2->chunk_stream_id = _chunk_stream_id;
  1361. // Make video header
  1362. char avc_head[5];
  1363. char* p = avc_head;
  1364. *p++ = ((msg.frame_type & 0xF) << 4) | (FLV_VIDEO_AVC & 0xF);
  1365. *p++ = (FlvAVCPacketType)msg.packet_type;
  1366. policy::WriteBigEndian3Bytes(&p, msg.composition_time);
  1367. msg2->body.append(avc_head, sizeof(avc_head));
  1368. msg2->body.append(msg.data);
  1369. return _rtmpsock->Write(msg2);
  1370. }
  1371. int RtmpStreamBase::SendStopMessage(const butil::StringPiece&) {
  1372. return -1;
  1373. }
  1374. const char* RtmpObjectEncoding2Str(RtmpObjectEncoding e) {
  1375. switch (e) {
  1376. case RTMP_AMF0: return "AMF0";
  1377. case RTMP_AMF3: return "AMF3";
  1378. }
  1379. return "Unknown RtmpObjectEncoding";
  1380. }
  1381. void RtmpStreamBase::SignalError() {
  1382. return;
  1383. }
  1384. void RtmpStreamBase::OnFirstMessage() {}
  1385. void RtmpStreamBase::OnUserData(void*) {
  1386. LOG(INFO) << remote_side() << '[' << stream_id()
  1387. << "] ignored UserData{}";
  1388. }
  1389. void RtmpStreamBase::OnCuePoint(RtmpCuePoint* cuepoint) {
  1390. LOG(INFO) << remote_side() << '[' << stream_id()
  1391. << "] ignored CuePoint{" << cuepoint->data << '}';
  1392. }
  1393. void RtmpStreamBase::OnMetaData(RtmpMetaData* metadata, const butil::StringPiece& name) {
  1394. LOG(INFO) << remote_side() << '[' << stream_id()
  1395. << "] ignored MetaData{" << metadata->data << '}'
  1396. << " name{" << name << '}';
  1397. }
  1398. void RtmpStreamBase::OnSharedObjectMessage(RtmpSharedObjectMessage*) {
  1399. LOG(ERROR) << remote_side() << '[' << stream_id()
  1400. << "] ignored SharedObjectMessage{}";
  1401. }
  1402. void RtmpStreamBase::OnAudioMessage(RtmpAudioMessage* msg) {
  1403. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg;
  1404. }
  1405. void RtmpStreamBase::OnVideoMessage(RtmpVideoMessage* msg) {
  1406. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg;
  1407. }
  1408. void RtmpStreamBase::OnStop() {
  1409. // do nothing by default
  1410. }
  1411. bool RtmpStreamBase::BeginProcessingMessage(const char* fun_name) {
  1412. std::unique_lock<butil::Mutex> mu(_call_mutex);
  1413. if (_stopped) {
  1414. mu.unlock();
  1415. LOG(ERROR) << fun_name << " is called after OnStop()";
  1416. return false;
  1417. }
  1418. if (_processing_msg) {
  1419. mu.unlock();
  1420. LOG(ERROR) << "Impossible: Another OnXXXMessage is being called!";
  1421. return false;
  1422. }
  1423. _processing_msg = true;
  1424. if (!_has_data_ever) {
  1425. _has_data_ever = true;
  1426. OnFirstMessage();
  1427. }
  1428. return true;
  1429. }
  1430. void RtmpStreamBase::EndProcessingMessage() {
  1431. std::unique_lock<butil::Mutex> mu(_call_mutex);
  1432. _processing_msg = false;
  1433. if (_stopped) {
  1434. mu.unlock();
  1435. return OnStop();
  1436. }
  1437. }
  1438. void RtmpStreamBase::CallOnUserData(void* data) {
  1439. if (BeginProcessingMessage("OnUserData()")) {
  1440. OnUserData(data);
  1441. EndProcessingMessage();
  1442. }
  1443. }
  1444. void RtmpStreamBase::CallOnCuePoint(RtmpCuePoint* obj) {
  1445. if (BeginProcessingMessage("OnCuePoint()")) {
  1446. OnCuePoint(obj);
  1447. EndProcessingMessage();
  1448. }
  1449. }
  1450. void RtmpStreamBase::CallOnMetaData(RtmpMetaData* obj, const butil::StringPiece& name) {
  1451. if (BeginProcessingMessage("OnMetaData()")) {
  1452. OnMetaData(obj, name);
  1453. EndProcessingMessage();
  1454. }
  1455. }
  1456. void RtmpStreamBase::CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg) {
  1457. if (BeginProcessingMessage("OnSharedObjectMessage()")) {
  1458. OnSharedObjectMessage(msg);
  1459. EndProcessingMessage();
  1460. }
  1461. }
  1462. void RtmpStreamBase::CallOnAudioMessage(RtmpAudioMessage* msg) {
  1463. if (BeginProcessingMessage("OnAudioMessage()")) {
  1464. OnAudioMessage(msg);
  1465. EndProcessingMessage();
  1466. }
  1467. }
  1468. void RtmpStreamBase::CallOnVideoMessage(RtmpVideoMessage* msg) {
  1469. if (BeginProcessingMessage("OnVideoMessage()")) {
  1470. OnVideoMessage(msg);
  1471. EndProcessingMessage();
  1472. }
  1473. }
  1474. void RtmpStreamBase::CallOnStop() {
  1475. {
  1476. std::unique_lock<butil::Mutex> mu(_call_mutex);
  1477. if (_stopped) {
  1478. mu.unlock();
  1479. LOG(ERROR) << "OnStop() was called more than once";
  1480. return;
  1481. }
  1482. _stopped = true;
  1483. if (_processing_msg) {
  1484. // EndProcessingMessage() will call OnStop();
  1485. return;
  1486. }
  1487. }
  1488. OnStop();
  1489. }
  1490. butil::EndPoint RtmpStreamBase::remote_side() const
  1491. { return _rtmpsock ? _rtmpsock->remote_side() : butil::EndPoint(); }
  1492. butil::EndPoint RtmpStreamBase::local_side() const
  1493. { return _rtmpsock ? _rtmpsock->local_side() : butil::EndPoint(); }
  1494. // ============ RtmpClientStream =============
  1495. RtmpClientStream::RtmpClientStream()
  1496. : RtmpStreamBase(true)
  1497. , _onfail_id(INVALID_BTHREAD_ID)
  1498. , _create_stream_rpc_id(INVALID_BTHREAD_ID)
  1499. , _from_socketmap(true)
  1500. , _created_stream_with_play_or_publish(false)
  1501. , _state(STATE_UNINITIALIZED) {
  1502. get_rtmp_bvars()->client_stream_count << 1;
  1503. _self_ref.reset(this);
  1504. }
  1505. RtmpClientStream::~RtmpClientStream() {
  1506. get_rtmp_bvars()->client_stream_count << -1;
  1507. }
  1508. void RtmpClientStream::Destroy() {
  1509. bthread_id_t onfail_id = INVALID_BTHREAD_ID;
  1510. CallId create_stream_rpc_id = INVALID_BTHREAD_ID;
  1511. butil::intrusive_ptr<RtmpClientStream> self_ref;
  1512. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1513. switch (_state) {
  1515. _state = STATE_DESTROYING;
  1516. mu.unlock();
  1517. OnStopInternal();
  1518. _self_ref.swap(self_ref);
  1519. return;
  1520. case STATE_CREATING:
  1521. _state = STATE_DESTROYING;
  1522. create_stream_rpc_id = _create_stream_rpc_id;
  1523. mu.unlock();
  1524. _self_ref.swap(self_ref);
  1525. StartCancel(create_stream_rpc_id);
  1526. return;
  1527. case STATE_CREATED:
  1528. _state = STATE_DESTROYING;
  1529. onfail_id = _onfail_id;
  1530. mu.unlock();
  1531. _self_ref.swap(self_ref);
  1532. bthread_id_error(onfail_id, 0);
  1533. return;
  1534. case STATE_ERROR:
  1535. _state = STATE_DESTROYING;
  1536. mu.unlock();
  1537. _self_ref.swap(self_ref);
  1538. return;
  1539. case STATE_DESTROYING:
  1540. // Destroy() was already called.
  1541. return;
  1542. }
  1543. }
  1544. void RtmpClientStream::SignalError() {
  1545. bthread_id_t onfail_id = INVALID_BTHREAD_ID;
  1546. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1547. switch (_state) {
  1549. _state = STATE_ERROR;
  1550. mu.unlock();
  1551. OnStopInternal();
  1552. return;
  1553. case STATE_CREATING:
  1554. _state = STATE_ERROR;
  1555. mu.unlock();
  1556. return;
  1557. case STATE_CREATED:
  1558. _state = STATE_ERROR;
  1559. onfail_id = _onfail_id;
  1560. mu.unlock();
  1561. bthread_id_error(onfail_id, 0);
  1562. return;
  1563. case STATE_ERROR:
  1564. case STATE_DESTROYING:
  1565. // SignalError() or Destroy() was already called.
  1566. return;
  1567. }
  1568. }
  1569. StreamUserData* RtmpClientStream::OnCreatingStream(
  1570. SocketUniquePtr* inout, Controller* cntl) {
  1571. {
  1572. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1573. if (_state == STATE_ERROR || _state == STATE_DESTROYING) {
  1574. cntl->SetFailed(EINVAL, "Fail to replace socket for stream, _state is error or destroying");
  1575. return NULL;
  1576. }
  1577. }
  1578. SocketId esid;
  1579. if (cntl->connection_type() == CONNECTION_TYPE_SHORT) {
  1580. if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) {
  1581. cntl->SetFailed(EINVAL, "Fail to create RTMP socket");
  1582. return NULL;
  1583. }
  1584. } else {
  1585. if (_client_impl->socket_map().Insert(
  1586. SocketMapKey((*inout)->remote_side()), &esid) != 0) {
  1587. cntl->SetFailed(EINVAL, "Fail to get the RTMP socket");
  1588. return NULL;
  1589. }
  1590. }
  1591. SocketUniquePtr tmp_ptr;
  1592. if (Socket::Address(esid, &tmp_ptr) != 0) {
  1593. cntl->SetFailed(EFAILEDSOCKET, "Fail to address RTMP SocketId=%" PRIu64
  1594. " from SocketMap of RtmpClient=%p",
  1595. esid, _client_impl.get());
  1596. return NULL;
  1597. }
  1598. RPC_VLOG << "Replace Socket For Stream, RTMP socketId=" << esid
  1599. << ", main socketId=" << (*inout)->id();
  1600. tmp_ptr->ShareStats(inout->get());
  1601. inout->reset(tmp_ptr.release());
  1602. return this;
  1603. }
  1604. int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) {
  1605. butil::intrusive_ptr<RtmpClientStream> stream(
  1606. static_cast<RtmpClientStream*>(data), false);
  1607. CHECK(stream->_rtmpsock);
  1608. // Must happen after NotifyOnFailed which is after all other callsites
  1609. // to OnStopInternal().
  1610. stream->OnStopInternal();
  1611. bthread_id_unlock_and_destroy(id);
  1612. return 0;
  1613. }
  1614. void RtmpClientStream::OnFailedToCreateStream() {
  1615. {
  1616. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1617. switch (_state) {
  1618. case STATE_CREATING:
  1619. _state = STATE_ERROR;
  1620. break;
  1622. case STATE_CREATED:
  1623. _state = STATE_ERROR;
  1624. mu.unlock();
  1625. CHECK(false) << "Impossible";
  1626. break;
  1627. case STATE_ERROR:
  1628. case STATE_DESTROYING:
  1629. break;
  1630. }
  1631. }
  1632. return OnStopInternal();
  1633. }
  1634. void RtmpClientStream::DestroyStreamUserData(SocketUniquePtr& sending_sock,
  1635. Controller* cntl,
  1636. int /*error_code*/,
  1637. bool end_of_rpc) {
  1638. if (!end_of_rpc) {
  1639. if (sending_sock) {
  1640. if (_from_socketmap) {
  1641. _client_impl->socket_map().Remove(SocketMapKey(sending_sock->remote_side()),
  1642. sending_sock->id());
  1643. } else {
  1644. sending_sock->SetFailed(); // not necessary, already failed.
  1645. }
  1646. }
  1647. } else {
  1648. // Always move sending_sock into _rtmpsock at the end of rpc.
  1649. // - If the RPC is successful, moving sending_sock prevents it from
  1650. // setfailed in Controller after calling this method.
  1651. // - If the RPC is failed, OnStopInternal() can clean up the socket_map
  1652. // inserted in OnCreatingStream().
  1653. _rtmpsock.swap(sending_sock);
  1654. }
  1655. }
  1656. void RtmpClientStream::DestroyStreamCreator(Controller* cntl) {
  1657. if (cntl->Failed()) {
  1658. if (_rtmpsock != NULL &&
  1659. // ^ If sending_sock is NULL, the RPC fails before _pack_request
  1660. // which calls AddTransaction, in another word, RemoveTransaction
  1661. // is not needed.
  1662. cntl->ErrorCode() != ERTMPCREATESTREAM) {
  1663. // ^ ERTMPCREATESTREAM is triggered by receiving "_error" command,
  1664. // RemoveTransaction should already be called.
  1665. CHECK_LT(cntl->log_id(), (uint64_t)std::numeric_limits<uint32_t>::max());
  1666. const uint32_t transaction_id = cntl->log_id();
  1667. policy::RtmpContext* rtmp_ctx =
  1668. static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
  1669. if (rtmp_ctx == NULL) {
  1670. LOG(FATAL) << "RtmpContext must be created";
  1671. } else {
  1672. policy::RtmpTransactionHandler* handler =
  1673. rtmp_ctx->RemoveTransaction(transaction_id);
  1674. if (handler) {
  1675. handler->Cancel();
  1676. }
  1677. }
  1678. }
  1679. return OnFailedToCreateStream();
  1680. }
  1681. int rc = 0;
  1682. bthread_id_t onfail_id = INVALID_BTHREAD_ID;
  1683. {
  1684. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1685. switch (_state) {
  1686. case STATE_CREATING:
  1687. CHECK(_rtmpsock);
  1688. rc = bthread_id_create(&onfail_id, this, RunOnFailed);
  1689. if (rc) {
  1690. cntl->SetFailed(ENOMEM, "Fail to create _onfail_id: %s", berror(rc));
  1691. mu.unlock();
  1692. return OnFailedToCreateStream();
  1693. }
  1694. // Add a ref for RunOnFailed.
  1695. butil::intrusive_ptr<RtmpClientStream>(this).detach();
  1696. _state = STATE_CREATED;
  1697. _onfail_id = onfail_id;
  1698. break;
  1700. case STATE_CREATED:
  1701. _state = STATE_ERROR;
  1702. mu.unlock();
  1703. CHECK(false) << "Impossible";
  1704. return OnStopInternal();
  1705. case STATE_ERROR:
  1706. case STATE_DESTROYING:
  1707. mu.unlock();
  1708. return OnStopInternal();
  1709. }
  1710. }
  1711. if (onfail_id != INVALID_BTHREAD_ID) {
  1712. _rtmpsock->NotifyOnFailed(onfail_id);
  1713. }
  1714. }
  1715. void RtmpClientStream::OnStopInternal() {
  1716. if (_rtmpsock == NULL) {
  1717. return CallOnStop();
  1718. }
  1719. if (!_rtmpsock->Failed() && _chunk_stream_id != 0) {
  1720. // SRS requires closeStream which is sent over this stream.
  1721. butil::IOBuf req_buf1;
  1722. {
  1723. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf1);
  1724. AMFOutputStream ostream(&zc_stream);
  1725. WriteAMFString(RTMP_AMF0_COMMAND_CLOSE_STREAM, &ostream);
  1726. WriteAMFUint32(0, &ostream);
  1727. WriteAMFNull(&ostream);
  1728. CHECK(ostream.good());
  1729. }
  1730. SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
  1731. msg1->header.message_length = req_buf1.size();
  1732. msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
  1733. msg1->header.stream_id = _message_stream_id;
  1734. msg1->chunk_stream_id = _chunk_stream_id;
  1735. msg1->body = req_buf1;
  1736. // Send deleteStream over the control stream.
  1737. butil::IOBuf req_buf2;
  1738. {
  1739. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf2);
  1740. AMFOutputStream ostream(&zc_stream);
  1741. WriteAMFString(RTMP_AMF0_COMMAND_DELETE_STREAM, &ostream);
  1742. WriteAMFUint32(0, &ostream);
  1743. WriteAMFNull(&ostream);
  1744. WriteAMFUint32(_message_stream_id, &ostream);
  1745. CHECK(ostream.good());
  1746. }
  1747. policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
  1748. policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf2);
  1749. msg1->next.reset(msg2);
  1750. if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg1) != 0) {
  1751. if (errno != EFAILEDSOCKET) {
  1752. PLOG(WARNING) << "Fail to send closeStream/deleteStream to "
  1753. << _rtmpsock->remote_side() << "["
  1754. << _message_stream_id << "]";
  1755. // Close the connection to make sure the server-side knows the
  1756. // closing event, however this may terminate other streams over
  1757. // the connection as well.
  1758. _rtmpsock->SetFailed(EFAILEDSOCKET, "Fail to send closeStream/deleteStream");
  1759. }
  1760. }
  1761. }
  1762. policy::RtmpContext* ctx =
  1763. static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
  1764. if (ctx != NULL) {
  1765. if (!ctx->RemoveMessageStream(this)) {
  1766. // The stream is not registered yet. Is this normal?
  1767. LOG(ERROR) << "Fail to remove stream_id=" << _message_stream_id;
  1768. }
  1769. } else {
  1770. LOG(FATAL) << "RtmpContext of " << *_rtmpsock << " is NULL";
  1771. }
  1772. if (_from_socketmap) {
  1773. _client_impl->socket_map().Remove(SocketMapKey(_rtmpsock->remote_side()),
  1774. _rtmpsock->id());
  1775. } else {
  1776. _rtmpsock->ReleaseAdditionalReference();
  1777. }
  1778. CallOnStop();
  1779. }
  1780. RtmpPlayOptions::RtmpPlayOptions()
  1781. : start(-2)
  1782. , duration(-1)
  1783. , reset(true) {
  1784. }
  1785. int RtmpClientStream::Play(const RtmpPlayOptions& opt) {
  1786. if (_rtmpsock == NULL) {
  1787. errno = EPERM;
  1788. return -1;
  1789. }
  1790. if (opt.stream_name.empty()) {
  1791. LOG(ERROR) << "Empty stream_name";
  1792. errno = EINVAL;
  1793. return -1;
  1794. }
  1795. if (_client_impl == NULL) {
  1796. LOG(ERROR) << "The client stream is not created yet";
  1797. errno = EPERM;
  1798. return -1;
  1799. }
  1800. butil::IOBuf req_buf;
  1801. {
  1802. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1803. AMFOutputStream ostream(&zc_stream);
  1804. WriteAMFString(RTMP_AMF0_COMMAND_PLAY, &ostream);
  1805. WriteAMFUint32(0, &ostream);
  1806. WriteAMFNull(&ostream);
  1807. WriteAMFString(opt.stream_name, &ostream);
  1808. WriteAMFNumber(opt.start, &ostream);
  1809. WriteAMFNumber(opt.duration, &ostream);
  1810. WriteAMFBool(opt.reset, &ostream);
  1811. CHECK(ostream.good());
  1812. }
  1813. SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
  1814. msg1->header.message_length = req_buf.size();
  1815. msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
  1816. msg1->header.stream_id = _message_stream_id;
  1817. msg1->chunk_stream_id = _chunk_stream_id;
  1818. msg1->body = req_buf;
  1819. if (_client_impl->options().buffer_length_ms > 0) {
  1820. char data[10];
  1821. char* p = data;
  1822. policy::WriteBigEndian2Bytes(
  1824. policy::WriteBigEndian4Bytes(&p, stream_id());
  1825. policy::WriteBigEndian4Bytes(&p, _client_impl->options().buffer_length_ms);
  1826. policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
  1827. policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data));
  1828. msg1->next.reset(msg2);
  1829. }
  1830. // FIXME(gejun): Do we need to SetChunkSize for play?
  1831. // if (_client_impl->options().chunk_size > policy::RTMP_INITIAL_CHUNK_SIZE) {
  1832. // if (SetChunkSize(_client_impl->options().chunk_size) != 0) {
  1833. // return -1;
  1834. // }
  1835. // }
  1836. return _rtmpsock->Write(msg1);
  1837. }
  1838. int RtmpClientStream::Play2(const RtmpPlay2Options& opt) {
  1839. butil::IOBuf req_buf;
  1840. {
  1841. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1842. AMFOutputStream ostream(&zc_stream);
  1843. WriteAMFString(RTMP_AMF0_COMMAND_PLAY2, &ostream);
  1844. WriteAMFUint32(0, &ostream);
  1845. WriteAMFNull(&ostream);
  1846. WriteAMFObject(opt, &ostream);
  1847. if (!ostream.good()) {
  1848. LOG(ERROR) << "Fail to serialize play2 request";
  1849. errno = EINVAL;
  1850. return -1;
  1851. }
  1852. }
  1853. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1854. }
  1855. const char* RtmpPublishType2Str(RtmpPublishType type) {
  1856. switch (type) {
  1857. case RTMP_PUBLISH_RECORD: return "record";
  1858. case RTMP_PUBLISH_APPEND: return "append";
  1859. case RTMP_PUBLISH_LIVE: return "live";
  1860. }
  1861. return "Unknown RtmpPublishType";
  1862. }
  1863. bool Str2RtmpPublishType(const butil::StringPiece& str, RtmpPublishType* type) {
  1864. if (str == "record") {
  1865. *type = RTMP_PUBLISH_RECORD;
  1866. return true;
  1867. } else if (str == "append") {
  1868. *type = RTMP_PUBLISH_APPEND;
  1869. return true;
  1870. } else if (str == "live") {
  1871. *type = RTMP_PUBLISH_LIVE;
  1872. return true;
  1873. }
  1874. return false;
  1875. }
  1876. int RtmpClientStream::Publish(const butil::StringPiece& name,
  1877. RtmpPublishType type) {
  1878. butil::IOBuf req_buf;
  1879. {
  1880. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1881. AMFOutputStream ostream(&zc_stream);
  1882. WriteAMFString(RTMP_AMF0_COMMAND_PUBLISH, &ostream);
  1883. WriteAMFUint32(0, &ostream);
  1884. WriteAMFNull(&ostream);
  1885. WriteAMFString(name, &ostream);
  1886. WriteAMFString(RtmpPublishType2Str(type), &ostream);
  1887. CHECK(ostream.good());
  1888. }
  1889. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1890. }
  1891. int RtmpClientStream::Seek(double offset_ms) {
  1892. butil::IOBuf req_buf;
  1893. {
  1894. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1895. AMFOutputStream ostream(&zc_stream);
  1896. WriteAMFString(RTMP_AMF0_COMMAND_SEEK, &ostream);
  1897. WriteAMFUint32(0, &ostream);
  1898. WriteAMFNull(&ostream);
  1899. WriteAMFNumber(offset_ms, &ostream);
  1900. CHECK(ostream.good());
  1901. }
  1902. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1903. }
  1904. int RtmpClientStream::Pause(bool pause_or_unpause, double offset_ms) {
  1905. butil::IOBuf req_buf;
  1906. {
  1907. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1908. AMFOutputStream ostream(&zc_stream);
  1909. WriteAMFString(RTMP_AMF0_COMMAND_PAUSE, &ostream);
  1910. WriteAMFUint32(0, &ostream);
  1911. WriteAMFNull(&ostream);
  1912. WriteAMFBool(pause_or_unpause, &ostream);
  1913. WriteAMFNumber(offset_ms, &ostream);
  1914. CHECK(ostream.good());
  1915. }
  1916. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1917. }
  1918. void RtmpClientStream::OnStatus(const RtmpInfo& info) {
  1919. if (info.level() == RTMP_INFO_LEVEL_ERROR) {
  1920. LOG(WARNING) << remote_side() << '[' << stream_id()
  1921. << "] " << info.code() << ": " << info.description();
  1922. return SignalError();
  1923. } else if (info.level() == RTMP_INFO_LEVEL_STATUS) {
  1924. if ((!_options.play_name.empty() &&
  1925. info.code() == RTMP_STATUS_CODE_PLAY_START) ||
  1926. (!_options.publish_name.empty() &&
  1927. info.code() == RTMP_STATUS_CODE_PUBLISH_START)) {
  1928. // the memory fence makes sure that if _is_server_accepted is true,
  1929. // publish request must be sent (so that SendXXX functions can
  1930. // be enabled)
  1931. _is_server_accepted.store(true, butil::memory_order_release);
  1932. }
  1933. }
  1934. }
  1935. RtmpClientStreamOptions::RtmpClientStreamOptions()
  1936. : share_connection(true)
  1937. , wait_until_play_or_publish_is_sent(false)
  1938. , create_stream_max_retry(3)
  1939. , publish_type(RTMP_PUBLISH_LIVE) {
  1940. }
  1941. class OnClientStreamCreated : public google::protobuf::Closure {
  1942. public:
  1943. void Run(); // @Closure
  1944. void CancelBeforeCallMethod() { delete this; }
  1945. public:
  1946. Controller cntl;
  1947. // Hold a reference of stream to prevent it from destructing during an
  1948. // async Create().
  1949. butil::intrusive_ptr<RtmpClientStream> stream;
  1950. };
  1951. void OnClientStreamCreated::Run() {
  1952. std::unique_ptr<OnClientStreamCreated> delete_self(this);
  1953. if (cntl.Failed()) {
  1954. LOG(WARNING) << "Fail to create stream=" << stream->rtmp_url()
  1955. << ": " << cntl.ErrorText();
  1956. return;
  1957. }
  1958. if (stream->_created_stream_with_play_or_publish) {
  1959. // the server accepted the play/publish command packed in createStream
  1960. return;
  1961. }
  1962. const RtmpClientStreamOptions& options = stream->options();
  1963. bool do_nothing = true;
  1964. if (!options.play_name.empty()) {
  1965. do_nothing = false;
  1966. RtmpPlayOptions play_opt;
  1967. play_opt.stream_name = options.play_name;
  1968. if (stream->Play(play_opt) != 0) {
  1969. LOG(WARNING) << "Fail to play " << options.play_name;
  1970. return stream->SignalError();
  1971. }
  1972. }
  1973. if (!options.publish_name.empty()) {
  1974. do_nothing = false;
  1975. if (stream->Publish(options.publish_name, options.publish_type) != 0) {
  1976. LOG(WARNING) << "Fail to publish " << stream->rtmp_url();
  1977. return stream->SignalError();
  1978. }
  1979. }
  1980. if (do_nothing) {
  1981. LOG(ERROR) << "play_name and publish_name are both empty";
  1982. return stream->SignalError();
  1983. }
  1984. }
  1985. void RtmpClientStream::Init(const RtmpClient* client,
  1986. const RtmpClientStreamOptions& options) {
  1987. if (client->_impl == NULL) {
  1988. LOG(FATAL) << "RtmpClient is not initialized";
  1989. return OnStopInternal();
  1990. }
  1991. {
  1992. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1993. if (_state == STATE_DESTROYING || _state == STATE_ERROR) {
  1994. // already Destroy()-ed or SignalError()-ed
  1995. LOG(WARNING) << "RtmpClientStream=" << this << " was already "
  1996. "Destroy()-ed, stop Init()";
  1997. return;
  1998. }
  1999. }
  2000. _client_impl = client->_impl;
  2001. _options = options;
  2002. OnClientStreamCreated* done = new OnClientStreamCreated;
  2003. done->stream.reset(this);
  2004. done->cntl.set_stream_creator(this);
  2005. done->cntl.set_connection_type(_options.share_connection ?
  2008. _from_socketmap = (done->cntl.connection_type() == CONNECTION_TYPE_SINGLE);
  2009. done->cntl.set_max_retry(_options.create_stream_max_retry);
  2010. if (_options.hash_code.has_been_set()) {
  2011. done->cntl.set_request_code(_options.hash_code);
  2012. }
  2013. // Hack: we pass stream as response so that PackRtmpRequest can get
  2014. // the stream from controller.
  2015. google::protobuf::Message* res = (google::protobuf::Message*)this;
  2016. const CallId call_id = done->cntl.call_id();
  2017. {
  2018. std::unique_lock<butil::Mutex> mu(_state_mutex);
  2019. switch (_state) {
  2021. _state = STATE_CREATING;
  2022. _create_stream_rpc_id = call_id;
  2023. break;
  2024. case STATE_CREATING:
  2025. case STATE_CREATED:
  2026. mu.unlock();
  2027. LOG(ERROR) << "RtmpClientStream::Init() is called by multiple "
  2028. "threads simultaneously";
  2029. return done->CancelBeforeCallMethod();
  2030. case STATE_ERROR:
  2031. case STATE_DESTROYING:
  2032. mu.unlock();
  2033. return done->CancelBeforeCallMethod();
  2034. }
  2035. }
  2036. _client_impl->_chan.CallMethod(NULL, &done->cntl, NULL, res, done);
  2037. if (options.wait_until_play_or_publish_is_sent) {
  2038. Join(call_id);
  2039. }
  2040. }
  2041. std::string RtmpClientStream::rtmp_url() const {
  2042. if (_client_impl == NULL) {
  2043. return std::string();
  2044. }
  2045. butil::StringPiece tcurl = _client_impl->options().tcUrl;
  2046. butil::StringPiece stream_name = _options.stream_name();
  2047. std::string result;
  2048. result.reserve(tcurl.size() + 1 + stream_name.size());
  2049. result.append(tcurl.data(), tcurl.size());
  2050. result.push_back('/');
  2051. result.append(stream_name.data(), stream_name.size());
  2052. return result;
  2053. }
  2054. // ========= RtmpRetryingClientStream ============
  2055. RtmpRetryingClientStreamOptions::RtmpRetryingClientStreamOptions()
  2056. : retry_interval_ms(1000)
  2057. , max_retry_duration_ms(-1)
  2058. , fast_retry_count(2)
  2059. , quit_when_no_data_ever(true) {
  2060. }
  2061. RtmpRetryingClientStream::RtmpRetryingClientStream()
  2062. : RtmpStreamBase(true)
  2063. , _destroying(false)
  2064. , _called_on_stop(false)
  2065. , _changed_stream(false)
  2066. , _has_timer_ever(false)
  2067. , _is_server_accepted_ever(false)
  2068. , _num_fast_retries(0)
  2069. , _last_creation_time_us(0)
  2070. , _last_retry_start_time_us(0)
  2071. , _create_timer_id(0)
  2072. , _sub_stream_creator(NULL) {
  2073. get_rtmp_bvars()->retrying_client_stream_count << 1;
  2074. _self_ref.reset(this);
  2075. }
  2076. RtmpRetryingClientStream::~RtmpRetryingClientStream() {
  2077. delete _sub_stream_creator;
  2078. _sub_stream_creator = NULL;
  2079. get_rtmp_bvars()->retrying_client_stream_count << -1;
  2080. }
  2081. void RtmpRetryingClientStream::CallOnStopIfNeeded() {
  2082. // CallOnStop uses locks, we don't need memory fence on _called_on_stop,
  2083. // atomic ops is enough.
  2084. if (!_called_on_stop.load(butil::memory_order_relaxed) &&
  2085. !_called_on_stop.exchange(true, butil::memory_order_relaxed)) {
  2086. CallOnStop();
  2087. }
  2088. }
  2089. void RtmpRetryingClientStream::Destroy() {
  2090. if (_destroying.exchange(true, butil::memory_order_relaxed)) {
  2091. // Destroy() was already called.
  2092. return;
  2093. }
  2094. // Make sure _self_ref is released before quiting this function.
  2095. // Notice that _self_ref.reset(NULL) is wrong because it may destructs
  2096. // this object immediately.
  2097. butil::intrusive_ptr<RtmpRetryingClientStream> self_ref;
  2098. _self_ref.swap(self_ref);
  2099. butil::intrusive_ptr<RtmpStreamBase> old_sub_stream;
  2100. {
  2101. BAIDU_SCOPED_LOCK(_stream_mutex);
  2102. // swap instead of reset(NULL) to make the stream destructed
  2103. // outside _stream_mutex.
  2104. _using_sub_stream.swap(old_sub_stream);
  2105. }
  2106. if (old_sub_stream) {
  2107. old_sub_stream->Destroy();
  2108. }
  2109. if (_has_timer_ever) {
  2110. if (bthread_timer_del(_create_timer_id) == 0) {
  2111. // The callback is not run yet. Remove the additional ref added
  2112. // before creating the timer.
  2113. butil::intrusive_ptr<RtmpRetryingClientStream> deref(this, false);
  2114. }
  2115. }
  2116. return CallOnStopIfNeeded();
  2117. }
  2118. void RtmpRetryingClientStream::Init(
  2119. SubStreamCreator* sub_stream_creator,
  2120. const RtmpRetryingClientStreamOptions& options) {
  2121. if (sub_stream_creator == NULL) {
  2122. LOG(ERROR) << "sub_stream_creator is NULL";
  2123. return CallOnStopIfNeeded();
  2124. }
  2125. _sub_stream_creator = sub_stream_creator;
  2126. if (_destroying.load(butil::memory_order_relaxed)) {
  2127. LOG(WARNING) << "RtmpRetryingClientStream=" << this << " was already "
  2128. "Destroy()-ed, stop Init()";
  2129. return;
  2130. }
  2131. _options = options;
  2132. // retrying stream does not support this option.
  2133. _options.wait_until_play_or_publish_is_sent = false;
  2134. _last_retry_start_time_us = butil::gettimeofday_us();
  2135. Recreate();
  2136. }
  2137. void RetryingClientMessageHandler::OnPlayable() {
  2138. _parent->OnPlayable();
  2139. }
  2140. void RetryingClientMessageHandler::OnUserData(void* msg) {
  2141. _parent->CallOnUserData(msg);
  2142. }
  2143. void RetryingClientMessageHandler::OnCuePoint(brpc::RtmpCuePoint* cuepoint) {
  2144. _parent->CallOnCuePoint(cuepoint);
  2145. }
  2146. void RetryingClientMessageHandler::OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) {
  2147. _parent->CallOnMetaData(metadata, name);
  2148. }
  2149. void RetryingClientMessageHandler::OnAudioMessage(brpc::RtmpAudioMessage* msg) {
  2150. _parent->CallOnAudioMessage(msg);
  2151. }
  2152. void RetryingClientMessageHandler::OnVideoMessage(brpc::RtmpVideoMessage* msg) {
  2153. _parent->CallOnVideoMessage(msg);
  2154. }
  2155. void RetryingClientMessageHandler::OnSharedObjectMessage(RtmpSharedObjectMessage* msg) {
  2156. _parent->CallOnSharedObjectMessage(msg);
  2157. }
  2158. void RetryingClientMessageHandler::OnSubStreamStop(RtmpStreamBase* sub_stream) {
  2159. _parent->OnSubStreamStop(sub_stream);
  2160. }
  2161. RetryingClientMessageHandler::RetryingClientMessageHandler(RtmpRetryingClientStream* parent)
  2162. : _parent(parent) {}
  2163. void RtmpRetryingClientStream::Recreate() {
  2164. butil::intrusive_ptr<RtmpStreamBase> sub_stream;
  2165. _sub_stream_creator->NewSubStream(new RetryingClientMessageHandler(this), &sub_stream);
  2166. butil::intrusive_ptr<RtmpStreamBase> old_sub_stream;
  2167. bool destroying = false;
  2168. {
  2169. BAIDU_SCOPED_LOCK(_stream_mutex);
  2170. // Need to check _destroying to avoid setting the new sub_stream to a
  2171. // destroying retrying stream.
  2172. // Note: the load of _destroying and the setting of _using_sub_stream
  2173. // must be in the same lock, otherwise current bthread may be scheduled
  2174. // and Destroy() may be called, making new sub_stream leaked.
  2175. destroying = _destroying.load(butil::memory_order_relaxed);
  2176. if (!destroying) {
  2177. _using_sub_stream.swap(old_sub_stream);
  2178. _using_sub_stream = sub_stream;
  2179. _changed_stream = true;
  2180. }
  2181. }
  2182. if (old_sub_stream) {
  2183. old_sub_stream->Destroy();
  2184. }
  2185. if (destroying) {
  2186. sub_stream->Destroy();
  2187. return;
  2188. }
  2189. _last_creation_time_us = butil::gettimeofday_us();
  2190. // If Init() of sub_stream is called before setting _using_sub_stream,
  2191. // OnStop() may happen before _using_sub_stream is set and the stopped
  2192. // stream is wrongly left in the variable.
  2193. _sub_stream_creator->LaunchSubStream(sub_stream.get(), &_options);
  2194. }
  2195. void RtmpRetryingClientStream::OnRecreateTimer(void* arg) {
  2196. // Hold the referenced stream.
  2197. butil::intrusive_ptr<RtmpRetryingClientStream> ptr(
  2198. static_cast<RtmpRetryingClientStream*>(arg), false/*not add ref*/);
  2199. ptr->Recreate();
  2200. }
  2201. void RtmpRetryingClientStream::OnSubStreamStop(RtmpStreamBase* sub_stream) {
  2202. // Make sure the sub_stream is destroyed after this function.
  2203. DestroyingPtr<RtmpStreamBase> sub_stream_guard(sub_stream);
  2204. butil::intrusive_ptr<RtmpStreamBase> removed_sub_stream;
  2205. {
  2206. BAIDU_SCOPED_LOCK(_stream_mutex);
  2207. if (sub_stream == _using_sub_stream) {
  2208. _using_sub_stream.swap(removed_sub_stream);
  2209. }
  2210. }
  2211. if (removed_sub_stream == NULL ||
  2212. _destroying.load(butil::memory_order_relaxed) ||
  2213. _called_on_stop.load(butil::memory_order_relaxed)) {
  2214. return;
  2215. }
  2216. // Update _is_server_accepted_ever
  2217. if (sub_stream->is_server_accepted()) {
  2218. _is_server_accepted_ever = true;
  2219. }
  2220. if (_options.max_retry_duration_ms == 0) {
  2221. return CallOnStopIfNeeded();
  2222. }
  2223. // If the sub_stream has data ever, count this retry as the beginning
  2224. // of RtmpRetryingClientStreamOptions.max_retry_duration_ms.
  2225. if ((!_options.play_name.empty() && sub_stream->has_data_ever()) ||
  2226. (!_options.publish_name.empty() && sub_stream->is_server_accepted())) {
  2227. const int64_t now = butil::gettimeofday_us();
  2228. if (now >= _last_retry_start_time_us +
  2229. 3 * _options.retry_interval_ms * 1000L) {
  2230. // re-enable fast retries when the interval is long enough.
  2231. // `3' is just a randomly-chosen (small) number.
  2232. _num_fast_retries = 0;
  2233. }
  2234. _last_retry_start_time_us = now;
  2235. }
  2236. // Check max duration. Notice that this branch cannot be moved forward
  2237. // above branch which may update _last_retry_start_time_us
  2238. if (_options.max_retry_duration_ms > 0 &&
  2239. butil::gettimeofday_us() >
  2240. (_last_retry_start_time_us + _options.max_retry_duration_ms * 1000L)) {
  2241. // exceed the duration, stop retrying.
  2242. return CallOnStopIfNeeded();
  2243. }
  2244. if (_num_fast_retries < _options.fast_retry_count) {
  2245. ++_num_fast_retries;
  2246. // Retry immediately for several times. Works for scenarios like:
  2247. // restarting servers, occasional connection lost etc...
  2248. return Recreate();
  2249. }
  2250. if (_options.quit_when_no_data_ever &&
  2251. ((!_options.play_name.empty() && !has_data_ever()) ||
  2252. (!_options.publish_name.empty() && !_is_server_accepted_ever))) {
  2253. // Stop retrying when created playing streams never have data or
  2254. // publishing streams were never accepted. It's very likely that
  2255. // continuing retrying does not make sense.
  2256. return CallOnStopIfNeeded();
  2257. }
  2258. const int64_t wait_us = _last_creation_time_us +
  2259. _options.retry_interval_ms * 1000L - butil::gettimeofday_us();
  2260. if (wait_us > 0) {
  2261. // retry is too frequent, schedule the retry.
  2262. // Add a ref for OnRecreateTimer which does deref.
  2263. butil::intrusive_ptr<RtmpRetryingClientStream>(this).detach();
  2264. if (bthread_timer_add(&_create_timer_id,
  2265. butil::microseconds_from_now(wait_us),
  2266. OnRecreateTimer, this) != 0) {
  2267. LOG(ERROR) << "Fail to create timer";
  2268. return CallOnStopIfNeeded();
  2269. }
  2270. _has_timer_ever = true;
  2271. } else {
  2272. Recreate();
  2273. }
  2274. }
  2275. int RtmpRetryingClientStream::AcquireStreamToSend(
  2276. butil::intrusive_ptr<RtmpStreamBase>* ptr) {
  2277. BAIDU_SCOPED_LOCK(_stream_mutex);
  2278. if (!_using_sub_stream) {
  2279. errno = EPERM;
  2280. return -1;
  2281. }
  2282. if (!_using_sub_stream->is_server_accepted()) {
  2283. // not published yet.
  2284. errno = EPERM;
  2285. return -1;
  2286. }
  2287. if (_changed_stream) {
  2288. _changed_stream = false;
  2289. errno = ERTMPPUBLISHABLE;
  2290. return -1;
  2291. }
  2292. *ptr = _using_sub_stream;
  2293. return 0;
  2294. }
  2295. int RtmpRetryingClientStream::SendCuePoint(const RtmpCuePoint& obj) {
  2296. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2297. if (AcquireStreamToSend(&ptr) != 0) {
  2298. return -1;
  2299. }
  2300. return ptr->SendCuePoint(obj);
  2301. }
  2302. int RtmpRetryingClientStream::SendMetaData(const RtmpMetaData& obj, const butil::StringPiece& name) {
  2303. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2304. if (AcquireStreamToSend(&ptr) != 0) {
  2305. return -1;
  2306. }
  2307. return ptr->SendMetaData(obj, name);
  2308. }
  2309. int RtmpRetryingClientStream::SendSharedObjectMessage(
  2310. const RtmpSharedObjectMessage& msg) {
  2311. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2312. if (AcquireStreamToSend(&ptr) != 0) {
  2313. return -1;
  2314. }
  2315. return ptr->SendSharedObjectMessage(msg);
  2316. }
  2317. int RtmpRetryingClientStream::SendAudioMessage(const RtmpAudioMessage& msg) {
  2318. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2319. if (AcquireStreamToSend(&ptr) != 0) {
  2320. return -1;
  2321. }
  2322. return ptr->SendAudioMessage(msg);
  2323. }
  2324. int RtmpRetryingClientStream::SendAACMessage(const RtmpAACMessage& msg) {
  2325. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2326. if (AcquireStreamToSend(&ptr) != 0) {
  2327. return -1;
  2328. }
  2329. return ptr->SendAACMessage(msg);
  2330. }
  2331. int RtmpRetryingClientStream::SendVideoMessage(const RtmpVideoMessage& msg) {
  2332. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2333. if (AcquireStreamToSend(&ptr) != 0) {
  2334. return -1;
  2335. }
  2336. return ptr->SendVideoMessage(msg);
  2337. }
  2338. int RtmpRetryingClientStream::SendAVCMessage(const RtmpAVCMessage& msg) {
  2339. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2340. if (AcquireStreamToSend(&ptr) != 0) {
  2341. return -1;
  2342. }
  2343. return ptr->SendAVCMessage(msg);
  2344. }
  2345. void RtmpRetryingClientStream::StopCurrentStream() {
  2346. butil::intrusive_ptr<RtmpStreamBase> sub_stream;
  2347. {
  2348. BAIDU_SCOPED_LOCK(_stream_mutex);
  2349. sub_stream = _using_sub_stream;
  2350. }
  2351. if (sub_stream) {
  2352. sub_stream->SignalError();
  2353. }
  2354. }
  2355. void RtmpRetryingClientStream::OnPlayable() {}
  2356. butil::EndPoint RtmpRetryingClientStream::remote_side() const {
  2357. {
  2358. BAIDU_SCOPED_LOCK(_stream_mutex);
  2359. if (_using_sub_stream) {
  2360. return _using_sub_stream->remote_side();
  2361. }
  2362. }
  2363. return butil::EndPoint();
  2364. }
  2365. butil::EndPoint RtmpRetryingClientStream::local_side() const {
  2366. {
  2367. BAIDU_SCOPED_LOCK(_stream_mutex);
  2368. if (_using_sub_stream) {
  2369. return _using_sub_stream->local_side();
  2370. }
  2371. }
  2372. return butil::EndPoint();
  2373. }
  2374. // =========== RtmpService ===============
  2375. void RtmpService::OnPingResponse(const butil::EndPoint&, uint32_t) {
  2376. // TODO: put into some bvars?
  2377. }
  2378. RtmpServerStream::RtmpServerStream()
  2379. : RtmpStreamBase(false)
  2380. , _client_supports_stream_multiplexing(false)
  2381. , _is_publish(false)
  2382. , _onfail_id(INVALID_BTHREAD_ID) {
  2383. get_rtmp_bvars()->server_stream_count << 1;
  2384. }
  2385. RtmpServerStream::~RtmpServerStream() {
  2386. get_rtmp_bvars()->server_stream_count << -1;
  2387. }
  2388. void RtmpServerStream::Destroy() {
  2389. CHECK(false) << "You're not supposed to call Destroy() for server-side streams";
  2390. }
  2391. void RtmpServerStream::OnPlay(const RtmpPlayOptions& opt,
  2392. butil::Status* status,
  2393. google::protobuf::Closure* done) {
  2394. ClosureGuard done_guard(done);
  2395. status->set_error(EPERM, "%s[%u] ignored play{stream_name=%s start=%f"
  2396. " duration=%f reset=%d}",
  2397. butil::endpoint2str(remote_side()).c_str(), stream_id(),
  2398. opt.stream_name.c_str(), opt.start, opt.duration,
  2399. (int)opt.reset);
  2400. }
  2401. void RtmpServerStream::OnPlay2(const RtmpPlay2Options& opt) {
  2402. LOG(ERROR) << remote_side() << '[' << stream_id()
  2403. << "] ignored play2{" << opt.ShortDebugString() << '}';
  2404. }
  2405. void RtmpServerStream::OnPublish(const std::string& name,
  2406. RtmpPublishType type,
  2407. butil::Status* status,
  2408. google::protobuf::Closure* done) {
  2409. ClosureGuard done_guard(done);
  2410. status->set_error(EPERM, "%s[%u] ignored publish{stream_name=%s type=%s}",
  2411. butil::endpoint2str(remote_side()).c_str(), stream_id(),
  2412. name.c_str(), RtmpPublishType2Str(type));
  2413. }
  2414. int RtmpServerStream::OnSeek(double offset_ms) {
  2415. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored seek("
  2416. << offset_ms << ")";
  2417. return -1;
  2418. }
  2419. int RtmpServerStream::OnPause(bool pause, double offset_ms) {
  2420. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored "
  2421. << (pause ? "pause" : "unpause")
  2422. << "(offset_ms=" << offset_ms << ")";
  2423. return -1;
  2424. }
  2425. void RtmpServerStream::OnSetBufferLength(uint32_t /*buffer_length_ms*/) {}
  2426. int RtmpServerStream::SendStopMessage(const butil::StringPiece& error_desc) {
  2427. if (_rtmpsock == NULL) {
  2428. errno = EINVAL;
  2429. return -1;
  2430. }
  2431. if (FLAGS_rtmp_server_close_connection_on_error &&
  2432. !_client_supports_stream_multiplexing) {
  2433. _rtmpsock->SetFailed(EFAILEDSOCKET, "Close connection because %.*s",
  2434. (int)error_desc.size(), error_desc.data());
  2435. // The purpose is to close the connection, no matter what SetFailed()
  2436. // returns, the operation should be done.
  2437. LOG_IF(WARNING, FLAGS_log_error_text)
  2438. << "Close connection because " << error_desc;
  2439. return 0;
  2440. }
  2441. // Send StreamNotFound error to make the client close connections.
  2442. // Works for flashplayer and ffplay(not started playing), not work for SRS
  2443. // and ffplay(started playing)
  2444. butil::IOBuf req_buf;
  2445. RtmpInfo info;
  2446. {
  2447. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  2448. AMFOutputStream ostream(&zc_stream);
  2449. WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
  2450. WriteAMFUint32(0, &ostream);
  2451. WriteAMFNull(&ostream);
  2452. if (_is_publish) {
  2453. // NetStream.Publish.Rejected does not work for ffmpeg, works for OBS.
  2454. // NetStream.Publish.BadName does not work for OBS.
  2455. // NetStream.Play.StreamNotFound is not accurate but works for both
  2456. // ffmpeg and OBS.
  2457. info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND);
  2458. } else {
  2459. info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND);
  2460. }
  2461. info.set_level(RTMP_INFO_LEVEL_ERROR);
  2462. if (!error_desc.empty()) {
  2463. info.set_description(error_desc.as_string());
  2464. }
  2465. WriteAMFObject(info, &ostream);
  2466. }
  2467. SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage);
  2468. msg->header.message_length = req_buf.size();
  2469. msg->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
  2470. msg->header.stream_id = _message_stream_id;
  2471. msg->chunk_stream_id = _chunk_stream_id;
  2472. msg->body = req_buf;
  2473. if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg) != 0) {
  2475. << _rtmpsock->remote_side() << '[' << _message_stream_id
  2476. << "]: Fail to send " << info.code() << ": " << error_desc;
  2477. return -1;
  2478. }
  2479. LOG_IF(WARNING, FLAGS_log_error_text)
  2480. << _rtmpsock->remote_side() << '[' << _message_stream_id << "]: Sent "
  2481. << info.code() << ' ' << error_desc;
  2482. return 0;
  2483. }
  2484. // Call this method to send StreamDry to the client.
  2485. // Returns 0 on success, -1 otherwise.
  2486. int RtmpServerStream::SendStreamDry() {
  2487. char data[6];
  2488. char* p = data;
  2489. policy::WriteBigEndian2Bytes(&p, policy::RTMP_USER_CONTROL_EVENT_STREAM_DRY);
  2490. policy::WriteBigEndian4Bytes(&p, stream_id());
  2491. return SendControlMessage(policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data));
  2492. }
  2493. int RtmpServerStream::RunOnFailed(bthread_id_t id, void* data, int) {
  2494. butil::intrusive_ptr<RtmpServerStream> stream(
  2495. static_cast<RtmpServerStream*>(data), false);
  2496. CHECK(stream->_rtmpsock);
  2497. stream->OnStopInternal();
  2498. bthread_id_unlock_and_destroy(id);
  2499. return 0;
  2500. }
  2501. void RtmpServerStream::OnStopInternal() {
  2502. if (_rtmpsock == NULL) {
  2503. return CallOnStop();
  2504. }
  2505. policy::RtmpContext* ctx =
  2506. static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
  2507. if (ctx == NULL) {
  2508. LOG(FATAL) << _rtmpsock->remote_side() << ": RtmpContext of "
  2509. << *_rtmpsock << " is NULL";
  2510. return CallOnStop();
  2511. }
  2512. if (ctx->RemoveMessageStream(this)) {
  2513. return CallOnStop();
  2514. }
  2515. }
  2516. butil::StringPiece RemoveRtmpPrefix(const butil::StringPiece& url_in) {
  2517. if (!url_in.starts_with("rtmp://")) {
  2518. return url_in;
  2519. }
  2520. butil::StringPiece url = url_in;
  2521. size_t i = 7;
  2522. for (; i < url.size() && url[i] == '/'; ++i);
  2523. url.remove_prefix(i);
  2524. return url;
  2525. }
  2526. butil::StringPiece RemoveProtocolPrefix(const butil::StringPiece& url_in) {
  2527. size_t proto_pos = url_in.find("://");
  2528. if (proto_pos == butil::StringPiece::npos) {
  2529. return url_in;
  2530. }
  2531. butil::StringPiece url = url_in;
  2532. size_t i = proto_pos + 3;
  2533. for (; i < url.size() && url[i] == '/'; ++i);
  2534. url.remove_prefix(i);
  2535. return url;
  2536. }
  2537. void ParseRtmpHostAndPort(const butil::StringPiece& host_and_port,
  2538. butil::StringPiece* host,
  2539. butil::StringPiece* port) {
  2540. size_t colon_pos = host_and_port.find(':');
  2541. if (colon_pos == butil::StringPiece::npos) {
  2542. if (host) {
  2543. *host = host_and_port;
  2544. }
  2545. if (port) {
  2546. *port = "1935";
  2547. }
  2548. } else {
  2549. if (host) {
  2550. *host = host_and_port.substr(0, colon_pos);
  2551. }
  2552. if (port) {
  2553. *port = host_and_port.substr(colon_pos + 1);
  2554. }
  2555. }
  2556. }
  2557. butil::StringPiece RemoveQueryStrings(const butil::StringPiece& stream_name_in,
  2558. butil::StringPiece* query_strings) {
  2559. const size_t qm_pos = stream_name_in.find('?');
  2560. if (qm_pos == butil::StringPiece::npos) {
  2561. if (query_strings) {
  2562. query_strings->clear();
  2563. }
  2564. return stream_name_in;
  2565. } else {
  2566. if (query_strings) {
  2567. *query_strings = stream_name_in.substr(qm_pos + 1);
  2568. }
  2569. return stream_name_in.substr(0, qm_pos);
  2570. }
  2571. }
  2572. // Split vhost from *app in forms of "APP?vhost=..." and overwrite *host.
  2573. static void SplitVHostFromApp(const butil::StringPiece& app_and_vhost,
  2574. butil::StringPiece* app,
  2575. butil::StringPiece* vhost) {
  2576. const size_t q_pos = app_and_vhost.find('?');
  2577. if (q_pos == butil::StringPiece::npos) {
  2578. if (app) {
  2579. *app = app_and_vhost;
  2580. }
  2581. if (vhost) {
  2582. vhost->clear();
  2583. }
  2584. return;
  2585. }
  2586. if (app) {
  2587. *app = app_and_vhost.substr(0, q_pos);
  2588. }
  2589. if (vhost) {
  2590. butil::StringPiece qstr = app_and_vhost.substr(q_pos + 1);
  2591. butil::StringSplitter sp(qstr.data(), qstr.data() + qstr.size(), '&');
  2592. for (; sp; ++sp) {
  2593. butil::StringPiece field(sp.field(), sp.length());
  2594. if (field.starts_with("vhost=")) {
  2595. *vhost = field.substr(6);
  2596. // vhost cannot have port.
  2597. const size_t colon_pos = vhost->find_last_of(':');
  2598. if (colon_pos != butil::StringPiece::npos) {
  2599. vhost->remove_suffix(vhost->size() - colon_pos);
  2600. }
  2601. return;
  2602. }
  2603. }
  2604. vhost->clear();
  2605. }
  2606. }
  2607. void ParseRtmpURL(const butil::StringPiece& rtmp_url_in,
  2608. butil::StringPiece* host,
  2609. butil::StringPiece* vhost,
  2610. butil::StringPiece* port,
  2611. butil::StringPiece* app,
  2612. butil::StringPiece* stream_name) {
  2613. if (stream_name) {
  2614. stream_name->clear();
  2615. }
  2616. butil::StringPiece rtmp_url = RemoveRtmpPrefix(rtmp_url_in);
  2617. size_t slash1_pos = rtmp_url.find_first_of('/');
  2618. if (slash1_pos == butil::StringPiece::npos) {
  2619. if (host || port) {
  2620. ParseRtmpHostAndPort(rtmp_url, host, port);
  2621. }
  2622. if (app) {
  2623. app->clear();
  2624. }
  2625. return;
  2626. }
  2627. if (host || port) {
  2628. ParseRtmpHostAndPort(rtmp_url.substr(0, slash1_pos), host, port);
  2629. }
  2630. // Remove duplicated slashes.
  2631. for (++slash1_pos; slash1_pos < rtmp_url.size() &&
  2632. rtmp_url[slash1_pos] == '/'; ++slash1_pos);
  2633. rtmp_url.remove_prefix(slash1_pos);
  2634. size_t slash2_pos = rtmp_url.find_first_of('/');
  2635. if (slash2_pos == butil::StringPiece::npos) {
  2636. return SplitVHostFromApp(rtmp_url, app, vhost);
  2637. }
  2638. SplitVHostFromApp(rtmp_url.substr(0, slash2_pos), app, vhost);
  2639. if (stream_name != NULL) {
  2640. // Remove duplicated slashes.
  2641. for (++slash2_pos; slash2_pos < rtmp_url.size() &&
  2642. rtmp_url[slash2_pos] == '/'; ++slash2_pos);
  2643. rtmp_url.remove_prefix(slash2_pos);
  2644. *stream_name = rtmp_url;
  2645. }
  2646. }
  2647. std::string MakeRtmpURL(const butil::StringPiece& host,
  2648. const butil::StringPiece& port,
  2649. const butil::StringPiece& app,
  2650. const butil::StringPiece& stream_name) {
  2651. std::string result;
  2652. result.reserve(15 + host.size() + app.size() + stream_name.size());
  2653. result.append("rtmp://");
  2654. result.append(host.data(), host.size());
  2655. if (!port.empty()) {
  2656. result.push_back(':');
  2657. result.append(port.data(), port.size());
  2658. }
  2659. if (!app.empty()) {
  2660. result.push_back('/');
  2661. result.append(app.data(), app.size());
  2662. }
  2663. if (!stream_name.empty()) {
  2664. if (app.empty()) { // extra / to notify user that app is empty.
  2665. result.push_back('/');
  2666. }
  2667. result.push_back('/');
  2668. result.append(stream_name.data(), stream_name.size());
  2669. }
  2670. return result;
  2671. }
  2672. } // namespace brpc