Skip to content

Commit

Permalink
Disable keep-alive timer is not configured (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored May 12, 2024
1 parent 184fc53 commit 0b65b30
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 27 deletions.
20 changes: 12 additions & 8 deletions .github/workflows/osx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,30 @@ jobs:
- stable
- nightly

name: ${{ matrix.version }} - x86_64-apple-darwin
name: ${{ matrix.version }} - aarch64-apple-darwin
runs-on: macOS-latest

steps:
- uses: actions/checkout@master

- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-apple-darwin
toolchain: ${{ matrix.version }}-aarch64-apple-darwin
profile: minimal
override: true

- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
- name: Cache cargo registry
uses: actions/cache@v4
with:
command: generate-lockfile
path: ~/.cargo/registry
key: ${{ matrix.version }}-aarch64-apple-darwin-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }}

- name: Cache Dependencies
uses: Swatinem/[email protected]
- name: Cache cargo index
uses: actions/cache@v4
with:
path: ~/.cargo/git
key: ${{ matrix.version }}-aarch64-apple-darwin-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }}

- name: Run tests
uses: actions-rs/cargo@v1
Expand Down
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Changes

## [2.0.0] - 2024-04-1x
## [2.0.0] - 2024-05-1x

* Mark `Control` type as `non exhaustive`

* Rename `ControlMessage` to `Control`

* Remove protocol variant services

* Disable keep-alive timer is not configured

## [1.1.0] - 2024-03-07

* Use MqttService::connect_timeout() only for reading protocol version
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ features = ["ntex/tokio"]

[dependencies]
ntex = "1.2"
ntex-io = "1.2"
bitflags = "2"
log = "0.4"
pin-project-lite = "0.2"
Expand Down
175 changes: 164 additions & 11 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ where
queue: VecDeque::new(),
}));
let pool = io.memory_pool().pool();
let keepalive_timeout = config.keepalive_timeout();

Dispatcher {
codec,
Expand All @@ -131,13 +132,17 @@ where
inner: DispatcherInner {
io,
state,
flags: Flags::empty(),
keepalive_timeout,
flags: if keepalive_timeout.is_zero() {
Flags::KA_ENABLED
} else {
Flags::empty()
},
config: config.clone(),
st: IoDispatcherState::Processing,
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
keepalive_timeout: config.keepalive_timeout(),
},
}
}
Expand Down Expand Up @@ -552,24 +557,25 @@ where
self.io.start_timer(timeout);
return Ok(());
}
log::trace!("{}: Max payload timeout has been reached", self.io.tag());
}
log::trace!("{}: Max payload timeout has been reached", self.io.tag());
return Err(DispatchItem::ReadTimeout);
}
} else if self.flags.contains(Flags::KA_TIMEOUT) {
log::trace!("{}: Keep-alive error, stopping dispatcher", self.io.tag());
return Err(DispatchItem::KeepAliveTimeout);
}

log::trace!("{}: Keep-alive error, stopping dispatcher", self.io.tag());
Err(DispatchItem::KeepAliveTimeout)
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::{cell::Cell, sync::Arc, sync::Mutex};
use std::{cell::Cell, io, sync::Arc, sync::Mutex};

use ntex::channel::condition::Condition;
use ntex::time::{sleep, Millis};
use ntex::util::Bytes;
use ntex::util::{Bytes, BytesMut};
use ntex::{codec::BytesCodec, io as nio, service::ServiceCtx, testing::Io};

use super::*;
Expand All @@ -587,9 +593,18 @@ mod tests {
codec: U,
service: F,
) -> (Self, nio::IoRef) {
let keepalive_timeout = Seconds(30);
Self::new_debug_cfg(io, codec, DispatcherConfig::default(), service)
}

/// Construct new `Dispatcher` instance
pub(crate) fn new_debug_cfg<F: IntoService<S, DispatchItem<U>>>(
io: nio::Io,
codec: U,
config: DispatcherConfig,
service: F,
) -> (Self, nio::IoRef) {
let keepalive_timeout = config.keepalive_timeout();
let rio = io.get_ref();
let config = DispatcherConfig::default();

let state = Rc::new(RefCell::new(DispatcherState {
error: None,
Expand All @@ -610,7 +625,11 @@ mod tests {
keepalive_timeout,
io: IoBoxed::from(io),
st: IoDispatcherState::Processing,
flags: Flags::KA_ENABLED,
flags: if keepalive_timeout.is_zero() {
Flags::empty()
} else {
Flags::KA_ENABLED
},
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
Expand Down Expand Up @@ -920,4 +939,138 @@ mod tests {
assert!(!client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 0, 0]);
}

#[derive(Debug, Copy, Clone)]
struct BytesLenCodec(usize);

impl Encoder for BytesLenCodec {
type Item = Bytes;
type Error = io::Error;

#[inline]
fn encode(&self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend_from_slice(&item[..]);
Ok(())
}
}

impl Decoder for BytesLenCodec {
type Item = BytesMut;
type Error = io::Error;

fn decode(&self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() >= self.0 {
Ok(Some(src.split_to(self.0)))
} else {
Ok(None)
}
}
}

/// Do not use keep-alive timer if not configured
#[ntex::test]
async fn test_no_keepalive_err_after_frame_timeout() {
env_logger::init();
let (client, server) = Io::create();
client.remote_buffer_cap(1024);

let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();

let config = DispatcherConfig::default();
config.set_keepalive_timeout(Seconds(0)).set_frame_read_rate(Seconds(1), Seconds(2), 2);

let (disp, _) = Dispatcher::new_debug_cfg(
nio::Io::new(server),
BytesLenCodec(2),
config,
ntex::service::fn_service(move |msg: DispatchItem<BytesLenCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::KeepAliveTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),
}
Ok(None)
}
}),
);
ntex::rt::spawn(async move {
let _ = disp.await;
});

client.write("1");
sleep(Millis(250)).await;
client.write("2");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"12"));
sleep(Millis(2000)).await;

assert_eq!(&data.lock().unwrap().borrow()[..], &[0]);
}

#[ntex::test]
async fn test_read_timeout() {
let (client, server) = Io::create();
client.remote_buffer_cap(1024);

let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();

let config = DispatcherConfig::default();
config.set_keepalive_timeout(Seconds::ZERO).set_frame_read_rate(
Seconds(1),
Seconds(2),
2,
);

let (disp, state) = Dispatcher::new_debug_cfg(
nio::Io::new(server),
BytesLenCodec(8),
config,
ntex::service::fn_service(move |msg: DispatchItem<BytesLenCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::ReadTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),
}
Ok(None)
}
}),
);
ntex::rt::spawn(async move {
let _ = disp.await;
});

client.write("12345678");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"12345678"));

client.write("1");
sleep(Millis(1000)).await;
assert!(!state.flags().contains(nio::Flags::IO_STOPPING));
client.write("23");
sleep(Millis(1000)).await;
assert!(!state.flags().contains(nio::Flags::IO_STOPPING));
client.write("4");
sleep(Millis(2000)).await;

// write side must be closed, dispatcher should fail with keep-alive
assert!(state.flags().contains(nio::Flags::IO_STOPPING));
assert!(client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
}
}
14 changes: 7 additions & 7 deletions src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ where
Err(err) => {
// do not handle nested error
if error {
inner.sink.drop_sink();
return Err(err);
} else {
// handle error from control service
Expand All @@ -562,20 +563,19 @@ where
}
};

if error {
let response = if error {
if let Some(pkt) = result.packet {
let _ = inner.sink.encode_packet(pkt);
}
if result.disconnect {
inner.sink.drop_sink();
}
Ok(None)
} else {
if result.disconnect {
inner.sink.drop_sink();
}
Ok(result.packet)
};

if result.disconnect {
inner.sink.drop_sink();
}
response
}

#[cfg(test)]
Expand Down

0 comments on commit 0b65b30

Please sign in to comment.