From c19c1409824a68586c3779729329040a554f12db Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 10 Jul 2024 02:44:38 +0300 Subject: [PATCH] fix(webocket): Avoid panic when polling quicksink after errors Pull-Request: #5482. --- Cargo.lock | 3 +- Cargo.toml | 2 +- transports/websocket/CHANGELOG.md | 6 +++ transports/websocket/Cargo.toml | 3 +- transports/websocket/src/framed.rs | 2 +- transports/websocket/src/quicksink.rs | 72 ++++++++++++++++++--------- 6 files changed, 61 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec114491e48..6a9dc56e987 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3492,7 +3492,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" -version = "0.43.1" +version = "0.43.2" dependencies = [ "async-std", "either", @@ -3507,6 +3507,7 @@ dependencies = [ "rcgen", "rw-stream-sink", "soketto", + "thiserror", "tracing", "url", "webpki-roots 0.25.2", diff --git a/Cargo.toml b/Cargo.toml index 7fa6856c26b..84767573b9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,7 +112,7 @@ libp2p-upnp = { version = "0.2.2", path = "protocols/upnp" } libp2p-webrtc = { version = "0.7.1-alpha", path = "transports/webrtc" } libp2p-webrtc-utils = { version = "0.2.1", path = "misc/webrtc-utils" } libp2p-webrtc-websys = { version = "0.3.0-alpha", path = "transports/webrtc-websys" } -libp2p-websocket = { version = "0.43.1", path = "transports/websocket" } +libp2p-websocket = { version = "0.43.2", path = "transports/websocket" } libp2p-websocket-websys = { version = "0.3.2", path = "transports/websocket-websys" } libp2p-webtransport-websys = { version = "0.3.0", path = "transports/webtransport-websys" } libp2p-yamux = { version = "0.45.2", path = "muxers/yamux" } diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index d206cbac6a1..419ff41c6fc 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.43.2 + +- fix: Avoid websocket panic on polling after errors. See [PR 5482]. + +[PR 5482]: https://github.com/libp2p/rust-libp2p/pull/5482 + ## 0.43.1 ## 0.43.0 diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index b022d95ca47..f1b0a413115 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-websocket" edition = "2021" rust-version = { workspace = true } description = "WebSocket transport for libp2p" -version = "0.43.1" +version = "0.43.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -21,6 +21,7 @@ pin-project-lite = "0.2.14" rw-stream-sink = { workspace = true } soketto = "0.8.0" tracing = { workspace = true } +thiserror = "1.0.61" url = "2.5" webpki-roots = "0.25" diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index f6f99d18580..69a01fdbd46 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -571,7 +571,7 @@ fn location_to_multiaddr(location: &str) -> Result> { /// The websocket connection. pub struct Connection { receiver: BoxStream<'static, Result>, - sender: Pin + Send>>, + sender: Pin> + Send>>, _marker: std::marker::PhantomData, } diff --git a/transports/websocket/src/quicksink.rs b/transports/websocket/src/quicksink.rs index cb2c98b078f..4f620536ea1 100644 --- a/transports/websocket/src/quicksink.rs +++ b/transports/websocket/src/quicksink.rs @@ -30,14 +30,6 @@ // Ok::<_, io::Error>(stdout) // }); // ``` -// -// # Panics -// -// - If any of the [`Sink`] methods produce an error, the sink transitions -// to a failure state and none of its methods must be called afterwards or -// else a panic will occur. -// - If [`Sink::poll_close`] has been called, no other sink method must be -// called afterwards or else a panic will be caused. use futures::{ready, sink::Sink}; use pin_project_lite::pin_project; @@ -102,6 +94,15 @@ enum State { Failed, } +/// Errors the `Sink` may return. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error("Error while sending over the sink, {0}")] + Send(E), + #[error("The Sink has closed")] + Closed, +} + pin_project! { /// `SinkImpl` implements the `Sink` trait. #[derive(Debug)] @@ -119,7 +120,7 @@ where F: FnMut(S, Action) -> T, T: Future>, { - type Error = E; + type Error = Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); @@ -135,7 +136,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - Poll::Ready(Err(e)) + Poll::Ready(Err(Error::Send(e))) } } } @@ -143,20 +144,19 @@ where Ok(_) => { this.future.set(None); *this.state = State::Closed; - panic!("SinkImpl::poll_ready called on a closing sink.") + Poll::Ready(Err(Error::Closed)) } Err(e) => { this.future.set(None); *this.state = State::Failed; - Poll::Ready(Err(e)) + Poll::Ready(Err(Error::Send(e))) } }, State::Empty => { assert!(this.param.is_some()); Poll::Ready(Ok(())) } - State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."), - State::Failed => panic!("SinkImpl::poll_ready called after error."), + State::Closed | State::Failed => Poll::Ready(Err(Error::Closed)), } } @@ -193,7 +193,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, State::Flushing => { @@ -207,7 +207,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } } } @@ -221,11 +221,10 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, - State::Closed => return Poll::Ready(Ok(())), - State::Failed => panic!("SinkImpl::poll_flush called after error."), + State::Closed | State::Failed => return Poll::Ready(Err(Error::Closed)), } } } @@ -253,7 +252,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, State::Flushing => { @@ -266,7 +265,7 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } } } @@ -280,11 +279,11 @@ where Err(e) => { this.future.set(None); *this.state = State::Failed; - return Poll::Ready(Err(e)); + return Poll::Ready(Err(Error::Send(e))); } }, State::Closed => return Poll::Ready(Ok(())), - State::Failed => panic!("SinkImpl::poll_closed called after error."), + State::Failed => return Poll::Ready(Err(Error::Closed)), } } } @@ -347,4 +346,31 @@ mod tests { assert_eq!(&expected[..], &actual[..]) }); } + + #[test] + fn error_does_not_panic() { + task::block_on(async { + let sink = make_sink(io::stdout(), |mut _stdout, _action| async move { + Err(io::Error::new(io::ErrorKind::Other, "oh no")) + }); + + futures::pin_mut!(sink); + + let result = sink.send("hello").await; + match result { + Err(crate::quicksink::Error::Send(e)) => { + assert_eq!(e.kind(), io::ErrorKind::Other); + assert_eq!(e.to_string(), "oh no") + } + _ => panic!("unexpected result: {:?}", result), + }; + + // Call send again, expect not to panic. + let result = sink.send("hello").await; + match result { + Err(crate::quicksink::Error::Closed) => {} + _ => panic!("unexpected result: {:?}", result), + }; + }) + } }