Mercurial > repos > nick > duplex
diff consensus.py @ 18:e4d75f9efb90 draft
planemo upload commit b'4303231da9e48b2719b4429a29b72421d24310f4\n'-dirty
| author | nick |
|---|---|
| date | Thu, 02 Feb 2017 18:44:31 -0500 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/consensus.py Thu Feb 02 18:44:31 2017 -0500 @@ -0,0 +1,117 @@ +#!/usr/bin/env python +import os +import sys +import ctypes +import argparse + +script_dir = os.path.dirname(os.path.realpath(__file__)) +consensus = ctypes.cdll.LoadLibrary(os.path.join(script_dir, 'libconsensus.so')) +consensus.get_consensus.restype = ctypes.c_char_p +consensus.get_consensus_duplex.restype = ctypes.c_char_p +consensus.build_consensus_duplex_simple.restype = ctypes.c_char_p + +ARG_DEFAULTS = {'alignment':sys.stdin} +DESCRIPTION = "Get the consensus of a set of aligned sequences." + + +def make_argparser(): + parser = argparse.ArgumentParser(description=DESCRIPTION) + parser.set_defaults(**ARG_DEFAULTS) + parser.add_argument('alignment', type=argparse.FileType('r'), + help='The aligned sequences, in FASTA format (but no multi-line sequences).') + return parser + + +def main(argv): + parser = make_argparser() + args = parser.parse_args(argv[1:]) + sequences = [] + line_num = 0 + for line in args.alignment: + line_num += 1 + if line_num % 2 == 0: + sequences.append(line.rstrip('\r\n')) + cons = get_consensus(sequences) + print(cons) + + +# N.B.: The quality scores must be aligned with their accompanying sequences. +def get_consensus(align, quals=[], cons_thres=-1.0, qual_thres=' ', gapped=False): + cons_thres_c = ctypes.c_double(cons_thres) + qual_thres_c = ctypes.c_char(qual_thres) + n_seqs = len(align) + if gapped: + gapped_c = 1 + else: + gapped_c = 0 + assert not quals or len(quals) == n_seqs, 'Different number of sequences and quals.' + seq_len = None + for seq in (align + quals): + if seq_len is None: + seq_len = len(seq) + else: + assert seq_len == len(seq), ('All sequences in the alignment must be the same length: ' + '{}bp != {}bp.\nAlignment:\n{}'.format(seq_len, len(seq), + '\n'.join(align))) + align_c = (ctypes.c_char_p * n_seqs)() + for i, seq in enumerate(align): + align_c[i] = ctypes.c_char_p(seq) + quals_c = (ctypes.c_char_p * n_seqs)() + for i, qual in enumerate(quals): + quals_c[i] = ctypes.c_char_p(qual) + if not quals: + quals_c = 0 + return consensus.get_consensus(align_c, quals_c, n_seqs, seq_len, cons_thres_c, qual_thres_c, + gapped_c) + + +# N.B.: The quality scores must be aligned with their accompanying sequences. +def get_consensus_duplex(align1, align2, quals1=[], quals2=[], cons_thres=-1.0, qual_thres=' ', + method='iupac'): + assert method in ('iupac', 'freq') + cons_thres_c = ctypes.c_double(cons_thres) + qual_thres_c = ctypes.c_char(qual_thres) + n_seqs1 = len(align1) + n_seqs2 = len(align2) + assert (not quals1 and not quals2) or (quals1 and quals2) + assert not quals1 or len(quals1) == n_seqs1 + assert not quals2 or len(quals2) == n_seqs2 + seq_len = None + for seq in (align1 + align2 + quals1 + quals2): + if seq_len is None: + seq_len = len(seq) + else: + assert seq_len == len(seq), 'All sequences in the alignment must be the same length.' + align1_c = (ctypes.c_char_p * n_seqs1)() + for i, seq in enumerate(align1): + align1_c[i] = ctypes.c_char_p(seq) + align2_c = (ctypes.c_char_p * n_seqs1)() + for i, seq in enumerate(align2): + align2_c[i] = ctypes.c_char_p(seq) + quals1_c = (ctypes.c_char_p * n_seqs1)() + for i, seq in enumerate(quals1): + quals1_c[i] = ctypes.c_char_p(seq) + quals2_c = (ctypes.c_char_p * n_seqs1)() + for i, seq in enumerate(quals2): + quals2_c[i] = ctypes.c_char_p(seq) + if not quals1: + quals1_c = 0 + if not quals2: + quals2_c = 0 + return consensus.get_consensus_duplex(align1_c, align2_c, quals1_c, quals2_c, n_seqs1, n_seqs2, + seq_len, cons_thres_c, qual_thres_c, method) + + +def build_consensus_duplex_simple(cons1, cons2, gapped=False): + assert len(cons1) == len(cons2) + cons1_c = ctypes.c_char_p(cons1) + cons2_c = ctypes.c_char_p(cons2) + if gapped: + gapped_c = 1 + else: + gapped_c = 0 + return consensus.build_consensus_duplex_simple(cons1_c, cons2_c, gapped_c) + + +if __name__ == '__main__': + sys.exit(main(sys.argv))
