Mercurial > repos > cpt_testbed > suite_work2
diff transterm.py @ 0:d5c3354c166d draft default tip
Uploaded
author | cpt_testbed |
---|---|
date | Fri, 29 Apr 2022 10:33:36 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/transterm.py Fri Apr 29 10:33:36 2022 +0000 @@ -0,0 +1,240 @@ +#!/usr/bin/env python +import os +import sys +import argparse +import subprocess +import tempfile +import re +import logging +from Bio.Seq import Seq +from Bio.SeqRecord import SeqRecord +from Bio.SeqFeature import SeqFeature, FeatureLocation +from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature +from gff3 import feature_lambda, feature_test_type + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) +SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__)) + +REGEX_TERM = re.compile( + " TERM \d+ \s* (\d+) - (\d+)\s*(\+|-) [^ ]* \s* (\d+)\s* ([0-9.-]+)\s* -([0-9.]+)\s*\|\s*(.*)" +) +PARTS_TERM = re.compile(" ([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)") +COLS = ( + "start", + "end", + "strand", + "confidence", + "hp score", + "tail score", + "notes", + "5' tail", + "5' stem", + "loop", + "3' stem", + "3' tail", +) + + +def build_expterm(): + pass + + +def generate_annotation_file(gff3): + # TODO: cleanup + t = tempfile.NamedTemporaryFile(mode="w",delete=False, suffix=".coords") + for rec in gffParse(gff3): + features = feature_lambda( + rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False + ) + for feature in sorted(features, key=lambda x: x.location.start): + t.write( + "\t".join( + map( + str, + [ + feature.id, + feature.location.start + 1, + feature.location.end, + rec.id, + ], + ) + ) + + "\n" + ) + name = t.name + t.close() + return name + + +def run_transterm(expterm, fasta, annotations): + output = subprocess.check_output( + ["transterm", "-p", expterm, "--all-context", fasta.name, annotations] + ) + return output + + +def pairwise(it): + it = iter(it) + while True: + try: + yield next(it), next(it) + except StopIteration: + return + +def parse_transterm(data): + data = data.decode("utf-8") + data = data.split("SEQUENCE")[1:] + for datum in data: + lines = datum.split("\n") + + seq_name = lines[0][1:] + seq_name = seq_name[0 : seq_name.index(" ")] + + record = SeqRecord(Seq("ACTG"), id=seq_name) + + important_lines = [ + x for x in lines if len(x.strip()) > 0 and x.startswith(" ") + ] + # Must have an even # + assert len(important_lines) % 2 == 0 + for idx, (a, b) in enumerate(pairwise(important_lines)): + parsed_data = REGEX_TERM.match(a).groups() + PARTS_TERM.match(b).groups() + + pd = {k: v for (k, v) in zip(COLS, parsed_data)} + # start , end , strand , confidence , hp score , tail score , notes + # , 5' tail , 5'stem , loop , 3' stem , 3'loop + start = int(pd["start"]) + end = int(pd["end"]) + + notes = {k: pd[k] for k in COLS[6:]} + notes2 = {} + # However, if on - strand, we need to swap 5', 3': + if pd["strand"] == "-": + # Also need to flip the data + notes2 = { + "[1] 5' stem": str(Seq(notes["3' stem"]).reverse_complement()), + "[2] loop": str(Seq(notes["loop"]).reverse_complement()), + "[3] 3' stem": str(Seq(notes["5' stem"]).reverse_complement()), + "[4] 3' tail": str(Seq(notes["5' tail"]).reverse_complement()), + } + else: + notes2 = { + "[1] 5' stem": notes["5' stem"], + "[2] loop": notes["loop"], + "[3] 3' stem": notes["3' stem"], + "[4] 3' tail": notes["3' tail"], + } + + qualifiers = { + "score": [pd["confidence"]], + "source": ["TranstermHP"], + "ID": ["terminator_%s" % idx], + } + current_start = min(start, end) - 1 + current_end = max(start, end) + + if pd["strand"] == "+": + # Let's extend the current_end to include any Ts we find. + # Take the 3' tail, and check to see how many Ts we can strip: + # + # Updated algo: take as many chars as possible until >1 is non-T + # If the non-T is last, strip. + # Otherwise (internal), leave it. + addition = "" + prime3tail = notes["3' tail"] + for idx in range(len(prime3tail)): + if ( + prime3tail[idx] != "T" + and addition.count("A") + + addition.count("C") + + addition.count("G") + == 1 + ): + break + + addition += prime3tail[idx] + + if addition[-1] != "T": + addition = addition[0:-1] + + current_end += len(addition) + else: + addition = "" + prime5tail = notes["5' tail"][::-1] + for idx in range(len(prime5tail)): + if ( + prime5tail[idx] != "A" + and addition.count("T") + + addition.count("C") + + addition.count("G") + == 1 + ): + break + + addition += prime5tail[idx] + + if addition[-1] != "A": + addition = addition[0:-1] + + current_start -= len(addition) + + qualifiers.update(notes2) + feature = gffSeqFeature( + FeatureLocation(current_start, current_end), + type="terminator", + strand=1 if pd["strand"] == "+" else -1, + qualifiers=qualifiers, + ) + record.features.append(feature) + + yield record + + +def main(fasta, gff3, existing_expterm="", **kwargs): + coords_file = generate_annotation_file(gff3) + transterm_output = run_transterm(existing_expterm, fasta, coords_file) + try: + os.unlink(coords_file) + except Exception: + pass + # Not my job + + for record in parse_transterm(transterm_output): + yield record + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Export corresponding sequence in genome from GFF3", epilog="" + ) + parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome") + parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File") + + parser.add_argument( + "--min_conf", + type=int, + default=76, + help="Only output terminators with confidence >= n", + ) + + # parser.add_argument('--gc', type=float, default=-2.3, help='Score of a G-C pair') + # parser.add_argument('--au', type=float, default=-0.9, help='Score of an A-U pair') + # parser.add_argument('--gu', type=float, default=1.3, help='Score of a G-U pair') + # parser.add_argument('--mm', type=float, default=3.5, help='Score of any other pair') + # parser.add_argument('--gap', type=int, default=6, help='Score of a gap in the hairpin') + # parser.add_argument('--max_hp_score', type=float, default=-2, help='Maximum allowable hairpin score') + # parser.add_argument('--max_tail_score', type=float, default=-2.5, help='Maximum allowable tail score') + # parser.add_argument('--max_len', type=int, default=59, help='Total extent of hairpin <= n NT long') + # parser.add_argument('--min_stem', type=int, default=4, help='Stem must be n nucleotides long') + # parser.add_argument('--max_loop', type=int, default=13, help='The loop portion can be no longer than n') + # parser.add_argument('--min_loop', type=int, default=3, help='Loop portion of the hairpin must be at least n long') + # parser.add_argument('--uwin_require', type=int, default=3, help='Number of "U" nucleotides in the --uwin_length long region.') + # parser.add_argument('--loop_penalty', default='1,2,3,4,5,6,7,8,9,10,11', help='The cost of loops of various lengths can be set using --loop_penalty=f1,f2,f3,f4,f5,...fn, where f1 is the cost of a loop of length --min_loop, f2 is the cost of a loop of length --min_loop+1, as so on. If there are too few terms to cover up to max_loop, the last term is repeated.',) + + args = parser.parse_args() + + for record in main( + existing_expterm=os.path.join(SCRIPT_PATH, "expterm.dat"), **vars(args) + ): + gffWrite([record], sys.stdout)