diff --git a/Cargo.lock b/Cargo.lock index d71279090..4081a3320 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,7 +1617,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=bbalser/hip-125#aa27cf79e0ed886bb138253263297a36741f3975" +source = "git+https://github.com/helium/proto?branch=bbalser/hip-125#64813bee906724e9ac3abccb2ffcce4bd677f6a6" dependencies = [ "base64 0.21.7", "byteorder", @@ -3801,7 +3801,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=bbalser/hip-125#aa27cf79e0ed886bb138253263297a36741f3975" +source = "git+https://github.com/helium/proto?branch=bbalser/hip-125#64813bee906724e9ac3abccb2ffcce4bd677f6a6" dependencies = [ "bytes", "prost", @@ -9906,7 +9906,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 7addfae61..4eaab2d0e 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -158,6 +158,8 @@ pub const URBANIZATION_DATA_SET: &str = "urbanization"; pub const FOOTFALL_DATA_SET: &str = "footfall"; pub const LANDTYPE_DATA_SET: &str = "landtype"; pub const SP_BOOSTED_REWARDS_BANNED_RADIO: &str = "service_provider_boosted_rewards_banned_radio"; +pub const VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO: &str = + "verified_service_provider_boosted_rewards_banned_radio"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -211,6 +213,7 @@ pub enum FileType { InvalidatedRadioThresholdIngestReport, VerifiedInvalidatedRadioThresholdIngestReport, ServiceProviderBoostedRewardsBannedRadioIngestReport, + VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport, } impl fmt::Display for FileType { @@ -275,6 +278,9 @@ impl fmt::Display for FileType { Self::ServiceProviderBoostedRewardsBannedRadioIngestReport => { SP_BOOSTED_REWARDS_BANNED_RADIO } + Self::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport => { + VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO + } }; f.write_str(s) } @@ -342,6 +348,9 @@ impl FileType { Self::ServiceProviderBoostedRewardsBannedRadioIngestReport => { SP_BOOSTED_REWARDS_BANNED_RADIO } + Self::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport => { + VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO + } } } } @@ -409,6 +418,9 @@ impl FromStr for FileType { SP_BOOSTED_REWARDS_BANNED_RADIO => { Self::ServiceProviderBoostedRewardsBannedRadioIngestReport } + VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO => { + Self::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport + } _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index a7bf6950a..7d80183bf 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -277,10 +277,10 @@ where } } -pub struct ProtoFileInfoPollerParser; +pub struct MsgDecodeFileInfoPollerParser; #[async_trait::async_trait] -impl FileInfoPollerParser for ProtoFileInfoPollerParser +impl FileInfoPollerParser for MsgDecodeFileInfoPollerParser where T: MsgDecode + TryFrom + Send + Sync + 'static, { @@ -312,6 +312,41 @@ where } } +pub struct ProstFileInfoPollerParser; + +#[async_trait::async_trait] +impl FileInfoPollerParser for ProstFileInfoPollerParser +where + T: helium_proto::Message + Default, +{ + async fn parse(&self, byte_stream: ByteStream) -> Result> { + Ok(file_store::stream_source(byte_stream) + .filter_map(|msg| async { + msg.map_err(|err| { + tracing::error!( + "Error streaming entry in file of type {}: {err:?}", + std::any::type_name::() + ); + err + }) + .ok() + }) + .filter_map(|msg| async { + ::decode(msg) + .map_err(|err| { + tracing::error!( + "Error in decoding message of type {}: {err:?}", + std::any::type_name::() + ); + err + }) + .ok() + }) + .collect() + .await) + } +} + fn create_cache() -> MemoryFileCache { Arc::new(Cache::new()) } diff --git a/file_store/src/file_source.rs b/file_store/src/file_source.rs index 8290d75ab..ff77f1162 100644 --- a/file_store/src/file_source.rs +++ b/file_store/src/file_source.rs @@ -1,5 +1,5 @@ use crate::{ - file_info_poller::{FileInfoPollerConfigBuilder, ProtoFileInfoPollerParser}, + file_info_poller::{FileInfoPollerConfigBuilder, MsgDecodeFileInfoPollerParser}, file_sink, BytesMutStream, Error, }; use async_compression::tokio::bufread::GzipDecoder; @@ -11,12 +11,12 @@ use std::path::{Path, PathBuf}; use tokio::{fs::File, io::BufReader}; use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead}; -pub fn continuous_source() -> FileInfoPollerConfigBuilder +pub fn continuous_source() -> FileInfoPollerConfigBuilder where T: Clone, { - FileInfoPollerConfigBuilder::::default() - .parser(ProtoFileInfoPollerParser) + FileInfoPollerConfigBuilder::::default() + .parser(MsgDecodeFileInfoPollerParser) } pub fn source(paths: I) -> BytesMutStream diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index d7577e67e..66ab76973 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -22,7 +22,6 @@ pub mod mobile_subscriber; pub mod mobile_transfer; pub mod reward_manifest; mod settings; -pub mod sp_boosted_rewards_bans; pub mod speedtest; pub mod traits; pub mod wifi_heartbeat; diff --git a/file_store/src/sp_boosted_rewards_bans.rs b/file_store/src/sp_boosted_rewards_bans.rs deleted file mode 100644 index 06eb9e3bb..000000000 --- a/file_store/src/sp_boosted_rewards_bans.rs +++ /dev/null @@ -1,78 +0,0 @@ -use chrono::{DateTime, TimeZone, Utc}; -use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile as proto; -use serde::{Deserialize, Serialize}; - -use crate::{error::DecodeError, traits::MsgDecode, Error}; - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ServiceProviderBoostedRewardBannedRadio { - pub pubkey: PublicKeyBinary, - pub key_type: KeyType, - pub reason: proto::service_provider_boosted_rewards_banned_radio_req_v1::SpBoostedRewardsBannedRadioReason, - pub until: DateTime, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum KeyType { - CbsdId(String), - HotspotKey(PublicKeyBinary), -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ServiceProviderBoostedRewardBannedRadioIngest { - pub received_timestamp: DateTime, - pub report: ServiceProviderBoostedRewardBannedRadio, -} - -impl MsgDecode for ServiceProviderBoostedRewardBannedRadio { - type Msg = proto::ServiceProviderBoostedRewardsBannedRadioReqV1; -} - -impl MsgDecode for ServiceProviderBoostedRewardBannedRadioIngest { - type Msg = proto::ServiceProviderBoostedRewardsBannedRadioIngestReportV1; -} - -impl TryFrom - for ServiceProviderBoostedRewardBannedRadio -{ - type Error = Error; - - fn try_from( - value: proto::ServiceProviderBoostedRewardsBannedRadioReqV1, - ) -> Result { - let reason = value.reason(); - - Ok(Self { - pubkey: value.pub_key.into(), - key_type: match value.key_type { - Some(proto::service_provider_boosted_rewards_banned_radio_req_v1::KeyType::CbsdId(cbsd_id)) => KeyType::CbsdId(cbsd_id), - Some(proto::service_provider_boosted_rewards_banned_radio_req_v1::KeyType::HotspotKey(bytes)) => KeyType::HotspotKey(bytes.into()), - None => return Err(Error::NotFound("key_type".to_string())), - }, - reason, - until: Utc.timestamp_opt(value.until as i64, 0).single().ok_or_else(|| DecodeError::invalid_timestamp(value.until))?, - }) - } -} - -impl TryFrom - for ServiceProviderBoostedRewardBannedRadioIngest -{ - type Error = Error; - - fn try_from( - value: proto::ServiceProviderBoostedRewardsBannedRadioIngestReportV1, - ) -> Result { - Ok(Self { - received_timestamp: Utc - .timestamp_millis_opt(value.received_timestamp as i64) - .single() - .ok_or_else(|| DecodeError::invalid_timestamp(value.received_timestamp))?, - report: value - .report - .ok_or_else(|| Error::not_found("report not found"))? - .try_into()?, - }) - } -} diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 7a90a8c21..dd32b1a77 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -186,9 +186,10 @@ impl Cmd { .add_task( ServiceProviderBoostedRewardsBanIngestor::create_managed_task( pool.clone(), + file_upload.clone(), report_ingest, auth_client, - settings.start_after, + settings, ) .await?, ) diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index 21bba4fdd..ea2fa854d 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -1,22 +1,77 @@ -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use file_store::{ - file_info_poller::{FileInfoStream, LookbackBehavior}, - file_source, - sp_boosted_rewards_bans::{KeyType, ServiceProviderBoostedRewardBannedRadioIngest}, + file_info_poller::{ + FileInfoPollerConfigBuilder, FileInfoStream, LookbackBehavior, ProstFileInfoPollerParser, + }, + file_sink::{self, FileSinkClient}, + file_upload::FileUpload, FileStore, FileType, }; use futures::{prelude::future::LocalBoxFuture, StreamExt, TryFutureExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; -use helium_proto::services::mobile_config::NetworkKeyRole; +use helium_proto::services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{ + service_provider_boosted_rewards_banned_radio_req_v1::KeyType, + ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + ServiceProviderBoostedRewardsBannedRadioVerificationStatus, + VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1, + }, +}; use mobile_config::client::authorization_client::AuthorizationVerifier; use sqlx::{PgPool, Postgres, Transaction}; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; +use crate::Settings; + +struct BannedRadioReport { + received_timestamp: DateTime, + pubkey: PublicKeyBinary, + radio_type: String, + radio_key: String, + until: DateTime, +} + +impl TryFrom for BannedRadioReport { + type Error = anyhow::Error; + + fn try_from( + value: ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + ) -> Result { + let report = value + .report + .ok_or_else(|| anyhow::anyhow!("invalid ingest report"))?; + + let (radio_type, radio_key) = match report.key_type { + Some(KeyType::CbsdId(cbsd_id)) => ("cbrs", cbsd_id), + Some(KeyType::HotspotKey(bytes)) => ("wifi", PublicKeyBinary::from(bytes).to_string()), + None => anyhow::bail!("Invalid keytype"), + }; + + Ok(Self { + received_timestamp: Utc + .timestamp_millis_opt(value.received_timestamp as i64) + .single() + .ok_or_else(|| { + anyhow::anyhow!("invalid received timestamp, {}", value.received_timestamp) + })?, + pubkey: report.pub_key.into(), + radio_type: radio_type.to_string(), + radio_key, + until: Utc + .timestamp_opt(report.until as i64, 0) + .single() + .ok_or_else(|| anyhow::anyhow!("invalid until: {}", report.until))?, + }) + } +} + pub struct ServiceProviderBoostedRewardsBanIngestor { pool: PgPool, authorization_verifier: AV, - receiver: Receiver>, + receiver: Receiver>, + verified_sink: FileSinkClient, } impl ManagedTask for ServiceProviderBoostedRewardsBanIngestor @@ -44,26 +99,43 @@ where { pub async fn create_managed_task( pool: PgPool, + file_upload: FileUpload, file_store: FileStore, authorization_verifier: AV, - start_after: DateTime, + settings: &Settings, ) -> anyhow::Result { - let (receiver, ingest_server) = - file_source::continuous_source::() - .state(pool.clone()) - .store(file_store) - .lookback(LookbackBehavior::StartAfter(start_after)) - .prefix(FileType::ServiceProviderBoostedRewardsBannedRadioIngestReport.to_string()) - .create() - .await?; + let (verified_sink, verified_sink_server) = file_sink::FileSinkBuilder::new( + FileType::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport, + settings.store_base_path(), + file_upload, + concat!(env!("CARGO_PKG_NAME"), "_verified_sp_boosted_rewards_ban"), + ) + .auto_commit(false) + .create() + .await?; + + let (receiver, ingest_server) = FileInfoPollerConfigBuilder::< + ServiceProviderBoostedRewardsBannedRadioIngestReportV1, + _, + _, + >::default() + .parser(ProstFileInfoPollerParser) + .state(pool.clone()) + .store(file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .prefix(FileType::ServiceProviderBoostedRewardsBannedRadioIngestReport.to_string()) + .create() + .await?; let ingestor = Self { pool, authorization_verifier, receiver, + verified_sink, }; Ok(TaskManager::builder() + .add_task(verified_sink_server) .add_task(ingest_server) .add_task(ingestor) .build()) @@ -87,7 +159,7 @@ where async fn process_file( &self, - file_info_stream: FileInfoStream, + file_info_stream: FileInfoStream, ) -> anyhow::Result<()> { tracing::info!(file = %file_info_stream.file_info.key, "processing sp boosted rewards ban file"); let mut transaction = self.pool.begin().await?; @@ -96,16 +168,35 @@ where .await? .map(anyhow::Ok) .try_fold(transaction, |mut tx, ingest| async move { - if self.is_authorized(&ingest.report.pubkey).await? { - save(&mut tx, ingest).await?; + let report = BannedRadioReport::try_from(ingest.clone())?; + let is_authorized = self.is_authorized(&report.pubkey).await?; + + if is_authorized { + save(&mut tx, report).await?; } + let status = match is_authorized { + true => ServiceProviderBoostedRewardsBannedRadioVerificationStatus::SpBoostedRewardsBanValid, + false => ServiceProviderBoostedRewardsBannedRadioVerificationStatus::SpBoostedRewardsBanInvalidCarrierKey, + }; + + let verified_report = + VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1 { + report: Some(ingest), + status: status.into(), + timestamp: Utc::now().timestamp_millis() as u64, + }; + + self.verified_sink.write(verified_report, &[("status", status.as_str_name())]).await?; + Ok(tx) }) .await? .commit() .await?; + self.verified_sink.commit().await?; + Ok(()) } @@ -119,23 +210,18 @@ where async fn save( transaction: &mut Transaction<'_, Postgres>, - ingest: ServiceProviderBoostedRewardBannedRadioIngest, + report: BannedRadioReport, ) -> anyhow::Result<()> { - let (radio_type, radio_key) = match ingest.report.key_type { - KeyType::CbsdId(cbsd_id) => ("cbrs", cbsd_id), - KeyType::HotspotKey(pubkey) => ("wifi", pubkey.to_string()), - }; - sqlx::query( r#" INSERT INTO sp_boosted_rewards_bans(radio_type, radio_key, received_timestamp, until) VALUES($1,$2,$3,$4) "#, ) - .bind(radio_type) - .bind(radio_key) - .bind(ingest.received_timestamp) - .bind(ingest.report.until) + .bind(report.radio_type) + .bind(report.radio_key) + .bind(report.received_timestamp) + .bind(report.until) .execute(transaction) .await .map(|_| ())