Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

326 Replace threading with multiprocessing for multiple chunks #327

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/notebooks/big_datasets.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"pyopia process config.toml --num-chunks 4\n",
"```\n",
"\n",
"This will split the list of raw files into 4 chunks to be processed in parallell using threading. This tool will organise the chunks of file names so that the appropriate background files into the correct places (i.e. for moving background, the last `average_window` number of files in the previous chunk are added to the start of the next chunk; and for fixed background the same initial `average_window` number of files are added to the top of each chunk)."
"This will split the list of raw files into 4 chunks to be processed in parallel using multiprocessing. This tool will organise the chunks of file names so that the appropriate background files are placed in the correct positions (i.e. for a moving background, the last `average_window` files of the previous chunk are added to the start of the next chunk; and for a fixed background, the same initial `average_window` files are added to the top of each chunk)."
]
}
],
Expand Down
2 changes: 1 addition & 1 deletion pyopia/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.7.0'
__version__ = '2.8.0'
17 changes: 10 additions & 7 deletions pyopia/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from rich.logging import RichHandler
import rich.progress
import pandas as pd
import threading
import multiprocessing

import pyopia
import pyopia.background
Expand Down Expand Up @@ -187,19 +187,22 @@ def process_file_list(file_list, c):
logger.error(e)
logger.debug(''.join(traceback.format_tb(e.__traceback__)))

# With one chunk we keep the non-threaded functionality to ensure backwards compatibility
# With one chunk we keep the non-multiprocess functionality to ensure backwards compatibility
job_list = []
if num_chunks == 1:
process_file_list(raw_files, 0)
else:
for c, chunk in enumerate(raw_files.chunked_files):
job = threading.Thread(target=process_file_list, args=(chunk, c, ))
job.start()
job = multiprocessing.Process(target=process_file_list, args=(chunk, c))
job_list.append(job)

# Calculate and print total processing time
# If we are using threads, make sure all jobs have finished
# Start all the jobs
[job.start() for job in job_list]

# If we are using multiprocessing, make sure all jobs have finished
[job.join() for job in job_list]

# Calculate and print total processing time
time_total = pd.to_timedelta(time.time() - t1, 'seconds')
with Progress(transient=True) as progress:
progress.console.print(f"[blue]PROCESSING COMPLETED IN {time_total}")
Expand Down Expand Up @@ -256,7 +259,7 @@ def setup_logging(pipeline_config):
handlers = [logging.FileHandler(log_file, mode='a')]

# Configure logger
log_format = '%(asctime)s %(levelname)s %(threadName)s [%(module)s.%(funcName)s] %(message)s'
log_format = '%(asctime)s %(levelname)s %(processName)s [%(module)s.%(funcName)s] %(message)s'
logging.basicConfig(level=log_level, datefmt='%Y-%m-%d %H:%M:%S', format=log_format, handlers=handlers)


Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ h5py = "^3.9.0"
poetry-version-plugin = "^0.2.0"
tensorflow-macos = {version = "^2.16.2", optional = true, markers = "sys_platform == 'darwin' and platform_machine == 'arm64'"}
tensorflow-cpu = {version = "^2.16.2", optional = true}
keras = {version = "3.5.0", optional = true}
dask = ">=2024.8.1"
nbconvert = "^7.16.4"
h5netcdf = ">= 1.3.0"
scikit-image = "^0.24.0"

[tool.poetry.extras]
classification-arm64 = ["tensorflow-macos"]
classification = ["tensorflow-cpu"]
classification-arm64 = ["tensorflow-macos", "keras"]
classification = ["tensorflow-cpu", "keras"]

[tool.poetry-version-plugin]
source = "init"
Expand Down
Loading