Multiple Aggregates #254

Open
wants to merge 8 commits into
base: main

preethiraghavan1
Contributor

Description

Please include a summary of the change, the motivation, and any additional context that will help others understand your PR. If it closes one or more open issues, please tag them as described here.

Affected Dependencies

How has this been tested?

  • Unit test

Checklist

Collaborator

@dvadym dvadym left a comment


Thanks, the approach is great! I left some comments.


def expand(self, pcol: pvalue.PCollection):
    columns = {
        self.col_name[i]: pcol | "agg " + str(i) >> self._getTransform(

Adding numbers to the labels is the right way to solve the problem of duplicate labels!

Nit: use an f-string for the label, e.g. f"Aggregation{i}".

A note on adding numbers to label names: in BeamBackend this was implemented with the UniqueLabelGenerator class. Here the case is simple enough that I think the current approach of appending numbers, instead of using UniqueLabelGenerator, makes sense.
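
For illustration, a minimal plain-Python sketch of the two labelling options mentioned in this comment, without Beam. `indexed_labels` mirrors the index-suffix approach with the suggested f-string. `SimpleUniqueLabels` is a hypothetical stand-in for a generator-style helper; it is not the actual UniqueLabelGenerator from BeamBackend, whose interface is not shown in this thread.

```python
from typing import List, Set


def indexed_labels(count: int, prefix: str = "Aggregation") -> List[str]:
    """Builds one label per aggregation by appending its index."""
    return [f"{prefix}{i}" for i in range(count)]


class SimpleUniqueLabels:
    """Hypothetical helper: makes any requested label unique with a numeric suffix."""

    def __init__(self) -> None:
        self._issued: Set[str] = set()

    def unique(self, label: str) -> str:
        # Try the plain label first, then label_2, label_3, ... until one is free.
        candidate, n = label, 1
        while candidate in self._issued:
            n += 1
            candidate = f"{label}_{n}"
        self._issued.add(candidate)
        return candidate


labels = indexed_labels(3)
gen = SimpleUniqueLabels()
generated = [gen.unique("Aggregation") for _ in range(3)]
```

When the number of transforms is known up front, as in the loop over aggregations here, the index suffix is enough; a stateful generator is mainly useful when labels are requested from many unrelated places.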

class Aggregate(PrivatePTransform):
    """Transform class for performing multiple aggregations on a PrivatePCollection."""

    def __init__(self, label=None):

I've thought about this in more detail: in most use cases the aggregations will share the same parameters (and sharing parameters will also help optimize the performance and utility of queries). Could you please

  1. add an argument params of type AggregateParams.
  2. add an argument partition_extractor_fn.

Those arguments will be used in each aggregation.

        col_name: name of the column for the resulting aggregate value.
        agg_type: type of pipeline_dp.Metrics identifying the aggregate
            to calculate."""
    return _Aggregate([args], col_name=[col_name], agg_type=[agg_type])

It looks like we can have just one class, Aggregate, without _Aggregate. Namely:

  1. aggregate_value saves the information about the aggregation in a member variable and returns self.
  2. expand works as in _Aggregate.

The advantage is that it will be simpler, with no need to create multiple instances of _Aggregate. WDYT?
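
The single-class idea above could be sketched roughly as follows. This is plain Python with no Beam or differential-privacy logic; `AggregationSpec`, the string metric names, and the non-private `expand` body are assumptions made only to show how `aggregate_value` can record a request and return `self`.

```python
from typing import Any, Callable, Dict, List, NamedTuple, Optional


class AggregationSpec(NamedTuple):
    """Hypothetical record of one requested aggregation."""
    value_extractor: Callable[[Any], float]
    col_name: str
    agg_type: str  # stand-in for a pipeline_dp.Metrics value


class Aggregate:
    """Sketch only: collects aggregation requests, then consumes them in expand."""

    def __init__(self, label: Optional[str] = None) -> None:
        self.label = label
        self._specs: List[AggregationSpec] = []

    def aggregate_value(self, value_extractor: Callable[[Any], float],
                        col_name: str, agg_type: str) -> "Aggregate":
        # Remember the request and return self so that calls can be chained.
        self._specs.append(AggregationSpec(value_extractor, col_name, agg_type))
        return self

    def expand(self, rows: List[Any]) -> Dict[str, float]:
        # Placeholder for the real expand: a plain, non-private aggregation
        # per spec, just to show where each stored spec is used.
        ops = {"SUM": sum, "COUNT": len}
        return {
            spec.col_name: ops[spec.agg_type](
                [spec.value_extractor(row) for row in rows])
            for spec in self._specs
        }


agg = (Aggregate(label="stats")
       .aggregate_value(lambda r: r["price"], col_name="total", agg_type="SUM")
       .aggregate_value(lambda r: r["price"], col_name="n", agg_type="COUNT"))
result = agg.expand([{"price": 2.0}, {"price": 3.0}])
```

With this shape only one transform instance exists per query, and the stored specs are the single place where `expand` looks up what to compute.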

_agg_named_tuple_cache = {}


def _get_or_create_named_tuple(type_name: str,

Nice, this is the correct approach for generating dynamic tuples!
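
As a hedged illustration of why a cache helps here: `collections.namedtuple` creates a new class on every call, so caching keeps the type identical across elements. The function below is a sketch, not the PR's implementation; the PR's signature is truncated in the snippet above, so the `field_names` parameter and the cache key are assumptions.

```python
from collections import namedtuple
from typing import Dict, Tuple

# Cache of generated tuple types, keyed by type name and field names.
_named_tuple_cache: Dict[Tuple[str, Tuple[str, ...]], type] = {}


def get_or_create_named_tuple(type_name: str,
                              field_names: Tuple[str, ...]) -> type:
    """Returns a cached namedtuple type, creating it on first use."""
    key = (type_name, tuple(field_names))
    if key not in _named_tuple_cache:
        _named_tuple_cache[key] = namedtuple(type_name, key[1])
    return _named_tuple_cache[key]


fields = ("pid", "total", "n")
RowType = get_or_create_named_tuple("AggregatesTuple", fields)
row = RowType("user1", 5.0, 2)
same_type = get_or_create_named_tuple("AggregatesTuple", fields)
```

Because the second call returns the cached class, rows built in different places compare and type-check consistently.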

    def __init__(self, label=None):
        super().__init__(return_anonymized=True, label=label)

    def aggregate_value(self, *args, col_name: str,

Since all shared parameters will be provided in the constructor, only a few parameters are needed here, e.g. value_extractor

'AggregatesTuple', tuple(["pid"] + [k for k in x[1]]),
tuple([x[0]] + [x[1][k][0] for k in x[1]])))

def _getTransform(self, agg_type: pipeline_dp.Metrics, *args):

I'd suggest using DPEngine.aggregate instead of the PrivatePTransforms Mean, Sum and Count. The benefit is that in the future we can optimize performance and utility by computing multiple aggregations with DPEngine.aggregate.


dvadym commented Apr 11, 2022

The Aggregate class can aggregate with only one partitioning (a single partition extractor), but over multiple values.

For each value aggregation we run DPEngine.aggregate, for which we need AggregateParams:

class AggregateParams:
    metrics: Iterable[Metrics]
    max_partitions_contributed: int
    max_contributions_per_partition: int
    budget_weight: float = 1
    min_value: float = None
    max_value: float = None
    public_partitions: Any = None
    noise_kind: NoiseKind = NoiseKind.LAPLACE
    custom_combiners: Iterable['CustomCombiner'] = None

Some of those parameters are common to all values being aggregated (i.e. they need to be specified in the Aggregate constructor), and some are specific to a value (i.e. they belong in the aggregate_value arguments).

1. def __init__(self, partition_extractor, common_params: CommonAggregateParams, public_partitions=None)
2. def aggregate_value(self, value_extractor, value_params: ValueParams, output_col: str)

class CommonAggregateParams:  # name AggregateParams is already taken, maybe other name?
    max_partitions_contributed: int
    max_contributions_per_partition: int
    noise_kind: NoiseKind = NoiseKind.LAPLACE

class ValueParams:
    budget_weight: float = 1
    min_value: float = None
    max_value: float = None
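
A rough sketch of how the proposed split could be recombined for each value, written with dataclasses so it runs without pipeline_dp. The `NoiseKind` enum here is a local stand-in, and `merge_params` with its string metric argument is a hypothetical helper, not an existing API.

```python
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Optional


class NoiseKind(Enum):
    """Local stand-in for pipeline_dp.NoiseKind."""
    LAPLACE = "laplace"
    GAUSSIAN = "gaussian"


@dataclass
class CommonAggregateParams:
    """Parameters shared by every value, given once in the constructor."""
    max_partitions_contributed: int
    max_contributions_per_partition: int
    noise_kind: NoiseKind = NoiseKind.LAPLACE


@dataclass
class ValueParams:
    """Parameters specific to one aggregated value."""
    budget_weight: float = 1
    min_value: Optional[float] = None
    max_value: Optional[float] = None


def merge_params(common: CommonAggregateParams, value: ValueParams,
                 metric: str) -> dict:
    """Hypothetical helper: keyword arguments for one per-value aggregation."""
    return {"metrics": [metric], **asdict(common), **asdict(value)}


common = CommonAggregateParams(max_partitions_contributed=2,
                               max_contributions_per_partition=1)
merged = merge_params(common, ValueParams(min_value=0, max_value=10), "SUM")
```

The merged dictionary carries the same field names as the AggregateParams listing quoted earlier in this comment, apart from public_partitions and custom_combiners, which this sketch leaves out.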
