Skip to content

Commit

Permalink
Fix unsolicted bug (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
jadamcrain committed Jan 16, 2024
1 parent 386339b commit 5df8eee
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 29 deletions.
12 changes: 11 additions & 1 deletion dnp3/examples/outstation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,15 @@ async fn run_server(mut server: Server) -> Result<(), Box<dyn std::error::Error>
);
db.add(i, Some(EventClass::Class1), CounterConfig::default());
db.add(i, Some(EventClass::Class1), FrozenCounterConfig::default());
db.add(i, Some(EventClass::Class1), AnalogInputConfig::default());
db.add(
i,
Some(EventClass::Class1),
AnalogInputConfig {
s_var: StaticAnalogInputVariation::Group30Var1,
e_var: EventAnalogInputVariation::Group32Var1,
deadband: 0.0,
},
);
db.add(
i,
Some(EventClass::Class1),
Expand Down Expand Up @@ -536,6 +544,8 @@ fn get_outstation_config() -> OutstationConfig {
EndpointAddress::try_new(1).unwrap(),
get_event_buffer_config(),
);
config.class_zero.octet_string = true;

// override the default decoding
config.decode_level.application = AppDecodeLevel::ObjectValues;
// ANCHOR_END: outstation_config
Expand Down
68 changes: 41 additions & 27 deletions dnp3/src/outstation/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,28 @@ enum ConfirmAction {
ContinueWait,
}

#[derive(Copy, Clone)]
enum NextIdleAction {
NoSleep,
SleepUntilEvent,
SleepUnit(tokio::time::Instant),
}

impl NextIdleAction {
fn select_earliest(self, instant: Option<tokio::time::Instant>) -> Self {
match instant {
None => self,
Some(instant) => match self {
NextIdleAction::NoSleep => self,
NextIdleAction::SleepUntilEvent => Self::SleepUnit(instant),
NextIdleAction::SleepUnit(other) => {
Self::SleepUnit(tokio::time::Instant::min(instant, other))
}
},
}
}
}

impl OutstationSession {
pub(crate) fn new(
initial_state: Enabled,
Expand Down Expand Up @@ -449,7 +471,7 @@ impl OutstationSession {
.await?;

// check to see if we should perform unsolicited
let deadline = self.check_unsolicited(io, reader, writer, database).await?;
let next_action = self.check_unsolicited(io, reader, writer, database).await?;

// handle a deferred read request if it was produced during unsolicited
self.handle_deferred_read(io, reader, writer, database)
Expand All @@ -458,13 +480,7 @@ impl OutstationSession {
// check to see if we should perform a link status check
self.check_link_status(io, writer).await?;

let deadline = match deadline {
Some(deadline) => match self.next_link_status {
Some(link_deadline) => Some(tokio::time::Instant::min(deadline, link_deadline)),
None => Some(deadline),
},
None => self.next_link_status,
};
let next_action = next_action.select_earliest(self.next_link_status);

// wait for an event
tokio::select! {
Expand All @@ -475,7 +491,7 @@ impl OutstationSession {
_ = database.wait_for_change() => {
// wake for unsolicited here
}
res = self.sleep_until(deadline) => {
res = self.sleep_until(next_action) => {
res?
// just wake up
}
Expand All @@ -490,9 +506,9 @@ impl OutstationSession {
reader: &mut TransportReader,
writer: &mut TransportWriter,
database: &mut DatabaseHandle,
) -> Result<Option<tokio::time::Instant>, RunError> {
) -> Result<NextIdleAction, RunError> {
if self.config.unsolicited.is_disabled() {
return Ok(None);
return Ok(NextIdleAction::SleepUntilEvent);
}

match self.state.unsolicited {
Expand All @@ -504,18 +520,18 @@ impl OutstationSession {
{
UnsolicitedResult::Timeout | UnsolicitedResult::ReturnToIdle => {
self.state.unsolicited = UnsolicitedState::NullRequired;
Ok(Some(tokio::time::Instant::now()))
Ok(NextIdleAction::NoSleep)
}
UnsolicitedResult::Confirmed => {
self.state.unsolicited = UnsolicitedState::Ready(None);
Ok(None)
Ok(NextIdleAction::NoSleep)
}
}
}
UnsolicitedState::Ready(deadline) => {
if let Some(deadline) = deadline {
if tokio::time::Instant::now() < deadline {
return Ok(Some(deadline)); // not ready yet
return Ok(NextIdleAction::SleepUnit(deadline)); // not ready yet
}
}

Expand All @@ -526,19 +542,19 @@ impl OutstationSession {
{
None => {
// there was nothing to send
Ok(None)
Ok(NextIdleAction::SleepUntilEvent)
}
Some(UnsolicitedResult::Timeout) | Some(UnsolicitedResult::ReturnToIdle) => {
let retry_at = self.new_unsolicited_retry_deadline();
self.state.unsolicited = UnsolicitedState::Ready(Some(retry_at));
Ok(Some(retry_at))
Ok(NextIdleAction::SleepUnit(retry_at))
}
Some(UnsolicitedResult::Confirmed) => {
database
.clear_written_events(self.application.as_mut())
.await;
self.state.unsolicited = UnsolicitedState::Ready(None);
Ok(None)
Ok(NextIdleAction::NoSleep)
}
}
}
Expand Down Expand Up @@ -845,7 +861,7 @@ impl OutstationSession {
) -> Result<TimeoutStatus, RunError> {
let decode_level = self.config.decode_level;
tokio::select! {
res = self.sleep_until(Some(deadline)) => {
res = self.sleep_until(NextIdleAction::SleepUnit(deadline)) => {
res?;
Ok(TimeoutStatus::Yes)
}
Expand All @@ -856,20 +872,18 @@ impl OutstationSession {
}
}

async fn sleep_until(&mut self, instant: Option<tokio::time::Instant>) -> Result<(), RunError> {
async fn sleep_only(instant: Option<tokio::time::Instant>) {
match instant {
Some(x) => tokio::time::sleep_until(x).await,
None => {
// sleep forever
crate::util::future::forever().await;
}
async fn sleep_until(&mut self, next_action: NextIdleAction) -> Result<(), RunError> {
async fn sleep_only(next_action: NextIdleAction) {
match next_action {
NextIdleAction::NoSleep => {}
NextIdleAction::SleepUnit(x) => tokio::time::sleep_until(x).await,
NextIdleAction::SleepUntilEvent => crate::util::future::forever().await,
}
}

loop {
tokio::select! {
_ = sleep_only(instant) => {
_ = sleep_only(next_action) => {
return Ok(());
}
res = self.handle_next_message() => {
Expand Down
7 changes: 7 additions & 0 deletions dnp3/src/outstation/tests/harness/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ impl OutstationHarness {
self.expect_response(response).await;
}

pub(crate) async fn expect_write(&mut self) -> Vec<u8> {
match self.io.next_event().await {
sfio_tokio_mock_io::Event::Write(bytes) => bytes,
x => panic!("Expected write but got: {x:?}"),
}
}

pub(crate) async fn expect_response(&mut self, response: &[u8]) {
assert_eq!(
self.io.next_event().await,
Expand Down
70 changes: 69 additions & 1 deletion dnp3/src/outstation/tests/unsolicited.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::app::gen::prefixed::PrefixedVariation;
use crate::app::measurement::*;
use crate::app::Timestamp;
use crate::app::parse::parser::{HeaderDetails, ParsedFragment};
use crate::app::{BufferSize, Timestamp};
use crate::outstation::config::OutstationConfig;
use crate::outstation::database::*;
use crate::outstation::{BufferState, ClassCount, TypeCount};
Expand Down Expand Up @@ -303,6 +305,72 @@ async fn handles_disable_unsolicited_during_unsolicited_confirm_wait() {
harness.check_no_events();
}

#[tokio::test]
async fn sends_unsolicited_from_one_update() {
let mut config = get_default_unsolicited_config();
config.unsolicited_buffer_size = BufferSize::min();
config.event_buffer_config.max_analog = 1001;
let mut harness = new_harness(config);
confirm_null_unsolicited(&mut harness).await;
enable_unsolicited(&mut harness).await;

harness.handle.database.transaction(|db| {
for i in 0..1000 {
db.add(
i,
Some(EventClass::Class1),
AnalogInputConfig {
s_var: StaticAnalogInputVariation::Group30Var1,
e_var: EventAnalogInputVariation::Group32Var1, // 5 bytes
deadband: 0.0,
},
);
}
});

harness.handle.database.transaction(|db| {
let value = AnalogInput {
value: 98.6,
flags: Flags { value: 0 },
time: None,
};

for i in 0..1000 {
db.update(i, &value, UpdateOptions::detect_event());
}
});

let mut total_fragments: usize = 0;
let mut total_events = 0;

loop {
let rx = harness.expect_write().await;
let response = ParsedFragment::parse(&rx).unwrap().to_response().unwrap();
let mut num_events = 0;
match response.objects.unwrap().get_only_header().unwrap().details {
HeaderDetails::TwoByteCountAndPrefix(_, PrefixedVariation::Group32Var1(seq)) => {
num_events += seq.iter().count()
}
x => panic!("Unexpected header: {x:?}"),
}

total_fragments += 1;
total_events += num_events;

// confirm the fragment
let confirm = &[uns_confirm(response.header.control.seq.value()), 0x00];
harness.send_and_process(confirm).await;

// terminate when the outstation indicates there are no more events
if !response.header.iin.iin1.get_class_1_events() {
break;
}
}

assert_eq!(total_events, 1000);
assert!(total_fragments > 10);
}

#[tokio::test]
async fn buffer_overflow_issue() {
let mut config = get_default_unsolicited_config();
Expand Down

0 comments on commit 5df8eee

Please sign in to comment.