Skip to content

Commit

Permalink
Post aggregation thresholding (#494)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Oct 30, 2023
1 parent c062e77 commit ed588e1
Show file tree
Hide file tree
Showing 9 changed files with 479 additions and 31 deletions.
7 changes: 7 additions & 0 deletions analysis/utility_analysis_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ def _annotate(self, col, params, budget):
# Annotations are not needed because DP computations are not performed.
return col

def _drop_partitions_under_threshold(self, col):
    """Returns `col` unchanged; no partitions are dropped at this step."""
    # NOTE(review): presumably thresholding is skipped for the same reason
    # the neighbouring _annotate is a pass-through (DP computations are not
    # performed in this engine) — confirm against the base class.
    return col


def _check_utility_analysis_params(
options: analysis.UtilityAnalysisOptions,
Expand Down Expand Up @@ -216,3 +219,7 @@ def _check_utility_analysis_params(
raise NotImplementedError(
"utility analysis when contribution bounds are already enforced is "
"not supported")

if params.post_aggregation_thresholding:
raise NotImplementedError("Analysis with post_aggregation_thresholding "
"are not yet implemented")
32 changes: 25 additions & 7 deletions examples/restaurant_visits/run_without_frameworks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
""" Demo of running PipelineDP locally, without any external data processing framework
1. Install Python and run on the command line `pip install pipeline-dp absl-py`
2. Run python python run_without_frameworks.py --input_file=<path to data.txt from 3> --output_file=<...>
2. Run python run_without_frameworks.py --input_file=<path to data.txt from 3> --output_file=<...>
"""

from absl import app
Expand All @@ -26,6 +26,10 @@
# Command-line flags of the demo.
# --input_file / --output_file: where the visits are read from and where the
# result is written.
flags.DEFINE_string('input_file', 'restaurants_week_data.csv',
                    'The file with the restaurant visits data')
flags.DEFINE_string('output_file', None, 'Output file')
# Forwarded to AggregateParams.post_aggregation_thresholding in main().
flags.DEFINE_boolean('post_aggregation_thresholding', False,
                     'Whether post aggregation thresholding is used')
# When set, main() passes the partitions 1..7 as public partitions; otherwise
# no public partitions are passed.
flags.DEFINE_boolean('public_partitions', False,
                     'Whether public partitions are used')


def write_to_file(col, filename):
Expand Down Expand Up @@ -62,11 +66,15 @@ def main(unused_argv):

params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],
metrics=[
pipeline_dp.Metrics.PRIVACY_ID_COUNT, pipeline_dp.Metrics.COUNT,
pipeline_dp.Metrics.SUM
],
max_partitions_contributed=3,
max_contributions_per_partition=2,
min_value=0,
max_value=60)
max_value=60,
post_aggregation_thresholding=FLAGS.post_aggregation_thresholding)

# Specify how to extract privacy_id, partition_key and value from an
# element of restaraunt_visits_rows.
Expand All @@ -75,18 +83,28 @@ def main(unused_argv):
privacy_id_extractor=lambda row: row.user_id,
value_extractor=lambda row: row.spent_money)

# Create the Explain Computation report object for passing it into
# DPEngine.aggregate().
explain_computation_report = pipeline_dp.ExplainComputationReport()

public_partitions = list(range(1, 8)) if FLAGS.public_partitions else None

# Create a computational graph for the aggregation.
# All computations are lazy. dp_result is iterable, but iterating it would
# fail until budget is computed (below).
# It’s possible to call DPEngine.aggregate multiple times with different
# metrics to compute.
dp_result = dp_engine.aggregate(restaraunt_visits_rows,
params,
data_extractors,
public_partitions=list(range(1, 8)))
dp_result = dp_engine.aggregate(
restaraunt_visits_rows,
params,
data_extractors,
public_partitions=public_partitions,
out_explain_computation_report=explain_computation_report)

budget_accountant.compute_budgets()

print(explain_computation_report.text())

# Here's where the lazy iterator initiates computations and gets transformed
# into actual results
dp_result = list(dp_result)
Expand Down
37 changes: 31 additions & 6 deletions pipeline_dp/aggregate_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import numpy as np

import pipeline_dp
from pipeline_dp import input_validators


Expand Down Expand Up @@ -83,19 +84,48 @@ def convert_to_mechanism_type(self):
return MechanismType.GAUSSIAN


class PartitionSelectionStrategy(Enum):
    """Enumerates the supported partition selection strategies."""
    TRUNCATED_GEOMETRIC = 'Truncated Geometric'
    # The two thresholding strategies below have same-named counterparts in
    # MechanismType; MechanismType.to_partition_selection_strategy() converts
    # those members into these ones.
    LAPLACE_THRESHOLDING = 'Laplace Thresholding'
    GAUSSIAN_THRESHOLDING = 'Gaussian Thresholding'


class MechanismType(Enum):
    """Kinds of DP mechanisms, each identified by a human-readable name.

    The enum covers the plain noise mechanisms, their thresholding variants
    and a generic mechanism. The helper methods convert a member into the
    matching NoiseKind or PartitionSelectionStrategy.
    """
    LAPLACE = 'Laplace'
    GAUSSIAN = 'Gaussian'
    LAPLACE_THRESHOLDING = 'Laplace Thresholding'
    GAUSSIAN_THRESHOLDING = 'Gaussian Thresholding'
    GENERIC = 'Generic'

    def to_noise_kind(self):
        """Returns the NoiseKind this mechanism type is based on.

        Both the plain and the thresholding variant of a mechanism map to the
        same noise kind.

        Raises:
          ValueError: if the member has no noise kind (GENERIC).
        """
        laplace_based = (MechanismType.LAPLACE,
                         MechanismType.LAPLACE_THRESHOLDING)
        gaussian_based = (MechanismType.GAUSSIAN,
                          MechanismType.GAUSSIAN_THRESHOLDING)
        if self in laplace_based:
            return NoiseKind.LAPLACE
        if self in gaussian_based:
            return NoiseKind.GAUSSIAN
        raise ValueError(f"MechanismType {self.value} can not be converted to "
                         f"NoiseKind")

    def to_partition_selection_strategy(self) -> PartitionSelectionStrategy:
        """Returns the partition selection strategy for a thresholding member.

        Raises:
          ValueError: if the member is not one of the thresholding mechanisms.
        """
        thresholding_members = (MechanismType.LAPLACE_THRESHOLDING,
                                MechanismType.GAUSSIAN_THRESHOLDING)
        if self not in thresholding_members:
            raise ValueError(
                f"MechanismType {self.value} can not be converted to "
                f"PartitionSelectionStrategy")
        # The thresholding members carry identical names in both enums, so
        # the target member can be looked up by name.
        return PartitionSelectionStrategy[self.name]


def noise_to_thresholding(noise_kind: NoiseKind) -> MechanismType:
    """Returns the thresholding mechanism type that matches a noise kind.

    Args:
      noise_kind: the noise kind to convert.

    Returns:
      MechanismType.LAPLACE_THRESHOLDING for Laplace noise and
      MechanismType.GAUSSIAN_THRESHOLDING for Gaussian noise.

    Raises:
      ValueError: if noise_kind is neither Laplace nor Gaussian.
    """
    # NoiseKind is referenced by its module-level name, exactly as
    # MechanismType.to_noise_kind() and this function's own annotation do,
    # instead of going through the pipeline_dp package re-export. This keeps
    # the function independent of the package being fully imported.
    if noise_kind == NoiseKind.LAPLACE:
        return MechanismType.LAPLACE_THRESHOLDING
    if noise_kind == NoiseKind.GAUSSIAN:
        return MechanismType.GAUSSIAN_THRESHOLDING
    raise ValueError(f"NoiseKind {noise_kind} can not be converted to "
                     f"Thresholding mechanism")


class NormKind(Enum):
Linf = "linf"
Expand All @@ -104,12 +134,6 @@ class NormKind(Enum):
L2 = "l2"


class PartitionSelectionStrategy(Enum):
TRUNCATED_GEOMETRIC = 'Truncated Geometric'
LAPLACE_THRESHOLDING = 'Laplace Thresholding'
GAUSSIAN_THRESHOLDING = 'Gaussian Thresholding'


@dataclass
class CalculatePrivateContributionBoundsParams:
"""Specifies parameters for function DPEngine.calculate_private_contribution_bounds()
Expand Down Expand Up @@ -230,6 +254,7 @@ class AggregateParams:
public_partitions_already_filtered: bool = False
partition_selection_strategy: PartitionSelectionStrategy = PartitionSelectionStrategy.TRUNCATED_GEOMETRIC
pre_threshold: Optional[int] = None
post_aggregation_thresholding: bool = False

@property
def metrics_str(self) -> str:
Expand Down
82 changes: 73 additions & 9 deletions pipeline_dp/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Combiner(abc.ABC):

@abc.abstractmethod
def create_accumulator(self, values):
"""Creates accumulator from 'values'."""
"""Creates accumulator from 'values'. values are from one privacy ID"""

@abc.abstractmethod
def merge_accumulators(self, accumulator1, accumulator2):
Expand Down Expand Up @@ -196,8 +196,8 @@ class MechanismContainerMixin(abc.ABC):
@abc.abstractmethod
def create_mechanism(
self
) -> Union[dp_computations.AdditiveMechanism,
dp_computations.MeanMechanism]:
) -> Union[dp_computations.AdditiveMechanism, dp_computations.MeanMechanism,
dp_computations.ThresholdingMechanism]:
pass

def __getstate__(self):
Expand Down Expand Up @@ -282,8 +282,9 @@ def sensitivities(self) -> dp_computations.Sensitivities:

class PrivacyIdCountCombiner(Combiner, AdditiveMechanismMixin):
"""Combiner for computing DP privacy id count.
The type of the accumulator is int, which represents count of the elements
in the dataset for which this accumulator is computed.
The type of the accumulator is int, which represents the count of
privacy IDs.
"""
AccumulatorType = int

Expand Down Expand Up @@ -324,6 +325,63 @@ def expects_per_partition_sampling(self) -> bool:
return False


class PostAggregationThresholdingCombiner(Combiner, MechanismContainerMixin):
    """Combiner that counts privacy IDs and applies thresholding to the count.

    The accumulator is an int holding the count of privacy IDs. The reported
    'privacy_id_count' is whatever the thresholding mechanism returns from
    noised_value_if_should_keep() for that count.
    """
    AccumulatorType = int

    def __init__(self, budget_accountant: pipeline_dp.BudgetAccountant,
                 aggregate_params: pipeline_dp.AggregateParams):
        # The thresholding mechanism type is derived from the noise kind and
        # the budget is requested for it with the configured weight.
        thresholding_type = pipeline_dp.aggregate_params.noise_to_thresholding(
            aggregate_params.noise_kind)
        budget_weight = aggregate_params.budget_weight
        self._mechanism_spec = budget_accountant.request_budget(
            thresholding_type, weight=budget_weight)
        self._sensitivities = (
            dp_computations.compute_sensitivities_for_privacy_id_count(
                aggregate_params))
        self._pre_threshold = aggregate_params.pre_threshold

    def create_accumulator(self, values: Sized) -> AccumulatorType:
        """Returns 1 when `values` is non-empty, otherwise 0."""
        return int(bool(values))

    def merge_accumulators(self, accumulator1: AccumulatorType,
                           accumulator2: AccumulatorType):
        """Adds up two privacy ID counts."""
        merged_count = accumulator1 + accumulator2
        return merged_count

    def compute_metrics(self, count: AccumulatorType) -> dict:
        """Returns the 'privacy_id_count' produced by the mechanism."""
        mechanism = self.get_mechanism()
        thresholded_count = mechanism.noised_value_if_should_keep(count)
        return {"privacy_id_count": thresholded_count}

    def metrics_names(self) -> List[str]:
        return ['privacy_id_count']

    def explain_computation(self) -> ExplainComputationReport:
        """Returns a callable that produces the explanation text lazily."""
        # NOTE(review): the annotation names ExplainComputationReport, but a
        # zero-argument callable returning str is what is returned here.
        # TODO: left open in the original code without further detail.
        return lambda: (f"Computed DP privacy_id_count with\n"
                        f" {self.get_mechanism().describe()}")

    def mechanism_spec(self) -> budget_accounting.MechanismSpec:
        return self._mechanism_spec

    def sensitivities(self) -> dp_computations.Sensitivities:
        return self._sensitivities

    def expects_per_partition_sampling(self) -> bool:
        return False

    def create_mechanism(self) -> dp_computations.ThresholdingMechanism:
        """Builds the thresholding mechanism from spec and sensitivities."""
        spec = self.mechanism_spec()
        sensitivities = self.sensitivities()
        return dp_computations.create_thresholding_mechanism(
            spec, sensitivities, self._pre_threshold)


class SumCombiner(Combiner, AdditiveMechanismMixin):
"""Combiner for computing dp sum.
Expand Down Expand Up @@ -831,10 +889,16 @@ def create_compound_combiner(
mechanism_type, weight=aggregate_params.budget_weight)
combiners.append(SumCombiner(budget_sum, aggregate_params))
if pipeline_dp.Metrics.PRIVACY_ID_COUNT in aggregate_params.metrics:
budget_privacy_id_count = budget_accountant.request_budget(
mechanism_type, weight=aggregate_params.budget_weight)
combiners.append(
PrivacyIdCountCombiner(budget_privacy_id_count, aggregate_params))
if aggregate_params.post_aggregation_thresholding:
combiners.append(
PostAggregationThresholdingCombiner(budget_accountant,
aggregate_params))
else:
budget_privacy_id_count = budget_accountant.request_budget(
mechanism_type, weight=aggregate_params.budget_weight)
combiners.append(
PrivacyIdCountCombiner(budget_privacy_id_count,
aggregate_params))
if pipeline_dp.Metrics.VECTOR_SUM in aggregate_params.metrics:
budget_vector_sum = budget_accountant.request_budget(
mechanism_type, weight=aggregate_params.budget_weight)
Expand Down
75 changes: 69 additions & 6 deletions pipeline_dp/dp_computations.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
"""Differential privacy computing of count, sum, mean, variance."""

import abc
from dataclasses import dataclass
import math
import typing

import numpy as np
from typing import Any, Optional, Tuple, Union
from typing import Any, List, Optional, Tuple, Union

import pipeline_dp
from pipeline_dp import budget_accounting
from dataclasses import dataclass
from pipeline_dp import partition_selection
from pydp.algorithms import numerical_mechanisms as dp_mechanisms


Expand Down Expand Up @@ -696,7 +695,7 @@ def is_monotonic(self) -> bool:
def __init__(self, scoring_function: ScoringFunction) -> None:
self._scoring_function = scoring_function

def apply(self, eps: float, inputs_to_score_col: typing.List[Any]) -> Any:
def apply(self, eps: float, inputs_to_score_col: List[Any]) -> Any:
"""Applies exponential mechanism.
I.e. chooses a parameter from the list of possible parameters in a
Expand All @@ -706,7 +705,7 @@ def apply(self, eps: float, inputs_to_score_col: typing.List[Any]) -> Any:
return np.random.default_rng().choice(inputs_to_score_col, p=probs)

def _calculate_probabilities(self, eps: float,
inputs_to_score_col: typing.List[Any]):
inputs_to_score_col: List[Any]):
scores = np.array(
list(map(self._scoring_function.score, inputs_to_score_col)))
denominator = self._scoring_function.global_sensitivity
Expand Down Expand Up @@ -759,3 +758,67 @@ def compute_sensitivities_for_normalized_sum(
linf_sensitivity = max_abs_value * params.max_contributions_per_partition

return Sensitivities(l0=l0_sensitivity, linf=linf_sensitivity)


class ThresholdingMechanism:
    """Partition selection based on a thresholding mechanism.

    The (Laplace, Gaussian) thresholding algorithm has three steps:
      1. Contribution bounding: for each privacy unit, find all the
        partitions where it contributes. If there are more than
        max_partitions_contributed of them, randomly sample contributions to
        max_partitions_contributed partitions per privacy unit.
      2. Aggregation: for each partition, compute the count of contributing
        privacy units. Add noise with stddev derived from
        (epsilon, delta, l0_sensitivity=max_partitions_contributed).
      3. Partition selection: compute threshold T based on (epsilon, delta,
        l0_sensitivity=max_partitions_contributed, pre_threshold). Return
        each partition key and the corresponding noisy count of privacy
        units, where the noisy count is >= T.

    Details on computing the noise stddev and T are in
    https://github.com/google/differential-privacy/blob/main/common_docs/Delta_For_Thresholding.pdf

    This class covers steps 2 and 3: given the count of privacy units that
    contribute to a partition after contribution bounding, it adds noise and
    compares the noisy value to the threshold.
    """

    def __init__(self, epsilon: float, delta: float,
                 strategy: pipeline_dp.PartitionSelectionStrategy,
                 l0_sensitivity: int, pre_threshold: Optional[int]):
        self._strategy_type = strategy
        self._pre_threshold = pre_threshold
        # All the actual work is delegated to the partition selection
        # strategy object created here.
        create_strategy = partition_selection.create_partition_selection_strategy
        self._thresholding_strategy = create_strategy(strategy, epsilon, delta,
                                                      l0_sensitivity,
                                                      pre_threshold)

    def noised_value_if_should_keep(self,
                                    num_privacy_units: int) -> Optional[float]:
        """Forwards to the underlying strategy and returns its result."""
        strategy = self._thresholding_strategy
        return strategy.noised_value_if_should_keep(num_privacy_units)

    def describe(self) -> str:
        """Returns a one-line text description of the mechanism parameters."""
        strategy = self._thresholding_strategy
        eps = strategy.epsilon
        delta = strategy.delta
        threshold = strategy.threshold
        pieces = [
            f"{self._strategy_type.value} with threshold={threshold:.1f} eps={eps} delta={delta}"
        ]
        if self._pre_threshold is not None:
            pieces.append(f" and pre_threshold={self._pre_threshold}")
        # TODO: add noise scale to text, when it's exposed from C++.
        return "".join(pieces)


def create_thresholding_mechanism(
        mechanism_spec: budget_accounting.MechanismSpec,
        sensitivities: Sensitivities,
        pre_threshold: Optional[int]) -> ThresholdingMechanism:
    """Builds a ThresholdingMechanism from a mechanism spec and sensitivities.

    Args:
      mechanism_spec: provides eps, delta and the mechanism type, which is
        converted to a partition selection strategy.
      sensitivities: its l0 value is used as the l0 sensitivity.
      pre_threshold: passed through to the mechanism unchanged.

    Returns:
      The constructed ThresholdingMechanism.
    """
    selection_strategy = (
        mechanism_spec.mechanism_type.to_partition_selection_strategy())
    mechanism_kwargs = {
        "epsilon": mechanism_spec.eps,
        "delta": mechanism_spec.delta,
        "strategy": selection_strategy,
        "l0_sensitivity": sensitivities.l0,
        "pre_threshold": pre_threshold,
    }
    return ThresholdingMechanism(**mechanism_kwargs)
Loading

0 comments on commit ed588e1

Please sign in to comment.