Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow APP to differ between RUNs #2943

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parm/config/gefs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ case ${step} in
export ntasks=1
export tasks_per_node=1
export threads_per_task=1
export is_exclusive=True
export memory="4096M"
;;

"waveinit")
Expand Down
31 changes: 24 additions & 7 deletions parm/config/gfs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,25 @@ export SENDDBN_NTC=${SENDDBN_NTC:-"NO"}
export SENDDBN=${SENDDBN:-"NO"}
export DBNROOT=${DBNROOT:-${UTILROOT:-}/fakedbn}

# APP settings
export APP=@APP@
# APP settings; configurable by RUN
case "${RUN}" in
"gfs")
export APP=@APP@
;;
"gdas")
export APP=@APP@
;;
"enkfgfs")
export APP=@APP@
;;
"enkfgdas")
export APP=@APP@
;;
*)
echo "FATAL ERROR: Unrecognized RUN (${RUN})!"
exit 1
;;
esac

shopt -s extglob
# Adjust APP based on RUN
Expand Down Expand Up @@ -220,7 +237,7 @@ case "${CASE}" in
;;
*)
echo "FATAL ERROR: Unrecognized CASE ${CASE}, ABORT!"
exit 1
exit 2
;;
esac

Expand Down Expand Up @@ -259,8 +276,8 @@ case "${APP}" in
fi
;;
*)
echo "Unrecognized APP: '${APP}'"
exit 1
echo "FATAL ERROR: Unrecognized APP: '${APP}'"
exit 3
;;
esac

Expand Down Expand Up @@ -464,8 +481,8 @@ export FHMAX_FITS=132
export HPSSARCH="@HPSSARCH@" # save data to HPSS archive
export LOCALARCH="@LOCALARCH@" # save data to local archive
if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then
echo "Both HPSS and local archiving selected. Please choose one or the other."
exit 2
echo "FATAL ERROR: Both HPSS and local archiving selected. Please choose one or the other."
exit 4
fi
export ARCH_CYC=00 # Archive data at this cycle for warm_start capability
export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability
Expand Down
2 changes: 1 addition & 1 deletion parm/config/gfs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ case ${step} in
ntasks=1
tasks_per_node=1
threads_per_task=1
export is_exclusive=True
memory="4096M"
;;

"atmensanlinit")
Expand Down
200 changes: 121 additions & 79 deletions workflow/applications/applications.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WalterKolczynski-NOAA I have made some significant changes to the Application classes to allow the APP to vary by RUN. Next, I will need to update the Rocoto class to work with this modified class. Before I start that, could you take a look at these and see if you're OK with what I have done. Happy to meet to go over details if needed.

Original file line number Diff line number Diff line change
Expand Up @@ -31,96 +31,139 @@ def __init__(self, conf: Configuration) -> None:

self.scheduler = Host().scheduler

# Get the most basic settings from config.base to determine
# experiment type ({NET}_{MODE})
base = conf.parse_config('config.base')

self.mode = base['MODE']

if self.mode not in self.VALID_MODES:
raise NotImplementedError(f'{self.mode} is not a valid application mode.\n'
f'Valid application modes are:\n'
f'{", ".join(self.VALID_MODES)}\n')

self.net = base['NET']
self.model_app = base.get('APP', 'ATM')
self.do_atm = base.get('DO_ATM', True)
self.do_wave = base.get('DO_WAVE', False)
self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False)
self.do_ocean = base.get('DO_OCN', False)
self.do_ice = base.get('DO_ICE', False)
self.do_aero = base.get('DO_AERO', False)
self.do_prep_obs_aero = base.get('DO_PREP_OBS_AERO', False)
self.do_bufrsnd = base.get('DO_BUFRSND', False)
self.do_gempak = base.get('DO_GEMPAK', False)
self.do_awips = base.get('DO_AWIPS', False)
self.do_verfozn = base.get('DO_VERFOZN', True)
self.do_verfrad = base.get('DO_VERFRAD', True)
self.do_vminmon = base.get('DO_VMINMON', True)
self.do_tracker = base.get('DO_TRACKER', True)
self.do_genesis = base.get('DO_GENESIS', True)
self.do_genesis_fsu = base.get('DO_GENESIS_FSU', False)
self.do_metp = base.get('DO_METP', False)
self.do_upp = not base.get('WRITE_DOPOST', True)
self.do_goes = base.get('DO_GOES', False)
self.do_mos = base.get('DO_MOS', False)
self.do_extractvars = base.get('DO_EXTRACTVARS', False)

self.do_hpssarch = base.get('HPSSARCH', False)

self.nens = base.get('NMEM_ENS', 0)
self.fcst_segments = base.get('FCST_SEGMENTS', None)

if not AppConfig.is_monotonic(self.fcst_segments):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

self.wave_runs = None
if self.do_wave:
wave_run = base.get('WAVE_RUN', 'BOTH').lower()
if wave_run in ['both']:
self.wave_runs = ['gfs', 'gdas']
elif wave_run in ['gfs', 'gdas']:
self.wave_runs = [wave_run]

self.aero_anl_runs = None
self.aero_fcst_runs = None
if self.do_aero:
aero_anl_run = base.get('AERO_ANL_RUN', 'BOTH').lower()
if aero_anl_run in ['both']:
self.aero_anl_runs = ['gfs', 'gdas']
elif aero_anl_run in ['gfs', 'gdas']:
self.aero_anl_runs = [aero_anl_run]
aero_fcst_run = base.get('AERO_FCST_RUN', None).lower()
if aero_fcst_run in ['both']:
self.aero_fcst_runs = ['gfs', 'gdas']
elif aero_fcst_run in ['gfs', 'gdas']:
self.aero_fcst_runs = [aero_fcst_run]
self.gfs_cyc = base.get('gfs_cyc')

print(f"Generating the XML for a {self.mode}_{self.net} case")

def _init_finalize(self, conf: Configuration):
print("Finalizing initialize")
'''
Finalize object initialization calling subclass methods
'''

# Get a list of all possible config_files that would be part of the application
self.configs_names = self._get_app_configs()
# Get run-, net-, and mode-based options
self.run_options = self._get_run_options(conf)

# Source the config files for the jobs in the application without specifying a RUN
self.configs = {'_no_run': self._source_configs(conf)}
# Get task names and runs for the application
self.task_names = self.get_task_names()

# Update the base config dictionary based on application
self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base'])
# Initialize the configs and model_apps dictionaries
self.configs = dict.fromkeys(self.runs)

# Save base in the internal state since it is often needed
base = self.configs['_no_run']['base']
# Now configure the experiment for each valid run
for run in self.runs:
self.configs[run] = self._source_configs(conf, run=run, log=False)

# Get more configuration options into the class attributes
self.gfs_cyc = base.get('gfs_cyc')
def _get_run_options(self, conf: Configuration) -> Dict[str, Any]:
'''
Determine the do_* and APP options for each RUN by sourcing config.base
for each RUN and collecting the flags into self.run_options
'''

# Get task names for the application
self.task_names = self.get_task_names()
run_options = {run: {} for run in dict.fromkeys(self.runs)}
for run in self.runs:
# Read config.base with RUN specified
run_base = conf.parse_config('config.base', RUN=run)

run_options[run]['app'] = run_base.get('APP', 'ATM')
run_options[run]['do_wave_bnd'] = run_base.get('DOBNDPNT_WAVE', False)
run_options[run]['do_bufrsnd'] = run_base.get('DO_BUFRSND', False)
run_options[run]['do_gempak'] = run_base.get('DO_GEMPAK', False)
run_options[run]['do_awips'] = run_base.get('DO_AWIPS', False)
run_options[run]['do_verfozn'] = run_base.get('DO_VERFOZN', True)
run_options[run]['do_verfrad'] = run_base.get('DO_VERFRAD', True)
run_options[run]['do_vminmon'] = run_base.get('DO_VMINMON', True)
run_options[run]['do_tracker'] = run_base.get('DO_TRACKER', True)
run_options[run]['do_genesis'] = run_base.get('DO_GENESIS', True)
run_options[run]['do_genesis_fsu'] = run_base.get('DO_GENESIS_FSU', False)
run_options[run]['do_metp'] = run_base.get('DO_METP', False)
run_options[run]['do_upp'] = not run_base.get('WRITE_DOPOST', True)
run_options[run]['do_goes'] = run_base.get('DO_GOES', False)
run_options[run]['do_mos'] = run_base.get('DO_MOS', False)
run_options[run]['do_extractvars'] = run_base.get('DO_EXTRACTVARS', False)

run_options[run]['do_atm'] = run_base.get('DO_ATM', True)
run_options[run]['do_wave'] = run_base.get('DO_WAVE', False)
run_options[run]['do_ocean'] = run_base.get('DO_OCN', False)
run_options[run]['do_ice'] = run_base.get('DO_ICE', False)
run_options[run]['do_aero'] = run_base.get('DO_AERO', False)
run_options[run]['do_prep_obs_aero'] = run_base.get('DO_PREP_OBS_AERO', False)

run_options[run]['do_hpssarch'] = run_base.get('HPSSARCH', False)
run_options[run]['fcst_segments'] = run_base.get('FCST_SEGMENTS', None)

if not AppConfig.is_monotonic(run_options[run]['fcst_segments']):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

wave_runs = []
if run_options[run]['do_wave']:
wave_run = run_base.get('WAVE_RUN', 'BOTH').lower()
if wave_run in ['both']:
wave_runs = ['gfs', 'gdas']
elif wave_run in ['gfs', 'gdas']:
wave_runs = [wave_run]
Comment on lines +108 to +114
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be handled in config.base so do_wave is set based on RUN.


run_options[run]['wave_runs'] = wave_runs

aero_anl_runs = []
aero_fcst_runs = []
if run_options[run]['do_aero']:
aero_anl_run = run_base.get('AERO_ANL_RUN', 'BOTH').lower()
if aero_anl_run in ['both']:
aero_anl_runs = ['gfs', 'gdas']
elif aero_anl_run in ['gfs', 'gdas']:
aero_anl_runs = [aero_anl_run]

aero_fcst_run = run_base.get('AERO_FCST_RUN', None).lower()
if aero_fcst_run in ['both']:
aero_fcst_runs = ['gfs', 'gdas']
elif aero_fcst_run in ['gfs', 'gdas']:
aero_fcst_runs = [aero_fcst_run]

run_options[run]['do_aero_anl'] = True if run in aero_anl_runs else False
run_options[run]['do_aero_fcst'] = True if run in aero_fcst_runs else False

Comment on lines +120 to +135
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic can be redesigned in config.base so that, e.g. do_aero_anl would be set based on RUN, making the Python logic simpler and making it easier for developers to modify. There are some other places I will tag that should be moveable as well. Should this be done as part of this PR or could it wait for a future issue?

run_options[run]['aero_anl_runs'] = aero_anl_runs
run_options[run]['aero_fcst_runs'] = aero_fcst_runs

# Append any MODE-specific options
run_options = self._netmode_run_options(run_base, run_options)

# Return the dictionary of run options
return run_options

# Finally, source the configuration files for each valid `RUN`
for run in self.task_names.keys():
self.configs[run] = self._source_configs(conf, run=run, log=False)
@abstractmethod
def _netmode_run_options(self, base: Dict[str, Any], run_options: Dict[str, Any]) -> Dict[str, Any]:
'''
Defines run-based options for a given NET_MODE case.

Parameters
----------
base: Dict
Parsed config.base settings

run_options: Dict
A dictionary with valid RUN-based sub-dictionaries containing generic options.

Returns
-------
run_options: Dict
Output dictionary with additional options valid for the given NET and MODE.
'''

# Update the base config dictionary based on application and RUN
self.configs[run]['base'] = self._update_base(self.configs[run]['base'])
# Valid NET_MODE options are defined in the appropriate subclass.

pass

@abstractmethod
def _get_app_configs(self):
Expand Down Expand Up @@ -152,13 +195,12 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru
Every config depends on "config.base"
"""

configs = dict()

# Return config.base as well
configs['base'] = conf.parse_config('config.base', RUN=run)
# Include config.base by its lonesome and update it
configs = {'base': conf.parse_config('config.base', RUN=run)}
configs['base'] = self._update_base(configs['base'])

# Source the list of all config_files involved in the application
for config in self.configs_names:
for config in self._get_app_configs(run):

# All must source config.base first
files = ['config.base']
Expand All @@ -184,17 +226,17 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru
return configs

@abstractmethod
def get_task_names(self, run="_no_run") -> Dict[str, List[str]]:
def get_task_names(self, run: str) -> Dict[str, List[str]]:
'''
Create a list of task names for each RUN valid for the configuation.
Create a list of valid RUNs and a dict of task names for each RUN valid for the configuation.

Parameters
----------
None

Returns
-------
Dict[str, List[str]]: Lists of tasks for each RUN.
Dict[str, List[str]]: Lists of all tasks for each RUN.

'''
pass
Expand Down
Loading
Loading