Mercurial > repos > sanbi-uwc > vcf_to_alignment
diff vcf_to_msa.py @ 4:f58178c0f00d draft
planemo upload for repository https://github.com/sanbi-sa/tools-sanbi-uwc commit 3e5c71977e50ec920d4f45be809d2528e55bff76
author | sanbi-uwc |
---|---|
date | Mon, 15 Oct 2018 00:34:44 -0400 |
parents | 62fbd3f96b30 |
children |
line wrap: on
line diff
--- a/vcf_to_msa.py Fri Feb 03 05:56:11 2017 -0500 +++ b/vcf_to_msa.py Mon Oct 15 00:34:44 2018 -0400 @@ -11,10 +11,16 @@ import vcf import intervaltree from operator import itemgetter +from pathlib import Path -difference = lambda x,y: 0 if x == y else 1 + +def difference(x, y): + return 0 if x == y else 1 -string_difference = lambda query, target, query_len: sum((difference(query[i], target[i])) for i in range(query_len)) + +def string_difference(query, target, query_len): + return sum((difference(query[i], target[i])) for i in range(query_len)) + def fuzzysearch(query, target): query_len = len(query) @@ -23,18 +29,49 @@ min_distance = string_difference(query, target, query_len) best_pos = 0 for i in range(0, target_len - query_len + 1): - distance = string_difference(query, target[i:i+query_len], query_len) + distance = string_difference(query, target[i:i + query_len], query_len) if distance < min_distance: (min_distance, best_pos) = (distance, i) return best_pos + +class readable_dir(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + prospective_dir = values + if not os.path.isdir(prospective_dir): + raise argparse.ArgumentTypeError("readable_dir:{0} is not a valid path".format(prospective_dir)) + if os.access(prospective_dir, os.R_OK): + setattr(namespace, self.dest, prospective_dir) + else: + raise argparse.ArgumentTypeError("readable_dir:{0} is not a readable dir".format(prospective_dir)) + + parser = argparse.ArgumentParser() parser.add_argument('--vcf_files', nargs="+") -parser.add_argument('--reference_file', type=argparse.FileType()) -parser.add_argument('--output_file', type=argparse.FileType('w')) +parser.add_argument('-d', '--vcf_dir', action=readable_dir, help="VCF directory ") +parser.add_argument('--reference_file', required=True, type=argparse.FileType()) +parser.add_argument('--output_file', required=True, type=argparse.FileType('w')) parser.add_argument('--remove_invariant', action='store_true', default=False) +parser.add_argument('--exclude', type=argparse.FileType(), help='BED format file of regions to exclude from variant calling') args = parser.parse_args() +exclude_trees = {} +if args.exclude is not None: + for line in args.exclude: + # all of BED format that we care about is chromosome, start, end + fields = line.strip().split('\t') + if len(fields) < 3: + continue + chrom = fields[0] + start = int(fields[1]) + end = int(fields[2]) + if chrom not in exclude_trees: + tree = intervaltree.IntervalTree() + exclude_trees[chrom] = tree + else: + tree = exclude_trees[chrom] + tree[start:end] = True + do_inserts = False do_deletes = False do_snps = True @@ -45,7 +82,7 @@ # vcf_files_dir = os.path.expanduser("~/Data/vcf") # vcf_files = [os.path.join(vcf_files_dir, "vcf{}.vcf".format(num)) for num in range(1,4)] # print(vcf_files) -reference_seq = SeqIO.read(args.reference_file, "fasta") +reference_seq = SeqIO.read(args.reference_file, "fasta") reference = str(reference_seq.seq) # output_file = open(os.path.join(os.path.expanduser("~/Data/fasta/vcf_to_msa"), 'output.fasta'), 'w') insertions = {} @@ -55,23 +92,39 @@ sequences = {} if args.remove_invariant: variant_sites = set() -for i, vcf_descriptor in enumerate(args.vcf_files): - # seqname = os.path.splitext(os.path.basename(vcf_filename))[0] - (seqname,vcf_filename) = vcf_descriptor.split('^^^') + +vcf_list = [] +if args.vcf_dir: + pathlist = Path(args.vcf_dir).glob('*.vcf') + for path in pathlist: + vcf_list.append(str(path)) +elif args.vcf_files: + vcf_list = args.vcf_files + +for i, vcf_descriptor in enumerate(vcf_list): + # print(os.path.basename(vcf_descriptor)) + if '^^^' in vcf_descriptor: + (seqname, vcf_filename) = vcf_descriptor.split('^^^') + else: + seqname = str(os.path.basename(vcf_descriptor)).rsplit('.vcf', 1)[0] + vcf_filename = vcf_descriptor sequence_names.append(seqname) sequence = list(reference) sequences[seqname] = sequence - print(seqname) - # tsv_filename = vcf_filename.replace(".vcf", ".tsv") - # output = open(tsv_filename, "wb") + insertions[seqname] = [] count = 0 for record in vcf.VCFReader(filename=vcf_filename): - type="unknown" + if args.exclude: + if record.CHROM in exclude_trees: + tree = exclude_trees[record.CHROM] + if tree.overlaps(record.affected_start, record.affected_end): + continue + type = "unknown" if record.is_snp and do_snps: - type="snp" + type = "snp" try: - sequence[record.affected_start] = str(record.alleles[1]) # SNP, simply insert alt allele + sequence[record.affected_start] = str(record.alleles[1]) # SNP, simply insert alt allele except IndexError as e: print("snp: Error assigning to {}:{}: {}".format(record.affected_start, record.affected_end, str(e)), file=sys.stderr) if args.remove_invariant: @@ -80,7 +133,7 @@ elif record.is_indel: length = record.affected_end - record.affected_start if record.is_deletion and do_deletes: - type="del" + type = "del" try: sequence[record.affected_start:record.affected_end] = ['-'] * length except IndexError as e: @@ -89,7 +142,7 @@ else: if do_inserts: print("Warning: insert processing from VCF is dangerously broken", file=sys.stderr) - type="ins" + type = "ins" # insertions[seqname].append(record) ref = str(record.alleles[0]) alt = str(record.alleles[1]) @@ -138,7 +191,7 @@ seq_str = ''.join(sequence) SeqIO.write(SeqRecord(Seq(seq_str, alphabet=IUPAC.ambiguous_dna), id=name, description=""), args.output_file, "fasta") - # output.write(bytes("\t".join([type, str(record.affected_start), str(record.affected_end), str(record.alleles[0]), str(record.alleles[1])])+"\n", encoding="ascii")) + # output.write(bytes("\t".join([type, str(record.affected_start), str(record.affected_end), str(record.alleles[0]), str(record.alleles[1])])+"\n", encoding="ascii")) # output.close() args.output_file.close()