Skip to content

Commit

Permalink
improves collision detection when naming convention changes (#1536)
Browse files Browse the repository at this point in the history
* adds more info to pipeline drop and info commands

* extracts known env variables to separate module

* drops tables on staging

* tests create/drop datasets and tables

* simplifies drop command and helpers + tests

* adds no print linter module and a few other small fixes

* improves collision detection when normalizers change

* allows glob to work with memory filesystem

* replaces walk in filesystem destination with own glob

* standardizes drop_dataset beahvior for all destinations

* creates athena iceberg tables in random locations
  • Loading branch information
rudolfix authored Jul 5, 2024
1 parent 48c93f5 commit 60c7327
Show file tree
Hide file tree
Showing 59 changed files with 1,010 additions and 330 deletions.
2 changes: 1 addition & 1 deletion dlt/cli/_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def schema_command_wrapper(file_path: str, format_: str, remove_defaults: bool)
schema_str = json.dumps(s.to_dict(remove_defaults=remove_defaults), pretty=True)
else:
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults)
print(schema_str)
fmt.echo(schema_str)
return 0


Expand Down
44 changes: 41 additions & 3 deletions dlt/cli/pipeline_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
from dlt.common.destination.reference import TDestinationReferenceArg
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import group_tables_by_resource, remove_defaults
from dlt.common.schema.utils import (
group_tables_by_resource,
has_table_seen_data,
is_complete_column,
remove_defaults,
)
from dlt.common.storages import FileStorage, PackageStorage
from dlt.pipeline.helpers import DropCommand
from dlt.pipeline.exceptions import CannotRestorePipelineException
Expand Down Expand Up @@ -180,6 +185,35 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
fmt.bold(str(res_state_slots)),
)
)
if verbosity > 0:
for table in tables:
incomplete_columns = len(
[
col
for col in table["columns"].values()
if not is_complete_column(col)
]
)
fmt.echo(
"\t%s table %s column(s) %s %s"
% (
fmt.bold(table["name"]),
fmt.bold(str(len(table["columns"]))),
(
fmt.style("received data", fg="green")
if has_table_seen_data(table)
else fmt.style("not yet received data", fg="yellow")
),
(
fmt.style(
f"{incomplete_columns} incomplete column(s)",
fg="yellow",
)
if incomplete_columns > 0
else ""
),
)
)
fmt.echo()
fmt.echo("Working dir content:")
_display_pending_packages()
Expand Down Expand Up @@ -272,7 +306,7 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
fmt.echo(package_info.asstr(verbosity))
if len(package_info.schema_update) > 0:
if verbosity == 0:
print("Add -v option to see schema update. Note that it could be large.")
fmt.echo("Add -v option to see schema update. Note that it could be large.")
else:
tables = remove_defaults({"tables": package_info.schema_update}) # type: ignore
fmt.echo(fmt.bold("Schema update:"))
Expand Down Expand Up @@ -316,7 +350,7 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
fmt.echo(
"About to drop the following data in dataset %s in destination %s:"
% (
fmt.bold(drop.info["dataset_name"]),
fmt.bold(p.dataset_name),
fmt.bold(p.destination.destination_name),
)
)
Expand All @@ -329,6 +363,10 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
)
)
fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"]))
fmt.echo(
"%s: %s"
% (fmt.style("\twith data in destination", fg="green"), drop.info["tables_with_data"])
)
fmt.echo(
"%s: %s"
% (
Expand Down
24 changes: 12 additions & 12 deletions dlt/common/configuration/paths.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import os
import tempfile

# dlt settings folder
DOT_DLT = ".dlt"
from dlt.common import known_env


# dlt data dir is by default not set, see get_dlt_data_dir for details
DLT_DATA_DIR: str = None
# dlt settings folder
DOT_DLT = os.environ.get(known_env.DLT_CONFIG_FOLDER, ".dlt")


def get_dlt_project_dir() -> str:
"""The dlt project dir is the current working directory but may be overridden by DLT_PROJECT_DIR env variable."""
return os.environ.get("DLT_PROJECT_DIR", ".")
return os.environ.get(known_env.DLT_PROJECT_DIR, ".")


def get_dlt_settings_dir() -> str:
Expand All @@ -27,14 +27,14 @@ def make_dlt_settings_path(path: str) -> str:


def get_dlt_data_dir() -> str:
"""Gets default directory where pipelines' data will be stored
1. in user home directory: ~/.dlt/
2. if current user is root: in /var/dlt/
3. if current user does not have a home directory: in /tmp/dlt/
4. if DLT_DATA_DIR is set in env then it is used
"""Gets default directory where pipelines' data (working directories) will be stored
1. if DLT_DATA_DIR is set in env then it is used
2. in user home directory: ~/.dlt/
3. if current user is root: in /var/dlt/
4. if current user does not have a home directory: in /tmp/dlt/
"""
if "DLT_DATA_DIR" in os.environ:
return os.environ["DLT_DATA_DIR"]
if known_env.DLT_DATA_DIR in os.environ:
return os.environ[known_env.DLT_DATA_DIR]

# geteuid not available on Windows
if hasattr(os, "geteuid") and os.geteuid() == 0:
Expand Down
5 changes: 3 additions & 2 deletions dlt/common/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
except ImportError:
PydanticBaseModel = None # type: ignore[misc]

from dlt.common import known_env
from dlt.common.pendulum import pendulum
from dlt.common.arithmetics import Decimal
from dlt.common.wei import Wei
Expand Down Expand Up @@ -80,7 +81,7 @@ def custom_encode(obj: Any) -> str:


# use PUA range to encode additional types
PUA_START = int(os.environ.get("DLT_JSON_TYPED_PUA_START", "0xf026"), 16)
PUA_START = int(os.environ.get(known_env.DLT_JSON_TYPED_PUA_START, "0xf026"), 16)

_DECIMAL = chr(PUA_START)
_DATETIME = chr(PUA_START + 1)
Expand Down Expand Up @@ -191,7 +192,7 @@ def may_have_pua(line: bytes) -> bool:

# pick the right impl
json: SupportsJson = None
if os.environ.get("DLT_USE_JSON") == "simplejson":
if os.environ.get(known_env.DLT_USE_JSON) == "simplejson":
from dlt.common.json import _simplejson as _json_d

json = _json_d # type: ignore[assignment]
Expand Down
25 changes: 25 additions & 0 deletions dlt/common/known_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Defines env variables that `dlt` uses independently of its configuration system"""

DLT_PROJECT_DIR = "DLT_PROJECT_DIR"
"""The dlt project dir is the current working directory, '.' (current working dir) by default"""

DLT_DATA_DIR = "DLT_DATA_DIR"
"""Gets default directory where pipelines' data (working directories) will be stored"""

DLT_CONFIG_FOLDER = "DLT_CONFIG_FOLDER"
"""A folder (path relative to DLT_PROJECT_DIR) where config and secrets are stored"""

DLT_DEFAULT_NAMING_NAMESPACE = "DLT_DEFAULT_NAMING_NAMESPACE"
"""Python namespace default where naming modules reside, defaults to dlt.common.normalizers.naming"""

DLT_DEFAULT_NAMING_MODULE = "DLT_DEFAULT_NAMING_MODULE"
"""A module name with the default naming convention, defaults to snake_case"""

DLT_DLT_ID_LENGTH_BYTES = "DLT_DLT_ID_LENGTH_BYTES"
"""The length of the _dlt_id identifier, before base64 encoding"""

DLT_USE_JSON = "DLT_USE_JSON"
"""Type of json parser to use, defaults to orjson, may be simplejson"""

DLT_JSON_TYPED_PUA_START = "DLT_JSON_TYPED_PUA_START"
"""Start of the unicode block within the PUA used to encode types in typed json"""
10 changes: 7 additions & 3 deletions dlt/common/normalizers/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
from importlib import import_module
from types import ModuleType
from typing import Any, Dict, Optional, Type, Tuple, cast, List

import dlt
from dlt.common import logger
from dlt.common import known_env
from dlt.common.configuration.inject import with_config
from dlt.common.configuration.specs import known_sections
from dlt.common.destination import DestinationCapabilitiesContext
Expand All @@ -24,9 +26,11 @@
from dlt.common.typing import is_subclass
from dlt.common.utils import get_full_class_name, uniq_id_base64, many_uniq_ids_base64

DEFAULT_NAMING_NAMESPACE = "dlt.common.normalizers.naming"
DLT_ID_LENGTH_BYTES = 10
DEFAULT_NAMING_MODULE = "snake_case"
DEFAULT_NAMING_NAMESPACE = os.environ.get(
known_env.DLT_DEFAULT_NAMING_NAMESPACE, "dlt.common.normalizers.naming"
)
DEFAULT_NAMING_MODULE = os.environ.get(known_env.DLT_DEFAULT_NAMING_MODULE, "snake_case")
DLT_ID_LENGTH_BYTES = int(os.environ.get(known_env.DLT_DLT_ID_LENGTH_BYTES, 10))


def _section_for_schema(kwargs: Dict[str, Any]) -> Tuple[str, ...]:
Expand Down
6 changes: 3 additions & 3 deletions dlt/common/runners/stdout.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ def exec_to_stdout(f: AnyFun) -> Iterator[Any]:
rv = f()
yield rv
except Exception as ex:
print(encode_obj(ex), file=sys.stderr, flush=True)
print(encode_obj(ex), file=sys.stderr, flush=True) # noqa
raise
finally:
if rv is not None:
print(encode_obj(rv), flush=True)
print(encode_obj(rv), flush=True) # noqa


def iter_std(
Expand Down Expand Up @@ -126,6 +126,6 @@ def iter_stdout_with_result(
if isinstance(exception, Exception):
raise exception from cpe
else:
print(cpe.stderr, file=sys.stderr)
sys.stderr.write(cpe.stderr)
# otherwise reraise cpe
raise
2 changes: 1 addition & 1 deletion dlt/common/runtime/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def _log(self, log_level: int, log_message: str) -> None:
if isinstance(self.logger, (logging.Logger, logging.LoggerAdapter)):
self.logger.log(log_level, log_message)
else:
print(log_message, file=self.logger or sys.stdout)
print(log_message, file=self.logger or sys.stdout) # noqa

def _start(self, step: str) -> None:
self.counters = defaultdict(int)
Expand Down
Loading

0 comments on commit 60c7327

Please sign in to comment.