From 942063af3ccc91de5c80e2c94c95d2f93bde9a0d Mon Sep 17 00:00:00 2001 From: Michal Bobowski Date: Wed, 23 Oct 2024 17:24:31 +0200 Subject: [PATCH] SNOW-1728000 Creating Iceberg table in e2e tests --- .../iceberg_json_aws.json | 30 ++++++++++++++ test/test_suit/iceberg_json_aws.py | 39 +++++++++++++++++++ .../resilience_tests/test_kc_delete_create.py | 3 +- .../test_kc_delete_create_chaos.py | 3 +- .../resilience_tests/test_kc_delete_resume.py | 3 +- .../test_kc_delete_resume_chaos.py | 3 +- .../resilience_tests/test_kc_pause_create.py | 3 +- .../test_kc_pause_create_chaos.py | 3 +- .../resilience_tests/test_kc_pause_resume.py | 3 +- .../test_kc_pause_resume_chaos.py | 3 +- .../resilience_tests/test_kc_recreate.py | 3 +- .../test_kc_recreate_chaos.py | 3 +- .../resilience_tests/test_kc_restart.py | 3 +- test/test_suit/test_auto_table_creation.py | 3 +- test/test_suit/test_avro_avro.py | 3 +- test/test_suit/test_avrosr_avrosr.py | 3 +- .../test_confluent_protobuf_protobuf.py | 3 +- test/test_suit/test_json_json.py | 3 +- test/test_suit/test_native_string_avrosr.py | 3 +- .../test_native_string_json_without_schema.py | 3 +- test/test_suit/test_native_string_protobuf.py | 3 +- test/test_suit/test_schema_mapping.py | 3 +- .../test_schema_not_supported_converter.py | 3 +- ...pe_streaming_channel_migration_disabled.py | 3 +- ...t_snowpipe_streaming_schema_mapping_dlq.py | 3 +- .../test_snowpipe_streaming_string_avro_sr.py | 3 +- .../test_snowpipe_streaming_string_json.py | 3 +- ...test_snowpipe_streaming_string_json_dlq.py | 3 +- ..._streaming_string_json_ignore_tombstone.py | 3 +- ...est_streaming_client_parameter_override.py | 3 +- test/test_suit/test_string_avro.py | 3 +- test/test_suit/test_string_avrosr.py | 3 +- test/test_suit/test_string_json.py | 3 +- .../test_string_json_ignore_tombstone.py | 3 +- test/test_suit/test_string_json_proxy.py | 3 +- test/test_suites.py | 11 ++++++ test/test_verify.py | 24 ++++++++++++ 37 files changed, 137 insertions(+), 66 deletions(-) create mode 100644 test/rest_request_template/iceberg_json_aws.json create mode 100644 test/test_suit/iceberg_json_aws.py diff --git a/test/rest_request_template/iceberg_json_aws.json b/test/rest_request_template/iceberg_json_aws.json new file mode 100644 index 000000000..d4b19c67f --- /dev/null +++ b/test/rest_request_template/iceberg_json_aws.json @@ -0,0 +1,30 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC", + "tasks.max": "1", + "buffer.flush.time": "10", + "buffer.count.records": "100", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "snowflake.streaming.enable.single.buffer": true, + "snowflake.streaming.closeChannelsInParallel.enabled": true, + "snowflake.enable.schematization": "false", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.streaming.iceberg.enabled": true + } +} \ No newline at end of file diff --git a/test/test_suit/iceberg_json_aws.py b/test/test_suit/iceberg_json_aws.py new file mode 100644 index 000000000..ce0e9f077 --- /dev/null +++ b/test/test_suit/iceberg_json_aws.py @@ -0,0 +1,39 @@ +import datetime + +from test_suit.test_utils import RetryableError, NonRetryableError +import json +from time import sleep + + +class TestIcebergJsonAws: + def __init__(self, driver, nameSalt: str): + self.driver = driver + self.fileName = "iceberg_json_aws" + self.topic = self.fileName + nameSalt + + self.driver.create_iceberg_table_with_content( + table_name=self.topic, + external_volume="kafka_push_e2e_volume_aws", # volume created manually + ) + + def getConfigFileName(self): + return self.fileName + ".json" + + def send(self): + msg = json.dumps( + { + "id": 1, + "body_temperature": 36.6, + "name": "Steve", + "approved_coffee_types": ["Espresso"], + "animals_possessed": {"dogs": True, "cats": False}, + } + ) + + def verify(self, round): + res = self.driver.select_number_of_records(self.topic) + print("Count records in table {}={}".format(self.topic, str(res))) + + def clean(self): + self.driver.drop_iceberg_table(self.topic) + return diff --git a/test/test_suit/resilience_tests/test_kc_delete_create.py b/test/test_suit/resilience_tests/test_kc_delete_create.py index 06f774a9f..684931729 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_create.py +++ b/test/test_suit/resilience_tests/test_kc_delete_create.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py b/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py index 8cd2c590e..dc9d517c7 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py @@ -50,8 +50,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_delete_resume.py b/test/test_suit/resilience_tests/test_kc_delete_resume.py index 240a1c38d..044c77c6e 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_resume.py +++ b/test/test_suit/resilience_tests/test_kc_delete_resume.py @@ -48,8 +48,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py b/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py index 2783c39fb..5d63eb814 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py @@ -53,8 +53,7 @@ def verify(self, round): # since the pressure is applied during deletion, some of the data may be ingested, so look for a range goalCountUpper = self.recordNum * self.expectedsends goalCountLower = self.recordNum * (self.expectedsends - 1) - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count between: {} - {}".format(self.topic, str(res), str(goalCountLower), str(goalCountUpper))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_create.py b/test/test_suit/resilience_tests/test_kc_pause_create.py index cc5b1a1d5..b8e6b29d5 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_create.py +++ b/test/test_suit/resilience_tests/test_kc_pause_create.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py b/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py index 488b1a557..372abfe47 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py @@ -50,8 +50,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_resume.py b/test/test_suit/resilience_tests/test_kc_pause_resume.py index d3f79e34f..90c8f6056 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_resume.py +++ b/test/test_suit/resilience_tests/test_kc_pause_resume.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py b/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py index 7a58eca29..320fe5172 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py @@ -50,8 +50,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_recreate.py b/test/test_suit/resilience_tests/test_kc_recreate.py index 67200c470..c9d742af4 100644 --- a/test/test_suit/resilience_tests/test_kc_recreate.py +++ b/test/test_suit/resilience_tests/test_kc_recreate.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_recreate_chaos.py b/test/test_suit/resilience_tests/test_kc_recreate_chaos.py index 8f5f45080..c568962c1 100644 --- a/test/test_suit/resilience_tests/test_kc_recreate_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_recreate_chaos.py @@ -48,8 +48,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_restart.py b/test/test_suit/resilience_tests/test_kc_restart.py index f08ba1f9b..a5542417b 100644 --- a/test/test_suit/resilience_tests/test_kc_restart.py +++ b/test/test_suit/resilience_tests/test_kc_restart.py @@ -53,8 +53,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/test_auto_table_creation.py b/test/test_suit/test_auto_table_creation.py index 70cdd33a1..6ea723f01 100644 --- a/test/test_suit/test_auto_table_creation.py +++ b/test/test_suit/test_auto_table_creation.py @@ -118,8 +118,7 @@ def verify(self, round): raise NonRetryableError("Missing column {}".format(key)) - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_avro_avro.py b/test/test_suit/test_avro_avro.py index c971cb0b8..e11b16317 100644 --- a/test/test_suit/test_avro_avro.py +++ b/test/test_suit/test_avro_avro.py @@ -21,8 +21,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, key) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_avrosr_avrosr.py b/test/test_suit/test_avrosr_avrosr.py index 0ecd5cf8d..dc8d23c91 100644 --- a/test/test_suit/test_avrosr_avrosr.py +++ b/test/test_suit/test_avrosr_avrosr.py @@ -48,8 +48,7 @@ def send(self): self.topic, value, self.valueSchema, key, self.keySchema) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_confluent_protobuf_protobuf.py b/test/test_suit/test_confluent_protobuf_protobuf.py index 0ad6ff508..bbc5abebf 100644 --- a/test/test_suit/test_confluent_protobuf_protobuf.py +++ b/test/test_suit/test_confluent_protobuf_protobuf.py @@ -50,8 +50,7 @@ def send(self): self.protobufProducer.flush() def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_json_json.py b/test/test_suit/test_json_json.py index 3d7efcc00..2cfa0a07d 100644 --- a/test/test_suit/test_json_json.py +++ b/test/test_suit/test_json_json.py @@ -20,8 +20,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, key) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_native_string_avrosr.py b/test/test_suit/test_native_string_avrosr.py index 01d9401d6..b8cbbe131 100644 --- a/test/test_suit/test_native_string_avrosr.py +++ b/test/test_suit/test_native_string_avrosr.py @@ -31,8 +31,7 @@ def send(self): self.driver.sendAvroSRData(self.topic, value, self.valueSchema) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_native_string_json_without_schema.py b/test/test_suit/test_native_string_json_without_schema.py index 621d69441..1519ade31 100644 --- a/test/test_suit/test_native_string_json_without_schema.py +++ b/test/test_suit/test_native_string_json_without_schema.py @@ -19,8 +19,7 @@ def send(self): self.driver.sendBytesData(self.topic, value) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_native_string_protobuf.py b/test/test_suit/test_native_string_protobuf.py index 9b4775f28..f4625e454 100644 --- a/test/test_suit/test_native_string_protobuf.py +++ b/test/test_suit/test_native_string_protobuf.py @@ -33,8 +33,7 @@ def send(self): self.driver.sendBytesData(self.topic, value) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_schema_mapping.py b/test/test_suit/test_schema_mapping.py index 0e5e81200..3fd3b2c0d 100644 --- a/test/test_suit/test_schema_mapping.py +++ b/test/test_suit/test_schema_mapping.py @@ -71,8 +71,7 @@ def verify(self, round): if not metadata_exist: raise NonRetryableError("Metadata column was not created") - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_schema_not_supported_converter.py b/test/test_suit/test_schema_not_supported_converter.py index db6884ba9..24c31d52a 100644 --- a/test/test_suit/test_schema_not_supported_converter.py +++ b/test/test_suit/test_schema_not_supported_converter.py @@ -47,8 +47,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, key) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res != 0: raise NonRetryableError("Nothing should be ingested with not supported converters.") diff --git a/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py b/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py index 924d02632..5d60e33d4 100644 --- a/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py +++ b/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py @@ -58,8 +58,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py b/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py index 9526984ae..228261241 100644 --- a/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py +++ b/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py @@ -76,8 +76,7 @@ def verify(self, round): raise NonRetryableError("Metadata column was not created") # recordNum of records should be inserted - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != self.recordNum: diff --git a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py index 25c087339..e3ecf3c5b 100644 --- a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py +++ b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py @@ -47,8 +47,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_snowpipe_streaming_string_json.py b/test/test_suit/test_snowpipe_streaming_string_json.py index 12a2d7ba7..df05482a4 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json.py +++ b/test/test_suit/test_snowpipe_streaming_string_json.py @@ -54,8 +54,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_snowpipe_streaming_string_json_dlq.py b/test/test_suit/test_snowpipe_streaming_string_json_dlq.py index 4d7a63f5b..085cc9892 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json_dlq.py +++ b/test/test_suit/test_snowpipe_streaming_string_json_dlq.py @@ -42,8 +42,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res > 0: print("Topic:" + self.topic + " count is more than expected, will not retry") diff --git a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py index 57611c4b9..c0556462f 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py +++ b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py @@ -47,8 +47,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) goalCount = (self.recordNum - 2) * self.partitionNum if res < goalCount: diff --git a/test/test_suit/test_streaming_client_parameter_override.py b/test/test_suit/test_streaming_client_parameter_override.py index 6c9f90084..c3d9ce8cf 100644 --- a/test/test_suit/test_streaming_client_parameter_override.py +++ b/test/test_suit/test_streaming_client_parameter_override.py @@ -52,8 +52,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_string_avro.py b/test/test_suit/test_string_avro.py index e20b5e9e6..21a98f080 100644 --- a/test/test_suit/test_string_avro.py +++ b/test/test_suit/test_string_avro.py @@ -19,8 +19,7 @@ def send(self): self.driver.sendBytesData(self.topic, value) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_string_avrosr.py b/test/test_suit/test_string_avrosr.py index ad6424b1c..c1ff74429 100644 --- a/test/test_suit/test_string_avrosr.py +++ b/test/test_suit/test_string_avrosr.py @@ -31,8 +31,7 @@ def send(self): self.driver.sendAvroSRData(self.topic, value, self.valueSchema) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_string_json.py b/test/test_suit/test_string_json.py index 43367fdb3..32f7074fc 100644 --- a/test/test_suit/test_string_json.py +++ b/test/test_suit/test_string_json.py @@ -32,8 +32,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, [], 0, header) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() diff --git a/test/test_suit/test_string_json_ignore_tombstone.py b/test/test_suit/test_string_json_ignore_tombstone.py index 77bdacc68..0c618c156 100644 --- a/test/test_suit/test_string_json_ignore_tombstone.py +++ b/test/test_suit/test_string_json_ignore_tombstone.py @@ -29,8 +29,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, [], 0, header) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) goalCount = self.recordCount - 1 # custom sf converters treat this as a normal record, so only None value is ignored print("Got " + str(res) + " rows. Expected " + str(goalCount) + " rows") if res == 0: diff --git a/test/test_suit/test_string_json_proxy.py b/test/test_suit/test_string_json_proxy.py index ade177ad9..84113c0f6 100644 --- a/test/test_suit/test_string_json_proxy.py +++ b/test/test_suit/test_string_json_proxy.py @@ -19,8 +19,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, [], 0, header) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suites.py b/test/test_suites.py index 9e2bb16c9..d20dfef32 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -616,5 +616,16 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS cloud_platform=CloudPlatform.ALL, ), ), + ( + "TestIcebergJsonAws", + EndToEndTestSuite( + test_instance=TestIcebergJsonAws( + driver, nameSalt + ), + run_in_confluent=True, + run_in_apache=True, + cloud_platform=CloudPlatform.AWS, + ), + ), ] ) diff --git a/test/test_verify.py b/test/test_verify.py index 91ee8250f..63fe639ba 100755 --- a/test/test_verify.py +++ b/test/test_verify.py @@ -296,6 +296,30 @@ def cleanTableStagePipe(self, connectorName, topicName="", partitionNumber=1): print(datetime.now().strftime("%H:%M:%S "), "=== Done ===", flush=True) + def create_iceberg_table_with_content(self, table_name: str, external_volume: str): + sql = """ + CREATE ICEBERG TABLE IF NOT EXISTS {} ( + record_content OBJECT( + id INT, + body_temperature FLOAT, + name STRING, + approved_coffee_types ARRAY(STRING), + animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN) + ) + ) + EXTERNAL_VOLUME = '{}' + CATALOG = 'SNOWFLAKE' + BASE_LOCATION = '{}' + ; + """.format(table_name, external_volume, table_name) + self.snowflake_conn.cursor().execute(sql) + + def drop_iceberg_table(self, table_name: str): + self.snowflake_conn.cursor().execute("DROP ICEBERG TABLE IF EXISTS {}".format(table_name)) + + def select_number_of_records(self, table_name: str) -> str: + return self.snowflake_conn.cursor().execute("SELECT count(*) FROM {}".format(table_name)).fetchone()[0] + def verifyStageIsCleaned(self, connectorName, topicName=""): if topicName == "": topicName = connectorName