Mercurial > repos > cpt_testbed > functionalworkflow
comparison gff3_require_sd.py @ 0:f678e282b320 draft default tip
"planemo upload"
| author | cpt_testbed |
|---|---|
| date | Fri, 06 May 2022 07:07:23 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:f678e282b320 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import sys | |
| 3 import logging | |
| 4 import argparse | |
| 5 from Bio import SeqIO | |
| 6 from Bio.SeqFeature import FeatureLocation | |
| 7 from CPT_GFFParser import gffParse, gffWrite | |
| 8 from gff3 import feature_lambda, feature_test_type | |
| 9 from shinefind import NaiveSDCaller | |
| 10 | |
| 11 logging.basicConfig(level=logging.INFO) | |
| 12 log = logging.getLogger(__name__) | |
| 13 | |
| 14 | |
| 15 def require_shinefind(gff3, fasta): | |
| 16 sd_finder = NaiveSDCaller() | |
| 17 # Load up sequence(s) for GFF3 data | |
| 18 seq_dict = SeqIO.to_dict(SeqIO.parse(fasta, "fasta")) | |
| 19 # Parse GFF3 records | |
| 20 for record in gffParse(gff3, base_dict=seq_dict): | |
| 21 # Reopen | |
| 22 genes = list( | |
| 23 feature_lambda( | |
| 24 record.features, feature_test_type, {"type": "gene"}, subfeatures=True | |
| 25 ) | |
| 26 ) | |
| 27 good_genes = [] | |
| 28 for gene in genes: | |
| 29 cdss = sorted( | |
| 30 list( | |
| 31 feature_lambda( | |
| 32 gene.sub_features, | |
| 33 feature_test_type, | |
| 34 {"type": "CDS"}, | |
| 35 subfeatures=False, | |
| 36 ) | |
| 37 ), | |
| 38 key=lambda x: x.location.start, | |
| 39 ) | |
| 40 if len(cdss) == 0: | |
| 41 continue | |
| 42 | |
| 43 cds = cdss[0] | |
| 44 | |
| 45 sds, start, end, seq = sd_finder.testFeatureUpstream( | |
| 46 cds, record, sd_min=5, sd_max=15 | |
| 47 ) | |
| 48 if len(sds) >= 1: | |
| 49 sd_features = sd_finder.to_features( | |
| 50 sds, gene.location.strand, start, end, feature_id=gene.id | |
| 51 ) | |
| 52 gene.sub_features.append(sd_features[0]) | |
| 53 if gene.location.start > sd_features[0].location.start: | |
| 54 gene.location = FeatureLocation(int(sd_features[0].location.start), int(gene.location.end), gene.location.strand) | |
| 55 if gene.location.end < sd_features[0].location.end: | |
| 56 gene.location = FeatureLocation(int(gene.location.start), int(sd_features[0].location.end), gene.location.strand) | |
| 57 good_genes.append(gene) | |
| 58 | |
| 59 record.features = good_genes | |
| 60 yield record | |
| 61 | |
| 62 | |
| 63 if __name__ == "__main__": | |
| 64 parser = argparse.ArgumentParser(description="Identify shine-dalgarno sequences") | |
| 65 parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") | |
| 66 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") | |
| 67 args = parser.parse_args() | |
| 68 | |
| 69 for rec in require_shinefind(**vars(args)): | |
| 70 rec.annotations = {} | |
| 71 gffWrite([rec], sys.stdout) |
