Mercurial > repos > melissacline > ucsc_cancer_utilities
diff vcfToMutationVector.py @ 0:60efb9214eaa
Uploaded
author | melissacline |
---|---|
date | Wed, 14 Jan 2015 13:54:03 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/vcfToMutationVector.py Wed Jan 14 13:54:03 2015 -0500 @@ -0,0 +1,142 @@ +#!/usr/bin/env python + +"""A mutationVector generator for the UCSC Xena platform + +This script takes a VCF file generated by snpEff. +Its output is a mutationVector object that +represents the data in the language of the Xena platform. + +Required Arguments: +snpEff output VCF filename: the output of snpEFF for the same VCF file + +Optional Arguments: +- trinity: TRUE if this is a trinity-formatted VCF. This concerns how to +get the sample name. +""" + +import argparse +import re +import string +import vcf + +impact = {"MODIFIER":0, "LOW":1, "MODERATE":2, "HIGH":3} + + +def _parseSnpEffColumnLabels(vcfReader): + """Given a line of format + ##INFO=<ID=EFF,Number=.,Type=String,Description="Predicted + effects for this variant.Format: 'Effect ( Effect_Impact | + Functional_Class | Codon_Change | Amino_Acid_Change| + Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | + Transcript_ID | Exon_Rank | Genotype_Number [ | ERRORS | WARNINGS + ] )'"> + + which is translated by the VCF reader to the following: + + 'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon | GenotypeNum [ | ERRORS | WARNINGS ] )' + + Parse out the ordered list of tokens as they will appear: + ['Functional_Class', 'Codon_Change', 'Amino_Acid_Change', + 'Amino_Acid_length', 'Gene_Name', 'Transcript_BioType', + 'Gene_Coding', 'Transcript_ID', 'Exon_Rank', 'Genotype_Number' + 'ERRORS']""" + snpEffHeaderLabel = vcfReader.infos["EFF"].desc.split("\'")[1] + snpEffHeaderLabel = re.sub("[\[\] ]", "", snpEffHeaderLabel) + snpEffHeaderLabel = re.sub("ERRORS[|]WARNINGS", "ERRORS", + snpEffHeaderLabel) + snpEffLabelTokens = re.split("[(|)]", snpEffHeaderLabel)[1:-1] + return(snpEffLabelTokens) + + +#def _parseEffectsPerGene(effectString, columnLabels, GT_code): +def parseEffectsPerGene(effectString, columnLabels): + if effectString =="": + return {} + effectPerGene = dict() + effects = effectString.split(",") + for thisEffect in effects: + effectType = thisEffect.split("(")[0] + # + # Given a string such as + # downstream_gene_variant(MODIFIER||3956||459|CA9|||NM_001216.2||1) + # extract the stuff between the parens, divide it by |, and store + # it in a dictionary indexed by the column labels given as input + # + effectTokens = re.sub(".+\(", "", + re.sub("\)", "", thisEffect)).split("|") + effect = dict() + for ii in range(0,len(effectTokens)): + effect[columnLabels[ii]] = effectTokens[ii] + #match GT_code + #if GT_code and GT_code != int(effect["Genotype_Number"]): + # continue + effect["effect"] = effectType + # + # Parse through the list of effects. Extract the gene. + # Save one effect per gene, choosing an arbitrary effect + # from the most severe effect class. + # + thisGene = effect["Gene_Name"] + if not effectPerGene.has_key(thisGene): + effectPerGene[thisGene] = effect + else: + impactThisEffect = effect["Effect_Impact"] + worstImpactYet = effectPerGene[thisGene]["Effect_Impact"] + if impact[impactThisEffect] > impact[worstImpactYet]: + effectPerGene[thisGene] = effect + elif impact[impactThisEffect] == impact[worstImpactYet]: + if effect["Amino_Acid_length"] > effectPerGene[thisGene]["Amino_Acid_length"]: + effectPerGene[thisGene] = effect + return(effectPerGene) + + + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("snpEffVcf", type=str, help="VCF file from snpEff") + parser.add_argument("--trinity", type=int, default=False, + help="""True if this VCF represents Trinity output, + and if the sample name is then stored in + the info field under samples[name]""") + args = parser.parse_args() + print string.join(["#sample", "chr", "start", "end", "reference", + "alt", "gene", "effect", "DNA_VAF", "RNA_VAF", + "Amino_Acid_Change"], "\t") + vcfReader = vcf.VCFReader(open(args.snpEffVcf, "r")) + snpEffHeaderColumnLabels = _parseSnpEffColumnLabels(vcfReader) + for record in vcfReader: + if record.ALT =="NA": + continue ######## bad calls in the VCF + sampleName = "Unknown" + if args.trinity: + sampleName = record.samples[0]['name'] + strand = record.ID + start = record.POS + end = int(record.POS) + len(record.REF) - 1 + snpEffEffectsString = record.INFO["EFF"] + effectPerGene = parseEffectsPerGene(snpEffEffectsString, + snpEffHeaderColumnLabels) + if len(effectPerGene)!=0: + for gene in effectPerGene.keys(): + aaChange = effectPerGene[gene]["Amino_Acid_change"] + if aaChange !="": + aaChange="p."+aaChange + print string.join([sampleName, record.CHROM, str(start), + str(end), + record.REF, record.ALT[0], + gene, effectPerGene[gene]['effect'], + "0.0", str(record.INFO["AF"][0]), aaChange], "\t") + else: + print string.join([sampleName, record.CHROM, str(start), str(end), + record.REF, record.ALT[0], "", "", + "0.0", str(record.INFO["AF"][0]), ""], "\t") + + + + +if __name__ == '__main__': + main() + + +