Mercurial > repos > galaxyp > filter_by_fasta_ids
view filter_by_fasta_ids.py @ 1:cf802877cad3 draft
planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/filter_by_fasta_ids commit 0556e0fe5aa17c84033a75a45baeb3a4c2b5ff76
author | galaxyp |
---|---|
date | Fri, 15 Feb 2019 16:38:09 -0500 |
parents | cf716ec47072 |
children | 709692e8d856 |
line wrap: on
line source
#!/usr/bin/env python """ A script to build specific fasta databases """ from __future__ import print_function import argparse import re import sys class Sequence(object): def __init__(self, header, sequence_parts): self.header = header self.sequence_parts = sequence_parts self._sequence = None @property def sequence(self): if self._sequence is None: self._sequence = ''.join(self.sequence_parts) return self._sequence def print(self, fh=sys.stdout): print(self.header, file=fh) for line in self.sequence_parts: print(line, file=fh) def FASTAReader_gen(fasta_filename): with open(fasta_filename) as fasta_file: line = fasta_file.readline() while True: if not line: return assert line.startswith('>'), "FASTA headers must start with >" header = line.rstrip() sequence_parts = [] line = fasta_file.readline() while line and line[0] != '>': sequence_parts.append(line.rstrip()) line = fasta_file.readline() yield Sequence(header, sequence_parts) def target_match(targets, search_entry, pattern='>([^| ]+)'): ''' Matches ''' search_entry = search_entry.upper() m = re.search(pattern,search_entry) if m: target = m.group(len(m.groups())) if target in targets: return target else: print( 'No ID match: %s' % search_entry, file=sys.stdout) return None def main(): ''' the main function''' parser = argparse.ArgumentParser() parser.add_argument('-i', required=True, help='Path to input FASTA file') parser.add_argument('-o', required=True, help='Path to output FASTA file') parser.add_argument('-d', help='Path to discarded entries file') header_criteria = parser.add_mutually_exclusive_group() header_criteria.add_argument('--id_list', help='Path to the ID list file') parser.add_argument('--pattern', help='regex earch attern for ID in Fasta entry') header_criteria.add_argument('--header_regexp', help='Regular expression pattern the header should match') sequence_criteria = parser.add_mutually_exclusive_group() sequence_criteria.add_argument('--min_length', type=int, help='Minimum sequence length') sequence_criteria.add_argument('--sequence_regexp', help='Regular expression pattern the header should match') parser.add_argument('--max_length', type=int, help='Maximum sequence length') parser.add_argument('--dedup', action='store_true', default=False, help='Whether to remove duplicate sequences') options = parser.parse_args() if options.pattern: pattern = options.pattern if not re.match('^.*[(](?![?]:).*[)].*$',pattern): print('pattern: "%s" did not include capture group "()" in regex ' % pattern) exit(1) if options.min_length is not None and options.max_length is None: options.max_length = sys.maxsize if options.header_regexp: regexp = re.compile(options.header_regexp) if options.sequence_regexp: regexp = re.compile(options.sequence_regexp) work_summary = {'found': 0} if options.dedup: used_sequences = set() work_summary['duplicates'] = 0 if options.id_list: targets = [] with open(options.id_list) as f_target: for line in f_target.readlines(): targets.append(line.strip().upper()) work_summary['wanted'] = len(targets) homd_db = FASTAReader_gen(options.i) if options.d: discarded = open(options.d, 'w') with open(options.o, "w") as output: for entry in homd_db: print_entry = True if options.id_list: target_matched_results = target_match(targets, entry.header, pattern=pattern) if target_matched_results: work_summary['found'] += 1 targets.remove(target_matched_results) else: print_entry = False elif options.header_regexp: if regexp.search(entry.header) is None: print_entry = False if options.min_length is not None: sequence_length = len(entry.sequence) if not(options.min_length <= sequence_length <= options.max_length): print_entry = False elif options.sequence_regexp: if regexp.search(entry.sequence) is None: print_entry = False if print_entry: if options.dedup: if entry.sequence in used_sequences: work_summary['duplicates'] += 1 continue else: used_sequences.add(entry.sequence) entry.print(output) elif options.d: entry.print(discarded) for parm, count in work_summary.items(): print('%s ==> %d' % (parm, count)) if __name__ == "__main__": main()