0
|
1 #!/usr/bin/env python
|
|
2
|
|
3 import os
|
|
4 import sys
|
|
5 import argparse
|
|
6 import subprocess
|
|
7 import struct
|
|
8 import shutil
|
|
9 import tempfile
|
|
10
|
|
11 import urllib2
|
|
12 import csv
|
|
13 import json
|
|
14
|
|
15
|
|
16 CANDISNP_SERVER = 'http://candisnp.tsl.ac.uk' #:8080'
|
|
17
|
|
18 CANDI_TESTDATA = {
|
|
19 "ref": "athalianaTair10",
|
|
20 "data": [{
|
|
21 "is_synonymous": "FALSE",
|
|
22 "change": "E/K",
|
|
23 "effect": "NON_SYNONYMOUS_CODING",
|
|
24 "gene": "AT1G01320",
|
|
25 "allele_freq": 0.291666667,
|
|
26 "reference_base": "C",
|
|
27 "position": 126483,
|
|
28 "chromosome": "1",
|
|
29 "is_ctga": "TRUE",
|
|
30 "in_cds": "TRUE",
|
|
31 "alternate_base": "T",
|
|
32 },
|
|
33 ],
|
|
34 }
|
|
35 """
|
|
36 'is_ctga',
|
|
37 'effect',
|
|
38 'change',
|
|
39 'gene',
|
|
40
|
|
41 """
|
|
42
|
|
43
|
|
44 SNP_DATA_HEADERS = {
|
|
45 0: 'chromosome',
|
|
46 1: 'position',
|
|
47 3: 'reference_base',
|
|
48 4: 'alternate_base',
|
|
49 7: 'info'
|
|
50 }
|
|
51
|
|
52 contentHeaders = {
|
|
53 'Content-Type': 'application/json',
|
|
54 'Accept': 'text/plain'
|
|
55 }
|
|
56
|
|
57 class SNPEffect(object):
|
|
58 def __init__(self, string):
|
|
59 effectAndData = string.strip(')').split('(')
|
|
60 # sys.stderr.write(str(effectAndData)+'\n')
|
|
61 self.effect, effectData = effectAndData[0], effectAndData[1]
|
|
62 effectData = effectData.split('|')
|
|
63 assert len(effectData) >= 11, 'Invalid effect field %s' % string
|
|
64 self.impact, self.fClass, changeDist, self.aaChange, self.aaLength, self.gene, self.transcriptBioType, isCoding, self.transcriptID, exonIntronRank, genotypeNumber, warning = (effectData + [''])[:12]
|
|
65 if self.effect in ('UPSTREAM', 'DOWNSTREAM'):
|
|
66 self.transcriptDist = changeDist
|
|
67 self.codonChange = None
|
|
68 else:
|
|
69 self.transcriptDist = None
|
|
70 self.codonChange = changeDist
|
|
71 self.isCoding = (isCoding == 'CODING')
|
|
72 self.exonIntronRank = int(exonIntronRank) if exonIntronRank else 'NA'
|
|
73 self.genotypeNumber = int(genotypeNumber)
|
|
74 self.warning = warning if warning else None
|
|
75
|
|
76 self.is_synonymous = not self.effect == 'NON_SYNONYMOUS_CODING'
|
|
77 if self.effect in ('INTRON', 'INTERGENIC'):
|
|
78 self.in_cds, self.is_synonymous = False, False
|
|
79 else:
|
|
80 self.in_cds = True
|
|
81 try:
|
|
82 self.change = '%s/%s' % (self.aaChange[0], self.aaChange[-1])
|
|
83 except:
|
|
84 self.change = ''
|
|
85
|
|
86
|
|
87 pass
|
|
88 def toDict(self):
|
|
89 def is_ctga(ref, alt):
|
|
90 return (ref == 'C' and alt == 'T') or (ref == 'G' and alt == 'A')
|
|
91 assert hasattr(self, 'is_synonymous'), 'Missing attribute: is_synonymous'
|
|
92 assert hasattr(self, 'allele_frequency'), 'Missing attribute: allele_frequency'
|
|
93 assert hasattr(self, 'reference_base'), 'Missing attribute: reference_base'
|
|
94 assert hasattr(self, 'alternate_base'), 'Missing attribute: alternate_base'
|
|
95 return { "is_synonymous": str(self.is_synonymous).upper(),
|
|
96 "change": self.change if self.change else 'NA',
|
|
97 "effect": self.effect if self.effect else 'NA',
|
|
98 "gene": self.gene if self.gene else 'NA',
|
|
99 "allele_freq": self.allele_frequency,
|
|
100 "reference_base": self.reference_base,
|
|
101 "position": self.position,
|
|
102 "chromosome": self.chromosome,
|
|
103 "is_ctga": str(is_ctga(self.reference_base, self.alternate_base)).upper(),
|
|
104 "in_cds": str(self.in_cds).upper(),
|
|
105 "alternate_base": self.alternate_base, }
|
|
106
|
|
107
|
|
108 pass
|
|
109
|
|
110
|
|
111
|
|
112 class AnnotatedSNPEffectFactory(object):
|
|
113 def __init__(self, *args, **kwargs):
|
|
114 assert len(args) >= 7, 'Not enough values in args (%s)' % args
|
|
115 self.log = kwargs['log']
|
|
116 self.chromosome = args[0]
|
|
117 self.position = int(args[1])
|
|
118 self.reference_base = args[3]
|
|
119 self.alternate_base = args[4]
|
|
120
|
|
121 # info = dict([field.split('=') for field in args[7].strip().split(';')])
|
|
122 info = dict(field.split('=') for field in args[7].strip().split(';'))
|
|
123 # self.log.write('***'+str(info.get('EFF', '@@@')+'***\n'))
|
|
124
|
|
125 self.allele_frequency = float(info.get('AF', '0.0'))
|
|
126 self.effects = [effect
|
|
127 for effect in info.get('EFF', '').split(',')
|
|
128 if effect]
|
|
129 # assert self.effects, 'No effects in info-string %s' % info.get('EF', '')
|
|
130 pass
|
|
131
|
|
132 def getEffects(self, notWanted=('UPSTREAM', 'DOWNSTREAM', 'FRAME_SHIFT')):
|
|
133 for effectString in self.effects:
|
|
134 effect = SNPEffect(effectString)
|
|
135 if effect.effect not in notWanted:
|
|
136 for attr in ('chromosome', 'position', 'reference_base', 'alternate_base', 'allele_frequency'):
|
|
137 setattr(effect, attr, getattr(self, attr))
|
|
138 yield effect.toDict()
|
|
139
|
|
140
|
|
141 pass
|
|
142 pass
|
|
143
|
|
144 def extractCandiDataFromSnpEffVCF(snpEffVCF, fo):
|
|
145 snps = []
|
|
146 for line in snpEffVCF:
|
|
147 if not line.startswith('#'):
|
|
148 #fo.write(line)
|
|
149 asf = AnnotatedSNPEffectFactory(*line.strip().split('\t'), log=fo)
|
|
150 snpEffects = list(asf.getEffects())
|
|
151 #fo.write(str(snpEffects)+'\n')
|
|
152 snps.extend(snpEffects)
|
|
153 return snps
|
|
154
|
|
155
|
|
156
|
|
157 def main(argv):
|
|
158
|
|
159 descr = ''
|
|
160 parser = argparse.ArgumentParser(description=descr)
|
|
161 parser.add_argument('--ref', help='The snpEff genome reference.')
|
|
162 parser.add_argument('snpEff_output', type=str, help='The input file.')
|
|
163 parser.add_argument('candisnp_html', type=str, help='The output file.')
|
|
164 args = parser.parse_args()
|
|
165
|
|
166 fo = open(args.candisnp_html, 'wb')
|
|
167 """
|
|
168 import subprocess
|
|
169 import urllib2
|
|
170 response = urllib2.urlopen('http://ruup.xyz:8080/monitors')
|
|
171 html = response.read()
|
|
172
|
|
173
|
|
174 # p = subprocess.Popen(['wget http://ruup.xyz:8080/monitors'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
|
175 # fo.write(p.communicate()[0])
|
|
176 fo.write(html)
|
|
177 fo.close()
|
|
178
|
|
179 return None
|
|
180 """
|
|
181 """
|
|
182 # This works.
|
|
183 candiData = CANDI_TESTDATA
|
|
184 candiMessage = json.dumps(candiData)
|
|
185 """
|
|
186 candiData = extractCandiDataFromSnpEffVCF(open(args.snpEff_output), fo)
|
|
187 candiMessage = json.dumps({'ref': args.ref, 'data': candiData})
|
|
188
|
|
189 request = urllib2.Request(CANDISNP_SERVER + ':8080',
|
|
190 headers=contentHeaders,
|
|
191 data=candiMessage)
|
|
192
|
|
193
|
|
194 try:
|
|
195 response = urllib2.urlopen(request)
|
|
196 candiURL = response.read()
|
|
197 except urllib2.URLError, e:
|
|
198 candiURL = ''
|
|
199 sys.stderr.write(str(e.reason) + '\n')
|
|
200
|
|
201 if candiURL:
|
|
202 body = urllib2.urlopen(candiURL)
|
|
203 fo.write(body.read())
|
|
204 #fo.write('<iframe src="%s"></iframe>\n' % candiURL)
|
|
205 else:
|
|
206 fo.write('I am sorry. CandiSNP does not pick up. Maybe (<a href="%s" target="_blank">try it manually?</a>)\n' % CANDISNP_SERVER)
|
|
207
|
|
208 fo.close()
|
|
209 pass
|
|
210
|
|
211 # main(sys.argv[1:])
|
|
212
|
|
213 if __name__ == '__main__': main(sys.argv[1:])
|