From 033bf5ba175cc05cf86ed7b24f35da3c1fff9df7 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 4 Aug 2023 12:28:32 +0100 Subject: [PATCH 1/2] move denylist check from loader to the runner verification list --- denylist/src/denylist.rs | 18 ++++- iot_verifier/src/loader.rs | 56 +-------------- iot_verifier/src/main.rs | 2 +- iot_verifier/src/poc.rs | 135 ++++++++++++++++++++++++++++++------- iot_verifier/src/runner.rs | 52 ++++++++++++-- 5 files changed, 175 insertions(+), 88 deletions(-) diff --git a/denylist/src/denylist.rs b/denylist/src/denylist.rs index 4960e271d..35c898151 100644 --- a/denylist/src/denylist.rs +++ b/denylist/src/denylist.rs @@ -1,6 +1,6 @@ use crate::{client::DenyListClient, models::metadata::Asset, Error, Result}; use bytes::Buf; -use helium_crypto::{PublicKey, Verify}; +use helium_crypto::{PublicKey, PublicKeyBinary, Verify}; use serde::Serialize; use std::{fs, hash::Hasher, path, str::FromStr}; use twox_hash::XxHash64; @@ -25,6 +25,20 @@ pub struct DenyList { pub filter: Xor32, } +impl TryFrom> for DenyList { + type Error = Error; + fn try_from(v: Vec) -> Result { + let keys: Vec = v.into_iter().map(public_key_hash).collect(); + let filter = Xor32::from(&keys); + let client = DenyListClient::new()?; + Ok(Self { + tag_name: 0, + client, + filter, + }) + } +} + impl DenyList { pub fn new() -> Result { tracing::debug!("initializing new denylist"); @@ -89,7 +103,7 @@ impl DenyList { Ok(()) } - pub async fn check_key>(&self, pub_key: K) -> bool { + pub fn check_key>(&self, pub_key: K) -> bool { if self.filter.len() == 0 { tracing::warn!("empty denylist filter, rejecting key"); return true; diff --git a/iot_verifier/src/loader.rs b/iot_verifier/src/loader.rs index 42f1a43ca..04ee62584 100644 --- a/iot_verifier/src/loader.rs +++ b/iot_verifier/src/loader.rs @@ -7,7 +7,6 @@ use crate::{ }; use chrono::DateTime; use chrono::{Duration as ChronoDuration, Utc}; -use denylist::DenyList; use file_store::{ iot_beacon_report::IotBeaconIngestReport, iot_witness_report::IotWitnessIngestReport, @@ -17,7 +16,7 @@ use file_store::{ use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::PgPool; -use std::{hash::Hasher, ops::DerefMut, time::Duration}; +use std::{hash::Hasher, ops::DerefMut}; use tokio::{ sync::Mutex, time::{self, MissedTickBehavior}, @@ -34,9 +33,6 @@ pub struct Loader { window_width: ChronoDuration, ingestor_rollup_time: ChronoDuration, max_lookback_age: ChronoDuration, - deny_list_latest_url: String, - deny_list_trigger_interval: Duration, - deny_list: DenyList, } #[derive(thiserror::Error, Debug)] @@ -45,13 +41,10 @@ pub enum NewLoaderError { FileStoreError(#[from] file_store::Error), #[error("db_store error: {0}")] DbStoreError(#[from] db_store::Error), - #[error("denylist error: {0}")] - DenyListError(#[from] denylist::Error), } pub enum ValidGatewayResult { Valid, - Denied, Unknown, } @@ -63,7 +56,6 @@ impl Loader { let window_width = settings.poc_loader_window_width(); let ingestor_rollup_time = settings.ingestor_rollup_time(); let max_lookback_age = settings.loader_window_max_lookback_age(); - let deny_list = DenyList::new()?; Ok(Self { pool, ingest_store, @@ -71,9 +63,6 @@ impl Loader { window_width, ingestor_rollup_time, max_lookback_age, - deny_list_latest_url: settings.denylist.denylist_url.clone(), - deny_list_trigger_interval: settings.denylist.trigger_interval(), - deny_list, }) } @@ -85,21 +74,12 @@ impl Loader { tracing::info!("started verifier loader"); let mut report_timer = time::interval(self.poll_time); report_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut denylist_timer = time::interval(self.deny_list_trigger_interval); - denylist_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { if shutdown.is_triggered() { break; } tokio::select! { _ = shutdown.clone() => break, - _ = denylist_timer.tick() => - match self.handle_denylist_tick().await { - Ok(()) => (), - Err(err) => { - tracing::error!("fatal loader error, denylist_tick triggered: {err:?}"); - } - }, _ = report_timer.tick() => match self.handle_report_tick(gateway_cache).await { Ok(()) => (), Err(err) => { @@ -112,23 +92,6 @@ impl Loader { Ok(()) } - async fn handle_denylist_tick(&mut self) -> anyhow::Result<()> { - tracing::info!("handling denylist tick"); - // sink any errors whilst updating the denylist - // the verifier should not stop just because github - // could not be reached for example - match self - .deny_list - .update_to_latest(&self.deny_list_latest_url) - .await - { - Ok(()) => (), - Err(e) => tracing::warn!("failed to update denylist: {e}"), - } - tracing::info!("completed handling denylist tick"); - Ok(()) - } - async fn handle_report_tick(&self, gateway_cache: &GatewayCache) -> anyhow::Result<()> { tracing::info!("handling report tick"); let now = Utc::now(); @@ -375,11 +338,6 @@ impl Loader { }; Ok(Some(res)) } - ValidGatewayResult::Denied => { - metrics.increment_beacons_denied(); - Ok(None) - } - ValidGatewayResult::Unknown => { metrics.increment_beacons_unknown(); Ok(None) @@ -410,10 +368,6 @@ impl Loader { metrics.increment_witnesses(); Ok(Some(res)) } - ValidGatewayResult::Denied => { - metrics.increment_witnesses_denied(); - Ok(None) - } ValidGatewayResult::Unknown => { metrics.increment_witnesses_unknown(); Ok(None) @@ -445,10 +399,6 @@ impl Loader { pub_key: &PublicKeyBinary, gateway_cache: &GatewayCache, ) -> ValidGatewayResult { - if self.check_gw_denied(pub_key).await { - tracing::debug!("dropping denied gateway : {:?}", &pub_key); - return ValidGatewayResult::Denied; - } if self.check_unknown_gw(pub_key, gateway_cache).await { tracing::debug!("dropping unknown gateway: {:?}", &pub_key); return ValidGatewayResult::Unknown; @@ -463,10 +413,6 @@ impl Loader { ) -> bool { gateway_cache.resolve_gateway_info(pub_key).await.is_err() } - - async fn check_gw_denied(&self, pub_key: &PublicKeyBinary) -> bool { - self.deny_list.check_key(pub_key).await - } } fn filter_key_hash(data: &[u8]) -> u64 { diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index a80f820ef..8ad6e7189 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -164,7 +164,7 @@ impl Server { // init da processes let mut loader = loader::Loader::from_settings(settings, pool.clone()).await?; - let mut runner = runner::Runner::from_settings(settings, pool.clone()).await?; + let mut runner = runner::Runner::new(settings, pool.clone()).await?; let purger = purger::Purger::from_settings(settings, pool.clone()).await?; let mut density_scaler = DensityScaler::from_settings(settings, pool, gateway_updater_receiver.clone()).await?; diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index c823bf51a..7099909d8 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -8,6 +8,7 @@ use crate::{ }; use beacon; use chrono::{DateTime, Duration, Utc}; +use denylist::denylist::DenyList; use file_store::{ iot_beacon_report::{IotBeaconIngestReport, IotBeaconReport}, iot_valid_poc::IotVerifiedWitnessReport, @@ -99,6 +100,7 @@ impl Poc { } } + #[allow(clippy::too_many_arguments)] pub async fn verify_beacon( &mut self, hex_density_map: impl HexDensityMap, @@ -107,6 +109,7 @@ impl Poc { pool: &PgPool, beacon_interval: Duration, beacon_interval_tolerance: Duration, + deny_list: &DenyList, ) -> Result { let beacon = &self.beacon_report.report; let beaconer_pub_key = beacon.pub_key.clone(); @@ -137,6 +140,7 @@ impl Poc { // we have beaconer info, proceed to verifications let last_beacon = LastBeacon::get(pool, beaconer_pub_key.as_ref()).await?; match do_beacon_verifications( + deny_list, self.entropy_start, self.entropy_end, self.entropy_version, @@ -163,6 +167,7 @@ impl Poc { beacon_info: &GatewayInfo, hex_density_map: impl HexDensityMap, gateway_cache: &GatewayCache, + deny_list: &DenyList, ) -> Result { let mut verified_witnesses: Vec = Vec::new(); let mut failed_witnesses: Vec = Vec::new(); @@ -176,6 +181,7 @@ impl Poc { // not a dup, run the verifications match self .verify_witness( + deny_list, &witness_report, beacon_info, gateway_cache, @@ -214,6 +220,7 @@ impl Poc { async fn verify_witness( &mut self, + deny_list: &DenyList, witness_report: &IotWitnessIngestReport, beaconer_info: &GatewayInfo, gateway_cache: &GatewayCache, @@ -268,6 +275,7 @@ impl Poc { }; // run the witness verifications match do_witness_verifications( + deny_list, self.entropy_start, self.entropy_end, witness_report, @@ -304,6 +312,7 @@ impl Poc { #[allow(clippy::too_many_arguments)] pub fn do_beacon_verifications( + deny_list: &DenyList, entropy_start: DateTime, entropy_end: DateTime, entropy_version: i32, @@ -323,6 +332,7 @@ pub fn do_beacon_verifications( Some(ref metadata) => metadata, None => return Err(InvalidReason::NotAsserted), }; + verify_denylist(&beacon_report.report.pub_key, deny_list)?; verify_entropy(entropy_start, entropy_end, beacon_received_ts)?; verify_gw_capability(beaconer_info.is_full_hotspot)?; verify_beacon_schedule( @@ -347,6 +357,7 @@ pub fn do_beacon_verifications( } pub fn do_witness_verifications( + deny_list: &DenyList, entropy_start: DateTime, entropy_end: DateTime, witness_report: &IotWitnessIngestReport, @@ -363,6 +374,7 @@ pub fn do_witness_verifications( Some(ref metadata) => metadata, None => return Err(InvalidReason::NotAsserted), }; + verify_denylist(&witness_report.report.pub_key, deny_list)?; verify_self_witness( &beacon_report.report.pub_key, &witness_report.report.pub_key, @@ -423,6 +435,20 @@ fn verify_beacon_schedule( Ok(()) } +/// verify if gateway is on the deny list +fn verify_denylist(pub_key: &PublicKeyBinary, deny_list: &DenyList) -> GenericVerifyResult { + if deny_list.check_key(pub_key) { + tracing::debug!( + "report verification failed, reason: {:?}. + pubkey: {}", + InvalidReason::Denied, + pub_key, + ); + return Err(InvalidReason::Denied); + } + Ok(()) +} + /// verify remote entropy /// if received timestamp is outside of entopy start/end then return invalid fn verify_entropy( @@ -767,6 +793,7 @@ mod tests { use super::*; use crate::last_beacon::LastBeacon; use chrono::{Duration, TimeZone}; + use denylist::DenyList; use file_store::iot_beacon_report::IotBeaconReport; use file_store::iot_witness_report::IotWitnessReport; use helium_proto::DataRate; @@ -797,6 +824,7 @@ mod tests { const PUBKEY1: &str = "112bUuQaE7j73THS9ABShHGokm46Miip9L361FSyWv7zSYn8hZWf"; const PUBKEY2: &str = "11z69eJ3czc92k6snrfR9ek7g2uRWXosFbnG9v4bXgwhfUCivUo"; + const DENIED_PUBKEY1: &str = "112bUGwooPd1dCDd3h3yZwskjxCzBsQNKeaJTuUF4hSgYedcsFa9"; // hardcode beacon & entropy data taken from a beacon generated on a hotspot const LOCAL_ENTROPY: [u8; 4] = [233, 70, 25, 176]; @@ -884,7 +912,7 @@ mod tests { assert_eq!(POC_DATA.to_vec(), generated_beacon.data); // get a valid a beacon report in the form of an ingested beacon report - let mut ingest_beacon_report = valid_beacon_report(received_ts); + let mut ingest_beacon_report = valid_beacon_report(PUBKEY1, received_ts); // assert the generated beacon report from the ingest report // matches our received report @@ -994,6 +1022,21 @@ mod tests { ); } + #[test] + fn test_verify_denylist() { + let deny_list: DenyList = vec![PublicKeyBinary::from_str(DENIED_PUBKEY1).unwrap()] + .try_into() + .unwrap(); + assert!(verify_denylist(&PublicKeyBinary::from_str(PUBKEY1).unwrap(), &deny_list).is_ok()); + assert_eq!( + Err(InvalidReason::Denied), + verify_denylist( + &PublicKeyBinary::from_str(DENIED_PUBKEY1).unwrap(), + &deny_list + ) + ); + } + #[test] fn test_verify_capability() { assert!(verify_gw_capability(true).is_ok()); @@ -1140,10 +1183,31 @@ mod tests { let entropy_end = entropy_start + Duration::minutes(3); let beacon_interval = Duration::minutes(5); let beacon_interval_tolerance = Duration::seconds(60); + let deny_list: DenyList = vec![PublicKeyBinary::from_str(DENIED_PUBKEY1).unwrap()] + .try_into() + .unwrap(); + + // test deny list verification is active in the beacon validation list + let beacon_report1 = + valid_beacon_report(DENIED_PUBKEY1, entropy_start + Duration::minutes(4)); + let resp1 = do_beacon_verifications( + &deny_list, + entropy_start, + entropy_end, + ENTROPY_VERSION, + None, + &beacon_report1, + &beaconer_info, + &default_region_params(), + beacon_interval, + beacon_interval_tolerance, + ); + assert_eq!(Err(InvalidReason::Denied), resp1); // test entropy lifepsan verification is active in the beacon validation list - let beacon_report1 = valid_beacon_report(entropy_start + Duration::minutes(4)); + let beacon_report1 = valid_beacon_report(PUBKEY1, entropy_start + Duration::minutes(4)); let resp1 = do_beacon_verifications( + &deny_list, entropy_start, entropy_end, ENTROPY_VERSION, @@ -1157,9 +1221,10 @@ mod tests { assert_eq!(Err(InvalidReason::EntropyExpired), resp1); // test location verification is active in the beacon validation list - let beacon_report2 = valid_beacon_report(entropy_start + Duration::minutes(2)); + let beacon_report2 = valid_beacon_report(PUBKEY1, entropy_start + Duration::minutes(2)); let beacon_info2 = beaconer_gateway_info(None, ProtoRegion::Eu868, true); let resp2 = do_beacon_verifications( + &deny_list, entropy_start, entropy_end, ENTROPY_VERSION, @@ -1173,12 +1238,13 @@ mod tests { assert_eq!(Err(InvalidReason::NotAsserted), resp2); // test schedule verification is active in the beacon validation list - let beacon_report3 = valid_beacon_report(entropy_start + Duration::minutes(2)); + let beacon_report3 = valid_beacon_report(PUBKEY1, entropy_start + Duration::minutes(2)); let last_beacon3 = LastBeacon { id: vec![], timestamp: Utc::now() - Duration::minutes(5), }; let resp3 = do_beacon_verifications( + &deny_list, entropy_start, entropy_end, ENTROPY_VERSION, @@ -1192,9 +1258,10 @@ mod tests { assert_eq!(Err(InvalidReason::IrregularInterval), resp3); // test capability verification is active in the beacon validation list - let beacon_report4 = valid_beacon_report(entropy_start + Duration::minutes(2)); + let beacon_report4 = valid_beacon_report(PUBKEY1, entropy_start + Duration::minutes(2)); let beacon_info4 = beaconer_gateway_info(Some(LOC0), ProtoRegion::Eu868, false); let resp4 = do_beacon_verifications( + &deny_list, entropy_start, entropy_end, ENTROPY_VERSION, @@ -1210,6 +1277,7 @@ mod tests { // test beacon construction verification is active in the beacon validation list let beacon_report5 = invalid_beacon_bad_payload(entropy_start + Duration::minutes(2)); let resp5 = do_beacon_verifications( + &deny_list, entropy_start, entropy_end, ENTROPY_VERSION, @@ -1223,8 +1291,9 @@ mod tests { assert_eq!(Err(InvalidReason::InvalidPacket), resp5); // for completeness, confirm our valid beacon report is sane - let beacon_report6 = valid_beacon_report(entropy_start + Duration::minutes(2)); + let beacon_report6 = valid_beacon_report(PUBKEY1, entropy_start + Duration::minutes(2)); let resp6 = do_beacon_verifications( + &deny_list, entropy_start, entropy_end, ENTROPY_VERSION, @@ -1249,7 +1318,7 @@ mod tests { // from `do_witness_verifications` // create default data structs - let beacon_report = valid_beacon_report(Utc::now() - Duration::minutes(2)); + let beacon_report = valid_beacon_report(PUBKEY1, Utc::now() - Duration::minutes(2)); let beaconer_info = beaconer_gateway_info(Some(LOC0), ProtoRegion::Eu868, true); let beaconer_metadata = beaconer_info .metadata @@ -1257,10 +1326,14 @@ mod tests { let witness_info = witness_gateway_info(Some(LOC4), ProtoRegion::Eu868, true); let entropy_start = Utc.timestamp_millis_opt(1676381847900).unwrap(); let entropy_end = entropy_start + Duration::minutes(3); + let deny_list: DenyList = vec![PublicKeyBinary::from_str(DENIED_PUBKEY1).unwrap()] + .try_into() + .unwrap(); // test self witness verification is active in the witness validation list let witness_report1 = invalid_witness_self_witness(entropy_start + Duration::minutes(2)); let resp1 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report1, @@ -1271,8 +1344,9 @@ mod tests { assert_eq!(Err(InvalidReason::SelfWitness), resp1); // test entropy lifepsan verification is active in the witness validation list - let witness_report2 = valid_witness_report(entropy_start + Duration::minutes(5)); + let witness_report2 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(5)); let resp2 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report2, @@ -1285,6 +1359,7 @@ mod tests { // test witness packet data verification is active in the witness validation list let witness_report3 = invalid_witness_bad_data(entropy_start + Duration::minutes(2)); let resp3 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report3, @@ -1295,9 +1370,10 @@ mod tests { assert_eq!(Err(InvalidReason::InvalidPacket), resp3); // test location verification is active in the witness validation list - let witness_report4 = valid_witness_report(entropy_start + Duration::minutes(2)); + let witness_report4 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(2)); let witness_info4 = witness_gateway_info(None, ProtoRegion::Eu868, true); let resp4 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report4, @@ -1310,6 +1386,7 @@ mod tests { // test witness frequency verification is active in the witness validation list let witness_report5 = invalid_witness_bad_freq(entropy_start + Duration::minutes(2)); let resp5 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report5, @@ -1320,9 +1397,10 @@ mod tests { assert_eq!(Err(InvalidReason::InvalidFrequency), resp5); // test witness region verification is active in the witness validation list - let witness_report6 = valid_witness_report(entropy_start + Duration::minutes(2)); + let witness_report6 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(2)); let witness_info6 = witness_gateway_info(Some(LOC1), ProtoRegion::Us915, true); let resp6 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report6, @@ -1333,9 +1411,10 @@ mod tests { assert_eq!(Err(InvalidReason::InvalidRegion), resp6); // test witness min cell distance verification is active in the witness validation list - let witness_report7 = valid_witness_report(entropy_start + Duration::minutes(2)); + let witness_report7 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(2)); let witness_info7 = witness_gateway_info(Some(LOC3), ProtoRegion::Eu868, true); let resp7 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report7, @@ -1346,9 +1425,10 @@ mod tests { assert_eq!(Err(InvalidReason::BelowMinDistance), resp7); // test witness max distance verification is active in the witness validation list - let witness_report8 = valid_witness_report(entropy_start + Duration::minutes(2)); + let witness_report8 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(2)); let witness_info8 = witness_gateway_info(Some(LOC2), ProtoRegion::Eu868, true); let resp8 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report8, @@ -1361,6 +1441,7 @@ mod tests { // test witness rssi verification is active in the witness validation list let witness_report9 = invalid_witness_bad_rssi(entropy_start + Duration::minutes(2)); let resp9 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report9, @@ -1371,9 +1452,10 @@ mod tests { assert_eq!(Err(InvalidReason::BadRssi), resp9); // test witness capability verification is active in the witness validation list - let witness_report10 = valid_witness_report(entropy_start + Duration::minutes(2)); + let witness_report10 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(2)); let witness_info10 = witness_gateway_info(Some(LOC4), ProtoRegion::Eu868, false); let resp10 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report10, @@ -1384,9 +1466,10 @@ mod tests { assert_eq!(Err(InvalidReason::InvalidCapability), resp10); // for completeness, confirm our valid witness report is sane - let witness_report11 = valid_witness_report(entropy_start + Duration::minutes(2)); + let witness_report11 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(2)); let witness_info11 = witness_gateway_info(Some(LOC4), ProtoRegion::Eu868, true); let resp11 = do_witness_verifications( + &deny_list, entropy_start, entropy_end, &witness_report11, @@ -1443,10 +1526,13 @@ mod tests { region_params.params } - fn valid_beacon_report(received_timestamp: DateTime) -> IotBeaconIngestReport { + fn valid_beacon_report( + pubkey: &str, + received_timestamp: DateTime, + ) -> IotBeaconIngestReport { beacon_report_to_ingest_report( IotBeaconReport { - pub_key: PublicKeyBinary::from_str(PUBKEY1).unwrap(), + pub_key: PublicKeyBinary::from_str(pubkey).unwrap(), local_entropy: LOCAL_ENTROPY.to_vec(), remote_entropy: REMOTE_ENTROPY.to_vec(), data: POC_DATA.to_vec(), @@ -1463,7 +1549,7 @@ mod tests { } fn invalid_beacon_bad_payload(received_timestamp: DateTime) -> IotBeaconIngestReport { - let mut report = valid_beacon_report(received_timestamp); + let mut report = valid_beacon_report(PUBKEY1, received_timestamp); report.report.data = [ 138, 139, 152, 130, 179, 215, 179, 238, 75, 167, 66, 182, 209, 87, 168, 137, 192, ] @@ -1471,10 +1557,13 @@ mod tests { report } - fn valid_witness_report(received_timestamp: DateTime) -> IotWitnessIngestReport { + fn valid_witness_report( + pubkey: &str, + received_timestamp: DateTime, + ) -> IotWitnessIngestReport { witness_report_to_ingest_report( IotWitnessReport { - pub_key: PublicKeyBinary::from_str(PUBKEY2).unwrap(), + pub_key: PublicKeyBinary::from_str(pubkey).unwrap(), data: POC_DATA.to_vec(), timestamp: Utc::now(), tmst: 0, @@ -1491,12 +1580,12 @@ mod tests { // each invalid below overrides a field of a valid report which // will render it invalid in the desired way fn invalid_witness_self_witness(received_timestamp: DateTime) -> IotWitnessIngestReport { - let mut report = valid_witness_report(received_timestamp); + let mut report = valid_witness_report(PUBKEY2, received_timestamp); report.report.pub_key = PublicKeyBinary::from_str(PUBKEY1).unwrap(); report } fn invalid_witness_bad_data(received_timestamp: DateTime) -> IotWitnessIngestReport { - let mut report = valid_witness_report(received_timestamp); + let mut report = valid_witness_report(PUBKEY2, received_timestamp); report.report.data = [ 138, 139, 152, 130, 179, 215, 179, 238, 75, 167, 66, 182, 209, 87, 168, 137, 192, ] @@ -1504,12 +1593,12 @@ mod tests { report } fn invalid_witness_bad_freq(received_timestamp: DateTime) -> IotWitnessIngestReport { - let mut report = valid_witness_report(received_timestamp); + let mut report = valid_witness_report(PUBKEY2, received_timestamp); report.report.frequency = 867100000; report } fn invalid_witness_bad_rssi(received_timestamp: DateTime) -> IotWitnessIngestReport { - let mut report = valid_witness_report(received_timestamp); + let mut report = valid_witness_report(PUBKEY2, received_timestamp); report.report.signal = 300; report } diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 3f94661ae..1fe336fca 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -4,6 +4,7 @@ use crate::{ Settings, }; use chrono::{Duration as ChronoDuration, Utc}; +use denylist::DenyList; use file_store::{ file_sink, file_sink::FileSinkClient, @@ -23,7 +24,7 @@ use helium_proto::services::poc_lora::{ use rust_decimal::{Decimal, MathematicalOps}; use rust_decimal_macros::dec; use sqlx::PgPool; -use std::path::Path; +use std::{path::Path, time::Duration}; use tokio::time::{self, MissedTickBehavior}; /// the cadence in seconds at which the DB is polled for ready POCs @@ -42,12 +43,11 @@ pub struct Runner { max_witnesses_per_poc: u64, beacon_max_retries: u64, witness_max_retries: u64, + deny_list_latest_url: String, + deny_list_trigger_interval: Duration, + deny_list: DenyList, } -#[derive(thiserror::Error, Debug)] -#[error("error creating runner: {0}")] -pub struct NewRunnerError(#[from] db_store::Error); - #[derive(thiserror::Error, Debug)] pub enum RunnerError { #[error("not found: {0}")] @@ -58,14 +58,16 @@ pub enum FilterStatus { Drop, Include, } + impl Runner { - pub async fn from_settings(settings: &Settings, pool: PgPool) -> Result { + pub async fn new(settings: &Settings, pool: PgPool) -> anyhow::Result { let cache = settings.cache.clone(); let beacon_interval = settings.beacon_interval(); let beacon_interval_tolerance = settings.beacon_interval_tolerance(); let max_witnesses_per_poc = settings.max_witnesses_per_poc; let beacon_max_retries = settings.beacon_max_retries; let witness_max_retries = settings.witness_max_retries; + let deny_list = DenyList::new()?; Ok(Self { pool, cache, @@ -74,6 +76,9 @@ impl Runner { max_witnesses_per_poc, beacon_max_retries, witness_max_retries, + deny_list_latest_url: settings.denylist.denylist_url.clone(), + deny_list_trigger_interval: settings.denylist.trigger_interval(), + deny_list, }) } @@ -90,6 +95,9 @@ impl Runner { let mut db_timer = time::interval(DB_POLL_TIME); db_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut denylist_timer = time::interval(self.deny_list_trigger_interval); + denylist_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); + let store_base_path = Path::new(&self.cache); let (iot_invalid_beacon_sink, mut iot_invalid_beacon_sink_server) = @@ -137,6 +145,13 @@ impl Runner { } tokio::select! { _ = shutdown.clone() => break, + _ = denylist_timer.tick() => + match self.handle_denylist_tick().await { + Ok(()) => (), + Err(err) => { + tracing::error!("fatal loader error, denylist_tick triggered: {err:?}"); + } + }, _ = db_timer.tick() => match self.handle_db_tick( shutdown.clone(), &iot_invalid_beacon_sink, @@ -156,6 +171,23 @@ impl Runner { Ok(()) } + async fn handle_denylist_tick(&mut self) -> anyhow::Result<()> { + tracing::info!("handling denylist tick"); + // sink any errors whilst updating the denylist + // the verifier should not stop just because github + // could not be reached for example + match self + .deny_list + .update_to_latest(&self.deny_list_latest_url) + .await + { + Ok(()) => (), + Err(e) => tracing::warn!("failed to update denylist: {e}"), + } + tracing::info!("completed handling denylist tick"); + Ok(()) + } + #[allow(clippy::too_many_arguments)] async fn handle_db_tick( &self, @@ -270,6 +302,7 @@ impl Runner { &self.pool, self.beacon_interval, self.beacon_interval_tolerance, + &self.deny_list, ) .await?; match beacon_verify_result.result { @@ -277,7 +310,12 @@ impl Runner { // beacon is valid, verify the POC witnesses if let Some(beacon_info) = beacon_verify_result.gateway_info { let verified_witnesses_result = poc - .verify_witnesses(&beacon_info, hex_density_map, gateway_cache) + .verify_witnesses( + &beacon_info, + hex_density_map, + gateway_cache, + &self.deny_list, + ) .await?; // check if there are any failed witnesses // if so update the DB attempts count From 965f619dfcd3057df98dc3beb2052224f99efb80 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 4 Aug 2023 16:13:00 +0100 Subject: [PATCH 2/2] enrich invalid beacon reports with gateway metadata --- Cargo.lock | 4 ++-- file_store/src/iot_invalid_poc.rs | 12 ++++++++++ iot_verifier/src/purger.rs | 3 +++ iot_verifier/src/runner.rs | 37 ++++++++++++++++++++++--------- 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7297ba324..f727a1c07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1118,7 +1118,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#d86b3e3394e8d9f014fcef3ee08740b3fe269e99" +source = "git+https://github.com/helium/proto?branch=master#7d85f190b207a3ac8d6ad081e2f70e45eecd1a3a" dependencies = [ "base64 0.21.0", "byteorder", @@ -2944,7 +2944,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#d86b3e3394e8d9f014fcef3ee08740b3fe269e99" +source = "git+https://github.com/helium/proto?branch=master#7d85f190b207a3ac8d6ad081e2f70e45eecd1a3a" dependencies = [ "bytes", "prost", diff --git a/file_store/src/iot_invalid_poc.rs b/file_store/src/iot_invalid_poc.rs index 74d527260..d137ba516 100644 --- a/file_store/src/iot_invalid_poc.rs +++ b/file_store/src/iot_invalid_poc.rs @@ -17,6 +17,9 @@ pub struct IotInvalidBeaconReport { pub received_timestamp: DateTime, pub reason: InvalidReason, pub report: IotBeaconReport, + pub location: Option, + pub gain: i32, + pub elevation: i32, } #[derive(Serialize, Clone)] @@ -75,6 +78,9 @@ impl TryFrom for IotInvalidBeaconReport { .report .ok_or_else(|| Error::not_found("iot invalid beacon report v1"))? .try_into()?, + location: v.location.parse().ok(), + gain: v.gain, + elevation: v.elevation, }) } } @@ -87,6 +93,12 @@ impl From for LoraInvalidBeaconReportV1 { received_timestamp, reason: v.reason as i32, report: Some(report), + location: v + .location + .map(|l| l.to_string()) + .unwrap_or_else(String::new), + gain: v.gain, + elevation: v.elevation, } } } diff --git a/iot_verifier/src/purger.rs b/iot_verifier/src/purger.rs index b565fbef4..31dfd445b 100644 --- a/iot_verifier/src/purger.rs +++ b/iot_verifier/src/purger.rs @@ -198,6 +198,9 @@ impl Purger { received_timestamp, reason: InvalidReason::Stale, report: beacon.clone(), + location: None, + gain: 0, + elevation: 0, } .into(); diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 1fe336fca..169d81d68 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -1,7 +1,12 @@ use crate::{ - gateway_cache::GatewayCache, hex_density::HexDensityMap, last_beacon::LastBeacon, poc::Poc, - poc_report::Report, region_cache::RegionCache, reward_share::GatewayPocShare, telemetry, - Settings, + gateway_cache::GatewayCache, + hex_density::HexDensityMap, + last_beacon::LastBeacon, + poc::{Poc, VerifyBeaconResult}, + poc_report::Report, + region_cache::RegionCache, + reward_share::GatewayPocShare, + telemetry, Settings, }; use chrono::{Duration as ChronoDuration, Utc}; use denylist::DenyList; @@ -149,7 +154,7 @@ impl Runner { match self.handle_denylist_tick().await { Ok(()) => (), Err(err) => { - tracing::error!("fatal loader error, denylist_tick triggered: {err:?}"); + tracing::error!("error whilst handling denylist tick: {err:?}"); } }, _ = db_timer.tick() => @@ -404,9 +409,9 @@ impl Runner { VerificationStatus::Invalid => { // the beacon is invalid, which in turn renders all witnesses invalid self.handle_invalid_poc( + beacon_verify_result, &beacon_report, witnesses, - beacon_verify_result.invalid_reason, iot_invalid_beacon_sink, iot_invalid_witness_sink, ) @@ -418,9 +423,9 @@ impl Runner { async fn handle_invalid_poc( &self, + beacon_verify_result: VerifyBeaconResult, beacon_report: &IotBeaconIngestReport, witness_reports: Vec, - invalid_reason: InvalidReason, iot_invalid_beacon_sink: &FileSinkClient, iot_invalid_witness_sink: &FileSinkClient, ) -> anyhow::Result<()> { @@ -428,10 +433,22 @@ impl Runner { let beacon = &beacon_report.report; let beacon_id = beacon.data.clone(); let beacon_report_id = beacon_report.ingest_id(); + + let (location, elevation, gain) = match beacon_verify_result.gateway_info { + Some(gateway_info) => match gateway_info.metadata { + Some(metadata) => (Some(metadata.location), metadata.elevation, metadata.gain), + None => (None, 0, 0), + }, + None => (None, 0, 0), + }; + let invalid_poc: IotInvalidBeaconReport = IotInvalidBeaconReport { received_timestamp: beacon_report.received_timestamp, - reason: invalid_reason, + reason: beacon_verify_result.invalid_reason, report: beacon.clone(), + location, + elevation, + gain, }; let invalid_poc_proto: LoraInvalidBeaconReportV1 = invalid_poc.into(); // save invalid poc to s3, if write fails update attempts and go no further @@ -439,7 +456,7 @@ impl Runner { match iot_invalid_beacon_sink .write( invalid_poc_proto, - &[("reason", invalid_reason.as_str_name())], + &[("reason", beacon_verify_result.invalid_reason.as_str_name())], ) .await { @@ -459,7 +476,7 @@ impl Runner { let invalid_witness_report: IotInvalidWitnessReport = IotInvalidWitnessReport { received_timestamp: witness_report.received_timestamp, report: witness_report.report, - reason: invalid_reason, + reason: beacon_verify_result.invalid_reason, participant_side: InvalidParticipantSide::Beaconer, }; let invalid_witness_report_proto: LoraInvalidWitnessReportV1 = @@ -467,7 +484,7 @@ impl Runner { match iot_invalid_witness_sink .write( invalid_witness_report_proto, - &[("reason", invalid_reason.as_str_name())], + &[("reason", beacon_verify_result.invalid_reason.as_str_name())], ) .await {