Support Open Source ClickHouse Deployments #1496

Merged: 42 commits, Jul 15, 2024
Changes from 35 commits

Commits:
56b3818
Default to for both on-prem and cloud
Pipboyguy Jun 19, 2024
e4e1010
Add documentation for new engine family types
Pipboyguy Jun 19, 2024
d998ac9
Typo
Pipboyguy Jun 19, 2024
825982b
Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jun 20, 2024
4912173
Minor doc changes
Pipboyguy Jun 20, 2024
e1ab71d
Fix local clickhouse deployment timestamp parsing issue with simple c…
Pipboyguy Jun 20, 2024
db603a5
Merge branch 'devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jun 25, 2024
8f80391
Extend support for local deployment time types
Pipboyguy Jun 25, 2024
544925a
Adapt test to check whether CH OSS or cloud
Pipboyguy Jun 25, 2024
b2a3596
Defend against CH OSS unsupported dbapi datetime parsing
Pipboyguy Jun 26, 2024
e2a5c4a
Fix typo
Pipboyguy Jun 26, 2024
9ef12c1
Add ClickHouse to local destination tests
Pipboyguy Jun 26, 2024
2388cd6
Update ClickHouse test workflow and remove engine types
Pipboyguy Jun 26, 2024
b0e3751
Use Python 3.10.x for ClickHouse destination tests
Pipboyguy Jun 27, 2024
d207686
Merge branch 'devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jun 27, 2024
f077346
Add ClickHouse MergeTree support and refactor code
Pipboyguy Jun 27, 2024
6e44a71
Update ClickHouse Docker setup and test workflow
Pipboyguy Jun 27, 2024
61bae45
Refactor ClickHouse tests to cover both OSS and Cloud versions
Pipboyguy Jun 27, 2024
6d50c51
Disable SSL for ClickHouse OSS tests
Pipboyguy Jun 27, 2024
fc46384
Merge branch 'devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jun 28, 2024
e1ac1ce
Use state instead of sentinel tables
Pipboyguy Jun 28, 2024
409487c
Remove mention of sentinel table for ClickHouse datasets
Pipboyguy Jun 28, 2024
173610a
Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jul 1, 2024
07c8f49
Refactor ClickHouse deployment type detection
Pipboyguy Jul 1, 2024
aff6c04
Add conditional execution for ClickHouse OSS tests
Pipboyguy Jul 1, 2024
cbc782c
Update ClickHouse compose file path and move to tests directory
Pipboyguy Jul 1, 2024
c408824
Update ClickHouse docker-compose file path in test workflow
Pipboyguy Jul 1, 2024
0d306a5
Cast client to ClickHouseSqlClient in get_deployment_type call
Pipboyguy Jul 1, 2024
d6913cc
Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jul 4, 2024
28866ee
Revert "Remove mention of sentinel table for ClickHouse datasets"
Pipboyguy Jul 4, 2024
41a7e1a
Revert "Use state instead of sentinel tables"
Pipboyguy Jul 4, 2024
bc7b309
Add tests for ClickHouse table engine configuration and adapter overr…
Pipboyguy Jul 4, 2024
98d2329
Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jul 4, 2024
f852d44
Add configurable default table engine type for ClickHouse
Pipboyguy Jul 4, 2024
b0cec6e
Docs
Pipboyguy Jul 4, 2024
d764734
Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jul 12, 2024
b2bc5b1
Fix comments
Pipboyguy Jul 12, 2024
8d26ef1
Add ClickHouse typing module for improved type handling
Pipboyguy Jul 12, 2024
9c67c65
Merge branch 'refs/heads/devel' into 1387-clickhouse-mergetree-support
Pipboyguy Jul 12, 2024
3028cf4
Move ClickHouse configuration options from credentials to client conf…
Pipboyguy Jul 12, 2024
8f60436
Move table_engine_type from credentials to client configuration
Pipboyguy Jul 12, 2024
477e815
Docs
Pipboyguy Jul 12, 2024
48 changes: 43 additions & 5 deletions .github/workflows/test_destination_clickhouse.yml
@@ -1,4 +1,3 @@

name: test | clickhouse

on:
@@ -8,7 +7,7 @@ on:
- devel
workflow_dispatch:
schedule:
- cron: '0 2 * * *'
- cron: '0 2 * * *'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -20,7 +19,7 @@ env:
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

ACTIVE_DESTINATIONS: "[\"clickhouse\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

jobs:
get_docs_changes:
@@ -67,12 +66,51 @@ jobs:
- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

# OSS ClickHouse
- run: |
docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" up -d
echo "Waiting for ClickHouse to be healthy..."
timeout 30s bash -c 'until docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done'
echo "ClickHouse is up and running"
name: Start ClickHouse OSS


- run: poetry run pytest tests/load -m "essential"
name: Run essential tests Linux (ClickHouse OSS)
if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}
env:
DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost
DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data
DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000
DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123
DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0

- run: poetry run pytest tests/load
name: Run all tests Linux (ClickHouse OSS)
if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
env:
DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost
DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data
DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader
DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000
DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123
DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0

- name: Stop ClickHouse OSS
if: always()
run: docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" down -v

# ClickHouse Cloud
- run: |
poetry run pytest tests/load -m "essential"
name: Run essential tests Linux
name: Run essential tests Linux (ClickHouse Cloud)
if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}

- run: |
poetry run pytest tests/load
name: Run all tests Linux
name: Run all tests Linux (ClickHouse Cloud)
if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
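
For reference, the OSS test steps above authenticate against the dockerised ClickHouse purely through environment variables. A minimal sketch of reproducing that setup locally before invoking pytest; the values are copied from the workflow above, and the programmatic form is only an illustration:

import os

# Same credentials the workflow exports for the ClickHouse OSS container.
os.environ.update({
    "DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost",
    "DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE": "dlt_data",
    "DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME": "loader",
    "DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD": "loader",
    "DESTINATION__CLICKHOUSE__CREDENTIALS__PORT": "9000",
    "DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT": "8123",
    "DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE": "0",
})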

46 changes: 24 additions & 22 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -2,21 +2,18 @@
import re
from copy import deepcopy
from textwrap import dedent
from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple
from typing import Optional, Dict, List, Sequence, cast
from urllib.parse import urlparse

import clickhouse_connect
from clickhouse_connect.driver.tools import insert_file

import dlt
from dlt import config
from dlt.common.configuration.specs import (
CredentialsConfiguration,
AzureCredentialsWithoutDefaults,
GcpCredentials,
AwsCredentialsWithoutDefaults,
)
from dlt.destinations.exceptions import DestinationTransientException
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
SupportsStagingDestination,
@@ -31,8 +28,6 @@
TTableSchema,
TColumnHint,
TColumnType,
TTableSchemaColumns,
TColumnSchemaBase,
)
from dlt.common.storages import FileStorage
from dlt.destinations.exceptions import LoadJobTerminalException
@@ -66,6 +61,7 @@

TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = {
"merge_tree": "MergeTree",
"shared_merge_tree": "SharedMergeTree",
"replicated_merge_tree": "ReplicatedMergeTree",
}

@@ -113,7 +109,8 @@ def from_db_type(
if db_type == "DateTime('UTC')":
db_type = "DateTime"
if datetime_match := re.match(
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?", db_type
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?",
db_type,
):
if datetime_match["precision"]:
precision = int(datetime_match["precision"])
@@ -131,7 +128,7 @@ def from_db_type(
db_type = "Decimal"

if db_type == "Decimal" and (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")
return cast(TColumnType, dict(data_type="wei"))

return super().from_db_type(db_type, precision, scale)

@@ -161,7 +158,7 @@ def __init__(

compression = "auto"

# Don't use dbapi driver for local files.
# Don't use the DBAPI driver for local files.
if not bucket_path:
# Local filesystem.
if ext == "jsonl":
@@ -182,8 +179,8 @@ def __init__(
fmt=clickhouse_format,
settings={
"allow_experimental_lightweight_delete": 1,
# "allow_experimental_object_type": 1,
"enable_http_compression": 1,
"date_time_input_format": "best_effort",
},
compression=compression,
)
@@ -201,13 +198,7 @@ def __init__(
compression = "none" if config.get("data_writer.disable_compression") else "gz"

if bucket_scheme in ("s3", "gs", "gcs"):
if isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
bucket_http_url = convert_storage_to_http_scheme(
bucket_url, endpoint=staging_credentials.endpoint_url
)
access_key_id = staging_credentials.aws_access_key_id
secret_access_key = staging_credentials.aws_secret_access_key
else:
if not isinstance(staging_credentials, AwsCredentialsWithoutDefaults):
raise LoadJobTerminalException(
file_path,
dedent(
@@ -219,6 +210,11 @@ def __init__(
).strip(),
)

bucket_http_url = convert_storage_to_http_scheme(
bucket_url, endpoint=staging_credentials.endpoint_url
)
access_key_id = staging_credentials.aws_access_key_id
secret_access_key = staging_credentials.aws_secret_access_key
auth = "NOSIGN"
if access_key_id and secret_access_key:
auth = f"'{access_key_id}','{secret_access_key}'"
@@ -308,7 +304,7 @@ def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> Li
def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
# Build column definition.
# The primary key and sort order definition is defined outside column specification.
hints_str = " ".join(
hints_ = " ".join(
self.active_hints.get(hint)
for hint in self.active_hints.keys()
if c.get(hint, False) is True
@@ -325,7 +321,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non
)

return (
f"{self.sql_client.escape_column_name(c['name'])} {type_with_nullability_modifier} {hints_str}"
f"{self.sql_client.escape_column_name(c['name'])} {type_with_nullability_modifier} {hints_}"
.strip()
)

@@ -340,17 +336,23 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
self,
table_name: str,
new_columns: Sequence[TColumnSchema],
generate_alter: bool,
) -> List[str]:
table: TTableSchema = self.prepare_load_table(table_name, self.in_staging_mode)
sql = SqlJobClientBase._get_table_update_sql(self, table_name, new_columns, generate_alter)

if generate_alter:
return sql

# Default to 'ReplicatedMergeTree' if user didn't explicitly set a table engine hint.
# Default to 'MergeTree' if the user didn't explicitly set a table engine hint.
# Clickhouse Cloud will automatically pick `SharedMergeTree` for this option,
# so it will work on both local and cloud instances of CH.
table_type = cast(
TTableEngineType, table.get(TABLE_ENGINE_TYPE_HINT, "replicated_merge_tree")
TTableEngineType,
table.get(TABLE_ENGINE_TYPE_HINT, self.config.credentials.table_engine_type),
)
sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}"

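The hint lookup in _get_table_update_sql above is what the ClickHouse adapter (see the adapter module diff below) feeds. A hedged sketch of overriding the engine for a single resource, assuming the adapter keeps the table_engine_type keyword introduced in this PR and the import path dlt currently documents:

import dlt
from dlt.destinations.adapters import clickhouse_adapter  # assumed import path

@dlt.resource
def events():
    yield {"id": 1, "value": "a"}

# Pin this one table to ReplicatedMergeTree; other tables fall back to the
# configured default engine (merge_tree).
pipeline = dlt.pipeline(pipeline_name="ch_demo", destination="clickhouse", dataset_name="demo")
pipeline.run(clickhouse_adapter(events, table_engine_type="replicated_merge_tree"))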
3 changes: 1 addition & 2 deletions dlt/destinations/impl/clickhouse/clickhouse_adapter.py
@@ -1,12 +1,11 @@
from typing import Any, Literal, Set, get_args, Dict

from dlt.destinations.impl.clickhouse.configuration import TTableEngineType
from dlt.destinations.utils import ensure_resource
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate


TTableEngineType = Literal["merge_tree", "replicated_merge_tree"]

"""
The table engine (type of table) determines:

17 changes: 12 additions & 5 deletions dlt/destinations/impl/clickhouse/configuration.py
@@ -6,11 +6,15 @@
from dlt.common.destination.reference import (
DestinationClientDwhWithStagingConfiguration,
)
from dlt.common.libs.sql_alchemy import URL
from dlt.common.utils import digest128


TSecureConnection = Literal[0, 1]
TTableEngineType = Literal[
"merge_tree",
"shared_merge_tree",
"replicated_merge_tree",
]


@configspec(init=False)
@@ -36,6 +40,8 @@ class ClickHouseCredentials(ConnectionStringCredentials):
"""Timeout for sending and receiving data. Defaults to 300 seconds."""
dataset_table_separator: str = "___"
"""Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'."""
table_engine_type: Optional[TTableEngineType] = "merge_tree"
Collaborator:
Why do you define this attribute under ClickHouseCredentials?

I think it makes more sense to put it under ClickHouseClientConfiguration. Same for dataset_table_separator and dataset_sentinel_table_name. They aren't credentials.

As a reference: synapse has a default_table_index_type that's similar to table_engine_type, which is defined in the destination client class:

default_table_index_type: Optional[TTableIndexType] = "heap"

Collaborator:
@Pipboyguy yes, I agree with @jorritsandbrink. This is probably because sql_client sees only the credentials, not the config, but you can pass additional arguments to it. My take is that we move it, and if that turns out to be hard, we'll change how sql_client is instantiated. I can help in that case.

Collaborator Author:
Totally agree. These don't belong in credentials.

Please find the latest changes in both the code base and the docs.

"""The default table engine to use. Defaults to 'merge_tree'. Other implemented options are 'shared_merge_tree' and 'replicated_merge_tree'."""
dataset_sentinel_table_name: str = "dlt_sentinel_table"
"""Special table to mark dataset as existing"""
gcp_access_key_id: Optional[str] = None
@@ -67,18 +73,19 @@ def get_query(self) -> Dict[str, Any]:
"connect_timeout": str(self.connect_timeout),
"send_receive_timeout": str(self.send_receive_timeout),
"secure": 1 if self.secure else 0,
# Toggle experimental settings. These are necessary for certain datatypes and not optional.
"allow_experimental_lightweight_delete": 1,
# "allow_experimental_object_type": 1,
"enable_http_compression": 1,
"date_time_input_format": "best_effort",
}
)
return query


@configspec
class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_type: Final[str] = dataclasses.field(default="clickhouse", init=False, repr=False, compare=False) # type: ignore[misc]
destination_type: Final[str] = dataclasses.field( # type: ignore[misc]
default="clickhouse", init=False, repr=False, compare=False
)
credentials: ClickHouseCredentials = None

# Primary key columns are used to build a sparse primary index which allows for efficient data retrieval,
@@ -87,7 +94,7 @@ class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration
# See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes

def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string."""
"""Returns a fingerprint of the host part of a connection string."""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""
29 changes: 24 additions & 5 deletions dlt/destinations/impl/clickhouse/sql_client.py
@@ -1,3 +1,4 @@
import datetime # noqa: I251
from contextlib import contextmanager
from typing import (
Iterator,
@@ -7,15 +8,18 @@
Optional,
Sequence,
ClassVar,
Literal,
Tuple,
)

import clickhouse_driver # type: ignore[import-untyped]
import clickhouse_driver.errors # type: ignore[import-untyped]
from clickhouse_driver.dbapi import OperationalError # type: ignore[import-untyped]
from clickhouse_driver.dbapi.extras import DictCursor # type: ignore[import-untyped]
from pendulum import DateTime # noqa: I251

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.typing import DictStrAny
from dlt.destinations.exceptions import (
DatabaseUndefinedRelation,
DatabaseTransientException,
Expand All @@ -32,6 +36,7 @@
from dlt.destinations.utils import _convert_to_old_pyformat


TDeployment = Literal["ClickHouseOSS", "ClickHouseCloud"]
TRANSACTIONS_UNSUPPORTED_WARNING_MESSAGE = (
"ClickHouse does not support transactions! Each statement is auto-committed separately."
)
@@ -98,13 +103,16 @@ def execute_sql(
return None if curr.description is None else curr.fetchall()

def create_dataset(self) -> None:
# We create a sentinel table which defines wether we consider the dataset created
# We create a sentinel table which defines whether we consider the dataset created.
sentinel_table_name = self.make_qualified_table_name(
self.credentials.dataset_sentinel_table_name
)
self.execute_sql(
f"""CREATE TABLE {sentinel_table_name} (_dlt_id String NOT NULL PRIMARY KEY) ENGINE=ReplicatedMergeTree COMMENT 'internal dlt sentinel table'"""
)
# `MergeTree` is guaranteed to work in both self-managed and cloud setups.
self.execute_sql(f"""
CREATE TABLE {sentinel_table_name}
(_dlt_id String NOT NULL PRIMARY KEY)
ENGINE=MergeTree
Collaborator:
I'm probably nitpicking, but should it use config.table_engine_type here?

Collaborator Author:
@rudolfix Thanks for the spot! Changed to the configured table engine:

sentinel_table_type = cast(TTableEngineType, self.credentials.table_engine_type)
self.execute_sql(f"""
CREATE TABLE {sentinel_table_name}
(_dlt_id String NOT NULL PRIMARY KEY)
ENGINE={TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(sentinel_table_type)}
COMMENT 'internal dlt sentinel table'""")

COMMENT 'internal dlt sentinel table'""")

def drop_dataset(self) -> None:
# Since ClickHouse doesn't have schemas, we need to drop all tables in our virtual schema,
@@ -132,19 +140,30 @@ def _list_tables(self) -> List[str]:
)
return [row[0] for row in rows]

@staticmethod
def _sanitise_dbargs(db_args: DictStrAny) -> DictStrAny:
"""For ClickHouse OSS, the DBapi driver doesn't parse datetime types.
We remove timezone specifications in this case."""
for key, value in db_args.items():
if isinstance(value, (DateTime, datetime.datetime)):
db_args[key] = str(value.replace(microsecond=0, tzinfo=None))
return db_args

@contextmanager
@raise_database_error
def execute_query(
self, query: AnyStr, *args: Any, **kwargs: Any
) -> Iterator[ClickHouseDBApiCursorImpl]:
assert isinstance(query, str), "Query must be a string."

db_args = kwargs.copy()
db_args: DictStrAny = kwargs.copy()

if args:
query, db_args = _convert_to_old_pyformat(query, args, OperationalError)
db_args.update(kwargs)

db_args = self._sanitise_dbargs(db_args)

with self._conn.cursor() as cursor:
for query_line in query.split(";"):
if query_line := query_line.strip():
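The _sanitise_dbargs helper above exists because the clickhouse-driver DBAPI used against OSS ClickHouse cannot parse timezone-aware datetimes. A minimal standalone sketch of the same normalisation; the real method additionally matches pendulum.DateTime, which subclasses datetime:

import datetime

def sanitise_dbargs(db_args):
    # Drop sub-second precision and the timezone so the value round-trips
    # through the OSS DBAPI driver as a plain 'YYYY-MM-DD HH:MM:SS' string.
    for key, value in db_args.items():
        if isinstance(value, datetime.datetime):
            db_args[key] = str(value.replace(microsecond=0, tzinfo=None))
    return db_args

ts = datetime.datetime(2024, 7, 1, 12, 30, 45, 123456, tzinfo=datetime.timezone.utc)
print(sanitise_dbargs({"created_at": ts}))  # {'created_at': '2024-07-01 12:30:45'}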
4 changes: 3 additions & 1 deletion dlt/load/load.py
@@ -197,7 +197,9 @@ def spool_new_jobs(self, load_id: str, schema: Schema) -> Tuple[int, List[LoadJo
# use thread based pool as jobs processing is mostly I/O and we do not want to pickle jobs
load_files = filter_new_jobs(
self.load_storage.list_new_jobs(load_id),
self.destination.capabilities(self.destination.configuration(self.initial_client_config)),
self.destination.capabilities(
self.destination.configuration(self.initial_client_config)
),
self.config,
)
file_count = len(load_files)