From 3480d436c5bb54761ab40bb553118999e98ca30b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Bobowski?= <145468486+sfc-gh-mbobowski@users.noreply.github.com>
Date: Fri, 29 Nov 2024 11:24:27 +0100
Subject: [PATCH] SNOW-1831140 Iceberg schema evolution e2e test setup (#1010)

---
 .../iceberg_avro_aws.json                   |  3 +-
 .../iceberg_json_aws.json                   |  3 +-
 .../iceberg_schema_evolution_avro_aws.json  | 28 +++++++
 .../iceberg_schema_evolution_json_aws.json  | 27 +++++++
 test/test_suit/base_iceberg_test.py         | 78 ++++++++++++++++++-
 test/test_suit/iceberg_avro_aws.py          | 55 ++-----------
 test/test_suit/iceberg_json_aws.py          | 15 ++--
 .../iceberg_schema_evolution_avro_aws.py    | 21 +++++
 .../iceberg_schema_evolution_json_aws.py    | 22 ++++++
 test/test_suites.py                         | 20 +++++
 test/test_verify.py                         | 14 +++-
 11 files changed, 221 insertions(+), 65 deletions(-)
 create mode 100644 test/rest_request_template/iceberg_schema_evolution_avro_aws.json
 create mode 100644 test/rest_request_template/iceberg_schema_evolution_json_aws.json
 create mode 100644 test/test_suit/iceberg_schema_evolution_avro_aws.py
 create mode 100644 test/test_suit/iceberg_schema_evolution_json_aws.py

diff --git a/test/rest_request_template/iceberg_avro_aws.json b/test/rest_request_template/iceberg_avro_aws.json
index 150e3d151..c807a6a3b 100644
--- a/test/rest_request_template/iceberg_avro_aws.json
+++ b/test/rest_request_template/iceberg_avro_aws.json
@@ -22,7 +22,6 @@
     "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
     "errors.deadletterqueue.topic.replication.factor": 1,
     "snowflake.streaming.iceberg.enabled": true,
-    "snowflake.streaming.max.client.lag": "1",
-    "snowflake.streaming.enable.single.buffer": "true"
+    "snowflake.streaming.max.client.lag": "1"
   }
 }
\ No newline at end of file
diff --git a/test/rest_request_template/iceberg_json_aws.json b/test/rest_request_template/iceberg_json_aws.json
index 873acbdf0..1a6056c27 100644
--- a/test/rest_request_template/iceberg_json_aws.json
+++ b/test/rest_request_template/iceberg_json_aws.json
@@ -21,7 +21,6 @@
     "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
     "errors.deadletterqueue.topic.replication.factor": 1,
     "snowflake.streaming.iceberg.enabled": true,
-    "snowflake.streaming.max.client.lag": "1",
-    "snowflake.streaming.enable.single.buffer": "true"
+    "snowflake.streaming.max.client.lag": "1"
   }
 }
\ No newline at end of file
diff --git a/test/rest_request_template/iceberg_schema_evolution_avro_aws.json b/test/rest_request_template/iceberg_schema_evolution_avro_aws.json
new file mode 100644
index 000000000..efeb86f8d
--- /dev/null
+++ b/test/rest_request_template/iceberg_schema_evolution_avro_aws.json
@@ -0,0 +1,28 @@
+{
+  "name": "SNOWFLAKE_CONNECTOR_NAME",
+  "config": {
+    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+    "topics": "SNOWFLAKE_TEST_TOPIC",
+    "tasks.max": "1",
+    "buffer.size.bytes": "5000000",
+    "snowflake.url.name": "SNOWFLAKE_HOST",
+    "snowflake.user.name": "SNOWFLAKE_USER",
+    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
+    "snowflake.database.name": "SNOWFLAKE_DATABASE",
+    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
+    "snowflake.role.name": "SNOWFLAKE_ROLE",
+    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"io.confluent.connect.avro.AvroConverter",
+    "value.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
+    "value.converter.schemas.enable": "false",
+    "jmx": "true",
+    "errors.tolerance": "all",
+    "errors.log.enable": true,
+    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
+    "errors.deadletterqueue.topic.replication.factor": 1,
+    "snowflake.streaming.iceberg.enabled": true,
+    "snowflake.enable.schematization": true,
+    "snowflake.streaming.max.client.lag": "1"
+  }
+}
\ No newline at end of file
diff --git a/test/rest_request_template/iceberg_schema_evolution_json_aws.json b/test/rest_request_template/iceberg_schema_evolution_json_aws.json
new file mode 100644
index 000000000..4b0b7895e
--- /dev/null
+++ b/test/rest_request_template/iceberg_schema_evolution_json_aws.json
@@ -0,0 +1,27 @@
+{
+  "name": "SNOWFLAKE_CONNECTOR_NAME",
+  "config": {
+    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+    "topics": "SNOWFLAKE_TEST_TOPIC",
+    "tasks.max": "1",
+    "buffer.size.bytes": "5000000",
+    "snowflake.url.name": "SNOWFLAKE_HOST",
+    "snowflake.user.name": "SNOWFLAKE_USER",
+    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
+    "snowflake.database.name": "SNOWFLAKE_DATABASE",
+    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
+    "snowflake.role.name": "SNOWFLAKE_ROLE",
+    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "jmx": "true",
+    "errors.tolerance": "all",
+    "errors.log.enable": true,
+    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
+    "errors.deadletterqueue.topic.replication.factor": 1,
+    "snowflake.streaming.iceberg.enabled": true,
+    "snowflake.enable.schematization": true,
+    "snowflake.streaming.max.client.lag": "1"
+  }
+}
\ No newline at end of file
diff --git a/test/test_suit/base_iceberg_test.py b/test/test_suit/base_iceberg_test.py
index 0e9360c1d..ece407ede 100644
--- a/test/test_suit/base_iceberg_test.py
+++ b/test/test_suit/base_iceberg_test.py
@@ -3,17 +3,91 @@ class BaseIcebergTest(BaseE2eTest):
-    def __init__(self, driver, nameSalt):
+    def __init__(self, driver, name_salt: str, config_file_name: str):
         self.driver = driver
-        self.test_message = {
+        self.file_name = config_file_name
+        self.topic = config_file_name + name_salt
+
+        self.test_message_from_docs = {
             "id": 1,
             "body_temperature": 36.6,
             "name": "Steve",
             "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
             "animals_possessed": {"dogs": True, "cats": False},
         }
+
+        self.test_message_for_schema_evolution_1 = {
+            "null_long": None,
+            "null_array": None,
+            "null_object": None,
+            "empty_array": [],
+            "some_object": {
+                "null_key": None,
+                "string_key": "string_key"
+            }
+        }
+
+        self.test_message_for_schema_evolution_2 = {
+            "null_long": 2137,
+            "null_array": [1, 2, 3],
+            "null_object": {"key": "value"},
+            "empty_array": [1, 2, 3],
+            "some_object": {
+                "null_key": None,
+                "string_key": "string_key",
+                "another_string_key": "another_string_key",
+                "inner_object": {
+                    "inner_object_key": 456
+                }
+            }
+        }
+
+        self.test_message_from_docs_schema = """
+        {
+            "type":"record",
+            "name":"value_schema",
+            "fields": [
+                {
+                    "name": "id",
+                    "type": "int"
+                },
+                {
+                    "name": "body_temperature",
+                    "type": "float"
+                },
+                {
+                    "name": "name",
+                    "type": "string"
+                },
+                {
+                    "name": "approved_coffee_types",
+                    "type": {
+                        "type": "array",
+                        "items": "string"
+                    }
+                },
+                {
+                    "name": "animals_possessed",
+                    "type": {
+                        "type": "map",
+                        "values": "boolean"
+                    }
+                }
+            ]
+        }
+        """
+
         self.test_headers = [("header1", "value1")]
+
+    def getConfigFileName(self):
+        return self.file_name + ".json"
+
+
+    def clean(self):
+        self.driver.drop_iceberg_table(self.topic)
+
+
     def verify_iceberg_content(self, content: dict):
         assert_equals(1, content['id'])
         assert_equals_with_precision(36.6, content['body_temperature'])
diff --git a/test/test_suit/iceberg_avro_aws.py b/test/test_suit/iceberg_avro_aws.py
index add8c9502..c19d2b90e 100644
--- a/test/test_suit/iceberg_avro_aws.py
+++ b/test/test_suit/iceberg_avro_aws.py
@@ -5,69 +5,31 @@ class TestIcebergAvroAws(BaseIcebergTest):
-    def __init__(self, driver, nameSalt: str):
-        BaseIcebergTest.__init__(self, driver, nameSalt)
-        self.fileName = "iceberg_avro_aws"
-        self.topic = self.fileName + nameSalt
+    def __init__(self, driver, name_salt: str):
+        BaseIcebergTest.__init__(self, driver, name_salt, "iceberg_avro_aws")
 
-        valueSchemaStr = """
-        {
-            "type":"record",
-            "name":"value_schema",
-            "fields": [
-                {
-                    "name": "id",
-                    "type": "int"
-                },
-                {
-                    "name": "body_temperature",
-                    "type": "float"
-                },
-                {
-                    "name": "name",
-                    "type": "string"
-                },
-                {
-                    "name": "approved_coffee_types",
-                    "type": {
-                        "type": "array",
-                        "items": "string"
-                    }
-                },
-                {
-                    "name": "animals_possessed",
-                    "type": {
-                        "type": "map",
-                        "values": "boolean"
-                    }
-                }
-            ]
-        }
-        """
-        self.valueSchema = avro.loads(valueSchemaStr)
-
-    def getConfigFileName(self):
-        return self.fileName + ".json"
 
     def setup(self):
-        self.driver.create_iceberg_table_with_content(
+        self.driver.create_iceberg_table_with_sample_content(
             table_name=self.topic,
             external_volume="kafka_push_e2e_volume_aws",  # volume created manually
         )
 
+
     def send(self):
         value = []
         for e in range(100):
-            value.append(self.test_message)
+            value.append(self.test_message_from_docs)
 
         self.driver.sendAvroSRData(
             topic=self.topic,
             value=value,
-            value_schema=self.valueSchema,
+            value_schema=avro.loads(self.test_message_from_docs_schema),
             headers=self.test_headers,
         )
 
+
     def verify(self, round):
         number_of_records = self.driver.select_number_of_records(self.topic)
         if number_of_records == 0:
@@ -85,6 +47,3 @@ def verify(self, round):
 
         self.verify_iceberg_content(json.loads(first_record[0]))
         self.verify_iceberg_metadata(json.loads(first_record[1]))
-
-    def clean(self):
-        self.driver.drop_iceberg_table(self.topic)
diff --git a/test/test_suit/iceberg_json_aws.py b/test/test_suit/iceberg_json_aws.py
index 58f7faf28..72d09067b 100644
--- a/test/test_suit/iceberg_json_aws.py
+++ b/test/test_suit/iceberg_json_aws.py
@@ -4,20 +4,17 @@ class TestIcebergJsonAws(BaseIcebergTest):
-    def __init__(self, driver, nameSalt: str):
-        BaseIcebergTest.__init__(self, driver, nameSalt)
-        self.fileName = "iceberg_json_aws"
-        self.topic = self.fileName + nameSalt
+    def __init__(self, driver, name_salt: str):
+        BaseIcebergTest.__init__(self, driver, name_salt, "iceberg_json_aws")
 
-    def getConfigFileName(self):
-        return self.fileName + ".json"
 
     def setup(self):
-        self.driver.create_iceberg_table_with_content(
+        self.driver.create_iceberg_table_with_sample_content(
             table_name=self.topic,
             external_volume="kafka_push_e2e_volume_aws",  # volume created manually
         )
 
+
     def send(self):
         msg = json.dumps(self.test_message)
 
@@ -35,6 +32,7 @@ def send(self):
             headers=self.test_headers,
         )
 
+
     def verify(self, round):
         number_of_records = self.driver.select_number_of_records(self.topic)
         if number_of_records == 0:
@@ -52,6 +50,3 @@ def verify(self, round):
 
         self.verify_iceberg_content(json.loads(first_record[0]))
         self.verify_iceberg_metadata(json.loads(first_record[1]))
-
-    def clean(self):
-        self.driver.drop_iceberg_table(self.topic)
diff --git a/test/test_suit/iceberg_schema_evolution_avro_aws.py b/test/test_suit/iceberg_schema_evolution_avro_aws.py
new file mode 100644
index 000000000..41261aa0f
--- /dev/null
+++ b/test/test_suit/iceberg_schema_evolution_avro_aws.py
@@ -0,0 +1,21 @@
+from test_suit.base_iceberg_test import BaseIcebergTest
+
+
+class TestIcebergSchemaEvolutionAvroAws(BaseIcebergTest):
+    def __init__(self, driver, name_salt: str):
+        BaseIcebergTest.__init__(self, driver, name_salt, "iceberg_schema_evolution_avro_aws")
+
+
+    def setup(self):
+        self.driver.create_empty_iceberg_table(
+            table_name=self.topic,
+            external_volume="kafka_push_e2e_volume_aws",  # volume created manually
+        )
+
+
+    def send(self):
+        pass
+
+
+    def verify(self, round):
+        pass
diff --git a/test/test_suit/iceberg_schema_evolution_json_aws.py b/test/test_suit/iceberg_schema_evolution_json_aws.py
new file mode 100644
index 000000000..cc6716b13
--- /dev/null
+++ b/test/test_suit/iceberg_schema_evolution_json_aws.py
@@ -0,0 +1,22 @@
+from test_suit.base_iceberg_test import BaseIcebergTest
+
+
+class TestIcebergSchemaEvolutionJsonAws(BaseIcebergTest):
+    def __init__(self, driver, name_salt: str):
+        BaseIcebergTest.__init__(self, driver, name_salt, "iceberg_schema_evolution_json_aws")
+
+
+    def setup(self):
+        self.driver.create_empty_iceberg_table(
+            table_name=self.topic,
+            external_volume="kafka_push_e2e_volume_aws",  # volume created manually
+        )
+
+
+    def send(self):
+        pass
+
+
+    def verify(self, round):
+        pass
+
diff --git a/test/test_suites.py b/test/test_suites.py
index 568fd18af..f34076e82 100644
--- a/test/test_suites.py
+++ b/test/test_suites.py
@@ -42,6 +42,8 @@
 from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
 from test_suit.iceberg_avro_aws import TestIcebergAvroAws
 from test_suit.iceberg_json_aws import TestIcebergJsonAws
+from test_suit.iceberg_schema_evolution_avro_aws import TestIcebergSchemaEvolutionAvroAws
+from test_suit.iceberg_schema_evolution_json_aws import TestIcebergSchemaEvolutionJsonAws
 from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
 from test_suit.test_schema_evolution_json_ignore_tombstone import (
     TestSchemaEvolutionJsonIgnoreTombstone,
@@ -636,5 +638,23 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
                 cloud_platform=CloudPlatform.AWS,
             ),
         ),
+        (
+            "TestIcebergSchemaEvolutionJsonAws",
+            EndToEndTestSuite(
+                test_instance=TestIcebergSchemaEvolutionJsonAws(driver, nameSalt),
+                run_in_confluent=False,  # TODO set to true after ingest-sdk 3.0.1 release
+                run_in_apache=False,  # TODO set to true after ingest-sdk 3.0.1 release
+                cloud_platform=CloudPlatform.AWS,
+            ),
+        ),
+        (
+            "TestIcebergSchemaEvolutionAvroAws",
+            EndToEndTestSuite(
+                test_instance=TestIcebergSchemaEvolutionAvroAws(driver, nameSalt),
+                run_in_confluent=False,  # TODO set to true after ingest-sdk 3.0.1 release
+                run_in_apache=False,
+                cloud_platform=CloudPlatform.AWS,
+            ),
+        ),
     ]
 )
diff --git a/test/test_verify.py b/test/test_verify.py
index 892c74ad2..3ab6c6bb7 100755
--- a/test/test_verify.py
+++ b/test/test_verify.py
@@ -296,7 +296,19 @@ def cleanTableStagePipe(self, connectorName, topicName="", partitionNumber=1):
 
         print(datetime.now().strftime("%H:%M:%S "), "=== Done ===", flush=True)
 
-    def create_iceberg_table_with_content(self, table_name: str, external_volume: str):
+    def create_empty_iceberg_table(self, table_name: str, external_volume: str):
+        sql = """
+        CREATE ICEBERG TABLE IF NOT EXISTS {} (
+            record_content OBJECT()
+        )
+        EXTERNAL_VOLUME = '{}'
+        CATALOG = 'SNOWFLAKE'
+        BASE_LOCATION = '{}'
+        ;
+        """.format(table_name, external_volume, table_name)
+        self.snowflake_conn.cursor().execute(sql)
+
+    def create_iceberg_table_with_sample_content(self, table_name: str, external_volume: str):
         sql = """
         CREATE ICEBERG TABLE IF NOT EXISTS {} (
             record_content OBJECT(
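
The excerpt ends inside create_iceberg_table_with_sample_content, so the full column definition of the pre-created sample table is not shown here. For orientation only, the sketch below shows the kind of DDL such a helper would issue; the column layout is an assumption inferred from test_message_from_docs and the Avro value schema in base_iceberg_test.py, and the table name is a placeholder, not the SQL committed in this patch:

    -- hypothetical sketch, not the committed statement
    CREATE ICEBERG TABLE IF NOT EXISTS my_test_topic (
        record_content OBJECT(
            id INT,
            body_temperature FLOAT,
            name STRING,
            approved_coffee_types ARRAY(STRING),
            animals_possessed MAP(STRING, BOOLEAN)
        )
    )
    EXTERNAL_VOLUME = 'kafka_push_e2e_volume_aws'
    CATALOG = 'SNOWFLAKE'
    BASE_LOCATION = 'my_test_topic'
    ;

By contrast, create_empty_iceberg_table declares record_content as a bare OBJECT(), which, together with snowflake.enable.schematization in the new config templates, appears intended to let the connector evolve the record_content structure as messages like test_message_for_schema_evolution_1 and _2 arrive once the schema evolution tests are filled in.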