From f2ca5190d232f4dc365f322700ee980119eafc19 Mon Sep 17 00:00:00 2001 From: Dan Gohman Date: Mon, 15 May 2023 08:43:52 -0700 Subject: [PATCH] Add a new cap-net-ext API for more flexible binding and connecting. (#320) * Add a new cap-net-ext API for more flexible binding and connecting. Add new `tcp_binder_for`, `udp_binder_for`, `tcp_connecter_for`, and `udp_connecter_for` functions to cap-net-ext's `PoolExt` trait. These return `TcpBinder`, `UdpBinder`, `TcpConnecter`, and `UdpConnecter` objects which allow users to split validation of addresses from the actual binding and connecting operations. * Drop the `_for` from the function names, and add comments. --- cap-async-std/src/net/pool.rs | 23 +- cap-net-ext/Cargo.toml | 1 + cap-net-ext/src/lib.rs | 258 +++++- cap-primitives/src/net/pool.rs | 17 + cap-std/src/net/pool.rs | 25 +- tests/cap-net-ext-tcp-split.rs | 1369 ++++++++++++++++++++++++++++++++ tests/cap-net-ext-udp-split.rs | 549 +++++++++++++ 7 files changed, 2217 insertions(+), 25 deletions(-) create mode 100644 tests/cap-net-ext-tcp-split.rs create mode 100644 tests/cap-net-ext-udp-split.rs diff --git a/cap-async-std/src/net/pool.rs b/cap-async-std/src/net/pool.rs index 63051202..6dfa07ca 100644 --- a/cap-async-std/src/net/pool.rs +++ b/cap-async-std/src/net/pool.rs @@ -1,12 +1,23 @@ use crate::net::{TcpListener, TcpStream, ToSocketAddrs, UdpSocket}; use async_std::{io, net}; -use cap_primitives::net::NO_SOCKET_ADDRS; +use cap_primitives::net::no_socket_addrs; use cap_primitives::{ipnet, AmbientAuthority}; /// A pool of network addresses. /// /// This does not directly correspond to anything in `async_std`, however its /// methods correspond to the several functions in [`async_std::net`]. +/// +/// `Pool` implements `Clone`, which creates new independent entities that +/// carry the full authority of the originals. This means that in a borrow +/// of a `Pool`, the scope of the authority is not necessarily limited to +/// the scope of the borrow. +/// +/// Similarly, the [`cap_net_ext::PoolExt`] class allows creating "binder" +/// and "connecter" objects which represent capabilities to bind and +/// connect to addresses. +/// +/// [`cap_net_ext::PoolExt`]: https://docs.rs/cap-net-ext/latest/cap_net_ext/trait.PoolExt.html #[derive(Clone, Default)] pub struct Pool { cap: cap_primitives::net::Pool, @@ -109,7 +120,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::TcpListener::bind(NO_SOCKET_ADDRS).await.unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -131,7 +142,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::TcpStream::connect(NO_SOCKET_ADDRS).await.unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -154,7 +165,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::UdpSocket::bind(NO_SOCKET_ADDRS).await.unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -172,7 +183,7 @@ impl Pool { // `UdpSocket::send_to` only sends to the first address. let addr = match addrs.next() { - None => return Err(net::UdpSocket::bind(NO_SOCKET_ADDRS).await.unwrap_err()), + None => return Err(no_socket_addrs()), Some(addr) => addr, }; self.cap.check_addr(&addr)?; @@ -200,7 +211,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::UdpSocket::bind(NO_SOCKET_ADDRS).await.unwrap_err()), + None => Err(no_socket_addrs()), } } } diff --git a/cap-net-ext/Cargo.toml b/cap-net-ext/Cargo.toml index 63dd048e..41aacf3d 100644 --- a/cap-net-ext/Cargo.toml +++ b/cap-net-ext/Cargo.toml @@ -16,3 +16,4 @@ edition = "2018" cap-std = { path = "../cap-std", version = "^1.0.14" } cap-primitives = { path = "../cap-primitives", version = "^1.0.14" } rustix = { version = "0.37.9", features = ["net"] } +smallvec = "1.10" diff --git a/cap-net-ext/src/lib.rs b/cap-net-ext/src/lib.rs index b4ff1efc..c5760bd3 100644 --- a/cap-net-ext/src/lib.rs +++ b/cap-net-ext/src/lib.rs @@ -39,7 +39,7 @@ html_favicon_url = "https://raw.githubusercontent.com/bytecodealliance/cap-std/main/media/cap-std.ico" )] -use cap_primitives::net::NO_SOCKET_ADDRS; +use cap_primitives::net::no_socket_addrs; use cap_std::net::{IpAddr, Pool, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}; use rustix::fd::OwnedFd; use std::io; @@ -120,9 +120,9 @@ pub trait TcpListenerExt: private::Sealed + Sized { /// listening; this function enables listening. After this, the listener /// may accept new connections with [`accept`] or [`accept_with`]. /// - /// This is similar to [`TcpListener::bind_tcp_listener`] in that it - /// performs the `listen` step, however it does not create the socket - /// itself, or bind it. + /// This is similar to [`Pool::bind_tcp_listener`] in that it performs the + /// `listen` step, however it does not create the socket itself, or bind + /// it. /// /// The `backlog` argument specifies an optional hint to the implementation /// about how many connections can be waiting before new connections are @@ -176,8 +176,8 @@ pub trait UdpSocketExt: private::Sealed + Sized { /// Use [`PoolExt::bind_existing_udp_socket`] to bind it, or /// [`PoolExt::connect_existing_udp_socket`] to initiate a connection. /// - /// This is similar to [`TcpListener::bind_udp_socket`] in that it creates - /// a UDP socket, however it does not perform the `bind`. And, it has a + /// This is similar to [`Pool::bind_udp_socket`] in that it creates a UDP + /// socket, however it does not perform the `bind`. And, it has a /// `blocking` argument to select blocking or non-blocking mode for the /// created socket. /// @@ -212,6 +212,11 @@ pub trait PoolExt: private::Sealed { /// This is similar to [`Pool::bind_tcp_listener`] in that it binds a TCP /// socket, however it does not create the socket itself, or perform the /// `listen` step. + /// + /// This function ensures that the address to be bound is permitted by the + /// pool, and performs the bind. To perform these steps separately, create + /// a [`TcpBinder`] with [`Self::tcp_binder`] and use + /// [`TcpBinder::bind_existing_tcp_listener`]. fn bind_existing_tcp_listener( &self, listener: &TcpListener, @@ -225,6 +230,11 @@ pub trait PoolExt: private::Sealed { /// /// This is similar to [`Pool::bind_udp_socket`] in that it binds a UDP /// socket, however it does not create the socket itself. + /// + /// This function ensures that the address to be bound is permitted by the + /// pool, and performs the bind. To perform these steps separately, create + /// a [`UdpBinder`] with [`Self::udp_binder`] and use + /// [`UdpBinder::bind_existing_udp_socket`]. fn bind_existing_udp_socket( &self, socket: &UdpSocket, @@ -240,6 +250,11 @@ pub trait PoolExt: private::Sealed { /// /// Despite the name, this function uses the `TcpListener` type as a /// generic socket container. + /// + /// This function ensures that the address to connect to is permitted by + /// the pool, and performs the connect. To perform these steps separately, + /// create a [`TcpConnecter`] with [`Self::tcp_connecter`] and use + /// [`TcpConnecter::connect_into_tcp_stream`]. fn connect_into_tcp_stream( &self, socket: TcpListener, @@ -248,9 +263,14 @@ pub trait PoolExt: private::Sealed { /// Initiate a TCP connection on a socket. /// - /// This is simlar to to [`connect_into_tcp_stream`], however instead + /// This is simlar to to [`Self::connect_into_tcp_stream`], however instead /// of converting a `TcpListener` to a `TcpStream`, it leaves fd in the /// existing `TcpListener`. + /// + /// This function ensures that the address to connect to is permitted by + /// the pool, and performs the connect. To perform these steps separately, + /// create a [`TcpConnecter`] with [`Self::tcp_connecter`] and use + /// [`TcpConnecter::connect_existing_tcp_listener`]. fn connect_existing_tcp_listener( &self, socket: &TcpListener, @@ -262,11 +282,51 @@ pub trait PoolExt: private::Sealed { /// This is simlar to to [`Pool::connect_udp_socket`] in that it performs a /// UDP connection, but instead of creating a new socket itself it takes a /// [`UdpSocket`], such as one created with [`UdpSocketExt::new`]. + /// + /// This function ensures that the address to connect to is permitted by + /// the pool, and performs the connect. To perform these steps separately, + /// create a [`UdpConnecter`] with [`Self::udp_connecter`] and use + /// [`UdpConnecter::connect_existing_udp_socket`]. fn connect_existing_udp_socket( &self, socket: &UdpSocket, addrs: A, ) -> io::Result<()>; + + /// Create a TCP binder. + /// + /// This is an alternative to [`Self::bind_existing_tcp_listener`]. It + /// checks that all the addresses in `addrs` are permitted for TCP binding + /// up front, and then records them in a [`TcpBinder`] which can then be + /// used to make repeated [`TcpBinder::bind_existing_tcp_listener`] calls. + fn tcp_binder(&self, addrs: A) -> io::Result; + + /// Create a UDP binder. + /// + /// This is an alternative to [`Self::bind_existing_udp_socket`]. It checks + /// that all the addresses in `addrs` are permitted for UDP binding up + /// front, and then records them in a [`UdpBinder`] which can then be used + /// to make repeated [`UdpBinder::bind_existing_udp_socket`] calls. + fn udp_binder(&self, addrs: A) -> io::Result; + + /// Create a TCP connecter. + /// + /// This is an alternative to [`Self::connect_into_tcp_stream`] and + /// [`Self::connect_existing_tcp_listener`]. It checks that all the + /// addresses in `addrs` are permitted for TCP connecting up front, and + /// then records them in a [`TcpConnecter`] which can then be used to make + /// repeated [`TcpConnecter::connect_into_tcp_stream`] and + /// [`TcpConnecter::connect_existing_tcp_listener`] calls. + fn tcp_connecter(&self, addrs: A) -> io::Result; + + /// Create a UDP connecter. + /// + /// This is an alternative to [`Self::connect_existing_udp_socket`]. It + /// checks that all the addresses in `addrs` are permitted for UDP + /// connecting up front, and then records them in a [`UdpConnecter`] which + /// can then be used to make repeated + /// [`UdpConnecter::connect_existing_udp_socket`] calls. + fn udp_connecter(&self, addrs: A) -> io::Result; } impl PoolExt for Pool { @@ -290,7 +350,7 @@ impl PoolExt for Pool { } match last_err { Some(err) => Err(err), - None => Err(std::net::TcpListener::bind(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -312,7 +372,7 @@ impl PoolExt for Pool { } match last_err { Some(err) => Err(err.into()), - None => Err(std::net::TcpListener::bind(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -343,7 +403,7 @@ impl PoolExt for Pool { } match last_err { Some(err) => Err(err.into()), - None => Err(std::net::TcpStream::connect(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -365,7 +425,183 @@ impl PoolExt for Pool { } match last_err { Some(err) => Err(err.into()), - None => Err(std::net::TcpStream::connect(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), + } + } + + fn tcp_binder(&self, addrs: A) -> io::Result { + Ok(TcpBinder(check_addrs(self._pool(), addrs)?)) + } + + fn udp_binder(&self, addrs: A) -> io::Result { + Ok(UdpBinder(check_addrs(self._pool(), addrs)?)) + } + + fn tcp_connecter(&self, addrs: A) -> io::Result { + Ok(TcpConnecter(check_addrs(self._pool(), addrs)?)) + } + + fn udp_connecter(&self, addrs: A) -> io::Result { + Ok(UdpConnecter(check_addrs(self._pool(), addrs)?)) + } +} + +/// Check all the addresses in `addrs` and return a new list of them. +fn check_addrs( + pool: &cap_primitives::net::Pool, + addrs: A, +) -> io::Result> { + let mut checked = smallvec::SmallVec::new(); + for addr in addrs.to_socket_addrs()? { + pool.check_addr(&addr)?; + checked.push(addr); + } + Ok(checked) +} + +/// A utility for binding TCP listeners. +/// +/// See [`PoolExt::tcp_binder`] for details. +pub struct TcpBinder(smallvec::SmallVec<[SocketAddr; 1]>); + +impl TcpBinder { + /// Bind a [`TcpListener`]. + /// + /// A newly-created `TcpListener` created with [`TcpListenerExt::new`] + /// has not been bound yet; this function binds it. Before it can accept + /// connections, it must be marked for listening with + /// [`TcpListenerExt::listen`]. + /// + /// This is similar to [`Pool::bind_tcp_listener`] in that it binds a TCP + /// socket, however it does not create the socket itself, or perform the + /// `listen` step. + /// + /// This is similar to [`PoolExt::bind_existing_tcp_listener`] except that + /// it uses a `TcpBinder` which contains addresses that have already been + /// checked against a `Pool`. + pub fn bind_existing_tcp_listener(&self, listener: &TcpListener) -> io::Result<()> { + let mut last_err = None; + for addr in &self.0 { + set_reuseaddr(listener)?; + + match rustix::net::bind(listener, addr) { + Ok(()) => return Ok(()), + Err(err) => last_err = Some(err.into()), + } + } + match last_err { + Some(err) => Err(err), + None => Err(no_socket_addrs()), + } + } +} + +/// A utility for binding UDP sockets. +/// +/// See [`PoolExt::udp_binder`] for details. +pub struct UdpBinder(smallvec::SmallVec<[SocketAddr; 1]>); + +impl UdpBinder { + /// Bind a [`UdpSocket`] to the specified address. + /// + /// A newly-created `UdpSocket` created with [`UdpSocketExt::new`] has not + /// been bound yet; this function binds it. + /// + /// This is similar to [`Pool::bind_udp_socket`] in that it binds a UDP + /// socket, however it does not create the socket itself. + /// + /// This is similar to [`PoolExt::bind_existing_udp_socket`] except that + /// it uses a `UdpBinder` which contains addresses that have already been + /// checked against a `Pool`. + pub fn bind_existing_udp_socket(&self, socket: &UdpSocket) -> io::Result<()> { + let mut last_err = None; + for addr in &self.0 { + match rustix::net::bind(socket, addr) { + Ok(()) => return Ok(()), + Err(err) => last_err = Some(err.into()), + } + } + match last_err { + Some(err) => Err(err), + None => Err(no_socket_addrs()), + } + } +} + +/// A utility for making TCP connections. +/// +/// See [`PoolExt::tcp_connecter`] for details. +pub struct TcpConnecter(smallvec::SmallVec<[SocketAddr; 1]>); + +impl TcpConnecter { + /// Initiate a TCP connection, converting a [`TcpListener`] to a + /// [`TcpStream`]. + /// + /// This is simlar to to [`Pool::connect_tcp_stream`] in that it performs a + /// TCP connection, but instead of creating a new socket itself it takes a + /// [`TcpListener`], such as one created with [`TcpListenerExt::new`]. + /// + /// Despite the name, this function uses the `TcpListener` type as a + /// generic socket container. + /// + /// This is similar to [`PoolExt::connect_into_tcp_stream`] except that + /// it uses a `TcpConnecter` which contains addresses that have already + /// been checked against a `Pool`. + pub fn connect_into_tcp_stream(&self, socket: TcpListener) -> io::Result { + self.connect_existing_tcp_listener(&socket)?; + Ok(TcpStream::from(OwnedFd::from(socket))) + } + + /// Initiate a TCP connection on a socket. + /// + /// This is simlar to to [`Pool::connect_into_tcp_stream`], however instead + /// of converting a `TcpListener` to a `TcpStream`, it leaves fd in the + /// existing `TcpListener`. + /// + /// This is similar to [`PoolExt::connect_existing_tcp_listener`] except + /// that it uses a `TcpConnecter` which contains addresses that have + /// already been checked against a `Pool`. + pub fn connect_existing_tcp_listener(&self, socket: &TcpListener) -> io::Result<()> { + let mut last_err = None; + for addr in &self.0 { + match rustix::net::connect(socket, addr) { + Ok(()) => return Ok(()), + Err(err) => last_err = Some(err), + } + } + match last_err { + Some(err) => Err(err.into()), + None => Err(no_socket_addrs()), + } + } +} + +/// A utility for making UDP connections. +/// +/// See [`PoolExt::udp_connecter`] for details. +pub struct UdpConnecter(smallvec::SmallVec<[SocketAddr; 1]>); + +impl UdpConnecter { + /// Initiate a UDP connection. + /// + /// This is simlar to to [`Pool::connect_udp_socket`] in that it performs a + /// UDP connection, but instead of creating a new socket itself it takes a + /// [`UdpSocket`], such as one created with [`UdpSocketExt::new`]. + /// + /// This is similar to [`PoolExt::connect_existing_udp_socket`] except that + /// it uses a `UdpConnecter` which contains addresses that have already + /// been checked against a `Pool`. + pub fn connect_existing_udp_socket(&self, socket: &UdpSocket) -> io::Result<()> { + let mut last_err = None; + for addr in &self.0 { + match rustix::net::connect(socket, addr) { + Ok(()) => return Ok(()), + Err(err) => last_err = Some(err), + } + } + match last_err { + Some(err) => Err(err.into()), + None => Err(no_socket_addrs()), } } } diff --git a/cap-primitives/src/net/pool.rs b/cap-primitives/src/net/pool.rs index ecd0568f..388e31e7 100644 --- a/cap-primitives/src/net/pool.rs +++ b/cap-primitives/src/net/pool.rs @@ -54,6 +54,17 @@ impl IpGrant { /// /// This is presently a very simple concept, though it could grow in /// sophistication in the future. +/// +/// `Pool` implements `Clone`, which creates new independent entities that +/// carry the full authority of the originals. This means that in a borrow +/// of a `Pool`, the scope of the authority is not necessarily limited to +/// the scope of the borrow. +/// +/// Similarly, the [`cap_net_ext::PoolExt`] class allows creating "binder" +/// and "connecter" objects which represent capabilities to bind and +/// connect to addresses. +/// +/// [`cap_net_ext::PoolExt`]: https://docs.rs/cap-net-ext/latest/cap_net_ext/trait.PoolExt.html #[derive(Clone, Default)] pub struct Pool { // TODO: when compiling for WASI, use WASI-specific handle instead @@ -163,6 +174,12 @@ impl Pool { /// An empty array of `SocketAddr`s. pub const NO_SOCKET_ADDRS: &[net::SocketAddr] = &[]; +/// Return an error for reporting that no socket addresses were available. +#[cold] +pub fn no_socket_addrs() -> io::Error { + std::net::TcpListener::bind(&NO_SOCKET_ADDRS).unwrap_err() +} + #[test] fn test_empty() { let p = Pool::new(); diff --git a/cap-std/src/net/pool.rs b/cap-std/src/net/pool.rs index 70fc1134..625d49a2 100644 --- a/cap-std/src/net/pool.rs +++ b/cap-std/src/net/pool.rs @@ -1,5 +1,5 @@ use crate::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}; -use cap_primitives::net::NO_SOCKET_ADDRS; +use cap_primitives::net::no_socket_addrs; use cap_primitives::{ipnet, AmbientAuthority}; use std::time::Duration; use std::{io, net}; @@ -8,6 +8,17 @@ use std::{io, net}; /// /// This does not directly correspond to anything in `std`, however its methods /// correspond to the several functions in [`std::net`]. +/// +/// `Pool` implements `Clone`, which creates new independent entities that +/// carry the full authority of the originals. This means that in a borrow +/// of a `Pool`, the scope of the authority is not necessarily limited to +/// the scope of the borrow. +/// +/// Similarly, the [`cap_net_ext::PoolExt`] class allows creating "binder" +/// and "connecter" objects which represent capabilities to bind and +/// connect to addresses. +/// +/// [`cap_net_ext::PoolExt`]: https://docs.rs/cap-net-ext/latest/cap_net_ext/trait.PoolExt.html #[derive(Clone, Default)] pub struct Pool { cap: cap_primitives::net::Pool, @@ -116,7 +127,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::TcpListener::bind(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -138,7 +149,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::TcpStream::connect(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -173,7 +184,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::UdpSocket::bind(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), } } @@ -191,9 +202,7 @@ impl Pool { let mut addrs = addr.to_socket_addrs()?; // `UdpSocket::send_to` only sends to the first address. - let addr = addrs - .next() - .ok_or_else(|| net::UdpSocket::bind(NO_SOCKET_ADDRS).unwrap_err())?; + let addr = addrs.next().ok_or_else(no_socket_addrs)?; self.cap.check_addr(&addr)?; udp_socket.std.send_to(buf, addr) } @@ -221,7 +230,7 @@ impl Pool { } match last_err { Some(e) => Err(e), - None => Err(net::UdpSocket::bind(NO_SOCKET_ADDRS).unwrap_err()), + None => Err(no_socket_addrs()), } } diff --git a/tests/cap-net-ext-tcp-split.rs b/tests/cap-net-ext-tcp-split.rs new file mode 100644 index 00000000..1fab750f --- /dev/null +++ b/tests/cap-net-ext-tcp-split.rs @@ -0,0 +1,1369 @@ +// This file is derived from Rust's library/std/src/net/tcp/tests.rs at +// revision 377d1a984cd2a53327092b90aa1d8b7e22d1e347. +// +// This is like cap-net-ext-tcp.rs but uses the binder/connecter APIs. + +mod net; + +use cap_net_ext::{AddressFamily, Blocking, PoolExt, TcpListenerExt}; +use cap_std::ambient_authority; +use cap_std::net::*; +use net::{next_test_ip4, next_test_ip6}; +use std::io::prelude::*; +use std::io::{ErrorKind, IoSlice, IoSliceMut}; +use std::sync::mpsc::channel; +use std::time::{Duration, Instant}; +use std::{fmt, thread}; + +fn each_ip(f: &mut dyn FnMut(SocketAddr)) { + f(next_test_ip4()); + f(next_test_ip6()); +} + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(t) => t, + Err(e) => panic!("received error for `{}`: {}", stringify!($e), e), + } + }; +} + +#[test] +fn bind_error() { + let mut pool = Pool::new(); + pool.insert_socket_addr("1.1.1.1:9999".parse().unwrap(), ambient_authority()); + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + match pool + .tcp_binder("1.1.1.1:9999") + .unwrap() + .bind_existing_tcp_listener(&listener) + { + Ok(..) => panic!(), + Err(e) => assert_eq!(e.kind(), ErrorKind::AddrNotAvailable), + } +} + +#[test] +fn connect_error() { + let mut pool = Pool::new(); + pool.insert_socket_addr("0.0.0.0:1".parse().unwrap(), ambient_authority()); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + match pool + .tcp_connecter("0.0.0.0:1") + .unwrap() + .connect_into_tcp_stream(socket) + { + Ok(..) => panic!(), + Err(e) => assert!( + e.kind() == ErrorKind::ConnectionRefused + || e.kind() == ErrorKind::InvalidInput + || e.kind() == ErrorKind::AddrInUse + || e.kind() == ErrorKind::AddrNotAvailable + || e.kind() == ErrorKind::TimedOut, + "bad error: {} {:?}", + e, + e.kind() + ), + } +} + +#[test] +fn listen_localhost() { + let socket_addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(socket_addr, ambient_authority()); + for resolved in format!("localhost:{}", socket_addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&socket_addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + + let _t = thread::spawn(move || { + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&("localhost", socket_addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(stream.write(&[144])); + }); + + let mut stream = t!(listener.accept()).0; + let mut buf = [0]; + t!(stream.read(&mut buf)); + assert!(buf[0] == 144); +} + +#[test] +fn connect_loopback() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let _t = thread::spawn(move || { + let host = match addr { + SocketAddr::V4(..) => "127.0.0.1", + SocketAddr::V6(..) => "::1", + }; + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&(host, addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(stream.write(&[66])); + }); + + let mut stream = t!(acceptor.accept()).0; + let mut buf = [0]; + t!(stream.read(&mut buf)); + assert!(buf[0] == 66); + }) +} + +#[test] +fn smoke_test() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(stream.write(&[99])); + tx.send(t!(stream.local_addr())).unwrap(); + }); + + let (mut stream, addr) = t!(acceptor.accept()); + let mut buf = [0]; + t!(stream.read(&mut buf)); + assert!(buf[0] == 99); + assert_eq!(addr, t!(rx.recv())); + }) +} + +#[test] +fn read_eof() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let _stream = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + // Close + }); + + let mut stream = t!(acceptor.accept()).0; + let mut buf = [0]; + let nread = t!(stream.read(&mut buf)); + assert_eq!(nread, 0); + let nread = t!(stream.read(&mut buf)); + assert_eq!(nread, 0); + }) +} + +#[test] +fn write_close() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + drop(t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket))); + tx.send(()).unwrap(); + }); + + let mut stream = t!(acceptor.accept()).0; + rx.recv().unwrap(); + let buf = [0]; + match stream.write(&buf) { + Ok(..) => {} + Err(e) => { + assert!( + e.kind() == ErrorKind::ConnectionReset + || e.kind() == ErrorKind::BrokenPipe + || e.kind() == ErrorKind::ConnectionAborted, + "unknown error: {}", + e + ); + } + } + }) +} + +#[test] +fn multiple_connect_serial() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let max = 10; + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let _t = thread::spawn(move || { + for _ in 0..max { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(stream.write(&[99])); + } + }); + + for stream in acceptor.incoming().take(max) { + let mut stream = t!(stream); + let mut buf = [0]; + t!(stream.read(&mut buf)); + assert_eq!(buf[0], 99); + } + }) +} + +#[test] +fn multiple_connect_interleaved_greedy_schedule() { + const MAX: usize = 10; + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let _t = thread::spawn(move || { + let acceptor = acceptor; + for (i, stream) in acceptor.incoming().enumerate().take(MAX) { + // Start another thread to handle the connection + let _t = thread::spawn(move || { + let mut stream = t!(stream); + let mut buf = [0]; + t!(stream.read(&mut buf)); + assert!(buf[0] == i as u8); + }); + } + }); + + connect(0, addr, pool); + }); + + fn connect(i: usize, addr: SocketAddr, pool: Pool) { + if i == MAX { + return; + } + + let t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + // Connect again before writing + connect(i + 1, addr, pool); + t!(stream.write(&[i as u8])); + }); + t.join().ok().expect("thread panicked"); + } +} + +#[test] +fn multiple_connect_interleaved_lazy_schedule() { + const MAX: usize = 10; + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let _t = thread::spawn(move || { + for stream in acceptor.incoming().take(MAX) { + // Start another thread to handle the connection + let _t = thread::spawn(move || { + let mut stream = t!(stream); + let mut buf = [0]; + t!(stream.read(&mut buf)); + assert!(buf[0] == 99); + }); + } + }); + + connect(0, addr, pool); + }); + + fn connect(i: usize, addr: SocketAddr, pool: Pool) { + if i == MAX { + return; + } + + let t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + connect(i + 1, addr, pool); + t!(stream.write(&[99])); + }); + t.join().ok().expect("thread panicked"); + } +} + +#[test] +fn socket_and_peer_name() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let listener = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + let so_name = t!(listener.local_addr()); + assert_eq!(addr, so_name); + let _t = thread::spawn(move || { + t!(listener.accept()); + }); + + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let stream = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + assert_eq!(addr, t!(stream.peer_addr())); + }) +} + +#[test] +fn partial_read() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let (tx, rx) = channel(); + let srv = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&srv)); + srv.listen(None).unwrap(); + let _t = thread::spawn(move || { + let mut cl = t!(srv.accept()).0; + cl.write(&[10]).unwrap(); + let mut b = [0]; + t!(cl.read(&mut b)); + tx.send(()).unwrap(); + }); + + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut c = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let mut b = [0; 10]; + assert_eq!(c.read(&mut b).unwrap(), 1); + t!(c.write(&[1])); + rx.recv().unwrap(); + }) +} + +#[test] +fn read_vectored() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let srv = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&srv)); + srv.listen(None).unwrap(); + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut s1 = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let mut s2 = t!(srv.accept()).0; + + let len = s1.write(&[10, 11, 12]).unwrap(); + assert_eq!(len, 3); + + let mut a = []; + let mut b = [0]; + let mut c = [0; 3]; + let len = t!(s2.read_vectored(&mut [ + IoSliceMut::new(&mut a), + IoSliceMut::new(&mut b), + IoSliceMut::new(&mut c) + ],)); + assert!(len > 0); + assert_eq!(b, [10]); + // some implementations don't support readv, so we may only fill the first + // buffer + assert!(len == 1 || c == [11, 12, 0]); + }) +} + +#[test] +fn write_vectored() { + let mut pool = Pool::new(); + + each_ip(&mut |addr| { + pool.insert_socket_addr(addr, ambient_authority()); + + let srv = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&srv)); + srv.listen(None).unwrap(); + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut s1 = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let mut s2 = t!(srv.accept()).0; + + let a = []; + let b = [10]; + let c = [11, 12]; + t!(s1.write_vectored(&[IoSlice::new(&a), IoSlice::new(&b), IoSlice::new(&c)])); + + let mut buf = [0; 4]; + let len = t!(s2.read(&mut buf)); + // some implementations don't support writev, so we may only write the first + // buffer + if len == 1 { + assert_eq!(buf, [10, 0, 0, 0]); + } else { + assert_eq!(len, 3); + assert_eq!(buf, [10, 11, 12, 0]); + } + }) +} + +#[test] +fn double_bind() { + let mut pool = Pool::new(); + + each_ip(&mut |addr| { + pool.insert_socket_addr(addr, ambient_authority()); + + let listener1 = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener1)); + listener1.listen(None).unwrap(); + let listener2 = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + match pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener2) + { + Ok(()) => panic!( + "This system (perhaps due to options set by pool.bind_existing_tcp_listener) \ + permits double binding: {:?} and {:?}", + listener1, listener2 + ), + Err(e) => { + assert!( + e.kind() == ErrorKind::ConnectionRefused + || e.kind() == ErrorKind::Other + || e.kind() == ErrorKind::AddrInUse, + "unknown error: {} {:?}", + e, + e.kind() + ); + } + } + }) +} + +#[test] +fn tcp_clone_smoke() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut s = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let mut buf = [0, 0]; + assert_eq!(s.read(&mut buf).unwrap(), 1); + assert_eq!(buf[0], 1); + t!(s.write(&[2])); + }); + + let mut s1 = t!(acceptor.accept()).0; + let s2 = t!(s1.try_clone()); + + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let _t = thread::spawn(move || { + let mut s2 = s2; + rx1.recv().unwrap(); + t!(s2.write(&[1])); + tx2.send(()).unwrap(); + }); + tx1.send(()).unwrap(); + let mut buf = [0, 0]; + assert_eq!(s1.read(&mut buf).unwrap(), 1); + rx2.recv().unwrap(); + }) +} + +#[test] +fn tcp_clone_two_read() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); + + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut s = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(s.write(&[1])); + rx.recv().unwrap(); + t!(s.write(&[2])); + rx.recv().unwrap(); + }); + + let mut s1 = t!(acceptor.accept()).0; + let s2 = t!(s1.try_clone()); + + let (done, rx) = channel(); + let _t = thread::spawn(move || { + let mut s2 = s2; + let mut buf = [0, 0]; + t!(s2.read(&mut buf)); + tx2.send(()).unwrap(); + done.send(()).unwrap(); + }); + let mut buf = [0, 0]; + t!(s1.read(&mut buf)); + tx1.send(()).unwrap(); + + rx.recv().unwrap(); + }) +} + +#[test] +fn tcp_clone_two_write() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let acceptor = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&acceptor)); + acceptor.listen(None).unwrap(); + + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut s = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let mut buf = [0, 1]; + t!(s.read(&mut buf)); + t!(s.read(&mut buf)); + }); + + let mut s1 = t!(acceptor.accept()).0; + let s2 = t!(s1.try_clone()); + + let (done, rx) = channel(); + let _t = thread::spawn(move || { + let mut s2 = s2; + t!(s2.write(&[1])); + done.send(()).unwrap(); + }); + t!(s1.write(&[2])); + + rx.recv().unwrap(); + }) +} + +#[test] +// FIXME: https://github.com/fortanix/rust-sgx/issues/110 +#[cfg_attr(target_env = "sgx", ignore)] +fn shutdown_smoke() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let a = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&a)); + a.listen(None).unwrap(); + let _t = thread::spawn(move || { + let mut c = t!(a.accept()).0; + let mut b = [0]; + assert_eq!(c.read(&mut b).unwrap(), 0); + t!(c.write(&[1])); + }); + + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut s = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(s.shutdown(Shutdown::Write)); + assert!(s.write(&[1]).is_err()); + let mut b = [0, 0]; + assert_eq!(t!(s.read(&mut b)), 1); + assert_eq!(b[0], 1); + }) +} + +#[test] +// FIXME: https://github.com/fortanix/rust-sgx/issues/110 +#[cfg_attr(target_env = "sgx", ignore)] +fn close_readwrite_smoke() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let a = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&a)); + a.listen(None).unwrap(); + let (tx, rx) = channel::<()>(); + let _t = thread::spawn(move || { + let _s = t!(a.accept()); + let _ = rx.recv(); + }); + + let mut b = [0]; + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut s = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let mut s2 = t!(s.try_clone()); + + // closing should prevent reads/writes + t!(s.shutdown(Shutdown::Write)); + assert!(s.write(&[0]).is_err()); + t!(s.shutdown(Shutdown::Read)); + assert_eq!(s.read(&mut b).unwrap(), 0); + + // closing should affect previous handles + assert!(s2.write(&[0]).is_err()); + assert_eq!(s2.read(&mut b).unwrap(), 0); + + // closing should affect new handles + let mut s3 = t!(s.try_clone()); + assert!(s3.write(&[0]).is_err()); + assert_eq!(s3.read(&mut b).unwrap(), 0); + + // make sure these don't die + let _ = s2.shutdown(Shutdown::Read); + let _ = s2.shutdown(Shutdown::Write); + let _ = s3.shutdown(Shutdown::Read); + let _ = s3.shutdown(Shutdown::Write); + drop(tx); + }) +} + +#[test] +#[cfg(unix)] // test doesn't work on Windows, see #31657 +fn close_read_wakes_up() { + let mut pool = Pool::new(); + + each_ip(&mut |addr| { + pool.insert_socket_addr(addr, ambient_authority()); + + let a = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&a)); + a.listen(None).unwrap(); + let (tx1, rx) = channel::<()>(); + let _t = thread::spawn(move || { + let _s = t!(a.accept()); + let _ = rx.recv(); + }); + + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let s = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let s2 = t!(s.try_clone()); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let mut s2 = s2; + assert_eq!(t!(s2.read(&mut [0])), 0); + tx.send(()).unwrap(); + }); + // this should wake up the child thread + t!(s.shutdown(Shutdown::Read)); + + // this test will never finish if the child doesn't wake up + rx.recv().unwrap(); + drop(tx1); + }) +} + +#[test] +fn clone_while_reading() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let accept = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&accept)); + accept.listen(None).unwrap(); + + // Enqueue a thread to write to a socket + let (tx, rx) = channel(); + let (txdone, rxdone) = channel(); + let txdone2 = txdone.clone(); + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut tcp = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + rx.recv().unwrap(); + t!(tcp.write(&[0])); + txdone2.send(()).unwrap(); + }); + + // Spawn off a reading clone + let tcp = t!(accept.accept()).0; + let tcp2 = t!(tcp.try_clone()); + let txdone3 = txdone.clone(); + let _t = thread::spawn(move || { + let mut tcp2 = tcp2; + t!(tcp2.read(&mut [0])); + txdone3.send(()).unwrap(); + }); + + // Try to ensure that the reading clone is indeed reading + for _ in 0..50 { + thread::yield_now(); + } + + // clone the handle again while it's reading, then let it finish the + // read. + let _ = t!(tcp.try_clone()); + tx.send(()).unwrap(); + rxdone.recv().unwrap(); + rxdone.recv().unwrap(); + }) +} + +#[test] +fn clone_accept_smoke() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let a = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&a)); + a.listen(None).unwrap(); + let a2 = t!(a.try_clone()); + + let p = pool.clone(); + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let _ = p.connect_into_tcp_stream(socket, &addr); + }); + let p = pool.clone(); + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let _ = p.connect_into_tcp_stream(socket, &addr); + }); + + t!(a.accept()); + t!(a2.accept()); + }) +} + +#[test] +fn clone_accept_concurrent() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let a = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&a)); + a.listen(None).unwrap(); + let a2 = t!(a.try_clone()); + + let (tx, rx) = channel(); + let tx2 = tx.clone(); + + let _t = thread::spawn(move || { + tx.send(t!(a.accept())).unwrap(); + }); + let _t = thread::spawn(move || { + tx2.send(t!(a2.accept())).unwrap(); + }); + + let p = pool.clone(); + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let _ = p.connect_into_tcp_stream(socket, &addr); + }); + let p = pool.clone(); + let _t = thread::spawn(move || { + let socket = + TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let _ = p.connect_into_tcp_stream(socket, &addr); + }); + + rx.recv().unwrap(); + rx.recv().unwrap(); + }) +} + +#[test] +fn debug() { + let mut pool = Pool::new(); + + #[cfg(not(target_env = "sgx"))] + fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a { + addr + } + #[cfg(target_env = "sgx")] + fn render_socket_addr<'a>(addr: &'a SocketAddr) -> impl fmt::Debug + 'a { + addr.to_string() + } + + #[cfg(target_env = "sgx")] + use std::os::fortanix_sgx::io::AsRawFd; + #[cfg(unix)] + use std::os::unix::io::AsRawFd; + #[cfg(not(windows))] + fn render_inner(addr: &dyn AsRawFd) -> impl fmt::Debug { + addr.as_raw_fd() + } + #[cfg(windows)] + fn render_inner(addr: &dyn std::os::windows::io::AsRawSocket) -> impl fmt::Debug { + addr.as_raw_socket() + } + + let inner_name = if cfg!(windows) { "socket" } else { "fd" }; + let socket_addr = next_test_ip4(); + pool.insert_socket_addr(socket_addr, ambient_authority()); + for resolved in format!("localhost:{}", socket_addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&socket_addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + let compare = format!( + "TcpListener {{ addr: {:?}, {}: {:?} }}", + render_socket_addr(&socket_addr), + inner_name, + render_inner(&listener) + ); + assert_eq!(format!("{:?}", listener), compare); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let stream = t!(pool + .tcp_connecter(&("localhost", socket_addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + let compare = format!( + "TcpStream {{ addr: {:?}, peer: {:?}, {}: {:?} }}", + render_socket_addr(&stream.local_addr().unwrap()), + render_socket_addr(&stream.peer_addr().unwrap()), + inner_name, + render_inner(&stream) + ); + assert_eq!(format!("{:?}", stream), compare); +} + +// FIXME: re-enabled openbsd tests once their socket timeout code +// no longer has rounding errors. +// VxWorks ignores SO_SNDTIMEO. +#[cfg_attr( + any(target_os = "netbsd", target_os = "openbsd", target_os = "vxworks"), + ignore +)] +#[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +#[test] +fn timeouts() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + for resolved in format!("localhost:{}", addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let stream = t!(pool + .tcp_connecter(&("localhost", addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + let dur = Duration::new(15410, 0); + + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_read_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.read_timeout())); + + assert_eq!(None, t!(stream.write_timeout())); + + t!(stream.set_write_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.write_timeout())); + + t!(stream.set_read_timeout(None)); + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_write_timeout(None)); + assert_eq!(None, t!(stream.write_timeout())); + drop(listener); +} + +#[test] +#[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +fn test_read_timeout() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + for resolved in format!("localhost:{}", addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&("localhost", addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + let mut buf = [0; 10]; + let start = Instant::now(); + let kind = stream + .read_exact(&mut buf) + .err() + .expect("expected error") + .kind(); + assert!( + kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut, + "unexpected_error: {:?}", + kind + ); + assert!(start.elapsed() > Duration::from_millis(400)); + drop(listener); +} + +#[test] +#[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +fn test_read_with_timeout() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + for resolved in format!("localhost:{}", addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&("localhost", addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + let mut other_end = t!(listener.accept()).0; + t!(other_end.write_all(b"hello world")); + + let mut buf = [0; 11]; + t!(stream.read(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + + let start = Instant::now(); + let kind = stream + .read_exact(&mut buf) + .err() + .expect("expected error") + .kind(); + assert!( + kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut, + "unexpected_error: {:?}", + kind + ); + assert!(start.elapsed() > Duration::from_millis(400)); + drop(listener); +} + +// Ensure the `set_read_timeout` and `set_write_timeout` calls return errors +// when passed zero Durations +#[test] +fn test_timeout_zero_duration() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let stream = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + + let result = stream.set_write_timeout(Some(Duration::new(0, 0))); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + + let result = stream.set_read_timeout(Some(Duration::new(0, 0))); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + + drop(listener); +} + +#[test] +#[cfg_attr(target_env = "sgx", ignore)] +fn nodelay() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + for resolved in format!("localhost:{}", addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let _listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&_listener)); + _listener.listen(None).unwrap(); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let stream = t!(pool + .tcp_connecter(&("localhost", addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + + assert_eq!(false, t!(stream.nodelay())); + t!(stream.set_nodelay(true)); + assert_eq!(true, t!(stream.nodelay())); + t!(stream.set_nodelay(false)); + assert_eq!(false, t!(stream.nodelay())); +} + +#[test] +#[cfg_attr(target_env = "sgx", ignore)] +fn ttl() { + let ttl = 100; + + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + for resolved in format!("localhost:{}", addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + + t!(listener.set_ttl(ttl)); + assert_eq!(ttl, t!(listener.ttl())); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let stream = t!(pool + .tcp_connecter(&("localhost", addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + + t!(stream.set_ttl(ttl)); + assert_eq!(ttl, t!(stream.ttl())); +} + +#[test] +#[cfg_attr(target_env = "sgx", ignore)] +fn set_nonblocking() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + for resolved in format!("localhost:{}", addr.port()) + .to_socket_addrs() + .unwrap() + { + pool.insert_socket_addr(resolved, ambient_authority()); + } + + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&listener)); + listener.listen(None).unwrap(); + + t!(listener.set_nonblocking(true)); + t!(listener.set_nonblocking(false)); + + let socket = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + let mut stream = t!(pool + .tcp_connecter(&("localhost", addr.port())) + .unwrap() + .connect_into_tcp_stream(socket)); + + t!(stream.set_nonblocking(false)); + t!(stream.set_nonblocking(true)); + + let mut buf = [0]; + match stream.read(&mut buf) { + Ok(_) => panic!("expected error"), + Err(ref e) if e.kind() == ErrorKind::WouldBlock => {} + Err(e) => panic!("unexpected error {}", e), + } +} + +#[test] +#[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +fn peek() { + each_ip(&mut |addr| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let (txdone, rxdone) = channel(); + + let srv = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .tcp_binder(&addr) + .unwrap() + .bind_existing_tcp_listener(&srv)); + srv.listen(None).unwrap(); + let _t = thread::spawn(move || { + let mut cl = t!(srv.accept()).0; + cl.write(&[1, 3, 3, 7]).unwrap(); + t!(rxdone.recv()); + }); + + let socket = TcpListener::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + let mut c = t!(pool + .tcp_connecter(&addr) + .unwrap() + .connect_into_tcp_stream(socket)); + let mut b = [0; 10]; + for _ in 1..3 { + let len = c.peek(&mut b).unwrap(); + assert_eq!(len, 4); + } + let len = c.read(&mut b).unwrap(); + assert_eq!(len, 4); + + t!(c.set_nonblocking(true)); + match c.peek(&mut b) { + Ok(_) => panic!("expected error"), + Err(ref e) if e.kind() == ErrorKind::WouldBlock => {} + Err(e) => panic!("unexpected error {}", e), + } + t!(txdone.send(())); + }) +} + +#[test] +#[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +fn connect_timeout_valid() { + let mut pool = Pool::new(); + pool.insert_socket_addr("127.0.0.1:0".parse().unwrap(), ambient_authority()); + let listener = TcpListener::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + pool.tcp_binder("127.0.0.1:0") + .unwrap() + .bind_existing_tcp_listener(&listener) + .unwrap(); + listener.listen(None).unwrap(); + let addr = listener.local_addr().unwrap(); + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + pool.connect_timeout_tcp_stream(&addr, Duration::from_secs(2)) + .unwrap(); +} diff --git a/tests/cap-net-ext-udp-split.rs b/tests/cap-net-ext-udp-split.rs new file mode 100644 index 00000000..916d6f7c --- /dev/null +++ b/tests/cap-net-ext-udp-split.rs @@ -0,0 +1,549 @@ +// This file is derived from Rust's library/std/src/net/udp/tests.rs at +// revision 377d1a984cd2a53327092b90aa1d8b7e22d1e347. +// +// This is like cap-net-ext-udp.rs but uses the binder/connecter APIs. + +mod net; +mod sys_common; + +use cap_net_ext::{AddressFamily, Blocking, PoolExt, UdpSocketExt}; +use cap_std::ambient_authority; +use cap_std::net::*; +use net::{next_test_ip4, next_test_ip6}; +use std::io::ErrorKind; +use std::sync::mpsc::channel; +//use sys_common::AsInner; +use std::thread; +use std::time::{Duration, Instant}; + +fn each_ip(f: &mut dyn FnMut(SocketAddr, SocketAddr)) { + f(next_test_ip4(), next_test_ip4()); + f(next_test_ip6(), next_test_ip6()); +} + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(t) => t, + Err(e) => panic!("received error for `{}`: {}", stringify!($e), e), + } + }; +} + +#[test] +fn bind_error() { + let mut pool = Pool::new(); + pool.insert_socket_addr("1.1.1.1:9999".parse().unwrap(), ambient_authority()); + + let socket = UdpSocket::new(AddressFamily::Ipv4, Blocking::Yes).unwrap(); + match pool + .udp_binder("1.1.1.1:9999") + .unwrap() + .bind_existing_udp_socket(&socket) + { + Ok(..) => panic!(), + Err(e) => assert_eq!(e.kind(), ErrorKind::AddrNotAvailable), + } +} + +#[test] +fn socket_smoke_test_ip4() { + each_ip(&mut |server_ip, client_ip| { + let mut client_pool = Pool::new(); + client_pool.insert_socket_addr(client_ip, ambient_authority()); + let mut server_pool = Pool::new(); + server_pool.insert_socket_addr(server_ip, ambient_authority()); + + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + + let p = server_pool.clone(); + let _t = thread::spawn(move || { + let client = + UdpSocket::new(AddressFamily::of_socket_addr(client_ip), Blocking::Yes).unwrap(); + t!(client_pool + .udp_binder(&client_ip) + .unwrap() + .bind_existing_udp_socket(&client)); + rx1.recv().unwrap(); + t!(p.send_to_udp_socket_addr(&client, &[99], &server_ip)); + tx2.send(()).unwrap(); + }); + + let server = + UdpSocket::new(AddressFamily::of_socket_addr(server_ip), Blocking::Yes).unwrap(); + t!(server_pool + .udp_binder(&server_ip) + .unwrap() + .bind_existing_udp_socket(&server)); + tx1.send(()).unwrap(); + let mut buf = [0]; + let (nread, src) = t!(server.recv_from(&mut buf)); + assert_eq!(nread, 1); + assert_eq!(buf[0], 99); + assert_eq!(src, client_ip); + rx2.recv().unwrap(); + }) +} + +#[test] +fn socket_name() { + each_ip(&mut |addr, _| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let server = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&server)); + assert_eq!(addr, t!(server.local_addr())); + }) +} + +#[test] +fn socket_peer() { + each_ip(&mut |addr1, addr2| { + let mut pool1 = Pool::new(); + pool1.insert_socket_addr(addr1, ambient_authority()); + let mut pool2 = Pool::new(); + pool2.insert_socket_addr(addr2, ambient_authority()); + + let server = UdpSocket::new(AddressFamily::of_socket_addr(addr1), Blocking::Yes).unwrap(); + t!(pool1 + .udp_binder(&addr1) + .unwrap() + .bind_existing_udp_socket(&server)); + assert_eq!( + server.peer_addr().unwrap_err().kind(), + ErrorKind::NotConnected + ); + t!(pool2 + .udp_connecter(&addr2) + .unwrap() + .connect_existing_udp_socket(&server)); + assert_eq!(addr2, t!(server.peer_addr())); + }) +} + +#[test] +fn udp_clone_smoke() { + each_ip(&mut |addr1, addr2| { + let mut pool1 = Pool::new(); + pool1.insert_socket_addr(addr1, ambient_authority()); + let mut pool2 = Pool::new(); + pool2.insert_socket_addr(addr2, ambient_authority()); + + let sock1 = UdpSocket::new(AddressFamily::of_socket_addr(addr1), Blocking::Yes).unwrap(); + t!(pool1 + .udp_binder(&addr1) + .unwrap() + .bind_existing_udp_socket(&sock1)); + let sock2 = UdpSocket::new(AddressFamily::of_socket_addr(addr2), Blocking::Yes).unwrap(); + t!(pool2 + .udp_binder(&addr2) + .unwrap() + .bind_existing_udp_socket(&sock2)); + + let _t = thread::spawn(move || { + let mut buf = [0, 0]; + assert_eq!(sock2.recv_from(&mut buf).unwrap(), (1, addr1)); + assert_eq!(buf[0], 1); + t!(pool1.send_to_udp_socket_addr(&sock2, &[2], &addr1)); + }); + + let sock3 = t!(sock1.try_clone()); + + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let p = pool2.clone(); + let _t = thread::spawn(move || { + rx1.recv().unwrap(); + t!(p.send_to_udp_socket_addr(&sock3, &[1], &addr2)); + tx2.send(()).unwrap(); + }); + tx1.send(()).unwrap(); + let mut buf = [0, 0]; + assert_eq!(sock1.recv_from(&mut buf).unwrap(), (1, addr2)); + rx2.recv().unwrap(); + }) +} + +#[test] +fn udp_clone_two_read() { + each_ip(&mut |addr1, addr2| { + let mut pool1 = Pool::new(); + pool1.insert_socket_addr(addr1, ambient_authority()); + let mut pool2 = Pool::new(); + pool2.insert_socket_addr(addr2, ambient_authority()); + + let sock1 = UdpSocket::new(AddressFamily::of_socket_addr(addr1), Blocking::Yes).unwrap(); + t!(pool1 + .udp_binder(&addr1) + .unwrap() + .bind_existing_udp_socket(&sock1)); + let sock2 = UdpSocket::new(AddressFamily::of_socket_addr(addr2), Blocking::Yes).unwrap(); + t!(pool2 + .udp_binder(&addr2) + .unwrap() + .bind_existing_udp_socket(&sock2)); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); + + let _t = thread::spawn(move || { + t!(pool1.send_to_udp_socket_addr(&sock2, &[1], &addr1)); + rx.recv().unwrap(); + t!(pool1.send_to_udp_socket_addr(&sock2, &[2], &addr1)); + rx.recv().unwrap(); + }); + + let sock3 = t!(sock1.try_clone()); + + let (done, rx) = channel(); + let _t = thread::spawn(move || { + let mut buf = [0, 0]; + t!(sock3.recv_from(&mut buf)); + tx2.send(()).unwrap(); + done.send(()).unwrap(); + }); + let mut buf = [0, 0]; + t!(sock1.recv_from(&mut buf)); + tx1.send(()).unwrap(); + + rx.recv().unwrap(); + }) +} + +#[test] +fn udp_clone_two_write() { + each_ip(&mut |addr1, addr2| { + let mut pool1 = Pool::new(); + pool1.insert_socket_addr(addr1, ambient_authority()); + let mut pool2 = Pool::new(); + pool2.insert_socket_addr(addr2, ambient_authority()); + + let sock1 = UdpSocket::new(AddressFamily::of_socket_addr(addr1), Blocking::Yes).unwrap(); + t!(pool1 + .udp_binder(&addr1) + .unwrap() + .bind_existing_udp_socket(&sock1)); + let sock2 = UdpSocket::new(AddressFamily::of_socket_addr(addr2), Blocking::Yes).unwrap(); + t!(pool2 + .udp_binder(&addr2) + .unwrap() + .bind_existing_udp_socket(&sock2)); + + let (tx, rx) = channel(); + let (serv_tx, serv_rx) = channel(); + + let _t = thread::spawn(move || { + let mut buf = [0, 1]; + rx.recv().unwrap(); + t!(sock2.recv_from(&mut buf)); + serv_tx.send(()).unwrap(); + }); + + let sock3 = t!(sock1.try_clone()); + + let (done, rx) = channel(); + let tx2 = tx.clone(); + let p = pool2.clone(); + let _t = thread::spawn(move || { + if p.send_to_udp_socket_addr(&sock3, &[1], &addr2).is_ok() { + let _ = tx2.send(()); + } + done.send(()).unwrap(); + }); + if pool2.send_to_udp_socket_addr(&sock1, &[2], &addr2).is_ok() { + let _ = tx.send(()); + } + drop(tx); + + rx.recv().unwrap(); + serv_rx.recv().unwrap(); + }) +} + +/* Disable this test, as it depends on Rust-internal details. +#[test] +fn debug() { + let name = if cfg!(windows) { "socket" } else { "fd" }; + let socket_addr = next_test_ip4(); + + let mut pool = Pool::new(); + + let udpsock_inner = udpsock.0.socket().as_inner(); + let compare = format!("UdpSocket {{ addr: {:?}, {}: {:?} }}", socket_addr, name, udpsock_inner); + assert_eq!(format!("{:?}", udpsock), compare); +} +*/ + +// FIXME: re-enabled openbsd/netbsd tests once their socket timeout code +// no longer has rounding errors. +// VxWorks ignores SO_SNDTIMEO. +#[cfg_attr( + any(target_os = "netbsd", target_os = "openbsd", target_os = "vxworks"), + ignore +)] +#[test] +fn timeouts() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let stream = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&stream)); + let dur = Duration::new(15410, 0); + + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_read_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.read_timeout())); + + assert_eq!(None, t!(stream.write_timeout())); + + t!(stream.set_write_timeout(Some(dur))); + assert_eq!(Some(dur), t!(stream.write_timeout())); + + t!(stream.set_read_timeout(None)); + assert_eq!(None, t!(stream.read_timeout())); + + t!(stream.set_write_timeout(None)); + assert_eq!(None, t!(stream.write_timeout())); +} + +#[test] +fn test_read_timeout() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let stream = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&stream)); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + let mut buf = [0; 10]; + + let start = Instant::now(); + loop { + let kind = stream + .recv_from(&mut buf) + .err() + .expect("expected error") + .kind(); + if kind != ErrorKind::Interrupted { + assert!( + kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut, + "unexpected_error: {:?}", + kind + ); + break; + } + } + assert!(start.elapsed() > Duration::from_millis(400)); +} + +#[test] +fn test_read_with_timeout() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let stream = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&stream)); + t!(stream.set_read_timeout(Some(Duration::from_millis(1000)))); + + t!(pool.send_to_udp_socket_addr(&stream, b"hello world", &addr)); + + let mut buf = [0; 11]; + t!(stream.recv_from(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + + let start = Instant::now(); + loop { + let kind = stream + .recv_from(&mut buf) + .err() + .expect("expected error") + .kind(); + if kind != ErrorKind::Interrupted { + assert!( + kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut, + "unexpected_error: {:?}", + kind + ); + break; + } + } + assert!(start.elapsed() > Duration::from_millis(400)); +} + +// Ensure the `set_read_timeout` and `set_write_timeout` calls return errors +// when passed zero Durations +#[test] +fn test_timeout_zero_duration() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let socket = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&socket)); + + let result = socket.set_write_timeout(Some(Duration::new(0, 0))); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + + let result = socket.set_read_timeout(Some(Duration::new(0, 0))); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); +} + +#[test] +fn connect_send_recv() { + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let socket = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&socket)); + t!(pool + .udp_connecter(addr) + .unwrap() + .connect_existing_udp_socket(&socket)); + + t!(socket.send(b"hello world")); + + let mut buf = [0; 11]; + t!(socket.recv(&mut buf)); + assert_eq!(b"hello world", &buf[..]); +} + +#[test] +fn connect_send_peek_recv() { + each_ip(&mut |addr, _| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let socket = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&socket)); + t!(pool + .udp_connecter(addr) + .unwrap() + .connect_existing_udp_socket(&socket)); + + t!(socket.send(b"hello world")); + + for _ in 1..3 { + let mut buf = [0; 11]; + let size = t!(socket.peek(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + assert_eq!(size, 11); + } + + let mut buf = [0; 11]; + let size = t!(socket.recv(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + assert_eq!(size, 11); + }) +} + +#[test] +fn peek_from() { + each_ip(&mut |addr, _| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let socket = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&socket)); + t!(pool.send_to_udp_socket_addr(&socket, b"hello world", &addr)); + + for _ in 1..3 { + let mut buf = [0; 11]; + let (size, _) = t!(socket.peek_from(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + assert_eq!(size, 11); + } + + let mut buf = [0; 11]; + let (size, _) = t!(socket.recv_from(&mut buf)); + assert_eq!(b"hello world", &buf[..]); + assert_eq!(size, 11); + }) +} + +#[test] +fn ttl() { + let ttl = 100; + + let addr = next_test_ip4(); + + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let stream = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&stream)); + + t!(stream.set_ttl(ttl)); + assert_eq!(ttl, t!(stream.ttl())); +} + +#[test] +fn set_nonblocking() { + each_ip(&mut |addr, _| { + let mut pool = Pool::new(); + pool.insert_socket_addr(addr, ambient_authority()); + + let socket = UdpSocket::new(AddressFamily::of_socket_addr(addr), Blocking::Yes).unwrap(); + t!(pool + .udp_binder(&addr) + .unwrap() + .bind_existing_udp_socket(&socket)); + + t!(socket.set_nonblocking(true)); + t!(socket.set_nonblocking(false)); + + t!(pool + .udp_connecter(addr) + .unwrap() + .connect_existing_udp_socket(&socket)); + + t!(socket.set_nonblocking(false)); + t!(socket.set_nonblocking(true)); + + let mut buf = [0]; + match socket.recv(&mut buf) { + Ok(_) => panic!("expected error"), + Err(ref e) if e.kind() == ErrorKind::WouldBlock => {} + Err(e) => panic!("unexpected error {}", e), + } + }) +}