Skip to content

Commit

Permalink
added multicast scouting ttl option (#1032)
Browse files Browse the repository at this point in the history
* added multicast scouting ttl option

* added warning for multicast ttl v6
  • Loading branch information
jeremiah-wooten authored May 28, 2024
1 parent 25f06bd commit 1d3a8ab
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
2 changes: 2 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@
address: "224.0.0.224:7446",
/// The network interface which should be used for multicast scouting
interface: "auto", // If not set or set to "auto" the interface if picked automatically
/// The time-to-live on multicast scouting packets
ttl: 1,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast.
/// Accepts a single value or different values for router, peer and client.
/// Each value is bit-or-like combinations of "peer", "router" and "client".
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod scouting {
pub const enabled: bool = true;
pub const address: ([u8; 4], u16) = ([224, 0, 0, 224], 7446);
pub const interface: &str = "auto";
pub const ttl: u32 = 1;
pub mod autoconnect {
pub const router: &crate::WhatAmIMatcher = // ""
&crate::WhatAmIMatcher::empty();
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ validated_struct::validator! {
address: Option<SocketAddr>,
/// The network interface which should be used for multicast scouting. `zenohd` will automatically select an interface if none is provided.
interface: Option<String>,
/// The time-to-live on multicast scouting packets. (default: 1)
pub ttl: Option<u32>,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery through UDP multicast.
#[serde(deserialize_with = "treat_error_as_none")]
autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
Expand Down
27 changes: 21 additions & 6 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ impl Runtime {
}

async fn start_client(&self) -> ZResult<()> {
let (peers, scouting, addr, ifaces, timeout) = {
let (peers, scouting, addr, ifaces, timeout, multicast_ttl) = {
let guard = self.state.config.lock();
(
guard.connect().endpoints().clone(),
unwrap_or_default!(guard.scouting().multicast().enabled()),
unwrap_or_default!(guard.scouting().multicast().address()),
unwrap_or_default!(guard.scouting().multicast().interface()),
std::time::Duration::from_millis(unwrap_or_default!(guard.scouting().timeout())),
unwrap_or_default!(guard.scouting().multicast().ttl()),
)
};
match peers.len() {
Expand All @@ -72,7 +73,7 @@ impl Runtime {
} else {
let sockets: Vec<UdpSocket> = ifaces
.into_iter()
.filter_map(|iface| Runtime::bind_ucast_port(iface).ok())
.filter_map(|iface| Runtime::bind_ucast_port(iface, multicast_ttl).ok())
.collect();
if sockets.is_empty() {
bail!("Unable to bind UDP port to any multicast interface!")
Expand Down Expand Up @@ -183,12 +184,16 @@ impl Runtime {
addr: SocketAddr,
ifaces: String,
) -> ZResult<()> {
let multicast_ttl = {
let guard = self.state.config.lock();
unwrap_or_default!(guard.scouting().multicast().ttl())
};
let ifaces = Runtime::get_interfaces(&ifaces);
let mcast_socket = Runtime::bind_mcast_port(&addr, &ifaces).await?;
let mcast_socket = Runtime::bind_mcast_port(&addr, &ifaces, multicast_ttl).await?;
if !ifaces.is_empty() {
let sockets: Vec<UdpSocket> = ifaces
.into_iter()
.filter_map(|iface| Runtime::bind_ucast_port(iface).ok())
.filter_map(|iface| Runtime::bind_ucast_port(iface, multicast_ttl).ok())
.collect();
if !sockets.is_empty() {
let this = self.clone();
Expand Down Expand Up @@ -525,7 +530,11 @@ impl Runtime {
}
}

pub async fn bind_mcast_port(sockaddr: &SocketAddr, ifaces: &[IpAddr]) -> ZResult<UdpSocket> {
pub async fn bind_mcast_port(
sockaddr: &SocketAddr,
ifaces: &[IpAddr],
multicast_ttl: u32,
) -> ZResult<UdpSocket> {
let socket = match Socket::new(Domain::IPV4, Type::DGRAM, None) {
Ok(socket) => socket,
Err(err) => {
Expand Down Expand Up @@ -603,14 +612,19 @@ impl Runtime {
// Must set to nonblocking according to the doc of tokio
// https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#notes
socket.set_nonblocking(true)?;
socket.set_multicast_ttl_v4(multicast_ttl)?;

if sockaddr.is_ipv6() && multicast_ttl > 1 {
tracing::warn!("UDP Multicast TTL has been set to a value greater than 1 on a socket bound to an IPv6 address. This might not have the desired effect");
}

// UdpSocket::from_std requires a runtime even though it's a sync function
let udp_socket = zenoh_runtime::ZRuntime::Net
.block_in_place(async { UdpSocket::from_std(socket.into()) })?;
Ok(udp_socket)
}

pub fn bind_ucast_port(addr: IpAddr) -> ZResult<UdpSocket> {
pub fn bind_ucast_port(addr: IpAddr, multicast_ttl: u32) -> ZResult<UdpSocket> {
let socket = match Socket::new(Domain::IPV4, Type::DGRAM, None) {
Ok(socket) => socket,
Err(err) => {
Expand All @@ -637,6 +651,7 @@ impl Runtime {
// Must set to nonblocking according to the doc of tokio
// https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#notes
socket.set_nonblocking(true)?;
socket.set_multicast_ttl_v4(multicast_ttl)?;

// UdpSocket::from_std requires a runtime even though it's a sync function
let udp_socket = zenoh_runtime::ZRuntime::Net
Expand Down
8 changes: 7 additions & 1 deletion zenoh/src/scouting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ fn scout(
tracing::trace!("scout({}, {})", what, &config);
let default_addr = SocketAddr::from(zenoh_config::defaults::scouting::multicast::address);
let addr = config.scouting.multicast.address().unwrap_or(default_addr);
let default_multicast_ttl = zenoh_config::defaults::scouting::multicast::ttl;
let multicast_ttl = config
.scouting
.multicast
.ttl
.unwrap_or(default_multicast_ttl);
let ifaces = config.scouting.multicast.interface().as_ref().map_or(
zenoh_config::defaults::scouting::multicast::interface,
|s| s.as_ref(),
Expand All @@ -320,7 +326,7 @@ fn scout(
if !ifaces.is_empty() {
let sockets: Vec<UdpSocket> = ifaces
.into_iter()
.filter_map(|iface| Runtime::bind_ucast_port(iface).ok())
.filter_map(|iface| Runtime::bind_ucast_port(iface, multicast_ttl).ok())
.collect();
if !sockets.is_empty() {
let cancellation_token = TerminatableTask::create_cancellation_token();
Expand Down

0 comments on commit 1d3a8ab

Please sign in to comment.