Skip to content

Commit

Permalink
Python Wrapper: Make ImportManager single use
Browse files Browse the repository at this point in the history
  • Loading branch information
N-o-Z committed Dec 3, 2023
1 parent 3e33178 commit 07db384
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 12 deletions.
12 changes: 12 additions & 0 deletions clients/python-wrapper/lakefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@
Allow importing of models from package root
"""

from lakefs.client import Client, DEFAULT_CLIENT
from lakefs.repository import Repository
from lakefs.reference import Reference
from lakefs.models import Commit, Change, RepositoryProperties
from lakefs.tag import Tag
from lakefs.branch import Branch
from lakefs.object import StoredObject, WriteableObject, ObjectReader
from lakefs.object_manager import ObjectManager


def repository(repository_id: str) -> Repository:
"""
Wrapper for getting a Repository object from the lakefs module.
Enable more fluid syntax (lakefs.repository("x").branch("y") instead of lakefs.Repository("x").branch("y"))
:param repository_id: The repository name
:return: Repository object representing a lakeFS repository with the give repository_id
"""
return Repository(repository_id)
2 changes: 1 addition & 1 deletion clients/python-wrapper/lakefs/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def uncommitted(self, max_amount: Optional[int], after: Optional[str] = None, pr
**kwargs):
yield Change(**diff.dict())

def import_data(self, commit_message: str, metadata: Optional[dict] = None) -> ImportManager:
def import_data(self, commit_message: Optional[str] = "", metadata: Optional[dict] = None) -> ImportManager:
"""
Import data to lakeFS
Expand Down
2 changes: 2 additions & 0 deletions clients/python-wrapper/lakefs/import_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def start(self) -> str:
"""
if self._in_progress:
raise ImportManagerException("Import in progress")
if self._import_id is not None:
raise ImportManagerException("Import Manager can only be used once")

creation = lakefs_sdk.ImportCreation(paths=self.sources,
commit=lakefs_sdk.CommitCreation(message=self.commit_message,
Expand Down
25 changes: 22 additions & 3 deletions clients/python-wrapper/tests/integration/test_import.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from time import sleep

import pytest

from lakefs import Client
from lakefs.exceptions import ImportManagerException, ConflictException
from tests.utests.common import expect_exception_context

Expand All @@ -14,8 +17,14 @@
"nested/prefix-7/file000101", ]


def skip_on_unsupported_blockstore(clt: Client, supported_blockstores: [str]):
if clt.storage_config.blockstore_type not in supported_blockstores:
pytest.skip(f"Unsupported blockstore type for test: {clt.storage_config.blockstore_type}")


def test_import_manager(setup_repo):
_, repo = setup_repo
clt, repo = setup_repo
skip_on_unsupported_blockstore(clt, "s3")
branch = repo.branch("import-branch").create("main")
mgr = branch.import_data(commit_message="my imported data", metadata={"foo": "bar"})

Expand All @@ -32,17 +41,22 @@ def test_import_manager(setup_repo):
assert res.commit.metadata.get("foo") == "bar"
assert res.ingested_objects == 0

# Expect failure trying to run manager twice
with expect_exception_context(ImportManagerException):
mgr.run()

# Import with objects and prefixes
mgr = branch.import_data()
dest_prefix = "imported/new-prefix/"
mgr.prefix(_IMPORT_PATH + "prefix-1/",
dest_prefix + "prefix-1/").prefix(_IMPORT_PATH + "prefix-2/",
dest_prefix + "prefix-2/")
for o in _FILES_TO_CHECK:
mgr.object(_IMPORT_PATH + o, dest_prefix + o)

mgr.commit_message = "new commit"
mgr.commit_metadata = None
res = mgr.run()

assert res.error is None
assert res.completed
assert res.commit.id == branch.commit_id()
Expand All @@ -56,7 +70,8 @@ def test_import_manager(setup_repo):


def test_import_manager_cancel(setup_repo):
_, repo = setup_repo
clt, repo = setup_repo
skip_on_unsupported_blockstore(clt, "s3")
branch = repo.branch("import-branch").create("main")
expected_commit_id = branch.commit_id()
expected_commit_message = branch.commit_message()
Expand All @@ -66,6 +81,10 @@ def test_import_manager_cancel(setup_repo):

mgr.start()
sleep(1)

with expect_exception_context(ImportManagerException):
mgr.start()

mgr.cancel()

status = mgr.status()
Expand Down
13 changes: 7 additions & 6 deletions clients/python-wrapper/tests/integration/test_sanity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

from tests.utests.common import expect_exception_context
from lakefs.exceptions import NotFoundException, ConflictException, ObjectNotFoundException
from lakefs import RepositoryProperties, WriteableObject
import lakefs


def test_repository_sanity(storage_namespace, setup_repo):
_, repo = setup_repo
repo = lakefs.repository(repo.properties.id) # test the lakefs.repository function works properly
default_branch = "main"
expected_properties = RepositoryProperties(id=repo.properties.id,
default_branch=default_branch,
storage_namespace=storage_namespace,
creation_date=repo.properties.creation_date)
expected_properties = lakefs.RepositoryProperties(id=repo.properties.id,
default_branch=default_branch,
storage_namespace=storage_namespace,
creation_date=repo.properties.creation_date)
assert repo.properties == expected_properties

# Create with allow exists
Expand Down Expand Up @@ -103,7 +104,7 @@ def test_object_sanity(setup_repo):
data = b"test_data"
path = "test_obj"
metadata = {"foo": "bar"}
obj = WriteableObject(repository=repo.properties.id, reference="main", path=path, client=clt).upload(
obj = lakefs.WriteableObject(repository=repo.properties.id, reference="main", path=path, client=clt).upload(
data=data, metadata=metadata)
with obj.reader() as fd:
assert fd.read() == data
Expand Down
4 changes: 2 additions & 2 deletions clients/python-wrapper/tests/utests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ def env_var_context():
os.environ = old_env


def get_test_repo() -> lakefs.repository.Repository:
def get_test_repo() -> lakefs.Repository:
from lakefs.client import Client
client = Client(username="test_user", password="test_password", host="http://127.0.0.1:8000")
return lakefs.repository.Repository(repository_id=TEST_REPO_ARGS.name, client=client)
return lakefs.Repository(repository_id=TEST_REPO_ARGS.name, client=client)


@contextmanager
Expand Down

0 comments on commit 07db384

Please sign in to comment.