Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log a warning, if the process method of DoFn returns None #28159

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1423,15 +1423,28 @@ def _check_fn_use_yield_and_return(fn):
source_code = _get_function_body_without_inners(fn)
has_yield = False
has_return = False
return_none_warning = (
"No iterator is returned by the process method in %s.",
fn.__self__.__class__)

for line in source_code.split("\n"):
if line.lstrip().startswith("yield ") or line.lstrip().startswith(
lstripped_line = line.lstrip()

if lstripped_line.startswith("yield ") or lstripped_line.startswith(
"yield("):
has_yield = True
if line.lstrip().startswith("return ") or line.lstrip().startswith(
if lstripped_line.startswith("return ") or lstripped_line.startswith(
"return("):
has_return = True
if lstripped_line.startswith(
"return None") or lstripped_line.rstrip() == "return":
_LOGGER.warning(return_none_warning)
if has_yield and has_return:
return True

if not has_return and not has_yield:
_LOGGER.warning(return_none_warning)

return False
except Exception as e:
_LOGGER.debug(str(e))
Expand Down
38 changes: 38 additions & 0 deletions sdks/python/apache_beam/transforms/core_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import apache_beam as beam

RETURN_NONE_PARTIAL_WARNING = "No iterator is returned"


class TestDoFn1(beam.DoFn):
def process(self, element):
Expand Down Expand Up @@ -84,6 +86,24 @@ def process(self, element):
yield element


class TestDoFn9(beam.DoFn):
"""test process returning None explicitly"""
def process(self, element):
return None


class TestDoFn10(beam.DoFn):
"""test process returning None (no return and no yield)"""
def process(self, element):
pass


class TestDoFn11(beam.DoFn):
"""test process returning None (return statement without a value)"""
def process(self, element):
return


class CreateTest(unittest.TestCase):
@pytest.fixture(autouse=True)
def inject_fixtures(self, caplog):
Expand All @@ -107,6 +127,24 @@ def test_dofn_with_yield_and_return(self):
beam.ParDo(TestDoFn3())
assert warning_text in self._caplog.text

def test_dofn_with_explicit_return_none(self):
with self._caplog.at_level(logging.WARNING):
beam.ParDo(TestDoFn9())
assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text
assert str(TestDoFn9) in self._caplog.text

def test_dofn_with_implicit_return_none_missing_return_and_yield(self):
with self._caplog.at_level(logging.WARNING):
beam.ParDo(TestDoFn10())
assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text
assert str(TestDoFn10) in self._caplog.text

def test_dofn_with_implicit_return_none_return_without_value(self):
with self._caplog.at_level(logging.WARNING):
beam.ParDo(TestDoFn11())
assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text
assert str(TestDoFn11) in self._caplog.text


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading