From 5031a3bf620db865536a2473e318ef1142c8fa76 Mon Sep 17 00:00:00 2001
From: Vadym Doroshenko <53558779+dvadym@users.noreply.github.com>
Date: Fri, 3 Nov 2023 12:16:19 +0100
Subject: [PATCH 1/3] Implement groupby by multiple columns in Spark DataFrame API (#501)

---
 pipeline_dp/dataframes.py | 97 ++++++++++++++++++++++++++-------------
 tests/dataframes_test.py  | 62 +++++++++++++++++++++++++
 2 files changed, 128 insertions(+), 31 deletions(-)

diff --git a/pipeline_dp/dataframes.py b/pipeline_dp/dataframes.py
index 1e488c8b..056a6321 100644
--- a/pipeline_dp/dataframes.py
+++ b/pipeline_dp/dataframes.py
@@ -15,7 +15,7 @@
 import abc
 from collections import namedtuple
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, Optional
+from typing import Any, Callable, Dict, Iterable, Optional, Sequence, Union

 import pipeline_dp
 import pyspark
@@ -36,7 +36,7 @@ def __post_init__(self):
 @dataclass
 class Columns:
     privacy_key: str
-    partition_key: str
+    partition_key: Union[str, Sequence[str]]
     value: Optional[str]


@@ -68,17 +68,30 @@ def __init__(self, spark: pyspark.sql.SparkSession):

     def dataframe_to_collection(self, df: SparkDataFrame,
                                 columns: Columns) -> pyspark.RDD:
-        columns_to_keep = [columns.privacy_key, columns.partition_key]
-        if columns.value is not None:
+        columns_to_keep = [columns.privacy_key]
+        if isinstance(columns.partition_key, str):
+            num_partition_columns = 1
+            columns_to_keep.append(columns.partition_key)
+        else:  # Sequence[str], multiple columns
+            num_partition_columns = len(columns.partition_key)
+            columns_to_keep.extend(columns.partition_key)
+        value_present = columns.value is not None
+        if value_present:
             columns_to_keep.append(columns.value)
+
         df = df[columns_to_keep]  # leave only needed columns.
-        if columns.value is None:
-            return df.rdd.map(lambda row: (row[0], row[1], 0))
-        return df.rdd.map(lambda row: (row[0], row[1], row[2]))
+
+        def extractor(row):
+            privacy_key = row[0]
+            partition_key = row[1] if num_partition_columns == 1 else row[
+                1:1 + num_partition_columns]
+            value = row[1 + num_partition_columns] if value_present else 0
+            return (privacy_key, partition_key, value)
+
+        return df.rdd.map(extractor)

     def collection_to_dataframe(self, col: pyspark.RDD) -> SparkDataFrame:
-        df = self._spark.createDataFrame(col)
-        return df
+        return self._spark.createDataFrame(col)


 def _create_backend_for_dataframe(
@@ -179,13 +192,21 @@ def run_query(
                 metrics_names_to_output_columns[metric_name] = output_column

         output_columns = list(metrics_names_to_output_columns.values())
-        partition_key_column = self._columns.partition_key
-        PartitionMetricsTuple = namedtuple("Result", [partition_key_column] +
-                                           output_columns)
+        partition_key = self._columns.partition_key
+        partition_key_one_column = isinstance(partition_key, str)
+        if partition_key_one_column:
+            partition_key = [partition_key]
+        PartitionMetricsTuple = namedtuple("Result",
+                                           partition_key + output_columns)

         def convert_to_partition_metrics_tuple(row):
             partition, metrics = row
-            result = {partition_key_column: partition}
+            if partition_key_one_column:
+                result = {partition_key[0]: partition}
+            else:
+                result = {}
+                for key, value in zip(partition_key, partition):
+                    result[key] = value
             for key, value in metrics._asdict().items():
                 # Map default metric names to metric names specified in
                 # self.metrics_names_to_output_columns
@@ -229,24 +250,27 @@ def __init__(self, df, privacy_unit_column: str):
                 f"Column {privacy_unit_column} is not present in DataFrame")
         self._df = df
         self._privacy_unit_column = privacy_unit_column
-        self._groupby_column = None
+        self._by = None
         self._value_column = None
         self._metrics = {}  # map from pipeline_dp.Metric -> output column name
         self._contribution_bounds = ContributionBounds()
         self._public_keys = None

-    def groupby(self,
-                column: str,
-                *,
-                max_groups_contributed: int,
-                max_contributions_per_group: int,
-                public_keys: Optional[Iterable[Any]] = None) -> 'QueryBuilder':
+    def groupby(
+            self,
+            by: Union[str, Sequence[str]],
+            *,
+            max_groups_contributed: int,
+            max_contributions_per_group: int,
+            public_keys: Optional[Iterable[Any]] = None,
+            column: Optional[str] = None  # deprecated
+    ) -> 'QueryBuilder':
         """Adds a groupby by the given column(s) to the query.

         All following aggregations will be applied to the grouped DataFrame.

         Args:
-            column: column to group.
+            by: a column or a list of columns used to determine the groups.
             max_groups_contributed: the maximum number of groups that each
              privacy unit can contribute to. If some privacy unit contributes
              to more groups in the input dataset, its groups are sub-sampled
              to max_groups_contributed.
             max_contributions_per_group: the maximum number of contributions
              that a privacy unit can make to a group. If some privacy unit
              contributes more to some group, its contributions are sub-sampled
              to max_contributions_per_group.
             public_keys: an optional collection of group keys that are known to
              be public; if provided, they are released without DP group
              selection.
         """
-        if self._groupby_column is not None:
+        if column is not None:
+            raise ValueError("column argument is deprecated. Use by")
+        if self._by is not None:
             raise ValueError("groupby can be called only once.")
-        if column not in self._df.columns:
-            raise ValueError(f"Column {column} is not present in DataFrame")
-        self._groupby_column = column
+
+        if isinstance(by, str):
+            if by not in self._df.columns:
+                raise ValueError(f"Column {by} is not present in DataFrame")
+        elif isinstance(by, list):
+            # List of columns
+            for column in by:
+                if column not in self._df.columns:
+                    raise ValueError(
+                        f"Column {column} is not present in DataFrame")
+        else:
+            raise ValueError("by argument must be column name(s)")
+        self._by = by
         self._contribution_bounds.max_partitions_contributed = max_groups_contributed
         self._contribution_bounds.max_contributions_per_partition = max_contributions_per_group
         self._public_keys = public_keys
@@ -273,7 +309,7 @@ def count(self, name: str = None) -> 'QueryBuilder':
         Args:
             name: the name of the output column.
         """
-        if self._groupby_column is None:
+        if self._by is None:
             raise ValueError(
                 "Global aggregations are not supported. Use groupby.")
         if pipeline_dp.Metrics.COUNT in self._metrics:
@@ -294,7 +330,7 @@ def sum(self,
             min_value, max_value: capping limits applied to each value.
             name: the name of the output column.
         """
-        if self._groupby_column is None:
+        if self._by is None:
             raise ValueError(
                 "Global aggregations are not supported. Use groupby.")
         if pipeline_dp.Metrics.SUM in self._metrics:
@@ -310,7 +346,7 @@ def sum(self,

     def build_query(self) -> Query:
         """Builds the DP query."""
-        if self._groupby_column is None:
+        if self._by is None:
             raise NotImplementedError(
                 "Global aggregations are not implemented yet. Call groupby.")
         if not self._metrics:
             raise ValueError(
Call for example count.") return Query( self._df, - Columns(self._privacy_unit_column, self._groupby_column, - self._value_column), self._metrics, - self._contribution_bounds, self._public_keys) + Columns(self._privacy_unit_column, self._by, self._value_column), + self._metrics, self._contribution_bounds, self._public_keys) diff --git a/tests/dataframes_test.py b/tests/dataframes_test.py index 322ca10b..50c39a1e 100644 --- a/tests/dataframes_test.py +++ b/tests/dataframes_test.py @@ -146,6 +146,25 @@ def test_public_keys(self): self.assertEqual(query._public_partitions, ["key1"]) + def test_partition_key_multiple_columns_query(self): + df = get_pandas_df() + df["second_group_by_key"] = ["a", "b", "c"] + + query = dataframes.QueryBuilder(df, "privacy_key").groupby( + ["group_key", "second_group_by_key"], + max_groups_contributed=8, + max_contributions_per_group=11).sum("value", + min_value=1, + max_value=2.5).build_query() + + self.assertTrue(query._df.equals(df)) + self.assertEqual( + query._columns, + dataframes.Columns("privacy_key", + ["group_key", "second_group_by_key"], "value")) + self.assertEqual(query._metrics_output_columns, + {pipeline_dp.Metrics.SUM: None}) + CountNamedTuple = namedtuple("Count", ["count"]) CountSumNamedTuple = namedtuple("CountSum", ["count", "sum"]) @@ -246,6 +265,49 @@ def test_run_query_e2e_run(self): self.assertAlmostEqual(row1["count_column"], 2, delta=1e-3) self.assertAlmostEqual(row1["sum"], 3, delta=1e-3) + def test_run_query_multiple_partition_keys_e2e_run(self): + # Arrange + spark = self._get_spark_session() + pandas_df = get_pandas_df() + pandas_df["second_group_by_key"] = ["a", "b", "c"] + df = spark.createDataFrame(pandas_df) + columns = dataframes.Columns("privacy_key", + ["group_key", "second_group_by_key"], + "value") + metrics = { + pipeline_dp.Metrics.COUNT: None, # it returns default name "count" + pipeline_dp.Metrics.SUM: "sum_column" + } + bounds = dataframes.ContributionBounds( + max_partitions_contributed=2, + max_contributions_per_partition=2, + min_value=-5, + max_value=5) + public_keys = [("key1", "a"), ("key0", "b")] + query = dataframes.Query(df, columns, metrics, bounds, public_keys) + + # Act + budget = dataframes.Budget(1e6, 1e-1) # large budget to get small noise + result_df = query.run_query(budget) + + # Assert + pandas_df = result_df.toPandas() + pandas_df = pandas_df.sort_values(by=['group_key']).reset_index( + drop=True) + self.assertLen(pandas_df, 2) + # check row[0] = "key0", "b", 0+noise, 0+noise + row0 = pandas_df.loc[0] + self.assertEqual(row0["group_key"], "key0") + self.assertEqual(row0["second_group_by_key"], "b") + self.assertAlmostEqual(row0["count"], 0, delta=1e-3) + self.assertAlmostEqual(row0["sum_column"], 0, delta=1e-3) + # check row[1] = "key1", "a", 1+noise, 3+noise + row1 = pandas_df.loc[1] + self.assertEqual(row1["group_key"], "key1") + self.assertEqual(row1["second_group_by_key"], "a") + self.assertAlmostEqual(row1["count"], 1, delta=1e-3) + self.assertAlmostEqual(row1["sum_column"], 5, delta=1e-3) + if __name__ == '__main__': absltest.main() From 52e210bbce0504d062ae3cd64e4145a1237a309b Mon Sep 17 00:00:00 2001 From: Vadym Doroshenko <53558779+dvadym@users.noreply.github.com> Date: Fri, 3 Nov 2023 15:38:12 +0100 Subject: [PATCH 2/3] Update to the version v0.2.2rc1 (#502) --- pipeline_dp/__init__.py | 2 +- pyproject.toml | 2 +- setup.cfg | 2 +- setup.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pipeline_dp/__init__.py b/pipeline_dp/__init__.py index bf47db00..8c251a58 100644 
From 52e210bbce0504d062ae3cd64e4145a1237a309b Mon Sep 17 00:00:00 2001
From: Vadym Doroshenko <53558779+dvadym@users.noreply.github.com>
Date: Fri, 3 Nov 2023 15:38:12 +0100
Subject: [PATCH 2/3] Update to the version v0.2.2rc1 (#502)

---
 pipeline_dp/__init__.py | 2 +-
 pyproject.toml          | 2 +-
 setup.cfg               | 2 +-
 setup.py                | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pipeline_dp/__init__.py b/pipeline_dp/__init__.py
index bf47db00..8c251a58 100644
--- a/pipeline_dp/__init__.py
+++ b/pipeline_dp/__init__.py
@@ -38,4 +38,4 @@
 from pipeline_dp.pipeline_backend import PipelineBackend
 from pipeline_dp.pipeline_backend import SparkRDDBackend

-__version__ = '0.2.1'
+__version__ = '0.2.2rc1'
diff --git a/pyproject.toml b/pyproject.toml
index e099fab8..44202076 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "pipeline-dp"
-version = "0.2.1"
+version = "0.2.2rc1"
 description = ""
 authors = ["Chinmay Shah ", "Vadym Doroshenko "]
 license = "Apache-2.0"
diff --git a/setup.cfg b/setup.cfg
index 94d999f5..84631ad1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.2.1
+current_version = 0.2.2rc1
 commit = True
 tag = True

diff --git a/setup.py b/setup.py
index b0c411a3..4a6be132 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@ def read(fname):

 setup_kwargs = {
     'name': 'pipeline-dp',
-    'version': '0.2.1',
+    'version': '0.2.2rc1',
     'description': 'Framework for applying differential privacy to large datasets using batch processing systems',
     'author': 'Chinmay Shah',
     'author_email': 'chinmayshah3899@gmail.com',

From 2b48526656ffc38d5e97bf0c7a5395dcdb1c25af Mon Sep 17 00:00:00 2001
From: Vadym Doroshenko <53558779+dvadym@users.noreply.github.com>
Date: Thu, 16 Nov 2023 16:09:18 +0100
Subject: [PATCH 3/3] Fix noise_std computation in utility analysis (#504)

---
 analysis/parameter_tuning.py                   |  2 +-
 analysis/per_partition_combiners.py            | 22 ++++++++++++++++---
 .../tests/per_partition_combiners_test.py      | 12 +++++-----
 pipeline_dp/combiners.py                       | 12 +++++-----
 4 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/analysis/parameter_tuning.py b/analysis/parameter_tuning.py
index 26d837b8..30f1fe71 100644
--- a/analysis/parameter_tuning.py
+++ b/analysis/parameter_tuning.py
@@ -140,7 +140,7 @@ def _find_candidate_parameters(

     max_sum_per_partition_bounds = min_sum_per_partition_bounds = None
     if calculate_sum_per_partition_param:
-        if hist.linf_sum_contributions_histogram.bins[0].lower >= 0:
+        if hist.linf_sum_contributions_histogram.bins[0].lower < 0:
             logging.warning(
                 "max_sum_per_partition should not contain negative sums because"
                 " min_sum_per_partition tuning is not supported yet and "
diff --git a/analysis/per_partition_combiners.py b/analysis/per_partition_combiners.py
index 677e6c41..39abcd5c 100644
--- a/analysis/per_partition_combiners.py
+++ b/analysis/per_partition_combiners.py
@@ -267,8 +267,6 @@ def create_accumulator(

     def compute_metrics(self, acc: AccumulatorType) -> metrics.SumMetrics:
         """Computes metrics based on the accumulator properties."""
         partition_sum, clipping_to_min_error, clipping_to_max_error, expected_l0_bounding_error, var_cross_partition_error = acc
-        std_noise = dp_computations.compute_dp_count_noise_std(
-            self._params.scalar_noise_params)
         return metrics.SumMetrics(
             aggregation=self._metric,
             sum=partition_sum,
@@ -276,9 +274,19 @@ def compute_metrics(self, acc: AccumulatorType) -> metrics.SumMetrics:
             clipping_to_max_error=clipping_to_max_error,
             expected_l0_bounding_error=expected_l0_bounding_error,
             std_l0_bounding_error=math.sqrt(var_cross_partition_error),
-            std_noise=std_noise,
+            std_noise=self._get_std_noise(),
             noise_kind=self._params.aggregate_params.noise_kind)

+    def get_sensitivities(self) -> dp_computations.Sensitivities:
+        return dp_computations.compute_sensitivities_for_sum(
+            self._params.aggregate_params)
+
+    def _get_std_noise(self) -> float:
+        sensitivities = self.get_sensitivities()
+        mechanism = dp_computations.create_additive_mechanism(
+            self._params.mechanism_spec, sensitivities)
+        return mechanism.std
+

 class CountCombiner(SumCombiner):
     """A combiner for utility analysis counts."""
@@ -298,6 +306,10 @@ def create_accumulator(
         self._params.aggregate_params.max_sum_per_partition = self._params.aggregate_params.max_contributions_per_partition
         return super().create_accumulator(data)

+    def get_sensitivities(self) -> dp_computations.Sensitivities:
+        return dp_computations.compute_sensitivities_for_count(
+            self._params.aggregate_params)
+

 class PrivacyIdCountCombiner(SumCombiner):
     """A combiner for utility analysis privacy ID counts."""
@@ -319,6 +331,10 @@ def create_accumulator(
         self._params.aggregate_params.max_sum_per_partition = 1.0
         return super().create_accumulator(data)

+    def get_sensitivities(self) -> dp_computations.Sensitivities:
+        return dp_computations.compute_sensitivities_for_privacy_id_count(
+            self._params.aggregate_params)
+

 class RawStatisticsCombiner(UtilityAnalysisCombiner):
     """A combiner for computing per-partition raw statistics (count etc)."""
diff --git a/analysis/tests/per_partition_combiners_test.py b/analysis/tests/per_partition_combiners_test.py
index 4349d1a8..be434b78 100644
--- a/analysis/tests/per_partition_combiners_test.py
+++ b/analysis/tests/per_partition_combiners_test.py
@@ -279,7 +279,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
         dict(testcase_name='empty',
              num_partitions=[0],
              contribution_values=[()],
-             params=_create_combiner_params_for_sum(0, 0),
+             params=_create_combiner_params_for_sum(0, 1),
              expected_metrics=metrics.SumMetrics(
                  aggregation=pipeline_dp.Metrics.SUM,
                  sum=0,
@@ -287,7 +287,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
                  clipping_to_max_error=0,
                  expected_l0_bounding_error=0,
                  std_l0_bounding_error=0,
-                 std_noise=7.46484375,
+                 std_noise=3.732421875,
                  noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
         dict(testcase_name='one_privacy_id_zero_partition_error',
              num_partitions=[1],
@@ -300,7 +300,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
                  clipping_to_max_error=0,
                  expected_l0_bounding_error=0,
                  std_l0_bounding_error=0,
-                 std_noise=7.46484375,
+                 std_noise=12.690234375,
                  noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
         dict(testcase_name='1_privacy_id_4_contributions_clip_max_error_half',
              num_partitions=[4],
@@ -313,7 +313,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
                  clipping_to_max_error=-5.5,
                  expected_l0_bounding_error=-4.125,
                  std_l0_bounding_error=2.381569860407206,
-                 std_noise=7.46484375,
+                 std_noise=20.5283203125,
                  noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
         dict(testcase_name='1_privacy_id_4_partitions_4_contributions_clip_min',
              num_partitions=[4],
@@ -326,7 +326,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
                  clipping_to_max_error=0,
                  expected_l0_bounding_error=-1.5,
                  std_l0_bounding_error=0.8660254037844386,
-                 std_noise=7.46484375,
+                 std_noise=74.6484375,
                  noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)),
         dict(testcase_name='2_privacy_ids',
              num_partitions=[2, 4],
@@ -339,7 +339,7 @@ class UtilityAnalysisSumCombinerTest(parameterized.TestCase):
                  clipping_to_max_error=-1.0,
                  expected_l0_bounding_error=-0.625,
                  std_l0_bounding_error=0.33071891388307384,
-                 std_noise=7.46484375,
+                 std_noise=1.8662109375,
                  noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)))
     def test_compute_metrics(self, num_partitions, contribution_values, params,
                              expected_metrics):
diff --git a/pipeline_dp/combiners.py b/pipeline_dp/combiners.py
index cdf43901..8fc94505 100644
--- a/pipeline_dp/combiners.py
+++ b/pipeline_dp/combiners.py
@@ -150,16 +150,16 @@ class CombinerParams:

     def __init__(self, spec: budget_accounting.MechanismSpec,
                  aggregate_params: pipeline_dp.AggregateParams):
-        self._mechanism_spec = spec
+        self.mechanism_spec = spec
         self.aggregate_params = copy.copy(aggregate_params)

     @property
     def eps(self):
-        return self._mechanism_spec.eps
+        return self.mechanism_spec.eps

     @property
     def delta(self):
-        return self._mechanism_spec.delta
+        return self.mechanism_spec.delta

     @property
     def scalar_noise_params(self):
@@ -584,7 +584,7 @@ def explain_computation(self) -> ExplainComputationReport:
         return lambda: f"Computed variance with (eps={self._params.eps} delta={self._params.delta})"

     def mechanism_spec(self) -> budget_accounting.MechanismSpec:
-        return self._params._mechanism_spec
+        return self._params.mechanism_spec


 class QuantileCombiner(Combiner):
@@ -666,7 +666,7 @@ def _noise_type(self) -> str:
         assert False, f"{noise_kind} is not supported by PyDP quantile tree."

     def mechanism_spec(self) -> budget_accounting.MechanismSpec:
-        return self._params._mechanism_spec
+        return self._params.mechanism_spec


 # Cache for namedtuple types. It should be used only in
@@ -843,7 +843,7 @@ def explain_computation(self) -> ExplainComputationReport:
         lambda: f"Computed vector sum with (eps={self._params.eps} delta={self._params.delta})"

     def mechanism_spec(self) -> budget_accounting.MechanismSpec:
-        return self._params._mechanism_spec
+        return self._params.mechanism_spec


 def create_compound_combiner(