From e8b0c33fbcd82ed4c8156496e06a71781cfa600d Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 28 Mar 2024 17:30:57 +0100 Subject: [PATCH] Create missing directories before copying the file --- dlt/destinations/impl/filesystem/factory.py | 3 +- .../impl/filesystem/filesystem.py | 32 ++++++++++++++----- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/dlt/destinations/impl/filesystem/factory.py b/dlt/destinations/impl/filesystem/factory.py index eb17e43a83..1226f009b6 100644 --- a/dlt/destinations/impl/filesystem/factory.py +++ b/dlt/destinations/impl/filesystem/factory.py @@ -5,7 +5,6 @@ from dlt.destinations.impl.filesystem import capabilities from dlt.common.destination import Destination, DestinationCapabilitiesContext from dlt.common.storages.configuration import FileSystemCredentials -from dlt.destinations.path_utils import PathParams if t.TYPE_CHECKING: from dlt.destinations.impl.filesystem.filesystem import FilesystemClient @@ -32,7 +31,7 @@ def __init__( current_datetime: t.Optional[datetime] = None, datetime_format: t.Optional[str] = None, extra_params: t.Optional[t.Dict[str, t.Any]] = None, - suffix_fn: t.Optional[t.Callable[[PathParams], str]] = None, + suffix_fn: t.Optional[t.Callable[[], str]] = None, **kwargs: t.Any, ) -> None: """Configure the filesystem destination to use in a pipeline and load data to local or remote filesystem. diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 4b88f5ae26..58d16fe0e5 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -42,13 +42,19 @@ def __init__( file_name = FileStorage.get_file_name_from_file_path(local_path) self.config = config self.dataset_path = dataset_path - self.destination_file_name = make_filename(config, file_name, schema_name, load_id) + self.destination_file_name = make_filename( + config, file_name, schema_name, load_id + ) super().__init__(file_name) fs_client, _ = fsspec_from_config(config) - self.destination_file_name = make_filename(config, file_name, schema_name, load_id) + self.destination_file_name = make_filename( + config, file_name, schema_name, load_id + ) item = self.make_remote_path() + dir_path = self.make_remote_path(only_dir=True) + fs_client.makedirs(dir_path, exist_ok=True) fs_client.put_file(local_path, item) @staticmethod @@ -78,10 +84,16 @@ def make_destination_filename( # return layout_helper.create_path() pass - def make_remote_path(self) -> str: - return ( - f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}" - ) + def make_remote_path(self, only_dir: bool = False) -> str: + if only_dir: + dir_path = FileStorage.get_dir_name_from_file_path( + self.destination_file_name + ) + return ( + f"{self.config.protocol}://{posixpath.join(self.dataset_path, dir_path)}" + ) + else: + return f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}" def state(self) -> TLoadJobState: return "completed" @@ -164,7 +176,9 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: # NOTE: without refresh you get random results here logger.info(f"Will truncate tables in {truncate_dir}") try: - all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True) + all_files = self.fs_client.ls( + truncate_dir, detail=False, refresh=True + ) # logger.debug(f"Found {len(all_files)} CANDIDATE files in {truncate_dir}") # print(f"in truncate dir {truncate_dir}: {all_files}") for item in all_files: @@ -212,7 +226,9 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]: def is_storage_initialized(self) -> bool: return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return] - def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: + def start_file_load( + self, table: TTableSchema, file_path: str, load_id: str + ) -> LoadJob: cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob return cls( file_path,