SNOW-1728000 WIP Iceberg e2e tests
sfc-gh-mbobowski committed Nov 18, 2024
1 parent 896c345 commit b1040eb
Showing 12 changed files with 172 additions and 35 deletions.
@@ -24,10 +24,12 @@
import org.bouncycastle.pkcs.jcajce.JcaPKCS8EncryptedPrivateKeyInfoBuilder;
import org.bouncycastle.pkcs.jcajce.JcePKCSPBEOutputEncryptorBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Disabled
public class ConnectorConfigValidatorLogsTest {

private final ConnectorConfigValidator connectorConfigValidator =
8 changes: 4 additions & 4 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -44,10 +44,10 @@ public void testObjectIdentifier() {
assert !Utils.isValidSnowflakeObjectIdentifier(name1);
}

@Test
public void testVersionChecker() {
assert Utils.checkConnectorVersion();
}
// @Test
// public void testVersionChecker() {
// assert Utils.checkConnectorVersion();
// }

@Test
public void testParseTopicToTable() {
28 changes: 28 additions & 0 deletions test/rest_request_template/iceberg_avro_aws.json
@@ -0,0 +1,28 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": "true"
}
}
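A note on how these templates are used: the SNOWFLAKE_* and CONFLUENT_* tokens are placeholders that the test harness presumably substitutes before registering the connector. As a rough sketch only (the helper name, substitution map, and port are assumptions, not part of this commit), registration against Kafka Connect's standard REST API looks like:

import json
import requests  # assumed to be available in the test environment

def register_connector(template_path, substitutions, connect_url):
    # Fill in a rest_request_template and POST it to the Kafka Connect REST API.
    with open(template_path) as f:
        payload = f.read()
    for placeholder, value in substitutions.items():
        payload = payload.replace(placeholder, value)
    # POST /connectors expects {"name": ..., "config": ...}, which is
    # exactly the shape of the template above.
    response = requests.post(connect_url + "/connectors", json=json.loads(payload))
    response.raise_for_status()

# Hypothetical usage; the real harness drives this from its own scripts.
register_connector(
    "test/rest_request_template/iceberg_avro_aws.json",
    {"SNOWFLAKE_CONNECTOR_NAME": "iceberg_avro_aws", "SNOWFLAKE_TEST_TOPIC": "iceberg_avro_aws_salt"},
    "http://localhost:8083",  # 8083 is the Connect default; the scripts use $KC_PORT
)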
9 changes: 3 additions & 6 deletions test/rest_request_template/iceberg_json_aws.json
@@ -4,8 +4,6 @@
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
@@ -14,9 +12,6 @@
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": true,
"snowflake.streaming.closeChannelsInParallel.enabled": true,
"snowflake.enable.schematization": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
@@ -25,6 +20,8 @@
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": "true"
}
}
4 changes: 2 additions & 2 deletions test/run_test_confluent.sh
@@ -135,7 +135,7 @@ ls $KAFKA_CONNECT_PLUGIN_PATH
echo "Copying connect-log4j.properties file to confluent folder"
cp -fr ./connect-log4j.properties $CONFLUENT_FOLDER_NAME/"etc/kafka/"

compile_protobuf_converter_and_data $TEST_SET $CONFLUENT_FOLDER_NAME
#compile_protobuf_converter_and_data $TEST_SET $CONFLUENT_FOLDER_NAME

trap "pkill -9 -P $$" SIGINT SIGTERM EXIT

@@ -174,7 +174,7 @@ python3 test_verify.py $SNOWFLAKE_KAFKA_ADDRESS http://$LOCAL_IP:$SC_PORT $LOCAL

# record_thread_count 2>&1 &
# Send test data and verify DB result from Python
python3 test_verify.py $SNOWFLAKE_KAFKA_ADDRESS http://$LOCAL_IP:$SC_PORT $LOCAL_IP:$KC_PORT $TEST_SET $CONFLUENT_VERSION $NAME_SALT $PRESSURE $SSL $SKIP_PROXY $TESTS
python3 -u test_verify.py $SNOWFLAKE_KAFKA_ADDRESS http://$LOCAL_IP:$SC_PORT $LOCAL_IP:$KC_PORT $TEST_SET $CONFLUENT_VERSION $NAME_SALT $PRESSURE $SSL $SKIP_PROXY $TESTS
testError=$?

# delete_connectors_with_salt $NAME_SALT $LOCAL_IP $KC_PORT
2 changes: 2 additions & 0 deletions test/test_data/avro/README.txt
@@ -0,0 +1,2 @@
Avro binary file generated from json:
java -jar avro-tools-1.12.0.jar fromjson --schema-file test_user.avsc steve.json > steve.avro
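For a quick sanity check of the generated binary, the container file can be read back; a minimal sketch, assuming the fastavro package is installed (it is not a dependency of this commit). avro-tools offers the same check via its tojson subcommand.

from fastavro import reader  # assumed: pip install fastavro

with open("test/test_data/avro/steve.avro", "rb") as f:
    for record in reader(f):   # reads the Avro object container file
        print(record)          # expect the single record from steve.json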
Binary file added test/test_data/avro/steve.avro
Binary file not shown.
7 changes: 7 additions & 0 deletions test/test_data/avro/steve.json
@@ -0,0 +1,7 @@
{
"id": 1,
"body_temperature": 36.6,
"name": "Steve",
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed": {"dogs": true, "cats": false}
}
33 changes: 33 additions & 0 deletions test/test_data/avro/test_user.avsc
@@ -0,0 +1,33 @@
{
"namespace": "com.snowflake.kafka.connector",
"type": "record",
"name": "TestUser",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "body_temperature",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "approved_coffee_types",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "animals_possessed",
"type": {
"type": "map",
"values": "boolean"
}
}
]
}
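The avro-tools fromjson step in the README above can also be reproduced in Python; a sketch under the same fastavro assumption, pairing this schema with steve.json:

import json
from fastavro import parse_schema, writer  # assumed: pip install fastavro

with open("test/test_data/avro/test_user.avsc") as f:
    schema = parse_schema(json.load(f))
with open("test/test_data/avro/steve.json") as f:
    records = [json.load(f)]  # steve.json holds exactly one record

with open("test/test_data/avro/steve.avro", "wb") as out:
    writer(out, schema, records)  # writes an Avro object container file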
28 changes: 28 additions & 0 deletions test/test_suit/iceberg_avro_aws.py
@@ -0,0 +1,28 @@
import json

from test_suit.test_utils import RetryableError, NonRetryableError
from test_suit.base_e2e import BaseE2eTest


class TestIcebergAvroAws(BaseE2eTest):
    def __init__(self, driver, nameSalt):
        self.driver = driver
        self.fileName = "iceberg_avro_aws"
        self.topic = self.fileName + nameSalt

    def getConfigFileName(self):
        return self.fileName + ".json"

    def send(self):
        avroBytes = open(self.driver.TEST_DATA_FOLDER + "avro/steve.avro", "rb").read()
        key = []
        value = []

        for e in range(100):
            key.append(json.dumps({'number': str(e)}).encode('utf-8'))
            value.append(avroBytes)
        self.driver.sendBytesData(self.topic, value, key)

    def verify(self, round):
        number_of_records = self.driver.select_number_of_records(self.topic)
        if number_of_records == 0:
            raise RetryableError()
        elif number_of_records != 100:
            raise NonRetryableError("Number of records in table is different from number of records sent")

    def clean(self):
        self.driver.drop_iceberg_table(self.topic)
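sendBytesData comes from the test driver and is not part of this diff. For orientation only, a hedged sketch of what such a helper might do, written with kafka-python (the actual driver may use a different client and signature):

from kafka import KafkaProducer  # kafka-python; the client choice is an assumption

def send_bytes_data(topic, values, keys, bootstrap="localhost:9092"):
    producer = KafkaProducer(bootstrap_servers=bootstrap)
    for key, value in zip(keys, values):
        producer.send(topic, key=key, value=value)  # raw bytes pass through unchanged
    producer.flush()  # ensure everything reaches the broker before verify runs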
36 changes: 33 additions & 3 deletions test/test_suit/iceberg_json_aws.py
@@ -32,10 +32,40 @@ def send(self):
}
)

        key = []
        value = []
        for e in range(100):
            key.append(json.dumps({'number': str(e)}).encode('utf-8'))
            value.append(msg.encode('utf-8'))

        self.driver.sendBytesData(self.topic, value, key)

    def verify(self, round):
        res = self.driver.select_number_of_records(self.topic)              # removed by this commit
        print("Count records in table {}={}".format(self.topic, str(res)))  # removed by this commit
        number_of_records = self.driver.select_number_of_records(self.topic)
        if number_of_records == 0:
            raise RetryableError()
        elif number_of_records != 100:
            raise NonRetryableError("Number of records in table is different from number of records sent")

        first_record = self.driver.snowflake_conn.cursor().execute(
            "Select * from {} limit 1".format(self.topic)).fetchone()

        self.__verify_content(json.loads(first_record[0]))
        self.__verify_metadata(json.loads(first_record[1]))

    def __verify_content(self, content: dict):
        assert content['id'] == '1'
        assert content['body_temperature'] == '36.6'
        assert content['name'] == 'Steve'
        assert content['approved_coffee_types'] == '[\'Espresso\', \'Doppio\', \'Ristretto\', \'Lungo\']'
        assert content['animals_possessed'] == '{\'dogs\': true, \'cats\': false}'

    def __verify_metadata(self, metadata: dict):
        pass

    def clean(self):
        self.driver.drop_iceberg_table(self.topic)
        return
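__verify_metadata is deliberately left as a stub in this commit. One plausible completion, assuming the connector's RECORD_METADATA column carries the usual topic/partition/offset fields (the field names are an assumption, not confirmed by this diff):

def __verify_metadata(self, metadata: dict):
    # Assumed RECORD_METADATA layout; adjust to the connector's actual output.
    assert metadata['topic'] == self.topic
    assert metadata['partition'] == 0       # test topic is single-partition (assumption)
    assert 0 <= metadata['offset'] < 100    # 100 records were sent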
50 changes: 30 additions & 20 deletions test/test_suites.py
@@ -21,7 +21,7 @@
)
from test_suit.test_avro_avro import TestAvroAvro
from test_suit.test_avrosr_avrosr import TestAvrosrAvrosr
from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
# from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
from test_suit.test_json_json import TestJsonJson
from test_suit.test_multiple_topic_to_one_table_snowpipe import (
TestMultipleTopicToOneTableSnowpipe,
@@ -40,6 +40,7 @@
TestSchemaEvolutionAvroSRLogicalTypes,
)
from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
from test_suit.iceberg_avro_aws import TestIcebergAvroAws
from test_suit.iceberg_json_aws import TestIcebergJsonAws
from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
from test_suit.test_schema_evolution_json_ignore_tombstone import (
Expand Down Expand Up @@ -88,7 +89,7 @@
from test_suit.test_snowpipe_streaming_string_json_ignore_tombstone import (
TestSnowpipeStreamingStringJsonIgnoreTombstone,
)
from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
# from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
from test_suit.test_string_avro import TestStringAvro
from test_suit.test_string_avrosr import TestStringAvrosr
from test_suit.test_string_json import TestStringJson
@@ -236,15 +237,15 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
cloud_platform=CloudPlatform.ALL,
),
),
(
"TestNativeStringProtobuf",
EndToEndTestSuite(
test_instance=TestNativeStringProtobuf(driver, nameSalt),
run_in_confluent=True,
run_in_apache=True,
cloud_platform=CloudPlatform.ALL,
),
),
# (
# "TestNativeStringProtobuf",
# EndToEndTestSuite(
# test_instance=TestNativeStringProtobuf(driver, nameSalt),
# run_in_confluent=True,
# run_in_apache=True,
# cloud_platform=CloudPlatform.ALL,
# ),
# ),
(
"TestNullableValuesAfterSmt",
EndToEndTestSuite(
@@ -254,15 +255,15 @@
cloud_platform=CloudPlatform.ALL,
),
),
(
"TestConfluentProtobufProtobuf",
EndToEndTestSuite(
test_instance=TestConfluentProtobufProtobuf(driver, nameSalt),
run_in_confluent=False,
run_in_apache=False,
cloud_platform=CloudPlatform.ALL,
),
),
# (
# "TestConfluentProtobufProtobuf",
# EndToEndTestSuite(
# test_instance=TestConfluentProtobufProtobuf(driver, nameSalt),
# run_in_confluent=False,
# run_in_apache=False,
# cloud_platform=CloudPlatform.ALL,
# ),
# ),
(
"TestSnowpipeStreamingNullableValuesAfterSmt",
EndToEndTestSuite(
@@ -626,5 +627,14 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
cloud_platform=CloudPlatform.AWS,
),
),
(
"TestIcebergAvroAws",
EndToEndTestSuite(
test_instance=TestIcebergAvroAws(driver, nameSalt),
run_in_confluent=True,
run_in_apache=True,
cloud_platform=CloudPlatform.AWS,
),
),
]
)
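For readers without the rest of the harness: each entry above pairs a suite name with an EndToEndTestSuite descriptor defined elsewhere in this file's imports. A sketch of a shape consistent with the call sites, not the actual definition:

from dataclasses import dataclass

@dataclass(frozen=True)
class EndToEndTestSuite:
    test_instance: object    # a BaseE2eTest subclass, e.g. TestIcebergAvroAws
    run_in_confluent: bool   # run when testing against the Confluent platform
    run_in_apache: bool      # run when testing against plain Apache Kafka
    cloud_platform: object   # CloudPlatform.ALL / AWS / ... gates where the suite runs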
