Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check the OM coverage after alignment #3

Open
yuxuanyuan opened this issue Apr 22, 2020 · 0 comments
Open

Check the OM coverage after alignment #3

yuxuanyuan opened this issue Apr 22, 2020 · 0 comments

Comments

@yuxuanyuan
Copy link
Collaborator

yuxuanyuan commented Apr 22, 2020

Here I post a python3 script to help check the om coverage after alignment

#!/usr/bin/env python3

# This script aims to parse the OM alignments from OMBlast and get the coverage 
# ScriptName: omCoverage.py
# Created date: 22/04/2020
# Last modified: 22/04/2020
# Notes: v1.0 is the initial version
# Copyright: Copyright (c) 2020 Yuxuan Yuan ([email protected])

#======================= Modules ======================
import sys
import os
import logging
import argparse
import numpy as np
import pandas as pd

#======================== Function ==========================
DEBUG="" #change it when debugging
logFormat = "%(asctime)s [%(levelname)s] %(message)s"
level = "DEBUG" if DEBUG != "" else "INFO"
logging.basicConfig( stream=sys.stderr, level=level, format=logFormat )

cmapHeader = "# CMAP File Version:\t0.1\n\
# Label Channels:\t1\n\
# Nickase Recognition Site 1:\tunkbown\n\
# Number of Consensus Maps:\tunknown\n\
#h CMapID\tContigLength\tNumSites\tSiteID\tLabelChannel\tPosition\tStdDev\tCoverage\tOccurrence\n\
#f int\tfloat\tint\tint\tint\tfloat\tfloat\tint\tint\
"

class omCov(object):
    """check the OM coverage of chromosome specific regions"""
    
    def __init__(self, oma, refcmap, chr, start, end, prefix, outDir):
        self.oma = oma
        self.refcmap = refcmap
        self.chr = chr
        self.start = int(start)
        self.end = int(end)
        self.prefix = prefix
        self.outDir = outDir

    def checkDir(self):
        """check the output directory"""
        logging.info("Checking folder: '%s'" % self.outDir)
        dirname = os.path.abspath(self.outDir)
        if not os.path.isdir(dirname):
            logging.error("Folder: '%s' does not exit. Please check!" % self.outDir)
            sys.exit(-1)
        if not os.access(dirname, os.W_OK):
            logging.error("Folder folder: '%s' is not writable. Please check!" % self.outDir)
            sys.exit(-1) 

    def checkFile(self, Filename):
        """check the input file"""
        logging.info("Checking file: '%s'" % Filename) 
        filename =  os.path.abspath(Filename)
        if not os.path.isfile(filename):
            logging.error("File: '%s' does not exit. Please check!" % Filename)
            sys.exit(-1)
        if not os.access(filename, os.R_OK):
            logging.error("File: '%s' is not readable. Please check!" % Filename)
            sys.exit(-1) 

    def parseAlignment(self):
        """parse the alignment"""
        logging.info("Starting to parse the alignemnt ...")
        refCoords = dict()
        qryCoords = dict()
        parsed = np.empty(shape = [0, 8])

        logging.info("Loading the refcmap file ...")
        ## parse refcmap
        with open (self.refcmap, 'r') as ref:
            for line in ref:
                line = line.strip()
                if line[0] != "#":
                    items = line.split("\t")
                    refID = items[0]
                    #seqLen = int(float(items[1]))
                    nSites = int(items[2])
                    siteID = int(items[3])
                    siteCoord = int(float(items[5]))
                    if siteID <= nSites:
                        refCoords['%s-%s'%(refID, siteID)] = siteCoord
        
        logging.info("Parsing the OMA file ...")
        ##parse the OM file
        with open (self.oma, 'r') as OMA:
            for line in OMA:
                sum = 0
                line = line.strip()
                if line[0] != "#":
                    items = line.split("\t")
                    molID = items[0]
                    #nMolSeq = int(items[1])
                    molSegs = items[2]
                    coords = molSegs.split(";")
                    S = 0.0
                    for i in range(len(coords)):
                        S += int(coords[i])
                    S += len(coords) - 1
                    for i in range(len(coords)):                                
                        sum = 0.0
                        for j in range(i+1):
                            sum += int(coords[j])
                        qryCoords['%s-%s'%(molID, i+1)] = sum+i+1

                    try:
                        refID = items[3]
                        strand = items[4]
                        score = float(items[5])
                        #confidence = float(items[6])
                        refSegStartID = int(items[7])
                        refSegEndID = int(items[8])
                        molSegStartID = int(items[9])
                        molSegEndID = int(items[10])
                        #refStartCoord = int(items[11])
                        #refEndCoord = int(items[12])
                        cigar = items[13]

                        mylist = []
                        start = 0
                        i = 0 
                        while  i <len(cigar):
                            try: 
                                int(cigar[i])
                            except ValueError:
                                if cigar[i] == 'M':
                                    match = int(cigar[start:i])
                                    mylist.append((match, 'M'))
                                if cigar[i] == 'D':
                                    deletion = int(cigar[start:i])
                                    mylist.append((deletion, 'D'))
                                if cigar[i] == 'I':
                                    insertion = int(cigar[start:i])
                                    mylist.append((insertion, 'I'))
                                start = i + 1
                            i += 1

                        if strand == "+":
                            for i in range(len(mylist)):
                                if mylist[i][1] == 'M':
                                    j = 0
                                    while j < int(mylist[i][0]):
                                        try:
                                            refPos = refCoords['%s-%s'%(refID, refSegStartID+j)]
                                            molPos = qryCoords['%s-%s'%(molID, molSegStartID+j)]
                                            if molSegStartID +j <= molSegEndID+1:
                                                parsed = np.append(parsed,[[refID, refSegStartID+j, refPos, molID, molSegStartID+j, molPos, strand, score]], axis = 0)
                                        except KeyError:
                                            pass
                                        j += 1
                                    refSegStartID += j
                                    molSegStartID += j

                                if mylist[i][1] == 'D':
                                    refSegStartID +=  int(mylist[i][0])
                                if mylist[i][1] == 'I':
                                    molSegStartID += int(mylist[i][0])                
                        if strand == "-":
                            molSegStartID += 1
                            i = 0
                            while i < len(mylist):
                                if mylist[i][1] == 'M':
                                    j = 0
                                    while j < int(mylist[i][0]):
                                        try:
                                            refPos = refCoords['%s-%s'%(refID, refSegStartID+j)]
                                            molPos = qryCoords['%s-%s'%(molID, molSegStartID-j)]
                                            if refSegStartID <= refSegEndID+1:
                                                parsed = np.append(parsed, [[refID, refSegStartID+j, refPos, molID, molSegStartID-j, molPos, strand, score]], axis = 0) 
                                        except KeyError:
                                            pass
                                        j += 1
                                    refSegStartID += j
                                    molSegStartID -= j     
                                                                        
                                if mylist[i][1] == 'D':
                                    refSegStartID += int(mylist[i][0])
                                if mylist[i][1] == 'I':
                                    molSegStartID -=  int(mylist[i][0]) 
                                i += 1
                    except IndexError:
                        pass
        self.parsed = pd.DataFrame(parsed, columns=['RefID', 'RefSiteID', 'RefSitePos', 'QryID', 'QrySiteID', 'QrySitePos', 'Strand', 'Score'])
        #self.parsed.to_csv("%s/%s.parsed.txt" % (self.outDir, self.prefix), sep = '\t', index = False, header = True)
        logging.info("Completing the parsing ...")

    def getCov(self):
        logging.info("Writing the coverage information to '%s.cov.txt'"% self.prefix)
        with open('%s/%s.cov.txt' % (self.outDir, self.prefix), 'w') as cov:
            cov.write('RefID\tRefSiteID\tRefSitePos\tCoverage\n')
            tmp = self.parsed[self.parsed['RefID'] == self.chr].reset_index(drop=True)
            if len(tmp) == 0:
                logging.error("Oops! It seem the input chrID: '%s' is not correct! Please check!" % self.chr)
                sys.exit(-1)
            if self.end < self.start:
                logging.error("Oops! It seem the end postion is smaller the start position! Please check!")
                sys.exit(-1)                
            sortTmp = tmp.sort_values(['RefSitePos']).reset_index(drop=True)
            sortTmp = sortTmp.astype({'RefSitePos': 'int32'})
            target = sortTmp[(sortTmp['RefSitePos']<=self.end) & (sortTmp['RefSitePos']>=self.start)]
            for i in target['RefSiteID'].unique():
                sub = target[target['RefSiteID'] == i].reset_index(drop=True)
                coverage = len(sub)
                pos = sub['RefSitePos'][0]
                cov.write('%s\t%s\t%s\t%s\n' % (self.chr, i, pos, coverage))


if __name__=="__main__":
    """
    Parse the function and usage.
    """
    parser = argparse.ArgumentParser(description='Parse the OM alignments from OMBlast')
    parser.add_argument('-v', '--version', action='version', version='1.0')
    parser.add_argument('-O', dest='oma', help='OMA file from OMBlast', type = str)
    parser.add_argument('-r', dest='refCMAP', help='reference cmap file', type = str) 
    parser.add_argument('-c', dest='chr', help='the chromosome ID', type = str)   
    parser.add_argument('-s', dest='start', help='start position', default = 0, type=int)
    parser.add_argument('-e', dest='end', help='end position', default = 0, type=int)
    parser.add_argument('-p', dest='prefix', help='prefix of the output', type=str)
    parser.add_argument('-o', dest='outDir', help='output directory', type=str)
    args = parser.parse_args()
    if None not in [args.oma, args.refCMAP, args.chr, args.start, args.end, args.prefix, args.outDir]:            
        run = omCov(args.oma, args.refCMAP, args.chr, args.start, args.end, args.prefix, args.outDir)
        run.checkDir()
        run.checkFile(args.oma)
        run.checkFile(args.refCMAP)
        run.parseAlignment()
        run.getCov()
    else:
        print
        parser.print_help()
        print
        sys.exit(-1)
@yuxuanyuan yuxuanyuan changed the title Check the OM coverage afte alignment Check the OM coverage after alignment Apr 22, 2020
@yuxuanyuan yuxuanyuan reopened this Apr 22, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant