diff --git a/analysis/utility_analysis_engine.py b/analysis/utility_analysis_engine.py index 244d5418..599cc62f 100644 --- a/analysis/utility_analysis_engine.py +++ b/analysis/utility_analysis_engine.py @@ -181,6 +181,9 @@ def _annotate(self, col, params, budget): # Annotations are not needed because DP computations are not performed. return col + def _drop_partitions_under_threshold(self, col): + return col + def _check_utility_analysis_params( options: analysis.UtilityAnalysisOptions, @@ -216,3 +219,7 @@ def _check_utility_analysis_params( raise NotImplementedError( "utility analysis when contribution bounds are already enforced is " "not supported") + + if params.post_aggregation_thresholding: + raise NotImplementedError("Analysis with post_aggregation_thresholding " + "are not yet implemented") diff --git a/examples/restaurant_visits/run_without_frameworks.py b/examples/restaurant_visits/run_without_frameworks.py index f2114af2..fa2479ec 100644 --- a/examples/restaurant_visits/run_without_frameworks.py +++ b/examples/restaurant_visits/run_without_frameworks.py @@ -14,7 +14,7 @@ """ Demo of running PipelineDP locally, without any external data processing framework 1. Install Python and run on the command line `pip install pipeline-dp absl-py` -2. Run python python run_without_frameworks.py --input_file= --output_file=<...> +2. 
Run python run_without_frameworks.py --input_file= --output_file=<...> """ from absl import app @@ -26,6 +26,10 @@ flags.DEFINE_string('input_file', 'restaurants_week_data.csv', 'The file with the restaraunt visits data') flags.DEFINE_string('output_file', None, 'Output file') +flags.DEFINE_boolean('post_aggregation_thresholding', False, + 'Whether post aggregation thresholding is used') +flags.DEFINE_boolean('public_partitions', False, + 'Whether public partitions are used') def write_to_file(col, filename): @@ -62,11 +66,15 @@ def main(unused_argv): params = pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.LAPLACE, - metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM], + metrics=[ + pipeline_dp.Metrics.PRIVACY_ID_COUNT, pipeline_dp.Metrics.COUNT, + pipeline_dp.Metrics.SUM + ], max_partitions_contributed=3, max_contributions_per_partition=2, min_value=0, - max_value=60) + max_value=60, + post_aggregation_thresholding=FLAGS.post_aggregation_thresholding) # Specify how to extract privacy_id, partition_key and value from an # element of restaraunt_visits_rows. @@ -75,18 +83,28 @@ def main(unused_argv): privacy_id_extractor=lambda row: row.user_id, value_extractor=lambda row: row.spent_money) + # Create the Explain Computation report object for passing it into + # DPEngine.aggregate(). + explain_computation_report = pipeline_dp.ExplainComputationReport() + + public_partitions = list(range(1, 8)) if FLAGS.public_partitions else None + # Create a computational graph for the aggregation. # All computations are lazy. dp_result is iterable, but iterating it would # fail until budget is computed (below). # It’s possible to call DPEngine.aggregate multiple times with different # metrics to compute. 
- dp_result = dp_engine.aggregate(restaraunt_visits_rows, - params, - data_extractors, - public_partitions=list(range(1, 8))) + dp_result = dp_engine.aggregate( + restaraunt_visits_rows, + params, + data_extractors, + public_partitions=public_partitions, + out_explain_computation_report=explain_computation_report) budget_accountant.compute_budgets() + print(explain_computation_report.text()) + # Here's where the lazy iterator initiates computations and gets transformed # into actual results dp_result = list(dp_result) diff --git a/pipeline_dp/aggregate_params.py b/pipeline_dp/aggregate_params.py index 545513f5..737187be 100644 --- a/pipeline_dp/aggregate_params.py +++ b/pipeline_dp/aggregate_params.py @@ -22,6 +22,7 @@ import numpy as np +import pipeline_dp from pipeline_dp import input_validators @@ -83,9 +84,17 @@ def convert_to_mechanism_type(self): return MechanismType.GAUSSIAN +class PartitionSelectionStrategy(Enum): + TRUNCATED_GEOMETRIC = 'Truncated Geometric' + LAPLACE_THRESHOLDING = 'Laplace Thresholding' + GAUSSIAN_THRESHOLDING = 'Gaussian Thresholding' + + class MechanismType(Enum): LAPLACE = 'Laplace' GAUSSIAN = 'Gaussian' + LAPLACE_THRESHOLDING = 'Laplace Thresholding' + GAUSSIAN_THRESHOLDING = 'Gaussian Thresholding' GENERIC = 'Generic' def to_noise_kind(self): @@ -93,9 +102,30 @@ def to_noise_kind(self): return NoiseKind.LAPLACE if self.value == MechanismType.GAUSSIAN.value: return NoiseKind.GAUSSIAN + if self.value == MechanismType.LAPLACE_THRESHOLDING.value: + return NoiseKind.LAPLACE + if self.value == MechanismType.GAUSSIAN_THRESHOLDING.value: + return NoiseKind.GAUSSIAN raise ValueError(f"MechanismType {self.value} can not be converted to " f"NoiseKind") + def to_partition_selection_strategy(self) -> PartitionSelectionStrategy: + if self.value == MechanismType.LAPLACE_THRESHOLDING.value: + return PartitionSelectionStrategy.LAPLACE_THRESHOLDING + if self.value == MechanismType.GAUSSIAN_THRESHOLDING.value: + return 
PartitionSelectionStrategy.GAUSSIAN_THRESHOLDING + raise ValueError(f"MechanismType {self.value} can not be converted to " + f"PartitionSelectionStrategy") + + +def noise_to_thresholding(noise_kind: NoiseKind) -> MechanismType: + if noise_kind == pipeline_dp.NoiseKind.LAPLACE: + return MechanismType.LAPLACE_THRESHOLDING + if noise_kind == pipeline_dp.NoiseKind.GAUSSIAN: + return MechanismType.GAUSSIAN_THRESHOLDING + raise ValueError(f"NoiseKind {noise_kind} can not be converted to " + f"Thresholding mechanism") + class NormKind(Enum): Linf = "linf" @@ -104,12 +134,6 @@ class NormKind(Enum): L2 = "l2" -class PartitionSelectionStrategy(Enum): - TRUNCATED_GEOMETRIC = 'Truncated Geometric' - LAPLACE_THRESHOLDING = 'Laplace Thresholding' - GAUSSIAN_THRESHOLDING = 'Gaussian Thresholding' - - @dataclass class CalculatePrivateContributionBoundsParams: """Specifies parameters for function DPEngine.calculate_private_contribution_bounds() @@ -230,6 +254,7 @@ class AggregateParams: public_partitions_already_filtered: bool = False partition_selection_strategy: PartitionSelectionStrategy = PartitionSelectionStrategy.TRUNCATED_GEOMETRIC pre_threshold: Optional[int] = None + post_aggregation_thresholding: bool = False @property def metrics_str(self) -> str: diff --git a/pipeline_dp/combiners.py b/pipeline_dp/combiners.py index dcd8401c..cdf43901 100644 --- a/pipeline_dp/combiners.py +++ b/pipeline_dp/combiners.py @@ -55,7 +55,7 @@ class Combiner(abc.ABC): @abc.abstractmethod def create_accumulator(self, values): - """Creates accumulator from 'values'.""" + """Creates accumulator from 'values'. 
values are from one privacy ID""" @abc.abstractmethod def merge_accumulators(self, accumulator1, accumulator2): @@ -196,8 +196,8 @@ class MechanismContainerMixin(abc.ABC): @abc.abstractmethod def create_mechanism( self - ) -> Union[dp_computations.AdditiveMechanism, - dp_computations.MeanMechanism]: + ) -> Union[dp_computations.AdditiveMechanism, dp_computations.MeanMechanism, + dp_computations.ThresholdingMechanism]: pass def __getstate__(self): @@ -282,8 +282,9 @@ def sensitivities(self) -> dp_computations.Sensitivities: class PrivacyIdCountCombiner(Combiner, AdditiveMechanismMixin): """Combiner for computing DP privacy id count. - The type of the accumulator is int, which represents count of the elements - in the dataset for which this accumulator is computed. + + The type of the accumulator is int, which represents the count of + privacy IDs. """ AccumulatorType = int @@ -324,6 +325,63 @@ def expects_per_partition_sampling(self) -> bool: return False +class PostAggregationThresholdingCombiner(Combiner, MechanismContainerMixin): + """Combiner for computing DP privacy id count and applying thresholding. + + The type of the accumulator is int, which represents the count of + privacy IDs. 
+ """ + AccumulatorType = int + + def __init__(self, budget_accountant: pipeline_dp.BudgetAccountant, + aggregate_params: pipeline_dp.AggregateParams): + mechanism_type = pipeline_dp.aggregate_params.noise_to_thresholding( + aggregate_params.noise_kind) + self._mechanism_spec = budget_accountant.request_budget( + mechanism_type, weight=aggregate_params.budget_weight) + self._sensitivities = dp_computations.compute_sensitivities_for_privacy_id_count( + aggregate_params) + self._pre_threshold = aggregate_params.pre_threshold + + def create_accumulator(self, values: Sized) -> AccumulatorType: + return 1 if values else 0 + + def merge_accumulators(self, accumulator1: AccumulatorType, + accumulator2: AccumulatorType): + return accumulator1 + accumulator2 + + def compute_metrics(self, count: AccumulatorType) -> dict: + return { + "privacy_id_count": + self.get_mechanism().noised_value_if_should_keep(count) + } + + def metrics_names(self) -> List[str]: + return ['privacy_id_count'] + + def explain_computation(self) -> ExplainComputationReport: + + def explain_computation_fn(): + # TODO + return (f"Computed DP privacy_id_count with\n" + f" {self.get_mechanism().describe()}") + + return explain_computation_fn + + def mechanism_spec(self) -> budget_accounting.MechanismSpec: + return self._mechanism_spec + + def sensitivities(self) -> dp_computations.Sensitivities: + return self._sensitivities + + def expects_per_partition_sampling(self) -> bool: + return False + + def create_mechanism(self) -> dp_computations.ThresholdingMechanism: + return dp_computations.create_thresholding_mechanism( + self.mechanism_spec(), self.sensitivities(), self._pre_threshold) + + class SumCombiner(Combiner, AdditiveMechanismMixin): """Combiner for computing dp sum. 
@@ -831,10 +889,16 @@ def create_compound_combiner( mechanism_type, weight=aggregate_params.budget_weight) combiners.append(SumCombiner(budget_sum, aggregate_params)) if pipeline_dp.Metrics.PRIVACY_ID_COUNT in aggregate_params.metrics: - budget_privacy_id_count = budget_accountant.request_budget( - mechanism_type, weight=aggregate_params.budget_weight) - combiners.append( - PrivacyIdCountCombiner(budget_privacy_id_count, aggregate_params)) + if aggregate_params.post_aggregation_thresholding: + combiners.append( + PostAggregationThresholdingCombiner(budget_accountant, + aggregate_params)) + else: + budget_privacy_id_count = budget_accountant.request_budget( + mechanism_type, weight=aggregate_params.budget_weight) + combiners.append( + PrivacyIdCountCombiner(budget_privacy_id_count, + aggregate_params)) if pipeline_dp.Metrics.VECTOR_SUM in aggregate_params.metrics: budget_vector_sum = budget_accountant.request_budget( mechanism_type, weight=aggregate_params.budget_weight) diff --git a/pipeline_dp/dp_computations.py b/pipeline_dp/dp_computations.py index 05d7211a..42ec44e3 100644 --- a/pipeline_dp/dp_computations.py +++ b/pipeline_dp/dp_computations.py @@ -14,15 +14,14 @@ """Differential privacy computing of count, sum, mean, variance.""" import abc +from dataclasses import dataclass import math -import typing - import numpy as np -from typing import Any, Optional, Tuple, Union +from typing import Any, List, Optional, Tuple, Union import pipeline_dp from pipeline_dp import budget_accounting -from dataclasses import dataclass +from pipeline_dp import partition_selection from pydp.algorithms import numerical_mechanisms as dp_mechanisms @@ -696,7 +695,7 @@ def is_monotonic(self) -> bool: def __init__(self, scoring_function: ScoringFunction) -> None: self._scoring_function = scoring_function - def apply(self, eps: float, inputs_to_score_col: typing.List[Any]) -> Any: + def apply(self, eps: float, inputs_to_score_col: List[Any]) -> Any: """Applies exponential mechanism. 
I.e. chooses a parameter from the list of possible parameters in a @@ -706,7 +705,7 @@ def apply(self, eps: float, inputs_to_score_col: typing.List[Any]) -> Any: return np.random.default_rng().choice(inputs_to_score_col, p=probs) def _calculate_probabilities(self, eps: float, - inputs_to_score_col: typing.List[Any]): + inputs_to_score_col: List[Any]): scores = np.array( list(map(self._scoring_function.score, inputs_to_score_col))) denominator = self._scoring_function.global_sensitivity @@ -759,3 +758,67 @@ def compute_sensitivities_for_normalized_sum( linf_sensitivity = max_abs_value * params.max_contributions_per_partition return Sensitivities(l0=l0_sensitivity, linf=linf_sensitivity) + + +class ThresholdingMechanism: + """Performs partition selection with thresholding mechanism. + + The (Laplace, Gaussian) thresholding algorithm is the following: + 1. Contribution bounding: for each privacy unit, find all the partitions + where it contributes. If there are more than max_partition_contributed, + randomly sample contributions to max_partition_contributed partitions per + privacy unit. + 2. Aggregation: for each partition, compute the count of contributing + privacy units. Add noise with stddev derived from + (epsilon, delta, l0_sensitivity=max_partition_contributed). + 3. Partition selection: compute threshold T based on (epsilon, delta, + l0_sensitivity=max_partition_contributed, pre_threshold). Return each + partition key and the corresponding noisy count of privacy units, + where the noisy count of contributing privacy units is >= T. + + The details on computing noise stddev and T can be found in + https://github.com/google/differential-privacy/blob/main/common_docs/Delta_For_Thresholding.pdf + + + This class performs steps [2] and [3]: it takes the count of privacy units + contributing to a partition after contribution bounding, adds noise to it + and compares the noisy value to the threshold. 
+ """ + + def __init__(self, epsilon: float, delta: float, + strategy: pipeline_dp.PartitionSelectionStrategy, + l0_sensitivity: int, pre_threshold: Optional[int]): + self._strategy_type = strategy + self._pre_threshold = pre_threshold + self._thresholding_strategy = partition_selection.create_partition_selection_strategy( + strategy, epsilon, delta, l0_sensitivity, pre_threshold) + + def noised_value_if_should_keep(self, + num_privacy_units: int) -> Optional[float]: + return self._thresholding_strategy.noised_value_if_should_keep( + num_privacy_units) + + def describe(self) -> str: + eps = self._thresholding_strategy.epsilon + delta = self._thresholding_strategy.delta + threshold = self._thresholding_strategy.threshold + text = ( + f"{self._strategy_type.value} with threshold={threshold:.1f} eps={eps} delta={delta}" + ) + if self._pre_threshold is not None: + text += f" and pre_threshold={self._pre_threshold}" + # TODO: add noise scale to text, when it's exposed from C++. + return text + + +def create_thresholding_mechanism( + mechanism_spec: budget_accounting.MechanismSpec, + sensitivities: Sensitivities, + pre_threshold: Optional[int]) -> ThresholdingMechanism: + """Creates ThresholdingMechanism from a mechanism spec and sensitivities.""" + strategy = mechanism_spec.mechanism_type.to_partition_selection_strategy() + return ThresholdingMechanism(epsilon=mechanism_spec.eps, + delta=mechanism_spec.delta, + strategy=strategy, + l0_sensitivity=sensitivities.l0, + pre_threshold=pre_threshold) diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py index edf48d3f..6fb2b613 100644 --- a/pipeline_dp/dp_engine.py +++ b/pipeline_dp/dp_engine.py @@ -151,7 +151,7 @@ def _aggregate(self, col, params: pipeline_dp.AggregateParams, col, combiner, "Reduce accumulators per partition key") # col : (partition_key, accumulator) - if public_partitions is None: + if public_partitions is None and not params.post_aggregation_thresholding: # Perform private partition selection. 
max_rows_per_privacy_id = 1 @@ -173,6 +173,9 @@ def _aggregate(self, col, params: pipeline_dp.AggregateParams, col = self._backend.map_values(col, combiner.compute_metrics, "Compute DP metrics") + if params.post_aggregation_thresholding: + col = self._drop_partitions_under_threshold(col) + return col def _check_select_private_partitions( @@ -424,6 +427,10 @@ def _check_aggregate_params(self, raise ValueError( "PRIVACY_ID_COUNT cannot be computed when " "contribution_bounds_already_enforced is True.") + if params.post_aggregation_thresholding: + if pipeline_dp.Metrics.PRIVACY_ID_COUNT not in params.metrics: + raise ValueError("When post_aggregation_thresholding = True, " + "PRIVACY_ID_COUNT must be in metrics") def calculate_private_contribution_bounds( self, @@ -519,6 +526,13 @@ def _check_budget_accountant_compatibility(self, is_public_partition: bool, raise ValueError(f"PLD budget accounting does not support custom " f"combiners") + def _drop_partitions_under_threshold(self, col): + self._add_report_stage("Drop partitions which have noised " + "privacy_id_count less than threshold.") + return self._backend.filter(col, + lambda row: row[1].privacy_id_count is not None, + "Drop partitions under threshold") + def _annotate(self, col, params: pipeline_dp.SelectPartitionsParams, budget: budget_accounting.Budget): return self._backend.annotate(col, diff --git a/tests/combiners_test.py b/tests/combiners_test.py index 775782e1..2823c81b 100644 --- a/tests/combiners_test.py +++ b/tests/combiners_test.py @@ -17,10 +17,11 @@ from absl.testing import absltest from absl.testing import parameterized import typing -from typing import List +from typing import List, Optional import pipeline_dp import pipeline_dp.combiners as dp_combiners import pipeline_dp.budget_accounting as ba +from pipeline_dp import aggregate_params import numpy as np @@ -157,7 +158,7 @@ def test_create_compound_combiner(self, metrics, expected_combiner_types):
budget_accountant.request_budget.assert_called_with( pipeline_dp.aggregate_params.MechanismType.GAUSSIAN, weight=aggregate_params.budget_weight) - # Check correctness of intenal combiners + # Check correctness of internal combiners combiners = compound_combiner._combiners self.assertLen(combiners, len(expected_combiner_types)) for combiner, expect_type, expected_budget in zip( @@ -192,6 +193,35 @@ def test_create_compound_combiner_with_custom_combiners(self): for combiner in custom_combiners: combiner.request_budget.assert_called_once() + def test_create_compound_combiner_with_post_aggregation(self): + # Arrange. + params = self._create_aggregate_params( + [pipeline_dp.Metrics.PRIVACY_ID_COUNT]) + params.post_aggregation_thresholding = True + params.budget_weight = 1 + + # Mock budget accountant. + budget_accountant = pipeline_dp.NaiveBudgetAccountant( + 1.5, 1e-10, num_aggregations=1) + + # Act. + compound_combiner = dp_combiners.create_compound_combiner( + params, budget_accountant) + budget_accountant._compute_budget_for_aggregation(params.budget_weight) + budget_accountant.compute_budgets() + + # Assert + # Check correctness of internal combiners + combiners = compound_combiner._combiners + self.assertLen(combiners, 1) + self.assertIsInstance(combiners[0], + dp_combiners.PostAggregationThresholdingCombiner) + mechanism_spec = combiners[0].mechanism_spec() + self.assertEqual(mechanism_spec.mechanism_type, + pipeline_dp.MechanismType.GAUSSIAN_THRESHOLDING) + self.assertEqual(mechanism_spec.eps, 1.5) + self.assertEqual(mechanism_spec.delta, 1e-10) + class CountCombinerTest(parameterized.TestCase): @@ -336,6 +366,120 @@ def test_explain_computation(self): self.assertRegex(combiner.explain_computation()(), expected) +class PostAggregationThresholdingCombinerTest(parameterized.TestCase): + + def _create_combiner( + self, + small_noise: bool = False, + noise_kind: pipeline_dp.NoiseKind = pipeline_dp.NoiseKind.GAUSSIAN, + pre_threshold: Optional[int] = None + ) -> 
dp_combiners.PostAggregationThresholdingCombiner: + eps, delta = (10**3, 0.1) if small_noise else (1, 1e-10) + budget_accountant = pipeline_dp.NaiveBudgetAccountant(eps, delta) + aggregate_params = _create_aggregate_params() + aggregate_params.noise_kind = noise_kind + aggregate_params.pre_threshold = pre_threshold + combiner = dp_combiners.PostAggregationThresholdingCombiner( + budget_accountant, aggregate_params) + budget_accountant.compute_budgets() + return combiner + + def _get_mechanism_type(self, noise_kind: pipeline_dp.NoiseKind): + if noise_kind == pipeline_dp.NoiseKind.GAUSSIAN: + return aggregate_params.MechanismType.GAUSSIAN_THRESHOLDING + if noise_kind == pipeline_dp.NoiseKind.LAPLACE: + return aggregate_params.MechanismType.LAPLACE_THRESHOLDING + + def _get_strategy(self, noise_kind: pipeline_dp.NoiseKind): + if noise_kind == pipeline_dp.NoiseKind.GAUSSIAN: + return pipeline_dp.PartitionSelectionStrategy.GAUSSIAN_THRESHOLDING + if noise_kind == pipeline_dp.NoiseKind.LAPLACE: + return pipeline_dp.PartitionSelectionStrategy.LAPLACE_THRESHOLDING + + @parameterized.named_parameters( + dict(testcase_name='gaussian', + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN, + expected_strategy_class="GaussianPartitionSelectionStrategy"), + dict(testcase_name='laplace', + noise_kind=pipeline_dp.NoiseKind.LAPLACE, + expected_strategy_class="LaplacePartitionSelectionStrategy"), + ) + def test_create_combiner(self, noise_kind: pipeline_dp.NoiseKind, + expected_strategy_class: str): + # Arrange/act + budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, + total_delta=1e-10) + aggregate_params = _create_aggregate_params() + aggregate_params.noise_kind = noise_kind + combiner = dp_combiners.PostAggregationThresholdingCombiner( + budget_accountant, aggregate_params) + budget_accountant.compute_budgets() + + # Assert + expected_mechanism_type = self._get_mechanism_type(noise_kind) + self.assertEqual( + combiner.mechanism_spec(), + 
ba.MechanismSpec(expected_mechanism_type, None, 1, 1e-10)) + self.assertEqual(combiner.sensitivities().l0, 2) + self.assertEqual(combiner.sensitivities().linf, 1) + thresholding_strategy = combiner.create_mechanism( + )._thresholding_strategy + self.assertEqual( + type(thresholding_strategy).__name__, expected_strategy_class) + + def test_create_accumulator(self): + combiner = self._create_combiner() + self.assertEqual(0, combiner.create_accumulator([])) + self.assertEqual(1, combiner.create_accumulator([1, 2])) + + def test_merge_accumulators(self): + combiner = self._create_combiner() + self.assertEqual(0, combiner.merge_accumulators(0, 0)) + self.assertEqual(5, combiner.merge_accumulators(1, 4)) + + def test_compute_metrics_no_noise(self): + combiner = self._create_combiner(small_noise=True) + self.assertAlmostEqual(3, + combiner.compute_metrics(3)['privacy_id_count'], + delta=1e-2) + + @patch( + 'pydp._pydp._partition_selection.GaussianPartitionSelectionStrategy.noised_value_if_should_keep' + ) + def test_noised_value_if_should_keep(self, mock_function): + combiner = self._create_combiner(False) + mock_function.return_value = "output" + self.assertEqual( + combiner.compute_metrics(100)['privacy_id_count'], "output") + mock_function.assert_called_once_with(100) + + @parameterized.named_parameters( + dict(testcase_name='gaussian', + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN, + pre_threshold=None), + dict(testcase_name='laplace', + noise_kind=pipeline_dp.NoiseKind.LAPLACE, + pre_threshold=20), + ) + @patch( + 'pipeline_dp.partition_selection.create_partition_selection_strategy') + def test_mechanism(self, mock_create_partition_selection_strategy, + noise_kind: pipeline_dp.NoiseKind, + pre_threshold: Optional[int]): + combiner = self._create_combiner(False, noise_kind, pre_threshold) + combiner.get_mechanism() + + expected_strategy = self._get_strategy(noise_kind) + mock_create_partition_selection_strategy.assert_called_once_with( + expected_strategy, 1.0, 1e-10,
2, pre_threshold) + + def test_explain_computation(self): + combiner = self._create_combiner() + expected = ('Computed DP privacy_id_count with\n Gaussian ' + 'Thresholding with threshold=56.5 eps=1.0 delta=1e-10') + self.assertRegex(combiner.explain_computation()(), expected) + + class SumCombinerTest(parameterized.TestCase): def _create_aggregate_params_per_partition_bound(self): diff --git a/tests/dp_computations_test.py b/tests/dp_computations_test.py index 8ea52c18..5d5cd01a 100644 --- a/tests/dp_computations_test.py +++ b/tests/dp_computations_test.py @@ -837,6 +837,73 @@ def test_compute_mean_negative_dp_count(self, mock_sum_add_noise, mock_sum_add_noise.assert_called_with(21) +class ThresholdingMechanismTests(parameterized.TestCase): + + def create_thresholding_mechanism( + self, + mechanism_type: aggregate_params.MechanismType = aggregate_params. + MechanismType.GAUSSIAN_THRESHOLDING, + pre_threshold: Optional[int] = None + ) -> dp_computations.ThresholdingMechanism: + mechanism_spec = budget_accounting.MechanismSpec(mechanism_type) + + mechanism_spec.set_eps_delta(eps=1.5, delta=1e-5) + sensitivities = dp_computations.Sensitivities(l0=4, linf=1) + + return dp_computations.create_thresholding_mechanism( + mechanism_spec, sensitivities, pre_threshold) + + def test_create_thresholding_mechanism(self): + # Arrange/Act + mechanism_spec = budget_accounting.MechanismSpec( + aggregate_params.MechanismType.GAUSSIAN_THRESHOLDING) + mechanism_spec.set_eps_delta(eps=1.5, delta=1e-5) + sensitivities = dp_computations.Sensitivities(l0=4, linf=1) + mechanism = dp_computations.create_thresholding_mechanism( + mechanism_spec, sensitivities, pre_threshold=None) + + # Assert + self.assertEqual(mechanism._thresholding_strategy.epsilon, 1.5) + self.assertEqual(mechanism._thresholding_strategy.delta, 1e-5) + self.assertEqual( + type(mechanism._thresholding_strategy).__name__, + "GaussianPartitionSelectionStrategy") + self.assertEqual( + 
mechanism._thresholding_strategy.max_partitions_contributed, 4) + self.assertAlmostEqual(mechanism._thresholding_strategy.threshold, + 26.26941, + delta=1e-5) + + def test_create_thresholding_mechanism_with_pre_threshold(self): + mechanism = self.create_thresholding_mechanism( + aggregate_params.MechanismType.LAPLACE_THRESHOLDING, + pre_threshold=20) + self.assertEqual( + type(mechanism._thresholding_strategy).__name__, + "PreThresholdingPartitionSelectionStrategy") + self.assertEqual( + mechanism._thresholding_strategy.max_partitions_contributed, 4) + self.assertEqual(mechanism._pre_threshold, 20) + + def test_describe(self): + mechanism = self.create_thresholding_mechanism( + aggregate_params.MechanismType.LAPLACE_THRESHOLDING) + self.assertEqual( + mechanism.describe(), + "Laplace Thresholding with threshold=33.5 eps=1.5 delta=1e-05") + + @patch( + 'pydp._pydp._partition_selection.LaplacePartitionSelectionStrategy.noised_value_if_should_keep' + ) + def test_noised_value_if_should_keep(self, mock_function): + mechanism = self.create_thresholding_mechanism( + aggregate_params.MechanismType.LAPLACE_THRESHOLDING) + mock_function.return_value = "output" + output = mechanism.noised_value_if_should_keep(10) + mock_function.assert_called_once_with(10) + self.assertEqual(output, "output") + + class ExponentialMechanismTests(unittest.TestCase): def test_one_parameter_that_has_much_greater_score_than_the_others_is_always_returned( diff --git a/tests/dp_engine_test.py b/tests/dp_engine_test.py index b7612886..7db7c8a9 100644 --- a/tests/dp_engine_test.py +++ b/tests/dp_engine_test.py @@ -424,6 +424,20 @@ def test_check_aggregate_params(self): engine.aggregate(test_case["col"], test_case["params"], test_case["data_extractor"]) + def test_check_post_aggregation_thresholding_and_privacy_id_count(self): + engine = self._create_dp_engine_default() + params = pipeline_dp.AggregateParams( + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN, + max_partitions_contributed=1, +
max_contributions_per_partition=1, + metrics=[pipeline_dp.Metrics.COUNT], + post_aggregation_thresholding=True) + with self.assertRaisesRegex( + ValueError, + "When post_aggregation_thresholding = True, PRIVACY_ID_COUNT must be in metrics" + ): + engine.aggregate([0], params, self._get_default_extractors()) + def _check_string_contains_strings(self, string: str, substrings: List[str]): print(string) @@ -1147,6 +1161,38 @@ def test_run_e2e_partition_selection_beam(self): beam_util.assert_that(output, beam_util.is_not_empty()) + def test_run_e2e_post_aggregation_thresholding(self): + # High budget for low noise. + engine, budget_accountant = self._create_dp_engine_default( + return_accountant=True, epsilon=10, delta=1e-10) + params = pipeline_dp.AggregateParams( + noise_kind=pipeline_dp.NoiseKind.LAPLACE, + metrics=[agg.Metrics.PRIVACY_ID_COUNT], + max_partitions_contributed=1, + max_contributions_per_partition=1, + post_aggregation_thresholding=True) + + # Generate input = [(partition_key, privacy_id)] + input = [] + # 1000 partitions with 3 contributions, threshold ~= 3.2, some of them + # should be kept. + for i in range(1000): + for k in range(3): + input.append((i, 100 * i + k)) + + data_extractors = pipeline_dp.DataExtractors( + privacy_id_extractor=lambda x: x[1], + partition_extractor=lambda x: x[0], + value_extractor=lambda x: 0) + + output = engine.aggregate(input, params, data_extractors) + budget_accountant.compute_budgets() + output = list(output) + self.assertTrue(len(output) > 10) + for partition, metrics in output: + # Check that privacy_id_count > threshold. + self.assertGreater(metrics.privacy_id_count, 3.2) + @patch( 'pipeline_dp.combiners.create_compound_combiner_with_custom_combiners') @patch('pipeline_dp.combiners.create_compound_combiner')