Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for MQTTS #43

Merged
merged 7 commits into from
Feb 29, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Added support for providing PKI material in configuration file in bas…
…e64 encoded format.
  • Loading branch information
gmartin82 committed Feb 28, 2024
commit 280d65930e1a3cec6fb019e56c49f7cc469ff12b
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ categories = ["network-programming"]
async-channel = "2.2.0"
async-std = "=1.12.0"
async-trait = "0.1.66"
base64 = "0.21.4"
clap = "3.2.23"
derivative = "2.2.0"
env_logger = "0.10.0"
@@ -44,6 +45,7 @@ regex = "1.7.1"
rustc_version = "0.4"
rustls = "0.21.7"
rustls-pemfile = "1.0.4"
secrecy = { version = "0.8.0", features = ["serde", "alloc"] }
serde = "1.0.154"
serde_json = "1.0.94"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
16 changes: 11 additions & 5 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
@@ -50,20 +50,26 @@
////
// tls: {
// ////
// //// server_private_key: Path to the TLS private key.
// //// server_private_key: TLS private key provided as either a file or base 64 encoded string.
// //// One of the values below must be provided.
// ////
// server_private_key: "/path/to/private-key.pem",
// // server_private_key: "/path/to/private-key.pem",
// // server_private_key_base64: "base64-private-key",
//
// ////
// //// server_certificate: Path to the TLS public certificate.
// //// server_certificate: TLS public certificate provided as either a file or base 64 encoded string.
// //// One of the values below must be provided.
// ////
// server_certificate: "/path/to/certificate.pem",
// // server_certificate: "/path/to/certificate.pem",
// // server_certificate_base64: "base64-certificate",
//
// ////
// //// root_ca_certificate: Path to the certificate of the certificate authority used to validate clients connecting to the MQTT server.
// //// root_ca_certificate: Certificate of the certificate authority used to validate clients connecting to the MQTT server.
// //// Provided as either a file or base 64 encoded string.
// //// This setting is optional and enables mutual TLS (mTLS) support if provided.
// ////
// // root_ca_certificate: "/path/to/root-ca-certificate.pem",
// // root_ca_certificate_base64: "base64-root-ca-certificate",
// },

},
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -103,7 +103,9 @@ The MQTT plugin and standalone bridge for Eclipse Zenoh supports MQTTS. MQTTS ca

### Server side authentication configuration

In the configuration file, the required **tls** fields are **server_private_key** and **server_certificate**.
Server side authentication requires both a private key and certificate for the server. These can be provided as either a file or as a base 64 encoded string.

In the configuration file, the required **tls** fields when using files are **server_private_key** and **server_certificate**. When using base 64 encoded strings the required **tls** fields are **server_private_key_base64** and **server_certificate_base64**.

An example configuration file supporting server side authentication would be:

@@ -120,11 +122,13 @@ An example configuration file supporting server side authentication would be:
}
```

The standalone bridge (`zenoh-bridge-mqtt`) also allows these settings to be provided through the **`--server-private-key`** and **`--server-certificate`** command line arguments.
The standalone bridge (`zenoh-bridge-mqtt`) also allows the required files to be provided through the **`--server-private-key`** and **`--server-certificate`** command line arguments.

### Mutual authentication (mTLS) configuration

In order to enable mutual authentication a certificate for the certificate authority used to validate clients connecting to the MQTT server must also be provided. In the configuration file, the required **tls** field is **root_ca_certificate**.
In order to enable mutual authentication a certificate for the certificate authority used to validate clients connecting to the MQTT server must also be provided. This can be provided as either a file or a base 64 encoded string.

In the configuration file, the required **tls** field when using a file is **root_ca_certificate**. When using base 64 encoded strings the required **tls** field when using a file is **root_ca_certificate_base64**.

An example configuration file supporting server side authentication would be:

@@ -141,7 +145,7 @@ An example configuration file supporting server side authentication would be:
}
}
```
The standalone bridge (`zenoh-bridge-mqtt`) also allows this setting to be provided through the **`--root-ca-certificate`** command line argument.
The standalone bridge (`zenoh-bridge-mqtt`) also allows the required file to be provided through the **`--root-ca-certificate`** command line argument.

## How to install it

2 changes: 2 additions & 0 deletions zenoh-plugin-mqtt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ stats = ["zenoh/stats"]
async-channel = { workspace = true }
async-std = { workspace = true, features = ["unstable", "attributes"] }
async-trait = { workspace = true }
base64 = { workspace = true }
derivative = { workspace = true }
env_logger = { workspace = true }
flume = { workspace = true }
@@ -49,6 +50,7 @@ ntex-tls = { workspace = true }
regex = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
secrecy = {workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zenoh = { workspace = true }
12 changes: 9 additions & 3 deletions zenoh-plugin-mqtt/src/config.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ use regex::Regex;
use serde::de::{Unexpected, Visitor};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
use zenoh::config::SecretValue;
use zenoh::prelude::*;

const DEFAULT_MQTT_INTERFACE: &str = "0.0.0.0";
@@ -56,10 +57,15 @@ pub struct Config {
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct TLSConfig {
pub server_private_key: String,
pub server_certificate: String,
#[serde(default)]
pub server_private_key: Option<String>,
#[serde(skip_serializing)]
pub server_private_key_base64: Option<SecretValue>,
pub server_certificate: Option<String>,
#[serde(skip_serializing)]
pub server_certificate_base64: Option<SecretValue>,
pub root_ca_certificate: Option<String>,
#[serde(skip_serializing)]
pub root_ca_certificate_base64: Option<SecretValue>,
}

fn default_mqtt_port() -> String {
125 changes: 92 additions & 33 deletions zenoh-plugin-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
@@ -20,9 +20,9 @@ use ntex_mqtt::{v3, v5, MqttError, MqttServer};
use ntex_tls::rustls::Acceptor;
use rustls::server::AllowAnyAuthenticatedClient;
use rustls::{Certificate, PrivateKey, RootCertStore, ServerConfig};
use secrecy::ExposeSecret;
use serde_json::Value;
use std::env;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use zenoh::plugins::{RunningPluginTrait, ZenohPlugin};
@@ -82,15 +82,21 @@ impl Plugin for MqttPlugin {
.ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
let config: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
async_std::task::spawn(run(runtime.clone(), config));

let tls_config = match config.tls.as_ref() {
Some(tls) => Some(create_tls_config(tls)?),
None => None,
};

async_std::task::spawn(run(runtime.clone(), config, tls_config));
Ok(Box::new(MqttPlugin))
}
}

impl PluginControl for MqttPlugin {}
impl RunningPluginTrait for MqttPlugin {}

async fn run(runtime: Runtime, config: Config) {
async fn run(runtime: Runtime, config: Config, tls_config: Option<Arc<ServerConfig>>) {
// Try to initiate login.
// Required in case of dynamic lib, otherwise no logs.
// But cannot be done twice in case of static link.
@@ -125,11 +131,6 @@ async fn run(runtime: Runtime, config: Config) {
.await
.expect("Failed to create AdminSpace queryable");

let tls_config = config
.tls
.as_ref()
.map(|tls| create_tls_config(tls).expect("Failed to configure TLS"));

// Start MQTT Server task
let config = Arc::new(config);
ntex::rt::System::new(MqttPlugin::DEFAULT_NAME)
@@ -153,13 +154,71 @@ async fn run(runtime: Runtime, config: Config) {
.unwrap();
}

fn create_tls_config(config: &TLSConfig) -> Result<Arc<ServerConfig>, MqttPluginError> {
let key = load_private_key(config.server_private_key.as_str())?;
let certs = load_certs(config.server_certificate.as_str())?;
fn create_tls_config(config: &TLSConfig) -> ZResult<Arc<ServerConfig>> {
let key_bytes = match (
config.server_private_key.as_ref(),
config.server_private_key_base64.as_ref(),
) {
(Some(file), None) => {
std::fs::read(file).map_err(|e| zerror!("Invalid private key file: {e:?}"))?
}
(None, Some(base64)) => base64_decode(base64.expose_secret())?,
(None, None) => {
return Err(zerror!(
"Either 'server_private_key' or 'server_private_key_base64' must be present!"
)
.into());
}
_ => {
return Err(zerror!(
"Only one of 'server_private_key' and 'server_private_key_base64' can be present!"
)
.into());
}
};
let key = load_private_key(key_bytes)?;

let certs_bytes = match (
config.server_certificate.as_ref(),
config.server_certificate_base64.as_ref(),
) {
(Some(file), None) => {
std::fs::read(file).map_err(|e| zerror!("Invalid certificate file: {e:?}"))?
}
(None, Some(base64)) => base64_decode(base64.expose_secret())?,
(None, None) => {
return Err(zerror!(
"Either 'server_certificate' or 'server_certificate_base64' must be present!"
)
.into());
}
_ => {
return Err(zerror!(
"Only one of 'server_certificate' and 'server_certificate_base64' can be present!"
)
.into());
}
};
let certs = load_certs(certs_bytes)?;

// Providing a root CA certificate is optional - when provided clients will be verified
let rootca_bytes = match (
config.root_ca_certificate.as_ref(),
config.root_ca_certificate_base64.as_ref(),
) {
(Some(file), None) => {
Some(std::fs::read(file).map_err(|e| zerror!("Invalid root certificate file: {e:?}"))?)
}
(None, Some(base64)) => Some(base64_decode(base64.expose_secret())?),
(None, None) => None,
_ => {
return Err(zerror!("Only one of 'root_ca_certificate' and 'root_ca_certificate_base64' can be present!").into());
}
};

let tls_config = match config.root_ca_certificate.as_ref() {
Some(file) => {
let root_cert_store = load_trust_anchors(file)?;
let tls_config = match rootca_bytes {
Some(bytes) => {
let root_cert_store = load_trust_anchors(bytes)?;

ServerConfig::builder()
.with_safe_defaults()
@@ -176,12 +235,16 @@ fn create_tls_config(config: &TLSConfig) -> Result<Arc<ServerConfig>, MqttPlugin
Ok(Arc::new(tls_config))
}

fn load_private_key(filename: &str) -> Result<PrivateKey, String> {
let keyfile = match File::open(filename) {
Ok(file) => file,
Err(_) => return Err(format!("Cannot open private key file {:?}", filename)),
};
let mut reader = BufReader::new(keyfile);
pub fn base64_decode(data: &str) -> ZResult<Vec<u8>> {
use base64::engine::general_purpose;
use base64::Engine;
Ok(general_purpose::STANDARD
.decode(data)
.map_err(|e| zerror!("Unable to perform base64 decoding: {e:?}"))?)
}

fn load_private_key(bytes: Vec<u8>) -> ZResult<PrivateKey> {
let mut reader = BufReader::new(bytes.as_slice());

loop {
match rustls_pemfile::read_one(&mut reader) {
@@ -192,35 +255,31 @@ fn load_private_key(filename: &str) -> Result<PrivateKey, String> {
None => break,
_ => continue,
},
Err(_) => return Err(format!("Cannot parse private key file {:?}", filename)),
Err(e) => return Err(zerror!("Cannot parse private key: {e:?}").into()),
}
}
Err(format!("No supported private keys found in {:?}", filename))
Err(zerror!("No supported private keys found").into())
}

fn load_certs(filename: &str) -> Result<Vec<Certificate>, String> {
let certfile = match File::open(filename) {
Ok(file) => file,
Err(_) => return Err(format!("Cannot open certificate file {:?}", filename)),
};
let mut reader = BufReader::new(certfile);
fn load_certs(bytes: Vec<u8>) -> ZResult<Vec<Certificate>> {
let mut reader = BufReader::new(bytes.as_slice());

let certs = match rustls_pemfile::certs(&mut reader) {
Ok(certs) => certs,
Err(_) => return Err(format!("Cannot parse certificate file {:?}", filename)),
Err(e) => return Err(zerror!("Cannot parse certificate {e:?}").into()),
};

match certs.is_empty() {
true => Err(format!("No certificates found in {:?}", filename)),
true => Err(zerror!("No certificates found").into()),
false => Ok(certs.iter().map(|c| Certificate(c.to_vec())).collect()),
}
}

fn load_trust_anchors(root_ca_file: &str) -> Result<RootCertStore, String> {
fn load_trust_anchors(bytes: Vec<u8>) -> ZResult<RootCertStore> {
let mut root_cert_store = RootCertStore::empty();
let roots = load_certs(root_ca_file)?;
let roots = load_certs(bytes)?;
for root in roots {
root_cert_store.add(&root).unwrap();
root_cert_store.add(&root)?;
}
Ok(root_cert_store)
}