comparison galaxy-tools/biobank/updater/change_source_item.py @ 0:e54d14bed3f5 draft default tip

Uploaded
author ric
date Thu, 29 Sep 2016 06:09:15 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:e54d14bed3f5
# The tool changes the source of an object inside the system.
# Expected input file format is (tab-separated, with a header row)
#
# target      new_source
# V1415515    V1241441
# V1351124    V1511141
# .....
#
# Where target is the ID of the object whose source will be replaced by
# the new_source object. The types of the target and of the new source
# objects are specified using the --target_type and --source_type
# command line options.
12
13 import csv, argparse, sys, os, json, time
14
15 from bl.vl.kb import KnowledgeBase as KB
16 import bl.vl.utils.ome_utils as vlu
17 from bl.vl.utils import get_logger, LOG_LEVELS
18 import omero
19 import omero.model
20
21
def make_parser():
    """Build the command line parser of the tool.

    Returns:
        argparse.ArgumentParser: parser exposing the logging options, the
        OMERO connection options, the operator name, the input file and
        the target / new source object types.
    """
    parser = argparse.ArgumentParser(
        description='change the source for given items'
    )
    add = parser.add_argument

    # Logging
    add('--logfile', type=str, help='log file (default=stderr)')
    add('--loglevel', type=str, choices=LOG_LEVELS,
        help='logger level', default='INFO')

    # OMERO connection
    add('-H', '--host', type=str, help='omero hostname')
    add('-U', '--user', type=str, help='omero user')
    add('-P', '--passwd', type=str, help='omero password')

    # Job description (all mandatory)
    add('--operator', type=str, required=True,
        help='operator username')
    add('--in_file', type=str, required=True,
        help='list of items with new sources')
    add('--target_type', type=str, required=True,
        help='type of the target objects')
    add('--source_type', type=str, required=True,
        help='type of the new source objects')
    return parser
39
40
def do_check(records, targets, sources,
             target_type, source_type,
             kb, logger):
    """Keep only the records whose target and new source are known.

    Args:
        records: iterable of mappings with a 'target' and a 'new_source'
            key, each holding an object ID.
        targets: iterable of candidate target objects (each with an
            ``id`` attribute).
        sources: iterable of candidate source objects (each with an
            ``id`` attribute).
        target_type: name of the target type, used in warning messages.
        source_type: name of the source type, used in warning messages.
        kb: knowledge base handle (not used by the checks).
        logger: logger receiving progress and rejection messages.

    Returns:
        tuple: ``(good_records, used_targets, used_sources)`` where
        ``good_records`` is the list of accepted records (input order
        preserved) and the two dicts map the IDs referenced by accepted
        records to the corresponding objects.
    """
    logger.info('Starting consistency checks')
    known_sources = {s.id: s for s in sources}
    known_targets = {t.id: t for t in targets}
    good_records = []
    # Separate names for the outputs: the input collections are not
    # rebound, so it stays clear which one is a list and which a dict.
    used_targets = {}
    used_sources = {}
    for i, r in enumerate(records):
        target_id = r['target']
        if target_id not in known_targets:
            logger.warning('No %s with ID %s, rejecting record %d' % (target_type,
                                                                      target_id, i))
            continue
        # The new source is only looked at once the target is accepted.
        source_id = r['new_source']
        if source_id not in known_sources:
            logger.warning('No %s with ID %s, rejecting record %d' % (source_type,
                                                                      source_id, i))
            continue
        used_targets[target_id] = known_targets[target_id]
        used_sources[source_id] = known_sources[source_id]
        good_records.append(r)
    logger.info('Done with consistency checks')
    return good_records, used_targets, used_sources
64
65
def update_data(records, targets, sources, operator, act_conf,
                kb, logger, batch_size = 500):
    """Attach the new source to each target listed in ``records``.

    Records are processed in batches of ``batch_size``. For every record
    a backup action and a new action are built; the targets that need it
    are saved with ``kb.save_array`` and, afterwards, the edges linking
    them to their previous source are destroyed.

    Args:
        records: list of mappings with 'target' and 'new_source' IDs.
        targets: dict mapping target ID to target object.
        sources: dict mapping source ID to source object.
        operator: operator name passed to the created actions.
        act_conf: configuration stored in the new action's setup.
        kb: knowledge base handle.
        logger: logger receiving progress messages.
        batch_size: number of records handled per save; must be > 0.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        # A non-positive step never advances through ``records`` and
        # would keep yielding batches forever.
        raise ValueError('batch_size must be a positive integer, got %r'
                         % (batch_size,))

    def get_chunk(batch_size, records):
        # Step the offset directly instead of copying the tail of the
        # list on every iteration just to test whether it is empty.
        for offset in range(0, len(records), batch_size):
            yield records[offset:offset+batch_size]

    dev = get_device(kb, logger)
    for i, recs in enumerate(get_chunk(batch_size, records)):
        logger.info('Updating batch %d' % i)
        batch_to_save = []
        edges_to_delete = []
        for r in recs:
            target = targets[r['target']]
            # Build the ActionOnAction backup object: it targets the most
            # recent update (or the original action when the target was
            # never updated) and records the ID of the action being
            # replaced in its setup.
            if not target.lastUpdate:
                last_action = target.action
            else:
                last_action = target.lastUpdate
            old_action = target.action
            asconf = {'backup' : {'action' : old_action.id}}
            aslabel = 'updater.update_source_item-%f' % time.time()
            backup = build_action(operator, old_action.context,
                                  dev, last_action, aslabel,
                                  asconf, kb, logger)
            target.lastUpdate = backup
            # Build the Action in order to attach the new source to
            # the target object
            new_source = sources[r['new_source']]
            if new_source.is_mapped:
                # NOTE(review): presumably drops the loaded state so that
                # only a reference is attached -- confirm against bl.vl.
                new_source.unload()
            asconf = act_conf
            aslabel = 'updater.update_source_item-%f' % time.time()
            new_act = build_action(operator, old_action.context,
                                   dev, new_source, aslabel,
                                   asconf, kb, logger)
            target.action = new_act
            if old_action.OME_TABLE == 'Action':
                # no old source, just save the new action
                batch_to_save.append(target)
            else:
                # check if the old target and the new one are different
                if new_source != old_action.target:
                    batch_to_save.append(target)
                    edges_to_delete.append((old_action.target, target))
        if len(batch_to_save) > 0:
            kb.save_array(batch_to_save)
        else:
            logger.info('No record need to be updated')
        # Old source -> target edges are removed only after the save.
        for vert in edges_to_delete:
            kb.dt.destroy_edge(*vert)
117
118
def build_action(operator, context, device, target,
                 action_setup_label, action_setup_conf,
                 kb, logger):
    """Create (without saving) an action pointing at ``target``.

    The concrete action class is chosen from the type of ``target`` by
    ``retrieve_action_type``. The action category is always
    ``kb.ActionCategory.IMPORT``.

    Args:
        operator: operator name recorded on the action.
        context: context (study) recorded on the action.
        device: device recorded on the action.
        target: object the action is applied to.
        action_setup_label: label of the action setup to create; when
            empty or None no setup is attached.
        action_setup_conf: JSON-serializable configuration stored in the
            action setup.
        kb: knowledge base handle.
        logger: logger forwarded to the helpers.

    Returns:
        The action object built by ``kb.factory.create``.

    Raises:
        ValueError: if no action type matches the type of ``target``.
    """
    if action_setup_label:
        asetup = get_action_setup(action_setup_label, action_setup_conf,
                                  kb, logger)
    else:
        asetup = None
    aconf = {
        'device' : device,
        'actionCategory' : kb.ActionCategory.IMPORT,
        # Record the operator given by the caller; the previous code
        # stored the literal string 'operator' on every action.
        'operator' : operator,
        'context' : context,
        'target' : target,
    }
    if asetup:
        aconf['setup'] = asetup
    action = kb.factory.create(retrieve_action_type(target, kb), aconf)
    return action
138
139
def retrieve_action_type(target, kb):
    """Return the kb action class matching the type of ``target``.

    The type name is taken from the class that immediately precedes
    ``omero.model.IObject`` in the MRO of ``target.ome_obj``; when
    ``IObject`` is not in the MRO the name of the object's own class is
    used.

    Raises:
        ValueError: if the type name has no associated action class.
    """
    ome_class = target.ome_obj.__class__
    lineage = ome_class.__mro__
    tklass = ome_class.__name__
    for position, ancestor in enumerate(lineage):
        if ancestor is omero.model.IObject:
            tklass = lineage[position - 1].__name__

    # Map type names to the name of the kb attribute holding the action
    # class; the attribute is only read for the matching entry.
    # NOTE: 'VLCollection' -> 'ActionOnCollection' is not enabled.
    action_attr_by_type = {
        'Vessel': 'ActionOnVessel',
        'Individual': 'ActionOnIndividual',
        'DataSample': 'ActionOnDataSample',
        'DataCollectionItem': 'ActionOnDataCollectionItem',
        'Action': 'ActionOnAction',
    }
    action_attr = action_attr_by_type.get(tklass)
    if action_attr is None:
        raise ValueError('No Action related to %s klass' % tklass)
    return getattr(kb, action_attr)
159
160
def get_action_setup(label, conf, kb, logger):
    """Create (without saving) an ActionSetup for the given label.

    Args:
        label: label of the action setup.
        conf: JSON-serializable configuration; stored as a JSON string.
        kb: knowledge base handle providing ``factory`` and
            ``ActionSetup``.
        logger: unused.

    Returns:
        The object built by ``kb.factory.create``.
    """
    serialized_conf = json.dumps(conf)
    return kb.factory.create(
        kb.ActionSetup,
        {'label': label, 'conf': serialized_conf},
    )
168
169
def get_device(kb, logger):
    """Return the device used to mark updates, creating it if missing.

    The device is looked up by its label; when ``kb.get_device`` returns
    nothing a new device is created through ``kb.factory`` and saved.

    Args:
        kb: knowledge base handle.
        logger: logger receiving a debug message when a device is created.

    Returns:
        The existing device, or the result of saving the new one.
    """
    release = '0.1'
    label = 'updater-%s.update_source_item' % release

    existing = kb.get_device(label)
    if existing:
        return existing

    logger.debug('No device with label %s, creating one' % label)
    description = {
        'maker' : 'CRS4',
        'model' : 'UPDATE',
        'release' : release,
        'label' : label,
    }
    return kb.factory.create(kb.Device, description).save()
186
187
def find_action_setup_conf(args):
    """Collect the parsed command line options into a plain dict.

    Every attribute of ``args`` whose name does not start with '_' or
    'func' is copied; the 'passwd' entry, if any, is dropped.

    Args:
        args: parsed arguments object (e.g. ``argparse.Namespace``).

    Returns:
        dict: option name -> option value, without the password.
    """
    skipped_prefixes = ('_', 'func')
    conf = {
        name: getattr(args, name)
        for name in dir(args)
        if not name.startswith(skipped_prefixes)
    }
    # Storing passwords into an Omero obj is not a great idea...
    conf.pop('passwd', None)
    return conf
197
198
def main(argv):
    """Run the tool: load the input, check it and apply the updates.

    Args:
        argv: command line arguments, without the program name.

    Exits through ``sys.exit`` with a message when the OMERO connection
    parameters cannot be resolved, when no target or source objects are
    loaded from the system, or when no record passes the consistency
    checks.
    """
    parser = make_parser()
    args = parser.parse_args(argv)

    logger = get_logger('change_source_item', level=args.loglevel,
                        filename=args.logfile)

    # Command line values win; otherwise fall back to the vlu helpers.
    try:
        host = args.host or vlu.ome_host()
        user = args.user or vlu.ome_user()
        passwd = args.passwd or vlu.ome_passwd()
    except ValueError as ve:
        logger.critical(ve)
        sys.exit(ve)

    kb = KB(driver='omero')(host, user, passwd)
    logger.info('Loading data from input file')
    with open(args.in_file) as f:
        reader = csv.DictReader(f, delimiter='\t')
        records = list(reader)
    logger.info('Loaded %d records' % len(records))

    logger.info('Loading %s type objects' % args.target_type)
    targets = kb.get_objects(getattr(kb, args.target_type))
    logger.info('Loaded %d objects' % len(targets))
    if len(targets) == 0:
        msg = 'No targets loaded from the system, nothing to do'
        logger.critical(msg)
        sys.exit(msg)

    logger.info('Loading %s type objects' % args.source_type)
    sources = kb.get_objects(getattr(kb, args.source_type))
    logger.info('Loaded %d objects' % len(sources))
    if len(sources) == 0:
        msg = 'No sources loaded from the system, nothing to do'
        logger.critical(msg)
        sys.exit(msg)

    logger.info('Loading Action type objects')
    # NOTE(review): the result is only counted here; presumably the bulk
    # load warms the kb so that later action lookups are cheap -- confirm.
    acts = kb.get_objects(kb.Action)
    logger.info('Loaded %d objects' % len(acts))

    records, targets, sources = do_check(records, targets, sources,
                                         args.target_type, args.source_type,
                                         kb, logger)
    if len(records) == 0:
        msg = 'No records passed consistency checks, nothing to do'
        logger.critical(msg)
        sys.exit(msg)

    aconf = find_action_setup_conf(args)

    update_data(records, targets, sources, args.operator,
                aconf, kb, logger)

    logger.info('Job completed')


if __name__ == '__main__':
    main(sys.argv[1:])