Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable keep-alive timer is not configured #171

Merged
merged 4 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions .github/workflows/osx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,30 @@ jobs:
- stable
- nightly

name: ${{ matrix.version }} - x86_64-apple-darwin
name: ${{ matrix.version }} - aarch64-apple-darwin
runs-on: macOS-latest

steps:
- uses: actions/checkout@master

- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-apple-darwin
toolchain: ${{ matrix.version }}-aarch64-apple-darwin
profile: minimal
override: true

- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
- name: Cache cargo registry
uses: actions/cache@v4
with:
command: generate-lockfile
path: ~/.cargo/registry
key: ${{ matrix.version }}-aarch64-apple-darwin-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }}

- name: Cache Dependencies
uses: Swatinem/[email protected]
- name: Cache cargo index
uses: actions/cache@v4
with:
path: ~/.cargo/git
key: ${{ matrix.version }}-aarch64-apple-darwin-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }}

- name: Run tests
uses: actions-rs/cargo@v1
Expand Down
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Changes

## [2.0.0] - 2024-04-1x
## [2.0.0] - 2024-05-1x

* Mark `Control` type as `non exhaustive`

* Rename `ControlMessage` to `Control`

* Remove protocol variant services

* Disable keep-alive timer is not configured

## [1.1.0] - 2024-03-07

* Use MqttService::connect_timeout() only for reading protocol version
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ features = ["ntex/tokio"]

[dependencies]
ntex = "1.2"
ntex-io = "1.2"
bitflags = "2"
log = "0.4"
pin-project-lite = "0.2"
Expand Down
175 changes: 164 additions & 11 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
queue: VecDeque::new(),
}));
let pool = io.memory_pool().pool();
let keepalive_timeout = config.keepalive_timeout();

Dispatcher {
codec,
Expand All @@ -131,13 +132,17 @@
inner: DispatcherInner {
io,
state,
flags: Flags::empty(),
keepalive_timeout,
flags: if keepalive_timeout.is_zero() {
Flags::KA_ENABLED
} else {
Flags::empty()
},
config: config.clone(),
st: IoDispatcherState::Processing,
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
keepalive_timeout: config.keepalive_timeout(),
},
}
}
Expand Down Expand Up @@ -552,24 +557,25 @@
self.io.start_timer(timeout);
return Ok(());
}
log::trace!("{}: Max payload timeout has been reached", self.io.tag());
}
log::trace!("{}: Max payload timeout has been reached", self.io.tag());
return Err(DispatchItem::ReadTimeout);
}
} else if self.flags.contains(Flags::KA_TIMEOUT) {
log::trace!("{}: Keep-alive error, stopping dispatcher", self.io.tag());
return Err(DispatchItem::KeepAliveTimeout);
}

log::trace!("{}: Keep-alive error, stopping dispatcher", self.io.tag());
Err(DispatchItem::KeepAliveTimeout)
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::{cell::Cell, sync::Arc, sync::Mutex};
use std::{cell::Cell, io, sync::Arc, sync::Mutex};

use ntex::channel::condition::Condition;
use ntex::time::{sleep, Millis};
use ntex::util::Bytes;
use ntex::util::{Bytes, BytesMut};
use ntex::{codec::BytesCodec, io as nio, service::ServiceCtx, testing::Io};

use super::*;
Expand All @@ -587,9 +593,18 @@
codec: U,
service: F,
) -> (Self, nio::IoRef) {
let keepalive_timeout = Seconds(30);
Self::new_debug_cfg(io, codec, DispatcherConfig::default(), service)
}

/// Construct new `Dispatcher` instance
pub(crate) fn new_debug_cfg<F: IntoService<S, DispatchItem<U>>>(
io: nio::Io,
codec: U,
config: DispatcherConfig,
service: F,
) -> (Self, nio::IoRef) {
let keepalive_timeout = config.keepalive_timeout();
let rio = io.get_ref();
let config = DispatcherConfig::default();

let state = Rc::new(RefCell::new(DispatcherState {
error: None,
Expand All @@ -610,7 +625,11 @@
keepalive_timeout,
io: IoBoxed::from(io),
st: IoDispatcherState::Processing,
flags: Flags::KA_ENABLED,
flags: if keepalive_timeout.is_zero() {
Flags::empty()
} else {
Flags::KA_ENABLED
},
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
Expand Down Expand Up @@ -920,4 +939,138 @@
assert!(!client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 0, 0]);
}

#[derive(Debug, Copy, Clone)]
struct BytesLenCodec(usize);

impl Encoder for BytesLenCodec {
type Item = Bytes;
type Error = io::Error;

#[inline]
fn encode(&self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend_from_slice(&item[..]);
Ok(())
}
}

impl Decoder for BytesLenCodec {
type Item = BytesMut;
type Error = io::Error;

fn decode(&self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() >= self.0 {
Ok(Some(src.split_to(self.0)))
} else {
Ok(None)
}
}
}

/// Do not use keep-alive timer if not configured
#[ntex::test]
async fn test_no_keepalive_err_after_frame_timeout() {
env_logger::init();
let (client, server) = Io::create();
client.remote_buffer_cap(1024);

let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();

let config = DispatcherConfig::default();
config.set_keepalive_timeout(Seconds(0)).set_frame_read_rate(Seconds(1), Seconds(2), 2);

let (disp, _) = Dispatcher::new_debug_cfg(
nio::Io::new(server),
BytesLenCodec(2),
config,
ntex::service::fn_service(move |msg: DispatchItem<BytesLenCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::KeepAliveTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),

Check warning on line 998 in src/io.rs

View check run for this annotation

Codecov / codecov/patch

src/io.rs#L995-L998

Added lines #L995 - L998 were not covered by tests
}
Ok(None)

Check warning on line 1000 in src/io.rs

View check run for this annotation

Codecov / codecov/patch

src/io.rs#L1000

Added line #L1000 was not covered by tests
}
}),
);
ntex::rt::spawn(async move {
let _ = disp.await;
});

Check warning on line 1006 in src/io.rs

View check run for this annotation

Codecov / codecov/patch

src/io.rs#L1006

Added line #L1006 was not covered by tests

client.write("1");
sleep(Millis(250)).await;
client.write("2");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"12"));
sleep(Millis(2000)).await;

assert_eq!(&data.lock().unwrap().borrow()[..], &[0]);
}

#[ntex::test]
async fn test_read_timeout() {
let (client, server) = Io::create();
client.remote_buffer_cap(1024);

let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();

let config = DispatcherConfig::default();
config.set_keepalive_timeout(Seconds::ZERO).set_frame_read_rate(
Seconds(1),
Seconds(2),
2,
);

let (disp, state) = Dispatcher::new_debug_cfg(
nio::Io::new(server),
BytesLenCodec(8),
config,
ntex::service::fn_service(move |msg: DispatchItem<BytesLenCodec>| {
let data = data2.clone();
async move {
match msg {
DispatchItem::Item(bytes) => {
data.lock().unwrap().borrow_mut().push(0);
return Ok::<_, ()>(Some(bytes.freeze()));
}
DispatchItem::ReadTimeout => {
data.lock().unwrap().borrow_mut().push(1);
}
_ => (),

Check warning on line 1048 in src/io.rs

View check run for this annotation

Codecov / codecov/patch

src/io.rs#L1048

Added line #L1048 was not covered by tests
}
Ok(None)
}
}),
);
ntex::rt::spawn(async move {
let _ = disp.await;
});

client.write("12345678");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"12345678"));

client.write("1");
sleep(Millis(1000)).await;
assert!(!state.flags().contains(nio::Flags::IO_STOPPING));
client.write("23");
sleep(Millis(1000)).await;
assert!(!state.flags().contains(nio::Flags::IO_STOPPING));
client.write("4");
sleep(Millis(2000)).await;

// write side must be closed, dispatcher should fail with keep-alive
assert!(state.flags().contains(nio::Flags::IO_STOPPING));
assert!(client.is_closed());
assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]);
}
}
14 changes: 7 additions & 7 deletions src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@
Err(err) => {
// do not handle nested error
if error {
inner.sink.drop_sink();

Check warning on line 551 in src/v5/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/dispatcher.rs#L551

Added line #L551 was not covered by tests
return Err(err);
} else {
// handle error from control service
Expand All @@ -562,20 +563,19 @@
}
};

if error {
let response = if error {
if let Some(pkt) = result.packet {
let _ = inner.sink.encode_packet(pkt);
}
if result.disconnect {
inner.sink.drop_sink();
}
Ok(None)
} else {
if result.disconnect {
inner.sink.drop_sink();
}
Ok(result.packet)
};

if result.disconnect {
inner.sink.drop_sink();
}
response
}

#[cfg(test)]
Expand Down
Loading