Skip to content

Commit

Permalink
Enable encrypted MQTT/ connection with Azure EventGrid
Browse files Browse the repository at this point in the history
Enable/Disable SubscriptionIds depending on broker capabilities.
Azure EventGrid does not support SubscriptionIds even though they are specified in the MQTTv5 spec.

Co-authored-by: Pete LeVasseur <[email protected]>
  • Loading branch information
ValMobBIllich and PLeVasseur authored Oct 29, 2024
1 parent 8af1a74 commit 18fe342
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ target/
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

.cargo/

# These are backup files generated by rustfmt
**/*.rs.bk

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
tokio = { version = "1.38", features = ["full"] }
tokio-macros = { version = "2.3" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "da722852004f657fa8d0282369fcfccb0ccda112" }
up-rust = "0.2.0"
url = { version = "2.5" }
uuid = { version = "1.7", features = ["v8"] }

Expand Down
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,22 @@ cargo test

### Running the Examples

First, ensure you have a local MQTT broker running, such as [Mosquitto](https://github.com/eclipse/mosquitto).
1. Start an MQTT broker (e.g. mosquitto)

2. Set up your environment (for example with a config file at .cargo/config.toml)

Make sure to set these parameters:
```toml
[env]
MQTT_PROTOCOL = "'mqtt' or 'mqtts'"
MQTT_PORT = "8883 for ssl encrypted mqtt"
MQTT_HOSTNAME = "the hostname/ url of the broker"
KEY_STORE = "the .pem file location corresponding to an ssl certificate (if using mqtts)"
PRIVATE_KEY_PW = "the password to the .pem file (if using mqtts)"
CLIENT_NAME = "the name of the eventgrid client (if using mqtts)"
```

Then start the following two examples from your repo root directory.
3. Start the following two examples from your repo root directory.

```bash
cargo run --example publisher_example
Expand Down
92 changes: 92 additions & 0 deletions examples/encrypted_publisher_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{env, str::FromStr, time::SystemTime};

use env_logger::{Builder, Target};
use log::LevelFilter;
use paho_mqtt::SslOptionsBuilder;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri, UUID};

#[tokio::main]
async fn main() -> Result<(), UStatus> {
Builder::new()
.target(Target::Stdout) // Logs to stdout
.filter(None, LevelFilter::Trace) // Default level
.init();

// Set the protocol type (Mqtts for encrypted mqtt)
let protocol = MqttProtocol::Mqtts;

// Build the ssl options (only needed if protocol is Mqtts!)
let ssl_options = Some(
SslOptionsBuilder::new()
.key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found"))
.expect("Certificate file not found.")
.private_key_password(
env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"),
)
.enable_server_cert_auth(false)
.finalize(),
);
// If the mqtt broker has a specific username attached to the ssl certificate, it must be included in the config
let user_name = env::var("CLIENT_NAME")
.expect("CLIENT_NAME env variable not found")
.to_string();

let config = MqttConfig {
mqtt_protocol: protocol,
mqtt_hostname: env::var("MQTT_HOSTNAME")
.expect("MQTT_HOSTNAME env variable not found")
.to_string(),
mqtt_port: 8883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options,
username: user_name,
};

let client = UPClientMqtt::new(
config,
UUID::build(),
"Vehicle_B".to_string(),
UPClientMqttType::Device,
)
.await?;

let source =
UUri::from_str("//Vehicle_B/A8000/2/8A50").expect("Failed to create source filter");

loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let message = UMessageBuilder::publish(source.clone())
.build_with_payload(
current_time.to_string(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.expect("Failed to build message");

println!(
"Sending message: {} to source: {}",
current_time,
source.to_uri(false)
);
client.send(message).await?;
}
}
105 changes: 105 additions & 0 deletions examples/encrypted_subscriber_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{
env,
str::{self, FromStr},
sync::Arc,
};

use async_trait::async_trait;
use env_logger::{Builder, Target};
use log::LevelFilter;
use paho_mqtt::SslOptionsBuilder;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID};

const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF;
const WILDCARD_ENTITY_VERSION: u32 = 0x0000_00FF;
const WILDCARD_RESOURCE_ID: u32 = 0x0000_FFFF;

struct PrintlnListener {}

#[async_trait]
impl UListener for PrintlnListener {
async fn on_receive(&self, message: UMessage) {
let msg_payload = message.payload.unwrap();
let msg_str: &str = str::from_utf8(&msg_payload).unwrap();
println!("Received message: {msg_str}");
}
}

#[tokio::main]
async fn main() -> Result<(), UStatus> {
Builder::new()
.target(Target::Stdout) // Logs to stdout
.filter(None, LevelFilter::Trace) // Default level
.init();

// Set the protocol type ("mqtts" for encrypted mqtt)
let protocol = MqttProtocol::Mqtts;

// Build the ssl options (only needed if protocol is Mqtts!)
let ssl_options = Some(
SslOptionsBuilder::new()
.key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found"))
.expect("Certificate file not found.")
.private_key_password(
env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"),
)
.enable_server_cert_auth(false)
.finalize(),
);
// If the mqtt broker has a specific username attached to the ssl certificate, it must be included in the config
let user_name = env::var("CLIENT_NAME")
.expect("CLIENT_NAME env variable not found")
.to_string();

// Build the configuration for the connection
let config = MqttConfig {
mqtt_protocol: protocol,
mqtt_hostname: env::var("MQTT_HOSTNAME")
.expect("MQTT_HOSTNAME env variable not found")
.to_string(),
mqtt_port: 8883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options,
username: user_name,
};

let client = UPClientMqtt::new(
config,
UUID::build(),
"Vehicle_B".to_string(),
UPClientMqttType::Device,
)
.await?;

let listener = Arc::new(PrintlnListener {});
let source_filter = UUri::from_str(&format!(
"//Vehicle_B/{WILDCARD_ENTITY_ID:X}/{WILDCARD_ENTITY_VERSION:X}/{WILDCARD_RESOURCE_ID:X}"
))
.expect("Failed to create source filter");

println!("Subscribing to: {}", source_filter.to_uri(false));

client
.register_listener(&source_filter, None, listener.clone())
.await?;

loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
22 changes: 19 additions & 3 deletions examples/publisher_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,34 @@

use std::{str::FromStr, time::SystemTime};

use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType};
use env_logger::{Builder, Target};
use log::LevelFilter;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri, UUID};

#[tokio::main]
async fn main() -> Result<(), UStatus> {
Builder::new()
.target(Target::Stdout) // Logs to stdout
.filter(None, LevelFilter::Trace) // Default level
.init();

// Set the protocol type ("mqtt" for unencrypted mqtt)
let protocol = MqttProtocol::Mqtt;

// no need to build ssl options since we are using unencrypted mqtt, username is arbitrary
let ssl_options = None;
let user_name = "eclipse_testuser".to_string();

let config = MqttConfig {
mqtt_protocol: protocol,
mqtt_hostname: "localhost".to_string(),
mqtt_port: "1883".to_string(),
mqtt_port: 1883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: None,
ssl_options,
username: user_name,
};

let client = UPClientMqtt::new(
Expand Down
24 changes: 20 additions & 4 deletions examples/subscriber_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::{
};

use async_trait::async_trait;
use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType};
use env_logger::{Builder, Target};
use log::LevelFilter;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID};

const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF;
Expand All @@ -37,19 +39,33 @@ impl UListener for PrintlnListener {

#[tokio::main]
async fn main() -> Result<(), UStatus> {
Builder::new()
.target(Target::Stdout) // Logs to stdout
.filter(None, LevelFilter::Trace) // Default level
.init();

// Set the protocol type ("mqtt" for unencrypted mqtt)
let protocol = MqttProtocol::Mqtt;

// no need to build ssl options since we are using unencrypted mqtt, username is arbitrary
let ssl_options = None;
let user_name = "eclipse_testuser".to_string();

let config = MqttConfig {
mqtt_protocol: protocol,
mqtt_hostname: "localhost".to_string(),
mqtt_port: "1883".to_string(),
mqtt_port: 1883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: None,
ssl_options,
username: user_name,
};

let client = UPClientMqtt::new(
config,
UUID::build(),
"Vehicle_A".to_string(),
"Vehicle_B".to_string(),
UPClientMqttType::Device,
)
.await?;
Expand Down
Loading

0 comments on commit 18fe342

Please sign in to comment.