Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reference set builder dev #24

Merged
merged 22 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added fgpyo/fasta/__init__.py
Empty file.
174 changes: 174 additions & 0 deletions fgpyo/fasta/reference_set_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""
Classes for generating Fasta files and records for testing
----------------------------------------------------------------
sam-white04 marked this conversation as resolved.
Show resolved Hide resolved
"""
sam-white04 marked this conversation as resolved.
Show resolved Hide resolved
# import hashlib
sam-white04 marked this conversation as resolved.
Show resolved Hide resolved
import os
import textwrap
from tempfile import NamedTemporaryFile
from typing import List
from typing import Optional

# pylint: disable=W0511
sam-white04 marked this conversation as resolved.
Show resolved Hide resolved


class ReferenceSetBuilder:
    """
    Builder for constructing one or more fasta records.

    Records are created with :meth:`add`, which returns a ``ReferenceBuilder``
    that sequence can be appended to.  The collected records are written out
    with :meth:`to_file` or :meth:`to_temp_file`.  Each record is written as a
    ``>{name}[{assembly}][{species}]`` header line, followed by the sequence
    wrapped at ``line_length`` characters, followed by a blank line.
    """

    # The default assembly
    DEFAULT_ASSEMBLY: str = "testassembly"

    # The default species
    DEFAULT_SPECIES: str = "testspecies"

    # Class-level declaration kept so the attribute still exists on the class;
    # __init__ gives every instance its own list so records are not shared
    # between independent builders.
    # TODO make something better than a list... probably
    REF_BUILDERS: List["ReferenceBuilder"] = []

    def __init__(
        self,
        assembly: Optional[str] = None,
        species: Optional[str] = None,
        line_length: int = 80,
    ):
        """
        Args:
            assembly: assembly name placed in every record header; defaults to
                ``DEFAULT_ASSEMBLY`` when ``None``.
            species: species name placed in every record header; defaults to
                ``DEFAULT_SPECIES`` when ``None``.
            line_length: width at which sequence lines are wrapped on output.
        """
        self.assembly: str = assembly if assembly is not None else self.DEFAULT_ASSEMBLY
        self.species: str = species if species is not None else self.DEFAULT_SPECIES
        self.line_length = line_length
        # Per-instance storage: shadows the class-level list above.
        self.REF_BUILDERS: List["ReferenceBuilder"] = []

    def add(
        self,
        name: str,
    ) -> "ReferenceBuilder":
        """
        Create a new record named ``name``, register it with this builder and
        return it so sequence can be appended via ``ReferenceBuilder.add``.
        """
        builder = ReferenceBuilder(name=name, assembly=self.assembly, species=self.species)
        self.REF_BUILDERS.append(builder)
        return builder

    def writer_helper(
        self,
    ) -> None:
        """ Place holder for helper function """
        return None

    def _write_records(self, handle, destination: str) -> None:
        """
        Write every registered record to ``handle`` (an open text-mode file).

        ``destination`` is only used to label the message printed when a write
        fails.  Write failures are reported and skipped (best effort) rather
        than raised.
        """
        for record in self.REF_BUILDERS:
            header = f">{record.name}[{record.assembly}][{record.species}]\n"
            wrapped = "\n".join(textwrap.wrap(record.sequences, self.line_length))
            try:
                handle.write(header)
                handle.write(f"{wrapped}\n\n")
            except OSError as error:
                print(f"{error}\nCould not write to {destination}")

    def to_temp_file(
        self,
        delete_on_exit: Optional[bool] = True,
        calculate_md5_sum: Optional[bool] = False,
    ) -> None:
        """
        Write every record in REF_BUILDERS to a named temporary ``.fasta`` file.

        Args:
            delete_on_exit: passed to ``NamedTemporaryFile`` as ``delete``; when
                true the temporary file is removed as soon as it is closed.
            calculate_md5_sum: accepted but currently unused (see TODO below).
        """
        # The context manager guarantees the handle is closed even when
        # wrapping or writing a record raises.
        with NamedTemporaryFile(
            prefix=f"{self.assembly}_{self.species}",
            suffix=".fasta",
            delete=bool(delete_on_exit),
            mode="a+t",
        ) as handle:
            self._write_records(handle, handle.name)

        # TODO: when calculate_md5_sum is set, hash the file contents,
        # use the md5 to write the dict, and write the .fai

    def to_file(
        self,
        path: str,
        delete_on_exit: Optional[bool] = True,
        calculate_md5_sum: Optional[bool] = False,
    ) -> None:
        """
        Same as to_temp_file() but user provides path.

        Args:
            path: file to write to; opened in append mode, so existing content
                is kept and the records are added after it.
            delete_on_exit: when true, ``path`` is removed immediately after
                the records have been written.
            calculate_md5_sum: accepted but currently unused (see TODO below).
        """
        with open(path, "a+") as fasta_handle:
            self._write_records(fasta_handle, path)

        # TODO: when calculate_md5_sum is set, hash the file contents,
        # use the md5 to write the dict, and write the .fai

        if delete_on_exit:
            os.remove(path)


# pylint: disable=R0903
sam-white04 marked this conversation as resolved.
Show resolved Hide resolved
class ReferenceBuilder:
    """
    Creates individual records.

    A record holds a name, the assembly and species it belongs to, and the
    sequence accumulated so far through :meth:`add`.
    """

    def __init__(
        self,
        name: str,
        assembly: str,
        species: str,
        sequences: Optional[str] = "",
    ):
        """
        Args:
            name: record name.
            assembly: assembly name stored on the record.
            species: species name stored on the record.
            sequences: initial sequence; ``None`` is treated as empty.
        """
        self.name = name
        self.assembly = assembly
        self.species = species
        # The signature allows None; normalise it so add() can always concatenate.
        self.sequences: str = sequences if sequences is not None else ""

    def add(self, seq: str, times: int = 1) -> "ReferenceBuilder":
        """
        Append ``seq`` repeated ``times`` times to this record, e.g.
        "AAA"*3 = AAAAAAAAA, and return ``self`` so calls can be chained.

        Raises:
            ValueError: if ``seq`` contains whitespace (e.g. "AAA "*100), which
                would be silently altered when the sequence is line-wrapped.
        """
        if any(char.isspace() for char in seq):
            raise ValueError(f"Sequence must not contain whitespace: {seq!r}")
        self.sequences += seq * times
        return self


# Scratch
# builder_ex = ReferenceSetBuilder()
# builder_ex.add("chr10").add("NNNNNNNNNN", 1)
# builder_ex.add("chr10").add("AAAAAAAAAA", 2)
# builder_ex.add("chr3").add("GGGGGGGGGG", 10)
# builder_ex.to_file(path="some.fasta", calculate_md5_sum=True, delete_on_exit=True)
# builder_ex.to_temp_file(calculate_md5_sum=True)


# builder_ex = ReferenceSetBuilder()
# b = builder_ex.add("chr10")
# b.add("NNNNNNNNNN", 1)
# b.add("ACGT", 1)
# c = builder_ex.add("chrY").add("NNNNN", 10)
# builder_ex.to_file(path="some.fasta", calculate_md5_sum=True, delete_on_exit=False)
Empty file added fgpyo/fasta/tests/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions fgpyo/fasta/tests/test_reference_set_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Basic tests for reference_set_builder"""

from fgpyo.fasta.reference_set_builder import ReferenceSetBuilder


def test_add_seq_to_record() -> None:
    """Sequence added through chained add() calls accumulates on the right record."""
    builder = ReferenceSetBuilder()
    # Keep the builders returned by add() instead of indexing REF_BUILDERS by
    # position: positional indexes assume nothing else has added records to
    # the list before this test runs.
    chr1 = builder.add("chr1").add("AAAAAAAAAA", 10).add("NNNNNNNNNN", 10)
    chr10 = builder.add("chr10").add("GGGGGGGGGG", 10)
    assert len(chr1.sequences) == 200
    assert len(chr10.sequences) == 100
    # Both records must still be registered with the set builder.
    assert chr1 in builder.REF_BUILDERS
    assert chr10 in builder.REF_BUILDERS