# HG changeset patch # User devteam # Date 1484844439 18000 # Node ID b7d514d57e02bb726455baa48ce5219b7fd26106 # Parent a943a7e330cb66b38a597cbed8b42200ba80851f planemo upload for repository https://github.com/galaxyproject/tools-devteam/tree/master/data_managers/data_manager_fetch_genome_dbkeys_all_fasta commit ddee21fa767f3234a4e5a9acfeeabdded32e7d01 diff -r a943a7e330cb -r b7d514d57e02 data_manager/data_manager_fetch_genome_all_fasta_dbkeys.py --- a/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.py Thu Oct 15 13:37:01 2015 -0400 +++ b/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.py Thu Jan 19 11:47:19 2017 -0500 @@ -6,27 +6,37 @@ import tempfile import shutil import optparse -import urllib2 -#import uuid from ftplib import FTP import tarfile import zipfile import gzip import bz2 - +try: + # For Python 3.0 and later + from urllib.request import urlopen + from io import BytesIO as StringIO + from io import UnsupportedOperation +except ImportError: + # Fall back to Python 2's urllib2 + from urllib2 import urlopen + from StringIO import StringIO + UnsupportedOperation = AttributeError from json import loads, dumps -CHUNK_SIZE = 2**20 #1mb +CHUNK_SIZE = 2**20 # 1mb + def cleanup_before_exit( tmp_dir ): if tmp_dir and os.path.exists( tmp_dir ): shutil.rmtree( tmp_dir ) + def stop_err(msg): sys.stderr.write(msg) sys.exit(1) - + + def get_dbkey_dbname_id_name( params, dbkey_description=None ): dbkey = params['param_dict']['dbkey_source']['dbkey'] #TODO: ensure sequence_id is unique and does not already appear in location file @@ -48,28 +58,38 @@ sequence_name = dbkey return dbkey, dbkey_name, sequence_id, sequence_name + def _get_files_in_ftp_path( ftp, path ): path_contents = [] ftp.retrlines( 'MLSD %s' % ( path ), path_contents.append ) return [ line.split( ';' )[ -1 ].lstrip() for line in path_contents ] -def _get_stream_readers_for_tar( file_obj, tmp_dir ): - fasta_tar = tarfile.open( fileobj=file_obj, mode='r:*' ) - return filter( lambda x: x is not None, [ fasta_tar.extractfile( member ) for member in fasta_tar.getmembers() ] ) + +def _get_stream_readers_for_tar( fh, tmp_dir ): + fasta_tar = tarfile.open( fileobj=fh, mode='r:*' ) + return [x for x in [fasta_tar.extractfile(member) for member in fasta_tar.getmembers()] if x] + -def _get_stream_readers_for_zip( file_obj, tmp_dir ): - fasta_zip = zipfile.ZipFile( file_obj, 'r' ) +def _get_stream_readers_for_zip( fh, tmp_dir ): + """ + Unpacks all archived files in a zip file. + Individual files will be concatenated (in _stream_fasta_to_file) + """ + fasta_zip = zipfile.ZipFile( fh, 'r' ) rval = [] for member in fasta_zip.namelist(): fasta_zip.extract( member, tmp_dir ) rval.append( open( os.path.join( tmp_dir, member ), 'rb' ) ) return rval -def _get_stream_readers_for_gzip( file_obj, tmp_dir ): - return [ gzip.GzipFile( fileobj=file_obj, mode='rb' ) ] + +def _get_stream_readers_for_gzip( fh, tmp_dir ): + return [ gzip.GzipFile( fileobj=fh, mode='rb') ] -def _get_stream_readers_for_bz2( file_obj, tmp_dir ): - return [ bz2.BZ2File( file_obj.name, 'rb' ) ] + +def _get_stream_readers_for_bz2( fh, tmp_dir ): + return [ bz2.BZ2File( fh.name, 'rb') ] + def sort_fasta( fasta_filename, sort_method, params ): if sort_method is None: @@ -77,6 +97,7 @@ assert sort_method in SORTING_METHODS, ValueError( "%s is not a valid sorting option." % sort_method ) return SORTING_METHODS[ sort_method ]( fasta_filename, params ) + def _move_and_index_fasta_for_sorting( fasta_filename ): unsorted_filename = tempfile.NamedTemporaryFile().name shutil.move( fasta_filename, unsorted_filename ) @@ -94,6 +115,7 @@ current_order = map( lambda x: x[1], sorted( map( lambda x: ( x[1], x[0] ), fasta_offsets.items() ) ) ) return ( unsorted_filename, fasta_offsets, current_order ) + def _write_sorted_fasta( sorted_names, fasta_offsets, sorted_fasta_filename, unsorted_fasta_filename ): unsorted_fh = open( unsorted_fasta_filename ) sorted_fh = open( sorted_fasta_filename, 'wb+' ) @@ -110,6 +132,7 @@ unsorted_fh.close() sorted_fh.close() + def _sort_fasta_as_is( fasta_filename, params ): return @@ -121,6 +144,7 @@ else: _write_sorted_fasta( sorted_names, fasta_offsets, fasta_filename, unsorted_filename ) + def _sort_fasta_gatk( fasta_filename, params ): #This method was added by reviewer request. ( unsorted_filename, fasta_offsets, current_order ) = _move_and_index_fasta_for_sorting( fasta_filename ) @@ -153,12 +177,13 @@ else: _write_sorted_fasta( existing_sorted_names, fasta_offsets, fasta_filename, unsorted_filename ) + def _sort_fasta_custom( fasta_filename, params ): ( unsorted_filename, fasta_offsets, current_order ) = _move_and_index_fasta_for_sorting( fasta_filename ) sorted_names = [] for id_repeat in params['param_dict']['sorting']['sequence_identifiers']: sorted_names.append( id_repeat[ 'identifier' ] ) - handle_not_listed = params['param_dict']['sorting']['handle_not_listed']['handle_not_listed_selector'] + handle_not_listed = params['param_dict']['sorting']['handle_not_listed_selector'] if handle_not_listed.startswith( 'keep' ): add_list = [] for name in current_order: @@ -175,155 +200,163 @@ else: _write_sorted_fasta( sorted_names, fasta_offsets, fasta_filename, unsorted_filename ) -def download_from_ucsc( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ): + +def _download_file(start, fh): + tmp = tempfile.NamedTemporaryFile() + tmp.write(start) + tmp.write(fh.read()) + tmp.flush() + tmp.seek(0) + return tmp + + +def get_stream_reader(fh, tmp_dir): + """ + Check if file is compressed and return correct stream reader. + If file has to be downloaded, do it now. + """ + magic_dict = { + b"\x1f\x8b\x08": _get_stream_readers_for_gzip, + b"\x42\x5a\x68": _get_stream_readers_for_bz2, + b"\x50\x4b\x03\x04": _get_stream_readers_for_zip, + } + start_of_file = fh.read(CHUNK_SIZE) + try: + fh.seek(0) + except UnsupportedOperation: # This is if fh has been created by urlopen + fh = _download_file(start_of_file, fh) + for k,v in magic_dict.items(): + if start_of_file.startswith(k): + return v(fh, tmp_dir) + try: # Check if file is tar file + if tarfile.open(fileobj=StringIO(start_of_file)): + return _get_stream_readers_for_tar(fh, tmp_dir) + except tarfile.ReadError: + pass + return fh + + +def _get_ucsc_download_address(params, dbkey): + """ + Check if we can find the correct file for the supplied dbkey on UCSC's FTP server + """ UCSC_FTP_SERVER = 'hgdownload.cse.ucsc.edu' UCSC_DOWNLOAD_PATH = '/goldenPath/%s/bigZips/' - COMPRESSED_EXTENSIONS = [ ( '.tar.gz', _get_stream_readers_for_tar ), ( '.tar.bz2', _get_stream_readers_for_tar ), ( '.zip', _get_stream_readers_for_zip ), ( '.fa.gz', _get_stream_readers_for_gzip ), ( '.fa.bz2', _get_stream_readers_for_bz2 ) ] - + COMPRESSED_EXTENSIONS = ['.tar.gz', '.tgz', '.tar.bz2', '.zip', '.fa.gz', '.fa.bz2'] + email = params['param_dict']['__user_email__'] if not email: email = 'anonymous@example.com' ucsc_dbkey = params['param_dict']['reference_source']['requested_dbkey'] or dbkey - UCSC_CHROM_FA_FILENAMES = [ '%s.chromFa' % ucsc_dbkey, 'chromFa', ucsc_dbkey ] - - ftp = FTP( UCSC_FTP_SERVER ) - ftp.login( 'anonymous', email ) - + UCSC_CHROM_FA_FILENAMES = ['%s.chromFa' % ucsc_dbkey, 'chromFa', ucsc_dbkey] + + ftp = FTP(UCSC_FTP_SERVER) + ftp.login('anonymous', email) + ucsc_path = UCSC_DOWNLOAD_PATH % ucsc_dbkey - path_contents = _get_files_in_ftp_path( ftp, ucsc_path ) - - ucsc_file_name = None - get_stream_reader = None - ext = None - ucsc_chrom_fa_filename = None + path_contents = _get_files_in_ftp_path(ftp, ucsc_path) + ftp.quit() + for ucsc_chrom_fa_filename in UCSC_CHROM_FA_FILENAMES: - for ext, get_stream_reader in COMPRESSED_EXTENSIONS: - if "%s%s" % ( ucsc_chrom_fa_filename, ext ) in path_contents: - ucsc_file_name = "%s%s%s" % ( ucsc_path, ucsc_chrom_fa_filename, ext ) - break - if ucsc_file_name: - break - - if not ucsc_file_name: - raise Exception( 'Unable to determine filename for UCSC Genome for %s: %s' % ( ucsc_dbkey, path_contents ) ) - - - tmp_dir = tempfile.mkdtemp( prefix='tmp-data-manager-ucsc-' ) - ucsc_fasta_filename = os.path.join( tmp_dir, "%s%s" % ( ucsc_chrom_fa_filename, ext ) ) - - fasta_base_filename = "%s.fa" % sequence_id - fasta_filename = os.path.join( target_directory, fasta_base_filename ) - fasta_writer = open( fasta_filename, 'wb+' ) - - tmp_extract_dir = os.path.join ( tmp_dir, 'extracted_fasta' ) - os.mkdir( tmp_extract_dir ) - - tmp_fasta = open( ucsc_fasta_filename, 'wb+' ) - - ftp.retrbinary( 'RETR %s' % ucsc_file_name, tmp_fasta.write ) - - tmp_fasta.flush() - tmp_fasta.seek( 0 ) - - fasta_readers = get_stream_reader( tmp_fasta, tmp_extract_dir ) - - for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ): - if data_table_entry: - _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name ) - - for fasta_reader in fasta_readers: - fasta_reader.close() - tmp_fasta.close() - cleanup_before_exit( tmp_dir ) + for ext in COMPRESSED_EXTENSIONS: + if "%s%s" % (ucsc_chrom_fa_filename, ext) in path_contents: + ucsc_file_name = "%s%s%s" % (ucsc_path, ucsc_chrom_fa_filename, ext) + return "ftp://%s%s" % (UCSC_FTP_SERVER, ucsc_file_name) -def download_from_ncbi( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ): - NCBI_DOWNLOAD_URL = 'http://togows.dbcls.jp/entry/ncbi-nucleotide/%s.fasta' #FIXME: taken from dave's genome manager...why some japan site? - - requested_identifier = params['param_dict']['reference_source']['requested_identifier'] - url = NCBI_DOWNLOAD_URL % requested_identifier - fasta_readers = urllib2.urlopen( url ) - + raise Exception('Unable to determine filename for UCSC Genome for %s: %s' % (ucsc_dbkey, path_contents)) + +def add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params): for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ): if data_table_entry: _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name ) -def download_from_url( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ): - #TODO: we should automatically do decompression here + +def download_from_ucsc( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ): + url = _get_ucsc_download_address(params, dbkey) + fasta_readers = get_stream_reader(urlopen(url), tmp_dir) + add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params) + + +def download_from_ncbi( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ): + NCBI_DOWNLOAD_URL = 'http://togows.dbcls.jp/entry/ncbi-nucleotide/%s.fasta' #FIXME: taken from dave's genome manager...why some japan site? + requested_identifier = params['param_dict']['reference_source']['requested_identifier'] + url = NCBI_DOWNLOAD_URL % requested_identifier + fasta_readers = get_stream_reader(urlopen(url), tmp_dir) + add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params) + + +def download_from_url( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ): urls = filter( bool, map( lambda x: x.strip(), params['param_dict']['reference_source']['user_url'].split( '\n' ) ) ) - fasta_readers = [ urllib2.urlopen( url ) for url in urls ] - - for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ): - if data_table_entry: - _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name ) + fasta_readers = [ get_stream_reader(urlopen( url ), tmp_dir) for url in urls ] + add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id,sequence_name, params) -def download_from_history( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ): + +def download_from_history( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ): #TODO: allow multiple FASTA input files input_filename = params['param_dict']['reference_source']['input_fasta'] if isinstance( input_filename, list ): - fasta_readers = [ open( filename, 'rb' ) for filename in input_filename ] + fasta_readers = [ get_stream_reader(open(filename, 'rb'), tmp_dir) for filename in input_filename ] else: - fasta_readers = open( input_filename ) - - for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ): - if data_table_entry: - _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name ) + fasta_readers = get_stream_reader(open(input_filename), tmp_dir) + add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params) -def copy_from_directory( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ): + +def copy_from_directory( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ): input_filename = params['param_dict']['reference_source']['fasta_filename'] create_symlink = params['param_dict']['reference_source']['create_symlink'] == 'create_symlink' if create_symlink: data_table_entries = _create_symlink( input_filename, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ) else: if isinstance( input_filename, list ): - fasta_readers = [ open( filename, 'rb' ) for filename in input_filename ] + fasta_readers = [ get_stream_reader(open(filename, 'rb'), tmp_dir) for filename in input_filename ] else: - fasta_readers = open( input_filename ) + fasta_readers = get_stream_reader(open(input_filename), tmp_dir) data_table_entries = _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ) for data_table_name, data_table_entry in data_table_entries: if data_table_entry: _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name ) + def _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name ): data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} ) data_manager_dict['data_tables'][data_table_name] = data_manager_dict['data_tables'].get( 'all_fasta', [] ) data_manager_dict['data_tables'][data_table_name].append( data_table_entry ) return data_manager_dict + def _stream_fasta_to_file( fasta_stream, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params, close_stream=True ): fasta_base_filename = "%s.fa" % sequence_id fasta_filename = os.path.join( target_directory, fasta_base_filename ) - fasta_writer = open( fasta_filename, 'wb+' ) - - if isinstance( fasta_stream, list ) and len( fasta_stream ) == 1: - fasta_stream = fasta_stream[0] - - if isinstance( fasta_stream, list ): - last_char = None - for fh in fasta_stream: - if last_char not in [ None, '\n', '\r' ]: - fasta_writer.write( '\n' ) + with open( fasta_filename, 'wb+' ) as fasta_writer: + + if isinstance( fasta_stream, list ) and len( fasta_stream ) == 1: + fasta_stream = fasta_stream[0] + + if isinstance( fasta_stream, list ): + last_char = None + for fh in fasta_stream: + if last_char not in [ None, '\n', '\r', b'\n', b'\r' ]: + fasta_writer.write( b'\n' ) + while True: + data = fh.read( CHUNK_SIZE ) + if data: + fasta_writer.write( data ) + last_char = data[-1] + else: + break + if close_stream: + fh.close() + else: while True: - data = fh.read( CHUNK_SIZE ) + data = fasta_stream.read( CHUNK_SIZE ) if data: fasta_writer.write( data ) - last_char = data[-1] else: break if close_stream: - fh.close() - else: - while True: - data = fasta_stream.read( CHUNK_SIZE ) - if data: - fasta_writer.write( data ) - else: - break - if close_stream: - fasta_stream.close() - - fasta_writer.close() - + fasta_stream.close() + sort_fasta( fasta_filename, params['param_dict']['sorting']['sort_selector'], params ) dbkey_dict = None @@ -335,6 +368,7 @@ return [ ( '__dbkeys__', dbkey_dict ), ( 'all_fasta', dict( value=sequence_id, dbkey=dbkey, name=sequence_name, path=fasta_base_filename ) ) ] + def compute_fasta_length( fasta_file, out_file, keep_first_word=False ): infile = fasta_file @@ -367,6 +401,7 @@ out.write( "%s\t%d\n" % ( fasta_title[ 1: ], seq_len ) ) out.close() + def _create_symlink( input_filename, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ): fasta_base_filename = "%s.fa" % sequence_id fasta_filename = os.path.join( target_directory, fasta_base_filename ) @@ -382,12 +417,11 @@ return [ ( '__dbkeys__', dbkey_dict ), ( 'all_fasta', dict( value=sequence_id, dbkey=dbkey, name=sequence_name, path=fasta_base_filename ) ) ] - - REFERENCE_SOURCE_TO_DOWNLOAD = dict( ucsc=download_from_ucsc, ncbi=download_from_ncbi, url=download_from_url, history=download_from_history, directory=copy_from_directory ) SORTING_METHODS = dict( as_is=_sort_fasta_as_is, lexicographical=_sort_fasta_lexicographical, gatk=_sort_fasta_gatk, custom=_sort_fasta_custom ) + def main(): #Parse Command Line parser = optparse.OptionParser() @@ -405,11 +439,16 @@ if dbkey in [ None, '', '?' ]: raise Exception( '"%s" is not a valid dbkey. You must specify a valid dbkey.' % ( dbkey ) ) - + + # Create a tmp_dir, in case a zip file needs to be uncompressed + tmp_dir = tempfile.mkdtemp() #Fetch the FASTA - REFERENCE_SOURCE_TO_DOWNLOAD[ params['param_dict']['reference_source']['reference_source_selector'] ]( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ) - + try: + REFERENCE_SOURCE_TO_DOWNLOAD[ params['param_dict']['reference_source']['reference_source_selector'] ]( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ) + finally: + cleanup_before_exit(tmp_dir) #save info to json file - open( filename, 'wb' ).write( dumps( data_manager_dict ) ) + open( filename, 'wb' ).write( dumps( data_manager_dict ).encode() ) -if __name__ == "__main__": main() +if __name__ == "__main__": + main() diff -r a943a7e330cb -r b7d514d57e02 data_manager/data_manager_fetch_genome_all_fasta_dbkeys.xml --- a/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.xml Thu Oct 15 13:37:01 2015 -0400 +++ b/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.xml Thu Jan 19 11:47:19 2017 -0500 @@ -1,86 +1,77 @@ - + fetching - data_manager_fetch_genome_all_fasta_dbkeys.py "${out_file}" - #if str( $dbkey_source.dbkey_source_selector ) == 'existing': - --dbkey_description ${ dbkey_source.dbkey.get_display_text() } - #else - --dbkey_description "${ dbkey_source.dbkey_name or $dbkey_source.dbkey }" - #end if - - + - - - - - - - - - - - + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + - - - - - - - - + @@ -110,7 +101,7 @@ .. class:: infomark -**Notice:** If you leave name, description, or id blank, it will be generated automatically. +**Notice:** If you leave name, description, or id blank, it will be generated automatically. diff -r a943a7e330cb -r b7d514d57e02 test-data/test.tar Binary file test-data/test.tar has changed diff -r a943a7e330cb -r b7d514d57e02 test-data/test.tar.bz2 Binary file test-data/test.tar.bz2 has changed diff -r a943a7e330cb -r b7d514d57e02 test-data/test.tar.gz Binary file test-data/test.tar.gz has changed diff -r a943a7e330cb -r b7d514d57e02 test-data/test.zip Binary file test-data/test.zip has changed