From 1e956f884a29948fe9c7317ff06cc2fb3bd5cd4b Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Tue, 5 Dec 2023 21:36:39 +0000 Subject: [PATCH] CF Fixes --- .../python-wrapper/lakefs/import_manager.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/clients/python-wrapper/lakefs/import_manager.py b/clients/python-wrapper/lakefs/import_manager.py index 2e0fe78b49d..b89f49189c2 100644 --- a/clients/python-wrapper/lakefs/import_manager.py +++ b/clients/python-wrapper/lakefs/import_manager.py @@ -46,6 +46,12 @@ def import_id(self) -> str: """ return self._import_id + def _append_source(self, import_location: lakefs_sdk.ImportLocation): + if self._import_id is not None: + raise ImportManagerException("Cannot add additional sources to an already started import") + + self.sources.append(import_location) + def prefix(self, object_store_uri: str, destination: str) -> ImportManager: """ Creates a new import source of type "common_prefix" and adds it to the list of sources @@ -54,8 +60,7 @@ def prefix(self, object_store_uri: str, destination: str) -> ImportManager: :param destination: The destination prefix relative to the branch :return: The ImportManager instance (self) after update, to allow operations chaining """ - self.sources.append( - lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, path=object_store_uri, destination=destination)) + self._append_source(lakefs_sdk.ImportLocation(type=_COMMON_PREFIX, path=object_store_uri, destination=destination)) return self def object(self, object_store_uri: str, destination: str) -> ImportManager: @@ -66,7 +71,7 @@ def object(self, object_store_uri: str, destination: str) -> ImportManager: :param destination: The destination path for the object relative to the branch :return: The ImportManager instance (self) after update, to allow operations chaining """ - self.sources.append(lakefs_sdk.ImportLocation(type=_OBJECT, path=object_store_uri, destination=destination)) + self._append_source(lakefs_sdk.ImportLocation(type=_OBJECT, path=object_store_uri, destination=destination)) return self def start(self) -> str: @@ -123,12 +128,11 @@ def wait(self, poll_interval: Optional[timedelta] = timedelta(seconds=2)) -> Imp NotAuthorizedException if user is not authorized to perform this operation ServerException for any other errors """ - if not self._in_progress: + if self._import_id is None: raise ImportManagerException("No import in progress") res = asyncio.run(self._wait_for_completion(poll_interval)) self._in_progress = False - self.sources = [] return ImportStatus(**res.dict()) def run(self, poll_interval: Optional[timedelta] = None) -> ImportStatus: @@ -162,7 +166,6 @@ def cancel(self) -> None: branch=self._branch_id, id=self._import_id) self._in_progress = False - self.sources = [] def status(self) -> ImportStatus: """ @@ -183,4 +186,7 @@ def status(self) -> ImportStatus: res = self._client.sdk_client.import_api.import_status(repository=self._repo_id, branch=self._branch_id, id=self._import_id) + + if res.completed: + self._in_progress = False return ImportStatus(**res.dict())