| 4 | 1 # Kenneth Sabir | 
|  | 2 # Garvan Institute | 
|  | 3 import sys | 
|  | 4 import time | 
|  | 5 import os | 
|  | 6 import math | 
|  | 7 import subprocess | 
|  | 8 import string | 
|  | 9 import shutil | 
|  | 10 import pickle | 
|  | 11 import io | 
|  | 12 from multiprocessing import Process | 
|  | 13 from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter | 
|  | 14 | 
|  | 15 | 
|  | 16 | 
|  | 17 def main(): | 
|  | 18     split_program = "split" | 
|  | 19     cat_program = "cat" | 
|  | 20     input_filename =  sys.argv[1] | 
|  | 21     output_filename = sys.argv[3] | 
|  | 22     number_of_processes = 1; | 
|  | 23     if (len(sys.argv) > 7): | 
|  | 24         number_of_processes = int(sys.argv[7]) | 
|  | 25     file_prefix = "temp_groomer_part_" | 
|  | 26 | 
|  | 27     t1 = time.time() | 
|  | 28     old_path = os.getcwd() | 
|  | 29 | 
|  | 30     lines_per_process,number_of_lines = calculate_lines_per_process(input_filename, number_of_processes) | 
|  | 31     temp_dir_name = move_to_temp_dir() | 
|  | 32     sequences = number_of_lines/4; | 
|  | 33     args = [split_program, "-l"+str(lines_per_process), input_filename, file_prefix] | 
|  | 34 #    print "The args are: " , args | 
|  | 35     subprocess.call(args) | 
|  | 36 #    print "Finished" | 
|  | 37     file_count = 0; | 
|  | 38     keep_checking = True | 
|  | 39     processes = [] | 
|  | 40     output_filenames = [] | 
|  | 41     while keep_checking: | 
|  | 42 | 
|  | 43         # only need to support 26x26 different processes, so do it brute force (ie not in a loop) for 2 chars. | 
|  | 44         lastchar = string.letters[file_count % len(string.letters)] | 
|  | 45         firstchar = string.letters[(file_count / len(string.letters)) % len(string.letters)] | 
|  | 46         temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar) | 
|  | 47 | 
|  | 48  #       print 'looking for ' + temp_input_filename | 
|  | 49         if os.path.exists(temp_input_filename): | 
|  | 50 #            print 'found ' + temp_input_filename | 
|  | 51             temp_output_filename = temp_input_filename + "_output" | 
|  | 52             output_filenames.append(temp_output_filename) | 
|  | 53             p = Process(target=partition, args=([temp_input_filename, temp_output_filename, file_count])) | 
|  | 54             p.start() | 
|  | 55             processes.append(p) | 
|  | 56             file_count = file_count + 1 | 
|  | 57         else: | 
|  | 58             break | 
|  | 59     for p in processes : | 
|  | 60         p.join() | 
|  | 61     cat_params = [cat_program] | 
|  | 62     cat_params.extend(output_filenames) | 
|  | 63     with open(output_filename, 'w') as catOutputFile: | 
|  | 64         subprocess.call(cat_params, stdout=catOutputFile) | 
|  | 65     summarize_input = sys.argv[6] == 'summarize_input' | 
|  | 66     input_type = sys.argv[2] | 
|  | 67     output_type = sys.argv[4] | 
|  | 68     print "Groomed %i %s reads into %s reads." % ( sequences, input_type, output_type ) | 
|  | 69 | 
|  | 70     aggregators = [] | 
|  | 71     if summarize_input: | 
|  | 72         for temp_output_filename in output_filenames : | 
|  | 73             with open(temp_output_filename + "_summary", 'r') as summaryLogFile: | 
|  | 74                 temp_aggregator = pickle.load(summaryLogFile) | 
|  | 75                 aggregators.append(temp_aggregator) | 
|  | 76 | 
|  | 77         print_aggregators(aggregators) | 
|  | 78     os.chdir(old_path) | 
|  | 79     shutil.rmtree(temp_dir_name) | 
|  | 80     time2 = time.time() | 
|  | 81     print 'Groomer took: %0.3f ms using %d processes' % (((time2 - t1)*1000.0), number_of_processes) | 
|  | 82 | 
def calculate_lines_per_process(input_filename, number_of_processes):
    """Work out how many lines each worker's chunk should contain.

    Counts the lines in input_filename with "wc -l", divides them evenly
    among number_of_processes, and rounds the per-process count up to the
    next multiple of 4 so a FASTQ record (4 lines) is never split across
    two chunks.

    Returns a (lines_per_process, number_of_lines) tuple.
    """
    wc_program = "wc"
    p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
    out, err = p.communicate()
    # wc output looks like "  123 filename"; the first whitespace-separated
    # token is the line count.  The split() method replaces the deprecated
    # string.split(string.lstrip(...)) module-function calls and, unlike
    # them, also works on the bytes that Popen yields under Python 3.
    number_of_lines = int(out.split()[0])
    exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
    lines_per_process = int(math.ceil(exact_lines_per_process / 4.0)) * 4
    return lines_per_process, number_of_lines
|  | 91 | 
def move_to_temp_dir():
    """Create a uniquely named scratch directory, chdir into it, and
    return its name (relative to the previous working directory).

    The name is derived from the current time; if that name is already
    taken, a fresh timestamped name is tried.
    """
    dir_name = None
    while True:
        dir_name = "temp_groomer_part_" + str(time.time())
        try:
            # Create rather than test-then-create: the original
            # os.path.exists() check raced with any concurrent groomer
            # creating the same directory between check and makedirs.
            os.makedirs(dir_name)
            break
        except OSError:
            # Name already exists as a directory: retry with a newer
            # timestamp.  Any other failure (e.g. permissions) re-raises.
            if not os.path.isdir(dir_name):
                raise
    os.chdir(dir_name)
    return dir_name
|  | 103 | 
def print_aggregators(aggregators):
    # Merge the per-worker fastqAggregator summaries and print a combined
    # report: the union of the valid FASTQ formats plus the overall ASCII
    # and decimal quality-score ranges seen across every chunk.
    #
    # NOTE(review): relies on Python 2 ordering where max(None, x) == x
    # (None sorts below everything); under Python 3 those comparisons
    # would raise TypeError -- confirm before any porting.
    total_ascii_range = [None, None]
    total_decimal_range = [None, None]
    total_valid_formats = set()
    for aggregator in aggregators:
        total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
        ascii_range = aggregator.get_ascii_range()
        decimal_range =  aggregator.get_decimal_range()

        # Track the minimum seen.  min() cannot be used unconditionally on
        # the first pass: min(None, x) would return None in Python 2.
        if total_ascii_range[0] is None:
            total_ascii_range[0] = ascii_range[0]
        else:
            total_ascii_range[0] = min (total_ascii_range[0], ascii_range[0])

        # max of None and a value is the value
        total_ascii_range[1] = max (total_ascii_range[1], ascii_range[1])
        if total_decimal_range[0] is None:
            total_decimal_range[0] = decimal_range[0]
        else:
            total_decimal_range[0] = min (total_decimal_range[0], decimal_range[0])
        # max of None and a value is the value
        total_decimal_range[1] = max (total_decimal_range[1], decimal_range[1])
    print "total_valid_formats= " + str(total_valid_formats)
    print "Based upon quality and sequence, the input data is valid for: %s" % ( ", ".join( total_valid_formats )  or "None" )
    print "Input ASCII range: %s(%i) - %s(%i)" % ( repr( total_ascii_range[0] ), ord( total_ascii_range[0] ), repr( total_ascii_range[1] ), ord( total_ascii_range[1] ) ) #print using repr, since \x00 (null) causes info truncation in galaxy when printed
    print "Input decimal range: %i - %i" % ( total_decimal_range[0], total_decimal_range[1] )
|  | 131 | 
|  | 132 | 
def partition(input_filename, temp_output_filename, fileCount):
    # Worker entry point: groom one chunk of the split input file.
    # Runs in its own multiprocessing.Process; configuration beyond the
    # two filenames is read from the inherited sys.argv (argv[2] input
    # type, argv[4] output type, argv[5] force_quality_encoding,
    # argv[6] 'summarize_input' flag).  fileCount is currently unused
    # beyond identifying the worker.
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    force_quality_encoding = sys.argv[5]
    summarize_input = sys.argv[6] == 'summarize_input'
    # The literal string 'None' on the command line means "do not force".
    if force_quality_encoding == 'None':
        force_quality_encoding = None
    aggregator = fastqAggregator()
    temp_process_file = fastqWriter( open( temp_output_filename, 'wb'), format = output_type, force_quality_encoding = force_quality_encoding )
    read_count = None
    # The verbose reader reports per-read errors; only chosen when a
    # summary was requested.
    if summarize_input:
        reader = fastqVerboseErrorReader
    else:
        reader = fastqReader
    for read_count, fastq_read in enumerate( reader( open(input_filename, 'rb'), format = input_type, apply_galaxy_conventions = True ) ):
        if summarize_input:
            aggregator.consume_read( fastq_read )
        temp_process_file.write( fastq_read )
    temp_process_file.close()
    # read_count is still None only when the reader yielded nothing.
    if read_count is not None:
        if input_type != output_type and 'solexa' in [ input_type, output_type ]:
            print "Converted between Solexa and PHRED scores."
        if summarize_input:
            # Persist this worker's summary so the parent can unpickle and
            # merge it in print_aggregators() after all workers join.
            with open(temp_output_filename + "_summary", 'w') as summaryLogFile :
                pickle.dump(aggregator, summaryLogFile)
    else:
        print "No valid FASTQ reads were provided."
|  | 163 | 
|  | 164 if __name__ == "__main__": main() |