diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 8a4f148f24..35e1dd2ae5 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -69,17 +69,17 @@ def fcst(self): dep_dict = {'type': 'task', 'name': f'stage_ic'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': f'wave_init'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero: + if self.options['do_aero']: dep_dict = {'type': 'task', 'name': f'prep_emissions'} dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 fcst_vars = self.envars.copy() fcst_envars_dict = {'FCST_SEGMENT': '#seg#'} @@ -115,17 +115,17 @@ def efcs(self): dep_dict = {'type': 'task', 'name': f'stage_ic'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': f'wave_init'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero: + if self.options['do_aero']: dep_dict = {'type': 'task', 'name': f'prep_emissions'} dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 resources = self.get_resource('efcs') # Kludge to work around bug in rocoto with serial metatasks nested @@ -444,7 +444,7 @@ def wavepostpnt(self): deps = [] dep_dict = {'type': 'metatask', 'name': f'fcst_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'task', 'name': f'wave_post_bndpnt_bull_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -481,16 +481,16 @@ def wavepostpnt(self): def extractvars(self): deps = [] - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': 'wave_post_grid_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ocean: + if self.options['do_ocean']: dep_dict = {'type': 'metatask', 'name': 'ocean_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ice: + if self.options['do_ice']: dep_dict = {'type': 'metatask', 'name': 'ice_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_atm: + if self.options['do_atm']: dep_dict = {'type': 'metatask', 'name': 'atmos_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -530,23 +530,23 @@ def arch(self): deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'atmos_ensstat'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ice: + if self.options['do_ice']: dep_dict = {'type': 'metatask', 'name': 'ice_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ocean: + if self.options['do_ocean']: dep_dict = {'type': 'metatask', 'name': 'ocean_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'metatask', 'name': 'wave_post_grid'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'wave_post_pnt'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'metatask', 'name': 'wave_post_bndpnt'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'wave_post_bndpnt_bull'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_extractvars: + if self.options['do_extractvars']: dep_dict = {'type': 'metatask', 'name': 'extractvars'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 34b21fdf24..fb3f822741 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -45,7 +45,7 @@ def prep(self): dump_path = self._template_to_rocoto_cycstring(self._base["COM_OBSDMP_TMPL"], {'DMPDIR': dmpdir, 'DUMP_SUFFIX': dump_suffix}) - gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.ens_runs else False + gfs_enkf = True if self.options['do_hybvar'] and 'gfs' in self.app_config.ens_runs else False deps = [] dep_dict = {'type': 'metatask', 'name': 'gdasatmos_prod', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} @@ -188,7 +188,7 @@ def anal(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run}prep'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_hybvar: + if self.options['do_hybvar']: dep_dict = {'type': 'metatask', 'name': 'enkfgdasepmn', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -215,12 +215,12 @@ def anal(self): def sfcanl(self): deps = [] - if self.app_config.do_jediatmvar: + if self.options['do_jediatmvar']: dep_dict = {'type': 'task', 'name': f'{self.run}atmanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}anal'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jedisnowda: + if self.options['do_jedisnowda']: dep_dict = {'type': 'task', 'name': f'{self.run}snowanl'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -247,14 +247,14 @@ def sfcanl(self): def analcalc(self): deps = [] - if self.app_config.do_jediatmvar: + if self.options['do_jediatmvar']: dep_dict = {'type': 'task', 'name': f'{self.run}atmanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}anal'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}sfcanl'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_hybvar and self.run in ['gdas']: + if self.options['do_hybvar'] and self.run in ['gdas']: dep_dict = {'type': 'task', 'name': 'enkfgdasechgres', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -329,7 +329,7 @@ def atmanlinit(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run}prepatmiodaobs'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_hybvar: + if self.options['do_hybvar']: dep_dict = {'type': 'metatask', 'name': 'enkfgdasepmn', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -337,7 +337,7 @@ def atmanlinit(self): dependencies = rocoto.create_dependency(dep=deps) gfs_cyc = self._base["gfs_cyc"] - gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.ens_runs else False + gfs_enkf = True if self.options['do_hybvar'] and 'gfs' in self.options['ens_runs'] else False cycledef = self.run if self.run in ['gfs'] and gfs_enkf and gfs_cyc != 4: @@ -487,7 +487,7 @@ def aeroanlinit(self): dep_dict = {'type': 'task', 'name': f'{self.run}prep'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_prep_obs_aero: + if self.options['do_prep_obs_aero']: dep_dict = {'type': 'task', 'name': f'{self.run}prepobsaero'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -771,12 +771,12 @@ def ocnanalecen(self): def ocnanalchkpt(self): deps = [] - if self.app_config.do_hybvar: + if self.options['do_hybvar']: dep_dict = {'type': 'task', 'name': f'{self.run}ocnanalecen'} else: dep_dict = {'type': 'task', 'name': f'{self.run}ocnanalrun'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_mergensst: + if self.options['do_mergensst']: data = f'&ROTDIR;/{self.run}.@Y@m@d/@H/atmos/{self.run}.t@Hz.sfcanl.nc' dep_dict = {'type': 'data', 'data': data} deps.append(rocoto.add_dependency(dep_dict)) @@ -867,13 +867,13 @@ def _fcst_forecast_only(self): dep_dict = {'type': 'task', 'name': f'{self.run}stage_ic'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave and self.run in self.app_config.wave_runs: - wave_job = 'waveprep' if self.app_config.model_apps[self.run] in ['ATMW'] else 'waveinit' + if self.options['do_wave'] and self.run in self.options['wave_runs']: + wave_job = 'waveprep' if self.options['app'] in ['ATMW'] else 'waveinit' dep_dict = {'type': 'task', 'name': f'{self.run}{wave_job}'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero and \ - self.run in self.app_config.aero_fcst_runs and \ + if self.options['do_aero'] and \ + self.run in self.options['aero_fcst_runs'] and \ not self._base['EXP_WARM_START']: # Calculate offset based on RUN = gfs | gdas interval = None @@ -892,7 +892,7 @@ def _fcst_forecast_only(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) if self.run in ['gfs']: - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 else: num_fcst_segments = 1 @@ -931,15 +931,15 @@ def _fcst_cycled(self): dep = rocoto.add_dependency(dep_dict) dependencies = rocoto.create_dependency(dep=dep) - if self.app_config.do_jediocnvar: + if self.options['do_jediocnvar']: dep_dict = {'type': 'task', 'name': f'{self.run}ocnanalpost'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero and self.run in self.app_config.aero_anl_runs: + if self.options['do_aero'] and self.run in self.options['aero_anl_runs']: dep_dict = {'type': 'task', 'name': f'{self.run}aeroanlfinal'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jedisnowda: + if self.options['do_jedisnowda']: dep_dict = {'type': 'task', 'name': f'{self.run}snowanl'} dependencies.append(rocoto.add_dependency(dep_dict)) @@ -950,7 +950,7 @@ def _fcst_cycled(self): dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=dependencies) - if self.app_config.do_wave and self.run in self.app_config.wave_runs: + if self.options['do_wave'] and self.run in self.options['wave_runs']: dep_dict = {'type': 'task', 'name': f'{self.run}waveprep'} dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) @@ -958,7 +958,7 @@ def _fcst_cycled(self): cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run if self.run in ['gfs']: - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 else: num_fcst_segments = 1 @@ -1265,7 +1265,7 @@ def wavepostpnt(self): deps = [] dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'task', 'name': f'{self.run}wavepostbndpntbll'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -2230,54 +2230,54 @@ def arch(self): if self.run in ['gfs']: dep_dict = {'type': 'task', 'name': f'{self.run}atmanlprod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_vminmon: + if self.options['do_vminmon']: dep_dict = {'type': 'task', 'name': f'{self.run}vminmon'} deps.append(rocoto.add_dependency(dep_dict)) elif self.run in ['gdas']: dep_dict = {'type': 'task', 'name': f'{self.run}atmanlprod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_fit2obs: + if self.options['do_fit2obs']: dep_dict = {'type': 'task', 'name': f'{self.run}fit2obs'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_verfozn: + if self.options['do_verfozn']: dep_dict = {'type': 'task', 'name': f'{self.run}verfozn'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_verfrad: + if self.options['do_verfrad']: dep_dict = {'type': 'task', 'name': f'{self.run}verfrad'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_vminmon: + if self.options['do_vminmon']: dep_dict = {'type': 'task', 'name': f'{self.run}vminmon'} deps.append(rocoto.add_dependency(dep_dict)) - if self.run in ['gfs'] and self.app_config.do_tracker: + if self.run in ['gfs'] and self.options['do_tracker']: dep_dict = {'type': 'task', 'name': f'{self.run}tracker'} deps.append(rocoto.add_dependency(dep_dict)) - if self.run in ['gfs'] and self.app_config.do_genesis: + if self.run in ['gfs'] and self.options['do_genesis']: dep_dict = {'type': 'task', 'name': f'{self.run}genesis'} deps.append(rocoto.add_dependency(dep_dict)) - if self.run in ['gfs'] and self.app_config.do_genesis_fsu: + if self.run in ['gfs'] and self.options['do_genesis_fsu']: dep_dict = {'type': 'task', 'name': f'{self.run}genesis_fsu'} deps.append(rocoto.add_dependency(dep_dict)) # Post job dependencies dep_dict = {'type': 'metatask', 'name': f'{self.run}atmos_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': f'{self.run}wavepostsbs'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}wavepostpnt'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'task', 'name': f'{self.run}wavepostbndpnt'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ocean: + if self.options['do_ocean']: if self.run in ['gfs']: dep_dict = {'type': 'metatask', 'name': f'{self.run}ocean_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ice: + if self.options['do_ice']: if self.run in ['gfs']: dep_dict = {'type': 'metatask', 'name': f'{self.run}ice_prod'} deps.append(rocoto.add_dependency(dep_dict)) # MOS job dependencies - if self.run in ['gfs'] and self.app_config.do_mos: + if self.run in ['gfs'] and self.options['do_mos']: mos_jobs = ["stn_prep", "grd_prep", "ext_stn_prep", "ext_grd_prep", "stn_fcst", "grd_fcst", "ext_stn_fcst", "ext_grd_fcst", "stn_prdgen", "grd_prdgen", "ext_stn_prdgen", "ext_grd_prdgen", @@ -2315,7 +2315,7 @@ def cleanup(self): dep_dict = {'type': 'task', 'name': f'{self.run}arch'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_gempak: + if self.options['do_gempak']: if self.run in ['gdas']: dep_dict = {'type': 'task', 'name': f'{self.run}gempakmetancdc'} deps.append(rocoto.add_dependency(dep_dict)) @@ -2324,7 +2324,7 @@ def cleanup(self): deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}gempakncdcupapgif'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_goes: + if self.options['do_goes']: dep_dict = {'type': 'metatask', 'name': f'{self.run}gempakgrb2spec'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}npoess_pgrb2_0p5deg'} @@ -2436,7 +2436,7 @@ def ediag(self): def eupd(self): deps = [] - if self.app_config.lobsdiag_forenkf: + if self.options['lobsdiag_forenkf']: dep_dict = {'type': 'task', 'name': f'{self.run}ediag'} else: dep_dict = {'type': 'metatask', 'name': f'{self.run}eomg'} @@ -2567,7 +2567,7 @@ def atmensanlletkf(self): def atmensanlfv3inc(self): deps = [] - if self.app_config.lobsdiag_forenkf: + if self.options['lobsdiag_forenkf']: dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlsol'} else: dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlletkf'} @@ -2645,7 +2645,7 @@ def _get_ecengroups(): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}analcalc'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jediatmens: + if self.options['do_jediatmens']: dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}eupd'} @@ -2689,12 +2689,12 @@ def esfc(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}analcalc'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jediatmens: + if self.options['do_jediatmens']: dep_dict = {'type': 'task', 'name': f'{self.run}atmensanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}eupd'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jedisnowda: + if self.options['do_jedisnowda']: dep_dict = {'type': 'task', 'name': f'{self.run}esnowrecen'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index ea84f5143a..21107afaa0 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -44,6 +44,9 @@ def __init__(self, app_config: AppConfig, run: str) -> None: # Get the configs for the specified RUN self._configs = self.app_config.configs[run] + # Get the workflow options for the specified RUN + self.options = self.app_config.run_options[run] + # Update the base config for the application self._configs['base'] = self.app_config._update_base(self._configs['base'])