Skip to content

Commit

Permalink
feat: Allow accepting and burning cycles in replicated queries (#363)
Browse files Browse the repository at this point in the history
This PR allows canisters to accept and burn cycles when executing
queries in replicated mode (e.g. as an ingress message or when another
canister calls the query method). See also spec
[PR](dfinity/portal#3760).

This allows canisters to expect some payment when someone calls
an expensive endpoint, similar to how this is possible in update calls.
Given that replicated queries run across all nodes, there's no technical
issue in persisting cycles changes and it gives developers another way
of protecting expensive endpoints of their canisters.

The main parts of the change are the following:

Previously the sandbox would return an optional `StateModifications`
object as changes would need to be persisted in update calls but not in
queries. Since we would like to persist cycles changes which are part of
the system state of the canister, the struct is modified to have an
`Option<ExecutionStateModifications>` to capture the optionality of
applying execution state changes while it always includes
`SystemStateChanges`. The system API is also adjusted to return only the
changes that are relevant per context of execution.

The benefit of this is that it makes clearer which parts of the
canister state can be persisted. It also makes it possible to handle other
parts of the system state that need to be persisted for replicated
queries, like canister logs, more uniformly. Instead of being handled
separately, they could now simply be included when applying changes to the
system state (not included in this PR, but a possible follow-up). Future
changes with similar characteristics (e.g. persisting canister metrics in
a way similar to logs) could also be incorporated more easily.

The second big chunk of changes is in the `replicated_query` execution
handler. The handler is modified to allow for handling the acceptance of
cycles and refunding any remaining amount to the caller.

Some tests were also added to confirm things work as expected.
  • Loading branch information
dsarlis authored Jan 22, 2025
1 parent 225b046 commit 178acea
Show file tree
Hide file tree
Showing 23 changed files with 606 additions and 332 deletions.
4 changes: 2 additions & 2 deletions hs/spec_compliance/src/IC/Test/Spec/CanisterVersion.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ canister_version_tests ecid =
ctr <- callToQuery'' cid (replyData canister_version) >>= is2xx >>= isReply >>= asWord64
ctr @?= 1
ctr <- callToQuery'' cid (replyData canister_version) >>= is2xx >>= isReply >>= asWord64
ctr @?= 1
ctr @?= 2
ctr <- callToQuery'' cid (replyData canister_version) >>= is2xx >>= isReply >>= asWord64
ctr @?= 1,
ctr @?= 3,
simpleTestCase "in update" ecid $ \cid -> do
ctr <- call cid (replyData canister_version) >>= asWord64
ctr @?= 1
Expand Down
40 changes: 22 additions & 18 deletions rs/canister_sandbox/src/protocol/ctlsvc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ mod tests {
ctlsvc::{ExecutionFinishedReply, ExecutionPausedReply, ExecutionPausedRequest, Reply},
id::ExecId,
logging::{LogLevel, LogRequest},
structs::{MemoryModifications, SandboxExecOutput, StateModifications},
structs::{
ExecutionStateModifications, MemoryModifications, SandboxExecOutput, StateModifications,
},
};

use super::{ExecutionFinishedRequest, Request};
Expand Down Expand Up @@ -105,24 +107,26 @@ mod tests {
system_api_call_counters: SystemApiCallCounters::default(),
canister_log: CanisterLog::default(),
},
state: Some(StateModifications {
globals: vec![
Global::I32(10),
Global::I64(32),
Global::F32(10.5),
Global::F64(1.1),
Global::V128(123),
],
wasm_memory: MemoryModifications {
page_delta: PageMap::new_for_testing().serialize_delta(&[]),
size: NumWasmPages::new(10),
},
stable_memory: MemoryModifications {
page_delta: PageMap::new_for_testing().serialize_delta(&[]),
size: NumWasmPages::new(42),
},
state: StateModifications {
execution_state_modifications: Some(ExecutionStateModifications {
globals: vec![
Global::I32(10),
Global::I64(32),
Global::F32(10.5),
Global::F64(1.1),
Global::V128(123),
],
wasm_memory: MemoryModifications {
page_delta: PageMap::new_for_testing().serialize_delta(&[]),
size: NumWasmPages::new(10),
},
stable_memory: MemoryModifications {
page_delta: PageMap::new_for_testing().serialize_delta(&[]),
size: NumWasmPages::new(42),
},
}),
system_state_changes: SystemStateChanges::default(),
}),
},
execute_total_duration: Duration::from_secs(10),
execute_run_duration: Duration::from_secs(1),
};
Expand Down
35 changes: 26 additions & 9 deletions rs/canister_sandbox/src/protocol/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,42 @@ pub struct SandboxExecInput {
pub struct SandboxExecOutput {
pub slice: SliceExecutionOutput,
pub wasm: WasmExecutionOutput,
pub state: Option<StateModifications>,
pub state: StateModifications,
pub execute_total_duration: std::time::Duration,
pub execute_run_duration: std::time::Duration,
}

impl SandboxExecOutput {
/// Moves the state modifications out of this output and returns them,
/// leaving `StateModifications::default()` behind in `self.state`.
pub fn take_state_modifications(&mut self) -> StateModifications {
std::mem::take(&mut self.state)
}
}

/// Describes the memory changes performed by execution.
#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
pub struct MemoryModifications {
/// Serialized page delta, as produced by calling `serialize_delta` on the
/// memory's page map; the receiving side applies it with `deserialize_delta`.
pub page_delta: PageDeltaSerialization,
/// Size of the memory, copied from the `size` field of the source `Memory`.
pub size: NumWasmPages,
}

#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
#[derive(Serialize, Default, Debug, Deserialize, Clone, PartialEq)]
pub struct StateModifications {
/// Modifications in the execution state of the canister.
///
/// This field is optional because the state changes might or might not
/// be applied depending on the method executed.
pub execution_state_modifications: Option<ExecutionStateModifications>,

/// Modifications in the system state of the canister.
///
/// The system state changes contain parts that are always applied
/// and parts that are only applied depending on the method executed
/// (similarly to `execution_state_modifications`).
pub system_state_changes: SystemStateChanges,
}

#[derive(Serialize, Debug, Deserialize, Clone, PartialEq)]
pub struct ExecutionStateModifications {
/// The state of the global variables after execution.
pub globals: Vec<Global>,

Expand All @@ -58,19 +80,15 @@ pub struct StateModifications {

/// Modifications in the stable memory.
pub stable_memory: MemoryModifications,

/// Modifications in the system state.
pub system_state_changes: SystemStateChanges,
}

impl StateModifications {
impl ExecutionStateModifications {
pub fn new(
globals: Vec<Global>,
wasm_memory: &Memory,
stable_memory: &Memory,
wasm_memory_delta: &[PageIndex],
stable_memory_delta: &[PageIndex],
system_state_changes: SystemStateChanges,
) -> Self {
let wasm_memory = MemoryModifications {
page_delta: wasm_memory.page_map.serialize_delta(wasm_memory_delta),
Expand All @@ -82,11 +100,10 @@ impl StateModifications {
size: stable_memory.size,
};

StateModifications {
ExecutionStateModifications {
globals,
wasm_memory,
stable_memory,
system_state_changes,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use crate::controller_launcher_service::ControllerLauncherService;
use crate::launcher_service::LauncherService;
use crate::protocol::id::{ExecId, MemoryId, WasmId};
use crate::protocol::sbxsvc::MemorySerialization;
use crate::protocol::structs::{SandboxExecInput, SandboxExecOutput};
use crate::protocol::structs::{SandboxExecInput, SandboxExecOutput, StateModifications};
use crate::sandbox_service::SandboxService;
use crate::{protocol, rpc};
use ic_config::embedders::Config as EmbeddersConfig;
use ic_config::flag_status::FlagStatus;
use ic_embedders::wasm_executor::{
get_wasm_reserved_pages, wasm_execution_error, CanisterStateChanges, PausedWasmExecution,
SliceExecutionOutput, WasmExecutionResult, WasmExecutor,
get_wasm_reserved_pages, wasm_execution_error, CanisterStateChanges, ExecutionStateChanges,
PausedWasmExecution, SliceExecutionOutput, WasmExecutionResult, WasmExecutor,
};
use ic_embedders::{
wasm_utils::WasmImportsDetails, CompilationCache, CompilationResult, StoredCompilation,
Expand Down Expand Up @@ -1579,22 +1579,31 @@ impl SandboxedExecutionController {
next_stable_memory_id: MemoryId,
canister_id: CanisterId,
sandbox_process: Arc<SandboxProcess>,
) -> Option<CanisterStateChanges> {
) -> CanisterStateChanges {
// If the execution has failed, then we don't apply any changes.
if exec_output.wasm.wasm_result.is_err() {
return None;
return CanisterStateChanges::default();
}
match exec_output.state.take() {
None => None,
Some(state_modifications) => {

let StateModifications {
execution_state_modifications,
system_state_changes,
} = exec_output.take_state_modifications();

match execution_state_modifications {
None => CanisterStateChanges {
execution_state_changes: None,
system_state_changes,
},
Some(execution_state_modifications) => {
// TODO: If a canister has broken out of wasm then it might have allocated more
// wasm or stable memory than allowed. We should add an additional check here
// that the canister is still within its allowed memory usage.
let mut wasm_memory = execution_state.wasm_memory.clone();
wasm_memory
.page_map
.deserialize_delta(state_modifications.wasm_memory.page_delta);
wasm_memory.size = state_modifications.wasm_memory.size;
.deserialize_delta(execution_state_modifications.wasm_memory.page_delta);
wasm_memory.size = execution_state_modifications.wasm_memory.size;
wasm_memory.sandbox_memory = SandboxMemory::synced(wrap_remote_memory(
&sandbox_process,
next_wasm_memory_id,
Expand All @@ -1614,8 +1623,8 @@ impl SandboxedExecutionController {
let mut stable_memory = execution_state.stable_memory.clone();
stable_memory
.page_map
.deserialize_delta(state_modifications.stable_memory.page_delta);
stable_memory.size = state_modifications.stable_memory.size;
.deserialize_delta(execution_state_modifications.stable_memory.page_delta);
stable_memory.size = execution_state_modifications.stable_memory.size;
stable_memory.sandbox_memory = SandboxMemory::synced(wrap_remote_memory(
&sandbox_process,
next_stable_memory_id,
Expand All @@ -1632,12 +1641,14 @@ impl SandboxedExecutionController {
.sandboxed_execution_critical_error_invalid_memory_size
.inc();
}
Some(CanisterStateChanges {
globals: state_modifications.globals,
wasm_memory,
stable_memory,
system_state_changes: state_modifications.system_state_changes,
})
CanisterStateChanges {
execution_state_changes: Some(ExecutionStateChanges {
globals: execution_state_modifications.globals,
wasm_memory,
stable_memory,
}),
system_state_changes,
}
}
}
}
Expand Down
74 changes: 41 additions & 33 deletions rs/canister_sandbox/src/sandbox_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use std::time::{Duration, Instant};
use crate::protocol::id::{ExecId, MemoryId, WasmId};
use crate::protocol::sbxsvc::{CreateExecutionStateSerializedSuccessReply, OpenMemoryRequest};
use crate::protocol::structs::{
MemoryModifications, SandboxExecInput, SandboxExecOutput, StateModifications,
ExecutionStateModifications, MemoryModifications, SandboxExecInput, SandboxExecOutput,
StateModifications,
};
use crate::{controller_service::ControllerService, protocol};
use ic_config::embedders::Config as EmbeddersConfig;
Expand Down Expand Up @@ -158,37 +159,44 @@ impl Execution {

match wasm_result {
Ok(_) => {
let state_modifications = deltas.map(
|WasmStateChanges {
dirty_page_indices,
globals,
}| {
let system_state_changes = match instance_or_system_api {
// Here we use `store_data_mut` instead of
// `into_store_data` because the latter will drop the
// wasmtime Instance which can be an expensive
// operation. Mutating the store instead allows us
// to delay the drop until after the execution
// completed message is sent back to the main
// process.
Ok(mut instance) => instance
.store_data_mut()
.system_api_mut()
.expect("System api not present in the wasmtime instance")
.take_system_state_changes(),
Err(system_api) => system_api.into_system_state_changes(),
};
StateModifications::new(
globals,
&wasm_memory,
&stable_memory,
&dirty_page_indices.wasm_memory_delta,
&dirty_page_indices.stable_memory_delta,
system_state_changes,
)
},
);
if state_modifications.is_some() {
let state_modifications = {
let system_state_changes = match instance_or_system_api {
// Here we use `store_data_mut` instead of
// `into_store_data` because the latter will drop the
// wasmtime Instance which can be an expensive
// operation. Mutating the store instead allows us
// to delay the drop until after the execution
// completed message is sent back to the main
// process.
Ok(mut instance) => instance
.store_data_mut()
.system_api_mut()
.expect("System api not present in the wasmtime instance")
.take_system_state_changes(),
Err(system_api) => system_api.into_system_state_changes(),
};

let execution_state_modifications = deltas.map(
|WasmStateChanges {
dirty_page_indices,
globals,
}| {
ExecutionStateModifications::new(
globals,
&wasm_memory,
&stable_memory,
&dirty_page_indices.wasm_memory_delta,
&dirty_page_indices.stable_memory_delta,
)
},
);

StateModifications {
execution_state_modifications,
system_state_changes,
}
};
if state_modifications.execution_state_modifications.is_some() {
self.sandbox_manager
.add_memory(exec_input.next_wasm_memory_id, wasm_memory);
self.sandbox_manager
Expand Down Expand Up @@ -237,7 +245,7 @@ impl Execution {
exec_output: SandboxExecOutput {
slice,
wasm: wasm_output,
state: None,
state: StateModifications::default(),
execute_total_duration: total_timer.elapsed(),
execute_run_duration: run_timer.elapsed(),
},
Expand Down
Loading

0 comments on commit 178acea

Please sign in to comment.