Mercurial > repos > melissacline > ucsc_xena_platform
comparison python-daemon-2.0.5/test/test_pidfile.py @ 33:7ceb967147c3
start xena with no gui
add library files
author | jingchunzhu <jingchunzhu@gmail.com> |
---|---|
date | Wed, 22 Jul 2015 13:24:44 -0700 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
32:63b1ba1e3424 | 33:7ceb967147c3 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 # | |
3 # test/test_pidfile.py | |
4 # Part of ‘python-daemon’, an implementation of PEP 3143. | |
5 # | |
6 # Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au> | |
7 # | |
8 # This is free software: you may copy, modify, and/or distribute this work | |
9 # under the terms of the Apache License, version 2.0 as published by the | |
10 # Apache Software Foundation. | |
11 # No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. | |
12 | |
13 """ Unit test for ‘pidfile’ module. | |
14 """ | |
15 | |
16 from __future__ import (absolute_import, unicode_literals) | |
17 | |
18 try: | |
19 # Python 3 standard library. | |
20 import builtins | |
21 except ImportError: | |
22 # Python 2 standard library. | |
23 import __builtin__ as builtins | |
24 import os | |
25 import itertools | |
26 import tempfile | |
27 import errno | |
28 import functools | |
29 try: | |
30 # Standard library of Python 2.7 and later. | |
31 from io import StringIO | |
32 except ImportError: | |
33 # Standard library of Python 2.6 and earlier. | |
34 from StringIO import StringIO | |
35 | |
36 import mock | |
37 import lockfile | |
38 | |
39 from . import scaffold | |
40 | |
41 import daemon.pidfile | |
42 | |
43 | |
44 class FakeFileDescriptorStringIO(StringIO, object): | |
45 """ A StringIO class that fakes a file descriptor. """ | |
46 | |
47 _fileno_generator = itertools.count() | |
48 | |
49 def __init__(self, *args, **kwargs): | |
50 self._fileno = next(self._fileno_generator) | |
51 super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs) | |
52 | |
53 def fileno(self): | |
54 return self._fileno | |
55 | |
56 def __enter__(self): | |
57 return self | |
58 | |
59 def __exit__(self, exc_type, exc_val, exc_tb): | |
60 pass | |
61 | |
62 | |
63 try: | |
64 FileNotFoundError | |
65 PermissionError | |
66 except NameError: | |
67 # Python 2 uses IOError. | |
68 FileNotFoundError = functools.partial(IOError, errno.ENOENT) | |
69 PermissionError = functools.partial(IOError, errno.EPERM) | |
70 | |
71 | |
72 def make_pidlockfile_scenarios(): | |
73 """ Make a collection of scenarios for testing `PIDLockFile` instances. | |
74 | |
75 :return: A collection of scenarios for tests involving | |
76 `PIDLockfFile` instances. | |
77 | |
78 The collection is a mapping from scenario name to a dictionary of | |
79 scenario attributes. | |
80 | |
81 """ | |
82 | |
83 fake_current_pid = 235 | |
84 fake_other_pid = 8642 | |
85 fake_pidfile_path = tempfile.mktemp() | |
86 | |
87 fake_pidfile_empty = FakeFileDescriptorStringIO() | |
88 fake_pidfile_current_pid = FakeFileDescriptorStringIO( | |
89 "{pid:d}\n".format(pid=fake_current_pid)) | |
90 fake_pidfile_other_pid = FakeFileDescriptorStringIO( | |
91 "{pid:d}\n".format(pid=fake_other_pid)) | |
92 fake_pidfile_bogus = FakeFileDescriptorStringIO( | |
93 "b0gUs") | |
94 | |
95 scenarios = { | |
96 'simple': {}, | |
97 'not-exist': { | |
98 'open_func_name': 'fake_open_nonexist', | |
99 'os_open_func_name': 'fake_os_open_nonexist', | |
100 }, | |
101 'not-exist-write-denied': { | |
102 'open_func_name': 'fake_open_nonexist', | |
103 'os_open_func_name': 'fake_os_open_nonexist', | |
104 }, | |
105 'not-exist-write-busy': { | |
106 'open_func_name': 'fake_open_nonexist', | |
107 'os_open_func_name': 'fake_os_open_nonexist', | |
108 }, | |
109 'exist-read-denied': { | |
110 'open_func_name': 'fake_open_read_denied', | |
111 'os_open_func_name': 'fake_os_open_read_denied', | |
112 }, | |
113 'exist-locked-read-denied': { | |
114 'locking_pid': fake_other_pid, | |
115 'open_func_name': 'fake_open_read_denied', | |
116 'os_open_func_name': 'fake_os_open_read_denied', | |
117 }, | |
118 'exist-empty': {}, | |
119 'exist-invalid': { | |
120 'pidfile': fake_pidfile_bogus, | |
121 }, | |
122 'exist-current-pid': { | |
123 'pidfile': fake_pidfile_current_pid, | |
124 'pidfile_pid': fake_current_pid, | |
125 }, | |
126 'exist-current-pid-locked': { | |
127 'pidfile': fake_pidfile_current_pid, | |
128 'pidfile_pid': fake_current_pid, | |
129 'locking_pid': fake_current_pid, | |
130 }, | |
131 'exist-other-pid': { | |
132 'pidfile': fake_pidfile_other_pid, | |
133 'pidfile_pid': fake_other_pid, | |
134 }, | |
135 'exist-other-pid-locked': { | |
136 'pidfile': fake_pidfile_other_pid, | |
137 'pidfile_pid': fake_other_pid, | |
138 'locking_pid': fake_other_pid, | |
139 }, | |
140 } | |
141 | |
142 for scenario in scenarios.values(): | |
143 scenario['pid'] = fake_current_pid | |
144 scenario['pidfile_path'] = fake_pidfile_path | |
145 if 'pidfile' not in scenario: | |
146 scenario['pidfile'] = fake_pidfile_empty | |
147 if 'pidfile_pid' not in scenario: | |
148 scenario['pidfile_pid'] = None | |
149 if 'locking_pid' not in scenario: | |
150 scenario['locking_pid'] = None | |
151 if 'open_func_name' not in scenario: | |
152 scenario['open_func_name'] = 'fake_open_okay' | |
153 if 'os_open_func_name' not in scenario: | |
154 scenario['os_open_func_name'] = 'fake_os_open_okay' | |
155 | |
156 return scenarios | |
157 | |
158 | |
159 def setup_pidfile_fixtures(testcase): | |
160 """ Set up common fixtures for PID file test cases. | |
161 | |
162 :param testcase: A `TestCase` instance to decorate. | |
163 | |
164 Decorate the `testcase` with attributes to be fixtures for tests | |
165 involving `PIDLockFile` instances. | |
166 | |
167 """ | |
168 scenarios = make_pidlockfile_scenarios() | |
169 testcase.pidlockfile_scenarios = scenarios | |
170 | |
171 def get_scenario_option(testcase, key, default=None): | |
172 value = default | |
173 try: | |
174 value = testcase.scenario[key] | |
175 except (NameError, TypeError, AttributeError, KeyError): | |
176 pass | |
177 return value | |
178 | |
179 func_patcher_os_getpid = mock.patch.object( | |
180 os, "getpid", | |
181 return_value=scenarios['simple']['pid']) | |
182 func_patcher_os_getpid.start() | |
183 testcase.addCleanup(func_patcher_os_getpid.stop) | |
184 | |
185 def make_fake_open_funcs(testcase): | |
186 | |
187 def fake_open_nonexist(filename, mode, buffering): | |
188 if mode.startswith('r'): | |
189 error = FileNotFoundError( | |
190 "No such file {filename!r}".format( | |
191 filename=filename)) | |
192 raise error | |
193 else: | |
194 result = testcase.scenario['pidfile'] | |
195 return result | |
196 | |
197 def fake_open_read_denied(filename, mode, buffering): | |
198 if mode.startswith('r'): | |
199 error = PermissionError( | |
200 "Read denied on {filename!r}".format( | |
201 filename=filename)) | |
202 raise error | |
203 else: | |
204 result = testcase.scenario['pidfile'] | |
205 return result | |
206 | |
207 def fake_open_okay(filename, mode, buffering): | |
208 result = testcase.scenario['pidfile'] | |
209 return result | |
210 | |
211 def fake_os_open_nonexist(filename, flags, mode): | |
212 if (flags & os.O_CREAT): | |
213 result = testcase.scenario['pidfile'].fileno() | |
214 else: | |
215 error = FileNotFoundError( | |
216 "No such file {filename!r}".format( | |
217 filename=filename)) | |
218 raise error | |
219 return result | |
220 | |
221 def fake_os_open_read_denied(filename, flags, mode): | |
222 if (flags & os.O_CREAT): | |
223 result = testcase.scenario['pidfile'].fileno() | |
224 else: | |
225 error = PermissionError( | |
226 "Read denied on {filename!r}".format( | |
227 filename=filename)) | |
228 raise error | |
229 return result | |
230 | |
231 def fake_os_open_okay(filename, flags, mode): | |
232 result = testcase.scenario['pidfile'].fileno() | |
233 return result | |
234 | |
235 funcs = dict( | |
236 (name, obj) for (name, obj) in vars().items() | |
237 if callable(obj)) | |
238 | |
239 return funcs | |
240 | |
241 testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase) | |
242 | |
243 def fake_open(filename, mode='rt', buffering=None): | |
244 scenario_path = get_scenario_option(testcase, 'pidfile_path') | |
245 if filename == scenario_path: | |
246 func_name = testcase.scenario['open_func_name'] | |
247 fake_open_func = testcase.fake_pidfile_open_funcs[func_name] | |
248 result = fake_open_func(filename, mode, buffering) | |
249 else: | |
250 result = FakeFileDescriptorStringIO() | |
251 return result | |
252 | |
253 mock_open = mock.mock_open() | |
254 mock_open.side_effect = fake_open | |
255 | |
256 func_patcher_builtin_open = mock.patch.object( | |
257 builtins, "open", | |
258 new=mock_open) | |
259 func_patcher_builtin_open.start() | |
260 testcase.addCleanup(func_patcher_builtin_open.stop) | |
261 | |
262 def fake_os_open(filename, flags, mode=None): | |
263 scenario_path = get_scenario_option(testcase, 'pidfile_path') | |
264 if filename == scenario_path: | |
265 func_name = testcase.scenario['os_open_func_name'] | |
266 fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name] | |
267 result = fake_os_open_func(filename, flags, mode) | |
268 else: | |
269 result = FakeFileDescriptorStringIO().fileno() | |
270 return result | |
271 | |
272 mock_os_open = mock.MagicMock(side_effect=fake_os_open) | |
273 | |
274 func_patcher_os_open = mock.patch.object( | |
275 os, "open", | |
276 new=mock_os_open) | |
277 func_patcher_os_open.start() | |
278 testcase.addCleanup(func_patcher_os_open.stop) | |
279 | |
280 def fake_os_fdopen(fd, mode='rt', buffering=None): | |
281 scenario_pidfile = get_scenario_option( | |
282 testcase, 'pidfile', FakeFileDescriptorStringIO()) | |
283 if fd == testcase.scenario['pidfile'].fileno(): | |
284 result = testcase.scenario['pidfile'] | |
285 else: | |
286 raise OSError(errno.EBADF, "Bad file descriptor") | |
287 return result | |
288 | |
289 mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen) | |
290 | |
291 func_patcher_os_fdopen = mock.patch.object( | |
292 os, "fdopen", | |
293 new=mock_os_fdopen) | |
294 func_patcher_os_fdopen.start() | |
295 testcase.addCleanup(func_patcher_os_fdopen.stop) | |
296 | |
297 | |
298 def make_lockfile_method_fakes(scenario): | |
299 """ Make common fake methods for lockfile class. | |
300 | |
301 :param scenario: A scenario for testing with PIDLockFile. | |
302 :return: A mapping from normal function name to the corresponding | |
303 fake function. | |
304 | |
305 Each fake function behaves appropriately for the specified `scenario`. | |
306 | |
307 """ | |
308 | |
309 def fake_func_read_pid(): | |
310 return scenario['pidfile_pid'] | |
311 def fake_func_is_locked(): | |
312 return (scenario['locking_pid'] is not None) | |
313 def fake_func_i_am_locking(): | |
314 return ( | |
315 scenario['locking_pid'] == scenario['pid']) | |
316 def fake_func_acquire(timeout=None): | |
317 if scenario['locking_pid'] is not None: | |
318 raise lockfile.AlreadyLocked() | |
319 scenario['locking_pid'] = scenario['pid'] | |
320 def fake_func_release(): | |
321 if scenario['locking_pid'] is None: | |
322 raise lockfile.NotLocked() | |
323 if scenario['locking_pid'] != scenario['pid']: | |
324 raise lockfile.NotMyLock() | |
325 scenario['locking_pid'] = None | |
326 def fake_func_break_lock(): | |
327 scenario['locking_pid'] = None | |
328 | |
329 fake_methods = dict( | |
330 ( | |
331 func_name.replace('fake_func_', ''), | |
332 mock.MagicMock(side_effect=fake_func)) | |
333 for (func_name, fake_func) in vars().items() | |
334 if func_name.startswith('fake_func_')) | |
335 | |
336 return fake_methods | |
337 | |
338 | |
339 def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario): | |
340 """ Apply common fake methods to mock lockfile class. | |
341 | |
342 :param mock_lockfile: An object providing the `LockFile` interface. | |
343 :param testcase: The `TestCase` instance providing the context for | |
344 the patch. | |
345 :param scenario: The `PIDLockFile` test scenario to use. | |
346 | |
347 Mock the `LockFile` methods of `mock_lockfile`, by applying fake | |
348 methods customised for `scenario`. The mock is does by a patch | |
349 within the context of `testcase`. | |
350 | |
351 """ | |
352 fake_methods = dict( | |
353 (func_name, fake_func) | |
354 for (func_name, fake_func) in | |
355 make_lockfile_method_fakes(scenario).items() | |
356 if func_name not in ['read_pid']) | |
357 | |
358 for (func_name, fake_func) in fake_methods.items(): | |
359 func_patcher = mock.patch.object( | |
360 mock_lockfile, func_name, | |
361 new=fake_func) | |
362 func_patcher.start() | |
363 testcase.addCleanup(func_patcher.stop) | |
364 | |
365 | |
366 def setup_pidlockfile_fixtures(testcase, scenario_name=None): | |
367 """ Set up common fixtures for PIDLockFile test cases. | |
368 | |
369 :param testcase: The `TestCase` instance to decorate. | |
370 :param scenario_name: The name of the `PIDLockFile` scenario to use. | |
371 | |
372 Decorate the `testcase` with attributes that are fixtures for test | |
373 cases involving `PIDLockFile` instances.` | |
374 | |
375 """ | |
376 | |
377 setup_pidfile_fixtures(testcase) | |
378 | |
379 for func_name in [ | |
380 'write_pid_to_pidfile', | |
381 'remove_existing_pidfile', | |
382 ]: | |
383 func_patcher = mock.patch.object(lockfile.pidlockfile, func_name) | |
384 func_patcher.start() | |
385 testcase.addCleanup(func_patcher.stop) | |
386 | |
387 | |
388 class TimeoutPIDLockFile_TestCase(scaffold.TestCase): | |
389 """ Test cases for ‘TimeoutPIDLockFile’ class. """ | |
390 | |
391 def setUp(self): | |
392 """ Set up test fixtures. """ | |
393 super(TimeoutPIDLockFile_TestCase, self).setUp() | |
394 | |
395 pidlockfile_scenarios = make_pidlockfile_scenarios() | |
396 self.pidlockfile_scenario = pidlockfile_scenarios['simple'] | |
397 pidfile_path = self.pidlockfile_scenario['pidfile_path'] | |
398 | |
399 for func_name in ['__init__', 'acquire']: | |
400 func_patcher = mock.patch.object( | |
401 lockfile.pidlockfile.PIDLockFile, func_name) | |
402 func_patcher.start() | |
403 self.addCleanup(func_patcher.stop) | |
404 | |
405 self.scenario = { | |
406 'pidfile_path': self.pidlockfile_scenario['pidfile_path'], | |
407 'acquire_timeout': self.getUniqueInteger(), | |
408 } | |
409 | |
410 self.test_kwargs = dict( | |
411 path=self.scenario['pidfile_path'], | |
412 acquire_timeout=self.scenario['acquire_timeout'], | |
413 ) | |
414 self.test_instance = daemon.pidfile.TimeoutPIDLockFile( | |
415 **self.test_kwargs) | |
416 | |
417 def test_inherits_from_pidlockfile(self): | |
418 """ Should inherit from PIDLockFile. """ | |
419 instance = self.test_instance | |
420 self.assertIsInstance(instance, lockfile.pidlockfile.PIDLockFile) | |
421 | |
422 def test_init_has_expected_signature(self): | |
423 """ Should have expected signature for ‘__init__’. """ | |
424 def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass | |
425 test_func.__name__ = str('__init__') | |
426 self.assertFunctionSignatureMatch( | |
427 test_func, | |
428 daemon.pidfile.TimeoutPIDLockFile.__init__) | |
429 | |
430 def test_has_specified_acquire_timeout(self): | |
431 """ Should have specified ‘acquire_timeout’ value. """ | |
432 instance = self.test_instance | |
433 expected_timeout = self.test_kwargs['acquire_timeout'] | |
434 self.assertEqual(expected_timeout, instance.acquire_timeout) | |
435 | |
436 @mock.patch.object( | |
437 lockfile.pidlockfile.PIDLockFile, "__init__", | |
438 autospec=True) | |
439 def test_calls_superclass_init(self, mock_init): | |
440 """ Should call the superclass ‘__init__’. """ | |
441 expected_path = self.test_kwargs['path'] | |
442 instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs) | |
443 mock_init.assert_called_with(instance, expected_path) | |
444 | |
445 @mock.patch.object( | |
446 lockfile.pidlockfile.PIDLockFile, "acquire", | |
447 autospec=True) | |
448 def test_acquire_uses_specified_timeout(self, mock_func_acquire): | |
449 """ Should call the superclass ‘acquire’ with specified timeout. """ | |
450 instance = self.test_instance | |
451 test_timeout = self.getUniqueInteger() | |
452 expected_timeout = test_timeout | |
453 instance.acquire(test_timeout) | |
454 mock_func_acquire.assert_called_with(instance, expected_timeout) | |
455 | |
456 @mock.patch.object( | |
457 lockfile.pidlockfile.PIDLockFile, "acquire", | |
458 autospec=True) | |
459 def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire): | |
460 """ Should call superclass ‘acquire’ with stored timeout by default. """ | |
461 instance = self.test_instance | |
462 test_timeout = self.test_kwargs['acquire_timeout'] | |
463 expected_timeout = test_timeout | |
464 instance.acquire() | |
465 mock_func_acquire.assert_called_with(instance, expected_timeout) | |
466 | |
467 | |
468 # Local variables: | |
469 # coding: utf-8 | |
470 # mode: python | |
471 # End: | |
472 # vim: fileencoding=utf-8 filetype=python : |