From f9cb57b2957c49a50c56a65d3afc2d470ffd1c91 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Fri, 6 Dec 2024 13:20:15 +0100 Subject: [PATCH] Made tokio runtime dynamic based on configuration of daemon. --- ntpd/bin/ntp-ctl.rs | 5 +- ntpd/bin/ntp-daemon.rs | 5 +- ntpd/bin/ntp-metrics-exporter.rs | 5 +- ntpd/src/ctl.rs | 26 +++-- ntpd/src/daemon/config/mod.rs | 16 +-- ntpd/src/daemon/mod.rs | 114 +++++++++++---------- ntpd/src/force_sync/mod.rs | 80 ++++++++------- ntpd/src/metrics/exporter.rs | 164 ++++++++++++++++--------------- 8 files changed, 218 insertions(+), 197 deletions(-) diff --git a/ntpd/bin/ntp-ctl.rs b/ntpd/bin/ntp-ctl.rs index 966f61e28..4c51246fa 100644 --- a/ntpd/bin/ntp-ctl.rs +++ b/ntpd/bin/ntp-ctl.rs @@ -1,6 +1,5 @@ #![forbid(unsafe_code)] -#[tokio::main] -async fn main() -> std::io::Result { - ntpd::ctl_main().await +fn main() -> std::io::Result { + ntpd::ctl_main() } diff --git a/ntpd/bin/ntp-daemon.rs b/ntpd/bin/ntp-daemon.rs index 0b21dfcdd..90ddf45de 100644 --- a/ntpd/bin/ntp-daemon.rs +++ b/ntpd/bin/ntp-daemon.rs @@ -2,8 +2,7 @@ use std::process; -#[tokio::main] -async fn main() { - let result = ntpd::daemon_main().await; +fn main() { + let result = ntpd::daemon_main(); process::exit(if result.is_ok() { 0 } else { 1 }); } diff --git a/ntpd/bin/ntp-metrics-exporter.rs b/ntpd/bin/ntp-metrics-exporter.rs index ff4c221c7..e58aa98c6 100644 --- a/ntpd/bin/ntp-metrics-exporter.rs +++ b/ntpd/bin/ntp-metrics-exporter.rs @@ -1,6 +1,5 @@ #![forbid(unsafe_code)] -#[tokio::main] -async fn main() -> Result<(), Box> { - ntpd::metrics_exporter_main().await +fn main() -> Result<(), Box> { + ntpd::metrics_exporter_main() } diff --git a/ntpd/src/ctl.rs b/ntpd/src/ctl.rs index 6403afad6..3bf874981 100644 --- a/ntpd/src/ctl.rs +++ b/ntpd/src/ctl.rs @@ -4,6 +4,7 @@ use crate::{ daemon::{config::CliArg, tracing::LogLevel, Config, ObservableState}, force_sync, }; +use tokio::runtime::Builder; use tracing_subscriber::util::SubscriberInitExt; const USAGE_MSG: &str = "\ @@ -146,10 +147,10 @@ impl NtpCtlOptions { } } -async fn validate(config: Option) -> std::io::Result { +fn validate(config: Option) -> std::io::Result { // Late completion not needed, so ignore result. crate::daemon::tracing::tracing_init(LogLevel::Info, true).init(); - match Config::from_args(config, vec![], vec![]).await { + match Config::from_args(config, vec![], vec![]) { Ok(config) => { if config.check() { eprintln!("Config looks good"); @@ -167,7 +168,7 @@ async fn validate(config: Option) -> std::io::Result { const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub async fn main() -> std::io::Result { +pub fn main() -> std::io::Result { let options = match NtpCtlOptions::try_parse_from(std::env::args()) { Ok(options) => options, Err(msg) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)), @@ -182,10 +183,10 @@ pub async fn main() -> std::io::Result { eprintln!("ntp-ctl {VERSION}"); Ok(ExitCode::SUCCESS) } - NtpCtlAction::Validate => validate(options.config).await, - NtpCtlAction::ForceSync => force_sync::force_sync(options.config).await, + NtpCtlAction::Validate => validate(options.config), + NtpCtlAction::ForceSync => force_sync::force_sync(options.config), NtpCtlAction::Status => { - let config = Config::from_args(options.config, vec![], vec![]).await; + let config = Config::from_args(options.config, vec![], vec![]); if let Err(ref e) = config { println!("Warning: Unable to load configuration file: {e}"); @@ -198,10 +199,15 @@ pub async fn main() -> std::io::Result { .observation_path .unwrap_or_else(|| PathBuf::from("/var/run/ntpd-rs/observe")); - match options.format { - Format::Plain => print_state(Format::Plain, observation).await, - Format::Prometheus => print_state(Format::Prometheus, observation).await, - } + Builder::new_current_thread() + .enable_all() + .build()? + .block_on(async { + match options.format { + Format::Plain => print_state(Format::Plain, observation).await, + Format::Prometheus => print_state(Format::Prometheus, observation).await, + } + }) } } } diff --git a/ntpd/src/daemon/config/mod.rs b/ntpd/src/daemon/config/mod.rs index 1b2d3e0a1..75640b874 100644 --- a/ntpd/src/daemon/config/mod.rs +++ b/ntpd/src/daemon/config/mod.rs @@ -9,6 +9,7 @@ use ntp_proto::{AlgorithmConfig, SourceDefaultsConfig, SynchronizationConfig}; pub use ntp_source::*; use serde::{Deserialize, Deserializer}; pub use server::*; +use std::io; use std::{ fmt::Display, io::ErrorKind, @@ -18,7 +19,6 @@ use std::{ str::FromStr, }; use timestamped_socket::interface::InterfaceName; -use tokio::{fs::read_to_string, io}; use tracing::{info, warn}; use super::{clock::NtpClockWrapper, tracing::LogLevel}; @@ -373,7 +373,7 @@ pub struct Config { } impl Config { - async fn from_file(file: impl AsRef) -> Result { + fn from_file(file: impl AsRef) -> Result { let meta = std::fs::metadata(&file)?; let perm = meta.permissions(); @@ -381,23 +381,23 @@ impl Config { warn!("Unrestricted config file permissions: Others can write."); } - let contents = read_to_string(file).await?; + let contents = std::fs::read_to_string(file)?; Ok(toml::de::from_str(&contents)?) } - async fn from_first_file(file: Option>) -> Result { + fn from_first_file(file: Option>) -> Result { // if an explicit file is given, always use that one if let Some(f) = file { let path: &Path = f.as_ref(); info!(?path, "using config file"); - return Config::from_file(f).await; + return Config::from_file(f); } // for the global file we also ignore it when there are permission errors let global_path = Path::new("/etc/ntpd-rs/ntp.toml"); if global_path.exists() { info!("using config file at default location `{:?}`", global_path); - match Config::from_file(global_path).await { + match Config::from_file(global_path) { Err(ConfigError::Io(e)) if e.kind() == ErrorKind::PermissionDenied => { warn!("permission denied on global config file! using default config ..."); } @@ -410,12 +410,12 @@ impl Config { Ok(Config::default()) } - pub async fn from_args( + pub fn from_args( file: Option>, sources: Vec, servers: Vec, ) -> Result { - let mut config = Config::from_first_file(file.as_ref()).await?; + let mut config = Config::from_first_file(file.as_ref())?; if !sources.is_empty() { if !config.sources.is_empty() { diff --git a/ntpd/src/daemon/mod.rs b/ntpd/src/daemon/mod.rs index aac549fb3..510783378 100644 --- a/ntpd/src/daemon/mod.rs +++ b/ntpd/src/daemon/mod.rs @@ -20,6 +20,7 @@ pub use config::Config; use ntp_proto::KalmanClockController; pub use observer::ObservableState; pub use system::spawn; +use tokio::runtime::Builder; use tracing_subscriber::util::SubscriberInitExt; use config::NtpDaemonOptions; @@ -28,7 +29,7 @@ use self::tracing::LogLevel; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub async fn main() -> Result<(), Box> { +pub fn main() -> Result<(), Box> { let options = NtpDaemonOptions::try_parse_from(std::env::args())?; match options.action { @@ -38,7 +39,7 @@ pub async fn main() -> Result<(), Box> { config::NtpDaemonAction::Version => { eprintln!("ntp-daemon {VERSION}"); } - config::NtpDaemonAction::Run => run(options).await?, + config::NtpDaemonAction::Run => run(options)?, } Ok(()) @@ -46,7 +47,7 @@ pub async fn main() -> Result<(), Box> { // initializes the logger so that logs during config parsing are reported. Then it overrides the // log level based on the config if required. -pub(crate) async fn initialize_logging_parse_config( +pub(crate) fn initialize_logging_parse_config( initial_log_level: Option, config_path: Option, ) -> Config { @@ -54,18 +55,15 @@ pub(crate) async fn initialize_logging_parse_config( let config_tracing = crate::daemon::tracing::tracing_init(log_level, true); let config = ::tracing::subscriber::with_default(config_tracing, || { - async { - match Config::from_args(config_path, vec![], vec![]).await { - Ok(c) => c, - Err(e) => { - // print to stderr because tracing is not yet setup - eprintln!("There was an error loading the config: {e}"); - std::process::exit(exitcode::CONFIG); - } + match Config::from_args(config_path, vec![], vec![]) { + Ok(c) => c, + Err(e) => { + // print to stderr because tracing is not yet setup + eprintln!("There was an error loading the config: {e}"); + std::process::exit(exitcode::CONFIG); } } - }) - .await; + }); if let Some(config_log_level) = config.observability.log_level { if initial_log_level.is_none() { @@ -80,51 +78,59 @@ pub(crate) async fn initialize_logging_parse_config( config } -async fn run(options: NtpDaemonOptions) -> Result<(), Box> { - let config = initialize_logging_parse_config(options.log_level, options.config).await; +fn run(options: NtpDaemonOptions) -> Result<(), Box> { + let config = initialize_logging_parse_config(options.log_level, options.config); - // give the user a warning that we use the command line option - if config.observability.log_level.is_some() && options.log_level.is_some() { - info!("Log level override from command line arguments is active"); - } + let runtime = if config.servers.is_empty() && config.nts_ke.is_empty() { + Builder::new_current_thread().enable_all().build()? + } else { + Builder::new_multi_thread().enable_all().build()? + }; - // Warn/error if the config is unreasonable. We do this after finishing - // tracing setup to ensure logging is fully configured. - config.check(); - - // we always generate the keyset (even if NTS is not used) - let keyset = nts_key_provider::spawn(config.keyset).await; - - #[cfg(feature = "hardware-timestamping")] - let clock_config = config.clock; - - #[cfg(not(feature = "hardware-timestamping"))] - let clock_config = config::ClockConfig::default(); - - ::tracing::debug!("Configuration loaded, spawning daemon jobs"); - let (main_loop_handle, channels) = spawn::>( - config.synchronization.synchronization_base, - config.synchronization.algorithm, - config.source_defaults, - clock_config, - &config.sources, - &config.servers, - keyset.clone(), - ) - .await?; - - for nts_ke_config in config.nts_ke { - let _join_handle = keyexchange::spawn(nts_ke_config, keyset.clone()); - } + runtime.block_on(async { + // give the user a warning that we use the command line option + if config.observability.log_level.is_some() && options.log_level.is_some() { + info!("Log level override from command line arguments is active"); + } - observer::spawn( - &config.observability, - channels.source_snapshots, - channels.server_data_receiver, - channels.system_snapshot_receiver, - ); + // Warn/error if the config is unreasonable. We do this after finishing + // tracing setup to ensure logging is fully configured. + config.check(); + + // we always generate the keyset (even if NTS is not used) + let keyset = nts_key_provider::spawn(config.keyset).await; + + #[cfg(feature = "hardware-timestamping")] + let clock_config = config.clock; + + #[cfg(not(feature = "hardware-timestamping"))] + let clock_config = config::ClockConfig::default(); + + ::tracing::debug!("Configuration loaded, spawning daemon jobs"); + let (main_loop_handle, channels) = spawn::>( + config.synchronization.synchronization_base, + config.synchronization.algorithm, + config.source_defaults, + clock_config, + &config.sources, + &config.servers, + keyset.clone(), + ) + .await?; + + for nts_ke_config in config.nts_ke { + let _join_handle = keyexchange::spawn(nts_ke_config, keyset.clone()); + } + + observer::spawn( + &config.observability, + channels.source_snapshots, + channels.server_data_receiver, + channels.system_snapshot_receiver, + ); - Ok(main_loop_handle.await??) + Ok(main_loop_handle.await??) + }) } pub(crate) mod exitcode { diff --git a/ntpd/src/force_sync/mod.rs b/ntpd/src/force_sync/mod.rs index cff0abb0b..20b044ad0 100644 --- a/ntpd/src/force_sync/mod.rs +++ b/ntpd/src/force_sync/mod.rs @@ -7,6 +7,7 @@ use std::{ use algorithm::{SingleShotController, SingleShotControllerConfig}; use ntp_proto::{NtpClock, NtpDuration}; +use tokio::runtime::Builder; #[cfg(feature = "unstable_nts-pool")] use crate::daemon::config::NtsPoolSourceConfig; @@ -108,8 +109,8 @@ impl SingleShotController { } } -pub(crate) async fn force_sync(config: Option) -> std::io::Result { - let config = initialize_logging_parse_config(Some(LogLevel::Warn), config).await; +pub(crate) fn force_sync(config: Option) -> std::io::Result { + let config = initialize_logging_parse_config(Some(LogLevel::Warn), config); // Warn/error if the config is unreasonable. We do this after finishing // tracing setup to ensure logging is fully configured. @@ -122,45 +123,52 @@ pub(crate) async fn force_sync(config: Option) -> std::io::Result total_sources += 1, - config::NtpSourceConfig::Pool(PoolSourceConfig { count, .. }) => total_sources += count, - #[cfg(feature = "unstable_nts-pool")] - config::NtpSourceConfig::NtsPool(NtsPoolSourceConfig { count, .. }) => { - total_sources += count + Builder::new_current_thread() + .enable_all() + .build()? + .block_on(async { + // Count number of sources + let mut total_sources = 0; + for source in &config.sources { + match source { + config::NtpSourceConfig::Standard(_) + | config::NtpSourceConfig::Nts(_) + | config::NtpSourceConfig::Sock(_) => total_sources += 1, + config::NtpSourceConfig::Pool(PoolSourceConfig { count, .. }) => { + total_sources += count + } + #[cfg(feature = "unstable_nts-pool")] + config::NtpSourceConfig::NtsPool(NtsPoolSourceConfig { count, .. }) => { + total_sources += count + } + } } - } - } - // We will need to have a keyset for the daemon - let keyset = nts_key_provider::spawn(config.keyset).await; + // We will need to have a keyset for the daemon + let keyset = nts_key_provider::spawn(config.keyset).await; - #[cfg(feature = "hardware-timestamping")] - let clock_config = config.clock; + #[cfg(feature = "hardware-timestamping")] + let clock_config = config.clock; - #[cfg(not(feature = "hardware-timestamping"))] - let clock_config = config::ClockConfig::default(); + #[cfg(not(feature = "hardware-timestamping"))] + let clock_config = config::ClockConfig::default(); - ::tracing::debug!("Configuration loaded, spawning daemon jobs"); - let (main_loop_handle, _) = spawn::>( - config.synchronization.synchronization_base, - SingleShotControllerConfig { - expected_sources: total_sources, - }, - config.source_defaults, - clock_config, - &config.sources, - &[], // No serving when operating in force sync mode - keyset.clone(), - ) - .await?; + ::tracing::debug!("Configuration loaded, spawning daemon jobs"); + let (main_loop_handle, _) = spawn::>( + config.synchronization.synchronization_base, + SingleShotControllerConfig { + expected_sources: total_sources, + }, + config.source_defaults, + clock_config, + &config.sources, + &[], // No serving when operating in force sync mode + keyset.clone(), + ) + .await?; - let _ = main_loop_handle.await; + let _ = main_loop_handle.await; - Ok(ExitCode::SUCCESS) + Ok(ExitCode::SUCCESS) + }) } diff --git a/ntpd/src/metrics/exporter.rs b/ntpd/src/metrics/exporter.rs index 998f1880f..1277c3e3d 100644 --- a/ntpd/src/metrics/exporter.rs +++ b/ntpd/src/metrics/exporter.rs @@ -2,6 +2,7 @@ use libc::{ECONNABORTED, EMFILE, ENFILE, ENOBUFS, ENOMEM}; use timestamped_socket::interface::ChangeDetector; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; +use tokio::runtime::Builder; use tracing::{debug, error, trace, warn}; use std::{ @@ -109,7 +110,7 @@ impl NtpMetricsExporterOptions { } } -pub async fn main() -> Result<(), Box> { +pub fn main() -> Result<(), Box> { let options = NtpMetricsExporterOptions::try_parse_from(std::env::args())?; match options.action { MetricsAction::Help => { @@ -120,94 +121,97 @@ pub async fn main() -> Result<(), Box> { eprintln!("ntp-metrics-exporter {VERSION}"); Ok(()) } - MetricsAction::Run => run(options).await, + MetricsAction::Run => run(options), } } -async fn run(options: NtpMetricsExporterOptions) -> Result<(), Box> { - let config = initialize_logging_parse_config(None, options.config).await; - let timeout = std::time::Duration::from_millis(1000); +fn run(options: NtpMetricsExporterOptions) -> Result<(), Box> { + let config = initialize_logging_parse_config(None, options.config); - let observation_socket_path = match config.observability.observation_path { - Some(path) => Arc::new(path), - None => { - eprintln!("An observation socket path must be configured using the observation-path option in the [observability] section of the configuration"); - std::process::exit(1); - } - }; - - println!( - "starting ntp-metrics-exporter on {}", - &config.observability.metrics_exporter_listen - ); - - let listener = loop { - match TcpListener::bind(&config.observability.metrics_exporter_listen).await { - Err(e) if e.kind() == std::io::ErrorKind::AddrNotAvailable => { - tracing::info!("Could not open listening socket, waiting for interface to come up"); - let _ = tokio::time::timeout( - std::time::Duration::from_secs(60), - ChangeDetector::new()?.wait_for_change(), - ) - .await; - } - Err(e) => { - tracing::warn!("Could not open listening socket: {}", e); - let _ = tokio::time::timeout( - std::time::Duration::from_secs(60), - ChangeDetector::new()?.wait_for_change(), - ) - .await; + Builder::new_current_thread().enable_all().build()?.block_on(async { + let timeout = std::time::Duration::from_millis(1000); + + let observation_socket_path = match config.observability.observation_path { + Some(path) => Arc::new(path), + None => { + eprintln!("An observation socket path must be configured using the observation-path option in the [observability] section of the configuration"); + std::process::exit(1); } - Ok(listener) => break listener, }; - }; - - // this has a lot more permits than the daemon observer has, but we expect http transfers to - // take longer than how much time the daemon needs to return observability data - let permits = Arc::new(tokio::sync::Semaphore::new(100)); - - loop { - let permit = permits - .clone() - .acquire_owned() - .await - .expect("semaphore was unexpectedly closed"); - let (mut tcp_stream, _) = match listener.accept().await { - Ok(a) => a, - Err(e) if matches!(e.raw_os_error(), Some(ECONNABORTED)) => { - debug!("Client unexpectedly closed connection: {e}"); - continue; - } - Err(e) - if matches!( - e.raw_os_error(), - Some(ENFILE) | Some(EMFILE) | Some(ENOMEM) | Some(ENOBUFS) - ) => - { - error!("Not enough resources available to accept incoming connection: {e}"); - tokio::time::sleep(timeout).await; - continue; - } - Err(e) => { - error!("Could not accept incoming connection: {e}"); - return Err(e.into()); - } + + println!( + "starting ntp-metrics-exporter on {}", + &config.observability.metrics_exporter_listen + ); + + let listener = loop { + match TcpListener::bind(&config.observability.metrics_exporter_listen).await { + Err(e) if e.kind() == std::io::ErrorKind::AddrNotAvailable => { + tracing::info!("Could not open listening socket, waiting for interface to come up"); + let _ = tokio::time::timeout( + std::time::Duration::from_secs(60), + ChangeDetector::new()?.wait_for_change(), + ) + .await; + } + Err(e) => { + tracing::warn!("Could not open listening socket: {}", e); + let _ = tokio::time::timeout( + std::time::Duration::from_secs(60), + ChangeDetector::new()?.wait_for_change(), + ) + .await; + } + Ok(listener) => break listener, + }; }; - let path = observation_socket_path.clone(); - // handle each connection on a separate task - let fut = async move { handle_connection(&mut tcp_stream, &path).await }; + // this has a lot more permits than the daemon observer has, but we expect http transfers to + // take longer than how much time the daemon needs to return observability data + let permits = Arc::new(tokio::sync::Semaphore::new(100)); - tokio::spawn(async move { - match tokio::time::timeout(timeout, fut).await { - Err(_) => debug!("connection timed out"), - Ok(Err(e)) => warn!("error handling connection: {e}"), - Ok(_) => trace!("connection handled successfully"), - } - drop(permit); - }); - } + loop { + let permit = permits + .clone() + .acquire_owned() + .await + .expect("semaphore was unexpectedly closed"); + let (mut tcp_stream, _) = match listener.accept().await { + Ok(a) => a, + Err(e) if matches!(e.raw_os_error(), Some(ECONNABORTED)) => { + debug!("Client unexpectedly closed connection: {e}"); + continue; + } + Err(e) + if matches!( + e.raw_os_error(), + Some(ENFILE) | Some(EMFILE) | Some(ENOMEM) | Some(ENOBUFS) + ) => + { + error!("Not enough resources available to accept incoming connection: {e}"); + tokio::time::sleep(timeout).await; + continue; + } + Err(e) => { + error!("Could not accept incoming connection: {e}"); + return Err(e.into()); + } + }; + let path = observation_socket_path.clone(); + + // handle each connection on a separate task + let fut = async move { handle_connection(&mut tcp_stream, &path).await }; + + tokio::spawn(async move { + match tokio::time::timeout(timeout, fut).await { + Err(_) => debug!("connection timed out"), + Ok(Err(e)) => warn!("error handling connection: {e}"), + Ok(_) => trace!("connection handled successfully"), + } + drop(permit); + }); + } + }) } async fn handle_connection(