galaxy-tools/biobank/library/import_to_library.py @ 0:ba6cf6ede027

author ric
date Wed, 28 Sep 2016 06:03:30 -0400

#!/usr/bin/env python
import sys, os, argparse, logging, yaml, datetime, subprocess, stat, time

LOG_FORMAT = '%(asctime)s|%(levelname)-8s|%(message)s'
LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
LOG_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

def make_parser():
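    """Build the command-line parser for the import tool."""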
    parser = argparse.ArgumentParser(description='Import omero biobank data objects into a Galaxy data library')

    parser.add_argument('--ini_file', type=str, default="{0}/init_file.yaml".format(os.path.dirname(os.path.realpath(sys.argv[0]))),help='Configuration file (yaml)')

    parser.add_argument('--host', type=str, required=True, help='omero host')
    parser.add_argument('--user', type=str, required=True, help='omero user')
    parser.add_argument('--passwd', type=str, required=True, help='omero passwd')

    parser.add_argument('--galaxy_host', type=str, help='Galaxy Host (with port)')
    parser.add_argument('--galaxy_api_key', type=str, help='Galaxy API key')

    parser.add_argument('--operator', type=str, help='Galaxy user email')

    parser.add_argument('--library', type=str, required=False, help='library name')
    parser.add_argument('--folder', type=str, required=False, help='library folder')

    parser.add_argument('--data_objects', type=str, required=True, help='comma-separated list of data object ids')

    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS, help='logging level (default: INFO)', default='INFO')
    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')

    return parser

def main(argv):
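    """Parse the arguments, initialize the connections and import the requested data objects."""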
    global logger
    global ini_file
    global kb
    global apiGalaxy

    parser = make_parser()
    args = parser.parse_args(argv)

    # Initializing logger
    logger = init_logger(args,logging)

    # Reading YAML configuration file
    ini_file = init_config(args)

    # Initializing python libraries
    init_pythonpath(args,ini_file)

    # Initializing connection to omero biobank
    kb = init_omero_biobank(args)

    # Initializing connection to apiGalaxy
    apiGalaxy = init_api_galaxy(args)

    # Getting library and folder id
    library_id,folder_id = get_library_and_folder_ids(args)

    # Getting data_objects
    data_objects = get_data_objects(args)

    import_data_objects(args,data_objects,library_id,folder_id)

def import_data_objects(args,data_objects,library_id,folder_id):
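    """Copy the data objects into the user's import directory, wait for the copies
    to finish, then link the copied files into the Galaxy library folder."""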
    user_import_dir = get_user_import_dir(args)

    logger.info("copying datasets in user import dir")
    files = copy_in_user_import_dir(data_objects,user_import_dir)

    logger.info("wait while copiyng")
    polling(files)

    logger.info("ready to import in library {0} under folder {1}".format(args.library,args.folder))

    logger.info('importing in library')
    successful = 0
    for file_type,folder in user_import_dir.iteritems():
        if len(os.listdir(folder)) == 0: continue
        if 'fastq' in file_type: file_type = 'fastqsanger'
        status = apiGalaxy.libraries.upload_file_from_server(library_id, folder, folder_id, file_type=file_type, link_data_only='link_to_files')
        successful += len(status)
    if successful == len(files):
        logger.info("SUCCESS")
    else:
        logger.critical("ERROR WHILE IMPORTING")

    raise SystemExit

def copy_in_user_import_dir(data_objects,user_import_dir):
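    """Resolve each data object to a physical path (via iRODS when needed) and start
    an rsync copy into the per-type import directory.

    Returns the list of destination paths so the caller can poll for completion."""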
    files = list()

    for dobj in data_objects:
        if dobj.path.startswith('irods://'):
            irods_path = dobj.path.replace('irods://','')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()

        elif dobj.path.startswith('file://'):
            irods_path = None
            phys_path = dobj.path.replace('file://','')
        else:
            # no recognized scheme: fall back to treating the path as an iRODS path
            irods_path = dobj.path.replace('irods://','')
            phys_path = irods.get_object_info(irods_path)['phys_path'].strip()

        data_type = dobj.mimetype.split('/')[-1].replace('+64','')
        dest_path = get_destination_path(irods_path,phys_path,dobj.sample,user_import_dir,data_type).strip()
        #rsync_command = "qsub -b y /usr/bin/rsync -rcLPhv {0} {1}".format(phys_path,dest_path)
        rsync_command = "rsync -rcLPhv {0} {1}".format(phys_path,dest_path)
        logger.info('launching copy for {0} dataset'.format(os.path.basename(dest_path)))
        subprocess.Popen(rsync_command.split(' '))

        files.append(dest_path)

    return files

def polling(files):
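    """Block until every destination file exists; gzipped files are gunzipped as
    soon as they appear and the loop then waits for the uncompressed copy."""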
    all_done = False
    founds = list()
    while not all_done:
        done = True
        for dest_path in files:
            if dest_path.endswith('.gz'):
                unzip_path = dest_path.replace('.gz','')
                if not os.path.exists(dest_path) and not os.path.exists(unzip_path):
                    done = False
                elif os.path.exists(dest_path):
                    done = False
                    logger.info("found {0}".format(os.path.basename(dest_path)))
                    logger.info("gunzipping {0}".format(os.path.basename(dest_path)))
                    cmd = "gunzip {0}".format(dest_path)
                    g_unzip = subprocess.check_output(cmd, stderr=subprocess.STDOUT,shell=True).strip()
                    logger.info(g_unzip)
            else:
                if not os.path.exists(dest_path):
                    done = False
                elif os.path.exists(dest_path) and dest_path not in founds:
                    founds.append(dest_path)
                    logger.info("found {0}".format(os.path.basename(dest_path)))
        if not done:
            # sleep between passes to avoid a tight busy-wait while rsync runs
            time.sleep(10)
        all_done = done
    return True

def get_user_import_dir(args):
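    """Create timestamped fastq/vcf/bam subdirectories under the operator's
    library import directory and return them keyed by file type."""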
    subfolder = str(datetime.datetime.now()).split('.')[0].replace(' ','_').replace(':','-')
    base_dir = "{0}/{1}".format(ini_file['LIBRARY_IMPORT_DIR_{0}'.format(args.host.split('.')[0].upper())], args.operator)
    user_import_dir = {file_type: "{0}/{1}_{2}".format(base_dir, subfolder, file_type)
                       for file_type in ('fastq', 'vcf', 'bam')}
    os.umask(0)
    for k, folder in user_import_dir.iteritems():
        if not os.path.exists(folder):
            os.makedirs(folder,0775)
    return user_import_dir

def get_destination_path(irods_path,phys_path,data_sample,user_import_dir,data_type):
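    """Build the destination filename for a data object from the sample label, the
    file-type directory and, for iRODS objects, the read/compression metadata."""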

    if isinstance(data_sample, kb.SeqDataSample) or isinstance(data_sample, kb.AlignedSeqDataSample):
        if data_sample.sample.label == 'TRAINING_tube_1' : label = 'FATHER'
        elif data_sample.sample.label == 'TRAINING_tube_2' : label = 'PROBAND'
        elif data_sample.sample.label == 'TRAINING_tube_3' : label = 'MOTHER'
        # non-training samples keep their original sample label
        else : label = data_sample.sample.label
    if isinstance(data_sample, kb.GenomeVariationsDataSample):
        label = data_sample.label

    filename = "{0}/{1}".format(user_import_dir[data_type],label)

    if irods_path:
        attr = get_attributes(irods_path)

        if 'read' in attr: filename = "{0}_R{1}".format(filename,attr['read'])
        #if 'lanes' in attr: filename = "{0}_L{1}".format(filename,attr['lanes'])
        if attr.get('compression') == 'gzip':
            filename = "{0}.gz".format(filename)
    else:
        filename = "{0}_{1}".format(filename,os.path.basename(phys_path))
        filename = filename.replace('.fq','')
    return filename

def get_data_objects(args):
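    """Return the kb DataObject instances whose omero_id is listed in --data_objects."""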
    logger.info("getting data objects")
    data_objects = list()
    data_object_ids = args.data_objects.split(',')
    for dataobj in kb.get_objects(kb.DataObject):
        if str(dataobj.omero_id) in data_object_ids:
            data_objects.append(dataobj)
    logger.info("found {0}".format(len(data_objects)))
    return data_objects

def get_library_and_folder_ids(args):
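    """Resolve the Galaxy library name (and the optional '?'-separated folder name)
    to their ids, creating the folder when it does not exist yet."""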
    if args.library is None:
        logger.critical("Library is a mandatory parameter")
        sys.exit()
    library_name = args.library.split('?')[0].replace('.',' ')
    logger.info("searching for library")
    orione_library = apiGalaxy.libraries.get_libraries(name="{0}".format(library_name))
    if len(orione_library) == 0:
        logger.critical("sorry, library {0} doesn't exist".format(library_name))
        sys.exit()
    library_id = orione_library[0].get('id',None)

    if '?' in args.library and args.library == args.folder:
        folder_name = args.library.split('?')[1].replace('.',' ')
    else:
        return library_id,None
    logger.info("searching for folder {0}".format(folder_name))

    folder = apiGalaxy.libraries.get_folders(library_id=library_id,name=u"/{0}".format(folder_name))
    if len(folder) == 0:
        logger.info("not found. creating it..")
        try:
            folder = apiGalaxy.libraries.create_folder(library_id,folder_name)
        except:
            logger.critical("impossible to create folder {0}".format(folder_name))
            sys.exit()


    folder_id = folder[0].get('id',None)

    return library_id,folder_id

def get_attributes(irods_path):
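    """Parse `imeta ls -ld` output into an {attribute: value} dict, dropping the
    header line of each metadata block."""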
    cmd = ['imeta', 'ls', '-ld', irods_path]
    imeta = [i.splitlines() for i in irods.__irods_check_output(cmd).split('----')]
    attributes = {}
    for i in imeta:
        del i[0]
        for a in i:
            if 'attribute' in a:
                key = a.split(':')[1].strip()
            if 'value' in a:
                value = a.split(':')[1].strip()
                attributes[key] = value
    return attributes

def init_logger(args,logging):
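    """Configure the logging module from the command-line options and return a logger."""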
    log_level = getattr(logging, args.loglevel)
    kwargs = {
        'format'  : LOG_FORMAT,
        'datefmt' : LOG_DATEFMT,
        'level'   : log_level}

    if args.logfile:
        kwargs['filename'] = args.logfile
    logging.basicConfig(**kwargs)

    logger = logging.getLogger( __name__ )
    return logger

def init_config(args):
    # Load YAML configuration file
    logger.info('loading YAML configuration file: %s' % args.ini_file)
    try:
        with open(args.ini_file) as f:
            ini_file = yaml.load(f)
    except:
        logger.critical('%s is not a valid YAML configuration file' % args.ini_file)
        sys.exit()

    return ini_file

def init_pythonpath(args,ini_file):
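    """Prepend site-specific package directories to sys.path (reverse/append/reverse)
    and import the modules that live there, binding them as module-level globals."""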
    logger.info('exporting pythonpath')
    sys.path.reverse()
    sys.path.append('/SHARE/USERFS/els7/users/galaxy/develop/usr-cluster/lib/python2.7/site-packages/')
    sys.path.append('/u/galaxy/.local/lib/python2.7/site-packages/poster-0.8.1-py2.7.egg')
    sys.path.append('/SHARE/USERFS/els7/users/sequencing/usr-cluster/lib/python2.7/site-packages/automator-0.1-py2.7.egg')
    sys.path.append("{0}/{1}".format(ini_file['PYTHPATH'],args.host.split('.')[0]))
    sys.path.reverse()

    global KB
    from bl.vl.kb import KnowledgeBase as KB

    global irods
    import automator.agent.irods as irods

    #global bioblend
    #import bioblend

    global GalaxyInstance
    from bioblend.galaxy import GalaxyInstance

def init_omero_biobank(args):
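    """Open a KnowledgeBase connection to the omero host, exiting on failure."""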
    logger.info('opening kb connection to {0}'.format(args.host))

    try:
        kb = KB(driver='omero')(args.host, args.user, args.passwd)
        return kb
    except:
        logger.critical('connection refused or failed')
        sys.exit()

def init_api_galaxy(args):
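    """Open a bioblend GalaxyInstance using --galaxy_host (or the per-host value
    from the ini file) and the --galaxy_api_key argument."""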
    try:
        galaxy_host = args.galaxy_host or ini_file['GALAXY_HOST_{0}'.format(args.host.split('.')[0].upper())]
        api_key = args.galaxy_api_key
    except KeyError as ke:
        msg = 'No argument passed and no global variable %s found' % ke
        logger.critical(msg)
        sys.exit(msg)


    logger.info('opening connection to %s with key %s' %(galaxy_host,api_key) )
    apiGalaxy = GalaxyInstance(galaxy_host, key=api_key)
    return apiGalaxy

if __name__ == '__main__':
    main(sys.argv[1:])