|
0
|
1 # The tool changes the source of an object inside the system.
|
|
|
# Expected input file format (tab-separated values with a header row):
|
|
|
3 #
|
|
|
4 # target new_source
|
|
|
5 # V1415515 V1241441
|
|
|
6 # V1351124 V1511141
|
|
|
7 # .....
|
|
|
8 #
|
|
|
# Where target is the ID of the object whose source will be replaced by
# the new_source object. The types of the target and new source objects
# are given with the --target_type and --source_type command-line options.
|
|
|
12
|
|
|
13 import csv, argparse, sys, os, json, time
|
|
|
14
|
|
|
15 from bl.vl.kb import KnowledgeBase as KB
|
|
|
16 import bl.vl.utils.ome_utils as vlu
|
|
|
17 from bl.vl.utils import get_logger, LOG_LEVELS
|
|
|
18 import omero
|
|
|
19 import omero.model
|
|
|
20
|
|
|
21
|
|
|
def make_parser():
    """Build the command-line parser for the change-source tool.

    Returns:
        argparse.ArgumentParser exposing logging options (--logfile,
        --loglevel), OMERO connection options (-H/-U/-P) and the required
        job options (--operator, --in_file, --target_type, --source_type).
    """
    p = argparse.ArgumentParser(description='change the source for given items')
    # Logging configuration.
    p.add_argument('--logfile', type=str, help='log file (default=stderr)')
    p.add_argument('--loglevel', type=str, choices=LOG_LEVELS,
                   help='logger level', default='INFO')
    # OMERO connection; main() falls back to ome_utils values when omitted.
    connection_opts = (
        ('-H', '--host', 'omero hostname'),
        ('-U', '--user', 'omero user'),
        ('-P', '--passwd', 'omero password'),
    )
    for short_flag, long_flag, text in connection_opts:
        p.add_argument(short_flag, long_flag, type=str, help=text)
    # Mandatory job parameters.
    required_opts = (
        ('--operator', 'operator username'),
        ('--in_file', 'list of items with new sources'),
        ('--target_type', 'type of the target objects'),
        ('--source_type', 'type of the new source objects'),
    )
    for flag, text in required_opts:
        p.add_argument(flag, type=str, required=True, help=text)
    return p
|
|
|
39
|
|
|
40
|
|
|
def do_check(records, targets, sources,
             target_type, source_type,
             kb, logger):
    """Keep only the records whose target and new source IDs are known.

    Args:
        records: sequence of dicts with 'target' and 'new_source' keys
            (one per input-file row).
        targets: iterable of candidate target objects, each exposing ``.id``.
        sources: iterable of candidate source objects, each exposing ``.id``.
        target_type: target type name, used only in log messages.
        source_type: source type name, used only in log messages.
        kb: unused; kept for interface compatibility.
        logger: logger receiving progress and rejection messages.

    Returns:
        A tuple ``(good_records, used_targets, used_sources)``:
        the accepted records in input order, and dicts mapping ID -> object
        restricted to the targets / sources referenced by accepted records.
    """
    logger.info('Starting consistency checks')
    src_map = {s.id: s for s in sources}
    trg_map = {t.id: t for t in targets}
    good_records = []
    # Distinct names: the original rebound the ``targets``/``sources``
    # parameters to dicts of a different type.
    used_targets = {}
    used_sources = {}
    for i, r in enumerate(records):
        target_id = r['target']
        source_id = r['new_source']
        if target_id not in trg_map:
            logger.warning('No %s with ID %s, rejecting record %d',
                           target_type, target_id, i)
            continue
        if source_id not in src_map:
            logger.warning('No %s with ID %s, rejecting record %d',
                           source_type, source_id, i)
            continue
        used_targets[target_id] = trg_map[target_id]
        used_sources[source_id] = src_map[source_id]
        good_records.append(r)
    logger.info('Done with consistency checks')
    return good_records, used_targets, used_sources
|
|
|
64
|
|
|
65
|
|
|
def update_data(records, targets, sources, operator, act_conf,
                kb, logger, batch_size = 500):
    """Replace the source of each target listed in ``records``, in batches.

    For every record:
      * a backup action is built whose setup conf is
        ``{'backup': {'action': <id of the current target.action>}}`` and
        whose target is ``target.lastUpdate`` (or ``target.action`` when
        ``lastUpdate`` is unset); it becomes the new ``target.lastUpdate``;
      * a new action whose target is the new source and whose setup conf is
        ``act_conf`` replaces ``target.action``.
    A target is queued for saving when its old action is a plain ``Action``
    (no previous source) or when the new source differs from the old one;
    in the latter case the (old source, target) edge is destroyed with
    ``kb.dt.destroy_edge`` after the batch has been saved.
    Note: ``target.action``/``target.lastUpdate`` are reassigned in memory
    for every record, even those that end up not being saved.

    Args:
        records: sequence of dicts with 'target' and 'new_source' IDs.
        targets: dict mapping target ID -> target object.
        sources: dict mapping source ID -> new source object.
        operator: operator username, forwarded to build_action.
        act_conf: configuration dict for the new actions' setup.
        kb: connected KnowledgeBase.
        logger: logger for progress messages.
        batch_size: number of records handled per ``kb.save_array`` call.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1 (previously a
            batch size of 0 looped forever).
    """
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1, got %r' % (batch_size,))
    dev = get_device(kb, logger)
    # Step through fixed offsets instead of re-slicing the remaining records
    # on every iteration just to test for emptiness.
    for i, start in enumerate(range(0, len(records), batch_size)):
        recs = records[start:start + batch_size]
        logger.info('Updating batch %d', i)
        batch_to_save = []
        edges_to_delete = []
        for r in recs:
            target = targets[r['target']]
            # Build the ActionOnAction backup object, chained onto the most
            # recent update when there is one, otherwise onto the action.
            if not target.lastUpdate:
                last_action = target.action
            else:
                last_action = target.lastUpdate
            old_action = target.action
            asconf = {'backup' : {'action' : old_action.id}}
            # NOTE(review): labels rely on time.time() for uniqueness; two
            # actions built within the clock resolution would share a label.
            aslabel = 'updater.update_source_item-%f' % time.time()
            backup = build_action(operator, old_action.context,
                                  dev, last_action, aslabel,
                                  asconf, kb, logger)
            target.lastUpdate = backup
            # Build the Action in order to attach the new source to
            # the target object
            new_source = sources[r['new_source']]
            if new_source.is_mapped:
                new_source.unload()
            asconf = act_conf
            aslabel = 'updater.update_source_item-%f' % time.time()
            new_act = build_action(operator, old_action.context,
                                   dev, new_source, aslabel,
                                   asconf, kb, logger)
            target.action = new_act
            if old_action.OME_TABLE == 'Action':
                # no old source, just save the new action
                batch_to_save.append(target)
            else:
                # check if the old target and the new one are different
                # NOTE(review): equality semantics of the KB wrapper objects
                # are not visible here — confirm they compare by object ID.
                if new_source != old_action.target:
                    batch_to_save.append(target)
                    edges_to_delete.append((old_action.target, target))
        if len(batch_to_save) > 0:
            kb.save_array(batch_to_save)
        else:
            logger.info('No record need to be updated')
        for vert in edges_to_delete:
            kb.dt.destroy_edge(*vert)
|
|
|
117
|
|
|
118
|
|
|
def build_action(operator, context, device, target,
                 action_setup_label, action_setup_conf,
                 kb, logger):
    """Create an action of the type matching ``target`` via the KB factory.

    The action is created with category ``kb.ActionCategory.IMPORT``; this
    function does not call ``save()`` on it.

    Args:
        operator: operator username to record on the action.
        context: context object for the action.
        device: device to record on the action.
        target: object the action points to; its class selects the action
            type (see retrieve_action_type).
        action_setup_label: label for an ActionSetup; when falsy no setup
            is attached.
        action_setup_conf: dict serialized into the ActionSetup conf.
        kb: connected KnowledgeBase.
        logger: logger forwarded to get_action_setup.

    Returns:
        The newly created action object.

    Raises:
        ValueError: propagated from retrieve_action_type when ``target`` has
            no matching action type.
    """
    if action_setup_label:
        asetup = get_action_setup(action_setup_label, action_setup_conf,
                                  kb, logger)
    else:
        asetup = None
    aconf = {
        'device' : device,
        'actionCategory' : kb.ActionCategory.IMPORT,
        # Must be the caller-supplied username; the literal string
        # 'operator' was being stored here, discarding the real operator.
        'operator' : operator,
        'context' : context,
        'target' : target,
    }
    if asetup:
        aconf['setup'] = asetup
    action = kb.factory.create(retrieve_action_type(target, kb), aconf)
    return action
|
|
|
138
|
|
|
139
|
|
|
def retrieve_action_type(target, kb):
    """Return the KB action class suited to ``target``.

    The class used for the lookup is the one sitting just before
    ``omero.model.IObject`` in the MRO of ``target.ome_obj``'s class, or the
    class itself when IObject is not among its ancestors.

    Raises:
        ValueError: when that class name has no associated action type.
    """
    mro = target.ome_obj.__class__.__mro__
    klass_name = mro[0].__name__
    for pos, klass in enumerate(mro):
        if klass is omero.model.IObject:
            klass_name = mro[pos - 1].__name__
            break  # MRO entries are unique, so nothing later can match
    # A VLCollection -> ActionOnCollection mapping is not supported.
    action_attr = {
        'Vessel': 'ActionOnVessel',
        'Individual': 'ActionOnIndividual',
        'DataSample': 'ActionOnDataSample',
        'DataCollectionItem': 'ActionOnDataCollectionItem',
        'Action': 'ActionOnAction',
    }.get(klass_name)
    if action_attr is None:
        raise ValueError('No Action related to %s klass' % klass_name)
    return getattr(kb, action_attr)
|
|
|
159
|
|
|
160
|
|
|
def get_action_setup(label, conf, kb, logger):
    """Build an ActionSetup object through the KB factory.

    ``conf`` is serialized with ``json.dumps`` and stored with ``label``.
    This function does not call ``save()``; ``logger`` is unused.
    """
    setup_fields = dict(label=label, conf=json.dumps(conf))
    return kb.factory.create(kb.ActionSetup, setup_fields)
|
|
|
168
|
|
|
169
|
|
|
def get_device(kb, logger):
    """Return the updater Device, creating and saving it if it is missing.

    The device is looked up by the label
    ``'updater-0.1.update_source_item'``; when ``kb.get_device`` returns a
    falsy value a new Device (maker CRS4, model UPDATE, release 0.1) is
    created and saved, and the saved object is returned.
    """
    release = '0.1'
    label = 'updater-%s.update_source_item' % release
    device = kb.get_device(label)
    if device:
        return device
    logger.debug('No device with label %s, creating one' % label)
    device_conf = dict(maker='CRS4', model='UPDATE',
                       release=release, label=label)
    return kb.factory.create(kb.Device, device_conf).save()
|
|
|
186
|
|
|
187
|
|
|
def find_action_setup_conf(args):
    """Collect the parsed command-line options into a plain dict.

    Every attribute listed by ``dir(args)`` is copied, except names starting
    with '_' or 'func'. The 'passwd' entry is always dropped because this
    dict ends up serialized into an ActionSetup configuration.
    """
    conf = {name: getattr(args, name)
            for name in dir(args)
            if not name.startswith(('_', 'func'))}
    # Storing passwords into an Omero obj is not a great idea...
    conf.pop('passwd', None)
    return conf
|
|
|
197
|
|
|
198
|
|
|
def _abort(logger, msg):
    """Log ``msg`` as critical and terminate the process with it."""
    logger.critical(msg)
    sys.exit(msg)


def _resolve_kb_type(kb, type_name, logger):
    """Return the KB attribute named ``type_name``, aborting if it is missing."""
    klass = getattr(kb, type_name, None)
    if klass is None:
        _abort(logger, 'Unknown object type %s' % type_name)
    return klass


def main(argv):
    """Change the source of the objects listed in ``--in_file``.

    Parses ``argv``, connects to OMERO, and resolves the target/source
    types. It then reads the tab-separated input file and loads every
    object of the two types. Records referencing unknown IDs are dropped,
    and the remaining updates are applied through update_data. The process
    exits with an error message whenever there is nothing to do.
    """
    parser = make_parser()
    args = parser.parse_args(argv)

    logger = get_logger('change_source_item', level=args.loglevel,
                        filename=args.logfile)

    try:
        host = args.host or vlu.ome_host()
        user = args.user or vlu.ome_user()
        passwd = args.passwd or vlu.ome_passwd()
    except ValueError as ve:  # 'as' form works on Python 2.6+ and 3
        logger.critical(ve)
        sys.exit(ve)

    kb = KB(driver='omero')(host, user, passwd)

    # Resolve the requested types up front so a typo fails with a clear
    # message instead of an AttributeError after the input file is read.
    target_klass = _resolve_kb_type(kb, args.target_type, logger)
    source_klass = _resolve_kb_type(kb, args.source_type, logger)

    logger.info('Loading data from input file')
    with open(args.in_file) as f:
        reader = csv.DictReader(f, delimiter='\t')
        records = list(reader)
    logger.info('Loaded %d records', len(records))

    logger.info('Loading %s type objects', args.target_type)
    targets = kb.get_objects(target_klass)
    logger.info('Loaded %d objects', len(targets))
    if len(targets) == 0:
        _abort(logger, 'No targets loaded from the system, nothing to do')

    logger.info('Loading %s type objects', args.source_type)
    sources = kb.get_objects(source_klass)
    logger.info('Loaded %d objects', len(sources))
    if len(sources) == 0:
        _abort(logger, 'No sources loaded from the system, nothing to do')

    logger.info('Loading Action type objects')
    # NOTE(review): ``acts`` is never used below; presumably loading every
    # Action pre-populates something used when target.action is accessed in
    # update_data — confirm before removing this (potentially slow) load.
    acts = kb.get_objects(kb.Action)
    logger.info('Loaded %d objects', len(acts))

    records, targets, sources = do_check(records, targets, sources,
                                         args.target_type, args.source_type,
                                         kb, logger)
    if len(records) == 0:
        _abort(logger, 'No records passed consistency checks, nothing to do')

    aconf = find_action_setup_conf(args)

    update_data(records, targets, sources, args.operator,
                aconf, kb, logger)

    logger.info('Job completed')


if __name__ == '__main__':
    main(sys.argv[1:])
|