Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mark function to emit resource hints from decorated function #938

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dlt/common/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ def unset_current_pipe_name() -> None:


def get_current_pipe_name() -> str:
"""Gets pipe name associated with current thread"""
"""When executed from withing dlt.resource decorated function, gets pipe name associated with current thread.

Pipe name is the same as resource name for all currently known cases. In some multithreading cases, pipe name may be not available.
"""
name = _CURRENT_PIPE_NAME.get(threading.get_ident())
if name is None:
raise ResourceNameNotAvailable()
Expand Down
5 changes: 4 additions & 1 deletion dlt/extract/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dlt.extract.resource import DltResource, with_table_name
from dlt.extract.resource import DltResource, with_table_name, with_hints
from dlt.extract.hints import make_hints
from dlt.extract.source import DltSource
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.extract.incremental import Incremental
Expand All @@ -8,6 +9,8 @@
"DltResource",
"DltSource",
"with_table_name",
"with_hints",
"make_hints",
"source",
"resource",
"transformer",
Expand Down
31 changes: 27 additions & 4 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
TSchemaContract,
TTableFormat,
)
from dlt.extract.hints import make_hints
from dlt.extract.utils import (
ensure_table_schema_columns_hint,
simulate_func_call,
Expand All @@ -48,6 +49,7 @@
from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems
from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable
from dlt.extract.exceptions import (
CurrentSourceNotAvailable,
DynamicNameNotStandaloneResource,
InvalidTransformerDataTypeGeneratorFunctionRequired,
ResourceFunctionExpected,
Expand All @@ -56,7 +58,7 @@
SourceIsAClassTypeError,
ExplicitSourceNameInvalid,
SourceNotAFunction,
SourceSchemaNotAvailable,
CurrentSourceSchemaNotAvailable,
)
from dlt.extract.incremental import IncrementalResourceWrapper

Expand All @@ -67,7 +69,7 @@

@configspec
class SourceSchemaInjectableContext(ContainerInjectableContext):
"""A context containing the source schema, present when decorated function is executed"""
"""A context containing the source schema, present when dlt.source/resource decorated function is executed"""

schema: Schema

Expand All @@ -78,6 +80,19 @@ class SourceSchemaInjectableContext(ContainerInjectableContext):
def __init__(self, schema: Schema = None) -> None: ...


@configspec
class SourceInjectableContext(ContainerInjectableContext):
"""A context containing the source schema, present when dlt.resource decorated function is executed"""

source: DltSource

can_create_default: ClassVar[bool] = False

if TYPE_CHECKING:

def __init__(self, source: DltSource = None) -> None: ...


TSourceFunParams = ParamSpec("TSourceFunParams")
TResourceFunParams = ParamSpec("TResourceFunParams")
TDltSourceImpl = TypeVar("TDltSourceImpl", bound=DltSource, default=DltSource)
Expand Down Expand Up @@ -395,7 +410,7 @@ def resource(
def make_resource(
_name: str, _section: str, _data: Any, incremental: IncrementalResourceWrapper = None
) -> DltResource:
table_template = DltResource.new_table_template(
table_template = make_hints(
table_name,
write_disposition=write_disposition,
columns=columns,
Expand Down Expand Up @@ -694,7 +709,15 @@ def get_source_schema() -> Schema:
try:
return Container()[SourceSchemaInjectableContext].schema
except ContextDefaultCannotBeCreated:
raise SourceSchemaNotAvailable()
raise CurrentSourceSchemaNotAvailable()


def get_source() -> DltSource:
"""When executed from the function decorated with @dlt.resource, returns currently extracted source"""
try:
return Container()[SourceInjectableContext].source
except ContextDefaultCannotBeCreated:
raise CurrentSourceNotAvailable()


TBoundItems = TypeVar("TBoundItems", bound=TDataItems)
Expand Down
10 changes: 9 additions & 1 deletion dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,22 @@ def __init__(self, source_name: str, _typ: Type[Any]) -> None:
)


class SourceSchemaNotAvailable(DltSourceException):
class CurrentSourceSchemaNotAvailable(DltSourceException):
def __init__(self) -> None:
super().__init__(
"Current source schema is available only when called from a function decorated with"
" dlt.source or dlt.resource"
)


class CurrentSourceNotAvailable(DltSourceException):
def __init__(self) -> None:
super().__init__(
"Current source is available only when called from a function decorated with"
" dlt.resource or dlt.transformer during the extract step"
)


class ExplicitSourceNameInvalid(DltSourceException):
def __init__(self, source_name: str, schema_name: str) -> None:
self.source_name = source_name
Expand Down
6 changes: 4 additions & 2 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.utils import get_callable_name, get_full_class_name

from dlt.extract.decorators import SourceSchemaInjectableContext
from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
from dlt.extract.pipe import PipeIterator
from dlt.extract.source import DltSource
Expand Down Expand Up @@ -322,7 +322,9 @@ def extract(
) -> str:
# generate load package to be able to commit all the sources together later
load_id = self.extract_storage.create_load_package(source.discover_schema())
with Container().injectable_context(SourceSchemaInjectableContext(source.schema)):
with Container().injectable_context(
SourceSchemaInjectableContext(source.schema)
), Container().injectable_context(SourceInjectableContext(source)):
# inject the config section with the current source name
with inject_section(
ConfigSectionContext(
Expand Down
16 changes: 14 additions & 2 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
TTableSchemaColumns,
TPartialTableSchema,
)

from dlt.extract.hints import HintsMeta
from dlt.extract.resource import DltResource
from dlt.extract.typing import TableNameMeta
from dlt.extract.storage import ExtractStorage, ExtractorItemStorage
Expand Down Expand Up @@ -85,6 +85,12 @@ def item_format(items: TDataItems) -> Optional[TLoaderFileFormat]:

def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> None:
"""Write `items` to `resource` optionally computing table schemas and revalidating/filtering data"""
if isinstance(meta, HintsMeta):
# update the resource with new hints, remove all caches so schema is recomputed
# and contracts re-applied
resource.merge_hints(meta.hints)
self._reset_contracts_cache()

if table_name := self._get_static_table_name(resource, meta):
# write item belonging to table with static name
self._write_to_static_table(resource, table_name, items)
Expand Down Expand Up @@ -152,7 +158,7 @@ def _compute_and_update_table(
self, resource: DltResource, table_name: str, items: TDataItems
) -> TDataItems:
"""
Computes new table and does contract checks, if false is returned, the table may not be created and not items should be written
Computes new table and does contract checks, if false is returned, the table may not be created and no items should be written
"""
computed_table = self._compute_table(resource, items)
# overwrite table name (if coming from meta)
Expand Down Expand Up @@ -190,6 +196,12 @@ def _compute_and_update_table(
filtered_columns[name] = mode
return items

def _reset_contracts_cache(self) -> None:
"""Removes all cached contracts, filtered columns and tables"""
self._table_contracts.clear()
self._filtered_tables.clear()
self._filtered_columns.clear()


class JsonLExtractor(Extractor):
file_format = "puae-jsonl"
Expand Down
Loading
Loading