SNOW-1728000 WIP Iceberg e2e tests
sfc-gh-mbobowski committed Nov 19, 2024
1 parent 896c345 commit 66cd8b6
Showing 12 changed files with 229 additions and 37 deletions.
28 changes: 28 additions & 0 deletions test/rest_request_template/iceberg_avro_aws.json
@@ -0,0 +1,28 @@
+{
+  "name": "SNOWFLAKE_CONNECTOR_NAME",
+  "config": {
+    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+    "topics": "SNOWFLAKE_TEST_TOPIC",
+    "tasks.max": "1",
+    "buffer.size.bytes": "5000000",
+    "snowflake.url.name": "SNOWFLAKE_HOST",
+    "snowflake.user.name": "SNOWFLAKE_USER",
+    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
+    "snowflake.database.name": "SNOWFLAKE_DATABASE",
+    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
+    "snowflake.role.name": "SNOWFLAKE_ROLE",
+    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "io.confluent.connect.avro.AvroConverter",
+    "value.converter.schema.registry.url": "CONFLUENT_SCHEMA_REGISTRY",
+    "value.converter.schemas.enable": "false",
+    "jmx": "true",
+    "errors.tolerance": "all",
+    "errors.log.enable": true,
+    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
+    "errors.deadletterqueue.topic.replication.factor": 1,
+    "snowflake.streaming.iceberg.enabled": true,
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": "true"
+  }
+}
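
Note: the SNOWFLAKE_*, CONFLUENT_SCHEMA_REGISTRY and DLQ_TOPIC values above are placeholders that the test harness substitutes before registering the connector. A minimal sketch of that flow, assuming a local Kafka Connect worker on port 8083 and a placeholders dict supplied by the caller (both illustrative, not part of this commit):

import json
import requests  # assumed available in the harness environment


def register_connector(template_path, placeholders, connect_url="http://localhost:8083"):
    # Fill in the SNOWFLAKE_* / CONFLUENT_* tokens in the template.
    with open(template_path) as f:
        raw = f.read()
    for token, value in placeholders.items():
        raw = raw.replace(token, value)
    payload = json.loads(raw)  # {"name": ..., "config": {...}}
    # Standard Kafka Connect REST call to create the connector.
    resp = requests.post(connect_url + "/connectors", json=payload)
    resp.raise_for_status()
    return resp.json()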
9 changes: 3 additions & 6 deletions test/rest_request_template/iceberg_json_aws.json
@@ -4,8 +4,6 @@
     "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
     "topics": "SNOWFLAKE_TEST_TOPIC",
     "tasks.max": "1",
-    "buffer.flush.time": "10",
-    "buffer.count.records": "100",
     "buffer.size.bytes": "5000000",
     "snowflake.url.name": "SNOWFLAKE_HOST",
     "snowflake.user.name": "SNOWFLAKE_USER",
@@ -14,9 +12,6 @@
     "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
     "snowflake.role.name": "SNOWFLAKE_ROLE",
     "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
-    "snowflake.streaming.enable.single.buffer": true,
-    "snowflake.streaming.closeChannelsInParallel.enabled": true,
-    "snowflake.enable.schematization": "false",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable": "false",
@@ -25,6 +20,8 @@
     "errors.log.enable": true,
     "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
     "errors.deadletterqueue.topic.replication.factor": 1,
-    "snowflake.streaming.iceberg.enabled": true
+    "snowflake.streaming.iceberg.enabled": true,
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": "true"
   }
 }
4 changes: 2 additions & 2 deletions test/run_test_confluent.sh
@@ -135,7 +135,7 @@ ls $KAFKA_CONNECT_PLUGIN_PATH
 echo "Copying connect-log4j.properties file to confluent folder"
 cp -fr ./connect-log4j.properties $CONFLUENT_FOLDER_NAME/"etc/kafka/"
 
-compile_protobuf_converter_and_data $TEST_SET $CONFLUENT_FOLDER_NAME
+#compile_protobuf_converter_and_data $TEST_SET $CONFLUENT_FOLDER_NAME
 
 trap "pkill -9 -P $$" SIGINT SIGTERM EXIT
 
@@ -174,7 +174,7 @@
 
 # record_thread_count 2>&1 &
 # Send test data and verify DB result from Python
-python3 test_verify.py $SNOWFLAKE_KAFKA_ADDRESS http://$LOCAL_IP:$SC_PORT $LOCAL_IP:$KC_PORT $TEST_SET $CONFLUENT_VERSION $NAME_SALT $PRESSURE $SSL $SKIP_PROXY $TESTS
+python3 -u test_verify.py $SNOWFLAKE_KAFKA_ADDRESS http://$LOCAL_IP:$SC_PORT $LOCAL_IP:$KC_PORT $TEST_SET $CONFLUENT_VERSION $NAME_SALT $PRESSURE $SSL $SKIP_PROXY $TESTS
 testError=$?
 
 # delete_connectors_with_salt $NAME_SALT $LOCAL_IP $KC_PORT
2 changes: 2 additions & 0 deletions test/test_data/avro/README.txt
@@ -0,0 +1,2 @@
+Avro binary file generated from JSON:
+java -jar avro-tools-1.12.0.jar fromjson --schema-file test_user.avsc steve.json > steve.avro
Binary file added test/test_data/avro/steve.avro
Binary file not shown.
7 changes: 7 additions & 0 deletions test/test_data/avro/steve.json
@@ -0,0 +1,7 @@
+{
+  "id": 1,
+  "body_temperature": 36.6,
+  "name": "Steve",
+  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
+  "animals_possessed": {"dogs": true, "cats": false}
+}
33 changes: 33 additions & 0 deletions test/test_data/avro/test_user.avsc
@@ -0,0 +1,33 @@
+{
+  "namespace": "com.snowflake.kafka.connector",
+  "type": "record",
+  "name": "TestUser",
+  "fields": [
+    {
+      "name": "id",
+      "type": "int"
+    },
+    {
+      "name": "body_temperature",
+      "type": "float"
+    },
+    {
+      "name": "name",
+      "type": "string"
+    },
+    {
+      "name": "approved_coffee_types",
+      "type": {
+        "type": "array",
+        "items": "string"
+      }
+    },
+    {
+      "name": "animals_possessed",
+      "type": {
+        "type": "map",
+        "values": "boolean"
+      }
+    }
+  ]
+}
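
The schema and record above are the inputs to the README's avro-tools command. An equivalent way to produce and round-trip steve.avro in Python, assuming the third-party fastavro package (illustrative only, not part of this commit):

import json
from fastavro import parse_schema, writer, reader

# Load the schema and the single JSON record shown above.
with open("test_user.avsc") as f:
    schema = parse_schema(json.load(f))
with open("steve.json") as f:
    record = json.load(f)

# Write an Avro object-container file, analogous to avro-tools fromjson.
with open("steve.avro", "wb") as out:
    writer(out, schema, [record])

# Read it back and sanity-check one field.
with open("steve.avro", "rb") as fo:
    assert next(reader(fo))["name"] == "Steve"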
26 changes: 26 additions & 0 deletions test/test_suit/assertions.py
@@ -0,0 +1,26 @@
+from test_suit.test_utils import NonRetryableError
+
+
+def assert_equals(expected, actual):
+    if expected != actual:
+        raise NonRetryableError('Actual {} does not equal expected {}'.format(actual, expected))
+
+
+def assert_equals_with_precision(expected, actual, precision=0.1):
+    if not expected - precision < actual < expected + precision:
+        raise NonRetryableError('Actual {} does not equal expected {} with precision {}'.format(actual, expected, precision))
+
+
+def assert_starts_with(expected_prefix, actual):
+    if not actual.startswith(expected_prefix):
+        raise NonRetryableError('Actual {} does not start with {}'.format(actual, expected_prefix))
+
+
+def assert_not_null(actual):
+    if actual is None:
+        raise NonRetryableError('Actual {} is null'.format(actual))
+
+
+def assert_dict_contains(expected_key, expected_value, actual_dict):
+    if actual_dict.get(expected_key) != expected_value:
+        raise NonRetryableError('Actual value from dict {} does not equal expected {}'.format(actual_dict.get(expected_key), expected_value))
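
These assertions raise NonRetryableError, which the verify() methods below rely on: RetryableError means the data may simply not have landed yet, while NonRetryableError aborts the test. A sketch of the polling loop implied by that contract (the real loop lives in test_verify.py and may differ):

import time
from test_suit.test_utils import RetryableError, NonRetryableError


def poll_until_verified(verify, rounds=30, delay_seconds=10):
    # Retry while the sink may still be flushing; give up on a definitive failure.
    for r in range(rounds):
        try:
            verify(r)
            return
        except RetryableError:
            time.sleep(delay_seconds)
    raise NonRetryableError("records never appeared after {} rounds".format(rounds))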
27 changes: 27 additions & 0 deletions test/test_suit/base_iceberg_test.py
@@ -0,0 +1,27 @@
+from test_suit.base_e2e import BaseE2eTest
+from test_suit.assertions import *
+
+
+class BaseIcebergTest(BaseE2eTest):
+
+    def verify_iceberg_content(self, content: dict):
+        assert_equals(1, content['id'])
+        assert_equals_with_precision(36.6, content['body_temperature'])
+        assert_equals('Steve', content['name'])
+
+        assert_equals('Espresso', content['approved_coffee_types'][0])
+        assert_equals('Doppio', content['approved_coffee_types'][1])
+        assert_equals('Ristretto', content['approved_coffee_types'][2])
+        assert_equals('Lungo', content['approved_coffee_types'][3])
+
+        assert_equals(True, content['animals_possessed']['dogs'])
+        assert_equals(False, content['animals_possessed']['cats'])
+
+    def verify_iceberg_metadata(self, metadata: dict):
+        assert_equals(0, metadata['offset'])
+        assert_equals(0, metadata['partition'])
+        assert_starts_with(self.fileName, metadata['topic'])  # subclasses set fileName, e.g. iceberg_json_aws
+        assert_not_null(metadata['SnowflakeConnectorPushTime'])
+
+        assert_dict_contains('header1', 'value1', metadata['headers'])
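For reference, a metadata payload that satisfies verify_iceberg_metadata would look roughly like this (values illustrative; the real column is written by the connector):

example_metadata = {
    'offset': 0,
    'partition': 0,
    'topic': 'iceberg_json_aws_<nameSalt>',        # salted topic name
    'SnowflakeConnectorPushTime': 1732000000000,   # assumed epoch millis
    'headers': {'header1': 'value1'},
}
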
46 changes: 46 additions & 0 deletions test/test_suit/iceberg_avro_aws.py
@@ -0,0 +1,46 @@
+from test_suit.test_utils import RetryableError, NonRetryableError
+import json
+from test_suit.base_iceberg_test import BaseIcebergTest
+
+
+class TestIcebergAvroAws(BaseIcebergTest):
+    def __init__(self, driver, nameSalt):
+        self.driver = driver
+        self.fileName = "iceberg_avro_aws"
+        self.topic = self.fileName + nameSalt
+
+    def getConfigFileName(self):
+        return self.fileName + ".json"
+
+    def setup(self):
+        self.driver.create_iceberg_table_with_content(
+            table_name=self.topic,
+            external_volume="kafka_push_e2e_volume_aws",  # volume created manually
+        )
+
+    def send(self):
+        with open(self.driver.TEST_DATA_FOLDER + "avro/steve.avro", "rb") as f:
+            avroBytes = f.read()
+        key = []
+        value = []
+
+        for e in range(100):
+            key.append(json.dumps({'number': str(e)}).encode('utf-8'))
+            value.append(avroBytes)
+        self.driver.sendBytesData(self.topic, value, key)
+
+    def verify(self, round):
+        number_of_records = self.driver.select_number_of_records(self.topic)
+        if number_of_records == 0:
+            raise RetryableError()
+        elif number_of_records != 100:
+            raise NonRetryableError("Number of records in table is different from number of records sent")
+
+        first_record = self.driver.snowflake_conn.cursor().execute(
+            "Select * from {} limit 1".format(self.topic)).fetchone()
+
+        self.verify_iceberg_content(json.loads(first_record[0]))
+        self.verify_iceberg_metadata(json.loads(first_record[1]))
+
+    def clean(self):
+        self.driver.drop_iceberg_table(self.topic)
34 changes: 25 additions & 9 deletions test/test_suit/iceberg_json_aws.py
@@ -1,12 +1,8 @@
-import datetime
-
 from test_suit.test_utils import RetryableError, NonRetryableError
 import json
-from time import sleep
-from test_suit.base_e2e import BaseE2eTest
-
+from test_suit.base_iceberg_test import BaseIcebergTest
 
-class TestIcebergJsonAws(BaseE2eTest):
+class TestIcebergJsonAws(BaseIcebergTest):
     def __init__(self, driver, nameSalt: str):
         self.driver = driver
         self.fileName = "iceberg_json_aws"
@@ -32,10 +28,30 @@ def send(self):
             }
         )
 
+        headers = [('header1', 'value1')]
+
+        key = []
+        value = []
+        for e in range(100):
+            key.append(json.dumps({'number': str(e)}).encode('utf-8'))
+            value.append(msg.encode('utf-8'))
+
+        self.driver.sendBytesData(self.topic, value, key, 0, headers)
+
     def verify(self, round):
-        res = self.driver.select_number_of_records(self.topic)
-        print("Count records in table {}={}".format(self.topic, str(res)))
+        number_of_records = self.driver.select_number_of_records(self.topic)
+        if number_of_records == 0:
+            raise RetryableError()
+        elif number_of_records != 100:
+            raise NonRetryableError("Number of records in table is different from number of records sent")
+
+        first_record = self.driver.snowflake_conn.cursor().execute(
+            "Select * from {} limit 1".format(self.topic)).fetchone()
+
+        self.verify_iceberg_content(json.loads(first_record[0]))
+        self.verify_iceberg_metadata(json.loads(first_record[1]))
 
     def clean(self):
         self.driver.drop_iceberg_table(self.topic)
-        return
50 changes: 30 additions & 20 deletions test/test_suites.py
@@ -21,7 +21,7 @@
 )
 from test_suit.test_avro_avro import TestAvroAvro
 from test_suit.test_avrosr_avrosr import TestAvrosrAvrosr
-from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
+# from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
 from test_suit.test_json_json import TestJsonJson
 from test_suit.test_multiple_topic_to_one_table_snowpipe import (
     TestMultipleTopicToOneTableSnowpipe,
@@ -40,6 +40,7 @@
     TestSchemaEvolutionAvroSRLogicalTypes,
 )
 from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
+from test_suit.iceberg_avro_aws import TestIcebergAvroAws
 from test_suit.iceberg_json_aws import TestIcebergJsonAws
 from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
 from test_suit.test_schema_evolution_json_ignore_tombstone import (
@@ -88,7 +89,7 @@
 from test_suit.test_snowpipe_streaming_string_json_ignore_tombstone import (
     TestSnowpipeStreamingStringJsonIgnoreTombstone,
 )
-from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
+# from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
 from test_suit.test_string_avro import TestStringAvro
 from test_suit.test_string_avrosr import TestStringAvrosr
 from test_suit.test_string_json import TestStringJson
@@ -236,15 +237,15 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
                 cloud_platform=CloudPlatform.ALL,
             ),
         ),
-        (
-            "TestNativeStringProtobuf",
-            EndToEndTestSuite(
-                test_instance=TestNativeStringProtobuf(driver, nameSalt),
-                run_in_confluent=True,
-                run_in_apache=True,
-                cloud_platform=CloudPlatform.ALL,
-            ),
-        ),
+        # (
+        #     "TestNativeStringProtobuf",
+        #     EndToEndTestSuite(
+        #         test_instance=TestNativeStringProtobuf(driver, nameSalt),
+        #         run_in_confluent=True,
+        #         run_in_apache=True,
+        #         cloud_platform=CloudPlatform.ALL,
+        #     ),
+        # ),
         (
             "TestNullableValuesAfterSmt",
             EndToEndTestSuite(
@@ -254,15 +255,15 @@
                 cloud_platform=CloudPlatform.ALL,
             ),
         ),
-        (
-            "TestConfluentProtobufProtobuf",
-            EndToEndTestSuite(
-                test_instance=TestConfluentProtobufProtobuf(driver, nameSalt),
-                run_in_confluent=False,
-                run_in_apache=False,
-                cloud_platform=CloudPlatform.ALL,
-            ),
-        ),
+        # (
+        #     "TestConfluentProtobufProtobuf",
+        #     EndToEndTestSuite(
+        #         test_instance=TestConfluentProtobufProtobuf(driver, nameSalt),
+        #         run_in_confluent=False,
+        #         run_in_apache=False,
+        #         cloud_platform=CloudPlatform.ALL,
+        #     ),
+        # ),
         (
             "TestSnowpipeStreamingNullableValuesAfterSmt",
             EndToEndTestSuite(
@@ -626,5 +627,14 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
                 cloud_platform=CloudPlatform.AWS,
             ),
         ),
+        (
+            "TestIcebergAvroAws",
+            EndToEndTestSuite(
+                test_instance=TestIcebergAvroAws(driver, nameSalt),
+                run_in_confluent=True,
+                run_in_apache=True,
+                cloud_platform=CloudPlatform.AWS,
+            ),
+        ),
     ]
 )
