comparison venv/lib/python2.7/site-packages/click/_termui_impl.py @ 0:d67268158946 draft

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author bcclaywell
date Mon, 12 Oct 2015 17:43:33 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d67268158946
1 """
2 click._termui_impl
3 ~~~~~~~~~~~~~~~~~~
4
5 This module contains implementations for the termui module. To keep the
6 import time of Click down, some infrequently used functionality is placed
7 in this module and only imported as needed.
8
9 :copyright: (c) 2014 by Armin Ronacher.
10 :license: BSD, see LICENSE for more details.
11 """
12 import os
13 import sys
14 import time
15 import math
16 from ._compat import _default_text_stdout, range_type, PY2, isatty, \
17 open_stream, strip_ansi, term_len, get_best_encoding, WIN
18 from .utils import echo
19 from .exceptions import ClickException
20
21
22 if os.name == 'nt':
23 BEFORE_BAR = '\r'
24 AFTER_BAR = '\n'
25 else:
26 BEFORE_BAR = '\r\033[?25l'
27 AFTER_BAR = '\033[?25h\n'
28
29
30 def _length_hint(obj):
31 """Returns the length hint of an object."""
32 try:
33 return len(obj)
34 except TypeError:
35 try:
36 get_hint = type(obj).__length_hint__
37 except AttributeError:
38 return None
39 try:
40 hint = get_hint(obj)
41 except TypeError:
42 return None
43 if hint is NotImplemented or \
44 not isinstance(hint, (int, long)) or \
45 hint < 0:
46 return None
47 return hint
48
49
50 class ProgressBar(object):
51
52 def __init__(self, iterable, length=None, fill_char='#', empty_char=' ',
53 bar_template='%(bar)s', info_sep=' ', show_eta=True,
54 show_percent=None, show_pos=False, item_show_func=None,
55 label=None, file=None, color=None, width=30):
56 self.fill_char = fill_char
57 self.empty_char = empty_char
58 self.bar_template = bar_template
59 self.info_sep = info_sep
60 self.show_eta = show_eta
61 self.show_percent = show_percent
62 self.show_pos = show_pos
63 self.item_show_func = item_show_func
64 self.label = label or ''
65 if file is None:
66 file = _default_text_stdout()
67 self.file = file
68 self.color = color
69 self.width = width
70 self.autowidth = width == 0
71
72 if length is None:
73 length = _length_hint(iterable)
74 if iterable is None:
75 if length is None:
76 raise TypeError('iterable or length is required')
77 iterable = range_type(length)
78 self.iter = iter(iterable)
79 self.length = length
80 self.length_known = length is not None
81 self.pos = 0
82 self.avg = []
83 self.start = self.last_eta = time.time()
84 self.eta_known = False
85 self.finished = False
86 self.max_width = None
87 self.entered = False
88 self.current_item = None
89 self.is_hidden = not isatty(self.file)
90
91 def __enter__(self):
92 self.entered = True
93 self.render_progress()
94 return self
95
96 def __exit__(self, exc_type, exc_value, tb):
97 self.render_finish()
98
99 def __iter__(self):
100 if not self.entered:
101 raise RuntimeError('You need to use progress bars in a with block.')
102 self.render_progress()
103 return self
104
105 def render_finish(self):
106 if self.is_hidden:
107 return
108 self.file.write(AFTER_BAR)
109 self.file.flush()
110
111 @property
112 def pct(self):
113 if self.finished:
114 return 1.0
115 return min(self.pos / (float(self.length) or 1), 1.0)
116
117 @property
118 def time_per_iteration(self):
119 if not self.avg:
120 return 0.0
121 return sum(self.avg) / float(len(self.avg))
122
123 @property
124 def eta(self):
125 if self.length_known and not self.finished:
126 return self.time_per_iteration * (self.length - self.pos)
127 return 0.0
128
129 def format_eta(self):
130 if self.eta_known:
131 return time.strftime('%H:%M:%S', time.gmtime(self.eta + 1))
132 return ''
133
134 def format_pos(self):
135 pos = str(self.pos)
136 if self.length_known:
137 pos += '/%s' % self.length
138 return pos
139
140 def format_pct(self):
141 return ('% 4d%%' % int(self.pct * 100))[1:]
142
143 def format_progress_line(self):
144 show_percent = self.show_percent
145
146 info_bits = []
147 if self.length_known:
148 bar_length = int(self.pct * self.width)
149 bar = self.fill_char * bar_length
150 bar += self.empty_char * (self.width - bar_length)
151 if show_percent is None:
152 show_percent = not self.show_pos
153 else:
154 if self.finished:
155 bar = self.fill_char * self.width
156 else:
157 bar = list(self.empty_char * (self.width or 1))
158 if self.time_per_iteration != 0:
159 bar[int((math.cos(self.pos * self.time_per_iteration)
160 / 2.0 + 0.5) * self.width)] = self.fill_char
161 bar = ''.join(bar)
162
163 if self.show_pos:
164 info_bits.append(self.format_pos())
165 if show_percent:
166 info_bits.append(self.format_pct())
167 if self.show_eta and self.eta_known and not self.finished:
168 info_bits.append(self.format_eta())
169 if self.item_show_func is not None:
170 item_info = self.item_show_func(self.current_item)
171 if item_info is not None:
172 info_bits.append(item_info)
173
174 return (self.bar_template % {
175 'label': self.label,
176 'bar': bar,
177 'info': self.info_sep.join(info_bits)
178 }).rstrip()
179
180 def render_progress(self):
181 from .termui import get_terminal_size
182
183 if self.is_hidden:
184 echo(self.label, file=self.file, color=self.color)
185 self.file.flush()
186 return
187
188 # Update width in case the terminal has been resized
189 if self.autowidth:
190 old_width = self.width
191 self.width = 0
192 clutter_length = term_len(self.format_progress_line())
193 new_width = max(0, get_terminal_size()[0] - clutter_length)
194 if new_width < old_width:
195 self.file.write(BEFORE_BAR)
196 self.file.write(' ' * self.max_width)
197 self.max_width = new_width
198 self.width = new_width
199
200 clear_width = self.width
201 if self.max_width is not None:
202 clear_width = self.max_width
203
204 self.file.write(BEFORE_BAR)
205 line = self.format_progress_line()
206 line_len = term_len(line)
207 if self.max_width is None or self.max_width < line_len:
208 self.max_width = line_len
209 # Use echo here so that we get colorama support.
210 echo(line, file=self.file, nl=False, color=self.color)
211 self.file.write(' ' * (clear_width - line_len))
212 self.file.flush()
213
214 def make_step(self, n_steps):
215 self.pos += n_steps
216 if self.length_known and self.pos >= self.length:
217 self.finished = True
218
219 if (time.time() - self.last_eta) < 1.0:
220 return
221
222 self.last_eta = time.time()
223 self.avg = self.avg[-6:] + [-(self.start - time.time()) / (self.pos)]
224
225 self.eta_known = self.length_known
226
227 def update(self, n_steps):
228 self.make_step(n_steps)
229 self.render_progress()
230
231 def finish(self):
232 self.eta_known = 0
233 self.current_item = None
234 self.finished = True
235
236 def next(self):
237 if self.is_hidden:
238 return next(self.iter)
239 try:
240 rv = next(self.iter)
241 self.current_item = rv
242 except StopIteration:
243 self.finish()
244 self.render_progress()
245 raise StopIteration()
246 else:
247 self.update(1)
248 return rv
249
250 if not PY2:
251 __next__ = next
252 del next
253
254
255 def pager(text, color=None):
256 """Decide what method to use for paging through text."""
257 stdout = _default_text_stdout()
258 if not isatty(sys.stdin) or not isatty(stdout):
259 return _nullpager(stdout, text, color)
260 pager_cmd = (os.environ.get('PAGER', None) or '').strip()
261 if pager_cmd:
262 if WIN:
263 return _tempfilepager(text, pager_cmd, color)
264 return _pipepager(text, pager_cmd, color)
265 if os.environ.get('TERM') in ('dumb', 'emacs'):
266 return _nullpager(stdout, text, color)
267 if WIN or sys.platform.startswith('os2'):
268 return _tempfilepager(text, 'more <', color)
269 if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
270 return _pipepager(text, 'less', color)
271
272 import tempfile
273 fd, filename = tempfile.mkstemp()
274 os.close(fd)
275 try:
276 if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0:
277 return _pipepager(text, 'more', color)
278 return _nullpager(stdout, text, color)
279 finally:
280 os.unlink(filename)
281
282
283 def _pipepager(text, cmd, color):
284 """Page through text by feeding it to another program. Invoking a
285 pager through this might support colors.
286 """
287 import subprocess
288 env = dict(os.environ)
289
290 # If we're piping to less we might support colors under the
291 # condition that
292 cmd_detail = cmd.rsplit('/', 1)[-1].split()
293 if color is None and cmd_detail[0] == 'less':
294 less_flags = os.environ.get('LESS', '') + ' '.join(cmd_detail[1:])
295 if not less_flags:
296 env['LESS'] = '-R'
297 color = True
298 elif 'r' in less_flags or 'R' in less_flags:
299 color = True
300
301 if not color:
302 text = strip_ansi(text)
303
304 c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
305 env=env)
306 encoding = get_best_encoding(c.stdin)
307 try:
308 c.stdin.write(text.encode(encoding, 'replace'))
309 c.stdin.close()
310 except (IOError, KeyboardInterrupt):
311 pass
312
313 # Less doesn't respect ^C, but catches it for its own UI purposes (aborting
314 # search or other commands inside less).
315 #
316 # That means when the user hits ^C, the parent process (click) terminates,
317 # but less is still alive, paging the output and messing up the terminal.
318 #
319 # If the user wants to make the pager exit on ^C, they should set
320 # `LESS='-K'`. It's not our decision to make.
321 while True:
322 try:
323 c.wait()
324 except KeyboardInterrupt:
325 pass
326 else:
327 break
328
329
330 def _tempfilepager(text, cmd, color):
331 """Page through text by invoking a program on a temporary file."""
332 import tempfile
333 filename = tempfile.mktemp()
334 if not color:
335 text = strip_ansi(text)
336 encoding = get_best_encoding(sys.stdout)
337 with open_stream(filename, 'wb')[0] as f:
338 f.write(text.encode(encoding))
339 try:
340 os.system(cmd + ' "' + filename + '"')
341 finally:
342 os.unlink(filename)
343
344
345 def _nullpager(stream, text, color):
346 """Simply print unformatted text. This is the ultimate fallback."""
347 if not color:
348 text = strip_ansi(text)
349 stream.write(text)
350
351
352 class Editor(object):
353
354 def __init__(self, editor=None, env=None, require_save=True,
355 extension='.txt'):
356 self.editor = editor
357 self.env = env
358 self.require_save = require_save
359 self.extension = extension
360
361 def get_editor(self):
362 if self.editor is not None:
363 return self.editor
364 for key in 'VISUAL', 'EDITOR':
365 rv = os.environ.get(key)
366 if rv:
367 return rv
368 if WIN:
369 return 'notepad'
370 for editor in 'vim', 'nano':
371 if os.system('which %s >/dev/null 2>&1' % editor) == 0:
372 return editor
373 return 'vi'
374
375 def edit_file(self, filename):
376 import subprocess
377 editor = self.get_editor()
378 if self.env:
379 environ = os.environ.copy()
380 environ.update(self.env)
381 else:
382 environ = None
383 try:
384 c = subprocess.Popen('%s "%s"' % (editor, filename),
385 env=environ, shell=True)
386 exit_code = c.wait()
387 if exit_code != 0:
388 raise ClickException('%s: Editing failed!' % editor)
389 except OSError as e:
390 raise ClickException('%s: Editing failed: %s' % (editor, e))
391
392 def edit(self, text):
393 import tempfile
394
395 text = text or ''
396 if text and not text.endswith('\n'):
397 text += '\n'
398
399 fd, name = tempfile.mkstemp(prefix='editor-', suffix=self.extension)
400 try:
401 if WIN:
402 encoding = 'utf-8-sig'
403 text = text.replace('\n', '\r\n')
404 else:
405 encoding = 'utf-8'
406 text = text.encode(encoding)
407
408 f = os.fdopen(fd, 'wb')
409 f.write(text)
410 f.close()
411 timestamp = os.path.getmtime(name)
412
413 self.edit_file(name)
414
415 if self.require_save \
416 and os.path.getmtime(name) == timestamp:
417 return None
418
419 f = open(name, 'rb')
420 try:
421 rv = f.read()
422 finally:
423 f.close()
424 return rv.decode('utf-8-sig').replace('\r\n', '\n')
425 finally:
426 os.unlink(name)
427
428
429 def open_url(url, wait=False, locate=False):
430 import subprocess
431
432 def _unquote_file(url):
433 try:
434 import urllib
435 except ImportError:
436 import urllib
437 if url.startswith('file://'):
438 url = urllib.unquote(url[7:])
439 return url
440
441 if sys.platform == 'darwin':
442 args = ['open']
443 if wait:
444 args.append('-W')
445 if locate:
446 args.append('-R')
447 args.append(_unquote_file(url))
448 null = open('/dev/null', 'w')
449 try:
450 return subprocess.Popen(args, stderr=null).wait()
451 finally:
452 null.close()
453 elif WIN:
454 if locate:
455 url = _unquote_file(url)
456 args = 'explorer /select,"%s"' % _unquote_file(
457 url.replace('"', ''))
458 else:
459 args = 'start %s "" "%s"' % (
460 wait and '/WAIT' or '', url.replace('"', ''))
461 return os.system(args)
462
463 try:
464 if locate:
465 url = os.path.dirname(_unquote_file(url)) or '.'
466 else:
467 url = _unquote_file(url)
468 c = subprocess.Popen(['xdg-open', url])
469 if wait:
470 return c.wait()
471 return 0
472 except OSError:
473 if url.startswith(('http://', 'https://')) and not locate and not wait:
474 import webbrowser
475 webbrowser.open(url)
476 return 0
477 return 1
478
479
480 def _translate_ch_to_exc(ch):
481 if ch == '\x03':
482 raise KeyboardInterrupt()
483 if ch == '\x04':
484 raise EOFError()
485
486
487 if WIN:
488 import msvcrt
489
490 def getchar(echo):
491 rv = msvcrt.getch()
492 if echo:
493 msvcrt.putchar(rv)
494 _translate_ch_to_exc(rv)
495 if PY2:
496 enc = getattr(sys.stdin, 'encoding', None)
497 if enc is not None:
498 rv = rv.decode(enc, 'replace')
499 else:
500 rv = rv.decode('cp1252', 'replace')
501 return rv
502 else:
503 import tty
504 import termios
505
506 def getchar(echo):
507 if not isatty(sys.stdin):
508 f = open('/dev/tty')
509 fd = f.fileno()
510 else:
511 fd = sys.stdin.fileno()
512 f = None
513 try:
514 old_settings = termios.tcgetattr(fd)
515 try:
516 tty.setraw(fd)
517 ch = os.read(fd, 32)
518 if echo and isatty(sys.stdout):
519 sys.stdout.write(ch)
520 finally:
521 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
522 sys.stdout.flush()
523 if f is not None:
524 f.close()
525 except termios.error:
526 pass
527 _translate_ch_to_exc(ch)
528 return ch.decode(get_best_encoding(sys.stdin), 'replace')