33
|
1 import os
|
|
2 import threading
|
|
3 import shutil
|
|
4
|
|
5 import lockfile
|
|
6
|
|
class ComplianceTest(object):
    """Shared compliance tests exercised through ``lockfile.LockFile``.

    ``setup`` installs ``self.class_to_test`` as ``lockfile.LockFile`` and
    ``teardown`` restores the class that was saved at construction time.
    This class never assigns ``class_to_test`` itself -- presumably the
    concrete subclasses provide it (TODO confirm against the subclasses).
    """

    def __init__(self):
        # Remember the current LockFile so teardown() can undo setup()'s swap.
        self.saved_class = lockfile.LockFile

    def _testfile(self):
        """Return platform-appropriate file.  Helper for tests.

        The path lives in the system temp directory and embeds the current
        process id.
        """
        import tempfile
        return os.path.join(tempfile.gettempdir(), 'trash-%s' % os.getpid())

    def setup(self):
        """Make ``lockfile.LockFile`` refer to the class under test."""
        lockfile.LockFile = self.class_to_test

    def teardown(self):
        """Remove whatever is left at the test path and restore LockFile.

        Raises:
            SystemError: if the test path exists but is neither a directory
                nor a regular file.
        """
        try:
            tf = self._testfile()
            if os.path.isdir(tf):
                shutil.rmtree(tf)
            elif os.path.isfile(tf):
                os.unlink(tf)
            elif os.path.exists(tf):
                # Something exists there that we do not know how to remove.
                raise SystemError("unrecognized file: %s" % tf)
        finally:
            # Always restore the saved class, even if cleanup failed.
            lockfile.LockFile = self.saved_class

    def _test_acquire_helper(self, tbool):
        """Acquire and release a lock once; ``tbool`` is passed as ``threaded``."""
        # As simple as it gets.
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire()
        assert lock.i_am_locking()
        lock.release()
        assert not lock.is_locked()

##    def test_acquire_basic_threaded(self):
##        self._test_acquire_helper(True)

    def test_acquire_basic_unthreaded(self):
        self._test_acquire_helper(False)

    def _test_acquire_no_timeout_helper(self, tbool):
        """Check that non-positive timeouts raise ``AlreadyLocked``.

        A helper thread holds the lock while a second lock object on the
        same path tries ``acquire(timeout=-1)`` and ``acquire(timeout=0)``.
        """
        # No timeout test
        e1, e2 = threading.Event(), threading.Event()
        t = _in_thread(self._lock_wait_unlock, e1, e2)
        e1.wait()      # wait for thread t to acquire lock
        try:
            lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
            assert lock2.is_locked()
            if tbool:
                assert not lock2.i_am_locking()
            else:
                assert lock2.i_am_locking()

            for timeout in (-1, 0):
                try:
                    lock2.acquire(timeout=timeout)
                except lockfile.AlreadyLocked:
                    pass
                else:
                    lock2.release()
                    raise AssertionError("did not raise AlreadyLocked in"
                                         " thread %s" %
                                         threading.current_thread().name)
        finally:
            # Release the helper thread even when a check above failed, so
            # it does not keep holding the lock for the rest of the run.
            e2.set()       # tell thread t to release lock
            t.join()

##    def test_acquire_no_timeout_threaded(self):
##        self._test_acquire_no_timeout_helper(True)

##    def test_acquire_no_timeout_unthreaded(self):
##        self._test_acquire_no_timeout_helper(False)

    def _test_acquire_timeout_helper(self, tbool):
        """Check that ``acquire(timeout=0.1)`` raises ``LockTimeout``.

        A helper thread holds the lock for the duration of the attempt.
        """
        # Timeout test
        e1, e2 = threading.Event(), threading.Event()
        t = _in_thread(self._lock_wait_unlock, e1, e2)
        e1.wait()                # wait for thread t to acquire lock
        try:
            lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
            assert lock2.is_locked()
            try:
                lock2.acquire(timeout=0.1)
            except lockfile.LockTimeout:
                pass
            else:
                lock2.release()
                raise AssertionError("did not raise LockTimeout in thread %s" %
                                     threading.current_thread().name)
        finally:
            # Always let the helper thread release the lock and finish.
            e2.set()
            t.join()

    def test_acquire_timeout_threaded(self):
        self._test_acquire_timeout_helper(True)

    def test_acquire_timeout_unthreaded(self):
        self._test_acquire_timeout_helper(False)

    def _test_context_timeout_helper(self, tbool):
        """Check that a constructor-level ``timeout=0.2`` raises ``LockTimeout``.

        Same scenario as the acquire-timeout helper, but the timeout is
        given to ``LockFile(...)`` and ``acquire()`` is called without one.
        """
        # Timeout test
        e1, e2 = threading.Event(), threading.Event()
        t = _in_thread(self._lock_wait_unlock, e1, e2)
        e1.wait()                # wait for thread t to acquire lock
        try:
            lock2 = lockfile.LockFile(self._testfile(), threaded=tbool,
                                      timeout=0.2)
            assert lock2.is_locked()
            try:
                lock2.acquire()
            except lockfile.LockTimeout:
                pass
            else:
                lock2.release()
                raise AssertionError("did not raise LockTimeout in thread %s" %
                                     threading.current_thread().name)
        finally:
            # Always let the helper thread release the lock and finish.
            e2.set()
            t.join()

    def test_context_timeout_unthreaded(self):
        self._test_context_timeout_helper(False)

    def _test_release_basic_helper(self, tbool):
        """Check release semantics, including releasing an unheld lock.

        The second ``release()`` must raise ``NotLocked``; ``NotMyLock`` or
        no exception at all is reported as a failure.
        """
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire()
        assert lock.is_locked()
        lock.release()
        assert not lock.is_locked()
        assert not lock.i_am_locking()
        try:
            lock.release()
        except lockfile.NotLocked:
            pass
        except lockfile.NotMyLock:
            raise AssertionError('unexpected exception: %s' %
                                 lockfile.NotMyLock)
        else:
            raise AssertionError('erroneously unlocked file')

##    def test_release_basic_threaded(self):
##        self._test_release_basic_helper(True)

    def test_release_basic_unthreaded(self):
        self._test_release_basic_helper(False)

##    def test_release_from_thread(self):
##        e1, e2 = threading.Event(), threading.Event()
##        t = _in_thread(self._lock_wait_unlock, e1, e2)
##        e1.wait()
##        lock2 = lockfile.LockFile(self._testfile(), threaded=False)
##        assert not lock2.i_am_locking()
##        try:
##            lock2.release()
##        except lockfile.NotMyLock:
##            pass
##        else:
##            raise AssertionError('erroneously unlocked a file locked'
##                                 ' by another thread.')
##        e2.set()
##        t.join()

    def _test_is_locked_helper(self, tbool):
        """Check ``is_locked()`` before and after a release."""
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire(timeout=2)
        assert lock.is_locked()
        lock.release()
        assert not lock.is_locked(), "still locked after release!"

##    def test_is_locked_threaded(self):
##        self._test_is_locked_helper(True)

    def test_is_locked_unthreaded(self):
        self._test_is_locked_helper(False)

##    def test_i_am_locking_threaded(self):
##        self._test_i_am_locking_helper(True)

    def test_i_am_locking_unthreaded(self):
        self._test_i_am_locking_helper(False)

    def _test_i_am_locking_helper(self, tbool):
        """Check ``i_am_locking()`` for the holder and for a second lock object."""
        lock1 = lockfile.LockFile(self._testfile(), threaded=tbool)
        assert not lock1.is_locked()
        lock1.acquire()
        try:
            assert lock1.i_am_locking()
            lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
            assert lock2.is_locked()
            if tbool:
                assert not lock2.i_am_locking()
        finally:
            lock1.release()

    def _test_break_lock_helper(self, tbool):
        """Check that ``break_lock()`` on a second object frees the lock.

        After the break, releasing through the original holder must raise
        ``NotLocked``.
        """
        lock = lockfile.LockFile(self._testfile(), threaded=tbool)
        lock.acquire()
        assert lock.is_locked()
        lock2 = lockfile.LockFile(self._testfile(), threaded=tbool)
        assert lock2.is_locked()
        lock2.break_lock()
        assert not lock2.is_locked()
        try:
            lock.release()
        except lockfile.NotLocked:
            pass
        else:
            raise AssertionError('break lock failed')

##    def test_break_lock_threaded(self):
##        self._test_break_lock_helper(True)

    def test_break_lock_unthreaded(self):
        self._test_break_lock_helper(False)

    def _lock_wait_unlock(self, event1, event2):
        """Lock from another thread.  Helper for tests.

        Acquires the test lock, sets ``event1``, then blocks on ``event2``
        before releasing the lock again.
        """
        l = lockfile.LockFile(self._testfile())
        l.acquire()
        try:
            event1.set()  # we're in,
            event2.wait() # wait for boss's permission to leave
        finally:
            l.release()

    def test_enter(self):
        lock = lockfile.LockFile(self._testfile())
        lock.acquire()
        try:
            assert lock.is_locked(), "Not locked after acquire!"
        finally:
            lock.release()
        assert not lock.is_locked(), "still locked after release!"

    def test_decorator(self):
        @lockfile.locked(self._testfile())
        def func(a, b):
            return a + b
        assert func(4, 3) == 7
|
|
252
|
|
253 def _in_thread(func, *args, **kwargs):
|
|
254 """Execute func(*args, **kwargs) after dt seconds. Helper for tests."""
|
|
255 def _f():
|
|
256 func(*args, **kwargs)
|
|
257 t = threading.Thread(target=_f, name='/*/*')
|
|
258 t.setDaemon(True)
|
|
259 t.start()
|
|
260 return t
|
|
261
|