changeset 4:96103d66b7af
Properly handle extra data paths and write complete line-separated JSON for Galaxy to scoop up and set metadata.
author   | Matt Shirley <mdshw5@gmail.com>
date     | Wed, 02 Jul 2014 09:33:03 -0400
parents  | 988f34ef5c9f
children | 33fa019735a4
files    | json_data_source.py
diffstat | 1 files changed, 89 insertions(+), 29 deletions(-)
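The description above mentions writing line-separated JSON for Galaxy. A minimal sketch of what one such metadata line might look like — every value here is invented for illustration, but the keys mirror the meta_dict built by the new metadata_to_json() helper in the diff below:

    import json

    # Hypothetical values; keys match meta_dict in metadata_to_json() below.
    example = dict( type = 'dataset',
                    ext = 'bed',
                    filename = '/galaxy/jobs/42/outputs/primary_7_mydata_visible_bed',
                    name = 'mydata',
                    metadata = { 'dbkey': 'hg19' },
                    dataset_id = 101 )
    print( json.dumps( example ) )  # one complete JSON object per line

Galaxy reads the metadata file one line at a time, which is why metadata_to_json() appends a newline to each serialized object instead of dumping one large JSON list.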
line diff
--- a/json_data_source.py	Sun Jun 29 11:16:20 2014 -0400
+++ b/json_data_source.py	Wed Jul 02 09:33:03 2014 -0400
@@ -3,8 +3,11 @@
 import optparse
 import urllib
 import os.path
+import os
+from operator import itemgetter
 
 CHUNK_SIZE = 2**20 #1mb
+VALID_CHARS = '.-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ '
 
 
 def chunk_write( source_stream, target_stream, source_method = "read", target_method="write" ):
@@ -44,56 +47,113 @@
 
 def download_extra_data( query_ext_data, base_path ):
+    """ Download any extra data defined in the JSON.
+    NOTE: the "path" value is a relative path to the file on our
+    file system. This is slightly dangerous and we should make every effort
+    to avoid a malicious absolute path to write the file elsewhere on the
+    filesystem.
+    """
     for ext_data in query_ext_data:
+        if not os.path.exists( base_path ):
+            os.mkdir( base_path )
         query_stream = urllib.urlopen( ext_data.get( 'url' ) )
-        output_stream = open( os.path.normpath( '/'.join( [ base_path, extra_item.get( 'path' ) ] ) ), 'wb' )
+        ext_path = ext_data.get( 'path' )
+        os.makedirs( os.path.normpath( '/'.join( [ base_path, os.path.dirname( ext_path ) ] ) ) )
+        output_stream = open( os.path.normpath( '/'.join( [ base_path, ext_path ] ) ), 'wb' )
         chunk_write( query_stream, output_stream )
         query_stream.close()
         output_stream.close()
 
 
+def metadata_to_json( dataset_id, metadata, filename, ds_type='dataset', primary=False):
+    """ Return line separated JSON """
+    meta_dict = dict( type = ds_type,
+                      ext = metadata.get( 'extension' ),
+                      filename = filename,
+                      name = metadata.get( 'name' ),
+                      metadata = metadata.get( 'metadata' ) )
+    if metadata.get( 'extra_data', None ):
+        meta_dict[ 'extra_data' ] = '_'.join( [ filename, 'files' ] )
+    if primary:
+        meta_dict[ 'base_dataset_id' ] = dataset_id
+    else:
+        meta_dict[ 'dataset_id' ] = dataset_id
+    return "%s\n" % json.dumps( meta_dict )
+
+
+def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary):
+    """ Main work function that operates on the JSON representation of
+    one dataset and its metadata. Returns True.
+    """
+    dataset_url, output_filename, \
+        extra_files_path, file_name, \
+        ext, out_data_name, \
+        hda_id, dataset_id = set_up_config_values(json_params)
+    extension = query_item.get( 'extension' )
+    filename = query_item.get( 'url' )
+    extra_data = query_item.get( 'extra_data', None )
+    if primary:
+        filename = ''.join( c in VALID_CHARS and c or '-' for c in filename )
+        name = construct_multi_filename( hda_id, filename, extension )
+        target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) )
+    else:
+        target_output_filename = output_filename
+    download_from_query( query_item, target_output_filename )
+    if extra_data:
+        download_extra_data( extra_data, '_'.join( [ target_output_filename, 'files' ] ) )
+    metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
+                                                     target_output_filename,
+                                                     primary=primary) )
+    return True
+
+
+def set_up_config_values(json_params):
+    """ Parse json_params file and return a tuple of necessary configuration
+    values.
+    """
+    datasource_params = json_params.get( 'param_dict' )
+    dataset_url = datasource_params.get( 'URL' )
+    output_filename = datasource_params.get( 'output1', None )
+    output_data = json_params.get( 'output_data' )
+    extra_files_path, file_name, ext, out_data_name, hda_id, dataset_id = \
+        itemgetter('extra_files_path', 'file_name', 'ext', 'out_data_name', 'hda_id', 'dataset_id')(output_data[0])
+    return (dataset_url, output_filename,
+            extra_files_path, file_name,
+            ext, out_data_name,
+            hda_id, dataset_id)
+
+
 def download_from_json_data( options, args ):
     """ Parse the returned JSON data and download files. Write metadata
     to flat JSON file.
     """
+    output_base_path = options.path
+    # read tool job configuration file and parse parameters we need
     json_params = json.loads( open( options.json_param_file, 'r' ).read() )
-    datasource_params = json_params.get( 'param_dict' )
-    dataset_id = base_dataset_id = json_params['output_data'][0]['dataset_id']
-    hda_id = json_params['output_data'][0]['hda_id']
-    dataset_url = json_params['URL']
-
-    output_filename = datasource_params.get( "output1", None )
-    output_base_path = options.path
+    dataset_url, output_filename, \
+        extra_files_path, file_name, \
+        ext, out_data_name, \
+        hda_id, dataset_id = set_up_config_values(json_params)
+    # line separated JSON file to contain all dataset metadata
+    metadata_parameter_file = open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' )
 
     # get JSON response from data source
     # TODO: make sure response is not enormous
     query_params = json.loads(urllib.urlopen( dataset_url ).read())
-    metadata_to_write = []
     # download and write files
+    primary = False
     for query_item in query_params:
         if isinstance( query_item, list ):
-            # do something with the nested list as a collection
+            # TODO: do something with the nested list as a collection
             for query_subitem in query_item:
-                multi_name = construct_multi_filename( hda_id, output_filename, extension )
-                target_output_filename = os.path.normpath( '/'.join( [ output_base_path, multi_name ] ) )
-                download_from_query( query_subitem, target_output_filename )
-                if query_item.get( 'extra_data' ):
-                    download_extra_data( query_item.get( 'extra_data' ), output_base_path )
-                metadata_to_write.append( query_item )
+                primary = download_files_and_write_metadata(query_subitem, json_params, output_base_path,
+                                                            metadata_parameter_file, primary)
 
         elif isinstance( query_item, dict ):
-            # what is the difference between hda_id and dataset_id?
-            multi_name = construct_multi_filename( hda_id, output_filename, extension )
-            target_output_filename = os.path.normpath( [ '/'.join( output_base_path, multi_name ) ] )
-            download_from_query( query_item, target_output_filename )
-            if query_item.get( 'extra_data' ):
-                download_extra_data( query_item.get( 'extra_data' ), output_base_path )
-            metadata_to_write.append( query_item )
-
-    with open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' ) as metadata_parameter_file:
-        # write JSON metadata from flattened list
-        metadata_parameter_file.write( json.dumps( metadata_to_write ) )
-
+            primary = download_files_and_write_metadata(query_item, json_params, output_base_path,
+                                                        metadata_parameter_file, primary)
+    metadata_parameter_file.close()
 
 def __main__():
     """ Read the JSON return from a data source.
     Parse each line and request
@@ -119,7 +179,7 @@
     parser.add_option("-j", "--json_param_file", type="string",
                       action="store", dest="json_param_file", help="json schema return data")
     parser.add_option("-p", "--path", type="string",
-                      action="store", dest="newfilepath", help="new file path")
+                      action="store", dest="path", help="new file path")
     (options, args) = parser.parse_args()
     download_from_json_data( options, args )
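For context, a hedged sketch of the inputs this script consumes after this changeset. Galaxy normally generates the job-parameters file itself; every path and id below is invented, but the keys mirror exactly what set_up_config_values() and download_from_json_data() read:

    import json

    # Hypothetical job-parameters file (Galaxy writes the real one).
    json_params = {
        'param_dict': { 'URL': 'https://example.org/api/datasets',
                        'output1': '/galaxy/jobs/42/outputs/dataset_101.dat' },
        'output_data': [ { 'extra_files_path': '/galaxy/files/dataset_101_files',
                           'file_name': '/galaxy/files/dataset_101.dat',
                           'ext': 'data',
                           'out_data_name': 'output1',
                           'hda_id': 7,
                           'dataset_id': 101 } ],
        'job_config': { 'TOOL_PROVIDED_JOB_METADATA_FILE': '/galaxy/jobs/42/galaxy.json' },
    }
    with open( 'params.json', 'w' ) as fh:
        fh.write( json.dumps( json_params ) )

With such a file in place, the tool would be invoked along these lines:

    python json_data_source.py --json_param_file params.json --path /galaxy/jobs/42/outputs

Note the dest="path" fix in the last hunk: download_from_json_data() reads options.path, which under the old dest="newfilepath" would have raised AttributeError.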