Add end-to-end test which disables the migration code path
sfc-gh-japatel committed Nov 21, 2023
1 parent 21697ad commit 0b2b4a6
Showing 3 changed files with 123 additions and 0 deletions.
28 changes: 28 additions & 0 deletions travis_correct_snowpipe_streaming_string_json_channel_migration_disabled.json
@@ -0,0 +1,28 @@
{
  "name": "SNOWFLAKE_CONNECTOR_NAME",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "topics": "SNOWFLAKE_TEST_TOPIC",
    "tasks.max": "1",
    "buffer.flush.time": "60",
    "buffer.count.records": "300",
    "buffer.size.bytes": "5000000",
    "snowflake.url.name": "SNOWFLAKE_HOST",
    "snowflake.user.name": "SNOWFLAKE_USER",
    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
    "snowflake.database.name": "SNOWFLAKE_DATABASE",
    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
    "snowflake.role.name": "SNOWFLAKE_ROLE",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "enable.streaming.channel.offset.migration": "false",
    "jmx": "true",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "snowflake.enable.schematization": true
  }
}
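The ALL_CAPS values above are placeholders that the test harness substitutes before creating the connector; the line relevant to this commit is "enable.streaming.channel.offset.migration": "false", which turns off the channel offset migration code path. A minimal sketch of how a rendered template can be submitted, assuming a worker at localhost:8083 and an illustrative substitution map (POST /connectors is the standard Kafka Connect REST endpoint, but this is not the harness's actual submission code):

import json
import requests

# Illustrative substitutions -- the real harness fills every placeholder
# (SNOWFLAKE_HOST, SNOWFLAKE_USER, etc.) from its environment the same way.
substitutions = {
    "SNOWFLAKE_CONNECTOR_NAME": "streaming_migration_disabled_test",
    "SNOWFLAKE_TEST_TOPIC": "travis_correct_snowpipe_streaming_string_json_channel_migration_disabled_salt",
}

with open("travis_correct_snowpipe_streaming_string_json_channel_migration_disabled.json") as f:
    template = f.read()
for placeholder, value in substitutions.items():
    template = template.replace(placeholder, value)

# POST the rendered {"name": ..., "config": ...} payload to the Connect worker.
response = requests.post("http://localhost:8083/connectors", json=json.loads(template))
response.raise_for_status()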
91 changes: 91 additions & 0 deletions test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py
@@ -0,0 +1,91 @@
from test_suit.test_utils import RetryableError, NonRetryableError
import json
from time import sleep


class TestSnowpipeStreamingStringJsonChannelMigrationDisabled:
    def __init__(self, driver, nameSalt):
        self.driver = driver
        self.fileName = "travis_correct_snowpipe_streaming_string_json_channel_migration_disabled"
        self.topic = self.fileName + nameSalt

        self.topicNum = 1
        self.partitionNum = 3
        self.recordNum = 1000

        # Create the topic and partitions in the constructor, since the POST REST API
        # would otherwise auto-create the topic with only one partition.
        self.driver.createTopics(self.topic, partitionNum=self.partitionNum, replicationNum=1)

    def getConfigFileName(self):
        return self.fileName + ".json"

    def send(self):
        # The topic was created with n partitions and a replication factor of one.
        print("Partition count:" + str(self.partitionNum))
        print("Topic:", self.topic)

        self.driver.describeTopic(self.topic)

        for p in range(self.partitionNum):
            print("Sending in Partition:" + str(p))
            key = []
            value = []

            # Send two fewer regular records, because two tombstone records are
            # appended below; tombstone ingestion is enabled by default.
            for e in range(self.recordNum - 2):
                value.append(json.dumps(
                    {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)}
                ).encode('utf-8'))

            # Append tombstones, except on Kafka 2.5.1, due to this bug:
            # https://issues.apache.org/jira/browse/KAFKA-10477
            if self.driver.testVersion == '2.5.1':
                value.append(json.dumps(
                    {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum - 1)}
                ).encode('utf-8'))
                value.append(json.dumps(
                    {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum)}
                ).encode('utf-8'))
            else:
                value.append(None)
                value.append("")  # community converters treat this as a tombstone

            self.driver.sendBytesData(self.topic, value, key, partition=p)
            sleep(2)

    def verify(self, round):
        res = self.driver.snowflake_conn.cursor().execute(
            "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
        print("Count records in table {}={}".format(self.topic, str(res)))
        if res < (self.recordNum * self.partitionNum):
            print("Topic:" + self.topic + " count is less, will retry")
            raise RetryableError()
        elif res > (self.recordNum * self.partitionNum):
            print("Topic:" + self.topic + " count is more, duplicates detected")
            raise NonRetryableError("Duplication occurred; the number of records in the table is larger than the number of records sent")
        else:
            print("Table:" + self.topic + " count is exactly " + str(self.recordNum * self.partitionNum))

        # Check for duplicates: any (offset, partition) pair appearing more than once.
        res = self.driver.snowflake_conn.cursor().execute(
            "SELECT record_metadata:\"offset\"::string AS OFFSET_NO,"
            " record_metadata:\"partition\"::string AS PARTITION_NO"
            " FROM {} GROUP BY OFFSET_NO, PARTITION_NO HAVING count(*) > 1".format(self.topic)
        ).fetchone()
        print("Duplicates:{}".format(res))
        if res is not None:
            raise NonRetryableError("Duplication detected")

        # Check that offset numbers are unique within each partition.
        rows = self.driver.snowflake_conn.cursor().execute(
            "SELECT count(distinct record_metadata:\"offset\"::number) AS UNIQUE_OFFSETS,"
            " record_metadata:\"partition\"::number AS PARTITION_NO"
            " FROM {} GROUP BY PARTITION_NO ORDER BY PARTITION_NO".format(self.topic)
        ).fetchall()

        if rows is None:
            raise NonRetryableError("Unique offsets for partitions not found")
        else:
            assert len(rows) == self.partitionNum

            for p in range(self.partitionNum):
                # Each row is a (unique offset count, partition number) tuple.
                if rows[p][0] != self.recordNum or rows[p][1] != p:
                    raise NonRetryableError("Unique offsets for partitions count doesn't match")

    def clean(self):
        # Dropping the stage and pipe doesn't apply to Snowpipe Streaming
        # (the helper executes DROP ... IF EXISTS, so this is a no-op for them).
        self.driver.cleanTableStagePipe(self.topic)
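The class above follows the suite's implicit contract: send() produces the records, and verify() either passes, raises RetryableError while rows are still landing, or raises NonRetryableError on a hard failure such as duplication. A minimal sketch of the retry loop this contract implies (the function name and retry budget are illustrative assumptions, not the harness's actual runner):

from time import sleep
from test_suit.test_utils import RetryableError

def run_with_retries(test, rounds=1, max_retries=30, wait_seconds=10):
    # Illustrative runner: keep re-verifying while ingestion catches up.
    test.send()
    for r in range(rounds):
        for _ in range(max_retries):
            try:
                test.verify(r)
                break  # verification passed for this round
            except RetryableError:
                sleep(wait_seconds)  # not all rows have landed yet
        else:
            raise AssertionError("verification did not pass within the retry budget")
    test.clean()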
4 changes: 4 additions & 0 deletions test/test_suites.py
@@ -47,6 +47,7 @@
from test_suit.test_string_avrosr import TestStringAvrosr
from test_suit.test_string_json import TestStringJson
from test_suit.test_string_json_ignore_tombstone import TestStringJsonIgnoreTombstone
from test_suit.test_snowpipe_streaming_channel_migration_disabled import TestSnowpipeStreamingStringJsonChannelMigrationDisabled


class EndToEndTestSuite:
@@ -134,6 +135,9 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
            test_instance=TestSnowpipeStreamingStringJson(driver, nameSalt), clean=True, run_in_confluent=True,
            run_in_apache=True
        )),
        ("TestSnowpipeStreamingStringJsonChannelMigrationDisabled", EndToEndTestSuite(
            test_instance=TestSnowpipeStreamingStringJsonChannelMigrationDisabled(driver, nameSalt), clean=True,
            run_in_confluent=True, run_in_apache=True
        )),
        ("TestSnowpipeStreamingStringJsonIgnoreTombstone", EndToEndTestSuite(
            test_instance=TestSnowpipeStreamingStringJsonIgnoreTombstone(driver, nameSalt), clean=True,
            run_in_confluent=True,