Mercurial > repos > cpt_testbed > suite_work2
comparison shinefind.py @ 0:d5c3354c166d draft default tip
Uploaded
| author | cpt_testbed |
|---|---|
| date | Fri, 29 Apr 2022 10:33:36 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d5c3354c166d |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import re | |
| 3 import sys | |
| 4 import argparse | |
| 5 import logging | |
| 6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
| 7 from Bio import SeqIO | |
| 8 from Bio.SeqRecord import SeqRecord | |
| 9 from Bio.SeqFeature import FeatureLocation | |
| 10 from gff3 import ( | |
| 11 feature_lambda, | |
| 12 feature_test_type, | |
| 13 feature_test_true, | |
| 14 feature_test_quals, | |
| 15 get_id, | |
| 16 ensure_location_in_bounds, | |
| 17 ) | |
| 18 | |
| 19 logging.basicConfig(level=logging.INFO) | |
| 20 log = logging.getLogger() | |
| 21 | |
| 22 | |
| 23 class NaiveSDCaller(object): | |
| 24 | |
| 25 # TODO May make switch for different sequence sets | |
| 26 SD_SEQUENCES = ( | |
| 27 "AGGAGGT", | |
| 28 "GGAGGT", | |
| 29 "AGGAGG", | |
| 30 "GGGGGG", | |
| 31 "AGGAG", | |
| 32 "GAGGT", | |
| 33 "GGAGG", | |
| 34 "GGGGG", | |
| 35 "AGGT", | |
| 36 "GGGT", | |
| 37 "GAGG", | |
| 38 "GGGG", | |
| 39 "AGGA", | |
| 40 "GGAG", | |
| 41 "GGA", | |
| 42 "GAG", | |
| 43 "AGG", | |
| 44 "GGT", | |
| 45 "GGG", | |
| 46 ) | |
| 47 | |
| 48 def __init__(self): | |
| 49 self.sd_reg = [re.compile(x, re.IGNORECASE) for x in self.SD_SEQUENCES] | |
| 50 | |
| 51 def list_sds(self, sequence, sd_min=3, sd_max=17): | |
| 52 hits = [] | |
| 53 for regex in self.sd_reg: | |
| 54 for match in regex.finditer(sequence): | |
| 55 spacing = len(sequence) - len(match.group()) - match.start() | |
| 56 if sd_max >= spacing+sd_min and spacing+sd_min >= sd_min: | |
| 57 #if the spacing is within gap limits, add | |
| 58 #(search space is [sd_max+7 .. sd_min] so actual gap is spacing+sd_min) | |
| 59 #print('min %d max %d - adding SD with gap %d' % (sd_min, sd_max, spacing+sd_min)) | |
| 60 hits.append( | |
| 61 { | |
| 62 "spacing": spacing, | |
| 63 "hit": match.group(), | |
| 64 "start": match.start(), | |
| 65 "end": match.end(), | |
| 66 "len": len(match.group()), | |
| 67 } | |
| 68 ) | |
| 69 hits = sorted(hits, key= lambda x: (-x['len'],x['spacing'])) | |
| 70 return hits | |
| 71 | |
| 72 @classmethod | |
| 73 def highlight_sd(cls, sequence, start, end): | |
| 74 return " ".join( | |
| 75 [ | |
| 76 sequence[0:start].lower(), | |
| 77 sequence[start:end].upper(), | |
| 78 sequence[end:].lower(), | |
| 79 ] | |
| 80 ) | |
| 81 | |
| 82 @classmethod | |
| 83 def to_features(cls, hits, strand, parent_start, parent_end, feature_id=None, sd_min=3, sd_max=17): | |
| 84 results = [] | |
| 85 for idx, hit in enumerate(hits): | |
| 86 # gene complement(124..486) | |
| 87 # -1 491 501 0 5 5 | |
| 88 # -1 491 501 0 4 5 | |
| 89 # -1 491 501 1 4 5 | |
| 90 # -1 491 501 2 3 5 | |
| 91 # -1 491 501 1 3 5 | |
| 92 # -1 491 501 0 3 5 | |
| 93 | |
| 94 qualifiers = { | |
| 95 "source": "CPT_ShineFind", | |
| 96 "ID": "%s.rbs-%s" % (feature_id, idx), | |
| 97 } | |
| 98 | |
| 99 if strand > 0: | |
| 100 start = parent_end - hit["spacing"] - hit["len"] | |
| 101 end = parent_end - hit["spacing"] | |
| 102 else: | |
| 103 start = parent_start + hit["spacing"] | |
| 104 end = parent_start + hit["spacing"] + hit["len"] | |
| 105 # check that the END of the SD sequence is within the given min/max of parent start/end | |
| 106 | |
| 107 # gap is either the sd_start-cds_end (neg strand) or the sd_end-cds_start (pos strand) | |
| 108 # minimum absolute value of these two will be the proper gap regardless of strand | |
| 109 tmp = gffSeqFeature( | |
| 110 FeatureLocation(min(start, end), max(start, end), strand=strand), | |
| 111 #FeatureLocation(min(start, end), max(start, end), strand=strand), | |
| 112 type="Shine_Dalgarno_sequence", | |
| 113 qualifiers=qualifiers, | |
| 114 ) | |
| 115 results.append(tmp) | |
| 116 return results | |
| 117 | |
| 118 def testFeatureUpstream(self, feature, record, sd_min=3, sd_max=17): | |
| 119 # Strand information necessary to getting correct upstream sequence | |
| 120 strand = feature.location.strand | |
| 121 | |
| 122 # n_bases_upstream (plus/minus 7 upstream to make the min/max define the possible gap position) | |
| 123 if strand > 0: | |
| 124 start = feature.location.start - sd_max - 7 | |
| 125 end = feature.location.start - sd_min | |
| 126 else: | |
| 127 start = feature.location.end + sd_min | |
| 128 end = feature.location.end + sd_max + 7 | |
| 129 | |
| 130 (start, end) = ensure_location_in_bounds( | |
| 131 start=start, end=end, parent_length=len(record) | |
| 132 ) | |
| 133 | |
| 134 # Create our temp feature used to obtain correct portion of | |
| 135 # genome | |
| 136 tmp = gffSeqFeature(FeatureLocation(min(start, end), max(start, end), strand=strand), type="domain") | |
| 137 seq = str(tmp.extract(record.seq)) | |
| 138 return self.list_sds(seq, sd_min, sd_max), start, end, seq | |
| 139 | |
| 140 def hasSd(self, feature, record, sd_min=3, sd_max=17): | |
| 141 sds, start, end, seq = self.testFeatureUpstream( | |
| 142 feature, record, sd_min=sd_min, sd_max=sd_max | |
| 143 ) | |
| 144 return len(sds) > 0 | |
| 145 | |
| 146 | |
| 147 # Cycle through subfeatures, set feature's location to be equal | |
| 148 # to the smallest start and largest end. | |
| 149 # Remove pending bugfix for feature display in Apollo | |
| 150 def fminmax(feature): | |
| 151 fmin = None | |
| 152 fmax = None | |
| 153 for sf in feature_lambda([feature], feature_test_true, {}, subfeatures=True): | |
| 154 if fmin is None: | |
| 155 fmin = sf.location.start | |
| 156 fmax = sf.location.end | |
| 157 if sf.location.start < fmin: | |
| 158 fmin = sf.location.start | |
| 159 if sf.location.end > fmax: | |
| 160 fmax = sf.location.end | |
| 161 return fmin, fmax | |
| 162 | |
| 163 | |
| 164 def fix_gene_boundaries(feature): | |
| 165 # There is a bug in Apollo whereby we have created gene | |
| 166 # features which are larger than expected, but we cannot see this. | |
| 167 # We only see a perfect sized gene + SD together. | |
| 168 # | |
| 169 # So, we clamp the location of the gene feature to the | |
| 170 # contained mRNAs. Will remove pending Apollo upgrade. | |
| 171 fmin, fmax = fminmax(feature) | |
| 172 if feature.location.strand > 0: | |
| 173 feature.location = FeatureLocation(fmin, fmax, strand=1) | |
| 174 else: | |
| 175 feature.location = FeatureLocation(fmin, fmax, strand=-1) | |
| 176 return feature | |
| 177 | |
| 178 def shinefind( | |
| 179 fasta, | |
| 180 gff3, | |
| 181 gff3_output=None, | |
| 182 table_output=None, | |
| 183 lookahead_min=3, | |
| 184 lookahead_max=17, | |
| 185 top_only=False, | |
| 186 add=False, | |
| 187 ): | |
| 188 table_output.write( | |
| 189 "\t".join( | |
| 190 [ | |
| 191 "ID", | |
| 192 "Name", | |
| 193 "Terminus", | |
| 194 "Terminus", | |
| 195 "Strand", | |
| 196 "Upstream Sequence", | |
| 197 "SD", | |
| 198 "Spacing", | |
| 199 ] | |
| 200 ) | |
| 201 + "\n" | |
| 202 ) | |
| 203 | |
| 204 sd_finder = NaiveSDCaller() | |
| 205 # Load up sequence(s) for GFF3 data | |
| 206 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
| 207 # Parse GFF3 records | |
| 208 for record in gffParse(gff3, base_dict=seq_dict): | |
| 209 # Shinefind's gff3_output. | |
| 210 gff3_output_record = SeqRecord(record.seq, record.id) | |
| 211 # Filter out just coding sequences | |
| 212 ignored_features = [] | |
| 213 for x in record.features: | |
| 214 # If feature X does NOT contain a CDS, add to ignored_features | |
| 215 # list. This means if we have a top level gene feature with or | |
| 216 # without a CDS subfeature, we're catch it appropriately here. | |
| 217 if ( | |
| 218 len( | |
| 219 list( | |
| 220 feature_lambda( | |
| 221 [x], feature_test_type, {"type": "CDS"}, subfeatures=True | |
| 222 ) | |
| 223 ) | |
| 224 ) | |
| 225 == 0 | |
| 226 ): | |
| 227 ignored_features.append(x) | |
| 228 | |
| 229 # Loop over all gene features | |
| 230 for gene in feature_lambda( | |
| 231 record.features, feature_test_type, {"type": "gene"}, subfeatures=True | |
| 232 ): | |
| 233 | |
| 234 # Get the CDS from this gene. | |
| 235 feature = sorted( | |
| 236 list( | |
| 237 feature_lambda( | |
| 238 gene.sub_features, | |
| 239 feature_test_type, | |
| 240 {"type": "CDS"}, | |
| 241 subfeatures=True, | |
| 242 ) | |
| 243 ), | |
| 244 key=lambda x: x.location.start, | |
| 245 ) | |
| 246 # If no CDSs are in this gene feature, then quit | |
| 247 if len(feature) == 0: | |
| 248 # We've already caught these above in our ignored_features | |
| 249 # list, so we skip out on the rest of this for loop | |
| 250 continue | |
| 251 else: | |
| 252 # Otherwise pull the first on the strand. | |
| 253 feature = feature[0] | |
| 254 | |
| 255 # Three different ways RBSs can be stored that we expect. | |
| 256 rbs_rbs = list( | |
| 257 feature_lambda( | |
| 258 gene.sub_features, | |
| 259 feature_test_type, | |
| 260 {"type": "RBS"}, | |
| 261 subfeatures=False, | |
| 262 ) | |
| 263 ) | |
| 264 rbs_sds = list( | |
| 265 feature_lambda( | |
| 266 gene.sub_features, | |
| 267 feature_test_type, | |
| 268 {"type": "Shine_Dalgarno_sequence"}, | |
| 269 subfeatures=False, | |
| 270 ) | |
| 271 ) | |
| 272 regulatory_elements = list( | |
| 273 feature_lambda( | |
| 274 gene.sub_features, | |
| 275 feature_test_type, | |
| 276 {"type": "regulatory"}, | |
| 277 subfeatures=False, | |
| 278 ) | |
| 279 ) | |
| 280 rbs_regulatory = list( | |
| 281 feature_lambda( | |
| 282 regulatory_elements, | |
| 283 feature_test_quals, | |
| 284 {"regulatory_class": ["ribosome_binding_site"]}, | |
| 285 subfeatures=False, | |
| 286 ) | |
| 287 ) | |
| 288 rbss = rbs_rbs + rbs_sds + rbs_regulatory | |
| 289 | |
| 290 # If someone has already annotated an RBS, we move to the next gene | |
| 291 if len(rbss) > 0: | |
| 292 log.debug("Has %s RBSs", len(rbss)) | |
| 293 ignored_features.append(gene) | |
| 294 continue | |
| 295 | |
| 296 sds, start, end, seq = sd_finder.testFeatureUpstream( | |
| 297 feature, record, sd_min=lookahead_min, sd_max=lookahead_max | |
| 298 ) | |
| 299 | |
| 300 feature_id = get_id(feature) | |
| 301 sd_features = sd_finder.to_features( | |
| 302 sds, feature.location.strand, start, end, feature_id=feature.id | |
| 303 ) | |
| 304 | |
| 305 human_strand = "+" if feature.location.strand == 1 else "-" | |
| 306 | |
| 307 # http://book.pythontips.com/en/latest/for_-_else.html | |
| 308 log.debug("Found %s SDs", len(sds)) | |
| 309 for (sd, sd_feature) in zip(sds, sd_features): | |
| 310 # If we only want the top feature, after the bulk of the | |
| 311 # forloop executes once, we append the top feature, and fake a | |
| 312 # break, because an actual break triggers the else: block | |
| 313 table_output.write( | |
| 314 "\t".join( | |
| 315 map( | |
| 316 str, | |
| 317 [ | |
| 318 feature.id, | |
| 319 feature_id, | |
| 320 feature.location.start, | |
| 321 feature.location.end, | |
| 322 human_strand, | |
| 323 sd_finder.highlight_sd(seq, sd["start"], sd["end"]), | |
| 324 sd["hit"], | |
| 325 int(sd["spacing"]) + lookahead_min, | |
| 326 ], | |
| 327 ) | |
| 328 ) | |
| 329 + "\n" | |
| 330 ) | |
| 331 | |
| 332 if add: | |
| 333 # Append the top RBS to the gene feature | |
| 334 gene.sub_features.append(sd_feature) | |
| 335 # Pick out start/end locations for all sub_features | |
| 336 locations = [x.location.start for x in gene.sub_features] + [ | |
| 337 x.location.end for x in gene.sub_features | |
| 338 ] | |
| 339 # Update gene's start/end to be inclusive | |
| 340 gene.location._start = min(locations) | |
| 341 gene.location._end = max(locations) | |
| 342 # Also register the feature with the separate GFF3 output | |
| 343 sd_feature = fix_gene_boundaries(sd_feature) | |
| 344 gff3_output_record.features.append(sd_feature) | |
| 345 | |
| 346 if top_only or sd == (sds[-1]): | |
| 347 break | |
| 348 else: | |
| 349 table_output.write( | |
| 350 "\t".join( | |
| 351 map( | |
| 352 str, | |
| 353 [ | |
| 354 feature.id, | |
| 355 feature_id, | |
| 356 feature.location.start, | |
| 357 feature.location.end, | |
| 358 human_strand, | |
| 359 seq, | |
| 360 None, | |
| 361 -1, | |
| 362 ], | |
| 363 ) | |
| 364 ) | |
| 365 + "\n" | |
| 366 ) | |
| 367 | |
| 368 record.annotations = {} | |
| 369 gffWrite([record], sys.stdout) | |
| 370 | |
| 371 gff3_output_record.features = sorted( | |
| 372 gff3_output_record.features, key=lambda x: x.location.start | |
| 373 ) | |
| 374 gff3_output_record.annotations = {} | |
| 375 gffWrite([gff3_output_record], gff3_output) | |
| 376 | |
| 377 | |
| 378 if __name__ == "__main__": | |
| 379 parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences") | |
| 380 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
| 381 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") | |
| 382 | |
| 383 parser.add_argument( | |
| 384 "--gff3_output", | |
| 385 type=argparse.FileType("w"), | |
| 386 help="GFF3 Output", | |
| 387 default="shinefind.gff3", | |
| 388 ) | |
| 389 parser.add_argument( | |
| 390 "--table_output", | |
| 391 type=argparse.FileType("w"), | |
| 392 help="Tabular Output", | |
| 393 default="shinefind.tbl", | |
| 394 ) | |
| 395 | |
| 396 parser.add_argument( | |
| 397 "--lookahead_min", | |
| 398 nargs="?", | |
| 399 type=int, | |
| 400 help="Number of bases upstream of CDSs to end search", | |
| 401 default=3, | |
| 402 ) | |
| 403 parser.add_argument( | |
| 404 "--lookahead_max", | |
| 405 nargs="?", | |
| 406 type=int, | |
| 407 help="Number of bases upstream of CDSs to begin search", | |
| 408 default=17, | |
| 409 ) | |
| 410 | |
| 411 parser.add_argument("--top_only", action="store_true", help="Only report best hits") | |
| 412 parser.add_argument( | |
| 413 "--add", | |
| 414 action="store_true", | |
| 415 help='Function in "addition" mode whereby the ' | |
| 416 + "RBSs are added directly to the gene model.", | |
| 417 ) | |
| 418 | |
| 419 args = parser.parse_args() | |
| 420 shinefind(**vars(args)) |
