diff --git a/pipeline_dp/dataframes.py b/pipeline_dp/dataframes.py
index 1e488c8b..056a6321 100644
--- a/pipeline_dp/dataframes.py
+++ b/pipeline_dp/dataframes.py
@@ -15,7 +15,7 @@
 import abc
 from collections import namedtuple
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, Optional
+from typing import Any, Callable, Dict, Iterable, Optional, Sequence, Union
 
 import pipeline_dp
 import pyspark
@@ -36,7 +36,7 @@ def __post_init__(self):
 @dataclass
 class Columns:
     privacy_key: str
-    partition_key: str
+    partition_key: Union[str, Sequence[str]]
     value: Optional[str]
 
 
@@ -68,17 +68,30 @@ def __init__(self, spark: pyspark.sql.SparkSession):
 
     def dataframe_to_collection(self, df: SparkDataFrame,
                                 columns: Columns) -> pyspark.RDD:
-        columns_to_keep = [columns.privacy_key, columns.partition_key]
-        if columns.value is not None:
+        columns_to_keep = [columns.privacy_key]
+        if isinstance(columns.partition_key, str):
+            num_partition_columns = 1
+            columns_to_keep.append(columns.partition_key)
+        else:  # Sequence[str], multiple columns
+            num_partition_columns = len(columns.partition_key)
+            columns_to_keep.extend(columns.partition_key)
+        value_present = columns.value is not None
+        if value_present:
             columns_to_keep.append(columns.value)
+
         df = df[columns_to_keep]  # leave only needed columns.
-        if columns.value is None:
-            return df.rdd.map(lambda row: (row[0], row[1], 0))
-        return df.rdd.map(lambda row: (row[0], row[1], row[2]))
+
+        def extractor(row):
+            privacy_key = row[0]
+            partition_key = row[1] if num_partition_columns == 1 else row[
+                1:1 + num_partition_columns]
+            value = row[1 + num_partition_columns] if value_present else 0
+            return (privacy_key, partition_key, value)
+
+        return df.rdd.map(extractor)
 
     def collection_to_dataframe(self, col: pyspark.RDD) -> SparkDataFrame:
-        df = self._spark.createDataFrame(col)
-        return df
+        return self._spark.createDataFrame(col)
 
 
 def _create_backend_for_dataframe(
@@ -179,13 +192,21 @@ def run_query(
             metrics_names_to_output_columns[metric_name] = output_column
 
         output_columns = list(metrics_names_to_output_columns.values())
-        partition_key_column = self._columns.partition_key
-        PartitionMetricsTuple = namedtuple("Result", [partition_key_column] +
-                                           output_columns)
+        partition_key = self._columns.partition_key
+        partition_key_one_column = isinstance(partition_key, str)
+        if partition_key_one_column:
+            partition_key = [partition_key]
+        PartitionMetricsTuple = namedtuple("Result",
+                                           partition_key + output_columns)
 
         def convert_to_partition_metrics_tuple(row):
             partition, metrics = row
-            result = {partition_key_column: partition}
+            if partition_key_one_column:
+                result = {partition_key[0]: partition}
+            else:
+                result = {}
+                for key, value in zip(partition_key, partition):
+                    result[key] = value
             for key, value in metrics._asdict().items():
                 # Map default metric names to metric names specified in
                 # self.metrics_names_to_output_columns
@@ -229,24 +250,27 @@ def __init__(self, df, privacy_unit_column: str):
                 f"Column {privacy_unit_column} is not present in DataFrame")
         self._df = df
         self._privacy_unit_column = privacy_unit_column
-        self._groupby_column = None
+        self._by = None
         self._value_column = None
        self._metrics = {}  # map from pipeline_dp.Metric -> output column name
         self._contribution_bounds = ContributionBounds()
         self._public_keys = None
 
-    def groupby(self,
-                column: str,
-                *,
-                max_groups_contributed: int,
-                max_contributions_per_group: int,
-                public_keys: Optional[Iterable[Any]] = None) -> 'QueryBuilder':
+    def groupby(
+            self,
+            by: Union[str, Sequence[str]],
+            *,
+            max_groups_contributed: int,
+            max_contributions_per_group: int,
+            public_keys: Optional[Iterable[Any]] = None,
+            column: Optional[str] = None  # deprecated
+    ) -> 'QueryBuilder':
         """Adds groupby by the given column to the query.
 
         All following aggregation will be applied to grouped by DataFrame.
 
         Args:
-            column: column to group.
+            by: a column or a list of columns used to determine the groups.
             max_groups_contributed: the maximum groups that can each privacy
              unit contributes to the result. If some privacy unit contributes
              more in the input dataset, the groups are sub-sampled to
@@ -257,11 +281,23 @@ def groupby(self,
              max_contributions_per_group.
            public_keys:
         """
-        if self._groupby_column is not None:
+        if column is not None:
+            raise ValueError("column argument is deprecated. Use by")
+        if self._by is not None:
             raise ValueError("groupby can be called only once.")
-        if column not in self._df.columns:
-            raise ValueError(f"Column {column} is not present in DataFrame")
-        self._groupby_column = column
+
+        if isinstance(by, str):
+            if by not in self._df.columns:
+                raise ValueError(f"Column {by} is not present in DataFrame")
+        elif isinstance(by, list):
+            # List of columns
+            for column in by:
+                if column not in self._df.columns:
+                    raise ValueError(
+                        f"Column {column} is not present in DataFrame")
+        else:
+            raise ValueError("by argument must be column name(s)")
+        self._by = by
        self._contribution_bounds.max_partitions_contributed = max_groups_contributed
        self._contribution_bounds.max_contributions_per_partition = max_contributions_per_group
         self._public_keys = public_keys
@@ -273,7 +309,7 @@ def count(self, name: str = None) -> 'QueryBuilder':
         Args:
             name: the name of the output column.
         """
-        if self._groupby_column is None:
+        if self._by is None:
             raise ValueError(
                 "Global aggregations are not supported. Use groupby.")
         if pipeline_dp.Metrics.COUNT in self._metrics:
@@ -294,7 +330,7 @@ def sum(self,
             min_value, max_value: capping limits to each value.
             name: the name of the output column.
         """
-        if self._groupby_column is None:
+        if self._by is None:
             raise ValueError(
                 "Global aggregations are not supported. Use groupby.")
         if pipeline_dp.Metrics.SUM in self._metrics:
@@ -310,7 +346,7 @@ def sum(self,
 
     def build_query(self) -> Query:
         """Builds the DP query."""
-        if self._groupby_column is None:
+        if self._by is None:
             raise NotImplementedError(
                 "Global aggregations are not implemented yet. Call groupby.")
         if not self._metrics:
@@ -318,6 +354,5 @@ def build_query(self) -> Query:
                 "No aggregations in the query. Call for example count.")
         return Query(
             self._df,
-            Columns(self._privacy_unit_column, self._groupby_column,
-                    self._value_column), self._metrics,
-            self._contribution_bounds, self._public_keys)
+            Columns(self._privacy_unit_column, self._by, self._value_column),
+            self._metrics, self._contribution_bounds, self._public_keys)
diff --git a/tests/dataframes_test.py b/tests/dataframes_test.py
index 322ca10b..50c39a1e 100644
--- a/tests/dataframes_test.py
+++ b/tests/dataframes_test.py
@@ -146,6 +146,25 @@ def test_public_keys(self):
 
         self.assertEqual(query._public_partitions, ["key1"])
 
+    def test_partition_key_multiple_columns_query(self):
+        df = get_pandas_df()
+        df["second_group_by_key"] = ["a", "b", "c"]
+
+        query = dataframes.QueryBuilder(df, "privacy_key").groupby(
+            ["group_key", "second_group_by_key"],
+            max_groups_contributed=8,
+            max_contributions_per_group=11).sum("value",
+                                                min_value=1,
+                                                max_value=2.5).build_query()
+
+        self.assertTrue(query._df.equals(df))
+        self.assertEqual(
+            query._columns,
+            dataframes.Columns("privacy_key",
+                               ["group_key", "second_group_by_key"], "value"))
+        self.assertEqual(query._metrics_output_columns,
+                         {pipeline_dp.Metrics.SUM: None})
+
 
 CountNamedTuple = namedtuple("Count", ["count"])
 CountSumNamedTuple = namedtuple("CountSum", ["count", "sum"])
@@ -246,6 +265,49 @@ def test_run_query_e2e_run(self):
         self.assertAlmostEqual(row1["count_column"], 2, delta=1e-3)
         self.assertAlmostEqual(row1["sum"], 3, delta=1e-3)
 
+    def test_run_query_multiple_partition_keys_e2e_run(self):
+        # Arrange
+        spark = self._get_spark_session()
+        pandas_df = get_pandas_df()
+        pandas_df["second_group_by_key"] = ["a", "b", "c"]
+        df = spark.createDataFrame(pandas_df)
+        columns = dataframes.Columns("privacy_key",
+                                     ["group_key", "second_group_by_key"],
+                                     "value")
+        metrics = {
+            pipeline_dp.Metrics.COUNT: None,  # it returns default name "count"
+            pipeline_dp.Metrics.SUM: "sum_column"
+        }
+        bounds = dataframes.ContributionBounds(
+            max_partitions_contributed=2,
+            max_contributions_per_partition=2,
+            min_value=-5,
+            max_value=5)
+        public_keys = [("key1", "a"), ("key0", "b")]
+        query = dataframes.Query(df, columns, metrics, bounds, public_keys)
+
+        # Act
+        budget = dataframes.Budget(1e6, 1e-1)  # large budget to get small noise
+        result_df = query.run_query(budget)
+
+        # Assert
+        pandas_df = result_df.toPandas()
+        pandas_df = pandas_df.sort_values(by=['group_key']).reset_index(
+            drop=True)
+        self.assertLen(pandas_df, 2)
+        # check row[0] = "key0", "b", 0+noise, 0+noise
+        row0 = pandas_df.loc[0]
+        self.assertEqual(row0["group_key"], "key0")
+        self.assertEqual(row0["second_group_by_key"], "b")
+        self.assertAlmostEqual(row0["count"], 0, delta=1e-3)
+        self.assertAlmostEqual(row0["sum_column"], 0, delta=1e-3)
+        # check row[1] = "key1", "a", 1+noise, 5+noise
+        row1 = pandas_df.loc[1]
+        self.assertEqual(row1["group_key"], "key1")
+        self.assertEqual(row1["second_group_by_key"], "a")
+        self.assertAlmostEqual(row1["count"], 1, delta=1e-3)
+        self.assertAlmostEqual(row1["sum_column"], 5, delta=1e-3)
+
 
 if __name__ == '__main__':
     absltest.main()