Mercurial > repos > nick > duplex
comparison consensus.py @ 18:e4d75f9efb90 draft
planemo upload commit b'4303231da9e48b2719b4429a29b72421d24310f4\n'-dirty
| author | nick |
|---|---|
| date | Thu, 02 Feb 2017 18:44:31 -0500 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 17:836fa4fe9494 | 18:e4d75f9efb90 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import os | |
| 3 import sys | |
| 4 import ctypes | |
| 5 import argparse | |
| 6 | |
| 7 script_dir = os.path.dirname(os.path.realpath(__file__)) | |
| 8 consensus = ctypes.cdll.LoadLibrary(os.path.join(script_dir, 'libconsensus.so')) | |
| 9 consensus.get_consensus.restype = ctypes.c_char_p | |
| 10 consensus.get_consensus_duplex.restype = ctypes.c_char_p | |
| 11 consensus.build_consensus_duplex_simple.restype = ctypes.c_char_p | |
| 12 | |
| 13 ARG_DEFAULTS = {'alignment':sys.stdin} | |
| 14 DESCRIPTION = "Get the consensus of a set of aligned sequences." | |
| 15 | |
| 16 | |
| 17 def make_argparser(): | |
| 18 parser = argparse.ArgumentParser(description=DESCRIPTION) | |
| 19 parser.set_defaults(**ARG_DEFAULTS) | |
| 20 parser.add_argument('alignment', type=argparse.FileType('r'), | |
| 21 help='The aligned sequences, in FASTA format (but no multi-line sequences).') | |
| 22 return parser | |
| 23 | |
| 24 | |
| 25 def main(argv): | |
| 26 parser = make_argparser() | |
| 27 args = parser.parse_args(argv[1:]) | |
| 28 sequences = [] | |
| 29 line_num = 0 | |
| 30 for line in args.alignment: | |
| 31 line_num += 1 | |
| 32 if line_num % 2 == 0: | |
| 33 sequences.append(line.rstrip('\r\n')) | |
| 34 cons = get_consensus(sequences) | |
| 35 print(cons) | |
| 36 | |
| 37 | |
| 38 # N.B.: The quality scores must be aligned with their accompanying sequences. | |
| 39 def get_consensus(align, quals=[], cons_thres=-1.0, qual_thres=' ', gapped=False): | |
| 40 cons_thres_c = ctypes.c_double(cons_thres) | |
| 41 qual_thres_c = ctypes.c_char(qual_thres) | |
| 42 n_seqs = len(align) | |
| 43 if gapped: | |
| 44 gapped_c = 1 | |
| 45 else: | |
| 46 gapped_c = 0 | |
| 47 assert not quals or len(quals) == n_seqs, 'Different number of sequences and quals.' | |
| 48 seq_len = None | |
| 49 for seq in (align + quals): | |
| 50 if seq_len is None: | |
| 51 seq_len = len(seq) | |
| 52 else: | |
| 53 assert seq_len == len(seq), ('All sequences in the alignment must be the same length: ' | |
| 54 '{}bp != {}bp.\nAlignment:\n{}'.format(seq_len, len(seq), | |
| 55 '\n'.join(align))) | |
| 56 align_c = (ctypes.c_char_p * n_seqs)() | |
| 57 for i, seq in enumerate(align): | |
| 58 align_c[i] = ctypes.c_char_p(seq) | |
| 59 quals_c = (ctypes.c_char_p * n_seqs)() | |
| 60 for i, qual in enumerate(quals): | |
| 61 quals_c[i] = ctypes.c_char_p(qual) | |
| 62 if not quals: | |
| 63 quals_c = 0 | |
| 64 return consensus.get_consensus(align_c, quals_c, n_seqs, seq_len, cons_thres_c, qual_thres_c, | |
| 65 gapped_c) | |
| 66 | |
| 67 | |
| 68 # N.B.: The quality scores must be aligned with their accompanying sequences. | |
| 69 def get_consensus_duplex(align1, align2, quals1=[], quals2=[], cons_thres=-1.0, qual_thres=' ', | |
| 70 method='iupac'): | |
| 71 assert method in ('iupac', 'freq') | |
| 72 cons_thres_c = ctypes.c_double(cons_thres) | |
| 73 qual_thres_c = ctypes.c_char(qual_thres) | |
| 74 n_seqs1 = len(align1) | |
| 75 n_seqs2 = len(align2) | |
| 76 assert (not quals1 and not quals2) or (quals1 and quals2) | |
| 77 assert not quals1 or len(quals1) == n_seqs1 | |
| 78 assert not quals2 or len(quals2) == n_seqs2 | |
| 79 seq_len = None | |
| 80 for seq in (align1 + align2 + quals1 + quals2): | |
| 81 if seq_len is None: | |
| 82 seq_len = len(seq) | |
| 83 else: | |
| 84 assert seq_len == len(seq), 'All sequences in the alignment must be the same length.' | |
| 85 align1_c = (ctypes.c_char_p * n_seqs1)() | |
| 86 for i, seq in enumerate(align1): | |
| 87 align1_c[i] = ctypes.c_char_p(seq) | |
| 88 align2_c = (ctypes.c_char_p * n_seqs1)() | |
| 89 for i, seq in enumerate(align2): | |
| 90 align2_c[i] = ctypes.c_char_p(seq) | |
| 91 quals1_c = (ctypes.c_char_p * n_seqs1)() | |
| 92 for i, seq in enumerate(quals1): | |
| 93 quals1_c[i] = ctypes.c_char_p(seq) | |
| 94 quals2_c = (ctypes.c_char_p * n_seqs1)() | |
| 95 for i, seq in enumerate(quals2): | |
| 96 quals2_c[i] = ctypes.c_char_p(seq) | |
| 97 if not quals1: | |
| 98 quals1_c = 0 | |
| 99 if not quals2: | |
| 100 quals2_c = 0 | |
| 101 return consensus.get_consensus_duplex(align1_c, align2_c, quals1_c, quals2_c, n_seqs1, n_seqs2, | |
| 102 seq_len, cons_thres_c, qual_thres_c, method) | |
| 103 | |
| 104 | |
| 105 def build_consensus_duplex_simple(cons1, cons2, gapped=False): | |
| 106 assert len(cons1) == len(cons2) | |
| 107 cons1_c = ctypes.c_char_p(cons1) | |
| 108 cons2_c = ctypes.c_char_p(cons2) | |
| 109 if gapped: | |
| 110 gapped_c = 1 | |
| 111 else: | |
| 112 gapped_c = 0 | |
| 113 return consensus.build_consensus_duplex_simple(cons1_c, cons2_c, gapped_c) | |
| 114 | |
| 115 | |
| 116 if __name__ == '__main__': | |
| 117 sys.exit(main(sys.argv)) |
