Skip to content

Commit

Permalink
wip: implementing api
Browse files Browse the repository at this point in the history
  • Loading branch information
junkurihara committed Jul 17, 2024
1 parent 4d889e6 commit 9e79a48
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 48 deletions.
2 changes: 2 additions & 0 deletions rpxy-acme/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ rustls-platform-verifier = { version = "0.3.2" }
rustls-acme = { path = "../submodules/rustls-acme/", default-features = false, features = [
"aws-lc-rs",
] }
tokio = { version = "1.38.1", default-features = false }
tokio-stream = { version = "0.1.15", default-features = false }
2 changes: 1 addition & 1 deletion rpxy-acme/src/dir_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ enum FileType {
Cert,
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct DirCache {
pub(super) account_dir: PathBuf,
pub(super) cert_dir: PathBuf,
Expand Down
4 changes: 2 additions & 2 deletions rpxy-acme/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod constants;
mod dir_cache;
mod error;
mod targets;
mod manager;

#[allow(unused_imports)]
mod log {
Expand All @@ -11,4 +11,4 @@ mod log {
pub use constants::{ACME_DIR_URL, ACME_REGISTRY_PATH};
pub use dir_cache::DirCache;
pub use error::RpxyAcmeError;
pub use targets::AcmeContexts;
pub use manager::AcmeManager;
91 changes: 63 additions & 28 deletions rpxy-acme/src/targets.rs → rpxy-acme/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,37 @@ use crate::{
log::*,
};
use rustc_hash::FxHashMap as HashMap;
// use rustls_acme::AcmeConfig;
use std::path::PathBuf;
use rustls::ServerConfig;
use rustls_acme::AcmeConfig;
use std::{path::PathBuf, sync::Arc};
use tokio::runtime::Handle;
use tokio_stream::StreamExt;
use url::Url;

#[derive(Debug)]
#[derive(Debug, Clone)]
/// ACME settings
pub struct AcmeContexts {
pub struct AcmeManager {
/// ACME directory url
acme_dir_url: Url,
/// ACME registry directory
acme_registry_dir: PathBuf,
// /// ACME registry directory
// acme_registry_dir: PathBuf,
/// ACME contacts
contacts: Vec<String>,
/// ACME directly cache information
inner: HashMap<String, DirCache>,
/// Tokio runtime handle
runtime_handle: Handle,
}

impl AcmeContexts {
impl AcmeManager {
/// Create a new instance. Note that for each domain, a new AcmeConfig is created.
/// This means that for each domain, a distinct operation will be dispatched and separated certificates will be generated.
pub fn try_new(
acme_dir_url: Option<&str>,
acme_registry_dir: Option<&str>,
contacts: &[String],
domains: &[String],
runtime_handle: Handle,
) -> Result<Self, RpxyAcmeError> {
// Install aws_lc_rs as default crypto provider for rustls
let _ = rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider());
Expand All @@ -45,11 +51,6 @@ impl AcmeContexts {
.as_deref()
.map_or_else(|| Url::parse(ACME_DIR_URL), Url::parse)?;
let contacts = contacts.iter().map(|email| format!("mailto:{email}")).collect::<Vec<_>>();
// let rustls_client_config = rustls::ClientConfig::builder()
// .dangerous() // The `Verifier` we're using is actually safe
// .with_custom_certificate_verifier(std::sync::Arc::new(rustls_platform_verifier::Verifier::new()))
// .with_no_client_auth();
// let rustls_client_config = Arc::new(rustls_client_config);

let inner = domains
.iter()
Expand All @@ -59,27 +60,59 @@ impl AcmeContexts {
(domain, dir_cache)
})
.collect::<HashMap<_, _>>();
// let inner = domains
// .iter()
// .map(|domain| {
// let dir_cache = DirCache::new(&acme_registry_dir, domain);
// let config = AcmeConfig::new([domain])
// .contact(&contacts)
// .cache(dir_cache)
// .directory(acme_dir_url.as_str())
// .client_tls_config(rustls_client_config.clone());
// let config = Box::new(config);
// (domain.to_ascii_lowercase(), config)
// })
// .collect::<HashMap<_, _>>();

Ok(Self {
acme_dir_url,
acme_registry_dir,
// acme_registry_dir,
contacts,
inner,
runtime_handle,
})
}

/// Start ACME manager to manage certificates for each domain.
/// Returns a Vec<JoinHandle<()>> as a tasks handles and a map of domain to ServerConfig for challenge.
pub fn spawn_manager_tasks(&self) -> (Vec<tokio::task::JoinHandle<()>>, HashMap<String, Arc<ServerConfig>>) {
info!("rpxy ACME manager started");

let rustls_client_config = rustls::ClientConfig::builder()
.dangerous() // The `Verifier` we're using is actually safe
.with_custom_certificate_verifier(Arc::new(rustls_platform_verifier::Verifier::new()))
.with_no_client_auth();
let rustls_client_config = Arc::new(rustls_client_config);

let mut server_configs_for_challenge: HashMap<String, Arc<ServerConfig>> = HashMap::default();
let join_handles = self
.inner
.clone()
.into_iter()
.map(|(domain, dir_cache)| {
let config = AcmeConfig::new([&domain])
.contact(&self.contacts)
.cache(dir_cache.to_owned())
.directory(self.acme_dir_url.as_str())
.client_tls_config(rustls_client_config.clone());
let mut state = config.state();
server_configs_for_challenge.insert(domain.to_ascii_lowercase(), state.challenge_rustls_config());
self.runtime_handle.spawn(async move {
info!("rpxy ACME manager task for {domain} started");
// infinite loop unless the return value is None
loop {
let Some(res) = state.next().await else {
error!("rpxy ACME manager task for {domain} exited");
break;
};
match res {
Ok(ok) => info!("rpxy ACME event: {ok:?}"),
Err(err) => error!("rpxy ACME error: {err:?}"),
}
}
})
})
.collect::<Vec<_>>();

(join_handles, server_configs_for_challenge)
}
}

#[cfg(test)]
Expand All @@ -93,17 +126,19 @@ mod tests {
let acme_dir_url = "https://acme.example.com/directory";
let acme_registry_dir = "/tmp/acme";
let contacts = vec!["[email protected]".to_string()];
let acme_contexts: AcmeContexts = AcmeContexts::try_new(
let handle = Handle::current();
let acme_contexts: AcmeManager = AcmeManager::try_new(
Some(acme_dir_url),
Some(acme_registry_dir),
&contacts,
&["example.com".to_string(), "example.org".to_string()],
handle,
)
.unwrap();
assert_eq!(acme_contexts.inner.len(), 2);
assert_eq!(acme_contexts.contacts, vec!["mailto:[email protected]".to_string()]);
assert_eq!(acme_contexts.acme_dir_url.as_str(), acme_dir_url);
assert_eq!(acme_contexts.acme_registry_dir, PathBuf::from(acme_registry_dir));
// assert_eq!(acme_contexts.acme_registry_dir, PathBuf::from(acme_registry_dir));
assert_eq!(
acme_contexts.inner["example.com"],
DirCache {
Expand Down
2 changes: 1 addition & 1 deletion rpxy-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tokio = { version = "1.38.1", default-features = false, features = [
"macros",
] }
async-trait = "0.1.81"

futures-util = { version = "0.3.30", default-features = false }

# config
clap = { version = "4.5.9", features = ["std", "cargo", "wrap_help"] }
Expand Down
23 changes: 14 additions & 9 deletions rpxy-bin/src/config/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rpxy_lib::{AppConfig, AppConfigList, ProxyConfig};
use rustc_hash::FxHashMap as HashMap;

#[cfg(feature = "acme")]
use rpxy_acme::{AcmeContexts, ACME_DIR_URL, ACME_REGISTRY_PATH};
use rpxy_acme::{AcmeManager, ACME_DIR_URL, ACME_REGISTRY_PATH};

/// Parsed options
pub struct Opts {
Expand Down Expand Up @@ -157,12 +157,14 @@ pub async fn build_cert_manager(

/* ----------------------- */
#[cfg(feature = "acme")]
/// Build acme manager and dummy cert and key as initial states if not exists
/// TODO: CURRENTLY NOT IMPLEMENTED, UNDER DESIGNING
pub async fn build_acme_manager(config: &ConfigToml) -> Result<(), anyhow::Error> {
/// Build acme manager
pub async fn build_acme_manager(
config: &ConfigToml,
runtime_handle: tokio::runtime::Handle,
) -> Result<Option<AcmeManager>, anyhow::Error> {
let acme_option = config.experimental.as_ref().and_then(|v| v.acme.clone());
if acme_option.is_none() {
return Ok(());
return Ok(None);
}
let acme_option = acme_option.unwrap();

Expand All @@ -183,14 +185,17 @@ pub async fn build_acme_manager(config: &ConfigToml) -> Result<(), anyhow::Error
})
.collect::<Vec<_>>();

let acme_contexts = AcmeContexts::try_new(
if domains.is_empty() {
return Ok(None);
}

let acme_manager = AcmeManager::try_new(
acme_option.dir_url.as_deref(),
acme_option.registry_path.as_deref(),
&[acme_option.email],
domains.as_slice(),
runtime_handle,
)?;

// TODO: remove later
println!("ACME contexts: {:#?}", acme_contexts);
Ok(())
Ok(Some(acme_manager))
}
79 changes: 72 additions & 7 deletions rpxy-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,33 @@ async fn rpxy_service_without_watcher(
let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;

#[cfg(feature = "acme")] // TODO: CURRENTLY NOT IMPLEMENTED, UNDER DESIGNING
let acme_manager = build_acme_manager(&config_toml).await;
#[cfg(feature = "acme")]
let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;

let cert_service_and_rx = build_cert_manager(&config_toml)
.await
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;

rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, None)
#[cfg(feature = "acme")]
{
rpxy_entrypoint(
&proxy_conf,
&app_conf,
cert_service_and_rx.as_ref(),
acme_manager.as_ref(),
&runtime_handle,
None,
)
.await
.map_err(|e| anyhow!(e))
}

#[cfg(not(feature = "acme"))]
{
rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, None)
.await
.map_err(|e| anyhow!(e))
}
}

async fn rpxy_service_with_watcher(
Expand All @@ -93,8 +110,8 @@ async fn rpxy_service_with_watcher(
.ok_or(anyhow!("Something wrong in config reloader receiver"))?;
let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;

#[cfg(feature = "acme")] // TODO: CURRENTLY NOT IMPLEMENTED, UNDER DESIGNING
let acme_manager = build_acme_manager(&config_toml).await;
#[cfg(feature = "acme")]
let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;

let mut cert_service_and_rx = build_cert_manager(&config_toml)
.await
Expand All @@ -106,7 +123,16 @@ async fn rpxy_service_with_watcher(
// Continuous monitoring
loop {
tokio::select! {
rpxy_res = rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, Some(term_notify.clone())) => {
rpxy_res = {
#[cfg(feature = "acme")]
{
rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), acme_manager.as_ref(), &runtime_handle, Some(term_notify.clone()))
}
#[cfg(not(feature = "acme"))]
{
rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, Some(term_notify.clone()))
}
} => {
error!("rpxy entrypoint or cert service exited");
return rpxy_res.map_err(|e| anyhow!(e));
}
Expand Down Expand Up @@ -145,14 +171,15 @@ async fn rpxy_service_with_watcher(
Ok(())
}

#[cfg(not(feature = "acme"))]
/// Wrapper of entry point for rpxy service with certificate management service
async fn rpxy_entrypoint(
proxy_config: &rpxy_lib::ProxyConfig,
app_config_list: &rpxy_lib::AppConfigList,
cert_service_and_rx: Option<&(
ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>,
ReloaderReceiver<rpxy_certs::ServerCryptoBase>,
)>, // TODO:
)>,
runtime_handle: &tokio::runtime::Handle,
term_notify: Option<std::sync::Arc<tokio::sync::Notify>>,
) -> Result<(), anyhow::Error> {
Expand All @@ -173,3 +200,41 @@ async fn rpxy_entrypoint(
.map_err(|e| anyhow!(e))
}
}

#[cfg(feature = "acme")]
/// Wrapper of entry point for rpxy service with certificate management service
async fn rpxy_entrypoint(
proxy_config: &rpxy_lib::ProxyConfig,
app_config_list: &rpxy_lib::AppConfigList,
cert_service_and_rx: Option<&(
ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>,
ReloaderReceiver<rpxy_certs::ServerCryptoBase>,
)>,
acme_manager: Option<&rpxy_acme::AcmeManager>,
runtime_handle: &tokio::runtime::Handle,
term_notify: Option<std::sync::Arc<tokio::sync::Notify>>,
) -> Result<(), anyhow::Error> {
// TODO: remove later, reconsider routine
println!("ACME manager:\n{:#?}", acme_manager);
let x = acme_manager.unwrap().clone();
let (handle, confs) = x.spawn_manager_tasks();
tokio::spawn(async move { futures_util::future::select_all(handle).await });
// TODO:

if let Some((cert_service, cert_rx)) = cert_service_and_rx {
tokio::select! {
rpxy_res = entrypoint(proxy_config, app_config_list, Some(cert_rx), runtime_handle, term_notify) => {
error!("rpxy entrypoint exited");
rpxy_res.map_err(|e| anyhow!(e))
}
cert_res = cert_service.start() => {
error!("cert reloader service exited");
cert_res.map_err(|e| anyhow!(e))
}
}
} else {
entrypoint(proxy_config, app_config_list, None, runtime_handle, term_notify)
.await
.map_err(|e| anyhow!(e))
}
}

0 comments on commit 9e79a48

Please sign in to comment.