Mercurial > repos > sanbi-uwc > qc
diff qc.py @ 0:2aa014ad54bc draft
"planemo upload for repository https://github.com/SANBI-SA/tools-sanbi-uwc/tree/master/tools/qc commit 54450d56a66bd4b01a17c8ec8aa3e649e3e4749f-dirty"
author | sanbi-uwc |
---|---|
date | Tue, 30 Mar 2021 15:00:18 +0000 |
parents | |
children | 878f18249f76 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qc.py Tue Mar 30 15:00:18 2021 +0000 @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 + +from Bio import SeqIO +import csv +import subprocess +import pandas as pd +import matplotlib.pyplot as plt +import shlex + +""" +This script can incorporate as many QC checks as required +as long as it outputs a csv file containing a final column +headed with 'qc_pass' and rows for each sample indcating +'TRUE' if the overall QC check has passed or 'FALSE' if not. +""" + +def make_qc_plot(depth_pos, n_density, samplename, window=200): + depth_df = pd.DataFrame( { 'position' : [pos[1] for pos in depth_pos], 'depth' : [dep[2] for dep in depth_pos] } ) + depth_df['depth_moving_average'] = depth_df.iloc[:,1].rolling(window=window).mean() + + n_df = pd.DataFrame( { 'position' : [pos[0] for pos in n_density], 'n_density' : [dens[1] for dens in n_density] } ) + + fig, ax1 = plt.subplots() + ax2 = ax1.twinx() + + ax1.set_xlabel('Position') + + ax1.set_ylabel('Depth', color = 'g') + ax1.set_ylim(top=10**5, bottom=1) + ax1.set_yscale('log') + ax1.plot(depth_df['depth_moving_average'], color = 'g') + + ax2.set_ylabel('N density', color = 'r') + ax2.plot(n_df['n_density'], color = 'r') + ax2.set_ylim(top=1) + + plt.title(samplename) + plt.savefig(samplename + '.depth.png') + +def read_depth_file(bamfile): + p = subprocess.Popen(['samtools', 'depth', '-a', '-d', '0', bamfile], + stdout=subprocess.PIPE) + out, err = p.communicate() + counter = 0 + + pos_depth = [] + for ln in out.decode('utf-8').split("\n"): + if ln: + pos_depth.append(ln.split("\t")) + + return pos_depth + + +def get_covered_pos(pos_depth, min_depth): + counter = 0 + for contig, pos,depth in pos_depth: + if int(depth) >= min_depth: + counter = counter + 1 + + return counter + +def get_N_positions(fasta): + n_pos = [i for i, letter in enumerate(fasta.seq.lower()) if letter == 'n'] + + return n_pos + +def get_pct_N_bases(fasta): + + count_N = len(get_N_positions(fasta)) + + pct_N_bases = count_N / len(fasta.seq) * 100 + + return pct_N_bases + +def get_largest_N_gap(fasta): + n_pos = get_N_positions(fasta) + + n_pos = [0] + n_pos + [len(fasta.seq)] + + n_gaps = [j-i for i, j in zip(n_pos[:-1], n_pos[1:])] + + return sorted(n_gaps)[-1] + +def get_ref_length(ref): + record = SeqIO.read(ref, "fasta") + return len(record.seq) + + +def sliding_window_N_density(sequence, window=10): + + sliding_window_n_density = [] + for i in range(0, len(sequence.seq), 1): + window_mid = i + ( window / 2) + window_seq = sequence.seq[i:i+window] + n_count = window_seq.lower().count('n') + n_density = n_count / window + + sliding_window_n_density.append( [ window_mid, n_density ] ) + + return sliding_window_n_density + +def get_num_reads(bamfile): + + st_filter = '0x900' + command = 'samtools view -c -F{} {}'.format(st_filter, bamfile) + what = shlex.split(command) + + return subprocess.check_output(what).decode().strip() + +def go(args): + if args.illumina: + depth = 10 + elif args.nanopore: + depth = 20 + + ## Depth calcs + ref_length = get_ref_length(args.ref) + depth_pos = read_depth_file(args.bam) + + depth_covered_bases = get_covered_pos(depth_pos, depth) + + pct_covered_bases = depth_covered_bases / ref_length * 100 + + ## Number of aligned reads calculaton + num_reads = get_num_reads(args.bam) + + # Unknown base calcs + fasta = SeqIO.read(args.fasta, "fasta") + + pct_N_bases = 0 + largest_N_gap = 0 + qc_pass = "FALSE" + + if len(fasta.seq) != 0: + + pct_N_bases = get_pct_N_bases(fasta) + largest_N_gap = get_largest_N_gap(fasta) + + # QC PASS / FAIL + if largest_N_gap >= 10000 or pct_N_bases < 50.0: + qc_pass = "TRUE" + + + qc_line = { 'sample_name' : args.sample, + 'pct_N_bases' : "{:.2f}".format(pct_N_bases), + 'pct_covered_bases' : "{:.2f}".format(pct_covered_bases), + 'longest_no_N_run' : largest_N_gap, + 'num_aligned_reads' : num_reads, + 'fasta': args.fasta, + 'bam' : args.bam, + 'qc_pass' : qc_pass} + + + with open(args.outfile, 'w') as csvfile: + header = qc_line.keys() + writer = csv.DictWriter(csvfile, fieldnames=header) + writer.writeheader() + writer.writerow(qc_line) + + N_density = sliding_window_N_density(fasta) + make_qc_plot(depth_pos, N_density, args.sample) + +def main(): + import argparse + + parser = argparse.ArgumentParser() + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument('--nanopore', action='store_true') + group.add_argument('--illumina', action='store_true') + parser.add_argument('--outfile', required=True) + parser.add_argument('--sample', required=True) + parser.add_argument('--ref', required=True) + parser.add_argument('--bam', required=True) + parser.add_argument('--fasta', required=True) + + args = parser.parse_args() + go(args) + +if __name__ == "__main__": + main()