view vcfToMutationVector.py @ 29:d776b829f83f

Rolling the synapseClient dependency back to version 1.1
author melissacline
date Wed, 22 Jul 2015 16:59:35 -0700
parents 60efb9214eaa
children
line wrap: on
line source

#!/usr/bin/env python

"""A mutationVector generator for the UCSC Xena platform

This script takes a VCF file generated by snpEff.
Its output is a mutationVector object that
represents the data in the language of the Xena platform.

Required Arguments: 
snpEff output VCF filename: the output of snpEFF for the same VCF file

Optional Arguments:
- trinity: TRUE if this is a trinity-formatted VCF.  This concerns how to 
get the sample name.
"""

import argparse
import re
import string
import vcf

impact = {"MODIFIER":0, "LOW":1, "MODERATE":2, "HIGH":3}


def _parseSnpEffColumnLabels(vcfReader):
    """Given a line of format
    ##INFO=<ID=EFF,Number=.,Type=String,Description="Predicted
    effects for this variant.Format: 'Effect ( Effect_Impact |
    Functional_Class | Codon_Change | Amino_Acid_Change|
    Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding |
    Transcript_ID | Exon_Rank | Genotype_Number [ | ERRORS | WARNINGS
    ] )'">
    
    which is translated by the VCF reader to the following:
    
    'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon  | GenotypeNum [ | ERRORS | WARNINGS ] )'
    
    Parse out the ordered list of tokens as they will appear:       
    ['Functional_Class', 'Codon_Change', 'Amino_Acid_Change',       
    'Amino_Acid_length', 'Gene_Name', 'Transcript_BioType',         
    'Gene_Coding', 'Transcript_ID', 'Exon_Rank', 'Genotype_Number'  
    'ERRORS']"""
    snpEffHeaderLabel = vcfReader.infos["EFF"].desc.split("\'")[1]
    snpEffHeaderLabel = re.sub("[\[\] ]", "", snpEffHeaderLabel)
    snpEffHeaderLabel = re.sub("ERRORS[|]WARNINGS", "ERRORS",
                               snpEffHeaderLabel)
    snpEffLabelTokens = re.split("[(|)]", snpEffHeaderLabel)[1:-1]
    return(snpEffLabelTokens)


#def _parseEffectsPerGene(effectString, columnLabels, GT_code):
def parseEffectsPerGene(effectString, columnLabels):
    if effectString =="":
        return {}
    effectPerGene = dict()
    effects = effectString.split(",")
    for thisEffect in effects:
        effectType = thisEffect.split("(")[0]
        #                                                               
        # Given a string such as                                     
        # downstream_gene_variant(MODIFIER||3956||459|CA9|||NM_001216.2||1)
        # extract the stuff between the parens, divide it by |, and store  
        # it in a dictionary indexed by the column labels given as input   
        #                                                                 
        effectTokens = re.sub(".+\(", "",
                              re.sub("\)", "", thisEffect)).split("|")
        effect = dict()
        for ii in range(0,len(effectTokens)):
            effect[columnLabels[ii]] = effectTokens[ii]
        #match GT_code                             
        #if GT_code and GT_code != int(effect["Genotype_Number"]):
        #    continue
        effect["effect"] = effectType
        #                                                 
        # Parse through the list of effects.  Extract the gene.          
        # Save one effect per gene, choosing an arbitrary effect     
        # from the most severe effect class.                          
        #                                     
        thisGene = effect["Gene_Name"]
        if not effectPerGene.has_key(thisGene):
            effectPerGene[thisGene] = effect
        else:
            impactThisEffect = effect["Effect_Impact"]
            worstImpactYet = effectPerGene[thisGene]["Effect_Impact"]
            if impact[impactThisEffect] > impact[worstImpactYet]:
                effectPerGene[thisGene] = effect
            elif impact[impactThisEffect] == impact[worstImpactYet]:
                if effect["Amino_Acid_length"] > effectPerGene[thisGene]["Amino_Acid_length"]:
                    effectPerGene[thisGene] = effect
    return(effectPerGene)




def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("snpEffVcf", type=str, help="VCF file from snpEff")
    parser.add_argument("--trinity", type=int, default=False, 
                        help="""True if this VCF represents Trinity output,
                                and if the sample name is then stored in 
                                the info field under samples[name]""")
    args = parser.parse_args()
    print string.join(["#sample", "chr", "start", "end", "reference", 
                       "alt", "gene", "effect", "DNA_VAF", "RNA_VAF",
                       "Amino_Acid_Change"], "\t")
    vcfReader = vcf.VCFReader(open(args.snpEffVcf, "r"))
    snpEffHeaderColumnLabels = _parseSnpEffColumnLabels(vcfReader)
    for record in vcfReader:
        if record.ALT =="NA":
            continue ######## bad calls in the VCF     
        sampleName = "Unknown"
        if args.trinity:
            sampleName = record.samples[0]['name']
        strand = record.ID
        start = record.POS
        end = int(record.POS) + len(record.REF) - 1
        snpEffEffectsString = record.INFO["EFF"]
        effectPerGene = parseEffectsPerGene(snpEffEffectsString, 
                                            snpEffHeaderColumnLabels)
        if len(effectPerGene)!=0:
            for gene in effectPerGene.keys():
                aaChange = effectPerGene[gene]["Amino_Acid_change"]
                if aaChange !="":
                    aaChange="p."+aaChange
                print string.join([sampleName, record.CHROM, str(start), 
                                   str(end), 
                                   record.REF, record.ALT[0], 
                                   gene, effectPerGene[gene]['effect'],
                                   "0.0", str(record.INFO["AF"][0]), aaChange], "\t")
        else:
            print string.join([sampleName, record.CHROM, str(start), str(end), 
                               record.REF, record.ALT[0], "", "",
                               "0.0", str(record.INFO["AF"][0]), ""], "\t")




if __name__ == '__main__':
        main()