annotate fastq_groomer_parallel.py @ 4:28a0718eb9e7

Uploaded
author kevyin
date Thu, 13 Sep 2012 04:33:58 -0400
parents
children 4983170729cc
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
4
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
1 # Kenneth Sabir
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
2 # Garvan Institute
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
3
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
4 # Copyright (c) 2012, Kenneth Sabir
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
5 # All rights reserved.
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
6 #
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
7 # Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
8 #
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
9 # Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
10 # Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
11 # Neither the name of the Garvan Institute nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
12 #
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
13 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
14
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
15 import sys
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
16 import time
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
17 import os
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
18 import math
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
19 import subprocess
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
20 import string
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
21 import shutil
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
22 import pickle
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
23 import io
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
24 from multiprocessing import Process
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
25 from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
26
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
27
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
28
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
29 def main():
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
30 split_program = "split"
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
31 cat_program = "cat"
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
32 input_filename = sys.argv[1]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
33 output_filename = sys.argv[3]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
34 number_of_processes = 1;
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
35 if (len(sys.argv) > 7):
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
36 number_of_processes = int(sys.argv[7])
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
37 file_prefix = "temp_groomer_part_"
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
38
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
39 t1 = time.time()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
40 old_path = os.getcwd()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
41
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
42 lines_per_process,number_of_lines = calculate_lines_per_process(input_filename, number_of_processes)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
43 temp_dir_name = move_to_temp_dir()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
44 sequences = number_of_lines/4;
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
45 args = [split_program, "-l"+str(lines_per_process), input_filename, file_prefix]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
46 # print "The args are: " , args
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
47 subprocess.call(args)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
48 # print "Finished"
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
49 file_count = 0;
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
50 keep_checking = True
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
51 processes = []
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
52 output_filenames = []
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
53 while keep_checking:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
54
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
55 # only need to support 26x26 different processes, so do it brute force (ie not in a loop) for 2 chars.
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
56 lastchar = string.letters[file_count % len(string.letters)]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
57 firstchar = string.letters[(file_count / len(string.letters)) % len(string.letters)]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
58 temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
59
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
60 # print 'looking for ' + temp_input_filename
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
61 if os.path.exists(temp_input_filename):
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
62 # print 'found ' + temp_input_filename
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
63 temp_output_filename = temp_input_filename + "_output"
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
64 output_filenames.append(temp_output_filename)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
65 p = Process(target=partition, args=([temp_input_filename, temp_output_filename, file_count]))
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
66 p.start()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
67 processes.append(p)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
68 file_count = file_count + 1
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
69 else:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
70 break
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
71 for p in processes :
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
72 p.join()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
73 cat_params = [cat_program]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
74 cat_params.extend(output_filenames)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
75 with open(output_filename, 'w') as catOutputFile:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
76 subprocess.call(cat_params, stdout=catOutputFile)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
77 summarize_input = sys.argv[6] == 'summarize_input'
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
78 input_type = sys.argv[2]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
79 output_type = sys.argv[4]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
80 print "Groomed %i %s reads into %s reads." % ( sequences, input_type, output_type )
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
81
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
82 aggregators = []
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
83 if summarize_input:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
84 for temp_output_filename in output_filenames :
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
85 with open(temp_output_filename + "_summary", 'r') as summaryLogFile:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
86 temp_aggregator = pickle.load(summaryLogFile)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
87 aggregators.append(temp_aggregator)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
88
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
89 print_aggregators(aggregators)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
90 os.chdir(old_path)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
91 shutil.rmtree(temp_dir_name)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
92 time2 = time.time()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
93 print 'Groomer took: %0.3f ms using %d processes' % (((time2 - t1)*1000.0), number_of_processes)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
94
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
95 def calculate_lines_per_process(input_filename, number_of_processes):
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
96 wc_program = "wc"
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
97 p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
98 out, err = p.communicate()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
99 number_of_lines = int(string.split(string.lstrip(out), ' ', 1)[0])
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
100 exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
101 lines_per_process = int(math.ceil((exact_lines_per_process / 4.0))) * 4
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
102 return lines_per_process,number_of_lines
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
103
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
104 def move_to_temp_dir():
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
105 dirExists = False;
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
106 dir_name = None
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
107
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
108 while not dirExists:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
109 dir_name = "temp_groomer_part_" + str(time.time())
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
110 if not os.path.exists(dir_name):
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
111 os.makedirs(dir_name)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
112 break;
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
113 os.chdir(dir_name)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
114 return dir_name
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
115
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
116 def print_aggregators(aggregators):
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
117 total_ascii_range = [None, None]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
118 total_decimal_range = [None, None]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
119 total_valid_formats = set()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
120 for aggregator in aggregators:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
121 # print "This aggregators valid formats are: " + str(aggregator.get_valid_formats())
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
122 total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
123 ascii_range = aggregator.get_ascii_range()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
124 decimal_range = aggregator.get_decimal_range()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
125
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
126 if total_ascii_range[0] is None:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
127 total_ascii_range[0] = ascii_range[0]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
128 else:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
129 total_ascii_range[0] = min (total_ascii_range[0], ascii_range[0])
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
130
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
131 # max of None and a value is the value
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
132 total_ascii_range[1] = max (total_ascii_range[1], ascii_range[1])
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
133 if total_decimal_range[0] is None:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
134 total_decimal_range[0] = decimal_range[0]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
135 else:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
136 total_decimal_range[0] = min (total_decimal_range[0], decimal_range[0])
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
137 # max of None and a value is the value
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
138 total_decimal_range[1] = max (total_decimal_range[1], decimal_range[1])
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
139 print "total_valid_formats= " + str(total_valid_formats)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
140 print "Based upon quality and sequence, the input data is valid for: %s" % ( ", ".join( total_valid_formats ) or "None" )
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
141 print "Input ASCII range: %s(%i) - %s(%i)" % ( repr( total_ascii_range[0] ), ord( total_ascii_range[0] ), repr( total_ascii_range[1] ), ord( total_ascii_range[1] ) ) #print using repr, since \x00 (null) causes info truncation in galaxy when printed
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
142 print "Input decimal range: %i - %i" % ( total_decimal_range[0], total_decimal_range[1] )
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
143
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
144
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
145 def partition(input_filename, temp_output_filename, fileCount):
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
146 # print 'Starting Thread: ' + str(fileCount)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
147 # input_filename = sys.argv[1]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
148 input_type = sys.argv[2]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
149 output_type = sys.argv[4]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
150 force_quality_encoding = sys.argv[5]
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
151 summarize_input = sys.argv[6] == 'summarize_input'
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
152 if force_quality_encoding == 'None':
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
153 force_quality_encoding = None
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
154 aggregator = fastqAggregator()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
155 temp_process_file = fastqWriter( open( temp_output_filename, 'wb'), format = output_type, force_quality_encoding = force_quality_encoding )
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
156 read_count = None
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
157 if summarize_input:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
158 reader = fastqVerboseErrorReader
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
159 else:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
160 reader = fastqReader
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
161 for read_count, fastq_read in enumerate( reader( open(input_filename, 'rb'), format = input_type, apply_galaxy_conventions = True ) ):
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
162 if summarize_input:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
163 aggregator.consume_read( fastq_read )
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
164 temp_process_file.write( fastq_read )
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
165 # print "Just wrote (%d): " % read_count + str(fastq_read)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
166 temp_process_file.close()
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
167 if read_count is not None:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
168 if input_type != output_type and 'solexa' in [ input_type, output_type ]:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
169 print "Converted between Solexa and PHRED scores."
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
170 if summarize_input:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
171 with open(temp_output_filename + "_summary", 'w') as summaryLogFile :
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
172 pickle.dump(aggregator, summaryLogFile)
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
173 else:
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
174 print "No valid FASTQ reads were provided."
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
175
28a0718eb9e7 Uploaded
kevyin
parents:
diff changeset
176 if __name__ == "__main__": main()