comparison: join.py @ 5:b95d7c323cc9 (draft, default, tip)
Uploaded
| author | dave |
|---|---|
| date | Thu, 12 Jun 2014 10:33:20 -0400 |
| parents | |
| children | |
Comparing revision 4:46aee6903d8f with revision 5:b95d7c323cc9:
```python
#!/usr/bin/env python
# Dan Blankenberg
"""
Script to join two files on specified columns.

Takes two tab-delimited files and two column numbers (base 1) and outputs a new
tab-delimited file with the matching lines joined by tabs. The user can also
opt to have non-joining rows of file1 echoed.
"""

import optparse, os, sys, tempfile, struct
import psyco_full #enables the psyco JIT when it is available

# simplejson is needed only for the optional -f/--fill_options_file feature;
# remember any import error so it can be reported if that feature is used.
try:
    simplejson_exception = None
    from galaxy import eggs
    from galaxy.util.bunch import Bunch
    from galaxy.util import stringify_dictionary_keys
    import pkg_resources
    pkg_resources.require("simplejson")
    import simplejson
except Exception, e:
    simplejson_exception = e
    simplejson = None


# Append-only list of fixed-width file offsets, backed by a temporary file.
class OffsetList:
    def __init__( self, filesize = 0, fmt = None ):
        self.file = tempfile.NamedTemporaryFile( 'w+b' )
        if fmt:
            self.fmt = fmt
        elif filesize and filesize <= 0xFFFFFFFF: #offsets fit in an unsigned 32-bit int
            self.fmt = 'I'
        else:
            self.fmt = 'Q'
        self.fmt_size = struct.calcsize( self.fmt )
    @property
    def size( self ):
        self.file.flush()
        return self.file_size / self.fmt_size
    @property
    def file_size( self ):
        self.file.flush()
        return os.stat( self.file.name ).st_size
    def add_offset( self, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        self.file.seek( self.file_size )
        for off in offset:
            self.file.write( struct.pack( self.fmt, off ) )
    def get_offsets( self, start = 0 ):
        self.file.seek( start * self.fmt_size )
        while True:
            packed = self.file.read( self.fmt_size )
            if not packed: break
            yield struct.unpack( self.fmt, packed )[0]
    def get_offset_by_index( self, index ):
        self.file.seek( index * self.fmt_size )
        return struct.unpack( self.fmt, self.file.read( self.fmt_size ) )[0]
    def set_offset_at_index( self, index, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        if index >= self.size:
            self.add_offset( offset )
        else:
            #splice the new offsets in by rewriting the backing file
            temp_file = tempfile.NamedTemporaryFile( 'w+b' )
            self.file.seek( 0 )
            temp_file.write( self.file.read( index * self.fmt_size ) )
            for off in offset:
                temp_file.write( struct.pack( self.fmt, off ) )
            temp_file.write( self.file.read() )
            self.file = temp_file


# OffsetList kept sorted by the identifier found in the join column of the
# indexed file; new offsets are merged in from dicts of identifier -> offsets.
class SortedOffsets( OffsetList ):
    def __init__( self, indexed_filename, column, split = None ):
        OffsetList.__init__( self, os.stat( indexed_filename ).st_size )
        self.indexed_filename = indexed_filename
        self.indexed_file = open( indexed_filename, 'rb' )
        self.column = column
        self.split = split
        self.last_identifier = None
        self.last_identifier_merged = None
        self.last_offset_merged = 0
    def merge_with_dict( self, new_offset_dict ):
        if not new_offset_dict: return #no items to merge in
        keys = new_offset_dict.keys()
        keys.sort()
        identifier2 = keys.pop( 0 )

        result_offsets = OffsetList( fmt = self.fmt )
        offsets1 = enumerate( self.get_offsets() )
        try:
            index1, offset1 = offsets1.next()
            identifier1 = self.get_identifier_by_offset( offset1 )
        except StopIteration:
            offset1 = None
            identifier1 = None
            index1 = 0

        while True:
            if identifier1 is None and identifier2 is None:
                self.file = result_offsets.file #self is now merged results
                return
            elif identifier1 is None or ( identifier2 and identifier2 < identifier1 ):
                result_offsets.add_offset( new_offset_dict[identifier2] )
                if keys:
                    identifier2 = keys.pop( 0 )
                else:
                    identifier2 = None
            elif identifier2 is None:
                #nothing left to merge in: bulk-copy the remaining offsets
                result_offsets.file.seek( result_offsets.file_size )
                self.file.seek( index1 * self.fmt_size )
                result_offsets.file.write( self.file.read() )
                identifier1 = None
                offset1 = None
            else:
                result_offsets.add_offset( offset1 )
                try:
                    index1, offset1 = offsets1.next()
                    identifier1 = self.get_identifier_by_offset( offset1 )
                except StopIteration:
                    offset1 = None
                    identifier1 = None
                    index1 += 1
    #methods to help link offsets to lines, ids, etc
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def get_line_by_offset( self, offset ):
        self.indexed_file.seek( offset )
        return self.indexed_file.readline()
    def get_identifier_by_offset( self, offset ):
        return self.get_identifier_by_line( self.get_line_by_offset( offset ) )


#indexed set of offsets, index is built on demand
class OffsetIndex:
    def __init__( self, filename, column, split = None, index_depth = 3 ):
        self.filename = filename
        self.file = open( filename, 'rb' )
        self.column = column
        self.split = split
        self._offsets = {} #SortedOffsets keyed by the first character of the identifier
        self._index = None
        self.index_depth = index_depth
    def _build_index( self ):
        self._index = {}
        for start_char, sorted_offsets in self._offsets.items():
            self._index[start_char] = {}
            for i, offset in enumerate( sorted_offsets.get_offsets() ):
                identifier = sorted_offsets.get_identifier_by_offset( offset )
                if identifier[0:self.index_depth] not in self._index[start_char]:
                    self._index[start_char][identifier[0:self.index_depth]] = i
    def get_lines_by_identifier( self, identifier ):
        if not identifier: return
        #if index doesn't exist, build it
        if self._index is None: self._build_index()

        #identifier cannot exist
        if identifier[0] not in self._index or identifier[0:self.index_depth] not in self._index[identifier[0]]:
            return
        #identifier might exist, search for it
        offset_index = self._index[identifier[0]][identifier[0:self.index_depth]]
        while True:
            if offset_index >= self._offsets[identifier[0]].size:
                return
            offset = self._offsets[identifier[0]].get_offset_by_index( offset_index )
            identifier2 = self._offsets[identifier[0]].get_identifier_by_offset( offset )
            if not identifier2 or identifier2 > identifier:
                return
            if identifier2 == identifier:
                yield self._offsets[identifier[0]].get_line_by_offset( offset )
            offset_index += 1
    def get_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield offset
    def get_line_by_offset( self, offset ):
        self.file.seek( offset )
        return self.file.readline()
    def get_identifiers_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield self._offsets[key].get_identifier_by_offset( offset ), offset
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def merge_with_dict( self, d ):
        if not d: return #no data to merge
        self._index = None
        keys = d.keys()
        keys.sort()
        identifier = keys.pop( 0 )
        first_char = identifier[0]
        temp = { identifier: d[identifier] }
        while True:
            if not keys:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                return
            identifier = keys.pop( 0 )
            if identifier[0] == first_char:
                temp[identifier] = d[identifier]
            else:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                temp = { identifier: d[identifier] }
                first_char = identifier[0]


# Buffers identifier -> offsets mappings in memory and merges them into the
# on-disk OffsetIndex every `buffer` identified lines.
class BufferedIndex:
    def __init__( self, filename, column, split = None, buffer = 1000000, index_depth = 3 ):
        self.index = OffsetIndex( filename, column, split, index_depth )
        self.buffered_offsets = {}
        f = open( filename, 'rb' )
        identified_offset_count = 1
        while True:
            offset = f.tell()
            line = f.readline()
            if not line: break #EOF
            identifier = self.index.get_identifier_by_line( line )
            if identifier:
                #flush buffered offsets, if buffer size reached
                if buffer and identified_offset_count % buffer == 0:
                    self.index.merge_with_dict( self.buffered_offsets )
                    self.buffered_offsets = {}
                if identifier not in self.buffered_offsets:
                    self.buffered_offsets[identifier] = []
                self.buffered_offsets[identifier].append( offset )
                identified_offset_count += 1
        f.close()

    def get_lines_by_identifier( self, identifier ):
        for line in self.index.get_lines_by_identifier( identifier ):
            yield line
        if identifier in self.buffered_offsets:
            for offset in self.buffered_offsets[identifier]:
                yield self.index.get_line_by_offset( offset )
```
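The classes above implement the external-memory index that lets the join scale past available RAM: `BufferedIndex` collects line offsets per identifier and periodically merges them into per-first-character `SortedOffsets` lists that stay sorted on disk. A minimal sketch of how the index is queried (the file name and identifier here are hypothetical):

```python
# Index the second file on its first column (0-based), merging buffered
# offsets into the on-disk index every 1,000,000 identified lines.
index = BufferedIndex( 'file2.tsv', 0, split = '\t', buffer = 1000000, index_depth = 3 )

# Stream every line of file2.tsv whose join column equals 'gene42'.
for line in index.get_lines_by_identifier( 'gene42' ):
    print line.rstrip( '\r\n' )
```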
```python
def fill_empty_columns( line, split, fill_values ):
    if not fill_values:
        return line
    filled_columns = []
    for i, field in enumerate( line.split( split ) ):
        if field or i >= len( fill_values ):
            filled_columns.append( field )
        else:
            filled_columns.append( fill_values[i] )
    if len( fill_values ) > len( filled_columns ):
        filled_columns.extend( fill_values[ len( filled_columns ) : ] )
    return split.join( filled_columns )


def join_files( filename1, column1, filename2, column2, out_filename, split = None, buffer = 1000000, keep_unmatched = False, keep_partial = False, index_depth = 3, fill_options = None ):
    #return identifier based upon line
    def get_identifier_by_line( line, column, split = None ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( split )
            if column < len( fields ):
                return fields[column]
        return None
    if fill_options is None:
        fill_options = Bunch( fill_unjoined_only = True, file1_columns = None, file2_columns = None )
    out = open( out_filename, 'w+b' )
    index = BufferedIndex( filename2, column2, split, buffer, index_depth )
    for line1 in open( filename1, 'rb' ):
        identifier = get_identifier_by_line( line1, column1, split )
        if identifier:
            written = False
            for line2 in index.get_lines_by_identifier( identifier ):
                if not fill_options.fill_unjoined_only:
                    out.write( "%s%s%s\n" % ( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ), split, fill_empty_columns( line2.rstrip( '\r\n' ), split, fill_options.file2_columns ) ) )
                else:
                    out.write( "%s%s%s\n" % ( line1.rstrip( '\r\n' ), split, line2.rstrip( '\r\n' ) ) )
                written = True
            if not written and keep_unmatched:
                out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
                if fill_options:
                    if fill_options.file2_columns:
                        out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
                out.write( "\n" )
        elif keep_partial:
            out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
            if fill_options:
                if fill_options.file2_columns:
                    out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
            out.write( "\n" )
    out.close()
```
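`join_files` is the programmatic entry point; `main()` below only wraps it in option parsing. A hedged example call, assuming the Galaxy `Bunch` import at the top of the file succeeded (file names are hypothetical):

```python
fill_options = Bunch( fill_unjoined_only = True, file1_columns = None, file2_columns = None )

# Join file1.tsv and file2.tsv on their first columns (0-based here; main()
# subtracts 1 from the base-1 command-line arguments), writing joined.tsv and
# keeping rows of file1.tsv that found no partner in file2.tsv.
join_files( 'file1.tsv', 0, 'file2.tsv', 0, 'joined.tsv',
            split = '\t', buffer = 1000000,
            keep_unmatched = True, keep_partial = False,
            index_depth = 3, fill_options = fill_options )
```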
```python
def main():
    parser = optparse.OptionParser()
    parser.add_option(
        '-b', '--buffer',
        dest='buffer',
        type='int', default=1000000,
        help='Number of lines to buffer at a time. Default: 1,000,000 lines. A buffer of 0 will attempt to use memory only.'
    )
    parser.add_option(
        '-d', '--index_depth',
        dest='index_depth',
        type='int', default=3,
        help='Depth to use on file-based offset indexing. Default: 3.'
    )
    parser.add_option(
        '-p', '--keep_partial',
        action='store_true',
        dest='keep_partial',
        default=False,
        help='Keep rows in first input which are missing identifiers.')
    parser.add_option(
        '-u', '--keep_unmatched',
        action='store_true',
        dest='keep_unmatched',
        default=False,
        help='Keep rows in first input which are not joined with the second input.')
    parser.add_option(
        '-f', '--fill_options_file',
        dest='fill_options_file',
        type='str', default=None,
        help='Fill empty columns with values from a JSONified file.')

    options, args = parser.parse_args()

    fill_options = None
    if options.fill_options_file is not None:
        try:
            if simplejson is None:
                raise simplejson_exception
            fill_options = Bunch( **stringify_dictionary_keys( simplejson.load( open( options.fill_options_file ) ) ) )
        except Exception, e:
            print "Warning: Ignoring fill options due to simplejson error (%s)." % e
    if fill_options is None:
        fill_options = Bunch()
    if 'fill_unjoined_only' not in fill_options:
        fill_options.fill_unjoined_only = True
    if 'file1_columns' not in fill_options:
        fill_options.file1_columns = None
    if 'file2_columns' not in fill_options:
        fill_options.file2_columns = None

    try:
        filename1 = args[0]
        filename2 = args[1]
        column1 = int( args[2] ) - 1 #columns are base 1 on the command line
        column2 = int( args[3] ) - 1
        out_filename = args[4]
    except:
        print >> sys.stderr, "Error parsing command line."
        sys.exit()

    #character for splitting fields and joining lines
    split = "\t"

    return join_files( filename1, column1, filename2, column2, out_filename, split, options.buffer, options.keep_unmatched, options.keep_partial, options.index_depth, fill_options = fill_options )


if __name__ == "__main__": main()
```
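The `-f`/`--fill_options_file` option expects a small JSON file whose keys mirror the `Bunch` attributes that `main()` falls back to when a key is absent. A sketch of a typical invocation, assuming the script is saved as `join.py` (file names and fill values here are illustrative only):

```
python join.py file1.tsv file2.tsv 1 1 joined.tsv -u -f fill.json
```

where `fill.json` might contain `{"fill_unjoined_only": true, "file1_columns": null, "file2_columns": ["N/A", "N/A"]}`, padding each unmatched row of the first file with two `N/A` columns in place of missing data from the second file.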
