enables external location and named credential in databricks (#1755)
* allows configuring an external location and a named credential for databricks

* fixes #1703

* normalizes the 'value' column name when wrapping simple objects in the relational normalizer, fixes #1754

* simplifies fsspec globbing and allows various url formats that are preserved when reconstituting the full url, including the abfss databricks format (see the sketch after this list)

* adds info on partially loaded packages to docs

* renames remote_uri to remote_url in traces

* fixes delta for abfss

* adds a test for collisions between nested tables and dlt columns
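
A minimal sketch of the azure url formats this change accepts (bucket, account, and paths are made-up placeholders; direct instantiation outside of config resolution is assumed):

    from dlt.common.storages.configuration import FilesystemConfiguration

    # both the short form and the fully qualified databricks/abfss form
    # resolve to azure credentials and keep their shape on reconstruction
    for bucket_url in (
        "az://my-container/data",
        "abfss://my-container@my-account.dfs.core.windows.net/data",
    ):
        config = FilesystemConfiguration(bucket_url=bucket_url)
        print(config.protocol)  # "az", then "abfss"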
rudolfix authored and willi-mueller committed Sep 2, 2024
1 parent b7231eb commit 3e6c2d3
Showing 32 changed files with 510 additions and 167 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destinations.yml
@@ -29,6 +29,7 @@ env:
# Test redshift and filesystem with all buckets
# postgres runs again here so we can test on mac/windows
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"
# note that all buckets are enabled for testing

jobs:
get_docs_changes:
2 changes: 2 additions & 0 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -32,6 +32,8 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:
creds = self.to_adlfs_credentials()
if creds["sas_token"] is None:
creds.pop("sas_token")
if creds["account_key"] is None:
creds.pop("account_key")
return creds

def create_sas_token(self) -> None:
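Because `to_object_store_rs_credentials` is declared to return `Dict[str, str]`, entries that resolved to `None` are now dropped instead of being passed through. A tiny sketch of the effect (credential values are fake):

    creds = {"account_name": "my_account", "sas_token": None, "account_key": None}
    for key in ("sas_token", "account_key"):
        if creds[key] is None:
            creds.pop(key)
    assert creds == {"account_name": "my_account"}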
3 changes: 2 additions & 1 deletion dlt/common/libs/deltalake.py
@@ -176,7 +176,8 @@ def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {}
extra_options = {}
if config.protocol in ("az", "gs", "s3"):
# TODO: create a mixin with to_object_store_rs_credentials for a proper discovery
if hasattr(config.credentials, "to_object_store_rs_credentials"):
creds = config.credentials.to_object_store_rs_credentials()
if config.deltalake_storage_options is not None:
extra_options = config.deltalake_storage_options
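The `hasattr` check above swaps the hard-coded protocol list for duck typing: any credentials class that implements `to_object_store_rs_credentials` is discovered, whatever its protocol. A minimal sketch (the class and token are made up):

    creds = {}

    class FakeCredentials:
        def to_object_store_rs_credentials(self):
            return {"bearer_token": "..."}  # illustrative only

    credentials = FakeCredentials()
    if hasattr(credentials, "to_object_store_rs_credentials"):
        creds = credentials.to_object_store_rs_credentials()
    assert creds == {"bearer_token": "..."}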
2 changes: 1 addition & 1 deletion dlt/common/metrics.py
@@ -64,7 +64,7 @@ class LoadJobMetrics(NamedTuple):
started_at: datetime.datetime
finished_at: datetime.datetime
state: Optional[str]
remote_uri: Optional[str]
remote_url: Optional[str]


class LoadMetrics(StepMetrics):
4 changes: 2 additions & 2 deletions dlt/common/normalizers/json/__init__.py
@@ -54,9 +54,9 @@ class SupportsDataItemNormalizer(Protocol):
"""A class with a name DataItemNormalizer deriving from normalizers.json.DataItemNormalizer"""


def wrap_in_dict(item: Any) -> DictStrAny:
def wrap_in_dict(label: str, item: Any) -> DictStrAny:
"""Wraps `item` that is not a dictionary into dictionary that can be json normalized"""
return {"value": item}
return {label: item}


__all__ = [
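With the new `label` argument the wrapper key is no longer hard-coded to "value", which lets callers pass an identifier already adjusted by their naming convention. A quick sketch of the two-argument form:

    from dlt.common.normalizers.json import wrap_in_dict

    assert wrap_in_dict("value", 1) == {"value": 1}
    assert wrap_in_dict("_value", [1, 2]) == {"_value": [1, 2]}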
4 changes: 2 additions & 2 deletions dlt/common/normalizers/json/relational.py
@@ -281,7 +281,7 @@ def _normalize_list(
else:
# list of simple types
child_row_hash = DataItemNormalizer._get_child_row_hash(parent_row_id, table, idx)
wrap_v = wrap_in_dict(v)
wrap_v = wrap_in_dict(self.c_value, v)
wrap_v[self.c_dlt_id] = child_row_hash
e = self._link_row(wrap_v, parent_row_id, idx)
DataItemNormalizer._extend_row(extend, e)
@@ -387,7 +387,7 @@ def normalize_data_item(
) -> TNormalizedRowIterator:
# wrap items that are not dictionaries in a dictionary, otherwise they cannot be processed by the JSON normalizer
if not isinstance(item, dict):
item = wrap_in_dict(item)
item = wrap_in_dict(self.c_value, item)
# we will extend event with all the fields necessary to load it as root row
row = cast(DictStrAny, item)
# identify load id if loaded data must be processed after loading incrementally
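End to end, this is what the wrapping does for simple items; a minimal usage sketch (pipeline name is arbitrary and the duckdb extra is assumed to be installed):

    import dlt

    pipeline = dlt.pipeline(pipeline_name="wrap_demo", destination="duckdb")
    # scalar items are wrapped as {"value": <item>}, so the table gets a "value" column;
    # self.c_value keeps that key consistent when a naming convention renames "value" (#1754)
    pipeline.run([1, 2, 3], table_name="numbers")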
119 changes: 88 additions & 31 deletions dlt/common/storages/configuration.py
@@ -1,7 +1,7 @@
import os
import pathlib
from typing import Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
from urllib.parse import urlparse, unquote
from urllib.parse import urlparse, unquote, urlunparse

from dlt.common.configuration import configspec, resolve_type
from dlt.common.configuration.exceptions import ConfigurationValueError
@@ -52,14 +52,61 @@ class LoadStorageConfiguration(BaseConfiguration):
]


def _make_az_url(scheme: str, fs_path: str, bucket_url: str) -> str:
parsed_bucket_url = urlparse(bucket_url)
if parsed_bucket_url.username:
# az://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>
# fs_path always starts with container
split_path = fs_path.split("/", maxsplit=1)
if len(split_path) == 1:
split_path.append("")
container, path = split_path
netloc = f"{container}@{parsed_bucket_url.hostname}"
return urlunparse(parsed_bucket_url._replace(path=path, scheme=scheme, netloc=netloc))
return f"{scheme}://{fs_path}"


def _make_file_url(scheme: str, fs_path: str, bucket_url: str) -> str:
"""Creates a normalized file:// url from a local path
netloc is never set. UNC paths are represented as file://host/path
"""
p_ = pathlib.Path(fs_path)
p_ = p_.expanduser().resolve()
return p_.as_uri()


MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url}

MAKE_URI_DISPATCH["adl"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["abfs"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["azure"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["abfss"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["local"] = MAKE_URI_DISPATCH["file"]


def make_fsspec_url(scheme: str, fs_path: str, bucket_url: str) -> str:
"""Creates url from `fs_path` and `scheme` using bucket_url as an `url` template
Args:
scheme (str): scheme of the resulting url
fs_path (str): kind of absolute path that fsspec uses to locate resources for particular filesystem.
bucket_url (str): an url template. the structure of url will be preserved if possible
"""
_maker = MAKE_URI_DISPATCH.get(scheme)
if _maker:
return _maker(scheme, fs_path, bucket_url)
return f"{scheme}://{fs_path}"


@configspec
class FilesystemConfiguration(BaseConfiguration):
"""A configuration defining filesystem location and access credentials.
When configuration is resolved, `bucket_url` is used to extract a protocol and request corresponding credentials class.
* s3
* gs, gcs
* az, abfs, adl
* az, abfs, adl, abfss, azure
* file, memory
* gdrive
"""
@@ -72,6 +119,8 @@ class FilesystemConfiguration(BaseConfiguration):
"az": AnyAzureCredentials,
"abfs": AnyAzureCredentials,
"adl": AnyAzureCredentials,
"abfss": AnyAzureCredentials,
"azure": AnyAzureCredentials,
}

bucket_url: str = None
@@ -93,17 +142,21 @@ def protocol(self) -> str:
else:
return urlparse(self.bucket_url).scheme

@property
def is_local_filesystem(self) -> bool:
return self.protocol == "file"

def on_resolved(self) -> None:
uri = urlparse(self.bucket_url)
if not uri.path and not uri.netloc:
url = urlparse(self.bucket_url)
if not url.path and not url.netloc:
raise ConfigurationValueError(
"File path and netloc are missing. Field bucket_url of"
" FilesystemClientConfiguration must contain valid uri with a path or host:password"
" FilesystemClientConfiguration must contain valid url with a path or host:password"
" component."
)
# this is just a path in a local file system
if self.is_local_path(self.bucket_url):
self.bucket_url = self.make_file_uri(self.bucket_url)
self.bucket_url = self.make_file_url(self.bucket_url)

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
@@ -122,44 +175,50 @@ def fingerprint(self) -> str:
if self.is_local_path(self.bucket_url):
return digest128("")

uri = urlparse(self.bucket_url)
return digest128(self.bucket_url.replace(uri.path, ""))
url = urlparse(self.bucket_url)
return digest128(self.bucket_url.replace(url.path, ""))

def make_url(self, fs_path: str) -> str:
"""Makes a full url (with scheme) form fs_path which is kind-of absolute path used by fsspec to identify resources.
This method will use `bucket_url` to infer the original form of the url.
"""
return make_fsspec_url(self.protocol, fs_path, self.bucket_url)

def __str__(self) -> str:
"""Return displayable destination location"""
uri = urlparse(self.bucket_url)
url = urlparse(self.bucket_url)
# do not show passwords
if uri.password:
new_netloc = f"{uri.username}:****@{uri.hostname}"
if uri.port:
new_netloc += f":{uri.port}"
return uri._replace(netloc=new_netloc).geturl()
if url.password:
new_netloc = f"{url.username}:****@{url.hostname}"
if url.port:
new_netloc += f":{url.port}"
return url._replace(netloc=new_netloc).geturl()
return self.bucket_url

@staticmethod
def is_local_path(uri: str) -> bool:
"""Checks if `uri` is a local path, without a schema"""
uri_parsed = urlparse(uri)
def is_local_path(url: str) -> bool:
"""Checks if `url` is a local path, without a schema"""
url_parsed = urlparse(url)
# this prevents windows absolute paths to be recognized as schemas
return not uri_parsed.scheme or os.path.isabs(uri)
return not url_parsed.scheme or os.path.isabs(url)

@staticmethod
def make_local_path(file_uri: str) -> str:
def make_local_path(file_url: str) -> str:
"""Gets a valid local filesystem path from file:// scheme.
Supports POSIX/Windows/UNC paths
Returns:
str: local filesystem path
"""
uri = urlparse(file_uri)
if uri.scheme != "file":
raise ValueError(f"Must be file scheme but is {uri.scheme}")
if not uri.path and not uri.netloc:
url = urlparse(file_url)
if url.scheme != "file":
raise ValueError(f"Must be file scheme but is {url.scheme}")
if not url.path and not url.netloc:
raise ConfigurationValueError("File path and netloc are missing.")
local_path = unquote(uri.path)
if uri.netloc:
local_path = unquote(url.path)
if url.netloc:
# or UNC file://localhost/path
local_path = "//" + unquote(uri.netloc) + local_path
local_path = "//" + unquote(url.netloc) + local_path
else:
# if we are on windows, strip the POSIX root from path which is always absolute
if os.path.sep != local_path[0]:
@@ -172,11 +231,9 @@ def make_local_path(file_uri: str) -> str:
return str(pathlib.Path(local_path))
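# hedged examples, assuming a POSIX filesystem (paths are made up):
#   make_local_path("file:///home/user/data") -> "/home/user/data"
#   make_local_path("file://localhost/share/data") -> "//localhost/share/data" (UNC)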

@staticmethod
def make_file_uri(local_path: str) -> str:
"""Creates a normalized file:// uri from a local path
def make_file_url(local_path: str) -> str:
"""Creates a normalized file:// url from a local path
netloc is never set. UNC paths are represented as file://host/path
"""
p_ = pathlib.Path(local_path)
p_ = p_.expanduser().resolve()
return p_.as_uri()
return make_fsspec_url("file", local_path, None)
58 changes: 36 additions & 22 deletions dlt/common/storages/fsspec_filesystem.py
@@ -21,7 +21,7 @@
)
from urllib.parse import urlparse

from fsspec import AbstractFileSystem, register_implementation
from fsspec import AbstractFileSystem, register_implementation, get_filesystem_class
from fsspec.core import url_to_fs

from dlt import version
Expand All @@ -32,7 +32,11 @@
AzureCredentials,
)
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages.configuration import FileSystemCredentials, FilesystemConfiguration
from dlt.common.storages.configuration import (
FileSystemCredentials,
FilesystemConfiguration,
make_fsspec_url,
)
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import DictStrAny

@@ -65,18 +69,20 @@ class FileItem(TypedDict, total=False):
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"]
MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"]
MTIME_DISPATCH["abfss"] = MTIME_DISPATCH["az"]

# Map of protocol to a callable extracting credentials kwargs for its filesystem
CREDENTIALS_DISPATCH: Dict[str, Callable[[FilesystemConfiguration], DictStrAny]] = {
"s3": lambda config: cast(AwsCredentials, config.credentials).to_s3fs_credentials(),
"adl": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"az": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"gcs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
"gs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
"gdrive": lambda config: {"credentials": cast(GcpCredentials, config.credentials)},
"abfs": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"azure": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
}
CREDENTIALS_DISPATCH["adl"] = CREDENTIALS_DISPATCH["az"]
CREDENTIALS_DISPATCH["abfs"] = CREDENTIALS_DISPATCH["az"]
CREDENTIALS_DISPATCH["azure"] = CREDENTIALS_DISPATCH["az"]
CREDENTIALS_DISPATCH["abfss"] = CREDENTIALS_DISPATCH["az"]
CREDENTIALS_DISPATCH["gcs"] = CREDENTIALS_DISPATCH["gs"]


def fsspec_filesystem(
@@ -90,7 +96,7 @@ def fsspec_filesystem(
Please supply credentials instance corresponding to the protocol.
The `protocol` is just the code name of the filesystem i.e.:
* s3
* az, abfs
* az, abfs, abfss, adl, azure
* gcs, gs
also see filesystem_from_config
@@ -136,7 +142,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
Authenticates following filesystems:
* s3
* az, abfs
* az, abfs, abfss, adl, azure
* gcs, gs
All other filesystems are not authenticated
@@ -146,8 +152,14 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
fs_kwargs = prepare_fsspec_args(config)

try:
# first get the class to check the protocol
fs_cls = get_filesystem_class(config.protocol)
if fs_cls.protocol == "abfs":
# if storage account is present in bucket_url and in credentials, az fsspec will fail
if urlparse(config.bucket_url).username:
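# e.g. "abfss://container@account.dfs.core.windows.net/path" (illustrative) already encodes the account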
fs_kwargs.pop("account_name")
return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore
except ModuleNotFoundError as e:
except ImportError as e:
raise MissingDependencyException(
"filesystem", [f"{version.DLT_PKG_NAME}[{config.protocol}]"]
) from e
@@ -291,18 +303,17 @@ def glob_files(
"""
is_local_fs = "file" in fs_client.protocol
if is_local_fs and FilesystemConfiguration.is_local_path(bucket_url):
bucket_url = FilesystemConfiguration.make_file_uri(bucket_url)
bucket_url_parsed = urlparse(bucket_url)
else:
bucket_url_parsed = urlparse(bucket_url)
bucket_url = FilesystemConfiguration.make_file_url(bucket_url)
bucket_url_parsed = urlparse(bucket_url)

if is_local_fs:
root_dir = FilesystemConfiguration.make_local_path(bucket_url)
# use a Python glob to get files
files = glob.glob(str(pathlib.Path(root_dir).joinpath(file_glob)), recursive=True)
glob_result = {file: fs_client.info(file) for file in files}
else:
root_dir = bucket_url_parsed._replace(scheme="", query="").geturl().lstrip("/")
# convert to fs_path
root_dir = fs_client._strip_protocol(bucket_url)
filter_url = posixpath.join(root_dir, file_glob)
glob_result = fs_client.glob(filter_url, detail=True)
if isinstance(glob_result, list):
@@ -314,20 +325,23 @@
for file, md in glob_result.items():
if md["type"] != "file":
continue
scheme = bucket_url_parsed.scheme

# relative paths are always POSIX
if is_local_fs:
rel_path = pathlib.Path(file).relative_to(root_dir).as_posix()
file_url = FilesystemConfiguration.make_file_uri(file)
# use OS pathlib for local paths
loc_path = pathlib.Path(file)
file_name = loc_path.name
rel_path = loc_path.relative_to(root_dir).as_posix()
file_url = FilesystemConfiguration.make_file_url(file)
else:
rel_path = posixpath.relpath(file.lstrip("/"), root_dir)
file_url = bucket_url_parsed._replace(
path=posixpath.join(bucket_url_parsed.path, rel_path)
).geturl()
file_name = posixpath.basename(file)
rel_path = posixpath.relpath(file, root_dir)
file_url = make_fsspec_url(scheme, file, bucket_url)

scheme = bucket_url_parsed.scheme
mime_type, encoding = guess_mime_type(rel_path)
yield FileItem(
file_name=posixpath.basename(rel_path),
file_name=file_name,
relative_path=rel_path,
file_url=file_url,
mime_type=mime_type,
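Downstream, `glob_files` now reconstructs each `file_url` through `make_fsspec_url`, so the original bucket url form is preserved. A hedged usage sketch (the bucket url is a placeholder and `azure_credentials` is assumed to be resolved elsewhere):

    from dlt.common.storages.fsspec_filesystem import fsspec_filesystem, glob_files

    fs_client, _ = fsspec_filesystem("abfss", credentials=azure_credentials)
    for item in glob_files(
        fs_client, "abfss://container@account.dfs.core.windows.net/data", "**/*.parquet"
    ):
        print(item["file_url"])       # full abfss:// url in the original format
        print(item["relative_path"])  # POSIX path relative to the bucket root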
