diff --git a/Cargo.lock b/Cargo.lock index e8c10563d..5689a27f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,7 +1617,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=macpie/radio_location_estimates#571c0781bf2c06db375a3ff9f837cc94fd11053d" +source = "git+https://github.com/helium/proto?branch=macpie/radio_location_estimates#4cefc5d337a4dddebd3fa3896b3c29f12e86f1ef" dependencies = [ "base64 0.21.7", "byteorder", @@ -3882,7 +3882,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=macpie/radio_location_estimates#571c0781bf2c06db375a3ff9f837cc94fd11053d" +source = "git+https://github.com/helium/proto?branch=macpie/radio_location_estimates#4cefc5d337a4dddebd3fa3896b3c29f12e86f1ef" dependencies = [ "bytes", "prost", diff --git a/file_store/src/radio_location_estimates.rs b/file_store/src/radio_location_estimates.rs index 3b9eaf547..25fc6d518 100644 --- a/file_store/src/radio_location_estimates.rs +++ b/file_store/src/radio_location_estimates.rs @@ -5,14 +5,52 @@ use crate::{ use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ - RadioLocationEstimateV1, RadioLocationEstimatesReqV1, RleEventV1, + self as proto, RadioLocationEstimateV1, RadioLocationEstimatesReqV1, RleEventV1, }; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum Entity { + CbrsId(String), + WifiPubKey(PublicKeyBinary), +} + +impl fmt::Display for Entity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Entity::CbrsId(id) => write!(f, "{}", id), + Entity::WifiPubKey(pub_key) => write!(f, "{}", pub_key), + } + } +} + +impl From for Entity { + fn from(entity: proto::radio_location_estimates_req_v1::Entity) -> Self { + match entity { + proto::radio_location_estimates_req_v1::Entity::CbrsId(v) => Entity::CbrsId(v), + proto::radio_location_estimates_req_v1::Entity::WifiPubKey(k) => { + Entity::WifiPubKey(k.into()) + } + } + } +} + +impl From for proto::radio_location_estimates_req_v1::Entity { + fn from(entity: Entity) -> Self { + match entity { + Entity::CbrsId(v) => proto::radio_location_estimates_req_v1::Entity::CbrsId(v), + Entity::WifiPubKey(k) => { + proto::radio_location_estimates_req_v1::Entity::WifiPubKey(k.into()) + } + } + } +} #[derive(Clone, Deserialize, Serialize, Debug, PartialEq)] pub struct RadioLocationEstimatesReq { - pub radio_id: String, + pub entity: Entity, pub estimates: Vec, pub timestamp: DateTime, pub carrier_key: PublicKeyBinary, @@ -38,7 +76,7 @@ impl From for RadioLocationEstimatesReqV1 { fn from(rle: RadioLocationEstimatesReq) -> Self { let timestamp = rle.timestamp(); RadioLocationEstimatesReqV1 { - radio_id: rle.radio_id, + entity: Some(rle.entity.into()), estimates: rle.estimates.into_iter().map(|e| e.into()).collect(), timestamp, carrier_key: rle.carrier_key.into(), @@ -52,7 +90,11 @@ impl TryFrom for RadioLocationEstimatesReq { fn try_from(req: RadioLocationEstimatesReqV1) -> Result { let timestamp = req.timestamp()?; Ok(Self { - radio_id: req.radio_id, + entity: if let Some(entity) = req.entity { + entity.into() + } else { + return Err(Error::NotFound("entity".to_string())); + }, estimates: req .estimates .into_iter() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index f36e0bb38..bd5e0b51b 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -1,12 +1,12 @@ use anyhow::bail; use backon::{ExponentialBuilder, Retryable}; use file_store::file_sink::FileSinkClient; -use helium_crypto::{KeyTag, Keypair, Network, Sign}; +use helium_crypto::{KeyTag, Keypair, Network, PublicKey, Sign}; use helium_proto::services::poc_mobile::{ - Client as PocMobileClient, RadioLocationEstimateV1, RadioLocationEstimatesIngestReportV1, - RadioLocationEstimatesReqV1, RadioLocationEstimatesRespV1, - SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1, - SubscriberVerifiedMappingEventResV1, + radio_location_estimates_req_v1, Client as PocMobileClient, RadioLocationEstimateV1, + RadioLocationEstimatesIngestReportV1, RadioLocationEstimatesReqV1, + RadioLocationEstimatesRespV1, SubscriberVerifiedMappingEventIngestReportV1, + SubscriberVerifiedMappingEventReqV1, SubscriberVerifiedMappingEventResV1, }; use ingest::server_mobile::GrpcServer; use prost::Message; @@ -182,11 +182,13 @@ impl TestClient { pub async fn submit_radio_location_estimates( &mut self, - radio_id: String, + pub_key: &PublicKey, estimates: Vec, ) -> anyhow::Result { let mut req = RadioLocationEstimatesReqV1 { - radio_id, + entity: Some(radio_location_estimates_req_v1::Entity::WifiPubKey( + pub_key.into(), + )), estimates, timestamp: 0, carrier_key: self.key_pair.public_key().to_vec(), diff --git a/ingest/tests/mobile_ingest.rs b/ingest/tests/mobile_ingest.rs index f50e6acbd..f0597b75e 100644 --- a/ingest/tests/mobile_ingest.rs +++ b/ingest/tests/mobile_ingest.rs @@ -1,4 +1,9 @@ -use helium_proto::services::poc_mobile::{RadioLocationEstimateV1, RleEventV1}; +use helium_crypto::{KeyTag, Keypair, PublicKey}; +use helium_proto::services::poc_mobile::{ + radio_location_estimates_req_v1::Entity, RadioLocationEstimateV1, RadioLocationEstimatesReqV1, + RleEventV1, +}; +use rand::rngs::OsRng; use rust_decimal::prelude::*; mod common; @@ -41,7 +46,8 @@ async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> { async fn submit_radio_location_estimates() -> anyhow::Result<()> { let (mut client, trigger) = common::setup_mobile().await?; - let radio_id = "radio_id".to_string(); + let key_pair = Keypair::generate(KeyTag::default(), &mut OsRng); + let public_key = key_pair.public_key(); let estimates = vec![RadioLocationEstimateV1 { radius: to_proto_decimal(2.0), lat: to_proto_decimal(41.41208), @@ -54,7 +60,7 @@ async fn submit_radio_location_estimates() -> anyhow::Result<()> { }]; let res = client - .submit_radio_location_estimates(radio_id.clone(), estimates.clone()) + .submit_radio_location_estimates(public_key, estimates.clone()) .await; assert!(res.is_ok()); @@ -68,7 +74,8 @@ async fn submit_radio_location_estimates() -> anyhow::Result<()> { match report.report { None => panic!("No report found"), Some(req) => { - assert_eq!(radio_id, req.radio_id); + let req_public_key = wifi_public_key(req.clone())?; + assert_eq!(public_key.to_string(), req_public_key.to_string()); assert_eq!(estimates, req.estimates); } } @@ -86,3 +93,13 @@ fn to_proto_decimal(x: f64) -> Option { value: d.to_string(), }) } + +fn wifi_public_key(req: RadioLocationEstimatesReqV1) -> anyhow::Result { + let entity: Entity = req.entity.unwrap(); + let Entity::WifiPubKey(public_key_bytes) = entity.clone() else { + anyhow::bail!("not WifiPubKey") + }; + let public_key = PublicKey::from_bytes(&public_key_bytes)?; + + Ok(public_key) +} diff --git a/mobile_verifier/migrations/38_radio_location_estimates.sql b/mobile_verifier/migrations/38_radio_location_estimates.sql index 04fef8a86..e9ddbd176 100644 --- a/mobile_verifier/migrations/38_radio_location_estimates.sql +++ b/mobile_verifier/migrations/38_radio_location_estimates.sql @@ -1,6 +1,7 @@ CREATE TABLE IF NOT EXISTS radio_location_estimates ( hashed_key TEXT NOT NULL, - radio_id TEXT NOT NULL, + radio_type radio_type NOT NULL, + radio_key TEXT NOT NULL, received_timestamp TIMESTAMPTZ NOT NULL, radius DECIMAL NOT NULL, lat DECIMAL NOT NULL, diff --git a/mobile_verifier/src/radio_location_estimates.rs b/mobile_verifier/src/radio_location_estimates.rs index 998263070..b72b39dcc 100644 --- a/mobile_verifier/src/radio_location_estimates.rs +++ b/mobile_verifier/src/radio_location_estimates.rs @@ -1,11 +1,11 @@ -use crate::Settings; +use crate::{heartbeats::HbType, Settings}; use chrono::{DateTime, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, file_sink::FileSinkClient, file_source, file_upload::FileUpload, - radio_location_estimates::{RadioLocationEstimate, RadioLocationEstimatesReq}, + radio_location_estimates::{Entity, RadioLocationEstimate, RadioLocationEstimatesReq}, radio_location_estimates_ingest_report::RadioLocationEstimatesIngestReport, traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, verified_radio_location_estimates::VerifiedRadioLocationEstimatesReport, @@ -200,12 +200,12 @@ async fn save_to_db( exec: &mut Transaction<'_, Postgres>, ) -> Result<(), sqlx::Error> { let estimates = &report.report.estimates; - let radio_id = &report.report.radio_id; + let entity = &report.report.entity; let received_timestamp = report.received_timestamp; for estimate in estimates { - insert_estimate(radio_id.clone(), received_timestamp, estimate, exec).await?; + insert_estimate(entity, received_timestamp, estimate, exec).await?; } - invalidate_old_estimates(radio_id.clone(), received_timestamp, exec).await?; + invalidate_old_estimates(entity, received_timestamp, exec).await?; Ok(()) } @@ -231,7 +231,7 @@ async fn save_to_db( // } async fn invalidate_old_estimates( - radio_id: String, + entity: &Entity, timestamp: DateTime, exec: &mut Transaction<'_, Postgres>, ) -> Result<(), sqlx::Error> { @@ -239,11 +239,11 @@ async fn invalidate_old_estimates( r#" UPDATE radio_location_estimates SET invalided_at = now() - WHERE radio_id = $1 + WHERE radio_key = $1 AND received_timestamp < $2; "#, ) - .bind(radio_id) + .bind(entity.to_string()) .bind(timestamp) .execute(exec) .await?; @@ -252,7 +252,7 @@ async fn invalidate_old_estimates( } async fn insert_estimate( - radio_id: String, + entity: &Entity, received_timestamp: DateTime, estimate: &RadioLocationEstimate, exec: &mut Transaction<'_, Postgres>, @@ -260,17 +260,18 @@ async fn insert_estimate( let radius = estimate.radius; let lat = estimate.lat; let long = estimate.long; - let hashed_key = hash_key(radio_id.clone(), received_timestamp, radius, lat, long); + let hashed_key = hash_key(entity, received_timestamp, radius, lat, long); sqlx::query( r#" - INSERT INTO radio_location_estimates (hashed_key, radio_id, received_timestamp, radius, lat, long, confidence) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO radio_location_estimates (hashed_key, radio_type, radio_key, received_timestamp, radius, lat, long, confidence) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (hashed_key) DO NOTHING "#, ) .bind(hashed_key) - .bind(radio_id) + .bind(entity_to_radio_type(entity)) + .bind(entity.to_string()) .bind(received_timestamp) .bind(estimate.radius) .bind(lat) @@ -283,13 +284,13 @@ async fn insert_estimate( } pub fn hash_key( - radio_id: String, + entity: &Entity, timestamp: DateTime, radius: Decimal, lat: Decimal, long: Decimal, ) -> String { - let key = format!("{}{}{}{}{}", radio_id, timestamp, radius, lat, long); + let key = format!("{}{}{}{}{}", entity, timestamp, radius, lat, long); let mut hasher = Sha256::new(); hasher.update(key); @@ -313,3 +314,10 @@ pub async fn clear_invalided( .await?; Ok(()) } + +fn entity_to_radio_type(entity: &Entity) -> HbType { + match entity { + Entity::CbrsId(_) => HbType::Cbrs, + Entity::WifiPubKey(_) => HbType::Wifi, + } +} diff --git a/mobile_verifier/tests/integrations/radio_location_estimates.rs b/mobile_verifier/tests/integrations/radio_location_estimates.rs index f8465c2a6..c0d64bb4c 100644 --- a/mobile_verifier/tests/integrations/radio_location_estimates.rs +++ b/mobile_verifier/tests/integrations/radio_location_estimates.rs @@ -4,7 +4,7 @@ use file_store::{ file_info_poller::FileInfoStream, file_sink::FileSinkClient, radio_location_estimates::{ - RadioLocationEstimate, RadioLocationEstimateEvent, RadioLocationEstimatesReq, + Entity, RadioLocationEstimate, RadioLocationEstimateEvent, RadioLocationEstimatesReq, }, radio_location_estimates_ingest_report::RadioLocationEstimatesIngestReport, FileInfo, @@ -36,7 +36,7 @@ async fn verifier_test(pool: PgPool) -> anyhow::Result<()> { }); // Sending reports as if they are coming from ingestor - let (fis, reports, _public_key_binary) = file_info_stream(); + let (fis, reports) = file_info_stream(); reports_tx.send(fis).await?; let mut retry = 0; @@ -88,7 +88,6 @@ async fn verifier_test(pool: PgPool) -> anyhow::Result<()> { fn file_info_stream() -> ( FileInfoStream, Vec, - PublicKeyBinary, ) { let file_info = FileInfo { key: "test_file_info".to_string(), @@ -97,14 +96,21 @@ fn file_info_stream() -> ( size: 0, }; - let key_pair = generate_keypair(); - let public_key_binary: PublicKeyBinary = key_pair.public_key().to_owned().into(); + let carrier_key_pair = generate_keypair(); + let carrier_public_key_binary: PublicKeyBinary = + carrier_key_pair.public_key().to_owned().into(); + + let hotspot_key_pair = generate_keypair(); + let hotspot_public_key_binary: PublicKeyBinary = + hotspot_key_pair.public_key().to_owned().into(); + + let entity = Entity::WifiPubKey(hotspot_public_key_binary); let reports = vec![ RadioLocationEstimatesIngestReport { received_timestamp: Utc::now() - Duration::hours(1), report: RadioLocationEstimatesReq { - radio_id: "radio_1".to_string(), + entity: entity.clone(), estimates: vec![RadioLocationEstimate { radius: rust_decimal::Decimal::from_f32(0.1).unwrap(), lat: rust_decimal::Decimal::from_f32(0.1).unwrap(), @@ -116,13 +122,13 @@ fn file_info_stream() -> ( }], }], timestamp: Utc::now() - Duration::hours(1), - carrier_key: public_key_binary.clone(), + carrier_key: carrier_public_key_binary.clone(), }, }, RadioLocationEstimatesIngestReport { received_timestamp: Utc::now(), report: RadioLocationEstimatesReq { - radio_id: "radio_1".to_string(), + entity: entity.clone(), estimates: vec![RadioLocationEstimate { radius: rust_decimal::Decimal::from_f32(0.2).unwrap(), lat: rust_decimal::Decimal::from_f32(0.2).unwrap(), @@ -134,14 +140,13 @@ fn file_info_stream() -> ( }], }], timestamp: Utc::now(), - carrier_key: public_key_binary.clone(), + carrier_key: carrier_public_key_binary.clone(), }, }, ]; ( FileInfoStream::new("default".to_string(), file_info, reports.clone()), reports, - public_key_binary, ) } @@ -161,7 +166,7 @@ fn compare_report_and_estimate( ) { assert_eq!( hash_key( - report.report.radio_id.clone(), + &report.report.entity, report.received_timestamp, report.report.estimates[0].radius, report.report.estimates[0].lat, @@ -170,7 +175,7 @@ fn compare_report_and_estimate( estimate.hashed_key ); - assert_eq!(report.report.radio_id, estimate.radio_id); + assert_eq!(report.report.entity.to_string(), estimate.radio_key); assert!(timestamp_match( report.received_timestamp, estimate.received_timestamp @@ -190,7 +195,7 @@ fn compare_report_and_estimate( #[derive(Debug)] pub struct RadioLocationEstimateDB { pub hashed_key: String, - pub radio_id: String, + pub radio_key: String, pub received_timestamp: DateTime, pub radius: rust_decimal::Decimal, pub lat: rust_decimal::Decimal, @@ -204,7 +209,7 @@ pub async fn select_radio_location_estimates( ) -> anyhow::Result> { let rows = sqlx::query( r#" - SELECT hashed_key, radio_id, received_timestamp, radius, lat, long, confidence, invalided_at + SELECT hashed_key, radio_key, hashed_key, received_timestamp, radius, lat, long, confidence, invalided_at FROM radio_location_estimates ORDER BY received_timestamp ASC "#, @@ -216,7 +221,7 @@ pub async fn select_radio_location_estimates( .into_iter() .map(|row| RadioLocationEstimateDB { hashed_key: row.get("hashed_key"), - radio_id: row.get("radio_id"), + radio_key: row.get("radio_key"), received_timestamp: row.get("received_timestamp"), radius: row.get("radius"), lat: row.get("lat"),