comparison python-daemon-2.0.5/test/test_daemon.py @ 33:7ceb967147c3

start xena with no gui add library files
author jingchunzhu <jingchunzhu@gmail.com>
date Wed, 22 Jul 2015 13:24:44 -0700
parents
children
comparison
equal deleted inserted replaced
32:63b1ba1e3424 33:7ceb967147c3
1 # -*- coding: utf-8 -*-
2 #
3 # test/test_daemon.py
4 # Part of ‘python-daemon’, an implementation of PEP 3143.
5 #
6 # Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
7 #
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the Apache License, version 2.0 as published by the
10 # Apache Software Foundation.
11 # No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
12
13 """ Unit test for ‘daemon’ module.
14 """
15
16 from __future__ import (absolute_import, unicode_literals)
17
18 import os
19 import sys
20 import tempfile
21 import resource
22 import errno
23 import signal
24 import socket
25 from types import ModuleType
26 import collections
27 import functools
28 try:
29 # Standard library of Python 2.7 and later.
30 from io import StringIO
31 except ImportError:
32 # Standard library of Python 2.6 and earlier.
33 from StringIO import StringIO
34
35 import mock
36
37 from . import scaffold
38 from .scaffold import (basestring, unicode)
39 from .test_pidfile import (
40 FakeFileDescriptorStringIO,
41 setup_pidfile_fixtures,
42 )
43
44 import daemon
45
46
47 class ModuleExceptions_TestCase(scaffold.Exception_TestCase):
48 """ Test cases for module exception classes. """
49
50 scenarios = scaffold.make_exception_scenarios([
51 ('daemon.daemon.DaemonError', dict(
52 exc_type = daemon.daemon.DaemonError,
53 min_args = 1,
54 types = [Exception],
55 )),
56 ('daemon.daemon.DaemonOSEnvironmentError', dict(
57 exc_type = daemon.daemon.DaemonOSEnvironmentError,
58 min_args = 1,
59 types = [daemon.daemon.DaemonError, OSError],
60 )),
61 ('daemon.daemon.DaemonProcessDetachError', dict(
62 exc_type = daemon.daemon.DaemonProcessDetachError,
63 min_args = 1,
64 types = [daemon.daemon.DaemonError, OSError],
65 )),
66 ])
67
68
69 def setup_daemon_context_fixtures(testcase):
70 """ Set up common test fixtures for DaemonContext test case.
71
72 :param testcase: A ``TestCase`` instance to decorate.
73 :return: ``None``.
74
75 Decorate the `testcase` with fixtures for tests involving
76 `DaemonContext`.
77
78 """
79 setup_streams_fixtures(testcase)
80
81 setup_pidfile_fixtures(testcase)
82
83 testcase.fake_pidfile_path = tempfile.mktemp()
84 testcase.mock_pidlockfile = mock.MagicMock()
85 testcase.mock_pidlockfile.path = testcase.fake_pidfile_path
86
87 testcase.daemon_context_args = dict(
88 stdin=testcase.stream_files_by_name['stdin'],
89 stdout=testcase.stream_files_by_name['stdout'],
90 stderr=testcase.stream_files_by_name['stderr'],
91 )
92 testcase.test_instance = daemon.DaemonContext(
93 **testcase.daemon_context_args)
94
95 fake_default_signal_map = object()
96
97 @mock.patch.object(
98 daemon.daemon, "is_detach_process_context_required",
99 new=(lambda: True))
100 @mock.patch.object(
101 daemon.daemon, "make_default_signal_map",
102 new=(lambda: fake_default_signal_map))
103 @mock.patch.object(os, "setgid", new=(lambda x: object()))
104 @mock.patch.object(os, "setuid", new=(lambda x: object()))
105 class DaemonContext_BaseTestCase(scaffold.TestCase):
106 """ Base class for DaemonContext test case classes. """
107
108 def setUp(self):
109 """ Set up test fixtures. """
110 super(DaemonContext_BaseTestCase, self).setUp()
111
112 setup_daemon_context_fixtures(self)
113
114
115 class DaemonContext_TestCase(DaemonContext_BaseTestCase):
116 """ Test cases for DaemonContext class. """
117
118 def test_instantiate(self):
119 """ New instance of DaemonContext should be created. """
120 self.assertIsInstance(
121 self.test_instance, daemon.daemon.DaemonContext)
122
123 def test_minimum_zero_arguments(self):
124 """ Initialiser should not require any arguments. """
125 instance = daemon.daemon.DaemonContext()
126 self.assertIsNot(instance, None)
127
128 def test_has_specified_chroot_directory(self):
129 """ Should have specified chroot_directory option. """
130 args = dict(
131 chroot_directory=object(),
132 )
133 expected_directory = args['chroot_directory']
134 instance = daemon.daemon.DaemonContext(**args)
135 self.assertEqual(expected_directory, instance.chroot_directory)
136
137 def test_has_specified_working_directory(self):
138 """ Should have specified working_directory option. """
139 args = dict(
140 working_directory=object(),
141 )
142 expected_directory = args['working_directory']
143 instance = daemon.daemon.DaemonContext(**args)
144 self.assertEqual(expected_directory, instance.working_directory)
145
146 def test_has_default_working_directory(self):
147 """ Should have default working_directory option. """
148 args = dict()
149 expected_directory = "/"
150 instance = daemon.daemon.DaemonContext(**args)
151 self.assertEqual(expected_directory, instance.working_directory)
152
153 def test_has_specified_creation_mask(self):
154 """ Should have specified umask option. """
155 args = dict(
156 umask=object(),
157 )
158 expected_mask = args['umask']
159 instance = daemon.daemon.DaemonContext(**args)
160 self.assertEqual(expected_mask, instance.umask)
161
162 def test_has_default_creation_mask(self):
163 """ Should have default umask option. """
164 args = dict()
165 expected_mask = 0
166 instance = daemon.daemon.DaemonContext(**args)
167 self.assertEqual(expected_mask, instance.umask)
168
169 def test_has_specified_uid(self):
170 """ Should have specified uid option. """
171 args = dict(
172 uid=object(),
173 )
174 expected_id = args['uid']
175 instance = daemon.daemon.DaemonContext(**args)
176 self.assertEqual(expected_id, instance.uid)
177
178 def test_has_derived_uid(self):
179 """ Should have uid option derived from process. """
180 args = dict()
181 expected_id = os.getuid()
182 instance = daemon.daemon.DaemonContext(**args)
183 self.assertEqual(expected_id, instance.uid)
184
185 def test_has_specified_gid(self):
186 """ Should have specified gid option. """
187 args = dict(
188 gid=object(),
189 )
190 expected_id = args['gid']
191 instance = daemon.daemon.DaemonContext(**args)
192 self.assertEqual(expected_id, instance.gid)
193
194 def test_has_derived_gid(self):
195 """ Should have gid option derived from process. """
196 args = dict()
197 expected_id = os.getgid()
198 instance = daemon.daemon.DaemonContext(**args)
199 self.assertEqual(expected_id, instance.gid)
200
201 def test_has_specified_detach_process(self):
202 """ Should have specified detach_process option. """
203 args = dict(
204 detach_process=object(),
205 )
206 expected_value = args['detach_process']
207 instance = daemon.daemon.DaemonContext(**args)
208 self.assertEqual(expected_value, instance.detach_process)
209
210 def test_has_derived_detach_process(self):
211 """ Should have detach_process option derived from environment. """
212 args = dict()
213 func = daemon.daemon.is_detach_process_context_required
214 expected_value = func()
215 instance = daemon.daemon.DaemonContext(**args)
216 self.assertEqual(expected_value, instance.detach_process)
217
218 def test_has_specified_files_preserve(self):
219 """ Should have specified files_preserve option. """
220 args = dict(
221 files_preserve=object(),
222 )
223 expected_files_preserve = args['files_preserve']
224 instance = daemon.daemon.DaemonContext(**args)
225 self.assertEqual(expected_files_preserve, instance.files_preserve)
226
227 def test_has_specified_pidfile(self):
228 """ Should have the specified pidfile. """
229 args = dict(
230 pidfile=object(),
231 )
232 expected_pidfile = args['pidfile']
233 instance = daemon.daemon.DaemonContext(**args)
234 self.assertEqual(expected_pidfile, instance.pidfile)
235
236 def test_has_specified_stdin(self):
237 """ Should have specified stdin option. """
238 args = dict(
239 stdin=object(),
240 )
241 expected_file = args['stdin']
242 instance = daemon.daemon.DaemonContext(**args)
243 self.assertEqual(expected_file, instance.stdin)
244
245 def test_has_specified_stdout(self):
246 """ Should have specified stdout option. """
247 args = dict(
248 stdout=object(),
249 )
250 expected_file = args['stdout']
251 instance = daemon.daemon.DaemonContext(**args)
252 self.assertEqual(expected_file, instance.stdout)
253
254 def test_has_specified_stderr(self):
255 """ Should have specified stderr option. """
256 args = dict(
257 stderr=object(),
258 )
259 expected_file = args['stderr']
260 instance = daemon.daemon.DaemonContext(**args)
261 self.assertEqual(expected_file, instance.stderr)
262
263 def test_has_specified_signal_map(self):
264 """ Should have specified signal_map option. """
265 args = dict(
266 signal_map=object(),
267 )
268 expected_signal_map = args['signal_map']
269 instance = daemon.daemon.DaemonContext(**args)
270 self.assertEqual(expected_signal_map, instance.signal_map)
271
272 def test_has_derived_signal_map(self):
273 """ Should have signal_map option derived from system. """
274 args = dict()
275 expected_signal_map = daemon.daemon.make_default_signal_map()
276 instance = daemon.daemon.DaemonContext(**args)
277 self.assertEqual(expected_signal_map, instance.signal_map)
278
279
280 class DaemonContext_is_open_TestCase(DaemonContext_BaseTestCase):
281 """ Test cases for DaemonContext.is_open property. """
282
283 def test_begin_false(self):
284 """ Initial value of is_open should be False. """
285 instance = self.test_instance
286 self.assertEqual(False, instance.is_open)
287
288 def test_write_fails(self):
289 """ Writing to is_open should fail. """
290 instance = self.test_instance
291 self.assertRaises(
292 AttributeError,
293 setattr, instance, 'is_open', object())
294
295
296 class DaemonContext_open_TestCase(DaemonContext_BaseTestCase):
297 """ Test cases for DaemonContext.open method. """
298
299 def setUp(self):
300 """ Set up test fixtures. """
301 super(DaemonContext_open_TestCase, self).setUp()
302
303 self.test_instance._is_open = False
304
305 self.mock_module_daemon = mock.MagicMock()
306 daemon_func_patchers = dict(
307 (func_name, mock.patch.object(
308 daemon.daemon, func_name))
309 for func_name in [
310 "detach_process_context",
311 "change_working_directory",
312 "change_root_directory",
313 "change_file_creation_mask",
314 "change_process_owner",
315 "prevent_core_dump",
316 "close_all_open_files",
317 "redirect_stream",
318 "set_signal_handlers",
319 "register_atexit_function",
320 ])
321 for (func_name, patcher) in daemon_func_patchers.items():
322 mock_func = patcher.start()
323 self.addCleanup(patcher.stop)
324 self.mock_module_daemon.attach_mock(mock_func, func_name)
325
326 self.mock_module_daemon.attach_mock(mock.Mock(), 'DaemonContext')
327
328 self.test_files_preserve_fds = object()
329 self.test_signal_handler_map = object()
330 daemoncontext_method_return_values = {
331 '_get_exclude_file_descriptors':
332 self.test_files_preserve_fds,
333 '_make_signal_handler_map':
334 self.test_signal_handler_map,
335 }
336 daemoncontext_func_patchers = dict(
337 (func_name, mock.patch.object(
338 daemon.daemon.DaemonContext,
339 func_name,
340 return_value=return_value))
341 for (func_name, return_value) in
342 daemoncontext_method_return_values.items())
343 for (func_name, patcher) in daemoncontext_func_patchers.items():
344 mock_func = patcher.start()
345 self.addCleanup(patcher.stop)
346 self.mock_module_daemon.DaemonContext.attach_mock(
347 mock_func, func_name)
348
349 def test_performs_steps_in_expected_sequence(self):
350 """ Should perform daemonisation steps in expected sequence. """
351 instance = self.test_instance
352 instance.chroot_directory = object()
353 instance.detach_process = True
354 instance.pidfile = self.mock_pidlockfile
355 self.mock_module_daemon.attach_mock(
356 self.mock_pidlockfile, 'pidlockfile')
357 expected_calls = [
358 mock.call.change_root_directory(mock.ANY),
359 mock.call.prevent_core_dump(),
360 mock.call.change_file_creation_mask(mock.ANY),
361 mock.call.change_working_directory(mock.ANY),
362 mock.call.change_process_owner(mock.ANY, mock.ANY),
363 mock.call.detach_process_context(),
364 mock.call.DaemonContext._make_signal_handler_map(),
365 mock.call.set_signal_handlers(mock.ANY),
366 mock.call.DaemonContext._get_exclude_file_descriptors(),
367 mock.call.close_all_open_files(exclude=mock.ANY),
368 mock.call.redirect_stream(mock.ANY, mock.ANY),
369 mock.call.redirect_stream(mock.ANY, mock.ANY),
370 mock.call.redirect_stream(mock.ANY, mock.ANY),
371 mock.call.pidlockfile.__enter__(),
372 mock.call.register_atexit_function(mock.ANY),
373 ]
374 instance.open()
375 self.mock_module_daemon.assert_has_calls(expected_calls)
376
377 def test_returns_immediately_if_is_open(self):
378 """ Should return immediately if is_open property is true. """
379 instance = self.test_instance
380 instance._is_open = True
381 instance.open()
382 self.assertEqual(0, len(self.mock_module_daemon.mock_calls))
383
384 def test_changes_root_directory_to_chroot_directory(self):
385 """ Should change root directory to `chroot_directory` option. """
386 instance = self.test_instance
387 chroot_directory = object()
388 instance.chroot_directory = chroot_directory
389 instance.open()
390 self.mock_module_daemon.change_root_directory.assert_called_with(
391 chroot_directory)
392
393 def test_omits_chroot_if_no_chroot_directory(self):
394 """ Should omit changing root directory if no `chroot_directory`. """
395 instance = self.test_instance
396 instance.chroot_directory = None
397 instance.open()
398 self.assertFalse(self.mock_module_daemon.change_root_directory.called)
399
400 def test_prevents_core_dump(self):
401 """ Should request prevention of core dumps. """
402 instance = self.test_instance
403 instance.open()
404 self.mock_module_daemon.prevent_core_dump.assert_called_with()
405
406 def test_omits_prevent_core_dump_if_prevent_core_false(self):
407 """ Should omit preventing core dumps if `prevent_core` is false. """
408 instance = self.test_instance
409 instance.prevent_core = False
410 instance.open()
411 self.assertFalse(self.mock_module_daemon.prevent_core_dump.called)
412
413 def test_closes_open_files(self):
414 """ Should close all open files, excluding `files_preserve`. """
415 instance = self.test_instance
416 expected_exclude = self.test_files_preserve_fds
417 instance.open()
418 self.mock_module_daemon.close_all_open_files.assert_called_with(
419 exclude=expected_exclude)
420
421 def test_changes_directory_to_working_directory(self):
422 """ Should change current directory to `working_directory` option. """
423 instance = self.test_instance
424 working_directory = object()
425 instance.working_directory = working_directory
426 instance.open()
427 self.mock_module_daemon.change_working_directory.assert_called_with(
428 working_directory)
429
430 def test_changes_creation_mask_to_umask(self):
431 """ Should change file creation mask to `umask` option. """
432 instance = self.test_instance
433 umask = object()
434 instance.umask = umask
435 instance.open()
436 self.mock_module_daemon.change_file_creation_mask.assert_called_with(
437 umask)
438
439 def test_changes_owner_to_specified_uid_and_gid(self):
440 """ Should change process UID and GID to `uid` and `gid` options. """
441 instance = self.test_instance
442 uid = object()
443 gid = object()
444 instance.uid = uid
445 instance.gid = gid
446 instance.open()
447 self.mock_module_daemon.change_process_owner.assert_called_with(
448 uid, gid)
449
450 def test_detaches_process_context(self):
451 """ Should request detach of process context. """
452 instance = self.test_instance
453 instance.open()
454 self.mock_module_daemon.detach_process_context.assert_called_with()
455
456 def test_omits_process_detach_if_not_required(self):
457 """ Should omit detach of process context if not required. """
458 instance = self.test_instance
459 instance.detach_process = False
460 instance.open()
461 self.assertFalse(self.mock_module_daemon.detach_process_context.called)
462
463 def test_sets_signal_handlers_from_signal_map(self):
464 """ Should set signal handlers according to `signal_map`. """
465 instance = self.test_instance
466 instance.signal_map = object()
467 expected_signal_handler_map = self.test_signal_handler_map
468 instance.open()
469 self.mock_module_daemon.set_signal_handlers.assert_called_with(
470 expected_signal_handler_map)
471
472 def test_redirects_standard_streams(self):
473 """ Should request redirection of standard stream files. """
474 instance = self.test_instance
475 (system_stdin, system_stdout, system_stderr) = (
476 sys.stdin, sys.stdout, sys.stderr)
477 (target_stdin, target_stdout, target_stderr) = (
478 self.stream_files_by_name[name]
479 for name in ['stdin', 'stdout', 'stderr'])
480 expected_calls = [
481 mock.call(system_stdin, target_stdin),
482 mock.call(system_stdout, target_stdout),
483 mock.call(system_stderr, target_stderr),
484 ]
485 instance.open()
486 self.mock_module_daemon.redirect_stream.assert_has_calls(
487 expected_calls, any_order=True)
488
489 def test_enters_pidfile_context(self):
490 """ Should enter the PID file context manager. """
491 instance = self.test_instance
492 instance.pidfile = self.mock_pidlockfile
493 instance.open()
494 self.mock_pidlockfile.__enter__.assert_called_with()
495
496 def test_sets_is_open_true(self):
497 """ Should set the `is_open` property to True. """
498 instance = self.test_instance
499 instance.open()
500 self.assertEqual(True, instance.is_open)
501
502 def test_registers_close_method_for_atexit(self):
503 """ Should register the `close` method for atexit processing. """
504 instance = self.test_instance
505 close_method = instance.close
506 instance.open()
507 self.mock_module_daemon.register_atexit_function.assert_called_with(
508 close_method)
509
510
511 class DaemonContext_close_TestCase(DaemonContext_BaseTestCase):
512 """ Test cases for DaemonContext.close method. """
513
514 def setUp(self):
515 """ Set up test fixtures. """
516 super(DaemonContext_close_TestCase, self).setUp()
517
518 self.test_instance._is_open = True
519
520 def test_returns_immediately_if_not_is_open(self):
521 """ Should return immediately if is_open property is false. """
522 instance = self.test_instance
523 instance._is_open = False
524 instance.pidfile = object()
525 instance.close()
526 self.assertFalse(self.mock_pidlockfile.__exit__.called)
527
528 def test_exits_pidfile_context(self):
529 """ Should exit the PID file context manager. """
530 instance = self.test_instance
531 instance.pidfile = self.mock_pidlockfile
532 instance.close()
533 self.mock_pidlockfile.__exit__.assert_called_with(None, None, None)
534
535 def test_returns_none(self):
536 """ Should return None. """
537 instance = self.test_instance
538 expected_result = None
539 result = instance.close()
540 self.assertIs(result, expected_result)
541
542 def test_sets_is_open_false(self):
543 """ Should set the `is_open` property to False. """
544 instance = self.test_instance
545 instance.close()
546 self.assertEqual(False, instance.is_open)
547
548
549 @mock.patch.object(daemon.daemon.DaemonContext, "open")
550 class DaemonContext_context_manager_enter_TestCase(DaemonContext_BaseTestCase):
551 """ Test cases for DaemonContext.__enter__ method. """
552
553 def test_opens_daemon_context(self, mock_func_daemoncontext_open):
554 """ Should open the DaemonContext. """
555 instance = self.test_instance
556 instance.__enter__()
557 mock_func_daemoncontext_open.assert_called_with()
558
559 def test_returns_self_instance(self, mock_func_daemoncontext_open):
560 """ Should return DaemonContext instance. """
561 instance = self.test_instance
562 expected_result = instance
563 result = instance.__enter__()
564 self.assertIs(result, expected_result)
565
566
567 @mock.patch.object(daemon.daemon.DaemonContext, "close")
568 class DaemonContext_context_manager_exit_TestCase(DaemonContext_BaseTestCase):
569 """ Test cases for DaemonContext.__exit__ method. """
570
571 def setUp(self):
572 """ Set up test fixtures. """
573 super(DaemonContext_context_manager_exit_TestCase, self).setUp()
574
575 self.test_args = dict(
576 exc_type=object(),
577 exc_value=object(),
578 traceback=object(),
579 )
580
581 def test_closes_daemon_context(self, mock_func_daemoncontext_close):
582 """ Should close the DaemonContext. """
583 instance = self.test_instance
584 args = self.test_args
585 instance.__exit__(**args)
586 mock_func_daemoncontext_close.assert_called_with()
587
588 def test_returns_none(self, mock_func_daemoncontext_close):
589 """ Should return None, indicating exception was not handled. """
590 instance = self.test_instance
591 args = self.test_args
592 expected_result = None
593 result = instance.__exit__(**args)
594 self.assertIs(result, expected_result)
595
596
597 class DaemonContext_terminate_TestCase(DaemonContext_BaseTestCase):
598 """ Test cases for DaemonContext.terminate method. """
599
600 def setUp(self):
601 """ Set up test fixtures. """
602 super(DaemonContext_terminate_TestCase, self).setUp()
603
604 self.test_signal = signal.SIGTERM
605 self.test_frame = None
606 self.test_args = (self.test_signal, self.test_frame)
607
608 def test_raises_system_exit(self):
609 """ Should raise SystemExit. """
610 instance = self.test_instance
611 args = self.test_args
612 expected_exception = SystemExit
613 self.assertRaises(
614 expected_exception,
615 instance.terminate, *args)
616
617 def test_exception_message_contains_signal_number(self):
618 """ Should raise exception with a message containing signal number. """
619 instance = self.test_instance
620 args = self.test_args
621 signal_number = self.test_signal
622 expected_exception = SystemExit
623 exc = self.assertRaises(
624 expected_exception,
625 instance.terminate, *args)
626 self.assertIn(unicode(signal_number), unicode(exc))
627
628
629 class DaemonContext_get_exclude_file_descriptors_TestCase(
630 DaemonContext_BaseTestCase):
631 """ Test cases for DaemonContext._get_exclude_file_descriptors function. """
632
633 def setUp(self):
634 """ Set up test fixtures. """
635 super(
636 DaemonContext_get_exclude_file_descriptors_TestCase,
637 self).setUp()
638
639 self.test_files = {
640 2: FakeFileDescriptorStringIO(),
641 5: 5,
642 11: FakeFileDescriptorStringIO(),
643 17: None,
644 23: FakeFileDescriptorStringIO(),
645 37: 37,
646 42: FakeFileDescriptorStringIO(),
647 }
648 for (fileno, item) in self.test_files.items():
649 if hasattr(item, '_fileno'):
650 item._fileno = fileno
651 self.test_file_descriptors = set(
652 fd for (fd, item) in self.test_files.items()
653 if item is not None)
654 self.test_file_descriptors.update(
655 self.stream_files_by_name[name].fileno()
656 for name in ['stdin', 'stdout', 'stderr']
657 )
658
659 def test_returns_expected_file_descriptors(self):
660 """ Should return expected set of file descriptors. """
661 instance = self.test_instance
662 instance.files_preserve = list(self.test_files.values())
663 expected_result = self.test_file_descriptors
664 result = instance._get_exclude_file_descriptors()
665 self.assertEqual(expected_result, result)
666
667 def test_returns_stream_redirects_if_no_files_preserve(self):
668 """ Should return only stream redirects if no files_preserve. """
669 instance = self.test_instance
670 instance.files_preserve = None
671 expected_result = set(
672 stream.fileno()
673 for stream in self.stream_files_by_name.values())
674 result = instance._get_exclude_file_descriptors()
675 self.assertEqual(expected_result, result)
676
677 def test_returns_empty_set_if_no_files(self):
678 """ Should return empty set if no file options. """
679 instance = self.test_instance
680 for name in ['files_preserve', 'stdin', 'stdout', 'stderr']:
681 setattr(instance, name, None)
682 expected_result = set()
683 result = instance._get_exclude_file_descriptors()
684 self.assertEqual(expected_result, result)
685
686 def test_omits_non_file_streams(self):
687 """ Should omit non-file stream attributes. """
688 instance = self.test_instance
689 instance.files_preserve = list(self.test_files.values())
690 stream_files = self.stream_files_by_name
691 expected_result = self.test_file_descriptors.copy()
692 for (pseudo_stream_name, pseudo_stream) in stream_files.items():
693 test_non_file_object = object()
694 setattr(instance, pseudo_stream_name, test_non_file_object)
695 stream_fd = pseudo_stream.fileno()
696 expected_result.discard(stream_fd)
697 result = instance._get_exclude_file_descriptors()
698 self.assertEqual(expected_result, result)
699
700 def test_includes_verbatim_streams_without_file_descriptor(self):
701 """ Should include verbatim any stream without a file descriptor. """
702 instance = self.test_instance
703 instance.files_preserve = list(self.test_files.values())
704 stream_files = self.stream_files_by_name
705 mock_fileno_method = mock.MagicMock(
706 spec=sys.__stdin__.fileno,
707 side_effect=ValueError)
708 expected_result = self.test_file_descriptors.copy()
709 for (pseudo_stream_name, pseudo_stream) in stream_files.items():
710 test_non_fd_stream = StringIO()
711 if not hasattr(test_non_fd_stream, 'fileno'):
712 # Python < 3 StringIO doesn't have ‘fileno’ at all.
713 # Add a method which raises an exception.
714 test_non_fd_stream.fileno = mock_fileno_method
715 setattr(instance, pseudo_stream_name, test_non_fd_stream)
716 stream_fd = pseudo_stream.fileno()
717 expected_result.discard(stream_fd)
718 expected_result.add(test_non_fd_stream)
719 result = instance._get_exclude_file_descriptors()
720 self.assertEqual(expected_result, result)
721
722 def test_omits_none_streams(self):
723 """ Should omit any stream attribute which is None. """
724 instance = self.test_instance
725 instance.files_preserve = list(self.test_files.values())
726 stream_files = self.stream_files_by_name
727 expected_result = self.test_file_descriptors.copy()
728 for (pseudo_stream_name, pseudo_stream) in stream_files.items():
729 setattr(instance, pseudo_stream_name, None)
730 stream_fd = pseudo_stream.fileno()
731 expected_result.discard(stream_fd)
732 result = instance._get_exclude_file_descriptors()
733 self.assertEqual(expected_result, result)
734
735
736 class DaemonContext_make_signal_handler_TestCase(DaemonContext_BaseTestCase):
737 """ Test cases for DaemonContext._make_signal_handler function. """
738
739 def test_returns_ignore_for_none(self):
740 """ Should return SIG_IGN when None handler specified. """
741 instance = self.test_instance
742 target = None
743 expected_result = signal.SIG_IGN
744 result = instance._make_signal_handler(target)
745 self.assertEqual(expected_result, result)
746
747 def test_returns_method_for_name(self):
748 """ Should return method of DaemonContext when name specified. """
749 instance = self.test_instance
750 target = 'terminate'
751 expected_result = instance.terminate
752 result = instance._make_signal_handler(target)
753 self.assertEqual(expected_result, result)
754
755 def test_raises_error_for_unknown_name(self):
756 """ Should raise AttributeError for unknown method name. """
757 instance = self.test_instance
758 target = 'b0gUs'
759 expected_error = AttributeError
760 self.assertRaises(
761 expected_error,
762 instance._make_signal_handler, target)
763
764 def test_returns_object_for_object(self):
765 """ Should return same object for any other object. """
766 instance = self.test_instance
767 target = object()
768 expected_result = target
769 result = instance._make_signal_handler(target)
770 self.assertEqual(expected_result, result)
771
772
773 class DaemonContext_make_signal_handler_map_TestCase(
774 DaemonContext_BaseTestCase):
775 """ Test cases for DaemonContext._make_signal_handler_map function. """
776
777 def setUp(self):
778 """ Set up test fixtures. """
779 super(DaemonContext_make_signal_handler_map_TestCase, self).setUp()
780
781 self.test_instance.signal_map = {
782 object(): object(),
783 object(): object(),
784 object(): object(),
785 }
786
787 self.test_signal_handlers = dict(
788 (key, object())
789 for key in self.test_instance.signal_map.values())
790 self.test_signal_handler_map = dict(
791 (key, self.test_signal_handlers[target])
792 for (key, target) in self.test_instance.signal_map.items())
793
794 def fake_make_signal_handler(target):
795 return self.test_signal_handlers[target]
796
797 func_patcher_make_signal_handler = mock.patch.object(
798 daemon.daemon.DaemonContext, "_make_signal_handler",
799 side_effect=fake_make_signal_handler)
800 self.mock_func_make_signal_handler = (
801 func_patcher_make_signal_handler.start())
802 self.addCleanup(func_patcher_make_signal_handler.stop)
803
804 def test_returns_constructed_signal_handler_items(self):
805 """ Should return items as constructed via make_signal_handler. """
806 instance = self.test_instance
807 expected_result = self.test_signal_handler_map
808 result = instance._make_signal_handler_map()
809 self.assertEqual(expected_result, result)
810
811
812 try:
813 FileNotFoundError
814 except NameError:
815 # Python 2 uses IOError.
816 FileNotFoundError = functools.partial(IOError, errno.ENOENT)
817
818
819 @mock.patch.object(os, "chdir")
820 class change_working_directory_TestCase(scaffold.TestCase):
821 """ Test cases for change_working_directory function. """
822
823 def setUp(self):
824 """ Set up test fixtures. """
825 super(change_working_directory_TestCase, self).setUp()
826
827 self.test_directory = object()
828 self.test_args = dict(
829 directory=self.test_directory,
830 )
831
832 def test_changes_working_directory_to_specified_directory(
833 self,
834 mock_func_os_chdir):
835 """ Should change working directory to specified directory. """
836 args = self.test_args
837 directory = self.test_directory
838 daemon.daemon.change_working_directory(**args)
839 mock_func_os_chdir.assert_called_with(directory)
840
841 def test_raises_daemon_error_on_os_error(
842 self,
843 mock_func_os_chdir):
844 """ Should raise a DaemonError on receiving an IOError. """
845 args = self.test_args
846 test_error = FileNotFoundError("No such directory")
847 mock_func_os_chdir.side_effect = test_error
848 expected_error = daemon.daemon.DaemonOSEnvironmentError
849 exc = self.assertRaises(
850 expected_error,
851 daemon.daemon.change_working_directory, **args)
852 self.assertEqual(test_error, exc.__cause__)
853
854 def test_error_message_contains_original_error_message(
855 self,
856 mock_func_os_chdir):
857 """ Should raise a DaemonError with original message. """
858 args = self.test_args
859 test_error = FileNotFoundError("No such directory")
860 mock_func_os_chdir.side_effect = test_error
861 expected_error = daemon.daemon.DaemonOSEnvironmentError
862 exc = self.assertRaises(
863 expected_error,
864 daemon.daemon.change_working_directory, **args)
865 self.assertIn(unicode(test_error), unicode(exc))
866
867
868 @mock.patch.object(os, "chroot")
869 @mock.patch.object(os, "chdir")
870 class change_root_directory_TestCase(scaffold.TestCase):
871 """ Test cases for change_root_directory function. """
872
873 def setUp(self):
874 """ Set up test fixtures. """
875 super(change_root_directory_TestCase, self).setUp()
876
877 self.test_directory = object()
878 self.test_args = dict(
879 directory=self.test_directory,
880 )
881
882 def test_changes_working_directory_to_specified_directory(
883 self,
884 mock_func_os_chdir, mock_func_os_chroot):
885 """ Should change working directory to specified directory. """
886 args = self.test_args
887 directory = self.test_directory
888 daemon.daemon.change_root_directory(**args)
889 mock_func_os_chdir.assert_called_with(directory)
890
891 def test_changes_root_directory_to_specified_directory(
892 self,
893 mock_func_os_chdir, mock_func_os_chroot):
894 """ Should change root directory to specified directory. """
895 args = self.test_args
896 directory = self.test_directory
897 daemon.daemon.change_root_directory(**args)
898 mock_func_os_chroot.assert_called_with(directory)
899
900 def test_raises_daemon_error_on_os_error_from_chdir(
901 self,
902 mock_func_os_chdir, mock_func_os_chroot):
903 """ Should raise a DaemonError on receiving an IOError from chdir. """
904 args = self.test_args
905 test_error = FileNotFoundError("No such directory")
906 mock_func_os_chdir.side_effect = test_error
907 expected_error = daemon.daemon.DaemonOSEnvironmentError
908 exc = self.assertRaises(
909 expected_error,
910 daemon.daemon.change_root_directory, **args)
911 self.assertEqual(test_error, exc.__cause__)
912
913 def test_raises_daemon_error_on_os_error_from_chroot(
914 self,
915 mock_func_os_chdir, mock_func_os_chroot):
916 """ Should raise a DaemonError on receiving an OSError from chroot. """
917 args = self.test_args
918 test_error = OSError(errno.EPERM, "No chroot for you!")
919 mock_func_os_chroot.side_effect = test_error
920 expected_error = daemon.daemon.DaemonOSEnvironmentError
921 exc = self.assertRaises(
922 expected_error,
923 daemon.daemon.change_root_directory, **args)
924 self.assertEqual(test_error, exc.__cause__)
925
926 def test_error_message_contains_original_error_message(
927 self,
928 mock_func_os_chdir, mock_func_os_chroot):
929 """ Should raise a DaemonError with original message. """
930 args = self.test_args
931 test_error = FileNotFoundError("No such directory")
932 mock_func_os_chdir.side_effect = test_error
933 expected_error = daemon.daemon.DaemonOSEnvironmentError
934 exc = self.assertRaises(
935 expected_error,
936 daemon.daemon.change_root_directory, **args)
937 self.assertIn(unicode(test_error), unicode(exc))
938
939
940 @mock.patch.object(os, "umask")
941 class change_file_creation_mask_TestCase(scaffold.TestCase):
942 """ Test cases for change_file_creation_mask function. """
943
944 def setUp(self):
945 """ Set up test fixtures. """
946 super(change_file_creation_mask_TestCase, self).setUp()
947
948 self.test_mask = object()
949 self.test_args = dict(
950 mask=self.test_mask,
951 )
952
953 def test_changes_umask_to_specified_mask(self, mock_func_os_umask):
954 """ Should change working directory to specified directory. """
955 args = self.test_args
956 mask = self.test_mask
957 daemon.daemon.change_file_creation_mask(**args)
958 mock_func_os_umask.assert_called_with(mask)
959
960 def test_raises_daemon_error_on_os_error_from_chdir(
961 self,
962 mock_func_os_umask):
963 """ Should raise a DaemonError on receiving an OSError from umask. """
964 args = self.test_args
965 test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
966 mock_func_os_umask.side_effect = test_error
967 expected_error = daemon.daemon.DaemonOSEnvironmentError
968 exc = self.assertRaises(
969 expected_error,
970 daemon.daemon.change_file_creation_mask, **args)
971 self.assertEqual(test_error, exc.__cause__)
972
973 def test_error_message_contains_original_error_message(
974 self,
975 mock_func_os_umask):
976 """ Should raise a DaemonError with original message. """
977 args = self.test_args
978 test_error = FileNotFoundError("No such directory")
979 mock_func_os_umask.side_effect = test_error
980 expected_error = daemon.daemon.DaemonOSEnvironmentError
981 exc = self.assertRaises(
982 expected_error,
983 daemon.daemon.change_file_creation_mask, **args)
984 self.assertIn(unicode(test_error), unicode(exc))
985
986
987 @mock.patch.object(os, "setgid")
988 @mock.patch.object(os, "setuid")
989 class change_process_owner_TestCase(scaffold.TestCase):
990 """ Test cases for change_process_owner function. """
991
992 def setUp(self):
993 """ Set up test fixtures. """
994 super(change_process_owner_TestCase, self).setUp()
995
996 self.test_uid = object()
997 self.test_gid = object()
998 self.test_args = dict(
999 uid=self.test_uid,
1000 gid=self.test_gid,
1001 )
1002
1003 def test_changes_gid_and_uid_in_order(
1004 self,
1005 mock_func_os_setuid, mock_func_os_setgid):
1006 """ Should change process GID and UID in correct order.
1007
1008 Since the process requires appropriate privilege to use
1009 either of `setuid` or `setgid`, changing the UID must be
1010 done last.
1011
1012 """
1013 args = self.test_args
1014 daemon.daemon.change_process_owner(**args)
1015 mock_func_os_setuid.assert_called()
1016 mock_func_os_setgid.assert_called()
1017
1018 def test_changes_group_id_to_gid(
1019 self,
1020 mock_func_os_setuid, mock_func_os_setgid):
1021 """ Should change process GID to specified value. """
1022 args = self.test_args
1023 gid = self.test_gid
1024 daemon.daemon.change_process_owner(**args)
1025 mock_func_os_setgid.assert_called(gid)
1026
1027 def test_changes_user_id_to_uid(
1028 self,
1029 mock_func_os_setuid, mock_func_os_setgid):
1030 """ Should change process UID to specified value. """
1031 args = self.test_args
1032 uid = self.test_uid
1033 daemon.daemon.change_process_owner(**args)
1034 mock_func_os_setuid.assert_called(uid)
1035
1036 def test_raises_daemon_error_on_os_error_from_setgid(
1037 self,
1038 mock_func_os_setuid, mock_func_os_setgid):
1039 """ Should raise a DaemonError on receiving an OSError from setgid. """
1040 args = self.test_args
1041 test_error = OSError(errno.EPERM, "No switching for you!")
1042 mock_func_os_setgid.side_effect = test_error
1043 expected_error = daemon.daemon.DaemonOSEnvironmentError
1044 exc = self.assertRaises(
1045 expected_error,
1046 daemon.daemon.change_process_owner, **args)
1047 self.assertEqual(test_error, exc.__cause__)
1048
1049 def test_raises_daemon_error_on_os_error_from_setuid(
1050 self,
1051 mock_func_os_setuid, mock_func_os_setgid):
1052 """ Should raise a DaemonError on receiving an OSError from setuid. """
1053 args = self.test_args
1054 test_error = OSError(errno.EPERM, "No switching for you!")
1055 mock_func_os_setuid.side_effect = test_error
1056 expected_error = daemon.daemon.DaemonOSEnvironmentError
1057 exc = self.assertRaises(
1058 expected_error,
1059 daemon.daemon.change_process_owner, **args)
1060 self.assertEqual(test_error, exc.__cause__)
1061
1062 def test_error_message_contains_original_error_message(
1063 self,
1064 mock_func_os_setuid, mock_func_os_setgid):
1065 """ Should raise a DaemonError with original message. """
1066 args = self.test_args
1067 test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
1068 mock_func_os_setuid.side_effect = test_error
1069 expected_error = daemon.daemon.DaemonOSEnvironmentError
1070 exc = self.assertRaises(
1071 expected_error,
1072 daemon.daemon.change_process_owner, **args)
1073 self.assertIn(unicode(test_error), unicode(exc))
1074
1075
1076 RLimitResult = collections.namedtuple('RLimitResult', ['soft', 'hard'])
1077
1078 fake_RLIMIT_CORE = object()
1079
1080 @mock.patch.object(resource, "RLIMIT_CORE", new=fake_RLIMIT_CORE)
1081 @mock.patch.object(resource, "setrlimit", side_effect=(lambda x, y: None))
1082 @mock.patch.object(resource, "getrlimit", side_effect=(lambda x: None))
1083 class prevent_core_dump_TestCase(scaffold.TestCase):
1084 """ Test cases for prevent_core_dump function. """
1085
1086 def setUp(self):
1087 """ Set up test fixtures. """
1088 super(prevent_core_dump_TestCase, self).setUp()
1089
1090 def test_sets_core_limit_to_zero(
1091 self,
1092 mock_func_resource_getrlimit, mock_func_resource_setrlimit):
1093 """ Should set the RLIMIT_CORE resource to zero. """
1094 expected_resource = fake_RLIMIT_CORE
1095 expected_limit = tuple(RLimitResult(soft=0, hard=0))
1096 daemon.daemon.prevent_core_dump()
1097 mock_func_resource_getrlimit.assert_called_with(expected_resource)
1098 mock_func_resource_setrlimit.assert_called_with(
1099 expected_resource, expected_limit)
1100
1101 def test_raises_error_when_no_core_resource(
1102 self,
1103 mock_func_resource_getrlimit, mock_func_resource_setrlimit):
1104 """ Should raise DaemonError if no RLIMIT_CORE resource. """
1105 test_error = ValueError("Bogus platform doesn't have RLIMIT_CORE")
1106 def fake_getrlimit(res):
1107 if res == resource.RLIMIT_CORE:
1108 raise test_error
1109 else:
1110 return None
1111 mock_func_resource_getrlimit.side_effect = fake_getrlimit
1112 expected_error = daemon.daemon.DaemonOSEnvironmentError
1113 exc = self.assertRaises(
1114 expected_error,
1115 daemon.daemon.prevent_core_dump)
1116 self.assertEqual(test_error, exc.__cause__)
1117
1118
1119 @mock.patch.object(os, "close")
1120 class close_file_descriptor_if_open_TestCase(scaffold.TestCase):
1121 """ Test cases for close_file_descriptor_if_open function. """
1122
1123 def setUp(self):
1124 """ Set up test fixtures. """
1125 super(close_file_descriptor_if_open_TestCase, self).setUp()
1126
1127 self.fake_fd = 274
1128
1129 def test_requests_file_descriptor_close(self, mock_func_os_close):
1130 """ Should request close of file descriptor. """
1131 fd = self.fake_fd
1132 daemon.daemon.close_file_descriptor_if_open(fd)
1133 mock_func_os_close.assert_called_with(fd)
1134
1135 def test_ignores_badfd_error_on_close(self, mock_func_os_close):
1136 """ Should ignore OSError EBADF when closing. """
1137 fd = self.fake_fd
1138 test_error = OSError(errno.EBADF, "Bad file descriptor")
1139 def fake_os_close(fd):
1140 raise test_error
1141 mock_func_os_close.side_effect = fake_os_close
1142 daemon.daemon.close_file_descriptor_if_open(fd)
1143 mock_func_os_close.assert_called_with(fd)
1144
1145 def test_raises_error_if_oserror_on_close(self, mock_func_os_close):
1146 """ Should raise DaemonError if an OSError occurs when closing. """
1147 fd = self.fake_fd
1148 test_error = OSError(object(), "Unexpected error")
1149 def fake_os_close(fd):
1150 raise test_error
1151 mock_func_os_close.side_effect = fake_os_close
1152 expected_error = daemon.daemon.DaemonOSEnvironmentError
1153 exc = self.assertRaises(
1154 expected_error,
1155 daemon.daemon.close_file_descriptor_if_open, fd)
1156 self.assertEqual(test_error, exc.__cause__)
1157
1158 def test_raises_error_if_ioerror_on_close(self, mock_func_os_close):
1159 """ Should raise DaemonError if an IOError occurs when closing. """
1160 fd = self.fake_fd
1161 test_error = IOError(object(), "Unexpected error")
1162 def fake_os_close(fd):
1163 raise test_error
1164 mock_func_os_close.side_effect = fake_os_close
1165 expected_error = daemon.daemon.DaemonOSEnvironmentError
1166 exc = self.assertRaises(
1167 expected_error,
1168 daemon.daemon.close_file_descriptor_if_open, fd)
1169 self.assertEqual(test_error, exc.__cause__)
1170
1171
1172 class maxfd_TestCase(scaffold.TestCase):
1173 """ Test cases for module MAXFD constant. """
1174
1175 def test_positive(self):
1176 """ Should be a positive number. """
1177 maxfd = daemon.daemon.MAXFD
1178 self.assertTrue(maxfd > 0)
1179
1180 def test_integer(self):
1181 """ Should be an integer. """
1182 maxfd = daemon.daemon.MAXFD
1183 self.assertEqual(int(maxfd), maxfd)
1184
1185 def test_reasonably_high(self):
1186 """ Should be reasonably high for default open files limit.
1187
1188 If the system reports a limit of “infinity” on maximum
1189 file descriptors, we still need a finite number in order
1190 to close “all” of them. Ensure this is reasonably high
1191 to catch most use cases.
1192
1193 """
1194 expected_minimum = 2048
1195 maxfd = daemon.daemon.MAXFD
1196 self.assertTrue(
1197 expected_minimum <= maxfd,
1198 msg=(
1199 "MAXFD should be at least {minimum!r}"
1200 " (got {maxfd!r})".format(
1201 minimum=expected_minimum, maxfd=maxfd)))
1202
1203
1204 fake_default_maxfd = 8
1205 fake_RLIMIT_NOFILE = object()
1206 fake_RLIM_INFINITY = object()
1207 fake_rlimit_nofile_large = 2468
1208
1209 def fake_getrlimit_nofile_soft_infinity(resource):
1210 result = RLimitResult(soft=fake_RLIM_INFINITY, hard=object())
1211 if resource != fake_RLIMIT_NOFILE:
1212 result = NotImplemented
1213 return result
1214
1215 def fake_getrlimit_nofile_hard_infinity(resource):
1216 result = RLimitResult(soft=object(), hard=fake_RLIM_INFINITY)
1217 if resource != fake_RLIMIT_NOFILE:
1218 result = NotImplemented
1219 return result
1220
1221 def fake_getrlimit_nofile_hard_large(resource):
1222 result = RLimitResult(soft=object(), hard=fake_rlimit_nofile_large)
1223 if resource != fake_RLIMIT_NOFILE:
1224 result = NotImplemented
1225 return result
1226
1227 @mock.patch.object(daemon.daemon, "MAXFD", new=fake_default_maxfd)
1228 @mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
1229 @mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
1230 @mock.patch.object(
1231 resource, "getrlimit",
1232 side_effect=fake_getrlimit_nofile_hard_large)
1233 class get_maximum_file_descriptors_TestCase(scaffold.TestCase):
1234 """ Test cases for get_maximum_file_descriptors function. """
1235
1236 def test_returns_system_hard_limit(self, mock_func_resource_getrlimit):
1237 """ Should return process hard limit on number of files. """
1238 expected_result = fake_rlimit_nofile_large
1239 result = daemon.daemon.get_maximum_file_descriptors()
1240 self.assertEqual(expected_result, result)
1241
1242 def test_returns_module_default_if_hard_limit_infinity(
1243 self, mock_func_resource_getrlimit):
1244 """ Should return module MAXFD if hard limit is infinity. """
1245 mock_func_resource_getrlimit.side_effect = (
1246 fake_getrlimit_nofile_hard_infinity)
1247 expected_result = fake_default_maxfd
1248 result = daemon.daemon.get_maximum_file_descriptors()
1249 self.assertEqual(expected_result, result)
1250
1251
1252 def fake_get_maximum_file_descriptors():
1253 return fake_default_maxfd
1254
1255 @mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
1256 @mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
1257 @mock.patch.object(
1258 resource, "getrlimit",
1259 new=fake_getrlimit_nofile_soft_infinity)
1260 @mock.patch.object(
1261 daemon.daemon, "get_maximum_file_descriptors",
1262 new=fake_get_maximum_file_descriptors)
1263 @mock.patch.object(daemon.daemon, "close_file_descriptor_if_open")
1264 class close_all_open_files_TestCase(scaffold.TestCase):
1265 """ Test cases for close_all_open_files function. """
1266
1267 def test_requests_all_open_files_to_close(
1268 self, mock_func_close_file_descriptor_if_open):
1269 """ Should request close of all open files. """
1270 expected_file_descriptors = range(fake_default_maxfd)
1271 expected_calls = [
1272 mock.call(fd) for fd in expected_file_descriptors]
1273 daemon.daemon.close_all_open_files()
1274 mock_func_close_file_descriptor_if_open.assert_has_calls(
1275 expected_calls, any_order=True)
1276
1277 def test_requests_all_but_excluded_files_to_close(
1278 self, mock_func_close_file_descriptor_if_open):
1279 """ Should request close of all open files but those excluded. """
1280 test_exclude = set([3, 7])
1281 args = dict(
1282 exclude=test_exclude,
1283 )
1284 expected_file_descriptors = set(
1285 fd for fd in range(fake_default_maxfd)
1286 if fd not in test_exclude)
1287 expected_calls = [
1288 mock.call(fd) for fd in expected_file_descriptors]
1289 daemon.daemon.close_all_open_files(**args)
1290 mock_func_close_file_descriptor_if_open.assert_has_calls(
1291 expected_calls, any_order=True)
1292
1293
1294 class detach_process_context_TestCase(scaffold.TestCase):
1295 """ Test cases for detach_process_context function. """
1296
1297 class FakeOSExit(SystemExit):
1298 """ Fake exception raised for os._exit(). """
1299
1300 def setUp(self):
1301 """ Set up test fixtures. """
1302 super(detach_process_context_TestCase, self).setUp()
1303
1304 self.mock_module_os = mock.MagicMock(wraps=os)
1305
1306 fake_pids = [0, 0]
1307 func_patcher_os_fork = mock.patch.object(
1308 os, "fork",
1309 side_effect=iter(fake_pids))
1310 self.mock_func_os_fork = func_patcher_os_fork.start()
1311 self.addCleanup(func_patcher_os_fork.stop)
1312 self.mock_module_os.attach_mock(self.mock_func_os_fork, "fork")
1313
1314 func_patcher_os_setsid = mock.patch.object(os, "setsid")
1315 self.mock_func_os_setsid = func_patcher_os_setsid.start()
1316 self.addCleanup(func_patcher_os_setsid.stop)
1317 self.mock_module_os.attach_mock(self.mock_func_os_setsid, "setsid")
1318
1319 def raise_os_exit(status=None):
1320 raise self.FakeOSExit(status)
1321
1322 func_patcher_os_force_exit = mock.patch.object(
1323 os, "_exit",
1324 side_effect=raise_os_exit)
1325 self.mock_func_os_force_exit = func_patcher_os_force_exit.start()
1326 self.addCleanup(func_patcher_os_force_exit.stop)
1327 self.mock_module_os.attach_mock(self.mock_func_os_force_exit, "_exit")
1328
1329 def test_parent_exits(self):
1330 """ Parent process should exit. """
1331 parent_pid = 23
1332 self.mock_func_os_fork.side_effect = iter([parent_pid])
1333 self.assertRaises(
1334 self.FakeOSExit,
1335 daemon.daemon.detach_process_context)
1336 self.mock_module_os.assert_has_calls([
1337 mock.call.fork(),
1338 mock.call._exit(0),
1339 ])
1340
1341 def test_first_fork_error_raises_error(self):
1342 """ Error on first fork should raise DaemonProcessDetachError. """
1343 fork_errno = 13
1344 fork_strerror = "Bad stuff happened"
1345 test_error = OSError(fork_errno, fork_strerror)
1346 test_pids_iter = iter([test_error])
1347
1348 def fake_fork():
1349 next_item = next(test_pids_iter)
1350 if isinstance(next_item, Exception):
1351 raise next_item
1352 else:
1353 return next_item
1354
1355 self.mock_func_os_fork.side_effect = fake_fork
1356 exc = self.assertRaises(
1357 daemon.daemon.DaemonProcessDetachError,
1358 daemon.daemon.detach_process_context)
1359 self.assertEqual(test_error, exc.__cause__)
1360 self.mock_module_os.assert_has_calls([
1361 mock.call.fork(),
1362 ])
1363
1364 def test_child_starts_new_process_group(self):
1365 """ Child should start new process group. """
1366 daemon.daemon.detach_process_context()
1367 self.mock_module_os.assert_has_calls([
1368 mock.call.fork(),
1369 mock.call.setsid(),
1370 ])
1371
1372 def test_child_forks_next_parent_exits(self):
1373 """ Child should fork, then exit if parent. """
1374 fake_pids = [0, 42]
1375 self.mock_func_os_fork.side_effect = iter(fake_pids)
1376 self.assertRaises(
1377 self.FakeOSExit,
1378 daemon.daemon.detach_process_context)
1379 self.mock_module_os.assert_has_calls([
1380 mock.call.fork(),
1381 mock.call.setsid(),
1382 mock.call.fork(),
1383 mock.call._exit(0),
1384 ])
1385
1386 def test_second_fork_error_reports_to_stderr(self):
1387 """ Error on second fork should cause report to stderr. """
1388 fork_errno = 17
1389 fork_strerror = "Nasty stuff happened"
1390 test_error = OSError(fork_errno, fork_strerror)
1391 test_pids_iter = iter([0, test_error])
1392
1393 def fake_fork():
1394 next_item = next(test_pids_iter)
1395 if isinstance(next_item, Exception):
1396 raise next_item
1397 else:
1398 return next_item
1399
1400 self.mock_func_os_fork.side_effect = fake_fork
1401 exc = self.assertRaises(
1402 daemon.daemon.DaemonProcessDetachError,
1403 daemon.daemon.detach_process_context)
1404 self.assertEqual(test_error, exc.__cause__)
1405 self.mock_module_os.assert_has_calls([
1406 mock.call.fork(),
1407 mock.call.setsid(),
1408 mock.call.fork(),
1409 ])
1410
1411 def test_child_forks_next_child_continues(self):
1412 """ Child should fork, then continue if child. """
1413 daemon.daemon.detach_process_context()
1414 self.mock_module_os.assert_has_calls([
1415 mock.call.fork(),
1416 mock.call.setsid(),
1417 mock.call.fork(),
1418 ])
1419
1420
1421 @mock.patch("os.getppid", return_value=765)
1422 class is_process_started_by_init_TestCase(scaffold.TestCase):
1423 """ Test cases for is_process_started_by_init function. """
1424
1425 def test_returns_false_by_default(self, mock_func_os_getppid):
1426 """ Should return False under normal circumstances. """
1427 expected_result = False
1428 result = daemon.daemon.is_process_started_by_init()
1429 self.assertIs(result, expected_result)
1430
1431 def test_returns_true_if_parent_process_is_init(
1432 self, mock_func_os_getppid):
1433 """ Should return True if parent process is `init`. """
1434 init_pid = 1
1435 mock_func_os_getppid.return_value = init_pid
1436 expected_result = True
1437 result = daemon.daemon.is_process_started_by_init()
1438 self.assertIs(result, expected_result)
1439
1440
1441 class is_socket_TestCase(scaffold.TestCase):
1442 """ Test cases for is_socket function. """
1443
1444 def setUp(self):
1445 """ Set up test fixtures. """
1446 super(is_socket_TestCase, self).setUp()
1447
1448 def fake_getsockopt(level, optname, buflen=None):
1449 result = object()
1450 if optname is socket.SO_TYPE:
1451 result = socket.SOCK_RAW
1452 return result
1453
1454 self.fake_socket_getsockopt_func = fake_getsockopt
1455
1456 self.fake_socket_error = socket.error(
1457 errno.ENOTSOCK,
1458 "Socket operation on non-socket")
1459
1460 self.mock_socket = mock.MagicMock(spec=socket.socket)
1461 self.mock_socket.getsockopt.side_effect = self.fake_socket_error
1462
1463 def fake_socket_fromfd(fd, family, type, proto=None):
1464 return self.mock_socket
1465
1466 func_patcher_socket_fromfd = mock.patch.object(
1467 socket, "fromfd",
1468 side_effect=fake_socket_fromfd)
1469 func_patcher_socket_fromfd.start()
1470 self.addCleanup(func_patcher_socket_fromfd.stop)
1471
1472 def test_returns_false_by_default(self):
1473 """ Should return False under normal circumstances. """
1474 test_fd = 23
1475 expected_result = False
1476 result = daemon.daemon.is_socket(test_fd)
1477 self.assertIs(result, expected_result)
1478
1479 def test_returns_true_if_stdin_is_socket(self):
1480 """ Should return True if `stdin` is a socket. """
1481 test_fd = 23
1482 getsockopt = self.mock_socket.getsockopt
1483 getsockopt.side_effect = self.fake_socket_getsockopt_func
1484 expected_result = True
1485 result = daemon.daemon.is_socket(test_fd)
1486 self.assertIs(result, expected_result)
1487
1488 def test_returns_false_if_stdin_socket_raises_error(self):
1489 """ Should return True if `stdin` is a socket and raises error. """
1490 test_fd = 23
1491 getsockopt = self.mock_socket.getsockopt
1492 getsockopt.side_effect = socket.error(
1493 object(), "Weird socket stuff")
1494 expected_result = True
1495 result = daemon.daemon.is_socket(test_fd)
1496 self.assertIs(result, expected_result)
1497
1498
1499 class is_process_started_by_superserver_TestCase(scaffold.TestCase):
1500 """ Test cases for is_process_started_by_superserver function. """
1501
1502 def setUp(self):
1503 """ Set up test fixtures. """
1504 super(is_process_started_by_superserver_TestCase, self).setUp()
1505
1506 def fake_is_socket(fd):
1507 if sys.__stdin__.fileno() == fd:
1508 result = self.fake_stdin_is_socket_func()
1509 else:
1510 result = False
1511 return result
1512
1513 self.fake_stdin_is_socket_func = (lambda: False)
1514
1515 func_patcher_is_socket = mock.patch.object(
1516 daemon.daemon, "is_socket",
1517 side_effect=fake_is_socket)
1518 func_patcher_is_socket.start()
1519 self.addCleanup(func_patcher_is_socket.stop)
1520
1521 def test_returns_false_by_default(self):
1522 """ Should return False under normal circumstances. """
1523 expected_result = False
1524 result = daemon.daemon.is_process_started_by_superserver()
1525 self.assertIs(result, expected_result)
1526
1527 def test_returns_true_if_stdin_is_socket(self):
1528 """ Should return True if `stdin` is a socket. """
1529 self.fake_stdin_is_socket_func = (lambda: True)
1530 expected_result = True
1531 result = daemon.daemon.is_process_started_by_superserver()
1532 self.assertIs(result, expected_result)
1533
1534
1535 @mock.patch.object(
1536 daemon.daemon, "is_process_started_by_superserver",
1537 return_value=False)
1538 @mock.patch.object(
1539 daemon.daemon, "is_process_started_by_init",
1540 return_value=False)
1541 class is_detach_process_context_required_TestCase(scaffold.TestCase):
1542 """ Test cases for is_detach_process_context_required function. """
1543
1544 def test_returns_true_by_default(
1545 self,
1546 mock_func_is_process_started_by_init,
1547 mock_func_is_process_started_by_superserver):
1548 """ Should return True under normal circumstances. """
1549 expected_result = True
1550 result = daemon.daemon.is_detach_process_context_required()
1551 self.assertIs(result, expected_result)
1552
1553 def test_returns_false_if_started_by_init(
1554 self,
1555 mock_func_is_process_started_by_init,
1556 mock_func_is_process_started_by_superserver):
1557 """ Should return False if current process started by init. """
1558 mock_func_is_process_started_by_init.return_value = True
1559 expected_result = False
1560 result = daemon.daemon.is_detach_process_context_required()
1561 self.assertIs(result, expected_result)
1562
1563 def test_returns_true_if_started_by_superserver(
1564 self,
1565 mock_func_is_process_started_by_init,
1566 mock_func_is_process_started_by_superserver):
1567 """ Should return False if current process started by superserver. """
1568 mock_func_is_process_started_by_superserver.return_value = True
1569 expected_result = False
1570 result = daemon.daemon.is_detach_process_context_required()
1571 self.assertIs(result, expected_result)
1572
1573
1574 def setup_streams_fixtures(testcase):
1575 """ Set up common test fixtures for standard streams. """
1576 testcase.stream_file_paths = dict(
1577 stdin=tempfile.mktemp(),
1578 stdout=tempfile.mktemp(),
1579 stderr=tempfile.mktemp(),
1580 )
1581
1582 testcase.stream_files_by_name = dict(
1583 (name, FakeFileDescriptorStringIO())
1584 for name in ['stdin', 'stdout', 'stderr']
1585 )
1586
1587 testcase.stream_files_by_path = dict(
1588 (testcase.stream_file_paths[name],
1589 testcase.stream_files_by_name[name])
1590 for name in ['stdin', 'stdout', 'stderr']
1591 )
1592
1593
1594 @mock.patch.object(os, "dup2")
1595 class redirect_stream_TestCase(scaffold.TestCase):
1596 """ Test cases for redirect_stream function. """
1597
1598 def setUp(self):
1599 """ Set up test fixtures. """
1600 super(redirect_stream_TestCase, self).setUp()
1601
1602 self.test_system_stream = FakeFileDescriptorStringIO()
1603 self.test_target_stream = FakeFileDescriptorStringIO()
1604 self.test_null_file = FakeFileDescriptorStringIO()
1605
1606 def fake_os_open(path, flag, mode=None):
1607 if path == os.devnull:
1608 result = self.test_null_file.fileno()
1609 else:
1610 raise FileNotFoundError("No such file", path)
1611 return result
1612
1613 func_patcher_os_open = mock.patch.object(
1614 os, "open",
1615 side_effect=fake_os_open)
1616 self.mock_func_os_open = func_patcher_os_open.start()
1617 self.addCleanup(func_patcher_os_open.stop)
1618
1619 def test_duplicates_target_file_descriptor(
1620 self, mock_func_os_dup2):
1621 """ Should duplicate file descriptor from target to system stream. """
1622 system_stream = self.test_system_stream
1623 system_fileno = system_stream.fileno()
1624 target_stream = self.test_target_stream
1625 target_fileno = target_stream.fileno()
1626 daemon.daemon.redirect_stream(system_stream, target_stream)
1627 mock_func_os_dup2.assert_called_with(target_fileno, system_fileno)
1628
1629 def test_duplicates_null_file_descriptor_by_default(
1630 self, mock_func_os_dup2):
1631 """ Should by default duplicate the null file to the system stream. """
1632 system_stream = self.test_system_stream
1633 system_fileno = system_stream.fileno()
1634 target_stream = None
1635 null_path = os.devnull
1636 null_flag = os.O_RDWR
1637 null_file = self.test_null_file
1638 null_fileno = null_file.fileno()
1639 daemon.daemon.redirect_stream(system_stream, target_stream)
1640 self.mock_func_os_open.assert_called_with(null_path, null_flag)
1641 mock_func_os_dup2.assert_called_with(null_fileno, system_fileno)
1642
1643
1644 class make_default_signal_map_TestCase(scaffold.TestCase):
1645 """ Test cases for make_default_signal_map function. """
1646
1647 def setUp(self):
1648 """ Set up test fixtures. """
1649 super(make_default_signal_map_TestCase, self).setUp()
1650
1651 # Use whatever default string type this Python version needs.
1652 signal_module_name = str('signal')
1653 self.fake_signal_module = ModuleType(signal_module_name)
1654
1655 fake_signal_names = [
1656 'SIGHUP',
1657 'SIGCLD',
1658 'SIGSEGV',
1659 'SIGTSTP',
1660 'SIGTTIN',
1661 'SIGTTOU',
1662 'SIGTERM',
1663 ]
1664 for name in fake_signal_names:
1665 setattr(self.fake_signal_module, name, object())
1666
1667 module_patcher_signal = mock.patch.object(
1668 daemon.daemon, "signal", new=self.fake_signal_module)
1669 module_patcher_signal.start()
1670 self.addCleanup(module_patcher_signal.stop)
1671
1672 default_signal_map_by_name = {
1673 'SIGTSTP': None,
1674 'SIGTTIN': None,
1675 'SIGTTOU': None,
1676 'SIGTERM': 'terminate',
1677 }
1678 self.default_signal_map = dict(
1679 (getattr(self.fake_signal_module, name), target)
1680 for (name, target) in default_signal_map_by_name.items())
1681
1682 def test_returns_constructed_signal_map(self):
1683 """ Should return map per default. """
1684 expected_result = self.default_signal_map
1685 result = daemon.daemon.make_default_signal_map()
1686 self.assertEqual(expected_result, result)
1687
1688 def test_returns_signal_map_with_only_ids_in_signal_module(self):
1689 """ Should return map with only signals in the `signal` module.
1690
1691 The `signal` module is documented to only define those
1692 signals which exist on the running system. Therefore the
1693 default map should not contain any signals which are not
1694 defined in the `signal` module.
1695
1696 """
1697 del(self.default_signal_map[self.fake_signal_module.SIGTTOU])
1698 del(self.fake_signal_module.SIGTTOU)
1699 expected_result = self.default_signal_map
1700 result = daemon.daemon.make_default_signal_map()
1701 self.assertEqual(expected_result, result)
1702
1703
1704 @mock.patch.object(daemon.daemon.signal, "signal")
1705 class set_signal_handlers_TestCase(scaffold.TestCase):
1706 """ Test cases for set_signal_handlers function. """
1707
1708 def setUp(self):
1709 """ Set up test fixtures. """
1710 super(set_signal_handlers_TestCase, self).setUp()
1711
1712 self.signal_handler_map = {
1713 signal.SIGQUIT: object(),
1714 signal.SIGSEGV: object(),
1715 signal.SIGINT: object(),
1716 }
1717
1718 def test_sets_signal_handler_for_each_item(self, mock_func_signal_signal):
1719 """ Should set signal handler for each item in map. """
1720 signal_handler_map = self.signal_handler_map
1721 expected_calls = [
1722 mock.call(signal_number, handler)
1723 for (signal_number, handler) in signal_handler_map.items()]
1724 daemon.daemon.set_signal_handlers(signal_handler_map)
1725 self.assertEquals(expected_calls, mock_func_signal_signal.mock_calls)
1726
1727
1728 @mock.patch.object(daemon.daemon.atexit, "register")
1729 class register_atexit_function_TestCase(scaffold.TestCase):
1730 """ Test cases for register_atexit_function function. """
1731
1732 def test_registers_function_for_atexit_processing(
1733 self, mock_func_atexit_register):
1734 """ Should register specified function for atexit processing. """
1735 func = object()
1736 daemon.daemon.register_atexit_function(func)
1737 mock_func_atexit_register.assert_called_with(func)
1738
1739
1740 # Local variables:
1741 # coding: utf-8
1742 # mode: python
1743 # End:
1744 # vim: fileencoding=utf-8 filetype=python :