Skip to content

Commit

Permalink
Add StringSet metrics to Python SDK (#31969)
Browse files Browse the repository at this point in the history
* Add StringSet metrics to Python SDK

* Address comments

* Use string_set everywhere

* fix leftover SET_STRING_TYPE -> STRING_SET_TYPE
  • Loading branch information
Abacn authored Jul 30, 2024
1 parent ee3d57f commit 17ef888
Show file tree
Hide file tree
Showing 17 changed files with 315 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,12 @@ public static Gauge gauge(Class<?> namespace, String name) {
return new DelegatingGauge(MetricName.named(namespace, name));
}

/**
 * Create a metric that accumulates and reports the set of unique string values.
 *
 * @param namespace the namespace string handed to {@code MetricName.named}
 * @param name the metric name handed to {@code MetricName.named}
 * @return a {@code DelegatingStringSet} wrapping the resulting {@code MetricName}
 */
public static StringSet stringSet(String namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}

/**
 * Create a metric that accumulates and reports the set of unique string values.
 *
 * @param namespace the class handed to {@code MetricName.named} as the namespace
 * @param name the metric name handed to {@code MetricName.named}
 * @return a {@code DelegatingStringSet} wrapping the resulting {@code MetricName}
 */
public static StringSet stringSet(Class<?> namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/metrics/cells.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ cdef class GaugeCell(MetricCell):
cdef readonly object data


# Cython declaration of the StringSetCell extension type (subclass of MetricCell).
cdef class StringSetCell(MetricCell):
# The accumulated values, statically typed as a set; `readonly` exposes the
# attribute to Python code for reading only.
cdef readonly set data

# C-level inline update hook. Declared to return bint, with -1 reserved as the
# error return value used to propagate exceptions.
cdef inline bint _update(self, value) except -1


cdef class DistributionData(object):
cdef readonly libc.stdint.int64_t sum
cdef readonly libc.stdint.int64_t count
Expand Down
75 changes: 75 additions & 0 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,62 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
ptransform=transform_id)


class StringSetCell(MetricCell):
  """For internal use only; no backwards-compatibility guarantees.

  Tracks the current value for a StringSet metric.

  Each cell tracks the state of a metric independently per context per bundle.
  Therefore, each metric has a different cell in each bundle, that is later
  aggregated.

  This class is thread safe.
  """
  def __init__(self, *args):
    super().__init__(*args)
    # Start from the aggregator's identity element (an empty set).
    self.data = StringSetAggregator.identity_element()

  def add(self, value):
    """Adds ``value`` to the cell; simply forwards to :meth:`update`."""
    self.update(value)

  def update(self, value):
    # type: (str) -> None
    """Adds ``value`` to the tracked set.

    When running as compiled Cython the lock is skipped; otherwise the
    mutation is performed while holding ``self._lock``.
    """
    if cython.compiled:
      # We will hold the GIL throughout the entire _update.
      self._update(value)
    else:
      with self._lock:
        self._update(value)

  def _update(self, value):
    # Unsynchronized mutation; callers are responsible for any locking.
    self.data.add(value)

  def get_cumulative(self):
    # type: () -> set
    """Returns a copy of the tracked set, taken while holding the lock.

    A copy is returned so that callers cannot mutate the cell's own state.
    """
    with self._lock:
      return set(self.data)

  def combine(self, other):
    # type: (StringSetCell) -> StringSetCell
    """Returns a new cell holding the union of this cell's and other's data.

    Neither ``self`` nor ``other`` is modified, and the returned cell owns
    its own set.
    """
    combined = StringSetAggregator().combine(self.data, other.data)
    result = StringSetCell()
    # The aggregator may hand back one of its operands unchanged (e.g. when
    # the other one is empty). Copy so that later updates to the combined
    # cell can never leak into self.data or other.data.
    result.data = set(combined)
    return result

  def to_runner_api_monitoring_info_impl(self, name, transform_id):
    """Builds the monitoring info for this cell.

    Passes the namespace and name of ``name``, a cumulative snapshot of the
    data, and ``transform_id`` (as ``ptransform``) to
    ``monitoring_infos.user_set_string`` and returns its result.
    """
    # Imported at call time rather than module level.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from apache_beam.metrics import monitoring_infos

    return monitoring_infos.user_set_string(
        name.namespace,
        name.name,
        self.get_cumulative(),
        ptransform=transform_id)

  def reset(self):
    # type: () -> None
    """Discards all tracked values by rebinding data to a fresh empty set."""
    self.data = StringSetAggregator.identity_element()


class DistributionResult(object):
"""The result of a Distribution metric."""
def __init__(self, data):
Expand Down Expand Up @@ -553,3 +609,22 @@ def combine(self, x, y):
def result(self, x):
# type: (GaugeData) -> GaugeResult
return GaugeResult(x.get_cumulative())


class StringSetAggregator(MetricAggregator):
  """Aggregator for StringSet metrics; values are sets combined by union."""
  @staticmethod
  def identity_element():
    # type: () -> set
    """Returns a new empty set, the identity element for union."""
    return set()

  def combine(self, x, y):
    # type: (set, set) -> set
    """Returns the union of ``x`` and ``y`` as a newly allocated set.

    A fresh set is returned even when one operand is empty, so the result
    never aliases an input: mutating it cannot alter ``x`` or ``y``.
    """
    return set().union(x, y)

  def result(self, x):
    # type: (set) -> set
    """Returns ``x`` itself (the same object, not a copy)."""
    return x
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/metrics/cells_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import GaugeCell
from apache_beam.metrics.cells import GaugeData
from apache_beam.metrics.cells import StringSetCell
from apache_beam.metrics.metricbase import MetricName


Expand Down Expand Up @@ -169,5 +170,28 @@ def test_start_time_set(self):
self.assertGreater(mi.start_time.seconds, 0)


class TestStringSetCell(unittest.TestCase):
  """Unit tests for StringSetCell."""
  def test_not_leak_mutable_set(self):
    """get_cumulative() must return a copy, not the cell's internal set."""
    cell = StringSetCell()
    for item in ('test', 'another'):
      cell.add(item)
    expected = {'test', 'another'}

    snapshot = cell.get_cumulative()
    self.assertEqual(snapshot, expected)

    # Mutating the snapshot must not be observable through the cell.
    snapshot.add('yet another')
    self.assertEqual(cell.get_cumulative(), expected)

  def test_combine_appropriately(self):
    """combine() yields the union of both cells' values."""
    first = StringSetCell()
    for item in ('1', '2'):
      first.add(item)

    second = StringSetCell()
    for item in ('1', '3'):
      second.add(item)

    merged = second.combine(first)
    self.assertEqual(merged.data, {'1', '2', '3'})


if __name__ == '__main__':
unittest.main()
20 changes: 18 additions & 2 deletions sdks/python/apache_beam/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
from apache_beam.metrics.cells import StringSetCell
from apache_beam.runners.worker import statesampler
from apache_beam.runners.worker.statesampler import get_current_tracker

Expand Down Expand Up @@ -259,6 +260,12 @@ def get_gauge(self, metric_name):
GaugeCell,
self.get_metric_cell(_TypedMetricName(GaugeCell, metric_name)))

def get_string_set(self, metric_name):
  # type: (MetricName) -> StringSetCell
  """Returns the StringSetCell this container holds for ``metric_name``.

  The lookup key pairs the StringSetCell type with ``metric_name`` and is
  resolved through ``get_metric_cell``.
  """
  typed_name = _TypedMetricName(StringSetCell, metric_name)
  cell = self.get_metric_cell(typed_name)
  # cast() only informs the type checker; it does nothing at runtime.
  return cast(StringSetCell, cell)

def get_metric_cell(self, typed_metric_name):
# type: (_TypedMetricName) -> MetricCell
cell = self.metrics.get(typed_metric_name, None)
Expand Down Expand Up @@ -292,7 +299,13 @@ def get_cumulative(self):
v in self.metrics.items() if k.cell_type == GaugeCell
}

return MetricUpdates(counters, distributions, gauges)
string_sets = {
MetricKey(self.step_name, k.metric_name): v.get_cumulative()
for k,
v in self.metrics.items() if k.cell_type == StringSetCell
}

return MetricUpdates(counters, distributions, gauges, string_sets)

def to_runner_api(self):
return [
Expand Down Expand Up @@ -344,7 +357,8 @@ def __init__(
self,
counters=None, # type: Optional[Dict[MetricKey, int]]
distributions=None, # type: Optional[Dict[MetricKey, DistributionData]]
gauges=None # type: Optional[Dict[MetricKey, GaugeData]]
gauges=None, # type: Optional[Dict[MetricKey, GaugeData]]
string_sets=None, # type: Optional[Dict[MetricKey, set]]
):
# type: (...) -> None

Expand All @@ -354,7 +368,9 @@ def __init__(
counters: Dictionary of MetricKey:MetricUpdate updates.
distributions: Dictionary of MetricKey:MetricUpdate objects.
gauges: Dictionary of MetricKey:MetricUpdate objects.
string_sets: Dictionary of MetricKey:MetricUpdate objects.
"""
self.counters = counters or {}
self.distributions = distributions or {}
self.gauges = gauges or {}
self.string_sets = string_sets or {}
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/metrics/execution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# pytype: skip-file

import functools
import unittest

from apache_beam.metrics.execution import MetricKey
Expand Down Expand Up @@ -88,24 +89,32 @@ def test_get_cumulative_or_updates(self):
distribution = mc.get_distribution(
MetricName('namespace', 'name{}'.format(i)))
gauge = mc.get_gauge(MetricName('namespace', 'name{}'.format(i)))
str_set = mc.get_string_set(MetricName('namespace', 'name{}'.format(i)))

counter.inc(i)
distribution.update(i)
gauge.set(i)
str_set.add(str(i % 7))
all_values.append(i)

# Retrieve ALL updates.
cumulative = mc.get_cumulative()
self.assertEqual(len(cumulative.counters), 10)
self.assertEqual(len(cumulative.distributions), 10)
self.assertEqual(len(cumulative.gauges), 10)
self.assertEqual(len(cumulative.string_sets), 10)

self.assertEqual(
set(all_values), {v
for _, v in cumulative.counters.items()})
self.assertEqual(
set(all_values), {v.value
for _, v in cumulative.gauges.items()})
self.assertEqual({str(i % 7)
for i in all_values},
functools.reduce(
set.union,
(v for _, v in cumulative.string_sets.items())))


if __name__ == '__main__':
Expand Down
31 changes: 29 additions & 2 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.metrics.metricbase import Distribution
from apache_beam.metrics.metricbase import Gauge
from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.metricbase import StringSet

if TYPE_CHECKING:
from apache_beam.metrics.execution import MetricKey
Expand Down Expand Up @@ -115,6 +116,23 @@ def gauge(
namespace = Metrics.get_namespace(namespace)
return Metrics.DelegatingGauge(MetricName(namespace, name))

@staticmethod
def string_set(
    namespace: Union[Type, str], name: str) -> 'Metrics.DelegatingStringSet':
  """Obtains or creates a String set metric.

  String set metrics are restricted to string values.

  Args:
    namespace: A class or string that gives the namespace to a metric
    name: A string that gives a unique name to a metric

  Returns:
    A StringSet object.
  """
  # Normalize the namespace first, then wrap the resulting metric name.
  metric_name = MetricName(Metrics.get_namespace(namespace), name)
  return Metrics.DelegatingStringSet(metric_name)

class DelegatingCounter(Counter):
"""Metrics Counter that Delegates functionality to MetricsEnvironment."""
def __init__(
Expand All @@ -138,11 +156,18 @@ def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.set = MetricUpdater(cells.GaugeCell, metric_name) # type: ignore[assignment]

class DelegatingStringSet(StringSet):
  """Metrics StringSet that Delegates functionality to MetricsEnvironment."""
  def __init__(self, metric_name: MetricName) -> None:
    super().__init__(metric_name)
    # Shadow the interface's add() on this instance with an updater bound to
    # the StringSetCell type and this metric's name.
    string_set_updater = MetricUpdater(cells.StringSetCell, metric_name)
    self.add = string_set_updater  # type: ignore[assignment]


class MetricResults(object):
COUNTERS = "counters"
DISTRIBUTIONS = "distributions"
GAUGES = "gauges"
STRINGSETS = "string_sets"

@staticmethod
def _matches_name(filter: 'MetricsFilter', metric_key: 'MetricKey') -> bool:
Expand Down Expand Up @@ -207,11 +232,13 @@ def query(
{
"counters": [MetricResult(counter_key, committed, attempted), ...],
"distributions": [MetricResult(dist_key, committed, attempted), ...],
"gauges": [] // Empty list if nothing matched the filter.
"gauges": [], // Empty list if nothing matched the filter.
"string_sets": [MetricResult(string_set_key, committed, attempted),
...]
}
The committed / attempted values are DistributionResult / GaugeResult / int
objects.
/ set objects.
"""
raise NotImplementedError

Expand Down
16 changes: 15 additions & 1 deletion sdks/python/apache_beam/metrics/metricbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@
from typing import Optional

__all__ = [
'Metric', 'Counter', 'Distribution', 'Gauge', 'Histogram', 'MetricName'
'Metric',
'Counter',
'Distribution',
'Gauge',
'StringSet',
'Histogram',
'MetricName'
]


Expand Down Expand Up @@ -138,6 +144,14 @@ def set(self, value):
raise NotImplementedError


class StringSet(Metric):
  """StringSet Metric interface.

  Reports set of unique string values during pipeline execution."""
  def add(self, value):
    """Adds ``value`` to the set; concrete subclasses must override this."""
    raise NotImplementedError


class Histogram(Metric):
"""Histogram Metric interface.
Expand Down
Loading

0 comments on commit 17ef888

Please sign in to comment.