Skip to content

Commit

Permalink
adds bigquery partition expiration and motherduck connection string (#…
Browse files Browse the repository at this point in the history
…1968)

* supports Motherduck md:? connstr and env variable for token

* supports bigquery partition expiration days

* removes test code
  • Loading branch information
rudolfix authored Oct 20, 2024
1 parent 1023b8f commit 4f58c71
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 9 deletions.
6 changes: 6 additions & 0 deletions dlt/destinations/impl/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dlt.destinations.impl.bigquery.bigquery_adapter import (
AUTODETECT_SCHEMA_HINT,
PARTITION_HINT,
PARTITION_EXPIRATION_DAYS_HINT,
CLUSTER_HINT,
TABLE_DESCRIPTION_HINT,
ROUND_HALF_EVEN_HINT,
Expand Down Expand Up @@ -277,6 +278,11 @@ def _get_table_update_sql(
if table.get(TABLE_EXPIRATION_HINT)
else None
),
"partition_expiration_days": (
str(table.get(PARTITION_EXPIRATION_DAYS_HINT))
if table.get(PARTITION_EXPIRATION_DAYS_HINT)
else None
),
}
if not any(table_options.values()):
return sql
Expand Down
12 changes: 12 additions & 0 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
TABLE_EXPIRATION_HINT: Literal["x-bigquery-table-expiration"] = "x-bigquery-table-expiration"
TABLE_DESCRIPTION_HINT: Literal["x-bigquery-table-description"] = "x-bigquery-table-description"
AUTODETECT_SCHEMA_HINT: Literal["x-bigquery-autodetect-schema"] = "x-bigquery-autodetect-schema"
PARTITION_EXPIRATION_DAYS_HINT: Literal["x-bigquery-partition-expiration-days"] = (
"x-bigquery-partition-expiration-days"
)


def bigquery_adapter(
Expand All @@ -34,6 +37,7 @@ def bigquery_adapter(
table_expiration_datetime: Optional[str] = None,
insert_api: Optional[Literal["streaming", "default"]] = None,
autodetect_schema: Optional[bool] = None,
partition_expiration_days: Optional[int] = None,
) -> DltResource:
"""
Prepares data for loading into BigQuery.
Expand Down Expand Up @@ -67,6 +71,8 @@ def bigquery_adapter(
NOTE: due to BigQuery features, streaming insert is only available for `append` write_disposition.
autodetect_schema (bool, optional): If set to True, BigQuery schema autodetection will be used to create data tables. This
allows to create structured types from nested data.
partition_expiration_days (int, optional): For date/time based partitions it tells when partition is expired and removed.
Partitions are expired based on a partitioned column value. (https://cloud.google.com/bigquery/docs/managing-partitioned-tables#partition-expiration)
Returns:
A `DltResource` object that is ready to be loaded into BigQuery.
Expand Down Expand Up @@ -158,6 +164,12 @@ def bigquery_adapter(
except ValueError as e:
raise ValueError(f"{table_expiration_datetime} could not be parsed!") from e

if partition_expiration_days is not None:
assert isinstance(
partition_expiration_days, int
), "partition_expiration_days must be an integer (days)"
additional_table_hints[PARTITION_EXPIRATION_DAYS_HINT] = partition_expiration_days

if insert_api is not None:
if insert_api == "streaming" and data.write_disposition != "append":
raise ValueError(
Expand Down
27 changes: 22 additions & 5 deletions dlt/destinations/impl/motherduck/configuration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import dataclasses
import sys
from typing import Any, ClassVar, Dict, Final, List, Optional
Expand All @@ -13,6 +14,7 @@

MOTHERDUCK_DRIVERNAME = "md"
MOTHERDUCK_USER_AGENT = f"dlt/{__version__}({sys.platform})"
MOTHERDUCK_DEFAULT_TOKEN_ENV = "motherduck_token"


@configspec(init=False)
Expand All @@ -30,12 +32,18 @@ class MotherDuckCredentials(DuckDbBaseCredentials):
__config_gen_annotations__: ClassVar[List[str]] = ["password", "database"]

def _conn_str(self) -> str:
return f"{MOTHERDUCK_DRIVERNAME}:{self.database}?token={self.password}"
_str = f"{MOTHERDUCK_DRIVERNAME}:{self.database}"
if self.password:
_str += f"?motherduck_token={self.password}"
return _str

def _token_to_password(self) -> None:
# could be motherduck connection
if self.query and "token" in self.query:
self.password = self.query.pop("token")
if self.query:
# backward compat
if "token" in self.query:
self.password = self.query.pop("token")
if "motherduck_token" in self.query:
self.password = self.query.pop("motherduck_token")

def borrow_conn(self, read_only: bool) -> Any:
from duckdb import HTTPException, InvalidInputException
Expand All @@ -51,15 +59,24 @@ def borrow_conn(self, read_only: bool) -> Any:
raise

def parse_native_representation(self, native_value: Any) -> None:
if isinstance(native_value, str):
# https://motherduck.com/docs/key-tasks/authenticating-and-connecting-to-motherduck/authenticating-to-motherduck/#storing-the-access-token-as-an-environment-variable
# ie. md:dlt_data_3?motherduck_token=<my service token>
if native_value.startswith("md:") and not native_value.startswith("md:/"):
native_value = "md:///" + native_value[3:] # skip md:
super().parse_native_representation(native_value)
self._token_to_password()

def on_partial(self) -> None:
"""Takes a token from query string and reuses it as a password"""
self._token_to_password()
if not self.is_partial():
if not self.is_partial() or self._has_default_token():
self.resolve()

def _has_default_token(self) -> bool:
# TODO: implement default connection interface
return MOTHERDUCK_DEFAULT_TOKEN_ENV in os.environ

def _get_conn_config(self) -> Dict[str, Any]:
# If it was explicitly set to None/null then we
# need to use the default value
Expand Down
12 changes: 11 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/motherduck.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,21 @@ Paste your **service token** into the password field. The `database` field is op
Alternatively, you can use the connection string syntax.
```toml
[destination]
motherduck.credentials="md:///dlt_data_3?token=<my service token>"
motherduck.credentials="md:dlt_data_3?motherduck_token=<my service token>"
```

:::tip
Motherduck now supports configurable **access tokens**. Please refer to the [documentation](https://motherduck.com/docs/key-tasks/authenticating-to-motherduck/#authentication-using-an-access-token)

You can pass token in a native Motherduck environment variable:
```sh
export motherduck_token='<token>'
```
in that case you can skip **password** / **motherduck_token** secret.

**database** defaults to `my_db`.

More in Motherduck [documentation](https://motherduck.com/docs/key-tasks/authenticating-and-connecting-to-motherduck/authenticating-to-motherduck/#storing-the-access-token-as-an-environment-variable)
:::

**4. Run the pipeline**
Expand Down
18 changes: 16 additions & 2 deletions tests/load/bigquery/test_bigquery_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,14 +534,16 @@ def test_adapter_hints_parsing_partitioning() -> None:
def some_data() -> Iterator[Dict[str, str]]:
yield from next(sequence_generator())

bigquery_adapter(some_data, partition="int_col")
bigquery_adapter(some_data, partition="int_col", partition_expiration_days=4)
assert some_data.columns == {
"int_col": {
"name": "int_col",
"data_type": "bigint",
"x-bigquery-partition": True,
},
}
table_schema = some_data.compute_table_schema()
assert table_schema["x-bigquery-partition-expiration-days"] == 4 # type: ignore[typeddict-item]


def test_adapter_on_data() -> None:
Expand All @@ -562,11 +564,20 @@ def test_adapter_hints_partitioning(
def no_hints() -> Iterator[Dict[str, int]]:
yield from [{"col1": i} for i in range(10)]

@dlt.resource(columns=[{"name": "col1", "data_type": "date"}])
def date_no_hints() -> Iterator[Dict[str, pendulum.Date]]:
yield from [{"col1": pendulum.now().add(days=i).date()} for i in range(10)]

hints = bigquery_adapter(no_hints.with_name(new_name="hints"), partition="col1")
date_hints = bigquery_adapter(
date_no_hints.with_name(new_name="date_hints"),
partition="col1",
partition_expiration_days=3,
)

@dlt.source(max_table_nesting=0)
def sources() -> List[DltResource]:
return [no_hints, hints]
return [no_hints, hints, date_hints]

pipeline = destination_config.setup_pipeline(
f"bigquery_{uniq_id()}",
Expand All @@ -580,11 +591,14 @@ def sources() -> List[DltResource]:

fqtn_no_hints = c.make_qualified_table_name("no_hints", escape=False)
fqtn_hints = c.make_qualified_table_name("hints", escape=False)
fqtn_date_hints = c.make_qualified_table_name("date_hints", escape=False)

no_hints_table = nc.get_table(fqtn_no_hints)
hints_table = nc.get_table(fqtn_hints)
date_hints_table = nc.get_table(fqtn_date_hints)

assert not no_hints_table.range_partitioning, "`no_hints` table IS clustered on a column."
assert date_hints_table.time_partitioning.expiration_ms == 3 * 24 * 60 * 60 * 1000

if not hints_table.range_partitioning:
raise ValueError("`hints` table IS NOT clustered on a column.")
Expand Down
31 changes: 30 additions & 1 deletion tests/load/duckdb/test_motherduck_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,19 @@ def test_motherduck_configuration() -> None:
assert cred.is_resolved() is False

cred = MotherDuckCredentials()
cred.parse_native_representation("md:///?token=TOKEN")
cred.parse_native_representation("md:///?motherduck_token=TOKEN")
assert cred.password == "TOKEN"
assert cred.database == ""
assert cred.is_partial() is False
assert cred.is_resolved() is False

cred = MotherDuckCredentials()
cred.parse_native_representation("md:xdb?motherduck_token=TOKEN2")
assert cred.password == "TOKEN2"
assert cred.database == "xdb"
assert cred.is_partial() is False
assert cred.is_resolved() is False

# password or token are mandatory
with pytest.raises(ConfigFieldMissingException) as conf_ex:
resolve_configuration(MotherDuckCredentials())
Expand All @@ -52,6 +59,28 @@ def test_motherduck_configuration() -> None:
assert config.password == "tok"


def test_motherduck_connect_default_token() -> None:
import dlt

credentials = dlt.secrets.get(
"destination.motherduck.credentials", expected_type=MotherDuckCredentials
)
assert credentials.password
os.environ["motherduck_token"] = credentials.password

credentials = MotherDuckCredentials()
assert credentials._has_default_token() is True
credentials.on_partial()
assert credentials.is_resolved()

config = MotherDuckClientConfiguration(credentials=credentials)
print(config.credentials._conn_str())
# connect
con = config.credentials.borrow_conn(read_only=False)
con.sql("SHOW DATABASES")
config.credentials.return_conn(con)


@pytest.mark.parametrize("custom_user_agent", [MOTHERDUCK_USER_AGENT, "patates", None, ""])
def test_motherduck_connect_with_user_agent_string(
custom_user_agent: Optional[str], mocker: MockerFixture
Expand Down

0 comments on commit 4f58c71

Please sign in to comment.