From ac645d479773021087b4b40761319bd6df80926d Mon Sep 17 00:00:00 2001
From: Massimiliano Incudini
Date: Tue, 23 Jan 2024 18:06:14 +0100
Subject: [PATCH 1/2] Added parallel tempering

---
 quepistasis/header/python_wrapper.h    |  28 +++
 quepistasis/header/snps_optimization.h |  10 ++
 quepistasis/parallel_tempering.py      | 239 +++++++++++++++++++++++++
 quepistasis/quantum_annealer.py        |   2 +-
 quepistasis/src/python_wrapper.cpp     |  57 ++++++
 quepistasis/src/snps_optimization.cpp  |  19 ++
 6 files changed, 354 insertions(+), 1 deletion(-)
 create mode 100644 quepistasis/parallel_tempering.py

diff --git a/quepistasis/header/python_wrapper.h b/quepistasis/header/python_wrapper.h
index 5fcf53113..75cf0cc6d 100644
--- a/quepistasis/header/python_wrapper.h
+++ b/quepistasis/header/python_wrapper.h
@@ -25,11 +25,21 @@ class PythonWrapper {
      */
     PyObject *pModuleQAOA = nullptr;
 
+    /**
+     * Object representing the module used to interact with the Parallel Tempering procedure
+     */
+    PyObject *pModuleParallelTempering = nullptr;
+
     /**
      * Object representing the function to call "run_quantum_annealer" function in Python
      */
     PyObject *pFuncQuantumAnnealer = nullptr;
 
+    /**
+     * Object representing the function to call "run_parallel_tempering" function in Python
+     */
+    PyObject *pFuncParallelTempering = nullptr;
+
     /**
      * Object representing the function to call "run_quantum_annealer" function in Python
      */
@@ -145,6 +155,24 @@ class PythonWrapper {
         double rev_annealing_ramp_time, double rev_annealing_pause_time,
         double rev_annealing_s_target, const char* save_path);
 
+
+    /**
+     * @brief Run the parallel tempering method in Python
+     * @param h: Ising formulation h coefficients (which in Python results in the object np.ndarray[float] of shape (N,))
+     * @param j: Ising formulation J coefficients (which - together with start and end params - in Python results in the object dict[tuple(int,int) -> float])
+     * @param start: vector Ising formulation J starting index
+     * @param end: vector Ising formulation J ending index
+     * @param num_chains Number of parallel walkers
+     * @param num_steps Number of operations per walker
+     * @param save_path: Path where to save the file including the first part of the file name
+     * @return solution to the Ising problem or the empty list in case of errors.
+     */
+    std::vector<int> run_parallel_tempering(
+        std::vector<double> h, std::vector<double> J, std::vector<int> start, std::vector<int> end,
+        int num_chains, int num_steps, const char* save_path);
+
+
+
     /**
      * @brief Run the optimizers available of Azure QIO
      * @param h: Ising formulation h coefficients (which in Python results in the object np.nparray[float] of shape (N,))
diff --git a/quepistasis/header/snps_optimization.h b/quepistasis/header/snps_optimization.h
index bcfe4df6c..ebf85dd77 100644
--- a/quepistasis/header/snps_optimization.h
+++ b/quepistasis/header/snps_optimization.h
@@ -177,6 +177,16 @@ class snps_qubo_matrix : public matrix {
         char* initial_state = nullptr,
         const char* save_path = nullptr);
 
+    /**
+     * @brief Solve the current QUBO matrix with parallel tempering on the CPU (calls Python, no quantum hardware involved). In case of errors the return value is the empty vector.
+     * @param num_chains Number of parallel walkers
+     * @param num_steps Number of operations per walker
+     * @param save_path Path where to save the file including the first part of the file name
+     * @return A vector of 'n_cliques' elements, each element represents a suggested SNPs set OR the empty vector in case of errors.
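+     * @note The chains follow a geometric ladder of inverse temperatures between 1 and 1e-2,
+     *       defined in parallel_tempering.py as betas = np.geomspace(1, 1e-2, num_chains).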
+     */
+    std::vector<std::vector<int>> solve_cpu_parallel_tempering(int num_chains, int num_steps, const char* save_path);
+
+
     /**
      * @brief Solve the current QUBO matrix with quantum annealing (call Python then D-Wave devices). In case of errors the return value is the empty vector.
      * @param token: D-Wave platform token
diff --git a/quepistasis/parallel_tempering.py b/quepistasis/parallel_tempering.py
new file mode 100644
index 000000000..a82cbdeed
--- /dev/null
+++ b/quepistasis/parallel_tempering.py
@@ -0,0 +1,239 @@
+import numpy as np
+from abc import ABC, abstractmethod
+from utils import Trace
+
+
+class ParallelTempering(ABC):
+
+    def __init__(self, num_chains, num_steps, betas):
+        """
+        Initialize the ParallelTempering instance.
+
+        Parameters:
+        - num_chains (int): Number of parallel chains.
+        - num_steps (int): Number of MCMC steps for each chain.
+        - betas (array-like): Inverse temperatures for each chain.
+        """
+        self.num_chains = num_chains
+        self.num_steps = num_steps
+        self.betas = betas
+        self.chains = None
+        self.log_likes = None
+
+    @abstractmethod
+    def initial_probability(self):
+        """
+        Abstract method to provide the initial probability configuration for each chain.
+        """
+        pass
+
+    @abstractmethod
+    def log_like(self, x):
+        """
+        Abstract method to provide the log likelihood function for a given configuration.
+
+        Parameters:
+        - x (array-like): Configuration for which to calculate the log likelihood.
+
+        Returns:
+        - float: Log likelihood.
+        """
+        pass
+
+    @abstractmethod
+    def log_prior(self, x):
+        """
+        Abstract method to provide the log prior function for a given configuration.
+
+        Parameters:
+        - x (array-like): Configuration for which to calculate the log prior.
+
+        Returns:
+        - float: Log prior.
+        """
+        pass
+
+    def metropolis_hastings_proposal(self, x, beta):
+        """
+        Metropolis-Hastings proposal for binary variables.
+
+        Parameters:
+        - x (array-like): Current configuration.
+        - beta (float): Inverse temperature for the current chain.
+
+        Returns:
+        - array-like: Proposed configuration.
+        """
+        proposal = np.copy(x)
+        flip_index = np.random.randint(len(x))
+        proposal[flip_index] *= -1
+        return proposal
+
+    def acceptance_probability(self, old_prob, new_prob, beta):
+        """
+        Calculate the acceptance probability for the Metropolis-Hastings step.
+
+        Parameters:
+        - old_prob (float): Log probability of the current configuration.
+        - new_prob (float): Log probability of the proposed configuration.
+        - beta (float): Inverse temperature for the current chain.
+
+        Returns:
+        - float: Acceptance probability.
+        """
+        return min(1, np.exp(beta * (old_prob - new_prob)))
+
+    def run(self):
+        """
+        Run the Parallel Tempering MCMC algorithm.
+
+        Returns:
+        - tuple: Chains and log likelihoods.
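+
+        Note:
+        - chains has shape (num_chains, num_steps, ndim) and log_likes has shape
+          (num_chains, num_steps); both are also stored on the instance as
+          self.chains and self.log_likes.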
+ """ + ndim = len(self.initial_probability()) + self.chains = np.zeros((self.num_chains, self.num_steps, ndim)) + self.log_likes = np.zeros((self.num_chains, self.num_steps)) + + for i in range(self.num_chains): + self.chains[i, 0] = self.initial_probability() + self.log_likes[i, 0] = self.log_like(self.chains[i, 0]) + self.log_prior(self.chains[i, 0]) + + for step in range(1, self.num_steps): + for i in range(self.num_chains): + current_chain = self.chains[i, step - 1] + current_log_prob = self.log_like(current_chain) + self.log_prior(current_chain) + + # Perform Metropolis-Hastings proposal + proposed_chain = self.metropolis_hastings_proposal(current_chain, self.betas[i]) + + # Calculate log likelihood and log prior for proposed chain + proposed_log_prob = self.log_like(proposed_chain) + self.log_prior(proposed_chain) + + # Accept or reject the proposed chain + if np.log(np.random.uniform()) < self.acceptance_probability(current_log_prob, proposed_log_prob, self.betas[i]): + self.chains[i, step] = proposed_chain + self.log_likes[i, step] = proposed_log_prob + else: + self.chains[i, step] = current_chain + self.log_likes[i, step] = current_log_prob + + # Exchange states between adjacent chains + for i in range(self.num_chains - 1): + diff_prob = self.betas[i + 1] * (self.log_likes[i, step] - self.log_likes[i + 1, step]) + if np.log(np.random.uniform()) < diff_prob: + # Swap states between chains i and i+1 + self.chains[i, step], self.chains[i + 1, step] = self.chains[i + 1, step], self.chains[i, step] + self.log_likes[i, step], self.log_likes[i + 1, step] = self.log_likes[i + 1, step], self.log_likes[i, step] + + return self.chains, self.log_likes + + + +class IsingParallelTempering(ParallelTempering): + + def __init__(self, h, J, num_chains, num_steps, betas): + """ + Initialize the IsingParallelTempering instance. + + Parameters: + - h (numpy array): External magnetic field. + - J (dictionary): Coupling strengths between spins (interaction matrix). + - num_chains (int): Number of parallel chains. + - num_steps (int): Number of MCMC steps for each chain. + - betas (array-like): Temperatures for each chain. + """ + super().__init__(num_chains, num_steps, betas) + self.h = h + self.J = J + self.ising_upper = np.sum(np.abs(self.h)) + np.sum(np.abs(np.array(list(J.values())))) + self.ndim = len(h) + + def initial_probability(self): + """ + Generate the initial spin configuration. + + Returns: + - array-like: Initial spin configuration (+1 or -1). + """ + return np.random.choice([-1, 1], size=self.ndim) + + @staticmethod + def ising_energy(x, h, J): + energy = np.sum(h * x) + for (i, j), value in J.items(): + energy += value * x[i] * x[j] + return energy + + def log_like(self, x): + """ + Calculate the log likelihood for a given spin configuration. + + Parameters: + - x (array-like): Spin configuration for which to calculate the log likelihood. + + Returns: + - float: Log likelihood + """ + energy = self.ising_energy(x, self.h, self.J) + return -1 * (self.ising_upper - energy) + + def log_prior(self, x): + """ + Calculate the log prior for a given spin configuration. + + Parameters: + - x (array-like): Spin configuration for which to calculate the log prior. + + Returns: + - float: Log prior (always 0.0 in this example). + """ + return 0.0 + + def get_solution(self): + """ + Return the solution of the optimization problem. + + Returns: + - np.ndarray: spin configuration of the best solution. + - float: Log likelihood of the best solution. + - float: Energy of the best solution. 
+ """ + + best_chain_index, best_step_index = np.unravel_index(np.argmin(self.log_likes), self.log_likes.shape) + best_configuration = self.chains[best_chain_index, best_step_index] + best_log_likelihood = self.log_likes[best_chain_index, best_step_index] + best_energy = self.ising_energy(best_configuration, self.h, self.J) + return best_configuration, best_log_likelihood, best_energy + + +def run_parallel_tempering(h, J, num_chains, num_steps, save_path): + + # trace the input except for the qubo formulation itself which is too large + trace = Trace(print=True, path=save_path) + trace.add('input', None, 'num_chains', num_chains) + trace.add('input', None, 'num_steps', num_steps) + trace.add('input', None, 'save_path', save_path) + betas = np.geomspace(1, 1e-2, num_chains) + ising_pt = IsingParallelTempering(h, J, num_chains, num_steps, betas) + ising_pt.run() + spins, loglike, energy = ising_pt.get_solution() + trace.add('solution', None, 'spins', spins) + trace.add('solution', None, 'loglike', loglike) + trace.add('solution', None, 'energy', energy) + return spins.tolist() + +def test_parallel_tempering(): + """ + Run a basic Ising model to check if the code works correctly. + """ + h = np.array([5, 10, -20]) + J = {(0, 1): 1, (1, 2): 2} + + num_chains = 4 + num_steps = 1000 + betas = np.geomspace(1, 1e-2, num_chains) + + ising_pt = IsingParallelTempering(h, J, num_chains, num_steps, betas) + ising_pt.run() + spins, loglike, energy = ising_pt.get_solution() + print(f"{spins=} {loglike=} {energy=}") diff --git a/quepistasis/quantum_annealer.py b/quepistasis/quantum_annealer.py index 4a87197e5..6e77ea237 100644 --- a/quepistasis/quantum_annealer.py +++ b/quepistasis/quantum_annealer.py @@ -265,7 +265,7 @@ def run_quantum_annealer(token, h, J, num_reads, solver_idx, client.close() trace.save() - if best_fw_energy > best_fw_energy: + if best_fw_energy > best_rev_energy: return best_fw_sample if type(best_fw_sample) == 'list' else best_fw_sample.tolist() else: return best_rev_sample if type(best_rev_sample) == 'list' else best_rev_sample.tolist() diff --git a/quepistasis/src/python_wrapper.cpp b/quepistasis/src/python_wrapper.cpp index b0836eeed..d939067a4 100644 --- a/quepistasis/src/python_wrapper.cpp +++ b/quepistasis/src/python_wrapper.cpp @@ -34,6 +34,10 @@ int PythonWrapper::init() { this->pModuleQAOA = get_module("azure_device"); if(this->pModuleQAOA == NULL) { return -1; } + std::cout << "Load parallel tempering module" << std::endl << std::flush; + this->pModuleParallelTempering = get_module("parallel_tempering"); + if(this->pModuleParallelTempering == NULL) { return -1; } + std::cout << "Load QA function" << std::endl << std::flush; this->pFuncQuantumAnnealer = get_function("run_quantum_annealer", pModuleQuantumAnnealer); if(this->pFuncQuantumAnnealer == NULL) { return -1; } @@ -46,6 +50,10 @@ int PythonWrapper::init() { this->pFuncQAOA = get_function("run_qaoa", pModuleQAOA); if(this->pFuncQAOA == NULL) { return -1; } + std::cout << "Load parallel tempering function" << std::endl << std::flush; + this->pFuncParallelTempering = get_function("run_parallel_tempering", pModuleParallelTempering); + if(this->pFuncParallelTempering == NULL) { return -1; } + std::cout << "Loaded" << std::endl << std::flush; return 0; @@ -55,9 +63,11 @@ void PythonWrapper::close() { Py_DECREF(this->pFuncQAOA); Py_DECREF(this->pFuncAzureOptimizer); Py_DECREF(this->pFuncQuantumAnnealer); + Py_DECREF(this->pFuncParallelTempering); Py_DECREF(this->pModuleQAOA); Py_DECREF(this->pModuleAzureOptimizer); 
     Py_DECREF(this->pModuleQuantumAnnealer);
+    Py_DECREF(this->pModuleParallelTempering);
     std::cout << "Finalizing..." << std::endl << std::flush;
     if(Py_FinalizeEx() != 0) { // Python >= 3.6
         PyErr_Print();
@@ -305,6 +315,53 @@ std::vector<int> PythonWrapper::run_azure_optimizers(
     return result;
 }
 
+std::vector<int> PythonWrapper::run_parallel_tempering(
+    std::vector<double> h, std::vector<double> J, std::vector<int> start, std::vector<int> end,
+    int num_chains, int num_steps, const char* save_path) {
+
+    std::vector<PyObject*> garbage;
+
+    // create parameters
+    PyObject* pH = this->create_h(h, garbage);
+    PyObject* pJ = this->create_j(J, start, end, garbage);
+    PyObject* pNumChains = PyLong_FromLong(num_chains);
+    PyObject* pNumSteps = PyLong_FromLong(num_steps);
+    PyObject* pSavePath = PyUnicode_FromString(save_path);
+
+    PyObject *pArgs = PyTuple_New(5);
+    PyTuple_SetItem(pArgs, 0, pH);
+    PyTuple_SetItem(pArgs, 1, pJ);
+    PyTuple_SetItem(pArgs, 2, pNumChains);
+    PyTuple_SetItem(pArgs, 3, pNumSteps);
+    PyTuple_SetItem(pArgs, 4, pSavePath);
+
+    // call Python
+    PyObject *pResult = this->call_function(this->pFuncParallelTempering, pArgs);
+
+    // convert Python result in C format
+    std::vector<int> result = this->unpack_result(pResult);
+
+    // clean parameters and results object
+    // for(PyObject* pObj : garbage) {
+    //     Py_DECREF(pObj);
+    // }
+    // Py_DECREF(pToken);
+    // Py_DECREF(pH);
+    // Py_DECREF(pJ);
+    // Py_DECREF(pNumReads);
+    // Py_DECREF(pSolverIdx);
+    // Py_DECREF(pFwAnnealingRampTime);
+    // Py_DECREF(pFwAnnealingPauseTime);
+    // Py_DECREF(pRevAnnealingRampTime);
+    // Py_DECREF(pRevAnnealingPauseTime);
+    // Py_DECREF(pRevAnnealingSTarget);
+    // Py_DECREF(pSavePath);
+    // Py_DECREF(pArgs);
+    // Py_DECREF(pResult);
+
+    return result;
+}
+
 std::vector<int> PythonWrapper::run_quantum_annealer(
     const char* token, std::vector<double> h, std::vector<double> J, std::vector<int> start, std::vector<int> end,
     int num_reads, int solver_idx, double fw_annealing_ramp_time, double fw_annealing_pause_time,
diff --git a/quepistasis/src/snps_optimization.cpp b/quepistasis/src/snps_optimization.cpp
index 0721a95c9..6baa6750f 100644
--- a/quepistasis/src/snps_optimization.cpp
+++ b/quepistasis/src/snps_optimization.cpp
@@ -396,6 +396,25 @@ std::vector<std::vector<int>> snps_qubo_matrix::get_snp_set_list_from_solution_v
     return snp_set_list;
 }
 
+std::vector<std::vector<int>> snps_qubo_matrix::solve_cpu_parallel_tempering(
+    int num_chains, int num_steps, const char* save_path) {
+
+    // Convert QUBO into Ising formulation
+    std::tuple<std::vector<double>, std::vector<int>, std::vector<int>, std::vector<double>, double> ising = get_ising();
+    std::vector<double> h = std::get<0>(ising);
+    std::vector<int> coupler_start = std::get<1>(ising);
+    std::vector<int> coupler_end = std::get<2>(ising);
+    std::vector<double> J = std::get<3>(ising);
+    double offset = std::get<4>(ising);
+
+    // Call python
+    std::vector<int> best_solution = PythonWrapper::get_instance().run_parallel_tempering(
+        h, J, coupler_start, coupler_end, num_chains, num_steps, save_path);
+
+    std::vector<std::vector<int>> snp_set_list = get_snp_set_list_from_solution_vector(best_solution, n_cliques, n_snps);
+
+    return snp_set_list;
+}
 
 std::vector<std::vector<int>> snps_qubo_matrix::solve_dwave_quantum_annealing(
     const char* token,

From 1fdcf8f3583e62eef36c29239ffd4cd5d8cd58ab Mon Sep 17 00:00:00 2001
From: Massimiliano Incudini
Date: Tue, 30 Jan 2024 11:43:24 +0100
Subject: [PATCH 2/2] Make parallel tempering parallel again!
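
Chain initialization and the per-step Metropolis-Hastings updates are now
dispatched with joblib's threading backend (one job per chain), while the
swap step between adjacent chains stays sequential. run_parallel_tempering
now calls run_parallel(), converts the spins to plain ints, and sparsifies
the returned configuration to at most 10 up spins before returning it.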
---
 quepistasis/parallel_tempering.py | 111 ++++++++++++++++++++++--------
 1 file changed, 84 insertions(+), 27 deletions(-)

diff --git a/quepistasis/parallel_tempering.py b/quepistasis/parallel_tempering.py
index a82cbdeed..5efa86935 100644
--- a/quepistasis/parallel_tempering.py
+++ b/quepistasis/parallel_tempering.py
@@ -1,6 +1,8 @@
 import numpy as np
 from abc import ABC, abstractmethod
 from utils import Trace
+from joblib import Parallel, delayed
+import random
 
 
 class ParallelTempering(ABC):
@@ -83,6 +85,28 @@ def acceptance_probability(self, old_prob, new_prob, beta):
         """
         return min(1, np.exp(beta * (old_prob - new_prob)))
 
+    def initialize_chain(self, i):
+        self.chains[i, 0] = self.initial_probability()
+        self.log_likes[i, 0] = self.log_like(self.chains[i, 0]) + self.log_prior(self.chains[i, 0])
+
+    def step_chain(self, step, i):
+        current_chain = self.chains[i, step - 1]
+        current_log_prob = self.log_like(current_chain) + self.log_prior(current_chain)
+
+        # Perform Metropolis-Hastings proposal
+        proposed_chain = self.metropolis_hastings_proposal(current_chain, self.betas[i])
+
+        # Calculate log likelihood and log prior for proposed chain
+        proposed_log_prob = self.log_like(proposed_chain) + self.log_prior(proposed_chain)
+
+        # Accept or reject the proposed chain
+        if np.log(np.random.uniform()) < self.acceptance_probability(current_log_prob, proposed_log_prob, self.betas[i]):
+            self.chains[i, step] = proposed_chain
+            self.log_likes[i, step] = proposed_log_prob
+        else:
+            self.chains[i, step] = current_chain
+            self.log_likes[i, step] = current_log_prob
+
     def run(self):
         """
         Run the Parallel Tempering MCMC algorithm.
@@ -95,27 +119,44 @@ def run(self):
         self.log_likes = np.zeros((self.num_chains, self.num_steps))
 
         for i in range(self.num_chains):
-            self.chains[i, 0] = self.initial_probability()
-            self.log_likes[i, 0] = self.log_like(self.chains[i, 0]) + self.log_prior(self.chains[i, 0])
+            self.initialize_chain(i)
 
         for step in range(1, self.num_steps):
+
             for i in range(self.num_chains):
-                current_chain = self.chains[i, step - 1]
-                current_log_prob = self.log_like(current_chain) + self.log_prior(current_chain)
+                self.step_chain(step, i)
+
+            # Exchange states between adjacent chains
+            for i in range(self.num_chains - 1):
+                diff_prob = self.betas[i + 1] * (self.log_likes[i, step] - self.log_likes[i + 1, step])
+                if np.log(np.random.uniform()) < diff_prob:
+                    # Swap states between chains i and i+1
+                    self.chains[i, step], self.chains[i + 1, step] = self.chains[i + 1, step], self.chains[i, step]
+                    self.log_likes[i, step], self.log_likes[i + 1, step] = self.log_likes[i + 1, step], self.log_likes[i, step]
+
+        return self.chains, self.log_likes
 
-                # Perform Metropolis-Hastings proposal
-                proposed_chain = self.metropolis_hastings_proposal(current_chain, self.betas[i])
+    def run_parallel(self):
+        """
+        Run the Parallel Tempering MCMC algorithm in parallel.
+
+        Returns:
+        - tuple: Chains and log likelihoods.
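+
+        Note:
+        - The joblib "threading" backend is used so that all workers share
+          self.chains and self.log_likes; each worker writes only the entries
+          of its own chain index.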
+ """ + + ndim = len(self.initial_probability()) + self.chains = np.zeros((self.num_chains, self.num_steps, ndim)) + self.log_likes = np.zeros((self.num_chains, self.num_steps)) + + # Initialize chains and log likelihoods for each chain + Parallel(n_jobs=self.num_chains, backend="threading")(delayed(self.initialize_chain)(i) for i in range(self.num_chains)) + print(self.chains) - # Calculate log likelihood and log prior for proposed chain - proposed_log_prob = self.log_like(proposed_chain) + self.log_prior(proposed_chain) + for step in range(1, self.num_steps): - # Accept or reject the proposed chain - if np.log(np.random.uniform()) < self.acceptance_probability(current_log_prob, proposed_log_prob, self.betas[i]): - self.chains[i, step] = proposed_chain - self.log_likes[i, step] = proposed_log_prob - else: - self.chains[i, step] = current_chain - self.log_likes[i, step] = current_log_prob + # Run Metropolis-Hastings proposals and updates in parallel + Parallel(n_jobs=self.num_chains, backend="threading")(delayed(self.step_chain)(step, i) for i in range(self.num_chains)) + print(self.chains) # Exchange states between adjacent chains for i in range(self.num_chains - 1): @@ -127,8 +168,6 @@ def run(self): return self.chains, self.log_likes - - class IsingParallelTempering(ParallelTempering): def __init__(self, h, J, num_chains, num_steps, betas): @@ -206,6 +245,16 @@ def get_solution(self): return best_configuration, best_log_likelihood, best_energy +def sparsify_solution(spins, MAX_UP_SPIN=10): + # delete snps if they are too many + indexes_to_one = [i for i, value in enumerate(spins) if value == 1] + if len(indexes_to_one) > MAX_UP_SPIN: + random.shuffle(indexes_to_one) + for idx in indexes_to_one[MAX_UP_SPIN:]: + spins[idx] = -1 + return spins + + def run_parallel_tempering(h, J, num_chains, num_steps, save_path): # trace the input except for the qubo formulation itself which is too large @@ -215,25 +264,33 @@ def run_parallel_tempering(h, J, num_chains, num_steps, save_path): trace.add('input', None, 'save_path', save_path) betas = np.geomspace(1, 1e-2, num_chains) ising_pt = IsingParallelTempering(h, J, num_chains, num_steps, betas) - ising_pt.run() + ising_pt.run_parallel() spins, loglike, energy = ising_pt.get_solution() - trace.add('solution', None, 'spins', spins) - trace.add('solution', None, 'loglike', loglike) - trace.add('solution', None, 'energy', energy) - return spins.tolist() - -def test_parallel_tempering(): + spins = [int(i) for i in spins.tolist()] + trace.add('solution', 'original', 'spins', spins) + trace.add('solution', 'original', 'loglike', loglike) + trace.add('solution', 'original', 'energy', energy) + spins = sparsify_solution(spins, 10) + trace.add('solution', 'sparsified', 'spins', spins) + trace.add('solution', 'sparsified', 'loglike', None) + trace.add('solution', 'sparsified', 'energy', IsingParallelTempering.ising_energy(spins, h, J)) + return spins + +def test_parallel_tempering(parallel=False): """ Run a basic Ising model to check if the code works correctly. """ h = np.array([5, 10, -20]) J = {(0, 1): 1, (1, 2): 2} - num_chains = 4 - num_steps = 1000 + num_chains = 2 + num_steps = 3 betas = np.geomspace(1, 1e-2, num_chains) ising_pt = IsingParallelTempering(h, J, num_chains, num_steps, betas) - ising_pt.run() + if parallel: + ising_pt.run_parallel() + else: + ising_pt.run() spins, loglike, energy = ising_pt.get_solution() print(f"{spins=} {loglike=} {energy=}")