From df5f941dee574940ac4fc7e710e64f66aaad986c Mon Sep 17 00:00:00 2001 From: RussTreadon-NOAA <26926959+RussTreadon-NOAA@users.noreply.github.com> Date: Fri, 18 Aug 2023 15:16:49 -0400 Subject: [PATCH] Create and populate ensemble directory for UFS-based ATM DA (#1801) Adds scripting to stage ensemble files for use in hybrid variational and ensemble UFS-based atmospheric DA. --- ush/python/pygfs/task/analysis.py | 65 +++++++++++++++++++++- ush/python/pygfs/task/atm_analysis.py | 15 +++++- ush/python/pygfs/task/atmens_analysis.py | 68 +++--------------------- 3 files changed, 85 insertions(+), 63 deletions(-) diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py index 94c1413283..cfd1fb2206 100644 --- a/ush/python/pygfs/task/analysis.py +++ b/ush/python/pygfs/task/analysis.py @@ -8,7 +8,8 @@ from typing import List, Dict, Any, Union from wxflow import (parse_j2yaml, FileHandler, rm_p, logit, - Task, Executable, WorkflowException) + Task, Executable, WorkflowException, to_fv3time, to_YMD, + Template, TemplateConstants) logger = getLogger(__name__.split('.')[-1]) @@ -200,6 +201,68 @@ def link_jediexe(self) -> None: return + @staticmethod + @logit(logger) + def get_fv3ens_dict(config: Dict[str, Any]) -> Dict[str, Any]: + """Compile a dictionary of ensemble member restarts to copy + + This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer) + that are needed for global atmens DA and returns said dictionary for use by the FileHandler class. + + Parameters + ---------- + config: Dict + a dictionary containing all of the configuration needed + + Returns + ---------- + ens_dict: Dict + a dictionary containing the list of ensemble member restart files to copy for FileHandler + """ + # NOTE for now this is FV3 restart files and just assumed to be fh006 + + # define template + template_res = config.COM_ATMOS_RESTART_TMPL + prev_cycle = config.previous_cycle + tmpl_res_dict = { + 'ROTDIR': config.ROTDIR, + 'RUN': config.RUN, + 'YMD': to_YMD(prev_cycle), + 'HH': prev_cycle.strftime('%H'), + 'MEMDIR': None + } + + # construct ensemble member file list + dirlist = [] + enslist = [] + for imem in range(1, config.NMEM_ENS + 1): + memchar = f"mem{imem:03d}" + + # create directory path for ensemble member restart + dirlist.append(os.path.join(config.DATA, config.dirname, f'mem{imem:03d}')) + + # get FV3 restart files, this will be a lot simpler when using history files + tmpl_res_dict['MEMDIR'] = memchar + rst_dir = Template.substitute_structure(template_res, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_res_dict.get) + run_dir = os.path.join(config.DATA, config.dirname, memchar) + + # atmens DA needs coupler + basename = f'{to_fv3time(config.current_cycle)}.coupler.res' + enslist.append([os.path.join(rst_dir, basename), os.path.join(config.DATA, config.dirname, memchar, basename)]) + + # atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data + for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']: + template = f'{to_fv3time(config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' + for itile in range(1, config.ntiles + 1): + basename = template.format(tilenum=itile) + enslist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) + + ens_dict = { + 'mkdir': dirlist, + 'copy': enslist, + } + return ens_dict + @staticmethod @logit(logger) def execute_jediexe(workdir: Union[str, os.PathLike], aprun_cmd: str, jedi_exec: str, jedi_yaml: str) -> None: diff --git a/ush/python/pygfs/task/atm_analysis.py b/ush/python/pygfs/task/atm_analysis.py index b69e83c3cd..da41574fc9 100644 --- a/ush/python/pygfs/task/atm_analysis.py +++ b/ush/python/pygfs/task/atm_analysis.py @@ -82,11 +82,22 @@ def initialize(self: Analysis) -> None: jedi_fix_list = parse_j2yaml(jedi_fix_list_path, self.task_config) FileHandler(jedi_fix_list).sync() - # stage berror files - # copy static background error files, otherwise it will assume ID matrix + # stage static background error files, otherwise it will assume ID matrix logger.debug(f"Stage files for STATICB_TYPE {self.task_config.STATICB_TYPE}") FileHandler(self.get_berror_dict(self.task_config)).sync() + # stage ensemble files for use in hybrid background error + if self.task_config.DOHYBVAR: + logger.debug(f"Stage ensemble files for DOHYBVAR {self.task_config.DOHYBVAR}") + localconf = AttrDict() + keys = ['COM_ATMOS_RESTART_TMPL', 'previous_cycle', 'ROTDIR', 'RUN', + 'NMEM_ENS', 'DATA', 'current_cycle', 'ntiles'] + for key in keys: + localconf[key] = self.task_config[key] + localconf.RUN = 'enkf' + self.task_config.RUN + localconf.dirname = 'ens' + FileHandler(self.get_fv3ens_dict(localconf)).sync() + # stage backgrounds FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync() diff --git a/ush/python/pygfs/task/atmens_analysis.py b/ush/python/pygfs/task/atmens_analysis.py index e5f8663d27..9cf84c07c7 100644 --- a/ush/python/pygfs/task/atmens_analysis.py +++ b/ush/python/pygfs/task/atmens_analysis.py @@ -108,7 +108,14 @@ def initialize(self: Analysis) -> None: FileHandler(jedi_fix_list).sync() # stage backgrounds - FileHandler(self.get_bkg_dict()).sync() + logger.debug(f"Stage ensemble member background files") + localconf = AttrDict() + keys = ['COM_ATMOS_RESTART_TMPL', 'previous_cycle', 'ROTDIR', 'RUN', + 'NMEM_ENS', 'DATA', 'current_cycle', 'ntiles'] + for key in keys: + localconf[key] = self.task_config[key] + localconf.dirname = 'bkg' + FileHandler(self.get_fv3ens_dict(localconf)).sync() # generate ensemble da YAML file logger.debug(f"Generate ensemble da YAML file: {self.task_config.fv3jedi_yaml}") @@ -286,62 +293,3 @@ def jedi2fv3inc(self: Analysis) -> None: cmd.add_default_arg(atminc_fv3) logger.debug(f"Executing {cmd}") cmd(output='stdout', error='stderr') - - @logit(logger) - def get_bkg_dict(self: Analysis) -> Dict[str, List[str]]: - """Compile a dictionary of model background files to copy - - This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer) - that are needed for global atmens DA and returns said dictionary for use by the FileHandler class. - - Parameters - ---------- - None - - Returns - ---------- - bkg_dict: Dict - a dictionary containing the list of model background files to copy for FileHandler - """ - # NOTE for now this is FV3 restart files and just assumed to be fh006 - # loop over ensemble members - rstlist = [] - bkglist = [] - - # get FV3 restart files, this will be a lot simpler when using history files - template_res = self.task_config.COM_ATMOS_RESTART_TMPL - tmpl_res_dict = { - 'ROTDIR': self.task_config.ROTDIR, - 'RUN': self.task_config.RUN, - 'YMD': to_YMD(self.task_config.previous_cycle), - 'HH': self.task_config.previous_cycle.strftime('%H'), - 'MEMDIR': None - } - - for imem in range(1, self.task_config.NMEM_ENS + 1): - memchar = f"mem{imem:03d}" - - # get FV3 restart files, this will be a lot simpler when using history files - tmpl_res_dict['MEMDIR'] = memchar - rst_dir = Template.substitute_structure(template_res, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_res_dict.get) - rstlist.append(rst_dir) - - run_dir = os.path.join(self.task_config.DATA, 'bkg', memchar) - - # atmens DA needs coupler - basename = f'{to_fv3time(self.task_config.current_cycle)}.coupler.res' - bkglist.append([os.path.join(rst_dir, basename), os.path.join(self.task_config.DATA, 'bkg', memchar, basename)]) - - # atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data - for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']: - template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' - for itile in range(1, self.task_config.ntiles + 1): - basename = template.format(tilenum=itile) - bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) - - bkg_dict = { - 'mkdir': rstlist, - 'copy': bkglist, - } - - return bkg_dict