view vcfToMutationVector.py @ 7:1d150e860c4d

Expanded the functionality of the merge genomic datasets tool, to generate an output dataset with the file (or label) indicating where each column came from
author melissacline
date Mon, 09 Mar 2015 19:58:03 -0700
parents 60efb9214eaa
children
line wrap: on
line source

#!/usr/bin/env python

"""A mutationVector generator for the UCSC Xena platform

This script takes a VCF file generated by snpEff.
Its output is a mutationVector object that
represents the data in the language of the Xena platform.

Required Arguments: 
snpEff output VCF filename: the output of snpEFF for the same VCF file

Optional Arguments:
- trinity: TRUE if this is a trinity-formatted VCF.  This concerns how to 
get the sample name.
"""

import argparse
import re
import string
import vcf

impact = {"MODIFIER":0, "LOW":1, "MODERATE":2, "HIGH":3}


def _parseSnpEffColumnLabels(vcfReader):
    """Given a line of format
    ##INFO=<ID=EFF,Number=.,Type=String,Description="Predicted
    effects for this variant.Format: 'Effect ( Effect_Impact |
    Functional_Class | Codon_Change | Amino_Acid_Change|
    Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding |
    Transcript_ID | Exon_Rank | Genotype_Number [ | ERRORS | WARNINGS
    ] )'">
    
    which is translated by the VCF reader to the following:
    
    'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon  | GenotypeNum [ | ERRORS | WARNINGS ] )'
    
    Parse out the ordered list of tokens as they will appear:       
    ['Functional_Class', 'Codon_Change', 'Amino_Acid_Change',       
    'Amino_Acid_length', 'Gene_Name', 'Transcript_BioType',         
    'Gene_Coding', 'Transcript_ID', 'Exon_Rank', 'Genotype_Number'  
    'ERRORS']"""
    snpEffHeaderLabel = vcfReader.infos["EFF"].desc.split("\'")[1]
    snpEffHeaderLabel = re.sub("[\[\] ]", "", snpEffHeaderLabel)
    snpEffHeaderLabel = re.sub("ERRORS[|]WARNINGS", "ERRORS",
                               snpEffHeaderLabel)
    snpEffLabelTokens = re.split("[(|)]", snpEffHeaderLabel)[1:-1]
    return(snpEffLabelTokens)


#def _parseEffectsPerGene(effectString, columnLabels, GT_code):
def parseEffectsPerGene(effectString, columnLabels):
    if effectString =="":
        return {}
    effectPerGene = dict()
    effects = effectString.split(",")
    for thisEffect in effects:
        effectType = thisEffect.split("(")[0]
        #                                                               
        # Given a string such as                                     
        # downstream_gene_variant(MODIFIER||3956||459|CA9|||NM_001216.2||1)
        # extract the stuff between the parens, divide it by |, and store  
        # it in a dictionary indexed by the column labels given as input   
        #                                                                 
        effectTokens = re.sub(".+\(", "",
                              re.sub("\)", "", thisEffect)).split("|")
        effect = dict()
        for ii in range(0,len(effectTokens)):
            effect[columnLabels[ii]] = effectTokens[ii]
        #match GT_code                             
        #if GT_code and GT_code != int(effect["Genotype_Number"]):
        #    continue
        effect["effect"] = effectType
        #                                                 
        # Parse through the list of effects.  Extract the gene.          
        # Save one effect per gene, choosing an arbitrary effect     
        # from the most severe effect class.                          
        #                                     
        thisGene = effect["Gene_Name"]
        if not effectPerGene.has_key(thisGene):
            effectPerGene[thisGene] = effect
        else:
            impactThisEffect = effect["Effect_Impact"]
            worstImpactYet = effectPerGene[thisGene]["Effect_Impact"]
            if impact[impactThisEffect] > impact[worstImpactYet]:
                effectPerGene[thisGene] = effect
            elif impact[impactThisEffect] == impact[worstImpactYet]:
                if effect["Amino_Acid_length"] > effectPerGene[thisGene]["Amino_Acid_length"]:
                    effectPerGene[thisGene] = effect
    return(effectPerGene)




def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("snpEffVcf", type=str, help="VCF file from snpEff")
    parser.add_argument("--trinity", type=int, default=False, 
                        help="""True if this VCF represents Trinity output,
                                and if the sample name is then stored in 
                                the info field under samples[name]""")
    args = parser.parse_args()
    print string.join(["#sample", "chr", "start", "end", "reference", 
                       "alt", "gene", "effect", "DNA_VAF", "RNA_VAF",
                       "Amino_Acid_Change"], "\t")
    vcfReader = vcf.VCFReader(open(args.snpEffVcf, "r"))
    snpEffHeaderColumnLabels = _parseSnpEffColumnLabels(vcfReader)
    for record in vcfReader:
        if record.ALT =="NA":
            continue ######## bad calls in the VCF     
        sampleName = "Unknown"
        if args.trinity:
            sampleName = record.samples[0]['name']
        strand = record.ID
        start = record.POS
        end = int(record.POS) + len(record.REF) - 1
        snpEffEffectsString = record.INFO["EFF"]
        effectPerGene = parseEffectsPerGene(snpEffEffectsString, 
                                            snpEffHeaderColumnLabels)
        if len(effectPerGene)!=0:
            for gene in effectPerGene.keys():
                aaChange = effectPerGene[gene]["Amino_Acid_change"]
                if aaChange !="":
                    aaChange="p."+aaChange
                print string.join([sampleName, record.CHROM, str(start), 
                                   str(end), 
                                   record.REF, record.ALT[0], 
                                   gene, effectPerGene[gene]['effect'],
                                   "0.0", str(record.INFO["AF"][0]), aaChange], "\t")
        else:
            print string.join([sampleName, record.CHROM, str(start), str(end), 
                               record.REF, record.ALT[0], "", "",
                               "0.0", str(record.INFO["AF"][0]), ""], "\t")




if __name__ == '__main__':
        main()