From 054bc162cd6abe695b6e0918082e665278854148 Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Sun, 3 Dec 2023 17:17:12 +0000 Subject: [PATCH 1/4] Python Wrapper: Make ImportManager single use --- clients/python-wrapper/lakefs/__init__.py | 12 +++++++++ clients/python-wrapper/lakefs/branch.py | 2 +- .../python-wrapper/lakefs/import_manager.py | 2 ++ .../tests/integration/test_import.py | 25 ++++++++++++++++--- .../tests/integration/test_sanity.py | 13 +++++----- clients/python-wrapper/tests/utests/common.py | 4 +-- 6 files changed, 46 insertions(+), 12 deletions(-) diff --git a/clients/python-wrapper/lakefs/__init__.py b/clients/python-wrapper/lakefs/__init__.py index 7515110a78e..ab09d57c047 100644 --- a/clients/python-wrapper/lakefs/__init__.py +++ b/clients/python-wrapper/lakefs/__init__.py @@ -2,6 +2,7 @@ Allow importing of models from package root """ +from lakefs.client import Client, DEFAULT_CLIENT from lakefs.repository import Repository from lakefs.reference import Reference from lakefs.models import Commit, Change, RepositoryProperties @@ -9,3 +10,14 @@ from lakefs.branch import Branch from lakefs.object import StoredObject, WriteableObject, ObjectReader from lakefs.object_manager import ObjectManager + + +def repository(repository_id: str) -> Repository: + """ + Wrapper for getting a Repository object from the lakefs module. + Enable more fluid syntax (lakefs.repository("x").branch("y") instead of lakefs.Repository("x").branch("y")) + + :param repository_id: The repository name + :return: Repository object representing a lakeFS repository with the give repository_id + """ + return Repository(repository_id) diff --git a/clients/python-wrapper/lakefs/branch.py b/clients/python-wrapper/lakefs/branch.py index b82ac744539..305a04c0a0b 100644 --- a/clients/python-wrapper/lakefs/branch.py +++ b/clients/python-wrapper/lakefs/branch.py @@ -153,7 +153,7 @@ def uncommitted(self, max_amount: Optional[int], after: Optional[str] = None, pr **kwargs): yield Change(**diff.dict()) - def import_data(self, commit_message: str, metadata: Optional[dict] = None) -> ImportManager: + def import_data(self, commit_message: Optional[str] = "", metadata: Optional[dict] = None) -> ImportManager: """ Import data to lakeFS diff --git a/clients/python-wrapper/lakefs/import_manager.py b/clients/python-wrapper/lakefs/import_manager.py index 0ff4490864b..6439d499bd1 100644 --- a/clients/python-wrapper/lakefs/import_manager.py +++ b/clients/python-wrapper/lakefs/import_manager.py @@ -85,6 +85,8 @@ def start(self) -> str: """ if self._in_progress: raise ImportManagerException("Import in progress") + if self._import_id is not None: + raise ImportManagerException("Import Manager can only be used once") creation = lakefs_sdk.ImportCreation(paths=self.sources, commit=lakefs_sdk.CommitCreation(message=self.commit_message, diff --git a/clients/python-wrapper/tests/integration/test_import.py b/clients/python-wrapper/tests/integration/test_import.py index caa22e515fb..d09610f9dc7 100644 --- a/clients/python-wrapper/tests/integration/test_import.py +++ b/clients/python-wrapper/tests/integration/test_import.py @@ -1,5 +1,8 @@ from time import sleep +import pytest + +from lakefs import Client from lakefs.exceptions import ImportManagerException, ConflictException from tests.utests.common import expect_exception_context @@ -14,8 +17,14 @@ "nested/prefix-7/file000101", ] +def skip_on_unsupported_blockstore(clt: Client, supported_blockstores: [str]): + if clt.storage_config.blockstore_type not in supported_blockstores: + pytest.skip(f"Unsupported blockstore type for test: {clt.storage_config.blockstore_type}") + + def test_import_manager(setup_repo): - _, repo = setup_repo + clt, repo = setup_repo + skip_on_unsupported_blockstore(clt, "s3") branch = repo.branch("import-branch").create("main") mgr = branch.import_data(commit_message="my imported data", metadata={"foo": "bar"}) @@ -32,17 +41,22 @@ def test_import_manager(setup_repo): assert res.commit.metadata.get("foo") == "bar" assert res.ingested_objects == 0 + # Expect failure trying to run manager twice + with expect_exception_context(ImportManagerException): + mgr.run() + # Import with objects and prefixes + mgr = branch.import_data() dest_prefix = "imported/new-prefix/" mgr.prefix(_IMPORT_PATH + "prefix-1/", dest_prefix + "prefix-1/").prefix(_IMPORT_PATH + "prefix-2/", dest_prefix + "prefix-2/") for o in _FILES_TO_CHECK: mgr.object(_IMPORT_PATH + o, dest_prefix + o) - mgr.commit_message = "new commit" mgr.commit_metadata = None res = mgr.run() + assert res.error is None assert res.completed assert res.commit.id == branch.commit_id() @@ -56,7 +70,8 @@ def test_import_manager(setup_repo): def test_import_manager_cancel(setup_repo): - _, repo = setup_repo + clt, repo = setup_repo + skip_on_unsupported_blockstore(clt, "s3") branch = repo.branch("import-branch").create("main") expected_commit_id = branch.commit_id() expected_commit_message = branch.commit_message() @@ -66,6 +81,10 @@ def test_import_manager_cancel(setup_repo): mgr.start() sleep(1) + + with expect_exception_context(ImportManagerException): + mgr.start() + mgr.cancel() status = mgr.status() diff --git a/clients/python-wrapper/tests/integration/test_sanity.py b/clients/python-wrapper/tests/integration/test_sanity.py index bfe9efdfa58..5ae37af26c3 100644 --- a/clients/python-wrapper/tests/integration/test_sanity.py +++ b/clients/python-wrapper/tests/integration/test_sanity.py @@ -2,16 +2,17 @@ from tests.utests.common import expect_exception_context from lakefs.exceptions import NotFoundException, ConflictException, ObjectNotFoundException -from lakefs import RepositoryProperties, WriteableObject +import lakefs def test_repository_sanity(storage_namespace, setup_repo): _, repo = setup_repo + repo = lakefs.repository(repo.properties.id) # test the lakefs.repository function works properly default_branch = "main" - expected_properties = RepositoryProperties(id=repo.properties.id, - default_branch=default_branch, - storage_namespace=storage_namespace, - creation_date=repo.properties.creation_date) + expected_properties = lakefs.RepositoryProperties(id=repo.properties.id, + default_branch=default_branch, + storage_namespace=storage_namespace, + creation_date=repo.properties.creation_date) assert repo.properties == expected_properties # Create with allow exists @@ -103,7 +104,7 @@ def test_object_sanity(setup_repo): data = b"test_data" path = "test_obj" metadata = {"foo": "bar"} - obj = WriteableObject(repository=repo.properties.id, reference="main", path=path, client=clt).upload( + obj = lakefs.WriteableObject(repository=repo.properties.id, reference="main", path=path, client=clt).upload( data=data, metadata=metadata) with obj.reader() as fd: assert fd.read() == data diff --git a/clients/python-wrapper/tests/utests/common.py b/clients/python-wrapper/tests/utests/common.py index 42bb69fc378..09d2fc96594 100644 --- a/clients/python-wrapper/tests/utests/common.py +++ b/clients/python-wrapper/tests/utests/common.py @@ -64,10 +64,10 @@ def env_var_context(): os.environ = old_env -def get_test_repo() -> lakefs.repository.Repository: +def get_test_repo() -> lakefs.Repository: from lakefs.client import Client client = Client(username="test_user", password="test_password", host="http://127.0.0.1:8000") - return lakefs.repository.Repository(repository_id=TEST_REPO_ARGS.name, client=client) + return lakefs.Repository(repository_id=TEST_REPO_ARGS.name, client=client) @contextmanager From 1e956f884a29948fe9c7317ff06cc2fb3bd5cd4b Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Tue, 5 Dec 2023 21:36:39 +0000 Subject: [PATCH 2/4] CF Fixes --- .../python-wrapper/lakefs/import_manager.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/clients/python-wrapper/lakefs/import_manager.py b/clients/python-wrapper/lakefs/import_manager.py index 2e0fe78b49d..b89f49189c2 100644 --- a/clients/python-wrapper/lakefs/import_manager.py +++ b/clients/python-wrapper/lakefs/import_manager.py @@ -46,6 +46,12 @@ def import_id(self) -> str: """ return self._import_id + def _append_source(self, import_location: lakefs_sdk.ImportLocation): + if self._import_id is not None: + raise ImportManagerException("Cannot add additional sources to an already started import") + + self.sources.append(import_location) + def prefix(self, object_store_uri: str, destination: str) -> ImportManager: """ Creates a new import source of type "common_prefix" and adds it to the list of sources @@ -54,8 +60,7 @@ def prefix(self, object_store_uri: str, destination: str) -> ImportManager: :param destination: The destination prefix relative to the branch :return: The ImportManager instance (self) after update, to allow operations chaining """ - self.sources.append( - lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, path=object_store_uri, destination=destination)) + self._append_source(lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, path=object_store_uri, destination=destination)) return self def object(self, object_store_uri: str, destination: str) -> ImportManager: @@ -66,7 +71,7 @@ def object(self, object_store_uri: str, destination: str) -> ImportManager: :param destination: The destination path for the object relative to the branch :return: The ImportManager instance (self) after update, to allow operations chaining """ - self.sources.append(lakefs_sdk.ImportLocation(type=_OBJECT, path=object_store_uri, destination=destination)) + self._append_source(lakefs_sdk.ImportLocation(type=_OBJECT, path=object_store_uri, destination=destination)) return self def start(self) -> str: @@ -123,12 +128,11 @@ def wait(self, poll_interval: Optional[timedelta] = timedelta(seconds=2)) -> Imp NotAuthorizedException if user is not authorized to perform this operation ServerException for any other errors """ - if not self._in_progress: + if self._import_id is None: raise ImportManagerException("No import in progress") res = asyncio.run(self._wait_for_completion(poll_interval)) self._in_progress = False - self.sources = [] return ImportStatus(**res.dict()) def run(self, poll_interval: Optional[timedelta] = None) -> ImportStatus: @@ -162,7 +166,6 @@ def cancel(self) -> None: branch=self._branch_id, id=self._import_id) self._in_progress = False - self.sources = [] def status(self) -> ImportStatus: """ @@ -183,4 +186,7 @@ def status(self) -> ImportStatus: res = self._client.sdk_client.import_api.import_status(repository=self._repo_id, branch=self._branch_id, id=self._import_id) + + if res.completed: + self._in_progress = False return ImportStatus(**res.dict()) From ff8713385a132eb694f352212c52330294499128 Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Tue, 5 Dec 2023 21:39:35 +0000 Subject: [PATCH 3/4] CF Fixes --- clients/python-wrapper/lakefs/import_manager.py | 4 +++- clients/python-wrapper/tests/integration/test_import.py | 4 ++-- clients/python-wrapper/tests/utests/test_import.py | 5 ++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/clients/python-wrapper/lakefs/import_manager.py b/clients/python-wrapper/lakefs/import_manager.py index b89f49189c2..5ee652891ce 100644 --- a/clients/python-wrapper/lakefs/import_manager.py +++ b/clients/python-wrapper/lakefs/import_manager.py @@ -60,7 +60,9 @@ def prefix(self, object_store_uri: str, destination: str) -> ImportManager: :param destination: The destination prefix relative to the branch :return: The ImportManager instance (self) after update, to allow operations chaining """ - self._append_source(lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, path=object_store_uri, destination=destination)) + self._append_source(lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, + path=object_store_uri, + destination=destination)) return self def object(self, object_store_uri: str, destination: str) -> ImportManager: diff --git a/clients/python-wrapper/tests/integration/test_import.py b/clients/python-wrapper/tests/integration/test_import.py index d09610f9dc7..ba66f442454 100644 --- a/clients/python-wrapper/tests/integration/test_import.py +++ b/clients/python-wrapper/tests/integration/test_import.py @@ -2,7 +2,7 @@ import pytest -from lakefs import Client +from lakefs.client import Client from lakefs.exceptions import ImportManagerException, ConflictException from tests.utests.common import expect_exception_context @@ -92,4 +92,4 @@ def test_import_manager_cancel(setup_repo): assert branch.commit_message() == expected_commit_message assert not status.completed assert "Canceled" in status.error.message - assert len(mgr.sources) == 0 + assert len(mgr.sources) == 1 diff --git a/clients/python-wrapper/tests/utests/test_import.py b/clients/python-wrapper/tests/utests/test_import.py index efbedc5824d..61530268485 100644 --- a/clients/python-wrapper/tests/utests/test_import.py +++ b/clients/python-wrapper/tests/utests/test_import.py @@ -49,6 +49,5 @@ def monkey_import_status(*_, **__): assert res.completed assert requests == 0 - # try again and expect error - with expect_exception_context(ImportManagerException): - mgr.wait() + # try again and expect no error + mgr.wait() From 73c0ca90395bb2280da42c4b6a8cb3fd48efdf3d Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Wed, 6 Dec 2023 08:41:32 +0000 Subject: [PATCH 4/4] CR Fixes 2 --- clients/python-wrapper/lakefs/branch.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/python-wrapper/lakefs/branch.py b/clients/python-wrapper/lakefs/branch.py index 305a04c0a0b..97c2304e8ec 100644 --- a/clients/python-wrapper/lakefs/branch.py +++ b/clients/python-wrapper/lakefs/branch.py @@ -158,7 +158,8 @@ def import_data(self, commit_message: Optional[str] = "", metadata: Optional[dic Import data to lakeFS :param metadata: metadata to attach to the commit - :param commit_message: once the data is imported, a commit is created with this message + :param commit_message: once the data is imported, a commit is created with this message. If default (empty) + message is provided, uses the default server commit message for imports. :return: an ImportManager object """ return ImportManager(self._repo_id, self._id, commit_message, metadata, self._client)