Skip to content

Commit

Permalink
Python Wrapper: Implement Import Manager (#7084)
Browse files Browse the repository at this point in the history
* Python Wrapper: Implement Import Manager

* fix test

* CR Fixes
  • Loading branch information
N-o-Z authored Dec 1, 2023
1 parent b720c98 commit 45adc44
Show file tree
Hide file tree
Showing 17 changed files with 453 additions and 95 deletions.
7 changes: 7 additions & 0 deletions clients/python-wrapper/docs/lakefs.import_manager.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
lakefs.import\_manager module
=============================

.. automodule:: lakefs.import_manager
:members:
:undoc-members:
:show-inheritance:
7 changes: 7 additions & 0 deletions clients/python-wrapper/docs/lakefs.models.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
lakefs.models module
====================

.. automodule:: lakefs.models
:members:
:undoc-members:
:show-inheritance:
2 changes: 2 additions & 0 deletions clients/python-wrapper/lakefs/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ max-line-length=120
max-locals=25
# Maximum number of arguments for function / method
max-args=10
# Maximum number of class attributes
max-attributes=10

[MESSAGES CONTROL]
disable=too-few-public-methods, fixme
Expand Down
5 changes: 3 additions & 2 deletions clients/python-wrapper/lakefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
Allow importing of models from package root
"""

from lakefs.repository import Repository, RepositoryProperties
from lakefs.reference import Reference, Commit, Change
from lakefs.repository import Repository
from lakefs.reference import Reference
from lakefs.models import Commit, Change, RepositoryProperties
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.object import StoredObject, WriteableObject, ObjectReader
Expand Down
3 changes: 2 additions & 1 deletion clients/python-wrapper/lakefs/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from lakefs.object import WriteableObject
from lakefs.object import StoredObject
from lakefs.import_manager import ImportManager
from lakefs.reference import Reference, Change, generate_listing
from lakefs.reference import Reference, generate_listing
from lakefs.models import Change
from lakefs.exceptions import api_exception_handler, ConflictException, LakeFSException


Expand Down
16 changes: 1 addition & 15 deletions clients/python-wrapper/lakefs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,12 @@

from lakefs.config import ClientConfig
from lakefs.exceptions import NoAuthenticationFound, NotAuthorizedException, ServerException
from lakefs.namedtuple import LenientNamedTuple
from lakefs.models import ServerStorageConfiguration

# global default client
DEFAULT_CLIENT: Optional[Client] = None


class ServerStorageConfiguration(LenientNamedTuple):
"""
Represent a lakeFS server's storage configuration
"""
blockstore_type: str
pre_sign_support: bool
import_support: bool
blockstore_namespace_example: str
blockstore_namespace_validity_regex: str
pre_sign_support_ui: bool
import_validity_regex: str
default_namespace_prefix: Optional[str] = None


class ServerConfiguration:
"""
Represent a lakeFS server's configuration
Expand Down
2 changes: 2 additions & 0 deletions clients/python-wrapper/lakefs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Client configuration module
"""

from __future__ import annotations

import os
from pathlib import Path

Expand Down
34 changes: 20 additions & 14 deletions clients/python-wrapper/lakefs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ class LakeFSException(Exception):
"""
Base exception for all SDK exceptions
"""


class ServerException(LakeFSException):
"""
Generic exception when no other exception is applicable
"""
status_code: int
reason: str

Expand All @@ -21,7 +27,7 @@ def __init__(self, status=None, reason=None):
self.message = reason


class NotFoundException(LakeFSException):
class NotFoundException(ServerException):
"""
Resource could not be found on lakeFS server
"""
Expand All @@ -32,37 +38,31 @@ def __init__(self, status=None, reason=None):
super().__init__(status, reason)


class ForbiddenException(LakeFSException):
class ForbiddenException(ServerException):
"""
Operation not permitted
"""


class NoAuthenticationFound(LakeFSException):
class NoAuthenticationFound(ServerException):
"""
Raised when no authentication method could be found on Client instantiation
"""


class NotAuthorizedException(LakeFSException):
class NotAuthorizedException(ServerException):
"""
User not authorized to perform operation
"""


class ServerException(LakeFSException):
"""
Generic exception when no other exception is applicable
"""


class UnsupportedOperationException(LakeFSException):
class UnsupportedOperationException(ServerException):
"""
Operation not supported by lakeFS server or SDK
"""


class ConflictException(LakeFSException):
class ConflictException(ServerException):
"""
Resource / request conflict
"""
Expand All @@ -74,7 +74,7 @@ class ObjectNotFoundException(NotFoundException, FileNotFoundError):
"""


class ObjectExistsException(LakeFSException, FileExistsError):
class ObjectExistsException(ServerException, FileExistsError):
"""
Raised when Object('...').create(mode='x') and object exists
"""
Expand All @@ -86,12 +86,18 @@ class PermissionException(NotAuthorizedException, PermissionError):
"""


class InvalidRangeException(LakeFSException, OSError):
class InvalidRangeException(ServerException, OSError):
"""
Raised when the requested range is invalid
"""


class ImportManagerException(LakeFSException):
"""
Import manager exceptions that do not originate from the SDK
"""


_STATUS_CODE_TO_EXCEPTION = {
http.HTTPStatus.UNAUTHORIZED.value: NotAuthorizedException,
http.HTTPStatus.FORBIDDEN.value: ForbiddenException,
Expand Down
183 changes: 173 additions & 10 deletions clients/python-wrapper/lakefs/import_manager.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,186 @@
"""
Module implementing import logic
Import module provides a simpler interface to the lakeFS SDK import functionality
"""
from typing import Optional

from __future__ import annotations

import asyncio
from datetime import timedelta
from typing import Optional, Dict, List

import lakefs_sdk

from lakefs.models import ImportStatus
from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.exceptions import ImportManagerException, api_exception_handler

_PREFIX = "common_prefix"
_OBJECT = "object"


class ImportManager:
"""
Manage an import operation on a given repository
ImportManager provides an easy-to-use interface to perform imports with multiple sources.
It provides both synchronous and asynchronous functionality allowing the user to start an import process,
continue executing logic and poll for the import completion.
"""
_client: Client
_repo_id: str
_branch_id: str
_in_progress: bool = False
_import_id: str = None
commit_message: str
commit_metadata: Optional[Dict]
sources: List[lakefs_sdk.ImportLocation]

def __init__(self, repository_id: str, reference_id: str, commit_message: Optional[str] = None,
metadata: dict = None,
client: Client = DEFAULT_CLIENT):
def __init__(self, repository_id: str, branch_id: str, commit_message: Optional[str] = "",
commit_metadata: Optional[Dict] = None, client: Optional[Client] = DEFAULT_CLIENT) -> None:
self._client = client
self._repo_id = repository_id
self._ref_id = reference_id
self._commit_message = commit_message
self._metadata = metadata
self._branch_id = branch_id
self.commit_message = commit_message
self.commit_metadata = commit_metadata
self.sources = []

@property
def import_id(self) -> Optional[str]:
"""
Returns the id of the most recently started import process, or None if start() has not set one yet
"""
return self._import_id

def prefix(self, object_store_uri: str, destination: str) -> ImportManager:
    """
    Register an import source of type "common_prefix"

    :param object_store_uri: The URI from which to import the objects
    :param destination: The destination prefix relative to the branch
    :return: This ImportManager instance, so that calls can be chained
    """
    location = lakefs_sdk.ImportLocation(type=_PREFIX, path=object_store_uri, destination=destination)
    self.sources.append(location)
    return self

def object(self, object_store_uri: str, destination: str) -> ImportManager:
    """
    Register an import source of type "object"

    :param object_store_uri: The URI from which to import the object
    :param destination: The destination path for the object relative to the branch
    :return: This ImportManager instance, so that calls can be chained
    """
    location = lakefs_sdk.ImportLocation(type=_OBJECT, path=object_store_uri, destination=destination)
    self.sources.append(location)
    return self

def start(self) -> str:
    """
    Submit the collected sources to lakeFS as a new import process and remember its id

    :return: The import process identifier in lakeFS
    :raises:
        ImportManagerException if an import process is already in progress
        NotFoundException if branch or repository do not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ServerException for any other errors
    """
    if self._in_progress:
        raise ImportManagerException("Import in progress")

    commit = lakefs_sdk.CommitCreation(message=self.commit_message, metadata=self.commit_metadata)
    import_creation = lakefs_sdk.ImportCreation(paths=self.sources, commit=commit)

    with api_exception_handler():
        import_api = self._client.sdk_client.import_api
        response = import_api.import_start(repository=self._repo_id,
                                           branch=self._branch_id,
                                           import_creation=import_creation)
        # Only mark the manager as busy once the server has accepted the request
        self._import_id = response.id
        self._in_progress = True

    return self._import_id

async def _wait_for_completion(self, poll_interval: timedelta) -> lakefs_sdk.ImportStatus:
    """
    Poll the import status until the server reports completion or an error.
    The status request itself is a regular (blocking) SDK call; only the pause between polls is awaited.

    :param poll_interval: The time to sleep between consecutive status requests
    :return: The SDK import status object of the completed import
    :raises:
        ImportManagerException if the server reports an error for the import
        Any exception raised through api_exception_handler for a failed status request
    """
    while True:
        with api_exception_handler():
            resp = self._client.sdk_client.import_api.import_status(repository=self._repo_id,
                                                                    branch=self._branch_id,
                                                                    id=self._import_id)
        if resp.completed:
            return resp
        if resp.error is not None:
            # The import ended with an error, so it is no longer running. Clear the flag,
            # otherwise every later start() on this manager fails with "Import in progress".
            self._in_progress = False
            raise ImportManagerException(f"Import Error: {resp.error.message}")

        await asyncio.sleep(poll_interval.total_seconds())

def wait(self, poll_interval: Optional[timedelta] = timedelta(seconds=2)) -> ImportStatus:
    """
    Poll a started import task ID, blocking until completion

    :param poll_interval: The interval for polling the import status. None selects the default of 2 seconds.
    :return: Import status as returned by the lakeFS server
    :raises:
        ImportManagerException if no import is in progress, or if the import ended with an error
        NotFoundException if branch, repository or import id do not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ServerException for any other errors
        RuntimeError if called from a thread that is already running an asyncio event loop (asyncio.run limitation)
    """
    if not self._in_progress:
        raise ImportManagerException("No import in progress")

    # The parameter is declared Optional; without this fallback an explicit None
    # would fail with AttributeError on poll_interval.total_seconds().
    if poll_interval is None:
        poll_interval = timedelta(seconds=2)

    res = asyncio.run(self._wait_for_completion(poll_interval))
    # Reaching this point means the import completed: reset the state so the manager can be reused
    self._in_progress = False
    self.sources = []
    return ImportStatus(**res.dict())

def run(self, poll_interval: Optional[timedelta] = None) -> ImportStatus:
    """
    Start the import and block until it finishes; equivalent to start() followed by wait()

    :param poll_interval: The interval for polling the import status. When None, wait() uses its own default.
    :return: Import status as returned by the lakeFS server
    :raises:
        See start(), wait()
    """
    self.start()
    if poll_interval is None:
        # Do not forward None, so that wait() falls back to its default interval
        return self.wait()
    return self.wait(poll_interval=poll_interval)

def cancel(self) -> None:
    """
    Ask the lakeFS server to cancel the import identified by the stored import id

    :raises:
        ImportManagerException if this manager holds no import id
        NotFoundException if branch, repository or import id do not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ConflictException if the import was already completed
        ServerException for any other errors
    """
    if self._import_id is None:  # Without an id there is nothing to cancel
        raise ImportManagerException("No import in progress")

    with api_exception_handler():
        import_api = self._client.sdk_client.import_api
        import_api.import_cancel(repository=self._repo_id, branch=self._branch_id, id=self._import_id)
        # The cancel request went through: reset the state so the manager can be reused
        self._in_progress = False
        self.sources = []

def status(self) -> ImportStatus:
    """
    Fetch the status of the import identified by the stored import id

    :return: Import status as returned by the lakeFS server
    :raises:
        ImportManagerException if this manager holds no import id
        NotFoundException if branch, repository or import id do not exist
        NotAuthorizedException if user is not authorized to perform this operation
        ServerException for any other errors
    """
    if self._import_id is None:
        raise ImportManagerException("No import in progress")

    with api_exception_handler():
        import_api = self._client.sdk_client.import_api
        sdk_status = import_api.import_status(repository=self._repo_id,
                                              branch=self._branch_id,
                                              id=self._import_id)
        # Convert the SDK model into the wrapper's own ImportStatus type
        return ImportStatus(**sdk_status.dict())
Loading

0 comments on commit 45adc44

Please sign in to comment.