From 414353110aa7046a9c7239eb9baaf7e6ed9997fc Mon Sep 17 00:00:00 2001 From: hongcha Date: Fri, 22 Dec 2023 21:07:35 +0800 Subject: [PATCH] fix: concurrent transfer --- src/forward/forward_internal.rs | 48 ++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/src/forward/forward_internal.rs b/src/forward/forward_internal.rs index 7ed76d0c..06269f0a 100644 --- a/src/forward/forward_internal.rs +++ b/src/forward/forward_internal.rs @@ -3,7 +3,7 @@ use std::hash::{Hash, Hasher}; use std::sync::{Arc, Weak}; use anyhow::Result; -use log::{debug, info}; +use log::{debug, info, warn}; use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedSender}; use tokio::sync::RwLock; use webrtc::api::interceptor_registry::register_default_interceptors; @@ -255,24 +255,34 @@ impl PeerForwardInternal { drop(subscription_group); let mut subscription_group = track_forward_original.subscription_group.write().await; - let sender = subscription_group.remove(&peer).unwrap(); - drop(subscription_group); - track_forward - .subscription_group - .write() - .await - .insert(peer.clone(), sender); - let _ = track_forward - .rtcp_send - .try_send(RtcpMessage::PictureLossIndication); - info!( - "[{}] [subscribe] [{}] select layer {} to {} ", - self.id, - peer.get_key(), - track_remote_original.0.rid(), - rid - ); - return Ok(()); + if let Some(sender) = subscription_group.remove(&peer) { + drop(subscription_group); + track_forward + .subscription_group + .write() + .await + .insert(peer.clone(), sender); + let _ = track_forward + .rtcp_send + .try_send(RtcpMessage::PictureLossIndication); + info!( + "[{}] [subscribe] [{}] select layer {} to {} success", + self.id, + peer.get_key(), + track_remote_original.0.rid(), + rid + ); + return Ok(()); + } else { + warn!( + "[{}] [subscribe] [{}] select layer {} to {} fail,concurrent transfer", + self.id, + peer.get_key(), + track_remote_original.0.rid(), + rid + ); + return Err(anyhow::anyhow!("concurrent transfer")); + } } } }