diff galaxy-tools/biobank/library/import_to_library.py @ 0:ba6cf6ede027 draft default tip

Uploaded
author ric
date Wed, 28 Sep 2016 06:03:30 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/galaxy-tools/biobank/library/import_to_library.py	Wed Sep 28 06:03:30 2016 -0400
@@ -0,0 +1,310 @@
+#!/usr/bin/env python
+import sys, os,argparse,logging, yaml, datetime, subprocess, stat
+
+LOG_FORMAT = '%(asctime)s|%(levelname)-8s|%(message)s'
+LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
+LOG_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
+
+def make_parser():
+    parser = argparse.ArgumentParser(description='')
+
+    parser.add_argument('--ini_file', type=str, default="{0}/init_file.yaml".format(os.path.dirname(os.path.realpath(sys.argv[0]))),help='Configuration file (yaml)')
+
+    parser.add_argument('--host', type=str, required=True, help='omero host')
+    parser.add_argument('--user', type=str, required=True, help='omero user')
+    parser.add_argument('--passwd', type=str, required=True, help='omero passwd')
+
+    parser.add_argument('--galaxy_host', type=str, help='Galaxy Host (with port)')
+    parser.add_argument('--galaxy_api_key', type=str, help='Galaxy API key')
+
+    parser.add_argument('--operator', type=str, help='Galaxy user email')
+
+    parser.add_argument('--library', type=str, required=False, help='library name')
+    parser.add_argument('--folder', type=str, required=False, help='library folder')
+
+    parser.add_argument('--data_objects', type=str, required=True, help='databojects id')
+
+    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS, help='logging level (default: INFO)', default='INFO')
+    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')
+
+    return parser
+
+def main(argv):
+    global logger
+    global ini_file
+    global kb
+    global apiGalaxy
+
+    parser = make_parser()
+    args = parser.parse_args(argv)
+
+     # Initializing logger
+    logger = init_logger(args,logging)
+
+    # Reading YAML configuration file
+    ini_file = init_config(args)
+
+    # Initializing python libraries
+    init_pythonpath(args,ini_file)
+
+    # Initializing connection to omero biobank
+    kb = init_omero_biobank(args)
+
+    logger = init_logger(args,logging)
+
+    # Initializing connection to apiGalaxy
+    apiGalaxy = init_api_galaxy(args)
+
+    # Getting library and folder id
+    library_id,folder_id = get_library_and_folder_ids(args)
+
+    # Getting data_objects
+    data_objects = get_data_objects(args)
+
+    import_data_objects(args,data_objects,library_id,folder_id)
+
+def import_data_objects(args,data_objects,library_id,folder_id):
+    user_import_dir = get_user_import_dir(args)
+
+    logger.info("copying datasets in user import dir")
+    files = copy_in_user_import_dir(data_objects,user_import_dir)
+
+    logger.info("wait while copiyng")
+    polling(files)
+
+    logger.info("ready to import in library {0} under folder {1}".format(args.library,args.folder))
+
+    logger.info('importing in library')
+    successfull = 0
+    for file_type,folder in user_import_dir.iteritems():
+        if len(os.listdir(folder)) == 0: continue
+        if 'fastq' in file_type: file_type = 'fastqsanger'
+        status = apiGalaxy.libraries.upload_file_from_server(library_id, folder, folder_id, file_type=file_type,link_data_only='link_to_files')
+        successfull+=len(status)
+    if successfull == len(files):
+        logger.info("SUCCESS")
+    else:
+        logger.critical("ERROR WHILE IMPORTING")
+
+    raise SystemExit
+
+def copy_in_user_import_dir(data_objects,user_import_dir):
+    """Launch an rsync copy of every data object into the import dirs.
+
+    Returns the list of destination paths. The copies run asynchronously:
+    the Popen handles are not kept -- completion is detected later by
+    polling() watching for the destination files to appear.
+    """
+    files = list()
+
+    for dobj in data_objects:
+        if dobj.path.startswith('irods://'):
+            # Resolve the physical path behind the irods object.
+            irods_path = dobj.path.replace('irods://','')
+            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()
+
+        elif dobj.path.startswith('file://'):
+            irods_path = None
+            phys_path = dobj.path.replace('file://','')
+        else: 
+            #continue
+            # Fallback: a bare path is treated as an irods path
+            # (the replace() is a no-op here since there is no prefix).
+            irods_path = dobj.path.replace('irods://','')
+            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()
+
+        # Mimetype subtype selects the destination folder key; presumably
+        # strips a '+64' suffix used for base64-flavoured mimetypes -- TODO confirm.
+        data_type = dobj.mimetype.split('/')[-1].replace('+64','')
+        dest_path = get_destination_path(irods_path,phys_path,dobj.sample,user_import_dir,data_type).strip()
+        #rsync_command = "qsub -b y /usr/bin/rsync -rcLPhv {0} {1}".format(phys_path,dest_path)
+        rsync_command = "rsync -rcLPhv {0} {1}".format(phys_path,dest_path)
+        logger.info('launching copy for {0} dataset'.format(os.path.basename(dest_path)))
+        # Fire-and-forget: polling() waits for the files instead of the processes.
+        subprocess.Popen(rsync_command.split(' '))
+
+        files.append(dest_path)
+
+    return files
+
+def polling(files):
+    all_done = False
+    founds = list()
+    while not all_done:
+        done = True
+        for dest_path in files:
+            if dest_path.endswith('.gz'):
+                unzip_path = dest_path.replace('.gz','')
+                if not os.path.exists(dest_path) and not os.path.exists(unzip_path):
+                    done = False
+                elif os.path.exists(dest_path):
+                    
+                    done = False
+                    logger.info("found {0}".format(os.path.basename(dest_path)))
+                    logger.info("gunzipping {0}".format(os.path.basename(dest_path)))
+                    cmd = "gunzip {0}".format(dest_path)
+                    g_unzip = subprocess.check_output(cmd, stderr=subprocess.STDOUT,shell=True).strip()
+                    logger.info(g_unzip)
+            else:
+                if not os.path.exists(dest_path):
+                    done = False
+                elif os.path.exists(dest_path) and dest_path not in founds:
+                    founds.append(dest_path)
+                    logger.info("found {0}".format(os.path.basename(dest_path)))
+        all_done = done
+    return True
+
+def get_user_import_dir(args):
+    user_import_dir = dict()
+    subfolder = str(datetime.datetime.now()).split('.')[0].replace(' ','_').replace(':','-')
+    user_import_dir={'fastq' : "{0}/{1}/{2}_{3}".format(ini_file['LIBRARY_IMPORT_DIR_{0}'.format(args.host.split('.')[0].upper())],args.operator,subfolder,'fastq'),
+                    'vcf' : "{0}/{1}/{2}_{3}".format(ini_file['LIBRARY_IMPORT_DIR_{0}'.format(args.host.split('.')[0].upper())],args.operator,subfolder,'vcf'),
+                    'bam' : "{0}/{1}/{2}_{3}".format(ini_file['LIBRARY_IMPORT_DIR_{0}'.format(args.host.split('.')[0].upper())],args.operator,subfolder,'bam')
+    }
+    os.umask(0)
+    for k, folder in user_import_dir.iteritems():
+        if not os.path.exists(folder):
+            os.makedirs(folder,0775)
+    return user_import_dir
+
+def get_destination_path(irods_path,phys_path,data_sample,user_import_dir,data_type):
+
+    if isinstance(data_sample, kb.SeqDataSample) or isinstance(data_sample, kb.AlignedSeqDataSample):
+        if data_sample.sample.label == 'TRAINING_tube_1' : label = 'FATHER'
+        elif data_sample.sample.label == 'TRAINING_tube_2' : label = 'PROBAND'
+        elif data_sample.sample.label == 'TRAINING_tube_3' : label = 'MOTHER'
+        else : continue
+        #label = data_sample.sample.label
+    if isinstance(data_sample, kb.GenomeVariationsDataSample):
+        label = data_sample.label
+
+    filename = "{0}/{1}".format(user_import_dir[data_type],label)
+
+    if irods_path:
+        attr = get_attributes(irods_path)
+
+        if attr.has_key('read'): filename = "{0}_R{1}".format(filename,attr['read'])
+        #if attr.has_key('lanes'): filename = "{0}_L{1}".format(filename,attr['lanes'])
+        if attr.has_key('compression') and attr['compression'] == 'gzip':
+            filename = "{0}.gz".format(filename)
+    else:
+        filename = "{0}_{1}".format(filename,os.path.basename(phys_path))
+        filename = filename.replace('.fq','')
+    return filename
+
+def get_data_objects(args):
+    logger.info("getting data objects")
+    data_objects = list()
+    data_object_ids = args.data_objects.split(',')
+    for dataobj in kb.get_objects(kb.DataObject):
+        if str(dataobj.omero_id) in data_object_ids:
+            data_objects.append(dataobj)
+    logging.info("found {0}".format(len(data_objects)))
+    return data_objects
+
+def get_library_and_folder_ids(args):
+    if args.library is None:
+        logger.critical("Library is a mandatory parameter")
+        sys.exit()
+    library_name = args.library.split('?')[0].replace('.',' ')
+    logger.info("searching for library")
+    orione_library = apiGalaxy.libraries.get_libraries(name="{0}".format(library_name))
+    if len(orione_library) == 0:
+        logger.critical("sorry, library {0} doesn't exist".format(library_name))
+        sys.exit()
+    library_id = orione_library[0].get('id',None)
+
+    if '?' in args.library and args.library == args.folder:
+        folder_name = args.library.split('?')[1].replace('.',' ')
+    else:
+        return library_id,None
+    logger.info("searching for folder {0}".format(folder_name))
+
+    folder = apiGalaxy.libraries.get_folders(library_id=library_id,name=u"/{0}".format(folder_name))
+    if len(folder) == 0:
+        logger.info("not found. creating it..")
+        try:
+            folder = apiGalaxy.libraries.create_folder(library_id,folder_name)
+        except:
+            logger.critical("impossible to create folder {0}".format(folder_name))
+            sys.exit()
+
+
+    folder_id = folder[0].get('id',None)
+
+    return library_id,folder_id
+
+def get_attributes(irods_path):
+    """Parse 'imeta ls -ld <path>' output into an attribute->value dict.
+
+    imeta prints records separated by '----', each containing
+    'attribute: <name>' and 'value: <val>' lines. NOTE(review): assumes
+    'attribute' always precedes 'value' within a record, otherwise 'key'
+    would be unbound or stale -- confirm against actual imeta output.
+    """
+    cmd = ['imeta', 'ls', '-ld', irods_path]
+    imeta = [i.splitlines() for i in irods.__irods_check_output(cmd).split('----')]
+    attributes = {}
+    for i in imeta:
+        # Drop the record header line.
+        del i[0]
+        for a in i:
+            if 'attribute' in a:
+                key = a.split(':')[1].strip()
+            if 'value' in a:
+                value = a.split(':')[1].strip()
+                attributes[key] = value
+    return attributes
+
+def init_logger(args,logging):
+    log_level = getattr(logging, args.loglevel)
+    kwargs = {
+        'format'  : LOG_FORMAT,
+        'datefmt' : LOG_DATEFMT,
+        'level'   : log_level}
+
+    if args.logfile:
+        kwargs['filename'] = args.logfile
+    logging.basicConfig(**kwargs)
+
+    logger = logging.getLogger( __name__ )
+    return logger
+
+def init_config(args):
+    # Load YAML configuration file
+    logger.info('loading YAML configuration file: %s' % args.ini_file)
+    try:
+        ini_file = yaml.load(open(args.ini_file))
+    except:
+        logger.critical('%s is not a valid YAML configuration file' %args.ini_file)
+        sys.exit()
+
+    return ini_file
+
+def init_pythonpath(args,ini_file):
+    """Extend sys.path with site-specific package locations, then perform
+    the imports that depend on them (KB, irods agent, bioblend).
+
+    The reverse/append/reverse dance effectively PREPENDS the new entries,
+    so these locations win over anything already on sys.path.
+    """
+    logger.info('exporting pythonpath')
+    sys.path.reverse()
+    sys.path.append('/SHARE/USERFS/els7/users/galaxy/develop/usr-cluster/lib/python2.7/site-packages/')
+    sys.path.append('/u/galaxy/.local/lib/python2.7/site-packages/poster-0.8.1-py2.7.egg')
+    sys.path.append('/SHARE/USERFS/els7/users/sequencing/usr-cluster/lib/python2.7/site-packages/automator-0.1-py2.7.egg')
+    # Host-specific packages: <PYTHPATH>/<short hostname>.
+    sys.path.append("{0}/{1}".format(ini_file['PYTHPATH'],args.host.split('.')[0]))
+    sys.path.reverse()
+
+    # Deferred imports: these modules only resolve after the path setup
+    # above; they are published as module globals for the other functions.
+    global KB
+    from bl.vl.kb import KnowledgeBase as KB
+
+    global irods
+    import automator.agent.irods as irods
+
+    #global bioblend
+    #import bioblend
+
+    global GalaxyInstance
+    from bioblend.galaxy import GalaxyInstance
+
+def init_omero_biobank(args):
+    logger.info('opening kb connection to {0}'.format(args.host))
+
+    try:
+        kb = KB(driver='omero')(args.host, args.user, args.passwd)
+        return kb
+    except:
+        logger.critical('connection refused or failed')
+        sys.exit()
+
+def init_api_galaxy(args):
+    try:
+        galaxy_host = args.galaxy_host or ini_file['GALAXY_HOST_{0}'.format(args.host.split('.')[0].upper())]
+        api_key = args.galaxy_api_key
+    except KeyError, ke:
+        msg = 'No argument passed and no global variable %s found' % ke
+        logger.critical(msg)
+        sys.exit(msg)
+
+
+    logger.info('opening connection to %s with key %s' %(galaxy_host,api_key) )
+    apiGalaxy = GalaxyInstance(galaxy_host, key=api_key)
+    return apiGalaxy
+
+if __name__ == '__main__':
+    # Script entry point: forward the CLI args (minus program name) to main().
+    main(sys.argv[1:])