Mercurial > repos > artbio > small_rna_map
view small_rna_map.py @ 6:f924a33e1eef draft
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/small_rna_map commit 19a447cd21c746c852cf7434b5df423504baf383
author | artbio |
---|---|
date | Sun, 23 Jul 2017 03:43:40 -0400 |
parents | d65045e976e6 |
children | 35d3f8ac99cf |
line wrap: on
line source
import argparse from collections import defaultdict import numpy import pysam def Parser(): the_parser = argparse.ArgumentParser() the_parser.add_argument('--input', dest='input', required=True, nargs='+', help='input BAM files') the_parser.add_argument('--sample_name', dest='sample_name', required=True, nargs='+', help='sample name') the_parser.add_argument('--output', action='store', type=str, help='output tabular file') the_parser.add_argument('-S', '--sizes', action='store', help='use to output read sizes dataframe') args = the_parser.parse_args() return args class Map: def __init__(self, bam_file, sample): self.sample_name = sample self.bam_object = pysam.AlignmentFile(bam_file, 'rb') self.chromosomes = dict(zip(self.bam_object.references, self.bam_object.lengths)) self.map_dict = self.create_map(self.bam_object) self.max = self.compute_max(self.map_dict) self.mean = self.compute_mean(self.map_dict) self.median = self.compute_median(self.map_dict) self.coverage = self.compute_coverage(self.map_dict) self.size = self.compute_size(self.map_dict) def create_map(self, bam_object): ''' Returns a map_dictionary {(chromosome,read_position,polarity): [read_length, ...]} ''' map_dictionary = dict() for chrom in self.chromosomes: for pos in range(self.chromosomes[chrom]): map_dictionary[(chrom, pos+1, 'F')] = [] # get all chromosomes map_dictionary[(chrom, pos+1, 'R')] = [] # get all chromosomes for chrom in self.chromosomes: for read in bam_object.fetch(chrom): read_positions = read.positions # a list of covered positions if read.is_reverse: map_dictionary[(chrom, read_positions[-1]+1, 'R')].append(read.query_alignment_length) else: map_dictionary[(chrom, read_positions[0]+1, 'F')].append(read.query_alignment_length) return map_dictionary def compute_max(self, map_dictionary): ''' takes a map_dictionary as input and returns a max_dictionary {(chromosome,read_position,polarity): max_of_number_of_read_at_any_position} ''' merge_keylist = [(i[0], 0) for i in map_dictionary.keys()] max_dictionary = dict(merge_keylist) for key in map_dictionary: if len(map_dictionary[key]) > max_dictionary[key[0]]: max_dictionary[key[0]] = len(map_dictionary[key]) return max_dictionary def compute_mean(self, map_dictionary): ''' takes a map_dictionary as input and returns a mean_dictionary {(chromosome,read_position,polarity): mean_value_of_reads} ''' mean_dictionary = dict() for key in map_dictionary: if len(map_dictionary[key]) == 0: mean_dictionary[key] = 0 else: mean_dictionary[key] = round(numpy.mean(map_dictionary[key]), 1) return mean_dictionary def compute_median(self, map_dictionary): ''' takes a map_dictionary as input and returns a mean_dictionary {(chromosome,read_position,polarity): mean_value_of_reads} ''' median_dictionary = dict() for key in map_dictionary: if len(map_dictionary[key]) == 0: median_dictionary[key] = 0 else: median_dictionary[key] = numpy.median(map_dictionary[key]) return median_dictionary def compute_coverage(self, map_dictionary, quality=10): ''' Takes a map_dictionary and returns a dictionary of lists of coverage along the coordinates of pre_mirs (as keys) ''' coverage_dictionary = dict() for chrom in self.chromosomes: coverage = self.bam_object.count_coverage( reference=chrom, start=0, end=self.chromosomes[chrom], quality_threshold=quality) """ Add the 4 coverage values """ coverage = [sum(x) for x in zip(*coverage)] for pos, cov in enumerate(coverage): coverage_dictionary[(chrom, pos+1, "F")] = cov coverage_dictionary[(chrom, pos+1, "R")] = cov return coverage_dictionary def compute_size(self, map_dictionary): ''' Takes a map_dictionary and returns a dictionary of sizes: {chrom: {size: {polarity: nbre of reads}}} ''' size_dictionary = defaultdict(lambda: defaultdict( lambda: defaultdict( int ))) for key in map_dictionary: if len(map_dictionary) == 0: # to track empty chromosomes size_dictionary[key[0]][key[2]][size] = 0 continue for size in map_dictionary[key]: size_dictionary[key[0]][key[2]][size] += 1 return size_dictionary def write_size_table(self, out): ''' Dataset, Chromosome, Polarity, Size, Nbr_reads out is an *open* file handler ''' for chrom in sorted(self.size): sizes = self.size[chrom]['F'].keys() sizes.extend(self.size[chrom]['R'].keys()) for polarity in sorted(self.size[chrom]): for size in range(min(sizes), max(sizes)+1): try: line = [self.sample_name, chrom, polarity, size, self.size[chrom][polarity][size]] except KeyError: line = [self.sample_name, chrom, polarity, size, 0] line = [str(i) for i in line] out.write('\t'.join(line) + '\n') def write_table(self, out): ''' Dataset, Chromosome, Chrom_length, Coordinate, Nbr_reads Polarity, Max, Mean, Median, Coverage out is an *open* file handler ''' for key in sorted(self.map_dict): line = [self.sample_name, key[0], self.chromosomes[key[0]], key[1], len(self.map_dict[key]), key[2], self.max[key[0]], self.mean[key], self.median[key], self.coverage[key]] line = [str(i) for i in line] out.write('\t'.join(line) + '\n') def main(inputs, samples, file_out, size_file_out=''): F = open(file_out, 'w') header = ["Dataset", "Chromosome", "Chrom_length", "Coordinate", "Nbr_reads", "Polarity", "Max", "Mean", "Median", "Coverage"] F.write('\t'.join(header) + '\n') if size_file_out: Fs = open(size_file_out, 'w') header = ["Dataset", "Chromosome", "Polarity", "Size", "Nbr_reads"] Fs.write('\t'.join(header) + '\n') for file, sample in zip(inputs, samples): mapobj = Map(file, sample) mapobj.write_table(F) if size_file_out: mapobj.write_size_table(Fs) F.close() if size_file_out: Fs.close() if __name__ == "__main__": args = Parser() # if identical sample names # to be tested if len(set(args.sample_name)) != len(args.sample_name): args.sample_name = [name + '_' + str(i) for i, name in enumerate(args.sample_name)] main(args.input, args.sample_name, args.output, args.sizes)