WIP: Multi aggregate #296

Open
wants to merge 14 commits into base: main
94 changes: 94 additions & 0 deletions pipeline_dp/private_beam.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import collections
import dataclasses
import typing
from apache_beam.transforms import ptransform
@@ -534,3 +535,96 @@ def expand(self, pcol: pvalue.PCollection):
        # dp_result : (partition_key, result)

        return dp_result


# Cache for namedtuple types. It should be used only in the
# '_get_or_create_named_tuple()' function.
_agg_named_tuple_cache = {}


def _get_or_create_named_tuple(type_name: str, field_names: tuple) -> type:
    """Creates a namedtuple type with a custom serializer."""

    # The custom serializer is required to support serialization of
    # namedtuples in Apache Beam.
    cache_key = (type_name, field_names)
    named_tuple = _agg_named_tuple_cache.get(cache_key)
    if named_tuple is None:
        named_tuple = collections.namedtuple(type_name, field_names)
        named_tuple.__reduce__ = lambda self: (_create_named_tuple_instance,
                                               (type_name, field_names,
                                                tuple(self)))
        _agg_named_tuple_cache[cache_key] = named_tuple
    return named_tuple


def _create_named_tuple_instance(type_name: str, field_names: tuple, values):
    return _get_or_create_named_tuple(type_name, field_names)(*values)
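
A minimal sketch of why the custom __reduce__ is needed (standalone Python, not part of this PR): classes created at runtime by collections.namedtuple cannot be pickled by reference, because pickle fails to look them up by module and name, which is exactly what happens when Beam ships elements between workers. Routing through the module-level factory rebuilds them from the cache:

import pickle

# Default pickling of an instance of a dynamically created namedtuple
# class raises PicklingError; the custom __reduce__ instead records a
# call to _create_named_tuple_instance, which recreates the type (via
# the cache) when unpickling.
row = _create_named_tuple_instance('AggregatesTuple', ('pid', 'sum'),
                                   ('pk1', 130))
restored = pickle.loads(pickle.dumps(row))
assert restored == row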


class Aggregate(PrivatePTransform):
    """Transform class for performing multiple aggregations on a PrivatePCollection."""

    def __init__(self, label=None):
        super().__init__(return_anonymized=True, label=label)

    def aggregate_value(self, *args, col_name: str,
                        agg_type: pipeline_dp.Metrics):
        """Returns the _Aggregate transform corresponding to agg_type.

        Args:
            args: args for the aggregate transform (e.g. SumParams).
            col_name: name of the column for the resulting aggregate value.
            agg_type: the pipeline_dp.Metrics value identifying the aggregate
                to calculate.
        """
        return _Aggregate([args], col_name=[col_name], agg_type=[agg_type])
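
For orientation, a hedged usage sketch (mirroring the test added below in tests/private_beam_test.py): each chained aggregate_value call appends one aggregation, and the per-metric results are joined by partition key into a single named tuple.

# Hypothetical pipeline snippet; the parameter objects are as in the test.
result = private_collection | private_beam.Aggregate().aggregate_value(
    privacy_id_count_params,
    col_name='privacy_id_count',
    agg_type=pipeline_dp.Metrics.PRIVACY_ID_COUNT).aggregate_value(
        sum_params, col_name='sum', agg_type=pipeline_dp.Metrics.SUM)
# Each output element has the shape:
# AggregatesTuple(pid=<partition key>, privacy_id_count=..., sum=...)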


class _Aggregate(PrivatePTransform):

    def __init__(self,
                 *args,
                 col_name: str,
                 agg_type: pipeline_dp.Metrics,
                 label: Optional[str] = None):
        super().__init__(return_anonymized=True, label=label)
        self.args = args
        self.col_name = col_name
        self.agg_type = agg_type

    def aggregate_value(self, *args, col_name: str,
                        agg_type: pipeline_dp.Metrics):
        return _Aggregate(list(*self.args) + [args],
                          col_name=list(self.col_name) + [col_name],
                          agg_type=list(self.agg_type) + [agg_type])

    def expand(self, pcol: pvalue.PCollection):
        columns = {
            self.col_name[i]: pcol | "agg " + str(i) >> self._getTransform(
                self.agg_type[i], *self.args[0][i])
            for i in range(len(self.col_name))
        }
        return columns | 'LeftJoiner: Combine' >> beam.CoGroupByKey(
        ) | beam.Map(lambda x: _create_named_tuple_instance(
            'AggregatesTuple', tuple(["pid"] + [k for k in x[1]]),
            tuple([x[0]] + [x[1][k][0] for k in x[1]])))

    def _getTransform(self, agg_type: pipeline_dp.Metrics, *args):
        """Gets the transform corresponding to agg_type."""
        transform = None
        if agg_type == pipeline_dp.Metrics.MEAN:
            transform = Mean(*args)
        elif agg_type == pipeline_dp.Metrics.SUM:
            transform = Sum(*args)
        elif agg_type == pipeline_dp.Metrics.COUNT:
            transform = Count(*args)
        elif agg_type == pipeline_dp.Metrics.PRIVACY_ID_COUNT:
            transform = PrivacyIdCount(*args)
        else:
            raise NotImplementedError(
                "Transform for agg_type: %s is not implemented." % agg_type)
        transform.set_additional_parameters(
            budget_accountant=self._budget_accountant)
        return transform
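
To make the expand step concrete: CoGroupByKey receives the dict of per-metric keyed PCollections and emits (key, {col_name: [value]}) pairs, which the final Map flattens into one named tuple per partition. A small sketch of that flattening with an assumed input element (plain Python, no Beam):

# Assumed shape of one CoGroupByKey output element.
x = ('pk1', {'privacy_id_count': [60], 'sum': [130]})
row = _create_named_tuple_instance(
    'AggregatesTuple', tuple(["pid"] + [k for k in x[1]]),
    tuple([x[0]] + [x[1][k][0] for k in x[1]]))
# row == AggregatesTuple(pid='pk1', privacy_id_count=60, sum=130)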
68 changes: 68 additions & 0 deletions tests/private_beam_test.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import unittest
import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
@@ -41,6 +42,14 @@ def value_per_key_within_tolerance(expected, actual, tolerance):
        return actual[0] == expected[0] and abs(actual[1] -
                                                expected[1]) <= tolerance

    @staticmethod
    def value_per_key_within_tolerance_named_tuple(expected, actual, tolerance):
        # Every field must either match exactly (e.g. the partition key) or
        # differ by at most `tolerance`.
        expected_dict = expected._asdict()
        actual_dict = actual._asdict()
        return all([(actual_dict[k] == expected_dict[k]) or
                    (abs(actual_dict[k] - expected_dict[k]) <= tolerance)
                    for k in actual_dict])
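
As a quick illustration with hypothetical values: the equality check short-circuits for the string partition key, so the numeric subtraction only ever applies to the count and sum fields.

Row = collections.namedtuple('Row', ['pid', 'sum'])
# 133.5 is within tolerance 10 of 130, and the keys match exactly.
assert PrivateBeamTest.value_per_key_within_tolerance_named_tuple(
    Row('pk1', 130), Row('pk1', 133.5), 10)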

    def test_make_private_transform_succeeds(self):
        runner = fn_api_runner.FnApiRunner()
        with beam.Pipeline(runner=runner) as pipeline:
@@ -890,6 +899,65 @@ def test_combine_per_returns_sensible_result(self):
                    equals_fn=lambda e, a: PrivateBeamTest.
                    value_per_key_within_tolerance(e, a, 10.0)))

    def test_multiple_aggregates(self):
        with TestPipeline() as pipeline:
            # Arrange
            col = [(u, "pk1", 100) for u in range(30)]
            col += [(f"{u + 20}", "pk2", 100) for u in range(30)]
            col += [(f"{u + 30}", "pk1", -100.0) for u in range(30)]
            pcol = pipeline | 'Create produce' >> beam.Create(col)
            # Use very high epsilon and delta to minimize noise and test
            # flakiness.
            budget_accountant = budget_accounting.NaiveBudgetAccountant(
                total_epsilon=800, total_delta=0.999)
            private_collection = (
                pcol | 'Create private collection' >> private_beam.MakePrivate(
                    budget_accountant=budget_accountant,
                    privacy_id_extractor=lambda x: x[0]))

            privacy_id_count_params = aggregate_params.PrivacyIdCountParams(
                noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
                max_partitions_contributed=2,
                budget_weight=1,
                partition_extractor=lambda x: x[1])
            sum_params = aggregate_params.SumParams(
                noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
                max_partitions_contributed=2,
                max_contributions_per_partition=3,
                min_value=1.55,
                max_value=2.7889,
                budget_weight=1,
                partition_extractor=lambda x: x[1],
                value_extractor=lambda x: x[2])

            # Act
            result = private_collection | private_beam.Aggregate(
            ).aggregate_value(
                privacy_id_count_params,
                col_name='privacy_id_count',
                agg_type=pipeline_dp.Metrics.PRIVACY_ID_COUNT).aggregate_value(
                    sum_params,
                    col_name='sum',
                    agg_type=pipeline_dp.Metrics.SUM)
            budget_accountant.compute_budgets()

            # Assert
            # This is a health check to validate that the result is sensible.
            # Hence, we use a very large tolerance to reduce test flakiness.
            beam_util.assert_that(
                result,
                beam_util.equal_to(
                    [
                        collections.namedtuple(
                            "AggregatesTuple",
                            ['pid', 'privacy_id_count', 'sum'])('pk1', 60, 130),
                        collections.namedtuple(
                            "AggregatesTuple",
                            ['pid', 'privacy_id_count', 'sum'])('pk2', 30, 83)
                    ],
                    equals_fn=lambda e, a: PrivateBeamTest.
                    value_per_key_within_tolerance_named_tuple(e, a, 10)))


class SumCombineFn(private_beam.PrivateCombineFn):
"""Test-only, not private combine_fn."""