Eval parameter bounds #354

Closed: wants to merge 16 commits
81 changes: 78 additions & 3 deletions pyabc/external/base.py
@@ -1,13 +1,14 @@
import numpy as np
import pandas as pd
import tempfile
import subprocess # noqa: S404
import os
from typing import List
import logging

import copy
from ..model import Model
from ..parameters import Parameter

from .utils import timethis

logger = logging.getLogger("External")

@@ -121,7 +122,7 @@ def run(self, args: List[str] = None, cmd: str = None, loc: str = None):
# call
if cmd is not None:
status = subprocess.run(
cmd, shell=True, **stdout, **stderr) # noqa: S602
cmd, shell=True, **stdout, **stderr) # noqa: S602
else:
executable = self.create_executable(loc)
status = subprocess.run( # noqa: S603
@@ -196,6 +197,80 @@ def __call__(self, pars: Parameter):
def sample(self, pars):
return self(pars)

@timethis
def sample_timing(self, pars):
return self(pars)

def eval_param_limits(self, limits):
"""
evaluate single parameter's boundary value on computation time.

Parameters
----------
limits: dict
the lower and upper boundary values of parameters. The key would
be the parameter name and the value would be a list of the lower
and upper limit of parameter value, e.g., [lower, upper].

Returns
-------
time_eval_dict: dict
a dictionary that contains the parameter names as key and a list
as a value. The list contains the computation time when using
lower and upper limits, e.g., [lower, upper].
"""
time_eval_dict = dict()
for key, val in limits.items():
lower_bound = self.sample_timing({key: val[0]})
upper_bound = self.sample_timing({key: val[1]})
time_eval_dict[key] = [lower_bound, upper_bound]
return time_eval_dict
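A hedged usage sketch of the single-parameter timing evaluation (the ``model``
object, parameter names and bounds below are made up for illustration; any
model exposing ``eval_param_limits`` as defined above would do):

```python
# Hypothetical example: probe how expensive the simulator is at the
# extremes of each parameter's range, one parameter at a time.
limits = {"rate": [0.1, 10.0], "delay": [0.0, 5.0]}
times = model.eval_param_limits(limits)
# times might look like {"rate": [0.8, 42.3], "delay": [1.1, 1.2]},
# i.e. the seconds spent simulating at the lower and upper bound.
```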

def eval_param_limits_matrix(self, limits):
"""
evaluate two paramters' boundary values on computation time.

Parameters
----------
limits: dict
the lower and upper boundary values of parameters. The key would
be the parameter name and the value would be a list of the lower
and upper limit of parameter value, e.g., [lower, upper].

Returns
-------
time_eval_mat_df_lower: df
a dataframe for the computation time measured when using the lower
limit value of parameters.
time_eval_mat_df_upper: df
a dataframe for the computation time measured when using the upper
limit value of parameters.
"""
time_eval_mat = np.zeros(shape=(len(limits), len(limits)))
time_eval_mat_df_lower = pd.DataFrame(time_eval_mat,
columns=[list(limits.keys())],
index=[list(limits.keys())])
time_eval_mat_df_upper = copy.deepcopy(time_eval_mat_df_lower)
for i, (key_col, val_col) in enumerate(limits.items(), 0):
for j, (key_row, val_row) in enumerate(limits.items(), 0):
                if i < j:
                    # only fill the lower triangle and diagonal; the
                    # symmetric upper triangle is left at zero
                    time_eval_mat_df_lower.loc[[key_col], [key_row]] = 0
                    time_eval_mat_df_upper.loc[[key_col], [key_row]] = 0
                    continue

if key_col == key_row:
lower_bound = self.sample_timing({key_col: val_col[0]})
upper_bound = self.sample_timing({key_col: val_col[1]})

else:
lower_bound = self.sample_timing(
{key_col: val_col[0], key_row: val_row[0]})
                    upper_bound = self.sample_timing(
                        {key_col: val_col[1], key_row: val_row[1]})
time_eval_mat_df_lower.loc[[key_col], [key_row]] = lower_bound
time_eval_mat_df_upper.loc[[key_col], [key_row]] = upper_bound

return time_eval_mat_df_lower, time_eval_mat_df_upper
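For the pairwise variant, a hedged sketch of how the two returned frames might
be consumed (``model`` and ``limits`` are the same assumptions as above):

```python
# Hypothetical example: time the simulator with pairs of parameters set
# jointly to their lower or upper bounds.
lower_df, upper_df = model.eval_param_limits_matrix(limits)
# lower_df holds run times with parameter pairs at their lower bounds,
# upper_df the corresponding run times at the upper bounds; diagonal
# entries correspond to single-parameter evaluations.
print(lower_df)
print(upper_df)
```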


class ExternalSumStat:
"""
12 changes: 12 additions & 0 deletions pyabc/external/utils.py
@@ -0,0 +1,12 @@
import time
from functools import wraps

def timethis(func):
    """Decorator returning the wall-clock run time of ``func`` in seconds."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)  # the wrapped function's result is discarded
        end = time.time()
        return end - start
    return wrapper
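A small, self-contained sketch of how the decorator behaves (the example
function is made up; only ``timethis`` comes from the module above):

```python
import time

from pyabc.external.utils import timethis


@timethis
def slow_square(x):
    # stand-in for an expensive simulation
    time.sleep(0.1)
    return x * x


elapsed = slow_square(3)
# elapsed is roughly 0.1 (seconds); the original return value 9 is
# discarded by the decorator.
```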

60 changes: 40 additions & 20 deletions pyabc/inference/smc.py
@@ -21,9 +21,10 @@
from pyabc.transition import (
Transition, MultivariateNormalTransition, ModelPerturbationKernel)
from pyabc.weighted_statistics import effective_sample_size
from .util import (
from pyabc.util import (
create_simulate_from_prior_function, create_prior_pdf,
create_transition_pdf, create_simulate_function)
create_transition_pdf, create_simulate_function,
termination_criteria_fulfilled)


logger = logging.getLogger("ABC")
@@ -211,6 +212,7 @@ def __init__(
self.minimum_epsilon = None
self.max_nr_populations = None
self.min_acceptance_rate = None
self.max_t = None

self._sanity_check()

@@ -480,8 +482,8 @@ def _sample_from_prior(self, t: int) -> Population:

# call sampler
sample = self.sampler.sample_until_n_accepted(
self.population_size(-1), simulate_one,
max_eval=np.inf, all_accepted=True)
n=self.population_size(-1), simulate_one=simulate_one, t=t,
max_eval=np.inf, all_accepted=True, **self._vars_dict())

# extract accepted population
population = sample.get_accepted_population()
@@ -546,14 +548,11 @@ def run(self,

Parameters
----------

minimum_epsilon: float, optional
Stop if epsilon is smaller than minimum epsilon specified here.
Defaults in general to 0.0, and to 1.0 for a Temperature epsilon.

max_nr_populations: int, optional (default = np.inf)
The maximum number of populations. Stop if this number is reached.

min_acceptance_rate: float, optional (default = 0.0)
Minimal allowed acceptance rate. Sampling stops if a population
has a lower rate.
@@ -610,10 +609,10 @@
self.eps.configure_sampler(self.sampler)

# last time point
t_max = t0 + max_nr_populations - 1
self.max_t = t0 + max_nr_populations - 1
# run loop over time points
t = t0
while t <= t_max:
while True:
# get epsilon for generation t
current_eps = self.eps(t)
logger.info(f"t: {t}, eps: {current_eps}.")
@@ -629,7 +628,8 @@
# perform the sampling
logger.debug(f"Now submitting population {t}.")
sample = self.sampler.sample_until_n_accepted(
pop_size, simulate_one, max_eval)
n=pop_size, simulate_one=simulate_one, t=t,
max_eval=max_eval, **self._vars_dict())

# check sample health
if not sample.ok:
@@ -661,16 +661,14 @@
self._prepare_next_iteration(
t+1, sample, population, acceptance_rate)

# check termination conditions
if current_eps <= minimum_epsilon:
logger.info("Stopping: minimum epsilon.")
break
elif self.stop_if_only_single_model_alive \
and self.history.nr_of_models_alive() <= 1:
logger.info("Stopping: single model alive.")
break
elif acceptance_rate < min_acceptance_rate:
logger.info("Stopping: minimum acceptance rate.")
if termination_criteria_fulfilled(
current_eps=current_eps, min_eps=minimum_epsilon,
stop_if_single_model_alive=self
.stop_if_only_single_model_alive,
nr_of_models_alive=self.history.nr_of_models_alive(),
acceptance_rate=acceptance_rate,
min_acceptance_rate=min_acceptance_rate,
t=t, max_t=self.max_t):
break
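The inline termination checks are consolidated into
``termination_criteria_fulfilled`` in ``pyabc.util``. A minimal sketch of what
such a helper might look like, reconstructed from the checks it replaces (the
actual implementation in ``pyabc.util`` may differ):

```python
import logging

logger = logging.getLogger("ABC")


def termination_criteria_fulfilled(
        current_eps: float, min_eps: float,
        stop_if_single_model_alive: bool, nr_of_models_alive: int,
        acceptance_rate: float, min_acceptance_rate: float,
        t: int, max_t: int) -> bool:
    """Check whether any stopping criterion of the ABC-SMC run is met."""
    if current_eps <= min_eps:
        logger.info("Stopping: minimum epsilon.")
        return True
    if stop_if_single_model_alive and nr_of_models_alive <= 1:
        logger.info("Stopping: single model alive.")
        return True
    if acceptance_rate < min_acceptance_rate:
        logger.info("Stopping: minimum acceptance rate.")
        return True
    if t >= max_t:
        logger.info("Stopping: maximum number of generations.")
        return True
    return False
```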

# increment t
@@ -804,3 +802,25 @@ def _fit_transitions(self, t):
for m in self.history.alive_models(t - 1):
particles, w = self.history.get_distribution(m, t - 1)
self.transitions[m].fit(particles, w)

def _vars_dict(self):
"""Create a dictionary of analysis variables of interest."""
nr_samples_per_parameter = \
self.population_size.nr_samples_per_parameter
return dict(
model_perturbation_kernel=self.model_perturbation_kernel,
transitions=self.transitions,
model_prior=self.model_prior,
parameter_priors=self.parameter_priors,
nr_samples_per_parameter=nr_samples_per_parameter,
models=self.models,
summary_statistics=self.summary_statistics,
x_0=self.x_0,
distance_function=self.distance_function,
eps=self.eps,
acceptor=self.acceptor,
min_acceptance_rate=self.min_acceptance_rate,
min_eps=self.minimum_epsilon,
stop_if_single_model_alive=self.stop_if_only_single_model_alive,
max_t=self.max_t,
)
22 changes: 19 additions & 3 deletions pyabc/population.py
@@ -8,7 +8,8 @@
"""


from typing import List, Callable
from typing import Callable, List, Tuple
import numpy as np
import pandas as pd
from pyabc.parameters import Parameter
import logging
@@ -161,7 +162,7 @@ def update_distances(self,
particle.accepted_distances[i] = distance_to_ground_truth(
particle.accepted_sum_stats[i], particle.parameter)

def get_model_probabilities(self) -> dict:
def get_model_probabilities(self) -> pd.DataFrame:
"""
Get probabilities of the individual models.

@@ -173,7 +174,22 @@ def get_model_probabilities(self) -> dict:
"""

# _model_probabilities are assigned during normalization
return self._model_probabilities
        ms = list(self._model_probabilities.keys())
        ps = list(self._model_probabilities.values())
        return pd.DataFrame({'m': ms, 'p': ps}).set_index('m')

    def get_alive_models(self) -> List:
        """Get the indices of the models alive in this population."""
        return list(self._model_probabilities.keys())

    def nr_of_models_alive(self) -> int:
        """Get the number of models alive in this population."""
        return len(self.get_alive_models())

    def get_distribution(self, m: int) -> Tuple[pd.DataFrame, np.ndarray]:
        """Get parameter samples and weights of the particles of model m."""
        particles = self.to_dict()[m]
        parameters = pd.DataFrame([p.parameter for p in particles])
        weights = np.array([p.weight for p in particles])
        return parameters, weights
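A hedged sketch of how these new ``Population`` accessors might be used
(``pop`` is assumed to be an existing Population instance; the model index 0
is made up):

```python
# Model probabilities as a DataFrame indexed by model index m
probs = pop.get_model_probabilities()

# Models present in this population and their number
alive = pop.get_alive_models()
n_alive = pop.nr_of_models_alive()

# Parameter samples (DataFrame) and weights (ndarray) of model 0
parameters, weights = pop.get_distribution(m=0)
```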

def get_weighted_distances(self) -> pd.DataFrame:
"""
5 changes: 3 additions & 2 deletions pyabc/sampler/__init__.py
@@ -14,6 +14,7 @@
from .base import Sample, Sampler
from .dask_sampler import DaskDistributedSampler
from .multicore_evaluation_parallel import MulticoreEvalParallelSampler
from .redis_eps import (RedisEvalParallelSampler,
RedisEvalParallelSamplerServerStarter)
from .redis_eps import (
RedisEvalParallelSampler,
RedisEvalParallelSamplerServerStarter)
from .concurrent_future import ConcurrentFutureSampler
9 changes: 6 additions & 3 deletions pyabc/sampler/base.py
@@ -147,8 +147,9 @@ def wrap_sample(f):
Checks whether the sampling output is valid.
"""
def sample_until_n_accepted(
self, n, simulate_one, max_eval=np.inf, all_accepted=False):
sample = f(self, n, simulate_one, max_eval, all_accepted)
self, n, simulate_one, t, max_eval=np.inf, all_accepted=False,
**kwargs):
sample = f(self, n, simulate_one, t, max_eval, all_accepted, **kwargs)
if sample.n_accepted != n and sample.ok:
# this should not happen if the sampler is configured correctly
raise AssertionError(
@@ -203,8 +204,10 @@ def sample_until_n_accepted(
self,
n: int,
simulate_one: Callable,
t: int,
max_eval: int = np.inf,
all_accepted: bool = False) -> Sample:
all_accepted: bool = False,
**kwargs) -> Sample:
"""
Performs the sampling, i.e. creation of a new generation (i.e.
population) of particles.
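The abstract sampler interface now takes the generation index ``t`` and
forwards additional analysis variables as keyword arguments. A hedged
call-site sketch mirroring how ``ABCSMC.run()`` invokes it (the concrete
values and the ``analysis_vars`` name are placeholders):

```python
import numpy as np

sample = sampler.sample_until_n_accepted(
    n=100,                      # target number of accepted particles
    simulate_one=simulate_one,  # callable producing one evaluated particle
    t=3,                        # current generation index
    max_eval=np.inf,            # evaluation budget
    **analysis_vars,            # e.g. the dict from ABCSMC._vars_dict()
)
population = sample.get_accepted_population()
```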
4 changes: 2 additions & 2 deletions pyabc/sampler/eps_mixin.py
@@ -14,8 +14,8 @@ def full_submit_function_pickle(self, job_id):
return result_batch

def sample_until_n_accepted(
self, n, simulate_one, max_eval=np.inf, all_accepted=False,
show_progress=False):
self, n, simulate_one, t, max_eval=np.inf, all_accepted=False,
**kwargs):
# For default pickling
if self.default_pickle:
self.simulate_accept_one = pickle.dumps(simulate_one)
3 changes: 2 additions & 1 deletion pyabc/sampler/mapping.py
@@ -89,7 +89,8 @@ def map_function(self, simulate_one, _):
return sample, nr_simulations

def sample_until_n_accepted(
self, n, simulate_one, max_eval=np.inf, all_accepted=False):
self, n, simulate_one, t, max_eval=np.inf, all_accepted=False,
**kwargs):
# pickle them as a tuple instead of individual pickling
# this should save time and should make better use of
# shared references.
3 changes: 2 additions & 1 deletion pyabc/sampler/multicore.py
@@ -72,7 +72,8 @@ class MulticoreParticleParallelSampler(MultiCoreSampler):
"""

def sample_until_n_accepted(
self, n, simulate_one, max_eval=np.inf, all_accepted=False):
self, n, simulate_one, t, max_eval=np.inf, all_accepted=False,
**kwargs):
# starting more than n jobs
# does not help in this parallelization scheme
n_procs = min(n, self.n_procs)
3 changes: 2 additions & 1 deletion pyabc/sampler/multicore_evaluation_parallel.py
@@ -90,7 +90,8 @@ class MulticoreEvalParallelSampler(MultiCoreSampler):
"""

def sample_until_n_accepted(
self, n, simulate_one, max_eval=np.inf, all_accepted=False):
self, n, simulate_one, t, max_eval=np.inf, all_accepted=False,
**kwargs):
n_eval = Value(c_longlong)
n_eval.value = 0

1 change: 1 addition & 0 deletions pyabc/sampler/multicorebase.py
@@ -102,4 +102,5 @@ def get_if_worker_healthy(workers: List[Process], queue: Queue):
except Empty:
if not healthy(workers):
raise ProcessError("At least one worker is dead.")
continue
raise Exception("The code should never reach here")