diff --git a/qforce/additionalstructures.py b/qforce/additionalstructures.py deleted file mode 100644 index 68b0712..0000000 --- a/qforce/additionalstructures.py +++ /dev/null @@ -1,542 +0,0 @@ -import os -from shutil import copy2 as copy -from itertools import chain -import numpy as np -# -from colt import Colt -from ase.io import read -from calkeeper import CalculationIncompleteError - - -class CalculationStorage: - """Basic class to store calculations and calculation results""" - - def __init__(self, calculations=None, results=None, **metadata): - if calculations is None: - calculations = [] - self.calculations = calculations - if results is None: - results = [] - self.results = results - self._meta = metadata - - def get(self, arg, default=None): - return self._meta.get(arg, default) - - def __getitem__(self, arg): - return self._meta[arg] - - -class CostumStructureCreator(Colt): - """Creator class to generate structures for the fitting procedure - Basic idea to call the creator in this way: - - pre: optional - main: main step, needs to be implemented - - """ - - def __init__(self, weight): - self.weight = weight - # will be overwritten! - self._folder = None - - @property - def folder(self): - if self._folder is None: - raise ValueError("Please setup main folder") - return self._folder - - @folder.setter - def folder(self, value): - self._folder = value - - def setup_pre(self, qm): - pass - - def check_pre(self): - pass - - def parse_pre(self, qm): - pass - - def setup_main(self, qm): - raise NotImplementedError - - def check_main(self): - raise NotImplementedError - - def parse_main(self, qm): - raise NotImplementedError - - def enouts(self): - raise NotImplementedError - - def gradouts(self): - raise NotImplementedError - - def hessouts(self): - raise NotImplementedError - - -class AdditionalStructureCreator(CostumStructureCreator): - - name = None - __classes = {} - __predefined = ['dihedrals', 'hessian'] - _user_input = "" - - def __init_subclass__(cls, **kwargs): - super().__init_subclass__(**kwargs) - if cls.name in cls.__predefined: - raise ValueError(f"Cannot create class with name {cls.name}") - cls.__classes[cls.name] = cls - cls._user_input += ("\n# Weight of the structures in the forcefield fit" - "\n weight = 1 :: float \n") - - @classmethod - def classes(cls): - return cls.__classes - - @classmethod - def get(cls, name): - return cls.__classes.get(name) - - def setup_main(self, qm): - raise NotImplementedError - - def check_main(self): - raise NotImplementedError - - def parse_main(self, qm): - raise NotImplementedError - - def enouts(self): - raise NotImplementedError - - def gradouts(self): - raise NotImplementedError - - def hessouts(self): - raise NotImplementedError - - -class StructuresFromFile(AdditionalStructureCreator): - - name = 'fromfile' - - _user_input = """ - en_struct = :: existing_file, optional - grad_struct = :: existing_file, optional - hess_struct = :: existing_file, optional - """ - - def __init__(self, weight, en_struct, grad_struct, hess_struct): - super().__init__(weight) - self._energy = CalculationStorage(file=en_struct) - self._grad = CalculationStorage(file=grad_struct) - self._hess = CalculationStorage(file=hess_struct) - - @classmethod - def from_config(cls, config): - if not config['en_struct'] and not config['grad_struct'] and not config['hess_struct']: - return None - return cls(config['weight'], config['en_struct'], config['grad_struct'], - config['hess_struct']) - - @staticmethod - def _fileiter(filename): - molecules = read(filename, index=':') - for i, molecule in enumerate(molecules): - yield i, (molecule.get_positions(), molecule.get_atomic_numbers()) - - def setup_main(self, qm): - parent = self.folder - - if self._energy['file'] is not None: - folder = parent / 'en_structs' - os.makedirs(folder, exist_ok=True) - self._energy.calculations = qm.do_sp_calculations(folder, - self._fileiter(self._energy['file'])) - if self._grad['file'] is not None: - folder = parent / 'grad_structs' - os.makedirs(folder, exist_ok=True) - self._grad.calculations = qm.do_grad_calculations(folder, - self._fileiter(self._grad['file'])) - if self._hess['file'] is not None: - folder = parent / 'hess_structs' - os.makedirs(folder, exist_ok=True) - self._hess.calculations = qm.do_hessian_calculations(folder, - self._fileiter(self._hess['file'])) - - def check_main(self): - for calculation in chain(self._energy.calculations, self._grad.calculations, - self._hess.calculations): - try: - _ = calculation.check() - except CalculationIncompleteError: - return calculation - return None - - def parse_main(self, qm): - results = [] - for calculation in self._hess.calculations: - hessian_files = calculation.check() - results.append(qm.read_hessian(hessian_files)) - - en_results = [] - for calculation in self._energy.calculations: - files = calculation.check() - en_results.append(qm.read_energy(files)) - - grad_results = [] - for calculation in self._grad.calculations: - files = calculation.check() - grad_results.append(qm.read_gradient(files)) - - self._energy.results = en_results - self._grad.results = grad_results - self._hess.results = results - - def enouts(self): - return self._energy.results - - def gradouts(self): - return self._grad.results - - def hessouts(self): - return self._hess.results - - -class MetaDynamics(AdditionalStructureCreator): - - name = 'metadynamics' - - _user_input = """ - compute = none :: str :: [none, en, grad] - - # Number of frames used for fitting - n_fit = 100 :: int - # Number of frames used for validation - n_valid = 0 :: int - # temperature (K) - temp = 800 :: float :: >0 - # interval for trajectory printout (fs) - dump = 500.0 :: float :: >0 - # time step for propagation (fs) - step = 2.0 :: float :: >0 - # Bond constraints (0: none, 1: H-only, 2: all) - shake = 0 - - """ - - def __init__(self, weight, compute, config): - self.compute = compute - self.weight = weight - self.n_fit = config['n_fit'] - self.n_valid = config['n_valid'] - self.total_frames = self.n_fit + self.n_valid - - time = config['dump'] * self.total_frames / 1e3 - - self.xtbinput = {'time': time} - for item in ['temp', 'dump', 'step', 'shake']: - self.xtbinput[item] = config[item] - - self._energy = CalculationStorage() - self._grad = CalculationStorage() - # not used so far - self._hess = CalculationStorage() - self._md = CalculationStorage() - - @classmethod - def from_config(cls, config): - if config['compute'] == 'none': - return None - return cls(config['weight'], config['compute'], config) - - @staticmethod - def _fileiter(filename): - molecules = read(filename, index=':') - for i, molecule in enumerate(molecules): - yield i, (molecule.get_positions(), molecule.get_atomic_numbers()) - - def setup_pre(self, qm): - """setup md calculation""" - folder = self.folder / '0_md' - os.makedirs(folder, exist_ok=True) - - # setup - self._md.calculations = [qm.xtb_md(folder, self.xtbinput)] - - def check_pre(self): - """check that the md calculation was finished""" - for calc in self._md.calculations: - try: - _ = calc.check() - except CalculationIncompleteError: - return calc - return None - - def parse_pre(self, qm): - files = self._md.calculations[0].check() - traj = files['traj'] - filename = self.folder / 'xtbmd.xyz' - copy(traj, filename) - self._md.results = [{'file': filename}] - - def setup_main(self, qm): - # setupt - parent = self.folder - # currently only one result there - filename = self._md.results[0]['file'] - - traj = self._fileiter(filename) - - if self.compute == 'en': - folder = parent / '1_en_structs' - os.makedirs(folder, exist_ok=True) - self._energy.calculations = qm.do_sp_calculations(folder, traj) - elif self.compute == 'grad': - folder = parent / '1_grad_structs' - os.makedirs(folder, exist_ok=True) - self._grad.calculations = qm.do_grad_calculations(folder, traj) - else: - raise ValueError("do not know compute method!") - - def check_main(self): - for calculation in chain(self._energy.calculations, self._grad.calculations): - try: - _ = calculation.check() - except CalculationIncompleteError: - return calculation - return None - - def parse_main(self, qm): - en_results = [] - for i, calculation in enumerate(self._energy.calculations): - files = calculation.check() - if i < self.n_fit: - en_results.append(qm.read_energy(files)) - - grad_results = [] - for i, calculation in enumerate(self._grad.calculations): - files = calculation.check() - if i < self.n_fit: - grad_results.append(qm.read_gradient(files)) - - self._energy.results = en_results - self._grad.results = grad_results - - def enouts(self): - return self._energy.results - - def gradouts(self): - return self._grad.results - - def hessouts(self): - return self._hess.results - - -class HessianOutput(CostumStructureCreator): - - def __init__(self, weight, hessout): - super().__init__(weight) - if not isinstance(hessout, (tuple, list)): - hessout = [hessout] - self._hessout = hessout - - def enouts(self): - return [] - - def gradouts(self): - return [] - - def hessouts(self): - return self._hessout - - def setup_main(self, qm): - pass - - def check_main(self): - pass - - def parse_main(self, qm): - pass - - -class DihedralOutput(CostumStructureCreator): - - def __init__(self, weight, gradouts): - super().__init__(weight) - self._gradouts = gradouts - - def enouts(self): - return [] - - def gradouts(self): - return self._gradouts - - def hessouts(self): - return [] - - def setup_main(self, qm): - pass - - def check_main(self): - pass - - def parse_main(self, qm): - pass - -def minval(itr, value=None): - try: - return min(itr) - except ValueError: - return value - - -class AdditionalStructures(Colt): - - _user_input = """ - energy_element_weights = 1 :: float - gradient_element_weights = 1 :: float - hessian_element_weights = 1 :: float - - hessian_weight = 1 :: float - dihedral_weight = 1 :: float - - """ - - def __init__(self, creators, energy_ele_weight, gradient_ele_weight, hessian_ele_weight, - hessian_weight, dihedral_weight): - self.creators = creators - self.energy_weight = energy_ele_weight - self.gradient_weight = gradient_ele_weight - self.hessian_weight = hessian_ele_weight - # - self._hessian_weight = hessian_weight - self._dihedral_weight = dihedral_weight - - def add_hessians(self, hessout): - """add hessian""" - self.creators['hessian'] = HessianOutput(self._hessian_weight, hessout) - - def add_dihedrals(self, scans): - """add computed dihedrals""" - self.creators['dihedrals'] = DihedralOutput(self._dihedral_weight, scans) - - def register(self, name, creator): - """add a new creator""" - if not isinstance(creator, CostumStructureCreator): - raise ValueError(f"Can not register '{type(creator)}' " - "only register CostumStructureCreator instances") - if name == 'dihedrals': - creator.weight = self._dihedral_weight - elif name == 'hessian': - creator.weight = self._hessian_weight - self.creators[name] = creator - - @classmethod - def from_config(cls, config): - creators = {} - for name, clss in AdditionalStructureCreator.classes().items(): - _cls = clss.from_config(getattr(config, name)) - if _cls is not None: - creators[name] = _cls - return cls(creators, config.energy_element_weights, - config.gradient_element_weights, config.hessian_element_weights, - config.hessian_weight, config.dihedral_weight) - - @classmethod - def _extend_user_input(cls, questions): - for name, clss in AdditionalStructureCreator.classes().items(): - questions.generate_block(name, clss.colt_user_input) - - def create(self, qm): - parent = qm.pathways.getdir("addstruct", create=True) - - for i, (name, creator) in enumerate(self.creators.items()): - folder = parent / f'{i}_{creator.name}' - os.makedirs(folder, exist_ok=True) - creator.folder = folder - - # prestep - for name, creator in self.creators.items(): - creator.setup_pre(qm) - - for name, creator in self.creators.items(): - cal = creator.check(prestep=False) - if cal is not None: - qm.logger.exit(f"Required output file(s) not found in '{cal.folder}' .\n" - 'Creating the necessary input file and exiting...\nPlease run the ' - 'calculation and put the output files in the same directory.\n' - 'Necessary output files and the corresponding extensions ' - f"are:\n{cal.missing_as_string()}\n\n\n") - - for name, creator in self.creators.items(): - creator.parse_pre(qm) - - # mainstep - for name, creator in self.creators.items(): - creator.setup_main(qm) - - for name, creator in self.creators.items(): - cal = creator.check_main() - if cal is not None: - qm.logger.exit(f"Required output file(s) not found in '{cal.folder}' .\n" - 'Creating the necessary input file and exiting...\nPlease run the ' - 'calculation and put the output files in the same directory.\n' - 'Necessary output files and the corresponding extensions ' - f"are:\n{cal.missing_as_string()}\n\n\n") - - for name, creator in self.creators.items(): - creator.parse_main(qm) - - def _get_lowest_energy_and_coords(self): - minimum = np.inf - coords = None - - for creator in self.creators.values(): - for calctype in [creator.enouts(), creator.gradouts(), creator.hessouts()]: - if calctype: - argmin = np.argmin((out.energy for out in calctype)) - lowest = calctype[argmin].energy - if lowest < minimum: - minimum = lowest - coords = calctype[argmin].coords - return minimum, coords - - def normalize(self): - """set all energies to the minimum one""" - emin, coords = self._get_lowest_energy_and_coords() - self.subtract_energy(emin) - return emin, coords - - def subtract_energy(self, energy): - """subtract energy from all qm energies, forces and hessian are not affected""" - for creator in self.creators.values(): - for out in creator.enouts(): - out.energy -= energy - for out in creator.gradouts(): - out.energy -= energy - for out in creator.hessouts(): - out.energy -= energy - - def enitr(self): - for creator in self.creators.values(): - weight = creator.weight * self.energy_weight - for out in creator.enouts(): - yield weight, out - - def graditr(self): - for creator in self.creators.values(): - weight = creator.weight * self.gradient_weight - for out in creator.gradouts(): - yield weight, out - - def hessitr(self): - for creator in self.creators.values(): - weight = creator.weight * self.hessian_weight - for out in creator.hessouts(): - yield weight, out diff --git a/qforce/cli.py b/qforce/cli.py index 318745e..8073d16 100644 --- a/qforce/cli.py +++ b/qforce/cli.py @@ -67,6 +67,7 @@ def __init__(self, config, job): @classmethod def from_config(cls, _config): + print("config = ", _config) config, job = initialize(_config) return cls(config, job) diff --git a/qforce/initialize.py b/qforce/initialize.py index cf64411..0a2f44f 100644 --- a/qforce/initialize.py +++ b/qforce/initialize.py @@ -6,8 +6,8 @@ from colt import Colt from calkeeper import CalculationKeeper # -from .additionalstructures import AdditionalStructures -from .qm.qm import QM, implemented_qm_software, calculators +from .schemes import Computations +from .qm.qm import QM, calculators from .forcefield.forcefield import ForceField from .dihedral_scan import DihedralScan from .pathkeeper import Pathways @@ -36,19 +36,7 @@ def _extend_user_input(cls, questions): questions.generate_block("qm", QM.colt_user_input) questions.generate_block("ff", ForceField.colt_user_input) questions.generate_block("scan", DihedralScan.colt_user_input) - questions.generate_cases("software", {key: software.colt_user_input for key, software in - implemented_qm_software.items()}, block='qm') - questions.generate_cases("preopt", {key: software.colt_user_input for key, software in - implemented_qm_software.items()}, block='qm') - questions.generate_cases("scan_software", {key: software.colt_user_input - for key, software in - implemented_qm_software.items()}, - block='qm') - questions.generate_cases("charge_software", {key: software.colt_user_input - for key, software - in implemented_qm_software.items()}, - block='qm') - questions.generate_block("addstructs", AdditionalStructures.colt_user_input) + questions.generate_block("addstructs", Computations.colt_user_input) # ff terms for name, ffcls in ForceField.implemented_md_software.items(): questions.generate_block(name, ffcls.get_questions(), block='ff') diff --git a/qforce/main.py b/qforce/main.py index 39b2734..04e697a 100755 --- a/qforce/main.py +++ b/qforce/main.py @@ -1,30 +1,47 @@ +from io import StringIO +# +from ase.io import read, write from calkeeper import CalculationKeeper, CalculationIncompleteError - +# from .initialize import initialize from .qm.qm import QM -from .qm.qm_base import HessianOutput from .forcefield.forcefield import ForceField from .molecule import Molecule -from .fragment import fragment -from .no_fragment_scanning import do_nofrag_scanning -from .dihedral_scan import DihedralScan from .frequencies import calc_qm_vs_md_frequencies from .fit import multi_fit from .charge_flux import fit_charge_flux from .misc import LOGO from .logger import LoggerExit -from .additionalstructures import AdditionalStructures +# +from .schemes import Computations, HessianCreator, PreoptCreator, DihedralCreator -def runjob(config, job, ext_q=None, ext_lj=None): - qm = QM(job, config.qm) - qm.preopt() - qm_hessian_out = qm.get_hessian() - main_hessian = qm_hessian_out[0] +CRESTPREOPT = StringIO(''' +preopt = crest +software = xtb +''') - structs = AdditionalStructures.from_config(config.addstructs) - structs.create(qm) - structs.add_hessians(qm_hessian_out) + +def save_molecule(filename, molecule, comment=None): + write(filename, molecule, plain=True, comment=comment) + + +def runjob2(config, job, ext_q=None, ext_lj=None): + # setup the xtb preopt using crest + xtb = QM.from_questions(job, config=CRESTPREOPT, check_only=True) + # read initial structure, ase molecule structure + molecule = read(job.coord_file) + # run preopt + preopt = PreoptCreator(molecule) + preopt.run(xtb) + molecule = preopt.molecule() + # setup hessian calculation + save_molecule(xtb.pathways.get('init.xyz'), molecule, + comment=f'{job.name} - input geometry for hessian') + # get main hessian + hessian = HessianCreator(molecule) + hessian.run(xtb) + main_hessian = hessian.main_hessian() ffcls = ForceField.implemented_md_software.get(config.ff.output_software, None) if ffcls is None: @@ -32,9 +49,52 @@ def runjob(config, job, ext_q=None, ext_lj=None): mol = Molecule(config, job, main_hessian, ffcls, ext_q, ext_lj) - scans = do_nofrag_scanning(mol, qm, job, config) - structs.add_dihedrals(scans) + structs = Computations.from_config(config.addstructs) + structs.register('dihedrals', DihedralCreator(mol, job, config)) + # setup the actual qm calculations + qm = QM(job, config.qm) + # do all additional calculations + structs.run(qm) + # register hessian after structs were run! + structs.register('hessian', hessian) + # + mol.qm_minimum_energy, mol.qm_minimum_coords = structs.normalize() + + md_hessian = multi_fit(job.logger, config.terms, mol, structs) + + return mol, structs + +def runjob(config, job, ext_q=None, ext_lj=None): + # setup qm calculation + qm = QM(job, config.qm) + # read initial structure, ase molecule structure + molecule = read(job.coord_file) + # run preopt + preopt = PreoptCreator(molecule) + preopt.run(qm) + molecule = preopt.molecule() + # setup hessian calculation + save_molecule(qm.pathways.get('init.xyz'), molecule, + comment=f'{job.name} - input geometry for hessian') + # get main hessian + hessian = HessianCreator(molecule) + hessian.run(qm) + main_hessian = hessian.main_hessian() + + ffcls = ForceField.implemented_md_software.get(config.ff.output_software, None) + if ffcls is None: + raise ValueError(f"Forcefield '{config.ff.output_software}' unknown!") + + mol = Molecule(config, job, main_hessian, ffcls, ext_q, ext_lj) + + structs = AdditionalStructures.from_config(config.addstructs) + structs.register('dihedrals', DihedralCreator(mol, job, config)) + # do all additional calculations + structs.run(qm) + # register hessian after structs were run! + structs.register('hessian', hessian) + # mol.qm_minimum_energy, mol.qm_minimum_coords = structs.normalize() md_hessian = multi_fit(job.logger, config.terms, mol, structs) diff --git a/qforce/no_fragment_scanning.py b/qforce/no_fragment_scanning.py deleted file mode 100644 index 2ab64eb..0000000 --- a/qforce/no_fragment_scanning.py +++ /dev/null @@ -1,125 +0,0 @@ -from calkeeper import CalculationKeeper, CalculationIncompleteError -import os -import numpy as np -# -from .fragment import check_and_notify -from .logger import LoggerExit -from .forces import get_dihed -from .qm.qm_base import GradientOutput - - -def do_nofrag_scanning(mol, qm, job, config): - scans = [] - unique_dihedrals = {} - - print(mol.terms['dihedral/flexible']) - - if 'dihedral/flexible' not in mol.terms or len(mol.terms['dihedral/flexible']) == 0 or not config.scan.do_scan: - return scans - - os.makedirs(config.scan.frag_lib, exist_ok=True) - scan_dir = job.pathways.getdir('fragments', create=True) - - for term in mol.terms['dihedral/flexible']: - dih_type = mol.topo.edge(term.atomids[1], term.atomids[2])['vers'] - if dih_type not in unique_dihedrals: - unique_dihedrals[dih_type] = term.atomids - - generated = [] # Number of fragments generated but not computed - error = '' - for name, atomids in unique_dihedrals.items(): - try: - scan = Scan(job, config, mol, qm, atomids, name) - if scan.has_data: - scans.extend(convert_to_grad_out(scan)) - elif config.scan.batch_run and scan.has_inp: - generated.extend(convert_to_grad_out(scan)) - except (CalculationIncompleteError, LoggerExit): - # ignore these errors, checking is done in check_and_notify! - pass - - n_scan_steps = int(np.ceil(360/config.qm.scan_step_size)) - - check_and_notify(job, config.scan, len(unique_dihedrals), len(scans)//n_scan_steps, len(generated)//n_scan_steps) - - return scans - - -def convert_to_grad_out(scan): - grad_outs = [] - for i in range(len(scan.coords)): - grad_out = GradientOutput(scan.energies[i], scan.forces[i], scan.dipoles[i], scan.atomids, scan.coords[i]) - grad_outs.append(grad_out) - return grad_outs - - -class Scan(): - def __init__(self, job, config, mol, qm, scanned_atomids, name): - self.central_atoms = tuple(scanned_atomids[1:3]) - self.scanned_atomids = scanned_atomids - self.atomids = mol.atomids - self.opt_coords = mol.coords - self.charge = mol.charge - self.multiplicity = mol.multiplicity - self.equil_angle = np.degrees(get_dihed(self.opt_coords[self.scanned_atomids])[0]) - self.name = name - self.n_atoms = len(self.atomids) - self.has_data = False - self.has_inp = False - - self.software = qm.get_scan_software() - self.hash = self.make_hash() - self.folder = job.pathways.getdir('frag', self.hash, create=True) - self.calc = self.set_calc(config, job) - - self.energies = [] - self.qcoords = [] - self.forces = [] - self.dipoles = [] - - self.check_for_qm_data(job, config, mol, qm) - - def make_hash(self): - atomids = '_'.join((self.scanned_atomids+1).astype(dtype=str)) + '_' - hash = self.software.hash(self.charge, self.multiplicity) - return atomids + hash - - def set_calc(self, config, job): - if config.qm.dihedral_scanner == 'torsiondrive': - calc = job.Calculation(f'{self.hash}_torsiondrive.inp', - self.software.required_scan_torsiondrive_files, - folder=self.folder, software='torsiondrive') - elif config.qm.dihedral_scanner == 'relaxed_scan': - calc = job.Calculation(f'{self.hash}.inp', self.software.required_scan_files, - folder=self.folder, software=self.software.name) - else: - raise ValueError("scanner can only be 'torsiondrive' or 'relaxed_scan'") - - return calc - - def check_for_qm_data(self, job, config, mol, qm): - files = [f for f in os.listdir(self.folder) if self.hash in f and f.endswith(('log', 'out'))] - if files: - self.has_data = True - qm_out = qm.read_scan(self.folder, files) - qm_out = qm.do_scan_sp_calculations(self.folder, self.hash, qm_out, mol.atomids) - - self.energies = qm_out.energies - self.forces = qm_out.forces - self.coords = qm_out.coords - self.dipoles = qm_out.dipoles - - if qm_out.mismatch: - if config.scan.avail_only: - job.logger.info('"\navail_only" requested, attempting to continue with ' - 'the missing points...\n\n') - else: - job.logger.exit('Exiting...\n\n') - - if not self.has_data: - self.make_qm_input(qm) - - def make_qm_input(self, qm): - with open(self.calc.inputfile, 'w') as file: - qm.write_scan(file, self.hash, self.opt_coords, self.atomids, self.scanned_atomids+1, - self.equil_angle, self.charge, self.multiplicity) diff --git a/qforce/pathkeeper.py b/qforce/pathkeeper.py index 2a6cee0..dc2a92d 100644 --- a/qforce/pathkeeper.py +++ b/qforce/pathkeeper.py @@ -110,6 +110,7 @@ class Pathways: 'preopt': '0_preopt', 'hessian': '1_hessian', 'addstruct': '5_additional', + 'hessian_new': 'hessian_new', 'hessian_charge': ['hessian', 'charge'], 'hessian_energy': ['hessian', '${idx}_en_conformer'], 'hessian_gradient': ['hessian', '${idx}_grad_conformer'], @@ -173,6 +174,10 @@ def charge_filename(self, software, charge, mult): base, ending = self.basename(software, charge, mult) return f'{base}_charge.{ending}' + def scan_filename(self, software, charge, mult): + base, ending = self.basename(software, charge, mult) + return f'{base}_sp.{ending}' + def scan_sp_filename(self, software, charge, mult, i): base, ending = self.basename(software, charge, mult) return f'{base}_sp_step_{i:02d}.{ending}' diff --git a/qforce/qm/qm.py b/qforce/qm/qm.py index 6557149..9fac8a9 100644 --- a/qforce/qm/qm.py +++ b/qforce/qm/qm.py @@ -15,6 +15,7 @@ from .xtb import xTB, XTBGaussian, xTBCalculator from .qm_base import HessianOutput, ScanOutput, Calculator from .qm_base import EnergyOutput, GradientOutput +from ..forces import get_dihed class TorsiondriveCalculator(Calculator): @@ -90,10 +91,6 @@ class QM(Colt): # Use the internal relaxed scan method of the QM software or the Torsiondrive method using xTB dihedral_scanner = relaxed_scan :: str :: [relaxed_scan, torsiondrive] -[addstructures] -en_struct = :: existing_file, optional -grad_struct = :: existing_file, optional -hess_struct = :: existing_file, optional """ _method = ['scan_step_size', 'dihedral_scanner'] @@ -106,6 +103,27 @@ def __init__(self, job, config): # check hessian files and if not present write the input file self.method = self._register_method() + @classmethod + def from_config(cls, config, job): + res = SimpleNamespace(**{key: value for key, value in config.items()}) + return cls(job, res) + + + @classmethod + def _extend_user_input(cls, questions): + questions.generate_cases("software", {key: software.colt_user_input for key, software in + implemented_qm_software.items()}) + questions.generate_cases("preopt", {key: software.colt_user_input for key, software in + implemented_qm_software.items()}) + questions.generate_cases("scan_software", {key: software.colt_user_input + for key, software in + implemented_qm_software.items()}, + ) + questions.generate_cases("charge_software", {key: software.colt_user_input + for key, software + in implemented_qm_software.items()}, + ) + def get_scan_software(self): return self.softwares['scan_software'] @@ -120,6 +138,9 @@ def hessian_charge_name(self, software): def charge_name(self, software): return self.pathways.charge_filename(software, self.config.charge, self.config.multiplicity) + def scan_name(self, software): + return self.pathways.scan_filename(software, self.config.charge, + self.config.multiplicity) def scan_sp_name(self, software, i): return self.pathways.scan_sp_filename(software, self.config.charge, @@ -183,6 +204,41 @@ def do_sp_calculations(self, parent, iterator): en_calcs.append(calculation) return en_calcs + def charge_software(self): + software = self.softwares['charge_software'] + if software is None: + software = self.softwares['software'] + return software + + def setup_charge_calculation(self, folder, coords, atnums): + """Setup charge calculation""" + software = self.charge_software() + + calculation = self.Calculation(self.hessian_charge_name(software), + software.read.charge_files, + folder=folder, + software=software.name) + + if not calculation.input_exists(): + with open(calculation.inputfile, 'w') as file: + self.write_charge(file, calculation.base, output.coords, + output.atomids, charge_software) + return calculation + + def setup_hessian(self, folder, coords, atnums): + """Setup hessian calculation""" + + software = self.softwares['software'] + + calculation = self.Calculation(self.hessian_name(software), + software.read.hessian_files, + folder=folder, + software=software.name) + if not calculation.input_exists(): + with open(calculation.inputfile, 'w') as file: + self.write_hessian(file, calculation.base, coords, atnums) + return calculation + def get_hessian(self): """Setup hessian files, and if present read the hessian information""" @@ -254,6 +310,67 @@ def get_hessian(self): # return results + def setup_scan_calculation(self, folder, scan_hash, scanned_atomids, coords, atomids): + software = self.softwares['scan_software'] + + charge = self.config.charge + mult = self.config.multiplicity + + if self.config.dihedral_scanner == 'relaxed_scan': + calc = self.Calculation(f'{scan_hash}.inp', + software.required_scan_files, + folder=folder, + software=software.name) + elif self.config.dihedral_scanner == 'torsiondrive': + calc = self.Calculation(f'{scan_hash}_torsiondrive.inp', + software.required_scan_torsiondrive_files, + folder=folder, + software='torsiondrive') + else: + raise ValueError("scanner can only be 'torsiondrive' or 'relaxed_scan'") + + + if not calc.input_exists(): + equil_angle = np.degrees(get_dihed(coords[scanned_atomids])[0]) + with open(calc.inputfile, 'w') as file: + self.write_scan(file, scan_hash, coords, atomids, scanned_atomids+1, + equil_angle, charge, mult) + return calc + + def read_charges(self, charge_files): + software = self.charge_software() + point_charges = charge_software.read.charges(self.config, **charge_files) + return point_charges[software.config.charge_method] + + def read_hessian(self, hessian_files): + software = self.softwares['software'] + qm_out = software.read.hessian(self.config, **hessian_files) + if 'fchk_file' in hessian_files: + fchk_file = hessian_files['fchk_file'] + else: + fchk_file = None + return HessianOutput(self.config.vib_scaling, fchk_file, *qm_out) + + def read_scan_data(self, files): + software = self.softwares['scan_software'] + + if self.config.dihedral_scanner == 'relaxed_scan': + out = software.read.scan_new(self.config, **files) + elif self.config.dihedral_scanner == 'torsiondrive': + out = software.read.scan_torsiondrive(self.config, **files) + + qm_out = self._get_unique_scan_points([out]) + + + files = list(files.values()) + if len(files) == 0: + file = 'unknown' + else: + file = files[0] + + n_scan_steps = int(np.ceil(360/self.config.scan_step_size)) + return ScanOutput(file, n_scan_steps, *qm_out) + def read_scan(self, folder, files): software = self.softwares['scan_software'] qm_outs = [] @@ -268,6 +385,28 @@ def read_scan(self, folder, files): return ScanOutput(file, n_scan_steps, *qm_out) + def setup_scan_sp_calculations(self, parent, scan_out, atnums): + """do scan sp calculations if necessary and update the scan out""" + software = self.softwares['software'] + + calculations = [] + for i in range(scan_out.n_steps): + folder = parent / f'{i}_conformer' + os.makedirs(folder, exist_ok=True) + # + calc = self.Calculation(self.scan_name(software), + software.read.gradient_files, + folder=folder, + software=software.name) + # + if not calc.input_exists(): + with open(calc.inputfile, 'w') as file: + extra_info = f', scan angle: {scan_out.angles[i]}' + self.write_gradient(file, calc.base, scan_out.coords[i], atnums, extra_info=extra_info) + calculations.append(calc) + + return calculations + def do_scan_sp_calculations(self, parent, scan_id, scan_out, atnums): """do scan sp calculations if necessary and update the scan out""" software = self.softwares['software'] @@ -319,23 +458,6 @@ def do_scan_sp_calculations(self, parent, scan_id, scan_out, atnums): scan_out.coords = np.array(coords, dtype=np.float64) return scan_out - def xtb_md(self, folder, xtbinput): - initfile = self.pathways.getfile('init.xyz') - copy(initfile, folder / 'xtb.xyz') - with open(folder / 'md.inp', 'w') as fh: - fh.write("$md\n") - for key, value in xtbinput.items(): - fh.write(f"{key} = {value}\n") - fh.write("$end\n") - - with open(folder / 'xtb.inp', 'w') as fh: - fh.write("xtb xtb.xyz --input md.inp --md") - calc = self.Calculation('xtb.inp', - {'traj': ['xtb.trj']}, - folder=folder, - software='xtb') - return calc - def write_preopt(self, file, job_name, coords, atnums): software = self.softwares['preopt'] software.write.opt(file, job_name, self.config, coords, atnums) @@ -445,6 +567,19 @@ def _get_unique_scan_points(self, qm_outs): return n_atoms, all_coords, all_angles, all_energies, all_dipoles, point_charges + def setup_preopt(self, folder, coords, atnums): + software = self.softwares['preopt'] + # setup calculation + calculation = self.Calculation(self.preopt_name(software), + software.read.opt_files, + folder=folder, + software=software.name) + if not calculation.input_exists(): + # + with open(calculation.inputfile, 'w') as file: + self.write_preopt(file, calculation.base, coords, atnums) + return calculation + def preopt(self): molecule, coords, atnums = self._read_coord_file(self.job.coord_file) software = self.softwares['preopt'] @@ -476,7 +611,7 @@ def preopt(self): 'Creating the necessary input file and exiting...\nPlease run the ' 'calculation and put the output files in the same directory.\n') # - coords = self._read_opt(preopt_files) + coords = self.read_opt(preopt_files) molecules = [] for coord in coords: mol = deepcopy(molecule) diff --git a/qforce/qm/xtb.py b/qforce/qm/xtb.py index c1fe3ba..e682534 100644 --- a/qforce/qm/xtb.py +++ b/qforce/qm/xtb.py @@ -186,7 +186,7 @@ def _commands(self, filename, basename, ncores): class WritexTB(WriteABC): - def gradient(self, file, job_name, settings, coords, atnums): + def gradient(self, file, job_name, settings, coords, atnums, extra_info=False): """ Write the input file for sp gradient Parameters @@ -428,7 +428,11 @@ class ReadxTB(ReadABC): charge_files = {'pc_file': ['${base}.charges']} - scan_files = {'file_name': ['${base}.xtbscan.log']} + scan_files = { + 'charge_file': ['${base}.charges'], + 'scan_log': ['${base}.xtbscan.log'], + 'input_dat': ['${base}.dat'], + } def opt(self, config, coord_file): _, elements, coords = self._read_xtb_xyz(coord_file) @@ -552,6 +556,51 @@ def gradient(self, config, grad_file, xyz_file): dipol = None return energy, grad, dipol, molecule.get_atomic_numbers(), molecule.get_positions() + def scan_new(self, config, charge_file, scan_log, input_dat): + """ Read data from the scan file. + + Parameters + ---------- + config : config + A configparser object with all the parameters. + + charge_file : string + File name of the xTB charges. + + scan_log : string + File name of the xTB scan log + + input_dat : string + input of the dihedral scan + + Returns + ------- + n_atoms : int + The number of atoms in the molecule. + coords : list + A list of array of float. The list has the length of the number + of steps and the array has the shape of (n_atoms, 3). + angles : list + A list (length: steps) of the angles that is being scanned. + energies : list + A list (length: steps) of the energy. + point_charges : dict + A dictionary with key in charge_method and the value to be a + list of float of the size of n_atoms. + """ + point_charges = {} + n_atoms, charges = self._read_xtb_charge(charge_file) + point_charges["xtb"] = charges + + elements, energies, coords = self._read_xtb_scan_log(scan_log) + + angles = self._read_xtb_input_angle(input_dat) + + energies = np.array(energies) * Hartree * mol / kJ + dipoles = [[None, None, None] for _ in angles] + + return n_atoms, coords, angles, energies, dipoles, point_charges + def scan(self, config, file_name): """ Read data from the scan file. diff --git a/qforce/schemes/__init__.py b/qforce/schemes/__init__.py new file mode 100644 index 0000000..67b8590 --- /dev/null +++ b/qforce/schemes/__init__.py @@ -0,0 +1,8 @@ +from .computations import Computations +from .additionalstructures import AdditionalStructureCreator +# +from .creator import CostumStructureCreator +# +from .hessian import HessianCreator +from .preopt import PreoptCreator +from .no_fragment_scanning import DihedralCreator diff --git a/qforce/schemes/additionalstructures.py b/qforce/schemes/additionalstructures.py new file mode 100644 index 0000000..2427cdd --- /dev/null +++ b/qforce/schemes/additionalstructures.py @@ -0,0 +1,285 @@ +import os +from shutil import copy2 as copy +from itertools import chain +# +from ase.io import read +from calkeeper import CalculationIncompleteError +from .creator import CalculationStorage, CostumStructureCreator + + +class AdditionalStructureCreator(CostumStructureCreator): + + name = None + __classes = {} + __predefined = ['dihedrals', 'hessian'] + _user_input = "" + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + if cls.name in cls.__predefined: + raise ValueError(f"Cannot create class with name {cls.name}") + cls.__classes[cls.name] = cls + cls._user_input += ("\n# Weight of the structures in the forcefield fit" + "\n weight = 1 :: float \n") + + @classmethod + def classes(cls): + return cls.__classes + + @classmethod + def get(cls, name): + return cls.__classes.get(name) + + def setup_main(self, qm): + raise NotImplementedError + + def check_main(self): + raise NotImplementedError + + def parse_main(self, qm): + raise NotImplementedError + + def enouts(self): + raise NotImplementedError + + def gradouts(self): + raise NotImplementedError + + def hessouts(self): + raise NotImplementedError + + +class StructuresFromFile(AdditionalStructureCreator): + + name = 'fromfile' + + _user_input = """ + en_struct = :: existing_file, optional + grad_struct = :: existing_file, optional + hess_struct = :: existing_file, optional + """ + + def __init__(self, weight, en_struct, grad_struct, hess_struct): + super().__init__(weight) + self._energy = CalculationStorage(file=en_struct) + self._grad = CalculationStorage(file=grad_struct) + self._hess = CalculationStorage(file=hess_struct) + + @classmethod + def from_config(cls, config): + if not config['en_struct'] and not config['grad_struct'] and not config['hess_struct']: + return None + return cls(config['weight'], config['en_struct'], config['grad_struct'], + config['hess_struct']) + + @staticmethod + def _fileiter(filename): + molecules = read(filename, index=':') + for i, molecule in enumerate(molecules): + yield i, (molecule.get_positions(), molecule.get_atomic_numbers()) + + def setup_main(self, qm): + parent = self.folder + + if self._energy['file'] is not None: + folder = parent / 'en_structs' + os.makedirs(folder, exist_ok=True) + self._energy.calculations = qm.do_sp_calculations(folder, + self._fileiter(self._energy['file'])) + if self._grad['file'] is not None: + folder = parent / 'grad_structs' + os.makedirs(folder, exist_ok=True) + self._grad.calculations = qm.do_grad_calculations(folder, + self._fileiter(self._grad['file'])) + if self._hess['file'] is not None: + folder = parent / 'hess_structs' + os.makedirs(folder, exist_ok=True) + self._hess.calculations = qm.do_hessian_calculations(folder, + self._fileiter(self._hess['file'])) + + def check_main(self): + for calculation in chain(self._energy.calculations, self._grad.calculations, + self._hess.calculations): + try: + _ = calculation.check() + except CalculationIncompleteError: + return calculation + return None + + def parse_main(self, qm): + results = [] + for calculation in self._hess.calculations: + hessian_files = calculation.check() + results.append(qm.read_hessian(hessian_files)) + + en_results = [] + for calculation in self._energy.calculations: + files = calculation.check() + en_results.append(qm.read_energy(files)) + + grad_results = [] + for calculation in self._grad.calculations: + files = calculation.check() + grad_results.append(qm.read_gradient(files)) + + self._energy.results = en_results + self._grad.results = grad_results + self._hess.results = results + + def enouts(self): + return self._energy.results + + def gradouts(self): + return self._grad.results + + def hessouts(self): + return self._hess.results + + +class MetaDynamics(AdditionalStructureCreator): + + name = 'metadynamics' + + _user_input = """ + # What to compute: + # none: do not do metadynamics, default + # en: do metadynamics and do single point energy calculations + # grad: do metadynamics and do single point energy+ gradient calculations + # + compute = none :: str :: [none, en, grad] + # Number of frames used for fitting + n_fit = 100 :: int + # Number of frames used for validation + n_valid = 0 :: int + # temperature (K) + temp = 800 :: float :: >0 + # interval for trajectory printout (fs) + dump = 500.0 :: float :: >0 + # time step for propagation (fs) + step = 2.0 :: float :: >0 + # Bond constraints (0: none, 1: H-only, 2: all) + shake = 0 + """ + + def __init__(self, weight, compute, config): + self.compute = compute + self.weight = weight + self.n_fit = config['n_fit'] + self.n_valid = config['n_valid'] + self.total_frames = self.n_fit + self.n_valid + + time = config['dump'] * self.total_frames / 1e3 + + self.xtbinput = {'time': time} + for item in ['temp', 'dump', 'step', 'shake']: + self.xtbinput[item] = config[item] + + self._energy = CalculationStorage() + self._grad = CalculationStorage() + # not used so far + self._hess = CalculationStorage() + self._md = CalculationStorage() + + @classmethod + def from_config(cls, config): + if config['compute'] == 'none': + return None + return cls(config['weight'], config['compute'], config) + + @staticmethod + def _fileiter(filename): + molecules = read(filename, index=':') + for i, molecule in enumerate(molecules): + yield i, (molecule.get_positions(), molecule.get_atomic_numbers()) + + def setup_pre(self, qm): + """setup md calculation""" + folder = self.folder / '0_md' + os.makedirs(folder, exist_ok=True) + + # setup + initfile = qm.pathways.getfile('init.xyz') + copy(initfile, folder / 'xtb.xyz') + with open(folder / 'md.inp', 'w') as fh: + fh.write("$md\n") + for key, value in self.xtbinput.items(): + fh.write(f"{key} = {value}\n") + fh.write("$end\n") + + with open(folder / 'xtb.inp', 'w') as fh: + fh.write("xtb xtb.xyz --input md.inp --md") + + # + calc = self.Calculation('xtb.inp', + {'traj': ['xtb.trj']}, + folder=folder, + software='xtb') + self._md.calculations = [calc] + + def check_pre(self): + """check that the md calculation was finished""" + for calc in self._md.calculations: + try: + _ = calc.check() + except CalculationIncompleteError: + return calc + return None + + def parse_pre(self, qm): + files = self._md.calculations[0].check() + traj = files['traj'] + filename = self.folder / 'xtbmd.xyz' + copy(traj, filename) + self._md.results = [{'file': filename}] + + def setup_main(self, qm): + # setupt + parent = self.folder + # currently only one result there + filename = self._md.results[0]['file'] + + traj = self._fileiter(filename) + + if self.compute == 'en': + folder = parent / '1_en_structs' + os.makedirs(folder, exist_ok=True) + self._energy.calculations = qm.do_sp_calculations(folder, traj) + elif self.compute == 'grad': + folder = parent / '1_grad_structs' + os.makedirs(folder, exist_ok=True) + self._grad.calculations = qm.do_grad_calculations(folder, traj) + else: + raise ValueError("do not know compute method!") + + def check_main(self): + for calculation in chain(self._energy.calculations, self._grad.calculations): + try: + _ = calculation.check() + except CalculationIncompleteError: + return calculation + return None + + def parse_main(self, qm): + en_results = [] + for i, calculation in enumerate(self._energy.calculations): + files = calculation.check() + if i < self.n_fit: + en_results.append(qm.read_energy(files)) + + grad_results = [] + for i, calculation in enumerate(self._grad.calculations): + files = calculation.check() + if i < self.n_fit: + grad_results.append(qm.read_gradient(files)) + + self._energy.results = en_results + self._grad.results = grad_results + + def enouts(self): + return self._energy.results + + def gradouts(self): + return self._grad.results + + def hessouts(self): + return self._hess.results diff --git a/qforce/schemes/computations.py b/qforce/schemes/computations.py new file mode 100644 index 0000000..41e3f3c --- /dev/null +++ b/qforce/schemes/computations.py @@ -0,0 +1,205 @@ +import os +# +from colt import Colt +import numpy as np +# +from .creator import CostumStructureCreator +from .additionalstructures import AdditionalStructureCreator + + + +class Computations(Colt): + + _user_input = """ + energy_element_weights = 15000 :: float + gradient_element_weights = 100 :: float + hessian_element_weights = 0.1 :: float + + hessian_weight = 1 :: float + dihedral_weight = 1 :: float + """ + + def __init__(self, creators, energy_ele_weight, gradient_ele_weight, hessian_ele_weight, + hessian_weight, dihedral_weight): + self.creators = creators + self.energy_weight = energy_ele_weight + self.gradient_weight = gradient_ele_weight + self.hessian_weight = hessian_ele_weight + # + self._hessian_weight = hessian_weight + self._dihedral_weight = dihedral_weight + + def add_hessians(self, hessout): + """add hessian""" + self.creators['hessian'] = HessianOutput(self._hessian_weight, hessout) + + def add_dihedrals(self, scans): + """add computed dihedrals""" + self.creators['dihedrals'] = DihedralOutput(self._dihedral_weight, scans) + + def register(self, name, creator): + """add a new creator""" + if not isinstance(creator, CostumStructureCreator): + raise ValueError(f"Can not register '{type(creator)}' " + "only register CostumStructureCreator instances") + if name == 'dihedrals': + creator.weight = self._dihedral_weight + elif name == 'hessian': + creator.weight = self._hessian_weight + self.creators[name] = creator + + @classmethod + def from_config(cls, config): + creators = {} + for name, clss in AdditionalStructureCreator.classes().items(): + _cls = clss.from_config(getattr(config, name)) + if _cls is not None: + creators[name] = _cls + return cls(creators, config.energy_element_weights, + config.gradient_element_weights, config.hessian_element_weights, + config.hessian_weight, config.dihedral_weight) + + @classmethod + def _extend_user_input(cls, questions): + for name, clss in AdditionalStructureCreator.classes().items(): + questions.generate_block(name, clss.colt_user_input) + + def run(self, qm): + parent = qm.pathways.getdir("addstruct", create=True) + + for i, (name, creator) in enumerate(self.creators.items()): + folder = parent / f'{creator.name}' + os.makedirs(folder, exist_ok=True) + creator.folder = folder + + # prestep + for name, creator in self.creators.items(): + creator.setup_pre(qm) + + for name, creator in self.creators.items(): + cal = creator.check_pre() + if cal is not None: + qm.logger.exit(f"Required output file(s) not found in '{cal.folder}' .\n" + 'Creating the necessary input file and exiting...\nPlease run the ' + 'calculation and put the output files in the same directory.\n' + 'Necessary output files and the corresponding extensions ' + f"are:\n{cal.missing_as_string()}\n\n\n") + + for name, creator in self.creators.items(): + creator.parse_pre(qm) + + # mainstep + for name, creator in self.creators.items(): + creator.setup_main(qm) + + for name, creator in self.creators.items(): + cal = creator.check_main() + if cal is not None: + qm.logger.exit(f"Required output file(s) not found in '{cal.folder}' .\n" + 'Creating the necessary input file and exiting...\nPlease run the ' + 'calculation and put the output files in the same directory.\n' + 'Necessary output files and the corresponding extensions ' + f"are:\n{cal.missing_as_string()}\n\n\n") + + for name, creator in self.creators.items(): + creator.parse_main(qm) + + def _get_lowest_energy_and_coords(self): + minimum = np.inf + coords = None + + for creator in self.creators.values(): + for calctype in [creator.enouts(), creator.gradouts(), creator.hessouts()]: + if calctype: + argmin = np.argmin((out.energy for out in calctype)) + lowest = calctype[argmin].energy + if lowest < minimum: + minimum = lowest + coords = calctype[argmin].coords + return minimum, coords + + def normalize(self): + """set all energies to the minimum one""" + emin, coords = self._get_lowest_energy_and_coords() + self.subtract_energy(emin) + return emin, coords + + def subtract_energy(self, energy): + """subtract energy from all qm energies, forces and hessian are not affected""" + for creator in self.creators.values(): + for out in creator.enouts(): + out.energy -= energy + for out in creator.gradouts(): + out.energy -= energy + for out in creator.hessouts(): + out.energy -= energy + + def enitr(self): + for creator in self.creators.values(): + weight = creator.weight * self.energy_weight + for out in creator.enouts(): + yield weight, out + + def graditr(self): + for creator in self.creators.values(): + weight = creator.weight * self.gradient_weight + for out in creator.gradouts(): + yield weight, out + + def hessitr(self): + for creator in self.creators.values(): + weight = creator.weight * self.hessian_weight + for out in creator.hessouts(): + yield weight, out + + +class HessianOutput(CostumStructureCreator): + + def __init__(self, weight, hessout): + super().__init__(weight) + if not isinstance(hessout, (tuple, list)): + hessout = [hessout] + self._hessout = hessout + + def enouts(self): + return [] + + def gradouts(self): + return [] + + def hessouts(self): + return self._hessout + + def setup_main(self, qm): + pass + + def check_main(self): + pass + + def parse_main(self, qm): + pass + + +class DihedralOutput(CostumStructureCreator): + + def __init__(self, weight, gradouts): + super().__init__(weight) + self._gradouts = gradouts + + def enouts(self): + return [] + + def gradouts(self): + return self._gradouts + + def hessouts(self): + return [] + + def setup_main(self, qm): + pass + + def check_main(self): + pass + + def parse_main(self, qm): + pass diff --git a/qforce/schemes/creator.py b/qforce/schemes/creator.py new file mode 100644 index 0000000..b441323 --- /dev/null +++ b/qforce/schemes/creator.py @@ -0,0 +1,101 @@ +from colt import Colt + + +class CalculationStorage: + """Basic class to store calculations and calculation results""" + + def __init__(self, calculations=None, results=None, as_dict=False, **metadata): + if as_dict is True: + container = dict + else: + container = list + # + if calculations is None: + calculations = container() + self.calculations = calculations + # + if results is None: + results = container() + self.results = results + # + self._meta = metadata + + def get(self, arg, default=None): + return self._meta.get(arg, default) + + def __getitem__(self, arg): + return self._meta[arg] + + +class CostumStructureCreator(Colt): + """Creator class to generate structures for the fitting procedure + Basic idea to call the creator in this way: + + pre: optional + main: main step, needs to be implemented + + """ + + def __init__(self, weight): + self.weight = weight + # will be overwritten! + self._folder = None + + @property + def folder(self): + if self._folder is None: + raise ValueError("Please setup main folder") + return self._folder + + @folder.setter + def folder(self, value): + self._folder = value + + def run(self, qm): + self.setup_pre(qm) + self.check_pre() + self.parse_pre(qm) + # + self.setup_main(qm) + self.check_main() + self.parse_main(qm) + # + self.setup_post(qm) + self.check_post() + self.parse_post(qm) + + def setup_pre(self, qm): + pass + + def check_pre(self): + pass + + def parse_pre(self, qm): + pass + + def setup_post(self, qm): + pass + + def check_post(self): + pass + + def parse_post(self, qm): + pass + + def setup_main(self, qm): + raise NotImplementedError + + def check_main(self): + raise NotImplementedError + + def parse_main(self, qm): + raise NotImplementedError + + def enouts(self): + raise NotImplementedError + + def gradouts(self): + raise NotImplementedError + + def hessouts(self): + raise NotImplementedError diff --git a/qforce/schemes/hessian.py b/qforce/schemes/hessian.py new file mode 100644 index 0000000..50cdc09 --- /dev/null +++ b/qforce/schemes/hessian.py @@ -0,0 +1,77 @@ +from ase.io import read +from calkeeper import CalculationIncompleteError +# +from .creator import CostumStructureCreator, CalculationStorage + + +class HessianCreator(CostumStructureCreator): + + name = 'hessian' + + def __init__(self, molecule): + self.init_coords = molecule.get_positions() + self.atnums = molecule.get_atomic_numbers() + # + self._hessian = CalculationStorage() + self._charges = CalculationStorage() + + def enouts(self): + return [] + + def gradouts(self): + return [] + + def hessouts(self): + return self._hessian.results + + def main_hessian(self): + if len(self._hessian.results) == 0: + raise ValueError("At least one hessian needs to be provided") + return self._hessian.results[0] + + def setup_pre(self, qm): + # + folder = qm.pathways.getdir("hessian_new", create=True) + self._hessian.calculations.append(qm.setup_hessian(folder, self.init_coords, self.atnums)) + + def check_pre(self): + for calc in self._hessian.calculations: + try: + _ = calc.check() + except CalculationIncompleteError: + return calc + return None + + def parse_pre(self, qm): + results = [] + for calculation in self._hessian.calculations: + hessian_files = calculation.check() + results.append(qm.read_hessian(hessian_files)) + self._hessian.results = results + + def setup_main(self, qm): + # + if qm.softwares['charge_software'] is None: + return + folder = self.pathways.getdir("hessian_charge", create=True) + qm_out = self._hessian.results[0] + + self._charges.calculations.append(qm.setup_charge_calculation(folder, qm_out.coords, qm_out.atomids)) + + def check_main(self): + for calc in self._charges.calculations: + try: + _ = calc.check() + except CalculationIncompleteError: + return calc + return None + + def parse_main(self, qm): + # adjust hessian out + results = [] + for i, calculation in enumerate(self._charges.calculations): + files = calculation.check() + point_charges = qm.read_charges(files) + output = self._hessian.results[i] + output.point_charges = output.check_type_and_shape( + point_charges, 'point_charges', float, (output.n_atoms,)) diff --git a/qforce/schemes/no_fragment_scanning.py b/qforce/schemes/no_fragment_scanning.py new file mode 100644 index 0000000..bafa689 --- /dev/null +++ b/qforce/schemes/no_fragment_scanning.py @@ -0,0 +1,109 @@ +from calkeeper import CalculationIncompleteError +# +from .creator import CostumStructureCreator, CalculationStorage + + +class DihedralCreator(CostumStructureCreator): + + name = 'no_frag_dihedrals' + + def __init__(self, mol, job, config): + # set weight to 0, it has to be set later one + super().__init__(0) + self.atomids = mol.atomids + self.coords = mol.coords + # + self._unique_dihedrals = get_unique_dihedrals(mol, config.scan.do_scan) + self._scans = CalculationStorage(as_dict=True) + self._dihedrals = {name: CalculationStorage() for name in self._unique_dihedrals} + # + self.charge = config.qm.charge + self.multiplicity = config.qm.multiplicity + + def _scan(self, qm, software, scanned_atomids): + scan_hash = '_'.join(tuple(('_'.join((scanned_atomids+1).astype(dtype=str)), + software.hash(self.charge, self.multiplicity)))) + + folder = qm.pathways.getdir('frag', scan_hash, create=True) + + calc = qm.setup_scan_calculation(folder, scan_hash, scanned_atomids, + self.coords, self.atomids) + calc.scan_hash = scan_hash + return calc + + def enouts(self): + return [] + + def gradouts(self): + results = [] + for dihedral in self._dihedrals.values(): + for res in dihedral.results: + results.append(res) + return results + + def hessouts(self): + return [] + + def setup_pre(self, qm): + """setup scans""" + software = qm.get_scan_software() + scans = {} + for name, scanned_atomids in self._unique_dihedrals.items(): + scans[name] = self._scan(qm, software, scanned_atomids) + self._scans.calculations = scans + + def check_pre(self): + for calc in self._scans.calculations.values(): + try: + _ = calc.check() + except CalculationIncompleteError: + return calc + return None + + def parse_pre(self, qm): + results = {} + for name, calculation in self._scans.calculations.items(): + files = calculation.check() + results[name] = qm.read_scan_data(files) + self._scans.results = results + + def setup_main(self, qm): + # + for name, calc in self._scans.calculations.items(): + qm_out = self._scans.results[name] + calcs = qm.setup_scan_sp_calculations(calc.folder, qm_out, self.atomids) + self._dihedrals[name].calculations = calcs + + def check_main(self): + for scan in self._dihedrals.values(): + for calc in scan.calculations: + try: + _ = calc.check() + except CalculationIncompleteError: + return calc + return None + + def parse_main(self, qm): + for dihedral in self._dihedrals.values(): + results = [] + for calculation in dihedral.calculations: + files = calculation.check() + results.append(qm.read_gradient(files)) + dihedral.results = results + + +def get_unique_dihedrals(mol, do_scan): + scans = [] + unique_dihedrals = {} + + print(mol.terms['dihedral/flexible']) + + if 'dihedral/flexible' not in mol.terms or len(mol.terms['dihedral/flexible']) == 0 or not do_scan: + return {} + + for term in mol.terms['dihedral/flexible']: + dih_type = mol.topo.edge(term.atomids[1], term.atomids[2])['vers'] + if dih_type not in unique_dihedrals: + unique_dihedrals[dih_type] = term.atomids + + return unique_dihedrals diff --git a/qforce/schemes/preopt.py b/qforce/schemes/preopt.py new file mode 100644 index 0000000..c09287c --- /dev/null +++ b/qforce/schemes/preopt.py @@ -0,0 +1,80 @@ +from calkeeper import CalculationIncompleteError +# +from .creator import CostumStructureCreator, CalculationStorage + + +class OptimizationOutput: + + def __init__(self, mol, coords): + self._molecule = mol + self.coords = coords[0] + self._molecule.set_positions(self.coords) + self._all = coords + + @property + def molecule(self): + return self._molecule + + def all_molecules(self): + molecules = [] + for coords in self._all_coords: + mol = deepcopy(self._molecule) + mol.set_positions(coords) + molecules.append(mol) + return molecules + + +class PreoptCreator(CostumStructureCreator): + + name = 'preopt' + + def __init__(self, molecule): + self._init_molecule = molecule + self.atnums = molecule.get_atomic_numbers() + self._init_coords = molecule.get_positions() + self._preopt = CalculationStorage() + + def coords(self): + """get current coordinates""" + if len(self._preopt.results) == 0: + return self._init_coords + return self._preopt.results[0].coords + + def molecule(self, get_all=False): + if len(self._preopt.results) == 0: + return self._init_molecule + # + if get_all is False: + return self._preopt.results[0].molecule + return self._preopt.results[0].all_molecules() + + def enouts(self): + return [] + + def gradouts(self): + return [] + + def hessouts(self): + return [] + + def setup_main(self, qm): + if qm.softwares['preopt'] is None: + return + folder = qm.pathways.getdir("preopt", create=True) + calc = qm.setup_preopt(folder, self._init_coords, self.atnums) + self._preopt.calculations.append(calc) + + def check_main(self): + for calc in self._preopt.calculations: + try: + _ = calc.check() + except CalculationIncompleteError: + return calc + return None + + def parse_main(self, qm): + results = [] + for calculation in self._preopt.calculations: + files = calculation.check() + results.append(OptimizationOutput(self._init_molecule, qm.read_opt(files))) + self._preopt.results = results