From e33e216e1bf1a358d9d52f65316d8f7a2d1c6564 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Mon, 16 Dec 2024 19:41:20 +0100 Subject: [PATCH] Fix tests --- .github/workflows/python-package.yml | 10 ++++++ README.rst | 1 + smart_open/azure.py | 52 +++++++++++++++++----------- smart_open/hdfs.py | 25 ++++++++++--- smart_open/http.py | 11 ++++-- smart_open/s3.py | 12 +++++-- smart_open/tests/test_azure.py | 10 ++++++ smart_open/webhdfs.py | 8 +++++ 8 files changed, 100 insertions(+), 29 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index be6a7228..abd387cf 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -38,6 +38,7 @@ jobs: - {python-version: '3.10', os: windows-2019} - {python-version: '3.11', os: windows-2019} - {python-version: '3.12', os: windows-2019} + - {python-version: '3.13', os: windows-2019} steps: - uses: actions/checkout@v2 @@ -78,6 +79,9 @@ jobs: # - {python-version: '3.8', os: windows-2019} # - {python-version: '3.9', os: windows-2019} # - {python-version: '3.10', os: windows-2019} + # - {python-version: '3.11', os: windows-2019} + # - {python-version: '3.12', os: windows-2019} + # - {python-version: '3.13', os: windows-2019} steps: - uses: actions/checkout@v2 @@ -114,6 +118,9 @@ jobs: # - {python-version: '3.8', os: windows-2019} # - {python-version: '3.9', os: windows-2019} # - {python-version: '3.10', os: windows-2019} + # - {python-version: '3.11', os: windows-2019} + # - {python-version: '3.12', os: windows-2019} + # - {python-version: '3.13', os: windows-2019} steps: - uses: actions/checkout@v2 @@ -162,6 +169,9 @@ jobs: # - {python-version: '3.8', os: windows-2019} # - {python-version: '3.9', os: windows-2019} # - {python-version: '3.10', os: windows-2019} + # - {python-version: '3.11', os: windows-2019} + # - {python-version: '3.12', os: windows-2019} + # - {python-version: '3.13', os: windows-2019} steps: - uses: actions/checkout@v2 diff --git a/README.rst b/README.rst index c7060131..48a7ab48 100644 --- a/README.rst +++ b/README.rst @@ -227,6 +227,7 @@ The supported values for this parameter are: - ``disable`` - ``.gz`` - ``.bz2`` +- ``.zst`` By default, ``smart_open`` determines the compression algorithm to use based on the file extension. diff --git a/smart_open/azure.py b/smart_open/azure.py index 1c991f05..9f8c95a0 100644 --- a/smart_open/azure.py +++ b/smart_open/azure.py @@ -195,8 +195,9 @@ class Reader(io.BufferedIOBase): Implements the io.BufferedIOBase interface of the standard library. :raises azure.core.exceptions.ResourceNotFoundError: Raised when the blob to read from does not exist. - """ + _blob = None # so `closed` property works in case __init__ fails and __del__ is called + def __init__( self, container, @@ -207,9 +208,10 @@ def __init__( max_concurrency=DEFAULT_MAX_CONCURRENCY, ): self._container_name = container + self._blob_name = blob - self._blob = _get_blob_client(client, container, blob) # type: azure.storage.blob.BlobClient + self._blob = _get_blob_client(client, container, blob) if self._blob is None: raise azure.core.exceptions.ResourceNotFoundError( @@ -236,8 +238,13 @@ def __init__( def close(self): """Flush and close this stream.""" logger.debug("close: called") - self._blob = None - self._raw_reader = None + if not self.closed: + self._blob = None + self._raw_reader = None + + @property + def closed(self): + return self._blob is None def readable(self): """Return True if the stream can be read from.""" @@ -369,20 +376,26 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __str__(self): - return "(%s, %r, %r)" % (self.__class__.__name__, - self._container_name, - self._blob.blob_name) + return "(%s, %r, %r)" % ( + self.__class__.__name__, + self._container_name, + self._blob_name + ) def __repr__(self): return "%s(container=%r, blob=%r)" % ( - self.__class__.__name__, self._container_name, self._blob.blob_name, + self.__class__.__name__, + self._container_name, + self._blob_name, ) class Writer(io.BufferedIOBase): """Writes bytes to Azure Blob Storage. - Implements the io.BufferedIOBase interface of the standard library.""" + Implements the io.BufferedIOBase interface of the standard library. + """ + _blob = None # so `closed` property works in case __init__ fails and __del__ is called def __init__( self, @@ -392,21 +405,19 @@ def __init__( blob_kwargs=None, min_part_size=_DEFAULT_MIN_PART_SIZE, ): - self._is_closed = False self._container_name = container - - self._blob = _get_blob_client(client, container, blob) + self._blob_name = blob self._blob_kwargs = blob_kwargs or {} - # type: azure.storage.blob.BlobClient - self._min_part_size = min_part_size - self._total_size = 0 self._total_parts = 0 self._bytes_uploaded = 0 self._current_part = io.BytesIO() self._block_list = [] + # type: azure.storage.blob.BlobClient + self._blob = _get_blob_client(client, container, blob) + # # This member is part of the io.BufferedIOBase interface. # @@ -424,25 +435,26 @@ def terminate(self): logger.debug('%s: terminating multipart upload', self) if not self.closed: self._block_list = [] - self._is_closed = True + self._blob = None logger.debug('%s: terminated multipart upload', self) # # Override some methods from io.IOBase. # def close(self): + logger.debug("close: called") if not self.closed: logger.debug('%s: completing multipart upload', self) if self._current_part.tell() > 0: self._upload_part() self._blob.commit_block_list(self._block_list, **self._blob_kwargs) self._block_list = [] - self._is_closed = True + self._blob = None logger.debug('%s: completed multipart upload', self) @property def closed(self): - return self._is_closed + return self._blob is None def writable(self): """Return True if the stream supports writing.""" @@ -528,13 +540,13 @@ def __str__(self): return "(%s, %r, %r)" % ( self.__class__.__name__, self._container_name, - self._blob.blob_name + self._blob_name ) def __repr__(self): return "%s(container=%r, blob=%r, min_part_size=%r)" % ( self.__class__.__name__, self._container_name, - self._blob.blob_name, + self._blob_name, self._min_part_size ) diff --git a/smart_open/hdfs.py b/smart_open/hdfs.py index a247d3e3..1fc97e94 100644 --- a/smart_open/hdfs.py +++ b/smart_open/hdfs.py @@ -68,6 +68,7 @@ class CliRawInputBase(io.RawIOBase): Implements the io.RawIOBase interface of the standard library. """ + _sub = None # so `closed` property works in case __init__ fails and __del__ is called def __init__(self, uri): self._uri = uri @@ -84,8 +85,13 @@ def __init__(self, uri): def close(self): """Flush and close this stream.""" logger.debug("close: called") - self._sub.terminate() - self._sub = None + if not self.closed: + self._sub.terminate() + self._sub = None + + @property + def closed(self): + return self._sub is None def readable(self): """Return True if the stream can be read from.""" @@ -125,6 +131,8 @@ class CliRawOutputBase(io.RawIOBase): Implements the io.RawIOBase interface of the standard library. """ + _sub = None # so `closed` property works in case __init__ fails and __del__ is called + def __init__(self, uri): self._uri = uri self._sub = subprocess.Popen(["hdfs", "dfs", '-put', '-f', '-', self._uri], @@ -136,9 +144,16 @@ def __init__(self, uri): self.raw = None def close(self): - self.flush() - self._sub.stdin.close() - self._sub.wait() + logger.debug("close: called") + if not self.closed: + self.flush() + self._sub.stdin.close() + self._sub.wait() + self._sub = None + + @property + def closed(self): + return self._sub is None def flush(self): self._sub.stdin.flush() diff --git a/smart_open/http.py b/smart_open/http.py index 5e5b8140..9dfa1d5f 100644 --- a/smart_open/http.py +++ b/smart_open/http.py @@ -98,6 +98,8 @@ def open(uri, mode, kerberos=False, user=None, password=None, cert=None, class BufferedInputBase(io.BufferedIOBase): + response = None # so `closed` property works in case __init__ fails and __del__ is called + def __init__(self, url, mode='r', buffer_size=DEFAULT_BUFFER_SIZE, kerberos=False, user=None, password=None, cert=None, headers=None, session=None, timeout=None): @@ -149,8 +151,13 @@ def __init__(self, url, mode='r', buffer_size=DEFAULT_BUFFER_SIZE, def close(self): """Flush and close this stream.""" logger.debug("close: called") - self.response = None - self._read_iter = None + if not self.closed: + self.response = None + self._read_iter = None + + @property + def closed(self): + return self.response is None def readable(self): """Return True if the stream can be read from.""" diff --git a/smart_open/s3.py b/smart_open/s3.py index 60ae2a99..22796a4a 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -703,6 +703,7 @@ def __init__( def close(self): """Flush and close this stream.""" + logger.debug("close: called") pass def readable(self): @@ -869,6 +870,7 @@ class MultipartWriter(io.BufferedIOBase): """Writes bytes to S3 using the multi part API. Implements the io.BufferedIOBase interface of the standard library.""" + _upload_id = None # so `closed` property works in case __init__ fails and __del__ is called def __init__( self, @@ -925,6 +927,10 @@ def flush(self): # Override some methods from io.IOBase. # def close(self): + logger.debug("close: called") + if self.closed: + return + if self._buf.tell(): self._upload_next_part() @@ -1027,7 +1033,7 @@ def write(self, b: Buffer) -> int: def terminate(self): """Cancel the underlying multipart upload.""" - if self._upload_id is None: + if self.closed: return logger.debug('%s: terminating multipart upload', self) self._client.abort_multipart_upload( @@ -1112,6 +1118,7 @@ class SinglepartWriter(io.BufferedIOBase): This class buffers all of its input in memory until its `close` method is called. Only then will the data be written to S3 and the buffer is released.""" + _buf = None # so `closed` property works in case __init__ fails and __del__ is called def __init__( self, @@ -1147,7 +1154,8 @@ def flush(self): # Override some methods from io.IOBase. # def close(self): - if self._buf is None: + logger.debug("close: called") + if self.closed: return self._buf.seek(0) diff --git a/smart_open/tests/test_azure.py b/smart_open/tests/test_azure.py index a82dbf17..2eb23a0e 100644 --- a/smart_open/tests/test_azure.py +++ b/smart_open/tests/test_azure.py @@ -554,6 +554,16 @@ def test_read_blob_client(self): assert data == content + def test_nonexisting_container(self): + with self.assertRaises(azure.core.exceptions.ResourceNotFoundError): + with smart_open.azure.open( + 'thiscontainerdoesntexist', + 'mykey', + 'rb', + CLIENT + ) as fin: + fin.read() + class WriterTest(unittest.TestCase): """Test writing into Azure Blob files.""" diff --git a/smart_open/webhdfs.py b/smart_open/webhdfs.py index adf9231b..75804d7c 100644 --- a/smart_open/webhdfs.py +++ b/smart_open/webhdfs.py @@ -101,6 +101,8 @@ def convert_to_http_uri(parsed_uri): class BufferedInputBase(io.BufferedIOBase): + _buf = None # so `closed` property works in case __init__ fails and __del__ is called + def __init__(self, uri): self._uri = uri @@ -116,6 +118,12 @@ def __init__(self, uri): def close(self): """Flush and close this stream.""" logger.debug("close: called") + if not self.closed: + self._buf = None + + @property + def closed(self): + return self._buf is None def readable(self): """Return True if the stream can be read from."""