diff --git a/src/forward/forward_internal.rs b/src/forward/forward_internal.rs index 70092d5c..0312de39 100644 --- a/src/forward/forward_internal.rs +++ b/src/forward/forward_internal.rs @@ -207,7 +207,7 @@ impl PeerForwardInternal { video_track_remotes } - async fn publish_svc_rids(&self) -> Result> { + pub async fn publish_svc_rids(&self) -> Result> { let anchor = self.anchor.read().await.as_ref().cloned(); if let Some(pc) = anchor { if let Some(rd) = pc.remote_description().await { diff --git a/src/forward/mod.rs b/src/forward/mod.rs index fefd3332..8ae86cd2 100644 --- a/src/forward/mod.rs +++ b/src/forward/mod.rs @@ -1,7 +1,7 @@ use std::io::Cursor; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Ok, Result}; use log::info; use tokio::sync::Mutex; use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit; @@ -13,6 +13,7 @@ use webrtc::rtp_transceiver::rtp_codec::RTPCodecType; use webrtc::sdp::{MediaDescription, SessionDescription}; use crate::forward::forward_internal::{get_peer_key, PeerForwardInternal}; +use crate::layer::Layer; use crate::AppError; use crate::{media, metrics}; @@ -155,8 +156,21 @@ impl PeerForward { pub async fn remove_peer(&self, key: String) -> Result { self.internal.remove_peer(key).await } -} + pub async fn layers(&self) -> Result> { + if self.internal.publish_is_svc().await { + let mut layers = vec![]; + for rid in self.internal.publish_svc_rids().await? { + layers.push(Layer { + encoding_id: rid.to_owned(), + }); + } + Ok(layers) + } else { + Err(anyhow::anyhow!("not layers")) + } + } +} async fn peer_complete( offer: RTCSessionDescription, peer: Arc, diff --git a/src/layer.rs b/src/layer.rs new file mode 100644 index 00000000..a6099d8d --- /dev/null +++ b/src/layer.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct Layer { + #[serde(rename = "encodingId")] + pub encoding_id: String, + // TODO Other fields +} diff --git a/src/main.rs b/src/main.rs index 9e14d1c6..8d8aff26 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,6 +31,7 @@ use crate::config::Config; mod auth; mod config; mod forward; +mod layer; mod media; mod metrics; mod path; @@ -186,14 +187,18 @@ async fn whep( return Err(anyhow::anyhow!("Content-Type must be application/sdp").into()); } let offer = RTCSessionDescription::offer(body)?; - let (answer, key) = state.paths.subscribe(id, offer).await?; - Ok(Response::builder() + let (answer, key) = state.paths.subscribe(id.clone(), offer).await?; + let mut builder = Response::builder() .status(StatusCode::CREATED) .header("Content-Type", "application/sdp") .header("Accept-Patch", "application/trickle-ice-sdpfrag") .header("E-Tag", key) - .header("Location", uri.to_string()) - .body(answer.sdp)?) + .header("Location", uri.to_string()); + if state.paths.layers(id).await.is_ok() { + builder = builder.header("Link", format!("<{}/layer>; rel=\"urn:ietf:params:whep:ext:core:layer\"", uri.to_string())) + .header("Link", format!("<{}/sse_info>; rel=\"urn:ietf:params:whep:ext:core:server-sent-events\"; events=\"layers\"", uri.to_string())) + } + Ok(builder.body(answer.sdp)?) } async fn add_ice_candidate( @@ -279,6 +284,7 @@ fn string_encoder(s: &impl ToString) -> String { } pub type AppResult = Result; + #[derive(Debug, Error)] pub enum AppError { #[error("resource not found:{0}")] @@ -304,11 +310,13 @@ impl IntoResponse for AppError { } } } + impl From for AppError { fn from(err: http::Error) -> Self { AppError::InternalServerError(err.into()) } } + impl From for AppError { fn from(err: ToStrError) -> Self { AppError::InternalServerError(err.into()) diff --git a/src/path/manager.rs b/src/path/manager.rs index ce5db736..59a95399 100644 --- a/src/path/manager.rs +++ b/src/path/manager.rs @@ -9,6 +9,7 @@ use webrtc::{ }; use crate::forward::PeerForward; +use crate::layer::Layer; use crate::AppError; #[derive(Clone)] @@ -91,4 +92,15 @@ impl Manager { } Ok(()) } + + pub async fn layers(&self, path: String) -> Result> { + let paths = self.paths.read().await; + let forward = paths.get(&path).cloned(); + drop(paths); + if let Some(forward) = forward { + forward.layers().await + } else { + Err(anyhow::anyhow!("resource not exists")) + } + } }