Skip to content

Commit

Permalink
Add support for distributed CUDA mapreduce
Browse files Browse the repository at this point in the history
Update bubble_3d_invariant_rhoe driver for distributed CUDA run
  • Loading branch information
sriharshakandala committed Jun 21, 2023
1 parent eb6f155 commit da03513
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 81 deletions.
9 changes: 9 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ steps:
agents:
slurm_gpus: 1

- label: "Unit: distributed reduction cuda"
key: unit_distributed_reduction_cuda
command:
- "julia --project -e 'using CUDA; CUDA.versioninfo()'"
- "srun julia --color=yes --check-bounds=yes --project=test test/Fields/reduction_cuda_distributed.jl"
agents:
slurm_gpus_per_task: 1
slurm_ntasks: 2

- group: "Unit: Operators"
steps:

Expand Down
30 changes: 16 additions & 14 deletions examples/hybrid/box/bubble_3d_invariant_rhoe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ end

function rhs_invariant!(dY, Y, ghost_buffer, t)
(; C_p, C_v, MSLP, grav, R_d, T_0) = ghost_buffer.params
(; z, κ₄, cω³, fω¹², fu¹², cuvw, cE, ce, cI, cT, cp, ch_tot) = ghost_buffer
(; z, κ₄, cω³, fω¹², fu¹², fu³, cuvw, cE, ce, cI, cT, cp, ch_tot) =
ghost_buffer
= Y.Yc.ρ # scalar on centers
fw = Y.w # Covariant3Vector on faces
cuₕ = Y.uₕ # Covariant12Vector on centers
Expand Down Expand Up @@ -301,7 +302,7 @@ function rhs_invariant!(dY, Y, ghost_buffer, t)
Geometry.Contravariant12Vector.(
Geometry.Covariant123Vector.(Ic2f.(cuₕ)),
) # Contravariant12Vector in 3D
fu³ = Geometry.Contravariant3Vector.(Geometry.Covariant123Vector.(fw))
fu³ .= Geometry.Contravariant3Vector.(Geometry.Covariant123Vector.(fw))
@. dw -= fω¹² × fu¹² # Covariant3Vector on faces
@. duₕ -= If2c(fω¹² × fu³)

Expand Down Expand Up @@ -370,11 +371,11 @@ function bubble_3d_invariant_ρe(ARGS, comms_ctx, ::Type{FT}) where {FT}
args = ()
end

if ClimaComms.iamroot(comms_ctx)
@info "Context information" device = comms_ctx.device context =
comms_ctx nprocs = ClimaComms.nprocs(comms_ctx) Float_type = FT resolution =
resolution
end
logger_stream = ClimaComms.iamroot(comms_ctx) ? stderr : devnull
prev_logger = global_logger(ConsoleLogger(logger_stream, Logging.Info))

@info "Context information" device = comms_ctx.device context = comms_ctx nprocs =
ClimaComms.nprocs(comms_ctx) Float_type = FT resolution = resolution

sim_params = SimulationParameters(FT, resolution, args...)
(; lxy, lz) = sim_params
Expand All @@ -395,12 +396,11 @@ function bubble_3d_invariant_ρe(ARGS, comms_ctx, ::Type{FT}) where {FT}
w = map(_ -> Geometry.Covariant3Vector(0.0), face_coords)
Y = Fields.FieldVector(Yc = Yc, uₕ = uₕ, w = w)

energy_0 = ClimaComms.reduce(comms_ctx, sum(Y.Yc.ρe), +)
mass_0 = ClimaComms.reduce(comms_ctx, sum(Y.Yc.ρ), +)
energy_0 = sum(Y.Yc.ρe)
mass_0 = sum(Y.Yc.ρ)
κ₄ = compute_κ₄(sim_params)

@info "summary" energy_0 mass_0 κ₄

@info "Initial condition" energy_0 mass_0 κ₄
If2c = Operators.InterpolateF2C()
Ic2f = Operators.InterpolateC2F(
bottom = Operators.Extrapolate(),
Expand Down Expand Up @@ -429,6 +429,7 @@ function bubble_3d_invariant_ρe(ARGS, comms_ctx, ::Type{FT}) where {FT}
fu¹² = Geometry.Contravariant12Vector.(
Geometry.Covariant123Vector.(Ic2f.(Y.uₕ))
),
fu³ = Geometry.Contravariant3Vector.(Geometry.Covariant123Vector.(Y.w)),
cuvw = cuvw,
cE = cE,
ce = ce,
Expand Down Expand Up @@ -466,14 +467,15 @@ function bubble_3d_invariant_ρe(ARGS, comms_ctx, ::Type{FT}) where {FT}
Es = FT[]
Mass = FT[]
for sol_step in sol_invariant.u
Es_step = ClimaComms.reduce(comms_ctx, sum(sol_step.Yc.ρe), +)
Mass_step = ClimaComms.reduce(comms_ctx, sum(sol_step.Yc.ρ), +)
Es_step = sum(sol_step.Yc.ρe)
Mass_step = sum(sol_step.Yc.ρ)
if ClimaComms.iamroot(comms_ctx)
push!(Es, Es_step)
push!(Mass, Mass_step)
end
end

@info "summary" Es[end] Mass[end]
#-----------------------------------

ENV["GKSwstype"] = "nul"
Expand Down Expand Up @@ -539,4 +541,4 @@ end

comms_ctx = ClimaComms.context()
ClimaComms.init(comms_ctx)
bubble_3d_invariant_ρe(ARGS, comms_ctx, FloatType)
sol_invariant = bubble_3d_invariant_ρe(ARGS, comms_ctx, FloatType)
63 changes: 42 additions & 21 deletions src/Fields/mapreduce_cuda.jl
Original file line number Diff line number Diff line change
@@ -1,31 +1,56 @@

Base.sum(
function Base.sum(
field::Union{Field, Base.Broadcast.Broadcasted{<:FieldStyle}},
::ClimaComms.CUDADevice,
) = mapreduce_cuda(identity, +, field, weighting = true) #TODO: distributed support to be added
)
context = ClimaComms.context(axes(field))
localsum = mapreduce_cuda(identity, +, field, weighting = true)
ClimaComms.allreduce!(context, parent(localsum), +)
return localsum[]
end

Base.sum(fn, field::Field, ::ClimaComms.CUDADevice) =
mapreduce_cuda(fn, +, field, weighting = true) #TODO: distributed support to be added
function Base.sum(fn, field::Field, ::ClimaComms.CUDADevice)
context = ClimaComms.context(axes(field))
localsum = mapreduce_cuda(fn, +, field, weighting = true)
ClimaComms.allreduce!(context, parent(localsum), +)
return localsum[]
end

Base.maximum(fn, field::Field, ::ClimaComms.CUDADevice) =
mapreduce_cuda(fn, max, field) #TODO: distributed support to be added
function Base.maximum(fn, field::Field, ::ClimaComms.CUDADevice)
context = ClimaComms.context(axes(field))
localmax = mapreduce_cuda(fn, max, field)
ClimaComms.allreduce!(context, parent(localmax), max)
return localmax[]
end

Base.maximum(field::Field, ::ClimaComms.CUDADevice) =
mapreduce_cuda(identity, max, field) #TODO: distributed support to be added
function Base.maximum(field::Field, ::ClimaComms.CUDADevice)
context = ClimaComms.context(axes(field))
localmax = mapreduce_cuda(identity, max, field)
ClimaComms.allreduce!(context, parent(localmax), max)
return localmax[]
end

Base.minimum(fn, field::Field, ::ClimaComms.CUDADevice) =
mapreduce_cuda(fn, min, field) #TODO: distributed support to be added
function Base.minimum(fn, field::Field, ::ClimaComms.CUDADevice)
context = ClimaComms.context(axes(field))
localmin = mapreduce_cuda(fn, min, field)
ClimaComms.allreduce!(context, parent(localmin), min)
return localmin[]
end

Base.minimum(field::Field, ::ClimaComms.CUDADevice) =
mapreduce_cuda(identity, min, field) #TODO: distributed support to be added
function Base.minimum(field::Field, ::ClimaComms.CUDADevice)
context = ClimaComms.context(axes(field))
localmin = mapreduce_cuda(identity, min, field)
ClimaComms.allreduce!(context, parent(localmin), min)
return localmin[]
end

Statistics.mean(
field::Union{Field, Base.Broadcast.Broadcasted{<:FieldStyle}},
::ClimaComms.CUDADevice,
) = Base.sum(field) ./ Base.sum(ones(axes(field))) #TODO: distributed support to be added
) = Base.sum(field) ./ Base.sum(ones(axes(field)))

Statistics.mean(fn, field::Field, ::ClimaComms.CUDADevice) =
Base.sum(fn, field) ./ Base.sum(ones(axes(field))) #TODO: distributed support to be added
Base.sum(fn, field) ./ Base.sum(ones(axes(field)))

function LinearAlgebra.norm(
field::Field,
Expand All @@ -36,9 +61,9 @@ function LinearAlgebra.norm(
if p == 2
# currently only one which supports structured types
if normalize
sqrt.(Statistics.mean(LinearAlgebra.norm_sqr, field))
sqrt.(Statistics.mean(LinearAlgebra.norm_sqr.(field)))
else
sqrt.(sum(LinearAlgebra.norm_sqr, field))
sqrt.(sum(LinearAlgebra.norm_sqr.(field)))
end
elseif p == 1
if normalize
Expand Down Expand Up @@ -106,11 +131,7 @@ function mapreduce_cuda(
Val(shmemsize),
)
end
if Nf == 1
return Array(reduce_cuda)[1, 1]
else
return DataLayouts.DataF{S}(CuArray(view(reduce_cuda, 1, :)[:]))
end
return DataLayouts.DataF{S}(Array(Array(reduce_cuda)[1, :]))
end

function mapreduce_cuda_kernel!(
Expand Down
47 changes: 1 addition & 46 deletions test/Fields/reduction_cuda.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,7 @@ import ClimaCore:
Topologies,
DataLayouts

# set initial condition for steady-state test
function set_initial_condition(space)
Y = map(Fields.local_geometry_field(space)) do local_geometry
h = 1.0
return h
end
return Y
end

# set simple field
function set_simple_field(space)
α0 = 45.0
h0 = 1.0
Y = map(Fields.local_geometry_field(space)) do local_geometry
coord = local_geometry.coordinates
ϕ = coord.lat
λ = coord.long
z = coord.z
h = h0 * z * (cosd(α0) * cosd(ϕ) + sind(α0) * cosd(λ) * sind(ϕ))
return h
end
return Y
end

function set_elevation(space, h₀)
Y = map(Fields.local_geometry_field(space)) do local_geometry
coord = local_geometry.coordinates

ϕ = coord.lat
λ = coord.long
FT = eltype(λ)
ϕₘ = FT(0) # degrees (equator)
λₘ = FT(3 / 2 * 180) # degrees
rₘ =
FT(acos(sind(ϕₘ) * sind(ϕ) + cosd(ϕₘ) * cosd(ϕ) * cosd- λₘ))) # Great circle distance (rads)
Rₘ = FT(3π / 4) # Moutain radius
ζₘ = FT/ 16) # Mountain oscillation half-width
zₛ = ifelse(
rₘ < Rₘ,
FT(h₀ / 2) * (1 + cospi(rₘ / Rₘ)) * (cospi(rₘ / ζₘ))^2,
FT(0),
)
return zₛ
end
return Y
end
include("reduction_cuda_utils.jl")

@testset "test cuda reduction op on surface of sphere" begin
FT = Float64
Expand Down
Loading

0 comments on commit da03513

Please sign in to comment.