Mercurial > repos > jankanis > blast2html2
comparison visualise.py @ 12:a459c754cdb5
add links, refactor, proper commandline arguments
author | Jan Kanis <jan.code@jankanis.nl> |
---|---|
date | Mon, 12 May 2014 17:12:24 +0200 |
parents | 7660519f2dc9 |
children |
comparison
equal
deleted
inserted
replaced
11:7660519f2dc9 | 12:a459c754cdb5 |
---|---|
5 | 5 |
6 import sys | 6 import sys |
7 import math | 7 import math |
8 import warnings | 8 import warnings |
9 from itertools import repeat | 9 from itertools import repeat |
10 import argparse | |
10 from lxml import objectify | 11 from lxml import objectify |
11 import jinja2 | 12 import jinja2 |
12 | 13 |
13 | 14 |
14 blast = objectify.parse('blast xml example1.xml').getroot() | 15 |
15 loader = jinja2.FileSystemLoader(searchpath='.') | 16 _filters = {} |
16 environment = jinja2.Environment(loader=loader, lstrip_blocks=True, trim_blocks=True, autoescape=True) | |
17 | |
18 def filter(func_or_name): | 17 def filter(func_or_name): |
19 "Decorator to register a function as filter in the current jinja environment" | 18 "Decorator to register a function as filter in the current jinja environment" |
20 if isinstance(func_or_name, str): | 19 if isinstance(func_or_name, str): |
21 def inner(func): | 20 def inner(func): |
22 environment.filters[func_or_name] = func | 21 _filters[func_or_name] = func |
23 return func | 22 return func |
24 return inner | 23 return inner |
25 else: | 24 else: |
26 environment.filters[func_or_name.__name__] = func_or_name | 25 _filters[func_or_name.__name__] = func_or_name |
27 return func_or_name | 26 return func_or_name |
28 | 27 |
29 | 28 |
30 def color_idx(length): | 29 def color_idx(length): |
31 if length < 40: | 30 if length < 40: |
36 return 2 | 35 return 2 |
37 elif length < 200: | 36 elif length < 200: |
38 return 3 | 37 return 3 |
39 return 4 | 38 return 4 |
40 | 39 |
41 colors = ['black', 'blue', 'green', 'magenta', 'red'] | |
42 | |
43 environment.filters['color'] = lambda length: match_colors[color_idx(length)] | |
44 | |
45 @filter | 40 @filter |
46 def fmt(val, fmt): | 41 def fmt(val, fmt): |
47 return format(float(val), fmt) | 42 return format(float(val), fmt) |
48 | 43 |
49 @filter | 44 @filter |
56 id_titles = hit.Hit_def.text.split('>') | 51 id_titles = hit.Hit_def.text.split('>') |
57 | 52 |
58 titles = [] | 53 titles = [] |
59 for t in id_titles[1:]: | 54 for t in id_titles[1:]: |
60 fullid, title = t.split(' ', 1) | 55 fullid, title = t.split(' ', 1) |
61 id = fullid.split('|', 2)[2] | 56 hitid, id = fullid.split('|', 2)[1:3] |
62 titles.append(dict(id = id, | 57 titles.append(dict(id = id, |
58 hitid = hitid, | |
63 fullid = fullid, | 59 fullid = fullid, |
64 title = title)) | 60 title = title)) |
65 return titles | 61 return titles |
66 | 62 |
67 @filter | 63 @filter |
90 return 'Plus' | 86 return 'Plus' |
91 elif frame == -1: | 87 elif frame == -1: |
92 return 'Minus' | 88 return 'Minus' |
93 raise Exception("frame should be either +1 or -1") | 89 raise Exception("frame should be either +1 or -1") |
94 | 90 |
95 | 91 def genelink(hit, type='genbank', hsp=None): |
96 query_length = int(blast["BlastOutput_query-len"]) | 92 if not isinstance(hit, str): |
97 | 93 hit = hitid(hit) |
98 hits = blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit | 94 link = "http://www.ncbi.nlm.nih.gov/nucleotide/{}?report={}&log$=nuclalign".format(hit, type) |
99 # sort hits by longest hotspot first | 95 if hsp != None: |
100 ordered_hits = sorted(hits, | 96 link += "&from={}&to={}".format(hsp['Hsp_hit-from'], hsp['Hsp_hit-to']) |
101 key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp), | 97 return jinja2.Markup(link) |
102 reverse=True) | 98 |
103 | 99 |
104 def match_colors(): | 100 |
105 """ | 101 |
106 An iterator that yields lists of length-color pairs. | 102 class BlastVisualize: |
107 """ | 103 |
108 | 104 colors = ('black', 'blue', 'green', 'magenta', 'red') |
109 percent_multiplier = 100 / query_length | 105 |
110 | 106 max_scale_labels = 10 |
111 for hit in hits: | 107 |
112 # sort hotspots from short to long, so we can overwrite index colors of | 108 templatename = 'visualise.html.jinja' |
113 # short matches with those of long ones. | 109 |
114 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: hsplen(hsp)) | 110 def __init__(self, input): |
115 table = bytearray([255]) * query_length | 111 self.input = input |
116 for hsp in hotspots: | 112 |
117 frm = hsp['Hsp_query-from'] - 1 | 113 self.blast = objectify.parse(self.input).getroot() |
118 to = int(hsp['Hsp_query-to']) | 114 self.loader = jinja2.FileSystemLoader(searchpath='.') |
119 table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm) | 115 self.environment = jinja2.Environment(loader=self.loader, |
120 | 116 lstrip_blocks=True, trim_blocks=True, autoescape=True) |
121 matches = [] | 117 |
122 last = table[0] | 118 self.environment.filters['color'] = lambda length: match_colors[color_idx(length)] |
123 count = 0 | 119 |
124 for i in range(query_length): | 120 for name, filter in _filters.items(): |
125 if table[i] == last: | 121 self.environment.filters[name] = filter |
126 count += 1 | 122 |
127 continue | 123 self.query_length = int(self.blast["BlastOutput_query-len"]) |
128 matches.append((count * percent_multiplier, colors[last] if last != 255 else 'none')) | 124 self.hits = self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit |
129 last = table[i] | 125 # sort hits by longest hotspot first |
130 count = 1 | 126 self.ordered_hits = sorted(self.hits, |
131 matches.append((count * percent_multiplier, colors[last] if last != 255 else 'none')) | 127 key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp), |
132 | 128 reverse=True) |
133 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) | 129 |
134 | 130 def render(self, output): |
135 | 131 template = self.environment.get_template(self.templatename) |
136 def queryscale(): | 132 |
137 max_labels = 10 | 133 params = (('Query ID', self.blast["BlastOutput_query-ID"]), |
138 skip = math.ceil(query_length / max_labels) | 134 ('Query definition', self.blast["BlastOutput_query-def"]), |
139 percent_multiplier = 100 / query_length | 135 ('Query length', self.blast["BlastOutput_query-len"]), |
140 for i in range(1, query_length+1): | 136 ('Program', self.blast.BlastOutput_version), |
141 if i % skip == 0: | 137 ('Database', self.blast.BlastOutput_db), |
142 yield dict(label = i, width = skip * percent_multiplier) | 138 ) |
143 if query_length % skip != 0: | 139 |
144 yield dict(label = query_length, width = (query_length % skip) * percent_multiplier) | 140 if len(self.blast.BlastOutput_iterations.Iteration) > 1: |
145 | 141 warnings.warn("Multiple 'Iteration' elements found, showing only the first") |
146 | 142 |
147 def hit_info(): | 143 output.write(template.render(blast=self.blast, |
148 | 144 length=self.query_length, |
149 for hit in ordered_hits: | 145 hits=self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit, |
150 hsps = hit.Hit_hsps.Hsp | 146 colors=self.colors, |
151 | 147 match_colors=self.match_colors(), |
152 cover = [False] * query_length | 148 queryscale=self.queryscale(), |
153 for hsp in hsps: | 149 hit_info=self.hit_info(), |
154 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, hsplen(hsp)) | 150 genelink=genelink, |
155 cover_count = cover.count(True) | 151 params=params)) |
156 | 152 |
157 def hsp_val(path): | 153 |
158 return (hsp[path] for hsp in hsps) | 154 def match_colors(self): |
159 | 155 """ |
160 yield dict(hit = hit, | 156 An iterator that yields lists of length-color pairs. |
161 title = firsttitle(hit), | 157 """ |
162 link_id = hit.Hit_num, | 158 |
163 maxscore = "{:.1f}".format(float(max(hsp_val('Hsp_bit-score')))), | 159 percent_multiplier = 100 / self.query_length |
164 totalscore = "{:.1f}".format(float(sum(hsp_val('Hsp_bit-score')))), | 160 |
165 cover = "{:.0%}".format(cover_count / query_length), | 161 for hit in self.hits: |
166 e_value = "{:.4g}".format(float(min(hsp_val('Hsp_evalue')))), | 162 # sort hotspots from short to long, so we can overwrite index colors of |
167 # FIXME: is this the correct formula vv? | 163 # short matches with those of long ones. |
168 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))), | 164 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: hsplen(hsp)) |
169 accession = hit.Hit_accession) | 165 table = bytearray([255]) * self.query_length |
166 for hsp in hotspots: | |
167 frm = hsp['Hsp_query-from'] - 1 | |
168 to = int(hsp['Hsp_query-to']) | |
169 table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm) | |
170 | |
171 matches = [] | |
172 last = table[0] | |
173 count = 0 | |
174 for i in range(self.query_length): | |
175 if table[i] == last: | |
176 count += 1 | |
177 continue | |
178 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'none')) | |
179 last = table[i] | |
180 count = 1 | |
181 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'none')) | |
182 | |
183 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) | |
184 | |
185 | |
186 def queryscale(self): | |
187 skip = math.ceil(self.query_length / self.max_scale_labels) | |
188 percent_multiplier = 100 / self.query_length | |
189 for i in range(1, self.query_length+1): | |
190 if i % skip == 0: | |
191 yield dict(label = i, width = skip * percent_multiplier) | |
192 if self.query_length % skip != 0: | |
193 yield dict(label = self.query_length, width = (self.query_length % skip) * percent_multiplier) | |
194 | |
195 | |
196 def hit_info(self): | |
197 | |
198 for hit in self.ordered_hits: | |
199 hsps = hit.Hit_hsps.Hsp | |
200 | |
201 cover = [False] * self.query_length | |
202 for hsp in hsps: | |
203 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, hsplen(hsp)) | |
204 cover_count = cover.count(True) | |
205 | |
206 def hsp_val(path): | |
207 return (float(hsp[path]) for hsp in hsps) | |
208 | |
209 yield dict(hit = hit, | |
210 title = firsttitle(hit), | |
211 link_id = hit.Hit_num, | |
212 maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))), | |
213 totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))), | |
214 cover = "{:.0%}".format(cover_count / self.query_length), | |
215 e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))), | |
216 # FIXME: is this the correct formula vv? | |
217 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))), | |
218 accession = hit.Hit_accession) | |
170 | 219 |
171 | 220 |
172 def main(): | 221 def main(): |
173 template = environment.get_template('visualise.html.jinja') | 222 |
174 | 223 parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page", |
175 params = (('Query ID', blast["BlastOutput_query-ID"]), | 224 usage="{} [-i] INPUT [-o OUTPUT]".format(sys.argv[0])) |
176 ('Query definition', blast["BlastOutput_query-def"]), | 225 input_group = parser.add_mutually_exclusive_group(required=True) |
177 ('Query length', blast["BlastOutput_query-len"]), | 226 input_group.add_argument('positional_arg', metavar='INPUT', nargs='?', type=argparse.FileType(mode='r'), |
178 ('Program', blast.BlastOutput_version), | 227 help='The input Blast XML file, same as -i/--input') |
179 ('Database', blast.BlastOutput_db), | 228 input_group.add_argument('-i', '--input', type=argparse.FileType(mode='r'), |
180 ) | 229 help='The input Blast XML file') |
181 | 230 parser.add_argument('-o', '--output', type=argparse.FileType(mode='w'), default=sys.stdout, |
182 if len(blast.BlastOutput_iterations.Iteration) > 1: | 231 help='The output html file') |
183 warnings.warn("Multiple 'Iteration' elements found, showing only the first") | 232 |
184 | 233 args = parser.parse_args() |
185 sys.stdout.write(template.render(blast=blast, | 234 if args.input == None: |
186 length=query_length, | 235 args.input = args.positional_arg |
187 hits=blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit, | 236 if args.input == None: |
188 colors=colors, | 237 parser.error('no input specified') |
189 match_colors=match_colors(), | 238 |
190 queryscale=queryscale(), | 239 b = BlastVisualize(args.input) |
191 hit_info=hit_info(), | 240 b.render(args.output) |
192 params=params)) | 241 |
193 | 242 |
194 main() | 243 if __name__ == '__main__': |
195 | 244 main() |
196 # http://www.ncbi.nlm.nih.gov/nucleotide/557804451?report=genbank&log$=nuclalign&blast_rank=1&RID=PHWP1JNZ014 | 245 |
197 # http://www.ncbi.nlm.nih.gov/nuccore/557804451?report=graph&rid=PHWP1JNZ014[557804451]&tracks=[key:sequence_track,name:Sequence,display_name:Sequence,id:STD1,category:Sequence,annots:Sequence,ShowLabel:true][key:gene_model_track,CDSProductFeats:false][key:alignment_track,name:other%20alignments,annots:NG%20Alignments%7CRefseq%20Alignments%7CGnomon%20Alignments%7CUnnamed,shown:false]&v=752:2685&appname=ncbiblast&link_loc=fromSubj | |
198 | |
199 # http://www.ncbi.nlm.nih.gov/nucleotide/557804451?report=genbank&log$=nucltop&blast_rank=1&RID=PHWP1JNZ014 |