Skip to content

Commit

Permalink
Address comments from Pete
Browse files Browse the repository at this point in the history
* Change subscription_cache to track based on authority instead of topic
* Remove some unnecessary commented code
  • Loading branch information
matthewd0123 committed Aug 7, 2024
1 parent 37e4fe3 commit f48c9d5
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 161 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ tarpaulin-report.html

.idea/
.vscode/launch.json
.vscode/settings.json
165 changes: 82 additions & 83 deletions example-utils/usubscription-static-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@ use up_rust::core::usubscription::{
FetchSubscriptionsResponse, NotificationsRequest, SubscriberInfo, Subscription,
SubscriptionRequest, SubscriptionResponse, USubscription, UnsubscribeRequest,
};
use up_rust::{UStatus, UUri};
use up_rust::{UCode, UStatus, UUri};

pub struct USubscriptionStaticFile {
static_file: PathBuf,
}

impl USubscriptionStaticFile {
pub fn new(static_file: Option<PathBuf>) -> Self {
// Set static_file to static_file if it has value, else set to "static-configs/testdata.json"
let static_file =
static_file.unwrap_or_else(|| PathBuf::from("static-configs/testdata.json"));
pub fn new(static_file: PathBuf) -> Self {
USubscriptionStaticFile { static_file }
}
}
Expand Down Expand Up @@ -70,91 +67,93 @@ impl USubscription for USubscriptionStaticFile {

println!("subscription_json_file: {:?}", subscription_json_file);

match canonicalize(subscription_json_file) {
Ok(subscription_json_file) => {

match fs::read_to_string(&subscription_json_file) {
Ok(data) => match serde_json::from_str::<Value>(&data) {
Ok(res) => {
if let Some(obj) = res.as_object() {
for (key, value) in obj {
println!("key: {}, value: {}", key, value);
let mut subscriber_set: HashSet<UUri> = HashSet::new();

if let Some(array) = value.as_array() {
for subscriber in array {
println!("subscriber: {}", subscriber);

if let Some(subscriber_str) = subscriber.as_str() {
match UUri::from_str(subscriber_str) {
Ok(uri) => {
println!("All good for subscriber");
subscriber_set.insert(uri);
}
Err(error) => {
println!("Error with Deserializing Subscriber: {}", error);
}
}
} else {
println!("Unable to parse subscriber");
}
}
}

println!("key: {}", key);
let topic = match UUri::from_str(&key.to_string()) {
Ok(mut uri) => {
println!("All good for key");
uri.resource_id = 0x8001;
uri
}
Err(error) => {
println!("Error with Deserializing Key: {}", error);
continue;
}
};

let details_vec = Vec::new();
for subscriber in subscriber_set {
let subscriber_info_tmp = SubscriberInfo {
uri: Some(subscriber).into(),
details: details_vec.clone(),
..Default::default()
};
let subscription_tmp = Subscription {
topic: Some(topic.clone()).into(),
subscriber: Some(subscriber_info_tmp).into(),
..Default::default()
};
subscriptions_vec.push(subscription_tmp);
}
let subscription_json_file = match canonicalize(subscription_json_file) {
Ok(path) => path,
Err(e) => {
return Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
format!("Static subscription file not found: {e:?}"),
))
}
};

let data = fs::read_to_string(&subscription_json_file).map_err(|e| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
format!("Unable to read file: {e:?}"),
)
})?;

let res: Value = serde_json::from_str(&data).map_err(|e| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
format!("Unable to parse JSON: {e:?}"),
)
})?;

if let Some(obj) = res.as_object() {
for (key, value) in obj {
println!("key: {}, value: {}", key, value);
let mut subscriber_set: HashSet<UUri> = HashSet::new();

if let Some(array) = value.as_array() {
for subscriber in array {
println!("subscriber: {}", subscriber);

if let Some(subscriber_str) = subscriber.as_str() {
match UUri::from_str(subscriber_str) {
Ok(uri) => {
println!("All good for subscriber");
subscriber_set.insert(uri);
}
Err(error) => {
println!("Error with Deserializing Subscriber: {}", error);
}
}

println!("{}", res);
dbg!(&subscriptions_vec);
let fetch_response = FetchSubscriptionsResponse {
subscriptions: subscriptions_vec,
..Default::default()
};
Ok(fetch_response)
}
Err(e) => {
eprintln!("Unable to parse JSON: {}", e);
Ok(empty_fetch_response)
} else {
println!("Unable to parse subscriber");
}
},
Err(e) => {
eprintln!("Unable to read file: {}", e);
Ok(empty_fetch_response)
}
}
}
Err(e) => {
eprintln!("Unable to canonicalize path: {}", e);
Ok(empty_fetch_response)

println!("key: {}", key);
let topic = match UUri::from_str(&key.to_string()) {
Ok(mut uri) => {
println!("All good for key");
uri.resource_id = 0x8001;
uri
}
Err(error) => {
println!("Error with Deserializing Key: {}", error);
continue;
}
};

let details_vec = Vec::new();
for subscriber in subscriber_set {
let subscriber_info_tmp = SubscriberInfo {
uri: Some(subscriber).into(),
details: details_vec.clone(),
..Default::default()
};
let subscription_tmp = Subscription {
topic: Some(topic.clone()).into(),
subscriber: Some(subscriber_info_tmp).into(),
..Default::default()
};
subscriptions_vec.push(subscription_tmp);
}
}
}

println!("{}", res);
dbg!(&subscriptions_vec);
let fetch_response = FetchSubscriptionsResponse {
subscriptions: subscriptions_vec,
..Default::default()
};

Ok(fetch_response)
}

async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> {
Expand Down
33 changes: 21 additions & 12 deletions subscription-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use up_rust::core::usubscription::{
};
use up_rust::UUri;

pub type SubscribersMap = Mutex<HashMap<UUri, HashSet<SubscriptionInformation>>>;
pub type SubscribersMap = Mutex<HashMap<String, HashSet<SubscriptionInformation>>>;

// Tracks subscription information inside the SubscriptionCache
pub struct SubscriptionInformation {
pub topic: UUri,
pub subscriber: SubscriberInfo,
pub status: SubscriptionStatus,
pub attributes: SubscribeAttributes,
Expand All @@ -47,6 +49,7 @@ impl Hash for SubscriptionInformation {
impl Clone for SubscriptionInformation {
fn clone(&self) -> Self {
Self {
topic: self.topic.clone(),
subscriber: self.subscriber.clone(),
status: self.status.clone(),
attributes: self.attributes.clone(),
Expand All @@ -66,45 +69,53 @@ impl SubscriptionCache {
pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Self {
let mut subscription_cache_hash_map = HashMap::new();
for subscription in subscription_cache_map.subscriptions {
let uri = if let Some(uri) = subscription.topic.into_option(){
uri
let topic = if let Some(topic) = subscription.topic.into_option() {
topic
} else {
println!("Unable to parse URI from subscription, skipping...");
continue;
};
let subscriber = if let Some(subscriber) = subscription.subscriber.into_option(){
let subscriber = if let Some(subscriber) = subscription.subscriber.into_option() {
subscriber
} else {
println!("Unable to parse subscriber from subscription, skipping...");
continue;
};
let status = if let Some(status) = subscription.status.into_option(){
let status = if let Some(status) = subscription.status.into_option() {
status
} else {
println!("Unable to parse status from subscription, setting as default");
SubscriptionStatus::default()
};
let attributes = if let Some(attributes) = subscription.attributes.into_option(){
let attributes = if let Some(attributes) = subscription.attributes.into_option() {
attributes
} else {
println!("Unable to parse attributes from subscription, setting as default");
SubscribeAttributes::default()
};
let config = if let Some(config) = subscription.config.into_option(){
let config = if let Some(config) = subscription.config.into_option() {
config
} else {
println!("Unable to parse config from subscription, setting as default");
EventDeliveryConfig::default()
};
// Create new hashset if the key does not exist and insert the subscription
let subscription_information = SubscriptionInformation {
topic: topic.clone(),
subscriber,
status,
attributes,
config
config,
};
let subscriber_authority_name = subscription_information
.subscriber
.uri
.as_ref()
.unwrap()
.authority_name
.clone();
subscription_cache_hash_map
.entry(uri)
.entry(subscriber_authority_name)
.or_insert_with(HashSet::new)
.insert(subscription_information);
}
Expand All @@ -113,9 +124,7 @@ impl SubscriptionCache {
}
}

pub async fn fetch_cache(
&self,
) -> HashMap<UUri, HashSet<SubscriptionInformation>> {
pub async fn fetch_cache(&self) -> HashMap<String, HashSet<SubscriptionInformation>> {
let cache_map = self.subscription_cache_map.lock().await;
let mut cloned_map = HashMap::new();

Expand Down
34 changes: 6 additions & 28 deletions up-linux-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@ mod config;
use crate::config::{Config, HostTransport};
use clap::Parser;
use log::trace;
use std::path::PathBuf;
use std::fs::File;
use std::io::Read;
use std::sync::Arc;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::{env, thread};
use up_rust::{UCode, UStatus, UTransport};

use std::str::FromStr;
use up_streamer::{Endpoint, UStreamer};
use up_transport_vsomeip::UPTransportVsomeip;
use up_transport_zenoh::UPClientZenoh;
use usubscription_static_file::USubscriptionStaticFile;
use usubscription_static_file::USubscriptionStaticFile;
use zenoh::config::Config as ZenohConfig;
use zenoh::config::Endpoint as ZenohEndpoint;
use zenoh::config::EndPoint as ZenohEndpoint;

#[derive(Parser)]
#[command()]
Expand All @@ -31,28 +28,9 @@ struct StreamerArgs {
async fn main() -> Result<(), UStatus> {
env_logger::init();

let args = StreamerArgs::parse();

let mut file = File::open(args.config)
.map_err(|e| UStatus::fail_with_code(UCode::NOT_FOUND, format!("File not found: {e:?}")))?;
let mut contents = String::new();
file.read_to_string(&mut contents).map_err(|e| {
UStatus::fail_with_code(
UCode::INTERNAL,
format!("Unable to read config file: {e:?}"),
)
})?;

let config: Config = json5::from_str(&contents).map_err(|e| {
UStatus::fail_with_code(
UCode::INTERNAL,
format!("Unable to parse config file: {e:?}"),
)
})?;

let usubscription = Arc::new(USubscriptionStaticFile::new(Some(PathBuf::from(
let usubscription = Arc::new(USubscriptionStaticFile::new(PathBuf::from(
"example-utils/usubscription-static-file/static-configs/testdata.json",
))));
)));

let args = StreamerArgs::parse();

Expand Down Expand Up @@ -82,7 +60,7 @@ async fn main() -> Result<(), UStatus> {
let mut zenoh_config = ZenohConfig::default();

// Specify the address to listen on using IPv4
let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7447");
let ipv4_endpoint = ZenohEndpoint::from_str("tcp/0.0.0.0:7447");

// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
Expand Down
Loading

0 comments on commit f48c9d5

Please sign in to comment.