view eden_iterative_motif_finder.py @ 0:95a776023fbc draft

Uploaded
author bgruening
date Thu, 12 Jun 2014 11:35:21 -0400
parents
children
line wrap: on
line source

#!/usr/bin/env python
import logging
import argparse
import sys
import os
import subprocess
from Bio import SeqIO
import random
import tempfile
import shlex
import shutil
import logging
from  eden_wrapper import EDeNWrapper
from eden_utilities import log
from eden_utilities import create_params


SEQ_POS_FILE="pos.seq"
SEQ_NEG_FILE="neg.seq"
SEQ_FILE="dat.seq"
TARGET_FILE="dat.target"
SCORE_FILE="pos.score"

def SolveMaximumSubarrayProblem(vec):
    begin_temp = begin = end = 0
    start_val=vec[0]
    max_ending_here = max_so_far = start_val
    for pos,x in enumerate(vec[1:],1):
        if max_ending_here < 0:
            max_ending_here=x
            begin_temp=pos
        else:
            max_ending_here=max_ending_here+x
        if max_ending_here > max_so_far:
            max_so_far=max_ending_here
            begin=begin_temp
            end=pos
    return begin,end

def ComputeIteratedMaximumSubarray(seq_list, score_list, min_motif_size):
    motif_list=[]
    assert len(seq_list) == len(score_list)
    for seq,score in zip(seq_list, score_list):
        while 1:
            #find (begin,end) of motif in each element
            begin,end=SolveMaximumSubarrayProblem(score)
            if end-begin < min_motif_size:
                break
            else:            
                #extract subsequence
                motif=seq[begin:end+1]
                #print motif, begin, end, seq, score
                motif_item=[motif, begin, end, seq]
                motif_list.append(motif_item)
                #remove motif by zeoring importance values
                motif_size=end+1-begin
                assert motif_size == len(motif)
                score[begin:end+1]=[0.0]*motif_size
                #iterate
    return motif_list

def ComputeAndOutputIteratedMaximumSubarray(seq_file, score_file, min_subarray_size, opath):
    """
    Description and unit test
    """
    #import the sequence file
    seq_list=[]
    with open(seq_file, 'r') as f:
        for line in f:
            seq_list.append(line) 
    #import the score file        
    score_list=[]
    with open(score_file, 'r') as f:
        for line in f:
            str_vec=line.split()
            int_vec = map(float, str_vec)
            score_list.append(int_vec) 

    #extract for each sequence the motif
    motif_list=ComputeIteratedMaximumSubarray(seq_list, score_list, min_subarray_size)

    #output to file
    for motif, begin, end, seq in motif_list:
        opath.write('{}\t{}\t{}\t{}'.format( motif, begin, end, seq))

def IsRNA(seq):
    if 'U' in seq.upper():
        return True
    else:
        return False

def ShuffleSequence(seq, order =1):
    nucleotides = [str(seq)[nucl:nucl+order] for nucl in range(0,len(seq)-order+1,order)]
    if order != 1:
        start = len(seq)%order
        if start != 0 :
            nucleotides.append(str(seq)[-start:])
    random.shuffle(nucleotides)
    shuffled_seq=''.join(nucleotides)
    return shuffled_seq

def NormaliseSequenceToDNA(seq):
    dna=seq
    if IsRNA(seq):
        dna=seq.back_transcribe()
    normdna=dna.upper()
    return normdna

def MakePositiveSequenceFile(fasta_file, output_dir):
    seq_pos_file_path = os.path.join(output_dir, SEQ_POS_FILE)
    seq_pos_handle=open(seq_pos_file_path, 'w+')
    pos_counter=0
    with open(fasta_file, 'r') as f:
        for record in SeqIO.parse(f, 'fasta'):
            dna=NormaliseSequenceToDNA(record.seq)
            seq_pos_handle.write("%s\n" % dna)
            pos_counter+=1
    seq_pos_handle.close()
    log.info( "Created %s positive sequences in file: %s" % (pos_counter, seq_pos_file_path) )
    return seq_pos_file_path

def MakeNegativeSequenceFile(sequence_file, output_dir, negative_repeat_factor, max_shuffle_order):
    seq_neg_file_path = os.path.join(output_dir, SEQ_NEG_FILE)
    seq_neg_handle=open(seq_neg_file_path, 'w+')
    neg_counter=0
    with open(sequence_file, 'r') as file_handle:
        for seq in file_handle:
            seq = seq.strip()
            for rep in range(negative_repeat_factor):
                for order in range(1,max_shuffle_order):
                    sdna=ShuffleSequence(seq,order)
                    seq_neg_handle.write("%s\n" % sdna)
                    neg_counter+=1
    seq_neg_handle.close()
    log.info( "Created %s negative sequences in file: %s" % (neg_counter, seq_neg_file_path) )
    return seq_neg_file_path

def MakeDataFile(fasta_file, negative_repeat_factor, max_shuffle_order):
    output_dir=tempfile.mkdtemp()
    
    seq_pos_file_path=MakePositiveSequenceFile(fasta_file, output_dir)
    seq_neg_file_path=MakeNegativeSequenceFile(seq_pos_file_path, output_dir, negative_repeat_factor, max_shuffle_order)
    
    dat_file_path = os.path.join(output_dir, SEQ_FILE)
    target_file_path = os.path.join(output_dir, TARGET_FILE)
    
    merged_seq_file_path = open(dat_file_path, 'wb')
    shutil.copyfileobj(open(seq_pos_file_path,'rb'), merged_seq_file_path)
    shutil.copyfileobj(open(seq_neg_file_path,'rb'), merged_seq_file_path)
    merged_seq_file_path.close()
    
    with open(target_file_path, 'w+') as target_handle:
        for line in open(seq_pos_file_path):
            target_handle.write('1\n')
        for line in open(seq_neg_file_path):
            target_handle.write('-1\n')
     
    return {'output_dir': output_dir, 
            'seq_pos_file_path': seq_pos_file_path,
            'seq_neg_file_path': seq_neg_file_path,
            'dat_file_path':dat_file_path, 
            'target_file_path':target_file_path}

def MakeScoreFile(output_dir):    
    score_file_path = os.path.join(output_dir, SCORE_FILE)
    score_file = open( score_file_path, 'w+')
    counter_old = -1
    values = list()
    for line in open(os.path.join( output_dir, 'prediction_part' )):
        counter, vertex_id, value = line.strip().split()
        counter = int(counter)
        if counter != counter_old :
            if values :
                score_file.write(' '.join(values) + '\n')
            values = list()
        values.append(value)
        counter_old = counter
    if values :
        score_file.write(' '.join(values) + '\n')
    score_file.close()
    log.info( "Score file in: %s" % score_file_path )
    return score_file_path
    
def main(args):
    #create data file, i.e. positive normalised sequences and negative shuffled sequences
    param_dict = MakeDataFile(args.fasta_file,
                       args.neg_rep,
                       args.max_order)
    param_dict.update({'radius':args.radius})
    param_dict.update({'distance':args.distance})
    #train a model     
    eden=EDeNWrapper(param_dict)
    eden.Train()
    #test
    eden.param.update({'dat_file_path':param_dict['seq_pos_file_path']})    
    eden.TestPart()
    
    #extract scores
    score_file_path=MakeScoreFile(param_dict['output_dir'])
    
    ComputeAndOutputIteratedMaximumSubarray( param_dict['seq_pos_file_path'], 
                                                     score_file_path, 
                                                     args.min_motif_size, 
                                                     args.output_file_path )
    #cleanup
    shutil.rmtree(param_dict['output_dir'])
    



if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Extract motifs patterns with EDeN.')
    parser = create_params(parser, 'eden_iterative_motif_finder')
    args = parser.parse_args()
    main(args)