Skip to content

Commit

Permalink
Add vcf module
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted Brookings committed Jul 11, 2023
1 parent 0c17fe0 commit 97ef17c
Show file tree
Hide file tree
Showing 3 changed files with 732 additions and 0 deletions.
369 changes: 369 additions & 0 deletions fgpyo/vcf/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,369 @@
"""
Classes for generating VCF and records for testing
--------------------------------------------------
This module contains utility classes for the generation of VCF files and variant records, for use
in testing.
The module contains the following public classes:
- :class:`~variantbuilder.VariantBuilder` -- A builder class that allows the
accumulation of variant records and access as a list and writing to file.
Examples
~~~~~~~~
Typically, we have :class:`~cyvcf2.Variant` records obtained from reading from a VCF file. The
:class:`~variantbuilder.VariantBuilder` class builds such records. The variants produced
are from a single-sample.
The preferred usage is within a context manager (i.e. use `with`):
>>> from fgpyo.vcf import VariantBuilder
>>> with VariantBuilder() as builder:
>>> builder.add() # uses the defaults
This enables the automatic release resources used by `VariantBuilder` to create `Variant` records.
One may also use the :func:`~variantbuilder.VariantBuilder.close()` method:
>>> from fgpyo.vcf import VariantBuilder
>>> builder: VariantBuilder = VariantBuilder()
>>> builder.add() # uses the defaults
>>> builder.close()
Variants are added with the :func:`~variantbuilder.VariantBuilder.add()` method, which
returns a `Variant`.
>>> with VariantBuilder() as builder:
>>> variant: Variant = builder.add(
>>> chr="chr2", pos=1001, id="rs1234", ref="C", alt=["T"],
>>> qual=40, flt=["PASS"], gt="0|1"
>>> )
The variants stored in the builder can be retrieved as a coordinate sorted VCF file via the
:func:`~variantbuilder.VariantBuilder.to_path()` method:
>>> from pathlib import Path
>>> path_to_vcf: Path = builder.to_path()
The variants may also be retrieved in the order they were added via the
:func:`~variantbuilder.VariantBuilder.to_unsorted_list()` method and in coordinate sorted
order via the :func:`~variantbuilder.VariantBuilder.to_sorted_list()` method.
"""
import os
import sys
from abc import ABC
from abc import abstractmethod
from collections import OrderedDict
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from typing import ClassVar
from typing import Dict
from typing import Generator
from typing import Generic
from typing import Iterable
from typing import List
from typing import Optional
from typing import TextIO
from typing import Tuple
from typing import TypeVar
from typing import Union

from pysam import VariantFile
from pysam import VariantFile as VcfReader
from pysam import VariantFile as VcfWriter
from pysam import VariantHeader

from fgpyo.sam.builder import SamBuilder

Variant = TypeVar("Variant")


class Defaults:
Contig: ClassVar[str] = "chr1"
Position: ClassVar[int] = 1000
Id: ClassVar[str] = "."
Ref: ClassVar[str] = "A"
Alts: ClassVar[Optional[List[str]]] = None
Qual: ClassVar[int] = 60

@staticmethod
def init(
chr: Optional[str] = None,
pos: Optional[int] = None,
id: Optional[str] = None,
ref: Optional[str] = None,
alts: Optional[Iterable[str]] = None,
qual: Optional[int] = None,
) -> Tuple[str, int, str, str, List[str], int]:
"""Sets the defaults if the values are not defined.
Args:
chr: the chromosome name
pos: the position
id: the variant id
ref: the reference allele
alts: the list of alternate alleles, None if no alternates
qual: the variant quality
"""
return (
Defaults.Contig if chr is None else chr,
Defaults.Position if pos is None else pos,
Defaults.Id if id is None else id,
Defaults.Ref if ref is None else ref,
Defaults.Alts if alts is None else list(alts),
Defaults.Qual if qual is None else qual,
)


"""The valid base classes for opening a SAM/BAM/CRAM file."""
VcfPath = Union[Path, str, TextIO]


@contextmanager
def redirect_dev_null(file_num: int = sys.stderr.fileno()) -> Generator[None, None, None]:
"""A context manager that redirects output of file handle to /dev/null
Args:
file_num: number of filehandle to redirect. Uses stderr by default
"""
# open /dev/null for writing
f_devnull = os.open(os.devnull, os.O_RDWR)
# save old file descriptor and redirect stderr to /dev/null
save_stderr = os.dup(file_num)
os.dup2(f_devnull, file_num)

yield

# restore file descriptor and close devnull
os.dup2(save_stderr, file_num)
os.close(f_devnull)


@contextmanager
def reader(path: VcfPath) -> Generator[VcfReader, None, None]:
"""Opens the given path for VCF reading
Args:
path: the path to a VCF, or an open file handle
"""
if isinstance(path, (str, Path, TextIO)):
with redirect_dev_null():
# to avoid spamming log about index older than vcf, redirect stderr to /dev/null: only
# when first opening the file
_reader = VariantFile(path, mode="r") # type: ignore
# now stderr is back, so any later stderr messages will go through
yield _reader
_reader.close()
else:
raise TypeError(f"Cannot open '{type(path)}' for VCF reading.")


@contextmanager
def writer(
path: VcfPath, reader: Optional[VcfReader] = None, header: Optional[VariantHeader] = None
) -> Generator[VcfWriter, None, None]:
"""Opens the given path for VCF writing. Either reader or header must be specified.
Args:
path: the path to a VCF, or an open filehandle
reader: the source VCF from which variants are read
header: the VCF header to use for the output VCF
"""
# Convert Path to str such that pysam will autodetect to write as a gzipped file if provided
# with a .vcf.gz suffix.
if isinstance(path, Path):
path = str(path)
if not isinstance(path, (str, TextIO)):
raise TypeError(f"Cannot open '{type(path)}' for VCF writing.")
if reader is not None and header is not None:
raise ValueError("Either reader or header must be specified when writing a VCF.")
elif reader is not None:
_writer = VariantFile(path, header=reader.header, mode="w")
elif header is not None:
_writer = VariantFile(path, header=header, mode="w")
else:
raise ValueError("Either reader or header must be given when writing a VCF.")
yield _writer
_writer.close()


class VariantBuilder(Generic[Variant], ABC):
"""Builder for constructing one or more variant records (Variant in cyvcf2 terms) for a
single-sample VCF.
Provides the ability to manufacture variants from minimal arguments, while generating
any remaining attributes to ensure a valid variant.
A builder is constructed with a handful of defaults including the sample name and sequence
dictionary.
Variants are then added using the :func:`~fgpyo.vcf.VariantBuilder.add` method.
Once accumulated the variants can be accessed in the order in which they were created through
the :func:`~fgpyo.vcf.VariantBuilder.to_unsorted_list` function, or in a
list sorted by coordinate order via
:func:`~fgpyo.vcf.VariantBuilder.to_sorted_list`. Lastly, the records can
be written to a temporary file using :func:`~fgpyo.vcf.VariantBuilder.to_path`.
Attributes:
samples: the sample name(s)
sd: list of dictionaries with names and lengths of each contig
seq_idx_lookup: dictionary mapping contig name to index of contig in sd
records: the list of variant records
"""

samples: List[str]
sd: Dict[str, Dict[str, Any]]
seq_idx_lookup: Dict[str, int]
records: List[Variant]

@classmethod
def default_sd(cls) -> Dict[str, Dict[str, Any]]:
"""Generates the sequence dictionary that is used by default by VariantBuilder.
Matches the names and lengths of the HG19 reference.
Returns:
A new copy of the sequence dictionary as a map of contig name to dictionary, one per
contig.
"""
sd: Dict[str, Dict[str, Any]] = OrderedDict()
for sequence in SamBuilder.default_sd():
contig = sequence["SN"]
sd[contig] = {"ID": contig, "length": sequence["LN"]}
return sd

@classmethod
def build_header_string(
cls, samples: Optional[List[str]] = None, sd: Optional[Dict[str, Dict[str, Any]]] = None
) -> str:
"""Builds the VCF header with the given sample name(s) and sequence dictionary.
Args:
samples: the sample name(s).
sd: the sequence dictionary mapping the contig name to the key-value pairs for the
given contig. Must include "ID" and "length" for each contig. If no sequence
dictionary is given, will use the default dictionary.
"""
if sd is None:
sd = VariantBuilder.default_sd()
buffer = "##fileformat=VCFv4.2\n"
buffer += '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">\n'
for d in sd.values():
assert "ID" in d, "ID missing in sequence dictionary"
assert "length" in d, "length missing in sequence dictionary"
contig_id = d["ID"]
contig_length = d["length"]
contig_buffer = f"##contig=<ID={contig_id},length={contig_length}"
for key, value in d.items():
if key == "ID" or key == "length":
continue
contig_buffer += f",{key}={value}"
contig_buffer += ">\n"
buffer += contig_buffer

buffer += "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT"
if samples is not None:
buffer += "\t"
buffer += "\t".join(samples)
buffer += "\n"

return buffer

def __init__(
self, samples: Optional[List[str]] = None, sd: Optional[Dict[str, Dict[str, Any]]] = None
) -> None:
"""Initializes a new VariantBuilder for generating variants and VCF files.
Args:
samples: the name of the sample(s)
sd: optional sequence dictionary
"""
self.samples: List[str] = samples if samples is not None else []
self.sd: Dict[str, Dict[str, Any]] = sd if sd is not None else VariantBuilder.default_sd()
self.seq_idx_lookup: Dict[str, int] = {name: i for i, name in enumerate(self.sd.keys())}
self.records: List[Variant] = []

def __enter__(self) -> "VariantBuilder":
return self

def __exit__(self, *args: Any) -> bool: # type: ignore
self.close()
return False

@abstractmethod
def close(self) -> None:
"""Closes this builder"""
pass

@abstractmethod
def add(
self,
chr: Optional[str] = None,
pos: Optional[int] = None,
id: Optional[str] = None,
ref: Optional[str] = None,
alts: Optional[List[str]] = None,
qual: Optional[int] = None,
filters: Optional[List[str]] = None,
infos: Optional[Dict[str, Any]] = None,
formats: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Variant:
"""Generates a new variant and adds it to the internal collection.
Args:
chr: the chromosome name
pos: the position
id: the variant id
ref: the reference allele
alts: the list of alternate alleles, None if no alternates
qual: the variant quality
filters: the list of filters, None if no filters (ex. PASS)
infos: the dictionary of INFO key-value pairs
formats: the dictionary of sample name to FORMAT key-value pairs.
"""
pass

@abstractmethod
def to_path(self, path: Optional[Path] = None) -> Path:
"""Returns a path to a VCF for variants added to this builder.
Args:
path: optional path to the VCF
"""
pass

@staticmethod
def _update_path(path: Optional[Path]) -> Path:
"""Gets the path to a VCF file. If path is a directory, a temporary VCF will be created in
that directory. If path is `None`, then a temporary VCF will be created. Otherwise, the
given path is simply returned.
Args:
path: optionally the path to the VCF, or a directory to create a temporary VCF.
"""
if path is None:
with NamedTemporaryFile(suffix=".vcf", delete=False) as fp:
path = Path(fp.name)
assert path.is_file()
return path

def to_unsorted_list(self) -> List[Variant]:
"""Returns the accumulated records in the order they were created."""
return list(self.records)

@abstractmethod
def _get_chrom_pos_end(self, variant: Variant) -> Tuple[str, int, int]:
"""Getter for chromosome, start and end pos from Variant"""
pass

def _sort_key(self, variant: Variant) -> Tuple[int, int, int]:
chrom, pos, end = self._get_chrom_pos_end(variant)
return self.seq_idx_lookup[chrom], pos, end

def to_sorted_list(self) -> List[Variant]:
"""Returns the accumulated records in coordinate order."""
chrom, pos, end = self._get_chrom_pos_end(self.records[0])
return sorted(self.records, key=self._sort_key)
Loading

0 comments on commit 97ef17c

Please sign in to comment.