Skip to content

Commit

Permalink
Hybrid integration (complete)
Browse files Browse the repository at this point in the history
* tmp commit: failing

* fix panic from encrypt-hybrid command

* progress

* distinct filenames for in_the_clear and mpc output

* temporary printlns for debugging

* make sure to call .wait on command

* fix os error with config setup

* add query_type lookup

* add plumbing for starting a hybrid query

* make helpers not silent for debugging

* integrate with RoundRobinSubmission, in progress

* Per shard submission from report collector

* update HistogramValue type to be consistent across helper and report collector

* add hybrid_test assert against in the clear results

* update comment

* Make hybrid run with compact gate

* Add required features for Hybrid integration tests

* update pre-commit and github check.yaml to use correct flags for hybrid test

* no default features for hybrid

* check.yaml typo

* increase STEP_COUNT_LIMIT

* lower step count limit

* Update ipa-core/src/bin/report_collector.rs

Co-authored-by: Alex Koshelev <[email protected]>

---------

Co-authored-by: Alex Koshelev <[email protected]>
Co-authored-by: Alex Koshelev <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2024
1 parent cf4ef60 commit fcfdfa6
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ jobs:
run: cargo test --release --test "helper_networks" --no-default-features --features "cli web-app real-world-infra test-fixture compact-gate"

- name: Integration Tests - Hybrid
run: cargo test --release --test "hybrid" --features "cli test-fixture"
run: cargo test --release --test "hybrid" --no-default-features --features "cli compact-gate web-app real-world-infra test-fixture relaxed-dp"

- name: Integration Tests - IPA with Relaxed DP
run: cargo test --release --test "ipa_with_relaxed_dp" --no-default-features --features "cli web-app real-world-infra test-fixture compact-gate relaxed-dp"
Expand Down
6 changes: 5 additions & 1 deletion ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ required-features = [
[[test]]
name = "hybrid"
required-features = [
"test-fixture",
"cli",
"compact-gate",
"web-app",
"real-world-infra",
"test-fixture",
"relaxed-dp",
]
69 changes: 54 additions & 15 deletions ipa-core/src/bin/report_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
fmt::Debug,
fs::{File, OpenOptions},
io,
io::{stdout, Write},
io::{stdout, BufReader, Write},
ops::Deref,
path::{Path, PathBuf},
};
Expand All @@ -16,13 +16,20 @@ use ipa_core::{
playbook::{
make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
RoundRobinSubmission, StreamingSubmission,
},
CsvSerializer, IpaQueryResult, Verbosity,
},
config::{KeyRegistries, NetworkConfig},
ff::{boolean_array::BA32, FieldType},
helpers::query::{
DpMechanism, HybridQueryParams, IpaQueryConfig, QueryConfig, QuerySize, QueryType,
ff::{
boolean_array::{BA16, BA32},
FieldType,
},
helpers::{
query::{
DpMechanism, HybridQueryParams, IpaQueryConfig, QueryConfig, QuerySize, QueryType,
},
BodyStream,
},
net::{Helper, IpaHttpClient},
report::{EncryptedOprfReportStreams, DEFAULT_KEY_ID},
Expand Down Expand Up @@ -143,6 +150,10 @@ enum ReportCollectorCommand {

#[clap(flatten)]
hybrid_query_config: HybridQueryParams,

/// Number of records to aggregate
#[clap(long, short = 'n')]
count: u32,
},
}

Expand Down Expand Up @@ -255,7 +266,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
ReportCollectorCommand::MaliciousHybrid {
ref encrypted_inputs,
hybrid_query_config,
} => hybrid(&args, hybrid_query_config, clients, encrypted_inputs).await?,
count,
} => {
hybrid(
&args,
hybrid_query_config,
clients,
encrypted_inputs,
count.try_into().expect("u32 should fit into usize"),
)
.await?
}
};

Ok(())
Expand Down Expand Up @@ -402,20 +423,37 @@ async fn hybrid(
hybrid_query_config: HybridQueryParams,
helper_clients: Vec<[IpaHttpClient<Helper>; 3]>,
encrypted_inputs: &EncryptedInputs,
count: usize,
) -> Result<(), Box<dyn Error>> {
let query_type = QueryType::MaliciousHybrid(hybrid_query_config);

let files = [
let [h1_streams, h2_streams, h3_streams] = [
&encrypted_inputs.enc_input_file1,
&encrypted_inputs.enc_input_file2,
&encrypted_inputs.enc_input_file3,
];

// despite the name, this is generic enough to work with hybrid
let encrypted_report_streams = EncryptedOprfReportStreams::from(files);
]
.map(|path| {
let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
})
.map(|s| s.into_byte_streams(args.shard_count));

// create byte streams for each shard
let submissions = h1_streams
.into_iter()
.zip(h2_streams.into_iter())
.zip(h3_streams.into_iter())
.map(|((s1, s2), s3)| {
[
BodyStream::from_bytes_stream(s1),
BodyStream::from_bytes_stream(s2),
BodyStream::from_bytes_stream(s3),
]
})
.collect::<Vec<_>>();

let query_config = QueryConfig {
size: QuerySize::try_from(encrypted_report_streams.query_size).unwrap(),
size: QuerySize::try_from(count).unwrap(),
field_type: FieldType::Fp32BitPrime,
query_type,
};
Expand All @@ -426,12 +464,13 @@ async fn hybrid(
.expect("Unable to create query!");

tracing::info!("Starting query for OPRF");
// the value for histogram values (BA32) must be kept in sync with the server-side

// the value for histogram values (BA16) must be kept in sync with the server-side
// implementation, otherwise a runtime reconstruct error will be generated.
// see ipa-core/src/query/executor.rs
let actual = run_hybrid_query_and_validate::<BA32>(
encrypted_report_streams.streams,
encrypted_report_streams.query_size,
let actual = run_hybrid_query_and_validate::<BA16>(
submissions,
count,
helper_clients,
query_id,
hybrid_query_config,
Expand Down
8 changes: 4 additions & 4 deletions ipa-core/src/cli/crypto/hybrid_encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ impl HybridEncryptArgs {
let mut key_registries = KeyRegistries::default();

let network =
NetworkConfig::from_toml_str(&read_to_string(&self.network).unwrap_or_else(|e| {
panic!("Failed to open network file: {:?}. {}", &self.network, e)
}))
NetworkConfig::from_toml_str_sharded(&read_to_string(&self.network).unwrap_or_else(
|e| panic!("Failed to open network file: {:?}. {}", &self.network, e),
))
.unwrap_or_else(|e| {
panic!(
"Failed to parse network file into toml: {:?}. {}",
&self.network, e
)
});
let Some(key_registries) = key_registries.init_from(&network) else {
let Some(key_registries) = key_registries.init_from(&network[0]) else {
panic!("could not load network file")
};

Expand Down
35 changes: 19 additions & 16 deletions ipa-core/src/cli/playbook/hybrid.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg(all(feature = "web-app", feature = "cli"))]
use std::{
cmp::min,
iter::zip,
time::{Duration, Instant},
};

Expand All @@ -25,7 +26,7 @@ use crate::{
/// if results are invalid
#[allow(clippy::disallowed_methods)] // allow try_join_all
pub async fn run_hybrid_query_and_validate<HV>(
inputs: [BodyStream; 3],
inputs: Vec<[BodyStream; 3]>,
query_size: usize,
clients: Vec<[IpaHttpClient<Helper>; 3]>,
query_id: QueryId,
Expand All @@ -36,28 +37,30 @@ where
AdditiveShare<HV>: Serializable,
{
let mpc_time = Instant::now();

// for now, submit everything to the leader. TODO: round robin submission
let leader_clients = &clients[0];
try_join_all(
inputs
.into_iter()
.zip(leader_clients)
.map(|(input_stream, client)| {
client.query_input(QueryInput {
query_id,
input_stream,
})
}),
)
assert_eq!(clients.len(), inputs.len());
// submit inputs to each shard
let _ = try_join_all(zip(clients.iter(), inputs.into_iter()).map(
|(shard_clients, shard_inputs)| {
try_join_all(shard_clients.iter().zip(shard_inputs.into_iter()).map(
|(client, input)| {
client.query_input(QueryInput {
query_id,
input_stream: input,
})
},
))
},
))
.await
.unwrap();

let leader_clients = &clients[0];

let mut delay = Duration::from_millis(125);
loop {
if try_join_all(
leader_clients
.iter()
.each_ref()
.map(|client| client.query_status(query_id)),
)
.await
Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/cli/playbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio::time::sleep;
pub use self::{
hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
ipa::{playbook_oprf_ipa, run_query_and_validate},
streaming::{RoundRobinSubmission, StreamingSubmission},
};
use crate::{
cli::config_parse::HelperNetworkConfigParseExt,
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/cli/playbook/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{

/// Trait for submitting inputs as streams, rather than reading everything
/// in memory. Should provide better performance for very large inputs.
trait StreamingSubmission {
pub trait StreamingSubmission {
/// Spits itself into `count` instances of [`BytesStream`].
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream>;
}
Expand All @@ -25,7 +25,7 @@ trait StreamingSubmission {
/// and delimited by newlines. The output streams will have
/// run-length encoding, meaning that each element will have
/// a 2 byte length prefix added to it.
struct RoundRobinSubmission<R>(R);
pub struct RoundRobinSubmission<R>(R);

impl<R: BufRead> RoundRobinSubmission<R> {
pub fn new(read_from: R) -> Self {
Expand Down
4 changes: 4 additions & 0 deletions ipa-core/src/net/http_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ pub mod query {
let Query(q) = req.extract().await?;
Ok(QueryType::MaliciousOprfIpa(q))
}
QueryType::MALICIOUS_HYBRID_STR => {
let Query(q) = req.extract().await?;
Ok(QueryType::MaliciousHybrid(q))
}
other => Err(Error::bad_query_value("query_type", other)),
}?;
Ok(QueryConfigQueryParams(QueryConfig {
Expand Down
6 changes: 2 additions & 4 deletions ipa-core/src/protocol/basics/shard_fin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
ff::{boolean::Boolean, boolean_array::BooleanArray, Serializable},
helpers::{Message, TotalRecords},
protocol::{
boolean::step::SixteenBitStep,
boolean::step::ThirtyTwoBitStep,
context::{
dzkp_validator::DZKPValidator, DZKPContext, DZKPUpgradedMaliciousContext,
DZKPUpgradedSemiHonestContext, MaliciousProtocolSteps, ShardedContext,
Expand Down Expand Up @@ -288,9 +288,7 @@ where
C: 'a,
{
async move {
// todo: SixteenBit only works for values up to BA16. EightBitStep will panic if we try
// to add larger values
self.values = integer_sat_add::<_, SixteenBitStep, B>(
self.values = integer_sat_add::<_, ThirtyTwoBitStep, B>(
ctx,
record_id,
&self.values,
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/protocol/hybrid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where
let aggregated_reports = aggregate_reports::<BK, V, C>(ctx.clone(), sharded_reports).await?;

let histogram = breakdown_reveal_aggregation::<C, BK, V, HV, B>(
ctx.clone(),
ctx.narrow(&Step::Aggregate),
aggregated_reports,
&dp_padding_params,
)
Expand Down
3 changes: 3 additions & 0 deletions ipa-core/src/protocol/hybrid/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(crate) enum HybridStep {
GroupBySum,
#[step(child = crate::protocol::context::step::DzkpValidationProtocolStep)]
GroupBySumValidate,
#[step(child = crate::protocol::ipa_prf::aggregation::step::AggregationStep)]
Aggregate,
#[step(child = FinalizeSteps)]
Finalize,
}
Expand All @@ -33,6 +35,7 @@ pub(crate) enum AggregateReportsStep {

#[derive(CompactStep)]
pub(crate) enum FinalizeSteps {
#[step(child = crate::protocol::ipa_prf::boolean_ops::step::SaturatedAdditionStep)]
Add,
#[step(child = crate::protocol::context::step::DzkpValidationProtocolStep)]
Validate,
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/protocol/ipa_prf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ mod compact_gate_tests {
fn step_count_limit() {
// This is an arbitrary limit intended to catch changes that unintentionally
// blow up the step count. It can be increased, within reason.
const STEP_COUNT_LIMIT: u32 = 24_000;
const STEP_COUNT_LIMIT: u32 = 32_500;
assert!(
ProtocolStep::STEP_COUNT < STEP_COUNT_LIMIT,
"Step count of {actual} exceeds limit of {STEP_COUNT_LIMIT}.",
Expand Down
19 changes: 17 additions & 2 deletions ipa-core/src/query/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
Gate,
},
query::{
runner::{OprfIpaQuery, QueryResult},
runner::{execute_hybrid_protocol, OprfIpaQuery, QueryResult},
state::RunningQuery,
},
sync::Arc,
Expand Down Expand Up @@ -165,7 +165,22 @@ pub fn execute<R: PrivateKeyRegistry>(
)
},
),
(QueryType::MaliciousHybrid(_), _) => todo!(),
(QueryType::MaliciousHybrid(ipa_config), _) => do_query(
runtime,
config,
gateway,
input,
move |prss, gateway, config, input| {
Box::pin(execute_hybrid_protocol(
prss,
gateway,
input,
ipa_config,
config,
key_registry,
))
},
),
}
}

Expand Down
Loading

0 comments on commit fcfdfa6

Please sign in to comment.