From 8d43a78b6cd793221b401e74bd3e9621d1fe4162 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Mon, 15 Jan 2024 12:05:45 -0800 Subject: [PATCH 1/2] test and fix for unsolicted issue --- dnp3/examples/outstation.rs | 12 +++- dnp3/src/outstation/session.rs | 71 ++++++++++++-------- dnp3/src/outstation/tests/harness/harness.rs | 7 ++ dnp3/src/outstation/tests/unsolicited.rs | 70 ++++++++++++++++++- 4 files changed, 131 insertions(+), 29 deletions(-) diff --git a/dnp3/examples/outstation.rs b/dnp3/examples/outstation.rs index 5eb35efe..77f93519 100644 --- a/dnp3/examples/outstation.rs +++ b/dnp3/examples/outstation.rs @@ -331,7 +331,15 @@ async fn run_server(mut server: Server) -> Result<(), Box ); db.add(i, Some(EventClass::Class1), CounterConfig::default()); db.add(i, Some(EventClass::Class1), FrozenCounterConfig::default()); - db.add(i, Some(EventClass::Class1), AnalogInputConfig::default()); + db.add( + i, + Some(EventClass::Class1), + AnalogInputConfig { + s_var: StaticAnalogInputVariation::Group30Var1, + e_var: EventAnalogInputVariation::Group32Var1, + deadband: 0.0, + }, + ); db.add( i, Some(EventClass::Class1), @@ -536,6 +544,8 @@ fn get_outstation_config() -> OutstationConfig { EndpointAddress::try_new(1).unwrap(), get_event_buffer_config(), ); + config.class_zero.octet_string = true; + // override the default decoding config.decode_level.application = AppDecodeLevel::ObjectValues; // ANCHOR_END: outstation_config diff --git a/dnp3/src/outstation/session.rs b/dnp3/src/outstation/session.rs index c6808b68..6a1fa579 100644 --- a/dnp3/src/outstation/session.rs +++ b/dnp3/src/outstation/session.rs @@ -305,6 +305,30 @@ enum ConfirmAction { ContinueWait, } +#[derive(Copy, Clone)] +enum NextIdleAction { + NoSleep, + SleepUntilEvent, + SleepUnit(tokio::time::Instant), +} + +impl NextIdleAction { + fn select_earliest(self, instant: Option) -> Self { + match instant { + None => self, + Some(instant) => { + match self { + NextIdleAction::NoSleep => self, + NextIdleAction::SleepUntilEvent => Self::SleepUnit(instant), + NextIdleAction::SleepUnit(other) => { + Self::SleepUnit(tokio::time::Instant::min(instant, other)) + } + } + } + } + } +} + impl OutstationSession { pub(crate) fn new( initial_state: Enabled, @@ -449,7 +473,7 @@ impl OutstationSession { .await?; // check to see if we should perform unsolicited - let deadline = self.check_unsolicited(io, reader, writer, database).await?; + let next_action = self.check_unsolicited(io, reader, writer, database).await?; // handle a deferred read request if it was produced during unsolicited self.handle_deferred_read(io, reader, writer, database) @@ -458,13 +482,7 @@ impl OutstationSession { // check to see if we should perform a link status check self.check_link_status(io, writer).await?; - let deadline = match deadline { - Some(deadline) => match self.next_link_status { - Some(link_deadline) => Some(tokio::time::Instant::min(deadline, link_deadline)), - None => Some(deadline), - }, - None => self.next_link_status, - }; + let next_action = next_action.select_earliest(self.next_link_status); // wait for an event tokio::select! { @@ -475,7 +493,7 @@ impl OutstationSession { _ = database.wait_for_change() => { // wake for unsolicited here } - res = self.sleep_until(deadline) => { + res = self.sleep_until(next_action) => { res? // just wake up } @@ -490,9 +508,10 @@ impl OutstationSession { reader: &mut TransportReader, writer: &mut TransportWriter, database: &mut DatabaseHandle, - ) -> Result, RunError> { + ) -> Result { + if self.config.unsolicited.is_disabled() { - return Ok(None); + return Ok(NextIdleAction::SleepUntilEvent); } match self.state.unsolicited { @@ -504,18 +523,18 @@ impl OutstationSession { { UnsolicitedResult::Timeout | UnsolicitedResult::ReturnToIdle => { self.state.unsolicited = UnsolicitedState::NullRequired; - Ok(Some(tokio::time::Instant::now())) + Ok(NextIdleAction::NoSleep) } UnsolicitedResult::Confirmed => { self.state.unsolicited = UnsolicitedState::Ready(None); - Ok(None) + Ok(NextIdleAction::NoSleep) } } } UnsolicitedState::Ready(deadline) => { if let Some(deadline) = deadline { if tokio::time::Instant::now() < deadline { - return Ok(Some(deadline)); // not ready yet + return Ok(NextIdleAction::SleepUnit(deadline)); // not ready yet } } @@ -526,19 +545,19 @@ impl OutstationSession { { None => { // there was nothing to send - Ok(None) + Ok(NextIdleAction::SleepUntilEvent) } Some(UnsolicitedResult::Timeout) | Some(UnsolicitedResult::ReturnToIdle) => { let retry_at = self.new_unsolicited_retry_deadline(); self.state.unsolicited = UnsolicitedState::Ready(Some(retry_at)); - Ok(Some(retry_at)) + Ok(NextIdleAction::SleepUnit(retry_at)) } Some(UnsolicitedResult::Confirmed) => { database .clear_written_events(self.application.as_mut()) .await; self.state.unsolicited = UnsolicitedState::Ready(None); - Ok(None) + Ok(NextIdleAction::NoSleep) } } } @@ -845,7 +864,7 @@ impl OutstationSession { ) -> Result { let decode_level = self.config.decode_level; tokio::select! { - res = self.sleep_until(Some(deadline)) => { + res = self.sleep_until(NextIdleAction::SleepUnit(deadline)) => { res?; Ok(TimeoutStatus::Yes) } @@ -856,20 +875,18 @@ impl OutstationSession { } } - async fn sleep_until(&mut self, instant: Option) -> Result<(), RunError> { - async fn sleep_only(instant: Option) { - match instant { - Some(x) => tokio::time::sleep_until(x).await, - None => { - // sleep forever - crate::util::future::forever().await; - } + async fn sleep_until(&mut self, next_action: NextIdleAction) -> Result<(), RunError> { + async fn sleep_only(next_action: NextIdleAction) { + match next_action { + NextIdleAction::NoSleep => {}, + NextIdleAction::SleepUnit(x) => tokio::time::sleep_until(x).await, + NextIdleAction::SleepUntilEvent => crate::util::future::forever().await, } } loop { tokio::select! { - _ = sleep_only(instant) => { + _ = sleep_only(next_action) => { return Ok(()); } res = self.handle_next_message() => { diff --git a/dnp3/src/outstation/tests/harness/harness.rs b/dnp3/src/outstation/tests/harness/harness.rs index d9fd6451..99481918 100644 --- a/dnp3/src/outstation/tests/harness/harness.rs +++ b/dnp3/src/outstation/tests/harness/harness.rs @@ -47,6 +47,13 @@ impl OutstationHarness { self.expect_response(response).await; } + pub(crate) async fn expect_write(&mut self) -> Vec { + match self.io.next_event().await { + sfio_tokio_mock_io::Event::Write(bytes) => bytes, + x => panic!("Expected write but got: {x:?}"), + } + } + pub(crate) async fn expect_response(&mut self, response: &[u8]) { assert_eq!( self.io.next_event().await, diff --git a/dnp3/src/outstation/tests/unsolicited.rs b/dnp3/src/outstation/tests/unsolicited.rs index 0250fe88..18601498 100644 --- a/dnp3/src/outstation/tests/unsolicited.rs +++ b/dnp3/src/outstation/tests/unsolicited.rs @@ -1,5 +1,7 @@ +use crate::app::gen::prefixed::PrefixedVariation; use crate::app::measurement::*; -use crate::app::Timestamp; +use crate::app::parse::parser::{HeaderDetails, ParsedFragment}; +use crate::app::{BufferSize, Timestamp}; use crate::outstation::config::OutstationConfig; use crate::outstation::database::*; use crate::outstation::{BufferState, ClassCount, TypeCount}; @@ -303,6 +305,72 @@ async fn handles_disable_unsolicited_during_unsolicited_confirm_wait() { harness.check_no_events(); } +#[tokio::test] +async fn sends_unsolicited_from_one_update() { + let mut config = get_default_unsolicited_config(); + config.unsolicited_buffer_size = BufferSize::min(); + config.event_buffer_config.max_analog = 1001; + let mut harness = new_harness(config); + confirm_null_unsolicited(&mut harness).await; + enable_unsolicited(&mut harness).await; + + harness.handle.database.transaction(|db| { + for i in 0..1000 { + db.add( + i, + Some(EventClass::Class1), + AnalogInputConfig { + s_var: StaticAnalogInputVariation::Group30Var1, + e_var: EventAnalogInputVariation::Group32Var1, // 5 bytes + deadband: 0.0, + }, + ); + } + }); + + harness.handle.database.transaction(|db| { + let value = AnalogInput { + value: 98.6, + flags: Flags { value: 0 }, + time: None, + }; + + for i in 0..1000 { + db.update(i, &value, UpdateOptions::detect_event()); + } + }); + + let mut total_fragments: usize = 0; + let mut total_events = 0; + + loop { + let rx = harness.expect_write().await; + let response = ParsedFragment::parse(&rx).unwrap().to_response().unwrap(); + let mut num_events = 0; + match response.objects.unwrap().get_only_header().unwrap().details { + HeaderDetails::TwoByteCountAndPrefix(_, PrefixedVariation::Group32Var1(seq)) => { + num_events += seq.iter().count() + } + x => panic!("Unexpected header: {x:?}"), + } + + total_fragments += 1; + total_events += num_events; + + // confirm the fragment + let confirm = &[uns_confirm(response.header.control.seq.value()), 0x00]; + harness.send_and_process(confirm).await; + + // terminate when the outstation indicates there are no more events + if !response.header.iin.iin1.get_class_1_events() { + break; + } + } + + assert_eq!(total_events, 1000); + assert!(total_fragments > 10); +} + #[tokio::test] async fn buffer_overflow_issue() { let mut config = get_default_unsolicited_config(); From a85627c36356f1ca81d240bec0f66e9fc5cf0084 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Mon, 15 Jan 2024 12:13:20 -0800 Subject: [PATCH 2/2] formatting --- dnp3/src/outstation/session.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/dnp3/src/outstation/session.rs b/dnp3/src/outstation/session.rs index 6a1fa579..a64ac2de 100644 --- a/dnp3/src/outstation/session.rs +++ b/dnp3/src/outstation/session.rs @@ -316,15 +316,13 @@ impl NextIdleAction { fn select_earliest(self, instant: Option) -> Self { match instant { None => self, - Some(instant) => { - match self { - NextIdleAction::NoSleep => self, - NextIdleAction::SleepUntilEvent => Self::SleepUnit(instant), - NextIdleAction::SleepUnit(other) => { - Self::SleepUnit(tokio::time::Instant::min(instant, other)) - } + Some(instant) => match self { + NextIdleAction::NoSleep => self, + NextIdleAction::SleepUntilEvent => Self::SleepUnit(instant), + NextIdleAction::SleepUnit(other) => { + Self::SleepUnit(tokio::time::Instant::min(instant, other)) } - } + }, } } } @@ -509,7 +507,6 @@ impl OutstationSession { writer: &mut TransportWriter, database: &mut DatabaseHandle, ) -> Result { - if self.config.unsolicited.is_disabled() { return Ok(NextIdleAction::SleepUntilEvent); } @@ -878,7 +875,7 @@ impl OutstationSession { async fn sleep_until(&mut self, next_action: NextIdleAction) -> Result<(), RunError> { async fn sleep_only(next_action: NextIdleAction) { match next_action { - NextIdleAction::NoSleep => {}, + NextIdleAction::NoSleep => {} NextIdleAction::SleepUnit(x) => tokio::time::sleep_until(x).await, NextIdleAction::SleepUntilEvent => crate::util::future::forever().await, }