From 5904d98f0ffc6826895263af0dd29c8445cae487 Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Fri, 15 Nov 2024 16:22:39 -0800 Subject: [PATCH] Revert "Starting sharded helpers" This reverts commit a3d4097add03ec88b95af105e1169f1357d30865. --- ipa-core/src/bin/helper.rs | 155 +++++++++--------------------- ipa-core/src/config.rs | 88 +---------------- ipa-core/tests/common/mod.rs | 21 ++-- ipa-core/tests/helper_networks.rs | 6 +- 4 files changed, 61 insertions(+), 209 deletions(-) diff --git a/ipa-core/src/bin/helper.rs b/ipa-core/src/bin/helper.rs index 9b558c862..734db1bc5 100644 --- a/ipa-core/src/bin/helper.rs +++ b/ipa-core/src/bin/helper.rs @@ -8,23 +8,17 @@ use std::{ }; use clap::{self, Parser, Subcommand}; -use futures::future::join; use hyper::http::uri::Scheme; use ipa_core::{ cli::{ client_config_setup, keygen, test_setup, ConfGenArgs, KeygenArgs, LoggingHandle, TestSetupArgs, Verbosity, }, - config::{ - hpke_registry, sharded_server_from_toml_str, HpkeServerConfig, ServerConfig, TlsConfig, - }, + config::{hpke_registry, HpkeServerConfig, NetworkConfig, ServerConfig, TlsConfig}, error::BoxError, executor::IpaRuntime, helpers::HelperIdentity, - net::{ - ClientIdentity, ConnectionFlavor, IpaHttpClient, MpcHttpTransport, Shard, - ShardHttpTransport, - }, + net::{ClientIdentity, IpaHttpClient, MpcHttpTransport, ShardHttpTransport}, sharding::ShardIndex, AppConfig, AppSetup, NonZeroU32PowerOfTwo, }; @@ -61,31 +55,16 @@ struct ServerArgs { #[arg(short, long, required = true)] identity: Option, - #[arg(default_value = "0")] - shard_index: Option, - - #[arg(default_value = "1")] - shard_count: Option, - /// Port to listen on #[arg(short, long, default_value = "3000")] port: Option, - #[arg(default_value = "6000")] - shard_port: Option, - - /// Use the supplied prebound socket instead of binding a new socket for mpc + /// Use the supplied prebound socket instead of binding a new socket /// /// This is only intended for avoiding port conflicts in tests. #[arg(hide = true, long)] server_socket_fd: Option, - /// Use the supplied prebound socket instead of binding a new socket for shard server - /// - /// This is only intended for avoiding port conflicts in tests. - #[arg(hide = true, long)] - shard_server_socket_fd: Option, - /// Use insecure HTTP #[arg(short = 'k', long)] disable_https: bool, @@ -94,7 +73,7 @@ struct ServerArgs { #[arg(long, required = true)] network: Option, - /// TLS certificate for helper-to-helper and shard-to-shard communication + /// TLS certificate for helper-to-helper communication #[arg( long, visible_alias("cert"), @@ -103,7 +82,7 @@ struct ServerArgs { )] tls_cert: Option, - /// TLS key for helper-to-helper and shard-to-shard communication + /// TLS key for helper-to-helper communication #[arg(long, visible_alias("key"), requires = "tls_cert")] tls_key: Option, @@ -135,58 +114,24 @@ fn read_file(path: &Path) -> Result, BoxError> { .map_err(|e| format!("failed to open file {}: {e:?}", path.display()))?) } -/// Helper function that creates the client identity; either with certificates if they are provided -/// or just with headers otherwise. This works both for sharded and helper configs. -fn create_client_identity( - id: F::Identity, - tls_cert: Option, - tls_key: Option, -) -> Result<(ClientIdentity, Option), BoxError> { - match (tls_cert, tls_key) { +async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> { + let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap(); + + let (identity, server_tls) = match (args.tls_cert, args.tls_key) { (Some(cert_file), Some(key_file)) => { let mut key = read_file(&key_file)?; let mut certs = read_file(&cert_file)?; - Ok(( - ClientIdentity::::from_pkcs8(&mut certs, &mut key)?, + ( + ClientIdentity::from_pkcs8(&mut certs, &mut key)?, Some(TlsConfig::File { certificate_file: cert_file, private_key_file: key_file, }), - )) + ) } - (None, None) => Ok((ClientIdentity::Header(id), None)), - _ => Err("should have been rejected by clap".into()), - } -} - -// SAFETY: -// 1. The `--server-socket-fd` option is only intended for use in tests, not in production. -// 2. This must be the only call to from_raw_fd for this file descriptor, to ensure it has -// only one owner. -fn create_listener(server_socket_fd: Option) -> Result, BoxError> { - server_socket_fd - .map(|fd| { - let listener = unsafe { TcpListener::from_raw_fd(fd) }; - if listener.local_addr().is_ok() { - info!("adopting fd {fd} as listening socket"); - Ok(listener) - } else { - Err(BoxError::from(format!("the server was asked to listen on fd {fd}, but it does not appear to be a valid socket"))) - } - }) - .transpose() -} - -async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> { - let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap(); - let shard_index = ShardIndex::from(args.shard_index.expect("enforced by clap")); - let shard_count = ShardIndex::from(args.shard_count.expect("enforced by clap")); - assert!(shard_index < shard_count); - - let (identity, server_tls) = - create_client_identity(my_identity, args.tls_cert.clone(), args.tls_key.clone())?; - let (shard_identity, shard_server_tls) = - create_client_identity(shard_index, args.tls_cert, args.tls_key)?; + (None, None) => (ClientIdentity::Header(my_identity), None), + _ => panic!("should have been rejected by clap"), + }; let mk_encryption = args.mk_private_key.map(|sk_path| HpkeServerConfig::File { private_key_file: sk_path, @@ -204,13 +149,6 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B port: args.port, disable_https: args.disable_https, tls: server_tls, - hpke_config: mk_encryption.clone(), - }; - - let shard_server_config = ServerConfig { - port: args.shard_port, - disable_https: args.disable_https, - tls: shard_server_tls, hpke_config: mk_encryption, }; @@ -219,48 +157,60 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B } else { Scheme::HTTPS }; - let network_config_path = args.network.as_deref().unwrap(); - let network_config_string = &fs::read_to_string(network_config_path)?; - let (mut mpc_network, mut shard_network) = - sharded_server_from_toml_str(network_config_string, my_identity, shard_index, shard_count)?; - mpc_network = mpc_network.override_scheme(&scheme); - shard_network = shard_network.override_scheme(&scheme); + let network_config = NetworkConfig::from_toml_str(&fs::read_to_string(network_config_path)?)? + .override_scheme(&scheme); + + // TODO: Following is just temporary until Shard Transport is actually used. + let shard_clients_config = network_config.client.clone(); + let shard_server_config = server_config.clone(); + // --- let http_runtime = new_http_runtime(&logging_handle); let clients = IpaHttpClient::from_conf( &IpaRuntime::from_tokio_runtime(&http_runtime), - &mpc_network, + &network_config, &identity, ); let (transport, server) = MpcHttpTransport::new( IpaRuntime::from_tokio_runtime(&http_runtime), my_identity, server_config, - mpc_network, + network_config, &clients, Some(handler), ); - let shard_clients = IpaHttpClient::::shards_from_conf( - &IpaRuntime::from_tokio_runtime(&http_runtime), - &shard_network, - &shard_identity, - ); - let (shard_transport, shard_server) = ShardHttpTransport::new( + // TODO: Following is just temporary until Shard Transport is actually used. + let shard_network_config = NetworkConfig::new_shards(vec![], shard_clients_config); + let (shard_transport, _shard_server) = ShardHttpTransport::new( IpaRuntime::from_tokio_runtime(&http_runtime), - shard_index, - shard_count, + ShardIndex::FIRST, + ShardIndex::from(1), shard_server_config, - shard_network, - shard_clients, + shard_network_config, + vec![], Some(shard_handler), ); + // --- let _app = setup.connect(transport.clone(), shard_transport.clone()); - let listener = create_listener(args.server_socket_fd)?; - let shard_listener = create_listener(args.shard_server_socket_fd)?; + let listener = args.server_socket_fd + .map(|fd| { + // SAFETY: + // 1. The `--server-socket-fd` option is only intended for use in tests, not in production. + // 2. This must be the only call to from_raw_fd for this file descriptor, to ensure it has + // only one owner. + let listener = unsafe { TcpListener::from_raw_fd(fd) }; + if listener.local_addr().is_ok() { + info!("adopting fd {fd} as listening socket"); + Ok(listener) + } else { + Err(BoxError::from(format!("the server was asked to listen on fd {fd}, but it does not appear to be a valid socket"))) + } + }) + .transpose()?; let (_addr, server_handle) = server .start_on( @@ -270,17 +220,8 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B None as Option<()>, ) .await; - let (_saddr, shard_server_handle) = shard_server - .start_on( - &IpaRuntime::from_tokio_runtime(&http_runtime), - shard_listener, - // TODO, trace based on the content of the query. - None as Option<()>, - ) - .await; - - join(server_handle, shard_server_handle).await; + server_handle.await; [query_runtime, http_runtime].map(Runtime::shutdown_background); Ok(()) diff --git a/ipa-core/src/config.rs b/ipa-core/src/config.rs index 408213b3e..49384b814 100644 --- a/ipa-core/src/config.rs +++ b/ipa-core/src/config.rs @@ -16,7 +16,7 @@ use tokio::fs; use crate::{ error::BoxError, - helpers::{HelperIdentity, TransportIdentity}, + helpers::HelperIdentity, hpke::{ Deserializable as _, IpaPrivateKey, IpaPublicKey, KeyRegistry, PrivateKeyOnly, PublicKeyOnly, Serializable as _, @@ -32,10 +32,8 @@ pub type OwnedPrivateKey = PrivateKeyDer<'static>; pub enum Error { #[error(transparent)] ParseError(#[from] config::ConfigError), - #[error("Invalid uri: {0}")] + #[error("invalid uri: {0}")] InvalidUri(#[from] hyper::http::uri::InvalidUri), - #[error("Invalid network size {0}")] - InvalidNetworkSize(usize), #[error(transparent)] IOError(#[from] std::io::Error), } @@ -119,88 +117,6 @@ impl NetworkConfig { } } -/// Reads a the config for a specific, single, sharded server from string. Expects config to be -/// toml format. The server in the network is specified via `id`, `shard_index` and -/// `shard_count`. -/// -/// First we read the configuration without assigning any identities. The number of peers in the -/// configuration must be a multiple of 6, or 3 as a special case to support older, non-sharded -/// configurations. -/// -/// If there are 3 entries, we assign helper identities for them. We create a dummy sharded -/// configuration. -/// -/// If there are any multiple of 6 peers, then peer assignment is as follows: -/// By rings (to be reminiscent of the previous config). The first 6 entries corresponds to the -/// leaders Ring. H1 shard 0, H2, shard 0, and H3 shard 0. The next 6 correspond increases the -/// shard index by one. -/// -/// Other methods to read the network.toml exist depending on the use, for example -/// [`NetworkConfig::from_toml_str`] reads a non-sharded config. -/// TODO: There will be one to read the information relevant for the RC (doesn't need shard -/// info) -/// -/// # Errors -/// if `input` is in an invalid format -pub fn sharded_server_from_toml_str( - input: &str, - id: HelperIdentity, - shard_index: ShardIndex, - shard_count: ShardIndex, -) -> Result<(NetworkConfig, NetworkConfig), Error> { - use config::{Config, File, FileFormat}; - - let all_network: NetworkConfig = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build()? - .try_deserialize()?; - - let ix: usize = shard_index.as_index(); - let ix_count: usize = shard_count.as_index(); - let mpc_id: usize = id.as_index(); - - let total_peers = all_network.peers.len(); - if total_peers == 3 { - let mpc_network = NetworkConfig { - peers: all_network.peers.clone(), - client: all_network.client.clone(), - identities: HelperIdentity::make_three().to_vec(), - }; - let shard_network = NetworkConfig { - peers: vec![all_network.peers[mpc_id].clone()], - client: all_network.client, - identities: vec![ShardIndex(0)], - }; - Ok((mpc_network, shard_network)) - } else if total_peers > 0 && total_peers % 6 == 0 { - let mpc_network = NetworkConfig { - peers: all_network - .peers - .clone() - .into_iter() - .skip(ix * 6) - .take(3) - .collect(), - client: all_network.client.clone(), - identities: HelperIdentity::make_three().to_vec(), - }; - let shard_network = NetworkConfig { - peers: all_network - .peers - .into_iter() - .skip(3 + mpc_id) - .step_by(6) - .take(ix_count) - .collect(), - client: all_network.client, - identities: shard_count.iter().collect(), - }; - Ok((mpc_network, shard_network)) - } else { - Err(Error::InvalidNetworkSize(total_peers)) - } -} - impl NetworkConfig { /// # Panics /// In the unexpected case there are more than max usize shards. diff --git a/ipa-core/tests/common/mod.rs b/ipa-core/tests/common/mod.rs index be582537c..ca1d5e08a 100644 --- a/ipa-core/tests/common/mod.rs +++ b/ipa-core/tests/common/mod.rs @@ -109,9 +109,9 @@ impl CommandExt for Command { } } -fn test_setup(config_path: &Path) -> [TcpListener; 6] { - let sockets: [_; 6] = array::from_fn(|_| TcpListener::bind("127.0.0.1:0").unwrap()); - let ports: [u16; 6] = sockets +fn test_setup(config_path: &Path) -> [TcpListener; 3] { + let sockets: [_; 3] = array::from_fn(|_| TcpListener::bind("127.0.0.1:0").unwrap()); + let ports: [u16; 3] = sockets .each_ref() .map(|sock| sock.local_addr().unwrap().port()); @@ -121,7 +121,7 @@ fn test_setup(config_path: &Path) -> [TcpListener; 6] { .arg("test-setup") .args(["--output-dir".as_ref(), config_path.as_os_str()]) .arg("--ports") - .args(ports.chunks(2).map(|p| p[0].to_string())); + .args(ports.map(|p| p.to_string())); command.status().unwrap_status(); sockets @@ -129,10 +129,10 @@ fn test_setup(config_path: &Path) -> [TcpListener; 6] { pub fn spawn_helpers( config_path: &Path, - sockets: &[TcpListener; 6], + sockets: &[TcpListener; 3], https: bool, ) -> Vec { - zip([1, 2, 3], sockets.chunks(2)) + zip([1, 2, 3], sockets) .map(|(id, socket)| { let mut command = Command::new(HELPER_BIN); command @@ -156,13 +156,8 @@ pub fn spawn_helpers( command.arg("--disable-https"); } - command.preserved_fds(vec![socket[0].as_raw_fd()]); - command.args(["--server-socket-fd", &socket[0].as_raw_fd().to_string()]); - command.preserved_fds(vec![socket[1].as_raw_fd()]); - command.args([ - "--shard-server-socket-fd", - &socket[1].as_raw_fd().to_string(), - ]); + command.preserved_fds(vec![socket.as_raw_fd()]); + command.args(["--server-socket-fd", &socket.as_raw_fd().to_string()]); // something went wrong if command is terminated at this point. let mut child = command.spawn().unwrap(); diff --git a/ipa-core/tests/helper_networks.rs b/ipa-core/tests/helper_networks.rs index 4eb59a38e..7775ffba4 100644 --- a/ipa-core/tests/helper_networks.rs +++ b/ipa-core/tests/helper_networks.rs @@ -71,8 +71,8 @@ fn keygen_confgen() { let dir = TempDir::new_delete_on_drop(); let path = dir.path(); - let sockets: [_; 6] = array::from_fn(|_| TcpListener::bind("127.0.0.1:0").unwrap()); - let ports: [u16; 6] = sockets + let sockets: [_; 3] = array::from_fn(|_| TcpListener::bind("127.0.0.1:0").unwrap()); + let ports: [u16; 3] = sockets .each_ref() .map(|sock| sock.local_addr().unwrap().port()); @@ -85,7 +85,7 @@ fn keygen_confgen() { .args(["--output-dir".as_ref(), path.as_os_str()]) .args(["--keys-dir".as_ref(), path.as_os_str()]) .arg("--ports") - .args(ports.chunks(2).map(|p| p[0].to_string())) + .args(ports.map(|p| p.to_string())) .arg("--hosts") .args(["localhost", "localhost", "localhost"]); if overwrite {