From 3a399becc78dd70d468ee8e645dda997230cc9e4 Mon Sep 17 00:00:00 2001 From: Michal Bobowski Date: Wed, 23 Oct 2024 17:24:31 +0200 Subject: [PATCH] SNOW-1728000 Creating Iceberg table in e2e tests --- .../iceberg_json_aws.json | 30 ++++++++++++++ test/test_executor.py | 1 + test/test_suit/base_e2e.py | 3 ++ test/test_suit/iceberg_json_aws.py | 41 +++++++++++++++++++ .../resilience_tests/test_kc_delete_create.py | 3 +- .../test_kc_delete_create_chaos.py | 3 +- .../resilience_tests/test_kc_delete_resume.py | 3 +- .../test_kc_delete_resume_chaos.py | 3 +- .../resilience_tests/test_kc_pause_create.py | 3 +- .../test_kc_pause_create_chaos.py | 3 +- .../resilience_tests/test_kc_pause_resume.py | 3 +- .../test_kc_pause_resume_chaos.py | 3 +- .../resilience_tests/test_kc_recreate.py | 3 +- .../test_kc_recreate_chaos.py | 3 +- .../resilience_tests/test_kc_restart.py | 3 +- test/test_suit/test_auto_table_creation.py | 6 +-- .../test_auto_table_creation_topic2table.py | 3 +- test/test_suit/test_avro_avro.py | 6 +-- test/test_suit/test_avrosr_avrosr.py | 6 +-- .../test_confluent_protobuf_protobuf.py | 6 +-- test/test_suit/test_json_json.py | 6 +-- ...st_multiple_topic_to_one_table_snowpipe.py | 3 +- ...e_topic_to_one_table_snowpipe_streaming.py | 3 +- test/test_suit/test_native_complex_smt.py | 3 +- test/test_suit/test_native_string_avrosr.py | 7 ++-- .../test_native_string_json_without_schema.py | 7 ++-- test/test_suit/test_native_string_protobuf.py | 6 +-- .../test_nullable_values_after_smt.py | 3 +- test/test_suit/test_pressure.py | 3 +- test/test_suit/test_pressure_restart.py | 3 +- .../test_schema_evolution_avro_sr.py | 3 +- ..._schema_evolution_avro_sr_logical_types.py | 3 +- .../test_schema_evolution_drop_table.py | 3 +- test/test_suit/test_schema_evolution_json.py | 3 +- ..._schema_evolution_json_ignore_tombstone.py | 3 +- ...schema_evolution_multi_topic_drop_table.py | 3 +- .../test_schema_evolution_nonnullable_json.py | 3 +- ...ema_evolution_nullable_values_after_smt.py | 3 +- ...evolution_w_auto_table_creation_avro_sr.py | 3 +- ...ma_evolution_w_auto_table_creation_json.py | 3 +- ...est_schema_evolution_w_random_row_count.py | 3 +- test/test_suit/test_schema_mapping.py | 7 ++-- .../test_schema_not_supported_converter.py | 7 ++-- ...pe_streaming_channel_migration_disabled.py | 7 ++-- ...ipe_streaming_nullable_values_after_smt.py | 3 +- ...t_snowpipe_streaming_schema_mapping_dlq.py | 6 +-- .../test_snowpipe_streaming_string_avro_sr.py | 6 +-- .../test_snowpipe_streaming_string_json.py | 6 +-- ...test_snowpipe_streaming_string_json_dlq.py | 6 +-- ..._streaming_string_json_ignore_tombstone.py | 6 +-- ...est_streaming_client_parameter_override.py | 6 +-- test/test_suit/test_string_avro.py | 6 +-- test/test_suit/test_string_avrosr.py | 6 +-- test/test_suit/test_string_json.py | 6 +-- .../test_string_json_ignore_tombstone.py | 6 +-- test/test_suit/test_string_json_proxy.py | 6 +-- test/test_suit/test_utils.py | 1 + test/test_suites.py | 10 +++++ test/test_verify.py | 24 +++++++++++ 59 files changed, 228 insertions(+), 109 deletions(-) create mode 100644 test/rest_request_template/iceberg_json_aws.json create mode 100644 test/test_suit/base_e2e.py create mode 100644 test/test_suit/iceberg_json_aws.py diff --git a/test/rest_request_template/iceberg_json_aws.json b/test/rest_request_template/iceberg_json_aws.json new file mode 100644 index 000000000..d4b19c67f --- /dev/null +++ b/test/rest_request_template/iceberg_json_aws.json @@ -0,0 +1,30 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC", + "tasks.max": "1", + "buffer.flush.time": "10", + "buffer.count.records": "100", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "snowflake.streaming.enable.single.buffer": true, + "snowflake.streaming.closeChannelsInParallel.enabled": true, + "snowflake.enable.schematization": "false", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.streaming.iceberg.enabled": true + } +} \ No newline at end of file diff --git a/test/test_executor.py b/test/test_executor.py index 9b707cd4f..f4ca5d55d 100644 --- a/test/test_executor.py +++ b/test/test_executor.py @@ -8,6 +8,7 @@ class TestExecutor: def execute(self, testSuitList, driver, nameSalt, round=1): try: for test in testSuitList: + test.setup() driver.createConnector(test.getConfigFileName(), nameSalt) driver.startConnectorWaitTime() diff --git a/test/test_suit/base_e2e.py b/test/test_suit/base_e2e.py new file mode 100644 index 000000000..60165b293 --- /dev/null +++ b/test/test_suit/base_e2e.py @@ -0,0 +1,3 @@ +class BaseE2eTest: + def setup(self): + pass \ No newline at end of file diff --git a/test/test_suit/iceberg_json_aws.py b/test/test_suit/iceberg_json_aws.py new file mode 100644 index 000000000..de9014a3d --- /dev/null +++ b/test/test_suit/iceberg_json_aws.py @@ -0,0 +1,41 @@ +import datetime + +from test_suit.test_utils import RetryableError, NonRetryableError +import json +from time import sleep +from test_suit.base_e2e import BaseE2eTest + + +class TestIcebergJsonAws(BaseE2eTest): + def __init__(self, driver, nameSalt: str): + self.driver = driver + self.fileName = "iceberg_json_aws" + self.topic = self.fileName + nameSalt + + def getConfigFileName(self): + return self.fileName + ".json" + + def setup(self): + self.driver.create_iceberg_table_with_content( + table_name=self.topic, + external_volume="kafka_push_e2e_volume_aws", # volume created manually + ) + + def send(self): + msg = json.dumps( + { + "id": 1, + "body_temperature": 36.6, + "name": "Steve", + "approved_coffee_types": ["Espresso"], + "animals_possessed": {"dogs": True, "cats": False}, + } + ) + + def verify(self, round): + res = self.driver.select_number_of_records(self.topic) + print("Count records in table {}={}".format(self.topic, str(res))) + + def clean(self): + self.driver.drop_iceberg_table(self.topic) + return diff --git a/test/test_suit/resilience_tests/test_kc_delete_create.py b/test/test_suit/resilience_tests/test_kc_delete_create.py index 06f774a9f..684931729 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_create.py +++ b/test/test_suit/resilience_tests/test_kc_delete_create.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py b/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py index 8cd2c590e..dc9d517c7 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_delete_create_chaos.py @@ -50,8 +50,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_delete_resume.py b/test/test_suit/resilience_tests/test_kc_delete_resume.py index 240a1c38d..044c77c6e 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_resume.py +++ b/test/test_suit/resilience_tests/test_kc_delete_resume.py @@ -48,8 +48,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py b/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py index 2783c39fb..5d63eb814 100644 --- a/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_delete_resume_chaos.py @@ -53,8 +53,7 @@ def verify(self, round): # since the pressure is applied during deletion, some of the data may be ingested, so look for a range goalCountUpper = self.recordNum * self.expectedsends goalCountLower = self.recordNum * (self.expectedsends - 1) - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count between: {} - {}".format(self.topic, str(res), str(goalCountLower), str(goalCountUpper))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_create.py b/test/test_suit/resilience_tests/test_kc_pause_create.py index cc5b1a1d5..b8e6b29d5 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_create.py +++ b/test/test_suit/resilience_tests/test_kc_pause_create.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py b/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py index 488b1a557..372abfe47 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_pause_create_chaos.py @@ -50,8 +50,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_resume.py b/test/test_suit/resilience_tests/test_kc_pause_resume.py index d3f79e34f..90c8f6056 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_resume.py +++ b/test/test_suit/resilience_tests/test_kc_pause_resume.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py b/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py index 7a58eca29..320fe5172 100644 --- a/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py @@ -50,8 +50,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_recreate.py b/test/test_suit/resilience_tests/test_kc_recreate.py index 67200c470..c9d742af4 100644 --- a/test/test_suit/resilience_tests/test_kc_recreate.py +++ b/test/test_suit/resilience_tests/test_kc_recreate.py @@ -47,8 +47,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_recreate_chaos.py b/test/test_suit/resilience_tests/test_kc_recreate_chaos.py index 8f5f45080..c568962c1 100644 --- a/test/test_suit/resilience_tests/test_kc_recreate_chaos.py +++ b/test/test_suit/resilience_tests/test_kc_recreate_chaos.py @@ -48,8 +48,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/resilience_tests/test_kc_restart.py b/test/test_suit/resilience_tests/test_kc_restart.py index f08ba1f9b..a5542417b 100644 --- a/test/test_suit/resilience_tests/test_kc_restart.py +++ b/test/test_suit/resilience_tests/test_kc_restart.py @@ -53,8 +53,7 @@ def send(self): def verify(self, round): # verify record count goalCount = self.recordNum * self.expectedsends - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount))) diff --git a/test/test_suit/test_auto_table_creation.py b/test/test_suit/test_auto_table_creation.py index 70cdd33a1..65eea3241 100644 --- a/test/test_suit/test_auto_table_creation.py +++ b/test/test_suit/test_auto_table_creation.py @@ -3,10 +3,11 @@ from confluent_kafka import avro from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry import Schema +from test_suit.base_e2e import BaseE2eTest # SR -> Schema Registry # Runs only in confluent test suite environment -class TestAutoTableCreation: +class TestAutoTableCreation(BaseE2eTest): def __init__(self, driver, nameSalt, schemaRegistryAddress, testSet): self.driver = driver self.fileName = "travis_correct_auto_table_creation" @@ -118,8 +119,7 @@ def verify(self, round): raise NonRetryableError("Missing column {}".format(key)) - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_auto_table_creation_topic2table.py b/test/test_suit/test_auto_table_creation_topic2table.py index ac227f9d8..f29fa3818 100644 --- a/test/test_suit/test_auto_table_creation_topic2table.py +++ b/test/test_suit/test_auto_table_creation_topic2table.py @@ -3,10 +3,11 @@ from confluent_kafka import avro from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry import Schema +from test_suit.base_e2e import BaseE2eTest # SR -> Schema Registry # Runs only in confluent test suite environment -class TestAutoTableCreationTopic2Table: +class TestAutoTableCreationTopic2Table(BaseE2eTest): def __init__(self, driver, nameSalt, schemaRegistryAddress, testSet): self.driver = driver self.fileName = "travis_correct_auto_table_creation_topic2table" diff --git a/test/test_suit/test_avro_avro.py b/test/test_suit/test_avro_avro.py index c971cb0b8..094dd2841 100644 --- a/test/test_suit/test_avro_avro.py +++ b/test/test_suit/test_avro_avro.py @@ -1,7 +1,8 @@ from test_suit.test_utils import RetryableError, NonRetryableError +from test_suit.base_e2e import BaseE2eTest -class TestAvroAvro: +class TestAvroAvro(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_avro_avro" @@ -21,8 +22,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, key) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_avrosr_avrosr.py b/test/test_suit/test_avrosr_avrosr.py index 0ecd5cf8d..ae609a7ef 100644 --- a/test/test_suit/test_avrosr_avrosr.py +++ b/test/test_suit/test_avrosr_avrosr.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError from confluent_kafka import avro +from test_suit.base_e2e import BaseE2eTest -class TestAvrosrAvrosr: +class TestAvrosrAvrosr(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_avrosr_avrosr" @@ -48,8 +49,7 @@ def send(self): self.topic, value, self.valueSchema, key, self.keySchema) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_confluent_protobuf_protobuf.py b/test/test_suit/test_confluent_protobuf_protobuf.py index 0ad6ff508..b7408c586 100644 --- a/test/test_suit/test_confluent_protobuf_protobuf.py +++ b/test/test_suit/test_confluent_protobuf_protobuf.py @@ -3,10 +3,11 @@ from confluent_kafka.schema_registry.protobuf import ProtobufSerializer from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka import SerializingProducer +from test_suit.base_e2e import BaseE2eTest import time -class TestConfluentProtobufProtobuf: +class TestConfluentProtobufProtobuf(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_confluent_protobuf_protobuf" @@ -50,8 +51,7 @@ def send(self): self.protobufProducer.flush() def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_json_json.py b/test/test_suit/test_json_json.py index 3d7efcc00..ae7b7a9ec 100644 --- a/test/test_suit/test_json_json.py +++ b/test/test_suit/test_json_json.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json +from test_suit.base_e2e import BaseE2eTest -class TestJsonJson: +class TestJsonJson(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_json_json" @@ -20,8 +21,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, key) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_multiple_topic_to_one_table_snowpipe.py b/test/test_suit/test_multiple_topic_to_one_table_snowpipe.py index c2718ca9b..f7c77d4ca 100644 --- a/test/test_suit/test_multiple_topic_to_one_table_snowpipe.py +++ b/test/test_suit/test_multiple_topic_to_one_table_snowpipe.py @@ -2,9 +2,10 @@ from time import sleep from confluent_kafka import avro import json +from test_suit.base_e2e import BaseE2eTest # Runs only in confluent test suite environment -class TestMultipleTopicToOneTableSnowpipe: +class TestMultipleTopicToOneTableSnowpipe(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_multiple_topic_to_one_table_snowpipe" diff --git a/test/test_suit/test_multiple_topic_to_one_table_snowpipe_streaming.py b/test/test_suit/test_multiple_topic_to_one_table_snowpipe_streaming.py index 84f70140c..b45dce6bc 100644 --- a/test/test_suit/test_multiple_topic_to_one_table_snowpipe_streaming.py +++ b/test/test_suit/test_multiple_topic_to_one_table_snowpipe_streaming.py @@ -2,9 +2,10 @@ from time import sleep from confluent_kafka import avro import json +from test_suit.base_e2e import BaseE2eTest # Runs only in confluent test suite environment -class TestMultipleTopicToOneTableSnowpipeStreaming: +class TestMultipleTopicToOneTableSnowpipeStreaming(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_multiple_topic_to_one_table_snowpipe_streaming" diff --git a/test/test_suit/test_native_complex_smt.py b/test/test_suit/test_native_complex_smt.py index 8a3ea0c99..4600b2dc0 100644 --- a/test/test_suit/test_native_complex_smt.py +++ b/test/test_suit/test_native_complex_smt.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json +from test_suit.base_e2e import BaseE2eTest -class TestNativeComplexSmt: +class TestNativeComplexSmt(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_native_complex_smt" diff --git a/test/test_suit/test_native_string_avrosr.py b/test/test_suit/test_native_string_avrosr.py index 01d9401d6..4638a9f6b 100644 --- a/test/test_suit/test_native_string_avrosr.py +++ b/test/test_suit/test_native_string_avrosr.py @@ -1,8 +1,8 @@ from test_suit.test_utils import RetryableError, NonRetryableError from confluent_kafka import avro +from test_suit.base_e2e import BaseE2eTest - -class TestNativeStringAvrosr: +class TestNativeStringAvrosr(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_native_string_avrosr" @@ -31,8 +31,7 @@ def send(self): self.driver.sendAvroSRData(self.topic, value, self.valueSchema) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_native_string_json_without_schema.py b/test/test_suit/test_native_string_json_without_schema.py index 621d69441..84c2224fe 100644 --- a/test/test_suit/test_native_string_json_without_schema.py +++ b/test/test_suit/test_native_string_json_without_schema.py @@ -1,8 +1,8 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json +from test_suit.base_e2e import BaseE2eTest - -class TestNativeStringJsonWithoutSchema: +class TestNativeStringJsonWithoutSchema(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_native_string_json_without_schema" @@ -19,8 +19,7 @@ def send(self): self.driver.sendBytesData(self.topic, value) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_native_string_protobuf.py b/test/test_suit/test_native_string_protobuf.py index 9b4775f28..285d342cb 100644 --- a/test/test_suit/test_native_string_protobuf.py +++ b/test/test_suit/test_native_string_protobuf.py @@ -1,7 +1,8 @@ from test_suit.test_utils import RetryableError, NonRetryableError import test_data.sensor_pb2 as sensor_pb2 +from test_suit.base_e2e import BaseE2eTest -class TestNativeStringProtobuf: +class TestNativeStringProtobuf(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_native_string_protobuf" @@ -33,8 +34,7 @@ def send(self): self.driver.sendBytesData(self.topic, value) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_nullable_values_after_smt.py b/test/test_suit/test_nullable_values_after_smt.py index 809fbb0cb..d301e1d76 100644 --- a/test/test_suit/test_nullable_values_after_smt.py +++ b/test/test_suit/test_nullable_values_after_smt.py @@ -1,11 +1,12 @@ import json from snowflake.connector import DictCursor from test_suit.test_utils import NonRetryableError, RetryableError +from test_suit.base_e2e import BaseE2eTest # Testing behavior for behavior.on.null.values = IGNORE and SMTs that can return null values. # Uses a Snowpipe based connector. -class TestNullableValuesAfterSmt: +class TestNullableValuesAfterSmt(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver diff --git a/test/test_suit/test_pressure.py b/test/test_suit/test_pressure.py index ac6bbe0c4..177c0dc27 100644 --- a/test/test_suit/test_pressure.py +++ b/test/test_suit/test_pressure.py @@ -2,8 +2,9 @@ from time import sleep from multiprocessing.dummy import Pool as ThreadPool import json +from test_suit.base_e2e import BaseE2eTest -class TestPressure: +class TestPressure(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.topics = [] diff --git a/test/test_suit/test_pressure_restart.py b/test/test_suit/test_pressure_restart.py index 4b3e3d695..d779186e9 100644 --- a/test/test_suit/test_pressure_restart.py +++ b/test/test_suit/test_pressure_restart.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError, ResetAndRetry import json from time import sleep +from test_suit.base_e2e import BaseE2eTest -class TestPressureRestart: +class TestPressureRestart(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.topics = [] diff --git a/test/test_suit/test_schema_evolution_avro_sr.py b/test/test_suit/test_schema_evolution_avro_sr.py index cce02574a..508bc88fa 100644 --- a/test/test_suit/test_schema_evolution_avro_sr.py +++ b/test/test_suit/test_schema_evolution_avro_sr.py @@ -1,10 +1,11 @@ from confluent_kafka import avro from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column # add test if all the records from different topics safely land in the table -class TestSchemaEvolutionAvroSR: +class TestSchemaEvolutionAvroSR(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_avro_sr" diff --git a/test/test_suit/test_schema_evolution_avro_sr_logical_types.py b/test/test_suit/test_schema_evolution_avro_sr_logical_types.py index 8c61677ca..72623d244 100644 --- a/test/test_suit/test_schema_evolution_avro_sr_logical_types.py +++ b/test/test_suit/test_schema_evolution_avro_sr_logical_types.py @@ -2,11 +2,12 @@ from confluent_kafka import avro from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column # add test if all the records from different topics safely land in the table -class TestSchemaEvolutionAvroSRLogicalTypes: +class TestSchemaEvolutionAvroSRLogicalTypes(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_avro_sr_logical_types" diff --git a/test/test_suit/test_schema_evolution_drop_table.py b/test/test_suit/test_schema_evolution_drop_table.py index 06c2cf49f..e12940a6e 100644 --- a/test/test_suit/test_schema_evolution_drop_table.py +++ b/test/test_suit/test_schema_evolution_drop_table.py @@ -2,11 +2,12 @@ from time import sleep from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column, and if the table is # recreated and updated after it's being dropped -class TestSchemaEvolutionDropTable: +class TestSchemaEvolutionDropTable(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_drop_table" diff --git a/test/test_suit/test_schema_evolution_json.py b/test/test_suit/test_schema_evolution_json.py index d00ee90ef..c4d1ff57b 100644 --- a/test/test_suit/test_schema_evolution_json.py +++ b/test/test_suit/test_schema_evolution_json.py @@ -1,11 +1,12 @@ import json from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column # add test if all the records from different topics safely land in the table -class TestSchemaEvolutionJson: +class TestSchemaEvolutionJson(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_json" diff --git a/test/test_suit/test_schema_evolution_json_ignore_tombstone.py b/test/test_suit/test_schema_evolution_json_ignore_tombstone.py index 117423c1f..33dd2829e 100644 --- a/test/test_suit/test_schema_evolution_json_ignore_tombstone.py +++ b/test/test_suit/test_schema_evolution_json_ignore_tombstone.py @@ -2,11 +2,12 @@ from time import sleep from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column # add test if all the records from different topics safely land in the table -class TestSchemaEvolutionJsonIgnoreTombstone: +class TestSchemaEvolutionJsonIgnoreTombstone(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_schema_evolution_json_ignore_tombstone" diff --git a/test/test_suit/test_schema_evolution_multi_topic_drop_table.py b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py index b72a24408..225156f8d 100644 --- a/test/test_suit/test_schema_evolution_multi_topic_drop_table.py +++ b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py @@ -2,11 +2,12 @@ from time import sleep from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column, and if the table is # recreated and updated after it's being dropped -class TestSchemaEvolutionMultiTopicDropTable: +class TestSchemaEvolutionMultiTopicDropTable(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_multi_topic_drop_table" diff --git a/test/test_suit/test_schema_evolution_nonnullable_json.py b/test/test_suit/test_schema_evolution_nonnullable_json.py index 24a8ec145..849b36409 100644 --- a/test/test_suit/test_schema_evolution_nonnullable_json.py +++ b/test/test_suit/test_schema_evolution_nonnullable_json.py @@ -1,10 +1,11 @@ import json from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column -class TestSchemaEvolutionNonNullableJson: +class TestSchemaEvolutionNonNullableJson(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_nonnullable_json" diff --git a/test/test_suit/test_schema_evolution_nullable_values_after_smt.py b/test/test_suit/test_schema_evolution_nullable_values_after_smt.py index 910f3905d..b5deded31 100644 --- a/test/test_suit/test_schema_evolution_nullable_values_after_smt.py +++ b/test/test_suit/test_schema_evolution_nullable_values_after_smt.py @@ -1,11 +1,12 @@ import json from snowflake.connector import DictCursor from test_suit.test_utils import NonRetryableError, RetryableError +from test_suit.base_e2e import BaseE2eTest # Testing behavior for behavior.on.null.values = IGNORE and SMTs that can return null values. # With enabled schematization. -class TestSchemaEvolutionNullableValuesAfterSmt: +class TestSchemaEvolutionNullableValuesAfterSmt(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py index 81257b63c..39f1fc3fd 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py @@ -1,12 +1,13 @@ from confluent_kafka import avro from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column # add test if all the records from different topics safely land in the table # the table is suppose to be created with only RECORD_METADATA in the beginning # while the rest of columns should be handled by schema evolution -class TestSchemaEvolutionWithAutoTableCreationAvroSR: +class TestSchemaEvolutionWithAutoTableCreationAvroSR(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_w_auto_table_creation_avro_sr" diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py index 103279ea6..61be741da 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py @@ -1,13 +1,14 @@ import json from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the table is updated with the correct column # add test if all the records from different topics safely land in the table # the table is suppose to be created with only RECORD_METADATA in the beginning # while the rest of columns should be handled by schema evolution -class TestSchemaEvolutionWithAutoTableCreationJson: +class TestSchemaEvolutionWithAutoTableCreationJson(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_evolution_w_auto_table_creation_json" diff --git a/test/test_suit/test_schema_evolution_w_random_row_count.py b/test/test_suit/test_schema_evolution_w_random_row_count.py index fbd26b977..dc9e6ef76 100644 --- a/test/test_suit/test_schema_evolution_w_random_row_count.py +++ b/test/test_suit/test_schema_evolution_w_random_row_count.py @@ -2,11 +2,12 @@ import random from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest # test if the ingestion works when the schematization alter table invalidation happens # halfway through a batch -class TestSchemaEvolutionWithRandomRowCount: +class TestSchemaEvolutionWithRandomRowCount(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_schema_evolution_w_random_row_count" diff --git a/test/test_suit/test_schema_mapping.py b/test/test_suit/test_schema_mapping.py index 0e5e81200..85f0c2ca4 100644 --- a/test/test_suit/test_schema_mapping.py +++ b/test/test_suit/test_schema_mapping.py @@ -1,10 +1,12 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json import datetime +from test_suit.base_e2e import BaseE2eTest + # test if each type of data fit into the right column with the right type # also test if the metadata column is automatically added -class TestSchemaMapping: +class TestSchemaMapping(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_mapping" @@ -71,8 +73,7 @@ def verify(self, round): if not metadata_exist: raise NonRetryableError("Metadata column was not created") - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_schema_not_supported_converter.py b/test/test_suit/test_schema_not_supported_converter.py index db6884ba9..2d22f66dd 100644 --- a/test/test_suit/test_schema_not_supported_converter.py +++ b/test/test_suit/test_schema_not_supported_converter.py @@ -1,11 +1,13 @@ import json from test_suit.test_utils import NonRetryableError +from test_suit.base_e2e import BaseE2eTest + # test if each type of data fit into the right column with the right type # also test if the metadata column is automatically added -class TestSchemaNotSupportedConverter: +class TestSchemaNotSupportedConverter(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_schema_not_supported_converter" @@ -47,8 +49,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, key) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res != 0: raise NonRetryableError("Nothing should be ingested with not supported converters.") diff --git a/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py b/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py index 924d02632..0fcbad81e 100644 --- a/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py +++ b/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py @@ -3,12 +3,14 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json from time import sleep +from test_suit.base_e2e import BaseE2eTest + """ Only config added here is about migrating channel offsets from channel created in version 2.1.0 to any future versions. This test verifies if the functionality can be disabled """ -class TestSnowpipeStreamingStringJsonChannelMigrationDisabled: +class TestSnowpipeStreamingStringJsonChannelMigrationDisabled(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_snowpipe_streaming_channel_migration_disabled" @@ -58,8 +60,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_snowpipe_streaming_nullable_values_after_smt.py b/test/test_suit/test_snowpipe_streaming_nullable_values_after_smt.py index 1e0c97a9d..3ed5cecd8 100644 --- a/test/test_suit/test_snowpipe_streaming_nullable_values_after_smt.py +++ b/test/test_suit/test_snowpipe_streaming_nullable_values_after_smt.py @@ -1,11 +1,12 @@ import json from snowflake.connector import DictCursor from test_suit.test_utils import NonRetryableError, RetryableError +from test_suit.base_e2e import BaseE2eTest # Testing behavior for behavior.on.null.values = IGNORE and SMTs that can return null values. # Schematization is disabled. -class TestSnowpipeStreamingNullableValuesAfterSmt: +class TestSnowpipeStreamingNullableValuesAfterSmt(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver diff --git a/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py b/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py index 9526984ae..1ad65fe1b 100644 --- a/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py +++ b/test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py @@ -1,12 +1,13 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json import datetime +from test_suit.base_e2e import BaseE2eTest # Test if incorrect data with a schematized column gets send to DLQ # It sends a String to a column with number data type - Expectation is that we send it to DLQ # This test is only running in kafka versions > 2.6.1 since DLQ apis are available only in later versions # Check createKafkaRecordErrorReporter in Java code -class TestSnowpipeStreamingSchemaMappingDLQ: +class TestSnowpipeStreamingSchemaMappingDLQ(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "snowpipe_streaming_schema_mapping_dlq" @@ -76,8 +77,7 @@ def verify(self, round): raise NonRetryableError("Metadata column was not created") # recordNum of records should be inserted - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != self.recordNum: diff --git a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py index 25c087339..f115bcfc0 100644 --- a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py +++ b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py @@ -1,10 +1,11 @@ from test_suit.test_utils import RetryableError, NonRetryableError from time import sleep from confluent_kafka import avro +from test_suit.base_e2e import BaseE2eTest # SR -> Schema Registry # Runs only in confluent test suite environment -class TestSnowpipeStreamingStringAvroSR: +class TestSnowpipeStreamingStringAvroSR(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_snowpipe_streaming_string_avro_sr" @@ -47,8 +48,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_snowpipe_streaming_string_json.py b/test/test_suit/test_snowpipe_streaming_string_json.py index 12a2d7ba7..fb41a8614 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json.py +++ b/test/test_suit/test_snowpipe_streaming_string_json.py @@ -3,8 +3,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json from time import sleep +from test_suit.base_e2e import BaseE2eTest -class TestSnowpipeStreamingStringJson: +class TestSnowpipeStreamingStringJson(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_snowpipe_streaming_string_json" @@ -54,8 +55,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_snowpipe_streaming_string_json_dlq.py b/test/test_suit/test_snowpipe_streaming_string_json_dlq.py index 4d7a63f5b..35bb793a5 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json_dlq.py +++ b/test/test_suit/test_snowpipe_streaming_string_json_dlq.py @@ -3,12 +3,13 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json from time import sleep +from test_suit.base_e2e import BaseE2eTest ''' Testing this doesnt require a custom DLQ api to be invoked since this is happening at connect level. i.e the bad message being to Kafka is not being serialized at Converter level. ''' -class TestSnowpipeStreamingStringJsonDLQ: +class TestSnowpipeStreamingStringJsonDLQ(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "snowpipe_streaming_string_json_dlq" @@ -42,8 +43,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res > 0: print("Topic:" + self.topic + " count is more than expected, will not retry") diff --git a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py index 57611c4b9..b06d02d49 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py +++ b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py @@ -3,8 +3,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json from time import sleep +from test_suit.base_e2e import BaseE2eTest -class TestSnowpipeStreamingStringJsonIgnoreTombstone: +class TestSnowpipeStreamingStringJsonIgnoreTombstone(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_snowpipe_streaming_string_json_ignore_tombstone" @@ -47,8 +48,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) goalCount = (self.recordNum - 2) * self.partitionNum if res < goalCount: diff --git a/test/test_suit/test_streaming_client_parameter_override.py b/test/test_suit/test_streaming_client_parameter_override.py index 6c9f90084..6877c0b74 100644 --- a/test/test_suit/test_streaming_client_parameter_override.py +++ b/test/test_suit/test_streaming_client_parameter_override.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json from time import sleep +from test_suit.base_e2e import BaseE2eTest -class TestStreamingClientParameterOverride: +class TestStreamingClientParameterOverride(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_streaming_client_parameter_override" @@ -52,8 +53,7 @@ def send(self): sleep(2) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) print("Count records in table {}={}".format(self.topic, str(res))) if res < (self.recordNum * self.partitionNum): print("Topic:" + self.topic + " count is less, will retry") diff --git a/test/test_suit/test_string_avro.py b/test/test_suit/test_string_avro.py index e20b5e9e6..dea6b2352 100644 --- a/test/test_suit/test_string_avro.py +++ b/test/test_suit/test_string_avro.py @@ -1,7 +1,8 @@ from test_suit.test_utils import RetryableError, NonRetryableError +from test_suit.base_e2e import BaseE2eTest -class TestStringAvro: +class TestStringAvro(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_string_avro" @@ -19,8 +20,7 @@ def send(self): self.driver.sendBytesData(self.topic, value) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_string_avrosr.py b/test/test_suit/test_string_avrosr.py index ad6424b1c..d81753ce9 100644 --- a/test/test_suit/test_string_avrosr.py +++ b/test/test_suit/test_string_avrosr.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError from confluent_kafka import avro +from test_suit.base_e2e import BaseE2eTest -class TestStringAvrosr: +class TestStringAvrosr(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_string_avrosr" @@ -31,8 +32,7 @@ def send(self): self.driver.sendAvroSRData(self.topic, value, self.valueSchema) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_string_json.py b/test/test_suit/test_string_json.py index 43367fdb3..f2168e422 100644 --- a/test/test_suit/test_string_json.py +++ b/test/test_suit/test_string_json.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json +from test_suit.base_e2e import BaseE2eTest -class TestStringJson: +class TestStringJson(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_string_json" @@ -32,8 +33,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, [], 0, header) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() diff --git a/test/test_suit/test_string_json_ignore_tombstone.py b/test/test_suit/test_string_json_ignore_tombstone.py index 77bdacc68..cf58b09a6 100644 --- a/test/test_suit/test_string_json_ignore_tombstone.py +++ b/test/test_suit/test_string_json_ignore_tombstone.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json +from test_suit.base_e2e import BaseE2eTest -class TestStringJsonIgnoreTombstone: +class TestStringJsonIgnoreTombstone(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_string_json_ignore_tombstone" @@ -29,8 +30,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, [], 0, header) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) goalCount = self.recordCount - 1 # custom sf converters treat this as a normal record, so only None value is ignored print("Got " + str(res) + " rows. Expected " + str(goalCount) + " rows") if res == 0: diff --git a/test/test_suit/test_string_json_proxy.py b/test/test_suit/test_string_json_proxy.py index ade177ad9..d3d5648c9 100644 --- a/test/test_suit/test_string_json_proxy.py +++ b/test/test_suit/test_string_json_proxy.py @@ -1,8 +1,9 @@ from test_suit.test_utils import RetryableError, NonRetryableError import json, os +from test_suit.base_e2e import BaseE2eTest -class TestStringJsonProxy: +class TestStringJsonProxy(BaseE2eTest): def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_string_proxy" @@ -19,8 +20,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, [], 0, header) def verify(self, round): - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + res = self.driver.select_number_of_records(self.topic) if res == 0: raise RetryableError() elif res != 100: diff --git a/test/test_suit/test_utils.py b/test/test_suit/test_utils.py index 94e0a2036..b44b2d970 100644 --- a/test/test_suit/test_utils.py +++ b/test/test_suit/test_utils.py @@ -1,6 +1,7 @@ import re from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization +from test_suit.base_e2e import BaseE2eTest class Error(Exception): diff --git a/test/test_suites.py b/test/test_suites.py index 9e2bb16c9..60e0e14bd 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -40,6 +40,7 @@ TestSchemaEvolutionAvroSRLogicalTypes, ) from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable +from test_suit.iceberg_json_aws import TestIcebergJsonAws from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson from test_suit.test_schema_evolution_json_ignore_tombstone import ( TestSchemaEvolutionJsonIgnoreTombstone, @@ -616,5 +617,14 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS cloud_platform=CloudPlatform.ALL, ), ), + ( + "TestIcebergJsonAws", + EndToEndTestSuite( + test_instance=TestIcebergJsonAws(driver, nameSalt), + run_in_confluent=True, + run_in_apache=True, + cloud_platform=CloudPlatform.AWS, + ), + ), ] ) diff --git a/test/test_verify.py b/test/test_verify.py index 91ee8250f..63fe639ba 100755 --- a/test/test_verify.py +++ b/test/test_verify.py @@ -296,6 +296,30 @@ def cleanTableStagePipe(self, connectorName, topicName="", partitionNumber=1): print(datetime.now().strftime("%H:%M:%S "), "=== Done ===", flush=True) + def create_iceberg_table_with_content(self, table_name: str, external_volume: str): + sql = """ + CREATE ICEBERG TABLE IF NOT EXISTS {} ( + record_content OBJECT( + id INT, + body_temperature FLOAT, + name STRING, + approved_coffee_types ARRAY(STRING), + animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN) + ) + ) + EXTERNAL_VOLUME = '{}' + CATALOG = 'SNOWFLAKE' + BASE_LOCATION = '{}' + ; + """.format(table_name, external_volume, table_name) + self.snowflake_conn.cursor().execute(sql) + + def drop_iceberg_table(self, table_name: str): + self.snowflake_conn.cursor().execute("DROP ICEBERG TABLE IF EXISTS {}".format(table_name)) + + def select_number_of_records(self, table_name: str) -> str: + return self.snowflake_conn.cursor().execute("SELECT count(*) FROM {}".format(table_name)).fetchone()[0] + def verifyStageIsCleaned(self, connectorName, topicName=""): if topicName == "": topicName = connectorName