From 5afa6ce90b5c0cc94bd8fcf32b73954075251c4b Mon Sep 17 00:00:00 2001 From: rudolfix Date: Mon, 23 Dec 2024 11:39:56 +0100 Subject: [PATCH] #2020 #2151 - fixes dbt 1.8.6 and arrow dict types (#2175) * fix links (#1977) * correctly converts dict arrow types into dlt types * drops dbt compat code for versions below 1.5 * ignores encoding errors when reading from process pipe --------- Co-authored-by: adrianbr --- dlt/common/libs/pyarrow.py | 4 +++ dlt/common/runners/stdout.py | 16 ++++++++-- dlt/common/runners/venv.py | 16 +++++++--- dlt/helpers/dbt/dbt_utils.py | 32 ++++++------------- .../docs/general-usage/credentials/index.md | 4 +-- .../dbt_tests/test_runner_dbt_versions.py | 4 +-- tests/libs/pyarrow/test_pyarrow.py | 12 +++++++ 7 files changed, 56 insertions(+), 32 deletions(-) diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py index 029cd75399..255fcd344e 100644 --- a/dlt/common/libs/pyarrow.py +++ b/dlt/common/libs/pyarrow.py @@ -183,6 +183,10 @@ def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType: return dict(data_type="decimal", precision=dtype.precision, scale=dtype.scale) elif pyarrow.types.is_nested(dtype): return dict(data_type="json") + elif pyarrow.types.is_dictionary(dtype): + # Dictionary types are essentially categorical encodings. The underlying value_type + # dictates the "logical" type. We simply delegate to the underlying value_type. + return get_column_type_from_py_arrow(dtype.value_type) else: raise ValueError(dtype) diff --git a/dlt/common/runners/stdout.py b/dlt/common/runners/stdout.py index bb5251764c..adf1fdc0d3 100644 --- a/dlt/common/runners/stdout.py +++ b/dlt/common/runners/stdout.py @@ -38,7 +38,13 @@ def iter_std( Use -u in scripts_args for unbuffered python execution """ with venv.start_command( - command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True + command, + *script_args, + stdout=PIPE, + stderr=PIPE, + bufsize=1, + text=True, + errors="backslashreplace", ) as process: exit_code: int = None q_: queue.Queue[Tuple[OutputStdStreamNo, str]] = queue.Queue() @@ -72,7 +78,13 @@ def _r_q(std_: OutputStdStreamNo) -> None: def iter_stdout(venv: Venv, command: str, *script_args: Any) -> Iterator[str]: # start a process in virtual environment, assume that text comes from stdout with venv.start_command( - command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True + command, + *script_args, + stdout=PIPE, + stderr=PIPE, + bufsize=1, + text=True, + errors="backslashreplace", ) as process: exit_code: int = None line = "" diff --git a/dlt/common/runners/venv.py b/dlt/common/runners/venv.py index ad6448dd2c..eb12970e97 100644 --- a/dlt/common/runners/venv.py +++ b/dlt/common/runners/venv.py @@ -98,14 +98,18 @@ def run_command(self, entry_point: str, *script_args: Any) -> str: # runs one of installed entry points typically CLIs coming with packages and installed into PATH command = os.path.join(self.context.bin_path, entry_point) cmd = [command, *script_args] - return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True) + return subprocess.check_output( + cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace" + ) def run_script(self, script_path: str, *script_args: Any) -> str: """Runs a python `script` source with specified `script_args`. Current `os.environ` and cwd is passed to executed process""" # os.environ is passed to executed process cmd = [self.context.env_exe, os.path.abspath(script_path), *script_args] try: - return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True) + return subprocess.check_output( + cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace" + ) except subprocess.CalledProcessError as cpe: if cpe.returncode == 2: raise FileNotFoundError(script_path) @@ -115,7 +119,9 @@ def run_script(self, script_path: str, *script_args: Any) -> str: def run_module(self, module: str, *module_args: Any) -> str: """Runs a python `module` with specified `module_args`. Current `os.environ` and cwd is passed to executed process""" cmd = [self.context.env_exe, "-m", module, *module_args] - return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True) + return subprocess.check_output( + cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace" + ) def add_dependencies(self, dependencies: List[str] = None) -> None: Venv._install_deps(self.context, dependencies) @@ -134,7 +140,9 @@ def _install_deps(context: types.SimpleNamespace, dependencies: List[str]) -> No cmd = [context.env_exe, "-Im", Venv.PIP_TOOL, "install"] try: - subprocess.check_output(cmd + dependencies, stderr=subprocess.STDOUT) + subprocess.check_output( + cmd + dependencies, stderr=subprocess.STDOUT, errors="backslashreplace" + ) except subprocess.CalledProcessError as exc: raise CannotInstallDependencies(dependencies, context.env_exe, exc.output) diff --git a/dlt/helpers/dbt/dbt_utils.py b/dlt/helpers/dbt/dbt_utils.py index 80774d9858..7654f372e7 100644 --- a/dlt/helpers/dbt/dbt_utils.py +++ b/dlt/helpers/dbt/dbt_utils.py @@ -22,23 +22,16 @@ # can only import DBT after redirect is disabled # https://stackoverflow.com/questions/48619517/call-a-click-command-from-code +except ImportError: + pass +try: import dbt.logger from dbt.contracts import results as dbt_results -except ModuleNotFoundError: - raise MissingDependencyException("DBT Core", ["dbt-core"]) - -try: - # dbt <1.5 - from dbt.main import handle_and_check # type: ignore[import-not-found] -except ImportError: - # dbt >=1.5 from dbt.cli.main import dbtRunner - -try: - from dbt.exceptions import FailFastException # type: ignore + from dbt.exceptions import FailFastError except ImportError: - from dbt.exceptions import FailFastError as FailFastException + raise MissingDependencyException("DBT Core", ["dbt-core"]) _DBT_LOGGER_INITIALIZED = False @@ -135,15 +128,10 @@ def run_dbt_command( runner_args = (global_args or []) + [command] + args # type: ignore with dbt.logger.log_manager.applicationbound(): - try: - # dbt 1.5 - runner = dbtRunner() - run_result = runner.invoke(runner_args) - success = run_result.success - results = run_result.result # type: ignore - except NameError: - # dbt < 1.5 - results, success = handle_and_check(runner_args) + runner = dbtRunner() + run_result = runner.invoke(runner_args) + success = run_result.success + results = run_result.result # type: ignore assert type(success) is bool parsed_results = parse_dbt_execution_results(results) @@ -157,7 +145,7 @@ def run_dbt_command( except SystemExit as sys_ex: # oftentimes dbt tries to exit on error raise DBTProcessingError(command, None, sys_ex) - except FailFastException as ff: + except FailFastError as ff: dbt_exc = DBTProcessingError(command, parse_dbt_execution_results(ff.result), ff.result) # detect incremental model out of sync if is_incremental_schema_out_of_sync_error(ff.result): diff --git a/docs/website/docs/general-usage/credentials/index.md b/docs/website/docs/general-usage/credentials/index.md index 95e0ec36ac..8d1ac1e834 100644 --- a/docs/website/docs/general-usage/credentials/index.md +++ b/docs/website/docs/general-usage/credentials/index.md @@ -5,13 +5,13 @@ keywords: [credentials, secrets.toml, secrets, config, configuration, environmen --- import DocCardList from '@theme/DocCardList'; -`dlt` pipelines usually require configurations and credentials. These can be set up in [various ways](setup): +`dlt` pipelines usually require configurations and credentials. These can be set up in [various ways](./setup): 1. Environment variables 2. Configuration files (`secrets.toml` and `config.toml`) 3. Key managers and vaults -`dlt` automatically extracts configuration settings and secrets based on flexible [naming conventions](setup/#naming-convention). It then [injects](advanced/#injection-mechanism) these values where needed in code. +`dlt` automatically extracts configuration settings and secrets based on flexible [naming conventions](./setup/#naming-convention). It then [injects](./advanced/#injection-mechanism) these values where needed in code. # Learn details about diff --git a/tests/helpers/dbt_tests/test_runner_dbt_versions.py b/tests/helpers/dbt_tests/test_runner_dbt_versions.py index 67908e176c..07e4b70d4e 100644 --- a/tests/helpers/dbt_tests/test_runner_dbt_versions.py +++ b/tests/helpers/dbt_tests/test_runner_dbt_versions.py @@ -45,11 +45,11 @@ def client() -> Iterator[PostgresClient]: PACKAGE_PARAMS = [ ("postgres", "1.5.2"), ("postgres", "1.6.13"), - ("postgres", "1.8.1"), + ("postgres", "1.8.6"), ("postgres", None), ("snowflake", "1.5.2"), ("snowflake", "1.6.13"), - ("snowflake", "1.8.1"), + ("snowflake", "1.8.6"), ("snowflake", None), ] PACKAGE_IDS = [ diff --git a/tests/libs/pyarrow/test_pyarrow.py b/tests/libs/pyarrow/test_pyarrow.py index f81b3d1b99..07e8d3428d 100644 --- a/tests/libs/pyarrow/test_pyarrow.py +++ b/tests/libs/pyarrow/test_pyarrow.py @@ -66,6 +66,18 @@ def test_py_arrow_to_table_schema_columns(): assert result == dlt_schema +def test_py_arrow_dict_to_column() -> None: + array_1 = pa.array(["a", "b", "c"], type=pa.dictionary(pa.int8(), pa.string())) + array_2 = pa.array([1, 2, 3], type=pa.dictionary(pa.int8(), pa.int64())) + table = pa.table({"strings": array_1, "ints": array_2}) + columns = py_arrow_to_table_schema_columns(table.schema) + assert columns == { + "strings": {"name": "strings", "nullable": True, "data_type": "text"}, + "ints": {"name": "ints", "nullable": True, "data_type": "bigint"}, + } + assert table.to_pydict() == {"strings": ["a", "b", "c"], "ints": [1, 2, 3]} + + def test_to_arrow_scalar() -> None: naive_dt = get_py_arrow_timestamp(6, tz=None) # print(naive_dt)