master merge for 0.5.3 release #1682

Merged · 10 commits · Aug 13, 2024
80 changes: 80 additions & 0 deletions .github/workflows/test_destination_motherduck.yml
@@ -0,0 +1,80 @@

name: dest | motherduck

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:
schedule:
- cron: '0 2 * * *'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"motherduck\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
get_docs_changes:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: dest | motherduck tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
run:
shell: bash
runs-on: "ubuntu-latest"

steps:

- name: Check out
uses: actions/checkout@master

- name: Setup Python
id: setup-python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/[email protected]
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-motherduck

- name: Install dependencies
run: poetry install --no-interaction -E motherduck -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load -m "essential"
name: Run essential tests Linux
if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}

- run: |
poetry run pytest tests/load
name: Run all tests Linux
if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -28,7 +28,7 @@ env:
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
# Test redshift and filesystem with all buckets
# postgres runs again here so we can test on mac/windows
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\", \"motherduck\"]"
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"

jobs:
get_docs_changes:
185 changes: 127 additions & 58 deletions dlt/common/destination/reference.py
@@ -27,6 +27,7 @@
from dlt.common import logger
from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.destination.utils import verify_schema_capabilities
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.utils import (
@@ -42,6 +43,8 @@
InvalidDestinationReference,
UnknownDestinationModule,
DestinationSchemaTampered,
DestinationTransientException,
DestinationTerminalException,
)
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
@@ -187,6 +190,8 @@ class DestinationClientDwhConfiguration(DestinationClientConfiguration):
"""How to handle replace disposition for this destination, can be classic or staging"""
staging_dataset_name_layout: str = "%s_staging"
"""Layout for staging dataset, where %s is replaced with dataset name. placeholder is optional"""
enable_dataset_name_normalization: bool = True
"""Whether to normalize the dataset name. Affects staging dataset as well."""

def _bind_dataset_name(
self: TDestinationDwhClient, dataset_name: str, default_schema_name: str = None
@@ -205,11 +210,14 @@ def normalize_dataset_name(self, schema: Schema) -> str:
If default schema name is None or equals schema.name, the schema suffix is skipped.
"""
dataset_name = self._make_dataset_name(schema.name)
return (
dataset_name
if not dataset_name
else schema.naming.normalize_table_identifier(dataset_name)
)
if not dataset_name:
return dataset_name
else:
return (
schema.naming.normalize_table_identifier(dataset_name)
if self.enable_dataset_name_normalization
else dataset_name
)

def normalize_staging_dataset_name(self, schema: Schema) -> str:
"""Builds staging dataset name out of dataset_name and staging_dataset_name_layout."""
@@ -224,7 +232,11 @@ def normalize_staging_dataset_name(self, schema: Schema) -> str:
# no placeholder, then layout is a full name. so you can have a single staging dataset
dataset_name = self.staging_dataset_name_layout

return schema.naming.normalize_table_identifier(dataset_name)
return (
schema.naming.normalize_table_identifier(dataset_name)
if self.enable_dataset_name_normalization
else dataset_name
)

def _make_dataset_name(self, schema_name: str) -> str:
if not schema_name:
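Below is a minimal sketch (not part of this diff) of what the new enable_dataset_name_normalization flag is intended to do. The class and method names are taken from this file; the schema and dataset names are made up, and the exact normalized value depends on the naming convention configured on the schema.

from dlt.common.schema import Schema
from dlt.common.destination.reference import DestinationClientDwhConfiguration

schema = Schema("my_source")

config = DestinationClientDwhConfiguration()
config._bind_dataset_name(dataset_name="MyDataSet")

# default behavior: the dataset name goes through the schema's naming convention,
# e.g. "my_data_set" for a snake_case convention
print(config.normalize_dataset_name(schema))

# with normalization disabled the configured name is returned as-is: "MyDataSet"
config.enable_dataset_name_normalization = False
print(config.normalize_dataset_name(schema))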
@@ -258,11 +270,45 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura
"""configuration of the staging, if present, injected at runtime"""


TLoadJobState = Literal["running", "failed", "retry", "completed"]
TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"]


class LoadJob(ABC):
"""
A stateful load job, represents one job file
"""

def __init__(self, file_path: str) -> None:
self._file_path = file_path
self._file_name = FileStorage.get_file_name_from_file_path(file_path)
# NOTE: we only accept a full filepath in the constructor
assert self._file_name != self._file_path
self._parsed_file_name = ParsedLoadJobFileName.parse(self._file_name)

def job_id(self) -> str:
"""The job id that is derived from the file name and does not changes during job lifecycle"""
return self._parsed_file_name.job_id()

def file_name(self) -> str:
"""A name of the job file"""
return self._file_name

def job_file_info(self) -> ParsedLoadJobFileName:
return self._parsed_file_name

@abstractmethod
def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
pass

@abstractmethod
def exception(self) -> str:
"""The exception associated with failed or retry states"""
pass


class LoadJob:
"""Represents a job that loads a single file
class RunnableLoadJob(LoadJob, ABC):
"""Represents a runnable job that loads a single file

Each job starts in "running" state and ends in one of terminal states: "retry", "failed" or "completed".
Each job is uniquely identified by a file name. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
@@ -273,75 +319,95 @@ class LoadJob:
immediately transition job into "failed" or "retry" state respectively.
"""

def __init__(self, file_name: str) -> None:
def __init__(self, file_path: str) -> None:
"""
The file name is also the job id (or the job id is deterministically derived from it), so it must be globally unique
"""
# ensure file name
assert file_name == FileStorage.get_file_name_from_file_path(file_name)
self._file_name = file_name
self._parsed_file_name = ParsedLoadJobFileName.parse(file_name)
super().__init__(file_path)
self._state: TLoadJobState = "ready"
self._exception: Exception = None

@abstractmethod
def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
pass
# variables needed by most jobs, set by the loader in set_run_vars
self._schema: Schema = None
self._load_table: TTableSchema = None
self._load_id: str = None
self._job_client: "JobClientBase" = None

def file_name(self) -> str:
"""A name of the job file"""
return self._file_name
def set_run_vars(self, load_id: str, schema: Schema, load_table: TTableSchema) -> None:
"""
called by the loader right before the job is run
"""
self._load_id = load_id
self._schema = schema
self._load_table = load_table

def job_id(self) -> str:
"""The job id that is derived from the file name and does not changes during job lifecycle"""
return self._parsed_file_name.job_id()
@property
def load_table_name(self) -> str:
return self._load_table["name"]

def job_file_info(self) -> ParsedLoadJobFileName:
return self._parsed_file_name
def run_managed(
self,
job_client: "JobClientBase",
) -> None:
"""
Wrapper around the user-implemented `run` method.
"""
# only jobs that are not running and have not reached a final state
# may be started
assert self._state in ("ready", "retry")
self._job_client = job_client

# filepath is now moved to running
try:
self._state = "running"
self._job_client.prepare_load_job_execution(self)
self.run()
self._state = "completed"
except (DestinationTerminalException, TerminalValueError) as e:
self._state = "failed"
self._exception = e
except (DestinationTransientException, Exception) as e:
self._state = "retry"
self._exception = e
finally:
# sanity check
assert self._state in ("completed", "retry", "failed")

@abstractmethod
def run(self) -> None:
"""
Run the actual job. This is executed on a thread and should be implemented by the user;
exceptions are handled outside of this method.
"""
raise NotImplementedError()

def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
return self._state

def exception(self) -> str:
"""The exception associated with failed or retry states"""
pass
return str(self._exception)
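As a hedged illustration (not part of this diff): after this refactor a destination-specific job subclasses RunnableLoadJob and implements only run(); the loader then calls run_managed(job_client), which moves the job from "ready" to "running" and finally to "completed", or to "failed"/"retry" when a terminal or transient destination exception is raised. PeekLoadJob below is hypothetical; only the base class and its hooks come from this file.

from dlt.common.destination.reference import RunnableLoadJob

class PeekLoadJob(RunnableLoadJob):
    """Hypothetical job that only inspects the job file, for illustration."""

    def run(self) -> None:
        # self._load_id, self._schema and self._load_table are set by the loader
        # via set_run_vars() before run_managed() calls this method
        with open(self._file_path, "rb") as f:
            payload = f.read()
        print(f"would load {len(payload)} bytes into table {self.load_table_name}")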


class NewLoadJob(LoadJob):
"""Adds a trait that allows to save new job file"""
class FollowupJob:
"""Base class for follow up jobs that should be created"""

@abstractmethod
def new_file_path(self) -> str:
"""Path to a newly created temporary job file. If empty, no followup job should be created"""
pass


class FollowupJob:
"""Adds a trait that allows to create a followup job"""
class HasFollowupJobs:
"""Adds a trait that allows to create single or table chain followup jobs"""

def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
"""Return list of new jobs. `final_state` is state to which this job transits"""
return []
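A short hedged sketch (not part of this diff) of the renamed trait: a job opts into follow-up work by mixing HasFollowupJobs into a load job and overriding create_followup_jobs(). Both classes below are hypothetical; only the base classes, the TLoadJobState literal and the method signature come from this file.

from typing import List

from dlt.common.destination.reference import (
    FollowupJob,
    HasFollowupJobs,
    RunnableLoadJob,
    TLoadJobState,
)

class EmptyFollowupJob(FollowupJob):
    def new_file_path(self) -> str:
        return ""  # empty path -> no followup job file, per the docstring above

class NoopJobWithFollowup(RunnableLoadJob, HasFollowupJobs):
    def run(self) -> None:
        pass  # nothing to load, illustration only

    def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
        # only request follow-up work when this job actually completed
        return [EmptyFollowupJob()] if final_state == "completed" else []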


class DoNothingJob(LoadJob):
"""The most lazy class of dlt"""

def __init__(self, file_path: str) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))

def state(self) -> TLoadJobState:
# this job is always done
return "completed"

def exception(self) -> str:
# this part of code should be never reached
raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
"""The second most lazy class of dlt"""

pass


class JobClientBase(ABC):
def __init__(
self,
@@ -394,13 +460,16 @@ def update_stored_schema(
return expected_update

@abstractmethod
def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
"""Creates and starts a load job for a particular `table` with content in `file_path`"""
def create_load_job(
self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
"""Creates a load job for a particular `table` with content in `file_path`"""
pass

@abstractmethod
def restore_file_load(self, file_path: str) -> LoadJob:
"""Finds and restores already started loading job identified by `file_path` if destination supports it."""
def prepare_load_job_execution( # noqa: B027, optional override
self, job: RunnableLoadJob
) -> None:
"""Prepare the connected job client for the execution of a load job (used for query tags in sql clients)"""
pass
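A hedged sketch (not part of this diff) of how a concrete JobClientBase subclass might implement the hook that replaces start_file_load and restore_file_load. ParquetCopyJob and FinalizedLoadJob are hypothetical names; only the signature and the restore flag come from this file. Judging by the removed restore_file_load docstring, restore=True presumably asks the client to re-attach to an already started job rather than begin new work.

def create_load_job(
    self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
    if restore:
        # re-attach to a job started in a previous run of the loader
        return FinalizedLoadJob(file_path)  # hypothetical "already tracked" job
    # start fresh work; ParquetCopyJob would be a RunnableLoadJob subclass
    return ParquetCopyJob(file_path)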

def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
@@ -410,7 +479,7 @@ def create_table_chain_completed_followup_jobs(
self,
table_chain: Sequence[TTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[NewLoadJob]:
) -> List[FollowupJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
return []
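A final hedged sketch (not part of this diff) of the updated return type in an overriding client; TableChainSummaryJob is hypothetical, while the signature and the FollowupJob type come from this file.

def create_table_chain_completed_followup_jobs(
    self,
    table_chain: Sequence[TTableSchema],
    completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[FollowupJob]:
    # e.g. emit one summarizing follow-up job per completed table chain
    return [TableChainSummaryJob(table_chain)]  # hypothetical FollowupJob subclass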
