Skip to content

Commit

Permalink
SNOW-1728000 Creating Iceberg table in e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Oct 24, 2024
1 parent 6803e49 commit 942063a
Show file tree
Hide file tree
Showing 37 changed files with 137 additions and 66 deletions.
30 changes: 30 additions & 0 deletions test/rest_request_template/iceberg_json_aws.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": true,
"snowflake.streaming.closeChannelsInParallel.enabled": true,
"snowflake.enable.schematization": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true
}
}
39 changes: 39 additions & 0 deletions test/test_suit/iceberg_json_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import datetime

from test_suit.test_utils import RetryableError, NonRetryableError
import json
from time import sleep


class TestIcebergJsonAws:
def __init__(self, driver, nameSalt: str):
self.driver = driver
self.fileName = "iceberg_json_aws"
self.topic = self.fileName + nameSalt

self.driver.create_iceberg_table_with_content(
table_name=self.topic,
external_volume="kafka_push_e2e_volume_aws", # volume created manually
)

def getConfigFileName(self):
return self.fileName + ".json"

def send(self):
msg = json.dumps(
{
"id": 1,
"body_temperature": 36.6,
"name": "Steve",
"approved_coffee_types": ["Espresso"],
"animals_possessed": {"dogs": True, "cats": False},
}
)

def verify(self, round):
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))

def clean(self):
self.driver.drop_iceberg_table(self.topic)
return
3 changes: 1 addition & 2 deletions test/test_suit/resilience_tests/test_kc_delete_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/resilience_tests/test_kc_delete_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def verify(self, round):
# since the pressure is applied during deletion, some of the data may be ingested, so look for a range
goalCountUpper = self.recordNum * self.expectedsends
goalCountLower = self.recordNum * (self.expectedsends - 1)
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count between: {} - {}".format(self.topic, str(res), str(goalCountLower), str(goalCountUpper)))

Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/resilience_tests/test_kc_pause_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/resilience_tests/test_kc_pause_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/resilience_tests/test_kc_recreate.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/resilience_tests/test_kc_recreate_chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/resilience_tests/test_kc_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def send(self):
def verify(self, round):
# verify record count
goalCount = self.recordNum * self.expectedsends
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

print("Count records in table {}={}. Goal record count: {}".format(self.topic, str(res), str(goalCount)))

Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_auto_table_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ def verify(self, round):
raise NonRetryableError("Missing column {}".format(key))


res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_avro_avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value, key)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_avrosr_avrosr.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def send(self):
self.topic, value, self.valueSchema, key, self.keySchema)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_confluent_protobuf_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ def send(self):
self.protobufProducer.flush()

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_json_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value, key)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_native_string_avrosr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def send(self):
self.driver.sendAvroSRData(self.topic, value, self.valueSchema)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_native_string_json_without_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_native_string_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_schema_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ def verify(self, round):
if not metadata_exist:
raise NonRetryableError("Metadata column was not created")

res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_schema_not_supported_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value, key)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res != 0:
raise NonRetryableError("Nothing should be ingested with not supported converters.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ def send(self):
sleep(2)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))
if res < (self.recordNum * self.partitionNum):
print("Topic:" + self.topic + " count is less, will retry")
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_snowpipe_streaming_schema_mapping_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ def verify(self, round):
raise NonRetryableError("Metadata column was not created")

# recordNum of records should be inserted
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != self.recordNum:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_snowpipe_streaming_string_avro_sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def send(self):
sleep(2)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))
if res < (self.recordNum * self.partitionNum):
print("Topic:" + self.topic + " count is less, will retry")
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_snowpipe_streaming_string_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ def send(self):
sleep(2)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))
if res < (self.recordNum * self.partitionNum):
print("Topic:" + self.topic + " count is less, will retry")
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_snowpipe_streaming_string_json_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def send(self):
sleep(2)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))
if res > 0:
print("Topic:" + self.topic + " count is more than expected, will not retry")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def send(self):
sleep(2)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))
goalCount = (self.recordNum - 2) * self.partitionNum
if res < goalCount:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_streaming_client_parameter_override.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ def send(self):
sleep(2)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
print("Count records in table {}={}".format(self.topic, str(res)))
if res < (self.recordNum * self.partitionNum):
print("Topic:" + self.topic + " count is less, will retry")
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_string_avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_string_avrosr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def send(self):
self.driver.sendAvroSRData(self.topic, value, self.valueSchema)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
if res == 0:
raise RetryableError()
elif res != 100:
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_string_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value, [], 0, header)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)

if res == 0:
raise RetryableError()
Expand Down
3 changes: 1 addition & 2 deletions test/test_suit/test_string_json_ignore_tombstone.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value, [], 0, header)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
res = self.driver.select_number_of_records(self.topic)
goalCount = self.recordCount - 1 # custom sf converters treat this as a normal record, so only None value is ignored
print("Got " + str(res) + " rows. Expected " + str(goalCount) + " rows")
if res == 0:
Expand Down
Loading

0 comments on commit 942063a

Please sign in to comment.