Mercurial > repos > ric > test3
comparison galaxy-tools/biobank/updater/change_source_item.py @ 0:e54d14bed3f5 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Thu, 29 Sep 2016 06:09:15 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:e54d14bed3f5 |
|---|---|
| 1 # The tool changes the source of an object inside the system. | |
| 2 # Expected input file format is | |
| 3 # | |
| 4 # target new_source | |
| 5 # V1415515 V1241441 | |
| 6 # V1351124 V1511141 | |
| 7 # ..... | |
| 8 # | |
| 9 # Where target is the object whose source will be changed with the | |
| 10 # new_source object. New source type will be specified using the | |
| 11 # command line option. | |
| 12 | |
| 13 import csv, argparse, sys, os, json, time | |
| 14 | |
| 15 from bl.vl.kb import KnowledgeBase as KB | |
| 16 import bl.vl.utils.ome_utils as vlu | |
| 17 from bl.vl.utils import get_logger, LOG_LEVELS | |
| 18 import omero | |
| 19 import omero.model | |
| 20 | |
| 21 | |
def make_parser():
    """Build the command line parser of the change-source tool.

    The parser exposes, in this order: the logging options
    (``--logfile``, ``--loglevel``), the optional OMERO connection
    options (``-H/--host``, ``-U/--user``, ``-P/--passwd``) and the
    required ``--operator``, ``--in_file``, ``--target_type`` and
    ``--source_type`` options.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser = argparse.ArgumentParser(
        description='change the source for given items')
    add = parser.add_argument

    # Logging options.
    add('--logfile', type=str, help='log file (default=stderr)')
    add('--loglevel', type=str, choices=LOG_LEVELS,
        help='logger level', default='INFO')

    # Optional OMERO connection options.
    connection_options = (
        ('-H', '--host', 'omero hostname'),
        ('-U', '--user', 'omero user'),
        ('-P', '--passwd', 'omero password'),
    )
    for short_flag, long_flag, help_text in connection_options:
        add(short_flag, long_flag, type=str, help=help_text)

    # Options that must always be supplied.
    required_options = (
        ('--operator', 'operator username'),
        ('--in_file', 'list of items with new sources'),
        ('--target_type', 'type of the target objects'),
        ('--source_type', 'type of the new source objects'),
    )
    for flag, help_text in required_options:
        add(flag, type=str, required=True, help=help_text)

    return parser
| 39 | |
| 40 | |
def do_check(records, targets, sources,
             target_type, source_type,
             kb, logger):
    """Keep only the records whose target and new source both exist.

    Args:
        records: iterable of mappings with a ``'target'`` and a
            ``'new_source'`` key holding object IDs.
        targets: iterable of objects exposing an ``id`` attribute.
        sources: iterable of objects exposing an ``id`` attribute.
        target_type: name of the target type, used in warnings only.
        source_type: name of the source type, used in warnings only.
        kb: not used by this function.
        logger: logger receiving progress and rejection messages.

    Returns:
        tuple: ``(good_records, targets_by_id, sources_by_id)`` where the
        two dictionaries contain only the objects referenced by the
        accepted records.
    """
    logger.info('Starting consistency checks')
    known_sources = dict((src.id, src) for src in sources)
    known_targets = dict((trg.id, trg) for trg in targets)

    accepted = []
    used_targets = {}
    used_sources = {}
    for position, record in enumerate(records):
        target_id = record['target']
        if target_id not in known_targets:
            logger.warning('No %s with ID %s, rejecting record %d' %
                           (target_type, target_id, position))
            continue
        # The source ID is read only once the target has been validated.
        source_id = record['new_source']
        if source_id not in known_sources:
            logger.warning('No %s with ID %s, rejecting record %d' %
                           (source_type, source_id, position))
            continue
        used_targets[target_id] = known_targets[target_id]
        used_sources[source_id] = known_sources[source_id]
        accepted.append(record)

    logger.info('Done with consistency checks')
    return accepted, used_targets, used_sources
| 64 | |
| 65 | |
def update_data(records, targets, sources, operator, act_conf,
                kb, logger, batch_size=500):
    """Attach every record's target to its new source, batch by batch.

    For each record the function:

    1. builds a "backup" action whose setup conf is
       ``{'backup': {'action': <id of the current action>}}`` and whose
       target is ``target.lastUpdate`` (or ``target.action`` when no
       ``lastUpdate`` is set), then stores it in ``target.lastUpdate``;
    2. builds a new action targeting the new source, configured with
       ``act_conf``, and stores it in ``target.action``;
    3. queues the object for saving when the previous action was a plain
       ``Action`` (no old source) or when the new source differs from
       the previous action's target; in the latter case the
       ``(old source, target)`` edge is queued for deletion.

    Each batch is saved with ``kb.save_array`` and its queued edges are
    then removed with ``kb.dt.destroy_edge``.

    Args:
        records: list of mappings with ``'target'`` and ``'new_source'``
            keys.
        targets: mapping from target ID to target object.
        sources: mapping from source ID to source object.
        operator: operator name forwarded to ``build_action``.
        act_conf: configuration stored in the new action's setup.
        kb: knowledge base handle.
        logger: logger receiving progress messages.
        batch_size: number of records handled per batch; must be >= 1.

    Raises:
        ValueError: if ``batch_size`` is lower than 1 (the original
            slicing loop would never terminate in that case).
    """
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer, got %r'
                         % (batch_size,))

    def get_chunk(batch_size, records):
        # Step over the offsets instead of re-slicing the remaining tail
        # on every iteration, which copied the list again and again.
        for offset in range(0, len(records), batch_size):
            yield records[offset:offset + batch_size]

    dev = get_device(kb, logger)
    for i, recs in enumerate(get_chunk(batch_size, records)):
        logger.info('Updating batch %d' % i)
        batch_to_save = []
        edges_to_delete = []
        for r in recs:
            target = targets[r['target']]
            # Build the ActionOnAction backup object
            if not target.lastUpdate:
                last_action = target.action
            else:
                last_action = target.lastUpdate
            old_action = target.action
            asconf = {'backup': {'action': old_action.id}}
            aslabel = 'updater.update_source_item-%f' % time.time()
            backup = build_action(operator, old_action.context,
                                  dev, last_action, aslabel,
                                  asconf, kb, logger)
            target.lastUpdate = backup
            # Build the Action in order to attach the new source to
            # the target object
            new_source = sources[r['new_source']]
            if new_source.is_mapped:
                new_source.unload()
            asconf = act_conf
            aslabel = 'updater.update_source_item-%f' % time.time()
            new_act = build_action(operator, old_action.context,
                                   dev, new_source, aslabel,
                                   asconf, kb, logger)
            target.action = new_act
            if old_action.OME_TABLE == 'Action':
                # no old source, just save the new action
                batch_to_save.append(target)
            else:
                # check if the old target and the new one are different;
                # when they are equal the object is not queued for saving
                if new_source != old_action.target:
                    batch_to_save.append(target)
                    edges_to_delete.append((old_action.target, target))
        if batch_to_save:
            kb.save_array(batch_to_save)
        else:
            logger.info('No record need to be updated')
        # Edges are removed only after the batch has been saved.
        for vert in edges_to_delete:
            kb.dt.destroy_edge(*vert)
| 117 | |
| 118 | |
def build_action(operator, context, device, target,
                 action_setup_label, action_setup_conf,
                 kb, logger):
    """Create an action object pointing at ``target``.

    The action class is chosen by ``retrieve_action_type`` from the type
    of ``target``; the action category is always
    ``kb.ActionCategory.IMPORT``. This function does not call ``save`` on
    the object it returns.

    Args:
        operator: operator name recorded in the action.
        context: context (study) the action belongs to.
        device: device recorded in the action.
        target: object the action points at.
        action_setup_label: label of the action setup to create; when
            empty or ``None`` no setup is attached.
        action_setup_conf: JSON-serializable configuration of the setup.
        kb: knowledge base handle.
        logger: logger forwarded to ``get_action_setup``.

    Returns:
        The object produced by ``kb.factory.create``.

    Raises:
        ValueError: propagated from ``retrieve_action_type`` when no
            action class matches the type of ``target``.
    """
    if action_setup_label:
        asetup = get_action_setup(action_setup_label, action_setup_conf,
                                  kb, logger)
    else:
        asetup = None
    aconf = {
        'device': device,
        'actionCategory': kb.ActionCategory.IMPORT,
        # Record the operator passed by the caller; the previous code
        # stored the literal string 'operator' and ignored the argument.
        'operator': operator,
        'context': context,
        'target': target,
    }
    if asetup:
        aconf['setup'] = asetup
    return kb.factory.create(retrieve_action_type(target, kb), aconf)
| 138 | |
| 139 | |
def retrieve_action_type(target, kb):
    """Return the action class of ``kb`` matching the type of ``target``.

    The reference class name is the name of the class that immediately
    precedes ``omero.model.IObject`` in the method resolution order of
    ``target.ome_obj``'s class; when ``IObject`` is not in that order the
    name of the class itself is used.

    Args:
        target: object exposing an ``ome_obj`` attribute.
        kb: knowledge base handle providing the ``ActionOn*`` classes.

    Returns:
        One of ``kb.ActionOnVessel``, ``kb.ActionOnIndividual``,
        ``kb.ActionOnDataSample``, ``kb.ActionOnDataCollectionItem`` or
        ``kb.ActionOnAction``.

    Raises:
        ValueError: if the class name has no related action class.
    """
    ome_class = target.ome_obj.__class__
    lineage = ome_class.__mro__
    tklass = ome_class.__name__
    for position, ancestor in enumerate(lineage):
        if ancestor is omero.model.IObject:
            tklass = lineage[position - 1].__name__

    # Attribute names are resolved lazily so that only the matching
    # action class is looked up on kb.
    # NOTE: the 'VLCollection' -> ActionOnCollection mapping was disabled
    # (commented out) in the original code and is kept disabled here.
    action_attribute = {
        'Vessel': 'ActionOnVessel',
        'Individual': 'ActionOnIndividual',
        'DataSample': 'ActionOnDataSample',
        'DataCollectionItem': 'ActionOnDataCollectionItem',
        'Action': 'ActionOnAction',
    }
    if tklass not in action_attribute:
        raise ValueError('No Action related to %s klass' % tklass)
    return getattr(kb, action_attribute[tklass])
| 159 | |
| 160 | |
def get_action_setup(label, conf, kb, logger):
    """Create an ``ActionSetup`` object through the knowledge base factory.

    Args:
        label: label given to the action setup.
        conf: JSON-serializable configuration; stored as a JSON string.
        kb: knowledge base handle.
        logger: not used by this function.

    Returns:
        The object produced by ``kb.factory.create`` (``save`` is not
        called here).
    """
    serialized_conf = json.dumps(conf)
    return kb.factory.create(
        kb.ActionSetup,
        {'label': label, 'conf': serialized_conf},
    )
| 168 | |
| 169 | |
def get_device(kb, logger):
    """Return the device representing this updater tool.

    The device labelled ``updater-0.1.update_source_item`` is looked up
    with ``kb.get_device``; when the lookup returns a false value a new
    device is created through the factory and saved.

    Args:
        kb: knowledge base handle.
        logger: logger receiving a debug message when a device is created.

    Returns:
        The device found, or the result of ``save()`` on the new device.
    """
    release = '0.1'
    label = 'updater-%s.update_source_item' % release

    existing = kb.get_device(label)
    if existing:
        return existing

    logger.debug('No device with label %s, creating one' % label)
    description = {
        'maker': 'CRS4',
        'model': 'UPDATE',
        'release': release,
        'label': label,
    }
    return kb.factory.create(kb.Device, description).save()
| 186 | |
| 187 | |
def find_action_setup_conf(args):
    """Collect the parsed options worth storing in an action setup.

    Every attribute listed by ``dir(args)`` is kept, except those whose
    name starts with ``_`` or ``func`` and the ``passwd`` attribute.

    Args:
        args: namespace holding the parsed command line options.

    Returns:
        dict: attribute name -> attribute value.
    """
    skipped_prefixes = ('_', 'func')
    collected = dict(
        (name, getattr(args, name))
        for name in dir(args)
        if not name.startswith(skipped_prefixes)
    )
    # Storing passwords into an Omero obj is not a great idea...
    collected.pop('passwd', None)
    return collected
| 197 | |
| 198 | |
def main(argv):
    """Run the change-source tool.

    Steps: parse ``argv``, connect to OMERO, read the tab-separated
    input file, load the target and source objects of the requested
    types, drop the records that fail the consistency checks and update
    the remaining ones.

    Args:
        argv: command line arguments, without the program name.

    The process exits through ``sys.exit`` with an error message when the
    connection parameters cannot be resolved, when no target or source
    object is loaded, or when no record passes the consistency checks.
    """
    parser = make_parser()
    args = parser.parse_args(argv)

    logger = get_logger('change_source_item', level=args.loglevel,
                        filename=args.logfile)

    try:
        host = args.host or vlu.ome_host()
        user = args.user or vlu.ome_user()
        passwd = args.passwd or vlu.ome_passwd()
    # "except ... as ..." is valid on Python 2.6+ and Python 3, unlike the
    # Python-2-only "except ValueError, ve" form.
    except ValueError as ve:
        logger.critical(ve)
        sys.exit(ve)

    kb = KB(driver='omero')(host, user, passwd)
    logger.info('Loading data from input file')
    with open(args.in_file) as f:
        reader = csv.DictReader(f, delimiter='\t')
        records = list(reader)
    logger.info('Loaded %d records' % len(records))

    logger.info('Loading %s type objects' % args.target_type)
    targets = kb.get_objects(getattr(kb, args.target_type))
    logger.info('Loaded %d objects' % len(targets))
    if len(targets) == 0:
        msg = 'No targets loaded from the system, nothing to do'
        logger.critical(msg)
        sys.exit(msg)

    logger.info('Loading %s type objects' % args.source_type)
    sources = kb.get_objects(getattr(kb, args.source_type))
    logger.info('Loaded %d objects' % len(sources))
    if len(sources) == 0:
        msg = 'No sources loaded from the system, nothing to do'
        logger.critical(msg)
        sys.exit(msg)

    # NOTE(review): the loaded actions are only counted here; presumably
    # the call is kept to pre-load them on the kb side -- confirm before
    # removing it.
    logger.info('Loading Action type objects')
    acts = kb.get_objects(kb.Action)
    logger.info('Loaded %d objects' % len(acts))

    records, targets, sources = do_check(records, targets, sources,
                                         args.target_type, args.source_type,
                                         kb, logger)
    if len(records) == 0:
        msg = 'No records passed consistency checks, nothing to do'
        logger.critical(msg)
        sys.exit(msg)

    aconf = find_action_setup_conf(args)

    update_data(records, targets, sources, args.operator,
                aconf, kb, logger)

    logger.info('Job completed')


if __name__ == '__main__':
    main(sys.argv[1:])
