Mercurial > repos > jankanis > blast2html
comparison blast_html.py @ 15:c2d63adb83db draft
renamed files
| author | Jan Kanis <jan.code@jankanis.nl> |
|---|---|
| date | Mon, 12 May 2014 17:13:49 +0200 |
| parents | visualise.py@a459c754cdb5 |
| children | 0b33898bba45 |
comparison
equal
deleted
inserted
replaced
| 14:a459c754cdb5 | 15:c2d63adb83db |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Copyright The Hyve B.V. 2014 | |
| 4 # License: GPL version 3 or higher | |
| 5 | |
| 6 import sys | |
| 7 import math | |
| 8 import warnings | |
| 9 from itertools import repeat | |
| 10 import argparse | |
| 11 from lxml import objectify | |
| 12 import jinja2 | |
| 13 | |
| 14 | |
| 15 | |
# Registry of template filters: maps the name a filter gets in the jinja
# environment to the function implementing it.
_filters = {}


def filter(func_or_name):
    """Decorator to register a function as a jinja filter.

    Used bare (``@filter``) the function is registered under its own
    ``__name__``; used with a string (``@filter('name')``) it is registered
    under that name. In both cases the decorated function is returned
    unchanged.
    """
    if not isinstance(func_or_name, str):
        # Bare decorator: the argument is the function itself.
        _filters[func_or_name.__name__] = func_or_name
        return func_or_name

    filter_name = func_or_name

    def register(func):
        _filters[filter_name] = func
        return func

    return register
| 27 | |
| 28 | |
def color_idx(length):
    """Return the color bucket (0 to 4) for an alignment length.

    Buckets 0..3 have the exclusive upper bounds 40, 50, 80 and 200; any
    length of 200 or more falls in bucket 4.
    """
    upper_bounds = (40, 50, 80, 200)
    for bucket, bound in enumerate(upper_bounds):
        if length < bound:
            return bucket
    return len(upper_bounds)
| 39 | |
@filter
def fmt(val, fmt):
    """Convert val to a float and format it with the format spec fmt."""
    number = float(val)
    return format(number, fmt)
| 43 | |
@filter
def firsttitle(hit):
    """Return the part of the hit's Hit_def text that precedes the first '>'."""
    first, _, _ = hit.Hit_def.text.partition('>')
    return first
| 47 | |
@filter
def othertitles(hit):
    """Split a hit.Hit_def that contains multiple titles up, splitting out the hit ids from the titles.

    The Hit_def text is split on '>'; the first entry is skipped and every
    following entry is expected to look like ``<fullid> <title>``, where
    fullid consists of at least three '|'-separated fields.

    Returns a list of dicts with the keys ``id`` (everything after the second
    '|' of fullid), ``hitid`` (the second field of fullid), ``fullid`` and
    ``title``.

    Raises ValueError when a fullid has fewer than three '|'-separated fields.
    """
    titles = []
    for entry in hit.Hit_def.text.split('>')[1:]:
        # partition() rather than split(' ', 1): an entry that has an id but
        # no title (no space at all) yields an empty title instead of failing
        # to unpack.
        fullid, _, title = entry.partition(' ')
        hit_id, seq_id = fullid.split('|', 2)[1:3]
        titles.append(dict(id=seq_id,
                           hitid=hit_id,
                           fullid=fullid,
                           title=title))
    return titles
| 62 | |
@filter
def hitid(hit):
    """Return the second '|'-separated field of the hit's Hit_id text."""
    fields = hit.Hit_id.text.split('|', 2)
    return fields[1]
| 66 | |
@filter
def seqid(hit):
    """Return everything after the second '|' of the hit's Hit_id text."""
    fields = hit.Hit_id.text.split('|', 2)
    return fields[2]
| 70 | |
@filter
def alignment_pre(hsp):
    """Format an hsp as a three-line text block: query, midline and subject.

    The query and subject lines carry the from/to positions around the
    sequence; the last line has no trailing newline.
    """
    # NOTE(review): the padding inside these literals is reproduced as it
    # appears in the reviewed listing, which may have collapsed whitespace --
    # confirm the column alignment against the repository copy.
    query_line = "Query {:>7s} {} {}\n".format(
        hsp['Hsp_query-from'], hsp.Hsp_qseq, hsp['Hsp_query-to'])
    mid_line = " {:7s} {}\n".format('', hsp.Hsp_midline)
    subject_line = "Subject{:>7s} {} {}".format(
        hsp['Hsp_hit-from'], hsp.Hsp_hseq, hsp['Hsp_hit-to'])
    return query_line + mid_line + subject_line
| 78 | |
@filter('len')
def hsplen(node):
    """Return the node's 'Hsp_align-len' value as an int.

    Registered as the template filter named 'len'.
    """
    align_len = node['Hsp_align-len']
    return int(align_len)
| 82 | |
@filter
def asframe(frame):
    """Translate a frame value into a strand name.

    Returns 'Plus' for 1 and 'Minus' for -1; any other value raises
    Exception.
    """
    for value, strand in ((1, 'Plus'), (-1, 'Minus')):
        if frame == value:
            return strand
    raise Exception("frame should be either +1 or -1")
| 90 | |
def genelink(hit, type='genbank', hsp=None):
    """Build a link to the NCBI nucleotide page for a hit.

    hit  -- either the id as a string, or a hit element, which is reduced to
            its id with hitid().
    type -- value of the ``report`` query parameter.
    hsp  -- optional hsp; when given, its 'Hsp_hit-from' and 'Hsp_hit-to'
            values are appended as the ``from`` and ``to`` parameters.

    The link is returned as jinja2.Markup, i.e. marked safe, so that a
    template rendered with autoescaping does not escape the '&' separators.
    """
    if not isinstance(hit, str):
        hit = hitid(hit)
    link = "http://www.ncbi.nlm.nih.gov/nucleotide/{}?report={}&log$=nuclalign".format(hit, type)
    # Identity test: only a missing hsp should skip the range parameters,
    # independent of how the hsp object implements comparison.
    if hsp is not None:
        link += "&from={}&to={}".format(hsp['Hsp_hit-from'], hsp['Hsp_hit-to'])
    # NOTE(review): jinja2.Markup is no longer exported by Jinja2 3.1+
    # (it lives in markupsafe); confirm the Jinja2 version this runs against.
    return jinja2.Markup(link)
| 98 | |
| 99 | |
| 100 | |
| 101 | |
class BlastVisualize:
    """Render a BLAST XML result as an html page.

    The XML is loaded with lxml.objectify and rendered through the jinja2
    template named by ``templatename``.
    """

    # Color names, indexed by the bucket number returned by color_idx().
    colors = ('black', 'blue', 'green', 'magenta', 'red')

    # Divisor used by queryscale() to derive the spacing between labels.
    max_scale_labels = 10

    # Template file name; the loader searches the current working directory.
    templatename = 'visualise.html.jinja'

    def __init__(self, input):
        """Parse the BLAST XML and set up the jinja2 environment.

        input is handed to objectify.parse() as is (main() passes an open
        file object).
        """
        self.input = input

        self.blast = objectify.parse(self.input).getroot()
        self.loader = jinja2.FileSystemLoader(searchpath='.')
        self.environment = jinja2.Environment(loader=self.loader,
                                              lstrip_blocks=True, trim_blocks=True, autoescape=True)

        # Map an alignment length to its color name. This used to index a
        # global called match_colors, which does not exist (NameError as soon
        # as the filter is used); the color names live in self.colors.
        self.environment.filters['color'] = lambda length: self.colors[color_idx(length)]

        # Install every function registered through the @filter decorator.
        self.environment.filters.update(_filters)

        self.query_length = int(self.blast["BlastOutput_query-len"])
        self.hits = self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit
        # sort hits by longest hotspot first
        self.ordered_hits = sorted(self.hits,
                                   key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp),
                                   reverse=True)

    def render(self, output):
        """Render the template and write the resulting html to output.

        output must provide a write() method accepting a string. Only the
        first 'Iteration' element is shown; a warning is issued when the
        input contains more than one.
        """
        template = self.environment.get_template(self.templatename)

        params = (('Query ID', self.blast["BlastOutput_query-ID"]),
                  ('Query definition', self.blast["BlastOutput_query-def"]),
                  ('Query length', self.blast["BlastOutput_query-len"]),
                  ('Program', self.blast.BlastOutput_version),
                  ('Database', self.blast.BlastOutput_db),
                  )

        if len(self.blast.BlastOutput_iterations.Iteration) > 1:
            warnings.warn("Multiple 'Iteration' elements found, showing only the first")

        output.write(template.render(blast=self.blast,
                                     length=self.query_length,
                                     hits=self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit,
                                     colors=self.colors,
                                     match_colors=self.match_colors(),
                                     queryscale=self.queryscale(),
                                     hit_info=self.hit_info(),
                                     genelink=genelink,
                                     params=params))

    def match_colors(self):
        """
        An iterator that yields, for every hit, a dict with:

        colors  -- list of (width, color) pairs describing the query from
                   left to right; width is a percentage of the query length
                   and color is a name from self.colors, or 'none' for a
                   stretch no hsp covers
        link    -- '#hit' followed by the hit number
        defline -- the first title of the hit
        """
        percent_multiplier = 100 / self.query_length

        def color_name(index):
            # 255 is the sentinel for query positions not covered by any hsp.
            return self.colors[index] if index != 255 else 'none'

        for hit in self.hits:
            # sort hotspots from short to long, so we can overwrite index colors of
            # short matches with those of long ones.
            hotspots = sorted(hit.Hit_hsps.Hsp, key=hsplen)
            table = bytearray([255]) * self.query_length
            for hsp in hotspots:
                frm = int(hsp['Hsp_query-from']) - 1
                to = int(hsp['Hsp_query-to'])
                # Fill with exactly as many items as the slice holds, so the
                # table can never change length.
                table[frm:to] = bytes([color_idx(hsplen(hsp))]) * len(table[frm:to])

            # Run-length encode the table into (width, color) pairs.
            matches = []
            last = table[0]
            count = 0
            for value in table:
                if value == last:
                    count += 1
                    continue
                matches.append((count * percent_multiplier, color_name(last)))
                last = value
                count = 1
            matches.append((count * percent_multiplier, color_name(last)))

            yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit))

    def queryscale(self):
        """Yield the labels of the scale drawn along the query.

        Each item is a dict with ``label`` (a query position) and ``width``
        (percentage of the query length the label spans). Labels are placed
        every ``ceil(query_length / max_scale_labels)`` positions; when the
        query length is not a multiple of that step, a final, narrower label
        for the query length itself is added.
        """
        skip = math.ceil(self.query_length / self.max_scale_labels)
        percent_multiplier = 100 / self.query_length
        # Step straight through the multiples of skip instead of testing
        # every single position.
        for i in range(skip, self.query_length + 1, skip):
            yield dict(label=i, width=skip * percent_multiplier)
        remainder = self.query_length % skip
        if remainder != 0:
            yield dict(label=self.query_length, width=remainder * percent_multiplier)

    def hit_info(self):
        """Yield a dict of summary values for every hit, longest hotspot first.

        Keys: hit, title, link_id, maxscore, totalscore, cover, e_value,
        ident and accession. The scores, cover, e_value and ident values are
        preformatted strings.
        """
        for hit in self.ordered_hits:
            hsps = hit.Hit_hsps.Hsp

            cover = [False] * self.query_length
            for hsp in hsps:
                frm = int(hsp['Hsp_query-from']) - 1
                to = int(hsp['Hsp_query-to'])
                # The fill must have exactly the size of the slice. It used to
                # be hsplen(hsp) items long, i.e. the alignment length, which
                # includes gaps and so can differ from the query span; slice
                # assignment then resized the list and skewed the coverage.
                cover[frm:to] = [True] * len(cover[frm:to])
            cover_count = cover.count(True)

            def hsp_val(path):
                return (float(hsp[path]) for hsp in hsps)

            yield dict(hit=hit,
                       title=firsttitle(hit),
                       link_id=hit.Hit_num,
                       maxscore="{:.1f}".format(max(hsp_val('Hsp_bit-score'))),
                       totalscore="{:.1f}".format(sum(hsp_val('Hsp_bit-score'))),
                       cover="{:.0%}".format(cover_count / self.query_length),
                       e_value="{:.4g}".format(min(hsp_val('Hsp_evalue'))),
                       # FIXME: is this the correct formula vv?
                       ident="{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))),
                       accession=hit.Hit_accession)
| 219 | |
| 220 | |
def main():
    """Command line entry point.

    Reads the BLAST XML input, given either positionally or with -i/--input,
    and writes the rendered html to -o/--output (standard output when
    omitted).
    """
    parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page",
                                     usage="{} [-i] INPUT [-o OUTPUT]".format(sys.argv[0]))
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument('positional_arg', metavar='INPUT', nargs='?', type=argparse.FileType(mode='r'),
                             help='The input Blast XML file, same as -i/--input')
    input_group.add_argument('-i', '--input', type=argparse.FileType(mode='r'),
                             help='The input Blast XML file')
    parser.add_argument('-o', '--output', type=argparse.FileType(mode='w'), default=sys.stdout,
                        help='The output html file')

    args = parser.parse_args()
    # Fall back to the positional form when -i/--input was not used.
    if args.input is None:
        args.input = args.positional_arg
    if args.input is None:
        parser.error('no input specified')

    b = BlastVisualize(args.input)
    b.render(args.output)
| 241 | |
| 242 | |
# Run the command line interface only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    main()
| 245 |
