Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
RamSaw committed Jul 27, 2023
1 parent 3459ee9 commit 26fbe27
Showing 1 changed file with 34 additions and 33 deletions.
67 changes: 34 additions & 33 deletions analysis/cross_partition_combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def _merge_utility_reports(report1: metrics.UtilityReport,


def _average_utility_report(report: metrics.UtilityReport, sums_actual: Tuple,
total_weights: Tuple[float]) -> None:
total_weights: Tuple) -> None:
"""Averages fields of the 'report' across partitions."""
if not report.metric_errors:
return
Expand All @@ -272,59 +272,60 @@ def _average_utility_report(report: metrics.UtilityReport, sums_actual: Tuple,
scaling_factor)


def partition_size_weight_fn(
        per_partition_metrics: metrics.PerPartitionMetrics
) -> Tuple[float, ...]:
    """Returns one weight per metric: the `sum` of the matching metric error.

    Args:
        per_partition_metrics: metrics of a single partition. Only its
          `metric_errors` sequence is read.

    Returns:
        A tuple with one entry per element of `metric_errors`, in the same
        order, where each entry is that element's `sum` attribute. The tuple
        is empty when `metric_errors` is empty.
    """
    # The result has variable length (one weight per metric), hence
    # Tuple[float, ...] rather than the 1-tuple type Tuple[float].
    return tuple(error.sum for error in per_partition_metrics.metric_errors)


def equal_weight_fn(
        per_partition_metrics: metrics.PerPartitionMetrics
) -> Tuple[float, ...]:
    """Returns the partition's keep probability as the weight of each metric.

    The weight `partition_selection_probability_to_keep` is repeated once per
    element of `metric_errors`, so every metric of the partition gets the
    same weight.

    Assumes that `partition_selection_probability_to_keep` is 1 for public
    partitions and that all public partitions (presumably including empty
    ones - TODO confirm) are processed via `create_accumulator`.

    For public partitions the weights are therefore 1 and the result is a
    normal average, because the total weight equals the total number of
    partitions. For private partitions the result is a weighted average and
    the total weight equals the mean number of kept partitions
    (`partitions.kept_partitions.mean`).

    Args:
        per_partition_metrics: metrics of a single partition.

    Returns:
        A tuple of length `len(per_partition_metrics.metric_errors)` whose
        entries all equal `partition_selection_probability_to_keep`.
    """
    keep_probability = (
        per_partition_metrics.partition_selection_probability_to_keep)
    return (keep_probability,) * len(per_partition_metrics.metric_errors)


class CrossPartitionCombiner(pipeline_dp.combiners.Combiner):
"""A combiner for aggregating error metrics across partitions"""
# Accumulator is a tuple of
# 1. The sum of non dp metrics, which is used for averaging of error
# metrics.
# 2. metrics.UtilityReport contains error metrics.
AccumulatorType = Tuple[Tuple, metrics.UtilityReport, Tuple[float]]

# Weight function: builds a tuple holding the `sum` attribute of every
# element of `metrics.metric_errors`, in order.
# NOTE(review): in this diff view this appears to be the pre-change variant
# of the module-level `partition_size_weight_fn` - confirm before relying
# on it.
# NOTE(review): the parameter name `metrics` shadows the `metrics` module
# inside the body.
@staticmethod
def partition_contribution_weighing_function(
metrics: metrics.PerPartitionMetrics) -> Tuple[float]:
return tuple(map(lambda el: el.sum, metrics.metric_errors))

# Weight function: repeats `partition_selection_probability_to_keep` once
# per element of `metrics.metric_errors`.
# NOTE(review): in this diff view this appears to be the pre-change variant
# of the module-level `equal_weight_fn` - confirm before relying on it.
@staticmethod
def equal_weighing_function(
metrics: metrics.PerPartitionMetrics) -> Tuple[float]:
"""
Assumes that partition_selection_probability_to_keep for public
partitions is 1 and that all public partitions (presumably including
empty ones - TODO confirm) are processed via `create_accumulator`.
For public partitions the weights will be 1, and we will do normal
averaging because the total weight will be equal to the total number of
partitions. For private partitions we will do a weighted average and
the total weight will be equal to the mean number of kept partitions
(`partitions.kept_partitions.mean`).
"""
return (metrics.partition_selection_probability_to_keep,) * len(
metrics.metric_errors)

def __init__(
self,
dp_metrics: List[pipeline_dp.Metrics],
public_partitions: bool,
weighing_function: Callable[[metrics.PerPartitionMetrics],
Tuple[float]] = equal_weighing_function):
# 3. Accumulated weight. Used to weigh parts during their aggregation.
AccumulatorType = Tuple[Tuple, metrics.UtilityReport, Tuple]

# Stores the combiner configuration on the instance.
#   dp_metrics: list of metrics; later forwarded to
#     `_per_partition_to_utility_report` by `create_accumulator`.
#   public_partitions: flag forwarded to `_per_partition_to_utility_report`
#     together with `dp_metrics`.
#   weight_fn: callable taking per-partition metrics and returning a tuple
#     of weights; defaults to `equal_weight_fn`.
def __init__(self,
dp_metrics: List[pipeline_dp.Metric],
public_partitions: bool,
weight_fn: Callable[[metrics.PerPartitionMetrics],
Tuple[float]] = equal_weight_fn):
self._dp_metrics = dp_metrics
self._public_partitions = public_partitions
# NOTE(review): `weighing_function` is not a parameter of this signature;
# the next line looks like the removed side of the diff, superseded by the
# `_weight_fn` assignment below - confirm.
self.weighing_function = weighing_function
self._weight_fn = weight_fn

# Builds the accumulator for one partition as a 3-tuple:
#   1. a tuple with the `sum` of every element of `metrics.metric_errors`;
#   2. the result of `_per_partition_to_utility_report(metrics,
#      self._dp_metrics, self._public_partitions)`;
#   3. the weights returned by the configured weight function for `metrics`.
# NOTE(review): the last two lines are alternative endings of the same
# return statement (old `self.weighing_function` vs. new `self._weight_fn`);
# only one of them belongs in the real file - confirm against the repository.
def create_accumulator(
self, metrics: metrics.PerPartitionMetrics) -> AccumulatorType:
actual_metrics = tuple(me.sum for me in metrics.metric_errors)
return actual_metrics, _per_partition_to_utility_report(
metrics, self._dp_metrics,
self._public_partitions), self.weighing_function(metrics)
self._public_partitions), self._weight_fn(metrics)

# Merges two accumulators into one 3-tuple:
#   - the sums are added element-wise (`zip` stops at the shorter tuple);
#   - `_merge_utility_reports(report1, report2)` is called and `report1` is
#     returned, so the helper presumably merges `report2` into `report1`
#     in place - TODO confirm against its definition;
#   - the weights are added element-wise.
def merge_accumulators(self, acc1: AccumulatorType,
acc2: AccumulatorType) -> AccumulatorType:
sum_actual1, report1, weight1 = acc1
sum_actual2, report2, weight2 = acc2
sum_actual = tuple(x + y for x, y in zip(sum_actual1, sum_actual2))
_merge_utility_reports(report1, report2)
# NOTE(review): the return below and the two lines after it compute the
# same value; they look like the old and new side of the diff - confirm.
return sum_actual, report1, tuple(map(sum, zip(weight1, weight2)))
weight = tuple(map(sum, zip(weight1, weight2)))
return sum_actual, report1, weight

def compute_metrics(self, acc: AccumulatorType) -> metrics.UtilityReport:
"""Returns UtilityReport with final metrics."""
Expand Down

0 comments on commit 26fbe27

Please sign in to comment.