From 1fb1300557e3daf69139ecc7203d2f8d06376b03 Mon Sep 17 00:00:00 2001 From: Ted Brookings Date: Fri, 28 Jul 2023 09:50:39 -0400 Subject: [PATCH] Respond to review comments --- fgpyo/vcf/__init__.py | 408 +++----------------------------- fgpyo/vcf/builder.py | 285 ++++++++++++++++++---- fgpyo/vcf/tests/test_builder.py | 4 +- 3 files changed, 279 insertions(+), 418 deletions(-) diff --git a/fgpyo/vcf/__init__.py b/fgpyo/vcf/__init__.py index 42621ee3..c8122e4f 100644 --- a/fgpyo/vcf/__init__.py +++ b/fgpyo/vcf/__init__.py @@ -7,32 +7,36 @@ The module contains the following public classes: - - :class:`~variantbuilder.VariantBuilder` -- A builder class that allows the + - :class:`~VariantBuilder` -- A builder class that allows the accumulation of variant records and access as a list and writing to file. Examples ~~~~~~~~ -Typically, we have :class:`~cyvcf2.Variant` records obtained from reading from a VCF file. The -:class:`~variantbuilder.VariantBuilder` class builds such records. The variants produced -are from a single-sample. +Typically, we have :class:`~pysam.VariantRecord` records obtained from reading from a VCF file. +The :class:`~VariantBuilder` class builds such records. -The preferred usage is within a context manager (i.e. use `with`): +Variants are added with the :func:`~VariantBuilder.add()` method, which +returns a `Variant`. - >>> from fgpyo.vcf import VariantBuilder - >>> with VariantBuilder() as builder: - >>> builder.add() # uses the defaults + >>> import pysam + >>> from fgpyo.vcf.builder import VariantBuilder + >>> builder: VariantBuilder = VariantBuilder() + >>> new_record_1: pysam.VariantRecord = builder.add() # uses the defaults + >>> new_record_2: pysam.VariantRecord = builder.add( + >>> contig="chr2", pos=1001, id="rs1234", ref="C", alts=["T"], + >>> qual=40, filter=["PASS"] + >>> ) -This enables the automatic release resources used by `VariantBuilder` to create `Variant` records. -One may also use the :func:`~variantbuilder.VariantBuilder.close()` method: +VariantBuilder can create sites-only, single-sample, or multi-sample VCF files. If not producing a +sites-only VCF file, VariantBuilder must be created by passing a list of sample IDs - >>> from fgpyo.vcf import VariantBuilder - >>> builder: VariantBuilder = VariantBuilder() - >>> builder.add() # uses the defaults - >>> builder.close() + >>> builder: VariantBuilder = VariantBuilder(samples=["sample1", "sample2"]) + >>> new_record_1: pysam.VariantRecord = builder.add() # uses the defaults + >>> new_record_2: pysam.VariantRecord = builder.add( + >>> samples={"sample1": {"GT": "0|1"}, "sample2": {"GT": "0|0"}} + >>> ) -Variants are added with the :func:`~variantbuilder.VariantBuilder.add()` method, which -returns a `Variant`. >>> with VariantBuilder() as builder: >>> variant: Variant = builder.add( @@ -41,35 +45,22 @@ >>> ) The variants stored in the builder can be retrieved as a coordinate sorted VCF file via the -:func:`~variantbuilder.VariantBuilder.to_path()` method: +:func:`~VariantBuilder.to_path()` method: >>> from pathlib import Path >>> path_to_vcf: Path = builder.to_path() The variants may also be retrieved in the order they were added via the -:func:`~variantbuilder.VariantBuilder.to_unsorted_list()` method and in coordinate sorted -order via the :func:`~variantbuilder.VariantBuilder.to_sorted_list()` method. +:func:`~VariantBuilder.to_unsorted_list()` method and in coordinate sorted +order via the :func:`~VariantBuilder.to_sorted_list()` method. """ import os import sys -from abc import ABC -from abc import abstractmethod from contextlib import contextmanager -from enum import Enum from pathlib import Path -from tempfile import NamedTemporaryFile -from typing import Any -from typing import ClassVar -from typing import Dict from typing import Generator -from typing import Generic -from typing import Iterable -from typing import List -from typing import Optional from typing import TextIO -from typing import Tuple -from typing import TypeVar from typing import Union from pysam import VariantFile @@ -77,72 +68,11 @@ from pysam import VariantFile as VcfWriter from pysam import VariantHeader -from fgpyo.sam.builder import SamBuilder - -Variant = TypeVar("Variant") - - -class Defaults: - Position: ClassVar[int] = 1000 - Id: ClassVar[str] = "." - Ref: ClassVar[str] = "A" - Alts: ClassVar[Tuple[str, ...]] = (".",) - Qual: ClassVar[int] = 60 - - @staticmethod - def init( - sd: Dict[str, Dict[str, Any]], - contig: Optional[str] = None, - pos: Optional[int] = None, - id: Optional[str] = None, - ref: Optional[str] = None, - alts: Optional[Iterable[str]] = None, - qual: Optional[int] = None, - ) -> Tuple[str, int, str, str, Tuple[str, ...], int]: - """Sets the defaults if the values are not defined. - - Args: - sd: the sequence dictionary for the variant builder - contig: the chromosome name - pos: the position - id: the variant id - ref: the reference allele - alts: the list of alternate alleles, None if no alternates - qual: the variant quality - """ - return ( - next(iter(sd.keys())) if contig is None else contig, - Defaults.Position if pos is None else pos, - Defaults.Id if id is None else id, - Defaults.Ref if ref is None else ref, - Defaults.Alts if alts is None else tuple(alts), - Defaults.Qual if qual is None else qual, - ) - """The valid base classes for opening a VCF file.""" VcfPath = Union[Path, str, TextIO] -class VcfFieldType(Enum): - """Codes for VCF field types""" - - INTEGER = "Integer" - FLOAT = "Float" - FLAG = "Flag" - CHARACTER = "Character" - STRING = "String" - - -class VcfFieldNumber(Enum): - """Special codes for VCF field numbers""" - - NUM_ALT_ALLELES = "A" - NUM_ALLELES = "R" - NUM_GENOTYPES = "G" - UNKNOWN = "." - - @contextmanager def redirect_dev_null(file_num: int = sys.stderr.fileno()) -> Generator[None, None, None]: """A context manager that redirects output of file handle to /dev/null @@ -170,295 +100,27 @@ def reader(path: VcfPath) -> Generator[VcfReader, None, None]: Args: path: the path to a VCF, or an open file handle """ - if isinstance(path, (str, Path, TextIO)): - with redirect_dev_null(): - # to avoid spamming log about index older than vcf, redirect stderr to /dev/null: only - # when first opening the file - _reader = VariantFile(path, mode="r") # type: ignore - # now stderr is back, so any later stderr messages will go through - yield _reader - _reader.close() - else: - raise TypeError(f"Cannot open '{type(path)}' for VCF reading.") + with redirect_dev_null(): + # to avoid spamming log about index older than vcf, redirect stderr to /dev/null: only + # when first opening the file + _reader = VariantFile(path, mode="r") # type: ignore + # now stderr is back, so any later stderr messages will go through + yield _reader + _reader.close() @contextmanager -def writer( - path: VcfPath, reader: Optional[VcfReader] = None, header: Optional[VariantHeader] = None -) -> Generator[VcfWriter, None, None]: - """Opens the given path for VCF writing. Either reader or header must be specified. - +def writer(path: VcfPath, header: VariantHeader) -> Generator[VcfWriter, None, None]: + """Opens the given path for VCF writing. Args: path: the path to a VCF, or an open filehandle - reader: the source VCF from which variants are read - header: the VCF header to use for the output VCF + header: the source for the output VCF header. If you are modifying a VCF file that you are + reading from, you can pass reader.header """ # Convert Path to str such that pysam will autodetect to write as a gzipped file if provided # with a .vcf.gz suffix. if isinstance(path, Path): path = str(path) - if not isinstance(path, (str, TextIO)): - raise TypeError(f"Cannot open '{type(path)}' for VCF writing.") - if reader is not None and header is not None: - raise ValueError("Either reader or header must be specified when writing a VCF.") - elif reader is not None: - _writer = VariantFile(path, header=reader.header, mode="w") - elif header is not None: - _writer = VariantFile(path, header=header, mode="w") - else: - raise ValueError("Either reader or header must be given when writing a VCF.") + _writer = VariantFile(path, header=header, mode="w") yield _writer _writer.close() - - -class VariantBuilder(Generic[Variant], ABC): - """Builder for constructing one or more variant records (Variant in cyvcf2 terms) for a - single-sample VCF. - - Provides the ability to manufacture variants from minimal arguments, while generating - any remaining attributes to ensure a valid variant. - - A builder is constructed with a handful of defaults including the sample name and sequence - dictionary. - - Variants are then added using the :func:`~fgpyo.vcf.VariantBuilder.add` method. - Once accumulated the variants can be accessed in the order in which they were created through - the :func:`~fgpyo.vcf.VariantBuilder.to_unsorted_list` function, or in a - list sorted by coordinate order via - :func:`~fgpyo.vcf.VariantBuilder.to_sorted_list`. Lastly, the records can - be written to a temporary file using :func:`~fgpyo.vcf.VariantBuilder.to_path`. - - Attributes: - samples: the sample name(s) - sd: list of dictionaries with names and lengths of each contig - seq_idx_lookup: dictionary mapping contig name to index of contig in sd - records: the list of variant records - """ - - samples: List[str] - sd: Dict[str, Dict[str, Any]] - seq_idx_lookup: Dict[str, int] - records: List[Variant] - - @classmethod - def default_sd(cls) -> Dict[str, Dict[str, Any]]: - """Generates the sequence dictionary that is used by default by VariantBuilder. - - Matches the names and lengths of the HG19 reference. - - Returns: - A new copy of the sequence dictionary as a map of contig name to dictionary, one per - contig. - """ - sd: Dict[str, Dict[str, Any]] = {} - for sequence in SamBuilder.default_sd(): - contig = sequence["SN"] - sd[contig] = {"ID": contig, "length": sequence["LN"]} - return sd - - @classmethod - def build_header_string( - cls, samples: Optional[List[str]] = None, sd: Optional[Dict[str, Dict[str, Any]]] = None - ) -> str: - """Builds the VCF header with the given sample name(s) and sequence dictionary. - - Args: - samples: the sample name(s). - sd: the sequence dictionary mapping the contig name to the key-value pairs for the - given contig. Must include "ID" and "length" for each contig. If no sequence - dictionary is given, will use the default dictionary. - """ - if sd is None: - sd = VariantBuilder.default_sd() - buffer = "##fileformat=VCFv4.2\n" - buffer += '##FORMAT=\n' - for d in sd.values(): - assert "ID" in d, "ID missing in sequence dictionary" - assert "length" in d, "length missing in sequence dictionary" - contig_id = d["ID"] - contig_length = d["length"] - contig_buffer = f"##contig= None: - """Initializes a new VariantBuilder for generating variants and VCF files. - - Args: - samples: the name of the sample(s) - sd: optional sequence dictionary - """ - self.samples: List[str] = samples if samples is not None else [] - self.sd: Dict[str, Dict[str, Any]] = sd if sd is not None else VariantBuilder.default_sd() - self.seq_idx_lookup: Dict[str, int] = {name: i for i, name in enumerate(self.sd.keys())} - self.records: List[Variant] = [] - - def __enter__(self) -> "VariantBuilder": - return self - - def __exit__(self, *args: Any) -> bool: # type: ignore - self.close() - return False - - @abstractmethod - def close(self) -> None: - """Closes this builder""" - pass - - @abstractmethod - def add( - self, - contig: Optional[str] = None, - pos: Optional[int] = None, - id: Optional[str] = None, - ref: Optional[str] = None, - alts: Optional[Iterable[str]] = None, - qual: Optional[int] = None, - filter: Optional[Iterable[str]] = None, - info: Optional[Dict[str, Any]] = None, - samples: Optional[Dict[str, Dict[str, Any]]] = None, - ) -> Variant: - """Generates a new variant and adds it to the internal collection. - - Args: - contig: the chromosome name - pos: the position - id: the variant id - ref: the reference allele - alts: the list of alternate alleles, None if no alternates - qual: the variant quality - filter: the list of filters, None if no filters (ex. PASS) - info: the dictionary of INFO key-value pairs - samples: the dictionary of sample name to FORMAT key-value pairs. - """ - pass - - @abstractmethod - def to_path(self, path: Optional[Path] = None) -> Path: - """Returns a path to a VCF for variants added to this builder. - Args: - path: optional path to the VCF - """ - pass - - @staticmethod - def _update_path(path: Optional[Path]) -> Path: - """Gets the path to a VCF file. If path is a directory, a temporary VCF will be created in - that directory. If path is `None`, then a temporary VCF will be created. Otherwise, the - given path is simply returned. - - Args: - path: optionally the path to the VCF, or a directory to create a temporary VCF. - """ - if path is None: - with NamedTemporaryFile(suffix=".vcf", delete=False) as fp: - path = Path(fp.name) - assert path.is_file() - return path - - def to_unsorted_list(self) -> List[Variant]: - """Returns the accumulated records in the order they were created.""" - return list(self.records) - - @abstractmethod - def _get_contig_pos_end(self, variant: Variant) -> Tuple[str, int, int]: - """Getter for chromosome, start and end pos from Variant""" - pass - - def _sort_key(self, variant: Variant) -> Tuple[int, int, int]: - contig, pos, end = self._get_contig_pos_end(variant) - return self.seq_idx_lookup[contig], pos, end - - def to_sorted_list(self) -> List[Variant]: - """Returns the accumulated records in coordinate order.""" - return sorted(self.records, key=self._sort_key) - - @abstractmethod - def add_header_line(self, header_line: str) -> None: - """Add a line to the VCF header.""" - pass - - def add_info_header_field( - self, - name: str, - field_type: VcfFieldType, - number: Union[int, VcfFieldNumber] = 1, - description: Optional[str] = None, - source: Optional[str] = None, - version: Optional[str] = None, - ) -> None: - """Add an INFO header field to the VCF header. - - Args: - name: the name of the field - field_type: the field_type of the field - number: the number of the field - description: the description of the field - source: the source of the field - version: the version of the field - """ - if field_type == VcfFieldType.FLAG: - number = 0 # FLAGs always have number = 0 - header_line = f"##INFO= None: - """ - Add a FORMAT header field to the VCF header. - - Args: - name: the name of the field - field_type: the field_type of the field - number: the number of the field - description: the description of the field - """ - header_line = f"##FORMAT= None: - """ - Add a FILTER header field to the VCF header. - - Args: - name: the name of the field - description: the description of the field - """ - header_line = f"##FILTER= None: - """Closes this builder""" - pass + @classmethod + def default_sd(cls) -> Dict[str, Dict[str, Any]]: + """Generates the sequence dictionary that is used by default by VariantBuilder. + Re-uses the dictionary from SamBuilder for consistency. + + Returns: + A new copy of the sequence dictionary as a map of contig name to dictionary, one per + contig. + """ + sd: Dict[str, Dict[str, Any]] = {} + for sequence in SamBuilder.default_sd(): + contig = sequence["SN"] + sd[contig] = {"ID": contig, "length": sequence["LN"]} + return sd + + @classmethod + def _build_header_string( + cls, sd: Optional[Dict[str, Dict[str, Any]]] = None + ) -> Iterator[str]: + """Builds the VCF header with the given sample name(s) and sequence dictionary. + + Args: + sd: the sequence dictionary mapping the contig name to the key-value pairs for the + given contig. Must include "ID" and "length" for each contig. If no sequence + dictionary is given, will use the default dictionary. + """ + if sd is None: + sd = VariantBuilder.default_sd() + # add mandatory VCF format + yield "##fileformat=VCFv4.2" + # add GT + yield '##FORMAT=' + # add additional common INFO lines + yield '##INFO=' + yield ( + '##INFO=' + ) + yield '##INFO=' + # add additional common FORMAT lines + yield ( + '##FORMAT=' + ) + yield '##FORMAT=' + yield '##FORMAT=' + + for d in sd.values(): + if "ID" not in d or "length" not in d: + raise ValueError( + "Sequence dictionary must include 'ID' and 'length' for each contig." + ) + contig_id = d["ID"] + contig_length = d["length"] + contig_header = f"##contig= PysamVariant: + ) -> VariantRecord: """Generates a new variant and adds it to the internal collection. - Very little validation is done with respect to INFO and FORMAT keys being defined in the + Notes: + * Very little validation is done with respect to INFO and FORMAT keys being defined in the header. + * VCFs are 1-based, but pysam is (mostly) 0-based. We define the function in terms of the + VCF property "pos", which is 1-based. pysam will also report "pos" as 1-based, so that is + the property that should be accessed when using the records produced by this function (not + "start"). Args: - contig: the chromosome name - pos: the position + contig: the chromosome name. If None, will use the first contig in the sequence + dictionary. + pos: the 1-based position of the variant id: the variant id ref: the reference allele alts: the list of alternate alleles, None if no alternates qual: the variant quality filter: the list of filters, None if no filters (ex. PASS) info: the dictionary of INFO key-value pairs - formats: the dictionary of sample name to FORMAT key-value pairs. - input_start_index: the genomic start index for input data (e.g. 0 for 0-based, 1-for - 1-based). VCF files are 1-based, but pysam is 0-based. + samples: the dictionary of sample name to FORMAT key-value pairs. + if a sample property is supplied for any sample but omitted in some, it will + be set to missing (".") for samples that don't have that property explicitly + assigned. If a sample in the VCF is omitted, all its properties will be set to + missing. """ - contig, pos, id, ref, alts, qual = VariantBuilderDefaults.init( - self.sd, contig, pos, id, ref, alts, qual - ) + if contig is None: + contig = next(iter(self.sd.keys())) if contig not in self.sd: raise ValueError(f"Chromosome `{contig}` not in the sequence dictionary.") if samples is not None and any(sample not in self.samples for sample in samples): raise ValueError( - "Extra sample(s) given: " + "Unknown sample(s) given: " + ", ".join(sample for sample in samples if sample not in self.samples) ) + if isinstance(alts, str): + alts = (alts,) alleles = (ref,) if alts is None else (ref, *alts) if self.samples is None or len(self.samples) == 0: @@ -110,15 +215,16 @@ def add( elif samples is None or len(samples) == 0: formats = [{"GT": "./."} for _ in self.samples] else: - format_keys = sorted({key for sample in self.samples for key in samples[sample]}) + all_format_keys = sorted({key for sample in self.samples for key in samples[sample]}) formats = [ None if sample not in samples - else {key: samples[sample].get(key, ".") for key in format_keys} + else {key: samples[sample].get(key, ".") for key in all_format_keys} for sample in self.samples ] - start = pos - input_start_index # pysam is zero-based + # pysam is zero based, half-open [start, stop) + start = pos - 1 # pysam "start" is zero-based stop = start + len(ref) variant = self.header.new_record( contig=contig, @@ -141,7 +247,7 @@ def to_path(self, path: Optional[Path] = None) -> Path: path: optional path to the VCF """ # update the path - path = self._update_path(path) + path = self._to_vcf_path(path) # Create a writer and write to it with PysamWriter(path, header=self.header) as writer: @@ -150,10 +256,103 @@ def to_path(self, path: Optional[Path] = None) -> Path: return path - def _get_contig_pos_end(self, variant: PysamVariant) -> Tuple[str, int, int]: - """Getter for chromosome, start and end pos from Variant""" - return variant.contig, variant.start, variant.stop + @staticmethod + def _to_vcf_path(path: Optional[Path]) -> Path: + """Gets the path to a VCF file. If path is a directory, a temporary VCF will be created in + that directory. If path is `None`, then a temporary VCF will be created. Otherwise, the + given path is simply returned. + + Args: + path: optionally the path to the VCF, or a directory to create a temporary VCF. + """ + if path is None: + with NamedTemporaryFile(suffix=".vcf", delete=False) as fp: + path = Path(fp.name) + assert path.is_file() + return path + + def to_unsorted_list(self) -> List[VariantRecord]: + """Returns the accumulated records in the order they were created.""" + return list(self.records) + + def to_sorted_list(self) -> List[VariantRecord]: + """Returns the accumulated records in coordinate order.""" + return sorted(self.records, key=self._sort_key) + + def _sort_key(self, variant: VariantRecord) -> Tuple[int, int, int]: + return self.seq_idx_lookup[variant.contig], variant.start, variant.stop def add_header_line(self, line: str) -> None: """Adds a header line to the header""" self.header.add_line(line) + + def add_info_header( + self, + name: str, + field_type: VcfFieldType, + number: Union[int, VcfFieldNumber] = 1, + description: Optional[str] = None, + source: Optional[str] = None, + version: Optional[str] = None, + ) -> None: + """Add an INFO header field to the VCF header. + + Args: + name: the name of the field + field_type: the field_type of the field + number: the number of the field + description: the description of the field + source: the source of the field + version: the version of the field + """ + if field_type == VcfFieldType.FLAG: + number = 0 # FLAGs always have number = 0 + header_line = f"##INFO= None: + """ + Add a FORMAT header field to the VCF header. + + Args: + name: the name of the field + field_type: the field_type of the field + number: the number of the field + description: the description of the field + """ + header_line = f"##FORMAT= None: + """ + Add a FILTER header field to the VCF header. + + Args: + name: the name of the field + description: the description of the field + """ + header_line = f"##FILTER= None: """Add needed headers to the VariantBuilder.""" for filter in _ALL_FILTERS: - variant_builder.add_filter_header_field(filter) + variant_builder.add_filter_header(filter) for field_name, field_type in _INFO_FIELD_TYPES.items(): - variant_builder.add_info_header_field(field_name, field_type=field_type) + variant_builder.add_info_header(field_name, field_type=field_type) def _fix_value(value: Any) -> Any: