Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stateful processing using direct runner with type checks enabled #27646

Merged
merged 8 commits into from
Aug 10, 2023
19 changes: 19 additions & 0 deletions sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from apache_beam.metrics import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.pvalue import AsList
Expand Down Expand Up @@ -1041,6 +1042,24 @@ def test_output_typehints(self):
ShardedKeyType[typehints.Tuple[int, int]], # type: ignore[misc]
typehints.Iterable[str]])

def test_runtime_type_check(self):
options = PipelineOptions()
options.view_as(TypeOptions).runtime_type_check = True
with TestPipeline(options=options) as pipeline:
collection = (
pipeline
| beam.Create(GroupIntoBatchesTest._create_test_data())
| util.GroupIntoBatches(GroupIntoBatchesTest.BATCH_SIZE))
num_batches = collection | beam.combiners.Count.Globally()
assert_that(
num_batches,
equal_to([
int(
math.ceil(
GroupIntoBatchesTest.NUM_ELEMENTS /
GroupIntoBatchesTest.BATCH_SIZE))
]))

def _test_runner_api_round_trip(self, transform, urn):
context = pipeline_context.PipelineContext()
proto = transform.to_runner_api(context)
Expand Down
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/typehints/typecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ def __init__(self, dofn):
super().__init__()
self.dofn = dofn

def __getattribute__(self, name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also we should now be able to drop setup and teardown overrides.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change the setup and teardown are no longer called in here:

def test_wrapper_pass_through(self):
# We use a file to check the result because the MyDoFn instance passed is
# not the same one that actually runs in the pipeline (it is serialized
# here and deserialized in the worker).
with tempfile.TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname + "tmp_filename")
dofn = MyDoFn(path)
result = self.p | beam.Create([1, 2, 3]) | beam.ParDo(dofn)
assert_that(result, equal_to([1, 2, 3]))
self.p.run()
with open(path, mode="r") as ft:
lines = [line.strip() for line in ft]
self.assertListEqual([
'setup',
'start_bundle',
'process',
'process',
'process',
'finish_bundle',
'teardown',
],
lines)

Reverted for now: 81cd93a

Any way we can proceed as is? Otherwise please suggest where to start digging, thanks!

if (name.startswith('_') or name in self.__dict__ or
hasattr(type(self), name)):
return object.__getattribute__(self, name)
else:
return getattr(self.dofn, name)

def _inspect_start_bundle(self):
return self.dofn.get_function_arguments('start_bundle')

Expand Down
Loading