-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor test pool to be less optional #75
Changes from 1 commit
49ce841
a00fcdc
271233f
6de754b
9bb99d0
6b5e906
5ea9fe5
07157f4
7dcc887
c185c80
03ad455
60743c1
5bbf5b1
a54adca
f41bbbc
1fb0427
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,7 @@ Base.@kwdef mutable struct TestEnginePool | |
engines::Dict{String, Int64} = Dict() | ||
# This is used to enable unique, simple, naming of engines | ||
# Switching to randomly generated UUIDs would be needed if tests are run independently | ||
next_id::Int64 = 0 | ||
next_id::Threads.Atomic{Int} = Threads.Atomic{Int}(0) | ||
# Number of tests per engine. Values > 1 invalidate test timing, and require careful | ||
# attention to engine sizing | ||
concurrency::Int64 = 1 | ||
|
@@ -27,19 +27,25 @@ Base.@kwdef mutable struct TestEnginePool | |
creater::Function = create_default_engine | ||
end | ||
|
||
# Hand out the next engine id. `Threads.atomic_add!` adds one to the shared counter and
# returns the value it held before the addition.
_get_new_id() = Threads.atomic_add!(TEST_ENGINE_POOL.next_id, 1)
|
||
function get_free_test_engine_name()::String | ||
delay = 1 | ||
# One lock guards name acquisition, forming a queue | ||
# The second lock guards modification of the engine pool | ||
@lock TEST_SERVER_ACQUISITION_LOCK while true | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
With `next_id` being an atomic, is the … [the rest of this review comment is truncated in this capture]
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Disallowing modification of the list while tests are running would also work, of course. |
||
if (length(TEST_ENGINE_POOL.engines) < 1) | ||
error("No servers available!") | ||
end | ||
@lock TEST_SERVER_LOCK begin | ||
if (length(TEST_ENGINE_POOL.engines) < 1) | ||
error("No servers available!") | ||
end | ||
|
||
@lock TEST_SERVER_LOCK for e in TEST_ENGINE_POOL.engines | ||
if e.second < TEST_ENGINE_POOL.concurrency | ||
TEST_ENGINE_POOL.engines[e.first] += 1 | ||
return e.first | ||
for e in TEST_ENGINE_POOL.engines | ||
if e.second < TEST_ENGINE_POOL.concurrency | ||
TEST_ENGINE_POOL.engines[e.first] += 1 | ||
return e.first | ||
end | ||
end | ||
end | ||
# Very naive wait protocol | ||
|
@@ -50,36 +56,28 @@ end | |
""" | ||
Test if an engine has been created and can be returned via the API. | ||
""" | ||
function validate_engine(name::String)
    try
        response = get_engine(get_context(), name; readtimeout=30)
        # Only an engine reporting the PROVISIONED state counts as valid
        response.state == "PROVISIONED" && return true
        # The engine exists, but is not provisioned
        @warn("$name was not provisioned. Reported state was: $(response.state)")
    catch e
        # An HTTPError is logged and reported as "not valid"; any other exception is
        # unexpected here and is passed on to the caller unchanged.
        e isa HTTPError || rethrow()
        @warn("$name was not provisioned. Reported error was: $e")
    end
    return false
end
|
||
function replace_engine(name::String)
    # Drop the old engine from the pool and request its deletion
    delete_test_engine!(name)
    # Provision a successor under a freshly generated name and register it in the pool
    return add_test_engine!(TEST_ENGINE_POOL.name_generator(_get_new_id()))
end
|
||
function list_test_engines() | ||
|
@@ -98,14 +96,39 @@ already. | |
""" | ||
function add_test_engine!(name::String)
    # Provision the engine if it does not already exist.
    try
        TEST_ENGINE_POOL.creater(name)
    catch e
        # Provisioning failed. Attempt to delete the engine, and attach the cause to the
        # warning instead of discarding it.
        delete_test_engine!(name)
        @warn("Could not provision engine $name", exception = e)
        return
    end
    # Register the engine only after the provisioning call returned without throwing.
    # The stored value starts at 0; get_free_test_engine_name increments it when it hands
    # the engine out.
    @lock TEST_SERVER_LOCK begin
        TEST_ENGINE_POOL.engines[name] = 0
    end
    return
end
|
||
""" | ||
delete_test_engine!(name::String) | ||
|
||
Delete an engine and remove it from the pool of test engines. The engine will be deleted | ||
whether or not it is in the pool. | ||
""" | ||
function delete_test_engine!(name::String)
    # Forget the engine locally first; deleting a name that is not in the pool is a no-op
    @lock TEST_SERVER_LOCK delete!(TEST_ENGINE_POOL.engines, name)

    # Then request deletion of the engine itself. A failure is logged, not propagated.
    try
        delete_engine(get_context(), name; readtimeout=30)
    catch e
        @warn("Could not delete engine $name: ", e)
    end
end
|
||
function get_next_engine_name(id::Int64) | ||
|
@@ -140,99 +163,66 @@ function resize_test_engine_pool!(size::Int64, name_generator::Option{Function}= | |
TEST_ENGINE_POOL.name_generator = name_generator | ||
end | ||
|
||
@lock TEST_SERVER_LOCK begin | ||
# Add engines if size > length | ||
_create_and_add_engines!(size) | ||
_validate_engine_pool!() | ||
_trim_engine_pool!(size) | ||
end | ||
# Add engines if size > length | ||
_create_and_add_engines!(size) | ||
_validate_engine_pool!() | ||
# Remove engines if size < length | ||
_trim_engine_pool!(size) | ||
end | ||
|
||
# Test all engines and remove if they are unavailable or not successfully provisioned | ||
function _validate_engine_pool!()
    # NOTE(review): delete_test_engine! acquires TEST_SERVER_LOCK again while it is held
    # here, so this relies on the lock being reentrant — confirm.
    @lock TEST_SERVER_LOCK begin
        # Iterate over a snapshot of the names: delete_test_engine! removes entries from
        # the pool, and the Dict must not be modified while it is being iterated.
        for engine in collect(keys(TEST_ENGINE_POOL.engines))
            validate_engine(engine) && continue

            # The engine was not provisioned or does not exist. Remove it from the pool
            @info("Removing failed engine $engine")
            delete_test_engine!(engine)
        end
    end
end
|
||
function _create_and_add_engines!(size::Int64)
    # Read the current pool size under the lock to work out how many engines are missing
    new_engine_count = @lock TEST_SERVER_LOCK (size - length(TEST_ENGINE_POOL.engines))
    new_engine_count < 0 && return

    # Generate new names, drawing one fresh id per engine
    new_names = String[
        TEST_ENGINE_POOL.name_generator(_get_new_id()) for _ in 1:new_engine_count
    ]

    @debug("Provisioning $new_engine_count engines")
    # Provision all engines concurrently and wait until every task has finished
    @sync for new_name in new_names
        @async try
            add_test_engine!(new_name)
        catch e
            @warn("Could not provision engine $new_name:", e)
        end
    end
end
|
||
# Remove engines if size < length(engine_pool) | ||
function _trim_engine_pool!(size::Int64)
    @assert size >= 0
    engines_to_delete = String[]

    @lock TEST_SERVER_LOCK begin
        # Take surplus entries out of the pool while the lock is held; an empty range
        # means the pool is already at or below the requested size
        surplus = length(TEST_ENGINE_POOL.engines) - size
        for _ in 1:surplus
            push!(engines_to_delete, first(pop!(TEST_ENGINE_POOL.engines)))
        end
    end

    # Delete the removed engines concurrently, outside the lock, and wait for all of them
    @sync for engine in engines_to_delete
        @async delete_test_engine!(engine)
    end
end
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should `concurrency` also be an atomic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, `concurrency` is the general limit, not the number of tests actually running.