298 Add progress bar, load datasets and merge manually (#309)
* 298 Add progress bar, load datasets and merge manually

* 298 Merge stats files by chunks to save memory and add checkpointing

* 298 Keep old merge function with deprecation warning
nepstad authored Sep 24, 2024
1 parent 7ec7585 commit 598ea55
Showing 3 changed files with 150 additions and 19 deletions.
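The checkpointing described above works by giving each chunk a deterministic partial-file name and skipping any chunk whose output already exists. A minimal standalone sketch of that pattern (the names here are hypothetical illustrations, not the PyOPIA API itself):

    import os

    def merge_chunk(chunk_id, files, out_dir, overwrite=False):
        # Hypothetical sketch of the checkpointing pattern in this commit:
        # a deterministic per-chunk output name lets a rerun detect finished work.
        out = os.path.join(out_dir, f'part-{chunk_id:04d}-STATS.nc')
        if os.path.exists(out) and not overwrite:
            return out  # checkpoint hit: reuse the existing partial file
        # ... load `files`, concatenate, and write the result to `out` ...
        return out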
pyopia/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
__version__ = '2.5.10'
__version__ = '2.5.11'
pyopia/cli.py (18 changes: 16 additions & 2 deletions)
@@ -186,7 +186,8 @@ def process_file_list(file_list, c):


@app.command()
def merge_mfdata(path_to_data: str, prefix='*'):
def merge_mfdata(path_to_data: str, prefix='*', overwrite_existing_partials: bool = True,
chunk_size: int = None):
'''Combine a multi-file directory of STATS.nc files into a single '-STATS.nc' file
that can then be loaded with {func}`pyopia.io.load_stats`
@@ -198,8 +199,21 @@ def merge_mfdata(path_to_data: str, prefix='*'):
prefix : str
Prefix to multi-file dataset (for replacing the wildcard in '*Image-D*-STATS.nc').
Defaults to '*'
    overwrite_existing_partials : bool
        If False, do not reprocess existing partial (per-chunk) merged netcdf
        files; this allows a previous merge operation to be restarted or
        continued as new files become available. If True, reload each chunk
        and overwrite its partial file.
    chunk_size : int
        Number of files to process together and store as a partially merged
        netcdf file; the partial files are merged at the end. Default: None,
        which processes all files together.
'''
pyopia.io.merge_and_save_mfdataset(path_to_data, prefix=prefix)
setup_logging({'general': {}})

pyopia.io.merge_and_save_mfdataset(path_to_data, prefix=prefix,
overwrite_existing_partials=overwrite_existing_partials,
chunk_size=chunk_size)


def setup_logging(pipeline_config):
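The new command-line options map directly onto :func:`pyopia.io.merge_and_save_mfdataset`, so the same merge can be driven from Python. A minimal sketch, assuming a hypothetical directory proc/ containing *Image-D*-STATS.nc files:

    import pyopia.io

    # Merge all per-image STATS files under proc/ in chunks of 100 files,
    # reusing rather than overwriting any partial files left by an earlier run:
    pyopia.io.merge_and_save_mfdataset(
        'proc/',
        prefix='*',
        overwrite_existing_partials=False,  # skip chunks whose partial file already exists
        chunk_size=100,
    )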
pyopia/io.py (149 changes: 133 additions & 16 deletions)
@@ -3,14 +3,15 @@
'''

from datetime import datetime

import numpy as np
import h5py
import pandas as pd
import toml
import xarray
import os
from glob import glob
import xarray as xr
from tqdm.auto import tqdm

from pyopia import __version__ as pyopia_version

@@ -178,7 +179,7 @@ def load_stats(datafilename):
'''

if datafilename.endswith('.nc'):
with xarray.open_dataset(datafilename) as xstats:
with xarray.open_dataset(datafilename, engine=NETCDF_ENGINE) as xstats:
xstats.load()
elif datafilename.endswith('.h5'):
logger.warning('In future, load_stats will only take .nc files')
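Reading a merged result back goes through the same engine-aware open shown above. A minimal sketch, with a hypothetical output filename:

    import pyopia.io

    # Returns the particle statistics as an xarray.Dataset (one row per particle)
    xstats = pyopia.io.load_stats('proc/survey-STATS.nc')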
@@ -194,7 +195,11 @@


def combine_stats_netcdf_files(path_to_data, prefix='*'):
'''Combine a multi-file directory of STATS.nc files into a 'stats' xarray dataset created by :func:`pyopia.io.write_stats`
    '''.. deprecated:: 2.5.11
        :func:`pyopia.io.combine_stats_netcdf_files` will be removed in version 3.0.0; it is replaced by
        :func:`pyopia.io.concat_stats_netcdf_files`.
Combine a multi-file directory of STATS.nc files into a 'stats' xarray dataset created by :func:`pyopia.io.write_stats`
when using 'append = false'
Parameters
@@ -231,7 +236,63 @@ def combine_stats_netcdf_files(path_to_data, prefix='*'):
return xstats, image_stats


def merge_and_save_mfdataset(path_to_data, prefix='*'):
def concat_stats_netcdf_files(sorted_filelist):
    '''Concatenate a specified list of STATS.nc files into one 'xstats' xarray dataset
    created by :func:`pyopia.io.write_stats` when using 'append = false'.
    The specified files are first loaded and then combined, so memory usage grows with the length of the file list.
Parameters
----------
    sorted_filelist : list of str
        List of files to be combined into a single dataset
Returns
-------
xstats : xarray.Dataset or None
        Particle statistics and metadata from processing steps
image_stats : xarray.Dataset or None
Summary statistics of each raw image (including those with no particles)
'''
if len(sorted_filelist) < 1:
logger.error('No files found to concatenate, doing nothing.')
return None, None

    # We load one dataset at a time into a list for a later merge
datasets = []
datasets_image_stats = []

    # Check if we have image statistics in the first file; if not, we skip checking the rest
skip_image_stats = False
try:
with xr.open_dataset(sorted_filelist[0], group='image_stats', engine=NETCDF_ENGINE) as ds:
ds.load()
except OSError:
        logger.info('Could not get image_stats from netcdf files for merging, returning None for this.')
skip_image_stats = True

# Load datasets from each file into the lists
for f in tqdm(sorted_filelist, desc='Loading datasets'):
with xr.open_dataset(f, engine=NETCDF_ENGINE) as ds:
ds.load()
datasets.append(ds)

if not skip_image_stats:
with xr.open_dataset(f, group='image_stats', engine=NETCDF_ENGINE) as ds:
ds.load()
datasets_image_stats.append(ds)

# Combine the individual datasets loaded above
logging.info('Combining datasets')
xstats = xr.concat(datasets, dim='index')
image_stats = None
if not skip_image_stats:
image_stats = xr.concat(datasets_image_stats, dim='timestamp')

return xstats, image_stats
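Callers migrating off the deprecated :func:`pyopia.io.combine_stats_netcdf_files` can build the sorted file list themselves and pass it to the new function. A sketch, assuming the '*Image-D*-STATS.nc' naming convention used elsewhere in this module and a hypothetical proc/ directory:

    import os
    from glob import glob

    import pyopia.io

    # Previously: xstats, image_stats = pyopia.io.combine_stats_netcdf_files('proc/', prefix='*')
    # Now: build the sorted file list explicitly, then concatenate
    sorted_filelist = sorted(glob(os.path.join('proc/', '*Image-D*-STATS.nc')))
    xstats, image_stats = pyopia.io.concat_stats_netcdf_files(sorted_filelist)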


def merge_and_save_mfdataset(path_to_data, prefix='*', overwrite_existing_partials=False, chunk_size=None):
'''Combine a multi-file directory of STATS.nc files into a single '-STATS.nc' file
that can then be loaded with :func:`pyopia.io.load_stats`
@@ -243,24 +304,80 @@ def merge_and_save_mfdataset(path_to_data, prefix='*'):
prefix : str
Prefix to multi-file dataset (for replacing the wildcard in '*Image-D*-STATS.nc').
Defaults to '*'
    overwrite_existing_partials : bool
        If False, do not reprocess existing partial (per-chunk) merged netcdf
        files; this allows a previous merge operation to be restarted or
        continued as new files become available. If True, reload each chunk
        and overwrite its partial file.
    chunk_size : int
        Number of files to load and merge in each step. Produces a number of
        intermediate, partially merged netcdf files equal to the total number
        of input files divided by chunk_size; the last chunk may contain fewer
        files than specified, depending on the total number of files.
        Default: None, which processes all files together.
'''
logging.info(f'Combine stats netcdf files from {path_to_data}')
if (chunk_size is not None) and (chunk_size < 1):
raise ValueError(f'Invalid chunk size, must be greater than 0, was {chunk_size}')

logging.info(f'combine stats netcdf files from {path_to_data}')
xstats, image_stats = combine_stats_netcdf_files(path_to_data, prefix=prefix)
# Get sorted list of per-image stats netcdf files
sorted_filelist = sorted(glob(os.path.join(path_to_data, prefix + 'Image-D*-STATS.nc')))

# Chunk the file list into smaller parts if specified
num_files = len(sorted_filelist)
chunk_size_used = num_files if chunk_size is None else min(chunk_size, num_files)
num_chunks = int(np.ceil(num_files / chunk_size_used))
filelist_chunks = [sorted_filelist[i*chunk_size_used:min(num_files, (i+1)*chunk_size_used)] for i in range(num_chunks)]
infostr = f'Processing {num_chunks} partial file lists of {chunk_size_used} files each'
    infostr += f', based on a total of {num_files} files.'
logging.info(infostr)

# Get config from first file in list
xstats = load_stats(sorted_filelist[0])
settings = steps_from_xstats(xstats)

prefix_out = os.path.basename(settings['steps']['output']['output_datafile'])
output_name = os.path.join(path_to_data, prefix_out)

logging.info(f'writing {output_name}')
# save the particle statistics (xstats) to NetCDF
encoding = setup_xstats_encoding(xstats)
xstats.to_netcdf(output_name + '-STATS.nc', encoding=encoding, engine=NETCDF_ENGINE, format='NETCDF4')
# if summary data for each raw image are available (image_stats), save this into the image_stats group
if image_stats is not None:
image_stats.to_netcdf(output_name + '-STATS.nc', group='image_stats', mode='a', engine=NETCDF_ENGINE)
logging.info(f'writing {output_name} done.')

def process_store(i, filelist_):
output_name = os.path.join(path_to_data, f'part-{i:04d}-{prefix_out}-STATS.nc')

# Skip this chunk if the merged output file exists and overwrite is set to False
if os.path.exists(output_name) and not overwrite_existing_partials:
logging.info(f'File exists ({output_name}), skipping')
return output_name

# Load the individual datasets
xstats, image_stats = concat_stats_netcdf_files(filelist_)

# Save the particle statistics (xstats) to NetCDF
logging.info(f'Writing {output_name}')
if xstats is not None:
xstats.to_netcdf(output_name, mode='w', encoding=encoding, engine=NETCDF_ENGINE, format='NETCDF4')

# If summary data for each raw image are available (image_stats), save this into the image_stats group
if image_stats is not None:
image_stats.to_netcdf(output_name, group='image_stats', mode='a', engine=NETCDF_ENGINE)
logging.info(f'Writing {output_name} done.')

return output_name

    # Loop over filelist chunks and create merged netcdf files for each
merged_files = []
for i, filelist_ in enumerate(filelist_chunks):
output_name = process_store(i, filelist_)
merged_files.append(output_name)

# Finally, merge the partially merged files
logging.info('Doing final merge of partially merged files')
output_name = os.path.join(path_to_data, prefix_out + '-STATS.nc')
with xr.open_mfdataset(merged_files, concat_dim='index', combine='nested') as ds:
ds.to_netcdf(output_name, mode='w', encoding=encoding, engine=NETCDF_ENGINE, format='NETCDF4')

with xr.open_mfdataset(merged_files, group='image_stats', concat_dim='timestamp', combine='nested') as ds:
ds.to_netcdf(output_name, mode='a', group='image_stats', engine=NETCDF_ENGINE)

logging.info(f'Writing {output_name} done.')


def steps_from_xstats(xstats):
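The chunk bookkeeping in merge_and_save_mfdataset is worth a worked example: with 10 input files and chunk_size=4, three partial files are written (holding 4, 4, and 2 files) before the final concatenation along 'index'. A standalone sketch of the same arithmetic:

    import numpy as np

    num_files, chunk_size = 10, 4
    num_chunks = int(np.ceil(num_files / chunk_size))  # 3 chunks
    chunks = [list(range(num_files))[i * chunk_size:(i + 1) * chunk_size]
              for i in range(num_chunks)]
    print(chunks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

Note that Python slicing clamps the end index, so the explicit min() in the source above is a defensive guard rather than a necessity.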
