diff frogsUtils.py @ 0:da4101033e10 draft default tip

planemo upload
author oinizan
date Wed, 18 Oct 2017 05:30:40 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/frogsUtils.py	Wed Oct 18 05:30:40 2017 -0400
@@ -0,0 +1,299 @@
+#
+# Copyright (C) 2014 INRA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+__author__ = 'Frederic Escudie - Plateforme bioinformatique Toulouse'
+__copyright__ = 'Copyright (C) 2015 INRA'
+__license__ = 'GNU General Public License'
+__version__ = '0.2.0'
+__email__ = 'frogs@inra.fr'
+__status__ = 'prod'
+
+import os
+import sys
+import time
+import subprocess
+from subprocess import Popen, PIPE
+
+
+def which(exec_name):
+    """
+    @summary: Returns the software absolute path.
+    @param exec_name: [str] The software file (example : blastn or classifier.jar).
+    @return: [str] The software absolute path.
+    """
+    path_locations = os.getenv("PATH").split(os.pathsep) + sys.path
+    exec_path = None
+    for current_location in path_locations:
+        if exec_path is None and os.path.isfile(os.path.join(current_location, exec_name)):
+            exec_path = os.path.abspath( os.path.join(current_location, exec_name) )
+    if exec_path is None:
+        raise Exception( "The software '" + exec_name + "' cannot be retrieved in path." )
+    return exec_path
+
+
+def prevent_shell_injections(argparse_namespace, excluded_args=None):
+    """
+    @summary: Raises an exception if one parameter contains a backquote or a semi-colon.
+    @param argparse_namespace: [Namespase] The result of parser.parse_args().
+    @param excluded_args: [list] List of unchecked parameters.
+    """
+    exceptions = list() if excluded_args is None else excluded_args
+    for param_name in argparse_namespace.__dict__.keys():
+        if not param_name in exceptions:
+            param_val = getattr(argparse_namespace, param_name)
+            if issubclass(param_val.__class__, list):
+                new_param_val = list()
+                for val in param_val:
+                    if ';' in val.encode('utf8') or '`' in val.encode('utf8') or '|' in val.encode('utf8'):
+                        raise Exception( "';' and '`' are unauthorized characters." ) 
+            elif param_val is not None and issubclass(param_val.__class__, str):
+                if ';' in param_val.encode('utf8') or '`' in param_val.encode('utf8') or '|' in param_val.encode('utf8'):
+                    raise Exception( "';' and '`' are unauthorized characters." )
+
+
+class Cmd:
+    """
+    @summary : Command wrapper.
+    """
+    def __init__(self, program, description, exec_parameters, version_parameters=None):
+        """
+        @param exec_parameters: [str] The parameters to execute the program. Two possibles syntaxes.
+                                If the parameter contains the string '##PROGRAM##', this tag will be replaced by the program parameter before submit.
+                                Otherwise the parameters will be added after the program in command line.
+        @param version_parameters: [str] The parameters to get the program version. Two possibles syntaxes.
+                                   If the parameter contains the string '##PROGRAM##', this tag will be replaced by the program parameter before submit.
+                                   Otherwise the parameters will be added after the program in command line.
+        """
+        self.program = program
+        self.description = description
+        self.exec_parameters = exec_parameters
+        self.version_parameters = version_parameters
+
+    def get_cmd(self):
+        """
+        @summary : Returns the command line.
+        @return : [str] The command line.
+        """
+        cmd = None
+        if '##PROGRAM##' in self.exec_parameters:
+            cmd = self.exec_parameters.replace('##PROGRAM##', self.program)
+        else:
+            cmd = self.program + ' ' + self.exec_parameters
+        return cmd
+
+    def get_version(self, location='stderr'):
+        """
+        @summary : Returns the program version number.
+        @param location : [str] If the version command returns the version number on 'stdout' or on 'stderr'.
+        @return : [str] version number if this is possible, otherwise this method return 'unknown'.
+        """
+        if self.version_parameters is None:
+            return "unknown"
+        else:
+            try:
+                cmd = self.program + ' ' + self.version_parameters
+                if '##PROGRAM##' in self.exec_parameters:
+                    cmd = self.version_parameters.replace('##PROGRAM##', self.program)
+                p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
+                stdout, stderr = p.communicate()
+                if location == 'stderr':
+                    return stderr.strip()
+                else:
+                    return stdout.strip()
+            except:
+                raise Exception( "Version cannot be retrieve for the software '" + self.program + "'." )
+
+    def parser(self, log_file):
+        """
+        @summary : Parse the command results to add information in log_file.
+        @log_file : [str] Path to the sample process log file.
+        """
+        pass
+
+    def submit(self, log_file=None):
+        """
+        @summary : Launch command, trace this action in log and parse results.
+        @log_file : [str] Path to the sample process log file.
+        """
+        # Log
+        if log_file is not None:
+            FH_log = Logger( log_file )
+            FH_log.write( '# ' + self.description + ' (' + os.path.basename(self.program) + ' version : ' + self.get_version() + ')\n' )
+            FH_log.write( 'Command:\n\t' + self.get_cmd() + '\n\n' )
+            FH_log.write( 'Execution:\n\tstart: ' + time.strftime("%d %b %Y %H:%M:%S", time.localtime()) + '\n' )
+            FH_log.close()
+        # Process
+        subprocess.check_output( self.get_cmd(), shell=True )
+        # Log
+        if log_file is not None:
+            FH_log = Logger( log_file )
+            FH_log.write( '\tend:   ' + time.strftime("%d %b %Y %H:%M:%S", time.localtime()) + '\n\n' )
+            FH_log.close()
+            # Post-process results
+            self.parser(log_file)
+
+
+class Logger:
+    """
+    @summary: Log file handler.
+    """
+
+    def __init__(self, filepath=None):
+        """
+        @param filepath: [str] The log filepath. [default : STDOUT]
+        """
+        self.filepath = filepath
+        if self.filepath is not None and self.filepath is not sys.stdout:
+            self.file_handle = open( self.filepath, "a" )
+        else:
+            self.file_handle = sys.stdout
+
+    def __del__(self):
+        """
+        @summary: Closed file handler when the logger is detroyed.
+        """
+        self.close()
+
+    def close(self):
+        """
+        @summary: Closed file handler.
+        """
+        if self.filepath is not None and self.filepath is not sys.stdout:
+            if self.file_handle is not None:
+                self.file_handle.close()
+                self.file_handle = None
+
+    def write(self, msg):
+        """
+        @summary: Writes msg on file.
+        @param msg: [str] The message to write.
+        """
+        self.file_handle.write( msg )
+
+    @staticmethod
+    def static_write(filepath, msg):
+        """
+        @summary: Writes msg on file.
+        @param filepath: [str] The log filepath. [default : STDOUT]
+        @param msg: [str] The message to write.
+        """
+        if filepath is not None and filepath is not sys.stdout:
+            FH_log = open( filepath, "a" )
+            FH_log.write( msg )
+            FH_log.close()
+        else:
+            sys.stdout.write( msg )
+
+
+class TmpFiles:
+    """
+    @summary: Manager for temporary files.
+    @note:
+        tmpFiles = TmpFiles(out_dir)
+        try:
+            ...
+            tmp_seq = tmpFiles.add( "toto.fasta" )
+            ...
+            tmp_log = tmpFiles.add( "log.txt" )
+            ...
+        finaly:
+            tmpFiles.deleteAll()
+    """
+    def __init__(self, tmp_dir, prefix=None):
+        """
+        @param tmp_dir: [str] The temporary directory path.
+        @param prefix: [str] The prefix added to each temporary file [default: <TIMESTAMP>_<PID>].
+        """
+        if prefix is None:
+            prefix = str(time.time()) + "_" + str(os.getpid())
+        self.files = list()
+        self.dirs = list()
+        self.tmp_dir = tmp_dir
+        self.prefix = prefix
+
+    def add(self, filename, prefix=None, dir=None):
+        """
+        @summary: Add a temporary file.
+        @param filename: The filename without prefix.
+        @param prefix: The prefix added [default: TmpFiles.prefix].
+        @param dir: The directory path [default: TmpFiles.tmp_dir].
+        @return: [str] The filepath.
+        """
+        # Default
+        if prefix is None:
+            prefix = self.prefix
+        if dir is None:
+            dir = self.tmp_dir
+        # Process
+        filepath = os.path.join(dir, prefix + "_" + filename)
+        self.files.append(filepath)
+        return filepath
+
+    def add_dir(self, dirname, prefix=None, dir=None):
+        """
+        @summary: Add a temporary dir.
+        @param filename: The dirname without prefix.
+        @param prefix: The prefix added [default: TmpFiles.prefix].
+        @param dir: The directory path [default: TmpFiles.tmp_dir].
+        @return: [str] The filepath.
+        """
+        # Default
+        if prefix is None:
+            prefix = self.prefix
+        if dir is None:
+            dir = self.tmp_dir
+        # Process
+        dirpath = os.path.join(dir, prefix + "_" + dirname)
+        self.dirs.append(dirpath)
+        return dirpath
+
+    def delete(self, filepath):
+        """
+        @summary: Deletes the specified temporary file.
+        @param filepath: [str] The file path to delete.
+        """
+        self.files.remove(filepath)
+        if os.path.exists(filepath): os.remove(filepath)
+
+    def delete_dir(self, dirpath):
+        """
+        @summary: Deletes the specified temporary dir.
+        @param filepath: [str] The file path to delete.
+        """
+        if dirpath in self.dirs: self.dirs.remove(dirpath)
+            
+        if os.path.exists(dirpath):
+            for root, dirnames,filenames in os.walk(dirpath):
+                for f in filenames:
+                    if f in self.files: self.files.remove(os.path.join(dirpath,f))
+                    if os.path.exists(os.path.join(dirpath,f)): os.remove(os.path.join(dirpath,f))
+                for d in dirnames:
+                    if d in self.dirs: self.dirs.remove(os.path.join(dirpath,d))
+                    if os.path.exists(os.path.join(dirpath,d)): self.delete_dir(os.path.join(dirpath,d))
+            os.rmdir(dirpath)
+
+    def deleteAll(self):
+        """
+        @summary: Deletes all temporary files.
+        """
+        all_tmp_files = [tmp_file for tmp_file in self.files]
+        for tmp_file in all_tmp_files:
+            self.delete(tmp_file)
+        
+        all_tmp_dirs=[tmp_dir for tmp_dir in self.dirs]
+        for tmp_dir in all_tmp_dirs:
+            self.delete_dir(tmp_dir)
\ No newline at end of file