Skip to content

Commit

Permalink
Revert "Starting sharded helpers"
Browse files Browse the repository at this point in the history
This reverts commit a3d4097.
  • Loading branch information
cberkhoff committed Nov 16, 2024
1 parent 4ac03a4 commit 5904d98
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 209 deletions.
155 changes: 48 additions & 107 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,17 @@ use std::{
};

use clap::{self, Parser, Subcommand};
use futures::future::join;
use hyper::http::uri::Scheme;
use ipa_core::{
cli::{
client_config_setup, keygen, test_setup, ConfGenArgs, KeygenArgs, LoggingHandle,
TestSetupArgs, Verbosity,
},
config::{
hpke_registry, sharded_server_from_toml_str, HpkeServerConfig, ServerConfig, TlsConfig,
},
config::{hpke_registry, HpkeServerConfig, NetworkConfig, ServerConfig, TlsConfig},
error::BoxError,
executor::IpaRuntime,
helpers::HelperIdentity,
net::{
ClientIdentity, ConnectionFlavor, IpaHttpClient, MpcHttpTransport, Shard,
ShardHttpTransport,
},
net::{ClientIdentity, IpaHttpClient, MpcHttpTransport, ShardHttpTransport},
sharding::ShardIndex,
AppConfig, AppSetup, NonZeroU32PowerOfTwo,
};
Expand Down Expand Up @@ -61,31 +55,16 @@ struct ServerArgs {
#[arg(short, long, required = true)]
identity: Option<usize>,

#[arg(default_value = "0")]
shard_index: Option<u32>,

#[arg(default_value = "1")]
shard_count: Option<u32>,

/// Port to listen on
#[arg(short, long, default_value = "3000")]
port: Option<u16>,

#[arg(default_value = "6000")]
shard_port: Option<u16>,

/// Use the supplied prebound socket instead of binding a new socket for mpc
/// Use the supplied prebound socket instead of binding a new socket
///
/// This is only intended for avoiding port conflicts in tests.
#[arg(hide = true, long)]
server_socket_fd: Option<RawFd>,

/// Use the supplied prebound socket instead of binding a new socket for shard server
///
/// This is only intended for avoiding port conflicts in tests.
#[arg(hide = true, long)]
shard_server_socket_fd: Option<RawFd>,

/// Use insecure HTTP
#[arg(short = 'k', long)]
disable_https: bool,
Expand All @@ -94,7 +73,7 @@ struct ServerArgs {
#[arg(long, required = true)]
network: Option<PathBuf>,

/// TLS certificate for helper-to-helper and shard-to-shard communication
/// TLS certificate for helper-to-helper communication
#[arg(
long,
visible_alias("cert"),
Expand All @@ -103,7 +82,7 @@ struct ServerArgs {
)]
tls_cert: Option<PathBuf>,

/// TLS key for helper-to-helper and shard-to-shard communication
/// TLS key for helper-to-helper communication
#[arg(long, visible_alias("key"), requires = "tls_cert")]
tls_key: Option<PathBuf>,

Expand Down Expand Up @@ -135,58 +114,24 @@ fn read_file(path: &Path) -> Result<BufReader<fs::File>, BoxError> {
.map_err(|e| format!("failed to open file {}: {e:?}", path.display()))?)
}

/// Helper function that creates the client identity; either with certificates if they are provided
/// or just with headers otherwise. This works both for sharded and helper configs.
fn create_client_identity<F: ConnectionFlavor>(
id: F::Identity,
tls_cert: Option<PathBuf>,
tls_key: Option<PathBuf>,
) -> Result<(ClientIdentity<F>, Option<TlsConfig>), BoxError> {
match (tls_cert, tls_key) {
async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> {
let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap();

Check warning on line 118 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L117-L118

Added lines #L117 - L118 were not covered by tests

let (identity, server_tls) = match (args.tls_cert, args.tls_key) {

Check warning on line 120 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L120

Added line #L120 was not covered by tests
(Some(cert_file), Some(key_file)) => {
let mut key = read_file(&key_file)?;
let mut certs = read_file(&cert_file)?;
Ok((
ClientIdentity::<F>::from_pkcs8(&mut certs, &mut key)?,
(
ClientIdentity::from_pkcs8(&mut certs, &mut key)?,

Check warning on line 125 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L125

Added line #L125 was not covered by tests
Some(TlsConfig::File {
certificate_file: cert_file,
private_key_file: key_file,
}),
))
)
}
(None, None) => Ok((ClientIdentity::Header(id), None)),
_ => Err("should have been rejected by clap".into()),
}
}

// SAFETY:
// 1. The `--server-socket-fd` option is only intended for use in tests, not in production.
// 2. This must be the only call to from_raw_fd for this file descriptor, to ensure it has
// only one owner.
fn create_listener(server_socket_fd: Option<RawFd>) -> Result<Option<TcpListener>, BoxError> {
server_socket_fd
.map(|fd| {
let listener = unsafe { TcpListener::from_raw_fd(fd) };
if listener.local_addr().is_ok() {
info!("adopting fd {fd} as listening socket");
Ok(listener)
} else {
Err(BoxError::from(format!("the server was asked to listen on fd {fd}, but it does not appear to be a valid socket")))
}
})
.transpose()
}

async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> {
let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap();
let shard_index = ShardIndex::from(args.shard_index.expect("enforced by clap"));
let shard_count = ShardIndex::from(args.shard_count.expect("enforced by clap"));
assert!(shard_index < shard_count);

let (identity, server_tls) =
create_client_identity(my_identity, args.tls_cert.clone(), args.tls_key.clone())?;
let (shard_identity, shard_server_tls) =
create_client_identity(shard_index, args.tls_cert, args.tls_key)?;
(None, None) => (ClientIdentity::Header(my_identity), None),
_ => panic!("should have been rejected by clap"),

Check warning on line 133 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L132-L133

Added lines #L132 - L133 were not covered by tests
};

let mk_encryption = args.mk_private_key.map(|sk_path| HpkeServerConfig::File {
private_key_file: sk_path,
Expand All @@ -204,13 +149,6 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
port: args.port,
disable_https: args.disable_https,
tls: server_tls,
hpke_config: mk_encryption.clone(),
};

let shard_server_config = ServerConfig {
port: args.shard_port,
disable_https: args.disable_https,
tls: shard_server_tls,
hpke_config: mk_encryption,
};

Expand All @@ -219,48 +157,60 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
} else {
Scheme::HTTPS
};

let network_config_path = args.network.as_deref().unwrap();
let network_config_string = &fs::read_to_string(network_config_path)?;
let (mut mpc_network, mut shard_network) =
sharded_server_from_toml_str(network_config_string, my_identity, shard_index, shard_count)?;
mpc_network = mpc_network.override_scheme(&scheme);
shard_network = shard_network.override_scheme(&scheme);
let network_config = NetworkConfig::from_toml_str(&fs::read_to_string(network_config_path)?)?
.override_scheme(&scheme);

// TODO: Following is just temporary until Shard Transport is actually used.
let shard_clients_config = network_config.client.clone();
let shard_server_config = server_config.clone();
// ---

Check warning on line 167 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L161-L167

Added lines #L161 - L167 were not covered by tests

let http_runtime = new_http_runtime(&logging_handle);
let clients = IpaHttpClient::from_conf(
&IpaRuntime::from_tokio_runtime(&http_runtime),
&mpc_network,
&network_config,

Check warning on line 172 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L172

Added line #L172 was not covered by tests
&identity,
);
let (transport, server) = MpcHttpTransport::new(
IpaRuntime::from_tokio_runtime(&http_runtime),
my_identity,
server_config,
mpc_network,
network_config,

Check warning on line 179 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L179

Added line #L179 was not covered by tests
&clients,
Some(handler),
);

let shard_clients = IpaHttpClient::<Shard>::shards_from_conf(
&IpaRuntime::from_tokio_runtime(&http_runtime),
&shard_network,
&shard_identity,
);
let (shard_transport, shard_server) = ShardHttpTransport::new(
// TODO: Following is just temporary until Shard Transport is actually used.
let shard_network_config = NetworkConfig::new_shards(vec![], shard_clients_config);
let (shard_transport, _shard_server) = ShardHttpTransport::new(

Check warning on line 186 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L184-L186

Added lines #L184 - L186 were not covered by tests
IpaRuntime::from_tokio_runtime(&http_runtime),
shard_index,
shard_count,
ShardIndex::FIRST,
ShardIndex::from(1),

Check warning on line 189 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L188-L189

Added lines #L188 - L189 were not covered by tests
shard_server_config,
shard_network,
shard_clients,
shard_network_config,
vec![],

Check warning on line 192 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L191-L192

Added lines #L191 - L192 were not covered by tests
Some(shard_handler),
);
// ---

Check warning on line 195 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L195

Added line #L195 was not covered by tests

let _app = setup.connect(transport.clone(), shard_transport.clone());

let listener = create_listener(args.server_socket_fd)?;
let shard_listener = create_listener(args.shard_server_socket_fd)?;
let listener = args.server_socket_fd
.map(|fd| {
// SAFETY:
// 1. The `--server-socket-fd` option is only intended for use in tests, not in production.
// 2. This must be the only call to from_raw_fd for this file descriptor, to ensure it has
// only one owner.
let listener = unsafe { TcpListener::from_raw_fd(fd) };
if listener.local_addr().is_ok() {
info!("adopting fd {fd} as listening socket");
Ok(listener)

Check warning on line 208 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L199-L208

Added lines #L199 - L208 were not covered by tests
} else {
Err(BoxError::from(format!("the server was asked to listen on fd {fd}, but it does not appear to be a valid socket")))

Check warning on line 210 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L210

Added line #L210 was not covered by tests
}
})
.transpose()?;

Check warning on line 213 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L212-L213

Added lines #L212 - L213 were not covered by tests

let (_addr, server_handle) = server
.start_on(
Expand All @@ -270,17 +220,8 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
None as Option<()>,
)
.await;
let (_saddr, shard_server_handle) = shard_server
.start_on(
&IpaRuntime::from_tokio_runtime(&http_runtime),
shard_listener,
// TODO, trace based on the content of the query.
None as Option<()>,
)
.await;

join(server_handle, shard_server_handle).await;

server_handle.await;

Check warning on line 224 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L224

Added line #L224 was not covered by tests
[query_runtime, http_runtime].map(Runtime::shutdown_background);

Ok(())
Expand Down
88 changes: 2 additions & 86 deletions ipa-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::fs;

use crate::{
error::BoxError,
helpers::{HelperIdentity, TransportIdentity},
helpers::HelperIdentity,
hpke::{
Deserializable as _, IpaPrivateKey, IpaPublicKey, KeyRegistry, PrivateKeyOnly,
PublicKeyOnly, Serializable as _,
Expand All @@ -32,10 +32,8 @@ pub type OwnedPrivateKey = PrivateKeyDer<'static>;
pub enum Error {
#[error(transparent)]
ParseError(#[from] config::ConfigError),
#[error("Invalid uri: {0}")]
#[error("invalid uri: {0}")]
InvalidUri(#[from] hyper::http::uri::InvalidUri),
#[error("Invalid network size {0}")]
InvalidNetworkSize(usize),
#[error(transparent)]
IOError(#[from] std::io::Error),
}
Expand Down Expand Up @@ -119,88 +117,6 @@ impl<F: ConnectionFlavor> NetworkConfig<F> {
}
}

/// Reads a the config for a specific, single, sharded server from string. Expects config to be
/// toml format. The server in the network is specified via `id`, `shard_index` and
/// `shard_count`.
///
/// First we read the configuration without assigning any identities. The number of peers in the
/// configuration must be a multiple of 6, or 3 as a special case to support older, non-sharded
/// configurations.
///
/// If there are 3 entries, we assign helper identities for them. We create a dummy sharded
/// configuration.
///
/// If there are any multiple of 6 peers, then peer assignment is as follows:
/// By rings (to be reminiscent of the previous config). The first 6 entries corresponds to the
/// leaders Ring. H1 shard 0, H2, shard 0, and H3 shard 0. The next 6 correspond increases the
/// shard index by one.
///
/// Other methods to read the network.toml exist depending on the use, for example
/// [`NetworkConfig::from_toml_str`] reads a non-sharded config.
/// TODO: There will be one to read the information relevant for the RC (doesn't need shard
/// info)
///
/// # Errors
/// if `input` is in an invalid format
pub fn sharded_server_from_toml_str(
input: &str,
id: HelperIdentity,
shard_index: ShardIndex,
shard_count: ShardIndex,
) -> Result<(NetworkConfig<Helper>, NetworkConfig<Shard>), Error> {
use config::{Config, File, FileFormat};

let all_network: NetworkConfig = Config::builder()
.add_source(File::from_str(input, FileFormat::Toml))
.build()?
.try_deserialize()?;

let ix: usize = shard_index.as_index();
let ix_count: usize = shard_count.as_index();
let mpc_id: usize = id.as_index();

let total_peers = all_network.peers.len();
if total_peers == 3 {
let mpc_network = NetworkConfig {
peers: all_network.peers.clone(),
client: all_network.client.clone(),
identities: HelperIdentity::make_three().to_vec(),
};
let shard_network = NetworkConfig {
peers: vec![all_network.peers[mpc_id].clone()],
client: all_network.client,
identities: vec![ShardIndex(0)],
};
Ok((mpc_network, shard_network))
} else if total_peers > 0 && total_peers % 6 == 0 {
let mpc_network = NetworkConfig {
peers: all_network
.peers
.clone()
.into_iter()
.skip(ix * 6)
.take(3)
.collect(),
client: all_network.client.clone(),
identities: HelperIdentity::make_three().to_vec(),
};
let shard_network = NetworkConfig {
peers: all_network
.peers
.into_iter()
.skip(3 + mpc_id)
.step_by(6)
.take(ix_count)
.collect(),
client: all_network.client,
identities: shard_count.iter().collect(),
};
Ok((mpc_network, shard_network))
} else {
Err(Error::InvalidNetworkSize(total_peers))
}
}

impl NetworkConfig<Shard> {
/// # Panics
/// In the unexpected case there are more than max usize shards.
Expand Down
Loading

0 comments on commit 5904d98

Please sign in to comment.