Skip to content

Commit

Permalink
rename asset_content to last_chunk, make it always allowed
Browse files Browse the repository at this point in the history
  • Loading branch information
sesi200 committed Oct 21, 2024
1 parent 4d1d793 commit aa850aa
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 165 deletions.
7 changes: 3 additions & 4 deletions docs/design/asset-canister-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,15 +483,14 @@ type SetAssetContentArguments = record {
key: Key;
content_encoding: text;
chunk_ids: vec ChunkId;
asset_content: opt blob;
last_chunk: opt blob;
sha256: opt blob;
};
```

This operation adds or changes a single content encoding for an asset. It also updates the modification time of the content encoding.
The content of the encoding can be specified with either `chunk_ids` with previously uploaded data,
or if `chunk_ids` contains no ids, then `asset_content` may contain the encoding's content directly.
If both `chunk_ids` and `asset_content` contains data, then `asset_content` is ignored.
The content of the encoding can be specified with `chunk_ids` and `last_chunk`.
If `last_chunk` is not `null`, then its content is used as the last chunk of the encoding.

If `sha256` is not passed, the asset canister will compute the hash of the content.

Expand Down
63 changes: 22 additions & 41 deletions src/canisters/frontend/ic-asset/src/batch_upload/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::canister_api::types::batch_upload::common::{
UnsetAssetContentArguments,
};
use crate::canister_api::types::batch_upload::v1::{BatchOperationKind, CommitBatchArguments};
use crate::error::{AssembleCommitBatchArgumentError, SetEncodingError};
use candid::Nat;
use std::collections::HashMap;

Expand All @@ -20,7 +21,7 @@ pub(crate) async fn assemble_batch_operations(
canister_assets: HashMap<String, AssetDetails>,
asset_deletion_reason: AssetDeletionReason,
canister_asset_properties: HashMap<String, AssetProperties>,
) -> Result<Vec<BatchOperationKind>, String> {
) -> Result<Vec<BatchOperationKind>, AssembleCommitBatchArgumentError> {
let mut canister_assets = canister_assets;

let mut operations = vec![];
Expand All @@ -33,7 +34,9 @@ pub(crate) async fn assemble_batch_operations(
);
create_new_assets(&mut operations, project_assets, &canister_assets);
unset_obsolete_encodings(&mut operations, project_assets, &canister_assets);
set_encodings(&mut operations, chunk_uploader, project_assets).await?;
set_encodings(&mut operations, chunk_uploader, project_assets)
.await
.map_err(AssembleCommitBatchArgumentError::SetEncodingFailed)?;
update_properties(&mut operations, project_assets, &canister_asset_properties);

Ok(operations)
Expand All @@ -46,7 +49,7 @@ pub(crate) async fn assemble_commit_batch_arguments(
asset_deletion_reason: AssetDeletionReason,
canister_asset_properties: HashMap<String, AssetProperties>,
batch_id: Nat,
) -> Result<CommitBatchArguments, String> {
) -> Result<CommitBatchArguments, AssembleCommitBatchArgumentError> {
let operations = assemble_batch_operations(
Some(chunk_uploader),
&project_assets,
Expand Down Expand Up @@ -163,51 +166,29 @@ pub(crate) async fn set_encodings(
operations: &mut Vec<BatchOperationKind>,
chunk_uploader: Option<&ChunkUploader<'_>>,
project_assets: &HashMap<String, ProjectAsset>,
) -> Result<(), String> {
) -> Result<(), SetEncodingError> {
for (key, project_asset) in project_assets {
for (content_encoding, v) in &project_asset.encodings {
if v.already_in_place {
continue;
}
if let Some(uploader) = chunk_uploader {
match uploader
.uploader_ids_to_canister_chunk_ids(&v.uploader_chunk_ids)
.await
{
super::plumbing::UploaderIdMapping::Error(err) => return Err(err),
super::plumbing::UploaderIdMapping::CanisterChunkIds(chunk_ids) => operations
.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids,
asset_content: None,
sha256: Some(v.sha256.clone()),
},
)),
super::plumbing::UploaderIdMapping::IncludeChunksDirectly(asset_content) => {
operations.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids: vec![],
asset_content: Some(asset_content.concat()),
sha256: Some(v.sha256.clone()),
},
))
}
let (chunk_ids, last_chunk) = match chunk_uploader {
Some(uploader) => {
uploader
.uploader_ids_to_canister_chunk_ids(&v.uploader_chunk_ids)
.await?
}
} else {
operations.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids: vec![],
asset_content: None,
sha256: Some(v.sha256.clone()),
},
));
None => (vec![], None),
};
operations.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids,
last_chunk,
sha256: Some(v.sha256.clone()),
},
));
}
}
Ok(())
Expand Down
96 changes: 24 additions & 72 deletions src/canisters/frontend/ic-asset/src/batch_upload/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::error::CreateChunkError;
use crate::error::CreateEncodingError;
use crate::error::CreateEncodingError::EncodeContentFailed;
use crate::error::CreateProjectAssetError;
use crate::error::SetEncodingError;
use candid::Nat;
use futures::future::try_join_all;
use futures::TryFutureExt;
Expand Down Expand Up @@ -49,22 +50,6 @@ pub(crate) struct ProjectAsset {
pub(crate) encodings: HashMap<String, ProjectAssetEncoding>,
}

enum UploaderState {
Uploading,
/// Uploader has uploaded chunks - commit will reference chunk ids to specify asset content
FinalizedWithUploads,
/// Uploader has not uploaded chunks - commit will contain asset content directly
FinalizedWithoutUploads,
}

pub(crate) enum UploaderIdMapping {
Error(String),
/// Chunks are uploaded to the canister with these ids
CanisterChunkIds(Vec<Nat>),
/// Chunks are not uploaded and should be included in the SetAssetContent operations directly
IncludeChunksDirectly(Vec<Vec<u8>>),
}

type IdMapping = BTreeMap<usize, Nat>;
type UploadQueue = Vec<(usize, Vec<u8>)>;
pub(crate) struct ChunkUploader<'agent> {
Expand All @@ -76,7 +61,6 @@ pub(crate) struct ChunkUploader<'agent> {
// maps uploader_chunk_id to canister_chunk_id
id_mapping: Arc<Mutex<IdMapping>>,
upload_queue: Arc<Mutex<UploadQueue>>,
uploader_state: Arc<Mutex<UploaderState>>,
}

impl<'agent> ChunkUploader<'agent> {
Expand All @@ -89,7 +73,6 @@ impl<'agent> ChunkUploader<'agent> {
bytes: Arc::new(AtomicUsize::new(0)),
id_mapping: Arc::new(Mutex::new(BTreeMap::new())),
upload_queue: Arc::new(Mutex::new(vec![])),
uploader_state: Arc::new(Mutex::new(UploaderState::Uploading)),
}
}

Expand Down Expand Up @@ -127,24 +110,8 @@ impl<'agent> ChunkUploader<'agent> {
&self,
semaphores: &Semaphores,
) -> Result<(), CreateChunkError> {
let queue = self.upload_queue.lock().await;
let mut uploader_state = self.uploader_state.lock().await;

// Can skip upload if every chunk submitted for uploading is still in the queue.
// Additionally, chunks in the queue are small enough that there is plenty of space in the commit message to include all of them.
let skip_upload = queue.len() == self.chunks.fetch_add(0, Ordering::SeqCst)
&& queue.iter().map(|(_, chunk)| chunk.len()).sum::<usize>() < MAX_CHUNK_SIZE / 2;
drop(queue);
// Potential for further improvement: unconditional upload_chunks(MAX_CHUNK_SIZE / 2, usize::MAX, semaphores)
// Then allow mix of uploaded chunks and asset content that is part of the commit args.

if skip_upload {
*uploader_state = UploaderState::FinalizedWithoutUploads;
} else {
self.upload_chunks(0, 0, semaphores).await?;
*uploader_state = UploaderState::FinalizedWithUploads;
}
Ok(())
self.upload_chunks(MAX_CHUNK_SIZE / 2, usize::MAX, semaphores)
.await
}

pub(crate) fn bytes(&self) -> usize {
Expand All @@ -154,48 +121,33 @@ impl<'agent> ChunkUploader<'agent> {
self.chunks.load(Ordering::SeqCst)
}

/// Call only after `finalize_upload` has completed
/// Call only after `finalize_upload` has completed.
/// Returns `(chunk_ids, Option<last_chunk>)`
pub(crate) async fn uploader_ids_to_canister_chunk_ids(
&self,
uploader_ids: &[usize],
) -> UploaderIdMapping {
let uploader_state = self.uploader_state.lock().await;
match *uploader_state {
UploaderState::Uploading => UploaderIdMapping::Error(
"Bug: Tried to map uploader ids to canister ids before finalizing".to_string(),
),
UploaderState::FinalizedWithUploads => {
let mapping = self.id_mapping.lock().await;
let ids = uploader_ids
.iter()
.map(|id| {
mapping
.get(id)
.expect("Chunk uploader did not upload all chunks but is not aware of it. This is a bug.")
.clone()
})
.collect();
UploaderIdMapping::CanisterChunkIds(ids)
}
UploaderState::FinalizedWithoutUploads => {
let queue = self.upload_queue.lock().await;
match uploader_ids
) -> Result<(Vec<Nat>, Option<Vec<u8>>), SetEncodingError> {
let mut chunk_ids = vec![];
let mut last_chunk: Option<Vec<u8>> = None;
let mapping = self.id_mapping.lock().await;
let queue = self.upload_queue.lock().await;
for uploader_id in uploader_ids {
if let Some(item) = mapping.get(uploader_id) {
chunk_ids.push(item.clone());
} else if let Some(last_chunk_data) =
queue
.iter()
.map(|uploader_id| {
queue.iter().find_map(|(id, content)| {
if id == uploader_id {
Some(content.clone())
} else {
None
}
}).ok_or_else(|| format!("Chunk uploader does not have a chunk with uploader id {uploader_id}. This is a bug."))
})
.collect() {
Ok(asset_content) => UploaderIdMapping::IncludeChunksDirectly(asset_content),
Err(err) => UploaderIdMapping::Error(err)
}
.find_map(|(id, data)| if id == uploader_id { Some(data) } else { None })
{
match last_chunk.as_mut() {
Some(existing_data) => existing_data.extend(last_chunk_data.iter()),
None => last_chunk = Some(last_chunk_data.clone()),
}
} else {
return Err(SetEncodingError::UnknownUploaderChunkId(*uploader_id));
}
}
Ok((chunk_ids, last_chunk))
}

async fn add_to_upload_queue(&self, uploader_chunk_id: usize, contents: &[u8]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ pub struct SetAssetContentArguments {
pub key: String,
/// The content encoding for which this content applies
pub content_encoding: String,
/// The chunks to assign to this content
/// The chunks to assign to this content encoding
pub chunk_ids: Vec<Nat>,
/// If no chunks are assigned to this asset, then `asset_content` is the asset content
pub asset_content: Option<Vec<u8>>,
/// Appends this chunk to the data supplied in `chunk_ids`
pub last_chunk: Option<Vec<u8>>,
/// The sha256 of the entire content
pub sha256: Option<Vec<u8>>,
}
Expand Down
10 changes: 6 additions & 4 deletions src/canisters/frontend/ic-asset/src/error/compute_evidence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ use crate::error::hash_content::HashContentError;
use ic_agent::AgentError;
use thiserror::Error;

use super::AssembleCommitBatchArgumentError;

/// Errors related to computing evidence for a proposed update.
#[derive(Error, Debug)]
pub enum ComputeEvidenceError {
/// Failed when assembling commit_batch argument.
#[error(transparent)]
AssembleCommitBatchArgumentFailed(#[from] AssembleCommitBatchArgumentError),

/// Failed when inspecting assets to be updated.
#[error(transparent)]
ProcessProjectAsset(#[from] CreateProjectAssetError),
Expand All @@ -20,10 +26,6 @@ pub enum ComputeEvidenceError {
#[error(transparent)]
GetAssetProperties(#[from] GetAssetPropertiesError),

/// Failed when assembling commit_batch argument.
#[error("Failed to assemble commit_batch argument: {0}")]
AssembleCommitBatchArgumentError(String),

/// Failed when computing hashes of asset content.
#[error(transparent)]
HashContent(#[from] HashContentError),
Expand Down
4 changes: 4 additions & 0 deletions src/canisters/frontend/ic-asset/src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Error types
mod assemble_commit_batch_argument;
mod compatibility;
mod compute_evidence;
mod create_chunk;
Expand All @@ -13,10 +14,12 @@ mod hash_content;
mod load_config;
mod load_rule;
mod prepare_sync_for_proposal;
mod set_encoding;
mod sync;
mod upload;
mod upload_content;

pub use assemble_commit_batch_argument::AssembleCommitBatchArgumentError;
pub use compatibility::CompatibilityError;
pub use compute_evidence::ComputeEvidenceError;
pub use create_chunk::CreateChunkError;
Expand All @@ -30,6 +33,7 @@ pub use hash_content::HashContentError;
pub use load_config::AssetLoadConfigError;
pub use load_rule::LoadRuleError;
pub use prepare_sync_for_proposal::PrepareSyncForProposalError;
pub use set_encoding::SetEncodingError;
pub use sync::SyncError;
pub use upload::UploadError;
pub use upload_content::UploadContentError;
3 changes: 2 additions & 1 deletion src/canisters/frontend/ic-asset/src/error/upload.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::AssembleCommitBatchArgumentError;
use crate::error::compatibility::CompatibilityError;
use crate::error::create_project_asset::CreateProjectAssetError;
use ic_agent::AgentError;
Expand All @@ -20,7 +21,7 @@ pub enum UploadError {

/// Failed when assembling commit_batch argument.
#[error("Failed to assemble commit_batch argument: {0}")]
AssembleCommitBatchArgumentError(String),
AssembleCommitBatchArgumentFailed(#[from] AssembleCommitBatchArgumentError),

/// Failed when creating project assets.
#[error("Failed to create project asset: {0}")]
Expand Down
10 changes: 6 additions & 4 deletions src/canisters/frontend/ic-asset/src/error/upload_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ use crate::error::get_asset_properties::GetAssetPropertiesError;
use ic_agent::AgentError;
use thiserror::Error;

use super::AssembleCommitBatchArgumentError;

/// Errors related to uploading content to the asset canister.
#[derive(Error, Debug)]
pub enum UploadContentError {
/// Failed when assembling commit_batch argument.
#[error("Failed to assemble commit_batch argument: {0}")]
AssembleCommitBatchArgumentFailed(AssembleCommitBatchArgumentError),

/// Failed when calling create_batch.
#[error("Failed to create batch: {0}")]
CreateBatchFailed(AgentError),
Expand All @@ -15,10 +21,6 @@ pub enum UploadContentError {
#[error("Failed to create project asset: {0}")]
CreateProjectAssetError(#[from] CreateProjectAssetError),

/// Failed when assembling commit_batch argument.
#[error("Failed to assemble commit_batch argument: {0}")]
AssembleCommitBatchArgumentError(String),

/// Failed when building list of assets to synchronize.
#[error("Failed to gather asset descriptors: {0}")]
GatherAssetDescriptorsFailed(#[from] GatherAssetDescriptorsError),
Expand Down
2 changes: 1 addition & 1 deletion src/canisters/frontend/ic-asset/src/evidence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn compute_evidence(
canister_asset_properties,
)
.await
.map_err(ComputeEvidenceError::AssembleCommitBatchArgumentError)?;
.map_err(ComputeEvidenceError::AssembleCommitBatchArgumentFailed)?;
operations.sort();

let mut sha = Sha256::new();
Expand Down
2 changes: 1 addition & 1 deletion src/canisters/frontend/ic-asset/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub async fn upload_content_and_assemble_sync_operations(
batch_id,
)
.await
.map_err(UploadContentError::AssembleCommitBatchArgumentError)?;
.map_err(UploadContentError::AssembleCommitBatchArgumentFailed)?;

// -v
debug!(
Expand Down
Loading

0 comments on commit aa850aa

Please sign in to comment.