view vcfToMutationVector.py @ 37:e81019e3ac99

Updated synapseGetDataset to look at the filename rather than the (no longer existant) content type field to determine if the data is in zip format
author melissacline
date Mon, 27 Jul 2015 16:29:24 -0700
parents 60efb9214eaa
children
line wrap: on
line source

#!/usr/bin/env python

"""A mutationVector generator for the UCSC Xena platform

This script takes a VCF file generated by snpEff.
Its output is a mutationVector object that
represents the data in the language of the Xena platform.

Required Arguments: 
snpEff output VCF filename: the output of snpEFF for the same VCF file

Optional Arguments:
- trinity: TRUE if this is a trinity-formatted VCF.  This concerns how to 
get the sample name.
"""

import argparse
import re
import string
import vcf

impact = {"MODIFIER":0, "LOW":1, "MODERATE":2, "HIGH":3}


def _parseSnpEffColumnLabels(vcfReader):
    """Given a line of format
    ##INFO=<ID=EFF,Number=.,Type=String,Description="Predicted
    effects for this variant.Format: 'Effect ( Effect_Impact |
    Functional_Class | Codon_Change | Amino_Acid_Change|
    Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding |
    Transcript_ID | Exon_Rank | Genotype_Number [ | ERRORS | WARNINGS
    ] )'">
    
    which is translated by the VCF reader to the following:
    
    'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon  | GenotypeNum [ | ERRORS | WARNINGS ] )'
    
    Parse out the ordered list of tokens as they will appear:       
    ['Functional_Class', 'Codon_Change', 'Amino_Acid_Change',       
    'Amino_Acid_length', 'Gene_Name', 'Transcript_BioType',         
    'Gene_Coding', 'Transcript_ID', 'Exon_Rank', 'Genotype_Number'  
    'ERRORS']"""
    snpEffHeaderLabel = vcfReader.infos["EFF"].desc.split("\'")[1]
    snpEffHeaderLabel = re.sub("[\[\] ]", "", snpEffHeaderLabel)
    snpEffHeaderLabel = re.sub("ERRORS[|]WARNINGS", "ERRORS",
                               snpEffHeaderLabel)
    snpEffLabelTokens = re.split("[(|)]", snpEffHeaderLabel)[1:-1]
    return(snpEffLabelTokens)


#def _parseEffectsPerGene(effectString, columnLabels, GT_code):
def parseEffectsPerGene(effectString, columnLabels):
    if effectString =="":
        return {}
    effectPerGene = dict()
    effects = effectString.split(",")
    for thisEffect in effects:
        effectType = thisEffect.split("(")[0]
        #                                                               
        # Given a string such as                                     
        # downstream_gene_variant(MODIFIER||3956||459|CA9|||NM_001216.2||1)
        # extract the stuff between the parens, divide it by |, and store  
        # it in a dictionary indexed by the column labels given as input   
        #                                                                 
        effectTokens = re.sub(".+\(", "",
                              re.sub("\)", "", thisEffect)).split("|")
        effect = dict()
        for ii in range(0,len(effectTokens)):
            effect[columnLabels[ii]] = effectTokens[ii]
        #match GT_code                             
        #if GT_code and GT_code != int(effect["Genotype_Number"]):
        #    continue
        effect["effect"] = effectType
        #                                                 
        # Parse through the list of effects.  Extract the gene.          
        # Save one effect per gene, choosing an arbitrary effect     
        # from the most severe effect class.                          
        #                                     
        thisGene = effect["Gene_Name"]
        if not effectPerGene.has_key(thisGene):
            effectPerGene[thisGene] = effect
        else:
            impactThisEffect = effect["Effect_Impact"]
            worstImpactYet = effectPerGene[thisGene]["Effect_Impact"]
            if impact[impactThisEffect] > impact[worstImpactYet]:
                effectPerGene[thisGene] = effect
            elif impact[impactThisEffect] == impact[worstImpactYet]:
                if effect["Amino_Acid_length"] > effectPerGene[thisGene]["Amino_Acid_length"]:
                    effectPerGene[thisGene] = effect
    return(effectPerGene)




def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("snpEffVcf", type=str, help="VCF file from snpEff")
    parser.add_argument("--trinity", type=int, default=False, 
                        help="""True if this VCF represents Trinity output,
                                and if the sample name is then stored in 
                                the info field under samples[name]""")
    args = parser.parse_args()
    print string.join(["#sample", "chr", "start", "end", "reference", 
                       "alt", "gene", "effect", "DNA_VAF", "RNA_VAF",
                       "Amino_Acid_Change"], "\t")
    vcfReader = vcf.VCFReader(open(args.snpEffVcf, "r"))
    snpEffHeaderColumnLabels = _parseSnpEffColumnLabels(vcfReader)
    for record in vcfReader:
        if record.ALT =="NA":
            continue ######## bad calls in the VCF     
        sampleName = "Unknown"
        if args.trinity:
            sampleName = record.samples[0]['name']
        strand = record.ID
        start = record.POS
        end = int(record.POS) + len(record.REF) - 1
        snpEffEffectsString = record.INFO["EFF"]
        effectPerGene = parseEffectsPerGene(snpEffEffectsString, 
                                            snpEffHeaderColumnLabels)
        if len(effectPerGene)!=0:
            for gene in effectPerGene.keys():
                aaChange = effectPerGene[gene]["Amino_Acid_change"]
                if aaChange !="":
                    aaChange="p."+aaChange
                print string.join([sampleName, record.CHROM, str(start), 
                                   str(end), 
                                   record.REF, record.ALT[0], 
                                   gene, effectPerGene[gene]['effect'],
                                   "0.0", str(record.INFO["AF"][0]), aaChange], "\t")
        else:
            print string.join([sampleName, record.CHROM, str(start), str(end), 
                               record.REF, record.ALT[0], "", "",
                               "0.0", str(record.INFO["AF"][0]), ""], "\t")




if __name__ == '__main__':
        main()