Mercurial > repos > peterjc > tmhmm_and_signalp
diff tools/protein_analysis/seq_analysis_utils.py @ 32:20da7f48b56f draft
"Check this is up to date with all 2020 changes"
author | peterjc |
---|---|
date | Thu, 17 Jun 2021 08:19:32 +0000 |
parents | 6d9d7cdf00fc |
children | 7a2e20baacee |
line wrap: on
line diff
--- a/tools/protein_analysis/seq_analysis_utils.py Thu Sep 21 11:23:01 2017 -0400 +++ b/tools/protein_analysis/seq_analysis_utils.py Thu Jun 17 08:19:32 2021 +0000 @@ -16,23 +16,26 @@ from time import sleep -__version__ = "0.0.2" +if sys.version_info[0] < 3: + range = xrange # noqa: F821 + +__version__ = "0.0.4" try: from multiprocessing import cpu_count except ImportError: # Must be under Python 2.5, this is copied from multiprocessing: def cpu_count(): - """Returns the number of CPUs in the system.""" - if sys.platform == 'win32': + """Return the number of CPUs in the system.""" + if sys.platform == "win32": try: - num = int(os.environ['NUMBER_OF_PROCESSORS']) + num = int(os.environ["NUMBER_OF_PROCESSORS"]) except (ValueError, KeyError): num = 0 - elif 'bsd' in sys.platform or sys.platform == 'darwin': - comm = '/sbin/sysctl -n hw.ncpu' - if sys.platform == 'darwin': - comm = '/usr' + comm + elif "bsd" in sys.platform or sys.platform == "darwin": + comm = "/sbin/sysctl -n hw.ncpu" + if sys.platform == "darwin": + comm = "/usr" + comm try: with os.popen(comm) as p: num = int(p.read()) @@ -40,14 +43,14 @@ num = 0 else: try: - num = os.sysconf('SC_NPROCESSORS_ONLN') + num = os.sysconf("SC_NPROCESSORS_ONLN") except (ValueError, OSError, AttributeError): num = 0 if num >= 1: return num else: - raise NotImplementedError('cannot determine number of cpus') + raise NotImplementedError("cannot determine number of cpus") def thread_count(command_line_arg, default=1): @@ -70,7 +73,7 @@ def fasta_iterator(filename, max_len=None, truncate=None): - """Simple FASTA parser yielding tuples of (title, sequence) strings.""" + """Parse FASTA file yielding tuples of (name, sequence).""" handle = open(filename) title, seq = "", "" for line in handle: @@ -79,8 +82,10 @@ if truncate: seq = seq[:truncate] if max_len and len(seq) > max_len: - raise ValueError("Sequence %s is length %i, max length %i" - % (title.split()[0], len(seq), max_len)) + raise ValueError( + "Sequence %s is length %i, max length %i" + % (title.split()[0], len(seq), max_len) + ) yield title, seq title = line[1:].rstrip() seq = "" @@ -98,13 +103,22 @@ if truncate: seq = seq[:truncate] if max_len and len(seq) > max_len: - raise ValueError("Sequence %s is length %i, max length %i" - % (title.split()[0], len(seq), max_len)) + raise ValueError( + "Sequence %s is length %i, max length %i" + % (title.split()[0], len(seq), max_len) + ) yield title, seq raise StopIteration -def split_fasta(input_filename, output_filename_base, n=500, truncate=None, keep_descr=False, max_len=None): +def split_fasta( + input_filename, + output_filename_base, + n=500, + truncate=None, + keep_descr=False, + max_len=None, +): """Split FASTA file into sub-files each of at most n sequences. Returns a list of the filenames used (based on the input filename). @@ -122,7 +136,7 @@ records = [] for i in range(n): try: - records.append(iterator.next()) + records.append(next(iterator)) except StopIteration: break if not records: @@ -133,12 +147,12 @@ for title, seq in records: handle.write(">%s\n" % title) for i in range(0, len(seq), 60): - handle.write(seq[i:i + 60] + "\n") + handle.write(seq[i : i + 60] + "\n") else: for title, seq in records: handle.write(">%s\n" % title.split()[0]) for i in range(0, len(seq), 60): - handle.write(seq[i:i + 60] + "\n") + handle.write(seq[i : i + 60] + "\n") handle.close() files.append(new_filename) # print "%i records in %s" % (len(records), new_filename) @@ -158,7 +172,7 @@ def run_jobs(jobs, threads, pause=10, verbose=False, fast_fail=True): - """Takes list of cmd strings, returns dict with error levels.""" + """Take list of cmd strings, return dict with error levels.""" pending = jobs[:] running = [] results = {} @@ -177,11 +191,12 @@ results[cmd] = return_code if return_code: failed = True - running = [(cmd, process) for (cmd, process) in running - if cmd not in results] + running = [(cmd, process) for (cmd, process) in running if cmd not in results] if verbose: - print("%i jobs pending, %i running, %i completed" % - (len(pending), len(running), len(results))) + print( + "%i jobs pending, %i running, %i completed" + % (len(pending), len(running), len(results)) + ) # See if we can start any new threads if pending and failed and fast_fail: # Don't start any more jobs