Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python Wrapper: Objects generator #7110

Merged
merged 4 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions clients/python-wrapper/docs/lakefs.object_manager.rst

This file was deleted.

1 change: 0 additions & 1 deletion clients/python-wrapper/docs/lakefs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Submodules
lakefs.models
lakefs.namedtuple
lakefs.object
lakefs.object_manager
lakefs.reference
lakefs.repository
lakefs.tag
Expand Down
11 changes: 9 additions & 2 deletions clients/python-wrapper/lakefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@

from lakefs.repository import Repository
from lakefs.reference import Reference
from lakefs.models import Commit, Change, RepositoryProperties
from lakefs.models import (
Commit,
Change,
ImportStatus,
ServerStorageConfiguration,
ObjectInfo,
CommonPrefix,
RepositoryProperties
)
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.object import StoredObject, WriteableObject, ObjectReader
from lakefs.object_manager import ObjectManager
5 changes: 1 addition & 4 deletions clients/python-wrapper/lakefs/import_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@

import lakefs_sdk

from lakefs.models import ImportStatus
from lakefs.models import ImportStatus, _OBJECT, _PREFIX
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.exceptions import ImportManagerException, api_exception_handler

_PREFIX = "common_prefix"
_OBJECT = "object"


class ImportManager:
"""
Expand Down
13 changes: 11 additions & 2 deletions clients/python-wrapper/lakefs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

from lakefs.namedtuple import LenientNamedTuple

_PREFIX = "common_prefix"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's not from this PR, but I prefer common_prefix; prefix has a different meaning.

Suggested change
_PREFIX = "common_prefix"
_COMMON_PREFIX = "common_prefix"

_OBJECT = "object"


class Commit(LenientNamedTuple):
"""
Expand Down Expand Up @@ -74,12 +77,11 @@ class ServerStorageConfiguration(LenientNamedTuple):
default_namespace_prefix: Optional[str] = None


class ObjectStats(LenientNamedTuple):
class ObjectInfo(LenientNamedTuple):
"""
Represent a lakeFS object's stats
"""
path: str
path_type: str
physical_address: str
checksum: str
mtime: int
Expand All @@ -89,6 +91,13 @@ class ObjectStats(LenientNamedTuple):
content_type: Optional[str] = None


class CommonPrefix(LenientNamedTuple):
    """
    Represents a common prefix in lakeFS

    Data-only model: it declares a single field and defines no methods of its own.
    """
    # Path of the common prefix
    path: str

Comment on lines +94 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's still wanted, but in the initial PRD, CommonPrefix has an exists method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, both ObjectInfo and CommonPrefix are data models and do not contain logic. It feels redundant to provide them with functionality that is already accessible via the branch / object level methods. Let's see if there's demand for that and add it as needed.


class RepositoryProperties(LenientNamedTuple):
"""
Represent a lakeFS repository's properties
Expand Down
12 changes: 6 additions & 6 deletions clients/python-wrapper/lakefs/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
PermissionException,
ObjectExistsException,
)
from lakefs.models import ObjectStats
from lakefs.models import ObjectInfo

_LAKEFS_METADATA_PREFIX = "x-lakefs-meta-"
# _BUFFER_SIZE - Writer buffer size. While buffer size not exceed, data will be maintained in memory and file will
Expand Down Expand Up @@ -302,7 +302,7 @@ class ObjectWriter(LakeFSIOBase):
implicitly when using writer as a context.
"""
_fd: tempfile.SpooledTemporaryFile
_obj_stats: ObjectStats = None
_obj_stats: ObjectInfo = None

def __init__(self,
obj: StoredObject,
Expand Down Expand Up @@ -381,7 +381,7 @@ def close(self) -> None:
Write the data to the lakeFS server and close open descriptors
"""
stats = self._upload_presign() if self.pre_sign else self._upload_raw()
self._obj_stats = ObjectStats(**stats.dict())
self._obj_stats = ObjectInfo(**stats.dict())

self._fd.close()
super().close()
Expand Down Expand Up @@ -502,7 +502,7 @@ class StoredObject:
_repo_id: str
_ref_id: str
_path: str
_stats: Optional[ObjectStats] = None
_stats: Optional[ObjectInfo] = None

def __init__(self, repository: str, reference: str, path: str, client: Optional[Client] = DEFAULT_CLIENT):
self._client = client
Expand Down Expand Up @@ -548,14 +548,14 @@ def reader(self, mode: ReadModes = 'rb', pre_sign: Optional[bool] = None) -> Obj
"""
return ObjectReader(self, mode=mode, pre_sign=pre_sign, client=self._client)

def stat(self) -> ObjectStats:
def stat(self) -> ObjectInfo:
"""
Return the Stat object representing this object
"""
if self._stats is None:
with api_exception_handler(_io_exception_handler):
stat = self._client.sdk_client.objects_api.stat_object(self._repo_id, self._ref_id, self._path)
self._stats = ObjectStats(**stat.dict())
self._stats = ObjectInfo(**stat.dict())
return self._stats

def exists(self) -> bool:
Expand Down
10 changes: 0 additions & 10 deletions clients/python-wrapper/lakefs/object_manager.py

This file was deleted.

35 changes: 29 additions & 6 deletions clients/python-wrapper/lakefs/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@

import lakefs_sdk

from lakefs.models import Commit, Change
from lakefs.models import Commit, Change, CommonPrefix, ObjectInfo, _OBJECT
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.exceptions import api_exception_handler
from lakefs.object import StoredObject
from lakefs.object_manager import ObjectManager


class Reference:
Expand Down Expand Up @@ -43,12 +42,36 @@ def id(self) -> str:
"""
return self._id

@property
def objects(self) -> ObjectManager:
def objects(self,
max_amount: Optional[int] = None,
after: Optional[str] = None,
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
**kwargs) -> Generator[StoredObject | CommonPrefix]:
"""
Returns a ObjectManager object for this reference
Returns an object generator for this reference
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

object/common_prefix or find a better way to phrase it (even though the name of the method is objects)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more doc information


:param max_amount: Stop showing items after this amount
:param after: Return items after this value
:param prefix: Return items prefixed with this value
:param delimiter: Group common prefixes by this delimiter
:param kwargs: Additional Keyword Arguments to send to the server
:raises:
NotFoundException if this reference does not exist
NotAuthorizedException if user is not authorized to perform this operation
ServerException for any other errors
"""
# TODO: Implement

for res in generate_listing(self._client.sdk_client.objects_api.list_objects,
repository=self._repo_id,
ref=self._id,
max_amount=max_amount,
after=after,
prefix=prefix,
delimiter=delimiter,
**kwargs):
type_class = ObjectInfo if res.path_type == _OBJECT else CommonPrefix
yield type_class(**res.dict())

def log(self, max_amount: Optional[int] = None, **kwargs) -> Generator[Commit]:
"""
Expand Down
1 change: 0 additions & 1 deletion clients/python-wrapper/tests/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ disable=
missing-function-docstring,
missing-module-docstring,
too-few-public-methods,
pointless-statement,
no-member,
fixme,

Expand Down
5 changes: 2 additions & 3 deletions clients/python-wrapper/tests/integration/test_branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ def test_reset_changes(setup_repo):
def test_delete_object_changes(setup_repo):
_, repo = setup_repo
test_branch = repo.branch("main")
path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c", ]
for s in path_and_data:
test_branch.object(s).upload(s)
path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c"]
upload_data(test_branch, path_and_data)
test_branch.commit("add some files", {"test_key": "test_value"})

test_branch.delete_objects("foo/a")
Expand Down
21 changes: 21 additions & 0 deletions clients/python-wrapper/tests/integration/test_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,24 @@ def test_reference_merge_into(setup_branch_with_commits):
branch.merge_into(other_branch.id, message="Merge2")
assert other_branch.commit_message() == "Merge2"
assert list(other_branch.log(max_amount=3))[2].id == commits[0].id


def test_reference_objects(setup_repo):
    """
    Verify that listing a reference returns the uploaded objects, and that a
    delimiter groups nested paths into common prefixes.
    """
    _, repo = setup_repo
    test_branch = repo.branch("main")
    path_and_data = ["a", "b", "bar/a", "bar/b", "bar/c", "c", "foo/a", "foo/b", "foo/c"]
    for s in path_and_data:
        test_branch.object(s).upload(s)

    # Flat listing: exactly the uploaded paths, each appearing once.
    # Comparing sorted lists (rather than membership + count) also catches duplicates.
    listed = [obj.path for obj in test_branch.objects()]
    assert sorted(listed) == sorted(path_and_data)

    # Delimited listing: top-level objects plus one entry per common prefix.
    expected = ["a", "b", "bar/", "c", "foo/"]
    grouped = [obj.path for obj in test_branch.objects(delimiter='/')]
    assert sorted(grouped) == sorted(expected)
1 change: 0 additions & 1 deletion clients/python-wrapper/tests/integration/test_sanity.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def test_object_sanity(setup_repo):

stats = obj.stat()
assert stats.path == path == obj.path
assert stats.path_type == "object"
assert stats.mtime <= time.time()
assert stats.size_bytes == len(data)
assert stats.metadata == metadata
Expand Down
4 changes: 2 additions & 2 deletions clients/python-wrapper/tests/utests/test_namedtuple.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def test_namedtuple():
assert not nt2.field3

with expect_exception_context(AttributeError):
nt2.field4
nt2.field4 # pylint: disable=pointless-statement

with expect_exception_context(AttributeError):
nt2.field5
nt2.field5 # pylint: disable=pointless-statement

# Verify extra kwargs are in 'unknown' dict
assert nt2.unknown['field4'] == "something"
Expand Down
48 changes: 46 additions & 2 deletions clients/python-wrapper/tests/utests/test_reference.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import lakefs_sdk

from tests.utests.common import get_test_client

from lakefs import ObjectInfo, CommonPrefix
from lakefs.repository import Repository
from tests.utests.common import get_test_client, expect_exception_context


def get_test_ref():
Expand Down Expand Up @@ -118,3 +118,47 @@ def monkey_diff_refs(*_, **__):
idx = 0
max_amount = pages * items_per_page * 2
assert len(list(ref.diff(other_ref="other_ref", max_amount=max_amount))) == pages * items_per_page


def test_reference_objects(monkeypatch):
    """
    Verify that ref.objects() yields ObjectInfo for results whose path_type is
    "object" and CommonPrefix for results whose path_type is "common_prefix".
    """
    ref = get_test_ref()
    total = 10
    with monkeypatch.context():
        def monkey_list_objects(*_, **__):
            # Single page of results: odd indexes are objects, even indexes are common prefixes
            results = []
            for i in range(total):
                if i % 2:
                    results.append(lakefs_sdk.ObjectStats(
                        path=f"path-{i}",
                        path_type="object",
                        physical_address=f"address-{i}",
                        checksum=f"{i}",
                        size_bytes=i,
                        mtime=i,
                    ))
                else:
                    results.append(lakefs_sdk.ObjectStats(
                        path=f"path-{i}",
                        path_type="common_prefix",
                        physical_address="?",
                        checksum="",
                        mtime=i,
                    ))
            return lakefs_sdk.ObjectStatsList(pagination=lakefs_sdk.Pagination(
                has_more=False,
                next_offset="",
                max_per_page=1,
                results=1),
                results=results)

        monkeypatch.setattr(ref._client.sdk_client.objects_api, "list_objects", monkey_list_objects)

        items = list(ref.objects())
        # Guard against a vacuous pass: without this, an empty generator would skip every assertion below
        assert len(items) == total

        for i, item in enumerate(items):
            if i % 2:
                assert isinstance(item, ObjectInfo)
                assert item.size_bytes == i
            else:
                assert isinstance(item, CommonPrefix)
                with expect_exception_context(AttributeError):
                    item.checksum  # pylint: disable=pointless-statement

            assert item.path == f"path-{i}"
Loading