feat(filesystem): implement Google Drive source #932

Merged
merged 36 commits into from Feb 12, 2024

Commits (36)
f13c9b3
WIP(gdrive): implement basic version of gdrive source
IlyaFaer Jan 10, 2024
cec612b
WIP: google drive source
IlyaFaer Jan 31, 2024
442afde
fixes
IlyaFaer Feb 5, 2024
5db1041
revert wrong changes
IlyaFaer Feb 5, 2024
81fa30f
lint fix
IlyaFaer Feb 5, 2024
8d8af68
add dependency
IlyaFaer Feb 5, 2024
c52a63e
type
IlyaFaer Feb 5, 2024
4883ea2
lint fixes
IlyaFaer Feb 5, 2024
c4c6876
lint fix
IlyaFaer Feb 5, 2024
f80ef52
dependency
IlyaFaer Feb 5, 2024
950cb06
add gdrive bucket into tests
IlyaFaer Feb 7, 2024
1ac6153
Merge branch 'devel' into gdrive_new_source
IlyaFaer Feb 7, 2024
94e3c5d
fix tests
IlyaFaer Feb 7, 2024
53006c5
fixes
IlyaFaer Feb 7, 2024
a4442f9
add config
IlyaFaer Feb 7, 2024
3cf7ab2
fix dependencies
IlyaFaer Feb 7, 2024
1c5b707
lint fix
IlyaFaer Feb 7, 2024
14689e1
lint fix
IlyaFaer Feb 7, 2024
14943bc
fix
IlyaFaer Feb 7, 2024
633c8de
fix
IlyaFaer Feb 7, 2024
96fa828
fix
IlyaFaer Feb 7, 2024
9b8213c
skip destination tests
IlyaFaer Feb 7, 2024
79debb9
add temporary fixture for gdrive testing
IlyaFaer Feb 8, 2024
877762e
return env preservation
IlyaFaer Feb 9, 2024
9b39de6
put dlt credentials
IlyaFaer Feb 9, 2024
c42f7bf
lint fix
IlyaFaer Feb 9, 2024
b5279f0
fix
IlyaFaer Feb 9, 2024
9aa4247
Merge branch 'devel' into gdrive_new_source
rudolfix Feb 10, 2024
4963303
refactors google drive fsspec
rudolfix Feb 11, 2024
4536f2f
documents and cleanups code
rudolfix Feb 12, 2024
e82f3b9
Merge branch 'devel' into gdrive_new_source
rudolfix Feb 12, 2024
9014566
Merge branch 'devel' into gdrive_new_source
rudolfix Feb 12, 2024
34c6902
TO-REVERT
IlyaFaer Feb 12, 2024
1ef0141
revert a commit
IlyaFaer Feb 12, 2024
7a5297e
add test retries
IlyaFaer Feb 12, 2024
35ba3c7
implements fs.info to be consistent
rudolfix Feb 12, 2024
12 changes: 5 additions & 7 deletions dlt/common/storages/configuration.py
@@ -86,7 +86,7 @@ class FilesystemConfiguration(BaseConfiguration):
PROTOCOL_CREDENTIALS: ClassVar[Dict[str, Any]] = {
"gs": Union[GcpServiceAccountCredentials, GcpOAuthCredentials],
"gcs": Union[GcpServiceAccountCredentials, GcpOAuthCredentials],
"gdrive": GcpOAuthCredentials,
"gdrive": Union[GcpServiceAccountCredentials, GcpOAuthCredentials],
"s3": AwsCredentials,
"az": Union[AzureCredentialsWithoutDefaults, AzureCredentials],
"abfs": Union[AzureCredentialsWithoutDefaults, AzureCredentials],
@@ -98,6 +98,8 @@ class FilesystemConfiguration(BaseConfiguration):
# should be a union of all possible credentials as found in PROTOCOL_CREDENTIALS
credentials: FileSystemCredentials

read_only: bool = False
"""Indicates read only filesystem access. Will enable caching"""
kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None

@@ -149,11 +151,7 @@ def __init__(
self,
bucket_url: str,
credentials: FileSystemCredentials = None,
read_only: bool = False,
kwargs: Optional[DictStrAny] = None,
client_kwargs: Optional[DictStrAny] = None,
) -> None:
self.bucket_url = bucket_url
self.credentials = credentials
self.kwargs = kwargs
self.client_kwargs = client_kwargs
...
) -> None: ...
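
For context, the sketch below (not part of the diff) shows how the reworked configuration could be used for a Google Drive bucket. The folder id is a placeholder and the credential fields are assumed to come from secrets.toml or environment variables rather than being set in code.

from dlt.common.configuration.specs import GcpServiceAccountCredentials
from dlt.common.storages.configuration import FilesystemConfiguration

# credential fields (project_id, client_email, private_key) are normally
# injected by dlt's config resolution; they are left unset in this sketch
creds = GcpServiceAccountCredentials()

# "gdrive://<folder-id>" is a placeholder bucket URL
config = FilesystemConfiguration(bucket_url="gdrive://<folder-id>", credentials=creds)
config.read_only = True  # new field in this PR: read-only access, enables caching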
32 changes: 22 additions & 10 deletions dlt/common/storages/fsspec_filesystem.py
@@ -20,7 +20,7 @@
)
from urllib.parse import urlparse

from fsspec import AbstractFileSystem
from fsspec import AbstractFileSystem, register_implementation
from fsspec.core import url_to_fs

from dlt import version
@@ -57,6 +57,7 @@ class FileItem(TypedDict, total=False):
"gcs": lambda f: ensure_pendulum_datetime(f["updated"]),
"file": lambda f: ensure_pendulum_datetime(f["mtime"]),
"memory": lambda f: ensure_pendulum_datetime(f["created"]),
"gdrive": lambda f: ensure_pendulum_datetime(f["modifiedTime"]),
}
# Support aliases
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
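
A minimal sketch of how the new dispatch entry would be consumed, assuming (as the lambda above implies) that the Drive fsspec implementation reports a modifiedTime string in its file info dict:

from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH

# hypothetical info dict for a Google Drive file
file_info = {"modifiedTime": "2024-02-12T10:00:00Z"}
modified = MTIME_DISPATCH["gdrive"](file_info)  # normalized to a pendulum datetime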
@@ -70,6 +71,7 @@ class FileItem(TypedDict, total=False):
"az": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"gcs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
"gs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
"gdrive": lambda config: {"credentials": cast(GcpCredentials, config.credentials)},
"abfs": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"azure": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
}
@@ -105,9 +107,15 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny:
Returns:
DictStrAny: The arguments for the fsspec filesystem constructor.
"""
proto = config.protocol
fs_kwargs: DictStrAny = {"use_listings_cache": False}
credentials = CREDENTIALS_DISPATCH.get(proto, lambda _: {})(config)
protocol = config.protocol
# never use listing caches
fs_kwargs: DictStrAny = {"use_listings_cache": False, "listings_expiry_time": 60.0}
credentials = CREDENTIALS_DISPATCH.get(protocol, lambda _: {})(config)

if protocol == "gdrive":
from dlt.common.storages.fsspecs.google_drive import GoogleDriveFileSystem

register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem")

if config.kwargs is not None:
fs_kwargs.update(config.kwargs)
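
As a rough end-to-end illustration (not part of the diff) of the on-demand registration above: building an fsspec client for a gdrive URL goes through prepare_fsspec_args, which registers GoogleDriveFileSystem before the URL is resolved. The bucket URL is a placeholder and a real config would already carry resolved Google credentials.

from dlt.common.storages.configuration import FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import fsspec_from_config

# a resolved config would hold GcpServiceAccountCredentials or GcpOAuthCredentials,
# as allowed by the updated PROTOCOL_CREDENTIALS mapping
config = FilesystemConfiguration(bucket_url="gdrive://<folder-id>")
fs_client, normalized_url = fsspec_from_config(config)
# once "gdrive" is registered, fs_client is a GoogleDriveFileSystem instance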
@@ -134,6 +142,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
Returns: (fsspec filesystem, normalized url)
"""
fs_kwargs = prepare_fsspec_args(config)

try:
return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore
except ModuleNotFoundError as e:
@@ -273,10 +282,11 @@ def glob_files(
# this is a file so create a proper file url
bucket_url = pathlib.Path(bucket_url).absolute().as_uri()
bucket_url_parsed = urlparse(bucket_url)

bucket_path = bucket_url_parsed._replace(scheme="").geturl()
bucket_path = bucket_path[2:] if bucket_path.startswith("//") else bucket_path
filter_url = posixpath.join(bucket_path, file_glob)
bucket_url_no_schema = bucket_url_parsed._replace(scheme="", query="").geturl()
bucket_url_no_schema = (
bucket_url_no_schema[2:] if bucket_url_no_schema.startswith("//") else bucket_url_no_schema
)
filter_url = posixpath.join(bucket_url_no_schema, file_glob)

glob_result = fs_client.glob(filter_url, detail=True)
if isinstance(glob_result, list):
@@ -292,8 +302,10 @@
# make that absolute path on a file://
if bucket_url_parsed.scheme == "file" and not file.startswith("/"):
file = f"/{file}"
file_name = posixpath.relpath(file, bucket_path)
file_url = f"{bucket_url_parsed.scheme}://{file}"
file_name = posixpath.relpath(file, bucket_url_no_schema)
file_url = bucket_url_parsed._replace(
path=posixpath.join(bucket_url_parsed.path, file_name)
).geturl()

mime_type, encoding = guess_mime_type(file_name)
yield FileItem(
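
To make the intent of the rewritten URL handling concrete, here is a standalone sketch (with a made-up gdrive URL, query parameter, and file name): the query string is stripped from the glob pattern, but building file_url via _replace(path=...) keeps the original scheme and query, which the old f-string construction dropped.

import posixpath
from urllib.parse import urlparse

# hypothetical bucket URL carrying a query parameter
bucket_url_parsed = urlparse("gdrive://<folder-id>/data?team_drive=<drive-id>")
file_name = "reports/2024.csv"  # hypothetical file relative to the bucket

file_url = bucket_url_parsed._replace(
    path=posixpath.join(bucket_url_parsed.path, file_name)
).geturl()
print(file_url)  # gdrive://<folder-id>/data/reports/2024.csv?team_drive=<drive-id>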