Skip to content

Commit

Permalink
improve OncePer implementation
Browse files Browse the repository at this point in the history
Address reviewer feedback, add more fixes and more tests,
rename to add Once prefix.
  • Loading branch information
vtjnash authored and KristofferC committed Oct 21, 2024
1 parent 0438f2a commit 53ca3b2
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 107 deletions.
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ Multi-threading changes
-----------------------

* New types are defined to handle the pattern of code that must run once per process, called
a `PerProcess{T}` type, which allows defining a function that should be run exactly once
a `OncePerProcess{T}` type, which allows defining a function that should be run exactly once
the first time it is called, and then always return the same result value of type `T`
every subsequent time afterwards. There are also `PerThread{T}` and `PerTask{T}` types for
every subsequent time afterwards. There are also `OncePerThread{T}` and `OncePerTask{T}` types for
similar usage with threads or tasks. ([#TBD])

Build system changes
Expand Down
2 changes: 1 addition & 1 deletion base/docs/basedocs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ runtime initialization functions of external C libraries and initializing global
that involve pointers returned by external libraries.
See the [manual section about modules](@ref modules) for more details.
See also: [`PerProcess`](@ref).
See also: [`OncePerProcess`](@ref).
# Examples
```julia
Expand Down
6 changes: 3 additions & 3 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ export
OrdinalRange,
Pair,
PartialQuickSort,
PerProcess,
PerTask,
PerThread,
OncePerProcess,
OncePerTask,
OncePerThread,
PermutedDimsArray,
QuickSort,
Rational,
Expand Down
148 changes: 78 additions & 70 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
After `notify` is called, the `Event` remains in a signaled state and
tasks will no longer block when waiting for it, until `reset` is called.
If `autoreset` is true, at most one task will be released from `wait` for)
If `autoreset` is true, at most one task will be released from `wait` for
each call to `notify`.
This provides an acquire & release memory ordering on notify/wait.
Expand Down Expand Up @@ -578,11 +578,15 @@ end
export Event
end

const PerStateInitial = 0x00
const PerStateHasrun = 0x01
const PerStateErrored = 0x02
const PerStateConcurrent = 0x03

"""
PerProcess{T}
OncePerProcess{T}(init::Function)() -> T
Calling a `PerProcess` object returns a value of type `T` by running the
Calling a `OncePerProcess` object returns a value of type `T` by running the
function `initializer` exactly once per process. All concurrent and future
calls in the same process will return exactly the same value. This is useful in
code that will be precompiled, as it allows setting up caches or other state
Expand All @@ -591,13 +595,14 @@ which won't get serialized.
## Example
```jldoctest
julia> const global_state = Base.PerProcess{Vector{UInt32}}() do
julia> const global_state = Base.OncePerProcess{Vector{UInt32}}() do
println("Making lazy global value...done.")
return [Libc.rand()]
end;
julia> procstate = global_state();
julia> (procstate = global_state()) |> typeof
Making lazy global value...done.
Vector{UInt32} (alias for Array{UInt32, 1})
julia> procstate === global_state()
true
Expand All @@ -606,51 +611,51 @@ julia> procstate === fetch(@async global_state())
true
```
"""
mutable struct PerProcess{T, F}
x::Union{Nothing,T}
mutable struct OncePerProcess{T, F}
value::Union{Nothing,T}
@atomic state::UInt8 # 0=initial, 1=hasrun, 2=error
@atomic allow_compile_time::Bool
const initializer::F
const lock::ReentrantLock

function PerProcess{T,F}(initializer::F) where {T, F}
once = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock())
function OncePerProcess{T,F}(initializer::F) where {T, F}
once = new{T,F}(nothing, PerStateInitial, true, initializer, ReentrantLock())
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :x, nothing)
once, :value, nothing)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :state, 0x00)
once, :state, PerStateInitial)
return once
end
end
PerProcess{T}(initializer::F) where {T, F} = PerProcess{T, F}(initializer)
PerProcess(initializer) = PerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline function (once::PerProcess{T})() where T
OncePerProcess{T}(initializer::F) where {T, F} = OncePerProcess{T, F}(initializer)
OncePerProcess(initializer) = OncePerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline function (once::OncePerProcess{T})() where T
state = (@atomic :acquire once.state)
if state != 0x01
if state != PerStateHasrun
(@noinline function init_perprocesss(once, state)
state == 0x02 && error("PerProcess initializer failed previously")
state == PerStateErrored && error("OncePerProcess initializer failed previously")
once.allow_compile_time || __precompile__(false)
lock(once.lock)
try
state = @atomic :monotonic once.state
if state == 0x00
once.x = once.initializer()
elseif state == 0x02
error("PerProcess initializer failed previously")
elseif state != 0x01
error("invalid state for PerProcess")
if state == PerStateInitial
once.value = once.initializer()
elseif state == PerStateErrored
error("OncePerProcess initializer failed previously")
elseif state != PerStateHasrun
error("invalid state for OncePerProcess")
end
catch
state == 0x02 || @atomic :release once.state = 0x02
state == PerStateErrored || @atomic :release once.state = PerStateErrored
unlock(once.lock)
rethrow()
end
state == 0x01 || @atomic :release once.state = 0x01
state == PerStateHasrun || @atomic :release once.state = PerStateHasrun
unlock(once.lock)
nothing
end)(once, state)
end
return once.x::T
return once.value::T
end

function copyto_monotonic!(dest::AtomicMemory, src)
Expand All @@ -659,7 +664,7 @@ function copyto_monotonic!(dest::AtomicMemory, src)
if isassigned(src, j)
@atomic :monotonic dest[i] = src[j]
#else
# _unsafeindex_atomic!(dest, i, src[j], :monotonic)
# _unsetindex_atomic!(dest, i, src[j], :monotonic)
end
i += 1
end
Expand All @@ -674,36 +679,38 @@ function fill_monotonic!(dest::AtomicMemory, x)
end


# share a lock, since we just need it briefly, so some contention is okay
# share a lock/condition, since we just need it briefly, so some contention is okay
const PerThreadLock = ThreadSynchronizer()
"""
PerThread{T}
OncePerThread{T}(init::Function)() -> T
Calling a `PerThread` object returns a value of type `T` by running the function
Calling a `OncePerThread` object returns a value of type `T` by running the function
`initializer` exactly once per thread. All future calls in the same thread, and
concurrent or future calls with the same thread id, will return exactly the
same value. The object can also be indexed by the threadid for any existing
thread, to get (or initialize *on this thread*) the value stored for that
thread. Incorrect usage can lead to data-races or memory corruption so use only
if that behavior is correct within your library's threading-safety design.
Warning: it is not necessarily true that a Task only runs on one thread, therefore the value
returned here may alias other values or change in the middle of your program. This type may
get deprecated in the future. If initializer yields, the thread running the current task
after the call might not be the same as the one at the start of the call.
!!! warning
It is not necessarily true that a Task only runs on one thread, therefore the value
returned here may alias other values or change in the middle of your program. This function
may get deprecated in the future. If initializer yields, the thread running the current
task after the call might not be the same as the one at the start of the call.
See also: [`PerTask`](@ref).
See also: [`OncePerTask`](@ref).
## Example
```jldoctest
julia> const thread_state = Base.PerThread{Vector{UInt32}}() do
julia> const thread_state = Base.OncePerThread{Vector{UInt32}}() do
println("Making lazy thread value...done.")
return [Libc.rand()]
end;
julia> threadvec = thread_state();
julia> (threadvec = thread_state()) |> typeof
Making lazy thread value...done.
Vector{UInt32} (alias for Array{UInt32, 1})
julia> threadvec === fetch(@async thread_state())
true
Expand All @@ -712,12 +719,12 @@ julia> threadvec === thread_state[Threads.threadid()]
true
```
"""
mutable struct PerThread{T, F}
mutable struct OncePerThread{T, F}
@atomic xs::AtomicMemory{T} # values
@atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent
const initializer::F

function PerThread{T,F}(initializer::F) where {T, F}
function OncePerThread{T,F}(initializer::F) where {T, F}
xs, ss = AtomicMemory{T}(), AtomicMemory{UInt8}()
once = new{T,F}(xs, ss, initializer)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
Expand All @@ -727,29 +734,30 @@ mutable struct PerThread{T, F}
return once
end
end
PerThread{T}(initializer::F) where {T, F} = PerThread{T,F}(initializer)
PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline function getindex(once::PerThread, tid::Integer)
OncePerThread{T}(initializer::F) where {T, F} = OncePerThread{T,F}(initializer)
OncePerThread(initializer) = OncePerThread{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline (once::OncePerThread)() = once[Threads.threadid()]
@inline function getindex(once::OncePerThread, tid::Integer)
tid = Int(tid)
ss = @atomic :acquire once.ss
xs = @atomic :monotonic once.xs
# n.b. length(xs) >= length(ss)
if tid > length(ss) || (@atomic :acquire ss[tid]) != 0x01
if tid <= 0 || tid > length(ss) || (@atomic :acquire ss[tid]) != PerStateHasrun
(@noinline function init_perthread(once, tid)
local xs = @atomic :acquire once.xs
local ss = @atomic :monotonic once.ss
local ss = @atomic :acquire once.ss
local xs = @atomic :monotonic once.xs
local len = length(ss)
# slow path to allocate it
nt = Threads.maxthreadid()
0 < tid <= nt || ArgumentError("thread id outside of allocated range")
if tid <= length(ss) && (@atomic :acquire ss[tid]) == 0x02
error("PerThread initializer failed previously")
0 < tid <= nt || throw(ArgumentError("thread id outside of allocated range"))
if tid <= length(ss) && (@atomic :acquire ss[tid]) == PerStateErrored
error("OncePerThread initializer failed previously")
end
newxs = xs
newss = ss
if tid > len
# attempt to do all allocations outside of PerThreadLock for better scaling
@assert length(xs) == length(ss) "logical constraint violation"
@assert length(xs) >= length(ss) "logical constraint violation"
newxs = typeof(xs)(undef, len + nt)
newss = typeof(ss)(undef, len + nt)
end
Expand All @@ -759,30 +767,30 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali
ss = @atomic :monotonic once.ss
xs = @atomic :monotonic once.xs
if tid > length(ss)
@assert length(ss) >= len && newxs !== xs && newss != ss "logical constraint violation"
fill_monotonic!(newss, 0x00)
@assert len <= length(ss) <= length(newss) "logical constraint violation"
fill_monotonic!(newss, PerStateInitial)
xs = copyto_monotonic!(newxs, xs)
ss = copyto_monotonic!(newss, ss)
@atomic :release once.xs = xs
@atomic :release once.ss = ss
end
state = @atomic :monotonic ss[tid]
while state == 0x04
while state == PerStateConcurrent
# lost race, wait for notification this is done running elsewhere
wait(PerThreadLock) # wait for initializer to finish without releasing this thread
ss = @atomic :monotonic once.ss
state = @atomic :monotonic ss[tid] == 0x04
state = @atomic :monotonic ss[tid]
end
if state == 0x00
if state == PerStateInitial
# won the race, drop lock in exchange for state, and run user initializer
@atomic :monotonic ss[tid] = 0x04
@atomic :monotonic ss[tid] = PerStateConcurrent
result = try
unlock(PerThreadLock)
once.initializer()
catch
lock(PerThreadLock)
ss = @atomic :monotonic once.ss
@atomic :release ss[tid] = 0x02
@atomic :release ss[tid] = PerStateErrored
notify(PerThreadLock)
rethrow()
end
Expand All @@ -791,12 +799,12 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali
xs = @atomic :monotonic once.xs
@atomic :release xs[tid] = result
ss = @atomic :monotonic once.ss
@atomic :release ss[tid] = 0x01
@atomic :release ss[tid] = PerStateHasrun
notify(PerThreadLock)
elseif state == 0x02
error("PerThread initializer failed previously")
elseif state != 0x01
error("invalid state for PerThread")
elseif state == PerStateErrored
error("OncePerThread initializer failed previously")
elseif state != PerStateHasrun
error("invalid state for OncePerThread")
end
finally
unlock(PerThreadLock)
Expand All @@ -807,26 +815,26 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali
end
return xs[tid]
end
@inline (once::PerThread)() = once[Threads.threadid()]

"""
PerTask{T}
OncePerTask{T}(init::Function)() -> T
Calling a `PerTask` object returns a value of type `T` by running the function `initializer`
Calling a `OncePerTask` object returns a value of type `T` by running the function `initializer`
exactly once per Task. All future calls in the same Task will return exactly the same value.
See also: [`task_local_storage`](@ref).
## Example
```jldoctest
julia> const task_state = Base.PerTask{Vector{UInt32}}() do
julia> const task_state = Base.OncePerTask{Vector{UInt32}}() do
println("Making lazy task value...done.")
return [Libc.rand()]
end;
julia> taskvec = task_state();
julia> (taskvec = task_state()) |> typeof
Making lazy task value...done.
Vector{UInt32} (alias for Array{UInt32, 1})
julia> taskvec === task_state()
true
Expand All @@ -836,11 +844,11 @@ Making lazy task value...done.
false
```
"""
mutable struct PerTask{T, F}
mutable struct OncePerTask{T, F}
const initializer::F

PerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer)
PerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer)
PerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer)
OncePerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer)
OncePerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer)
OncePerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer)
end
@inline (once::PerTask)() = get!(once.initializer, task_local_storage(), once)
@inline (once::OncePerTask)() = get!(once.initializer, task_local_storage(), once)
6 changes: 3 additions & 3 deletions doc/src/base/base.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ Main.include
Base.include_string
Base.include_dependency
__init__
Base.PerProcess
Base.PerTask
Base.PerThread
Base.OncePerProcess
Base.OncePerTask
Base.OncePerThread
Base.which(::Any, ::Any)
Base.methods
Base.@show
Expand Down
2 changes: 1 addition & 1 deletion test/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ precompile_test_harness(false) do dir
struct GAPGroupHomomorphism{A, B} <: AbstractAlgebraMap{GAPGroupHomomorphism{B, A}} end
global process_state_calls::Int = 0
const process_state = Base.PerProcess{typeof(getpid())}() do
const process_state = Base.OncePerProcess{typeof(getpid())}() do
@assert (global process_state_calls += 1) == 1
return getpid()
end
Expand Down
Loading

0 comments on commit 53ca3b2

Please sign in to comment.