comparison fastq_groomer_parallel.py @ 4:28a0718eb9e7

Uploaded
author kevyin
date Thu, 13 Sep 2012 04:33:58 -0400
parents
children 4983170729cc
comparison
equal deleted inserted replaced
3:abdba8109a85 4:28a0718eb9e7
1 # Kenneth Sabir
2 # Garvan Institute
3
4 # Copyright (c) 2012, Kenneth Sabir
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
8 #
9 # Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
10 # Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
11 # Neither the name of the Garvan Institute nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
12 #
13 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14
15 import sys
16 import time
17 import os
18 import math
19 import subprocess
20 import string
21 import shutil
22 import pickle
23 import io
24 from multiprocessing import Process
25 from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter
26
27
28
29 def main():
30 split_program = "split"
31 cat_program = "cat"
32 input_filename = sys.argv[1]
33 output_filename = sys.argv[3]
34 number_of_processes = 1;
35 if (len(sys.argv) > 7):
36 number_of_processes = int(sys.argv[7])
37 file_prefix = "temp_groomer_part_"
38
39 t1 = time.time()
40 old_path = os.getcwd()
41
42 lines_per_process,number_of_lines = calculate_lines_per_process(input_filename, number_of_processes)
43 temp_dir_name = move_to_temp_dir()
44 sequences = number_of_lines/4;
45 args = [split_program, "-l"+str(lines_per_process), input_filename, file_prefix]
46 # print "The args are: " , args
47 subprocess.call(args)
48 # print "Finished"
49 file_count = 0;
50 keep_checking = True
51 processes = []
52 output_filenames = []
53 while keep_checking:
54
55 # only need to support 26x26 different processes, so do it brute force (ie not in a loop) for 2 chars.
56 lastchar = string.letters[file_count % len(string.letters)]
57 firstchar = string.letters[(file_count / len(string.letters)) % len(string.letters)]
58 temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar)
59
60 # print 'looking for ' + temp_input_filename
61 if os.path.exists(temp_input_filename):
62 # print 'found ' + temp_input_filename
63 temp_output_filename = temp_input_filename + "_output"
64 output_filenames.append(temp_output_filename)
65 p = Process(target=partition, args=([temp_input_filename, temp_output_filename, file_count]))
66 p.start()
67 processes.append(p)
68 file_count = file_count + 1
69 else:
70 break
71 for p in processes :
72 p.join()
73 cat_params = [cat_program]
74 cat_params.extend(output_filenames)
75 with open(output_filename, 'w') as catOutputFile:
76 subprocess.call(cat_params, stdout=catOutputFile)
77 summarize_input = sys.argv[6] == 'summarize_input'
78 input_type = sys.argv[2]
79 output_type = sys.argv[4]
80 print "Groomed %i %s reads into %s reads." % ( sequences, input_type, output_type )
81
82 aggregators = []
83 if summarize_input:
84 for temp_output_filename in output_filenames :
85 with open(temp_output_filename + "_summary", 'r') as summaryLogFile:
86 temp_aggregator = pickle.load(summaryLogFile)
87 aggregators.append(temp_aggregator)
88
89 print_aggregators(aggregators)
90 os.chdir(old_path)
91 shutil.rmtree(temp_dir_name)
92 time2 = time.time()
93 print 'Groomer took: %0.3f ms using %d processes' % (((time2 - t1)*1000.0), number_of_processes)
94
95 def calculate_lines_per_process(input_filename, number_of_processes):
96 wc_program = "wc"
97 p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
98 out, err = p.communicate()
99 number_of_lines = int(string.split(string.lstrip(out), ' ', 1)[0])
100 exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
101 lines_per_process = int(math.ceil((exact_lines_per_process / 4.0))) * 4
102 return lines_per_process,number_of_lines
103
104 def move_to_temp_dir():
105 dirExists = False;
106 dir_name = None
107
108 while not dirExists:
109 dir_name = "temp_groomer_part_" + str(time.time())
110 if not os.path.exists(dir_name):
111 os.makedirs(dir_name)
112 break;
113 os.chdir(dir_name)
114 return dir_name
115
116 def print_aggregators(aggregators):
117 total_ascii_range = [None, None]
118 total_decimal_range = [None, None]
119 total_valid_formats = set()
120 for aggregator in aggregators:
121 # print "This aggregators valid formats are: " + str(aggregator.get_valid_formats())
122 total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
123 ascii_range = aggregator.get_ascii_range()
124 decimal_range = aggregator.get_decimal_range()
125
126 if total_ascii_range[0] is None:
127 total_ascii_range[0] = ascii_range[0]
128 else:
129 total_ascii_range[0] = min (total_ascii_range[0], ascii_range[0])
130
131 # max of None and a value is the value
132 total_ascii_range[1] = max (total_ascii_range[1], ascii_range[1])
133 if total_decimal_range[0] is None:
134 total_decimal_range[0] = decimal_range[0]
135 else:
136 total_decimal_range[0] = min (total_decimal_range[0], decimal_range[0])
137 # max of None and a value is the value
138 total_decimal_range[1] = max (total_decimal_range[1], decimal_range[1])
139 print "total_valid_formats= " + str(total_valid_formats)
140 print "Based upon quality and sequence, the input data is valid for: %s" % ( ", ".join( total_valid_formats ) or "None" )
141 print "Input ASCII range: %s(%i) - %s(%i)" % ( repr( total_ascii_range[0] ), ord( total_ascii_range[0] ), repr( total_ascii_range[1] ), ord( total_ascii_range[1] ) ) #print using repr, since \x00 (null) causes info truncation in galaxy when printed
142 print "Input decimal range: %i - %i" % ( total_decimal_range[0], total_decimal_range[1] )
143
144
145 def partition(input_filename, temp_output_filename, fileCount):
146 # print 'Starting Thread: ' + str(fileCount)
147 # input_filename = sys.argv[1]
148 input_type = sys.argv[2]
149 output_type = sys.argv[4]
150 force_quality_encoding = sys.argv[5]
151 summarize_input = sys.argv[6] == 'summarize_input'
152 if force_quality_encoding == 'None':
153 force_quality_encoding = None
154 aggregator = fastqAggregator()
155 temp_process_file = fastqWriter( open( temp_output_filename, 'wb'), format = output_type, force_quality_encoding = force_quality_encoding )
156 read_count = None
157 if summarize_input:
158 reader = fastqVerboseErrorReader
159 else:
160 reader = fastqReader
161 for read_count, fastq_read in enumerate( reader( open(input_filename, 'rb'), format = input_type, apply_galaxy_conventions = True ) ):
162 if summarize_input:
163 aggregator.consume_read( fastq_read )
164 temp_process_file.write( fastq_read )
165 # print "Just wrote (%d): " % read_count + str(fastq_read)
166 temp_process_file.close()
167 if read_count is not None:
168 if input_type != output_type and 'solexa' in [ input_type, output_type ]:
169 print "Converted between Solexa and PHRED scores."
170 if summarize_input:
171 with open(temp_output_filename + "_summary", 'w') as summaryLogFile :
172 pickle.dump(aggregator, summaryLogFile)
173 else:
174 print "No valid FASTQ reads were provided."
175
176 if __name__ == "__main__": main()