diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index beed677dd..79fd79df5 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -346,9 +346,6 @@ impl Client { } None => self.publish_with_reply(subject, inbox, payload).await?, } - self.flush() - .await - .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?; let request = match timeout { Some(timeout) => { tokio::time::timeout(timeout, sub.next()) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 1fd009848..056d10a8b 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -987,7 +987,6 @@ pub struct PublishAckFuture { impl PublishAckFuture { async fn next_with_timeout(mut self) -> Result { - self.subscription.sender.send(Command::TryFlush).await.ok(); let next = tokio::time::timeout(self.timeout, self.subscription.next()) .await .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?; diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 5ffba91fb..6fdd55631 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -388,7 +388,9 @@ impl ConnectionHandler { .pending .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); if prev <= 1 { - self.handle_flush().await?; + if let Err(_err) = self.handle_flush().await { + self.handle_disconnect().await?; + } } } None => {