Skip to content

Commit

Permalink
zenoh-mqtt client-service works
Browse files Browse the repository at this point in the history
  • Loading branch information
ValMobBIllich committed Nov 11, 2024
1 parent de15769 commit fdca46f
Show file tree
Hide file tree
Showing 16 changed files with 418 additions and 509 deletions.
190 changes: 95 additions & 95 deletions Cargo.lock

Large diffs are not rendered by default.

106 changes: 106 additions & 0 deletions example-streamer-uses/src/bin/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
use clap::Parser;
use up_client_mqtt5_rust::MqttConfig;
use up_transport_zenoh::zenoh_config;

#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum WhatAmIType {
Peer,
Client,
Router,
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
pub struct ZenohArgs {
#[arg(short, long)]
/// A configuration file.
config: Option<String>,
#[arg(short, long)]
/// The Zenoh session mode [default: peer].
mode: Option<WhatAmIType>,
#[arg(short = 'e', long)]
/// Endpoints to connect to.
connect: Vec<String>,
#[arg(short, long)]
/// Endpoints to listen on.
listen: Vec<String>,
#[arg(long)]
/// Disable the multicast-based scouting mechanism.
no_multicast_scouting: bool,
}

#[allow(clippy::must_use_candidate, clippy::missing_panics_doc)]
pub fn get_zenoh_config() -> zenoh_config::Config {
let args = ZenohArgs::parse();

// Load the config from file path
let mut zenoh_cfg = match &args.config {
Some(path) => zenoh_config::Config::from_file(path).unwrap(),
None => zenoh_config::Config::default(),
};

// You can choose from Router, Peer, Client
match args.mode {
Some(WhatAmIType::Peer) => zenoh_cfg.set_mode(Some(zenoh::config::WhatAmI::Peer)),
Some(WhatAmIType::Client) => zenoh_cfg.set_mode(Some(zenoh::config::WhatAmI::Client)),
Some(WhatAmIType::Router) => zenoh_cfg.set_mode(Some(zenoh::config::WhatAmI::Router)),
None => Ok(None),
}
.unwrap();

// Set connection address
if !args.connect.is_empty() {
zenoh_cfg
.connect
.endpoints
.set(args.connect.iter().map(|v| v.parse().unwrap()).collect())
.unwrap();
}

// Set listener address
if !args.listen.is_empty() {
zenoh_cfg
.listen
.endpoints
.set(args.listen.iter().map(|v| v.parse().unwrap()).collect())
.unwrap();
}

// Set multicast configuration
if args.no_multicast_scouting {
zenoh_cfg
.scouting
.multicast
.set_enabled(Some(false))
.unwrap();
}

zenoh_cfg
}

pub fn get_mqtt_config() -> MqttConfig {
// Todo: Once MqttConfig is serializable make this ocnfigurable via json5 like zenoh
let ssl_options = None;

MqttConfig {
mqtt_protocol: up_client_mqtt5_rust::MqttProtocol::Mqtt,
mqtt_port: 1883,
mqtt_hostname: "localhost".to_string(),
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: ssl_options,
username: "user".to_string(),
}
}
54 changes: 24 additions & 30 deletions example-streamer-uses/src/bin/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,24 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

mod common;

use async_trait::async_trait;
use clap::Parser;
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use log::debug;
use protobuf::Message;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_client_mqtt5_rust::{UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri, UUID};

const SERVICE_AUTHORITY: &str = "linux";
const SERVICE_AUTHORITY: &str = "ecu_authority";
const SERVICE_UE_ID: u32 = 0x1236;
const SERVICE_UE_VERSION_MAJOR: u8 = 1;
const SERVICE_RESOURCE_ID: u16 = 0x0896;

const CLIENT_AUTHORITY: &str = "mqtt_authority";
const CLIENT_UE_ID: u32 = 0x4321;
const CLIENT_AUTHORITY: &str = "cloud_authority";
const CLIENT_UE_ID: u32 = 0x5678;
const CLIENT_UE_VERSION_MAJOR: u8 = 1;
const CLIENT_RESOURCE_ID: u16 = 0;

Expand All @@ -38,7 +39,7 @@ struct ServiceResponseListener;
#[async_trait]
impl UListener for ServiceResponseListener {
async fn on_receive(&self, msg: UMessage) {
println!("ServiceResponseListener: Received a message: {msg:?}");
debug!("ServiceResponseListener: Received a message: {msg:?}");

let Some(payload_bytes) = msg.payload else {
panic!("No payload bytes");
Expand All @@ -48,7 +49,7 @@ impl UListener for ServiceResponseListener {
panic!("Unable to parse into HelloResponse");
};

println!("Here we received response: {hello_response:?}");
debug!("Here we received response: {hello_response:?}");
}
}

Expand All @@ -58,35 +59,15 @@ async fn main() -> Result<(), UStatus> {

println!("Started mqtt_client.");

let mqtt_config = MqttConfig {
mqtt_protocol: MqttProtocol::Mqtt,
mqtt_hostname: "localhost".to_string(),
mqtt_port: 1883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: None,
username: "user_name".to_string(),
};

let client: Arc<dyn UTransport> = Arc::new(
UPClientMqtt::new(
mqtt_config,
UUID::build(),
"authority_name".to_string(),
UPClientMqttType::Cloud,
)
.await
.expect("Could not create mqtt transport."),
);

// Source represents the client (specifically the topic that the client sends to)
let source = UUri::try_from_parts(
CLIENT_AUTHORITY,
CLIENT_UE_ID,
CLIENT_UE_VERSION_MAJOR,
CLIENT_RESOURCE_ID,
)
.unwrap();
// Sink is the destination entity which the streamer should rout our messages to.
let sink = UUri::try_from_parts(
SERVICE_AUTHORITY,
SERVICE_UE_ID,
Expand All @@ -95,6 +76,19 @@ async fn main() -> Result<(), UStatus> {
)
.unwrap();

let mqtt_config = common::get_mqtt_config();

let client: Arc<dyn UTransport> = Arc::new(
UPClientMqtt::new(
mqtt_config,
UUID::build(),
CLIENT_AUTHORITY.to_string(),
UPClientMqttType::Device,
)
.await
.expect("Could not create mqtt transport."),
);

let service_response_listener: Arc<dyn UListener> = Arc::new(ServiceResponseListener);
client
.register_listener(&sink, Some(&source), service_response_listener)
Expand Down
48 changes: 21 additions & 27 deletions example-streamer-uses/src/bin/mqtt_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

mod common;

use chrono::Local;
use chrono::Timelike;
use clap::Parser;
use hello_world_protos::hello_world_topics::Timer;
use hello_world_protos::timeofday::TimeOfDay;
use std::str::FromStr;
use log::info;
use std::sync::Arc;
use std::time::Duration;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri, UUID};
use up_client_mqtt5_rust::{UPClientMqtt, UPClientMqttType};
use up_rust::{UMessageBuilder, UStatus, UTransport, UUri, UUID};

const PUB_TOPIC_AUTHORITY: &str = "mqtt_authority";
const PUB_TOPIC_AUTHORITY: &str = "cloud_authority";
const PUB_TOPIC_UE_ID: u32 = 0x3039;
const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;
Expand All @@ -31,38 +32,30 @@ const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;
async fn main() -> Result<(), UStatus> {
env_logger::init();

println!("Started mqtt_publisher.");
info!("Started mqtt_publisher.");

// This is the URI of the publisher entity
let source = UUri::try_from_parts(
PUB_TOPIC_AUTHORITY,
PUB_TOPIC_UE_ID,
PUB_TOPIC_UE_VERSION_MAJOR,
PUB_TOPIC_RESOURCE_ID,
)
.unwrap();

let mqtt_config = MqttConfig {
mqtt_protocol: MqttProtocol::Mqtt,
mqtt_hostname: "localhost".to_string(),
mqtt_port: 1883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: None,
username: "user_name".to_string(),
};
let mqtt_config = common::get_mqtt_config();

let publisher: Arc<dyn UTransport> = Arc::new(
UPClientMqtt::new(
mqtt_config,
UUID::build(),
"authority_name".to_string(),
UPClientMqttType::Cloud,
PUB_TOPIC_AUTHORITY.to_string(),
UPClientMqttType::Device,
)
.await
.expect("Could not create mqtt transport."),
);

let source = UUri::try_from_parts(
PUB_TOPIC_AUTHORITY,
PUB_TOPIC_UE_ID,
PUB_TOPIC_UE_VERSION_MAJOR,
PUB_TOPIC_RESOURCE_ID,
)
.unwrap();

loop {
tokio::time::sleep(Duration::from_millis(1000)).await;

Expand All @@ -81,10 +74,11 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};

// Publish messages signed with the source URI
let publish_msg = UMessageBuilder::publish(source.clone())
.build_with_protobuf_payload(&timer_message)
.unwrap();
println!("Sending Publish message:\n{publish_msg:?}");
info!("Sending Publish message:\n{publish_msg:?}");

publisher.send(publish_msg).await?;
}
Expand Down
51 changes: 16 additions & 35 deletions example-streamer-uses/src/bin/mqtt_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,22 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

mod common;

use async_trait::async_trait;
use clap::Parser;
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use log::error;
use protobuf::Message;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_client_mqtt5_rust::{UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri, UUID};

const SERVICE_AUTHORITY: &str = "linux";
const SERVICE_AUTHORITY: &str = "cloud_authority";
const SERVICE_UE_ID: u32 = 0x1236;
const SERVICE_UE_VERSION_MAJOR: u8 = 1;
const SERVICE_RESOURCE_ID: u16 = 0x0896;

fn service_uuri() -> UUri {
UUri::try_from_parts(
SERVICE_AUTHORITY,
SERVICE_UE_ID,
SERVICE_UE_VERSION_MAJOR,
0,
)
.unwrap()
}

struct ServiceRequestResponder {
client: Arc<dyn UTransport>,
}
Expand Down Expand Up @@ -83,39 +73,30 @@ async fn main() -> Result<(), UStatus> {

println!("Started mqtt_service.");

let mqtt_config = MqttConfig {
mqtt_protocol: MqttProtocol::Mqtt,
mqtt_hostname: "localhost".to_string(),
mqtt_port: 1883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: None,
username: "user_name".to_string(),
};
// We set the source filter to "any" so that we process messages from all device that send some.
let source_filter = UUri::any();
// The sink filter gets specified so that we only process messages directed at this entity.
let sink_filter = UUri::try_from_parts(
SERVICE_AUTHORITY,
SERVICE_UE_ID,
SERVICE_UE_VERSION_MAJOR,
SERVICE_RESOURCE_ID,
)
.unwrap();

let service_uri: String = (&service_uuri()).into();
let mqtt_config = common::get_mqtt_config();

let service: Arc<dyn UTransport> = Arc::new(
UPClientMqtt::new(
mqtt_config,
UUID::build(),
SERVICE_AUTHORITY.to_string(),
UPClientMqttType::Device,
UPClientMqttType::Device, // Todo: make sure that UPClientMqttType::Cloud also works
)
.await
.expect("Could not create mqtt transport."),
);

let source_filter = UUri::any();
let sink_filter = UUri::try_from_parts(
SERVICE_AUTHORITY,
SERVICE_UE_ID,
SERVICE_UE_VERSION_MAJOR,
SERVICE_RESOURCE_ID,
)
.unwrap();

let service_request_responder: Arc<dyn UListener> =
Arc::new(ServiceRequestResponder::new(service.clone()));
service
Expand Down
Loading

0 comments on commit fdca46f

Please sign in to comment.