[Bug] GroupByKey() fails if PCollection is empty and direct_running_mode is set #25315

Closed
SamHjelmfelt opened this issue Feb 5, 2023 · 2 comments · Fixed by #27373 or #27676
Labels: done & done (Issue has been reviewed after it was closed for verification, followups, etc.), python


@SamHjelmfelt

API: python
apache-beam version: 2.44.0 (installed via pip)
Python version: 3.10.9

The first step of my pipeline uses fileio.MatchAll() to collect all files that are ready for processing. It is possible that no files are available, in which case fileio.MatchAll() outputs zero records. When that happens, the expected behavior is that no further processing is done. However, if direct_running_mode is set to any value, GroupByKey() throws an exception.
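For reference, the expected behavior can be modeled in plain Python, outside Beam: grouping zero (key, value) pairs should produce zero groups rather than an error. The `group_by_key` helper below is a hypothetical illustration of GroupByKey semantics, not Beam's implementation.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple


def group_by_key(pairs: Iterable[Tuple[Hashable, object]]) -> Dict[Hashable, List[object]]:
    """Hypothetical plain-Python model of GroupByKey semantics (not Beam code)."""
    groups: Dict[Hashable, List[object]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


# One matching element: one group.
print(group_by_key([("test_key", "test_value")]))  # {'test_key': ['test_value']}

# Zero elements (e.g. no files matched): zero groups, and no error.
print(group_by_key([]))  # {}
```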

Here is sample code that reproduces the error, covering all 8 scenarios:

Without direct_running_mode:

import apache_beam
from apache_beam.options.pipeline_options import PipelineOptions

with apache_beam.Pipeline() as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() | apache_beam.Map(print)
#output: ('test_key', ['test_value'])

with apache_beam.Pipeline() as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() | apache_beam.Map(print)
#No output. Execution succeeds

With direct_running_mode:

with apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_processing")) as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() | apache_beam.Map(print)
#output: ('test_key', ['test_value'])

with apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="in_memory")) as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() | apache_beam.Map(print)
#output: ('test_key', ['test_value'])

with apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading")) as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: True) | apache_beam.GroupByKey() | apache_beam.Map(print)
#output: ('test_key', ['test_value'])


with apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_processing")) as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() | apache_beam.Map(print)
#Exception (similar to below)

with apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="in_memory")) as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() | apache_beam.Map(print)
#Exception (similar to below)

with apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading")) as p:
    p | apache_beam.Create([('test_key', 'test_value')]) | apache_beam.Filter(lambda x: False) | apache_beam.GroupByKey() | apache_beam.Map(print)
#Exception 
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/__main__.py", line 39, in <module>
    cli.main()
  File "/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 430, in main
    run()
  File "/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 284, in run_file
    runpy.run_path(target, run_name="__main__")
  File "/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 321, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 135, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/Users/sam/.vscode/extensions/ms-python.python-2022.20.2/pythonFiles/lib/python/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 124, in _run_code
    exec(code, run_globals)
  File "main.py", line 6, in <module>
    with apache_beam.Pipeline(options=PipelineOptions(direct_running_mode="multi_threading")) as p:
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 201, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 222, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 453, in run_stages
    bundle_results = self._execute_bundle(
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 781, in _execute_bundle
    self._run_bundle(
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1010, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/opt/homebrew/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1430, in process_bundle
    for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'
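The traceback shows that `input` is `None` when `process_bundle` tries to split it across workers. The sketch below is a hypothetical, self-contained model of that failure: the `Buffer` class and `split_input` function are illustrative names, not Beam internals, and it assumes that no element buffer is ever created for a PCollection that received zero elements. It shows how calling `.partition()` on the missing buffer raises the same `AttributeError`, and one possible guard that treats a missing buffer as empty input. The actual fix in Beam may work differently.

```python
from typing import Dict, List, Optional


class Buffer:
    """Hypothetical stand-in for a runner's per-PCollection element buffer."""

    def __init__(self, elements: Optional[List[bytes]] = None) -> None:
        self.elements: List[bytes] = list(elements or [])

    def partition(self, n: int) -> List[List[bytes]]:
        # Distribute the buffered elements round-robin into n parts.
        parts: List[List[bytes]] = [[] for _ in range(n)]
        for i, element in enumerate(self.elements):
            parts[i % n].append(element)
        return parts


def split_input(buffers: Dict[str, Buffer], pcoll_id: str,
                num_workers: int) -> List[List[bytes]]:
    # Assumption: an empty upstream PCollection never got a buffer, so the
    # lookup returns None (the `input` seen in the traceback).
    input_buffer: Optional[Buffer] = buffers.get(pcoll_id)
    if input_buffer is None:
        # Guard: treat a missing buffer as empty input for every worker.
        return [[] for _ in range(num_workers)]
    return input_buffer.partition(num_workers)


buffers: Dict[str, Buffer] = {}  # Filter(lambda x: False) wrote nothing

# Unguarded access fails the same way as in the traceback.
try:
    buffers.get("filtered").partition(3)  # type: ignore[union-attr]
except AttributeError as err:
    print(err)  # 'NoneType' object has no attribute 'partition'

# Guarded access yields one empty part per worker.
print(split_input(buffers, "filtered", 3))  # [[], [], []]
```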
@SamHjelmfelt SamHjelmfelt changed the title (Python) GroupByKey() fails if PCollection is empty and direct_running_mode is set [Bug] GroupByKey() fails if PCollection is empty and direct_running_mode is set Feb 5, 2023
@tvalentyn
Contributor

tvalentyn commented Apr 19, 2023

We suspect this was caused by: #26190

Another report implicating the same PR can be reproduced as follows:

Prepare a fresh virtualenv. I used Python 3.9, but it also fails on 3.7.
$ pip install tfx # apache-beam 2.39.0 will be installed as a dependency.
$ git clone https://github.com/tensorflow/tfx.git
$ cd tfx/tfx/examples/penguin

Edit penguin_pipeline_local_e2e_test.py.
In line 112, change _make_beam_pipeline_args() as follows:

111   def _make_beam_pipeline_args(self):
112     return ['--direct_running_mode=multi_processing', '--direct_num_workers=0']

Run the test with the following command:

$ python penguin_pipeline_local_e2e_test.py PenguinPipelineLocalEndToEndTest.testPenguinPipelineLocal0
  • This command runs the testPenguinPipelineLocal() test only with the 'keras' parameter.
  • The test fails with the following trace:
ERROR: testPenguinPipelineLocal0 ('keras') (__main__.PenguinPipelineLocalEndToEndTest)
PenguinPipelineLocalEndToEndTest.testPenguinPipelineLocal0 ('keras')
testPenguinPipelineLocal('keras')
----------------------------------------------------------------------
Traceback (most recent call last):
  File ".../site-packages/absl/testing/parameterized.py", line 316, in bound_param_test
    return test_method(self, *testcase_params)
  File "~/tfx/tfx/examples/penguin/penguin_pipeline_local_e2e_test.py", line 141, in testPenguinPipelineLocal
    LocalDagRunner().run(pipeline)
  File ".../site-packages/tfx/orchestration/portable/tfx_runner.py", line 124, in run
    return self.run_with_ir(pipeline_pb, run_options=run_options_pb, **kwargs)
  File ".../site-packages/tfx/orchestration/local/local_dag_runner.py", line 109, in run_with_ir
    component_launcher.launch()
  File ".../site-packages/tfx/orchestration/portable/launcher.py", line 549, in launch
    executor_output = self._run_executor(execution_info)
  File ".../site-packages/tfx/orchestration/portable/launcher.py", line 424, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File ".../site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 98, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File ".../site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
    result = executor.Do(execution_info.input_dict, output_dict,
  File ".../site-packages/tfx/components/statistics_gen/executor.py", line 141, in Do
    logging.info('Statistics for split %s written to %s.', split,
  File ".../site-packages/apache_beam/pipeline.py", line 596, in __exit__
    self.result = self.run()
  File ".../site-packages/apache_beam/pipeline.py", line 573, in run
    return self.runner.run_pipeline(self, self._options)
  File ".../site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File ".../site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File ".../site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 208, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File ".../site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 408, in run_stages
    bundle_results = self._execute_bundle(
  File ".../site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 736, in _execute_bundle
    self._run_bundle(
  File ".../site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 965, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "..., in process_bundle
    for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'


@AnandInguva
Contributor

This will be fixed in version 2.50.0. Thanks!
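To check whether an installed release is expected to include this fix, a small version comparison is enough. The helpers below (`parse_version`, `includes_empty_gbk_fix`) are a hypothetical sketch that relies only on what this comment states, namely that the fix ships in 2.50.0. In practice the version string would come from `apache_beam.__version__`.

```python
import re
from typing import Tuple

# Release in which this thread says the fix ships.
FIXED_IN: Tuple[int, int, int] = (2, 50, 0)


def parse_version(version: str) -> Tuple[int, int, int]:
    # Read the leading "major.minor[.patch]" numbers; suffixes such as
    # "rc1" or ".dev0" are ignored, e.g. "2.50.0rc1" -> (2, 50, 0).
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    major, minor, patch = match.groups()
    return int(major), int(minor), int(patch or 0)


def includes_empty_gbk_fix(version: str) -> bool:
    """True if `version` is at or above the release named in this thread."""
    return parse_version(version) >= FIXED_IN


print(includes_empty_gbk_fix("2.44.0"))  # False (version from the original report)
print(includes_empty_gbk_fix("2.50.0"))  # True
```

Release candidates and dev builds are treated as the corresponding final release, which is a simplification.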

@github-actions github-actions bot added this to the 2.50.0 Release milestone Jul 11, 2023
@AnandInguva AnandInguva reopened this Jul 13, 2023
@tvalentyn tvalentyn removed this from the 2.50.0 Release milestone Jul 18, 2023
@github-actions github-actions bot added this to the 2.50.0 Release milestone Jul 27, 2023
@damccorm damccorm added the done & done Issue has been reviewed after it was closed for verification, followups, etc. label Aug 1, 2023
4 participants