diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 00f2f5c5009..973c59f2072 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -411,6 +411,44 @@ steps: --job_id sphere_ssp_baroclinic_wave_rhoe_equilmoist_earth artifact_paths: "sphere_ssp_baroclinic_wave_rhoe_equilmoist_earth/output_active/*" + - group: "Restarting" + steps: + + - label: ":computer: test restart" + command: > + julia --color=yes --project=examples test/restart.jl + agents: + slurm_mem: 16GB + + - label: ":computer: test restart GPU" + command: > + julia --color=yes --project=examples test/restart.jl + env: + CLIMACOMMS_DEVICE: "CUDA" + agents: + slurm_gpus: 1 + slurm_mem: 16G + + - label: ":computer: test restart MPI" + command: > + srun julia --color=yes --project=examples test/restart.jl + env: + CLIMACOMMS_CONTEXT: "MPI" + agents: + slurm_ntasks: 2 + slurm_mem: 16G + + - label: ":computer: test restart GPU MPI" + command: > + srun julia --color=yes --project=examples test/restart.jl + env: + CLIMACOMMS_CONTEXT: "MPI" + CLIMACOMMS_DEVICE: "CUDA" + agents: + slurm_gpus_per_task: 1 + slurm_ntasks: 2 + slurm_mem: 16G + - group: "MPI Examples" steps: @@ -447,17 +485,6 @@ steps: #retry: # automatic: true - - label: ":computer: MPI GPU test restart" - command: > - srun julia --color=yes --project=examples test/restart.jl - env: - CLIMACOMMS_CONTEXT: "MPI" - CLIMACOMMS_DEVICE: "CUDA" - agents: - slurm_gpus_per_task: 1 - slurm_ntasks: 2 - slurm_mem: 16GB - - label: ":computer: MPI no lim aquaplanet (ρe) equilmoist clearsky radiation" command: > srun julia --color=yes --project=examples examples/hybrid/driver.jl diff --git a/examples/Manifest.toml b/examples/Manifest.toml index d247f216bf8..d9db23eb450 100644 --- a/examples/Manifest.toml +++ b/examples/Manifest.toml @@ -336,9 +336,9 @@ weakdeps = ["CUDA", "MPI"] [[deps.ClimaCore]] deps = ["Adapt", "BandedMatrices", "BlockArrays", "ClimaComms", "CubedSphere", "DataStructures", "DocStringExtensions", "ForwardDiff", 
"GaussQuadrature", "GilbertCurves", "HDF5", "InteractiveUtils", "IntervalSets", "KrylovKit", "LinearAlgebra", "MultiBroadcastFusion", "NVTX", "PkgVersion", "RecursiveArrayTools", "RootSolvers", "SparseArrays", "StaticArrays", "Statistics", "Unrolled"] -git-tree-sha1 = "806e8490ff1aa664ca579544d798f8addfa1b07d" +git-tree-sha1 = "527b11c35f00db0064b77a25fc881f2a2982abda" uuid = "d414da3d-4745-48bb-8d80-42e94e092884" -version = "0.14.15" +version = "0.14.16" weakdeps = ["CUDA", "Krylov"] [deps.ClimaCore.extensions] diff --git a/src/solver/type_getters.jl b/src/solver/type_getters.jl index af9ce7619e5..5e5f00cf968 100644 --- a/src/solver/type_getters.jl +++ b/src/solver/type_getters.jl @@ -301,9 +301,7 @@ function get_state_restart(config::AtmosConfig, restart_file, atmos_model_hash) atmos_model_hash_in_restart = InputOutput.HDF5.read_attribute(reader.file, "atmos_model_hash") if atmos_model_hash_in_restart != atmos_model_hash - error( - "Restart file $(restart_file) was constructed with a different AtmosModel", - ) + @warn "Restart file $(restart_file) was constructed with a different AtmosModel, no consistency check was performed" end return (Y, t_start) end @@ -484,6 +482,42 @@ thermo_state_type(::EquilMoistModel, ::Type{FT}) where {FT} = TD.PhaseEquil{FT} thermo_state_type(::NonEquilMoistModel, ::Type{FT}) where {FT} = TD.PhaseNonEquil{FT} +function get_restart_file(config::AtmosConfig, base_output_dir) + restart_file = nothing + (; parsed_args) = config + + if parsed_args["detect_restart_file"] && isdir(base_output_dir) + lowercase(parsed_args["output_dir_style"]) == "activelink" || + error("detect_restart_file works only with ActiveLink") + # output_dir will be something like ABC/DEF/output_1234 + name_rx = r"output_(\d\d\d\d)" + restart_file_rx = r"day\d+\.\w+\.hdf5" + + existing_outputs = + filter(x -> !isnothing(match(name_rx, x)), readdir(base_output_dir)) + if !isempty(existing_outputs) + latest_output = first(sort(existing_outputs, rev = true)) + 
previous_folder = joinpath(base_output_dir, latest_output) + possible_restart_files = filter( + f -> occursin(restart_file_rx, f), + readdir(previous_folder), + ) + if !isempty(possible_restart_files) + restart_file_name = + last(CA.sort_files_by_time(possible_restart_files)) + restart_file = joinpath(previous_folder, restart_file_name) + end + end + end + + # If a restart file was passed, override what we detected automatically + if !isnothing(parsed_args["restart_file"]) + restart_file = parsed_args["restart_file"] + end + + return restart_file +end + function get_sim_info(config::AtmosConfig) (; parsed_args) = config FT = eltype(config) @@ -503,53 +537,16 @@ function get_sim_info(config::AtmosConfig) haskey(allowed_dir_styles, lowercase(requested_style)) || error("output_dir_style $(requested_style) not available") + # We look for a restart before creating a new output dir because we want to + # look for previous folders + restart_file = get_restart_file(config, base_output_dir) + output_dir = OutputPathGenerator.generate_output_path( base_output_dir; context = config.comms_ctx, style = allowed_dir_styles[lowercase(requested_style)], ) - restart_file = nothing - - if parsed_args["detect_restart_file"] - lowercase(parsed_args["output_dir_style"]) == "activelink" || - error("detect_restart_file works only with ActiveLink") - # output_dir will be something like ......./output_1234 - name_rx = r"output_(\d\d\d\d)" - restart_file_rx = r"day\d+\.\w+\.hdf5" - counter_match = match(name_rx, output_dir) - if !isnothing(counter_match) - counter = parse(Int, counter_match[1]) - if counter != 0 - # As implemented in ClimaUtilities.OutputPathGenerator - previous_counter = counter - 1 - previous_counter_str = lpad(previous_counter, 4, "0") - counter_str = lpad(counter, 4, "0") - - previous_folder = - replace(output_dir, counter_str => previous_counter_str) - - isdir(previous_folder) || - error("Could not find a folder in $(previous_folder)") - - possible_restart_files = 
filter( - f -> occursin(restart_file_rx, f), - readdir(previous_folder), - ) - if !isempty(possible_restart_files) - restart_file_name = - last(CA.sort_files_by_time(possible_restart_files)) - restart_file = joinpath(previous_folder, restart_file_name) - end - end - end - end - - # If a restart file was passed, override what we detected automatically - if !isnothing(parsed_args["restart_file"]) - restart_file = parsed_args["restart_file"] - end - isnothing(restart_file) || @info "Restarting simulation from file $restart_file" diff --git a/test/restart.jl b/test/restart.jl index 277bb6878a2..ade43071e04 100644 --- a/test/restart.jl +++ b/test/restart.jl @@ -1,210 +1,306 @@ +redirect_stderr(IOContext(stderr, :stacktrace_types_limited => Ref(false))) import ClimaAtmos as CA -import ClimaCore: Fields, Geometry +import ClimaCore +import ClimaCore: DataLayouts, Fields, Geometry +import ClimaCore.Fields: Field +import ClimaCore.DataLayouts: AbstractData +import ClimaCore.Geometry: AxisTensor import ClimaComms +pkgversion(ClimaComms) >= v"0.6" && ClimaComms.@import_required_backends import Logging using Test -function compare( - one, - two; - name = "", - ignore = [ - :scratch, - :ghost_buffer, - :output_dir, - :hyperdiffusion_ghost_buffer, - ], +const device = ClimaComms.device() +const comms_ctx = ClimaComms.context(device) +ClimaComms.init(comms_ctx) + +# Test.jl really wants to give you a stacktrace for failing tests. This seems to be +# hardcoded in the package and not easy to change without defining a whole new +# AbstractTestSet. We don't want stacktraces, we just want to know which fields are +# different. +# +# The only way I (Gabriele) could find to run tests and not display the stack trace is to +# use `let` blocks. When `let` blocks are used, a different TestSet type is used internally, +# and this type does not add stack traces. The drawback is that it does not collect +# statistics on how many tests pass and how many fail. 
Also, when using let blocks, all the +# nested testsets have to be using let blocks. +# +# We need to try-catch because @test throws exceptions. Exceptions are normally captured by +# the @testset, but this does not work when using a let block. :( +# +# And there's a final snag: by try-catching, the error is not propagated, so the +# job is not marked as failing. We have to manually trigger exit with an error +# status when the job has failed. +FAILED = Ref(0) +macro testmsg(ex, str) + esc(quote + try + @testset let name = $str + @test $ex + end + catch + FAILED[] = 1 + end + end) +end + +# This test checks that: + +# 1. A simulation, saved to a checkpoint, is read back identically (up to some +# tolerance and excluding those fields that are computed during the +# calculation of the tendencies) +# 2. A simulation, saved to a previous checkpoint, and read back and evolved to +# the same time is identical (up to some tolerance) +# 3. ClimaAtmos can automatically detect restarts + +""" + _error(arr1::AbstractArray, arr2::AbstractArray; ABS_TOL = 100eps(eltype(arr1))) + +We compute the error in this way: +- when the absolute value is larger than ABS_TOL, we use the relative error +- in the other cases, we use the absolute error +""" +function _error( + arr1::AbstractArray, + arr2::AbstractArray; + ABS_TOL = 100eps(eltype(arr1)), ) - propertynames(one) == propertynames(two) || - error("Cannot compare these objects") + diff = abs.(arr1 .- arr2) + denominator = abs.(arr1) + error = ifelse.(denominator .> ABS_TOL, diff ./ denominator, diff) + return error +end + +""" + compare(v1, v2; name = "", ignore = Set([])) + +Recursively compare `v1` and `v2` up to some numeric tolerance. + +`compare` walks through all the properties in `v1` and `v2` until it finds +that there are no more properties. At that point, `compare` tries to match the +resulting objects. 
When such objects are arrays with floating point, `compare` +defines a notion of `error` that is the following: when the absolute value is +less than `100eps(eltype)`, `error = absolute_error`, otherwise it is relative +error. The `error` is then compared against a tolerance. + +Keyword arguments +================= - properties = filter(x -> !(x in ignore), propertynames(one)) +- `name` is used to collect the name of the property while we go recursively + over all the properties. You can pass a base name. +- `ignore` is a collection of `Symbol`s that identify properties that are + ignored when walking through the tree. This is useful for properties that + are known to be different (e.g., `output_dir`). +""" +function compare(v1, v2; name = "", ignore = Set([])) + @testmsg propertynames(v1) == propertynames(v2) name + + properties = filter(x -> !(x in ignore), propertynames(v1)) if isempty(properties) - @test typeof(one) == typeof(two) - # Objects that we know how to compare - if one isa Fields.Field || one isa Array || one isa Number - if eltype(one) <: Geometry.AxisTensor + # Base case + @testmsg typeof(v1) == typeof(v2) name + + if v1 isa Field || v1 isa Array || v1 isa AbstractData + if eltype(v1) <: AxisTensor # We have to handle the AxisTensor case separately because it # behaves differently - for (f, g) in zip(one, two) + for (f, g) in zip(v1, v2) compare(f, g; name = "$(name)") end + elseif eltype(v1) <: CartesianIndex + @testmsg v1 == v2 name else - one isa Number && (one = [one]) - two isa Number && (two = [two]) - - arr1 = Array(parent(one)) - arr2 = Array(parent(two)) - - # Calculate element-wise relative difference, avoiding division by zero - diff = abs.(arr1 .- arr2) - denominator = abs.(arr1) - relative_diff = - ifelse.( - denominator .> 0, - diff ./ denominator, - ifelse.(diff .== 0, 0.0, Inf), - ) - - if !isempty(relative_diff) - # Check if the max relative differences is within tolerance - max_error = maximum(relative_diff) - println("Relative 
error in $name: ", max_error) - else - println("$name is empty on both sides") - end + arr1, arr2 = Array(parent(v1)), Array(parent(v2)) if eltype(arr1) <: AbstractFloat - @test max_error < 100eps(eltype(arr1)) + error = _error(arr1, arr2) + isempty(error) || + @testmsg maximum(error) <= 100eps(eltype(arr1)) name else - @test max_error ≈ 0 + @testmsg arr1 == arr2 name end end - elseif Base.issingletontype(typeof(one)) + elseif Base.issingletontype(typeof(v1)) # We have already tested this - elseif one isa AbstractString - @test one == two + elseif v1 isa AbstractString || v1 isa Symbol + @testmsg v1 == v2 name + elseif v1 isa Number + # We check with triple equal so that we also catch NaNs being equal + @testmsg v1 === v2 name else - println("Cannot compare $name") - @test false + # Uncaught case, we cannot compare the two + @testmsg false name end else # Recursive case for p in properties compare( - getproperty(one, p), - getproperty(two, p); + getproperty(v1, p), + getproperty(v2, p); name = "$(name).$(p)", ) end end end -@testset "Test restarts across configuration combinations" begin - # Disable all the @info statements that are produced when creating a simulation - Logging.disable_logging(Logging.Info) - - ### Test Description - # Generate a simulation with some complexity of - # config arguments. Some config combinations are - # incompatible so we do not sweep over all possible - # iterations. - - # Modify the timestep to 1-second increments. - # Save simulation state at each timestep, - # and generate a restart file at 0secs, 2secs simulation time. - # Verify objects read in using ClimaCore.InputOutput functions - # are identical (i.e. restarts result - # in the same simulation states as if one were to advance - # the timestepper uninterrupted.) 
- - # TODO: Restart and diagnostic behaviour needs to be - # clearly defined when config files have different - # settings (or when tendency computations conflict with - # dt or t_end parsed args) - - # for configuration in ["sphere", "column"] - # for moisture in ["equil"] - # for turb_conv in ["diagnostic_edmfx", "prognostic_edmfx"] - # for precip in ["0M", "1M"] - - configuration = "sphere" - moisture = "equil" - turb_conv = "prognostic_edmfx" - precip = "0M" - - mktempdir() do output_loc - job_id = "restart_$(configuration)_$(moisture)_$(turb_conv)_$(precip)" - test_dict = Dict( - "check_nan_every" => 3, - "log_progress" => false, - "moist" => moisture, - "precip_model" => precip, - "config" => configuration, - "turbconv" => turb_conv, - "perturb_initstate" => false, - "dt" => "1secs", - "insolation" => "timevarying", - "rad" => "allskywithclear", - "surface_setup" => "DefaultMoninObukhov", - "implicit_diffusion" => true, - "t_end" => "3secs", - "dt_save_state_to_disk" => "1secs", - "enable_diagnostics" => false, - "output_dir" => joinpath(output_loc, job_id), - ) - - println("output_dir: $(test_dict["output_dir"])") - - config = CA.AtmosConfig(test_dict, job_id = job_id) - - simulation = CA.get_simulation(config) - CA.solve_atmos!(simulation) - - # Check re-importing the same state - restart_dir = simulation.output_dir - @test isfile(joinpath(restart_dir), "day0.3.hdf5") - - config_should_be_same = CA.AtmosConfig( - merge(test_dict, Dict("detect_restart_file" => true)), - job_id = job_id, - ) - - simulation_restarted = CA.get_simulation(config_should_be_same) - println("Check file-read from checkpoint data") - println("Checking integrator for the case where we just read the data") - compare( - simulation.integrator.u, - simulation_restarted.integrator.u; - name = "integrator.u", - ) - compare( - simulation.integrator.p, - simulation_restarted.integrator.p; - name = "integrator.p", - ) - - # Check re-importing from previous state and advancing one step - 
restart_file = joinpath(simulation.output_dir, "day0.2.hdf5") - @test isfile(joinpath(restart_dir), "day0.2.hdf5") - println("Restart from specific file") - config2 = CA.AtmosConfig( - merge(test_dict, Dict("restart_file" => restart_file)), - job_id = job_id, - ) - - simulation_restarted2 = CA.get_simulation(config2) - println("Advancing restarted simulation") - CA.solve_atmos!(simulation_restarted2) - println("Restarted simulation complete") - println("Checking integrator.u for the case where we start from 2s") - compare( - simulation.integrator.u, - simulation_restarted2.integrator.u; - name = "integrator.u", - ) - compare( - simulation.integrator.p, - simulation_restarted2.integrator.p; - name = "integrator.p", - ) - - # Test that we can catch an Atmos model changing across restarts - config_different = CA.AtmosConfig( - merge( - test_dict, - Dict( - "restart_file" => restart_file, - "insolation" => "rcemipii", - ), - ), - job_id = job_id * "_different", - ) - @test_throws ErrorException CA.get_simulation(config_different) - - # end - # end - # end - # end +# Begin tests + +# Disable all the @info statements that are produced when creating a simulation +Logging.disable_logging(Logging.Info) + +if comms_ctx isa ClimaComms.SingletonCommsContext + configurations = ["sphere", "box", "column"] +else + configurations = ["sphere", "box"] +end + +for configuration in configurations + if configuration == "sphere" + moistures = ["equil", "nonequil"] + precips = ["0M", "1M"] + topography = "Earth" + else + moistures = ["nonequil"] + precips = ["1M"] + topography = "NoWarp" + end + + for moisture in moistures + for precip in precips + @testset let config = "$configuration $moisture $precip $topography" + # The `enable_bubble` case is broken for ClimaCore < 0.14.6, so we + # hard-code this to be always false for those versions + bubble = pkgversion(ClimaCore) > v"0.14.5" + + turb_conv = "diagnostic_edmfx" + precip = "0M" + + # Make sure that all MPI processes agree on the 
output_loc + output_loc = + ClimaComms.iamroot(comms_ctx) ? mktempdir(pwd()) : "" + output_loc = ClimaComms.bcast(comms_ctx, output_loc) + ClimaComms.barrier(comms_ctx) + + job_id = "restart" + test_dict = Dict( + "test_dycore_consistency" => true, # We will add NaNs to the cache, just to make sure + "check_nan_every" => 3, + "log_progress" => false, + "moist" => moisture, + "precip_model" => precip, + "config" => configuration, + "topography" => topography, + # "turbconv" => turb_conv, + "dt" => "1secs", + "bubble" => bubble, + "viscous_sponge" => true, + "rayleigh_sponge" => true, + # "insolation" => "timevarying", + # "rad" => "clearsky", + # "dt_rad" => "1secs", + "surface_setup" => "DefaultMoninObukhov", + # "implicit_diffusion" => true, + "call_cloud_diagnostics_per_stage" => true, # Needed to ensure that cloud variables are computed + "t_end" => "3secs", + "dt_save_state_to_disk" => "1secs", + "enable_diagnostics" => false, + "output_dir" => joinpath(output_loc, job_id), + ) + + config = CA.AtmosConfig(test_dict; job_id, comms_ctx) + + simulation = CA.get_simulation(config) + CA.solve_atmos!(simulation) + + # Check re-importing the same state + restart_dir = simulation.output_dir + @test isfile(joinpath(restart_dir), "day0.3.hdf5") + + @testset let what = "just reading data" + config_should_be_same = CA.AtmosConfig( + merge(test_dict, Dict("detect_restart_file" => true)); + job_id, + comms_ctx, + ) + + simulation_restarted = + CA.get_simulation(config_should_be_same) + + compare( + simulation.integrator.u, + simulation_restarted.integrator.u; + name = "integrator.u", + ignore = Set([:rc]), # rc is some CUDA/CuArray internal object that we don't care about + ) + compare( + axes(simulation.integrator.u.c), + axes(simulation_restarted.integrator.u.c); + name = "center_space", + ignore = Set([:rc]), # rc is some CUDA/CuArray internal object that we don't care about + ) + compare( + axes(simulation.integrator.u.f), + axes(simulation_restarted.integrator.u.f); + 
name = "face_space", + ignore = Set([:rc]), # rc is some CUDA/CuArray internal object that we don't care about + ) + compare( + simulation.integrator.p, + simulation_restarted.integrator.p; + name = "integrator.p", + ignore = Set([ + :ghost_buffer, + :hyperdiffusion_ghost_buffer, + :scratch, + :output_dir, + :ghost_buffer, + # Computed in tendencies (which are not computed in this case) + :hyperdiff, + :precipitation, + # rc is some CUDA/CuArray internal object that we don't care about + :rc, + ]), + ) + end + + @testset let what = "reading and simulating" + # Check re-importing from previous state and advancing one step + restart_file = + joinpath(simulation.output_dir, "day0.2.hdf5") + @test isfile(joinpath(restart_dir), "day0.2.hdf5") + # Restart from specific file + config2 = CA.AtmosConfig( + merge(test_dict, Dict("restart_file" => restart_file)), + job_id, + comms_ctx, + ) + + simulation_restarted2 = CA.get_simulation(config2) + CA.fill_with_nans!(simulation_restarted2.integrator.p) + + CA.solve_atmos!(simulation_restarted2) + compare( + simulation.integrator.u, + simulation_restarted2.integrator.u; + name = "integrator.u", + ignore = Set([:rc]), # rc is some CUDA/CuArray internal object that we don't care about + ) + compare( + simulation.integrator.p, + simulation_restarted2.integrator.p; + name = "integrator.p", + ignore = Set([ + :scratch, + :output_dir, + :ghost_buffer, + :hyperdiffusion_ghost_buffer, + :rc, + ]), + ) + end + end + end end end + +@test FAILED[] == 0