Mercurial > repos > melissacline > ucsc_xena_platform
view xena_utils.py @ 26:8666fd3e8e4f
Uploaded, to try to fix a merge problem.
author | melissacline |
---|---|
date | Tue, 02 Jun 2015 19:18:14 -0400 |
parents | 8bb037f88ed2 |
children | a3fbe077a14c |
line wrap: on
line source
#!/usr/bin/env python

"""
  xenaUtils: a set of python utilities for the Galaxy / Xena interface
"""

import errno
import os
import socket
import subprocess


def _envDir(varName, default):
    """Return the directory named by environment variable varName.

    Falls back to `default` when the variable is unset.  A leading "~" is
    expanded to the user's home directory, because Python file functions
    (os.path.exists, open, ...) do not expand it themselves.
    """
    return os.path.expanduser(os.getenv(varName, default))


def jarPath():
    """Return the full pathname of the xena jar file.

    The directory comes from XENA_JAR_PATH and defaults to the user's
    home directory.
    """
    return os.path.join(_envDir("XENA_JAR_PATH", "~"), "xena.jar")


def baseDir():
    """Return the Xena base directory (XENA_BASE_DIR, default /tmp)."""
    return _envDir("XENA_BASE_DIR", "/tmp")


def fileDir():
    """Return the "files" subdirectory of the Xena base directory."""
    return baseDir() + "/files"


def isRunning(xenaPort):
    """Determine if Xena is running on the specified port.

    Asks the server on localhost to evaluate the expression (+ 1 2) via
    wget, and reports True only when the reply is "3.0".  Any failure to
    run wget or to get a successful reply is reported as False.
    """
    # Pass an argument list instead of a shell string so that the port
    # value is never interpreted by a shell.
    url = "http://localhost:%s/data/(+ 1 2)" % xenaPort
    try:
        result = subprocess.check_output(["wget", "-q", "-O-", url])
    except (subprocess.CalledProcessError, OSError):
        # Non-zero exit from wget (nothing listening, HTTP error) or
        # wget itself is not installed.
        return False
    # check_output returns bytes on Python 3; decode before comparing and
    # tolerate surrounding whitespace such as a trailing newline.
    return result.decode("utf-8", "replace").strip() == "3.0"


def findUnusedPort():
    """Find a random port that is available on the local system, and
    return the port number.
    """
    ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Binding to port 0 makes the operating system pick a free port.
        ss.bind(('', 0))
        return ss.getsockname()[1]
    finally:
        ss.close()


def isPortAvailable(port):
    """Test if a given port is available.

    Returns True when a TCP socket can be bound to the port, and False
    when the bind fails (port in use, not permitted, or out of range).
    """
    ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        ss.bind(('', port))
    except (socket.error, OverflowError):
        # OverflowError is raised for port numbers outside 0-65535.
        return False
    else:
        return True
    finally:
        # Release the socket on both the success and the failure path.
        ss.close()


def portFilename():
    """Return the name of the file with the port of the running Xena,
    if any.

    The file is xena.port inside XENA_BASE_DIR, which defaults to the
    user's home directory.
    """
    return _envDir("XENA_BASE_DIR", "~") + "/xena.port"


def port(testIfAvailable=False, findNewPort=False):
    """Return the port number recorded for Xena, or None.

    The port is read from the port file when that file exists.  A port
    file whose first line is not an integer is treated as recording no
    port.

    testIfAvailable -- when true, a recorded port on which Xena is not
        running and which cannot be bound (some other process holds it)
        is discarded.
    findNewPort -- when true and no usable port is known, choose one
        (7220 if it is free, otherwise any unused port), write it to the
        port file, and return it.
    """
    preferredXenaPort = 7220
    xenaPort = None
    xenaPortFname = portFilename()
    if os.path.exists(xenaPortFname):
        with open(xenaPortFname) as fp:
            line = fp.readline()
        try:
            xenaPort = int(line.strip())
        except ValueError:
            # Empty or corrupt port file: behave as if no port were
            # recorded rather than crashing.
            xenaPort = None
        if (xenaPort is not None and testIfAvailable
                and not isRunning(xenaPort)
                and not isPortAvailable(xenaPort)):
            # Xena is not running on the port, and the port is occupied
            # by some other process, so it cannot be reused.
            xenaPort = None
    if findNewPort and xenaPort is None:
        if isPortAvailable(preferredXenaPort):
            xenaPort = preferredXenaPort
        else:
            xenaPort = findUnusedPort()
        with open(xenaPortFname, "w") as fp:
            fp.write("%d\n" % xenaPort)
    return xenaPort


def cleanUpPort():
    """Clean up the port file after Xena has stopped running.

    A port file that is already absent is not an error; any other
    failure to remove it is raised to the caller.
    """
    try:
        os.unlink(portFilename())
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise