Skip to content

Commit

Permalink
#2087 allows double underscores in identifiers (#2098)
Browse files Browse the repository at this point in the history
* removes astunparse and aiohttp

* allows for built-in ast unparse if present

* uses break path for normalization to allow names containing path separators, migrates old schema to enable compat mode with old behavior

* adds removeprefix util

* updates docs

* bumps dlt to version 1.4.1

* linter fixes

* fixes tests

* fixes and tests saving pandas indexes

* fixes sqlite read interface tests

* updates docs
  • Loading branch information
rudolfix authored Dec 2, 2024
1 parent 61c2ed9 commit f4faa83
Show file tree
Hide file tree
Showing 63 changed files with 2,203 additions and 1,721 deletions.
13 changes: 6 additions & 7 deletions dlt/cli/deploy_command_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from yaml import Dumper
from itertools import chain
from typing import List, Optional, Sequence, Tuple, Any, Dict
from astunparse import unparse

# optional dependencies
import pipdeptree
Expand All @@ -23,7 +22,7 @@
from dlt.common.git import get_origin, get_repo, Repo
from dlt.common.configuration.specs.runtime_configuration import get_default_pipeline_name
from dlt.common.typing import StrAny
from dlt.common.reflection.utils import evaluate_node_literal
from dlt.common.reflection.utils import evaluate_node_literal, ast_unparse
from dlt.common.pipeline import LoadInfo, TPipelineState, get_dlt_repos_dir
from dlt.common.storages import FileStorage
from dlt.common.utils import set_working_dir
Expand Down Expand Up @@ -313,7 +312,7 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
if f_r_value is None:
fmt.warning(
"The value of `dev_mode` in call to `dlt.pipeline` cannot be"
f" determined from {unparse(f_r_node).strip()}. We assume that you know"
f" determined from {ast_unparse(f_r_node).strip()}. We assume that you know"
" what you are doing :)"
)
if f_r_value is True:
Expand All @@ -331,8 +330,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
raise CliCommandInnerException(
"deploy",
"The value of 'pipelines_dir' argument in call to `dlt_pipeline` cannot be"
f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
" be found. Pass it directly with --pipelines-dir option.",
f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
" will be found. Pass it directly with --pipelines-dir option.",
)

p_n_node = call_args.arguments.get("pipeline_name")
Expand All @@ -342,8 +341,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
raise CliCommandInnerException(
"deploy",
"The value of 'pipeline_name' argument in call to `dlt_pipeline` cannot be"
f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
" be found. Pass it directly with --pipeline-name option.",
f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
" will be found. Pass it directly with --pipeline-name option.",
)
pipelines.append((pipeline_name, pipelines_dir))

Expand Down
5 changes: 2 additions & 3 deletions dlt/cli/source_detection.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import ast
import inspect
from astunparse import unparse
from typing import Dict, Tuple, Set, List

from dlt.common.configuration import is_secret_hint
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.reflection.utils import creates_func_def_name_node
from dlt.common.reflection.utils import creates_func_def_name_node, ast_unparse
from dlt.common.typing import is_optional_type

from dlt.sources import SourceReference
Expand Down Expand Up @@ -65,7 +64,7 @@ def find_source_calls_to_replace(
for calls in visitor.known_sources_resources_calls.values():
for call in calls:
transformed_nodes.append(
(call.func, ast.Name(id=pipeline_name + "_" + unparse(call.func)))
(call.func, ast.Name(id=pipeline_name + "_" + ast_unparse(call.func)))
)

return transformed_nodes
Expand Down
1 change: 0 additions & 1 deletion dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
DataFrame = Any
ArrowTable = Any
IbisBackend = Any

else:
DataFrame = Any
ArrowTable = Any
Expand Down
5 changes: 3 additions & 2 deletions dlt/common/libs/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
raise MissingDependencyException("dlt Pandas Helpers", ["pandas"])


def pandas_to_arrow(df: pandas.DataFrame) -> Any:
def pandas_to_arrow(df: pandas.DataFrame, preserve_index: bool = False) -> Any:
"""Converts pandas to arrow or raises an exception if pyarrow is not installed"""
from dlt.common.libs.pyarrow import pyarrow as pa

return pa.Table.from_pandas(df)
# NOTE: None preserves named indexes but ignores unnamed
return pa.Table.from_pandas(df, preserve_index=preserve_index)
141 changes: 141 additions & 0 deletions dlt/common/normalizers/json/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
Cached helper methods for all operations that are called often
"""
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple, cast

from dlt.common.json import json
from dlt.common.destination.utils import resolve_merge_strategy
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.normalizers.typing import TRowIdType
from dlt.common.normalizers.utils import DLT_ID_LENGTH_BYTES
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnSchema, C_DLT_ID, DLT_NAME_PREFIX
from dlt.common.schema.utils import (
get_columns_names_with_prop,
get_first_column_name_with_prop,
is_nested_table,
)
from dlt.common.utils import digest128


@lru_cache(maxsize=None)
def shorten_fragments(naming: NamingConvention, *idents: str) -> str:
    """Cached call to `naming.shorten_fragments` for the given identifier fragments.

    Results are memoized per (`naming`, `idents`) combination, so all arguments
    must be hashable.
    """
    shortened = naming.shorten_fragments(*idents)
    return shortened


@lru_cache(maxsize=None)
def normalize_table_identifier(schema: Schema, naming: NamingConvention, table_name: str) -> str:
    """Normalizes `table_name` with `naming`, cached per (schema, naming, table_name).

    The `use_break_path_on_normalize` flag in the schema normalizers config (on when
    the key is absent) selects `naming.normalize_tables_path`; when the flag is falsy
    `naming.normalize_table_identifier` is used instead.
    """
    use_break_path = schema._normalizers_config.get("use_break_path_on_normalize", True)
    # only the selected normalizer is looked up and called
    normalize = (
        naming.normalize_tables_path if use_break_path else naming.normalize_table_identifier
    )
    return normalize(table_name)


@lru_cache(maxsize=None)
def normalize_identifier(schema: Schema, naming: NamingConvention, identifier: str) -> str:
    """Normalizes `identifier` with `naming`, cached per (schema, naming, identifier).

    The `use_break_path_on_normalize` flag in the schema normalizers config (on when
    the key is absent) selects `naming.normalize_path`; when the flag is falsy
    `naming.normalize_identifier` is used instead.
    """
    use_break_path = schema._normalizers_config.get("use_break_path_on_normalize", True)
    # only the selected normalizer is looked up and called
    normalize = naming.normalize_path if use_break_path else naming.normalize_identifier
    return normalize(identifier)


@lru_cache(maxsize=None)
def get_table_nesting_level(
    schema: Schema, table_name: str, default_nesting: int = 1000
) -> Optional[int]:
    """Gets the nesting level configured for `table_name`.

    Returns the `max_nesting` value stored under the table's `x-normalizer` hints.
    When the table is unknown (or empty) or no such value is set, `default_nesting`
    is returned - presumably the caller passes the inherited (parent) level here.
    """
    table_schema = schema.tables.get(table_name)
    if not table_schema:
        return default_nesting

    normalizer_hints = table_schema.get("x-normalizer", {})
    configured_nesting = cast(int, normalizer_hints.get("max_nesting"))
    # 0 is a valid nesting level, so compare against None explicitly
    return default_nesting if configured_nesting is None else configured_nesting


@lru_cache(maxsize=None)
def get_primary_key(schema: Schema, table_name: str) -> List[str]:
    """Gets names of columns with the `primary_key` prop in `table_name`.

    Incomplete columns are included. Returns an empty list when the table is not
    present in the schema. The result is cached per (schema, table_name).
    """
    if table_name in schema.tables:
        return get_columns_names_with_prop(
            schema.get_table(table_name), "primary_key", include_incomplete=True
        )
    return []


@lru_cache(maxsize=None)
def is_nested_type(
    schema: Schema,
    table_name: str,
    field_name: str,
    _r_lvl: int,
) -> bool:
    """Tells if `field_name` in `table_name` should be kept in place as a nested object.

    Returns True when the remaining nesting level `_r_lvl` is exhausted or when the
    data type of the field is "json". The data type is read from the column schema
    and falls back to the schema's preferred type when the table or column is
    unknown or the column has no `data_type`.

    Cache perf: max_nesting < _r_lvl: ~2x faster, full check 10x faster
    """
    # nesting level is counted backwards: if we have traversed to or beyond
    # the calculated nesting level, we detect a nested type
    if _r_lvl <= 0:
        return True

    table_schema = schema.tables.get(table_name)
    column_schema = table_schema["columns"].get(field_name) if table_schema else None

    if column_schema is not None and "data_type" in column_schema:
        field_type = column_schema["data_type"]
    else:
        # no explicit type on the column, ask schema for a type hint
        field_type = schema.get_preferred_type(field_name)

    return field_type == "json"


@lru_cache(maxsize=None)
def get_nested_row_id_type(schema: Schema, table_name: str) -> Tuple[TRowIdType, bool]:
    """Gets type of row id to be added to nested table and if linking information should be added

    Returns ("random", False) only for a known table that is not nested and whose merge
    strategy is neither "upsert" nor "scd2". In every other case - including a table
    that is not in the schema yet - returns ("row_hash", True).
    """
    table = schema.tables.get(table_name)
    if table:
        merge_strategy = resolve_merge_strategy(schema.tables, table)
        requires_linking = merge_strategy in ("upsert", "scd2") or is_nested_table(table)
        if not requires_linking:
            return "random", False
    # table will be created or needs deterministic ids: use standard linking
    return "row_hash", True


@lru_cache(maxsize=None)
def get_root_row_id_type(schema: Schema, table_name: str) -> TRowIdType:
    """Gets type of row id to be generated for rows of `table_name`.

    Returns "key_hash" for the "upsert" merge strategy and "row_hash" for the "scd2"
    strategy when the column with the `x-row-version` prop is the normalized `C_DLT_ID`
    column. Returns "random" otherwise, also when the table is not in the schema.
    """
    table = schema.tables.get(table_name)
    if not table:
        return "random"

    merge_strategy = resolve_merge_strategy(schema.tables, table)
    if merge_strategy == "upsert":
        return "key_hash"
    if merge_strategy == "scd2":
        row_version_column = get_first_column_name_with_prop(
            schema.get_table(table_name),
            "x-row-version",
            include_incomplete=True,
        )
        dlt_id_column = schema.naming.normalize_identifier(C_DLT_ID)
        if row_version_column == dlt_id_column:
            return "row_hash"
    return "random"


def get_row_hash(row: Dict[str, Any], subset: Optional[List[str]] = None) -> str:
    """Returns hash of row.

    The selected columns are serialized to json with keys sorted, so the hash includes
    column names and values and does not depend on the order of keys in `row`.
    Can be used as deterministic row identifier.

    Args:
        row: Mapping of column names to values.
        subset: Optional list of column names. When None, all columns of `row` except
            those starting with `DLT_NAME_PREFIX` (dlt system columns) are hashed.
            When given, exactly the columns of `row` listed in `subset` are hashed,
            system columns are NOT filtered out in that case.

    Returns:
        Digest of the serialized columns computed with `digest128`.
    """
    if subset is None:
        row_filtered = {k: v for k, v in row.items() if not k.startswith(DLT_NAME_PREFIX)}
    else:
        # NOTE: selection is done on the full row, not on the row with system columns
        # removed. This is kept on purpose: changing it would change already generated
        # row ids for subsets that contain system columns.
        row_filtered = {k: v for k, v in row.items() if k in subset}
    row_str = json.dumps(row_filtered, sort_keys=True)
    return digest128(row_str, DLT_ID_LENGTH_BYTES)


def get_nested_row_hash(parent_row_id: str, nested_table: str, list_idx: int) -> str:
    """Returns deterministic id of a nested row.

    The id is a digest of the parent row id, the nested table name and the index of
    the row in its list, joined with underscores.
    """
    # all lists are ordered and all nested tables must be lists, so the position
    # in the list together with the parent id identifies the nested row
    nested_row_key = f"{parent_row_id}_{nested_table}_{list_idx}"
    return digest128(nested_row_key, DLT_ID_LENGTH_BYTES)
Loading

0 comments on commit f4faa83

Please sign in to comment.