Fix/refresh standalone resources (#2140)
* drops tables from schema and relational

* documents custom sections for sql_database and source rename

* clones schema without data tables when resources without a source are extracted, adds tests

* skips airflow tests if not installed

* adds doc on setting up FUSE on bucket

* adds doc on setting up FUSE on bucket

* adds row key propagation for a table when its nested tables require it

* fixes tests
rudolfix authored Dec 15, 2024
1 parent fd5ba0b commit 95d6063
Showing 17 changed files with 304 additions and 47 deletions.
3 changes: 1 addition & 2 deletions Makefile
@@ -63,7 +63,6 @@ format:
lint-snippets:
cd docs/tools && poetry run python check_embedded_snippets.py full


lint-and-test-snippets: lint-snippets
poetry run mypy --config-file mypy.ini docs/website docs/tools --exclude docs/tools/lint_setup --exclude docs/website/docs_processed
poetry run flake8 --max-line-length=200 docs/website docs/tools --exclude docs/website/.dlt-repo
@@ -82,7 +81,7 @@ lint-security:
poetry run bandit -r dlt/ -n 3 -l

test:
(set -a && . tests/.env && poetry run pytest tests)
poetry run pytest tests

test-load-local:
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres or duckdb)'
4 changes: 4 additions & 0 deletions dlt/common/normalizers/json/__init__.py
@@ -36,6 +36,10 @@ def extend_schema(self) -> None:
def extend_table(self, table_name: str) -> None:
pass

@abc.abstractmethod
def remove_table(self, table_name: str) -> None:
pass

@classmethod
@abc.abstractmethod
def update_normalizer_config(cls, schema: Schema, config: TNormalizerConfig) -> None:
54 changes: 45 additions & 9 deletions dlt/common/normalizers/json/relational.py
@@ -1,4 +1,16 @@
from typing import Dict, List, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any
from typing import (
ClassVar,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
cast,
TypedDict,
Any,
)

from dlt.common.normalizers.exceptions import InvalidJsonNormalizer
from dlt.common.normalizers.typing import TJSONNormalizer
@@ -14,6 +26,9 @@
from dlt.common.schema.utils import (
column_name_validator,
is_nested_table,
get_nested_tables,
has_column_with_prop,
get_first_column_name_with_prop,
)
from dlt.common.utils import update_dict_nested
from dlt.common.normalizers.json import (
@@ -48,6 +63,7 @@ class DataItemNormalizer(DataItemNormalizerBase[RelationalNormalizerConfig]):

# other constants
EMPTY_KEY_IDENTIFIER = "_empty" # replace empty keys with this
RELATIONAL_CONFIG_TYPE: ClassVar[Type[RelationalNormalizerConfig]] = RelationalNormalizerConfig

normalizer_config: RelationalNormalizerConfig
propagation_config: RelationalNormalizerConfigPropagation
@@ -310,20 +326,38 @@ def extend_table(self, table_name: str) -> None:
Table name should be normalized.
"""
table = self.schema.tables.get(table_name)
if not is_nested_table(table) and table.get("write_disposition") == "merge":
DataItemNormalizer.update_normalizer_config(
# add root key prop when merge disposition is used or any of nested tables needs row_key
if not is_nested_table(table) and (
table.get("write_disposition") == "merge"
or any(
has_column_with_prop(t, "root_key", include_incomplete=True)
for t in get_nested_tables(self.schema.tables, table_name)
)
):
# get row id column from table, assume that we propagate it into c_dlt_root_id always
c_dlt_id = get_first_column_name_with_prop(table, "row_key", include_incomplete=True)
self.update_normalizer_config(
self.schema,
{
"propagation": {
"tables": {
table_name: {
TColumnName(self.c_dlt_id): TColumnName(self.c_dlt_root_id)
TColumnName(c_dlt_id or self.c_dlt_id): TColumnName(
self.c_dlt_root_id
)
}
}
}
},
)

def remove_table(self, table_name: str) -> None:
"""Called by the Schema when table is removed from it."""
config = self.get_normalizer_config(self.schema)
if propagation := config.get("propagation"):
if tables := propagation.get("tables"):
tables.pop(table_name, None)

def normalize_data_item(
self, item: TDataItem, load_id: str, table_name: str
) -> TNormalizedRowIterator:
@@ -352,8 +386,8 @@ def normalize_data_item(
def ensure_this_normalizer(cls, norm_config: TJSONNormalizer) -> None:
# make sure schema has right normalizer
present_normalizer = norm_config["module"]
if present_normalizer != __name__:
raise InvalidJsonNormalizer(__name__, present_normalizer)
if present_normalizer != cls.__module__:
raise InvalidJsonNormalizer(cls.__module__, present_normalizer)

@classmethod
def update_normalizer_config(cls, schema: Schema, config: RelationalNormalizerConfig) -> None:
@@ -371,8 +405,10 @@ def get_normalizer_config(cls, schema: Schema) -> RelationalNormalizerConfig:
cls.ensure_this_normalizer(norm_config)
return cast(RelationalNormalizerConfig, norm_config.get("config", {}))

@staticmethod
def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None:
@classmethod
def _validate_normalizer_config(
cls, schema: Schema, config: RelationalNormalizerConfig
) -> None:
"""Normalizes all known column identifiers according to the schema and then validates the configuration"""

def _normalize_prop(
@@ -397,7 +433,7 @@ def _normalize_prop(
)

validate_dict(
RelationalNormalizerConfig,
cls.RELATIONAL_CONFIG_TYPE,
config,
"./normalizers/json/config",
validator_f=column_name_validator(schema.naming),
2 changes: 2 additions & 0 deletions dlt/common/schema/schema.py
@@ -451,10 +451,12 @@ def drop_tables(
) -> List[TTableSchema]:
"""Drops tables from the schema and returns the dropped tables"""
result = []
# TODO: make sure all nested tables to table_names are also dropped
for table_name in table_names:
table = self.get_table(table_name)
if table and (not seen_data_only or utils.has_table_seen_data(table)):
result.append(self._schema_tables.pop(table_name))
self.data_item_normalizer.remove_table(table_name)
return result

def filter_row_with_hint(
7 changes: 6 additions & 1 deletion dlt/extract/extract.py
@@ -87,7 +87,12 @@ def choose_schema() -> Schema:
schema_ = schema
# take pipeline schema to make newest version visible to the resources
elif pipeline.default_schema_name:
schema_ = pipeline.schemas[pipeline.default_schema_name].clone()
# clones with name which will drop previous hashes
schema_ = pipeline.schemas[pipeline.default_schema_name].clone(
with_name=pipeline.default_schema_name
)
# delete data tables
schema_.drop_tables(schema_.data_table_names(include_incomplete=True))
else:
schema_ = pipeline._make_schema_with_default_name()
return schema_
9 changes: 6 additions & 3 deletions docs/tools/check_embedded_snippets.py
@@ -21,7 +21,7 @@


SNIPPET_MARKER = "```"
ALLOWED_LANGUAGES = ["py", "toml", "json", "yaml", "text", "sh", "bat", "sql"]
ALLOWED_LANGUAGES = ["py", "toml", "json", "yaml", "text", "sh", "bat", "sql", "hcl"]

LINT_TEMPLATE = "./lint_setup/template.py"
LINT_FILE = "./lint_setup/lint_me.py"
@@ -163,8 +163,11 @@ def parse_snippets(snippets: List[Snippet], verbose: bool) -> None:
json.loads(snippet.code)
elif snippet.language == "yaml":
yaml.safe_load(snippet.code)
# ignore text and sh scripts
elif snippet.language in ["text", "sh", "bat", "sql"]:
elif snippet.language == "hcl":
# TODO: implement hcl parsers
pass
# ignore all other scripts
elif snippet.language in ALLOWED_LANGUAGES:
pass
else:
raise ValueError(f"Unknown language {snippet.language}")
@@ -256,3 +256,24 @@ SOURCES__SQL_DATABASE__CHUNK_SIZE=1000
SOURCES__SQL_DATABASE__CHAT_MESSAGE__INCREMENTAL__CURSOR_PATH=updated_at
```

### Configure many sources side by side with custom sections
`dlt` allows you to rename any source to place its configuration into a custom section or to create many instances
of the source side by side. For example:
```py
from dlt.sources.sql_database import sql_database

my_db = sql_database.with_args(name="my_db", section="my_db")(table_names=["chat_message"])
print(my_db.name)
```
Here we create a renamed version of the `sql_database` source and then instantiate it. Such a source will read
its credentials from:
```toml
[sources.my_db]
credentials="mssql+pyodbc://loader.database.windows.net/dlt_data?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server"
schema="data"
backend="pandas"
chunk_size=1000

[sources.my_db.chat_message.incremental]
cursor_path="updated_at"
```
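To run several such instances side by side, give each one its own name and section so it reads a separate configuration. Below is a minimal sketch; the second instance, its `other_db` section, the `orders` table, and the `duckdb` destination are hypothetical examples:
```py
import dlt
from dlt.sources.sql_database import sql_database

# each renamed instance reads credentials from its own [sources.<section>] block
my_db = sql_database.with_args(name="my_db", section="my_db")(table_names=["chat_message"])
other_db = sql_database.with_args(name="other_db", section="other_db")(table_names=["orders"])

pipeline = dlt.pipeline(pipeline_name="many_sql_databases", destination="duckdb")
pipeline.run(my_db)
pipeline.run(other_db)
```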
17 changes: 16 additions & 1 deletion docs/website/docs/general-usage/source.md
@@ -52,7 +52,6 @@ Do not extract data in the source function. Leave that task to your resources if

If this is impractical (for example, you want to reflect a database to create resources for tables), make sure you do not call the source function too often. [See this note if you plan to deploy on Airflow](../walkthroughs/deploy-a-pipeline/deploy-with-airflow-composer.md#2-modify-dag-file)


## Customize sources

### Access and select resources to load
@@ -114,6 +113,22 @@ Note that `add_limit` **does not limit the number of records** but rather the "n

Find more on sampling data [here](resource.md#sample-from-large-data).

### Rename the source
`dlt` allows you to rename the source, i.e., to place the source configuration into a custom section or to create many instances
of the source side by side. For example:
```py
from dlt.sources.sql_database import sql_database

my_db = sql_database.with_args(name="my_db", section="my_db")(table_names=["table_1"])
print(my_db.name)
```
Here we create a renamed version of the `sql_database` source and then instantiate it. Such a source will read
its credentials from:
```toml
[sources.my_db.my_db.credentials]
password="..."
```
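As a minimal sketch, the renamed source can then be run like any other source; the pipeline name and `duckdb` destination below are placeholder choices, and the credentials come from the section above:
```py
import dlt
from dlt.sources.sql_database import sql_database

# renamed source: configuration and credentials are read from [sources.my_db.my_db]
my_db = sql_database.with_args(name="my_db", section="my_db")(table_names=["table_1"])

pipeline = dlt.pipeline(pipeline_name="my_db_pipeline", destination="duckdb")
info = pipeline.run(my_db)
print(info)
```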

### Add more resources to existing source

You can add a custom resource to a source after it was created. Imagine that you want to score all the deals with a keras model that will tell you if the deal is a fraud or not. In order to do that, you declare a new [transformer that takes the data from](resource.md#feeding-data-from-one-resource-into-another) `deals` resource and add it to the source.
26 changes: 26 additions & 0 deletions docs/website/docs/reference/performance.md
@@ -265,3 +265,29 @@ DLT_USE_JSON=simplejson

Instead of using Python Requests directly, you can use the built-in [requests wrapper](../general-usage/http/requests) or [`RESTClient`](../general-usage/http/rest-client) for API calls. This will make your pipeline more resilient to intermittent network errors and other random glitches.


## Keep the pipeline working folder in a bucket on constrained environments
`dlt` stores extracted data in load packages in order to load them atomically. If you extract a lot of data at once (e.g., a backfill) or
your runtime environment has constrained local storage (e.g., cloud functions), you can keep your data in a bucket by using [FUSE](https://github.com/libfuse/libfuse) or
any other option that your cloud provider supplies.

`dlt` uses rename when saving files and "committing" packages (a folder rename). Renames may not be supported on bucket filesystems. Often
`rename` is translated into `copy` automatically; in other cases `dlt` will fall back to copying files itself.

For cloud functions with GCS bucket mounts, you can increase the rename limit for folders:
```hcl
volume_mounts {
mount_path = "/usr/src/ingestion/pipeline_storage"
name = "pipeline_bucket"
}
volumes {
name = "pipeline_bucket"
gcs {
bucket = google_storage_bucket.dlt_pipeline_data_bucket.name
read_only = false
mount_options = [
"rename-dir-limit=100000"
]
}
}
```
33 changes: 33 additions & 0 deletions tests/common/normalizers/test_json_relational.py
@@ -880,6 +880,35 @@ def test_propagation_update_on_table_change(norm: RelationalNormalizer):
"table_3"
] == {"_dlt_id": "_dlt_root_id", "prop1": "prop2"}

# force propagation when table has nested table that needs root_key
# also use custom name for row_key
table_4 = new_table(
"table_4", write_disposition="replace", columns=[{"name": "primary_key", "row_key": True}]
)
table_4_nested = new_table(
"table_4__nested",
parent_table_name="table_4",
columns=[{"name": "_dlt_root_id", "root_key": True}],
)
# must add table_4 first
norm.schema.update_table(table_4)
norm.schema.update_table(table_4_nested)
# row key table_4 not propagated because it was added before nested that needs that
# TODO: maybe fix it
assert (
"table_4" not in norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"]
)
norm.schema.update_table(table_4)
# also custom key was used
assert norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"][
"table_4"
] == {"primary_key": "_dlt_root_id"}
# drop table from schema
norm.schema.drop_tables(["table_4"])
assert (
"table_4" not in norm.schema._normalizers_config["json"]["config"]["propagation"]["tables"]
)


def test_caching_perf(norm: RelationalNormalizer) -> None:
from time import time
@@ -893,6 +922,10 @@ def test_caching_perf(norm: RelationalNormalizer) -> None:
print(f"{time() - start}")


def test_extend_table(norm: RelationalNormalizer) -> None:
pass


def set_max_nesting(norm: RelationalNormalizer, max_nesting: int) -> None:
RelationalNormalizer.update_normalizer_config(norm.schema, {"max_nesting": max_nesting})
norm._reset()
4 changes: 4 additions & 0 deletions tests/helpers/airflow_tests/test_airflow_provider.py
@@ -1,3 +1,7 @@
import pytest

pytest.importorskip("airflow")

from airflow import DAG
from airflow.decorators import task, dag
from airflow.operators.python import PythonOperator
2 changes: 2 additions & 0 deletions tests/helpers/airflow_tests/test_airflow_wrapper.py
@@ -2,6 +2,8 @@
import pytest
from unittest import mock
from typing import Iterator, List

pytest.importorskip("airflow")
from airflow import DAG
from airflow.decorators import dag
from airflow.operators.python import PythonOperator, get_current_context
3 changes: 3 additions & 0 deletions tests/helpers/airflow_tests/test_join_airflow_scheduler.py
@@ -1,5 +1,8 @@
import pytest
import datetime
from pendulum.tz import UTC

pytest.importorskip("airflow")
from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import DagRun
8 changes: 5 additions & 3 deletions tests/helpers/airflow_tests/utils.py
@@ -2,9 +2,6 @@
import os
import argparse
import pytest
from airflow.cli.commands.db_command import resetdb
from airflow.configuration import conf
from airflow.models.variable import Variable

from dlt.common.configuration.container import Container
from dlt.common.configuration.specs import PluggableRunContext
@@ -19,6 +16,8 @@

@pytest.fixture(scope="function", autouse=True)
def initialize_airflow_db():
from airflow.models.variable import Variable

setup_airflow()
# backup context providers
providers = Container()[PluggableRunContext].providers
@@ -35,6 +34,9 @@ def initialize_airflow_db():


def setup_airflow() -> None:
from airflow.cli.commands.db_command import resetdb
from airflow.configuration import conf

# Disable loading examples
try:
conf.add_section("core")