Skip to content

Commit

Permalink
Remove flush from request
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Aug 2, 2023
1 parent aeede88 commit 4322246
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 5 deletions.
3 changes: 0 additions & 3 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,6 @@ impl Client {
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush()
.await
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
Expand Down
1 change: 0 additions & 1 deletion async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,6 @@ pub struct PublishAckFuture {

impl PublishAckFuture {
async fn next_with_timeout(mut self) -> Result<PublishAck, PublishError> {
self.subscription.sender.send(Command::TryFlush).await.ok();
let next = tokio::time::timeout(self.timeout, self.subscription.next())
.await
.map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;
Expand Down
4 changes: 3 additions & 1 deletion async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ impl ConnectionHandler {
.pending
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
if prev <= 1 {
self.handle_flush().await?;
if let Err(_err) = self.handle_flush().await {
self.handle_disconnect().await?;
}
}
}
None => {
Expand Down

0 comments on commit 4322246

Please sign in to comment.