Mercurial > repos > dfornika > screen_abricate_report
comparison screen_abricate_report.py @ 3:1c1c680c70a0 draft default tip
"planemo upload for repository https://github.com/public-health-bioinformatics/galaxy_tools/blob/master/tools/screen_abricate_report commit 2ec76aac2fcf466fc16091bfff8b7cb83fd92467-dirty"
field | value |
---|---|
author | dfornika |
date | Thu, 02 Jan 2020 21:04:04 +0000 |
parents | 40003338a8e8 |
children | |
comparison
equal
deleted
inserted
replaced
2:378696e5f81c | 3:1c1c680c70a0 |
---|---|
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 | 2 |
3 from __future__ import print_function | 3 from __future__ import print_function |
4 | 4 |
5 import argparse | 5 import argparse |
6 import os | 6 import csv |
7 import re | 7 import re |
8 import sys | 8 |
9 import csv | 9 |
10 from pprint import pprint | 10 class Range(object): |
| | 11 """ |
| | 12 Used to limit the min_coverage and min_identity args to range 0.0 - 100.0 |
| | 13 """ |
| | 14 def __init__(self, start, end): |
| | 15 self.start = start |
| | 16 self.end = end |
| | 17 |
| | 18 def __eq__(self, other): |
| | 19 return self.start <= other <= self.end |
| | 20 |
| | 21 def __contains__(self, item): |
| | 22 return self.__eq__(item) |
| | 23 |
| | 24 def __iter__(self): |
| | 25 yield self |
| | 26 |
| | 27 def __repr__(self): |
| | 28 return str(self.start) + " - " + str(self.end) |
11 | 29 |
12 def parse_screen_file(screen_file): | 30 def parse_screen_file(screen_file): |
13 screen = [] | 31 screen = [] |
14 with open(screen_file) as f: | 32 with open(screen_file) as f: |
15 reader = csv.DictReader(f, delimiter="\t", quotechar='"') | 33 reader = csv.DictReader(f, delimiter="\t", quotechar='"') |
16 for row in reader: | 34 for row in reader: |
17 screen.append(row) | 35 screen.append(row) |
18 return screen | 36 return screen |
19 | 37 |
| | 38 |
20 def get_fieldnames(input_file): | 39 def get_fieldnames(input_file): |
21 with open(input_file) as f: | 40 with open(input_file) as f: |
22 reader = csv.DictReader(f, delimiter="\t", quotechar='"') | 41 reader = csv.DictReader(f, delimiter="\t", quotechar='"') |
23 row = next(reader) | 42 row = next(reader) |
24 fieldnames = row.keys() | 43 fieldnames = row.keys() |
25 return fieldnames | 44 return fieldnames |
26 | 45 |
| | 46 def detect_gene(abricate_report_row, regex, min_coverage, min_identity): |
| | 47 gene_of_interest = bool(re.search(regex, abricate_report_row['GENE'])) |
| | 48 sufficient_coverage = float(abricate_report_row['%COVERAGE']) >= min_coverage |
| | 49 sufficient_identity = float(abricate_report_row['%IDENTITY']) >= min_identity |
| | 50 if gene_of_interest and sufficient_coverage and sufficient_identity: |
| | 51 return True |
| | 52 else: |
| | 53 return False |
| | 54 |
| | 55 |
27 def main(args): | 56 def main(args): |
28 screen = parse_screen_file(args.screening_file) | 57 screen = parse_screen_file(args.screening_file) |
29 abricate_report_fieldnames = get_fieldnames(args.abricate_report) | |
30 gene_detection_status_fieldnames = ['gene_name', 'detected'] | 58 gene_detection_status_fieldnames = ['gene_name', 'detected'] |
31 with open(args.abricate_report, 'r') as f1, open(args.screened_report, 'w') as f2, open(args.gene_detection_status, 'w') as f3: | 59 with open(args.abricate_report, 'r') as f1, \ |
| | 60 open(args.screened_report, 'w') as f2, \ |
| | 61 open(args.gene_detection_status, 'w') as f3: |
32 abricate_report_reader = csv.DictReader(f1, delimiter="\t", quotechar='"') | 62 abricate_report_reader = csv.DictReader(f1, delimiter="\t", quotechar='"') |
33 screened_report_writer = csv.DictWriter(f2, delimiter="\t", quotechar='"', fieldnames=abricate_report_fieldnames) | 63 screened_report_writer = csv.DictWriter(f2, delimiter="\t", quotechar='"', |
34 gene_detection_status_writer = csv.DictWriter(f3, delimiter="\t", quotechar='"', fieldnames=gene_detection_status_fieldnames) | 64 fieldnames=abricate_report_reader.fieldnames) |
| | 65 gene_detection_status_writer = csv.DictWriter(f3, delimiter="\t", quotechar='"', |
| | 66 fieldnames=gene_detection_status_fieldnames) |
35 screened_report_writer.writeheader() | 67 screened_report_writer.writeheader() |
36 gene_detection_status_writer.writeheader() | 68 gene_detection_status_writer.writeheader() |
37 | 69 |
38 for gene in screen: | 70 for gene in screen: |
39 gene_detection_status = { | 71 gene_detection_status = { |
40 'gene_name': gene['gene_name'], | 72 'gene_name': gene['gene_name'], |
41 'detected': False | 73 'detected': False |
42 } | 74 } |
43 for abricate_report_row in abricate_report_reader: | 75 for abricate_report_row in abricate_report_reader: |
44 if re.search(gene['regex'], abricate_report_row['GENE']): | 76 if detect_gene(abricate_report_row, gene['regex'], args.min_coverage, args.min_identity): |
45 gene_detection_status['detected'] = True | 77 gene_detection_status['detected'] = True |
46 screened_report_writer.writerow(abricate_report_row) | 78 screened_report_writer.writerow(abricate_report_row) |
47 gene_detection_status_writer.writerow(gene_detection_status) | 79 gene_detection_status_writer.writerow(gene_detection_status) |
48 f1.seek(0) # return file pointer to start of abricate report | 80 f1.seek(0) # return file pointer to start of abricate report |
49 | 81 next(abricate_report_reader) |
50 | 82 |
51 if __name__ == '__main__': | 83 if __name__ == '__main__': |
52 parser = argparse.ArgumentParser() | 84 parser = argparse.ArgumentParser() |
53 parser.add_argument("abricate_report", help="Input: Abricate report to screen (tsv)") | 85 parser.add_argument("abricate_report", help="Input: Abricate report to screen (tsv)") |
54 parser.add_argument("--screening_file", help="Input: List of genes to screen for (tsv)") | 86 parser.add_argument("--screening_file", help="Input: List of genes to screen for (tsv)") |
55 parser.add_argument("--screened_report", help="Output: Screened abricate report including only genes of interest (tsv)") | 87 parser.add_argument("--screened_report", help=("Output: Screened abricate report, including only genes of interest (tsv)")) |
56 parser.add_argument("--gene_detection_status", help="Output: detection status for all genes listed in the screening file (tsv)") | 88 parser.add_argument("--gene_detection_status", help=("Output: detection status for all genes listed in the screening file (tsv)")) |
| | 89 parser.add_argument("--min_coverage", type=float, default=90.0, choices=Range(0.0, 100.0), help=("Minimum percent coverage")) |
| | 90 parser.add_argument("--min_identity", type=float, default=90.0, choices=Range(0.0, 100.0), help=("Minimum percent identity")) |
57 args = parser.parse_args() | 91 args = parser.parse_args() |
58 main(args) | 92 main(args) |