
Merge branch 'main' into fix_utility_analysis2
dvadym committed Aug 2, 2023
2 parents 5b91acd + 0dee86d commit bfefada
Showing 30 changed files with 1,083 additions and 335 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
@@ -11,7 +11,7 @@ jobs:
max-parallel: 3
matrix:
os: [ubuntu-latest]
python-version: [3.7]
python-version: [3.8]
runs-on: ${{ matrix.os }}

steps:
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -40,7 +40,7 @@ jobs:
max-parallel: 3
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: ["3.7", "3.8", "3.9"]
python-version: ["3.8", "3.9", "3.10"]

steps:
- uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion README.md
@@ -87,7 +87,7 @@ on Apache Beam:

`pip install pipeline-dp apache-beam`.

Supported Python version >= 3.7.
Supported Python version >= 3.8.

**Note for Apple Silicon users:** PipelineDP pip package is currently available only
for x86 architecture. The reason is that [PyDP](https://github.com/OpenMined/PyDP) does not
113 changes: 72 additions & 41 deletions analysis/cross_partition_combiners.py
@@ -17,13 +17,13 @@
import pipeline_dp
from analysis import metrics
import dataclasses
from typing import Iterable, List, Optional, Tuple
from typing import List, Tuple, Callable
import math


def _sum_metrics_to_data_dropped(
sum_metrics: metrics.SumMetrics, partition_keep_probability: float,
dp_metric: pipeline_dp.Metrics) -> metrics.DataDropInfo:
dp_metric: pipeline_dp.Metric) -> metrics.DataDropInfo:
"""Finds Data drop information from per-partition metrics."""
# TODO(dvadym): implement for Sum
assert dp_metric != pipeline_dp.Metrics.SUM, "Cross-partition metrics are not implemented for SUM"
@@ -63,7 +63,8 @@ def _create_contribution_bounding_errors(


def _sum_metrics_to_value_error(sum_metrics: metrics.SumMetrics,
keep_prob: float) -> metrics.ValueErrors:
keep_prob: float,
weight: float) -> metrics.ValueErrors:
"""Creates ValueErrors from per-partition metrics."""
value = sum_metrics.sum
bounding_errors = _create_contribution_bounding_errors(sum_metrics)
@@ -83,12 +84,18 @@ def _sum_metrics_to_value_error(sum_metrics: metrics.SumMetrics,
l1=l1,
rmse_with_dropped_partitions=rmse_with_dropped_partitions,
l1_with_dropped_partitions=l1_with_dropped_partitions)
if weight != 1:
# Weight per-partition result for computing weighted sum.
_multiply_float_dataclasses_field(result,
weight,
fields_to_ignore=["noise_std"])
return result


def _sum_metrics_to_metric_utility(
sum_metrics: metrics.SumMetrics, dp_metric: pipeline_dp.Metrics,
partition_keep_probability: float) -> metrics.MetricUtility:
sum_metrics: metrics.SumMetrics, dp_metric: pipeline_dp.Metric,
partition_keep_probability: float,
partition_weight: float) -> metrics.MetricUtility:
"""Creates cross-partition MetricUtility from 1 partition utility.
Attributes:
@@ -99,8 +106,9 @@ def _sum_metrics_to_metric_utility(
data_dropped = _sum_metrics_to_data_dropped(sum_metrics,
partition_keep_probability,
dp_metric)
absolute_error = _sum_metrics_to_value_error(
sum_metrics, keep_prob=partition_keep_probability)
absolute_error = _sum_metrics_to_value_error(sum_metrics,
partition_keep_probability,
partition_weight)
relative_error = absolute_error.to_relative(sum_metrics.sum)

return metrics.MetricUtility(metric=dp_metric,
@@ -187,13 +195,13 @@ def _multiply_float_dataclasses_field(dataclass,

def _per_partition_to_utility_report(
per_partition_utility: metrics.PerPartitionMetrics,
dp_metrics: List[pipeline_dp.Metrics],
public_partitions: bool) -> metrics.UtilityReport:
dp_metrics: List[pipeline_dp.Metric], public_partitions: bool,
partition_weight: float) -> metrics.UtilityReport:
"""Converts per-partition metrics to cross-partition utility report."""
# Fill partition selection metrics.
if public_partitions:
prob_to_keep = 1
is_empty_partition = False # TODO(dvadym): compute this
is_empty_partition = per_partition_utility.raw_statistics.count == 0
partition_metrics = _partition_metrics_public_partitions(
is_empty_partition)
else:
@@ -208,7 +216,7 @@
dp_metrics):
metric_errors.append(
_sum_metrics_to_metric_utility(metric_error, dp_metric,
prob_to_keep))
prob_to_keep, partition_weight))

return metrics.UtilityReport(configuration_index=-1,
partitions_info=partition_metrics,
@@ -249,26 +257,41 @@ def _merge_utility_reports(report1: metrics.UtilityReport,
_merge_metric_utility(utility1, utility2)


def _average_utility_report(report: metrics.UtilityReport,
public_partitions: bool,
sums_actual: Tuple) -> None:
def _average_utility_report(report: metrics.UtilityReport, sums_actual: Tuple,
total_weight: float) -> None:
"""Averages fields of the 'report' across partitions."""
partitions = report.partitions_info
if public_partitions:
num_output_partitions = partitions.num_dataset_partitions + partitions.num_empty_partitions
else:
num_output_partitions = partitions.kept_partitions.mean
_multiply_float_dataclasses_field(report.partitions_info,
1.0 / num_output_partitions)
if report.metric_errors:
for sum_actual, metric_error in zip(sums_actual, report.metric_errors):
_multiply_float_dataclasses_field(
metric_error,
1.0 / num_output_partitions,
fields_to_ignore=["noise_std", "ratio_data_dropped"])
scaling_factor = 1 if sum_actual == 0 else 1.0 / sum_actual
_multiply_float_dataclasses_field(metric_error.ratio_data_dropped,
scaling_factor)
if not report.metric_errors:
return

for sum_actual, metric_error in zip(sums_actual, report.metric_errors):
_multiply_float_dataclasses_field(
metric_error,
1.0 / total_weight,
fields_to_ignore=["noise_std", "ratio_data_dropped"])
scaling_factor = 1 if sum_actual == 0 else 1.0 / sum_actual
_multiply_float_dataclasses_field(metric_error.ratio_data_dropped,
scaling_factor)


def partition_size_weight_fn(
per_partition_metrics: metrics.PerPartitionMetrics) -> float:
"""Weights partitions according to their size."""
# Only one metric is calculated as of now.
return per_partition_metrics.metric_errors[0].sum


def equal_weight_fn(
per_partition_metrics: metrics.PerPartitionMetrics) -> float:
"""Weights partitions according to their probability to be kept."""
# For public partitions the weight is 1 and we do normal averaging, because
# the total weight equals the total number of partitions. The function
# assumes that partition_selection_probability_to_keep is 1 for public
# partitions and that all public partitions, including empty ones, are
# processed in CrossPartitionCombiner.
# For private partitions we do a weighted average, and the total weight
# equals the mean number of kept partitions
# (`partitions.kept_partitions.mean`).
return per_partition_metrics.partition_selection_probability_to_keep


class CrossPartitionCombiner(pipeline_dp.combiners.Combiner):
@@ -277,33 +300,41 @@ class CrossPartitionCombiner(pipeline_dp.combiners.Combiner):
# 1. The sum of non dp metrics, which is used for averaging of error
# metrics.
# 2. metrics.UtilityReport contains error metrics.
AccumulatorType = Tuple[Tuple, metrics.UtilityReport]

def __init__(self, dp_metrics: List[pipeline_dp.Metrics],
public_partitions: bool):
# 3. Accumulated weight. Used to calculate total weight after accumulation.
# During creation of accumulator in `create_accumulator` the initial weight
# is applied to metric errors of a partition.
AccumulatorType = Tuple[Tuple, metrics.UtilityReport, float]

def __init__(self,
dp_metrics: List[pipeline_dp.Metric],
public_partitions: bool,
weight_fn: Callable[[metrics.PerPartitionMetrics],
float] = equal_weight_fn):
self._dp_metrics = dp_metrics
self._public_partitions = public_partitions
self._weight_fn = weight_fn

def create_accumulator(
self, metrics: metrics.PerPartitionMetrics) -> AccumulatorType:
actual_metrics = tuple(me.sum for me in metrics.metric_errors)
weight = self._weight_fn(metrics)
return actual_metrics, _per_partition_to_utility_report(
metrics, self._dp_metrics, self._public_partitions)
metrics, self._dp_metrics, self._public_partitions, weight), weight

def merge_accumulators(self, acc1: AccumulatorType,
acc2: AccumulatorType) -> AccumulatorType:
sum_actual1, report1 = acc1
sum_actual2, report2 = acc2
sum_actual1, report1, weight1 = acc1
sum_actual2, report2, weight2 = acc2
sum_actual = tuple(x + y for x, y in zip(sum_actual1, sum_actual2))
_merge_utility_reports(report1, report2)
return sum_actual, report1
weight = weight1 + weight2
return sum_actual, report1, weight

def compute_metrics(self, acc: AccumulatorType) -> metrics.UtilityReport:
"""Returns UtilityReport with final metrics."""
sum_actual, report = acc
sum_actual, report, total_weight = acc
report_copy = copy.deepcopy(report)
_average_utility_report(report_copy, self._public_partitions,
sum_actual)
_average_utility_report(report_copy, sum_actual, total_weight)
return report_copy

def metrics_names(self):
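The weighting scheme introduced in this file can be summarized with a small standalone sketch: the weight is applied once in create_accumulator, weights are summed in merge_accumulators, and the final average divides by the total weight. The names below (PartitionError, WeightedErrorCombiner) are hypothetical and are not pipeline_dp APIs; this is only an illustration of the pattern.

# A minimal, self-contained sketch of the weighted cross-partition averaging
# pattern above. PartitionError and WeightedErrorCombiner are hypothetical
# names, not pipeline_dp APIs.
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass
class PartitionError:
    abs_error: float          # per-partition absolute error
    keep_probability: float   # probability that the partition is kept


# Accumulator: (weighted error sum, total weight).
AccumulatorType = Tuple[float, float]


def equal_weight(p: PartitionError) -> float:
    # Mirrors equal_weight_fn: weight by the probability of keeping the partition.
    return p.keep_probability


class WeightedErrorCombiner:

    def __init__(self,
                 weight_fn: Callable[[PartitionError], float] = equal_weight):
        self._weight_fn = weight_fn

    def create_accumulator(self, p: PartitionError) -> AccumulatorType:
        weight = self._weight_fn(p)
        # The weight is applied once, when the accumulator is created.
        return p.abs_error * weight, weight

    def merge_accumulators(self, acc1: AccumulatorType,
                           acc2: AccumulatorType) -> AccumulatorType:
        return acc1[0] + acc2[0], acc1[1] + acc2[1]

    def compute_metrics(self, acc: AccumulatorType) -> float:
        weighted_sum, total_weight = acc
        # Weighted average across partitions.
        return weighted_sum / total_weight


if __name__ == "__main__":
    combiner = WeightedErrorCombiner()
    acc = combiner.merge_accumulators(
        combiner.create_accumulator(PartitionError(2.0, 0.5)),
        combiner.create_accumulator(PartitionError(4.0, 1.0)))
    print(combiner.compute_metrics(acc))  # (2*0.5 + 4*1.0) / 1.5 = 10/3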
108 changes: 108 additions & 0 deletions analysis/dataset_summary.py
@@ -0,0 +1,108 @@
# Copyright 2023 OpenMined.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains dataset summary and the computation of the summary."""

import pipeline_dp
import dataclasses
from typing import Iterable


@dataclasses.dataclass
class PublicPartitionsSummary:
num_dataset_public_partitions: int
num_dataset_non_public_partitions: int
num_empty_public_partitions: int


_DATASET_PUBLIC = 1
_EMPTY_PUBLIC = 2
_DATASET_NONPUBLIC = 3


def compute_public_partitions_summary(col, backend: pipeline_dp.PipelineBackend,
extractors: pipeline_dp.DataExtractors,
public_partitions):
"""Computes Public Partitions Summary from dataset and public partitions.
Args:
col: the raw dataset. The collection where all elements are of the same
type.
backend: pipeline backend which corresponds to the type of 'col'.
extractors: functions that extract needed pieces of information
from elements of 'col'.
public_partitions: a collection of partition keys that will be present
in the result. If not provided, partitions will be selected in a DP
manner.
Returns:
1 element collection, which contains a PublicPartitionsSummary object.
"""
dataset_partitions = backend.map(col, extractors.partition_extractor,
"Extract partitions")
# (partition)

dataset_partitions = backend.distinct(dataset_partitions, "Distinct")
# (partition)

dataset_partitions = backend.map(dataset_partitions, lambda x: (x, True),
"Keyed by partition")
# (partition, is_from_dataset=True)

public_partitions = backend.map(public_partitions, lambda x: (x, False),
"Keyed by partition")
# (partition, is_from_dataset = False)

partitions = backend.flatten([dataset_partitions, public_partitions],
"flatten")
# (partition, is_from_dataset: bool)

col = backend.group_by_key(partitions, "Group by Key")

# (partition, Iterable)

def process_fn(_, a: Iterable[bool]) -> int:
# 'a' contains up to 2 booleans.
# True means that the partition is present in the dataset.
# False means that the partition is in the public partitions.
a = list(a)
if len(a) == 2:
return _DATASET_PUBLIC
if a[0]:
return _DATASET_NONPUBLIC
return _EMPTY_PUBLIC

col = backend.map_tuple(col, process_fn, "Get Partition Type")
# (partition_type:int)

col = backend.count_per_element(col, "Count partition types")
# (partition_type:int, count_partition_type:int)

col = backend.to_list(col, "To list")

# 1 element with list of tuples (partition_type, count_partition_type)

def to_summary(partition_types_count: list) -> PublicPartitionsSummary:
num_dataset_public = num_dataset_non_public = num_empty_public = 0
for type, count in partition_types_count:
if type == _DATASET_PUBLIC:
num_dataset_public = count
elif type == _DATASET_NONPUBLIC:
num_dataset_non_public = count
else:
num_empty_public = count

return PublicPartitionsSummary(num_dataset_public,
num_dataset_non_public, num_empty_public)

return backend.map(col, to_summary, "ToSummary")
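
A hedged usage sketch for this new module, assuming pipeline_dp.LocalBackend and pipeline_dp.DataExtractors behave as elsewhere in the library; the toy dataset and partition keys are made up for illustration.

# Hedged usage sketch (assumed APIs: pipeline_dp.LocalBackend,
# pipeline_dp.DataExtractors; the toy data is made up).
import pipeline_dp
from analysis import dataset_summary

# Toy dataset: (privacy_id, partition_key, value) triples.
rows = [(1, "a", 10), (2, "a", 5), (3, "b", 1)]

backend = pipeline_dp.LocalBackend()
extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1],
    value_extractor=lambda row: row[2])

# "a" is public and present in the dataset, "b" is dataset-only (non-public),
# "c" is public but empty, so the expected counts are (1, 1, 1).
summary = dataset_summary.compute_public_partitions_summary(
    rows, backend, extractors, public_partitions=["a", "c"])

# LocalBackend is lazy, so materialize the 1-element collection.
print(list(summary))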
11 changes: 9 additions & 2 deletions analysis/metrics.py
@@ -59,9 +59,16 @@ class SumMetrics:
noise_kind: pipeline_dp.NoiseKind


@dataclass
class RawStatistics:
privacy_id_count: int
count: int


@dataclass
class PerPartitionMetrics:
partition_selection_probability_to_keep: float
raw_statistics: RawStatistics
metric_errors: Optional[List[SumMetrics]] = None


@@ -195,7 +202,7 @@ class MetricUtility:
absolute_error: error in terms of (dp_value - actual_value).
relative_error: error in terms of (dp_value - actual_value)/actual_value.
"""
metric: pipeline_dp.Metrics
metric: pipeline_dp.Metric

# Noise information.
noise_std: float
@@ -245,7 +252,7 @@ class UtilityReport:
Attributes:
configuration_index: the index of the input parameter configuration for
which this report was computed.
partition_metrics: utility analysis of selected partition.
partitions_info: utility analysis of selected partition.
metric_errors: utility analysis of metrics (e.g. COUNT, SUM,
PRIVACY_ID_COUNT).
utility_report_histogram:
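The new RawStatistics field can be illustrated with a short, hypothetical snippet that constructs PerPartitionMetrics the way the analysis now expects; the values are made up.

# Hypothetical values; shows how RawStatistics now travels with
# PerPartitionMetrics so empty public partitions can be detected (count == 0).
from analysis import metrics

raw = metrics.RawStatistics(privacy_id_count=3, count=7)
per_partition = metrics.PerPartitionMetrics(
    partition_selection_probability_to_keep=0.8,
    raw_statistics=raw)

is_empty_partition = per_partition.raw_statistics.count == 0
print(is_empty_partition)  # False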
(Diff for the remaining changed files is not shown.)
