From b86742d75a5daaab80448a83f3de4187cad8f5ed Mon Sep 17 00:00:00 2001
From: Arsalan Mostaani <102514087+arsalanmstn@users.noreply.github.com>
Date: Mon, 4 Nov 2024 09:14:06 +0100
Subject: [PATCH] Modifying "process_file_list" function in cli.py (#330)

Refactor process_file_list to work with forking when multiprocessing

---------

Co-authored-by: Arsimstn
---
 pyopia/__init__.py |  2 +-
 pyopia/cli.py      | 51 ++++++++++++++++++++++++++++++----------------
 2 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/pyopia/__init__.py b/pyopia/__init__.py
index 80e22f7..964a32a 100644
--- a/pyopia/__init__.py
+++ b/pyopia/__init__.py
@@ -1 +1 @@
-__version__ = '2.8.1'
+__version__ = '2.8.2'

diff --git a/pyopia/cli.py b/pyopia/cli.py
index 0604646..959d5ff 100644
--- a/pyopia/cli.py
+++ b/pyopia/cli.py
@@ -172,28 +172,13 @@ def process(config_filename: str, num_chunks: int = 1, strategy: str = 'block'):
 
         progress.console.print("[blue]INITIALISE PIPELINE")
 
-        def process_file_list(file_list, c):
-            processing_pipeline = pyopia.pipeline.Pipeline(pipeline_config)
-
-            with get_custom_progress_bar(f'[blue]Processing progress (chunk {c})', disable=c != 0) as pbar:
-                for filename in pbar.track(file_list, description=f'[blue]Processing progress (chunk {c})'):
-                    try:
-                        logger.debug(f'Chunk {c} starting to process {filename}')
-                        processing_pipeline.run(filename)
-                    except Exception as e:
-                        logger.warning('[red]An error occured in processing, ' +
-                                       'skipping rest of pipeline and moving to next image.' +
-                                       f'(chunk {c})')
-                        logger.error(e)
-                        logger.debug(''.join(traceback.format_tb(e.__traceback__)))
-
         # With one chunk we keep the non-multiprocess functionality to ensure backwards compatibility
         job_list = []
         if num_chunks == 1:
-            process_file_list(raw_files, 0)
+            process_file_list(raw_files, pipeline_config, 0)
         else:
             for c, chunk in enumerate(raw_files.chunked_files):
-                job = multiprocessing.Process(target=process_file_list, args=(chunk, c))
+                job = multiprocessing.Process(target=process_file_list, args=(chunk, pipeline_config, c))
                 job_list.append(job)
 
             # Start all the jobs
@@ -239,6 +224,38 @@ def merge_mfdata(path_to_data: str, prefix='*', overwrite_existing_partials: boo
                                              chunk_size=chunk_size)
 
 
+def process_file_list(file_list, pipeline_config, c):
+    '''Run a PyOPIA processing pipeline for a chunked list of files based on a given config.toml
+
+    Parameters
+    ----------
+    file_list : list
+        List of file paths to process, where each file will be passed individually through the processing pipeline
+
+    pipeline_config : dict
+        Loaded config.toml file used to initialize the processing pipeline and set up logging
+
+    c : int
+        Chunk index for tracking progress and logging. If set to 0, enables the
+        progress bar; for other values, the progress bar is disabled.
+    '''
+    processing_pipeline = pyopia.pipeline.Pipeline(pipeline_config)
+    setup_logging(pipeline_config)
+    logger = logging.getLogger('rich')
+
+    with get_custom_progress_bar(f'[blue]Processing progress (chunk {c})', disable=c != 0) as pbar:
+        for filename in pbar.track(file_list, description=f'[blue]Processing progress (chunk {c})'):
+            try:
+                logger.debug(f'Chunk {c} starting to process {filename}')
+                processing_pipeline.run(filename)
+            except Exception as e:
+                logger.warning('[red]An error occurred in processing, ' +
+                               'skipping rest of pipeline and moving to next image. ' +
+                               f'(chunk {c})')
+                logger.error(e)
+                logger.debug(''.join(traceback.format_tb(e.__traceback__)))
+
+
 def setup_logging(pipeline_config):
     '''Configure logging
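
The usual motivation for moving a multiprocessing target like process_file_list out of its enclosing function, as this patch does, is that start methods which serialize the target (e.g. 'spawn', the default on macOS and Windows) cannot pickle a function defined inside another function, and that each worker process should initialize its own pipeline and logging state rather than inherit a closure over the parent's. The sketch below illustrates the same module-level-target pattern in isolation; it is not PyOPIA code, and the file names, the stand-in work function, and the empty config dict are all hypothetical:

import multiprocessing


def process_chunk(file_list, config, chunk_index):
    # Module-level target: child processes can locate it by import,
    # which a function nested inside another function cannot offer.
    # 'config' stands in for a loaded pipeline configuration.
    for filename in file_list:
        print(f'chunk {chunk_index}: processing {filename}')


if __name__ == '__main__':
    files = [f'image_{i:03d}.tif' for i in range(8)]  # hypothetical inputs
    num_chunks = 4

    # Round-robin split of the file list into num_chunks chunks
    chunks = [files[i::num_chunks] for i in range(num_chunks)]

    # One worker process per chunk, mirroring the patched cli.py loop
    jobs = [multiprocessing.Process(target=process_chunk, args=(chunk, {}, c))
            for c, chunk in enumerate(chunks)]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()

This also explains why the patched cli.py passes pipeline_config through args: a module-level worker no longer sees the enclosing function's locals, so everything it needs must arrive as an explicit argument.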