Mercurial > repos > ric > test3
diff galaxy-tools/biobank/updater/change_source_item.py @ 0:e54d14bed3f5 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Thu, 29 Sep 2016 06:09:15 -0400 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/galaxy-tools/biobank/updater/change_source_item.py Thu Sep 29 06:09:15 2016 -0400 @@ -0,0 +1,258 @@ +# The tool changes the source of an object inside the system. +# Expected input file format is +# +# target new_source +# V1415515 V1241441 +# V1351124 V1511141 +# ..... +# +# Where target is the object whose source will be changed with the +# new_source object. New source type will be specified using the +# command line option. + +import csv, argparse, sys, os, json, time + +from bl.vl.kb import KnowledgeBase as KB +import bl.vl.utils.ome_utils as vlu +from bl.vl.utils import get_logger, LOG_LEVELS +import omero +import omero.model + + +def make_parser(): + parser = argparse.ArgumentParser(description='change the source for given items') + parser.add_argument('--logfile', type=str, help='log file (default=stderr)') + parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS, + help='logger level', default='INFO') + parser.add_argument('-H', '--host', type=str, help='omero hostname') + parser.add_argument('-U', '--user', type=str, help='omero user') + parser.add_argument('-P', '--passwd', type=str, help='omero password') + parser.add_argument('--operator', type=str, required=True, + help='operator username') + parser.add_argument('--in_file', type=str, required=True, + help='list of items with new sources') + parser.add_argument('--target_type', type=str, required=True, + help='type of the target objects') + parser.add_argument('--source_type', type=str, required=True, + help='type of the new source objects') + return parser + + +def do_check(records, targets, sources, + target_type, source_type, + kb, logger): + logger.info('Starting consistency checks') + src_map = dict([(s.id, s) for s in sources]) + trg_map = dict([(t.id, t) for t in targets]) + good_records = [] + targets = {} + sources = {} + for i, r in enumerate(records): + if r['target'] not in trg_map: + logger.warning('No %s with ID %s, rejecting record %d' % (target_type, + r['target'], i)) + continue + if r['new_source'] not in src_map: + logger.warning('No %s with ID %s, rejecting record %d' % (source_type, + r['new_source'], i)) + continue + targets[r['target']] = trg_map[r['target']] + sources[r['new_source']] = src_map[r['new_source']] + good_records.append(r) + logger.info('Done with consistency checks') + return good_records, targets, sources + + +def update_data(records, targets, sources, operator, act_conf, + kb, logger, batch_size = 500): + def get_chunk(batch_size, records): + offset = 0 + while len(records[offset:]) > 0: + yield records[offset:offset+batch_size] + offset += batch_size + dev = get_device(kb, logger) + for i, recs in enumerate(get_chunk(batch_size, records)): + logger.info('Updating batch %d' % i) + batch_to_save = [] + edges_to_delete = [] + for r in recs: + target = targets[r['target']] + # Build the ActionOnAction backup object + if not target.lastUpdate: + last_action = target.action + else: + last_action = target.lastUpdate + old_action = target.action + asconf = {'backup' : {'action' : old_action.id}} + aslabel = 'updater.update_source_item-%f' % time.time() + backup = build_action(operator, old_action.context, + dev, last_action, aslabel, + asconf, kb, logger) + target.lastUpdate = backup + # Build the Action in order to attach the new source to + # the target object + new_source = sources[r['new_source']] + if new_source.is_mapped: + new_source.unload() + asconf = act_conf + aslabel = 'updater.update_source_item-%f' % time.time() + new_act = build_action(operator, old_action.context, + dev, new_source, aslabel, + asconf, kb, logger) + target.action = new_act + if old_action.OME_TABLE == 'Action': + # no old source, just save the new action + batch_to_save.append(target) + else: + # check if the old target and the new one are different + if new_source != old_action.target: + batch_to_save.append(target) + edges_to_delete.append((old_action.target, target)) + if len(batch_to_save) > 0: + kb.save_array(batch_to_save) + else: + logger.info('No record need to be updated') + for vert in edges_to_delete: + kb.dt.destroy_edge(*vert) + + +def build_action(operator, context, device, target, + action_setup_label, action_setup_conf, + kb, logger): + if action_setup_label: + asetup = get_action_setup(action_setup_label, action_setup_conf, + kb, logger) + else: + asetup = None + aconf = { + 'device' : device, + 'actionCategory' : kb.ActionCategory.IMPORT, + 'operator' : 'operator', + 'context' : context, + 'target' : target, + } + if asetup: + aconf['setup'] = asetup + action = kb.factory.create(retrieve_action_type(target, kb), aconf) + return action + + +def retrieve_action_type(target, kb): + tklass = target.ome_obj.__class__.__name__ + for i, k in enumerate(target.ome_obj.__class__.__mro__): + if k is omero.model.IObject: + tklass = target.ome_obj.__class__.__mro__[i-1].__name__ + if tklass == 'Vessel': + return kb.ActionOnVessel + elif tklass == 'Individual': + return kb.ActionOnIndividual + elif tklass == 'DataSample': + return kb.ActionOnDataSample + elif tklass == 'DataCollectionItem': + return kb.ActionOnDataCollectionItem + elif tklass == 'Action': + return kb.ActionOnAction + # elif tklass == 'VLCollection': + # return kb.ActionOnCollection + else: + raise ValueError('No Action related to %s klass' % tklass) + + +def get_action_setup(label, conf, kb, logger): + asetup_conf = { + 'label' : label, + 'conf' : json.dumps(conf), + } + asetup = kb.factory.create(kb.ActionSetup, asetup_conf) + return asetup + + +def get_device(kb, logger): + dev_model = 'UPDATE' + dev_maker = 'CRS4' + dev_release = '0.1' + dev_label = 'updater-%s.update_source_item' % dev_release + device = kb.get_device(dev_label) + if not device: + logger.debug('No device with label %s, creating one' % dev_label) + conf = { + 'maker' : dev_maker, + 'model' : dev_model, + 'release' : dev_release, + 'label' : dev_label, + } + device = kb.factory.create(kb.Device, conf).save() + return device + + +def find_action_setup_conf(args): + action_setup_conf = {} + for x in dir(args): + if not (x.startswith('_') or x.startswith('func')): + action_setup_conf[x] = getattr(args, x) + if 'passwd' in action_setup_conf: + action_setup_conf.pop('passwd') # Storing passwords into an + # Omero obj is not a great idea... + return action_setup_conf + + +def main(argv): + parser = make_parser() + args = parser.parse_args(argv) + + logger = get_logger('change_source_item', level=args.loglevel, + filename=args.logfile) + + try: + host = args.host or vlu.ome_host() + user = args.user or vlu.ome_user() + passwd = args.passwd or vlu.ome_passwd() + except ValueError, ve: + logger.critical(ve) + sys.exit(ve) + + kb = KB(driver='omero')(host, user, passwd) + logger.info('Loading data from input file') + with open(args.in_file) as f: + reader = csv.DictReader(f, delimiter='\t') + records = list(reader) + logger.info('Loaded %d records' % len(records)) + + logger.info('Loading %s type objects' % args.target_type) + targets = kb.get_objects(getattr(kb, args.target_type)) + logger.info('Loaded %d objects' % len(targets)) + if len(targets) == 0: + msg = 'No targets loaded from the system, nothing to do' + logger.critical(msg) + sys.exit(msg) + + logger.info('Loading %s type objects' % args.source_type) + sources = kb.get_objects(getattr(kb, args.source_type)) + logger.info('Loaded %d objects' % len(sources)) + if len(sources) == 0: + msg = 'No sources loaded from the system, nothing to do' + logger.critical(msg) + sys.exit(msg) + + logger.info('Loading Action type objects') + acts = kb.get_objects(kb.Action) + logger.info('Loaded %d objects' % len(acts)) + + records, targets, sources = do_check(records, targets, sources, + args.target_type, args.source_type, + kb, logger) + if len(records) == 0: + msg = 'No records passed consistency checks, nothing to do' + logger.critical(msg) + sys.exit(msg) + + aconf = find_action_setup_conf(args) + + update_data(records, targets, sources, args.operator, + aconf, kb, logger) + + logger.info('Job completed') + + +if __name__ == '__main__': + main(sys.argv[1:])
