comparison json_data_source.py @ 4:96103d66b7af

Properly handle extra data paths, write complete line separated JSON for Galaxy to scoop up and set metadata.
author Matt Shirley <mdshw5@gmail.com>
date Wed, 02 Jul 2014 09:33:03 -0400
parents 988f34ef5c9f
children 33fa019735a4
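
Context for the change: Galaxy reads dataset metadata for a job from its TOOL_PROVIDED_JOB_METADATA_FILE, one JSON object per line. Based on the metadata_to_json() helper added below, each written line would look roughly like this (field values are illustrative):

    {"type": "dataset", "ext": "bed", "filename": "/path/to/output.dat", "name": "my dataset", "metadata": {}, "dataset_id": 42}
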
@@ -1,12 +1,15 @@
 #!/usr/bin/env python
 import json
 import optparse
 import urllib
 import os.path
+import os
+from operator import itemgetter
 
 CHUNK_SIZE = 2**20 #1mb
+VALID_CHARS = '.-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ '
 
 
 def chunk_write( source_stream, target_stream, source_method = "read", target_method="write" ):
     source_method = getattr( source_stream, source_method )
     target_method = getattr( target_stream, target_method )
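
VALID_CHARS is the whitelist that the new download_files_and_write_metadata() (further down) uses to sanitize the filenames of multi-output datasets. A minimal sketch of that and/or expression in isolation, with a hypothetical input URL:

    VALID_CHARS = '.-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ '
    filename = 'http://example.org/my data.bed'
    # any character outside the whitelist collapses to '-'
    print ''.join( c in VALID_CHARS and c or '-' for c in filename )
    # -> http---example.org-my data.bed
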
@@ -42,11 +45,21 @@
     query_stream.close()
     output_stream.close()
 
 
 def download_extra_data( query_ext_data, base_path ):
+    """ Download any extra data defined in the JSON.
+    NOTE: the "path" value is a relative path to the file on our
+    file system. This is slightly dangerous and we should make every effort
+    to avoid a malicious absolute path to write the file elsewhere on the
+    filesystem.
+    """
     for ext_data in query_ext_data:
+        if not os.path.exists( base_path ):
+            os.mkdir( base_path )
         query_stream = urllib.urlopen( ext_data.get( 'url' ) )
-        output_stream = open( os.path.normpath( '/'.join( [ base_path, extra_item.get( 'path' ) ] ) ), 'wb' )
+        ext_path = ext_data.get( 'path' )
+        os.makedirs( os.path.normpath( '/'.join( [ base_path, os.path.dirname( ext_path ) ] ) ) )
+        output_stream = open( os.path.normpath( '/'.join( [ base_path, ext_path ] ) ), 'wb' )
         chunk_write( query_stream, output_stream )
         query_stream.close()
         output_stream.close()
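
Two review notes on this hunk. First, the new docstring names the traversal risk, but os.path.normpath alone does not reject a malicious "path" containing ".." components; a hypothetical containment check, not part of this changeset, could look like:

    import os.path

    def is_contained( base_path, ext_path ):
        # normalize the joined path and require that it stay under base_path
        target = os.path.normpath( '/'.join( [ base_path, ext_path ] ) )
        return target.startswith( os.path.normpath( base_path ) + '/' )

Second, os.makedirs() raises OSError when the directory already exists, so two extra-data entries that share a parent directory would make the second one fail.
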
@@ -53,2 +66,60 @@
+
+
+def metadata_to_json( dataset_id, metadata, filename, ds_type='dataset', primary=False):
+    """ Return line separated JSON """
+    meta_dict = dict( type = ds_type,
+                      ext = metadata.get( 'extension' ),
+                      filename = filename,
+                      name = metadata.get( 'name' ),
+                      metadata = metadata.get( 'metadata' ) )
+    if metadata.get( 'extra_data', None ):
+        meta_dict[ 'extra_data' ] = '_'.join( [ filename, 'files' ] )
+    if primary:
+        meta_dict[ 'base_dataset_id' ] = dataset_id
+    else:
+        meta_dict[ 'dataset_id' ] = dataset_id
+    return "%s\n" % json.dumps( meta_dict )
+
+
+def download_files_and_write_metadata(query_item, json_params, output_base_path, metadata_parameter_file, primary):
+    """ Main work function that operates on the JSON representation of
+    one dataset and its metadata. Returns True.
+    """
+    dataset_url, output_filename, \
+        extra_files_path, file_name, \
+        ext, out_data_name, \
+        hda_id, dataset_id = set_up_config_values(json_params)
+    extension = query_item.get( 'extension' )
+    filename = query_item.get( 'url' )
+    extra_data = query_item.get( 'extra_data', None )
+    if primary:
+        filename = ''.join( c in VALID_CHARS and c or '-' for c in filename )
+        name = construct_multi_filename( hda_id, filename, extension )
+        target_output_filename = os.path.normpath( '/'.join( [ output_base_path, name ] ) )
+    else:
+        target_output_filename = output_filename
+    download_from_query( query_item, target_output_filename )
+    if extra_data:
+        download_extra_data( extra_data, '_'.join( [ target_output_filename, 'files' ] ) )
+    metadata_parameter_file.write( metadata_to_json( dataset_id, query_item,
+                                                     target_output_filename,
+                                                     primary=primary) )
+    return True
+
+
+def set_up_config_values(json_params):
+    """ Parse json_params file and return a tuple of necessary configuration
+    values.
+    """
+    datasource_params = json_params.get( 'param_dict' )
+    dataset_url = datasource_params.get( 'URL' )
+    output_filename = datasource_params.get( 'output1', None )
+    output_data = json_params.get( 'output_data' )
+    extra_files_path, file_name, ext, out_data_name, hda_id, dataset_id = \
+        itemgetter('extra_files_path', 'file_name', 'ext', 'out_data_name', 'hda_id', 'dataset_id')(output_data[0])
+    return (dataset_url, output_filename,
+            extra_files_path, file_name,
+            ext, out_data_name,
+            hda_id, dataset_id)
 
 
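
The new set_up_config_values() leans on operator.itemgetter with several keys, which returns the looked-up values as a tuple in argument order. A quick sketch against a made-up dict:

    from operator import itemgetter

    output_data = { 'hda_id': 1, 'dataset_id': 2, 'ext': 'bed' }
    # itemgetter with multiple keys returns a tuple, here (1, 2, 'bed')
    hda_id, dataset_id, ext = itemgetter( 'hda_id', 'dataset_id', 'ext' )( output_data )
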
@@ -55,47 +126,36 @@
 def download_from_json_data( options, args ):
     """ Parse the returned JSON data and download files. Write metadata
     to flat JSON file.
     """
+    output_base_path = options.path
+    # read tool job configuration file and parse parameters we need
     json_params = json.loads( open( options.json_param_file, 'r' ).read() )
-    datasource_params = json_params.get( 'param_dict' )
-    dataset_id = base_dataset_id = json_params['output_data'][0]['dataset_id']
-    hda_id = json_params['output_data'][0]['hda_id']
-    dataset_url = json_params['URL']
-
-    output_filename = datasource_params.get( "output1", None )
-    output_base_path = options.path
+    dataset_url, output_filename, \
+        extra_files_path, file_name, \
+        ext, out_data_name, \
+        hda_id, dataset_id = set_up_config_values(json_params)
+    # line separated JSON file to contain all dataset metadata
+    metadata_parameter_file = open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' )
 
     # get JSON response from data source
     # TODO: make sure response is not enormous
     query_params = json.loads(urllib.urlopen( dataset_url ).read())
-    metadata_to_write = []
     # download and write files
+    primary = False
+    # query_item, hda_id, output_base_path, dataset_id
     for query_item in query_params:
         if isinstance( query_item, list ):
-            # do something with the nested list as a collection
+            # TODO: do something with the nested list as a collection
            for query_subitem in query_item:
-                multi_name = construct_multi_filename( hda_id, output_filename, extension )
-                target_output_filename = os.path.normpath( '/'.join( [ output_base_path, multi_name ] ) )
-                download_from_query( query_subitem, target_output_filename )
-                if query_item.get( 'extra_data' ):
-                    download_extra_data( query_item.get( 'extra_data' ), output_base_path )
-                metadata_to_write.append( query_item )
+                primary = download_files_and_write_metadata(query_subitem, json_params, output_base_path,
+                                                            metadata_parameter_file, primary)
 
         elif isinstance( query_item, dict ):
-            # what is the difference between hda_id and dataset_id?
-            multi_name = construct_multi_filename( hda_id, output_filename, extension )
-            target_output_filename = os.path.normpath( [ '/'.join( output_base_path, multi_name ) ] )
-            download_from_query( query_item, target_output_filename )
-            if query_item.get( 'extra_data' ):
-                download_extra_data( query_item.get( 'extra_data' ), output_base_path )
-            metadata_to_write.append( query_item )
-
-    with open( json_params['job_config']['TOOL_PROVIDED_JOB_METADATA_FILE'], 'wb' ) as metadata_parameter_file:
-        # write JSON metadata from flattened list
-        metadata_parameter_file.write( json.dumps( metadata_to_write ) )
-
+            primary = download_files_and_write_metadata(query_item, json_params, output_base_path,
+                                                        metadata_parameter_file, primary)
+    metadata_parameter_file.close()
 
 
 def __main__():
     """ Read the JSON return from a data source. Parse each line and request
     the data, download to "newfilepath", and write metadata.
 
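
Worth noting when reading this hunk: primary starts out False and download_files_and_write_metadata() always returns True, so the first dataset is written to the Galaxy-assigned output1 path and its metadata line carries "dataset_id", while every later dataset takes the primary branch (sanitized multi-filename under --path) and carries "base_dataset_id" instead. Illustrative metadata lines for a two-dataset response, other fields elided:

    {"type": "dataset", ..., "dataset_id": 42}
    {"type": "dataset", ..., "base_dataset_id": 42}
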
@@ -117,11 +177,11 @@
     usage = "Usage: json_data_source.py max_size --json_param_file filename [options]"
     parser = optparse.OptionParser(usage = usage)
     parser.add_option("-j", "--json_param_file", type="string",
                       action="store", dest="json_param_file", help="json schema return data")
     parser.add_option("-p", "--path", type="string",
-                      action="store", dest="newfilepath", help="new file path")
+                      action="store", dest="path", help="new file path")
 
     (options, args) = parser.parse_args()
     download_from_json_data( options, args )
 
 
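
For reference, the option parser implies an invocation along these lines; the file names are placeholders, and the max_size positional from the usage string is not consumed by any code shown in this diff:

    python json_data_source.py 1048576 --json_param_file json_params.txt --path /tmp/galaxy_output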