Mercurial > repos > bcclaywell > argo_navis
view venv/lib/python2.7/site-packages/planemo/io.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
line wrap: on
line source
from __future__ import print_function import contextlib import os import shutil import sys import tempfile import time import click from galaxy.tools.deps import commands from galaxy.tools.deps.commands import which def communicate(cmds, **kwds): info(cmds) p = commands.shell_process(cmds, **kwds) ret_val = p.communicate() if p.returncode != 0: template = "Problem executing commands {0} - ({1}, {2})" msg = template.format(cmds, ret_val[0], ret_val[1]) raise RuntimeError(msg) return ret_val def shell(cmds, **kwds): info(cmds) return commands.shell(cmds, **kwds) def info(message, *args): if args: message = message % args _echo(click.style(message, bold=True, fg='green')) def can_write_to_path(path, **kwds): if not kwds["force"] and os.path.exists(path): error("%s already exists, exiting." % path) return False return True def error(message, *args): if args: message = message % args _echo(click.style(message, bold=True, fg='red'), err=True) def warn(message, *args): if args: message = message % args _echo(click.style(message, fg='red'), err=True) def _echo(message, err=False): if sys.version_info[0] == 2: click.echo(message, err=err) else: print(message) def write_file(path, content): with open(path, "w") as f: f.write(content) def untar_to(url, path, tar_args): if which("wget"): download_cmd = "wget -q --recursive -O - '%s'" else: download_cmd = "curl '%s'" download_cmd = download_cmd % url if tar_args: if not os.path.exists(path): os.makedirs(path) untar_cmd = "tar %s" % tar_args shell("%s | %s" % (download_cmd, untar_cmd)) else: shell("%s > '%s'" % (download_cmd, path)) @contextlib.contextmanager def temp_directory(prefix="planemo_tmp_"): temp_dir = tempfile.mkdtemp(prefix=prefix) try: yield temp_dir finally: shutil.rmtree(temp_dir) def kill_pid_file(pid_file): if not os.path.exists(pid_file): return pid = int(open(pid_file, "r").read()) kill_posix(pid) def kill_posix(pid): def _check_pid(): try: os.kill(pid, 0) return True except OSError: return False if _check_pid(): for sig in [15, 9]: try: os.kill(pid, sig) except OSError: return time.sleep(1) if not _check_pid(): return