diff --git a/src/io.rs b/src/io.rs index ef544e4..5fe748a 100644 --- a/src/io.rs +++ b/src/io.rs @@ -11,6 +11,8 @@ use ntex_util::time::Seconds; type Response = ::Item; +const READY_COUNT: u8 = 32; + pin_project_lite::pin_project! { /// Dispatcher for mqtt protocol pub(crate) struct Dispatcher @@ -50,6 +52,7 @@ struct DispatcherInner>, U: Encoder + Decoder + 'stat read_max_timeout: Seconds, keepalive_timeout: Seconds, + ready_count: u8, response: Option>>, response_idx: usize, } @@ -139,6 +142,7 @@ where st: IoDispatcherState::Processing, response: None, response_idx: 0, + ready_count: 0, read_remains: 0, read_remains_prev: 0, read_max_timeout: Seconds::ZERO, @@ -450,7 +454,8 @@ where fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll> { // check service readiness if self.flags.contains(Flags::READY) { - if self.service.poll_not_ready(cx).is_pending() { + if self.ready_count != 0 && self.service.poll_not_ready(cx).is_pending() { + self.ready_count -= 1; return Poll::Ready(self.check_error()); } self.flags.remove(Flags::READY); @@ -458,6 +463,7 @@ where match self.service.poll_ready(cx) { Poll::Ready(Ok(_)) => { + self.ready_count = READY_COUNT; self.flags.insert(Flags::READY); Poll::Ready(self.check_error()) }