comparison transterm.py @ 0:d5c3354c166d draft default tip
Uploaded
| author | cpt_testbed |
|---|---|
| date | Fri, 29 Apr 2022 10:33:36 +0000 |
| parents | |
| children | |
#!/usr/bin/env python
import os
import sys
import argparse
import subprocess
import tempfile
import re
import logging
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, FeatureLocation
from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
from gff3 import feature_lambda, feature_test_type

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))

REGEX_TERM = re.compile(
    r" TERM \d+ \s* (\d+) - (\d+)\s*(\+|-) [^ ]* \s* (\d+)\s* ([0-9.-]+)\s* -([0-9.]+)\s*\|\s*(.*)"
)
PARTS_TERM = re.compile(r" ([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)")
COLS = (
    "start",
    "end",
    "strand",
    "confidence",
    "hp score",
    "tail score",
    "notes",
    "5' tail",
    "5' stem",
    "loop",
    "3' stem",
    "3' tail",
)
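
# Illustrative only (not verbatim TransTermHP output): REGEX_TERM is written to
# pull apart a terminator summary line shaped roughly like
#   " TERM 1  100 - 125 + gene1  96 -12.3 -4.1 | opp_overlap 100"
# capturing start, end, strand, confidence, hp score, tail score (leading minus
# stripped), and notes. PARTS_TERM then splits the following line of five
# whitespace-separated fields, e.g.
#   " AAATT GCCGCC ATTCG GGCGGC TTTTTTTT"
# into 5' tail, 5' stem, loop, 3' stem and 3' tail; zipping both groups() with
# COLS is what builds the per-terminator dict in parse_transterm() below.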


def build_expterm():
    pass


def generate_annotation_file(gff3):
    # TODO: cleanup
    t = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".coords")
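    # Each line written below is tab-separated: feature id, 1-based start, end,
    # record id. This .coords file is what run_transterm() hands to the transterm
    # binary as its annotation input.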
    for rec in gffParse(gff3):
        features = feature_lambda(
            rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
        )
        for feature in sorted(features, key=lambda x: x.location.start):
            t.write(
                "\t".join(
                    map(
                        str,
                        [
                            feature.id,
                            feature.location.start + 1,
                            feature.location.end,
                            rec.id,
                        ],
                    )
                )
                + "\n"
            )
    name = t.name
    t.close()
    return name


def run_transterm(expterm, fasta, annotations):
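    # With illustrative filenames, the command assembled below amounts to roughly:
    #   transterm -p expterm.dat --all-context genome.fasta genome.coords
    # i.e. TransTermHP run with an expterm parameter file, full-context output,
    # the genome FASTA, and the coords file from generate_annotation_file().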
    output = subprocess.check_output(
        ["transterm", "-p", expterm, "--all-context", fasta.name, annotations]
    )
    return output


def pairwise(it):
    it = iter(it)
    while True:
        try:
            yield next(it), next(it)
        except StopIteration:
            return
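# For example, pairwise(["a", "b", "c", "d"]) yields ("a", "b") then ("c", "d");
# an unpaired trailing element is dropped. parse_transterm() below relies on this
# to walk each terminator's summary line and sequence-parts line together.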

def parse_transterm(data):
    data = data.decode("utf-8")
    data = data.split("SEQUENCE")[1:]
    for datum in data:
        lines = datum.split("\n")

        seq_name = lines[0][1:]
        seq_name = seq_name[0 : seq_name.index(" ")]

        record = SeqRecord(Seq("ACTG"), id=seq_name)

        important_lines = [
            x for x in lines if len(x.strip()) > 0 and x.startswith(" ")
        ]
        # Each terminator is reported on a pair of lines, so the count must be even.
        assert len(important_lines) % 2 == 0
        for idx, (a, b) in enumerate(pairwise(important_lines)):
            parsed_data = REGEX_TERM.match(a).groups() + PARTS_TERM.match(b).groups()

            pd = {k: v for (k, v) in zip(COLS, parsed_data)}
            # start, end, strand, confidence, hp score, tail score, notes,
            # 5' tail, 5' stem, loop, 3' stem, 3' tail
            start = int(pd["start"])
            end = int(pd["end"])

            notes = {k: pd[k] for k in COLS[6:]}
            notes2 = {}
            # However, if on the - strand, we need to swap 5' and 3':
            if pd["strand"] == "-":
                # Also need to reverse-complement the sequence parts
                notes2 = {
                    "[1] 5' stem": str(Seq(notes["3' stem"]).reverse_complement()),
                    "[2] loop": str(Seq(notes["loop"]).reverse_complement()),
                    "[3] 3' stem": str(Seq(notes["5' stem"]).reverse_complement()),
                    "[4] 3' tail": str(Seq(notes["5' tail"]).reverse_complement()),
                }
            else:
                notes2 = {
                    "[1] 5' stem": notes["5' stem"],
                    "[2] loop": notes["loop"],
                    "[3] 3' stem": notes["3' stem"],
                    "[4] 3' tail": notes["3' tail"],
                }

            qualifiers = {
                "score": [pd["confidence"]],
                "source": ["TranstermHP"],
                "ID": ["terminator_%s" % idx],
            }
            current_start = min(start, end) - 1
            current_end = max(start, end)

            if pd["strand"] == "+":
                # Let's extend current_end to include any Ts we find.
                # Take the 3' tail, and check how many Ts we can absorb from it:
                #
                # Updated algo: take as many chars as possible until more than one is non-T.
                # If the non-T is last, strip it.
                # Otherwise (internal), leave it.
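                # For example (illustrative): a 3' tail of "TTATTG..." accumulates
                # "TTATT" and stops at the second non-T ("G"), extending the end by 5;
                # a tail of "TTAG..." accumulates "TTA", then strips the trailing
                # non-T ("A") and extends the end by only 2.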
                addition = ""
                prime3tail = notes["3' tail"]
                for i in range(len(prime3tail)):
                    if (
                        prime3tail[i] != "T"
                        and addition.count("A")
                        + addition.count("C")
                        + addition.count("G")
                        == 1
                    ):
                        break

                    addition += prime3tail[i]

                # Guard against an empty tail before checking its last character.
                if addition and addition[-1] != "T":
                    addition = addition[0:-1]

                current_end += len(addition)
            else:
                addition = ""
                prime5tail = notes["5' tail"][::-1]
                for i in range(len(prime5tail)):
                    if (
                        prime5tail[i] != "A"
                        and addition.count("T")
                        + addition.count("C")
                        + addition.count("G")
                        == 1
                    ):
                        break

                    addition += prime5tail[i]

                if addition and addition[-1] != "A":
                    addition = addition[0:-1]

                current_start -= len(addition)

            qualifiers.update(notes2)
            feature = gffSeqFeature(
                FeatureLocation(current_start, current_end),
                type="terminator",
                strand=1 if pd["strand"] == "+" else -1,
                qualifiers=qualifiers,
            )
            record.features.append(feature)

        yield record


def main(fasta, gff3, existing_expterm="", **kwargs):
    coords_file = generate_annotation_file(gff3)
    transterm_output = run_transterm(existing_expterm, fasta, coords_file)
    try:
        os.unlink(coords_file)
    except Exception:
        pass
    # Not my job

    for record in parse_transterm(transterm_output):
        yield record


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run TransTermHP and report terminators as GFF3", epilog=""
    )
    parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
    parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")

    parser.add_argument(
        "--min_conf",
        type=int,
        default=76,
        help="Only output terminators with confidence >= n",
    )
    # NOTE: --min_conf is parsed but not currently applied below; it is swallowed
    # by main()'s **kwargs, so all terminators are reported regardless of confidence.

    # parser.add_argument('--gc', type=float, default=-2.3, help='Score of a G-C pair')
    # parser.add_argument('--au', type=float, default=-0.9, help='Score of an A-U pair')
    # parser.add_argument('--gu', type=float, default=1.3, help='Score of a G-U pair')
    # parser.add_argument('--mm', type=float, default=3.5, help='Score of any other pair')
    # parser.add_argument('--gap', type=int, default=6, help='Score of a gap in the hairpin')
    # parser.add_argument('--max_hp_score', type=float, default=-2, help='Maximum allowable hairpin score')
    # parser.add_argument('--max_tail_score', type=float, default=-2.5, help='Maximum allowable tail score')
    # parser.add_argument('--max_len', type=int, default=59, help='Total extent of hairpin <= n NT long')
    # parser.add_argument('--min_stem', type=int, default=4, help='Stem must be n nucleotides long')
    # parser.add_argument('--max_loop', type=int, default=13, help='The loop portion can be no longer than n')
    # parser.add_argument('--min_loop', type=int, default=3, help='Loop portion of the hairpin must be at least n long')
    # parser.add_argument('--uwin_require', type=int, default=3, help='Number of "U" nucleotides in the --uwin_length long region.')
    # parser.add_argument('--loop_penalty', default='1,2,3,4,5,6,7,8,9,10,11', help='The cost of loops of various lengths can be set using --loop_penalty=f1,f2,f3,f4,f5,...fn, where f1 is the cost of a loop of length --min_loop, f2 is the cost of a loop of length --min_loop+1, as so on. If there are too few terms to cover up to max_loop, the last term is repeated.',)

    args = parser.parse_args()

    for record in main(
        existing_expterm=os.path.join(SCRIPT_PATH, "expterm.dat"), **vars(args)
    ):
        gffWrite([record], sys.stdout)
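
# Usage sketch (illustrative filenames; assumes the transterm binary is on PATH
# and expterm.dat sits next to this script):
#   python transterm.py genome.fasta annotations.gff3 > terminators.gff3
# gffWrite() sends the terminator features to stdout as GFF3.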
