From 56b381873d96225bb36bfc487784365a6239709e Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Wed, 19 Jun 2024 22:08:03 +0200
Subject: [PATCH 01/33] Default to `MergeTree` for both on-prem and cloud

Signed-off-by: Marcel Coetzee
---
 .../impl/clickhouse/clickhouse.py         | 11 ++--
 .../impl/clickhouse/clickhouse_adapter.py |  4 +-
 .../impl/clickhouse/sql_client.py         | 11 ++--
 .../clickhouse/test_clickhouse_adapter.py | 60 +++++++++++++------
 4 files changed, 59 insertions(+), 27 deletions(-)

diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index cf1f1bc857..4c5c4feafc 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -67,7 +67,10 @@
 TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = {
     "merge_tree": "MergeTree",
+    "shared_merge_tree": "SharedMergeTree",
     "replicated_merge_tree": "ReplicatedMergeTree",
+    "stripe_log": "StripeLog",
+    "tiny_log": "TinyLog",
 }

@@ -350,10 +353,10 @@ def _get_table_update_sql(
         if generate_alter:
             return sql

-        # Default to 'ReplicatedMergeTree' if user didn't explicitly set a table engine hint.
-        table_type = cast(
-            TTableEngineType, table.get(TABLE_ENGINE_TYPE_HINT, "replicated_merge_tree")
-        )
+        # Default to 'MergeTree' if the user didn't explicitly set a table engine hint.
+        # ClickHouse Cloud will automatically pick `SharedMergeTree` for this option,
+        # so it will work on both local and cloud instances of CH.
+        table_type = cast(TTableEngineType, table.get(TABLE_ENGINE_TYPE_HINT, "merge_tree"))
         sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}"

         if primary_key_list := [
diff --git a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py
index 1bbde8e45d..97e163ed62 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py
@@ -5,7 +5,9 @@
 from dlt.extract.items import TTableHintTemplate

-TTableEngineType = Literal["merge_tree", "replicated_merge_tree"]
+TTableEngineType = Literal[
+    "merge_tree", "shared_merge_tree", "replicated_merge_tree", "stripe_log", "tiny_log"
+]

 """
 The table engine (type of table) determines:
diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py
index 8fb89c90cd..9c86f8f519 100644
--- a/dlt/destinations/impl/clickhouse/sql_client.py
+++ b/dlt/destinations/impl/clickhouse/sql_client.py
@@ -93,13 +93,16 @@ def execute_sql(
         return None if curr.description is None else curr.fetchall()

     def create_dataset(self) -> None:
-        # We create a sentinel table which defines wether we consider the dataset created
+        # We create a sentinel table which defines whether we consider the dataset created.
         sentinel_table_name = self.make_qualified_table_name(
             self.credentials.dataset_sentinel_table_name
         )
-        self.execute_sql(
-            f"""CREATE TABLE {sentinel_table_name} (_dlt_id String NOT NULL PRIMARY KEY) ENGINE=ReplicatedMergeTree COMMENT 'internal dlt sentinel table'"""
-        )
+        # `MergeTree` is guaranteed to work in both self-managed and cloud setups.
+        self.execute_sql(f"""
+            CREATE TABLE {sentinel_table_name}
+            (_dlt_id String NOT NULL PRIMARY KEY)
+            ENGINE=MergeTree
+            COMMENT 'internal dlt sentinel table'""")

     def drop_dataset(self) -> None:
         # Since ClickHouse doesn't have schemas, we need to drop all tables in our virtual schema,
diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py
index 36d3ac07f7..cb364e63da 100644
--- a/tests/load/clickhouse/test_clickhouse_adapter.py
+++ b/tests/load/clickhouse/test_clickhouse_adapter.py
@@ -1,3 +1,5 @@
+from typing import Generator, Dict
+
 import dlt
 from dlt.destinations.adapters import clickhouse_adapter
 from tests.pipeline.utils import assert_load_info
@@ -5,15 +7,24 @@
 def test_clickhouse_adapter() -> None:
     @dlt.resource
-    def merge_tree_resource():
+    def merge_tree_resource() -> Generator[Dict[str, int], None, None]:
         yield {"field1": 1, "field2": 2}

+    # `ReplicatedMergeTree` has been supplanted by `SharedMergeTree` on ClickHouse Cloud,
+    # which is automatically selected even if `MergeTree` is requested.
+    # See https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree.
+
+    # The `Log` family of engines is only supported in self-managed deployments,
+    # so it can't be tested in CH Cloud CI.
+
     @dlt.resource
-    def replicated_merge_tree_resource():
+    def replicated_merge_tree_resource() -> Generator[Dict[str, int], None, None]:
         yield {"field1": 1, "field2": 2}

     @dlt.resource
-    def not_annotated_resource():
+    def not_annotated_resource() -> Generator[Dict[str, int], None, None]:
+        """A non-annotated resource will default to `SharedMergeTree` on CH Cloud
+        and `MergeTree` for self-managed installations."""
         yield {"field1": 1, "field2": 2}

     clickhouse_adapter(merge_tree_resource, table_engine_type="merge_tree")
@@ -25,37 +36,50 @@ def not_annotated_resource():
     assert_load_info(pack)

     with pipe.sql_client() as client:
-        # get map of table names to full table names
+        # Get a map of table names to full table names.
         tables = {}
         for table in client._list_tables():
             if "resource" in table:
                 tables[table.split("___")[1]] = table
         assert (len(tables.keys())) == 3

-        # check content
+        # Check the table content.
         for full_table_name in tables.values():
             with client.execute_query(f"SELECT * FROM {full_table_name};") as cursor:
                 res = cursor.fetchall()
                 assert tuple(res[0])[:2] == (1, 2)

-        # check table format
-        # fails now, because we do not have a cluster (I think), it will fall back to SharedMergeTree
-        for full_table_name in tables.values():
+        # Check the table engine.
+        for table_name, full_table_name in tables.items():
             with client.execute_query(
-                "SELECT database, name, engine, engine_full FROM system.tables WHERE name ="
-                f" '{full_table_name}';"
+                "SELECT database, name, engine, engine_full FROM system.tables "
+                f"WHERE name = '{full_table_name}';"
             ) as cursor:
                 res = cursor.fetchall()
-                # this should test that two tables should be replicatedmergetree tables
-                assert tuple(res[0])[2] == "SharedMergeTree"
+                if table_name in (
+                    "merge_tree_resource",
+                    "replicated_merge_tree_resource",
+                ):
+                    assert tuple(res[0])[2] in (
+                        "MergeTree",
+                        "SharedMergeTree",
+                        "ReplicatedMergeTree",
+                    )
+                else:
+                    # Non annotated resource needs to default to detected installation
+                    # type, i.e. cloud or self-managed.
+                    # CI runs on CH cloud, so will be `SharedMergeTree`.
+                    assert tuple(res[0])[2] == "SharedMergeTree"

-        # we can check the gen table sql though
+        # We can check the generated table's SQL, though.
        with pipe.destination_client() as dest_client:
-            for table in tables.keys():
+            for table in tables:
                 sql = dest_client._get_table_update_sql(  # type: ignore[attr-defined]
-                    table, pipe.default_schema.tables[table]["columns"].values(), generate_alter=False
+                    table,
+                    pipe.default_schema.tables[table]["columns"].values(),
+                    generate_alter=False,
                 )
-                if table == "merge_tree_resource":
-                    assert "ENGINE = MergeTree" in sql[0]
-                else:
+                if table == "replicated_merge_tree_resource":
                     assert "ENGINE = ReplicatedMergeTree" in sql[0]
+                else:
+                    assert "ENGINE = MergeTree" in sql[0] or "ENGINE = SharedMergeTree" in sql[0]

From e4e10101a5dd5f9f9a398bd4b92f7866cbd61a34 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Wed, 19 Jun 2024 22:55:26 +0200
Subject: [PATCH 02/33] Add documentation for new engine family types

Signed-off-by: Marcel Coetzee
---
 .../dlt-ecosystem/destinations/clickhouse.md | 60 +++++++++++--------
 1 file changed, 34 insertions(+), 26 deletions(-)

diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
index b1dde5a328..5b813dfa77 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
@@ -37,7 +37,7 @@ or with `pip install "dlt[clickhouse]"`, which installs the `dlt` library and th

 ### 2. Setup ClickHouse database

-To load data into ClickHouse, you need to create a ClickHouse database. While we recommend asking our GPT-4 assistant for details, we have provided a general outline of the process below:
+To load data into ClickHouse, you need to create a ClickHouse database. While we recommend asking our GPT-4 assistant for details, we’ve provided a general outline of the process below:

 1. You can use an existing ClickHouse database or create a new one.

@@ -63,22 +63,29 @@
   username = "dlt" # ClickHouse username, default is usually "default"
   password = "Dlt*12345789234567" # ClickHouse password if any
   host = "localhost" # ClickHouse server host
   port = 9000 # ClickHouse native TCP port; default is 9000 for ClickHouse Cloud.
   http_port = 8443 # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443 for ClickHouse Cloud.
   secure = 1 # Set to 1 if using HTTPS, else 0.
   dataset_table_separator = "___" # Separator for dataset table names from dataset.
   ```

-  :::info http_port
-  The `http_port` parameter specifies the port number to use when connecting to the ClickHouse server's HTTP interface. This is different from default port 9000, which is used for the native TCP
-  protocol.
-
-  You must set `http_port` if you are not using external staging (i.e. you don't set the staging parameter in your pipeline). This is because dlt's built-in ClickHouse local storage staging uses the
-  [clickhouse-connect](https://github.com/ClickHouse/clickhouse-connect) library, which communicates with ClickHouse over HTTP.
-
-  Make sure your ClickHouse server is configured to accept HTTP connections on the port specified by `http_port`. For example, if you set `http_port = 8443`, then ClickHouse should be listening for
-  HTTP requests on port 8443. If you are using external staging, you can omit the `http_port` parameter, since clickhouse-connect will not be used in this case.
+  :::info Network Ports
+  The `http_port` parameter specifies the port number to use when connecting to the ClickHouse server's HTTP interface.
+  The default non-secure HTTP port for ClickHouse is `8123`.
+  This is different from the default port `9000`, which is used for the native TCP protocol.
+
+  You must set `http_port` if you are not using external staging (i.e.
you don't set the `staging` parameter in your pipeline). This is because dlt's built-in ClickHouse local storage staging uses the [clickhouse-connect](https://github.com/ClickHouse/clickhouse-connect) library, which communicates with ClickHouse over HTTP. - Make sure your ClickHouse server is configured to accept HTTP connections on the port specified by `http_port`. For example, if you set `http_port = 8443`, then ClickHouse should be listening for - HTTP - requests on port 8443. If you are using external staging, you can omit the `http_port` parameter, since clickhouse-connect will not be used in this case. + Make sure your ClickHouse server is configured to accept HTTP connections on the port specified by `http_port`. For example: + + - If you set `http_port = 8123` (default non-secure HTTP port), then ClickHouse should be listening for HTTP requests on port 8123. + - If you set `http_port = 8443`, then ClickHouse should be listening for secure HTTPS requests on port 8443. + + If you're using external staging, you can omit the `http_port` parameter, since clickhouse-connect will not be used in this case. + + For local development and testing with ClickHouse running locally, it is recommended to use the default non-secure HTTP port `8123` by setting `http_port=8123` or omitting the parameter. + + Please see the [ClickHouse network port documentation](https://clickhouse.com/docs/en/guides/sre/network-ports) for further reference. ::: 2. You can pass a database connection string similar to the one used by the `clickhouse-driver` library. The credentials above will look like this: @@ -104,7 +111,7 @@ Data is loaded into ClickHouse using the most efficient method depending on the `Clickhouse` does not support multiple datasets in one database, dlt relies on datasets to exist for multiple reasons. To make `clickhouse` work with `dlt`, tables generated by `dlt` in your `clickhouse` database will have their name prefixed with the dataset name separated by -the configurable `dataset_table_separator`. Additionally, a special sentinel table that does not contain any data will be created, so dlt knows which virtual datasets already exist in a +the configurable `dataset_table_separator`. Additionally, a special sentinel table that doesn’t contain any data will be created, so dlt knows which virtual datasets already exist in a clickhouse destination. @@ -115,13 +122,13 @@ destination. The `clickhouse` destination has a few specific deviations from the default sql destinations: -1. `Clickhouse` has an experimental `object` datatype, but we have found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need +1. `Clickhouse` has an experimental `object` datatype, but we’ve found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need this feature, get in touch with our Slack community, and we will consider adding it. 2. `Clickhouse` does not support the `time` datatype. Time will be loaded to a `text` column. 3. `Clickhouse` does not support the `binary` datatype. Binary will be loaded to a `text` column. When loading from `jsonl`, this will be a base64 string, when loading from parquet this will be the `binary` object converted to `text`. -4. `Clickhouse` accepts adding columns to a populated table that are not null. -5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. 
Make sure to use decimal if you cannot afford to have rounding errors. Loading the value +4. `Clickhouse` accepts adding columns to a populated table that aren’t null. +5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. Make sure to use decimal if you can’t afford to have rounding errors. Loading the value 12.7001 to a double column with the loader file format jsonl set will predictbly produce a rounding error for example. ## Supported column hints @@ -130,31 +137,32 @@ ClickHouse supports the following [column hints](../../general-usage/schema#tabl - `primary_key` - marks the column as part of the primary key. Multiple columns can have this hint to create a composite primary key. -## Table Engine +## Choosing a Table Engine -By default, tables are created using the `ReplicatedMergeTree` table engine in ClickHouse. You can specify an alternate table engine using the `table_engine_type` with the clickhouse adapter: +By default, tables are created using the `MergeTree` table engine in ClickHouse. You can specify an alternate table engine using the `table_engine_type` parameter with the clickhouse adapter: -```py +```python from dlt.destinations.adapters import clickhouse_adapter - @dlt.resource() def my_resource(): - ... - + ... clickhouse_adapter(my_resource, table_engine_type="merge_tree") - ``` -Supported values are: +Supported values for `table_engine_type` are: + +- `merge_tree` (default) - creates tables using the `MergeTree` engine, suitable for most use cases. [Learn more about MergeTree](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree). +- `shared_merge_tree` - creates tables using the `SharedMergeTree` engine, optimized for cloud-native environments with shared storage. This table is **only** available on ClickHouse Cloud and it the default selection if `merge_tree` is selected. [Learn more about SharedMergeTree](https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree). +- `replicated_merge_tree` - creates tables using the `ReplicatedMergeTree` engine, which supports data replication across multiple nodes for high availability. [Learn more about ReplicatedMergeTree](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication). This also defaults to `shared_merge_tree` on ClickHouse Cloud. +- Experimental support for the `Log` engine family with `stripe_log` and `tiny_log`. -- `merge_tree` - creates tables using the `MergeTree` engine -- `replicated_merge_tree` (default) - creates tables using the `ReplicatedMergeTree` engine +For local development and testing with ClickHouse running locally, the `MergeTree` engine is recommended. ## Staging support -ClickHouse supports Amazon S3, Google Cloud Storage and Azure Blob Storage as file staging destinations. +ClickHouse supports Amazon S3, Google Cloud Storage, and Azure Blob Storage as file staging destinations. `dlt` will upload Parquet or JSONL files to the staging location and use ClickHouse table functions to load the data directly from the staged files. 
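The staged-loading flow described above can be sketched in a few lines. This is an illustration only, assuming the built-in `filesystem` staging destination with a bucket URL and credentials already set in `config.toml`/`secrets.toml`; the pipeline, dataset, and table names here are made up:

```py
import dlt

# Files are first written to the configured bucket as Parquet,
# then ClickHouse loads them directly from the staged files.
pipeline = dlt.pipeline(
    pipeline_name="clickhouse_staged",  # illustrative name
    destination="clickhouse",
    staging="filesystem",
    dataset_name="staged_data",  # illustrative name
)

info = pipeline.run(
    [{"id": 1, "value": "a"}],
    table_name="items",
    loader_file_format="parquet",  # or "jsonl"
)
print(info)
```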
From d998ac94ef252ad5997c85b2eee4e56bb60a7f7f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Wed, 19 Jun 2024 23:04:01 +0200 Subject: [PATCH 03/33] Typo Signed-off-by: Marcel Coetzee --- docs/website/docs/dlt-ecosystem/destinations/clickhouse.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index 5b813dfa77..addbddff86 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -141,7 +141,7 @@ ClickHouse supports the following [column hints](../../general-usage/schema#tabl By default, tables are created using the `MergeTree` table engine in ClickHouse. You can specify an alternate table engine using the `table_engine_type` parameter with the clickhouse adapter: -```python +```py from dlt.destinations.adapters import clickhouse_adapter @dlt.resource() From 491217396d475a1eb952d449b964e4bffb5c3b26 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 20 Jun 2024 22:11:39 +0200 Subject: [PATCH 04/33] Minor doc changes Signed-off-by: Marcel Coetzee --- .../dlt-ecosystem/destinations/clickhouse.md | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index addbddff86..ec5eb18523 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -37,7 +37,7 @@ or with `pip install "dlt[clickhouse]"`, which installs the `dlt` library and th ### 2. Setup ClickHouse database -To load data into ClickHouse, you need to create a ClickHouse database. While we recommend asking our GPT-4 assistant for details, we’ve provided a general outline of the process below: +To load data into ClickHouse, you need to create a ClickHouse database. While we recommend asking our GPT-4 assistant for details, we've provided a general outline of the process below: 1. You can use an existing ClickHouse database or create a new one. @@ -91,7 +91,7 @@ To load data into ClickHouse, you need to create a ClickHouse database. While we 2. You can pass a database connection string similar to the one used by the `clickhouse-driver` library. The credentials above will look like this: ```toml - # keep it at the top of your toml file, before any section starts. + # keep it at the top of your toml file before any section starts. destination.clickhouse.credentials="clickhouse://dlt:Dlt*12345789234567@localhost:9000/dlt?secure=1" ``` @@ -111,7 +111,8 @@ Data is loaded into ClickHouse using the most efficient method depending on the `Clickhouse` does not support multiple datasets in one database, dlt relies on datasets to exist for multiple reasons. To make `clickhouse` work with `dlt`, tables generated by `dlt` in your `clickhouse` database will have their name prefixed with the dataset name separated by -the configurable `dataset_table_separator`. Additionally, a special sentinel table that doesn’t contain any data will be created, so dlt knows which virtual datasets already exist in a +the configurable `dataset_table_separator`. +Additionally, a special sentinel table that doesn't contain any data will be created, so dlt knows which virtual datasets already exist in a clickhouse destination. @@ -122,14 +123,15 @@ destination. 
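For illustration (made-up names): with `dataset_name="analytics"` and the default `"___"` separator, a resource table `users` is created as `analytics___users`, and a data-less sentinel table, whose name is configurable via `dataset_sentinel_table_name`, marks the `analytics` virtual dataset as existing.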
The `clickhouse` destination has a few specific deviations from the default sql destinations:

-1. `Clickhouse` has an experimental `object` datatype, but we’ve found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need
+1. `Clickhouse` has an experimental `object` datatype, but we've found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column.
+   If you need
    this feature, get in touch with our Slack community, and we will consider adding it.
 2. `Clickhouse` does not support the `time` datatype. Time will be loaded to a `text` column.
 3. `Clickhouse` does not support the `binary` datatype. Binary will be loaded to a `text` column. When loading from `jsonl`, this will be a base64 string, when loading from parquet this will be the
    `binary` object converted to `text`.
 4. `Clickhouse` accepts adding columns to a populated table that aren’t null.
 5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. Make sure to use decimal if you can’t afford to have rounding errors. Loading the value
-   12.7001 to a double column with the loader file format jsonl set will predictbly produce a rounding error for example.
+   12.7001 to a double column with the loader file format jsonl set will predictably produce a rounding error, for example.

 ## Supported column hints

@@ -154,8 +156,8 @@ Supported values for `table_engine_type` are:

 - `merge_tree` (default) - creates tables using the `MergeTree` engine, suitable for most use cases. [Learn more about MergeTree](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree).
 - `shared_merge_tree` - creates tables using the `SharedMergeTree` engine, optimized for cloud-native environments with shared storage. This engine is **only** available on ClickHouse Cloud, and it is the default selection if `merge_tree` is selected. [Learn more about SharedMergeTree](https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree).
 - `replicated_merge_tree` - creates tables using the `ReplicatedMergeTree` engine, which supports data replication across multiple nodes for high availability. [Learn more about ReplicatedMergeTree](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication). This defaults to `shared_merge_tree` on ClickHouse Cloud.
 - Experimental support for the `Log` engine family with `stripe_log` and `tiny_log`.

 For local development and testing with ClickHouse running locally, the `MergeTree` engine is recommended.

@@ -222,7 +224,7 @@ dlt's staging mechanisms for ClickHouse.
### dbt support -Integration with [dbt](../transformations/dbt/dbt.md) is generally supported via dbt-clickhouse, but not tested by us. +Integration with [dbt](../transformations/dbt/dbt.md) is generally supported via dbt-clickhouse but not tested by us. ### Syncing of `dlt` state From e1ab71d3cd98096b3e1bf8a4cdc200b01a73106f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 20 Jun 2024 22:12:11 +0200 Subject: [PATCH 05/33] Fix local clickhouse deployment timestamp parsing issue with simple configuration setting~ Signed-off-by: Marcel Coetzee --- dlt/destinations/impl/clickhouse/clickhouse.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 4c5c4feafc..4e5c76da87 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -8,15 +8,12 @@ import clickhouse_connect from clickhouse_connect.driver.tools import insert_file -import dlt from dlt import config from dlt.common.configuration.specs import ( CredentialsConfiguration, AzureCredentialsWithoutDefaults, - GcpCredentials, AwsCredentialsWithoutDefaults, ) -from dlt.destinations.exceptions import DestinationTransientException from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import ( SupportsStagingDestination, @@ -117,7 +114,8 @@ def from_db_type( if db_type == "DateTime('UTC')": db_type = "DateTime" if datetime_match := re.match( - r"DateTime64(?:\((?P\d+)(?:,?\s*'(?PUTC)')?\))?", db_type + r"DateTime64(?:\((?P\d+)(?:,?\s*'(?PUTC)')?\))?", + db_type, ): if datetime_match["precision"]: precision = int(datetime_match["precision"]) @@ -135,7 +133,7 @@ def from_db_type( db_type = "Decimal" if db_type == "Decimal" and (precision, scale) == self.capabilities.wei_precision: - return dict(data_type="wei") + return cast(TColumnType, dict(data_type="wei")) return super().from_db_type(db_type, precision, scale) @@ -165,7 +163,7 @@ def __init__( compression = "auto" - # Don't use dbapi driver for local files. + # Don't use the DBAPI driver for local files. if not bucket_path: # Local filesystem. 
if ext == "jsonl": @@ -186,8 +184,8 @@ def __init__( fmt=clickhouse_format, settings={ "allow_experimental_lightweight_delete": 1, - # "allow_experimental_object_type": 1, "enable_http_compression": 1, + "date_time_input_format": "best_effort", }, compression=compression, ) @@ -345,7 +343,10 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> ) def _get_table_update_sql( - self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool + self, + table_name: str, + new_columns: Sequence[TColumnSchema], + generate_alter: bool, ) -> List[str]: table: TTableSchema = self.prepare_load_table(table_name, self.in_staging_mode) sql = SqlJobClientBase._get_table_update_sql(self, table_name, new_columns, generate_alter) From 8f80391d88261f628891156fe7b43ab5aa131607 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Tue, 25 Jun 2024 20:56:55 +0200 Subject: [PATCH 06/33] Extend support for local deployment time types Signed-off-by: Marcel Coetzee --- dlt/destinations/impl/clickhouse/clickhouse.py | 17 ++++++++--------- .../impl/clickhouse/configuration.py | 3 +-- .../custom_destination_lancedb.py | 4 +++- .../clickhouse/test_clickhouse_configuration.py | 4 ++-- .../sources/helpers/rest_client/test_client.py | 1 - 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 4e5c76da87..8baeff0dc4 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -203,13 +203,7 @@ def __init__( compression = "none" if config.get("data_writer.disable_compression") else "gz" if bucket_scheme in ("s3", "gs", "gcs"): - if isinstance(staging_credentials, AwsCredentialsWithoutDefaults): - bucket_http_url = convert_storage_to_http_scheme( - bucket_url, endpoint=staging_credentials.endpoint_url - ) - access_key_id = staging_credentials.aws_access_key_id - secret_access_key = staging_credentials.aws_secret_access_key - else: + if not isinstance(staging_credentials, AwsCredentialsWithoutDefaults): raise LoadJobTerminalException( file_path, dedent( @@ -221,6 +215,11 @@ def __init__( ).strip(), ) + bucket_http_url = convert_storage_to_http_scheme( + bucket_url, endpoint=staging_credentials.endpoint_url + ) + access_key_id = staging_credentials.aws_access_key_id + secret_access_key = staging_credentials.aws_secret_access_key auth = "NOSIGN" if access_key_id and secret_access_key: auth = f"'{access_key_id}','{secret_access_key}'" @@ -311,7 +310,7 @@ def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> Li def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: # Build column definition. # The primary key and sort order definition is defined outside column specification. 
- hints_str = " ".join( + hints_ = " ".join( self.active_hints.get(hint) for hint in self.active_hints.keys() if c.get(hint, False) is True @@ -328,7 +327,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non ) return ( - f"{self.capabilities.escape_identifier(c['name'])} {type_with_nullability_modifier} {hints_str}" + f"{self.capabilities.escape_identifier(c['name'])} {type_with_nullability_modifier} {hints_}" .strip() ) diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index 483356f9f9..7b97071abf 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -67,10 +67,9 @@ def get_query(self) -> Dict[str, Any]: "connect_timeout": str(self.connect_timeout), "send_receive_timeout": str(self.send_receive_timeout), "secure": 1 if self.secure else 0, - # Toggle experimental settings. These are necessary for certain datatypes and not optional. "allow_experimental_lightweight_delete": 1, - # "allow_experimental_object_type": 1, "enable_http_compression": 1, + "date_time_input_format": "best_effort", } ) return query diff --git a/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py b/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py index 9d75d90f99..ba815d4fcd 100644 --- a/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py +++ b/docs/examples/custom_destination_lancedb/custom_destination_lancedb.py @@ -38,7 +38,9 @@ from dlt.sources.helpers.rest_client import RESTClient, AuthConfigBase # access secrets to get openai key and instantiate embedding function -openai_api_key: str = dlt.secrets.get("destination.lancedb.credentials.embedding_model_provider_api_key") +openai_api_key: str = dlt.secrets.get( + "destination.lancedb.credentials.embedding_model_provider_api_key" +) func = get_registry().get("openai").create(name="text-embedding-3-small", api_key=openai_api_key) diff --git a/tests/load/clickhouse/test_clickhouse_configuration.py b/tests/load/clickhouse/test_clickhouse_configuration.py index eb02155406..29dbce4c76 100644 --- a/tests/load/clickhouse/test_clickhouse_configuration.py +++ b/tests/load/clickhouse/test_clickhouse_configuration.py @@ -27,8 +27,8 @@ def client() -> Iterator[ClickHouseClient]: def test_clickhouse_connection_string_with_all_params() -> None: url = ( "clickhouse://user1:pass1@host1:9000/testdb?allow_experimental_lightweight_delete=1&" - "allow_experimental_object_type=1&connect_timeout=230&enable_http_compression=1&secure=0" - "&send_receive_timeout=1000" + "allow_experimental_object_type=1&connect_timeout=230&date_time_input_format=best_effort&" + "enable_http_compression=1&secure=0&send_receive_timeout=1000" ) creds = ClickHouseCredentials() diff --git a/tests/sources/helpers/rest_client/test_client.py b/tests/sources/helpers/rest_client/test_client.py index 7196ef3436..aa3f02e51d 100644 --- a/tests/sources/helpers/rest_client/test_client.py +++ b/tests/sources/helpers/rest_client/test_client.py @@ -234,7 +234,6 @@ def test_oauth2_client_credentials_flow_wrong_client_secret(self, rest_client: R assert e.type == HTTPError assert e.match("401 Client Error") - def test_oauth_token_expired_refresh(self, rest_client_immediate_oauth_expiry: RESTClient): rest_client = rest_client_immediate_oauth_expiry auth = cast(OAuth2ClientCredentials, rest_client.auth) From 544925a9fbc1a17772a817bc53ba514a0d4e4b89 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Tue, 25 
Jun 2024 21:32:00 +0200 Subject: [PATCH 07/33] Adapt test to check whether CH OSS or cloud Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 3 +- .../impl/clickhouse/sql_client.py | 9 ++++- .../clickhouse/test_clickhouse_adapter.py | 36 ++++++++++++++----- .../test_clickhouse_configuration.py | 3 ++ 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 8baeff0dc4..728697a64e 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -2,7 +2,7 @@ import re from copy import deepcopy from textwrap import dedent -from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple +from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple, Literal from urllib.parse import urlparse import clickhouse_connect @@ -411,3 +411,4 @@ def _from_db_type( def restore_file_load(self, file_path: str) -> LoadJob: return EmptyLoadJob.from_file_path(file_path, "completed") + diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index 9c86f8f519..e1bee4b796 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -6,7 +6,7 @@ List, Optional, Sequence, - ClassVar, + ClassVar, Literal, ) import clickhouse_driver # type: ignore[import-untyped] @@ -32,6 +32,7 @@ from dlt.destinations.utils import _convert_to_old_pyformat +TDeployment = Literal["ClickHouseOSS", "ClickHouseCloud"] TRANSACTIONS_UNSUPPORTED_WARNING_MESSAGE = ( "ClickHouse does not support transactions! Each statement is auto-committed separately." ) @@ -204,3 +205,9 @@ def _make_database_exception(cls, ex: Exception) -> Exception: @staticmethod def is_dbapi_exception(ex: Exception) -> bool: return isinstance(ex, clickhouse_driver.dbapi.Error) + + def _get_deployment_type(self) -> TDeployment: + cloud_mode = int(self.execute_sql(""" + SELECT value FROM system.settings WHERE name = 'cloud_mode' + """)[0][0]) + return "ClickHouseCloud" if cloud_mode else "ClickHouseOSS" diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py index cb364e63da..d1c0e4f993 100644 --- a/tests/load/clickhouse/test_clickhouse_adapter.py +++ b/tests/load/clickhouse/test_clickhouse_adapter.py @@ -2,6 +2,7 @@ import dlt from dlt.destinations.adapters import clickhouse_adapter +from dlt.destinations.impl.clickhouse.sql_client import TDeployment from tests.pipeline.utils import assert_load_info @@ -31,7 +32,15 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: clickhouse_adapter(replicated_merge_tree_resource, table_engine_type="replicated_merge_tree") pipe = dlt.pipeline(pipeline_name="adapter_test", destination="clickhouse", full_refresh=True) - pack = pipe.run([merge_tree_resource, replicated_merge_tree_resource, not_annotated_resource]) + + with pipe.sql_client() as client: + deployment_type: TDeployment = client._get_deployment_type() + + if deployment_type == "ClickHouseCloud": + pack = pipe.run([merge_tree_resource, replicated_merge_tree_resource, not_annotated_resource]) + else: + # `ReplicatedMergeTree` not supported if only a single node. 
+ pack = pipe.run([merge_tree_resource, not_annotated_resource]) assert_load_info(pack) @@ -41,7 +50,10 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: for table in client._list_tables(): if "resource" in table: tables[table.split("___")[1]] = table - assert (len(tables.keys())) == 3 + if deployment_type=="ClickHouseCloud": + assert (len(tables.keys())) == 3 + else: + assert (len(tables.keys())) == 2 # Check the table content. for full_table_name in tables.values(): @@ -60,16 +72,24 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: "merge_tree_resource", "replicated_merge_tree_resource", ): - assert tuple(res[0])[2] in ( - "MergeTree", - "SharedMergeTree", - "ReplicatedMergeTree", - ) + if deployment_type == "ClickHouseCloud": + assert tuple(res[0])[2] in ( + "MergeTree", + "SharedMergeTree", + "ReplicatedMergeTree", + ) + else: + assert tuple(res[0])[2] in ( + "MergeTree", + ) else: # Non annotated resource needs to default to detected installation # type, i.e. cloud or self-managed. # CI runs on CH cloud, so will be `SharedMergeTree`. - assert tuple(res[0])[2] == "SharedMergeTree" + if deployment_type == "ClickHouseCloud": + assert tuple(res[0])[2] == "SharedMergeTree" + else: + assert tuple(res[0])[2] == "MergeTree" # We can check the generated table's SQL, though. with pipe.destination_client() as dest_client: diff --git a/tests/load/clickhouse/test_clickhouse_configuration.py b/tests/load/clickhouse/test_clickhouse_configuration.py index 29dbce4c76..fc116fbe9c 100644 --- a/tests/load/clickhouse/test_clickhouse_configuration.py +++ b/tests/load/clickhouse/test_clickhouse_configuration.py @@ -74,3 +74,6 @@ def test_clickhouse_connection_settings(client: ClickHouseClient) -> None: assert ("allow_experimental_lightweight_delete", "1") in res assert ("enable_http_compression", "1") in res + assert ("date_time_input_format", "best_effort") in res + + From b2a3596cda597be6ae6daf6014843d87789fc17b Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Wed, 26 Jun 2024 20:44:26 +0200 Subject: [PATCH 08/33] Defend against CH OSS unsupported dbapi datetime parsing Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 1 - .../impl/clickhouse/sql_client.py | 21 ++++++++++++++++--- .../clickhouse/test_clickhouse_adapter.py | 10 ++++----- .../test_clickhouse_configuration.py | 2 -- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 728697a64e..c2955af550 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -411,4 +411,3 @@ def _from_db_type( def restore_file_load(self, file_path: str) -> LoadJob: return EmptyLoadJob.from_file_path(file_path, "completed") - diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index e1bee4b796..aef0557ceb 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -1,3 +1,4 @@ +import datetime # noqa: I251 from contextlib import contextmanager from typing import ( Iterator, @@ -6,15 +7,18 @@ List, Optional, Sequence, - ClassVar, Literal, + ClassVar, + Literal, ) import clickhouse_driver # type: ignore[import-untyped] import clickhouse_driver.errors # type: ignore[import-untyped] from clickhouse_driver.dbapi import OperationalError # type: ignore[import-untyped] from clickhouse_driver.dbapi.extras import DictCursor # type: 
ignore[import-untyped] - +from pendulum import DateTime # noqa: I251 +i from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.typing import DictStrAny from dlt.destinations.exceptions import ( DatabaseUndefinedRelation, DatabaseTransientException, @@ -132,6 +136,15 @@ def _list_tables(self) -> List[str]: ) return [row[0] for row in rows] + @staticmethod + def _sanitise_dbargs(db_args: DictStrAny) -> DictStrAny: + """For ClickHouse OSS, the DBapi driver doesn't parse datetime types. + We remove timezone specifications in this case.""" + for key, value in db_args.items(): + if isinstance(value, (DateTime, datetime.datetime)): + db_args[key] = str(value.replace(microsecond=0, tzinfo=None)) + return db_args + @contextmanager @raise_database_error def execute_query( @@ -139,12 +152,14 @@ def execute_query( ) -> Iterator[ClickHouseDBApiCursorImpl]: assert isinstance(query, str), "Query must be a string." - db_args = kwargs.copy() + db_args: DictStrAny = kwargs.copy() if args: query, db_args = _convert_to_old_pyformat(query, args, OperationalError) db_args.update(kwargs) + db_args = self._sanitise_dbargs(db_args) + with self._conn.cursor() as cursor: for query_line in query.split(";"): if query_line := query_line.strip(): diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py index d1c0e4f993..b07e58e98f 100644 --- a/tests/load/clickhouse/test_clickhouse_adapter.py +++ b/tests/load/clickhouse/test_clickhouse_adapter.py @@ -37,7 +37,9 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: deployment_type: TDeployment = client._get_deployment_type() if deployment_type == "ClickHouseCloud": - pack = pipe.run([merge_tree_resource, replicated_merge_tree_resource, not_annotated_resource]) + pack = pipe.run( + [merge_tree_resource, replicated_merge_tree_resource, not_annotated_resource] + ) else: # `ReplicatedMergeTree` not supported if only a single node. pack = pipe.run([merge_tree_resource, not_annotated_resource]) @@ -50,7 +52,7 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: for table in client._list_tables(): if "resource" in table: tables[table.split("___")[1]] = table - if deployment_type=="ClickHouseCloud": + if deployment_type == "ClickHouseCloud": assert (len(tables.keys())) == 3 else: assert (len(tables.keys())) == 2 @@ -79,9 +81,7 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: "ReplicatedMergeTree", ) else: - assert tuple(res[0])[2] in ( - "MergeTree", - ) + assert tuple(res[0])[2] in ("MergeTree",) else: # Non annotated resource needs to default to detected installation # type, i.e. cloud or self-managed. 
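A standalone sketch of the deployment check this branching relies on, assuming an open dlt ClickHouse `sql_client` bound to `client` (it mirrors `_get_deployment_type` from the `sql_client.py` change in PATCH 07):

```py
# ClickHouse Cloud reports cloud_mode = 1 in system.settings;
# self-managed (OSS) servers return 0.
rows = client.execute_sql("SELECT value FROM system.settings WHERE name = 'cloud_mode'")
deployment_type = "ClickHouseCloud" if int(rows[0][0]) else "ClickHouseOSS"
```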
diff --git a/tests/load/clickhouse/test_clickhouse_configuration.py b/tests/load/clickhouse/test_clickhouse_configuration.py index fc116fbe9c..dfffcfa4c7 100644 --- a/tests/load/clickhouse/test_clickhouse_configuration.py +++ b/tests/load/clickhouse/test_clickhouse_configuration.py @@ -75,5 +75,3 @@ def test_clickhouse_connection_settings(client: ClickHouseClient) -> None: assert ("allow_experimental_lightweight_delete", "1") in res assert ("enable_http_compression", "1") in res assert ("date_time_input_format", "best_effort") in res - - From e2a5c4afb01fa8e92a70daa43b53c2b0bc5f5b6c Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Wed, 26 Jun 2024 20:51:55 +0200 Subject: [PATCH 09/33] Fix typo Signed-off-by: Marcel Coetzee --- dlt/destinations/impl/clickhouse/sql_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index aef0557ceb..b962774f3e 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -16,7 +16,7 @@ from clickhouse_driver.dbapi import OperationalError # type: ignore[import-untyped] from clickhouse_driver.dbapi.extras import DictCursor # type: ignore[import-untyped] from pendulum import DateTime # noqa: I251 -i + from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.typing import DictStrAny from dlt.destinations.exceptions import ( From 9ef12c12dab390ce16e58367e75875906cda224e Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Wed, 26 Jun 2024 22:24:42 +0200 Subject: [PATCH 10/33] Add ClickHouse to local destination tests Signed-off-by: Marcel Coetzee --- .github/clickhouse-compose.yml | 22 +++++++++++++++ .github/workflows/test_local_destinations.yml | 27 ++++++++++++++----- 2 files changed, 43 insertions(+), 6 deletions(-) create mode 100644 .github/clickhouse-compose.yml diff --git a/.github/clickhouse-compose.yml b/.github/clickhouse-compose.yml new file mode 100644 index 0000000000..6c277ef536 --- /dev/null +++ b/.github/clickhouse-compose.yml @@ -0,0 +1,22 @@ +--- +version: '3' + +services: + clickhouse: + image: clickhouse/clickhouse-server + ports: + - "9000:9000" + - "8123:8123" + environment: + - CLICKHOUSE_DB=dlt_data + - CLICKHOUSE_USER=loader + - CLICKHOUSE_PASSWORD=loader + - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 + volumes: + - clickhouse_data:/var/lib/clickhouse/ + - clickhouse_logs:/var/log/clickhouse-server/ + restart: unless-stopped + +volumes: + clickhouse_data: + clickhouse_logs: diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml index 263d3f588c..a30e7d2317 100644 --- a/.github/workflows/test_local_destinations.yml +++ b/.github/workflows/test_local_destinations.yml @@ -1,7 +1,7 @@ # Tests destinations that can run without credentials. -# i.e. local postgres, duckdb, filesystem (with local fs/memory bucket) +# i.e. 
postgres, duckdb, weaviate, clickhouse and filesystem (with local fs/memory bucket) -name: dest | postgres, duckdb and fs +name: dest | postgres, duckdb, weaviate, clickhouse and fs local tests on: pull_request: @@ -21,7 +21,7 @@ env: RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752 RUNTIME__LOG_LEVEL: ERROR RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }} - ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\"]" + ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\", \"clickhouse\"]" ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]" DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary @@ -33,7 +33,7 @@ jobs: uses: ./.github/workflows/get_docs_changes.yml run_loader: - name: dest | postgres, duckdb and fs local tests + name: dest | postgres, duckdb, weaviate, clickhouse and fs local tests needs: get_docs_changes if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' strategy: @@ -70,6 +70,9 @@ jobs: - name: Start weaviate run: docker-compose -f ".github/weaviate-compose.yml" up -d + - name: Start ClickHouse + run: docker-compose -f ".github/clickhouse-compose.yml" up -d + - name: Setup Python uses: actions/setup-python@v4 with: @@ -90,17 +93,29 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations - name: Install dependencies - run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake + run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E clickhouse --with sentry-sdk --with pipeline -E deltalake - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml - # always run full suite, also on branches + # Always run full suite, also on branches. 
- run: poetry run pytest tests/load && poetry run pytest tests/cli name: Run tests Linux env: DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data + DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost + DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data + DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader + DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader + DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000 + DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123 + DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0 + - name: Stop weaviate if: always() run: docker-compose -f ".github/weaviate-compose.yml" down -v + + - name: Stop ClickHouse + if: always() + run: docker-compose -f ".github/clickhouse-compose.yml" down -v From 2388cd678ed0af581ead85600fe0af828d0ae398 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Wed, 26 Jun 2024 23:04:54 +0200 Subject: [PATCH 11/33] Update ClickHouse test workflow and remove engine types Signed-off-by: Marcel Coetzee --- .../workflows/test_destination_clickhouse.yml | 22 +++++++++++++-- .github/workflows/test_local_destinations.yml | 27 +++++-------------- .../impl/clickhouse/clickhouse.py | 2 -- .../impl/clickhouse/clickhouse_adapter.py | 2 +- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index d834df6b28..6b782219a6 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -20,7 +20,7 @@ env: DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }} ACTIVE_DESTINATIONS: "[\"clickhouse\"]" - ALL_FILESYSTEM_DRIVERS: "[\"memory\"]" + ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]" jobs: get_docs_changes: @@ -45,7 +45,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v4 with: - python-version: "3.10.x" + python-version: "3.11.x" - name: Install Poetry uses: snok/install-poetry@v1.3.2 @@ -76,3 +76,21 @@ jobs: poetry run pytest tests/load name: Run all tests Linux if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}} + + - name: Start ClickHouse + run: docker-compose -f ".github/clickhouse-compose.yml" up -d + + - run: poetry run pytest tests/load && poetry run pytest tests/cli + name: Run all local tests Linux + env: + DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost + DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data + DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader + DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader + DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000 + DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123 + DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0 + + - name: Stop ClickHouse + if: always() + run: docker-compose -f ".github/clickhouse-compose.yml" down -v diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml index a30e7d2317..263d3f588c 100644 --- a/.github/workflows/test_local_destinations.yml +++ b/.github/workflows/test_local_destinations.yml @@ -1,7 +1,7 @@ # Tests destinations that can run without credentials. -# i.e. postgres, duckdb, weaviate, clickhouse and filesystem (with local fs/memory bucket) +# i.e. 
local postgres, duckdb, filesystem (with local fs/memory bucket) -name: dest | postgres, duckdb, weaviate, clickhouse and fs local tests +name: dest | postgres, duckdb and fs on: pull_request: @@ -21,7 +21,7 @@ env: RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752 RUNTIME__LOG_LEVEL: ERROR RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }} - ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\", \"clickhouse\"]" + ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\"]" ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]" DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary @@ -33,7 +33,7 @@ jobs: uses: ./.github/workflows/get_docs_changes.yml run_loader: - name: dest | postgres, duckdb, weaviate, clickhouse and fs local tests + name: dest | postgres, duckdb and fs local tests needs: get_docs_changes if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' strategy: @@ -70,9 +70,6 @@ jobs: - name: Start weaviate run: docker-compose -f ".github/weaviate-compose.yml" up -d - - name: Start ClickHouse - run: docker-compose -f ".github/clickhouse-compose.yml" up -d - - name: Setup Python uses: actions/setup-python@v4 with: @@ -93,29 +90,17 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations - name: Install dependencies - run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E clickhouse --with sentry-sdk --with pipeline -E deltalake + run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml - # Always run full suite, also on branches. 
+ # always run full suite, also on branches - run: poetry run pytest tests/load && poetry run pytest tests/cli name: Run tests Linux env: DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data - DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost - DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data - DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader - DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader - DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000 - DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123 - DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0 - - name: Stop weaviate if: always() run: docker-compose -f ".github/weaviate-compose.yml" down -v - - - name: Stop ClickHouse - if: always() - run: docker-compose -f ".github/clickhouse-compose.yml" down -v diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index c2955af550..70c2749085 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -66,8 +66,6 @@ "merge_tree": "MergeTree", "shared_merge_tree": "SharedMergeTree", "replicated_merge_tree": "ReplicatedMergeTree", - "stripe_log": "StripeLog", - "tiny_log": "TinyLog", } diff --git a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py index 97e163ed62..413b3007b7 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py +++ b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py @@ -6,7 +6,7 @@ TTableEngineType = Literal[ - "merge_tree", "shared_merge_tree", "replicated_merge_tree", "stripe_log", "tiny_log" + "merge_tree", "shared_merge_tree", "replicated_merge_tree", ] """ From b0e3751b4162c6af41b2afbdd129c8ac32e15118 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 27 Jun 2024 11:09:00 +0200 Subject: [PATCH 12/33] Use Python 3.10.x for ClickHouse destination tests Signed-off-by: Marcel Coetzee --- .github/workflows/test_destination_clickhouse.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index 6b782219a6..cbe95756c7 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -45,7 +45,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v4 with: - python-version: "3.11.x" + python-version: "3.10.x" - name: Install Poetry uses: snok/install-poetry@v1.3.2 From f077346d6ecc4168e77ef4a1f9b6393d429e9e2a Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 27 Jun 2024 11:16:33 +0200 Subject: [PATCH 13/33] Add ClickHouse MergeTree support and refactor code Signed-off-by: Marcel Coetzee --- dlt/destinations/impl/clickhouse/clickhouse.py | 8 ++------ dlt/destinations/impl/clickhouse/clickhouse_adapter.py | 4 +++- dlt/destinations/impl/clickhouse/sql_client.py | 1 + 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 561a1730b0..798f302be2 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -2,7 +2,7 @@ import re from copy import deepcopy from textwrap import dedent -from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple, Literal +from typing import Optional, Dict, List, Sequence, cast from urllib.parse import urlparse import clickhouse_connect @@ -28,12 +28,9 @@ 
TTableSchema, TColumnHint, TColumnType, - TTableSchemaColumns, - TColumnSchemaBase, ) from dlt.common.storages import FileStorage from dlt.destinations.exceptions import LoadJobTerminalException -from dlt.destinations.impl.clickhouse import capabilities from dlt.destinations.impl.clickhouse.clickhouse_adapter import ( TTableEngineType, TABLE_ENGINE_TYPE_HINT, @@ -324,8 +321,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non ) return ( - f"{self.sql_client.escape_column_name(c['name'])} {type_with_nullability_modifier} {hints_str}" - f"{self.capabilities.escape_identifier(c['name'])} {type_with_nullability_modifier} {hints_}" + f"{self.sql_client.escape_column_name(c['name'])} {type_with_nullability_modifier} {hints_}" .strip() ) diff --git a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py index 413b3007b7..fce5694a08 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py +++ b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py @@ -6,7 +6,9 @@ TTableEngineType = Literal[ - "merge_tree", "shared_merge_tree", "replicated_merge_tree", + "merge_tree", + "shared_merge_tree", + "replicated_merge_tree", ] """ diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index c103c447f4..9b5dcd939b 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -9,6 +9,7 @@ Sequence, ClassVar, Literal, + Tuple, ) import clickhouse_driver # type: ignore[import-untyped] From 6e44a711700df095334e0df145741d921d440be9 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 27 Jun 2024 17:27:36 +0200 Subject: [PATCH 14/33] Update ClickHouse Docker setup and test workflow Signed-off-by: Marcel Coetzee --- .github/clickhouse-compose.yml | 8 ++++++-- .github/workflows/test_destination_clickhouse.yml | 12 ++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/.github/clickhouse-compose.yml b/.github/clickhouse-compose.yml index 6c277ef536..b6415b120a 100644 --- a/.github/clickhouse-compose.yml +++ b/.github/clickhouse-compose.yml @@ -1,6 +1,4 @@ --- -version: '3' - services: clickhouse: image: clickhouse/clickhouse-server @@ -16,6 +14,12 @@ services: - clickhouse_data:/var/lib/clickhouse/ - clickhouse_logs:/var/log/clickhouse-server/ restart: unless-stopped + healthcheck: + test: [ "CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8123/ping" ] + interval: 3s + timeout: 5s + retries: 5 + volumes: clickhouse_data: diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index cbe95756c7..687b110cfc 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -1,4 +1,3 @@ - name: test | clickhouse on: @@ -8,7 +7,7 @@ on: - devel workflow_dispatch: schedule: - - cron: '0 2 * * *' + - cron: '0 2 * * *' concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -77,8 +76,13 @@ jobs: name: Run all tests Linux if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}} - - name: Start ClickHouse - run: docker-compose -f ".github/clickhouse-compose.yml" up -d + - run: | + docker-compose -f ".github/clickhouse-compose.yml" up -d + echo "Waiting for ClickHouse to be healthy..." 
+ timeout 30s bash -c 'until docker-compose -f ".github/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done' + echo "ClickHouse is up and running" + name: Start ClickHouse + - run: poetry run pytest tests/load && poetry run pytest tests/cli name: Run all local tests Linux From 61bae45717b391dcc67ddd975515d58529d9cc6f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 27 Jun 2024 19:52:16 +0200 Subject: [PATCH 15/33] Refactor ClickHouse tests to cover both OSS and Cloud versions Signed-off-by: Marcel Coetzee --- .../workflows/test_destination_clickhouse.yml | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index 687b110cfc..26916d7167 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -66,26 +66,27 @@ jobs: - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml - - run: | - poetry run pytest tests/load -m "essential" - name: Run essential tests Linux - if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}} - - - run: | - poetry run pytest tests/load - name: Run all tests Linux - if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}} - + # OSS ClickHouse - run: | docker-compose -f ".github/clickhouse-compose.yml" up -d echo "Waiting for ClickHouse to be healthy..." timeout 30s bash -c 'until docker-compose -f ".github/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done' echo "ClickHouse is up and running" - name: Start ClickHouse + name: Start ClickHouse OSS - - run: poetry run pytest tests/load && poetry run pytest tests/cli - name: Run all local tests Linux + - run: poetry run pytest tests/load -m "essential" + name: Run essential tests Linux (ClickHouse OSS) + env: + DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost + DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data + DESTINATION__CLICKHOUSE__CREDENTIALS__USERNAME: loader + DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader + DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000 + DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123 + + - run: poetry run pytest tests/load + name: Run all tests Linux (ClickHouse OSS) env: DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data @@ -95,6 +96,18 @@ jobs: DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123 DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0 - - name: Stop ClickHouse + - name: Stop ClickHouse OSS if: always() run: docker-compose -f ".github/clickhouse-compose.yml" down -v + + # ClickHouse Cloud + - run: | + poetry run pytest tests/load -m "essential" + name: Run essential tests Linux (ClickHouse Cloud) + if: ${{ ! 
(contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}} + + - run: | + poetry run pytest tests/load + name: Run all tests Linux (ClickHouse Cloud) + if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}} + From 6d50c519957951fea4254b9e673aa41fadb44757 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 27 Jun 2024 19:54:19 +0200 Subject: [PATCH 16/33] Disable SSL for ClickHouse OSS tests Signed-off-by: Marcel Coetzee --- .github/workflows/test_destination_clickhouse.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index 26916d7167..e322f12d6c 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -84,6 +84,7 @@ jobs: DESTINATION__CLICKHOUSE__CREDENTIALS__PASSWORD: loader DESTINATION__CLICKHOUSE__CREDENTIALS__PORT: 9000 DESTINATION__CLICKHOUSE__CREDENTIALS__HTTP_PORT: 8123 + DESTINATION__CLICKHOUSE__CREDENTIALS__SECURE: 0 - run: poetry run pytest tests/load name: Run all tests Linux (ClickHouse OSS) From e1ac1cebc44c725fed38664d5f9a0167e571c10e Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Sat, 29 Jun 2024 00:03:40 +0200 Subject: [PATCH 17/33] Use state instead of sentinel tables Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 46 +++++++++++++++++++ .../impl/clickhouse/configuration.py | 5 +- .../impl/clickhouse/sql_client.py | 21 +-------- 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 798f302be2..3abff0c74f 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -1,3 +1,4 @@ +import contextlib import os import re from copy import deepcopy @@ -9,6 +10,7 @@ from clickhouse_connect.driver.tools import insert_file from dlt import config +from dlt.common.configuration.container import Container from dlt.common.configuration.specs import ( CredentialsConfiguration, AzureCredentialsWithoutDefaults, @@ -30,6 +32,13 @@ TColumnType, ) from dlt.common.storages import FileStorage +from dlt.common.storages.load_package import ( + LoadPackageStateInjectableContext, + commit_load_package_state, +) +from dlt.common.storages.load_package import ( + TLoadPackageState, +) from dlt.destinations.exceptions import LoadJobTerminalException from dlt.destinations.impl.clickhouse.clickhouse_adapter import ( TTableEngineType, @@ -298,6 +307,43 @@ def __init__( self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) self.type_mapper = ClickHouseTypeMapper(self.capabilities) + def has_dataset(self) -> bool: + try: + container = Container() + state_ctx = container[LoadPackageStateInjectableContext] + datasets = cast(List[str], state_ctx.state.get("datasets", [])) + return self.sql_client.dataset_name in datasets + except KeyError: + return False + + def create_dataset(self) -> None: + with contextlib.suppress(KeyError): + container = Container() + state_ctx = container[LoadPackageStateInjectableContext] + datasets = cast(List[str], state_ctx.state.get("datasets", [])) + if self.sql_client.dataset_name not in datasets: + datasets.append(self.sql_client.dataset_name) + state: TLoadPackageState = state_ctx.state + state["datasets"] = datasets # type: ignore[typeddict-unknown-key] + commit_load_package_state() + + def drop_dataset(self) -> None: + with 
contextlib.suppress(KeyError): + container = Container() + state_ctx = container[LoadPackageStateInjectableContext] + datasets = cast(List[str], state_ctx.state.get("datasets", [])) + if self.sql_client.dataset_name in datasets: + datasets.remove(self.sql_client.dataset_name) + state: TLoadPackageState = state_ctx.state + state["datasets"] = datasets # type: ignore[typeddict-unknown-key] + commit_load_package_state() + + to_drop_results = self.sql_client._list_tables() + for table in to_drop_results: + self.sql_client.execute_sql( + f"""DROP TABLE {self.sql_client.catalog_name()}.{self.capabilities.escape_identifier(table)} SYNC""" + ) + def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)] diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index 7b97071abf..5c6204b9bc 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -6,7 +6,6 @@ from dlt.common.destination.reference import ( DestinationClientDwhWithStagingConfiguration, ) -from dlt.common.libs.sql_alchemy import URL from dlt.common.utils import digest128 @@ -36,8 +35,6 @@ class ClickHouseCredentials(ConnectionStringCredentials): """Timeout for sending and receiving data. Defaults to 300 seconds.""" dataset_table_separator: str = "___" """Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'.""" - dataset_sentinel_table_name: str = "dlt_sentinel_table" - """Special table to mark dataset as existing""" gcp_access_key_id: Optional[str] = None """When loading from a gcp bucket, you need to provide gcp interoperable keys""" gcp_secret_access_key: Optional[str] = None @@ -86,7 +83,7 @@ class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration # See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes def fingerprint(self) -> str: - """Returns a fingerprint of host part of a connection string.""" + """Returns a fingerprint of the host part of a connection string.""" if self.credentials and self.credentials.host: return digest128(self.credentials.host) return "" diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index 9b5dcd939b..9b1617f043 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -62,13 +62,6 @@ def __init__( self.credentials = credentials self.database_name = credentials.database - def has_dataset(self) -> bool: - # we do not need to normalize dataset_sentinel_table_name - sentinel_table = self.credentials.dataset_sentinel_table_name - return sentinel_table in [ - t.split(self.credentials.dataset_table_separator)[1] for t in self._list_tables() - ] - def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection: self._conn = clickhouse_driver.connect(dsn=self.credentials.to_native_representation()) return self._conn @@ -102,18 +95,6 @@ def execute_sql( with self.execute_query(sql, *args, **kwargs) as curr: return None if curr.description is None else curr.fetchall() - def create_dataset(self) -> None: - # We create a sentinel table which defines whether we consider the dataset created. - sentinel_table_name = self.make_qualified_table_name( - self.credentials.dataset_sentinel_table_name - ) - # `MergeTree` is guaranteed to work in both self-managed and cloud setups. 
- self.execute_sql(f""" - CREATE TABLE {sentinel_table_name} - (_dlt_id String NOT NULL PRIMARY KEY) - ENGINE=MergeTree - COMMENT 'internal dlt sentinel table'""") - def drop_dataset(self) -> None: # Since ClickHouse doesn't have schemas, we need to drop all tables in our virtual schema, # or collection of tables, that has the `dataset_name` as a prefix. @@ -121,7 +102,7 @@ def drop_dataset(self) -> None: for table in to_drop_results: # The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution. # This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted. - # To resolve this, we are forced to provide the full query string here. + # To resolve this, we're forced to provide the full query string here. self.execute_sql( f"""DROP TABLE {self.catalog_name()}.{self.capabilities.escape_identifier(table)} SYNC""" ) From 409487ce0669a06941769c0a5414c8d71289d2b7 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Sat, 29 Jun 2024 00:19:19 +0200 Subject: [PATCH 18/33] Remove mention of sentinel table for ClickHouse datasets Signed-off-by: Marcel Coetzee --- docs/website/docs/dlt-ecosystem/destinations/clickhouse.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index ec5eb18523..a092c6f9f0 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -112,9 +112,6 @@ Data is loaded into ClickHouse using the most efficient method depending on the `Clickhouse` does not support multiple datasets in one database, dlt relies on datasets to exist for multiple reasons. To make `clickhouse` work with `dlt`, tables generated by `dlt` in your `clickhouse` database will have their name prefixed with the dataset name separated by the configurable `dataset_table_separator`. -Additionally, a special sentinel table that doesn't contain any data will be created, so dlt knows which virtual datasets already exist in a -clickhouse -destination. 
## Supported file formats From 07c8f498d45c59e0db1e81e319b885e5dd257e8e Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 1 Jul 2024 15:39:25 +0200 Subject: [PATCH 19/33] Refactor ClickHouse deployment type detection Signed-off-by: Marcel Coetzee --- dlt/destinations/impl/clickhouse/sql_client.py | 6 ------ tests/load/clickhouse/test_clickhouse_adapter.py | 3 ++- tests/load/clickhouse/utils.py | 8 ++++++++ 3 files changed, 10 insertions(+), 7 deletions(-) create mode 100644 tests/load/clickhouse/utils.py diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index 9b1617f043..8d42bbef58 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -217,9 +217,3 @@ def _make_database_exception(cls, ex: Exception) -> Exception: @staticmethod def is_dbapi_exception(ex: Exception) -> bool: return isinstance(ex, clickhouse_driver.dbapi.Error) - - def _get_deployment_type(self) -> TDeployment: - cloud_mode = int(self.execute_sql(""" - SELECT value FROM system.settings WHERE name = 'cloud_mode' - """)[0][0]) - return "ClickHouseCloud" if cloud_mode else "ClickHouseOSS" diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py index 88a21f9bc8..4abf094576 100644 --- a/tests/load/clickhouse/test_clickhouse_adapter.py +++ b/tests/load/clickhouse/test_clickhouse_adapter.py @@ -3,6 +3,7 @@ import dlt from dlt.destinations.adapters import clickhouse_adapter from dlt.destinations.impl.clickhouse.sql_client import TDeployment +from tests.load.clickhouse.utils import get_deployment_type from tests.pipeline.utils import assert_load_info @@ -34,7 +35,7 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: pipe = dlt.pipeline(pipeline_name="adapter_test", destination="clickhouse", dev_mode=True) with pipe.sql_client() as client: - deployment_type: TDeployment = client._get_deployment_type() + deployment_type: TDeployment = get_deployment_type(client) if deployment_type == "ClickHouseCloud": pack = pipe.run( diff --git a/tests/load/clickhouse/utils.py b/tests/load/clickhouse/utils.py new file mode 100644 index 0000000000..809e929261 --- /dev/null +++ b/tests/load/clickhouse/utils.py @@ -0,0 +1,8 @@ +from dlt.destinations.impl.clickhouse.sql_client import TDeployment, ClickHouseSqlClient + + +def get_deployment_type(client: ClickHouseSqlClient) -> TDeployment: + cloud_mode = int(client.execute_sql(""" + SELECT value FROM system.settings WHERE name = 'cloud_mode' + """)[0][0]) + return "ClickHouseCloud" if cloud_mode else "ClickHouseOSS" From aff6c047994ebfd22728ec116c53be4e9a5d75c2 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 1 Jul 2024 15:43:14 +0200 Subject: [PATCH 20/33] Add conditional execution for ClickHouse OSS tests Signed-off-by: Marcel Coetzee --- .github/workflows/test_destination_clickhouse.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index e322f12d6c..4b03c60b9a 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -77,6 +77,7 @@ jobs: - run: poetry run pytest tests/load -m "essential" name: Run essential tests Linux (ClickHouse OSS) + if: ${{ ! 
(contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}} env: DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data @@ -88,6 +89,7 @@ jobs: - run: poetry run pytest tests/load name: Run all tests Linux (ClickHouse OSS) + if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}} env: DESTINATION__CLICKHOUSE__CREDENTIALS__HOST: localhost DESTINATION__CLICKHOUSE__CREDENTIALS__DATABASE: dlt_data From cbc782c6c272dba083546ec8970905066e8de1cf Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 1 Jul 2024 15:50:31 +0200 Subject: [PATCH 21/33] Update ClickHouse compose file path and move to tests directory Signed-off-by: Marcel Coetzee --- .github/workflows/test_destination_clickhouse.yml | 4 ++-- {.github => tests/load/clickhouse}/clickhouse-compose.yml | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename {.github => tests/load/clickhouse}/clickhouse-compose.yml (100%) diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index 4b03c60b9a..c61e278777 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -70,7 +70,7 @@ jobs: - run: | docker-compose -f ".github/clickhouse-compose.yml" up -d echo "Waiting for ClickHouse to be healthy..." - timeout 30s bash -c 'until docker-compose -f ".github/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done' + timeout 30s bash -c 'until docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done' echo "ClickHouse is up and running" name: Start ClickHouse OSS @@ -101,7 +101,7 @@ jobs: - name: Stop ClickHouse OSS if: always() - run: docker-compose -f ".github/clickhouse-compose.yml" down -v + run: docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" down -v # ClickHouse Cloud - run: | diff --git a/.github/clickhouse-compose.yml b/tests/load/clickhouse/clickhouse-compose.yml similarity index 100% rename from .github/clickhouse-compose.yml rename to tests/load/clickhouse/clickhouse-compose.yml From c4088240dbd0978b4e5ee8824dfa179af534761a Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 1 Jul 2024 15:56:27 +0200 Subject: [PATCH 22/33] Update ClickHouse docker-compose file path in test workflow Signed-off-by: Marcel Coetzee --- .github/workflows/test_destination_clickhouse.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_destination_clickhouse.yml b/.github/workflows/test_destination_clickhouse.yml index c61e278777..4aea5a8e90 100644 --- a/.github/workflows/test_destination_clickhouse.yml +++ b/.github/workflows/test_destination_clickhouse.yml @@ -68,7 +68,7 @@ jobs: # OSS ClickHouse - run: | - docker-compose -f ".github/clickhouse-compose.yml" up -d + docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" up -d echo "Waiting for ClickHouse to be healthy..." 
timeout 30s bash -c 'until docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done' echo "ClickHouse is up and running" From 0d306a55fe658384ff5574cc60b7a447c747caa0 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 1 Jul 2024 16:02:42 +0200 Subject: [PATCH 23/33] Cast client to ClickHouseSqlClient in get_deployment_type call Signed-off-by: Marcel Coetzee --- tests/load/clickhouse/test_clickhouse_adapter.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py index 4abf094576..6fadbd122b 100644 --- a/tests/load/clickhouse/test_clickhouse_adapter.py +++ b/tests/load/clickhouse/test_clickhouse_adapter.py @@ -1,8 +1,8 @@ -from typing import Generator, Dict +from typing import Generator, Dict, cast import dlt from dlt.destinations.adapters import clickhouse_adapter -from dlt.destinations.impl.clickhouse.sql_client import TDeployment +from dlt.destinations.impl.clickhouse.sql_client import TDeployment, ClickHouseSqlClient from tests.load.clickhouse.utils import get_deployment_type from tests.pipeline.utils import assert_load_info @@ -35,7 +35,7 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: pipe = dlt.pipeline(pipeline_name="adapter_test", destination="clickhouse", dev_mode=True) with pipe.sql_client() as client: - deployment_type: TDeployment = get_deployment_type(client) + deployment_type: TDeployment = get_deployment_type(cast(ClickHouseSqlClient, client)) if deployment_type == "ClickHouseCloud": pack = pipe.run( From 28866ee80ea4a0768812eb46eb84375a1585173f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 4 Jul 2024 16:02:02 +0200 Subject: [PATCH 24/33] Revert "Remove mention of sentinel table for ClickHouse datasets" This reverts commit 409487ce0669a06941769c0a5414c8d71289d2b7. --- docs/website/docs/dlt-ecosystem/destinations/clickhouse.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index a092c6f9f0..ec5eb18523 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -112,6 +112,9 @@ Data is loaded into ClickHouse using the most efficient method depending on the `Clickhouse` does not support multiple datasets in one database, dlt relies on datasets to exist for multiple reasons. To make `clickhouse` work with `dlt`, tables generated by `dlt` in your `clickhouse` database will have their name prefixed with the dataset name separated by the configurable `dataset_table_separator`. +Additionally, a special sentinel table that doesn't contain any data will be created, so dlt knows which virtual datasets already exist in a +clickhouse +destination. ## Supported file formats From 41a7e1a4f47f070f9c761fe0402e2723b1a8def6 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 4 Jul 2024 16:02:26 +0200 Subject: [PATCH 25/33] Revert "Use state instead of sentinel tables" This reverts commit e1ac1cebc44c725fed38664d5f9a0167e571c10e. 
--- .../impl/clickhouse/clickhouse.py | 46 ------------------- .../impl/clickhouse/configuration.py | 5 +- .../impl/clickhouse/sql_client.py | 21 ++++++++- 3 files changed, 24 insertions(+), 48 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 3abff0c74f..798f302be2 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -1,4 +1,3 @@ -import contextlib import os import re from copy import deepcopy @@ -10,7 +9,6 @@ from clickhouse_connect.driver.tools import insert_file from dlt import config -from dlt.common.configuration.container import Container from dlt.common.configuration.specs import ( CredentialsConfiguration, AzureCredentialsWithoutDefaults, @@ -32,13 +30,6 @@ TColumnType, ) from dlt.common.storages import FileStorage -from dlt.common.storages.load_package import ( - LoadPackageStateInjectableContext, - commit_load_package_state, -) -from dlt.common.storages.load_package import ( - TLoadPackageState, -) from dlt.destinations.exceptions import LoadJobTerminalException from dlt.destinations.impl.clickhouse.clickhouse_adapter import ( TTableEngineType, @@ -307,43 +298,6 @@ def __init__( self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) self.type_mapper = ClickHouseTypeMapper(self.capabilities) - def has_dataset(self) -> bool: - try: - container = Container() - state_ctx = container[LoadPackageStateInjectableContext] - datasets = cast(List[str], state_ctx.state.get("datasets", [])) - return self.sql_client.dataset_name in datasets - except KeyError: - return False - - def create_dataset(self) -> None: - with contextlib.suppress(KeyError): - container = Container() - state_ctx = container[LoadPackageStateInjectableContext] - datasets = cast(List[str], state_ctx.state.get("datasets", [])) - if self.sql_client.dataset_name not in datasets: - datasets.append(self.sql_client.dataset_name) - state: TLoadPackageState = state_ctx.state - state["datasets"] = datasets # type: ignore[typeddict-unknown-key] - commit_load_package_state() - - def drop_dataset(self) -> None: - with contextlib.suppress(KeyError): - container = Container() - state_ctx = container[LoadPackageStateInjectableContext] - datasets = cast(List[str], state_ctx.state.get("datasets", [])) - if self.sql_client.dataset_name in datasets: - datasets.remove(self.sql_client.dataset_name) - state: TLoadPackageState = state_ctx.state - state["datasets"] = datasets # type: ignore[typeddict-unknown-key] - commit_load_package_state() - - to_drop_results = self.sql_client._list_tables() - for table in to_drop_results: - self.sql_client.execute_sql( - f"""DROP TABLE {self.sql_client.catalog_name()}.{self.capabilities.escape_identifier(table)} SYNC""" - ) - def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)] diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index 5c6204b9bc..7b97071abf 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -6,6 +6,7 @@ from dlt.common.destination.reference import ( DestinationClientDwhWithStagingConfiguration, ) +from dlt.common.libs.sql_alchemy import URL from dlt.common.utils import digest128 @@ -35,6 +36,8 @@ class ClickHouseCredentials(ConnectionStringCredentials): """Timeout for sending and receiving data. 
Defaults to 300 seconds.""" dataset_table_separator: str = "___" """Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'.""" + dataset_sentinel_table_name: str = "dlt_sentinel_table" + """Special table to mark dataset as existing""" gcp_access_key_id: Optional[str] = None """When loading from a gcp bucket, you need to provide gcp interoperable keys""" gcp_secret_access_key: Optional[str] = None @@ -83,7 +86,7 @@ class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration # See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes def fingerprint(self) -> str: - """Returns a fingerprint of the host part of a connection string.""" + """Returns a fingerprint of host part of a connection string.""" if self.credentials and self.credentials.host: return digest128(self.credentials.host) return "" diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index 8d42bbef58..f5dc675fd6 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -62,6 +62,13 @@ def __init__( self.credentials = credentials self.database_name = credentials.database + def has_dataset(self) -> bool: + # we do not need to normalize dataset_sentinel_table_name + sentinel_table = self.credentials.dataset_sentinel_table_name + return sentinel_table in [ + t.split(self.credentials.dataset_table_separator)[1] for t in self._list_tables() + ] + def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection: self._conn = clickhouse_driver.connect(dsn=self.credentials.to_native_representation()) return self._conn @@ -95,6 +102,18 @@ def execute_sql( with self.execute_query(sql, *args, **kwargs) as curr: return None if curr.description is None else curr.fetchall() + def create_dataset(self) -> None: + # We create a sentinel table which defines whether we consider the dataset created. + sentinel_table_name = self.make_qualified_table_name( + self.credentials.dataset_sentinel_table_name + ) + # `MergeTree` is guaranteed to work in both self-managed and cloud setups. + self.execute_sql(f""" + CREATE TABLE {sentinel_table_name} + (_dlt_id String NOT NULL PRIMARY KEY) + ENGINE=MergeTree + COMMENT 'internal dlt sentinel table'""") + def drop_dataset(self) -> None: # Since ClickHouse doesn't have schemas, we need to drop all tables in our virtual schema, # or collection of tables, that has the `dataset_name` as a prefix. @@ -102,7 +121,7 @@ def drop_dataset(self) -> None: for table in to_drop_results: # The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution. # This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted. - # To resolve this, we're forced to provide the full query string here. + # To resolve this, we are forced to provide the full query string here. 
self.execute_sql( f"""DROP TABLE {self.catalog_name()}.{self.capabilities.escape_identifier(table)} SYNC""" ) From bc7b309971ab57ab1204d6dd79a462424e5e56a8 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 4 Jul 2024 20:37:13 +0200 Subject: [PATCH 26/33] Add tests for ClickHouse table engine configuration and adapter overrides Signed-off-by: Marcel Coetzee --- .../clickhouse/test_clickhouse_adapter.py | 79 ++++++++++++++++++- .../test_clickhouse_configuration.py | 39 ++++++--- 2 files changed, 103 insertions(+), 15 deletions(-) diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py index 6fadbd122b..dbbb5b1da0 100644 --- a/tests/load/clickhouse/test_clickhouse_adapter.py +++ b/tests/load/clickhouse/test_clickhouse_adapter.py @@ -1,7 +1,11 @@ +import os from typing import Generator, Dict, cast +import pytest + import dlt from dlt.destinations.adapters import clickhouse_adapter +from dlt.destinations.impl.clickhouse.clickhouse_adapter import TABLE_ENGINE_TYPE_HINT, TTableEngineType from dlt.destinations.impl.clickhouse.sql_client import TDeployment, ClickHouseSqlClient from tests.load.clickhouse.utils import get_deployment_type from tests.pipeline.utils import assert_load_info @@ -30,16 +34,26 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: yield {"field1": 1, "field2": 2} clickhouse_adapter(merge_tree_resource, table_engine_type="merge_tree") - clickhouse_adapter(replicated_merge_tree_resource, table_engine_type="replicated_merge_tree") + clickhouse_adapter( + replicated_merge_tree_resource, table_engine_type="replicated_merge_tree" + ) - pipe = dlt.pipeline(pipeline_name="adapter_test", destination="clickhouse", dev_mode=True) + pipe = dlt.pipeline( + pipeline_name="adapter_test", destination="clickhouse", dev_mode=True + ) with pipe.sql_client() as client: - deployment_type: TDeployment = get_deployment_type(cast(ClickHouseSqlClient, client)) + deployment_type: TDeployment = get_deployment_type( + cast(ClickHouseSqlClient, client) + ) if deployment_type == "ClickHouseCloud": pack = pipe.run( - [merge_tree_resource, replicated_merge_tree_resource, not_annotated_resource] + [ + merge_tree_resource, + replicated_merge_tree_resource, + not_annotated_resource, + ] ) else: # `ReplicatedMergeTree` not supported if only a single node. @@ -104,3 +118,60 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: assert "ENGINE = ReplicatedMergeTree" in sql[0] else: assert "ENGINE = MergeTree" or "ENGINE = SharedMergeTree" in sql[0] + + +def test_clickhouse_configuration_adapter_table_engine_override() -> None: + """Tests that, given a user has specified a non-default engine to use across all tables, + that adapter overriding works on a per-resource level.""" + os.environ["DESTINATION__CLICKHOUSE__TABLE_ENGINE_TYPE"] = "replicated_merge_tree" + + @dlt.resource + def annotated_resource() -> Generator[Dict[str, int], None, None]: + yield {"field1": 1, "field2": 2} + + @dlt.resource + def non_annotated_resource() -> Generator[Dict[str, int], None, None]: + yield {"field1": 1, "field2": 2} + + clickhouse_adapter(annotated_resource, table_engine_type="merge_tree") + + pipe = dlt.pipeline( + pipeline_name="adapter_test", destination="clickhouse", dev_mode=True + ) + + with pipe.sql_client() as client: + deployment_type: TDeployment = get_deployment_type( + cast(ClickHouseSqlClient, client) + ) + + if deployment_type != "ClickHouseCloud": + # TODO: Run for local when log engines are implemented. 
+ pytest.skip( + "Only ClickHouseCloud has enough table types implemented for this test to work for now." + ) + + pack = pipe.run( + [ + annotated_resource, + non_annotated_resource, + ] + ) + + assert_load_info(pack) + + with pipe.sql_client() as client: + tables = {} + for table in client._list_tables(): + if "resource" in table: + tables[table.split("___")[1]] = table + + for table_name, full_table_name in tables.items(): + with client.execute_query( + "SELECT database, name, engine, engine_full FROM system.tables " + f"WHERE name = '{full_table_name}';" + ) as cursor: + res = cursor.fetchall() + if table_name == "non_annotated_resource": + assert tuple(res[0])[2] == "ReplicatedMergeTree" + else: + assert tuple(res[0])[2] in ("SharedMergeTree", "MergeTree") diff --git a/tests/load/clickhouse/test_clickhouse_configuration.py b/tests/load/clickhouse/test_clickhouse_configuration.py index dfffcfa4c7..b41cd614bc 100644 --- a/tests/load/clickhouse/test_clickhouse_configuration.py +++ b/tests/load/clickhouse/test_clickhouse_configuration.py @@ -1,8 +1,8 @@ -from typing import Any, Iterator +import os +from typing import Iterator import pytest -import dlt from dlt.common.configuration.resolve import resolve_configuration from dlt.common.libs.sql_alchemy import make_url from dlt.common.utils import digest128 @@ -11,11 +11,6 @@ ClickHouseCredentials, ClickHouseClientConfiguration, ) -from dlt.destinations.impl.snowflake.configuration import ( - SnowflakeClientConfiguration, - SnowflakeCredentials, -) -from tests.common.configuration.utils import environment from tests.load.utils import yield_client_with_storage @@ -53,15 +48,17 @@ def test_clickhouse_configuration() -> None: # def empty fingerprint assert ClickHouseClientConfiguration().fingerprint() == "" # based on host - c = resolve_configuration( - SnowflakeCredentials(), + config = resolve_configuration( + ClickHouseCredentials(), explicit_value="clickhouse://user1:pass1@host1:9000/db1", ) - assert SnowflakeClientConfiguration(credentials=c).fingerprint() == digest128("host1") + assert ClickHouseClientConfiguration(credentials=config).fingerprint() == digest128( + "host1" + ) def test_clickhouse_connection_settings(client: ClickHouseClient) -> None: - """Test experimental settings are set correctly for session.""" + """Test experimental settings are set correctly for the session.""" conn = client.sql_client.open_connection() cursor1 = conn.cursor() cursor2 = conn.cursor() @@ -75,3 +72,23 @@ def test_clickhouse_connection_settings(client: ClickHouseClient) -> None: assert ("allow_experimental_lightweight_delete", "1") in res assert ("enable_http_compression", "1") in res assert ("date_time_input_format", "best_effort") in res + + +def test_clickhouse_table_engine_configuration() -> None: + os.environ["DESTINATION__CLICKHOUSE__CREDENTIALS__HOST"] = "localhost" + + # Test the default engine. + config = resolve_configuration( + ClickHouseClientConfiguration()._bind_dataset_name(dataset_name="dataset"), + sections=("destination", "clickhouse"), + ) + assert config.credentials.table_engine_type == "merge_tree" + + # Test user provided engine. 
+ os.environ["DESTINATION__CLICKHOUSE__TABLE_ENGINE_TYPE"] = "replicated_merge_tree" + + config = resolve_configuration( + ClickHouseClientConfiguration()._bind_dataset_name(dataset_name="dataset"), + sections=("destination", "clickhouse"), + ) + assert config.credentials.table_engine_type == "replicated_merge_tree" From f852d44d41ff77327a4735772ccd8f7affb03bb3 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 4 Jul 2024 22:49:43 +0200 Subject: [PATCH 27/33] Add configurable default table engine type for ClickHouse Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 5 +- .../impl/clickhouse/clickhouse_adapter.py | 7 +- .../impl/clickhouse/configuration.py | 14 +++- dlt/load/load.py | 4 +- .../clickhouse/test_clickhouse_adapter.py | 72 ++----------------- .../test_clickhouse_configuration.py | 24 +------ .../test_clickhouse_table_builder.py | 20 +++++- 7 files changed, 42 insertions(+), 104 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 798f302be2..1d19555f5c 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -350,7 +350,10 @@ def _get_table_update_sql( # Default to 'MergeTree' if the user didn't explicitly set a table engine hint. # Clickhouse Cloud will automatically pick `SharedMergeTree` for this option, # so it will work on both local and cloud instances of CH. - table_type = cast(TTableEngineType, table.get(TABLE_ENGINE_TYPE_HINT, "merge_tree")) + table_type = cast( + TTableEngineType, + table.get(TABLE_ENGINE_TYPE_HINT, self.config.credentials.table_engine_type), + ) sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}" if primary_key_list := [ diff --git a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py index fce5694a08..b47f4f90c4 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py +++ b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py @@ -1,16 +1,11 @@ from typing import Any, Literal, Set, get_args, Dict +from dlt.destinations.impl.clickhouse.configuration import TTableEngineType from dlt.destinations.utils import ensure_resource from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate -TTableEngineType = Literal[ - "merge_tree", - "shared_merge_tree", - "replicated_merge_tree", -] - """ The table engine (type of table) determines: diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index 7b97071abf..90b9d4a0b6 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -6,11 +6,15 @@ from dlt.common.destination.reference import ( DestinationClientDwhWithStagingConfiguration, ) -from dlt.common.libs.sql_alchemy import URL from dlt.common.utils import digest128 TSecureConnection = Literal[0, 1] +TTableEngineType = Literal[ + "merge_tree", + "shared_merge_tree", + "replicated_merge_tree", +] @configspec(init=False) @@ -36,6 +40,8 @@ class ClickHouseCredentials(ConnectionStringCredentials): """Timeout for sending and receiving data. Defaults to 300 seconds.""" dataset_table_separator: str = "___" """Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'.""" + table_engine_type: Optional[TTableEngineType] = "merge_tree" + """The default table engine to use. Defaults to 'merge_tree'. 
Other implemented options are 'shared_merge_tree' and 'replicated_merge_tree'.""" dataset_sentinel_table_name: str = "dlt_sentinel_table" """Special table to mark dataset as existing""" gcp_access_key_id: Optional[str] = None @@ -77,7 +83,9 @@ def get_query(self) -> Dict[str, Any]: @configspec class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration): - destination_type: Final[str] = dataclasses.field(default="clickhouse", init=False, repr=False, compare=False) # type: ignore[misc] + destination_type: Final[str] = dataclasses.field( # type: ignore[misc] + default="clickhouse", init=False, repr=False, compare=False + ) credentials: ClickHouseCredentials = None # Primary key columns are used to build a sparse primary index which allows for efficient data retrieval, @@ -86,7 +94,7 @@ class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration # See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes def fingerprint(self) -> str: - """Returns a fingerprint of host part of a connection string.""" + """Returns a fingerprint of the host part of a connection string.""" if self.credentials and self.credentials.host: return digest128(self.credentials.host) return "" diff --git a/dlt/load/load.py b/dlt/load/load.py index 76b4806694..ec93fbd9c9 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -197,7 +197,9 @@ def spool_new_jobs(self, load_id: str, schema: Schema) -> Tuple[int, List[LoadJo # use thread based pool as jobs processing is mostly I/O and we do not want to pickle jobs load_files = filter_new_jobs( self.load_storage.list_new_jobs(load_id), - self.destination.capabilities(self.destination.configuration(self.initial_client_config)), + self.destination.capabilities( + self.destination.configuration(self.initial_client_config) + ), self.config, ) file_count = len(load_files) diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py index dbbb5b1da0..fad8b7a02c 100644 --- a/tests/load/clickhouse/test_clickhouse_adapter.py +++ b/tests/load/clickhouse/test_clickhouse_adapter.py @@ -1,11 +1,10 @@ -import os from typing import Generator, Dict, cast import pytest import dlt +from dlt.common.utils import custom_environ from dlt.destinations.adapters import clickhouse_adapter -from dlt.destinations.impl.clickhouse.clickhouse_adapter import TABLE_ENGINE_TYPE_HINT, TTableEngineType from dlt.destinations.impl.clickhouse.sql_client import TDeployment, ClickHouseSqlClient from tests.load.clickhouse.utils import get_deployment_type from tests.pipeline.utils import assert_load_info @@ -34,18 +33,12 @@ def not_annotated_resource() -> Generator[Dict[str, int], None, None]: yield {"field1": 1, "field2": 2} clickhouse_adapter(merge_tree_resource, table_engine_type="merge_tree") - clickhouse_adapter( - replicated_merge_tree_resource, table_engine_type="replicated_merge_tree" - ) + clickhouse_adapter(replicated_merge_tree_resource, table_engine_type="replicated_merge_tree") - pipe = dlt.pipeline( - pipeline_name="adapter_test", destination="clickhouse", dev_mode=True - ) + pipe = dlt.pipeline(pipeline_name="adapter_test", destination="clickhouse", dev_mode=True) with pipe.sql_client() as client: - deployment_type: TDeployment = get_deployment_type( - cast(ClickHouseSqlClient, client) - ) + deployment_type: TDeployment = get_deployment_type(cast(ClickHouseSqlClient, client)) if deployment_type == "ClickHouseCloud": pack = pipe.run( @@ -118,60 +111,3 @@ def not_annotated_resource() -> 
Generator[Dict[str, int], None, None]: assert "ENGINE = ReplicatedMergeTree" in sql[0] else: assert "ENGINE = MergeTree" or "ENGINE = SharedMergeTree" in sql[0] - - -def test_clickhouse_configuration_adapter_table_engine_override() -> None: - """Tests that, given a user has specified a non-default engine to use across all tables, - that adapter overriding works on a per-resource level.""" - os.environ["DESTINATION__CLICKHOUSE__TABLE_ENGINE_TYPE"] = "replicated_merge_tree" - - @dlt.resource - def annotated_resource() -> Generator[Dict[str, int], None, None]: - yield {"field1": 1, "field2": 2} - - @dlt.resource - def non_annotated_resource() -> Generator[Dict[str, int], None, None]: - yield {"field1": 1, "field2": 2} - - clickhouse_adapter(annotated_resource, table_engine_type="merge_tree") - - pipe = dlt.pipeline( - pipeline_name="adapter_test", destination="clickhouse", dev_mode=True - ) - - with pipe.sql_client() as client: - deployment_type: TDeployment = get_deployment_type( - cast(ClickHouseSqlClient, client) - ) - - if deployment_type != "ClickHouseCloud": - # TODO: Run for local when log engines are implemented. - pytest.skip( - "Only ClickHouseCloud has enough table types implemented for this test to work for now." - ) - - pack = pipe.run( - [ - annotated_resource, - non_annotated_resource, - ] - ) - - assert_load_info(pack) - - with pipe.sql_client() as client: - tables = {} - for table in client._list_tables(): - if "resource" in table: - tables[table.split("___")[1]] = table - - for table_name, full_table_name in tables.items(): - with client.execute_query( - "SELECT database, name, engine, engine_full FROM system.tables " - f"WHERE name = '{full_table_name}';" - ) as cursor: - res = cursor.fetchall() - if table_name == "non_annotated_resource": - assert tuple(res[0])[2] == "ReplicatedMergeTree" - else: - assert tuple(res[0])[2] in ("SharedMergeTree", "MergeTree") diff --git a/tests/load/clickhouse/test_clickhouse_configuration.py b/tests/load/clickhouse/test_clickhouse_configuration.py index b41cd614bc..7f3cb6a1f6 100644 --- a/tests/load/clickhouse/test_clickhouse_configuration.py +++ b/tests/load/clickhouse/test_clickhouse_configuration.py @@ -52,9 +52,7 @@ def test_clickhouse_configuration() -> None: ClickHouseCredentials(), explicit_value="clickhouse://user1:pass1@host1:9000/db1", ) - assert ClickHouseClientConfiguration(credentials=config).fingerprint() == digest128( - "host1" - ) + assert ClickHouseClientConfiguration(credentials=config).fingerprint() == digest128("host1") def test_clickhouse_connection_settings(client: ClickHouseClient) -> None: @@ -72,23 +70,3 @@ def test_clickhouse_connection_settings(client: ClickHouseClient) -> None: assert ("allow_experimental_lightweight_delete", "1") in res assert ("enable_http_compression", "1") in res assert ("date_time_input_format", "best_effort") in res - - -def test_clickhouse_table_engine_configuration() -> None: - os.environ["DESTINATION__CLICKHOUSE__CREDENTIALS__HOST"] = "localhost" - - # Test the default engine. - config = resolve_configuration( - ClickHouseClientConfiguration()._bind_dataset_name(dataset_name="dataset"), - sections=("destination", "clickhouse"), - ) - assert config.credentials.table_engine_type == "merge_tree" - - # Test user provided engine. 
- os.environ["DESTINATION__CLICKHOUSE__TABLE_ENGINE_TYPE"] = "replicated_merge_tree" - - config = resolve_configuration( - ClickHouseClientConfiguration()._bind_dataset_name(dataset_name="dataset"), - sections=("destination", "clickhouse"), - ) - assert config.credentials.table_engine_type == "replicated_merge_tree" diff --git a/tests/load/clickhouse/test_clickhouse_table_builder.py b/tests/load/clickhouse/test_clickhouse_table_builder.py index 867102dde9..281e39e826 100644 --- a/tests/load/clickhouse/test_clickhouse_table_builder.py +++ b/tests/load/clickhouse/test_clickhouse_table_builder.py @@ -6,7 +6,6 @@ from dlt.common.schema import Schema from dlt.common.utils import custom_environ, digest128 from dlt.common.utils import uniq_id - from dlt.destinations import clickhouse from dlt.destinations.impl.clickhouse.clickhouse import ClickHouseClient from dlt.destinations.impl.clickhouse.configuration import ( @@ -140,7 +139,9 @@ def test_clickhouse_alter_table(clickhouse_client: ClickHouseClient) -> None: @pytest.mark.usefixtures("empty_schema") -def test_clickhouse_create_table_with_primary_keys(clickhouse_client: ClickHouseClient) -> None: +def test_clickhouse_create_table_with_primary_keys( + clickhouse_client: ClickHouseClient, +) -> None: mod_update = deepcopy(TABLE_UPDATE) mod_update[1]["primary_key"] = True @@ -172,3 +173,18 @@ def test_clickhouse_create_table_with_hints(client: ClickHouseClient) -> None: # No hints. assert "`col3` boolean NOT NULL" in sql assert "`col4` timestamp with time zone NOT NULL" in sql + + +def test_clickhouse_table_engine_configuration() -> None: + with custom_environ({"DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost"}): + C = resolve_configuration(ClickHouseCredentials(), sections=("destination", "clickhouse")) + assert C.table_engine_type == "merge_tree" + + with custom_environ( + { + "DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost", + "DESTINATION__CLICKHOUSE__CREDENTIALS__TABLE_ENGINE_TYPE": "replicated_merge_tree", + } + ): + C = resolve_configuration(ClickHouseCredentials(), sections=("destination", "clickhouse")) + assert C.table_engine_type == "replicated_merge_tree" From b0cec6e723aad1f92409a0fca50b7bd1bcf8b2b1 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 4 Jul 2024 23:16:13 +0200 Subject: [PATCH 28/33] Docs Signed-off-by: Marcel Coetzee --- .../dlt-ecosystem/destinations/clickhouse.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index ec5eb18523..16a0e22d62 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -67,6 +67,7 @@ To load data into ClickHouse, you need to create a ClickHouse database. While we http_port = 8443 # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443 for Clickhouse Cloud. secure = 1 # Set to 1 if using HTTPS, else 0. dataset_table_separator = "___" # Separator for dataset table names from dataset. + table_engine_type = "merge_tree" # The default table engine to use. ``` :::info Network Ports @@ -141,7 +142,21 @@ ClickHouse supports the following [column hints](../../general-usage/schema#tabl ## Choosing a Table Engine -By default, tables are created using the `MergeTree` table engine in ClickHouse. 
You can specify an alternate table engine using the `table_engine_type` parameter with the clickhouse adapter:
+dlt defaults to the `MergeTree` table engine. You can specify an alternate table engine in two ways:
+
+### Setting a default table engine in the configuration
+
+You can set a default table engine for all resources and dlt tables by adding the `table_engine_type` parameter to your ClickHouse credentials in the `.dlt/secrets.toml` file:
+
+```toml
+[destination.clickhouse.credentials]
+# ... (other credentials)
+table_engine_type = "merge_tree" # The default table engine to use.
+```
+
+### Setting the table engine for specific resources
+
+You can also set the table engine for specific resources using the `clickhouse_adapter`, which overrides the default engine set in `.dlt/secrets.toml` for that resource:

 ```py
 from dlt.destinations.adapters import clickhouse_adapter

From b2bc5b195966c7acc6f36c639000ccb102151443 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Fri, 12 Jul 2024 15:00:04 +0200
Subject: [PATCH 29/33] Fix comments

Signed-off-by: Marcel Coetzee

---
 .../docs/dlt-ecosystem/destinations/clickhouse.md | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
index 16a0e22d62..917cd0bc0c 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
@@ -59,12 +59,12 @@ To load data into ClickHouse, you need to create a ClickHouse database. While we

 ```toml
 [destination.clickhouse.credentials]
-  database = "dlt" # The database name you created
-  username = "dlt" # ClickHouse username, default is usually "default"
-  password = "Dlt*12345789234567" # ClickHouse password if any
-  host = "localhost" # ClickHouse server host
-  port = 9000 # ClickHouse HTTP port, default is 9000 for Clickhouse Cloud.
-  http_port = 8443 # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443 for Clickhouse Cloud.
+  database = "dlt" # The database name you created.
+  username = "dlt" # ClickHouse username, default is usually "default".
+  password = "Dlt*12345789234567" # ClickHouse password if any.
+  host = "localhost" # ClickHouse server host.
+  port = 9000 # ClickHouse native TCP protocol port, default is 9000.
+  http_port = 8443 # ClickHouse HTTP port, default is 8443 for ClickHouse Cloud.
   secure = 1 # Set to 1 if using HTTPS, else 0.
   dataset_table_separator = "___" # Separator for dataset table names from dataset.
   table_engine_type = "merge_tree" # The default table engine to use.
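At this point in the series there are two levels of engine control: a configured default and a per-resource adapter hint. A minimal sketch of how they interact (the resource and pipeline names are illustrative; the environment-variable path mirrors the credentials-based field location the tests above use):

```py
import os

import dlt
from dlt.destinations.adapters import clickhouse_adapter

# Configured default: applies to every table without an explicit hint.
os.environ["DESTINATION__CLICKHOUSE__CREDENTIALS__TABLE_ENGINE_TYPE"] = "merge_tree"


@dlt.resource
def events():
    yield {"id": 1, "value": "a"}  # illustrative payload


# Per-resource hint: overrides the configured default for this table only.
clickhouse_adapter(events, table_engine_type="replicated_merge_tree")

pipeline = dlt.pipeline(pipeline_name="engine_demo", destination="clickhouse")
# `events` is created with ENGINE = ReplicatedMergeTree (ClickHouse Cloud maps
# this to SharedMergeTree; single-node OSS setups may reject replicated
# engines, as the adapter tests above note).
pipeline.run(events)
```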
From 8d26ef131b782b63155f0e114e270258b160540f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Fri, 12 Jul 2024 16:12:41 +0200 Subject: [PATCH 30/33] Add ClickHouse typing module for improved type handling Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 39 ++++++++----------- .../impl/clickhouse/clickhouse_adapter.py | 8 ++-- .../impl/clickhouse/configuration.py | 11 +----- dlt/destinations/impl/clickhouse/factory.py | 3 +- .../impl/clickhouse/sql_client.py | 11 ++++-- dlt/destinations/impl/clickhouse/typing.py | 32 +++++++++++++++ dlt/destinations/impl/clickhouse/utils.py | 14 +++---- .../clickhouse/test_clickhouse_adapter.py | 5 +-- .../test_clickhouse_configuration.py | 1 - tests/load/clickhouse/utils.py | 3 +- 10 files changed, 73 insertions(+), 54 deletions(-) create mode 100644 dlt/destinations/impl/clickhouse/typing.py diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index baa0d27f84..ae5c008c1b 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -2,7 +2,7 @@ import re from copy import deepcopy from textwrap import dedent -from typing import Optional, Dict, List, Sequence, cast +from typing import Optional, List, Sequence, cast from urllib.parse import urlparse import clickhouse_connect @@ -26,24 +26,27 @@ from dlt.common.schema.typing import ( TTableFormat, TTableSchema, - TColumnHint, TColumnType, ) from dlt.common.storages import FileStorage from dlt.destinations.exceptions import LoadJobTerminalException -from dlt.destinations.impl.clickhouse.clickhouse_adapter import ( - TTableEngineType, - TABLE_ENGINE_TYPE_HINT, -) from dlt.destinations.impl.clickhouse.configuration import ( ClickHouseClientConfiguration, ) from dlt.destinations.impl.clickhouse.sql_client import ClickHouseSqlClient -from dlt.destinations.impl.clickhouse.utils import ( - convert_storage_to_http_scheme, +from dlt.destinations.impl.clickhouse.typing import ( + HINT_TO_CLICKHOUSE_ATTR, + TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR, +) +from dlt.destinations.impl.clickhouse.typing import ( + TTableEngineType, + TABLE_ENGINE_TYPE_HINT, FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING, SUPPORTED_FILE_FORMATS, ) +from dlt.destinations.impl.clickhouse.utils import ( + convert_storage_to_http_scheme, +) from dlt.destinations.job_client_impl import ( SqlJobClientBase, SqlJobClientWithStaging, @@ -53,19 +56,6 @@ from dlt.destinations.type_mapping import TypeMapper -HINT_TO_CLICKHOUSE_ATTR: Dict[TColumnHint, str] = { - "primary_key": "PRIMARY KEY", - "unique": "", # No unique constraints available in ClickHouse. - "foreign_key": "", # No foreign key constraints support in ClickHouse. -} - -TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = { - "merge_tree": "MergeTree", - "shared_merge_tree": "SharedMergeTree", - "replicated_merge_tree": "ReplicatedMergeTree", -} - - class ClickHouseTypeMapper(TypeMapper): sct_to_unbound_dbt = { "complex": "String", @@ -310,7 +300,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non hints_ = " ".join( self.active_hints.get(hint) for hint in self.active_hints.keys() - if c.get(hint, False) is True + if c.get(cast(str, hint), False) is True and hint not in ("primary_key", "sort") and hint in self.active_hints ) @@ -355,7 +345,10 @@ def _get_table_update_sql( # so it will work on both local and cloud instances of CH. 
table_type = cast( TTableEngineType, - table.get(TABLE_ENGINE_TYPE_HINT, self.config.credentials.table_engine_type), + table.get( + cast(str, TABLE_ENGINE_TYPE_HINT), + self.config.credentials.table_engine_type, + ), ) sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}" diff --git a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py index b47f4f90c4..dc030ef88c 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py +++ b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py @@ -1,6 +1,10 @@ -from typing import Any, Literal, Set, get_args, Dict +from typing import Any, Dict from dlt.destinations.impl.clickhouse.configuration import TTableEngineType +from dlt.destinations.impl.clickhouse.typing import ( + TABLE_ENGINE_TYPES, + TABLE_ENGINE_TYPE_HINT, +) from dlt.destinations.utils import ensure_resource from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate @@ -18,8 +22,6 @@ See https://clickhouse.com/docs/en/engines/table-engines. """ -TABLE_ENGINE_TYPES: Set[TTableEngineType] = set(get_args(TTableEngineType)) -TABLE_ENGINE_TYPE_HINT: Literal["x-table-engine-type"] = "x-table-engine-type" def clickhouse_adapter(data: Any, table_engine_type: TTableEngineType = None) -> DltResource: diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index 90b9d4a0b6..1e548cc5b9 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -1,5 +1,5 @@ import dataclasses -from typing import ClassVar, Dict, List, Any, Final, Literal, cast, Optional +from typing import ClassVar, Dict, List, Any, Final, cast, Optional from dlt.common.configuration import configspec from dlt.common.configuration.specs import ConnectionStringCredentials @@ -7,14 +7,7 @@ DestinationClientDwhWithStagingConfiguration, ) from dlt.common.utils import digest128 - - -TSecureConnection = Literal[0, 1] -TTableEngineType = Literal[ - "merge_tree", - "shared_merge_tree", - "replicated_merge_tree", -] +from dlt.destinations.impl.clickhouse.typing import TSecureConnection, TTableEngineType @configspec(init=False) diff --git a/dlt/destinations/impl/clickhouse/factory.py b/dlt/destinations/impl/clickhouse/factory.py index 32acf96d21..93da6c866a 100644 --- a/dlt/destinations/impl/clickhouse/factory.py +++ b/dlt/destinations/impl/clickhouse/factory.py @@ -1,14 +1,13 @@ import sys import typing as t -from dlt.common.destination import Destination, DestinationCapabilitiesContext from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE from dlt.common.data_writers.escape import ( escape_clickhouse_identifier, escape_clickhouse_literal, format_clickhouse_datetime_literal, ) - +from dlt.common.destination import Destination, DestinationCapabilitiesContext from dlt.destinations.impl.clickhouse.configuration import ( ClickHouseClientConfiguration, ClickHouseCredentials, diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index f3c1d986a5..c3c4f67806 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -10,6 +10,7 @@ ClassVar, Literal, Tuple, + cast, ) import clickhouse_driver # type: ignore[import-untyped] @@ -26,6 +27,10 @@ DatabaseTerminalException, ) from dlt.destinations.impl.clickhouse.configuration import ClickHouseCredentials +from 
dlt.destinations.impl.clickhouse.typing import ( + TTableEngineType, + TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR, +) from dlt.destinations.sql_client import ( DBApiCursorImpl, SqlClientBase, @@ -108,15 +113,15 @@ def create_dataset(self) -> None: sentinel_table_name = self.make_qualified_table_name( self.credentials.dataset_sentinel_table_name ) - # `MergeTree` is guaranteed to work in both self-managed and cloud setups. + sentinel_table_type = cast(TTableEngineType, self.credentials.table_engine_type) self.execute_sql(f""" CREATE TABLE {sentinel_table_name} (_dlt_id String NOT NULL PRIMARY KEY) - ENGINE=MergeTree + ENGINE={TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(sentinel_table_type)} COMMENT 'internal dlt sentinel table'""") def drop_dataset(self) -> None: - # always try to drop sentinel table + # always try to drop the sentinel table. sentinel_table_name = self.make_qualified_table_name( self.credentials.dataset_sentinel_table_name ) diff --git a/dlt/destinations/impl/clickhouse/typing.py b/dlt/destinations/impl/clickhouse/typing.py new file mode 100644 index 0000000000..658822149c --- /dev/null +++ b/dlt/destinations/impl/clickhouse/typing.py @@ -0,0 +1,32 @@ +from typing import Literal, Dict, get_args, Set + +from dlt.common.schema import TColumnHint + +TSecureConnection = Literal[0, 1] +TTableEngineType = Literal[ + "merge_tree", + "shared_merge_tree", + "replicated_merge_tree", +] + +HINT_TO_CLICKHOUSE_ATTR: Dict[TColumnHint, str] = { + "primary_key": "PRIMARY KEY", + "unique": "", # No unique constraints available in ClickHouse. + "foreign_key": "", # No foreign key constraints support in ClickHouse. +} + +TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = { + "merge_tree": "MergeTree", + "shared_merge_tree": "SharedMergeTree", + "replicated_merge_tree": "ReplicatedMergeTree", +} + +TDeployment = Literal["ClickHouseOSS", "ClickHouseCloud"] + +SUPPORTED_FILE_FORMATS = Literal["jsonl", "parquet"] +FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING: Dict[SUPPORTED_FILE_FORMATS, str] = { + "jsonl": "JSONEachRow", + "parquet": "Parquet", +} +TABLE_ENGINE_TYPES: Set[TTableEngineType] = set(get_args(TTableEngineType)) +TABLE_ENGINE_TYPE_HINT: Literal["x-table-engine-type"] = "x-table-engine-type" diff --git a/dlt/destinations/impl/clickhouse/utils.py b/dlt/destinations/impl/clickhouse/utils.py index 0e2fa3db00..02e4e93943 100644 --- a/dlt/destinations/impl/clickhouse/utils.py +++ b/dlt/destinations/impl/clickhouse/utils.py @@ -1,16 +1,12 @@ -from typing import Union, Literal, Dict +from typing import Union from urllib.parse import urlparse, ParseResult -SUPPORTED_FILE_FORMATS = Literal["jsonl", "parquet"] -FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING: Dict[SUPPORTED_FILE_FORMATS, str] = { - "jsonl": "JSONEachRow", - "parquet": "Parquet", -} - - def convert_storage_to_http_scheme( - url: Union[str, ParseResult], use_https: bool = False, endpoint: str = None, region: str = None + url: Union[str, ParseResult], + use_https: bool = False, + endpoint: str = None, + region: str = None, ) -> str: try: if isinstance(url, str): diff --git a/tests/load/clickhouse/test_clickhouse_adapter.py b/tests/load/clickhouse/test_clickhouse_adapter.py index fad8b7a02c..e8e2b327c0 100644 --- a/tests/load/clickhouse/test_clickhouse_adapter.py +++ b/tests/load/clickhouse/test_clickhouse_adapter.py @@ -1,11 +1,10 @@ from typing import Generator, Dict, cast -import pytest - import dlt from dlt.common.utils import custom_environ from dlt.destinations.adapters import clickhouse_adapter -from 
dlt.destinations.impl.clickhouse.sql_client import TDeployment, ClickHouseSqlClient +from dlt.destinations.impl.clickhouse.sql_client import ClickHouseSqlClient +from dlt.destinations.impl.clickhouse.typing import TDeployment from tests.load.clickhouse.utils import get_deployment_type from tests.pipeline.utils import assert_load_info diff --git a/tests/load/clickhouse/test_clickhouse_configuration.py b/tests/load/clickhouse/test_clickhouse_configuration.py index 7f3cb6a1f6..a4e8abc8dd 100644 --- a/tests/load/clickhouse/test_clickhouse_configuration.py +++ b/tests/load/clickhouse/test_clickhouse_configuration.py @@ -1,4 +1,3 @@ -import os from typing import Iterator import pytest diff --git a/tests/load/clickhouse/utils.py b/tests/load/clickhouse/utils.py index 809e929261..5c34d52148 100644 --- a/tests/load/clickhouse/utils.py +++ b/tests/load/clickhouse/utils.py @@ -1,4 +1,5 @@ -from dlt.destinations.impl.clickhouse.sql_client import TDeployment, ClickHouseSqlClient +from dlt.destinations.impl.clickhouse.sql_client import ClickHouseSqlClient +from dlt.destinations.impl.clickhouse.typing import TDeployment def get_deployment_type(client: ClickHouseSqlClient) -> TDeployment: From 3028cf4bb13d44d22987fb46031f994ef94b76b9 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Fri, 12 Jul 2024 16:59:17 +0200 Subject: [PATCH 31/33] Move ClickHouse configuration options from credentials to client configuration Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/configuration.py | 22 ++++++++++--------- .../impl/clickhouse/sql_client.py | 12 +++++----- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index 1e548cc5b9..fbda58abc7 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -31,12 +31,6 @@ class ClickHouseCredentials(ConnectionStringCredentials): """Timeout for establishing connection. Defaults to 10 seconds.""" send_receive_timeout: int = 300 """Timeout for sending and receiving data. Defaults to 300 seconds.""" - dataset_table_separator: str = "___" - """Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'.""" - table_engine_type: Optional[TTableEngineType] = "merge_tree" - """The default table engine to use. Defaults to 'merge_tree'. Other implemented options are 'shared_merge_tree' and 'replicated_merge_tree'.""" - dataset_sentinel_table_name: str = "dlt_sentinel_table" - """Special table to mark dataset as existing""" gcp_access_key_id: Optional[str] = None """When loading from a gcp bucket, you need to provide gcp interoperable keys""" gcp_secret_access_key: Optional[str] = None @@ -81,10 +75,18 @@ class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration ) credentials: ClickHouseCredentials = None - # Primary key columns are used to build a sparse primary index which allows for efficient data retrieval, - # but they do not enforce uniqueness constraints. It permits duplicate values even for the primary key - # columns within the same granule. - # See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes + dataset_table_separator: str = "___" + """Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'.""" + table_engine_type: Optional[TTableEngineType] = "merge_tree" + """The default table engine to use. Defaults to 'merge_tree'. 
Other implemented options are 'shared_merge_tree' and 'replicated_merge_tree'."""
+    dataset_sentinel_table_name: str = "dlt_sentinel_table"
+    """Special table used to mark the dataset as existing."""
+
+    __config_gen_annotations__: ClassVar[List[str]] = [
+        "dataset_table_separator",
+        "dataset_sentinel_table_name",
+        "table_engine_type",
+    ]

     def fingerprint(self) -> str:
         """Returns a fingerprint of the host part of a connection string."""
diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py
index c3c4f67806..7584b1b027 100644
--- a/dlt/destinations/impl/clickhouse/sql_client.py
+++ b/dlt/destinations/impl/clickhouse/sql_client.py
@@ -70,9 +70,9 @@ def __init__(

     def has_dataset(self) -> bool:
         # we do not need to normalize dataset_sentinel_table_name
-        sentinel_table = self.credentials.dataset_sentinel_table_name
+        sentinel_table = self.config.dataset_sentinel_table_name
         return sentinel_table in [
-            t.split(self.credentials.dataset_table_separator)[1] for t in self._list_tables()
+            t.split(self.config.dataset_table_separator)[1] for t in self._list_tables()
         ]

     def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
@@ -111,9 +111,9 @@ def execute_sql(

     def create_dataset(self) -> None:
         # We create a sentinel table which defines whether we consider the dataset created.
         sentinel_table_name = self.make_qualified_table_name(
-            self.credentials.dataset_sentinel_table_name
+            self.config.dataset_sentinel_table_name
         )
-        sentinel_table_type = cast(TTableEngineType, self.credentials.table_engine_type)
+        sentinel_table_type = cast(TTableEngineType, self.config.table_engine_type)
         self.execute_sql(f"""
             CREATE TABLE {sentinel_table_name}
             (_dlt_id String NOT NULL PRIMARY KEY)
@@ -123,7 +123,7 @@ def create_dataset(self) -> None:

     def drop_dataset(self) -> None:
         # always try to drop the sentinel table.
sentinel_table_name = self.make_qualified_table_name( - self.credentials.dataset_sentinel_table_name + self.config.dataset_sentinel_table_name ) # drop a sentinel table self.execute_sql(f"DROP TABLE {sentinel_table_name} SYNC") @@ -212,7 +212,7 @@ def make_qualified_table_name_path( if table_name: # table name combines dataset name and table name table_name = self.capabilities.casefold_identifier( - f"{self.dataset_name}{self.credentials.dataset_table_separator}{table_name}" + f"{self.dataset_name}{self.config.dataset_table_separator}{table_name}" ) if escape: table_name = self.capabilities.escape_identifier(table_name) From 8f6043680ecd849b4b8975dc350fd6d9f08fd357 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Fri, 12 Jul 2024 17:16:53 +0200 Subject: [PATCH 32/33] Move table_engine_type from credentials to client configuration Signed-off-by: Marcel Coetzee --- dlt/destinations/impl/clickhouse/clickhouse.py | 3 ++- dlt/destinations/impl/clickhouse/sql_client.py | 11 +++++++---- .../clickhouse/test_clickhouse_table_builder.py | 13 +++++++------ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index ae5c008c1b..f3d5b9aeac 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -285,6 +285,7 @@ def __init__( config.normalize_staging_dataset_name(schema), config.credentials, capabilities, + config ) super().__init__(schema, config, self.sql_client) self.config: ClickHouseClientConfiguration = config @@ -347,7 +348,7 @@ def _get_table_update_sql( TTableEngineType, table.get( cast(str, TABLE_ENGINE_TYPE_HINT), - self.config.credentials.table_engine_type, + self.config.table_engine_type, ), ) sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}" diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index 7584b1b027..10bd055104 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -1,4 +1,5 @@ import datetime # noqa: I251 +from clickhouse_driver import dbapi as clickhouse_dbapi # type: ignore[import-untyped] from contextlib import contextmanager from typing import ( Iterator, @@ -13,7 +14,7 @@ cast, ) -import clickhouse_driver # type: ignore[import-untyped] +import clickhouse_driver import clickhouse_driver.errors # type: ignore[import-untyped] from clickhouse_driver.dbapi import OperationalError # type: ignore[import-untyped] from clickhouse_driver.dbapi.extras import DictCursor # type: ignore[import-untyped] @@ -26,7 +27,7 @@ DatabaseTransientException, DatabaseTerminalException, ) -from dlt.destinations.impl.clickhouse.configuration import ClickHouseCredentials +from dlt.destinations.impl.clickhouse.configuration import ClickHouseCredentials, ClickHouseClientConfiguration from dlt.destinations.impl.clickhouse.typing import ( TTableEngineType, TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR, @@ -54,7 +55,7 @@ class ClickHouseDBApiCursorImpl(DBApiCursorImpl): class ClickHouseSqlClient( SqlClientBase[clickhouse_driver.dbapi.connection.Connection], DBTransaction ): - dbapi: ClassVar[DBApi] = clickhouse_driver.dbapi + dbapi: ClassVar[DBApi] = clickhouse_dbapi def __init__( self, @@ -62,14 +63,16 @@ def __init__( staging_dataset_name: str, credentials: ClickHouseCredentials, capabilities: DestinationCapabilitiesContext, + config: ClickHouseClientConfiguration, ) -> None: 
super().__init__(credentials.database, dataset_name, staging_dataset_name, capabilities) self._conn: clickhouse_driver.dbapi.connection = None self.credentials = credentials self.database_name = credentials.database + self.config = config def has_dataset(self) -> bool: - # we do not need to normalize dataset_sentinel_table_name + # we do not need to normalize dataset_sentinel_table_name. sentinel_table = self.config.dataset_sentinel_table_name return sentinel_table in [ t.split(self.config.dataset_table_separator)[1] for t in self._list_tables() diff --git a/tests/load/clickhouse/test_clickhouse_table_builder.py b/tests/load/clickhouse/test_clickhouse_table_builder.py index 281e39e826..ec26bf66ac 100644 --- a/tests/load/clickhouse/test_clickhouse_table_builder.py +++ b/tests/load/clickhouse/test_clickhouse_table_builder.py @@ -176,15 +176,16 @@ def test_clickhouse_create_table_with_hints(client: ClickHouseClient) -> None: def test_clickhouse_table_engine_configuration() -> None: - with custom_environ({"DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost"}): - C = resolve_configuration(ClickHouseCredentials(), sections=("destination", "clickhouse")) - assert C.table_engine_type == "merge_tree" + with custom_environ({"DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost", "DESTINATION__CLICKHOUSE__DATASET_NAME": f"test_{uniq_id()}"}): + config = resolve_configuration(ClickHouseClientConfiguration(), sections=("destination", "clickhouse")) + assert config.table_engine_type == "merge_tree" with custom_environ( { "DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost", - "DESTINATION__CLICKHOUSE__CREDENTIALS__TABLE_ENGINE_TYPE": "replicated_merge_tree", + "DESTINATION__CLICKHOUSE__TABLE_ENGINE_TYPE": "replicated_merge_tree", + "DESTINATION__CLICKHOUSE__DATASET_NAME": f"test_{uniq_id()}" } ): - C = resolve_configuration(ClickHouseCredentials(), sections=("destination", "clickhouse")) - assert C.table_engine_type == "replicated_merge_tree" + config = resolve_configuration(ClickHouseClientConfiguration(), sections=("destination", "clickhouse")) + assert config.table_engine_type == "replicated_merge_tree" From 477e8153720c6626ab577b67312206bac2b37dd4 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Fri, 12 Jul 2024 20:28:27 +0200 Subject: [PATCH 33/33] Docs Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 2 +- .../impl/clickhouse/sql_client.py | 5 ++++- .../dlt-ecosystem/destinations/clickhouse.md | 19 ++++++++++++++----- .../test_clickhouse_table_builder.py | 17 +++++++++++++---- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index f3d5b9aeac..148fca3f1e 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -285,7 +285,7 @@ def __init__( config.normalize_staging_dataset_name(schema), config.credentials, capabilities, - config + config, ) super().__init__(schema, config, self.sql_client) self.config: ClickHouseClientConfiguration = config diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index 10bd055104..25914e4093 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -27,7 +27,10 @@ DatabaseTransientException, DatabaseTerminalException, ) -from dlt.destinations.impl.clickhouse.configuration import ClickHouseCredentials, ClickHouseClientConfiguration +from 
dlt.destinations.impl.clickhouse.configuration import (
+    ClickHouseCredentials,
+    ClickHouseClientConfiguration,
+)
 from dlt.destinations.impl.clickhouse.typing import (
     TTableEngineType,
     TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR,
diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
index 917cd0bc0c..bf8e2bce02 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
@@ -66,8 +66,6 @@ To load data into ClickHouse, you need to create a ClickHouse database. While we
    port = 9000 # ClickHouse native TCP protocol port, default is 9000.
-   http_port = 8443 # ClickHouse HTTP port, default is 9000.
+   http_port = 8443 # ClickHouse HTTP(S) port; defaults are 8123 for HTTP and 8443 for HTTPS.
    secure = 1 # Set to 1 if using HTTPS, else 0.
-   dataset_table_separator = "___" # Separator for dataset table names from dataset.
-   table_engine_type = "merge_tree" # The default table engine to use.
    ```

 :::info Network Ports
@@ -96,6 +94,17 @@ To load data into ClickHouse, you need to create a ClickHouse database. While we
    destination.clickhouse.credentials="clickhouse://dlt:Dlt*12345789234567@localhost:9000/dlt?secure=1"
    ```

+### 3. Add configuration options
+
+You can set the following configuration options in the `.dlt/secrets.toml` file:
+
+```toml
+[destination.clickhouse]
+dataset_table_separator = "___" # Separator between the dataset name and table name, i.e. 'database.dataset___table'.
+table_engine_type = "merge_tree" # The default table engine to use.
+dataset_sentinel_table_name = "dlt_sentinel_table" # Name of the sentinel table that marks a dataset as existing.
+```
+
 ## Write disposition

 All [write dispositions](../../general-usage/incremental-loading#choosing-a-write-disposition) are supported.
@@ -149,9 +158,9 @@ dlt defaults to `MergeTree` table engine. You can specify an alternate table eng

-You can set a default table engine for all resources and dlt tables by adding the `table_engine_type` parameter to your ClickHouse credentials in the `.dlt/secrets.toml` file:
+You can set a default table engine for all resources and dlt tables by adding the `table_engine_type` parameter to the ClickHouse destination configuration in the `.dlt/secrets.toml` file:

 ```toml
-[destination.clickhouse.credentials]
-# ... (other credentials)
-table_engine_type = "merge_tree" # The default table engine to use.
+[destination.clickhouse]
+# ... (other configuration options)
+table_engine_type = "merge_tree" # The default table engine to use. 
``` ### Setting the table engine for specific resources diff --git a/tests/load/clickhouse/test_clickhouse_table_builder.py b/tests/load/clickhouse/test_clickhouse_table_builder.py index ec26bf66ac..433383b631 100644 --- a/tests/load/clickhouse/test_clickhouse_table_builder.py +++ b/tests/load/clickhouse/test_clickhouse_table_builder.py @@ -176,16 +176,25 @@ def test_clickhouse_create_table_with_hints(client: ClickHouseClient) -> None: def test_clickhouse_table_engine_configuration() -> None: - with custom_environ({"DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost", "DESTINATION__CLICKHOUSE__DATASET_NAME": f"test_{uniq_id()}"}): - config = resolve_configuration(ClickHouseClientConfiguration(), sections=("destination", "clickhouse")) + with custom_environ( + { + "DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost", + "DESTINATION__CLICKHOUSE__DATASET_NAME": f"test_{uniq_id()}", + } + ): + config = resolve_configuration( + ClickHouseClientConfiguration(), sections=("destination", "clickhouse") + ) assert config.table_engine_type == "merge_tree" with custom_environ( { "DESTINATION__CLICKHOUSE__CREDENTIALS__HOST": "localhost", "DESTINATION__CLICKHOUSE__TABLE_ENGINE_TYPE": "replicated_merge_tree", - "DESTINATION__CLICKHOUSE__DATASET_NAME": f"test_{uniq_id()}" + "DESTINATION__CLICKHOUSE__DATASET_NAME": f"test_{uniq_id()}", } ): - config = resolve_configuration(ClickHouseClientConfiguration(), sections=("destination", "clickhouse")) + config = resolve_configuration( + ClickHouseClientConfiguration(), sections=("destination", "clickhouse") + ) assert config.table_engine_type == "replicated_merge_tree"
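For reviewers, a minimal sketch of the end state of this series: per-resource engine selection through the adapter, while unannotated resources fall back to the `table_engine_type` option that now lives on the client configuration rather than the credentials. The pipeline and resource names are illustrative, and running it assumes working ClickHouse credentials in `.dlt/secrets.toml`:

```py
import dlt
from dlt.destinations.adapters import clickhouse_adapter


@dlt.resource
def annotated_resource():
    yield {"field1": 1, "field2": 2}


@dlt.resource
def not_annotated_resource():
    yield {"field1": 1, "field2": 2}


# Pin one resource to a specific engine; the hint is stored under
# "x-table-engine-type" and rendered as `ENGINE = ReplicatedMergeTree`
# in the generated DDL.
clickhouse_adapter(annotated_resource, table_engine_type="replicated_merge_tree")

# The unannotated resource falls back to `table_engine_type` from
# `[destination.clickhouse]` (default "merge_tree"; ClickHouse Cloud
# transparently substitutes SharedMergeTree for it).
pipeline = dlt.pipeline(pipeline_name="engine_demo", destination="clickhouse")
pipeline.run([annotated_resource(), not_annotated_resource()])
```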