From 89985ce4109b9ff2b34127d2e8ad3992769fb57a Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 20 Aug 2020 12:46:15 +0100 Subject: [PATCH] Output exit code (#203) Migrated exit code to HostOutput property. Added tests. Updated output generation for existing commands via get_last_output Updated travis cfg, osx wheels Updated changelog, readme and docs --- .travis.yml | 99 +++++++--------------- Changelog.rst | 19 +++++ README.rst | 11 ++- ci/osx-wheel.sh | 25 ++++++ doc/quickstart.rst | 14 ++- pssh/__init__.py | 2 +- pssh/clients/base_pssh.py | 113 +++++++++++++------------ pssh/clients/miko/parallel.py | 36 ++++---- pssh/clients/miko/single.py | 7 ++ pssh/clients/native/parallel.py | 44 +++------- pssh/clients/native/single.py | 5 ++ pssh/output.py | 19 ++++- tests/test_native_parallel_client.py | 89 +++++++++---------- tests/test_output.py | 28 ++++-- tests/test_paramiko_parallel_client.py | 60 ++++++------- 15 files changed, 309 insertions(+), 262 deletions(-) create mode 100755 ci/osx-wheel.sh diff --git a/.travis.yml b/.travis.yml index 22b7ac4d..2fdefaaf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,9 +28,9 @@ install: script: # For testing SSH agent related functionality - eval `ssh-agent -s` - - pytest --reruns 5 --cov-append --cov=pssh -s tests/test_native_tunnel.py - - pytest --reruns 5 --cov-append --cov=pssh -s tests/test_native_*_client.py - - pytest --reruns 5 --cov-append --cov=pssh -s tests/test_paramiko*.py + - pytest --reruns 5 --cov-append --cov=pssh tests/test_native_tunnel.py + - pytest --reruns 5 --cov-append --cov=pssh tests/test_native_*_client.py + - pytest --reruns 5 --cov-append --cov=pssh tests/test_paramiko*.py - flake8 pssh - cd doc; make html; cd .. # Test building from source distribution @@ -43,7 +43,34 @@ after_success: jobs: include: + - stage: test + if: (type = push OR \ + (type = pull_request AND fork = true)) AND \ + tag IS blank + + - &osx-wheels + stage: build packages + if: tag IS present + os: osx + osx_image: xcode11.6 + before_install: + - sudo -H pip install twine + - which twine + install: skip + script: + - ./ci/osx-wheel.sh + after_success: + - if [[ ! -z "$TRAVIS_TAG" ]]; then + twine upload --skip-existing -u $PYPI_U -p $PYPI_P wheels/*.whl; + fi + language: generic + python: skip + + - <<: *osx-wheels + osx_image: xcode11.3 + - stage: build packages + if: tag IS present env: - WHEELS=1 os: linux @@ -69,69 +96,3 @@ jobs: distributions: sdist skip_upload_docs: true skip_cleanup: true - - - &osx-wheels - stage: build packages - os: osx - osx_image: xcode11.6 - env: - - PYENV: 3.6.11 - before_install: - - sudo -H pip install twine - - which twine - - mkdir -p wheels - install: skip - script: - - ./ci/travis/pyenv-wheel.sh - after_success: - - if [[ ! -z "$TRAVIS_TAG" ]]; then - twine upload --skip-existing -u $PYPI_U -p $PYPI_P wheels/*.whl; - fi - language: generic - python: skip - - - <<: *osx-wheels - osx_image: xcode11.3 - env: - - PYENV: 3.6.11 - - - <<: *osx-wheels - osx_image: xcode11.6 - env: - - PYENV: 3.7.8 - - - <<: *osx-wheels - osx_image: xcode11.3 - env: - - PYENV: 3.7.8 - - - <<: *osx-wheels - osx_image: xcode11.3 - env: - - PYENV: 3.8.5 - - - <<: *osx-wheels - osx_image: xcode11.6 - env: - - PYENV: 3.8.5 - - # - os: linux - # stage: build packages - # env: - # - SYSTEM_PACKAGES=1 - # python: 3.6 - # before_install: skip - # install: skip - # script: - # - docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD" - # - ./ci/docker/build-packages.sh - # deploy: - # - provider: releases - # skip_cleanup: true - # api_key: - # secure: hKf+D9ZWRCJWNQtlOWeFh7z1a+VSz+GK5qOY0e1+iV/PrM0f41wy2yej0bxG1zS6CQAnJBK6/gmq5uXXhQhGNQeIQs7zElyKlrijQAn5UstPPJTRIk2oywRr2b+q0k3V42tto6WbhjqPRpOQl/pNTjKJCc/UPgd6kOVZEhCfAec= - # file_glob: true - # file: '*.{deb,rpm}' - # on: - # repo: ParallelSSH/parallel-ssh - # tags: true diff --git a/Changelog.rst b/Changelog.rst index cc09ad40..0a836089 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -1,6 +1,25 @@ Change Log ============ +1.11.0 +++++++ + +Changes +------- + +* Moved polling to gevent.select.poll to increase performance and better handle high number of sockets - #189 +* ``HostOutput.exit_code`` is now a dynamic property returning either ``None`` when exit code not ready or the exit code as reported by channel. ``ParallelSSHClient.get_exit_codes`` is now a no-op and scheduled to be removed. + +Packaging +--------- + +* Removed OSX Python 3.6 and 3.7 wheels. OSX wheels for brew python, currently 3.8, on OSX 10.14 and 10.15 are provided. + +Fixes +------ + +* Native client would fail on opening sockets with large file descriptor values - #189 + 1.10.0 +++++++ diff --git a/README.rst b/README.rst index 4fe8e695..91fc95d3 100644 --- a/README.rst +++ b/README.rst @@ -116,7 +116,13 @@ Native Code Client Features Exit codes *********** -Once *either* standard output is iterated on *to completion*, or ``client.join(output)`` is called, exit codes become available in host output. Iteration ends *only when remote command has completed*, though it may be interrupted and resumed at any point. +Once *either* standard output is iterated on *to completion*, or ``client.join(output)`` is called, exit codes become available in host output. + +Iteration ends *only when remote command has completed*, though it may be interrupted and resumed at any point. + +``HostOutput.exit_code`` is a dynamic property and will return ``None`` when exit code is not ready, meaning command has not finished, or channel is unavailable due to error. + +Once all output has been gathered exit codes become available even without calling ``join``. .. code-block:: python @@ -144,8 +150,7 @@ Similarly, output and exit codes are available after ``client.join`` is called: output = client.run_command('exit 0') - # Wait for commands to complete and gather exit codes. - # Output is updated in-place. + # Wait for commands to complete client.join(output) pprint(output.values()[0].exit_code) diff --git a/ci/osx-wheel.sh b/ci/osx-wheel.sh new file mode 100755 index 00000000..5fa974b5 --- /dev/null +++ b/ci/osx-wheel.sh @@ -0,0 +1,25 @@ +#!/bin/bash -xe + +pip3 install -U virtualenv +python3 -m virtualenv -p "$(which python3)" venv + +set +x +source venv/bin/activate +set -x + +python -V +pip3 install -U setuptools pip +pip3 install -U delocate wheel +python3 setup.py bdist_wheel +delocate-listdeps dist/*.whl +delocate-wheel -v -w wheels dist/*.whl +delocate-listdeps wheels/*.whl + +ls -l wheels/*.whl +pip3 install -v wheels/*.whl +pwd; mkdir -p temp; cd temp; pwd +python3 -c "from pssh.clients import ParallelSSHClient" +cd ..; pwd +set +x +deactivate +set -x diff --git a/doc/quickstart.rst b/doc/quickstart.rst index 1cdc1273..c30e6262 100644 --- a/doc/quickstart.rst +++ b/doc/quickstart.rst @@ -131,9 +131,9 @@ It is advised that client code uses ``return_list=True`` to avoid breaking on up Exit codes ------------- -Exit codes are available on the host output object. +Exit codes are available on the host output object as a dynamic property. Exit code will be ``None`` if not available, or the exit code as reported by channel. -First, ensure that all commands have finished and exit codes gathered by joining on the output object, then iterate over all host's output to print their exit codes. +First, ensure that all commands have finished by either joining on the output object or gathering all output, then iterate over all host's output to print their exit codes. .. code-block:: python @@ -141,6 +141,16 @@ First, ensure that all commands have finished and exit codes gathered by joining for host, host_output in output.items(): print("Host %s exit code: %s" % (host, host_output.exit_code)) +As of ``1.11.0``, ``client.join`` is not required as long as output has been gathered. + +.. code-block:: python + + for host_out in output.values(): + for line in host_out.stdout: + print(line) + print(host_out.exit_code) + + .. seealso:: :py:class:`pssh.output.HostOutput` diff --git a/pssh/__init__.py b/pssh/__init__.py index da6fbe77..8f5b416c 100644 --- a/pssh/__init__.py +++ b/pssh/__init__.py @@ -21,7 +21,7 @@ asynchronously and with minimal system load on the client host. New users should start with -:py:func:`pssh.pssh_client.ParallelSSHClient.run_command` +:py:func:`pssh.clients.ParallelSSHClient.run_command` See also :py:class:`pssh.ParallelSSHClient` and :py:class:mod:`pssh.SSHClient` for class documentation. diff --git a/pssh/clients/base_pssh.py b/pssh/clients/base_pssh.py index bc7386d9..20fae6b9 100644 --- a/pssh/clients/base_pssh.py +++ b/pssh/clients/base_pssh.py @@ -82,11 +82,12 @@ def run_command(self, command, user=None, stop_on_errors=True, greenlet_timeout = kwargs.pop('greenlet_timeout', None) if host_args: try: - cmds = [self.pool.spawn(self._run_command, host_i, host, - command % host_args[host_i], - user=user, encoding=encoding, - use_pty=use_pty, shell=shell, - *args, **kwargs) + cmds = [self.pool.spawn( + self._run_command, host_i, host, + command % host_args[host_i], + user=user, encoding=encoding, + use_pty=use_pty, shell=shell, + *args, **kwargs) for host_i, host in enumerate(self.hosts)] except IndexError: raise HostArgumentException( @@ -100,24 +101,30 @@ def run_command(self, command, user=None, stop_on_errors=True, for host_i, host in enumerate(self.hosts)] self.cmds = cmds joinall(cmds, raise_error=False, timeout=greenlet_timeout) + return self._get_output_from_cmds(cmds, stop_on_errors=stop_on_errors, + timeout=greenlet_timeout, + return_list=return_list) + + def _get_output_from_cmds(self, cmds, stop_on_errors=False, timeout=None, + return_list=False): if not return_list: warn(_output_depr_notice) output = {} return self._get_output_dict( cmds, output, stop_on_errors=stop_on_errors, - timeout=greenlet_timeout) - return [self._get_output_from_greenlet(cmd, timeout=greenlet_timeout) + timeout=timeout) + return [self._get_output_from_greenlet(cmd, timeout=timeout) for cmd in cmds] def _get_output_from_greenlet(self, cmd, timeout=None): try: - (channel, host, stdout, stderr, stdin) = cmd.get(timeout=timeout) + (channel, host, stdout, stderr, stdin), _client = cmd.get( + timeout=timeout) except Exception as ex: host = ex.host - return HostOutput(host, cmd, None, None, None, None, exception=ex) - return HostOutput( - host, cmd, channel, stdout, stderr, stdin, - exit_code=self._get_exit_code(channel)) + return HostOutput(host, cmd, None, None, None, None, + None, exception=ex) + return HostOutput(host, cmd, channel, stdout, stderr, stdin, _client) def _get_output_dict(self, cmds, output, timeout=None, stop_on_errors=False): @@ -158,14 +165,9 @@ def get_last_output(self, cmds=None, greenlet_timeout=None, cmds = self.cmds if cmds is None else cmds if cmds is None: return - if not return_list: - warn(_output_depr_notice) - output = {} - for cmd in self.cmds: - self.get_output(cmd, output, timeout=greenlet_timeout) - return output - return [self._get_output_from_greenlet(cmd, timeout=greenlet_timeout) - for cmd in cmds] + return self._get_output_from_cmds( + cmds, timeout=greenlet_timeout, return_list=return_list, + stop_on_errors=False) def _get_host_config_values(self, host): _user = self.host_config.get(host, {}).get('user', self.user) @@ -175,8 +177,19 @@ def _get_host_config_values(self, host): _pkey = self.host_config.get(host, {}).get('private_key', self.pkey) return _user, _port, _password, _pkey - def _run_command(self, host_i, host, command, *args, **kwargs): - raise NotImplementedError + def _run_command(self, host_i, host, command, sudo=False, user=None, + shell=None, use_pty=False, + encoding='utf-8', timeout=None): + """Make SSHClient if needed, run command on host""" + try: + _client = self._make_ssh_client(host_i, host) + return _client.run_command( + command, sudo=sudo, user=user, shell=shell, + use_pty=use_pty, encoding=encoding, timeout=timeout), _client + except Exception as ex: + ex.host = host + logger.error("Failed to run on host %s - %s", host, ex) + raise ex def get_output(self, cmd, output, timeout=None): """Get output from command. @@ -193,17 +206,24 @@ def get_output(self, cmd, output, timeout=None): "get_output is for the deprecated dictionary output only. " "To be removed in 2.0.0") try: - (channel, host, stdout, stderr, stdin) = cmd.get(timeout=timeout) + (channel, host, stdout, stderr, stdin), _client = cmd.get( + timeout=timeout) except Exception as ex: host = ex.host self._update_host_output( - output, host, None, None, None, None, None, cmd, exception=ex) + output, host, None, None, None, None, cmd, None, exception=ex) raise - self._update_host_output(output, host, self._get_exit_code(channel), - channel, stdout, stderr, stdin, cmd) + self._update_host_output( + output, host, channel, stdout, stderr, stdin, cmd, _client) + + def _consume_output(self, stdout, stderr): + for line in stdout: + pass + for line in stderr: + pass - def _update_host_output(self, output, host, exit_code, channel, stdout, - stderr, stdin, cmd, exception=None): + def _update_host_output(self, output, host, channel, stdout, + stderr, stdin, cmd, client, exception=None): """Update host output with given data""" if host in output: new_host = "_".join([host, @@ -214,7 +234,7 @@ def _update_host_output(self, output, host, exit_code, channel, stdout, "key for %s to %s", host, host, new_host) host = new_host output[host] = HostOutput(host, cmd, channel, stdout, stderr, stdin, - exit_code=exit_code, exception=exception) + client, exception=exception) def join(self, output, consume_output=False): raise NotImplementedError @@ -233,37 +253,20 @@ def finished(self, output): return True def get_exit_codes(self, output): - """Get exit code for all hosts in output *if available*. - Output parameter is modified in-place. + """This function is now a no-op. Exit code is gathered + on calling .exit_code on a ``HostOutput`` object. - :param output: As returned by - :py:func:`pssh.pssh_client.ParallelSSHClient.get_output` - :rtype: None + to be removed in 2.0.0 """ - if isinstance(output, list): - for host_out in output: - if host_out is None: - continue - host_out.exit_code = self.get_exit_code(host_out) - elif isinstance(output, dict): - for host in output: - host_out = output[host] - if output[host] is None: - continue - host_out.exit_code = self.get_exit_code(host_out) - else: - raise ValueError("Unexpected output object type") + warn("get_exit_codes is deprecated and will be removed in 2.0.0") def get_exit_code(self, host_output): - """Get exit code from host output *if available*. + """This function is now a no-op. Exit code is gathered + on calling .exit_code on a ``HostOutput`` object. - :param host_output: Per host output as returned by - :py:func:`pssh.pssh_client.ParallelSSHClient.get_output` - :rtype: int or None if exit code not ready""" - if not hasattr(host_output, 'channel'): - logger.error("%s does not look like host output..", host_output,) - return - return self._get_exit_code(host_output.channel) + to be removed in 2.0.0 + """ + warn("get_exit_code is deprecated and will be removed in 2.0.0") def copy_file(self, local_file, remote_file, recurse=False, copy_args=None): """Copy local file to remote file in parallel diff --git a/pssh/clients/miko/parallel.py b/pssh/clients/miko/parallel.py index 04c7b27e..9b80d7f3 100644 --- a/pssh/clients/miko/parallel.py +++ b/pssh/clients/miko/parallel.py @@ -241,10 +241,10 @@ def _run_command(self, host_i, host, command, sudo=False, user=None, **paramiko_kwargs): """Make SSHClient, run command on host""" try: - self._make_ssh_client(host_i, host, **paramiko_kwargs) + client = self._make_ssh_client(host_i, host, **paramiko_kwargs) return self._host_clients[(host_i, host)].exec_command( command, sudo=sudo, user=user, shell=shell, - use_shell=use_shell, use_pty=use_pty) + use_shell=use_shell, use_pty=use_pty), client except Exception as ex: ex.host = host logger.error("Failed to run on host %s", host) @@ -263,7 +263,7 @@ def get_output(self, cmd, output, encoding='utf-8'): :type output: dict :rtype: None""" try: - (channel, host, stdout, stderr, stdin) = cmd.get() + (channel, host, stdout, stderr, stdin), client = cmd.get() except Exception as ex: try: host = ex.args[1] @@ -272,7 +272,7 @@ def get_output(self, cmd, output, encoding='utf-8'): "cannot update output data with %s", ex) raise ex self._update_host_output( - output, host, None, None, None, None, None, cmd, exception=ex) + output, host, None, None, None, None, cmd, None, exception=ex) raise stdout = self.host_clients[host].read_output_buffer( stdout, callback=self.get_exit_codes, @@ -282,8 +282,8 @@ def get_output(self, cmd, output, encoding='utf-8'): stderr, prefix='\t[err]', callback=self.get_exit_codes, callback_args=(output,), encoding=encoding) - self._update_host_output(output, host, self._get_exit_code(channel), - channel, stdout, stderr, stdin, cmd) + self._update_host_output(output, host, + channel, stdout, stderr, stdin, cmd, client) def join(self, output, consume_output=False): """Block until all remote commands in output have finished @@ -300,12 +300,17 @@ def join(self, output, consume_output=False): for host in output: output[host].cmd.join() if output[host].channel is not None: + # This blocks greenlet until cmd completion output[host].channel.recv_exit_status() + if not output[host].channel.closed: + output[host].channel.close() if consume_output: - for line in output[host].stdout: - pass - for line in output[host].stderr: - pass + if output[host].stdout: + for line in output[host].stdout: + pass + if output[host].stderr: + for line in output[host].stderr: + pass self.get_exit_codes(output) def finished(self, output): @@ -317,17 +322,10 @@ def finished(self, output): """ for host in output: chan = output[host].channel - if chan is not None and not chan.closed: + if chan is not None and not chan.exit_status_ready(): return False return True - def _get_exit_code(self, channel): - """Get exit code from channel if ready""" - if channel is None or not channel.exit_status_ready(): - return - channel.close() - return channel.recv_exit_status() - def _make_ssh_client(self, host_i, host, **paramiko_kwargs): if (host_i, host) not in self._host_clients \ or self._host_clients[(host_i, host)] is None: @@ -343,3 +341,5 @@ def _make_ssh_client(self, host_i, host, **paramiko_kwargs): **paramiko_kwargs) self.host_clients[host] = _client self._host_clients[(host_i, host)] = _client + return _client + return self._host_clients[(host_i, host)] diff --git a/pssh/clients/miko/single.py b/pssh/clients/miko/single.py index 9e644049..7a49fcce 100644 --- a/pssh/clients/miko/single.py +++ b/pssh/clients/miko/single.py @@ -493,3 +493,10 @@ def _parent_paths_split(self, file_path, sep=None): if file_path.startswith(sep) or not destination: destination = sep + destination return destination + + def get_exit_status(self, channel): + if channel is None or not channel.exit_status_ready(): + return + if not channel.closed: + channel.close() + return channel.recv_exit_status() diff --git a/pssh/clients/native/parallel.py b/pssh/clients/native/parallel.py index 27efd6f8..59bac31a 100644 --- a/pssh/clients/native/parallel.py +++ b/pssh/clients/native/parallel.py @@ -240,20 +240,6 @@ def __del__(self): pass del s_client - def _run_command(self, host_i, host, command, sudo=False, user=None, - shell=None, use_pty=False, - encoding='utf-8', timeout=None): - """Make SSHClient if needed, run command on host""" - try: - self._make_ssh_client(host_i, host) - return self._host_clients[(host_i, host)].run_command( - command, sudo=sudo, user=user, shell=shell, - use_pty=use_pty, encoding=encoding, timeout=timeout) - except Exception as ex: - ex.host = host - logger.error("Failed to run on host %s - %s", host, ex) - raise ex - def join(self, output, consume_output=False, timeout=None, encoding='utf-8'): """Wait until all remote commands in output have finished @@ -261,8 +247,7 @@ def join(self, output, consume_output=False, timeout=None, running in parallel. :param output: Output of commands to join on - :type output: dict as returned by - :py:func:`pssh.pssh_client.ParallelSSHClient.get_output` + :type output: `HostOutput` objects :param consume_output: Whether or not join should consume output buffers. Output buffers will be empty after ``join`` if set to ``True``. Must be set to ``True`` to allow host logger to log @@ -286,29 +271,30 @@ def join(self, output, consume_output=False, timeout=None, for host_i, host_out in enumerate(output): host = self.hosts[host_i] cmds.append(self.pool.spawn( - self._join, host_i, host, host_out, + self._join, host_out, consume_output=consume_output, timeout=timeout)) elif isinstance(output, dict): for host_i, (host, host_out) in enumerate(output.items()): cmds.append(self.pool.spawn( - self._join, host_i, host, host_out, + self._join, host_out, consume_output=consume_output, timeout=timeout)) else: raise ValueError("Unexpected output object type") # Errors raised by self._join should be propagated. # Timeouts are handled by self._join itself. joinall(cmds, raise_error=True) - self.get_exit_codes(output) - def _join(self, host_i, host, host_out, consume_output=False, timeout=None, + def _join(self, host_out, consume_output=False, timeout=None, encoding="utf-8"): - if (host_i, host) not in self._host_clients or \ - self._host_clients[(host_i, host)] is None: + if host_out is None: return - client = self._host_clients[(host_i, host)] channel = host_out.channel + client = host_out.client + host = host_out.host + if client is None: + return stdout, stderr = self.reset_output_generators( - host_out, client=client, channel=channel, timeout=timeout, + host_out, channel=channel, timeout=timeout, encoding=encoding) try: client.wait_finished(channel, timeout=timeout) @@ -349,7 +335,7 @@ def reset_output_generators(self, host_out, timeout=None, :rtype: tuple(stdout, stderr) """ channel = host_out.channel if channel is None else channel - client = self.host_clients[host_out.host] if client is None else client + client = host_out.client if client is None else client stdout = client.read_output_buffer( client.read_output(channel, timeout=timeout), encoding=encoding) stderr = client.read_output_buffer( @@ -365,12 +351,6 @@ def _consume_output(self, stdout, stderr): for line in stderr: pass - def _get_exit_code(self, channel): - """Get exit code from channel if ready""" - if channel is None: - return - return channel.get_exit_status() - def _start_tunnel_thread(self): self._tunnel_lock = RLock() self._tunnel_in_q = deque() @@ -434,6 +414,8 @@ def _make_ssh_client(self, host_i, host): keepalive_seconds=self.keepalive_seconds) self.host_clients[host] = _client self._host_clients[(host_i, host)] = _client + return _client + return self._host_clients[(host_i, host)] def copy_file(self, local_file, remote_file, recurse=False, copy_args=None): """Copy local file to remote file in parallel via SFTP. diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index bb2f4d7a..f8d55d10 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -874,3 +874,8 @@ def _remote_paths_split(self, file_path): if _sep > 0: return file_path[:_sep] return + + def get_exit_status(self, channel): + if not channel.eof(): + return + return channel.get_exit_status() diff --git a/pssh/output.py b/pssh/output.py index 0cf23dcd..da5bc122 100644 --- a/pssh/output.py +++ b/pssh/output.py @@ -20,15 +20,17 @@ from os import linesep +from . import logger + class HostOutput(dict): """Class to hold host output""" __slots__ = ('host', 'cmd', 'channel', 'stdout', 'stderr', 'stdin', - 'exit_code', 'exception') + 'client', 'exception') def __init__(self, host, cmd, channel, stdout, stderr, stdin, - exit_code=None, exception=None): + client, exception=None): """ :param host: Host name output is for :type host: str @@ -50,7 +52,7 @@ def __init__(self, host, cmd, channel, stdout, stderr, stdin, super(HostOutput, self).__init__( (('host', host), ('cmd', cmd), ('channel', channel), ('stdout', stdout), ('stderr', stderr), - ('stdin', stdin), ('exit_code', exit_code), + ('stdin', stdin), ('exception', exception))) self.host = host self.cmd = cmd @@ -58,8 +60,17 @@ def __init__(self, host, cmd, channel, stdout, stderr, stdin, self.stdout = stdout self.stderr = stderr self.stdin = stdin + self.client = client self.exception = exception - self.exit_code = exit_code + + @property + def exit_code(self): + if not self.client: + return + try: + return self.client.get_exit_status(self.channel) + except Exception as ex: + logger.error("Error getting exit status - %s", ex) def __setattr__(self, name, value): object.__setattr__(self, name, value) diff --git a/tests/test_native_parallel_client.py b/tests/test_native_parallel_client.py index 47935d62..bf7fe2f3 100644 --- a/tests/test_native_parallel_client.py +++ b/tests/test_native_parallel_client.py @@ -35,6 +35,7 @@ import time +from pytest import mark from gevent import joinall, spawn, Greenlet from pssh.clients.native import ParallelSSHClient from pssh.exceptions import UnknownHostException, \ @@ -116,9 +117,9 @@ def test_client_join_stdout(self): expected_stdout = [self.resp] expected_stderr = [] self.client.join(output) - exit_code = output[self.host]['exit_code'] stdout = list(output[self.host].stdout) stderr = list(output[self.host].stderr) + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -132,7 +133,7 @@ def test_client_join_stdout(self): (stderr, expected_stderr,)) output = self.client.run_command(";".join([self.cmd, 'exit 1'])) - self.client.join(output) + self.client.join(output, consume_output=True) exit_code = output[self.host].exit_code self.assertTrue(exit_code == 1) self.assertTrue(len(output), len(self.client.cmds)) @@ -158,11 +159,11 @@ def test_get_last_output(self): self.assertEqual(len(client.cmds), len(hosts)) output = client.get_last_output() self.assertTrue(len(output), len(hosts)) - client.join(output) + client.join(output, consume_output=True) for host in hosts: self.assertTrue(host in output) exit_code = output[host].exit_code - self.assertTrue(exit_code == 0) + self.assertEqual(exit_code, 0) output = client.get_last_output(return_list=True) self.assertTrue(len(output), len(hosts)) for i, host_output in enumerate(output): @@ -171,28 +172,24 @@ def test_get_last_output(self): server.stop() def test_pssh_client_no_stdout_non_zero_exit_code_immediate_exit(self): - output = self.client.run_command('exit 1') + output = self.client.run_command('exit 1', return_list=True) expected_exit_code = 1 self.client.join(output) - exit_code = output[self.host]['exit_code'] + exit_code = output[0].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, expected_exit_code,)) - # def test_pssh_client_no_stdout_non_zero_exit_code_immediate_exit_no_join(self): - # output = self.client.run_command('echo me; exit 1', return_list=True) - # expected_exit_code = 1 - # for host_out in output: - # for line in host_out.stdout: - # pass - # # self.client.join(output) - # exit_code = output[0].exit_code - # self.client.join(output) - # self.assertEqual(expected_exit_code, exit_code, - # msg="Got unexpected exit code - %s, expected %s" % - # (exit_code, - # expected_exit_code,)) + def test_pssh_client_no_stdout_non_zero_exit_code_immediate_exit_no_join(self): + output = self.client.run_command('echo me && exit 1', return_list=True) + expected_exit_code = 1 + for host_out in output: + for line in host_out.stdout: + pass + exit_code = output[0].exit_code + self.client.join(output) + self.assertEqual(expected_exit_code, exit_code) def test_pssh_client_run_command_get_output(self): output = self.client.run_command(self.cmd) @@ -201,7 +198,7 @@ def test_pssh_client_run_command_get_output(self): expected_stderr = [] stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -226,7 +223,7 @@ def test_pssh_client_run_command_get_output_explicit(self): expected_stderr = [] stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -326,15 +323,15 @@ def test_pssh_client_long_running_command_exit_codes(self): output = self.client.run_command(self.long_cmd(expected_lines)) self.client.join(output) self.assertTrue(self.host in output, msg="Got no output for command") - self.assertTrue(not output[self.host]['exit_code'], + self.assertTrue(not output[self.host].exit_code, msg="Got exit code %s for still running cmd.." % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) self.assertFalse(self.client.finished(output)) self.client.join(output, consume_output=True) self.assertTrue(self.client.finished(output)) - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Got non-zero exit code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) def test_pssh_client_retries(self): """Test connection error retries""" @@ -883,6 +880,9 @@ def test_connection_error_exception(self): self.assertTrue('exception' in output[host], msg="Got no exception for host %s - expected connection error" % ( host,)) + for host_output in output.values(): + exit_code = host_output.exit_code + self.assertEqual(exit_code, None) try: raise output[host]['exception'] except ConnectionErrorException as ex: @@ -907,7 +907,7 @@ def test_multiple_single_quotes_in_cmd(self): expected = 'me and me' self.assertTrue(len(stdout)==1, msg="Got incorrect number of lines in output - %s" % (stdout,)) - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Error executing cmd with multiple single quotes - %s" % ( stdout,)) self.assertEqual(expected, stdout[0], @@ -917,19 +917,19 @@ def test_multiple_single_quotes_in_cmd(self): def test_backtics_in_cmd(self): """Test running command with backtics in it""" output = self.client.run_command("out=`ls` && echo $out") - self.client.join(output) - self.assertTrue(output[self.host]['exit_code'] == 0, + self.client.join(output, consume_output=True) + self.assertEqual(output[self.host].exit_code, 0, msg="Error executing cmd with backtics - error code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) def test_multiple_shell_commands(self): """Test running multiple shell commands in one go""" output = self.client.run_command("echo me; echo and; echo me") stdout = list(output[self.host]['stdout']) expected = ["me", "and", "me"] - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Error executing multiple shell cmds - error code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) self.assertEqual(expected, stdout, msg="Got unexpected output. Expected %s, got %s" % ( expected, stdout,)) @@ -939,9 +939,9 @@ def test_escaped_quotes(self): output = self.client.run_command('t="--flags=\\"this\\""; echo $t') stdout = list(output[self.host]['stdout']) expected = ['--flags="this"'] - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Error executing multiple shell cmds - error code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) self.assertEqual(expected, stdout, msg="Got unexpected output. Expected %s, got %s" % ( expected, stdout,)) @@ -1001,7 +1001,7 @@ def test_pssh_client_override_allow_agent_authentication(self): expected_stderr = [] stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -1038,7 +1038,7 @@ def test_per_host_tuple_args(self): expected = [host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) host_args = (('arg1', 'arg2'), ('arg3', 'arg4'), ('arg5', 'arg6'),) cmd = 'echo %s %s' output = client.run_command(cmd, host_args=host_args) @@ -1046,7 +1046,7 @@ def test_per_host_tuple_args(self): expected = ["%s %s" % host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) self.assertRaises(HostArgumentException, client.run_command, cmd, host_args=[host_args[0]]) # Invalid number of args @@ -1077,7 +1077,7 @@ def test_per_host_dict_args(self): expected = ["%(host_arg1)s %(host_arg2)s" % host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) self.assertRaises(HostArgumentException, client.run_command, cmd, host_args=[host_args[0]]) # Host list generator should work also @@ -1087,7 +1087,7 @@ def test_per_host_dict_args(self): expected = ["%(host_arg1)s %(host_arg2)s" % host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) client.hosts = (h for h in hosts) self.assertRaises(HostArgumentException, client.run_command, cmd, host_args=[host_args[0]]) @@ -1141,12 +1141,11 @@ def test_output_attributes(self): expected_stdout = [self.resp] expected_stderr = [] self.client.join(output) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) host_output = output[self.host] self.assertEqual(expected_exit_code, host_output.exit_code) - self.assertEqual(expected_exit_code, host_output['exit_code']) self.assertEqual(host_output['cmd'], host_output.cmd) self.assertEqual(host_output['exception'], host_output.exception) self.assertEqual(host_output['stdout'], host_output.stdout) @@ -1222,9 +1221,11 @@ def test_unknown_host_failure(self): def test_open_channel_failure(self): client = ParallelSSHClient([self.host], port=self.port, pkey=self.user_key) - client.join(client.run_command(self.cmd)) + output = client.run_command(self.cmd, return_list=True) + client.join(output) client.host_clients[self.host].session.disconnect() self.assertRaises(SessionError, client.host_clients[self.host].open_session) + self.assertEqual(output[0].exit_code, None) def test_host_no_client(self): output = {'blah': None} @@ -1463,11 +1464,12 @@ def test_return_list(self): self.assertIsInstance(output[0], HostOutput) for host_output in output: self.assertEqual(host_output.host, self.client.hosts[0]) - self.assertEqual(host_output.exit_code, expected_exit_code) + self.assertEqual(host_output.exit_code, None) _stdout = list(host_output.stdout) _stderr = list(host_output.stderr) self.assertListEqual(expected_stdout, _stdout) self.assertListEqual(expected_stderr, _stderr) + self.assertEqual(host_output.exit_code, expected_exit_code) # self.client.cmds should be set for cmd in self.client.cmds: self.assertRaises(ValueError, self.client.get_output, cmd, output) @@ -1493,11 +1495,12 @@ def test_return_list_last_output_multi_host(self): self.assertIsInstance(last_out[0], HostOutput) for host_i, host_output in enumerate(last_out): self.assertEqual(host_output.host, client.hosts[host_i]) - self.assertEqual(host_output.exit_code, expected_exit_code) + self.assertEqual(host_output.exit_code, None) _stdout = list(host_output.stdout) _stderr = list(host_output.stderr) self.assertListEqual(expected_stdout, _stdout) self.assertListEqual(expected_stderr, _stderr) + self.assertEqual(host_output.exit_code, expected_exit_code) def test_client_disconnect(self): client = ParallelSSHClient([self.host], diff --git a/tests/test_output.py b/tests/test_output.py index d33bd905..7d34ba7b 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -22,28 +22,33 @@ import unittest +import logging + +from pssh import logger from pssh.output import HostOutput +logger.setLevel(logging.DEBUG) +logging.basicConfig() + class TestHostOutput(unittest.TestCase): def setUp(self): - self.output = HostOutput(None, None, None, None, None, None) + self.output = HostOutput(None, None, None, None, None, None, True) def test_print(self): self.assertTrue(str(self.output)) def test_update(self): host, cmd, chan, stdout, stderr, \ - stdin, exit_code, exception = 'host', 'cmd', 'chan', 'stdout', \ - 'stderr', 'stdin', 1, Exception() + stdin, exception = 'host', 'cmd', 'chan', 'stdout', \ + 'stderr', 'stdin', Exception() self.output.update({'host': host, 'cmd': cmd, 'channel': chan, 'stdout': stdout, 'stderr': stderr, 'stdin': stdin, - 'exit_code': exit_code, 'exception': exception}) self.assertEqual(host, self.output.host) self.assertEqual(self.output.host, self.output['host']) @@ -57,7 +62,18 @@ def test_update(self): self.assertEqual(self.output.stderr, self.output['stderr']) self.assertEqual(stdin, self.output.stdin) self.assertEqual(self.output.stdin, self.output['stdin']) - self.assertEqual(exit_code, self.output.exit_code) - self.assertEqual(self.output.exit_code, self.output['exit_code']) self.assertEqual(exception, self.output.exception) self.assertEqual(self.output.exception, self.output['exception']) + + def test_bad_exit_status(self): + self.assertEqual(self.output.exit_code, None) + + def test_excepting_client_exit_code(self): + class ExcSSHClient(object): + def get_exit_status(self, channel): + raise Exception + exc_client = ExcSSHClient() + host_out = HostOutput( + 'host', None, None, None, None, None, exc_client, None) + exit_code = host_out.exit_code + self.assertEqual(exit_code, None) diff --git a/tests/test_paramiko_parallel_client.py b/tests/test_paramiko_parallel_client.py index d49e7f9b..55528b21 100644 --- a/tests/test_paramiko_parallel_client.py +++ b/tests/test_paramiko_parallel_client.py @@ -88,7 +88,7 @@ def test_client_join_consume_output(self): output = self.client.run_command(self.fake_cmd) expected_exit_code = 0 self.client.join(output, consume_output=True) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) self.assertTrue(len(stdout) == 0) @@ -101,7 +101,7 @@ def test_client_join_stdout(self): expected_stdout = [self.fake_resp] expected_stderr = [] self.client.join(output) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) self.assertEqual(expected_exit_code, exit_code, @@ -121,7 +121,7 @@ def test_pssh_client_no_stdout_non_zero_exit_code_immediate_exit(self): output = self.client.run_command('exit 1') expected_exit_code = 1 self.client.join(output) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -134,7 +134,7 @@ def test_pssh_client_run_command_get_output(self): expected_stderr = [] stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -159,7 +159,7 @@ def test_pssh_client_run_command_get_output_explicit(self): expected_stderr = [] stdout = list(output[self.host].stdout) stderr = list(output[self.host].stderr) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -203,10 +203,10 @@ def test_pssh_client_hosts_list_part_failure(self): port=self.listen_port, pkey=self.user_key, agent=self.agent) - output = client.run_command(self.fake_cmd, + output = client.run_command('sleep 1; echo me', stop_on_errors=False) self.assertFalse(client.finished(output)) - client.join(output) + client.join(output, consume_output=True) self.assertTrue(client.finished(output)) self.assertTrue(hosts[0] in output, msg="Successful host does not exist in output - output is %s" % (output,)) @@ -267,27 +267,27 @@ def test_pssh_client_run_command_password(self): stdout = list(output[self.host]['stdout']) self.assertTrue(self.host in output, msg="No output for host") - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Expected exit code 0, got %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) self.assertEqual(stdout, [self.fake_resp]) def test_pssh_client_long_running_command_exit_codes(self): expected_lines = 5 output = self.client.run_command(self.long_cmd(expected_lines)) self.assertTrue(self.host in output, msg="Got no output for command") - self.assertTrue(not output[self.host]['exit_code'], + self.assertTrue(not output[self.host].exit_code, msg="Got exit code %s for still running cmd.." % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) self.assertFalse(self.client.finished(output)) # Embedded server is also asynchronous and in the same thread # as our client so need to sleep for duration of server connection + self.client.join(output, consume_output=True) sleep(expected_lines) - self.client.join(output) self.assertTrue(self.client.finished(output)) - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertEqual(output[self.host].exit_code, 0, msg="Got non-zero exit code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) def test_pssh_client_retries(self): """Test connection error retries""" @@ -778,7 +778,7 @@ def test_multiple_single_quotes_in_cmd(self): expected = 'me and me' self.assertTrue(len(stdout)==1, msg="Got incorrect number of lines in output - %s" % (stdout,)) - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Error executing cmd with multiple single quotes - %s" % ( stdout,)) self.assertEqual(expected, stdout[0], @@ -789,18 +789,18 @@ def test_backtics_in_cmd(self): """Test running command with backtics in it""" output = self.client.run_command("out=`ls` && echo $out") self.client.join(output) - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Error executing cmd with backtics - error code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) def test_multiple_shell_commands(self): """Test running multiple shell commands in one go""" output = self.client.run_command("echo me; echo and; echo me") stdout = list(output[self.host]['stdout']) expected = ["me", "and", "me"] - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Error executing multiple shell cmds - error code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) self.assertEqual(expected, stdout, msg="Got unexpected output. Expected %s, got %s" % ( expected, stdout,)) @@ -810,9 +810,9 @@ def test_escaped_quotes(self): output = self.client.run_command('t="--flags=\\"this\\""; echo $t') stdout = list(output[self.host]['stdout']) expected = ['--flags="this"'] - self.assertTrue(output[self.host]['exit_code'] == 0, + self.assertTrue(output[self.host].exit_code == 0, msg="Error executing multiple shell cmds - error code %s" % ( - output[self.host]['exit_code'],)) + output[self.host].exit_code,)) self.assertEqual(expected, stdout, msg="Got unexpected output. Expected %s, got %s" % ( expected, stdout,)) @@ -845,7 +845,7 @@ def test_host_config(self): else: raise AssertionError("Expected AutnenticationException on host %s", hosts[0]) - self.assertFalse(output[hosts[1]]['exit_code'], + self.assertFalse(output[hosts[1]].exit_code, msg="Execution failed on host %s" % (hosts[1],)) self.assertTrue(client.host_clients[hosts[0]].user == user, msg="Host config user override failed") @@ -864,7 +864,7 @@ def test_pssh_client_override_allow_agent_authentication(self): expected_stderr = [] stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code self.assertEqual(expected_exit_code, exit_code, msg="Got unexpected exit code - %s, expected %s" % (exit_code, @@ -897,7 +897,7 @@ def test_per_host_tuple_args(self): expected = [host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) host_args = (('arg1', 'arg2'), ('arg3', 'arg4'), ('arg5', 'arg6'),) cmd = 'echo %s %s' output = client.run_command(cmd, host_args=host_args) @@ -905,7 +905,7 @@ def test_per_host_tuple_args(self): expected = ["%s %s" % host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) self.assertRaises(HostArgumentException, client.run_command, cmd, host_args=[host_args[0]]) # Invalid number of args @@ -933,7 +933,7 @@ def test_per_host_dict_args(self): expected = ["%(host_arg1)s %(host_arg2)s" % host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) self.assertRaises(HostArgumentException, client.run_command, cmd, host_args=[host_args[0]]) # Host list generator should work also @@ -943,7 +943,7 @@ def test_per_host_dict_args(self): expected = ["%(host_arg1)s %(host_arg2)s" % host_args[i]] stdout = list(output[host]['stdout']) self.assertEqual(expected, stdout) - self.assertTrue(output[host]['exit_code'] == 0) + self.assertTrue(output[host].exit_code == 0) client.hosts = (h for h in hosts) self.assertRaises(HostArgumentException, client.run_command, cmd, host_args=[host_args[0]]) @@ -985,7 +985,7 @@ def test_pty(self): output = self.client.run_command(cmd, use_pty=False) self.client.join(output) stdout = list(output[self.host]['stdout']) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code expected = [] self.assertEqual(expected, stdout) self.assertTrue(exit_code == 0) @@ -1003,12 +1003,12 @@ def test_output_attributes(self): expected_stdout = [self.fake_resp] expected_stderr = [] self.client.join(output) - exit_code = output[self.host]['exit_code'] + exit_code = output[self.host].exit_code stdout = list(output[self.host]['stdout']) stderr = list(output[self.host]['stderr']) host_output = output[self.host] self.assertEqual(expected_exit_code, host_output.exit_code) - self.assertEqual(expected_exit_code, host_output['exit_code']) + self.assertEqual(expected_exit_code, host_output.exit_code) self.assertEqual(host_output['cmd'], host_output.cmd) self.assertEqual(host_output['exception'], host_output.exception) self.assertEqual(host_output['stdout'], host_output.stdout)