From 9c91f401ae3082c011ca0a79d9e55f7891833b47 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Sat, 28 Dec 2024 18:30:44 +0000 Subject: [PATCH 01/10] Move file metadata to stable memory --- Cargo.lock | 4 + .../canisters/storage_bucket/impl/Cargo.toml | 6 + .../src/jobs/migrate_data_to_stable_map.rs | 32 +++ .../storage_bucket/impl/src/jobs/mod.rs | 2 + .../storage_bucket/impl/src/model/files.rs | 243 ++++++++++++++---- .../impl/src/model/files/proptests.rs | 125 +++++++++ .../impl/src/model/files_map.rs | 55 ++++ .../impl/src/model/files_per_accessor_map.rs | 48 ++++ .../storage_bucket/impl/src/model/mod.rs | 3 + .../impl/src/model/reference_counts.rs | 49 ++++ .../libraries/stable_memory_map/src/keys.rs | 8 + .../stable_memory_map/src/keys/chat_event.rs | 4 +- .../src/keys/community_event.rs | 4 +- .../stable_memory_map/src/keys/files.rs | 114 ++++++++ .../stable_memory_map/src/keys/macros.rs | 10 +- .../stable_memory_map/src/keys/member.rs | 4 +- .../src/keys/principal_to_user_id.rs | 4 +- .../libraries/stable_memory_map/src/lib.rs | 4 + 18 files changed, 649 insertions(+), 70 deletions(-) create mode 100644 backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs create mode 100644 backend/canisters/storage_bucket/impl/src/model/files/proptests.rs create mode 100644 backend/canisters/storage_bucket/impl/src/model/files_map.rs create mode 100644 backend/canisters/storage_bucket/impl/src/model/files_per_accessor_map.rs create mode 100644 backend/canisters/storage_bucket/impl/src/model/reference_counts.rs create mode 100644 backend/libraries/stable_memory_map/src/keys/files.rs diff --git a/Cargo.lock b/Cargo.lock index 748193428d..b7db6c6895 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7375,13 +7375,17 @@ dependencies = [ "ic-stable-structures", "msgpack", "num-traits", + "proptest", "rand 0.8.5", "serde", "serde_bytes", "stable_memory", + "stable_memory_map", "storage_bucket_canister", "storage_index_canister", "storage_index_canister_c2c_client", + "test-case", + "test-strategy", "timer_job_queues", "tracing", "types", diff --git a/backend/canisters/storage_bucket/impl/Cargo.toml b/backend/canisters/storage_bucket/impl/Cargo.toml index 9916a44816..5c08883362 100644 --- a/backend/canisters/storage_bucket/impl/Cargo.toml +++ b/backend/canisters/storage_bucket/impl/Cargo.toml @@ -24,6 +24,7 @@ rand = { workspace = true } serde = { workspace = true } serde_bytes = { workspace = true } stable_memory = { path = "../../../libraries/stable_memory" } +stable_memory_map = { path = "../../../libraries/stable_memory_map" } storage_bucket_canister = { path = "../api" } storage_index_canister = { path = "../../storage_index/api" } storage_index_canister_c2c_client = { path = "../../storage_index/c2c_client" } @@ -31,3 +32,8 @@ timer_job_queues = { path = "../../../libraries/timer_job_queues" } tracing = { workspace = true } types = { path = "../../../libraries/types" } utils = { path = "../../../libraries/utils" } + +[dev-dependencies] +proptest = { workspace = true } +test-case = { workspace = true } +test-strategy = { workspace = true } \ No newline at end of file diff --git a/backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs b/backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs new file mode 100644 index 0000000000..95b31a428d --- /dev/null +++ b/backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs @@ -0,0 +1,32 @@ +use crate::mutate_state; +use ic_cdk_timers::TimerId; +use std::cell::Cell; +use std::time::Duration; +use tracing::info; + +thread_local! { + static TIMER_ID: Cell> = Cell::default(); +} + +const INTERVAL: Duration = Duration::from_secs(60); // 1 minute + +pub fn start_job() { + let timer_id = ic_cdk_timers::set_timer_interval(INTERVAL, run); + TIMER_ID.set(Some(timer_id)); +} + +fn run() { + info!("'migrate_data_to_stable_map' job running"); + + mutate_state(|state| { + if state.data.files.migrate_files() + && state.data.files.migrate_reference_counts() + && state.data.files.migrate_accessors() + { + if let Some(timer_id) = TIMER_ID.take() { + ic_cdk_timers::clear_timer(timer_id); + info!("'migrate_data_to_stable_map' job completed"); + } + } + }) +} diff --git a/backend/canisters/storage_bucket/impl/src/jobs/mod.rs b/backend/canisters/storage_bucket/impl/src/jobs/mod.rs index cb1dd8a659..59019042f7 100644 --- a/backend/canisters/storage_bucket/impl/src/jobs/mod.rs +++ b/backend/canisters/storage_bucket/impl/src/jobs/mod.rs @@ -2,10 +2,12 @@ use crate::RuntimeState; pub mod calculate_freezing_limit; pub mod check_cycles_balance; +mod migrate_data_to_stable_map; pub mod remove_expired_files; pub(crate) fn start(state: &RuntimeState) { calculate_freezing_limit::start_job(); check_cycles_balance::start_job(); + migrate_data_to_stable_map::start_job(); remove_expired_files::start_job_if_required(state); } diff --git a/backend/canisters/storage_bucket/impl/src/model/files.rs b/backend/canisters/storage_bucket/impl/src/model/files.rs index 2616b73ace..3309236fd3 100644 --- a/backend/canisters/storage_bucket/impl/src/model/files.rs +++ b/backend/canisters/storage_bucket/impl/src/model/files.rs @@ -1,3 +1,6 @@ +use crate::model::files_map::FilesMap; +use crate::model::files_per_accessor_map::FilesPerAccessorStableMap; +use crate::model::reference_counts::ReferenceCountsStableMap; use crate::model::stable_blob_storage::StableBlobStorage; use crate::{calc_chunk_count, MAX_BLOB_SIZE_BYTES}; use candid::Principal; @@ -6,16 +9,26 @@ use std::cmp::Ordering; use std::collections::btree_map::Entry::{Occupied, Vacant}; use std::collections::{BTreeMap, BTreeSet, VecDeque}; use storage_bucket_canister::upload_chunk_v2::Args as UploadChunkArgs; +use tracing::info; use types::{AccessorId, CanisterId, FileAdded, FileId, FileMetaData, FileRemoved, Hash, TimestampMillis}; use utils::file_id::generate_file_id; use utils::hasher::hash_bytes; +#[cfg(test)] +mod proptests; + #[derive(Serialize, Deserialize, Default)] pub struct Files { files: BTreeMap, + #[serde(default)] + files_stable: FilesMap, pending_files: BTreeMap, reference_counts: ReferenceCounts, + #[serde(default)] + reference_counts_stable: ReferenceCountsStableMap, accessors_map: AccessorsMap, + #[serde(default)] + accessors_map_stable: FilesPerAccessorStableMap, blobs: StableBlobStorage, expiration_queue: BTreeMap>, bytes_used: u64, @@ -50,8 +63,64 @@ impl File { } impl Files { - pub fn get(&self, file_id: &FileId) -> Option<&File> { - self.files.get(file_id) + pub fn migrate_files(&mut self) -> bool { + if self.files.is_empty() { + return true; + } + + let mut keys = 0; + while let Some((file_id, file)) = self.files.pop_first() { + self.files_stable.set(file_id, file); + keys += 1; + + if keys % 1000 == 0 && ic_cdk::api::instruction_counter() > 2_000_000_000 { + break; + } + } + info!(keys, "Migrated files to stable map"); + self.reference_counts.counts.is_empty() + } + + pub fn migrate_reference_counts(&mut self) -> bool { + if self.reference_counts.counts.is_empty() { + return true; + } + + let mut keys = 0; + while let Some((hash, count)) = self.reference_counts.counts.pop_first() { + self.reference_counts_stable.set(hash, count); + keys += 1; + + if keys % 1000 == 0 && ic_cdk::api::instruction_counter() > 2_000_000_000 { + break; + } + } + info!(keys, "Migrated reference counts to stable map"); + self.reference_counts.counts.is_empty() + } + + pub fn migrate_accessors(&mut self) -> bool { + if self.accessors_map.map.is_empty() { + return true; + } + + let mut keys = 0; + while let Some((accessor_id, files)) = self.accessors_map.map.pop_first() { + for file in files { + self.accessors_map_stable.link(accessor_id, file); + keys += 1; + } + + if ic_cdk::api::instruction_counter() > 2_000_000_000 { + break; + } + } + info!(keys, "Migrated accessors to stable map"); + self.accessors_map.map.is_empty() + } + + pub fn get(&self, file_id: &FileId) -> Option { + self.get_file(file_id) } pub fn pending_file(&self, file_id: &FileId) -> Option<&PendingFile> { @@ -63,8 +132,7 @@ impl Files { } pub fn owner(&self, file_id: &FileId) -> Option { - self.files - .get(file_id) + self.get_file(file_id) .map(|f| f.owner) .or_else(|| self.pending_files.get(file_id).map(|f| f.owner)) } @@ -74,7 +142,7 @@ impl Files { return PutChunkResult::FileTooBig(MAX_BLOB_SIZE_BYTES); } - if self.files.contains_key(&args.file_id) { + if self.contains_file(&args.file_id) || self.files_stable.contains_key(&args.file_id) { return PutChunkResult::FileAlreadyExists; } @@ -146,10 +214,9 @@ impl Files { } pub fn remove(&mut self, caller: Principal, file_id: FileId) -> RemoveFileResult { - if let Occupied(e) = self.files.entry(file_id) { - if e.get().can_be_removed_by(caller) { - let file = e.remove(); - let file_removed = self.process_removed_file(file_id, file); + if let Some(file) = self.get_file(&file_id) { + if file.can_be_removed_by(caller) { + let file_removed = self.remove_file(file_id).unwrap(); RemoveFileResult::Success(file_removed) } else { @@ -161,10 +228,7 @@ impl Files { } pub fn remove_unchecked(&mut self, file_id: FileId) -> RemoveFileResult { - if let Occupied(e) = self.files.entry(file_id) { - let file = e.remove(); - let file_removed = self.process_removed_file(file_id, file); - + if let Some(file_removed) = self.remove_file(file_id) { RemoveFileResult::Success(file_removed) } else { RemoveFileResult::NotFound @@ -188,8 +252,11 @@ impl Files { let hash = file.hash; let new_file_id = generate_file_id(canister_id, caller, hash, file_id_seed, now); - self.accessors_map.link_many(caller, accessors.iter().copied(), new_file_id); - self.reference_counts.incr(hash); + self.link_accessor_to_file(caller, new_file_id); + for accessor in accessors.iter().copied() { + self.link_accessor_to_file(accessor, new_file_id); + } + self.incr_reference_count(hash); let meta_data = file.meta_data(); let new_file = File { @@ -200,17 +267,13 @@ impl Files { mime_type: file.mime_type, }; - if self.files.insert(new_file_id, new_file).is_none() { - ForwardFileResult::Success(FileAdded { - file_id: new_file_id, - hash, - size, - meta_data, - }) - } else { - // There should never be a file_id clash - unreachable!(); - } + self.set_file(new_file_id, new_file); + ForwardFileResult::Success(FileAdded { + file_id: new_file_id, + hash, + size, + meta_data, + }) } pub fn remove_pending_file(&mut self, file_id: &FileId) -> bool { @@ -220,22 +283,25 @@ impl Files { pub fn remove_accessor(&mut self, accessor_id: &AccessorId) -> Vec { let mut files_removed = Vec::new(); - if let Some(file_ids) = self.accessors_map.remove(accessor_id) { + if let Some(file_ids) = self + .accessors_map + .remove(accessor_id) + .or_else(|| Some(self.accessors_map_stable.remove(*accessor_id).into_iter().collect())) + { for file_id in file_ids { let mut blob_to_delete = None; - if let Occupied(mut e) = self.files.entry(file_id) { - let file = e.get_mut(); + if let Some(mut file) = self.get_file(&file_id) { file.accessors.remove(accessor_id); if file.accessors.is_empty() { - let delete_blob = self.reference_counts.decr(file.hash) == 0; + let delete_blob = self.decr_reference_count(file.hash) == 0; if delete_blob { blob_to_delete = Some(file.hash); } - let file = e.remove(); - files_removed.push(FileRemoved { - file_id, - meta_data: file.meta_data(), - }); + if let Some(file_removed) = self.remove_file(file_id) { + files_removed.push(file_removed); + } + } else { + self.set_file(file_id, file); } } @@ -249,8 +315,9 @@ impl Files { } pub fn update_owner(&mut self, file_id: &FileId, new_owner: Principal) -> bool { - if let Some(file) = self.files.get_mut(file_id) { + if let Some(mut file) = self.get_file(file_id) { file.owner = new_owner; + self.set_file(*file_id, file); true } else { false @@ -260,9 +327,10 @@ impl Files { pub fn update_accessor_id(&mut self, old_accessor_id: AccessorId, new_accessor_id: AccessorId) { if let Some(files) = self.accessors_map.map.remove(&old_accessor_id) { for file_id in files.iter() { - if let Some(file) = self.files.get_mut(file_id) { + if let Some(mut file) = self.get_file(file_id) { if file.accessors.remove(&old_accessor_id) { file.accessors.insert(new_accessor_id); + self.set_file(*file_id, file); } } } @@ -292,8 +360,8 @@ impl Files { let mut files_removed = Vec::with_capacity(files_to_remove.len()); for file_id in files_to_remove { - if let Some(file) = self.files.remove(&file_id) { - files_removed.push(self.process_removed_file(file_id, file)); + if let Some(file_removed) = self.remove_file(file_id) { + files_removed.push(file_removed); } } files_removed @@ -313,23 +381,25 @@ impl Files { pub fn metrics(&self) -> Metrics { Metrics { - file_count: self.files.len() as u64, + file_count: (self.files.len() + self.files_stable.len()) as u64, blob_count: self.blobs.len(), } } fn insert_completed_file(&mut self, file_id: FileId, completed_file: PendingFile) { - self.accessors_map - .link_many(completed_file.owner, completed_file.accessors.iter().copied(), file_id); + self.link_accessor_to_file(completed_file.owner, file_id); + for accessor in completed_file.accessors.iter().copied() { + self.link_accessor_to_file(accessor, file_id); + } - self.reference_counts.incr(completed_file.hash); + self.incr_reference_count(completed_file.hash); self.add_blob_if_not_exists(completed_file.hash, completed_file.bytes); if let Some(expiry) = completed_file.expiry { self.expiration_queue.entry(expiry).or_default().push_back(file_id); } - self.files.insert( + self.set_file( file_id, File { owner: completed_file.owner, @@ -341,19 +411,37 @@ impl Files { ); } - fn process_removed_file(&mut self, file_id: FileId, file: File) -> FileRemoved { - if self.reference_counts.decr(file.hash) == 0 { + fn get_file(&self, file_id: &FileId) -> Option { + self.files.get(file_id).cloned().or_else(|| self.files_stable.get(file_id)) + } + + fn contains_file(&self, file_id: &FileId) -> bool { + self.files.contains_key(file_id) || self.files_stable.contains_key(file_id) + } + + fn set_file(&mut self, file_id: FileId, file: File) { + if let Occupied(mut e) = self.files.entry(file_id) { + e.insert(file); + } else { + self.files_stable.set(file_id, file); + } + } + + fn remove_file(&mut self, file_id: FileId) -> Option { + let file = self.files.remove(&file_id).or_else(|| self.files_stable.remove(&file_id))?; + + if self.decr_reference_count(file.hash) == 0 { self.remove_blob(&file.hash); } for accessor_id in file.accessors.iter() { - self.accessors_map.unlink(*accessor_id, &file_id); + self.unlink_accessor_from_file(*accessor_id, file_id); } - FileRemoved { + Some(FileRemoved { file_id, meta_data: file.meta_data(), - } + }) } fn add_blob_if_not_exists(&mut self, hash: Hash, bytes: Vec) { @@ -380,6 +468,55 @@ impl Files { Some((file.clone(), size)) } + + fn incr_reference_count(&mut self, hash: Hash) -> u32 { + if self.reference_counts.counts.contains_key(&hash) { + self.reference_counts.incr(hash) + } else { + self.reference_counts_stable.incr(hash) + } + } + + fn decr_reference_count(&mut self, hash: Hash) -> u32 { + if self.reference_counts.counts.contains_key(&hash) { + self.reference_counts.decr(hash) + } else { + self.reference_counts_stable.decr(hash) + } + } + + fn link_accessor_to_file(&mut self, accessor_id: AccessorId, file_id: FileId) { + if self.accessors_map.map.contains_key(&accessor_id) { + self.accessors_map.link(accessor_id, file_id); + } else { + self.accessors_map_stable.link(accessor_id, file_id); + } + } + + fn unlink_accessor_from_file(&mut self, accessor_id: AccessorId, file_id: FileId) { + self.accessors_map.unlink(accessor_id, &file_id); + self.accessors_map_stable.unlink(accessor_id, file_id); + } + + #[cfg(test)] + fn check_invariants(&self) { + let files = self.files_stable.get_all(); + + assert_eq!(files.len(), self.files_stable.len()); + + let mut files_per_accessor: BTreeMap> = BTreeMap::new(); + let mut reference_counts = BTreeMap::new(); + + for (file_id, file) in files { + for accessor in file.accessors.iter() { + files_per_accessor.entry(*accessor).or_default().push(file_id); + } + *reference_counts.entry(file.hash).or_default() += 1; + } + + assert_eq!(files_per_accessor, self.accessors_map_stable.get_all()); + assert_eq!(reference_counts, self.reference_counts_stable.get_all()); + } } #[derive(Serialize, Deserialize, Default)] @@ -418,14 +555,6 @@ struct AccessorsMap { } impl AccessorsMap { - pub fn link_many(&mut self, owner: Principal, accessors: impl Iterator, file_id: FileId) { - self.link(owner, file_id); - - for accessor in accessors { - self.link(accessor, file_id); - } - } - pub fn link(&mut self, accessor_id: AccessorId, file_id: FileId) { self.map.entry(accessor_id).or_default().insert(file_id); } diff --git a/backend/canisters/storage_bucket/impl/src/model/files/proptests.rs b/backend/canisters/storage_bucket/impl/src/model/files/proptests.rs new file mode 100644 index 0000000000..ae94afb269 --- /dev/null +++ b/backend/canisters/storage_bucket/impl/src/model/files/proptests.rs @@ -0,0 +1,125 @@ +use crate::model::files::{Files, PutChunkArgs}; +use candid::Principal; +use ic_stable_structures::memory_manager::{MemoryId, MemoryManager}; +use ic_stable_structures::DefaultMemoryImpl; +use proptest::collection::vec as pvec; +use proptest::prelude::*; +use proptest::prop_oneof; +use test_strategy::proptest; +use types::{AccessorId, CanisterId, FileId, Hash, TimestampMillis}; + +#[derive(Debug, Clone)] +enum Operation { + Add { + owner: Principal, + file_id: FileId, + }, + Remove { + file_index: usize, + }, + Forward { + owner: Principal, + file_index: usize, + file_id_seed: u128, + }, + UpdateAccessorId { + old: Principal, + new: Principal, + }, + RemoveAccessor { + accessor: AccessorId, + }, +} + +fn operation_strategy() -> impl Strategy { + prop_oneof![ + 50 => (any::(), any::()) + .prop_map(|(user_index, file_id)| Operation::Add { owner: principal(user_index), file_id }), + 20 => any::() + .prop_map(|file_index| Operation::Remove { file_index }), + 10 => (any::(), any::(), any::()).prop_map(|(user_index, file_index, file_id_seed)| Operation::Forward { owner: principal(user_index), file_index, file_id_seed } ), + 5 => (any::(), any::()).prop_map(|(old_index, new_index)| Operation::UpdateAccessorId { old: principal(old_index), new: principal(new_index) } ), + 3 => any::().prop_map(|user_index| Operation::RemoveAccessor { accessor: principal(user_index) }), + ] +} + +#[proptest(cases = 10)] +fn comprehensive(#[strategy(pvec(operation_strategy(), 1_000..5_000))] ops: Vec) { + let memory = MemoryManager::init(DefaultMemoryImpl::default()); + stable_memory_map::init(memory.get(MemoryId::new(1))); + + let mut files = Files::default(); + + let mut file_ids = Vec::new(); + + let mut timestamp = 1000; + for op in ops.into_iter() { + if let Operation::Add { owner, file_id } = op { + file_ids.push((owner, file_id)); + } + + execute_operation(&mut files, op, timestamp, &mut file_ids); + timestamp += 1000; + } + + files.check_invariants(); +} + +fn execute_operation(files: &mut Files, op: Operation, timestamp: TimestampMillis, file_ids: &mut [(Principal, FileId)]) { + match op { + Operation::Add { owner, file_id } => { + files.put_chunk(PutChunkArgs { + owner, + file_id, + hash: hash(file_id), + mime_type: "".to_string(), + accessors: vec![owner], + chunk_index: 0, + chunk_size: 1, + total_size: 1, + bytes: vec![1], + expiry: None, + now: timestamp, + }); + } + Operation::Remove { file_index } => { + if !file_ids.is_empty() { + let index = file_index % file_ids.len(); + let (_, file_id) = file_ids[index]; + files.remove_file(file_id); + } + } + Operation::Forward { + owner, + file_index, + file_id_seed, + } => { + if !file_ids.is_empty() { + let index = file_index % file_ids.len(); + let (_, file_id) = file_ids[index]; + files.forward( + owner, + file_id, + CanisterId::from_slice(&[1]), + file_id_seed, + [owner].into_iter().collect(), + timestamp, + ); + } + } + Operation::UpdateAccessorId { old, new } => files.update_accessor_id(old, new), + Operation::RemoveAccessor { accessor } => { + files.remove_accessor(&accessor); + } + }; +} + +fn hash(file_id: FileId) -> Hash { + let mut bytes = [0u8; 32]; + bytes[0] = (file_id % 100) as u8; + bytes +} + +fn principal(index: usize) -> Principal { + Principal::from_slice(&index.to_be_bytes()) +} diff --git a/backend/canisters/storage_bucket/impl/src/model/files_map.rs b/backend/canisters/storage_bucket/impl/src/model/files_map.rs new file mode 100644 index 0000000000..1c4a6beadf --- /dev/null +++ b/backend/canisters/storage_bucket/impl/src/model/files_map.rs @@ -0,0 +1,55 @@ +use crate::model::files::File; +use serde::{Deserialize, Serialize}; +use stable_memory_map::{with_map, with_map_mut, FileIdToFileKeyPrefix, KeyPrefix}; +use types::FileId; + +#[derive(Serialize, Deserialize, Default)] +pub struct FilesMap { + prefix: FileIdToFileKeyPrefix, + len: usize, +} + +impl FilesMap { + pub fn get(&self, file_id: &FileId) -> Option { + with_map(|m| m.get(self.prefix.create_key(file_id))).map(bytes_to_file) + } + + pub fn contains_key(&self, file_id: &FileId) -> bool { + with_map(|m| m.contains_key(self.prefix.create_key(file_id))) + } + + pub fn set(&mut self, file_id: FileId, file: File) { + if with_map_mut(|m| m.insert(self.prefix.create_key(&file_id), file_to_bytes(file))).is_none() { + self.len = self.len.saturating_sub(1); + } + } + + pub fn remove(&mut self, file_id: &FileId) -> Option { + let removed = with_map_mut(|m| m.remove(self.prefix.create_key(file_id))).map(bytes_to_file); + if removed.is_some() { + self.len = self.len.saturating_sub(1); + } + removed + } + + pub fn len(&self) -> usize { + self.len + } + + #[cfg(test)] + pub fn get_all(&self) -> Vec<(FileId, File)> { + with_map(|m| { + m.range(self.prefix.create_key(&0)..) + .map(|(k, v)| (k.file_id(), bytes_to_file(v))) + .collect() + }) + } +} + +fn file_to_bytes(file: File) -> Vec { + msgpack::serialize_then_unwrap(file) +} + +fn bytes_to_file(bytes: Vec) -> File { + msgpack::deserialize_then_unwrap(&bytes) +} diff --git a/backend/canisters/storage_bucket/impl/src/model/files_per_accessor_map.rs b/backend/canisters/storage_bucket/impl/src/model/files_per_accessor_map.rs new file mode 100644 index 0000000000..90026bab54 --- /dev/null +++ b/backend/canisters/storage_bucket/impl/src/model/files_per_accessor_map.rs @@ -0,0 +1,48 @@ +use serde::{Deserialize, Serialize}; +use stable_memory_map::{with_map, with_map_mut, FilesPerAccessorKeyPrefix, KeyPrefix}; +use types::{AccessorId, FileId}; + +#[derive(Serialize, Deserialize, Default)] +pub struct FilesPerAccessorStableMap { + prefix: FilesPerAccessorKeyPrefix, +} + +impl FilesPerAccessorStableMap { + pub fn get(&self, accessor_id: AccessorId) -> Vec { + with_map(|m| { + m.range(self.prefix.create_key(&(accessor_id, 0))..) + .map(|(k, _)| k.file_id()) + .collect() + }) + } + + pub fn remove(&mut self, accessor_id: AccessorId) -> Vec { + let files = self.get(accessor_id); + with_map_mut(|m| { + for file in files.iter() { + m.remove(self.prefix.create_key(&(accessor_id, *file))); + } + }); + files + } + + pub fn link(&mut self, accessor_id: AccessorId, file_id: u128) { + with_map_mut(|m| m.insert(self.prefix.create_key(&(accessor_id, file_id)), Vec::new())); + } + + pub fn unlink(&mut self, accessor_id: AccessorId, file_id: u128) { + with_map_mut(|m| m.remove(self.prefix.create_key(&(accessor_id, file_id)))); + } + + #[cfg(test)] + pub fn get_all(&self) -> std::collections::BTreeMap> { + use std::collections::BTreeMap; + let mut map: BTreeMap> = BTreeMap::new(); + with_map(|m| { + for (key, _) in m.range(self.prefix.create_key(&(AccessorId::from_slice(&[]), 0))..) { + map.entry(key.accessor_id()).or_default().push(key.file_id()); + } + }); + map + } +} diff --git a/backend/canisters/storage_bucket/impl/src/model/mod.rs b/backend/canisters/storage_bucket/impl/src/model/mod.rs index cb7b4eaba8..57c88daf40 100644 --- a/backend/canisters/storage_bucket/impl/src/model/mod.rs +++ b/backend/canisters/storage_bucket/impl/src/model/mod.rs @@ -1,4 +1,7 @@ pub mod files; +pub mod files_map; +pub mod files_per_accessor_map; pub mod index_event_batch; +pub mod reference_counts; pub mod stable_blob_storage; pub mod users; diff --git a/backend/canisters/storage_bucket/impl/src/model/reference_counts.rs b/backend/canisters/storage_bucket/impl/src/model/reference_counts.rs new file mode 100644 index 0000000000..ac01bc5229 --- /dev/null +++ b/backend/canisters/storage_bucket/impl/src/model/reference_counts.rs @@ -0,0 +1,49 @@ +use serde::{Deserialize, Serialize}; +use stable_memory_map::{with_map, with_map_mut, FileReferenceCountKeyPrefix, KeyPrefix}; +use types::Hash; + +#[derive(Serialize, Deserialize, Default)] +pub struct ReferenceCountsStableMap { + prefix: FileReferenceCountKeyPrefix, +} + +impl ReferenceCountsStableMap { + pub fn incr(&mut self, hash: Hash) -> u32 { + let count = self.get(hash).saturating_add(1); + self.set(hash, count); + count + } + + pub fn decr(&mut self, hash: Hash) -> u32 { + let count = self.get(hash).saturating_sub(1); + self.set(hash, count); + count + } + + fn get(&self, hash: Hash) -> u32 { + with_map(|m| m.get(self.prefix.create_key(&hash))) + .map(bytes_to_u32) + .unwrap_or_default() + } + + pub fn set(&mut self, hash: Hash, count: u32) { + if count == 0 { + with_map_mut(|m| m.remove(self.prefix.create_key(&hash))); + } else { + with_map_mut(|m| m.insert(self.prefix.create_key(&hash), count.to_be_bytes().to_vec())); + } + } + + #[cfg(test)] + pub fn get_all(&self) -> std::collections::BTreeMap { + with_map(|m| { + m.range(self.prefix.create_key(&[0; 32])..) + .map(|(k, v)| (k.hash(), bytes_to_u32(v))) + .collect() + }) + } +} + +fn bytes_to_u32(bytes: Vec) -> u32 { + u32::from_be_bytes(bytes.try_into().unwrap()) +} diff --git a/backend/libraries/stable_memory_map/src/keys.rs b/backend/libraries/stable_memory_map/src/keys.rs index 4e3e1ba237..1060f86d2b 100644 --- a/backend/libraries/stable_memory_map/src/keys.rs +++ b/backend/libraries/stable_memory_map/src/keys.rs @@ -5,12 +5,14 @@ use std::borrow::Cow; mod chat_event; mod community_event; +mod files; mod macros; mod member; mod principal_to_user_id; pub use chat_event::*; pub use community_event::*; +pub use files::*; pub use member::*; pub use principal_to_user_id::*; @@ -91,6 +93,9 @@ pub enum KeyType { CommunityMember = 9, CommunityEvent = 10, PrincipalToUserId = 11, + FileIdToFile = 12, + FileReferenceCount = 13, + FilesPerAccessor = 14, } fn extract_key_type(bytes: &[u8]) -> Option { @@ -113,6 +118,9 @@ impl TryFrom for KeyType { 9 => Ok(KeyType::CommunityMember), 10 => Ok(KeyType::CommunityEvent), 11 => Ok(KeyType::PrincipalToUserId), + 12 => Ok(KeyType::FileIdToFile), + 13 => Ok(KeyType::FileReferenceCount), + 14 => Ok(KeyType::FilesPerAccessor), _ => Err(()), } } diff --git a/backend/libraries/stable_memory_map/src/keys/chat_event.rs b/backend/libraries/stable_memory_map/src/keys/chat_event.rs index 6acca7947b..44608509c0 100644 --- a/backend/libraries/stable_memory_map/src/keys/chat_event.rs +++ b/backend/libraries/stable_memory_map/src/keys/chat_event.rs @@ -1,6 +1,6 @@ use crate::keys::extract_key_type; use crate::keys::macros::key; -use crate::{BaseKey, KeyPrefix, KeyType}; +use crate::{KeyPrefix, KeyType}; use ic_principal::Principal; use types::{ChannelId, Chat, EventIndex, MessageIndex, UserId}; @@ -156,7 +156,7 @@ impl ChatEventKey { #[cfg(test)] mod tests { use super::*; - use crate::Key; + use crate::{BaseKey, Key}; use rand::{thread_rng, Rng, RngCore}; use types::{ChannelId, Chat, EventIndex, MessageIndex}; diff --git a/backend/libraries/stable_memory_map/src/keys/community_event.rs b/backend/libraries/stable_memory_map/src/keys/community_event.rs index 239d6349d6..77f1743c8a 100644 --- a/backend/libraries/stable_memory_map/src/keys/community_event.rs +++ b/backend/libraries/stable_memory_map/src/keys/community_event.rs @@ -1,5 +1,5 @@ use crate::keys::macros::key; -use crate::{BaseKey, KeyPrefix, KeyType}; +use crate::{KeyPrefix, KeyType}; use types::EventIndex; key!(CommunityEventKey, CommunityEventKeyPrefix, KeyType::CommunityEvent); @@ -39,7 +39,7 @@ impl CommunityEventKey { #[cfg(test)] mod tests { use super::*; - use crate::Key; + use crate::{BaseKey, Key}; use rand::{thread_rng, RngCore}; use types::EventIndex; diff --git a/backend/libraries/stable_memory_map/src/keys/files.rs b/backend/libraries/stable_memory_map/src/keys/files.rs new file mode 100644 index 0000000000..1f41a775e0 --- /dev/null +++ b/backend/libraries/stable_memory_map/src/keys/files.rs @@ -0,0 +1,114 @@ +use crate::keys::macros::key; +use crate::{KeyPrefix, KeyType}; +use types::{AccessorId, FileId, Hash}; + +key!(FileIdToFileKey, FileIdToFileKeyPrefix, KeyType::FileIdToFile); + +impl FileIdToFileKeyPrefix { + pub fn new() -> Self { + // KeyType::FileIdToFile 1 byte + FileIdToFileKeyPrefix(vec![KeyType::FileIdToFile as u8]) + } +} + +impl KeyPrefix for FileIdToFileKeyPrefix { + type Key = FileIdToFileKey; + type Suffix = FileId; + + fn create_key(&self, file_id: &FileId) -> FileIdToFileKey { + let mut bytes = Vec::with_capacity(17); + bytes.push(KeyType::FileIdToFile as u8); + bytes.extend_from_slice(&file_id.to_be_bytes()); + FileIdToFileKey(bytes) + } +} + +impl Default for FileIdToFileKeyPrefix { + fn default() -> Self { + Self::new() + } +} + +impl FileIdToFileKey { + pub fn file_id(&self) -> FileId { + FileId::from_be_bytes(self.0[1..].try_into().unwrap()) + } +} + +key!( + FileReferenceCountKey, + FileReferenceCountKeyPrefix, + KeyType::FileReferenceCount +); + +impl FileReferenceCountKeyPrefix { + pub fn new() -> Self { + // KeyType::FileReferenceCount 1 byte + FileReferenceCountKeyPrefix(vec![KeyType::FileReferenceCount as u8]) + } +} + +impl KeyPrefix for FileReferenceCountKeyPrefix { + type Key = FileReferenceCountKey; + type Suffix = Hash; + + fn create_key(&self, hash: &Hash) -> FileReferenceCountKey { + let mut bytes = Vec::with_capacity(33); + bytes.push(KeyType::FileReferenceCount as u8); + bytes.extend_from_slice(hash); + FileReferenceCountKey(bytes) + } +} + +impl Default for FileReferenceCountKeyPrefix { + fn default() -> Self { + Self::new() + } +} + +impl FileReferenceCountKey { + pub fn hash(&self) -> Hash { + self.0[1..].try_into().unwrap() + } +} + +key!(FilesPerAccessorKey, FilesPerAccessorKeyPrefix, KeyType::FilesPerAccessor); + +impl FilesPerAccessorKeyPrefix { + pub fn new() -> Self { + // KeyType::FilesPerAccessor 1 byte + FilesPerAccessorKeyPrefix(vec![KeyType::FilesPerAccessor as u8]) + } +} + +impl KeyPrefix for FilesPerAccessorKeyPrefix { + type Key = FilesPerAccessorKey; + type Suffix = (AccessorId, FileId); + + fn create_key(&self, (accessor_id, file_id): &(AccessorId, FileId)) -> FilesPerAccessorKey { + let accessor_bytes = accessor_id.as_slice(); + let mut bytes = Vec::with_capacity(accessor_bytes.len() + 17); + bytes.push(KeyType::FilesPerAccessor as u8); + bytes.extend_from_slice(accessor_bytes); + bytes.extend_from_slice(&file_id.to_be_bytes()); + FilesPerAccessorKey(bytes) + } +} + +impl Default for FilesPerAccessorKeyPrefix { + fn default() -> Self { + Self::new() + } +} + +impl FilesPerAccessorKey { + pub fn accessor_id(&self) -> AccessorId { + let end = self.0.len() - 16; + AccessorId::from_slice(&self.0[1..end]) + } + + pub fn file_id(&self) -> FileId { + let start = self.0.len() - 16; + FileId::from_be_bytes(self.0[start..].try_into().unwrap()) + } +} diff --git a/backend/libraries/stable_memory_map/src/keys/macros.rs b/backend/libraries/stable_memory_map/src/keys/macros.rs index 93ad47c635..500b9d255c 100644 --- a/backend/libraries/stable_memory_map/src/keys/macros.rs +++ b/backend/libraries/stable_memory_map/src/keys/macros.rs @@ -8,7 +8,7 @@ macro_rules! key { #[serde(into = "crate::keys::BaseKeyPrefix", try_from = "crate::keys::BaseKeyPrefix")] pub struct $key_prefix_name(Vec); - impl From<$key_name> for BaseKey { + impl From<$key_name> for crate::keys::BaseKey { fn from(value: $key_name) -> Self { crate::keys::BaseKey(value.0) } @@ -20,18 +20,18 @@ macro_rules! key { } } - impl TryFrom for $key_name { + impl TryFrom for $key_name { type Error = String; - fn try_from(value: crate::BaseKey) -> Result { + fn try_from(value: crate::keys::BaseKey) -> Result { crate::keys::validate_key(&value.0, |kt| matches!(kt, $key_types)).map(|_| $key_name(value.0)) } } - impl TryFrom for $key_prefix_name { + impl TryFrom for $key_prefix_name { type Error = String; - fn try_from(value: crate::BaseKeyPrefix) -> Result { + fn try_from(value: crate::keys::BaseKeyPrefix) -> Result { crate::keys::validate_key(&value.0, |kt| matches!(kt, $key_types)).map(|_| $key_prefix_name(value.0)) } } diff --git a/backend/libraries/stable_memory_map/src/keys/member.rs b/backend/libraries/stable_memory_map/src/keys/member.rs index 22d0941605..28079830a9 100644 --- a/backend/libraries/stable_memory_map/src/keys/member.rs +++ b/backend/libraries/stable_memory_map/src/keys/member.rs @@ -1,5 +1,5 @@ use crate::keys::macros::key; -use crate::{BaseKey, KeyPrefix, KeyType}; +use crate::{KeyPrefix, KeyType}; use ic_principal::Principal; use types::{ChannelId, MultiUserChat, UserId}; @@ -68,7 +68,7 @@ impl MemberKey { #[cfg(test)] mod tests { use super::*; - use crate::Key; + use crate::{BaseKey, Key}; use rand::{thread_rng, Rng, RngCore}; #[test] diff --git a/backend/libraries/stable_memory_map/src/keys/principal_to_user_id.rs b/backend/libraries/stable_memory_map/src/keys/principal_to_user_id.rs index e2f27e4559..f96a710e61 100644 --- a/backend/libraries/stable_memory_map/src/keys/principal_to_user_id.rs +++ b/backend/libraries/stable_memory_map/src/keys/principal_to_user_id.rs @@ -1,5 +1,5 @@ use crate::keys::macros::key; -use crate::{BaseKey, KeyPrefix, KeyType}; +use crate::{KeyPrefix, KeyType}; use ic_principal::Principal; key!(PrincipalToUserIdKey, PrincipalToUserIdKeyPrefix, KeyType::PrincipalToUserId); @@ -39,7 +39,7 @@ impl PrincipalToUserIdKey { #[cfg(test)] mod tests { use super::*; - use crate::Key; + use crate::{BaseKey, Key}; use rand::{thread_rng, RngCore}; #[test] diff --git a/backend/libraries/stable_memory_map/src/lib.rs b/backend/libraries/stable_memory_map/src/lib.rs index 99eb4bbfba..04335edaea 100644 --- a/backend/libraries/stable_memory_map/src/lib.rs +++ b/backend/libraries/stable_memory_map/src/lib.rs @@ -41,6 +41,10 @@ impl StableMemoryMap { self.map.get(&key.into()) } + pub fn contains_key(&self, key: K) -> bool { + self.map.contains_key(&key.into()) + } + pub fn insert(&mut self, key: K, value: Vec) -> Option> { self.map.insert(key.into(), value) } From e12699502c23b3577b8ed0b55fb78692a6de14d4 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Sat, 28 Dec 2024 18:32:47 +0000 Subject: [PATCH 02/10] Update CHANGELOG --- backend/canisters/storage_bucket/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/canisters/storage_bucket/CHANGELOG.md b/backend/canisters/storage_bucket/CHANGELOG.md index fdae11b9eb..4b18955c85 100644 --- a/backend/canisters/storage_bucket/CHANGELOG.md +++ b/backend/canisters/storage_bucket/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed - Reduce storage bucket memory usage ([#7103](https://github.com/open-chat-labs/open-chat/pull/7103)) +- Move file metadata to stable memory ([#7120](https://github.com/open-chat-labs/open-chat/pull/7120)) ## [[2.0.1532](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1532-storage_bucket)] - 2024-12-19 From 8dc93063dbeebb1b4139c3d91f1cdda1dee2540d Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Sun, 29 Dec 2024 00:08:55 +0000 Subject: [PATCH 03/10] Set virtual memory during init/post_upgrade --- .../canisters/storage_bucket/impl/src/lifecycle/init.rs | 2 ++ .../storage_bucket/impl/src/lifecycle/post_upgrade.rs | 4 +++- backend/canisters/storage_bucket/impl/src/memory.rs | 7 ++++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/backend/canisters/storage_bucket/impl/src/lifecycle/init.rs b/backend/canisters/storage_bucket/impl/src/lifecycle/init.rs index e487d125b7..04f9379aba 100644 --- a/backend/canisters/storage_bucket/impl/src/lifecycle/init.rs +++ b/backend/canisters/storage_bucket/impl/src/lifecycle/init.rs @@ -1,4 +1,5 @@ use crate::lifecycle::{init_env, init_state}; +use crate::memory::get_stable_memory_map_memory; use crate::Data; use canister_tracing_macros::trace; use ic_cdk::init; @@ -10,6 +11,7 @@ use utils::env::Environment; #[trace] fn init(args: Args) { canister_logger::init(args.test_mode); + stable_memory_map::init(get_stable_memory_map_memory()); let env = init_env([0; 32]); diff --git a/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs b/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs index 7b3b2c2de6..e60790d602 100644 --- a/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs +++ b/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs @@ -1,5 +1,5 @@ use crate::lifecycle::{init_env, init_state}; -use crate::memory::get_upgrades_memory; +use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory}; use crate::Data; use canister_logger::LogEntry; use canister_tracing_macros::trace; @@ -11,6 +11,8 @@ use tracing::info; #[post_upgrade] #[trace] fn post_upgrade(args: Args) { + stable_memory_map::init(get_stable_memory_map_memory()); + let memory = get_upgrades_memory(); let reader = get_reader(&memory); diff --git a/backend/canisters/storage_bucket/impl/src/memory.rs b/backend/canisters/storage_bucket/impl/src/memory.rs index e8a5ec2cda..2422cf4f4e 100644 --- a/backend/canisters/storage_bucket/impl/src/memory.rs +++ b/backend/canisters/storage_bucket/impl/src/memory.rs @@ -6,6 +6,7 @@ use std::collections::BTreeMap; const UPGRADES: MemoryId = MemoryId::new(0); const BLOBS: MemoryId = MemoryId::new(1); +const STABLE_MEMORY_MAP: MemoryId = MemoryId::new(2); pub type Memory = VirtualMemory; @@ -22,8 +23,12 @@ pub fn get_blobs_memory() -> Memory { get_memory(BLOBS) } +pub fn get_stable_memory_map_memory() -> Memory { + get_memory(STABLE_MEMORY_MAP) +} + pub fn memory_sizes() -> BTreeMap { - (0u8..=1).map(|id| (id, get_memory(MemoryId::new(id)).size())).collect() + (0u8..=2).map(|id| (id, get_memory(MemoryId::new(id)).size())).collect() } fn get_memory(id: MemoryId) -> Memory { From 624b7fcd4142db748d1f8b54b62d036590e70b1d Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Sun, 29 Dec 2024 23:01:17 +0000 Subject: [PATCH 04/10] This is better --- .../storage_bucket/impl/src/model/files.rs | 194 ++++++++---------- 1 file changed, 87 insertions(+), 107 deletions(-) diff --git a/backend/canisters/storage_bucket/impl/src/model/files.rs b/backend/canisters/storage_bucket/impl/src/model/files.rs index 3309236fd3..86fbcaad05 100644 --- a/backend/canisters/storage_bucket/impl/src/model/files.rs +++ b/backend/canisters/storage_bucket/impl/src/model/files.rs @@ -19,13 +19,16 @@ mod proptests; #[derive(Serialize, Deserialize, Default)] pub struct Files { + #[deprecated] files: BTreeMap, #[serde(default)] files_stable: FilesMap, pending_files: BTreeMap, + #[deprecated] reference_counts: ReferenceCounts, #[serde(default)] reference_counts_stable: ReferenceCountsStableMap, + #[deprecated] accessors_map: AccessorsMap, #[serde(default)] accessors_map_stable: FilesPerAccessorStableMap, @@ -63,6 +66,7 @@ impl File { } impl Files { + #[allow(deprecated)] pub fn migrate_files(&mut self) -> bool { if self.files.is_empty() { return true; @@ -81,6 +85,7 @@ impl Files { self.reference_counts.counts.is_empty() } + #[allow(deprecated)] pub fn migrate_reference_counts(&mut self) -> bool { if self.reference_counts.counts.is_empty() { return true; @@ -99,6 +104,7 @@ impl Files { self.reference_counts.counts.is_empty() } + #[allow(deprecated)] pub fn migrate_accessors(&mut self) -> bool { if self.accessors_map.map.is_empty() { return true; @@ -119,6 +125,32 @@ impl Files { self.accessors_map.map.is_empty() } + #[allow(deprecated)] + fn migrate_file(&mut self, file_id: FileId) -> bool { + if let Some(file) = self.files.remove(&file_id) { + let hash = file.hash; + if let Some(count) = self.reference_counts.counts.remove(&hash) { + self.reference_counts_stable.set(hash, count); + } + self.files_stable.set(file_id, file); + true + } else { + false + } + } + + #[allow(deprecated)] + fn migrate_accessor(&mut self, accessor_id: AccessorId) -> bool { + if let Some(files) = self.accessors_map.map.remove(&accessor_id) { + for file in files { + self.accessors_map_stable.link(accessor_id, file); + } + true + } else { + false + } + } + pub fn get(&self, file_id: &FileId) -> Option { self.get_file(file_id) } @@ -244,6 +276,11 @@ impl Files { accessors: BTreeSet, now: TimestampMillis, ) -> ForwardFileResult { + self.migrate_file(file_id); + for accessor in accessors.iter() { + self.migrate_accessor(*accessor); + } + let (file, size) = match self.file_and_size(&file_id) { Some((f, s)) => (f, s), None => return ForwardFileResult::NotFound, @@ -252,11 +289,11 @@ impl Files { let hash = file.hash; let new_file_id = generate_file_id(canister_id, caller, hash, file_id_seed, now); - self.link_accessor_to_file(caller, new_file_id); + self.accessors_map_stable.link(caller, new_file_id); for accessor in accessors.iter().copied() { - self.link_accessor_to_file(accessor, new_file_id); + self.accessors_map_stable.link(accessor, new_file_id); } - self.incr_reference_count(hash); + self.reference_counts_stable.incr(hash); let meta_data = file.meta_data(); let new_file = File { @@ -267,7 +304,7 @@ impl Files { mime_type: file.mime_type, }; - self.set_file(new_file_id, new_file); + self.files_stable.set(new_file_id, new_file); ForwardFileResult::Success(FileAdded { file_id: new_file_id, hash, @@ -281,33 +318,31 @@ impl Files { } pub fn remove_accessor(&mut self, accessor_id: &AccessorId) -> Vec { + self.migrate_accessor(*accessor_id); + let mut files_removed = Vec::new(); - if let Some(file_ids) = self - .accessors_map - .remove(accessor_id) - .or_else(|| Some(self.accessors_map_stable.remove(*accessor_id).into_iter().collect())) - { - for file_id in file_ids { - let mut blob_to_delete = None; - if let Some(mut file) = self.get_file(&file_id) { - file.accessors.remove(accessor_id); - if file.accessors.is_empty() { - let delete_blob = self.decr_reference_count(file.hash) == 0; - if delete_blob { - blob_to_delete = Some(file.hash); - } - if let Some(file_removed) = self.remove_file(file_id) { - files_removed.push(file_removed); - } - } else { - self.set_file(file_id, file); + let file_ids = self.accessors_map_stable.remove(*accessor_id); + for file_id in file_ids { + self.migrate_file(file_id); + let mut blob_to_delete = None; + if let Some(mut file) = self.get_file(&file_id) { + file.accessors.remove(accessor_id); + if file.accessors.is_empty() { + let delete_blob = self.reference_counts_stable.decr(file.hash) == 0; + if delete_blob { + blob_to_delete = Some(file.hash); + } + if let Some(file_removed) = self.remove_file(file_id) { + files_removed.push(file_removed); } + } else { + self.files_stable.set(file_id, file); } + } - if let Some(blob_to_delete) = blob_to_delete { - self.remove_blob(&blob_to_delete); - } + if let Some(blob_to_delete) = blob_to_delete { + self.remove_blob(&blob_to_delete); } } @@ -315,9 +350,11 @@ impl Files { } pub fn update_owner(&mut self, file_id: &FileId, new_owner: Principal) -> bool { + self.migrate_file(*file_id); + if let Some(mut file) = self.get_file(file_id) { file.owner = new_owner; - self.set_file(*file_id, file); + self.files_stable.set(*file_id, file); true } else { false @@ -325,17 +362,19 @@ impl Files { } pub fn update_accessor_id(&mut self, old_accessor_id: AccessorId, new_accessor_id: AccessorId) { - if let Some(files) = self.accessors_map.map.remove(&old_accessor_id) { - for file_id in files.iter() { - if let Some(mut file) = self.get_file(file_id) { - if file.accessors.remove(&old_accessor_id) { - file.accessors.insert(new_accessor_id); - self.set_file(*file_id, file); - } + self.migrate_accessor(old_accessor_id); + self.migrate_accessor(new_accessor_id); + + let files = self.accessors_map_stable.remove(old_accessor_id); + for file_id in files.iter() { + self.migrate_file(*file_id); + if let Some(mut file) = self.get_file(file_id) { + if file.accessors.remove(&old_accessor_id) { + file.accessors.insert(new_accessor_id); + self.files_stable.set(*file_id, file); + self.accessors_map_stable.link(new_accessor_id, *file_id); } } - - self.accessors_map.map.insert(new_accessor_id, files); } } @@ -380,6 +419,7 @@ impl Files { } pub fn metrics(&self) -> Metrics { + #[allow(deprecated)] Metrics { file_count: (self.files.len() + self.files_stable.len()) as u64, blob_count: self.blobs.len(), @@ -387,19 +427,19 @@ impl Files { } fn insert_completed_file(&mut self, file_id: FileId, completed_file: PendingFile) { - self.link_accessor_to_file(completed_file.owner, file_id); + self.accessors_map_stable.link(completed_file.owner, file_id); for accessor in completed_file.accessors.iter().copied() { - self.link_accessor_to_file(accessor, file_id); + self.accessors_map_stable.link(accessor, file_id); } - self.incr_reference_count(completed_file.hash); + self.reference_counts_stable.incr(completed_file.hash); self.add_blob_if_not_exists(completed_file.hash, completed_file.bytes); if let Some(expiry) = completed_file.expiry { self.expiration_queue.entry(expiry).or_default().push_back(file_id); } - self.set_file( + self.files_stable.set( file_id, File { owner: completed_file.owner, @@ -411,26 +451,22 @@ impl Files { ); } + #[allow(deprecated)] fn get_file(&self, file_id: &FileId) -> Option { self.files.get(file_id).cloned().or_else(|| self.files_stable.get(file_id)) } + #[allow(deprecated)] fn contains_file(&self, file_id: &FileId) -> bool { self.files.contains_key(file_id) || self.files_stable.contains_key(file_id) } - fn set_file(&mut self, file_id: FileId, file: File) { - if let Occupied(mut e) = self.files.entry(file_id) { - e.insert(file); - } else { - self.files_stable.set(file_id, file); - } - } - fn remove_file(&mut self, file_id: FileId) -> Option { - let file = self.files.remove(&file_id).or_else(|| self.files_stable.remove(&file_id))?; + self.migrate_file(file_id); - if self.decr_reference_count(file.hash) == 0 { + let file = self.files_stable.remove(&file_id)?; + + if self.reference_counts_stable.decr(file.hash) == 0 { self.remove_blob(&file.hash); } @@ -469,31 +505,8 @@ impl Files { Some((file.clone(), size)) } - fn incr_reference_count(&mut self, hash: Hash) -> u32 { - if self.reference_counts.counts.contains_key(&hash) { - self.reference_counts.incr(hash) - } else { - self.reference_counts_stable.incr(hash) - } - } - - fn decr_reference_count(&mut self, hash: Hash) -> u32 { - if self.reference_counts.counts.contains_key(&hash) { - self.reference_counts.decr(hash) - } else { - self.reference_counts_stable.decr(hash) - } - } - - fn link_accessor_to_file(&mut self, accessor_id: AccessorId, file_id: FileId) { - if self.accessors_map.map.contains_key(&accessor_id) { - self.accessors_map.link(accessor_id, file_id); - } else { - self.accessors_map_stable.link(accessor_id, file_id); - } - } - fn unlink_accessor_from_file(&mut self, accessor_id: AccessorId, file_id: FileId) { + #[allow(deprecated)] self.accessors_map.unlink(accessor_id, &file_id); self.accessors_map_stable.unlink(accessor_id, file_id); } @@ -524,41 +537,12 @@ struct ReferenceCounts { counts: BTreeMap, } -impl ReferenceCounts { - pub fn incr(&mut self, hash: Hash) -> u32 { - *self - .counts - .entry(hash) - .and_modify(|c| { - *c += 1; - }) - .or_insert(1) - } - - pub fn decr(&mut self, hash: Hash) -> u32 { - if let Occupied(mut e) = self.counts.entry(hash) { - let count = e.get_mut(); - if *count > 1 { - *count -= 1; - return *count; - } else { - e.remove(); - } - } - 0 - } -} - #[derive(Serialize, Deserialize, Default)] struct AccessorsMap { map: BTreeMap>, } impl AccessorsMap { - pub fn link(&mut self, accessor_id: AccessorId, file_id: FileId) { - self.map.entry(accessor_id).or_default().insert(file_id); - } - pub fn unlink(&mut self, accessor_id: AccessorId, file_id: &FileId) { if let Occupied(mut e) = self.map.entry(accessor_id) { let entry = e.get_mut(); @@ -568,10 +552,6 @@ impl AccessorsMap { } } } - - pub fn remove(&mut self, accessor_id: &AccessorId) -> Option> { - self.map.remove(accessor_id) - } } #[derive(Serialize, Deserialize)] From 6b178781b809f40df5359a86643ec835a3133bfd Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Sun, 29 Dec 2024 23:58:23 +0000 Subject: [PATCH 05/10] Add tests --- .../stable_memory_map/src/keys/files.rs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/backend/libraries/stable_memory_map/src/keys/files.rs b/backend/libraries/stable_memory_map/src/keys/files.rs index 1f41a775e0..c6b1451c18 100644 --- a/backend/libraries/stable_memory_map/src/keys/files.rs +++ b/backend/libraries/stable_memory_map/src/keys/files.rs @@ -112,3 +112,76 @@ impl FilesPerAccessorKey { FileId::from_be_bytes(self.0[start..].try_into().unwrap()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{BaseKey, Key}; + use rand::{thread_rng, Rng}; + + #[test] + fn file_id_to_file_key_e2e() { + for _ in 0..100 { + let file_id: u128 = thread_rng().gen(); + let prefix = FileIdToFileKeyPrefix::new(); + let key = BaseKey::from(prefix.create_key(&file_id)); + let file_key = FileIdToFileKey::try_from(key.clone()).unwrap(); + + assert_eq!(*file_key.0.first().unwrap(), KeyType::FileIdToFile as u8); + assert_eq!(file_key.0.len(), 17); + assert!(file_key.matches_prefix(&prefix)); + assert_eq!(file_key.file_id(), file_id); + + let serialized = msgpack::serialize_then_unwrap(&file_key); + assert_eq!(serialized.len(), file_key.0.len() + 2); + let deserialized: FileIdToFileKey = msgpack::deserialize_then_unwrap(&serialized); + assert_eq!(deserialized, file_key); + assert_eq!(deserialized.0, key.0); + } + } + + #[test] + fn file_reference_count_key_e2e() { + for _ in 0..100 { + let hash: [u8; 32] = thread_rng().gen(); + let prefix = FileReferenceCountKeyPrefix::new(); + let key = BaseKey::from(prefix.create_key(&hash)); + let reference_count_key = FileReferenceCountKey::try_from(key.clone()).unwrap(); + + assert_eq!(*reference_count_key.0.first().unwrap(), KeyType::FileReferenceCount as u8); + assert_eq!(reference_count_key.0.len(), 33); + assert!(reference_count_key.matches_prefix(&prefix)); + assert_eq!(reference_count_key.hash(), hash); + + let serialized = msgpack::serialize_then_unwrap(&reference_count_key); + assert_eq!(serialized.len(), reference_count_key.0.len() + 2); + let deserialized: FileReferenceCountKey = msgpack::deserialize_then_unwrap(&serialized); + assert_eq!(deserialized, reference_count_key); + assert_eq!(deserialized.0, key.0); + } + } + + #[test] + fn files_per_accessor_key_e2e() { + for _ in 0..100 { + let accessor_id_bytes: [u8; 10] = thread_rng().gen(); + let accessor_id = AccessorId::from_slice(&accessor_id_bytes); + let file_id: u128 = thread_rng().gen(); + let prefix = FilesPerAccessorKeyPrefix::new(); + let key = BaseKey::from(prefix.create_key(&(accessor_id, file_id))); + let member_key = FilesPerAccessorKey::try_from(key.clone()).unwrap(); + + assert_eq!(*member_key.0.first().unwrap(), KeyType::FilesPerAccessor as u8); + assert_eq!(member_key.0.len(), 27); + assert!(member_key.matches_prefix(&prefix)); + assert_eq!(member_key.accessor_id(), accessor_id); + assert_eq!(member_key.file_id(), file_id); + + let serialized = msgpack::serialize_then_unwrap(&member_key); + assert_eq!(serialized.len(), member_key.0.len() + 2); + let deserialized: FilesPerAccessorKey = msgpack::deserialize_then_unwrap(&serialized); + assert_eq!(deserialized, member_key); + assert_eq!(deserialized.0, key.0); + } + } +} From d1bea0640d4ad8ddba39b31b05e92d0f2730c5c1 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Mon, 30 Dec 2024 13:52:36 +0000 Subject: [PATCH 06/10] Expose in metrics --- .../src/jobs/migrate_data_to_stable_map.rs | 21 ++++++++++++------- .../canisters/storage_bucket/impl/src/lib.rs | 16 +++++++++++++- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs b/backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs index 95b31a428d..a8f2d257c3 100644 --- a/backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs +++ b/backend/canisters/storage_bucket/impl/src/jobs/migrate_data_to_stable_map.rs @@ -19,13 +19,20 @@ fn run() { info!("'migrate_data_to_stable_map' job running"); mutate_state(|state| { - if state.data.files.migrate_files() - && state.data.files.migrate_reference_counts() - && state.data.files.migrate_accessors() - { - if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - info!("'migrate_data_to_stable_map' job completed"); + if state.data.files.migrate_files() { + state.data.files_migrated = true; + + if state.data.files.migrate_reference_counts() { + state.data.file_reference_counts_migrated = true; + + if state.data.files.migrate_accessors() { + state.data.files_per_accessor_migrated = true; + + if let Some(timer_id) = TIMER_ID.take() { + ic_cdk_timers::clear_timer(timer_id); + info!("'migrate_data_to_stable_map' job completed"); + } + } } } }) diff --git a/backend/canisters/storage_bucket/impl/src/lib.rs b/backend/canisters/storage_bucket/impl/src/lib.rs index 7b2bc445d4..83ad2b7084 100644 --- a/backend/canisters/storage_bucket/impl/src/lib.rs +++ b/backend/canisters/storage_bucket/impl/src/lib.rs @@ -69,6 +69,9 @@ impl RuntimeState { index_sync_queue_length: self.data.index_event_sync_queue.len() as u32, freezing_limit: self.data.freezing_limit.value.unwrap_or_default(), stable_memory_sizes: memory::memory_sizes(), + files_migrated: self.data.files_migrated, + file_reference_counts_migrated: self.data.file_reference_counts_migrated, + files_per_accessor_migrated: self.data.files_per_accessor_migrated, } } } @@ -83,11 +86,16 @@ struct Data { freezing_limit: Timestamped>, rng_seed: [u8; 32], test_mode: bool, + #[serde(default)] + files_migrated: bool, + #[serde(default)] + file_reference_counts_migrated: bool, + #[serde(default)] + files_per_accessor_migrated: bool, } impl Data { pub fn new(storage_index_canister_id: CanisterId, now: TimestampMillis, test_mode: bool) -> Data { - #[allow(deprecated)] Data { storage_index_canister_id, users: Users::default(), @@ -97,6 +105,9 @@ impl Data { freezing_limit: Timestamped::default(), rng_seed: [0; 32], test_mode, + files_migrated: true, + file_reference_counts_migrated: true, + files_per_accessor_migrated: true, } } @@ -130,6 +141,9 @@ pub struct Metrics { pub index_sync_queue_length: u32, pub freezing_limit: Cycles, pub stable_memory_sizes: BTreeMap, + pub files_migrated: bool, + pub file_reference_counts_migrated: bool, + pub files_per_accessor_migrated: bool, } pub fn calc_chunk_count(chunk_size: u32, total_size: u64) -> u32 { From f065473580d64a200a807c37df2850d17fa75d34 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Mon, 30 Dec 2024 14:26:34 +0000 Subject: [PATCH 07/10] Fix return value --- backend/canisters/storage_bucket/impl/src/model/files.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/canisters/storage_bucket/impl/src/model/files.rs b/backend/canisters/storage_bucket/impl/src/model/files.rs index 86fbcaad05..ca6bdb4202 100644 --- a/backend/canisters/storage_bucket/impl/src/model/files.rs +++ b/backend/canisters/storage_bucket/impl/src/model/files.rs @@ -82,7 +82,7 @@ impl Files { } } info!(keys, "Migrated files to stable map"); - self.reference_counts.counts.is_empty() + self.files.is_empty() } #[allow(deprecated)] From 0a4ff7c2dc2b292eff4dc31687ba67cd7da92909 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Mon, 30 Dec 2024 14:31:01 +0000 Subject: [PATCH 08/10] Fix file count --- backend/canisters/storage_bucket/impl/src/model/files_map.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/canisters/storage_bucket/impl/src/model/files_map.rs b/backend/canisters/storage_bucket/impl/src/model/files_map.rs index 1c4a6beadf..6051ee470c 100644 --- a/backend/canisters/storage_bucket/impl/src/model/files_map.rs +++ b/backend/canisters/storage_bucket/impl/src/model/files_map.rs @@ -20,7 +20,7 @@ impl FilesMap { pub fn set(&mut self, file_id: FileId, file: File) { if with_map_mut(|m| m.insert(self.prefix.create_key(&file_id), file_to_bytes(file))).is_none() { - self.len = self.len.saturating_sub(1); + self.len = self.len.saturating_add(1); } } From 5b987cd06c604f8eeb9ca3c83f3d717029e67677 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Mon, 30 Dec 2024 16:23:51 +0000 Subject: [PATCH 09/10] Fix proptests by resetting stable memory on each run --- .../storage_bucket/impl/src/model/files.rs | 10 +++++++++ .../impl/src/model/files/proptests.rs | 22 +++++++++---------- .../impl/src/model/stable_blob_storage.rs | 8 +++++++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/backend/canisters/storage_bucket/impl/src/model/files.rs b/backend/canisters/storage_bucket/impl/src/model/files.rs index ca6bdb4202..bae6a74464 100644 --- a/backend/canisters/storage_bucket/impl/src/model/files.rs +++ b/backend/canisters/storage_bucket/impl/src/model/files.rs @@ -1,3 +1,4 @@ +use crate::memory::Memory; use crate::model::files_map::FilesMap; use crate::model::files_per_accessor_map::FilesPerAccessorStableMap; use crate::model::reference_counts::ReferenceCountsStableMap; @@ -511,10 +512,19 @@ impl Files { self.accessors_map_stable.unlink(accessor_id, file_id); } + #[cfg(test)] + pub fn new_with_blobs_memory(memory: Memory) -> Files { + Files { + blobs: StableBlobStorage::init_with_memory(memory), + ..Default::default() + } + } + #[cfg(test)] fn check_invariants(&self) { let files = self.files_stable.get_all(); + assert!(!files.is_empty()); assert_eq!(files.len(), self.files_stable.len()); let mut files_per_accessor: BTreeMap> = BTreeMap::new(); diff --git a/backend/canisters/storage_bucket/impl/src/model/files/proptests.rs b/backend/canisters/storage_bucket/impl/src/model/files/proptests.rs index ae94afb269..8082fe6248 100644 --- a/backend/canisters/storage_bucket/impl/src/model/files/proptests.rs +++ b/backend/canisters/storage_bucket/impl/src/model/files/proptests.rs @@ -6,7 +6,8 @@ use proptest::collection::vec as pvec; use proptest::prelude::*; use proptest::prop_oneof; use test_strategy::proptest; -use types::{AccessorId, CanisterId, FileId, Hash, TimestampMillis}; +use types::{AccessorId, CanisterId, FileId, TimestampMillis}; +use utils::hasher::hash_bytes; #[derive(Debug, Clone)] enum Operation { @@ -44,11 +45,11 @@ fn operation_strategy() -> impl Strategy { } #[proptest(cases = 10)] -fn comprehensive(#[strategy(pvec(operation_strategy(), 1_000..5_000))] ops: Vec) { +fn comprehensive(#[strategy(pvec(operation_strategy(), 100..1_000))] ops: Vec) { let memory = MemoryManager::init(DefaultMemoryImpl::default()); - stable_memory_map::init(memory.get(MemoryId::new(1))); + stable_memory_map::init(memory.get(MemoryId::new(2))); - let mut files = Files::default(); + let mut files = Files::new_with_blobs_memory(memory.get(MemoryId::new(1))); let mut file_ids = Vec::new(); @@ -68,16 +69,17 @@ fn comprehensive(#[strategy(pvec(operation_strategy(), 1_000..5_000))] ops: Vec< fn execute_operation(files: &mut Files, op: Operation, timestamp: TimestampMillis, file_ids: &mut [(Principal, FileId)]) { match op { Operation::Add { owner, file_id } => { + let bytes = file_bytes(file_id); files.put_chunk(PutChunkArgs { owner, file_id, - hash: hash(file_id), + hash: hash_bytes(&bytes), mime_type: "".to_string(), accessors: vec![owner], chunk_index: 0, chunk_size: 1, - total_size: 1, - bytes: vec![1], + total_size: bytes.len() as u64, + bytes, expiry: None, now: timestamp, }); @@ -114,10 +116,8 @@ fn execute_operation(files: &mut Files, op: Operation, timestamp: TimestampMilli }; } -fn hash(file_id: FileId) -> Hash { - let mut bytes = [0u8; 32]; - bytes[0] = (file_id % 100) as u8; - bytes +fn file_bytes(file_id: FileId) -> Vec { + vec![file_id as u8] } fn principal(index: usize) -> Principal { diff --git a/backend/canisters/storage_bucket/impl/src/model/stable_blob_storage.rs b/backend/canisters/storage_bucket/impl/src/model/stable_blob_storage.rs index 496437eea3..f26c2788bc 100644 --- a/backend/canisters/storage_bucket/impl/src/model/stable_blob_storage.rs +++ b/backend/canisters/storage_bucket/impl/src/model/stable_blob_storage.rs @@ -78,6 +78,14 @@ impl StableBlobStorage { Some([first].into_iter().chain(iter)) } + + #[cfg(test)] + pub fn init_with_memory(memory: Memory) -> StableBlobStorage { + StableBlobStorage { + blobs: StableBTreeMap::init(memory), + count: 0, + } + } } fn init_blobs() -> StableBTreeMap { From 22aea1db480921f6e76b04371607ead5bb08641d Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Thu, 2 Jan 2025 08:11:07 +0000 Subject: [PATCH 10/10] Tiny bit better --- backend/canisters/storage_bucket/impl/src/model/files.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/backend/canisters/storage_bucket/impl/src/model/files.rs b/backend/canisters/storage_bucket/impl/src/model/files.rs index bae6a74464..56433029ff 100644 --- a/backend/canisters/storage_bucket/impl/src/model/files.rs +++ b/backend/canisters/storage_bucket/impl/src/model/files.rs @@ -483,10 +483,7 @@ impl Files { fn add_blob_if_not_exists(&mut self, hash: Hash, bytes: Vec) { if !self.blobs.exists(&hash) { - self.bytes_used = self - .bytes_used - .checked_add(bytes.len() as u64) - .expect("'bytes_used' overflowed"); + self.bytes_used = self.bytes_used.saturating_add(bytes.len() as u64); self.blobs.insert(hash, bytes); } @@ -495,7 +492,7 @@ impl Files { fn remove_blob(&mut self, hash: &Hash) { if let Some(size) = self.blobs.data_size(hash) { self.blobs.remove(hash); - self.bytes_used = self.bytes_used.checked_sub(size).expect("'bytes used' underflowed"); + self.bytes_used = self.bytes_used.saturating_sub(size); } }