Skip to content

Commit

Permalink
326 Replace threading with multiprocessing for multiple chunks (#327)
Browse files Browse the repository at this point in the history
* Replace threading with multiprocessing for multiple chunks

* 326 Pin Keras version to 3.5.0 for compat with pretrained model

---------

Co-authored-by: Raymond Nepstad <[email protected]>
  • Loading branch information
nepstad and Raymond Nepstad authored Oct 10, 2024
1 parent 338fa7e commit 3c5d105
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/notebooks/big_datasets.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"pyopia process config.toml --num-chunks 4\n",
"```\n",
"\n",
"This will split the list of raw files into 4 chunks to be processed in parallell using threading. This tool will organise the chunks of file names so that the appropriate background files into the correct places (i.e. for moving background, the last `average_window` number of files in the previous chunk are added to the start of the next chunk; and for fixed background the same initial `average_window` number of files are added to the top of each chunk)."
"This will split the list of raw files into 4 chunks to be processed in parallell using multiprocessing. This tool will organise the chunks of file names so that the appropriate background files into the correct places (i.e. for moving background, the last `average_window` number of files in the previous chunk are added to the start of the next chunk; and for fixed background the same initial `average_window` number of files are added to the top of each chunk)."
]
}
],
Expand Down
2 changes: 1 addition & 1 deletion pyopia/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.7.0'
__version__ = '2.8.0'
17 changes: 10 additions & 7 deletions pyopia/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from rich.logging import RichHandler
import rich.progress
import pandas as pd
import threading
import multiprocessing

import pyopia
import pyopia.background
Expand Down Expand Up @@ -187,19 +187,22 @@ def process_file_list(file_list, c):
logger.error(e)
logger.debug(''.join(traceback.format_tb(e.__traceback__)))

# With one chunk we keep the non-threaded functionality to ensure backwards compatibility
# With one chunk we keep the non-multiprocess functionality to ensure backwards compatibility
job_list = []
if num_chunks == 1:
process_file_list(raw_files, 0)
else:
for c, chunk in enumerate(raw_files.chunked_files):
job = threading.Thread(target=process_file_list, args=(chunk, c, ))
job.start()
job = multiprocessing.Process(target=process_file_list, args=(chunk, c))
job_list.append(job)

# Calculate and print total processing time
# If we are using threads, make sure all jobs have finished
# Start all the jobs
[job.start() for job in job_list]

# If we are using multiprocessing, make sure all jobs have finished
[job.join() for job in job_list]

# Calculate and print total processing time
time_total = pd.to_timedelta(time.time() - t1, 'seconds')
with Progress(transient=True) as progress:
progress.console.print(f"[blue]PROCESSING COMPLETED IN {time_total}")
Expand Down Expand Up @@ -256,7 +259,7 @@ def setup_logging(pipeline_config):
handlers = [logging.FileHandler(log_file, mode='a')]

# Configure logger
log_format = '%(asctime)s %(levelname)s %(threadName)s [%(module)s.%(funcName)s] %(message)s'
log_format = '%(asctime)s %(levelname)s %(processName)s [%(module)s.%(funcName)s] %(message)s'
logging.basicConfig(level=log_level, datefmt='%Y-%m-%d %H:%M:%S', format=log_format, handlers=handlers)


Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ h5py = "^3.9.0"
poetry-version-plugin = "^0.2.0"
tensorflow-macos = {version = "^2.16.2", optional = true, markers = "sys_platform == 'darwin' and platform_machine == 'arm64'"}
tensorflow-cpu = {version = "^2.16.2", optional = true}
keras = {version = "3.5.0", optional = true}
dask = ">=2024.8.1"
nbconvert = "^7.16.4"
h5netcdf = ">= 1.3.0"
scikit-image = "^0.24.0"

[tool.poetry.extras]
classification-arm64 = ["tensorflow-macos"]
classification = ["tensorflow-cpu"]
classification-arm64 = ["tensorflow-macos", "keras"]
classification = ["tensorflow-cpu", "keras"]

[tool.poetry-version-plugin]
source = "init"
Expand Down

0 comments on commit 3c5d105

Please sign in to comment.