From 6972ce5573463f3cdd6213a6de0007336678eaf5 Mon Sep 17 00:00:00 2001 From: N-o-Z Date: Tue, 5 Dec 2023 21:09:10 +0000 Subject: [PATCH] Python Wrapper: Objects generator (#7110) * Python Wrapper: Objects generator * fix test * fix test 2 * CF Fixes --- .../docs/lakefs.object_manager.rst | 7 --- clients/python-wrapper/docs/lakefs.rst | 1 - clients/python-wrapper/lakefs/__init__.py | 11 ++++- .../python-wrapper/lakefs/import_manager.py | 8 ++-- clients/python-wrapper/lakefs/models.py | 13 ++++- clients/python-wrapper/lakefs/object.py | 12 ++--- .../python-wrapper/lakefs/object_manager.py | 10 ---- clients/python-wrapper/lakefs/reference.py | 36 +++++++++++--- clients/python-wrapper/tests/.pylintrc | 1 - .../tests/integration/test_branch.py | 5 +- .../tests/integration/test_reference.py | 21 ++++++++ .../tests/integration/test_sanity.py | 1 - .../tests/utests/test_namedtuple.py | 4 +- .../tests/utests/test_reference.py | 48 ++++++++++++++++++- 14 files changed, 130 insertions(+), 48 deletions(-) delete mode 100644 clients/python-wrapper/docs/lakefs.object_manager.rst delete mode 100644 clients/python-wrapper/lakefs/object_manager.py diff --git a/clients/python-wrapper/docs/lakefs.object_manager.rst b/clients/python-wrapper/docs/lakefs.object_manager.rst deleted file mode 100644 index 97ceb3df9e4..00000000000 --- a/clients/python-wrapper/docs/lakefs.object_manager.rst +++ /dev/null @@ -1,7 +0,0 @@ -lakefs.object\_manager module -============================= - -.. automodule:: lakefs.object_manager - :members: - :undoc-members: - :show-inheritance: diff --git a/clients/python-wrapper/docs/lakefs.rst b/clients/python-wrapper/docs/lakefs.rst index df999d43025..b75ad4c681b 100644 --- a/clients/python-wrapper/docs/lakefs.rst +++ b/clients/python-wrapper/docs/lakefs.rst @@ -15,7 +15,6 @@ Submodules lakefs.models lakefs.namedtuple lakefs.object - lakefs.object_manager lakefs.reference lakefs.repository lakefs.tag diff --git a/clients/python-wrapper/lakefs/__init__.py b/clients/python-wrapper/lakefs/__init__.py index 7515110a78e..4810f6bc594 100644 --- a/clients/python-wrapper/lakefs/__init__.py +++ b/clients/python-wrapper/lakefs/__init__.py @@ -4,8 +4,15 @@ from lakefs.repository import Repository from lakefs.reference import Reference -from lakefs.models import Commit, Change, RepositoryProperties +from lakefs.models import ( + Commit, + Change, + ImportStatus, + ServerStorageConfiguration, + ObjectInfo, + CommonPrefix, + RepositoryProperties +) from lakefs.tag import Tag from lakefs.branch import Branch from lakefs.object import StoredObject, WriteableObject, ObjectReader -from lakefs.object_manager import ObjectManager diff --git a/clients/python-wrapper/lakefs/import_manager.py b/clients/python-wrapper/lakefs/import_manager.py index 0ff4490864b..e543cb7fdee 100644 --- a/clients/python-wrapper/lakefs/import_manager.py +++ b/clients/python-wrapper/lakefs/import_manager.py @@ -10,13 +10,10 @@ import lakefs_sdk -from lakefs.models import ImportStatus +from lakefs.models import ImportStatus, _OBJECT, _COMMON_PREFIX from lakefs.client import Client, DEFAULT_CLIENT from lakefs.exceptions import ImportManagerException, api_exception_handler -_PREFIX = "common_prefix" -_OBJECT = "object" - class ImportManager: """ @@ -57,7 +54,8 @@ def prefix(self, object_store_uri: str, destination: str) -> ImportManager: :param destination: The destination prefix relative to the branch :return: The ImportManager instance (self) after update, to allow operations chaining """ - self.sources.append(lakefs_sdk.ImportLocation(type=_PREFIX, path=object_store_uri, destination=destination)) + self.sources.append( + lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, path=object_store_uri, destination=destination)) return self def object(self, object_store_uri: str, destination: str) -> ImportManager: diff --git a/clients/python-wrapper/lakefs/models.py b/clients/python-wrapper/lakefs/models.py index 11e93836415..40a1f06345c 100644 --- a/clients/python-wrapper/lakefs/models.py +++ b/clients/python-wrapper/lakefs/models.py @@ -9,6 +9,9 @@ from lakefs.namedtuple import LenientNamedTuple +_COMMON_PREFIX = "common_prefix" +_OBJECT = "object" + class Commit(LenientNamedTuple): """ @@ -74,12 +77,11 @@ class ServerStorageConfiguration(LenientNamedTuple): default_namespace_prefix: Optional[str] = None -class ObjectStats(LenientNamedTuple): +class ObjectInfo(LenientNamedTuple): """ Represent a lakeFS object's stats """ path: str - path_type: str physical_address: str checksum: str mtime: int @@ -89,6 +91,13 @@ class ObjectStats(LenientNamedTuple): content_type: Optional[str] = None +class CommonPrefix(LenientNamedTuple): + """ + Represents a common prefix in lakeFS + """ + path: str + + class RepositoryProperties(LenientNamedTuple): """ Represent a lakeFS repository's properties diff --git a/clients/python-wrapper/lakefs/object.py b/clients/python-wrapper/lakefs/object.py index a955ffd97aa..910e6990a1b 100644 --- a/clients/python-wrapper/lakefs/object.py +++ b/clients/python-wrapper/lakefs/object.py @@ -30,7 +30,7 @@ PermissionException, ObjectExistsException, ) -from lakefs.models import ObjectStats +from lakefs.models import ObjectInfo _LAKEFS_METADATA_PREFIX = "x-lakefs-meta-" # _BUFFER_SIZE - Writer buffer size. While buffer size not exceed, data will be maintained in memory and file will @@ -302,7 +302,7 @@ class ObjectWriter(LakeFSIOBase): implicitly when using writer as a context. """ _fd: tempfile.SpooledTemporaryFile - _obj_stats: ObjectStats = None + _obj_stats: ObjectInfo = None def __init__(self, obj: StoredObject, @@ -381,7 +381,7 @@ def close(self) -> None: Write the data to the lakeFS server and close open descriptors """ stats = self._upload_presign() if self.pre_sign else self._upload_raw() - self._obj_stats = ObjectStats(**stats.dict()) + self._obj_stats = ObjectInfo(**stats.dict()) self._fd.close() super().close() @@ -502,7 +502,7 @@ class StoredObject: _repo_id: str _ref_id: str _path: str - _stats: Optional[ObjectStats] = None + _stats: Optional[ObjectInfo] = None def __init__(self, repository: str, reference: str, path: str, client: Optional[Client] = DEFAULT_CLIENT): self._client = client @@ -548,14 +548,14 @@ def reader(self, mode: ReadModes = 'rb', pre_sign: Optional[bool] = None) -> Obj """ return ObjectReader(self, mode=mode, pre_sign=pre_sign, client=self._client) - def stat(self) -> ObjectStats: + def stat(self) -> ObjectInfo: """ Return the Stat object representing this object """ if self._stats is None: with api_exception_handler(_io_exception_handler): stat = self._client.sdk_client.objects_api.stat_object(self._repo_id, self._ref_id, self._path) - self._stats = ObjectStats(**stat.dict()) + self._stats = ObjectInfo(**stat.dict()) return self._stats def exists(self) -> bool: diff --git a/clients/python-wrapper/lakefs/object_manager.py b/clients/python-wrapper/lakefs/object_manager.py deleted file mode 100644 index d1eee06c1a7..00000000000 --- a/clients/python-wrapper/lakefs/object_manager.py +++ /dev/null @@ -1,10 +0,0 @@ -""" -Module implementing reference's object management logic -""" - - -class ObjectManager: - """ - Manage objects listing for a given repository - """ - # TODO: Implement diff --git a/clients/python-wrapper/lakefs/reference.py b/clients/python-wrapper/lakefs/reference.py index 36c77fd9ed7..79d8aa504bc 100644 --- a/clients/python-wrapper/lakefs/reference.py +++ b/clients/python-wrapper/lakefs/reference.py @@ -8,11 +8,10 @@ import lakefs_sdk -from lakefs.models import Commit, Change +from lakefs.models import Commit, Change, CommonPrefix, ObjectInfo, _OBJECT from lakefs.client import Client, DEFAULT_CLIENT from lakefs.exceptions import api_exception_handler from lakefs.object import StoredObject -from lakefs.object_manager import ObjectManager class Reference: @@ -43,12 +42,37 @@ def id(self) -> str: """ return self._id - @property - def objects(self) -> ObjectManager: + def objects(self, + max_amount: Optional[int] = None, + after: Optional[str] = None, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + **kwargs) -> Generator[StoredObject | CommonPrefix]: """ - Returns a ObjectManager object for this reference + Returns an object generator for this reference, the generator can yield either a StoredObject or a CommonPrefix + object depending on the listing parameters provided. + + :param max_amount: Stop showing changes after this amount + :param after: Return items after this value + :param prefix: Return items prefixed with this value + :param delimiter: Group common prefixes by this delimiter + :param kwargs: Additional Keyword Arguments to send to the server + :raises: + NotFoundException if this reference or other_ref does not exist + NotAuthorizedException if user is not authorized to perform this operation + ServerException for any other errors """ - # TODO: Implement + + for res in generate_listing(self._client.sdk_client.objects_api.list_objects, + repository=self._repo_id, + ref=self._id, + max_amount=max_amount, + after=after, + prefix=prefix, + delimiter=delimiter, + **kwargs): + type_class = ObjectInfo if res.path_type == _OBJECT else CommonPrefix + yield type_class(**res.dict()) def log(self, max_amount: Optional[int] = None, **kwargs) -> Generator[Commit]: """ diff --git a/clients/python-wrapper/tests/.pylintrc b/clients/python-wrapper/tests/.pylintrc index 79a51c08326..278290b517f 100644 --- a/clients/python-wrapper/tests/.pylintrc +++ b/clients/python-wrapper/tests/.pylintrc @@ -12,7 +12,6 @@ disable= missing-function-docstring, missing-module-docstring, too-few-public-methods, - pointless-statement, no-member, fixme, diff --git a/clients/python-wrapper/tests/integration/test_branch.py b/clients/python-wrapper/tests/integration/test_branch.py index 8dcbfb65a4a..2fa763e8947 100644 --- a/clients/python-wrapper/tests/integration/test_branch.py +++ b/clients/python-wrapper/tests/integration/test_branch.py @@ -50,9 +50,8 @@ def test_reset_changes(setup_repo): def test_delete_object_changes(setup_repo): _, repo = setup_repo test_branch = repo.branch("main") - path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c", ] - for s in path_and_data: - test_branch.object(s).upload(s) + path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c"] + upload_data(test_branch, path_and_data) test_branch.commit("add some files", {"test_key": "test_value"}) test_branch.delete_objects("foo/a") diff --git a/clients/python-wrapper/tests/integration/test_reference.py b/clients/python-wrapper/tests/integration/test_reference.py index e0a749b0058..82d2244382e 100644 --- a/clients/python-wrapper/tests/integration/test_reference.py +++ b/clients/python-wrapper/tests/integration/test_reference.py @@ -57,3 +57,24 @@ def test_reference_merge_into(setup_branch_with_commits): branch.merge_into(other_branch.id, message="Merge2") assert other_branch.commit_message() == "Merge2" assert list(other_branch.log(max_amount=3))[2].id == commits[0].id + + +def test_reference_objects(setup_repo): + _, repo = setup_repo + test_branch = repo.branch("main") + path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c"] + for s in path_and_data: + test_branch.object(s).upload(s) + + objects = list(test_branch.objects()) + assert len(objects) == len(path_and_data) + for obj in objects: + assert obj.path in path_and_data + + expected = ["a", "b", "bar/", "c", "foo/"] + i = 0 + for obj in test_branch.objects(delimiter='/'): + i += 1 + assert obj.path in expected + + assert i == len(expected) diff --git a/clients/python-wrapper/tests/integration/test_sanity.py b/clients/python-wrapper/tests/integration/test_sanity.py index bfe9efdfa58..a45ffbbe6fa 100644 --- a/clients/python-wrapper/tests/integration/test_sanity.py +++ b/clients/python-wrapper/tests/integration/test_sanity.py @@ -110,7 +110,6 @@ def test_object_sanity(setup_repo): stats = obj.stat() assert stats.path == path == obj.path - assert stats.path_type == "object" assert stats.mtime <= time.time() assert stats.size_bytes == len(data) assert stats.metadata == metadata diff --git a/clients/python-wrapper/tests/utests/test_namedtuple.py b/clients/python-wrapper/tests/utests/test_namedtuple.py index 265187208f2..ecf42fe8804 100644 --- a/clients/python-wrapper/tests/utests/test_namedtuple.py +++ b/clients/python-wrapper/tests/utests/test_namedtuple.py @@ -36,10 +36,10 @@ def test_namedtuple(): assert not nt2.field3 with expect_exception_context(AttributeError): - nt2.field4 + nt2.field4 # pylint: disable=pointless-statement with expect_exception_context(AttributeError): - nt2.field5 + nt2.field5 # pylint: disable=pointless-statement # Verify extra kwargs are in 'unknown' dict assert nt2.unknown['field4'] == "something" diff --git a/clients/python-wrapper/tests/utests/test_reference.py b/clients/python-wrapper/tests/utests/test_reference.py index 8ba925d9b8e..1e36e261964 100644 --- a/clients/python-wrapper/tests/utests/test_reference.py +++ b/clients/python-wrapper/tests/utests/test_reference.py @@ -1,8 +1,8 @@ import lakefs_sdk -from tests.utests.common import get_test_client - +from lakefs import ObjectInfo, CommonPrefix from lakefs.repository import Repository +from tests.utests.common import get_test_client, expect_exception_context def get_test_ref(): @@ -118,3 +118,47 @@ def monkey_diff_refs(*_, **__): idx = 0 max_amount = pages * items_per_page * 2 assert len(list(ref.diff(other_ref="other_ref", max_amount=max_amount))) == pages * items_per_page + + +def test_reference_objects(monkeypatch): + ref = get_test_ref() + with monkeypatch.context(): + def monkey_list_objects(*_, **__): + results = [] + for i in range(10): + if i % 2: + results.append(lakefs_sdk.ObjectStats( + path=f"path-{i}", + path_type="object", + physical_address=f"address-{i}", + checksum=f"{i}", + size_bytes=i, + mtime=i, + )) + else: + results.append(lakefs_sdk.ObjectStats( + path=f"path-{i}", + path_type="common_prefix", + physical_address="?", + checksum="", + mtime=i, + )) + return lakefs_sdk.ObjectStatsList(pagination=lakefs_sdk.Pagination( + has_more=False, + next_offset="", + max_per_page=1, + results=1), + results=results) + + monkeypatch.setattr(ref._client.sdk_client.objects_api, "list_objects", monkey_list_objects) + + for i, item in enumerate(ref.objects()): + if i % 2: + assert isinstance(item, ObjectInfo) + assert item.size_bytes == i + else: + assert isinstance(item, CommonPrefix) + with expect_exception_context(AttributeError): + item.checksum # pylint: disable=pointless-statement + + assert item.path == f"path-{i}"