0
|
1 #!/usr/bin/env python3
|
|
2 # Actually runs under either python 2 or 3
|
|
3
|
|
4 # Copyright The Hyve B.V. 2014
|
|
5 # License: GPL version 3 or higher
|
|
6
|
|
7 import sys
|
|
8 import math
|
|
9 import warnings
|
|
10 from os import path
|
|
11 from itertools import repeat
|
|
12 import argparse
|
|
13 from lxml import objectify
|
|
14 import jinja2
|
|
15
|
|
16
|
|
17
|
|
18 _filters = {}
|
|
19 def filter(func_or_name):
|
|
20 "Decorator to register a function as filter in the current jinja environment"
|
|
21 if isinstance(func_or_name, str):
|
|
22 def inner(func):
|
|
23 _filters[func_or_name] = func.__name__
|
|
24 return func
|
|
25 return inner
|
|
26 else:
|
|
27 _filters[func_or_name.__name__] = func_or_name.__name__
|
|
28 return func_or_name
|
|
29
|
|
30
|
|
31 def color_idx(length):
|
|
32 if length < 40:
|
|
33 return 0
|
|
34 elif length < 50:
|
|
35 return 1
|
|
36 elif length < 80:
|
|
37 return 2
|
|
38 elif length < 200:
|
|
39 return 3
|
|
40 return 4
|
|
41
|
|
42 @filter
|
|
43 def fmt(val, fmt):
|
|
44 return format(float(val), fmt)
|
|
45
|
|
46 @filter
|
|
47 def firsttitle(hit):
|
|
48 return hit.Hit_def.text.split('>')[0]
|
|
49
|
|
50 @filter
|
|
51 def othertitles(hit):
|
|
52 """Split a hit.Hit_def that contains multiple titles up, splitting out the hit ids from the titles."""
|
|
53 id_titles = hit.Hit_def.text.split('>')
|
|
54
|
|
55 titles = []
|
|
56 for t in id_titles[1:]:
|
|
57 fullid, title = t.split(' ', 1)
|
|
58 hitid, id = fullid.split('|', 2)[1:3]
|
|
59 titles.append(dict(id = id,
|
|
60 hitid = hitid,
|
|
61 fullid = fullid,
|
|
62 title = title))
|
|
63 return titles
|
|
64
|
|
65 @filter
|
|
66 def hitid(hit):
|
|
67 hitid = hit.Hit_id.text
|
|
68 s = hitid.split('|', 2)
|
|
69 if len(s) >= 2:
|
|
70 return s[1]
|
|
71 return hitid
|
|
72
|
|
73 @filter
|
|
74 def seqid(hit):
|
|
75 hitid = hit.Hit_id.text
|
|
76 s = hitid.split('|', 2)
|
|
77 if len(s) >= 3:
|
|
78 return s[2]
|
|
79 return hitid
|
|
80
|
|
81
|
|
82 @filter
|
|
83 def alignment_pre(hsp):
|
|
84 return (
|
|
85 "Query {:>7s} {} {}\n".format(hsp['Hsp_query-from'].text, hsp.Hsp_qseq, hsp['Hsp_query-to']) +
|
|
86 " {:7s} {}\n".format('', hsp.Hsp_midline) +
|
|
87 "Subject{:>7s} {} {}".format(hsp['Hsp_hit-from'].text, hsp.Hsp_hseq, hsp['Hsp_hit-to'])
|
|
88 )
|
|
89
|
|
90 @filter('len')
|
|
91 def blastxml_len(node):
|
|
92 if node.tag == 'Hsp':
|
|
93 return int(node['Hsp_align-len'])
|
|
94 elif node.tag == 'Iteration':
|
|
95 return int(node['Iteration_query-len'])
|
|
96 raise Exception("Unknown XML node type: "+node.tag)
|
|
97
|
|
98
|
|
99 @filter
|
|
100 def asframe(frame):
|
|
101 if frame == 1:
|
|
102 return 'Plus'
|
|
103 elif frame == -1:
|
|
104 return 'Minus'
|
|
105 raise Exception("frame should be either +1 or -1")
|
|
106
|
|
107 def genelink(hit, type='genbank', hsp=None):
|
|
108 if not isinstance(hit, str):
|
|
109 hit = hitid(hit)
|
|
110 link = "http://www.ncbi.nlm.nih.gov/nucleotide/{}?report={}&log$=nuclalign".format(hit, type)
|
|
111 if hsp != None:
|
|
112 link += "&from={}&to={}".format(hsp['Hsp_hit-from'], hsp['Hsp_hit-to'])
|
|
113 return link
|
|
114
|
|
115
|
|
116 # javascript escape filter based on Django's, from https://github.com/dsissitka/khan-website/blob/master/templatefilters.py#L112-139
|
|
117 # I've removed the html escapes, since html escaping is already being performed by the template engine.
|
|
118
|
|
119 _base_js_escapes = (
|
|
120 ('\\', r'\u005C'),
|
|
121 ('\'', r'\u0027'),
|
|
122 ('"', r'\u0022'),
|
|
123 # ('>', r'\u003E'),
|
|
124 # ('<', r'\u003C'),
|
|
125 # ('&', r'\u0026'),
|
|
126 # ('=', r'\u003D'),
|
|
127 # ('-', r'\u002D'),
|
|
128 # (';', r'\u003B'),
|
|
129 # (u'\u2028', r'\u2028'),
|
|
130 # (u'\u2029', r'\u2029')
|
|
131 )
|
|
132
|
|
133 # Escape every ASCII character with a value less than 32. This is
|
|
134 # needed a.o. to prevent html parsers from jumping out of javascript
|
|
135 # parsing mode.
|
|
136 _js_escapes = (_base_js_escapes +
|
|
137 tuple(('%c' % z, '\\u%04X' % z) for z in range(32)))
|
|
138
|
|
139 @filter
|
|
140 def js_string_escape(value):
|
|
141 """Escape javascript string literal escapes. Note that this only works
|
|
142 within javascript string literals, not in general javascript
|
|
143 snippets."""
|
|
144
|
|
145 value = str(value)
|
|
146
|
|
147 for bad, good in _js_escapes:
|
|
148 value = value.replace(bad, good)
|
|
149
|
|
150 return value
|
|
151
|
|
152 @filter
|
|
153 def hits(result):
|
|
154 # sort hits by longest hotspot first
|
|
155 return sorted(result.Iteration_hits.findall('Hit'),
|
|
156 key=lambda h: max(blastxml_len(hsp) for hsp in h.Hit_hsps.Hsp),
|
|
157 reverse=True)
|
|
158
|
|
159
|
|
160
|
|
161 class BlastVisualize:
|
|
162
|
|
163 colors = ('black', 'blue', 'green', 'magenta', 'red')
|
|
164
|
|
165 max_scale_labels = 10
|
|
166
|
|
167 def __init__(self, input, templatedir, templatename):
|
|
168 self.input = input
|
|
169 self.templatename = templatename
|
|
170
|
|
171 self.blast = objectify.parse(self.input).getroot()
|
|
172 self.loader = jinja2.FileSystemLoader(searchpath=templatedir)
|
|
173 self.environment = jinja2.Environment(loader=self.loader,
|
|
174 lstrip_blocks=True, trim_blocks=True, autoescape=True)
|
|
175
|
|
176 self._addfilters(self.environment)
|
|
177
|
|
178
|
|
179 def _addfilters(self, environment):
|
|
180 for filtername, funcname in _filters.items():
|
|
181 try:
|
|
182 environment.filters[filtername] = getattr(self, funcname)
|
|
183 except AttributeError:
|
|
184 environment.filters[filtername] = globals()[funcname]
|
|
185
|
|
186 def render(self, output):
|
|
187 template = self.environment.get_template(self.templatename)
|
|
188
|
|
189 params = (('Query ID', self.blast["BlastOutput_query-ID"]),
|
|
190 ('Query definition', self.blast["BlastOutput_query-def"]),
|
|
191 ('Query length', self.blast["BlastOutput_query-len"]),
|
|
192 ('Program', self.blast.BlastOutput_version),
|
|
193 ('Database', self.blast.BlastOutput_db),
|
|
194 )
|
|
195
|
|
196 output.write(template.render(blast=self.blast,
|
|
197 iterations=self.blast.BlastOutput_iterations.Iteration,
|
|
198 colors=self.colors,
|
|
199 # match_colors=self.match_colors(),
|
|
200 # hit_info=self.hit_info(),
|
|
201 genelink=genelink,
|
|
202 params=params))
|
|
203
|
|
204 @filter
|
|
205 def match_colors(self, result):
|
|
206 """
|
|
207 An iterator that yields lists of length-color pairs.
|
|
208 """
|
|
209
|
|
210 query_length = blastxml_len(result)
|
|
211
|
|
212 percent_multiplier = 100 / query_length
|
|
213
|
|
214 for hit in hits(result):
|
|
215 # sort hotspots from short to long, so we can overwrite index colors of
|
|
216 # short matches with those of long ones.
|
|
217 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: blastxml_len(hsp))
|
|
218 table = bytearray([255]) * query_length
|
|
219 for hsp in hotspots:
|
|
220 frm = hsp['Hsp_query-from'] - 1
|
|
221 to = int(hsp['Hsp_query-to'])
|
|
222 table[frm:to] = repeat(color_idx(blastxml_len(hsp)), to - frm)
|
|
223
|
|
224 matches = []
|
|
225 last = table[0]
|
|
226 count = 0
|
|
227 for i in range(query_length):
|
|
228 if table[i] == last:
|
|
229 count += 1
|
|
230 continue
|
|
231 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent'))
|
|
232 last = table[i]
|
|
233 count = 1
|
|
234 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent'))
|
|
235
|
|
236 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit))
|
|
237
|
|
238 @filter
|
|
239 def queryscale(self, result):
|
|
240 query_length = blastxml_len(result)
|
|
241 skip = math.ceil(query_length / self.max_scale_labels)
|
|
242 percent_multiplier = 100 / query_length
|
|
243 for i in range(1, query_length+1):
|
|
244 if i % skip == 0:
|
|
245 yield dict(label = i, width = skip * percent_multiplier)
|
|
246 if query_length % skip != 0:
|
|
247 yield dict(label = query_length,
|
|
248 width = (query_length % skip) * percent_multiplier)
|
|
249
|
|
250 @filter
|
|
251 def hit_info(self, result):
|
|
252
|
|
253 query_length = blastxml_len(result)
|
|
254
|
|
255 for hit in hits(result):
|
|
256 hsps = hit.Hit_hsps.Hsp
|
|
257
|
|
258 cover = [False] * query_length
|
|
259 for hsp in hsps:
|
|
260 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, blastxml_len(hsp))
|
|
261 cover_count = cover.count(True)
|
|
262
|
|
263 def hsp_val(path):
|
|
264 return (float(hsp[path]) for hsp in hsps)
|
|
265
|
|
266 yield dict(hit = hit,
|
|
267 title = firsttitle(hit),
|
|
268 link_id = hit.Hit_num,
|
|
269 maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))),
|
|
270 totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))),
|
|
271 cover = "{:.0%}".format(cover_count / query_length),
|
|
272 e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))),
|
|
273 # FIXME: is this the correct formula vv?
|
|
274 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / blastxml_len(hsp) for hsp in hsps))),
|
|
275 accession = hit.Hit_accession)
|
|
276
|
|
277
|
|
278 def main():
|
|
279 default_template = path.join(path.dirname(__file__), 'blast2html.html.jinja')
|
|
280
|
|
281 parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page",
|
|
282 usage="{} [-i] INPUT [-o OUTPUT]".format(sys.argv[0]))
|
|
283 input_group = parser.add_mutually_exclusive_group(required=True)
|
|
284 input_group.add_argument('positional_arg', metavar='INPUT', nargs='?', type=argparse.FileType(mode='r'),
|
|
285 help='The input Blast XML file, same as -i/--input')
|
|
286 input_group.add_argument('-i', '--input', type=argparse.FileType(mode='r'),
|
|
287 help='The input Blast XML file')
|
|
288 parser.add_argument('-o', '--output', type=argparse.FileType(mode='w'), default=sys.stdout,
|
|
289 help='The output html file')
|
|
290 # We just want the file name here, so jinja can open the file
|
|
291 # itself. But it is easier to just use a FileType so argparse can
|
|
292 # handle the errors. This introduces a small race condition when
|
|
293 # jinja later tries to re-open the template file, but we don't
|
|
294 # care too much.
|
|
295 parser.add_argument('--template', type=argparse.FileType(mode='r'), default=default_template,
|
|
296 help='The template file to use. Defaults to blast_html.html.jinja')
|
|
297
|
|
298 args = parser.parse_args()
|
|
299 if args.input == None:
|
|
300 args.input = args.positional_arg
|
|
301 if args.input == None:
|
|
302 parser.error('no input specified')
|
|
303
|
|
304 templatedir, templatename = path.split(args.template.name)
|
|
305 args.template.close()
|
|
306 if not templatedir:
|
|
307 templatedir = '.'
|
|
308
|
|
309 b = BlastVisualize(args.input, templatedir, templatename)
|
|
310 b.render(args.output)
|
|
311
|
|
312
|
|
313 if __name__ == '__main__':
|
|
314 main()
|
|
315
|