diff galaxy-tools/biobank/updater/merge_individuals.py @ 0:e54d14bed3f5 draft default tip

Uploaded
author ric
date Thu, 29 Sep 2016 06:09:15 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/galaxy-tools/biobank/updater/merge_individuals.py	Thu Sep 29 06:09:15 2016 -0400
@@ -0,0 +1,223 @@
+#======================================= 
+# This tool moves all informations related to an individual (source) to
+# another (target). Moved informations are:
+# * children (Individual objects)
+# * ActionOnInvidual
+# * Enrollments
+# * EHR records
+#
+# The tool expects as input a TSV file like this
+#   source                               target
+#   V0468D2D96999548BF9FC6AD24C055E038   V060BAA01C662240D181BB98A51885C498
+#   V029CC0A614E2D42D0837602B15193EB58   V01B8122A7C75A452E9F80381CEA988557
+#   V0B20C93E8A88D43EFB87A7E6911292A05   V0BED85E8E76A54AA7AB0AFB09F95798A8
+#   ...
+#
+# NOTE WELL: 
+# * Parents of the "source" indivudal WILL NOT BE ASSIGNED
+#   to the "target" individual
+# * For the Enrollmnent objects, if
+#   "target" individual has already a code in the same study of "source"
+#   individual, the script will try to move the Enrollment to the
+#   "duplicated" study (this will be fixed when a proper ALIASES
+#   manegement will be introduced)
+# =======================================
+
+import sys, argparse, csv, time, json, os
+
+from bl.vl.kb import KnowledgeBase as KB
+from bl.vl.kb import KBError
+import bl.vl.utils.ome_utils as vlu
+from bl.vl.utils import get_logger, LOG_LEVELS
+
+
+def make_parser():
+    parser = argparse.ArgumentParser(description='merge informations related to an individual ("source") to another one ("target")')
+    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')
+    parser.add_argument('--loglevel', type=str, choices = LOG_LEVELS,
+                        help='logging level (default=INFO)', default='INFO')
+    parser.add_argument('-H', '--host', type=str, help='omero hostname')
+    parser.add_argument('-U', '--user', type=str, help='omero user')
+    parser.add_argument('-P', '--passwd', type=str, help='omero password')
+    parser.add_argument('-O', '--operator', type=str, help='operator',
+                        required=True)
+    parser.add_argument('--in_file', type=str, required = True,
+                        help='input TSV file')
+    return parser
+
+
+def update_object(obj, backup_values, operator, kb, logger):
+    logger.debug('Building ActionOnAction for object %s::%s' %
+                 (obj.get_ome_table(),
+                  obj.id)
+                 )
+    act_setup = build_action_setup('merge-individuals-%f' % time.time(),
+                                   backup_values, kb)
+    aoa_conf = {
+        'setup': act_setup,
+        'actionCategory' : kb.ActionCategory.UPDATE,
+        'operator': operator,
+        'target': obj.lastUpdate if obj.lastUpdate else obj.action,
+        'context': obj.action.context
+        }
+    logger.debug('Updating object with new ActionOnAction')
+    obj.lastUpdate = kb.factory.create(kb.ActionOnAction, aoa_conf)
+
+
+def build_action_setup(label, backup, kb, logger):
+    logger.debug('Creating a new ActionSetup with label %s and backup %r' % (label, backup))
+    conf = {
+        'label': label,
+        'conf': json.dumps({'backup' : backup})
+        }
+    asetup = kb.factory.create(kb.ActionSetup, conf)
+    return asetup
+
+
+def update_children(source_ind, target_ind, operator, kb, logger):
+    if source_ind.gender.enum_label() == kb.Gender.MALE.enum_label():
+        parent_type = 'father'
+    elif source_ind.gender.enum_label() == kb.Gender.FEMALE.enum_label():
+        parent_type = 'mother'
+    else:
+        raise ValueError('%s is not a valid gender value' % (source_ind.gender.enum_label()))
+    query = '''
+            SELECT ind FROM Individual ind
+            JOIN ind.{0} AS {0}
+            WHERE {0}.vid = :parent_vid
+            '''.format(parent_type)
+    children = kb.find_all_by_query(query, {'parent_vid' : source_ind.id})
+    logger.info('Retrieved %d children for source individual' % len(children))
+    for child in children:
+        backup = {}
+        logger.debug('Changing %s for individual %s' % (parent_type,
+                                                        child.id))
+        backup[parent_type] = getattr(child, parent_type).id
+        setattr(child, parent_type, target_ind)
+        update_object(child, backup, operator, kb)
+    kb.save_array(children)
+
+
+def update_action_on_ind(source_ind, target_ind, operator, kb, logger):
+    query = '''SELECT act FROM ActionOnIndividual act
+               JOIN act.target AS ind
+               WHERE ind.vid = :ind_vid
+            '''
+    src_acts = kb.find_all_by_query(query, {'ind_vid' : source_ind.id})
+    logger.info('Retrieved %d actions for source individual' % len(src_acts))
+    connected = kb.dt.get_connected(source_ind, direction=kb.dt.DIRECTION_OUTGOING, 
+                                    query_depth=1)
+    if source_ind in connected:
+        connected.remove(source_ind)
+    for sa in src_acts:
+        logger.debug('Changing target for action %s' % sa.id)
+        sa.target = target_ind
+        logger.debug('Action %s target updated' % sa.id)
+    kb.save_array(src_acts)
+    for conn in connected:
+        kb.dt.destroy_edge(source_ind, conn)
+        kb.dt.create_edge(conn.action, target_ind, conn)
+
+
+def update_enrollments(source_ind, target_ind, operator, kb, logger):
+    query = '''SELECT en FROM Enrollment en
+               JOIN en.individual AS ind
+               WHERE ind.vid = :ind_vid
+            '''
+    enrolls = kb.find_all_by_query(query, {'ind_vid' : source_ind.id})
+    logger.info('Retrieved %d enrollments for source individual' % len(enrolls))
+    for sren in enrolls:
+        try:
+            sren.individual = target_ind
+            logger.debug('Changing individual for enrollment %s in study %s' % (sren.studyCode,
+                                                                                sren.study.label))
+            kb.save(sren)
+            logger.info('Changed individual for enrollment %s (study code %s -- study %s)' % (sren.id,
+                                                                                              sren.studyCode,
+                                                                                              sren.study.label))
+        except KBError, kbe:
+            logger.warning('Unable to update enrollment %s (study code %s -- study %s)' % (sren.id,
+                                                                                           sren.studyCode,
+                                                                                           sren.study.label))
+            move_to_duplicated(sren, operator, kb, logger)
+
+
+def update_ehr_records(source_ind, target_ind, kb):
+    kb.update_table_rows(kb.eadpt.EAV_EHR_TABLE, '(i_vid == "%s")' % source_ind.id,
+                         {'i_vid' : target_ind.id})
+                                    
+
+# This method should be considered as a temporary hack that will be
+# used untill a proper ALIAS management will be introduced into the
+# system
+def move_to_duplicated(enrollment, operator, kb, logger):
+    old_st = enrollment.study
+    dupl_st = kb.get_study('%s_DUPLICATI' % old_st.label)
+    if not dupl_st:
+        logger.warning('No "duplicated" study ({0}_DUPLICATI) found for study {0}'.format(old_st.label))
+        return
+    enrollment.study = dupl_st
+    try:
+        kb.save(enrollment)
+        logger.info('Enrollmnet %s moved from study %s to study %s' % (enrollment.studyCode,
+                                                                       old_st.label, dupl_st.label))
+    except:
+        logger.error('An error occurred while moving enrollment %s from study %s to %s' % (enrollment.studyCode,
+                                                                                           old_st.label,
+                                                                                           dupl_st.label))
+
+
+def main(argv):
+    parser = make_parser()
+    args = parser.parse_args(argv)
+
+    logger = get_logger('merge_individuals', level=args.loglevel,
+                         filename=args.logfile)
+
+    try:
+        host = args.host or vlu.ome_host()
+        user = args.user or vlu.ome_user()
+        passwd = args.passwd or vlu.ome_passwd()
+    except ValueError, ve:
+        logger.critical(ve)
+        sys.exit(ve)
+
+    kb = KB(driver='omero')(host, user, passwd)
+
+    logger.debug('Retrieving Individuals')
+    individuals = kb.get_objects(kb.Individual)
+    logger.debug('Retrieved %d Individuals' % len(individuals))
+    ind_lookup = {}
+    for i in individuals:
+        ind_lookup[i.id] = i
+
+    with open(args.in_file) as in_file:
+        reader = csv.DictReader(in_file, delimiter='\t')
+        for row in reader:            
+            try:
+                source = ind_lookup[row['source']]
+                logger.info('Selected as source individual with ID %s' % source.id)
+                target = ind_lookup[row['target']]
+                logger.info('Selected as destination individual with ID %s' % target.id)
+            except KeyError, ke:
+                logger.warning('Unable to retrieve individual with ID %s, skipping row' % ke)
+                continue
+
+            logger.info('Updating children connected to source individual')
+            update_children(source, target, args.operator, kb, logger)
+            logger.info('Children update complete')
+
+            logger.info('Updating ActionOnIndividual related to source individual')
+            update_action_on_ind(source, target, args.operator, kb, logger)
+            logger.info('ActionOnIndividual update completed')
+
+            logger.info('Updating enrollments related to source individual')
+            update_enrollments(source, target, args.operator, kb, logger)
+            logger.info('Enrollments update completed')
+            
+            logger.info('Updating EHR records related to source individual')
+            update_ehr_records(source, target, kb)
+            logger.info('EHR records update completed')
+    
+if __name__ == '__main__':
+    main(sys.argv[1:])