Mercurial > repos > melissacline > ucsc_xena_platform
comparison python-daemon-2.0.5/test/test_daemon.py @ 33:7ceb967147c3
start xena with no gui
add library files
author | jingchunzhu <jingchunzhu@gmail.com> |
---|---|
date | Wed, 22 Jul 2015 13:24:44 -0700 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
32:63b1ba1e3424 | 33:7ceb967147c3 |
---|---|
1 # -*- coding: utf-8 -*- | |
2 # | |
3 # test/test_daemon.py | |
4 # Part of ‘python-daemon’, an implementation of PEP 3143. | |
5 # | |
6 # Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au> | |
7 # | |
8 # This is free software: you may copy, modify, and/or distribute this work | |
9 # under the terms of the Apache License, version 2.0 as published by the | |
10 # Apache Software Foundation. | |
11 # No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. | |
12 | |
13 """ Unit test for ‘daemon’ module. | |
14 """ | |
15 | |
16 from __future__ import (absolute_import, unicode_literals) | |
17 | |
18 import os | |
19 import sys | |
20 import tempfile | |
21 import resource | |
22 import errno | |
23 import signal | |
24 import socket | |
25 from types import ModuleType | |
26 import collections | |
27 import functools | |
28 try: | |
29 # Standard library of Python 2.7 and later. | |
30 from io import StringIO | |
31 except ImportError: | |
32 # Standard library of Python 2.6 and earlier. | |
33 from StringIO import StringIO | |
34 | |
35 import mock | |
36 | |
37 from . import scaffold | |
38 from .scaffold import (basestring, unicode) | |
39 from .test_pidfile import ( | |
40 FakeFileDescriptorStringIO, | |
41 setup_pidfile_fixtures, | |
42 ) | |
43 | |
44 import daemon | |
45 | |
46 | |
class ModuleExceptions_TestCase(scaffold.Exception_TestCase):
    """ Test cases for the exception classes of `daemon.daemon`.

        Each scenario supplies the `exc_type`, `min_args` and `types`
        parameters; how they are checked is defined by
        `scaffold.Exception_TestCase`.

        """

    scenarios = scaffold.make_exception_scenarios([
        (
            'daemon.daemon.DaemonError',
            dict(
                exc_type=daemon.daemon.DaemonError,
                min_args=1,
                types=[Exception],
            ),
        ),
        (
            'daemon.daemon.DaemonOSEnvironmentError',
            dict(
                exc_type=daemon.daemon.DaemonOSEnvironmentError,
                min_args=1,
                types=[daemon.daemon.DaemonError, OSError],
            ),
        ),
        (
            'daemon.daemon.DaemonProcessDetachError',
            dict(
                exc_type=daemon.daemon.DaemonProcessDetachError,
                min_args=1,
                types=[daemon.daemon.DaemonError, OSError],
            ),
        ),
    ])
67 | |
68 | |
def setup_daemon_context_fixtures(testcase):
    """ Decorate `testcase` with fixtures for `DaemonContext` tests.

        :param testcase: A ``TestCase`` instance to decorate.
        :return: ``None``.

        After the stream and PID file fixtures are installed, the
        following attributes are set on `testcase`:

        * `fake_pidfile_path`: a temporary file name.
        * `mock_pidlockfile`: a ``MagicMock`` whose `path` is that name.
        * `daemon_context_args`: the stream arguments for the context.
        * `test_instance`: a `DaemonContext` built from those arguments.

        """
    setup_streams_fixtures(testcase)
    setup_pidfile_fixtures(testcase)

    # The path is only a name for the mock; no file is created here.
    pidfile_path = tempfile.mktemp()
    pidlockfile = mock.MagicMock()
    pidlockfile.path = pidfile_path
    testcase.fake_pidfile_path = pidfile_path
    testcase.mock_pidlockfile = pidlockfile

    streams = testcase.stream_files_by_name
    context_args = dict(
        stdin=streams['stdin'],
        stdout=streams['stdout'],
        stderr=streams['stderr'],
    )
    testcase.daemon_context_args = context_args
    testcase.test_instance = daemon.DaemonContext(**context_args)
94 | |
# Sentinel returned by the patched ‘make_default_signal_map’ below.
fake_default_signal_map = object()


@mock.patch.object(
    daemon.daemon, "is_detach_process_context_required",
    new=(lambda: True))
@mock.patch.object(
    daemon.daemon, "make_default_signal_map",
    new=(lambda: fake_default_signal_map))
@mock.patch.object(os, "setgid", new=(lambda x: object()))
@mock.patch.object(os, "setuid", new=(lambda x: object()))
class DaemonContext_BaseTestCase(scaffold.TestCase):
    """ Common base for the `DaemonContext` test case classes.

        The class decorators replace `os.setuid`, `os.setgid` and two
        `daemon.daemon` helper functions with fixed-result stand-ins.

        """

    def setUp(self):
        """ Install the shared `DaemonContext` fixtures. """
        super(DaemonContext_BaseTestCase, self).setUp()

        setup_daemon_context_fixtures(self)
113 | |
114 | |
class DaemonContext_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for construction of the `DaemonContext` class. """

    def test_instantiate(self):
        """ The fixture instance should be a `DaemonContext`. """
        self.assertIsInstance(
                self.test_instance, daemon.daemon.DaemonContext)

    def test_minimum_zero_arguments(self):
        """ The initialiser should accept an empty argument list. """
        instance = daemon.daemon.DaemonContext()
        self.assertIsNot(instance, None)

    def test_has_specified_chroot_directory(self):
        """ Should keep the `chroot_directory` given to the initialiser. """
        expected_directory = object()
        instance = daemon.daemon.DaemonContext(
                chroot_directory=expected_directory)
        self.assertEqual(expected_directory, instance.chroot_directory)

    def test_has_specified_working_directory(self):
        """ Should keep the `working_directory` given to the initialiser. """
        expected_directory = object()
        instance = daemon.daemon.DaemonContext(
                working_directory=expected_directory)
        self.assertEqual(expected_directory, instance.working_directory)

    def test_has_default_working_directory(self):
        """ Should default `working_directory` to the root directory. """
        expected_directory = "/"
        instance = daemon.daemon.DaemonContext()
        self.assertEqual(expected_directory, instance.working_directory)

    def test_has_specified_creation_mask(self):
        """ Should keep the `umask` given to the initialiser. """
        expected_mask = object()
        instance = daemon.daemon.DaemonContext(umask=expected_mask)
        self.assertEqual(expected_mask, instance.umask)

    def test_has_default_creation_mask(self):
        """ Should default `umask` to zero. """
        expected_mask = 0
        instance = daemon.daemon.DaemonContext()
        self.assertEqual(expected_mask, instance.umask)

    def test_has_specified_uid(self):
        """ Should keep the `uid` given to the initialiser. """
        expected_id = object()
        instance = daemon.daemon.DaemonContext(uid=expected_id)
        self.assertEqual(expected_id, instance.uid)

    def test_has_derived_uid(self):
        """ Should default `uid` to the UID of the current process. """
        expected_id = os.getuid()
        instance = daemon.daemon.DaemonContext()
        self.assertEqual(expected_id, instance.uid)

    def test_has_specified_gid(self):
        """ Should keep the `gid` given to the initialiser. """
        expected_id = object()
        instance = daemon.daemon.DaemonContext(gid=expected_id)
        self.assertEqual(expected_id, instance.gid)

    def test_has_derived_gid(self):
        """ Should default `gid` to the GID of the current process. """
        expected_id = os.getgid()
        instance = daemon.daemon.DaemonContext()
        self.assertEqual(expected_id, instance.gid)

    def test_has_specified_detach_process(self):
        """ Should keep the `detach_process` given to the initialiser. """
        expected_value = object()
        instance = daemon.daemon.DaemonContext(
                detach_process=expected_value)
        self.assertEqual(expected_value, instance.detach_process)

    def test_has_derived_detach_process(self):
        """ Should default `detach_process` from the environment check. """
        expected_value = daemon.daemon.is_detach_process_context_required()
        instance = daemon.daemon.DaemonContext()
        self.assertEqual(expected_value, instance.detach_process)

    def test_has_specified_files_preserve(self):
        """ Should keep the `files_preserve` given to the initialiser. """
        expected_files_preserve = object()
        instance = daemon.daemon.DaemonContext(
                files_preserve=expected_files_preserve)
        self.assertEqual(expected_files_preserve, instance.files_preserve)

    def test_has_specified_pidfile(self):
        """ Should keep the `pidfile` given to the initialiser. """
        expected_pidfile = object()
        instance = daemon.daemon.DaemonContext(pidfile=expected_pidfile)
        self.assertEqual(expected_pidfile, instance.pidfile)

    def test_has_specified_stdin(self):
        """ Should keep the `stdin` given to the initialiser. """
        expected_file = object()
        instance = daemon.daemon.DaemonContext(stdin=expected_file)
        self.assertEqual(expected_file, instance.stdin)

    def test_has_specified_stdout(self):
        """ Should keep the `stdout` given to the initialiser. """
        expected_file = object()
        instance = daemon.daemon.DaemonContext(stdout=expected_file)
        self.assertEqual(expected_file, instance.stdout)

    def test_has_specified_stderr(self):
        """ Should keep the `stderr` given to the initialiser. """
        expected_file = object()
        instance = daemon.daemon.DaemonContext(stderr=expected_file)
        self.assertEqual(expected_file, instance.stderr)

    def test_has_specified_signal_map(self):
        """ Should keep the `signal_map` given to the initialiser. """
        expected_signal_map = object()
        instance = daemon.daemon.DaemonContext(
                signal_map=expected_signal_map)
        self.assertEqual(expected_signal_map, instance.signal_map)

    def test_has_derived_signal_map(self):
        """ Should default `signal_map` to the module's default map. """
        expected_signal_map = daemon.daemon.make_default_signal_map()
        instance = daemon.daemon.DaemonContext()
        self.assertEqual(expected_signal_map, instance.signal_map)
278 | |
279 | |
class DaemonContext_is_open_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for the `DaemonContext.is_open` property. """

    def test_begin_false(self):
        """ A fresh instance should report `is_open` as False. """
        self.assertEqual(False, self.test_instance.is_open)

    def test_write_fails(self):
        """ Assigning to `is_open` should raise AttributeError. """
        replacement_value = object()
        self.assertRaises(
                AttributeError,
                setattr, self.test_instance, 'is_open', replacement_value)
294 | |
295 | |
class DaemonContext_open_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for the `DaemonContext.open` method. """

    def setUp(self):
        """ Patch every collaborator of `open` and record them on one mock.

            All the patched module functions and `DaemonContext` methods
            are attached to `self.mock_module_daemon`, so that their
            calls are collected in a single ordered call list.

            """
        super(DaemonContext_open_TestCase, self).setUp()

        self.test_instance._is_open = False

        self.mock_module_daemon = mock.MagicMock()

        # Module-level functions of ‘daemon.daemon’ to replace.
        module_func_names = [
                "detach_process_context",
                "change_working_directory",
                "change_root_directory",
                "change_file_creation_mask",
                "change_process_owner",
                "prevent_core_dump",
                "close_all_open_files",
                "redirect_stream",
                "set_signal_handlers",
                "register_atexit_function",
                ]
        for func_name in module_func_names:
            patcher = mock.patch.object(daemon.daemon, func_name)
            mock_func = patcher.start()
            self.addCleanup(patcher.stop)
            self.mock_module_daemon.attach_mock(mock_func, func_name)

        self.mock_module_daemon.attach_mock(mock.Mock(), 'DaemonContext')

        # Private ‘DaemonContext’ methods to replace, with fixed results.
        self.test_files_preserve_fds = object()
        self.test_signal_handler_map = object()
        method_return_values = {
                '_get_exclude_file_descriptors':
                    self.test_files_preserve_fds,
                '_make_signal_handler_map':
                    self.test_signal_handler_map,
                }
        for (method_name, result) in method_return_values.items():
            patcher = mock.patch.object(
                    daemon.daemon.DaemonContext, method_name,
                    return_value=result)
            mock_method = patcher.start()
            self.addCleanup(patcher.stop)
            self.mock_module_daemon.DaemonContext.attach_mock(
                    mock_method, method_name)

    def test_performs_steps_in_expected_sequence(self):
        """ Should perform the daemonisation steps in the expected order. """
        instance = self.test_instance
        instance.chroot_directory = object()
        instance.detach_process = True
        instance.pidfile = self.mock_pidlockfile
        # Record the PID file calls in the same ordered call list.
        self.mock_module_daemon.attach_mock(
                self.mock_pidlockfile, 'pidlockfile')
        any_arg = mock.ANY
        expected_calls = [
                mock.call.change_root_directory(any_arg),
                mock.call.prevent_core_dump(),
                mock.call.change_file_creation_mask(any_arg),
                mock.call.change_working_directory(any_arg),
                mock.call.change_process_owner(any_arg, any_arg),
                mock.call.detach_process_context(),
                mock.call.DaemonContext._make_signal_handler_map(),
                mock.call.set_signal_handlers(any_arg),
                mock.call.DaemonContext._get_exclude_file_descriptors(),
                mock.call.close_all_open_files(exclude=any_arg),
                mock.call.redirect_stream(any_arg, any_arg),
                mock.call.redirect_stream(any_arg, any_arg),
                mock.call.redirect_stream(any_arg, any_arg),
                mock.call.pidlockfile.__enter__(),
                mock.call.register_atexit_function(any_arg),
                ]
        instance.open()
        self.mock_module_daemon.assert_has_calls(expected_calls)

    def test_returns_immediately_if_is_open(self):
        """ Should make no collaborator calls when already open. """
        instance = self.test_instance
        instance._is_open = True
        instance.open()
        recorded_calls = self.mock_module_daemon.mock_calls
        self.assertEqual(0, len(recorded_calls))

    def test_changes_root_directory_to_chroot_directory(self):
        """ Should change root directory to the `chroot_directory`. """
        instance = self.test_instance
        expected_directory = object()
        instance.chroot_directory = expected_directory
        instance.open()
        mock_func = self.mock_module_daemon.change_root_directory
        mock_func.assert_called_with(expected_directory)

    def test_omits_chroot_if_no_chroot_directory(self):
        """ Should not change root directory if `chroot_directory` unset. """
        instance = self.test_instance
        instance.chroot_directory = None
        instance.open()
        mock_func = self.mock_module_daemon.change_root_directory
        self.assertFalse(mock_func.called)

    def test_prevents_core_dump(self):
        """ Should request that core dumps be prevented. """
        self.test_instance.open()
        self.mock_module_daemon.prevent_core_dump.assert_called_with()

    def test_omits_prevent_core_dump_if_prevent_core_false(self):
        """ Should not prevent core dumps if `prevent_core` is false. """
        instance = self.test_instance
        instance.prevent_core = False
        instance.open()
        mock_func = self.mock_module_daemon.prevent_core_dump
        self.assertFalse(mock_func.called)

    def test_closes_open_files(self):
        """ Should close all open files except those to be preserved. """
        expected_exclude = self.test_files_preserve_fds
        self.test_instance.open()
        mock_func = self.mock_module_daemon.close_all_open_files
        mock_func.assert_called_with(exclude=expected_exclude)

    def test_changes_directory_to_working_directory(self):
        """ Should change current directory to the `working_directory`. """
        instance = self.test_instance
        expected_directory = object()
        instance.working_directory = expected_directory
        instance.open()
        mock_func = self.mock_module_daemon.change_working_directory
        mock_func.assert_called_with(expected_directory)

    def test_changes_creation_mask_to_umask(self):
        """ Should change the file creation mask to the `umask`. """
        instance = self.test_instance
        expected_mask = object()
        instance.umask = expected_mask
        instance.open()
        mock_func = self.mock_module_daemon.change_file_creation_mask
        mock_func.assert_called_with(expected_mask)

    def test_changes_owner_to_specified_uid_and_gid(self):
        """ Should change process owner to the `uid` and `gid`. """
        instance = self.test_instance
        (expected_uid, expected_gid) = (object(), object())
        instance.uid = expected_uid
        instance.gid = expected_gid
        instance.open()
        mock_func = self.mock_module_daemon.change_process_owner
        mock_func.assert_called_with(expected_uid, expected_gid)

    def test_detaches_process_context(self):
        """ Should request detachment of the process context. """
        self.test_instance.open()
        self.mock_module_daemon.detach_process_context.assert_called_with()

    def test_omits_process_detach_if_not_required(self):
        """ Should not detach the process if `detach_process` is false. """
        instance = self.test_instance
        instance.detach_process = False
        instance.open()
        mock_func = self.mock_module_daemon.detach_process_context
        self.assertFalse(mock_func.called)

    def test_sets_signal_handlers_from_signal_map(self):
        """ Should set signal handlers using the derived handler map. """
        instance = self.test_instance
        instance.signal_map = object()
        expected_signal_handler_map = self.test_signal_handler_map
        instance.open()
        mock_func = self.mock_module_daemon.set_signal_handlers
        mock_func.assert_called_with(expected_signal_handler_map)

    def test_redirects_standard_streams(self):
        """ Should redirect each system stream to its target file. """
        instance = self.test_instance
        # Capture the system streams before ‘open’ is called.
        expected_calls = [
                mock.call(
                    getattr(sys, name), self.stream_files_by_name[name])
                for name in ['stdin', 'stdout', 'stderr']]
        instance.open()
        self.mock_module_daemon.redirect_stream.assert_has_calls(
                expected_calls, any_order=True)

    def test_enters_pidfile_context(self):
        """ Should enter the context manager of the PID file. """
        instance = self.test_instance
        instance.pidfile = self.mock_pidlockfile
        instance.open()
        self.mock_pidlockfile.__enter__.assert_called_with()

    def test_sets_is_open_true(self):
        """ Should report `is_open` as True afterwards. """
        instance = self.test_instance
        instance.open()
        self.assertEqual(True, instance.is_open)

    def test_registers_close_method_for_atexit(self):
        """ Should register the `close` method for atexit processing. """
        instance = self.test_instance
        expected_function = instance.close
        instance.open()
        mock_func = self.mock_module_daemon.register_atexit_function
        mock_func.assert_called_with(expected_function)
509 | |
510 | |
class DaemonContext_close_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for the `DaemonContext.close` method. """

    def setUp(self):
        """ Set up test fixtures, with the instance marked as open. """
        super(DaemonContext_close_TestCase, self).setUp()

        self.test_instance._is_open = True

    def test_returns_immediately_if_not_is_open(self):
        """ Should return immediately if is_open property is false. """
        instance = self.test_instance
        instance._is_open = False
        # Use the mock PID file as the instance's `pidfile`, so that the
        # assertion below inspects the object `close` would actually use.
        instance.pidfile = self.mock_pidlockfile
        instance.close()
        self.assertFalse(self.mock_pidlockfile.__exit__.called)

    def test_exits_pidfile_context(self):
        """ Should exit the PID file context manager. """
        instance = self.test_instance
        instance.pidfile = self.mock_pidlockfile
        instance.close()
        self.mock_pidlockfile.__exit__.assert_called_with(None, None, None)

    def test_returns_none(self):
        """ Should return None. """
        instance = self.test_instance
        expected_result = None
        result = instance.close()
        self.assertIs(result, expected_result)

    def test_sets_is_open_false(self):
        """ Should set the `is_open` property to False. """
        instance = self.test_instance
        instance.close()
        self.assertEqual(False, instance.is_open)
547 | |
548 | |
@mock.patch.object(daemon.daemon.DaemonContext, "open")
class DaemonContext_context_manager_enter_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for the `DaemonContext.__enter__` method.

        The class decorator passes the mock replacing
        `DaemonContext.open` to each test method.

        """

    def test_opens_daemon_context(self, mock_func_daemoncontext_open):
        """ Should call `open` with no arguments. """
        self.test_instance.__enter__()
        mock_func_daemoncontext_open.assert_called_with()

    def test_returns_self_instance(self, mock_func_daemoncontext_open):
        """ Should return the same `DaemonContext` instance. """
        instance = self.test_instance
        result = instance.__enter__()
        self.assertIs(result, instance)
565 | |
566 | |
@mock.patch.object(daemon.daemon.DaemonContext, "close")
class DaemonContext_context_manager_exit_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for the `DaemonContext.__exit__` method.

        The class decorator passes the mock replacing
        `DaemonContext.close` to each test method.

        """

    def setUp(self):
        """ Prepare placeholder exception details to pass to `__exit__`. """
        super(DaemonContext_context_manager_exit_TestCase, self).setUp()

        self.test_args = dict(
                exc_type=object(),
                exc_value=object(),
                traceback=object(),
                )

    def test_closes_daemon_context(self, mock_func_daemoncontext_close):
        """ Should call `close` with no arguments. """
        self.test_instance.__exit__(**self.test_args)
        mock_func_daemoncontext_close.assert_called_with()

    def test_returns_none(self, mock_func_daemoncontext_close):
        """ Should return None, so the exception is not marked handled. """
        result = self.test_instance.__exit__(**self.test_args)
        self.assertIs(result, None)
595 | |
596 | |
class DaemonContext_terminate_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for the `DaemonContext.terminate` method. """

    def setUp(self):
        """ Prepare the signal number and frame passed to `terminate`. """
        super(DaemonContext_terminate_TestCase, self).setUp()

        self.test_signal = signal.SIGTERM
        self.test_frame = None
        self.test_args = (self.test_signal, self.test_frame)

    def test_raises_system_exit(self):
        """ Should raise SystemExit. """
        terminate = self.test_instance.terminate
        self.assertRaises(SystemExit, terminate, *self.test_args)

    def test_exception_message_contains_signal_number(self):
        """ Should mention the signal number in the exception message. """
        terminate = self.test_instance.terminate
        exc = self.assertRaises(SystemExit, terminate, *self.test_args)
        expected_text = unicode(self.test_signal)
        self.assertIn(expected_text, unicode(exc))
627 | |
628 | |
class DaemonContext_get_exclude_file_descriptors_TestCase(
        DaemonContext_BaseTestCase):
    """ Test cases for `DaemonContext._get_exclude_file_descriptors`. """

    def setUp(self):
        """ Build a mix of file objects, integers and None to preserve.

            `test_files` maps each descriptor number to the item that
            represents it; `test_file_descriptors` is the set of
            descriptors of every item that is not None, plus the
            descriptors of the three stream fixture files.

            """
        super(
                DaemonContext_get_exclude_file_descriptors_TestCase,
                self).setUp()

        self.test_files = {
                2: FakeFileDescriptorStringIO(),
                5: 5,
                11: FakeFileDescriptorStringIO(),
                17: None,
                23: FakeFileDescriptorStringIO(),
                37: 37,
                42: FakeFileDescriptorStringIO(),
                }

        expected_fds = set()
        for (fd, item) in self.test_files.items():
            if hasattr(item, '_fileno'):
                # Make the fake file report the descriptor it is keyed by.
                item._fileno = fd
            if item is not None:
                expected_fds.add(fd)
        for stream_name in ['stdin', 'stdout', 'stderr']:
            stream_file = self.stream_files_by_name[stream_name]
            expected_fds.add(stream_file.fileno())
        self.test_file_descriptors = expected_fds

    def test_returns_expected_file_descriptors(self):
        """ Should return the expected set of file descriptors. """
        instance = self.test_instance
        instance.files_preserve = list(self.test_files.values())
        result = instance._get_exclude_file_descriptors()
        self.assertEqual(self.test_file_descriptors, result)

    def test_returns_stream_redirects_if_no_files_preserve(self):
        """ Should return only stream descriptors if no files_preserve. """
        instance = self.test_instance
        instance.files_preserve = None
        expected_result = set()
        for stream_file in self.stream_files_by_name.values():
            expected_result.add(stream_file.fileno())
        result = instance._get_exclude_file_descriptors()
        self.assertEqual(expected_result, result)

    def test_returns_empty_set_if_no_files(self):
        """ Should return an empty set if every file option is None. """
        instance = self.test_instance
        for option_name in ['files_preserve', 'stdin', 'stdout', 'stderr']:
            setattr(instance, option_name, None)
        result = instance._get_exclude_file_descriptors()
        self.assertEqual(set(), result)

    def test_omits_non_file_streams(self):
        """ Should omit stream attributes which are not files. """
        instance = self.test_instance
        instance.files_preserve = list(self.test_files.values())
        expected_result = self.test_file_descriptors.copy()
        for (stream_name, stream_file) in self.stream_files_by_name.items():
            # Swap the stream for a plain object; its descriptor should
            # then no longer be reported.
            setattr(instance, stream_name, object())
            expected_result.discard(stream_file.fileno())
        result = instance._get_exclude_file_descriptors()
        self.assertEqual(expected_result, result)

    def test_includes_verbatim_streams_without_file_descriptor(self):
        """ Should include as-is any stream lacking a file descriptor. """
        instance = self.test_instance
        instance.files_preserve = list(self.test_files.values())
        failing_fileno = mock.MagicMock(
                spec=sys.__stdin__.fileno,
                side_effect=ValueError)
        expected_result = self.test_file_descriptors.copy()
        for (stream_name, stream_file) in self.stream_files_by_name.items():
            replacement_stream = StringIO()
            if not hasattr(replacement_stream, 'fileno'):
                # Python < 3 StringIO doesn't have ‘fileno’ at all.
                # Add a method which raises an exception.
                replacement_stream.fileno = failing_fileno
            setattr(instance, stream_name, replacement_stream)
            expected_result.discard(stream_file.fileno())
            expected_result.add(replacement_stream)
        result = instance._get_exclude_file_descriptors()
        self.assertEqual(expected_result, result)

    def test_omits_none_streams(self):
        """ Should omit any stream attribute which is None. """
        instance = self.test_instance
        instance.files_preserve = list(self.test_files.values())
        expected_result = self.test_file_descriptors.copy()
        for (stream_name, stream_file) in self.stream_files_by_name.items():
            setattr(instance, stream_name, None)
            expected_result.discard(stream_file.fileno())
        result = instance._get_exclude_file_descriptors()
        self.assertEqual(expected_result, result)
734 | |
735 | |
class DaemonContext_make_signal_handler_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for `DaemonContext._make_signal_handler`. """

    def test_returns_ignore_for_none(self):
        """ Should return SIG_IGN when the target is None. """
        result = self.test_instance._make_signal_handler(None)
        self.assertEqual(signal.SIG_IGN, result)

    def test_returns_method_for_name(self):
        """ Should return the named method of the instance. """
        instance = self.test_instance
        result = instance._make_signal_handler('terminate')
        self.assertEqual(instance.terminate, result)

    def test_raises_error_for_unknown_name(self):
        """ Should raise AttributeError for an unknown method name. """
        make_handler = self.test_instance._make_signal_handler
        self.assertRaises(AttributeError, make_handler, 'b0gUs')

    def test_returns_object_for_object(self):
        """ Should return any other object unchanged. """
        target = object()
        result = self.test_instance._make_signal_handler(target)
        self.assertEqual(target, result)
771 | |
772 | |
class DaemonContext_make_signal_handler_map_TestCase(
        DaemonContext_BaseTestCase):
    """ Test cases for `DaemonContext._make_signal_handler_map`. """

    def setUp(self):
        """ Install a fake signal map and a fake `_make_signal_handler`.

            The fake handler factory looks each target up in
            `test_signal_handlers`; `test_signal_handler_map` holds the
            mapping expected from the method under test.

            """
        super(DaemonContext_make_signal_handler_map_TestCase, self).setUp()

        self.test_instance.signal_map = {
                object(): object(),
                object(): object(),
                object(): object(),
                }
        signal_map = self.test_instance.signal_map

        # One distinct placeholder handler for each target in the map.
        handlers_by_target = {}
        for target in signal_map.values():
            handlers_by_target[target] = object()
        self.test_signal_handlers = handlers_by_target

        handlers_by_signal = {}
        for (signal_key, target) in signal_map.items():
            handlers_by_signal[signal_key] = handlers_by_target[target]
        self.test_signal_handler_map = handlers_by_signal

        def fake_make_signal_handler(target):
            return self.test_signal_handlers[target]

        patcher = mock.patch.object(
                daemon.daemon.DaemonContext, "_make_signal_handler",
                side_effect=fake_make_signal_handler)
        self.mock_func_make_signal_handler = patcher.start()
        self.addCleanup(patcher.stop)

    def test_returns_constructed_signal_handler_items(self):
        """ Should return a handler from the factory for each signal. """
        result = self.test_instance._make_signal_handler_map()
        self.assertEqual(self.test_signal_handler_map, result)
810 | |
811 | |
try:
    # Probe for the built-in name; evaluating it has no other effect.
    FileNotFoundError
except NameError:
    # Python 2 has no ‘FileNotFoundError’; it uses IOError. Substitute a
    # factory that builds an IOError whose first argument is ENOENT, so
    # the tests below can call ‘FileNotFoundError(…)’ on either version.
    FileNotFoundError = functools.partial(IOError, errno.ENOENT)
817 | |
818 | |
@mock.patch.object(os, "chdir")
class change_working_directory_TestCase(scaffold.TestCase):
    """ Test cases for change_working_directory function.

        The class decorator replaces `os.chdir` with a mock, which is
        passed to every test method as `mock_func_os_chdir`.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_working_directory_TestCase, self).setUp()

        self.test_directory = object()
        self.test_args = dict(directory=self.test_directory)

    def test_changes_working_directory_to_specified_directory(
            self,
            mock_func_os_chdir):
        """ Should call `os.chdir` with the specified directory. """
        daemon.daemon.change_working_directory(**self.test_args)
        mock_func_os_chdir.assert_called_with(self.test_directory)

    def test_raises_daemon_error_on_os_error(
            self,
            mock_func_os_chdir):
        """ Should raise a DaemonError, chained to the `chdir` error. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_working_directory, **self.test_args)
        self.assertEqual(chdir_error, raised.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_chdir):
        """ Should raise a DaemonError whose text includes the original. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_working_directory, **self.test_args)
        self.assertIn(unicode(chdir_error), unicode(raised))
866 | |
867 | |
@mock.patch.object(os, "chroot")
@mock.patch.object(os, "chdir")
class change_root_directory_TestCase(scaffold.TestCase):
    """ Test cases for change_root_directory function.

        Both `os.chdir` and `os.chroot` are replaced by mocks. Decorators
        apply bottom-up, so each test receives the `chdir` mock first and
        the `chroot` mock second.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_root_directory_TestCase, self).setUp()

        self.test_directory = object()
        self.test_args = dict(directory=self.test_directory)

    def test_changes_working_directory_to_specified_directory(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should call `os.chdir` with the specified directory. """
        daemon.daemon.change_root_directory(**self.test_args)
        mock_func_os_chdir.assert_called_with(self.test_directory)

    def test_changes_root_directory_to_specified_directory(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should call `os.chroot` with the specified directory. """
        daemon.daemon.change_root_directory(**self.test_args)
        mock_func_os_chroot.assert_called_with(self.test_directory)

    def test_raises_daemon_error_on_os_error_from_chdir(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should raise a DaemonError, chained to the `chdir` error. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_root_directory, **self.test_args)
        self.assertEqual(chdir_error, raised.__cause__)

    def test_raises_daemon_error_on_os_error_from_chroot(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should raise a DaemonError, chained to the `chroot` error. """
        chroot_error = OSError(errno.EPERM, "No chroot for you!")
        mock_func_os_chroot.side_effect = chroot_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_root_directory, **self.test_args)
        self.assertEqual(chroot_error, raised.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should raise a DaemonError whose text includes the original. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_root_directory, **self.test_args)
        self.assertIn(unicode(chdir_error), unicode(raised))
938 | |
939 | |
@mock.patch.object(os, "umask")
class change_file_creation_mask_TestCase(scaffold.TestCase):
    """ Test cases for change_file_creation_mask function.

        `os.umask` is replaced by a mock, passed to every test method as
        `mock_func_os_umask`.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_file_creation_mask_TestCase, self).setUp()

        self.test_mask = object()
        self.test_args = dict(mask=self.test_mask)

    def test_changes_umask_to_specified_mask(self, mock_func_os_umask):
        """ Should call `os.umask` with the specified mask. """
        daemon.daemon.change_file_creation_mask(**self.test_args)
        mock_func_os_umask.assert_called_with(self.test_mask)

    def test_raises_daemon_error_on_os_error_from_chdir(
            self,
            mock_func_os_umask):
        """ Should raise a DaemonError, chained to the `umask` error.

            Despite the “chdir” in this method's name, the error is
            injected into the mocked `os.umask`.

            """
        umask_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
        mock_func_os_umask.side_effect = umask_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_file_creation_mask, **self.test_args)
        self.assertEqual(umask_error, raised.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_umask):
        """ Should raise a DaemonError whose text includes the original. """
        umask_error = FileNotFoundError("No such directory")
        mock_func_os_umask.side_effect = umask_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_file_creation_mask, **self.test_args)
        self.assertIn(unicode(umask_error), unicode(raised))
985 | |
986 | |
@mock.patch.object(os, "setgid")
@mock.patch.object(os, "setuid")
class change_process_owner_TestCase(scaffold.TestCase):
    """ Test cases for change_process_owner function.

        Both `os.setuid` and `os.setgid` are replaced by mocks. Decorators
        apply bottom-up, so each test receives the `setuid` mock first and
        the `setgid` mock second.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_process_owner_TestCase, self).setUp()

        self.test_uid = object()
        self.test_gid = object()
        self.test_args = dict(
                uid=self.test_uid,
                gid=self.test_gid,
                )

    def test_changes_gid_and_uid_in_order(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should change process GID and UID in correct order.

            Since the process requires appropriate privilege to use
            either of `setuid` or `setgid`, changing the UID must be
            done last.

            """
        args = self.test_args
        # Attach both patched functions to a single parent mock, so that
        # their relative call order is recorded in one call list.
        mock_os = mock.MagicMock()
        mock_os.attach_mock(mock_func_os_setuid, "setuid")
        mock_os.attach_mock(mock_func_os_setgid, "setgid")
        daemon.daemon.change_process_owner(**args)
        mock_os.assert_has_calls([
                mock.call.setgid(self.test_gid),
                mock.call.setuid(self.test_uid),
                ])

    def test_changes_group_id_to_gid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should change process GID to specified value. """
        args = self.test_args
        gid = self.test_gid
        daemon.daemon.change_process_owner(**args)
        # ‘assert_called’ takes no arguments and never checks them;
        # ‘assert_called_with’ is the assertion that verifies the value.
        mock_func_os_setgid.assert_called_with(gid)

    def test_changes_user_id_to_uid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should change process UID to specified value. """
        args = self.test_args
        uid = self.test_uid
        daemon.daemon.change_process_owner(**args)
        mock_func_os_setuid.assert_called_with(uid)

    def test_raises_daemon_error_on_os_error_from_setgid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should raise a DaemonError on receiving an OSError from setgid. """
        args = self.test_args
        test_error = OSError(errno.EPERM, "No switching for you!")
        mock_func_os_setgid.side_effect = test_error
        expected_error = daemon.daemon.DaemonOSEnvironmentError
        exc = self.assertRaises(
                expected_error,
                daemon.daemon.change_process_owner, **args)
        self.assertEqual(test_error, exc.__cause__)

    def test_raises_daemon_error_on_os_error_from_setuid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should raise a DaemonError on receiving an OSError from setuid. """
        args = self.test_args
        test_error = OSError(errno.EPERM, "No switching for you!")
        mock_func_os_setuid.side_effect = test_error
        expected_error = daemon.daemon.DaemonOSEnvironmentError
        exc = self.assertRaises(
                expected_error,
                daemon.daemon.change_process_owner, **args)
        self.assertEqual(test_error, exc.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should raise a DaemonError with original message. """
        args = self.test_args
        test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
        mock_func_os_setuid.side_effect = test_error
        expected_error = daemon.daemon.DaemonOSEnvironmentError
        exc = self.assertRaises(
                expected_error,
                daemon.daemon.change_process_owner, **args)
        self.assertIn(unicode(test_error), unicode(exc))
1074 | |
1075 | |
# Stand-in for the (soft, hard) pair of a resource limit.
RLimitResult = collections.namedtuple('RLimitResult', ('soft', 'hard'))

# Opaque sentinel patched in place of ‘resource.RLIMIT_CORE’.
fake_RLIMIT_CORE = object()
1079 | |
@mock.patch.object(resource, "RLIMIT_CORE", new=fake_RLIMIT_CORE)
@mock.patch.object(resource, "setrlimit", side_effect=(lambda x, y: None))
@mock.patch.object(resource, "getrlimit", side_effect=(lambda x: None))
class prevent_core_dump_TestCase(scaffold.TestCase):
    """ Test cases for prevent_core_dump function.

        `resource.RLIMIT_CORE` is swapped for a sentinel (patched with
        `new`, so no mock argument is passed for it). Each test receives
        the `getrlimit` mock first, then the `setrlimit` mock.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(prevent_core_dump_TestCase, self).setUp()

    def test_sets_core_limit_to_zero(
            self,
            mock_func_resource_getrlimit, mock_func_resource_setrlimit):
        """ Should set the RLIMIT_CORE resource to zero. """
        zero_limit = tuple(RLimitResult(soft=0, hard=0))
        daemon.daemon.prevent_core_dump()
        mock_func_resource_getrlimit.assert_called_with(fake_RLIMIT_CORE)
        mock_func_resource_setrlimit.assert_called_with(
                fake_RLIMIT_CORE, zero_limit)

    def test_raises_error_when_no_core_resource(
            self,
            mock_func_resource_getrlimit, mock_func_resource_setrlimit):
        """ Should raise DaemonError if no RLIMIT_CORE resource. """
        missing_resource_error = ValueError(
                "Bogus platform doesn't have RLIMIT_CORE")

        def fake_getrlimit(res):
            # Fail only for the core-dump resource; any other query
            # gets a null result.
            if res == resource.RLIMIT_CORE:
                raise missing_resource_error
            return None

        mock_func_resource_getrlimit.side_effect = fake_getrlimit
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.prevent_core_dump)
        self.assertEqual(missing_resource_error, raised.__cause__)
1117 | |
1118 | |
@mock.patch.object(os, "close")
class close_file_descriptor_if_open_TestCase(scaffold.TestCase):
    """ Test cases for close_file_descriptor_if_open function.

        `os.close` is replaced by a mock, passed to every test method as
        `mock_func_os_close`.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(close_file_descriptor_if_open_TestCase, self).setUp()

        self.fake_fd = 274

    def test_requests_file_descriptor_close(self, mock_func_os_close):
        """ Should request close of file descriptor. """
        daemon.daemon.close_file_descriptor_if_open(self.fake_fd)
        mock_func_os_close.assert_called_with(self.fake_fd)

    def test_ignores_badfd_error_on_close(self, mock_func_os_close):
        """ Should ignore OSError EBADF when closing. """
        # An exception instance as ‘side_effect’ is raised on each call.
        mock_func_os_close.side_effect = OSError(
                errno.EBADF, "Bad file descriptor")
        daemon.daemon.close_file_descriptor_if_open(self.fake_fd)
        mock_func_os_close.assert_called_with(self.fake_fd)

    def test_raises_error_if_oserror_on_close(self, mock_func_os_close):
        """ Should raise DaemonError if an OSError occurs when closing. """
        close_error = OSError(object(), "Unexpected error")
        mock_func_os_close.side_effect = close_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.close_file_descriptor_if_open, self.fake_fd)
        self.assertEqual(close_error, raised.__cause__)

    def test_raises_error_if_ioerror_on_close(self, mock_func_os_close):
        """ Should raise DaemonError if an IOError occurs when closing. """
        close_error = IOError(object(), "Unexpected error")
        mock_func_os_close.side_effect = close_error
        raised = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.close_file_descriptor_if_open, self.fake_fd)
        self.assertEqual(close_error, raised.__cause__)
1170 | |
1171 | |
class maxfd_TestCase(scaffold.TestCase):
    """ Test cases for module MAXFD constant. """

    def test_positive(self):
        """ Should be a positive number. """
        maxfd = daemon.daemon.MAXFD
        # The dedicated comparison assertion reports both operands on
        # failure, unlike ‘assertTrue’ on a pre-computed boolean.
        self.assertGreater(maxfd, 0)

    def test_integer(self):
        """ Should be an integer. """
        maxfd = daemon.daemon.MAXFD
        self.assertEqual(int(maxfd), maxfd)

    def test_reasonably_high(self):
        """ Should be reasonably high for default open files limit.

            If the system reports a limit of “infinity” on maximum
            file descriptors, we still need a finite number in order
            to close “all” of them. Ensure this is reasonably high
            to catch most use cases.

            """
        expected_minimum = 2048
        maxfd = daemon.daemon.MAXFD
        self.assertGreaterEqual(
                maxfd, expected_minimum,
                msg=(
                    "MAXFD should be at least {minimum!r}"
                    " (got {maxfd!r})".format(
                        minimum=expected_minimum, maxfd=maxfd)))
1202 | |
1203 | |
# Fixture values shared by the file-descriptor limit tests below.
# Small value patched in as the module default ‘MAXFD’.
fake_default_maxfd = 8
# Opaque sentinels patched in place of the ‘resource’ module constants.
fake_RLIMIT_NOFILE = object()
fake_RLIM_INFINITY = object()
# A finite hard limit, distinct from ‘fake_default_maxfd’.
fake_rlimit_nofile_large = 2468
1208 | |
def fake_getrlimit_nofile_soft_infinity(resource):
    """ Fake `getrlimit`: NOFILE has an infinite soft limit.

        Returns ``NotImplemented`` for any resource other than the
        fake RLIMIT_NOFILE sentinel.

        """
    if resource != fake_RLIMIT_NOFILE:
        return NotImplemented
    return RLimitResult(soft=fake_RLIM_INFINITY, hard=object())
1214 | |
def fake_getrlimit_nofile_hard_infinity(resource):
    """ Fake `getrlimit`: NOFILE has an infinite hard limit.

        Returns ``NotImplemented`` for any resource other than the
        fake RLIMIT_NOFILE sentinel.

        """
    if resource != fake_RLIMIT_NOFILE:
        return NotImplemented
    return RLimitResult(soft=object(), hard=fake_RLIM_INFINITY)
1220 | |
def fake_getrlimit_nofile_hard_large(resource):
    """ Fake `getrlimit`: NOFILE has a large, finite hard limit.

        Returns ``NotImplemented`` for any resource other than the
        fake RLIMIT_NOFILE sentinel.

        """
    if resource != fake_RLIMIT_NOFILE:
        return NotImplemented
    return RLimitResult(soft=object(), hard=fake_rlimit_nofile_large)
1226 | |
@mock.patch.object(daemon.daemon, "MAXFD", new=fake_default_maxfd)
@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
@mock.patch.object(
        resource, "getrlimit",
        side_effect=fake_getrlimit_nofile_hard_large)
class get_maximum_file_descriptors_TestCase(scaffold.TestCase):
    """ Test cases for get_maximum_file_descriptors function.

        Only the `getrlimit` patch creates a mock argument; the other
        three patches substitute fixed values via `new`.

        """

    def test_returns_system_hard_limit(self, mock_func_resource_getrlimit):
        """ Should return process hard limit on number of files. """
        self.assertEqual(
                fake_rlimit_nofile_large,
                daemon.daemon.get_maximum_file_descriptors())

    def test_returns_module_default_if_hard_limit_infinity(
            self, mock_func_resource_getrlimit):
        """ Should return module MAXFD if hard limit is infinity. """
        mock_func_resource_getrlimit.side_effect = (
                fake_getrlimit_nofile_hard_infinity)
        self.assertEqual(
                fake_default_maxfd,
                daemon.daemon.get_maximum_file_descriptors())
1250 | |
1251 | |
def fake_get_maximum_file_descriptors():
    """ Fake `get_maximum_file_descriptors`: always the fixture default. """
    return fake_default_maxfd
1254 | |
@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
@mock.patch.object(
        resource, "getrlimit",
        new=fake_getrlimit_nofile_soft_infinity)
@mock.patch.object(
        daemon.daemon, "get_maximum_file_descriptors",
        new=fake_get_maximum_file_descriptors)
@mock.patch.object(daemon.daemon, "close_file_descriptor_if_open")
class close_all_open_files_TestCase(scaffold.TestCase):
    """ Test cases for close_all_open_files function.

        Only the `close_file_descriptor_if_open` patch creates a mock
        argument; the other patches substitute fixed fakes via `new`.

        """

    def test_requests_all_open_files_to_close(
            self, mock_func_close_file_descriptor_if_open):
        """ Should request close of all open files. """
        expected_file_descriptors = range(fake_default_maxfd)
        expected_calls = [
                mock.call(fd) for fd in expected_file_descriptors]
        daemon.daemon.close_all_open_files()
        mock_func_close_file_descriptor_if_open.assert_has_calls(
                expected_calls, any_order=True)

    def test_requests_all_but_excluded_files_to_close(
            self, mock_func_close_file_descriptor_if_open):
        """ Should request close of all open files but those excluded. """
        test_exclude = set([3, 7])
        args = dict(
                exclude=test_exclude,
                )
        expected_file_descriptors = set(
                fd for fd in range(fake_default_maxfd)
                if fd not in test_exclude)
        expected_calls = [
                mock.call(fd) for fd in expected_file_descriptors]
        daemon.daemon.close_all_open_files(**args)
        mock_func_close_file_descriptor_if_open.assert_has_calls(
                expected_calls, any_order=True)
        # ‘assert_has_calls’ tolerates extra calls, so on its own it
        # cannot show that the excluded descriptors were left alone.
        actual_calls = (
                mock_func_close_file_descriptor_if_open.call_args_list)
        for excluded_fd in test_exclude:
            self.assertNotIn(mock.call(excluded_fd), actual_calls)
1292 | |
1293 | |
class detach_process_context_TestCase(scaffold.TestCase):
    """ Test cases for detach_process_context function.

        `os.fork`, `os.setsid` and `os._exit` are patched, and each mock
        is attached to `self.mock_module_os` so that the order of calls
        across the three functions can be asserted in one place.

        """

    class FakeOSExit(SystemExit):
        """ Fake exception raised for os._exit(). """

    @staticmethod
    def _make_fake_fork(outcomes):
        """ Make a fake `os.fork` that replays `outcomes` in sequence.

            Each call consumes the next item: an exception instance is
            raised, anything else is returned.

            """
        remaining = iter(outcomes)

        def fake_fork():
            outcome = next(remaining)
            if isinstance(outcome, Exception):
                raise outcome
            return outcome

        return fake_fork

    def _patch_os_func(self, name, **kwargs):
        """ Patch `os.<name>`, register cleanup, attach to the os mock. """
        patcher = mock.patch.object(os, name, **kwargs)
        mock_func = patcher.start()
        self.addCleanup(patcher.stop)
        self.mock_module_os.attach_mock(mock_func, name)
        return mock_func

    def setUp(self):
        """ Set up test fixtures. """
        super(detach_process_context_TestCase, self).setUp()

        self.mock_module_os = mock.MagicMock(wraps=os)

        # By default both forks report “child” (PID 0).
        self.mock_func_os_fork = self._patch_os_func(
                "fork", side_effect=iter([0, 0]))

        self.mock_func_os_setsid = self._patch_os_func("setsid")

        def raise_os_exit(status=None):
            raise self.FakeOSExit(status)

        self.mock_func_os_force_exit = self._patch_os_func(
                "_exit", side_effect=raise_os_exit)

    def test_parent_exits(self):
        """ Parent process should exit. """
        parent_pid = 23
        self.mock_func_os_fork.side_effect = iter([parent_pid])
        self.assertRaises(
                self.FakeOSExit,
                daemon.daemon.detach_process_context)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call._exit(0),
                ])

    def test_first_fork_error_raises_error(self):
        """ Error on first fork should raise DaemonProcessDetachError. """
        fork_error = OSError(13, "Bad stuff happened")
        self.mock_func_os_fork.side_effect = self._make_fake_fork(
                [fork_error])
        raised = self.assertRaises(
                daemon.daemon.DaemonProcessDetachError,
                daemon.daemon.detach_process_context)
        self.assertEqual(fork_error, raised.__cause__)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                ])

    def test_child_starts_new_process_group(self):
        """ Child should start new process group. """
        daemon.daemon.detach_process_context()
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                ])

    def test_child_forks_next_parent_exits(self):
        """ Child should fork, then exit if parent. """
        self.mock_func_os_fork.side_effect = iter([0, 42])
        self.assertRaises(
                self.FakeOSExit,
                daemon.daemon.detach_process_context)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                mock.call.fork(),
                mock.call._exit(0),
                ])

    def test_second_fork_error_reports_to_stderr(self):
        """ Error on second fork should raise DaemonProcessDetachError.

            Despite the “stderr” in this method's name, what is asserted
            is the raised exception and its cause.

            """
        fork_error = OSError(17, "Nasty stuff happened")
        self.mock_func_os_fork.side_effect = self._make_fake_fork(
                [0, fork_error])
        raised = self.assertRaises(
                daemon.daemon.DaemonProcessDetachError,
                daemon.daemon.detach_process_context)
        self.assertEqual(fork_error, raised.__cause__)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                mock.call.fork(),
                ])

    def test_child_forks_next_child_continues(self):
        """ Child should fork, then continue if child. """
        daemon.daemon.detach_process_context()
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                mock.call.fork(),
                ])
1419 | |
1420 | |
@mock.patch("os.getppid", return_value=765)
class is_process_started_by_init_TestCase(scaffold.TestCase):
    """ Test cases for is_process_started_by_init function.

        `os.getppid` is mocked to report parent PID 765 unless a test
        overrides it.

        """

    def test_returns_false_by_default(self, mock_func_os_getppid):
        """ Should return False under normal circumstances. """
        outcome = daemon.daemon.is_process_started_by_init()
        self.assertIs(outcome, False)

    def test_returns_true_if_parent_process_is_init(
            self, mock_func_os_getppid):
        """ Should return True if parent process is `init`. """
        mock_func_os_getppid.return_value = 1
        outcome = daemon.daemon.is_process_started_by_init()
        self.assertIs(outcome, True)
1439 | |
1440 | |
class is_socket_TestCase(scaffold.TestCase):
    """ Test cases for is_socket function.

        `socket.fromfd` is patched to return a mock socket whose
        `getsockopt` raises ENOTSOCK by default, so the descriptor looks
        like a non-socket unless a test overrides that.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(is_socket_TestCase, self).setUp()

        def fake_getsockopt(level, optname, buflen=None):
            result = object()
            # Compare option numbers by value; identity of integers is
            # an implementation detail.
            if optname == socket.SO_TYPE:
                result = socket.SOCK_RAW
            return result

        self.fake_socket_getsockopt_func = fake_getsockopt

        self.fake_socket_error = socket.error(
                errno.ENOTSOCK,
                "Socket operation on non-socket")

        self.mock_socket = mock.MagicMock(spec=socket.socket)
        self.mock_socket.getsockopt.side_effect = self.fake_socket_error

        def fake_socket_fromfd(fd, family, type, proto=None):
            return self.mock_socket

        func_patcher_socket_fromfd = mock.patch.object(
                socket, "fromfd",
                side_effect=fake_socket_fromfd)
        func_patcher_socket_fromfd.start()
        self.addCleanup(func_patcher_socket_fromfd.stop)

    def test_returns_false_by_default(self):
        """ Should return False under normal circumstances. """
        test_fd = 23
        expected_result = False
        result = daemon.daemon.is_socket(test_fd)
        self.assertIs(result, expected_result)

    def test_returns_true_if_stdin_is_socket(self):
        """ Should return True if `stdin` is a socket. """
        test_fd = 23
        getsockopt = self.mock_socket.getsockopt
        getsockopt.side_effect = self.fake_socket_getsockopt_func
        expected_result = True
        result = daemon.daemon.is_socket(test_fd)
        self.assertIs(result, expected_result)

    def test_returns_false_if_stdin_socket_raises_error(self):
        """ Should return True if `stdin` is a socket and raises error.

            Despite the “false” in this method's name, the expected
            result is True: the error injected here does not carry the
            ENOTSOCK code used by the default fixture.

            """
        test_fd = 23
        getsockopt = self.mock_socket.getsockopt
        getsockopt.side_effect = socket.error(
                object(), "Weird socket stuff")
        expected_result = True
        result = daemon.daemon.is_socket(test_fd)
        self.assertIs(result, expected_result)
1497 | |
1498 | |
class is_process_started_by_superserver_TestCase(scaffold.TestCase):
    """ Test cases for is_process_started_by_superserver function.

        `daemon.daemon.is_socket` is patched with a fake that consults
        `self.fake_stdin_is_socket_func` for the real stdin descriptor
        and reports False for every other descriptor.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(is_process_started_by_superserver_TestCase, self).setUp()

        # Tests rebind this attribute to change what stdin “is”.
        self.fake_stdin_is_socket_func = (lambda: False)

        def fake_is_socket(fd):
            stdin_fd = sys.__stdin__.fileno()
            if stdin_fd == fd:
                return self.fake_stdin_is_socket_func()
            return False

        patcher = mock.patch.object(
                daemon.daemon, "is_socket",
                side_effect=fake_is_socket)
        patcher.start()
        self.addCleanup(patcher.stop)

    def test_returns_false_by_default(self):
        """ Should return False under normal circumstances. """
        outcome = daemon.daemon.is_process_started_by_superserver()
        self.assertIs(outcome, False)

    def test_returns_true_if_stdin_is_socket(self):
        """ Should return True if `stdin` is a socket. """
        self.fake_stdin_is_socket_func = (lambda: True)
        outcome = daemon.daemon.is_process_started_by_superserver()
        self.assertIs(outcome, True)
1533 | |
1534 | |
@mock.patch.object(
        daemon.daemon, "is_process_started_by_superserver",
        return_value=False)
@mock.patch.object(
        daemon.daemon, "is_process_started_by_init",
        return_value=False)
class is_detach_process_context_required_TestCase(scaffold.TestCase):
    """ Test cases for is_detach_process_context_required function.

        Both “started by …” predicates are mocked to return False by
        default. Decorators apply bottom-up, so each test receives the
        `init` mock first and the `superserver` mock second.

        """

    def test_returns_true_by_default(
            self,
            mock_func_is_process_started_by_init,
            mock_func_is_process_started_by_superserver):
        """ Should return True under normal circumstances. """
        outcome = daemon.daemon.is_detach_process_context_required()
        self.assertIs(outcome, True)

    def test_returns_false_if_started_by_init(
            self,
            mock_func_is_process_started_by_init,
            mock_func_is_process_started_by_superserver):
        """ Should return False if current process started by init. """
        mock_func_is_process_started_by_init.return_value = True
        outcome = daemon.daemon.is_detach_process_context_required()
        self.assertIs(outcome, False)

    def test_returns_true_if_started_by_superserver(
            self,
            mock_func_is_process_started_by_init,
            mock_func_is_process_started_by_superserver):
        """ Should return False if current process started by superserver.

            Despite the “true” in this method's name, the expected
            result is False.

            """
        mock_func_is_process_started_by_superserver.return_value = True
        outcome = daemon.daemon.is_detach_process_context_required()
        self.assertIs(outcome, False)
1572 | |
1573 | |
def setup_streams_fixtures(testcase):
    """ Set up common test fixtures for standard streams.

        Sets three attributes on `testcase`:

        * `stream_file_paths`: stream name → generated file path.
        * `stream_files_by_name`: stream name → fake file object.
        * `stream_files_by_path`: generated path → the same fake file.

        """
    stream_names = ['stdin', 'stdout', 'stderr']

    # ‘tempfile.mktemp’ only generates a name; this function creates no
    # file at that path.
    testcase.stream_file_paths = dict(
            (name, tempfile.mktemp())
            for name in stream_names)

    testcase.stream_files_by_name = dict(
            (name, FakeFileDescriptorStringIO())
            for name in stream_names)

    testcase.stream_files_by_path = dict(
            (testcase.stream_file_paths[name],
                testcase.stream_files_by_name[name])
            for name in stream_names)
1592 | |
1593 | |
@mock.patch.object(os, "dup2")
class redirect_stream_TestCase(scaffold.TestCase):
    """ Test cases for redirect_stream function.

        `os.dup2` is replaced by a mock for each test method (via the
        class decorator, which passes the mock as an extra argument),
        and `os.open` is replaced in `setUp` by a fake that only knows
        the null device.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(redirect_stream_TestCase, self).setUp()

        self.test_system_stream = FakeFileDescriptorStringIO()
        self.test_target_stream = FakeFileDescriptorStringIO()
        self.test_null_file = FakeFileDescriptorStringIO()

        def fake_os_open(path, flag, mode=None):
            """ Stand-in for `os.open` that only opens `os.devnull`.

                :param path: Filesystem path requested by the caller.
                :param flag: Open flags; accepted but ignored.
                :param mode: Creation mode; accepted but ignored.
                :return: The file descriptor of the fake null file.
                :raises OSError: With `errno.ENOENT`, for any path
                    other than `os.devnull`.

                """
            if path == os.devnull:
                return self.test_null_file.fileno()
            # Pass the errno first so that `errno`, `strerror` and
            # `filename` are populated the way a real `os.open` failure
            # populates them. Python 3 raises this as the
            # `FileNotFoundError` subclass; Python 2 has no such name,
            # so a plain `OSError` keeps this helper usable there too.
            raise OSError(errno.ENOENT, "No such file", path)

        func_patcher_os_open = mock.patch.object(
                os, "open",
                side_effect=fake_os_open)
        self.mock_func_os_open = func_patcher_os_open.start()
        self.addCleanup(func_patcher_os_open.stop)

    def test_duplicates_target_file_descriptor(
            self, mock_func_os_dup2):
        """ Should duplicate file descriptor from target to system stream. """
        system_stream = self.test_system_stream
        system_fileno = system_stream.fileno()
        target_stream = self.test_target_stream
        target_fileno = target_stream.fileno()
        daemon.daemon.redirect_stream(system_stream, target_stream)
        mock_func_os_dup2.assert_called_with(target_fileno, system_fileno)

    def test_duplicates_null_file_descriptor_by_default(
            self, mock_func_os_dup2):
        """ Should by default duplicate the null file to the system stream. """
        system_stream = self.test_system_stream
        system_fileno = system_stream.fileno()
        # A target of `None` requests the default behaviour under test.
        target_stream = None
        null_path = os.devnull
        null_flag = os.O_RDWR
        null_file = self.test_null_file
        null_fileno = null_file.fileno()
        daemon.daemon.redirect_stream(system_stream, target_stream)
        self.mock_func_os_open.assert_called_with(null_path, null_flag)
        mock_func_os_dup2.assert_called_with(null_fileno, system_fileno)
1642 | |
1643 | |
class make_default_signal_map_TestCase(scaffold.TestCase):
    """ Test cases for make_default_signal_map function. """

    def setUp(self):
        """ Set up test fixtures.

            Builds a fake `signal` module holding a fixed set of signal
            names, patches it in as `daemon.daemon.signal`, and records
            the signal map expected for that fake module.

            """
        super(make_default_signal_map_TestCase, self).setUp()

        # `str(...)` yields whatever default string type this Python
        # version needs for a module name.
        fake_module = ModuleType(str('signal'))
        for signal_name in [
                'SIGHUP',
                'SIGCLD',
                'SIGSEGV',
                'SIGTSTP',
                'SIGTTIN',
                'SIGTTOU',
                'SIGTERM',
                ]:
            # Each fake signal is a distinct opaque object.
            setattr(fake_module, signal_name, object())
        self.fake_signal_module = fake_module

        signal_patcher = mock.patch.object(
                daemon.daemon, "signal", new=self.fake_signal_module)
        signal_patcher.start()
        self.addCleanup(signal_patcher.stop)

        expected_targets_by_name = {
                'SIGTSTP': None,
                'SIGTTIN': None,
                'SIGTTOU': None,
                'SIGTERM': 'terminate',
                }
        # Key the expected map by the fake signal objects, not by name.
        signal_map = {}
        for (signal_name, target) in expected_targets_by_name.items():
            signal_id = getattr(self.fake_signal_module, signal_name)
            signal_map[signal_id] = target
        self.default_signal_map = signal_map

    def test_returns_constructed_signal_map(self):
        """ Should return map per default. """
        actual_map = daemon.daemon.make_default_signal_map()
        self.assertEqual(self.default_signal_map, actual_map)

    def test_returns_signal_map_with_only_ids_in_signal_module(self):
        """ Should return map with only signals in the `signal` module.

            The `signal` module is documented to only define those
            signals which exist on the running system. Therefore the
            default map should not contain any signals which are not
            defined in the `signal` module.

            """
        # Look the signal up before removing it from the fake module,
        # because its object is the key in the expected map.
        absent_signal = self.fake_signal_module.SIGTTOU
        del self.default_signal_map[absent_signal]
        del self.fake_signal_module.SIGTTOU
        actual_map = daemon.daemon.make_default_signal_map()
        self.assertEqual(self.default_signal_map, actual_map)
1702 | |
1703 | |
@mock.patch.object(daemon.daemon.signal, "signal")
class set_signal_handlers_TestCase(scaffold.TestCase):
    """ Test cases for set_signal_handlers function.

        `signal.signal` (as referenced from `daemon.daemon`) is replaced
        by a mock for each test method, and passed to it as an extra
        argument.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(set_signal_handlers_TestCase, self).setUp()

        # Handlers are opaque objects; the test only checks that each
        # one is passed through alongside its signal number.
        self.signal_handler_map = {
                signal.SIGQUIT: object(),
                signal.SIGSEGV: object(),
                signal.SIGINT: object(),
                }

    def test_sets_signal_handler_for_each_item(self, mock_func_signal_signal):
        """ Should set signal handler for each item in map. """
        signal_handler_map = self.signal_handler_map
        # The expected calls follow the iteration order of the same map
        # object that is handed to the function under test.
        expected_calls = [
                mock.call(signal_number, handler)
                for (signal_number, handler) in signal_handler_map.items()]
        daemon.daemon.set_signal_handlers(signal_handler_map)
        # `assertEqual`, not the deprecated `assertEquals` alias, which
        # is no longer provided by `unittest.TestCase` on Python 3.12+.
        self.assertEqual(expected_calls, mock_func_signal_signal.mock_calls)
1726 | |
1727 | |
@mock.patch.object(daemon.daemon.atexit, "register")
class register_atexit_function_TestCase(scaffold.TestCase):
    """ Test cases for register_atexit_function function.

        `atexit.register` (as referenced from `daemon.daemon`) is
        replaced by a mock for each test method.

        """

    def test_registers_function_for_atexit_processing(
            self, mock_func_atexit_register):
        """ Should register specified function for atexit processing. """
        # An opaque object stands in for the function to register; the
        # test only checks that it is passed on to the mock.
        exit_function = object()
        daemon.daemon.register_atexit_function(exit_function)
        mock_func_atexit_register.assert_called_with(exit_function)
1738 | |
1739 | |
1740 # Local variables: | |
1741 # coding: utf-8 | |
1742 # mode: python | |
1743 # End: | |
1744 # vim: fileencoding=utf-8 filetype=python : |