Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC/WIP] Asynchronous ParallelEvaluator #46

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/BlackBoxOptim.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module BlackBoxOptim

using Distributions, StatsBase, Random, LinearAlgebra, Printf, Distributed, Compat
using Distributions, StatsBase, Random, LinearAlgebra, Printf, Distributed, SharedArrays, Compat
using Printf: @printf, @sprintf
using Compat: String, view

Expand Down
107 changes: 101 additions & 6 deletions src/borg_moea.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ mutable struct BorgMOEA{FS<:FitnessScheme,V<:Evaluator,P<:Population,M<:GeneticO

n_restarts::Int
n_steps::Int
n_recombined::Int
n_processed::Int
last_restart_check::Int
last_restart::Int
last_wrecombinate_update::Int
Expand Down Expand Up @@ -49,7 +51,7 @@ mutable struct BorgMOEA{FS<:FitnessScheme,V<:Evaluator,P<:Population,M<:GeneticO
fit_scheme = EpsBoxDominanceFitnessScheme(fit_scheme, params[:ϵ])
archive = EpsBoxArchive(fit_scheme, params)
evaluator = make_evaluator(problem, archive, params)
new{typeof(fit_scheme),typeof(evaluator),P,M,E}(evaluator, pop, Vector{Int}(), 0, 0, 0, 0, 0,
new{typeof(fit_scheme),typeof(evaluator),P,M,E}(evaluator, pop, Vector{Int}(), 0, 0, 0, 0, 0, 0, 0,
params[:τ], params[:γ], params[:γ_δ], params[:PopulationSize],
Categorical(ones(length(recombinate))/length(recombinate)),
params[:θ], params[:ζ], params[:OperatorsUpdatePeriod], params[:RestartCheckPeriod],
Expand Down Expand Up @@ -114,6 +116,12 @@ function step!(alg::BorgMOEA)
if alg.n_steps >= alg.last_wrecombinate_update + alg.wrecombinate_update_period
update_recombination_weights!(alg)
end
recombine_individuals!(alg)
return alg
end

function recombine_individuals!(alg::BorgMOEA)
prepare_recombination(alg)
# Select the operators to apply based on their probabilities
recomb_op_ix = rand(alg.recombinate_distr)
recombinate!(alg, recomb_op_ix, alg.recombinate[recomb_op_ix])
Expand All @@ -138,16 +146,45 @@ function recombinate!(alg::BorgMOEA, recomb_op_ix::Int, recomb_op::CrossoverOper
apply!(recomb_op, Individual[child.params for child in children],
zeros(Int, length(children)), alg.population, parent_indices)
for child in children
child.extra = recomb_op
child.tag = recomb_op_ix
process_candidate!(alg, child, parent_indices[1])
preprocess_recombined!(alg, child, recomb_op_ix, parent_indices[1])
alg.n_recombined += 1
postprocess_recombined!(alg, child)
end
end

function process_candidate!(alg::BorgMOEA, candi::Candidate, ref_index::Int)
prepare_recombination(alg::BorgMOEA) = nothing # do nothing

function preprocess_recombined!(alg::BorgMOEA, candi::Candidate, recomb_op_ix::Int, ref_index::Int)
apply!(alg.embed, candi.params, alg.population, ref_index)
reset_fitness!(candi, alg.population)
ifitness = fitness(update_fitness!(alg.evaluator, candi)) # implicitly updates the archive
candi.extra = alg.recombinate[recomb_op_ix]
candi.tag = recomb_op_ix
candi
end

function postprocess_recombined!(alg::BorgMOEA, candi::Candidate)
update_fitness!(alg.evaluator, candi) # implicitly updates the archive
process_candidate!(alg, candi)
end

# ParallelEvaluator version -- process previously submitted candidates with the completed fitness
function prepare_recombination(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator})
process_completed!(alg.evaluator) do fit_job_id, candi
process_candidate!(alg, candi)
true
end
end

# ParallelEvaluator version, just submit to fitness calculation, nothing else
# if the queue is full, waits until some jobs are processed -- that established the
# balance between recombining and fitness evaluation
function postprocess_recombined!(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}, candi::Candidate)
async_update_fitness(alg.evaluator, candi, wait=true)
return candi
end

function process_candidate!(alg::BorgMOEA, candi::Candidate)
ifitness = fitness(candi)
# test the population
hat_comp = HatCompare(fitness_scheme(archive(alg)))
popsz = popsize(alg.population)
Expand Down Expand Up @@ -190,6 +227,7 @@ function process_candidate!(alg::BorgMOEA, candi::Candidate, ref_index::Int)
else
release_candi(alg.population, candi)
end
alg.n_processed += 1
alg
end

Expand Down Expand Up @@ -221,6 +259,38 @@ function update_population_fitness!(alg::BorgMOEA)
end
end

function update_population_fitness!(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator})
fs = fitness_scheme(alg.evaluator.archive)
popsz = popsize(alg.population)
last_checked = n_processed = 0
job_ids = BitSet()
while n_processed < popsz
# process the calculated fitnesses
process_completed!(alg.evaluator) do fit_job_id, candi
if pop!(job_ids, fit_job_id, 0) > 0
alg.population.fitness[candi.index] = candi.fitness
release_candi(alg.population, candi)
n_processed += 1
return true
else
return false # some unrelated candidate, skip
end
end
if last_checked < popsz
# submit to fitness evaluation
ix = (last_checked += 1)
if isnafitness(fitness(alg.population, ix), fs)
candi = acquire_candi(alg.population, ix)
push!(job_ids, async_update_fitness(alg.evaluator, candi, wait=true))
else # fitness already calculated
n_processed += 1
end
else
yield() # allow the other tasks to process the incoming fitnesses
end
end
end

"""
Update recombination operator probabilities based on the archive tag counts.
"""
Expand Down Expand Up @@ -254,6 +324,31 @@ function populate_by_mutants(alg::BorgMOEA, last_nonmutant::Int)
end
end

function populate_by_mutants(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}, last_nonmutant::Int)
popsz = popsize(alg.population)
last_submitted = last_accepted = last_nonmutant
mutant_job_ids = BitSet()
while last_accepted < popsz
# process the calculated fitnesses
process_completed!(alg.evaluator) do fit_job_id, candi
if pop!(mutant_job_ids, fit_job_id, 0) > 0 # it's our mutant!
candi.index = (last_accepted += 1)
accept_candi!(alg.population, candi)
return true
else
return false # some unrelated candidate, skip
end
end
if last_submitted < popsz
# generate the new mutant and submit to fitness evaluation
mutant = acquire_mutant(alg, last_submitted+=1, last_nonmutant)
push!(mutant_job_ids, async_update_fitness(alg.evaluator, mutant, wait=true))
else
yield() # allow the other tasks to process the incoming fitnesses
end
end
end

"""
Restart Borg MOEA.

Expand Down
29 changes: 29 additions & 0 deletions src/ntuple_fitness.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,35 @@ aggregate(f::NTuple{N,F}, fs::TupleFitnessScheme{N,F}) where {N,F} = fs.aggregat
@inline is_better(f1::NTuple{N,F}, f2::NTuple{N,F}, fs::TupleFitnessScheme{N,F,NTuple{N,F}}) where {N,F} = hat_compare(f1, f2, fs, -1) == -1
@inline is_worse(f1::NTuple{N,F}, f2::NTuple{N,F}, fs::TupleFitnessScheme{N,F,NTuple{N,F}}) where {N,F} = hat_compare(f1, f2, fs, 1) == 1

"""
getfitness(::Type{FitnessType}, src::AbstractMatrix, col_ix)

Get fitness stored in a column of a matrix.
Used by `ParallelEvaluator` to get the fitness from the `SharedArray`.
"""
getfitness(::Type{F}, src::AbstractVector{F}) where F<:Number = src[1]

@generated function getfitness(::Type{NTuple{N,F}}, src::AbstractVector{F}) where {N, F<:Number}
quote
return Base.Cartesian.@ntuple $N i -> src[i]
end
end

"""
setfitness!(src::AbstractMatrix, fitness, col_ix)

Put fitness into a column of a matrix.
Used by `ParallelEvaluator` to store the fitness in the `SharedArray`.
"""
setfitness!(dest::AbstractVector{F}, fitness::F) where F<:Number =
dest[1] = fitness

@generated function setfitness!(dest::AbstractVector{F}, fitness::NTuple{N,F}) where {N, F<:Number}
quote
Base.Cartesian.@nexprs $N i -> dest[i] = fitness[i]
end
end

"""
Pareto dominance for `N`-tuple (`N`≧1) fitnesses.

Expand Down
Loading