Skip to content

Commit

Permalink
Reduce locked scope
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgr committed Aug 17, 2023
1 parent 5bbf5b1 commit a54adca
Showing 1 changed file with 93 additions and 103 deletions.
196 changes: 93 additions & 103 deletions src/testpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Base.@kwdef mutable struct TestEnginePool
engines::Dict{String, Int64} = Dict()
# This is used to enable unique, simple, naming of engines
# Switching to randomly generated UUIDs would be needed if tests are run independently
next_id::Int64 = 0
next_id::Threads.Atomic{Int} = Threads.Atomic{Int}(0)
# Number of tests per engine. Values > 1 invalidate test timing, and require careful
# attention to engine sizing
concurrency::Int64 = 1
Expand All @@ -27,19 +27,25 @@ Base.@kwdef mutable struct TestEnginePool
creater::Function = create_default_engine
end

function _get_new_id()
return Threads.atomic_add!(TEST_ENGINE_POOL.next_id, 1)
end

function get_free_test_engine_name()::String
delay = 1
# One lock guards name acquisition, forming a queue
# The second lock guards modification of the engine pool
@lock TEST_SERVER_ACQUISITION_LOCK while true
if (length(TEST_ENGINE_POOL.engines) < 1)
error("No servers available!")
end
@lock TEST_SERVER_LOCK begin
if (length(TEST_ENGINE_POOL.engines) < 1)
error("No servers available!")
end

@lock TEST_SERVER_LOCK for e in TEST_ENGINE_POOL.engines
if e.second < TEST_ENGINE_POOL.concurrency
TEST_ENGINE_POOL.engines[e.first] += 1
return e.first
for e in TEST_ENGINE_POOL.engines
if e.second < TEST_ENGINE_POOL.concurrency
TEST_ENGINE_POOL.engines[e.first] += 1
return e.first
end
end
end
# Very naive wait protocol
Expand All @@ -50,36 +56,28 @@ end
"""
Test if an engine has been created and can be returned via the API.
"""
function is_valid_engine(name::String)
function validate_engine(name::String)
try
get_engine(get_context(), name; readtimeout=30)
# The engine exists and does not immediately return an error
return true
catch
# Engine does not exist
return false
response = get_engine(get_context(), name; readtimeout=30)
if response.state == "PROVISIONED"
return true
end
# The engine exists, but is not provisioned
@warn("$engine was not provisioned. Reported state was: $(response.state)")
catch e
if e isa HTTPError
@warn("$engine was not provisioned. Reported error was: $e")
else
rethrow()
end
end
return false
end

function replace_engine(name::String)
@lock TEST_SERVER_LOCK begin
delete!(TEST_ENGINE_POOL.engines, name)
end
# If the engine could not be deleted, notify and continue
try
delete_engine(get_context(), name; readtimeout=30)
catch
@warn("Could not delete engine: ", name)
end

name = TEST_ENGINE_POOL.name_generator(TEST_ENGINE_POOL.next_id)

# Provision the engine if it does not already exist.
TEST_ENGINE_POOL.creater(new_name)

@lock TEST_SERVER_LOCK begin
TEST_ENGINE_POOL.engines[name] = 0
end
delete_test_engine!(name)
new_name = TEST_ENGINE_POOL.name_generator(_get_new_id())
add_test_engine!(new_name)
end

function list_test_engines()
Expand All @@ -98,14 +96,39 @@ already.
"""
function add_test_engine!(name::String)
# Provision the engine if it does not already exist.
TEST_ENGINE_POOL.creater(new_name)

try
TEST_ENGINE_POOL.creater(name)
catch
# Provisioning failed. Attempt to delete the engine
delete_test_engine!(name)
@warn("Could not provision engine $name")
return
end
@lock TEST_SERVER_LOCK begin
engines = TEST_ENGINE_POOL.engines
engines[name] = 0
end

return nothing
return
end

"""
delete_test_engine!(name::String)
Delete an engine and remove it from the pool of test engines. The engine will be deleted
whether or not it is in the pool.
"""
function delete_test_engine!(name::String)
# Remove the engine from the list of available engines
@lock TEST_SERVER_LOCK begin
delete!(TEST_ENGINE_POOL.engines, name)
end
# Request engine deletion
try
delete_engine(get_context(), name; readtimeout=30)
catch e
@warn("Could not delete engine $name: ", e)
end
end

function get_next_engine_name(id::Int64)
Expand Down Expand Up @@ -140,99 +163,66 @@ function resize_test_engine_pool!(size::Int64, name_generator::Option{Function}=
TEST_ENGINE_POOL.name_generator = name_generator
end

@lock TEST_SERVER_LOCK begin
# Add engines if size > length
_create_and_add_engines!(size)
_validate_engine_pool!()
_trim_engine_pool!(size)
end
# Add engines if size > length
_create_and_add_engines!(size)
_validate_engine_pool!()
# Remove engines if size < length
_trim_engine_pool!(size)
end

# Test all engines and remove if they are unavailable or not successfully provisioned
function _validate_engine_pool!()
@lock TEST_SERVER_LOCK begin
@sync for engine in TEST_ENGINE_POOL.engines
try
response = get_engine(get_context(), engine.first; readtimeout=30)
if response.state == "PROVISIONED"
# Success! Move on and try the next engine
continue
end
# The engine exists, but is not provisioned despite our best attempts
@warn("$engine was not provisioned. Reported state was: $(response.state)")
catch e
if e isa HTTPError
@warn("$engine was not provisioned. Reported error was: $e")
else
rethrow()
end
end
# Something went wrong. Remove from the list and attempt to delete
delete!(TEST_ENGINE_POOL.engines, engine)

# Note that only the deletion is asynchronous, not the list modification
@async try
delete_engine(get_context(), engine.first; readtimeout=30)
@info("Removed failed engine $engine")
catch e
@info("Attempted to remove failed engine $engine", e)
end
for engine in keys(TEST_ENGINE_POOL.engines)
validate_engine(engine) && continue

# The engine was not provisioned or does not exist. Remove it from the pool
@info("Removing failed engine $engine")
delete_test_engine!(engine)
end
end
end

function _create_and_add_engines!(size::Int64)
new_engine_count = 0
@lock TEST_SERVER_LOCK begin
engines = TEST_ENGINE_POOL.engines
increase = size - length(engines)
increase < 0 && return

new_names = String[]
# Add engines while length < size
while (length(engines) < size)
new_name = TEST_ENGINE_POOL.name_generator(TEST_ENGINE_POOL.next_id)
TEST_ENGINE_POOL.next_id += 1

# Check the engine name generator isn't repeating names.
if haskey(engines, new_name)
throw(ArgumentError("Engine name already exists"))
end
push!(new_names, new_name)
engines[new_name] = 0
new_engine_count = size - length(TEST_ENGINE_POOL.engines)
if new_engine_count < 0
return
end
end

# Provision separately so we can do it asynchronously
@info("Provisioning $(increase) new engines")
@sync for new_name in new_names
@async try
TEST_ENGINE_POOL.creater(new_name)
catch
# Ignore any errors here as we check more thoroughly below
end
new_names = String[]
# Generate new names
for _ in 1:new_engine_count
push!(new_names, TEST_ENGINE_POOL.name_generator(_get_new_id()))
end

@debug("Provisioning $new_engine_count engines")
@sync for new_name in new_names
@async try
add_test_engine!(new_name)
catch e
@warn("Could not provision engine $new_name:", e)
end
end
end

# Remove engines if size < length(engine_pool)
function _trim_engine_pool!(size::Int64)
@assert size >= 0
engines_to_delete = String[]

@lock TEST_SERVER_LOCK begin
# Remove engines if size < length
# Move the first length - size engines to the list of engines to delete
engines_to_delete = String[]
while length(TEST_ENGINE_POOL.engines) > size
engine_name, _ = pop!(TEST_ENGINE_POOL.engines)
push!(engines_to_delete, engine_name)
end
# Asynchronously delete the engines
@sync for engine in engines_to_delete
@info("Deleting engine", engine)
@async try
delete_engine(get_context(), engine; readtimeout=30)
catch e
@info(e)
end
end
end

@sync for engine in engines_to_delete
@async delete_test_engine!(engine)
end
end

Expand Down

0 comments on commit a54adca

Please sign in to comment.