Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP Support #354

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ce989b9
read up to a full fragment with one call to physical layer
jadamcrain Apr 15, 2024
fe0c4be
factor out the loop in the link reader
jadamcrain Apr 18, 2024
af8984f
introduce concept of a link ReadMode that will allow us to process st…
jadamcrain Apr 18, 2024
cd8e03b
create a LinkModes struct that parameters both the error mode and the…
jadamcrain Apr 18, 2024
0eb1d03
separate the buffer functionality
jadamcrain Apr 18, 2024
f9ed1f6
use scursor 0.2 which can tell us the amount consumed from the read …
jadamcrain Apr 18, 2024
05252d9
fix comment
jadamcrain Apr 19, 2024
9655f80
always read more data if the parser indicates an incomplete frame
jadamcrain Apr 19, 2024
3517bd9
Add initial udp physical layer and concept of a PhysAddr
jadamcrain Apr 22, 2024
ebe50a6
bring the PhysAddr up into the FrameInfo struct
jadamcrain Apr 22, 2024
36b6cd1
bring the PhysAddr up through the transport layer into the received f…
jadamcrain Apr 22, 2024
7115edf
move udp wrapper to its own file
jadamcrain Apr 22, 2024
7ff3686
create a type that combines the link and physical layer addresses
jadamcrain Apr 22, 2024
df0288d
Udp/partial (#352)
jadamcrain Apr 23, 2024
ff10cb8
First public API
jadamcrain Apr 24, 2024
af7d1cd
trap UDP resets
jadamcrain Apr 24, 2024
a15f80c
discern between stream and datagram master channels
jadamcrain Apr 24, 2024
96f1c47
Add UDP entry point
jadamcrain Apr 24, 2024
728c66d
working master example
jadamcrain Apr 24, 2024
6ef7f22
fix bug
jadamcrain Apr 24, 2024
7b61dff
Add udp outstation to the FFI
jadamcrain Apr 24, 2024
0f0dfdb
comment
jadamcrain Apr 25, 2024
e86c1fc
Implement add_udp_association
jadamcrain Apr 25, 2024
ab2a6fb
Complete UDP FFI
jadamcrain Apr 25, 2024
d37be0e
target .NET 6.0 in C# examples
jadamcrain Apr 25, 2024
4c5256f
fix example
jadamcrain Apr 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dnp3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tracing = "0.1"
chrono = "0.4"
tokio = { version = "1", features = ["net", "sync", "io-util", "io-std", "time", "rt", "rt-multi-thread", "macros"] }
xxhash-rust = { version = "0.8", features = ["xxh64"] }
scursor = "0.1.0"
scursor = "0.2.0"

# TLS dependencies
sfio-rustls-config = { version = "0.3.2", optional = true }
Expand Down
58 changes: 45 additions & 13 deletions dnp3/examples/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use dnp3::serial::*;
use dnp3::tcp::tls::*;

use dnp3::outstation::FreezeInterval;
use dnp3::udp::spawn_master_udp;
use std::process::exit;

/// read handler that does nothing
Expand Down Expand Up @@ -225,7 +226,7 @@ impl FileReader for FileLogger {
The program initializes a master channel based on the command line argument and then enters a loop
reading console input allowing the user to perform common tasks interactively.

All of the configuration values are hard-coded but can be changed with a recompile.
All the configuration values are hard-coded but can be changed with a recompile.
*/
// ANCHOR: runtime_init
#[tokio::main(flavor = "multi_thread")]
Expand All @@ -243,17 +244,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// spawn the master channel based on the command line argument
let mut channel = create_channel()?;

// ANCHOR: association_create
let mut association = channel
.add_association(
EndpointAddress::try_new(1024)?,
get_association_config(),
ExampleReadHandler::boxed(),
Box::new(ExampleAssociationHandler),
Box::new(ExampleAssociationInformation),
)
.await?;
// ANCHOR_END: association_create
let mut association = match channel.get_channel_type() {
MasterChannelType::Udp => {
// ANCHOR: association_create_udp
channel
.add_udp_association(
EndpointAddress::try_new(1024)?,
"127.0.0.1:20000".parse()?,
get_association_config(),
ExampleReadHandler::boxed(),
Box::new(ExampleAssociationHandler),
Box::new(ExampleAssociationInformation),
)
.await?
// ANCHOR_END: association_create_udp
}
MasterChannelType::Stream => {
// ANCHOR: association_create
channel
.add_association(
EndpointAddress::try_new(1024)?,
get_association_config(),
ExampleReadHandler::boxed(),
Box::new(ExampleAssociationHandler),
Box::new(ExampleAssociationInformation),
)
.await?
// ANCHOR_END: association_create
}
};

// create an event poll
// ANCHOR: add_poll
Expand Down Expand Up @@ -483,12 +502,13 @@ fn create_channel() -> Result<MasterChannel, Box<dyn std::error::Error>> {
[_, x] => x,
_ => {
eprintln!("please specify a transport:");
eprintln!("usage: master <transport> (tcp, serial, tls-ca, tls-self-signed)");
eprintln!("usage: master <transport> (tcp, udp, serial, tls-ca, tls-self-signed)");
exit(-1);
}
};
match transport {
"tcp" => create_tcp_channel(),
"udp" => create_udp_channel(),
#[cfg(feature = "serial")]
"serial" => create_serial_channel(),
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -575,6 +595,18 @@ fn create_tcp_channel() -> Result<MasterChannel, Box<dyn std::error::Error>> {
Ok(channel)
}

fn create_udp_channel() -> Result<MasterChannel, Box<dyn std::error::Error>> {
// ANCHOR: create_master_udp_channel
let channel = spawn_master_udp(
"127.0.0.1:20001".parse()?,
LinkReadMode::Datagram,
Timeout::from_secs(5)?,
get_master_channel_config()?,
);
// ANCHOR_END: create_master_udp_channel
Ok(channel)
}

#[cfg(feature = "serial")]
fn create_serial_channel() -> Result<MasterChannel, Box<dyn std::error::Error>> {
// ANCHOR: create_master_serial_channel
Expand Down
24 changes: 23 additions & 1 deletion dnp3/examples/outstation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio_util::codec::LinesCodec;
use dnp3::serial::*;
#[cfg(feature = "tls")]
use dnp3::tcp::tls::*;
use dnp3::udp::{spawn_outstation_udp, OutstationUdpConfig, UdpSocketMode};

/// example of using the outstation API asynchronously from within the Tokio runtime
///
Expand All @@ -41,6 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match transport {
"tcp" => run_tcp_server().await,
"tcp-client" => run_tcp_client().await,
"udp" => run_udp().await,
#[cfg(feature = "serial")]
"serial" => run_serial().await,
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -245,6 +247,26 @@ async fn run_tcp_server() -> Result<(), Box<dyn std::error::Error>> {
run_server(server).await
}

async fn run_udp() -> Result<(), Box<dyn std::error::Error>> {
let udp_config = OutstationUdpConfig {
local_endpoint: "127.0.0.1:20000".parse().unwrap(),
remote_endpoint: "127.0.0.1:20001".parse().unwrap(),
socket_mode: UdpSocketMode::OneToOne,
link_read_mode: LinkReadMode::Datagram,
retry_delay: Timeout::from_secs(5)?,
};

let outstation = spawn_outstation_udp(
udp_config,
get_outstation_config(),
Box::new(ExampleOutstationApplication),
Box::new(ExampleOutstationInformation),
Box::new(ExampleControlHandler),
);

run_outstation(outstation).await
}

async fn run_tcp_client() -> Result<(), Box<dyn std::error::Error>> {
let outstation = spawn_outstation_tcp_client(
LinkErrorMode::Close,
Expand Down Expand Up @@ -304,7 +326,7 @@ async fn run_server(mut server: Server) -> Result<(), Box<dyn std::error::Error>
)?;
// ANCHOR_END: tcp_server_spawn_outstation

// setup the outstation's database before we spawn it
// set up the outstation's database before we spawn it
// ANCHOR: database_init
outstation.transaction(|db| {
// initialize 10 points of each type
Expand Down
4 changes: 3 additions & 1 deletion dnp3/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]
#![deny(
dead_code,
//dead_code,
arithmetic_overflow,
invalid_type_param_default,
missing_fragment_specifier,
Expand Down Expand Up @@ -65,6 +65,8 @@ pub mod outstation;
pub mod serial;
/// Entry points and types for TCP
pub mod tcp;
/// Entry points and types for UDP
pub mod udp;

pub(crate) mod transport;
pub(crate) mod util;
12 changes: 8 additions & 4 deletions dnp3/src/link/header.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::link::EndpointAddress;
use crate::util::phys::PhysAddr;

use super::function::Function;

Expand Down Expand Up @@ -127,18 +128,21 @@ pub(crate) struct FrameInfo {
pub(crate) source: EndpointAddress,
pub(crate) broadcast: Option<BroadcastConfirmMode>,
pub(crate) frame_type: FrameType,
pub(crate) phys_addr: PhysAddr,
}

impl FrameInfo {
pub(crate) fn new(
source: EndpointAddress,
broadcast: Option<BroadcastConfirmMode>,
frame_type: FrameType,
phys_addr: PhysAddr,
) -> Self {
Self {
source,
broadcast,
frame_type,
phys_addr,
}
}
}
Expand Down Expand Up @@ -180,13 +184,13 @@ impl Header {

pub(crate) fn request_link_status(
is_master: bool,
destination: AnyAddress,
source: AnyAddress,
destination: EndpointAddress,
source: EndpointAddress,
) -> Self {
Self::new(
ControlField::new(is_master, Function::PriRequestLinkStatus),
destination,
source,
destination.wrap(),
source.wrap(),
)
}
}
29 changes: 19 additions & 10 deletions dnp3/src/link/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use crate::link::header::{
AnyAddress, BroadcastConfirmMode, ControlField, FrameInfo, FrameType, Header,
};
use crate::link::parser::FramePayload;
use crate::link::{EndpointAddress, LinkErrorMode};
use crate::link::reader::LinkModes;
use crate::link::EndpointAddress;
use crate::outstation::Feature;
use crate::util::phys::PhysLayer;
use crate::util::phys::{PhysAddr, PhysLayer};

enum SecondaryState {
NotReset,
Expand Down Expand Up @@ -39,7 +40,8 @@ impl Reply {

impl Layer {
pub(crate) fn new(
error_mode: LinkErrorMode,
link_modes: LinkModes,
max_rx_fragment_size: usize,
endpoint_type: EndpointType,
self_address: Feature,
local_address: EndpointAddress,
Expand All @@ -49,7 +51,7 @@ impl Layer {
self_address,
local_address,
secondary_state: SecondaryState::NotReset,
reader: super::reader::Reader::new(error_mode),
reader: super::reader::Reader::new(link_modes, max_rx_fragment_size),
tx_buffer: [0; super::constant::LINK_HEADER_LENGTH],
}
}
Expand Down Expand Up @@ -91,19 +93,24 @@ impl Layer {
level: DecodeLevel,
payload: &mut FramePayload,
) -> Result<Option<FrameInfo>, LinkError> {
let header = self.reader.read(io, payload, level).await?;
let (info, reply) = self.process_header(&header);
let (header, addr) = self.reader.read_frame(io, payload, level).await?;
let (info, reply) = self.process_header(header, addr);
if let Some(reply) = reply {
let header = self.get_header(reply);
if level.link.enabled() {
tracing::info!("LINK TX - {}", LinkDisplay::new(header, &[], level.link));
}
io.write(self.format_reply(header), level.physical).await?
io.write(self.format_reply(header), addr, level.physical)
.await?
}
Ok(info)
}

fn process_header(&mut self, header: &Header) -> (Option<FrameInfo>, Option<Reply>) {
fn process_header(
&mut self,
header: Header,
addr: PhysAddr,
) -> (Option<FrameInfo>, Option<Reply>) {
// ignore frames sent from the same endpoint type
if header.control.master == self.endpoint_type.dir_bit() {
// we don't log this
Expand Down Expand Up @@ -176,7 +183,7 @@ impl Layer {
}

(
Some(FrameInfo::new(source, broadcast, FrameType::Data)),
Some(FrameInfo::new(source, broadcast, FrameType::Data, addr)),
None,
)
}
Expand Down Expand Up @@ -213,7 +220,7 @@ impl Layer {
if header.control.fcb == expected {
self.secondary_state = SecondaryState::Reset(!expected);
(
Some(FrameInfo::new(source, broadcast, FrameType::Data)),
Some(FrameInfo::new(source, broadcast, FrameType::Data, addr)),
response,
)
} else {
Expand All @@ -234,6 +241,7 @@ impl Layer {
source,
broadcast,
FrameType::LinkStatusRequest,
addr,
)),
Some(Reply::new(source, Function::SecLinkStatus)),
)
Expand All @@ -243,6 +251,7 @@ impl Layer {
source,
broadcast,
FrameType::LinkStatusResponse,
addr,
)),
None,
),
Expand Down
17 changes: 17 additions & 0 deletions dnp3/src/link/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ pub enum LinkErrorMode {
Close,
}

/// Controls how the link-layer parser treats frames that span multiple calls to read of
/// the physical layer.
///
/// UDP is unique in that the specification requires that link layer frames be wholly contained
/// within datagrams, but this can be relaxed by configuration.
#[cfg_attr(
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum LinkReadMode {
/// Reading from a stream (TCP, serial, etc.) where link-layer frames MAY span separate calls to read
Stream,
/// Reading datagrams (UDP) where link-layer frames MAY NOT span separate calls to read
Datagram,
}

/// Represents a validated 16-bit endpoint address for a master or an outstation
/// Certain special addresses are not allowed by the standard to be used
/// as endpoint addresses.
Expand Down
Loading
Loading