| 0 | 1 import copy | 
|  | 2 import logging | 
|  | 3 | 
|  | 4 log = logging.getLogger() | 
|  | 5 log.setLevel(logging.WARN) | 
|  | 6 | 
|  | 7 | 
|  | 8 def feature_lambda( | 
|  | 9     feature_list, | 
|  | 10     test, | 
|  | 11     test_kwargs, | 
|  | 12     subfeatures=True, | 
|  | 13     parent=None, | 
|  | 14     invert=False, | 
|  | 15     recurse=True, | 
|  | 16 ): | 
|  | 17     """Recursively search through features, testing each with a test function, yielding matches. | 
|  | 18 | 
|  | 19     GFF3 is a hierachical data structure, so we need to be able to recursively | 
|  | 20     search through features. E.g. if you're looking for a feature with | 
|  | 21     ID='bob.42', you can't just do a simple list comprehension with a test | 
|  | 22     case. You don't know how deeply burried bob.42 will be in the feature tree. This is where feature_lambda steps in. | 
|  | 23 | 
|  | 24     :type feature_list: list | 
|  | 25     :param feature_list: an iterable of features | 
|  | 26 | 
|  | 27     :type test: function reference | 
|  | 28     :param test: a closure with the method signature (feature, **kwargs) where | 
|  | 29                  the kwargs are those passed in the next argument. This | 
|  | 30                  function should return True or False, True if the feature is | 
|  | 31                  to be yielded as part of the main feature_lambda function, or | 
|  | 32                  False if it is to be ignored. This function CAN mutate the | 
|  | 33                  features passed to it (think "apply"). | 
|  | 34 | 
|  | 35     :type test_kwargs: dictionary | 
|  | 36     :param test_kwargs: kwargs to pass to your closure when it is called. | 
|  | 37 | 
|  | 38     :type subfeatures: boolean | 
|  | 39     :param subfeatures: when a feature is matched, should just that feature be | 
|  | 40                         yielded to the caller, or should the entire sub_feature | 
|  | 41                         tree for that feature be included? subfeatures=True is | 
|  | 42                         useful in cases such as searching for a gene feature, | 
|  | 43                         and wanting to know what RBS/Shine_Dalgarno_sequences | 
|  | 44                         are in the sub_feature tree (which can be accomplished | 
|  | 45                         with two feature_lambda calls). subfeatures=False is | 
|  | 46                         useful in cases when you want to process (and possibly | 
|  | 47                         return) the entire feature tree, such as applying a | 
|  | 48                         qualifier to every single feature. | 
|  | 49 | 
|  | 50     :type invert: boolean | 
|  | 51     :param invert: Negate/invert the result of the filter. | 
|  | 52 | 
|  | 53     :rtype: yielded list | 
|  | 54     :return: Yields a list of matching features. | 
|  | 55     """ | 
|  | 56     # Either the top level set of [features] or the subfeature attribute | 
|  | 57     for feature in feature_list: | 
|  | 58         feature._parent = parent | 
|  | 59         if not parent: | 
|  | 60             # Set to self so we cannot go above root. | 
|  | 61             feature._parent = feature | 
|  | 62         test_result = test(feature, **test_kwargs) | 
|  | 63         # if (not invert and test_result) or (invert and not test_result): | 
|  | 64         if invert ^ test_result: | 
|  | 65             if not subfeatures: | 
|  | 66                 feature_copy = copy.deepcopy(feature) | 
|  | 67                 feature_copy.sub_features = list() | 
|  | 68                 yield feature_copy | 
|  | 69             else: | 
|  | 70                 yield feature | 
|  | 71 | 
|  | 72         if recurse and hasattr(feature, "sub_features"): | 
|  | 73             for x in feature_lambda( | 
|  | 74                 feature.sub_features, | 
|  | 75                 test, | 
|  | 76                 test_kwargs, | 
|  | 77                 subfeatures=subfeatures, | 
|  | 78                 parent=feature, | 
|  | 79                 invert=invert, | 
|  | 80                 recurse=recurse, | 
|  | 81             ): | 
|  | 82                 yield x | 
|  | 83 | 
|  | 84 | 
|  | 85 def fetchParent(feature): | 
|  | 86     if not hasattr(feature, "_parent") or feature._parent is None: | 
|  | 87         return feature | 
|  | 88     else: | 
|  | 89         return fetchParent(feature._parent) | 
|  | 90 | 
|  | 91 | 
|  | 92 def feature_test_true(feature, **kwargs): | 
|  | 93     return True | 
|  | 94 | 
|  | 95 | 
|  | 96 def feature_test_type(feature, **kwargs): | 
|  | 97     if "type" in kwargs: | 
|  | 98         return str(feature.type).upper() == str(kwargs["type"]).upper() | 
|  | 99     elif "types" in kwargs: | 
|  | 100       for x in kwargs["types"]: | 
|  | 101         if str(feature.type).upper() == str(x).upper(): | 
|  | 102           return True | 
|  | 103       return False | 
|  | 104     raise Exception("Incorrect feature_test_type call, need type or types") | 
|  | 105 | 
|  | 106 | 
|  | 107 def feature_test_qual_value(feature, **kwargs): | 
|  | 108     """Test qualifier values. | 
|  | 109 | 
|  | 110     For every feature, check that at least one value in | 
|  | 111     feature.quailfiers(kwargs['qualifier']) is in kwargs['attribute_list'] | 
|  | 112     """ | 
|  | 113     if isinstance(kwargs["qualifier"], list): | 
|  | 114         for qualifier in kwargs["qualifier"]: | 
|  | 115             for attribute_value in feature.qualifiers.get(qualifier, []): | 
|  | 116                 if attribute_value in kwargs["attribute_list"]: | 
|  | 117                     return True | 
|  | 118     else: | 
|  | 119         for attribute_value in feature.qualifiers.get(kwargs["qualifier"], []): | 
|  | 120             if attribute_value in kwargs["attribute_list"]: | 
|  | 121                 return True | 
|  | 122     return False | 
|  | 123 | 
|  | 124 | 
|  | 125 def feature_test_location(feature, **kwargs): | 
|  | 126     if "strand" in kwargs: | 
|  | 127         if feature.location.strand != kwargs["strand"]: | 
|  | 128             return False | 
|  | 129 | 
|  | 130     return feature.location.start <= kwargs["loc"] <= feature.location.end | 
|  | 131 | 
|  | 132 | 
|  | 133 def feature_test_quals(feature, **kwargs): | 
|  | 134     """ | 
|  | 135     Example:: | 
|  | 136 | 
|  | 137         a = Feature(qualifiers={'Note': ['Some notes', 'Aasdf']}) | 
|  | 138 | 
|  | 139         # Check if a contains a Note | 
|  | 140         feature_test_quals(a, {'Note': None})  # Returns True | 
|  | 141         feature_test_quals(a, {'Product': None})  # Returns False | 
|  | 142 | 
|  | 143         # Check if a contains a note with specific value | 
|  | 144         feature_test_quals(a, {'Note': ['ome']})  # Returns True | 
|  | 145 | 
|  | 146         # Check if a contains a note with specific value | 
|  | 147         feature_test_quals(a, {'Note': ['other']})  # Returns False | 
|  | 148     """ | 
|  | 149     for key in kwargs: | 
|  | 150         if key not in feature.qualifiers: | 
|  | 151             return False | 
|  | 152 | 
|  | 153         # Key is present, no value specified | 
|  | 154         if kwargs[key] is None: | 
|  | 155             return True | 
|  | 156 | 
|  | 157         # Otherwise there is a key value we're looking for. | 
|  | 158         # so we make a list of matches | 
|  | 159         matches = [] | 
|  | 160         # And check all of the feature qualifier valuse | 
|  | 161         for value in feature.qualifiers[key]: | 
|  | 162             # For that kwargs[key] value | 
|  | 163             for x in kwargs[key]: | 
|  | 164                 matches.append(x in value) | 
|  | 165 | 
|  | 166         # If none matched, then we return false. | 
|  | 167         if not any(matches): | 
|  | 168             return False | 
|  | 169 | 
|  | 170     return True | 
|  | 171 | 
|  | 172 | 
|  | 173 def feature_test_contains(feature, **kwargs): | 
|  | 174     if "index" in kwargs: | 
|  | 175         return feature.location.start < kwargs["index"] < feature.location.end | 
|  | 176     elif "range" in kwargs: | 
|  | 177         return ( | 
|  | 178             feature.location.start < kwargs["range"]["start"] < feature.location.end | 
|  | 179             and feature.location.start < kwargs["range"]["end"] < feature.location.end | 
|  | 180         ) | 
|  | 181     else: | 
|  | 182         raise RuntimeError("Must use index or range keyword") | 
|  | 183 | 
|  | 184 | 
|  | 185 def get_id(feature=None, parent_prefix=None): | 
|  | 186     result = "" | 
|  | 187     if parent_prefix is not None: | 
|  | 188         result += parent_prefix + "|" | 
|  | 189     if "locus_tag" in feature.qualifiers: | 
|  | 190         result += feature.qualifiers["locus_tag"][0] | 
|  | 191     elif "gene" in feature.qualifiers: | 
|  | 192         result += feature.qualifiers["gene"][0] | 
|  | 193     elif "Gene" in feature.qualifiers: | 
|  | 194         result += feature.qualifiers["Gene"][0] | 
|  | 195     elif "product" in feature.qualifiers: | 
|  | 196         result += feature.qualifiers["product"][0] | 
|  | 197     elif "Product" in feature.qualifiers: | 
|  | 198         result += feature.qualifiers["Product"][0] | 
|  | 199     elif "Name" in feature.qualifiers: | 
|  | 200         result += feature.qualifiers["Name"][0] | 
|  | 201     else: | 
|  | 202         return feature.id | 
|  | 203         # Leaving in case bad things happen. | 
|  | 204         # result += '%s_%s_%s_%s' % ( | 
|  | 205         # feature.id, | 
|  | 206         # feature.location.start, | 
|  | 207         # feature.location.end, | 
|  | 208         # feature.location.strand | 
|  | 209         # ) | 
|  | 210     return result | 
|  | 211 | 
|  | 212 | 
|  | 213 def get_gff3_id(gene): | 
|  | 214     return gene.qualifiers.get("Name", [gene.id])[0] | 
|  | 215 | 
|  | 216 | 
|  | 217 def ensure_location_in_bounds(start=0, end=0, parent_length=0): | 
|  | 218     # This prevents frameshift errors | 
|  | 219     while start < 0: | 
|  | 220         start += 3 | 
|  | 221     while end < 0: | 
|  | 222         end += 3 | 
|  | 223     while start > parent_length: | 
|  | 224         start -= 3 | 
|  | 225     while end > parent_length: | 
|  | 226         end -= 3 | 
|  | 227     return (start, end) | 
|  | 228 | 
|  | 229 | 
|  | 230 def coding_genes(feature_list): | 
|  | 231     for x in genes(feature_list): | 
|  | 232         if ( | 
|  | 233             len( | 
|  | 234                 list( | 
|  | 235                     feature_lambda( | 
|  | 236                         x.sub_features, | 
|  | 237                         feature_test_type, | 
|  | 238                         {"type": "CDS"}, | 
|  | 239                         subfeatures=False, | 
|  | 240                     ) | 
|  | 241                 ) | 
|  | 242             ) | 
|  | 243             > 0 | 
|  | 244         ): | 
|  | 245             yield x | 
|  | 246 | 
|  | 247 | 
|  | 248 def genes(feature_list, feature_type="gene", sort=False): | 
|  | 249     """ | 
|  | 250     Simple filter to extract gene features from the feature set. | 
|  | 251     """ | 
|  | 252 | 
|  | 253     if not sort: | 
|  | 254         for x in feature_lambda( | 
|  | 255             feature_list, feature_test_type, {"type": feature_type}, subfeatures=True | 
|  | 256         ): | 
|  | 257             yield x | 
|  | 258     else: | 
|  | 259         data = list(genes(feature_list, feature_type=feature_type, sort=False)) | 
|  | 260         data = sorted(data, key=lambda feature: feature.location.start) | 
|  | 261         for x in data: | 
|  | 262             yield x | 
|  | 263 | 
|  | 264 | 
|  | 265 def wa_unified_product_name(feature): | 
|  | 266     """ | 
|  | 267     Try and figure out a name. We gave conflicting instructions, so | 
|  | 268     this isn't as trivial as it should be. Sometimes it will be in | 
|  | 269     'product' or 'Product', othertimes in 'Name' | 
|  | 270     """ | 
|  | 271     # Manually applied tags. | 
|  | 272     protein_product = feature.qualifiers.get( | 
|  | 273         "product", feature.qualifiers.get("Product", [None]) | 
|  | 274     )[0] | 
|  | 275 | 
|  | 276     # If neither of those are available ... | 
|  | 277     if protein_product is None: | 
|  | 278         # And there's a name... | 
|  | 279         if "Name" in feature.qualifiers: | 
|  | 280             if not is_uuid(feature.qualifiers["Name"][0]): | 
|  | 281                 protein_product = feature.qualifiers["Name"][0] | 
|  | 282 | 
|  | 283     return protein_product | 
|  | 284 | 
|  | 285 | 
|  | 286 def is_uuid(name): | 
|  | 287     return name.count("-") == 4 and len(name) == 36 | 
|  | 288 | 
|  | 289 | 
|  | 290 def get_rbs_from(gene): | 
|  | 291     # Normal RBS annotation types | 
|  | 292     rbs_rbs = list( | 
|  | 293         feature_lambda( | 
|  | 294             gene.sub_features, feature_test_type, {"type": "RBS"}, subfeatures=False | 
|  | 295         ) | 
|  | 296     ) | 
|  | 297     rbs_sds = list( | 
|  | 298         feature_lambda( | 
|  | 299             gene.sub_features, | 
|  | 300             feature_test_type, | 
|  | 301             {"type": "Shine_Dalgarno_sequence"}, | 
|  | 302             subfeatures=False, | 
|  | 303         ) | 
|  | 304     ) | 
|  | 305     # Fraking apollo | 
|  | 306     apollo_exons = list( | 
|  | 307         feature_lambda( | 
|  | 308             gene.sub_features, feature_test_type, {"type": "exon"}, subfeatures=False | 
|  | 309         ) | 
|  | 310     ) | 
|  | 311     apollo_exons = [x for x in apollo_exons if len(x) < 10] | 
|  | 312     # These are more NCBI's style | 
|  | 313     regulatory_elements = list( | 
|  | 314         feature_lambda( | 
|  | 315             gene.sub_features, | 
|  | 316             feature_test_type, | 
|  | 317             {"type": "regulatory"}, | 
|  | 318             subfeatures=False, | 
|  | 319         ) | 
|  | 320     ) | 
|  | 321     rbs_regulatory = list( | 
|  | 322         feature_lambda( | 
|  | 323             regulatory_elements, | 
|  | 324             feature_test_quals, | 
|  | 325             {"regulatory_class": ["ribosome_binding_site"]}, | 
|  | 326             subfeatures=False, | 
|  | 327         ) | 
|  | 328     ) | 
|  | 329     # Here's hoping you find just one ;) | 
|  | 330     return rbs_rbs + rbs_sds + rbs_regulatory + apollo_exons | 
|  | 331 | 
|  | 332 | 
|  | 333 def nice_name(record): | 
|  | 334     """ | 
|  | 335     get the real name rather than NCBI IDs and so on. If fails, will return record.id | 
|  | 336     """ | 
|  | 337     name = record.id | 
|  | 338     likely_parental_contig = list(genes(record.features, feature_type="contig")) | 
|  | 339     if len(likely_parental_contig) == 1: | 
|  | 340         name = likely_parental_contig[0].qualifiers.get("organism", [name])[0] | 
|  | 341     return name | 
|  | 342 | 
|  | 343 | 
|  | 344 def fsort(it): | 
|  | 345     for i in sorted(it, key=lambda x: int(x.location.start)): | 
|  | 346         yield i |