Skip to content

Commit

Permalink
feat: Allow setting max partitions contributed for dynamic partition
Browse files Browse the repository at this point in the history
B&A inference may use a single metric context to accumulate metrics for multiple models if all are invoked in the same generateBid script.

Previously, the number of models that may accumulate metrics for a metric context is fixed statically.

Bug: 357274958
Change-Id: I9c2464d5adebd013b08676ae18fc0ea1ca2f6f4a
GitOrigin-RevId: 85e7c0206e2adb343c10e45586570eaf1fb7e571
  • Loading branch information
Privacy Sandbox Team authored and copybara-github committed Oct 3, 2024
1 parent cc49da3 commit ffda0e8
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 14 deletions.
12 changes: 7 additions & 5 deletions src/metric/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ class Context {
const internal::Partitioned& partitioned, std::string_view name,
bool is_privacy_impacting) {
std::vector<std::pair<std::string, T>> ret;
int max_partitions_contributed =
metric_router_->metric_config().GetMaxPartitionsContributed(partitioned,
name);
if (std::unique_ptr<telemetry::BuildDependentConfig::PartitionView>
partition_view =
metric_router_->metric_config().GetPartition(partitioned, name);
Expand All @@ -493,7 +496,7 @@ class Context {
<< partition << " is not in public_partitions_ ["
<< partitioned.partition_type_ << "] of metric:" << name;
}
if (ret.size() >= partitioned.max_partitions_contributed_) {
if (ret.size() >= max_partitions_contributed) {
break;
}
}
Expand All @@ -503,13 +506,12 @@ class Context {
// just return the value; otherwise it is private partition metric that
// is not implemented yet, log a warning.
ABSL_LOG_IF_EVERY_N_SEC(
WARNING,
is_privacy_impacting && partitioned.max_partitions_contributed_ > 1,
WARNING, is_privacy_impacting && max_partitions_contributed > 1,
kLogLowFreqSec)
<< "public_partitions_ not defined for metric : " << name;
ret.insert(ret.begin(), value.begin(), value.end());
if (ret.size() >= partitioned.max_partitions_contributed_) {
ret.resize(partitioned.max_partitions_contributed_);
if (ret.size() >= max_partitions_contributed) {
ret.resize(max_partitions_contributed);
}
}
return ret;
Expand Down
6 changes: 4 additions & 2 deletions src/metric/context_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ class ContextMap {
// Resets Partition for a list of metrics. The metric list should consist of
// std::string_view of metric names taken from the metric definition.
void ResetPartitionAsync(const std::vector<std::string_view>& metric_list,
const std::vector<std::string>& partition_list) {
metric_router_->ResetPartitionAsync(metric_list, partition_list);
const std::vector<std::string>& partition_list,
int max_partitions_contributed = 0) {
metric_router_->ResetPartitionAsync(metric_list, partition_list,
max_partitions_contributed);
}

const U& metric_router() const { return *metric_router_.get(); }
Expand Down
11 changes: 9 additions & 2 deletions src/metric/dp.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,12 @@ class DifferentiallyPrivate {

// Queues a set partition request until the next metric output.
void ResetPartitionAsync(const std::vector<std::string_view>& metric_list,
const std::vector<std::string>& partition_list)
const std::vector<std::string>& partition_list,
int max_partions_contributed)
ABSL_LOCKS_EXCLUDED(mutex_) {
absl::MutexLock mutex_lock(&mutex_);
reset_partition_request_queue_.push({metric_list, partition_list});
reset_partition_request_queue_.push(
{metric_list, partition_list, max_partions_contributed});
}

~DifferentiallyPrivate() {
Expand All @@ -364,6 +366,7 @@ class DifferentiallyPrivate {
// name.
std::vector<std::string_view> metric_list;
std::vector<std::string> partition_list;
int max_partitions_contributed;
};

// Output aggregated results with DP noise added for all defintions with
Expand Down Expand Up @@ -400,6 +403,10 @@ class DifferentiallyPrivate {
metric_router_.metric_config().SetPartition(
metric_name, std::vector<std::string_view>{partition_list.begin(),
partition_list.end()});
if (request.max_partitions_contributed > 0) {
metric_router_.metric_config().SetMaxPartitionsContributed(
metric_name, request.max_partitions_contributed);
}
}
reset_partition_request_queue_.pop();
}
Expand Down
2 changes: 1 addition & 1 deletion src/metric/dp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ TEST_F(NoNoiseTest, DifferentiallyPrivateResetPartition) {
A<int>(), _, ElementsAre(Pair(kNoiseAttribute, "Noised"))))
.WillRepeatedly(Return(absl::OkStatus()));

dp.ResetPartitionAsync({kUnitPartitionCounter.name_}, {"buyer_new"});
dp.ResetPartitionAsync({kUnitPartitionCounter.name_}, {"buyer_new"}, 1);

absl::MutexLock lock(&dp.mutex_);
ASSERT_TRUE(dp.OutputNoised().ok());
Expand Down
6 changes: 4 additions & 2 deletions src/metric/metric_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ class MetricRouter {

// Forwards DifferentialPrivate's ResetPartitionAsync to ContextMap.
void ResetPartitionAsync(const std::vector<std::string_view>& metric_list,
const std::vector<std::string>& partition_list) {
dp_.ResetPartitionAsync(metric_list, partition_list);
const std::vector<std::string>& partition_list,
int max_partions_contributed) {
dp_.ResetPartitionAsync(metric_list, partition_list,
max_partions_contributed);
}

// Add callback for observerable metric, must be Privacy:kNonImpacting
Expand Down
18 changes: 17 additions & 1 deletion src/telemetry/flag/telemetry_flag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,28 @@ void BuildDependentConfig::SetPartition(

int BuildDependentConfig::GetMaxPartitionsContributed(
const metrics::internal::Partitioned& definition,
absl::string_view name) const {
absl::string_view name) const ABSL_LOCKS_EXCLUDED(partition_mutex_) {
{
absl::MutexLock lock(&partition_mutex_);
auto it = internal_config_.find(name);
if (it != internal_config_.end() &&
it->second.has_max_partitions_contributed()) {
return it->second.max_partitions_contributed();
}
}
absl::StatusOr<MetricConfig> metric_config = GetMetricConfig(name);
if (metric_config.ok() && metric_config->has_max_partitions_contributed()) {
return metric_config->max_partitions_contributed();
}
return definition.max_partitions_contributed_;
}

void BuildDependentConfig::SetMaxPartitionsContributed(
std::string_view name, int max_partitions_contributed)
ABSL_LOCKS_EXCLUDED(partition_mutex_) {
absl::MutexLock lock(&partition_mutex_);
internal_config_[name].set_max_partitions_contributed(
max_partitions_contributed);
}

} // namespace privacy_sandbox::server_common::telemetry
6 changes: 5 additions & 1 deletion src/telemetry/flag/telemetry_flag.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ class BuildDependentConfig {

int GetMaxPartitionsContributed(
const metrics::internal::Partitioned& definition,
absl::string_view name) const;
absl::string_view name) const ABSL_LOCKS_EXCLUDED(partition_mutex_);

void SetMaxPartitionsContributed(std::string_view name,
int max_partitions_contributed)
ABSL_LOCKS_EXCLUDED(partition_mutex_);

// Return drop_noisy_values_probability of a metric.
template <typename MetricT>
Expand Down
7 changes: 7 additions & 0 deletions src/telemetry/flag/telemetry_flag_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ TEST(BuildDependentConfig, Partition) {
testing::ElementsAreArray({"123", "456", "789"}));
}

TEST(BuildDependentConfig, SetMaxPartitionsContributed) {
TelemetryConfig config_proto;
BuildDependentConfig config(config_proto);
config.SetMaxPartitionsContributed(partition_metric.name_, 5);
EXPECT_EQ(config.GetMaxPartitionsContributed(partition_metric), 5);
}

constexpr metrics::Definition<int, metrics::Privacy::kImpacting,
metrics::Instrument::kPartitionedCounter>
metric_1("m_1", "", "partition_type", 1, kDefaultBuyers, 1, 0, 0.95);
Expand Down

0 comments on commit ffda0e8

Please sign in to comment.