Skip to content

Commit

Permalink
uses right table path format in delta lake load job
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Aug 26, 2024
1 parent d192ee7 commit 00e46e9
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ def run(self) -> None:
self.is_local_filesystem = self._job_client.config.protocol == "file"
self.pathlib = os.path if self.is_local_filesystem else posixpath
# `destination_file_name` is a folder path, not a file path
self.destination_file_name = self._job_client.make_remote_uri(
self._job_client.get_table_dir(self.load_table_name)
)
self.destination_file_name = self._job_client.get_table_dir(self.load_table_name)

from dlt.common.libs.deltalake import write_delta_table, merge_delta_table

Expand All @@ -138,7 +136,7 @@ def run(self) -> None:
else:
write_delta_table(
table_or_uri=(
self.destination_file_name if self._delta_table is None else self._delta_table
self.make_remote_uri() if self._delta_table is None else self._delta_table
),
data=arrow_rbr,
write_disposition=self._load_table["write_disposition"],
Expand All @@ -156,7 +154,7 @@ def _storage_options(self) -> Dict[str, str]:
def _delta_table(self) -> Optional["DeltaTable"]: # type: ignore[name-defined] # noqa: F821
from dlt.common.libs.deltalake import try_get_deltatable

return try_get_deltatable(self.destination_file_name, storage_options=self._storage_options)
return try_get_deltatable(self.make_remote_uri(), storage_options=self._storage_options)

@property
def _partition_columns(self) -> List[str]:
Expand All @@ -171,7 +169,7 @@ def _create_or_evolve_delta_table(self) -> None:

if self._delta_table is None:
DeltaTable.create(
table_uri=self.destination_file_name,
table_uri=self.make_remote_uri(),
schema=ensure_delta_compatible_arrow_schema(self.arrow_ds.schema),
mode="overwrite",
partition_by=self._partition_columns,
Expand Down

0 comments on commit 00e46e9

Please sign in to comment.