Mercurial > repos > yhoogstrate > edger_with_design_matrix
view design_matrix_creator @ 79:c86f1f86b3c1 draft
Uploaded
| author | yhoogstrate | 
|---|---|
| date | Thu, 13 Nov 2014 10:30:43 -0500 | 
| parents | |
| children | 
line wrap: on
 line source
#!/usr/bin/env python import argparse, os, shutil, sys, tempfile, subprocess __version_info__ = ('1', '0', '0')#, 'beta') __version__ = '.'.join(__version_info__) if (len(__version_info__) == 3) else '.'.join(__version_info__[0:3])+"-"+__version_info__[3] __author__ = 'Youri Hoogstrate' __homepage__ = 'https://bitbucket.org/EMCbioinf/galaxy-tool-shed-tools' __license__ = 'GNU General Public License v3 (GPLv3)' class sampleContainer: def __init__(self): self.samples = [] self.treatments = {} self.treatment_index = [] self.treatment_types = {} def do_decode(self,encoded_str): return encoded_str.decode("base64").strip().replace("\t",'') def add_samples(self,argument): print " - Adding samples" for sample in argument: self.add_sample(self.do_decode(sample)) def add_sample(self,sample): if(sample in self.samples): sys.stderr.write("Error:\n* Non-unique sample: "+sample+"\n") sys.exit(1) else: self.samples.append(sample) print " - Added: "+sample def add_blocking(self,argument): print " - Adding paired samples" pair = [] for block in argument: self.add_block(block) def add_block(self,blocks): blocks = blocks.split(":") as_treatment = blocks[0] blocks = blocks[1:] used_samples = [] indexed_samples = {} for i in range(len(blocks)): block = blocks[i] samples = self.get_samples_from_block(block) indexed_samples[i+1] = [] for sample in samples: if(sample in used_samples): sys.stderr.write("Error:\n* Blocking contains multiple times the same sample: "+sample+"\n") sys.exit(0) else: indexed_samples[i+1] = block used_samples.append(sample) for sample in self.samples: if(sample not in used_samples): i = i + 1 indexed_samples[i+1] = str(sample).encode('base64').strip() for index in indexed_samples.keys(): key = str(index).encode('base64').strip() as_treatment += ":"+key+":"+indexed_samples[index] self.add_treatment(as_treatment) def get_samples_from_block(self,decoded_block): return [ self.do_decode(x) for x in decoded_block.split(",")] def add_treatments(self,argument): print " - Adding treatments" for treatment in argument: self.add_treatment(treatment) def add_treatment(self,treatment_argument): print " - Parsing treatment" treatment_argument = treatment_argument.split(":") name = self.do_decode(treatment_argument[0]) treatment_argument = treatment_argument[1:] treatment = {"factor_index":{},"sample_index":{}} only_integers = True i = 1 for item in treatment_argument: if(i % 2): factor = self.do_decode(item) if(treatment['factor_index'].has_key(factor)): sys.stderr.write("Error:\n* Factor has been added multiple times to treatment: "+factor+"\n") sys.exit(0) else: print " - Adding factor: "+factor treatment["factor_index"][factor] = [] if(not factor.isdigit()): only_integers = False else: for sample in item.split(","): sample = self.do_decode(sample) if(not sample in self.samples): sys.stderr.write("Error:\n* Unknown sample: "+sample+"\n") sys.exit(0) treatment["factor_index"][factor].append(sample) if(treatment["sample_index"].has_key(sample)): sys.stderr.write("Error:\n* Factor has been added to treatment before: "+sample+"/"+factor+", factors must be mutually exclusive!\n") sys.exit(0) else: treatment["sample_index"][sample] = factor i += 1 treatment_factors = sorted(treatment["factor_index"].keys()) if(name == None): treatment["name"] = "_vs_".join(treatment_factors) else: treatment["name"] = str(name) if(len(treatment["sample_index"]) != len(self.samples)): sys.stderr.write("Error:\n* The number of samples for treatment '"+treatment["name"]+"' ("+str(len(treatment["sample_index"]))+") is different from the total number of samples ("+str(len(self.samples))+").\n") if(only_integers): treatment_type = "integer" else: treatment_type = "string" if(self.treatments.has_key(treatment["name"])): sys.stderr.write("Error:\n* Treatment was already added: '"+treatment["name"]+"\n") else: self.treatments[treatment["name"]] = treatment self.treatment_index.append(treatment["name"]) self.treatment_types[treatment["name"]] = treatment_type print " - Treatment \""+treatment["name"]+"\" of type \""+treatment_type+"\" is valid" def export(self,output): # Open file stream if(args.output == "-"): fh = sys.stdout else: fh = open(args.output,"w") # Write header: fh.write("sample-name\t"+"\t".join(self.treatment_index)+"\n") # Write body: for sample in self.samples: fh.write(sample) for treatment_id in self.treatment_index: treatment = self.treatments[treatment_id] fh.write("\t"+treatment["sample_index"][sample]) fh.write("\n") fh.close() if __name__=="__main__": parser = argparse.ArgumentParser(description="Create an edgeR design matrix with read-count datasets.") parser.add_argument("-o","--output", help="Output file, '-' for stdout.",required=True) parser.add_argument("-c","--columns-file", nargs="?", help='Use columns of [this] file as UIDs (counting from 1)') parser.add_argument("-s","--sample-names", nargs="*", help='Sample names (UIDs that correspond to the columns in the expression matrix)') parser.add_argument("-t","--treatments", nargs="+", help='Treatment or conditions: "name::sample:condition& (sample-names and conditions have to be provided using Base64 encoding to avoid weird characters)',required=True) parser.add_argument("-b","--blocking", nargs="+", help='Description of sample blocking: "blocking_condition*&sample-1-name&sample-2-name&sample-n-name"') args = parser.parse_args() columns = None if(args.columns_file): with open(args.columns_file, "r") as f: listed_columns = [None] + f.readline().strip("\n").split("\t") for i in range(1,len(listed_columns)): listed_columns[i] = listed_columns[i].encode('base64').replace('\n','') s = sampleContainer() if(listed_columns): columns = [] for sample in args.sample_names: columns.append(listed_columns[int(sample)]) treatments = [] for treatment in args.treatments: treatment = treatment.split(":") for i in range(1,len(treatment)): if(i%2 == 0): treatment_tmp = treatment[i].split(",") for j in range(len(treatment_tmp)): treatment_tmp[j] = listed_columns[int(treatment_tmp[j])] treatment[i] = ",".join(treatment_tmp) treatments.append(":".join(treatment)) blockings = [] if(args.blocking): for blocking in args.blocking: blocking = blocking.split(":") for i in range(1,len(blocking)): block = blocking[i].split(",") for j in range(len(block)): block[j] = listed_columns[int(block[j])] blocking[i] = ",".join(block) blockings.append(":".join(blocking)) s.add_samples(columns) s.add_treatments(treatments) s.add_blocking(blockings) else: s.add_samples(args.sample_names) s.add_treatments(args.treatments) if(args.blocking): s.add_blocking(args.blocking) s.export(args.output)
