diff --git a/rpxy-acme/Cargo.toml b/rpxy-acme/Cargo.toml index 9473429c..7adc8fdd 100644 --- a/rpxy-acme/Cargo.toml +++ b/rpxy-acme/Cargo.toml @@ -29,3 +29,5 @@ rustls-platform-verifier = { version = "0.3.2" } rustls-acme = { path = "../submodules/rustls-acme/", default-features = false, features = [ "aws-lc-rs", ] } +tokio = { version = "1.38.1", default-features = false } +tokio-stream = { version = "0.1.15", default-features = false } diff --git a/rpxy-acme/src/dir_cache.rs b/rpxy-acme/src/dir_cache.rs index 33504fa4..5170ad61 100644 --- a/rpxy-acme/src/dir_cache.rs +++ b/rpxy-acme/src/dir_cache.rs @@ -15,7 +15,7 @@ enum FileType { Cert, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct DirCache { pub(super) account_dir: PathBuf, pub(super) cert_dir: PathBuf, diff --git a/rpxy-acme/src/lib.rs b/rpxy-acme/src/lib.rs index 813b388a..246b9008 100644 --- a/rpxy-acme/src/lib.rs +++ b/rpxy-acme/src/lib.rs @@ -1,7 +1,7 @@ mod constants; mod dir_cache; mod error; -mod targets; +mod manager; #[allow(unused_imports)] mod log { @@ -11,4 +11,4 @@ mod log { pub use constants::{ACME_DIR_URL, ACME_REGISTRY_PATH}; pub use dir_cache::DirCache; pub use error::RpxyAcmeError; -pub use targets::AcmeContexts; +pub use manager::AcmeManager; diff --git a/rpxy-acme/src/targets.rs b/rpxy-acme/src/manager.rs similarity index 54% rename from rpxy-acme/src/targets.rs rename to rpxy-acme/src/manager.rs index 74e62623..112c4493 100644 --- a/rpxy-acme/src/targets.rs +++ b/rpxy-acme/src/manager.rs @@ -5,24 +5,29 @@ use crate::{ log::*, }; use rustc_hash::FxHashMap as HashMap; -// use rustls_acme::AcmeConfig; -use std::path::PathBuf; +use rustls::ServerConfig; +use rustls_acme::AcmeConfig; +use std::{path::PathBuf, sync::Arc}; +use tokio::runtime::Handle; +use tokio_stream::StreamExt; use url::Url; -#[derive(Debug)] +#[derive(Debug, Clone)] /// ACME settings -pub struct AcmeContexts { +pub struct AcmeManager { /// ACME directory url acme_dir_url: Url, - /// ACME registry directory - acme_registry_dir: PathBuf, + // /// ACME registry directory + // acme_registry_dir: PathBuf, /// ACME contacts contacts: Vec, /// ACME directly cache information inner: HashMap, + /// Tokio runtime handle + runtime_handle: Handle, } -impl AcmeContexts { +impl AcmeManager { /// Create a new instance. Note that for each domain, a new AcmeConfig is created. /// This means that for each domain, a distinct operation will be dispatched and separated certificates will be generated. pub fn try_new( @@ -30,6 +35,7 @@ impl AcmeContexts { acme_registry_dir: Option<&str>, contacts: &[String], domains: &[String], + runtime_handle: Handle, ) -> Result { // Install aws_lc_rs as default crypto provider for rustls let _ = rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider()); @@ -45,11 +51,6 @@ impl AcmeContexts { .as_deref() .map_or_else(|| Url::parse(ACME_DIR_URL), Url::parse)?; let contacts = contacts.iter().map(|email| format!("mailto:{email}")).collect::>(); - // let rustls_client_config = rustls::ClientConfig::builder() - // .dangerous() // The `Verifier` we're using is actually safe - // .with_custom_certificate_verifier(std::sync::Arc::new(rustls_platform_verifier::Verifier::new())) - // .with_no_client_auth(); - // let rustls_client_config = Arc::new(rustls_client_config); let inner = domains .iter() @@ -59,27 +60,59 @@ impl AcmeContexts { (domain, dir_cache) }) .collect::>(); - // let inner = domains - // .iter() - // .map(|domain| { - // let dir_cache = DirCache::new(&acme_registry_dir, domain); - // let config = AcmeConfig::new([domain]) - // .contact(&contacts) - // .cache(dir_cache) - // .directory(acme_dir_url.as_str()) - // .client_tls_config(rustls_client_config.clone()); - // let config = Box::new(config); - // (domain.to_ascii_lowercase(), config) - // }) - // .collect::>(); Ok(Self { acme_dir_url, - acme_registry_dir, + // acme_registry_dir, contacts, inner, + runtime_handle, }) } + + /// Start ACME manager to manage certificates for each domain. + /// Returns a Vec> as a tasks handles and a map of domain to ServerConfig for challenge. + pub fn spawn_manager_tasks(&self) -> (Vec>, HashMap>) { + info!("rpxy ACME manager started"); + + let rustls_client_config = rustls::ClientConfig::builder() + .dangerous() // The `Verifier` we're using is actually safe + .with_custom_certificate_verifier(Arc::new(rustls_platform_verifier::Verifier::new())) + .with_no_client_auth(); + let rustls_client_config = Arc::new(rustls_client_config); + + let mut server_configs_for_challenge: HashMap> = HashMap::default(); + let join_handles = self + .inner + .clone() + .into_iter() + .map(|(domain, dir_cache)| { + let config = AcmeConfig::new([&domain]) + .contact(&self.contacts) + .cache(dir_cache.to_owned()) + .directory(self.acme_dir_url.as_str()) + .client_tls_config(rustls_client_config.clone()); + let mut state = config.state(); + server_configs_for_challenge.insert(domain.to_ascii_lowercase(), state.challenge_rustls_config()); + self.runtime_handle.spawn(async move { + info!("rpxy ACME manager task for {domain} started"); + // infinite loop unless the return value is None + loop { + let Some(res) = state.next().await else { + error!("rpxy ACME manager task for {domain} exited"); + break; + }; + match res { + Ok(ok) => info!("rpxy ACME event: {ok:?}"), + Err(err) => error!("rpxy ACME error: {err:?}"), + } + } + }) + }) + .collect::>(); + + (join_handles, server_configs_for_challenge) + } } #[cfg(test)] @@ -93,17 +126,19 @@ mod tests { let acme_dir_url = "https://acme.example.com/directory"; let acme_registry_dir = "/tmp/acme"; let contacts = vec!["test@example.com".to_string()]; - let acme_contexts: AcmeContexts = AcmeContexts::try_new( + let handle = Handle::current(); + let acme_contexts: AcmeManager = AcmeManager::try_new( Some(acme_dir_url), Some(acme_registry_dir), &contacts, &["example.com".to_string(), "example.org".to_string()], + handle, ) .unwrap(); assert_eq!(acme_contexts.inner.len(), 2); assert_eq!(acme_contexts.contacts, vec!["mailto:test@example.com".to_string()]); assert_eq!(acme_contexts.acme_dir_url.as_str(), acme_dir_url); - assert_eq!(acme_contexts.acme_registry_dir, PathBuf::from(acme_registry_dir)); + // assert_eq!(acme_contexts.acme_registry_dir, PathBuf::from(acme_registry_dir)); assert_eq!( acme_contexts.inner["example.com"], DirCache { diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 61263469..f330d9d5 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -40,7 +40,7 @@ tokio = { version = "1.38.1", default-features = false, features = [ "macros", ] } async-trait = "0.1.81" - +futures-util = { version = "0.3.30", default-features = false } # config clap = { version = "4.5.9", features = ["std", "cargo", "wrap_help"] } diff --git a/rpxy-bin/src/config/parse.rs b/rpxy-bin/src/config/parse.rs index 3dd6599e..a591c409 100644 --- a/rpxy-bin/src/config/parse.rs +++ b/rpxy-bin/src/config/parse.rs @@ -7,7 +7,7 @@ use rpxy_lib::{AppConfig, AppConfigList, ProxyConfig}; use rustc_hash::FxHashMap as HashMap; #[cfg(feature = "acme")] -use rpxy_acme::{AcmeContexts, ACME_DIR_URL, ACME_REGISTRY_PATH}; +use rpxy_acme::{AcmeManager, ACME_DIR_URL, ACME_REGISTRY_PATH}; /// Parsed options pub struct Opts { @@ -157,12 +157,14 @@ pub async fn build_cert_manager( /* ----------------------- */ #[cfg(feature = "acme")] -/// Build acme manager and dummy cert and key as initial states if not exists -/// TODO: CURRENTLY NOT IMPLEMENTED, UNDER DESIGNING -pub async fn build_acme_manager(config: &ConfigToml) -> Result<(), anyhow::Error> { +/// Build acme manager +pub async fn build_acme_manager( + config: &ConfigToml, + runtime_handle: tokio::runtime::Handle, +) -> Result, anyhow::Error> { let acme_option = config.experimental.as_ref().and_then(|v| v.acme.clone()); if acme_option.is_none() { - return Ok(()); + return Ok(None); } let acme_option = acme_option.unwrap(); @@ -183,14 +185,17 @@ pub async fn build_acme_manager(config: &ConfigToml) -> Result<(), anyhow::Error }) .collect::>(); - let acme_contexts = AcmeContexts::try_new( + if domains.is_empty() { + return Ok(None); + } + + let acme_manager = AcmeManager::try_new( acme_option.dir_url.as_deref(), acme_option.registry_path.as_deref(), &[acme_option.email], domains.as_slice(), + runtime_handle, )?; - // TODO: remove later - println!("ACME contexts: {:#?}", acme_contexts); - Ok(()) + Ok(Some(acme_manager)) } diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 9847a5fa..f07212e3 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -68,16 +68,33 @@ async fn rpxy_service_without_watcher( let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?; let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; - #[cfg(feature = "acme")] // TODO: CURRENTLY NOT IMPLEMENTED, UNDER DESIGNING - let acme_manager = build_acme_manager(&config_toml).await; + #[cfg(feature = "acme")] + let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?; let cert_service_and_rx = build_cert_manager(&config_toml) .await .map_err(|e| anyhow!("Invalid cert configuration: {e}"))?; - rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, None) + #[cfg(feature = "acme")] + { + rpxy_entrypoint( + &proxy_conf, + &app_conf, + cert_service_and_rx.as_ref(), + acme_manager.as_ref(), + &runtime_handle, + None, + ) .await .map_err(|e| anyhow!(e)) + } + + #[cfg(not(feature = "acme"))] + { + rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, None) + .await + .map_err(|e| anyhow!(e)) + } } async fn rpxy_service_with_watcher( @@ -93,8 +110,8 @@ async fn rpxy_service_with_watcher( .ok_or(anyhow!("Something wrong in config reloader receiver"))?; let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; - #[cfg(feature = "acme")] // TODO: CURRENTLY NOT IMPLEMENTED, UNDER DESIGNING - let acme_manager = build_acme_manager(&config_toml).await; + #[cfg(feature = "acme")] + let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?; let mut cert_service_and_rx = build_cert_manager(&config_toml) .await @@ -106,7 +123,16 @@ async fn rpxy_service_with_watcher( // Continuous monitoring loop { tokio::select! { - rpxy_res = rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, Some(term_notify.clone())) => { + rpxy_res = { + #[cfg(feature = "acme")] + { + rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), acme_manager.as_ref(), &runtime_handle, Some(term_notify.clone())) + } + #[cfg(not(feature = "acme"))] + { + rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, Some(term_notify.clone())) + } + } => { error!("rpxy entrypoint or cert service exited"); return rpxy_res.map_err(|e| anyhow!(e)); } @@ -145,6 +171,7 @@ async fn rpxy_service_with_watcher( Ok(()) } +#[cfg(not(feature = "acme"))] /// Wrapper of entry point for rpxy service with certificate management service async fn rpxy_entrypoint( proxy_config: &rpxy_lib::ProxyConfig, @@ -152,7 +179,7 @@ async fn rpxy_entrypoint( cert_service_and_rx: Option<&( ReloaderService, ReloaderReceiver, - )>, // TODO: + )>, runtime_handle: &tokio::runtime::Handle, term_notify: Option>, ) -> Result<(), anyhow::Error> { @@ -173,3 +200,41 @@ async fn rpxy_entrypoint( .map_err(|e| anyhow!(e)) } } + +#[cfg(feature = "acme")] +/// Wrapper of entry point for rpxy service with certificate management service +async fn rpxy_entrypoint( + proxy_config: &rpxy_lib::ProxyConfig, + app_config_list: &rpxy_lib::AppConfigList, + cert_service_and_rx: Option<&( + ReloaderService, + ReloaderReceiver, + )>, + acme_manager: Option<&rpxy_acme::AcmeManager>, + runtime_handle: &tokio::runtime::Handle, + term_notify: Option>, +) -> Result<(), anyhow::Error> { + // TODO: remove later, reconsider routine + println!("ACME manager:\n{:#?}", acme_manager); + let x = acme_manager.unwrap().clone(); + let (handle, confs) = x.spawn_manager_tasks(); + tokio::spawn(async move { futures_util::future::select_all(handle).await }); + // TODO: + + if let Some((cert_service, cert_rx)) = cert_service_and_rx { + tokio::select! { + rpxy_res = entrypoint(proxy_config, app_config_list, Some(cert_rx), runtime_handle, term_notify) => { + error!("rpxy entrypoint exited"); + rpxy_res.map_err(|e| anyhow!(e)) + } + cert_res = cert_service.start() => { + error!("cert reloader service exited"); + cert_res.map_err(|e| anyhow!(e)) + } + } + } else { + entrypoint(proxy_config, app_config_list, None, runtime_handle, term_notify) + .await + .map_err(|e| anyhow!(e)) + } +}