Mercurial > repos > melissacline > ucsc_cancer_utilities
diff vcf.py @ 0:60efb9214eaa
Uploaded
author | melissacline |
---|---|
date | Wed, 14 Jan 2015 13:54:03 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/vcf.py Wed Jan 14 13:54:03 2015 -0500 @@ -0,0 +1,455 @@ +#!/usr/bin/env python +'''A VCFv4.0 parser for Python. + +The intent of this module is to mimic the ``csv`` module in the Python stdlib, +as opposed to more flexible serialization formats like JSON or YAML. ``vcf`` +will attempt to parse the content of each record based on the data types +specified in the meta-information lines -- specifically the ##INFO and +##FORMAT lines. If these lines are missing or incomplete, it will check +against the reserved types mentioned in the spec. Failing that, it will just +return strings. + +There is currently one piece of interface: ``VCFReader``. It takes a file-like +object and acts as a reader:: + + >>> import vcf + >>> vcf_reader = vcf.VCFReader(open('example.vcf', 'rb')) + >>> for record in vcf_reader: + ... print record + Record(CHROM='20', POS=14370, ID='rs6054257', REF='G', ALT=['A'], QUAL=29, + FILTER='PASS', INFO={'H2': True, 'NS': 3, 'DB': True, 'DP': 14, 'AF': [0.5] + }, FORMAT='GT:GQ:DP:HQ', samples=[{'GT': '0', 'HQ': [58, 50], 'DP': 3, 'GQ' + : 49, 'name': 'NA00001'}, {'GT': '0', 'HQ': [65, 3], 'DP': 5, 'GQ': 3, 'nam + e' : 'NA00002'}, {'GT': '0', 'DP': 3, 'GQ': 41, 'name': 'NA00003'}]) + +This produces a great deal of information, but it is conveniently accessed. +The attributes of a Record are the 8 fixed fields from the VCF spec plus two +more. That is: + + * ``Record.CHROM`` + * ``Record.POS`` + * ``Record.ID`` + * ``Record.REF`` + * ``Record.ALT`` + * ``Record.QUAL`` + * ``Record.FILTER`` + * ``Record.INFO`` + +plus two more attributes to handle genotype information: + + * ``Record.FORMAT`` + * ``Record.samples`` + +``samples``, not being the title of any column, is left lowercase. The format +of the fixed fields is from the spec. Comma-separated lists in the VCF are +converted to lists. In particular, one-entry VCF lists are converted to +one-entry Python lists (see, e.g., ``Record.ALT``). Semicolon-delimited lists +of key=value pairs are converted to Python dictionaries, with flags being given +a ``True`` value. Integers and floats are handled exactly as you'd expect:: + + >>> record = vcf_reader.next() + >>> print record.POS + 17330 + >>> print record.ALT + ['A'] + >>> print record.INFO['AF'] + [0.017] + +``record.FORMAT`` will be a string specifying the format of the genotype +fields. In case the FORMAT column does not exist, ``record.FORMAT`` is +``None``. Finally, ``record.samples`` is a list of dictionaries containing the +parsed sample column:: + + >>> record = vcf_reader.next() + >>> for sample in record.samples: + ... print sample['GT'] + '1|2' + '2|1' + '2/2' + +Metadata regarding the VCF file itself can be investigated through the +following attributes: + + * ``VCFReader.metadata`` + * ``VCFReader.infos`` + * ``VCFReader.filters`` + * ``VCFReader.formats`` + * ``VCFReader.samples`` + +For example:: + + >>> vcf_reader.metadata['fileDate'] + 20090805 + >>> vcf_reader.samples + ['NA00001', 'NA00002', 'NA00003'] + >>> vcf_reader.filters + {'q10': Filter(id='q10', desc='Quality below 10'), + 's50': Filter(id='s50', desc='Less than 50% of samples have data')} + >>> vcf_reader.infos['AA'].desc + Ancestral Allele + +''' +import collections +import re + + +# Metadata parsers/constants +RESERVED_INFO = { + 'AA': 'String', 'AC': 'Integer', 'AF': 'Float', 'AN': 'Integer', + 'BQ': 'Float', 'CIGAR': 'String', 'DB': 'Flag', 'DP': 'Integer', + 'END': 'Integer', 'H2': 'Flag', 'MQ': 'Float', 'MQ0': 'Integer', + 'NS': 'Integer', 'SB': 'String', 'SOMATIC': 'Flag', 'VALIDATED': 'Flag' +} + +RESERVED_FORMAT = { + 'GT': 'String', 'DP': 'Integer', 'FT': 'String', 'GL': 'Float', + 'GQ': 'Float', 'HQ': 'Float' +} + + +_Info = collections.namedtuple('Info', ['id', 'num', 'type', 'desc']) +_Filter = collections.namedtuple('Filter', ['id', 'desc']) +_Format = collections.namedtuple('Format', ['id', 'num', 'type', 'desc']) + + +class _vcf_metadata_parser(object): + '''Parse the metadat in the header of a VCF file.''' + def __init__(self, aggressive=False): + super(_vcf_metadata_parser, self).__init__() + self.aggro = aggressive + self.info_pattern = re.compile(r'''\#\#INFO=< + ID=(?P<id>[^,]+), + Number=(?P<number>\d+|\.|[AG]), + Type=(?P<type>Integer|Float|Flag|Character|String), + Description="(?P<desc>[^"]*)" + >''', re.VERBOSE) + self.filter_pattern = re.compile(r'''\#\#FILTER=< + ID=(?P<id>[^,]+), + Description="(?P<desc>[^"]*)" + >''', re.VERBOSE) + self.format_pattern = re.compile(r'''\#\#FORMAT=< + ID=(?P<id>.+), + Number=(?P<number>\d+|\.|[AG]), + Type=(?P<type>.+), + Description="(?P<desc>.*)" + >''', re.VERBOSE) + self.meta_pattern = re.compile(r'''##(?P<key>.+)=(?P<val>.+)''') + + def read_info(self, info_string): + '''Read a meta-information INFO line.''' + match = self.info_pattern.match(info_string) + if not match: + raise SyntaxError( + "One of the INFO lines is malformed: {}".format(info_string)) + + try: + num = int(match.group('number')) + except ValueError: + num = None if self.aggro else '.' + + info = _Info(match.group('id'), num, + match.group('type'), match.group('desc')) + + return (match.group('id'), info) + + def read_filter(self, filter_string): + '''Read a meta-information FILTER line.''' + match = self.filter_pattern.match(filter_string) + if not match: + raise SyntaxError( + "One of the FILTER lines is malformed: {}".format( + filter_string)) + + filt = _Filter(match.group('id'), match.group('desc')) + + return (match.group('id'), filt) + + def read_format(self, format_string): + '''Read a meta-information FORMAT line.''' + match = self.format_pattern.match(format_string) + if not match: + raise SyntaxError( + "One of the FORMAT lines is malformed: {}".format( + format_string)) + + try: + num = int(match.group('number')) + except ValueError: + num = None if self.aggro else '.' + + form = _Format(match.group('id'), num, + match.group('type'), match.group('desc')) + + return (match.group('id'), form) + + def read_meta(self, meta_string): + match = self.meta_pattern.match(meta_string) + return match.group('key'), match.group('val') + + +# Reader class +class _meta_info(object): + '''Decorator for a property stored in the header info.''' + def __init__(self, func): + self.func = func + + def __call__(self, fself): + if getattr(fself, "_%s" % self.func.__name__) is None: + fself._parse_metainfo() + + return self.func(fself) + + def __repr__(self): + '''Return the function's docstring.''' + return self.func.__doc__ + + def __doc__(self): + '''Return the function's docstring.''' + return self.func.__doc__ + +_Record = collections.namedtuple('Record', [ + 'CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT', + 'samples' +]) + + +class VCFReader(object): + '''Read and parse a VCF v 4.0 file''' + def __init__(self, fsock, aggressive=False): + super(VCFReader, self).__init__() + self.aggro = aggressive + self._metadata = None + self._infos = None + self._filters = None + self._formats = None + self._samples = None + self.reader = fsock + if aggressive: + self._mapper = self._none_map + else: + self._mapper = self._pass_map + + def __iter__(self): + return self + + @property + @_meta_info + def metadata(self): + '''Return the information from lines starting "##"''' + return self._metadata + + @property + @_meta_info + def infos(self): + '''Return the information from lines starting "##INFO"''' + return self._infos + + @property + @_meta_info + def filters(self): + '''Return the information from lines starting "##FILTER"''' + return self._filters + + @property + @_meta_info + def formats(self): + '''Return the information from lines starting "##FORMAT"''' + return self._formats + + @property + @_meta_info + def samples(self): + '''Return the names of the genotype fields.''' + return self._samples + + def _parse_metainfo(self): + '''Parse the information stored in the metainfo of the VCF. + + The end user shouldn't have to use this. She can access the metainfo + directly with ``self.metadata``.''' + for attr in ('_metadata', '_infos', '_filters', '_formats'): + setattr(self, attr, {}) + + parser = _vcf_metadata_parser() + + line = self.reader.next() + while line.startswith('##'): + line = line.strip() + if line.startswith('##INFO'): + key, val = parser.read_info(line) + self._infos[key] = val + + elif line.startswith('##FILTER'): + key, val = parser.read_filter(line) + self._filters[key] = val + + elif line.startswith('##FORMAT'): + key, val = parser.read_format(line) + self._formats[key] = val + + else: + key, val = parser.read_meta(line.strip()) + self._metadata[key] = val + + line = self.reader.next() + + fields = line.split() + self._samples = fields[9:] + + def _none_map(self, func, iterable, bad='.'): + '''``map``, but make bad values None.''' + return [func(x) if x != bad else None + for x in iterable] + + def _pass_map(self, func, iterable, bad='.'): + '''``map``, but make bad values None.''' + return [func(x) if x != bad else bad + for x in iterable] + + def _parse_info(self, info_str): + '''Parse the INFO field of a VCF entry into a dictionary of Python + types. + + ''' + entries = info_str.split(';') + retdict = {} + for entry in entries: + entry = entry.split('=') + ID = entry[0] + try: + entry_type = self.infos[ID].type + except KeyError: + try: + entry_type = RESERVED_INFO[ID] + except KeyError: + if entry[1:]: + entry_type = 'String' + else: + entry_type = 'Flag' + + if entry_type == 'Integer': + vals = entry[1].split(',') + val = self._mapper(int, vals) + elif entry_type == 'Float': + vals = entry[1].split(',') + val = self._mapper(float, vals) + elif entry_type == 'Flag': + val = True + elif entry_type == 'String': + val = entry[1] + + try: + if self.infos[ID].num == 1: + val = val[0] + except KeyError: + pass + + retdict[ID] = val + + return retdict + + def _parse_samples(self, samples, samp_fmt): + '''Parse a sample entry according to the format specified in the FORMAT + column.''' + samp_data = [] + samp_fmt = samp_fmt.split(':') + for sample in samples: + sampdict = dict(zip(samp_fmt, sample.split(':'))) + for fmt in sampdict: + vals = sampdict[fmt].split(',') + try: + entry_type = self.formats[fmt].type + except KeyError: + try: + entry_type = RESERVED_FORMAT[fmt] + except KeyError: + entry_type = 'String' + + if entry_type == 'Integer': + sampdict[fmt] = self._mapper(int, vals) + elif entry_type == 'Float' or entry_type == 'Numeric': + sampdict[fmt] = self._mapper(float, vals) + elif sampdict[fmt] == './.' and self.aggro: + sampdict[fmt] = None + + samp_data.append(sampdict) + + for name, data in zip(self.samples, samp_data): + data['name'] = name + + return samp_data + + def next(self): + '''Return the next record in the file.''' + if self._samples is None: + self._parse_metainfo() + row = self.reader.next().split() + chrom = row[0] + pos = int(row[1]) + + if row[2] != '.': + ID = row[2] + else: + ID = None if self.aggro else row[2] + + ref = row[3] + alt = self._mapper(str, row[4].split(',')) + qual = float(row[5]) if '.' in row[5] else int(row[5]) + filt = row[6].split(';') if ';' in row[6] else row[6] + if filt == 'PASS' and self.aggro: + filt = None + info = self._parse_info(row[7]) + + try: + fmt = row[8] + except IndexError: + fmt = None + samples = None + else: + samples = self._parse_samples(row[9:], fmt) + + record = _Record(chrom, pos, ID, ref, alt, qual, filt, info, fmt, + samples) + return record + + +def main(): + '''Parse the example VCF file from the specification and print every + record.''' + import contextlib + import StringIO + import textwrap + buff = '''\ + ##fileformat=VCFv4.0 + ##fileDate=20090805 + ##source=myImputationProgramV3.1 + ##reference=1000GenomesPilot-NCBI36 + ##phasing=partial + ##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data"> + ##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth"> + ##INFO=<ID=AF,Number=.,Type=Float,Description="Allele Frequency"> + ##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele"> + ##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129"> + ##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership"> + ##INFO=<ID=AC,Number=A,Type=Integer,Description="Total number of alternate alleles in called genotypes"> + ##FILTER=<ID=q10,Description="Quality below 10"> + ##FILTER=<ID=s50,Description="Less than 50% of samples have data"> + ##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype"> + ##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality"> + ##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth"> + ##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality"> + #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tNA00001\tNA00002\tNA00003 + 20\t14370\trs6054257\tG\tA\t29\tPASS\tNS=3;DP=14;AF=0.5;DB;H2\tGT:GQ:DP:HQ\t0|0:48:1:51,51\t1|0:48:8:51,51\t1/1:43:5:.,. + 20\t17330\t.\tT\tA\t3\tq10\tNS=3;DP=11;AF=0.017\tGT:GQ:DP:HQ\t0|0:49:3:58,50\t0|1:3:5:65,3\t0/0:41:3 + 20\t1110696\trs6040355\tA\tG,T\t67\tPASS\tNS=2;DP=10;AF=0.333,0.667;AA=T;DB\tGT:GQ:DP:HQ\t1|2:21:6:23,27\t2|1:2:0:18,2\t2/2:35:4 + 20\t1230237\t.\tT\t.\t47\tPASS\tNS=3;DP=13;AA=T\tGT:GQ:DP:HQ\t0|0:54:7:56,60\t0|0:48:4:51,51\t0/0:61:2 + 20\t1234567\tmicrosat1\tGTCT\tG,GTACT\t50\tPASS\tNS=3;DP=9;AA=G\tGT:GQ:DP\t./.:35:4\t0/2:17:2\t1/1:40:3 + ''' + with contextlib.closing(StringIO.StringIO(textwrap.dedent(buff))) as sock: + vcf_file = VCFReader(sock, aggressive=True) + for record in vcf_file: + print record + + +if __name__ == '__main__': + main()