Mercurial > repos > ric > test2
view galaxy-tools/biobank/library/import_to_library.py @ 0:ba6cf6ede027 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Wed, 28 Sep 2016 06:03:30 -0400 |
| parents | |
| children |
line wrap: on
line source
#!/usr/bin/env python
"""Import OMERO biobank data objects into a Galaxy data library.

The script looks up the requested data objects in the biobank, copies the
underlying files with rsync into a per-operator import directory, waits for
the copies to show up (gunzipping them when needed) and finally asks Galaxy
to link the files into the target library/folder.

The code is written for Python 2.7 (see the site-packages paths in
``init_pythonpath``) but only uses syntax that is also valid on Python 3.
"""

import argparse
import datetime
import logging
import os
import stat
import subprocess
import sys
import time

import yaml

LOG_FORMAT = '%(asctime)s|%(levelname)-8s|%(message)s'
LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
LOG_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

# Seconds to wait between two scans of the destination paths in polling().
POLLING_INTERVAL = 2

# Tube label -> name used for the imported sequencing datasets.
SAMPLE_LABELS = {
    'TRAINING_tube_1': 'FATHER',
    'TRAINING_tube_2': 'PROBAND',
    'TRAINING_tube_3': 'MOTHER',
}

# Default logger so helpers can log even when called before main() has run;
# main() rebinds it through init_logger().
logger = logging.getLogger(__name__)


def make_parser():
    """Build the command line parser of the script."""
    default_ini = "{0}/init_file.yaml".format(
        os.path.dirname(os.path.realpath(sys.argv[0])))
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('--ini_file', type=str, default=default_ini,
                        help='Configuration file (yaml)')
    parser.add_argument('--host', type=str, required=True,
                        help='omero host')
    parser.add_argument('--user', type=str, required=True,
                        help='omero user')
    parser.add_argument('--passwd', type=str, required=True,
                        help='omero passwd')
    parser.add_argument('--galaxy_host', type=str,
                        help='Galaxy Host (with port)')
    parser.add_argument('--galaxy_api_key', type=str,
                        help='Galaxy API key')
    parser.add_argument('--operator', type=str,
                        help='Galaxy user email')
    parser.add_argument('--library', type=str, required=False,
                        help='library name')
    parser.add_argument('--folder', type=str, required=False,
                        help='library folder')
    parser.add_argument('--data_objects', type=str, required=True,
                        help='databojects id')
    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS,
                        help='logging level (default: INFO)', default='INFO')
    parser.add_argument('--logfile', type=str,
                        help='log file (default=stderr)')
    return parser


def main(argv):
    """Run the whole import workflow for the given command line arguments.

    The connections and the configuration are stored in module-level globals
    (``logger``, ``ini_file``, ``kb``, ``apiGalaxy``) that the other
    functions of this module read.
    """
    global logger
    global ini_file
    global kb
    global apiGalaxy

    parser = make_parser()
    args = parser.parse_args(argv)

    # Initializing logger
    logger = init_logger(args, logging)

    # Reading YAML configuration file
    ini_file = init_config(args)

    # Initializing python libraries
    init_pythonpath(args, ini_file)

    # Initializing connection to omero biobank
    kb = init_omero_biobank(args)
    # NOTE(review): the logger is initialised a second time here, presumably
    # because loading the biobank libraries touches the logging setup.
    # TODO confirm whether this call is still needed.
    logger = init_logger(args, logging)

    # Initializing connection to apiGalaxy
    apiGalaxy = init_api_galaxy(args)

    # Getting library and folder id
    library_id, folder_id = get_library_and_folder_ids(args)

    # Getting data_objects
    data_objects = get_data_objects(args)

    import_data_objects(args, data_objects, library_id, folder_id)


def import_data_objects(args, data_objects, library_id, folder_id):
    """Copy the data objects locally and link them into the Galaxy library.

    Exits the process with status 1 when the number of datasets reported by
    Galaxy differs from the number of files that were copied.
    """
    user_import_dir = get_user_import_dir(args)

    logger.info("copying datasets in user import dir")
    files = copy_in_user_import_dir(data_objects, user_import_dir)

    logger.info("wait while copiyng")
    polling(files)

    logger.info("ready to import in library {0} under folder {1}".format(
        args.library, args.folder))
    logger.info('importing in library')

    successfull = 0
    for file_type, folder in user_import_dir.items():
        if not os.listdir(folder):
            continue
        if 'fastq' in file_type:
            file_type = 'fastqsanger'
        status = apiGalaxy.libraries.upload_file_from_server(
            library_id, folder, folder_id,
            file_type=file_type, link_data_only='link_to_files')
        successfull += len(status)

    if successfull == len(files):
        logger.info("SUCCESS")
    else:
        logger.critical("ERROR WHILE IMPORTING")
        # Non-zero status so that callers can detect the failure.
        sys.exit(1)


def copy_in_user_import_dir(data_objects, user_import_dir):
    """Launch one rsync per data object towards the user import directory.

    The rsync processes are started in the background and not waited for;
    use ``polling`` on the returned paths to wait for the copies.

    Returns the list of destination paths of the launched copies. Data
    objects for which ``get_destination_path`` returns ``None`` are skipped.
    """
    files = []
    for dobj in data_objects:
        if dobj.path.startswith('file://'):
            irods_path = None
            phys_path = dobj.path.replace('file://', '')
        else:
            # 'irods://' URLs and any other path are resolved through iRODS.
            irods_path = dobj.path.replace('irods://', '')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()

        data_type = dobj.mimetype.split('/')[-1].replace('+64', '')
        dest_path = get_destination_path(irods_path, phys_path, dobj.sample,
                                         user_import_dir, data_type)
        if dest_path is None:
            logger.warning(
                'skipping {0}: no destination label for its sample'.format(
                    dobj.path))
            continue
        dest_path = dest_path.strip()

        # Argument list (not a split string) so paths with spaces survive.
        rsync_command = ['rsync', '-rcLPhv', phys_path, dest_path]
        logger.info('launching copy for {0} dataset'.format(
            os.path.basename(dest_path)))
        subprocess.Popen(rsync_command)
        files.append(dest_path)
    return files


def polling(files):
    """Block until every path in ``files`` is available, then return True.

    A path ending in ``.gz`` is gunzipped as soon as it exists and is
    considered available once the uncompressed file exists. Between two
    scans the function sleeps for ``POLLING_INTERVAL`` seconds.

    Raises ``subprocess.CalledProcessError`` if gunzip fails.
    """
    founds = set()
    while True:
        done = True
        for dest_path in files:
            if dest_path.endswith('.gz'):
                # Strip only the trailing suffix: the path may contain '.gz'
                # elsewhere (e.g. in a directory name).
                unzip_path = dest_path[:-len('.gz')]
                if os.path.exists(dest_path):
                    # Needs one more scan to see the uncompressed file.
                    done = False
                    logger.info("found {0}".format(
                        os.path.basename(dest_path)))
                    logger.info("gunzipping {0}".format(
                        os.path.basename(dest_path)))
                    g_unzip = subprocess.check_output(
                        ['gunzip', dest_path],
                        stderr=subprocess.STDOUT).strip()
                    logger.info(g_unzip)
                elif not os.path.exists(unzip_path):
                    done = False
            elif not os.path.exists(dest_path):
                done = False
            elif dest_path not in founds:
                founds.add(dest_path)
                logger.info("found {0}".format(os.path.basename(dest_path)))
        if done:
            return True
        # Avoid spinning at 100% CPU while the copies are running.
        time.sleep(POLLING_INTERVAL)


def get_user_import_dir(args):
    """Create and return the per-run import directories.

    Returns a dict mapping each data type ('fastq', 'vcf', 'bam') to a
    directory named ``<base>/<operator>/<timestamp>_<type>``, where
    ``<base>`` is read from the ``LIBRARY_IMPORT_DIR_<HOST>`` key of the
    configuration.
    """
    subfolder = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    base_dir = ini_file['LIBRARY_IMPORT_DIR_{0}'.format(
        args.host.split('.')[0].upper())]
    user_import_dir = {
        data_type: "{0}/{1}/{2}_{3}".format(
            base_dir, args.operator, subfolder, data_type)
        for data_type in ('fastq', 'vcf', 'bam')
    }

    # Clear the umask so the directories really get mode 0775.
    os.umask(0)
    for folder in user_import_dir.values():
        if not os.path.exists(folder):
            os.makedirs(folder, 0o775)
    return user_import_dir


def get_destination_path(irods_path, phys_path, data_sample, user_import_dir,
                         data_type):
    """Return the destination file path for a data object, or ``None``.

    ``None`` is returned when no label can be determined for the sample
    (a sequencing sample whose tube label is not in ``SAMPLE_LABELS``, or an
    unsupported sample type); the caller is expected to skip the object.

    Raises ``KeyError`` if ``data_type`` is not a key of ``user_import_dir``.
    """
    label = None
    if isinstance(data_sample,
                  (kb.SeqDataSample, kb.AlignedSeqDataSample)):
        label = SAMPLE_LABELS.get(data_sample.sample.label)
    if isinstance(data_sample, kb.GenomeVariationsDataSample):
        label = data_sample.label
    if label is None:
        return None

    filename = "{0}/{1}".format(user_import_dir[data_type], label)
    if irods_path:
        attr = get_attributes(irods_path)
        if 'read' in attr:
            filename = "{0}_R{1}".format(filename, attr['read'])
        if attr.get('compression') == 'gzip':
            filename = "{0}.gz".format(filename)
    else:
        filename = "{0}_{1}".format(filename, os.path.basename(phys_path))
    filename = filename.replace('.fq', '')
    return filename


def get_data_objects(args):
    """Return the biobank data objects whose ids are in ``--data_objects``.

    ``args.data_objects`` is a comma separated list of OMERO ids.
    """
    logger.info("getting data objects")
    wanted_ids = set(args.data_objects.split(','))
    data_objects = [dataobj for dataobj in kb.get_objects(kb.DataObject)
                    if str(dataobj.omero_id) in wanted_ids]
    logger.info("found {0}".format(len(data_objects)))
    return data_objects


def get_library_and_folder_ids(args):
    """Resolve the Galaxy library id and, when requested, the folder id.

    ``args.library`` has the form ``<library>[?<folder>]`` with dots standing
    for spaces. The folder is looked up (and created if missing) only when
    the value contains ``?`` and ``args.folder`` equals ``args.library``;
    otherwise the returned folder id is ``None``.

    Exits the process with status 1 when the library is missing or the
    folder cannot be created.
    """
    if args.library is None:
        logger.critical("Library is a mandatory parameter")
        sys.exit(1)

    library_name = args.library.split('?')[0].replace('.', ' ')
    logger.info("searching for library")
    orione_library = apiGalaxy.libraries.get_libraries(name=library_name)
    if not orione_library:
        logger.critical("sorry, library {0} doesn't exist".format(
            library_name))
        sys.exit(1)
    library_id = orione_library[0].get('id', None)

    if not ('?' in args.library and args.library == args.folder):
        return library_id, None

    folder_name = args.library.split('?')[1].replace('.', ' ')
    logger.info("searching for folder {0}".format(folder_name))
    folder = apiGalaxy.libraries.get_folders(
        library_id=library_id, name=u"/{0}".format(folder_name))
    if not folder:
        logger.info("not found. creating it..")
        try:
            folder = apiGalaxy.libraries.create_folder(library_id,
                                                       folder_name)
        except Exception:
            logger.critical("impossible to create folder {0}".format(
                folder_name))
            sys.exit(1)
    folder_id = folder[0].get('id', None)
    return library_id, folder_id


def get_attributes(irods_path):
    """Return the iRODS metadata of ``irods_path`` as a dict.

    Parses the output of ``imeta ls -ld``: the output is split on ``----``,
    the first line of every chunk is ignored, and each ``attribute:`` line is
    paired with the ``value:`` line that follows it in the same chunk.
    """
    cmd = ['imeta', 'ls', '-ld', irods_path]
    attributes = {}
    for chunk in irods.__irods_check_output(cmd).split('----'):
        key = None
        for line in chunk.splitlines()[1:]:
            # Split on the first colon only: values may contain colons.
            name, _, content = line.strip().partition(':')
            if name == 'attribute':
                key = content.strip()
            elif name == 'value' and key is not None:
                attributes[key] = content.strip()
    return attributes


def init_logger(args, logging):
    """Configure logging from the command line and return the module logger.

    Logs go to ``args.logfile`` when given, otherwise to the default stream
    of ``logging.basicConfig``.
    """
    kwargs = {
        'format': LOG_FORMAT,
        'datefmt': LOG_DATEFMT,
        'level': getattr(logging, args.loglevel),
    }
    if args.logfile:
        kwargs['filename'] = args.logfile
    logging.basicConfig(**kwargs)
    return logging.getLogger(__name__)


def init_config(args):
    """Load and return the YAML configuration file ``args.ini_file``.

    Exits the process with status 1 when the file cannot be read or parsed.
    """
    logger.info('loading YAML configuration file: %s' % args.ini_file)
    try:
        with open(args.ini_file) as config_stream:
            # NOTE(review): yaml.load can build arbitrary Python objects;
            # consider yaml.safe_load if the file only holds plain data.
            ini_file = yaml.load(config_stream)
    except Exception:
        logger.critical('%s is not a valid YAML configuration file'
                        % args.ini_file)
        sys.exit(1)
    return ini_file


def init_pythonpath(args, ini_file):
    """Extend ``sys.path`` and import the biobank/iRODS/Galaxy libraries.

    The imported names (``KB``, ``irods``, ``GalaxyInstance``) are published
    as module-level globals for the other functions of this module.
    """
    logger.info('exporting pythonpath')
    extra_paths = [
        '/SHARE/USERFS/els7/users/galaxy/develop/usr-cluster/lib/python2.7/site-packages/',
        '/u/galaxy/.local/lib/python2.7/site-packages/poster-0.8.1-py2.7.egg',
        '/SHARE/USERFS/els7/users/sequencing/usr-cluster/lib/python2.7/site-packages/automator-0.1-py2.7.egg',
        "{0}/{1}".format(ini_file['PYTHPATH'], args.host.split('.')[0]),
    ]
    # Each path is pushed to the front, so the last one listed wins.
    for path in extra_paths:
        sys.path.insert(0, path)

    global KB
    from bl.vl.kb import KnowledgeBase as KB
    global irods
    import automator.agent.irods as irods
    global GalaxyInstance
    from bioblend.galaxy import GalaxyInstance


def init_omero_biobank(args):
    """Open and return the connection to the OMERO biobank.

    Exits the process with status 1 when the connection fails.
    """
    logger.info('opening kb connection to {0}'.format(args.host))
    try:
        return KB(driver='omero')(args.host, args.user, args.passwd)
    except Exception:
        logger.critical('connection refused or failed')
        sys.exit(1)


def init_api_galaxy(args):
    """Return a ``GalaxyInstance`` connected to the configured Galaxy host.

    The host is ``args.galaxy_host`` or, when that is empty, the
    ``GALAXY_HOST_<HOST>`` key of the configuration. Exits the process with
    an error message when neither is available.
    """
    try:
        galaxy_host = args.galaxy_host or ini_file[
            'GALAXY_HOST_{0}'.format(args.host.split('.')[0].upper())]
    except KeyError as ke:
        msg = 'No argument passed and no global variable %s found' % ke
        logger.critical(msg)
        sys.exit(msg)
    api_key = args.galaxy_api_key

    # The API key is a secret: keep it out of the log.
    logger.info('opening connection to %s' % galaxy_host)
    return GalaxyInstance(galaxy_host, key=api_key)


if __name__ == '__main__':
    main(sys.argv[1:])
