Skip to content

Commit

Permalink
SNOW-1728000 Creating Iceberg table in e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Oct 24, 2024
1 parent 204e311 commit 679fe47
Show file tree
Hide file tree
Showing 59 changed files with 239 additions and 120 deletions.
30 changes: 30 additions & 0 deletions test/rest_request_template/iceberg_json_aws.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": true,
"snowflake.streaming.closeChannelsInParallel.enabled": true,
"snowflake.enable.schematization": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true
}
}
1 change: 1 addition & 0 deletions test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class TestExecutor:
def execute(self, testSuitList, driver, nameSalt, round=1):
try:
for test in testSuitList:
test.setup()
driver.createConnector(test.getConfigFileName(), nameSalt)

driver.startConnectorWaitTime()
Expand Down
3 changes: 3 additions & 0 deletions test/test_suit/base_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class BaseE2eTest:
def setup(self):
pass
41 changes: 41 additions & 0 deletions test/test_suit/iceberg_json_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import datetime

from test_suit.test_utils import RetryableError, NonRetryableError
import json
from time import sleep
from test_suit.base_e2e import BaseE2eTest


class TestIcebergJsonAws(BaseE2eTest):
def __init__(self, driver, nameSalt: str):
self.driver = driver
self.fileName = "iceberg_json_aws"
self.topic = self.fileName + nameSalt

def getConfigFileName(self):
return self.fileName + ".json"

def setup(self):
self.driver.create_iceberg_table_with_content(
table_name=self.topic,
external_volume="kafka_push_e2e_volume_aws", # volume created manually
)

def send(self):
msg = json.dumps(
{
"id": 1,
"body_temperature": 36.6,
"name": "Steve",
"approved_coffee_types": ["Espresso"],
"animals_possessed": {"dogs": True, "cats": False},
}
)

def verify(self, round):
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))

def clean(self):
self.driver.drop_iceberg_table(self.topic)
return
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_delete_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# creates the connector
# sends data 2/2
# verifies that 2 rounds of data were ingested
class TestKcDeleteCreate:
class TestKcDeleteCreate(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# creates the connector
# sends data 3/3
# verifies that 3 rounds of data were ingested
class TestKcDeleteCreateChaos:
class TestKcDeleteCreateChaos(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -50,8 +50,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_delete_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# resumes the connector (will not work, because connector was deleted)
# sends data 2/2
# verifies that 1 round of data was ingested
class TestKcDeleteResume:
class TestKcDeleteResume(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -48,8 +48,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# resumes the connector (will not work, because connector was deleted)
# sends data 2/3
# verifies that 2-3 rounds of data was ingested, since some round 2 data may have been ingested prior to connector deletion
class TestKcDeleteResumeChaos:
class TestKcDeleteResumeChaos(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -53,8 +53,7 @@ def verify(self, round):
# since the pressure is applied during deletion, some of the data may be ingested, so look for a range
goalCountUpper = self.recordNum * self.expectedsends
goalCountLower = self.recordNum * (self.expectedsends - 1)
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count between: {} - {}".format(self.topic, str(res), str(goalCountLower), str(goalCountUpper)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_pause_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# creates the connector
# sends data 2/2
# verifies that 2 rounds of data were ingested
class TestKcPauseCreate:
class TestKcPauseCreate(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_pause_create_chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# creates the connector
# sends data 3/3
# verifies that 3 rounds of data were ingested
class TestKcPauseCreateChaos:
class TestKcPauseCreateChaos(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -50,8 +50,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_pause_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# resumes the connector
# sends data 2/2
# verifies that 2 rounds of data were ingested
class TestKcPauseResume:
class TestKcPauseResume(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_pause_resume_chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# resumes the connector
# sends data 3/3
# verifies that 3 rounds of data were ingested
class TestKcPauseResumeChaos:
class TestKcPauseResumeChaos(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -50,8 +50,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_recreate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# creates the connector 2/2
# sends data 2/2
# verifies that 2 rounds of data were ingested
class TestKcRecreate:
class TestKcRecreate(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_recreate_chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# creates the connector 2/2
# sends data 3/3
# verifies that 3 rounds of data were ingested
class TestKcRecreateChaos:
class TestKcRecreateChaos(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -48,8 +48,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
5 changes: 2 additions & 3 deletions test/test_suit/resilience_tests/test_kc_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# a pressure test cannot be created with this, since restart is just one method
# restarting the connector and restarting the connector and all its tasks should technically be separate tests, but should be fine to group here
class TestKcRestart:
class TestKcRestart(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.nameSalt = nameSalt
Expand Down Expand Up @@ -53,8 +53,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
6 changes: 3 additions & 3 deletions test/test_suit/test_auto_table_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from confluent_kafka import avro
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry import Schema
from test_suit.base_e2e import BaseE2eTest

# SR -> Schema Registry
# Runs only in confluent test suite environment
class TestAutoTableCreation:
class TestAutoTableCreation(BaseE2eTest):
def __init__(self, driver, nameSalt, schemaRegistryAddress, testSet):
self.driver = driver
self.fileName = "travis_correct_auto_table_creation"
Expand Down Expand Up @@ -118,8 +119,7 @@ def verify(self, round):
raise NonRetryableError("Missing column {}".format(key))


res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 2 additions & 1 deletion test/test_suit/test_auto_table_creation_topic2table.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from confluent_kafka import avro
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry import Schema
from test_suit.base_e2e import BaseE2eTest

# SR -> Schema Registry
# Runs only in confluent test suite environment
class TestAutoTableCreationTopic2Table:
class TestAutoTableCreationTopic2Table(BaseE2eTest):
def __init__(self, driver, nameSalt, schemaRegistryAddress, testSet):
self.driver = driver
self.fileName = "travis_correct_auto_table_creation_topic2table"
Expand Down
6 changes: 3 additions & 3 deletions test/test_suit/test_avro_avro.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from test_suit.test_utils import RetryableError, NonRetryableError
from test_suit.base_e2e import BaseE2eTest


class TestAvroAvro:
class TestAvroAvro(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.fileName = "travis_correct_avro_avro"
Expand All @@ -21,8 +22,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value, key)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
6 changes: 3 additions & 3 deletions test/test_suit/test_avrosr_avrosr.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from test_suit.test_utils import RetryableError, NonRetryableError
from confluent_kafka import avro
from test_suit.base_e2e import BaseE2eTest


class TestAvrosrAvrosr:
class TestAvrosrAvrosr(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.fileName = "travis_correct_avrosr_avrosr"
Expand Down Expand Up @@ -48,8 +49,7 @@ def send(self):
self.topic, value, self.valueSchema, key, self.keySchema)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
6 changes: 3 additions & 3 deletions test/test_suit/test_confluent_protobuf_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka import SerializingProducer
from test_suit.base_e2e import BaseE2eTest

import time

class TestConfluentProtobufProtobuf:
class TestConfluentProtobufProtobuf(BaseE2eTest):
def __init__(self, driver, nameSalt):
self.driver = driver
self.fileName = "travis_correct_confluent_protobuf_protobuf"
Expand Down Expand Up @@ -50,8 +51,7 @@ def send(self):
self.protobufProducer.flush()

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
Loading

0 comments on commit 679fe47

Please sign in to comment.