comparison gff3_extract_sequence.py @ 0:f678e282b320 draft default tip

"planemo upload"
author cpt_testbed
date Fri, 06 May 2022 07:07:23 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:f678e282b320
1 #!/usr/bin/env python
2 import sys
3 import argparse
4 import logging
5 import uuid
6 from CPT_GFFParser import gffParse, gffWrite
7 from Bio import SeqIO
8 from Bio.Seq import Seq
9 from Bio.SeqRecord import SeqRecord
10 from Bio.SeqFeature import FeatureLocation, CompoundLocation
11 from gff3 import feature_lambda, feature_test_type, get_id
12
13 logging.basicConfig(level=logging.INFO)
14 log = logging.getLogger(__name__)
15
16
17 def main(fasta, gff3, feature_filter=None, nodesc=False):
18 if feature_filter == "nice_cds":
19 from gff2gb import gff3_to_genbank as cpt_Gff2Gbk
20
21
22 for rec in cpt_Gff2Gbk(gff3, fasta, 11):
23 seenList = {}
24 if rec.seq[0] == "?":
25 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
26 exit(1)
27 for feat in sorted(rec.features, key=lambda x: x.location.start):
28 if feat.type != "CDS":
29 continue
30
31 ind = 0
32 if (
33 str(
34 feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-")
35 )
36 in seenList.keys()
37 ):
38 seenList[
39 str(
40 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
41 " ", "-"
42 )
43 )
44 ] += 1
45 ind = seenList[
46 str(
47 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
48 " ", "-"
49 )
50 )
51 ]
52 else:
53 seenList[
54 str(
55 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
56 " ", "-"
57 )
58 )
59 ] = 1
60 append = ""
61 if ind != 0:
62 append = "_" + str(ind)
63
64 if nodesc:
65 description = ""
66 else:
67 feat.qualifiers["ID"] = [feat._ID]
68 product = feat.qualifiers.get("product", "")
69 description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format(
70 feat, product
71 )
72 yield [
73 SeqRecord(
74 feat.extract(rec).seq,
75 id=str(
76 feat.qualifiers.get("locus_tag", get_id(feat)).replace(
77 " ", "-"
78 )
79 )
80 + append,
81 description=description,
82 )
83 ]
84
85 elif feature_filter == "unique_cds":
86 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
87 seen_ids = {}
88
89 for rec in gffParse(gff3, base_dict=seq_dict):
90 noMatch = True
91 if "Alias" in rec.features[0].qualifiers.keys():
92 lColumn = rec.features[0].qualifiers["Alias"][0]
93 else:
94 lColumn = ""
95 for x in seq_dict:
96 if x == rec.id or x == lColumn:
97 noMatch = False
98 if noMatch:
99 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
100 exit(1)
101 newfeats = []
102 for feat in sorted(
103 feature_lambda(
104 rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
105 ),
106 key=lambda f: f.location.start,
107 ):
108 nid = rec.id + "____" + feat.id
109 if nid in seen_ids:
110 nid = nid + "__" + uuid.uuid4().hex
111 feat.qualifiers["ID"] = [nid]
112 newfeats.append(feat)
113 seen_ids[nid] = True
114
115 if nodesc:
116 description = ""
117 else:
118 if feat.strand == -1:
119 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
120 else:
121 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
122 if "Name" in feat.qualifiers:
123 important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
124
125 description = "[{}]".format(
126 ";".join(
127 [
128 "{key}={value}".format(key=k, value=v)
129 for (k, v) in important_data.items()
130 ]
131 )
132 )
133 #if feat.id == "CPT_Privateer_006.p01":
134 #print(feat)
135 #exit()
136
137 if isinstance(feat.location, CompoundLocation):
138 finSeq = ""
139 if feat.strand == -1:
140 for x in feat.location.parts:
141 finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement())
142 else:
143 for x in feat.location.parts:
144 finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end])
145 yield [
146 SeqRecord(
147 finSeq,
148 id=nid.replace(" ", "-"),
149 description=description,
150 )
151 ]
152 elif feat.strand == -1:
153 yield [
154 SeqRecord(
155 (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(),
156 id=nid.replace(" ", "-"),
157 description=description,
158 )
159 ]
160 else:
161 yield [
162 SeqRecord(
163 #feat.extract(rec).seq,
164 rec.seq[feat.location.start + feat.phase: feat.location.end],
165 id=nid.replace(" ", "-"),
166 description=description,
167 )
168 ]
169 rec.features = newfeats
170 rec.annotations = {}
171 #gffWrite([rec], sys.stdout)
172 else:
173 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta"))
174
175 for rec in gffParse(gff3, base_dict=seq_dict):
176 noMatch = True
177 if "Alias" in rec.features[0].qualifiers.keys():
178 lColumn = rec.features[0].qualifiers["Alias"][0]
179 else:
180 lColumn = ""
181 for x in seq_dict:
182 if x == rec.id or x == lColumn:
183 noMatch = False
184 if noMatch:
185 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n")
186 exit(1)
187 for feat in sorted(
188 feature_lambda(
189 rec.features,
190 feature_test_type,
191 {"type": feature_filter},
192 subfeatures=True,
193 ),
194 key=lambda f: f.location.start,
195 ):
196 id = feat.id
197 if len(id) == 0:
198 id = get_id(feat)
199
200 if nodesc:
201 description = ""
202 else:
203 if feat.strand == -1:
204 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)}
205 else:
206 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)}
207 if "Name" in feat.qualifiers:
208 important_data["Name"] = feat.qualifiers.get("Name", [""])[0]
209
210 description = "[{}]".format(
211 ";".join(
212 [
213 "{key}={value}".format(key=k, value=v)
214 for (k, v) in important_data.items()
215 ]
216 )
217 )
218
219 if isinstance(feat.location, CompoundLocation):
220 finSeq = ""
221 if feat.strand == -1:
222 for x in feat.location.parts:
223 finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement())
224 else:
225 for x in feat.location.parts:
226 finSeq += str(rec.seq[x.start + feat.phase: x.end])
227 yield [
228 SeqRecord(
229 Seq(finSeq),
230 id=id.replace(" ", "-"),
231 description=description,
232 )
233 ]
234
235 else:
236
237 if feat.strand == -1:
238 yield [
239 SeqRecord(
240 seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(),
241 id=id.replace(" ", "-"),
242 description=description,
243 )
244 ]
245 else:
246 yield [
247 SeqRecord(
248 #feat.extract(rec).seq,
249 seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])),
250 id=id.replace(" ", "-"),
251 description=description,
252 )
253 ]
254
255
256 if __name__ == "__main__":
257 parser = argparse.ArgumentParser(
258 description="Export corresponding sequence in genome from GFF3", epilog=""
259 )
260 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
261 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
262 parser.add_argument(
263 "--feature_filter", default=None, help="Filter for specific feature types"
264 )
265 parser.add_argument(
266 "--nodesc", action="store_true", help="Strip description field off"
267 )
268 args = parser.parse_args()
269 for seq in main(**vars(args)):
270 #if isinstance(seq, list):
271 # for x in seq:
272 # print(type(x.seq))
273 # SeqIO.write(x, sys.stdout, "fasta")
274 #else:
275 SeqIO.write(seq, sys.stdout, "fasta")