Mercurial > repos > melissacline > ucsc_xena_platform
diff xena_utils.py @ 0:8bb037f88ed2
Uploaded
author | melissacline |
---|---|
date | Tue, 13 Jan 2015 23:37:23 -0500 |
parents | |
children | a3fbe077a14c |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/xena_utils.py Tue Jan 13 23:37:23 2015 -0500 @@ -0,0 +1,97 @@ +#!/usr/bin/env python +""" +xenaUtils: a set of python utilities for the Galaxy / Xena interface +""" + +import os +import socket +import subprocess + +def jarPath(): + """Return the full pathname of the xena jar file""" + jarPath = os.getenv("XENA_JAR_PATH", "~") + return(os.path.join(jarPath, "xena.jar")) + + +def baseDir(): + return(os.getenv("XENA_BASE_DIR", "/tmp")) + +def fileDir(): + return(baseDir() + "/files") + +def isRunning(xenaPort): + """Determine if Xena is running on the specified port""" + query = "wget -q -O- http://localhost:%s/data/'(+ 1 2)'" % xenaPort + try: + result = subprocess.check_output(query, shell=True) + except: + return False + else: + return(result == "3.0") + + +def findUnusedPort(): + """Find a random port that is available on the local system, and return + the port number. + """ + ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ss.bind(('', 0)) + portNumber = ss.getsockname()[1] + ss.close() + return(portNumber) + +def isPortAvailable(port): + """Test if a given port is available""" + ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + ss.bind(('', port)) + except: + return False + else: + ss.close() + return True + + +def portFilename(): + """ Return the name of the file with the port of the running Xena, + if any + """ + xenaBaseDir = os.getenv("XENA_BASE_DIR", "~") + xenaPortFilename = xenaBaseDir + "/xena.port" + return(xenaPortFilename) + + + + +def port(testIfAvailable=False, findNewPort=False): + preferredXenaPort = 7220 + xenaPort = None + xenaPortFname = portFilename() + if os.path.exists(xenaPortFname): + fp = open(xenaPortFname) + line = fp.readline() + xenaPort = int(line.rstrip()) + if testIfAvailable and not isRunning(xenaPort): + # Xena is not running on the port. Make sure that + # the port is not occupied by some other process + if not isPortAvailable(xenaPort): + #cmd = "lsof -t -i :%s -sTCP:LISTEN" % portID + #pid = subprocess.check_output(cmd, shell=True).rstrip() + #print "not available, used by",pid + xenaPort = None + if findNewPort and xenaPort == None: + if isPortAvailable(preferredXenaPort): + xenaPort = preferredXenaPort + else: + xenaPort = findUnusedPort() + fp = open(portFilename(), "w") + fp.write("%d\n" % xenaPort) + fp.close() + return(xenaPort) + +def cleanUpPort(): + """ Clean up the port file after Xena has stopped running""" + os.unlink(portFilename()) + + +