Mercurial > repos > cpt_testbed > functionalworkflow
comparison gff3_extract_sequence.py @ 0:f678e282b320 draft default tip
"planemo upload"
| author | cpt_testbed |
|---|---|
| date | Fri, 06 May 2022 07:07:23 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:f678e282b320 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import sys | |
| 3 import argparse | |
| 4 import logging | |
| 5 import uuid | |
| 6 from CPT_GFFParser import gffParse, gffWrite | |
| 7 from Bio import SeqIO | |
| 8 from Bio.Seq import Seq | |
| 9 from Bio.SeqRecord import SeqRecord | |
| 10 from Bio.SeqFeature import FeatureLocation, CompoundLocation | |
| 11 from gff3 import feature_lambda, feature_test_type, get_id | |
| 12 | |
| 13 logging.basicConfig(level=logging.INFO) | |
| 14 log = logging.getLogger(__name__) | |
| 15 | |
| 16 | |
| 17 def main(fasta, gff3, feature_filter=None, nodesc=False): | |
| 18 if feature_filter == "nice_cds": | |
| 19 from gff2gb import gff3_to_genbank as cpt_Gff2Gbk | |
| 20 | |
| 21 | |
| 22 for rec in cpt_Gff2Gbk(gff3, fasta, 11): | |
| 23 seenList = {} | |
| 24 if rec.seq[0] == "?": | |
| 25 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
| 26 exit(1) | |
| 27 for feat in sorted(rec.features, key=lambda x: x.location.start): | |
| 28 if feat.type != "CDS": | |
| 29 continue | |
| 30 | |
| 31 ind = 0 | |
| 32 if ( | |
| 33 str( | |
| 34 feat.qualifiers.get("locus_tag", get_id(feat)).replace(" ", "-") | |
| 35 ) | |
| 36 in seenList.keys() | |
| 37 ): | |
| 38 seenList[ | |
| 39 str( | |
| 40 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 41 " ", "-" | |
| 42 ) | |
| 43 ) | |
| 44 ] += 1 | |
| 45 ind = seenList[ | |
| 46 str( | |
| 47 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 48 " ", "-" | |
| 49 ) | |
| 50 ) | |
| 51 ] | |
| 52 else: | |
| 53 seenList[ | |
| 54 str( | |
| 55 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 56 " ", "-" | |
| 57 ) | |
| 58 ) | |
| 59 ] = 1 | |
| 60 append = "" | |
| 61 if ind != 0: | |
| 62 append = "_" + str(ind) | |
| 63 | |
| 64 if nodesc: | |
| 65 description = "" | |
| 66 else: | |
| 67 feat.qualifiers["ID"] = [feat._ID] | |
| 68 product = feat.qualifiers.get("product", "") | |
| 69 description = "{1} [Location={0.location};ID={0.qualifiers[ID][0]}]".format( | |
| 70 feat, product | |
| 71 ) | |
| 72 yield [ | |
| 73 SeqRecord( | |
| 74 feat.extract(rec).seq, | |
| 75 id=str( | |
| 76 feat.qualifiers.get("locus_tag", get_id(feat)).replace( | |
| 77 " ", "-" | |
| 78 ) | |
| 79 ) | |
| 80 + append, | |
| 81 description=description, | |
| 82 ) | |
| 83 ] | |
| 84 | |
| 85 elif feature_filter == "unique_cds": | |
| 86 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
| 87 seen_ids = {} | |
| 88 | |
| 89 for rec in gffParse(gff3, base_dict=seq_dict): | |
| 90 noMatch = True | |
| 91 if "Alias" in rec.features[0].qualifiers.keys(): | |
| 92 lColumn = rec.features[0].qualifiers["Alias"][0] | |
| 93 else: | |
| 94 lColumn = "" | |
| 95 for x in seq_dict: | |
| 96 if x == rec.id or x == lColumn: | |
| 97 noMatch = False | |
| 98 if noMatch: | |
| 99 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
| 100 exit(1) | |
| 101 newfeats = [] | |
| 102 for feat in sorted( | |
| 103 feature_lambda( | |
| 104 rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False | |
| 105 ), | |
| 106 key=lambda f: f.location.start, | |
| 107 ): | |
| 108 nid = rec.id + "____" + feat.id | |
| 109 if nid in seen_ids: | |
| 110 nid = nid + "__" + uuid.uuid4().hex | |
| 111 feat.qualifiers["ID"] = [nid] | |
| 112 newfeats.append(feat) | |
| 113 seen_ids[nid] = True | |
| 114 | |
| 115 if nodesc: | |
| 116 description = "" | |
| 117 else: | |
| 118 if feat.strand == -1: | |
| 119 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} | |
| 120 else: | |
| 121 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} | |
| 122 if "Name" in feat.qualifiers: | |
| 123 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
| 124 | |
| 125 description = "[{}]".format( | |
| 126 ";".join( | |
| 127 [ | |
| 128 "{key}={value}".format(key=k, value=v) | |
| 129 for (k, v) in important_data.items() | |
| 130 ] | |
| 131 ) | |
| 132 ) | |
| 133 #if feat.id == "CPT_Privateer_006.p01": | |
| 134 #print(feat) | |
| 135 #exit() | |
| 136 | |
| 137 if isinstance(feat.location, CompoundLocation): | |
| 138 finSeq = "" | |
| 139 if feat.strand == -1: | |
| 140 for x in feat.location.parts: | |
| 141 finSeq += str((rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement()) | |
| 142 else: | |
| 143 for x in feat.location.parts: | |
| 144 finSeq += str(rec.seq[feat.location.start + feat.phase: feat.location.end]) | |
| 145 yield [ | |
| 146 SeqRecord( | |
| 147 finSeq, | |
| 148 id=nid.replace(" ", "-"), | |
| 149 description=description, | |
| 150 ) | |
| 151 ] | |
| 152 elif feat.strand == -1: | |
| 153 yield [ | |
| 154 SeqRecord( | |
| 155 (rec.seq[feat.location.start: feat.location.end - feat.phase]).reverse_complement(), | |
| 156 id=nid.replace(" ", "-"), | |
| 157 description=description, | |
| 158 ) | |
| 159 ] | |
| 160 else: | |
| 161 yield [ | |
| 162 SeqRecord( | |
| 163 #feat.extract(rec).seq, | |
| 164 rec.seq[feat.location.start + feat.phase: feat.location.end], | |
| 165 id=nid.replace(" ", "-"), | |
| 166 description=description, | |
| 167 ) | |
| 168 ] | |
| 169 rec.features = newfeats | |
| 170 rec.annotations = {} | |
| 171 #gffWrite([rec], sys.stdout) | |
| 172 else: | |
| 173 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
| 174 | |
| 175 for rec in gffParse(gff3, base_dict=seq_dict): | |
| 176 noMatch = True | |
| 177 if "Alias" in rec.features[0].qualifiers.keys(): | |
| 178 lColumn = rec.features[0].qualifiers["Alias"][0] | |
| 179 else: | |
| 180 lColumn = "" | |
| 181 for x in seq_dict: | |
| 182 if x == rec.id or x == lColumn: | |
| 183 noMatch = False | |
| 184 if noMatch: | |
| 185 sys.stderr.write("Error: No Fasta ID matches GFF ID '" + rec.id + "'\n") | |
| 186 exit(1) | |
| 187 for feat in sorted( | |
| 188 feature_lambda( | |
| 189 rec.features, | |
| 190 feature_test_type, | |
| 191 {"type": feature_filter}, | |
| 192 subfeatures=True, | |
| 193 ), | |
| 194 key=lambda f: f.location.start, | |
| 195 ): | |
| 196 id = feat.id | |
| 197 if len(id) == 0: | |
| 198 id = get_id(feat) | |
| 199 | |
| 200 if nodesc: | |
| 201 description = "" | |
| 202 else: | |
| 203 if feat.strand == -1: | |
| 204 important_data = {"Location": FeatureLocation(feat.location.start + 1, feat.location.end - feat.phase, feat.strand)} | |
| 205 else: | |
| 206 important_data = {"Location": FeatureLocation(feat.location.start + 1 + feat.phase, feat.location.end, feat.strand)} | |
| 207 if "Name" in feat.qualifiers: | |
| 208 important_data["Name"] = feat.qualifiers.get("Name", [""])[0] | |
| 209 | |
| 210 description = "[{}]".format( | |
| 211 ";".join( | |
| 212 [ | |
| 213 "{key}={value}".format(key=k, value=v) | |
| 214 for (k, v) in important_data.items() | |
| 215 ] | |
| 216 ) | |
| 217 ) | |
| 218 | |
| 219 if isinstance(feat.location, CompoundLocation): | |
| 220 finSeq = "" | |
| 221 if feat.strand == -1: | |
| 222 for x in feat.location.parts: | |
| 223 finSeq += str((rec.seq[x.start: x.end - feat.phase]).reverse_complement()) | |
| 224 else: | |
| 225 for x in feat.location.parts: | |
| 226 finSeq += str(rec.seq[x.start + feat.phase: x.end]) | |
| 227 yield [ | |
| 228 SeqRecord( | |
| 229 Seq(finSeq), | |
| 230 id=id.replace(" ", "-"), | |
| 231 description=description, | |
| 232 ) | |
| 233 ] | |
| 234 | |
| 235 else: | |
| 236 | |
| 237 if feat.strand == -1: | |
| 238 yield [ | |
| 239 SeqRecord( | |
| 240 seq=Seq(str(rec.seq[feat.location.start: feat.location.end - feat.phase])).reverse_complement(), | |
| 241 id=id.replace(" ", "-"), | |
| 242 description=description, | |
| 243 ) | |
| 244 ] | |
| 245 else: | |
| 246 yield [ | |
| 247 SeqRecord( | |
| 248 #feat.extract(rec).seq, | |
| 249 seq=Seq(str(rec.seq[feat.location.start + feat.phase: feat.location.end])), | |
| 250 id=id.replace(" ", "-"), | |
| 251 description=description, | |
| 252 ) | |
| 253 ] | |
| 254 | |
| 255 | |
| 256 if __name__ == "__main__": | |
| 257 parser = argparse.ArgumentParser( | |
| 258 description="Export corresponding sequence in genome from GFF3", epilog="" | |
| 259 ) | |
| 260 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
| 261 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") | |
| 262 parser.add_argument( | |
| 263 "--feature_filter", default=None, help="Filter for specific feature types" | |
| 264 ) | |
| 265 parser.add_argument( | |
| 266 "--nodesc", action="store_true", help="Strip description field off" | |
| 267 ) | |
| 268 args = parser.parse_args() | |
| 269 for seq in main(**vars(args)): | |
| 270 #if isinstance(seq, list): | |
| 271 # for x in seq: | |
| 272 # print(type(x.seq)) | |
| 273 # SeqIO.write(x, sys.stdout, "fasta") | |
| 274 #else: | |
| 275 SeqIO.write(seq, sys.stdout, "fasta") |
