Mercurial > repos > nick > duplex
view dunovo.py @ 18:e4d75f9efb90 draft
planemo upload commit b'4303231da9e48b2719b4429a29b72421d24310f4\n'-dirty
author | nick |
---|---|
date | Thu, 02 Feb 2017 18:44:31 -0500 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python from __future__ import division import os import sys import time import logging import tempfile import argparse import subprocess import collections import consensus import swalign SANGER_START = 33 SOLEXA_START = 64 OPT_DEFAULTS = {'min_reads':3, 'processes':1, 'qual':20, 'qual_format':'sanger'} USAGE = "%(prog)s [options]" DESCRIPTION = """Build consensus sequences from read aligned families. Prints duplex consensus sequences in FASTA to stdout. The sequence ids are BARCODE.MATE, e.g. "CTCAGATAACATACCTTATATGCA.1", where "BARCODE" is the input barcode, and "MATE" is "1" or "2" as an arbitrary designation of the two reads in the pair. The id is followed by the count of the number of reads in the two families (one from each strand) that make up the duplex, in the format READS1/READS2. If the duplex is actually a single-strand consensus because the matching strand is missing, only one number is listed. Rules for consensus building: Single-strand consensus sequences are made by counting how many of each base are at a given position. Bases with a PHRED quality score below the --qual threshold are not counted. If a majority of the reads (that pass the --qual threshold at that position) have one base at that position, then that base is used as the consensus base. If no base has a majority, then an N is used. Duplex consensus sequences are made by aligning pairs of single-strand consensuses, and comparing bases at each position. If they agree, that base is used in the consensus. Otherwise, the IUPAC ambiguity code for both bases is used (N + anything and gap + non-gap result in an N).""" def main(argv): parser = argparse.ArgumentParser(description=DESCRIPTION) parser.set_defaults(**OPT_DEFAULTS) parser.add_argument('infile', metavar='read-families.tsv', nargs='?', help='The output of align_families.py. 6 columns: 1. (canonical) barcode. 2. order ("ab" or ' '"ba"). 3. mate ("1" or "2"). 4. read name. 5. aligned sequence. 6. aligned quality ' 'scores.') parser.add_argument('-r', '--min-reads', type=int, help='The minimum number of reads (from each strand) required to form a single-strand ' 'consensus. Strands with fewer reads will be skipped. Default: %(default)s.') parser.add_argument('-q', '--qual', type=int, help='Base quality threshold. Bases below this quality will not be counted. ' 'Default: %(default)s.') parser.add_argument('-F', '--qual-format', choices=('sanger', 'solexa'), help='FASTQ quality score format. Sanger scores are assumed to begin at \'{}\' ({}). Default: ' '%(default)s.'.format(SANGER_START, chr(SANGER_START))) parser.add_argument('--incl-sscs', action='store_true', help='When outputting duplex consensus sequences, include reads without a full duplex (missing ' 'one strand). The result will just be the single-strand consensus of the remaining read.') parser.add_argument('-s', '--sscs-file', help='Save single-strand consensus sequences in this file (FASTA format). Currently does not ' 'work when in parallel mode.') parser.add_argument('-f', '--family-log', type=argparse.FileType('w'), help='Save a record of the reads in each family to this file.') parser.add_argument('-l', '--log', metavar='LOG_FILE', dest='stats_file', help='Print statistics on the run to this file. Use "-" to print to stderr.') parser.add_argument('-p', '--processes', type=int, help='Number of processes to use. If > 1, launches this many worker subprocesses. Note: if ' 'this option is used, no output will be generated until the end of the entire run, so no ' 'streaming is possible. Default: %(default)s.') args = parser.parse_args(argv[1:]) assert args.processes > 0, '-p must be greater than zero' # Make dict of process_family() parameters that don't change between families. static = {} static['processes'] = args.processes static['incl_sscs'] = args.incl_sscs static['min_reads'] = args.min_reads if args.sscs_file: static['sscs_fh'] = open(args.sscs_file, 'w') if args.qual_format == 'sanger': static['qual_thres'] = chr(args.qual + SANGER_START) elif args.qual_format == 'solexa': static['qual_thres'] = chr(args.qual + SOLEXA_START) else: fail('Error: unrecognized --qual-format.') if args.infile: infile = open(args.infile) else: infile = sys.stdin if args.stats_file: if args.stats_file == '-': logging.basicConfig(stream=sys.stderr, level=logging.INFO, format='%(message)s') else: logging.basicConfig(filename=args.stats_file, filemode='w', level=logging.INFO, format='%(message)s') else: logging.disable(logging.CRITICAL) # Open all the worker processes, if we're using more than one. workers = None if args.processes > 1: workers = open_workers(args.processes, args) stats = {'time':0, 'reads':0, 'runs':0, 'families':0} all_reads = 0 duplex = collections.OrderedDict() family = [] barcode = None order = None mate = None for line in infile: fields = line.rstrip('\r\n').split('\t') if len(fields) != 6: continue (this_barcode, this_order, this_mate, name, seq, qual) = fields this_mate = int(this_mate) # If the barcode or order has changed, we're in a new single-stranded family. # Process the reads we've previously gathered as one family and start a new family. if this_barcode != barcode or this_order != order or this_mate != mate: duplex[(order, mate)] = family # We're at the end of the duplex pair if the barcode changes or if the order changes without # the mate changing, or vice versa (the second read in each duplex comes when the barcode # stays the same while both the order and mate switch). Process the duplex and start # a new one. If the barcode is the same, we're in the same duplex, but we've switched strands. if this_barcode != barcode or not (this_order != order and this_mate != mate): # sys.stderr.write('New duplex: {}, {}, {}\n'.format(this_barcode, this_order, this_mate)) log_family(args.family_log, duplex, barcode, args.min_reads) process_duplex(duplex, barcode, workers=workers, stats=stats, **static) duplex = collections.OrderedDict() # else: # sys.stderr.write('Same duplex: {}, {}, {}\n'.format(this_barcode, this_order, this_mate)) barcode = this_barcode order = this_order mate = this_mate family = [] read = {'name': name, 'seq':seq, 'qual':qual} family.append(read) all_reads += 1 # Process the last family. duplex[(order, mate)] = family log_family(args.family_log, duplex, barcode, args.min_reads) process_duplex(duplex, barcode, workers=workers, stats=stats, **static) if args.processes > 1: close_workers(workers) compile_results(workers) delete_tempfiles(workers) if args.sscs_file: static['sscs_fh'].close() if infile is not sys.stdin: infile.close() if not args.stats_file: return # Final stats on the run. logging.info('Processed {} reads and {} duplexes.' .format(all_reads, stats['runs'])) per_read = stats['time'] / stats['reads'] per_run = stats['time'] / stats['runs'] logging.info('{:0.3f}s per read, {:0.3f}s per run.'.format(per_read, per_run)) def open_workers(num_workers, args): """Open the required number of worker processes.""" script_path = os.path.realpath(sys.argv[0]) workers = [] for i in range(num_workers): if args.slurm: command = ['srun', '-C', 'new', 'python', script_path] else: command = ['python', script_path] arguments = gather_args(sys.argv, args.infile) command.extend(arguments) stats_subfile = None if args.stats_file: if args.stats_file == '-': stats_subfile = '-' else: stats_subfile = "{}.{}.log".format(args.stats_file, i) command.extend(['-s', stats_subfile]) outfile = tempfile.NamedTemporaryFile('w', delete=False, prefix='sscs.out.part.') process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=outfile) worker = {'proc':process, 'outfile':outfile, 'stats':stats_subfile} workers.append(worker) return workers def gather_args(args, infile, excluded_flags={'-S', '--slurm'}, excluded_args={'-p', '--processes', '-l', '--log', '-s', '--sscs-file'}): """Take the full list of command-line arguments and return only the ones which should be passed to worker processes. Excludes the 0th argument (the command name), the input filename ("infile"), all arguments in "excluded_flags", and all arguments in "excluded_args" plus the arguments which follow.""" out_args = [] skip = True for arg in args: if skip: skip = False continue if arg in excluded_flags: continue if arg in excluded_args: skip = True continue if arg == infile: continue out_args.append(arg) return out_args def delegate(worker, duplex, barcode): """Send a family to a worker process.""" for (order, mate), family in duplex.items(): for read in family: line = '{}\t{}\t{}\t{name}\t{seq}\t{qual}\n'.format(barcode, order, mate, **read) if family: worker['proc'].stdin.write(line) def close_workers(workers): for worker in workers: worker['outfile'].close() worker['proc'].stdin.close() def compile_results(workers): for worker in workers: worker['proc'].wait() with open(worker['outfile'].name, 'r') as outfile: for line in outfile: sys.stdout.write(line) def delete_tempfiles(workers): for worker in workers: os.remove(worker['outfile'].name) if worker['stats']: os.remove(worker['stats']) def log_family(family_log, duplex, barcode, min_reads): """Write a record of the reads in this family.""" if not family_log: return for (order, mate), family in duplex.items(): if len(family) < min_reads: continue def process_duplex(duplex, barcode, workers=None, stats=None, incl_sscs=False, sscs_fh=None, processes=1, min_reads=1, qual_thres=' '): stats['families'] += 1 # Are we the controller process or a worker? if processes > 1: i = stats['families'] % len(workers) worker = workers[i] delegate(worker, duplex, barcode) return # We're a worker. Actually process the family. start = time.time() consensi = [] reads_per_strand = [] duplex_mate = None for (order, mate), family in duplex.items(): reads = len(family) if reads < min_reads: continue # The mate number for the duplex consensus. It's arbitrary, but all that matters is that the # two mates have different numbers. This system ensures that: # Mate 1 is from the consensus of ab/1 and ba/2 families, while mate 2 is from ba/1 and ab/2. if (order == 'ab' and mate == 1) or (order == 'ba' and mate == 2): duplex_mate = 1 else: duplex_mate = 2 seqs = [read['seq'] for read in family] quals = [read['qual'] for read in family] consensi.append(consensus.get_consensus(seqs, quals, qual_thres=qual_thres)) reads_per_strand.append(reads) assert len(consensi) <= 2 if sscs_fh: for cons, (order, mate), reads in zip(consensi, duplex.keys(), reads_per_strand): sscs_fh.write('>{bar}.{order}.{mate} {reads}\n'.format(bar=barcode, order=order, mate=mate, reads=reads)) sscs_fh.write(cons+'\n') if len(consensi) == 1 and incl_sscs: print_duplex(consensi[0], barcode, duplex_mate, reads_per_strand) elif len(consensi) == 2: align = swalign.smith_waterman(*consensi) #TODO: log error & return if len(align.target) != len(align.query) cons = consensus.build_consensus_duplex_simple(align.target, align.query) print_duplex(cons, barcode, duplex_mate, reads_per_strand) elapsed = time.time() - start logging.info('{} sec for {} reads.'.format(elapsed, sum(reads_per_strand))) if stats and len(consensi) > 0: stats['time'] += elapsed stats['reads'] += sum(reads_per_strand) stats['runs'] += 1 def print_duplex(cons, barcode, mate, reads_per_strand, outfile=sys.stdout): header = '>{bar}.{mate} {reads}'.format(bar=barcode, mate=mate, reads='-'.join(map(str, reads_per_strand))) outfile.write(header+'\n') outfile.write(cons+'\n') def read_fasta(fasta, is_file=True): """Quick and dirty FASTA parser. Return the sequences and their names. Returns a list of sequences. Each is a dict of 'name' and 'seq'. Warning: Reads the entire contents of the file into memory at once.""" sequences = [] seq_lines = [] seq_name = None if is_file: with open(fasta) as fasta_file: fasta_lines = fasta_file.readlines() else: fasta_lines = fasta.splitlines() for line in fasta_lines: if line.startswith('>'): if seq_lines: sequences.append({'name':seq_name, 'seq':''.join(seq_lines)}) seq_lines = [] seq_name = line.rstrip('\r\n')[1:] continue seq_lines.append(line.strip()) if seq_lines: sequences.append({'name':seq_name, 'seq':''.join(seq_lines)}) return sequences def fail(message): sys.stderr.write(message+"\n") sys.exit(1) if __name__ == '__main__': sys.exit(main(sys.argv))