Mercurial > repos > cpt_testbed > suite_work2
view transterm.py @ 0:d5c3354c166d draft default tip
Uploaded
author | cpt_testbed |
---|---|
date | Fri, 29 Apr 2022 10:33:36 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python
"""Run TransTermHP on a FASTA genome + GFF3 CDS annotations and write the
predicted rho-independent terminators to stdout as GFF3 features."""
import os
import sys
import argparse
import subprocess
import tempfile
import re
import logging

from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, FeatureLocation
from CPT_GFFParser import gffParse, gffWrite, gffSeqFeature
from gff3 import feature_lambda, feature_test_type

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))

# Matches the per-terminator summary line of TransTermHP output, e.g.
# "  TERM 1  start - end  strand ... conf  hp-score  -tail-score | notes".
# NOTE(review): this file was recovered from an HTML view that collapses runs
# of whitespace, so the literal spacing inside these patterns (and in the
# startswith() filter below) may have been mangled -- verify against real
# TransTermHP output before trusting the exact spacing.
REGEX_TERM = re.compile(
    r" TERM \d+ \s* (\d+) - (\d+)\s*(\+|-) [^ ]* \s* (\d+)\s* ([0-9.-]+)\s* -([0-9.]+)\s*\|\s*(.*)"
)
# Matches the companion sequence line: five whitespace-separated parts
# (5' tail, 5' stem, loop, 3' stem, 3' tail).
PARTS_TERM = re.compile(r" ([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)\s+([^ ]*)")

# Column names for the concatenated groups of REGEX_TERM + PARTS_TERM.
COLS = (
    "start",
    "end",
    "strand",
    "confidence",
    "hp score",
    "tail score",
    "notes",
    "5' tail",
    "5' stem",
    "loop",
    "3' stem",
    "3' tail",
)


def build_expterm():
    # Placeholder: generating a custom expterm.dat is not implemented.
    pass


def generate_annotation_file(gff3):
    """Write a TransTermHP .coords file for every CDS in *gff3*.

    Each line is "id<TAB>start<TAB>end<TAB>seqid" with a 1-based start.
    Returns the temp file's path; the caller is responsible for deleting it.
    """
    # delete=False because the transterm binary must reopen the path later.
    t = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".coords")
    try:
        for rec in gffParse(gff3):
            features = feature_lambda(
                rec.features, feature_test_type, {"type": "CDS"}, subfeatures=False
            )
            for feature in sorted(features, key=lambda x: x.location.start):
                t.write(
                    "\t".join(
                        map(
                            str,
                            [
                                feature.id,
                                feature.location.start + 1,  # 0-based -> 1-based
                                feature.location.end,
                                rec.id,
                            ],
                        )
                    )
                    + "\n"
                )
    finally:
        # Close even if gffParse/feature iteration raises (fixes handle leak).
        t.close()
    return t.name


def run_transterm(expterm, fasta, annotations):
    """Invoke the ``transterm`` binary and return its raw stdout as bytes.

    Raises subprocess.CalledProcessError on a non-zero exit status.
    """
    return subprocess.check_output(
        ["transterm", "-p", expterm, "--all-context", fasta.name, annotations]
    )


def pairwise(it):
    """Yield non-overlapping pairs (x0, x1), (x2, x3), ... from *it*.

    A trailing odd element is silently dropped.
    """
    it = iter(it)
    while True:
        try:
            yield next(it), next(it)
        except StopIteration:
            return


def _tail_extension(tail, keep):
    """Length of the poly-*keep* tract at the start of *tail*.

    Accepts characters until adding one would follow an already-collected
    non-*keep* character (a single internal mismatch among A/C/G/T is
    tolerated); a mismatch left dangling at the end is stripped.  Returns
    the number of accepted characters.  Safe for an empty *tail* (the
    original code raised IndexError there).
    """
    addition = ""
    for ch in tail:
        # Count mismatches already collected (only canonical bases, as in
        # the original A/C/G vs T counting).
        others = sum(addition.count(c) for c in "ACGT" if c != keep)
        if ch != keep and others == 1:
            break
        addition += ch
    if addition and addition[-1] != keep:
        addition = addition[:-1]
    return len(addition)


def parse_transterm(data):
    """Parse raw TransTermHP stdout (*data*, bytes) into SeqRecords.

    Yields one SeqRecord per SEQUENCE section of the report; the sequence
    itself is a placeholder ("ACTG") -- only the id and the ``terminator``
    features matter to the caller.
    """
    data = data.decode("utf-8")
    # Everything before the first "SEQUENCE" header is banner text.
    for datum in data.split("SEQUENCE")[1:]:
        lines = datum.split("\n")
        # Header looks like " <seq_id> <free text...>"; keep just the id.
        seq_name = lines[0][1:]
        seq_name = seq_name[0 : seq_name.index(" ")]
        record = SeqRecord(Seq("ACTG"), id=seq_name)
        # Terminator entries are the indented non-blank lines; they come in
        # pairs: summary line (TERM ...) then the sequence-parts line.
        important_lines = [
            x for x in lines if len(x.strip()) > 0 and x.startswith(" ")
        ]
        # Must have an even count for pairing:
        # assert len(important_lines) % 2 == 0
        for idx, (summary, parts) in enumerate(pairwise(important_lines)):
            parsed_data = (
                REGEX_TERM.match(summary).groups()
                + PARTS_TERM.match(parts).groups()
            )
            pd = {k: v for (k, v) in zip(COLS, parsed_data)}
            start = int(pd["start"])
            end = int(pd["end"])
            notes = {k: pd[k] for k in COLS[6:]}
            if pd["strand"] == "-":
                # TransTermHP reports parts in genome orientation; flip and
                # reverse-complement so the annotation reads 5'->3' on the
                # feature's own strand.
                notes2 = {
                    "[1] 5' stem": str(Seq(notes["3' stem"]).reverse_complement()),
                    "[2] loop": str(Seq(notes["loop"]).reverse_complement()),
                    "[3] 3' stem": str(Seq(notes["5' stem"]).reverse_complement()),
                    "[4] 3' tail": str(Seq(notes["5' tail"]).reverse_complement()),
                }
            else:
                notes2 = {
                    "[1] 5' stem": notes["5' stem"],
                    "[2] loop": notes["loop"],
                    "[3] 3' stem": notes["3' stem"],
                    "[4] 3' tail": notes["3' tail"],
                }
            qualifiers = {
                "score": [pd["confidence"]],
                "source": ["TranstermHP"],
                "ID": ["terminator_%s" % idx],
            }
            current_start = min(start, end) - 1  # GFF3 -> 0-based start
            current_end = max(start, end)
            if pd["strand"] == "+":
                # Extend the feature through the poly-T tract of the 3' tail.
                current_end += _tail_extension(notes["3' tail"], "T")
            else:
                # Mirror image on the minus strand: poly-A in the 5' tail,
                # scanned from the terminator outward (hence the reversal).
                current_start -= _tail_extension(notes["5' tail"][::-1], "A")
            qualifiers.update(notes2)
            record.features.append(
                gffSeqFeature(
                    FeatureLocation(current_start, current_end),
                    type="terminator",
                    strand=1 if pd["strand"] == "+" else -1,
                    qualifiers=qualifiers,
                )
            )
        yield record


def main(fasta, gff3, existing_expterm="", **kwargs):
    """Run the full pipeline and yield one SeqRecord per input sequence.

    Recognised kwargs: ``min_conf`` -- drop terminators whose TransTermHP
    confidence score is below this value.  (Previously the CLI parsed
    --min_conf but the value was silently ignored; absent -> no filtering,
    preserving the old call signature.)
    """
    min_conf = kwargs.get("min_conf")
    coords_file = generate_annotation_file(gff3)
    transterm_output = run_transterm(existing_expterm, fasta, coords_file)
    try:
        os.unlink(coords_file)
    except Exception:
        pass  # Not my job
    for record in parse_transterm(transterm_output):
        if min_conf is not None:
            record.features = [
                f
                for f in record.features
                if int(f.qualifiers["score"][0]) >= min_conf
            ]
        yield record


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        # NOTE(review): description previously read "Export corresponding
        # sequence in genome from GFF3" -- a copy/paste from another tool.
        description="Run TransTermHP and report terminators as GFF3",
        epilog="",
    )
    parser.add_argument("fasta", type=argparse.FileType("r"), help="Fasta Genome")
    parser.add_argument("gff3", type=argparse.FileType("r"), help="GFF3 File")
    parser.add_argument(
        "--min_conf",
        type=int,
        default=76,
        help="Only output terminators with confidence >= n",
    )
    # parser.add_argument('--gc', type=float, default=-2.3, help='Score of a G-C pair')
    # parser.add_argument('--au', type=float, default=-0.9, help='Score of an A-U pair')
    # parser.add_argument('--gu', type=float, default=1.3, help='Score of a G-U pair')
    # parser.add_argument('--mm', type=float, default=3.5, help='Score of any other pair')
    # parser.add_argument('--gap', type=int, default=6, help='Score of a gap in the hairpin')
    # parser.add_argument('--max_hp_score', type=float, default=-2, help='Maximum allowable hairpin score')
    # parser.add_argument('--max_tail_score', type=float, default=-2.5, help='Maximum allowable tail score')
    # parser.add_argument('--max_len', type=int, default=59, help='Total extent of hairpin <= n NT long')
    # parser.add_argument('--min_stem', type=int, default=4, help='Stem must be n nucleotides long')
    # parser.add_argument('--max_loop', type=int, default=13, help='The loop portion can be no longer than n')
    # parser.add_argument('--min_loop', type=int, default=3, help='Loop portion of the hairpin must be at least n long')
    # parser.add_argument('--uwin_require', type=int, default=3, help='Number of "U" nucleotides in the --uwin_length long region.')
    # parser.add_argument('--loop_penalty', default='1,2,3,4,5,6,7,8,9,10,11', help='The cost of loops of various lengths can be set using --loop_penalty=f1,f2,f3,f4,f5,...fn, where f1 is the cost of a loop of length --min_loop, f2 is the cost of a loop of length --min_loop+1, as so on. If there are too few terms to cover up to max_loop, the last term is repeated.',)
    args = parser.parse_args()
    for record in main(
        existing_expterm=os.path.join(SCRIPT_PATH, "expterm.dat"), **vars(args)
    ):
        gffWrite([record], sys.stdout)