Skip to content

Commit

Permalink
Create and populate ensemble directory for UFS-based ATM DA (NOAA-EMC…
Browse files Browse the repository at this point in the history
…#1801)

Adds scripting to stage ensemble files for use in hybrid variational and ensemble UFS-based atmospheric DA.
  • Loading branch information
RussTreadon-NOAA authored Aug 18, 2023
1 parent 17c9d57 commit df5f941
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 63 deletions.
65 changes: 64 additions & 1 deletion ush/python/pygfs/task/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from typing import List, Dict, Any, Union

from wxflow import (parse_j2yaml, FileHandler, rm_p, logit,
Task, Executable, WorkflowException)
Task, Executable, WorkflowException, to_fv3time, to_YMD,
Template, TemplateConstants)

logger = getLogger(__name__.split('.')[-1])

Expand Down Expand Up @@ -200,6 +201,68 @@ def link_jediexe(self) -> None:

return

@staticmethod
@logit(logger)
def get_fv3ens_dict(config: Dict[str, Any]) -> Dict[str, Any]:
"""Compile a dictionary of ensemble member restarts to copy
This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer)
that are needed for global atmens DA and returns said dictionary for use by the FileHandler class.
Parameters
----------
config: Dict
a dictionary containing all of the configuration needed
Returns
----------
ens_dict: Dict
a dictionary containing the list of ensemble member restart files to copy for FileHandler
"""
# NOTE for now this is FV3 restart files and just assumed to be fh006

# define template
template_res = config.COM_ATMOS_RESTART_TMPL
prev_cycle = config.previous_cycle
tmpl_res_dict = {
'ROTDIR': config.ROTDIR,
'RUN': config.RUN,
'YMD': to_YMD(prev_cycle),
'HH': prev_cycle.strftime('%H'),
'MEMDIR': None
}

# construct ensemble member file list
dirlist = []
enslist = []
for imem in range(1, config.NMEM_ENS + 1):
memchar = f"mem{imem:03d}"

# create directory path for ensemble member restart
dirlist.append(os.path.join(config.DATA, config.dirname, f'mem{imem:03d}'))

# get FV3 restart files, this will be a lot simpler when using history files
tmpl_res_dict['MEMDIR'] = memchar
rst_dir = Template.substitute_structure(template_res, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_res_dict.get)
run_dir = os.path.join(config.DATA, config.dirname, memchar)

# atmens DA needs coupler
basename = f'{to_fv3time(config.current_cycle)}.coupler.res'
enslist.append([os.path.join(rst_dir, basename), os.path.join(config.DATA, config.dirname, memchar, basename)])

# atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data
for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']:
template = f'{to_fv3time(config.current_cycle)}.{ftype}.tile{{tilenum}}.nc'
for itile in range(1, config.ntiles + 1):
basename = template.format(tilenum=itile)
enslist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])

ens_dict = {
'mkdir': dirlist,
'copy': enslist,
}
return ens_dict

@staticmethod
@logit(logger)
def execute_jediexe(workdir: Union[str, os.PathLike], aprun_cmd: str, jedi_exec: str, jedi_yaml: str) -> None:
Expand Down
15 changes: 13 additions & 2 deletions ush/python/pygfs/task/atm_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,22 @@ def initialize(self: Analysis) -> None:
jedi_fix_list = parse_j2yaml(jedi_fix_list_path, self.task_config)
FileHandler(jedi_fix_list).sync()

# stage berror files
# copy static background error files, otherwise it will assume ID matrix
# stage static background error files, otherwise it will assume ID matrix
logger.debug(f"Stage files for STATICB_TYPE {self.task_config.STATICB_TYPE}")
FileHandler(self.get_berror_dict(self.task_config)).sync()

# stage ensemble files for use in hybrid background error
if self.task_config.DOHYBVAR:
logger.debug(f"Stage ensemble files for DOHYBVAR {self.task_config.DOHYBVAR}")
localconf = AttrDict()
keys = ['COM_ATMOS_RESTART_TMPL', 'previous_cycle', 'ROTDIR', 'RUN',
'NMEM_ENS', 'DATA', 'current_cycle', 'ntiles']
for key in keys:
localconf[key] = self.task_config[key]
localconf.RUN = 'enkf' + self.task_config.RUN
localconf.dirname = 'ens'
FileHandler(self.get_fv3ens_dict(localconf)).sync()

# stage backgrounds
FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync()

Expand Down
68 changes: 8 additions & 60 deletions ush/python/pygfs/task/atmens_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,14 @@ def initialize(self: Analysis) -> None:
FileHandler(jedi_fix_list).sync()

# stage backgrounds
FileHandler(self.get_bkg_dict()).sync()
logger.debug(f"Stage ensemble member background files")
localconf = AttrDict()
keys = ['COM_ATMOS_RESTART_TMPL', 'previous_cycle', 'ROTDIR', 'RUN',
'NMEM_ENS', 'DATA', 'current_cycle', 'ntiles']
for key in keys:
localconf[key] = self.task_config[key]
localconf.dirname = 'bkg'
FileHandler(self.get_fv3ens_dict(localconf)).sync()

# generate ensemble da YAML file
logger.debug(f"Generate ensemble da YAML file: {self.task_config.fv3jedi_yaml}")
Expand Down Expand Up @@ -286,62 +293,3 @@ def jedi2fv3inc(self: Analysis) -> None:
cmd.add_default_arg(atminc_fv3)
logger.debug(f"Executing {cmd}")
cmd(output='stdout', error='stderr')

@logit(logger)
def get_bkg_dict(self: Analysis) -> Dict[str, List[str]]:
"""Compile a dictionary of model background files to copy
This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer)
that are needed for global atmens DA and returns said dictionary for use by the FileHandler class.
Parameters
----------
None
Returns
----------
bkg_dict: Dict
a dictionary containing the list of model background files to copy for FileHandler
"""
# NOTE for now this is FV3 restart files and just assumed to be fh006
# loop over ensemble members
rstlist = []
bkglist = []

# get FV3 restart files, this will be a lot simpler when using history files
template_res = self.task_config.COM_ATMOS_RESTART_TMPL
tmpl_res_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.previous_cycle),
'HH': self.task_config.previous_cycle.strftime('%H'),
'MEMDIR': None
}

for imem in range(1, self.task_config.NMEM_ENS + 1):
memchar = f"mem{imem:03d}"

# get FV3 restart files, this will be a lot simpler when using history files
tmpl_res_dict['MEMDIR'] = memchar
rst_dir = Template.substitute_structure(template_res, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_res_dict.get)
rstlist.append(rst_dir)

run_dir = os.path.join(self.task_config.DATA, 'bkg', memchar)

# atmens DA needs coupler
basename = f'{to_fv3time(self.task_config.current_cycle)}.coupler.res'
bkglist.append([os.path.join(rst_dir, basename), os.path.join(self.task_config.DATA, 'bkg', memchar, basename)])

# atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data
for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']:
template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc'
for itile in range(1, self.task_config.ntiles + 1):
basename = template.format(tilenum=itile)
bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])

bkg_dict = {
'mkdir': rstlist,
'copy': bkglist,
}

return bkg_dict

0 comments on commit df5f941

Please sign in to comment.