diff python-daemon-2.0.5/test/test_pidfile.py @ 33:7ceb967147c3

start xena with no gui add library files
author jingchunzhu <jingchunzhu@gmail.com>
date Wed, 22 Jul 2015 13:24:44 -0700
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/python-daemon-2.0.5/test/test_pidfile.py	Wed Jul 22 13:24:44 2015 -0700
@@ -0,0 +1,472 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_pidfile.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Unit test for ‘pidfile’ module.
+    """
+
+from __future__ import (absolute_import, unicode_literals)
+
+try:
+    # Python 3 standard library.
+    import builtins
+except ImportError:
+    # Python 2 standard library.
+    import __builtin__ as builtins
+import os
+import itertools
+import tempfile
+import errno
+import functools
+try:
+    # Standard library of Python 2.7 and later.
+    from io import StringIO
+except ImportError:
+    # Standard library of Python 2.6 and earlier.
+    from StringIO import StringIO
+
+import mock
+import lockfile
+
+from . import scaffold
+
+import daemon.pidfile
+
+
+class FakeFileDescriptorStringIO(StringIO, object):
+    """ A StringIO class that fakes a file descriptor. """
+
+    _fileno_generator = itertools.count()
+
+    def __init__(self, *args, **kwargs):
+        self._fileno = next(self._fileno_generator)
+        super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs)
+
+    def fileno(self):
+        return self._fileno
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+
+try:
+    FileNotFoundError
+    PermissionError
+except NameError:
+    # Python 2 uses IOError.
+    FileNotFoundError = functools.partial(IOError, errno.ENOENT)
+    PermissionError = functools.partial(IOError, errno.EPERM)
+
+
+def make_pidlockfile_scenarios():
+    """ Make a collection of scenarios for testing `PIDLockFile` instances.
+
+        :return: A collection of scenarios for tests involving
+            `PIDLockfFile` instances.
+
+        The collection is a mapping from scenario name to a dictionary of
+        scenario attributes.
+
+        """
+
+    fake_current_pid = 235
+    fake_other_pid = 8642
+    fake_pidfile_path = tempfile.mktemp()
+
+    fake_pidfile_empty = FakeFileDescriptorStringIO()
+    fake_pidfile_current_pid = FakeFileDescriptorStringIO(
+            "{pid:d}\n".format(pid=fake_current_pid))
+    fake_pidfile_other_pid = FakeFileDescriptorStringIO(
+            "{pid:d}\n".format(pid=fake_other_pid))
+    fake_pidfile_bogus = FakeFileDescriptorStringIO(
+            "b0gUs")
+
+    scenarios = {
+            'simple': {},
+            'not-exist': {
+                'open_func_name': 'fake_open_nonexist',
+                'os_open_func_name': 'fake_os_open_nonexist',
+                },
+            'not-exist-write-denied': {
+                'open_func_name': 'fake_open_nonexist',
+                'os_open_func_name': 'fake_os_open_nonexist',
+                },
+            'not-exist-write-busy': {
+                'open_func_name': 'fake_open_nonexist',
+                'os_open_func_name': 'fake_os_open_nonexist',
+                },
+            'exist-read-denied': {
+                'open_func_name': 'fake_open_read_denied',
+                'os_open_func_name': 'fake_os_open_read_denied',
+                },
+            'exist-locked-read-denied': {
+                'locking_pid': fake_other_pid,
+                'open_func_name': 'fake_open_read_denied',
+                'os_open_func_name': 'fake_os_open_read_denied',
+                },
+            'exist-empty': {},
+            'exist-invalid': {
+                'pidfile': fake_pidfile_bogus,
+                },
+            'exist-current-pid': {
+                'pidfile': fake_pidfile_current_pid,
+                'pidfile_pid': fake_current_pid,
+                },
+            'exist-current-pid-locked': {
+                'pidfile': fake_pidfile_current_pid,
+                'pidfile_pid': fake_current_pid,
+                'locking_pid': fake_current_pid,
+                },
+            'exist-other-pid': {
+                'pidfile': fake_pidfile_other_pid,
+                'pidfile_pid': fake_other_pid,
+                },
+            'exist-other-pid-locked': {
+                'pidfile': fake_pidfile_other_pid,
+                'pidfile_pid': fake_other_pid,
+                'locking_pid': fake_other_pid,
+                },
+            }
+
+    for scenario in scenarios.values():
+        scenario['pid'] = fake_current_pid
+        scenario['pidfile_path'] = fake_pidfile_path
+        if 'pidfile' not in scenario:
+            scenario['pidfile'] = fake_pidfile_empty
+        if 'pidfile_pid' not in scenario:
+            scenario['pidfile_pid'] = None
+        if 'locking_pid' not in scenario:
+            scenario['locking_pid'] = None
+        if 'open_func_name' not in scenario:
+            scenario['open_func_name'] = 'fake_open_okay'
+        if 'os_open_func_name' not in scenario:
+            scenario['os_open_func_name'] = 'fake_os_open_okay'
+
+    return scenarios
+
+
+def setup_pidfile_fixtures(testcase):
+    """ Set up common fixtures for PID file test cases.
+
+        :param testcase: A `TestCase` instance to decorate.
+
+        Decorate the `testcase` with attributes to be fixtures for tests
+        involving `PIDLockFile` instances.
+
+        """
+    scenarios = make_pidlockfile_scenarios()
+    testcase.pidlockfile_scenarios = scenarios
+
+    def get_scenario_option(testcase, key, default=None):
+        value = default
+        try:
+            value = testcase.scenario[key]
+        except (NameError, TypeError, AttributeError, KeyError):
+            pass
+        return value
+
+    func_patcher_os_getpid = mock.patch.object(
+            os, "getpid",
+            return_value=scenarios['simple']['pid'])
+    func_patcher_os_getpid.start()
+    testcase.addCleanup(func_patcher_os_getpid.stop)
+
+    def make_fake_open_funcs(testcase):
+
+        def fake_open_nonexist(filename, mode, buffering):
+            if mode.startswith('r'):
+                error = FileNotFoundError(
+                        "No such file {filename!r}".format(
+                            filename=filename))
+                raise error
+            else:
+                result = testcase.scenario['pidfile']
+            return result
+
+        def fake_open_read_denied(filename, mode, buffering):
+            if mode.startswith('r'):
+                error = PermissionError(
+                        "Read denied on {filename!r}".format(
+                            filename=filename))
+                raise error
+            else:
+                result = testcase.scenario['pidfile']
+            return result
+
+        def fake_open_okay(filename, mode, buffering):
+            result = testcase.scenario['pidfile']
+            return result
+
+        def fake_os_open_nonexist(filename, flags, mode):
+            if (flags & os.O_CREAT):
+                result = testcase.scenario['pidfile'].fileno()
+            else:
+                error = FileNotFoundError(
+                        "No such file {filename!r}".format(
+                            filename=filename))
+                raise error
+            return result
+
+        def fake_os_open_read_denied(filename, flags, mode):
+            if (flags & os.O_CREAT):
+                result = testcase.scenario['pidfile'].fileno()
+            else:
+                error = PermissionError(
+                        "Read denied on {filename!r}".format(
+                            filename=filename))
+                raise error
+            return result
+
+        def fake_os_open_okay(filename, flags, mode):
+            result = testcase.scenario['pidfile'].fileno()
+            return result
+
+        funcs = dict(
+                (name, obj) for (name, obj) in vars().items()
+                if callable(obj))
+
+        return funcs
+
+    testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase)
+
+    def fake_open(filename, mode='rt', buffering=None):
+        scenario_path = get_scenario_option(testcase, 'pidfile_path')
+        if filename == scenario_path:
+            func_name = testcase.scenario['open_func_name']
+            fake_open_func = testcase.fake_pidfile_open_funcs[func_name]
+            result = fake_open_func(filename, mode, buffering)
+        else:
+            result = FakeFileDescriptorStringIO()
+        return result
+
+    mock_open = mock.mock_open()
+    mock_open.side_effect = fake_open
+
+    func_patcher_builtin_open = mock.patch.object(
+            builtins, "open",
+            new=mock_open)
+    func_patcher_builtin_open.start()
+    testcase.addCleanup(func_patcher_builtin_open.stop)
+
+    def fake_os_open(filename, flags, mode=None):
+        scenario_path = get_scenario_option(testcase, 'pidfile_path')
+        if filename == scenario_path:
+            func_name = testcase.scenario['os_open_func_name']
+            fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name]
+            result = fake_os_open_func(filename, flags, mode)
+        else:
+            result = FakeFileDescriptorStringIO().fileno()
+        return result
+
+    mock_os_open = mock.MagicMock(side_effect=fake_os_open)
+
+    func_patcher_os_open = mock.patch.object(
+            os, "open",
+            new=mock_os_open)
+    func_patcher_os_open.start()
+    testcase.addCleanup(func_patcher_os_open.stop)
+
+    def fake_os_fdopen(fd, mode='rt', buffering=None):
+        scenario_pidfile = get_scenario_option(
+                testcase, 'pidfile', FakeFileDescriptorStringIO())
+        if fd == testcase.scenario['pidfile'].fileno():
+            result = testcase.scenario['pidfile']
+        else:
+            raise OSError(errno.EBADF, "Bad file descriptor")
+        return result
+
+    mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen)
+
+    func_patcher_os_fdopen = mock.patch.object(
+            os, "fdopen",
+            new=mock_os_fdopen)
+    func_patcher_os_fdopen.start()
+    testcase.addCleanup(func_patcher_os_fdopen.stop)
+
+
+def make_lockfile_method_fakes(scenario):
+    """ Make common fake methods for lockfile class.
+
+        :param scenario: A scenario for testing with PIDLockFile.
+        :return: A mapping from normal function name to the corresponding
+            fake function.
+
+        Each fake function behaves appropriately for the specified `scenario`.
+
+        """
+
+    def fake_func_read_pid():
+        return scenario['pidfile_pid']
+    def fake_func_is_locked():
+        return (scenario['locking_pid'] is not None)
+    def fake_func_i_am_locking():
+        return (
+                scenario['locking_pid'] == scenario['pid'])
+    def fake_func_acquire(timeout=None):
+        if scenario['locking_pid'] is not None:
+            raise lockfile.AlreadyLocked()
+        scenario['locking_pid'] = scenario['pid']
+    def fake_func_release():
+        if scenario['locking_pid'] is None:
+            raise lockfile.NotLocked()
+        if scenario['locking_pid'] != scenario['pid']:
+            raise lockfile.NotMyLock()
+        scenario['locking_pid'] = None
+    def fake_func_break_lock():
+        scenario['locking_pid'] = None
+
+    fake_methods = dict(
+            (
+                func_name.replace('fake_func_', ''),
+                mock.MagicMock(side_effect=fake_func))
+            for (func_name, fake_func) in vars().items()
+                if func_name.startswith('fake_func_'))
+
+    return fake_methods
+
+
+def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario):
+    """ Apply common fake methods to mock lockfile class.
+
+        :param mock_lockfile: An object providing the `LockFile` interface.
+        :param testcase: The `TestCase` instance providing the context for
+            the patch.
+        :param scenario: The `PIDLockFile` test scenario to use.
+
+        Mock the `LockFile` methods of `mock_lockfile`, by applying fake
+        methods customised for `scenario`. The mock is does by a patch
+        within the context of `testcase`.
+
+        """
+    fake_methods = dict(
+            (func_name, fake_func)
+            for (func_name, fake_func) in
+                make_lockfile_method_fakes(scenario).items()
+            if func_name not in ['read_pid'])
+
+    for (func_name, fake_func) in fake_methods.items():
+        func_patcher = mock.patch.object(
+                mock_lockfile, func_name,
+                new=fake_func)
+        func_patcher.start()
+        testcase.addCleanup(func_patcher.stop)
+
+
+def setup_pidlockfile_fixtures(testcase, scenario_name=None):
+    """ Set up common fixtures for PIDLockFile test cases.
+
+        :param testcase: The `TestCase` instance to decorate.
+        :param scenario_name: The name of the `PIDLockFile` scenario to use.
+
+        Decorate the `testcase` with attributes that are fixtures for test
+        cases involving `PIDLockFile` instances.`
+
+        """
+
+    setup_pidfile_fixtures(testcase)
+
+    for func_name in [
+            'write_pid_to_pidfile',
+            'remove_existing_pidfile',
+            ]:
+        func_patcher = mock.patch.object(lockfile.pidlockfile, func_name)
+        func_patcher.start()
+        testcase.addCleanup(func_patcher.stop)
+
+
+class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
+    """ Test cases for ‘TimeoutPIDLockFile’ class. """
+
+    def setUp(self):
+        """ Set up test fixtures. """
+        super(TimeoutPIDLockFile_TestCase, self).setUp()
+
+        pidlockfile_scenarios = make_pidlockfile_scenarios()
+        self.pidlockfile_scenario = pidlockfile_scenarios['simple']
+        pidfile_path = self.pidlockfile_scenario['pidfile_path']
+
+        for func_name in ['__init__', 'acquire']:
+            func_patcher = mock.patch.object(
+                    lockfile.pidlockfile.PIDLockFile, func_name)
+            func_patcher.start()
+            self.addCleanup(func_patcher.stop)
+
+        self.scenario = {
+                'pidfile_path': self.pidlockfile_scenario['pidfile_path'],
+                'acquire_timeout': self.getUniqueInteger(),
+                }
+
+        self.test_kwargs = dict(
+                path=self.scenario['pidfile_path'],
+                acquire_timeout=self.scenario['acquire_timeout'],
+                )
+        self.test_instance = daemon.pidfile.TimeoutPIDLockFile(
+                **self.test_kwargs)
+
+    def test_inherits_from_pidlockfile(self):
+        """ Should inherit from PIDLockFile. """
+        instance = self.test_instance
+        self.assertIsInstance(instance, lockfile.pidlockfile.PIDLockFile)
+
+    def test_init_has_expected_signature(self):
+        """ Should have expected signature for ‘__init__’. """
+        def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
+        test_func.__name__ = str('__init__')
+        self.assertFunctionSignatureMatch(
+                test_func,
+                daemon.pidfile.TimeoutPIDLockFile.__init__)
+
+    def test_has_specified_acquire_timeout(self):
+        """ Should have specified ‘acquire_timeout’ value. """
+        instance = self.test_instance
+        expected_timeout = self.test_kwargs['acquire_timeout']
+        self.assertEqual(expected_timeout, instance.acquire_timeout)
+
+    @mock.patch.object(
+            lockfile.pidlockfile.PIDLockFile, "__init__",
+            autospec=True)
+    def test_calls_superclass_init(self, mock_init):
+        """ Should call the superclass ‘__init__’. """
+        expected_path = self.test_kwargs['path']
+        instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs)
+        mock_init.assert_called_with(instance, expected_path)
+
+    @mock.patch.object(
+            lockfile.pidlockfile.PIDLockFile, "acquire",
+            autospec=True)
+    def test_acquire_uses_specified_timeout(self, mock_func_acquire):
+        """ Should call the superclass ‘acquire’ with specified timeout. """
+        instance = self.test_instance
+        test_timeout = self.getUniqueInteger()
+        expected_timeout = test_timeout
+        instance.acquire(test_timeout)
+        mock_func_acquire.assert_called_with(instance, expected_timeout)
+
+    @mock.patch.object(
+            lockfile.pidlockfile.PIDLockFile, "acquire",
+            autospec=True)
+    def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire):
+        """ Should call superclass ‘acquire’ with stored timeout by default. """
+        instance = self.test_instance
+        test_timeout = self.test_kwargs['acquire_timeout']
+        expected_timeout = test_timeout
+        instance.acquire()
+        mock_func_acquire.assert_called_with(instance, expected_timeout)
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :