Mercurial > repos > ric > test2
comparison galaxy-tools/biobank/library/import_to_library.py @ 0:ba6cf6ede027 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Wed, 28 Sep 2016 06:03:30 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:ba6cf6ede027 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 import sys, os,argparse,logging, yaml, datetime, subprocess, stat | |
| 3 | |
# Log line layout: timestamp|LEVEL   |message
LOG_FORMAT = '%(asctime)s|%(levelname)-8s|%(message)s'
LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
# Accepted values for the --loglevel CLI option (see make_parser)
LOG_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
| 7 | |
def make_parser():
    """Build the command-line argument parser for the library-import tool."""
    parser = argparse.ArgumentParser(description='')

    # Default config lives next to this script
    parser.add_argument('--ini_file', type=str,
                        default="{0}/init_file.yaml".format(os.path.dirname(os.path.realpath(sys.argv[0]))),
                        help='Configuration file (yaml)')

    # omero biobank connection
    parser.add_argument('--host', type=str, required=True, help='omero host')
    parser.add_argument('--user', type=str, required=True, help='omero user')
    parser.add_argument('--passwd', type=str, required=True, help='omero passwd')

    # Galaxy connection (host may fall back to the config file, see init_api_galaxy)
    parser.add_argument('--galaxy_host', type=str, help='Galaxy Host (with port)')
    parser.add_argument('--galaxy_api_key', type=str, help='Galaxy API key')

    parser.add_argument('--operator', type=str, help='Galaxy user email')

    # Destination library and optional folder
    parser.add_argument('--library', type=str, required=False, help='library name')
    parser.add_argument('--folder', type=str, required=False, help='library folder')

    # was 'databojects id' (typo): comma-separated omero ids of the data objects
    parser.add_argument('--data_objects', type=str, required=True, help='data objects ids')

    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS,
                        help='logging level (default: INFO)', default='INFO')
    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')

    return parser
| 31 | |
def main(argv):
    """Parse CLI args, initialize all services, and import the requested
    data objects into a Galaxy data library.

    Sets the module globals (logger, ini_file, kb, apiGalaxy) used by the
    helper functions below.
    """
    global logger
    global ini_file
    global kb
    global apiGalaxy

    parser = make_parser()
    args = parser.parse_args(argv)

    # Initializing logger (original code called init_logger twice; once is enough)
    logger = init_logger(args, logging)

    # Reading YAML configuration file
    ini_file = init_config(args)

    # Extending sys.path and importing site-specific modules (KB, irods, GalaxyInstance)
    init_pythonpath(args, ini_file)

    # Initializing connection to omero biobank
    kb = init_omero_biobank(args)

    # Initializing connection to the Galaxy API
    apiGalaxy = init_api_galaxy(args)

    # Resolving destination library and (optional) folder ids
    library_id, folder_id = get_library_and_folder_ids(args)

    # Selecting the requested data objects from the biobank
    data_objects = get_data_objects(args)

    import_data_objects(args, data_objects, library_id, folder_id)
| 65 | |
def import_data_objects(args, data_objects, library_id, folder_id):
    """Stage the datasets into the user's import dir, wait for the copies
    to finish, then link them into the Galaxy library.

    Always terminates the interpreter via SystemExit when done.
    """
    user_import_dir = get_user_import_dir(args)

    logger.info("copying datasets in user import dir")
    files = copy_in_user_import_dir(data_objects, user_import_dir)

    # was "wait while copiyng" (typo)
    logger.info("wait while copying")
    polling(files)

    logger.info("ready to import in library {0} under folder {1}".format(args.library, args.folder))

    logger.info('importing in library')
    successful = 0
    for file_type, folder in user_import_dir.items():  # items(): py2/py3 compatible
        # Skip data types for which nothing was staged
        if not os.listdir(folder):
            continue
        # Galaxy expects the 'fastqsanger' datatype name for fastq files
        if 'fastq' in file_type:
            file_type = 'fastqsanger'
        status = apiGalaxy.libraries.upload_file_from_server(
            library_id, folder, folder_id,
            file_type=file_type, link_data_only='link_to_files')
        successful += len(status)
    if successful == len(files):
        logger.info("SUCCESS")
    else:
        logger.critical("ERROR WHILE IMPORTING")

    raise SystemExit
| 90 | |
def copy_in_user_import_dir(data_objects, user_import_dir):
    """Launch a background rsync for every data object into the proper
    per-datatype import folder and return the list of destination paths.

    Copies are asynchronous: callers must wait (see polling()) before
    using the returned paths.
    """
    files = list()

    for dobj in data_objects:
        # The original code had an 'irods://' branch and an identical
        # catch-all else branch; they are merged here (same dispatch).
        if dobj.path.startswith('file://'):
            # Plain filesystem object: no irods metadata available
            irods_path = None
            phys_path = dobj.path.replace('file://', '')
        else:
            # 'irods://' URIs (and bare paths) are resolved through irods
            irods_path = dobj.path.replace('irods://', '')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()

        # Mimetype subtype (minus '+64' suffix) selects the import folder
        # — assumes it maps onto a user_import_dir key; TODO confirm
        data_type = dobj.mimetype.split('/')[-1].replace('+64', '')
        dest_path = get_destination_path(irods_path, phys_path, dobj.sample,
                                         user_import_dir, data_type).strip()
        rsync_command = "rsync -rcLPhv {0} {1}".format(phys_path, dest_path)
        logger.info('launching copy for {0} dataset'.format(os.path.basename(dest_path)))
        # Fire and forget: completion is detected later by polling()
        subprocess.Popen(rsync_command.split(' '))

        files.append(dest_path)

    return files
| 117 | |
def polling(files):
    """Block until every destination path in *files* exists on disk.

    Paths ending in '.gz' are gunzipped in place as soon as they arrive;
    they count as done once the '.gz' is gone and the plain file remains.
    Returns True.
    """
    import time  # local import: only needed for the wait loop

    all_done = False
    founds = list()
    while not all_done:
        done = True
        for dest_path in files:
            if dest_path.endswith('.gz'):
                unzip_path = dest_path.replace('.gz', '')
                if not os.path.exists(dest_path) and not os.path.exists(unzip_path):
                    # rsync has not delivered the archive yet
                    done = False
                elif os.path.exists(dest_path):
                    # Archive arrived: decompress; not done until the
                    # '.gz' disappears on a later pass
                    done = False
                    logger.info("found {0}".format(os.path.basename(dest_path)))
                    logger.info("gunzipping {0}".format(os.path.basename(dest_path)))
                    cmd = "gunzip {0}".format(dest_path)
                    g_unzip = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).strip()
                    logger.info(g_unzip)
            else:
                if not os.path.exists(dest_path):
                    done = False
                elif dest_path not in founds:
                    founds.append(dest_path)
                    logger.info("found {0}".format(os.path.basename(dest_path)))
        if not done:
            # was a tight busy-wait: sleep between scans to avoid spinning the CPU
            time.sleep(1)
        all_done = done
    return True
| 144 | |
def get_user_import_dir(args):
    """Create (mode 0775) and return one import folder per data type for
    this operator, keyed by data type ('fastq', 'vcf', 'bam').

    Layout: <LIBRARY_IMPORT_DIR_<HOST>>/<operator>/<timestamp>_<type>
    """
    # e.g. '2016-09-28_06-03-30' — a filesystem-safe timestamp
    subfolder = str(datetime.datetime.now()).split('.')[0].replace(' ', '_').replace(':', '-')
    base_dir = ini_file['LIBRARY_IMPORT_DIR_{0}'.format(args.host.split('.')[0].upper())]
    user_import_dir = {
        data_type: "{0}/{1}/{2}_{3}".format(base_dir, args.operator, subfolder, data_type)
        for data_type in ('fastq', 'vcf', 'bam')
    }
    os.umask(0)  # so the 0775 mode below is honored exactly
    for folder in user_import_dir.values():  # values(): py2/py3 compatible
        if not os.path.exists(folder):
            # 0o775: octal literal valid on py2.6+ and py3 (was py2-only 0775)
            os.makedirs(folder, 0o775)
    return user_import_dir
| 157 | |
def get_destination_path(irods_path, phys_path, data_sample, user_import_dir, data_type):
    """Compute the destination filename for a data object inside the
    per-datatype import folder.

    Sequencing samples are renamed via the hard-coded TRAINING trio map;
    genome-variation samples use their own label.

    Raises ValueError for sample labels/types this importer cannot name
    (the original code had a bare `continue` here, which is a SyntaxError
    outside a loop, and left `label` unbound for unknown sample types).
    """
    label = None
    if isinstance(data_sample, (kb.SeqDataSample, kb.AlignedSeqDataSample)):
        trio_labels = {
            'TRAINING_tube_1': 'FATHER',
            'TRAINING_tube_2': 'PROBAND',
            'TRAINING_tube_3': 'MOTHER',
        }
        try:
            label = trio_labels[data_sample.sample.label]
        except KeyError:
            raise ValueError('unsupported sample label: {0}'.format(data_sample.sample.label))
    if isinstance(data_sample, kb.GenomeVariationsDataSample):
        label = data_sample.label
    if label is None:
        raise ValueError('unsupported data sample type: {0}'.format(type(data_sample)))

    filename = "{0}/{1}".format(user_import_dir[data_type], label)

    if irods_path:
        attr = get_attributes(irods_path)
        # 'in' replaces deprecated dict.has_key()
        if 'read' in attr:
            filename = "{0}_R{1}".format(filename, attr['read'])
        if attr.get('compression') == 'gzip':
            filename = "{0}.gz".format(filename)
    else:
        filename = "{0}_{1}".format(filename, os.path.basename(phys_path))
        filename = filename.replace('.fq', '')
    return filename
| 182 | |
def get_data_objects(args):
    """Return the kb DataObject instances whose omero_id is listed in the
    comma-separated --data_objects argument."""
    logger.info("getting data objects")
    data_object_ids = args.data_objects.split(',')
    # No lookup-by-id is used here: scan all DataObjects and filter
    data_objects = [dobj for dobj in kb.get_objects(kb.DataObject)
                    if str(dobj.omero_id) in data_object_ids]
    # was logging.info: use the configured module logger consistently
    logger.info("found {0}".format(len(data_objects)))
    return data_objects
| 192 | |
def get_library_and_folder_ids(args):
    """Resolve --library (and the optional '?'-separated folder part) to
    Galaxy ids.

    Returns (library_id, folder_id); folder_id is None when no folder was
    requested. Exits the process when the library cannot be found or the
    folder cannot be created.
    """
    if args.library is None:
        logger.critical("Library is a mandatory parameter")
        sys.exit()
    # Dots stand in for spaces in the CLI value
    library_name = args.library.split('?')[0].replace('.', ' ')
    logger.info("searching for library")
    orione_library = apiGalaxy.libraries.get_libraries(name="{0}".format(library_name))
    if len(orione_library) == 0:
        logger.critical("sorry, library {0} doesn't exist".format(library_name))
        sys.exit()
    library_id = orione_library[0].get('id', None)

    # A folder is requested as 'library?folder', and only honored when
    # --folder repeats the same value — presumably a tool-form quirk; TODO confirm
    if '?' in args.library and args.library == args.folder:
        folder_name = args.library.split('?')[1].replace('.', ' ')
    else:
        return library_id, None
    logger.info("searching for folder {0}".format(folder_name))

    folder = apiGalaxy.libraries.get_folders(library_id=library_id, name=u"/{0}".format(folder_name))
    if len(folder) == 0:
        logger.info("not found. creating it..")
        try:
            folder = apiGalaxy.libraries.create_folder(library_id, folder_name)
        except Exception:  # was a bare except: keep the exit, narrow the catch
            logger.critical("impossible to create folder {0}".format(folder_name))
            sys.exit()

    folder_id = folder[0].get('id', None)

    return library_id, folder_id
| 224 | |
def get_attributes(irods_path):
    """Parse `imeta ls -ld` output for *irods_path* into a dict mapping
    attribute names to their values."""
    cmd = ['imeta', 'ls', '-ld', irods_path]
    # Sections are separated by '----'; each section describes one attribute
    raw_sections = irods.__irods_check_output(cmd).split('----')
    attributes = {}
    for section in raw_sections:
        lines = [raw.splitlines() for raw in [section]][0]
        del lines[0]  # drop the section header line
        for line in lines:
            if 'attribute' in line:
                key = line.split(':')[1].strip()
            if 'value' in line:
                attributes[key] = line.split(':')[1].strip()
    return attributes
| 238 | |
def init_logger(args, logging):
    """Configure basic logging from the CLI args and return a module logger.

    Note: *logging* is the stdlib logging module itself, passed in by main().
    Logs go to args.logfile when given, otherwise to stderr.
    """
    config = dict(
        format=LOG_FORMAT,
        datefmt=LOG_DATEFMT,
        level=getattr(logging, args.loglevel),
    )
    if args.logfile:
        config['filename'] = args.logfile
    logging.basicConfig(**config)
    return logging.getLogger(__name__)
| 252 | |
def init_config(args):
    """Load and return the YAML configuration given by --ini_file.

    Exits the process when the file is missing or not valid YAML.
    """
    logger.info('loading YAML configuration file: %s' % args.ini_file)
    try:
        # safe_load: the config is plain data; yaml.load without a Loader
        # can instantiate arbitrary python objects and is unsafe.
        # 'with' also closes the file handle (was leaked before).
        with open(args.ini_file) as fp:
            ini_file = yaml.safe_load(fp)
    except Exception:  # was a bare except: keep the exit, narrow the catch
        logger.critical('%s is not a valid YAML configuration file' % args.ini_file)
        sys.exit()

    return ini_file
| 263 | |
def init_pythonpath(args, ini_file):
    """Prepend the site-specific library locations to sys.path and import
    the project modules (KB, irods, GalaxyInstance) as module globals."""
    logger.info('exporting pythonpath')
    site_dirs = [
        '/SHARE/USERFS/els7/users/galaxy/develop/usr-cluster/lib/python2.7/site-packages/',
        '/u/galaxy/.local/lib/python2.7/site-packages/poster-0.8.1-py2.7.egg',
        '/SHARE/USERFS/els7/users/sequencing/usr-cluster/lib/python2.7/site-packages/automator-0.1-py2.7.egg',
        "{0}/{1}".format(ini_file['PYTHPATH'], args.host.split('.')[0]),
    ]
    # Prepend each dir; the last listed ends up first on sys.path, exactly
    # as the original reverse/append/reverse sequence produced.
    for site_dir in site_dirs:
        sys.path.insert(0, site_dir)

    global KB
    from bl.vl.kb import KnowledgeBase as KB

    global irods
    import automator.agent.irods as irods

    global GalaxyInstance
    from bioblend.galaxy import GalaxyInstance
| 284 | |
def init_omero_biobank(args):
    """Open and return a KnowledgeBase connection to the omero server.

    Exits the process when the connection cannot be established.
    """
    logger.info('opening kb connection to {0}'.format(args.host))

    try:
        kb = KB(driver='omero')(args.host, args.user, args.passwd)
        return kb
    except Exception:  # was a bare except: keep the exit, narrow the catch
        logger.critical('connection refused or failed')
        sys.exit()
| 294 | |
def init_api_galaxy(args):
    """Build and return a bioblend GalaxyInstance.

    The host falls back to the per-omero-host GALAXY_HOST_* entry of the
    config file when --galaxy_host is not given; exits when neither is set.
    """
    try:
        galaxy_host = args.galaxy_host or ini_file['GALAXY_HOST_{0}'.format(args.host.split('.')[0].upper())]
        api_key = args.galaxy_api_key
    except KeyError as ke:  # 'except KeyError, ke' is py2-only syntax
        msg = 'No argument passed and no global variable %s found' % ke
        logger.critical(msg)
        sys.exit(msg)

    # NOTE(review): api_key may be None here and only fails later inside
    # bioblend — consider validating it like galaxy_host; TODO confirm
    logger.info('opening connection to %s with key %s' % (galaxy_host, api_key))
    apiGalaxy = GalaxyInstance(galaxy_host, key=api_key)
    return apiGalaxy
| 308 | |
if __name__ == '__main__':
    # Script entry point: forward CLI arguments (minus the program name) to main()
    main(sys.argv[1:])
