0
|
1 #!/usr/bin/env python
|
|
2
|
|
3 """A mutationVector generator for the UCSC Xena platform
|
|
4
|
|
5 This script takes a VCF file generated by snpEff.
|
|
6 Its output is a mutationVector object that
|
|
7 represents the data in the language of the Xena platform.
|
|
8
|
|
9 Required Arguments:
|
|
10 snpEff output VCF filename: the output of snpEFF for the same VCF file
|
|
11
|
|
12 Optional Arguments:
|
|
13 - trinity: TRUE if this is a trinity-formatted VCF. This concerns how to
|
|
14 get the sample name.
|
|
15 """
|
|
16
|
|
17 import argparse
|
|
18 import re
|
|
19 import string
|
|
20 import vcf
|
|
21
|
|
22 impact = {"MODIFIER":0, "LOW":1, "MODERATE":2, "HIGH":3}
|
|
23
|
|
24
|
|
25 def _parseSnpEffColumnLabels(vcfReader):
|
|
26 """Given a line of format
|
|
27 ##INFO=<ID=EFF,Number=.,Type=String,Description="Predicted
|
|
28 effects for this variant.Format: 'Effect ( Effect_Impact |
|
|
29 Functional_Class | Codon_Change | Amino_Acid_Change|
|
|
30 Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding |
|
|
31 Transcript_ID | Exon_Rank | Genotype_Number [ | ERRORS | WARNINGS
|
|
32 ] )'">
|
|
33
|
|
34 which is translated by the VCF reader to the following:
|
|
35
|
|
36 'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon | GenotypeNum [ | ERRORS | WARNINGS ] )'
|
|
37
|
|
38 Parse out the ordered list of tokens as they will appear:
|
|
39 ['Functional_Class', 'Codon_Change', 'Amino_Acid_Change',
|
|
40 'Amino_Acid_length', 'Gene_Name', 'Transcript_BioType',
|
|
41 'Gene_Coding', 'Transcript_ID', 'Exon_Rank', 'Genotype_Number'
|
|
42 'ERRORS']"""
|
|
43 snpEffHeaderLabel = vcfReader.infos["EFF"].desc.split("\'")[1]
|
|
44 snpEffHeaderLabel = re.sub("[\[\] ]", "", snpEffHeaderLabel)
|
|
45 snpEffHeaderLabel = re.sub("ERRORS[|]WARNINGS", "ERRORS",
|
|
46 snpEffHeaderLabel)
|
|
47 snpEffLabelTokens = re.split("[(|)]", snpEffHeaderLabel)[1:-1]
|
|
48 return(snpEffLabelTokens)
|
|
49
|
|
50
|
|
51 #def _parseEffectsPerGene(effectString, columnLabels, GT_code):
|
|
52 def parseEffectsPerGene(effectString, columnLabels):
|
|
53 if effectString =="":
|
|
54 return {}
|
|
55 effectPerGene = dict()
|
|
56 effects = effectString.split(",")
|
|
57 for thisEffect in effects:
|
|
58 effectType = thisEffect.split("(")[0]
|
|
59 #
|
|
60 # Given a string such as
|
|
61 # downstream_gene_variant(MODIFIER||3956||459|CA9|||NM_001216.2||1)
|
|
62 # extract the stuff between the parens, divide it by |, and store
|
|
63 # it in a dictionary indexed by the column labels given as input
|
|
64 #
|
|
65 effectTokens = re.sub(".+\(", "",
|
|
66 re.sub("\)", "", thisEffect)).split("|")
|
|
67 effect = dict()
|
|
68 for ii in range(0,len(effectTokens)):
|
|
69 effect[columnLabels[ii]] = effectTokens[ii]
|
|
70 #match GT_code
|
|
71 #if GT_code and GT_code != int(effect["Genotype_Number"]):
|
|
72 # continue
|
|
73 effect["effect"] = effectType
|
|
74 #
|
|
75 # Parse through the list of effects. Extract the gene.
|
|
76 # Save one effect per gene, choosing an arbitrary effect
|
|
77 # from the most severe effect class.
|
|
78 #
|
|
79 thisGene = effect["Gene_Name"]
|
|
80 if not effectPerGene.has_key(thisGene):
|
|
81 effectPerGene[thisGene] = effect
|
|
82 else:
|
|
83 impactThisEffect = effect["Effect_Impact"]
|
|
84 worstImpactYet = effectPerGene[thisGene]["Effect_Impact"]
|
|
85 if impact[impactThisEffect] > impact[worstImpactYet]:
|
|
86 effectPerGene[thisGene] = effect
|
|
87 elif impact[impactThisEffect] == impact[worstImpactYet]:
|
|
88 if effect["Amino_Acid_length"] > effectPerGene[thisGene]["Amino_Acid_length"]:
|
|
89 effectPerGene[thisGene] = effect
|
|
90 return(effectPerGene)
|
|
91
|
|
92
|
|
93
|
|
94
|
|
95 def main():
|
|
96 parser = argparse.ArgumentParser()
|
|
97 parser.add_argument("snpEffVcf", type=str, help="VCF file from snpEff")
|
|
98 parser.add_argument("--trinity", type=int, default=False,
|
|
99 help="""True if this VCF represents Trinity output,
|
|
100 and if the sample name is then stored in
|
|
101 the info field under samples[name]""")
|
|
102 args = parser.parse_args()
|
|
103 print string.join(["#sample", "chr", "start", "end", "reference",
|
|
104 "alt", "gene", "effect", "DNA_VAF", "RNA_VAF",
|
|
105 "Amino_Acid_Change"], "\t")
|
|
106 vcfReader = vcf.VCFReader(open(args.snpEffVcf, "r"))
|
|
107 snpEffHeaderColumnLabels = _parseSnpEffColumnLabels(vcfReader)
|
|
108 for record in vcfReader:
|
|
109 if record.ALT =="NA":
|
|
110 continue ######## bad calls in the VCF
|
|
111 sampleName = "Unknown"
|
|
112 if args.trinity:
|
|
113 sampleName = record.samples[0]['name']
|
|
114 strand = record.ID
|
|
115 start = record.POS
|
|
116 end = int(record.POS) + len(record.REF) - 1
|
|
117 snpEffEffectsString = record.INFO["EFF"]
|
|
118 effectPerGene = parseEffectsPerGene(snpEffEffectsString,
|
|
119 snpEffHeaderColumnLabels)
|
|
120 if len(effectPerGene)!=0:
|
|
121 for gene in effectPerGene.keys():
|
|
122 aaChange = effectPerGene[gene]["Amino_Acid_change"]
|
|
123 if aaChange !="":
|
|
124 aaChange="p."+aaChange
|
|
125 print string.join([sampleName, record.CHROM, str(start),
|
|
126 str(end),
|
|
127 record.REF, record.ALT[0],
|
|
128 gene, effectPerGene[gene]['effect'],
|
|
129 "0.0", str(record.INFO["AF"][0]), aaChange], "\t")
|
|
130 else:
|
|
131 print string.join([sampleName, record.CHROM, str(start), str(end),
|
|
132 record.REF, record.ALT[0], "", "",
|
|
133 "0.0", str(record.INFO["AF"][0]), ""], "\t")
|
|
134
|
|
135
|
|
136
|
|
137
|
|
138 if __name__ == '__main__':
|
|
139 main()
|
|
140
|
|
141
|
|
142
|