Mercurial > repos > peterjc > blast_rbh
comparison tools/blast_rbh/best_hits.py @ 39:64369ee3aaf0 draft
planemo upload for repository https://github.com/peterjc/galaxy_blast/tree/master/tools/blast_rbh commit 0a172d2516a9d7e8d8fa26dff24c0c906b9d0c6a-dirty
author | peterjc |
---|---|
date | Mon, 01 Apr 2019 08:05:31 -0400 |
parents | |
children | 2bca27f8abb2 |
comparison
equal
deleted
inserted
replaced
38:632d8517dad1 | 39:64369ee3aaf0 |
---|---|
1 #!/usr/bin/env python | |
2 """Helper code for BLAST Reciprocal Best Hit (RBH) from two BLAST tabular files. | |
3 | |
4 This module defines a function best_hits to return the best hit in a BLAST | |
5 tabular file, intended for use in finding BLAST Reciprocal Best Hits (RBH). | |
6 | |
7 The code in this module originated from blast_rbh.py in | |
8 https://github.com/peterjc/galaxy_blast/tree/master/tools/blast_rbh | |
9 | |
10 Please cite the author per instructions in | |
11 https://github.com/peterjc/galaxy_blast/blob/master/tools/blast_rbh/README.rst | |
12 """ | |
13 | |
14 import sys | |
15 | |
16 tie_warning = 0 | |
17 | |
18 c_query = 0 | |
19 c_match = 1 | |
20 c_score = 2 | |
21 c_identity = 3 | |
22 c_coverage = 4 | |
23 c_qlen = 5 | |
24 c_length = 6 | |
25 | |
26 cols = "qseqid sseqid bitscore pident qcovhsp qlen length" | |
27 | |
28 | |
29 def best_hits(blast_tabular, min_identity=70, min_coverage=50, ignore_self=False): | |
30 """Parse BLAST tabular output returning the best hit. | |
31 | |
32 Argument blast_tabular should be a filename, containing BLAST tabular | |
33 output with "qseqid sseqid bitscore pident qcovhsp qlen length" as | |
34 the columns. | |
35 | |
36 Parses the tabular file, iterating over the results as 2-tuples | |
37 of (query name, tuple of values for the best hit). | |
38 | |
39 One hit is returned for tied best hits to the same sequence | |
40 (e.g. repeated domains). | |
41 | |
42 Tied best hits to different sequences are NOT returned. Instead, | |
43 global variable tie_warning is incremented. | |
44 """ | |
45 global tie_warning | |
46 current = None | |
47 best_score = None | |
48 best = None | |
49 col_count = len(cols.split()) | |
50 with open(blast_tabular) as h: | |
51 for line in h: | |
52 if line.startswith("#"): | |
53 continue | |
54 parts = line.rstrip("\n").split("\t") | |
55 if len(parts) != col_count: | |
56 # Using NCBI BLAST+ 2.2.27 the undefined field is ignored | |
57 # Even NCBI BLAST+ 2.5.0 silently ignores unknown fields :( | |
58 sys.exit( | |
59 "Old version of NCBI BLAST? Expected %i columns, got %i:\n%s\n" | |
60 "Note the qcovhsp field was only added in version 2.2.28\n" | |
61 % (col_count, len(parts), line) | |
62 ) | |
63 if ( | |
64 float(parts[c_identity]) < min_identity | |
65 or float(parts[c_coverage]) < min_coverage | |
66 ): | |
67 continue | |
68 a = parts[c_query] | |
69 b = parts[c_match] | |
70 if ignore_self and a == b: | |
71 continue | |
72 score = float(parts[c_score]) | |
73 qlen = int(parts[c_qlen]) | |
74 length = int(parts[c_length]) | |
75 # print("Considering hit for %s to %s with score %s..." % (a, b, score)) | |
76 if current is None: | |
77 # First hit | |
78 assert best is None | |
79 assert best_score is None | |
80 best = dict() | |
81 # Now append this hit... | |
82 elif a != current: | |
83 # New hit | |
84 if len(best) == 1: | |
85 # Unambiguous (no tied matches) | |
86 yield current, list(best.values())[0] | |
87 else: | |
88 # print("%s has %i equally good hits: %s" | |
89 # % (a, len(best), ", ".join(best))) | |
90 tie_warning += 1 | |
91 best = dict() | |
92 # Now append this hit... | |
93 elif score < best_score: | |
94 # print("No improvement for %s, %s < %s" % (a, score, best_score)) | |
95 continue | |
96 elif score > best_score: | |
97 # This is better, discard old best | |
98 best = dict() | |
99 # Now append this hit... | |
100 else: | |
101 # print("Tied best hits for %s" % a) | |
102 assert best_score == score | |
103 # Now append this hit... | |
104 current = a | |
105 best_score = score | |
106 # This will collapse two equally good hits | |
107 # to the same target (e.g. duplicated domain) | |
108 best[b] = ( | |
109 b, | |
110 score, | |
111 parts[c_score], | |
112 parts[c_identity], | |
113 parts[c_coverage], | |
114 qlen, | |
115 length, | |
116 ) | |
117 # Best hit for final query, if unambiguous: | |
118 if current is not None: | |
119 if len(best) == 1: | |
120 yield current, list(best.values())[0] | |
121 else: | |
122 # print("%s has %i equally good hits: %s" | |
123 # % (a, len(best), ", ".join(best))) | |
124 tie_warning += 1 |