comparison: fastq_groomer_parallel.py @ 18:954a6dea0561 (draft, default, tip)

| author | kevyin |
|---|---|
| date | Sat, 04 Feb 2017 00:48:48 -0500 |
| description | Uploaded |
| parents | cc357aad2022 |
| children | (none) |
comparison of 17:cc357aad2022 with 18:954a6dea0561; the file contents are shown below.
```python
# Kenneth Sabir
# Garvan Institute
import sys
import time
import os
import math
import subprocess
import string
import shutil
import pickle
from multiprocessing import Process
from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter

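# Overall flow: split the input FASTQ into per-process chunks with
# split(1), groom each chunk in a separate child process (partition), then
# concatenate the groomed chunks with cat(1) and, when requested, merge
# the per-chunk quality summaries (print_aggregators).
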
def main():
    # argv layout (inferred from the usage below and in partition()):
    #   1: input FASTQ, 2: input type, 3: output FASTQ, 4: output type,
    #   5: force_quality_encoding, 6: summarize_input flag, 7: process count
    split_program = "split"
    cat_program = "cat"
    input_filename = sys.argv[1]
    output_filename = sys.argv[3]
    number_of_processes = 1
    if len(sys.argv) > 7:
        number_of_processes = int(sys.argv[7])
    file_prefix = "temp_groomer_part_"

    t1 = time.time()
    old_path = os.getcwd()

    lines_per_process, number_of_lines = calculate_lines_per_process(input_filename, number_of_processes)
    temp_dir_name = move_to_temp_dir()
    sequences = number_of_lines // 4  # four lines per FASTQ record
    # Split the input into chunks named <prefix>aa, <prefix>ab, ...
    args = [split_program, "-l" + str(lines_per_process), input_filename, file_prefix]
    subprocess.call(args)
    file_count = 0
    processes = []
    output_filenames = []
    while True:
        # split(1) names its chunks with two-letter lowercase suffixes, so
        # 26x26 chunks can be enumerated directly (i.e. without a nested loop).
        lastchar = string.ascii_lowercase[file_count % len(string.ascii_lowercase)]
        firstchar = string.ascii_lowercase[(file_count // len(string.ascii_lowercase)) % len(string.ascii_lowercase)]
        temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar)

        if os.path.exists(temp_input_filename):
            temp_output_filename = temp_input_filename + "_output"
            output_filenames.append(temp_output_filename)
            p = Process(target=partition, args=(temp_input_filename, temp_output_filename, file_count))
            p.start()
            processes.append(p)
            file_count += 1
        else:
            break
    for p in processes:
        p.join()
    # Concatenate the per-process outputs into the final result.
    cat_params = [cat_program]
    cat_params.extend(output_filenames)
    with open(output_filename, 'w') as catOutputFile:
        subprocess.call(cat_params, stdout=catOutputFile)
    summarize_input = sys.argv[6] == 'summarize_input'
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    print "Groomed %i %s reads into %s reads." % (sequences, input_type, output_type)

    if summarize_input:
        aggregators = []
        for temp_output_filename in output_filenames:
            with open(temp_output_filename + "_summary", 'r') as summaryLogFile:
                temp_aggregator = pickle.load(summaryLogFile)
                aggregators.append(temp_aggregator)

        print_aggregators(aggregators)
    os.chdir(old_path)
    shutil.rmtree(temp_dir_name)
    t2 = time.time()
    print 'Groomer took: %0.3f ms using %d processes' % ((t2 - t1) * 1000.0, number_of_processes)

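# Chunk-name example: file_count 0 maps to "temp_groomer_part_aa",
# 25 to "temp_groomer_part_az", and 26 to "temp_groomer_part_ba",
# matching split(1)'s default two-letter lowercase suffixes.
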
def calculate_lines_per_process(input_filename, number_of_processes):
    wc_program = "wc"
    p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
    out, err = p.communicate()
    number_of_lines = int(out.split()[0])
    exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
    # Round up to a multiple of 4 so no FASTQ record straddles two chunks.
    lines_per_process = int(math.ceil(exact_lines_per_process / 4.0)) * 4
    return lines_per_process, number_of_lines

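# Worked example (hypothetical numbers): 1,000,003 input lines across 4
# processes gives exact_lines_per_process = 250000.75, so each chunk gets
# ceil(250000.75 / 4.0) * 4 = 250004 lines and split(1) puts the remaining
# lines in the final chunk.
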
def move_to_temp_dir():
    # Create a uniquely named scratch directory (timestamp suffix) and
    # chdir into it; retry with a fresh name on the rare collision.
    while True:
        dir_name = "temp_groomer_part_" + str(time.time())
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
            break
    os.chdir(dir_name)
    return dir_name

def print_aggregators(aggregators):
    # Merge the per-chunk summaries: union the valid formats and take the
    # overall min/max of the ASCII and decimal quality ranges.
    total_ascii_range = [None, None]
    total_decimal_range = [None, None]
    total_valid_formats = set()
    for aggregator in aggregators:
        total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
        ascii_range = aggregator.get_ascii_range()
        decimal_range = aggregator.get_decimal_range()

        if total_ascii_range[0] is None:
            total_ascii_range[0] = ascii_range[0]
        else:
            total_ascii_range[0] = min(total_ascii_range[0], ascii_range[0])

        # In Python 2, max(None, value) returns the value, so the upper
        # bounds need no None check.
        total_ascii_range[1] = max(total_ascii_range[1], ascii_range[1])
        if total_decimal_range[0] is None:
            total_decimal_range[0] = decimal_range[0]
        else:
            total_decimal_range[0] = min(total_decimal_range[0], decimal_range[0])
        total_decimal_range[1] = max(total_decimal_range[1], decimal_range[1])
    print "total_valid_formats= " + str(total_valid_formats)
    print "Based upon quality and sequence, the input data is valid for: %s" % (", ".join(total_valid_formats) or "None")
    # Print with repr, since \x00 (null) causes info truncation in Galaxy when printed.
    print "Input ASCII range: %s(%i) - %s(%i)" % (repr(total_ascii_range[0]), ord(total_ascii_range[0]), repr(total_ascii_range[1]), ord(total_ascii_range[1]))
    print "Input decimal range: %i - %i" % (total_decimal_range[0], total_decimal_range[1])


def partition(input_filename, temp_output_filename, fileCount):
    # Groom one split chunk; runs in its own child process and reads the
    # shared settings from sys.argv.
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    force_quality_encoding = sys.argv[5]
    summarize_input = sys.argv[6] == 'summarize_input'
    if force_quality_encoding == 'None':
        force_quality_encoding = None
    aggregator = fastqAggregator()
    temp_process_file = fastqWriter(open(temp_output_filename, 'wb'), format=output_type, force_quality_encoding=force_quality_encoding)
    read_count = None
    if summarize_input:
        reader = fastqVerboseErrorReader
    else:
        reader = fastqReader
    for read_count, fastq_read in enumerate(reader(open(input_filename, 'rb'), format=input_type, apply_galaxy_conventions=True)):
        if summarize_input:
            aggregator.consume_read(fastq_read)
        temp_process_file.write(fastq_read)
    temp_process_file.close()
    if read_count is not None:
        if input_type != output_type and 'solexa' in [input_type, output_type]:
            print "Converted between Solexa and PHRED scores."
        if summarize_input:
            # Persist this chunk's summary for the parent to merge later.
            with open(temp_output_filename + "_summary", 'w') as summaryLogFile:
                pickle.dump(aggregator, summaryLogFile)
    else:
        print "No valid FASTQ reads were provided."

if __name__ == "__main__":
    main()
```
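
A minimal invocation sketch, for reference: the positional-argument layout below is inferred from `main()` and `partition()`, and the file names and format values are hypothetical.

```python
# Hypothetical example: groom reads.fastq using 4 worker processes.
import subprocess

subprocess.call([
    "python", "fastq_groomer_parallel.py",
    "reads.fastq",          # sys.argv[1]: input FASTQ path
    "sanger",               # sys.argv[2]: input quality-score type
    "reads_groomed.fastq",  # sys.argv[3]: output FASTQ path
    "sanger",               # sys.argv[4]: output quality-score type
    "None",                 # sys.argv[5]: force_quality_encoding ('None' disables it)
    "summarize_input",      # sys.argv[6]: any other value skips the summary
    "4",                    # sys.argv[7]: optional worker-process count
])
```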
