Skip to content

Commit

Permalink
Python Wrapper: Add ID field to repository model (#7124)
Browse files Browse the repository at this point in the history
  • Loading branch information
ozkatz authored Dec 10, 2023
1 parent 16ceae6 commit 3dca9c4
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 14 deletions.
13 changes: 8 additions & 5 deletions clients/python-wrapper/lakefs/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ def handle_conflict(e: LakeFSException):
if isinstance(e, ConflictException) and exist_ok:
return None
return e

branch_creation = lakefs_sdk.BranchCreation(name=self._id, source=str(source_reference_id))
if isinstance(source_reference_id, Reference):
source_reference_id = source_reference_id.id
branch_creation = lakefs_sdk.BranchCreation(name=self._id, source=source_reference_id)
with api_exception_handler(handle_conflict):
self._client.sdk_client.branches_api.create_branch(self._repo_id, branch_creation)
return self
Expand Down Expand Up @@ -192,10 +193,12 @@ def delete_objects(self, object_paths: str | StoredObject | Iterable[str | Store
:raise NotAuthorizedException: if user is not authorized to perform this operation
:raise ServerException: for any other errors
"""
if isinstance(object_paths, (str, StoredObject)):
object_paths = [str(object_paths)]
if isinstance(object_paths, str):
object_paths = [object_paths]
elif isinstance(object_paths, StoredObject):
object_paths = [object_paths.path]
elif isinstance(object_paths, Iterable):
object_paths = {str(o) for o in object_paths}
object_paths = [o.path if isinstance(o, StoredObject) else o for o in object_paths]
with api_exception_handler():
return self._client.sdk_client.objects_api.delete_objects(
self._repo_id,
Expand Down
9 changes: 9 additions & 0 deletions clients/python-wrapper/lakefs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Change(LenientNamedTuple):
path_type: Literal["common_prefix", "object"]
size_bytes: Optional[int]

def __repr__(self):
return f'Change(type="{self.type}", path="{self.path}", path_type="{self.path_type}")'


class ImportStatus(LenientNamedTuple):
"""
Expand Down Expand Up @@ -90,13 +93,19 @@ class ObjectInfo(LenientNamedTuple):
metadata: Optional[dict[str, str]] = None
content_type: Optional[str] = None

def __repr__(self):
return f'ObjectInfo(path="{self.path}")'


class CommonPrefix(LenientNamedTuple):
"""
Represents a common prefix in lakeFS
"""
path: str

def __repr__(self):
return f'CommonPrefix(path="{self.path}")'


class RepositoryProperties(LenientNamedTuple):
"""
Expand Down
6 changes: 6 additions & 0 deletions clients/python-wrapper/lakefs/namedtuple.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ def __init__(self, **kwargs):
self.__initialized = True
super().__init__()

def __repr__(self):
class_name = self.__class__.__name__
if hasattr(self, 'id'):
return f'{class_name}(id="{self.id}")'
return f'{class_name}()'

def __setattr__(self, name, value):
if self.__initialized:
raise AttributeError("can't set attribute")
Expand Down
14 changes: 13 additions & 1 deletion clients/python-wrapper/lakefs/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ def _get_range_string(start, read_bytes=None):
return f"bytes={start}-"
return f"bytes={start}-{start + read_bytes - 1}"

def __str__(self):
return self._obj.path

def __repr__(self):
return f'ObjectReader(path="{self._obj.path}")'


class ObjectWriter(LakeFSIOBase):
"""
Expand Down Expand Up @@ -512,6 +518,9 @@ def read(self, n: int = None) -> str | bytes:
"""
raise io.UnsupportedOperation

def __repr__(self):
return f'ObjectWriter(path="{self._obj.path}")'


class StoredObject(_BaseLakeFSObject):
"""
Expand All @@ -532,7 +541,7 @@ def __str__(self) -> str:
return self.path

def __repr__(self):
return f"lakefs://{self._repo_id}/{self._ref_id}/{self._path}"
return f'StoredObject(repository="{self.repo}", reference="{self.ref}", path="{self.path}")'

@property
def repo(self) -> str:
Expand Down Expand Up @@ -643,6 +652,9 @@ def __init__(self, repository: str, reference: str, path: str,
client: Optional[Client] = None) -> None:
super().__init__(repository, reference, path, client=client)

def __repr__(self):
return f'WriteableObject(repository="{self.repo}", reference="{self.ref}", path="{self.path}")'

def upload(self,
data: str | bytes,
mode: WriteModes = 'wb',
Expand Down
15 changes: 9 additions & 6 deletions clients/python-wrapper/lakefs/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,13 @@ def diff(self,
:raise NotAuthorizedException: if user is not authorized to perform this operation
:raise ServerException: for any other errors
"""
other_ref_id = other_ref
if isinstance(other_ref, Reference):
other_ref_id = other_ref.id
for diff in generate_listing(self._client.sdk_client.refs_api.diff_refs,
repository=self._repo_id,
left_ref=self._id,
right_ref=str(other_ref),
right_ref=other_ref_id,
after=after,
max_amount=max_amount,
prefix=prefix,
Expand All @@ -153,11 +156,13 @@ def merge_into(self, destination_branch_id: str | Reference, **kwargs) -> str:
:raise NotAuthorizedException: if user is not authorized to perform this operation
:raise ServerException: for any other errors
"""
if isinstance(destination_branch_id, Reference):
destination_branch_id = destination_branch_id.id
with api_exception_handler():
merge = lakefs_sdk.Merge(**kwargs)
res = self._client.sdk_client.refs_api.merge_into_branch(self._repo_id,
self._id,
str(destination_branch_id),
destination_branch_id,
merge=merge)
return res.reference

Expand All @@ -169,11 +174,9 @@ def object(self, path: str) -> StoredObject: # pylint: disable=C0103
"""
return StoredObject(self._repo_id, self._id, path, self._client)

def __str__(self) -> str:
return self._id

def __repr__(self):
return f"lakefs://{self._repo_id}/{self._id}"
class_name = self.__class__.__name__
return f'{class_name}(repository="{self.repo_id}", id="{self.id}")'


def generate_listing(func, *args, max_amount: Optional[int] = None, **kwargs):
Expand Down
12 changes: 11 additions & 1 deletion clients/python-wrapper/lakefs/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def metadata(self) -> dict[str, str]:
@property
def properties(self) -> RepositoryProperties:
"""
Return the repositories properties object
Return the repository's properties object
"""
if self._properties is None:
with api_exception_handler():
Expand All @@ -161,6 +161,16 @@ def properties(self) -> RepositoryProperties:

return self._properties

@property
def id(self) -> str:
"""
Returns the repository's id
"""
return self._id

def __repr__(self) -> str:
return f'Repository(id="{self.id}")'


def repositories(client: Client = None,
prefix: Optional[str] = None,
Expand Down
4 changes: 3 additions & 1 deletion clients/python-wrapper/lakefs/tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def create(self, source_ref_id: str | Reference, exist_ok: Optional[bool] = Fals
:raise NotFoundException: if source_ref_id doesn't exist on the lakeFS server
:raise ServerException: for any other errors.
"""
tag_creation = lakefs_sdk.TagCreation(id=self.id, ref=str(source_ref_id))
if isinstance(source_ref_id, Reference):
source_ref_id = source_ref_id.id
tag_creation = lakefs_sdk.TagCreation(id=self.id, ref=source_ref_id)

def handle_conflict(e: LakeFSException):
if not (isinstance(e, ConflictException) and exist_ok):
Expand Down

0 comments on commit 3dca9c4

Please sign in to comment.