#Dan Blankenberg
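#
# Parallel FASTQ groomer: splits the input FASTQ file into per-process chunks with
# the Unix "split" utility, grooms each chunk in its own multiprocessing.Process,
# and concatenates the per-chunk results with "cat".
#
# Command-line arguments (as indexed from sys.argv below):
#   1: input FASTQ filename         2: input format (e.g. sanger, solexa)
#   3: output filename              4: output format
#   5: force_quality_encoding, or the literal string 'None'
#   6: 'summarize_input' to collect and print per-read statistics
#   7: number of worker processes (optional; defaults to 1)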
import sys
import time
import os
import math
import subprocess
import string
import shutil
import pickle
from multiprocessing import Process

from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter


def main():
    split_program = "split"
    cat_program = "cat"
    input_filename = sys.argv[1]
    output_filename = sys.argv[3]
    number_of_processes = 1
    if len(sys.argv) > 7:
        number_of_processes = int(sys.argv[7])
    file_prefix = "temp_groomer_part_"

    start_time = time.time()
    old_path = os.getcwd()

    lines_per_process, number_of_lines = calculate_lines_per_process(input_filename, number_of_processes)
    temp_dir_name = move_to_temp_dir()
    sequences = number_of_lines / 4  # each FASTQ record is four lines
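    # Carve the input into equal chunks with the Unix "split" utility; each chunk is a
    # multiple of 4 lines, so every piece starts on a FASTQ record boundary.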
    args = [split_program, "-l" + str(lines_per_process), input_filename, file_prefix]
#    print "The args are: ", args
    subprocess.call(args)
#    print "Finished"
    file_count = 0
    processes = []
    output_filenames = []
    while True:

        # split names its pieces with two lowercase letters (aa, ab, ..., az, ba, ...),
        # so rebuild this chunk's suffix from the running count; 26x26 names is plenty.
        lastchar = string.ascii_lowercase[file_count % 26]
        firstchar = string.ascii_lowercase[(file_count // 26) % 26]
        temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar)

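        # Launch one worker per chunk; split names its pieces consecutively, so the
        # first missing filename means every chunk has been claimed.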
#        print 'looking for ' + temp_input_filename
        if os.path.exists(temp_input_filename):
#            print 'found ' + temp_input_filename
            temp_output_filename = temp_input_filename + "_output"
            output_filenames.append(temp_output_filename)
            p = Process(target=partition, args=(temp_input_filename, temp_output_filename, file_count))
            p.start()
            processes.append(p)
            file_count += 1
        else:
            break
    for p in processes:
        p.join()
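    # All workers have finished; concatenate their outputs, in chunk order, with "cat".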
    cat_params = [cat_program]
    cat_params.extend(output_filenames)
    with open(output_filename, 'w') as cat_output_file:
        subprocess.call(cat_params, stdout=cat_output_file)
    summarize_input = sys.argv[6] == 'summarize_input'
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    print "Groomed %i %s reads into %s reads." % (sequences, input_type, output_type)

    aggregators = []
    if summarize_input:
        # Each worker pickled its fastqAggregator next to its output; load and merge them.
        for temp_output_filename in output_filenames:
            with open(temp_output_filename + "_summary", 'r') as summary_log_file:
                temp_aggregator = pickle.load(summary_log_file)
                aggregators.append(temp_aggregator)

        print_aggregators(aggregators)
    os.chdir(old_path)
    shutil.rmtree(temp_dir_name)
    end_time = time.time()
    print 'Groomer took: %0.3f ms using %d processes' % ((end_time - start_time) * 1000.0, number_of_processes)

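# Count the input's lines with "wc -l" and divide them evenly among the processes,
# rounding each share up to a multiple of 4 so no FASTQ record straddles two chunks.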
def calculate_lines_per_process(input_filename, number_of_processes):
    wc_program = "wc"
    p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
    out, err = p.communicate()
    number_of_lines = int(out.split()[0])
    exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
    lines_per_process = int(math.ceil(exact_lines_per_process / 4.0)) * 4
    return lines_per_process, number_of_lines

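# Create a uniquely named (timestamped) scratch directory and chdir into it, so the
# split pieces and per-chunk outputs from concurrent runs cannot collide.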
def move_to_temp_dir():
    dir_name = None
    while True:
        dir_name = "temp_groomer_part_" + str(time.time())
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
            break
    os.chdir(dir_name)
    return dir_name

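# Combine the per-chunk summaries: union the sets of valid FASTQ variants and widen
# the observed ASCII/decimal quality ranges across all aggregators, then report.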
def print_aggregators(aggregators):
    total_ascii_range = [None, None]
    total_decimal_range = [None, None]
    total_valid_formats = set()
    for aggregator in aggregators:
#        print "This aggregator's valid formats are: " + str(aggregator.get_valid_formats())
        total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
        ascii_range = aggregator.get_ascii_range()
        decimal_range = aggregator.get_decimal_range()

        # min of None and a value is None, so seed the lower bound explicitly
        if total_ascii_range[0] is None:
            total_ascii_range[0] = ascii_range[0]
        else:
            total_ascii_range[0] = min(total_ascii_range[0], ascii_range[0])
        # max of None and a value is the value
        total_ascii_range[1] = max(total_ascii_range[1], ascii_range[1])
        if total_decimal_range[0] is None:
            total_decimal_range[0] = decimal_range[0]
        else:
            total_decimal_range[0] = min(total_decimal_range[0], decimal_range[0])
        # max of None and a value is the value
        total_decimal_range[1] = max(total_decimal_range[1], decimal_range[1])
    print "total_valid_formats= " + str(total_valid_formats)
    print "Based upon quality and sequence, the input data is valid for: %s" % (", ".join(total_valid_formats) or "None")
    # print with repr, since \x00 (null) causes info truncation in Galaxy when printed
    print "Input ASCII range: %s(%i) - %s(%i)" % (repr(total_ascii_range[0]), ord(total_ascii_range[0]), repr(total_ascii_range[1]), ord(total_ascii_range[1]))
    print "Input decimal range: %i - %i" % (total_decimal_range[0], total_decimal_range[1])


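# Worker body (runs in a child process): groom a single split chunk. The grooming
# options are re-read from sys.argv, which the child inherits from the parent.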
def partition(input_filename, temp_output_filename, file_count):
#    print 'Starting process: ' + str(file_count)
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    force_quality_encoding = sys.argv[5]
    summarize_input = sys.argv[6] == 'summarize_input'
    if force_quality_encoding == 'None':
        force_quality_encoding = None
    aggregator = fastqAggregator()
    temp_process_file = fastqWriter(open(temp_output_filename, 'wb'), format=output_type, force_quality_encoding=force_quality_encoding)
    read_count = None
    # The verbose error reader reports per-read problems; only wanted with a summary.
    if summarize_input:
        reader = fastqVerboseErrorReader
    else:
        reader = fastqReader
    for read_count, fastq_read in enumerate(reader(open(input_filename, 'rb'), format=input_type, apply_galaxy_conventions=True)):
        if summarize_input:
            aggregator.consume_read(fastq_read)
        temp_process_file.write(fastq_read)
#        print "Just wrote (%d): " % read_count + str(fastq_read)
    temp_process_file.close()
    if read_count is not None:
        if input_type != output_type and 'solexa' in [input_type, output_type]:
            print "Converted between Solexa and PHRED scores."
        if summarize_input:
            # Persist this chunk's aggregator for the parent to merge after join().
            with open(temp_output_filename + "_summary", 'w') as summary_log_file:
                pickle.dump(aggregator, summary_log_file)
    else:
        print "No valid FASTQ reads were provided."

if __name__ == "__main__":
    main()
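
# Example invocation (argument order inferred from the sys.argv indexing above; the
# script name and values are illustrative only):
#   python fastq_groomer_parallel.py input.fastq sanger output.fastq sanger None summarize_input 4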