diff --git a/src/forward/forward_internal.rs b/src/forward/forward_internal.rs index 573ab991..f80996a5 100644 --- a/src/forward/forward_internal.rs +++ b/src/forward/forward_internal.rs @@ -126,6 +126,66 @@ impl PeerForwardInternal { } } + pub(crate) async fn add_ice_candidate( + &self, + key: String, + ice_candidates: Vec, + ) -> Result<()> { + let mut peers = self.subscribe_group.read().await.clone(); + let anchor = self.anchor.read().await.as_ref().cloned(); + if let Some(anchor) = anchor { + peers.push(PeerWrap(anchor)) + } + let mut peers: Vec = peers.into_iter().filter(|p| p.get_key() == key).collect(); + if peers.len() != 1 { + return Err(anyhow::anyhow!("find key peers size : {}", peers.len())); + } + let peer = peers.pop().unwrap(); + for ice_candidate in ice_candidates { + peer.0.add_ice_candidate(ice_candidate).await?; + } + Ok(()) + } + + pub(crate) async fn remove_peer(&self, key: String) -> Result { + let anchor = self.anchor.read().await; + if let Some(anchor) = anchor.as_ref() { + if get_peer_key(anchor.clone()) == key { + anchor.close().await?; + return Ok(true); + } + } + drop(anchor); + let peers = self.subscribe_group.read().await; + for peer in peers.iter() { + if peer.get_key() == key { + peer.0.close().await?; + break; + } + } + Ok(false) + } + + async fn peer_send_rtcp( + peer: Weak, + media_ssrc: u32, + mut recv: Receiver, + ) { + while let (Some(rtcp_message), Some(pc)) = (recv.recv().await, peer.upgrade()) { + debug!("ssrc : {} ,send rtcp : {:?}", media_ssrc, rtcp_message); + if pc + .write_rtcp(&[rtcp_message.to_rtcp_packet(media_ssrc)]) + .await + .is_err() + { + break; + } + } + } +} + +// anchor +impl PeerForwardInternal { pub(crate) async fn anchor_is_some(&self) -> bool { let anchor = self.anchor.read().await; anchor.is_some() @@ -136,17 +196,17 @@ impl PeerForwardInternal { let anchor_track_forward_map = self.anchor_track_forward_map.read().await; anchor.is_some() && anchor_track_forward_map.len() - == media::count_sends( - &anchor - .as_ref() - .unwrap() - .remote_description() - .await - .unwrap() - .unmarshal() - .unwrap() - .media_descriptions, - ) + == media::count_sends( + &anchor + .as_ref() + .unwrap() + .remote_description() + .await + .unwrap() + .unmarshal() + .unwrap() + .media_descriptions, + ) && anchor.as_ref().unwrap().connection_state() == RTCPeerConnectionState::Connected } @@ -156,7 +216,7 @@ impl PeerForwardInternal { return Err(AppError::ResourceAlreadyExists( "A connection has already been established".to_string(), ) - .into()); + .into()); } info!("[{}] [anchor] set {}", self.id, peer.get_stats_id()); *anchor = Some(peer); @@ -185,14 +245,6 @@ impl PeerForwardInternal { Ok(()) } - pub async fn add_subscribe(&self, peer: Arc) -> Result<()> { - let mut subscribe_peers = self.subscribe_group.write().await; - subscribe_peers.push(PeerWrap(peer.clone())); - info!("[{}] [subscribe] [{}] up", self.id, peer.get_stats_id()); - metrics::SUBSCRIBE.inc(); - Ok(()) - } - pub async fn publish_is_svc(&self) -> bool { self.publish_track_remotes(RTPCodecType::Video).await.len() > 1 } @@ -223,91 +275,6 @@ impl PeerForwardInternal { Err(anyhow::anyhow!("anchor svc rids error")) } - pub async fn select_layer(&self, key: String, layer: Option) -> Result<()> { - let rid = if let Some(layer) = layer { - layer.encoding_id - } else { - self.publish_svc_rids().await?[0].clone() - }; - let peer = self - .subscribe_group - .read() - .await - .iter() - .find(|p| p.get_key() == key) - .cloned(); - if let Some(peer) = peer { - let anchor_track_forward_map = self.anchor_track_forward_map.read().await; - for (track_remote, track_forward) in anchor_track_forward_map.iter() { - if track_remote.0.rid() == rid && track_remote.0.kind() == RTPCodecType::Video { - for (track_remote_original, track_forward_original) in - anchor_track_forward_map.iter() - { - if track_remote_original.0.kind() != RTPCodecType::Video { - continue; - } - let subscription_group = - track_forward_original.subscription_group.read().await; - if subscription_group.contains_key(&peer) { - if track_remote_original.0.rid() == rid { - return Ok(()); - } - drop(subscription_group); - let mut subscription_group = - track_forward_original.subscription_group.write().await; - if let Some(sender) = subscription_group.remove(&peer) { - drop(subscription_group); - track_forward - .subscription_group - .write() - .await - .insert(peer.clone(), sender); - let _ = track_forward - .rtcp_send - .try_send(RtcpMessage::PictureLossIndication); - info!( - "[{}] [subscribe] [{}] select layer {} to {} success", - self.id, - peer.get_key(), - track_remote_original.0.rid(), - rid - ); - return Ok(()); - } else { - warn!( - "[{}] [subscribe] [{}] select layer {} to {} fail,concurrent transfer", - self.id, - peer.get_key(), - track_remote_original.0.rid(), - rid - ); - return Err(anyhow::anyhow!("concurrent transfer")); - } - } - } - } - } - Err(anyhow::anyhow!("not found layer")) - } else { - Err(anyhow::anyhow!("not found key")) - } - } - - pub async fn remove_subscribe(&self, peer: Arc) -> Result<()> { - let peer_wrap = PeerWrap(peer.clone()); - for (_, track_forward) in self.anchor_track_forward_map.write().await.iter() { - let mut subscription_group = track_forward.subscription_group.write().await; - subscription_group.remove(&peer_wrap); - } - let mut subscribe_peers = self.subscribe_group.write().await; - let size = subscribe_peers.len(); - subscribe_peers.retain(|x| x != &peer_wrap); - if size != subscribe_peers.len() { - info!("[{}] [subscribe] [{}] down", self.id, peer.get_stats_id()); - metrics::SUBSCRIBE.dec(); - } - Ok(()) - } pub(crate) async fn new_publish_peer( &self, @@ -354,6 +321,73 @@ impl PeerForwardInternal { Ok(peer) } + pub(crate) async fn anchor_track_up( + &self, + peer: Arc, + track: Arc, + ) -> Result<()> { + let anchor = self.anchor.read().await; + if anchor.is_none() { + return Err(anyhow::anyhow!("anchor is none")); + } + if anchor.as_ref().unwrap().get_stats_id() != peer.get_stats_id() { + return Err(anyhow::anyhow!("anchor is not self")); + } + let (send, recv) = channel(1); + tokio::spawn(Self::peer_send_rtcp( + Arc::downgrade(&peer), + track.ssrc(), + recv, + )); + let mut anchor_track_forward_map = self.anchor_track_forward_map.write().await; + let handle = TrackForward { + rtcp_send: send, + subscription_group: Default::default(), + }; + anchor_track_forward_map.insert(TrackRemoteWrap(track.clone()), handle.clone()); + tokio::spawn(Self::anchor_track_forward( + self.id.clone(), + track, + handle.subscription_group, + )); + Ok(()) + } + + async fn anchor_track_forward( + id: String, + track: Arc, + subscription: SubscriptionGroup, + ) { + let mut b = vec![0u8; 1500]; + info!( + "[{}] [anchor] [track-{}-{}-{}] forward up", + id, + track.kind(), + track.ssrc(), + track.rid() + ); + while let Ok((rtp_packet, _)) = track.read(&mut b).await { + if let Ok(anchor_track_forward) = subscription.try_read() { + let packet = Arc::new(rtp_packet); + for (peer_wrap, sender) in anchor_track_forward.iter() { + if peer_wrap.0.connection_state() == RTCPeerConnectionState::Connected { + let _ = sender.send(packet.clone()); + } + } + } + } + info!( + "[{}] [anchor] [track-{}-{}-{}] forward down", + id, + track.kind(), + track.ssrc(), + track.rid() + ); + } +} + +// subscribe +impl PeerForwardInternal { pub(crate) async fn new_subscription_peer( &self, media_descriptions: Vec, @@ -455,6 +489,100 @@ impl PeerForwardInternal { Ok(send) } + pub async fn add_subscribe(&self, peer: Arc) -> Result<()> { + let mut subscribe_peers = self.subscribe_group.write().await; + subscribe_peers.push(PeerWrap(peer.clone())); + info!("[{}] [subscribe] [{}] up", self.id, peer.get_stats_id()); + metrics::SUBSCRIBE.inc(); + Ok(()) + } + + pub async fn remove_subscribe(&self, peer: Arc) -> Result<()> { + let peer_wrap = PeerWrap(peer.clone()); + for (_, track_forward) in self.anchor_track_forward_map.write().await.iter() { + let mut subscription_group = track_forward.subscription_group.write().await; + subscription_group.remove(&peer_wrap); + } + let mut subscribe_peers = self.subscribe_group.write().await; + let size = subscribe_peers.len(); + subscribe_peers.retain(|x| x != &peer_wrap); + if size != subscribe_peers.len() { + info!("[{}] [subscribe] [{}] down", self.id, peer.get_stats_id()); + metrics::SUBSCRIBE.dec(); + } + Ok(()) + } + + pub async fn select_layer(&self, key: String, layer: Option) -> Result<()> { + let rid = if let Some(layer) = layer { + layer.encoding_id + } else { + self.publish_svc_rids().await?[0].clone() + }; + let peer = self + .subscribe_group + .read() + .await + .iter() + .find(|p| p.get_key() == key) + .cloned(); + if let Some(peer) = peer { + let anchor_track_forward_map = self.anchor_track_forward_map.read().await; + for (track_remote, track_forward) in anchor_track_forward_map.iter() { + if track_remote.0.rid() == rid && track_remote.0.kind() == RTPCodecType::Video { + for (track_remote_original, track_forward_original) in + anchor_track_forward_map.iter() + { + if track_remote_original.0.kind() != RTPCodecType::Video { + continue; + } + let subscription_group = + track_forward_original.subscription_group.read().await; + if subscription_group.contains_key(&peer) { + if track_remote_original.0.rid() == rid { + return Ok(()); + } + drop(subscription_group); + let mut subscription_group = + track_forward_original.subscription_group.write().await; + if let Some(sender) = subscription_group.remove(&peer) { + drop(subscription_group); + track_forward + .subscription_group + .write() + .await + .insert(peer.clone(), sender); + let _ = track_forward + .rtcp_send + .try_send(RtcpMessage::PictureLossIndication); + info!( + "[{}] [subscribe] [{}] select layer {} to {} success", + self.id, + peer.get_key(), + track_remote_original.0.rid(), + rid + ); + return Ok(()); + } else { + warn!( + "[{}] [subscribe] [{}] select layer {} to {} fail,concurrent transfer", + self.id, + peer.get_key(), + track_remote_original.0.rid(), + rid + ); + return Err(anyhow::anyhow!("concurrent transfer")); + } + } + } + } + } + Err(anyhow::anyhow!("not found layer")) + } else { + Err(anyhow::anyhow!("not found key")) + } + } + async fn subscribe_read_rtcp( pc: Weak, sender: Arc, @@ -480,125 +608,4 @@ impl PeerForwardInternal { } } } - - pub(crate) async fn add_ice_candidate( - &self, - key: String, - ice_candidates: Vec, - ) -> Result<()> { - let mut peers = self.subscribe_group.read().await.clone(); - let anchor = self.anchor.read().await.as_ref().cloned(); - if let Some(anchor) = anchor { - peers.push(PeerWrap(anchor)) - } - let mut peers: Vec = peers.into_iter().filter(|p| p.get_key() == key).collect(); - if peers.len() != 1 { - return Err(anyhow::anyhow!("find key peers size : {}", peers.len())); - } - let peer = peers.pop().unwrap(); - for ice_candidate in ice_candidates { - peer.0.add_ice_candidate(ice_candidate).await?; - } - Ok(()) - } - - pub(crate) async fn remove_peer(&self, key: String) -> Result { - let anchor = self.anchor.read().await; - if let Some(anchor) = anchor.as_ref() { - if get_peer_key(anchor.clone()) == key { - anchor.close().await?; - return Ok(true); - } - } - drop(anchor); - let peers = self.subscribe_group.read().await; - for peer in peers.iter() { - if peer.get_key() == key { - peer.0.close().await?; - break; - } - } - Ok(false) - } - - pub(crate) async fn anchor_track_up( - &self, - peer: Arc, - track: Arc, - ) -> Result<()> { - let anchor = self.anchor.read().await; - if anchor.is_none() { - return Err(anyhow::anyhow!("anchor is none")); - } - if anchor.as_ref().unwrap().get_stats_id() != peer.get_stats_id() { - return Err(anyhow::anyhow!("anchor is not self")); - } - let (send, recv) = channel(1); - tokio::spawn(Self::peer_send_rtcp( - Arc::downgrade(&peer), - track.ssrc(), - recv, - )); - let mut anchor_track_forward_map = self.anchor_track_forward_map.write().await; - let handle = TrackForward { - rtcp_send: send, - subscription_group: Default::default(), - }; - anchor_track_forward_map.insert(TrackRemoteWrap(track.clone()), handle.clone()); - tokio::spawn(Self::anchor_track_forward( - self.id.clone(), - track, - handle.subscription_group, - )); - Ok(()) - } - - async fn anchor_track_forward( - id: String, - track: Arc, - subscription: SubscriptionGroup, - ) { - let mut b = vec![0u8; 1500]; - info!( - "[{}] [anchor] [track-{}-{}-{}] forward up", - id, - track.kind(), - track.ssrc(), - track.rid() - ); - while let Ok((rtp_packet, _)) = track.read(&mut b).await { - if let Ok(anchor_track_forward) = subscription.try_read() { - let packet = Arc::new(rtp_packet); - for (peer_wrap, sender) in anchor_track_forward.iter() { - if peer_wrap.0.connection_state() == RTCPeerConnectionState::Connected { - let _ = sender.send(packet.clone()); - } - } - } - } - info!( - "[{}] [anchor] [track-{}-{}-{}] forward down", - id, - track.kind(), - track.ssrc(), - track.rid() - ); - } - - async fn peer_send_rtcp( - peer: Weak, - media_ssrc: u32, - mut recv: Receiver, - ) { - while let (Some(rtcp_message), Some(pc)) = (recv.recv().await, peer.upgrade()) { - debug!("ssrc : {} ,send rtcp : {:?}", media_ssrc, rtcp_message); - if pc - .write_rtcp(&[rtcp_message.to_rtcp_packet(media_ssrc)]) - .await - .is_err() - { - break; - } - } - } -} +} \ No newline at end of file