From 3d225973efd6fcc04d26e3822478423b18ec6f95 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 23 Apr 2021 10:42:15 +0800 Subject: [PATCH] Bridger: Support RTC2RTMP bridger and shared FastTimer. 4.0.95 --- README.md | 1 + trunk/conf/full.conf | 9 + trunk/src/app/srs_app_config.cpp | 43 +- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_listener.cpp | 4 + trunk/src/app/srs_app_rtc_conn.cpp | 22 + trunk/src/app/srs_app_rtc_source.cpp | 523 ++++++++++++++++++++++++ trunk/src/app/srs_app_rtc_source.hpp | 67 ++- trunk/src/app/srs_app_source.hpp | 2 +- trunk/src/core/srs_core_version4.hpp | 2 +- trunk/src/kernel/srs_kernel_rtc_rtp.cpp | 27 ++ trunk/src/kernel/srs_kernel_rtc_rtp.hpp | 2 + 12 files changed, 700 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c273502fba..2b56066842 100755 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ Other important wiki: ## V4 changes +* v5.0, 2021-04-20, Support RTC2RTMP bridger and shared FastTimer. 4.0.95 * v5.0, 2021-04-20, Refine transcoder to support aac2opus and opus2aac. 4.0.94 * v4.0, 2021-05-01, Timer: Extract and apply shared FastTimer. 4.0.93 * v4.0, 2021-04-29, RTC: Support AV1 for Chrome M90. 4.0.91 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 978b237d08..d894ed62e1 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -571,6 +571,15 @@ vhost rtc.vhost.srs.com { # discard Discard aac audio packet. # default: transcode aac transcode; + ############################################################### + # For transmuxing RTC to RTMP. + # Whether trans-mux RTC to RTMP streaming. + # Default: off + rtc_to_rtmp off; + # The PLI interval in seconds, for RTC to RTMP. + # Note the available range is [0.5, 30] + # Default: 6.0 + pli_for_rtmp 6.0; } ############################################################### # For transmuxing RTMP to RTC, it will impact the default values if RTC is on. diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 1428814966..8944954f5c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3940,7 +3940,8 @@ srs_error_t SrsConfig::check_normal_config() for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; if (m != "enabled" && m != "bframe" && m != "aac" && m != "stun_timeout" && m != "stun_strict_check" - && m != "dtls_role" && m != "dtls_version" && m != "drop_for_pt") { + && m != "dtls_role" && m != "dtls_version" && m != "drop_for_pt" && m != "rtc_to_rtmp" + && m != "pli_for_rtmp") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.rtc.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -5226,6 +5227,46 @@ int SrsConfig::get_rtc_drop_for_pt(string vhost) return ::atoi(conf->arg0().c_str()); } +bool SrsConfig::get_rtc_to_rtmp(string vhost) +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = get_rtc(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("rtc_to_rtmp"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +srs_utime_t SrsConfig::get_rtc_pli_for_rtmp(string vhost) +{ + static srs_utime_t DEFAULT = 6 * SRS_UTIME_SECONDS; + + SrsConfDirective* conf = get_rtc(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("pli_for_rtmp"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + srs_utime_t v = (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS); + if (v < 500 * SRS_UTIME_MILLISECONDS || v > 30 * SRS_UTIME_SECONDS) { + srs_warn("Reset pli %dms to %dms", srsu2msi(v), srsu2msi(DEFAULT)); + return DEFAULT; + } + + return v; +} + bool SrsConfig::get_rtc_nack_enabled(string vhost) { static bool DEFAULT = true; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 0e31bcb6da..95d507bd75 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -562,6 +562,8 @@ class SrsConfig std::string get_rtc_dtls_role(std::string vhost); std::string get_rtc_dtls_version(std::string vhost); int get_rtc_drop_for_pt(std::string vhost); + bool get_rtc_to_rtmp(std::string vhost); + srs_utime_t get_rtc_pli_for_rtmp(std::string vhost); bool get_rtc_nack_enabled(std::string vhost); bool get_rtc_nack_no_copy(std::string vhost); bool get_rtc_twcc_enabled(std::string vhost); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index c38609c9f5..0037a6d9c7 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -536,6 +536,10 @@ srs_error_t SrsUdpMuxListener::listen() srs_freep(trd); trd = new SrsSTCoroutine("udp", this, cid); + + //change stack size to 256K, fix crash when call some 3rd-part api. + ((SrsSTCoroutine*)trd)->set_stack_size(1 << 18); + if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start thread"); } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index c220f0f833..547ef0aa60 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -998,6 +998,28 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti } source->set_publish_stream(this); + // Bridge to rtmp + bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost); + if (rtc_to_rtmp) { + SrsSource *rtmp = NULL; + if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + // TODO: FIMXE: Check it in SrsRtcConnection::add_publisher? + if (!rtmp->can_publish(false)) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str()); + } + + SrsRtmpFromRtcBridger *bridger = new SrsRtmpFromRtcBridger(rtmp); + if ((err = bridger->initialize(r)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "create bridger"); + } + + source->set_bridger(bridger); + } + return err; } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 8570252bec..344d421683 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -328,6 +328,14 @@ ISrsRtcStreamEventHandler::~ISrsRtcStreamEventHandler() { } +ISrsRtcSourceBridger::ISrsRtcSourceBridger() +{ +} + +ISrsRtcSourceBridger::~ISrsRtcSourceBridger() +{ +} + SrsRtcStream::SrsRtcStream() { is_created_ = false; @@ -337,6 +345,7 @@ SrsRtcStream::SrsRtcStream() stream_desc_ = NULL; req = NULL; + bridger_ = NULL; } SrsRtcStream::~SrsRtcStream() @@ -346,6 +355,7 @@ SrsRtcStream::~SrsRtcStream() consumers.clear(); srs_freep(req); + srs_freep(bridger_); srs_freep(stream_desc_); } @@ -406,6 +416,12 @@ SrsContextId SrsRtcStream::pre_source_id() return _pre_source_id; } +void SrsRtcStream::set_bridger(ISrsRtcSourceBridger *bridger) +{ + srs_freep(bridger_); + bridger_ = bridger; +} + srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; @@ -475,6 +491,17 @@ srs_error_t SrsRtcStream::on_publish() return srs_error_wrap(err, "source id change"); } + // If bridge to other source, handle event and start timer to request PLI. + if (bridger_) { + if ((err = bridger_->on_publish()) != srs_success) { + return srs_error_wrap(err, "bridger on publish"); + } + + // For SrsRtcStream::on_timer() + srs_utime_t pli_for_rtmp = _srs_config->get_rtc_pli_for_rtmp(req->vhost); + _srs_hybrid->timer()->subscribe(pli_for_rtmp, this); + } + // TODO: FIXME: Handle by statistic. return err; @@ -502,6 +529,15 @@ void SrsRtcStream::on_unpublish() h->on_unpublish(); } + //free bridger resource + if (bridger_) { + // For SrsRtcStream::on_timer() + _srs_hybrid->timer()->unsubscribe(this); + + bridger_->on_unpublish(); + srs_freep(bridger_); + } + // release unpublish stream description. set_stream_desc(NULL); @@ -545,6 +581,10 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt) } } + if (bridger_ && (err = bridger_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "bridger consume message"); + } + return err; } @@ -586,6 +626,22 @@ std::vector SrsRtcStream::get_track_desc(std::string ty return track_descs; } +srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + if (!publish_stream_) { + return err; + } + + for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) { + SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i); + publish_stream_->request_keyframe(desc->ssrc_); + } + + return err; +} + SrsRtpPacketCacheHelper::SrsRtpPacketCacheHelper() { pkt = _srs_rtp_cache->allocate(); @@ -1172,6 +1228,473 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vectorinitialize(from, to, channels, sample_rate, bitrate)) != srs_success) { + return srs_error_wrap(err, "bridge initialize"); + } + + if ((err = format->initialize()) != srs_success) { + return srs_error_wrap(err, "format initialize"); + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::on_publish() +{ + srs_error_t err = srs_success; + + is_first_audio = true; + is_first_video = true; + + // TODO: FIXME: Should sync with bridger? + if ((err = source_->on_publish()) != srs_success) { + return srs_error_wrap(err, "source publish"); + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::on_rtp(SrsRtpPacket2 *pkt) +{ + srs_error_t err = srs_success; + + if (!pkt->payload()) { + return err; + } + + if (pkt->is_audio()) { + err = trancode_audio(pkt); + } else { + err = packet_video(pkt); + } + + return err; +} + +void SrsRtmpFromRtcBridger::on_unpublish() +{ + // TODO: FIXME: Should sync with bridger? + source_->on_unpublish(); +} + +srs_error_t SrsRtmpFromRtcBridger::trancode_audio(SrsRtpPacket2 *pkt) +{ + srs_error_t err = srs_success; + + // to common message. + uint32_t ts = pkt->header.get_timestamp()/(48000/1000); + if (is_first_audio) { + int header_len = 0; + uint8_t* header = NULL; + codec_->aac_codec_header(&header, &header_len); + + SrsCommonMessage out_rtmp; + packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio); + + if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + return srs_error_wrap(err, "source on audio"); + } + + is_first_audio = false; + } + + std::vector out_pkts; + SrsRtpRawPayload *payload = dynamic_cast(pkt->payload()); + + SrsAudioFrame frame; + frame.add_sample(payload->payload, payload->nn_payload); + frame.dts = ts; + frame.cts = 0; + + err = codec_->transcode(&frame, out_pkts); + if (err != srs_success) { + return err; + } + + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { + SrsCommonMessage out_rtmp; + out_rtmp.header.timestamp = (*it)->dts*(48000/1000); + packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio); + + if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + err = srs_error_wrap(err, "source on audio"); + break; + } + } + codec_->free_frames(out_pkts); + + return err; +} + +void SrsRtmpFromRtcBridger::packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header) +{ + int rtmp_len = len + 2; + audio->header.initialize_audio(rtmp_len, pts, 1); + audio->create_payload(rtmp_len); + SrsBuffer stream(audio->payload, rtmp_len); + uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo; + stream.write_1bytes(aac_flag); + if (is_header) { + stream.write_1bytes(0); + } else { + stream.write_1bytes(1); + } + stream.write_bytes(data, len); + audio->size = rtmp_len; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video(SrsRtpPacket2* src) +{ + srs_error_t err = srs_success; + + // TODO: Only copy when need + SrsRtpPacket2* pkt = src->copy(); + + if (pkt->is_keyframe()) { + return packet_video_key_frame(pkt); + } + + // store in cache + int index = cache_index(pkt->header.get_sequence()); + cache_video_pkts_[index].in_use = true; + cache_video_pkts_[index].pkt = pkt; + cache_video_pkts_[index].sn = pkt->header.get_sequence(); + cache_video_pkts_[index].ts = pkt->header.get_timestamp(); + + // check whether to recovery lost packet and can construct a video frame + if (lost_sn_ == pkt->header.get_sequence()) { + uint16_t tail_sn = 0; + int sn = find_next_lost_sn(lost_sn_, tail_sn); + if (-1 == sn ) { + if (check_frame_complete(header_sn_, tail_sn)) { + if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) { + err = srs_error_wrap(err, "fail to pack video frame"); + } + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = (uint16_t)sn; + } + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video_key_frame(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + // TODO: handle sps and pps in 2 rtp packets + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + SrsSample* sps = stap_payload->get_sps(); + SrsSample* pps = stap_payload->get_pps(); + if (NULL == sps || NULL == pps) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "no sps or pps in stap-a rtp. sps: %p, pps:%p", sps, pps); + } else { + //type_codec1 + avc_type + composition time + fix header + count of sps + len of sps + sps + count of pps + len of pps + pps + int nb_payload = 1 + 1 + 3 + 5 + 1 + 2 + sps->size + 1 + 2 + pps->size; + SrsCommonMessage rtmp; + rtmp.header.initialize_video(nb_payload, pkt->header.get_timestamp() / 90, 1); + rtmp.create_payload(nb_payload); + rtmp.size = nb_payload; + SrsBuffer payload(rtmp.payload, rtmp.size); + //TODO: call api + payload.write_1bytes(0x17);// type(4 bits): key frame; code(4bits): avc + payload.write_1bytes(0x0); // avc_type: sequence header + payload.write_1bytes(0x0); // composition time + payload.write_1bytes(0x0); + payload.write_1bytes(0x0); + payload.write_1bytes(0x01); // version + payload.write_1bytes(sps->bytes[1]); + payload.write_1bytes(sps->bytes[2]); + payload.write_1bytes(sps->bytes[3]); + payload.write_1bytes(0xff); + payload.write_1bytes(0xe1); + payload.write_2bytes(sps->size); + payload.write_bytes(sps->bytes, sps->size); + payload.write_1bytes(0x01); + payload.write_2bytes(pps->size); + payload.write_bytes(pps->bytes, pps->size); + if ((err = source_->on_video(&rtmp)) != srs_success) { + return err; + } + } + } + + if (-1 == key_frame_ts_) { + key_frame_ts_ = pkt->header.get_timestamp(); + header_sn_ = pkt->header.get_sequence(); + lost_sn_ = header_sn_ + 1; + // Received key frame and clean cache of old p frame pkts + clear_cached_video(); + srs_trace("set ts=%lld, header=%hu, lost=%hu", key_frame_ts_, header_sn_, lost_sn_); + } else if (key_frame_ts_ != pkt->header.get_timestamp()) { + //new key frame, clean cache + int64_t old_ts = key_frame_ts_; + uint16_t old_header_sn = header_sn_; + uint16_t old_lost_sn = lost_sn_; + key_frame_ts_ = pkt->header.get_timestamp(); + header_sn_ = pkt->header.get_sequence(); + lost_sn_ = header_sn_ + 1; + clear_cached_video(); + srs_trace("drop old ts=%lld, header=%hu, lost=%hu, set new ts=%lld, header=%hu, lost=%hu", + old_ts, old_header_sn, old_lost_sn, key_frame_ts_, header_sn_, lost_sn_); + } + + uint16_t index = cache_index(pkt->header.get_sequence()); + cache_video_pkts_[index].in_use = true; + cache_video_pkts_[index].pkt = pkt; + cache_video_pkts_[index].sn = pkt->header.get_sequence(); + cache_video_pkts_[index].ts = pkt->header.get_timestamp(); + + int32_t sn = lost_sn_; + uint16_t tail_sn = 0; + if (srs_rtp_seq_distance(header_sn_, pkt->header.get_sequence()) < 0){ + // When receive previous pkt in the same frame, update header sn; + header_sn_ = pkt->header.get_sequence(); + sn = find_next_lost_sn(header_sn_, tail_sn); + } else if (lost_sn_ == pkt->header.get_sequence()) { + sn = find_next_lost_sn(lost_sn_, tail_sn); + } + + if (-1 == sn) { + if (check_frame_complete(header_sn_, tail_sn)) { + if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) { + err = srs_error_wrap(err, "fail to packet key frame"); + } + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = (uint16_t)sn; + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const uint16_t end) +{ + srs_error_t err = srs_success; + + //type_codec1 + avc_type + composition time + nalu size + nalu + int nb_payload = 1 + 1 + 3; + uint16_t cnt = end - start + 1; + + for (uint16_t i = 0; i < cnt; ++i) { + uint16_t sn = start + i; + uint16_t index = cache_index(sn); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + // calculate nalu len + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + nb_payload += 1 + 4; + } + nb_payload += fua_payload->size; + continue; + } + + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + for (int j = 0; j < stap_payload->nalus.size(); ++j) { + SrsSample* sample = stap_payload->nalus.at(j); + nb_payload += 4 + sample->size; + } + continue; + } + + SrsRtpRawPayload* raw_payload = dynamic_cast(pkt->payload()); + if (raw_payload) { + nb_payload += 4 + raw_payload->nn_payload; + continue; + } + } + + SrsCommonMessage rtmp; + SrsRtpPacket2* header = cache_video_pkts_[cache_index(start)].pkt; + rtmp.header.initialize_video(nb_payload, header->header.get_timestamp() / 90, 1); + rtmp.create_payload(nb_payload); + rtmp.size = nb_payload; + SrsBuffer payload(rtmp.payload, rtmp.size); + if (header->is_keyframe()) { + payload.write_1bytes(0x17); // type(4 bits): key frame; code(4bits): avc + key_frame_ts_ = -1; + } else { + payload.write_1bytes(0x27); // type(4 bits): inter frame; code(4bits): avc + } + payload.write_1bytes(0x01); // avc_type: nalu + payload.write_1bytes(0x0); // composition time + payload.write_1bytes(0x0); + payload.write_1bytes(0x0); + + int nalu_len = 0; + for (uint16_t i = 0; i < cnt; ++i) { + uint16_t index = cache_index((start + i)); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + cache_video_pkts_[index].in_use = false; + cache_video_pkts_[index].pkt = NULL; + cache_video_pkts_[index].ts = 0; + cache_video_pkts_[index].sn = 0; + + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + nalu_len = fua_payload->size + 1; + //skip 4 bytes to write nalu_len future + payload.skip(4); + payload.write_1bytes(fua_payload->nri | fua_payload->nalu_type); + payload.write_bytes(fua_payload->payload, fua_payload->size); + } else { + nalu_len += fua_payload->size; + payload.write_bytes(fua_payload->payload, fua_payload->size); + if (fua_payload->end) { + //write nalu_len back + payload.skip(-(4 + nalu_len)); + payload.write_4bytes(nalu_len); + payload.skip(nalu_len); + } + } + srs_freep(pkt); + continue; + } + + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + for (int j = 0; j < stap_payload->nalus.size(); ++j) { + SrsSample* sample = stap_payload->nalus.at(j); + payload.write_4bytes(sample->size); + payload.write_bytes(sample->bytes, sample->size); + } + srs_freep(pkt); + continue; + } + + SrsRtpRawPayload* raw_payload = dynamic_cast(pkt->payload()); + if (raw_payload) { + payload.write_4bytes(raw_payload->nn_payload); + payload.write_bytes(raw_payload->payload, raw_payload->nn_payload); + srs_freep(pkt); + continue; + } + + srs_freep(pkt); + } + + if ((err = source_->on_video(&rtmp)) != srs_success) { + srs_warn("fail to pack video frame"); + } + + header_sn_ = end + 1; + uint16_t tail_sn = 0; + int sn = find_next_lost_sn(header_sn_, tail_sn); + if (-1 == sn) { + if (check_frame_complete(header_sn_, tail_sn)) { + err = packet_video_rtmp(header_sn_, tail_sn); + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = sn; + } + + return err; +} + +int32_t SrsRtmpFromRtcBridger::find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn) +{ + uint32_t last_ts = cache_video_pkts_[cache_index(header_sn_)].ts; + for (int i = 0; i < s_cache_size; ++i) { + uint16_t lost_sn = current_sn + i; + int index = cache_index(lost_sn); + + if (!cache_video_pkts_[index].in_use) { + return lost_sn; + } + //check time first, avoid two small frame mixed case decode fail + if (last_ts != cache_video_pkts_[index].ts) { + end_sn = lost_sn - 1; + return -1; + } + + if (cache_video_pkts_[index].pkt->header.get_marker()) { + end_sn = lost_sn; + return -1; + } + } + + srs_error("the cache is mess. the packet count of video frame is more than %u", s_cache_size); + return -2; +} + +void SrsRtmpFromRtcBridger::clear_cached_video() +{ + for (size_t i = 0; i < s_cache_size; i++) + { + if (cache_video_pkts_[i].in_use) { + srs_freep(cache_video_pkts_[i].pkt); + cache_video_pkts_[i].sn = 0; + cache_video_pkts_[i].ts = 0; + cache_video_pkts_[i].in_use = false; + } + } +} + +bool SrsRtmpFromRtcBridger::check_frame_complete(const uint16_t start, const uint16_t end) +{ + uint16_t cnt = (end - start + 1); + uint16_t fu_s_c = 0; + uint16_t fu_e_c = 0; + for (uint16_t i = 0; i < cnt; ++i) { + int index = cache_index((start + i)); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + ++fu_s_c; + } + + if (fua_payload->end) { + ++fu_e_c; + } + } + } + + return fu_s_c == fu_e_c; +} #endif SrsCodecPayload::SrsCodecPayload() diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index a7eb4abfc5..2543daa6fd 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -163,8 +163,20 @@ class ISrsRtcStreamEventHandler virtual void on_consumers_finished() = 0; }; +// SrsRtcStream bridge to SrsSource +class ISrsRtcSourceBridger +{ +public: + ISrsRtcSourceBridger(); + virtual ~ISrsRtcSourceBridger(); +public: + virtual srs_error_t on_publish() = 0; + virtual srs_error_t on_rtp(SrsRtpPacket2 *pkt) = 0; + virtual void on_unpublish() = 0; +}; + // A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream. -class SrsRtcStream +class SrsRtcStream : public ISrsFastTimer { private: // For publish, it's the publish client id. @@ -178,6 +190,8 @@ class SrsRtcStream ISrsRtcPublishStream* publish_stream_; // Steam description for this steam. SrsRtcStreamDescription* stream_desc_; + // The Source bridger, bridger stream to other source. + ISrsRtcSourceBridger* bridger_; private: // To delivery stream to clients. std::vector consumers; @@ -201,6 +215,8 @@ class SrsRtcStream // Get current source id. virtual SrsContextId source_id(); virtual SrsContextId pre_source_id(); +public: + void set_bridger(ISrsRtcSourceBridger *bridger); public: // Create consumer // @param consumer, output the create consumer. @@ -234,6 +250,9 @@ class SrsRtcStream bool has_stream_desc(); void set_stream_desc(SrsRtcStreamDescription* stream_desc); std::vector get_track_desc(std::string type, std::string media_type); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); }; // A helper class, to release the packet to cache. @@ -286,6 +305,52 @@ class SrsRtcFromRtmpBridger : public ISrsSourceBridger srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& helpers); srs_error_t consume_packets(std::vector& helpers); }; + +class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger +{ +private: + SrsSource *source_; + SrsAudioTranscoder *codec_; + bool is_first_audio; + bool is_first_video; + // The format, codec information. + SrsRtmpFormat* format; + + //TODO:use SrsRtpRingBuffer + //TODO:jitter buffer class + struct RtcPacketCache { + bool in_use; + uint16_t sn; + uint32_t ts; + SrsRtpPacket2* pkt; + }; + const static uint16_t s_cache_size = 512; + RtcPacketCache cache_video_pkts_[s_cache_size]; + uint16_t header_sn_; + uint16_t lost_sn_; + int64_t key_frame_ts_; +public: + SrsRtmpFromRtcBridger(SrsSource *src); + virtual ~SrsRtmpFromRtcBridger(); +public: + srs_error_t initialize(SrsRequest* r); +public: + virtual srs_error_t on_publish(); + virtual srs_error_t on_rtp(SrsRtpPacket2 *pkt); + virtual void on_unpublish(); +private: + srs_error_t trancode_audio(SrsRtpPacket2 *pkt); + void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header); + srs_error_t packet_video(SrsRtpPacket2* pkt); + srs_error_t packet_video_key_frame(SrsRtpPacket2* pkt); + srs_error_t packet_video_rtmp(const uint16_t start, const uint16_t end); + int32_t find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn); + void clear_cached_video(); + inline uint16_t cache_index(uint16_t current_sn) { + return current_sn%s_cache_size; + } + bool check_frame_complete(const uint16_t start, const uint16_t end); +}; #endif // TODO: FIXME: Rename it. diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 5244bbbfdc..4a59058951 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -487,7 +487,7 @@ class SrsSourceManager : public ISrsHourGlass // Global singleton instance. extern SrsSourceManager* _srs_sources; -// For two sources to bridge with each other. +// For RTMP2RTC, bridge SrsSource to SrsRtcStream class ISrsSourceBridger { public: diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index a4c3312f94..64f88dea16 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 94 +#define VERSION_REVISION 95 #endif diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index bcc04d766e..5474d9106f 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -1054,6 +1054,33 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf) return err; } +bool SrsRtpPacket2::is_keyframe() +{ + // False if audio packet + if(SrsFrameTypeAudio == frame_type) { + return false; + } + + // It's normal H264 video rtp packet + if (nalu_type == kStapA) { + SrsRtpSTAPPayload* stap_payload = dynamic_cast(payload_); + if(NULL != stap_payload->get_sps() || NULL != stap_payload->get_pps()) { + return true; + } + } else if (nalu_type == kFuA) { + SrsRtpFUAPayload2* fua_payload = dynamic_cast(payload_); + if(SrsAvcNaluTypeIDR == fua_payload->nalu_type) { + return true; + } + } else { + if((SrsAvcNaluTypeIDR == nalu_type) || (SrsAvcNaluTypeSPS == nalu_type) || (SrsAvcNaluTypePPS == nalu_type)) { + return true; + } + } + + return false; +} + SrsRtpObjectCacheManager* _srs_rtp_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpPacket2)); SrsRtpObjectCacheManager* _srs_rtp_raw_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpRawPayload)); SrsRtpObjectCacheManager* _srs_rtp_fua_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpFUAPayload2)); diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index c0da94f217..b00135e858 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -353,6 +353,8 @@ class SrsRtpPacket2 virtual uint64_t nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); +public: + bool is_keyframe(); }; // For object cache manager to stat the object dropped.