Skip to content

Commit

Permalink
Python Wrapper: Objects generator (#7110)
Browse files Browse the repository at this point in the history
* Python Wrapper: Objects generator

* fix test

* fix test 2

* CF Fixes
  • Loading branch information
N-o-Z authored Dec 5, 2023
1 parent f1d923a commit 6972ce5
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 48 deletions.
7 changes: 0 additions & 7 deletions clients/python-wrapper/docs/lakefs.object_manager.rst

This file was deleted.

1 change: 0 additions & 1 deletion clients/python-wrapper/docs/lakefs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Submodules
lakefs.models
lakefs.namedtuple
lakefs.object
lakefs.object_manager
lakefs.reference
lakefs.repository
lakefs.tag
Expand Down
11 changes: 9 additions & 2 deletions clients/python-wrapper/lakefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@

from lakefs.repository import Repository
from lakefs.reference import Reference
from lakefs.models import Commit, Change, RepositoryProperties
from lakefs.models import (
Commit,
Change,
ImportStatus,
ServerStorageConfiguration,
ObjectInfo,
CommonPrefix,
RepositoryProperties
)
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.object import StoredObject, WriteableObject, ObjectReader
from lakefs.object_manager import ObjectManager
8 changes: 3 additions & 5 deletions clients/python-wrapper/lakefs/import_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@

import lakefs_sdk

from lakefs.models import ImportStatus
from lakefs.models import ImportStatus, _OBJECT, _COMMON_PREFIX
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.exceptions import ImportManagerException, api_exception_handler

_PREFIX = "common_prefix"
_OBJECT = "object"


class ImportManager:
"""
Expand Down Expand Up @@ -57,7 +54,8 @@ def prefix(self, object_store_uri: str, destination: str) -> ImportManager:
:param destination: The destination prefix relative to the branch
:return: The ImportManager instance (self) after update, to allow operations chaining
"""
self.sources.append(lakefs_sdk.ImportLocation(type=_PREFIX, path=object_store_uri, destination=destination))
self.sources.append(
lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, path=object_store_uri, destination=destination))
return self

def object(self, object_store_uri: str, destination: str) -> ImportManager:
Expand Down
13 changes: 11 additions & 2 deletions clients/python-wrapper/lakefs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

from lakefs.namedtuple import LenientNamedTuple

_COMMON_PREFIX = "common_prefix"
_OBJECT = "object"


class Commit(LenientNamedTuple):
"""
Expand Down Expand Up @@ -74,12 +77,11 @@ class ServerStorageConfiguration(LenientNamedTuple):
default_namespace_prefix: Optional[str] = None


class ObjectStats(LenientNamedTuple):
class ObjectInfo(LenientNamedTuple):
"""
Represent a lakeFS object's stats
"""
path: str
path_type: str
physical_address: str
checksum: str
mtime: int
Expand All @@ -89,6 +91,13 @@ class ObjectStats(LenientNamedTuple):
content_type: Optional[str] = None


class CommonPrefix(LenientNamedTuple):
    """
    A common prefix entry of a lakeFS listing.

    Unlike ObjectInfo, a common prefix only carries its path (for example "foo/" when objects are
    listed with "/" as the delimiter).
    """
    # Path of the prefix itself
    path: str


class RepositoryProperties(LenientNamedTuple):
"""
Represent a lakeFS repository's properties
Expand Down
12 changes: 6 additions & 6 deletions clients/python-wrapper/lakefs/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
PermissionException,
ObjectExistsException,
)
from lakefs.models import ObjectStats
from lakefs.models import ObjectInfo

_LAKEFS_METADATA_PREFIX = "x-lakefs-meta-"
# _BUFFER_SIZE - Writer buffer size. While buffer size not exceed, data will be maintained in memory and file will
Expand Down Expand Up @@ -302,7 +302,7 @@ class ObjectWriter(LakeFSIOBase):
implicitly when using writer as a context.
"""
_fd: tempfile.SpooledTemporaryFile
_obj_stats: ObjectStats = None
_obj_stats: ObjectInfo = None

def __init__(self,
obj: StoredObject,
Expand Down Expand Up @@ -381,7 +381,7 @@ def close(self) -> None:
Write the data to the lakeFS server and close open descriptors
"""
stats = self._upload_presign() if self.pre_sign else self._upload_raw()
self._obj_stats = ObjectStats(**stats.dict())
self._obj_stats = ObjectInfo(**stats.dict())

self._fd.close()
super().close()
Expand Down Expand Up @@ -502,7 +502,7 @@ class StoredObject:
_repo_id: str
_ref_id: str
_path: str
_stats: Optional[ObjectStats] = None
_stats: Optional[ObjectInfo] = None

def __init__(self, repository: str, reference: str, path: str, client: Optional[Client] = DEFAULT_CLIENT):
self._client = client
Expand Down Expand Up @@ -548,14 +548,14 @@ def reader(self, mode: ReadModes = 'rb', pre_sign: Optional[bool] = None) -> Obj
"""
return ObjectReader(self, mode=mode, pre_sign=pre_sign, client=self._client)

def stat(self) -> ObjectInfo:
    """
    Return the Stat object representing this object

    The stat is fetched from the server on first use and cached on the instance; later calls return
    the cached value.
    """
    # Fast path: a previous call already populated the cache
    if self._stats is not None:
        return self._stats

    with api_exception_handler(_io_exception_handler):
        objects_api = self._client.sdk_client.objects_api
        raw_stat = objects_api.stat_object(self._repo_id, self._ref_id, self._path)
        self._stats = ObjectInfo(**raw_stat.dict())
    return self._stats

def exists(self) -> bool:
Expand Down
10 changes: 0 additions & 10 deletions clients/python-wrapper/lakefs/object_manager.py

This file was deleted.

36 changes: 30 additions & 6 deletions clients/python-wrapper/lakefs/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@

import lakefs_sdk

from lakefs.models import Commit, Change
from lakefs.models import Commit, Change, CommonPrefix, ObjectInfo, _OBJECT
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.exceptions import api_exception_handler
from lakefs.object import StoredObject
from lakefs.object_manager import ObjectManager


class Reference:
Expand Down Expand Up @@ -43,12 +42,37 @@ def id(self) -> str:
"""
return self._id

def objects(self,
            max_amount: Optional[int] = None,
            after: Optional[str] = None,
            prefix: Optional[str] = None,
            delimiter: Optional[str] = None,
            **kwargs) -> Generator[ObjectInfo | CommonPrefix]:
    """
    Returns an object generator for this reference, the generator can yield either an ObjectInfo or a CommonPrefix
    object depending on the listing parameters provided.

    :param max_amount: Stop listing objects after this amount
    :param after: Return items after this value
    :param prefix: Return items prefixed with this value
    :param delimiter: Group common prefixes by this delimiter
    :param kwargs: Additional Keyword Arguments to send to the server
    :return: A generator yielding an ObjectInfo for every listed entry whose path type is "object" and a
        CommonPrefix for every other entry
    :raises:
        NotFoundException if this reference does not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ServerException for any other errors
    """
    for res in generate_listing(self._client.sdk_client.objects_api.list_objects,
                                repository=self._repo_id,
                                ref=self._id,
                                max_amount=max_amount,
                                after=after,
                                prefix=prefix,
                                delimiter=delimiter,
                                **kwargs):
        # Each listed entry reports its own kind; pick the matching model for it
        type_class = ObjectInfo if res.path_type == _OBJECT else CommonPrefix
        yield type_class(**res.dict())

def log(self, max_amount: Optional[int] = None, **kwargs) -> Generator[Commit]:
"""
Expand Down
1 change: 0 additions & 1 deletion clients/python-wrapper/tests/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ disable=
missing-function-docstring,
missing-module-docstring,
too-few-public-methods,
pointless-statement,
no-member,
fixme,

Expand Down
5 changes: 2 additions & 3 deletions clients/python-wrapper/tests/integration/test_branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ def test_reset_changes(setup_repo):
def test_delete_object_changes(setup_repo):
_, repo = setup_repo
test_branch = repo.branch("main")
path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c", ]
for s in path_and_data:
test_branch.object(s).upload(s)
path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c"]
upload_data(test_branch, path_and_data)
test_branch.commit("add some files", {"test_key": "test_value"})

test_branch.delete_objects("foo/a")
Expand Down
21 changes: 21 additions & 0 deletions clients/python-wrapper/tests/integration/test_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,24 @@ def test_reference_merge_into(setup_branch_with_commits):
branch.merge_into(other_branch.id, message="Merge2")
assert other_branch.commit_message() == "Merge2"
assert list(other_branch.log(max_amount=3))[2].id == commits[0].id


def test_reference_objects(setup_repo):
    """
    Upload a fixed set of paths to "main" and verify both the flat listing and the listing grouped
    by the "/" delimiter.
    """
    _, repo = setup_repo
    test_branch = repo.branch("main")
    path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c"]
    # Each object's content is simply its own path
    for path in path_and_data:
        test_branch.object(path).upload(path)

    # Flat listing: one entry per uploaded path
    listed = list(test_branch.objects())
    assert len(listed) == len(path_and_data)
    assert all(entry.path in path_and_data for entry in listed)

    # Delimited listing: nested paths collapse into their top-level prefixes
    expected = ["a", "b", "bar/", "c", "foo/"]
    seen = 0
    for seen, entry in enumerate(test_branch.objects(delimiter='/'), start=1):
        assert entry.path in expected

    assert seen == len(expected)
1 change: 0 additions & 1 deletion clients/python-wrapper/tests/integration/test_sanity.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def test_object_sanity(setup_repo):

stats = obj.stat()
assert stats.path == path == obj.path
assert stats.path_type == "object"
assert stats.mtime <= time.time()
assert stats.size_bytes == len(data)
assert stats.metadata == metadata
Expand Down
4 changes: 2 additions & 2 deletions clients/python-wrapper/tests/utests/test_namedtuple.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def test_namedtuple():
assert not nt2.field3

with expect_exception_context(AttributeError):
nt2.field4
nt2.field4 # pylint: disable=pointless-statement

with expect_exception_context(AttributeError):
nt2.field5
nt2.field5 # pylint: disable=pointless-statement

# Verify extra kwargs are in 'unknown' dict
assert nt2.unknown['field4'] == "something"
Expand Down
48 changes: 46 additions & 2 deletions clients/python-wrapper/tests/utests/test_reference.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import lakefs_sdk

from tests.utests.common import get_test_client

from lakefs import ObjectInfo, CommonPrefix
from lakefs.repository import Repository
from tests.utests.common import get_test_client, expect_exception_context


def get_test_ref():
Expand Down Expand Up @@ -118,3 +118,47 @@ def monkey_diff_refs(*_, **__):
idx = 0
max_amount = pages * items_per_page * 2
assert len(list(ref.diff(other_ref="other_ref", max_amount=max_amount))) == pages * items_per_page


def test_reference_objects(monkeypatch):
    """
    Verify that Reference.objects maps listed entries to ObjectInfo / CommonPrefix.

    The SDK's list_objects call is replaced by a fake returning a single page of ten entries:
    odd indices are "object" entries, even indices are "common_prefix" entries.
    """
    ref = get_test_ref()

    def fake_entry(idx):
        # Odd index -> a regular object entry
        if idx % 2:
            return lakefs_sdk.ObjectStats(
                path=f"path-{idx}",
                path_type="object",
                physical_address=f"address-{idx}",
                checksum=f"{idx}",
                size_bytes=idx,
                mtime=idx,
            )
        # Even index -> a common prefix entry
        return lakefs_sdk.ObjectStats(
            path=f"path-{idx}",
            path_type="common_prefix",
            physical_address="?",
            checksum="",
            mtime=idx,
        )

    def monkey_list_objects(*_, **__):
        entries = [fake_entry(idx) for idx in range(10)]
        # has_more=False marks this as the only page
        page_info = lakefs_sdk.Pagination(
            has_more=False,
            next_offset="",
            max_per_page=1,
            results=1)
        return lakefs_sdk.ObjectStatsList(pagination=page_info, results=entries)

    with monkeypatch.context():
        monkeypatch.setattr(ref._client.sdk_client.objects_api, "list_objects", monkey_list_objects)

        for idx, item in enumerate(ref.objects()):
            if idx % 2:
                assert isinstance(item, ObjectInfo)
                assert item.size_bytes == idx
            else:
                assert isinstance(item, CommonPrefix)
                # Object-only fields are expected to be unavailable on a CommonPrefix
                with expect_exception_context(AttributeError):
                    item.checksum  # pylint: disable=pointless-statement

            assert item.path == f"path-{idx}"

0 comments on commit 6972ce5

Please sign in to comment.