Mercurial > repos > nick > duplex
diff utils/stats.py @ 18:e4d75f9efb90 draft
planemo upload commit b'4303231da9e48b2719b4429a29b72421d24310f4\n'-dirty
author | nick |
---|---|
date | Thu, 02 Feb 2017 18:44:31 -0500 |
parents | af383638de66 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/utils/stats.py Thu Feb 02 18:44:31 2017 -0500 @@ -0,0 +1,154 @@ +#!/usr/bin/env python +from __future__ import division +import os +import sys +import math +import argparse +sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) +import seqtools +import swalign + +INF = float('inf') +STATS = ('diffs', 'diffs-binned', 'seqlen', 'strand') +OPT_DEFAULTS = {'bins':10, 'probes':'', 'thres':0.75} +USAGE = "%(prog)s [options]" +DESCRIPTION = """""" + + +def main(argv): + + parser = argparse.ArgumentParser(description=DESCRIPTION) + parser.set_defaults(**OPT_DEFAULTS) + + parser.add_argument('stats', + help='The type of statistics to compute and print. Give a comma-separated list of stat names, ' + 'choosing from "{}".'.format('", "'.join(STATS))) + parser.add_argument('infile', metavar='read-families.msa.tsv', nargs='?', + help='The --msa output of sscs.py. Will read from stdin if not provided.') + parser.add_argument('-b', '--bins', type=int, + help='The number of bins to segment reads into when doing "diffs-binned".') + parser.add_argument('-p', '--probes', + help='Sequence excerpts from the sense strand. Required for "strand" statistic. ' + 'Comma-separated.') + parser.add_argument('-t', '--thres', type=int, + help='Alignment identity threshold (in fraction, not decimal). Default: %(default)s') + + args = parser.parse_args(argv[1:]) + + stats = args.stats.split(',') + for stat in stats: + if stat not in STATS: + fail('Error: invalid statistic "{}". Must choose one of "{}".'.format(stat, '", "'.join(STATS))) + if 'strand' in stats and not args.probes: + fail('Error: must provide a probe if requesting "strand" statistic.') + + if args.infile: + infile = open(args.infile) + else: + infile = sys.stdin + + family = [] + consensus = None + barcode = None + for line in infile: + fields = line.rstrip('\r\n').split('\t') + if len(fields) != 3: + continue + (this_barcode, name, seq) = fields + if fields[1] == 'CONSENSUS': + if family and consensus: + process_family(stats, barcode, consensus, family, args) + barcode = this_barcode + consensus = seq + family = [] + else: + family.append(seq) + if family and consensus: + process_family(stats, barcode, consensus, family, args) + + if infile is not sys.stdin: + infile.close() + + +#TODO: Maybe print the number of N's in the consensus? +def process_family(stats, barcode, consensus, family, args): + # Compute stats requiring the whole family at once. + for stat in stats: + if stat == 'diffs': + diffs = seqtools.get_diffs_frac_simple(consensus, family) + elif stat == 'diffs-binned': + diffs_binned = seqtools.get_diffs_frac_binned(consensus, family, args.bins) + elif stat == 'strand': + probes = args.probes.split(',') + strand = get_strand(consensus, probes, args.thres) + # Print the requested stats for each read. + # Columns: barcode, [stat columns], read sequence. + for (i, read) in enumerate(family): + sys.stdout.write(barcode+'\t') + for stat in stats: + if stat == 'diffs': + sys.stdout.write('{}\t'.format(round_sig_figs(diffs[i], 3))) + elif stat == 'diffs-binned': + if diffs_binned is None: + sys.stdout.write('\t' * args.bins) + else: + for diff in diffs_binned[i]: + sys.stdout.write(str(round_sig_figs(diff, 3))+'\t') + elif stat == 'seqlen': + sys.stdout.write('{}\t'.format(len(read))) + elif stat == 'strand': + sys.stdout.write('{}\t'.format(strand)) + print read.upper() + + +def get_strand(seq, probes, thres): + """Determine which strand the sequence comes from by trying to align probes from the sense strand. + Returns 'sense', 'anti', or None. + Algorithm: This tries each probe in both directions. + If at least one of the alignments has an identity above the threshold, a vote is cast for the + direction with a higher identity. + If the votes that were cast are unanimous for one direction, that strand is returned. + Else, return None.""" + votes = [] + for probe in probes: + alignment = swalign.smith_waterman(seq, probe) + sense_id = alignment.matches/len(probe) + alignment = swalign.smith_waterman(seq, seqtools.get_revcomp(probe)) + anti_id = alignment.matches/len(probe) + # print '{}: sense: {}, anti: {}'.format(probe, sense_id, anti_id) + if sense_id > thres or anti_id > thres: + if sense_id > anti_id: + votes.append('sense') + else: + votes.append('anti') + strand = None + for vote in votes: + if strand: + if strand != vote: + return None + else: + strand = vote + return strand + + +def round_sig_figs(n, figs): + if n == 0: + return n + elif n < 0: + n = -n + sign = -1 + elif n > 0: + sign = 1 + elif math.isnan(n) or n == INF: + return n + magnitude = int(math.floor(math.log10(n))) + return sign * round(n, figs - 1 - magnitude) + + +def fail(message): + sys.stderr.write(message+"\n") + sys.exit(1) + + +if __name__ == '__main__': + sys.exit(main(sys.argv))