Mercurial > repos > jjohnson > ensembl_cdna_translate
diff ensembl_cdna_translate.py @ 8:5c92d0be6514 draft
Uploaded
author | jjohnson |
---|---|
date | Thu, 14 Dec 2017 13:32:00 -0500 |
parents | d59e3ce10e74 |
children |
line wrap: on
line diff
--- a/ensembl_cdna_translate.py Wed Dec 13 11:15:34 2017 -0500 +++ b/ensembl_cdna_translate.py Thu Dec 14 13:32:00 2017 -0500 @@ -21,249 +21,10 @@ import requests +from bedutil import BedEntry, bed_from_line import digest - - -server = "https://rest.ensembl.org" -ext = "/info/assembly/homo_sapiens?" -max_region = 4000000 - - -def ensembl_rest(ext, headers): - if True: print >> sys.stderr, "%s" % ext - r = requests.get(server+ext, headers=headers) - if r.status_code == 429: - print >> sys.stderr, "response headers: %s\n" % r.headers - if 'Retry-After' in r.headers: - sleep(r.headers['Retry-After']) - r = requests.get(server+ext, headers=headers) - if not r.ok: - r.raise_for_status() - return r - - -def get_species(): - results = dict() - ext = "/info/species" - req_header = {"Content-Type": "application/json"} - r = ensembl_rest(ext, req_header) - for species in r.json()['species']: - results[species['name']] = species - print >> sys.stdout,\ - "%s\t%s\t%s\t%s\t%s"\ - % (species['name'], species['common_name'], - species['display_name'], - species['strain'], - species['taxon_id']) - return results - - -def get_biotypes(species): - biotypes = [] - ext = "/info/biotypes/%s?" % species - req_header = {"Content-Type": "application/json"} - r = ensembl_rest(ext, req_header) - for entry in r.json(): - if 'biotype' in entry: - biotypes.append(entry['biotype']) - return biotypes - - -def get_toplevel(species): - coord_systems = dict() - ext = "/info/assembly/%s?" % species - req_header = {"Content-Type": "application/json"} - r = ensembl_rest(ext, req_header) - toplevel = r.json() - for seq in toplevel['top_level_region']: - if seq['coord_system'] not in coord_systems: - coord_systems[seq['coord_system']] = dict() - coord_system = coord_systems[seq['coord_system']] - coord_system[seq['name']] = int(seq['length']) - return coord_systems - - -def get_transcripts_bed(species, refseq, start, length, strand='', params=None): - bed = [] - param = params if params else '' - req_header = {"Content-Type": "text/x-bed"} - regions = range(start, length, max_region) - if not regions or regions[-1] < length: - regions.append(length) - for end in regions[1:]: - ext = "/overlap/region/%s/%s:%d-%d%s?feature=transcript;%s"\ - % (species, refseq, start, end, strand, param) - start = end + 1 - r = ensembl_rest(ext, req_header) - if r.text: - bed += r.text.splitlines() - return bed - - -def get_seq(id, seqtype,params=None): - param = params if params else '' - ext = "/sequence/id/%s?type=%s;%s" % (id, seqtype,param) - req_header = {"Content-Type": "text/plain"} - r = ensembl_rest(ext, req_header) - return r.text - - -def get_cdna(id,params=None): - return get_seq(id, 'cdna',params=params) - - -def get_cds(id,params=None): - return get_seq(id, 'cds',params=params) - - -def get_transcript_haplotypes(species,transcript): - ext = "/transcript_haplotypes/%s/%s?aligned_sequences=1" % (species,transcript) - req_header = {"Content-Type" : "application/json"} - r = ensembl_rest(ext, req_header) - decoded = r.json() - - -def bed_from_line(line): - fields = line.rstrip('\r\n').split('\t') - if len(fields) < 12: - return None - (chrom, chromStart, chromEnd, name, score, strand, - thickStart, thickEnd, itemRgb, - blockCount, blockSizes, blockStarts) = fields[0:12] - bed_entry = BedEntry(chrom=chrom, chromStart=chromStart, chromEnd=chromEnd, - name=name, score=score, strand=strand, - thickStart=thickStart, thickEnd=thickEnd, - itemRgb=itemRgb, - blockCount=blockCount, - blockSizes=blockSizes.rstrip(','), - blockStarts=blockStarts.rstrip(',')) - if len(fields) == 20: - bed_entry.second_name = fields[12] - bed_entry.cds_start_status = fields[13] - bed_entry.cds_end_status = fields[14] - bed_entry.exon_frames = fields[15] - bed_entry.biotype = fields[16] - bed_entry.gene_name = fields[17] - bed_entry.second_gene_name = fields[18] - bed_entry.gene_type = fields[19] - return bed_entry - - -class BedEntry(object): - def __init__(self, chrom=None, chromStart=None, chromEnd=None, - name=None, score=None, strand=None, - thickStart=None, thickEnd=None, itemRgb=None, - blockCount=None, blockSizes=None, blockStarts=None): - self.chrom = chrom - self.chromStart = int(chromStart) - self.chromEnd = int(chromEnd) - self.name = name - self.score = int(score) if score is not None else 0 - self.strand = '-' if str(strand).startswith('-') else '+' - self.thickStart = int(thickStart) if thickStart else self.chromStart - self.thickEnd = int(thickEnd) if thickEnd else self.chromEnd - self.itemRgb = str(itemRgb) if itemRgb is not None else r'100,100,100' - self.blockCount = int(blockCount) - if isinstance(blockSizes, str) or isinstance(blockSizes, unicode): - self.blockSizes = [int(x) for x in blockSizes.split(',')] - elif isinstance(blockSizes, list): - self.blockSizes = [int(x) for x in blockSizes] - else: - self.blockSizes = blockSizes - if isinstance(blockStarts, str) or isinstance(blockSizes, unicode): - self.blockStarts = [int(x) for x in blockStarts.split(',')] - elif isinstance(blockStarts, list): - self.blockStarts = [int(x) for x in blockStarts] - else: - self.blockStarts = blockStarts - self.second_name = None - self.cds_start_status = None - self.cds_end_status = None - self.exon_frames = None - self.biotype = None - self.gene_name = None - self.second_gene_name = None - self.gene_type = None - self.seq = None - self.pep = None - - def __str__(self): - return '%s\t%d\t%d\t%s\t%d\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % ( - self.chrom, self.chromStart, self.chromEnd, - self.name, self.score, self.strand, - self.thickStart, self.thickEnd, str(self.itemRgb), self.blockCount, - ','.join([str(x) for x in self.blockSizes]), - ','.join([str(x) for x in self.blockStarts])) - - # (start, end) - def get_subrange(self, tstart, tstop, debug=False): - chromStart = self.chromStart - chromEnd = self.chromEnd - if debug: - print >> sys.stderr, "%s" % (str(self)) - r = range(self.blockCount) - if self.strand == '-': - r.reverse() - bStart = 0 - bEnd = 0 - for x in r: - bEnd = bStart + self.blockSizes[x] - if bStart <= tstart < bEnd: - if self.strand == '+': - chromStart = self.chromStart + self.blockStarts[x] +\ - (tstart - bStart) - else: - chromEnd = self.chromStart + self.blockStarts[x] +\ - self.blockSizes[x] - (tstart - bStart) - if bStart <= tstop < bEnd: - if self.strand == '+': - chromEnd = self.chromStart + self.blockStarts[x] +\ - (tstop - bStart) - else: - chromStart = self.chromStart + self.blockStarts[x] +\ - self.blockSizes[x] - (tstop - bStart) - if debug: - print >> sys.stderr,\ - "%3d %s\t%d\t%d\t%d\t%d\t%d\t%d"\ - % (x, self.strand, bStart, bEnd, - tstart, tstop, chromStart, chromEnd) - bStart += self.blockSizes[x] - return(chromStart, chromEnd) - - # get the blocks for sub range - def get_blocks(self, chromStart, chromEnd): - tblockCount = 0 - tblockSizes = [] - tblockStarts = [] - for x in range(self.blockCount): - bStart = self.chromStart + self.blockStarts[x] - bEnd = bStart + self.blockSizes[x] - if bStart > chromEnd: - break - if bEnd < chromStart: - continue - cStart = max(chromStart, bStart) - tblockStarts.append(cStart - chromStart) - tblockSizes.append(min(chromEnd, bEnd) - cStart) - tblockCount += 1 - return (tblockCount, tblockSizes, tblockStarts) - - def trim(self, tstart, tstop, debug=False): - (tchromStart, tchromEnd) =\ - self.get_subrange(tstart, tstop, debug=debug) - (tblockCount, tblockSizes, tblockStarts) =\ - self.get_blocks(tchromStart, tchromEnd) - tbed = BedEntry( - chrom=self.chrom, chromStart=tchromStart, chromEnd=tchromEnd, - name=self.name, score=self.score, strand=self.strand, - thickStart=tchromStart, thickEnd=tchromEnd, itemRgb=self.itemRgb, - blockCount=tblockCount, - blockSizes=tblockSizes, blockStarts=tblockStarts) - if self.seq: - ts = tchromStart-self.chromStart - te = tchromEnd - tchromStart + ts - tbed.seq = self.seq[ts:te] - return tbed +from ensembl_rest import get_toplevel, get_transcripts_bed, get_cds, get_cdna, max_region +from twobitreader import TwoBitFile def __main__(): @@ -282,6 +43,9 @@ '-i', '--input', default=None, help='Use BED instead of retrieving cDNA from ensembl (-) for stdin') parser.add_argument( + '-T', '--twobit', default=None, + help='Genome reference sequence in 2bit format') + parser.add_argument( '-t', '--transcripts', default=None, help='Path to output cDNA transcripts.bed (-) for stdout') parser.add_argument( @@ -325,6 +89,8 @@ biotypes = ';'.join(['biotype=%s' % bt.strip() for biotype in args.biotypes for bt in biotype.split(',') if bt.strip()]) # print >> sys.stderr, "biotypes: %s" % biotypes + twobit = TwoBitFile(args.twobit) if args.twobit else None + selected_regions = dict() # chrom:(start,end) region_pat = '^([^:]+)(?::(\d*)(?:-(\d+)([+-])?)?)?' if args.regions: @@ -340,6 +106,7 @@ if chrom not in selected_regions: selected_regions[chrom] = [] selected_regions[chrom].append([start,end,strand]) + if args.debug: print >> sys.stderr, "selected_regions: %s" % selected_regions translations = dict() # start : end : seq @@ -357,21 +124,36 @@ return False return True + def get_sequence(chrom,start,end): + if twobit: + if chrom in twobit: + return twobit[chrom][start:end] + contig = chrom[3:] if chrom.startswith('chr') else 'chr%s' % chrom + if contig in twobit: + return twobit[contig][start:end] + return None + def translate_bed(bed): translate_count = 0 if any([fa_wtr, bed_wtr]): transcript_id = bed.name refprot = None + if twobit: + bed.seq = get_sequence(bed.chrom,bed.chromStart,bed.chromEnd) + else: + bed.cdna = get_cdna(transcript_id) + cdna = bed.get_cdna() + cdna_len = len(cdna) if not args.all: try: - cds = get_cds(transcript_id) + cds = bed.get_cds() + if cds is None: + cds = get_cds(transcript_id) if len(cds) % 3 != 0: cds = cds[:-(len(cds) % 3)] refprot = translate(cds) if cds else None except: refprot = None - cdna = get_cdna(transcript_id) - cdna_len = len(cdna) for offset in range(3): seqend = cdna_len - (cdna_len - offset) % 3 aaseq = translate(cdna[offset:seqend])