Mercurial > repos > bgruening > eden_cluster_motif_finder
view eden_iterative_motif_finder.py @ 1:f8111c9f037d draft
Uploaded
author | bgruening |
---|---|
date | Thu, 12 Jun 2014 11:38:08 -0400 |
parents | 95a776023fbc |
children |
line wrap: on
line source
#!/usr/bin/env python import logging import argparse import sys import os import subprocess from Bio import SeqIO import random import tempfile import shlex import shutil import logging from eden_wrapper import EDeNWrapper from eden_utilities import log from eden_utilities import create_params SEQ_POS_FILE="pos.seq" SEQ_NEG_FILE="neg.seq" SEQ_FILE="dat.seq" TARGET_FILE="dat.target" SCORE_FILE="pos.score" def SolveMaximumSubarrayProblem(vec): begin_temp = begin = end = 0 start_val=vec[0] max_ending_here = max_so_far = start_val for pos,x in enumerate(vec[1:],1): if max_ending_here < 0: max_ending_here=x begin_temp=pos else: max_ending_here=max_ending_here+x if max_ending_here > max_so_far: max_so_far=max_ending_here begin=begin_temp end=pos return begin,end def ComputeIteratedMaximumSubarray(seq_list, score_list, min_motif_size): motif_list=[] assert len(seq_list) == len(score_list) for seq,score in zip(seq_list, score_list): while 1: #find (begin,end) of motif in each element begin,end=SolveMaximumSubarrayProblem(score) if end-begin < min_motif_size: break else: #extract subsequence motif=seq[begin:end+1] #print motif, begin, end, seq, score motif_item=[motif, begin, end, seq] motif_list.append(motif_item) #remove motif by zeoring importance values motif_size=end+1-begin assert motif_size == len(motif) score[begin:end+1]=[0.0]*motif_size #iterate return motif_list def ComputeAndOutputIteratedMaximumSubarray(seq_file, score_file, min_subarray_size, opath): """ Description and unit test """ #import the sequence file seq_list=[] with open(seq_file, 'r') as f: for line in f: seq_list.append(line) #import the score file score_list=[] with open(score_file, 'r') as f: for line in f: str_vec=line.split() int_vec = map(float, str_vec) score_list.append(int_vec) #extract for each sequence the motif motif_list=ComputeIteratedMaximumSubarray(seq_list, score_list, min_subarray_size) #output to file for motif, begin, end, seq in motif_list: opath.write('{}\t{}\t{}\t{}'.format( motif, begin, end, seq)) def IsRNA(seq): if 'U' in seq.upper(): return True else: return False def ShuffleSequence(seq, order =1): nucleotides = [str(seq)[nucl:nucl+order] for nucl in range(0,len(seq)-order+1,order)] if order != 1: start = len(seq)%order if start != 0 : nucleotides.append(str(seq)[-start:]) random.shuffle(nucleotides) shuffled_seq=''.join(nucleotides) return shuffled_seq def NormaliseSequenceToDNA(seq): dna=seq if IsRNA(seq): dna=seq.back_transcribe() normdna=dna.upper() return normdna def MakePositiveSequenceFile(fasta_file, output_dir): seq_pos_file_path = os.path.join(output_dir, SEQ_POS_FILE) seq_pos_handle=open(seq_pos_file_path, 'w+') pos_counter=0 with open(fasta_file, 'r') as f: for record in SeqIO.parse(f, 'fasta'): dna=NormaliseSequenceToDNA(record.seq) seq_pos_handle.write("%s\n" % dna) pos_counter+=1 seq_pos_handle.close() log.info( "Created %s positive sequences in file: %s" % (pos_counter, seq_pos_file_path) ) return seq_pos_file_path def MakeNegativeSequenceFile(sequence_file, output_dir, negative_repeat_factor, max_shuffle_order): seq_neg_file_path = os.path.join(output_dir, SEQ_NEG_FILE) seq_neg_handle=open(seq_neg_file_path, 'w+') neg_counter=0 with open(sequence_file, 'r') as file_handle: for seq in file_handle: seq = seq.strip() for rep in range(negative_repeat_factor): for order in range(1,max_shuffle_order): sdna=ShuffleSequence(seq,order) seq_neg_handle.write("%s\n" % sdna) neg_counter+=1 seq_neg_handle.close() log.info( "Created %s negative sequences in file: %s" % (neg_counter, seq_neg_file_path) ) return seq_neg_file_path def MakeDataFile(fasta_file, negative_repeat_factor, max_shuffle_order): output_dir=tempfile.mkdtemp() seq_pos_file_path=MakePositiveSequenceFile(fasta_file, output_dir) seq_neg_file_path=MakeNegativeSequenceFile(seq_pos_file_path, output_dir, negative_repeat_factor, max_shuffle_order) dat_file_path = os.path.join(output_dir, SEQ_FILE) target_file_path = os.path.join(output_dir, TARGET_FILE) merged_seq_file_path = open(dat_file_path, 'wb') shutil.copyfileobj(open(seq_pos_file_path,'rb'), merged_seq_file_path) shutil.copyfileobj(open(seq_neg_file_path,'rb'), merged_seq_file_path) merged_seq_file_path.close() with open(target_file_path, 'w+') as target_handle: for line in open(seq_pos_file_path): target_handle.write('1\n') for line in open(seq_neg_file_path): target_handle.write('-1\n') return {'output_dir': output_dir, 'seq_pos_file_path': seq_pos_file_path, 'seq_neg_file_path': seq_neg_file_path, 'dat_file_path':dat_file_path, 'target_file_path':target_file_path} def MakeScoreFile(output_dir): score_file_path = os.path.join(output_dir, SCORE_FILE) score_file = open( score_file_path, 'w+') counter_old = -1 values = list() for line in open(os.path.join( output_dir, 'prediction_part' )): counter, vertex_id, value = line.strip().split() counter = int(counter) if counter != counter_old : if values : score_file.write(' '.join(values) + '\n') values = list() values.append(value) counter_old = counter if values : score_file.write(' '.join(values) + '\n') score_file.close() log.info( "Score file in: %s" % score_file_path ) return score_file_path def main(args): #create data file, i.e. positive normalised sequences and negative shuffled sequences param_dict = MakeDataFile(args.fasta_file, args.neg_rep, args.max_order) param_dict.update({'radius':args.radius}) param_dict.update({'distance':args.distance}) #train a model eden=EDeNWrapper(param_dict) eden.Train() #test eden.param.update({'dat_file_path':param_dict['seq_pos_file_path']}) eden.TestPart() #extract scores score_file_path=MakeScoreFile(param_dict['output_dir']) ComputeAndOutputIteratedMaximumSubarray( param_dict['seq_pos_file_path'], score_file_path, args.min_motif_size, args.output_file_path ) #cleanup shutil.rmtree(param_dict['output_dir']) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Extract motifs patterns with EDeN.') parser = create_params(parser, 'eden_iterative_motif_finder') args = parser.parse_args() main(args)