Skip to content

Commit

Permalink
fix: addresses issues when integrating with app
Browse files Browse the repository at this point in the history
  • Loading branch information
dbcfd committed Feb 26, 2024
1 parent 1467527 commit bebb81c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
55 changes: 39 additions & 16 deletions calculator/src/calculator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,42 @@ impl Calculator {
tracing::debug!("Skipping event for model {}", model);
return Ok(());
}
let holder = meta
.controllers
.into_iter()
.next()
.ok_or_else(|| anyhow::anyhow!("No controllers for event"))?;
let attestation_stream_id = StreamId::from_str(&event.commit_id)?;
match serde_json::from_str::<PointAttestations>(&event.content) {
Ok(attestation) => {
if let Err(e) = validate_attestation(&attestation).await {
tracing::warn!("Error validating attestation: {}", e);
return Ok(());
}
unique_events(&mut self.cache, &attestation, &attestation_stream_id).await?;
all_events(&mut self.cache, &attestation, &attestation_stream_id).await?;
first_all_events(&mut self.cache, &attestation, &attestation_stream_id).await?;
unique_events(
&mut self.cache,
&holder,
&attestation,
&attestation_stream_id,
)
.await?;
all_events(
&mut self.cache,
&holder,
&attestation,
&attestation_stream_id,
)
.await?;
first_all_events(
&mut self.cache,
&holder,
&attestation,
&attestation_stream_id,
)
.await?;
}
Err(e) => {
tracing::warn!("Error parsing attestation: {}", e);
tracing::warn!("Error parsing attestation: {}\n{}", e, event.content);
}
}
Ok(())
Expand All @@ -81,6 +104,7 @@ async fn validate_attestation(attestation: &PointAttestations) -> Result<(), any
const UNIQUE_EVENTS_CONTEXT: &str = "unique-events";
async fn unique_events(
cache: &mut MaterializationCache,
holder: &str,
attestation: &PointAttestations,
attestation_stream_id: &StreamId,
) -> Result<(), anyhow::Error> {
Expand All @@ -91,10 +115,7 @@ async fn unique_events(
.into_iter()
.map(|t| t.0)
.collect();
match cache
.get_points(&attestation.holder, UNIQUE_EVENTS_CONTEXT)
.await?
{
match cache.get_points(holder, UNIQUE_EVENTS_CONTEXT).await? {
Some(mut existing) => {
existing.points.value = keys.len() as i64;
tracing::info!(
Expand All @@ -107,12 +128,12 @@ async fn unique_events(
None => {
tracing::info!(
"Creating points for holder {} for {}",
attestation.holder,
holder,
UNIQUE_EVENTS_CONTEXT
);
cache
.create_points(
&attestation.holder,
holder,
UNIQUE_EVENTS_CONTEXT,
attestation_stream_id,
keys.len() as i64,
Expand All @@ -126,6 +147,7 @@ async fn unique_events(
const ALL_EVENTS_CONTEXT: &str = "all-events";
async fn all_events(
cache: &mut MaterializationCache,
holder: &str,
attestation: &PointAttestations,
attestation_stream_id: &StreamId,
) -> Result<(), anyhow::Error> {
Expand All @@ -137,7 +159,7 @@ async fn all_events(
.map(|t| t.0)
.collect();
match cache
.get_points(&attestation.holder, crate::calculator::ALL_EVENTS_CONTEXT)
.get_points(holder, crate::calculator::ALL_EVENTS_CONTEXT)
.await?
{
Some(mut existing) => {
Expand All @@ -152,12 +174,12 @@ async fn all_events(
None => {
tracing::info!(
"Creating points for recipient {} for {}",
attestation.holder,
holder,
ALL_EVENTS_CONTEXT
);
cache
.create_points(
&attestation.holder,
holder,
ALL_EVENTS_CONTEXT,
attestation_stream_id,
keys.len() as i64,
Expand All @@ -172,6 +194,7 @@ const FIRST_ALL_EVENTS_CONTEXT: &str = "first-all-events";
const TOTAL_EVENTS: usize = 9;
async fn first_all_events(
cache: &mut MaterializationCache,
holder: &str,
attestation: &PointAttestations,
attestation_stream_id: &StreamId,
) -> Result<(), anyhow::Error> {
Expand All @@ -191,18 +214,18 @@ async fn first_all_events(
.collect();
if events_by_last_time.len() >= TOTAL_EVENTS
&& cache
.get_points(&attestation.holder, FIRST_ALL_EVENTS_CONTEXT)
.get_points(holder, FIRST_ALL_EVENTS_CONTEXT)
.await?
.is_none()
{
tracing::info!(
"Creating points for recipient {} for {}",
attestation.holder,
holder,
FIRST_ALL_EVENTS_CONTEXT
);
cache
.create_points(
&attestation.holder,
holder,
FIRST_ALL_EVENTS_CONTEXT,
attestation_stream_id,
events_by_last_time.first().unwrap().timestamp.timestamp(),
Expand Down
3 changes: 1 addition & 2 deletions models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ pub struct PointAttestation {
}

#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
#[serde(deny_unknown_fields)]
pub struct PointAttestations {
pub holder: String,
pub issuer: String,
pub issuer_verification: String,
pub data: Vec<PointAttestation>,
Expand Down
1 change: 0 additions & 1 deletion tester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ async fn create_attestations(
let verification = signer.sign(&serde_json::to_vec(&data)?).await?;
Ok(models::PointAttestations {
issuer: signer.id().id.clone(),
holder: signer.id().id.clone(),
issuer_verification: verification.to_string(),
data,
})
Expand Down

0 comments on commit bebb81c

Please sign in to comment.