From a0501ab6e2c52e7c5910a86e2bb41e9fb67bfd47 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Thu, 27 Jun 2024 14:06:49 -0400 Subject: [PATCH] Add cleanup task to remove old data from db table --- .../migrations/34_sp_boosted_rewards_bans.sql | 1 + .../src/sp_boosted_rewards_bans.rs | 152 +++++++++++++----- 2 files changed, 117 insertions(+), 36 deletions(-) diff --git a/mobile_verifier/migrations/34_sp_boosted_rewards_bans.sql b/mobile_verifier/migrations/34_sp_boosted_rewards_bans.sql index e9d86f2d0..c8b768af2 100644 --- a/mobile_verifier/migrations/34_sp_boosted_rewards_bans.sql +++ b/mobile_verifier/migrations/34_sp_boosted_rewards_bans.sql @@ -3,5 +3,6 @@ CREATE TABLE IF NOT EXISTS sp_boosted_rewards_bans ( radio_key TEXT NOT NULL, received_timestamp TIMESTAMPTZ NOT NULL, until TIMESTAMPTZ NOT NULL, + invalidated_at TIMESTAMPTZ, PRIMARY KEY (radio_type, radio_key) ); diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index ea2fa854d..4355a6d00 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -12,7 +12,9 @@ use helium_crypto::PublicKeyBinary; use helium_proto::services::{ mobile_config::NetworkKeyRole, poc_mobile::{ - service_provider_boosted_rewards_banned_radio_req_v1::KeyType, + service_provider_boosted_rewards_banned_radio_req_v1::{ + KeyType, SpBoostedRewardsBannedRadioReason, + }, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioVerificationStatus, VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1, @@ -25,12 +27,16 @@ use tokio::sync::mpsc::Receiver; use crate::Settings; +const CLEANUP_DAYS: i64 = 7; +const CLEANUP_INTERVAL_SECS: u64 = 12 * 60 * 60; + struct BannedRadioReport { received_timestamp: DateTime, pubkey: PublicKeyBinary, radio_type: String, radio_key: String, until: DateTime, + reason: SpBoostedRewardsBannedRadioReason, } impl TryFrom for BannedRadioReport { @@ -43,6 +49,8 @@ impl TryFrom for BannedR .report .ok_or_else(|| anyhow::anyhow!("invalid ingest report"))?; + let reason = report.reason(); + let (radio_type, radio_key) = match report.key_type { Some(KeyType::CbsdId(cbsd_id)) => ("cbrs", cbsd_id), Some(KeyType::HotspotKey(bytes)) => ("wifi", PublicKeyBinary::from(bytes).to_string()), @@ -63,6 +71,7 @@ impl TryFrom for BannedR .timestamp_opt(report.until as i64, 0) .single() .ok_or_else(|| anyhow::anyhow!("invalid until: {}", report.until))?, + reason, }) } } @@ -143,10 +152,17 @@ where async fn run(mut self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("service provider boosted rewards ban ingestor starting"); + + let mut cleanup_interval = + tokio::time::interval(std::time::Duration::from_secs(CLEANUP_INTERVAL_SECS)); + loop { tokio::select! { biased; _ = &mut shutdown => break, + _ = cleanup_interval.tick() => { + db::cleanup(&self.pool).await?; + } Some(file) = self.receiver.recv() => { self.process_file(file).await?; } @@ -168,35 +184,45 @@ where .await? .map(anyhow::Ok) .try_fold(transaction, |mut tx, ingest| async move { - let report = BannedRadioReport::try_from(ingest.clone())?; - let is_authorized = self.is_authorized(&report.pubkey).await?; + self.process_ingest_report(&mut tx, ingest).await?; + Ok(tx) + }) + .await? + .commit() + .await?; - if is_authorized { - save(&mut tx, report).await?; - } + self.verified_sink.commit().await?; - let status = match is_authorized { + Ok(()) + } + + async fn process_ingest_report( + &self, + transaction: &mut Transaction<'_, Postgres>, + ingest: ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + ) -> anyhow::Result<()> { + let report = BannedRadioReport::try_from(ingest.clone())?; + let is_authorized = self.is_authorized(&report.pubkey).await?; + + if is_authorized { + db::update_report(transaction, report).await?; + } + + let status = match is_authorized { true => ServiceProviderBoostedRewardsBannedRadioVerificationStatus::SpBoostedRewardsBanValid, false => ServiceProviderBoostedRewardsBannedRadioVerificationStatus::SpBoostedRewardsBanInvalidCarrierKey, }; - let verified_report = - VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1 { - report: Some(ingest), - status: status.into(), - timestamp: Utc::now().timestamp_millis() as u64, - }; - - self.verified_sink.write(verified_report, &[("status", status.as_str_name())]).await?; + let verified_report = VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1 { + report: Some(ingest), + status: status.into(), + timestamp: Utc::now().timestamp_millis() as u64, + }; - Ok(tx) - }) - .await? - .commit() + self.verified_sink + .write(verified_report, &[("status", status.as_str_name())]) .await?; - self.verified_sink.commit().await?; - Ok(()) } @@ -208,22 +234,76 @@ where } } -async fn save( - transaction: &mut Transaction<'_, Postgres>, - report: BannedRadioReport, -) -> anyhow::Result<()> { - sqlx::query( - r#" +mod db { + use chrono::Duration; + + use super::*; + + pub(super) async fn cleanup(pool: &PgPool) -> anyhow::Result<()> { + sqlx::query( + r#" + DELETE FROM sp_boosted_rewards_bans + WHERE until < $1 or invalidated_at < $1 + "#, + ) + .bind(Utc::now() - Duration::days(CLEANUP_DAYS)) + .execute(pool) + .await + .map(|_| ()) + .map_err(anyhow::Error::from) + } + + pub(super) async fn update_report( + transaction: &mut Transaction<'_, Postgres>, + report: BannedRadioReport, + ) -> anyhow::Result<()> { + match report.reason { + SpBoostedRewardsBannedRadioReason::Unbanned => { + invalidate_all_before(transaction, report).await + } + _ => save(transaction, report).await, + } + } + + async fn save( + transaction: &mut Transaction<'_, Postgres>, + report: BannedRadioReport, + ) -> anyhow::Result<()> { + sqlx::query( + r#" INSERT INTO sp_boosted_rewards_bans(radio_type, radio_key, received_timestamp, until) VALUES($1,$2,$3,$4) "#, - ) - .bind(report.radio_type) - .bind(report.radio_key) - .bind(report.received_timestamp) - .bind(report.until) - .execute(transaction) - .await - .map(|_| ()) - .map_err(anyhow::Error::from) + ) + .bind(report.radio_type) + .bind(report.radio_key) + .bind(report.received_timestamp) + .bind(report.until) + .execute(transaction) + .await + .map(|_| ()) + .map_err(anyhow::Error::from) + } + + async fn invalidate_all_before( + transaction: &mut Transaction<'_, Postgres>, + report: BannedRadioReport, + ) -> anyhow::Result<()> { + sqlx::query( + r#" + UPDATE sp_boosted_rewards_bans + SET invalidated_at = now() + WHERE radio_type = $1 + AND radio_key = $2 + AND received_timestamp <= $3 + "#, + ) + .bind(report.radio_type) + .bind(report.radio_key) + .bind(report.received_timestamp) + .execute(transaction) + .await + .map(|_| ()) + .map_err(anyhow::Error::from) + } }