diff transterm.py @ 0:d5c3354c166d draft default tip

Uploaded
author cpt_testbed
date Fri, 29 Apr 2022 10:33:36 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/transterm.py	Fri Apr 29 10:33:36 2022 +0000
@@ -0,0 +1,240 @@
+#!/usr/bin/env python
+import os
+import sys
+import argparse
+import subprocess
+import tempfile
+import re
+import logging
+from Bio.Seq import Seq
+from Bio.SeqRecord import SeqRecord
+from Bio.SeqFeature import SeqFeature, FeatureLocation
+from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
+from gff3 import feature_lambda, feature_test_type
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))
+
+REGEX_TERM = re.compile(
+    "  TERM \d+ \s* (\d+) - (\d+)\s*(\+|-) [^ ]* \s* (\d+)\s* ([0-9.-]+)\s* -([0-9.]+)\s*\|\s*(.*)"
+)
+PARTS_TERM = re.compile("  ([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)")
+COLS = (
+    "start",
+    "end",
+    "strand",
+    "confidence",
+    "hp score",
+    "tail score",
+    "notes",
+    "5' tail",
+    "5' stem",
+    "loop",
+    "3' stem",
+    "3' tail",
+)
+
+
+def build_expterm():
+    pass
+
+
+def generate_annotation_file(gff3):
+    # TODO: cleanup
+    t = tempfile.NamedTemporaryFile(mode="w",delete=False, suffix=".coords")
+    for rec in gffParse(gff3):
+        features = feature_lambda(
+            rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
+        )
+        for feature in sorted(features, key=lambda x: x.location.start):
+            t.write(
+                "\t".join(
+                    map(
+                        str,
+                        [
+                            feature.id,
+                            feature.location.start + 1,
+                            feature.location.end,
+                            rec.id,
+                        ],
+                    )
+                )
+                + "\n"
+            )
+    name = t.name
+    t.close()
+    return name
+
+
+def run_transterm(expterm, fasta, annotations):
+    output = subprocess.check_output(
+        ["transterm", "-p", expterm, "--all-context", fasta.name, annotations]
+    )
+    return output
+
+
+def pairwise(it):
+    it = iter(it)
+    while True:
+      try:
+        yield next(it), next(it)
+      except StopIteration:
+        return
+
+def parse_transterm(data):
+    data = data.decode("utf-8")
+    data = data.split("SEQUENCE")[1:]
+    for datum in data:
+        lines = datum.split("\n")
+
+        seq_name = lines[0][1:]
+        seq_name = seq_name[0 : seq_name.index(" ")]
+
+        record = SeqRecord(Seq("ACTG"), id=seq_name)
+
+        important_lines = [
+            x for x in lines if len(x.strip()) > 0 and x.startswith("  ")
+        ]
+        # Must have an even #
+        assert len(important_lines) % 2 == 0
+        for idx, (a, b) in enumerate(pairwise(important_lines)):
+            parsed_data = REGEX_TERM.match(a).groups() + PARTS_TERM.match(b).groups()
+
+            pd = {k: v for (k, v) in zip(COLS, parsed_data)}
+            # start , end  , strand , confidence , hp score , tail score , notes
+            # , 5' tail         , 5'stem   , loop , 3' stem  , 3'loop
+            start = int(pd["start"])
+            end = int(pd["end"])
+
+            notes = {k: pd[k] for k in COLS[6:]}
+            notes2 = {}
+            # However, if on - strand, we need to swap 5', 3':
+            if pd["strand"] == "-":
+                # Also need to flip the data
+                notes2 = {
+                    "[1] 5' stem": str(Seq(notes["3' stem"]).reverse_complement()),
+                    "[2] loop": str(Seq(notes["loop"]).reverse_complement()),
+                    "[3] 3' stem": str(Seq(notes["5' stem"]).reverse_complement()),
+                    "[4] 3' tail": str(Seq(notes["5' tail"]).reverse_complement()),
+                }
+            else:
+                notes2 = {
+                    "[1] 5' stem": notes["5' stem"],
+                    "[2] loop": notes["loop"],
+                    "[3] 3' stem": notes["3' stem"],
+                    "[4] 3' tail": notes["3' tail"],
+                }
+
+            qualifiers = {
+                "score": [pd["confidence"]],
+                "source": ["TranstermHP"],
+                "ID": ["terminator_%s" % idx],
+            }
+            current_start = min(start, end) - 1
+            current_end = max(start, end)
+
+            if pd["strand"] == "+":
+                # Let's extend the current_end to include any Ts we find.
+                # Take the 3' tail, and check to see how many Ts we can strip:
+                #
+                # Updated algo: take as many chars as possible until >1 is non-T
+                # If the non-T is last, strip.
+                # Otherwise (internal), leave it.
+                addition = ""
+                prime3tail = notes["3' tail"]
+                for idx in range(len(prime3tail)):
+                    if (
+                        prime3tail[idx] != "T"
+                        and addition.count("A")
+                        + addition.count("C")
+                        + addition.count("G")
+                        == 1
+                    ):
+                        break
+
+                    addition += prime3tail[idx]
+
+                if addition[-1] != "T":
+                    addition = addition[0:-1]
+
+                current_end += len(addition)
+            else:
+                addition = ""
+                prime5tail = notes["5' tail"][::-1]
+                for idx in range(len(prime5tail)):
+                    if (
+                        prime5tail[idx] != "A"
+                        and addition.count("T")
+                        + addition.count("C")
+                        + addition.count("G")
+                        == 1
+                    ):
+                        break
+
+                    addition += prime5tail[idx]
+
+                if addition[-1] != "A":
+                    addition = addition[0:-1]
+
+                current_start -= len(addition)
+
+            qualifiers.update(notes2)
+            feature = gffSeqFeature(
+                FeatureLocation(current_start, current_end),
+                type="terminator",
+                strand=1 if pd["strand"] == "+" else -1,
+                qualifiers=qualifiers,
+            )
+            record.features.append(feature)
+
+        yield record
+
+
+def main(fasta, gff3, existing_expterm="", **kwargs):
+    coords_file = generate_annotation_file(gff3)
+    transterm_output = run_transterm(existing_expterm, fasta, coords_file)
+    try:
+        os.unlink(coords_file)
+    except Exception:
+        pass
+        # Not my job
+
+    for record in parse_transterm(transterm_output):
+        yield record
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Export corresponding sequence in genome from GFF3", epilog=""
+    )
+    parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
+    parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
+
+    parser.add_argument(
+        "--min_conf",
+        type=int,
+        default=76,
+        help="Only output terminators with confidence >= n",
+    )
+
+    # parser.add_argument('--gc', type=float, default=-2.3, help='Score of a G-C pair')
+    # parser.add_argument('--au', type=float, default=-0.9, help='Score of an A-U pair')
+    # parser.add_argument('--gu', type=float, default=1.3, help='Score of a G-U pair')
+    # parser.add_argument('--mm', type=float, default=3.5, help='Score of any other pair')
+    # parser.add_argument('--gap', type=int, default=6, help='Score of a gap in the hairpin')
+    # parser.add_argument('--max_hp_score', type=float, default=-2, help='Maximum allowable hairpin score')
+    # parser.add_argument('--max_tail_score', type=float, default=-2.5, help='Maximum allowable tail score')
+    # parser.add_argument('--max_len', type=int, default=59, help='Total extent of hairpin <= n NT long')
+    # parser.add_argument('--min_stem', type=int, default=4, help='Stem must be n nucleotides long')
+    # parser.add_argument('--max_loop', type=int, default=13, help='The loop portion can be no longer than n')
+    # parser.add_argument('--min_loop', type=int, default=3, help='Loop portion of the hairpin must be at least n long')
+    # parser.add_argument('--uwin_require', type=int, default=3, help='Number of "U" nucleotides in the --uwin_length long region.')
+    # parser.add_argument('--loop_penalty', default='1,2,3,4,5,6,7,8,9,10,11', help='The cost of loops of various lengths can be set using --loop_penalty=f1,f2,f3,f4,f5,...fn, where f1 is the cost of a loop of length --min_loop, f2 is the cost of a loop of length --min_loop+1, as so on. If there are too few terms to cover up to max_loop, the last term is repeated.',)
+
+    args = parser.parse_args()
+
+    for record in main(
+        existing_expterm=os.path.join(SCRIPT_PATH, "expterm.dat"), **vars(args)
+    ):
+        gffWrite([record], sys.stdout)