diff --git a/pyopia/__init__.py b/pyopia/__init__.py
index 87f7ae6..6bd065c 100644
--- a/pyopia/__init__.py
+++ b/pyopia/__init__.py
@@ -1 +1 @@
-__version__ = '2.5.10'
+__version__ = '2.5.11'
diff --git a/pyopia/cli.py b/pyopia/cli.py
index 93cf956..7fe100b 100644
--- a/pyopia/cli.py
+++ b/pyopia/cli.py
@@ -186,7 +186,8 @@ def process_file_list(file_list, c):
 
 
 @app.command()
-def merge_mfdata(path_to_data: str, prefix='*'):
+def merge_mfdata(path_to_data: str, prefix='*', overwrite_existing_partials: bool = True,
+                 chunk_size: int = None):
     '''Combine a multi-file directory of STATS.nc files into a single '-STATS.nc' file
 
     that can then be loaded with {func}`pyopia.io.load_stats`
@@ -198,8 +199,21 @@ def merge_mfdata(path_to_data: str, prefix='*'):
     prefix : str
         Prefix to multi-file dataset (for replacing the wildcard in '*Image-D*-STATS.nc').
         Defaults to '*'
+
+    overwrite_existing_partials : bool
+        If False, do not reprocess existing merged netcdf files for each chunk.
+        Otherwise reprocess (load) and overwrite. This can be used to restart
+        or continue a previous merge operation as new files become available.
+
+    chunk_size : int
+        Process this many files together and store them as partially merged netcdf files,
+        which are then merged at the end. Default: None, process all files together.
     '''
-    pyopia.io.merge_and_save_mfdataset(path_to_data, prefix=prefix)
+    setup_logging({'general': {}})
+
+    pyopia.io.merge_and_save_mfdataset(path_to_data, prefix=prefix,
+                                       overwrite_existing_partials=overwrite_existing_partials,
+                                       chunk_size=chunk_size)
 
 
 def setup_logging(pipeline_config):
diff --git a/pyopia/io.py b/pyopia/io.py
index 0f91baf..83b75da 100644
--- a/pyopia/io.py
+++ b/pyopia/io.py
@@ -3,7 +3,7 @@
 '''
 
 from datetime import datetime
-
+import numpy as np
 import h5py
 import pandas as pd
 import toml
@@ -11,6 +11,7 @@
 import os
 from glob import glob
 import xarray as xr
+from tqdm.auto import tqdm
 
 from pyopia import __version__ as pyopia_version
 
@@ -178,7 +179,7 @@ def load_stats(datafilename):
 
     '''
     if datafilename.endswith('.nc'):
-        with xarray.open_dataset(datafilename) as xstats:
+        with xarray.open_dataset(datafilename, engine=NETCDF_ENGINE) as xstats:
             xstats.load()
     elif datafilename.endswith('.h5'):
         logger.warning('In future, load_stats will only take .nc files')
@@ -194,7 +195,11 @@
 
 
 def combine_stats_netcdf_files(path_to_data, prefix='*'):
-    '''Combine a multi-file directory of STATS.nc files into a 'stats' xarray dataset created by :func:`pyopia.io.write_stats`
+    '''.. deprecated:: 2.5.11
+        :func:`pyopia.io.combine_stats_netcdf_files` will be removed in version 3.0.0; it is replaced by
+        :func:`pyopia.io.concat_stats_netcdf_files`.
+
+    Combine a multi-file directory of STATS.nc files into a 'stats' xarray dataset created by :func:`pyopia.io.write_stats`
     when using 'append = false'
 
     Parameters
@@ -231,7 +236,63 @@
     return xstats, image_stats
 
 
-def merge_and_save_mfdataset(path_to_data, prefix='*'):
+def concat_stats_netcdf_files(sorted_filelist):
+    '''Concatenate a specified list of STATS.nc files into one 'xstats' xarray dataset
+    created by :func:`pyopia.io.write_stats` when using 'append = false'.
+
+    The files are first loaded and then combined, so memory usage grows with the length of the file list.
+
+    Parameters
+    ----------
+    sorted_filelist : list
+        List of files to be combined into a single dataset
+
+    Returns
+    -------
+    xstats : xarray.Dataset or None
+        Particle statistics and metadata from processing steps
+    image_stats : xarray.Dataset or None
+        Summary statistics of each raw image (including those with no particles)
+    '''
+    if len(sorted_filelist) < 1:
+        logger.error('No files found to concatenate, doing nothing.')
+        return None, None
+
+    # We load one dataset at a time into a list for a later merge
+    datasets = []
+    datasets_image_stats = []
+
+    # Check if we have image statistics in the first file; if not, we skip checking the rest
+    skip_image_stats = False
+    try:
+        with xr.open_dataset(sorted_filelist[0], group='image_stats', engine=NETCDF_ENGINE) as ds:
+            ds.load()
+    except OSError:
+        logger.info('Could not get image_stats from netcdf files for merging, returning None for this.')
+        skip_image_stats = True
+
+    # Load datasets from each file into the lists
+    for f in tqdm(sorted_filelist, desc='Loading datasets'):
+        with xr.open_dataset(f, engine=NETCDF_ENGINE) as ds:
+            ds.load()
+        datasets.append(ds)
+
+        if not skip_image_stats:
+            with xr.open_dataset(f, group='image_stats', engine=NETCDF_ENGINE) as ds:
+                ds.load()
+            datasets_image_stats.append(ds)
+
+    # Combine the individual datasets loaded above
+    logger.info('Combining datasets')
+    xstats = xr.concat(datasets, dim='index')
+    image_stats = None
+    if not skip_image_stats:
+        image_stats = xr.concat(datasets_image_stats, dim='timestamp')
+
+    return xstats, image_stats
+
+
+def merge_and_save_mfdataset(path_to_data, prefix='*', overwrite_existing_partials=False, chunk_size=None):
     '''Combine a multi-file directory of STATS.nc files into a single '-STATS.nc' file
 
     that can then be loaded with :func:`pyopia.io.load_stats`
@@ -243,24 +304,80 @@
     prefix : str
         Prefix to multi-file dataset (for replacing the wildcard in '*Image-D*-STATS.nc').
         Defaults to '*'
+
+    overwrite_existing_partials : bool
+        If False, do not reprocess existing merged netcdf files for each chunk.
+        Otherwise reprocess (load) and overwrite. This can be used to restart
+        or continue a previous merge operation as new files become available.
+
+    chunk_size : int
+        Number of files to be loaded and merged in each step. Produces a number
+        of intermediate/partially merged netcdf files equal to the total number
+        of input files divided by chunk_size. The last chunk may contain fewer
+        files than specified, depending on the total number of files.
+        Default: None, which processes all files together.
     '''
+    logger.info(f'Combine stats netcdf files from {path_to_data}')
+    if (chunk_size is not None) and (chunk_size < 1):
+        raise ValueError(f'Invalid chunk size, must be greater than 0, was {chunk_size}')
 
-    logging.info(f'combine stats netcdf files from {path_to_data}')
-    xstats, image_stats = combine_stats_netcdf_files(path_to_data, prefix=prefix)
+    # Get sorted list of per-image stats netcdf files
+    sorted_filelist = sorted(glob(os.path.join(path_to_data, prefix + 'Image-D*-STATS.nc')))
 
+    # Chunk the file list into smaller parts if specified
+    num_files = len(sorted_filelist)
+    chunk_size_used = num_files if chunk_size is None else min(chunk_size, num_files)
+    num_chunks = int(np.ceil(num_files / chunk_size_used))
+    filelist_chunks = [sorted_filelist[i*chunk_size_used:min(num_files, (i+1)*chunk_size_used)] for i in range(num_chunks)]
+    infostr = f'Processing {num_chunks} partial file lists of {chunk_size_used} files each'
+    infostr += f', based on a total of {num_files} files.'
+    logger.info(infostr)
+
+    # Get config from first file in list
+    xstats = load_stats(sorted_filelist[0])
     settings = steps_from_xstats(xstats)
-
     prefix_out = os.path.basename(settings['steps']['output']['output_datafile'])
-    output_name = os.path.join(path_to_data, prefix_out)
-
-    logging.info(f'writing {output_name}')
-    # save the particle statistics (xstats) to NetCDF
     encoding = setup_xstats_encoding(xstats)
-    xstats.to_netcdf(output_name + '-STATS.nc', encoding=encoding, engine=NETCDF_ENGINE, format='NETCDF4')
-    # if summary data for each raw image are available (image_stats), save this into the image_stats group
-    if image_stats is not None:
-        image_stats.to_netcdf(output_name + '-STATS.nc', group='image_stats', mode='a', engine=NETCDF_ENGINE)
-    logging.info(f'writing {output_name} done.')
+
+    def process_store(i, filelist_):
+        output_name = os.path.join(path_to_data, f'part-{i:04d}-{prefix_out}-STATS.nc')
+
+        # Skip this chunk if the merged output file exists and overwrite is set to False
+        if os.path.exists(output_name) and not overwrite_existing_partials:
+            logger.info(f'File exists ({output_name}), skipping')
+            return output_name
+
+        # Load the individual datasets
+        xstats, image_stats = concat_stats_netcdf_files(filelist_)
+
+        # Save the particle statistics (xstats) to NetCDF
+        logger.info(f'Writing {output_name}')
+        if xstats is not None:
+            xstats.to_netcdf(output_name, mode='w', encoding=encoding, engine=NETCDF_ENGINE, format='NETCDF4')
+
+        # If summary data for each raw image are available (image_stats), save this into the image_stats group
+        if image_stats is not None:
+            image_stats.to_netcdf(output_name, group='image_stats', mode='a', engine=NETCDF_ENGINE)
+        logger.info(f'Writing {output_name} done.')
+
+        return output_name
+
+    # Loop over file list chunks and create merged netcdf files for each
+    merged_files = []
+    for i, filelist_ in enumerate(filelist_chunks):
+        output_name = process_store(i, filelist_)
+        merged_files.append(output_name)
+
+    # Finally, merge the partially merged files
+    logger.info('Doing final merge of partially merged files')
+    output_name = os.path.join(path_to_data, prefix_out + '-STATS.nc')
+    with xr.open_mfdataset(merged_files, concat_dim='index', combine='nested') as ds:
+        ds.to_netcdf(output_name, mode='w', encoding=encoding, engine=NETCDF_ENGINE, format='NETCDF4')
+
+    with xr.open_mfdataset(merged_files, group='image_stats', concat_dim='timestamp', combine='nested') as ds:
+        ds.to_netcdf(output_name, mode='a', group='image_stats', engine=NETCDF_ENGINE)
+
+    logger.info(f'Writing {output_name} done.')
 
 
 def steps_from_xstats(xstats):
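
As a usage reference (not part of the change itself): a minimal sketch of driving the new chunked merge from Python. The directory path and chunk size below are illustrative placeholders, and note that the Python-level default of overwrite_existing_partials=False differs from the CLI default of True, as the diff shows.

    import pyopia.io

    # Merge per-image STATS.nc files in chunks of 500, writing intermediate
    # 'part-NNNN-...-STATS.nc' files before the final combined '-STATS.nc'.
    # With overwrite_existing_partials=False, chunks whose partial file already
    # exists are skipped, so an interrupted or ongoing merge can be resumed
    # as new files become available.
    pyopia.io.merge_and_save_mfdataset('path/to/proc', prefix='*',
                                       overwrite_existing_partials=False,
                                       chunk_size=500)

The same operation should be reachable from the command line through the updated merge_mfdata command (presumably invoked as `pyopia merge-mfdata path/to/proc --chunk-size 500`, given typer's usual underscore-to-hyphen conversion).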