Mercurial > repos > ric > test2
diff galaxy-tools/biobank/library/import_to_library.py @ 0:ba6cf6ede027 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Wed, 28 Sep 2016 06:03:30 -0400 |
| parents | |
| children | |
line wrap: on
line diff
#!/usr/bin/env python
"""Copy omero-biobank data objects into a staging area and link them into a
Galaxy data library.

Workflow (see main()):
  1. read a YAML configuration file;
  2. extend sys.path and import the site-specific libraries (omero-biobank
     KnowledgeBase, irods helpers, bioblend GalaxyInstance);
  3. resolve the target Galaxy library (and, optionally, a sub-folder);
  4. resolve the requested DataObject records from the biobank;
  5. rsync the physical files into a per-user import directory and, once the
     copies are complete, link them into the library through the Galaxy API.

NOTE(review): legacy Python 2 tool; the omero/irods/bioblend imports are
site-specific and are resolved at runtime by init_pythonpath().
"""
import sys
import os
import argparse
import logging
import datetime
import subprocess

import yaml

LOG_FORMAT = '%(asctime)s|%(levelname)-8s|%(message)s'
LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
LOG_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']


def make_parser():
    """Build and return the command-line argument parser."""
    parser = argparse.ArgumentParser(description='')

    parser.add_argument('--ini_file', type=str,
                        default="{0}/init_file.yaml".format(
                            os.path.dirname(os.path.realpath(sys.argv[0]))),
                        help='Configuration file (yaml)')

    parser.add_argument('--host', type=str, required=True, help='omero host')
    parser.add_argument('--user', type=str, required=True, help='omero user')
    parser.add_argument('--passwd', type=str, required=True, help='omero passwd')

    parser.add_argument('--galaxy_host', type=str, help='Galaxy Host (with port)')
    parser.add_argument('--galaxy_api_key', type=str, help='Galaxy API key')

    parser.add_argument('--operator', type=str, help='Galaxy user email')

    parser.add_argument('--library', type=str, required=False, help='library name')
    parser.add_argument('--folder', type=str, required=False, help='library folder')

    # Comma-separated list of DataObject omero ids.
    parser.add_argument('--data_objects', type=str, required=True,
                        help='data objects ids')

    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS,
                        help='logging level (default: INFO)', default='INFO')
    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')

    return parser


def main(argv):
    """Script entry point: wire up configuration, connections and run the import."""
    # Module-level handles shared by the helper functions below.
    global logger
    global ini_file
    global kb
    global apiGalaxy

    parser = make_parser()
    args = parser.parse_args(argv)

    # Initializing logger
    logger = init_logger(args, logging)

    # Reading YAML configuration file
    ini_file = init_config(args)

    # Initializing python libraries (site-specific sys.path + imports)
    init_pythonpath(args, ini_file)

    # Initializing connection to omero biobank
    kb = init_omero_biobank(args)
    # BUGFIX: a second, redundant init_logger() call was removed here.

    # Initializing connection to apiGalaxy
    apiGalaxy = init_api_galaxy(args)

    # Getting library and folder id
    library_id, folder_id = get_library_and_folder_ids(args)

    # Getting data_objects
    data_objects = get_data_objects(args)

    import_data_objects(args, data_objects, library_id, folder_id)


def import_data_objects(args, data_objects, library_id, folder_id):
    """Copy data_objects into the staging area, then link them into Galaxy.

    Always terminates the process via SystemExit when done (success or not).
    """
    user_import_dir = get_user_import_dir(args)

    logger.info("copying datasets in user import dir")
    files = copy_in_user_import_dir(data_objects, user_import_dir)

    logger.info("wait while copying")
    polling(files)

    logger.info("ready to import in library {0} under folder {1}".format(
        args.library, args.folder))

    logger.info('importing in library')
    successfull = 0
    for file_type, folder in user_import_dir.items():
        if len(os.listdir(folder)) == 0:
            continue
        # Galaxy's datatype name for fastq files is 'fastqsanger'.
        if 'fastq' in file_type:
            file_type = 'fastqsanger'
        status = apiGalaxy.libraries.upload_file_from_server(
            library_id, folder, folder_id, file_type=file_type,
            link_data_only='link_to_files')
        successfull += len(status)
    if successfull == len(files):
        logger.info("SUCCESS")
    else:
        logger.critical("ERROR WHILE IMPORTING")

    raise SystemExit


def copy_in_user_import_dir(data_objects, user_import_dir):
    """Start one background rsync per data object.

    Returns the list of destination paths; completion is detected later by
    polling().
    """
    files = list()

    for dobj in data_objects:
        if dobj.path.startswith('irods://'):
            irods_path = dobj.path.replace('irods://', '')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()
        elif dobj.path.startswith('file://'):
            irods_path = None
            phys_path = dobj.path.replace('file://', '')
        else:
            # No recognized scheme: treat it like an irods path (same as the
            # first branch).
            irods_path = dobj.path.replace('irods://', '')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()

        # mimetype like 'x-application/fastq+64' -> data_type 'fastq'
        data_type = dobj.mimetype.split('/')[-1].replace('+64', '')
        dest_path = get_destination_path(irods_path, phys_path, dobj.sample,
                                         user_import_dir, data_type).strip()
        rsync_command = "rsync -rcLPhv {0} {1}".format(phys_path, dest_path)
        logger.info('launching copy for {0} dataset'.format(
            os.path.basename(dest_path)))
        # Fire-and-forget: the copy runs in the background.
        subprocess.Popen(rsync_command.split(' '))

        files.append(dest_path)

    return files


def polling(files):
    """Busy-wait until every destination file exists, gunzipping .gz files
    as they arrive. Returns True when all files are in place."""
    all_done = False
    founds = list()
    while not all_done:
        done = True
        for dest_path in files:
            if dest_path.endswith('.gz'):
                unzip_path = dest_path.replace('.gz', '')
                if not os.path.exists(dest_path) and not os.path.exists(unzip_path):
                    # Neither the archive nor its unpacked form yet: keep waiting.
                    done = False
                elif os.path.exists(dest_path):
                    # Archive arrived: unpack it; loop again until the .gz is gone.
                    done = False
                    logger.info("found {0}".format(os.path.basename(dest_path)))
                    logger.info("gunzipping {0}".format(os.path.basename(dest_path)))
                    cmd = "gunzip {0}".format(dest_path)
                    g_unzip = subprocess.check_output(
                        cmd, stderr=subprocess.STDOUT, shell=True).strip()
                    logger.info(g_unzip)
            else:
                if not os.path.exists(dest_path):
                    done = False
                elif os.path.exists(dest_path) and dest_path not in founds:
                    founds.append(dest_path)
                    logger.info("found {0}".format(os.path.basename(dest_path)))
        all_done = done
    return True


def get_user_import_dir(args):
    """Create (if needed) one staging folder per data type for this operator.

    Returns a dict mapping data type ('fastq'/'vcf'/'bam') to folder path.
    """
    # Timestamped subfolder, e.g. '2016-09-28_06-03-30'.
    subfolder = str(datetime.datetime.now()).split('.')[0] \
        .replace(' ', '_').replace(':', '-')
    base_dir = ini_file['LIBRARY_IMPORT_DIR_{0}'.format(
        args.host.split('.')[0].upper())]
    user_import_dir = dict(
        (dtype, "{0}/{1}/{2}_{3}".format(base_dir, args.operator, subfolder, dtype))
        for dtype in ('fastq', 'vcf', 'bam'))
    # Clear the umask so the created dirs really get mode 0775 (group write,
    # needed for Galaxy to link the files).
    os.umask(0)
    for folder in user_import_dir.values():
        if not os.path.exists(folder):
            os.makedirs(folder, 0o775)
    return user_import_dir


def get_destination_path(irods_path, phys_path, data_sample, user_import_dir,
                         data_type):
    """Build the staging-area destination filename for one data sample.

    The filename is '<type folder>/<label>' plus, for irods-backed data, the
    read number and a '.gz' suffix when the source is gzip-compressed.
    """
    label = None
    if isinstance(data_sample, kb.SeqDataSample) or \
            isinstance(data_sample, kb.AlignedSeqDataSample):
        # Demo/training tubes get fixed pedigree labels.
        if data_sample.sample.label == 'TRAINING_tube_1':
            label = 'FATHER'
        elif data_sample.sample.label == 'TRAINING_tube_2':
            label = 'PROBAND'
        elif data_sample.sample.label == 'TRAINING_tube_3':
            label = 'MOTHER'
        else:
            # BUGFIX: the original had 'continue' here, which is a SyntaxError
            # outside a loop; fall back to the sample's own label instead.
            label = data_sample.sample.label
    if isinstance(data_sample, kb.GenomeVariationsDataSample):
        label = data_sample.label

    filename = "{0}/{1}".format(user_import_dir[data_type], label)

    if irods_path:
        attr = get_attributes(irods_path)

        if 'read' in attr:
            filename = "{0}_R{1}".format(filename, attr['read'])
        if 'compression' in attr and attr['compression'] == 'gzip':
            filename = "{0}.gz".format(filename)
    else:
        filename = "{0}_{1}".format(filename, os.path.basename(phys_path))
        filename = filename.replace('.fq', '')
    return filename


def get_data_objects(args):
    """Return the biobank DataObject records whose omero_id was requested."""
    logger.info("getting data objects")
    data_objects = list()
    data_object_ids = args.data_objects.split(',')
    # NOTE(review): full scan of all DataObjects — no id-based lookup in the kb API?
    for dataobj in kb.get_objects(kb.DataObject):
        if str(dataobj.omero_id) in data_object_ids:
            data_objects.append(dataobj)
    # BUGFIX: was logging.info (root logger) instead of the module logger.
    logger.info("found {0}".format(len(data_objects)))
    return data_objects


def get_library_and_folder_ids(args):
    """Resolve the Galaxy library id and, optionally, a folder id.

    The --library argument may embed a folder as 'Library.Name?Folder.Name'
    (dots become spaces). Exits on a missing/unknown library.
    """
    if args.library is None:
        logger.critical("Library is a mandatory parameter")
        sys.exit()
    library_name = args.library.split('?')[0].replace('.', ' ')
    logger.info("searching for library")
    orione_library = apiGalaxy.libraries.get_libraries(
        name="{0}".format(library_name))
    if len(orione_library) == 0:
        logger.critical("sorry, library {0} doesn't exist".format(library_name))
        sys.exit()
    library_id = orione_library[0].get('id', None)

    if '?' in args.library and args.library == args.folder:
        folder_name = args.library.split('?')[1].replace('.', ' ')
    else:
        # No folder requested: import at the library root.
        return library_id, None
    logger.info("searching for folder {0}".format(folder_name))

    folder = apiGalaxy.libraries.get_folders(
        library_id=library_id, name=u"/{0}".format(folder_name))
    if len(folder) == 0:
        logger.info("not found. creating it..")
        try:
            folder = apiGalaxy.libraries.create_folder(library_id, folder_name)
        except Exception:
            logger.critical("impossible to create folder {0}".format(folder_name))
            sys.exit()

    folder_id = folder[0].get('id', None)

    return library_id, folder_id


def get_attributes(irods_path):
    """Return the imeta attribute/value pairs of *irods_path* as a dict."""
    cmd = ['imeta', 'ls', '-ld', irods_path]
    # NOTE(review): relies on a private helper of the site irods module.
    imeta = [i.splitlines()
             for i in irods.__irods_check_output(cmd).split('----')]
    attributes = {}
    for i in imeta:
        del i[0]  # drop the header line of each imeta record
        for a in i:
            if 'attribute' in a:
                key = a.split(':')[1].strip()
            if 'value' in a:
                value = a.split(':')[1].strip()
                attributes[key] = value
    return attributes


def init_logger(args, logging):
    """Configure the root logging and return a module-level logger."""
    log_level = getattr(logging, args.loglevel)
    kwargs = {
        'format': LOG_FORMAT,
        'datefmt': LOG_DATEFMT,
        'level': log_level}

    if args.logfile:
        kwargs['filename'] = args.logfile
    logging.basicConfig(**kwargs)

    logger = logging.getLogger(__name__)
    return logger


def init_config(args):
    """Load and return the YAML configuration file; exit on failure."""
    logger.info('loading YAML configuration file: %s' % args.ini_file)
    try:
        # safe_load: the config is data, not arbitrary Python objects.
        with open(args.ini_file) as fp:
            ini_file = yaml.safe_load(fp)
    except Exception:
        logger.critical('%s is not a valid YAML configuration file'
                        % args.ini_file)
        sys.exit()

    return ini_file


def init_pythonpath(args, ini_file):
    """Extend sys.path with the site-specific package locations and import
    the omero/irods/bioblend entry points as module-level globals."""
    logger.info('exporting pythonpath')
    # reverse/append/reverse == prepend: site paths must win over defaults.
    sys.path.reverse()
    sys.path.append('/SHARE/USERFS/els7/users/galaxy/develop/usr-cluster/lib/python2.7/site-packages/')
    sys.path.append('/u/galaxy/.local/lib/python2.7/site-packages/poster-0.8.1-py2.7.egg')
    sys.path.append('/SHARE/USERFS/els7/users/sequencing/usr-cluster/lib/python2.7/site-packages/automator-0.1-py2.7.egg')
    sys.path.append("{0}/{1}".format(ini_file['PYTHPATH'],
                                     args.host.split('.')[0]))
    sys.path.reverse()

    global KB
    from bl.vl.kb import KnowledgeBase as KB

    global irods
    import automator.agent.irods as irods

    global GalaxyInstance
    from bioblend.galaxy import GalaxyInstance


def init_omero_biobank(args):
    """Open and return a KnowledgeBase connection; exit on failure."""
    logger.info('opening kb connection to {0}'.format(args.host))

    try:
        kb = KB(driver='omero')(args.host, args.user, args.passwd)
        return kb
    except Exception:
        logger.critical('connection refused or failed')
        sys.exit()


def init_api_galaxy(args):
    """Open and return a bioblend GalaxyInstance.

    Host falls back to the per-omero-host GALAXY_HOST_* config entry when
    --galaxy_host is not given; exits if neither is available.
    """
    try:
        galaxy_host = args.galaxy_host or \
            ini_file['GALAXY_HOST_{0}'.format(args.host.split('.')[0].upper())]
        api_key = args.galaxy_api_key
    except KeyError as ke:
        msg = 'No argument passed and no global variable %s found' % ke
        logger.critical(msg)
        sys.exit(msg)

    logger.info('opening connection to %s with key %s' % (galaxy_host, api_key))
    apiGalaxy = GalaxyInstance(galaxy_host, key=api_key)
    return apiGalaxy


if __name__ == '__main__':
    main(sys.argv[1:])
