
Commit

Merge branch 'main' into anonymize_values
dvadym committed Nov 16, 2023
2 parents 60a344b + 2b48526 commit a075844
Showing 10 changed files with 164 additions and 51 deletions.
analysis/parameter_tuning.py (2 changes: 1 addition & 1 deletion)
@@ -140,7 +140,7 @@ def _find_candidate_parameters(
max_sum_per_partition_bounds = min_sum_per_partition_bounds = None

if calculate_sum_per_partition_param:
- if hist.linf_sum_contributions_histogram.bins[0].lower >= 0:
+ if hist.linf_sum_contributions_histogram.bins[0].lower < 0:
logging.warning(
"max_sum_per_partition should not contain negative sums because"
" min_sum_per_partition tuning is not supported yet and "
analysis/per_partition_combiners.py (22 changes: 19 additions & 3 deletions)
@@ -267,18 +267,26 @@ def create_accumulator(
def compute_metrics(self, acc: AccumulatorType) -> metrics.SumMetrics:
"""Computes metrics based on the accumulator properties."""
partition_sum, clipping_to_min_error, clipping_to_max_error, expected_l0_bounding_error, var_cross_partition_error = acc
- std_noise = dp_computations.compute_dp_count_noise_std(
- self._params.scalar_noise_params)
return metrics.SumMetrics(
aggregation=self._metric,
sum=partition_sum,
clipping_to_min_error=clipping_to_min_error,
clipping_to_max_error=clipping_to_max_error,
expected_l0_bounding_error=expected_l0_bounding_error,
std_l0_bounding_error=math.sqrt(var_cross_partition_error),
- std_noise=std_noise,
+ std_noise=self._get_std_noise(),
noise_kind=self._params.aggregate_params.noise_kind)

+ def get_sensitivities(self) -> dp_computations.Sensitivities:
+ return dp_computations.compute_sensitivities_for_sum(
+ self._params.aggregate_params)
+
+ def _get_std_noise(self) -> float:
+ sensitivities = self.get_sensitivities()
+ mechanism = dp_computations.create_additive_mechanism(
+ self._params.mechanism_spec, sensitivities)
+ return mechanism.std
+

class CountCombiner(SumCombiner):
"""A combiner for utility analysis counts."""
@@ -298,6 +306,10 @@ def create_accumulator(
self._params.aggregate_params.max_sum_per_partition = self._params.aggregate_params.max_contributions_per_partition
return super().create_accumulator(data)

+ def get_sensitivities(self) -> dp_computations.Sensitivities:
+ return dp_computations.compute_sensitivities_for_count(
+ self._params.aggregate_params)
+

class PrivacyIdCountCombiner(SumCombiner):
"""A combiner for utility analysis privacy ID counts."""
@@ -319,6 +331,10 @@ def create_accumulator(
self._params.aggregate_params.max_sum_per_partition = 1.0
return super().create_accumulator(data)

+ def get_sensitivities(self) -> dp_computations.Sensitivities:
+ return dp_computations.compute_sensitivities_for_privacy_id_count(
+ self._params.aggregate_params)
+

class RawStatisticsCombiner(UtilityAnalysisCombiner):
"""A combiner for computing per-partition raw statistics (count etc)."""
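For orientation, a minimal sketch of the noise-std flow introduced above, using only the dp_computations helpers named in this diff with the signatures shown here. It assumes dp_computations is importable as pipeline_dp.dp_computations; params stands in for a CombinerParams instance and is not constructed in this snippet.

from pipeline_dp import dp_computations

def std_noise_for_sum(params) -> float:
    # Sensitivities are now combiner-specific: SumCombiner derives them from
    # the sum bounds, while CountCombiner and PrivacyIdCountCombiner override
    # get_sensitivities() with their own helpers.
    sensitivities = dp_computations.compute_sensitivities_for_sum(
        params.aggregate_params)
    # The additive mechanism (Laplace or Gaussian, per the mechanism spec)
    # exposes the noise standard deviation directly.
    mechanism = dp_computations.create_additive_mechanism(
        params.mechanism_spec, sensitivities)
    return mechanism.std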
analysis/tests/per_partition_combiners_test.py (12 changes: 6 additions & 6 deletions)
@@ -279,15 +279,15 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
dict(testcase_name='empty',
num_partitions=[0],
contribution_values=[()],
- params=_create_combiner_params_for_sum(0, 0),
+ params=_create_combiner_params_for_sum(0, 1),
expected_metrics=metrics.SumMetrics(
aggregation=pipeline_dp.Metrics.SUM,
sum=0,
clipping_to_min_error=0,
clipping_to_max_error=0,
expected_l0_bounding_error=0,
std_l0_bounding_error=0,
- std_noise=7.46484375,
+ std_noise=3.732421875,
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
dict(testcase_name='one_privacy_id_zero_partition_error',
num_partitions=[1],
@@ -300,7 +300,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
clipping_to_max_error=0,
expected_l0_bounding_error=0,
std_l0_bounding_error=0,
- std_noise=7.46484375,
+ std_noise=12.690234375,
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
dict(testcase_name='1_privacy_id_4_contributions_clip_max_error_half',
num_partitions=[4],
@@ -313,7 +313,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
clipping_to_max_error=-5.5,
expected_l0_bounding_error=-4.125,
std_l0_bounding_error=2.381569860407206,
- std_noise=7.46484375,
+ std_noise=20.5283203125,
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
dict(testcase_name='1_privacy_id_4_partitions_4_contributions_clip_min',
num_partitions=[4],
@@ -326,7 +326,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
clipping_to_max_error=0,
expected_l0_bounding_error=-1.5,
std_l0_bounding_error=0.8660254037844386,
- std_noise=7.46484375,
+ std_noise=74.6484375,
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
dict(testcase_name='2_privacy_ids',
num_partitions=[2, 4],
@@ -339,7 +339,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
clipping_to_max_error=-1.0,
expected_l0_bounding_error=-0.625,
std_l0_bounding_error=0.33071891388307384,
- std_noise=7.46484375,
+ std_noise=1.8662109375,
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)))
def test_compute_metrics(self, num_partitions, contribution_values, params,
expected_metrics):
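A hedged note on why these expected std_noise values change: after the combiner refactor above, SumMetrics.std_noise is derived from each combiner's own sensitivities instead of the shared count-noise helper, so every case scales the old common value of 7.46484375 by a case-specific sensitivity ratio. That reading is an assumption based on the bounds passed to _create_combiner_params_for_sum, which this hunk does not fully show; two of the factors check out directly:

# Illustrative arithmetic only; the scaling factors are assumed to follow from
# each test case's sum bounds.
old_std = 7.46484375
print(old_std * 0.5)   # 3.732421875, the new 'empty' expectation
print(old_std * 10.0)  # 74.6484375, the new '..._4_contributions_clip_min' expectation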
pipeline_dp/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -38,4 +38,4 @@
from pipeline_dp.pipeline_backend import PipelineBackend
from pipeline_dp.pipeline_backend import SparkRDDBackend

- __version__ = '0.2.1'
+ __version__ = '0.2.2rc1'
pipeline_dp/combiners.py (12 changes: 6 additions & 6 deletions)
@@ -150,16 +150,16 @@ class CombinerParams:

def __init__(self, spec: budget_accounting.MechanismSpec,
aggregate_params: pipeline_dp.AggregateParams):
- self._mechanism_spec = spec
+ self.mechanism_spec = spec
self.aggregate_params = copy.copy(aggregate_params)

@property
def eps(self):
- return self._mechanism_spec.eps
+ return self.mechanism_spec.eps

@property
def delta(self):
- return self._mechanism_spec.delta
+ return self.mechanism_spec.delta

@property
def scalar_noise_params(self):
@@ -584,7 +584,7 @@ def explain_computation(self) -> ExplainComputationReport:
return lambda: f"Computed variance with (eps={self._params.eps} delta={self._params.delta})"

def mechanism_spec(self) -> budget_accounting.MechanismSpec:
- return self._params._mechanism_spec
+ return self._params.mechanism_spec


class QuantileCombiner(Combiner):
@@ -666,7 +666,7 @@ def _noise_type(self) -> str:
assert False, f"{noise_kind} is not support by PyDP quantile tree."

def mechanism_spec(self) -> budget_accounting.MechanismSpec:
- return self._params._mechanism_spec
+ return self._params.mechanism_spec


# Cache for namedtuple types. It should be used only in
@@ -843,7 +843,7 @@ def explain_computation(self) -> ExplainComputationReport:
lambda: f"Computed vector sum with (eps={self._params.eps} delta={self._params.delta})"

def mechanism_spec(self) -> budget_accounting.MechanismSpec:
- return self._params._mechanism_spec
+ return self._params.mechanism_spec


def create_compound_combiner(
pipeline_dp/dataframes.py (97 changes: 66 additions & 31 deletions)
@@ -15,7 +15,7 @@
import abc
from collections import namedtuple
from dataclasses import dataclass
- from typing import Any, Dict, Iterable, Optional
+ from typing import Any, Callable, Dict, Iterable, Optional, Sequence, Union

import pipeline_dp
import pyspark
@@ -36,7 +36,7 @@ def __post_init__(self):
@dataclass
class Columns:
privacy_key: str
- partition_key: str
+ partition_key: Union[str, Sequence[str]]
value: Optional[str]


@@ -68,17 +68,30 @@ def __init__(self, spark: pyspark.sql.SparkSession):

def dataframe_to_collection(self, df: SparkDataFrame,
columns: Columns) -> pyspark.RDD:
- columns_to_keep = [columns.privacy_key, columns.partition_key]
- if columns.value is not None:
+ columns_to_keep = [columns.privacy_key]
+ if isinstance(columns.partition_key, str):
+ num_partition_columns = 1
+ columns_to_keep.append(columns.partition_key)
+ else: # Sequence[str], multiple columns
+ num_partition_columns = len(columns.partition_key)
+ columns_to_keep.extend(columns.partition_key)
+ value_present = columns.value is not None
+ if value_present:
columns_to_keep.append(columns.value)

df = df[columns_to_keep] # leave only needed columns.
- if columns.value is None:
- return df.rdd.map(lambda row: (row[0], row[1], 0))
- return df.rdd.map(lambda row: (row[0], row[1], row[2]))
+
+ def extractor(row):
+ privacy_key = row[0]
+ partition_key = row[1] if num_partition_columns == 1 else row[
+ 1:1 + num_partition_columns]
+ value = row[1 + num_partition_columns] if value_present else 0
+ return (privacy_key, partition_key, value)
+
+ return df.rdd.map(extractor)

def collection_to_dataframe(self, col: pyspark.RDD) -> SparkDataFrame:
- df = self._spark.createDataFrame(col)
- return df
+ return self._spark.createDataFrame(col)


def _create_backend_for_dataframe(
@@ -179,13 +192,21 @@ def run_query(
metrics_names_to_output_columns[metric_name] = output_column

output_columns = list(metrics_names_to_output_columns.values())
- partition_key_column = self._columns.partition_key
- PartitionMetricsTuple = namedtuple("Result", [partition_key_column] +
- output_columns)
+ partition_key = self._columns.partition_key
+ partition_key_one_column = isinstance(partition_key, str)
+ if partition_key_one_column:
+ partition_key = [partition_key]
+ PartitionMetricsTuple = namedtuple("Result",
+ partition_key + output_columns)

def convert_to_partition_metrics_tuple(row):
partition, metrics = row
- result = {partition_key_column: partition}
+ if partition_key_one_column:
+ result = {partition_key[0]: partition}
+ else:
+ result = {}
+ for key, value in zip(partition_key, partition):
+ result[key] = value
for key, value in metrics._asdict().items():
# Map default metric names to metric names specified in
# self.metrics_names_to_output_columns
@@ -229,24 +250,27 @@ def __init__(self, df, privacy_unit_column: str):
f"Column {privacy_unit_column} is not present in DataFrame")
self._df = df
self._privacy_unit_column = privacy_unit_column
- self._groupby_column = None
+ self._by = None
self._value_column = None
self._metrics = {} # map from pipeline_dp.Metric -> output column name
self._contribution_bounds = ContributionBounds()
self._public_keys = None

- def groupby(self,
- column: str,
- *,
- max_groups_contributed: int,
- max_contributions_per_group: int,
- public_keys: Optional[Iterable[Any]] = None) -> 'QueryBuilder':
+ def groupby(
+ self,
+ by: Union[str, Sequence[str]],
+ *,
+ max_groups_contributed: int,
+ max_contributions_per_group: int,
+ public_keys: Optional[Iterable[Any]] = None,
+ column: Optional[str] = None # deprecated
+ ) -> 'QueryBuilder':
"""Adds groupby by the given column to the query.
All following aggregation will be applied to grouped by DataFrame.
Args:
- column: column to group.
+ by: a column or a list of columns used to determine the groups.
max_groups_contributed: the maximum groups that can each privacy
unit contributes to the result. If some privacy unit contributes
more in the input dataset, the groups are sub-sampled to
Expand All @@ -257,11 +281,23 @@ def groupby(self,
max_contributions_per_group.
public_keys:
"""
- if self._groupby_column is not None:
+ if column is not None:
+ raise ValueError("column argument is deprecated. Use by")
+ if self._by is not None:
raise ValueError("groupby can be called only once.")
- if column not in self._df.columns:
- raise ValueError(f"Column {column} is not present in DataFrame")
- self._groupby_column = column
+
+ if isinstance(by, str):
+ if by not in self._df.columns:
+ raise ValueError(f"Column {by} is not present in DataFrame")
+ elif isinstance(by, list):
+ # List of columns
+ for column in by:
+ if column not in self._df.columns:
+ raise ValueError(
+ f"Column {column} is not present in DataFrame")
+ else:
+ raise ValueError(f"by argument must be column name(s)")
+ self._by = by
self._contribution_bounds.max_partitions_contributed = max_groups_contributed
self._contribution_bounds.max_contributions_per_partition = max_contributions_per_group
self._public_keys = public_keys
@@ -273,7 +309,7 @@ def count(self, name: str = None) -> 'QueryBuilder':
Args:
name: the name of the output column.
"""
- if self._groupby_column is None:
+ if self._by is None:
raise ValueError(
"Global aggregations are not supported. Use groupby.")
if pipeline_dp.Metrics.COUNT in self._metrics:
@@ -294,7 +330,7 @@ def sum(self,
min_value, max_value: capping limits to each value.
name: the name of the output column.
"""
- if self._groupby_column is None:
+ if self._by is None:
raise ValueError(
"Global aggregations are not supported. Use groupby.")
if pipeline_dp.Metrics.SUM in self._metrics:
@@ -310,14 +346,13 @@ def sum(self,

def build_query(self) -> Query:
"""Builds the DP query."""
- if self._groupby_column is None:
+ if self._by is None:
raise NotImplementedError(
"Global aggregations are not implemented yet. Call groupby.")
if not self._metrics:
raise ValueError(
"No aggregations in the query. Call for example count.")
return Query(
self._df,
- Columns(self._privacy_unit_column, self._groupby_column,
- self._value_column), self._metrics,
- self._contribution_bounds, self._public_keys)
+ Columns(self._privacy_unit_column, self._by, self._value_column),
+ self._metrics, self._contribution_bounds, self._public_keys)
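A usage sketch of the extended groupby for readers of this file's changes. The DataFrame and column names are hypothetical, and only builder calls visible in this diff (the QueryBuilder constructor, groupby, count, build_query) are used:

from pipeline_dp.dataframes import QueryBuilder

# df is a Spark DataFrame with user_id, day, and city columns (hypothetical).
query = (QueryBuilder(df, privacy_unit_column="user_id")
         .groupby(["day", "city"],  # multi-column partition key, new in this commit
                  max_groups_contributed=3,
                  max_contributions_per_group=1)
         .count(name="visits")
         .build_query())
# Each result row then carries one column per partition-key column
# ("day", "city") plus the requested metric column ("visits").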
pyproject.toml (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pipeline-dp"
version = "0.2.1"
version = "0.2.2rc1"
description = ""
authors = ["Chinmay Shah <[email protected]>", "Vadym Doroshenko <[email protected]>"]
license = "Apache-2.0"
setup.cfg (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
[bumpversion]
- current_version = 0.2.1
+ current_version = 0.2.2rc1
commit = True
tag = True

setup.py (2 changes: 1 addition & 1 deletion)
@@ -23,7 +23,7 @@ def read(fname):

setup_kwargs = {
'name': 'pipeline-dp',
- 'version': '0.2.1',
+ 'version': '0.2.2rc1',
'description': 'Framework for applying differential privacy to large datasets using batch processing systems',
'author': 'Chinmay Shah',
'author_email': '[email protected]',