Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unsolicted bug #341

Merged
merged 2 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion dnp3/examples/outstation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,15 @@ async fn run_server(mut server: Server) -> Result<(), Box<dyn std::error::Error>
);
db.add(i, Some(EventClass::Class1), CounterConfig::default());
db.add(i, Some(EventClass::Class1), FrozenCounterConfig::default());
db.add(i, Some(EventClass::Class1), AnalogInputConfig::default());
db.add(
i,
Some(EventClass::Class1),
AnalogInputConfig {
s_var: StaticAnalogInputVariation::Group30Var1,
e_var: EventAnalogInputVariation::Group32Var1,
deadband: 0.0,
},
);
db.add(
i,
Some(EventClass::Class1),
Expand Down Expand Up @@ -536,6 +544,8 @@ fn get_outstation_config() -> OutstationConfig {
EndpointAddress::try_new(1).unwrap(),
get_event_buffer_config(),
);
config.class_zero.octet_string = true;

// override the default decoding
config.decode_level.application = AppDecodeLevel::ObjectValues;
// ANCHOR_END: outstation_config
Expand Down
68 changes: 41 additions & 27 deletions dnp3/src/outstation/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,28 @@ enum ConfirmAction {
ContinueWait,
}

#[derive(Copy, Clone)]
enum NextIdleAction {
NoSleep,
SleepUntilEvent,
SleepUnit(tokio::time::Instant),
}

impl NextIdleAction {
fn select_earliest(self, instant: Option<tokio::time::Instant>) -> Self {
match instant {
None => self,
Some(instant) => match self {
NextIdleAction::NoSleep => self,
NextIdleAction::SleepUntilEvent => Self::SleepUnit(instant),
NextIdleAction::SleepUnit(other) => {
Self::SleepUnit(tokio::time::Instant::min(instant, other))
}
},
}
}
}

impl OutstationSession {
pub(crate) fn new(
initial_state: Enabled,
Expand Down Expand Up @@ -449,7 +471,7 @@ impl OutstationSession {
.await?;

// check to see if we should perform unsolicited
let deadline = self.check_unsolicited(io, reader, writer, database).await?;
let next_action = self.check_unsolicited(io, reader, writer, database).await?;

// handle a deferred read request if it was produced during unsolicited
self.handle_deferred_read(io, reader, writer, database)
Expand All @@ -458,13 +480,7 @@ impl OutstationSession {
// check to see if we should perform a link status check
self.check_link_status(io, writer).await?;

let deadline = match deadline {
Some(deadline) => match self.next_link_status {
Some(link_deadline) => Some(tokio::time::Instant::min(deadline, link_deadline)),
None => Some(deadline),
},
None => self.next_link_status,
};
let next_action = next_action.select_earliest(self.next_link_status);

// wait for an event
tokio::select! {
Expand All @@ -475,7 +491,7 @@ impl OutstationSession {
_ = database.wait_for_change() => {
// wake for unsolicited here
}
res = self.sleep_until(deadline) => {
res = self.sleep_until(next_action) => {
res?
// just wake up
}
Expand All @@ -490,9 +506,9 @@ impl OutstationSession {
reader: &mut TransportReader,
writer: &mut TransportWriter,
database: &mut DatabaseHandle,
) -> Result<Option<tokio::time::Instant>, RunError> {
) -> Result<NextIdleAction, RunError> {
if self.config.unsolicited.is_disabled() {
return Ok(None);
return Ok(NextIdleAction::SleepUntilEvent);
}

match self.state.unsolicited {
Expand All @@ -504,18 +520,18 @@ impl OutstationSession {
{
UnsolicitedResult::Timeout | UnsolicitedResult::ReturnToIdle => {
self.state.unsolicited = UnsolicitedState::NullRequired;
Ok(Some(tokio::time::Instant::now()))
Ok(NextIdleAction::NoSleep)
}
UnsolicitedResult::Confirmed => {
self.state.unsolicited = UnsolicitedState::Ready(None);
Ok(None)
Ok(NextIdleAction::NoSleep)
}
}
}
UnsolicitedState::Ready(deadline) => {
if let Some(deadline) = deadline {
if tokio::time::Instant::now() < deadline {
return Ok(Some(deadline)); // not ready yet
return Ok(NextIdleAction::SleepUnit(deadline)); // not ready yet
}
}

Expand All @@ -526,19 +542,19 @@ impl OutstationSession {
{
None => {
// there was nothing to send
Ok(None)
Ok(NextIdleAction::SleepUntilEvent)
}
Some(UnsolicitedResult::Timeout) | Some(UnsolicitedResult::ReturnToIdle) => {
let retry_at = self.new_unsolicited_retry_deadline();
self.state.unsolicited = UnsolicitedState::Ready(Some(retry_at));
Ok(Some(retry_at))
Ok(NextIdleAction::SleepUnit(retry_at))
}
Some(UnsolicitedResult::Confirmed) => {
database
.clear_written_events(self.application.as_mut())
.await;
self.state.unsolicited = UnsolicitedState::Ready(None);
Ok(None)
Ok(NextIdleAction::NoSleep)
}
}
}
Expand Down Expand Up @@ -845,7 +861,7 @@ impl OutstationSession {
) -> Result<TimeoutStatus, RunError> {
let decode_level = self.config.decode_level;
tokio::select! {
res = self.sleep_until(Some(deadline)) => {
res = self.sleep_until(NextIdleAction::SleepUnit(deadline)) => {
res?;
Ok(TimeoutStatus::Yes)
}
Expand All @@ -856,20 +872,18 @@ impl OutstationSession {
}
}

async fn sleep_until(&mut self, instant: Option<tokio::time::Instant>) -> Result<(), RunError> {
async fn sleep_only(instant: Option<tokio::time::Instant>) {
match instant {
Some(x) => tokio::time::sleep_until(x).await,
None => {
// sleep forever
crate::util::future::forever().await;
}
async fn sleep_until(&mut self, next_action: NextIdleAction) -> Result<(), RunError> {
async fn sleep_only(next_action: NextIdleAction) {
match next_action {
NextIdleAction::NoSleep => {}
NextIdleAction::SleepUnit(x) => tokio::time::sleep_until(x).await,
NextIdleAction::SleepUntilEvent => crate::util::future::forever().await,
}
}

loop {
tokio::select! {
_ = sleep_only(instant) => {
_ = sleep_only(next_action) => {
return Ok(());
}
res = self.handle_next_message() => {
Expand Down
7 changes: 7 additions & 0 deletions dnp3/src/outstation/tests/harness/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ impl OutstationHarness {
self.expect_response(response).await;
}

pub(crate) async fn expect_write(&mut self) -> Vec<u8> {
match self.io.next_event().await {
sfio_tokio_mock_io::Event::Write(bytes) => bytes,
x => panic!("Expected write but got: {x:?}"),
}
}

pub(crate) async fn expect_response(&mut self, response: &[u8]) {
assert_eq!(
self.io.next_event().await,
Expand Down
70 changes: 69 additions & 1 deletion dnp3/src/outstation/tests/unsolicited.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::app::gen::prefixed::PrefixedVariation;
use crate::app::measurement::*;
use crate::app::Timestamp;
use crate::app::parse::parser::{HeaderDetails, ParsedFragment};
use crate::app::{BufferSize, Timestamp};
use crate::outstation::config::OutstationConfig;
use crate::outstation::database::*;
use crate::outstation::{BufferState, ClassCount, TypeCount};
Expand Down Expand Up @@ -303,6 +305,72 @@ async fn handles_disable_unsolicited_during_unsolicited_confirm_wait() {
harness.check_no_events();
}

#[tokio::test]
async fn sends_unsolicited_from_one_update() {
let mut config = get_default_unsolicited_config();
config.unsolicited_buffer_size = BufferSize::min();
config.event_buffer_config.max_analog = 1001;
let mut harness = new_harness(config);
confirm_null_unsolicited(&mut harness).await;
enable_unsolicited(&mut harness).await;

harness.handle.database.transaction(|db| {
for i in 0..1000 {
db.add(
i,
Some(EventClass::Class1),
AnalogInputConfig {
s_var: StaticAnalogInputVariation::Group30Var1,
e_var: EventAnalogInputVariation::Group32Var1, // 5 bytes
deadband: 0.0,
},
);
}
});

harness.handle.database.transaction(|db| {
let value = AnalogInput {
value: 98.6,
flags: Flags { value: 0 },
time: None,
};

for i in 0..1000 {
db.update(i, &value, UpdateOptions::detect_event());
}
});

let mut total_fragments: usize = 0;
let mut total_events = 0;

loop {
let rx = harness.expect_write().await;
let response = ParsedFragment::parse(&rx).unwrap().to_response().unwrap();
let mut num_events = 0;
match response.objects.unwrap().get_only_header().unwrap().details {
HeaderDetails::TwoByteCountAndPrefix(_, PrefixedVariation::Group32Var1(seq)) => {
num_events += seq.iter().count()
}
x => panic!("Unexpected header: {x:?}"),
}

total_fragments += 1;
total_events += num_events;

// confirm the fragment
let confirm = &[uns_confirm(response.header.control.seq.value()), 0x00];
harness.send_and_process(confirm).await;

// terminate when the outstation indicates there are no more events
if !response.header.iin.iin1.get_class_1_events() {
break;
}
}

assert_eq!(total_events, 1000);
assert!(total_fragments > 10);
}

#[tokio::test]
async fn buffer_overflow_issue() {
let mut config = get_default_unsolicited_config();
Expand Down