Mercurial > repos > artbio > small_rna_maps
diff small_rna_maps.py @ 0:0a06985c0894 draft
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/small_rna_maps commit 7b2ceb05489c27ddb769c38fdec56274108a6fa1
author | artbio |
---|---|
date | Tue, 22 Aug 2017 12:06:58 -0400 |
parents | |
children | 615fa2171a34 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/small_rna_maps.py Tue Aug 22 12:06:58 2017 -0400 @@ -0,0 +1,210 @@ +import argparse +from collections import defaultdict + +import numpy + +import pysam + + +def Parser(): + the_parser = argparse.ArgumentParser() + the_parser.add_argument('--inputs', dest='inputs', required=True, + nargs='+', help='list of input BAM files') + the_parser.add_argument('--sample_names', dest='sample_names', + required=True, nargs='+', + help='list of sample names') + the_parser.add_argument('--outputs', nargs='+', action='store', + help='list of two output paths (only two)') + the_parser.add_argument('-M', '--plot_methods', nargs='+', action='store', + help='list of 2 plot methods (only two) among:\ + Counts, Max, Mean, Median, Coverage and Size') + args = the_parser.parse_args() + return args + + +class Map: + + def __init__(self, bam_file, sample): + self.sample_name = sample + self.bam_object = pysam.AlignmentFile(bam_file, 'rb') + self.chromosomes = dict(zip(self.bam_object.references, + self.bam_object.lengths)) + self.map_dict = self.create_map(self.bam_object) + + def create_map(self, bam_object): + ''' + Returns a map_dictionary {(chromosome,read_position,polarity): + [read_length, ...]} + ''' + map_dictionary = defaultdict(list) + # get empty value for start and end of each chromosome + for chrom in self.chromosomes: + map_dictionary[(chrom, 1, 'F')] = [] + map_dictionary[(chrom, self.chromosomes[chrom], 'F')] = [] + for chrom in self.chromosomes: + for read in bam_object.fetch(chrom): + positions = read.positions # a list of covered positions + for pos in positions: + if not map_dictionary[(chrom, pos+1, 'F')]: + map_dictionary[(chrom, pos+1, 'F')] = [] + if read.is_reverse: + map_dictionary[(chrom, positions[-1]+1, + 'R')].append(read.query_alignment_length) + else: + map_dictionary[(chrom, positions[0]+1, + 'F')].append(read.query_alignment_length) + return map_dictionary + + def compute_readcount(self, map_dictionary, out): + ''' + takes a map_dictionary as input and writes + a readmap_dictionary {(chromosome,read_position,polarity): + number_of_reads} + in an open file handler out + ''' + readmap_dictionary = dict() + for key in map_dictionary: + readmap_dictionary[key] = len(map_dictionary[key]) + self.write_table(readmap_dictionary, out) + + def compute_max(self, map_dictionary, out): + ''' + takes a map_dictionary as input and writes + a max_dictionary {(chromosome,read_position,polarity): + max_of_number_of_read_at_any_position} + Not clear this function is still required + ''' + merge_keylist = [(i[0], 0) for i in map_dictionary.keys()] + max_dictionary = dict(merge_keylist) + for key in map_dictionary: + if len(map_dictionary[key]) > max_dictionary[key[0]]: + max_dictionary[key[0]] = len(map_dictionary[key]) + self.write_table(max_dictionary, out) + + def compute_mean(self, map_dictionary, out): + ''' + takes a map_dictionary as input and returns + a mean_dictionary {(chromosome,read_position,polarity): + mean_value_of_reads} + ''' + mean_dictionary = dict() + for key in map_dictionary: + if len(map_dictionary[key]) == 0: + mean_dictionary[key] = 0 + else: + mean_dictionary[key] = round(numpy.mean(map_dictionary[key]), + 1) + self.write_table(mean_dictionary, out) + + def compute_median(self, map_dictionary, out): + ''' + takes a map_dictionary as input and returns + a mean_dictionary {(chromosome,read_position,polarity): + mean_value_of_reads} + ''' + median_dictionary = dict() + for key in map_dictionary: + if len(map_dictionary[key]) == 0: + median_dictionary[key] = 0 + else: + median_dictionary[key] = numpy.median(map_dictionary[key]) + self.write_table(median_dictionary, out) + + def compute_coverage(self, map_dictionary, out, quality=10): + ''' + takes a map_dictionary as input and returns + a coverage_dictionary {(chromosome,read_position,polarity): + coverage} + ''' + coverage_dictionary = dict() + for chrom in self.chromosomes: + coverage_dictionary[(chrom, 1, 'F')] = 0 + coverage_dictionary[(chrom, self.chromosomes[chrom], 'F')] = 0 + for key in map_dictionary: + coverage = self.bam_object.count_coverage( + reference=key[0], + start=key[1]-1, + end=key[1], + quality_threshold=quality) + """ Add the 4 coverage values """ + coverage = [sum(x) for x in zip(*coverage)] + coverage_dictionary[key] = coverage[0] + self.write_table(coverage_dictionary, out) + + def compute_size(self, map_dictionary, out): + ''' + Takes a map_dictionary and returns a dictionary of sizes: + {chrom: {polarity: {size: nbre of reads}}} + ''' + size_dictionary = defaultdict(lambda: defaultdict( + lambda: defaultdict(int))) + # to track empty chromosomes + for chrom in self.chromosomes: + if self.bam_object.count(chrom) == 0: + size_dictionary[chrom]['F'][10] = 0 + for key in map_dictionary: + for size in map_dictionary[key]: + size_dictionary[key[0]][key[2]][size] += 1 + self.write_size_table(size_dictionary, out) + + def write_table(self, mapdict, out): + ''' + Generic writer + Dataset, Chromosome, Chrom_length, Coordinate, Polarity, + <some mapped value> + out is an *open* file handler + ''' + for key in sorted(mapdict): + line = [self.sample_name, key[0], self.chromosomes[key[0]], + key[1], key[2], mapdict[key]] + line = [str(i) for i in line] + out.write('\t'.join(line) + '\n') + + def write_size_table(self, sizedic, out): + ''' + Generic writer of summary values + Dataset, Chromosome, Chrom_length, <some category>, <some value> + out is an *open* file handler + ''' + for chrom in sorted(sizedic): + sizes = sizedic[chrom]['F'].keys() + sizes.extend(sizedic[chrom]['R'].keys()) + for polarity in sorted(sizedic[chrom]): + for size in range(min(sizes), max(sizes)+1): + try: + line = [self.sample_name, chrom, polarity, size, + sizedic[chrom][polarity][size]] + except KeyError: + line = [self.sample_name, chrom, polarity, size, 0] + line = [str(i) for i in line] + out.write('\t'.join(line) + '\n') + + +def main(inputs, samples, methods, outputs): + for method, output in zip(methods, outputs): + F = open(output, 'w') + if method == 'Size': + header = ["Dataset", "Chromosome", "Polarity", method, "Count"] + else: + header = ["Dataset", "Chromosome", "Chrom_length", "Coordinate", + "Polarity", method] + F.write('\t'.join(header) + '\n') + for input, sample in zip(inputs, samples): + mapobj = Map(input, sample) + token = {"Counts": mapobj.compute_readcount, + "Max": mapobj.compute_max, + "Mean": mapobj.compute_mean, + "Median": mapobj.compute_median, + "Coverage": mapobj.compute_coverage, + "Size": mapobj.compute_size} + token[method](mapobj.map_dict, F) + F.close() + + +if __name__ == "__main__": + args = Parser() + # if identical sample names + if len(set(args.sample_names)) != len(args.sample_names): + args.sample_names = [name + '_' + str(i) for + i, name in enumerate(args.sample_names)] + main(args.inputs, args.sample_names, args.plot_methods, args.outputs)