WIP: Multi aggregate #296

Open
wants to merge 14 commits into main
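
A minimal usage sketch of the new AggregationBuilder API, adapted from the
example change in examples/movie_view_ratings/run_on_beam.py below (the
PrivatePCollection private_movie_views, the budget accountant, and the
surrounding Beam pipeline are assumed to be set up as in that example):

    import pipeline_dp
    from pipeline_dp import private_beam

    # Contribution bounds and partitioning shared by all aggregations
    # in the builder.
    params = pipeline_dp.aggregate_params.AggregationBuilderParams(
        partition_extractor=lambda mv: mv.movie_id,
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        max_partitions_contributed=2,
        max_contributions_per_partition=1)
    rating_range = pipeline_dp.aggregate_params.Range(min_value=1, max_value=5)

    # A single builder computes COUNT plus per-value metrics in one pass;
    # value columns are named f"{output_column_prefix}_{metric}".
    dp_result = private_movie_views | "Private aggregate" >> \
        private_beam.AggregationBuilder(params, public_partitions=[1, 2, 3, 4, 5]) \
            .count() \
            .aggregate_value(lambda mv: mv.rating,
                             metrics=[pipeline_dp.Metrics.MEAN],
                             output_column_prefix="rating",
                             value_range=rating_range)

As in the example, budget_accountant.compute_budgets() still has to be called
before the pipeline runs.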
57 changes: 31 additions & 26 deletions examples/movie_view_ratings/run_on_beam.py
@@ -20,18 +20,18 @@
"""

from absl import app
from absl import flags
# from absl import flags
import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
import pipeline_dp
from pipeline_dp import private_beam
from pipeline_dp import SumParams
from pipeline_dp import SumParams, PrivacyIdCountParams
from pipeline_dp.private_beam import MakePrivate
from common_utils import ParseFile

FLAGS = flags.FLAGS
flags.DEFINE_string('input_file', None, 'The file with the movie view data')
flags.DEFINE_string('output_file', None, 'Output file')
# FLAGS = flags.FLAGS
# flags.DEFINE_string('input_file', None, 'The file with the movie view data')
# flags.DEFINE_string('output_file', None, 'Output file')


def main(unused_argv):
@@ -49,7 +49,7 @@ def main(unused_argv):

# Load and parse input data
movie_views_pcol = pipeline | \
beam.io.ReadFromText(FLAGS.input_file) | \
beam.io.ReadFromText(input_file) | \
beam.ParDo(ParseFile())

# Wrap Beam's PCollection into it's private version
@@ -58,36 +58,41 @@ def main(unused_argv):
budget_accountant=budget_accountant,
privacy_id_extractor=lambda mv: mv.user_id))

explain_computation_report = pipeline_dp.ExplainComputationReport()
global_params = pipeline_dp.aggregate_params.AggregationBuilderParams(
partition_extractor=lambda mv: mv.movie_id,
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
max_partitions_contributed=2,
max_contributions_per_partition=1,
)

value_range = pipeline_dp.aggregate_params.Range(min_value=1,
max_value=5)

# explain_computation_report = pipeline_dp.ExplainComputationReport()
# Calculate the private sum
dp_result = private_movie_views | "Private Sum" >> private_beam.Sum(
SumParams(
# Limits to how much one user can contribute:
# .. at most two movies rated per user
max_partitions_contributed=2,
# .. at most one rating for each movie
max_contributions_per_partition=1,
# .. with minimal rating of "1"
min_value=1,
# .. and maximum rating of "5"
max_value=5,
# The aggregation key: we're grouping data by movies
partition_extractor=lambda mv: mv.movie_id,
# The value we're aggregating: we're summing up ratings
value_extractor=lambda mv: mv.rating))
dp_result = private_movie_views | \
"Private aggregate" >> private_beam.AggregationBuilder(global_params, [1,2,3,4,5]).count().\
aggregate_value(lambda mv:mv.rating, metrics=[pipeline_dp.Metrics.MEAN, pipeline_dp.Metrics.PERCENTILE(50), pipeline_dp.Metrics.PERCENTILE(90)], output_column_prefix="rating1", value_range=value_range).\
aggregate_value(lambda mv:mv.rating, metrics=pipeline_dp.Metrics.MEAN, output_column_prefix="rating2", value_range=value_range)
budget_accountant.compute_budgets()

# Generate the Explain Computation Report. It must be called after
# budget_accountant.compute_budgets().
print(explain_computation_report.text())
# print(explain_computation_report.text())

# Save the results
dp_result | beam.io.WriteToText(FLAGS.output_file)
dp_result | beam.io.WriteToText(output_file)

return 0


input_file = "/usr/local/google/home/dvadym/data/movie_views/netflix_dataset_100000.txt"
output_file = "/usr/local/google/home/dvadym/IdeaProjects/Dev/dp_100000b"

# --input_file=/usr/local/google/home/dvadym/data/movie_views/netflix_dataset_100000.txt
# --output_file=/usr/local/google/home/dvadym/IdeaProjects/dp_100000

if __name__ == '__main__':
flags.mark_flag_as_required("input_file")
flags.mark_flag_as_required("output_file")
# flags.mark_flag_as_required("input_file")
# flags.mark_flag_as_required("output_file")
app.run(main)
26 changes: 26 additions & 0 deletions pipeline_dp/aggregate_params.py
@@ -25,6 +25,8 @@

from pipeline_dp import input_validators

import pipeline_dp


@dataclass
class Metric:
@@ -611,6 +613,30 @@ def __post_init__(self):
"transform.")


@dataclass
class AggregationBuilderParams:
# todo: add budget weight
partition_extractor: Callable
noise_kind: NoiseKind
max_partitions_contributed: Optional[int] = None
max_contributions_per_partition: Optional[int] = None
max_contributions: Optional[int] = None

def __post_init__(self):
pass # TODO: l0, linf or l1


@dataclass
class Range:
min_value: float
max_value: float

def __post_init__(self):
if self.min_value > self.max_value:
raise ValueError(
f"min_value={self.min_value} < max_value={self.max_value}")


def _not_a_proper_number(num: Any) -> bool:
"""Returns true if num is inf or NaN, false otherwise."""
return math.isnan(num) or math.isinf(num)
4 changes: 4 additions & 0 deletions pipeline_dp/pipeline_backend.py
@@ -350,6 +350,10 @@ def distinct(self, col, stage_name: str):
def to_list(self, col, stage_name: str):
return col | self._ulg.unique(stage_name) >> beam.combiners.ToList()

def to_pcollection(self, col, not_col, stage_name: str):
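        # Materializes the in-memory sequence 'not_col' as a PCollection on
        # the same Beam pipeline that 'col' belongs to.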
return col.pipeline | self._ulg.unique(stage_name) >> beam.Create(
not_col)

def annotate(self, col, stage_name: str, **kwargs):
if not _annotators:
return col
197 changes: 196 additions & 1 deletion pipeline_dp/private_beam.py
@@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import collections
import dataclasses
import typing
from apache_beam.transforms import ptransform
from abc import abstractmethod
from typing import Callable, Optional
from typing import Callable, Optional, Sequence, Union
from apache_beam import pvalue
import apache_beam as beam

@@ -313,6 +314,7 @@ def __init__(self,
pipeline_dp.ExplainComputationReport] = None):
"""Initialize.


Args:
count_params: parameters for calculation
public_partitions: A collection of partition keys that will be
@@ -647,3 +649,196 @@ def expand(self, pcol: pvalue.PCollection):
# dp_result : (partition_key, result)

return dp_result


# Cache for namedtuple types. It should only be used in the
# '_get_or_create_named_tuple()' function.
_agg_named_tuple_cache = {}


def _get_or_create_named_tuple(type_name: str,
field_names: tuple) -> 'MetricsTuple':
"""Creates namedtuple type with a custom serializer."""

# The custom serializer is required for supporting serialization of
# namedtuples in Apache Beam.
cache_key = (type_name, field_names)
named_tuple = _agg_named_tuple_cache.get(cache_key)
if named_tuple is None:
named_tuple = collections.namedtuple(type_name, field_names)
named_tuple.__reduce__ = lambda self: (_create_named_tuple_instance,
(type_name, field_names,
tuple(self)))
_agg_named_tuple_cache[cache_key] = named_tuple
return named_tuple


def _create_named_tuple_instance(type_name: str, field_names: tuple, values):
return _get_or_create_named_tuple(type_name, field_names)(*values)
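
# A hypothetical illustration of why the custom __reduce__ above is needed:
# namedtuple classes created at runtime cannot be located by pickle via a
# module attribute lookup, so Beam could not serialize result rows without it.
# With the hook in place, a round trip works, e.g.:
#
#   row = _create_named_tuple_instance("ResultTuple", ("count", "mean"), (3, 4.2))
#   assert pickle.loads(pickle.dumps(row)) == row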


@dataclasses.dataclass
class _ValueAggregationParams:
value_extractor: Callable
metrics: Union[pipeline_dp.Metrics, Sequence[pipeline_dp.Metrics]]
output_col_name: str
value_range: aggregate_params.Range


class AggregationBuilder(PrivatePTransform):
"""Transform class for performing multiple aggregations on a PrivatePCollection."""

def __init__(self,
params: aggregate_params.AggregationBuilderParams,
public_partitions=None,
label=None):
super().__init__(return_anonymized=True, label=label)
self._params = params
self._public_partitions = public_partitions
self._aggregations = []
self._perform_count = False
self._perform_privacy_id_count = False

def count(self) -> "AggregationBuilder":
"""Adds COUNT aggregation to the computation graph and returns self."""
self._perform_count = True
return self

def privacy_id_count(self) -> "AggregationBuilder":
"""Adds PRIVACY_ID_COUNT aggregation to the computation graph and returns self."""
self._perform_privacy_id_count = True
return self

def aggregate_value(
self, value_extractor: Callable,
metrics: Union[pipeline_dp.Metrics, Sequence[pipeline_dp.Metrics]],
output_column_prefix: str,
value_range: aggregate_params.Range) -> "AggregationBuilder":
"""Adds value aggregations to the computation graph and returns self.

Args:
value_extractor: a function which, given an input element, will
return the value for aggregation.
metrics: 1 or more metrics to compute. Supported Metrics are
SUM, MEAN, VARIANCE, PERCENTILE.
output_column_prefix: the prefix of the column for the resulting
aggregate metric. The output columns for 'metric' will be
f"{output_column_prefix}_{metric}".
            value_range: the range of the values being aggregated."""
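        # For example, metrics=[pipeline_dp.Metrics.MEAN] with
        # output_column_prefix="rating" yields an output column named
        # "rating_mean" (per the naming rule described above).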

if isinstance(metrics, pipeline_dp.aggregate_params.Metric):
metrics = [metrics]

for m in metrics:
if m == pipeline_dp.Metrics.COUNT:
raise ValueError(
"Use AggregationBuilder.count() for computing DP count.")
if m == pipeline_dp.Metrics.PRIVACY_ID_COUNT:
raise ValueError(
"Use AggregationBuilder.privacy_id_count() for computing DP privacy id count."
)
if m == pipeline_dp.Metrics.VECTOR_SUM:
raise NotImplementedError(
"Vector sum is not implemented in AggregationBuilder")

self._aggregations.append(
_ValueAggregationParams(value_extractor, metrics,
output_column_prefix, value_range))
return self

def expand(self, pcol: pvalue.PCollection) -> pvalue.PCollection:
backend, dp_engine = self._create_dp_engine()
privacy_id_extractor = lambda x: x[0]
partition_extractor = lambda x: self._params.partition_extractor(x[1])

# Partition selection.
if self._public_partitions is None:
            # Perform private partition selection once and then use the
            # selected partitions as public partitions.
selection_partition_params = aggregate_params.SelectPartitionsParams(
max_partitions_contributed=self._params.
max_partitions_contributed)
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor, partition_extractor)
partitions = dp_engine.select_partitions(
pcol, selection_partition_params, data_extractors)
else:
partitions = self._public_partitions

output_pcollections = []

# COUNT and PRIVACY_ID_COUNT aggregations.
count_metrics = []
if self._perform_count:
count_metrics.append(pipeline_dp.Metrics.COUNT)
if self._perform_privacy_id_count:
count_metrics.append(pipeline_dp.Metrics.PRIVACY_ID_COUNT)
if count_metrics:
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor,
partition_extractor,
value_extractor=lambda _: 0)
output_pcollections.append(
dp_engine.aggregate(
pcol, self._create_aggregate_params(count_metrics),
data_extractors, partitions))

# Value aggregations.
for value_aggregation_params in self._aggregations:
            # Bind the current params via a default argument so that each
            # aggregation keeps its own value extractor when the lambda is
            # serialized (a plain closure would late-bind to the last one).
            value_extractor = (lambda x, params=value_aggregation_params:
                               params.value_extractor(x[1]))
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor, partition_extractor, value_extractor)
dp_engine_aggregate_params = self._create_aggregate_params(
value_aggregation_params.metrics,
value_aggregation_params.value_range)
output_pcollections.append(
dp_engine.aggregate(pcol, dp_engine_aggregate_params,
data_extractors, partitions))

n_aggregations = len(output_pcollections)
if count_metrics:
col_names = [""]
else:
col_names = []
col_names.extend(
[agg.output_col_name + "_" for agg in self._aggregations])

        def pack_per_partition(join_data):
            _, join_dict = join_data
            column_names = []
            values = []
            for i in range(n_aggregations):
                metric_tuple = join_dict[i][0]
                metric_names = sorted(metric_tuple._fields)
                for m in metric_names:
                    column_names.append(f"{col_names[i]}{m}")
                    values.append(getattr(metric_tuple, m))
            return _create_named_tuple_instance("ResultTuple",
                                                tuple(column_names),
                                                tuple(values))
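
        # The packed result is one flat namedtuple per partition key, roughly
        # ResultTuple(count=..., rating1_mean=..., rating2_mean=...) for the
        # builder chain used in the example file above (value columns follow
        # the f"{output_column_prefix}_{metric}" rule from aggregate_value()).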

dict_pcollections = dict(enumerate(output_pcollections))
return dict_pcollections | backend._ulg.unique(
"CoGroup by partition key") >> beam.CoGroupByKey(
) | backend._ulg.unique("Pack per partition") >> beam.Map(
pack_per_partition)

def _create_aggregate_params(
self,
metrics: list[pipeline_dp.Metrics],
range: Optional[aggregate_params.Range] = None
) -> aggregate_params.AggregateParams:
min_value = max_value = None
if range is not None:
min_value, max_value = range.min_value, range.max_value
return pipeline_dp.AggregateParams(
metrics=metrics,
noise_kind=self._params.noise_kind,
max_partitions_contributed=self._params.max_partitions_contributed,
max_contributions_per_partition=self._params.
max_contributions_per_partition,
max_contributions=self._params.max_contributions,
min_value=min_value,
max_value=max_value,
)
Loading