rtmp.cpp 98 KB


  1. // Licensed to the Apache Software Foundation (ASF) under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. #include <gflags/gflags.h>
  18. #include <google/protobuf/io/zero_copy_stream_impl_lite.h> // StringOutputStream
  19. #include "bthread/bthread.h" // bthread_id_xx
  20. #include "bthread/unstable.h" // bthread_timer_del
  21. #include "brpc/log.h"
  22. #include "brpc/callback.h" // Closure
  23. #include "brpc/channel.h" // Channel
  24. #include "brpc/socket_map.h" // SocketMap
  25. #include "brpc/socket.h" // Socket
  26. #include "brpc/policy/rtmp_protocol.h" // policy::*
  27. #include "brpc/rtmp.h"
  28. #include "brpc/details/rtmp_utils.h"
  29. namespace brpc {
  30. DEFINE_bool(rtmp_server_close_connection_on_error, true,
  31. "Close the client connection on play/publish errors, clients setting"
  32. " RtmpConnectRequest.stream_multiplexing to true are not affected"
  33. " by this flag");
  34. struct RtmpBvars {
  35. bvar::Adder<int> client_count;
  36. bvar::Adder<int> client_stream_count;
  37. bvar::Adder<int> retrying_client_stream_count;
  38. bvar::Adder<int> server_stream_count;
  39. RtmpBvars()
  40. : client_count("rtmp_client_count")
  41. , client_stream_count("rtmp_client_stream_count")
  42. , retrying_client_stream_count("rtmp_retrying_client_stream_count")
  43. , server_stream_count("rtmp_server_stream_count") {
  44. }
  45. };
  46. inline RtmpBvars* get_rtmp_bvars() {
  47. return butil::get_leaky_singleton<RtmpBvars>();
  48. }
  49. namespace policy {
  50. int SendC0C1(int fd, bool* is_simple_handshake);
  51. int WriteWithoutOvercrowded(Socket*, SocketMessagePtr<>& msg);
  52. }
  53. FlvWriter::FlvWriter(butil::IOBuf* buf)
  54. : _write_header(false), _buf(buf), _options() {
  55. }
  56. FlvWriter::FlvWriter(butil::IOBuf* buf, const FlvWriterOptions& options)
  57. : _write_header(false), _buf(buf), _options(options) {
  58. }
  59. butil::Status FlvWriter::Write(const RtmpVideoMessage& msg) {
  60. char buf[32];
  61. char* p = buf;
  62. if (!_write_header) {
  63. _write_header = true;
  64. const char flags_bit = static_cast<char>(_options.flv_content_type);
  65. const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
  66. memcpy(p, header, sizeof(header));
  67. p += sizeof(header);
  68. policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
  69. }
  70. // FLV tag
  71. *p++ = FLV_TAG_VIDEO;
  72. policy::WriteBigEndian3Bytes(&p, msg.size());
  73. policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF));
  74. *p++ = (msg.timestamp >> 24) & 0xFF;
  75. policy::WriteBigEndian3Bytes(&p, 0); // StreamID
  76. // header of VIDEODATA
  77. *p++ = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF);
  78. _buf->append(buf, p - buf);
  79. _buf->append(msg.data);
  80. // PreviousTagSize
  81. p = buf;
  82. policy::WriteBigEndian4Bytes(&p, 11 + msg.size());
  83. _buf->append(buf, p - buf);
  84. return butil::Status::OK();
  85. }
  86. butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) {
  87. char buf[32];
  88. char* p = buf;
  89. if (!_write_header) {
  90. _write_header = true;
  91. const char flags_bit = static_cast<char>(_options.flv_content_type);
  92. const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
  93. memcpy(p, header, sizeof(header));
  94. p += sizeof(header);
  95. policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
  96. }
  97. // FLV tag
  98. *p++ = FLV_TAG_AUDIO;
  99. policy::WriteBigEndian3Bytes(&p, msg.size());
  100. policy::WriteBigEndian3Bytes(&p, (msg.timestamp & 0xFFFFFF));
  101. *p++ = (msg.timestamp >> 24) & 0xFF;
  102. policy::WriteBigEndian3Bytes(&p, 0); // StreamID
  103. // header of AUDIODATA
  104. *p++ = ((msg.codec & 0xF) << 4)
  105. | ((msg.rate & 0x3) << 2)
  106. | ((msg.bits & 0x1) << 1)
  107. | (msg.type & 0x1);
  108. _buf->append(buf, p - buf);
  109. _buf->append(msg.data);
  110. // PreviousTagSize
  111. p = buf;
  112. policy::WriteBigEndian4Bytes(&p, 11 + msg.size());
  113. _buf->append(buf, p - buf);
  114. return butil::Status::OK();
  115. }
  116. butil::Status FlvWriter::WriteScriptData(const butil::IOBuf& req_buf, uint32_t timestamp) {
  117. char buf[32];
  118. char* p = buf;
  119. if (!_write_header) {
  120. _write_header = true;
  121. const char flags_bit = static_cast<char>(_options.flv_content_type);
  122. const char header[9] = { 'F', 'L', 'V', 0x01, flags_bit, 0, 0, 0, 0x09 };
  123. memcpy(p, header, sizeof(header));
  124. p += sizeof(header);
  125. policy::WriteBigEndian4Bytes(&p, 0); // PreviousTagSize0
  126. }
  127. // FLV tag
  128. *p++ = FLV_TAG_SCRIPT_DATA;
  129. policy::WriteBigEndian3Bytes(&p, req_buf.size());
  130. policy::WriteBigEndian3Bytes(&p, (timestamp & 0xFFFFFF));
  131. *p++ = (timestamp >> 24) & 0xFF;
  132. policy::WriteBigEndian3Bytes(&p, 0); // StreamID
  133. _buf->append(buf, p - buf);
  134. _buf->append(req_buf);
  135. // PreviousTagSize
  136. p = buf;
  137. policy::WriteBigEndian4Bytes(&p, 11 + req_buf.size());
  138. _buf->append(buf, p - buf);
  139. return butil::Status::OK();
  140. }
  141. butil::Status FlvWriter::Write(const RtmpCuePoint& cuepoint) {
  142. butil::IOBuf req_buf;
  143. {
  144. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  145. AMFOutputStream ostream(&zc_stream);
  146. WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
  147. WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
  148. WriteAMFObject(cuepoint.data, &ostream);
  149. if (!ostream.good()) {
  150. return butil::Status(EINVAL, "Fail to serialize cuepoint");
  151. }
  152. }
  153. return WriteScriptData(req_buf, cuepoint.timestamp);
  154. }
  155. butil::Status FlvWriter::Write(const RtmpMetaData& metadata) {
  156. butil::IOBuf req_buf;
  157. {
  158. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  159. AMFOutputStream ostream(&zc_stream);
  160. WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream);
  161. WriteAMFObject(metadata.data, &ostream);
  162. if (!ostream.good()) {
  163. return butil::Status(EINVAL, "Fail to serialize metadata");
  164. }
  165. }
  166. return WriteScriptData(req_buf, metadata.timestamp);
  167. }
  168. FlvReader::FlvReader(butil::IOBuf* buf)
  169. : _read_header(false), _buf(buf) {
  170. }
  171. static char g_flv_header[9] = { 'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 0x09 };
  172. butil::Status FlvReader::ReadHeader() {
  173. if (!_read_header) {
  174. char header_buf[sizeof(g_flv_header) + 4/* PreviousTagSize0 */];
  175. const char* p = (const char*)_buf->fetch(header_buf, sizeof(header_buf));
  176. if (p == NULL) {
  177. return butil::Status(EAGAIN, "Fail to read, not enough data");
  178. }
  179. if (memcmp(p, g_flv_header, 3) != 0) {
  180. LOG(FATAL) << "Fail to parse FLV header";
  181. return butil::Status(EINVAL, "Fail to parse FLV header");
  182. }
  183. _buf->pop_front(sizeof(header_buf));
  184. _read_header = true;
  185. }
  186. return butil::Status::OK();
  187. }
  188. butil::Status FlvReader::PeekMessageType(FlvTagType* type_out) {
  189. butil::Status st = ReadHeader();
  190. if (!st.ok()) {
  191. return st;
  192. }
  193. const char* p = (const char*)_buf->fetch1();
  194. if (p == NULL) {
  195. return butil::Status(EAGAIN, "Fail to read, not enough data");
  196. }
  197. FlvTagType type = (FlvTagType)*p;
  198. if (type != FLV_TAG_AUDIO && type != FLV_TAG_VIDEO &&
  199. type != FLV_TAG_SCRIPT_DATA) {
  200. return butil::Status(EINVAL, "Fail to parse FLV tag");
  201. }
  202. if (type_out) {
  203. *type_out = type;
  204. }
  205. return butil::Status::OK();
  206. }
  207. butil::Status FlvReader::Read(RtmpVideoMessage* msg) {
  208. char tags[11];
  209. const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
  210. if (p == NULL) {
  211. return butil::Status(EAGAIN, "Fail to read, not enough data");
  212. }
  213. if (*p != FLV_TAG_VIDEO) {
  214. return butil::Status(EINVAL, "Fail to parse RtmpVideoMessage");
  215. }
  216. uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
  217. uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
  218. timestamp |= (*(p + 7) << 24);
  219. if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
  220. return butil::Status(EAGAIN, "Fail to read, not enough data");
  221. }
  222. _buf->pop_front(11);
  223. char first_byte = 0;
  224. CHECK(_buf->cut1(&first_byte));
  225. msg->timestamp = timestamp;
  226. msg->frame_type = (FlvVideoFrameType)((first_byte >> 4) & 0xF);
  227. msg->codec = (FlvVideoCodec)(first_byte & 0xF);
  228. // TODO(zhujiashun): check the validation of frame_type and codec
  229. _buf->cutn(&msg->data, msg_size - 1);
  230. _buf->pop_front(4/* PreviousTagSize0 */);
  231. return butil::Status::OK();
  232. }
  233. butil::Status FlvReader::Read(RtmpAudioMessage* msg) {
  234. char tags[11];
  235. const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
  236. if (p == NULL) {
  237. return butil::Status(EAGAIN, "Fail to read, not enough data");
  238. }
  239. if (*p != FLV_TAG_AUDIO) {
  240. return butil::Status(EINVAL, "Fail to parse RtmpAudioMessage");
  241. }
  242. uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
  243. uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
  244. timestamp |= (*(p + 7) << 24);
  245. if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
  246. return butil::Status(EAGAIN, "Fail to read, not enough data");
  247. }
  248. _buf->pop_front(11);
  249. char first_byte = 0;
  250. CHECK(_buf->cut1(&first_byte));
  251. msg->timestamp = timestamp;
  252. msg->codec = (FlvAudioCodec)((first_byte >> 4) & 0xF);
  253. msg->rate = (FlvSoundRate)((first_byte >> 2) & 0x3);
  254. msg->bits = (FlvSoundBits)((first_byte >> 1) & 0x1);
  255. msg->type = (FlvSoundType)(first_byte & 0x1);
  256. _buf->cutn(&msg->data, msg_size - 1);
  257. _buf->pop_front(4/* PreviousTagSize0 */);
  258. return butil::Status::OK();
  259. }
  260. butil::Status FlvReader::Read(RtmpMetaData* msg, std::string* name) {
  261. char tags[11];
  262. const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
  263. if (p == NULL) {
  264. return butil::Status(EAGAIN, "Fail to read, not enough data");
  265. }
  266. if (*p != FLV_TAG_SCRIPT_DATA) {
  267. return butil::Status(EINVAL, "Fail to parse RtmpScriptMessage");
  268. }
  269. uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
  270. uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
  271. timestamp |= (*(p + 7) << 24);
  272. if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
  273. return butil::Status(EAGAIN, "Fail to read, not enough data");
  274. }
  275. _buf->pop_front(11);
  276. butil::IOBuf req_buf;
  277. _buf->cutn(&req_buf, msg_size);
  278. _buf->pop_front(4/* PreviousTagSize0 */);
  279. {
  280. butil::IOBufAsZeroCopyInputStream zc_stream(req_buf);
  281. AMFInputStream istream(&zc_stream);
  282. if (!ReadAMFString(name, &istream)) {
  283. return butil::Status(EINVAL, "Fail to read AMF string");
  284. }
  285. if (!ReadAMFObject(&msg->data, &istream)) {
  286. return butil::Status(EINVAL, "Fail to read AMF object");
  287. }
  288. }
  289. msg->timestamp = timestamp;
  290. return butil::Status::OK();
  291. }
  292. const char* FlvVideoFrameType2Str(FlvVideoFrameType t) {
  293. switch (t) {
  294. case FLV_VIDEO_FRAME_KEYFRAME: return "keyframe";
  295. case FLV_VIDEO_FRAME_INTERFRAME: return "interframe";
  296. case FLV_VIDEO_FRAME_DISPOSABLE_INTERFRAME: return "disposable interframe";
  297. case FLV_VIDEO_FRAME_GENERATED_KEYFRAME: return "generated keyframe";
  298. case FLV_VIDEO_FRAME_INFOFRAME: return "info/command frame";
  299. }
  300. return "Unknown FlvVideoFrameType";
  301. }
  302. const char* FlvVideoCodec2Str(FlvVideoCodec id) {
  303. switch (id) {
  304. case FLV_VIDEO_JPEG: return "JPEG";
  305. case FLV_VIDEO_SORENSON_H263: return "Sorenson H.263";
  306. case FLV_VIDEO_SCREEN_VIDEO: return "Screen video";
  307. case FLV_VIDEO_ON2_VP6: return "On2 VP6";
  308. case FLV_VIDEO_ON2_VP6_WITH_ALPHA_CHANNEL:
  309. return "On2 VP6 with alpha channel";
  310. case FLV_VIDEO_SCREEN_VIDEO_V2: return "Screen video version 2";
  311. case FLV_VIDEO_AVC: return "AVC";
  312. case FLV_VIDEO_HEVC: return "H.265";
  313. }
  314. return "Unknown FlvVideoCodec";
  315. }
  316. const char* FlvAudioCodec2Str(FlvAudioCodec codec) {
  317. switch (codec) {
  318. case FLV_AUDIO_LINEAR_PCM_PLATFORM_ENDIAN:
  319. return "Linear PCM, platform endian";
  320. case FLV_AUDIO_ADPCM: return "ADPCM";
  321. case FLV_AUDIO_MP3: return "MP3";
  322. case FLV_AUDIO_LINEAR_PCM_LITTLE_ENDIAN:
  323. return "Linear PCM, little endian";
  324. case FLV_AUDIO_NELLYMOSER_16KHZ_MONO:
  325. return "Nellymoser 16-kHz mono";
  326. case FLV_AUDIO_NELLYMOSER_8KHZ_MONO:
  327. return "Nellymoser 8-kHz mono";
  328. case FLV_AUDIO_NELLYMOSER:
  329. return "Nellymoser";
  330. case FLV_AUDIO_G711_ALAW_LOGARITHMIC_PCM:
  331. return "G.711 A-law logarithmic PCM";
  332. case FLV_AUDIO_G711_MULAW_LOGARITHMIC_PCM:
  333. return "G.711 mu-law logarithmic PCM";
  334. case FLV_AUDIO_RESERVED:
  335. return "reserved";
  336. case FLV_AUDIO_AAC: return "AAC";
  337. case FLV_AUDIO_SPEEX: return "Speex";
  338. case FLV_AUDIO_MP3_8KHZ: return "MP3 8-Khz";
  339. case FLV_AUDIO_DEVICE_SPECIFIC_SOUND:
  340. return "Device-specific sound";
  341. }
  342. return "Unknown FlvAudioCodec";
  343. }
  344. const char* FlvSoundRate2Str(FlvSoundRate rate) {
  345. switch (rate) {
  346. case FLV_SOUND_RATE_5512HZ: return "5512";
  347. case FLV_SOUND_RATE_11025HZ: return "11025";
  348. case FLV_SOUND_RATE_22050HZ: return "22050";
  349. case FLV_SOUND_RATE_44100HZ: return "44100";
  350. }
  351. return "Unknown FlvSoundRate";
  352. }
  353. const char* FlvSoundBits2Str(FlvSoundBits size) {
  354. switch (size) {
  355. case FLV_SOUND_8BIT: return "8";
  356. case FLV_SOUND_16BIT: return "16";
  357. }
  358. return "Unknown FlvSoundBits";
  359. }
  360. const char* FlvSoundType2Str(FlvSoundType t) {
  361. switch (t) {
  362. case FLV_SOUND_MONO: return "mono";
  363. case FLV_SOUND_STEREO: return "stereo";
  364. }
  365. return "Unknown FlvSoundType";
  366. }
  367. std::ostream& operator<<(std::ostream& os, const RtmpAudioMessage& msg) {
  368. return os << "AudioMessage{timestamp=" << msg.timestamp
  369. << " codec=" << FlvAudioCodec2Str(msg.codec)
  370. << " rate=" << FlvSoundRate2Str(msg.rate)
  371. << " bits=" << FlvSoundBits2Str(msg.bits)
  372. << " type=" << FlvSoundType2Str(msg.type)
  373. << " data=" << butil::ToPrintable(msg.data) << '}';
  374. }
  375. std::ostream& operator<<(std::ostream& os, const RtmpVideoMessage& msg) {
  376. return os << "VideoMessage{timestamp=" << msg.timestamp
  377. << " type=" << FlvVideoFrameType2Str(msg.frame_type)
  378. << " codec=" << FlvVideoCodec2Str(msg.codec)
  379. << " data=" << butil::ToPrintable(msg.data) << '}';
  380. }
  381. butil::Status RtmpAACMessage::Create(const RtmpAudioMessage& msg) {
  382. if (msg.codec != FLV_AUDIO_AAC) {
  383. return butil::Status(EINVAL, "codec=%s is not AAC",
  384. FlvAudioCodec2Str(msg.codec));
  385. }
  386. const uint8_t* p = (const uint8_t*)msg.data.fetch1();
  387. if (p == NULL) {
  388. return butil::Status(EINVAL, "Not enough data in AudioMessage");
  389. }
  390. if (*p > FLV_AAC_PACKET_RAW) {
  391. return butil::Status(EINVAL, "Invalid AAC packet_type=%d", (int)*p);
  392. }
  393. this->timestamp = msg.timestamp;
  394. this->rate = msg.rate;
  395. this->bits = msg.bits;
  396. this->type = msg.type;
  397. this->packet_type = (FlvAACPacketType)*p;
  398. msg.data.append_to(&data, msg.data.size() - 1, 1);
  399. return butil::Status::OK();
  400. }
  401. AudioSpecificConfig::AudioSpecificConfig()
  402. : aac_object(AAC_OBJECT_UNKNOWN)
  403. , aac_sample_rate(0)
  404. , aac_channels(0) {
  405. }
  406. butil::Status AudioSpecificConfig::Create(const butil::IOBuf& buf) {
  407. if (buf.size() < 2u) {
  408. return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short",
  409. (uint64_t)buf.size());
  410. }
  411. char tmpbuf[2];
  412. buf.copy_to(tmpbuf, arraysize(tmpbuf));
  413. return Create(tmpbuf, arraysize(tmpbuf));
  414. }
  415. butil::Status AudioSpecificConfig::Create(const void* data, size_t len) {
  416. if (len < 2u) {
  417. return butil::Status(EINVAL, "data_size=%" PRIu64 " is too short", (uint64_t)len);
  418. }
  419. uint8_t profile_ObjectType = ((const char*)data)[0];
  420. uint8_t samplingFrequencyIndex = ((const char*)data)[1];
  421. aac_channels = (samplingFrequencyIndex >> 3) & 0x0f;
  422. aac_sample_rate = ((profile_ObjectType << 1) & 0x0e) | ((samplingFrequencyIndex >> 7) & 0x01);
  423. aac_object = (AACObjectType)((profile_ObjectType >> 3) & 0x1f);
  424. if (aac_object == AAC_OBJECT_UNKNOWN) {
  425. return butil::Status(EINVAL, "Invalid object type");
  426. }
  427. return butil::Status::OK();
  428. }
  429. bool RtmpAudioMessage::IsAACSequenceHeader() const {
  430. if (codec != FLV_AUDIO_AAC) {
  431. return false;
  432. }
  433. const uint8_t* p = (const uint8_t*)data.fetch1();
  434. if (p == NULL) {
  435. return false;
  436. }
  437. return *p == FLV_AAC_PACKET_SEQUENCE_HEADER;
  438. }
  439. butil::Status RtmpAVCMessage::Create(const RtmpVideoMessage& msg) {
  440. if (msg.codec != FLV_VIDEO_AVC) {
  441. return butil::Status(EINVAL, "codec=%s is not AVC",
  442. FlvVideoCodec2Str(msg.codec));
  443. }
  444. uint8_t buf[4];
  445. const uint8_t* p = (const uint8_t*)msg.data.fetch(buf, sizeof(buf));
  446. if (p == NULL) {
  447. return butil::Status(EINVAL, "Not enough data in VideoMessage");
  448. }
  449. if (*p > FLV_AVC_PACKET_END_OF_SEQUENCE) {
  450. return butil::Status(EINVAL, "Invalid AVC packet_type=%d", (int)*p);
  451. }
  452. this->timestamp = msg.timestamp;
  453. this->frame_type = msg.frame_type;
  454. this->packet_type = (FlvAVCPacketType)*p;
  455. this->composition_time = policy::ReadBigEndian3Bytes(p + 1);
  456. msg.data.append_to(&data, msg.data.size() - 4, 4);
  457. return butil::Status::OK();
  458. }
  459. bool RtmpVideoMessage::IsAVCSequenceHeader() const {
  460. if (codec != FLV_VIDEO_AVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) {
  461. return false;
  462. }
  463. const uint8_t* p = (const uint8_t*)data.fetch1();
  464. if (p == NULL) {
  465. return false;
  466. }
  467. return *p == FLV_AVC_PACKET_SEQUENCE_HEADER;
  468. }
  469. bool RtmpVideoMessage::IsHEVCSequenceHeader() const {
  470. if (codec != FLV_VIDEO_HEVC || frame_type != FLV_VIDEO_FRAME_KEYFRAME) {
  471. return false;
  472. }
  473. const uint8_t* p = (const uint8_t*)data.fetch1();
  474. if (p == NULL) {
  475. return false;
  476. }
  477. return *p == FLV_AVC_PACKET_SEQUENCE_HEADER;
  478. }
  479. const char* AVCProfile2Str(AVCProfile p) {
  480. switch (p) {
  481. case AVC_PROFILE_BASELINE: return "Baseline";
  482. case AVC_PROFILE_CONSTRAINED_BASELINE: return "ConstrainedBaseline";
  483. case AVC_PROFILE_MAIN: return "Main";
  484. case AVC_PROFILE_EXTENDED: return "Extended";
  485. case AVC_PROFILE_HIGH: return "High";
  486. case AVC_PROFILE_HIGH10: return "High10";
  487. case AVC_PROFILE_HIGH10_INTRA: return "High10Intra";
  488. case AVC_PROFILE_HIGH422: return "High422";
  489. case AVC_PROFILE_HIGH422_INTRA: return "High422Intra";
  490. case AVC_PROFILE_HIGH444: return "High444";
  491. case AVC_PROFILE_HIGH444_PREDICTIVE: return "High444Predictive";
  492. case AVC_PROFILE_HIGH444_INTRA: return "High444Intra";
  493. }
  494. return "Unknown";
  495. }
  496. AVCDecoderConfigurationRecord::AVCDecoderConfigurationRecord()
  497. : width(0)
  498. , height(0)
  499. , avc_profile((AVCProfile)0)
  500. , avc_level((AVCLevel)0)
  501. , length_size_minus1(-1) {
  502. }
  503. std::ostream& operator<<(std::ostream& os,
  504. const AVCDecoderConfigurationRecord& r) {
  505. os << "{profile=" << AVCProfile2Str(r.avc_profile)
  506. << " level=" << (int)r.avc_level
  507. << " length_size_minus1=" << (int)r.length_size_minus1
  508. << " width=" << r.width
  509. << " height=" << r.height
  510. << " sps=[";
  511. for (size_t i = 0; i < r.sps_list.size(); ++i) {
  512. if (i) {
  513. os << ' ';
  514. }
  515. os << r.sps_list[i].size();
  516. }
  517. os << "] pps=[";
  518. for (size_t i = 0; i < r.pps_list.size(); ++i) {
  519. if (i) {
  520. os << ' ';
  521. }
  522. os << r.pps_list[i].size();
  523. }
  524. os << "]}";
  525. return os;
  526. }
  527. butil::Status AVCDecoderConfigurationRecord::Create(const butil::IOBuf& buf) {
  528. // the buf should be short generally, copy it out to continuous memory
  529. // to simplify parsing.
  530. DEFINE_SMALL_ARRAY(char, cont_buf, buf.size(), 64);
  531. buf.copy_to(cont_buf, buf.size());
  532. return Create(cont_buf, buf.size());
  533. }
  534. butil::Status AVCDecoderConfigurationRecord::Create(const void* data, size_t len) {
  535. butil::StringPiece buf((const char*)data, len);
  536. if (buf.size() < 6) {
  537. return butil::Status(EINVAL, "Length=%lu is not long enough",
  538. (unsigned long)buf.size());
  539. }
  540. // skip configurationVersion at buf[0]
  541. avc_profile = (AVCProfile)buf[1];
  542. // skip profile_compatibility at buf[2]
  543. avc_level = (AVCLevel)buf[3];
  544. // 5.3.4.2.1 Syntax, H.264-AVC-ISO_IEC_14496-15.pdf, page 16
  545. // 5.2.4.1 AVC decoder configuration record
  546. // 5.2.4.1.2 Semantics
  547. // The value of this field shall be one of 0, 1, or 3 corresponding to a
  548. // length encoded with 1, 2, or 4 bytes, respectively.
  549. length_size_minus1 = buf[4] & 0x03;
  550. if (length_size_minus1 == 2) {
  551. return butil::Status(EINVAL, "lengthSizeMinusOne should never be 2");
  552. }
  553. // Parsing SPS
  554. const int num_sps = (int)(buf[5] & 0x1f);
  555. buf.remove_prefix(6);
  556. sps_list.clear();
  557. sps_list.reserve(num_sps);
  558. for (int i = 0; i < num_sps; ++i) {
  559. if (buf.size() < 2) {
  560. return butil::Status(EINVAL, "Not enough data to decode SPS-length");
  561. }
  562. const uint16_t sps_length = policy::ReadBigEndian2Bytes(buf.data());
  563. if (buf.size() < 2u + sps_length) {
  564. return butil::Status(EINVAL, "Not enough data to decode SPS");
  565. }
  566. if (sps_length > 0) {
  567. butil::Status st = ParseSPS(buf.data() + 2, sps_length);
  568. if (!st.ok()) {
  569. return st;
  570. }
  571. sps_list.push_back(buf.substr(2, sps_length).as_string());
  572. }
  573. buf.remove_prefix(2 + sps_length);
  574. }
  575. // Parsing PPS
  576. pps_list.clear();
  577. if (buf.empty()) {
  578. return butil::Status(EINVAL, "Not enough data to decode PPS");
  579. }
  580. const int num_pps = (int)buf[0];
  581. buf.remove_prefix(1);
  582. for (int i = 0; i < num_pps; ++i) {
  583. if (buf.size() < 2) {
  584. return butil::Status(EINVAL, "Not enough data to decode PPS-length");
  585. }
  586. const uint16_t pps_length = policy::ReadBigEndian2Bytes(buf.data());
  587. if (buf.size() < 2u + pps_length) {
  588. return butil::Status(EINVAL, "Not enough data to decode PPS");
  589. }
  590. if (pps_length > 0) {
  591. pps_list.push_back(buf.substr(2, pps_length).as_string());
  592. }
  593. buf.remove_prefix(2 + pps_length);
  594. }
  595. return butil::Status::OK();
  596. }
  597. butil::Status AVCDecoderConfigurationRecord::ParseSPS(
  598. const butil::StringPiece& buf, size_t sps_length) {
  599. // for NALU, 7.3.1 NAL unit syntax
  600. // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61.
  601. if (buf.empty()) {
  602. return butil::Status(EINVAL, "SPS is empty");
  603. }
  604. const int8_t nutv = buf[0];
  605. const int8_t forbidden_zero_bit = (nutv >> 7) & 0x01;
  606. if (forbidden_zero_bit) {
  607. return butil::Status(EINVAL, "forbidden_zero_bit shall equal 0");
  608. }
  609. // nal_ref_idc not equal to 0 specifies that the content of the NAL unit
  610. // contains:
  611. // a sequence parameter set
  612. // or a picture parameter set
  613. // or a slice of a reference picture
  614. // or a slice data partition of a reference picture.
  615. int8_t nal_ref_idc = (nutv >> 5) & 0x03;
  616. if (!nal_ref_idc) {
  617. return butil::Status(EINVAL, "nal_ref_idc is 0");
  618. }
  619. // 7.4.1 NAL unit semantics
  620. // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 61.
  621. // nal_unit_type specifies the type of RBSP data structure contained in
  622. // the NAL unit as specified in Table 7-1.
  623. const AVCNaluType nal_unit_type = (AVCNaluType)(nutv & 0x1f);
  624. if (nal_unit_type != AVC_NALU_SPS) {
  625. return butil::Status(EINVAL, "nal_unit_type is not %d", (int)AVC_NALU_SPS);
  626. }
  627. // Extract the rbsp from sps.
  628. DEFINE_SMALL_ARRAY(char, rbsp, sps_length - 1, 64);
  629. buf.copy(rbsp, sps_length - 1, 1);
  630. size_t rbsp_len = 0;
  631. for (size_t i = 1; i < sps_length; ++i) {
  632. // XX 00 00 03 XX, the 03 byte should be dropped.
  633. if (!(i >= 3 && buf[i - 2] == 0 && buf[i - 1] == 0 && buf[i] == 3)) {
  634. rbsp[rbsp_len++] = buf[i];
  635. }
  636. }
  637. // for SPS, 7.3.2.1.1 Sequence parameter set data syntax
  638. // H.264-AVC-ISO_IEC_14496-10-2012.pdf, page 62.
  639. if (rbsp_len < 3) {
  640. return butil::Status(EINVAL, "rbsp must be at least 3 bytes");
  641. }
  642. // Decode rbsp.
  643. const char* p = rbsp;
  644. uint8_t profile_idc = *p++;
  645. if (!profile_idc) {
  646. return butil::Status(EINVAL, "profile_idc is 0");
  647. }
  648. int8_t flags = *p++;
  649. if (flags & 0x03) {
  650. return butil::Status(EINVAL, "Invalid flags=%d", (int)flags);
  651. }
  652. uint8_t level_idc = *p++;
  653. if (!level_idc) {
  654. return butil::Status(EINVAL, "level_idc is 0");
  655. }
  656. BitStream bs(p, rbsp + rbsp_len - p);
  657. int32_t seq_parameter_set_id = -1;
  658. if (avc_nalu_read_uev(&bs, &seq_parameter_set_id) != 0) {
  659. return butil::Status(EINVAL, "Fail to read seq_parameter_set_id");
  660. }
  661. if (seq_parameter_set_id < 0) {
  662. return butil::Status(EINVAL, "Invalid seq_parameter_set_id=%d",
  663. (int)seq_parameter_set_id);
  664. }
  665. int32_t chroma_format_idc = -1;
  666. if (profile_idc == 100 || profile_idc == 110 || profile_idc == 122 ||
  667. profile_idc == 244 || profile_idc == 44 || profile_idc == 83 ||
  668. profile_idc == 86 || profile_idc == 118 || profile_idc == 128) {
  669. if (avc_nalu_read_uev(&bs, &chroma_format_idc) != 0) {
  670. return butil::Status(EINVAL, "Fail to read chroma_format_idc");
  671. }
  672. if (chroma_format_idc == 3) {
  673. int8_t separate_colour_plane_flag = -1;
  674. if (avc_nalu_read_bit(&bs, &separate_colour_plane_flag) != 0) {
  675. return butil::Status(EINVAL, "Fail to read separate_colour_plane_flag");
  676. }
  677. }
  678. int32_t bit_depth_luma_minus8 = -1;
  679. if (avc_nalu_read_uev(&bs, &bit_depth_luma_minus8) != 0) {
  680. return butil::Status(EINVAL, "Fail to read bit_depth_luma_minus8");
  681. }
  682. int32_t bit_depth_chroma_minus8 = -1;
  683. if (avc_nalu_read_uev(&bs, &bit_depth_chroma_minus8) != 0) {
  684. return butil::Status(EINVAL, "Fail to read bit_depth_chroma_minus8");
  685. }
  686. int8_t qpprime_y_zero_transform_bypass_flag = -1;
  687. if (avc_nalu_read_bit(&bs, &qpprime_y_zero_transform_bypass_flag) != 0) {
  688. return butil::Status(EINVAL, "Fail to read qpprime_y_zero_transform_bypass_flag");
  689. }
  690. int8_t seq_scaling_matrix_present_flag = -1;
  691. if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag) != 0) {
  692. return butil::Status(EINVAL, "Fail to read seq_scaling_matrix_present_flag");
  693. }
  694. if (seq_scaling_matrix_present_flag) {
  695. int nb_scmpfs = (chroma_format_idc != 3 ? 8 : 12);
  696. for (int i = 0; i < nb_scmpfs; i++) {
  697. int8_t seq_scaling_matrix_present_flag_i = -1;
  698. if (avc_nalu_read_bit(&bs, &seq_scaling_matrix_present_flag_i)) {
  699. return butil::Status(EINVAL, "Fail to read seq_scaling_"
  700. "matrix_present_flag[%d]", i);
  701. }
  702. if (seq_scaling_matrix_present_flag_i) {
  703. return butil::Status(EINVAL, "Invalid seq_scaling_matrix_"
  704. "present_flag[%d]=%d nb_scmpfs=%d",
  705. i, (int)seq_scaling_matrix_present_flag_i,
  706. nb_scmpfs);
  707. }
  708. }
  709. }
  710. }
  711. int32_t log2_max_frame_num_minus4 = -1;
  712. if (avc_nalu_read_uev(&bs, &log2_max_frame_num_minus4) != 0) {
  713. return butil::Status(EINVAL, "Fail to read log2_max_frame_num_minus4");
  714. }
  715. int32_t pic_order_cnt_type = -1;
  716. if (avc_nalu_read_uev(&bs, &pic_order_cnt_type) != 0) {
  717. return butil::Status(EINVAL, "Fail to read pic_order_cnt_type");
  718. }
  719. if (pic_order_cnt_type == 0) {
  720. int32_t log2_max_pic_order_cnt_lsb_minus4 = -1;
  721. if (avc_nalu_read_uev(&bs, &log2_max_pic_order_cnt_lsb_minus4) != 0) {
  722. return butil::Status(EINVAL, "Fail to read log2_max_pic_order_cnt_lsb_minus4");
  723. }
  724. } else if (pic_order_cnt_type == 1) {
  725. int8_t delta_pic_order_always_zero_flag = -1;
  726. if (avc_nalu_read_bit(&bs, &delta_pic_order_always_zero_flag) != 0) {
  727. return butil::Status(EINVAL, "Fail to read delta_pic_order_always_zero_flag");
  728. }
  729. int32_t offset_for_non_ref_pic = -1;
  730. if (avc_nalu_read_uev(&bs, &offset_for_non_ref_pic) != 0) {
  731. return butil::Status(EINVAL, "Fail to read offset_for_non_ref_pic");
  732. }
  733. int32_t offset_for_top_to_bottom_field = -1;
  734. if (avc_nalu_read_uev(&bs, &offset_for_top_to_bottom_field) != 0) {
  735. return butil::Status(EINVAL, "Fail to read offset_for_top_to_bottom_field");
  736. }
  737. int32_t num_ref_frames_in_pic_order_cnt_cycle = -1;
  738. if (avc_nalu_read_uev(&bs, &num_ref_frames_in_pic_order_cnt_cycle) != 0) {
  739. return butil::Status(EINVAL, "Fail to read num_ref_frames_in_pic_order_cnt_cycle");
  740. }
  741. if (num_ref_frames_in_pic_order_cnt_cycle) {
  742. return butil::Status(EINVAL, "Invalid num_ref_frames_in_pic_order_cnt_cycle=%d",
  743. num_ref_frames_in_pic_order_cnt_cycle);
  744. }
  745. }
  746. int32_t max_num_ref_frames = -1;
  747. if (avc_nalu_read_uev(&bs, &max_num_ref_frames) != 0) {
  748. return butil::Status(EINVAL, "Fail to read max_num_ref_frames");
  749. }
  750. int8_t gaps_in_frame_num_value_allowed_flag = -1;
  751. if (avc_nalu_read_bit(&bs, &gaps_in_frame_num_value_allowed_flag) != 0) {
  752. return butil::Status(EINVAL, "Fail to read gaps_in_frame_num_value_allowed_flag");
  753. }
  754. int32_t pic_width_in_mbs_minus1 = -1;
  755. if (avc_nalu_read_uev(&bs, &pic_width_in_mbs_minus1) != 0) {
  756. return butil::Status(EINVAL, "Fail to read pic_width_in_mbs_minus1");
  757. }
  758. int32_t pic_height_in_map_units_minus1 = -1;
  759. if (avc_nalu_read_uev(&bs, &pic_height_in_map_units_minus1) != 0) {
  760. return butil::Status(EINVAL, "Fail to read pic_height_in_map_units_minus1");
  761. }
  762. width = (int)(pic_width_in_mbs_minus1 + 1) * 16;
  763. height = (int)(pic_height_in_map_units_minus1 + 1) * 16;
  764. return butil::Status::OK();
  765. }
  766. static bool find_avc_annexb_nalu_start_code(const butil::IOBuf& buf,
  767. size_t* start_code_length) {
  768. size_t consecutive_zero_count = 0;
  769. for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) {
  770. char c = *it;
  771. if (c == 0) {
  772. ++consecutive_zero_count;
  773. } else if (c == 1) {
  774. if (consecutive_zero_count >= 2) {
  775. if (start_code_length) {
  776. *start_code_length = consecutive_zero_count + 1;
  777. }
  778. return true;
  779. }
  780. return false;
  781. } else {
  782. return false;
  783. }
  784. }
  785. return false;
  786. }
  787. static void find_avc_annexb_nalu_stop_code(const butil::IOBuf& buf,
  788. size_t* nalu_length_out,
  789. size_t* stop_code_length) {
  790. size_t nalu_length = 0;
  791. size_t consecutive_zero_count = 0;
  792. for (butil::IOBufBytesIterator it(buf); it != NULL; ++it) {
  793. unsigned char c = (unsigned char)*it;
  794. if (c > 1) { // most frequent
  795. ++nalu_length;
  796. consecutive_zero_count = 0;
  797. continue;
  798. }
  799. if (c == 0) {
  800. ++consecutive_zero_count;
  801. } else { // c == 1
  802. if (consecutive_zero_count >= 2) {
  803. if (nalu_length_out) {
  804. *nalu_length_out = nalu_length;
  805. }
  806. if (stop_code_length) {
  807. *stop_code_length = consecutive_zero_count + 1;
  808. }
  809. return;
  810. }
  811. ++nalu_length;
  812. consecutive_zero_count = 0;
  813. }
  814. }
  815. if (nalu_length_out) {
  816. *nalu_length_out = nalu_length + consecutive_zero_count;
  817. }
  818. if (stop_code_length) {
  819. *stop_code_length = 0;
  820. }
  821. }
  822. AVCNaluIterator::AVCNaluIterator(butil::IOBuf* data, uint32_t length_size_minus1,
  823. AVCNaluFormat* format)
  824. : _data(data)
  825. , _format(format)
  826. , _length_size_minus1(length_size_minus1)
  827. , _nalu_type(AVC_NALU_EMPTY) {
  828. if (_data) {
  829. ++*this;
  830. }
  831. }
  832. AVCNaluIterator::~AVCNaluIterator() {
  833. }
  834. void AVCNaluIterator::operator++() {
  835. if (*_format == AVC_NALU_FORMAT_ANNEXB) {
  836. if (!next_as_annexb()) {
  837. return set_end();
  838. }
  839. } else if (*_format == AVC_NALU_FORMAT_IBMF) {
  840. if (!next_as_ibmf()) {
  841. return set_end();
  842. }
  843. } else {
  844. size_t start_code_length = 0;
  845. if (find_avc_annexb_nalu_start_code(*_data, &start_code_length) &&
  846. _data->size() > start_code_length) {
  847. if (start_code_length > 0) {
  848. _data->pop_front(start_code_length);
  849. }
  850. *_format = AVC_NALU_FORMAT_ANNEXB;
  851. if (!next_as_annexb()) {
  852. return set_end();
  853. }
  854. } else if (next_as_ibmf()) {
  855. *_format = AVC_NALU_FORMAT_IBMF;
  856. } else {
  857. set_end();
  858. }
  859. }
  860. }
  861. bool AVCNaluIterator::next_as_annexb() {
  862. if (_data->empty()) {
  863. return false;
  864. }
  865. size_t nalu_length = 0;
  866. size_t stop_code_length = 0;
  867. find_avc_annexb_nalu_stop_code(*_data, &nalu_length, &stop_code_length);
  868. _cur_nalu.clear();
  869. _nalu_type = AVC_NALU_EMPTY;
  870. if (nalu_length) {
  871. _data->cutn(&_cur_nalu, nalu_length);
  872. const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1();
  873. _nalu_type = (AVCNaluType)(byte0 & 0x1f);
  874. }
  875. if (stop_code_length) {
  876. _data->pop_front(stop_code_length);
  877. }
  878. return true;
  879. }
  880. bool AVCNaluIterator::next_as_ibmf() {
  881. // The value of this field shall be one of 0, 1, or 3 corresponding to a
  882. // length encoded with 1, 2, or 4 bytes, respectively.
  883. CHECK_NE(_length_size_minus1, 2u);
  884. if (_data->empty()) {
  885. return false;
  886. }
  887. if (_data->size() < _length_size_minus1 + 1) {
  888. LOG(ERROR) << "Not enough data to decode length of NALU";
  889. return false;
  890. }
  891. int32_t nalu_length = 0;
  892. char buf[4];
  893. if (_length_size_minus1 == 3) {
  894. _data->copy_to(buf, 4);
  895. nalu_length = policy::ReadBigEndian4Bytes(buf);
  896. } else if (_length_size_minus1 == 1) {
  897. _data->copy_to(buf, 2);
  898. nalu_length = policy::ReadBigEndian2Bytes(buf);
  899. } else {
  900. _data->copy_to(buf, 1);
  901. nalu_length = *buf;
  902. }
  903. // maybe stream is invalid format.
  904. // see: https://github.com/ossrs/srs/issues/183
  905. if (nalu_length < 0) {
  906. LOG(ERROR) << "Invalid nalu_length=" << nalu_length;
  907. return false;
  908. }
  909. if (_data->size() < _length_size_minus1 + 1 + nalu_length) {
  910. LOG(ERROR) << "Not enough data to decode NALU";
  911. return false;
  912. }
  913. _data->pop_front(_length_size_minus1 + 1);
  914. _cur_nalu.clear();
  915. _nalu_type = AVC_NALU_EMPTY;
  916. if (nalu_length) {
  917. _data->cutn(&_cur_nalu, nalu_length);
  918. const uint8_t byte0 = *(const uint8_t*)_cur_nalu.fetch1();
  919. _nalu_type = (AVCNaluType)(byte0 & 0x1f);
  920. }
  921. return true;
  922. }
  923. RtmpClientOptions::RtmpClientOptions()
  924. : fpad(false)
  925. , audioCodecs((RtmpAudioCodec)3575) // Copy from SRS
  926. , videoCodecs((RtmpVideoCodec)252) // Copy from SRS
  927. , videoFunction(RTMP_VIDEO_FUNCTION_CLIENT_SEEK)
  928. , timeout_ms(1000)
  929. , connect_timeout_ms(500)
  930. , buffer_length_ms(1000)
  931. , chunk_size(policy::RTMP_DEFAULT_CHUNK_SIZE)
  932. , window_ack_size(policy::RTMP_DEFAULT_WINDOW_ACK_SIZE)
  933. , simplified_rtmp(false) {
  934. }
  935. // Shared by RtmpClient and RtmpClientStream(s)
  936. class RtmpClientImpl : public SharedObject {
  937. friend class RtmpClientStream;
  938. public:
  939. RtmpClientImpl() {
  940. get_rtmp_bvars()->client_count << 1;
  941. }
  942. ~RtmpClientImpl() {
  943. get_rtmp_bvars()->client_count << -1;
  944. RPC_VLOG << "Destroying RtmpClientImpl=" << this;
  945. }
  946. // Specify the servers to connect.
  947. int Init(butil::EndPoint server_addr_and_port,
  948. const RtmpClientOptions& options);
  949. int Init(const char* server_addr_and_port,
  950. const RtmpClientOptions& options);
  951. int Init(const char* server_addr, int port,
  952. const RtmpClientOptions& options);
  953. int Init(const char* naming_service_url,
  954. const char* load_balancer_name,
  955. const RtmpClientOptions& options);
  956. const RtmpClientOptions& options() const { return _connect_options; }
  957. SocketMap& socket_map() { return _socket_map; }
  958. int CreateSocket(const butil::EndPoint& pt, SocketId* id);
  959. private:
  960. DISALLOW_COPY_AND_ASSIGN(RtmpClientImpl);
  961. int CommonInit(const RtmpClientOptions& options);
  962. Channel _chan;
  963. RtmpClientOptions _connect_options;
  964. SocketMap _socket_map;
  965. };
  966. class RtmpConnect : public AppConnect {
  967. public:
  968. // @AppConnect
  969. void StartConnect(const Socket* s, void (*done)(int, void*), void* data) override;
  970. void StopConnect(Socket* s) override;
  971. };
  972. void RtmpConnect::StartConnect(
  973. const Socket* s, void (*done)(int, void*), void* data) {
  974. RPC_VLOG << "Establish rtmp-level connection on " << *s;
  975. policy::RtmpContext* ctx =
  976. static_cast<policy::RtmpContext*>(s->parsing_context());
  977. if (ctx == NULL) {
  978. LOG(FATAL) << "RtmpContext of " << *s << " is NULL";
  979. return done(EINVAL, data);
  980. }
  981. const RtmpClientOptions* _client_options = ctx->client_options();
  982. if (_client_options && _client_options->simplified_rtmp) {
  983. ctx->set_simplified_rtmp(true);
  984. if (ctx->SendConnectRequest(s->remote_side(), s->fd(), true) != 0) {
  985. LOG(ERROR) << s->remote_side() << ": Fail to send simple connect";
  986. return done(EINVAL, data);
  987. }
  988. ctx->SetState(s->remote_side(), policy::RtmpContext::STATE_RECEIVED_S2);
  989. ctx->set_create_stream_with_play_or_publish(true);
  990. return done(0, data);
  991. }
  992. // Save to callback to call when RTMP connect is done.
  993. ctx->SetConnectCallback(done, data);
  994. // Initiate the rtmp handshake.
  995. bool is_simple_handshake = false;
  996. if (policy::SendC0C1(s->fd(), &is_simple_handshake) != 0) {
  997. LOG(ERROR) << s->remote_side() << ": Fail to send C0 C1";
  998. return done(EINVAL, data);
  999. }
  1000. if (is_simple_handshake) {
  1001. ctx->only_check_simple_s0s1();
  1002. }
  1003. }
  1004. void RtmpConnect::StopConnect(Socket* s) {
  1005. policy::RtmpContext* ctx =
  1006. static_cast<policy::RtmpContext*>(s->parsing_context());
  1007. if (ctx == NULL) {
  1008. LOG(FATAL) << "RtmpContext of " << *s << " is NULL";
  1009. } else {
  1010. ctx->OnConnected(EFAILEDSOCKET);
  1011. }
  1012. }
  1013. class RtmpSocketCreator : public SocketCreator {
  1014. public:
  1015. RtmpSocketCreator(const RtmpClientOptions& connect_options)
  1016. : _connect_options(connect_options) {
  1017. }
  1018. int CreateSocket(const SocketOptions& opt, SocketId* id) {
  1019. SocketOptions sock_opt = opt;
  1020. sock_opt.app_connect = std::make_shared<RtmpConnect>();
  1021. sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
  1022. return get_client_side_messenger()->Create(sock_opt, id);
  1023. }
  1024. private:
  1025. RtmpClientOptions _connect_options;
  1026. };
  1027. int RtmpClientImpl::CreateSocket(const butil::EndPoint& pt, SocketId* id) {
  1028. SocketOptions sock_opt;
  1029. sock_opt.remote_side = pt;
  1030. sock_opt.app_connect = std::make_shared<RtmpConnect>();
  1031. sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
  1032. return get_client_side_messenger()->Create(sock_opt, id);
  1033. }
  1034. int RtmpClientImpl::CommonInit(const RtmpClientOptions& options) {
  1035. _connect_options = options;
  1036. SocketMapOptions sm_options;
  1037. sm_options.socket_creator = new RtmpSocketCreator(_connect_options);
  1038. if (_socket_map.Init(sm_options) != 0) {
  1039. LOG(ERROR) << "Fail to init _socket_map";
  1040. return -1;
  1041. }
  1042. return 0;
  1043. }
  1044. int RtmpClientImpl::Init(butil::EndPoint server_addr_and_port,
  1045. const RtmpClientOptions& options) {
  1046. if (CommonInit(options) != 0) {
  1047. return -1;
  1048. }
  1049. ChannelOptions copts;
  1050. copts.connect_timeout_ms = options.connect_timeout_ms;
  1051. copts.timeout_ms = options.timeout_ms;
  1052. copts.protocol = PROTOCOL_RTMP;
  1053. return _chan.Init(server_addr_and_port, &copts);
  1054. }
  1055. int RtmpClientImpl::Init(const char* server_addr_and_port,
  1056. const RtmpClientOptions& options) {
  1057. if (CommonInit(options) != 0) {
  1058. return -1;
  1059. }
  1060. ChannelOptions copts;
  1061. copts.connect_timeout_ms = options.connect_timeout_ms;
  1062. copts.timeout_ms = options.timeout_ms;
  1063. copts.protocol = PROTOCOL_RTMP;
  1064. return _chan.Init(server_addr_and_port, &copts);
  1065. }
  1066. int RtmpClientImpl::Init(const char* server_addr, int port,
  1067. const RtmpClientOptions& options) {
  1068. if (CommonInit(options) != 0) {
  1069. return -1;
  1070. }
  1071. ChannelOptions copts;
  1072. copts.connect_timeout_ms = options.connect_timeout_ms;
  1073. copts.timeout_ms = options.timeout_ms;
  1074. copts.protocol = PROTOCOL_RTMP;
  1075. return _chan.Init(server_addr, port, &copts);
  1076. }
  1077. int RtmpClientImpl::Init(const char* naming_service_url,
  1078. const char* load_balancer_name,
  1079. const RtmpClientOptions& options) {
  1080. if (CommonInit(options) != 0) {
  1081. return -1;
  1082. }
  1083. ChannelOptions copts;
  1084. copts.connect_timeout_ms = options.connect_timeout_ms;
  1085. copts.timeout_ms = options.timeout_ms;
  1086. copts.protocol = PROTOCOL_RTMP;
  1087. return _chan.Init(naming_service_url, load_balancer_name, &copts);
  1088. }
  1089. RtmpClient::RtmpClient() {}
  1090. RtmpClient::~RtmpClient() {}
  1091. RtmpClient::RtmpClient(const RtmpClient& rhs) : _impl(rhs._impl) {}
  1092. RtmpClient& RtmpClient::operator=(const RtmpClient& rhs) {
  1093. _impl = rhs._impl;
  1094. return *this;
  1095. }
  1096. const RtmpClientOptions& RtmpClient::options() const {
  1097. if (_impl) {
  1098. return _impl->options();
  1099. } else {
  1100. static RtmpClientOptions dft_opt;
  1101. return dft_opt;
  1102. }
  1103. }
  1104. int RtmpClient::Init(butil::EndPoint server_addr_and_port,
  1105. const RtmpClientOptions& options) {
  1106. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1107. if (tmp == NULL) {
  1108. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1109. return -1;
  1110. }
  1111. if (tmp->Init(server_addr_and_port, options) != 0) {
  1112. return -1;
  1113. }
  1114. tmp.swap(_impl);
  1115. return 0;
  1116. }
  1117. int RtmpClient::Init(const char* server_addr_and_port,
  1118. const RtmpClientOptions& options) {
  1119. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1120. if (tmp == NULL) {
  1121. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1122. return -1;
  1123. }
  1124. if (tmp->Init(server_addr_and_port, options) != 0) {
  1125. return -1;
  1126. }
  1127. tmp.swap(_impl);
  1128. return 0;
  1129. }
  1130. int RtmpClient::Init(const char* server_addr, int port,
  1131. const RtmpClientOptions& options) {
  1132. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1133. if (tmp == NULL) {
  1134. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1135. return -1;
  1136. }
  1137. if (tmp->Init(server_addr, port, options) != 0) {
  1138. return -1;
  1139. }
  1140. tmp.swap(_impl);
  1141. return 0;
  1142. }
  1143. int RtmpClient::Init(const char* naming_service_url,
  1144. const char* load_balancer_name,
  1145. const RtmpClientOptions& options) {
  1146. butil::intrusive_ptr<RtmpClientImpl> tmp(new (std::nothrow) RtmpClientImpl);
  1147. if (tmp == NULL) {
  1148. LOG(FATAL) << "Fail to new RtmpClientImpl";
  1149. return -1;
  1150. }
  1151. if (tmp->Init(naming_service_url, load_balancer_name, options) != 0) {
  1152. return -1;
  1153. }
  1154. tmp.swap(_impl);
  1155. return 0;
  1156. }
  1157. bool RtmpClient::initialized() const { return _impl != NULL; }
  1158. RtmpStreamBase::RtmpStreamBase(bool is_client)
  1159. : _is_client(is_client)
  1160. , _paused(false)
  1161. , _stopped(false)
  1162. , _processing_msg(false)
  1163. , _has_data_ever(false)
  1164. , _message_stream_id(0)
  1165. , _chunk_stream_id(0)
  1166. , _create_realtime_us(butil::gettimeofday_us())
  1167. , _is_server_accepted(false) {
  1168. }
  1169. RtmpStreamBase::~RtmpStreamBase() {
  1170. }
  1171. void RtmpStreamBase::Destroy() {
  1172. return;
  1173. }
  1174. int RtmpStreamBase::SendMessage(uint32_t timestamp,
  1175. uint8_t message_type,
  1176. const butil::IOBuf& body) {
  1177. if (_rtmpsock == NULL) {
  1178. errno = EPERM;
  1179. return -1;
  1180. }
  1181. if (_chunk_stream_id == 0) {
  1182. LOG(ERROR) << "SendXXXMessage can't be called before play() is received";
  1183. errno = EPERM;
  1184. return -1;
  1185. }
  1186. SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage);
  1187. msg->header.timestamp = timestamp;
  1188. msg->header.message_length = body.size();
  1189. msg->header.message_type = message_type;
  1190. msg->header.stream_id = _message_stream_id;
  1191. msg->chunk_stream_id = _chunk_stream_id;
  1192. msg->body = body;
  1193. return _rtmpsock->Write(msg);
  1194. }
  1195. int RtmpStreamBase::SendControlMessage(
  1196. uint8_t message_type, const void* body, size_t size) {
  1197. if (_rtmpsock == NULL) {
  1198. errno = EPERM;
  1199. return -1;
  1200. }
  1201. SocketMessagePtr<policy::RtmpUnsentMessage> msg(
  1202. policy::MakeUnsentControlMessage(message_type, body, size));
  1203. return _rtmpsock->Write(msg);
  1204. }
  1205. int RtmpStreamBase::SendCuePoint(const RtmpCuePoint& cuepoint) {
  1206. butil::IOBuf req_buf;
  1207. {
  1208. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1209. AMFOutputStream ostream(&zc_stream);
  1210. WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
  1211. WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
  1212. WriteAMFObject(cuepoint.data, &ostream);
  1213. if (!ostream.good()) {
  1214. LOG(ERROR) << "Fail to serialize cuepoint";
  1215. return -1;
  1216. }
  1217. }
  1218. return SendMessage(cuepoint.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
  1219. }
  1220. int RtmpStreamBase::SendMetaData(const RtmpMetaData& metadata,
  1221. const butil::StringPiece& name) {
  1222. butil::IOBuf req_buf;
  1223. {
  1224. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1225. AMFOutputStream ostream(&zc_stream);
  1226. WriteAMFString(name, &ostream);
  1227. WriteAMFObject(metadata.data, &ostream);
  1228. if (!ostream.good()) {
  1229. LOG(ERROR) << "Fail to serialize metadata";
  1230. return -1;
  1231. }
  1232. }
  1233. return SendMessage(metadata.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
  1234. }
  1235. int RtmpStreamBase::SendSharedObjectMessage(const RtmpSharedObjectMessage&) {
  1236. CHECK(false) << "Not supported yet";
  1237. return -1;
  1238. }
  1239. int RtmpStreamBase::SendAudioMessage(const RtmpAudioMessage& msg) {
  1240. if (_rtmpsock == NULL) {
  1241. errno = EPERM;
  1242. return -1;
  1243. }
  1244. if (_chunk_stream_id == 0) {
  1245. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1246. errno = EPERM;
  1247. return -1;
  1248. }
  1249. if (_paused) {
  1250. errno = EPERM;
  1251. return -1;
  1252. }
  1253. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1254. msg2->header.timestamp = msg.timestamp;
  1255. msg2->header.message_length = msg.size();
  1256. msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO;
  1257. msg2->header.stream_id = _message_stream_id;
  1258. msg2->chunk_stream_id = _chunk_stream_id;
  1259. // Make audio header.
  1260. const char audio_head =
  1261. ((msg.codec & 0xF) << 4)
  1262. | ((msg.rate & 0x3) << 2)
  1263. | ((msg.bits & 0x1) << 1)
  1264. | (msg.type & 0x1);
  1265. msg2->body.push_back(audio_head);
  1266. msg2->body.append(msg.data);
  1267. return _rtmpsock->Write(msg2);
  1268. }
  1269. int RtmpStreamBase::SendAACMessage(const RtmpAACMessage& msg) {
  1270. if (_rtmpsock == NULL) {
  1271. errno = EPERM;
  1272. return -1;
  1273. }
  1274. if (_chunk_stream_id == 0) {
  1275. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1276. errno = EPERM;
  1277. return -1;
  1278. }
  1279. if (_paused) {
  1280. errno = EPERM;
  1281. return -1;
  1282. }
  1283. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1284. msg2->header.timestamp = msg.timestamp;
  1285. msg2->header.message_length = msg.size();
  1286. msg2->header.message_type = policy::RTMP_MESSAGE_AUDIO;
  1287. msg2->header.stream_id = _message_stream_id;
  1288. msg2->chunk_stream_id = _chunk_stream_id;
  1289. // Make audio header.
  1290. char aac_head[2];
  1291. aac_head[0] = ((FLV_AUDIO_AAC & 0xF) << 4)
  1292. | ((msg.rate & 0x3) << 2)
  1293. | ((msg.bits & 0x1) << 1)
  1294. | (msg.type & 0x1);
  1295. aac_head[1] = (FlvAACPacketType)msg.packet_type;
  1296. msg2->body.append(aac_head, sizeof(aac_head));
  1297. msg2->body.append(msg.data);
  1298. return _rtmpsock->Write(msg2);
  1299. }
  1300. int RtmpStreamBase::SendUserMessage(void*) {
  1301. CHECK(false) << "You should implement your own SendUserMessage";
  1302. return 0;
  1303. }
  1304. int RtmpStreamBase::SendVideoMessage(const RtmpVideoMessage& msg) {
  1305. if (_rtmpsock == NULL) {
  1306. errno = EPERM;
  1307. return -1;
  1308. }
  1309. if (_chunk_stream_id == 0) {
  1310. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1311. errno = EPERM;
  1312. return -1;
  1313. }
  1314. if (!policy::is_video_frame_type_valid(msg.frame_type)) {
  1315. LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type;
  1316. }
  1317. if (!policy::is_video_codec_valid(msg.codec)) {
  1318. LOG(WARNING) << "Invalid codec=" << (int)msg.codec;
  1319. }
  1320. if (_paused) {
  1321. errno = EPERM;
  1322. return -1;
  1323. }
  1324. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1325. msg2->header.timestamp = msg.timestamp;
  1326. msg2->header.message_length = msg.size();
  1327. msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO;
  1328. msg2->header.stream_id = _message_stream_id;
  1329. msg2->chunk_stream_id = _chunk_stream_id;
  1330. // Make video header
  1331. const char video_head = ((msg.frame_type & 0xF) << 4) | (msg.codec & 0xF);
  1332. msg2->body.push_back(video_head);
  1333. msg2->body.append(msg.data);
  1334. return _rtmpsock->Write(msg2);
  1335. }
  1336. int RtmpStreamBase::SendAVCMessage(const RtmpAVCMessage& msg) {
  1337. if (_rtmpsock == NULL) {
  1338. errno = EPERM;
  1339. return -1;
  1340. }
  1341. if (_chunk_stream_id == 0) {
  1342. LOG(ERROR) << __FUNCTION__ << " can't be called before play() is received";
  1343. errno = EPERM;
  1344. return -1;
  1345. }
  1346. if (!policy::is_video_frame_type_valid(msg.frame_type)) {
  1347. LOG(WARNING) << "Invalid frame_type=" << (int)msg.frame_type;
  1348. }
  1349. if (_paused) {
  1350. errno = EPERM;
  1351. return -1;
  1352. }
  1353. SocketMessagePtr<policy::RtmpUnsentMessage> msg2(new policy::RtmpUnsentMessage);
  1354. msg2->header.timestamp = msg.timestamp;
  1355. msg2->header.message_length = msg.size();
  1356. msg2->header.message_type = policy::RTMP_MESSAGE_VIDEO;
  1357. msg2->header.stream_id = _message_stream_id;
  1358. msg2->chunk_stream_id = _chunk_stream_id;
  1359. // Make video header
  1360. char avc_head[5];
  1361. char* p = avc_head;
  1362. *p++ = ((msg.frame_type & 0xF) << 4) | (FLV_VIDEO_AVC & 0xF);
  1363. *p++ = (FlvAVCPacketType)msg.packet_type;
  1364. policy::WriteBigEndian3Bytes(&p, msg.composition_time);
  1365. msg2->body.append(avc_head, sizeof(avc_head));
  1366. msg2->body.append(msg.data);
  1367. return _rtmpsock->Write(msg2);
  1368. }
  1369. int RtmpStreamBase::SendStopMessage(const butil::StringPiece&) {
  1370. return -1;
  1371. }
  1372. const char* RtmpObjectEncoding2Str(RtmpObjectEncoding e) {
  1373. switch (e) {
  1374. case RTMP_AMF0: return "AMF0";
  1375. case RTMP_AMF3: return "AMF3";
  1376. }
  1377. return "Unknown RtmpObjectEncoding";
  1378. }
  1379. void RtmpStreamBase::SignalError() {
  1380. return;
  1381. }
  1382. void RtmpStreamBase::OnFirstMessage() {}
  1383. void RtmpStreamBase::OnUserData(void*) {
  1384. LOG(INFO) << remote_side() << '[' << stream_id()
  1385. << "] ignored UserData{}";
  1386. }
  1387. void RtmpStreamBase::OnCuePoint(RtmpCuePoint* cuepoint) {
  1388. LOG(INFO) << remote_side() << '[' << stream_id()
  1389. << "] ignored CuePoint{" << cuepoint->data << '}';
  1390. }
  1391. void RtmpStreamBase::OnMetaData(RtmpMetaData* metadata, const butil::StringPiece& name) {
  1392. LOG(INFO) << remote_side() << '[' << stream_id()
  1393. << "] ignored MetaData{" << metadata->data << '}'
  1394. << " name{" << name << '}';
  1395. }
  1396. void RtmpStreamBase::OnSharedObjectMessage(RtmpSharedObjectMessage*) {
  1397. LOG(ERROR) << remote_side() << '[' << stream_id()
  1398. << "] ignored SharedObjectMessage{}";
  1399. }
  1400. void RtmpStreamBase::OnAudioMessage(RtmpAudioMessage* msg) {
  1401. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg;
  1402. }
  1403. void RtmpStreamBase::OnVideoMessage(RtmpVideoMessage* msg) {
  1404. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored " << *msg;
  1405. }
  1406. void RtmpStreamBase::OnStop() {
  1407. // do nothing by default
  1408. }
  1409. bool RtmpStreamBase::BeginProcessingMessage(const char* fun_name) {
  1410. std::unique_lock<butil::Mutex> mu(_call_mutex);
  1411. if (_stopped) {
  1412. mu.unlock();
  1413. LOG(ERROR) << fun_name << " is called after OnStop()";
  1414. return false;
  1415. }
  1416. if (_processing_msg) {
  1417. mu.unlock();
  1418. LOG(ERROR) << "Impossible: Another OnXXXMessage is being called!";
  1419. return false;
  1420. }
  1421. _processing_msg = true;
  1422. if (!_has_data_ever) {
  1423. _has_data_ever = true;
  1424. OnFirstMessage();
  1425. }
  1426. return true;
  1427. }
  1428. void RtmpStreamBase::EndProcessingMessage() {
  1429. std::unique_lock<butil::Mutex> mu(_call_mutex);
  1430. _processing_msg = false;
  1431. if (_stopped) {
  1432. mu.unlock();
  1433. return OnStop();
  1434. }
  1435. }
  1436. void RtmpStreamBase::CallOnUserData(void* data) {
  1437. if (BeginProcessingMessage("OnUserData()")) {
  1438. OnUserData(data);
  1439. EndProcessingMessage();
  1440. }
  1441. }
  1442. void RtmpStreamBase::CallOnCuePoint(RtmpCuePoint* obj) {
  1443. if (BeginProcessingMessage("OnCuePoint()")) {
  1444. OnCuePoint(obj);
  1445. EndProcessingMessage();
  1446. }
  1447. }
  1448. void RtmpStreamBase::CallOnMetaData(RtmpMetaData* obj, const butil::StringPiece& name) {
  1449. if (BeginProcessingMessage("OnMetaData()")) {
  1450. OnMetaData(obj, name);
  1451. EndProcessingMessage();
  1452. }
  1453. }
  1454. void RtmpStreamBase::CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg) {
  1455. if (BeginProcessingMessage("OnSharedObjectMessage()")) {
  1456. OnSharedObjectMessage(msg);
  1457. EndProcessingMessage();
  1458. }
  1459. }
  1460. void RtmpStreamBase::CallOnAudioMessage(RtmpAudioMessage* msg) {
  1461. if (BeginProcessingMessage("OnAudioMessage()")) {
  1462. OnAudioMessage(msg);
  1463. EndProcessingMessage();
  1464. }
  1465. }
  1466. void RtmpStreamBase::CallOnVideoMessage(RtmpVideoMessage* msg) {
  1467. if (BeginProcessingMessage("OnVideoMessage()")) {
  1468. OnVideoMessage(msg);
  1469. EndProcessingMessage();
  1470. }
  1471. }
  1472. void RtmpStreamBase::CallOnStop() {
  1473. {
  1474. std::unique_lock<butil::Mutex> mu(_call_mutex);
  1475. if (_stopped) {
  1476. mu.unlock();
  1477. LOG(ERROR) << "OnStop() was called more than once";
  1478. return;
  1479. }
  1480. _stopped = true;
  1481. if (_processing_msg) {
  1482. // EndProcessingMessage() will call OnStop();
  1483. return;
  1484. }
  1485. }
  1486. OnStop();
  1487. }
  1488. butil::EndPoint RtmpStreamBase::remote_side() const
  1489. { return _rtmpsock ? _rtmpsock->remote_side() : butil::EndPoint(); }
  1490. butil::EndPoint RtmpStreamBase::local_side() const
  1491. { return _rtmpsock ? _rtmpsock->local_side() : butil::EndPoint(); }
  1492. // ============ RtmpClientStream =============
  1493. RtmpClientStream::RtmpClientStream()
  1494. : RtmpStreamBase(true)
  1495. , _onfail_id(INVALID_BTHREAD_ID)
  1496. , _create_stream_rpc_id(INVALID_BTHREAD_ID)
  1497. , _from_socketmap(true)
  1498. , _created_stream_with_play_or_publish(false)
  1499. , _state(STATE_UNINITIALIZED) {
  1500. get_rtmp_bvars()->client_stream_count << 1;
  1501. _self_ref.reset(this);
  1502. }
  1503. RtmpClientStream::~RtmpClientStream() {
  1504. get_rtmp_bvars()->client_stream_count << -1;
  1505. }
  1506. void RtmpClientStream::Destroy() {
  1507. bthread_id_t onfail_id = INVALID_BTHREAD_ID;
  1508. CallId create_stream_rpc_id = INVALID_BTHREAD_ID;
  1509. butil::intrusive_ptr<RtmpClientStream> self_ref;
  1510. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1511. switch (_state) {
  1512. case STATE_UNINITIALIZED:
  1513. _state = STATE_DESTROYING;
  1514. mu.unlock();
  1515. OnStopInternal();
  1516. _self_ref.swap(self_ref);
  1517. return;
  1518. case STATE_CREATING:
  1519. _state = STATE_DESTROYING;
  1520. create_stream_rpc_id = _create_stream_rpc_id;
  1521. mu.unlock();
  1522. _self_ref.swap(self_ref);
  1523. StartCancel(create_stream_rpc_id);
  1524. return;
  1525. case STATE_CREATED:
  1526. _state = STATE_DESTROYING;
  1527. onfail_id = _onfail_id;
  1528. mu.unlock();
  1529. _self_ref.swap(self_ref);
  1530. bthread_id_error(onfail_id, 0);
  1531. return;
  1532. case STATE_ERROR:
  1533. _state = STATE_DESTROYING;
  1534. mu.unlock();
  1535. _self_ref.swap(self_ref);
  1536. return;
  1537. case STATE_DESTROYING:
  1538. // Destroy() was already called.
  1539. return;
  1540. }
  1541. }
  1542. void RtmpClientStream::SignalError() {
  1543. bthread_id_t onfail_id = INVALID_BTHREAD_ID;
  1544. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1545. switch (_state) {
  1546. case STATE_UNINITIALIZED:
  1547. _state = STATE_ERROR;
  1548. mu.unlock();
  1549. OnStopInternal();
  1550. return;
  1551. case STATE_CREATING:
  1552. _state = STATE_ERROR;
  1553. mu.unlock();
  1554. return;
  1555. case STATE_CREATED:
  1556. _state = STATE_ERROR;
  1557. onfail_id = _onfail_id;
  1558. mu.unlock();
  1559. bthread_id_error(onfail_id, 0);
  1560. return;
  1561. case STATE_ERROR:
  1562. case STATE_DESTROYING:
  1563. // SignalError() or Destroy() was already called.
  1564. return;
  1565. }
  1566. }
  1567. StreamUserData* RtmpClientStream::OnCreatingStream(
  1568. SocketUniquePtr* inout, Controller* cntl) {
  1569. {
  1570. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1571. if (_state == STATE_ERROR || _state == STATE_DESTROYING) {
  1572. cntl->SetFailed(EINVAL, "Fail to replace socket for stream, _state is error or destroying");
  1573. return NULL;
  1574. }
  1575. }
  1576. SocketId esid;
  1577. if (cntl->connection_type() == CONNECTION_TYPE_SHORT) {
  1578. if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) {
  1579. cntl->SetFailed(EINVAL, "Fail to create RTMP socket");
  1580. return NULL;
  1581. }
  1582. } else {
  1583. if (_client_impl->socket_map().Insert(
  1584. SocketMapKey((*inout)->remote_side()), &esid) != 0) {
  1585. cntl->SetFailed(EINVAL, "Fail to get the RTMP socket");
  1586. return NULL;
  1587. }
  1588. }
  1589. SocketUniquePtr tmp_ptr;
  1590. if (Socket::Address(esid, &tmp_ptr) != 0) {
  1591. cntl->SetFailed(EFAILEDSOCKET, "Fail to address RTMP SocketId=%" PRIu64
  1592. " from SocketMap of RtmpClient=%p",
  1593. esid, _client_impl.get());
  1594. return NULL;
  1595. }
  1596. RPC_VLOG << "Replace Socket For Stream, RTMP socketId=" << esid
  1597. << ", main socketId=" << (*inout)->id();
  1598. tmp_ptr->ShareStats(inout->get());
  1599. inout->reset(tmp_ptr.release());
  1600. return this;
  1601. }
  1602. int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) {
  1603. butil::intrusive_ptr<RtmpClientStream> stream(
  1604. static_cast<RtmpClientStream*>(data), false);
  1605. CHECK(stream->_rtmpsock);
  1606. // Must happen after NotifyOnFailed which is after all other callsites
  1607. // to OnStopInternal().
  1608. stream->OnStopInternal();
  1609. bthread_id_unlock_and_destroy(id);
  1610. return 0;
  1611. }
  1612. void RtmpClientStream::OnFailedToCreateStream() {
  1613. {
  1614. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1615. switch (_state) {
  1616. case STATE_CREATING:
  1617. _state = STATE_ERROR;
  1618. break;
  1619. case STATE_UNINITIALIZED:
  1620. case STATE_CREATED:
  1621. _state = STATE_ERROR;
  1622. mu.unlock();
  1623. CHECK(false) << "Impossible";
  1624. break;
  1625. case STATE_ERROR:
  1626. case STATE_DESTROYING:
  1627. break;
  1628. }
  1629. }
  1630. return OnStopInternal();
  1631. }
  1632. void RtmpClientStream::DestroyStreamUserData(SocketUniquePtr& sending_sock,
  1633. Controller* cntl,
  1634. int /*error_code*/,
  1635. bool end_of_rpc) {
  1636. if (!end_of_rpc) {
  1637. if (sending_sock) {
  1638. if (_from_socketmap) {
  1639. _client_impl->socket_map().Remove(SocketMapKey(sending_sock->remote_side()),
  1640. sending_sock->id());
  1641. } else {
  1642. sending_sock->SetFailed(); // not necessary, already failed.
  1643. }
  1644. }
  1645. } else {
  1646. // Always move sending_sock into _rtmpsock at the end of rpc.
  1647. // - If the RPC is successful, moving sending_sock prevents it from
  1648. // setfailed in Controller after calling this method.
  1649. // - If the RPC is failed, OnStopInternal() can clean up the socket_map
  1650. // inserted in OnCreatingStream().
  1651. _rtmpsock.swap(sending_sock);
  1652. }
  1653. }
  1654. void RtmpClientStream::DestroyStreamCreator(Controller* cntl) {
  1655. if (cntl->Failed()) {
  1656. if (_rtmpsock != NULL &&
  1657. // ^ If sending_sock is NULL, the RPC fails before _pack_request
  1658. // which calls AddTransaction, in another word, RemoveTransaction
  1659. // is not needed.
  1660. cntl->ErrorCode() != ERTMPCREATESTREAM) {
  1661. // ^ ERTMPCREATESTREAM is triggered by receiving "_error" command,
  1662. // RemoveTransaction should already be called.
  1663. CHECK_LT(cntl->log_id(), (uint64_t)std::numeric_limits<uint32_t>::max());
  1664. const uint32_t transaction_id = cntl->log_id();
  1665. policy::RtmpContext* rtmp_ctx =
  1666. static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
  1667. if (rtmp_ctx == NULL) {
  1668. LOG(FATAL) << "RtmpContext must be created";
  1669. } else {
  1670. policy::RtmpTransactionHandler* handler =
  1671. rtmp_ctx->RemoveTransaction(transaction_id);
  1672. if (handler) {
  1673. handler->Cancel();
  1674. }
  1675. }
  1676. }
  1677. return OnFailedToCreateStream();
  1678. }
  1679. int rc = 0;
  1680. bthread_id_t onfail_id = INVALID_BTHREAD_ID;
  1681. {
  1682. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1683. switch (_state) {
  1684. case STATE_CREATING:
  1685. CHECK(_rtmpsock);
  1686. rc = bthread_id_create(&onfail_id, this, RunOnFailed);
  1687. if (rc) {
  1688. cntl->SetFailed(ENOMEM, "Fail to create _onfail_id: %s", berror(rc));
  1689. mu.unlock();
  1690. return OnFailedToCreateStream();
  1691. }
  1692. // Add a ref for RunOnFailed.
  1693. butil::intrusive_ptr<RtmpClientStream>(this).detach();
  1694. _state = STATE_CREATED;
  1695. _onfail_id = onfail_id;
  1696. break;
  1697. case STATE_UNINITIALIZED:
  1698. case STATE_CREATED:
  1699. _state = STATE_ERROR;
  1700. mu.unlock();
  1701. CHECK(false) << "Impossible";
  1702. return OnStopInternal();
  1703. case STATE_ERROR:
  1704. case STATE_DESTROYING:
  1705. mu.unlock();
  1706. return OnStopInternal();
  1707. }
  1708. }
  1709. if (onfail_id != INVALID_BTHREAD_ID) {
  1710. _rtmpsock->NotifyOnFailed(onfail_id);
  1711. }
  1712. }
  1713. void RtmpClientStream::OnStopInternal() {
  1714. if (_rtmpsock == NULL) {
  1715. return CallOnStop();
  1716. }
  1717. if (!_rtmpsock->Failed() && _chunk_stream_id != 0) {
  1718. // SRS requires closeStream which is sent over this stream.
  1719. butil::IOBuf req_buf1;
  1720. {
  1721. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf1);
  1722. AMFOutputStream ostream(&zc_stream);
  1723. WriteAMFString(RTMP_AMF0_COMMAND_CLOSE_STREAM, &ostream);
  1724. WriteAMFUint32(0, &ostream);
  1725. WriteAMFNull(&ostream);
  1726. CHECK(ostream.good());
  1727. }
  1728. SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
  1729. msg1->header.message_length = req_buf1.size();
  1730. msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
  1731. msg1->header.stream_id = _message_stream_id;
  1732. msg1->chunk_stream_id = _chunk_stream_id;
  1733. msg1->body = req_buf1;
  1734. // Send deleteStream over the control stream.
  1735. butil::IOBuf req_buf2;
  1736. {
  1737. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf2);
  1738. AMFOutputStream ostream(&zc_stream);
  1739. WriteAMFString(RTMP_AMF0_COMMAND_DELETE_STREAM, &ostream);
  1740. WriteAMFUint32(0, &ostream);
  1741. WriteAMFNull(&ostream);
  1742. WriteAMFUint32(_message_stream_id, &ostream);
  1743. CHECK(ostream.good());
  1744. }
  1745. policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
  1746. policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf2);
  1747. msg1->next.reset(msg2);
  1748. if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg1) != 0) {
  1749. if (errno != EFAILEDSOCKET) {
  1750. PLOG(WARNING) << "Fail to send closeStream/deleteStream to "
  1751. << _rtmpsock->remote_side() << "["
  1752. << _message_stream_id << "]";
  1753. // Close the connection to make sure the server-side knows the
  1754. // closing event, however this may terminate other streams over
  1755. // the connection as well.
  1756. _rtmpsock->SetFailed(EFAILEDSOCKET, "Fail to send closeStream/deleteStream");
  1757. }
  1758. }
  1759. }
  1760. policy::RtmpContext* ctx =
  1761. static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
  1762. if (ctx != NULL) {
  1763. if (!ctx->RemoveMessageStream(this)) {
  1764. // The stream is not registered yet. Is this normal?
  1765. LOG(ERROR) << "Fail to remove stream_id=" << _message_stream_id;
  1766. }
  1767. } else {
  1768. LOG(FATAL) << "RtmpContext of " << *_rtmpsock << " is NULL";
  1769. }
  1770. if (_from_socketmap) {
  1771. _client_impl->socket_map().Remove(SocketMapKey(_rtmpsock->remote_side()),
  1772. _rtmpsock->id());
  1773. } else {
  1774. _rtmpsock->ReleaseAdditionalReference();
  1775. }
  1776. CallOnStop();
  1777. }
  1778. RtmpPlayOptions::RtmpPlayOptions()
  1779. : start(-2)
  1780. , duration(-1)
  1781. , reset(true) {
  1782. }
  1783. int RtmpClientStream::Play(const RtmpPlayOptions& opt) {
  1784. if (_rtmpsock == NULL) {
  1785. errno = EPERM;
  1786. return -1;
  1787. }
  1788. if (opt.stream_name.empty()) {
  1789. LOG(ERROR) << "Empty stream_name";
  1790. errno = EINVAL;
  1791. return -1;
  1792. }
  1793. if (_client_impl == NULL) {
  1794. LOG(ERROR) << "The client stream is not created yet";
  1795. errno = EPERM;
  1796. return -1;
  1797. }
  1798. butil::IOBuf req_buf;
  1799. {
  1800. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1801. AMFOutputStream ostream(&zc_stream);
  1802. WriteAMFString(RTMP_AMF0_COMMAND_PLAY, &ostream);
  1803. WriteAMFUint32(0, &ostream);
  1804. WriteAMFNull(&ostream);
  1805. WriteAMFString(opt.stream_name, &ostream);
  1806. WriteAMFNumber(opt.start, &ostream);
  1807. WriteAMFNumber(opt.duration, &ostream);
  1808. WriteAMFBool(opt.reset, &ostream);
  1809. CHECK(ostream.good());
  1810. }
  1811. SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
  1812. msg1->header.message_length = req_buf.size();
  1813. msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
  1814. msg1->header.stream_id = _message_stream_id;
  1815. msg1->chunk_stream_id = _chunk_stream_id;
  1816. msg1->body = req_buf;
  1817. if (_client_impl->options().buffer_length_ms > 0) {
  1818. char data[10];
  1819. char* p = data;
  1820. policy::WriteBigEndian2Bytes(
  1821. &p, policy::RTMP_USER_CONTROL_EVENT_SET_BUFFER_LENGTH);
  1822. policy::WriteBigEndian4Bytes(&p, stream_id());
  1823. policy::WriteBigEndian4Bytes(&p, _client_impl->options().buffer_length_ms);
  1824. policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
  1825. policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data));
  1826. msg1->next.reset(msg2);
  1827. }
  1828. // FIXME(gejun): Do we need to SetChunkSize for play?
  1829. // if (_client_impl->options().chunk_size > policy::RTMP_INITIAL_CHUNK_SIZE) {
  1830. // if (SetChunkSize(_client_impl->options().chunk_size) != 0) {
  1831. // return -1;
  1832. // }
  1833. // }
  1834. return _rtmpsock->Write(msg1);
  1835. }
  1836. int RtmpClientStream::Play2(const RtmpPlay2Options& opt) {
  1837. butil::IOBuf req_buf;
  1838. {
  1839. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1840. AMFOutputStream ostream(&zc_stream);
  1841. WriteAMFString(RTMP_AMF0_COMMAND_PLAY2, &ostream);
  1842. WriteAMFUint32(0, &ostream);
  1843. WriteAMFNull(&ostream);
  1844. WriteAMFObject(opt, &ostream);
  1845. if (!ostream.good()) {
  1846. LOG(ERROR) << "Fail to serialize play2 request";
  1847. errno = EINVAL;
  1848. return -1;
  1849. }
  1850. }
  1851. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1852. }
  1853. const char* RtmpPublishType2Str(RtmpPublishType type) {
  1854. switch (type) {
  1855. case RTMP_PUBLISH_RECORD: return "record";
  1856. case RTMP_PUBLISH_APPEND: return "append";
  1857. case RTMP_PUBLISH_LIVE: return "live";
  1858. }
  1859. return "Unknown RtmpPublishType";
  1860. }
  1861. bool Str2RtmpPublishType(const butil::StringPiece& str, RtmpPublishType* type) {
  1862. if (str == "record") {
  1863. *type = RTMP_PUBLISH_RECORD;
  1864. return true;
  1865. } else if (str == "append") {
  1866. *type = RTMP_PUBLISH_APPEND;
  1867. return true;
  1868. } else if (str == "live") {
  1869. *type = RTMP_PUBLISH_LIVE;
  1870. return true;
  1871. }
  1872. return false;
  1873. }
  1874. int RtmpClientStream::Publish(const butil::StringPiece& name,
  1875. RtmpPublishType type) {
  1876. butil::IOBuf req_buf;
  1877. {
  1878. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1879. AMFOutputStream ostream(&zc_stream);
  1880. WriteAMFString(RTMP_AMF0_COMMAND_PUBLISH, &ostream);
  1881. WriteAMFUint32(0, &ostream);
  1882. WriteAMFNull(&ostream);
  1883. WriteAMFString(name, &ostream);
  1884. WriteAMFString(RtmpPublishType2Str(type), &ostream);
  1885. CHECK(ostream.good());
  1886. }
  1887. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1888. }
  1889. int RtmpClientStream::Seek(double offset_ms) {
  1890. butil::IOBuf req_buf;
  1891. {
  1892. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1893. AMFOutputStream ostream(&zc_stream);
  1894. WriteAMFString(RTMP_AMF0_COMMAND_SEEK, &ostream);
  1895. WriteAMFUint32(0, &ostream);
  1896. WriteAMFNull(&ostream);
  1897. WriteAMFNumber(offset_ms, &ostream);
  1898. CHECK(ostream.good());
  1899. }
  1900. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1901. }
  1902. int RtmpClientStream::Pause(bool pause_or_unpause, double offset_ms) {
  1903. butil::IOBuf req_buf;
  1904. {
  1905. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  1906. AMFOutputStream ostream(&zc_stream);
  1907. WriteAMFString(RTMP_AMF0_COMMAND_PAUSE, &ostream);
  1908. WriteAMFUint32(0, &ostream);
  1909. WriteAMFNull(&ostream);
  1910. WriteAMFBool(pause_or_unpause, &ostream);
  1911. WriteAMFNumber(offset_ms, &ostream);
  1912. CHECK(ostream.good());
  1913. }
  1914. return SendMessage(0, policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf);
  1915. }
  1916. void RtmpClientStream::OnStatus(const RtmpInfo& info) {
  1917. if (info.level() == RTMP_INFO_LEVEL_ERROR) {
  1918. LOG(WARNING) << remote_side() << '[' << stream_id()
  1919. << "] " << info.code() << ": " << info.description();
  1920. return SignalError();
  1921. } else if (info.level() == RTMP_INFO_LEVEL_STATUS) {
  1922. if ((!_options.play_name.empty() &&
  1923. info.code() == RTMP_STATUS_CODE_PLAY_START) ||
  1924. (!_options.publish_name.empty() &&
  1925. info.code() == RTMP_STATUS_CODE_PUBLISH_START)) {
  1926. // the memory fence makes sure that if _is_server_accepted is true,
  1927. // publish request must be sent (so that SendXXX functions can
  1928. // be enabled)
  1929. _is_server_accepted.store(true, butil::memory_order_release);
  1930. }
  1931. }
  1932. }
  1933. RtmpClientStreamOptions::RtmpClientStreamOptions()
  1934. : share_connection(true)
  1935. , wait_until_play_or_publish_is_sent(false)
  1936. , create_stream_max_retry(3)
  1937. , publish_type(RTMP_PUBLISH_LIVE) {
  1938. }
  1939. class OnClientStreamCreated : public google::protobuf::Closure {
  1940. public:
  1941. void Run(); // @Closure
  1942. void CancelBeforeCallMethod() { delete this; }
  1943. public:
  1944. Controller cntl;
  1945. // Hold a reference of stream to prevent it from destructing during an
  1946. // async Create().
  1947. butil::intrusive_ptr<RtmpClientStream> stream;
  1948. };
  1949. void OnClientStreamCreated::Run() {
  1950. std::unique_ptr<OnClientStreamCreated> delete_self(this);
  1951. if (cntl.Failed()) {
  1952. LOG(WARNING) << "Fail to create stream=" << stream->rtmp_url()
  1953. << ": " << cntl.ErrorText();
  1954. return;
  1955. }
  1956. if (stream->_created_stream_with_play_or_publish) {
  1957. // the server accepted the play/publish command packed in createStream
  1958. return;
  1959. }
  1960. const RtmpClientStreamOptions& options = stream->options();
  1961. bool do_nothing = true;
  1962. if (!options.play_name.empty()) {
  1963. do_nothing = false;
  1964. RtmpPlayOptions play_opt;
  1965. play_opt.stream_name = options.play_name;
  1966. if (stream->Play(play_opt) != 0) {
  1967. LOG(WARNING) << "Fail to play " << options.play_name;
  1968. return stream->SignalError();
  1969. }
  1970. }
  1971. if (!options.publish_name.empty()) {
  1972. do_nothing = false;
  1973. if (stream->Publish(options.publish_name, options.publish_type) != 0) {
  1974. LOG(WARNING) << "Fail to publish " << stream->rtmp_url();
  1975. return stream->SignalError();
  1976. }
  1977. }
  1978. if (do_nothing) {
  1979. LOG(ERROR) << "play_name and publish_name are both empty";
  1980. return stream->SignalError();
  1981. }
  1982. }
  1983. void RtmpClientStream::Init(const RtmpClient* client,
  1984. const RtmpClientStreamOptions& options) {
  1985. if (client->_impl == NULL) {
  1986. LOG(FATAL) << "RtmpClient is not initialized";
  1987. return OnStopInternal();
  1988. }
  1989. {
  1990. std::unique_lock<butil::Mutex> mu(_state_mutex);
  1991. if (_state == STATE_DESTROYING || _state == STATE_ERROR) {
  1992. // already Destroy()-ed or SignalError()-ed
  1993. LOG(WARNING) << "RtmpClientStream=" << this << " was already "
  1994. "Destroy()-ed, stop Init()";
  1995. return;
  1996. }
  1997. }
  1998. _client_impl = client->_impl;
  1999. _options = options;
  2000. OnClientStreamCreated* done = new OnClientStreamCreated;
  2001. done->stream.reset(this);
  2002. done->cntl.set_stream_creator(this);
  2003. done->cntl.set_connection_type(_options.share_connection ?
  2004. CONNECTION_TYPE_SINGLE :
  2005. CONNECTION_TYPE_SHORT);
  2006. _from_socketmap = (done->cntl.connection_type() == CONNECTION_TYPE_SINGLE);
  2007. done->cntl.set_max_retry(_options.create_stream_max_retry);
  2008. if (_options.hash_code.has_been_set()) {
  2009. done->cntl.set_request_code(_options.hash_code);
  2010. }
  2011. // Hack: we pass stream as response so that PackRtmpRequest can get
  2012. // the stream from controller.
  2013. google::protobuf::Message* res = (google::protobuf::Message*)this;
  2014. const CallId call_id = done->cntl.call_id();
  2015. {
  2016. std::unique_lock<butil::Mutex> mu(_state_mutex);
  2017. switch (_state) {
  2018. case STATE_UNINITIALIZED:
  2019. _state = STATE_CREATING;
  2020. _create_stream_rpc_id = call_id;
  2021. break;
  2022. case STATE_CREATING:
  2023. case STATE_CREATED:
  2024. mu.unlock();
  2025. LOG(ERROR) << "RtmpClientStream::Init() is called by multiple "
  2026. "threads simultaneously";
  2027. return done->CancelBeforeCallMethod();
  2028. case STATE_ERROR:
  2029. case STATE_DESTROYING:
  2030. mu.unlock();
  2031. return done->CancelBeforeCallMethod();
  2032. }
  2033. }
  2034. _client_impl->_chan.CallMethod(NULL, &done->cntl, NULL, res, done);
  2035. if (options.wait_until_play_or_publish_is_sent) {
  2036. Join(call_id);
  2037. }
  2038. }
  2039. std::string RtmpClientStream::rtmp_url() const {
  2040. if (_client_impl == NULL) {
  2041. return std::string();
  2042. }
  2043. butil::StringPiece tcurl = _client_impl->options().tcUrl;
  2044. butil::StringPiece stream_name = _options.stream_name();
  2045. std::string result;
  2046. result.reserve(tcurl.size() + 1 + stream_name.size());
  2047. result.append(tcurl.data(), tcurl.size());
  2048. result.push_back('/');
  2049. result.append(stream_name.data(), stream_name.size());
  2050. return result;
  2051. }
  2052. // ========= RtmpRetryingClientStream ============
  2053. RtmpRetryingClientStreamOptions::RtmpRetryingClientStreamOptions()
  2054. : retry_interval_ms(1000)
  2055. , max_retry_duration_ms(-1)
  2056. , fast_retry_count(2)
  2057. , quit_when_no_data_ever(true) {
  2058. }
  2059. RtmpRetryingClientStream::RtmpRetryingClientStream()
  2060. : RtmpStreamBase(true)
  2061. , _destroying(false)
  2062. , _called_on_stop(false)
  2063. , _changed_stream(false)
  2064. , _has_timer_ever(false)
  2065. , _is_server_accepted_ever(false)
  2066. , _num_fast_retries(0)
  2067. , _last_creation_time_us(0)
  2068. , _last_retry_start_time_us(0)
  2069. , _create_timer_id(0)
  2070. , _sub_stream_creator(NULL) {
  2071. get_rtmp_bvars()->retrying_client_stream_count << 1;
  2072. _self_ref.reset(this);
  2073. }
  2074. RtmpRetryingClientStream::~RtmpRetryingClientStream() {
  2075. delete _sub_stream_creator;
  2076. _sub_stream_creator = NULL;
  2077. get_rtmp_bvars()->retrying_client_stream_count << -1;
  2078. }
  2079. void RtmpRetryingClientStream::CallOnStopIfNeeded() {
  2080. // CallOnStop uses locks, we don't need memory fence on _called_on_stop,
  2081. // atomic ops is enough.
  2082. if (!_called_on_stop.load(butil::memory_order_relaxed) &&
  2083. !_called_on_stop.exchange(true, butil::memory_order_relaxed)) {
  2084. CallOnStop();
  2085. }
  2086. }
  2087. void RtmpRetryingClientStream::Destroy() {
  2088. if (_destroying.exchange(true, butil::memory_order_relaxed)) {
  2089. // Destroy() was already called.
  2090. return;
  2091. }
  2092. // Make sure _self_ref is released before quiting this function.
  2093. // Notice that _self_ref.reset(NULL) is wrong because it may destructs
  2094. // this object immediately.
  2095. butil::intrusive_ptr<RtmpRetryingClientStream> self_ref;
  2096. _self_ref.swap(self_ref);
  2097. butil::intrusive_ptr<RtmpStreamBase> old_sub_stream;
  2098. {
  2099. BAIDU_SCOPED_LOCK(_stream_mutex);
  2100. // swap instead of reset(NULL) to make the stream destructed
  2101. // outside _stream_mutex.
  2102. _using_sub_stream.swap(old_sub_stream);
  2103. }
  2104. if (old_sub_stream) {
  2105. old_sub_stream->Destroy();
  2106. }
  2107. if (_has_timer_ever) {
  2108. if (bthread_timer_del(_create_timer_id) == 0) {
  2109. // The callback is not run yet. Remove the additional ref added
  2110. // before creating the timer.
  2111. butil::intrusive_ptr<RtmpRetryingClientStream> deref(this, false);
  2112. }
  2113. }
  2114. return CallOnStopIfNeeded();
  2115. }
  2116. void RtmpRetryingClientStream::Init(
  2117. SubStreamCreator* sub_stream_creator,
  2118. const RtmpRetryingClientStreamOptions& options) {
  2119. if (sub_stream_creator == NULL) {
  2120. LOG(ERROR) << "sub_stream_creator is NULL";
  2121. return CallOnStopIfNeeded();
  2122. }
  2123. _sub_stream_creator = sub_stream_creator;
  2124. if (_destroying.load(butil::memory_order_relaxed)) {
  2125. LOG(WARNING) << "RtmpRetryingClientStream=" << this << " was already "
  2126. "Destroy()-ed, stop Init()";
  2127. return;
  2128. }
  2129. _options = options;
  2130. // retrying stream does not support this option.
  2131. _options.wait_until_play_or_publish_is_sent = false;
  2132. _last_retry_start_time_us = butil::gettimeofday_us();
  2133. Recreate();
  2134. }
  2135. void RetryingClientMessageHandler::OnPlayable() {
  2136. _parent->OnPlayable();
  2137. }
  2138. void RetryingClientMessageHandler::OnUserData(void* msg) {
  2139. _parent->CallOnUserData(msg);
  2140. }
  2141. void RetryingClientMessageHandler::OnCuePoint(brpc::RtmpCuePoint* cuepoint) {
  2142. _parent->CallOnCuePoint(cuepoint);
  2143. }
  2144. void RetryingClientMessageHandler::OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) {
  2145. _parent->CallOnMetaData(metadata, name);
  2146. }
  2147. void RetryingClientMessageHandler::OnAudioMessage(brpc::RtmpAudioMessage* msg) {
  2148. _parent->CallOnAudioMessage(msg);
  2149. }
  2150. void RetryingClientMessageHandler::OnVideoMessage(brpc::RtmpVideoMessage* msg) {
  2151. _parent->CallOnVideoMessage(msg);
  2152. }
  2153. void RetryingClientMessageHandler::OnSharedObjectMessage(RtmpSharedObjectMessage* msg) {
  2154. _parent->CallOnSharedObjectMessage(msg);
  2155. }
  2156. void RetryingClientMessageHandler::OnSubStreamStop(RtmpStreamBase* sub_stream) {
  2157. _parent->OnSubStreamStop(sub_stream);
  2158. }
  2159. RetryingClientMessageHandler::RetryingClientMessageHandler(RtmpRetryingClientStream* parent)
  2160. : _parent(parent) {}
  2161. void RtmpRetryingClientStream::Recreate() {
  2162. butil::intrusive_ptr<RtmpStreamBase> sub_stream;
  2163. _sub_stream_creator->NewSubStream(new RetryingClientMessageHandler(this), &sub_stream);
  2164. butil::intrusive_ptr<RtmpStreamBase> old_sub_stream;
  2165. bool destroying = false;
  2166. {
  2167. BAIDU_SCOPED_LOCK(_stream_mutex);
  2168. // Need to check _destroying to avoid setting the new sub_stream to a
  2169. // destroying retrying stream.
  2170. // Note: the load of _destroying and the setting of _using_sub_stream
  2171. // must be in the same lock, otherwise current bthread may be scheduled
  2172. // and Destroy() may be called, making new sub_stream leaked.
  2173. destroying = _destroying.load(butil::memory_order_relaxed);
  2174. if (!destroying) {
  2175. _using_sub_stream.swap(old_sub_stream);
  2176. _using_sub_stream = sub_stream;
  2177. _changed_stream = true;
  2178. }
  2179. }
  2180. if (old_sub_stream) {
  2181. old_sub_stream->Destroy();
  2182. }
  2183. if (destroying) {
  2184. sub_stream->Destroy();
  2185. return;
  2186. }
  2187. _last_creation_time_us = butil::gettimeofday_us();
  2188. // If Init() of sub_stream is called before setting _using_sub_stream,
  2189. // OnStop() may happen before _using_sub_stream is set and the stopped
  2190. // stream is wrongly left in the variable.
  2191. _sub_stream_creator->LaunchSubStream(sub_stream.get(), &_options);
  2192. }
  2193. void RtmpRetryingClientStream::OnRecreateTimer(void* arg) {
  2194. // Hold the referenced stream.
  2195. butil::intrusive_ptr<RtmpRetryingClientStream> ptr(
  2196. static_cast<RtmpRetryingClientStream*>(arg), false/*not add ref*/);
  2197. ptr->Recreate();
  2198. }
  2199. void RtmpRetryingClientStream::OnSubStreamStop(RtmpStreamBase* sub_stream) {
  2200. // Make sure the sub_stream is destroyed after this function.
  2201. DestroyingPtr<RtmpStreamBase> sub_stream_guard(sub_stream);
  2202. butil::intrusive_ptr<RtmpStreamBase> removed_sub_stream;
  2203. {
  2204. BAIDU_SCOPED_LOCK(_stream_mutex);
  2205. if (sub_stream == _using_sub_stream) {
  2206. _using_sub_stream.swap(removed_sub_stream);
  2207. }
  2208. }
  2209. if (removed_sub_stream == NULL ||
  2210. _destroying.load(butil::memory_order_relaxed) ||
  2211. _called_on_stop.load(butil::memory_order_relaxed)) {
  2212. return;
  2213. }
  2214. // Update _is_server_accepted_ever
  2215. if (sub_stream->is_server_accepted()) {
  2216. _is_server_accepted_ever = true;
  2217. }
  2218. if (_options.max_retry_duration_ms == 0) {
  2219. return CallOnStopIfNeeded();
  2220. }
  2221. // If the sub_stream has data ever, count this retry as the beginning
  2222. // of RtmpRetryingClientStreamOptions.max_retry_duration_ms.
  2223. if ((!_options.play_name.empty() && sub_stream->has_data_ever()) ||
  2224. (!_options.publish_name.empty() && sub_stream->is_server_accepted())) {
  2225. const int64_t now = butil::gettimeofday_us();
  2226. if (now >= _last_retry_start_time_us +
  2227. 3 * _options.retry_interval_ms * 1000L) {
  2228. // re-enable fast retries when the interval is long enough.
  2229. // `3' is just a randomly-chosen (small) number.
  2230. _num_fast_retries = 0;
  2231. }
  2232. _last_retry_start_time_us = now;
  2233. }
  2234. // Check max duration. Notice that this branch cannot be moved forward
  2235. // above branch which may update _last_retry_start_time_us
  2236. if (_options.max_retry_duration_ms > 0 &&
  2237. butil::gettimeofday_us() >
  2238. (_last_retry_start_time_us + _options.max_retry_duration_ms * 1000L)) {
  2239. // exceed the duration, stop retrying.
  2240. return CallOnStopIfNeeded();
  2241. }
  2242. if (_num_fast_retries < _options.fast_retry_count) {
  2243. ++_num_fast_retries;
  2244. // Retry immediately for several times. Works for scenarios like:
  2245. // restarting servers, occasional connection lost etc...
  2246. return Recreate();
  2247. }
  2248. if (_options.quit_when_no_data_ever &&
  2249. ((!_options.play_name.empty() && !has_data_ever()) ||
  2250. (!_options.publish_name.empty() && !_is_server_accepted_ever))) {
  2251. // Stop retrying when created playing streams never have data or
  2252. // publishing streams were never accepted. It's very likely that
  2253. // continuing retrying does not make sense.
  2254. return CallOnStopIfNeeded();
  2255. }
  2256. const int64_t wait_us = _last_creation_time_us +
  2257. _options.retry_interval_ms * 1000L - butil::gettimeofday_us();
  2258. if (wait_us > 0) {
  2259. // retry is too frequent, schedule the retry.
  2260. // Add a ref for OnRecreateTimer which does deref.
  2261. butil::intrusive_ptr<RtmpRetryingClientStream>(this).detach();
  2262. if (bthread_timer_add(&_create_timer_id,
  2263. butil::microseconds_from_now(wait_us),
  2264. OnRecreateTimer, this) != 0) {
  2265. LOG(ERROR) << "Fail to create timer";
  2266. return CallOnStopIfNeeded();
  2267. }
  2268. _has_timer_ever = true;
  2269. } else {
  2270. Recreate();
  2271. }
  2272. }
  2273. int RtmpRetryingClientStream::AcquireStreamToSend(
  2274. butil::intrusive_ptr<RtmpStreamBase>* ptr) {
  2275. BAIDU_SCOPED_LOCK(_stream_mutex);
  2276. if (!_using_sub_stream) {
  2277. errno = EPERM;
  2278. return -1;
  2279. }
  2280. if (!_using_sub_stream->is_server_accepted()) {
  2281. // not published yet.
  2282. errno = EPERM;
  2283. return -1;
  2284. }
  2285. if (_changed_stream) {
  2286. _changed_stream = false;
  2287. errno = ERTMPPUBLISHABLE;
  2288. return -1;
  2289. }
  2290. *ptr = _using_sub_stream;
  2291. return 0;
  2292. }
  2293. int RtmpRetryingClientStream::SendCuePoint(const RtmpCuePoint& obj) {
  2294. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2295. if (AcquireStreamToSend(&ptr) != 0) {
  2296. return -1;
  2297. }
  2298. return ptr->SendCuePoint(obj);
  2299. }
  2300. int RtmpRetryingClientStream::SendMetaData(const RtmpMetaData& obj, const butil::StringPiece& name) {
  2301. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2302. if (AcquireStreamToSend(&ptr) != 0) {
  2303. return -1;
  2304. }
  2305. return ptr->SendMetaData(obj, name);
  2306. }
  2307. int RtmpRetryingClientStream::SendSharedObjectMessage(
  2308. const RtmpSharedObjectMessage& msg) {
  2309. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2310. if (AcquireStreamToSend(&ptr) != 0) {
  2311. return -1;
  2312. }
  2313. return ptr->SendSharedObjectMessage(msg);
  2314. }
  2315. int RtmpRetryingClientStream::SendAudioMessage(const RtmpAudioMessage& msg) {
  2316. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2317. if (AcquireStreamToSend(&ptr) != 0) {
  2318. return -1;
  2319. }
  2320. return ptr->SendAudioMessage(msg);
  2321. }
  2322. int RtmpRetryingClientStream::SendAACMessage(const RtmpAACMessage& msg) {
  2323. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2324. if (AcquireStreamToSend(&ptr) != 0) {
  2325. return -1;
  2326. }
  2327. return ptr->SendAACMessage(msg);
  2328. }
  2329. int RtmpRetryingClientStream::SendVideoMessage(const RtmpVideoMessage& msg) {
  2330. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2331. if (AcquireStreamToSend(&ptr) != 0) {
  2332. return -1;
  2333. }
  2334. return ptr->SendVideoMessage(msg);
  2335. }
  2336. int RtmpRetryingClientStream::SendAVCMessage(const RtmpAVCMessage& msg) {
  2337. butil::intrusive_ptr<RtmpStreamBase> ptr;
  2338. if (AcquireStreamToSend(&ptr) != 0) {
  2339. return -1;
  2340. }
  2341. return ptr->SendAVCMessage(msg);
  2342. }
  2343. void RtmpRetryingClientStream::StopCurrentStream() {
  2344. butil::intrusive_ptr<RtmpStreamBase> sub_stream;
  2345. {
  2346. BAIDU_SCOPED_LOCK(_stream_mutex);
  2347. sub_stream = _using_sub_stream;
  2348. }
  2349. if (sub_stream) {
  2350. sub_stream->SignalError();
  2351. }
  2352. }
  2353. void RtmpRetryingClientStream::OnPlayable() {}
  2354. butil::EndPoint RtmpRetryingClientStream::remote_side() const {
  2355. {
  2356. BAIDU_SCOPED_LOCK(_stream_mutex);
  2357. if (_using_sub_stream) {
  2358. return _using_sub_stream->remote_side();
  2359. }
  2360. }
  2361. return butil::EndPoint();
  2362. }
  2363. butil::EndPoint RtmpRetryingClientStream::local_side() const {
  2364. {
  2365. BAIDU_SCOPED_LOCK(_stream_mutex);
  2366. if (_using_sub_stream) {
  2367. return _using_sub_stream->local_side();
  2368. }
  2369. }
  2370. return butil::EndPoint();
  2371. }
  2372. // =========== RtmpService ===============
  2373. void RtmpService::OnPingResponse(const butil::EndPoint&, uint32_t) {
  2374. // TODO: put into some bvars?
  2375. }
  2376. RtmpServerStream::RtmpServerStream()
  2377. : RtmpStreamBase(false)
  2378. , _client_supports_stream_multiplexing(false)
  2379. , _is_publish(false)
  2380. , _onfail_id(INVALID_BTHREAD_ID) {
  2381. get_rtmp_bvars()->server_stream_count << 1;
  2382. }
  2383. RtmpServerStream::~RtmpServerStream() {
  2384. get_rtmp_bvars()->server_stream_count << -1;
  2385. }
  2386. void RtmpServerStream::Destroy() {
  2387. CHECK(false) << "You're not supposed to call Destroy() for server-side streams";
  2388. }
  2389. void RtmpServerStream::OnPlay(const RtmpPlayOptions& opt,
  2390. butil::Status* status,
  2391. google::protobuf::Closure* done) {
  2392. ClosureGuard done_guard(done);
  2393. status->set_error(EPERM, "%s[%u] ignored play{stream_name=%s start=%f"
  2394. " duration=%f reset=%d}",
  2395. butil::endpoint2str(remote_side()).c_str(), stream_id(),
  2396. opt.stream_name.c_str(), opt.start, opt.duration,
  2397. (int)opt.reset);
  2398. }
  2399. void RtmpServerStream::OnPlay2(const RtmpPlay2Options& opt) {
  2400. LOG(ERROR) << remote_side() << '[' << stream_id()
  2401. << "] ignored play2{" << opt.ShortDebugString() << '}';
  2402. }
  2403. void RtmpServerStream::OnPublish(const std::string& name,
  2404. RtmpPublishType type,
  2405. butil::Status* status,
  2406. google::protobuf::Closure* done) {
  2407. ClosureGuard done_guard(done);
  2408. status->set_error(EPERM, "%s[%u] ignored publish{stream_name=%s type=%s}",
  2409. butil::endpoint2str(remote_side()).c_str(), stream_id(),
  2410. name.c_str(), RtmpPublishType2Str(type));
  2411. }
  2412. int RtmpServerStream::OnSeek(double offset_ms) {
  2413. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored seek("
  2414. << offset_ms << ")";
  2415. return -1;
  2416. }
  2417. int RtmpServerStream::OnPause(bool pause, double offset_ms) {
  2418. LOG(ERROR) << remote_side() << '[' << stream_id() << "] ignored "
  2419. << (pause ? "pause" : "unpause")
  2420. << "(offset_ms=" << offset_ms << ")";
  2421. return -1;
  2422. }
  2423. void RtmpServerStream::OnSetBufferLength(uint32_t /*buffer_length_ms*/) {}
  2424. int RtmpServerStream::SendStopMessage(const butil::StringPiece& error_desc) {
  2425. if (_rtmpsock == NULL) {
  2426. errno = EINVAL;
  2427. return -1;
  2428. }
  2429. if (FLAGS_rtmp_server_close_connection_on_error &&
  2430. !_client_supports_stream_multiplexing) {
  2431. _rtmpsock->SetFailed(EFAILEDSOCKET, "Close connection because %.*s",
  2432. (int)error_desc.size(), error_desc.data());
  2433. // The purpose is to close the connection, no matter what SetFailed()
  2434. // returns, the operation should be done.
  2435. LOG_IF(WARNING, FLAGS_log_error_text)
  2436. << "Close connection because " << error_desc;
  2437. return 0;
  2438. }
  2439. // Send StreamNotFound error to make the client close connections.
  2440. // Works for flashplayer and ffplay(not started playing), not work for SRS
  2441. // and ffplay(started playing)
  2442. butil::IOBuf req_buf;
  2443. RtmpInfo info;
  2444. {
  2445. butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
  2446. AMFOutputStream ostream(&zc_stream);
  2447. WriteAMFString(RTMP_AMF0_COMMAND_ON_STATUS, &ostream);
  2448. WriteAMFUint32(0, &ostream);
  2449. WriteAMFNull(&ostream);
  2450. if (_is_publish) {
  2451. // NetStream.Publish.Rejected does not work for ffmpeg, works for OBS.
  2452. // NetStream.Publish.BadName does not work for OBS.
  2453. // NetStream.Play.StreamNotFound is not accurate but works for both
  2454. // ffmpeg and OBS.
  2455. info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND);
  2456. } else {
  2457. info.set_code(RTMP_STATUS_CODE_STREAM_NOT_FOUND);
  2458. }
  2459. info.set_level(RTMP_INFO_LEVEL_ERROR);
  2460. if (!error_desc.empty()) {
  2461. info.set_description(error_desc.as_string());
  2462. }
  2463. WriteAMFObject(info, &ostream);
  2464. }
  2465. SocketMessagePtr<policy::RtmpUnsentMessage> msg(new policy::RtmpUnsentMessage);
  2466. msg->header.message_length = req_buf.size();
  2467. msg->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
  2468. msg->header.stream_id = _message_stream_id;
  2469. msg->chunk_stream_id = _chunk_stream_id;
  2470. msg->body = req_buf;
  2471. if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg) != 0) {
  2472. PLOG_IF(WARNING, errno != EFAILEDSOCKET)
  2473. << _rtmpsock->remote_side() << '[' << _message_stream_id
  2474. << "]: Fail to send " << info.code() << ": " << error_desc;
  2475. return -1;
  2476. }
  2477. LOG_IF(WARNING, FLAGS_log_error_text)
  2478. << _rtmpsock->remote_side() << '[' << _message_stream_id << "]: Sent "
  2479. << info.code() << ' ' << error_desc;
  2480. return 0;
  2481. }
  2482. // Call this method to send StreamDry to the client.
  2483. // Returns 0 on success, -1 otherwise.
  2484. int RtmpServerStream::SendStreamDry() {
  2485. char data[6];
  2486. char* p = data;
  2487. policy::WriteBigEndian2Bytes(&p, policy::RTMP_USER_CONTROL_EVENT_STREAM_DRY);
  2488. policy::WriteBigEndian4Bytes(&p, stream_id());
  2489. return SendControlMessage(policy::RTMP_MESSAGE_USER_CONTROL, data, sizeof(data));
  2490. }
  2491. int RtmpServerStream::RunOnFailed(bthread_id_t id, void* data, int) {
  2492. butil::intrusive_ptr<RtmpServerStream> stream(
  2493. static_cast<RtmpServerStream*>(data), false);
  2494. CHECK(stream->_rtmpsock);
  2495. stream->OnStopInternal();
  2496. bthread_id_unlock_and_destroy(id);
  2497. return 0;
  2498. }
  2499. void RtmpServerStream::OnStopInternal() {
  2500. if (_rtmpsock == NULL) {
  2501. return CallOnStop();
  2502. }
  2503. policy::RtmpContext* ctx =
  2504. static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
  2505. if (ctx == NULL) {
  2506. LOG(FATAL) << _rtmpsock->remote_side() << ": RtmpContext of "
  2507. << *_rtmpsock << " is NULL";
  2508. return CallOnStop();
  2509. }
  2510. if (ctx->RemoveMessageStream(this)) {
  2511. return CallOnStop();
  2512. }
  2513. }
  2514. butil::StringPiece RemoveRtmpPrefix(const butil::StringPiece& url_in) {
  2515. if (!url_in.starts_with("rtmp://")) {
  2516. return url_in;
  2517. }
  2518. butil::StringPiece url = url_in;
  2519. size_t i = 7;
  2520. for (; i < url.size() && url[i] == '/'; ++i);
  2521. url.remove_prefix(i);
  2522. return url;
  2523. }
  2524. butil::StringPiece RemoveProtocolPrefix(const butil::StringPiece& url_in) {
  2525. size_t proto_pos = url_in.find("://");
  2526. if (proto_pos == butil::StringPiece::npos) {
  2527. return url_in;
  2528. }
  2529. butil::StringPiece url = url_in;
  2530. size_t i = proto_pos + 3;
  2531. for (; i < url.size() && url[i] == '/'; ++i);
  2532. url.remove_prefix(i);
  2533. return url;
  2534. }
  2535. void ParseRtmpHostAndPort(const butil::StringPiece& host_and_port,
  2536. butil::StringPiece* host,
  2537. butil::StringPiece* port) {
  2538. size_t colon_pos = host_and_port.find(':');
  2539. if (colon_pos == butil::StringPiece::npos) {
  2540. if (host) {
  2541. *host = host_and_port;
  2542. }
  2543. if (port) {
  2544. *port = "1935";
  2545. }
  2546. } else {
  2547. if (host) {
  2548. *host = host_and_port.substr(0, colon_pos);
  2549. }
  2550. if (port) {
  2551. *port = host_and_port.substr(colon_pos + 1);
  2552. }
  2553. }
  2554. }
  2555. butil::StringPiece RemoveQueryStrings(const butil::StringPiece& stream_name_in,
  2556. butil::StringPiece* query_strings) {
  2557. const size_t qm_pos = stream_name_in.find('?');
  2558. if (qm_pos == butil::StringPiece::npos) {
  2559. if (query_strings) {
  2560. query_strings->clear();
  2561. }
  2562. return stream_name_in;
  2563. } else {
  2564. if (query_strings) {
  2565. *query_strings = stream_name_in.substr(qm_pos + 1);
  2566. }
  2567. return stream_name_in.substr(0, qm_pos);
  2568. }
  2569. }
  2570. // Split vhost from *app in forms of "APP?vhost=..." and overwrite *host.
  2571. static void SplitVHostFromApp(const butil::StringPiece& app_and_vhost,
  2572. butil::StringPiece* app,
  2573. butil::StringPiece* vhost) {
  2574. const size_t q_pos = app_and_vhost.find('?');
  2575. if (q_pos == butil::StringPiece::npos) {
  2576. if (app) {
  2577. *app = app_and_vhost;
  2578. }
  2579. if (vhost) {
  2580. vhost->clear();
  2581. }
  2582. return;
  2583. }
  2584. if (app) {
  2585. *app = app_and_vhost.substr(0, q_pos);
  2586. }
  2587. if (vhost) {
  2588. butil::StringPiece qstr = app_and_vhost.substr(q_pos + 1);
  2589. butil::StringSplitter sp(qstr.data(), qstr.data() + qstr.size(), '&');
  2590. for (; sp; ++sp) {
  2591. butil::StringPiece field(sp.field(), sp.length());
  2592. if (field.starts_with("vhost=")) {
  2593. *vhost = field.substr(6);
  2594. // vhost cannot have port.
  2595. const size_t colon_pos = vhost->find_last_of(':');
  2596. if (colon_pos != butil::StringPiece::npos) {
  2597. vhost->remove_suffix(vhost->size() - colon_pos);
  2598. }
  2599. return;
  2600. }
  2601. }
  2602. vhost->clear();
  2603. }
  2604. }
  2605. void ParseRtmpURL(const butil::StringPiece& rtmp_url_in,
  2606. butil::StringPiece* host,
  2607. butil::StringPiece* vhost,
  2608. butil::StringPiece* port,
  2609. butil::StringPiece* app,
  2610. butil::StringPiece* stream_name) {
  2611. if (stream_name) {
  2612. stream_name->clear();
  2613. }
  2614. butil::StringPiece rtmp_url = RemoveRtmpPrefix(rtmp_url_in);
  2615. size_t slash1_pos = rtmp_url.find_first_of('/');
  2616. if (slash1_pos == butil::StringPiece::npos) {
  2617. if (host || port) {
  2618. ParseRtmpHostAndPort(rtmp_url, host, port);
  2619. }
  2620. if (app) {
  2621. app->clear();
  2622. }
  2623. return;
  2624. }
  2625. if (host || port) {
  2626. ParseRtmpHostAndPort(rtmp_url.substr(0, slash1_pos), host, port);
  2627. }
  2628. // Remove duplicated slashes.
  2629. for (++slash1_pos; slash1_pos < rtmp_url.size() &&
  2630. rtmp_url[slash1_pos] == '/'; ++slash1_pos);
  2631. rtmp_url.remove_prefix(slash1_pos);
  2632. size_t slash2_pos = rtmp_url.find_first_of('/');
  2633. if (slash2_pos == butil::StringPiece::npos) {
  2634. return SplitVHostFromApp(rtmp_url, app, vhost);
  2635. }
  2636. SplitVHostFromApp(rtmp_url.substr(0, slash2_pos), app, vhost);
  2637. if (stream_name != NULL) {
  2638. // Remove duplicated slashes.
  2639. for (++slash2_pos; slash2_pos < rtmp_url.size() &&
  2640. rtmp_url[slash2_pos] == '/'; ++slash2_pos);
  2641. rtmp_url.remove_prefix(slash2_pos);
  2642. *stream_name = rtmp_url;
  2643. }
  2644. }
  2645. std::string MakeRtmpURL(const butil::StringPiece& host,
  2646. const butil::StringPiece& port,
  2647. const butil::StringPiece& app,
  2648. const butil::StringPiece& stream_name) {
  2649. std::string result;
  2650. result.reserve(15 + host.size() + app.size() + stream_name.size());
  2651. result.append("rtmp://");
  2652. result.append(host.data(), host.size());
  2653. if (!port.empty()) {
  2654. result.push_back(':');
  2655. result.append(port.data(), port.size());
  2656. }
  2657. if (!app.empty()) {
  2658. result.push_back('/');
  2659. result.append(app.data(), app.size());
  2660. }
  2661. if (!stream_name.empty()) {
  2662. if (app.empty()) { // extra / to notify user that app is empty.
  2663. result.push_back('/');
  2664. }
  2665. result.push_back('/');
  2666. result.append(stream_name.data(), stream_name.size());
  2667. }
  2668. return result;
  2669. }
  2670. } // namespace brpc