Mercurial > repos > jankanis > blast2html
comparison blast_html.py @ 21:67ddcb807b7d draft
make it work with multiple queries
| author | Jan Kanis <jan.code@jankanis.nl> |
|---|---|
| date | Tue, 13 May 2014 18:06:36 +0200 |
| parents | 4434ffab721a |
| children | 53cd304c5f26 |
comparison
equal
deleted
inserted
replaced
| 20:4434ffab721a | 21:67ddcb807b7d |
|---|---|
| 17 _filters = {} | 17 _filters = {} |
| 18 def filter(func_or_name): | 18 def filter(func_or_name): |
| 19 "Decorator to register a function as filter in the current jinja environment" | 19 "Decorator to register a function as filter in the current jinja environment" |
| 20 if isinstance(func_or_name, str): | 20 if isinstance(func_or_name, str): |
| 21 def inner(func): | 21 def inner(func): |
| 22 _filters[func_or_name] = func | 22 _filters[func_or_name] = func.__name__ |
| 23 return func | 23 return func |
| 24 return inner | 24 return inner |
| 25 else: | 25 else: |
| 26 _filters[func_or_name.__name__] = func_or_name | 26 _filters[func_or_name.__name__] = func_or_name.__name__ |
| 27 return func_or_name | 27 return func_or_name |
| 28 | 28 |
| 29 | 29 |
| 30 def color_idx(length): | 30 def color_idx(length): |
| 31 if length < 40: | 31 if length < 40: |
| 76 " {:7s} {}\n".format('', hsp.Hsp_midline) + | 76 " {:7s} {}\n".format('', hsp.Hsp_midline) + |
| 77 "Subject{:>7s} {} {}".format(hsp['Hsp_hit-from'].text, hsp.Hsp_hseq, hsp['Hsp_hit-to']) | 77 "Subject{:>7s} {} {}".format(hsp['Hsp_hit-from'].text, hsp.Hsp_hseq, hsp['Hsp_hit-to']) |
| 78 ) | 78 ) |
| 79 | 79 |
| 80 @filter('len') | 80 @filter('len') |
| 81 def hsplen(node): | 81 def blastxml_len(node): |
| 82 return int(node['Hsp_align-len']) | 82 if node.tag == 'Hsp': |
| 83 return int(node['Hsp_align-len']) | |
| 84 elif node.tag == 'Iteration': | |
| 85 return int(node['Iteration_query-len']) | |
| 86 raise Exception("Unknown XML node type: "+node.tag) | |
| 87 | |
| 83 | 88 |
| 84 @filter | 89 @filter |
| 85 def asframe(frame): | 90 def asframe(frame): |
| 86 if frame == 1: | 91 if frame == 1: |
| 87 return 'Plus' | 92 return 'Plus' |
| 132 for bad, good in _js_escapes: | 137 for bad, good in _js_escapes: |
| 133 value = value.replace(bad, good) | 138 value = value.replace(bad, good) |
| 134 | 139 |
| 135 return value | 140 return value |
| 136 | 141 |
| 142 @filter | |
| 143 def hits(result): | |
| 144 # sort hits by longest hotspot first | |
| 145 return sorted(result.Iteration_hits.findall('Hit'), | |
| 146 key=lambda h: max(blastxml_len(hsp) for hsp in h.Hit_hsps.Hsp), | |
| 147 reverse=True) | |
| 148 | |
| 137 | 149 |
| 138 | 150 |
| 139 class BlastVisualize: | 151 class BlastVisualize: |
| 140 | 152 |
| 141 colors = ('black', 'blue', 'green', 'magenta', 'red') | 153 colors = ('black', 'blue', 'green', 'magenta', 'red') |
| 149 self.blast = objectify.parse(self.input).getroot() | 161 self.blast = objectify.parse(self.input).getroot() |
| 150 self.loader = jinja2.FileSystemLoader(searchpath=templatedir) | 162 self.loader = jinja2.FileSystemLoader(searchpath=templatedir) |
| 151 self.environment = jinja2.Environment(loader=self.loader, | 163 self.environment = jinja2.Environment(loader=self.loader, |
| 152 lstrip_blocks=True, trim_blocks=True, autoescape=True) | 164 lstrip_blocks=True, trim_blocks=True, autoescape=True) |
| 153 | 165 |
| 154 self.environment.filters.update(_filters) | 166 self._addfilters(self.environment) |
| 155 self.environment.filters['color'] = lambda length: match_colors[color_idx(length)] | 167 |
| 156 | 168 |
| 157 self.query_length = int(self.blast["BlastOutput_query-len"]) | 169 def _addfilters(self, environment): |
| 158 self.hits = self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit | 170 for filtername, funcname in _filters.items(): |
| 159 # sort hits by longest hotspot first | 171 try: |
| 160 self.ordered_hits = sorted(self.hits, | 172 environment.filters[filtername] = getattr(self, funcname) |
| 161 key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp), | 173 except AttributeError: |
| 162 reverse=True) | 174 environment.filters[filtername] = globals()[funcname] |
| 163 | 175 |
| 164 def render(self, output): | 176 def render(self, output): |
| 165 template = self.environment.get_template(self.templatename) | 177 template = self.environment.get_template(self.templatename) |
| 166 | 178 |
| 167 params = (('Query ID', self.blast["BlastOutput_query-ID"]), | 179 params = (('Query ID', self.blast["BlastOutput_query-ID"]), |
| 169 ('Query length', self.blast["BlastOutput_query-len"]), | 181 ('Query length', self.blast["BlastOutput_query-len"]), |
| 170 ('Program', self.blast.BlastOutput_version), | 182 ('Program', self.blast.BlastOutput_version), |
| 171 ('Database', self.blast.BlastOutput_db), | 183 ('Database', self.blast.BlastOutput_db), |
| 172 ) | 184 ) |
| 173 | 185 |
| 174 if len(self.blast.BlastOutput_iterations.Iteration) > 1: | |
| 175 warnings.warn("Multiple 'Iteration' elements found, showing only the first") | |
| 176 | |
| 177 output.write(template.render(blast=self.blast, | 186 output.write(template.render(blast=self.blast, |
| 178 length=self.query_length, | 187 iterations=self.blast.BlastOutput_iterations.Iteration, |
| 179 hits=self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit, | |
| 180 colors=self.colors, | 188 colors=self.colors, |
| 181 match_colors=self.match_colors(), | 189 # match_colors=self.match_colors(), |
| 182 queryscale=self.queryscale(), | 190 # hit_info=self.hit_info(), |
| 183 hit_info=self.hit_info(), | |
| 184 genelink=genelink, | 191 genelink=genelink, |
| 185 params=params)) | 192 params=params)) |
| 186 | 193 |
| 187 | 194 @filter |
| 188 def match_colors(self): | 195 def match_colors(self, result): |
| 189 """ | 196 """ |
| 190 An iterator that yields lists of length-color pairs. | 197 An iterator that yields lists of length-color pairs. |
| 191 """ | 198 """ |
| 192 | 199 |
| 193 percent_multiplier = 100 / self.query_length | 200 query_length = blastxml_len(result) |
| 194 | 201 |
| 195 for hit in self.hits: | 202 percent_multiplier = 100 / query_length |
| 203 | |
| 204 for hit in hits(result): | |
| 196 # sort hotspots from short to long, so we can overwrite index colors of | 205 # sort hotspots from short to long, so we can overwrite index colors of |
| 197 # short matches with those of long ones. | 206 # short matches with those of long ones. |
| 198 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: hsplen(hsp)) | 207 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: blastxml_len(hsp)) |
| 199 table = bytearray([255]) * self.query_length | 208 table = bytearray([255]) * query_length |
| 200 for hsp in hotspots: | 209 for hsp in hotspots: |
| 201 frm = hsp['Hsp_query-from'] - 1 | 210 frm = hsp['Hsp_query-from'] - 1 |
| 202 to = int(hsp['Hsp_query-to']) | 211 to = int(hsp['Hsp_query-to']) |
| 203 table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm) | 212 table[frm:to] = repeat(color_idx(blastxml_len(hsp)), to - frm) |
| 204 | 213 |
| 205 matches = [] | 214 matches = [] |
| 206 last = table[0] | 215 last = table[0] |
| 207 count = 0 | 216 count = 0 |
| 208 for i in range(self.query_length): | 217 for i in range(query_length): |
| 209 if table[i] == last: | 218 if table[i] == last: |
| 210 count += 1 | 219 count += 1 |
| 211 continue | 220 continue |
| 212 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) | 221 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) |
| 213 last = table[i] | 222 last = table[i] |
| 214 count = 1 | 223 count = 1 |
| 215 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) | 224 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'transparent')) |
| 216 | 225 |
| 217 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) | 226 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) |
| 218 | 227 |
| 219 | 228 @filter |
| 220 def queryscale(self): | 229 def queryscale(self, result): |
| 221 skip = math.ceil(self.query_length / self.max_scale_labels) | 230 query_length = blastxml_len(result) |
| 222 percent_multiplier = 100 / self.query_length | 231 skip = math.ceil(query_length / self.max_scale_labels) |
| 223 for i in range(1, self.query_length+1): | 232 percent_multiplier = 100 / query_length |
| 233 for i in range(1, query_length+1): | |
| 224 if i % skip == 0: | 234 if i % skip == 0: |
| 225 yield dict(label = i, width = skip * percent_multiplier) | 235 yield dict(label = i, width = skip * percent_multiplier) |
| 226 if self.query_length % skip != 0: | 236 if query_length % skip != 0: |
| 227 yield dict(label = self.query_length, width = (self.query_length % skip) * percent_multiplier) | 237 yield dict(label = query_length, width = (query_length % skip) * percent_multiplier) |
| 228 | 238 |
| 229 | 239 @filter |
| 230 def hit_info(self): | 240 def hit_info(self, result): |
| 231 | 241 |
| 232 for hit in self.ordered_hits: | 242 query_length = blastxml_len(result) |
| 243 | |
| 244 for hit in hits(result): | |
| 233 hsps = hit.Hit_hsps.Hsp | 245 hsps = hit.Hit_hsps.Hsp |
| 234 | 246 |
| 235 cover = [False] * self.query_length | 247 cover = [False] * query_length |
| 236 for hsp in hsps: | 248 for hsp in hsps: |
| 237 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, hsplen(hsp)) | 249 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, blastxml_len(hsp)) |
| 238 cover_count = cover.count(True) | 250 cover_count = cover.count(True) |
| 239 | 251 |
| 240 def hsp_val(path): | 252 def hsp_val(path): |
| 241 return (float(hsp[path]) for hsp in hsps) | 253 return (float(hsp[path]) for hsp in hsps) |
| 242 | 254 |
| 243 yield dict(hit = hit, | 255 yield dict(hit = hit, |
| 244 title = firsttitle(hit), | 256 title = firsttitle(hit), |
| 245 link_id = hit.Hit_num, | 257 link_id = hit.Hit_num, |
| 246 maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))), | 258 maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))), |
| 247 totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))), | 259 totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))), |
| 248 cover = "{:.0%}".format(cover_count / self.query_length), | 260 cover = "{:.0%}".format(cover_count / query_length), |
| 249 e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))), | 261 e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))), |
| 250 # FIXME: is this the correct formula vv? | 262 # FIXME: is this the correct formula vv? |
| 251 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))), | 263 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / blastxml_len(hsp) for hsp in hsps))), |
| 252 accession = hit.Hit_accession) | 264 accession = hit.Hit_accession) |
| 253 | 265 |
| 254 def main(): | 266 def main(): |
| 255 | 267 |
| 256 parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page", | 268 parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page", |
