Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/split runner #239

Open
wants to merge 3 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions oktoberfest/jobs/ce_calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import logging
from pathlib import Path
from typing import List, Optional, Union

import numpy as np
from sklearn.linear_model import LinearRegression, RANSACRegressor

from oktoberfest import plotting as pl
from oktoberfest import predict as pr
from oktoberfest import preprocessing as pp

from ..data.spectra import Spectra
from ..utils import Config, JobPool, ProcessStep

logger = logging.getLogger(__name__)


def preprocess(spectra_files: List[Path], config: Config) -> List[Path]:
preprocess_search_step = ProcessStep(config.output, "preprocessing_search")
if not preprocess_search_step.is_done():
# load search results
if not config.search_results_type == "internal":
logger.info(f"Converting search results from {config.search_results} to internal search result.")

msms_output = config.output / "msms"
msms_output.mkdir(exist_ok=True)
internal_search_file = msms_output / "msms.prosit"
tmt_label = config.tag

search_results = pp.convert_search(
input_path=config.search_results,
search_engine=config.search_results_type,
tmt_label=tmt_label,
output_file=internal_search_file,
)
if config.spectra_type.lower() in ["d", "hdf"]:
timstof_metadata = pp.convert_timstof_metadata(
input_path=config.search_results,
search_engine=config.search_results_type,
output_file=msms_output / "tims_meta.csv",
)
else:
internal_search_file = config.search_results
search_results = pp.load_search(internal_search_file)
# TODO add support for internal timstof metadata
logger.info(f"Read {len(search_results)} PSMs from {internal_search_file}")

# filter search results
search_results = pp.filter_peptides_for_model(peptides=search_results, model=config.models["intensity"])

# split search results
searchfiles_found = pp.split_search(
search_results=search_results,
output_dir=config.output / "msms",
filenames=[spectra_file.stem for spectra_file in spectra_files],
)
# split timstof metadata
if config.spectra_type.lower() in ["d", "hdf"]:
_ = pp.split_timstof_metadata(
timstof_metadata=timstof_metadata,
output_dir=config.output / "msms",
filenames=searchfiles_found,
)
preprocess_search_step.mark_done()
else:
searchfiles_found = [msms_file.stem for msms_file in (config.output / "msms").glob("*rescore")]
spectra_files_to_return = []
for spectra_file in spectra_files:
if spectra_file.stem in searchfiles_found:
spectra_files_to_return.append(spectra_file)

return spectra_files_to_return


def _annotate_and_get_library(spectra_file: Path, config: Config, tims_meta_file: Optional[Path] = None) -> Spectra:
data_dir = config.output / "data"
data_dir.mkdir(exist_ok=True)
hdf5_path = data_dir / spectra_file.with_suffix(".mzml.hdf5").name
if hdf5_path.is_file():
aspec = Spectra.from_hdf5(hdf5_path)
instrument_type = config.instrument_type
if instrument_type is not None and aspec.obs["INSTRUMENT_TYPES"].values[0] != instrument_type:
aspec.obs["INSTRUMENT_TYPES"] = instrument_type
aspec.write_as_hdf5(hdf5_path)
else:
spectra_dir = config.output / "spectra"
spectra_dir.mkdir(exist_ok=True)
format_ = spectra_file.suffix.lower()
if format_ == ".raw":
file_to_load = spectra_dir / spectra_file.with_suffix(".mzML").name
pp.convert_raw_to_mzml(spectra_file, file_to_load, thermo_exe=config.thermo_exe)
elif format_ in [".mzml", ".hdf"]:
file_to_load = spectra_file
elif format_ == ".d":
file_to_load = spectra_dir / spectra_file.with_suffix(".hdf").name
pp.convert_d_to_hdf(spectra_file, file_to_load)
spectra = pp.load_spectra(file_to_load, tims_meta_file=tims_meta_file)
config_instrument_type = config.instrument_type
if config_instrument_type is not None:
spectra["INSTRUMENT_TYPES"] = config_instrument_type
search = pp.load_search(config.output / "msms" / spectra_file.with_suffix(".rescore").name)
library = pp.merge_spectra_and_peptides(spectra, search)
aspec = pp.annotate_spectral_library(
library, mass_tol=config.mass_tolerance, unit_mass_tol=config.unit_mass_tolerance
)
aspec.write_as_hdf5(hdf5_path) # write_metadata_annotation

return aspec


def _get_best_ce(library: Spectra, spectra_file: Path, config: Config):
results_dir = config.output / "results"
results_dir.mkdir(exist_ok=True)
if (library.obs["FRAGMENTATION"] == "HCD").any():
server_kwargs = {
"server_url": config.prediction_server,
"ssl": config.ssl,
"model_name": config.models["intensity"],
}
use_ransac_model = config.use_ransac_model
alignment_library = pr.ce_calibration(library, config.ce_range, use_ransac_model, **server_kwargs)

if use_ransac_model:
logger.info("Performing RANSAC regression")
calib_group = (
alignment_library.obs.groupby(
by=["PRECURSOR_CHARGE", "ORIG_COLLISION_ENERGY", "COLLISION_ENERGY", "MASS"], as_index=False
)["SPECTRAL_ANGLE"]
.mean()
.groupby(["PRECURSOR_CHARGE", "ORIG_COLLISION_ENERGY", "MASS"], as_index=False)
.apply(lambda x: x.loc[x["SPECTRAL_ANGLE"].idxmax()])
)
calib_group["delta_collision_energy"] = (
calib_group["COLLISION_ENERGY"] - calib_group["ORIG_COLLISION_ENERGY"]
)
x = calib_group[["MASS", "PRECURSOR_CHARGE"]] # input feature
y = calib_group["delta_collision_energy"] # target variable
ransac = RANSACRegressor(LinearRegression(), residual_threshold=1.5, random_state=42)
ransac.fit(x, y)

for charge, df in calib_group.groupby("PRECURSOR_CHARGE"):
r2_score = ransac.score(df[["MASS", "PRECURSOR_CHARGE"]], df["COLLISION_ENERGY"])
title = f"Scatter Plot with RANSAC Model \nSlope: {ransac.estimator_.coef_[0]:.2f}, "
title += f"Intercept: {ransac.estimator_.intercept_:.2f}, R2: {r2_score:.2f}"
pl.plot_ce_ransac_model(
sa_ce_df=df,
filename=results_dir / f"{spectra_file.stem}_ce_ransac_model_{charge}.svg",
title=title,
)

delta_ce = ransac.predict(library.obs[["MASS", "PRECURSOR_CHARGE"]])
library.obs["COLLISION_ENERGY"] = np.maximum(0, library.obs["COLLISION_ENERGY"] + delta_ce)

else:
ce_alignment = alignment_library.obs.groupby(by=["COLLISION_ENERGY"])["SPECTRAL_ANGLE"].mean()

best_ce = ce_alignment.idxmax()
pl.plot_mean_sa_ce(
sa_ce_df=ce_alignment.to_frame().reset_index(),
filename=results_dir / f"{spectra_file.stem}_mean_spectral_angle_ce.svg",
)
pl.plot_violin_sa_ce(
sa_ce_df=alignment_library.obs[["COLLISION_ENERGY", "SPECTRAL_ANGLE"]],
filename=results_dir / f"{spectra_file.stem}_violin_spectral_angle_ce.svg",
)
library.obs["COLLISION_ENERGY"] = best_ce
with open(results_dir / f"{spectra_file.stem}_ce.txt", "w") as f:
f.write(str(best_ce))
f.close()
else:
best_ce = 35
library.obs["COLLISION_ENERGY"] = best_ce

with open(results_dir / f"{spectra_file.stem}_ce.txt", "w") as f:
f.write(str(best_ce))
f.close()\


def ce_calib(spectra_file: Path, config: Config) -> Spectra:
ce_calib_step = ProcessStep(config.output, "ce_calib." + spectra_file.stem)
if ce_calib_step.is_done():
hdf5_path = config.output / "data" / spectra_file.with_suffix(".mzml.hdf5").name
if hdf5_path.is_file():
library = Spectra.from_hdf5(hdf5_path)
return library
else:
raise FileNotFoundError(f"{hdf5_path} not found but ce_calib.{spectra_file.stem} found. Please check.")
tims_meta_file = None
if config.spectra_type.lower() in ["hdf", "d"]: # if it is timstof
tims_meta_file = config.output / "msms" / spectra_file.with_suffix(".timsmeta").name
aspec = _annotate_and_get_library(spectra_file, config, tims_meta_file=tims_meta_file)
_get_best_ce(aspec, spectra_file, config)

aspec.write_as_hdf5(config.output / "data" / spectra_file.with_suffix(".mzml.hdf5").name)

ce_calib_step.mark_done()

return aspec

def run_ce_calibration(
config_path: Union[str, Path],
):
"""
Create a CeCalibration object and run the CE calibration.

# TODO full description
:param config_path: path to config file
"""
config = Config()
config.read(config_path)

# load spectra file names
spectra_files = pp.list_spectra(input_dir=config.spectra, input_format=config.spectra_type)

proc_dir = config.output / "proc"
proc_dir.mkdir(parents=True, exist_ok=True)

spectra_files = preprocess(spectra_files, config)

if config.num_threads > 1:
processing_pool = JobPool(processes=config.num_threads)
for spectra_file in spectra_files:
processing_pool.apply_async(ce_calib, [spectra_file, config])
processing_pool.check_pool()
else:
for spectra_file in spectra_files:
ce_calib(spectra_file, config)
151 changes: 151 additions & 0 deletions oktoberfest/jobs/rescoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import logging
from pathlib import Path
from typing import Union

from oktoberfest import plotting as pl
from oktoberfest import predict as pr
from oktoberfest import preprocessing as pp
from oktoberfest import rescore as re
from oktoberfest.jobs import ce_calibration as ce

from ..utils import Config, JobPool, ProcessStep, group_iterator

logger = logging.getLogger(__name__)


def _calculate_features(spectra_file: Path, config: Config):
library = ce.ce_calib(spectra_file, config)

calc_feature_step = ProcessStep(config.output, "calculate_features." + spectra_file.stem)
if calc_feature_step.is_done():
return

predict_step = ProcessStep(config.output, "predict." + spectra_file.stem)
if not predict_step.is_done():

predict_kwargs = {
"server_url": config.prediction_server,
"ssl": config.ssl,
}

if "alphapept" in config.models["intensity"].lower():
chunk_idx = list(group_iterator(df=library.obs, group_by_column="PEPTIDE_LENGTH"))
else:
chunk_idx = None
pr.predict_intensities(
data=library, chunk_idx=chunk_idx, model_name=config.models["intensity"], **predict_kwargs
)

pr.predict_rt(data=library, model_name=config.models["irt"], **predict_kwargs)

library.write_as_hdf5(config.output / "data" / spectra_file.with_suffix(".mzml.hdf5").name)
predict_step.mark_done()

# produce percolator tab files
fdr_dir = config.output / "results" / config.fdr_estimation_method
fdr_dir.mkdir(exist_ok=True)

re.generate_features(
library=library,
search_type="original",
output_file=fdr_dir / spectra_file.with_suffix(".original.tab").name,
all_features=config.all_features,
regression_method=config.curve_fitting_method,
)
re.generate_features(
library=library,
search_type="rescore",
output_file=fdr_dir / spectra_file.with_suffix(".rescore.tab").name,
all_features=config.all_features,
regression_method=config.curve_fitting_method,
)

calc_feature_step.mark_done()


def _rescore(fdr_dir: Path, config: Config):
"""
High level rescore function for original and rescore.

:param fdr_dir: the output directory
:param config: the configuration object
:raises ValueError: if the provided fdr estimation method in the config is not recognized
"""
rescore_original_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_original")
rescore_prosit_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_prosit")

if config.fdr_estimation_method == "percolator":
if not rescore_original_step.is_done():
re.rescore_with_percolator(input_file=fdr_dir / "original.tab", output_folder=fdr_dir)
rescore_original_step.mark_done()
if not rescore_prosit_step.is_done():
re.rescore_with_percolator(input_file=fdr_dir / "rescore.tab", output_folder=fdr_dir)
rescore_prosit_step.mark_done()
elif config.fdr_estimation_method == "mokapot":
if not rescore_original_step.is_done():
re.rescore_with_mokapot(input_file=fdr_dir / "original.tab", output_folder=fdr_dir)
rescore_original_step.mark_done()
if not rescore_prosit_step.is_done():
re.rescore_with_mokapot(input_file=fdr_dir / "rescore.tab", output_folder=fdr_dir)
rescore_prosit_step.mark_done()
else:
raise ValueError(
'f{config.fdr_estimation_method} is not a valid rescoring tool, use either "percolator" or "mokapot"'
)


def run_rescoring(config_path: Union[str, Path]):
"""
Create a ReScore object and run the rescoring.

# TODO full description
:param config_path: path to config file
"""
config = Config()
config.read(config_path)

# load spectra file names
spectra_files = pp.list_spectra(input_dir=config.spectra, input_format=config.spectra_type)

proc_dir = config.output / "proc"
proc_dir.mkdir(parents=True, exist_ok=True)

spectra_files = ce.preprocess(spectra_files, config)

if config.num_threads > 1:
processing_pool = JobPool(processes=config.num_threads)
for spectra_file in spectra_files:
processing_pool.apply_async(_calculate_features, [spectra_file, config])
processing_pool.check_pool()
else:
for spectra_file in spectra_files:
_calculate_features(spectra_file, config)

# prepare rescoring

fdr_dir = config.output / "results" / config.fdr_estimation_method

original_tab_files = [fdr_dir / spectra_file.with_suffix(".original.tab").name for spectra_file in spectra_files]
rescore_tab_files = [fdr_dir / spectra_file.with_suffix(".rescore.tab").name for spectra_file in spectra_files]

prepare_tab_original_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_prepare_tab_original")
prepare_tab_rescore_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_prepare_tab_prosit")

if not prepare_tab_original_step.is_done():
logger.info("Merging input tab files for rescoring without peptide property prediction")
re.merge_input(tab_files=original_tab_files, output_file=fdr_dir / "original.tab")
prepare_tab_original_step.mark_done()

if not prepare_tab_rescore_step.is_done():
logger.info("Merging input tab files for rescoring with peptide property prediction")
re.merge_input(tab_files=rescore_tab_files, output_file=fdr_dir / "rescore.tab")
prepare_tab_rescore_step.mark_done()

# rescoring
_rescore(fdr_dir, config)

# plotting
logger.info("Generating summary plots...")
pl.plot_all(fdr_dir)

logger.info("Finished rescoring.")
Loading
Loading