comparison python-daemon-2.0.5/test/test_pidfile.py @ 33:7ceb967147c3

start xena with no gui add library files
author jingchunzhu <jingchunzhu@gmail.com>
date Wed, 22 Jul 2015 13:24:44 -0700
parents
children
comparison
equal deleted inserted replaced
32:63b1ba1e3424 33:7ceb967147c3
1 # -*- coding: utf-8 -*-
2 #
3 # test/test_pidfile.py
4 # Part of ‘python-daemon’, an implementation of PEP 3143.
5 #
6 # Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
7 #
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the Apache License, version 2.0 as published by the
10 # Apache Software Foundation.
11 # No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
12
13 """ Unit test for ‘pidfile’ module.
14 """
15
16 from __future__ import (absolute_import, unicode_literals)
17
18 try:
19 # Python 3 standard library.
20 import builtins
21 except ImportError:
22 # Python 2 standard library.
23 import __builtin__ as builtins
24 import os
25 import itertools
26 import tempfile
27 import errno
28 import functools
29 try:
30 # Standard library of Python 2.7 and later.
31 from io import StringIO
32 except ImportError:
33 # Standard library of Python 2.6 and earlier.
34 from StringIO import StringIO
35
36 import mock
37 import lockfile
38
39 from . import scaffold
40
41 import daemon.pidfile
42
43
class FakeFileDescriptorStringIO(StringIO, object):
    """ An in-memory text stream that also reports a fake file descriptor.

    Each instance takes the next integer from a counter shared by the
    whole class and reports it from `fileno`. The instance can be used
    as a context manager; leaving the context does not close the
    stream.

    """

    # Shared across all instances, so each one gets a distinct number.
    _fileno_generator = itertools.count()

    def __init__(self, *args, **kwargs):
        """ Claim a descriptor number, then initialise the stream. """
        self._fileno = next(self._fileno_generator)
        super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs)

    def fileno(self):
        """ Return the fake descriptor number claimed at creation. """
        return self._fileno

    def __enter__(self):
        """ Enter the context; the stream itself is the context value. """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Leave the context without closing or suppressing anything. """
        return None
61
62
try:
    # Probe both names at once; either one missing raises NameError.
    (FileNotFoundError, PermissionError)
except NameError:
    # Python 2 uses IOError; bind the matching errno as first argument.
    FileNotFoundError, PermissionError = (
        functools.partial(IOError, errno.ENOENT),
        functools.partial(IOError, errno.EPERM),
    )
70
71
def make_pidlockfile_scenarios():
    """ Build the named scenarios used for testing `PIDLockFile` instances.

    :return: A mapping from scenario name to a dictionary of scenario
        attributes.

    Every returned scenario has the keys 'pid', 'pidfile_path',
    'pidfile', 'pidfile_pid', 'locking_pid', 'open_func_name', and
    'os_open_func_name'. Attributes a scenario does not specify itself
    are filled in with defaults. Scenarios that specify the same fake
    PID file share one file object.

    """

    current_pid = 235
    other_pid = 8642
    pidfile_path = tempfile.mktemp()

    empty_pidfile = FakeFileDescriptorStringIO()
    current_pid_pidfile = FakeFileDescriptorStringIO(
        "{pid:d}\n".format(pid=current_pid))
    other_pid_pidfile = FakeFileDescriptorStringIO(
        "{pid:d}\n".format(pid=other_pid))
    bogus_pidfile = FakeFileDescriptorStringIO(
        "b0gUs")

    # Templates for the open-function pairs; each scenario gets its own
    # copy because the defaults loop below mutates the scenario dicts.
    nonexist_opens = {
        'open_func_name': 'fake_open_nonexist',
        'os_open_func_name': 'fake_os_open_nonexist',
    }
    read_denied_opens = {
        'open_func_name': 'fake_open_read_denied',
        'os_open_func_name': 'fake_os_open_read_denied',
    }

    scenarios = {
        'simple': {},
        'not-exist': dict(nonexist_opens),
        'not-exist-write-denied': dict(nonexist_opens),
        'not-exist-write-busy': dict(nonexist_opens),
        'exist-read-denied': dict(read_denied_opens),
        'exist-locked-read-denied': {
            'locking_pid': other_pid,
            'open_func_name': 'fake_open_read_denied',
            'os_open_func_name': 'fake_os_open_read_denied',
        },
        'exist-empty': {},
        'exist-invalid': {
            'pidfile': bogus_pidfile,
        },
        'exist-current-pid': {
            'pidfile': current_pid_pidfile,
            'pidfile_pid': current_pid,
        },
        'exist-current-pid-locked': {
            'pidfile': current_pid_pidfile,
            'pidfile_pid': current_pid,
            'locking_pid': current_pid,
        },
        'exist-other-pid': {
            'pidfile': other_pid_pidfile,
            'pidfile_pid': other_pid,
        },
        'exist-other-pid-locked': {
            'pidfile': other_pid_pidfile,
            'pidfile_pid': other_pid,
            'locking_pid': other_pid,
        },
    }

    # Applied in this order to whatever a scenario left unspecified.
    defaults = [
        ('pidfile', empty_pidfile),
        ('pidfile_pid', None),
        ('locking_pid', None),
        ('open_func_name', 'fake_open_okay'),
        ('os_open_func_name', 'fake_os_open_okay'),
    ]

    for attributes in scenarios.values():
        attributes['pid'] = current_pid
        attributes['pidfile_path'] = pidfile_path
        for (key, default) in defaults:
            attributes.setdefault(key, default)

    return scenarios
157
158
def setup_pidfile_fixtures(testcase):
    """ Set up common fixtures for PID file test cases.

    :param testcase: A `TestCase` instance to decorate.

    Decorate the `testcase` with attributes to be fixtures for tests
    involving `PIDLockFile` instances:

    * `pidlockfile_scenarios`: the mapping returned by
      `make_pidlockfile_scenarios`.
    * `fake_pidfile_open_funcs`: a mapping from name to fake open
      function, looked up via the scenario's 'open_func_name' and
      'os_open_func_name' values.

    Also patch `os.getpid`, the builtin `open`, `os.open`, and
    `os.fdopen`. Each patch is started immediately and its `stop` is
    registered with `testcase.addCleanup`.

    The fakes read `testcase.scenario` when they are called, not when
    this function runs, so the test case chooses the behaviour by
    setting that attribute afterwards.

    """
    scenarios = make_pidlockfile_scenarios()
    testcase.pidlockfile_scenarios = scenarios

    def get_scenario_option(testcase, key, default=None):
        """ Return `testcase.scenario[key]`, or `default` if unavailable. """
        try:
            return testcase.scenario[key]
        except (NameError, TypeError, AttributeError, KeyError):
            return default

    # Every scenario has the same 'pid'; report it as this process's PID.
    func_patcher_os_getpid = mock.patch.object(
        os, "getpid",
        return_value=scenarios['simple']['pid'])
    func_patcher_os_getpid.start()
    testcase.addCleanup(func_patcher_os_getpid.stop)

    def make_fake_open_funcs(testcase):
        """ Return a mapping from name to each fake open function. """

        def fake_open_nonexist(filename, mode, buffering):
            if mode.startswith('r'):
                raise FileNotFoundError(
                    "No such file {filename!r}".format(
                        filename=filename))
            return testcase.scenario['pidfile']

        def fake_open_read_denied(filename, mode, buffering):
            if mode.startswith('r'):
                raise PermissionError(
                    "Read denied on {filename!r}".format(
                        filename=filename))
            return testcase.scenario['pidfile']

        def fake_open_okay(filename, mode, buffering):
            return testcase.scenario['pidfile']

        def fake_os_open_nonexist(filename, flags, mode):
            if not (flags & os.O_CREAT):
                raise FileNotFoundError(
                    "No such file {filename!r}".format(
                        filename=filename))
            return testcase.scenario['pidfile'].fileno()

        def fake_os_open_read_denied(filename, flags, mode):
            if not (flags & os.O_CREAT):
                raise PermissionError(
                    "Read denied on {filename!r}".format(
                        filename=filename))
            return testcase.scenario['pidfile'].fileno()

        def fake_os_open_okay(filename, flags, mode):
            return testcase.scenario['pidfile'].fileno()

        # Collect every callable local by name. This picks up the fake
        # functions above, and also the `testcase` argument itself if
        # that object happens to be callable.
        funcs = dict(
            (name, obj) for (name, obj) in vars().items()
            if callable(obj))

        return funcs

    testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase)

    def fake_open(filename, mode='rt', buffering=None):
        """ Fake `open`: scenario behaviour for the PID file path only. """
        scenario_path = get_scenario_option(testcase, 'pidfile_path')
        if filename == scenario_path:
            func_name = testcase.scenario['open_func_name']
            fake_open_func = testcase.fake_pidfile_open_funcs[func_name]
            result = fake_open_func(filename, mode, buffering)
        else:
            result = FakeFileDescriptorStringIO()
        return result

    mock_open = mock.mock_open()
    mock_open.side_effect = fake_open

    func_patcher_builtin_open = mock.patch.object(
        builtins, "open",
        new=mock_open)
    func_patcher_builtin_open.start()
    testcase.addCleanup(func_patcher_builtin_open.stop)

    def fake_os_open(filename, flags, mode=None):
        """ Fake `os.open`: scenario behaviour for the PID file path only. """
        scenario_path = get_scenario_option(testcase, 'pidfile_path')
        if filename == scenario_path:
            func_name = testcase.scenario['os_open_func_name']
            fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name]
            result = fake_os_open_func(filename, flags, mode)
        else:
            result = FakeFileDescriptorStringIO().fileno()
        return result

    mock_os_open = mock.MagicMock(side_effect=fake_os_open)

    func_patcher_os_open = mock.patch.object(
        os, "open",
        new=mock_os_open)
    func_patcher_os_open.start()
    testcase.addCleanup(func_patcher_os_open.stop)

    def fake_os_fdopen(fd, mode='rt', buffering=None):
        """ Fake `os.fdopen`: only the scenario PID file's descriptor opens.

        :raises OSError: With `errno.EBADF` for any other descriptor.

        """
        # Use the looked-up value, not `testcase.scenario['pidfile']`
        # directly. When the test case has no scenario (or no 'pidfile'
        # entry) the fallback is a fresh fake file whose descriptor
        # does not match `fd`, so the caller gets EBADF rather than an
        # AttributeError or KeyError from the fixture itself.
        scenario_pidfile = get_scenario_option(
            testcase, 'pidfile', FakeFileDescriptorStringIO())
        if fd != scenario_pidfile.fileno():
            raise OSError(errno.EBADF, "Bad file descriptor")
        return scenario_pidfile

    mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen)

    func_patcher_os_fdopen = mock.patch.object(
        os, "fdopen",
        new=mock_os_fdopen)
    func_patcher_os_fdopen.start()
    testcase.addCleanup(func_patcher_os_fdopen.stop)
296
297
def make_lockfile_method_fakes(scenario):
    """ Make common fake methods for lockfile class.

    :param scenario: A scenario for testing with PIDLockFile.
    :return: A mapping from normal method name to a `mock.MagicMock`
        whose side effect is the corresponding fake function.

    The fakes read and update the 'locking_pid' entry of `scenario`
    (comparing it with the 'pid' entry) to model who holds the lock,
    and read its 'pidfile_pid' entry for the recorded PID.

    """

    def fake_read_pid():
        return scenario['pidfile_pid']

    def fake_is_locked():
        return scenario['locking_pid'] is not None

    def fake_i_am_locking():
        return scenario['locking_pid'] == scenario['pid']

    def fake_acquire(timeout=None):
        if scenario['locking_pid'] is not None:
            raise lockfile.AlreadyLocked()
        scenario['locking_pid'] = scenario['pid']

    def fake_release():
        holder_pid = scenario['locking_pid']
        if holder_pid is None:
            raise lockfile.NotLocked()
        if holder_pid != scenario['pid']:
            raise lockfile.NotMyLock()
        scenario['locking_pid'] = None

    def fake_break_lock():
        scenario['locking_pid'] = None

    fakes_by_name = [
        ('read_pid', fake_read_pid),
        ('is_locked', fake_is_locked),
        ('i_am_locking', fake_i_am_locking),
        ('acquire', fake_acquire),
        ('release', fake_release),
        ('break_lock', fake_break_lock),
    ]

    return dict(
        (method_name, mock.MagicMock(side_effect=fake))
        for (method_name, fake) in fakes_by_name)
337
338
def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario):
    """ Apply common fake methods to mock lockfile class.

    :param mock_lockfile: An object providing the `LockFile` interface.
    :param testcase: The `TestCase` instance providing the context for
        the patch.
    :param scenario: The `PIDLockFile` test scenario to use.

    Replace the `LockFile` methods of `mock_lockfile` with the fakes
    that `make_lockfile_method_fakes` builds for `scenario`, except
    for `read_pid`, which is left alone. Each replacement is done by a
    patch that is started here and stopped by a cleanup registered on
    `testcase`.

    """
    unpatched_names = ['read_pid']

    all_fakes = make_lockfile_method_fakes(scenario)
    for (method_name, fake_method) in all_fakes.items():
        if method_name in unpatched_names:
            continue
        patcher = mock.patch.object(
            mock_lockfile, method_name,
            new=fake_method)
        patcher.start()
        testcase.addCleanup(patcher.stop)
364
365
def setup_pidlockfile_fixtures(testcase, scenario_name=None):
    """ Set up common fixtures for PIDLockFile test cases.

    :param testcase: The `TestCase` instance to decorate.
    :param scenario_name: The name of the `PIDLockFile` scenario to
        use. Accepted, but not used by this function.

    Apply `setup_pidfile_fixtures` to the `testcase`, then patch the
    `write_pid_to_pidfile` and `remove_existing_pidfile` functions of
    `lockfile.pidlockfile`. Each patch is stopped by a cleanup
    registered on the `testcase`.

    """

    setup_pidfile_fixtures(testcase)

    patched_func_names = (
        'write_pid_to_pidfile',
        'remove_existing_pidfile',
    )
    for patched_name in patched_func_names:
        patcher = mock.patch.object(lockfile.pidlockfile, patched_name)
        patcher.start()
        testcase.addCleanup(patcher.stop)
386
387
class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
    """ Test cases for the `TimeoutPIDLockFile` class. """

    def setUp(self):
        """ Patch the superclass and create the instance under test. """
        super(TimeoutPIDLockFile_TestCase, self).setUp()

        self.pidlockfile_scenario = make_pidlockfile_scenarios()['simple']
        pidfile_path = self.pidlockfile_scenario['pidfile_path']

        # Replace the superclass methods with mocks for every test.
        for method_name in ['__init__', 'acquire']:
            patcher = mock.patch.object(
                lockfile.pidlockfile.PIDLockFile, method_name)
            patcher.start()
            self.addCleanup(patcher.stop)

        self.scenario = {
            'pidfile_path': pidfile_path,
            'acquire_timeout': self.getUniqueInteger(),
        }

        self.test_kwargs = dict(
            path=self.scenario['pidfile_path'],
            acquire_timeout=self.scenario['acquire_timeout'],
        )
        self.test_instance = daemon.pidfile.TimeoutPIDLockFile(
            **self.test_kwargs)

    def test_inherits_from_pidlockfile(self):
        """ The instance should be a `PIDLockFile`. """
        self.assertIsInstance(
            self.test_instance, lockfile.pidlockfile.PIDLockFile)

    def test_init_has_expected_signature(self):
        """ `__init__` should have the expected signature. """
        def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
        test_func.__name__ = str('__init__')
        self.assertFunctionSignatureMatch(
            test_func,
            daemon.pidfile.TimeoutPIDLockFile.__init__)

    def test_has_specified_acquire_timeout(self):
        """ The instance should store the given `acquire_timeout`. """
        self.assertEqual(
            self.test_kwargs['acquire_timeout'],
            self.test_instance.acquire_timeout)

    @mock.patch.object(
        lockfile.pidlockfile.PIDLockFile, "__init__",
        autospec=True)
    def test_calls_superclass_init(self, mock_init):
        """ Creating an instance should call the superclass `__init__`. """
        new_instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs)
        mock_init.assert_called_with(
            new_instance, self.test_kwargs['path'])

    @mock.patch.object(
        lockfile.pidlockfile.PIDLockFile, "acquire",
        autospec=True)
    def test_acquire_uses_specified_timeout(self, mock_func_acquire):
        """ An explicit timeout should be passed to superclass `acquire`. """
        requested_timeout = self.getUniqueInteger()
        self.test_instance.acquire(requested_timeout)
        mock_func_acquire.assert_called_with(
            self.test_instance, requested_timeout)

    @mock.patch.object(
        lockfile.pidlockfile.PIDLockFile, "acquire",
        autospec=True)
    def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire):
        """ With no argument, `acquire` should use the stored timeout. """
        stored_timeout = self.test_kwargs['acquire_timeout']
        self.test_instance.acquire()
        mock_func_acquire.assert_called_with(
            self.test_instance, stored_timeout)
466
467
468 # Local variables:
469 # coding: utf-8
470 # mode: python
471 # End:
472 # vim: fileencoding=utf-8 filetype=python :