Skip to content

Commit

Permalink
Python Wrapper: Handle no client exception (#7116)
Browse files Browse the repository at this point in the history
* Python Wrapper: Handle no client exception

* More fixes

* CR Fixes

* CR Fixes

* Fixes

* Update clients/python-wrapper/lakefs/client.py

Co-authored-by: Barak Amar <[email protected]>

---------

Co-authored-by: Barak Amar <[email protected]>
  • Loading branch information
N-o-Z and nopcoder authored Dec 6, 2023
1 parent 7a59eaa commit 0fbd104
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 107 deletions.
4 changes: 2 additions & 2 deletions clients/python-wrapper/lakefs/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Optional, Generator, Iterable, Literal

import lakefs_sdk
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.client import Client
from lakefs.object import WriteableObject
from lakefs.object import StoredObject
from lakefs.import_manager import ImportManager
Expand Down Expand Up @@ -234,7 +234,7 @@ class Transaction(Branch):
Manage a transactions on a given branch (TBD)
"""

def __init__(self, repository_id: str, branch_id: str, commit_message: str, client: Client = DEFAULT_CLIENT):
def __init__(self, repository_id: str, branch_id: str, commit_message: str, client: Client = None):
super().__init__(repository_id, branch_id, client)
self._commit_message = commit_message

Expand Down
43 changes: 27 additions & 16 deletions clients/python-wrapper/lakefs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
from __future__ import annotations

from typing import Optional
from threading import Lock

import lakefs_sdk
from lakefs_sdk.client import LakeFSClient

from lakefs.config import ClientConfig
from lakefs.exceptions import NoAuthenticationFound, NotAuthorizedException, ServerException
from lakefs.exceptions import NotAuthorizedException, ServerException
from lakefs.models import ServerStorageConfiguration

# global default client
DEFAULT_CLIENT: Optional[Client] = None


class ServerConfiguration:
"""
Expand All @@ -31,7 +29,7 @@ class ServerConfiguration:
_conf: lakefs_sdk.Config
_storage_conf: ServerStorageConfiguration

def __init__(self, client: Optional[Client] = DEFAULT_CLIENT):
def __init__(self, client: Optional[Client] = None):
try:
self._conf = client.sdk_client.config_api.get_config()
self._storage_conf = ServerStorageConfiguration(**self._conf.storage_config.dict())
Expand Down Expand Up @@ -113,16 +111,29 @@ def version(self) -> str:
return self._server_conf.version


try:
DEFAULT_CLIENT = Client()
except NoAuthenticationFound:
# must call init() explicitly
DEFAULT_CLIENT = None # pylint: disable=C0103


def init(**kwargs) -> None:
class _BaseLakeFSObject:
"""
Initialize DefaultClient using the provided parameters
Base class for all lakeFS SDK objects, holds the client object and handles errors where no authentication method
found for client. Attempts to reload client dynamically in case of changes in the environment.
"""
global DEFAULT_CLIENT # pylint: disable=W0603
DEFAULT_CLIENT = Client(**kwargs)
__mutex: Lock = Lock()
__client: Optional[Client] = None

def __init__(self, client: Optional[Client]):
self.__client = client

@property
def _client(self):
"""
If client is None due to missing authentication params, try to init again. If authentication method is still
missing - will raise exception
:return: The initialized client object
:raise NoAuthenticationFound: If no authentication method found to configure the lakeFS client with
"""
if self.__client is not None:
return self.__client

with _BaseLakeFSObject.__mutex:
if _BaseLakeFSObject.__client is None:
_BaseLakeFSObject.__client = Client()
return _BaseLakeFSObject.__client
2 changes: 1 addition & 1 deletion clients/python-wrapper/lakefs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ForbiddenException(ServerException):
"""


class NoAuthenticationFound(ServerException):
class NoAuthenticationFound(LakeFSException):
"""
Raised when no authentication method could be found on Client instantiation
"""
Expand Down
9 changes: 4 additions & 5 deletions clients/python-wrapper/lakefs/import_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import lakefs_sdk

from lakefs.models import ImportStatus, _OBJECT, _COMMON_PREFIX
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.client import Client, _BaseLakeFSObject
from lakefs.exceptions import ImportManagerException, api_exception_handler


class ImportManager:
class ImportManager(_BaseLakeFSObject):
"""
ImportManager provides an easy-to-use interface to perform imports with multiple sources.
It provides both synchronous and asynchronous functionality allowing the user to start an import process,
Expand All @@ -38,7 +38,6 @@ class ImportManager:
mgr.run()
"""
_client: Client
_repo_id: str
_branch_id: str
_in_progress: bool = False
Expand All @@ -48,13 +47,13 @@ class ImportManager:
sources: List[lakefs_sdk.ImportLocation]

def __init__(self, repository_id: str, branch_id: str, commit_message: str = "",
commit_metadata: Optional[Dict] = None, client: Optional[Client] = DEFAULT_CLIENT) -> None:
self._client = client
commit_metadata: Optional[Dict] = None, client: Optional[Client] = None) -> None:
self._repo_id = repository_id
self._branch_id = branch_id
self.commit_message = commit_message
self.commit_metadata = commit_metadata
self.sources = []
super().__init__(client)

@property
def import_id(self) -> str:
Expand Down
24 changes: 11 additions & 13 deletions clients/python-wrapper/lakefs/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import urllib3
from lakefs_sdk import StagingMetadata

from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.client import Client, _BaseLakeFSObject
from lakefs.exceptions import (
api_exception_handler,
handle_http_error,
Expand All @@ -41,25 +41,23 @@
WriteModes = Literal['x', 'xb', 'w', 'wb']


class LakeFSIOBase(IO):
class LakeFSIOBase(_BaseLakeFSObject, IO):
"""
Base class for the lakeFS Reader and Writer classes
"""

_client: Client
_obj: StoredObject
_mode: ReadModes
_pos: int
_pre_sign: Optional[bool] = None
_is_closed: bool = False

def __init__(self, obj: StoredObject, mode: Union[ReadModes, WriteModes], pre_sign: Optional[bool] = None,
client: Optional[Client] = DEFAULT_CLIENT) -> None:
client: Optional[Client] = None) -> None:
self._obj = obj
self._mode = mode
self._pre_sign = pre_sign if pre_sign is not None else client.storage_config.pre_sign_support
self._client = client
self._pos = 0
super().__init__(client)

@property
def mode(self) -> str:
Expand Down Expand Up @@ -187,7 +185,7 @@ class ObjectReader(LakeFSIOBase):
"""

def __init__(self, obj: StoredObject, mode: ReadModes, pre_sign: Optional[bool] = None,
client: Optional[Client] = DEFAULT_CLIENT) -> None:
client: Optional[Client] = None) -> None:
if mode not in get_args(ReadModes):
raise ValueError(f"invalid read mode: '{mode}'. ReadModes: {ReadModes}")

Expand Down Expand Up @@ -308,7 +306,7 @@ def __init__(self,
pre_sign: Optional[bool] = None,
content_type: Optional[str] = None,
metadata: Optional[dict[str, str]] = None,
client: Optional[Client] = DEFAULT_CLIENT) -> None:
client: Optional[Client] = None) -> None:

if 'x' in mode and obj.exists(): # Requires explicit create
raise ObjectExistsException
Expand Down Expand Up @@ -492,21 +490,20 @@ def read(self, n: int = None) -> str | bytes:
raise io.UnsupportedOperation


class StoredObject:
class StoredObject(_BaseLakeFSObject):
"""
Class representing an object in lakeFS.
"""
_client: Client
_repo_id: str
_ref_id: str
_path: str
_stats: Optional[ObjectInfo] = None

def __init__(self, repository: str, reference: str, path: str, client: Optional[Client] = DEFAULT_CLIENT):
self._client = client
def __init__(self, repository: str, reference: str, path: str, client: Optional[Client] = None):
self._repo_id = repository
self._ref_id = reference
self._path = path
super().__init__(client)

def __str__(self) -> str:
return self.path
Expand Down Expand Up @@ -619,7 +616,8 @@ class WriteableObject(StoredObject):
This Object is instantiated and returned upon invoking writer() on Branch reference type.
"""

def __init__(self, repository: str, reference: str, path: str, client: Optional[Client] = DEFAULT_CLIENT) -> None:
def __init__(self, repository: str, reference: str, path: str,
client: Optional[Client] = None) -> None:
super().__init__(repository, reference, path, client=client)

def upload(self,
Expand Down
9 changes: 4 additions & 5 deletions clients/python-wrapper/lakefs/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,23 @@
import lakefs_sdk

from lakefs.models import Commit, Change, CommonPrefix, ObjectInfo, _OBJECT
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.client import Client, _BaseLakeFSObject
from lakefs.exceptions import api_exception_handler
from lakefs.object import StoredObject


class Reference:
class Reference(_BaseLakeFSObject):
"""
Class representing a reference in lakeFS.
"""
_client: Client
_repo_id: str
_id: str
_commit: Optional[Commit] = None

def __init__(self, repo_id: str, ref_id: str, client: Optional[Client] = DEFAULT_CLIENT) -> None:
self._client = client
def __init__(self, repo_id: str, ref_id: str, client: Optional[Client] = None) -> None:
self._repo_id = repo_id
self._id = ref_id
super().__init__(client)

@property
def repo_id(self) -> str:
Expand Down
17 changes: 8 additions & 9 deletions clients/python-wrapper/lakefs/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,23 @@
from lakefs.models import RepositoryProperties
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.exceptions import api_exception_handler, ConflictException, LakeFSException, NoAuthenticationFound
from lakefs.client import Client, _BaseLakeFSObject
from lakefs.exceptions import api_exception_handler, ConflictException, LakeFSException
from lakefs.reference import Reference, generate_listing


class Repository:
class Repository(_BaseLakeFSObject):
"""
Class representing a Repository in lakeFS.
The Repository object provides the context for the other objects that are found in it.
Access to these objects should be done from this class
"""
_client: Client
_id: str
_properties: RepositoryProperties = None

def __init__(self, repository_id: str, client: Optional[Client] = DEFAULT_CLIENT) -> None:
def __init__(self, repository_id: str, client: Optional[Client] = None) -> None:
self._id = repository_id
self._client = client
super().__init__(client)

def create(self,
storage_namespace: str,
Expand Down Expand Up @@ -163,7 +162,7 @@ def properties(self) -> RepositoryProperties:
return self._properties


def repositories(client: Client = DEFAULT_CLIENT,
def repositories(client: Client = None,
prefix: Optional[str] = None,
after: Optional[str] = None,
**kwargs) -> Generator[Repository]:
Expand All @@ -175,8 +174,8 @@ def repositories(client: Client = DEFAULT_CLIENT,
:param after: Return items after this value
:return: A generator listing lakeFS repositories
"""
if client is None:
raise NoAuthenticationFound("Explicitly provide a client or invoke client module's init method")
if client is None: # Try to get default client
client = Client()

for res in generate_listing(client.sdk_client.repositories_api.list_repositories,
prefix=prefix,
Expand Down
4 changes: 2 additions & 2 deletions clients/python-wrapper/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def get_storage_namespace(test_name):


def _setup_repo(namespace, name, default_branch):
clt = client.DEFAULT_CLIENT
clt = client.Client()
repo_name = name + str(int(time.time()))
repo = Repository(repo_name, clt)
repo.create(storage_namespace=namespace, default_branch=default_branch)
Expand Down Expand Up @@ -56,7 +56,7 @@ def setup_branch_with_commits():

@pytest.fixture(name="pre_sign", scope="function")
def fixture_pre_sign(request):
clt = client.DEFAULT_CLIENT
clt = client.Client()
if request.param and not clt.storage_config.pre_sign_support:
pytest.skip("Storage adapter does not support pre-sign mode")
return request.param
3 changes: 2 additions & 1 deletion clients/python-wrapper/tests/integration/test_repository.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import uuid
import pytest

import lakefs

from tests.integration.conftest import _setup_repo, get_storage_namespace
Expand Down
2 changes: 1 addition & 1 deletion clients/python-wrapper/tests/utests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def lakectl_test_config_context(monkey, tmp_path):
try:
yield client
finally:
client.DEFAULT_CLIENT = None
client._DEFAULT_CLIENT = None


@contextmanager
Expand Down
Loading

0 comments on commit 0fbd104

Please sign in to comment.