33
|
1 # -*- coding: utf-8 -*-
|
|
2 #
|
|
3 # test/test_pidfile.py
|
|
4 # Part of ‘python-daemon’, an implementation of PEP 3143.
|
|
5 #
|
|
6 # Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
|
|
7 #
|
|
8 # This is free software: you may copy, modify, and/or distribute this work
|
|
9 # under the terms of the Apache License, version 2.0 as published by the
|
|
10 # Apache Software Foundation.
|
|
11 # No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
|
|
12
|
|
13 """ Unit test for ‘pidfile’ module.
|
|
14 """
|
|
15
|
|
16 from __future__ import (absolute_import, unicode_literals)
|
|
17
|
|
18 try:
|
|
19 # Python 3 standard library.
|
|
20 import builtins
|
|
21 except ImportError:
|
|
22 # Python 2 standard library.
|
|
23 import __builtin__ as builtins
|
|
24 import os
|
|
25 import itertools
|
|
26 import tempfile
|
|
27 import errno
|
|
28 import functools
|
|
29 try:
|
|
30 # Standard library of Python 2.7 and later.
|
|
31 from io import StringIO
|
|
32 except ImportError:
|
|
33 # Standard library of Python 2.6 and earlier.
|
|
34 from StringIO import StringIO
|
|
35
|
|
36 import mock
|
|
37 import lockfile
|
|
38
|
|
39 from . import scaffold
|
|
40
|
|
41 import daemon.pidfile
|
|
42
|
|
43
|
|
class FakeFileDescriptorStringIO(StringIO, object):
    """ An in-memory text stream that also reports a file descriptor.

        Every instance takes the next number from a counter held on the
        class, and reports that number from `fileno`. The instance can
        be used as a context manager; leaving the context does nothing
        to the stream.

        """

    # Class-level counter; the source of each instance's fake descriptor.
    _fileno_generator = itertools.count()

    def __init__(self, *args, **kwargs):
        """ Claim a fake descriptor, then initialise the stream. """
        self._fileno = next(self._fileno_generator)
        super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs)

    def fileno(self):
        """ Return the fake descriptor number claimed at creation. """
        return self._fileno

    def __enter__(self):
        """ Enter the context; the context object is this stream. """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Leave the context without touching the stream. """
        return None
|
|
61
|
|
62
|
|
try:
    # Looking up both names raises NameError when either is not defined.
    (FileNotFoundError, PermissionError)
except NameError:
    # Python 2 uses IOError; bind the matching errno as first argument.
    FileNotFoundError = functools.partial(IOError, errno.ENOENT)
    PermissionError = functools.partial(IOError, errno.EPERM)
|
|
70
|
|
71
|
|
def make_pidlockfile_scenarios():
    """ Make a collection of scenarios for testing `PIDLockFile` instances.

        :return: A collection of scenarios for tests involving
            `PIDLockFile` instances.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Every scenario ends up with the keys `pid`,
        `pidfile_path`, `pidfile`, `pidfile_pid`, `locking_pid`,
        `open_func_name`, and `os_open_func_name`; any of the last five
        that a scenario does not set itself is filled with a default.

        """

    fake_current_pid = 235
    fake_other_pid = 8642
    fake_pidfile_path = tempfile.mktemp()

    def make_fake_pidfile_for_pid(pid):
        # A fake PID file whose content is the decimal PID and a newline.
        return FakeFileDescriptorStringIO("{pid:d}\n".format(pid=pid))

    fake_pidfile_empty = FakeFileDescriptorStringIO()
    fake_pidfile_current_pid = make_fake_pidfile_for_pid(fake_current_pid)
    fake_pidfile_other_pid = make_fake_pidfile_for_pid(fake_other_pid)
    fake_pidfile_bogus = FakeFileDescriptorStringIO("b0gUs")

    scenarios = {
        'simple': {},
        'not-exist': {
            'open_func_name': 'fake_open_nonexist',
            'os_open_func_name': 'fake_os_open_nonexist',
        },
        'not-exist-write-denied': {
            'open_func_name': 'fake_open_nonexist',
            'os_open_func_name': 'fake_os_open_nonexist',
        },
        'not-exist-write-busy': {
            'open_func_name': 'fake_open_nonexist',
            'os_open_func_name': 'fake_os_open_nonexist',
        },
        'exist-read-denied': {
            'open_func_name': 'fake_open_read_denied',
            'os_open_func_name': 'fake_os_open_read_denied',
        },
        'exist-locked-read-denied': {
            'locking_pid': fake_other_pid,
            'open_func_name': 'fake_open_read_denied',
            'os_open_func_name': 'fake_os_open_read_denied',
        },
        'exist-empty': {},
        'exist-invalid': {
            'pidfile': fake_pidfile_bogus,
        },
        'exist-current-pid': {
            'pidfile': fake_pidfile_current_pid,
            'pidfile_pid': fake_current_pid,
        },
        'exist-current-pid-locked': {
            'pidfile': fake_pidfile_current_pid,
            'pidfile_pid': fake_current_pid,
            'locking_pid': fake_current_pid,
        },
        'exist-other-pid': {
            'pidfile': fake_pidfile_other_pid,
            'pidfile_pid': fake_other_pid,
        },
        'exist-other-pid-locked': {
            'pidfile': fake_pidfile_other_pid,
            'pidfile_pid': fake_other_pid,
            'locking_pid': fake_other_pid,
        },
    }

    # Values used only where a scenario has not set the key itself.
    fallback_items = [
        ('pidfile', fake_pidfile_empty),
        ('pidfile_pid', None),
        ('locking_pid', None),
        ('open_func_name', 'fake_open_okay'),
        ('os_open_func_name', 'fake_os_open_okay'),
    ]

    for attributes in scenarios.values():
        # These two are the same for every scenario.
        attributes['pid'] = fake_current_pid
        attributes['pidfile_path'] = fake_pidfile_path
        for (key, fallback) in fallback_items:
            attributes.setdefault(key, fallback)

    return scenarios
|
|
157
|
|
158
|
|
def setup_pidfile_fixtures(testcase):
    """ Set up common fixtures for PID file test cases.

        :param testcase: A `TestCase` instance to decorate.

        Decorate the `testcase` with attributes to be fixtures for tests
        involving `PIDLockFile` instances:

        * `pidlockfile_scenarios`: the mapping made by
          `make_pidlockfile_scenarios`.
        * `fake_pidfile_open_funcs`: a mapping from name to a fake
          ‘open’ behaviour; a scenario selects one by its
          `open_func_name` and `os_open_func_name` values.

        Also patch `os.getpid`, the builtin `open`, `os.open`, and
        `os.fdopen`. Each patch is started immediately and its `stop`
        is registered with `testcase.addCleanup`.

        The fakes read `testcase.scenario` when they are called, so the
        test case selects the active scenario by setting that attribute.

        """
    scenarios = make_pidlockfile_scenarios()
    testcase.pidlockfile_scenarios = scenarios

    def get_scenario_option(testcase, key, default=None):
        # Tolerate a test case with no `scenario` attribute, a scenario
        # that is not a mapping, or a scenario that lacks `key`.
        value = default
        try:
            value = testcase.scenario[key]
        except (NameError, TypeError, AttributeError, KeyError):
            pass
        return value

    func_patcher_os_getpid = mock.patch.object(
        os, "getpid",
        return_value=scenarios['simple']['pid'])
    func_patcher_os_getpid.start()
    testcase.addCleanup(func_patcher_os_getpid.stop)

    def make_fake_open_funcs(testcase):

        def fake_open_nonexist(filename, mode, buffering):
            # Reading fails as for a missing file; other modes succeed.
            if mode.startswith('r'):
                error = FileNotFoundError(
                    "No such file {filename!r}".format(
                        filename=filename))
                raise error
            else:
                result = testcase.scenario['pidfile']
            return result

        def fake_open_read_denied(filename, mode, buffering):
            # Reading fails as for denied access; other modes succeed.
            if mode.startswith('r'):
                error = PermissionError(
                    "Read denied on {filename!r}".format(
                        filename=filename))
                raise error
            else:
                result = testcase.scenario['pidfile']
            return result

        def fake_open_okay(filename, mode, buffering):
            result = testcase.scenario['pidfile']
            return result

        def fake_os_open_nonexist(filename, flags, mode):
            # Only a creating open succeeds on a missing file.
            if (flags & os.O_CREAT):
                result = testcase.scenario['pidfile'].fileno()
            else:
                error = FileNotFoundError(
                    "No such file {filename!r}".format(
                        filename=filename))
                raise error
            return result

        def fake_os_open_read_denied(filename, flags, mode):
            # Only a creating open succeeds when reading is denied.
            if (flags & os.O_CREAT):
                result = testcase.scenario['pidfile'].fileno()
            else:
                error = PermissionError(
                    "Read denied on {filename!r}".format(
                        filename=filename))
                raise error
            return result

        def fake_os_open_okay(filename, flags, mode):
            result = testcase.scenario['pidfile'].fileno()
            return result

        # Collect every callable local by name. This gathers the fakes
        # defined above, and also `testcase` itself if it is callable.
        funcs = dict(
            (name, obj) for (name, obj) in vars().items()
            if callable(obj))

        return funcs

    testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase)

    def fake_open(filename, mode='rt', buffering=None):
        # Only the scenario's PID file path gets scenario behaviour; any
        # other path yields a fresh empty fake file.
        scenario_path = get_scenario_option(testcase, 'pidfile_path')
        if filename == scenario_path:
            func_name = testcase.scenario['open_func_name']
            fake_open_func = testcase.fake_pidfile_open_funcs[func_name]
            result = fake_open_func(filename, mode, buffering)
        else:
            result = FakeFileDescriptorStringIO()
        return result

    mock_open = mock.mock_open()
    mock_open.side_effect = fake_open

    func_patcher_builtin_open = mock.patch.object(
        builtins, "open",
        new=mock_open)
    func_patcher_builtin_open.start()
    testcase.addCleanup(func_patcher_builtin_open.stop)

    def fake_os_open(filename, flags, mode=None):
        # Same path dispatch as `fake_open`, but the result is a fake
        # file descriptor number rather than a file object.
        scenario_path = get_scenario_option(testcase, 'pidfile_path')
        if filename == scenario_path:
            func_name = testcase.scenario['os_open_func_name']
            fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name]
            result = fake_os_open_func(filename, flags, mode)
        else:
            result = FakeFileDescriptorStringIO().fileno()
        return result

    mock_os_open = mock.MagicMock(side_effect=fake_os_open)

    func_patcher_os_open = mock.patch.object(
        os, "open",
        new=mock_os_open)
    func_patcher_os_open.start()
    testcase.addCleanup(func_patcher_os_open.stop)

    def fake_os_fdopen(fd, mode='rt', buffering=None):
        # Use the tolerant lookup result for both the comparison and the
        # return value. With no scenario PID file available, the fallback
        # fake file is compared instead, so an unrecognised descriptor is
        # reported as EBADF rather than as a failed scenario lookup.
        scenario_pidfile = get_scenario_option(
            testcase, 'pidfile', FakeFileDescriptorStringIO())
        if fd == scenario_pidfile.fileno():
            result = scenario_pidfile
        else:
            raise OSError(errno.EBADF, "Bad file descriptor")
        return result

    mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen)

    func_patcher_os_fdopen = mock.patch.object(
        os, "fdopen",
        new=mock_os_fdopen)
    func_patcher_os_fdopen.start()
    testcase.addCleanup(func_patcher_os_fdopen.stop)
|
|
296
|
|
297
|
|
def make_lockfile_method_fakes(scenario):
    """ Make common fake methods for lockfile class.

        :param scenario: A scenario for testing with PIDLockFile.
        :return: A mapping from normal function name to the corresponding
            fake function.

        Each fake function behaves appropriately for the specified
        `scenario`: it reads, and where relevant updates, the scenario's
        `pid`, `pidfile_pid`, and `locking_pid` values. Every fake is
        wrapped in a `MagicMock` so that calls to it are recorded.

        """

    def read_pid():
        return scenario['pidfile_pid']

    def is_locked():
        return (scenario['locking_pid'] is not None)

    def i_am_locking():
        return (scenario['locking_pid'] == scenario['pid'])

    def acquire(timeout=None):
        if scenario['locking_pid'] is not None:
            raise lockfile.AlreadyLocked()
        scenario['locking_pid'] = scenario['pid']

    def release():
        if scenario['locking_pid'] is None:
            raise lockfile.NotLocked()
        if scenario['locking_pid'] != scenario['pid']:
            raise lockfile.NotMyLock()
        scenario['locking_pid'] = None

    def break_lock():
        scenario['locking_pid'] = None

    # Explicit table of method name to the behaviour that fakes it.
    named_behaviours = [
        ('read_pid', read_pid),
        ('is_locked', is_locked),
        ('i_am_locking', i_am_locking),
        ('acquire', acquire),
        ('release', release),
        ('break_lock', break_lock),
    ]

    fake_methods = {}
    for (method_name, behaviour) in named_behaviours:
        fake_methods[method_name] = mock.MagicMock(side_effect=behaviour)

    return fake_methods
|
|
337
|
|
338
|
|
def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario):
    """ Apply common fake methods to mock lockfile class.

        :param mock_lockfile: An object providing the `LockFile` interface.
        :param testcase: The `TestCase` instance providing the context for
            the patch.
        :param scenario: The `PIDLockFile` test scenario to use.

        Mock the `LockFile` methods of `mock_lockfile`, by applying fake
        methods customised for `scenario`. The mocking is done by patches
        within the context of `testcase`: each patch is started here and
        its `stop` is registered with `testcase.addCleanup`. The
        `read_pid` fake is not applied.

        """
    unpatched_names = ['read_pid']

    all_fakes = make_lockfile_method_fakes(scenario)
    for (method_name, fake_method) in all_fakes.items():
        if method_name in unpatched_names:
            continue
        patcher = mock.patch.object(
            mock_lockfile, method_name,
            new=fake_method)
        patcher.start()
        testcase.addCleanup(patcher.stop)
|
|
364
|
|
365
|
|
def setup_pidlockfile_fixtures(testcase, scenario_name=None):
    """ Set up common fixtures for PIDLockFile test cases.

        :param testcase: The `TestCase` instance to decorate.
        :param scenario_name: The name of the `PIDLockFile` scenario to
            use. This function does not read it.

        Decorate the `testcase` with attributes that are fixtures for test
        cases involving `PIDLockFile` instances. In addition to the common
        PID file fixtures, the `write_pid_to_pidfile` and
        `remove_existing_pidfile` functions of `lockfile.pidlockfile` are
        patched until the test case's cleanups run.

        """

    setup_pidfile_fixtures(testcase)

    patched_func_names = (
        'write_pid_to_pidfile',
        'remove_existing_pidfile',
    )
    for patched_name in patched_func_names:
        patcher = mock.patch.object(lockfile.pidlockfile, patched_name)
        patcher.start()
        testcase.addCleanup(patcher.stop)
|
|
386
|
|
387
|
|
class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
    """ Test cases for ‘TimeoutPIDLockFile’ class. """

    def setUp(self):
        """ Set up test fixtures. """
        super(TimeoutPIDLockFile_TestCase, self).setUp()

        self.pidlockfile_scenario = make_pidlockfile_scenarios()['simple']
        pidfile_path = self.pidlockfile_scenario['pidfile_path']

        # Keep the superclass from doing real work while the instance
        # under test is created and used.
        superclass = lockfile.pidlockfile.PIDLockFile
        for patched_name in ['__init__', 'acquire']:
            patcher = mock.patch.object(superclass, patched_name)
            patcher.start()
            self.addCleanup(patcher.stop)

        self.scenario = {
            'pidfile_path': pidfile_path,
            'acquire_timeout': self.getUniqueInteger(),
        }

        self.test_kwargs = dict(
            path=self.scenario['pidfile_path'],
            acquire_timeout=self.scenario['acquire_timeout'],
        )
        self.test_instance = daemon.pidfile.TimeoutPIDLockFile(
            **self.test_kwargs)

    def test_inherits_from_pidlockfile(self):
        """ Should inherit from PIDLockFile. """
        self.assertIsInstance(
            self.test_instance, lockfile.pidlockfile.PIDLockFile)

    def test_init_has_expected_signature(self):
        """ Should have expected signature for ‘__init__’. """
        def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
        # The comparison expects the reference function to share the name.
        test_func.__name__ = str('__init__')
        self.assertFunctionSignatureMatch(
            test_func,
            daemon.pidfile.TimeoutPIDLockFile.__init__)

    def test_has_specified_acquire_timeout(self):
        """ Should have specified ‘acquire_timeout’ value. """
        self.assertEqual(
            self.test_kwargs['acquire_timeout'],
            self.test_instance.acquire_timeout)

    @mock.patch.object(
        lockfile.pidlockfile.PIDLockFile, "__init__",
        autospec=True)
    def test_calls_superclass_init(self, mock_init):
        """ Should call the superclass ‘__init__’. """
        created = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs)
        mock_init.assert_called_with(created, self.test_kwargs['path'])

    @mock.patch.object(
        lockfile.pidlockfile.PIDLockFile, "acquire",
        autospec=True)
    def test_acquire_uses_specified_timeout(self, mock_func_acquire):
        """ Should call the superclass ‘acquire’ with specified timeout. """
        requested_timeout = self.getUniqueInteger()
        self.test_instance.acquire(requested_timeout)
        mock_func_acquire.assert_called_with(
            self.test_instance, requested_timeout)

    @mock.patch.object(
        lockfile.pidlockfile.PIDLockFile, "acquire",
        autospec=True)
    def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire):
        """ Should call superclass ‘acquire’ with stored timeout by default. """
        stored_timeout = self.test_kwargs['acquire_timeout']
        self.test_instance.acquire()
        mock_func_acquire.assert_called_with(
            self.test_instance, stored_timeout)
|
|
466
|
|
467
|
|
468 # Local variables:
|
|
469 # coding: utf-8
|
|
470 # mode: python
|
|
471 # End:
|
|
472 # vim: fileencoding=utf-8 filetype=python :
|