comparison: fastq_groomer_parallel.py @ changeset 4:28a0718eb9e7 ("Uploaded")
| field | value |
|---|---|
| author | kevyin |
| date | Thu, 13 Sep 2012 04:33:58 -0400 |
| parents | |
| children | 4983170729cc |
This comparison is between changesets 3:abdba8109a85 and 4:28a0718eb9e7. The file has no earlier revision of its own, so the comparison shows its entire contents as inserted:
```python
# Kenneth Sabir
# Garvan Institute

# Copyright (c) 2012, Kenneth Sabir
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
# Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
# Neither the name of the Garvan Institute nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import sys
import time
import os
import math
import subprocess
import string
import shutil
import pickle
from multiprocessing import Process
from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter

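# Parallel front end to the Galaxy FASTQ groomer. The input file is split
# into roughly equal chunks with the Unix "split" utility, each chunk is
# groomed in its own process, and the groomed chunks are concatenated with
# "cat" into the final output file.
#
# Positional arguments (in the order the Galaxy tool wrapper supplies them):
#   sys.argv[1]  input FASTQ filename
#   sys.argv[2]  input format (e.g. "sanger", "solexa")
#   sys.argv[3]  output filename
#   sys.argv[4]  output format
#   sys.argv[5]  force_quality_encoding, or the literal string "None"
#   sys.argv[6]  "summarize_input" to collect per-chunk input statistics
#   sys.argv[7]  optional number of worker processes (default 1)
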
def main():
    split_program = "split"
    cat_program = "cat"
    input_filename = sys.argv[1]
    output_filename = sys.argv[3]
    number_of_processes = 1
    if len(sys.argv) > 7:
        number_of_processes = int(sys.argv[7])
    file_prefix = "temp_groomer_part_"

    t1 = time.time()
    old_path = os.getcwd()

    lines_per_process, number_of_lines = calculate_lines_per_process(input_filename, number_of_processes)
    temp_dir_name = move_to_temp_dir()
    sequences = number_of_lines // 4
    # Split the input into chunks inside the temporary working directory.
    args = [split_program, "-l" + str(lines_per_process), input_filename, file_prefix]
    subprocess.call(args)
    file_count = 0
    processes = []
    output_filenames = []
    while True:
        # Reconstruct the chunk names generated by "split". Its default
        # suffixes are two lowercase letters (aa, ab, ...), so up to 26x26
        # chunks are supported; the two characters are computed directly
        # rather than in a loop.
        lastchar = string.ascii_lowercase[file_count % 26]
        firstchar = string.ascii_lowercase[(file_count // 26) % 26]
        temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar)

        if os.path.exists(temp_input_filename):
            temp_output_filename = temp_input_filename + "_output"
            output_filenames.append(temp_output_filename)
            # Groom each chunk in its own process.
            p = Process(target=partition, args=(temp_input_filename, temp_output_filename, file_count))
            p.start()
            processes.append(p)
            file_count = file_count + 1
        else:
            break
    for p in processes:
        p.join()
    # Concatenate the groomed chunks, in order, into the final output file.
    cat_params = [cat_program]
    cat_params.extend(output_filenames)
    with open(output_filename, 'w') as cat_output_file:
        subprocess.call(cat_params, stdout=cat_output_file)
    summarize_input = sys.argv[6] == 'summarize_input'
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    print "Groomed %i %s reads into %s reads." % (sequences, input_type, output_type)

    aggregators = []
    if summarize_input:
        for temp_output_filename in output_filenames:
            with open(temp_output_filename + "_summary", 'r') as summary_log_file:
                aggregators.append(pickle.load(summary_log_file))
        print_aggregators(aggregators)
    os.chdir(old_path)
    shutil.rmtree(temp_dir_name)
    t2 = time.time()
    print 'Groomer took: %0.3f ms using %d processes' % ((t2 - t1) * 1000.0, number_of_processes)

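# Count the input lines with "wc -l" and compute the chunk size for "split".
# The chunk size is rounded up to a multiple of 4 so that no FASTQ record
# (4 lines) is ever split across two chunks: e.g. 1,000,000 lines over 3
# processes gives ceil((1000000 / 3.0) / 4.0) * 4 = 333336 lines per chunk.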
def calculate_lines_per_process(input_filename, number_of_processes):
    wc_program = "wc"
    p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
    out, err = p.communicate()
    number_of_lines = int(out.lstrip().split(' ', 1)[0])
    exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
    lines_per_process = int(math.ceil(exact_lines_per_process / 4.0)) * 4
    return lines_per_process, number_of_lines

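# Create a uniquely named temporary working directory (named after the
# current timestamp), chdir into it, and return its name so that main()
# can delete it once the groomed output has been assembled.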
def move_to_temp_dir():
    dir_name = None
    while True:
        dir_name = "temp_groomer_part_" + str(time.time())
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
            break
    os.chdir(dir_name)
    return dir_name

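# Merge the per-process fastqAggregator summaries and print the combined
# statistics: the formats the input is valid for, and the ASCII and decimal
# quality-score ranges observed across all chunks.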
def print_aggregators(aggregators):
    total_ascii_range = [None, None]
    total_decimal_range = [None, None]
    total_valid_formats = set()
    for aggregator in aggregators:
        total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
        ascii_range = aggregator.get_ascii_range()
        decimal_range = aggregator.get_decimal_range()

        if total_ascii_range[0] is None:
            total_ascii_range[0] = ascii_range[0]
        else:
            total_ascii_range[0] = min(total_ascii_range[0], ascii_range[0])

        # In Python 2, max(None, x) returns x, so no None check is needed
        # for the upper bounds.
        total_ascii_range[1] = max(total_ascii_range[1], ascii_range[1])
        if total_decimal_range[0] is None:
            total_decimal_range[0] = decimal_range[0]
        else:
            total_decimal_range[0] = min(total_decimal_range[0], decimal_range[0])
        total_decimal_range[1] = max(total_decimal_range[1], decimal_range[1])
    print "total_valid_formats = " + str(total_valid_formats)
    print "Based upon quality and sequence, the input data is valid for: %s" % (", ".join(total_valid_formats) or "None")
    # Print with repr, since \x00 (null) causes information truncation in
    # Galaxy when printed literally.
    print "Input ASCII range: %s(%i) - %s(%i)" % (repr(total_ascii_range[0]), ord(total_ascii_range[0]), repr(total_ascii_range[1]), ord(total_ascii_range[1]))
    print "Input decimal range: %i - %i" % (total_decimal_range[0], total_decimal_range[1])

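# Worker entry point, run in a child process via multiprocessing.Process.
# It re-reads the conversion settings from sys.argv (inherited from the
# parent), grooms one chunk of the input, and, when summarizing, pickles
# its fastqAggregator for the parent to merge.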
def partition(input_filename, temp_output_filename, file_count):
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    force_quality_encoding = sys.argv[5]
    summarize_input = sys.argv[6] == 'summarize_input'
    if force_quality_encoding == 'None':
        force_quality_encoding = None
    aggregator = fastqAggregator()
    temp_process_file = fastqWriter(open(temp_output_filename, 'wb'), format=output_type, force_quality_encoding=force_quality_encoding)
    read_count = None
    if summarize_input:
        reader = fastqVerboseErrorReader
    else:
        reader = fastqReader
    for read_count, fastq_read in enumerate(reader(open(input_filename, 'rb'), format=input_type, apply_galaxy_conventions=True)):
        if summarize_input:
            aggregator.consume_read(fastq_read)
        temp_process_file.write(fastq_read)
    temp_process_file.close()
    if read_count is not None:
        if input_type != output_type and 'solexa' in [input_type, output_type]:
            print "Converted between Solexa and PHRED scores."
        if summarize_input:
            # Pickle this worker's aggregator so that main() can load and
            # merge it after all processes have finished.
            with open(temp_output_filename + "_summary", 'w') as summary_log_file:
                pickle.dump(aggregator, summary_log_file)
    else:
        print "No valid FASTQ reads were provided."

if __name__ == "__main__":
    main()
```
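
As a usage sketch (the paths and process count here are hypothetical, but the argument layout matches the `sys.argv` parsing above): `python fastq_groomer_parallel.py /data/reads.fastq sanger /data/groomed.fastq sanger None summarize_input 4` grooms `/data/reads.fastq` with four worker processes and prints a summary of the quality-score ranges seen in the input. The input path should be absolute, because the script changes into a temporary working directory before invoking `split`.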
