SNOW-1728000 Iceberg e2e tests (#1002)
sfc-gh-mbobowski authored Nov 20, 2024
1 parent 896c345 commit d969e97
Showing 9 changed files with 245 additions and 29 deletions.
28 changes: 28 additions & 0 deletions test/rest_request_template/iceberg_avro_aws.json
@@ -0,0 +1,28 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": "true"
}
}
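
The SNOWFLAKE_* and CONFLUENT_SCHEMA_REGISTRY values above are placeholders that the test harness fills in before submitting the config to Kafka Connect. A minimal sketch of that substitution, assuming plain string replacement; the mapping values and the Connect URL below are hypothetical:

import json
import requests

# A minimal sketch, assuming placeholders are substituted by plain string
# replacement; the mapping values and the Connect URL are hypothetical.
def render_template(path, placeholders):
    with open(path) as f:
        raw = f.read()
    for key, value in placeholders.items():
        raw = raw.replace(key, value)
    return json.loads(raw)

config = render_template(
    "test/rest_request_template/iceberg_avro_aws.json",
    {
        "SNOWFLAKE_CONNECTOR_NAME": "iceberg_avro_aws_connector",
        "SNOWFLAKE_TEST_TOPIC": "iceberg_avro_aws_salt123",
        "CONFLUENT_SCHEMA_REGISTRY": "http://localhost:8081",
        # ...remaining SNOWFLAKE_* values come from the test environment
    },
)
requests.post("http://localhost:8083/connectors", json=config)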
9 changes: 3 additions & 6 deletions test/rest_request_template/iceberg_json_aws.json
@@ -4,8 +4,6 @@
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
@@ -14,9 +12,6 @@
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": true,
"snowflake.streaming.closeChannelsInParallel.enabled": true,
"snowflake.enable.schematization": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
@@ -25,6 +20,8 @@
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": "true"
}
}
38 changes: 38 additions & 0 deletions test/test_suit/assertions.py
@@ -0,0 +1,38 @@
from test_suit.test_utils import NonRetryableError


def assert_equals(expected, actual):
if expected != actual:
raise NonRetryableError(
"Actual {} does not equal expected {}".format(actual, expected)
)


def assert_equals_with_precision(expected, actual, precision=0.1):
if not expected - precision < actual < expected + precision:
raise NonRetryableError(
"Actual {} does not equal expected {} with precision {}".format(
actual, expected, precision
)
)


def assert_starts_with(expected_prefix, actual):
if not actual.startswith(expected_prefix):
raise NonRetryableError(
"Actual {} does not start with {}".format(expected_prefix, actual)
)


def assert_not_null(actual):
if actual is None:
raise NonRetryableError("Actual {} is null".format(actual))


def assert_dict_contains(expected_key, expected_value, actual_dict):
if actual_dict[expected_key] != expected_value:
raise NonRetryableError(
"Actual value from dict {} does not equal expected {}".format(
actual_dict[expected_key], expected_value
)
)
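
Each helper raises NonRetryableError, so a failed assertion aborts the test round immediately instead of being retried. Illustrative usage (the record dict here is made up):

from test_suit.assertions import assert_equals, assert_dict_contains
from test_suit.test_utils import NonRetryableError

record = {"id": 1, "headers": {"header1": "value1"}}
assert_equals(1, record["id"])                                # passes silently
assert_dict_contains("header1", "value1", record["headers"])  # passes silently
try:
    assert_equals(2, record["id"])
except NonRetryableError as error:
    print(error)  # Actual 1 does not equal expected 2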
38 changes: 38 additions & 0 deletions test/test_suit/base_iceberg_test.py
@@ -0,0 +1,38 @@
from test_suit.base_e2e import BaseE2eTest
from test_suit.assertions import *

class BaseIcebergTest(BaseE2eTest):

def __init__(self, driver, nameSalt):
self.driver = driver
self.test_message = {
"id": 1,
"body_temperature": 36.6,
"name": "Steve",
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed": {"dogs": True, "cats": False},
}
self.test_headers = [("header1", "value1")]

def verify_iceberg_content(self, content: dict):
assert_equals(1, content['id'])
assert_equals_with_precision(36.6, content['body_temperature'])
assert_equals('Steve', content['name'])

assert_equals('Espresso', content['approved_coffee_types'][0])
assert_equals('Doppio', content['approved_coffee_types'][1])
assert_equals('Ristretto', content['approved_coffee_types'][2])
assert_equals('Lungo', content['approved_coffee_types'][3])

assert_equals(True, content['animals_possessed']['dogs'])
assert_equals(False, content['animals_possessed']['cats'])


def verify_iceberg_metadata(self, metadata: dict):
assert_equals(0, metadata['offset'])
assert_equals(0, metadata['partition'])
assert_starts_with('iceberg_', metadata['topic'])
assert_not_null(metadata['SnowflakeConnectorPushTime'])

assert_dict_contains('header1', 'value1', metadata['headers'])
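
For reference, a metadata payload that satisfies verify_iceberg_metadata might look like the following; the field values are illustrative, and SnowflakeConnectorPushTime is assumed to be an epoch-milliseconds timestamp set by the connector:

import json

metadata = json.loads("""
{
    "offset": 0,
    "partition": 0,
    "topic": "iceberg_json_aws_salt123",
    "SnowflakeConnectorPushTime": 1732060800000,
    "headers": {"header1": "value1"}
}
""")
# Every assertion in verify_iceberg_metadata(metadata) passes on this payload.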

90 changes: 90 additions & 0 deletions test/test_suit/iceberg_avro_aws.py
@@ -0,0 +1,90 @@
from test_suit.test_utils import RetryableError, NonRetryableError
import json
from confluent_kafka import avro
from test_suit.base_iceberg_test import BaseIcebergTest


class TestIcebergAvroAws(BaseIcebergTest):
def __init__(self, driver, nameSalt: str):
BaseIcebergTest.__init__(self, driver, nameSalt)
self.fileName = "iceberg_avro_aws"
self.topic = self.fileName + nameSalt

valueSchemaStr = """
{
"type":"record",
"name":"value_schema",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "body_temperature",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "approved_coffee_types",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "animals_possessed",
"type": {
"type": "map",
"values": "boolean"
}
}
]
}
"""
self.valueSchema = avro.loads(valueSchemaStr)

def getConfigFileName(self):
return self.fileName + ".json"

def setup(self):
self.driver.create_iceberg_table_with_content(
table_name=self.topic,
external_volume="kafka_push_e2e_volume_aws", # volume created manually
)

def send(self):
value = []

for e in range(100):
value.append(self.test_message)

self.driver.sendAvroSRData(
topic=self.topic,
value=value,
value_schema=self.valueSchema,
headers=self.test_headers,
)

def verify(self, round):
number_of_records = self.driver.select_number_of_records(self.topic)
if number_of_records == 0:
raise RetryableError()
elif number_of_records != 100:
raise NonRetryableError(
"Number of record in table is different from number of record sent"
)

first_record = (
self.driver.snowflake_conn.cursor()
.execute("Select * from {} limit 1".format(self.topic))
.fetchone()
)

self.verify_iceberg_content(json.loads(first_record[0]))
self.verify_iceberg_metadata(json.loads(first_record[1]))

def clean(self):
self.driver.drop_iceberg_table(self.topic)
50 changes: 33 additions & 17 deletions test/test_suit/iceberg_json_aws.py
@@ -1,14 +1,11 @@
-import datetime
-
from test_suit.test_utils import RetryableError, NonRetryableError
import json
-from time import sleep
-from test_suit.base_e2e import BaseE2eTest
+from test_suit.base_iceberg_test import BaseIcebergTest


-class TestIcebergJsonAws(BaseE2eTest):
+class TestIcebergJsonAws(BaseIcebergTest):
def __init__(self, driver, nameSalt: str):
-self.driver = driver
+BaseIcebergTest.__init__(self, driver, nameSalt)
self.fileName = "iceberg_json_aws"
self.topic = self.fileName + nameSalt

@@ -22,20 +19,39 @@ def setup(self):
)

def send(self):
-msg = json.dumps(
-    {
-        "id": 1,
-        "body_temperature": 36.6,
-        "name": "Steve",
-        "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
-        "animals_possessed": {"dogs": True, "cats": False},
-    }
-)
+msg = json.dumps(self.test_message)

+key = []
+value = []
+for e in range(100):
+    key.append(json.dumps({"number": str(e)}).encode("utf-8"))
+    value.append(msg.encode("utf-8"))

+self.driver.sendBytesData(
+    topic=self.topic,
+    value=value,
+    key=key,
+    partition=0,
+    headers=self.test_headers,
+)

def verify(self, round):
-res = self.driver.select_number_of_records(self.topic)
-print("Count records in table {}={}".format(self.topic, str(res)))
+number_of_records = self.driver.select_number_of_records(self.topic)
+if number_of_records == 0:
+    raise RetryableError()
+elif number_of_records != 100:
+    raise NonRetryableError(
+        "Number of records in table differs from number of records sent"
+    )

+first_record = (
+    self.driver.snowflake_conn.cursor()
+    .execute("Select * from {} limit 1".format(self.topic))
+    .fetchone()
+)

+self.verify_iceberg_content(json.loads(first_record[0]))
+self.verify_iceberg_metadata(json.loads(first_record[1]))

def clean(self):
self.driver.drop_iceberg_table(self.topic)
-return
1 change: 0 additions & 1 deletion test/test_suit/test_utils.py
@@ -1,7 +1,6 @@
import re
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
-from test_suit.base_e2e import BaseE2eTest


class Error(Exception):
14 changes: 12 additions & 2 deletions test/test_suites.py
@@ -40,6 +40,7 @@
TestSchemaEvolutionAvroSRLogicalTypes,
)
from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
+from test_suit.iceberg_avro_aws import TestIcebergAvroAws
from test_suit.iceberg_json_aws import TestIcebergJsonAws
from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
from test_suit.test_schema_evolution_json_ignore_tombstone import (
@@ -621,8 +622,17 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
"TestIcebergJsonAws",
EndToEndTestSuite(
test_instance=TestIcebergJsonAws(driver, nameSalt),
-run_in_confluent=True,
-run_in_apache=True,
+run_in_confluent=False,  # TODO set to true after ingest-sdk 3.0.1 release
+run_in_apache=False,  # TODO set to true after ingest-sdk 3.0.1 release
cloud_platform=CloudPlatform.AWS,
),
),
+(
+    "TestIcebergAvroAws",
+    EndToEndTestSuite(
+        test_instance=TestIcebergAvroAws(driver, nameSalt),
+        run_in_confluent=False,  # TODO set to true after ingest-sdk 3.0.1 release
+        run_in_apache=False,
+        cloud_platform=CloudPlatform.AWS,
+    ),
+),
6 changes: 3 additions & 3 deletions test/test_verify.py
@@ -205,17 +205,17 @@ def sendBytesData(self, topic, value, key=[], partition=0, headers=[]):
self.producer.flush()
self.producer.flush()

def sendAvroSRData(self, topic, value, value_schema, key=[], key_schema="", partition=0):
def sendAvroSRData(self, topic, value, value_schema, key=[], key_schema="", partition=0, headers=[]):
if len(key) == 0:
for i, v in enumerate(value):
self.avroProducer.produce(
-topic=topic, value=v, value_schema=value_schema, partition=partition)
+topic=topic, value=v, value_schema=value_schema, partition=partition, headers=headers)
if (i + 1) % self.MAX_FLUSH_BUFFER_SIZE == 0:
self.producer.flush()
else:
for i, (k, v) in enumerate(zip(key, value)):
self.avroProducer.produce(
-topic=topic, value=v, value_schema=value_schema, key=k, key_schema=key_schema, partition=partition)
+topic=topic, value=v, value_schema=value_schema, key=k, key_schema=key_schema, partition=partition, headers=headers)
if (i + 1) % self.MAX_FLUSH_BUFFER_SIZE == 0:
self.producer.flush()
self.avroProducer.flush()
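
With the new headers parameter, record headers now flow through the Avro producer as well, which is what the Iceberg tests assert on. An illustrative call, assuming an initialized driver plus a record and value_schema taken from the test setup:

driver.sendAvroSRData(
    topic="iceberg_avro_aws_salt123",
    value=[record] * 100,
    value_schema=value_schema,
    headers=[("header1", "value1")],
)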