Mercurial > repos > jjohnson > ensembl_cdna_translate
view ensembl_cdna_translate.py @ 6:f8e0e5e237a5 draft
Uploaded
author | jjohnson |
---|---|
date | Wed, 29 Nov 2017 17:46:00 -0500 |
parents | b7f2f5e3390c |
children | d59e3ce10e74 |
line wrap: on
line source
#!/usr/bin/env python """ # #------------------------------------------------------------------------------ # University of Minnesota # Copyright 2017, Regents of the University of Minnesota #------------------------------------------------------------------------------ # Author: # # James E Johnson # #------------------------------------------------------------------------------ """ import argparse import sys from time import sleep from Bio.Seq import translate import requests import digest server = "https://rest.ensembl.org" ext = "/info/assembly/homo_sapiens?" max_region = 5000000 def ensembl_rest(ext, headers): # if True: print >> sys.stderr, "%s" % ext r = requests.get(server+ext, headers=headers) if r.status_code == 429: print >> sys.stderr, "response headers: %s\n" % r.headers if 'Retry-After' in r.headers: sleep(r.headers['Retry-After']) r = requests.get(server+ext, headers=headers) if not r.ok: r.raise_for_status() return r def get_species(): results = dict() ext = "/info/species" req_header = {"Content-Type": "application/json"} r = ensembl_rest(ext, req_header) for species in r.json()['species']: results[species['name']] = species print >> sys.stdout,\ "%s\t%s\t%s\t%s\t%s"\ % (species['name'], species['common_name'], species['display_name'], species['strain'], species['taxon_id']) return results def get_biotypes(species): biotypes = [] ext = "/info/biotypes/%s?" % species req_header = {"Content-Type": "application/json"} r = ensembl_rest(ext, req_header) for entry in r.json(): if 'biotype' in entry: biotypes.append(entry['biotype']) return biotypes def get_toplevel(species): coord_systems = dict() ext = "/info/assembly/%s?" % species req_header = {"Content-Type": "application/json"} r = ensembl_rest(ext, req_header) toplevel = r.json() for seq in toplevel['top_level_region']: if seq['coord_system'] not in coord_systems: coord_systems[seq['coord_system']] = dict() coord_system = coord_systems[seq['coord_system']] coord_system[seq['name']] = int(seq['length']) return coord_systems def get_transcripts_bed(species, refseq, start, length,params=None): bed = [] param = params if params else '' req_header = {"Content-Type": "text/x-bed"} regions = range(start, length, max_region) if not regions or regions[-1] < length: regions.append(length) for end in regions[1:]: ext = "/overlap/region/%s/%s:%d-%d?feature=transcript;%s"\ % (species, refseq, start, end, param) start = end + 1 r = ensembl_rest(ext, req_header) if r.text: bed += r.text.splitlines() return bed def get_seq(id, seqtype,params=None): param = params if params else '' ext = "/sequence/id/%s?type=%s;%s" % (id, seqtype,param) req_header = {"Content-Type": "text/plain"} r = ensembl_rest(ext, req_header) return r.text def get_cdna(id,params=None): return get_seq(id, 'cdna',params=params) def get_cds(id,params=None): return get_seq(id, 'cds',params=params) def bed_from_line(line): fields = line.rstrip('\r\n').split('\t') (chrom, chromStart, chromEnd, name, score, strand, thickStart, thickEnd, itemRgb, blockCount, blockSizes, blockStarts) = fields[0:12] bed_entry = BedEntry(chrom=chrom, chromStart=chromStart, chromEnd=chromEnd, name=name, score=score, strand=strand, thickStart=thickStart, thickEnd=thickEnd, itemRgb=itemRgb, blockCount=blockCount, blockSizes=blockSizes.rstrip(','), blockStarts=blockStarts.rstrip(',')) return bed_entry class BedEntry(object): def __init__(self, chrom=None, chromStart=None, chromEnd=None, name=None, score=None, strand=None, thickStart=None, thickEnd=None, itemRgb=None, blockCount=None, blockSizes=None, blockStarts=None): self.chrom = chrom self.chromStart = int(chromStart) self.chromEnd = int(chromEnd) self.name = name self.score = int(score) if score is not None else 0 self.strand = '-' if str(strand).startswith('-') else '+' self.thickStart = int(thickStart) if thickStart else self.chromStart self.thickEnd = int(thickEnd) if thickEnd else self.chromEnd self.itemRgb = str(itemRgb) if itemRgb is not None else r'100,100,100' self.blockCount = int(blockCount) if isinstance(blockSizes, str) or isinstance(blockSizes, unicode): self.blockSizes = [int(x) for x in blockSizes.split(',')] elif isinstance(blockSizes, list): self.blockSizes = [int(x) for x in blockSizes] else: self.blockSizes = blockSizes if isinstance(blockStarts, str) or isinstance(blockSizes, unicode): self.blockStarts = [int(x) for x in blockStarts.split(',')] elif isinstance(blockStarts, list): self.blockStarts = [int(x) for x in blockStarts] else: self.blockStarts = blockStarts self.seq = None self.pep = None def __str__(self): return '%s\t%d\t%d\t%s\t%d\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % ( self.chrom, self.chromStart, self.chromEnd, self.name, self.score, self.strand, self.thickStart, self.thickEnd, str(self.itemRgb), self.blockCount, ','.join([str(x) for x in self.blockSizes]), ','.join([str(x) for x in self.blockStarts])) # (start, end) def get_subrange(self, tstart, tstop, debug=False): chromStart = self.chromStart chromEnd = self.chromEnd if debug: print >> sys.stderr, "%s" % (str(self)) r = range(self.blockCount) if self.strand == '-': r.reverse() bStart = 0 bEnd = 0 for x in r: bEnd = bStart + self.blockSizes[x] if bStart <= tstart < bEnd: if self.strand == '+': chromStart = self.chromStart + self.blockStarts[x] +\ (tstart - bStart) else: chromEnd = self.chromStart + self.blockStarts[x] +\ self.blockSizes[x] - (tstart - bStart) if bStart <= tstop < bEnd: if self.strand == '+': chromEnd = self.chromStart + self.blockStarts[x] +\ (tstop - bStart) else: chromStart = self.chromStart + self.blockStarts[x] +\ self.blockSizes[x] - (tstop - bStart) if debug: print >> sys.stderr,\ "%3d %s\t%d\t%d\t%d\t%d\t%d\t%d"\ % (x, self.strand, bStart, bEnd, tstart, tstop, chromStart, chromEnd) bStart += self.blockSizes[x] return(chromStart, chromEnd) # get the blocks for sub range def get_blocks(self, chromStart, chromEnd): tblockCount = 0 tblockSizes = [] tblockStarts = [] for x in range(self.blockCount): bStart = self.chromStart + self.blockStarts[x] bEnd = bStart + self.blockSizes[x] if bStart > chromEnd: break if bEnd < chromStart: continue cStart = max(chromStart, bStart) tblockStarts.append(cStart - chromStart) tblockSizes.append(min(chromEnd, bEnd) - cStart) tblockCount += 1 return (tblockCount, tblockSizes, tblockStarts) def trim(self, tstart, tstop, debug=False): (tchromStart, tchromEnd) =\ self.get_subrange(tstart, tstop, debug=debug) (tblockCount, tblockSizes, tblockStarts) =\ self.get_blocks(tchromStart, tchromEnd) tbed = BedEntry( chrom=self.chrom, chromStart=tchromStart, chromEnd=tchromEnd, name=self.name, score=self.score, strand=self.strand, thickStart=tchromStart, thickEnd=tchromEnd, itemRgb=self.itemRgb, blockCount=tblockCount, blockSizes=tblockSizes, blockStarts=tblockStarts) if self.seq: ts = tchromStart-self.chromStart te = tchromEnd - tchromStart + ts tbed.seq = self.seq[ts:te] return tbed def __main__(): parser = argparse.ArgumentParser( description='Retrieve Ensembl cDNAs and three frame translate') parser.add_argument( '-s', '--species', default='human', help='Ensembl Species to retrieve') parser.add_argument( '-B', '--biotypes', action='append', default=[], help='Restrict Ensembl biotypes to retrieve') parser.add_argument( '-i', '--input', default=None, help='Use BED instead of retrieving cDNA from ensembl (-) for stdin') parser.add_argument( '-t', '--transcripts', default=None, help='Path to output cDNA transcripts.bed (-) for stdout') parser.add_argument( '-r', '--raw', action='store_true', help='Report transcript exacty as returned from Ensembl') parser.add_argument( '-f', '--fasta', default=None, help='Path to output translations.fasta') parser.add_argument( '-b', '--bed', default=None, help='Path to output translations.bed') parser.add_argument( '-m', '--min_length', type=int, default=7, help='Minimum length of protein translation to report') parser.add_argument( '-e', '--enzyme', default=None, help='Digest translation with enzyme') parser.add_argument( '-a', '--all', action='store_true', help='Include reference protein translations') parser.add_argument('-v', '--verbose', action='store_true', help='Verbose') parser.add_argument('-d', '--debug', action='store_true', help='Debug') args = parser.parse_args() # print >> sys.stderr, "args: %s" % args species = args.species input_rdr = None if args.input is not None: input_rdr = open(args.input, 'r') if args.input != '-' else sys.stdin tx_wtr = None if args.transcripts is not None: tx_wtr = open(args.transcripts, 'w')\ if args.transcripts != '-' else sys.stdout fa_wtr = open(args.fasta, 'w') if args.fasta is not None else None bed_wtr = open(args.bed, 'w') if args.bed is not None else None enzyme = digest.expasy_rules.get(args.enzyme,args.enzyme) # print >> sys.stderr, "args biotypes: %s" % args.biotypes biotypea = ['biotype=%s' % bt.strip() for biotype in args.biotypes for bt in biotype.split(',')] # print >> sys.stderr, "args biotypes: %s" % biotypea biotypes = ';'.join(['biotype=%s' % bt.strip() for biotype in args.biotypes for bt in biotype.split(',') if bt.strip()]) # print >> sys.stderr, "biotypes: %s" % biotypes translations = dict() # start : end : seq def unique_prot(tbed, seq): if tbed.chromStart not in translations: translations[tbed.chromStart] = dict() translations[tbed.chromStart][tbed.chromEnd] = [] translations[tbed.chromStart][tbed.chromEnd].append(seq) elif tbed.chromEnd not in translations[tbed.chromStart]: translations[tbed.chromStart][tbed.chromEnd] = [] translations[tbed.chromStart][tbed.chromEnd].append(seq) elif seq not in translations[tbed.chromStart][tbed.chromEnd]: translations[tbed.chromStart][tbed.chromEnd].append(seq) else: return False return True def translate_bed(bed): translate_count = 0 if any([fa_wtr, bed_wtr]): transcript_id = bed.name refprot = None if not args.all: try: cds = get_cds(transcript_id) if len(cds) % 3 != 0: cds = cds[:-(len(cds) % 3)] refprot = translate(cds) if cds else None except: refprot = None cdna = get_cdna(transcript_id) cdna_len = len(cdna) for offset in range(3): seqend = cdna_len - (cdna_len - offset) % 3 aaseq = translate(cdna[offset:seqend]) aa_start = 0 while aa_start < len(aaseq): aa_end = aaseq.find('*', aa_start) if aa_end < 0: aa_end = len(aaseq) prot = aaseq[aa_start:aa_end] if enzyme and refprot: frags = digest._cleave(prot,enzyme) for frag in reversed(frags): if frag in refprot: prot = prot[:prot.rfind(frag)] else: break if len(prot) < args.min_length: pass elif refprot and prot in refprot: pass else: tstart = aa_start*3+offset tend = aa_end*3+offset prot_acc = "%s_%d_%d" % (transcript_id, tstart, tend) tbed = bed.trim(tstart, tend) if args.all or unique_prot(tbed, prot): translate_count += 1 tbed.name = prot_acc bed_wtr.write("%s\t%s\n" % (str(tbed), prot)) bed_wtr.flush() fa_id = ">%s\n" % (prot_acc) fa_wtr.write(fa_id) fa_wtr.write(prot) fa_wtr.write("\n") fa_wtr.flush() aa_start = aa_end + 1 return translate_count if input_rdr: translation_count = 0 for i, bedline in enumerate(input_rdr): try: bed = bed_from_line(bedline) translation_count += translate_bed(bed) except: print >> sys.stderr, "BED format error: %s\n" % bedline if args.debug or (args.verbose and any([fa_wtr, bed_wtr])): print >> sys.stderr,\ "%s\tcDNA translations:%d" % (species, translation_count) else: coord_systems = get_toplevel(species) if 'chromosome' in coord_systems: for ref in sorted(coord_systems['chromosome'].keys()): length = coord_systems['chromosome'][ref] if not any([tx_wtr, fa_wtr, bed_wtr]): print >> sys.stderr,\ "%s\t%s\tlength: %d" % (species, ref, length) continue if args.debug: print >> sys.stderr,\ "Retrieving transcripts: %s\t%s\tlength: %d"\ % (species, ref, length) translation_count = 0 start = 0 regions = range(start, length, max_region) if not regions or regions[-1] < length: regions.append(length) for end in regions[1:]: bedlines = get_transcripts_bed(species, ref, start, end, params=biotypes) if args.verbose or args.debug: print >> sys.stderr,\ "%s\t%s\tstart: %d\tend: %d\tcDNA transcripts:%d"\ % (species, ref, start, end, len(bedlines)) # start, end, seq for i, bedline in enumerate(bedlines): try: bed = bed_from_line(bedline)\ if any([not args.raw, fa_wtr, bed_wtr])\ else None if tx_wtr: tx_wtr.write(bedline if args.raw else str(bed)) tx_wtr.write("\n") tx_wtr.flush() if bed: translation_count += translate_bed(bed) except Exception as e: print >> sys.stderr,\ "BED error (%s) : %s\n" % (e, bedline) start = end + 1 if args.debug or (args.verbose and any([fa_wtr, bed_wtr])): print >> sys.stderr,\ "%s\t%s\tlength: %d\tcDNA translations:%d"\ % (species, ref, length, translation_count) if __name__ == "__main__": __main__()