diff --git a/.github/CLA.md b/.github/CLA.md index 6713ef4d..0215b072 100644 --- a/.github/CLA.md +++ b/.github/CLA.md @@ -48,3 +48,4 @@ Example: - Wen Yang, @fastfeee, 2023/10/10 - Hongcha Zhang, @hongcha98, 2023/10/10 - Yuanyuan Zhou, @dfasdfasddf, 2023/11/01 +- Cooper Hzk, @CooperHash, 2024/03/07 diff --git a/Cargo.lock b/Cargo.lock index e64ae69d..80001d2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -781,29 +781,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "env_filter" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "humantime", - "log", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -1132,12 +1109,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "0.14.28" @@ -1311,22 +1282,23 @@ dependencies = [ "anyhow", "axum", "base64 0.21.7", - "env_logger", "http 1.0.0", "http-body 1.0.0", + "http-body-util", "hyper 1.2.0", "lazy_static", - "log", "md5", "mime_guess", "prometheus", "rust-embed", "serde", "serde_json", - "thiserror", "tokio", "toml", "tower-http", + "tracing", + "tracing-logfmt", + "tracing-subscriber", "webrtc", ] @@ -1346,6 +1318,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1466,6 +1447,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "nu-ansi-term" +version = "0.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c073d3c1930d0751774acf49e66653acecb416c3a54c6ec095a9b11caddb5a68" +dependencies = [ + "windows-sys 0.48.0", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -1550,6 +1550,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.11.1" @@ -1801,8 +1807,17 @@ checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.5", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1813,9 +1828,15 @@ checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -2193,6 +2214,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shellwords" version = "1.1.0" @@ -2408,6 +2438,16 @@ dependencies = [ "syn 2.0.50", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "time" version = "0.3.34" @@ -2604,9 +2644,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -2614,6 +2666,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-logfmt" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84bab42e40ace4e4ff19c92023ee1dbc1510db60976828fbbdc6994852c7d065" +dependencies = [ + "nu-ansi-term 0.49.0", + "time", + "tracing", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term 0.46.0", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -2730,6 +2825,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index be2cc93e..b0bf357b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,9 @@ description = "A very simple, high performance, edge WebRTC SFU" members = [".", "libs/cli", "libs/libwish", "tools/whepfrom", "tools/whipinto"] [dependencies] -axum = { version = "0.7.4", features = ["multipart"] } -tower-http = { version = "0.5.2", features = ["fs", "auth"] } +http-body-util = "0.1.0" +axum = { version = "0.7.4", features = ["multipart", "tracing"] } +tower-http = { version = "0.5.2", features = ["fs", "auth", "trace","cors"] } # TODO # There have error, Next commit can't work with obs studio @@ -20,8 +21,9 @@ webrtc = { git = "https://github.com/webrtc-rs/webrtc", rev = "3f34e2e055463e88f anyhow = { version = "1", features = ["backtrace"] } tokio = { version = "1.36", features = ["full"] } hyper = "1.2.0" -log = "0.4.20" -env_logger = "0.11.2" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-logfmt = { version = "0.3.3", features = ["ansi_logs"] } serde = { version = "1.0.188", features = ["serde_derive"] } toml = "0.8.10" serde_json = "1.0.114" @@ -32,7 +34,6 @@ mime_guess = "2.0.4" rust-embed = { version = "8.2.0", features = ["axum-ex"] } prometheus = "0.13.3" lazy_static = "1.4.0" -thiserror = "1" md5 = "0.7.0" # cargo install cargo-deb @@ -49,10 +50,34 @@ depends = "libc6 systemd" section = "utility" priority = "optional" assets = [ - ["target/release/live777", "usr/bin/", "755"], - ["target/release/whipinto", "usr/bin/", "755"], - ["target/release/whepfrom", "usr/bin/", "755"], - ["config-dist.toml", "etc/live777/config.toml", "644"], - ["live777.service", "usr/lib/systemd/system/live777.service", "644"], - ["README.md", "usr/share/doc/live777/README", "644"], + [ + "target/release/live777", + "usr/bin/", + "755", + ], + [ + "target/release/whipinto", + "usr/bin/", + "755", + ], + [ + "target/release/whepfrom", + "usr/bin/", + "755", + ], + [ + "config-dist.toml", + "etc/live777/config.toml", + "644", + ], + [ + "live777.service", + "usr/lib/systemd/system/live777.service", + "644", + ], + [ + "README.md", + "usr/share/doc/live777/README", + "644", + ], ] diff --git a/assets/index.html b/assets/index.html index 113c3c85..26188c14 100644 --- a/assets/index.html +++ b/assets/index.html @@ -14,6 +14,10 @@ padding: 0.5rem; margin: 0.5rem; } + + section { + margin: 0.5rem; + } @@ -32,8 +36,8 @@
-
Audio Device: -
Video Device: +
Audio Device:
+
Video Device:
diff --git a/config-dist.toml b/config-dist.toml index 7b6e4905..ea3e3a14 100644 --- a/config-dist.toml +++ b/config-dist.toml @@ -1,5 +1,9 @@ +[http] # Http Server Listen Address # listen = "[::]:7777" +# Cross-Origin Resource Sharing (CORS) +# reference: https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS +# cors = false [[ice_servers]] urls = [ @@ -30,3 +34,4 @@ urls = [ # Default: info # Values: off, error, warn, info, debug, trace # level = "warn" + diff --git a/src/config.rs b/src/config.rs index 537e3b8c..3367cecb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,18 +5,26 @@ use webrtc::{ ice_transport::{ice_credential_type::RTCIceCredentialType, ice_server::RTCIceServer}, Error, }; - #[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct Config { - #[serde(default = "default_listen")] - pub listen: String, + #[serde(default = "Http::default")] + pub http: Http, #[serde(default = "default_ice_servers")] pub ice_servers: Vec, #[serde(default)] pub auth: Auth, - #[serde(default = "default_log")] + #[serde(default = "Log::default")] pub log: Log, } + +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +pub struct Http { + #[serde(default = "default_http_listen")] + pub listen: String, + #[serde(default)] + pub cors: bool, +} + #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Auth { #[serde(default)] @@ -39,10 +47,19 @@ pub struct Log { pub level: String, } -fn default_listen() -> String { +fn default_http_listen() -> String { format!("[::]:{}", env::var("PORT").unwrap_or(String::from("7777"))) } +impl Http { + fn default() -> Self { + Self { + listen: default_http_listen(), + cors: Default::default(), + } + } +} + fn default_ice_servers() -> Vec { vec![IceServer { urls: vec!["stun:stun.l.google.com:19302".to_string()], @@ -52,9 +69,11 @@ fn default_ice_servers() -> Vec { }] } -fn default_log() -> Log { - Log { - level: default_log_level(), +impl Log { + fn default() -> Self { + Self { + level: default_log_level(), + } } } @@ -153,10 +172,10 @@ impl Config { } } else { Config { + http: Http::default(), ice_servers: default_ice_servers(), - listen: default_listen(), auth: Default::default(), - log: default_log(), + log: Log::default(), } } } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 00000000..b21d37d0 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,55 @@ +use axum::response::{IntoResponse, Response}; +use http::StatusCode; + +#[derive(Debug)] +pub enum AppError { + ResourceNotFound(String), + ResourceAlreadyExists(String), + Throw(String), + InternalServerError(anyhow::Error), +} + +impl AppError { + pub fn resource_not_fount(t: T) -> Self + where + T: ToString, + { + AppError::ResourceNotFound(t.to_string()) + } + + pub fn resource_already_exists(t: T) -> Self + where + T: ToString, + { + AppError::ResourceAlreadyExists(t.to_string()) + } + + pub fn throw(t: T) -> Self + where + T: ToString, + { + AppError::Throw(t.to_string()) + } +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + match self { + AppError::ResourceNotFound(err) => (StatusCode::NOT_FOUND, err).into_response(), + AppError::ResourceAlreadyExists(err) => (StatusCode::CONFLICT, err).into_response(), + AppError::InternalServerError(err) => { + (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response() + } + AppError::Throw(err) => (StatusCode::INTERNAL_SERVER_ERROR, err).into_response(), + } + } +} + +impl From for AppError +where + E: Into, +{ + fn from(err: E) -> Self { + AppError::InternalServerError(err.into()) + } +} diff --git a/src/forward/forward_internal.rs b/src/forward/forward_internal.rs index cb5ec617..7ec62601 100644 --- a/src/forward/forward_internal.rs +++ b/src/forward/forward_internal.rs @@ -1,9 +1,9 @@ use std::borrow::ToOwned; use std::sync::Arc; -use anyhow::Result; -use log::{debug, info}; +use crate::result::Result; use tokio::sync::{broadcast, RwLock}; +use tracing::{debug, info}; use webrtc::api::interceptor_registry::register_default_interceptors; use webrtc::api::media_engine::MediaEngine; use webrtc::api::setting_engine::SettingEngine; @@ -224,10 +224,9 @@ impl PeerForwardInternal { pub(crate) async fn set_publish(&self, peer: Arc) -> Result<()> { let mut publish = self.publish.write().await; if publish.is_some() { - return Err(AppError::ResourceAlreadyExists( - "A connection has already been established".to_string(), - ) - .into()); + return Err(AppError::resource_already_exists( + "A connection has already been established", + )); } let publish_peer = PublishRTCPeerConnection::new( self.id.clone(), @@ -247,7 +246,7 @@ impl PeerForwardInternal { return Ok(()); } if publish.as_ref().unwrap().id != get_peer_id(&peer) { - return Err(anyhow::anyhow!("publish not myself")); + return Err(AppError::throw("publish not myself")); } let mut publish_tracks = self.publish_tracks.write().await; publish_tracks.clear(); @@ -281,7 +280,7 @@ impl PeerForwardInternal { media_info: MediaInfo, ) -> Result> { if media_info.video_transceiver.0 > 1 && media_info.audio_transceiver.0 > 1 { - return Err(anyhow::anyhow!("sendonly is more than 1")); + return Err(AppError::throw("sendonly is more than 1")); } let mut m = MediaEngine::default(); m.register_default_codecs()?; @@ -367,10 +366,10 @@ impl PeerForwardInternal { media_info: MediaInfo, ) -> Result> { if !self.publish_is_some().await { - return Err(anyhow::anyhow!("publish is none")); + return Err(AppError::throw("publish is none")); } if media_info.video_transceiver.1 > 1 && media_info.audio_transceiver.1 > 1 { - return Err(anyhow::anyhow!("sendonly is more than 1")); + return Err(AppError::throw("sendonly is more than 1")); } let mut m = MediaEngine::default(); m.register_default_codecs()?; diff --git a/src/forward/mod.rs b/src/forward/mod.rs index 2f5f4b26..0f0e7d3b 100644 --- a/src/forward/mod.rs +++ b/src/forward/mod.rs @@ -1,9 +1,9 @@ use std::io::Cursor; use std::sync::Arc; -use anyhow::{Ok, Result}; -use log::info; +use crate::result::Result; use tokio::sync::Mutex; +use tracing::info; use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit; use webrtc::ice_transport::ice_server::RTCIceServer; use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; @@ -52,17 +52,15 @@ impl PeerForward { offer: RTCSessionDescription, ) -> Result<(RTCSessionDescription, String)> { if self.internal.publish_is_some().await { - return Err(AppError::ResourceAlreadyExists( - "A connection has already been established".to_string(), - ) - .into()); + return Err(AppError::resource_already_exists( + "A connection has already been established", + )); } let _ = self.publish_lock.lock().await; if self.internal.publish_is_some().await { - return Err(AppError::ResourceAlreadyExists( - "A connection has already been established".to_string(), - ) - .into()); + return Err(AppError::resource_already_exists( + "A connection has already been established", + )); } let peer = self .internal @@ -122,7 +120,7 @@ impl PeerForward { offer: RTCSessionDescription, ) -> Result<(RTCSessionDescription, String)> { if !self.internal.publish_is_ok().await { - return Err(anyhow::anyhow!("publish is not ok")); + return Err(AppError::throw("publish is not ok")); } let peer = self .internal @@ -191,7 +189,7 @@ impl PeerForward { } Ok(layers) } else { - Err(anyhow::anyhow!("not layers")) + Err(AppError::throw("not layers")) } } @@ -213,7 +211,7 @@ impl PeerForward { ) -> Result<()> { let codec_type = RTPCodecType::from(change_resource.kind.as_str()); if codec_type == RTPCodecType::Unspecified { - return Err(anyhow::anyhow!("kind unspecified")); + return Err(AppError::throw("kind unspecified")); } let rid = if change_resource.enabled { @@ -281,7 +279,7 @@ mod test { use crate::forward::parse_ice_candidate; #[test] - fn test_parse_ice_candidate() -> anyhow::Result<()> { + fn test_parse_ice_candidate() -> crate::result::Result<()> { let body = "a=ice-ufrag:EsAw a=ice-pwd:P2uYro0UCOQ4zxjKXaWCBui1 m=audio 9 RTP/AVP 0 diff --git a/src/forward/publish.rs b/src/forward/publish.rs index 20911e4f..d2e9bad8 100644 --- a/src/forward/publish.rs +++ b/src/forward/publish.rs @@ -1,8 +1,8 @@ use std::sync::{Arc, Weak}; use anyhow::{anyhow, Result}; -use log::debug; use tokio::sync::broadcast; +use tracing::debug; use webrtc::peer_connection::RTCPeerConnection; use crate::forward::rtcp::RtcpMessage; diff --git a/src/forward/subscribe.rs b/src/forward/subscribe.rs index 768f2ece..080988a1 100644 --- a/src/forward/subscribe.rs +++ b/src/forward/subscribe.rs @@ -1,17 +1,18 @@ use std::collections::HashMap; use std::sync::Arc; -use log::debug; use tokio::sync::{broadcast, RwLock}; +use tracing::debug; use webrtc::peer_connection::RTCPeerConnection; use webrtc::rtp_transceiver::rtp_codec::RTPCodecType; use webrtc::rtp_transceiver::rtp_sender::RTCRtpSender; use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP; use webrtc::track::track_local::TrackLocalWriter; +use crate::error::AppError; use crate::forward::rtcp::RtcpMessage; use crate::forward::track::ForwardData; -use crate::{constant, AppResult}; +use crate::{constant, result::Result}; use super::track::PublishTrackRemote; use super::{get_peer_id, info}; @@ -246,9 +247,9 @@ impl SubscribeRTCPeerConnection { info!("[{}] [{}] {} down", path, id, kind); } - pub(crate) fn select_kind_rid(&self, kind: RTPCodecType, rid: String) -> AppResult<()> { + pub(crate) fn select_kind_rid(&self, kind: RTPCodecType, rid: String) -> Result<()> { if let Err(err) = self.select_layer_sender.send((kind, rid)) { - Err(anyhow::anyhow!("select layer send err: {}", err).into()) + Err(AppError::throw(format!("select layer send err: {}", err))) } else { Ok(()) } diff --git a/src/forward/track.rs b/src/forward/track.rs index d33248aa..aa89d1a9 100644 --- a/src/forward/track.rs +++ b/src/forward/track.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use log::{debug, info}; use tokio::sync::broadcast; +use tracing::{debug, info}; use webrtc::rtp::packet::Packet; use webrtc::rtp_transceiver::rtp_codec::RTPCodecType; use webrtc::track::track_remote::TrackRemote; diff --git a/src/main.rs b/src/main.rs index d977d761..60d57b43 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,7 @@ -use std::collections::HashMap; -use std::future::IntoFuture; -use std::net::SocketAddr; -use std::str::FromStr; -use std::sync::Arc; - +use axum::body::{Body, Bytes}; +use axum::extract::Request; use axum::http::HeaderMap; - +use axum::middleware::Next; use axum::routing::get; use axum::Json; use axum::{ @@ -15,20 +11,30 @@ use axum::{ routing::post, Router, }; - +use error::AppError; use forward::info::Layer; -use http::header::ToStrError; use http::Uri; -use log::{debug, error, info}; -use thiserror::Error; +use http_body_util::BodyExt; +use std::collections::HashMap; +use std::future::IntoFuture; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; #[cfg(debug_assertions)] +use tower_http::cors::CorsLayer; use tower_http::services::{ServeDir, ServeFile}; +use tower_http::trace::TraceLayer; use tower_http::validate_request::ValidateRequestHeaderLayer; +use tracing::info_span; +use tracing::{debug, error, info}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use crate::auth::ManyValidate; use crate::config::Config; use crate::dto::req::{ChangeResource, SelectLayer}; +use crate::result::Result; use config::IceServer; use path::manager::Manager; #[cfg(not(debug_assertions))] @@ -38,9 +44,11 @@ mod auth; mod config; mod constant; mod dto; +mod error; mod forward; mod metrics; mod path; +mod result; mod signal; #[tokio::main] @@ -51,18 +59,15 @@ async fn main() { metrics::REGISTRY .register(Box::new(metrics::SUBSCRIBE.clone())) .unwrap(); - let cfg = Config::parse(); - env_logger::builder() - .filter_module( - "live777", - log::LevelFilter::from_str(cfg.log.level.as_str()).unwrap(), + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| format!("live777={},webrtc=error", cfg.log.level).into()), ) - .filter_module("webrtc", log::LevelFilter::Error) - .write_style(env_logger::WriteStyle::Auto) - .target(env_logger::Target::Stdout) + .with(tracing_logfmt::layer()) .init(); - let addr = SocketAddr::from_str(&cfg.listen).expect("invalid listen address"); + let addr = SocketAddr::from_str(&cfg.http.listen).expect("invalid listen address"); info!("Server listening on {}", addr); let ice_servers = cfg .ice_servers @@ -90,7 +95,21 @@ async fn main() { ) .layer(auth_layer) .route("/metrics", get(metrics)) - .with_state(app_state); + .with_state(app_state) + .layer(cors_layer(cfg.http.cors)) + .layer(axum::middleware::from_fn(print_request_response)) + .layer( + TraceLayer::new_for_http().make_span_with(|request: &Request<_>| { + let span = info_span!( + "http_request", + uri = ?request.uri(), + method = ?request.method(), + span_id = tracing::field::Empty, + ); + span.record("span_id", span.id().unwrap().into_u64()); + span + }), + ); tokio::select! { Err(e) = axum::serve(tokio::net::TcpListener::bind(&addr).await.unwrap(), static_server(app)).into_future() => error!("Application error: {e}"), msg = signal::wait_for_stop_signal() => debug!("Received signal: {}", msg), @@ -104,6 +123,14 @@ async fn metrics() -> String { .unwrap() } +fn cors_layer(cfg: bool) -> CorsLayer { + if cfg { + CorsLayer::permissive() + } else { + CorsLayer::new() + } +} + #[cfg(not(debug_assertions))] #[derive(RustEmbed)] #[folder = "assets/"] @@ -143,12 +170,56 @@ struct AppState { paths: Arc, } +async fn print_request_response( + req: Request, + next: Next, +) -> std::result::Result { + let req_headers = req.headers().clone(); + let (parts, body) = req.into_parts(); + let bytes = buffer_and_print("request", req_headers, body).await?; + let req = Request::from_parts(parts, Body::from(bytes)); + + let res = next.run(req).await; + let res_headers = res.headers().clone(); + let (parts, body) = res.into_parts(); + let bytes = buffer_and_print("response", res_headers, body).await?; + let res = Response::from_parts(parts, Body::from(bytes)); + + Ok(res) +} + +async fn buffer_and_print( + direction: &str, + headers: HeaderMap, + body: B, +) -> std::result::Result +where + B: axum::body::HttpBody, + B::Error: std::fmt::Display, +{ + let bytes = match body.collect().await { + Ok(collected) => collected.to_bytes(), + Err(err) => { + return Err(( + StatusCode::BAD_REQUEST, + format!("failed to read {direction} body: {err}"), + )); + } + }; + + if let Ok(body) = std::str::from_utf8(&bytes) { + tracing::debug!("{direction} headers = {headers:?} body = {body:?}"); + } + + Ok(bytes) +} + async fn whip( State(state): State, Path(id): Path, header: HeaderMap, body: String, -) -> AppResult> { +) -> Result> { let content_type = header .get("Content-Type") .ok_or(anyhow::anyhow!("Content-Type is required"))?; @@ -173,7 +244,7 @@ async fn whep( Path(id): Path, header: HeaderMap, body: String, -) -> AppResult> { +) -> Result> { let content_type = header .get("Content-Type") .ok_or(anyhow::anyhow!("Content-Type is required"))?; @@ -207,7 +278,7 @@ async fn add_ice_candidate( Path((id, key)): Path<(String, String)>, header: HeaderMap, body: String, -) -> AppResult> { +) -> Result> { let content_type = header .get("Content-Type") .ok_or(AppError::from(anyhow::anyhow!("Content-Type is required")))?; @@ -224,7 +295,7 @@ async fn remove_path_key( State(state): State, Path((id, key)): Path<(String, String)>, _uri: Uri, -) -> AppResult> { +) -> Result> { state.paths.remove_path_key(id, key).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -235,7 +306,7 @@ async fn change_resource( State(state): State, Path((id, key)): Path<(String, String)>, Json(dto): Json, -) -> AppResult>> { +) -> Result>> { state.paths.change_resource(id, key, dto).await?; Ok(Json(HashMap::new())) } @@ -243,7 +314,7 @@ async fn change_resource( async fn get_layer( State(state): State, Path((id, _key)): Path<(String, String)>, -) -> AppResult>> { +) -> Result>> { let layers = state.paths.layers(id).await?; Ok(Json(layers)) } @@ -252,7 +323,7 @@ async fn select_layer( State(state): State, Path((id, key)): Path<(String, String)>, Json(layer): Json, -) -> AppResult { +) -> Result { state .paths .select_layer( @@ -267,7 +338,7 @@ async fn select_layer( async fn un_select_layer( State(state): State, Path((id, key)): Path<(String, String)>, -) -> AppResult { +) -> Result { state .paths .select_layer( @@ -310,55 +381,3 @@ fn string_encoder(s: &impl ToString) -> String { s[1..s.len() - 1].to_string() } -pub type AppResult = Result; - -#[derive(Debug, Error)] -pub enum AppError { - #[error("resource not found:{0}")] - ResourceNotFound(String), - #[error("resource already exists:{0}")] - ResourceAlreadyExists(String), - #[error("internal server error")] - InternalServerError(anyhow::Error), -} - -impl IntoResponse for AppError { - fn into_response(self) -> Response { - match self { - AppError::ResourceNotFound(err) => { - (StatusCode::NOT_FOUND, err.to_string()).into_response() - } - AppError::InternalServerError(err) => { - debug!("{:?}", err); - (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response() - } - AppError::ResourceAlreadyExists(err) => { - (StatusCode::CONFLICT, err.to_string()).into_response() - } - } - } -} - -impl From for AppError { - fn from(err: http::Error) -> Self { - AppError::InternalServerError(err.into()) - } -} - -impl From for AppError { - fn from(err: ToStrError) -> Self { - AppError::InternalServerError(err.into()) - } -} - -impl From for AppError { - fn from(err: webrtc::Error) -> Self { - AppError::InternalServerError(err.into()) - } -} - -impl From for AppError { - fn from(err: anyhow::Error) -> Self { - AppError::InternalServerError(err) - } -} diff --git a/src/path/manager.rs b/src/path/manager.rs index e240f7cf..7d46621f 100644 --- a/src/path/manager.rs +++ b/src/path/manager.rs @@ -1,8 +1,8 @@ use std::{collections::HashMap, sync::Arc}; -use anyhow::Result; -use log::info; +use crate::result::Result; use tokio::sync::RwLock; +use tracing::info; use webrtc::{ ice_transport::ice_server::RTCIceServer, peer_connection::sdp::session_description::RTCSessionDescription, @@ -40,7 +40,7 @@ impl Manager { let (sdp, key) = forward.set_publish(offer).await?; let mut paths = self.paths.write().await; if paths.contains_key(&path) { - return Err(anyhow::anyhow!("resource already exists")); + return Err(AppError::resource_already_exists("resource already exists")); } info!("add path : {}", path); paths.insert(path, forward); @@ -55,11 +55,9 @@ impl Manager { if let Some(forward) = forward { forward.add_subscribe(offer).await } else { - Err(AppError::ResourceNotFound( - ("The requested resource not exist,please check the path and try again.") - .to_string(), - ) - .into()) + Err(AppError::resource_not_fount( + "The requested resource not exist,please check the path and try again.", + )) } } @@ -75,7 +73,7 @@ impl Manager { if let Some(forward) = forward { forward.add_ice_candidate(key, ice_candidates).await } else { - Err(anyhow::anyhow!("resource not exists")) + Err(AppError::resource_not_fount("resource not exists")) } } @@ -101,7 +99,7 @@ impl Manager { if let Some(forward) = forward { forward.layers().await } else { - Err(anyhow::anyhow!("resource not exists")) + Err(AppError::resource_not_fount("resource not exists")) } } @@ -117,7 +115,7 @@ impl Manager { if let Some(forward) = forward { forward.select_layer(key, layer).await } else { - Err(anyhow::anyhow!("resource not exists")) + Err(AppError::resource_not_fount("resource not exists")) } } @@ -133,7 +131,7 @@ impl Manager { if let Some(forward) = forward { forward.change_resource(key, change_resource).await } else { - Err(anyhow::anyhow!("resource not exists")) + Err(AppError::resource_not_fount("resource not exists")) } } } diff --git a/src/result.rs b/src/result.rs new file mode 100644 index 00000000..aa5ba226 --- /dev/null +++ b/src/result.rs @@ -0,0 +1,5 @@ +use std::result; + +use crate::error::AppError; + +pub type Result = result::Result; diff --git a/tools/whepfrom/src/main.rs b/tools/whepfrom/src/main.rs index 30a72a13..d8c6871f 100644 --- a/tools/whepfrom/src/main.rs +++ b/tools/whepfrom/src/main.rs @@ -137,9 +137,19 @@ async fn webrtc_start( ) .await?; let offer = peer.create_offer(None).await?; - let (answer, _ice_servers) = client.wish(offer.sdp.clone()).await?; - peer.set_local_description(offer.clone()).await?; - peer.set_remote_description(answer).await?; + + let mut gather_complete = peer.gathering_complete_promise().await; + peer.set_local_description(offer).await?; + let _ = gather_complete.recv().await; + + let (answer, _ice_servers) = client + .wish(peer.local_description().await.unwrap().sdp.clone()) + .await?; + + peer.set_remote_description(answer) + .await + .map_err(|error| anyhow!(format!("{:?}: {}", error, error)))?; + Ok(peer) } diff --git a/tools/whipinto/src/main.rs b/tools/whipinto/src/main.rs index a23954d5..09ce3d3f 100644 --- a/tools/whipinto/src/main.rs +++ b/tools/whipinto/src/main.rs @@ -123,9 +123,19 @@ async fn webrtc_start( ) -> Result<(Arc, UnboundedSender>)> { let (peer, sender) = new_peer(codec, complete_tx.clone()).await?; let offer = peer.create_offer(None).await?; - let (answer, _ice_servers) = client.wish(offer.sdp.clone()).await?; - peer.set_local_description(offer.clone()).await?; - peer.set_remote_description(answer).await?; + + let mut gather_complete = peer.gathering_complete_promise().await; + peer.set_local_description(offer).await?; + let _ = gather_complete.recv().await; + + let (answer, _ice_servers) = client + .wish(peer.local_description().await.unwrap().sdp) + .await?; + + peer.set_remote_description(answer) + .await + .map_err(|error| anyhow!(format!("{:?}: {}", error, error)))?; + Ok((peer, sender)) }