view fasta_merge_files_and_filter_unique_sequences.py @ 1:8593cac31de8 draft

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/fasta_merge_files_and_filter_unique_sequences commit 240d1baaa04767c7d6ad6e36c854c2b54093e92f
author galaxyp
date Wed, 01 Feb 2017 13:24:03 -0500
parents 7ba52c5163f6
children 7892a1fd1648
line wrap: on
line source

#!/usr/bin/env python
import os
import sys

class Sequence:
    ''' Holds protein sequence information '''
    def __init__(self):
        self.header = ""
        self.sequence = ""

class FASTAReader:
    """
        FASTA db iterator. Returns a single FASTA sequence object.
    """
    def __init__(self, fasta_name):
        self.fasta_file = open(fasta_name)

    def __iter__(self):
        return self

    def __next__(self):
        ''' Iteration '''
        while True:
            line = self.fasta_file.readline()
            if not line:
                raise StopIteration
            if line[0] == '>':
                break

        seq = Sequence()
        seq.header = line.rstrip().replace('\n','').replace('\r','')

        while True:
            tail = self.fasta_file.tell()
            line = self.fasta_file.readline()
            if not line:
                break
            if line[0] == '>':
                self.fasta_file.seek(tail)
                break
            seq.sequence = seq.sequence + line.rstrip().replace('\n','').replace('\r','')
        return seq

    # Python 2/3 compat
    next = __next__


def main():
    seen_sequences = dict([])
    seen_headers = set([])

    out_file = open(sys.argv[1], 'w')
    if sys.argv[2] == "sequence":
        unique_sequences = True
    elif sys.argv[2] == "accession":
        unique_sequences = False
    else:
        sys.exit("2nd argument must be 'sequence' or 'accession'")

    for fasta_file in sys.argv[3:]:
        print("Reading entries from '%s'" % fasta_file)
        fa_reader = FASTAReader(fasta_file)
        for protein in fa_reader:
            if unique_sequences:
                if protein.header in seen_headers:
                    print("Skipping protein '%s' with duplicate header" % protein.header)
                    continue
                elif protein.sequence in seen_sequences:
                    print("Skipping protein '%s' with duplicate sequence (first seen as '%s')" % (protein.header, seen_sequences[protein.sequence]))
                    continue
                else:
                    seen_sequences[protein.sequence] = protein.header
                    seen_headers.add(protein.header)
            else:
                if protein.header in seen_headers:
                    print("Skipping protein '%s' with duplicate header" % protein.header)
                    continue
                else:
                    seen_headers.add(protein.header)
            out_file.write(protein.header)
            out_file.write(os.linesep)
            out_file.write(protein.sequence)
            out_file.write(os.linesep)
    out_file.close()

if __name__ == "__main__":
    main()