Skip to content

Commit

Permalink
Merge pull request #25 from omnia-network/dev
Browse files Browse the repository at this point in the history
v1.3.3
  • Loading branch information
ilbertt authored Feb 1, 2024
2 parents 3a72dd7 + 55065ed commit bb51fae
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 22 additions & 13 deletions src/gateway-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ impl GatewayState {
&self,
canister_id: CanisterPrincipal,
client_key: ClientKey,
) -> ClientEntry {
) -> ClientRemovalResult {
// START OF THE CRITICAL SECTION
if let Entry::Occupied(mut entry) = self.inner.data.entry(canister_id) {
let poller_state = entry.get_mut();

// even if this is the last client session for the canister, do not remove the canister from the gateway state
// this will be done by the poller task
// returns 'ClientEntry::Removed' if the client was removed, 'ClientEntry::Vacant' if there was no such client
// returns 'ClientRemovalResult::Removed' if the client was removed, 'ClientRemovalResult::Vacant' if there was no such client
return {
match poller_state.remove(&client_key) {
Some(_) => ClientEntry::Removed(client_key),
None => ClientEntry::Vacant,
Some(_) => ClientRemovalResult::Removed(client_key),
None => ClientRemovalResult::Vacant,
}
};
}
Expand All @@ -148,7 +148,7 @@ impl GatewayState {
// indeed, a client session might get an error before the poller side of the channel has been dropped - but after the poller state has been removed -
// in such a case, the client state has already been removed by the poller, together with the whole poller state
// therefore there is no need to do anything else here and we pretend that there is no such entry
ClientEntry::Vacant
ClientRemovalResult::Vacant
}

/// SAFETY:
Expand All @@ -160,17 +160,20 @@ impl GatewayState {
/// Therefore, this function is executed atomically.
///
/// This function shall be called only if it is guaranteed that the canister entry exists in the gateway state.
pub fn remove_canister_if_empty(&self, canister_id: CanisterPrincipal) -> CanisterEntry {
pub fn remove_canister_if_empty(
&self,
canister_id: CanisterPrincipal,
) -> CanisterRemovalResult {
// remove_if returns None if the condition is not met, otherwise it returns the Some(<entry>)
// if Some, the poller state is empty and therefore the poller shall terminate - return 'CanisterEntry::RemovedEmpty'
// if None, the poller state is not empty and therefore there are still clients connected and the poller shall not terminate - return 'CanisterEntry::NotEmpty'
// if Some, the poller state is empty and therefore the poller shall terminate - return 'CanisterRemovalResult::Empty'
// if None, the poller state is not empty and therefore there are still clients connected and the poller shall not terminate - return 'CanisterRemovalResult::NotEmpty'
match self
.inner
.data
.remove_if(&canister_id, |_, poller_state| poller_state.is_empty())
{
Some(_) => CanisterEntry::RemovedEmpty,
None => CanisterEntry::NotEmpty,
Some(_) => CanisterRemovalResult::Empty,
None => CanisterRemovalResult::NotEmpty,
}
}

Expand Down Expand Up @@ -211,13 +214,19 @@ impl GatewayStateInner {
/// and the state associated to each client
pub type PollerState = Arc<DashMap<ClientKey, ClientSender>>;

pub enum ClientEntry {
/// Determines whether the client was removed from the poller state or if there was no such client
pub enum ClientRemovalResult {
/// The client was removed from the poller state
Removed(ClientKey),
/// The client was not present in the poller state
Vacant,
}

pub enum CanisterEntry {
RemovedEmpty,
/// Determines whether the canister was removed from the gateway state or not (in case there are still clients connected)
pub enum CanisterRemovalResult {
/// The canister was removed from the gateway state
Empty,
/// The canister was not removed from the gateway state
NotEmpty,
}

Expand Down
2 changes: 1 addition & 1 deletion src/ic-websocket-gateway/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ic_websocket_gateway"
version = "1.3.2"
version = "1.3.3"
edition.workspace = true
rust-version.workspace = true
repository.workspace = true
Expand Down
26 changes: 17 additions & 9 deletions src/ic-websocket-gateway/src/canister_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use canister_utils::{
ws_get_messages, CanisterOutputCertifiedMessages, CanisterToClientMessage,
CanisterWsGetMessagesArguments, IcError, IcWsCanisterMessage,
};
use gateway_state::{CanisterEntry, CanisterPrincipal, ClientSender, GatewayState, PollerState};
use ic_agent::{Agent, AgentError};
use gateway_state::{
CanisterPrincipal, CanisterRemovalResult, ClientSender, GatewayState, PollerState,
};
use ic_agent::{agent::RejectCode, Agent, AgentError};
use std::{sync::Arc, time::Duration};
use tokio::{sync::mpsc::Sender, time::timeout};
use tracing::{error, span, trace, warn, Instrument, Level, Span};
Expand All @@ -13,11 +15,15 @@ pub(crate) const POLLING_TIMEOUT_MS: u64 = 5_000;

type PollingTimeout = Duration;

/// Result of the polling iteration
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum PollingStatus {
/// No messages polled
NoMessagesPolled,
/// Some messages polled
MessagesPolled(CanisterOutputCertifiedMessages),
PollerTimedOut,
/// Request timed out
TimedOut,
}

/// Poller which periodically queries a canister for new messages and relays them to the client
Expand Down Expand Up @@ -130,7 +136,7 @@ impl CanisterPoller {
return Ok(());
}
},
PollingStatus::PollerTimedOut => {
PollingStatus::TimedOut => {
// if the poller timed out, it already waited way too long... return immediately so that the next polling iteration can be started
warn!("Poller timed out. Polling immediately");
return Ok(());
Expand Down Expand Up @@ -196,7 +202,7 @@ impl CanisterPoller {
Ok(Err(IcError::Cdk(e))) => Err(format!("Unrecoverable CDK error: {:?}", e)),
Err(e) => {
warn!("Poller took too long to retrieve messages: {:?}", e);
Ok(PollingStatus::PollerTimedOut)
Ok(PollingStatus::TimedOut)
},
}
}
Expand All @@ -221,7 +227,7 @@ impl CanisterPoller {
.get(&canister_output_message.client_key)
.as_deref()
{
let canister_message_span = span!(parent: client_session_span, Level::TRACE, "Canister Message", message_key = canister_to_client_message.key);
let canister_message_span = span!(parent: client_session_span, Level::TRACE, "Canister Message", message_key = canister_to_client_message.key, %self.canister_id);
canister_message_span.follows_from(Span::current().id());
let canister_message = canister_message_span.in_scope(|| {
trace!("Start relaying message",);
Expand Down Expand Up @@ -313,8 +319,8 @@ impl CanisterPoller {
.gateway_state
.remove_canister_if_empty(self.canister_id)
{
CanisterEntry::RemovedEmpty => true,
CanisterEntry::NotEmpty => false,
CanisterRemovalResult::Empty => true,
CanisterRemovalResult::NotEmpty => false,
}
}
}
Expand Down Expand Up @@ -353,7 +359,6 @@ fn is_recoverable_error(e: &AgentError) -> bool {
AgentError::InvalidReplicaUrl(_)
| AgentError::TimeoutWaitingForResponse()
| AgentError::InvalidCborData(_)
| AgentError::ReplicaError(_)
| AgentError::HttpError(_)
| AgentError::InvalidReplicaStatus
| AgentError::RequestStatusDoneNoReply(_)
Expand All @@ -366,6 +371,9 @@ fn is_recoverable_error(e: &AgentError) -> bool {
| AgentError::TransportError(_)
| AgentError::CallDataMismatch { .. }
| AgentError::InvalidRejectCode(_) => true,
// in case of a replica error, we recover only if the error is transient
// all other errors (SysFatal, DestinationInvalid, CanisterReject, CanisterError) are considered permanent
AgentError::ReplicaError(e) => e.reject_code == RejectCode::SysTransient,
_ => false,
}
}
31 changes: 24 additions & 7 deletions src/ic-websocket-gateway/src/client_session_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use canister_utils::{ws_close, CanisterWsCloseArguments, ClientKey, IcWsCanisterMessage};
use futures_util::StreamExt;
use gateway_state::{CanisterPrincipal, ClientEntry, GatewayState, PollerState};
use gateway_state::{CanisterPrincipal, ClientRemovalResult, GatewayState, PollerState};
use ic_agent::Agent;
use std::sync::Arc;
use tokio::{
Expand Down Expand Up @@ -135,21 +135,31 @@ impl ClientSessionHandler {
.gateway_state
.insert_client_channel_and_get_new_poller_state(
canister_id,
client_key,
client_key.clone(),
// important not to clone 'client_channel_tx' as otherwise the client session will not receive None in case of a poller error
client_channel_tx.take().expect("must be set only once"),
client_session_span.clone(),
);
debug!("Client added to gateway state");

client_session_span.record("canister_id", canister_id.to_string());

// ensure this is done after the gateway state has been updated
// TODO: figure out if it is guaranteed that all threads see the updated state of the gateway
// before relaying the message to the IC
client_session
if let Err(e) = client_session
.relay_client_message(ws_open_message)
.instrument(client_session_span.clone())
.await
.map_err(|e| format!("Could not relay WS open message to IC: {:?}", e))?;
{
// if the message could not be relayed to the IC, remove the client from the gateway state
// before returning the error and terminating the session handler
self.gateway_state
.remove_client(canister_id, client_key.clone());
debug!("Client removed from gateway state");

return Err(format!("Could not relay WS open message to IC: {:?}", e))?;
}

client_session_span.in_scope(|| {
debug!("Client session setup");
Expand Down Expand Up @@ -180,9 +190,10 @@ impl ClientSessionHandler {

let canister_id = self.get_canister_id(&client_session);
let client_key = self.get_client_key(&client_session);
// remove client from poller state
// remove client from gateway state
self.gateway_state
.remove_client(canister_id, client_key.clone());
debug!("Client removed from gateway state");

self.call_ws_close(&canister_id, client_key).await;

Expand All @@ -194,20 +205,26 @@ impl ClientSessionHandler {
continue;
},
Err(e) => {
client_session_span.in_scope(|| {
debug!("Client session error");
});
if let IcWsError::Poller(e) = e {
// no need to remove the client as the whole poller state has already been removed by the poller task
return Err(format!("Poller error: {:?}", e));
let err_msg = format!("Poller error: {:?}", e);
warn!(err_msg);
return Err(err_msg);
}
let canister_id = self.get_canister_id(&client_session);
let client_key = self.get_client_key(&client_session);
// if the error is not due to a a failed poller
// remove client from poller state, if it is present
// error might have happened before the client session was Setup
// if so, there is no need to remove the client as it is not yet in the poller state
if let ClientEntry::Removed(client_key) = self
if let ClientRemovalResult::Removed(client_key) = self
.gateway_state
.remove_client_if_exists(canister_id, client_key)
{
debug!("Client removed from gateway state");
self.call_ws_close(&canister_id, client_key).await;

// return Err as the session had an error and cannot be updated anymore
Expand Down
5 changes: 1 addition & 4 deletions src/ic-websocket-gateway/src/tests/canister_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,7 @@ mod test {
let mut poller = create_poller(polling_interval_ms, client_channel_tx);

// check that the poller times out
assert_eq!(
Ok(PollingStatus::PollerTimedOut),
poller.poll_canister().await
);
assert_eq!(Ok(PollingStatus::TimedOut), poller.poll_canister().await);

// check that the poller does not wait for a polling interval after timing out
let start_polling_instant = tokio::time::Instant::now();
Expand Down

0 comments on commit bb51fae

Please sign in to comment.