Skip to content

Commit

Permalink
Update proto and add Entity
Browse files Browse the repository at this point in the history
  • Loading branch information
macpie committed Oct 14, 2024
1 parent 5fe9e33 commit 58ae090
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 48 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 46 additions & 4 deletions file_store/src/radio_location_estimates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,52 @@ use crate::{
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
RadioLocationEstimateV1, RadioLocationEstimatesReqV1, RleEventV1,
self as proto, RadioLocationEstimateV1, RadioLocationEstimatesReqV1, RleEventV1,
};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::fmt;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Entity {
CbrsId(String),
WifiPubKey(PublicKeyBinary),
}

impl fmt::Display for Entity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Entity::CbrsId(id) => write!(f, "{}", id),
Entity::WifiPubKey(pub_key) => write!(f, "{}", pub_key),
}
}
}

impl From<proto::radio_location_estimates_req_v1::Entity> for Entity {
fn from(entity: proto::radio_location_estimates_req_v1::Entity) -> Self {
match entity {
proto::radio_location_estimates_req_v1::Entity::CbrsId(v) => Entity::CbrsId(v),
proto::radio_location_estimates_req_v1::Entity::WifiPubKey(k) => {
Entity::WifiPubKey(k.into())
}
}
}
}

impl From<Entity> for proto::radio_location_estimates_req_v1::Entity {
fn from(entity: Entity) -> Self {
match entity {
Entity::CbrsId(v) => proto::radio_location_estimates_req_v1::Entity::CbrsId(v),
Entity::WifiPubKey(k) => {
proto::radio_location_estimates_req_v1::Entity::WifiPubKey(k.into())
}
}
}
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioLocationEstimatesReq {
pub radio_id: String,
pub entity: Entity,
pub estimates: Vec<RadioLocationEstimate>,
pub timestamp: DateTime<Utc>,
pub carrier_key: PublicKeyBinary,
Expand All @@ -38,7 +76,7 @@ impl From<RadioLocationEstimatesReq> for RadioLocationEstimatesReqV1 {
fn from(rle: RadioLocationEstimatesReq) -> Self {
let timestamp = rle.timestamp();
RadioLocationEstimatesReqV1 {
radio_id: rle.radio_id,
entity: Some(rle.entity.into()),
estimates: rle.estimates.into_iter().map(|e| e.into()).collect(),
timestamp,
carrier_key: rle.carrier_key.into(),
Expand All @@ -52,7 +90,11 @@ impl TryFrom<RadioLocationEstimatesReqV1> for RadioLocationEstimatesReq {
fn try_from(req: RadioLocationEstimatesReqV1) -> Result<Self> {
let timestamp = req.timestamp()?;
Ok(Self {
radio_id: req.radio_id,
entity: if let Some(entity) = req.entity {
entity.into()
} else {
return Err(Error::NotFound("entity".to_string()));
},
estimates: req
.estimates
.into_iter()
Expand Down
16 changes: 9 additions & 7 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use anyhow::bail;
use backon::{ExponentialBuilder, Retryable};
use file_store::file_sink::FileSinkClient;
use helium_crypto::{KeyTag, Keypair, Network, Sign};
use helium_crypto::{KeyTag, Keypair, Network, PublicKey, Sign};
use helium_proto::services::poc_mobile::{
Client as PocMobileClient, RadioLocationEstimateV1, RadioLocationEstimatesIngestReportV1,
RadioLocationEstimatesReqV1, RadioLocationEstimatesRespV1,
SubscriberVerifiedMappingEventIngestReportV1, SubscriberVerifiedMappingEventReqV1,
SubscriberVerifiedMappingEventResV1,
radio_location_estimates_req_v1, Client as PocMobileClient, RadioLocationEstimateV1,
RadioLocationEstimatesIngestReportV1, RadioLocationEstimatesReqV1,
RadioLocationEstimatesRespV1, SubscriberVerifiedMappingEventIngestReportV1,
SubscriberVerifiedMappingEventReqV1, SubscriberVerifiedMappingEventResV1,
};
use ingest::server_mobile::GrpcServer;
use prost::Message;
Expand Down Expand Up @@ -182,11 +182,13 @@ impl TestClient {

pub async fn submit_radio_location_estimates(
&mut self,
radio_id: String,
pub_key: &PublicKey,
estimates: Vec<RadioLocationEstimateV1>,
) -> anyhow::Result<RadioLocationEstimatesRespV1> {
let mut req = RadioLocationEstimatesReqV1 {
radio_id,
entity: Some(radio_location_estimates_req_v1::Entity::WifiPubKey(
pub_key.into(),
)),
estimates,
timestamp: 0,
carrier_key: self.key_pair.public_key().to_vec(),
Expand Down
25 changes: 21 additions & 4 deletions ingest/tests/mobile_ingest.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use helium_proto::services::poc_mobile::{RadioLocationEstimateV1, RleEventV1};
use helium_crypto::{KeyTag, Keypair, PublicKey};
use helium_proto::services::poc_mobile::{
radio_location_estimates_req_v1::Entity, RadioLocationEstimateV1, RadioLocationEstimatesReqV1,
RleEventV1,
};
use rand::rngs::OsRng;
use rust_decimal::prelude::*;

mod common;
Expand Down Expand Up @@ -41,7 +46,8 @@ async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> {
async fn submit_radio_location_estimates() -> anyhow::Result<()> {
let (mut client, trigger) = common::setup_mobile().await?;

let radio_id = "radio_id".to_string();
let key_pair = Keypair::generate(KeyTag::default(), &mut OsRng);
let public_key = key_pair.public_key();
let estimates = vec![RadioLocationEstimateV1 {
radius: to_proto_decimal(2.0),
lat: to_proto_decimal(41.41208),
Expand All @@ -54,7 +60,7 @@ async fn submit_radio_location_estimates() -> anyhow::Result<()> {
}];

let res = client
.submit_radio_location_estimates(radio_id.clone(), estimates.clone())
.submit_radio_location_estimates(public_key, estimates.clone())
.await;

assert!(res.is_ok());
Expand All @@ -68,7 +74,8 @@ async fn submit_radio_location_estimates() -> anyhow::Result<()> {
match report.report {
None => panic!("No report found"),
Some(req) => {
assert_eq!(radio_id, req.radio_id);
let req_public_key = wifi_public_key(req.clone())?;
assert_eq!(public_key.to_string(), req_public_key.to_string());
assert_eq!(estimates, req.estimates);
}
}
Expand All @@ -86,3 +93,13 @@ fn to_proto_decimal(x: f64) -> Option<helium_proto::Decimal> {
value: d.to_string(),
})
}

fn wifi_public_key(req: RadioLocationEstimatesReqV1) -> anyhow::Result<PublicKey> {
let entity: Entity = req.entity.unwrap();
let Entity::WifiPubKey(public_key_bytes) = entity.clone() else {
anyhow::bail!("not WifiPubKey")
};
let public_key = PublicKey::from_bytes(&public_key_bytes)?;

Ok(public_key)
}
3 changes: 2 additions & 1 deletion mobile_verifier/migrations/38_radio_location_estimates.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS radio_location_estimates (
hashed_key TEXT NOT NULL,
radio_id TEXT NOT NULL,
radio_type radio_type NOT NULL,
radio_key TEXT NOT NULL,
received_timestamp TIMESTAMPTZ NOT NULL,
radius DECIMAL NOT NULL,
lat DECIMAL NOT NULL,
Expand Down
38 changes: 23 additions & 15 deletions mobile_verifier/src/radio_location_estimates.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::Settings;
use crate::{heartbeats::HbType, Settings};
use chrono::{DateTime, Utc};
use file_store::{
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::FileSinkClient,
file_source,
file_upload::FileUpload,
radio_location_estimates::{RadioLocationEstimate, RadioLocationEstimatesReq},
radio_location_estimates::{Entity, RadioLocationEstimate, RadioLocationEstimatesReq},
radio_location_estimates_ingest_report::RadioLocationEstimatesIngestReport,
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
verified_radio_location_estimates::VerifiedRadioLocationEstimatesReport,
Expand Down Expand Up @@ -200,12 +200,12 @@ async fn save_to_db(
exec: &mut Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
let estimates = &report.report.estimates;
let radio_id = &report.report.radio_id;
let entity = &report.report.entity;
let received_timestamp = report.received_timestamp;
for estimate in estimates {
insert_estimate(radio_id.clone(), received_timestamp, estimate, exec).await?;
insert_estimate(entity, received_timestamp, estimate, exec).await?;
}
invalidate_old_estimates(radio_id.clone(), received_timestamp, exec).await?;
invalidate_old_estimates(entity, received_timestamp, exec).await?;

Ok(())
}
Expand All @@ -231,19 +231,19 @@ async fn save_to_db(
// }

async fn invalidate_old_estimates(
radio_id: String,
entity: &Entity,
timestamp: DateTime<Utc>,
exec: &mut Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
UPDATE radio_location_estimates
SET invalided_at = now()
WHERE radio_id = $1
WHERE radio_key = $1
AND received_timestamp < $2;
"#,
)
.bind(radio_id)
.bind(entity.to_string())
.bind(timestamp)
.execute(exec)
.await?;
Expand All @@ -252,25 +252,26 @@ async fn invalidate_old_estimates(
}

async fn insert_estimate(
radio_id: String,
entity: &Entity,
received_timestamp: DateTime<Utc>,
estimate: &RadioLocationEstimate,
exec: &mut Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
let radius = estimate.radius;
let lat = estimate.lat;
let long = estimate.long;
let hashed_key = hash_key(radio_id.clone(), received_timestamp, radius, lat, long);
let hashed_key = hash_key(entity, received_timestamp, radius, lat, long);

sqlx::query(
r#"
INSERT INTO radio_location_estimates (hashed_key, radio_id, received_timestamp, radius, lat, long, confidence)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO radio_location_estimates (hashed_key, radio_type, radio_key, received_timestamp, radius, lat, long, confidence)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (hashed_key) DO NOTHING
"#,
)
.bind(hashed_key)
.bind(radio_id)
.bind(entity_to_radio_type(entity))
.bind(entity.to_string())
.bind(received_timestamp)
.bind(estimate.radius)
.bind(lat)
Expand All @@ -283,13 +284,13 @@ async fn insert_estimate(
}

pub fn hash_key(
radio_id: String,
entity: &Entity,
timestamp: DateTime<Utc>,
radius: Decimal,
lat: Decimal,
long: Decimal,
) -> String {
let key = format!("{}{}{}{}{}", radio_id, timestamp, radius, lat, long);
let key = format!("{}{}{}{}{}", entity, timestamp, radius, lat, long);

let mut hasher = Sha256::new();
hasher.update(key);
Expand All @@ -313,3 +314,10 @@ pub async fn clear_invalided(
.await?;
Ok(())
}

fn entity_to_radio_type(entity: &Entity) -> HbType {
match entity {
Entity::CbrsId(_) => HbType::Cbrs,
Entity::WifiPubKey(_) => HbType::Wifi,
}
}
Loading

0 comments on commit 58ae090

Please sign in to comment.