Mercurial > repos > cpt_testbed > functionalworkflow
comparison gff3_extract_sequence.py @ 0:f678e282b320 draft default tip
"planemo upload"
author | cpt_testbed |
---|---|
date | Fri, 06 May 2022 07:07:23 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:f678e282b320 |
---|---|
1 #!/usr/bin/env python | |
2 import sys | |
3 import argparse | |
4 import logging | |
5 import uuid | |
6 from CPT_GFFParser import gffParse, gffWrite | |
7 from Bio import SeqIO | |
8 from Bio.Seq import Seq | |
9 from Bio.SeqRecord import SeqRecord | |
10 from Bio.SeqFeature import FeatureLocation, CompoundLocation | |
11 from gff3 import feature_lambda, feature_test_type, get_id | |
12 | |
13 logging.basicConfig(level=logging.INFO) | |
14 log = logging.getLogger(__name__) | |
15 | |
16 | |
17 def main(fasta, gff3, feature_filter=None, nodesc=False): | |
18 if feature_filter == "nice_cds": | |
19 from gff2gb import gff3_to_genbank as cpt_Gff2Gbk | |
20 | |
21 | |
22 for rec in cpt_Gff2Gbk(gff3, fasta, 11): | |
23 seenList = {} | |
24 if rec.seq[0] == "?": | |
25 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
26 exit(1) | |
27 for feat in sorted(rec.features, key=lambda x: x.location.start): | |
28 if feat.type != "CDS": | |
29 continue | |
30 | |
31 ind = 0 | |
32 if ( | |
33 str( | |
34 feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") | |
35 ) | |
36 in seenList.keys() | |
37 ): | |
38 seenList[ | |
39 str( | |
40 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
41 " ", "-" | |
42 ) | |
43 ) | |
44 ] += 1 | |
45 ind = seenList[ | |
46 str( | |
47 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
48 " ", "-" | |
49 ) | |
50 ) | |
51 ] | |
52 else: | |
53 seenList[ | |
54 str( | |
55 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
56 " ", "-" | |
57 ) | |
58 ) | |
59 ] = 1 | |
60 append = "" | |
61 if ind != 0: | |
62 append = "_" + str(ind) | |
63 | |
64 if nodesc: | |
65 description = "" | |
66 else: | |
67 feat.qualifiers["ID"] = [feat._ID] | |
68 product = feat.qualifiers.get("product", "") | |
69 description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( | |
70 feat, product | |
71 ) | |
72 yield [ | |
73 SeqRecord( | |
74 feat.extract(rec).seq, | |
75 id=str( | |
76 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
77 " ", "-" | |
78 ) | |
79 ) | |
80 + append, | |
81 description=description, | |
82 ) | |
83 ] | |
84 | |
85 elif feature_filter == "unique_cds": | |
86 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
87 seen_ids = {} | |
88 | |
89 for rec in gffParse(gff3, base_dict=seq_dict): | |
90 noMatch = True | |
91 if "Alias" in rec.features[0].qualifiers.keys(): | |
92 lColumn = rec.features[0].qualifiers["Alias"][0] | |
93 else: | |
94 lColumn = "" | |
95 for x in seq_dict: | |
96 if x == rec.id or x == lColumn: | |
97 noMatch = False | |
98 if noMatch: | |
99 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
100 exit(1) | |
101 newfeats = [] | |
102 for feat in sorted( | |
103 feature_lambda( | |
104 rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False | |
105 ), | |
106 key=lambda f: f.location.start, | |
107 ): | |
108 nid = rec.id + "____" + feat.id | |
109 if nid in seen_ids: | |
110 nid = nid + "__" + uuid.uuid4().hex | |
111 feat.qualifiers["ID"] = [nid] | |
112 newfeats.append(feat) | |
113 seen_ids[nid] = True | |
114 | |
115 if nodesc: | |
116 description = "" | |
117 else: | |
118 if feat.strand == -1: | |
119 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} | |
120 else: | |
121 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} | |
122 if "Name" in feat.qualifiers: | |
123 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
124 | |
125 description = "[{}]".format( | |
126 ";".join( | |
127 [ | |
128 "{key}={value}".format(key=k, value=v) | |
129 for (k, v) in important_data.items() | |
130 ] | |
131 ) | |
132 ) | |
133 #if feat.id == "CPT_Privateer_006.p01": | |
134 #print(feat) | |
135 #exit() | |
136 | |
137 if isinstance(feat.location, CompoundLocation): | |
138 finSeq = "" | |
139 if feat.strand == -1: | |
140 for x in feat.location.parts: | |
141 finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement()) | |
142 else: | |
143 for x in feat.location.parts: | |
144 finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end]) | |
145 yield [ | |
146 SeqRecord( | |
147 finSeq, | |
148 id=nid.replace(" ", "-"), | |
149 description=description, | |
150 ) | |
151 ] | |
152 elif feat.strand == -1: | |
153 yield [ | |
154 SeqRecord( | |
155 (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(), | |
156 id=nid.replace(" ", "-"), | |
157 description=description, | |
158 ) | |
159 ] | |
160 else: | |
161 yield [ | |
162 SeqRecord( | |
163 #feat.extract(rec).seq, | |
164 rec.seq[feat.location.start + feat.phase: feat.location.end], | |
165 id=nid.replace(" ", "-"), | |
166 description=description, | |
167 ) | |
168 ] | |
169 rec.features = newfeats | |
170 rec.annotations = {} | |
171 #gffWrite([rec], sys.stdout) | |
172 else: | |
173 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
174 | |
175 for rec in gffParse(gff3, base_dict=seq_dict): | |
176 noMatch = True | |
177 if "Alias" in rec.features[0].qualifiers.keys(): | |
178 lColumn = rec.features[0].qualifiers["Alias"][0] | |
179 else: | |
180 lColumn = "" | |
181 for x in seq_dict: | |
182 if x == rec.id or x == lColumn: | |
183 noMatch = False | |
184 if noMatch: | |
185 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
186 exit(1) | |
187 for feat in sorted( | |
188 feature_lambda( | |
189 rec.features, | |
190 feature_test_type, | |
191 {"type": feature_filter}, | |
192 subfeatures=True, | |
193 ), | |
194 key=lambda f: f.location.start, | |
195 ): | |
196 id = feat.id | |
197 if len(id) == 0: | |
198 id = get_id(feat) | |
199 | |
200 if nodesc: | |
201 description = "" | |
202 else: | |
203 if feat.strand == -1: | |
204 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} | |
205 else: | |
206 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} | |
207 if "Name" in feat.qualifiers: | |
208 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
209 | |
210 description = "[{}]".format( | |
211 ";".join( | |
212 [ | |
213 "{key}={value}".format(key=k, value=v) | |
214 for (k, v) in important_data.items() | |
215 ] | |
216 ) | |
217 ) | |
218 | |
219 if isinstance(feat.location, CompoundLocation): | |
220 finSeq = "" | |
221 if feat.strand == -1: | |
222 for x in feat.location.parts: | |
223 finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement()) | |
224 else: | |
225 for x in feat.location.parts: | |
226 finSeq += str(rec.seq[x.start + feat.phase: x.end]) | |
227 yield [ | |
228 SeqRecord( | |
229 Seq(finSeq), | |
230 id=id.replace(" ", "-"), | |
231 description=description, | |
232 ) | |
233 ] | |
234 | |
235 else: | |
236 | |
237 if feat.strand == -1: | |
238 yield [ | |
239 SeqRecord( | |
240 seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(), | |
241 id=id.replace(" ", "-"), | |
242 description=description, | |
243 ) | |
244 ] | |
245 else: | |
246 yield [ | |
247 SeqRecord( | |
248 #feat.extract(rec).seq, | |
249 seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])), | |
250 id=id.replace(" ", "-"), | |
251 description=description, | |
252 ) | |
253 ] | |
254 | |
255 | |
256 if __name__ == "__main__": | |
257 parser = argparse.ArgumentParser( | |
258 description="Export corresponding sequence in genome from GFF3", epilog="" | |
259 ) | |
260 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
261 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") | |
262 parser.add_argument( | |
263 "--feature_filter", default=None, help="Filter for specific feature types" | |
264 ) | |
265 parser.add_argument( | |
266 "--nodesc", action="store_true", help="Strip description field off" | |
267 ) | |
268 args = parser.parse_args() | |
269 for seq in main(**vars(args)): | |
270 #if isinstance(seq, list): | |
271 # for x in seq: | |
272 # print(type(x.seq)) | |
273 # SeqIO.write(x, sys.stdout, "fasta") | |
274 #else: | |
275 SeqIO.write(seq, sys.stdout, "fasta") |