Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/click/testing.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
| author | bcclaywell |
|---|---|
| date | Mon, 12 Oct 2015 17:43:33 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d67268158946 |
|---|---|
| 1 import os | |
| 2 import sys | |
| 3 import shutil | |
| 4 import tempfile | |
| 5 import contextlib | |
| 6 | |
| 7 from ._compat import iteritems, PY2 | |
| 8 | |
| 9 | |
| 10 # If someone wants to vendor click, we want to ensure the | |
| 11 # correct package is discovered. Ideally we could use a | |
| 12 # relative import here but unfortunately Python does not | |
| 13 # support that. | |
| 14 clickpkg = sys.modules[__name__.rsplit('.', 1)[0]] | |
| 15 | |
| 16 | |
| 17 if PY2: | |
| 18 from cStringIO import StringIO | |
| 19 else: | |
| 20 import io | |
| 21 from ._compat import _find_binary_reader | |
| 22 | |
| 23 | |
| 24 class EchoingStdin(object): | |
| 25 | |
| 26 def __init__(self, input, output): | |
| 27 self._input = input | |
| 28 self._output = output | |
| 29 | |
| 30 def __getattr__(self, x): | |
| 31 return getattr(self._input, x) | |
| 32 | |
| 33 def _echo(self, rv): | |
| 34 self._output.write(rv) | |
| 35 return rv | |
| 36 | |
| 37 def read(self, n=-1): | |
| 38 return self._echo(self._input.read(n)) | |
| 39 | |
| 40 def readline(self, n=-1): | |
| 41 return self._echo(self._input.readline(n)) | |
| 42 | |
| 43 def readlines(self): | |
| 44 return [self._echo(x) for x in self._input.readlines()] | |
| 45 | |
| 46 def __iter__(self): | |
| 47 return iter(self._echo(x) for x in self._input) | |
| 48 | |
| 49 def __repr__(self): | |
| 50 return repr(self._input) | |
| 51 | |
| 52 | |
| 53 def make_input_stream(input, charset): | |
| 54 # Is already an input stream. | |
| 55 if hasattr(input, 'read'): | |
| 56 if PY2: | |
| 57 return input | |
| 58 rv = _find_binary_reader(input) | |
| 59 if rv is not None: | |
| 60 return rv | |
| 61 raise TypeError('Could not find binary reader for input stream.') | |
| 62 | |
| 63 if input is None: | |
| 64 input = b'' | |
| 65 elif not isinstance(input, bytes): | |
| 66 input = input.encode(charset) | |
| 67 if PY2: | |
| 68 return StringIO(input) | |
| 69 return io.BytesIO(input) | |
| 70 | |
| 71 | |
| 72 class Result(object): | |
| 73 """Holds the captured result of an invoked CLI script.""" | |
| 74 | |
| 75 def __init__(self, runner, output_bytes, exit_code, exception, | |
| 76 exc_info=None): | |
| 77 #: The runner that created the result | |
| 78 self.runner = runner | |
| 79 #: The output as bytes. | |
| 80 self.output_bytes = output_bytes | |
| 81 #: The exit code as integer. | |
| 82 self.exit_code = exit_code | |
| 83 #: The exception that happend if one did. | |
| 84 self.exception = exception | |
| 85 #: The traceback | |
| 86 self.exc_info = exc_info | |
| 87 | |
| 88 @property | |
| 89 def output(self): | |
| 90 """The output as unicode string.""" | |
| 91 return self.output_bytes.decode(self.runner.charset, 'replace') \ | |
| 92 .replace('\r\n', '\n') | |
| 93 | |
| 94 def __repr__(self): | |
| 95 return '<Result %s>' % ( | |
| 96 self.exception and repr(self.exception) or 'okay', | |
| 97 ) | |
| 98 | |
| 99 | |
| 100 class CliRunner(object): | |
| 101 """The CLI runner provides functionality to invoke a Click command line | |
| 102 script for unittesting purposes in a isolated environment. This only | |
| 103 works in single-threaded systems without any concurrency as it changes the | |
| 104 global interpreter state. | |
| 105 | |
| 106 :param charset: the character set for the input and output data. This is | |
| 107 UTF-8 by default and should not be changed currently as | |
| 108 the reporting to Click only works in Python 2 properly. | |
| 109 :param env: a dictionary with environment variables for overriding. | |
| 110 :param echo_stdin: if this is set to `True`, then reading from stdin writes | |
| 111 to stdout. This is useful for showing examples in | |
| 112 some circumstances. Note that regular prompts | |
| 113 will automatically echo the input. | |
| 114 """ | |
| 115 | |
| 116 def __init__(self, charset=None, env=None, echo_stdin=False): | |
| 117 if charset is None: | |
| 118 charset = 'utf-8' | |
| 119 self.charset = charset | |
| 120 self.env = env or {} | |
| 121 self.echo_stdin = echo_stdin | |
| 122 | |
| 123 def get_default_prog_name(self, cli): | |
| 124 """Given a command object it will return the default program name | |
| 125 for it. The default is the `name` attribute or ``"root"`` if not | |
| 126 set. | |
| 127 """ | |
| 128 return cli.name or 'root' | |
| 129 | |
| 130 def make_env(self, overrides=None): | |
| 131 """Returns the environment overrides for invoking a script.""" | |
| 132 rv = dict(self.env) | |
| 133 if overrides: | |
| 134 rv.update(overrides) | |
| 135 return rv | |
| 136 | |
| 137 @contextlib.contextmanager | |
| 138 def isolation(self, input=None, env=None, color=False): | |
| 139 """A context manager that sets up the isolation for invoking of a | |
| 140 command line tool. This sets up stdin with the given input data | |
| 141 and `os.environ` with the overrides from the given dictionary. | |
| 142 This also rebinds some internals in Click to be mocked (like the | |
| 143 prompt functionality). | |
| 144 | |
| 145 This is automatically done in the :meth:`invoke` method. | |
| 146 | |
| 147 .. versionadded:: 4.0 | |
| 148 The ``color`` parameter was added. | |
| 149 | |
| 150 :param input: the input stream to put into sys.stdin. | |
| 151 :param env: the environment overrides as dictionary. | |
| 152 :param color: whether the output should contain color codes. The | |
| 153 application can still override this explicitly. | |
| 154 """ | |
| 155 input = make_input_stream(input, self.charset) | |
| 156 | |
| 157 old_stdin = sys.stdin | |
| 158 old_stdout = sys.stdout | |
| 159 old_stderr = sys.stderr | |
| 160 | |
| 161 env = self.make_env(env) | |
| 162 | |
| 163 if PY2: | |
| 164 sys.stdout = sys.stderr = bytes_output = StringIO() | |
| 165 if self.echo_stdin: | |
| 166 input = EchoingStdin(input, bytes_output) | |
| 167 else: | |
| 168 bytes_output = io.BytesIO() | |
| 169 if self.echo_stdin: | |
| 170 input = EchoingStdin(input, bytes_output) | |
| 171 input = io.TextIOWrapper(input, encoding=self.charset) | |
| 172 sys.stdout = sys.stderr = io.TextIOWrapper( | |
| 173 bytes_output, encoding=self.charset) | |
| 174 | |
| 175 sys.stdin = input | |
| 176 | |
| 177 def visible_input(prompt=None): | |
| 178 sys.stdout.write(prompt or '') | |
| 179 val = input.readline().rstrip('\r\n') | |
| 180 sys.stdout.write(val + '\n') | |
| 181 sys.stdout.flush() | |
| 182 return val | |
| 183 | |
| 184 def hidden_input(prompt=None): | |
| 185 sys.stdout.write((prompt or '') + '\n') | |
| 186 sys.stdout.flush() | |
| 187 return input.readline().rstrip('\r\n') | |
| 188 | |
| 189 def _getchar(echo): | |
| 190 char = sys.stdin.read(1) | |
| 191 if echo: | |
| 192 sys.stdout.write(char) | |
| 193 sys.stdout.flush() | |
| 194 return char | |
| 195 | |
| 196 default_color = color | |
| 197 def should_strip_ansi(stream=None, color=None): | |
| 198 if color is None: | |
| 199 return not default_color | |
| 200 return not color | |
| 201 | |
| 202 old_visible_prompt_func = clickpkg.termui.visible_prompt_func | |
| 203 old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func | |
| 204 old__getchar_func = clickpkg.termui._getchar | |
| 205 old_should_strip_ansi = clickpkg.utils.should_strip_ansi | |
| 206 clickpkg.termui.visible_prompt_func = visible_input | |
| 207 clickpkg.termui.hidden_prompt_func = hidden_input | |
| 208 clickpkg.termui._getchar = _getchar | |
| 209 clickpkg.utils.should_strip_ansi = should_strip_ansi | |
| 210 | |
| 211 old_env = {} | |
| 212 try: | |
| 213 for key, value in iteritems(env): | |
| 214 old_env[key] = os.environ.get(value) | |
| 215 if value is None: | |
| 216 try: | |
| 217 del os.environ[key] | |
| 218 except Exception: | |
| 219 pass | |
| 220 else: | |
| 221 os.environ[key] = value | |
| 222 yield bytes_output | |
| 223 finally: | |
| 224 for key, value in iteritems(old_env): | |
| 225 if value is None: | |
| 226 try: | |
| 227 del os.environ[key] | |
| 228 except Exception: | |
| 229 pass | |
| 230 else: | |
| 231 os.environ[key] = value | |
| 232 sys.stdout = old_stdout | |
| 233 sys.stderr = old_stderr | |
| 234 sys.stdin = old_stdin | |
| 235 clickpkg.termui.visible_prompt_func = old_visible_prompt_func | |
| 236 clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func | |
| 237 clickpkg.termui._getchar = old__getchar_func | |
| 238 clickpkg.utils.should_strip_ansi = old_should_strip_ansi | |
| 239 | |
| 240 def invoke(self, cli, args=None, input=None, env=None, | |
| 241 catch_exceptions=True, color=False, **extra): | |
| 242 """Invokes a command in an isolated environment. The arguments are | |
| 243 forwarded directly to the command line script, the `extra` keyword | |
| 244 arguments are passed to the :meth:`~clickpkg.Command.main` function of | |
| 245 the command. | |
| 246 | |
| 247 This returns a :class:`Result` object. | |
| 248 | |
| 249 .. versionadded:: 3.0 | |
| 250 The ``catch_exceptions`` parameter was added. | |
| 251 | |
| 252 .. versionchanged:: 3.0 | |
| 253 The result object now has an `exc_info` attribute with the | |
| 254 traceback if available. | |
| 255 | |
| 256 .. versionadded:: 4.0 | |
| 257 The ``color`` parameter was added. | |
| 258 | |
| 259 :param cli: the command to invoke | |
| 260 :param args: the arguments to invoke | |
| 261 :param input: the input data for `sys.stdin`. | |
| 262 :param env: the environment overrides. | |
| 263 :param catch_exceptions: Whether to catch any other exceptions than | |
| 264 ``SystemExit``. | |
| 265 :param extra: the keyword arguments to pass to :meth:`main`. | |
| 266 :param color: whether the output should contain color codes. The | |
| 267 application can still override this explicitly. | |
| 268 """ | |
| 269 exc_info = None | |
| 270 with self.isolation(input=input, env=env, color=color) as out: | |
| 271 exception = None | |
| 272 exit_code = 0 | |
| 273 | |
| 274 try: | |
| 275 cli.main(args=args or (), | |
| 276 prog_name=self.get_default_prog_name(cli), **extra) | |
| 277 except SystemExit as e: | |
| 278 if e.code != 0: | |
| 279 exception = e | |
| 280 | |
| 281 exc_info = sys.exc_info() | |
| 282 | |
| 283 exit_code = e.code | |
| 284 if not isinstance(exit_code, int): | |
| 285 sys.stdout.write(str(exit_code)) | |
| 286 sys.stdout.write('\n') | |
| 287 exit_code = 1 | |
| 288 except Exception as e: | |
| 289 if not catch_exceptions: | |
| 290 raise | |
| 291 exception = e | |
| 292 exit_code = -1 | |
| 293 exc_info = sys.exc_info() | |
| 294 finally: | |
| 295 sys.stdout.flush() | |
| 296 output = out.getvalue() | |
| 297 | |
| 298 return Result(runner=self, | |
| 299 output_bytes=output, | |
| 300 exit_code=exit_code, | |
| 301 exception=exception, | |
| 302 exc_info=exc_info) | |
| 303 | |
| 304 @contextlib.contextmanager | |
| 305 def isolated_filesystem(self): | |
| 306 """A context manager that creates a temporary folder and changes | |
| 307 the current working directory to it for isolated filesystem tests. | |
| 308 """ | |
| 309 cwd = os.getcwd() | |
| 310 t = tempfile.mkdtemp() | |
| 311 os.chdir(t) | |
| 312 try: | |
| 313 yield t | |
| 314 finally: | |
| 315 os.chdir(cwd) | |
| 316 try: | |
| 317 shutil.rmtree(t) | |
| 318 except (OSError, IOError): | |
| 319 pass |
