Merge branch 'devel' into feat/1484-integrate-rest-api-generic-source-into-dlt-core
willi-mueller committed Aug 29, 2024
2 parents 375d28b + e9c9ecf commit e6599d2
Showing 45 changed files with 836 additions and 237 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destinations.yml
@@ -29,6 +29,7 @@ env:
# Test redshift and filesystem with all buckets
# postgres runs again here so we can test on mac/windows
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"
# note that all buckets are enabled for testing

jobs:
get_docs_changes:
2 changes: 2 additions & 0 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -32,6 +32,8 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:
creds = self.to_adlfs_credentials()
if creds["sas_token"] is None:
creds.pop("sas_token")
if creds["account_key"] is None:
creds.pop("account_key")
return creds

def create_sas_token(self) -> None:
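The added lines mirror the existing sas_token handling: credentials passed to the Rust object_store crate must not contain None values, so an unset account_key is now dropped as well. A minimal standalone sketch of the resulting behavior (illustrative helper only, not the actual credentials spec):

from typing import Dict, Optional

def strip_none_azure_creds(adlfs_creds: Dict[str, Optional[str]]) -> Dict[str, str]:
    # mimics to_object_store_rs_credentials(): keys whose value is None are removed
    # so the Rust object_store library never sees them
    creds = dict(adlfs_creds)
    for key in ("sas_token", "account_key"):
        if creds.get(key) is None:
            creds.pop(key, None)
    return creds  # type: ignore[return-value]

print(strip_none_azure_creds({"account_name": "acc", "sas_token": None, "account_key": None}))
# expected: {'account_name': 'acc'}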
19 changes: 16 additions & 3 deletions dlt/common/destination/reference.py
@@ -269,6 +269,8 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura

staging_config: Optional[DestinationClientStagingConfiguration] = None
"""configuration of the staging, if present, injected at runtime"""
truncate_tables_on_staging_destination_before_load: bool = True
"""If dlt should truncate the tables on staging destination before loading data."""


TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"]
@@ -578,17 +580,28 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
return self # type: ignore


class SupportsStagingDestination:
class SupportsStagingDestination(ABC):
"""Adds capability to support a staging destination for the load"""

def should_load_data_to_staging_dataset_on_staging_destination(
self, table: TTableSchema
) -> bool:
"""If set to True, and staging destination is configured, the data will be loaded to staging dataset on staging destination
instead of a regular dataset on staging destination. Currently it is used by Athena Iceberg which uses staging dataset
on staging destination to copy data to iceberg tables stored on regular dataset on staging destination.
The default is to load data to regular dataset on staging destination from where warehouses like Snowflake (that have their
own storage) will copy data.
"""
return False

@abstractmethod
def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
# the default is to truncate the tables on the staging destination...
return True
"""If set to True, data in `table` will be truncated on staging destination (regular dataset). This is the default behavior which
can be changed with a config flag.
For Athena + Iceberg this setting is always False - Athena uses regular dataset to store Iceberg tables and we avoid touching it.
For Athena we truncate those tables only on "replace" write disposition.
"""
pass


# TODO: type Destination properly
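Because should_truncate_table_before_load_on_staging_destination is now abstract, every destination client that supports a staging destination must state explicitly whether staging tables get truncated, typically by forwarding the new truncate_tables_on_staging_destination_before_load flag. A hedged sketch of what a concrete client could look like (illustrative subclass, not code from this commit):

from dlt.common.destination.reference import (
    DestinationClientDwhWithStagingConfiguration,
    SupportsStagingDestination,
)
from dlt.common.schema.typing import TTableSchema

class ExampleStagingClient(SupportsStagingDestination):
    def __init__(self, config: DestinationClientDwhWithStagingConfiguration) -> None:
        self.config = config

    def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
        # forward the config flag; it defaults to True, so existing behavior is preserved
        return self.config.truncate_tables_on_staging_destination_before_load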
3 changes: 2 additions & 1 deletion dlt/common/libs/deltalake.py
@@ -176,7 +176,8 @@ def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {}
extra_options = {}
if config.protocol in ("az", "gs", "s3"):
# TODO: create a mixin with to_object_store_rs_credentials for a proper discovery
if hasattr(config.credentials, "to_object_store_rs_credentials"):
creds = config.credentials.to_object_store_rs_credentials()
if config.deltalake_storage_options is not None:
extra_options = config.deltalake_storage_options
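Rather than hard-coding the az/gs/s3 protocols, credential discovery is now duck-typed: any credentials object exposing to_object_store_rs_credentials() gets converted, whatever the protocol. A small sketch of the pattern with a stand-in credentials class (not a real dlt spec):

class StandInCredentials:
    # stand-in for a credentials spec that implements the conversion method
    def to_object_store_rs_credentials(self) -> dict:
        return {"region": "eu-central-1"}

credentials = StandInCredentials()
creds = {}
if hasattr(credentials, "to_object_store_rs_credentials"):
    creds = credentials.to_object_store_rs_credentials()
print(creds)  # {'region': 'eu-central-1'}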
2 changes: 1 addition & 1 deletion dlt/common/metrics.py
@@ -64,7 +64,7 @@ class LoadJobMetrics(NamedTuple):
started_at: datetime.datetime
finished_at: datetime.datetime
state: Optional[str]
remote_uri: Optional[str]
remote_url: Optional[str]


class LoadMetrics(StepMetrics):
4 changes: 2 additions & 2 deletions dlt/common/normalizers/json/__init__.py
@@ -54,9 +54,9 @@ class SupportsDataItemNormalizer(Protocol):
"""A class with a name DataItemNormalizer deriving from normalizers.json.DataItemNormalizer"""


def wrap_in_dict(item: Any) -> DictStrAny:
def wrap_in_dict(label: str, item: Any) -> DictStrAny:
"""Wraps `item` that is not a dictionary into dictionary that can be json normalized"""
return {"value": item}
return {label: item}


__all__ = [
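wrap_in_dict now takes the wrapping key as an explicit first argument instead of hard-coding "value", so callers can pass a configured column name (the relational normalizer passes its c_value, as the next file shows). Expected usage, assuming the function is imported from this module:

from dlt.common.normalizers.json import wrap_in_dict

# non-dict items are wrapped under the caller-supplied key
assert wrap_in_dict("value", 1) == {"value": 1}
assert wrap_in_dict("value", [1, 2]) == {"value": [1, 2]}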
4 changes: 2 additions & 2 deletions dlt/common/normalizers/json/relational.py
@@ -281,7 +281,7 @@ def _normalize_list(
else:
# list of simple types
child_row_hash = DataItemNormalizer._get_child_row_hash(parent_row_id, table, idx)
wrap_v = wrap_in_dict(v)
wrap_v = wrap_in_dict(self.c_value, v)
wrap_v[self.c_dlt_id] = child_row_hash
e = self._link_row(wrap_v, parent_row_id, idx)
DataItemNormalizer._extend_row(extend, e)
Expand Down Expand Up @@ -387,7 +387,7 @@ def normalize_data_item(
) -> TNormalizedRowIterator:
# wrap items that are not dictionaries in dictionary, otherwise they cannot be processed by the JSON normalizer
if not isinstance(item, dict):
item = wrap_in_dict(item)
item = wrap_in_dict(self.c_value, item)
# we will extend event with all the fields necessary to load it as root row
row = cast(DictStrAny, item)
# identify load id if loaded data must be processed after loading incrementally
119 changes: 88 additions & 31 deletions dlt/common/storages/configuration.py
@@ -1,7 +1,7 @@
import os
import pathlib
from typing import Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
from urllib.parse import urlparse, unquote
from urllib.parse import urlparse, unquote, urlunparse

from dlt.common.configuration import configspec, resolve_type
from dlt.common.configuration.exceptions import ConfigurationValueError
@@ -52,14 +52,61 @@ class LoadStorageConfiguration(BaseConfiguration):
]


def _make_az_url(scheme: str, fs_path: str, bucket_url: str) -> str:
parsed_bucket_url = urlparse(bucket_url)
if parsed_bucket_url.username:
# az://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>
# fs_path always starts with container
split_path = fs_path.split("/", maxsplit=1)
if len(split_path) == 1:
split_path.append("")
container, path = split_path
netloc = f"{container}@{parsed_bucket_url.hostname}"
return urlunparse(parsed_bucket_url._replace(path=path, scheme=scheme, netloc=netloc))
return f"{scheme}://{fs_path}"


def _make_file_url(scheme: str, fs_path: str, bucket_url: str) -> str:
"""Creates a normalized file:// url from a local path
netloc is never set. UNC paths are represented as file://host/path
"""
p_ = pathlib.Path(fs_path)
p_ = p_.expanduser().resolve()
return p_.as_uri()


MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url}

MAKE_URI_DISPATCH["adl"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["abfs"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["azure"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["abfss"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["local"] = MAKE_URI_DISPATCH["file"]


def make_fsspec_url(scheme: str, fs_path: str, bucket_url: str) -> str:
"""Creates url from `fs_path` and `scheme` using bucket_url as an `url` template
Args:
scheme (str): scheme of the resulting url
fs_path (str): kind of absolute path that fsspec uses to locate resources for particular filesystem.
bucket_url (str): a url template; the structure of the url will be preserved if possible
"""
_maker = MAKE_URI_DISPATCH.get(scheme)
if _maker:
return _maker(scheme, fs_path, bucket_url)
return f"{scheme}://{fs_path}"


@configspec
class FilesystemConfiguration(BaseConfiguration):
"""A configuration defining filesystem location and access credentials.
When configuration is resolved, `bucket_url` is used to extract a protocol and request corresponding credentials class.
* s3
* gs, gcs
* az, abfs, adl
* az, abfs, adl, abfss, azure
* file, memory
* gdrive
"""
@@ -72,6 +119,8 @@ class FilesystemConfiguration(BaseConfiguration):
"az": AnyAzureCredentials,
"abfs": AnyAzureCredentials,
"adl": AnyAzureCredentials,
"abfss": AnyAzureCredentials,
"azure": AnyAzureCredentials,
}

bucket_url: str = None
@@ -93,17 +142,21 @@ def protocol(self) -> str:
else:
return urlparse(self.bucket_url).scheme

@property
def is_local_filesystem(self) -> bool:
return self.protocol == "file"

def on_resolved(self) -> None:
uri = urlparse(self.bucket_url)
if not uri.path and not uri.netloc:
url = urlparse(self.bucket_url)
if not url.path and not url.netloc:
raise ConfigurationValueError(
"File path and netloc are missing. Field bucket_url of"
" FilesystemClientConfiguration must contain valid uri with a path or host:password"
" FilesystemClientConfiguration must contain valid url with a path or host:password"
" component."
)
# this is just a path in a local file system
if self.is_local_path(self.bucket_url):
self.bucket_url = self.make_file_uri(self.bucket_url)
self.bucket_url = self.make_file_url(self.bucket_url)

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
@@ -122,44 +175,50 @@ def fingerprint(self) -> str:
if self.is_local_path(self.bucket_url):
return digest128("")

uri = urlparse(self.bucket_url)
return digest128(self.bucket_url.replace(uri.path, ""))
url = urlparse(self.bucket_url)
return digest128(self.bucket_url.replace(url.path, ""))

def make_url(self, fs_path: str) -> str:
"""Makes a full url (with scheme) form fs_path which is kind-of absolute path used by fsspec to identify resources.
This method will use `bucket_url` to infer the original form of the url.
"""
return make_fsspec_url(self.protocol, fs_path, self.bucket_url)

def __str__(self) -> str:
"""Return displayable destination location"""
uri = urlparse(self.bucket_url)
url = urlparse(self.bucket_url)
# do not show passwords
if uri.password:
new_netloc = f"{uri.username}:****@{uri.hostname}"
if uri.port:
new_netloc += f":{uri.port}"
return uri._replace(netloc=new_netloc).geturl()
if url.password:
new_netloc = f"{url.username}:****@{url.hostname}"
if url.port:
new_netloc += f":{url.port}"
return url._replace(netloc=new_netloc).geturl()
return self.bucket_url

@staticmethod
def is_local_path(uri: str) -> bool:
"""Checks if `uri` is a local path, without a schema"""
uri_parsed = urlparse(uri)
def is_local_path(url: str) -> bool:
"""Checks if `url` is a local path, without a schema"""
url_parsed = urlparse(url)
# this prevents windows absolute paths to be recognized as schemas
return not uri_parsed.scheme or os.path.isabs(uri)
return not url_parsed.scheme or os.path.isabs(url)

@staticmethod
def make_local_path(file_uri: str) -> str:
def make_local_path(file_url: str) -> str:
"""Gets a valid local filesystem path from file:// scheme.
Supports POSIX/Windows/UNC paths
Returns:
str: local filesystem path
"""
uri = urlparse(file_uri)
if uri.scheme != "file":
raise ValueError(f"Must be file scheme but is {uri.scheme}")
if not uri.path and not uri.netloc:
url = urlparse(file_url)
if url.scheme != "file":
raise ValueError(f"Must be file scheme but is {url.scheme}")
if not url.path and not url.netloc:
raise ConfigurationValueError("File path and netloc are missing.")
local_path = unquote(uri.path)
if uri.netloc:
local_path = unquote(url.path)
if url.netloc:
# or UNC file://localhost/path
local_path = "//" + unquote(uri.netloc) + local_path
local_path = "//" + unquote(url.netloc) + local_path
else:
# if we are on windows, strip the POSIX root from path which is always absolute
if os.path.sep != local_path[0]:
@@ -172,11 +231,9 @@ def make_local_path(file_uri: str) -> str:
return str(pathlib.Path(local_path))

@staticmethod
def make_file_uri(local_path: str) -> str:
"""Creates a normalized file:// uri from a local path
def make_file_url(local_path: str) -> str:
"""Creates a normalized file:// url from a local path
netloc is never set. UNC paths are represented as file://host/path
"""
p_ = pathlib.Path(local_path)
p_ = p_.expanduser().resolve()
return p_.as_uri()
return make_fsspec_url("file", local_path, None)
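make_file_uri is renamed to make_file_url and now delegates to make_fsspec_url, while make_local_path converts a file:// url back into an OS path. A quick round-trip sketch (outputs shown for a POSIX machine; Windows and UNC paths render differently):

from dlt.common.storages.configuration import FilesystemConfiguration

url = FilesystemConfiguration.make_file_url("/var/dlt/data")
print(url)  # expected: file:///var/dlt/data
print(FilesystemConfiguration.make_local_path(url))  # expected: /var/dlt/data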
