Mercurial > repos > jankanis > blast2html
comparison visualise.py @ 14:a459c754cdb5 draft
add links, refactor, proper commandline arguments
| author | Jan Kanis <jan.code@jankanis.nl> |
|---|---|
| date | Mon, 12 May 2014 17:12:24 +0200 |
| parents | 7660519f2dc9 |
| children |
comparison
equal
deleted
inserted
replaced
| 13:7660519f2dc9 | 14:a459c754cdb5 |
|---|---|
| 5 | 5 |
| 6 import sys | 6 import sys |
| 7 import math | 7 import math |
| 8 import warnings | 8 import warnings |
| 9 from itertools import repeat | 9 from itertools import repeat |
| 10 import argparse | |
| 10 from lxml import objectify | 11 from lxml import objectify |
| 11 import jinja2 | 12 import jinja2 |
| 12 | 13 |
| 13 | 14 |
| 14 blast = objectify.parse('blast xml example1.xml').getroot() | 15 |
| 15 loader = jinja2.FileSystemLoader(searchpath='.') | 16 _filters = {} |
| 16 environment = jinja2.Environment(loader=loader, lstrip_blocks=True, trim_blocks=True, autoescape=True) | |
| 17 | |
| 18 def filter(func_or_name): | 17 def filter(func_or_name): |
| 19 "Decorator to register a function as filter in the current jinja environment" | 18 "Decorator to register a function as filter in the current jinja environment" |
| 20 if isinstance(func_or_name, str): | 19 if isinstance(func_or_name, str): |
| 21 def inner(func): | 20 def inner(func): |
| 22 environment.filters[func_or_name] = func | 21 _filters[func_or_name] = func |
| 23 return func | 22 return func |
| 24 return inner | 23 return inner |
| 25 else: | 24 else: |
| 26 environment.filters[func_or_name.__name__] = func_or_name | 25 _filters[func_or_name.__name__] = func_or_name |
| 27 return func_or_name | 26 return func_or_name |
| 28 | 27 |
| 29 | 28 |
| 30 def color_idx(length): | 29 def color_idx(length): |
| 31 if length < 40: | 30 if length < 40: |
| 36 return 2 | 35 return 2 |
| 37 elif length < 200: | 36 elif length < 200: |
| 38 return 3 | 37 return 3 |
| 39 return 4 | 38 return 4 |
| 40 | 39 |
| 41 colors = ['black', 'blue', 'green', 'magenta', 'red'] | |
| 42 | |
| 43 environment.filters['color'] = lambda length: match_colors[color_idx(length)] | |
| 44 | |
| 45 @filter | 40 @filter |
| 46 def fmt(val, fmt): | 41 def fmt(val, fmt): |
| 47 return format(float(val), fmt) | 42 return format(float(val), fmt) |
| 48 | 43 |
| 49 @filter | 44 @filter |
| 56 id_titles = hit.Hit_def.text.split('>') | 51 id_titles = hit.Hit_def.text.split('>') |
| 57 | 52 |
| 58 titles = [] | 53 titles = [] |
| 59 for t in id_titles[1:]: | 54 for t in id_titles[1:]: |
| 60 fullid, title = t.split(' ', 1) | 55 fullid, title = t.split(' ', 1) |
| 61 id = fullid.split('|', 2)[2] | 56 hitid, id = fullid.split('|', 2)[1:3] |
| 62 titles.append(dict(id = id, | 57 titles.append(dict(id = id, |
| 58 hitid = hitid, | |
| 63 fullid = fullid, | 59 fullid = fullid, |
| 64 title = title)) | 60 title = title)) |
| 65 return titles | 61 return titles |
| 66 | 62 |
| 67 @filter | 63 @filter |
| 90 return 'Plus' | 86 return 'Plus' |
| 91 elif frame == -1: | 87 elif frame == -1: |
| 92 return 'Minus' | 88 return 'Minus' |
| 93 raise Exception("frame should be either +1 or -1") | 89 raise Exception("frame should be either +1 or -1") |
| 94 | 90 |
| 95 | 91 def genelink(hit, type='genbank', hsp=None): |
| 96 query_length = int(blast["BlastOutput_query-len"]) | 92 if not isinstance(hit, str): |
| 97 | 93 hit = hitid(hit) |
| 98 hits = blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit | 94 link = "http://www.ncbi.nlm.nih.gov/nucleotide/{}?report={}&log$=nuclalign".format(hit, type) |
| 99 # sort hits by longest hotspot first | 95 if hsp != None: |
| 100 ordered_hits = sorted(hits, | 96 link += "&from={}&to={}".format(hsp['Hsp_hit-from'], hsp['Hsp_hit-to']) |
| 101 key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp), | 97 return jinja2.Markup(link) |
| 102 reverse=True) | 98 |
| 103 | 99 |
| 104 def match_colors(): | 100 |
| 105 """ | 101 |
| 106 An iterator that yields lists of length-color pairs. | 102 class BlastVisualize: |
| 107 """ | 103 |
| 108 | 104 colors = ('black', 'blue', 'green', 'magenta', 'red') |
| 109 percent_multiplier = 100 / query_length | 105 |
| 110 | 106 max_scale_labels = 10 |
| 111 for hit in hits: | 107 |
| 112 # sort hotspots from short to long, so we can overwrite index colors of | 108 templatename = 'visualise.html.jinja' |
| 113 # short matches with those of long ones. | 109 |
| 114 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: hsplen(hsp)) | 110 def __init__(self, input): |
| 115 table = bytearray([255]) * query_length | 111 self.input = input |
| 116 for hsp in hotspots: | 112 |
| 117 frm = hsp['Hsp_query-from'] - 1 | 113 self.blast = objectify.parse(self.input).getroot() |
| 118 to = int(hsp['Hsp_query-to']) | 114 self.loader = jinja2.FileSystemLoader(searchpath='.') |
| 119 table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm) | 115 self.environment = jinja2.Environment(loader=self.loader, |
| 120 | 116 lstrip_blocks=True, trim_blocks=True, autoescape=True) |
| 121 matches = [] | 117 |
| 122 last = table[0] | 118 self.environment.filters['color'] = lambda length: match_colors[color_idx(length)] |
| 123 count = 0 | 119 |
| 124 for i in range(query_length): | 120 for name, filter in _filters.items(): |
| 125 if table[i] == last: | 121 self.environment.filters[name] = filter |
| 126 count += 1 | 122 |
| 127 continue | 123 self.query_length = int(self.blast["BlastOutput_query-len"]) |
| 128 matches.append((count * percent_multiplier, colors[last] if last != 255 else 'none')) | 124 self.hits = self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit |
| 129 last = table[i] | 125 # sort hits by longest hotspot first |
| 130 count = 1 | 126 self.ordered_hits = sorted(self.hits, |
| 131 matches.append((count * percent_multiplier, colors[last] if last != 255 else 'none')) | 127 key=lambda h: max(hsplen(hsp) for hsp in h.Hit_hsps.Hsp), |
| 132 | 128 reverse=True) |
| 133 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) | 129 |
| 134 | 130 def render(self, output): |
| 135 | 131 template = self.environment.get_template(self.templatename) |
| 136 def queryscale(): | 132 |
| 137 max_labels = 10 | 133 params = (('Query ID', self.blast["BlastOutput_query-ID"]), |
| 138 skip = math.ceil(query_length / max_labels) | 134 ('Query definition', self.blast["BlastOutput_query-def"]), |
| 139 percent_multiplier = 100 / query_length | 135 ('Query length', self.blast["BlastOutput_query-len"]), |
| 140 for i in range(1, query_length+1): | 136 ('Program', self.blast.BlastOutput_version), |
| 141 if i % skip == 0: | 137 ('Database', self.blast.BlastOutput_db), |
| 142 yield dict(label = i, width = skip * percent_multiplier) | 138 ) |
| 143 if query_length % skip != 0: | 139 |
| 144 yield dict(label = query_length, width = (query_length % skip) * percent_multiplier) | 140 if len(self.blast.BlastOutput_iterations.Iteration) > 1: |
| 145 | 141 warnings.warn("Multiple 'Iteration' elements found, showing only the first") |
| 146 | 142 |
| 147 def hit_info(): | 143 output.write(template.render(blast=self.blast, |
| 148 | 144 length=self.query_length, |
| 149 for hit in ordered_hits: | 145 hits=self.blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit, |
| 150 hsps = hit.Hit_hsps.Hsp | 146 colors=self.colors, |
| 151 | 147 match_colors=self.match_colors(), |
| 152 cover = [False] * query_length | 148 queryscale=self.queryscale(), |
| 153 for hsp in hsps: | 149 hit_info=self.hit_info(), |
| 154 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, hsplen(hsp)) | 150 genelink=genelink, |
| 155 cover_count = cover.count(True) | 151 params=params)) |
| 156 | 152 |
| 157 def hsp_val(path): | 153 |
| 158 return (hsp[path] for hsp in hsps) | 154 def match_colors(self): |
| 159 | 155 """ |
| 160 yield dict(hit = hit, | 156 An iterator that yields lists of length-color pairs. |
| 161 title = firsttitle(hit), | 157 """ |
| 162 link_id = hit.Hit_num, | 158 |
| 163 maxscore = "{:.1f}".format(float(max(hsp_val('Hsp_bit-score')))), | 159 percent_multiplier = 100 / self.query_length |
| 164 totalscore = "{:.1f}".format(float(sum(hsp_val('Hsp_bit-score')))), | 160 |
| 165 cover = "{:.0%}".format(cover_count / query_length), | 161 for hit in self.hits: |
| 166 e_value = "{:.4g}".format(float(min(hsp_val('Hsp_evalue')))), | 162 # sort hotspots from short to long, so we can overwrite index colors of |
| 167 # FIXME: is this the correct formula vv? | 163 # short matches with those of long ones. |
| 168 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))), | 164 hotspots = sorted(hit.Hit_hsps.Hsp, key=lambda hsp: hsplen(hsp)) |
| 169 accession = hit.Hit_accession) | 165 table = bytearray([255]) * self.query_length |
| 166 for hsp in hotspots: | |
| 167 frm = hsp['Hsp_query-from'] - 1 | |
| 168 to = int(hsp['Hsp_query-to']) | |
| 169 table[frm:to] = repeat(color_idx(hsplen(hsp)), to - frm) | |
| 170 | |
| 171 matches = [] | |
| 172 last = table[0] | |
| 173 count = 0 | |
| 174 for i in range(self.query_length): | |
| 175 if table[i] == last: | |
| 176 count += 1 | |
| 177 continue | |
| 178 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'none')) | |
| 179 last = table[i] | |
| 180 count = 1 | |
| 181 matches.append((count * percent_multiplier, self.colors[last] if last != 255 else 'none')) | |
| 182 | |
| 183 yield dict(colors=matches, link="#hit"+hit.Hit_num.text, defline=firsttitle(hit)) | |
| 184 | |
| 185 | |
| 186 def queryscale(self): | |
| 187 skip = math.ceil(self.query_length / self.max_scale_labels) | |
| 188 percent_multiplier = 100 / self.query_length | |
| 189 for i in range(1, self.query_length+1): | |
| 190 if i % skip == 0: | |
| 191 yield dict(label = i, width = skip * percent_multiplier) | |
| 192 if self.query_length % skip != 0: | |
| 193 yield dict(label = self.query_length, width = (self.query_length % skip) * percent_multiplier) | |
| 194 | |
| 195 | |
| 196 def hit_info(self): | |
| 197 | |
| 198 for hit in self.ordered_hits: | |
| 199 hsps = hit.Hit_hsps.Hsp | |
| 200 | |
| 201 cover = [False] * self.query_length | |
| 202 for hsp in hsps: | |
| 203 cover[hsp['Hsp_query-from']-1 : int(hsp['Hsp_query-to'])] = repeat(True, hsplen(hsp)) | |
| 204 cover_count = cover.count(True) | |
| 205 | |
| 206 def hsp_val(path): | |
| 207 return (float(hsp[path]) for hsp in hsps) | |
| 208 | |
| 209 yield dict(hit = hit, | |
| 210 title = firsttitle(hit), | |
| 211 link_id = hit.Hit_num, | |
| 212 maxscore = "{:.1f}".format(max(hsp_val('Hsp_bit-score'))), | |
| 213 totalscore = "{:.1f}".format(sum(hsp_val('Hsp_bit-score'))), | |
| 214 cover = "{:.0%}".format(cover_count / self.query_length), | |
| 215 e_value = "{:.4g}".format(min(hsp_val('Hsp_evalue'))), | |
| 216 # FIXME: is this the correct formula vv? | |
| 217 ident = "{:.0%}".format(float(min(hsp.Hsp_identity / hsplen(hsp) for hsp in hsps))), | |
| 218 accession = hit.Hit_accession) | |
| 170 | 219 |
| 171 | 220 |
| 172 def main(): | 221 def main(): |
| 173 template = environment.get_template('visualise.html.jinja') | 222 |
| 174 | 223 parser = argparse.ArgumentParser(description="Convert a BLAST XML result into a nicely readable html page", |
| 175 params = (('Query ID', blast["BlastOutput_query-ID"]), | 224 usage="{} [-i] INPUT [-o OUTPUT]".format(sys.argv[0])) |
| 176 ('Query definition', blast["BlastOutput_query-def"]), | 225 input_group = parser.add_mutually_exclusive_group(required=True) |
| 177 ('Query length', blast["BlastOutput_query-len"]), | 226 input_group.add_argument('positional_arg', metavar='INPUT', nargs='?', type=argparse.FileType(mode='r'), |
| 178 ('Program', blast.BlastOutput_version), | 227 help='The input Blast XML file, same as -i/--input') |
| 179 ('Database', blast.BlastOutput_db), | 228 input_group.add_argument('-i', '--input', type=argparse.FileType(mode='r'), |
| 180 ) | 229 help='The input Blast XML file') |
| 181 | 230 parser.add_argument('-o', '--output', type=argparse.FileType(mode='w'), default=sys.stdout, |
| 182 if len(blast.BlastOutput_iterations.Iteration) > 1: | 231 help='The output html file') |
| 183 warnings.warn("Multiple 'Iteration' elements found, showing only the first") | 232 |
| 184 | 233 args = parser.parse_args() |
| 185 sys.stdout.write(template.render(blast=blast, | 234 if args.input == None: |
| 186 length=query_length, | 235 args.input = args.positional_arg |
| 187 hits=blast.BlastOutput_iterations.Iteration.Iteration_hits.Hit, | 236 if args.input == None: |
| 188 colors=colors, | 237 parser.error('no input specified') |
| 189 match_colors=match_colors(), | 238 |
| 190 queryscale=queryscale(), | 239 b = BlastVisualize(args.input) |
| 191 hit_info=hit_info(), | 240 b.render(args.output) |
| 192 params=params)) | 241 |
| 193 | 242 |
| 194 main() | 243 if __name__ == '__main__': |
| 195 | 244 main() |
| 196 # http://www.ncbi.nlm.nih.gov/nucleotide/557804451?report=genbank&log$=nuclalign&blast_rank=1&RID=PHWP1JNZ014 | 245 |
| 197 # http://www.ncbi.nlm.nih.gov/nuccore/557804451?report=graph&rid=PHWP1JNZ014[557804451]&tracks=[key:sequence_track,name:Sequence,display_name:Sequence,id:STD1,category:Sequence,annots:Sequence,ShowLabel:true][key:gene_model_track,CDSProductFeats:false][key:alignment_track,name:other%20alignments,annots:NG%20Alignments%7CRefseq%20Alignments%7CGnomon%20Alignments%7CUnnamed,shown:false]&v=752:2685&appname=ncbiblast&link_loc=fromSubj | |
| 198 | |
| 199 # http://www.ncbi.nlm.nih.gov/nucleotide/557804451?report=genbank&log$=nucltop&blast_rank=1&RID=PHWP1JNZ014 |
