Support Open Source ClickHouse Deployments #1496

Merged (42 commits, Jul 15, 2024)

Commits (42)
56b3818  Default to for both on-prem and cloud (Pipboyguy, Jun 19, 2024)
e4e1010  Add documentation for new engine family types (Pipboyguy, Jun 19, 2024)
d998ac9  Typo (Pipboyguy, Jun 19, 2024)
825982b  Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jun 20, 2024)
4912173  Minor doc changes (Pipboyguy, Jun 20, 2024)
e1ab71d  Fix local clickhouse deployment timestamp parsing issue with simple c… (Pipboyguy, Jun 20, 2024)
db603a5  Merge branch 'devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jun 25, 2024)
8f80391  Extend support for local deployment time types (Pipboyguy, Jun 25, 2024)
544925a  Adapt test to check whether CH OSS or cloud (Pipboyguy, Jun 25, 2024)
b2a3596  Defend against CH OSS unsupported dbapi datetime parsing (Pipboyguy, Jun 26, 2024)
e2a5c4a  Fix typo (Pipboyguy, Jun 26, 2024)
9ef12c1  Add ClickHouse to local destination tests (Pipboyguy, Jun 26, 2024)
2388cd6  Update ClickHouse test workflow and remove engine types (Pipboyguy, Jun 26, 2024)
b0e3751  Use Python 3.10.x for ClickHouse destination tests (Pipboyguy, Jun 27, 2024)
d207686  Merge branch 'devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jun 27, 2024)
f077346  Add ClickHouse MergeTree support and refactor code (Pipboyguy, Jun 27, 2024)
6e44a71  Update ClickHouse Docker setup and test workflow (Pipboyguy, Jun 27, 2024)
61bae45  Refactor ClickHouse tests to cover both OSS and Cloud versions (Pipboyguy, Jun 27, 2024)
6d50c51  Disable SSL for ClickHouse OSS tests (Pipboyguy, Jun 27, 2024)
fc46384  Merge branch 'devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jun 28, 2024)
e1ac1ce  Use state instead of sentinel tables (Pipboyguy, Jun 28, 2024)
409487c  Remove mention of sentinel table for ClickHouse datasets (Pipboyguy, Jun 28, 2024)
173610a  Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jul 1, 2024)
07c8f49  Refactor ClickHouse deployment type detection (Pipboyguy, Jul 1, 2024)
aff6c04  Add conditional execution for ClickHouse OSS tests (Pipboyguy, Jul 1, 2024)
cbc782c  Update ClickHouse compose file path and move to tests directory (Pipboyguy, Jul 1, 2024)
c408824  Update ClickHouse docker-compose file path in test workflow (Pipboyguy, Jul 1, 2024)
0d306a5  Cast client to ClickHouseSqlClient in get_deployment_type call (Pipboyguy, Jul 1, 2024)
d6913cc  Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jul 4, 2024)
28866ee  Revert "Remove mention of sentinel table for ClickHouse datasets" (Pipboyguy, Jul 4, 2024)
41a7e1a  Revert "Use state instead of sentinel tables" (Pipboyguy, Jul 4, 2024)
bc7b309  Add tests for ClickHouse table engine configuration and adapter overr… (Pipboyguy, Jul 4, 2024)
98d2329  Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jul 4, 2024)
f852d44  Add configurable default table engine type for ClickHouse (Pipboyguy, Jul 4, 2024)
b0cec6e  Docs (Pipboyguy, Jul 4, 2024)
d764734  Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jul 12, 2024)
b2bc5b1  Fix comments (Pipboyguy, Jul 12, 2024)
8d26ef1  Add ClickHouse typing module for improved type handling (Pipboyguy, Jul 12, 2024)
9c67c65  Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support (Pipboyguy, Jul 12, 2024)
3028cf4  Move ClickHouse configuration options from credentials to client conf… (Pipboyguy, Jul 12, 2024)
8f60436  Move table_engine_type from credentials to client configuration (Pipboyguy, Jul 12, 2024)
477e815  Docs (Pipboyguy, Jul 12, 2024)
48 changes: 43 additions & 5 deletions .github/workflows/test_destination_clickhouse.yml
@@ -1,4 +1,3 @@

name: test | clickhouse

on:
@@ -8,7 +7,7 @@ on:
- devel
workflow_dispatch:
schedule:
- cron: '0 2 * * *'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -20,7 +19,7 @@ env:
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

ACTIVE_DESTINATIONS: "[\"clickhouse\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

jobs:
get_docs_changes:
@@ -67,12 +66,51 @@ jobs:
- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

# OSS ClickHouse
- run: |
docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" up -d
echo "Waiting for ClickHouse to be healthy..."
timeout 30s bash -c 'until docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done'
echo "ClickHouse is up and running"
name: Start ClickHouse OSS


- run: poetry run pytest tests/load -m "essential"
name: Run essential tests Linux (ClickHouse OSS)
if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}
env:
DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost
DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data
DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000
DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123
DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0

- run: poetry run pytest tests/load
name: Run all tests Linux (ClickHouse OSS)
if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
env:
DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost
DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data
DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000
DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123
DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0

- name: Stop ClickHouse OSS
if: always()
run: docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" down -v

# ClickHouse Cloud
- run: |
poetry run pytest tests/load -m "essential"
name: Run essential tests Linux
name: Run essential tests Linux (ClickHouse Cloud)
if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}

- run: |
poetry run pytest tests/load
name: Run all tests Linux
name: Run all tests Linux (ClickHouse Cloud)
if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
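
The OSS job above drives dlt entirely through environment variables. As a minimal sketch (the pipeline and dataset names here are invented), the same connection can be set up programmatically before running against the local container, since dlt resolves DESTINATION__CLICKHOUSE__CREDENTIALS__* paths into ClickHouseCredentials fields:

import os

import dlt

# Mirror the CI env vars above (values copied from the workflow job).
os.environ.update(
    {
        "DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost",
        "DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE": "dlt_data",
        "DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME": "loader",
        "DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD": "loader",
        "DESTINATION__CLICKHOUSE__CREDENTIALS__PORT": "9000",
        "DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT": "8123",
        "DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE": "0",
    }
)

# Hypothetical pipeline and dataset names, for illustration only.
pipeline = dlt.pipeline(
    pipeline_name="clickhouse_oss_smoke",
    destination="clickhouse",
    dataset_name="smoke_data",
)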

80 changes: 38 additions & 42 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -2,21 +2,18 @@
import re
from copy import deepcopy
from textwrap import dedent
from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple
from typing import Optional, List, Sequence, cast
from urllib.parse import urlparse

import clickhouse_connect
from clickhouse_connect.driver.tools import insert_file

import dlt
from dlt import config
from dlt.common.configuration.specs import (
CredentialsConfiguration,
AzureCredentialsWithoutDefaults,
GcpCredentials,
AwsCredentialsWithoutDefaults,
)
from dlt.destinations.exceptions import DestinationTransientException
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
SupportsStagingDestination,
@@ -29,26 +26,27 @@
from dlt.common.schema.typing import (
TTableFormat,
TTableSchema,
TColumnHint,
TColumnType,
TTableSchemaColumns,
TColumnSchemaBase,
)
from dlt.common.storages import FileStorage
from dlt.destinations.exceptions import LoadJobTerminalException
from dlt.destinations.impl.clickhouse.clickhouse_adapter import (
TTableEngineType,
TABLE_ENGINE_TYPE_HINT,
)
from dlt.destinations.impl.clickhouse.configuration import (
ClickHouseClientConfiguration,
)
from dlt.destinations.impl.clickhouse.sql_client import ClickHouseSqlClient
from dlt.destinations.impl.clickhouse.utils import (
convert_storage_to_http_scheme,
from dlt.destinations.impl.clickhouse.typing import (
HINT_TO_CLICKHOUSE_ATTR,
TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR,
)
from dlt.destinations.impl.clickhouse.typing import (
TTableEngineType,
TABLE_ENGINE_TYPE_HINT,
FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING,
SUPPORTED_FILE_FORMATS,
)
from dlt.destinations.impl.clickhouse.utils import (
convert_storage_to_http_scheme,
)
from dlt.destinations.job_client_impl import (
SqlJobClientBase,
SqlJobClientWithStaging,
@@ -58,18 +56,6 @@
from dlt.destinations.type_mapping import TypeMapper


HINT_TO_CLICKHOUSE_ATTR: Dict[TColumnHint, str] = {
"primary_key": "PRIMARY KEY",
"unique": "", # No unique constraints available in ClickHouse.
"foreign_key": "", # No foreign key constraints support in ClickHouse.
}

TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = {
"merge_tree": "MergeTree",
"replicated_merge_tree": "ReplicatedMergeTree",
}


class ClickHouseTypeMapper(TypeMapper):
sct_to_unbound_dbt = {
"complex": "String",
@@ -113,7 +99,8 @@ def from_db_type(
if db_type == "DateTime('UTC')":
db_type = "DateTime"
if datetime_match := re.match(
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?", db_type
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?",
db_type,
):
if datetime_match["precision"]:
precision = int(datetime_match["precision"])
@@ -131,7 +118,7 @@
db_type = "Decimal"

if db_type == "Decimal" and (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")
return cast(TColumnType, dict(data_type="wei"))

return super().from_db_type(db_type, precision, scale)
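
As a standalone illustration (not part of the diff), the pattern above accepts the bare, precision-only, and precision-plus-UTC spellings of DateTime64:

import re

# Pattern copied from from_db_type above; precision is optional, as is the UTC timezone.
pattern = r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?"

for db_type in ("DateTime64", "DateTime64(6)", "DateTime64(3, 'UTC')"):
    match = re.match(pattern, db_type)
    precision = int(match["precision"]) if match["precision"] else None
    print(f"{db_type!r} -> precision={precision}, timezone={match['timezone']}")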

Expand Down Expand Up @@ -161,7 +148,7 @@ def __init__(

compression = "auto"

# Don't use dbapi driver for local files.
# Don't use the DBAPI driver for local files.
if not bucket_path:
# Local filesystem.
if ext == "jsonl":
@@ -182,8 +169,8 @@ def __init__(
fmt=clickhouse_format,
settings={
"allow_experimental_lightweight_delete": 1,
# "allow_experimental_object_type": 1,
"enable_http_compression": 1,
"date_time_input_format": "best_effort",
},
compression=compression,
)
@@ -201,13 +188,7 @@ def __init__(
compression = "none" if config.get("data_writer.disable_compression") else "gz"

if bucket_scheme in ("s3", "gs", "gcs"):
if isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
bucket_http_url = convert_storage_to_http_scheme(
bucket_url, endpoint=staging_credentials.endpoint_url
)
access_key_id = staging_credentials.aws_access_key_id
secret_access_key = staging_credentials.aws_secret_access_key
else:
if not isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
raise LoadJobTerminalException(
file_path,
dedent(
@@ -219,6 +200,11 @@ def __init__(
).strip(),
)

bucket_http_url = convert_storage_to_http_scheme(
bucket_url, endpoint=staging_credentials.endpoint_url
)
access_key_id = staging_credentials.aws_access_key_id
secret_access_key = staging_credentials.aws_secret_access_key
auth = "NOSIGN"
if access_key_id and secret_access_key:
auth = f"'{access_key_id}','{secret_access_key}'"
@@ -299,6 +285,7 @@ def __init__(
config.normalize_staging_dataset_name(schema),
config.credentials,
capabilities,
config,
)
super().__init__(schema, config, self.sql_client)
self.config: ClickHouseClientConfiguration = config
@@ -311,10 +298,10 @@ def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> Li
def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
# Build column definition.
# The primary key and sort order definition is defined outside column specification.
hints_str = " ".join(
hints_ = " ".join(
self.active_hints.get(hint)
for hint in self.active_hints.keys()
if c.get(hint, False) is True
if c.get(cast(str, hint), False) is True
and hint not in ("primary_key", "sort")
and hint in self.active_hints
)
@@ -328,7 +315,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non
)

return (
f"{self.sql_client.escape_column_name(c['name'])} {type_with_nullability_modifier} {hints_str}"
f"{self.sql_client.escape_column_name(c['name'])} {type_with_nullability_modifier} {hints_}"
.strip()
)

@@ -343,17 +330,26 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
self,
table_name: str,
new_columns: Sequence[TColumnSchema],
generate_alter: bool,
) -> List[str]:
table: TTableSchema = self.prepare_load_table(table_name, self.in_staging_mode)
sql = SqlJobClientBase._get_table_update_sql(self, table_name, new_columns, generate_alter)

if generate_alter:
return sql

# Default to 'ReplicatedMergeTree' if user didn't explicitly set a table engine hint.
# Default to 'MergeTree' if the user didn't explicitly set a table engine hint.
# Clickhouse Cloud will automatically pick `SharedMergeTree` for this option,
# so it will work on both local and cloud instances of CH.
table_type = cast(
TTableEngineType, table.get(TABLE_ENGINE_TYPE_HINT, "replicated_merge_tree")
TTableEngineType,
table.get(
cast(str, TABLE_ENGINE_TYPE_HINT),
self.config.table_engine_type,
),
)
sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}"

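To make the engine resolution concrete, here is a standalone sketch (the table and column names are invented; the mapping matches TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR as shown in the removed block above) of the ENGINE clause that _get_table_update_sql appends:

# Engine names as mapped in dlt.destinations.impl.clickhouse.typing (per this diff).
TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR = {
    "merge_tree": "MergeTree",
    "replicated_merge_tree": "ReplicatedMergeTree",
}

# Hypothetical CREATE TABLE statement produced by the base SqlJobClientBase.
create_sql = 'CREATE TABLE "dlt_data"."demo___events" ("id" Int64)'
table_engine_type = "merge_tree"  # x-table-engine-type hint, else config.table_engine_type

print(f"{create_sql}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR[table_engine_type]}")
# CREATE TABLE "dlt_data"."demo___events" ("id" Int64)
# ENGINE = MergeTree
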
11 changes: 6 additions & 5 deletions dlt/destinations/impl/clickhouse/clickhouse_adapter.py
@@ -1,12 +1,15 @@
from typing import Any, Literal, Set, get_args, Dict
from typing import Any, Dict

from dlt.destinations.impl.clickhouse.configuration import TTableEngineType
from dlt.destinations.impl.clickhouse.typing import (
TABLE_ENGINE_TYPES,
TABLE_ENGINE_TYPE_HINT,
)
from dlt.destinations.utils import ensure_resource
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate


TTableEngineType = Literal["merge_tree", "replicated_merge_tree"]

"""
The table engine (type of table) determines:

@@ -19,8 +22,6 @@

See https://clickhouse.com/docs/en/engines/table-engines.
"""
TABLE_ENGINE_TYPES: Set[TTableEngineType] = set(get_args(TTableEngineType))
TABLE_ENGINE_TYPE_HINT: Literal["x-table-engine-type"] = "x-table-engine-type"


def clickhouse_adapter(data: Any, table_engine_type: TTableEngineType = None) -> DltResource:
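
A usage sketch for the adapter (the resource name and rows are invented; the signature comes from the definition above): it stamps the x-table-engine-type hint onto a resource, overriding the configured default for that one table:

import dlt
from dlt.destinations.impl.clickhouse.clickhouse_adapter import clickhouse_adapter

@dlt.resource
def events():
    # Invented sample rows.
    yield {"id": 1, "event": "signup"}

# Pin this table to ReplicatedMergeTree even if the client config defaults
# to merge_tree.
clickhouse_adapter(events, table_engine_type="replicated_merge_tree")
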
36 changes: 19 additions & 17 deletions dlt/destinations/impl/clickhouse/configuration.py
@@ -1,16 +1,13 @@
import dataclasses
from typing import ClassVar, Dict, List, Any, Final, Literal, cast, Optional
from typing import ClassVar, Dict, List, Any, Final, cast, Optional

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.destination.reference import (
DestinationClientDwhWithStagingConfiguration,
)
from dlt.common.libs.sql_alchemy import URL
from dlt.common.utils import digest128


TSecureConnection = Literal[0, 1]
from dlt.destinations.impl.clickhouse.typing import TSecureConnection, TTableEngineType


@configspec(init=False)
@@ -34,10 +31,6 @@ class ClickHouseCredentials(ConnectionStringCredentials):
"""Timeout for establishing connection. Defaults to 10 seconds."""
send_receive_timeout: int = 300
"""Timeout for sending and receiving data. Defaults to 300 seconds."""
dataset_table_separator: str = "___"
"""Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'."""
dataset_sentinel_table_name: str = "dlt_sentinel_table"
"""Special table to mark dataset as existing"""
gcp_access_key_id: Optional[str] = None
"""When loading from a gcp bucket, you need to provide gcp interoperable keys"""
gcp_secret_access_key: Optional[str] = None
@@ -67,27 +60,36 @@ def get_query(self) -> Dict[str, Any]:
"connect_timeout": str(self.connect_timeout),
"send_receive_timeout": str(self.send_receive_timeout),
"secure": 1 if self.secure else 0,
# Toggle experimental settings. These are necessary for certain datatypes and not optional.
"allow_experimental_lightweight_delete": 1,
# "allow_experimental_object_type": 1,
"enable_http_compression": 1,
"date_time_input_format": "best_effort",
}
)
return query


@configspec
class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_type: Final[str] = dataclasses.field(default="clickhouse", init=False, repr=False, compare=False) # type: ignore[misc]
destination_type: Final[str] = dataclasses.field( # type: ignore[misc]
default="clickhouse", init=False, repr=False, compare=False
)
credentials: ClickHouseCredentials = None

# Primary key columns are used to build a sparse primary index which allows for efficient data retrieval,
# but they do not enforce uniqueness constraints. It permits duplicate values even for the primary key
# columns within the same granule.
# See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes
dataset_table_separator: str = "___"
"""Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'."""
table_engine_type: Optional[TTableEngineType] = "merge_tree"
"""The default table engine to use. Defaults to 'merge_tree'. Other implemented options are 'shared_merge_tree' and 'replicated_merge_tree'."""
dataset_sentinel_table_name: str = "dlt_sentinel_table"
"""Special table to mark dataset as existing"""

__config_gen_annotations__: ClassVar[List[str]] = [
"dataset_table_separator",
"dataset_sentinel_table_name",
"table_engine_type",
]

def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string."""
"""Returns a fingerprint of the host part of a connection string."""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""
3 changes: 1 addition & 2 deletions dlt/destinations/impl/clickhouse/factory.py
@@ -1,14 +1,13 @@
import sys
import typing as t

from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.data_writers.escape import (
escape_clickhouse_identifier,
escape_clickhouse_literal,
format_clickhouse_datetime_literal,
)

from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.destinations.impl.clickhouse.configuration import (
ClickHouseClientConfiguration,
ClickHouseCredentials,