#Dan Blankenberg
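"""Parallel FASTQ groomer.

Splits the input FASTQ file into chunks with the external `split` program,
grooms each chunk in its own process, and concatenates the groomed chunks
with `cat` into the final output.

Command-line arguments, as parsed below:
    sys.argv[1] - input FASTQ filename
    sys.argv[2] - input format type (e.g. 'sanger', 'solexa')
    sys.argv[3] - output filename
    sys.argv[4] - output format type
    sys.argv[5] - force_quality_encoding ('None' to disable forcing)
    sys.argv[6] - 'summarize_input' to aggregate and print input statistics
    sys.argv[7] - optional number of worker processes (default: 1)
"""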
import sys
import time
import os
import math
import subprocess
import string
import shutil
import pickle

from multiprocessing import Process

from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter


def main():
    split_program = "split"
    cat_program = "cat"
    # take absolute paths now, since we chdir into a temporary directory below
    input_filename = os.path.abspath(sys.argv[1])
    output_filename = os.path.abspath(sys.argv[3])
    number_of_processes = 1
    if len(sys.argv) > 7:
        number_of_processes = int(sys.argv[7])
    file_prefix = "temp_groomer_part_"

    start_time = time.time()
    old_path = os.getcwd()

    lines_per_process, number_of_lines = calculate_lines_per_process(input_filename, number_of_processes)
    temp_dir_name = move_to_temp_dir()
    sequences = number_of_lines // 4  # each FASTQ record is four lines
    args = [split_program, "-l" + str(lines_per_process), input_filename, file_prefix]
    # print "The args are: ", args
    subprocess.call(args)
    # print "Finished"
    file_count = 0
    processes = []
    output_filenames = []
    while True:
        # split's default suffixes are two lowercase letters (aa, ab, ...); only
        # 26x26 chunks need to be supported, so compute the two characters directly.
        lastchar = string.ascii_lowercase[file_count % 26]
        firstchar = string.ascii_lowercase[(file_count // 26) % 26]
        temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar)

        # print 'looking for ' + temp_input_filename
        if os.path.exists(temp_input_filename):
            # print 'found ' + temp_input_filename
            temp_output_filename = temp_input_filename + "_output"
            output_filenames.append(temp_output_filename)
            p = Process(target=partition, args=(temp_input_filename, temp_output_filename, file_count))
            p.start()
            processes.append(p)
            file_count += 1
        else:
            break
    for p in processes:
        p.join()
    cat_params = [cat_program]
    cat_params.extend(output_filenames)
    with open(output_filename, 'w') as cat_output_file:
        subprocess.call(cat_params, stdout=cat_output_file)
    summarize_input = sys.argv[6] == 'summarize_input'
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    print "Groomed %i %s reads into %s reads." % (sequences, input_type, output_type)

    aggregators = []
    if summarize_input:
        for temp_output_filename in output_filenames:
            with open(temp_output_filename + "_summary", 'rb') as summary_log_file:
                temp_aggregator = pickle.load(summary_log_file)
            aggregators.append(temp_aggregator)

        print_aggregators(aggregators)
    os.chdir(old_path)
    shutil.rmtree(temp_dir_name)
    end_time = time.time()
    print 'Groomer took: %0.3f ms using %d processes' % ((end_time - start_time) * 1000.0, number_of_processes)


def calculate_lines_per_process(input_filename, number_of_processes):
    wc_program = "wc"
    p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE)
    out, err = p.communicate()
    number_of_lines = int(out.split()[0])
    exact_lines_per_process = number_of_lines * 1.0 / number_of_processes
    # round up to a multiple of 4 so no 4-line FASTQ record straddles two chunks
    lines_per_process = int(math.ceil(exact_lines_per_process / 4.0)) * 4
    return lines_per_process, number_of_lines
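
# Worked example of the chunk-size arithmetic above (illustrative numbers):
# with 1,000,000 input lines and 3 processes, exact_lines_per_process is
# 333333.33..., and ceil(333333.33... / 4.0) * 4 = 333336 lines, i.e. 83,334
# whole reads per chunk, with split giving the remainder to the final chunk.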


def move_to_temp_dir():
    # create a uniquely named temporary directory and chdir into it
    # (note: tempfile.mkdtemp would avoid the check-then-create race)
    while True:
        dir_name = "temp_groomer_part_" + str(time.time())
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
            break
    os.chdir(dir_name)
    return dir_name


def print_aggregators(aggregators):
    total_ascii_range = [None, None]
    total_decimal_range = [None, None]
    total_valid_formats = set()
    for aggregator in aggregators:
        # print "This aggregator's valid formats are: " + str(aggregator.get_valid_formats())
        total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats()))
        ascii_range = aggregator.get_ascii_range()
        decimal_range = aggregator.get_decimal_range()

        if total_ascii_range[0] is None:
            total_ascii_range[0] = ascii_range[0]
        else:
            total_ascii_range[0] = min(total_ascii_range[0], ascii_range[0])

        # max of None and a value is the value (None sorts before everything in Python 2)
        total_ascii_range[1] = max(total_ascii_range[1], ascii_range[1])
        if total_decimal_range[0] is None:
            total_decimal_range[0] = decimal_range[0]
        else:
            total_decimal_range[0] = min(total_decimal_range[0], decimal_range[0])
        # max of None and a value is the value (None sorts before everything in Python 2)
        total_decimal_range[1] = max(total_decimal_range[1], decimal_range[1])
    print "total_valid_formats = " + str(total_valid_formats)
    print "Based upon quality and sequence, the input data is valid for: %s" % (", ".join(total_valid_formats) or "None")
    # print using repr, since \x00 (null) causes info truncation in Galaxy when printed
    print "Input ASCII range: %s(%i) - %s(%i)" % (repr(total_ascii_range[0]), ord(total_ascii_range[0]), repr(total_ascii_range[1]), ord(total_ascii_range[1]))
    print "Input decimal range: %i - %i" % (total_decimal_range[0], total_decimal_range[1])


def partition(input_filename, temp_output_filename, file_count):
    # print 'Starting process: ' + str(file_count)
    input_type = sys.argv[2]
    output_type = sys.argv[4]
    force_quality_encoding = sys.argv[5]
    summarize_input = sys.argv[6] == 'summarize_input'
    if force_quality_encoding == 'None':
        force_quality_encoding = None
    aggregator = fastqAggregator()
    temp_process_file = fastqWriter(open(temp_output_filename, 'wb'), format=output_type, force_quality_encoding=force_quality_encoding)
    read_count = None
    if summarize_input:
        reader = fastqVerboseErrorReader
    else:
        reader = fastqReader
    for read_count, fastq_read in enumerate(reader(open(input_filename, 'rb'), format=input_type, apply_galaxy_conventions=True)):
        if summarize_input:
            aggregator.consume_read(fastq_read)
        temp_process_file.write(fastq_read)
        # print "Just wrote (%d): " % read_count + str(fastq_read)
    temp_process_file.close()
    if read_count is not None:
        if input_type != output_type and 'solexa' in [input_type, output_type]:
            print "Converted between Solexa and PHRED scores."
        if summarize_input:
            # write the aggregator in binary mode to match the 'rb' read in main()
            with open(temp_output_filename + "_summary", 'wb') as summary_log_file:
                pickle.dump(aggregator, summary_log_file)
    else:
        print "No valid FASTQ reads were provided."


if __name__ == "__main__":
    main()
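
# Example invocation (hypothetical filenames; the script name is assumed, and
# format names follow the short names used above, e.g. 'sanger', 'solexa'):
#   python fastq_groomer_parallel.py input.fastq sanger output.fastq sanger None summarize_input 4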