Skip to content

Commit

Permalink
Add cleanup task to remove old data from db table
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Jun 27, 2024
1 parent 1d58063 commit a0501ab
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 36 deletions.
1 change: 1 addition & 0 deletions mobile_verifier/migrations/34_sp_boosted_rewards_bans.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ CREATE TABLE IF NOT EXISTS sp_boosted_rewards_bans (
radio_key TEXT NOT NULL,
received_timestamp TIMESTAMPTZ NOT NULL,
until TIMESTAMPTZ NOT NULL,
invalidated_at TIMESTAMPTZ,
PRIMARY KEY (radio_type, radio_key)
);
152 changes: 116 additions & 36 deletions mobile_verifier/src/sp_boosted_rewards_bans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use helium_crypto::PublicKeyBinary;
use helium_proto::services::{
mobile_config::NetworkKeyRole,
poc_mobile::{
service_provider_boosted_rewards_banned_radio_req_v1::KeyType,
service_provider_boosted_rewards_banned_radio_req_v1::{
KeyType, SpBoostedRewardsBannedRadioReason,
},
ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
ServiceProviderBoostedRewardsBannedRadioVerificationStatus,
VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1,
Expand All @@ -25,12 +27,16 @@ use tokio::sync::mpsc::Receiver;

use crate::Settings;

const CLEANUP_DAYS: i64 = 7;
const CLEANUP_INTERVAL_SECS: u64 = 12 * 60 * 60;

struct BannedRadioReport {
received_timestamp: DateTime<Utc>,
pubkey: PublicKeyBinary,
radio_type: String,
radio_key: String,
until: DateTime<Utc>,
reason: SpBoostedRewardsBannedRadioReason,
}

impl TryFrom<ServiceProviderBoostedRewardsBannedRadioIngestReportV1> for BannedRadioReport {
Expand All @@ -43,6 +49,8 @@ impl TryFrom<ServiceProviderBoostedRewardsBannedRadioIngestReportV1> for BannedR
.report
.ok_or_else(|| anyhow::anyhow!("invalid ingest report"))?;

let reason = report.reason();

let (radio_type, radio_key) = match report.key_type {
Some(KeyType::CbsdId(cbsd_id)) => ("cbrs", cbsd_id),
Some(KeyType::HotspotKey(bytes)) => ("wifi", PublicKeyBinary::from(bytes).to_string()),
Expand All @@ -63,6 +71,7 @@ impl TryFrom<ServiceProviderBoostedRewardsBannedRadioIngestReportV1> for BannedR
.timestamp_opt(report.until as i64, 0)
.single()
.ok_or_else(|| anyhow::anyhow!("invalid until: {}", report.until))?,
reason,
})
}
}
Expand Down Expand Up @@ -143,10 +152,17 @@ where

async fn run(mut self, mut shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("service provider boosted rewards ban ingestor starting");

let mut cleanup_interval =
tokio::time::interval(std::time::Duration::from_secs(CLEANUP_INTERVAL_SECS));

loop {
tokio::select! {
biased;
_ = &mut shutdown => break,
_ = cleanup_interval.tick() => {
db::cleanup(&self.pool).await?;
}
Some(file) = self.receiver.recv() => {
self.process_file(file).await?;
}
Expand All @@ -168,35 +184,45 @@ where
.await?
.map(anyhow::Ok)
.try_fold(transaction, |mut tx, ingest| async move {
let report = BannedRadioReport::try_from(ingest.clone())?;
let is_authorized = self.is_authorized(&report.pubkey).await?;
self.process_ingest_report(&mut tx, ingest).await?;
Ok(tx)
})
.await?
.commit()
.await?;

if is_authorized {
save(&mut tx, report).await?;
}
self.verified_sink.commit().await?;

let status = match is_authorized {
Ok(())
}

async fn process_ingest_report(
&self,
transaction: &mut Transaction<'_, Postgres>,
ingest: ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
) -> anyhow::Result<()> {
let report = BannedRadioReport::try_from(ingest.clone())?;
let is_authorized = self.is_authorized(&report.pubkey).await?;

if is_authorized {
db::update_report(transaction, report).await?;
}

let status = match is_authorized {
true => ServiceProviderBoostedRewardsBannedRadioVerificationStatus::SpBoostedRewardsBanValid,
false => ServiceProviderBoostedRewardsBannedRadioVerificationStatus::SpBoostedRewardsBanInvalidCarrierKey,
};

let verified_report =
VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1 {
report: Some(ingest),
status: status.into(),
timestamp: Utc::now().timestamp_millis() as u64,
};

self.verified_sink.write(verified_report, &[("status", status.as_str_name())]).await?;
let verified_report = VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1 {
report: Some(ingest),
status: status.into(),
timestamp: Utc::now().timestamp_millis() as u64,
};

Ok(tx)
})
.await?
.commit()
self.verified_sink
.write(verified_report, &[("status", status.as_str_name())])
.await?;

self.verified_sink.commit().await?;

Ok(())
}

Expand All @@ -208,22 +234,76 @@ where
}
}

async fn save(
transaction: &mut Transaction<'_, Postgres>,
report: BannedRadioReport,
) -> anyhow::Result<()> {
sqlx::query(
r#"
mod db {
use chrono::Duration;

use super::*;

pub(super) async fn cleanup(pool: &PgPool) -> anyhow::Result<()> {
sqlx::query(
r#"
DELETE FROM sp_boosted_rewards_bans
WHERE until < $1 or invalidated_at < $1
"#,
)
.bind(Utc::now() - Duration::days(CLEANUP_DAYS))
.execute(pool)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}

pub(super) async fn update_report(
transaction: &mut Transaction<'_, Postgres>,
report: BannedRadioReport,
) -> anyhow::Result<()> {
match report.reason {
SpBoostedRewardsBannedRadioReason::Unbanned => {
invalidate_all_before(transaction, report).await
}
_ => save(transaction, report).await,
}
}

async fn save(
transaction: &mut Transaction<'_, Postgres>,
report: BannedRadioReport,
) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO sp_boosted_rewards_bans(radio_type, radio_key, received_timestamp, until)
VALUES($1,$2,$3,$4)
"#,
)
.bind(report.radio_type)
.bind(report.radio_key)
.bind(report.received_timestamp)
.bind(report.until)
.execute(transaction)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
)
.bind(report.radio_type)
.bind(report.radio_key)
.bind(report.received_timestamp)
.bind(report.until)
.execute(transaction)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}

async fn invalidate_all_before(
transaction: &mut Transaction<'_, Postgres>,
report: BannedRadioReport,
) -> anyhow::Result<()> {
sqlx::query(
r#"
UPDATE sp_boosted_rewards_bans
SET invalidated_at = now()
WHERE radio_type = $1
AND radio_key = $2
AND received_timestamp <= $3
"#,
)
.bind(report.radio_type)
.bind(report.radio_key)
.bind(report.received_timestamp)
.execute(transaction)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}
}

0 comments on commit a0501ab

Please sign in to comment.