view small_rna_map.py @ 6:f924a33e1eef draft

planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/small_rna_map commit 19a447cd21c746c852cf7434b5df423504baf383
author artbio
date Sun, 23 Jul 2017 03:43:40 -0400
parents d65045e976e6
children 35d3f8ac99cf
line wrap: on
line source

import argparse
from collections import defaultdict

import numpy

import pysam


def Parser():
    the_parser = argparse.ArgumentParser()
    the_parser.add_argument('--input', dest='input', required=True,
                            nargs='+', help='input BAM files')
    the_parser.add_argument('--sample_name', dest='sample_name',
                            required=True, nargs='+', help='sample name')
    the_parser.add_argument('--output', action='store',
                            type=str, help='output tabular file')
    the_parser.add_argument('-S', '--sizes', action='store',
                            help='use to output read sizes dataframe')
    args = the_parser.parse_args()
    return args


class Map:

    def __init__(self, bam_file, sample):
        self.sample_name = sample
        self.bam_object = pysam.AlignmentFile(bam_file, 'rb')
        self.chromosomes = dict(zip(self.bam_object.references,
                                self.bam_object.lengths))
        self.map_dict = self.create_map(self.bam_object)
        self.max = self.compute_max(self.map_dict)
        self.mean = self.compute_mean(self.map_dict)
        self.median = self.compute_median(self.map_dict)
        self.coverage = self.compute_coverage(self.map_dict)
        self.size = self.compute_size(self.map_dict)

    def create_map(self, bam_object):
        '''
        Returns a map_dictionary {(chromosome,read_position,polarity):
                                                    [read_length, ...]}
        '''
        map_dictionary = dict()
        for chrom in self.chromosomes:
            for pos in range(self.chromosomes[chrom]):
                map_dictionary[(chrom, pos+1, 'F')] = []  # get all chromosomes
                map_dictionary[(chrom, pos+1, 'R')] = []  # get all chromosomes
        for chrom in self.chromosomes:
            for read in bam_object.fetch(chrom):
                read_positions = read.positions  # a list of covered positions
                if read.is_reverse:
                    map_dictionary[(chrom, read_positions[-1]+1,
                                    'R')].append(read.query_alignment_length)
                else:
                    map_dictionary[(chrom, read_positions[0]+1,
                                    'F')].append(read.query_alignment_length)
        return map_dictionary

    def compute_max(self, map_dictionary):
        '''
        takes a map_dictionary as input and returns
        a max_dictionary {(chromosome,read_position,polarity):
                              max_of_number_of_read_at_any_position}
        '''
        merge_keylist = [(i[0], 0) for i in map_dictionary.keys()]
        max_dictionary = dict(merge_keylist)
        for key in map_dictionary:
            if len(map_dictionary[key]) > max_dictionary[key[0]]:
                max_dictionary[key[0]] = len(map_dictionary[key])
        return max_dictionary

    def compute_mean(self, map_dictionary):
        '''
        takes a map_dictionary as input and returns
        a mean_dictionary {(chromosome,read_position,polarity):
                                                mean_value_of_reads}
        '''
        mean_dictionary = dict()
        for key in map_dictionary:
            if len(map_dictionary[key]) == 0:
                mean_dictionary[key] = 0
            else:
                mean_dictionary[key] = round(numpy.mean(map_dictionary[key]),
                                             1)
        return mean_dictionary

    def compute_median(self, map_dictionary):
        '''
        takes a map_dictionary as input and returns
        a mean_dictionary {(chromosome,read_position,polarity):
                                                    mean_value_of_reads}
        '''
        median_dictionary = dict()
        for key in map_dictionary:
            if len(map_dictionary[key]) == 0:
                median_dictionary[key] = 0
            else:
                median_dictionary[key] = numpy.median(map_dictionary[key])
        return median_dictionary

    def compute_coverage(self, map_dictionary, quality=10):
        '''
        Takes a map_dictionary and returns a dictionary of lists
        of coverage along the coordinates of pre_mirs (as keys)
        '''
        coverage_dictionary = dict()
        for chrom in self.chromosomes:
            coverage = self.bam_object.count_coverage(
                                                reference=chrom,
                                                start=0,
                                                end=self.chromosomes[chrom],
                                                quality_threshold=quality)
            """ Add the 4 coverage values """
            coverage = [sum(x) for x in zip(*coverage)]
            for pos, cov in enumerate(coverage):
                coverage_dictionary[(chrom, pos+1, "F")] = cov
                coverage_dictionary[(chrom, pos+1, "R")] = cov
        return coverage_dictionary

    def compute_size(self, map_dictionary):
        '''
        Takes a map_dictionary and returns a dictionary of sizes:
        {chrom: {size: {polarity: nbre of reads}}}
        '''
        size_dictionary = defaultdict(lambda: defaultdict(
                                      lambda: defaultdict( int )))
        for key in map_dictionary:
            if len(map_dictionary) == 0:
                #  to track empty chromosomes
                size_dictionary[key[0]][key[2]][size] = 0
                continue
            for size in map_dictionary[key]:
                size_dictionary[key[0]][key[2]][size] += 1
        return size_dictionary

    def write_size_table(self, out):
        '''
        Dataset, Chromosome, Polarity, Size, Nbr_reads
        out is an *open* file handler
        '''
        for chrom in sorted(self.size):
            sizes = self.size[chrom]['F'].keys()
            sizes.extend(self.size[chrom]['R'].keys())
            for polarity in sorted(self.size[chrom]):
                for size in range(min(sizes), max(sizes)+1):
                    try:
                        line = [self.sample_name, chrom, polarity, size,
                                self.size[chrom][polarity][size]]
                    except KeyError:
                        line = [self.sample_name, chrom, polarity, size, 0]
                    line = [str(i) for i in line]
                    out.write('\t'.join(line) + '\n')

    def write_table(self, out):
        '''
        Dataset, Chromosome, Chrom_length, Coordinate, Nbr_reads
        Polarity, Max, Mean, Median, Coverage
        out is an *open* file handler
        '''
        for key in sorted(self.map_dict):
            line = [self.sample_name, key[0], self.chromosomes[key[0]],
                    key[1], len(self.map_dict[key]), key[2], self.max[key[0]],
                    self.mean[key], self.median[key], self.coverage[key]]
            line = [str(i) for i in line]
            out.write('\t'.join(line) + '\n')


def main(inputs, samples, file_out, size_file_out=''):
    F = open(file_out, 'w')
    header = ["Dataset", "Chromosome", "Chrom_length", "Coordinate",
              "Nbr_reads", "Polarity", "Max", "Mean", "Median", "Coverage"]
    F.write('\t'.join(header) + '\n')
    if size_file_out:
        Fs = open(size_file_out, 'w')
        header = ["Dataset", "Chromosome", "Polarity", "Size", "Nbr_reads"]
        Fs.write('\t'.join(header) + '\n')
    for file, sample in zip(inputs, samples):
        mapobj = Map(file, sample)
        mapobj.write_table(F)
        if size_file_out:
            mapobj.write_size_table(Fs)
    F.close()
    if size_file_out:
        Fs.close()


if __name__ == "__main__":
    args = Parser()
    # if identical sample names # to be tested
    if len(set(args.sample_name)) != len(args.sample_name):
        args.sample_name = [name + '_' + str(i) for
                            i, name in enumerate(args.sample_name)]
    main(args.input, args.sample_name, args.output, args.sizes)