Skip to content

Commit

Permalink
Merge branch 'dimitris/stop-req-timeout' into 'master'
Browse files Browse the repository at this point in the history
feat: Time out stop_canister requests

Stop canister requests now time out after 5 minutes. See dfinity/interface-spec#229 for more details. 

See merge request dfinity-lab/public/ic!15068
  • Loading branch information
ielashi committed Nov 22, 2023
2 parents 5815865 + 4ee1803 commit e3a6b3d
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 68 deletions.
7 changes: 7 additions & 0 deletions rs/config/src/execution_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const SUBNET_WASM_CUSTOM_SECTIONS_MEMORY_CAPACITY: NumBytes = NumBytes::new(2 *
/// The number of bytes reserved for response callback executions.
const SUBNET_MEMORY_RESERVATION: NumBytes = NumBytes::new(10 * GIB);

/// The duration a stop_canister has to stop the canister before timing out.
pub const STOP_CANISTER_TIMEOUT_DURATION: Duration = Duration::from_secs(5 * 60); // 5 minutes

/// This is the upper limit on how big heap deltas all the canisters together
/// can produce on a subnet in between checkpoints. Once, the total delta size
/// is above this limit, no more canisters will be executed till the next
Expand Down Expand Up @@ -229,6 +232,9 @@ pub struct Config {

/// Indicate whether the Wasm chunk store feature has been enabled or not.
pub wasm_chunk_store: FlagStatus,

/// The duration a stop_canister has to stop the canister before timing out.
pub stop_canister_timeout_duration: Duration,
}

impl Default for Config {
Expand Down Expand Up @@ -294,6 +300,7 @@ impl Default for Config {
max_compilation_cache_size: MAX_COMPILATION_CACHE_SIZE,
query_stats_aggregation: FlagStatus::Disabled,
wasm_chunk_store: FlagStatus::Disabled,
stop_canister_timeout_duration: STOP_CANISTER_TIMEOUT_DURATION,
}
}
}
Expand Down
220 changes: 156 additions & 64 deletions rs/execution_environment/src/execution_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,15 @@ struct PausedExecutionRegistry {
paused_install_code: HashMap<PausedExecutionId, Box<dyn PausedInstallCodeExecution>>,
}

// The replies that can be returned for a `stop_canister` request.
#[derive(Debug, PartialEq, Eq)]
enum StopCanisterReply {
// The stop request was completed successfully.
Completed,
// The stop request timed out.
Timeout,
}

/// ExecutionEnvironment is the component responsible for executing messages
/// on the IC.
pub struct ExecutionEnvironment {
Expand Down Expand Up @@ -2662,9 +2671,73 @@ impl ExecutionEnvironment {
}
}

/// Helper function to respond to a stop request based on the provided `StopCanisterReply`.
fn reply_to_stop_context(
&self,
stop_context: &StopCanisterContext,
state: &mut ReplicatedState,
canister_id: CanisterId,
time: Time,
reply: StopCanisterReply,
) {
let call_id = stop_context.call_id();
self.remove_stop_canister_call(state, canister_id, *call_id);

match stop_context {
StopCanisterContext::Ingress {
sender, message_id, ..
} => {
// Responding to stop_canister request from a user.
let ingress_state = match reply {
StopCanisterReply::Completed => {
IngressState::Completed(WasmResult::Reply(EmptyBlob.encode()))
}
StopCanisterReply::Timeout => IngressState::Failed(UserError::new(
ErrorCode::StopCanisterRequestTimeout,
"Stop canister request timed out".to_string(),
)),
};
self.ingress_history_writer.set_status(
state,
message_id.clone(),
IngressStatus::Known {
receiver: IC_00.get(),
user_id: *sender,
time,
state: ingress_state,
},
)
}
StopCanisterContext::Canister {
sender,
reply_callback,
cycles,
..
} => {
// Responding to stop_canister request from a canister.
let subnet_id_as_canister_id = CanisterId::from(self.own_subnet_id);
let response_payload = match reply {
StopCanisterReply::Completed => Payload::Data(EmptyBlob.encode()),
StopCanisterReply::Timeout => Payload::Reject(RejectContext::new(
RejectCode::SysTransient,
"Stop canister request timed out",
)),
};
let response = ic_types::messages::Response {
originator: *sender,
respondent: subnet_id_as_canister_id,
originator_reply_callback: *reply_callback,
refund: *cycles,
response_payload,
};
state.push_subnet_output_response(response.into());
}
}
}

/// Helper function to remove stop canister calls
/// from SubnetCallContextManager based on provided call id.
pub fn remove_stop_canister_call(
fn remove_stop_canister_call(
&self,
state: &mut ReplicatedState,
canister_id: CanisterId,
Expand Down Expand Up @@ -2698,76 +2771,51 @@ impl ExecutionEnvironment {
}
}

/// Checks for stopping canisters and, if any of them are ready to stop,
/// transitions them to be fully stopped. Responses to the pending stop
/// messages are written to ingress history.
/// Checks for stopping canisters and performs the following:
/// 1. If there are stop contexts that have timed out, respond to them.
/// 2. If any stopping canisters are ready to stop, transition them to
/// be fully stopped and reply to the corresponding stop contexts.
///
/// Responses to the pending stop messages are written to ingress history
/// or returned to the calling canisters respectively.
pub fn process_stopping_canisters(&self, mut state: ReplicatedState) -> ReplicatedState {
let mut canister_states = state.take_canister_states();
let time = state.time();

for canister in canister_states.values_mut() {
if !(canister.status() == CanisterStatusType::Stopping
&& canister.system_state.ready_to_stop())
{
// Canister is either not stopping or isn't ready to be stopped yet. Nothing to
// do.
continue;
}

// Transition the canister to "stopped".
let stopping_status =
mem::replace(&mut canister.system_state.status, CanisterStatus::Stopped);

if let CanisterStatus::Stopping { stop_contexts, .. } = stopping_status {
// Respond to the stop messages.
for stop_context in stop_contexts {
match stop_context {
StopCanisterContext::Ingress {
sender,
message_id,
call_id,
} => {
// Responding to stop_canister request from a user.
self.remove_stop_canister_call(
&mut state,
canister.canister_id(),
call_id,
);
self.ingress_history_writer.set_status(
&mut state,
message_id,
IngressStatus::Known {
receiver: IC_00.get(),
user_id: sender,
let canister_id = canister.canister_id();
let ready_to_stop = canister.system_state.ready_to_stop();
match canister.system_state.status {
// Canister is not stopping so we can skip it.
CanisterStatus::Running { .. } | CanisterStatus::Stopped => continue,
// Canister is stopping so there might be some work to do.
CanisterStatus::Stopping {
ref mut stop_contexts,
..
} => {
if ready_to_stop {
// Canister is ready to stop.
// Transition the canister to "stopped".
let stopping_status = mem::replace(
&mut canister.system_state.status,
CanisterStatus::Stopped,
);

// Reply to all pending stop_canister requests.
if let CanisterStatus::Stopping { stop_contexts, .. } = stopping_status {
for stop_context in stop_contexts {
self.reply_to_stop_context(
&stop_context,
&mut state,
canister_id,
time,
state: IngressState::Completed(WasmResult::Reply(
EmptyBlob.encode(),
)),
},
)
}
StopCanisterContext::Canister {
sender,
reply_callback,
call_id,
cycles,
} => {
// Responding to stop_canister request from a canister.
let subnet_id_as_canister_id = CanisterId::from(self.own_subnet_id);
self.remove_stop_canister_call(
&mut state,
canister.canister_id(),
call_id,
);
let response = ic_types::messages::Response {
originator: sender,
respondent: subnet_id_as_canister_id,
originator_reply_callback: reply_callback,
refund: cycles,
response_payload: Payload::Data(EmptyBlob.encode()),
};
state.push_subnet_output_response(response.into());
StopCanisterReply::Completed,
);
}
}
} else {
// Respond to any stop contexts that have timed out.
self.timeout_expired_requests(time, canister_id, stop_contexts, &mut state);
}
}
}
Expand All @@ -2776,6 +2824,50 @@ impl ExecutionEnvironment {
state
}

fn timeout_expired_requests(
&self,
time: Time,
canister_id: CanisterId,
stop_contexts: &mut Vec<StopCanisterContext>,
state: &mut ReplicatedState,
) {
// Identify if any of the stop contexts have expired.
let (expired_stop_contexts, remaining_stop_contexts) = std::mem::take(stop_contexts)
.into_iter()
.partition(|stop_context| {
match stop_context.call_id() {
Some(call_id) => {
let sc_time = state
.metadata
.subnet_call_context_manager
.get_time_for_stop_canister_call(call_id);
match sc_time {
Some(t) => time >= t + self.config.stop_canister_timeout_duration,
// Should never hit this case unless there's a
// bug but handle it for robustness.
None => false,
}
}
// Should only happen for old stop requests that existed
// before call ids were added.
None => false,
}
});

for stop_context in expired_stop_contexts {
// Respond to expired requests that they timed out.
self.reply_to_stop_context(
&stop_context,
state,
canister_id,
time,
StopCanisterReply::Timeout,
);
}

*stop_contexts = remaining_stop_contexts;
}

fn reject_unexpected_ingress(
&self,
method: Ic00Method,
Expand Down
1 change: 1 addition & 0 deletions rs/execution_environment/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,5 +287,6 @@ fn dashboard_label_value_from(code: ErrorCode) -> &'static str {
ReservedCyclesLimitExceededInMemoryAllocation => "Canister cannot increase memory allocation due to its reserved cycles limit",
ReservedCyclesLimitExceededInMemoryGrow => "Canister cannot grow memory due to its reserved cycles limit",
InsufficientCyclesInMessageMemoryGrow => "Canister does not have enough cycles to grow message memory",
StopCanisterRequestTimeout => "Stop canister request timed out",
}
}
Loading

0 comments on commit e3a6b3d

Please sign in to comment.