Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Override constrain_resources() in individual steps #481

Merged
merged 9 commits into from
Dec 20, 2022
19 changes: 0 additions & 19 deletions compass/ocean/tests/global_convergence/cosine_bell/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,6 @@ def configure(self):

self.update_cores()

def run(self):
"""
Run each step of the testcase
"""
config = self.config
for resolution in self.resolutions:
if self.icosahedral:
mesh_name = f'Icos{resolution}'
else:
mesh_name = f'QU{resolution}'
ntasks = config.getint('cosine_bell', f'{mesh_name}_ntasks')
min_tasks = config.getint('cosine_bell', f'{mesh_name}_min_tasks')
step = self.steps[f'{mesh_name}_forward']
step.ntasks = ntasks
step.min_tasks = min_tasks

# run the step
super().run()

def update_cores(self):
""" Update the number of cores and min_tasks for each forward step """

Expand Down
18 changes: 18 additions & 0 deletions compass/ocean/tests/global_convergence/cosine_bell/forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class Forward(Step):
----------
resolution : int
The resolution of the (uniform) mesh in km

mesh_name : str
The name of the mesh
"""

def __init__(self, test_case, resolution, mesh_name):
Expand All @@ -35,6 +38,7 @@ def __init__(self, test_case, resolution, mesh_name):
subdir=f'{mesh_name}/forward')

self.resolution = resolution
self.mesh_name = mesh_name

# make sure output is double precision
self.add_streams_file('compass.ocean.streams', 'streams.output')
Expand All @@ -61,6 +65,14 @@ def setup(self):
"""
dt = self.get_dt()
self.add_namelist_options({'config_dt': dt})
self._get_resources()

def constrain_resources(self, available_cores):
"""
Update resources at runtime from config options
"""
self._get_resources()
super().constrain_resources(available_cores)

def run(self):
"""
Expand Down Expand Up @@ -92,3 +104,9 @@ def get_dt(self):
dt = time.strftime('%H:%M:%S', time.gmtime(dt))

return dt

def _get_resources(self):
mesh_name = self.mesh_name
config = self.config
self.ntasks = config.getint('cosine_bell', f'{mesh_name}_ntasks')
self.min_tasks = config.getint('cosine_bell', f'{mesh_name}_min_tasks')
66 changes: 25 additions & 41 deletions compass/ocean/tests/global_ocean/forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,9 @@ class ForwardStep(Step):
time_integrator : {'split_explicit', 'RK4'}
The time integrator to use for the forward run

ntasks_from_config : bool
Whether to get ``ntasks`` from the config file

min_tasks_from_config : bool
Whether to get ``min_tasks`` from the config file

threads_from_config : bool
Whether to get ``threads`` from the config file
resources_from_config : bool
Whether to get ``ntasks``, ``min_tasks`` and ``openmp_threads`` from
config options
"""
def __init__(self, test_case, mesh, init, time_integrator, name='forward',
subdir=None, ntasks=None, min_tasks=None,
Expand Down Expand Up @@ -81,9 +76,10 @@ def __init__(self, test_case, mesh, init, time_integrator, name='forward',
ntasks=ntasks, min_tasks=min_tasks,
openmp_threads=openmp_threads)

self.ntasks_from_config = ntasks is None
self.min_tasks_from_config = min_tasks is None
self.threads_from_config = openmp_threads is None
if (ntasks is None) != (openmp_threads is None):
raise ValueError('You must specify both ntasks and openmp_threads '
'or neither.')
self.resources_from_config = ntasks is None

# make sure output is double precision
self.add_streams_file('compass.ocean.streams', 'streams.output')
Expand Down Expand Up @@ -140,15 +136,14 @@ def setup(self):
Set up the test case in the work directory, including downloading any
dependencies
"""
if self.ntasks_from_config:
self.ntasks = self.config.getint(
'global_ocean', 'forward_ntasks')
if self.min_tasks_from_config:
self.min_tasks = self.config.getint(
'global_ocean', 'forward_min_tasks')
if self.threads_from_config:
self.openmp_threads = self.config.getint(
'global_ocean', 'forward_threads')
self._get_resources()

def constrain_resources(self, available_cores):
"""
Update resources at runtime from config options
"""
self._get_resources()
super().constrain_resources(available_cores)

def run(self):
"""
Expand All @@ -158,6 +153,16 @@ def run(self):
add_mesh_and_init_metadata(self.outputs, self.config,
init_filename='init.nc')

def _get_resources(self):
# get the these properties from the config options
if self.resources_from_config:
config = self.config
self.ntasks = config.getint(
'global_ocean', 'forward_ntasks')
self.min_tasks = config.getint(
'global_ocean', 'forward_min_tasks')
self.openmp_threads = config.getint(
'global_ocean', 'forward_threads')

class ForwardTestCase(TestCase):
"""
Expand Down Expand Up @@ -208,27 +213,6 @@ def configure(self):
"""
configure_global_ocean(test_case=self, mesh=self.mesh, init=self.init)

def run(self):
"""
Run each step of the testcase
"""
config = self.config
for step_name in self.steps_to_run:
step = self.steps[step_name]
if isinstance(step, ForwardStep):
if step.ntasks_from_config:
step.ntasks = config.getint('global_ocean',
'forward_ntasks')
if step.min_tasks_from_config:
step.min_tasks = config.getint('global_ocean',
'forward_min_tasks')
if step.threads_from_config:
step.threads = config.getint('global_ocean',
'forward_threads')

# run the steps
super().run()


def get_forward_subdir(init_subdir, time_integrator, name):
"""
Expand Down
25 changes: 1 addition & 24 deletions compass/ocean/tests/global_ocean/init/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,14 @@ def __init__(self, test_group, mesh, initial_condition, with_bgc):

if mesh.with_ice_shelf_cavities:
self.add_step(
SshAdjustment(test_case=self, ntasks=4))
SshAdjustment(test_case=self))

def configure(self):
"""
Modify the configuration options for this test case
"""
configure_global_ocean(test_case=self, mesh=self.mesh, init=self)

def run(self):
"""
Run each step of the testcase
"""
config = self.config
steps = self.steps_to_run
if 'initial_state' in steps:
step = self.steps['initial_state']
# get the these properties from the config options
step.ntasks = config.getint('global_ocean', 'init_ntasks')
step.min_tasks = config.getint('global_ocean', 'init_min_tasks')
step.threads = config.getint('global_ocean', 'init_threads')

if 'ssh_adjustment' in steps:
step = self.steps['ssh_adjustment']
# get the these properties from the config options
step.ntasks = config.getint('global_ocean', 'forward_ntasks')
step.min_tasks = config.getint('global_ocean', 'forward_min_tasks')
step.threads = config.getint('global_ocean', 'forward_threads')

# run the steps
super().run()

def validate(self):
"""
Test cases can override this method to perform validation of variables
Expand Down
24 changes: 17 additions & 7 deletions compass/ocean/tests/global_ocean/init/initial_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,17 @@ def __init__(self, test_case, mesh, initial_condition, with_bgc):

def setup(self):
"""
Set up the test case in the work directory, including downloading any
dependencies.
Get resources at setup from config options
"""
# get the these properties from the config options
config = self.config
self.ntasks = config.getint('global_ocean', 'init_ntasks')
self.min_tasks = config.getint('global_ocean', 'init_min_tasks')
self.openmp_threads = config.getint('global_ocean', 'init_threads')
self._get_resources()


def constrain_resources(self, available_cores):
"""
Update resources at runtime from config options
"""
self._get_resources()
super().constrain_resources(available_cores)

def run(self):
"""
Expand All @@ -175,3 +178,10 @@ def run(self):

plot_initial_state(input_file_name='initial_state.nc',
output_file_name='initial_state.png')

def _get_resources(self):
# get the these properties from the config options
config = self.config
self.ntasks = config.getint('global_ocean', 'init_ntasks')
self.min_tasks = config.getint('global_ocean', 'init_min_tasks')
self.openmp_threads = config.getint('global_ocean', 'init_threads')
46 changes: 17 additions & 29 deletions compass/ocean/tests/global_ocean/init/ssh_adjustment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,16 @@ class SshAdjustment(Step):
A step for iteratively adjusting the pressure from the weight of the ice
shelf to match the sea-surface height as part of ice-shelf 2D test cases
"""
def __init__(self, test_case, ntasks=None, min_tasks=None,
openmp_threads=None):
def __init__(self, test_case):
"""
Create the step

Parameters
----------
test_case : compass.ocean.tests.global_ocean.init.Init
The test case this step belongs to

ntasks : int, optional
the number of tasks the step would ideally use. If fewer tasks
are available on the system, the step will run on all available
tasks as long as this is not below ``min_tasks``

min_tasks : int, optional
the number of tasks the step requires. If the system has fewer
than this number of tasks, the step will fail

openmp_threads : int, optional
the number of OpenMP threads the step will use

"""
if min_tasks is None:
min_tasks = ntasks
super().__init__(test_case=test_case, name='ssh_adjustment',
ntasks=ntasks, min_tasks=min_tasks,
openmp_threads=openmp_threads)
super().__init__(test_case=test_case, name='ssh_adjustment')

# make sure output is double precision
self.add_streams_file('compass.ocean.streams', 'streams.output')
Expand Down Expand Up @@ -71,15 +53,14 @@ def setup(self):
Set up the test case in the work directory, including downloading any
dependencies
"""
if self.ntasks is None:
self.ntasks = self.config.getint(
'global_ocean', 'forward_ntasks')
if self.min_tasks is None:
self.min_tasks = self.config.getint(
'global_ocean', 'forward_min_tasks')
if self.openmp_threads is None:
self.openmp_threads = self.config.getint(
'global_ocean', 'forward_threads')
self._get_resources()

def constrain_resources(self, available_cores):
"""
Update resources at runtime from config options
"""
self._get_resources()
super().constrain_resources(available_cores)

def run(self):
"""
Expand All @@ -89,3 +70,10 @@ def run(self):
iteration_count = config.getint('ssh_adjustment', 'iterations')
adjust_ssh(variable='landIcePressure', iteration_count=iteration_count,
step=self)

def _get_resources(self):
# get the these properties from the config options
config = self.config
self.ntasks = config.getint('global_ocean', 'forward_ntasks')
self.min_tasks = config.getint('global_ocean', 'forward_min_tasks')
self.openmp_threads = config.getint('global_ocean', 'forward_threads')
7 changes: 0 additions & 7 deletions compass/ocean/tests/hurricane/forward/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,3 @@ def configure(self):
Modify the configuration options for this test case
"""
configure_hurricane(test_case=self, mesh=self.mesh)

def run(self):
"""
Run each step of the testcase
"""
# run the steps
super().run()
19 changes: 15 additions & 4 deletions compass/ocean/tests/hurricane/forward/forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,24 @@ def setup(self):
Set up the test case in the work directory, including downloading any
dependencies
"""
self.ntasks = self.config.getint('hurricane', 'forward_ntasks')
self.min_tasks = self.config.getint('hurricane', 'forward_min_tasks')
self.openmp_threads = self.config.getint('hurricane',
'forward_threads')
self._get_resources()

def constrain_resources(self, available_cores):
"""
Update resources at runtime from config options
"""
self._get_resources()
super().constrain_resources(available_cores)

def run(self):
"""
Run this step of the testcase
"""
run_model(self)

def _get_resources(self):
# get the these properties from the config options
config = self.config
self.ntasks = config.getint('hurricane', 'forward_ntasks')
self.min_tasks = config.getint('hurricane', 'forward_min_tasks')
self.openmp_threads = config.getint('hurricane', 'forward_threads')
8 changes: 0 additions & 8 deletions compass/ocean/tests/hurricane/init/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,3 @@ def configure(self):
Modify the configuration options for this test case
"""
configure_hurricane(test_case=self, mesh=self.mesh)

def run(self):
"""
Run each step of the testcase
"""

# run the steps
super().run()
Loading