Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dynamic publish requests limits based on subscriptions and latency #408

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions lib/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,6 @@ impl ClientBuilder {
self
}

/// Maximum number of pending publish requests.
pub fn max_inflight_publish(mut self, max_inflight_publish: usize) -> Self {
self.config.max_inflight_publish = max_inflight_publish;
self
}

/// Sets the session timeout period, in milliseconds.
pub fn session_timeout(mut self, session_timeout: u32) -> Self {
self.config.session_timeout = session_timeout;
Expand All @@ -273,12 +267,6 @@ impl ClientBuilder {
self
}

/// Maximum number of inflight messages.
pub fn max_inflight_messages(mut self, max_inflight_messages: usize) -> Self {
self.config.performance.max_inflight_messages = max_inflight_messages;
self
}

/// Session name - the default name to use for a new session
pub fn session_name(mut self, session_name: impl Into<String>) -> Self {
self.config.session_name = session_name.into();
Expand Down
6 changes: 0 additions & 6 deletions lib/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ pub struct Performance {
pub(crate) ignore_clock_skew: bool,
/// Maximum number of monitored items per request when recreating subscriptions on session recreation.
pub(crate) recreate_monitored_items_chunk: usize,
/// Maximum number of inflight messages.
pub(crate) max_inflight_messages: usize,
}

/// Client OPC UA configuration
Expand Down Expand Up @@ -218,8 +216,6 @@ pub struct ClientConfig {
/// Minimum publish interval. Setting this higher will make sure that subscriptions
/// publish together, which may reduce the number of publish requests if you have a lot of subscriptions.
pub(crate) min_publish_interval: Duration,
/// Maximum number of inflight publish requests before further requests are skipped.
pub(crate) max_inflight_publish: usize,

/// Requested session timeout in milliseconds
pub(crate) session_timeout: u32,
Expand Down Expand Up @@ -358,7 +354,6 @@ impl ClientConfig {
request_timeout: Duration::from_secs(60),
min_publish_interval: Duration::from_secs(1),
publish_timeout: Duration::from_secs(60),
max_inflight_publish: 2,
session_timeout: 0,
decoding_options: DecodingOptions {
max_array_length: decoding_options.max_array_length,
Expand All @@ -372,7 +367,6 @@ impl ClientConfig {
performance: Performance {
ignore_clock_skew: false,
recreate_monitored_items_chunk: 1000,
max_inflight_messages: 20,
},
session_name: "Rust OPC UA Client".into(),
}
Expand Down
1 change: 0 additions & 1 deletion lib/src/client/session/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ impl Client {
Arc::default(),
TransportConfiguration {
max_pending_incoming: 5,
max_inflight: self.config.performance.max_inflight_messages,
send_buffer_size: self.config.decoding_options.max_chunk_size,
recv_buffer_size: self.config.decoding_options.max_incoming_chunk_size,
max_message_size: self.config.decoding_options.max_message_size,
Expand Down
36 changes: 20 additions & 16 deletions lib/src/client/session/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ impl SessionActivityLoop {
futures::stream::unfold(self, |mut slf| async move {
match slf.tick_gen.next().await {
SessionTickEvent::KeepAlive => {
let now = Instant::now();
let res = slf
.inner
.read(
Expand All @@ -293,11 +294,19 @@ impl SessionActivityLoop {
1f64,
)
.await;
let elapsed = now.elapsed();

let value = match res.map(|r| r.into_iter().next()) {
Ok(Some(dv)) => dv,
// Should not be possible, this would be a bug in the server, assume everything
// is terrible.
let data_value = match res.map(|r| r.into_iter().next()) {
Ok(Some(data_value)) => {
// Only update if the request was successful to avoid
// skewing the roundtrip time by processing timeouts.
slf.inner
.publish_limits_watch_tx
.send_modify(|limits| limits.update_message_roundtrip(elapsed));
data_value
}
// Should not be possible, this would be a bug in
// the server, assume everything is terrible.
Ok(None) => {
return Some((
SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
Expand All @@ -307,24 +316,19 @@ impl SessionActivityLoop {
Err(e) => return Some((SessionActivity::KeepAliveFailed(e), slf)),
};

let Some(status): Option<u8> = value.value.and_then(|v| v.try_into().ok())
else {
return Some((
SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
slf,
));
};

match status {
// ServerState::Running
0 => Some((SessionActivity::KeepAliveSucceeded, slf)),
s => {
match data_value.value.and_then(|v| v.try_into().ok()) {
Some(0) => Some((SessionActivity::KeepAliveSucceeded, slf)),
Some(s) => {
warn!("Keep alive failed, non-running status code {s}");
Some((
SessionActivity::KeepAliveFailed(StatusCode::BadServerHalted),
slf,
))
}
None => Some((
SessionActivity::KeepAliveFailed(StatusCode::BadUnknownResponse),
slf,
)),
}
}
}
Expand Down
110 changes: 71 additions & 39 deletions lib/src/client/session/services/subscriptions/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ pub enum SubscriptionActivity {
pub struct SubscriptionEventLoop {
session: Arc<Session>,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
max_inflight_publish: usize,
last_external_trigger: Instant,
// This is true if the client has received a message BadTooManyPublishRequests
// This is true if the client has received BadTooManyPublishRequests
// and is waiting for a response before making further requests.
is_waiting_for_response: bool,
}
Expand All @@ -41,15 +40,14 @@ impl SubscriptionEventLoop {
///
/// * `session` - A shared reference to an [AsyncSession].
/// * `trigger_publish_recv` - A channel used to transmit external publish triggers.
/// This is used to trigger publish outside of the normal schedule, for example when
/// a new subscription is created.
/// This is used to trigger publish outside of the normal schedule, for example when
/// a new subscription is created.
pub fn new(
session: Arc<Session>,
trigger_publish_recv: tokio::sync::watch::Receiver<Instant>,
) -> Self {
let last_external_trigger = trigger_publish_recv.borrow().clone();
let last_external_trigger = *trigger_publish_recv.borrow();
Self {
max_inflight_publish: session.max_inflight_publish,
last_external_trigger,
trigger_publish_recv,
session,
Expand All @@ -69,9 +67,9 @@ impl SubscriptionEventLoop {
slf.trigger_publish_recv.clone();

let res = loop {
// Future for the next periodic publish. We do not send publish requests if there
// are no active subscriptions. In this case, simply return the non-terminating
// future.
// Future for the next periodic publish. We do not send publish requests
// if there are no active subscriptions. In this case, simply return the
// non-terminating future.
let next_tick_fut = if let Some(next) = next {
if slf.is_waiting_for_response && !futures.is_empty() {
Either::Right(futures::future::pending::<()>())
Expand All @@ -81,8 +79,9 @@ impl SubscriptionEventLoop {
} else {
Either::Right(futures::future::pending::<()>())
};
// If FuturesUnordered is empty, it will immediately yield `None`. We don't want that,
// so if it is empty we return the non-terminating future.

// If FuturesUnordered is empty, it will immediately yield `None`. We don't
// want that, so if it is empty we return the non-terminating future.
let next_publish_fut = if futures.is_empty() {
Either::Left(futures::future::pending())
} else {
Expand All @@ -93,65 +92,98 @@ impl SubscriptionEventLoop {
// Both internal ticks and external triggers result in publish requests.
v = recv.wait_for(|i| i > &slf.last_external_trigger) => {
if let Ok(v) = v {
debug!("Sending publish due to external trigger");
// On an external trigger, we always publish.
futures.push(slf.static_publish());
next = slf.session.next_publish_time(true);
slf.last_external_trigger = v.clone();
if !slf.is_waiting_for_response {
debug!("Sending publish due to external trigger");
// On an external trigger, we always publish.
futures.push(slf.static_publish());
next = slf.session.next_publish_time(true);
slf.last_external_trigger = *v;
} else {
debug!("Skipping publish due BadTooManyPublishRequests");
}
}
}
_ = next_tick_fut => {
// Avoid publishing if there are too many inflight publish requests.
if futures.len() < slf.max_inflight_publish {
debug!("Sending publish due to internal tick");
futures.push(slf.static_publish());
if futures.len()
< slf
.session
.publish_limits_watch_rx
.borrow()
.max_publish_requests
{
if !slf.is_waiting_for_response {
debug!("Sending publish due to internal tick");
futures.push(slf.static_publish());
} else {
debug!("Skipping publish due BadTooManyPublishRequests");
}
}
next = slf.session.next_publish_time(true);
}
res = next_publish_fut => {
match res {
Some(Ok(should_publish_now)) => {
if should_publish_now {
futures.push(slf.static_publish());
// Set the last publish time.
// We do this to avoid a buildup of publish requests
// if exhausting the queue takes more time than
// a single publishing interval.
slf.session.next_publish_time(true);
Some(Ok(more_notifications)) => {
if more_notifications
|| futures.len()
< slf
.session
.publish_limits_watch_rx
.borrow()
.min_publish_requests
{
if !slf.is_waiting_for_response {
debug!("Sending publish after receiving response");
futures.push(slf.static_publish());
// Set the last publish time to to avoid a buildup
// of publish requests if exhausting the queue takes
// more time than a single publishing interval.
slf.session.next_publish_time(true);
} else {
debug!("Skipping publish due BadTooManyPublishRequests");
}
}
slf.is_waiting_for_response = false;

break SubscriptionActivity::Publish
}
Some(Err(e)) => {
match e {
StatusCode::BadTimeout => {
session_debug!(slf.session, "Publish request timed out, sending another");
if futures.len() < slf.max_inflight_publish {
futures.push(slf.static_publish());
}
session_debug!(slf.session, "Publish request timed out");
}
StatusCode::BadTooManyPublishRequests => {
session_debug!(slf.session, "Server returned BadTooManyPublishRequests, backing off");
session_debug!(
slf.session,
"Server returned BadTooManyPublishRequests, backing off",
);
slf.is_waiting_for_response = true;
}
StatusCode::BadSessionClosed
| StatusCode::BadSessionIdInvalid => {
// TODO: Do something here?
session_error!(slf.session, "Publish response indicates session is dead");
session_error!(
slf.session,
"Publish response indicates session is dead"
);
}
StatusCode::BadNoSubscription
| StatusCode::BadSubscriptionIdInvalid => {
// TODO: Maybe do something here? This could happen when subscriptions are
// in the process of being recreated. Make sure to avoid race conditions.
session_error!(slf.session, "Publish response indicates subscription is dead");
// TODO: Maybe do something here? This could happen when
// subscriptions are in the process of being recreated.
// Make sure to avoid race conditions.
session_error!(
slf.session,
"Publish response indicates subscription is dead",
);
}
_ => ()
}
break SubscriptionActivity::PublishFailed(e)
}
// Should be impossible
None => break SubscriptionActivity::PublishFailed(StatusCode::BadInvalidState)
// Should be impossible.
None => break SubscriptionActivity::PublishFailed(
StatusCode::BadInvalidState,
)
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions lib/src/client/session/services/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,46 @@ impl Subscription {
}
}
}

#[derive(Debug)]
pub(crate) struct PublishLimits {
message_roundtrip: Duration,
publish_interval: Duration,
subscriptions: usize,
min_publish_requests: usize,
max_publish_requests: usize,
}

impl PublishLimits {
const MIN_MESSAGE_ROUNDTRIP: Duration = Duration::from_millis(10);
const REQUESTS_PER_SUBSCRIPTION: usize = 2;

pub fn new() -> Self {
Self {
message_roundtrip: Self::MIN_MESSAGE_ROUNDTRIP,
publish_interval: Duration::ZERO,
subscriptions: 0,
min_publish_requests: 0,
max_publish_requests: 0,
}
}

pub fn update_message_roundtrip(&mut self, message_roundtrip: Duration) {
self.message_roundtrip = message_roundtrip.max(Self::MIN_MESSAGE_ROUNDTRIP);
self.calculate_publish_limits();
}

pub fn update_subscriptions(&mut self, subscriptions: usize, publish_interval: Duration) {
self.subscriptions = subscriptions;
self.publish_interval = publish_interval;
self.calculate_publish_limits();
}

fn calculate_publish_limits(&mut self) {
self.min_publish_requests = self.subscriptions * Self::REQUESTS_PER_SUBSCRIPTION;
self.max_publish_requests = (self.message_roundtrip.as_millis() as f32
/ self.publish_interval.as_millis() as f32)
.ceil() as usize
* (self.min_publish_requests);
}
}
Loading
Loading