Mercurial > repos > melissacline > ucsc_cancer_utilities
view vcf.py @ 15:d9c36f0aa1d3
Trying one more thing for synapse versioning
author | melissacline |
---|---|
date | Tue, 10 Mar 2015 19:35:51 -0700 |
parents | 60efb9214eaa |
children |
line wrap: on
line source
#!/usr/bin/env python '''A VCFv4.0 parser for Python. The intent of this module is to mimic the ``csv`` module in the Python stdlib, as opposed to more flexible serialization formats like JSON or YAML. ``vcf`` will attempt to parse the content of each record based on the data types specified in the meta-information lines -- specifically the ##INFO and ##FORMAT lines. If these lines are missing or incomplete, it will check against the reserved types mentioned in the spec. Failing that, it will just return strings. There is currently one piece of interface: ``VCFReader``. It takes a file-like object and acts as a reader:: >>> import vcf >>> vcf_reader = vcf.VCFReader(open('example.vcf', 'rb')) >>> for record in vcf_reader: ... print record Record(CHROM='20', POS=14370, ID='rs6054257', REF='G', ALT=['A'], QUAL=29, FILTER='PASS', INFO={'H2': True, 'NS': 3, 'DB': True, 'DP': 14, 'AF': [0.5] }, FORMAT='GT:GQ:DP:HQ', samples=[{'GT': '0', 'HQ': [58, 50], 'DP': 3, 'GQ' : 49, 'name': 'NA00001'}, {'GT': '0', 'HQ': [65, 3], 'DP': 5, 'GQ': 3, 'nam e' : 'NA00002'}, {'GT': '0', 'DP': 3, 'GQ': 41, 'name': 'NA00003'}]) This produces a great deal of information, but it is conveniently accessed. The attributes of a Record are the 8 fixed fields from the VCF spec plus two more. That is: * ``Record.CHROM`` * ``Record.POS`` * ``Record.ID`` * ``Record.REF`` * ``Record.ALT`` * ``Record.QUAL`` * ``Record.FILTER`` * ``Record.INFO`` plus two more attributes to handle genotype information: * ``Record.FORMAT`` * ``Record.samples`` ``samples``, not being the title of any column, is left lowercase. The format of the fixed fields is from the spec. Comma-separated lists in the VCF are converted to lists. In particular, one-entry VCF lists are converted to one-entry Python lists (see, e.g., ``Record.ALT``). Semicolon-delimited lists of key=value pairs are converted to Python dictionaries, with flags being given a ``True`` value. Integers and floats are handled exactly as you'd expect:: >>> record = vcf_reader.next() >>> print record.POS 17330 >>> print record.ALT ['A'] >>> print record.INFO['AF'] [0.017] ``record.FORMAT`` will be a string specifying the format of the genotype fields. In case the FORMAT column does not exist, ``record.FORMAT`` is ``None``. Finally, ``record.samples`` is a list of dictionaries containing the parsed sample column:: >>> record = vcf_reader.next() >>> for sample in record.samples: ... print sample['GT'] '1|2' '2|1' '2/2' Metadata regarding the VCF file itself can be investigated through the following attributes: * ``VCFReader.metadata`` * ``VCFReader.infos`` * ``VCFReader.filters`` * ``VCFReader.formats`` * ``VCFReader.samples`` For example:: >>> vcf_reader.metadata['fileDate'] 20090805 >>> vcf_reader.samples ['NA00001', 'NA00002', 'NA00003'] >>> vcf_reader.filters {'q10': Filter(id='q10', desc='Quality below 10'), 's50': Filter(id='s50', desc='Less than 50% of samples have data')} >>> vcf_reader.infos['AA'].desc Ancestral Allele ''' import collections import re # Metadata parsers/constants RESERVED_INFO = { 'AA': 'String', 'AC': 'Integer', 'AF': 'Float', 'AN': 'Integer', 'BQ': 'Float', 'CIGAR': 'String', 'DB': 'Flag', 'DP': 'Integer', 'END': 'Integer', 'H2': 'Flag', 'MQ': 'Float', 'MQ0': 'Integer', 'NS': 'Integer', 'SB': 'String', 'SOMATIC': 'Flag', 'VALIDATED': 'Flag' } RESERVED_FORMAT = { 'GT': 'String', 'DP': 'Integer', 'FT': 'String', 'GL': 'Float', 'GQ': 'Float', 'HQ': 'Float' } _Info = collections.namedtuple('Info', ['id', 'num', 'type', 'desc']) _Filter = collections.namedtuple('Filter', ['id', 'desc']) _Format = collections.namedtuple('Format', ['id', 'num', 'type', 'desc']) class _vcf_metadata_parser(object): '''Parse the metadat in the header of a VCF file.''' def __init__(self, aggressive=False): super(_vcf_metadata_parser, self).__init__() self.aggro = aggressive self.info_pattern = re.compile(r'''\#\#INFO=< ID=(?P<id>[^,]+), Number=(?P<number>\d+|\.|[AG]), Type=(?P<type>Integer|Float|Flag|Character|String), Description="(?P<desc>[^"]*)" >''', re.VERBOSE) self.filter_pattern = re.compile(r'''\#\#FILTER=< ID=(?P<id>[^,]+), Description="(?P<desc>[^"]*)" >''', re.VERBOSE) self.format_pattern = re.compile(r'''\#\#FORMAT=< ID=(?P<id>.+), Number=(?P<number>\d+|\.|[AG]), Type=(?P<type>.+), Description="(?P<desc>.*)" >''', re.VERBOSE) self.meta_pattern = re.compile(r'''##(?P<key>.+)=(?P<val>.+)''') def read_info(self, info_string): '''Read a meta-information INFO line.''' match = self.info_pattern.match(info_string) if not match: raise SyntaxError( "One of the INFO lines is malformed: {}".format(info_string)) try: num = int(match.group('number')) except ValueError: num = None if self.aggro else '.' info = _Info(match.group('id'), num, match.group('type'), match.group('desc')) return (match.group('id'), info) def read_filter(self, filter_string): '''Read a meta-information FILTER line.''' match = self.filter_pattern.match(filter_string) if not match: raise SyntaxError( "One of the FILTER lines is malformed: {}".format( filter_string)) filt = _Filter(match.group('id'), match.group('desc')) return (match.group('id'), filt) def read_format(self, format_string): '''Read a meta-information FORMAT line.''' match = self.format_pattern.match(format_string) if not match: raise SyntaxError( "One of the FORMAT lines is malformed: {}".format( format_string)) try: num = int(match.group('number')) except ValueError: num = None if self.aggro else '.' form = _Format(match.group('id'), num, match.group('type'), match.group('desc')) return (match.group('id'), form) def read_meta(self, meta_string): match = self.meta_pattern.match(meta_string) return match.group('key'), match.group('val') # Reader class class _meta_info(object): '''Decorator for a property stored in the header info.''' def __init__(self, func): self.func = func def __call__(self, fself): if getattr(fself, "_%s" % self.func.__name__) is None: fself._parse_metainfo() return self.func(fself) def __repr__(self): '''Return the function's docstring.''' return self.func.__doc__ def __doc__(self): '''Return the function's docstring.''' return self.func.__doc__ _Record = collections.namedtuple('Record', [ 'CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT', 'samples' ]) class VCFReader(object): '''Read and parse a VCF v 4.0 file''' def __init__(self, fsock, aggressive=False): super(VCFReader, self).__init__() self.aggro = aggressive self._metadata = None self._infos = None self._filters = None self._formats = None self._samples = None self.reader = fsock if aggressive: self._mapper = self._none_map else: self._mapper = self._pass_map def __iter__(self): return self @property @_meta_info def metadata(self): '''Return the information from lines starting "##"''' return self._metadata @property @_meta_info def infos(self): '''Return the information from lines starting "##INFO"''' return self._infos @property @_meta_info def filters(self): '''Return the information from lines starting "##FILTER"''' return self._filters @property @_meta_info def formats(self): '''Return the information from lines starting "##FORMAT"''' return self._formats @property @_meta_info def samples(self): '''Return the names of the genotype fields.''' return self._samples def _parse_metainfo(self): '''Parse the information stored in the metainfo of the VCF. The end user shouldn't have to use this. She can access the metainfo directly with ``self.metadata``.''' for attr in ('_metadata', '_infos', '_filters', '_formats'): setattr(self, attr, {}) parser = _vcf_metadata_parser() line = self.reader.next() while line.startswith('##'): line = line.strip() if line.startswith('##INFO'): key, val = parser.read_info(line) self._infos[key] = val elif line.startswith('##FILTER'): key, val = parser.read_filter(line) self._filters[key] = val elif line.startswith('##FORMAT'): key, val = parser.read_format(line) self._formats[key] = val else: key, val = parser.read_meta(line.strip()) self._metadata[key] = val line = self.reader.next() fields = line.split() self._samples = fields[9:] def _none_map(self, func, iterable, bad='.'): '''``map``, but make bad values None.''' return [func(x) if x != bad else None for x in iterable] def _pass_map(self, func, iterable, bad='.'): '''``map``, but make bad values None.''' return [func(x) if x != bad else bad for x in iterable] def _parse_info(self, info_str): '''Parse the INFO field of a VCF entry into a dictionary of Python types. ''' entries = info_str.split(';') retdict = {} for entry in entries: entry = entry.split('=') ID = entry[0] try: entry_type = self.infos[ID].type except KeyError: try: entry_type = RESERVED_INFO[ID] except KeyError: if entry[1:]: entry_type = 'String' else: entry_type = 'Flag' if entry_type == 'Integer': vals = entry[1].split(',') val = self._mapper(int, vals) elif entry_type == 'Float': vals = entry[1].split(',') val = self._mapper(float, vals) elif entry_type == 'Flag': val = True elif entry_type == 'String': val = entry[1] try: if self.infos[ID].num == 1: val = val[0] except KeyError: pass retdict[ID] = val return retdict def _parse_samples(self, samples, samp_fmt): '''Parse a sample entry according to the format specified in the FORMAT column.''' samp_data = [] samp_fmt = samp_fmt.split(':') for sample in samples: sampdict = dict(zip(samp_fmt, sample.split(':'))) for fmt in sampdict: vals = sampdict[fmt].split(',') try: entry_type = self.formats[fmt].type except KeyError: try: entry_type = RESERVED_FORMAT[fmt] except KeyError: entry_type = 'String' if entry_type == 'Integer': sampdict[fmt] = self._mapper(int, vals) elif entry_type == 'Float' or entry_type == 'Numeric': sampdict[fmt] = self._mapper(float, vals) elif sampdict[fmt] == './.' and self.aggro: sampdict[fmt] = None samp_data.append(sampdict) for name, data in zip(self.samples, samp_data): data['name'] = name return samp_data def next(self): '''Return the next record in the file.''' if self._samples is None: self._parse_metainfo() row = self.reader.next().split() chrom = row[0] pos = int(row[1]) if row[2] != '.': ID = row[2] else: ID = None if self.aggro else row[2] ref = row[3] alt = self._mapper(str, row[4].split(',')) qual = float(row[5]) if '.' in row[5] else int(row[5]) filt = row[6].split(';') if ';' in row[6] else row[6] if filt == 'PASS' and self.aggro: filt = None info = self._parse_info(row[7]) try: fmt = row[8] except IndexError: fmt = None samples = None else: samples = self._parse_samples(row[9:], fmt) record = _Record(chrom, pos, ID, ref, alt, qual, filt, info, fmt, samples) return record def main(): '''Parse the example VCF file from the specification and print every record.''' import contextlib import StringIO import textwrap buff = '''\ ##fileformat=VCFv4.0 ##fileDate=20090805 ##source=myImputationProgramV3.1 ##reference=1000GenomesPilot-NCBI36 ##phasing=partial ##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data"> ##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth"> ##INFO=<ID=AF,Number=.,Type=Float,Description="Allele Frequency"> ##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele"> ##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129"> ##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership"> ##INFO=<ID=AC,Number=A,Type=Integer,Description="Total number of alternate alleles in called genotypes"> ##FILTER=<ID=q10,Description="Quality below 10"> ##FILTER=<ID=s50,Description="Less than 50% of samples have data"> ##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype"> ##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality"> ##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth"> ##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality"> #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tNA00001\tNA00002\tNA00003 20\t14370\trs6054257\tG\tA\t29\tPASS\tNS=3;DP=14;AF=0.5;DB;H2\tGT:GQ:DP:HQ\t0|0:48:1:51,51\t1|0:48:8:51,51\t1/1:43:5:.,. 20\t17330\t.\tT\tA\t3\tq10\tNS=3;DP=11;AF=0.017\tGT:GQ:DP:HQ\t0|0:49:3:58,50\t0|1:3:5:65,3\t0/0:41:3 20\t1110696\trs6040355\tA\tG,T\t67\tPASS\tNS=2;DP=10;AF=0.333,0.667;AA=T;DB\tGT:GQ:DP:HQ\t1|2:21:6:23,27\t2|1:2:0:18,2\t2/2:35:4 20\t1230237\t.\tT\t.\t47\tPASS\tNS=3;DP=13;AA=T\tGT:GQ:DP:HQ\t0|0:54:7:56,60\t0|0:48:4:51,51\t0/0:61:2 20\t1234567\tmicrosat1\tGTCT\tG,GTACT\t50\tPASS\tNS=3;DP=9;AA=G\tGT:GQ:DP\t./.:35:4\t0/2:17:2\t1/1:40:3 ''' with contextlib.closing(StringIO.StringIO(textwrap.dedent(buff))) as sock: vcf_file = VCFReader(sock, aggressive=True) for record in vcf_file: print record if __name__ == '__main__': main()