Mercurial > repos > ric > test3
comparison galaxy-tools/biobank/updater/merge_individuals.py @ 0:e54d14bed3f5 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Thu, 29 Sep 2016 06:09:15 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:e54d14bed3f5 |
|---|---|
| 1 #======================================= | |
| 2 # This tool moves all information related to an individual (source) to | |
| 3 # another (target). The information moved is: | |
| 4 # * children (Individual objects) | |
| 5 # * ActionOnIndividual | |
| 6 # * Enrollments | |
| 7 # * EHR records | |
| 8 # | |
| 9 # The tool expects as input a TSV file like this | |
| 10 # source target | |
| 11 # V0468D2D96999548BF9FC6AD24C055E038 V060BAA01C662240D181BB98A51885C498 | |
| 12 # V029CC0A614E2D42D0837602B15193EB58 V01B8122A7C75A452E9F80381CEA988557 | |
| 13 # V0B20C93E8A88D43EFB87A7E6911292A05 V0BED85E8E76A54AA7AB0AFB09F95798A8 | |
| 14 # ... | |
| 15 # | |
| 16 # NOTE WELL: | |
| 17 # * Parents of the "source" individual WILL NOT BE ASSIGNED | |
| 18 # to the "target" individual | |
| 19 # * For Enrollment objects, if the | |
| 20 # "target" individual already has a code in the same study as the "source" | |
| 21 # individual, the script will try to move the Enrollment to the | |
| 22 # "duplicated" study (<study label>_DUPLICATI); this will be fixed when proper ALIAS | |
| 23 # management is introduced | |
| 24 # ======================================= | |
| 25 | |
| 26 import sys, argparse, csv, time, json, os | |
| 27 | |
| 28 from bl.vl.kb import KnowledgeBase as KB | |
| 29 from bl.vl.kb import KBError | |
| 30 import bl.vl.utils.ome_utils as vlu | |
| 31 from bl.vl.utils import get_logger, LOG_LEVELS | |
| 32 | |
| 33 | |
def make_parser():
    """Build the command-line parser for the merge-individuals tool.

    The parser accepts logging options, optional OMERO connection
    overrides (host / user / password), the mandatory operator name and
    the mandatory path of the input TSV file.

    :returns: a configured ``argparse.ArgumentParser``
    """
    description = ('merge informations related to an individual '
                   '("source") to another one ("target")')
    p = argparse.ArgumentParser(description=description)
    # logging
    p.add_argument('--logfile', type=str, help='log file (default=stderr)')
    p.add_argument('--loglevel', type=str, choices=LOG_LEVELS,
                   help='logging level (default=INFO)', default='INFO')
    # OMERO connection; main() falls back to ome_utils values when omitted
    p.add_argument('-H', '--host', type=str, help='omero hostname')
    p.add_argument('-U', '--user', type=str, help='omero user')
    p.add_argument('-P', '--passwd', type=str, help='omero password')
    # run parameters
    p.add_argument('-O', '--operator', type=str, required=True,
                   help='operator')
    p.add_argument('--in_file', type=str, required=True,
                   help='input TSV file')
    return p
| 47 | |
| 48 | |
def update_object(obj, backup_values, operator, kb, logger):
    """Attach a new UPDATE ActionOnAction to ``obj`` describing a change.

    The new action targets the object's previous ``lastUpdate`` or, when
    the object has never been updated, its creating ``action``. It carries
    an ActionSetup whose JSON ``conf`` stores ``backup_values`` under the
    ``backup`` key. ``obj`` is modified in memory only; the caller is
    responsible for saving it.

    :param obj: KB object being modified; must expose ``id``,
                ``get_ome_table()``, ``lastUpdate`` and ``action``
    :param backup_values: dict of the values being overwritten
    :param operator: operator name recorded on the new action
    :param kb: KnowledgeBase instance used to create the objects
    :param logger: logger for debug messages
    """
    logger.debug('Building ActionOnAction for object %s::%s' %
                 (obj.get_ome_table(), obj.id))
    # build_action_setup requires the logger as well; omitting it made
    # every call fail with TypeError.
    act_setup = build_action_setup('merge-individuals-%f' % time.time(),
                                   backup_values, kb, logger)
    aoa_conf = {
        'setup': act_setup,
        'actionCategory': kb.ActionCategory.UPDATE,
        'operator': operator,
        # chain onto the latest update if there is one
        'target': obj.lastUpdate if obj.lastUpdate else obj.action,
        'context': obj.action.context,
    }
    logger.debug('Updating object with new ActionOnAction')
    obj.lastUpdate = kb.factory.create(kb.ActionOnAction, aoa_conf)
| 65 | |
| 66 | |
def build_action_setup(label, backup, kb, logger):
    """Create an ActionSetup labelled ``label`` that stores ``backup``.

    The setup's ``conf`` field is the JSON serialisation of
    ``{'backup': backup}``.

    :param label: label for the new ActionSetup
    :param backup: JSON-serialisable values to record
    :param kb: KnowledgeBase whose factory creates the object
    :param logger: logger for the debug trace
    :returns: the object returned by ``kb.factory.create``
    """
    logger.debug('Creating a new ActionSetup with label %s and backup %r' % (label, backup))
    serialized = json.dumps({'backup': backup})
    return kb.factory.create(kb.ActionSetup, dict(label=label, conf=serialized))
| 75 | |
| 76 | |
def update_children(source_ind, target_ind, operator, kb, logger):
    """Re-parent every child of ``source_ind`` onto ``target_ind``.

    The parent slot to rewrite (``father`` or ``mother``) is chosen from
    the gender of ``source_ind``. For each child the previous parent VID is
    recorded as backup in a new UPDATE action (via update_object), and all
    children are then saved in one ``kb.save_array`` call.

    :raises ValueError: if ``source_ind``'s gender is neither MALE nor FEMALE
    """
    gender_label = source_ind.gender.enum_label()
    if gender_label == kb.Gender.MALE.enum_label():
        parent_type = 'father'
    elif gender_label == kb.Gender.FEMALE.enum_label():
        parent_type = 'mother'
    else:
        raise ValueError('%s is not a valid gender value' % gender_label)
    query = '''
    SELECT ind FROM Individual ind
    JOIN ind.{0} AS {0}
    WHERE {0}.vid = :parent_vid
    '''.format(parent_type)
    children = kb.find_all_by_query(query, {'parent_vid': source_ind.id})
    logger.info('Retrieved %d children for source individual' % len(children))
    for child in children:
        logger.debug('Changing %s for individual %s' % (parent_type,
                                                        child.id))
        backup = {parent_type: getattr(child, parent_type).id}
        setattr(child, parent_type, target_ind)
        # update_object requires the logger; it was previously omitted,
        # raising TypeError on the first child.
        update_object(child, backup, operator, kb, logger)
    kb.save_array(children)
| 99 | |
| 100 | |
def update_action_on_ind(source_ind, target_ind, operator, kb, logger):
    """Retarget ActionOnIndividual records and dependency-tree edges.

    Every ActionOnIndividual whose target is ``source_ind`` is pointed at
    ``target_ind``, and all of them are saved in one batch. Then, for each
    node returned by ``kb.dt.get_connected`` (outgoing, depth 1, excluding
    ``source_ind`` itself), the edge from ``source_ind`` is destroyed and
    ``kb.dt.create_edge(node.action, target_ind, node)`` is called. This
    presumably re-links the node under ``target_ind``; confirm against
    the ``dt`` API.

    ``operator`` is accepted for symmetry with the other update helpers
    but is not used here.
    """
    action_query = '''SELECT act FROM ActionOnIndividual act
    JOIN act.target AS ind
    WHERE ind.vid = :ind_vid
    '''
    actions = kb.find_all_by_query(action_query, {'ind_vid': source_ind.id})
    logger.info('Retrieved %d actions for source individual' % len(actions))
    neighbours = kb.dt.get_connected(source_ind,
                                     direction=kb.dt.DIRECTION_OUTGOING,
                                     query_depth=1)
    # the starting node may be part of the result; it is not a neighbour
    if source_ind in neighbours:
        neighbours.remove(source_ind)
    for action in actions:
        logger.debug('Changing target for action %s' % action.id)
        action.target = target_ind
        logger.debug('Action %s target updated' % action.id)
    kb.save_array(actions)
    for node in neighbours:
        kb.dt.destroy_edge(source_ind, node)
        kb.dt.create_edge(node.action, target_ind, node)
| 120 | |
| 121 | |
def update_enrollments(source_ind, target_ind, operator, kb, logger):
    """Reassign every Enrollment of ``source_ind`` to ``target_ind``.

    Enrollments are reassigned and saved one at a time. When a save raises
    KBError, a warning that includes the error is logged and
    move_to_duplicated() is called to relocate the enrollment into the
    study's ``_DUPLICATI`` companion study. Per the module header, the
    failure presumably means ``target_ind`` already has a code in that
    study.
    """
    query = '''SELECT en FROM Enrollment en
    JOIN en.individual AS ind
    WHERE ind.vid = :ind_vid
    '''
    enrolls = kb.find_all_by_query(query, {'ind_vid': source_ind.id})
    logger.info('Retrieved %d enrollments for source individual' % len(enrolls))
    for sren in enrolls:
        sren.individual = target_ind
        logger.debug('Changing individual for enrollment %s in study %s' %
                     (sren.studyCode, sren.study.label))
        try:
            kb.save(sren)
        except KBError as kbe:
            # the error used to be bound but never reported
            logger.warning('Unable to update enrollment %s (study code %s -- study %s): %s' %
                           (sren.id, sren.studyCode, sren.study.label, kbe))
            move_to_duplicated(sren, operator, kb, logger)
        else:
            logger.info('Changed individual for enrollment %s (study code %s -- study %s)' %
                        (sren.id, sren.studyCode, sren.study.label))
| 143 | |
| 144 | |
def update_ehr_records(source_ind, target_ind, kb):
    """Point the EHR records of ``source_ind`` at ``target_ind``.

    Calls ``kb.update_table_rows`` on the EAV EHR table with the row
    condition ``(i_vid == "<source id>")``, setting ``i_vid`` to the
    target's id.
    """
    selector = '(i_vid == "%s")' % source_ind.id
    new_values = {'i_vid': target_ind.id}
    kb.update_table_rows(kb.eadpt.EAV_EHR_TABLE, selector, new_values)
| 148 | |
| 149 | |
| 150 # This method should be considered as a temporary hack that will be | |
| 151 # used untill a proper ALIAS management will be introduced into the | |
| 152 # system | |
| 153 def move_to_duplicated(enrollment, operator, kb, logger): | |
| 154 old_st = enrollment.study | |
| 155 dupl_st = kb.get_study('%s_DUPLICATI' % old_st.label) | |
| 156 if not dupl_st: | |
| 157 logger.warning('No "duplicated" study ({0}_DUPLICATI) found for study {0}'.format(old_st.label)) | |
| 158 return | |
| 159 enrollment.study = dupl_st | |
| 160 try: | |
| 161 kb.save(enrollment) | |
| 162 logger.info('Enrollmnet %s moved from study %s to study %s' % (enrollment.studyCode, | |
| 163 old_st.label, dupl_st.label)) | |
| 164 except: | |
| 165 logger.error('An error occurred while moving enrollment %s from study %s to %s' % (enrollment.studyCode, | |
| 166 old_st.label, | |
| 167 dupl_st.label)) | |
| 168 | |
| 169 | |
def main(argv):
    """Merge each (source, target) pair of individuals listed in a TSV file.

    Connects to OMERO. Command-line credentials take precedence over the
    ome_utils defaults. All Individuals are indexed by id. Then, for every
    row of ``--in_file`` (columns ``source`` and ``target``), the following
    are moved from the source individual to the target:

    - children
    - ActionOnIndividual records
    - enrollments
    - EHR records

    Rows referencing unknown ids, and rows whose source and target are the
    same individual, are logged and skipped.

    :param argv: command-line arguments, without the program name
    """
    parser = make_parser()
    args = parser.parse_args(argv)

    logger = get_logger('merge_individuals', level=args.loglevel,
                        filename=args.logfile)

    try:
        host = args.host or vlu.ome_host()
        user = args.user or vlu.ome_user()
        passwd = args.passwd or vlu.ome_passwd()
    except ValueError as ve:
        logger.critical(ve)
        sys.exit(ve)

    kb = KB(driver='omero')(host, user, passwd)

    logger.debug('Retrieving Individuals')
    individuals = kb.get_objects(kb.Individual)
    logger.debug('Retrieved %d Individuals' % len(individuals))
    ind_lookup = {i.id: i for i in individuals}

    with open(args.in_file) as in_file:
        reader = csv.DictReader(in_file, delimiter='\t')
        for row in reader:
            try:
                source = ind_lookup[row['source']]
                logger.info('Selected as source individual with ID %s' % source.id)
                target = ind_lookup[row['target']]
                logger.info('Selected as destination individual with ID %s' % target.id)
            except KeyError as ke:
                logger.warning('Unable to retrieve individual with ID %s, skipping row' % ke)
                continue

            if source.id == target.id:
                # Merging an individual into itself is meaningless. It would
                # only rewrite its own actions, graph edges and enrollments,
                # and add no-op UPDATE actions to its children.
                logger.warning('Source and target are the same individual (%s), skipping row' % source.id)
                continue

            logger.info('Updating children connected to source individual')
            update_children(source, target, args.operator, kb, logger)
            logger.info('Children update complete')

            logger.info('Updating ActionOnIndividual related to source individual')
            update_action_on_ind(source, target, args.operator, kb, logger)
            logger.info('ActionOnIndividual update completed')

            logger.info('Updating enrollments related to source individual')
            update_enrollments(source, target, args.operator, kb, logger)
            logger.info('Enrollments update completed')

            logger.info('Updating EHR records related to source individual')
            update_ehr_records(source, target, kb)
            logger.info('EHR records update completed')
| 221 | |
if __name__ == '__main__':
    # hand main() the command-line arguments without the program name
    cli_args = sys.argv[1:]
    main(cli_args)
