Skip to content

Commit

Permalink
feat(asset-canister): upload and commit in same message
Browse files Browse the repository at this point in the history
  • Loading branch information
sesi200 committed Oct 18, 2024
1 parent 064e4ff commit b9edddf
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 42 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ Valid settings are:
### feat: batch upload assets

The frontend canister sync now tries to batch multiple small content chunks into a single call using the `create_chunks` method added earlier.
For small amounts of uploaded data, the asset sync can now skip chunk creation entirely.
This should lead to significantly faster upload times for frontends with many small files.

## Dependencies

### Frontend canister

`SetAssetContentArguments` has a new field `asset_content: opt blob`, which can be used instead of `chunk_ids` so that small assets can be uploaded as part of `commit_batch`,
skipping the need to await a separate `create_chunk` call.

Bumped `api_version` to `2` for the previous addition of `create_chunks` since the improved file sync relies on it.

- Module hash: 9e4485d4358dd910aebcc025843547d05604cf28c6dc7c2cc2f8c76d083112e8
- Module hash: 17d3312f8513260fb3350095c34d7a141db47b488a103d2d7a7acd7c92a72252
- https://github.com/dfinity/sdk/pull/3947

# 0.24.1
Expand Down
63 changes: 44 additions & 19 deletions src/canisters/frontend/ic-asset/src/batch_upload/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) async fn assemble_batch_operations(
canister_assets: HashMap<String, AssetDetails>,
asset_deletion_reason: AssetDeletionReason,
canister_asset_properties: HashMap<String, AssetProperties>,
) -> Vec<BatchOperationKind> {
) -> Result<Vec<BatchOperationKind>, String> {
let mut canister_assets = canister_assets;

let mut operations = vec![];
Expand All @@ -33,10 +33,10 @@ pub(crate) async fn assemble_batch_operations(
);
create_new_assets(&mut operations, project_assets, &canister_assets);
unset_obsolete_encodings(&mut operations, project_assets, &canister_assets);
set_encodings(&mut operations, chunk_uploader, project_assets).await;
set_encodings(&mut operations, chunk_uploader, project_assets).await?;
update_properties(&mut operations, project_assets, &canister_asset_properties);

operations
Ok(operations)
}

pub(crate) async fn assemble_commit_batch_arguments(
Expand All @@ -46,19 +46,19 @@ pub(crate) async fn assemble_commit_batch_arguments(
asset_deletion_reason: AssetDeletionReason,
canister_asset_properties: HashMap<String, AssetProperties>,
batch_id: Nat,
) -> CommitBatchArguments {
) -> Result<CommitBatchArguments, String> {
let operations = assemble_batch_operations(
Some(chunk_uploader),
&project_assets,
canister_assets,
asset_deletion_reason,
canister_asset_properties,
)
.await;
CommitBatchArguments {
.await?;
Ok(CommitBatchArguments {
operations,
batch_id,
}
})
}

pub(crate) enum AssetDeletionReason {
Expand Down Expand Up @@ -163,29 +163,54 @@ pub(crate) async fn set_encodings(
operations: &mut Vec<BatchOperationKind>,
chunk_uploader: Option<&ChunkUploader<'_>>,
project_assets: &HashMap<String, ProjectAsset>,
) {
) -> Result<(), String> {
for (key, project_asset) in project_assets {
for (content_encoding, v) in &project_asset.encodings {
if v.already_in_place {
continue;
}
let chunk_ids = if let Some(uploader) = chunk_uploader {
uploader
if let Some(uploader) = chunk_uploader {
match uploader
.uploader_ids_to_canister_chunk_ids(&v.uploader_chunk_ids)
.await
{
super::plumbing::UploaderIdMapping::Error(err) => return Err(err),
super::plumbing::UploaderIdMapping::CanisterChunkIds(chunk_ids) => operations
.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids,
asset_content: None,
sha256: Some(v.sha256.clone()),
},
)),
super::plumbing::UploaderIdMapping::IncludeChunksDirectly(asset_content) => {
operations.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids: vec![],
asset_content: Some(asset_content.concat()),
sha256: Some(v.sha256.clone()),
},
))
}
}
} else {
vec![]
operations.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids: vec![],
asset_content: None,
sha256: Some(v.sha256.clone()),
},
));
};
operations.push(BatchOperationKind::SetAssetContent(
SetAssetContentArguments {
key: key.clone(),
content_encoding: content_encoding.clone(),
chunk_ids,
sha256: Some(v.sha256.clone()),
},
));
}
}
Ok(())
}

pub(crate) fn update_properties(
Expand Down
87 changes: 74 additions & 13 deletions src/canisters/frontend/ic-asset/src/batch_upload/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ pub(crate) struct ProjectAsset {
pub(crate) encodings: HashMap<String, ProjectAssetEncoding>,
}

/// Lifecycle state of a `ChunkUploader`, deciding how asset content is
/// referenced when the batch is committed.
enum UploaderState {
    /// Upload still in progress; uploader ids cannot yet be mapped to
    /// canister chunk ids (requesting a mapping in this state is an error).
    Uploading,
    /// Uploader has uploaded chunks - commit will reference chunk ids to specify asset content
    FinalizedWithUploads,
    /// Uploader has not uploaded chunks - commit will contain asset content directly
    FinalizedWithoutUploads,
}

/// Result of translating uploader-assigned chunk ids into the form the
/// commit message should use for asset content.
pub(crate) enum UploaderIdMapping {
    /// The mapping could not be produced; the payload is a human-readable
    /// description (e.g. the mapping was requested before the uploader finalized).
    Error(String),
    /// Chunks are uploaded to the canister with these ids
    CanisterChunkIds(Vec<Nat>),
    /// Chunks are not uploaded and should be included in the SetAssetContent operations directly
    IncludeChunksDirectly(Vec<Vec<u8>>),
}

type IdMapping = BTreeMap<usize, Nat>;
type UploadQueue = Vec<(usize, Vec<u8>)>;
pub(crate) struct ChunkUploader<'agent> {
Expand All @@ -60,6 +76,7 @@ pub(crate) struct ChunkUploader<'agent> {
// maps uploader_chunk_id to canister_chunk_id
id_mapping: Arc<Mutex<IdMapping>>,
upload_queue: Arc<Mutex<UploadQueue>>,
uploader_state: Arc<Mutex<UploaderState>>,
}

impl<'agent> ChunkUploader<'agent> {
Expand All @@ -72,6 +89,7 @@ impl<'agent> ChunkUploader<'agent> {
bytes: Arc::new(AtomicUsize::new(0)),
id_mapping: Arc::new(Mutex::new(BTreeMap::new())),
upload_queue: Arc::new(Mutex::new(vec![])),
uploader_state: Arc::new(Mutex::new(UploaderState::Uploading)),
}
}

Expand Down Expand Up @@ -109,7 +127,23 @@ impl<'agent> ChunkUploader<'agent> {
&self,
semaphores: &Semaphores,
) -> Result<(), CreateChunkError> {
self.upload_chunks(0, 0, semaphores).await?;
let queue = self.upload_queue.lock().await;
let mut uploader_state = self.uploader_state.lock().await;

// Can skip upload if every chunk submitted for uploading is still in the queue.
// Additionally, chunks in the queue are small enough that there is plenty of space in the commit message to include all of them.
let skip_upload = queue.len() == self.chunks.fetch_add(0, Ordering::SeqCst)
&& queue.iter().map(|(_, chunk)| chunk.len()).sum::<usize>() < MAX_CHUNK_SIZE / 2;
drop(queue);
// Potential for further improvement: unconditional upload_chunks(MAX_CHUNK_SIZE / 2, usize::MAX, semaphores)
// Then allow mix of uploaded chunks and asset content that is part of the commit args.

if skip_upload {
*uploader_state = UploaderState::FinalizedWithoutUploads;
} else {
self.upload_chunks(0, 0, semaphores).await?;
*uploader_state = UploaderState::FinalizedWithUploads;
}
Ok(())
}

Expand All @@ -124,17 +158,44 @@ impl<'agent> ChunkUploader<'agent> {
pub(crate) async fn uploader_ids_to_canister_chunk_ids(
&self,
uploader_ids: &[usize],
) -> Vec<Nat> {
let mapping = self.id_mapping.lock().await;
uploader_ids
.iter()
.map(|id| {
mapping
.get(id)
.expect("Chunk uploader did not upload all chunks. This is a bug.")
.clone()
})
.collect()
) -> UploaderIdMapping {
let uploader_state = self.uploader_state.lock().await;
match *uploader_state {
UploaderState::Uploading => UploaderIdMapping::Error(
"Bug: Tried to map uploader ids to canister ids before finalizing".to_string(),
),
UploaderState::FinalizedWithUploads => {
let mapping = self.id_mapping.lock().await;
let ids = uploader_ids
.iter()
.map(|id| {
mapping
.get(id)
.expect("Chunk uploader did not upload all chunks but is not aware of it. This is a bug.")
.clone()
})
.collect();
UploaderIdMapping::CanisterChunkIds(ids)
}
UploaderState::FinalizedWithoutUploads => {
let queue = self.upload_queue.lock().await;
match uploader_ids
.iter()
.map(|uploader_id| {
queue.iter().find_map(|(id, content)| {
if id == uploader_id {
Some(content.clone())
} else {
None
}
}).ok_or_else(|| format!("Chunk uploader does not have a chunk with uploader id {uploader_id}. This is a bug."))
})
.collect() {
Ok(asset_content) => UploaderIdMapping::IncludeChunksDirectly(asset_content),
Err(err) => UploaderIdMapping::Error(err)
}
}
}
}

async fn add_to_upload_queue(&self, uploader_chunk_id: usize, contents: &[u8]) {
Expand Down Expand Up @@ -411,7 +472,7 @@ pub(crate) async fn make_project_assets(
.iter()
.map(|loc| {
make_project_asset(
chunk_upload_target,
chunk_upload_target.as_deref(),
loc.clone(),
canister_assets,
&semaphores,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub struct SetAssetContentArguments {
pub content_encoding: String,
/// The chunks to assign to this content
pub chunk_ids: Vec<Nat>,
/// If no chunks are assigned to this asset, then `asset_content` is the asset content
pub asset_content: Option<Vec<u8>>,
/// The sha256 of the entire content
pub sha256: Option<Vec<u8>>,
}
Expand Down
4 changes: 4 additions & 0 deletions src/canisters/frontend/ic-asset/src/error/compute_evidence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ pub enum ComputeEvidenceError {
#[error(transparent)]
GetAssetProperties(#[from] GetAssetPropertiesError),

/// Failed when assembling commit_batch argument.
#[error("Failed to assemble commit_batch argument: {0}")]
AssembleCommitBatchArgumentError(String),

/// Failed when computing hashes of asset content.
#[error(transparent)]
HashContent(#[from] HashContentError),
Expand Down
4 changes: 4 additions & 0 deletions src/canisters/frontend/ic-asset/src/error/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub enum UploadError {
#[error("Create batch failed: {0}")]
CreateBatchFailed(AgentError),

/// Failed when assembling commit_batch argument.
#[error("Failed to assemble commit_batch argument: {0}")]
AssembleCommitBatchArgumentError(String),

/// Failed when creating project assets.
#[error("Failed to create project asset: {0}")]
CreateProjectAssetFailed(#[from] CreateProjectAssetError),
Expand Down
4 changes: 4 additions & 0 deletions src/canisters/frontend/ic-asset/src/error/upload_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub enum UploadContentError {
#[error("Failed to create project asset: {0}")]
CreateProjectAssetError(#[from] CreateProjectAssetError),

/// Failed when assembling commit_batch argument.
#[error("Failed to assemble commit_batch argument: {0}")]
AssembleCommitBatchArgumentError(String),

/// Failed when building list of assets to synchronize.
#[error("Failed to gather asset descriptors: {0}")]
GatherAssetDescriptorsFailed(#[from] GatherAssetDescriptorsError),
Expand Down
3 changes: 2 additions & 1 deletion src/canisters/frontend/ic-asset/src/evidence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ pub async fn compute_evidence(
Obsolete,
canister_asset_properties,
)
.await;
.await
.map_err(ComputeEvidenceError::AssembleCommitBatchArgumentError)?;
operations.sort();

let mut sha = Sha256::new();
Expand Down
6 changes: 4 additions & 2 deletions src/canisters/frontend/ic-asset/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ pub async fn upload_content_and_assemble_sync_operations(
&canister_assets,
logger,
)
.await?;
.await
.map_err(UploadContentError::CreateProjectAssetError)?;

let commit_batch_args = batch_upload::operations::assemble_commit_batch_arguments(
&chunk_uploader,
Expand All @@ -97,7 +98,8 @@ pub async fn upload_content_and_assemble_sync_operations(
canister_asset_properties,
batch_id,
)
.await;
.await
.map_err(UploadContentError::AssembleCommitBatchArgumentError)?;

// -v
debug!(
Expand Down
3 changes: 2 additions & 1 deletion src/canisters/frontend/ic-asset/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ pub async fn upload(
HashMap::new(),
batch_id,
)
.await;
.await
.map_err(UploadError::AssembleCommitBatchArgumentError)?;

let canister_api_version = api_version(canister).await;
info!(logger, "Committing batch.");
Expand Down
6 changes: 6 additions & 0 deletions src/canisters/frontend/ic-certified-assets/src/evidence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ fn next_chunk_index(
hasher,
};
}
} else if let Some(chunk_content) = sac.asset_content.as_ref() {
hash_chunk_by_content(&mut hasher, chunk_content);
}
}
NextOperation {
Expand All @@ -157,6 +159,10 @@ fn hash_chunk_by_id(hasher: &mut Sha256, chunk_id: &ChunkId, chunks: &HashMap<Ch
}
}

/// Folds a chunk's raw bytes directly into the evidence hash. Used for
/// `SetAssetContentArguments` that carry inline `asset_content` instead of
/// referencing previously uploaded chunks by id.
fn hash_chunk_by_content(hasher: &mut Sha256, chunk_content: &ByteBuf) {
    hasher.update(chunk_content);
}

fn hash_create_asset(hasher: &mut Sha256, args: &CreateAssetArguments) {
hasher.update(TAG_CREATE_ASSET);
hasher.update(&args.key);
Expand Down
16 changes: 11 additions & 5 deletions src/canisters/frontend/ic-certified-assets/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,10 @@ impl State {
arg: SetAssetContentArguments,
now: u64,
) -> Result<(), String> {
if arg.chunk_ids.is_empty() {
return Err("encoding must have at least one chunk".to_string());
if arg.chunk_ids.is_empty() && arg.asset_content.is_none() {
return Err(
"encoding must have at least one chunk or contain asset_content".to_string(),
);
}

let dependent_keys = self.dependent_keys(&arg.key);
Expand All @@ -395,9 +397,13 @@ impl State {
let now = Int::from(now);

let mut content_chunks = vec![];
for chunk_id in arg.chunk_ids.iter() {
let chunk = self.chunks.remove(chunk_id).expect("chunk not found");
content_chunks.push(chunk.content);
if arg.chunk_ids.len() > 0 {
for chunk_id in arg.chunk_ids.iter() {
let chunk = self.chunks.remove(chunk_id).expect("chunk not found");
content_chunks.push(chunk.content);
}
} else if let Some(encoding_content) = arg.asset_content {
content_chunks.push(encoding_content.into());
}

let sha256: [u8; 32] = match arg.sha256 {
Expand Down
Loading

0 comments on commit b9edddf

Please sign in to comment.