Skip to content

Commit

Permalink
chore: more progress
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonThormeyer committed Jun 21, 2024
1 parent 84d9a5d commit 1702486
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 62 deletions.
3 changes: 2 additions & 1 deletion openmls/src/group/core_group/new_from_external_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ impl CoreGroup {
verifiable_group_info,
// Existing proposals are discarded when joining by external commit.
ProposalStore::new(),
).await?;
)
.await?;
let group_context = public_group.group_context();

// Obtain external_pub from GroupInfo extensions.
Expand Down
6 changes: 4 additions & 2 deletions openmls/src/group/core_group/new_from_welcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ impl StagedCoreWelcome {
ciphersuite,
resumption_psk_store,
group_secrets,
).await
)
.await
}

/// Returns the [`LeafNodeIndex`] of the group member that authored the [`Welcome`] message.
Expand Down Expand Up @@ -130,7 +131,8 @@ pub(in crate::group) async fn build_staged_welcome<Provider: OpenMlsProvider>(
ratchet_tree,
verifiable_group_info.clone(),
ProposalStore::new(),
).await?;
)
.await?;

// Find our own leaf in the tree.
let own_leaf_index = public_group
Expand Down
41 changes: 24 additions & 17 deletions openmls/src/group/core_group/process.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#[cfg(feature = "async")]
use futures::{stream, StreamExt};
use futures::TryStreamExt;
use core_group::proposals::QueuedProposal;
#[cfg(feature = "async")]
use futures::{stream, StreamExt, TryStreamExt};

use crate::{
framing::mls_content::FramedContentBody,
Expand Down Expand Up @@ -200,7 +199,8 @@ impl CoreGroup {
// If this is a commit, we need to load the private key material we need for decryption.
let (old_epoch_keypairs, leaf_node_keypairs) =
if let ContentType::Commit = unverified_message.content_type() {
self.read_decryption_keypairs(provider, own_leaf_nodes).await?
self.read_decryption_keypairs(provider, own_leaf_nodes)
.await?
} else {
(vec![], vec![])
};
Expand Down Expand Up @@ -283,34 +283,41 @@ impl CoreGroup {
// If we are processing an update proposal that originally came from
// us, the keypair corresponding to the leaf in the update is also a
// potential decryption keypair.
let leaf_node_keypairs = Self::encryption_key_pairs_from_own_leaf_nodes(provider, own_leaf_nodes).await;
let leaf_node_keypairs =
Self::encryption_key_pairs_from_own_leaf_nodes(provider, own_leaf_nodes).await?;

Ok((old_epoch_keypairs, leaf_node_keypairs))
}

#[cfg(feature = "async")]
async fn encryption_key_pairs_from_own_leaf_nodes(provider: &impl OpenMlsProvider, own_leaf_nodes: &[LeafNode]) -> Vec<EncryptionKeyPair> {
let stream = stream::iter(own_leaf_nodes);
let then = stream.then(|leaf_node| async {
EncryptionKeyPair::read(provider, leaf_node.encryption_key()).await
async fn encryption_key_pairs_from_own_leaf_nodes(
provider: &impl OpenMlsProvider,
own_leaf_nodes: &[LeafNode],
) -> Result<Vec<EncryptionKeyPair>, StageCommitError> {
stream::iter(own_leaf_nodes)
.then(|leaf_node| async {
EncryptionKeyPair::read(provider, leaf_node.encryption_key())
.await
.ok_or(StageCommitError::MissingDecryptionKey)
});
then.map(|e| {
e.and_then(|e| {
Ok(e)
})
}).collect::<Vec<_>>()
}
.collect::<Vec<_>>()
.await
.into_iter()
.collect()
}

#[cfg(not(feature = "async"))]
async fn encryption_key_pairs_from_own_leaf_nodes(provider: &impl OpenMlsProvider, own_leaf_nodes: &[LeafNode]) -> Vec<EncryptionKeyPair> {
async fn encryption_key_pairs_from_own_leaf_nodes(
provider: &impl OpenMlsProvider,
own_leaf_nodes: &[LeafNode],
) -> Result<Vec<EncryptionKeyPair>, StageCommitError> {
own_leaf_nodes
.iter()
.map(|leaf_node| {
EncryptionKeyPair::read(provider, leaf_node.encryption_key())
.ok_or(StageCommitError::MissingDecryptionKey)
})
.collect::<Result<Vec<EncryptionKeyPair>, StageCommitError>>()
.collect()
}

/// Merge a [StagedCommit] into the group after inspection
Expand Down
3 changes: 2 additions & 1 deletion openmls/src/group/mls_group/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ impl MlsGroup {
self.configuration().padding_size(),
provider,
signer,
).await
)
.await
// We know the application message is wellformed and we have the key material of the current epoch
.map_err(|_| LibraryError::custom("Malformed plaintext"))?;

Expand Down
15 changes: 10 additions & 5 deletions openmls/src/group/mls_group/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ impl MlsGroupBuilder {
signer: &impl Signer,
credential_with_key: CredentialWithKey,
) -> Result<MlsGroup, NewGroupError<Provider::StorageError>> {
self.build_internal(provider, signer, credential_with_key, None).await
self.build_internal(provider, signer, credential_with_key, None)
.await
}

/// Build a new group with the given group ID.
Expand Down Expand Up @@ -81,7 +82,8 @@ impl MlsGroupBuilder {
.with_capabilities(mls_group_create_config.capabilities.clone())
.with_max_past_epoch_secrets(mls_group_create_config.join_config.max_past_epochs)
.with_lifetime(*mls_group_create_config.lifetime())
.build(provider, signer).await
.build(provider, signer)
.await
.map_err(|e| match e {
CoreGroupBuildError::LibraryError(e) => e.into(),
// We don't support PSKs yet
Expand Down Expand Up @@ -115,15 +117,18 @@ impl MlsGroupBuilder {

provider
.storage()
.write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config).await
.write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config)
.await
.map_err(NewGroupError::StorageError)?;
provider
.storage()
.write_group_state(mls_group.group_id(), &mls_group.group_state).await
.write_group_state(mls_group.group_id(), &mls_group.group_state)
.await
.map_err(NewGroupError::StorageError)?;
mls_group
.group
.store(provider.storage()).await
.store(provider.storage())
.await
.map_err(NewGroupError::StorageError)?;

Ok(mls_group)
Expand Down
93 changes: 59 additions & 34 deletions openmls/src/group/mls_group/creation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use openmls_traits::{signatures::Signer, storage::StorageProvider as StorageProviderTrait};
#[cfg(feature = "async")]
use futures::{stream::{self, StreamExt}, TryFutureExt};
use futures::{
stream::{self, StreamExt},
TryFutureExt,
};
use openmls_traits::{signatures::Signer, storage::StorageProvider as StorageProviderTrait};

use super::{builder::MlsGroupBuilder, *};
use crate::{
Expand Down Expand Up @@ -40,12 +43,14 @@ impl MlsGroup {
mls_group_create_config: &MlsGroupCreateConfig,
credential_with_key: CredentialWithKey,
) -> Result<Self, NewGroupError<Provider::StorageError>> {
MlsGroupBuilder::new().build_internal(
provider,
signer,
credential_with_key,
Some(mls_group_create_config.clone()),
).await
MlsGroupBuilder::new()
.build_internal(
provider,
signer,
credential_with_key,
Some(mls_group_create_config.clone()),
)
.await
}

/// Creates a new group with a given group ID with the creator as the only
Expand All @@ -64,7 +69,8 @@ impl MlsGroup {
signer,
credential_with_key,
Some(mls_group_create_config.clone()),
).await
)
.await
}

/// Join an existing group through an External Commit.
Expand Down Expand Up @@ -106,7 +112,8 @@ impl MlsGroup {
params,
ratchet_tree,
verifiable_group_info,
).await?;
)
.await?;
group.set_max_past_epochs(mls_group_config.max_past_epochs);

let mls_group = MlsGroup {
Expand All @@ -122,15 +129,18 @@ impl MlsGroup {

provider
.storage()
.write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config).await
.write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config)
.await
.map_err(ExternalCommitError::StorageError)?;
provider
.storage()
.write_group_state(mls_group.group_id(), &mls_group.group_state).await
.write_group_state(mls_group.group_id(), &mls_group.group_state)
.await
.map_err(ExternalCommitError::StorageError)?;
mls_group
.group
.store(provider.storage()).await
.store(provider.storage())
.await
.map_err(ExternalCommitError::StorageError)?;

let public_message: PublicMessage = create_commit_result.commit.into();
Expand Down Expand Up @@ -217,7 +227,8 @@ impl ProcessedWelcome {
self.ciphersuite,
self.resumption_psk_store,
self.group_secrets,
).await?;
)
.await?;

let staged_welcome = StagedWelcome {
mls_group_config: self.mls_group_config,
Expand Down Expand Up @@ -253,7 +264,8 @@ impl StagedWelcome {
key_package_bundle,
provider,
resumption_psk_store,
).await?;
)
.await?;

let staged_welcome = StagedWelcome {
mls_group_config: mls_group_config.clone(),
Expand Down Expand Up @@ -296,11 +308,13 @@ impl StagedWelcome {

provider
.storage()
.write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config).await
.write_mls_join_config(mls_group.group_id(), &mls_group.mls_group_config)
.await
.map_err(WelcomeError::StorageError)?;
provider
.storage()
.write_group_state(mls_group.group_id(), &MlsGroupState::Operational).await
.write_group_state(mls_group.group_id(), &MlsGroupState::Operational)
.await
.map_err(WelcomeError::StorageError)?;

Ok(mls_group)
Expand All @@ -318,11 +332,14 @@ async fn keys_for_welcome<Provider: OpenMlsProvider>(
WelcomeError<<Provider as OpenMlsProvider>::StorageError>,
> {
let resumption_psk_store = ResumptionPskStore::new(mls_group_config.number_of_resumption_psks);
let key_package_bundle = get_key_package_bundle_for_welcome(welcome, provider).await.ok_or(WelcomeError::NoMatchingKeyPackage)?;
let key_package_bundle = get_key_package_bundle_for_welcome(welcome, provider)
.await
.ok_or(WelcomeError::NoMatchingKeyPackage)?;
if !key_package_bundle.key_package().last_resort() {
provider
.storage()
.delete_key_package(&key_package_bundle.key_package.hash_ref(provider.crypto())?).await
.delete_key_package(&key_package_bundle.key_package.hash_ref(provider.crypto())?)
.await
.map_err(WelcomeError::StorageError)?;
} else {
log::debug!("Key package has last resort extension, not deleting");
Expand All @@ -332,22 +349,28 @@ async fn keys_for_welcome<Provider: OpenMlsProvider>(

#[maybe_async::must_be_async]
#[cfg(feature = "async")]
async fn get_key_package_bundle_for_welcome<Provider: OpenMlsProvider>(welcome: &Welcome, provider: &Provider) -> Option<KeyPackageBundle> {
let stream = stream::iter(welcome.secrets());
let events = stream.filter_map(|egs| async move {
let hash_ref = egs.new_member();
provider
.storage()
.key_package(&hash_ref).await
.ok()?
});
events.collect::<Vec<KeyPackageBundle>>().await.first().cloned()
async fn get_key_package_bundle_for_welcome<Provider: OpenMlsProvider>(
welcome: &Welcome,
provider: &Provider,
) -> Option<KeyPackageBundle> {
let stream = stream::iter(welcome.secrets());
let events = stream.filter_map(|egs| async move {
let hash_ref = egs.new_member();
provider.storage().key_package(&hash_ref).await.ok()?
});
events
.collect::<Vec<KeyPackageBundle>>()
.await
.first()
.cloned()
}


#[maybe_async::must_be_sync]
#[cfg(not(feature = "async"))]
async fn get_key_package_bundle_for_welcome<Provider: OpenMlsProvider>(welcome: &Welcome, provider: &Provider) -> Option<KeyPackageBundle> {
async fn get_key_package_bundle_for_welcome<Provider: OpenMlsProvider>(
welcome: &Welcome,
provider: &Provider,
) -> Option<KeyPackageBundle> {
welcome
.secrets()
.iter()
Expand All @@ -357,8 +380,10 @@ async fn get_key_package_bundle_for_welcome<Provider: OpenMlsProvider>(welcome:
transpose_err_opt(
provider
.storage()
.key_package(&hash_ref).await
.key_package(&hash_ref)
.await
.map_err(WelcomeError::StorageError),
)
})?.ok()
}
})?
.ok()
}
4 changes: 3 additions & 1 deletion openmls/src/group/public_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,9 @@ impl PublicGroup {
storage.delete_tree(self.group_id()).await?;
storage.delete_confirmation_tag(self.group_id()).await?;
storage.delete_context(self.group_id()).await?;
storage.delete_interim_transcript_hash(self.group_id()).await?;
storage
.delete_interim_transcript_hash(self.group_id())
.await?;

Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion openmls/src/group/public_group/staged_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ impl PublicGroup {
}

self.proposal_store.empty();
self.store(storage).await.map_err(MergeCommitError::StorageError)
self.store(storage)
.await
.map_err(MergeCommitError::StorageError)
}
}

0 comments on commit 1702486

Please sign in to comment.