diff --git a/Changelog.rst b/Changelog.rst index 7f994c31..35faa007 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -1,6 +1,24 @@ Change Log ============ +2.5.0 ++++++ + +Changes +------- + +* Python 2 no longer supported. +* Updated class arguments, refactor for ``pssh.clients.native.tunnel``. + +Fixes +----- + +* Closed clients with proxy host enabled would not shutdown their proxy servers. +* Clients with proxy host enabled would not disconnect the proxy client on ``.disconnect`` being called. +* Default identity files would not be used when private key was not specified - #222. +* ``ParallelSSHClient(<..>, identity_auth=False`` would not be honoured. + + 2.4.0 +++++ diff --git a/README.rst b/README.rst index b43acff1..80342b92 100644 --- a/README.rst +++ b/README.rst @@ -6,7 +6,7 @@ Asynchronous parallel SSH client library. Run SSH commands over many - hundreds/hundreds of thousands - number of servers asynchronously and with minimal system load on the client host. -Native code based client with extremely high performance - based on ``libssh2`` C library. +Native code based clients with extremely high performance, making use of C libraries. .. image:: https://img.shields.io/badge/License-LGPL%20v2.1-blue.svg :target: https://pypi.python.org/pypi/parallel-ssh @@ -21,10 +21,10 @@ Native code based client with extremely high performance - based on ``libssh2`` .. image:: https://img.shields.io/pypi/wheel/parallel-ssh.svg :target: https://pypi.python.org/pypi/parallel-ssh .. image:: https://readthedocs.org/projects/parallel-ssh/badge/?version=latest - :target: http://parallel-ssh.readthedocs.org/en/latest/ + :target: https://parallel-ssh.readthedocs.org/en/latest/ :alt: Latest documentation -.. _`read the docs`: http://parallel-ssh.readthedocs.org/en/latest/ +.. _`read the docs`: https://parallel-ssh.readthedocs.org/en/latest/ ************ Installation @@ -34,6 +34,15 @@ Installation pip install parallel-ssh + +An update to `pip` may be needed to be able to install binary wheels. + +.. code-block:: shell + + pip install -U pip + pip install parallel-ssh + + ************* Usage Example ************* @@ -53,6 +62,7 @@ Run ``uname`` on two hosts in parallel. for host_output in output: for line in host_output.stdout: print(line) + exit_code = host_out.exit_code :Output: @@ -65,7 +75,7 @@ Run ``uname`` on two hosts in parallel. Single Host Client ******************* -Single host client with similar API for users that do not need parallel functionality. +Single host client with similar API can be used if parallel functionality is not needed. .. code-block:: python @@ -78,11 +88,19 @@ Single host client with similar API for users that do not need parallel function host_out = client.run_command(cmd) for line in host_out.stdout: print(line) + exit_code = host_out.exit_code .. contents:: +************************ +Questions And Discussion +************************ + +`Github discussions `_ can be used to discuss, ask questions and share ideas regarding the use of parallel-ssh. + + ************** Native clients ************** @@ -91,36 +109,59 @@ The default client in ``parallel-ssh`` is a native client based on ``ssh2-python See `this post `_ for a performance comparison of different Python SSH libraries. -Alternative clients based on ``ssh-python`` (``libssh``) are also available under ``pssh.clients.ssh``. See `client documentation `_ for a feature comparison of the available clients in the library. 
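For illustration, a minimal sketch of switching to the ``libssh`` based clients mentioned above - assuming ``pssh.clients.ssh`` exposes a ``ParallelSSHClient`` with the same ``run_command``/``join`` API as the default native client:

.. code-block:: python

   # Sketch only - same API, ssh-python (libssh) backed client.
   from pssh.clients.ssh import ParallelSSHClient

   hosts = ['myhost1', 'myhost2']
   client = ParallelSSHClient(hosts)

   output = client.run_command('uname')
   client.join()

   for host_output in output:
       for line in host_output.stdout:
           print(line)
       print(host_output.exit_code)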
+Alternative clients based on ``ssh-python`` (``libssh``) are also available under ``pssh.clients.ssh``. See `client documentation `_ for a feature comparison of the available clients in the library. ``parallel-ssh`` makes use of clients and an event loop solely based on C libraries providing native code levels of performance and stability with an easy to use Python API. -**************************** Native Code Client Features **************************** * Highest performance and least overhead of any Python SSH library * Thread safe - makes use of native threads for CPU bound calls like authentication -* Natively asynchronous utilising ``libssh2`` via ``ssh2-python`` +* Natively asynchronous utilising C libraries implementing the SSH protocol * Significantly reduced overhead in CPU and memory usage -*********** -Exit codes -*********** +**************** +Why This Library +**************** + +Because other options are either immature, unstable, lacking in performance or all of the aforementioned. + +Certain other self-proclaimed *leading* Python SSH libraries leave a lot to be desired from a performance and stability point of view, as well as suffering from a lack of maintenance with hundreds of open issues, unresolved pull requests and inherent design flaws. + +The SSH libraries ``parallel-ssh`` uses are, on the other hand, long standing mature C libraries in `libssh2 `_ and `libssh `_ that have been in production use for decades and are part of some of the most widely distributed software available today - `Git` itself, `OpenSSH`, `Curl` and many others. + +These low level libraries are far better placed to provide the maturity, stability and performance needed from an SSH client for production use. + +``parallel-ssh`` provides easy to use SSH clients that hide the complexity, while offering stability and native code levels of performance and as well as the ability to scale to hundreds or more concurrent hosts. + +See `alternatives `_ for a more complete comparison of alternative SSH libraries, as well as `performance comparisons `_ mentioned previously. + + +************************************* +Waiting for Completion and Exit Codes +************************************* + +The client's ``join`` function can be used to wait for all commands in output to finish. + +After ``join`` returns, commands have finished and all output can be read without blocking. Once *either* standard output is iterated on *to completion*, or ``client.join()`` is called, exit codes become available in host output. -Iteration ends *only when remote command has completed*, though it may be interrupted and resumed at any point. +Iteration ends *only when remote command has completed*, though it may be interrupted and resumed at any point - see `join and output timeouts `_ documentation. -``HostOutput.exit_code`` is a dynamic property and will return ``None`` when exit code is not ready, meaning command has not finished, or channel is unavailable due to error. +``HostOutput.exit_code`` is a dynamic property and will return ``None`` when exit code is not ready, meaning command has not finished, or unavailable due to error. -Once all output has been gathered exit codes become available even without calling ``join``. +Once all output has been gathered exit codes become available even without calling ``join`` as per previous examples. .. 
code-block:: python - output = client.run_command('uname', return_list=True) + output = client.run_command('uname') + + client.join() + for host_out in output: for line in host_out.stdout: print(line) @@ -134,22 +175,6 @@ Once all output has been gathered exit codes become available even without calli Linux 0 -********************** -Waiting for Completion -********************** - -The client's ``join`` function can be used to wait for all commands in output object to finish. - -After ``join`` returns, commands have finished and all output can be read without blocking. - -.. code-block:: python - - client.join() - - for host_out in output: - for line in host_output.stdout: - print(line) - print(host_out.exit_code) Similarly, exit codes are available after ``client.join()`` without reading output. @@ -171,7 +196,7 @@ Similarly, exit codes are available after ``client.join()`` without reading outp *************************** -Build in Host Output Logger +Built in Host Output Logger *************************** There is also a built in host logger that can be enabled to log output from remote hosts for both stdout and stderr. The helper function ``pssh.utils.enable_host_logger`` will enable host logging to stdout. @@ -183,8 +208,8 @@ To log output without having to iterate over output generators, the ``consume_ou from pssh.utils import enable_host_logger enable_host_logger() - output = client.run_command('uname') - client.join(output, consume_output=True) + client.run_command('uname') + client.join(consume_output=True) :Output: .. code-block:: shell @@ -192,6 +217,7 @@ To log output without having to iterate over output generators, the ``consume_ou [localhost] Linux +**** SCP **** @@ -213,9 +239,10 @@ To copy a local file to remote hosts in parallel with SCP: cmds = client.scp_send('../test', 'test_dir/test') joinall(cmds, raise_error=True) -See `SFTP and SCP documentation `_ for more examples. +See `SFTP and SCP documentation `_ for more examples. +***** SFTP ***** @@ -235,19 +262,20 @@ To copy a local file to remote hosts in parallel: cmds = client.copy_file('../test', 'test_dir/test') joinall(cmds, raise_error=True) + :Output: .. code-block:: python Copied local file ../test to remote destination myhost1:test_dir/test Copied local file ../test to remote destination myhost2:test_dir/test -There is similar capability to copy remote files to local ones suffixed with the host's name with the ``copy_remote_file`` function. +There is similar capability to copy remote files to local ones with configurable file names via the `copy_remote_file `_ function. -In addition, per-host configurable file name functionality is provided for both SFTP and SCP - see `documentation `_. +In addition, per-host configurable file name functionality is provided for both SFTP and SCP - see `documentation `_. Directory recursion is supported in both cases via the ``recurse`` parameter - defaults to off. -See `SFTP and SCP documentation `_ for more examples. +See `SFTP and SCP documentation `_ for more examples. ***************** @@ -259,108 +287,5 @@ Design And Goals To meet these goals, API driven solutions are preferred first and foremost. This frees up developers to drive the library via any method desired, be that environment variables, CI driven tasks, command line tools, existing OpenSSH or new configuration files, from within an application et al. -Comparison With Alternatives -***************************** - -There are not many alternatives for SSH libraries in Python. 
Of the few that do exist, here is how they compare with ``parallel-ssh``. - -As always, it is best to use a tool that is suited to the task at hand. ``parallel-ssh`` is a library for programmatic and non-interactive use - see `Design And Goals`_. If requirements do not match what it provides then it best not be used. Same applies for the tools described below. - -Paramiko -________ - -The default SSH client library in ``parallel-ssh`` <=``1.6.x`` series. - -Pure Python code, while having native extensions as dependencies, with poor performance and numerous bugs compared to both OpenSSH binaries and the ``libssh2`` based native clients in ``parallel-ssh`` ``1.2.x`` and above. Recent versions have regressed in performance and have `blocker issues `_. - -It does not support non-blocking mode, so to make it non-blocking monkey patching must be used which affects all other uses of the Python standard library. - -asyncssh -________ - -Pure Python ``asyncio`` framework using client library. License (`EPL`) is not compatible with GPL, BSD or other open source licenses and `combined works cannot be distributed `_. - -Therefore unsuitable for use in many projects, including ``parallel-ssh``. - -Fabric -______ - -Port of Capistrano from Ruby to Python. Intended for command line use and is heavily systems administration oriented rather than non-interactive library. Same maintainer as Paramiko. - -Uses Paramiko and suffers from the same limitations. More over, uses threads for parallelisation, while `not being thread safe `_, and exhibits very poor performance and extremely high CPU usage even for limited number of hosts - 1 to 10 - with scaling limited to one core. - -Library API is non-standard, poorly documented and with numerous issues as API use is not intended. - -Ansible -_______ - -A configuration management and automation tool that makes use of SSH remote commands. Uses, in parts, both Paramiko and OpenSSH binaries. - -Similarly to Fabric, uses threads for parallelisation and suffers from the poor scaling that this model offers. - -See `The State of Python SSH Libraries `_ for what to expect from scaling SSH with threads, as compared `to non-blocking I/O `_ with ``parallel-ssh``. - -Again similar to Fabric, its intended and documented use is interactive via command line rather than library API based. It may, however, be an option if Ansible is already being used for automation purposes with existing playbooks, the number of hosts is small, and when the use case is interactive via command line. - -``parallel-ssh`` is, on the other hand, a suitable option for Ansible as an SSH client that would improve its parallel SSH performance significantly. - -ssh2-python -___________ - -Bindings for ``libssh2`` C library. Used by ``parallel-ssh`` as of ``1.2.0`` and is by same author. - -Does not do parallelisation out of the box but can be made parallel via Python's ``threading`` library relatively easily and as it is a wrapper to a native library that releases Python's GIL, can scale to multiple cores. - -``parallel-ssh`` uses ``ssh2-python`` in its native non-blocking mode with event loop and co-operative sockets provided by ``gevent`` for an extremely high performance library without the side-effects of monkey patching - see `benchmarks `_. - -In addition, ``parallel-ssh`` uses native threads to offload CPU blocked tasks like authentication in order to scale to multiple cores while still remaining non-blocking for network I/O. 
- -``pssh.clients.native.SSHClient`` is a single host natively non-blocking client for users that do not need parallel capabilities but still want a non-blocking client with native code performance. - -Out of all the available Python SSH libraries, ``libssh2`` and ``ssh2-python`` have been shown, see benchmarks above, to perform the best with the least resource utilisation and ironically for a native code extension the least amount of dependencies. Only ``libssh2`` C library and its dependencies which are included in binary wheels. - -However, it lacks support for some SSH features present elsewhere like GSS-API and certificate authentication. - -ssh-python ----------- - -Bindings for ``libssh`` C library. A client option in ``parallel-ssh``, same author. Similar performance to ssh2-python above. - -For non-blocking use, only certain functions are supported. SCP/SFTP in particular cannot be used in non-blocking mode, nor can tunnels. - -Supports more authentication options compared to ``ssh2-python`` like GSS-API (Kerberos) and certificate authentication. - - -******** -Scaling -******** - -Some guide lines on scaling ``parallel-ssh`` and pool size numbers. - -In general, long lived commands with little or no output *gathering* will scale better. Pool sizes in the multiple thousands have been used successfully with little CPU overhead in the single thread running them in these use cases. - -Conversely, many short lived commands with output gathering will not scale as well. In this use case, smaller pool sizes in the hundreds are likely to perform better with regards to CPU overhead in the event loop. - -Multiple Python native threads, each of which can get its own event loop, may be used to scale this use case further as number of CPU cores allows. Note that ``parallel-ssh`` imports *must* be done within the target function of the newly started thread for it to receive its own event loop. ``gevent.get_hub()`` may be used to confirm that the worker thread event loop differs from the main thread. - -Gathering is highlighted here as output generation does not affect scaling. Only when output is gathered either over multiple still running commands, or while more commands are being triggered, is overhead increased. - -Technical Details -****************** - -To understand why this is, consider that in co-operative multi tasking, which is being used in this project via the ``gevent`` library, a co-routine (greenlet) needs to ``yield`` the event loop to allow others to execute - *co-operation*. When one co-routine is constantly grabbing the event loop in order to gather output, or when co-routines are constantly trying to start new short-lived commands, it causes contention with other co-routines that also want to use the event loop. - -This manifests itself as increased CPU usage in the process running the event loop and reduced performance with regards to scaling improvements from increasing pool size. - -On the other end of the spectrum, long lived remote commands that generate *no* output only need the event loop at the start, when they are establishing connections, and at the end, when they are finished and need to gather exit codes, which results in practically zero CPU overhead at any time other than start or end of command execution. - -Output *generation* is done remotely and has no effect on the event loop until output is gathered - output buffers are iterated on. Only at that point does the event loop need to be held. 
- -************* -User's group -************* - -There is a public `ParallelSSH Google group `_ setup for this purpose - both posting and viewing are open to the public. - .. image:: https://ga-beacon.appspot.com/UA-9132694-7/parallel-ssh/README.rst?pixel :target: https://github.com/igrigorik/ga-beacon diff --git a/doc/advanced.rst b/doc/advanced.rst index a6586b23..242d0e7f 100644 --- a/doc/advanced.rst +++ b/doc/advanced.rst @@ -513,7 +513,7 @@ Stderr is empty: .. code-block:: python - for line in output[client.hosts[0]].stderr: + for line in output[0].stderr: print(line) No output from ``stderr``. @@ -523,9 +523,9 @@ No output from ``stderr``. SFTP and SCP ************* -SFTP and SCP are both supported by ``parallel-ssh`` and functions are provided by the client for copying files with SFTP to and from remote servers - default native client only. +SFTP and SCP are both supported by ``parallel-ssh`` and functions are provided by the client for copying files to and from remote servers - default native clients only. -Neither SFTP nor SCP have a shell interface and no output is provided for any SFTP/SCP commands. +Neither SFTP nor SCP have a shell interface and no output is sent for any SFTP/SCP commands. As such, SFTP functions in ``ParallelSSHClient`` return greenlets that will need to be joined to raise any exceptions from them. :py:func:`gevent.joinall` may be used for that. @@ -542,15 +542,15 @@ To copy the local file with relative path ``../test`` to the remote relative pat client = ParallelSSHClient(hosts) - greenlets = client.copy_file('../test', 'test_dir/test') - joinall(greenlets, raise_error=True) + cmds = client.copy_file('../test', 'test_dir/test') + joinall(cmds, raise_error=True) To recursively copy directory structures, enable the ``recurse`` flag: .. code-block:: python - greenlets = client.copy_file('my_dir', 'my_dir', recurse=True) - joinall(greenlets, raise_error=True) + cmds = client.copy_file('my_dir', 'my_dir', recurse=True) + joinall(cmds, raise_error=True) .. seealso:: @@ -570,8 +570,8 @@ Copying remote files in parallel requires that file names are de-duplicated othe client = ParallelSSHClient(hosts) - greenlets = client.copy_remote_file('remote.file', 'local.file') - joinall(greenlets, raise_error=True) + cmds = client.copy_remote_file('remote.file', 'local.file') + joinall(cmds, raise_error=True) The above will create files ``local.file_host1`` where ``host1`` is the host name the file was copied from. @@ -855,7 +855,7 @@ Clients for hosts that are no longer on the host list are removed on host list a <..> -When wanting to reassign host list frequently, it is best to sort or otherwise ensure order is maintained to avoid reconnections on hosts that are still in the host list but in a different order. +When reassigning host list frequently, it is best to sort or otherwise ensure order is maintained to avoid reconnections on hosts that are still in the host list but in a different position. For example, the following will cause reconnections on both hosts, though both are still in the list. diff --git a/doc/alternatives.rst b/doc/alternatives.rst new file mode 100644 index 00000000..9b5d6ab6 --- /dev/null +++ b/doc/alternatives.rst @@ -0,0 +1,74 @@ +Comparison With Alternatives +***************************** + +There are not many alternatives for SSH libraries in Python. Of the few that do exist, here is how they compare with ``parallel-ssh``. + +As always, it is best to use a tool that is suited to the task at hand. 
``parallel-ssh`` is a library for programmatic and non-interactive use. If requirements do not match what it provides then it best not be used. Same applies for the tools described below. + +Paramiko +________ + +The default SSH client library in ``parallel-ssh<=1.6.x`` series. + +Pure Python code, while having native extensions as dependencies, with poor performance and numerous bugs compared to both OpenSSH binaries and the ``libssh2`` based native clients in ``parallel-ssh`` ``1.2.x`` and above. Recent versions have regressed in performance and have `blocker issues `_. + +It does not support non-blocking mode, so to make it non-blocking monkey patching must be used which affects all other uses of the Python standard library. + +Based on its use in historical ``parallel-ssh`` releases as well as `performance testing `_, paramiko is very far from being mature enough to be used. + +This is why ``parallel-ssh`` has moved away from paramiko entirely since ``2.0.0`` where it was dropped as a dependency. + +asyncssh +________ + +Pure Python ``asyncio`` framework using client library. License (`EPL`) is not compatible with GPL, BSD or other open source licenses and `combined works cannot be distributed `_. + +Therefore unsuitable for use in many projects, including ``parallel-ssh``. + +Fabric +______ + +Port of Capistrano from Ruby to Python. Intended for command line use and is heavily systems administration oriented rather than non-interactive library. Same maintainer as Paramiko. + +Uses Paramiko and suffers from the same limitations. More over, uses threads for parallelisation, while `not being thread safe `_, and exhibits very poor performance and extremely high CPU usage even for limited number of hosts - 1 to 10 - with scaling limited to one core. + +Library API is non-standard, poorly documented and with numerous issues as API use is not intended. + +Ansible +_______ + +A configuration management and automation tool that makes use of SSH remote commands. Uses, in parts, both Paramiko and OpenSSH binaries. + +Similarly to Fabric, uses threads for parallelisation and suffers from the poor scaling that this model offers. + +See `The State of Python SSH Libraries `_ for what to expect from scaling SSH with threads, as compared `to non-blocking I/O `_ with ``parallel-ssh``. + +Again similar to Fabric, its intended and documented use is interactive via command line rather than library API based. It may, however, be an option if Ansible is already being used for automation purposes with existing playbooks, the number of hosts is small, and when the use case is interactive via command line. + +``parallel-ssh`` is, on the other hand, a suitable option for Ansible as an SSH client that would improve its parallel SSH performance significantly. + +ssh2-python +___________ + +Bindings for ``libssh2`` C library. Used by ``parallel-ssh`` as of ``1.2.0`` and is by same author. + +Does not do parallelisation out of the box but can be made parallel via Python's ``threading`` library relatively easily and as it is a wrapper to a native library that releases Python's GIL, can scale to multiple cores. + +``parallel-ssh`` uses ``ssh2-python`` in its native non-blocking mode with event loop and co-operative sockets provided by ``gevent`` for an extremely high performance library without the side-effects of monkey patching - see `benchmarks `_. 
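As a rough illustration of the threading point above - a hedged sketch using ``ssh2-python``'s blocking mode directly, with host names and the username purely illustrative:

.. code-block:: python

   # Sketch only - parallelising raw ssh2-python with native threads.
   # Assumes a running SSH agent holding a key the hosts accept.
   import socket
   from threading import Thread

   from ssh2.session import Session

   def run(host, cmd):
       sock = socket.create_connection((host, 22))
       session = Session()
       session.handshake(sock)
       session.agent_auth('user')
       channel = session.open_session()
       channel.execute(cmd)
       channel.wait_eof()
       channel.close()

   threads = [Thread(target=run, args=(host, 'uname')) for host in ('host1', 'host2')]
   for t in threads:
       t.start()
   for t in threads:
       t.join()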
+ +In addition, ``parallel-ssh`` uses native threads to offload CPU bound tasks like authentication in order to scale to multiple cores while still remaining non-blocking for network I/O. + +``pssh.clients.native.SSHClient`` is a single host natively non-blocking client for users that do not need parallel capabilities but still want a fully featured client with native code performance. + +Out of all the available Python SSH libraries, ``libssh2`` and ``ssh2-python`` have been shown, see benchmarks above, to perform the best with the least resource utilisation and ironically for a native code extension the least amount of dependencies. Only ``libssh2`` C library and its dependencies which are included in binary wheels. + +However, it lacks support for some SSH features present elsewhere like GSS-API and certificate authentication. + +ssh-python +__________ + +Bindings for ``libssh`` C library. A client option in ``parallel-ssh``, same author. Similar performance to ssh2-python above. + +For non-blocking use, only certain functions are supported. SCP/SFTP in particular cannot be used in non-blocking mode, nor can tunnels. + +Supports more authentication options compared to ``ssh2-python`` like GSS-API (Kerberos) and certificate authentication. diff --git a/doc/index.rst b/doc/index.rst index 1d26601b..cd3bded9 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -76,6 +76,8 @@ Single host client is also available with similar API. advanced api clients + scaling + alternatives Changelog api_upgrade_2_0 diff --git a/doc/installation.rst b/doc/installation.rst index 93ba08aa..ae384ac6 100644 --- a/doc/installation.rst +++ b/doc/installation.rst @@ -56,3 +56,14 @@ Or for developing changes: pip install -r requirements_dev.txt + +Python 2 +-------- + +As of January 2021, Python 2 is no longer supported by the Python Software Foundation nor ``parallel-ssh`` - see `Sunset Python 2 `_. + +Versions of ``parallel-ssh<=2.4.0`` will still work. + +Future releases are not guaranteed to be compatible or work at all with Python 2. + +If your company requires Python 2 support contact the author directly at the email address on Github commits to discuss rates. diff --git a/doc/quickstart.rst b/doc/quickstart.rst index 79349338..2d1e4613 100644 --- a/doc/quickstart.rst +++ b/doc/quickstart.rst @@ -68,7 +68,7 @@ Output:: Step by Step -------------- +============ Make a list or other iterable of the hosts to run on: @@ -119,7 +119,7 @@ Standard output, aka ``stdout``, for a given :py:class:`HostOutput `_. +The ``read_timeout`` keyword argument to ``run_command`` may be used to cause reading to timeout if no output is received after the given number of seconds - see `join and output timeouts `_. ``stdout`` is a generator. To retrieve all of stdout can wrap it with list, per below. @@ -176,8 +176,8 @@ First, ensure that all commands have finished by either joining on the output ob .. code-block:: python client.join(output) - for host, host_output in output: - print("Host %s exit code: %s" % (host, host_output.exit_code)) + for host_output in output: + print("Host %s exit code: %s" % (host_output.host, host_output.exit_code)) As of ``1.11.0``, ``client.join`` is not required as long as output has been gathered. @@ -235,8 +235,6 @@ To use files under a user's ``.ssh`` directory: .. 
code-block:: python - import os - client = ParallelSSHClient(hosts, pkey='~/.ssh/my_pkey') @@ -271,8 +269,8 @@ The helper function :py:func:`pssh.utils.enable_host_logger` will enable host lo from pssh.utils import enable_host_logger enable_host_logger() - output = client.run_command('uname') - client.join(output, consume_output=True) + client.run_command('uname') + client.join(consume_output=True) :Output: .. code-block:: python @@ -288,10 +286,10 @@ The ``stdin`` attribute on :py:class:`HostOutput ` is a .. code-block:: python - output = client.run_command('read') + output = client.run_command('read line; echo $line') host_output = output[0] stdin = host_output.stdin - stdin.write("writing to stdin\\n") + stdin.write("writing to stdin\n") stdin.flush() for line in host_output.stdout: print(line) @@ -325,8 +323,8 @@ With this flag, the ``exception`` output attribute will contain the exception on :Output: .. code-block:: python - host1: 0, None - host2: None, AuthenticationError <..> + Host host1: exit code 0, exception None + Host host2: exit code None, exception AuthenticationError <..> .. seealso:: diff --git a/doc/scaling.rst b/doc/scaling.rst new file mode 100644 index 00000000..dbca4961 --- /dev/null +++ b/doc/scaling.rst @@ -0,0 +1,24 @@ +******** +Scaling +******** + +Some guide lines on scaling ``parallel-ssh`` and pool size numbers. + +In general, long lived commands with little or no output *gathering* will scale better. Pool sizes in the multiple thousands have been used successfully with little CPU overhead in the single thread running them in these use cases. + +Conversely, many short lived commands with output gathering will not scale as well. In this use case, smaller pool sizes in the hundreds are likely to perform better with regards to CPU overhead in the event loop. + +Multiple Python native threads, each of which can get its own event loop, may be used to scale this use case further as number of CPU cores allows. Note that ``parallel-ssh`` imports *must* be done within the target function of the newly started thread for it to receive its own event loop. ``gevent.get_hub()`` may be used to confirm that the worker thread event loop differs from the main thread. + +Gathering is highlighted here as output generation does not affect scaling. Only when output is gathered either over multiple still running commands, or while more commands are being triggered, is overhead increased. + +Technical Details +****************** + +To understand why this is, consider that in co-operative multi tasking, which is being used in this project via the ``gevent`` library, a co-routine (greenlet) needs to ``yield`` the event loop to allow others to execute - *co-operation*. When one co-routine is constantly grabbing the event loop in order to gather output, or when co-routines are constantly trying to start new short-lived commands, it causes contention with other co-routines that also want to use the event loop. + +This manifests itself as increased CPU usage in the process running the event loop and reduced performance with regards to scaling improvements from increasing pool size. + +On the other end of the spectrum, long lived remote commands that generate *no* output only need the event loop at the start, when they are establishing connections, and at the end, when they are finished and need to gather exit codes, which results in practically zero CPU overhead at any time other than start or end of command execution. 
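As a hedged sketch of the multiple native threads note in the Scaling section above - imports are done inside the thread target so each thread receives its own event loop; hosts and commands are illustrative only:

.. code-block:: python

   # Sketch only - one event loop per native thread.
   from threading import Thread

   def worker(hosts):
       # Import inside the thread target so this thread gets its own gevent hub.
       import gevent
       from pssh.clients import ParallelSSHClient
       # gevent.get_hub() here differs from the main thread's hub.
       print(gevent.get_hub())
       client = ParallelSSHClient(hosts)
       client.run_command('sleep 60')
       client.join()

   threads = [Thread(target=worker, args=(chunk,))
              for chunk in (['host1', 'host2'], ['host3', 'host4'])]
   for t in threads:
       t.start()
   for t in threads:
       t.join()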
+ +Output *generation* is done remotely and has no effect on the event loop until output is gathered - output buffers are iterated on. Only at that point does the event loop need to be held. diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 62db0081..77905f12 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -88,12 +88,7 @@ def hosts(self, _hosts): def _check_host_config(self): if self.host_config is None: return - host_len = 0 - try: - host_len = len(self.hosts) - except TypeError: - # Generator - return + host_len = len(self.hosts) if host_len != len(self.host_config): raise ValueError( "Host config entries must match number of hosts if provided. " @@ -169,8 +164,10 @@ def join_shells(self, shells, timeout=None): finished_shells = [g.get() for g in finished] unfinished_shells = list(set(shells).difference(set(finished_shells))) if len(unfinished_shells) > 0: - raise Timeout("Timeout of %s sec(s) reached with commands " - "still running", timeout, finished_shells, unfinished_shells) + raise Timeout( + "Timeout of %s sec(s) reached with commands still running", + timeout, finished_shells, unfinished_shells, + ) def run_command(self, command, user=None, stop_on_errors=True, host_args=None, use_pty=False, shell=None, @@ -354,8 +351,10 @@ def join(self, output=None, consume_output=False, timeout=None, if unfinished_cmds: finished_output = self.get_last_output(cmds=finished_cmds) unfinished_output = list(set.difference(set(output), set(finished_output))) - raise Timeout("Timeout of %s sec(s) reached with commands " - "still running", timeout, finished_output, unfinished_output) + raise Timeout( + "Timeout of %s sec(s) reached with commands still running", + timeout, finished_output, unfinished_output, + ) def _join(self, host_out, consume_output=False, timeout=None, encoding="utf-8"): diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 46d83f5c..9e3bcdca 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -323,7 +323,10 @@ def auth(self): def _password_auth(self): raise NotImplementedError - def _pkey_auth(self, password=None): + def _pkey_auth(self, pkey_file, password=None): + raise NotImplementedError + + def _open_session(self): raise NotImplementedError def open_session(self): @@ -500,9 +503,7 @@ def copy_file(self, local_file, remote_file, recurse=False, raise NotImplementedError def _sftp_put(self, remote_fh, local_file): - with open(local_file, 'rb') as local_fh: - for data in local_fh: - self._eagain(remote_fh.write, data) + raise NotImplementedError def sftp_put(self, sftp, local_file, remote_file): raise NotImplementedError diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 438b9673..ef287241 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -35,8 +35,8 @@ from .tunnel import FORWARDER from ..base.single import BaseSSHClient from ...output import HostOutput -from ...exceptions import AuthenticationException, SessionError, SFTPError, \ - SFTPIOError, Timeout, SCPError, ProxyError +from ...exceptions import SessionError, SFTPError, \ + SFTPIOError, Timeout, SCPError, ProxyError, AuthenticationError from ...constants import DEFAULT_RETRIES, RETRY_DELAY @@ -87,11 +87,9 @@ def __init__(self, host, :type allow_agent: bool :param identity_auth: (Optional) set to False to disable attempting to authenticate with default identity files from - `pssh.clients.base_ssh_client.BaseSSHClient.IDENTITIES` + 
`pssh.clients.base.single.BaseSSHClient.IDENTITIES` :type identity_auth: bool - :param forward_ssh_agent: (Optional) Turn on SSH agent forwarding - - equivalent to `ssh -A` from the `ssh` command line utility. - Defaults to True if not set. + :param forward_ssh_agent: Unused - agent forwarding not implemented. :type forward_ssh_agent: bool :param proxy_host: Connect to target host via given proxy host. :type proxy_host: str @@ -161,7 +159,7 @@ def _connect_proxy(self, proxy_host, proxy_port, proxy_pkey, if not FORWARDER.started.is_set(): FORWARDER.start() FORWARDER.started.wait() - FORWARDER.in_q.put((self._proxy_client, self.host, self.port)) + FORWARDER.enqueue(self._proxy_client, self.host, self.port) proxy_local_port = FORWARDER.out_q.get() return proxy_local_port @@ -175,6 +173,8 @@ def disconnect(self): pass self.session = None self.sock = None + if isinstance(self._proxy_client, SSHClient): + self._proxy_client.disconnect() def spawn_send_keepalive(self): """Spawns a new greenlet that sends keep alive messages every @@ -215,14 +215,17 @@ def _keepalive(self): self.configure_keepalive() self._keepalive_greenlet = self.spawn_send_keepalive() + def _agent_auth(self): + self.session.agent_auth(self.user) + def auth(self): if self.pkey is not None: logger.debug( "Proceeding with private key file authentication") - return self._pkey_auth(password=self.password) + return self._pkey_auth(self.pkey, password=self.password) if self.allow_agent: try: - self.session.agent_auth(self.user) + self._agent_auth() except (AgentAuthenticationError, AgentConnectionError, AgentGetIdentityError, AgentListIdentitiesError) as ex: logger.debug("Agent auth failed with %s" @@ -232,41 +235,44 @@ def auth(self): else: logger.debug("Authentication with SSH Agent succeeded") return - try: - self._identity_auth() - except AuthenticationException: - if self.password is None: - raise - logger.debug("Private key auth failed, trying password") - self._password_auth() - - def _pkey_auth(self, password=None): + if self.identity_auth: + try: + self._identity_auth() + except AuthenticationError: + if self.password is None: + raise + logger.debug("Private key auth failed, trying password") + self._password_auth() + + def _pkey_auth(self, pkey_file, password=None): self.session.userauth_publickey_fromfile( self.user, - self.pkey, + pkey_file, passphrase=password if password is not None else '') def _password_auth(self): try: self.session.userauth_password(self.user, self.password) - except Exception: - raise AuthenticationException("Password authentication failed") + except Exception as ex: + raise AuthenticationError("Password authentication failed - %s", ex) + + def _open_session(self): + chan = self._eagain(self.session.open_session) + return chan def open_session(self): """Open new channel from session""" try: - chan = self._eagain(self.session.open_session) + chan = self._open_session() except Exception as ex: raise SessionError(ex) - # Multiple forward requests result in ChannelRequestDenied - # errors, flag is used to avoid this. 
if self.forward_ssh_agent and not self._forward_requested: if not hasattr(chan, 'request_auth_agent'): warn("Requested SSH Agent forwarding but libssh2 version used " "does not support it - ignoring") return chan - self._eagain(chan.request_auth_agent) - self._forward_requested = True + # self._eagain(chan.request_auth_agent) + # self._forward_requested = True return chan def _make_output_readers(self, channel, stdout_buffer, stderr_buffer): @@ -346,15 +352,9 @@ def _eagain(self, func, *args, **kwargs): def _make_sftp(self): """Make SFTP client from open transport""" try: - sftp = self.session.sftp_init() + sftp = self._eagain(self.session.sftp_init) except Exception as ex: raise SFTPError(ex) - while sftp == LIBSSH2_ERROR_EAGAIN: - self.poll() - try: - sftp = self.session.sftp_init() - except Exception as ex: - raise SFTPError(ex) return sftp def _mkdir(self, sftp, directory): @@ -435,8 +435,6 @@ def sftp_put(self, sftp, local_file, remote_file): sftp.open, remote_file, f_flags, mode) as remote_fh: try: self._sftp_put(remote_fh, local_file) - # THREAD_POOL.apply( - # sftp_put, args=(self.session, remote_fh, local_file)) except SFTPProtocolError as ex: msg = "Error writing to remote file %s - %s" logger.error(msg, remote_file, ex) @@ -674,15 +672,9 @@ def _scp_send(self, local_file, remote_file): def _sftp_openfh(self, open_func, remote_file, *args): try: - fh = open_func(remote_file, *args) + fh = self._eagain(open_func, remote_file, *args) except Exception as ex: raise SFTPError(ex) - while fh == LIBSSH2_ERROR_EAGAIN: - self.poll(timeout=0.1) - try: - fh = open_func(remote_file, *args) - except Exception as ex: - raise SFTPError(ex) return fh def _sftp_get(self, remote_fh, local_file): @@ -699,17 +691,18 @@ def sftp_get(self, sftp, remote_file, local_file): LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR) as remote_fh: try: self._sftp_get(remote_fh, local_file) - # Running SFTP in a thread requires a new session - # as session handles or any handles created by a session - # cannot be used simultaneously in multiple threads. - # THREAD_POOL.apply( - # sftp_get, args=(self.session, remote_fh, local_file)) except SFTPProtocolError as ex: msg = "Error reading from remote file %s - %s" logger.error(msg, remote_file, ex) raise SFTPIOError(msg, remote_file, ex) def get_exit_status(self, channel): + """Get exit status code for channel or ``None`` if not ready. + + :param channel: The channel to get status from. + :type channel: :py:mod:`ssh2.channel.Channel` + :rtype: int or ``None`` + """ if not channel.eof(): return return channel.get_exit_status() diff --git a/pssh/clients/native/tunnel.py b/pssh/clients/native/tunnel.py index 775436d6..4b435544 100644 --- a/pssh/clients/native/tunnel.py +++ b/pssh/clients/native/tunnel.py @@ -25,8 +25,6 @@ from gevent import spawn, joinall, get_hub, sleep from gevent.server import StreamServer -from gevent.select import poll, POLLIN, POLLOUT -from ssh2.session import LIBSSH2_SESSION_BLOCK_INBOUND, LIBSSH2_SESSION_BLOCK_OUTBOUND from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN from ...constants import DEFAULT_RETRIES @@ -42,7 +40,7 @@ def __init__(self): Starts servers in their own gevent hub via thread run target. - Input ``(SSHClient, target_host, target_port)`` tuples to ``in_q`` to create new servers + Use ``enqueue`` to create new servers and get port to connect to via ``out_q`` once a target has been put into the input queue. ``SSHClient`` is the client for the SSH host that will be proxying. 
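# Illustrative sketch (not part of this diff) of driving the module-level
# forwarder, based on the calls shown in pssh/clients/native/single.py and
# tests/native/test_tunnel.py in this change set. The proxy host, target
# host and port below are placeholders.
from pssh.clients.native import SSHClient
from pssh.clients.native.tunnel import FORWARDER

proxy_client = SSHClient('proxy.example.com')  # connected client to the proxy host

if not FORWARDER.started.is_set():
    FORWARDER.start()
    FORWARDER.started.wait()
FORWARDER.enqueue(proxy_client, 'target.example.com', 22)
local_port = FORWARDER.out_q.get()
# Connections to 127.0.0.1:local_port are now forwarded to
# target.example.com:22 over proxy_client's SSH session.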
@@ -59,24 +57,44 @@ def _start_server(self): client, host, port = self.in_q.get() server = TunnelServer(client, host, port) server.start() + self._get_server_listen_port(client, server) + + def _get_server_listen_port(self, client, server): while not server.started: sleep(0.01) self._servers[client] = server - local_port = server.socket.getsockname()[1] + local_port = server.listen_port self.out_q.put(local_port) + def enqueue(self, client, host, port): + """Add target host:port to tunnel via client to queue. + + :param client: The client to connect via. + :type client: :py:mod:`pssh.clients.native.single.SSHClient` + :param host: Target host to open connection to. + :type host: str + :param port: Target port to connect on. + :type port: int + """ + self.in_q.put((client, host, port)) + def shutdown(self): + """Stop all tunnel servers.""" for client, server in self._servers.items(): server.stop() - def _cleanup_servers(self): + def _cleanup_servers_let(self): while True: - for client, server in self._servers.items(): - if client.sock.closed: - server.stop() - del self._servers[client] + self._cleanup_servers() sleep(60) + def _cleanup_servers(self): + for client in list(self._servers.keys()): + server = self._servers[client] + if client.sock is None or client.sock.closed: + server.stop() + del self._servers[client] + def run(self): """Thread runner ensures a non main hub has been created for all subsequent greenlets and waits for (client, host, port) tuples to be put into self.in_q. @@ -87,7 +105,7 @@ def run(self): self._hub = get_hub() assert self._hub.main_hub is False self.started.set() - self._cleanup_let = spawn(self._cleanup_servers) + self._cleanup_let = spawn(self._cleanup_servers_let) logger.debug("Hub in server runner is main hub: %s", self._hub.main_hub) try: while True: @@ -108,14 +126,20 @@ class TunnelServer(StreamServer): to/from remote SSH host for each connection. 
""" - def __init__(self, client, host, port, bind_address='127.0.0.1', timeout=0.1): + def __init__(self, client, host, port, bind_address='127.0.0.1', + num_retries=DEFAULT_RETRIES): StreamServer.__init__(self, (bind_address, 0), self._read_rw) self.client = client self.host = host self.port = port self.session = client.session - self._retries = DEFAULT_RETRIES - self.timeout = timeout + self._client = client + self._retries = num_retries + self.bind_address = bind_address + + @property + def listen_port(self): + return self.socket.getsockname()[1] if self.socket is not None else None def _read_rw(self, socket, address): local_addr, local_port = address @@ -142,8 +166,7 @@ def _wait_send_receive_lets(self, source, dest, channel, forward_sock): joinall((source, dest), raise_error=True) finally: logger.debug("Closing channel and forward socket") - while channel is not None and channel.close() == LIBSSH2_ERROR_EAGAIN: - self.poll(timeout=.5) + self._client.close_channel(channel) forward_sock.close() def _read_forward_sock(self, forward_sock, channel): @@ -161,16 +184,11 @@ def _read_forward_sock(self, forward_sock, channel): if data_len == 0: sleep(.01) continue - data_written = 0 - while data_written < data_len: - try: - rc, bytes_written = channel.write(data[data_written:]) - except Exception as ex: - logger.error("Channel write error: %s", ex) - raise - data_written += bytes_written - if rc == LIBSSH2_ERROR_EAGAIN: - self.poll() + try: + self._client._eagain_write(channel.write, data) + except Exception as ex: + logger.error("Error writing data to channel - %s", ex) + raise logger.debug("Wrote all data to channel") def _read_channel(self, forward_sock, channel): @@ -185,7 +203,7 @@ def _read_channel(self, forward_sock, channel): raise # logger.debug("Read %s data from channel" % (size,)) if size == LIBSSH2_ERROR_EAGAIN: - self.poll() + self._client.poll() continue elif size == 0: sleep(.01) @@ -200,12 +218,12 @@ def _read_channel(self, forward_sock, channel): def _open_channel(self, fw_host, fw_port, local_port): channel = self.session.direct_tcpip_ex( - fw_host, fw_port, '127.0.0.1', + fw_host, fw_port, self.bind_address, local_port) while channel == LIBSSH2_ERROR_EAGAIN: - self.poll() + self._client.poll() channel = self.session.direct_tcpip_ex( - fw_host, fw_port, '127.0.0.1', + fw_host, fw_port, self.bind_address, local_port) return channel @@ -226,30 +244,6 @@ def _open_channel_retries(self, fw_host, fw_port, local_port, continue return channel - def poll(self, timeout=None): - """Perform co-operative gevent poll on ssh2 session socket. - - Blocks current greenlet only if socket has pending read or write operations - in the appropriate direction. 
- """ - directions = self.session.block_directions() - if directions == 0: - return - events = 0 - if directions & LIBSSH2_SESSION_BLOCK_INBOUND: - events = POLLIN - if directions & LIBSSH2_SESSION_BLOCK_OUTBOUND: - events |= POLLOUT - self._poll_socket(self.session.sock, events, timeout=timeout) - - def _poll_socket(self, sock, events, timeout=None): - # gevent.select.poll converts seconds to miliseconds to match python socket - # implementation - timeout = timeout * 1000 if timeout is not None else 100 - poller = poll() - poller.register(sock, eventmask=events) - return poller.poll(timeout=timeout) - FORWARDER = LocalForwarder() FORWARDER.daemon = True diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index f5069c10..bef0d069 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -156,7 +156,7 @@ def auth(self): if self.pkey is not None: logger.debug( "Proceeding with private key file authentication") - return self._pkey_auth(self.password) + return self._pkey_auth(self.pkey, password=self.password) if self.allow_agent: try: self.session.userauth_agent(self.user) @@ -194,8 +194,8 @@ def _password_auth(self): except Exception as ex: raise AuthenticationError("Password authentication failed - %s", ex) - def _pkey_auth(self, password=None): - pkey = import_privkey_file(self.pkey, passphrase=password if password is not None else '') + def _pkey_auth(self, pkey_file, password=None): + pkey = import_privkey_file(pkey_file, passphrase=password if password is not None else '') if self.cert_file is not None: logger.debug("Certificate file set - trying certificate authentication") self._import_cert_file(pkey) @@ -210,19 +210,17 @@ def _import_cert_file(self, pkey): def _shell(self, channel): return self._eagain(channel.request_shell) + def _open_session(self): + channel = self.session.channel_new() + channel.set_blocking(0) + self._eagain(channel.open_session) + return channel + def open_session(self): """Open new channel from session.""" logger.debug("Opening new channel on %s", self.host) try: - channel = self.session.channel_new() - channel.set_blocking(0) - while channel.open_session() == SSH_AGAIN: - logger.debug( - "Channel open session blocked, waiting on socket..") - self.poll() - # Select on open session can dead lock without - # yielding event loop - sleep(.1) + channel = self._open_session() except Exception as ex: raise SessionError(ex) return channel @@ -307,9 +305,11 @@ def finished(self, channel): return channel.is_eof() def get_exit_status(self, channel): - """Get exit status from channel if ready else return `None`. + """Get exit status code for channel or ``None`` if not ready. - :rtype: int or `None` + :param channel: The channel to get status from. 
+ :type channel: :py:mod:`ssh.channel.Channel` + :rtype: int or ``None`` """ if not channel.is_eof(): return diff --git a/setup.py b/setup.py index 79987017..ddb9c23e 100644 --- a/setup.py +++ b/setup.py @@ -45,8 +45,6 @@ 'Intended Audience :: Developers', 'Operating System :: OS Independent', 'Programming Language :: C', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index 35d307fb..c6522021 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -36,7 +36,7 @@ from pssh.exceptions import UnknownHostException, \ AuthenticationException, ConnectionErrorException, SessionError, \ HostArgumentException, SFTPError, SFTPIOError, Timeout, SCPError, \ - PKeyFileError, ShellError + PKeyFileError, ShellError, HostArgumentError from pssh.output import HostOutput from .base_ssh2_case import PKEY_FILENAME, PUB_FILE @@ -166,6 +166,8 @@ def test_client_join_consume_output(self): self.assertTrue(len(stdout) == 0) self.assertTrue(len(stderr) == 0) self.assertEqual(expected_exit_code, exit_code) + self.assertIsNone(self.client._join(None)) + self.assertIsNone(self.client.join([None])) def test_client_join_stdout(self): output = self.client.run_command(self.cmd) @@ -295,7 +297,7 @@ def test_timeout_on_open_session(self): def _session(timeout=1): sleep(timeout+1) joinall(client.connect_auth()) - sleep(.01) + sleep(.1) client._host_clients[(0, self.host)].open_session = _session self.assertRaises(Timeout, client.run_command, self.cmd) @@ -366,7 +368,9 @@ def test_sftp_exceptions(self): # Port with no server listening on it on separate ip port = self.make_random_port(host=self.host) client = ParallelSSHClient([self.host], port=port, num_retries=1) - cmds = client.copy_file("test", "test") + _local = "fake_local" + _remote = "fake_remote" + cmds = client.copy_file(_local, _remote) client.pool.join() for cmd in cmds: try: @@ -376,6 +380,8 @@ def test_sftp_exceptions(self): self.assertIsInstance(ex, ConnectionErrorException) else: raise Exception("Expected ConnectionErrorException, got none") + self.assertFalse(os.path.isfile(_local)) + self.assertFalse(os.path.isfile(_remote)) def test_pssh_copy_file(self): """Test parallel copy file""" @@ -1625,12 +1631,17 @@ def test_scp_send(self): except OSError: pass - def test_scp_send_bad_copy_args(self): + def test_scp_bad_copy_args(self): client = ParallelSSHClient([self.host, self.host]) - copy_args = [{'local_file': 'test', 'remote_file': 'test'}] + copy_args = [{'local_file': 'fake_file', 'remote_file': 'fake_remote_file'}] self.assertRaises(HostArgumentException, client.scp_send, '%(local_file)s', '%(remote_file)s', copy_args=copy_args) + self.assertRaises(HostArgumentError, + client.scp_recv, '%(local_file)s', '%(remote_file)s', + copy_args=copy_args) + self.assertFalse(os.path.isfile('fake_file')) + self.assertFalse(os.path.isfile('fake_remote_file')) def test_scp_send_exc(self): client = ParallelSSHClient([self.host], pkey=self.user_key, num_retries=1) @@ -1794,9 +1805,8 @@ def test_client_disconnect(self): port=self.port, pkey=self.user_key, num_retries=1) - output = client.run_command(self.cmd, - return_list=True) - client.join(output, consume_output=True) + client.run_command(self.cmd) + client.join(consume_output=True) single_client = list(client._host_clients.values())[0] del 
client self.assertEqual(single_client.session, None) diff --git a/tests/native/test_single_client.py b/tests/native/test_single_client.py index fc15f95c..cacb1412 100644 --- a/tests/native/test_single_client.py +++ b/tests/native/test_single_client.py @@ -20,6 +20,7 @@ import time import subprocess import shutil +import tempfile from hashlib import sha256 from datetime import datetime @@ -30,9 +31,10 @@ from ssh2.channel import Channel from ssh2.exceptions import SocketDisconnectError, BannerRecvError, SocketRecvError, \ AgentConnectionError, AgentListIdentitiesError, \ - AgentAuthenticationError, AgentGetIdentityError + AgentAuthenticationError, AgentGetIdentityError, SFTPProtocolError from pssh.exceptions import AuthenticationException, ConnectionErrorException, \ - SessionError, SFTPIOError, SFTPError, SCPError, PKeyFileError, Timeout + SessionError, SFTPIOError, SFTPError, SCPError, PKeyFileError, Timeout, \ + AuthenticationError from .base_ssh2_case import SSH2TestCase from ..embedded_server.openssh import OpenSSHServer @@ -51,6 +53,39 @@ def test_sftp_fail(self): self.assertRaises(SFTPIOError, self.client._mkdir, sftp, '/blah') self.assertRaises(SFTPError, self.client.sftp_put, sftp, 'a file', '/blah') + def test_sftp_exc(self): + def _sftp_exc(local_file, remote_file): + raise SFTPProtocolError + client = SSHClient(self.host, port=self.port, + pkey=self.user_key, + num_retries=1) + client._sftp_put = _sftp_exc + local_file = 'local_file' + try: + with open(local_file, 'wb') as fh: + fh.write(b'asdf') + fh.flush() + self.assertRaises(SFTPIOError, client.copy_file, local_file, 'remote_file') + finally: + try: + os.unlink(local_file) + except Exception: + pass + client._sftp_get = _sftp_exc + remote_file = os.path.expanduser('~/remote_file') + try: + with open(remote_file, 'wb') as fh: + fh.write(b'asdf') + fh.flush() + self.assertRaises(SFTPIOError, client.copy_remote_file, remote_file, 'local_file') + finally: + try: + os.unlink(remote_file) + except Exception: + pass + self.assertRaises( + SFTPIOError, client.copy_remote_file, 'fake_remote_file_not_exists', 'local') + def test_scp_fail(self): self.assertRaises(SCPError, self.client.scp_recv, 'fakey', 'fake') try: @@ -81,6 +116,17 @@ def _session(timeout=2): client.open_session = _session self.assertRaises(GTimeout, client.run_command, self.cmd) + def test_open_session_exc(self): + class Error(Exception): + pass + def _session(): + raise Error + client = SSHClient(self.host, port=self.port, + pkey=self.user_key, + num_retries=1) + client._open_session = _session + self.assertRaises(SessionError, client.open_session) + def test_finished_error(self): self.assertRaises(ValueError, self.client.wait_finished, None) self.assertIsNone(self.client.finished(None)) @@ -117,7 +163,6 @@ def test_manual_auth(self): client.session.disconnect() del client.session del client.sock - client.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client._connect(self.host, self.port) client._init_session() # Identity auth @@ -125,12 +170,55 @@ def test_manual_auth(self): client.session.disconnect() del client.session del client.sock - client.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client._connect(self.host, self.port) client.session = Session() client.session.handshake(client.sock) self.assertRaises(AuthenticationException, client.auth) + def test_default_identities_auth(self): + client = SSHClient(self.host, port=self.port, + pkey=self.user_key, + num_retries=1, + allow_agent=False) + client.session.disconnect() + client.pkey = None 
+        del client.session
+        del client.sock
+        client._connect(self.host, self.port)
+        client._init_session()
+        # Default identities auth only
+        self.assertRaises(AuthenticationException, client._identity_auth)
+        # Default auth
+        self.assertRaises(AuthenticationException, client.auth)
+
+    def test_agent_auth_failure(self):
+        class UnknownError(Exception):
+            pass
+        def _agent_auth_unk():
+            raise UnknownError
+        def _agent_auth_agent_err():
+            raise AgentConnectionError
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=1,
+                           allow_agent=True)
+        client.session.disconnect()
+        client.pkey = None
+        client._connect(self.host, self.port)
+        client._agent_auth = _agent_auth_unk
+        self.assertRaises(AuthenticationError, client.auth)
+        client._agent_auth = _agent_auth_agent_err
+        self.assertRaises(AuthenticationError, client.auth)
+
+    def test_agent_fwd(self):
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=1,
+                           allow_agent=True,
+                           forward_ssh_agent=True)
+        out = client.run_command(self.cmd)
+        client.wait_finished(out)
+
     def test_failed_auth(self):
         self.assertRaises(PKeyFileError, SSHClient, self.host, port=self.port,
                           pkey='client_pkey',
@@ -169,13 +257,6 @@ def test_identity_auth_failure(self):
                           SSHClient, self.host, port=self.port, num_retries=1,
                           allow_agent=False)
 
-    @unittest.skipUnless(bool(os.getenv('TRAVIS')),
-                         "Not on Travis-CI - skipping agent auth failure test")
-    def test_agent_auth_failure(self):
-        self.assertRaises(AuthenticationException,
-                          SSHClient, self.host, port=self.port, num_retries=1,
-                          allow_agent=True)
-
     def test_password_auth_failure(self):
         self.assertRaises(AuthenticationException,
                           SSHClient, self.host, port=self.port, num_retries=1,
@@ -264,6 +345,9 @@ def test_finished(self):
         stdout = list(host_out.stdout)
         self.assertTrue(self.client.finished(channel))
         self.assertListEqual(stdout, [self.resp])
+        self.assertRaises(ValueError, self.client.wait_finished, None)
+        host_out.channel = None
+        self.assertIsNone(self.client.wait_finished(host_out))
 
     def test_wait_finished_timeout(self):
         host_out = self.client.run_command('sleep 2')
@@ -304,12 +388,14 @@ def test_scp_abspath_recursion(self):
             for _file in files:
                 local_file_path = os.path.sep.join([copy_to_path, _file])
                 self.assertTrue(os.path.isfile(local_file_path))
+            shutil.rmtree(to_copy_dir_path)
+            self.assertRaises(
+                SCPError, self.client.scp_recv, to_copy_dir_path, copy_to_path, recurse=True)
         finally:
-            for _path in (to_copy_dir_path, copy_to_path):
-                try:
-                    shutil.rmtree(_path)
-                except Exception:
-                    pass
+            try:
+                shutil.rmtree(copy_to_path)
+            except Exception:
+                pass
 
     def test_copy_file_abspath_recurse(self):
         cur_dir = os.path.dirname(__file__)
@@ -449,6 +535,38 @@ def test_scp_recv_large_file(self):
             except Exception:
                 pass
 
+    def test_scp_send_write_exc(self):
+        class WriteError(Exception):
+            pass
+        def write_exc(func, data):
+            raise WriteError
+        cur_dir = os.path.dirname(__file__)
+        file_name = 'file1'
+        file_copy_to = 'file_copied'
+        file_path_from = os.path.sep.join([cur_dir, file_name])
+        file_copy_to_dirpath = os.path.expanduser('~/') + file_copy_to
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=1)
+        for _path in (file_path_from, file_copy_to_dirpath):
+            try:
+                os.unlink(_path)
+            except OSError:
+                pass
+        try:
+            with open(file_path_from, 'wb') as fh:
+                fh.write(b"adsfasldkfjabafj")
+            client.eagain_write = write_exc
+            self.assertRaises(SCPError, client.scp_send, file_path_from, file_copy_to_dirpath)
+            # File created on SCP channel open
+            self.assertTrue(os.path.isfile(file_copy_to_dirpath))
+        finally:
+            for _path in (file_path_from, file_copy_to_dirpath):
+                try:
+                    os.unlink(_path)
+                except Exception:
+                    pass
+
     def test_scp_send_large_file(self):
         cur_dir = os.path.dirname(__file__)
         file_name = 'file1'
@@ -496,6 +614,41 @@ def test_scp_send_dir_target(self):
         file_path_from = os.path.sep.join([cur_dir, file_name])
         file_copy_to_dirpath = os.path.expanduser('~/')
         file_copy_to_abs = file_copy_to_dirpath + file_name
+        dir_copy_from = os.path.sep.join([cur_dir, 'copy_from'])
+        dir_copy_file_from = os.path.sep.join([dir_copy_from, file_name])
+        os.makedirs(dir_copy_from)
+        dir_copy_to = tempfile.mkdtemp()
+        # Should be created by client
+        shutil.rmtree(dir_copy_to)
+        for _path in (file_path_from, file_copy_to_abs):
+            try:
+                os.unlink(_path)
+            except OSError:
+                pass
+        try:
+            with open(file_path_from, 'wb') as fh, \
+                    open(dir_copy_file_from, 'wb') as fh2:
+                fh.write(b"adsfasldkfjabafj")
+                fh2.write(b"adsfasldkfjabafj")
+            self.client.scp_send(file_path_from, file_copy_to_dirpath)
+            self.assertTrue(os.path.isfile(file_copy_to_abs))
+            self.assertRaises(ValueError, self.client.scp_send, dir_copy_from, dir_copy_to)
+            self.assertFalse(os.path.isdir(dir_copy_to))
+            self.client.scp_send(dir_copy_from, dir_copy_to, recurse=True)
+            self.assertTrue(os.path.isdir(dir_copy_to))
+            self.assertTrue(os.path.isfile(os.path.sep.join([dir_copy_to, file_name])))
+        finally:
+            try:
+                for _path in (file_path_from, file_copy_to_abs):
+                    os.unlink(_path)
+            except OSError:
+                pass
+            try:
+                shutil.rmtree(dir_copy_from)
+            except Exception:
+                pass
+        # Relative path
+        file_copy_to_dirpath = './'
         for _path in (file_path_from, file_copy_to_abs):
             try:
                 os.unlink(_path)
             except OSError:
                 pass
@@ -512,8 +665,13 @@
                 os.unlink(_path)
             except OSError:
                 pass
-        # Relative path
-        file_copy_to_dirpath = './'
+
+    def test_sftp_openfh_exc(self):
+        cur_dir = os.path.dirname(__file__)
+        file_name = 'file1'
+        file_path_from = os.path.sep.join([cur_dir, file_name])
+        file_copy_to_dirpath = os.path.expanduser('~/')
+        file_copy_to_abs = file_copy_to_dirpath + file_name
         for _path in (file_path_from, file_copy_to_abs):
             try:
                 os.unlink(_path)
@@ -522,8 +680,10 @@
         try:
             with open(file_path_from, 'wb') as fh:
                 fh.write(b"adsfasldkfjabafj")
-            self.client.scp_send(file_path_from, file_copy_to_dirpath)
-            self.assertTrue(os.path.isfile(file_copy_to_abs))
+            os.chmod(file_path_from, 0o200)
+            self.assertRaises(
+                SFTPError, self.client.copy_remote_file, file_path_from, file_copy_to_dirpath)
+            self.assertFalse(os.path.isfile(file_copy_to_abs))
         finally:
             for _path in (file_path_from, file_copy_to_abs):
                 try:
@@ -579,6 +739,8 @@ def test_interactive_shell(self):
         stdout = list(shell.stdout)
         self.assertListEqual(stdout, [self.resp, self.resp])
         self.assertEqual(shell.exit_code, 0)
+        shell._chan = None
+        self.assertIsNone(shell.close())
 
     def test_interactive_shell_exit_code(self):
         with self.client.open_shell() as shell:
@@ -592,18 +754,8 @@ def test_interactive_shell_exit_code(self):
 
 
 # TODO
-# * scp send recursive
 # * scp recv recursive local dir permission denied
-# * scp_recv remote file not exists exception
-# * scp send open local file exception
 # * read output callback
-# * identity auth success
-# * connect init retries
-# * handshake retries
-# * agent forwarding
-# * password auth
 # * disconnect exception
 # * SFTP init exception
-# * sftp openfh exception
-# * sftp get exception
 # * copy file local_file dir no recurse exception
diff --git a/tests/native/test_tunnel.py b/tests/native/test_tunnel.py
index a84a6565..b5f30454 100644
--- a/tests/native/test_tunnel.py
+++ b/tests/native/test_tunnel.py
@@ -28,15 +28,16 @@
 from socket import timeout as socket_timeout
 from sys import version_info
 from collections import deque
 
+from gevent import sleep, spawn, Timeout as GTimeout
 from pssh.config import HostConfig
 from pssh.clients.native import SSHClient, ParallelSSHClient
-from pssh.clients.native.tunnel import LocalForwarder
+from pssh.clients.native.tunnel import LocalForwarder, TunnelServer
 from pssh.exceptions import UnknownHostException, \
     AuthenticationException, ConnectionErrorException, SessionError, \
     HostArgumentException, SFTPError, SFTPIOError, Timeout, SCPError, \
     ProxyError
-from ssh2.exceptions import ChannelFailure, SocketSendError
+from ssh2.exceptions import ChannelFailure, SocketSendError, SocketRecvError
 
 from .base_ssh2_case import PKEY_FILENAME, PUB_FILE
 from ..embedded_server.openssh import OpenSSHServer
@@ -70,7 +71,7 @@ def test_forwarder(self):
         forwarder.started.wait()
         client = SSHClient(
             self.proxy_host, port=self.proxy_port, pkey=self.user_key)
-        forwarder.in_q.put((client, self.proxy_host, self.port))
+        forwarder.enqueue(client, self.proxy_host, self.port)
         forwarder.out_q.get()
         self.assertTrue(len(forwarder._servers) > 0)
         forwarder.shutdown()
@@ -234,5 +235,106 @@ def test_proxy_error(self):
         client.join(output)
         self.assertIsInstance(output[0].exception, ProxyError)
 
-    # TODO:
-    # * channel/socket read/write failure tests
+    def test_proxy_bad_target(self):
+        self.assertRaises(
+            SocketRecvError, SSHClient,
+            '127.0.0.155', port=self.proxy_port, pkey=self.user_key,
+            proxy_host=self.proxy_host, proxy_port=self.proxy_port,
+            num_retries=1,
+        )
+
+    def test_forwarder_exit(self):
+        def _start_server():
+            raise Exception
+
+        forwarder = LocalForwarder()
+        forwarder.daemon = True
+        forwarder.start()
+        forwarder.started.wait()
+        client = SSHClient(
+            self.proxy_host, port=self.proxy_port, pkey=self.user_key)
+        forwarder.enqueue(client, self.proxy_host, self.port)
+        forwarder.out_q.get()
+        self.assertTrue(len(forwarder._servers) > 0)
+        client.sock.close()
+        client.disconnect()
+        forwarder._cleanup_servers()
+        self.assertEqual(len(forwarder._servers), 0)
+        forwarder._start_server = _start_server
+        forwarder.enqueue(client, self.proxy_host, self.port)
+        sleep(.1)
+
+    def test_socket_channel_error(self):
+        class SocketError(Exception):
+            pass
+        class ChannelFailure(object):
+            def read(self):
+                raise SocketRecvError
+            def write(self, data):
+                raise SocketSendError
+            def eof(self):
+                return False
+            def close(self):
+                return
+        class Channel(object):
+            def __init__(self):
+                self._eof = False
+            def read(self):
+                return 5, b"asdfa"
+            def write(self, data):
+                return 0, len(data)
+            def eof(self):
+                return self._eof
+            def close(self):
+                return
+        class Socket(object):
+            def recv(self, num):
+                return b"asdfaf"
+            def close(self):
+                return
+        class SocketFailure(object):
+            def sendall(self, data):
+                raise SocketError
+            def recv(self, num):
+                raise SocketError
+            def close(self):
+                return
+        class SocketEmpty(object):
+            def recv(self, num):
+                return b""
+            def close(self):
+                return
+        client = SSHClient(
+            self.proxy_host, port=self.proxy_port, pkey=self.user_key)
+        server = TunnelServer(client, self.proxy_host, self.port)
+        let = spawn(server._read_forward_sock, SocketEmpty(), Channel())
+        let.start()
+        sleep(.01)
+        self.assertRaises(SocketSendError, server._read_forward_sock, Socket(), ChannelFailure())
+        self.assertRaises(SocketError, server._read_forward_sock, SocketFailure(), Channel())
+        self.assertRaises(SocketError, server._read_channel, SocketFailure(), Channel())
+        self.assertRaises(SocketRecvError, server._read_channel, Socket(), ChannelFailure())
+        channel = Channel()
+        _socket = Socket()
+        source_let = spawn(server._read_forward_sock, _socket, channel)
+        dest_let = spawn(server._read_channel, _socket, channel)
+        channel._eof = True
+        self.assertIsNone(server._wait_send_receive_lets(source_let, dest_let, channel, _socket))
+        let.kill()
+
+    def test_server_start(self):
+        _port = 1234
+        class Server(object):
+            def __init__(self):
+                self.started = False
+                self.listen_port = _port
+        server = Server()
+        forwarder = LocalForwarder()
+        let = spawn(forwarder._get_server_listen_port, None, server)
+        let.start()
+        sleep(.01)
+        server.started = True
+        sleep(.01)
+        with GTimeout(seconds=1):
+            port = forwarder.out_q.get()
+        self.assertEqual(port, _port)
diff --git a/tests/ssh/test_parallel_client.py b/tests/ssh/test_parallel_client.py
index 024d92c0..8ba5137f 100644
--- a/tests/ssh/test_parallel_client.py
+++ b/tests/ssh/test_parallel_client.py
@@ -451,6 +451,11 @@ def test_finished_list_output(self):
         self.client.join(output)
         self.assertTrue(self.client.finished(output))
 
+    def test_default_finished(self):
+        client = ParallelSSHClient([self.host], port=self.port,
+                                   pkey=self.user_key)
+        self.assertTrue(client.finished())
+
     def test_agent_auth(self):
         client = ParallelSSHClient(
             [self.host], port=self.port,
@@ -492,6 +497,12 @@ def test_read_multi_same_hosts(self):
             stdout = list(host_out.stdout)
             self.assertListEqual(stdout, [self.resp])
 
+    def test_join_bad_host_out(self):
+        out = HostOutput(None, None, None, None)
+        self.assertIsNone(self.client._join(out))
+        self.assertIsNone(self.client._join(None))
+        self.assertIsNone(self.client.join([None]))
+
     # def test_multiple_run_command_timeout(self):
     #     client = ParallelSSHClient([self.host], port=self.port,
     #                                pkey=self.user_key)
diff --git a/tests/ssh/test_single_client.py b/tests/ssh/test_single_client.py
index a54a7ac0..9056aacf 100644
--- a/tests/ssh/test_single_client.py
+++ b/tests/ssh/test_single_client.py
@@ -20,11 +20,11 @@
 
 from datetime import datetime
 
-from gevent import sleep, Timeout as GTimeout
+from gevent import sleep, Timeout as GTimeout, spawn
 from ssh.session import Session
-# from ssh.exceptions import SocketDisconnectError
 from pssh.exceptions import AuthenticationException, ConnectionErrorException, \
-    SessionError, SFTPIOError, SFTPError, SCPError, PKeyFileError, Timeout
+    SessionError, SFTPIOError, SFTPError, SCPError, PKeyFileError, Timeout, \
+    AuthenticationError
 from pssh.clients.ssh.single import SSHClient, logger as ssh_logger
 
 from .base_ssh_case import SSHTestCase
@@ -61,6 +61,20 @@ def test_execute(self):
         exit_code = host_out.channel.get_exit_status()
         self.assertEqual(exit_code, 0)
 
+    def test_finished(self):
+        self.assertFalse(self.client.finished(None))
+        host_out = self.client.run_command(self.cmd)
+        channel = host_out.channel
+        self.assertFalse(self.client.finished(channel))
+        self.assertRaises(ValueError, self.client.wait_finished, host_out.channel)
+        self.client.wait_finished(host_out)
+        stdout = list(host_out.stdout)
+        self.assertTrue(self.client.finished(channel))
+        self.assertListEqual(stdout, [self.resp])
+        self.assertRaises(ValueError, self.client.wait_finished, None)
+        host_out.channel = None
+        self.assertIsNone(self.client.wait_finished(host_out))
+
     def test_finished_error(self):
         self.assertRaises(ValueError, self.client.wait_finished, None)
         self.assertIsNone(self.client.finished(None))
@@ -74,6 +88,22 @@ def test_stderr(self):
         self.assertListEqual(expected, stderr)
         self.assertEqual(len(output), 0)
 
+    def test_default_identities_auth(self):
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=1,
+                           allow_agent=False)
+        client.session.disconnect()
+        client.pkey = None
+        del client.session
+        del client.sock
+        client._connect(self.host, self.port)
+        client._init_session()
+        # Default identities auth only
+        self.assertRaises(AuthenticationException, client._identity_auth)
+        # Default auth
+        self.assertRaises(AuthenticationException, client.auth)
+
     def test_long_running_cmd(self):
         host_out = self.client.run_command('sleep 2; exit 2')
         self.assertRaises(ValueError, self.client.wait_finished, host_out.channel)
@@ -100,6 +130,14 @@ def test_client_disconnect_on_del(self):
         del client
         self.assertTrue(client_sock.closed)
 
+    def test_client_bad_sock(self):
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=1)
+        client.disconnect()
+        client.sock = None
+        self.assertIsNone(client.poll())
+
     def test_client_read_timeout(self):
         client = SSHClient(self.host, port=self.port,
                            pkey=self.user_key,
@@ -141,11 +179,50 @@ def test_interactive_shell_exit_code(self):
         self.assertListEqual(stdout, [self.resp, self.resp])
         self.assertEqual(shell.output.exit_code, 1)
 
+    def test_password_auth_failure(self):
+        self.assertRaises(AuthenticationError,
+                          SSHClient, self.host, port=self.port, num_retries=1,
+                          allow_agent=False,
+                          password='blah blah blah')
+
+    def test_open_session_timeout(self):
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=2,
+                           timeout=1)
+        def _session(timeout=2):
+            sleep(2)
+        client.open_session = _session
+        self.assertRaises(GTimeout, client.run_command, self.cmd)
+
+    def test_connection_timeout(self):
+        cmd = spawn(SSHClient, 'fakehost.com', port=12345,
+                    retry_delay=1,
+                    num_retries=2, timeout=1, _auth_thread_pool=False)
+        # Should fail within greenlet timeout, otherwise greenlet will
+        # raise timeout which will fail the test
+        self.assertRaises(ConnectionErrorException, cmd.get, timeout=5)
+
+    def test_client_read_timeout(self):
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=1)
+        host_out = client.run_command('sleep 2; echo me', timeout=0.2)
+        self.assertRaises(Timeout, list, host_out.stdout)
+
+    def test_open_session_exc(self):
+        class Error(Exception):
+            pass
+        def _session():
+            raise Error
+        client = SSHClient(self.host, port=self.port,
+                           pkey=self.user_key,
+                           num_retries=1)
+        client._open_session = _session
+        self.assertRaises(SessionError, client.open_session)
+
+    def test_invalid_mkdir(self):
+        self.assertRaises(OSError, self.client._make_local_dir, '/my_new_dir')
 
 # TODO:
-# * read timeouts
-# * session connect retry
-# * agent auth success
-# * password auth failure
-# * open session error
 # * disconnect exc
diff --git a/tests/test_output.py b/tests/test_output.py
index 4873f12a..acd00875 100644
--- a/tests/test_output.py
+++ b/tests/test_output.py
@@ -21,13 +21,19 @@
 
 import unittest
 
-from pssh.output import HostOutput
+from pssh.output import HostOutput, BufferData, HostOutputBuffers
 
 
 class TestHostOutput(unittest.TestCase):
 
     def setUp(self):
-        self.output = HostOutput(None, None, None, None, None, None, True)
+        self.output = HostOutput(
+            None, None, None, None,
+            buffers=HostOutputBuffers(
+                BufferData(None, None),
+                BufferData(None, None),
+            )
+        )
+
 
     def test_print(self):
         self.assertTrue(str(self.output))
@@ -36,11 +42,21 @@ def test_bad_exit_status(self):
         self.assertIsNone(self.output.exit_code)
 
     def test_excepting_client_exit_code(self):
+        class ChannelError(Exception):
+            pass
         class ExcSSHClient(object):
             def get_exit_status(self, channel):
-                raise Exception
+                raise ChannelError
         exc_client = ExcSSHClient()
         host_out = HostOutput(
-            'host', None, None, None, None, exc_client)
+            'host', None, None, client=exc_client)
+        exit_code = host_out.exit_code
+        self.assertIsNone(exit_code)
+
+    def test_none_output_client(self):
+        host_out = HostOutput(
+            'host', None, None, client=None)
         exit_code = host_out.exit_code
         self.assertEqual(exit_code, None)
+        self.assertIsNone(host_out.stdout)
+        self.assertIsNone(host_out.stderr)
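
The ``tests/test_output.py`` hunk above exercises the reworked ``HostOutput`` interface. The following is a minimal, hedged sketch of the new-style construction, inferred solely from the constructor calls and assertions in those tests rather than from the library documentation:

.. code-block:: python

    from pssh.output import HostOutput, BufferData, HostOutputBuffers

    # Stdout/stderr buffers are passed explicitly as a HostOutputBuffers
    # pair of BufferData objects, as in TestHostOutput.setUp above.
    output = HostOutput(
        None, None, None, None,
        buffers=HostOutputBuffers(
            BufferData(None, None),
            BufferData(None, None),
        )
    )

    # The client is now a keyword argument; with no client attached,
    # exit_code and the stdout/stderr properties resolve to None,
    # as asserted in test_none_output_client.
    host_out = HostOutput('host', None, None, client=None)
    assert host_out.exit_code is None
    assert host_out.stdout is None and host_out.stderr is None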