Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it optional to transfer subscriptions on reconnect #409

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ impl ClientBuilder {
self
}

/// Sets whether the client should transfer (or recreate if transfers are
/// not supported) subscriptions to the new session when reconnecting.
pub fn transfer_on_reconnect(mut self, transfer_on_reconnect: bool) -> Self {
self.config.transfer_on_reconnect = transfer_on_reconnect;
self
}

/// Initial time between retries when backing off on session reconnects.
pub fn session_retry_initial(mut self, session_retry_initial: Duration) -> Self {
self.config.session_retry_initial = session_retry_initial;
Expand Down
32 changes: 19 additions & 13 deletions lib/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,18 @@ impl ClientUserToken {
);
valid = false;
}
} else {
if self.cert_path.is_none() && self.private_key_path.is_none() {
error!(
"User token {} fails to provide a password or certificate info.",
self.user
);
valid = false;
} else if self.cert_path.is_none() || self.private_key_path.is_none() {
error!("User token {} fails to provide both a certificate path and a private key path.", self.user);
valid = false;
}
} else if self.cert_path.is_none() && self.private_key_path.is_none() {
error!(
"User token {} fails to provide a password or certificate info.",
self.user
);
valid = false;
} else if self.cert_path.is_none() || self.private_key_path.is_none() {
error!(
"User token {} fails to provide both a certificate path and a private key path.",
self.user
);
valid = false;
}
valid
}
Expand Down Expand Up @@ -202,6 +203,9 @@ pub struct ClientConfig {
/// Maximum number of times to attempt to reconnect to the server before giving up.
/// -1 retries forever
pub(crate) session_retry_limit: i32,
/// Transfer (or recreate if transfers are not supported) subscriptions to
/// the new session when reconnecting.
pub(crate) transfer_on_reconnect: bool,

/// Initial delay for exponential backoff when reconnecting to the server.
pub(crate) session_retry_initial: Duration,
Expand All @@ -215,8 +219,9 @@ pub struct ClientConfig {
/// Timeout for publish requests, separate from normal timeout since
/// subscriptions are often more time sensitive.
pub(crate) publish_timeout: Duration,
/// Minimum publish interval. Setting this higher will make sure that subscriptions
/// publish together, which may reduce the number of publish requests if you have a lot of subscriptions.
/// Minimum publish interval. Setting this higher will make sure that
/// subscriptions publish together, which may reduce the number of publish
/// requests if you have a lot of subscriptions.
pub(crate) min_publish_interval: Duration,
/// Maximum number of inflight publish requests before further requests are skipped.
pub(crate) max_inflight_publish: usize,
Expand Down Expand Up @@ -354,6 +359,7 @@ impl ClientConfig {
session_retry_limit: SessionRetryPolicy::DEFAULT_RETRY_LIMIT as i32,
session_retry_initial: Duration::from_secs(1),
session_retry_max: Duration::from_secs(30),
transfer_on_reconnect: true,
keep_alive_interval: Duration::from_secs(10),
request_timeout: Duration::from_secs(60),
min_publish_interval: Duration::from_secs(1),
Expand Down
4 changes: 3 additions & 1 deletion lib/src/client/session/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ impl SessionConnector {
}
};

self.inner.transfer_subscriptions_from_old_session().await;
if self.inner.transfer_on_reconnect {
self.inner.transfer_subscriptions_from_old_session().await;
}

Ok(reconnect)
}
Expand Down
4 changes: 3 additions & 1 deletion lib/src/client/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ pub struct Session {
pub(super) application_description: ApplicationDescription,
pub(super) request_timeout: Duration,
pub(super) publish_timeout: Duration,
pub(super) recreate_monitored_items_chunk: usize,
pub(super) session_timeout: f64,
pub(super) max_inflight_publish: usize,
pub(super) recreate_monitored_items_chunk: usize,
pub subscription_state: Mutex<SubscriptionState>,
pub(super) transfer_on_reconnect: bool,
pub(super) monitored_item_handle: AtomicHandle,
pub(super) trigger_publish_tx: tokio::sync::watch::Sender<Instant>,
}
Expand Down Expand Up @@ -106,6 +107,7 @@ impl Session {
max_inflight_publish: config.max_inflight_publish,
recreate_monitored_items_chunk: config.performance.recreate_monitored_items_chunk,
subscription_state: Mutex::new(SubscriptionState::new(config.min_publish_interval)),
transfer_on_reconnect: config.transfer_on_reconnect,
monitored_item_handle: AtomicHandle::new(1000),
trigger_publish_tx,
});
Expand Down
1 change: 1 addition & 0 deletions samples/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ decoding_options:
max_byte_string_length: 65535
max_array_length: 1000
session_retry_limit: 10
transfer_on_reconnect: true
session_retry_initial:
secs: 1
nanos: 0
Expand Down
Loading