Skip to content

Commit

Permalink
update libwish whipinto whepfrom
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed Feb 4, 2024
1 parent b6df28b commit fff50aa
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 55 deletions.
53 changes: 27 additions & 26 deletions libs/libwish/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use webrtc::{
ice_transport::ice_server::RTCIceServer,
peer_connection::sdp::session_description::RTCSessionDescription,
};

#[derive(Clone)]
pub struct Client {
url: String,
defulat_headers: HeaderMap,
resource_url: Option<String>,
default_headers: HeaderMap,
}

impl Client {
Expand All @@ -39,48 +41,44 @@ impl Client {
pub fn new(url: String, defulat_headers: Option<HeaderMap>) -> Self {
Client {
url,
defulat_headers: defulat_headers.unwrap_or_default(),
resource_url: None,
default_headers: defulat_headers.unwrap_or_default(),
}
}

pub async fn get_answer(&self, sdp: String) -> Result<(RTCSessionDescription, String)> {
let mut header_map = self.defulat_headers.clone();
pub async fn wish(
&mut self,
sdp: String,
) -> Result<(RTCSessionDescription, Vec<RTCIceServer>)> {
let mut header_map = self.default_headers.clone();
header_map.insert("Content-Type", HeaderValue::from_str("application/sdp")?);
let response = request(self.url.clone(), "POST", header_map, sdp).await?;
if response.status() != StatusCode::CREATED {
return Err(anyhow::anyhow!(get_response_error(response).await));
}
let etag = response
let resource_url = response
.headers()
.get("E-Tag")
.ok_or_else(|| anyhow::anyhow!("response no E-Tag header"))?
.get("location")
.ok_or_else(|| anyhow::anyhow!("Response missing location header"))?
.to_str()?
.to_owned();
self.resource_url = Some(resource_url);
let ice_servers = Self::parse_ide_servers(&response)?;
let sdp =
RTCSessionDescription::answer(String::from_utf8(response.bytes().await?.to_vec())?)?;
Ok((sdp, etag))
Ok((sdp, ice_servers))
}

pub async fn get_ide_servers(&self) -> Result<Vec<RTCIceServer>> {
let response = request(
self.url.clone(),
"OPTIONS",
self.defulat_headers.clone(),
"",
)
.await?;
if response.status() != StatusCode::NO_CONTENT {
return Err(anyhow::anyhow!(get_response_error(response).await));
}
fn parse_ide_servers(response: &Response) -> Result<Vec<RTCIceServer>> {
let links = response.headers().get_all("Link");
let mut _ice_servers = vec![];
let mut ice_servers = vec![];
for link in links {
let link_header = parse_link_header::parse_with_rel(link.to_str()?)?;
for (rel, mut link) in link_header {
if &rel != "ice-server" {
continue;
}
_ice_servers.push(RTCIceServer {
ice_servers.push(RTCIceServer {
urls: vec![link
.uri
.to_string()
Expand All @@ -97,13 +95,16 @@ impl Client {
})
}
}
Ok(_ice_servers)
Ok(ice_servers)
}

pub async fn remove_resource(&self, key: String) -> Result<()> {
let mut header_map = self.defulat_headers.clone();
header_map.insert("If-Match", HeaderValue::from_str(&key)?);
let response = request(self.url.clone(), "DELETE", header_map, "").await?;
pub async fn remove_resource(&self) -> Result<()> {
let resource_url = self
.resource_url
.clone()
.ok_or(anyhow::anyhow!("there is no resource url"))?;
let header_map = self.default_headers.clone();
let response = request(resource_url, "DELETE", header_map, "").await?;
if response.status() != StatusCode::NO_CONTENT {
Err(anyhow::anyhow!(get_response_error(response).await))
} else {
Expand Down
35 changes: 20 additions & 15 deletions tools/whepfrom/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::{
signal,
sync::mpsc::{unbounded_channel, UnboundedSender},
};
use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
use webrtc::{
api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder},
ice_transport::ice_server::RTCIceServer,
Expand All @@ -30,7 +31,7 @@ use webrtc::{
const PREFIX_LIB: &str = "WEBRTC";

#[derive(Parser)]
#[command(author, version, about,long_about = None)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(short, long)]
target: String,
Expand Down Expand Up @@ -60,7 +61,7 @@ async fn main() -> Result<()> {
assert!((96..=127).contains(&payload_type));
let udp_socket = UdpSocket::bind("0.0.0.0:0").await?;
udp_socket.connect(&args.target).await?;
let client = Client::new(
let mut client = Client::new(
args.url,
Client::get_auth_header_map(args.auth_basic, args.auth_token),
);
Expand All @@ -75,8 +76,8 @@ async fn main() -> Result<()> {
let (complete_tx, mut complete_rx) = unbounded_channel();
let (send, mut recv) = unbounded_channel::<Vec<u8>>();

let (peer, etag) = webrtc_start(
client.clone(),
let peer = webrtc_start(
&mut client,
args.codec.into(),
send,
payload_type,
Expand Down Expand Up @@ -113,40 +114,37 @@ async fn main() -> Result<()> {
_= complete_rx.recv() => { }
_= signal::ctrl_c() => {}
}
let _ = client.remove_resource(etag).await;
let _ = client.remove_resource().await;
let _ = peer.close().await;
Ok(())
}

async fn webrtc_start(
client: Client,
client: &mut Client,
codec: RTCRtpCodecCapability,
send: UnboundedSender<Vec<u8>>,
payload_type: u8,
complete_tx: UnboundedSender<()>,
) -> Result<(Arc<RTCPeerConnection>, String)> {
let ide_servers = client.get_ide_servers().await?;
) -> Result<Arc<RTCPeerConnection>> {
let peer = new_peer(
RTCRtpCodecParameters {
capability: codec,
payload_type,
stats_id: Default::default(),
},
ide_servers,
complete_tx.clone(),
send,
)
.await?;
let offser = peer.create_offer(None).await?;
peer.set_local_description(offser.clone()).await?;
let (answer, etag) = client.get_answer(offser.sdp).await?;
let offer = peer.create_offer(None).await?;
let (answer, _ice_servers) = client.wish(offer.sdp.clone()).await?;
peer.set_local_description(offer.clone()).await?;
peer.set_remote_description(answer).await?;
Ok((peer, etag))
Ok(peer)
}

async fn new_peer(
codec: RTCRtpCodecParameters,
ice_servers: Vec<RTCIceServer>,
complete_tx: UnboundedSender<()>,
sender: UnboundedSender<Vec<u8>>,
) -> Result<Arc<RTCPeerConnection>> {
Expand All @@ -160,7 +158,14 @@ async fn new_peer(
.with_interceptor_registry(registry)
.build();
let config = RTCConfiguration {
ice_servers,
ice_servers: vec![{
RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_string()],
username: "".to_string(),
credential: "".to_string(),
credential_type: RTCIceCredentialType::Unspecified,
}
}],
..Default::default()
};
let peer = Arc::new(
Expand Down
34 changes: 20 additions & 14 deletions tools/whipinto/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::{
signal,
sync::mpsc::{unbounded_channel, UnboundedSender},
};
use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
use webrtc::{
api::{interceptor_registry::register_default_interceptors, media_engine::*, APIBuilder},
ice_transport::ice_server::RTCIceServer,
Expand All @@ -30,7 +31,7 @@ use webrtc::{
const PREFIX_LIB: &str = "WEBRTC";

#[derive(Parser)]
#[command(author, version, about,long_about = None)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(short, long, default_value_t = 0)]
port: u16,
Expand All @@ -56,7 +57,7 @@ async fn main() -> Result<()> {
let listener = UdpSocket::bind(format!("0.0.0.0:{}", args.port)).await?;
let port = listener.local_addr()?.port();
println!("=== RTP listener started : {} ===", port);
let client = Client::new(
let mut client = Client::new(
args.url,
Client::get_auth_header_map(args.auth_basic, args.auth_token),
);
Expand All @@ -74,7 +75,7 @@ async fn main() -> Result<()> {
}
});
let (complete_tx, mut complete_rx) = unbounded_channel();
let (peer, sender, etag) = webrtc_start(client.clone(), args.codec.into(), complete_tx.clone())
let (peer, sender) = webrtc_start(&mut client, args.codec.into(), complete_tx.clone())
.await
.map_err(|error| anyhow!(format!("[{}] {}", PREFIX_LIB, error)))?;
tokio::spawn(rtp_listener(listener, sender));
Expand Down Expand Up @@ -102,7 +103,7 @@ async fn main() -> Result<()> {
_= signal::ctrl_c() => {}
}
println!("RTP listener closed");
let _ = client.remove_resource(etag).await;
let _ = client.remove_resource().await;
let _ = peer.close().await;
Ok(())
}
Expand All @@ -116,22 +117,20 @@ async fn rtp_listener(socker: UdpSocket, sender: UnboundedSender<Vec<u8>>) {
}

async fn webrtc_start(
client: Client,
client: &mut Client,
codec: RTCRtpCodecCapability,
complete_tx: UnboundedSender<()>,
) -> Result<(Arc<RTCPeerConnection>, UnboundedSender<Vec<u8>>, String)> {
let ide_servers = client.get_ide_servers().await?;
let (peer, sender) = new_peer(codec, ide_servers, complete_tx.clone()).await?;
let offser = peer.create_offer(None).await?;
peer.set_local_description(offser.clone()).await?;
let (answer, etag) = client.get_answer(offser.sdp).await?;
) -> Result<(Arc<RTCPeerConnection>, UnboundedSender<Vec<u8>>)> {
let (peer, sender) = new_peer(codec, complete_tx.clone()).await?;
let offer = peer.create_offer(None).await?;
let (answer, _ice_servers) = client.wish(offer.sdp.clone()).await?;
peer.set_local_description(offer.clone()).await?;
peer.set_remote_description(answer).await?;
Ok((peer, sender, etag))
Ok((peer, sender))
}

async fn new_peer(
codec: RTCRtpCodecCapability,
ice_servers: Vec<RTCIceServer>,
complete_tx: UnboundedSender<()>,
) -> Result<(Arc<RTCPeerConnection>, UnboundedSender<Vec<u8>>)> {
let mut m = MediaEngine::default();
Expand All @@ -143,7 +142,14 @@ async fn new_peer(
.with_interceptor_registry(registry)
.build();
let config = RTCConfiguration {
ice_servers,
ice_servers: vec![{
RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_string()],
username: "".to_string(),
credential: "".to_string(),
credential_type: RTCIceCredentialType::Unspecified,
}
}],
..Default::default()
};

Expand Down

0 comments on commit fff50aa

Please sign in to comment.