Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Add a couple quality-of-life improvemenets to testing.util.assert_that #30771

Merged
merged 18 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ def groupby_expr(test=None):
| beam.GroupBy(lambda s: s[0])
| beam.Map(print))
# [END groupby_expr]

if test:
test(grouped)
if test:
test(grouped)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ def global_aggregate(test=None):
'unit_price', max, 'max_price')
| beam.Map(print))
# [END global_aggregate]

if test:
test(grouped)
if test:
test(grouped)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ def simple_aggregate(test=None):
'quantity', sum, 'total_quantity')
| beam.Map(print))
# [END simple_aggregate]

if test:
test(grouped)
if test:
test(grouped)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
from .groupby_simple_aggregate import simple_aggregate
from .groupby_two_exprs import groupby_two_exprs

#
# Temporarily skip all tests in file
# TODO: Reenable https://github.com/apache/beam/issues/30778
__test__ = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you plan to enable this later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a spinoff issue since I think fixing the tests (which were already broken before these changes) is probably out of scope

#30778

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated with a comment linking the issue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably preferable to comment out the incorrect asserts rather than disable the tests altogether (which will at least confirm the examples are syntactically correct and run without errors).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Though if you're committing to fix them ASAP, we can get this in.)



class UnorderedList(object):
def __init__(self, contents):
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/testing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,22 @@ def assert_that(
"""
assert isinstance(actual, pvalue.PCollection), (
'%s is not a supported type for Beam assert' % type(actual))
pipeline = actual.pipeline
if getattr(actual.pipeline, 'result', None):
# The pipeline was already run. The user most likely called assert_that
# after the pipeleline context.
raise RuntimeError(
robertwb marked this conversation as resolved.
Show resolved Hide resolved
'assert_that must be used within a beam.Pipeline context')

# Usually, the uniqueness of the label is left to the pipeline
# writer to guarantee. Since we're in a testing context, we'll
# just automatically append a number to the label if it's
# already in use.
robertwb marked this conversation as resolved.
Show resolved Hide resolved
if label in pipeline.applied_labels:
label_idx = 2
while f"{label}_{label_idx}" in pipeline.applied_labels:
label_idx += 1
label = f"{label}_{label_idx}"

if isinstance(matcher, _EqualToPerWindowMatcher):
reify_windows = True
Expand Down
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/testing/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,19 @@ def test_equal_to_per_window_fail_unmatched_window(self):
equal_to_per_window(expected),
reify_windows=True)

def test_runtimeerror_outside_of_context(self):
with beam.Pipeline() as p:
outputs = (p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1))
with self.assertRaises(RuntimeError):
assert_that(outputs, equal_to([2, 3, 4]))

def test_multiple_assert_that_labels(self):
with beam.Pipeline() as p:
outputs = (p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1))
assert_that(outputs, equal_to([2, 3, 4]))
assert_that(outputs, equal_to([2, 3, 4]))
assert_that(outputs, equal_to([2, 3, 4]))

def test_equal_to_per_window_fail_unmatched_element(self):
with self.assertRaises(BeamAssertException):
start = int(MIN_TIMESTAMP.micros // 1e6) - 5
Expand Down
11 changes: 5 additions & 6 deletions sdks/python/apache_beam/transforms/trigger_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ def test_after_processing_time(self):
accumulation_mode=AccumulationMode.DISCARDING)
| beam.GroupByKey()
| beam.Map(lambda x: x[1]))

assert_that(results, equal_to([list(range(total_elements_in_trigger))]))

def test_repeatedly_after_processing_time(self):
Expand Down Expand Up @@ -772,11 +771,11 @@ def test_multiple_accumulating_firings(self):
| beam.GroupByKey()
| beam.FlatMap(lambda x: x[1]))

# The trigger should fire twice. Once after 5 seconds, and once after 10.
# The firings should accumulate the output.
first_firing = [str(i) for i in elements if i <= 5]
second_firing = [str(i) for i in elements]
assert_that(records, equal_to(first_firing + second_firing))
# The trigger should fire twice. Once after 5 seconds, and once after 10.
# The firings should accumulate the output.
first_firing = [str(i) for i in elements if i <= 5]
second_firing = [str(i) for i in elements]
assert_that(records, equal_to(first_firing + second_firing))

def test_on_pane_watermark_hold_no_pipeline_stall(self):
"""A regression test added for
Expand Down
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,13 +991,13 @@ def test_constant_k(self):
with TestPipeline() as p:
pc = p | beam.Create(self.l)
with_keys = pc | util.WithKeys('k')
assert_that(with_keys, equal_to([('k', 1), ('k', 2), ('k', 3)], ))
assert_that(with_keys, equal_to([('k', 1), ('k', 2), ('k', 3)], ))

def test_callable_k(self):
with TestPipeline() as p:
pc = p | beam.Create(self.l)
with_keys = pc | util.WithKeys(lambda x: x * x)
assert_that(with_keys, equal_to([(1, 1), (4, 2), (9, 3)]))
assert_that(with_keys, equal_to([(1, 1), (4, 2), (9, 3)]))

@staticmethod
def _test_args_kwargs_fn(x, multiply, subtract):
Expand All @@ -1008,7 +1008,7 @@ def test_args_kwargs_k(self):
pc = p | beam.Create(self.l)
with_keys = pc | util.WithKeys(
WithKeysTest._test_args_kwargs_fn, 2, subtract=1)
assert_that(with_keys, equal_to([(1, 1), (3, 2), (5, 3)]))
assert_that(with_keys, equal_to([(1, 1), (3, 2), (5, 3)]))

def test_sideinputs(self):
with TestPipeline() as p:
Expand All @@ -1021,7 +1021,7 @@ def test_sideinputs(self):
the_singleton: x + sum(the_list) + the_singleton,
si1,
the_singleton=si2)
assert_that(with_keys, equal_to([(17, 1), (18, 2), (19, 3)]))
assert_that(with_keys, equal_to([(17, 1), (18, 2), (19, 3)]))


class GroupIntoBatchesTest(unittest.TestCase):
Expand Down
Loading