Mercurial > repos > fubar > brokenandnotdeletablebyowneroradmin
view rgToolFactory.py @ 8:220885b2d7ee
End to end test works. Add tests next
author | ross lazarus ross.lazarus@gmail.com |
---|---|
date | Sat, 02 Jun 2012 20:02:11 +1000 |
parents | 7221619caefa |
children | e09c76551bed |
line wrap: on
line source
# rgDynamicScriptWrapper.py
# derived from
# rgBaseScriptWrapper.py
# to run some user supplied code
# extremely dangerous
# trusted users only - private site only
# a list in the xml is searched - only users in the list can run this tool.
#
# copyright ross lazarus (ross.lazarus@gmail.com) May 2012
#
# all rights reserved
# Licensed under the LGPL for your pleasure
# Derived from rgDGE.py in May 2012
# generalized to run required interpreter
# to make your own tools based on a given script and interpreter such as perl or python
# clone this and the corresponding xml wrapper
# replace the parameters/inputs/outputs and the configfile contents with your script
# Use the $foo syntax to place your parameter values inside the script to assign them -
# at run time, the script will be used as a template
# and returned as part of the output to the user - with the right values for all the parameters.
# Note that this assumes you want all the outputs arranged as a single Html file output
# after this generic script runner runs your script with the specified interpreter,
# it will collect all output files into the specified output_html, making thumbnails
# for all the pdfs it finds and making links for all the other files.
import sys
import shutil
import subprocess
import os
import time
import tempfile
import optparse
import tarfile
import re
import shlex

progname = os.path.split(sys.argv[0])[1]
myversion = 'V000.1 May 2012'
verbose = False
debug = False


def timenow():
    """Return the current local time as a 'dd/mm/YYYY HH:MM:SS' string."""
    return time.strftime('%d/%m/%Y %H:%M:%S', time.localtime(time.time()))


# Characters that are allowed but arrive escaped: {original: escaped token}.
# restore_text() replaces each escaped token by its original.
# Two entries are not plain character escapes:
#   'Xt'     - every occurrence in the script is turned into a tab;
#   'system' - every occurrence is rewritten to 'systemCallsAreNotAllowed'.
mapped_chars = {
    '>': '__gt__',
    '<': '__lt__',
    "'": '__sq__',
    '"': '__dq__',
    '{': '__oc__',
    '}': '__cc__',
    '@': '__at__',
    '\n': '__cn__',
    '\r': '__cr__',
    '\t': 'Xt',
    '#': '__pd__',
    '[': '__ob__',
    ']': '__cb__',
    'systemCallsAreNotAllowed': 'system',
}

# A dict can hold only one escape per character.  The tab character has two
# ('__tc__' and 'Xt'); the second spelling is kept here as (escaped, original)
# pairs so that both are restored.
_EXTRA_ESCAPES = (
    ('__tc__', '\t'),
)


def restore_text(text):
    """Undo the escaping described by mapped_chars and _EXTRA_ESCAPES.

    Empty or None input is returned unchanged.
    """
    if not text:
        return text
    for original, escaped in mapped_chars.items():
        text = text.replace(escaped, original)
    for escaped, original in _EXTRA_ESCAPES:
        text = text.replace(escaped, original)
    return text


def _html_escape(text):
    """Escape the characters that would otherwise be parsed as HTML markup."""
    return text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')


class ScriptRunner:
    """Wrapper that runs an arbitrary user supplied script with a chosen
    interpreter and can package the result as a new tool.
    """

    def __init__(self, opts=None):
        """Read and un-escape the script, write the working copies and build
        the interpreter command line.

        cheetah/galaxy will provide an escaped string so
        __pd__ your script goes here
        __cr____cn__ourargs __lt__- commandArgs(TRUE)
        __cr____cn__inf = ourargs[1]
        __cr____cn__outf = ourargs[2]
        __cr____cn__inp = read.table(inf,head=T,rownames=F,sep=__sq__Xt__sq__)
        __cr____cn__ write.table(inp,outf, quote=FALSE, sep=__dq__Xt__dq__,row.names=F)
        __cr____cn__sessionInfo()
        __cr____cn__

        opts is the optparse result built in main().  When opts.output_dir is
        set it is replaced by its absolute path and becomes the working
        directory.
        """
        self.opts = opts
        self.thumbformat = 'jpg'
        self.toolname = re.sub('[^a-zA-Z0-9_]+', '', opts.tool_name)
        self.toolid = self.toolname
        self.myname = sys.argv[0]  # get our name because we write ourselves out as a tool later
        # Resolve everything that may be relative BEFORE changing directory,
        # otherwise the paths would silently point somewhere else afterwards.
        self.pyfile = os.path.abspath(self.myname)  # crude but efficient - the cruft won't hurt much
        with open(opts.script_path, 'r') as script_in:
            self.script = restore_text(script_in.read())
        if opts.output_dir:  # simplify for the tool tarball
            opts.output_dir = os.path.abspath(opts.output_dir)
            os.chdir(opts.output_dir)
        # The interpreter may carry arguments ("Rscript --vanilla") or be a
        # full path; only the bare program name is used as the file suffix.
        interpreter_argv = shlex.split(opts.interpreter) or [opts.interpreter]
        suffix = re.sub('[^a-zA-Z0-9_.-]+', '', os.path.basename(interpreter_argv[0]))
        self.xmlfile = '%s.xml' % self.toolname
        self.sfile = '%s.%s' % (self.toolname, suffix)
        with open(self.sfile, 'w') as localscript:
            localscript.write(self.script)
        self.tlog = None  # no run log unless there is an output directory to hold it
        if opts.output_dir:  # may not want these complexities if a simple script
            self.tlog = os.path.join(opts.output_dir, "%s_runner.log" % self.toolname)
            artifactpath = os.path.join(opts.output_dir, '%s_run.script' % self.toolname)
            with open(artifactpath, 'w') as artifact:
                artifact.write(self.script)
                artifact.write('\n')
        # argv list: interpreter, '-' (read the script from stdin), then the
        # input and output paths.  Absent paths are skipped, so a script run
        # without an input receives the output path as its first argument.
        self.cl = list(interpreter_argv)
        self.cl.append('-')  # use stdin
        for path in (opts.input_tab, opts.output_tab):
            if path:
                self.cl.append(path)
        self.html = []
        self.outFormats = 'tabular'  # TODO make this an option at tool generation time
        self.inputFormats = 'tabular'  # TODO make this an option at tool generation time

    def makeXML(self):
        """Create a Galaxy xml tool wrapper for the new script and write it
        to self.xmlfile.
        """
        # needs a dict with toolname, toolid, interpreter, scriptname, command,
        # inputs as a multi line string ready to write, outputs ditto, help ditto
        newXML = """<tool id="%(toolid)s" name="%(toolname)s" version="0.01">
%(tooldesc)s
%(command)s
<inputs>
%(inputs)s
</inputs>
<outputs>
%(outputs)s
</outputs>
<help>
%(help)s
</help>
<configfiles>
<configfile name="runMe">
%(script)s
</configfile>
</configfiles>
</tool>"""
        # may NOT be an input or htmlout
        newCommand = """<command interpreter="python">
    %(toolname)s.py --script_path "$runMe" --interpreter "%(interpreter)s"
    --tool_name "%(toolname)s" %(command_inputs)s %(command_outputs)s
</command>"""
        xdict = {}
        # we pass this as a configfile because it's less painful than galaxy_tool_data_dir
        # embed script in tool - remove dependence on something else outside in the wilds
        # NOTE(review): script, help and description are embedded verbatim;
        # presumably they need XML escaping - confirm against Galaxy's parser.
        xdict['script'] = self.script
        if self.opts.help_text:
            with open(self.opts.help_text, 'r') as help_in:
                xdict['help'] = restore_text(help_in.read())
        else:
            xdict['help'] = 'Please ask the tool author for help as none was supplied at tool generation'
        if self.opts.tool_desc:
            xdict['tooldesc'] = '<description>%s</description>' % self.opts.tool_desc
        else:
            xdict['tooldesc'] = ''
        if self.opts.input_tab:
            xdict['command_inputs'] = '--input_tab "$input1"'
            xdict['inputs'] = '<param name="input1" type="data" format="%s" label="Select a suitable input file from your history"/>\n' % self.inputFormats
        else:
            xdict['command_inputs'] = ''  # assume no input - eg a random data generator
            xdict['inputs'] = ''
        xdict['inputs'] += '<param name="job_name" type="text" label="Supply a name for the outputs to remind you what they contain" value="%s"/>\n' % self.toolname
        xdict['toolname'] = self.toolname
        xdict['toolid'] = self.toolid
        xdict['interpreter'] = self.opts.interpreter
        xdict['scriptname'] = self.sfile
        # Collect the command fragments in a list and join them with a space;
        # plain concatenation glued '"$html_file"' to '--output_tab'.
        command_outputs = []
        outputs = []
        if self.opts.make_HTML:
            # --make_HTML must be passed on, otherwise run() in the generated
            # tool never writes the html file this output declares.
            command_outputs.append('--output_dir "$html_file.files_path" --output_html "$html_file" --make_HTML "yes"')
            outputs.append('<data format="html" name="html_file" label="${job_name}.html"/>\n')
        if self.opts.output_tab:
            command_outputs.append('--output_tab "$tab_file"')
            outputs.append('<data format="%s" name="tab_file" label="${job_name}"/>\n' % self.outFormats)
        xdict['command_outputs'] = ' '.join(command_outputs)
        xdict['outputs'] = ''.join(outputs)
        xdict['command'] = newCommand % xdict
        xmls = newXML % xdict
        with open(self.xmlfile, 'w') as xf:
            xf.write(xmls)
            xf.write('\n')
        # ready for the tarball

    def makeTooltar(self):
        """Run the script and, if it succeeds, bundle the new tool.

        a tool is a gz tarball with eg
        /toolname/tool.xml /toolname/tool.py /toolname/test-data/test1_in.foo ...

        Exits the process with status 1 when the run fails; otherwise returns
        the (zero) exit status of the run.
        """
        retval = self.run()
        if retval:
            sys.stderr.write('## Run failed. Cannot build yet. Please fix and retry\n')
            sys.exit(1)
        self.makeXML()
        tdir = self.toolname
        os.mkdir(tdir)
        try:
            shutil.copyfile(self.xmlfile, os.path.join(tdir, self.xmlfile))
            shutil.copyfile(self.pyfile, os.path.join(tdir, '%s.py' % self.toolname))
            shutil.copyfile(self.sfile, os.path.join(tdir, self.sfile))
            tarpath = "%s.gz" % self.toolname
            tar = tarfile.open(tarpath, "w:gz")
            try:
                tar.add(tdir, arcname=self.toolname)
            finally:
                tar.close()
        finally:
            # the staging directory is scratch space - never leave it behind
            shutil.rmtree(tdir)
        self.makeHtml()  # call this to return the new gzip inside the autogenerated html file
        # TODO: replace with optional direct upload to local toolshed?
        return retval

    def _call_logged(self, cl, log):
        """Run the argv list cl in the output directory with stdout/stderr
        sent to log; return its exit status, or 127 if it cannot be started.
        """
        try:
            return subprocess.Popen(cl, stdout=log, stderr=log, cwd=self.opts.output_dir).wait()
        except OSError as err:
            log.write('## Could not run %s: %s\n' % (cl[0], err))
            log.flush()
            return 127

    def compressPDF(self, inpdf=None, thumbformat='png'):
        """Compress a pdf in place with gs and render a thumbnail with convert.

        need absolute path to pdf

        Returns 0 when both external programs succeed, otherwise the first
        non-zero exit status.  Raises AssertionError if inpdf is not a file.
        """
        assert inpdf and os.path.isfile(inpdf), "## Input %s supplied to %s compressPDF not found" % (inpdf, self.myname)
        hf, hlog = tempfile.mkstemp(suffix="%s.log" % self.toolname)
        outpdf = '%s_compressed' % inpdf
        # Wrap the descriptor returned by mkstemp instead of opening the path
        # a second time, so the descriptor is closed with the file object.
        # The log file itself is not removed here, as before.
        with os.fdopen(hf, 'w') as sto:
            cl = ["gs", "-sDEVICE=pdfwrite", "-dNOPAUSE", "-dBATCH", "-sOutputFile=%s" % outpdf, inpdf]
            retval1 = self._call_logged(cl, sto)
            if retval1 == 0:
                os.unlink(inpdf)
                shutil.move(outpdf, inpdf)
            outpng = '%s.%s' % (os.path.splitext(inpdf)[0], thumbformat)
            retval2 = self._call_logged(['convert', inpdf, outpng], sto)
        return retval1 or retval2

    def getfSize(self, fpath, outpath):
        """Format a nice file size string such as ' (1.5 MB)'.

        fpath is joined onto outpath.  Returns '' when the file does not
        exist or is empty.
        """
        size = ''
        fp = os.path.join(outpath, fpath)
        if os.path.isfile(fp):
            n = float(os.path.getsize(fp))
            if n > 2**20:
                size = ' (%1.1f MB)' % (n / 2**20)
            elif n > 2**10:
                size = ' (%1.1f KB)' % (n / 2**10)
            elif n > 0:
                size = ' (%d B)' % (int(n))
        return size

    def makeHtml(self):
        """Write opts.output_html: a page listing every artefact found in
        opts.output_dir, with a thumbnail for each pdf, followed by the run log.
        """
        galhtmlprefix = """<?xml version="1.0" encoding="utf-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="generator" content="Galaxy %s tool output - see http://g2.trac.bx.psu.edu/" />
<title></title>
<link rel="stylesheet" href="/static/style/base.css" type="text/css" />
</head>
<body>
<div class="document">
"""
        galhtmlattr = """<hr/><b><a href="http://rgenetics.org">Galaxy Tool Factory Script Wrapper</a> tool output %s run at %s</b><br/>"""
        galhtmlpostfix = """</div></body></html>\n"""
        outdir = self.opts.output_dir
        flist = sorted(x for x in os.listdir(outdir) if x != 'Rplots.pdf')
        html = [galhtmlprefix % progname]
        html.append('<h2>Galaxy %s outputs run at %s</h2><br/>\n' % (self.toolname, timenow()))
        if flist:
            fhtml = []
            html.append('<table cellpadding="3" cellspacing="3">\n')
            for fname in flist:
                dname, ext = os.path.splitext(fname)
                # size is taken before any pdf compression, as it always was
                sfsize = self.getfSize(fname, outdir)
                if ext.lower() == '.pdf':  # compress and make a thumbnail
                    thumb = '%s.%s' % (dname, self.thumbformat)
                    pdff = os.path.join(outdir, fname)
                    if self.compressPDF(inpdf=pdff, thumbformat=self.thumbformat) == 0:
                        html.append('<tr><td><a href="%s"><img src="%s" title="Click to download a PDF of %s" hspace="10" width="600"></a></td></tr>\n' % (fname, thumb, fname))
                fhtml.append('<li><a href="%s">%s %s</a></li>' % (fname, fname, sfsize))
            html.append('</table>\n')
            # plain links for every file go after the thumbnails
            html.append('<ul>')
            html.extend(fhtml)
            html.append('</ul>')
        else:
            html.append('<h2>### Error - %s returned no files - please confirm that parameters are sane</h2>' % self.opts.interpreter)
        html.append('<h3>%s log follows below</h3><hr/><pre>\n' % self.opts.interpreter)
        if self.tlog and os.path.isfile(self.tlog):
            with open(self.tlog, 'r') as logf:
                # The log is arbitrary program output: escape it so it cannot
                # break the page, and drop the line ends because the page is
                # joined with newlines below.
                html.extend(_html_escape(line.rstrip('\n')) for line in logf)
        html.append('%s CL = %s<br/>\n' % (self.toolname, ' '.join(sys.argv)))
        html.append('</pre>\n')
        html.append(galhtmlattr % (progname, timenow()))
        html.append(galhtmlpostfix)
        with open(self.opts.output_html, 'w') as htmlf:
            htmlf.write('\n'.join(html))
            htmlf.write('\n')
        self.html = html

    def run(self):
        """Feed the script to the interpreter on stdin and wait for it.

        Output goes to the run log when there is one (an output directory was
        given), otherwise to the inherited stdout/stderr.  The html report is
        written afterwards when make_HTML or make_Tool is set.

        Returns the interpreter's exit status, or 127 if it cannot be started.
        """
        script = self.script
        if not isinstance(script, bytes):
            script = script.encode('utf-8')  # the stdin pipe takes bytes
        log = open(self.tlog, 'w') if self.tlog else None
        try:
            try:
                p = subprocess.Popen(self.cl, shell=False, stdout=log, stderr=log,
                                     stdin=subprocess.PIPE, cwd=self.opts.output_dir)
            except OSError as err:
                msg = '## Could not run %s: %s\n' % (self.cl[0], err)
                (log or sys.stderr).write(msg)
                retval = 127  # the status a shell reports for "command not found"
            else:
                try:
                    p.stdin.write(script)
                    p.stdin.close()
                except (IOError, OSError):
                    # the interpreter went away before reading the whole
                    # script; its exit status below tells the caller why
                    pass
                retval = p.wait()
        finally:
            # always close the log, whichever flags are set
            if log is not None:
                log.close()
        if self.opts.make_HTML or self.opts.make_Tool:
            self.makeHtml()
        return retval


def main():
    """Parse the command line, validate it and run (or build) the tool.

    Exits with a message on invalid arguments, and with the script's exit
    status when the run fails.
    """
    u = """
    This is a Galaxy wrapper. It expects to be called by a special purpose tool.xml as:
    <command interpreter="python">rgBaseScriptWrapper.py --script_path "$scriptPath" --tool_name "foo" --interpreter "Rscript"
    </command>
    """
    op = optparse.OptionParser(usage=u)
    a = op.add_option
    a('--script_path', default=None)
    a('--tool_name', default=None)
    a('--interpreter', default=None)
    a('--output_dir', default=None)
    a('--output_html', default=None)
    a('--input_tab', default=None)
    a('--output_tab', default=None)
    a('--user_email', default=None)
    a('--bad_user', default=None)
    a('--make_Tool', default=None)
    a('--make_HTML', default=None)
    a('--help_text', default=None)
    a('--tool_desc', default=None)
    opts, args = op.parse_args()
    # Explicit checks rather than assert: asserts vanish under "python -O",
    # which would silently disable the authorisation test.
    if opts.bad_user:
        sys.exit('%s is NOT authorized to use this tool. Please ask your friendly admin' % opts.bad_user)
    if not opts.tool_name:
        sys.exit('## Tool Factory expects a tool name - eg --tool_name=DESeq')
    if not opts.interpreter:
        sys.exit('## Tool Factory wrapper expects an interpreter - eg --interpreter=Rscript')
    if not (opts.script_path and os.path.isfile(opts.script_path)):
        sys.exit('## Tool Factory wrapper expects a script path - eg --script_path=foo.R')
    if (opts.make_Tool or opts.make_HTML) and not (opts.output_dir and opts.output_html):
        sys.exit('## Tool Factory needs --output_dir and --output_html with --make_Tool or --make_HTML')
    if opts.output_dir:
        try:
            os.makedirs(opts.output_dir)
        except OSError:
            # an existing directory is fine; anything else is a real error
            if not os.path.isdir(opts.output_dir):
                raise
    r = ScriptRunner(opts)
    if opts.make_Tool:
        retcode = r.makeTooltar()
    else:
        retcode = r.run()
    if retcode:
        sys.exit(retcode)  # indicate failure to job runner


if __name__ == "__main__":
    main()