#!/usr/bin/env python

import sys, os, argparse, logging, yaml, datetime, subprocess, time

LOG_FORMAT = '%(asctime)s|%(levelname)-8s|%(message)s'
LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
LOG_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

def make_parser():
    parser = argparse.ArgumentParser(description='Import OMERO.biobank data objects into a Galaxy data library')

    parser.add_argument('--ini_file', type=str,
                        default="{0}/init_file.yaml".format(os.path.dirname(os.path.realpath(sys.argv[0]))),
                        help='configuration file (YAML)')

    parser.add_argument('--host', type=str, required=True, help='OMERO host')
    parser.add_argument('--user', type=str, required=True, help='OMERO user')
    parser.add_argument('--passwd', type=str, required=True, help='OMERO password')

    parser.add_argument('--galaxy_host', type=str, help='Galaxy host (with port)')
    parser.add_argument('--galaxy_api_key', type=str, help='Galaxy API key')

    parser.add_argument('--operator', type=str, help='Galaxy user email')

    parser.add_argument('--library', type=str, required=False, help='library name')
    parser.add_argument('--folder', type=str, required=False, help='library folder')

    parser.add_argument('--data_objects', type=str, required=True, help='comma-separated list of data object ids')

    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS, help='logging level (default: INFO)', default='INFO')
    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')

    return parser

def main(argv):
    global logger
    global ini_file
    global kb
    global apiGalaxy

    parser = make_parser()
    args = parser.parse_args(argv)

    # Initialize logger
    logger = init_logger(args, logging)

    # Read YAML configuration file
    ini_file = init_config(args)

    # Initialize python libraries
    init_pythonpath(args, ini_file)

    # Initialize connection to the OMERO biobank
    kb = init_omero_biobank(args)

    # Re-initialize the logger after opening the KB connection
    logger = init_logger(args, logging)

    # Initialize connection to the Galaxy API
    apiGalaxy = init_api_galaxy(args)

    # Get library and folder id
    library_id, folder_id = get_library_and_folder_ids(args)

    # Get data objects
    data_objects = get_data_objects(args)

    import_data_objects(args, data_objects, library_id, folder_id)

def import_data_objects(args, data_objects, library_id, folder_id):
    user_import_dir = get_user_import_dir(args)

    logger.info("copying datasets into the user import dir")
    files = copy_in_user_import_dir(data_objects, user_import_dir)

    logger.info("waiting for the copies to finish")
    polling(files)

    logger.info("ready to import in library {0} under folder {1}".format(args.library, args.folder))

    logger.info('importing in library')
    successful = 0
    for file_type, folder in user_import_dir.iteritems():
        if len(os.listdir(folder)) == 0: continue
        if 'fastq' in file_type: file_type = 'fastqsanger'
        status = apiGalaxy.libraries.upload_file_from_server(library_id, folder, folder_id,
                                                             file_type=file_type,
                                                             link_data_only='link_to_files')
        successful += len(status)
    if successful == len(files):
        logger.info("SUCCESS")
    else:
        logger.critical("ERROR WHILE IMPORTING")

    raise SystemExit

def copy_in_user_import_dir(data_objects, user_import_dir):
    files = list()

    for dobj in data_objects:
        if dobj.path.startswith('irods://'):
            irods_path = dobj.path.replace('irods://', '')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()
        elif dobj.path.startswith('file://'):
            irods_path = None
            phys_path = dobj.path.replace('file://', '')
        else:
            # fallback: treat the path as an iRODS path
            irods_path = dobj.path.replace('irods://', '')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()

        data_type = dobj.mimetype.split('/')[-1].replace('+64', '')
        dest_path = get_destination_path(irods_path, phys_path, dobj.sample, user_import_dir, data_type).strip()
        #rsync_command = "qsub -b y /usr/bin/rsync -rcLPhv {0} {1}".format(phys_path, dest_path)
        rsync_command = "rsync -rcLPhv {0} {1}".format(phys_path, dest_path)
        logger.info('launching copy for {0} dataset'.format(os.path.basename(dest_path)))
        subprocess.Popen(rsync_command.split(' '))

        files.append(dest_path)

    return files

def polling(files):
    all_done = False
    founds = list()
    while not all_done:
        done = True
        for dest_path in files:
            if dest_path.endswith('.gz'):
                unzip_path = dest_path.replace('.gz', '')
                if not os.path.exists(dest_path) and not os.path.exists(unzip_path):
                    done = False
                elif os.path.exists(dest_path):
                    done = False
                    logger.info("found {0}".format(os.path.basename(dest_path)))
                    logger.info("gunzipping {0}".format(os.path.basename(dest_path)))
                    cmd = "gunzip {0}".format(dest_path)
                    g_unzip = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).strip()
                    logger.info(g_unzip)
            else:
                if not os.path.exists(dest_path):
                    done = False
                elif os.path.exists(dest_path) and dest_path not in founds:
                    founds.append(dest_path)
                    logger.info("found {0}".format(os.path.basename(dest_path)))
        all_done = done
        if not all_done:
            time.sleep(5)  # brief pause to avoid busy-waiting while the rsync processes finish
    return True

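# Illustrative layout of the per-user import directories created below
# (hypothetical operator and timestamp; the root comes from the
# LIBRARY_IMPORT_DIR_<HOST> entry of the configuration file):
#
#   <LIBRARY_IMPORT_DIR_...>/jdoe@example.org/2024-05-01_12-30-00_fastq
#   <LIBRARY_IMPORT_DIR_...>/jdoe@example.org/2024-05-01_12-30-00_vcf
#   <LIBRARY_IMPORT_DIR_...>/jdoe@example.org/2024-05-01_12-30-00_bam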
def get_user_import_dir(args):
    import_root = ini_file['LIBRARY_IMPORT_DIR_{0}'.format(args.host.split('.')[0].upper())]
    subfolder = str(datetime.datetime.now()).split('.')[0].replace(' ', '_').replace(':', '-')
    user_import_dir = {
        'fastq': "{0}/{1}/{2}_{3}".format(import_root, args.operator, subfolder, 'fastq'),
        'vcf': "{0}/{1}/{2}_{3}".format(import_root, args.operator, subfolder, 'vcf'),
        'bam': "{0}/{1}/{2}_{3}".format(import_root, args.operator, subfolder, 'bam'),
    }
    os.umask(0)
    for k, folder in user_import_dir.iteritems():
        if not os.path.exists(folder):
            os.makedirs(folder, 0o775)
    return user_import_dir

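# Worked example of the destination naming implemented below (the dataset
# itself is hypothetical): a gzip-compressed, read-1 FASTQ whose sample is
# labelled 'TRAINING_tube_1' ends up as <import_dir>/fastq/FATHER_R1.gz,
# while a file:// dataset gets the basename of its physical path appended
# to the label, with any '.fq' suffix stripped.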
def get_destination_path(irods_path, phys_path, data_sample, user_import_dir, data_type):
    if isinstance(data_sample, kb.SeqDataSample) or isinstance(data_sample, kb.AlignedSeqDataSample):
        if data_sample.sample.label == 'TRAINING_tube_1':
            label = 'FATHER'
        elif data_sample.sample.label == 'TRAINING_tube_2':
            label = 'PROBAND'
        elif data_sample.sample.label == 'TRAINING_tube_3':
            label = 'MOTHER'
        else:
            label = data_sample.sample.label
    if isinstance(data_sample, kb.GenomeVariationsDataSample):
        label = data_sample.label

    filename = "{0}/{1}".format(user_import_dir[data_type], label)

    if irods_path:
        attr = get_attributes(irods_path)

        if 'read' in attr: filename = "{0}_R{1}".format(filename, attr['read'])
        #if 'lanes' in attr: filename = "{0}_L{1}".format(filename, attr['lanes'])
        if attr.get('compression') == 'gzip':
            filename = "{0}.gz".format(filename)
    else:
        filename = "{0}_{1}".format(filename, os.path.basename(phys_path))
        filename = filename.replace('.fq', '')
    return filename

def get_data_objects(args):
    logger.info("getting data objects")
    data_objects = list()
    data_object_ids = args.data_objects.split(',')
    for dataobj in kb.get_objects(kb.DataObject):
        if str(dataobj.omero_id) in data_object_ids:
            data_objects.append(dataobj)
    logger.info("found {0}".format(len(data_objects)))
    return data_objects

def get_library_and_folder_ids(args):
    if args.library is None:
        logger.critical("Library is a mandatory parameter")
        sys.exit()
    library_name = args.library.split('?')[0].replace('.', ' ')
    logger.info("searching for library")
    orione_library = apiGalaxy.libraries.get_libraries(name="{0}".format(library_name))
    if len(orione_library) == 0:
        logger.critical("sorry, library {0} doesn't exist".format(library_name))
        sys.exit()
    library_id = orione_library[0].get('id', None)

    if '?' in args.library and args.library == args.folder:
        folder_name = args.library.split('?')[1].replace('.', ' ')
    else:
        return library_id, None
    logger.info("searching for folder {0}".format(folder_name))

    folder = apiGalaxy.libraries.get_folders(library_id=library_id, name=u"/{0}".format(folder_name))
    if len(folder) == 0:
        logger.info("folder not found, creating it")
        try:
            folder = apiGalaxy.libraries.create_folder(library_id, folder_name)
        except Exception:
            logger.critical("unable to create folder {0}".format(folder_name))
            sys.exit()

    folder_id = folder[0].get('id', None)

    return library_id, folder_id

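# The parser below expects `imeta ls -ld` output shaped roughly like this
# (a header line per AVU block, blocks separated by '----'; the path and
# values here are purely illustrative):
#
#   AVUs defined for dataObj /zone/home/user/sample.fastq.gz:
#   attribute: read
#   value: 1
#   units:
#   ----
#   attribute: compression
#   value: gzip
#   units: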
def get_attributes(irods_path):
    cmd = ['imeta', 'ls', '-ld', irods_path]
    imeta = [i.splitlines() for i in irods.__irods_check_output(cmd).split('----')]
    attributes = {}
    for i in imeta:
        del i[0]
        for a in i:
            if 'attribute' in a:
                key = a.split(':')[1].strip()
            if 'value' in a:
                value = a.split(':')[1].strip()
                attributes[key] = value
    return attributes

def init_logger(args, logging):
    log_level = getattr(logging, args.loglevel)
    kwargs = {
        'format': LOG_FORMAT,
        'datefmt': LOG_DATEFMT,
        'level': log_level}

    if args.logfile:
        kwargs['filename'] = args.logfile
    logging.basicConfig(**kwargs)

    logger = logging.getLogger(__name__)
    return logger

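# Minimal sketch of the expected init_file.yaml (hypothetical values; the
# *_<HOST> keys are looked up using the first component of --host, uppercased):
#
#   LIBRARY_IMPORT_DIR_BIOBANK: /path/to/galaxy/library_import
#   GALAXY_HOST_BIOBANK: http://galaxy.example.org:8080
#   PYTHPATH: /path/to/host/specific/pythonpath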
def init_config(args):
    # Load the YAML configuration file
    logger.info('loading YAML configuration file: %s' % args.ini_file)
    try:
        with open(args.ini_file) as f:
            ini_file = yaml.safe_load(f)
    except Exception:
        logger.critical('%s is not a valid YAML configuration file' % args.ini_file)
        sys.exit()

    return ini_file

def init_pythonpath(args, ini_file):
    logger.info('exporting pythonpath')
    # reverse/append/reverse so that the new entries end up at the front of sys.path
    sys.path.reverse()
    sys.path.append('/SHARE/USERFS/els7/users/galaxy/develop/usr-cluster/lib/python2.7/site-packages/')
    sys.path.append('/u/galaxy/.local/lib/python2.7/site-packages/poster-0.8.1-py2.7.egg')
    sys.path.append('/SHARE/USERFS/els7/users/sequencing/usr-cluster/lib/python2.7/site-packages/automator-0.1-py2.7.egg')
    sys.path.append("{0}/{1}".format(ini_file['PYTHPATH'], args.host.split('.')[0]))
    sys.path.reverse()

    global KB
    from bl.vl.kb import KnowledgeBase as KB

    global irods
    import automator.agent.irods as irods

    #global bioblend
    #import bioblend

    global GalaxyInstance
    from bioblend.galaxy import GalaxyInstance

def init_omero_biobank(args):
    logger.info('opening kb connection to {0}'.format(args.host))

    try:
        kb = KB(driver='omero')(args.host, args.user, args.passwd)
        return kb
    except Exception:
        logger.critical('connection refused or failed')
        sys.exit()

def init_api_galaxy(args):
    try:
        galaxy_host = args.galaxy_host or ini_file['GALAXY_HOST_{0}'.format(args.host.split('.')[0].upper())]
        api_key = args.galaxy_api_key
    except KeyError as ke:
        msg = 'No --galaxy_host passed and no configuration entry %s found' % ke
        logger.critical(msg)
        sys.exit(msg)

    logger.info('opening connection to %s with key %s' % (galaxy_host, api_key))
    apiGalaxy = GalaxyInstance(galaxy_host, key=api_key)
    return apiGalaxy

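# Hypothetical invocation (all values illustrative). In --library/--folder a
# '?' separates the library name from an optional folder and dots are turned
# into spaces; the folder branch is taken only when both options match:
#
#   python <this_script> \
#       --host omero.example.org --user jdoe --passwd secret \
#       --galaxy_host http://galaxy.example.org:8080 --galaxy_api_key <API_KEY> \
#       --operator jdoe@example.org \
#       --library 'Training.Library?Trio.Data' --folder 'Training.Library?Trio.Data' \
#       --data_objects 101,102,103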
if __name__ == '__main__':
    main(sys.argv[1:])