diff vcfToMutationVector.py @ 0:60efb9214eaa

Uploaded
author melissacline
date Wed, 14 Jan 2015 13:54:03 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/vcfToMutationVector.py	Wed Jan 14 13:54:03 2015 -0500
@@ -0,0 +1,142 @@
+#!/usr/bin/env python
+
+"""A mutationVector generator for the UCSC Xena platform
+
+This script takes a VCF file generated by snpEff.
+Its output is a mutationVector object that
+represents the data in the language of the Xena platform.
+
+Required Arguments: 
+snpEff output VCF filename: the output of snpEFF for the same VCF file
+
+Optional Arguments:
+- trinity: TRUE if this is a trinity-formatted VCF.  This concerns how to 
+get the sample name.
+"""
+
+import argparse
+import re
+import string
+import vcf
+
+impact = {"MODIFIER":0, "LOW":1, "MODERATE":2, "HIGH":3}
+
+
+def _parseSnpEffColumnLabels(vcfReader):
+    """Given a line of format
+    ##INFO=<ID=EFF,Number=.,Type=String,Description="Predicted
+    effects for this variant.Format: 'Effect ( Effect_Impact |
+    Functional_Class | Codon_Change | Amino_Acid_Change|
+    Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding |
+    Transcript_ID | Exon_Rank | Genotype_Number [ | ERRORS | WARNINGS
+    ] )'">
+    
+    which is translated by the VCF reader to the following:
+    
+    'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon  | GenotypeNum [ | ERRORS | WARNINGS ] )'
+    
+    Parse out the ordered list of tokens as they will appear:       
+    ['Functional_Class', 'Codon_Change', 'Amino_Acid_Change',       
+    'Amino_Acid_length', 'Gene_Name', 'Transcript_BioType',         
+    'Gene_Coding', 'Transcript_ID', 'Exon_Rank', 'Genotype_Number'  
+    'ERRORS']"""
+    snpEffHeaderLabel = vcfReader.infos["EFF"].desc.split("\'")[1]
+    snpEffHeaderLabel = re.sub("[\[\] ]", "", snpEffHeaderLabel)
+    snpEffHeaderLabel = re.sub("ERRORS[|]WARNINGS", "ERRORS",
+                               snpEffHeaderLabel)
+    snpEffLabelTokens = re.split("[(|)]", snpEffHeaderLabel)[1:-1]
+    return(snpEffLabelTokens)
+
+
+#def _parseEffectsPerGene(effectString, columnLabels, GT_code):
+def parseEffectsPerGene(effectString, columnLabels):
+    if effectString =="":
+        return {}
+    effectPerGene = dict()
+    effects = effectString.split(",")
+    for thisEffect in effects:
+        effectType = thisEffect.split("(")[0]
+        #                                                               
+        # Given a string such as                                     
+        # downstream_gene_variant(MODIFIER||3956||459|CA9|||NM_001216.2||1)
+        # extract the stuff between the parens, divide it by |, and store  
+        # it in a dictionary indexed by the column labels given as input   
+        #                                                                 
+        effectTokens = re.sub(".+\(", "",
+                              re.sub("\)", "", thisEffect)).split("|")
+        effect = dict()
+        for ii in range(0,len(effectTokens)):
+            effect[columnLabels[ii]] = effectTokens[ii]
+        #match GT_code                             
+        #if GT_code and GT_code != int(effect["Genotype_Number"]):
+        #    continue
+        effect["effect"] = effectType
+        #                                                 
+        # Parse through the list of effects.  Extract the gene.          
+        # Save one effect per gene, choosing an arbitrary effect     
+        # from the most severe effect class.                          
+        #                                     
+        thisGene = effect["Gene_Name"]
+        if not effectPerGene.has_key(thisGene):
+            effectPerGene[thisGene] = effect
+        else:
+            impactThisEffect = effect["Effect_Impact"]
+            worstImpactYet = effectPerGene[thisGene]["Effect_Impact"]
+            if impact[impactThisEffect] > impact[worstImpactYet]:
+                effectPerGene[thisGene] = effect
+            elif impact[impactThisEffect] == impact[worstImpactYet]:
+                if effect["Amino_Acid_length"] > effectPerGene[thisGene]["Amino_Acid_length"]:
+                    effectPerGene[thisGene] = effect
+    return(effectPerGene)
+
+
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("snpEffVcf", type=str, help="VCF file from snpEff")
+    parser.add_argument("--trinity", type=int, default=False, 
+                        help="""True if this VCF represents Trinity output,
+                                and if the sample name is then stored in 
+                                the info field under samples[name]""")
+    args = parser.parse_args()
+    print string.join(["#sample", "chr", "start", "end", "reference", 
+                       "alt", "gene", "effect", "DNA_VAF", "RNA_VAF",
+                       "Amino_Acid_Change"], "\t")
+    vcfReader = vcf.VCFReader(open(args.snpEffVcf, "r"))
+    snpEffHeaderColumnLabels = _parseSnpEffColumnLabels(vcfReader)
+    for record in vcfReader:
+        if record.ALT =="NA":
+            continue ######## bad calls in the VCF     
+        sampleName = "Unknown"
+        if args.trinity:
+            sampleName = record.samples[0]['name']
+        strand = record.ID
+        start = record.POS
+        end = int(record.POS) + len(record.REF) - 1
+        snpEffEffectsString = record.INFO["EFF"]
+        effectPerGene = parseEffectsPerGene(snpEffEffectsString, 
+                                            snpEffHeaderColumnLabels)
+        if len(effectPerGene)!=0:
+            for gene in effectPerGene.keys():
+                aaChange = effectPerGene[gene]["Amino_Acid_change"]
+                if aaChange !="":
+                    aaChange="p."+aaChange
+                print string.join([sampleName, record.CHROM, str(start), 
+                                   str(end), 
+                                   record.REF, record.ALT[0], 
+                                   gene, effectPerGene[gene]['effect'],
+                                   "0.0", str(record.INFO["AF"][0]), aaChange], "\t")
+        else:
+            print string.join([sampleName, record.CHROM, str(start), str(end), 
+                               record.REF, record.ALT[0], "", "",
+                               "0.0", str(record.INFO["AF"][0]), ""], "\t")
+
+
+
+
+if __name__ == '__main__':
+        main()
+
+
+