Skip to content

Commit

Permalink
Merge pull request #1449 from cberkhoff/scli2
Browse files Browse the repository at this point in the history
Adding sharding CLI params to helper
  • Loading branch information
cberkhoff authored Nov 21, 2024
2 parents fab480b + 8ea3cc7 commit 0270d5f
Show file tree
Hide file tree
Showing 7 changed files with 728 additions and 100 deletions.
162 changes: 114 additions & 48 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@ use std::{
};

use clap::{self, Parser, Subcommand};
use futures::future::join;
use hyper::http::uri::Scheme;
use ipa_core::{
cli::{
client_config_setup, keygen, test_setup, ConfGenArgs, KeygenArgs, LoggingHandle,
TestSetupArgs, Verbosity,
},
config::{hpke_registry, HpkeServerConfig, NetworkConfig, ServerConfig, TlsConfig},
config::{
hpke_registry, sharded_server_from_toml_str, HpkeServerConfig, ServerConfig, TlsConfig,
},
error::BoxError,
executor::IpaRuntime,
helpers::HelperIdentity,
net::{ClientIdentity, IpaHttpClient, MpcHttpTransport, ShardHttpTransport},
net::{
ClientIdentity, ConnectionFlavor, IpaHttpClient, MpcHttpTransport, Shard,
ShardHttpTransport,
},
sharding::ShardIndex,
AppConfig, AppSetup, NonZeroU32PowerOfTwo,
};
Expand Down Expand Up @@ -55,16 +61,32 @@ struct ServerArgs {
#[arg(short, long, required = true)]
identity: Option<usize>,

#[arg(default_value = "0")]
shard_index: Option<u32>,

#[arg(default_value = "1")]
shard_count: Option<u32>,

/// Port to listen on
#[arg(short, long, default_value = "3000")]
port: Option<u16>,

/// Use the supplied prebound socket instead of binding a new socket
/// Port to use for shard-to-shard communication, if sharded MPC is used
#[arg(default_value = "6000")]
shard_port: Option<u16>,

/// Use the supplied prebound socket instead of binding a new socket for mpc
///
/// This is only intended for avoiding port conflicts in tests.
#[arg(hide = true, long)]
server_socket_fd: Option<RawFd>,

/// Use the supplied prebound socket instead of binding a new socket for shard server
///
/// This is only intended for avoiding port conflicts in tests.
#[arg(hide = true, long)]
shard_server_socket_fd: Option<RawFd>,

/// Use insecure HTTP
#[arg(short = 'k', long)]
disable_https: bool,
Expand All @@ -73,7 +95,7 @@ struct ServerArgs {
#[arg(long, required = true)]
network: Option<PathBuf>,

/// TLS certificate for helper-to-helper communication
/// TLS certificate for helper-to-helper and shard-to-shard communication
#[arg(
long,
visible_alias("cert"),
Expand All @@ -82,7 +104,7 @@ struct ServerArgs {
)]
tls_cert: Option<PathBuf>,

/// TLS key for helper-to-helper communication
/// TLS key for helper-to-helper and shard-to-shard communication
#[arg(long, visible_alias("key"), requires = "tls_cert")]
tls_key: Option<PathBuf>,

Expand Down Expand Up @@ -114,24 +136,59 @@ fn read_file(path: &Path) -> Result<BufReader<fs::File>, BoxError> {
.map_err(|e| format!("failed to open file {}: {e:?}", path.display()))?)
}

async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> {
let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap();

let (identity, server_tls) = match (args.tls_cert, args.tls_key) {
/// Helper function that creates the client identity; either with certificates if they are provided
/// or just with headers otherwise. This works both for sharded and helper configs.
fn create_client_identity<F: ConnectionFlavor>(
id: F::Identity,
tls_cert: Option<PathBuf>,
tls_key: Option<PathBuf>,
) -> Result<(ClientIdentity<F>, Option<TlsConfig>), BoxError> {
match (tls_cert, tls_key) {
(Some(cert_file), Some(key_file)) => {
let mut key = read_file(&key_file)?;
let mut certs = read_file(&cert_file)?;
(
ClientIdentity::from_pkcs8(&mut certs, &mut key)?,
Ok((
ClientIdentity::<F>::from_pkcs8(&mut certs, &mut key)?,
Some(TlsConfig::File {
certificate_file: cert_file,
private_key_file: key_file,
}),
)
))
}
(None, None) => (ClientIdentity::Header(my_identity), None),
_ => panic!("should have been rejected by clap"),
};
(None, None) => Ok((ClientIdentity::Header(id), None)),
_ => Err("should have been rejected by clap".into()),
}
}

/// Creates a [`TcpListener`] from an optional raw file descriptor. Safety notes:
/// 1. The `--server-socket-fd` option is only intended for use in tests, not in production.
/// 2. This must be the only call to from_raw_fd for this file descriptor, to ensure it has
/// only one owner.
fn create_listener(server_socket_fd: Option<RawFd>) -> Result<Option<TcpListener>, BoxError> {
server_socket_fd
.map(|fd| {
let listener = unsafe { TcpListener::from_raw_fd(fd) };
if listener.local_addr().is_ok() {
info!("adopting fd {fd} as listening socket");
Ok(listener)
} else {
Err(BoxError::from(format!("the server was asked to listen on fd {fd}, but it does not appear to be a valid socket")))
}
})
.transpose()
}

async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> {
let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap();
let shard_index = ShardIndex::from(args.shard_index.expect("enforced by clap"));
let shard_count = ShardIndex::from(args.shard_count.expect("enforced by clap"));
assert!(shard_index < shard_count);
assert_eq!(args.tls_cert.is_some(), !args.disable_https);

let (identity, server_tls) =
create_client_identity(my_identity, args.tls_cert.clone(), args.tls_key.clone())?;
let (shard_identity, shard_server_tls) =
create_client_identity(shard_index, args.tls_cert, args.tls_key)?;

let mk_encryption = args.mk_private_key.map(|sk_path| HpkeServerConfig::File {
private_key_file: sk_path,
Expand All @@ -149,6 +206,13 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
port: args.port,
disable_https: args.disable_https,
tls: server_tls,
hpke_config: mk_encryption.clone(),
};

let shard_server_config = ServerConfig {
port: args.shard_port,
disable_https: args.disable_https,
tls: shard_server_tls,
hpke_config: mk_encryption,
};

Expand All @@ -157,60 +221,53 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
} else {
Scheme::HTTPS
};
let network_config_path = args.network.as_deref().unwrap();
let network_config = NetworkConfig::from_toml_str(&fs::read_to_string(network_config_path)?)?
.override_scheme(&scheme);

// TODO: Following is just temporary until Shard Transport is actually used.
let shard_clients_config = network_config.client.clone();
let shard_server_config = server_config.clone();
// ---
let network_config_path = args.network.as_deref().unwrap();
let network_config_string = &fs::read_to_string(network_config_path)?;
let (mut mpc_network, mut shard_network) = sharded_server_from_toml_str(
network_config_string,
my_identity,
shard_index,
shard_count,
args.shard_port,
)?;
mpc_network = mpc_network.override_scheme(&scheme);
shard_network = shard_network.override_scheme(&scheme);

let http_runtime = new_http_runtime(&logging_handle);
let clients = IpaHttpClient::from_conf(
&IpaRuntime::from_tokio_runtime(&http_runtime),
&network_config,
&mpc_network,
&identity,
);
let (transport, server) = MpcHttpTransport::new(
IpaRuntime::from_tokio_runtime(&http_runtime),
my_identity,
server_config,
network_config,
mpc_network,
&clients,
Some(handler),
);

// TODO: Following is just temporary until Shard Transport is actually used.
let shard_network_config = NetworkConfig::new_shards(vec![], shard_clients_config);
let (shard_transport, _shard_server) = ShardHttpTransport::new(
let shard_clients = IpaHttpClient::<Shard>::shards_from_conf(
&IpaRuntime::from_tokio_runtime(&http_runtime),
&shard_network,
&shard_identity,
);
let (shard_transport, shard_server) = ShardHttpTransport::new(
IpaRuntime::from_tokio_runtime(&http_runtime),
ShardIndex::FIRST,
ShardIndex::from(1),
shard_index,
shard_count,
shard_server_config,
shard_network_config,
vec![],
shard_network,
shard_clients,
Some(shard_handler),
);
// ---

let _app = setup.connect(transport.clone(), shard_transport.clone());

let listener = args.server_socket_fd
.map(|fd| {
// SAFETY:
// 1. The `--server-socket-fd` option is only intended for use in tests, not in production.
// 2. This must be the only call to from_raw_fd for this file descriptor, to ensure it has
// only one owner.
let listener = unsafe { TcpListener::from_raw_fd(fd) };
if listener.local_addr().is_ok() {
info!("adopting fd {fd} as listening socket");
Ok(listener)
} else {
Err(BoxError::from(format!("the server was asked to listen on fd {fd}, but it does not appear to be a valid socket")))
}
})
.transpose()?;
let listener = create_listener(args.server_socket_fd)?;
let shard_listener = create_listener(args.shard_server_socket_fd)?;

let (_addr, server_handle) = server
.start_on(
Expand All @@ -220,8 +277,17 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
None as Option<()>,
)
.await;
let (_saddr, shard_server_handle) = shard_server
.start_on(
&IpaRuntime::from_tokio_runtime(&http_runtime),
shard_listener,
// TODO, trace based on the content of the query.
None as Option<()>,
)
.await;

join(server_handle, shard_server_handle).await;

server_handle.await;
[query_runtime, http_runtime].map(Runtime::shutdown_background);

Ok(())
Expand Down
17 changes: 15 additions & 2 deletions ipa-core/src/cli/clientconf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub struct ConfGenArgs {
#[arg(short, long, num_args = 3, value_name = "PORT", default_values = vec!["3000", "3001", "3002"])]
ports: Vec<u16>,

#[arg(short, long, num_args = 3, value_name = "SHARD_PORTS", default_values = vec!["6000", "6001", "6002"])]
shard_ports: Vec<u16>,

#[arg(long, num_args = 3, default_values = vec!["localhost", "localhost", "localhost"])]
hosts: Vec<String>,

Expand Down Expand Up @@ -54,13 +57,14 @@ pub struct ConfGenArgs {
/// [`ConfGenArgs`]: ConfGenArgs
/// [`Paths`]: crate::cli::paths::PathExt
pub fn setup(args: ConfGenArgs) -> Result<(), BoxError> {
let clients_conf: [_; 3] = zip(args.hosts.iter(), args.ports)
let clients_conf: [_; 3] = zip(args.hosts.iter(), zip(args.ports, args.shard_ports))
.enumerate()
.map(|(id, (host, port))| {
.map(|(id, (host, (port, shard_port)))| {
let id: u8 = u8::try_from(id).unwrap() + 1;
HelperClientConf {
host,
port,
shard_port,
tls_cert_file: args.keys_dir.helper_tls_cert(id),
mk_public_key_file: args.keys_dir.helper_mk_public_key(id),
}
Expand Down Expand Up @@ -96,6 +100,7 @@ pub fn setup(args: ConfGenArgs) -> Result<(), BoxError> {
pub struct HelperClientConf<'a> {
pub(crate) host: &'a str,
pub(crate) port: u16,
pub(crate) shard_port: u16,
pub(crate) tls_cert_file: PathBuf,
pub(crate) mk_public_key_file: PathBuf,
}
Expand Down Expand Up @@ -133,6 +138,14 @@ pub fn gen_client_config<'a>(
port = client_conf.port
)),
);
peer.insert(
String::from("shard_url"),
Value::String(format!(
"{host}:{port}",
host = client_conf.host,
port = client_conf.shard_port
)),
);
peer.insert(String::from("certificate"), Value::String(certificate));
peer.insert(
String::from("hpke"),
Expand Down
8 changes: 6 additions & 2 deletions ipa-core/src/cli/test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub struct TestSetupArgs {

#[arg(short, long, num_args = 3, value_name = "PORT", default_values = vec!["3000", "3001", "3002"])]
ports: Vec<u16>,

#[arg(short, long, num_args = 3, value_name = "SHARD_PORT", default_values = vec!["6000", "6001", "6002"])]
shard_ports: Vec<u16>,
}

/// Prepare a test network of three helpers.
Expand All @@ -56,8 +59,8 @@ pub fn test_setup(args: TestSetupArgs) -> Result<(), BoxError> {

let localhost = String::from("localhost");

let clients_config: [_; 3] = zip([1, 2, 3], args.ports)
.map(|(id, port)| {
let clients_config: [_; 3] = zip([1, 2, 3], zip(args.ports, args.shard_ports))
.map(|(id, (port, shard_port))| {
let keygen_args = KeygenArgs {
name: localhost.clone(),
tls_cert: args.output_dir.helper_tls_cert(id),
Expand All @@ -72,6 +75,7 @@ pub fn test_setup(args: TestSetupArgs) -> Result<(), BoxError> {
Ok(HelperClientConf {
host: &localhost,
port,
shard_port,
tls_cert_file: keygen_args.tls_cert,
mk_public_key_file: keygen_args.mk_public_key,
})
Expand Down
Loading

0 comments on commit 0270d5f

Please sign in to comment.