Mercurial > repos > ric > test2
view galaxy-tools/biobank/updater/change_source_item.py @ 0:ba6cf6ede027 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Wed, 28 Sep 2016 06:03:30 -0400 |
| parents | |
| children |
line wrap: on
line source
# The tool changes the source of an object inside the system. # Expected input file format is # # target new_source # V1415515 V1241441 # V1351124 V1511141 # ..... # # Where target is the object whose source will be changed with the # new_source object. New source type will be specified using the # command line option. import csv, argparse, sys, os, json, time from bl.vl.kb import KnowledgeBase as KB import bl.vl.utils.ome_utils as vlu from bl.vl.utils import get_logger, LOG_LEVELS import omero import omero.model def make_parser(): parser = argparse.ArgumentParser(description='change the source for given items') parser.add_argument('--logfile', type=str, help='log file (default=stderr)') parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS, help='logger level', default='INFO') parser.add_argument('-H', '--host', type=str, help='omero hostname') parser.add_argument('-U', '--user', type=str, help='omero user') parser.add_argument('-P', '--passwd', type=str, help='omero password') parser.add_argument('--operator', type=str, required=True, help='operator username') parser.add_argument('--in_file', type=str, required=True, help='list of items with new sources') parser.add_argument('--target_type', type=str, required=True, help='type of the target objects') parser.add_argument('--source_type', type=str, required=True, help='type of the new source objects') return parser def do_check(records, targets, sources, target_type, source_type, kb, logger): logger.info('Starting consistency checks') src_map = dict([(s.id, s) for s in sources]) trg_map = dict([(t.id, t) for t in targets]) good_records = [] targets = {} sources = {} for i, r in enumerate(records): if r['target'] not in trg_map: logger.warning('No %s with ID %s, rejecting record %d' % (target_type, r['target'], i)) continue if r['new_source'] not in src_map: logger.warning('No %s with ID %s, rejecting record %d' % (source_type, r['new_source'], i)) continue targets[r['target']] = trg_map[r['target']] sources[r['new_source']] = src_map[r['new_source']] good_records.append(r) logger.info('Done with consistency checks') return good_records, targets, sources def update_data(records, targets, sources, operator, act_conf, kb, logger, batch_size = 500): def get_chunk(batch_size, records): offset = 0 while len(records[offset:]) > 0: yield records[offset:offset+batch_size] offset += batch_size dev = get_device(kb, logger) for i, recs in enumerate(get_chunk(batch_size, records)): logger.info('Updating batch %d' % i) batch_to_save = [] edges_to_delete = [] for r in recs: target = targets[r['target']] # Build the ActionOnAction backup object if not target.lastUpdate: last_action = target.action else: last_action = target.lastUpdate old_action = target.action asconf = {'backup' : {'action' : old_action.id}} aslabel = 'updater.update_source_item-%f' % time.time() backup = build_action(operator, old_action.context, dev, last_action, aslabel, asconf, kb, logger) target.lastUpdate = backup # Build the Action in order to attach the new source to # the target object new_source = sources[r['new_source']] if new_source.is_mapped: new_source.unload() asconf = act_conf aslabel = 'updater.update_source_item-%f' % time.time() new_act = build_action(operator, old_action.context, dev, new_source, aslabel, asconf, kb, logger) target.action = new_act if old_action.OME_TABLE == 'Action': # no old source, just save the new action batch_to_save.append(target) else: # check if the old target and the new one are different if new_source != old_action.target: batch_to_save.append(target) edges_to_delete.append((old_action.target, target)) if len(batch_to_save) > 0: kb.save_array(batch_to_save) else: logger.info('No record need to be updated') for vert in edges_to_delete: kb.dt.destroy_edge(*vert) def build_action(operator, context, device, target, action_setup_label, action_setup_conf, kb, logger): if action_setup_label: asetup = get_action_setup(action_setup_label, action_setup_conf, kb, logger) else: asetup = None aconf = { 'device' : device, 'actionCategory' : kb.ActionCategory.IMPORT, 'operator' : 'operator', 'context' : context, 'target' : target, } if asetup: aconf['setup'] = asetup action = kb.factory.create(retrieve_action_type(target, kb), aconf) return action def retrieve_action_type(target, kb): tklass = target.ome_obj.__class__.__name__ for i, k in enumerate(target.ome_obj.__class__.__mro__): if k is omero.model.IObject: tklass = target.ome_obj.__class__.__mro__[i-1].__name__ if tklass == 'Vessel': return kb.ActionOnVessel elif tklass == 'Individual': return kb.ActionOnIndividual elif tklass == 'DataSample': return kb.ActionOnDataSample elif tklass == 'DataCollectionItem': return kb.ActionOnDataCollectionItem elif tklass == 'Action': return kb.ActionOnAction # elif tklass == 'VLCollection': # return kb.ActionOnCollection else: raise ValueError('No Action related to %s klass' % tklass) def get_action_setup(label, conf, kb, logger): asetup_conf = { 'label' : label, 'conf' : json.dumps(conf), } asetup = kb.factory.create(kb.ActionSetup, asetup_conf) return asetup def get_device(kb, logger): dev_model = 'UPDATE' dev_maker = 'CRS4' dev_release = '0.1' dev_label = 'updater-%s.update_source_item' % dev_release device = kb.get_device(dev_label) if not device: logger.debug('No device with label %s, creating one' % dev_label) conf = { 'maker' : dev_maker, 'model' : dev_model, 'release' : dev_release, 'label' : dev_label, } device = kb.factory.create(kb.Device, conf).save() return device def find_action_setup_conf(args): action_setup_conf = {} for x in dir(args): if not (x.startswith('_') or x.startswith('func')): action_setup_conf[x] = getattr(args, x) if 'passwd' in action_setup_conf: action_setup_conf.pop('passwd') # Storing passwords into an # Omero obj is not a great idea... return action_setup_conf def main(argv): parser = make_parser() args = parser.parse_args(argv) logger = get_logger('change_source_item', level=args.loglevel, filename=args.logfile) try: host = args.host or vlu.ome_host() user = args.user or vlu.ome_user() passwd = args.passwd or vlu.ome_passwd() except ValueError, ve: logger.critical(ve) sys.exit(ve) kb = KB(driver='omero')(host, user, passwd) logger.info('Loading data from input file') with open(args.in_file) as f: reader = csv.DictReader(f, delimiter='\t') records = list(reader) logger.info('Loaded %d records' % len(records)) logger.info('Loading %s type objects' % args.target_type) targets = kb.get_objects(getattr(kb, args.target_type)) logger.info('Loaded %d objects' % len(targets)) if len(targets) == 0: msg = 'No targets loaded from the system, nothing to do' logger.critical(msg) sys.exit(msg) logger.info('Loading %s type objects' % args.source_type) sources = kb.get_objects(getattr(kb, args.source_type)) logger.info('Loaded %d objects' % len(sources)) if len(sources) == 0: msg = 'No sources loaded from the system, nothing to do' logger.critical(msg) sys.exit(msg) logger.info('Loading Action type objects') acts = kb.get_objects(kb.Action) logger.info('Loaded %d objects' % len(acts)) records, targets, sources = do_check(records, targets, sources, args.target_type, args.source_type, kb, logger) if len(records) == 0: msg = 'No records passed consistency checks, nothing to do' logger.critical(msg) sys.exit(msg) aconf = find_action_setup_conf(args) update_data(records, targets, sources, args.operator, aconf, kb, logger) logger.info('Job completed') if __name__ == '__main__': main(sys.argv[1:])
