diff dunovo.py @ 18:e4d75f9efb90 draft

planemo upload commit 4303231da9e48b2719b4429a29b72421d24310f4-dirty
author nick
date Thu, 02 Feb 2017 18:44:31 -0500
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dunovo.py	Thu Feb 02 18:44:31 2017 -0500
@@ -0,0 +1,340 @@
+#!/usr/bin/env python
+from __future__ import division
+import os
+import sys
+import time
+import logging
+import tempfile
+import argparse
+import subprocess
+import collections
+import consensus
+import swalign
+
+SANGER_START = 33
+SOLEXA_START = 64
+OPT_DEFAULTS = {'min_reads':3, 'processes':1, 'qual':20, 'qual_format':'sanger'}
+USAGE = "%(prog)s [options]"
+DESCRIPTION = """Build consensus sequences from read aligned families. Prints duplex consensus
+sequences in FASTA to stdout. The sequence ids are BARCODE.MATE, e.g. "CTCAGATAACATACCTTATATGCA.1",
+where "BARCODE" is the input barcode, and "MATE" is "1" or "2" as an arbitrary designation of the
+two reads in the pair. The id is followed by the count of the number of reads in the two families
+(one from each strand) that make up the duplex, in the format READS1/READS2. If the duplex is
+actually a single-strand consensus because the matching strand is missing, only one number is
+listed.
+Rules for consensus building: Single-strand consensus sequences are made by counting how many of
+each base are at a given position. Bases with a PHRED quality score below the --qual threshold are
+not counted. If a majority of the reads (that pass the --qual threshold at that position) have one
+base at that position, then that base is used as the consensus base. If no base has a majority, then
+an N is used. Duplex consensus sequences are made by aligning pairs of single-strand consensuses,
+and comparing bases at each position. If they agree, that base is used in the consensus. Otherwise,
+the IUPAC ambiguity code for both bases is used (N + anything and gap + non-gap result in an N)."""
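+# Illustrative sketch of the rules above (not used by the code): with --qual 20, a position
+# covered by bases A, A, A, C (all at or above the threshold) gives a single-strand consensus
+# of A (3/4 majority), while A, A, C, C gives N (no majority). For the duplex step,
+# A vs. A -> A, A vs. G -> R (IUPAC code), and A vs. N or A vs. a gap -> N.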
+
+
+def main(argv):
+
+  parser = argparse.ArgumentParser(description=DESCRIPTION)
+  parser.set_defaults(**OPT_DEFAULTS)
+
+  parser.add_argument('infile', metavar='read-families.tsv', nargs='?',
+    help='The output of align_families.py. 6 columns: 1. (canonical) barcode. 2. order ("ab" or '
+         '"ba"). 3. mate ("1" or "2"). 4. read name. 5. aligned sequence. 6. aligned quality '
+         'scores.')
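+  # A hypothetical example row (tab-separated), for illustration only:
+  #   CTCAGATAACATACCTTATATGCA  ab  1  read42  GATTACAGATTACA  IIIIIIIIIIIIII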
+  parser.add_argument('-r', '--min-reads', type=int,
+    help='The minimum number of reads (from each strand) required to form a single-strand '
+         'consensus. Strands with fewer reads will be skipped. Default: %(default)s.')
+  parser.add_argument('-q', '--qual', type=int,
+    help='Base quality threshold. Bases below this quality will not be counted. '
+         'Default: %(default)s.')
+  parser.add_argument('-F', '--qual-format', choices=('sanger', 'solexa'),
+    help='FASTQ quality score format. Sanger scores are assumed to begin at \'{}\' ({}). Default: '
+         '%(default)s.'.format(chr(SANGER_START), SANGER_START))
+  parser.add_argument('--incl-sscs', action='store_true',
+    help='When outputting duplex consensus sequences, include reads without a full duplex (missing '
+         'one strand). The result will just be the single-strand consensus of the remaining strand.')
+  parser.add_argument('-s', '--sscs-file',
+    help='Save single-strand consensus sequences in this file (FASTA format). Currently does not '
+         'work when in parallel mode.')
+  parser.add_argument('-f', '--family-log', type=argparse.FileType('w'),
+    help='Save a record of the reads in each family to this file.')
+  parser.add_argument('-l', '--log', metavar='LOG_FILE', dest='stats_file',
+    help='Print statistics on the run to this file. Use "-" to print to stderr.')
+  parser.add_argument('-p', '--processes', type=int,
+    help='Number of processes to use. If > 1, launches this many worker subprocesses. Note: if '
+         'this option is used, no output will be generated until the end of the entire run, so no '
+         'streaming is possible. Default: %(default)s.')
+  parser.add_argument('-S', '--slurm', action='store_true',
+    help='Distribute worker processes with slurm\'s srun (only relevant when --processes > 1).')
+
+  args = parser.parse_args(argv[1:])
+
+  assert args.processes > 0, '-p must be greater than zero'
+  # Make dict of process_duplex() parameters that don't change between families.
+  static = {}
+  static['processes'] = args.processes
+  static['incl_sscs'] = args.incl_sscs
+  static['min_reads'] = args.min_reads
+  if args.sscs_file:
+    static['sscs_fh'] = open(args.sscs_file, 'w')
+  if args.qual_format == 'sanger':
+    static['qual_thres'] = chr(args.qual + SANGER_START)
+  elif args.qual_format == 'solexa':
+    static['qual_thres'] = chr(args.qual + SOLEXA_START)
+  else:
+    fail('Error: unrecognized --qual-format.')
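+  # For example (sketch): with the defaults (--qual 20, sanger), qual_thres is chr(20 + 33) == '5',
+  # and with solexa it would be chr(20 + 64) == 'T'; bases whose quality characters compare below
+  # this character are ignored when building consensuses.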
+
+  if args.infile:
+    infile = open(args.infile)
+  else:
+    infile = sys.stdin
+
+  if args.stats_file:
+    if args.stats_file == '-':
+      logging.basicConfig(stream=sys.stderr, level=logging.INFO, format='%(message)s')
+    else:
+      logging.basicConfig(filename=args.stats_file, filemode='w', level=logging.INFO,
+                          format='%(message)s')
+  else:
+    logging.disable(logging.CRITICAL)
+
+  # Open all the worker processes, if we're using more than one.
+  workers = None
+  if args.processes > 1:
+    workers = open_workers(args.processes, args)
+
+  stats = {'time':0, 'reads':0, 'runs':0, 'families':0}
+  all_reads = 0
+  duplex = collections.OrderedDict()
+  family = []
+  barcode = None
+  order = None
+  mate = None
+  for line in infile:
+    fields = line.rstrip('\r\n').split('\t')
+    if len(fields) != 6:
+      continue
+    (this_barcode, this_order, this_mate, name, seq, qual) = fields
+    this_mate = int(this_mate)
+    # If the barcode, order, or mate has changed, we're in a new single-stranded family.
+    # Process the reads we've previously gathered as one family and start a new family.
+    if this_barcode != barcode or this_order != order or this_mate != mate:
+      duplex[(order, mate)] = family
+      # We're at the end of the duplex pair if the barcode changes or if the order changes without
+      # the mate changing, or vice versa (the second read in each duplex comes when the barcode
+      # stays the same while both the order and mate switch). Process the duplex and start
+      # a new one. If the barcode is the same, we're in the same duplex, but we've switched strands.
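+      # For example, if the barcode is unchanged and the previous family was (ab, 1) while the
+      # current one is (ba, 2), both order and mate flipped, so the two single-strand families
+      # belong to the same duplex; (ab, 1) followed by (ab, 2), where only the mate changes,
+      # starts a new duplex instead.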
+      if this_barcode != barcode or not (this_order != order and this_mate != mate):
+        # sys.stderr.write('New duplex:  {}, {}, {}\n'.format(this_barcode, this_order, this_mate))
+        log_family(args.family_log, duplex, barcode, args.min_reads)
+        process_duplex(duplex, barcode, workers=workers, stats=stats, **static)
+        duplex = collections.OrderedDict()
+      # else:
+      #   sys.stderr.write('Same duplex: {}, {}, {}\n'.format(this_barcode, this_order, this_mate))
+      barcode = this_barcode
+      order = this_order
+      mate = this_mate
+      family = []
+    read = {'name': name, 'seq':seq, 'qual':qual}
+    family.append(read)
+    all_reads += 1
+  # Process the last family.
+  duplex[(order, mate)] = family
+  log_family(args.family_log, duplex, barcode, args.min_reads)
+  process_duplex(duplex, barcode, workers=workers, stats=stats, **static)
+
+  if args.processes > 1:
+    close_workers(workers)
+    compile_results(workers)
+    delete_tempfiles(workers)
+
+  if args.sscs_file:
+    static['sscs_fh'].close()
+  if infile is not sys.stdin:
+    infile.close()
+
+  if not args.stats_file:
+    return
+
+  # Final stats on the run.
+  logging.info('Processed {} reads and {} duplexes.'
+               .format(all_reads, stats['runs']))
+  if stats['reads'] > 0 and stats['runs'] > 0:
+    per_read = stats['time'] / stats['reads']
+    per_run = stats['time'] / stats['runs']
+    logging.info('{:0.3f}s per read, {:0.3f}s per run.'.format(per_read, per_run))
+
+
+def open_workers(num_workers, args):
+  """Open the required number of worker processes."""
+  script_path = os.path.realpath(sys.argv[0])
+  workers = []
+  for i in range(num_workers):
+    if args.slurm:
+      command = ['srun', '-C', 'new', 'python', script_path]
+    else:
+      command = ['python', script_path]
+    arguments = gather_args(sys.argv, args.infile)
+    command.extend(arguments)
+    stats_subfile = None
+    if args.stats_file:
+      if args.stats_file == '-':
+        stats_subfile = '-'
+      else:
+        stats_subfile = "{}.{}.log".format(args.stats_file, i)
+      command.extend(['-l', stats_subfile])
+    outfile = tempfile.NamedTemporaryFile('w', delete=False, prefix='sscs.out.part.')
+    process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=outfile)
+    worker = {'proc':process, 'outfile':outfile, 'stats':stats_subfile}
+    workers.append(worker)
+  return workers
+
+
+def gather_args(args, infile, excluded_flags={'-S', '--slurm'},
+                excluded_args={'-p', '--processes', '-l', '--log', '-s', '--sscs-file'}):
+  """Take the full list of command-line arguments and return only the ones which
+  should be passed to worker processes.
+  Excludes the 0th argument (the command name), the input filename ("infile"), all
+  arguments in "excluded_flags", and all arguments in "excluded_args" plus the
+  arguments which follow."""
+  out_args = []
+  skip = True
+  for arg in args:
+    if skip:
+      skip = False
+      continue
+    if arg in excluded_flags:
+      continue
+    if arg in excluded_args:
+      skip = True
+      continue
+    if arg == infile:
+      continue
+    out_args.append(arg)
+  return out_args
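+# For example (hypothetical argument values): gather_args(['dunovo.py', '-p', '4', '-r', '3',
+# 'families.tsv'], 'families.tsv') returns ['-r', '3'], dropping the command name, the excluded
+# '-p 4' pair, and the input filename.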
+
+
+def delegate(worker, duplex, barcode):
+  """Send a family to a worker process."""
+  for (order, mate), family in duplex.items():
+    for read in family:
+      line = '{}\t{}\t{}\t{name}\t{seq}\t{qual}\n'.format(barcode, order, mate, **read)
+      worker['proc'].stdin.write(line)
+
+
+def close_workers(workers):
+  for worker in workers:
+    worker['outfile'].close()
+    worker['proc'].stdin.close()
+
+
+def compile_results(workers):
+  for worker in workers:
+    worker['proc'].wait()
+    with open(worker['outfile'].name, 'r') as outfile:
+      for line in outfile:
+        sys.stdout.write(line)
+
+
+def delete_tempfiles(workers):
+  for worker in workers:
+    os.remove(worker['outfile'].name)
+    if worker['stats']:
+      os.remove(worker['stats'])
+
+
+def log_family(family_log, duplex, barcode, min_reads):
+  """Write a record of the reads in this family."""
+  if not family_log:
+    return
+  for (order, mate), family in duplex.items():
+    if len(family) < min_reads:
+      continue
+    # Assumption: record each read in the same tab-separated layout as the input
+    # (the original body was left empty, so the exact record format is a guess).
+    for read in family:
+      family_log.write('{}\t{}\t{}\t{name}\t{seq}\t{qual}\n'.format(barcode, order, mate, **read))
+
+
+def process_duplex(duplex, barcode, workers=None, stats=None, incl_sscs=False, sscs_fh=None,
+                   processes=1, min_reads=1, qual_thres=' '):
+  stats['families'] += 1
+  # Are we the controller process or a worker?
+  if processes > 1:
+    i = stats['families'] % len(workers)
+    worker = workers[i]
+    delegate(worker, duplex, barcode)
+    return
+  # We're a worker. Actually process the family.
+  start = time.time()
+  consensi = []
+  reads_per_strand = []
+  strand_keys = []  # (order, mate) for each kept strand, kept in step with consensi.
+  duplex_mate = None
+  for (order, mate), family in duplex.items():
+    reads = len(family)
+    if reads < min_reads:
+      continue
+    # The mate number for the duplex consensus. It's arbitrary, but all that matters is that the
+    # two mates have different numbers. This system ensures that:
+    # Mate 1 is from the consensus of ab/1 and ba/2 families, while mate 2 is from ba/1 and ab/2.
+    if (order == 'ab' and mate == 1) or (order == 'ba' and mate == 2):
+      duplex_mate = 1
+    else:
+      duplex_mate = 2
+    seqs = [read['seq'] for read in family]
+    quals = [read['qual'] for read in family]
+    consensi.append(consensus.get_consensus(seqs, quals, qual_thres=qual_thres))
+    reads_per_strand.append(reads)
+    strand_keys.append((order, mate))
+  assert len(consensi) <= 2
+  if sscs_fh:
+    for cons, (order, mate), reads in zip(consensi, strand_keys, reads_per_strand):
+      sscs_fh.write('>{bar}.{order}.{mate} {reads}\n'.format(bar=barcode, order=order, mate=mate,
+                                                             reads=reads))
+      sscs_fh.write(cons+'\n')
+  if len(consensi) == 1 and incl_sscs:
+    print_duplex(consensi[0], barcode, duplex_mate, reads_per_strand)
+  elif len(consensi) == 2:
+    align = swalign.smith_waterman(*consensi)
+    #TODO: log error & return if len(align.target) != len(align.query)
+    cons = consensus.build_consensus_duplex_simple(align.target, align.query)
+    print_duplex(cons, barcode, duplex_mate, reads_per_strand)
+  elapsed = time.time() - start
+  logging.info('{} sec for {} reads.'.format(elapsed, sum(reads_per_strand)))
+  if stats and len(consensi) > 0:
+    stats['time'] += elapsed
+    stats['reads'] += sum(reads_per_strand)
+    stats['runs'] += 1
+
+
+def print_duplex(cons, barcode, mate, reads_per_strand, outfile=sys.stdout):
+  header = '>{bar}.{mate} {reads}'.format(bar=barcode, mate=mate,
+                                          reads='-'.join(map(str, reads_per_strand)))
+  outfile.write(header+'\n')
+  outfile.write(cons+'\n')
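+# For example (hypothetical values): print_duplex('ACGTT', 'CTCAGATAACATACCTTATATGCA', 1, [3, 4])
+# writes the header ">CTCAGATAACATACCTTATATGCA.1 3-4" followed by the sequence line.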
+
+
+def read_fasta(fasta, is_file=True):
+  """Quick and dirty FASTA parser. Return the sequences and their names.
+  Returns a list of sequences. Each is a dict of 'name' and 'seq'.
+  Warning: Reads the entire contents of the file into memory at once."""
+  sequences = []
+  seq_lines = []
+  seq_name = None
+  if is_file:
+    with open(fasta) as fasta_file:
+      fasta_lines = fasta_file.readlines()
+  else:
+    fasta_lines = fasta.splitlines()
+  for line in fasta_lines:
+    if line.startswith('>'):
+      if seq_lines:
+        sequences.append({'name':seq_name, 'seq':''.join(seq_lines)})
+      seq_lines = []
+      seq_name = line.rstrip('\r\n')[1:]
+      continue
+    seq_lines.append(line.strip())
+  if seq_lines:
+    sequences.append({'name':seq_name, 'seq':''.join(seq_lines)})
+  return sequences
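+# For example (hypothetical input): read_fasta('>seq1\nACGT\nTTAA\n>seq2\nGG\n', is_file=False)
+# returns [{'name': 'seq1', 'seq': 'ACGTTTAA'}, {'name': 'seq2', 'seq': 'GG'}].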
+
+
+def fail(message):
+  sys.stderr.write(message+"\n")
+  sys.exit(1)
+
+if __name__ == '__main__':
+  sys.exit(main(sys.argv))