view utils/stats.py @ 18:e4d75f9efb90 draft

planemo upload commit b'4303231da9e48b2719b4429a29b72421d24310f4\n'-dirty
author nick
date Thu, 02 Feb 2017 18:44:31 -0500
parents af383638de66
children
line wrap: on
line source

#!/usr/bin/env python
from __future__ import division
import os
import sys
import math
import argparse
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
import seqtools
import swalign

INF = float('inf')
STATS = ('diffs', 'diffs-binned', 'seqlen', 'strand')
OPT_DEFAULTS = {'bins':10, 'probes':'', 'thres':0.75}
USAGE = "%(prog)s [options]"
DESCRIPTION = """"""


def main(argv):

  parser = argparse.ArgumentParser(description=DESCRIPTION)
  parser.set_defaults(**OPT_DEFAULTS)

  parser.add_argument('stats',
    help='The type of statistics to compute and print. Give a comma-separated list of stat names, '
         'choosing from "{}".'.format('", "'.join(STATS)))
  parser.add_argument('infile', metavar='read-families.msa.tsv', nargs='?',
    help='The --msa output of sscs.py. Will read from stdin if not provided.')
  parser.add_argument('-b', '--bins', type=int,
    help='The number of bins to segment reads into when doing "diffs-binned".')
  parser.add_argument('-p', '--probes',
    help='Sequence excerpts from the sense strand. Required for "strand" statistic. '
         'Comma-separated.')
  parser.add_argument('-t', '--thres', type=int,
    help='Alignment identity threshold (in fraction, not decimal). Default: %(default)s')

  args = parser.parse_args(argv[1:])

  stats = args.stats.split(',')
  for stat in stats:
    if stat not in STATS:
      fail('Error: invalid statistic "{}". Must choose one of "{}".'.format(stat, '", "'.join(STATS)))
  if 'strand' in stats and not args.probes:
    fail('Error: must provide a probe if requesting "strand" statistic.')

  if args.infile:
    infile = open(args.infile)
  else:
    infile = sys.stdin

  family = []
  consensus = None
  barcode = None
  for line in infile:
    fields = line.rstrip('\r\n').split('\t')
    if len(fields) != 3:
      continue
    (this_barcode, name, seq) = fields
    if fields[1] == 'CONSENSUS':
      if family and consensus:
        process_family(stats, barcode, consensus, family, args)
      barcode = this_barcode
      consensus = seq
      family = []
    else:
      family.append(seq)
  if family and consensus:
    process_family(stats, barcode, consensus, family, args)

  if infile is not sys.stdin:
    infile.close()


#TODO: Maybe print the number of N's in the consensus?
def process_family(stats, barcode, consensus, family, args):
  # Compute stats requiring the whole family at once.
  for stat in stats:
    if stat == 'diffs':
      diffs = seqtools.get_diffs_frac_simple(consensus, family)
    elif stat == 'diffs-binned':
      diffs_binned = seqtools.get_diffs_frac_binned(consensus, family, args.bins)
    elif stat == 'strand':
      probes = args.probes.split(',')
      strand = get_strand(consensus, probes, args.thres)
  # Print the requested stats for each read.
  # Columns: barcode, [stat columns], read sequence.
  for (i, read) in enumerate(family):
    sys.stdout.write(barcode+'\t')
    for stat in stats:
      if stat == 'diffs':
        sys.stdout.write('{}\t'.format(round_sig_figs(diffs[i], 3)))
      elif stat == 'diffs-binned':
        if diffs_binned is None:
          sys.stdout.write('\t' * args.bins)
        else:
          for diff in diffs_binned[i]:
            sys.stdout.write(str(round_sig_figs(diff, 3))+'\t')
      elif stat == 'seqlen':
        sys.stdout.write('{}\t'.format(len(read)))
      elif stat == 'strand':
        sys.stdout.write('{}\t'.format(strand))
    print read.upper()


def get_strand(seq, probes, thres):
  """Determine which strand the sequence comes from by trying to align probes from the sense strand.
  Returns 'sense', 'anti', or None.
  Algorithm: This tries each probe in both directions.
  If at least one of the alignments has an identity above the threshold, a vote is cast for the
  direction with a higher identity.
  If the votes that were cast are unanimous for one direction, that strand is returned.
  Else, return None."""
  votes = []
  for probe in probes:
    alignment = swalign.smith_waterman(seq, probe)
    sense_id = alignment.matches/len(probe)
    alignment = swalign.smith_waterman(seq, seqtools.get_revcomp(probe))
    anti_id  = alignment.matches/len(probe)
    # print '{}: sense: {}, anti: {}'.format(probe, sense_id, anti_id)
    if sense_id > thres or anti_id > thres:
      if sense_id > anti_id:
        votes.append('sense')
      else:
        votes.append('anti')
  strand = None
  for vote in votes:
    if strand:
      if strand != vote:
        return None
    else:
      strand = vote
  return strand


def round_sig_figs(n, figs):
  if n == 0:
    return n
  elif n < 0:
    n = -n
    sign = -1
  elif n > 0:
    sign = 1
  elif math.isnan(n) or n == INF:
    return n
  magnitude = int(math.floor(math.log10(n)))
  return sign * round(n, figs - 1 - magnitude)


def fail(message):
  sys.stderr.write(message+"\n")
  sys.exit(1)


if __name__ == '__main__':
  sys.exit(main(sys.argv))