Mercurial > repos > ric > test1
view galaxy-tools/biobank/utils/prepare_seq_dsample_inputs.py @ 3:43be74e62bfe draft
Uploaded
author | ric |
---|---|
date | Thu, 22 Sep 2016 08:57:04 -0400 |
parents | |
children |
line wrap: on
line source
"""
This tool produces files that can be used as input to import
 * samples
 * flowcells
 * lanes
 * laneslots
within OMERO.biobank using import applications.

If the optional 'study-output-file' parameter is given as input, the script
will produce the input file for a new study definition.

If the optional 'tubes-subsamples-output-file' is given, the script will
generate another file with tubes definitions where each tube is produced
applying a specific laboratory protocol to an existing tube. Existing
tubes are the ones in tubes-out-file, new tubes' labels are created using the
pattern <tube_label>::<protocol>

The config_parameters field must point to a YAML configuration file with the
following structure:

 config_parameters:
   study_label: study_label
   namespace: namespace

where study_label is mandatory
"""

import argparse
import csv
import logging
import os
import sys

import yaml

# Needed to import flowcell data
from bioblend.galaxy import GalaxyInstance
import nglimsclient

LOG_FORMAT = '%(asctime)s|%(levelname)-8s|%(message)s'
LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
LOG_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

LOGGER_NAME = 'prepare_seq_dsample_inputs'

# Python 2 needs the 'U' flag to get universal-newline handling; on Python 3
# this is the default for text mode and the flag was removed in Python 3.11.
_READ_MODE = 'rU' if sys.version_info[0] < 3 else 'r'


def make_parser():
    """Build the command line parser for this tool."""
    parser = argparse.ArgumentParser(description='split sequencing samplesheet')
    parser.add_argument('--logfile', type=str, help='log file (default=stderr)')
    parser.add_argument('--loglevel', type=str, choices=LOG_LEVELS,
                        help='logging level', default='INFO')
    parser.add_argument('--in-file', '-i', type=str, required=True,
                        help='input file')
    parser.add_argument('--tubes-out-file', type=str,
                        help='output file containing tube definitions',
                        default='./tubes_def.tsv')
    parser.add_argument('--flowcells-out-file', type=str,
                        help='output file containing flowcell definitions',
                        default='./flowcells_def.tsv')
    parser.add_argument('--lanes-out-file', type=str,
                        help='output file containing lane definitions',
                        default='./lanes_def.tsv')
    parser.add_argument('--laneslots-out-file', type=str,
                        help='output file containing laneslot definitions',
                        default='./laneslots_def.tsv')
    parser.add_argument('--config-parameters', type=str, required=True,
                        help='a YAML configuration file containing study label and labels namespace, '
                        'namespace is optional')
    parser.add_argument('--study-output-file', type=str,
                        help='output file containing study definition')
    parser.add_argument('--tubes-subsamples-output-file', type=str,
                        help='output file containing tubes subsamples (samples produced applying a '
                        'laboratory protocol to existing samples)')
    return parser


def get_samplesheet_translator(samplesheet_type='default'):
    """Return the mapping from logical field names to samplesheet columns.

    Raises KeyError if samplesheet_type is not a known samplesheet layout.
    """
    translator = {
        'default': {
            'flowcell_id': 'FCID',
            'tube_id': 'SampleID',
            'lane_id': 'Lane',
            'sample_tag': 'Index',
            'protocol': 'Recipe',
            'operator': 'Operator',
            'sample_project': 'SampleProject',
        },
    }
    return translator[samplesheet_type]


def add_namespace(namespace, label, separator='|'):
    """Return label prefixed by namespace, joined with separator."""
    return separator.join([namespace, label])


def _apply_namespace(namespace, label):
    """Return label prefixed by namespace, or label itself if no namespace."""
    return add_namespace(namespace, label) if namespace else label


def _field(record, translator, name):
    """Return the whitespace-stripped samplesheet value of a logical field."""
    return record[translator[name]].strip()


def _write_rows(ofile, fieldnames, rows):
    """Write a header line followed by rows (dicts) to ofile as TSV."""
    with open(ofile, 'w') as out_file:
        writer = csv.DictWriter(out_file, fieldnames, delimiter='\t')
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


def write_tubes_file(records, study_label, translator, ofile,
                     namespace=None, logger=None):
    """Write one tube definition for each distinct tube id in records.

    Rows are written in sorted label order so the output is reproducible.
    """
    ofile_fields = ['study', 'label', 'vessel_type', 'vessel_content',
                    'vessel_status', 'source', 'source_type']
    # Strip the cell value (not the column name) so that labels match the
    # 'source' labels written by the subsamples and laneslots files.
    tube_ids = sorted({_field(r, translator, 'tube_id') for r in records})
    rows = [{'study': study_label,
             'label': _apply_namespace(namespace, tube_id),
             'vessel_type': 'Tube',
             'vessel_content': 'DNA',
             'vessel_status': 'UNKNOWN',
             'source': 'None',
             'source_type': 'NO_SOURCE'}
            for tube_id in tube_ids]
    _write_rows(ofile, ofile_fields, rows)


def write_subsamples_file(records, study_label, translator, ofile,
                          namespace=None, logger=None):
    """Write one derived tube for each distinct (tube id, protocol) pair.

    The derived tube is labelled <tube_label>::<protocol> and points to the
    original tube as its source.
    """
    ofile_fields = ['study', 'label', 'vessel_type', 'vessel_content',
                    'vessel_status', 'source', 'source_type', 'options']
    subsamples = sorted({(_field(r, translator, 'tube_id'),
                          _field(r, translator, 'protocol'))
                         for r in records})
    rows = [{'study': study_label,
             'label': _apply_namespace(namespace,
                                       '%s::%s' % (tube_id, protocol)),
             'vessel_type': 'Tube',
             'vessel_content': 'DNA',
             'vessel_status': 'UNKNOWN',
             'source': _apply_namespace(namespace, tube_id),
             'source_type': 'Tube',
             'options': 'protocol=%s' % protocol}
            for tube_id, protocol in subsamples]
    _write_rows(ofile, ofile_fields, rows)


def write_flowcells_file(records, study_label, translator, ofile,
                         namespace=None, logger=None):
    """Write one flowcell definition for each distinct flowcell id."""
    ofile_fields = ['study', 'label', 'barcode', 'container_status',
                    'number_of_slots']
    flowcell_ids = sorted({_field(r, translator, 'flowcell_id')
                           for r in records})
    rows = []
    for flowcell_id in flowcell_ids:
        # Label and barcode carry the same (optionally namespaced) value.
        label = _apply_namespace(namespace, flowcell_id)
        rows.append({'study': study_label,
                     'label': label,
                     'barcode': label,
                     'container_status': 'INSTOCK',
                     'number_of_slots': '8'})
    _write_rows(ofile, ofile_fields, rows)


def write_lanes_file(records, study_label, translator, ofile,
                     namespace=None, logger=None):
    """Write one lane definition for each distinct (flowcell id, lane) pair."""
    ofile_fields = ['study', 'flow_cell', 'slot', 'container_status']
    lanes = sorted({(_field(r, translator, 'flowcell_id'),
                     _field(r, translator, 'lane_id'))
                    for r in records})
    rows = [{'study': study_label,
             'flow_cell': _apply_namespace(namespace, flowcell_id),
             'slot': lane_id,
             'container_status': 'INSTOCK'}
            for flowcell_id, lane_id in lanes]
    _write_rows(ofile, ofile_fields, rows)


def _find_adapter(fc_details, tube_id, lane):
    """Return the adapter of the first details entry matching tube and lane.

    Raises RuntimeError if no entry matches.
    """
    for entry in fc_details:
        if entry['name'] == tube_id and entry['lane'] == lane:
            return entry['adapter']
    raise RuntimeError('No adapter found for sample %s in lane %s'
                       % (tube_id, lane))


def write_laneslots_file(records, study_label, translator, ofile,
                         subsamples_enabled=False,
                         namespace=None, logger=None):
    """Write one laneslot definition for each distinct samplesheet entry.

    Flowcell details are fetched from nglims through the Galaxy server
    configured by the NGLIMS_GALAXY_HOST and NGLIMS_GALAXY_API_KEY environment
    variables; the flowcell is the one named in the first record. The adapter
    of each laneslot is taken from the details entry whose 'name' and 'lane'
    match the record's tube id and lane.

    When subsamples_enabled is true the laneslot source is the derived tube
    <tube_label>::<protocol>, otherwise it is the original tube.

    Exits the program (sys.exit) if the environment variables are missing or
    the flowcell is unknown to nglims. Raises RuntimeError if no adapter
    matches a record. An empty records list produces a header-only file.
    """
    if logger is None:
        logger = logging.getLogger(LOGGER_NAME)
    logger.debug('subsamples_ensabled: %r' % subsamples_enabled)
    ofile_fields = ['study', 'lane', 'tag', 'content', 'source',
                    'source_type', 'options']
    # Get NGLIMS host and key
    try:
        galaxy_host = os.environ['NGLIMS_GALAXY_HOST']
        api_key = os.environ['NGLIMS_GALAXY_API_KEY']
    except KeyError as ke:
        msg = 'No environment variables %s set to configure access to the Galaxy server' % ke
        sys.exit(msg)
    if not records:
        # Nothing to look up: behave like the other writers on empty input.
        _write_rows(ofile, ofile_fields, [])
        return
    # Get flowcell label (assuming label is the same for all records)
    fc_id = _field(records[0], translator, 'flowcell_id')
    # Get flowcell details from nglims
    gi = nglimsclient.setup(GalaxyInstance(galaxy_host, api_key))
    if not gi.nglims.exists_flowcell_id(fc_id):
        sys.exit('Flowcell %s not found in nglims' % fc_id)
    fc_data = gi.nglims.flowcell_complete_details(fc_id)

    laneslots_def = set()
    for r in records:
        tube_id = _field(r, translator, 'tube_id')
        lane_id = _field(r, translator, 'lane_id')
        protocol = _field(r, translator, 'protocol')
        # Strip before namespacing, as the flowcells and lanes writers do, so
        # the lane label refers to exactly the same flowcell label.
        flowcell_label = _apply_namespace(
            namespace, _field(r, translator, 'flowcell_id'))
        if subsamples_enabled:
            source_tube_id = '%s::%s' % (tube_id, protocol)
        else:
            source_tube_id = tube_id
        # Identify adapter
        adapter = _find_adapter(fc_data['details'], tube_id, int(lane_id))
        laneslots_def.add(('%s:%s' % (flowcell_label, lane_id),
                           _field(r, translator, 'sample_tag'),
                           source_tube_id,
                           protocol,
                           _field(r, translator, 'operator'),
                           _field(r, translator, 'sample_project'),
                           adapter))
    rows = [{'study': study_label,
             'lane': lane,
             'tag': tag,
             'content': 'DNA',
             'source': _apply_namespace(namespace, source),
             'source_type': 'Tube',
             'options': 'protocol=%s,operator=%s,sample_project=%s,adapter=%s'
                        % (protocol, operator, sample_project, adapter)}
            for (lane, tag, source, protocol, operator, sample_project,
                 adapter) in sorted(laneslots_def)]
    _write_rows(ofile, ofile_fields, rows)


def write_study_file(study_label, records, translator, ofile, logger=None):
    """Write a study definition holding only the study label."""
    ofile_fields = ['label', 'description']
    _write_rows(ofile, ofile_fields, [{'label': study_label}])


def _load_config(path):
    """Return (study_label, namespace) read from the YAML file at path.

    namespace is None when the configuration does not define it.
    Raises RuntimeError if the file has no 'config_parameters' mapping or the
    mapping has no 'study_label'.
    """
    with open(path) as cfgf:
        # NOTE(review): the original used yaml.load() with no Loader, which can
        # build arbitrary Python objects and is rejected by PyYAML >= 6.
        # safe_load() is enough for the plain mapping documented above.
        conf = yaml.safe_load(cfgf)
    params = conf.get('config_parameters') if isinstance(conf, dict) else None
    if not isinstance(params, dict):
        raise RuntimeError('Bad configuration file')
    try:
        study_label = params['study_label']
    except KeyError:
        raise RuntimeError('No study_label provided')
    return study_label, params.get('namespace')


def main(argv):
    """Parse argv, load the samplesheet and write every definition file."""
    parser = make_parser()
    args = parser.parse_args(argv)

    log_level = getattr(logging, args.loglevel)
    kwargs = {'format': LOG_FORMAT,
              'datefmt': LOG_DATEFMT,
              'level': log_level}
    if args.logfile:
        kwargs['filename'] = args.logfile
    logging.basicConfig(**kwargs)
    logger = logging.getLogger(LOGGER_NAME)

    with open(args.in_file, _READ_MODE) as f:
        logger.info('Loading data from file %s', args.in_file)
        reader = csv.DictReader(f, delimiter='\t')
        recs = list(reader)
    translator = get_samplesheet_translator()

    study_label, namespace = _load_config(args.config_parameters)

    if args.study_output_file:
        logger.info('Writing Study definition file %s', args.study_output_file)
        write_study_file(study_label, recs, translator,
                         args.study_output_file, logger)
        logger.info('Done writing file %s', args.study_output_file)

    logger.info('Writing Tube definitions file %s', args.tubes_out_file)
    write_tubes_file(recs, study_label, translator,
                     args.tubes_out_file, namespace, logger)
    logger.info('Done writing file %s', args.tubes_out_file)

    if args.tubes_subsamples_output_file:
        logger.info('Writing Tubes\' subsamples definitions file %s',
                    args.tubes_subsamples_output_file)
        write_subsamples_file(recs, study_label, translator,
                              args.tubes_subsamples_output_file,
                              namespace, logger)
        logger.info('Done writing file %s', args.tubes_subsamples_output_file)

    logger.info('Writing FlowCell definitions file %s', args.flowcells_out_file)
    write_flowcells_file(recs, study_label, translator,
                         args.flowcells_out_file, namespace, logger)
    logger.info('Done writing file %s', args.flowcells_out_file)

    logger.info('Writing Lane definitions file %s', args.lanes_out_file)
    write_lanes_file(recs, study_label, translator,
                     args.lanes_out_file, namespace, logger)
    logger.info('Done writing file %s', args.lanes_out_file)

    logger.info('Writing LaneSlot definitions file %s', args.laneslots_out_file)
    # The option is always present on the Namespace (default None), so test
    # its value: laneslots must only point to subsamples that were created.
    subsamples_enabled = bool(args.tubes_subsamples_output_file)
    write_laneslots_file(recs, study_label, translator,
                         args.laneslots_out_file,
                         subsamples_enabled,
                         namespace, logger)
    logger.info('Done writing file %s', args.laneslots_out_file)


if __name__ == '__main__':
    main(sys.argv[1:])