Skip to content

Commit

Permalink
Simplify structs and add verified file sink
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Jun 27, 2024
1 parent dae629f commit 1d58063
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 117 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub const URBANIZATION_DATA_SET: &str = "urbanization";
pub const FOOTFALL_DATA_SET: &str = "footfall";
pub const LANDTYPE_DATA_SET: &str = "landtype";
pub const SP_BOOSTED_REWARDS_BANNED_RADIO: &str = "service_provider_boosted_rewards_banned_radio";
pub const VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO: &str =
"verified_service_provider_boosted_rewards_banned_radio";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -211,6 +213,7 @@ pub enum FileType {
InvalidatedRadioThresholdIngestReport,
VerifiedInvalidatedRadioThresholdIngestReport,
ServiceProviderBoostedRewardsBannedRadioIngestReport,
VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -275,6 +278,9 @@ impl fmt::Display for FileType {
Self::ServiceProviderBoostedRewardsBannedRadioIngestReport => {
SP_BOOSTED_REWARDS_BANNED_RADIO
}
Self::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport => {
VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO
}
};
f.write_str(s)
}
Expand Down Expand Up @@ -342,6 +348,9 @@ impl FileType {
Self::ServiceProviderBoostedRewardsBannedRadioIngestReport => {
SP_BOOSTED_REWARDS_BANNED_RADIO
}
Self::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport => {
VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO
}
}
}
}
Expand Down Expand Up @@ -409,6 +418,9 @@ impl FromStr for FileType {
SP_BOOSTED_REWARDS_BANNED_RADIO => {
Self::ServiceProviderBoostedRewardsBannedRadioIngestReport
}
VERIFIED_SP_BOOSTED_REWARDS_BANNED_RADIO => {
Self::VerifiedServiceProviderBoostedRewardsBannedRadioIngestReport
}
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
39 changes: 37 additions & 2 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ where
}
}

pub struct ProtoFileInfoPollerParser;
pub struct MsgDecodeFileInfoPollerParser;

#[async_trait::async_trait]
impl<T> FileInfoPollerParser<T> for ProtoFileInfoPollerParser
impl<T> FileInfoPollerParser<T> for MsgDecodeFileInfoPollerParser
where
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
{
Expand Down Expand Up @@ -312,6 +312,41 @@ where
}
}

pub struct ProstFileInfoPollerParser;

#[async_trait::async_trait]
impl<T> FileInfoPollerParser<T> for ProstFileInfoPollerParser
where
T: helium_proto::Message + Default,
{
async fn parse(&self, byte_stream: ByteStream) -> Result<Vec<T>> {
Ok(file_store::stream_source(byte_stream)
.filter_map(|msg| async {
msg.map_err(|err| {
tracing::error!(
"Error streaming entry in file of type {}: {err:?}",
std::any::type_name::<T>()
);
err
})
.ok()
})
.filter_map(|msg| async {
<T as helium_proto::Message>::decode(msg)
.map_err(|err| {
tracing::error!(
"Error in decoding message of type {}: {err:?}",
std::any::type_name::<T>()
);
err
})
.ok()
})
.collect()
.await)
}
}

fn create_cache() -> MemoryFileCache {
Arc::new(Cache::new())
}
Expand Down
8 changes: 4 additions & 4 deletions file_store/src/file_source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
file_info_poller::{FileInfoPollerConfigBuilder, ProtoFileInfoPollerParser},
file_info_poller::{FileInfoPollerConfigBuilder, MsgDecodeFileInfoPollerParser},
file_sink, BytesMutStream, Error,
};
use async_compression::tokio::bufread::GzipDecoder;
Expand All @@ -11,12 +11,12 @@ use std::path::{Path, PathBuf};
use tokio::{fs::File, io::BufReader};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead};

pub fn continuous_source<T, S>() -> FileInfoPollerConfigBuilder<T, S, ProtoFileInfoPollerParser>
pub fn continuous_source<T, S>() -> FileInfoPollerConfigBuilder<T, S, MsgDecodeFileInfoPollerParser>
where
T: Clone,
{
FileInfoPollerConfigBuilder::<T, S, ProtoFileInfoPollerParser>::default()
.parser(ProtoFileInfoPollerParser)
FileInfoPollerConfigBuilder::<T, S, MsgDecodeFileInfoPollerParser>::default()
.parser(MsgDecodeFileInfoPollerParser)
}

pub fn source<I, P>(paths: I) -> BytesMutStream
Expand Down
1 change: 0 additions & 1 deletion file_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub mod mobile_subscriber;
pub mod mobile_transfer;
pub mod reward_manifest;
mod settings;
pub mod sp_boosted_rewards_bans;
pub mod speedtest;
pub mod traits;
pub mod wifi_heartbeat;
Expand Down
78 changes: 0 additions & 78 deletions file_store/src/sp_boosted_rewards_bans.rs

This file was deleted.

3 changes: 2 additions & 1 deletion mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ impl Cmd {
.add_task(
ServiceProviderBoostedRewardsBanIngestor::create_managed_task(
pool.clone(),
file_upload.clone(),
report_ingest,
auth_client,
settings.start_after,
settings,
)
.await?,
)
Expand Down
Loading

0 comments on commit 1d58063

Please sign in to comment.