SNOW-1728000 Iceberg e2e tests
sfc-gh-mbobowski committed Nov 19, 2024
1 parent 896c345 · commit d350401
Showing 10 changed files with 249 additions and 50 deletions.
28 changes: 28 additions & 0 deletions test/rest_request_template/iceberg_avro_aws.json
@@ -0,0 +1,28 @@
{
  "name": "SNOWFLAKE_CONNECTOR_NAME",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "topics": "SNOWFLAKE_TEST_TOPIC",
    "tasks.max": "1",
    "buffer.size.bytes": "5000000",
    "snowflake.url.name": "SNOWFLAKE_HOST",
    "snowflake.user.name": "SNOWFLAKE_USER",
    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
    "snowflake.database.name": "SNOWFLAKE_DATABASE",
    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
    "snowflake.role.name": "SNOWFLAKE_ROLE",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "CONFLUENT_SCHEMA_REGISTRY",
    "value.converter.schemas.enable": "false",
    "jmx": "true",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "snowflake.streaming.iceberg.enabled": true,
    "snowflake.streaming.max.client.lag": "1",
    "snowflake.streaming.enable.single.buffer": "true"
  }
}
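The files under test/rest_request_template are templates: the SNOWFLAKE_* and CONFLUENT_* tokens are placeholders the harness fills in before the connector is created. A minimal sketch of that flow, assuming plain string substitution and a local Kafka Connect REST endpoint (both are assumptions; the actual rendering code is not part of this diff, and the values below are hypothetical):

import json

import requests  # assumed available in the test environment


def render_template(path: str, substitutions: dict) -> dict:
    """Replace placeholder tokens in a rest_request_template file and parse it."""
    with open(path) as f:
        raw = f.read()
    for token, value in substitutions.items():
        raw = raw.replace(token, value)
    return json.loads(raw)


payload = render_template(
    "test/rest_request_template/iceberg_avro_aws.json",
    {
        "SNOWFLAKE_CONNECTOR_NAME": "iceberg_avro_aws_conn",  # hypothetical values
        "SNOWFLAKE_TEST_TOPIC": "iceberg_avro_aws_salt42",
        "CONFLUENT_SCHEMA_REGISTRY": "http://localhost:8081",
        # ...remaining SNOWFLAKE_* and DLQ tokens filled the same way
    },
)

# POST /connectors is the standard Kafka Connect REST call for creating a connector.
resp = requests.post("http://localhost:8083/connectors", json=payload)
resp.raise_for_status()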
9 changes: 3 additions & 6 deletions test/rest_request_template/iceberg_json_aws.json
@@ -4,8 +4,6 @@
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
Expand All @@ -14,9 +12,6 @@
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": true,
"snowflake.streaming.closeChannelsInParallel.enabled": true,
"snowflake.enable.schematization": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
Expand All @@ -25,6 +20,8 @@
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": "true"
}
}
6 changes: 3 additions & 3 deletions test/run_test_confluent.sh
@@ -127,9 +127,9 @@ KAFKA_CONNECT_PLUGIN_PATH="/usr/local/share/kafka/plugins"
echo "Built zip file using kafka connect maven plugin:"
ls /tmp/sf-kafka-connect-plugin*
# Plugin path is used by kafka connect to install plugin, in our case, SF Kafka Connector
unzip /tmp/sf-kafka-connect-plugin.zip -d $KAFKA_CONNECT_PLUGIN_PATH
echo "list KAFKA_CONNECT_PLUGIN_PATH: $KAFKA_CONNECT_PLUGIN_PATH"
ls $KAFKA_CONNECT_PLUGIN_PATH
#unzip /tmp/sf-kafka-connect-plugin.zip -d $KAFKA_CONNECT_PLUGIN_PATH
#echo "list KAFKA_CONNECT_PLUGIN_PATH: $KAFKA_CONNECT_PLUGIN_PATH"
#ls $KAFKA_CONNECT_PLUGIN_PATH

# Copy the sample connect log4j properties file to appropriate directory
echo "Copying connect-log4j.properties file to confluent folder"
26 changes: 26 additions & 0 deletions test/test_suit/assertions.py
@@ -0,0 +1,26 @@
from test_suit.test_utils import NonRetryableError


def assert_equals(expected, actual):
    if expected != actual:
        raise NonRetryableError('Actual {} does not equal expected {}'.format(actual, expected))


def assert_equals_with_precision(expected, actual, precision=0.1):
    if not expected - precision < actual < expected + precision:
        raise NonRetryableError('Actual {} does not equal expected {} with precision {}'.format(actual, expected, precision))


def assert_starts_with(expected_prefix, actual):
    if not actual.startswith(expected_prefix):
        raise NonRetryableError('Actual {} does not start with expected prefix {}'.format(actual, expected_prefix))


def assert_not_null(actual):
    if actual is None:
        raise NonRetryableError('Actual {} is null'.format(actual))


def assert_dict_contains(expected_key, expected_value, actual_dict):
    if actual_dict[expected_key] != expected_value:
        raise NonRetryableError('Actual value from dict {} does not equal expected {}'.format(actual_dict[expected_key], expected_value))
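Each helper fails by raising NonRetryableError, so a failed assertion aborts the test run instead of re-polling. A short usage sketch with illustrative values:

from test_suit.assertions import (
    assert_dict_contains,
    assert_equals,
    assert_equals_with_precision,
    assert_not_null,
    assert_starts_with,
)

record = {"id": 1, "body_temperature": 36.55, "topic": "iceberg_json_aws_x"}

assert_equals(1, record["id"])
assert_equals_with_precision(36.6, record["body_temperature"])  # default precision is 0.1
assert_starts_with("iceberg_", record["topic"])
assert_not_null(record["topic"])
assert_dict_contains("id", 1, record)  # raises NonRetryableError on mismatch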
38 changes: 38 additions & 0 deletions test/test_suit/base_iceberg_test.py
@@ -0,0 +1,38 @@
from test_suit.base_e2e import BaseE2eTest
from test_suit.assertions import *

class BaseIcebergTest(BaseE2eTest):

    def __init__(self, driver, nameSalt):
        self.driver = driver
        self.test_message = {
            "id": 1,
            "body_temperature": 36.6,
            "name": "Steve",
            "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
            "animals_possessed": {"dogs": True, "cats": False},
        }
        self.test_headers = [("header1", "value1")]

    def verify_iceberg_content(self, content: dict):
        assert_equals(1, content['id'])
        assert_equals_with_precision(36.6, content['body_temperature'])
        assert_equals('Steve', content['name'])

        assert_equals('Espresso', content['approved_coffee_types'][0])
        assert_equals('Doppio', content['approved_coffee_types'][1])
        assert_equals('Ristretto', content['approved_coffee_types'][2])
        assert_equals('Lungo', content['approved_coffee_types'][3])

        assert_equals(True, content['animals_possessed']['dogs'])
        assert_equals(False, content['animals_possessed']['cats'])

    def verify_iceberg_metadata(self, metadata: dict):
        assert_equals(0, metadata['offset'])
        assert_equals(0, metadata['partition'])
        assert_starts_with('iceberg_', metadata['topic'])
        assert_not_null(metadata['SnowflakeConnectorPushTime'])

        assert_dict_contains('header1', 'value1', metadata['headers'])

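Concretely, verify_iceberg_metadata expects the RECORD_METADATA column to deserialize into a dict of roughly this shape. The values below are illustrative, not captured from a real run; only the asserted properties matter:

metadata = {
    "offset": 0,                                  # first record in the partition
    "partition": 0,
    "topic": "iceberg_json_aws_salt42",           # must start with "iceberg_"
    "SnowflakeConnectorPushTime": 1732000000000,  # only asserted to be non-null
    "headers": {"header1": "value1"},             # matches self.test_headers
}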
85 changes: 85 additions & 0 deletions test/test_suit/iceberg_avro_aws.py
@@ -0,0 +1,85 @@
from test_suit.test_utils import RetryableError, NonRetryableError
import json
from confluent_kafka import avro
from test_suit.base_iceberg_test import BaseIcebergTest


class TestIcebergAvroAws(BaseIcebergTest):
    def __init__(self, driver, nameSalt: str):
        BaseIcebergTest.__init__(self, driver, nameSalt)
        self.fileName = "iceberg_avro_aws"
        self.topic = self.fileName + nameSalt

        valueSchemaStr = """
        {
            "type": "record",
            "name": "value_schema",
            "fields": [
                {
                    "name": "id",
                    "type": "int"
                },
                {
                    "name": "body_temperature",
                    "type": "float"
                },
                {
                    "name": "name",
                    "type": "string"
                },
                {
                    "name": "approved_coffee_types",
                    "type": {
                        "type": "array",
                        "items": "string"
                    }
                },
                {
                    "name": "animals_possessed",
                    "type": {
                        "type": "map",
                        "values": "boolean"
                    }
                }
            ]
        }
        """
        self.valueSchema = avro.loads(valueSchemaStr)

    def getConfigFileName(self):
        return self.fileName + ".json"

    def setup(self):
        self.driver.create_iceberg_table_with_content(
            table_name=self.topic,
            external_volume="kafka_push_e2e_volume_aws",  # volume created manually
        )

    def send(self):
        value = []

        for e in range(100):
            value.append(self.test_message)

        self.driver.sendAvroSRData(
            topic=self.topic,
            value=value,
            value_schema=self.valueSchema,
            headers=self.test_headers
        )

    def verify(self, round):
        number_of_records = self.driver.select_number_of_records(self.topic)
        if number_of_records == 0:
            raise RetryableError()
        elif number_of_records != 100:
            raise NonRetryableError("Number of records in table is different from number of records sent")

        first_record = self.driver.snowflake_conn.cursor().execute(
            "Select * from {} limit 1".format(self.topic)).fetchone()

        self.verify_iceberg_content(json.loads(first_record[0]))
        self.verify_iceberg_metadata(json.loads(first_record[1]))

    def clean(self):
        self.driver.drop_iceberg_table(self.topic)
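The verify method leans on the harness's retry contract: RetryableError means the rows may simply not have flushed yet, NonRetryableError fails the test immediately. A sketch of the polling loop, assuming the harness retries on a fixed interval (the real loop lives in the test runner, not in this diff, and the parameters are illustrative):

import time

from test_suit.test_utils import NonRetryableError, RetryableError


def poll_verify(test, round, attempts=30, delay_seconds=10):
    """Re-run test.verify() until it passes, the retry budget runs out,
    or it raises a NonRetryableError."""
    for _ in range(attempts):
        try:
            test.verify(round)
            return
        except RetryableError:
            time.sleep(delay_seconds)  # streaming ingestion may still be flushing
    raise NonRetryableError("verify() did not pass within the retry budget")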
50 changes: 33 additions & 17 deletions test/test_suit/iceberg_json_aws.py
@@ -1,14 +1,11 @@
import datetime

from test_suit.test_utils import RetryableError, NonRetryableError
import json
from time import sleep
from test_suit.base_e2e import BaseE2eTest
from test_suit.base_iceberg_test import BaseIcebergTest


class TestIcebergJsonAws(BaseE2eTest):
class TestIcebergJsonAws(BaseIcebergTest):
    def __init__(self, driver, nameSalt: str):
        self.driver = driver
        BaseIcebergTest.__init__(self, driver, nameSalt)
        self.fileName = "iceberg_json_aws"
        self.topic = self.fileName + nameSalt

@@ -22,20 +19,39 @@ def setup(self):
        )

    def send(self):
        msg = json.dumps(
            {
                "id": 1,
                "body_temperature": 36.6,
                "name": "Steve",
                "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
                "animals_possessed": {"dogs": True, "cats": False},
            }
        msg = json.dumps(self.test_message)

        key = []
        value = []
        for e in range(100):
            key.append(json.dumps({"number": str(e)}).encode("utf-8"))
            value.append(msg.encode("utf-8"))

        self.driver.sendBytesData(
            topic=self.topic,
            value=value,
            key=key,
            partition=0,
            headers=self.test_headers
        )

    def verify(self, round):
        res = self.driver.select_number_of_records(self.topic)
        print("Count records in table {}={}".format(self.topic, str(res)))
        number_of_records = self.driver.select_number_of_records(self.topic)
        if number_of_records == 0:
            raise RetryableError()
        elif number_of_records != 100:
            raise NonRetryableError(
                "Number of records in table is different from number of records sent"
            )

        first_record = (
            self.driver.snowflake_conn.cursor()
            .execute("Select * from {} limit 1".format(self.topic))
            .fetchone()
        )

        self.verify_iceberg_content(json.loads(first_record[0]))
        self.verify_iceberg_metadata(json.loads(first_record[1]))

    def clean(self):
        self.driver.drop_iceberg_table(self.topic)
        return
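Both Iceberg tests count rows through driver.select_number_of_records. A plausible sketch of that helper, under the assumption that it issues a simple COUNT over the destination table (the actual driver implementation is outside this diff):

def select_number_of_records(snowflake_conn, table_name: str) -> int:
    """Count rows in the destination table via the existing Snowflake connection."""
    cursor = snowflake_conn.cursor()
    return cursor.execute("SELECT COUNT(*) FROM {}".format(table_name)).fetchone()[0]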
1 change: 0 additions & 1 deletion test/test_suit/test_utils.py
@@ -1,7 +1,6 @@
import re
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from test_suit.base_e2e import BaseE2eTest


class Error(Exception):
Expand Down
50 changes: 30 additions & 20 deletions test/test_suites.py
@@ -21,7 +21,7 @@
)
from test_suit.test_avro_avro import TestAvroAvro
from test_suit.test_avrosr_avrosr import TestAvrosrAvrosr
from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
# from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
from test_suit.test_json_json import TestJsonJson
from test_suit.test_multiple_topic_to_one_table_snowpipe import (
    TestMultipleTopicToOneTableSnowpipe,
@@ -40,6 +40,7 @@
    TestSchemaEvolutionAvroSRLogicalTypes,
)
from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
from test_suit.iceberg_avro_aws import TestIcebergAvroAws
from test_suit.iceberg_json_aws import TestIcebergJsonAws
from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
from test_suit.test_schema_evolution_json_ignore_tombstone import (
@@ -88,7 +89,7 @@
from test_suit.test_snowpipe_streaming_string_json_ignore_tombstone import (
    TestSnowpipeStreamingStringJsonIgnoreTombstone,
)
from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
# from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
from test_suit.test_string_avro import TestStringAvro
from test_suit.test_string_avrosr import TestStringAvrosr
from test_suit.test_string_json import TestStringJson
@@ -236,15 +237,15 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
                cloud_platform=CloudPlatform.ALL,
            ),
        ),
        (
            "TestNativeStringProtobuf",
            EndToEndTestSuite(
                test_instance=TestNativeStringProtobuf(driver, nameSalt),
                run_in_confluent=True,
                run_in_apache=True,
                cloud_platform=CloudPlatform.ALL,
            ),
        ),
        # (
        #     "TestNativeStringProtobuf",
        #     EndToEndTestSuite(
        #         test_instance=TestNativeStringProtobuf(driver, nameSalt),
        #         run_in_confluent=True,
        #         run_in_apache=True,
        #         cloud_platform=CloudPlatform.ALL,
        #     ),
        # ),
        (
            "TestNullableValuesAfterSmt",
            EndToEndTestSuite(
@@ -254,15 +255,15 @@
                cloud_platform=CloudPlatform.ALL,
            ),
        ),
        (
            "TestConfluentProtobufProtobuf",
            EndToEndTestSuite(
                test_instance=TestConfluentProtobufProtobuf(driver, nameSalt),
                run_in_confluent=False,
                run_in_apache=False,
                cloud_platform=CloudPlatform.ALL,
            ),
        ),
        # (
        #     "TestConfluentProtobufProtobuf",
        #     EndToEndTestSuite(
        #         test_instance=TestConfluentProtobufProtobuf(driver, nameSalt),
        #         run_in_confluent=False,
        #         run_in_apache=False,
        #         cloud_platform=CloudPlatform.ALL,
        #     ),
        # ),
        (
            "TestSnowpipeStreamingNullableValuesAfterSmt",
            EndToEndTestSuite(
@@ -626,5 +627,14 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
                cloud_platform=CloudPlatform.AWS,
            ),
        ),
        (
            "TestIcebergAvroAws",
            EndToEndTestSuite(
                test_instance=TestIcebergAvroAws(driver, nameSalt),
                run_in_confluent=True,
                run_in_apache=False,
                cloud_platform=CloudPlatform.AWS,
            ),
        ),
    ]
)
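For context on the flags in the new entry: TestIcebergAvroAws is registered to run only against Confluent on AWS, while TestIcebergJsonAws keeps its existing registration. A sketch of how these flags plausibly gate execution; the real filtering lives elsewhere in the harness, and the enum below is a stand-in for the one test_suites.py imports:

from enum import Enum


class CloudPlatform(Enum):  # stand-in for the enum used by test_suites.py
    ALL = "all"
    AWS = "aws"
    AZURE = "azure"
    GCS = "gcs"


def should_run(suite, in_confluent: bool, cloud: CloudPlatform) -> bool:
    """Gate a suite on the Kafka distribution it runs in and the cloud it targets."""
    platform_ok = suite.run_in_confluent if in_confluent else suite.run_in_apache
    return platform_ok and suite.cloud_platform in (CloudPlatform.ALL, cloud)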