diff --git a/openmls/src/group/core_group/new_from_external_init.rs b/openmls/src/group/core_group/new_from_external_init.rs index ae18f43d0..5626bbf07 100644 --- a/openmls/src/group/core_group/new_from_external_init.rs +++ b/openmls/src/group/core_group/new_from_external_init.rs @@ -55,7 +55,8 @@ impl CoreGroup { verifiable_group_info, // Existing proposals are discarded when joining by external commit. ProposalStore::new(), - ).await?; + ) + .await?; let group_context = public_group.group_context(); // Obtain external_pub from GroupInfo extensions. diff --git a/openmls/src/group/core_group/new_from_welcome.rs b/openmls/src/group/core_group/new_from_welcome.rs index 19ec28eeb..02471febd 100644 --- a/openmls/src/group/core_group/new_from_welcome.rs +++ b/openmls/src/group/core_group/new_from_welcome.rs @@ -40,7 +40,8 @@ impl StagedCoreWelcome { ciphersuite, resumption_psk_store, group_secrets, - ).await + ) + .await } /// Returns the [`LeafNodeIndex`] of the group member that authored the [`Welcome`] message. @@ -130,7 +131,8 @@ pub(in crate::group) async fn build_staged_welcome( ratchet_tree, verifiable_group_info.clone(), ProposalStore::new(), - ).await?; + ) + .await?; // Find our own leaf in the tree. let own_leaf_index = public_group diff --git a/openmls/src/group/core_group/process.rs b/openmls/src/group/core_group/process.rs index c7611d72b..b237745f6 100644 --- a/openmls/src/group/core_group/process.rs +++ b/openmls/src/group/core_group/process.rs @@ -1,7 +1,6 @@ -#[cfg(feature = "async")] -use futures::{stream, StreamExt}; -use futures::TryStreamExt; use core_group::proposals::QueuedProposal; +#[cfg(feature = "async")] +use futures::{stream, StreamExt, TryStreamExt}; use crate::{ framing::mls_content::FramedContentBody, @@ -200,7 +199,8 @@ impl CoreGroup { // If this is a commit, we need to load the private key material we need for decryption. let (old_epoch_keypairs, leaf_node_keypairs) = if let ContentType::Commit = unverified_message.content_type() { - self.read_decryption_keypairs(provider, own_leaf_nodes).await? + self.read_decryption_keypairs(provider, own_leaf_nodes) + .await? } else { (vec![], vec![]) }; @@ -283,34 +283,41 @@ impl CoreGroup { // If we are processing an update proposal that originally came from // us, the keypair corresponding to the leaf in the update is also a // potential decryption keypair. - let leaf_node_keypairs = Self::encryption_key_pairs_from_own_leaf_nodes(provider, own_leaf_nodes).await; + let leaf_node_keypairs = + Self::encryption_key_pairs_from_own_leaf_nodes(provider, own_leaf_nodes).await?; Ok((old_epoch_keypairs, leaf_node_keypairs)) } #[cfg(feature = "async")] - async fn encryption_key_pairs_from_own_leaf_nodes(provider: &impl OpenMlsProvider, own_leaf_nodes: &[LeafNode]) -> Vec { - let stream = stream::iter(own_leaf_nodes); - let then = stream.then(|leaf_node| async { - EncryptionKeyPair::read(provider, leaf_node.encryption_key()).await + async fn encryption_key_pairs_from_own_leaf_nodes( + provider: &impl OpenMlsProvider, + own_leaf_nodes: &[LeafNode], + ) -> Result, StageCommitError> { + stream::iter(own_leaf_nodes) + .then(|leaf_node| async { + EncryptionKeyPair::read(provider, leaf_node.encryption_key()) + .await .ok_or(StageCommitError::MissingDecryptionKey) - }); - then.map(|e| { - e.and_then(|e| { - Ok(e) }) - }).collect::>() - } + .collect::>() + .await + .into_iter() + .collect() + } #[cfg(not(feature = "async"))] - async fn encryption_key_pairs_from_own_leaf_nodes(provider: &impl OpenMlsProvider, own_leaf_nodes: &[LeafNode]) -> Vec { + async fn encryption_key_pairs_from_own_leaf_nodes( + provider: &impl OpenMlsProvider, + own_leaf_nodes: &[LeafNode], + ) -> Result, StageCommitError> { own_leaf_nodes .iter() .map(|leaf_node| { EncryptionKeyPair::read(provider, leaf_node.encryption_key()) .ok_or(StageCommitError::MissingDecryptionKey) }) - .collect::, StageCommitError>>() + .collect() } /// Merge a [StagedCommit] into the group after inspection diff --git a/openmls/src/group/mls_group/application.rs b/openmls/src/group/mls_group/application.rs index 2e6bb9874..08d445825 100644 --- a/openmls/src/group/mls_group/application.rs +++ b/openmls/src/group/mls_group/application.rs @@ -40,7 +40,8 @@ impl MlsGroup { self.configuration().padding_size(), provider, signer, - ).await + ) + .await // We know the application message is wellformed and we have the key material of the current epoch .map_err(|_| LibraryError::custom("Malformed plaintext"))?; diff --git a/openmls/src/group/mls_group/builder.rs b/openmls/src/group/mls_group/builder.rs index 4ca924930..e505c4ec3 100644 --- a/openmls/src/group/mls_group/builder.rs +++ b/openmls/src/group/mls_group/builder.rs @@ -43,7 +43,8 @@ impl MlsGroupBuilder { signer: &impl Signer, credential_with_key: CredentialWithKey, ) -> Result> { - self.build_internal(provider, signer, credential_with_key, None).await + self.build_internal(provider, signer, credential_with_key, None) + .await } /// Build a new group with the given group ID. @@ -81,7 +82,8 @@ impl MlsGroupBuilder { .with_capabilities(mls_group_create_config.capabilities.clone()) .with_max_past_epoch_secrets(mls_group_create_config.join_config.max_past_epochs) .with_lifetime(*mls_group_create_config.lifetime()) - .build(provider, signer).await + .build(provider, signer) + .await .map_err(|e| match e { CoreGroupBuildError::LibraryError(e) => e.into(), // We don't support PSKs yet @@ -115,15 +117,18 @@ impl MlsGroupBuilder { provider .storage() - .write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config).await + .write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config) + .await .map_err(NewGroupError::StorageError)?; provider .storage() - .write_group_state(mls_group.group_id(), &mls_group.group_state).await + .write_group_state(mls_group.group_id(), &mls_group.group_state) + .await .map_err(NewGroupError::StorageError)?; mls_group .group - .store(provider.storage()).await + .store(provider.storage()) + .await .map_err(NewGroupError::StorageError)?; Ok(mls_group) diff --git a/openmls/src/group/mls_group/creation.rs b/openmls/src/group/mls_group/creation.rs index a9c7f330f..86f9d4c6a 100644 --- a/openmls/src/group/mls_group/creation.rs +++ b/openmls/src/group/mls_group/creation.rs @@ -1,6 +1,9 @@ -use openmls_traits::{signatures::Signer, storage::StorageProvider as StorageProviderTrait}; #[cfg(feature = "async")] -use futures::{stream::{self, StreamExt}, TryFutureExt}; +use futures::{ + stream::{self, StreamExt}, + TryFutureExt, +}; +use openmls_traits::{signatures::Signer, storage::StorageProvider as StorageProviderTrait}; use super::{builder::MlsGroupBuilder, *}; use crate::{ @@ -40,12 +43,14 @@ impl MlsGroup { mls_group_create_config: &MlsGroupCreateConfig, credential_with_key: CredentialWithKey, ) -> Result> { - MlsGroupBuilder::new().build_internal( - provider, - signer, - credential_with_key, - Some(mls_group_create_config.clone()), - ).await + MlsGroupBuilder::new() + .build_internal( + provider, + signer, + credential_with_key, + Some(mls_group_create_config.clone()), + ) + .await } /// Creates a new group with a given group ID with the creator as the only @@ -64,7 +69,8 @@ impl MlsGroup { signer, credential_with_key, Some(mls_group_create_config.clone()), - ).await + ) + .await } /// Join an existing group through an External Commit. @@ -106,7 +112,8 @@ impl MlsGroup { params, ratchet_tree, verifiable_group_info, - ).await?; + ) + .await?; group.set_max_past_epochs(mls_group_config.max_past_epochs); let mls_group = MlsGroup { @@ -122,15 +129,18 @@ impl MlsGroup { provider .storage() - .write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config).await + .write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config) + .await .map_err(ExternalCommitError::StorageError)?; provider .storage() - .write_group_state(mls_group.group_id(), &mls_group.group_state).await + .write_group_state(mls_group.group_id(), &mls_group.group_state) + .await .map_err(ExternalCommitError::StorageError)?; mls_group .group - .store(provider.storage()).await + .store(provider.storage()) + .await .map_err(ExternalCommitError::StorageError)?; let public_message: PublicMessage = create_commit_result.commit.into(); @@ -217,7 +227,8 @@ impl ProcessedWelcome { self.ciphersuite, self.resumption_psk_store, self.group_secrets, - ).await?; + ) + .await?; let staged_welcome = StagedWelcome { mls_group_config: self.mls_group_config, @@ -253,7 +264,8 @@ impl StagedWelcome { key_package_bundle, provider, resumption_psk_store, - ).await?; + ) + .await?; let staged_welcome = StagedWelcome { mls_group_config: mls_group_config.clone(), @@ -296,11 +308,13 @@ impl StagedWelcome { provider .storage() - .write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config).await + .write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config) + .await .map_err(WelcomeError::StorageError)?; provider .storage() - .write_group_state(mls_group.group_id(), &MlsGroupState::Operational).await + .write_group_state(mls_group.group_id(), &MlsGroupState::Operational) + .await .map_err(WelcomeError::StorageError)?; Ok(mls_group) @@ -318,11 +332,14 @@ async fn keys_for_welcome( WelcomeError<::StorageError>, > { let resumption_psk_store = ResumptionPskStore::new(mls_group_config.number_of_resumption_psks); - let key_package_bundle = get_key_package_bundle_for_welcome(welcome, provider).await.ok_or(WelcomeError::NoMatchingKeyPackage)?; + let key_package_bundle = get_key_package_bundle_for_welcome(welcome, provider) + .await + .ok_or(WelcomeError::NoMatchingKeyPackage)?; if !key_package_bundle.key_package().last_resort() { provider .storage() - .delete_key_package(&key_package_bundle.key_package.hash_ref(provider.crypto())?).await + .delete_key_package(&key_package_bundle.key_package.hash_ref(provider.crypto())?) + .await .map_err(WelcomeError::StorageError)?; } else { log::debug!("Key package has last resort extension, not deleting"); @@ -332,22 +349,28 @@ async fn keys_for_welcome( #[maybe_async::must_be_async] #[cfg(feature = "async")] -async fn get_key_package_bundle_for_welcome(welcome: &Welcome, provider: &Provider) -> Option { - let stream = stream::iter(welcome.secrets()); - let events = stream.filter_map(|egs| async move { - let hash_ref = egs.new_member(); - provider - .storage() - .key_package(&hash_ref).await - .ok()? - }); - events.collect::>().await.first().cloned() +async fn get_key_package_bundle_for_welcome( + welcome: &Welcome, + provider: &Provider, +) -> Option { + let stream = stream::iter(welcome.secrets()); + let events = stream.filter_map(|egs| async move { + let hash_ref = egs.new_member(); + provider.storage().key_package(&hash_ref).await.ok()? + }); + events + .collect::>() + .await + .first() + .cloned() } - #[maybe_async::must_be_sync] #[cfg(not(feature = "async"))] -async fn get_key_package_bundle_for_welcome(welcome: &Welcome, provider: &Provider) -> Option { +async fn get_key_package_bundle_for_welcome( + welcome: &Welcome, + provider: &Provider, +) -> Option { welcome .secrets() .iter() @@ -357,8 +380,10 @@ async fn get_key_package_bundle_for_welcome(welcome: transpose_err_opt( provider .storage() - .key_package(&hash_ref).await + .key_package(&hash_ref) + .await .map_err(WelcomeError::StorageError), ) - })?.ok() -} \ No newline at end of file + })? + .ok() +} diff --git a/openmls/src/group/public_group/mod.rs b/openmls/src/group/public_group/mod.rs index a507c7a81..df4df8f7b 100644 --- a/openmls/src/group/public_group/mod.rs +++ b/openmls/src/group/public_group/mod.rs @@ -389,7 +389,9 @@ impl PublicGroup { storage.delete_tree(self.group_id()).await?; storage.delete_confirmation_tag(self.group_id()).await?; storage.delete_context(self.group_id()).await?; - storage.delete_interim_transcript_hash(self.group_id()).await?; + storage + .delete_interim_transcript_hash(self.group_id()) + .await?; Ok(()) } diff --git a/openmls/src/group/public_group/staged_commit.rs b/openmls/src/group/public_group/staged_commit.rs index 50ef8a1e8..c6a920527 100644 --- a/openmls/src/group/public_group/staged_commit.rs +++ b/openmls/src/group/public_group/staged_commit.rs @@ -293,6 +293,8 @@ impl PublicGroup { } self.proposal_store.empty(); - self.store(storage).await.map_err(MergeCommitError::StorageError) + self.store(storage) + .await + .map_err(MergeCommitError::StorageError) } }