| 0 | 1 #!/usr/bin/env python | 
|  | 2 import sys | 
|  | 3 import logging | 
|  | 4 import argparse | 
|  | 5 import copy | 
|  | 6 from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature | 
|  | 7 from gff3 import feature_lambda, feature_test_type | 
|  | 8 from Bio.SeqFeature import FeatureLocation | 
|  | 9 | 
|  | 10 logging.basicConfig(level=logging.INFO) | 
|  | 11 log = logging.getLogger(__name__) | 
|  | 12 | 
|  | 13 ALLOWED_FEATURES = [ | 
|  | 14         "mRNA", | 
|  | 15         "exon", | 
|  | 16         "transposable_element", | 
|  | 17         "tRNA", | 
|  | 18         "transcript", | 
|  | 19         "terminator", | 
|  | 20         "Shine_Dalgarno_Sequence", | 
|  | 21         "pseudogene", | 
|  | 22         "stop_codon_read_through", | 
|  | 23         "repeat_region", | 
|  | 24         "CDS", | 
|  | 25         "gene", | 
|  | 26         "rRNA", | 
|  | 27         "ncRNA", | 
|  | 28         "snRNA", | 
|  | 29         "snoRNA", | 
|  | 30         "miRNA", | 
|  | 31         ] | 
|  | 32 | 
|  | 33 SPECIAL_REMOVED_FEATURES = ["gene_component_region", "sequence_difference"] | 
|  | 34 | 
|  | 35 | 
|  | 36 | 
|  | 37 def add_exons(features): | 
|  | 38     for gene in feature_lambda( | 
|  | 39         features, feature_test_type, {"type": "gene"}, subfeatures=True | 
|  | 40     ): | 
|  | 41         clean_gene = copy.deepcopy(gene) | 
|  | 42         exon_start = None | 
|  | 43         exon_end = None | 
|  | 44         exon_strand = None | 
|  | 45         cds_list = [] | 
|  | 46 | 
|  | 47         #for mRNA in gene.sub_features: | 
|  | 48         #    for x in mRNA.sub_features: | 
|  | 49         #        x.qualifiers["Parent"] = [gene.id] | 
|  | 50         #        gene.sub_features.append(x) | 
|  | 51 | 
|  | 52         for exon in feature_lambda(gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False,recurse=False): | 
|  | 53             #if the gene contains an exon, skip. | 
|  | 54             continue | 
|  | 55         hasMRNA = False | 
|  | 56         for x in gene.sub_features: | 
|  | 57           if x.type == "mRNA": | 
|  | 58             hasMRNA = True | 
|  | 59             mRNA = x | 
|  | 60         """ | 
|  | 61         if not hasMRNA: | 
|  | 62           mRNA = gffSeqFeature( | 
|  | 63                    location=FeatureLocation(gene.location.start, gene.location.end, gene.location.strand), | 
|  | 64                    type="mRNA", | 
|  | 65                    source = "cpt.prepApollo", | 
|  | 66                    qualifiers={ | 
|  | 67                        "ID": ["%s.mRNA" % clean_gene.qualifiers["ID"][0]], | 
|  | 68                        "Parent": clean_gene.qualifiers["ID"], | 
|  | 69                    }, | 
|  | 70                    sub_features=gene.sub_features, | 
|  | 71                    strand=exon_strand | 
|  | 72                  ) | 
|  | 73           for x in mRNA.sub_features: | 
|  | 74             x.qualifiers["Parent"] = mRNA["ID"] | 
|  | 75           clean_gene.sub_features = [mRNA] | 
|  | 76         else: | 
|  | 77           for x in clean_gene.sub_features: | 
|  | 78             if x.type != "mRNA": | 
|  | 79               x.qualifiers["Parent"] = [mRNA.id] """ | 
|  | 80 | 
|  | 81         # check for CDS child features of the gene, do not go a further step (this should skip any CDS children of exon child features) | 
|  | 82         for cds in feature_lambda( | 
|  | 83             gene.sub_features, | 
|  | 84             feature_test_type, | 
|  | 85             {"type": "CDS"}, | 
|  | 86             subfeatures=False, | 
|  | 87             recurse=False, | 
|  | 88             ): | 
|  | 89             # check all CDS features for min/max boundaries | 
|  | 90             if exon_start is None: | 
|  | 91                 exon_start = cds.location.start | 
|  | 92                 exon_strand = cds.location.strand | 
|  | 93             if exon_end is None: | 
|  | 94                 exon_end = cds.location.end | 
|  | 95             exon_start = min(exon_start, cds.location.start) | 
|  | 96             exon_end = max(exon_end, cds.location.end) | 
|  | 97             cds_list.append(cds) | 
|  | 98         if cds_list: | 
|  | 99             # we found a CDS to adopt | 
|  | 100             new_exon = gffSeqFeature( | 
|  | 101                 location=FeatureLocation(exon_start, exon_end), | 
|  | 102                 type="exon", | 
|  | 103                 source = "cpt.prepApollo", | 
|  | 104                 qualifiers={ | 
|  | 105                     "ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]], | 
|  | 106                     "Parent": [clean_gene.id], | 
|  | 107                     "ApolloExon": ["True"], | 
|  | 108                 }, | 
|  | 109                 sub_features=[], | 
|  | 110                 strand=exon_strand | 
|  | 111             ) | 
|  | 112             for cds in cds_list: | 
|  | 113                 cds.qualifiers["Parent"] = new_exon.qualifiers["ID"] | 
|  | 114                 new_exon.sub_features.append(cds) | 
|  | 115             #gene.sub_features.append(new_exon) | 
|  | 116             # get all the other children of gene that AREN'T a CDS including the new exon | 
|  | 117             clean_gene.sub_features.append(copy.deepcopy(new_exon)) | 
|  | 118             #clean_gene.sub_features.append(gffSeqFeature(location=FeatureLocation(exon_start, exon_end, exon_strand), type="exon", source = "cpt.prepApollo", qualifiers={"ID": ["%s.exon" % clean_gene.qualifiers["ID"][0]], "Parent": clean_gene.qualifiers["ID"]}, sub_features=[], strand=exon_strand)) | 
|  | 119             """ | 
|  | 120             for sf in feature_lambda( | 
|  | 121                 gene.sub_features, | 
|  | 122                 feature_test_type, | 
|  | 123                 {"type": "CDS"}, | 
|  | 124                 subfeatures=True, | 
|  | 125                 recurse=False, | 
|  | 126                 invert=True, | 
|  | 127             ): | 
|  | 128                 child = copy.deepcopy(sf) | 
|  | 129                 child.qualifiers["Parent"] = new_exon.qualifiers["ID"] | 
|  | 130                 clean_gene.sub_features.append(child) | 
|  | 131             """ | 
|  | 132             # add them to the new Exon feature | 
|  | 133         # return the cleaned gene with new exon | 
|  | 134         yield clean_gene | 
|  | 135 | 
|  | 136 def process_features(features): | 
|  | 137     # change RBS to 'Shine_Dalgarno_sequence' | 
|  | 138     for rbs in feature_lambda(features, feature_test_type, {'type': "RBS"}): | 
|  | 139         rbs.type = "Shine_Dalgarno_sequence" | 
|  | 140 | 
|  | 141     # Filter top level features | 
|  | 142     for feature in feature_lambda(features, feature_test_type, {"types": ALLOWED_FEATURES}, subfeatures=True): | 
|  | 143         cleaned_subfeatures = [] | 
|  | 144         for sf in feature.sub_features: | 
|  | 145             if sf.type in SPECIAL_REMOVED_FEATURES: | 
|  | 146                 # 'gene_component_region' is uncaught by feature_test_type as it contains `gene` | 
|  | 147                 continue | 
|  | 148             else: | 
|  | 149                 cleaned_subfeatures.append(sf) | 
|  | 150         feature.sub_features = copy.deepcopy(cleaned_subfeatures) | 
|  | 151         yield feature | 
|  | 152 | 
|  | 153 def gff_filter(gff3): | 
|  | 154     for rec in gffParse(gff3): | 
|  | 155         cleaned_features = sorted(list(process_features(rec.features)), key=lambda x: x.location.start) | 
|  | 156         rec.features = sorted(list(add_exons(cleaned_features)), key=lambda x: x.location.start) | 
|  | 157         rec.annotations = {} | 
|  | 158         gffWrite([rec], sys.stdout) | 
|  | 159 | 
|  | 160 | 
|  | 161 if __name__ == "__main__": | 
|  | 162     parser = argparse.ArgumentParser( | 
|  | 163         description="add parent exon features to CDSs for Apollo" | 
|  | 164     ) | 
|  | 165     parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 annotations") | 
|  | 166     args = parser.parse_args() | 
|  | 167     gff_filter(**vars(args)) |