Mercurial > repos > ric > test2
view galaxy-tools/biobank/tools/build_miniped.py @ 0:ba6cf6ede027 draft default tip
Uploaded
| author | ric |
|---|---|
| date | Wed, 28 Sep 2016 06:03:30 -0400 |
| parents | |
| children |
line wrap: on
line source
# BEGIN_COPYRIGHT # END_COPYRIGHT """ A rough example of basic pedigree info generation. """ import argparse import collections import csv import os import sys import yaml from bl.vl.kb import KnowledgeBase as KB from bl.vl.kb.drivers.omero.ehr import EHR import bl.vl.individual.pedigree as ped import bl.vl.utils.ome_utils as vlu from bl.vl.utils import LOG_LEVELS, get_logger PLINK_MISSING = -9 PLINK_UNAFFECTED = 1 PLINK_AFFECTED = 2 FIELDS = ["fam_label", "ind_label", "fat_label", "mot_label", "gender"] def load_config(config_file): with open(config_file) as cfg: conf = yaml.load(cfg) return conf class Diagnosis(object): def __init__(self, logger, yaml_file): self.logger = logger if os.path.isfile(yaml_file): self.conf = load_config(yaml_file) else: self.logger.critical('The config file {} does not exists'.format( yaml_file)) sys.exit() def get_openehr_data(self): return self.conf['openEHR']['archetype'], self.conf['openEHR']['field'] def get_diagnosis_label(self): return [l for l in self.get_diagnosis().iterkeys()] def get_diagnosis(self): results = collections.OrderedDict() diagnosis = collections.OrderedDict(sorted( self.conf['diagnosis'].items())) for v in diagnosis.itervalues(): results[v['label']] = v['values'] return results def get_unaffected_diagnosis_dictionary(self): labels = self.get_diagnosis_label() d = {} for k in labels: d[k] = 1 return d def make_parser(): parser = argparse.ArgumentParser( description='build the first columns of a ped file from VL') parser.add_argument('--logfile', type=str, help='log file (default=stderr)') parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS, help='logging level', default='INFO') parser.add_argument('--configfile', type=str, default=os.path.join( os.path.dirname(os.path.realpath(__file__)), 'build_miniped.yaml'), help='config file (yaml) with diagnosis dictionary') parser.add_argument('-H', '--host', type=str, help='omero hostname') parser.add_argument('-U', '--user', type=str, help='omero user') parser.add_argument('-P', '--passwd', type=str, help='omero password') parser.add_argument('-S', '--study', type=str, required=True, help="a list of comma separated studies used to " "retrieve individuals that will be written to " "ped file") parser.add_argument('--ofile', type=str, help='output file path', required=True) parser.add_argument('--write_header', action='store_true', default=False, help='Write header into the output file') return parser def build_families(individuals, logger): # Individuals with only one parent will be considered like founders # for i in individuals: # if ((i.mother is None) or (i.father is None)): # i.mother = None # i.father = None logger.info("individuals: %d" % len(individuals)) # logger.info("individuals: with 0 or 2 parents: %d" % len(not_one_parent)) logger.info("analyzing pedigree") founders, non_founders, dangling, couples, children = ped.analyze( individuals ) logger.info("splitting into families") return ped.split_disjoint(individuals, children) def main(argv): parser = make_parser() args = parser.parse_args(argv) logger = get_logger('build_miniped', level=args.loglevel, filename=args.logfile) dobj = Diagnosis(logger, args.configfile) logger.debug("l {}".format(dobj.get_diagnosis_label())) try: host = args.host or vlu.ome_host() user = args.user or vlu.ome_user() passwd = args.passwd or vlu.ome_passwd() except ValueError, ve: logger.critical(ve) sys.exit(ve) kb = KB(driver='omero')(host, user, passwd) logger.debug('Loading all individuals from omero') all_inds = kb.get_objects(kb.Individual) # store all inds to cache logger.debug('%d individuals loaded' % len(all_inds)) studies = [kb.get_study(s) for s in args.study.split(',')] # Removing None values studies = set(studies) try: studies.remove(None) except KeyError: pass studies = list(studies) # Sorting studies studies = sorted(studies, key=lambda k: k.label.lower()) if len(studies) == 0: logger.error( 'No matches found for labels %s, stopping program' % args.study) sys.exit(2) enrolled_map = {} for study in studies: logger.info('Loading enrolled individuals for study %s' % study.label) enrolled = kb.get_enrolled(study) logger.debug('%d individuals loaded' % len(enrolled)) for en in enrolled: if en.individual.id not in enrolled_map: enrolled_map[en.individual.id] = ( '%s:%s' % (en.study.label, en.studyCode), en.individual) else: logger.debug('Individual %s already mapped' % en.individual.id) logger.debug('Loading EHR records') ehr_records = kb.get_ehr_records() logger.debug('%s EHR records loaded' % len(ehr_records)) ehr_records_map = {} for r in ehr_records: ehr_records_map.setdefault(r['i_id'], []).append(r) affection_map = {} arch, field = dobj.get_openehr_data() for ind_id, ehr_recs in ehr_records_map.iteritems(): affection_map[ind_id] = dobj.get_unaffected_diagnosis_dictionary() ehr = EHR(ehr_recs) for k, v in dobj.get_diagnosis().iteritems(): for d in v: if ehr.matches(arch, field, d): affection_map[ind_id][k] = PLINK_AFFECTED immuno_inds = [i for (ind_id, (st_code, i)) in enrolled_map.iteritems()] families = build_families(immuno_inds, logger) logger.info("found %d families" % len(families)) def resolve_label(i): try: return enrolled_map[i.id][0] except KeyError: return i.id def resolve_pheno(i): try: immuno_affection = affection_map[i.id] except KeyError: return [(d, PLINK_MISSING) for d in dobj.get_diagnosis_label()] return [(d, immuno_affection[d]) for d in dobj.get_diagnosis_label()] kb.Gender.map_enums_values(kb) gender_map = lambda x: 2 if x.enum_label() == kb.Gender.FEMALE.enum_label() \ else 1 for d in dobj.get_diagnosis_label(): FIELDS.append("_".join([d, "status"])) with open(args.ofile, "w") as f: writer = csv.DictWriter(f, FIELDS, delimiter="\t", lineterminator="\n") if args.write_header: writer.writeheader() families_data = [] logger.info("building families data") for k, fam in enumerate(families): fam_label = "FAM_%d" % (k + 1) for i in fam: r = {"fam_label": fam_label, "ind_label": resolve_label(i), "fat_label": 0 if (i.father is None or i.father not in fam) else resolve_label(i.father), "mot_label": 0 if (i.mother is None or i.mother not in fam) else resolve_label(i.mother), "gender": gender_map(i.gender)} for p in resolve_pheno(i): r["_".join([p[0], "status"])] = p[1] families_data.append(r) logger.info("writing miniped") writer.writerows(families_data) if __name__ == "__main__": main(sys.argv[1:])
