diff galaxy-tools/biobank/updater/change_source_item.py @ 0:e54d14bed3f5 draft default tip

Uploaded
author ric
date Thu, 29 Sep 2016 06:09:15 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/galaxy-tools/biobank/updater/change_source_item.py	Thu Sep 29 06:09:15 2016 -0400
@@ -0,0 +1,258 @@
+# The tool changes the source of an object inside the system.
+# Expected input file format is
+#
+# target      new_source
+# V1415515    V1241441
+# V1351124    V1511141
+# .....
+#
+# Where target is the object whose source will be changed with the
+# new_source object.  New source type will be specified using the
+# command line option.
+
+import csv, argparse, sys, os, json, time
+
+from bl.vl.kb import KnowledgeBase as KB
+import bl.vl.utils.ome_utils as vlu
+from bl.vl.utils import get_logger, LOG_LEVELS
+import omero
+import omero.model
+
+
+def make_parser():
+    parser = argparse.ArgumentParser(description='change the source for given items')
+    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')
+    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS,
+                        help='logger level', default='INFO')
+    parser.add_argument('-H', '--host', type=str, help='omero hostname')
+    parser.add_argument('-U', '--user', type=str, help='omero user')
+    parser.add_argument('-P', '--passwd', type=str, help='omero password')
+    parser.add_argument('--operator', type=str, required=True,
+                        help='operator username')
+    parser.add_argument('--in_file', type=str, required=True,
+                        help='list of items with new sources')
+    parser.add_argument('--target_type', type=str, required=True,
+                        help='type of the target objects')
+    parser.add_argument('--source_type', type=str, required=True,
+                        help='type of the new source objects')
+    return parser
+
+
+def do_check(records, targets, sources, 
+             target_type, source_type,
+             kb, logger):
+    logger.info('Starting consistency checks')
+    src_map = dict([(s.id, s) for s in sources])
+    trg_map = dict([(t.id, t) for t in targets])
+    good_records = []
+    targets = {}
+    sources = {}
+    for i, r in enumerate(records):
+        if r['target'] not in trg_map:
+            logger.warning('No %s with ID %s, rejecting record %d' % (target_type,
+                                                                      r['target'], i))
+            continue
+        if r['new_source'] not in src_map:
+            logger.warning('No %s with ID %s, rejecting record %d' % (source_type,
+                                                                      r['new_source'], i))
+            continue
+        targets[r['target']] = trg_map[r['target']]
+        sources[r['new_source']] = src_map[r['new_source']]
+        good_records.append(r)
+    logger.info('Done with consistency checks')
+    return good_records, targets, sources
+
+
+def update_data(records, targets, sources, operator, act_conf,
+                kb, logger, batch_size = 500):
+    def get_chunk(batch_size, records):
+        offset = 0
+        while len(records[offset:]) > 0:
+            yield records[offset:offset+batch_size]
+            offset += batch_size
+    dev = get_device(kb, logger)
+    for i, recs in enumerate(get_chunk(batch_size, records)):
+        logger.info('Updating batch %d' % i)
+        batch_to_save = []
+        edges_to_delete = []
+        for r in recs:
+            target = targets[r['target']]
+            # Build the ActionOnAction backup object
+            if not target.lastUpdate:
+                last_action = target.action
+            else:
+                last_action = target.lastUpdate
+            old_action = target.action
+            asconf = {'backup' : {'action' : old_action.id}}
+            aslabel = 'updater.update_source_item-%f' % time.time()
+            backup = build_action(operator, old_action.context,
+                                  dev, last_action, aslabel,
+                                  asconf, kb, logger)
+            target.lastUpdate = backup
+            # Build the Action in order to attach the new source to
+            # the target object
+            new_source = sources[r['new_source']]
+            if new_source.is_mapped:
+                new_source.unload()
+            asconf = act_conf
+            aslabel = 'updater.update_source_item-%f' % time.time()
+            new_act = build_action(operator, old_action.context,
+                                   dev, new_source, aslabel,
+                                   asconf, kb, logger)
+            target.action = new_act
+            if old_action.OME_TABLE == 'Action':
+                # no old source, just save the new action
+                batch_to_save.append(target)
+            else:
+                # check if the old target and the new one are different
+                if new_source != old_action.target:
+                    batch_to_save.append(target)
+                    edges_to_delete.append((old_action.target, target))
+        if len(batch_to_save) > 0:
+            kb.save_array(batch_to_save)
+        else:
+            logger.info('No record need to be updated')
+        for vert in edges_to_delete:            
+            kb.dt.destroy_edge(*vert)
+
+
+def build_action(operator, context, device, target,
+                 action_setup_label, action_setup_conf,
+                 kb, logger):
+    if action_setup_label:
+        asetup = get_action_setup(action_setup_label, action_setup_conf,
+                                  kb, logger)
+    else:
+        asetup = None
+    aconf = {
+        'device' : device,
+        'actionCategory' : kb.ActionCategory.IMPORT,
+        'operator' : 'operator',
+        'context' : context,
+        'target' : target,
+        }
+    if asetup:
+        aconf['setup'] = asetup
+        action = kb.factory.create(retrieve_action_type(target, kb), aconf)
+    return action
+
+
+def retrieve_action_type(target, kb):
+    tklass = target.ome_obj.__class__.__name__
+    for i, k in enumerate(target.ome_obj.__class__.__mro__):
+        if k is omero.model.IObject:
+            tklass = target.ome_obj.__class__.__mro__[i-1].__name__
+    if tklass == 'Vessel':
+        return kb.ActionOnVessel
+    elif tklass == 'Individual':
+        return kb.ActionOnIndividual
+    elif tklass == 'DataSample':
+        return kb.ActionOnDataSample
+    elif tklass == 'DataCollectionItem':
+        return kb.ActionOnDataCollectionItem
+    elif tklass == 'Action':
+        return kb.ActionOnAction
+    # elif tklass == 'VLCollection':
+    #     return kb.ActionOnCollection
+    else:
+        raise ValueError('No Action related to %s klass' % tklass)
+            
+
+def get_action_setup(label, conf, kb, logger):
+    asetup_conf = {
+        'label' : label,
+        'conf' : json.dumps(conf),
+        }
+    asetup = kb.factory.create(kb.ActionSetup, asetup_conf)
+    return asetup
+
+
+def get_device(kb, logger):
+    dev_model = 'UPDATE'
+    dev_maker = 'CRS4'
+    dev_release = '0.1'
+    dev_label = 'updater-%s.update_source_item' % dev_release
+    device = kb.get_device(dev_label)
+    if not device:
+        logger.debug('No device with label %s, creating one' % dev_label)
+        conf = {
+            'maker' : dev_maker,
+            'model' : dev_model,
+            'release' : dev_release,
+            'label' : dev_label,
+            }
+        device = kb.factory.create(kb.Device, conf).save()
+    return device
+
+
+def find_action_setup_conf(args):
+    action_setup_conf = {}
+    for x in dir(args):
+        if not (x.startswith('_') or x.startswith('func')):
+            action_setup_conf[x] = getattr(args, x)
+    if 'passwd' in action_setup_conf:
+        action_setup_conf.pop('passwd') # Storing passwords into an
+                                        # Omero obj is not a great idea...
+    return action_setup_conf
+
+
+def main(argv):
+    parser = make_parser()
+    args = parser.parse_args(argv)
+
+    logger = get_logger('change_source_item', level=args.loglevel,
+                        filename=args.logfile)
+
+    try:
+        host = args.host or vlu.ome_host()
+        user = args.user or vlu.ome_user()
+        passwd = args.passwd or vlu.ome_passwd()
+    except ValueError, ve:
+        logger.critical(ve)
+        sys.exit(ve)
+
+    kb = KB(driver='omero')(host, user, passwd)
+    logger.info('Loading data from input file')
+    with open(args.in_file) as f:
+        reader = csv.DictReader(f, delimiter='\t')
+        records = list(reader)
+    logger.info('Loaded %d records' % len(records))
+
+    logger.info('Loading %s type objects' % args.target_type)
+    targets = kb.get_objects(getattr(kb, args.target_type))
+    logger.info('Loaded %d objects' % len(targets))
+    if len(targets) == 0:
+        msg = 'No targets loaded from the system, nothing to do'
+        logger.critical(msg)
+        sys.exit(msg)
+
+    logger.info('Loading %s type objects' % args.source_type)
+    sources = kb.get_objects(getattr(kb, args.source_type))
+    logger.info('Loaded %d objects' % len(sources))
+    if len(sources) == 0:
+        msg = 'No sources loaded from the system, nothing to do'
+        logger.critical(msg)
+        sys.exit(msg)
+
+    logger.info('Loading Action type objects')
+    acts = kb.get_objects(kb.Action)
+    logger.info('Loaded %d objects' % len(acts))
+
+    records, targets, sources = do_check(records, targets, sources,
+                                         args.target_type, args.source_type,
+                                         kb, logger)
+    if len(records) == 0:
+        msg = 'No records passed consistency checks, nothing to do'
+        logger.critical(msg)
+        sys.exit(msg)
+
+    aconf = find_action_setup_conf(args)
+
+    update_data(records, targets, sources, args.operator, 
+                aconf, kb, logger)
+        
+    logger.info('Job completed')
+
+
+if __name__ == '__main__':
+    main(sys.argv[1:])