Implement groupby by multiple columns in Spark DataFrame API (#501)
dvadym authored Nov 3, 2023
1 parent ed588e1 commit 5031a3b
Showing 2 changed files with 128 additions and 31 deletions.
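A minimal usage sketch (not part of the commit itself) of the capability this change adds: grouping by a list of columns in `QueryBuilder.groupby`. The DataFrame contents, bounds, and budget are illustrative assumptions modeled on the tests added below.

```python
# Sketch: building a DP query grouped by two columns. Column names mirror
# the test fixtures in this commit; the data and parameters are assumptions.
import pandas as pd

from pipeline_dp import dataframes

df = pd.DataFrame({
    "privacy_key": [0, 1, 1],
    "group_key": ["key1", "key0", "key1"],
    "second_group_by_key": ["a", "b", "c"],
    "value": [5.0, 2.5, -2.0],
})

query = dataframes.QueryBuilder(df, "privacy_key").groupby(
    ["group_key", "second_group_by_key"],  # new: a list of columns
    max_groups_contributed=8,
    max_contributions_per_group=11).sum(
        "value", min_value=1, max_value=2.5).build_query()

# Running the query requires a Spark DataFrame and a Budget, e.g.
# query.run_query(dataframes.Budget(1.0, 1e-6)) -- see the e2e test below.
```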
97 changes: 66 additions & 31 deletions pipeline_dp/dataframes.py
@@ -15,7 +15,7 @@
import abc
from collections import namedtuple
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional
from typing import Any, Callable, Dict, Iterable, Optional, Sequence, Union

import pipeline_dp
import pyspark
@@ -36,7 +36,7 @@ def __post_init__(self):
@dataclass
class Columns:
privacy_key: str
partition_key: str
partition_key: Union[str, Sequence[str]]
value: Optional[str]


@@ -68,17 +68,30 @@ def __init__(self, spark: pyspark.sql.SparkSession):

def dataframe_to_collection(self, df: SparkDataFrame,
columns: Columns) -> pyspark.RDD:
columns_to_keep = [columns.privacy_key, columns.partition_key]
if columns.value is not None:
columns_to_keep = [columns.privacy_key]
if isinstance(columns.partition_key, str):
num_partition_columns = 1
columns_to_keep.append(columns.partition_key)
else: # Sequence[str], multiple columns
num_partition_columns = len(columns.partition_key)
columns_to_keep.extend(columns.partition_key)
value_present = columns.value is not None
if value_present:
columns_to_keep.append(columns.value)

df = df[columns_to_keep] # leave only needed columns.
if columns.value is None:
return df.rdd.map(lambda row: (row[0], row[1], 0))
return df.rdd.map(lambda row: (row[0], row[1], row[2]))

def extractor(row):
privacy_key = row[0]
partition_key = row[1] if num_partition_columns == 1 else row[
1:1 + num_partition_columns]
value = row[1 + num_partition_columns] if value_present else 0
return (privacy_key, partition_key, value)

return df.rdd.map(extractor)

def collection_to_dataframe(self, col: pyspark.RDD) -> SparkDataFrame:
df = self._spark.createDataFrame(col)
return df
return self._spark.createDataFrame(col)
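To illustrate the extraction above: each row becomes a `(privacy_key, partition_key, value)` triple, where `partition_key` is a single value for a one-column key and a tuple of values for a multi-column key. A standalone sketch follows; plain tuples stand in for pyspark `Row` objects (which are tuple subclasses, so indexing and slicing behave the same).

```python
# Standalone sketch of the extractor logic above, using plain tuples in
# place of pyspark Row objects.
def make_extractor(num_partition_columns, value_present):
    def extractor(row):
        privacy_key = row[0]
        partition_key = (row[1] if num_partition_columns == 1 else
                         row[1:1 + num_partition_columns])
        value = row[1 + num_partition_columns] if value_present else 0
        return (privacy_key, partition_key, value)
    return extractor

# One partition column -> scalar partition key:
print(make_extractor(1, True)(("user1", "key1", 5.0)))
# ('user1', 'key1', 5.0)

# Two partition columns -> the partition key is a tuple:
print(make_extractor(2, True)(("user1", "key1", "a", 5.0)))
# ('user1', ('key1', 'a'), 5.0)
```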


def _create_backend_for_dataframe(
@@ -179,13 +192,21 @@ def run_query(
metrics_names_to_output_columns[metric_name] = output_column

output_columns = list(metrics_names_to_output_columns.values())
partition_key_column = self._columns.partition_key
PartitionMetricsTuple = namedtuple("Result", [partition_key_column] +
output_columns)
partition_key = self._columns.partition_key
partition_key_one_column = isinstance(partition_key, str)
if partition_key_one_column:
partition_key = [partition_key]
PartitionMetricsTuple = namedtuple("Result",
partition_key + output_columns)

def convert_to_partition_metrics_tuple(row):
partition, metrics = row
result = {partition_key_column: partition}
if partition_key_one_column:
result = {partition_key[0]: partition}
else:
result = {}
for key, value in zip(partition_key, partition):
result[key] = value
for key, value in metrics._asdict().items():
# Map default metric names to metric names specified in
# self.metrics_names_to_output_columns
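A small sketch of the conversion above for a two-column partition key. The remapping of default metric names to custom output columns is elided; `CountSum` mirrors the named tuples used in the tests below.

```python
# Sketch of convert_to_partition_metrics_tuple for a two-column partition
# key; the metric-name remapping step is left out for brevity.
from collections import namedtuple

CountSum = namedtuple("CountSum", ["count", "sum"])
PartitionMetricsTuple = namedtuple(
    "Result", ["group_key", "second_group_by_key", "count", "sum"])

partition_key = ["group_key", "second_group_by_key"]
row = (("key1", "a"), CountSum(count=2, sum=3.0))

partition, metrics = row
result = dict(zip(partition_key, partition))  # one entry per key column
result.update(metrics._asdict())              # then the metric values
print(PartitionMetricsTuple(**result))
# Result(group_key='key1', second_group_by_key='a', count=2, sum=3.0)
```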
@@ -229,24 +250,27 @@ def __init__(self, df, privacy_unit_column: str):
f"Column {privacy_unit_column} is not present in DataFrame")
self._df = df
self._privacy_unit_column = privacy_unit_column
self._groupby_column = None
self._by = None
self._value_column = None
self._metrics = {} # map from pipeline_dp.Metric -> output column name
self._contribution_bounds = ContributionBounds()
self._public_keys = None

def groupby(self,
column: str,
*,
max_groups_contributed: int,
max_contributions_per_group: int,
public_keys: Optional[Iterable[Any]] = None) -> 'QueryBuilder':
def groupby(
self,
by: Union[str, Sequence[str]],
*,
max_groups_contributed: int,
max_contributions_per_group: int,
public_keys: Optional[Iterable[Any]] = None,
column: Optional[str] = None # deprecated
) -> 'QueryBuilder':
"""Adds groupby by the given column to the query.
All following aggregation will be applied to grouped by DataFrame.
Args:
column: column to group.
by: a column or a list of columns used to determine the groups.
max_groups_contributed: the maximum number of groups that each
privacy unit can contribute to the result. If some privacy unit
contributes to more groups in the input dataset, the groups are sub-sampled to
@@ -257,11 +281,23 @@ def groupby(self,
max_contributions_per_group.
public_keys:
"""
if self._groupby_column is not None:
if column is not None:
raise ValueError("column argument is deprecated. Use by")
if self._by is not None:
raise ValueError("groupby can be called only once.")
if column not in self._df.columns:
raise ValueError(f"Column {column} is not present in DataFrame")
self._groupby_column = column

if isinstance(by, str):
if by not in self._df.columns:
raise ValueError(f"Column {by} is not present in DataFrame")
elif isinstance(by, list):
# List of columns
for column in by:
if column not in self._df.columns:
raise ValueError(
f"Column {column} is not present in DataFrame")
else:
raise ValueError(f"by argument must be column name(s)")
self._by = by
self._contribution_bounds.max_partitions_contributed = max_groups_contributed
self._contribution_bounds.max_contributions_per_partition = max_contributions_per_group
self._public_keys = public_keys
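A few illustrative probes of the validation above; the DataFrame and column names are assumptions borrowed from the tests in this commit. Note that only a string or a Python list passes the isinstance checks, and groupby may be called once per builder.

```python
# Hypothetical probes of the `by` validation above.
import pandas as pd

from pipeline_dp import dataframes

df = pd.DataFrame({"privacy_key": [0], "group_key": ["key1"],
                   "second_group_by_key": ["a"], "value": [1.0]})

builder = dataframes.QueryBuilder(df, "privacy_key")
builder.groupby(["group_key", "second_group_by_key"],
                max_groups_contributed=1, max_contributions_per_group=1)

try:
    # A second groupby on the same builder is rejected.
    builder.groupby("group_key",
                    max_groups_contributed=1,
                    max_contributions_per_group=1)
except ValueError as e:
    print(e)  # groupby can be called only once.

try:
    # A tuple fails the isinstance(by, list) check and is rejected.
    dataframes.QueryBuilder(df, "privacy_key").groupby(
        ("group_key",),
        max_groups_contributed=1,
        max_contributions_per_group=1)
except ValueError as e:
    print(e)  # by argument must be column name(s)
```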
@@ -273,7 +309,7 @@ def count(self, name: str = None) -> 'QueryBuilder':
Args:
name: the name of the output column.
"""
if self._groupby_column is None:
if self._by is None:
raise ValueError(
"Global aggregations are not supported. Use groupby.")
if pipeline_dp.Metrics.COUNT in self._metrics:
@@ -294,7 +330,7 @@ def sum(self,
min_value, max_value: capping limits to each value.
name: the name of the output column.
"""
if self._groupby_column is None:
if self._by is None:
raise ValueError(
"Global aggregations are not supported. Use groupby.")
if pipeline_dp.Metrics.SUM in self._metrics:
@@ -310,14 +346,13 @@

def build_query(self) -> Query:
"""Builds the DP query."""
if self._groupby_column is None:
if self._by is None:
raise NotImplementedError(
"Global aggregations are not implemented yet. Call groupby.")
if not self._metrics:
raise ValueError(
"No aggregations in the query. Call for example count.")
return Query(
self._df,
Columns(self._privacy_unit_column, self._groupby_column,
self._value_column), self._metrics,
self._contribution_bounds, self._public_keys)
Columns(self._privacy_unit_column, self._by, self._value_column),
self._metrics, self._contribution_bounds, self._public_keys)
62 changes: 62 additions & 0 deletions tests/dataframes_test.py
@@ -146,6 +146,25 @@ def test_public_keys(self):

self.assertEqual(query._public_partitions, ["key1"])

def test_partition_key_multiple_columns_query(self):
df = get_pandas_df()
df["second_group_by_key"] = ["a", "b", "c"]

query = dataframes.QueryBuilder(df, "privacy_key").groupby(
["group_key", "second_group_by_key"],
max_groups_contributed=8,
max_contributions_per_group=11).sum("value",
min_value=1,
max_value=2.5).build_query()

self.assertTrue(query._df.equals(df))
self.assertEqual(
query._columns,
dataframes.Columns("privacy_key",
["group_key", "second_group_by_key"], "value"))
self.assertEqual(query._metrics_output_columns,
{pipeline_dp.Metrics.SUM: None})


CountNamedTuple = namedtuple("Count", ["count"])
CountSumNamedTuple = namedtuple("CountSum", ["count", "sum"])
@@ -246,6 +265,49 @@ def test_run_query_e2e_run(self):
self.assertAlmostEqual(row1["count_column"], 2, delta=1e-3)
self.assertAlmostEqual(row1["sum"], 3, delta=1e-3)

def test_run_query_multiple_partition_keys_e2e_run(self):
# Arrange
spark = self._get_spark_session()
pandas_df = get_pandas_df()
pandas_df["second_group_by_key"] = ["a", "b", "c"]
df = spark.createDataFrame(pandas_df)
columns = dataframes.Columns("privacy_key",
["group_key", "second_group_by_key"],
"value")
metrics = {
pipeline_dp.Metrics.COUNT: None,  # None keeps the default name "count"
pipeline_dp.Metrics.SUM: "sum_column"
}
bounds = dataframes.ContributionBounds(
max_partitions_contributed=2,
max_contributions_per_partition=2,
min_value=-5,
max_value=5)
public_keys = [("key1", "a"), ("key0", "b")]
query = dataframes.Query(df, columns, metrics, bounds, public_keys)

# Act
budget = dataframes.Budget(1e6, 1e-1) # large budget to get small noise
result_df = query.run_query(budget)

# Assert
pandas_df = result_df.toPandas()
pandas_df = pandas_df.sort_values(by=['group_key']).reset_index(
drop=True)
self.assertLen(pandas_df, 2)
# check row[0] = "key0", "b", 0+noise, 0+noise
row0 = pandas_df.loc[0]
self.assertEqual(row0["group_key"], "key0")
self.assertEqual(row0["second_group_by_key"], "b")
self.assertAlmostEqual(row0["count"], 0, delta=1e-3)
self.assertAlmostEqual(row0["sum_column"], 0, delta=1e-3)
# check row[1] = "key1", "a", 1+noise, 5+noise
row1 = pandas_df.loc[1]
self.assertEqual(row1["group_key"], "key1")
self.assertEqual(row1["second_group_by_key"], "a")
self.assertAlmostEqual(row1["count"], 1, delta=1e-3)
self.assertAlmostEqual(row1["sum_column"], 5, delta=1e-3)


if __name__ == '__main__':
absltest.main()
