Mercurial > repos > jankanis > blast2html2
comparison blast_html.py @ 19:67ddcb807b7d
make it work with multiple queries
author | Jan Kanis <jan.code@jankanis.nl> |
---|---|
date | Tue, 13 May 2014 18:06:36 +0200 |
parents | 4434ffab721a |
children | 53cd304c5f26 |
comparison
equal
deleted
inserted
replaced
18:4434ffab721a | 19:67ddcb807b7d |
---|---|
17 _filters = {} | 17 _filters = {} |
18 def filter(func_or_name): | 18 def filter(func_or_name): |
19 "Decorator to register a function as filter in the current jinja environment" | 19 "Decorator to register a function as filter in the current jinja environment" |
20 if isinstance(func_or_name, str): | 20 if isinstance(func_or_name, str): |
21 def inner(func): | 21 def inner(func): |
22 _filters[func_or_name] = func | 22 _filters[func_or_name] = func.__name__ |
23 return func | 23 return func |
24 return inner | 24 return inner |
25 else: | 25 else: |
26 _filters[func_or_name.__name__] = func_or_name | 26 _filters[func_or_name.__name__] = func_or_name.__name__ |
27 return func_or_name | 27 return func_or_name |
28 | 28 |
29 | 29 |
30 def color_idx(length): | 30 def color_idx(length): |
31 if length < 40: | 31 if length < 40: |
76 " {:7s} {}\n".format('', hsp.Hsp_midline) + | 76 " {:7s} {}\n".format('', hsp.Hsp_midline) + |
77 "Subject{:>7s} {} {}".format(hsp['Hsp_hit-from'].text, hsp.Hsp_hseq, hsp['Hsp_hit-to']) | 77 "Subject{:>7s} {} {}".format(hsp['Hsp_hit-from'].text, hsp.Hsp_hseq, hsp['Hsp_hit-to']) |
78 ) | 78 ) |
79 | 79 |
80 @filter('len') | 80 @filter('len') |
81 def hsplen(node): | 81 def blastxml_len(node): |
82 return int(node['Hsp_align-len']) | 82 if node.tag == 'Hsp': |
83 return int(node['Hsp_align-len']) | |
84 elif node.tag == 'Iteration': | |
85 return int(node['Iteration_query-len']) | |
86 raise Exception("Unknown XML node type: "+node.tag) | |
87 | |
83 | 88 |
84 @filter | 89 @filter |
85 def asframe(frame): | 90 def asframe(frame): |
86 if frame == 1: | 91 if frame == 1: |
87 return 'Plus' | 92 return 'Plus' |
132 for bad, good in _js_escapes: | 137 for bad, good in _js_escapes: |
133 value = value.replace(bad, good) | 138 value = value.replace(bad, good) |
134 | 139 |
135 return value | 140 return value |
136 | 141 |
142 @filter | |
143 def hits(result): | |
144 # sort hits by longest hotspot first | |
145 return sorted(result.Iteration_hits.findall('Hit'), | |
146 key=lambda h: max(blastxml_len(hsp) for hsp in h.Hit_hsps.Hsp), | |
147 reverse=True) | |
148 | |
137 | 149 |
138 | 150 |
139 class BlastVisualize: | 151 class BlastVisualize: |
140 | 152 |
141 colors = ('black', 'blue', 'green', 'magenta', 'red') | 153 colors = ('black', 'blue', 'green', 'magenta', 'red') |
149 self.blast = objectify.parse(self.input).getroot() | 161 self.blast = objectify.parse(self.input).getroot() |
150 self.loader = jinja2.FileSystemLoader(searchpath=templatedir) | 162 self.loader = jinja2.FileSystemLoader(searchpath=templatedir) |
151 self.environment = jinja2.Environment(loader=self.loader, | 163 self.environment = jinja2.Environment(loader=self.loader, |
152 lstrip_blocks=True, trim_blocks=True, autoescape=True) | 164 lstrip_blocks=True, trim_blocks=True, autoescape=True) |
153 | 165 |
154 self.environment.filters.update(_filters) | 166 self._addfilters(self.environment) |
155 self.environment.filters['color'] = lambda length: match_colors[color_idx(length)] | 167 |
156 | 168 |
157 self.query_length = int(self.blast["BlastOutput_query-len"]) | 169 def _addfilters(self, environment): |
158 self.hits = self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit | 170 for filtername, funcname in _filters.items(): |
159 # sort hits by longest hotspot first | 171 try: |
160 self.ordered_hits = sorted(self.hits, | 172 environment.filters[filtername] = getattr(self, funcname) |
161 key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp), | 173 except AttributeError: |
162 reverse=True) | 174 environment.filters[filtername] = globals()[funcname] |
163 | 175 |
164 def render(self, output): | 176 def render(self, output): |
165 template = self.environment.get_template(self.templatename) | 177 template = self.environment.get_template(self.templatename) |
166 | 178 |
167 params = (('Query ID', self.blast["BlastOutput_query-ID"]), | 179 params = (('Query ID', self.blast["BlastOutput_query-ID"]), |
169 ('Query length', self.blast["BlastOutput_query-len"]), | 181 ('Query length', self.blast["BlastOutput_query-len"]), |
170 ('Program', self.blast.BlastOutput_version), | 182 ('Program', self.blast.BlastOutput_version), |
171 ('Database', self.blast.BlastOutput_db), | 183 ('Database', self.blast.BlastOutput_db), |
172 ) | 184 ) |
173 | 185 |
174 if len(self.blast.BlastOutput_iterations.Iteration) > 1: | |
175 warnings.warn("Multiple 'Iteration' elements found, showing only the first") | |
176 | |
177 output.write(template.render(blast=self.blast, | 186 output.write(template.render(blast=self.blast, |
178 length=self.query_length, | 187 iterations=self.blast.BlastOutput_iterations.Iteration, |
179 hits=self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit, | |
180 colors=self.colors, | 188 colors=self.colors, |
181 match_colors=self.match_colors(), | 189 # match_colors=self.match_colors(), |
182 queryscale=self.queryscale(), | 190 # hit_info=self.hit_info(), |
183 hit_info=self.hit_info(), | |
184 genelink=genelink, | 191 genelink=genelink, |
185 params=params)) | 192 params=params)) |
186 | 193 |
187 | 194 @filter |
188 def match_colors(self): | 195 def match_colors(self, result): |
189 """ | 196 """ |
190 An iterator that yields lists of length-color pairs. | 197 An iterator that yields lists of length-color pairs. |
191 """ | 198 """ |
192 | 199 |
193 percent_multiplier = 100 / self.query_length | 200 query_length = blastxml_len(result) |
194 | 201 |
195 for hit in self.hits: | 202 percent_multiplier = 100 / query_length |
203 | |
204 for hit in hits(result): | |
196 # sort hotspots from short to long, so we can overwrite index colors of | 205 # sort hotspots from short to long, so we can overwrite index colors of |
197 # short matches with those of long ones. | 206 # short matches with those of long ones. |
198 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: hsplen(hsp)) | 207 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: blastxml_len(hsp)) |
199 table = bytearray([255]) * self.query_length | 208 table = bytearray([255]) * query_length |
200 for hsp in hotspots: | 209 for hsp in hotspots: |
201 frm = hsp['Hsp_query-from'] - 1 | 210 frm = hsp['Hsp_query-from'] - 1 |
202 to = int(hsp['Hsp_query-to']) | 211 to = int(hsp['Hsp_query-to']) |
203 table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm) | 212 table[frm:to] = repeat(color_idx(blastxml_len(hsp)), to - frm) |
204 | 213 |
205 matches = [] | 214 matches = [] |
206 last = table[0] | 215 last = table[0] |
207 count = 0 | 216 count = 0 |
208 for i in range(self.query_length): | 217 for i in range(query_length): |
209 if table[i] == last: | 218 if table[i] == last: |
210 count += 1 | 219 count += 1 |
211 continue | 220 continue |
212 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) | 221 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) |
213 last = table[i] | 222 last = table[i] |
214 count = 1 | 223 count = 1 |
215 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) | 224 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) |
216 | 225 |
217 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) | 226 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) |
218 | 227 |
219 | 228 @filter |
220 def queryscale(self): | 229 def queryscale(self, result): |
221 skip = math.ceil(self.query_length / self.max_scale_labels) | 230 query_length = blastxml_len(result) |
222 percent_multiplier = 100 / self.query_length | 231 skip = math.ceil(query_length / self.max_scale_labels) |
223 for i in range(1, self.query_length+1): | 232 percent_multiplier = 100 / query_length |
233 for i in range(1, query_length+1): | |
224 if i % skip == 0: | 234 if i % skip == 0: |
225 yield dict(label = i, width = skip * percent_multiplier) | 235 yield dict(label = i, width = skip * percent_multiplier) |
226 if self.query_length % skip != 0: | 236 if query_length % skip != 0: |
227 yield dict(label = self.query_length, width = (self.query_length % skip) * percent_multiplier) | 237 yield dict(label = query_length, width = (query_length % skip) * percent_multiplier) |
228 | 238 |
229 | 239 @filter |
230 def hit_info(self): | 240 def hit_info(self, result): |
231 | 241 |
232 for hit in self.ordered_hits: | 242 query_length = blastxml_len(result) |
243 | |
244 for hit in hits(result): | |
233 hsps = hit.Hit_hsps.Hsp | 245 hsps = hit.Hit_hsps.Hsp |
234 | 246 |
235 cover = [False] * self.query_length | 247 cover = [False] * query_length |
236 for hsp in hsps: | 248 for hsp in hsps: |
237 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, hsplen(hsp)) | 249 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, blastxml_len(hsp)) |
238 cover_count = cover.count(True) | 250 cover_count = cover.count(True) |
239 | 251 |
240 def hsp_val(path): | 252 def hsp_val(path): |
241 return (float(hsp[path]) for hsp in hsps) | 253 return (float(hsp[path]) for hsp in hsps) |
242 | 254 |
243 yield dict(hit = hit, | 255 yield dict(hit = hit, |
244 title = firsttitle(hit), | 256 title = firsttitle(hit), |
245 link_id = hit.Hit_num, | 257 link_id = hit.Hit_num, |
246 maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))), | 258 maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))), |
247 totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))), | 259 totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))), |
248 cover = "{:.0%}".format(cover_count / self.query_length), | 260 cover = "{:.0%}".format(cover_count / query_length), |
249 e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))), | 261 e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))), |
250 # FIXME: is this the correct formula vv? | 262 # FIXME: is this the correct formula vv? |
251 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))), | 263 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / blastxml_len(hsp) for hsp in hsps))), |
252 accession = hit.Hit_accession) | 264 accession = hit.Hit_accession) |
253 | 265 |
254 def main(): | 266 def main(): |
255 | 267 |
256 parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page", | 268 parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page", |