Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor test pool to be less optional #75

Merged
merged 16 commits into from
Aug 18, 2023
196 changes: 93 additions & 103 deletions src/testpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Base.@kwdef mutable struct TestEnginePool
engines::Dict{String, Int64} = Dict()
# This is used to enable unique, simple, naming of engines
# Switching to randomly generated UUIDs would be needed if tests are run independently
next_id::Int64 = 0
next_id::Threads.Atomic{Int} = Threads.Atomic{Int}(0)
# Number of tests per engine. Values > 1 invalidate test timing, and require careful
# attention to engine sizing
concurrency::Int64 = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should concurrency also be an atomic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, concurrency is the general limit, not the number of tests actually running

Expand All @@ -27,19 +27,25 @@ Base.@kwdef mutable struct TestEnginePool
creater::Function = create_default_engine
end

"""
    _get_new_id()

Atomically add one to the pool's `next_id` counter and return the value the counter held
before the addition.
"""
_get_new_id() = Threads.atomic_add!(TEST_ENGINE_POOL.next_id, 1)

function get_free_test_engine_name()::String
delay = 1
# One lock guards name acquisition, forming a queue
# The second lock guards modification of the engine pool
@lock TEST_SERVER_ACQUISITION_LOCK while true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with next id being an atomic, is the TEST_SERVER_ACQUISITION_LOCK still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next_id is for engine naming, so they don't interact. It's there to simplify starting up/naming/renaming multiple engines at once.

TEST_SERVER_ACQUISITION_LOCK is there to restrict engine claiming to one test at a time - which forms a queue once all engines are claimed. The inner lock then gives 'privileged' access to anything else trying to modify the list. A standard test has to claim both locks, restricting access to the inner lock to one test thread at a time. Thus an attempt to replace an engine has to contend for the lock with only one thread that spends most of its time sleeping. Otherwise attempts to modify the list during testing could wait until all the tests had finished.

Disallowing modification of the list while tests are running would also work, of course.

if (length(TEST_ENGINE_POOL.engines) < 1)
error("No servers available!")
end
@lock TEST_SERVER_LOCK begin
if (length(TEST_ENGINE_POOL.engines) < 1)
error("No servers available!")
end

@lock TEST_SERVER_LOCK for e in TEST_ENGINE_POOL.engines
if e.second < TEST_ENGINE_POOL.concurrency
TEST_ENGINE_POOL.engines[e.first] += 1
return e.first
for e in TEST_ENGINE_POOL.engines
if e.second < TEST_ENGINE_POOL.concurrency
TEST_ENGINE_POOL.engines[e.first] += 1
return e.first
end
end
end
# Very naive wait protocol
Expand All @@ -50,36 +56,28 @@ end
"""
Test if an engine has been created and can be returned via the API.
"""
function is_valid_engine(name::String)
function validate_engine(name::String)
try
get_engine(get_context(), name; readtimeout=30)
# The engine exists and does not immediately return an error
return true
catch
# Engine does not exist
return false
response = get_engine(get_context(), name; readtimeout=30)
if response.state == "PROVISIONED"
return true
end
# The engine exists, but is not provisioned
@warn("$engine was not provisioned. Reported state was: $(response.state)")
catch e
if e isa HTTPError
@warn("$engine was not provisioned. Reported error was: $e")
else
rethrow()
end
end
return false
end

function replace_engine(name::String)
@lock TEST_SERVER_LOCK begin
delete!(TEST_ENGINE_POOL.engines, name)
end
# If the engine could not be deleted, notify and continue
try
delete_engine(get_context(), name; readtimeout=30)
catch
@warn("Could not delete engine: ", name)
end

name = TEST_ENGINE_POOL.name_generator(TEST_ENGINE_POOL.next_id)

# Provision the engine if it does not already exist.
TEST_ENGINE_POOL.creater(new_name)

@lock TEST_SERVER_LOCK begin
TEST_ENGINE_POOL.engines[name] = 0
end
delete_test_engine!(name)
new_name = TEST_ENGINE_POOL.name_generator(_get_new_id())
add_test_engine!(new_name)
end

function list_test_engines()
Expand All @@ -98,14 +96,39 @@ already.
"""
function add_test_engine!(name::String)
# Provision the engine if it does not already exist.
TEST_ENGINE_POOL.creater(new_name)

try
TEST_ENGINE_POOL.creater(name)
catch
# Provisioning failed. Attempt to delete the engine
delete_test_engine!(name)
@warn("Could not provision engine $name")
return
end
@lock TEST_SERVER_LOCK begin
engines = TEST_ENGINE_POOL.engines
engines[name] = 0
end

return nothing
return
end

"""
    delete_test_engine!(name::String)

Drop `name` from the pool of test engines, then request deletion of the engine itself.
The deletion request is issued whether or not `name` was present in the pool. If the
request throws, a warning is logged and the error is not propagated to the caller.
"""
function delete_test_engine!(name::String)
    # Take the engine out of the pool first so it can no longer be handed out
    lock(TEST_SERVER_LOCK) do
        delete!(TEST_ENGINE_POOL.engines, name)
    end

    # Best-effort deletion: a failure is reported but never rethrown
    return try
        delete_engine(get_context(), name; readtimeout=30)
    catch e
        @warn("Could not delete engine $name: ", e)
    end
end

function get_next_engine_name(id::Int64)
Expand Down Expand Up @@ -140,99 +163,66 @@ function resize_test_engine_pool!(size::Int64, name_generator::Option{Function}=
TEST_ENGINE_POOL.name_generator = name_generator
end

@lock TEST_SERVER_LOCK begin
# Add engines if size > length
_create_and_add_engines!(size)
_validate_engine_pool!()
_trim_engine_pool!(size)
end
# Add engines if size > length
_create_and_add_engines!(size)
_validate_engine_pool!()
# Remove engines if size < length
_trim_engine_pool!(size)
end

# Test all engines and remove if they are unavailable or not successfully provisioned
function _validate_engine_pool!()
@lock TEST_SERVER_LOCK begin
@sync for engine in TEST_ENGINE_POOL.engines
try
response = get_engine(get_context(), engine.first; readtimeout=30)
if response.state == "PROVISIONED"
# Success! Move on and try the next engine
continue
end
# The engine exists, but is not provisioned despite our best attempts
@warn("$engine was not provisioned. Reported state was: $(response.state)")
catch e
if e isa HTTPError
@warn("$engine was not provisioned. Reported error was: $e")
else
rethrow()
end
end
# Something went wrong. Remove from the list and attempt to delete
delete!(TEST_ENGINE_POOL.engines, engine)

# Note that only the deletion is asynchronous, not the list modification
@async try
delete_engine(get_context(), engine.first; readtimeout=30)
@info("Removed failed engine $engine")
catch e
@info("Attempted to remove failed engine $engine", e)
end
for engine in keys(TEST_ENGINE_POOL.engines)
validate_engine(engine) && continue

# The engine was not provisioned or does not exist. Remove it from the pool
@info("Removing failed engine $engine")
delete_test_engine!(engine)
end
end
end

function _create_and_add_engines!(size::Int64)
new_engine_count = 0
@lock TEST_SERVER_LOCK begin
engines = TEST_ENGINE_POOL.engines
increase = size - length(engines)
increase < 0 && return

new_names = String[]
# Add engines while length < size
while (length(engines) < size)
new_name = TEST_ENGINE_POOL.name_generator(TEST_ENGINE_POOL.next_id)
TEST_ENGINE_POOL.next_id += 1

# Check the engine name generator isn't repeating names.
if haskey(engines, new_name)
throw(ArgumentError("Engine name already exists"))
end
push!(new_names, new_name)
engines[new_name] = 0
new_engine_count = size - length(TEST_ENGINE_POOL.engines)
if new_engine_count < 0
return
end
end

# Provision separately so we can do it asynchronously
@info("Provisioning $(increase) new engines")
@sync for new_name in new_names
@async try
TEST_ENGINE_POOL.creater(new_name)
catch
# Ignore any errors here as we check more thoroughly below
end
new_names = String[]
# Generate new names
for _ in 1:new_engine_count
push!(new_names, TEST_ENGINE_POOL.name_generator(_get_new_id()))
end

@debug("Provisioning $new_engine_count engines")
@sync for new_name in new_names
@async try
add_test_engine!(new_name)
catch e
@warn("Could not provision engine $new_name:", e)
end
end
end

# Remove engines if size < length(engine_pool)
function _trim_engine_pool!(size::Int64)
@assert size >= 0
engines_to_delete = String[]

@lock TEST_SERVER_LOCK begin
# Remove engines if size < length
# Move the first length - size engines to the list of engines to delete
engines_to_delete = String[]
while length(TEST_ENGINE_POOL.engines) > size
engine_name, _ = pop!(TEST_ENGINE_POOL.engines)
push!(engines_to_delete, engine_name)
end
# Asynchronously delete the engines
@sync for engine in engines_to_delete
@info("Deleting engine", engine)
@async try
delete_engine(get_context(), engine; readtimeout=30)
catch e
@info(e)
end
end
end

@sync for engine in engines_to_delete
@async delete_test_engine!(engine)
end
end

Expand Down