Skip to content

Commit

Permalink
Create missing directories before copying the file
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Apr 2, 2024
1 parent c300425 commit be088ee
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
3 changes: 1 addition & 2 deletions dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dlt.destinations.impl.filesystem import capabilities
from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.storages.configuration import FileSystemCredentials
from dlt.destinations.path_utils import PathParams

if t.TYPE_CHECKING:
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
Expand All @@ -32,7 +31,7 @@ def __init__(
current_datetime: t.Optional[datetime] = None,
datetime_format: t.Optional[str] = None,
extra_params: t.Optional[t.Dict[str, t.Any]] = None,
suffix_fn: t.Optional[t.Callable[[PathParams], str]] = None,
suffix_fn: t.Optional[t.Callable[[], str]] = None,
**kwargs: t.Any,
) -> None:
"""Configure the filesystem destination to use in a pipeline and load data to local or remote filesystem.
Expand Down
32 changes: 24 additions & 8 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)
self.destination_file_name = make_filename(
config, file_name, schema_name, load_id
)

super().__init__(file_name)
fs_client, _ = fsspec_from_config(config)
self.destination_file_name = make_filename(config, file_name, schema_name, load_id)
self.destination_file_name = make_filename(
config, file_name, schema_name, load_id
)

item = self.make_remote_path()
dir_path = self.make_remote_path(only_dir=True)
fs_client.makedirs(dir_path, exist_ok=True)
fs_client.put_file(local_path, item)

@staticmethod
Expand Down Expand Up @@ -78,10 +84,16 @@ def make_destination_filename(
# return layout_helper.create_path()
pass

def make_remote_path(self) -> str:
return (
f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
)
def make_remote_path(self, only_dir: bool = False) -> str:
    """Build the fully qualified remote URL for this job's destination.

    Args:
        only_dir: When True, return the URL of the directory that will
            contain the file (used to pre-create missing directories
            before uploading) instead of the full file path.

    Returns:
        A ``<protocol>://<path>`` string pointing at the destination file,
        or at its parent directory when ``only_dir`` is set.
    """
    # NOTE(review): assumes destination_file_name is a POSIX-style relative
    # path under dataset_path — confirm against make_filename.
    if not only_dir:
        target = posixpath.join(self.dataset_path, self.destination_file_name)
        return f"{self.config.protocol}://{target}"
    parent = FileStorage.get_dir_name_from_file_path(self.destination_file_name)
    return f"{self.config.protocol}://{posixpath.join(self.dataset_path, parent)}"

def state(self) -> TLoadJobState:
    """Report this job's load status.

    The file copy is performed eagerly (at construction time), so by the
    time the loader polls this job it is always finished.
    """
    return "completed"
Expand Down Expand Up @@ -164,7 +176,9 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
# NOTE: without refresh you get random results here
logger.info(f"Will truncate tables in {truncate_dir}")
try:
all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True)
all_files = self.fs_client.ls(
truncate_dir, detail=False, refresh=True
)
# logger.debug(f"Found {len(all_files)} CANDIDATE files in {truncate_dir}")
# print(f"in truncate dir {truncate_dir}: {all_files}")
for item in all_files:
Expand Down Expand Up @@ -212,7 +226,9 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
def is_storage_initialized(self) -> bool:
    """Return whether the dataset directory already exists on the target filesystem."""
    dataset_present = self.fs_client.isdir(self.dataset_path)  # type: ignore[no-any-return]
    return dataset_present

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
def start_file_load(
self, table: TTableSchema, file_path: str, load_id: str
) -> LoadJob:
cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob
return cls(
file_path,
Expand Down

0 comments on commit be088ee

Please sign in to comment.