From 56db95fcb1d184d193286faa96ef834ad1671f4c Mon Sep 17 00:00:00 2001 From: Pan Date: Wed, 7 Mar 2018 15:12:37 +0000 Subject: [PATCH] Added timeout exception raising on output reading time out. Added timeout file read test, updated tests --- Changelog.rst | 8 ++++ doc/advanced.rst | 32 ++++++++++++++ pssh/native/_ssh2.c | 79 ++++++++++++++++++++-------------- pssh/native/_ssh2.pyx | 4 +- tests/test_pssh_ssh2_client.py | 41 +++++++++++++++--- 5 files changed, 123 insertions(+), 41 deletions(-) diff --git a/Changelog.rst b/Changelog.rst index 5233093a..a76f97aa 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -1,6 +1,14 @@ Change Log ============ +1.5.1 +++++++ + +Fixes +-------- + +* Output ``pssh.exceptions.Timeout`` exception raising was not enabled. + 1.5.0 ++++++ diff --git a/doc/advanced.rst b/doc/advanced.rst index 18963091..bf1cd3e3 100644 --- a/doc/advanced.rst +++ b/doc/advanced.rst @@ -178,6 +178,38 @@ The native clients have timeout functionality on reading output and ``client.joi The client will raise a ``Timeout`` exception if remote commands have not finished within five seconds in the above examples. +In some cases, such as when the remote command never terminates unless interrupted, it is necessary to use PTY and to close the channel to force the process to be terminated before a ``join`` sans timeout can complete. For example: + +.. code-block:: python + + output = client.run_command('tail -f /var/log/messages', use_pty=True) + client.join(output, timeout=1) + # Closing channel which has PTY has the effect of terminating + # any running processes started on that channel. + for host in client.hosts: + client.host_clients[host].close_channel(output[host].channel) + client.join(output) + +Without a PTY, the ``join`` will complete but the remote process will be left running as per SSH protocol specifications. + +Output reading and Timeouts +______________________________ + +Furthermore, once reading output has timed out, it is necessary to restart the output generators as by Python design they only iterate once. This can be done as follows: + +.. code-block:: python + + output = client.run_command(.., timeout=1) + for host, host_out in output.items(): + try: + stdout = list(host_out.stdout) + except Timeout: + stdout_buf = client.host_clients[host].read_output_buffer( + client.host_clients[host].read_output( + output[host].channel, timeout=1)) + # Reset generator to be able to gather new output + host_out.stdout = stdout_buf + .. note:: ``join`` with a timeout forces output to be consumed as otherwise the pending output will keep the channel open and make it appear as if command has not yet finished. diff --git a/pssh/native/_ssh2.c b/pssh/native/_ssh2.c index 66556ec4..731824db 100644 --- a/pssh/native/_ssh2.c +++ b/pssh/native/_ssh2.c @@ -1005,6 +1005,9 @@ static CYTHON_INLINE int __Pyx_IterFinish(void); /* UnpackItemEndCheck.proto */ static int __Pyx_IternextUnpackEndCheck(PyObject *retval, Py_ssize_t expected); +/* GetModuleGlobalName.proto */ +static CYTHON_INLINE PyObject *__Pyx_GetModuleGlobalName(PyObject *name); + /* PyThreadStateGet.proto */ #if CYTHON_FAST_THREAD_STATE #define __Pyx_PyThreadState_declare PyThreadState *__pyx_tstate; @@ -1044,9 +1047,6 @@ static CYTHON_INLINE void __Pyx_ErrFetchInState(PyThreadState *tstate, PyObject /* RaiseException.proto */ static void __Pyx_Raise(PyObject *type, PyObject *value, PyObject *tb, PyObject *cause); -/* GetModuleGlobalName.proto */ -static CYTHON_INLINE PyObject *__Pyx_GetModuleGlobalName(PyObject *name); - /* GetException.proto */ #if CYTHON_FAST_THREAD_STATE #define __Pyx_GetException(type, value, tb) __Pyx__GetException(__pyx_tstate, type, value, tb) @@ -1296,6 +1296,7 @@ static const char __pyx_k_rstrip[] = "rstrip"; static const char __pyx_k_select[] = "select"; static const char __pyx_k_IOError[] = "IOError"; static const char __pyx_k_OSError[] = "OSError"; +static const char __pyx_k_Timeout[] = "Timeout"; static const char __pyx_k_linesep[] = "linesep"; static const char __pyx_k_session[] = "session"; static const char __pyx_k_timeout[] = "timeout"; @@ -1329,6 +1330,7 @@ static PyObject *__pyx_n_s_MemoryError; static PyObject *__pyx_n_s_OSError; static PyObject *__pyx_n_s_SFTPIOError; static PyObject *__pyx_n_s_SessionError; +static PyObject *__pyx_n_s_Timeout; static PyObject *__pyx_kp_b__4; static PyObject *__pyx_n_s_args; static PyObject *__pyx_n_s_b_local_file; @@ -1725,7 +1727,7 @@ static PyObject *__pyx_gb_4pssh_6native_5_ssh2_2generator(__pyx_CoroutineObject * _wait_select(_sock, _session, timeout) * _size, _data = read_func() # <<<<<<<<<<<<<< * if timeout is not None and _size == LIBSSH2_ERROR_EAGAIN: - * break + * raise Timeout */ __Pyx_INCREF(__pyx_cur_scope->__pyx_v_read_func); __pyx_t_5 = __pyx_cur_scope->__pyx_v_read_func; __pyx_t_4 = NULL; @@ -1809,7 +1811,7 @@ static PyObject *__pyx_gb_4pssh_6native_5_ssh2_2generator(__pyx_CoroutineObject * _wait_select(_sock, _session, timeout) * _size, _data = read_func() * if timeout is not None and _size == LIBSSH2_ERROR_EAGAIN: # <<<<<<<<<<<<<< - * break + * raise Timeout * while _size > 0: */ __pyx_t_10 = (__pyx_cur_scope->__pyx_v_timeout != Py_None); @@ -1827,17 +1829,21 @@ static PyObject *__pyx_gb_4pssh_6native_5_ssh2_2generator(__pyx_CoroutineObject /* "pssh/native/_ssh2.pyx":57 * _size, _data = read_func() * if timeout is not None and _size == LIBSSH2_ERROR_EAGAIN: - * break # <<<<<<<<<<<<<< + * raise Timeout # <<<<<<<<<<<<<< * while _size > 0: * while _pos < _size: */ - goto __pyx_L7_break; + __pyx_t_3 = __Pyx_GetModuleGlobalName(__pyx_n_s_Timeout); if (unlikely(!__pyx_t_3)) __PYX_ERR(0, 57, __pyx_L1_error) + __Pyx_GOTREF(__pyx_t_3); + __Pyx_Raise(__pyx_t_3, 0, 0, 0); + __Pyx_DECREF(__pyx_t_3); __pyx_t_3 = 0; + __PYX_ERR(0, 57, __pyx_L1_error) /* "pssh/native/_ssh2.pyx":56 * _wait_select(_sock, _session, timeout) * _size, _data = read_func() * if timeout is not None and _size == LIBSSH2_ERROR_EAGAIN: # <<<<<<<<<<<<<< - * break + * raise Timeout * while _size > 0: */ } @@ -1853,7 +1859,7 @@ static PyObject *__pyx_gb_4pssh_6native_5_ssh2_2generator(__pyx_CoroutineObject /* "pssh/native/_ssh2.pyx":58 * if timeout is not None and _size == LIBSSH2_ERROR_EAGAIN: - * break + * raise Timeout * while _size > 0: # <<<<<<<<<<<<<< * while _pos < _size: * linesep = _data[:_size].find(LINESEP, _pos) @@ -1863,7 +1869,7 @@ static PyObject *__pyx_gb_4pssh_6native_5_ssh2_2generator(__pyx_CoroutineObject if (!__pyx_t_9) break; /* "pssh/native/_ssh2.pyx":59 - * break + * raise Timeout * while _size > 0: * while _pos < _size: # <<<<<<<<<<<<<< * linesep = _data[:_size].find(LINESEP, _pos) @@ -2248,7 +2254,6 @@ static PyObject *__pyx_gb_4pssh_6native_5_ssh2_2generator(__pyx_CoroutineObject __pyx_cur_scope->__pyx_v__pos = 0; } } - __pyx_L7_break:; /* "pssh/native/_ssh2.pyx":75 * _size, _data = read_func() @@ -4382,6 +4387,7 @@ static __Pyx_StringTabEntry __pyx_string_tab[] = { {&__pyx_n_s_OSError, __pyx_k_OSError, sizeof(__pyx_k_OSError), 0, 0, 1, 1}, {&__pyx_n_s_SFTPIOError, __pyx_k_SFTPIOError, sizeof(__pyx_k_SFTPIOError), 0, 0, 1, 1}, {&__pyx_n_s_SessionError, __pyx_k_SessionError, sizeof(__pyx_k_SessionError), 0, 0, 1, 1}, + {&__pyx_n_s_Timeout, __pyx_k_Timeout, sizeof(__pyx_k_Timeout), 0, 0, 1, 1}, {&__pyx_kp_b__4, __pyx_k__4, sizeof(__pyx_k__4), 0, 0, 0, 0}, {&__pyx_n_s_args, __pyx_k_args, sizeof(__pyx_k_args), 0, 0, 1, 1}, {&__pyx_n_s_b_local_file, __pyx_k_b_local_file, sizeof(__pyx_k_b_local_file), 0, 0, 1, 1}, @@ -4705,15 +4711,18 @@ static int __pyx_pymod_exec__ssh2(PyObject *__pyx_pyinit_module) /* "pssh/native/_ssh2.pyx":36 * from ssh2.utils cimport to_bytes * - * from ..exceptions import SessionError # <<<<<<<<<<<<<< + * from ..exceptions import SessionError, Timeout # <<<<<<<<<<<<<< * * */ - __pyx_t_2 = PyList_New(1); if (unlikely(!__pyx_t_2)) __PYX_ERR(0, 36, __pyx_L1_error) + __pyx_t_2 = PyList_New(2); if (unlikely(!__pyx_t_2)) __PYX_ERR(0, 36, __pyx_L1_error) __Pyx_GOTREF(__pyx_t_2); __Pyx_INCREF(__pyx_n_s_SessionError); __Pyx_GIVEREF(__pyx_n_s_SessionError); PyList_SET_ITEM(__pyx_t_2, 0, __pyx_n_s_SessionError); + __Pyx_INCREF(__pyx_n_s_Timeout); + __Pyx_GIVEREF(__pyx_n_s_Timeout); + PyList_SET_ITEM(__pyx_t_2, 1, __pyx_n_s_Timeout); __pyx_t_3 = __Pyx_Import(__pyx_n_s_exceptions, __pyx_t_2, 2); if (unlikely(!__pyx_t_3)) __PYX_ERR(0, 36, __pyx_L1_error) __Pyx_GOTREF(__pyx_t_3); __Pyx_DECREF(__pyx_t_2); __pyx_t_2 = 0; @@ -4721,6 +4730,10 @@ static int __pyx_pymod_exec__ssh2(PyObject *__pyx_pyinit_module) __Pyx_GOTREF(__pyx_t_2); if (PyDict_SetItem(__pyx_d, __pyx_n_s_SessionError, __pyx_t_2) < 0) __PYX_ERR(0, 36, __pyx_L1_error) __Pyx_DECREF(__pyx_t_2); __pyx_t_2 = 0; + __pyx_t_2 = __Pyx_ImportFrom(__pyx_t_3, __pyx_n_s_Timeout); if (unlikely(!__pyx_t_2)) __PYX_ERR(0, 36, __pyx_L1_error) + __Pyx_GOTREF(__pyx_t_2); + if (PyDict_SetItem(__pyx_d, __pyx_n_s_Timeout, __pyx_t_2) < 0) __PYX_ERR(0, 36, __pyx_L1_error) + __Pyx_DECREF(__pyx_t_2); __pyx_t_2 = 0; __Pyx_DECREF(__pyx_t_3); __pyx_t_3 = 0; /* "pssh/native/_ssh2.pyx":39 @@ -5336,8 +5349,26 @@ static CYTHON_INLINE PyObject* __Pyx_PyObject_CallNoArg(PyObject *func) { return 0; } +/* GetModuleGlobalName */ + static CYTHON_INLINE PyObject *__Pyx_GetModuleGlobalName(PyObject *name) { + PyObject *result; +#if !CYTHON_AVOID_BORROWED_REFS + result = PyDict_GetItem(__pyx_d, name); + if (likely(result)) { + Py_INCREF(result); + } else { +#else + result = PyObject_GetItem(__pyx_d, name); + if (!result) { + PyErr_Clear(); +#endif + result = __Pyx_GetBuiltinName(name); + } + return result; +} + /* PyErrFetchRestore */ - #if CYTHON_FAST_THREAD_STATE + #if CYTHON_FAST_THREAD_STATE static CYTHON_INLINE void __Pyx_ErrRestoreInState(PyThreadState *tstate, PyObject *type, PyObject *value, PyObject *tb) { PyObject *tmp_type, *tmp_value, *tmp_tb; tmp_type = tstate->curexc_type; @@ -5361,7 +5392,7 @@ static CYTHON_INLINE void __Pyx_ErrFetchInState(PyThreadState *tstate, PyObject #endif /* RaiseException */ - #if PY_MAJOR_VERSION < 3 + #if PY_MAJOR_VERSION < 3 static void __Pyx_Raise(PyObject *type, PyObject *value, PyObject *tb, CYTHON_UNUSED PyObject *cause) { __Pyx_PyThreadState_declare @@ -5519,24 +5550,6 @@ static void __Pyx_Raise(PyObject *type, PyObject *value, PyObject *tb, PyObject } #endif -/* GetModuleGlobalName */ - static CYTHON_INLINE PyObject *__Pyx_GetModuleGlobalName(PyObject *name) { - PyObject *result; -#if !CYTHON_AVOID_BORROWED_REFS - result = PyDict_GetItem(__pyx_d, name); - if (likely(result)) { - Py_INCREF(result); - } else { -#else - result = PyObject_GetItem(__pyx_d, name); - if (!result) { - PyErr_Clear(); -#endif - result = __Pyx_GetBuiltinName(name); - } - return result; -} - /* GetException */ #if CYTHON_FAST_THREAD_STATE static int __Pyx__GetException(PyThreadState *tstate, PyObject **type, PyObject **value, PyObject **tb) { diff --git a/pssh/native/_ssh2.pyx b/pssh/native/_ssh2.pyx index 3f420cec..36808134 100644 --- a/pssh/native/_ssh2.pyx +++ b/pssh/native/_ssh2.pyx @@ -33,7 +33,7 @@ from ssh2.sftp_handle cimport SFTPHandle from ssh2.exceptions import SFTPIOError from ssh2.utils cimport to_bytes -from ..exceptions import SessionError +from ..exceptions import SessionError, Timeout cdef bytes LINESEP = b'\n' @@ -54,7 +54,7 @@ def _read_output(Session session, read_func, timeout=None): _wait_select(_sock, _session, timeout) _size, _data = read_func() if timeout is not None and _size == LIBSSH2_ERROR_EAGAIN: - break + raise Timeout while _size > 0: while _pos < _size: linesep = _data[:_size].find(LINESEP, _pos) diff --git a/tests/test_pssh_ssh2_client.py b/tests/test_pssh_ssh2_client.py index 5443f4e2..bb0cfd6e 100644 --- a/tests/test_pssh_ssh2_client.py +++ b/tests/test_pssh_ssh2_client.py @@ -1193,14 +1193,43 @@ def test_join_timeout_set_no_timeout(self): def test_read_timeout(self): client = ParallelSSHClient([self.host], port=self.port, pkey=self.user_key) - output = client.run_command('sleep 2', timeout=1) - stdout = list(output[self.host].stdout) + output = client.run_command('sleep 2; echo me', timeout=1) + for host, host_out in output.items(): + self.assertRaises(Timeout, list, host_out.stdout) self.assertFalse(output[self.host].channel.eof()) - self.assertEqual(len(stdout), 0) - list(output[self.host].stdout) - list(output[self.host].stdout) client.join(output) - self.assertTrue(output[self.host].channel.eof()) + for host, host_out in output.items(): + stdout_buf = client.host_clients[self.host].read_output_buffer( + client.host_clients[self.host].read_output( + output[self.host].channel, timeout=1)) + host_out.stdout = stdout_buf + stdout = list(output[self.host].stdout) + self.assertEqual(len(stdout), 1) + + def test_timeout_file_read(self): + dir_name = os.path.dirname(__file__) + _file = os.sep.join((dir_name, 'file_to_read')) + contents = [b'a line\n' for _ in range(50)] + with open(_file, 'wb') as fh: + fh.writelines(contents) + try: + output = self.client.run_command('tail -f %s' % (_file,), + use_pty=True, + timeout=1) + self.assertRaises(Timeout, self.client.join, output, timeout=1) + for host, host_out in output.items(): + try: + for line in host_out.stdout: + pass + except Timeout: + pass + else: + raise Exception("Timeout should have been raised") + channel = output[self.host].channel + self.client.host_clients[self.host].close_channel(channel) + self.client.join(output) + finally: + os.unlink(_file) ## OpenSSHServer needs to run in its own thread for this test to work ## Race conditions otherwise.