changeset 1:29209aa4c829 draft

planemo upload for repository https://github.com/galaxyproject/tools-devteam/tree/master/data_managers/data_manager_fetch_genome_dbkeys_all_fasta commit 592acc3119624e0f213483d952b4cbd3b7e827bd-dirty
author mvdbeek
date Thu, 18 Aug 2016 06:09:29 -0400
parents 39fc40852a3b
children 0738889ff4d2
files data_manager/data_manager_fetch_genome_all_fasta_dbkeys.py data_manager/data_manager_fetch_genome_all_fasta_dbkeys.xml test-data/test.tar test-data/test.tar.bz2 test-data/test.tar.gz test-data/test.zip
diffstat 6 files changed, 120 insertions(+), 86 deletions(-)
--- a/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.py	Wed Aug 17 07:36:44 2016 -0400
+++ b/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.py	Thu Aug 18 06:09:29 2016 -0400
@@ -7,26 +7,29 @@
 import shutil
 import optparse
 import urllib2
-#import uuid
 from ftplib import FTP
 import tarfile
 import zipfile
 import gzip
 import bz2
+from StringIO import StringIO
 
 from json import loads, dumps
 
 
-CHUNK_SIZE = 2**20 #1mb
+CHUNK_SIZE = 2**20  # 1mb
+
 
 def cleanup_before_exit( tmp_dir ):
     if tmp_dir and os.path.exists( tmp_dir ):
         shutil.rmtree( tmp_dir )
 
+
 def stop_err(msg):
     sys.stderr.write(msg)
     sys.exit(1)
-    
+
+
 def get_dbkey_dbname_id_name( params, dbkey_description=None ):
     dbkey = params['param_dict']['dbkey_source']['dbkey']
     #TODO: ensure sequence_id is unique and does not already appear in location file
@@ -48,16 +51,23 @@
             sequence_name = dbkey
     return dbkey, dbkey_name, sequence_id, sequence_name
 
+
 def _get_files_in_ftp_path( ftp, path ):
     path_contents = []
     ftp.retrlines( 'MLSD %s' % ( path ), path_contents.append )
     return [ line.split( ';' )[ -1 ].lstrip() for line in path_contents ]
 
-def _get_stream_readers_for_tar( fh ):
+
+def _get_stream_readers_for_tar( fh, tmp_dir ):
     fasta_tar = tarfile.open( fileobj=fh, mode='r:*' )
     return filter( lambda x: x is not None, [ fasta_tar.extractfile( member ) for member in fasta_tar.getmembers() ] )
 
+
 def _get_stream_readers_for_zip( fh, tmp_dir ):
+    """
+    Unpacks all archived files in a zip file.
+    Individual files will be concatenated (in _stream_fasta_to_file)
+    """
     fasta_zip = zipfile.ZipFile( fh, 'r' )
     rval = []
     for member in fasta_zip.namelist():
@@ -65,11 +75,14 @@
         rval.append( open( os.path.join( tmp_dir, member ), 'rb' ) )
     return rval
 
-def _get_stream_readers_for_gzip( fh ):
-    return [ gzip.GzipFile( fileobj=fh, mode='rb' ) ]
+
+def _get_stream_readers_for_gzip( fh, tmp_dir ):
+    return [ gzip.GzipFile( fileobj=fh, mode='rb') ]
 
-def _get_stream_readers_for_bz2( fh ):
-    return [ bz2.BZ2Decompressor( fh.name, 'rb' ) ]
+
+def _get_stream_readers_for_bz2( fh, tmp_dir ):
+    return [ bz2.BZ2File( fh.name, 'rb') ]
+
 
 def sort_fasta( fasta_filename, sort_method, params ):
     if sort_method is None:
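
The gzip and bz2 readers above now accept a tmp_dir argument even though they never use it, so every reader shares one signature and the format-sniffing code can dispatch uniformly. A minimal sketch of that convention, under the same Python 2 assumptions as the tool (names here are illustrative, not part of the changeset):

import gzip
import bz2

def readers_for_gzip(fh, tmp_dir):
    # tmp_dir is deliberately ignored; gzip can stream from the file object
    return [gzip.GzipFile(fileobj=fh, mode='rb')]

def readers_for_bz2(fh, tmp_dir):
    # Python 2's bz2.BZ2File takes a path, not a file object, hence fh.name
    return [bz2.BZ2File(fh.name, 'rb')]

# Both return a list of file-like objects, so downstream code can iterate
# and concatenate the contents regardless of the archive format.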
@@ -77,6 +90,7 @@
     assert sort_method in SORTING_METHODS, ValueError( "%s is not a valid sorting option." % sort_method )
     return SORTING_METHODS[ sort_method ]( fasta_filename, params )
 
+
 def _move_and_index_fasta_for_sorting( fasta_filename ):
     unsorted_filename = tempfile.NamedTemporaryFile().name
     shutil.move( fasta_filename, unsorted_filename )
@@ -94,6 +108,7 @@
     current_order = map( lambda x: x[1], sorted( map( lambda x: ( x[1], x[0] ), fasta_offsets.items() ) ) )
     return ( unsorted_filename, fasta_offsets, current_order )
 
+
 def _write_sorted_fasta( sorted_names, fasta_offsets, sorted_fasta_filename, unsorted_fasta_filename ):
     unsorted_fh = open( unsorted_fasta_filename )
     sorted_fh = open( sorted_fasta_filename, 'wb+' )
@@ -110,6 +125,7 @@
     unsorted_fh.close()
     sorted_fh.close()
 
+
 def _sort_fasta_as_is( fasta_filename, params ):
     return
 
@@ -121,6 +137,7 @@
     else:
         _write_sorted_fasta( sorted_names, fasta_offsets, fasta_filename, unsorted_filename )    
 
+
 def _sort_fasta_gatk( fasta_filename, params ):
     #This method was added by reviewer request.
     ( unsorted_filename, fasta_offsets, current_order ) = _move_and_index_fasta_for_sorting( fasta_filename )
@@ -153,6 +170,7 @@
     else:
         _write_sorted_fasta( existing_sorted_names, fasta_offsets, fasta_filename, unsorted_filename )
 
+
 def _sort_fasta_custom( fasta_filename, params ):
     ( unsorted_filename, fasta_offsets, current_order ) = _move_and_index_fasta_for_sorting( fasta_filename )
     sorted_names = []
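
All of the sorters share one pattern: move the FASTA aside, record the byte offset of every record header, then stream the records back out in the desired order. A hedged sketch of the indexing half, inferred from _move_and_index_fasta_for_sorting and _write_sorted_fasta (simplified, Python 2 compatible):

def index_fasta_offsets(filename):
    # Map each sequence name to the byte offset of its '>' header line.
    offsets = {}
    fh = open(filename)
    offset = fh.tell()
    line = fh.readline()
    while line:
        if line.startswith('>'):
            offsets[line[1:].split()[0]] = offset
        offset = fh.tell()
        line = fh.readline()
    fh.close()
    return offsets

# The writer half can then seek to offsets[name] for each name in the
# sorted order and copy lines until the next '>' header.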
@@ -175,24 +193,42 @@
     else:
         _write_sorted_fasta( sorted_names, fasta_offsets, fasta_filename, unsorted_filename )
 
-def get_stream_reader(fh):
+
+def _download_file(start, fh):
+    tmp = tempfile.NamedTemporaryFile()
+    tmp.write(start)
+    tmp.write(fh.read())
+    tmp.flush()
+    tmp.seek(0)
+    return tmp
+
+
+def get_stream_reader(fh, tmp_dir):
+    """
+    Check if file is compressed and return correct stream reader.
+    If file has to be downloaded, do it now.
+    """
     magic_dict = {
         "\x1f\x8b\x08": _get_stream_readers_for_gzip,
         "\x42\x5a\x68": _get_stream_readers_for_bz2,
         "\x50\x4b\x03\x04": _get_stream_readers_for_zip,
-        "\x76\x2f\x31\x01": _get_stream_readers_for_tar
     }
     start_of_file = fh.read(CHUNK_SIZE)
     try:
         fh.seek(0)
     except AttributeError:  # This is if fh has been created by urllib2.urlopen
-        url = fh.url
-        fh = urllib2.urlopen(url)
-    for k,v in magic_dict:
+        fh = _download_file(start_of_file, fh)
+    for k,v in magic_dict.items():
         if start_of_file.startswith(k):
-            return v(fh)
+            return v(fh, tmp_dir)
+    try:  # Check if file is tar file
+        if tarfile.open(fileobj=StringIO(start_of_file)):
+            return _get_stream_readers_for_tar(fh, tmp_dir)
+    except tarfile.ReadError:
+        pass
     return fh
 
+
 def _get_ucsc_download_address(params, dbkey):
     """
     Check if we can find the correct file for the supplied dbkey on UCSC's FTP server
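
The reworked get_stream_reader proceeds in three steps: peek at the first CHUNK_SIZE bytes, match known magic numbers for gzip, bz2 and zip, and finally probe for tar (whose canonical magic is not at the start of the file, so a prefix match is not enough). Because a urllib2 response cannot seek back to the start, the already-consumed chunk plus the remaining body are first spooled to disk by _download_file. A condensed sketch of that flow (Python 2, matching the tool; simplified):

import tarfile
import tempfile
from StringIO import StringIO

def sniff(fh, chunk_size=2 ** 20):
    magic = {"\x1f\x8b\x08": "gzip",
             "\x42\x5a\x68": "bz2",
             "\x50\x4b\x03\x04": "zip"}
    start = fh.read(chunk_size)
    try:
        fh.seek(0)
    except AttributeError:
        # Non-seekable stream (e.g. urllib2): spool everything to disk first.
        tmp = tempfile.NamedTemporaryFile()
        tmp.write(start)
        tmp.write(fh.read())
        tmp.flush()
        tmp.seek(0)
        fh = tmp
    for prefix, fmt in magic.items():
        if start.startswith(prefix):
            return fmt, fh
    try:
        tarfile.open(fileobj=StringIO(start))  # raises ReadError if not tar
        return "tar", fh
    except tarfile.ReadError:
        return "plain", fh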
@@ -213,115 +249,107 @@
 
     ucsc_path = UCSC_DOWNLOAD_PATH % ucsc_dbkey
     path_contents = _get_files_in_ftp_path(ftp, ucsc_path)
+    ftp.close()
 
     for ucsc_chrom_fa_filename in UCSC_CHROM_FA_FILENAMES:
         for ext in COMPRESSED_EXTENSIONS:
             if "%s%s" % (ucsc_chrom_fa_filename, ext) in path_contents:
                 ucsc_file_name = "%s%s%s" % (ucsc_path, ucsc_chrom_fa_filename, ext)
-                return "ftp://%s" % ucsc_file_name
+                return "ftp://%s%s" % (UCSC_FTP_SERVER, ucsc_file_name)
 
     raise Exception('Unable to determine filename for UCSC Genome for %s: %s' % (ucsc_dbkey, path_contents))
 
-
-def download_from_ucsc( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ):
-
-    # TODO: may need to think of a good way to merge multiple fasta files if necessary
-    url = _get_ucsc_download_address(params, dbkey)
-    fasta_readers = get_stream_reader( urllib2.urlopen(url) )
-    
+def add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params):
     for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ):
         if data_table_entry:
             _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name )
 
-def download_from_ncbi( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ):
+
+def download_from_ucsc( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ):
+    url = _get_ucsc_download_address(params, dbkey)
+    fasta_readers = get_stream_reader(urllib2.urlopen(url), tmp_dir)
+    add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params)
+
+
+def download_from_ncbi( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ):
     NCBI_DOWNLOAD_URL = 'http://togows.dbcls.jp/entry/ncbi-nucleotide/%s.fasta' #FIXME: taken from dave's genome manager...why some japan site?
-    
     requested_identifier = params['param_dict']['reference_source']['requested_identifier']
     url = NCBI_DOWNLOAD_URL % requested_identifier
-    fasta_readers = get_stream_reader(urllib2.urlopen( url ))
-    
-    for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ):
-        if data_table_entry:
-            _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name )
+    fasta_readers = get_stream_reader(urllib2.urlopen(url), tmp_dir)
+    add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params)
+
 
-def download_from_url( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ):
-    #TODO: we should automatically do decompression here
+def download_from_url( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ):
     urls = filter( bool, map( lambda x: x.strip(), params['param_dict']['reference_source']['user_url'].split( '\n' ) ) )
-    fasta_readers = [ get_stream_reader(urllib2.urlopen( url )) for url in urls ]
-    
-    for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ):
-        if data_table_entry:
-            _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name )
+    fasta_readers = [ get_stream_reader(urllib2.urlopen( url ), tmp_dir) for url in urls ]
+    add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id,sequence_name, params)
 
-def download_from_history( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ):
+
+def download_from_history( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ):
     #TODO: allow multiple FASTA input files
     input_filename = params['param_dict']['reference_source']['input_fasta']
     if isinstance( input_filename, list ):
-        fasta_readers = [ get_stream_reader(open( filename, 'rb' )) for filename in input_filename ]
+        fasta_readers = [ get_stream_reader(open(filename, 'rb'), tmp_dir) for filename in input_filename ]
     else:
-        fasta_readers = get_stream_reader(open( input_filename ))
-    
-    for data_table_name, data_table_entry in _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params ):
-        if data_table_entry:
-            _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name )
+        fasta_readers = get_stream_reader(open(input_filename), tmp_dir)
+    add_fasta_to_table(data_manager_dict, fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params)
 
-def copy_from_directory( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ):
+
+def copy_from_directory( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir ):
     input_filename = params['param_dict']['reference_source']['fasta_filename']
     create_symlink = params['param_dict']['reference_source']['create_symlink'] == 'create_symlink'
     if create_symlink:
         data_table_entries = _create_symlink( input_filename, target_directory, dbkey, dbkey_name, sequence_id, sequence_name )
     else:
         if isinstance( input_filename, list ):
-            fasta_readers = [ get_stream_reader(open( filename, 'rb' )) for filename in input_filename ]
+            fasta_readers = [ get_stream_reader(open(filename, 'rb'), tmp_dir) for filename in input_filename ]
         else:
-            fasta_readers = get_stream_reader(open( input_filename ))
+            fasta_readers = get_stream_reader(open(input_filename), tmp_dir)
         data_table_entries = _stream_fasta_to_file( fasta_readers, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params )
     for data_table_name, data_table_entry in data_table_entries:
         if data_table_entry:
             _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name )
 
+
 def _add_data_table_entry( data_manager_dict, data_table_entry, data_table_name ):
     data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} )
     data_manager_dict['data_tables'][data_table_name] = data_manager_dict['data_tables'].get( 'all_fasta', [] )
     data_manager_dict['data_tables'][data_table_name].append( data_table_entry )
     return data_manager_dict
 
+
 def _stream_fasta_to_file( fasta_stream, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, params, close_stream=True ):
     fasta_base_filename = "%s.fa" % sequence_id
     fasta_filename = os.path.join( target_directory, fasta_base_filename )
-    fasta_writer = open( fasta_filename, 'wb+' )
+    with open( fasta_filename, 'wb+' ) as fasta_writer:
 
-    if isinstance( fasta_stream, list ) and len( fasta_stream ) == 1:
-        fasta_stream = fasta_stream[0]
+        if isinstance( fasta_stream, list ) and len( fasta_stream ) == 1:
+            fasta_stream = fasta_stream[0]
 
-    if isinstance( fasta_stream, list ):
-        last_char = None
-        for fh in fasta_stream:
-            if last_char not in [ None, '\n', '\r' ]:
-                fasta_writer.write( '\n' )
+        if isinstance( fasta_stream, list ):
+            last_char = None
+            for fh in fasta_stream:
+                if last_char not in [ None, '\n', '\r' ]:
+                    fasta_writer.write( '\n' )
+                while True:
+                    data = fh.read( CHUNK_SIZE )
+                    if data:
+                        fasta_writer.write( data )
+                        last_char = data[-1]
+                    else:
+                        break
+                if close_stream:
+                    fh.close()
+        else:
             while True:
-                data = fh.read( CHUNK_SIZE )
+                data = fasta_stream.read( CHUNK_SIZE )
                 if data:
                     fasta_writer.write( data )
-                    last_char = data[-1]
                 else:
                     break
             if close_stream:
-                fh.close()
-    else:
-        while True:
-            data = fasta_stream.read( CHUNK_SIZE )
-            if data:
-                fasta_writer.write( data )
-            else:
-                break
-        if close_stream:
-            fasta_stream.close()
+                fasta_stream.close()
 
-    fasta_writer.close()
-
-
-    
     sort_fasta( fasta_filename, params['param_dict']['sorting']['sort_selector'], params )
     
     dbkey_dict = None
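
The refactored _stream_fasta_to_file copies each reader in CHUNK_SIZE pieces and inserts a newline between inputs only when the previous stream did not already end with one, so concatenated FASTA records cannot run together. The core of that rule as a small standalone sketch (illustrative names):

CHUNK_SIZE = 2 ** 20

def concat_streams(streams, writer):
    last_char = None
    for fh in streams:
        # Separate files unless the previous one already ended in a newline.
        if last_char not in (None, '\n', '\r'):
            writer.write('\n')
        while True:
            data = fh.read(CHUNK_SIZE)
            if not data:
                break
            writer.write(data)
            last_char = data[-1]
        fh.close()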
@@ -333,6 +361,7 @@
     
     return [ ( '__dbkeys__', dbkey_dict ), ( 'all_fasta', dict( value=sequence_id, dbkey=dbkey, name=sequence_name, path=fasta_base_filename ) ) ]
 
+
 def compute_fasta_length( fasta_file, out_file, keep_first_word=False ):
 
     infile = fasta_file
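
compute_fasta_length (largely elided in this hunk) reduces the FASTA to a two-column "title<TAB>length" file, presumably feeding the dbkey's length (.len) entry. A minimal sketch consistent with the visible fragments (hedged; not the verbatim implementation):

def fasta_lengths(fasta_path, out_path, keep_first_word=False):
    out = open(out_path, 'w')
    title, seq_len = None, 0
    for line in open(fasta_path):
        line = line.strip()
        if line.startswith('>'):
            if title is not None:
                out.write("%s\t%d\n" % (title, seq_len))
            title = line[1:].split()[0] if keep_first_word else line[1:]
            seq_len = 0
        elif title is not None:
            seq_len += len(line)
    if title is not None:
        out.write("%s\t%d\n" % (title, seq_len))
    out.close()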
@@ -365,6 +394,7 @@
     out.write( "%s\t%d\n" % ( fasta_title[ 1: ], seq_len ) )
     out.close()
 
+
 def _create_symlink( input_filename, target_directory, dbkey, dbkey_name, sequence_id, sequence_name ):
     fasta_base_filename = "%s.fa" % sequence_id
     fasta_filename = os.path.join( target_directory, fasta_base_filename )
@@ -380,12 +410,11 @@
     return [ ( '__dbkeys__', dbkey_dict ), ( 'all_fasta', dict( value=sequence_id, dbkey=dbkey, name=sequence_name, path=fasta_base_filename ) ) ]
 
 
-
-
 REFERENCE_SOURCE_TO_DOWNLOAD = dict( ucsc=download_from_ucsc, ncbi=download_from_ncbi, url=download_from_url, history=download_from_history, directory=copy_from_directory )
 
 SORTING_METHODS = dict( as_is=_sort_fasta_as_is, lexicographical=_sort_fasta_lexicographical, gatk=_sort_fasta_gatk, custom=_sort_fasta_custom )
 
+
 def main():
     #Parse Command Line
     parser = optparse.OptionParser()
@@ -403,11 +432,16 @@
     
     if dbkey in [ None, '', '?' ]:
         raise Exception( '"%s" is not a valid dbkey. You must specify a valid dbkey.' % ( dbkey ) )
-    
+
+    # Create a tmp_dir, in case a zip file needs to be uncompressed
+    tmp_dir = tempfile.mkdtemp()
     #Fetch the FASTA
-    REFERENCE_SOURCE_TO_DOWNLOAD[ params['param_dict']['reference_source']['reference_source_selector'] ]( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name )
-    
+    try:
+        REFERENCE_SOURCE_TO_DOWNLOAD[ params['param_dict']['reference_source']['reference_source_selector'] ]( data_manager_dict, params, target_directory, dbkey, dbkey_name, sequence_id, sequence_name, tmp_dir )
+    finally:
+        cleanup_before_exit(tmp_dir)
     #save info to json file
     open( filename, 'wb' ).write( dumps( data_manager_dict ) )
         
-if __name__ == "__main__": main()
+if __name__ == "__main__":
+    main()
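
main() finishes by serializing data_manager_dict into the JSON file that Galaxy passes on the command line. Based on the table tuples returned above ('__dbkeys__' and 'all_fasta'), the written structure plausibly looks like the following Python/JSON literal (values illustrative; the exact '__dbkeys__' columns are not visible in this hunk):

{
    "data_tables": {
        "__dbkeys__": [
            {"value": "hg19", "name": "Human (hg19)", "len_path": "hg19.len"}
        ],
        "all_fasta": [
            {"value": "hg19", "dbkey": "hg19", "name": "Human (hg19)",
             "path": "hg19.fa"}
        ]
    }
}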
--- a/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.xml	Wed Aug 17 07:36:44 2016 -0400
+++ b/data_manager/data_manager_fetch_genome_all_fasta_dbkeys.xml	Thu Aug 18 06:09:29 2016 -0400
@@ -1,13 +1,13 @@
-<tool id="data_manager_fetch_genome_all_fasta_dbkey" name="Create DBKey and Reference Genome" version="0.0.1" tool_type="manage_data">
+<tool id="data_manager_fetch_genome_all_fasta_dbkey" name="Create DBKey and Reference Genome" version="0.0.2" tool_type="manage_data">
     <description>fetching</description>
-    <command interpreter="python">data_manager_fetch_genome_all_fasta_dbkeys.py "${out_file}" 
-    #if str( $dbkey_source.dbkey_source_selector ) == 'existing':
-    --dbkey_description ${ dbkey_source.dbkey.get_display_text() }
-    #else
-    --dbkey_description "${ dbkey_source.dbkey_name or $dbkey_source.dbkey }"
-    #end if
-    
-    </command>
+    <command><![CDATA[
+       python "$__tool_directory__"/data_manager_fetch_genome_all_fasta_dbkeys.py "${out_file}"
+       #if str( $dbkey_source.dbkey_source_selector ) == 'existing':
+       --dbkey_description ${ dbkey_source.dbkey.get_display_text() }
+       #else
+       --dbkey_description "${ dbkey_source.dbkey_name or $dbkey_source.dbkey }"
+       #end if
+    ]]></command>
     <inputs>
         <conditional name="dbkey_source">
           <param name="dbkey_source_selector" type="select" label="Use existing dbkey or create a new one.">
@@ -37,7 +37,7 @@
             <param type="text" name="requested_dbkey" value="" label="UCSC's DBKEY for source FASTA" optional="False" />
           </when>
           <when value="ncbi">
-            <param type="text" name="requested_identifier" value="" label="NCBI identifier" optional="False" />
+              <param type="text" name="requested_identifier" value="" label="NCBI identifier/accession" help="Identifiers (e.g 667699573) or accessions (e.g AC020606.7) may be used" optional="False" />
           </when>
           <when value="url">
             <param type="text" area="True" name="user_url" value="http://" label="URLs" optional="False" />
Binary file test-data/test.tar has changed
Binary file test-data/test.tar.bz2 has changed
Binary file test-data/test.tar.gz has changed
Binary file test-data/test.zip has changed