Skip to content

Commit

Permalink
add objects to concisely specify initialization
Browse files Browse the repository at this point in the history
PerProcess: once per process
PerThread: once per thread id
PerTask: once per task object
  • Loading branch information
vtjnash authored and KristofferC committed Oct 21, 2024
1 parent 636008a commit e9bfc9c
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 1 deletion.
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ variables. ([#53742]).
Multi-threading changes
-----------------------

* New types are defined to handle the pattern of code that must run exactly once per process.
A `PerProcess{T}` object wraps an initializer function that is run the first time the object
is called; every subsequent call returns the same result value of type `T` without running
the initializer again. There are also `PerThread{T}` and `PerTask{T}` types for
similar usage with threads or tasks. ([#TBD])

Build system changes
--------------------

Expand Down
2 changes: 2 additions & 0 deletions base/docs/basedocs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ runtime initialization functions of external C libraries and initializing global
that involve pointers returned by external libraries.
See the [manual section about modules](@ref modules) for more details.
See also: [`PerProcess`](@ref).
# Examples
```julia
const foo_data_ptr = Ref{Ptr{Cvoid}}(0)
Expand Down
3 changes: 3 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export
OrdinalRange,
Pair,
PartialQuickSort,
PerProcess,
PerTask,
PerThread,
PermutedDimsArray,
QuickSort,
Rational,
Expand Down
252 changes: 251 additions & 1 deletion base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
After `notify` is called, the `Event` remains in a signaled state and
tasks will no longer block when waiting for it, until `reset` is called.
If `autoreset` is true, at most one task will be released from `wait` for
If `autoreset` is true, at most one task will be released from `wait` for
each call to `notify`.
This provides an acquire & release memory ordering on notify/wait.
Expand Down Expand Up @@ -570,3 +570,253 @@ end
import .Base: Event
export Event
end


"""
    PerProcess{T}

Calling a `PerProcess` object returns a value of type `T` by running the
function `initializer` exactly once per process. All concurrent and future
calls in the same process will return exactly the same value. This is useful in
code that will be precompiled, as it allows setting up caches or other state
which won't get serialized.

## Example

```jldoctest
julia> const global_state = Base.PerProcess{Vector{UInt32}}() do
           println("Making lazy global value...done.")
           return [Libc.rand()]
       end;

julia> procstate = global_state();
Making lazy global value...done.

julia> procstate === global_state()
true

julia> procstate === fetch(@async global_state())
true
```
"""
mutable struct PerProcess{T, F}
    # cached result of `initializer`; stays `nothing` until the initializer has run
    x::Union{Nothing,T}
    # 0=initial, 1=hasrun, 2=error
    @atomic state::UInt8
    # passed to `Base.__precompile__` by the call method before it initializes
    @atomic allow_compile_time::Bool
    # zero-argument callable producing the value
    const initializer::F
    # serializes the slow (initializing) path of the call method
    const lock::ReentrantLock

    # Fully parameterised constructor; the other two delegate to it.
    function PerProcess{T,F}(initializer::F) where {T, F}
        return new{T,F}(nothing, 0x00, true, initializer, ReentrantLock())
    end
    # Result type given explicitly, callable type taken from the argument.
    function PerProcess{T}(initializer::F) where {T, F}
        return PerProcess{T,F}(initializer)
    end
    # Result type taken from `Base.promote_op(initializer)`.
    function PerProcess(initializer)
        return PerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer)
    end
end
# Return the process-wide value, running `once.initializer` on first use.
#
# Fast path: an acquire load that observes state 0x01 (hasrun) returns the cached
# value directly. Otherwise a non-inlined helper takes `once.lock`, re-reads the
# state and runs the initializer only if the state is still 0x00. A failure is
# recorded as state 0x02, so later calls raise an error instead of retrying.
@inline function (once::PerProcess{T})() where T
    seen = @atomic :acquire once.state
    if seen != 0x01
        (@noinline function init_perprocess(once, state)
            if state == 0x02
                error("PerProcess initializer failed previously")
            end
            Base.__precompile__(once.allow_compile_time)
            lock(once.lock)
            try
                # re-read under the lock: another task may have finished or failed meanwhile
                state = @atomic :monotonic once.state
                if state == 0x00
                    once.x = once.initializer()
                elseif state == 0x02
                    error("PerProcess initializer failed previously")
                elseif state != 0x01
                    error("invalid state for PerProcess")
                end
            catch
                # record the failure (unless it already was recorded) before propagating
                if state != 0x02
                    @atomic :release once.state = 0x02
                end
                unlock(once.lock)
                rethrow()
            end
            # publish the value; the release store pairs with the acquire load above
            if state != 0x01
                @atomic :release once.state = 0x01
            end
            unlock(once.lock)
            return nothing
        end)(once, seen)
    end
    return once.x::T
end

# Copy the assigned elements of `src` into `dest` using monotonic atomic stores.
# Elements are placed by position: the k-th index of `src` is written to `dest[k]`.
# Unassigned source slots are skipped, leaving the matching `dest` slot untouched.
# Returns `dest`.
function copyto_monotonic!(dest::AtomicMemory, src)
    for (slot, idx) in enumerate(eachindex(src))
        isassigned(src, idx) || continue
        @atomic :monotonic dest[slot] = src[idx]
    end
    return dest
end

# Store `x` into every slot of `dest` using monotonic atomic stores; returns `dest`.
function fill_monotonic!(dest::AtomicMemory, x)
    for slot in eachindex(dest)
        @atomic :monotonic dest[slot] = x
    end
    return dest
end


# One lock/condition shared by every `PerThread` object rather than one per object.
# It is only needed briefly (the slow path of `getindex` releases it before calling
# the user initializer), so some contention between objects is okay.
const PerThreadLock = ThreadSynchronizer()
"""
    PerThread{T}

Calling a `PerThread` object returns a value of type `T` by running the function
`initializer` exactly once per thread. All future calls in the same thread, and
concurrent or future calls with the same thread id, will return exactly the
same value. The object can also be indexed by the threadid for any existing
thread, to get (or initialize *on this thread*) the value stored for that
thread. Incorrect usage can lead to data-races or memory corruption so use only
if that behavior is correct within your library's threading-safety design.

!!! warning
    It is not necessarily true that a Task only runs on one thread, therefore the value
    returned here may alias other values or change in the middle of your program. This type may
    get deprecated in the future. If initializer yields, the thread running the current task
    after the call might not be the same as the one at the start of the call.

See also: [`PerTask`](@ref).

## Example

```jldoctest
julia> const thread_state = Base.PerThread{Vector{UInt32}}() do
           println("Making lazy thread value...done.")
           return [Libc.rand()]
       end;

julia> threadvec = thread_state();
Making lazy thread value...done.

julia> threadvec === fetch(@async thread_state())
true

julia> threadvec === thread_state[Threads.threadid()]
true
```
"""
mutable struct PerThread{T, F}
    # values, one slot per thread id
    @atomic xs::AtomicMemory{T}
    # per-slot states: 0=initial, 1=hasrun, 2=error, 4=concurrent (the value `getindex`
    # stores while an initializer for that slot is running)
    @atomic ss::AtomicMemory{UInt8}
    # zero-argument callable producing a thread's value
    const initializer::F

    # Fully parameterised constructor; starts with empty storage that `getindex` grows.
    function PerThread{T,F}(initializer::F) where {T, F}
        return new{T,F}(AtomicMemory{T}(), AtomicMemory{UInt8}(), initializer)
    end
    # Result type given explicitly, callable type taken from the argument.
    function PerThread{T}(initializer::F) where {T, F}
        return PerThread{T,F}(initializer)
    end
    # Result type taken from `Base.promote_op(initializer)`.
    function PerThread(initializer)
        return PerThread{Base.promote_op(initializer), typeof(initializer)}(initializer)
    end
end
# Return the value stored for thread id `tid`, running `once.initializer` (on the
# calling thread) the first time that slot is requested.
#
# The fast path is two atomic loads plus a state check. Everything else happens in a
# non-inlined helper which validates `tid`, grows the backing storage when needed, and
# uses `PerThreadLock` together with the per-slot state byte so that the initializer
# runs at most once per `tid`.
#
# Throws `ArgumentError` when `tid` is not in `1:Threads.maxthreadid()`, and an
# `ErrorException` when the initializer for `tid` failed on an earlier call.
@inline function getindex(once::PerThread, tid::Integer)
    tid = Int(tid)
    ss = @atomic :acquire once.ss
    xs = @atomic :monotonic once.xs
    # n.b. length(xs) >= length(ss)
    # non-positive ids are routed to the slow path so they get the ArgumentError below
    # instead of a BoundsError from `ss[tid]`
    if tid <= 0 || tid > length(ss) || (@atomic :acquire ss[tid]) != 0x01
        (@noinline function init_perthread(once, tid)
            # load `ss` first with acquire (as the fast path does): the writer publishes
            # `xs` before `ss`, so this unlocked snapshot has length(xs) >= length(ss)
            local ss = @atomic :acquire once.ss
            local xs = @atomic :monotonic once.xs
            local len = length(ss)
            # slow path to allocate it
            nt = Threads.maxthreadid()
            0 < tid <= nt || throw(ArgumentError("thread id outside of allocated range"))
            if tid <= length(ss) && (@atomic :acquire ss[tid]) == 0x02
                error("PerThread initializer failed previously")
            end
            newxs = xs
            newss = ss
            if tid > len
                # attempt to do all allocations outside of PerThreadLock for better scaling
                @assert length(xs) >= length(ss) "logical constraint violation"
                newxs = typeof(xs)(undef, len + nt)
                newss = typeof(ss)(undef, len + nt)
            end
            # uses state and locks to ensure this runs exactly once per tid argument
            lock(PerThreadLock)
            try
                ss = @atomic :monotonic once.ss
                xs = @atomic :monotonic once.xs
                if tid > length(ss)
                    # still too small after re-reading under the lock: install the
                    # pre-allocated buffers (identity check: they must be fresh objects)
                    @assert length(ss) >= len && newxs !== xs && newss !== ss "logical constraint violation"
                    fill_monotonic!(newss, 0x00)
                    xs = copyto_monotonic!(newxs, xs)
                    ss = copyto_monotonic!(newss, ss)
                    @atomic :release once.xs = xs
                    @atomic :release once.ss = ss
                end
                state = @atomic :monotonic ss[tid]
                while state == 0x04
                    # lost race, wait for notification this is done running elsewhere
                    wait(PerThreadLock) # wait for initializer to finish without releasing this thread
                    ss = @atomic :monotonic once.ss
                    # re-read the state byte itself; the loop condition does the comparison
                    state = @atomic :monotonic ss[tid]
                end
                if state == 0x00
                    # won the race, drop lock in exchange for state, and run user initializer
                    @atomic :monotonic ss[tid] = 0x04
                    result = try
                        unlock(PerThreadLock)
                        once.initializer()
                    catch
                        # re-take the lock (the `finally` below releases it), mark the
                        # slot as failed and wake any waiters before propagating
                        lock(PerThreadLock)
                        ss = @atomic :monotonic once.ss
                        @atomic :release ss[tid] = 0x02
                        notify(PerThreadLock)
                        rethrow()
                    end
                    # store result and notify waiters
                    lock(PerThreadLock)
                    # storage may have been replaced while the lock was dropped: reload it
                    xs = @atomic :monotonic once.xs
                    @atomic :release xs[tid] = result
                    ss = @atomic :monotonic once.ss
                    @atomic :release ss[tid] = 0x01
                    notify(PerThreadLock)
                elseif state == 0x02
                    error("PerThread initializer failed previously")
                elseif state != 0x01
                    error("invalid state for PerThread")
                end
            finally
                unlock(PerThreadLock)
            end
            nothing
        end)(once, tid)
        # the helper may have replaced the storage; reload before indexing
        xs = @atomic :monotonic once.xs
    end
    return xs[tid]
end
# Calling the object is shorthand for indexing it with the current thread id.
@inline function (once::PerThread)()
    return once[Threads.threadid()]
end

"""
    PerTask{T}

Calling a `PerTask` object returns a value of type `T` by running the function `initializer`
exactly once per Task. All future calls in the same Task will return exactly the same value.

See also: [`task_local_storage`](@ref).

## Example

```jldoctest
julia> const task_state = Base.PerTask{Vector{UInt32}}() do
           println("Making lazy task value...done.")
           return [Libc.rand()]
       end;

julia> taskvec = task_state();
Making lazy task value...done.

julia> taskvec === task_state()
true

julia> taskvec === fetch(@async task_state())
Making lazy task value...done.
false
```
"""
mutable struct PerTask{T, F}
    # zero-argument callable producing a task's value
    const initializer::F

    # Fully parameterised constructor; the other two delegate to it.
    function PerTask{T,F}(initializer::F) where {T, F}
        return new{T,F}(initializer)
    end
    # Result type given explicitly, callable type taken from the argument.
    function PerTask{T}(initializer::F) where {T, F}
        return PerTask{T,F}(initializer)
    end
    # Result type taken from `Base.promote_op(initializer)`.
    function PerTask(initializer)
        return PerTask{Base.promote_op(initializer), typeof(initializer)}(initializer)
    end
end
# Return the current task's value, running `once.initializer` the first time this
# task calls the object. The value is kept in the task-local storage under the key
# `once`, so each `PerTask` object has its own slot in each task.
#
# The `::T` assertion enforces the declared result type and lets callers infer it
# (the lookup alone is not typed on `T`), matching `PerProcess`, which returns
# `once.x::T`. A `TypeError` is thrown if the stored value is not a `T`.
@inline function (once::PerTask{T})() where {T}
    return get!(once.initializer, task_local_storage(), once)::T
end
3 changes: 3 additions & 0 deletions doc/src/base/base.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Main.include
Base.include_string
Base.include_dependency
__init__
Base.PerProcess
Base.PerTask
Base.PerThread
Base.which(::Any, ::Any)
Base.methods
Base.@show
Expand Down
21 changes: 21 additions & 0 deletions test/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ precompile_test_harness(false) do dir
end
abstract type AbstractAlgebraMap{A} end
struct GAPGroupHomomorphism{A, B} <: AbstractAlgebraMap{GAPGroupHomomorphism{B, A}} end
global process_state_calls::Int = 0
const process_state = Base.PerProcess{typeof(getpid())}() do
@assert (global process_state_calls += 1) == 1
return getpid()
end
const mypid = process_state()
@assert process_state_calls === 1
process_state_calls = 0
@assert process_state() === process_state()
@assert process_state_calls === 0
end
""")
write(Foo2_file,
Expand Down Expand Up @@ -272,6 +283,9 @@ precompile_test_harness(false) do dir
oid_vec_int = objectid(a_vec_int)
oid_mat_int = objectid(a_mat_int)
using $FooBase_module: process_state, mypid as FooBase_pid, process_state_calls
const mypid = process_state()
end
""")
# Issue #52063
Expand Down Expand Up @@ -333,6 +347,13 @@ precompile_test_harness(false) do dir
@test isready(Foo.ch2)
@test take!(Foo.ch2) === 2
@test !isready(Foo.ch2)

@test Foo.process_state_calls === 0
@test Foo.process_state() === getpid()
@test Foo.mypid !== getpid()
@test Foo.FooBase_pid !== getpid()
@test Foo.mypid !== Foo.FooBase_pid
@test Foo.process_state_calls === 1
end

let
Expand Down
Loading

0 comments on commit e9bfc9c

Please sign in to comment.