Skip to content

Commit

Permalink
Cleanups, bug fixes for #104.
Browse files Browse the repository at this point in the history
    Updated documentation, changelog.
    Added raising timeout on native client wait_finished.
    Resolves #104.
  • Loading branch information
Pan committed Mar 7, 2018
1 parent f94aaf9 commit ab79b3a
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 17 deletions.
14 changes: 14 additions & 0 deletions Changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
Change Log
============

1.5.0
++++++

Changes
---------

* ``ParallelSSH2Client.join`` with timeout now consumes output to ensure command completion status is accurate.
* Output reading now raises ``pssh.exceptions.Timeout`` exception when timeout is requested and reached with command still running.

Fixes
------

* ``ParallelSSH2Client.join`` would always raise ``Timeout`` when output has not been consumed even if command has finished - #104.

1.4.0
++++++

Expand Down
51 changes: 50 additions & 1 deletion doc/advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,63 @@ Configuration for the proxy host's user name, port, password and private key can
Where ``proxy.key`` is a filename containing private key to use for proxy host authentication.

In the above example, connections to the target hosts are made via ``my_proxy_user@bastion:2222`` -> ``target_host_user@<host>``.
In the above example, connections to the target hosts are made via SSH through ``my_proxy_user@bastion:2222`` -> ``target_host_user@<host>``.

.. note::

Proxy host connections are asynchronous and use the SSH protocol's native TCP tunneling - aka local port forward. No external commands or processes are used for the proxy connection, unlike the `ProxyCommand` directive in OpenSSH and other utilities.

While connections initiated by ``parallel-ssh`` are asynchronous, connections from proxy host -> target hosts may not be, depending on SSH server implementation. If only one proxy host is used to connect to a large number of target hosts and proxy SSH server connections are *not* asynchronous, this may adversely impact performance on the proxy host.

Join and Output Timeouts
**************************

*New in 1.5.0*

The native clients have timeout functionality on reading output and ``client.join``.

.. code-block:: python
from pssh.exceptions import Timeout
output = client.run_command(..)
try:
client.join(output, timeout=5)
except Timeout:
pass
.. code-block:: python
output = client.run_command(.., timeout=5)
for host, host_out in output.items():
try:
for line in host_out.stdout:
pass
for line in host_out.stderr:
pass
except Timeout:
pass
The client will raise a ``Timeout`` exception if remote commands have not finished within five seconds in the above examples.

.. note::

``join`` with a timeout forces output to be consumed as otherwise the pending output will keep the channel open and make it appear as if command has not yet finished.

To capture output when using ``join`` with a timeout, gather output first before calling ``join``, making use of output timeout as well, and/or make use of :ref:`host logger` functionality.


.. warning::

Beware of race conditions when using timeout functionality. For best results, only send one command per call to ``run_command`` when using timeout functionality.

As the timeouts are performed on ``select`` calls on the socket which is responsible for all client <-> server communication, whether or not a timeout will occur depends on what the socket is doing at that time.

Multiple commands like ``run_command('echo blah; sleep 5')`` where ``sleep 5`` is a placeholder for something taking five seconds to complete will result in a race condition as the second command may or may not have started by the time ``join`` is called or output is read which will cause timeout to *not* be raised even if the second command has not started or completed.

It is responsibility of developer to avoid these race conditions such as by only sending one command in such cases.


Per-Host Configuration
***********************

Expand Down
2 changes: 2 additions & 0 deletions doc/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ Commands last executed by ``run_command`` can also be retrieved from the ``cmds`
*New in 1.2.0*

.. _host logger:

Host Logger
------------

Expand Down
34 changes: 25 additions & 9 deletions pssh/pssh2_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,29 +204,45 @@ def join(self, output, consume_output=False, timeout=None):
output on call to ``join`` when host logger has been enabled.
:type consume_output: bool
:param timeout: Timeout in seconds if remote command is not yet
finished.
finished. Note that use of timeout forces ``consume_output=True``
otherwise the channel output pending to be consumed always results
in the channel not being finished.
:type timeout: int
:raises: :py:class:`pssh.exceptions.Timeout` on timeout requested and
reached with commands still running.
:rtype: ``None``"""
for host in output:
for host, host_out in output.items():
if host not in self.host_clients or self.host_clients[host] is None:
continue
self.host_clients[host].wait_finished(output[host].channel,
timeout=timeout)
if timeout and not output[host].channel.eof():
client = self.host_clients[host]
channel = host_out.channel
try:
client.wait_finished(channel, timeout=timeout)
except Timeout:
raise Timeout(
"Timeout of %s sec(s) reached on host %s with command "
"still running", timeout, host)
stdout = host_out.stdout
stderr = host_out.stderr
if timeout:
# Must consume buffers prior to EOF check
self._consume_output(stdout, stderr)
if not channel.eof():
raise Timeout(
"Timeout of %s sec(s) reached on host %s with command "
"still running", timeout, host)
elif consume_output:
for line in output[host].stdout:
pass
for line in output[host].stderr:
pass
self._consume_output(stdout, stderr)
self.get_exit_codes(output)

def _consume_output(self, stdout, stderr):
for line in stdout:
pass
for line in stderr:
pass

def _get_exit_code(self, channel):
"""Get exit code from channel if ready"""
if channel is None:
Expand Down
11 changes: 8 additions & 3 deletions pssh/ssh2_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
LIBSSH2_SFTP_S_IXGRP, LIBSSH2_SFTP_S_IXOTH

from .exceptions import UnknownHostException, AuthenticationException, \
ConnectionErrorException, SessionError, SFTPError, SFTPIOError
ConnectionErrorException, SessionError, SFTPError, SFTPIOError, Timeout
from .constants import DEFAULT_RETRIES, RETRY_DELAY
from .native._ssh2 import wait_select, _read_output # , sftp_get, sftp_put

Expand Down Expand Up @@ -282,14 +282,19 @@ def wait_finished(self, channel, timeout=None):
if channel is None:
return
# If .eof() returns EAGAIN after a select with a timeout, it means
# it reached timeout without EOF and the connection should not be
# it reached timeout without EOF and the channel should not be
# closed as the command is still running.
ret = channel.wait_eof()
while ret == LIBSSH2_ERROR_EAGAIN:
wait_select(self.session, timeout=timeout)
ret = channel.wait_eof()
if ret == LIBSSH2_ERROR_EAGAIN and timeout is not None:
return
raise Timeout
# Close channel to indicate no more commands will be sent over it
self.close_channel(channel)

def close_channel(self, channel):
logger.debug("Closing channel")
self._eagain(channel.close)
self._eagain(channel.wait_closed)

Expand Down
21 changes: 19 additions & 2 deletions tests/test_pssh_ssh2_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def setUpClass(cls):
cls.user_key = PKEY_FILENAME
cls.user_pub_key = PUB_FILE
cls.user = pwd.getpwuid(os.geteuid()).pw_name
# Single client for all tests ensures that the client does not do
# anything that causes server to disconnect the session and
# affect all subsequent uses of the same session.
cls.client = ParallelSSHClient([cls.host],
pkey=PKEY_FILENAME,
port=cls.port,
Expand Down Expand Up @@ -1166,13 +1169,27 @@ def test_host_no_client(self):
def test_join_timeout(self):
client = ParallelSSHClient([self.host], port=self.port,
pkey=self.user_key)
output = client.run_command('sleep 2')
output = client.run_command('echo me; sleep 2')
# Wait for long running command to start to avoid race condition
time.sleep(.1)
self.assertRaises(Timeout, client.join, output, timeout=1)
self.assertFalse(output[self.host].channel.eof())
client.join(output, timeout=2)
# Ensure command has actually finished - avoid race conditions
time.sleep(2)
client.join(output, timeout=3)
self.assertTrue(output[self.host].channel.eof())
self.assertTrue(client.finished(output))

def test_join_timeout_set_no_timeout(self):
client = ParallelSSHClient([self.host], port=self.port,
pkey=self.user_key)
output = client.run_command('echo me; sleep 1')
# Allow enough time for blocking command to start - avoid race condition
time.sleep(.1)
client.join(output, timeout=2)
self.assertTrue(client.finished(output))
self.assertTrue(output[self.host].channel.eof())

def test_read_timeout(self):
client = ParallelSSHClient([self.host], port=self.port,
pkey=self.user_key)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_ssh2_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ def test_long_running_cmd(self):
def test_manual_auth(self):
client = SSHClient(self.host, port=self.port,
pkey=self.user_key,
num_retries=1,
timeout=1)
num_retries=1)
client.session.disconnect()
del client.session
del client.sock
Expand Down

0 comments on commit ab79b3a

Please sign in to comment.