Skip to content

Commit

Permalink
SNOW-1831140 Iceberg schema evolution e2e test setup (#1010)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski authored Nov 29, 2024
1 parent 0235f9f commit 3480d43
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 65 deletions.
3 changes: 1 addition & 2 deletions test/rest_request_template/iceberg_avro_aws.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": "true"
"snowflake.streaming.max.client.lag": "1"
}
}
3 changes: 1 addition & 2 deletions test/rest_request_template/iceberg_json_aws.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": "true"
"snowflake.streaming.max.client.lag": "1"
}
}
28 changes: 28 additions & 0 deletions test/rest_request_template/iceberg_schema_evolution_avro_aws.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.enable.schematization": true,
"snowflake.streaming.max.client.lag": "1"
}
}
27 changes: 27 additions & 0 deletions test/rest_request_template/iceberg_schema_evolution_json_aws.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.enable.schematization": true,
"snowflake.streaming.max.client.lag": "1"
}
}
78 changes: 76 additions & 2 deletions test/test_suit/base_iceberg_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,91 @@

class BaseIcebergTest(BaseE2eTest):

def __init__(self, driver, nameSalt):
def __init__(self, driver, name_salt: str, config_file_name: str):
self.driver = driver
self.test_message = {
self.fileName = "iceberg_json_aws"
self.topic = config_file_name + name_salt

self.test_message_from_docs = {
"id": 1,
"body_temperature": 36.6,
"name": "Steve",
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed": {"dogs": True, "cats": False},
}

self.test_message_for_schema_evolution_1 = {
"null_long": None,
"null_array": None,
"null_object": None,
"empty_array": [],
"some_object": {
"null_key": None,
"string_key": "string_key"
}
}

self.test_message_for_schema_evolution_2 = {
"null_long": 2137,
"null_array": [1, 2, 3],
"null_object": {"key": "value"},
"empty_array": [1, 2, 3],
"some_object": {
"null_key": None,
"string_key": "string_key",
"another_string_key": "another_string_key",
"inner_object": {
"inner_object_key": 456
}
}
}

self.test_message_from_docs_schema = """
{
"type":"record",
"name":"value_schema",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "body_temperature",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "approved_coffee_types",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "animals_possessed",
"type": {
"type": "map",
"values": "boolean"
}
}
]
}
"""

self.test_headers = [("header1", "value1")]


def getConfigFileName(self):
    """Return the REST-request-template config file name for this test.

    Bug fix: the attribute initialized in ``__init__`` (and read by the
    subclasses, e.g. ``TestIcebergAvroAws``) is ``self.fileName`` —
    ``self.file_name`` does not exist and raised ``AttributeError``.
    """
    return self.fileName + ".json"


def clean(self):
    """Tear down the test by dropping the Iceberg table named after the topic."""
    table_to_drop = self.topic
    self.driver.drop_iceberg_table(table_to_drop)


def verify_iceberg_content(self, content: dict):
assert_equals(1, content['id'])
assert_equals_with_precision(36.6, content['body_temperature'])
Expand Down
55 changes: 7 additions & 48 deletions test/test_suit/iceberg_avro_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,31 @@


class TestIcebergAvroAws(BaseIcebergTest):
def __init__(self, driver, nameSalt: str):
BaseIcebergTest.__init__(self, driver, nameSalt)
self.fileName = "iceberg_avro_aws"
self.topic = self.fileName + nameSalt
def __init__(self, driver, name_salt: str):
BaseIcebergTest.__init__(self, driver, name_salt, "iceberg_avro_aws")

valueSchemaStr = """
{
"type":"record",
"name":"value_schema",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "body_temperature",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "approved_coffee_types",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "animals_possessed",
"type": {
"type": "map",
"values": "boolean"
}
}
]
}
"""
self.valueSchema = avro.loads(valueSchemaStr)

def getConfigFileName(self):
return self.fileName + ".json"

def setup(self):
self.driver.create_iceberg_table_with_content(
self.driver.create_iceberg_table_with_sample_content(
table_name=self.topic,
external_volume="kafka_push_e2e_volume_aws", # volume created manually
)


def send(self):
value = []

for e in range(100):
value.append(self.test_message)
value.append(self.test_message_from_docs)

self.driver.sendAvroSRData(
topic=self.topic,
value=value,
value_schema=self.valueSchema,
value_schema=avro.loads(self.test_message_from_docs_schema),
headers=self.test_headers,
)


def verify(self, round):
number_of_records = self.driver.select_number_of_records(self.topic)
if number_of_records == 0:
Expand All @@ -85,6 +47,3 @@ def verify(self, round):

self.verify_iceberg_content(json.loads(first_record[0]))
self.verify_iceberg_metadata(json.loads(first_record[1]))

def clean(self):
self.driver.drop_iceberg_table(self.topic)
15 changes: 5 additions & 10 deletions test/test_suit/iceberg_json_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@


class TestIcebergJsonAws(BaseIcebergTest):
def __init__(self, driver, nameSalt: str):
BaseIcebergTest.__init__(self, driver, nameSalt)
self.fileName = "iceberg_json_aws"
self.topic = self.fileName + nameSalt
def __init__(self, driver, name_salt: str):
BaseIcebergTest.__init__(self, driver, name_salt, "iceberg_json_aws")

def getConfigFileName(self):
return self.fileName + ".json"

def setup(self):
self.driver.create_iceberg_table_with_content(
self.driver.create_iceberg_table_with_sample_content(
table_name=self.topic,
external_volume="kafka_push_e2e_volume_aws", # volume created manually
)


def send(self):
msg = json.dumps(self.test_message)

Expand All @@ -35,6 +32,7 @@ def send(self):
headers=self.test_headers,
)


def verify(self, round):
number_of_records = self.driver.select_number_of_records(self.topic)
if number_of_records == 0:
Expand All @@ -52,6 +50,3 @@ def verify(self, round):

self.verify_iceberg_content(json.loads(first_record[0]))
self.verify_iceberg_metadata(json.loads(first_record[1]))

def clean(self):
self.driver.drop_iceberg_table(self.topic)
21 changes: 21 additions & 0 deletions test/test_suit/iceberg_schema_evolution_avro_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from test_suit.base_iceberg_test import BaseIcebergTest


class TestIcebergSchemaEvolutionAvroAws(BaseIcebergTest):
    """E2E scaffolding for Iceberg schema-evolution ingestion of Avro records on AWS.

    ``send``/``verify`` are intentionally placeholders for now (the suite entry in
    test_suites.py is disabled pending an ingest-sdk release).
    """

    def __init__(self, driver, name_salt: str):
        super().__init__(driver, name_salt, "iceberg_schema_evolution_avro_aws")

    def setup(self):
        """Create the target table empty; schema evolution adds columns later."""
        self.driver.create_empty_iceberg_table(
            table_name=self.topic,
            external_volume="kafka_push_e2e_volume_aws",  # volume created manually
        )

    def send(self):
        """Placeholder — no messages produced yet."""
        pass

    def verify(self, round):
        """Placeholder — nothing to assert until send() produces data."""
        pass
22 changes: 22 additions & 0 deletions test/test_suit/iceberg_schema_evolution_json_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from test_suit.base_iceberg_test import BaseIcebergTest


class TestIcebergSchemaEvolutionJsonAws(BaseIcebergTest):
    """E2E scaffolding for Iceberg schema-evolution ingestion of JSON records on AWS.

    ``send``/``verify`` are intentionally placeholders for now (the suite entry in
    test_suites.py is disabled pending an ingest-sdk release).
    """

    def __init__(self, driver, name_salt: str):
        super().__init__(driver, name_salt, "iceberg_schema_evolution_json_aws")

    def setup(self):
        """Create the target table empty; schema evolution adds columns later."""
        self.driver.create_empty_iceberg_table(
            table_name=self.topic,
            external_volume="kafka_push_e2e_volume_aws",  # volume created manually
        )

    def send(self):
        """Placeholder — no messages produced yet."""
        pass

    def verify(self, round):
        """Placeholder — nothing to assert until send() produces data."""
        pass

20 changes: 20 additions & 0 deletions test/test_suites.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
from test_suit.iceberg_avro_aws import TestIcebergAvroAws
from test_suit.iceberg_json_aws import TestIcebergJsonAws
from test_suit.iceberg_schema_evolution_avro_aws import TestIcebergSchemaEvolutionAvroAws
from test_suit.iceberg_schema_evolution_json_aws import TestIcebergSchemaEvolutionJsonAws
from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
from test_suit.test_schema_evolution_json_ignore_tombstone import (
TestSchemaEvolutionJsonIgnoreTombstone,
Expand Down Expand Up @@ -636,5 +638,23 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
cloud_platform=CloudPlatform.AWS,
),
),
(
"TestIcebergSchemaEvolutionJsonAws",
EndToEndTestSuite(
test_instance=TestIcebergSchemaEvolutionJsonAws(driver, nameSalt),
run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_apache=False, # TODO set to true after ingest-sdk 3.0.1 release
cloud_platform=CloudPlatform.AWS,
),
),
(
"TestIcebergSchemaEvolutionAvroAws",
EndToEndTestSuite(
test_instance=TestIcebergSchemaEvolutionAvroAws(driver, nameSalt),
run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_apache=False,
cloud_platform=CloudPlatform.AWS,
),
),
]
)
14 changes: 13 additions & 1 deletion test/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,19 @@ def cleanTableStagePipe(self, connectorName, topicName="", partitionNumber=1):

print(datetime.now().strftime("%H:%M:%S "), "=== Done ===", flush=True)

def create_iceberg_table_with_content(self, table_name: str, external_volume: str):
def create_empty_iceberg_table(self, table_name: str, external_volume: str):
    """Create a Snowflake-catalog Iceberg table with only an empty OBJECT payload column.

    The table gets a single ``record_content OBJECT()`` column so that
    schema-evolution tests start from a schema-less payload and let the
    connector add fields. ``IF NOT EXISTS`` makes the call idempotent;
    the table name doubles as the BASE_LOCATION on the external volume.

    NOTE(review): table_name/external_volume are interpolated directly into
    the SQL text — acceptable only because these are trusted test inputs.
    """
    sql = """
    CREATE ICEBERG TABLE IF NOT EXISTS {} (
        record_content OBJECT()
    )
    EXTERNAL_VOLUME = '{}'
    CATALOG = 'SNOWFLAKE'
    BASE_LOCATION = '{}'
    ;
    """.format(table_name, external_volume, table_name)
    self.snowflake_conn.cursor().execute(sql)

def create_iceberg_table_with_sample_content(self, table_name: str, external_volume: str):
sql = """
CREATE ICEBERG TABLE IF NOT EXISTS {} (
record_content OBJECT(
Expand Down

0 comments on commit 3480d43

Please sign in to comment.