Refactor filesystem and path utilities methods #930
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Feb 26, 2024
1 parent e035e38 commit 7246de5
Showing 2 changed files with 65 additions and 61 deletions.
103 changes: 53 additions & 50 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,13 +1,12 @@
import posixpath
import os
import posixpath
from contextlib import contextmanager
from types import TracebackType
from typing import ClassVar, List, Type, Iterable, Set, Iterator

from fsspec import AbstractFileSystem
from contextlib import contextmanager

from dlt.common import logger
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.storages import FileStorage, ParsedLoadJobFileName, fsspec_from_config
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
NewLoadJob,
@@ -17,12 +16,13 @@
FollowupJob,
WithStagingDataset,
)

from dlt.destinations.job_impl import EmptyLoadJob
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
from dlt.common.storages import FileStorage, ParsedLoadJobFileName, fsspec_from_config
from dlt.destinations import path_utils
from dlt.destinations.impl.filesystem import capabilities
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.destinations.job_impl import EmptyLoadJob
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations import path_utils


class LoadFilesystemJob(LoadJob):
@@ -98,8 +98,9 @@ def __init__(self, schema: Schema, config: FilesystemDestinationClientConfigurat
super().__init__(schema, config)
self.fs_client, self.fs_path = fsspec_from_config(config)
self.config: FilesystemDestinationClientConfiguration = config
# verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
# cannot be replaced and we cannot initialize folders consistently
# Verify file layout.
# we need {table_name} and only allow {schema_name} before it, otherwise tables
# can't be replaced, and we can't initialize folders consistently.
self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)
self._dataset_path = self.config.normalize_dataset_name(self.schema)

@@ -116,7 +117,7 @@ def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
current_dataset_path = self._dataset_path
try:
self._dataset_path = self.schema.naming.normalize_table_identifier(
current_dataset_path + "_staging"
f"{current_dataset_path}_staging"
)
yield self
finally:
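
The hunk above replaces string concatenation with an f-string inside a swap-and-restore pattern: `_dataset_path` is pointed at a staging variant for the duration of the `with` block and always restored in `finally`. A stripped-down, runnable sketch of that pattern (the class and suffix logic here are illustrative, not the dlt API):

from contextlib import contextmanager
from typing import Iterator

class Client:
    def __init__(self, dataset_path: str) -> None:
        self._dataset_path = dataset_path

    @contextmanager
    def with_staging_dataset(self) -> Iterator["Client"]:
        current = self._dataset_path
        try:
            # point the client at "<dataset>_staging" for the duration of the block
            self._dataset_path = f"{current}_staging"
            yield self
        finally:
            # restore even if the body raised
            self._dataset_path = current

client = Client("my_dataset")
with client.with_staging_dataset():
    print(client._dataset_path)  # my_dataset_staging
print(client._dataset_path)      # my_dataset
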
@@ -125,47 +126,49 @@ def with_staging_dataset(self) -> Iterator["FilesystemClient"]:

def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
# clean up existing files for tables selected for truncating
if truncate_tables and self.fs_client.isdir(self.dataset_path):
# get all dirs with table data to delete. the table data are guaranteed to be files in those folders
# TODO: when we do partitioning it is no longer the case and we may remove folders below instead
truncated_dirs = self._get_table_dirs(truncate_tables)
# print(f"TRUNCATE {truncated_dirs}")
truncate_prefixes: Set[str] = set()
for table in truncate_tables:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table
if not truncate_tables or not self.fs_client.isdir(self.dataset_path):
return
# get all dirs with table data to delete.
# the table data are guaranteed to be files in those folders
# TODO: when we do partitioning it is no longer the case and we may remove folders below instead
truncated_dirs = self._get_table_dirs(truncate_tables)
# print(f"TRUNCATE {truncated_dirs}")
truncate_prefixes: Set[str] = set()
for table in truncate_tables:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table
)
truncate_prefixes.add(posixpath.join(self.dataset_path, table_prefix))
# print(f"TRUNCATE PREFIXES {truncate_prefixes} on {truncate_tables}")

for truncate_dir in truncated_dirs:
# get files in truncate dirs
# NOTE: glob implementation in fsspec doesn't look thread safe, way better is to use ls and then filter
# NOTE: without refresh you get random results here.
logger.info(f"Will truncate tables in {truncate_dir}")
try:
all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True)
# logger.debug(f"Found {len(all_files)} CANDIDATE files in {truncate_dir}")
# print(f"in truncate dir {truncate_dir}: {all_files}")
for item in all_files:
# check every file against all the prefixes
for search_prefix in truncate_prefixes:
if item.startswith(search_prefix):
# NOTE: deleting in chunks on s3 doesn't raise on access denied, file non-existing and probably other errors
# print(f"DEL {item}")
try:
# NOTE: must use rm_file to get errors on deleting.
self.fs_client.rm_file(item)
except NotImplementedError as e:
# not all filesystems implement the above.
self.fs_client.rm(item)
if self.fs_client.exists(item):
raise FileExistsError(item) from e
except FileNotFoundError:
logger.info(
f"Directory or path to truncate tables {truncate_dir} does not exist but it"
" should be created previously!"
)
truncate_prefixes.add(posixpath.join(self.dataset_path, table_prefix))
# print(f"TRUNCATE PREFIXES {truncate_prefixes} on {truncate_tables}")

for truncate_dir in truncated_dirs:
# get files in truncate dirs
# NOTE: glob implementation in fsspec does not look thread safe, way better is to use ls and then filter
# NOTE: without refresh you get random results here
logger.info(f"Will truncate tables in {truncate_dir}")
try:
all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True)
# logger.debug(f"Found {len(all_files)} CANDIDATE files in {truncate_dir}")
# print(f"in truncate dir {truncate_dir}: {all_files}")
for item in all_files:
# check every file against all the prefixes
for search_prefix in truncate_prefixes:
if item.startswith(search_prefix):
# NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
# print(f"DEL {item}")
try:
# NOTE: must use rm_file to get errors on delete
self.fs_client.rm_file(item)
except NotImplementedError:
# not all filesystem implement the above
self.fs_client.rm(item)
if self.fs_client.exists(item):
raise FileExistsError(item)
except FileNotFoundError:
logger.info(
f"Directory or path to truncate tables {truncate_dir} does not exist but it"
" should be created previously!"
)
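
The main change in this hunk is a guard clause: the old `if truncate_tables and self.fs_client.isdir(self.dataset_path):` wrapper becomes an early `return`, de-indenting the rest of the method, while the prefix matching itself is unchanged. A tiny standalone illustration of that matching (fake file listing and hard-coded layout fragment, not the dlt API):

import posixpath

dataset_path = "bucket/my_dataset"
table_prefix_layout = "{schema_name}/{table_name}/"  # fragment as returned by get_table_prefix_layout
truncate_tables = ["users"]

truncate_prefixes = {
    posixpath.join(dataset_path, table_prefix_layout.format(schema_name="shop", table_name=t))
    for t in truncate_tables
}

# stand-in for fs_client.ls(truncate_dir, detail=False, refresh=True)
all_files = [
    "bucket/my_dataset/shop/users/1700000000.0.jsonl",
    "bucket/my_dataset/shop/users_archive/1700000000.1.jsonl",
]
to_delete = [f for f in all_files if any(f.startswith(p) for p in truncate_prefixes)]
print(to_delete)  # only the "users" file; the trailing separator keeps "users_archive" out
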

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
23 changes: 12 additions & 11 deletions dlt/destinations/path_utils.py
@@ -1,27 +1,28 @@
# this can probably go some other place, but it is shared by destinations, so for now it is here
from typing import List, Sequence, Tuple
import re
from typing import List, Sequence

import pendulum
import re

from dlt.destinations.exceptions import InvalidFilesystemLayout, CantExtractTablePrefix

# TODO: ensure layout only has supported placeholders

# TODO: Ensure layout only has supported placeholders.
SUPPORTED_PLACEHOLDERS = {"schema_name", "table_name", "load_id", "file_id", "ext", "curr_date"}

SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",)


def check_layout(layout: str) -> List[str]:
placeholders = get_placeholders(layout)
invalid_placeholders = [p for p in placeholders if p not in SUPPORTED_PLACEHOLDERS]
if invalid_placeholders:
if invalid_placeholders := [
p for p in placeholders if p not in SUPPORTED_PLACEHOLDERS
]:
raise InvalidFilesystemLayout(invalid_placeholders)
return placeholders


def get_placeholders(layout: str) -> List[str]:
return re.findall(r"\{(.*?)\}", layout)
return re.findall(r"\{(.*?)}", layout)
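
Both regexes are equivalent in Python: outside a quantifier, a lone `}` needs no escape, so `r"\{(.*?)\}"` and `r"\{(.*?)}"` match the same placeholders. A minimal runnable sketch of the two helpers above (plain ValueError standing in for InvalidFilesystemLayout):

import re
from typing import List

SUPPORTED_PLACEHOLDERS = {"schema_name", "table_name", "load_id", "file_id", "ext", "curr_date"}

def get_placeholders(layout: str) -> List[str]:
    # non-greedy capture of everything between braces
    return re.findall(r"\{(.*?)\}", layout)

def check_layout(layout: str) -> List[str]:
    placeholders = get_placeholders(layout)
    # assignment expression keeps the filtered list available for the error
    if invalid := [p for p in placeholders if p not in SUPPORTED_PLACEHOLDERS]:
        raise ValueError(f"unsupported placeholders: {invalid}")  # stands in for InvalidFilesystemLayout
    return placeholders

print(check_layout("{schema_name}/{table_name}/{load_id}.{file_id}.{ext}"))
# ['schema_name', 'table_name', 'load_id', 'file_id', 'ext']
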


def create_path(
@@ -37,7 +38,7 @@ def create_path(
ext=ext,
curr_date=str(pendulum.today()),
)
# if extension is not defined, we append it at the end
# If an extension is not defined, we append it at the end.
if "ext" not in placeholders:
path += f".{ext}"
return path
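
The hunk shows only the tail of create_path; a hedged reconstruction of the whole function (signature and argument order guessed for illustration, not copied from dlt) shows how the trailing-extension rule fits in:

import re
import pendulum

def create_path(layout: str, schema_name: str, table_name: str,
                load_id: str, file_id: str, ext: str) -> str:
    # hypothetical sketch; the real dlt function also validates the layout
    placeholders = re.findall(r"\{(.*?)\}", layout)
    path = layout.format(
        schema_name=schema_name,
        table_name=table_name,
        load_id=load_id,
        file_id=file_id,
        ext=ext,
        curr_date=str(pendulum.today()),
    )
    # if the layout has no {ext} placeholder, append the extension explicitly
    if "ext" not in placeholders:
        path += f".{ext}"
    return path

print(create_path("{schema_name}/{table_name}/{load_id}.{file_id}", "shop", "users", "1700000000", "0", "jsonl"))
# shop/users/1700000000.0.jsonl
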
@@ -47,7 +48,7 @@ def get_table_prefix_layout(
layout: str,
supported_prefix_placeholders: Sequence[str] = SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS,
) -> str:
"""get layout fragment that defines positions of the table, cutting other placeholders
"""Get layout fragment that defines positions of the table, cutting other placeholders.
allowed `supported_prefix_placeholders` that may appear before table.
"""
@@ -72,8 +73,8 @@ )
)
raise CantExtractTablePrefix(layout, details)

# we include the char after the table_name here, this should be a separator not a new placeholder
# this is to prevent selecting tables that have the same starting name
# We include the char after the table_name here; this should be a separator, not a new placeholder
# this is to prevent selecting tables that have the same starting name.
prefix = layout[: layout.index("{table_name}") + 13]
if prefix[-1] == "{":
raise CantExtractTablePrefix(layout, "A separator is required after a {table_name}. ")
