From 384e4c135986a0629f750fc25c722c27135f3a52 Mon Sep 17 00:00:00 2001 From: Geoff Martin Date: Thu, 22 Feb 2024 18:51:33 +0000 Subject: [PATCH 1/5] Support added for MQTTS including both sever-authentication and mutual TLS (mTLS) support. Error handling is still to be implemented. --- Cargo.lock | 31 +++-- Cargo.toml | 3 + zenoh-bridge-mqtt/src/main.rs | 17 ++- zenoh-plugin-mqtt/Cargo.toml | 3 + zenoh-plugin-mqtt/src/config.rs | 11 ++ zenoh-plugin-mqtt/src/lib.rs | 222 ++++++++++++++++++++++++-------- 6 files changed, 217 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa70648..cadd3da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3097,7 +3097,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile 1.0.3", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -3109,7 +3109,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" dependencies = [ "openssl-probe", - "rustls-pemfile 2.0.0", + "rustls-pemfile 2.1.0", "rustls-pki-types", "schannel", "security-framework", @@ -3117,18 +3117,18 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ "base64 0.21.4", ] [[package]] name = "rustls-pemfile" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +checksum = "3c333bb734fcdedcea57de1602543590f545f127dc8b533324318fd492c5c70b" dependencies = [ "base64 0.21.4", "rustls-pki-types", @@ -3136,9 +3136,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.0.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb0a1f9b9efec70d32e6d6aa3e58ebd88c3754ec98dfe9145c63cf54cc829b83" +checksum = "048a63e5b3ac996d78d402940b5fa47973d2d080c6c6fffa1d0f19c4445310b7" [[package]] name = "rustls-webpki" @@ -3152,9 +3152,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.0" +version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de2635c8bc2b88d367767c5de8ea1d8db9af3f6219eba28442242d9ab81d1b89" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ "ring 0.17.6", "rustls-pki-types", @@ -4769,8 +4769,8 @@ dependencies = [ "quinn", "rustls", "rustls-native-certs 0.7.0", - "rustls-pemfile 2.0.0", - "rustls-webpki 0.102.0", + "rustls-pemfile 2.1.0", + "rustls-webpki 0.102.2", "secrecy", "zenoh-config", "zenoh-core", @@ -4809,8 +4809,8 @@ dependencies = [ "futures", "log", "rustls", - "rustls-pemfile 2.0.0", - "rustls-webpki 0.102.0", + "rustls-pemfile 2.1.0", + "rustls-webpki 0.102.2", "secrecy", "webpki-roots 0.26.0", "zenoh-config", @@ -4909,8 +4909,11 @@ dependencies = [ "log", "ntex", "ntex-mqtt", + "ntex-tls", "regex", "rustc_version 0.4.0", + "rustls", + "rustls-pemfile 1.0.4", "serde", "serde_json", "zenoh", diff --git a/Cargo.toml b/Cargo.toml index 7d76bd5..ebb1d1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,8 +39,11 @@ lazy_static = "1.4.0" log = "0.4.17" ntex = "0.7.17" ntex-mqtt = "0.12.16" +ntex-tls = "0.3.2" regex = "1.7.1" rustc_version = "0.4" +rustls = "0.21.7" +rustls-pemfile = "1.0.4" serde = "1.0.154" serde_json = "1.0.94" zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ diff --git a/zenoh-bridge-mqtt/src/main.rs b/zenoh-bridge-mqtt/src/main.rs index 6163929..e425c9c 100644 --- a/zenoh-bridge-mqtt/src/main.rs +++ b/zenoh-bridge-mqtt/src/main.rs @@ -103,7 +103,19 @@ r#"-r, --generalise-sub=[String]... 'A list of key expression to use for gener )) .arg(Arg::from_usage( r#"-w, --generalise-pub=[String]... 'A list of key expression to use for generalising publications (usable multiple times).'"# - )); + )) + .arg(Arg::from_usage( +r#"--server-private-key=[FILE] 'Path to the TLS private key for the MQTT server. If specified a valid certificate for the server must also be provided.'"# + ) + .requires("server-certificate")) + .arg(Arg::from_usage( +r#"--server-certificate=[FILE] 'Path to the TLS public certificate for the MQTT server. If specified a valid private key for the server must also be provided.'"# + ) + .requires("server-private-key")) + .arg(Arg::from_usage( +r#"--root-ca-certificate=[FILE] 'Path to the certificate of the certificate authority used to validate clients connecting to the MQTT server. If specified a valid private key and certificate for the server must also be provided.'"# + ) + .requires_all(&["server-certificate", "server-private-key"])); let args = app.get_matches(); // load config file at first @@ -162,6 +174,9 @@ r#"-w, --generalise-pub=[String]... 'A list of key expression to use for gener insert_json5!(config, args, "plugins/mqtt/deny", if "deny", ); insert_json5!(config, args, "plugins/mqtt/generalise_pubs", for "generalise-pub", .collect::>()); insert_json5!(config, args, "plugins/mqtt/generalise_subs", for "generalise-sub", .collect::>()); + insert_json5!(config, args, "plugins/mqtt/tls/server_private_key", if "server-private-key", ); + insert_json5!(config, args, "plugins/mqtt/tls/server_certificate", if "server-certificate", ); + insert_json5!(config, args, "plugins/mqtt/tls/root_ca_certificate", if "root-ca-certificate", ); config } diff --git a/zenoh-plugin-mqtt/Cargo.toml b/zenoh-plugin-mqtt/Cargo.toml index 5335aef..cb48ed2 100644 --- a/zenoh-plugin-mqtt/Cargo.toml +++ b/zenoh-plugin-mqtt/Cargo.toml @@ -45,7 +45,10 @@ lazy_static = { workspace = true } log = { workspace = true } ntex = { workspace = true, features = ["async-std", "rustls"] } ntex-mqtt = { workspace = true } +ntex-tls = { workspace = true } regex = { workspace = true } +rustls = { workspace = true } +rustls-pemfile = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } zenoh = { workspace = true } diff --git a/zenoh-plugin-mqtt/src/config.rs b/zenoh-plugin-mqtt/src/config.rs index 5f52612..b8f1ca1 100644 --- a/zenoh-plugin-mqtt/src/config.rs +++ b/zenoh-plugin-mqtt/src/config.rs @@ -46,11 +46,22 @@ pub struct Config { pub generalise_subs: Vec, #[serde(default)] pub generalise_pubs: Vec, + #[serde(default)] + pub tls: Option, __required__: Option, #[serde(default, deserialize_with = "deserialize_path")] __path__: Option>, } +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct TLSConfig { + pub server_private_key: String, + pub server_certificate: String, + #[serde(default)] + pub root_ca_certificate: Option, +} + fn default_mqtt_port() -> String { format!("{DEFAULT_MQTT_INTERFACE}:{DEFAULT_MQTT_PORT}") } diff --git a/zenoh-plugin-mqtt/src/lib.rs b/zenoh-plugin-mqtt/src/lib.rs index fd6465a..f2a6824 100644 --- a/zenoh-plugin-mqtt/src/lib.rs +++ b/zenoh-plugin-mqtt/src/lib.rs @@ -12,11 +12,20 @@ // ZettaScale Zenoh Team, // use git_version::git_version; +use ntex::io::IoBoxed; +use ntex::service::chain_factory; use ntex::service::{fn_factory_with_config, fn_service}; +use ntex::time::Deadline; use ntex::util::Ready; -use ntex_mqtt::{v3, v5, MqttServer}; +use ntex::ServiceFactory; +use ntex_mqtt::{v3, v5, MqttError, MqttServer}; +use ntex_tls::rustls::Acceptor; +use rustls::server::AllowAnyAuthenticatedClient; +use rustls::{Certificate, PrivateKey, RootCertStore, ServerConfig}; use serde_json::Value; use std::env; +use std::fs::File; +use std::io::BufReader; use std::sync::Arc; use zenoh::plugins::{Plugin, RunningPluginTrait, Runtime, ZenohPlugin}; use zenoh::prelude::r#async::*; @@ -32,7 +41,7 @@ extern crate zenoh_core; pub mod config; mod mqtt_helpers; mod mqtt_session_state; -use config::Config; +use config::{Config, TLSConfig}; use mqtt_session_state::MqttSessionState; macro_rules! ke_for_sure { @@ -144,63 +153,160 @@ async fn run(runtime: Runtime, config: Config) { let config = Arc::new(config); ntex::rt::System::new(MqttPlugin::STATIC_NAME) .block_on(async move { - ntex::server::Server::build() - .bind("mqtt", config.port.clone(), move |_| { - let zs_v3 = zsession.clone(); - let zs_v5 = zsession.clone(); - let config_v3 = config.clone(); - let config_v5 = config.clone(); - MqttServer::new() - .v3(v3::MqttServer::new(fn_factory_with_config(move |_| { - let zs = zs_v3.clone(); - let config = config_v3.clone(); - Ready::Ok::<_, ()>(fn_service(move |h| { - handshake_v3(h, zs.clone(), config.clone()) - })) - })) - .publish(fn_factory_with_config( - |session: v3::Session| { - Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { - publish_v3(session.clone(), req) - })) - }, - )) - .control(fn_factory_with_config( - |session: v3::Session| { - Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { - control_v3(session.clone(), req) - })) - }, - ))) - .v5(v5::MqttServer::new(fn_factory_with_config(move |_| { - let zs = zs_v5.clone(); - let config = config_v5.clone(); - Ready::Ok::<_, ()>(fn_service(move |h| { - handshake_v5(h, zs.clone(), config.clone()) - })) - })) - .publish(fn_factory_with_config( - |session: v5::Session| { - Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { - publish_v5(session.clone(), req) - })) - }, - )) - .control(fn_factory_with_config( - |session: v5::Session| { - Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { - control_v5(session.clone(), req) - })) - }, - ))) - })? - .workers(1) - .run() - .await + let server = match config.tls.as_ref() { + Some(tls) => { + let tls_acceptor = create_tls_acceptor(tls); + ntex::server::Server::build() + .bind("mqtt", config.port.clone(), move |_| { + chain_factory(Acceptor::new(tls_acceptor.clone())) + .map_err(|err| MqttError::Service(MqttPluginError::from(err))) + .and_then(create_mqtt_server(zsession.clone(), config.clone())) + }) + .unwrap() + } + None => ntex::server::Server::build() + .bind("mqtt", config.port.clone(), move |_| { + create_mqtt_server(zsession.clone(), config.clone()) + }) + .unwrap(), + }; + server.workers(1).run().await }) .unwrap(); } +fn create_tls_acceptor(config: &TLSConfig) -> Arc { + let key = load_private_key(&config.server_private_key.as_str()); + let certs = load_certs(config.server_certificate.as_str()); + + let tls_config = match config.root_ca_certificate.as_ref() { + Some(file) => { + let root_cert_store = load_trust_anchors(file); + + ServerConfig::builder() + .with_safe_defaults() + .with_client_cert_verifier(Arc::new(AllowAnyAuthenticatedClient::new( + root_cert_store, + ))) + .with_single_cert(certs, key) + .unwrap() + } + None => ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, key) + .unwrap(), + }; + Arc::new(tls_config) +} + +fn load_private_key(filename: &str) -> PrivateKey { + let keyfile = File::open(filename).expect("cannot open private key file"); + let mut reader = BufReader::new(keyfile); + + loop { + match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") { + Some(rustls_pemfile::Item::RSAKey(key)) => return rustls::PrivateKey(key), + Some(rustls_pemfile::Item::PKCS8Key(key)) => return rustls::PrivateKey(key), + Some(rustls_pemfile::Item::ECKey(key)) => return rustls::PrivateKey(key), + None => break, + _ => continue, + } + } + + panic!("No supported keys found in {:?}", filename); +} + +fn load_certs(filename: &str) -> Vec { + let certfile = File::open(filename).expect("cannot open certificate file"); + let mut reader = BufReader::new(certfile); + rustls_pemfile::certs(&mut reader) + .unwrap() + .iter() + .map(|c| Certificate(c.to_vec())) + .collect() +} + +fn load_trust_anchors(root_ca_file: &str) -> RootCertStore { + let mut root_cert_store = RootCertStore::empty(); + let roots = load_certs(root_ca_file); + for root in roots { + root_cert_store.add(&root).unwrap(); + } + root_cert_store +} + +fn create_mqtt_server( + session: Arc, + config: Arc, +) -> MqttServer< + impl ServiceFactory< + (IoBoxed, Deadline), + (), + Response = (), + Error = MqttError, + InitError = (), + >, + impl ServiceFactory< + (IoBoxed, Deadline), + (), + Response = (), + Error = MqttError, + InitError = (), + >, + MqttPluginError, + (), +> { + let zs_v3 = session.clone(); + let zs_v5 = session.clone(); + let config_v3 = config.clone(); + let config_v5 = config.clone(); + + MqttServer::new() + .v3(v3::MqttServer::new(fn_factory_with_config(move |_| { + let zs = zs_v3.clone(); + let config = config_v3.clone(); + Ready::Ok::<_, ()>(fn_service(move |h| { + handshake_v3(h, zs.clone(), config.clone()) + })) + })) + .publish(fn_factory_with_config( + |session: v3::Session| { + Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { + publish_v3(session.clone(), req) + })) + }, + )) + .control(fn_factory_with_config( + |session: v3::Session| { + Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { + control_v3(session.clone(), req) + })) + }, + ))) + .v5(v5::MqttServer::new(fn_factory_with_config(move |_| { + let zs = zs_v5.clone(); + let config = config_v5.clone(); + Ready::Ok::<_, ()>(fn_service(move |h| { + handshake_v5(h, zs.clone(), config.clone()) + })) + })) + .publish(fn_factory_with_config( + |session: v5::Session| { + Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { + publish_v5(session.clone(), req) + })) + }, + )) + .control(fn_factory_with_config( + |session: v5::Session| { + Ready::Ok::<_, MqttPluginError>(fn_service(move |req| { + control_v5(session.clone(), req) + })) + }, + ))) +} + fn treat_admin_query(query: Query, admin_keyexpr_prefix: &keyexpr, config: &Config) { let selector = query.selector(); log::debug!("Query on admin space: {:?}", selector); @@ -256,6 +362,12 @@ impl From> for MqttPluginErro } } +impl From for MqttPluginError { + fn from(e: std::io::Error) -> Self { + MqttPluginError { err: e.into() } + } +} + // mqtt5 supports negative acks, so service error could be converted to PublishAck // (weird way to do it, but that's how it's done in ntex-mqtt examples...) impl std::convert::TryFrom for v5::PublishAck { From 2278a17c91d392b1eb5fe0ea26769198ece3a836 Mon Sep 17 00:00:00 2001 From: Geoff Martin Date: Fri, 23 Feb 2024 14:01:47 +0000 Subject: [PATCH 2/5] Improved error handling. --- zenoh-plugin-mqtt/src/lib.rs | 109 +++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 42 deletions(-) diff --git a/zenoh-plugin-mqtt/src/lib.rs b/zenoh-plugin-mqtt/src/lib.rs index be4ebe3..ece4052 100644 --- a/zenoh-plugin-mqtt/src/lib.rs +++ b/zenoh-plugin-mqtt/src/lib.rs @@ -125,91 +125,104 @@ async fn run(runtime: Runtime, config: Config) { .await .expect("Failed to create AdminSpace queryable"); + let tls_config = config + .tls + .as_ref() + .map(|tls| create_tls_config(tls).expect("Failed to configure TLS")); + // Start MQTT Server task let config = Arc::new(config); ntex::rt::System::new(MqttPlugin::DEFAULT_NAME) .block_on(async move { - let server = match config.tls.as_ref() { + let server = match tls_config { Some(tls) => { - let tls_acceptor = create_tls_acceptor(tls); - ntex::server::Server::build() - .bind("mqtt", config.port.clone(), move |_| { - chain_factory(Acceptor::new(tls_acceptor.clone())) - .map_err(|err| MqttError::Service(MqttPluginError::from(err))) - .and_then(create_mqtt_server(zsession.clone(), config.clone())) - }) - .unwrap() + ntex::server::Server::build().bind("mqtt", config.port.clone(), move |_| { + chain_factory(Acceptor::new(tls.clone())) + .map_err(|err| MqttError::Service(MqttPluginError::from(err))) + .and_then(create_mqtt_server(zsession.clone(), config.clone())) + })? } - None => ntex::server::Server::build() - .bind("mqtt", config.port.clone(), move |_| { + None => { + ntex::server::Server::build().bind("mqtt", config.port.clone(), move |_| { create_mqtt_server(zsession.clone(), config.clone()) - }) - .unwrap(), + })? + } }; server.workers(1).run().await }) .unwrap(); } -fn create_tls_acceptor(config: &TLSConfig) -> Arc { - let key = load_private_key(&config.server_private_key.as_str()); - let certs = load_certs(config.server_certificate.as_str()); +fn create_tls_config(config: &TLSConfig) -> Result, MqttPluginError> { + let key = load_private_key(config.server_private_key.as_str())?; + let certs = load_certs(config.server_certificate.as_str())?; let tls_config = match config.root_ca_certificate.as_ref() { Some(file) => { - let root_cert_store = load_trust_anchors(file); + let root_cert_store = load_trust_anchors(file)?; ServerConfig::builder() .with_safe_defaults() .with_client_cert_verifier(Arc::new(AllowAnyAuthenticatedClient::new( root_cert_store, ))) - .with_single_cert(certs, key) - .unwrap() + .with_single_cert(certs, key)? } None => ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() - .with_single_cert(certs, key) - .unwrap(), + .with_single_cert(certs, key)?, }; - Arc::new(tls_config) + Ok(Arc::new(tls_config)) } -fn load_private_key(filename: &str) -> PrivateKey { - let keyfile = File::open(filename).expect("cannot open private key file"); +fn load_private_key(filename: &str) -> Result { + let keyfile = match File::open(filename) { + Ok(file) => file, + Err(_) => return Err(format!("Cannot open private key file {:?}", filename)), + }; let mut reader = BufReader::new(keyfile); loop { - match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") { - Some(rustls_pemfile::Item::RSAKey(key)) => return rustls::PrivateKey(key), - Some(rustls_pemfile::Item::PKCS8Key(key)) => return rustls::PrivateKey(key), - Some(rustls_pemfile::Item::ECKey(key)) => return rustls::PrivateKey(key), - None => break, - _ => continue, + match rustls_pemfile::read_one(&mut reader) { + Ok(item) => match item { + Some(rustls_pemfile::Item::RSAKey(key)) => return Ok(rustls::PrivateKey(key)), + Some(rustls_pemfile::Item::PKCS8Key(key)) => return Ok(rustls::PrivateKey(key)), + Some(rustls_pemfile::Item::ECKey(key)) => return Ok(rustls::PrivateKey(key)), + None => break, + _ => continue, + }, + Err(_) => return Err(format!("Cannot parse private key file {:?}", filename)), } } - - panic!("No supported keys found in {:?}", filename); + Err(format!("No supported private keys found in {:?}", filename)) } -fn load_certs(filename: &str) -> Vec { - let certfile = File::open(filename).expect("cannot open certificate file"); +fn load_certs(filename: &str) -> Result, String> { + let certfile = match File::open(filename) { + Ok(file) => file, + Err(_) => return Err(format!("Cannot open certificate file {:?}", filename)), + }; let mut reader = BufReader::new(certfile); - rustls_pemfile::certs(&mut reader) - .unwrap() - .iter() - .map(|c| Certificate(c.to_vec())) - .collect() + + let certs = match rustls_pemfile::certs(&mut reader) { + Ok(certs) => certs, + Err(_) => return Err(format!("Cannot parse certificate file {:?}", filename)), + }; + + match certs.is_empty() { + true => Err(format!("No certificates found in {:?}", filename)), + false => Ok(certs.iter().map(|c| Certificate(c.to_vec())).collect()), + } } -fn load_trust_anchors(root_ca_file: &str) -> RootCertStore { +fn load_trust_anchors(root_ca_file: &str) -> Result { let mut root_cert_store = RootCertStore::empty(); - let roots = load_certs(root_ca_file); + let roots = load_certs(root_ca_file)?; for root in roots { root_cert_store.add(&root).unwrap(); } - root_cert_store + Ok(root_cert_store) } fn create_mqtt_server( @@ -347,6 +360,18 @@ impl From for MqttPluginError { } } +impl From for MqttPluginError { + fn from(e: rustls::Error) -> Self { + MqttPluginError { err: e.into() } + } +} + +impl From for MqttPluginError { + fn from(e: String) -> Self { + MqttPluginError { err: e.into() } + } +} + // mqtt5 supports negative acks, so service error could be converted to PublishAck // (weird way to do it, but that's how it's done in ntex-mqtt examples...) impl std::convert::TryFrom for v5::PublishAck { From de9d25b7f3fe6f9a1ac54bf67f56ab98896ecf3a Mon Sep 17 00:00:00 2001 From: Geoff Martin Date: Fri, 23 Feb 2024 15:43:09 +0000 Subject: [PATCH 3/5] Adding documentation for MQTTS support. --- DEFAULT_CONFIG.json5 | 21 ++++++++++++++++++ README.md | 53 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 6ac359a..b5a8e3e 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -45,6 +45,27 @@ //// // generalise_subs: ["PUB1", "PUB2"], + //// + //// TLS related configuration (MQTTS active only if this part is defined). + //// + // tls: { + // //// + // //// server_private_key: Path to the TLS private key. + // //// + // server_private_key: "/path/to/private-key.pem", + // + // //// + // //// server_certificate: Path to the TLS public certificate. + // //// + // server_certificate: "/path/to/certificate.pem", + // + // //// + // //// root_ca_certificate: Path to the certificate of the certificate authority used to validate clients connecting to the MQTT server. + // //// This setting is optional and enables mutual TLS (mTLS) support if provided. + // //// + // // root_ca_certificate: "/path/to/root-ca-certificate.pem", + // }, + }, //// diff --git a/README.md b/README.md index 662d328..58464bd 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,9 @@ The `"mqtt"` part of this same configuration file can also be used in the config - **`-r, --generalise-sub `** : A list of key expressions to use for generalising the declaration of the zenoh subscriptions, and thus minimizing the discovery traffic (usable multiple times). See [this blog](https://zenoh.io/blog/2021-03-23-discovery/#leveraging-resource-generalisation) for more details. + - **`--server-private-key `** : Path to the TLS private key for the MQTT server. If specified a valid certificate for the server must also be provided. + - **`--server-certificate `** : Path to the TLS public certificate for the MQTT server. If specified a valid private key for the server must also be provided. + - **`--root-ca-certificate `** : Path to the certificate of the certificate authority used to validate clients connecting to the MQTT server. If specified a valid private key and certificate for the server must also be provided. ## Admin space @@ -89,6 +92,56 @@ Example of queries on administration space using the REST API with the `curl` co > _Pro tip: pipe the result into [**jq**](https://stedolan.github.io/jq/) command for JSON pretty print or transformation._ +## MQTTS support + +The MQTT plugin and standalone bridge for Eclipse Zenoh supports MQTTS. MQTTS can be configured in two ways: + + - server side authentication: MQTT clients validate the servers TLS certificate but not the other way around. + - mutual authentication (mTLS): where both server and clients validate each other. + + MQTTS can be configured via the configuration file or, if using the standalone bridge, via command line arguments. + +### Server side authentication configuration + +In the configuration file, the required **tls** fields are **server_private_key** and **server_certificate**. + +An example configuration file supporting server side authentication would be: + +```json +{ + plugins: { + mqtt: { + tls: { + server_private_key: "/path/to/private-key.pem", + server_certificate: "/path/to/certificate.pem" + } + } + } +} +``` + +The standalone bridge (`zenoh-bridge-mqtt`) also allows these settings to be provided through the **`--server-private-key`** and **`--server-certificate`** command line arguments. + +### Mutual authentication (mTLS) configuration + +In order to enable mutual authentication a certificate for the certificate authority used to validate clients connecting to the MQTT server must also be provided. In the configuration file, the required **tls** field is **root_ca_certificate**. + +An example configuration file supporting server side authentication would be: + +```json +{ + plugins: { + mqtt: { + tls: { + server_private_key: "/path/to/private-key.pem", + server_certificate: "/path/to/certificate.pem", + root_ca_certificate: "/path/to/root-ca-certificate.pem" + } + } + } +} +``` +The standalone bridge (`zenoh-bridge-mqtt`) also allows this setting to be provided through the **`--root-ca-certificate`** command line argument. ## How to install it From 0af65b49ed82cc515d74ce504671c1029e67f7ff Mon Sep 17 00:00:00 2001 From: Geoff Martin Date: Fri, 23 Feb 2024 15:47:51 +0000 Subject: [PATCH 4/5] Correcting JSON example configuration for MQTTS in README. --- README.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 58464bd..545d3d0 100644 --- a/README.md +++ b/README.md @@ -109,11 +109,11 @@ An example configuration file supporting server side authentication would be: ```json { - plugins: { - mqtt: { - tls: { - server_private_key: "/path/to/private-key.pem", - server_certificate: "/path/to/certificate.pem" + "plugins": { + "mqtt": { + "tls": { + "server_private_key": "/path/to/private-key.pem", + "server_certificate": "/path/to/certificate.pem" } } } @@ -130,12 +130,12 @@ An example configuration file supporting server side authentication would be: ```json { - plugins: { - mqtt: { - tls: { - server_private_key: "/path/to/private-key.pem", - server_certificate: "/path/to/certificate.pem", - root_ca_certificate: "/path/to/root-ca-certificate.pem" + "plugins": { + "mqtt": { + "tls": { + "server_private_key": "/path/to/private-key.pem", + "server_certificate": "/path/to/certificate.pem", + "root_ca_certificate": "/path/to/root-ca-certificate.pem" } } } From 280d65930e1a3cec6fb019e56c49f7cc469ff12b Mon Sep 17 00:00:00 2001 From: Geoff Martin Date: Wed, 28 Feb 2024 13:17:38 +0000 Subject: [PATCH 5/5] Added support for providing PKI material in configuration file in base64 encoded format. --- Cargo.lock | 2 + Cargo.toml | 2 + DEFAULT_CONFIG.json5 | 16 ++-- README.md | 12 ++- zenoh-plugin-mqtt/Cargo.toml | 2 + zenoh-plugin-mqtt/src/config.rs | 12 ++- zenoh-plugin-mqtt/src/lib.rs | 125 +++++++++++++++++++++++--------- 7 files changed, 126 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e733b7..16dc7d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4894,6 +4894,7 @@ dependencies = [ "async-channel 2.2.0", "async-std", "async-trait", + "base64 0.21.4", "derivative", "env_logger", "flume 0.10.14", @@ -4909,6 +4910,7 @@ dependencies = [ "rustc_version 0.4.0", "rustls", "rustls-pemfile 1.0.4", + "secrecy", "serde", "serde_json", "zenoh", diff --git a/Cargo.toml b/Cargo.toml index ebb1d1f..b19b663 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ categories = ["network-programming"] async-channel = "2.2.0" async-std = "=1.12.0" async-trait = "0.1.66" +base64 = "0.21.4" clap = "3.2.23" derivative = "2.2.0" env_logger = "0.10.0" @@ -44,6 +45,7 @@ regex = "1.7.1" rustc_version = "0.4" rustls = "0.21.7" rustls-pemfile = "1.0.4" +secrecy = { version = "0.8.0", features = ["serde", "alloc"] } serde = "1.0.154" serde_json = "1.0.94" zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index b5a8e3e..049e130 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -50,20 +50,26 @@ //// // tls: { // //// - // //// server_private_key: Path to the TLS private key. + // //// server_private_key: TLS private key provided as either a file or base 64 encoded string. + // //// One of the values below must be provided. // //// - // server_private_key: "/path/to/private-key.pem", + // // server_private_key: "/path/to/private-key.pem", + // // server_private_key_base64: "base64-private-key", // // //// - // //// server_certificate: Path to the TLS public certificate. + // //// server_certificate: TLS public certificate provided as either a file or base 64 encoded string. + // //// One of the values below must be provided. // //// - // server_certificate: "/path/to/certificate.pem", + // // server_certificate: "/path/to/certificate.pem", + // // server_certificate_base64: "base64-certificate", // // //// - // //// root_ca_certificate: Path to the certificate of the certificate authority used to validate clients connecting to the MQTT server. + // //// root_ca_certificate: Certificate of the certificate authority used to validate clients connecting to the MQTT server. + // //// Provided as either a file or base 64 encoded string. // //// This setting is optional and enables mutual TLS (mTLS) support if provided. // //// // // root_ca_certificate: "/path/to/root-ca-certificate.pem", + // // root_ca_certificate_base64: "base64-root-ca-certificate", // }, }, diff --git a/README.md b/README.md index 545d3d0..f767087 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,9 @@ The MQTT plugin and standalone bridge for Eclipse Zenoh supports MQTTS. MQTTS ca ### Server side authentication configuration -In the configuration file, the required **tls** fields are **server_private_key** and **server_certificate**. +Server side authentication requires both a private key and certificate for the server. These can be provided as either a file or as a base 64 encoded string. + +In the configuration file, the required **tls** fields when using files are **server_private_key** and **server_certificate**. When using base 64 encoded strings the required **tls** fields are **server_private_key_base64** and **server_certificate_base64**. An example configuration file supporting server side authentication would be: @@ -120,11 +122,13 @@ An example configuration file supporting server side authentication would be: } ``` -The standalone bridge (`zenoh-bridge-mqtt`) also allows these settings to be provided through the **`--server-private-key`** and **`--server-certificate`** command line arguments. +The standalone bridge (`zenoh-bridge-mqtt`) also allows the required files to be provided through the **`--server-private-key`** and **`--server-certificate`** command line arguments. ### Mutual authentication (mTLS) configuration -In order to enable mutual authentication a certificate for the certificate authority used to validate clients connecting to the MQTT server must also be provided. In the configuration file, the required **tls** field is **root_ca_certificate**. +In order to enable mutual authentication a certificate for the certificate authority used to validate clients connecting to the MQTT server must also be provided. This can be provided as either a file or a base 64 encoded string. + +In the configuration file, the required **tls** field when using a file is **root_ca_certificate**. When using base 64 encoded strings the required **tls** field when using a file is **root_ca_certificate_base64**. An example configuration file supporting server side authentication would be: @@ -141,7 +145,7 @@ An example configuration file supporting server side authentication would be: } } ``` -The standalone bridge (`zenoh-bridge-mqtt`) also allows this setting to be provided through the **`--root-ca-certificate`** command line argument. +The standalone bridge (`zenoh-bridge-mqtt`) also allows the required file to be provided through the **`--root-ca-certificate`** command line argument. ## How to install it diff --git a/zenoh-plugin-mqtt/Cargo.toml b/zenoh-plugin-mqtt/Cargo.toml index ebab4f1..035cf45 100644 --- a/zenoh-plugin-mqtt/Cargo.toml +++ b/zenoh-plugin-mqtt/Cargo.toml @@ -35,6 +35,7 @@ stats = ["zenoh/stats"] async-channel = { workspace = true } async-std = { workspace = true, features = ["unstable", "attributes"] } async-trait = { workspace = true } +base64 = { workspace = true } derivative = { workspace = true } env_logger = { workspace = true } flume = { workspace = true } @@ -49,6 +50,7 @@ ntex-tls = { workspace = true } regex = { workspace = true } rustls = { workspace = true } rustls-pemfile = { workspace = true } +secrecy = {workspace = true } serde = { workspace = true } serde_json = { workspace = true } zenoh = { workspace = true } diff --git a/zenoh-plugin-mqtt/src/config.rs b/zenoh-plugin-mqtt/src/config.rs index b8f1ca1..d2b514f 100644 --- a/zenoh-plugin-mqtt/src/config.rs +++ b/zenoh-plugin-mqtt/src/config.rs @@ -15,6 +15,7 @@ use regex::Regex; use serde::de::{Unexpected, Visitor}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; +use zenoh::config::SecretValue; use zenoh::prelude::*; const DEFAULT_MQTT_INTERFACE: &str = "0.0.0.0"; @@ -56,10 +57,15 @@ pub struct Config { #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct TLSConfig { - pub server_private_key: String, - pub server_certificate: String, - #[serde(default)] + pub server_private_key: Option, + #[serde(skip_serializing)] + pub server_private_key_base64: Option, + pub server_certificate: Option, + #[serde(skip_serializing)] + pub server_certificate_base64: Option, pub root_ca_certificate: Option, + #[serde(skip_serializing)] + pub root_ca_certificate_base64: Option, } fn default_mqtt_port() -> String { diff --git a/zenoh-plugin-mqtt/src/lib.rs b/zenoh-plugin-mqtt/src/lib.rs index ece4052..adfa22f 100644 --- a/zenoh-plugin-mqtt/src/lib.rs +++ b/zenoh-plugin-mqtt/src/lib.rs @@ -20,9 +20,9 @@ use ntex_mqtt::{v3, v5, MqttError, MqttServer}; use ntex_tls::rustls::Acceptor; use rustls::server::AllowAnyAuthenticatedClient; use rustls::{Certificate, PrivateKey, RootCertStore, ServerConfig}; +use secrecy::ExposeSecret; use serde_json::Value; use std::env; -use std::fs::File; use std::io::BufReader; use std::sync::Arc; use zenoh::plugins::{RunningPluginTrait, ZenohPlugin}; @@ -82,7 +82,13 @@ impl Plugin for MqttPlugin { .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?; let config: Config = serde_json::from_value(plugin_conf.clone()) .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?; - async_std::task::spawn(run(runtime.clone(), config)); + + let tls_config = match config.tls.as_ref() { + Some(tls) => Some(create_tls_config(tls)?), + None => None, + }; + + async_std::task::spawn(run(runtime.clone(), config, tls_config)); Ok(Box::new(MqttPlugin)) } } @@ -90,7 +96,7 @@ impl Plugin for MqttPlugin { impl PluginControl for MqttPlugin {} impl RunningPluginTrait for MqttPlugin {} -async fn run(runtime: Runtime, config: Config) { +async fn run(runtime: Runtime, config: Config, tls_config: Option>) { // Try to initiate login. // Required in case of dynamic lib, otherwise no logs. // But cannot be done twice in case of static link. @@ -125,11 +131,6 @@ async fn run(runtime: Runtime, config: Config) { .await .expect("Failed to create AdminSpace queryable"); - let tls_config = config - .tls - .as_ref() - .map(|tls| create_tls_config(tls).expect("Failed to configure TLS")); - // Start MQTT Server task let config = Arc::new(config); ntex::rt::System::new(MqttPlugin::DEFAULT_NAME) @@ -153,13 +154,71 @@ async fn run(runtime: Runtime, config: Config) { .unwrap(); } -fn create_tls_config(config: &TLSConfig) -> Result, MqttPluginError> { - let key = load_private_key(config.server_private_key.as_str())?; - let certs = load_certs(config.server_certificate.as_str())?; +fn create_tls_config(config: &TLSConfig) -> ZResult> { + let key_bytes = match ( + config.server_private_key.as_ref(), + config.server_private_key_base64.as_ref(), + ) { + (Some(file), None) => { + std::fs::read(file).map_err(|e| zerror!("Invalid private key file: {e:?}"))? + } + (None, Some(base64)) => base64_decode(base64.expose_secret())?, + (None, None) => { + return Err(zerror!( + "Either 'server_private_key' or 'server_private_key_base64' must be present!" + ) + .into()); + } + _ => { + return Err(zerror!( + "Only one of 'server_private_key' and 'server_private_key_base64' can be present!" + ) + .into()); + } + }; + let key = load_private_key(key_bytes)?; + + let certs_bytes = match ( + config.server_certificate.as_ref(), + config.server_certificate_base64.as_ref(), + ) { + (Some(file), None) => { + std::fs::read(file).map_err(|e| zerror!("Invalid certificate file: {e:?}"))? + } + (None, Some(base64)) => base64_decode(base64.expose_secret())?, + (None, None) => { + return Err(zerror!( + "Either 'server_certificate' or 'server_certificate_base64' must be present!" + ) + .into()); + } + _ => { + return Err(zerror!( + "Only one of 'server_certificate' and 'server_certificate_base64' can be present!" + ) + .into()); + } + }; + let certs = load_certs(certs_bytes)?; + + // Providing a root CA certificate is optional - when provided clients will be verified + let rootca_bytes = match ( + config.root_ca_certificate.as_ref(), + config.root_ca_certificate_base64.as_ref(), + ) { + (Some(file), None) => { + Some(std::fs::read(file).map_err(|e| zerror!("Invalid root certificate file: {e:?}"))?) + } + (None, Some(base64)) => Some(base64_decode(base64.expose_secret())?), + (None, None) => None, + _ => { + return Err(zerror!("Only one of 'root_ca_certificate' and 'root_ca_certificate_base64' can be present!").into()); + } + }; - let tls_config = match config.root_ca_certificate.as_ref() { - Some(file) => { - let root_cert_store = load_trust_anchors(file)?; + let tls_config = match rootca_bytes { + Some(bytes) => { + let root_cert_store = load_trust_anchors(bytes)?; ServerConfig::builder() .with_safe_defaults() @@ -176,12 +235,16 @@ fn create_tls_config(config: &TLSConfig) -> Result, MqttPlugin Ok(Arc::new(tls_config)) } -fn load_private_key(filename: &str) -> Result { - let keyfile = match File::open(filename) { - Ok(file) => file, - Err(_) => return Err(format!("Cannot open private key file {:?}", filename)), - }; - let mut reader = BufReader::new(keyfile); +pub fn base64_decode(data: &str) -> ZResult> { + use base64::engine::general_purpose; + use base64::Engine; + Ok(general_purpose::STANDARD + .decode(data) + .map_err(|e| zerror!("Unable to perform base64 decoding: {e:?}"))?) +} + +fn load_private_key(bytes: Vec) -> ZResult { + let mut reader = BufReader::new(bytes.as_slice()); loop { match rustls_pemfile::read_one(&mut reader) { @@ -192,35 +255,31 @@ fn load_private_key(filename: &str) -> Result { None => break, _ => continue, }, - Err(_) => return Err(format!("Cannot parse private key file {:?}", filename)), + Err(e) => return Err(zerror!("Cannot parse private key: {e:?}").into()), } } - Err(format!("No supported private keys found in {:?}", filename)) + Err(zerror!("No supported private keys found").into()) } -fn load_certs(filename: &str) -> Result, String> { - let certfile = match File::open(filename) { - Ok(file) => file, - Err(_) => return Err(format!("Cannot open certificate file {:?}", filename)), - }; - let mut reader = BufReader::new(certfile); +fn load_certs(bytes: Vec) -> ZResult> { + let mut reader = BufReader::new(bytes.as_slice()); let certs = match rustls_pemfile::certs(&mut reader) { Ok(certs) => certs, - Err(_) => return Err(format!("Cannot parse certificate file {:?}", filename)), + Err(e) => return Err(zerror!("Cannot parse certificate {e:?}").into()), }; match certs.is_empty() { - true => Err(format!("No certificates found in {:?}", filename)), + true => Err(zerror!("No certificates found").into()), false => Ok(certs.iter().map(|c| Certificate(c.to_vec())).collect()), } } -fn load_trust_anchors(root_ca_file: &str) -> Result { +fn load_trust_anchors(bytes: Vec) -> ZResult { let mut root_cert_store = RootCertStore::empty(); - let roots = load_certs(root_ca_file)?; + let roots = load_certs(bytes)?; for root in roots { - root_cert_store.add(&root).unwrap(); + root_cert_store.add(&root)?; } Ok(root_cert_store) }