Mercurial > repos > jjohnson > ensembl_variant_report
comparison ensemblref.py @ 0:c3a9e63e8c51 draft
planemo upload for repository https://github.com/jj-umn/galaxytools/tree/master/ensembl_variant_report commit 239c1ee096e5fc3e2e929f7bf2d4afba5c677d4b-dirty
author | jjohnson |
---|---|
date | Fri, 06 Jan 2017 16:19:40 -0500 |
parents | |
children | 9e83cc05d384 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c3a9e63e8c51 |
---|---|
1 """ | |
2 with gtf and twobit | |
3 get_bed(transcript_id) | |
4 """ | |
5 from gtf_to_genes import gene, gene_utilities | |
6 from twobitreader import TwoBitFile | |
7 from Bio.Seq import reverse_complement, translate | |
8 import logging | |
9 logger = logging.getLogger("test") | |
10 | |
11 class EnsemblRef(object): | |
12 | |
13 def __init__(self,gtf_file,twobitfile,read_now=True): | |
14 self.gtf_file = gtf_file | |
15 self.twobitfile = twobitfile | |
16 self.twobit = TwoBitFile(self.twobitfile) | |
17 self.gene_dict = None | |
18 self.transcript_idx = None | |
19 self.name_idx = None | |
20 if read_now: | |
21 self.get_transcript_idx() | |
22 | |
23 | |
24 def get_gene_dict(self): | |
25 if self.gene_dict is None: | |
26 gene_structures = gene.t_parse_gtf('test') | |
27 self.gene_dict = gene_structures.get_genes(self.gtf_file,logger=logger) | |
28 return self.gene_dict | |
29 | |
30 | |
31 def get_transcript_idx(self): | |
32 if self.transcript_idx is None: | |
33 self.transcript_idx = gene_utilities.index_transcripts(self.get_gene_dict(),by_prot_id=False) | |
34 return self.transcript_idx | |
35 | |
36 | |
37 def get_name_idx(self): | |
38 if self.name_idx is None: | |
39 self.name_idx = dict() | |
40 for i,t in self.get_transcript_idx().items(): | |
41 for name in t.gene.names: | |
42 self.name_idx[name] = t.gene | |
43 for name in t.names: | |
44 self.name_idx[name] = t | |
45 if t.prot_id: | |
46 self.name_idx[t.prot_id] = t | |
47 return self.name_idx | |
48 | |
49 | |
50 def get_gtf_transcript(self,transcript_id): | |
51 idx = self.get_transcript_idx() | |
52 return idx[transcript_id] if transcript_id in idx else None | |
53 | |
54 | |
55 def transcript_is_coding(self,transcript_id): | |
56 tx = self.get_transcript_idx()[transcript_id] | |
57 return len(tx.start_codons) > 0 | |
58 | |
59 | |
60 def get_transcript_start_codon(self,transcript_id): | |
61 tx = self.get_transcript_idx()[transcript_id] | |
62 return tx.start_codons[0] if len(tx.start_codons) > 0 else None | |
63 | |
64 | |
65 def get_bed_line(self,transcript_id,coding=False): | |
66 tx = self.get_transcript_idx()[transcript_id] | |
67 chrom = tx.gene.contig | |
68 chromStart = tx.coding_beg if coding else tx.beg | |
69 chromEnd = tx.coding_end if coding else tx.end | |
70 name = transcript_id | |
71 score = 0 | |
72 strand = '+' if tx.gene.strand else '-' | |
73 thickStart = tx.coding_beg if tx.coding_beg else chromStart | |
74 thickEnd = tx.coding_end if tx.coding_end else chromEnd | |
75 itemRgb = '0,0,0' | |
76 exons = tx.get_coding_exons() if coding else tx.get_exons() | |
77 blockCount = len(exons) | |
78 if tx.gene.strand: | |
79 strand = '+' | |
80 blockSizes = [abs(e-s) for s,e in exons] | |
81 | |
82 def get_bed_line(self,transcript_id,coding=False): | |
83 tx = self.get_transcript_idx()[transcript_id] | |
84 chrom = tx.gene.contig | |
85 chromStart = tx.coding_beg if coding else tx.beg | |
86 chromEnd = tx.coding_end if coding else tx.end | |
87 name = transcript_id | |
88 score = 0 | |
89 strand = '+' if tx.gene.strand else '-' | |
90 thickStart = tx.coding_beg if tx.coding_beg else chromStart | |
91 thickEnd = tx.coding_end if tx.coding_end else chromEnd | |
92 itemRgb = '0,0,0' | |
93 exons = tx.get_coding_exons() if coding else tx.get_exons() | |
94 blockCount = len(exons) | |
95 if tx.gene.strand: | |
96 strand = '+' | |
97 blockSizes = [abs(e-s) for s,e in exons] | |
98 blockStarts = [s - chromStart for s,e in exons] | |
99 else: | |
100 strand = '-' | |
101 blockSizes = [abs(e-s) for s,e in reversed(exons)] | |
102 blockStarts = [s - chromStart for s,e in reversed(exons)] | |
103 blockSizes = ','.join([str(x) for x in blockSizes]) | |
104 blockStarts = ','.join([str(x) for x in blockStarts]) | |
105 return '%s\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % (chrom,chromStart,chromEnd,name,score,strand,thickStart,thickEnd,itemRgb,blockCount,blockSizes,blockStarts) | |
106 | |
107 | |
108 def transcripts_in_range(self,chrom,startpos,endpos,strand=None): | |
109 spos = min(startpos,endpos) if endpos else startpos | |
110 epos = max(startpos,endpos) if endpos else startpos | |
111 transcripts = [] | |
112 for i,t in self.get_transcript_idx().items(): | |
113 if t.gene.contig == chrom and t.beg <= epos and spos <= t.end: | |
114 if strand and t.gene.strand != strand: | |
115 continue | |
116 transcripts.append(t) | |
117 return transcripts | |
118 | |
119 | |
120 def genes_in_range(self,chrom,startpos,endpos,strand=None,gene_types=None): | |
121 spos = min(startpos,endpos) if endpos else startpos | |
122 epos = max(startpos,endpos) if endpos else startpos | |
123 gene_dict = self.get_gene_dict() | |
124 gtypes = set(gene_types) & set(gene_dict.keys()) if gene_types else set(gene_dict.keys()) | |
125 genes = [] | |
126 for gt in gtypes: | |
127 for gene in gene_dict[gt]: | |
128 if gene.contig == chrom and gene.beg <= epos and spos <= gene.end: | |
129 if strand and gene.strand != strand: | |
130 continue | |
131 genes.append(gene) | |
132 return genes | |
133 | |
134 | |
135 def get_sequence(self,chrom,start,end): | |
136 if self.twobit: | |
137 if chrom in self.twobit: | |
138 return self.twobit[chrom][start:end] | |
139 contig = chrom[3:] if chrom.startswith('chr') else 'chr%s' % chrom | |
140 if contig in self.twobit: | |
141 return self.twobit[contig][start:end] | |
142 return None | |
143 | |
144 | |
145 def sequence_sizes(self): | |
146 return self.twobit.sequence_sizes() | |
147 | |
148 | |
149 def get_transcript_seq(self,transcript_id,coding=False): | |
150 tx = self.get_transcript_idx()[transcript_id] | |
151 chrom = tx.gene.contig | |
152 exonbnds = tx.get_coding_exons() if coding else tx.get_exons() | |
153 if tx.gene.strand: | |
154 seqs = [self.get_sequence(chrom,s,e) for s,e in exonbnds] | |
155 else: | |
156 seqs = [reverse_complement(self.get_sequence(chrom,s,e)) for s,e in exonbnds] | |
157 return ''.join(seqs) | |
158 | |
159 | |
160 def get_cdna(self,transcript_id): | |
161 return self.get_transcript_seq(transcript_id,coding=False) | |
162 | |
163 | |
164 def get_cds(self,transcript_id): | |
165 return self.get_transcript_seq(transcript_id,coding=True) | |
166 | |
167 | |
168 def genome_to_transcript_pos(self,transcript_id,genome_pos,coding=False): | |
169 tx = self.get_transcript_idx()[transcript_id] | |
170 if not tx.beg <= genome_pos < tx.end: | |
171 return None | |
172 exonbnds = tx.get_coding_exons() if coding else tx.get_exons() | |
173 cdna_pos = 0 | |
174 if tx.gene.strand: | |
175 for s,e in exonbnds: | |
176 if s <= genome_pos < e: | |
177 cdna_pos += genome_pos - s | |
178 break | |
179 else: | |
180 cdna_pos += e - s | |
181 else: | |
182 for s,e in exonbnds: | |
183 if s <= genome_pos < e: | |
184 cdna_pos += e - genome_pos - 1 | |
185 break | |
186 else: | |
187 cdna_pos += e - s | |
188 return cdna_pos | |
189 | |
190 | |
191 def genome_to_cdna_pos(self,transcript_id,genome_pos): | |
192 return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=False) | |
193 | |
194 | |
195 def genome_to_cds_pos(self,transcript_id,genome_pos): | |
196 return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=True) | |
197 | |
198 |