Mercurial > repos > cpt_testbed > cpt_apolloprep
comparison gff3_prep_for_apollo.py @ 0:b276df131078 draft default tip
Uploaded
| author | cpt_testbed |
|---|---|
| date | Fri, 29 Apr 2022 11:24:01 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:b276df131078 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import sys | |
| 3 import logging | |
| 4 import argparse | |
| 5 import copy | |
| 6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | |
| 7 from gff3 import feature_lambda, feature_test_type | |
| 8 from Bio.SeqFeature import FeatureLocation | |
| 9 | |
| 10 logging.basicConfig(level=logging.INFO) | |
| 11 log = logging.getLogger(__name__) | |
| 12 | |
| 13 ALLOWED_FEATURES = [ | |
| 14 "mRNA", | |
| 15 "exon", | |
| 16 "transposable_element", | |
| 17 "tRNA", | |
| 18 "transcript", | |
| 19 "terminator", | |
| 20 "Shine_Dalgarno_Sequence", | |
| 21 "pseudogene", | |
| 22 "stop_codon_read_through", | |
| 23 "repeat_region", | |
| 24 "CDS", | |
| 25 "gene", | |
| 26 "rRNA", | |
| 27 "ncRNA", | |
| 28 "snRNA", | |
| 29 "snoRNA", | |
| 30 "miRNA", | |
| 31 ] | |
| 32 | |
| 33 SPECIAL_REMOVED_FEATURES = ["gene_component_region", "sequence_difference"] | |
| 34 | |
| 35 | |
| 36 | |
| 37 def add_exons(features): | |
| 38 for gene in feature_lambda( | |
| 39 features, feature_test_type, {"type": "gene"}, subfeatures=True | |
| 40 ): | |
| 41 clean_gene = copy.deepcopy(gene) | |
| 42 exon_start = None | |
| 43 exon_end = None | |
| 44 exon_strand = None | |
| 45 cds_list = [] | |
| 46 | |
| 47 #for mRNA in gene.sub_features: | |
| 48 # for x in mRNA.sub_features: | |
| 49 # x.qualifiers["Parent"] = [gene.id] | |
| 50 # gene.sub_features.append(x) | |
| 51 | |
| 52 for exon in feature_lambda(gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False,recurse=False): | |
| 53 #if the gene contains an exon, skip. | |
| 54 continue | |
| 55 hasMRNA = False | |
| 56 for x in gene.sub_features: | |
| 57 if x.type == "mRNA": | |
| 58 hasMRNA = True | |
| 59 mRNA = x | |
| 60 """ | |
| 61 if not hasMRNA: | |
| 62 mRNA = gffSeqFeature( | |
| 63 location=FeatureLocation(gene.location.start, gene.location.end, gene.location.strand), | |
| 64 type="mRNA", | |
| 65 source = "cpt.prepApollo", | |
| 66 qualifiers={ | |
| 67 "ID": ["%s.mRNA" % clean_gene.qualifiers["ID"][0]], | |
| 68 "Parent": clean_gene.qualifiers["ID"], | |
| 69 }, | |
| 70 sub_features=gene.sub_features, | |
| 71 strand=exon_strand | |
| 72 ) | |
| 73 for x in mRNA.sub_features: | |
| 74 x.qualifiers["Parent"] = mRNA["ID"] | |
| 75 clean_gene.sub_features = [mRNA] | |
| 76 else: | |
| 77 for x in clean_gene.sub_features: | |
| 78 if x.type != "mRNA": | |
| 79 x.qualifiers["Parent"] = [mRNA.id] """ | |
| 80 | |
| 81 # check for CDS child features of the gene, do not go a further step (this should skip any CDS children of exon child features) | |
| 82 for cds in feature_lambda( | |
| 83 gene.sub_features, | |
| 84 feature_test_type, | |
| 85 {"type": "CDS"}, | |
| 86 subfeatures=False, | |
| 87 recurse=False, | |
| 88 ): | |
| 89 # check all CDS features for min/max boundaries | |
| 90 if exon_start is None: | |
| 91 exon_start = cds.location.start | |
| 92 exon_strand = cds.location.strand | |
| 93 if exon_end is None: | |
| 94 exon_end = cds.location.end | |
| 95 exon_start = min(exon_start, cds.location.start) | |
| 96 exon_end = max(exon_end, cds.location.end) | |
| 97 cds_list.append(cds) | |
| 98 if cds_list: | |
| 99 # we found a CDS to adopt | |
| 100 new_exon = gffSeqFeature( | |
| 101 location=FeatureLocation(exon_start, exon_end), | |
| 102 type="exon", | |
| 103 source = "cpt.prepApollo", | |
| 104 qualifiers={ | |
| 105 "ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]], | |
| 106 "Parent": [clean_gene.id], | |
| 107 "ApolloExon": ["True"], | |
| 108 }, | |
| 109 sub_features=[], | |
| 110 strand=exon_strand | |
| 111 ) | |
| 112 for cds in cds_list: | |
| 113 cds.qualifiers["Parent"] = new_exon.qualifiers["ID"] | |
| 114 new_exon.sub_features.append(cds) | |
| 115 #gene.sub_features.append(new_exon) | |
| 116 # get all the other children of gene that AREN'T a CDS including the new exon | |
| 117 clean_gene.sub_features.append(copy.deepcopy(new_exon)) | |
| 118 #clean_gene.sub_features.append(gffSeqFeature(location=FeatureLocation(exon_start, exon_end, exon_strand), type="exon", source = "cpt.prepApollo", qualifiers={"ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]], "Parent": clean_gene.qualifiers["ID"]}, sub_features=[], strand=exon_strand)) | |
| 119 """ | |
| 120 for sf in feature_lambda( | |
| 121 gene.sub_features, | |
| 122 feature_test_type, | |
| 123 {"type": "CDS"}, | |
| 124 subfeatures=True, | |
| 125 recurse=False, | |
| 126 invert=True, | |
| 127 ): | |
| 128 child = copy.deepcopy(sf) | |
| 129 child.qualifiers["Parent"] = new_exon.qualifiers["ID"] | |
| 130 clean_gene.sub_features.append(child) | |
| 131 """ | |
| 132 # add them to the new Exon feature | |
| 133 # return the cleaned gene with new exon | |
| 134 yield clean_gene | |
| 135 | |
| 136 def process_features(features): | |
| 137 # change RBS to 'Shine_Dalgarno_sequence' | |
| 138 for rbs in feature_lambda(features, feature_test_type, {'type': "RBS"}): | |
| 139 rbs.type = "Shine_Dalgarno_sequence" | |
| 140 | |
| 141 # Filter top level features | |
| 142 for feature in feature_lambda(features, feature_test_type, {"types": ALLOWED_FEATURES}, subfeatures=True): | |
| 143 cleaned_subfeatures = [] | |
| 144 for sf in feature.sub_features: | |
| 145 if sf.type in SPECIAL_REMOVED_FEATURES: | |
| 146 # 'gene_component_region' is uncaught by feature_test_type as it contains `gene` | |
| 147 continue | |
| 148 else: | |
| 149 cleaned_subfeatures.append(sf) | |
| 150 feature.sub_features = copy.deepcopy(cleaned_subfeatures) | |
| 151 yield feature | |
| 152 | |
| 153 def gff_filter(gff3): | |
| 154 for rec in gffParse(gff3): | |
| 155 cleaned_features = sorted(list(process_features(rec.features)), key=lambda x: x.location.start) | |
| 156 rec.features = sorted(list(add_exons(cleaned_features)), key=lambda x: x.location.start) | |
| 157 rec.annotations = {} | |
| 158 gffWrite([rec], sys.stdout) | |
| 159 | |
| 160 | |
| 161 if __name__ == "__main__": | |
| 162 parser = argparse.ArgumentParser( | |
| 163 description="add parent exon features to CDSs for Apollo" | |
| 164 ) | |
| 165 parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") | |
| 166 args = parser.parse_args() | |
| 167 gff_filter(**vars(args)) |
