Skip to content

Commit

Permalink
fix: connection dropped for refresh-data [TCTC-6839] (#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sanix-Darker authored Oct 13, 2023
1 parent 4a18184 commit 9dcfb2e
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

### Fixed

- FTP: retry connection on `SSHException` while opening a remote url.

## [0.9.5] - 2023-04-19

### Added
Expand Down
30 changes: 24 additions & 6 deletions peakina/io/ftp/ftp_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ftplib
import logging
import os
import re
import socket
import ssl
Expand All @@ -16,6 +17,8 @@
import paramiko

FTP_SCHEMES = ["ftp", "ftps", "sftp"]
_DEFAULT_MAX_TIMEOUT_SECONDS = 30
_DEFAULT_MAX_RETRY = 7

FTPClient = ftplib.FTP | paramiko.SFTPClient

Expand Down Expand Up @@ -73,7 +76,9 @@ def makepasv(self) -> tuple[str, int]:
def ftps_client(params: ParseResult) -> Generator[tuple[FTPS, str], None, None]:
ftps = FTPS()
try:
ftps.connect(host=params.hostname or "", port=params.port, timeout=3)
ftps.connect(
host=params.hostname or "", port=params.port, timeout=_DEFAULT_MAX_TIMEOUT_SECONDS
)
try:
ftps.prot_p()
ftps.login(user=params.username or "", passwd=params.password or "")
Expand All @@ -97,7 +102,7 @@ def ftp_client(params: ParseResult) -> Generator[tuple[ftplib.FTP, str], None, N
port = params.port or 21
ftp = ftplib.FTP()
try:
ftp.connect(host=params.hostname or "", port=port, timeout=3)
ftp.connect(host=params.hostname or "", port=port, timeout=_DEFAULT_MAX_TIMEOUT_SECONDS)
ftp.login(user=params.username or "", passwd=params.password or "")
yield ftp, params.path

Expand All @@ -117,13 +122,16 @@ def sftp_client(params: ParseResult) -> Generator[tuple[paramiko.SFTPClient, str
username=params.username,
password=params.password,
port=port,
timeout=3,
timeout=_DEFAULT_MAX_TIMEOUT_SECONDS,
)
sftp = ssh_client.open_sftp()
yield sftp, params.path

finally:
ssh_client.close()
# In cae of Exception, we don't want to raise it
with suppress(AttributeError):
logging.getLogger(__name__).warning("Unable to close the Connection the SSHConnection.")
ssh_client.close()


def _urlparse(url: str) -> ParseResult:
Expand Down Expand Up @@ -177,11 +185,21 @@ def _open(url: str) -> IO[bytes]:
return ret


def ftp_open(url: str, retry: int = 4) -> IO[bytes]: # type: ignore
def ftp_open(url: str, retry: int = _DEFAULT_MAX_RETRY) -> IO[bytes]: # type: ignore
for i in range(1, retry + 1):
try:
return _open(url)
except (AttributeError, OSError, ftplib.error_temp) as e:
except (AttributeError, OSError, ftplib.error_temp, paramiko.SSHException) as e:
log = logging.getLogger(__name__)

# FileNotFoundError inherits from OSError
# We need to log that we're not seeing the specified file
if isinstance(e, FileNotFoundError): # pragma: no cover
log.warning(
f"File '{os.path.basename(url)}' not available inside : "
f"'{os.path.dirname(urlparse(url).path)}' !"
)

sleep_time = 2 * i**2
logging.getLogger(__name__).warning(f"Retry #{i}: Sleeping {sleep_time}s because {e}")
sleep(sleep_time)
Expand Down
1 change: 0 additions & 1 deletion peakina/readers/excel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def read_excel(
preview_offset: int = 0,
**kwargs: Any,
) -> pd.DataFrame:

df_or_dict: dict[str, pd.DataFrame] | pd.DataFrame = pd.read_excel(*args, **kwargs)

if isinstance(df_or_dict, dict): # multiple sheets
Expand Down
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def f(
skip_exception=None,
timeout=None,
):

if docker_pull:
print(f"Pulling {image} image")
docker.pull(image)
Expand Down
40 changes: 36 additions & 4 deletions tests/io/ftp/test_ftp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@
import os
import socket
import ssl
from urllib.parse import ParseResult

from paramiko.ssh_exception import SSHException
from pytest import fixture, raises
from pytest_mock import MockFixture

from peakina.io.ftp.ftp_utils import dir_mtimes, ftp_listdir, ftp_mtime, ftp_open
from peakina.io.ftp.ftp_utils import (
_DEFAULT_MAX_TIMEOUT_SECONDS,
dir_mtimes,
ftp_listdir,
ftp_mtime,
ftp_open,
sftp_client,
)


@fixture
Expand Down Expand Up @@ -59,6 +69,7 @@ def test_retry_open(mocker):
ftplib.error_temp("421 Could not create socket"),
AttributeError("'NoneType' object has no attribute 'sendall'"),
OSError("Random OSError"),
SSHException("Random connection dropped error"),
"ok",
]
mock_sleep = mocker.patch("peakina.io.ftp.ftp_utils.sleep")
Expand Down Expand Up @@ -102,7 +113,9 @@ def test_ftp_client(mocker):
url = "ftp://[email protected]:123/picha/chu.csv"
ftp_open(url)

mock_ftp_client.connect.assert_called_once_with(host="ondine.com", port=123, timeout=3)
mock_ftp_client.connect.assert_called_once_with(
host="ondine.com", port=123, timeout=_DEFAULT_MAX_TIMEOUT_SECONDS
)
mock_ftp_client.login.assert_called_once_with(passwd="", user="sacha")
mock_ftp_client.quit.assert_called_once()

Expand All @@ -122,7 +135,9 @@ def test_ftps_client(mocker):
url = "ftps://[email protected]:123/picha/chu.csv"
ftp_open(url)

mock_ftps_client.connect.assert_called_once_with(host="ondine.com", port=123, timeout=3)
mock_ftps_client.connect.assert_called_once_with(
host="ondine.com", port=123, timeout=_DEFAULT_MAX_TIMEOUT_SECONDS
)
mock_ftps_client.login.assert_called_once_with(passwd="", user="sacha")
mock_ftps_client.quit.assert_called_once()

Expand Down Expand Up @@ -167,7 +182,11 @@ def test_sftp_client(mocker):
ftp_open(url)

mock_ssh_client.connect.assert_called_once_with(
timeout=3, hostname="atat.com", port=666, username="id#de@me*de", password="randompass"
timeout=_DEFAULT_MAX_TIMEOUT_SECONDS,
hostname="atat.com",
port=666,
username="id#de@me*de",
password="randompass",
)
mock_ssh_client.open_sftp.assert_called_once()
mock_ssh_client.close.assert_called_once()
Expand All @@ -181,3 +200,16 @@ def test_sftp_client(mocker):
url = "sftp://id#de@me*de:[email protected]:666"
ftp_listdir(url)
cl_ftp.listdir.assert_called_once_with(".")


def test_sftp_client_silent_close(mocker: MockFixture) -> None:
invalid_params = ParseResult(scheme="", netloc="", path="", params="", query="", fragment="")
ssh_client = mocker.patch("paramiko.SSHClient")
ssh_client.return_value.close.side_effect = AttributeError("NoneType doesnt have .close()")

with sftp_client(invalid_params) as (sftp, _):
# This block should raise an exception due to invalid parameters
# The exception is expected to be suppressed by the context manager in the finally block
# So, the test will pass if no exception propagates beyond this point

assert sftp.get_channel()

0 comments on commit 9dcfb2e

Please sign in to comment.