Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/click/_termui_impl.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 """ | |
2 click._termui_impl | |
3 ~~~~~~~~~~~~~~~~~~ | |
4 | |
5 This module contains implementations for the termui module. To keep the | |
6 import time of Click down, some infrequently used functionality is placed | |
7 in this module and only imported as needed. | |
8 | |
9 :copyright: (c) 2014 by Armin Ronacher. | |
10 :license: BSD, see LICENSE for more details. | |
11 """ | |
12 import os | |
13 import sys | |
14 import time | |
15 import math | |
16 from ._compat import _default_text_stdout, range_type, PY2, isatty, \ | |
17 open_stream, strip_ansi, term_len, get_best_encoding, WIN | |
18 from .utils import echo | |
19 from .exceptions import ClickException | |
20 | |
21 | |
22 if os.name == 'nt': | |
23 BEFORE_BAR = '\r' | |
24 AFTER_BAR = '\n' | |
25 else: | |
26 BEFORE_BAR = '\r\033[?25l' | |
27 AFTER_BAR = '\033[?25h\n' | |
28 | |
29 | |
30 def _length_hint(obj): | |
31 """Returns the length hint of an object.""" | |
32 try: | |
33 return len(obj) | |
34 except TypeError: | |
35 try: | |
36 get_hint = type(obj).__length_hint__ | |
37 except AttributeError: | |
38 return None | |
39 try: | |
40 hint = get_hint(obj) | |
41 except TypeError: | |
42 return None | |
43 if hint is NotImplemented or \ | |
44 not isinstance(hint, (int, long)) or \ | |
45 hint < 0: | |
46 return None | |
47 return hint | |
48 | |
49 | |
50 class ProgressBar(object): | |
51 | |
52 def __init__(self, iterable, length=None, fill_char='#', empty_char=' ', | |
53 bar_template='%(bar)s', info_sep=' ', show_eta=True, | |
54 show_percent=None, show_pos=False, item_show_func=None, | |
55 label=None, file=None, color=None, width=30): | |
56 self.fill_char = fill_char | |
57 self.empty_char = empty_char | |
58 self.bar_template = bar_template | |
59 self.info_sep = info_sep | |
60 self.show_eta = show_eta | |
61 self.show_percent = show_percent | |
62 self.show_pos = show_pos | |
63 self.item_show_func = item_show_func | |
64 self.label = label or '' | |
65 if file is None: | |
66 file = _default_text_stdout() | |
67 self.file = file | |
68 self.color = color | |
69 self.width = width | |
70 self.autowidth = width == 0 | |
71 | |
72 if length is None: | |
73 length = _length_hint(iterable) | |
74 if iterable is None: | |
75 if length is None: | |
76 raise TypeError('iterable or length is required') | |
77 iterable = range_type(length) | |
78 self.iter = iter(iterable) | |
79 self.length = length | |
80 self.length_known = length is not None | |
81 self.pos = 0 | |
82 self.avg = [] | |
83 self.start = self.last_eta = time.time() | |
84 self.eta_known = False | |
85 self.finished = False | |
86 self.max_width = None | |
87 self.entered = False | |
88 self.current_item = None | |
89 self.is_hidden = not isatty(self.file) | |
90 | |
91 def __enter__(self): | |
92 self.entered = True | |
93 self.render_progress() | |
94 return self | |
95 | |
96 def __exit__(self, exc_type, exc_value, tb): | |
97 self.render_finish() | |
98 | |
99 def __iter__(self): | |
100 if not self.entered: | |
101 raise RuntimeError('You need to use progress bars in a with block.') | |
102 self.render_progress() | |
103 return self | |
104 | |
105 def render_finish(self): | |
106 if self.is_hidden: | |
107 return | |
108 self.file.write(AFTER_BAR) | |
109 self.file.flush() | |
110 | |
111 @property | |
112 def pct(self): | |
113 if self.finished: | |
114 return 1.0 | |
115 return min(self.pos / (float(self.length) or 1), 1.0) | |
116 | |
117 @property | |
118 def time_per_iteration(self): | |
119 if not self.avg: | |
120 return 0.0 | |
121 return sum(self.avg) / float(len(self.avg)) | |
122 | |
123 @property | |
124 def eta(self): | |
125 if self.length_known and not self.finished: | |
126 return self.time_per_iteration * (self.length - self.pos) | |
127 return 0.0 | |
128 | |
129 def format_eta(self): | |
130 if self.eta_known: | |
131 return time.strftime('%H:%M:%S', time.gmtime(self.eta + 1)) | |
132 return '' | |
133 | |
134 def format_pos(self): | |
135 pos = str(self.pos) | |
136 if self.length_known: | |
137 pos += '/%s' % self.length | |
138 return pos | |
139 | |
140 def format_pct(self): | |
141 return ('% 4d%%' % int(self.pct * 100))[1:] | |
142 | |
143 def format_progress_line(self): | |
144 show_percent = self.show_percent | |
145 | |
146 info_bits = [] | |
147 if self.length_known: | |
148 bar_length = int(self.pct * self.width) | |
149 bar = self.fill_char * bar_length | |
150 bar += self.empty_char * (self.width - bar_length) | |
151 if show_percent is None: | |
152 show_percent = not self.show_pos | |
153 else: | |
154 if self.finished: | |
155 bar = self.fill_char * self.width | |
156 else: | |
157 bar = list(self.empty_char * (self.width or 1)) | |
158 if self.time_per_iteration != 0: | |
159 bar[int((math.cos(self.pos * self.time_per_iteration) | |
160 / 2.0 + 0.5) * self.width)] = self.fill_char | |
161 bar = ''.join(bar) | |
162 | |
163 if self.show_pos: | |
164 info_bits.append(self.format_pos()) | |
165 if show_percent: | |
166 info_bits.append(self.format_pct()) | |
167 if self.show_eta and self.eta_known and not self.finished: | |
168 info_bits.append(self.format_eta()) | |
169 if self.item_show_func is not None: | |
170 item_info = self.item_show_func(self.current_item) | |
171 if item_info is not None: | |
172 info_bits.append(item_info) | |
173 | |
174 return (self.bar_template % { | |
175 'label': self.label, | |
176 'bar': bar, | |
177 'info': self.info_sep.join(info_bits) | |
178 }).rstrip() | |
179 | |
180 def render_progress(self): | |
181 from .termui import get_terminal_size | |
182 | |
183 if self.is_hidden: | |
184 echo(self.label, file=self.file, color=self.color) | |
185 self.file.flush() | |
186 return | |
187 | |
188 # Update width in case the terminal has been resized | |
189 if self.autowidth: | |
190 old_width = self.width | |
191 self.width = 0 | |
192 clutter_length = term_len(self.format_progress_line()) | |
193 new_width = max(0, get_terminal_size()[0] - clutter_length) | |
194 if new_width < old_width: | |
195 self.file.write(BEFORE_BAR) | |
196 self.file.write(' ' * self.max_width) | |
197 self.max_width = new_width | |
198 self.width = new_width | |
199 | |
200 clear_width = self.width | |
201 if self.max_width is not None: | |
202 clear_width = self.max_width | |
203 | |
204 self.file.write(BEFORE_BAR) | |
205 line = self.format_progress_line() | |
206 line_len = term_len(line) | |
207 if self.max_width is None or self.max_width < line_len: | |
208 self.max_width = line_len | |
209 # Use echo here so that we get colorama support. | |
210 echo(line, file=self.file, nl=False, color=self.color) | |
211 self.file.write(' ' * (clear_width - line_len)) | |
212 self.file.flush() | |
213 | |
214 def make_step(self, n_steps): | |
215 self.pos += n_steps | |
216 if self.length_known and self.pos >= self.length: | |
217 self.finished = True | |
218 | |
219 if (time.time() - self.last_eta) < 1.0: | |
220 return | |
221 | |
222 self.last_eta = time.time() | |
223 self.avg = self.avg[-6:] + [-(self.start - time.time()) / (self.pos)] | |
224 | |
225 self.eta_known = self.length_known | |
226 | |
227 def update(self, n_steps): | |
228 self.make_step(n_steps) | |
229 self.render_progress() | |
230 | |
231 def finish(self): | |
232 self.eta_known = 0 | |
233 self.current_item = None | |
234 self.finished = True | |
235 | |
236 def next(self): | |
237 if self.is_hidden: | |
238 return next(self.iter) | |
239 try: | |
240 rv = next(self.iter) | |
241 self.current_item = rv | |
242 except StopIteration: | |
243 self.finish() | |
244 self.render_progress() | |
245 raise StopIteration() | |
246 else: | |
247 self.update(1) | |
248 return rv | |
249 | |
250 if not PY2: | |
251 __next__ = next | |
252 del next | |
253 | |
254 | |
255 def pager(text, color=None): | |
256 """Decide what method to use for paging through text.""" | |
257 stdout = _default_text_stdout() | |
258 if not isatty(sys.stdin) or not isatty(stdout): | |
259 return _nullpager(stdout, text, color) | |
260 pager_cmd = (os.environ.get('PAGER', None) or '').strip() | |
261 if pager_cmd: | |
262 if WIN: | |
263 return _tempfilepager(text, pager_cmd, color) | |
264 return _pipepager(text, pager_cmd, color) | |
265 if os.environ.get('TERM') in ('dumb', 'emacs'): | |
266 return _nullpager(stdout, text, color) | |
267 if WIN or sys.platform.startswith('os2'): | |
268 return _tempfilepager(text, 'more <', color) | |
269 if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0: | |
270 return _pipepager(text, 'less', color) | |
271 | |
272 import tempfile | |
273 fd, filename = tempfile.mkstemp() | |
274 os.close(fd) | |
275 try: | |
276 if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0: | |
277 return _pipepager(text, 'more', color) | |
278 return _nullpager(stdout, text, color) | |
279 finally: | |
280 os.unlink(filename) | |
281 | |
282 | |
283 def _pipepager(text, cmd, color): | |
284 """Page through text by feeding it to another program. Invoking a | |
285 pager through this might support colors. | |
286 """ | |
287 import subprocess | |
288 env = dict(os.environ) | |
289 | |
290 # If we're piping to less we might support colors under the | |
291 # condition that | |
292 cmd_detail = cmd.rsplit('/', 1)[-1].split() | |
293 if color is None and cmd_detail[0] == 'less': | |
294 less_flags = os.environ.get('LESS', '') + ' '.join(cmd_detail[1:]) | |
295 if not less_flags: | |
296 env['LESS'] = '-R' | |
297 color = True | |
298 elif 'r' in less_flags or 'R' in less_flags: | |
299 color = True | |
300 | |
301 if not color: | |
302 text = strip_ansi(text) | |
303 | |
304 c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, | |
305 env=env) | |
306 encoding = get_best_encoding(c.stdin) | |
307 try: | |
308 c.stdin.write(text.encode(encoding, 'replace')) | |
309 c.stdin.close() | |
310 except (IOError, KeyboardInterrupt): | |
311 pass | |
312 | |
313 # Less doesn't respect ^C, but catches it for its own UI purposes (aborting | |
314 # search or other commands inside less). | |
315 # | |
316 # That means when the user hits ^C, the parent process (click) terminates, | |
317 # but less is still alive, paging the output and messing up the terminal. | |
318 # | |
319 # If the user wants to make the pager exit on ^C, they should set | |
320 # `LESS='-K'`. It's not our decision to make. | |
321 while True: | |
322 try: | |
323 c.wait() | |
324 except KeyboardInterrupt: | |
325 pass | |
326 else: | |
327 break | |
328 | |
329 | |
330 def _tempfilepager(text, cmd, color): | |
331 """Page through text by invoking a program on a temporary file.""" | |
332 import tempfile | |
333 filename = tempfile.mktemp() | |
334 if not color: | |
335 text = strip_ansi(text) | |
336 encoding = get_best_encoding(sys.stdout) | |
337 with open_stream(filename, 'wb')[0] as f: | |
338 f.write(text.encode(encoding)) | |
339 try: | |
340 os.system(cmd + ' "' + filename + '"') | |
341 finally: | |
342 os.unlink(filename) | |
343 | |
344 | |
345 def _nullpager(stream, text, color): | |
346 """Simply print unformatted text. This is the ultimate fallback.""" | |
347 if not color: | |
348 text = strip_ansi(text) | |
349 stream.write(text) | |
350 | |
351 | |
352 class Editor(object): | |
353 | |
354 def __init__(self, editor=None, env=None, require_save=True, | |
355 extension='.txt'): | |
356 self.editor = editor | |
357 self.env = env | |
358 self.require_save = require_save | |
359 self.extension = extension | |
360 | |
361 def get_editor(self): | |
362 if self.editor is not None: | |
363 return self.editor | |
364 for key in 'VISUAL', 'EDITOR': | |
365 rv = os.environ.get(key) | |
366 if rv: | |
367 return rv | |
368 if WIN: | |
369 return 'notepad' | |
370 for editor in 'vim', 'nano': | |
371 if os.system('which %s >/dev/null 2>&1' % editor) == 0: | |
372 return editor | |
373 return 'vi' | |
374 | |
375 def edit_file(self, filename): | |
376 import subprocess | |
377 editor = self.get_editor() | |
378 if self.env: | |
379 environ = os.environ.copy() | |
380 environ.update(self.env) | |
381 else: | |
382 environ = None | |
383 try: | |
384 c = subprocess.Popen('%s "%s"' % (editor, filename), | |
385 env=environ, shell=True) | |
386 exit_code = c.wait() | |
387 if exit_code != 0: | |
388 raise ClickException('%s: Editing failed!' % editor) | |
389 except OSError as e: | |
390 raise ClickException('%s: Editing failed: %s' % (editor, e)) | |
391 | |
392 def edit(self, text): | |
393 import tempfile | |
394 | |
395 text = text or '' | |
396 if text and not text.endswith('\n'): | |
397 text += '\n' | |
398 | |
399 fd, name = tempfile.mkstemp(prefix='editor-', suffix=self.extension) | |
400 try: | |
401 if WIN: | |
402 encoding = 'utf-8-sig' | |
403 text = text.replace('\n', '\r\n') | |
404 else: | |
405 encoding = 'utf-8' | |
406 text = text.encode(encoding) | |
407 | |
408 f = os.fdopen(fd, 'wb') | |
409 f.write(text) | |
410 f.close() | |
411 timestamp = os.path.getmtime(name) | |
412 | |
413 self.edit_file(name) | |
414 | |
415 if self.require_save \ | |
416 and os.path.getmtime(name) == timestamp: | |
417 return None | |
418 | |
419 f = open(name, 'rb') | |
420 try: | |
421 rv = f.read() | |
422 finally: | |
423 f.close() | |
424 return rv.decode('utf-8-sig').replace('\r\n', '\n') | |
425 finally: | |
426 os.unlink(name) | |
427 | |
428 | |
429 def open_url(url, wait=False, locate=False): | |
430 import subprocess | |
431 | |
432 def _unquote_file(url): | |
433 try: | |
434 import urllib | |
435 except ImportError: | |
436 import urllib | |
437 if url.startswith('file://'): | |
438 url = urllib.unquote(url[7:]) | |
439 return url | |
440 | |
441 if sys.platform == 'darwin': | |
442 args = ['open'] | |
443 if wait: | |
444 args.append('-W') | |
445 if locate: | |
446 args.append('-R') | |
447 args.append(_unquote_file(url)) | |
448 null = open('/dev/null', 'w') | |
449 try: | |
450 return subprocess.Popen(args, stderr=null).wait() | |
451 finally: | |
452 null.close() | |
453 elif WIN: | |
454 if locate: | |
455 url = _unquote_file(url) | |
456 args = 'explorer /select,"%s"' % _unquote_file( | |
457 url.replace('"', '')) | |
458 else: | |
459 args = 'start %s "" "%s"' % ( | |
460 wait and '/WAIT' or '', url.replace('"', '')) | |
461 return os.system(args) | |
462 | |
463 try: | |
464 if locate: | |
465 url = os.path.dirname(_unquote_file(url)) or '.' | |
466 else: | |
467 url = _unquote_file(url) | |
468 c = subprocess.Popen(['xdg-open', url]) | |
469 if wait: | |
470 return c.wait() | |
471 return 0 | |
472 except OSError: | |
473 if url.startswith(('http://', 'https://')) and not locate and not wait: | |
474 import webbrowser | |
475 webbrowser.open(url) | |
476 return 0 | |
477 return 1 | |
478 | |
479 | |
480 def _translate_ch_to_exc(ch): | |
481 if ch == '\x03': | |
482 raise KeyboardInterrupt() | |
483 if ch == '\x04': | |
484 raise EOFError() | |
485 | |
486 | |
487 if WIN: | |
488 import msvcrt | |
489 | |
490 def getchar(echo): | |
491 rv = msvcrt.getch() | |
492 if echo: | |
493 msvcrt.putchar(rv) | |
494 _translate_ch_to_exc(rv) | |
495 if PY2: | |
496 enc = getattr(sys.stdin, 'encoding', None) | |
497 if enc is not None: | |
498 rv = rv.decode(enc, 'replace') | |
499 else: | |
500 rv = rv.decode('cp1252', 'replace') | |
501 return rv | |
502 else: | |
503 import tty | |
504 import termios | |
505 | |
506 def getchar(echo): | |
507 if not isatty(sys.stdin): | |
508 f = open('/dev/tty') | |
509 fd = f.fileno() | |
510 else: | |
511 fd = sys.stdin.fileno() | |
512 f = None | |
513 try: | |
514 old_settings = termios.tcgetattr(fd) | |
515 try: | |
516 tty.setraw(fd) | |
517 ch = os.read(fd, 32) | |
518 if echo and isatty(sys.stdout): | |
519 sys.stdout.write(ch) | |
520 finally: | |
521 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) | |
522 sys.stdout.flush() | |
523 if f is not None: | |
524 f.close() | |
525 except termios.error: | |
526 pass | |
527 _translate_ch_to_exc(ch) | |
528 return ch.decode(get_best_encoding(sys.stdin), 'replace') |