comparison candisnp.py @ 0:36f6520671b3 draft

Uploaded
author cschu
date Fri, 22 May 2015 13:23:28 -0400
parents
children 9215ffe7d4d5
comparison
equal deleted inserted replaced
-1:000000000000 0:36f6520671b3
1 #!/usr/bin/env python
2
3 import os
4 import sys
5 import argparse
6 import subprocess
7 import struct
8 import shutil
9 import tempfile
10
11 import urllib2
12 import csv
13 import json
14
15
16 CANDISNP_SERVER = 'http://candisnp.tsl.ac.uk' #:8080'
17
18 CANDI_TESTDATA = {
19 "ref": "athalianaTair10",
20 "data": [{
21 "is_synonymous": "FALSE",
22 "change": "E/K",
23 "effect": "NON_SYNONYMOUS_CODING",
24 "gene": "AT1G01320",
25 "allele_freq": 0.291666667,
26 "reference_base": "C",
27 "position": 126483,
28 "chromosome": "1",
29 "is_ctga": "TRUE",
30 "in_cds": "TRUE",
31 "alternate_base": "T",
32 },
33 ],
34 }
35 """
36 'is_ctga',
37 'effect',
38 'change',
39 'gene',
40
41 """
42
43
44 SNP_DATA_HEADERS = {
45 0: 'chromosome',
46 1: 'position',
47 3: 'reference_base',
48 4: 'alternate_base',
49 7: 'info'
50 }
51
52 contentHeaders = {
53 'Content-Type': 'application/json',
54 'Accept': 'text/plain'
55 }
56
57 class SNPEffect(object):
58 def __init__(self, string):
59 effectAndData = string.strip(')').split('(')
60 # sys.stderr.write(str(effectAndData)+'\n')
61 self.effect, effectData = effectAndData[0], effectAndData[1]
62 effectData = effectData.split('|')
63 assert len(effectData) >= 11, 'Invalid effect field %s' % string
64 self.impact, self.fClass, changeDist, self.aaChange, self.aaLength, self.gene, self.transcriptBioType, isCoding, self.transcriptID, exonIntronRank, genotypeNumber, warning = (effectData + [''])[:12]
65 if self.effect in ('UPSTREAM', 'DOWNSTREAM'):
66 self.transcriptDist = changeDist
67 self.codonChange = None
68 else:
69 self.transcriptDist = None
70 self.codonChange = changeDist
71 self.isCoding = (isCoding == 'CODING')
72 self.exonIntronRank = int(exonIntronRank) if exonIntronRank else 'NA'
73 self.genotypeNumber = int(genotypeNumber)
74 self.warning = warning if warning else None
75
76 self.is_synonymous = not self.effect == 'NON_SYNONYMOUS_CODING'
77 if self.effect in ('INTRON', 'INTERGENIC'):
78 self.in_cds, self.is_synonymous = False, False
79 else:
80 self.in_cds = True
81 try:
82 self.change = '%s/%s' % (self.aaChange[0], self.aaChange[-1])
83 except:
84 self.change = ''
85
86
87 pass
88 def toDict(self):
89 def is_ctga(ref, alt):
90 return (ref == 'C' and alt == 'T') or (ref == 'G' and alt == 'A')
91 assert hasattr(self, 'is_synonymous'), 'Missing attribute: is_synonymous'
92 assert hasattr(self, 'allele_frequency'), 'Missing attribute: allele_frequency'
93 assert hasattr(self, 'reference_base'), 'Missing attribute: reference_base'
94 assert hasattr(self, 'alternate_base'), 'Missing attribute: alternate_base'
95 return { "is_synonymous": str(self.is_synonymous).upper(),
96 "change": self.change if self.change else 'NA',
97 "effect": self.effect if self.effect else 'NA',
98 "gene": self.gene if self.gene else 'NA',
99 "allele_freq": self.allele_frequency,
100 "reference_base": self.reference_base,
101 "position": self.position,
102 "chromosome": self.chromosome,
103 "is_ctga": str(is_ctga(self.reference_base, self.alternate_base)).upper(),
104 "in_cds": str(self.in_cds).upper(),
105 "alternate_base": self.alternate_base, }
106
107
108 pass
109
110
111
112 class AnnotatedSNPEffectFactory(object):
113 def __init__(self, *args, **kwargs):
114 assert len(args) >= 7, 'Not enough values in args (%s)' % args
115 self.log = kwargs['log']
116 self.chromosome = args[0]
117 self.position = int(args[1])
118 self.reference_base = args[3]
119 self.alternate_base = args[4]
120
121 # info = dict([field.split('=') for field in args[7].strip().split(';')])
122 info = dict(field.split('=') for field in args[7].strip().split(';'))
123 # self.log.write('***'+str(info.get('EFF', '@@@')+'***\n'))
124
125 self.allele_frequency = float(info.get('AF', '0.0'))
126 self.effects = [effect
127 for effect in info.get('EFF', '').split(',')
128 if effect]
129 # assert self.effects, 'No effects in info-string %s' % info.get('EF', '')
130 pass
131
132 def getEffects(self, notWanted=('UPSTREAM', 'DOWNSTREAM', 'FRAME_SHIFT')):
133 for effectString in self.effects:
134 effect = SNPEffect(effectString)
135 if effect.effect not in notWanted:
136 for attr in ('chromosome', 'position', 'reference_base', 'alternate_base', 'allele_frequency'):
137 setattr(effect, attr, getattr(self, attr))
138 yield effect.toDict()
139
140
141 pass
142 pass
143
144 def extractCandiDataFromSnpEffVCF(snpEffVCF, fo):
145 snps = []
146 for line in snpEffVCF:
147 if not line.startswith('#'):
148 #fo.write(line)
149 asf = AnnotatedSNPEffectFactory(*line.strip().split('\t'), log=fo)
150 snpEffects = list(asf.getEffects())
151 #fo.write(str(snpEffects)+'\n')
152 snps.extend(snpEffects)
153 return snps
154
155
156
157 def main(argv):
158
159 descr = ''
160 parser = argparse.ArgumentParser(description=descr)
161 parser.add_argument('--ref', help='The snpEff genome reference.')
162 parser.add_argument('snpEff_output', type=str, help='The input file.')
163 parser.add_argument('candisnp_html', type=str, help='The output file.')
164 args = parser.parse_args()
165
166 fo = open(args.candisnp_html, 'wb')
167 """
168 import subprocess
169 import urllib2
170 response = urllib2.urlopen('http://ruup.xyz:8080/monitors')
171 html = response.read()
172
173
174 # p = subprocess.Popen(['wget http://ruup.xyz:8080/monitors'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
175 # fo.write(p.communicate()[0])
176 fo.write(html)
177 fo.close()
178
179 return None
180 """
181 """
182 # This works.
183 candiData = CANDI_TESTDATA
184 candiMessage = json.dumps(candiData)
185 """
186 candiData = extractCandiDataFromSnpEffVCF(open(args.snpEff_output), fo)
187 candiMessage = json.dumps({'ref': args.ref, 'data': candiData})
188
189 request = urllib2.Request(CANDISNP_SERVER + ':8080',
190 headers=contentHeaders,
191 data=candiMessage)
192
193
194 try:
195 response = urllib2.urlopen(request)
196 candiURL = response.read()
197 except urllib2.URLError, e:
198 candiURL = ''
199 sys.stderr.write(str(e.reason) + '\n')
200
201 if candiURL:
202 body = urllib2.urlopen(candiURL)
203 fo.write(body.read())
204 #fo.write('<iframe src="%s"></iframe>\n' % candiURL)
205 else:
206 fo.write('I am sorry. CandiSNP does not pick up. Maybe (<a href="%s" target="_blank">try it manually?</a>)\n' % CANDISNP_SERVER)
207
208 fo.close()
209 pass
210
211 # main(sys.argv[1:])
212
213 if __name__ == '__main__': main(sys.argv[1:])