view frogsUtils.py @ 0:da4101033e10 draft default tip

planemo upload
author oinizan
date Wed, 18 Oct 2017 05:30:40 -0400
parents
children
line wrap: on
line source

#
# Copyright (C) 2014 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

__author__ = 'Frederic Escudie - Plateforme bioinformatique Toulouse'
__copyright__ = 'Copyright (C) 2015 INRA'
__license__ = 'GNU General Public License'
__version__ = '0.2.0'
__email__ = 'frogs@inra.fr'
__status__ = 'prod'

import os
import sys
import time
import subprocess
from subprocess import Popen, PIPE


def which(exec_name):
    """
    @summary: Returns the software absolute path.
    @param exec_name: [str] The software file (example : blastn or classifier.jar).
    @return: [str] The software absolute path.
    """
    path_locations = os.getenv("PATH").split(os.pathsep) + sys.path
    exec_path = None
    for current_location in path_locations:
        if exec_path is None and os.path.isfile(os.path.join(current_location, exec_name)):
            exec_path = os.path.abspath( os.path.join(current_location, exec_name) )
    if exec_path is None:
        raise Exception( "The software '" + exec_name + "' cannot be retrieved in path." )
    return exec_path


def prevent_shell_injections(argparse_namespace, excluded_args=None):
    """
    @summary: Raises an exception if one parameter contains a backquote or a semi-colon.
    @param argparse_namespace: [Namespase] The result of parser.parse_args().
    @param excluded_args: [list] List of unchecked parameters.
    """
    exceptions = list() if excluded_args is None else excluded_args
    for param_name in argparse_namespace.__dict__.keys():
        if not param_name in exceptions:
            param_val = getattr(argparse_namespace, param_name)
            if issubclass(param_val.__class__, list):
                new_param_val = list()
                for val in param_val:
                    if ';' in val.encode('utf8') or '`' in val.encode('utf8') or '|' in val.encode('utf8'):
                        raise Exception( "';' and '`' are unauthorized characters." ) 
            elif param_val is not None and issubclass(param_val.__class__, str):
                if ';' in param_val.encode('utf8') or '`' in param_val.encode('utf8') or '|' in param_val.encode('utf8'):
                    raise Exception( "';' and '`' are unauthorized characters." )


class Cmd:
    """
    @summary : Command wrapper.
    """
    def __init__(self, program, description, exec_parameters, version_parameters=None):
        """
        @param exec_parameters: [str] The parameters to execute the program. Two possibles syntaxes.
                                If the parameter contains the string '##PROGRAM##', this tag will be replaced by the program parameter before submit.
                                Otherwise the parameters will be added after the program in command line.
        @param version_parameters: [str] The parameters to get the program version. Two possibles syntaxes.
                                   If the parameter contains the string '##PROGRAM##', this tag will be replaced by the program parameter before submit.
                                   Otherwise the parameters will be added after the program in command line.
        """
        self.program = program
        self.description = description
        self.exec_parameters = exec_parameters
        self.version_parameters = version_parameters

    def get_cmd(self):
        """
        @summary : Returns the command line.
        @return : [str] The command line.
        """
        cmd = None
        if '##PROGRAM##' in self.exec_parameters:
            cmd = self.exec_parameters.replace('##PROGRAM##', self.program)
        else:
            cmd = self.program + ' ' + self.exec_parameters
        return cmd

    def get_version(self, location='stderr'):
        """
        @summary : Returns the program version number.
        @param location : [str] If the version command returns the version number on 'stdout' or on 'stderr'.
        @return : [str] version number if this is possible, otherwise this method return 'unknown'.
        """
        if self.version_parameters is None:
            return "unknown"
        else:
            try:
                cmd = self.program + ' ' + self.version_parameters
                if '##PROGRAM##' in self.exec_parameters:
                    cmd = self.version_parameters.replace('##PROGRAM##', self.program)
                p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
                stdout, stderr = p.communicate()
                if location == 'stderr':
                    return stderr.strip()
                else:
                    return stdout.strip()
            except:
                raise Exception( "Version cannot be retrieve for the software '" + self.program + "'." )

    def parser(self, log_file):
        """
        @summary : Parse the command results to add information in log_file.
        @log_file : [str] Path to the sample process log file.
        """
        pass

    def submit(self, log_file=None):
        """
        @summary : Launch command, trace this action in log and parse results.
        @log_file : [str] Path to the sample process log file.
        """
        # Log
        if log_file is not None:
            FH_log = Logger( log_file )
            FH_log.write( '# ' + self.description + ' (' + os.path.basename(self.program) + ' version : ' + self.get_version() + ')\n' )
            FH_log.write( 'Command:\n\t' + self.get_cmd() + '\n\n' )
            FH_log.write( 'Execution:\n\tstart: ' + time.strftime("%d %b %Y %H:%M:%S", time.localtime()) + '\n' )
            FH_log.close()
        # Process
        subprocess.check_output( self.get_cmd(), shell=True )
        # Log
        if log_file is not None:
            FH_log = Logger( log_file )
            FH_log.write( '\tend:   ' + time.strftime("%d %b %Y %H:%M:%S", time.localtime()) + '\n\n' )
            FH_log.close()
            # Post-process results
            self.parser(log_file)


class Logger:
    """
    @summary: Log file handler.
    """

    def __init__(self, filepath=None):
        """
        @param filepath: [str] The log filepath. [default : STDOUT]
        """
        self.filepath = filepath
        if self.filepath is not None and self.filepath is not sys.stdout:
            self.file_handle = open( self.filepath, "a" )
        else:
            self.file_handle = sys.stdout

    def __del__(self):
        """
        @summary: Closed file handler when the logger is detroyed.
        """
        self.close()

    def close(self):
        """
        @summary: Closed file handler.
        """
        if self.filepath is not None and self.filepath is not sys.stdout:
            if self.file_handle is not None:
                self.file_handle.close()
                self.file_handle = None

    def write(self, msg):
        """
        @summary: Writes msg on file.
        @param msg: [str] The message to write.
        """
        self.file_handle.write( msg )

    @staticmethod
    def static_write(filepath, msg):
        """
        @summary: Writes msg on file.
        @param filepath: [str] The log filepath. [default : STDOUT]
        @param msg: [str] The message to write.
        """
        if filepath is not None and filepath is not sys.stdout:
            FH_log = open( filepath, "a" )
            FH_log.write( msg )
            FH_log.close()
        else:
            sys.stdout.write( msg )


class TmpFiles:
    """
    @summary: Manager for temporary files.
    @note:
        tmpFiles = TmpFiles(out_dir)
        try:
            ...
            tmp_seq = tmpFiles.add( "toto.fasta" )
            ...
            tmp_log = tmpFiles.add( "log.txt" )
            ...
        finaly:
            tmpFiles.deleteAll()
    """
    def __init__(self, tmp_dir, prefix=None):
        """
        @param tmp_dir: [str] The temporary directory path.
        @param prefix: [str] The prefix added to each temporary file [default: <TIMESTAMP>_<PID>].
        """
        if prefix is None:
            prefix = str(time.time()) + "_" + str(os.getpid())
        self.files = list()
        self.dirs = list()
        self.tmp_dir = tmp_dir
        self.prefix = prefix

    def add(self, filename, prefix=None, dir=None):
        """
        @summary: Add a temporary file.
        @param filename: The filename without prefix.
        @param prefix: The prefix added [default: TmpFiles.prefix].
        @param dir: The directory path [default: TmpFiles.tmp_dir].
        @return: [str] The filepath.
        """
        # Default
        if prefix is None:
            prefix = self.prefix
        if dir is None:
            dir = self.tmp_dir
        # Process
        filepath = os.path.join(dir, prefix + "_" + filename)
        self.files.append(filepath)
        return filepath

    def add_dir(self, dirname, prefix=None, dir=None):
        """
        @summary: Add a temporary dir.
        @param filename: The dirname without prefix.
        @param prefix: The prefix added [default: TmpFiles.prefix].
        @param dir: The directory path [default: TmpFiles.tmp_dir].
        @return: [str] The filepath.
        """
        # Default
        if prefix is None:
            prefix = self.prefix
        if dir is None:
            dir = self.tmp_dir
        # Process
        dirpath = os.path.join(dir, prefix + "_" + dirname)
        self.dirs.append(dirpath)
        return dirpath

    def delete(self, filepath):
        """
        @summary: Deletes the specified temporary file.
        @param filepath: [str] The file path to delete.
        """
        self.files.remove(filepath)
        if os.path.exists(filepath): os.remove(filepath)

    def delete_dir(self, dirpath):
        """
        @summary: Deletes the specified temporary dir.
        @param filepath: [str] The file path to delete.
        """
        if dirpath in self.dirs: self.dirs.remove(dirpath)
            
        if os.path.exists(dirpath):
            for root, dirnames,filenames in os.walk(dirpath):
                for f in filenames:
                    if f in self.files: self.files.remove(os.path.join(dirpath,f))
                    if os.path.exists(os.path.join(dirpath,f)): os.remove(os.path.join(dirpath,f))
                for d in dirnames:
                    if d in self.dirs: self.dirs.remove(os.path.join(dirpath,d))
                    if os.path.exists(os.path.join(dirpath,d)): self.delete_dir(os.path.join(dirpath,d))
            os.rmdir(dirpath)

    def deleteAll(self):
        """
        @summary: Deletes all temporary files.
        """
        all_tmp_files = [tmp_file for tmp_file in self.files]
        for tmp_file in all_tmp_files:
            self.delete(tmp_file)
        
        all_tmp_dirs=[tmp_dir for tmp_dir in self.dirs]
        for tmp_dir in all_tmp_dirs:
            self.delete_dir(tmp_dir)