Mercurial > repos > melissacline > ucsc_xena_platform
view xena_utils.py @ 38:1ef1886dae04
data import sucess/unsuccess message more informative
author | jingchunzhu |
---|---|
date | Fri, 24 Jul 2015 12:12:37 -0700 |
parents | d64a002c3b0c |
children |
line wrap: on
line source
#!/usr/bin/env python """ xenaUtils: a set of python utilities for the Galaxy / Xena interface """ import os import socket import subprocess import httplib def jarPath(): """Return the full pathname of the xena jar file""" jarPath = os.getenv('XENA_JAR_PATH', "~") return(os.path.join(jarPath, "xena.jar")) def baseDir(): return(os.getenv('XENA_BASE_DIR', "/tmp")) def fileDir(): return(baseDir() + "/files") def hostname(): hostname = subprocess.check_output("hostname -f", shell=True).rstrip() return hostname def isRunning(xenaPort): """Determine if Xena is running on the specified port""" host = hostname() try: httpServ = httplib.HTTPConnection(host, xenaPort) httpServ.connect() data = "(+ 1 2)" httpServ.request('POST', '/data/', data) response = httpServ.getresponse() if response.status == httplib.OK: content = response.read() except: return False return (content == "3.0") def findUnusedPort(): """Find a random port that is available on the local system, and return the port number. """ ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ss.bind(('', 0)) portNumber = ss.getsockname()[1] ss.close() return(portNumber) def isPortAvailable(port): """Test if a given port is available""" ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: ss.bind(('', port)) except: return False else: ss.close() return True def portFilename(): """ Return the name of the file with the port of the running Xena, if any """ xenaBaseDir = os.getenv('XENA_BASE_DIR', "~") xenaPortFilename = xenaBaseDir + "/xena.port" return(xenaPortFilename) def port(testIfAvailable=False, findNewPort=False): preferredXenaPort = 7220 xenaPort = None xenaPortFname = portFilename() if os.path.exists(xenaPortFname): fp = open(xenaPortFname) line = fp.readline() xenaPort = int(line.rstrip()) if testIfAvailable and not isRunning(xenaPort): # Xena is not running on the port. Make sure that # the port is not occupied by some other process if not isPortAvailable(xenaPort): #cmd = "lsof -t -i :%s -sTCP:LISTEN" % portID #pid = subprocess.check_output(cmd, shell=True).rstrip() #print "not available, used by",pid xenaPort = None if findNewPort and xenaPort == None: if isPortAvailable(preferredXenaPort): xenaPort = preferredXenaPort else: xenaPort = findUnusedPort() fp = open(portFilename(), "w") fp.write("%d\n" % xenaPort) fp.close() return(xenaPort) def cleanUpPort(): """ Clean up the port file after Xena has stopped running""" os.unlink(portFilename())