Mercurial > repos > melissacline > ucsc_cancer_utilities
view vcfToMutationVector.py @ 30:7a7a52e9b019
Rolling back synapseClient to version 1.0
author | melissacline |
---|---|
date | Wed, 22 Jul 2015 17:04:25 -0700 |
parents | 60efb9214eaa |
children |
line wrap: on
line source
#!/usr/bin/env python """A mutationVector generator for the UCSC Xena platform This script takes a VCF file generated by snpEff. Its output is a mutationVector object that represents the data in the language of the Xena platform. Required Arguments: snpEff output VCF filename: the output of snpEFF for the same VCF file Optional Arguments: - trinity: TRUE if this is a trinity-formatted VCF. This concerns how to get the sample name. """ import argparse import re import string import vcf impact = {"MODIFIER":0, "LOW":1, "MODERATE":2, "HIGH":3} def _parseSnpEffColumnLabels(vcfReader): """Given a line of format ##INFO=<ID=EFF,Number=.,Type=String,Description="Predicted effects for this variant.Format: 'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_Change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon_Rank | Genotype_Number [ | ERRORS | WARNINGS ] )'"> which is translated by the VCF reader to the following: 'Effect ( Effect_Impact | Functional_Class | Codon_Change | Amino_Acid_change| Amino_Acid_length | Gene_Name | Transcript_BioType | Gene_Coding | Transcript_ID | Exon | GenotypeNum [ | ERRORS | WARNINGS ] )' Parse out the ordered list of tokens as they will appear: ['Functional_Class', 'Codon_Change', 'Amino_Acid_Change', 'Amino_Acid_length', 'Gene_Name', 'Transcript_BioType', 'Gene_Coding', 'Transcript_ID', 'Exon_Rank', 'Genotype_Number' 'ERRORS']""" snpEffHeaderLabel = vcfReader.infos["EFF"].desc.split("\'")[1] snpEffHeaderLabel = re.sub("[\[\] ]", "", snpEffHeaderLabel) snpEffHeaderLabel = re.sub("ERRORS[|]WARNINGS", "ERRORS", snpEffHeaderLabel) snpEffLabelTokens = re.split("[(|)]", snpEffHeaderLabel)[1:-1] return(snpEffLabelTokens) #def _parseEffectsPerGene(effectString, columnLabels, GT_code): def parseEffectsPerGene(effectString, columnLabels): if effectString =="": return {} effectPerGene = dict() effects = effectString.split(",") for thisEffect in effects: effectType = thisEffect.split("(")[0] # # Given a string such as # downstream_gene_variant(MODIFIER||3956||459|CA9|||NM_001216.2||1) # extract the stuff between the parens, divide it by |, and store # it in a dictionary indexed by the column labels given as input # effectTokens = re.sub(".+\(", "", re.sub("\)", "", thisEffect)).split("|") effect = dict() for ii in range(0,len(effectTokens)): effect[columnLabels[ii]] = effectTokens[ii] #match GT_code #if GT_code and GT_code != int(effect["Genotype_Number"]): # continue effect["effect"] = effectType # # Parse through the list of effects. Extract the gene. # Save one effect per gene, choosing an arbitrary effect # from the most severe effect class. # thisGene = effect["Gene_Name"] if not effectPerGene.has_key(thisGene): effectPerGene[thisGene] = effect else: impactThisEffect = effect["Effect_Impact"] worstImpactYet = effectPerGene[thisGene]["Effect_Impact"] if impact[impactThisEffect] > impact[worstImpactYet]: effectPerGene[thisGene] = effect elif impact[impactThisEffect] == impact[worstImpactYet]: if effect["Amino_Acid_length"] > effectPerGene[thisGene]["Amino_Acid_length"]: effectPerGene[thisGene] = effect return(effectPerGene) def main(): parser = argparse.ArgumentParser() parser.add_argument("snpEffVcf", type=str, help="VCF file from snpEff") parser.add_argument("--trinity", type=int, default=False, help="""True if this VCF represents Trinity output, and if the sample name is then stored in the info field under samples[name]""") args = parser.parse_args() print string.join(["#sample", "chr", "start", "end", "reference", "alt", "gene", "effect", "DNA_VAF", "RNA_VAF", "Amino_Acid_Change"], "\t") vcfReader = vcf.VCFReader(open(args.snpEffVcf, "r")) snpEffHeaderColumnLabels = _parseSnpEffColumnLabels(vcfReader) for record in vcfReader: if record.ALT =="NA": continue ######## bad calls in the VCF sampleName = "Unknown" if args.trinity: sampleName = record.samples[0]['name'] strand = record.ID start = record.POS end = int(record.POS) + len(record.REF) - 1 snpEffEffectsString = record.INFO["EFF"] effectPerGene = parseEffectsPerGene(snpEffEffectsString, snpEffHeaderColumnLabels) if len(effectPerGene)!=0: for gene in effectPerGene.keys(): aaChange = effectPerGene[gene]["Amino_Acid_change"] if aaChange !="": aaChange="p."+aaChange print string.join([sampleName, record.CHROM, str(start), str(end), record.REF, record.ALT[0], gene, effectPerGene[gene]['effect'], "0.0", str(record.INFO["AF"][0]), aaChange], "\t") else: print string.join([sampleName, record.CHROM, str(start), str(end), record.REF, record.ALT[0], "", "", "0.0", str(record.INFO["AF"][0]), ""], "\t") if __name__ == '__main__': main()