Skip to content

Commit

Permalink
Merge branch 'devel' into default_write
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyaFaer committed Mar 6, 2024
2 parents afbf4ec + 3761335 commit 985b83d
Show file tree
Hide file tree
Showing 62 changed files with 1,446 additions and 529 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ jobs:
run: poetry install --no-interaction -E duckdb --with sentry-sdk

- run: |
poetry run pytest tests/pipeline/test_pipeline.py
poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py
if: runner.os != 'Windows'
name: Run pipeline smoke tests with minimum deps Linux/MAC
- run: |
poetry run pytest tests/pipeline/test_pipeline.py
poetry run pytest tests/pipeline/test_pipeline.py tests/pipeline/test_import_export_schema.py
if: runner.os == 'Windows'
name: Run smoke tests with minimum deps Windows
shell: cmd
Expand Down
18 changes: 12 additions & 6 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,19 @@ def finished_at(self) -> datetime.datetime:

def asdict(self) -> DictStrAny:
# to be mixed with NamedTuple
d: DictStrAny = self._asdict() # type: ignore
d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
d["load_packages"] = [package.asdict() for package in self.load_packages]
step_info: DictStrAny = self._asdict() # type: ignore
step_info["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
step_info["load_packages"] = [package.asdict() for package in self.load_packages]
if self.metrics:
d["started_at"] = self.started_at
d["finished_at"] = self.finished_at
return d
step_info["started_at"] = self.started_at
step_info["finished_at"] = self.finished_at
all_metrics = []
for load_id, metrics in step_info["metrics"].items():
for metric in metrics:
all_metrics.append({**dict(metric), "load_id": load_id})

step_info["metrics"] = all_metrics
return step_info

def __str__(self) -> str:
return self.asstr(verbosity=0)
Expand Down
6 changes: 4 additions & 2 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ def replace_schema_content(
if link_to_replaced_schema:
replaced_version_hash = self.stored_version_hash
assert replaced_version_hash is not None
utils.store_prev_hash(stored_schema, replaced_version_hash)
stored_schema["version_hash"] = replaced_version_hash
# do not store hash if the replaced schema is identical
if stored_schema["version_hash"] != replaced_version_hash:
utils.store_prev_hash(stored_schema, replaced_version_hash)
stored_schema["version_hash"] = replaced_version_hash
self._reset_schema(schema.name, schema._normalizers_config)
self._from_stored_schema(stored_schema)

Expand Down
2 changes: 1 addition & 1 deletion dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
from dlt.common.typing import TSortOrder

try:
from pydantic import BaseModel as _PydanticBaseModel
Expand Down Expand Up @@ -71,7 +72,6 @@
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""
TSortOrder = Literal["asc", "desc"]

COLUMN_PROPS: Set[TColumnProp] = set(get_args(TColumnProp))
COLUMN_HINTS: Set[TColumnHint] = set(
Expand Down
22 changes: 17 additions & 5 deletions dlt/common/storages/live_schema_storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Dict, List
from typing import Dict, List, cast

from dlt.common.schema.schema import Schema
from dlt.common.configuration.accessors import config
from dlt.common.storages.exceptions import SchemaNotFoundError
from dlt.common.storages.schema_storage import SchemaStorage
from dlt.common.storages.configuration import SchemaStorageConfiguration

Expand All @@ -23,10 +24,10 @@ def __getitem__(self, name: str) -> Schema:

return schema

def load_schema(self, name: str) -> Schema:
self.commit_live_schema(name)
# now live schema is saved so we can load it with the changes
return super().load_schema(name)
# def load_schema(self, name: str) -> Schema:
# self.commit_live_schema(name)
# # now live schema is saved so we can load it with the changes
# return super().load_schema(name)

def save_schema(self, schema: Schema) -> str:
rv = super().save_schema(schema)
Expand Down Expand Up @@ -55,6 +56,17 @@ def commit_live_schema(self, name: str) -> Schema:
self._save_schema(live_schema)
return live_schema

def is_live_schema_committed(self, name: str) -> bool:
"""Checks if live schema is present in storage and have same hash"""
live_schema = self.live_schemas.get(name)
if live_schema is None:
raise SchemaNotFoundError(name, f"live-schema://{name}")
try:
stored_schema_json = self._load_schema_json(name)
return live_schema.version_hash == cast(str, stored_schema_json.get("version_hash"))
except FileNotFoundError:
return False

def update_live_schema(self, schema: Schema, can_create_new: bool = True) -> None:
"""Will update live schema content without writing to storage. Optionally allows to create a new live schema"""
live_schema = self.live_schemas.get(schema.name)
Expand Down
9 changes: 6 additions & 3 deletions dlt/common/storages/schema_storage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import yaml
from typing import Iterator, List, Mapping, Tuple
from typing import Iterator, List, Mapping, Tuple, cast

from dlt.common import json, logger
from dlt.common.configuration import with_config
Expand Down Expand Up @@ -31,12 +31,15 @@ def __init__(
self.config = config
self.storage = FileStorage(config.schema_volume_path, makedirs=makedirs)

def _load_schema_json(self, name: str) -> DictStrAny:
schema_file = self._file_name_in_store(name, "json")
return cast(DictStrAny, json.loads(self.storage.load(schema_file)))

def load_schema(self, name: str) -> Schema:
# loads a schema from a store holding many schemas
schema_file = self._file_name_in_store(name, "json")
storage_schema: DictStrAny = None
try:
storage_schema = json.loads(self.storage.load(schema_file))
storage_schema = self._load_schema_json(name)
# prevent external modifications of schemas kept in storage
if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True):
raise InStorageSchemaModified(name, self.config.schema_volume_path)
Expand Down
1 change: 1 addition & 0 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
TVariantRV = Tuple[str, Any]
VARIANT_FIELD_FORMAT = "v_%s"
TFileOrPath = Union[str, os.PathLike, IO[Any]]
TSortOrder = Literal["asc", "desc"]


@runtime_checkable
Expand Down
15 changes: 12 additions & 3 deletions dlt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,18 @@
RowCounts = Dict[str, int]


def chunks(seq: Sequence[T], n: int) -> Iterator[Sequence[T]]:
for i in range(0, len(seq), n):
yield seq[i : i + n]
def chunks(iterable: Iterable[T], n: int) -> Iterator[Sequence[T]]:
it = iter(iterable)
while True:
chunk = list()
try:
for _ in range(n):
chunk.append(next(it))
except StopIteration:
if chunk:
yield chunk
break
yield chunk


def uniq_id(len_: int = 16) -> str:
Expand Down
8 changes: 6 additions & 2 deletions dlt/common/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def verify_prop(pk: str, pv: Any, t: Any) -> None:
has_passed = True
if not has_passed:
type_names = [
str(get_args(ut)) if is_literal_type(ut) else ut.__name__
(
str(get_args(ut))
if is_literal_type(ut)
else getattr(ut, "__name__", str(ut))
)
for ut in union_types
]
raise DictValidationException(
Expand Down Expand Up @@ -162,7 +166,7 @@ def verify_prop(pk: str, pv: Any, t: Any) -> None:
if not validator_f(path, pk, pv, t):
# TODO: when Python 3.9 and earlier support is
# dropped, just __name__ can be used
type_name = getattr(t, "__name__", t.__class__)
type_name = getattr(t, "__name__", str(t))
raise DictValidationException(
f"In {path}: field {pk} has expected type {type_name} which lacks validator",
path,
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/bigquery/bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
from dlt.destinations.utils import ensure_resource
from dlt.extract import DltResource
from dlt.extract.typing import TTableHintTemplate
from dlt.extract.items import TTableHintTemplate


PARTITION_HINT: Literal["x-bigquery-partition"] = "x-bigquery-partition"
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/synapse/synapse_adapter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Literal, Set, get_args, Final, Dict

from dlt.extract import DltResource, resource as make_resource
from dlt.extract.typing import TTableHintTemplate
from dlt.extract.items import TTableHintTemplate
from dlt.extract.hints import TResourceHints
from dlt.destinations.utils import ensure_resource

Expand Down
3 changes: 1 addition & 2 deletions dlt/extract/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from dlt.common.exceptions import PipelineException
from dlt.common.configuration.container import Container
from dlt.common.runtime.signals import sleep
from dlt.extract.typing import DataItemWithMeta, TItemFuture
from dlt.extract.items import ResolvablePipeItem, FuturePipeItem
from dlt.extract.items import DataItemWithMeta, TItemFuture, ResolvablePipeItem, FuturePipeItem

from dlt.extract.exceptions import (
DltSourceException,
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
)
from dlt.extract.incremental import IncrementalResourceWrapper

from dlt.extract.typing import TTableHintTemplate
from dlt.extract.items import TTableHintTemplate
from dlt.extract.source import DltSource
from dlt.extract.resource import DltResource, TUnboundDltResource

Expand Down
15 changes: 13 additions & 2 deletions dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from inspect import Signature, isgenerator
from inspect import Signature, isgenerator, isgeneratorfunction, unwrap
from typing import Any, Set, Type

from dlt.common.exceptions import DltException
from dlt.common.utils import get_callable_name
from dlt.extract.typing import ValidateItem, TDataItems
from dlt.extract.items import ValidateItem, TDataItems


class ExtractorException(DltException):
Expand Down Expand Up @@ -101,6 +101,17 @@ def __init__(self, pipe_name: str, gen: Any) -> None:
super().__init__(pipe_name, msg)


class UnclosablePipe(PipeException):
def __init__(self, pipe_name: str, gen: Any) -> None:
type_name = str(type(gen))
if gen_name := getattr(gen, "__name__", None):
type_name = f"{type_name} ({gen_name})"
msg = f"Pipe with gen of type {type_name} cannot be closed."
if callable(gen) and isgeneratorfunction(unwrap(gen)):
msg += " Closing of partially evaluated transformers is not yet supported."
super().__init__(pipe_name, msg)


class ResourceNameMissing(DltResourceException):
def __init__(self) -> None:
super().__init__(
Expand Down
15 changes: 12 additions & 3 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
from collections.abc import Sequence as C_Sequence
from datetime import datetime # noqa: 251
from copy import copy
import itertools
from typing import List, Set, Dict, Optional, Set, Any
import yaml
Expand Down Expand Up @@ -33,6 +33,7 @@

from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
from dlt.extract.incremental import IncrementalResourceWrapper
from dlt.extract.pipe_iterator import PipeIterator
from dlt.extract.source import DltSource
from dlt.extract.resource import DltResource
Expand Down Expand Up @@ -75,6 +76,8 @@ def choose_schema() -> Schema:
"""Except of explicitly passed schema, use a clone that will get discarded if extraction fails"""
if schema:
schema_ = schema
# TODO: We should start with a new schema of the same name here ideally, but many tests fail
# because of this. So some investigation is needed.
elif pipeline.default_schema_name:
schema_ = pipeline.schemas[pipeline.default_schema_name].clone()
else:
Expand Down Expand Up @@ -200,14 +203,20 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics:
for resource in source.selected_resources.values():
# cleanup the hints
hints = clean_hints[resource.name] = {}
resource_hints = resource._hints or resource.compute_table_schema()
resource_hints = copy(resource._hints) or resource.compute_table_schema()
if resource.incremental and "incremental" not in resource_hints:
resource_hints["incremental"] = resource.incremental # type: ignore

for name, hint in resource_hints.items():
if hint is None or name in ["validator"]:
continue
if name == "incremental":
# represent incremental as dictionary (it derives from BaseConfiguration)
hints[name] = dict(hint) # type: ignore[call-overload]
if isinstance(hint, IncrementalResourceWrapper):
hint = hint._incremental
# sometimes internal incremental is not bound
if hint:
hints[name] = dict(hint) # type: ignore[call-overload]
continue
if name == "original_columns":
# this is original type of the columns ie. Pydantic model
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from dlt.extract.hints import HintsMeta
from dlt.extract.resource import DltResource
from dlt.extract.typing import TableNameMeta
from dlt.extract.items import TableNameMeta
from dlt.extract.storage import ExtractStorage, ExtractorItemStorage

try:
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
InconsistentTableTemplate,
)
from dlt.extract.incremental import Incremental
from dlt.extract.typing import TFunHintTemplate, TTableHintTemplate, ValidateItem
from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, ValidateItem
from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
from dlt.extract.validation import create_item_validator

Expand Down
Loading

0 comments on commit 985b83d

Please sign in to comment.