Commit

add random row count and small pr comments

sfc-gh-rcheng committed Oct 25, 2023
1 parent 5188952 commit 669e182
Showing 4 changed files with 135 additions and 7 deletions.
@@ -126,7 +126,7 @@ public class TopicPartitionChannel {
    * <p>This boolean is used to indicate that we reset offset in kafka and we will only buffer once
    * we see the offset which is one more than an offset present in Snowflake.
    */
-  private boolean isOffsetResetInKafka = false; // TODO @rcheng question: atomic?
+  private boolean isOffsetResetInKafka = false;
 
   private final SnowflakeStreamingIngestClient streamingIngestClient;
 
@@ -399,12 +399,7 @@ private boolean shouldIgnoreAddingRecordToBuffer(
       SinkRecord kafkaSinkRecord, final long currentProcessedOffset) {
     // Don't skip rows if there is no offset reset and there is no offset token information in the
     // channel
-    if (!isOffsetResetInKafka
-        && currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
-      LOGGER.debug(
-          "No offset registered in Snowflake and offset is not being reset, we can add this offset"
-              + " to buffer for channel:{}",
-          currentProcessedOffset);
+    if (!isOffsetResetInKafka) {
       return false;
     }
 
@@ -0,0 +1,28 @@
{
  "name": "SNOWFLAKE_CONNECTOR_NAME",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1",
    "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME",
    "tasks.max": "1",
    "buffer.flush.time": "60",
    "buffer.count.records": "300",
    "buffer.size.bytes": "5000000",
    "snowflake.url.name": "SNOWFLAKE_HOST",
    "snowflake.user.name": "SNOWFLAKE_USER",
    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
    "snowflake.database.name": "SNOWFLAKE_DATABASE",
    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
    "snowflake.role.name": "SNOWFLAKE_ROLE",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "jmx": "true",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "snowflake.enable.schematization": true
  }
}
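
This config is a template: the SNOWFLAKE_* and DLQ_TOPIC values are placeholder tokens that the end-to-end test harness presumably substitutes before submitting the connector config. A minimal sketch of that substitution, assuming simple token replacement; the helper name and example values below are hypothetical and not part of this change:

import json

# Hypothetical token values; the real harness derives these from its test
# profile and the generated connector/table name.
SUBSTITUTIONS = {
    "SNOWFLAKE_CONNECTOR_NAME": "test_schema_evolution_w_random_row_count_salt",
    "SNOWFLAKE_TEST_TOPIC0": "test_schema_evolution_w_random_row_count_salt0",
    "SNOWFLAKE_TEST_TOPIC1": "test_schema_evolution_w_random_row_count_salt1",
    "SNOWFLAKE_HOST": "account.snowflakecomputing.com",
    "DLQ_TOPIC": "test_dlq_topic",
    # ...remaining SNOWFLAKE_* tokens would come from the test credentials
}

def render_connector_config(template_path):
    # Replace each placeholder token in the template and parse the result as JSON.
    with open(template_path) as f:
        raw = f.read()
    for token, value in SUBSTITUTIONS.items():
        raw = raw.replace(token, value)
    return json.loads(raw)
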
99 changes: 99 additions & 0 deletions test/test_suit/test_schema_evolution_w_random_row_count.py
@@ -0,0 +1,99 @@
import json
import random

from test_suit.test_utils import NonRetryableError
from time import sleep


# tests that ingestion works when the schematization ALTER TABLE invalidation
# happens halfway through a batch
class TestSchemaEvolutionWithRandomRowCount:
    def __init__(self, driver, nameSalt):
        self.driver = driver
        self.fileName = "test_schema_evolution_w_random_row_count"
        self.topics = []
        self.table = self.fileName + nameSalt

        # record counts: the first batch is a random size below the connector's
        # buffer.count.records threshold (300 in this test's config), so the second
        # 300-record batch is the one that pushes the buffer over the threshold and flushes
        self.initialRecordCount = random.randrange(1, 300)
        self.flushRecordCount = 300
        self.recordNum = self.initialRecordCount + self.flushRecordCount

        for i in range(2):
            self.topics.append(self.table + str(i))

        self.records = []

        self.records.append({
            'PERFORMANCE_STRING': 'Excellent',
            'PERFORMANCE_CHAR': 'A',
            'RATING_INT': 100
        })

        self.records.append({
            'PERFORMANCE_STRING': 'Excellent',
            'RATING_DOUBLE': 0.99,
            'APPROVAL': True
        })

        self.gold_type = {
            'PERFORMANCE_STRING': 'VARCHAR',
            'PERFORMANCE_CHAR': 'VARCHAR',
            'RATING_INT': 'NUMBER',
            'RATING_DOUBLE': 'FLOAT',
            'APPROVAL': 'BOOLEAN',
            'RECORD_METADATA': 'VARIANT'
        }

        self.gold_columns = [columnName for columnName in self.gold_type]

    def getConfigFileName(self):
        return self.fileName + ".json"

    def send(self):
        print("Got random record count of {}".format(self.initialRecordCount))

        for i, topic in enumerate(self.topics):
            # send initial batch
            key = []
            value = []
            for e in range(self.initialRecordCount):
                key.append(json.dumps({'number': str(e)}).encode('utf-8'))
                value.append(json.dumps(self.records[i]).encode('utf-8'))
            self.driver.sendBytesData(topic, value, key)

            # send second batch that should flush
            key = []
            value = []
            for e in range(self.flushRecordCount):
                key.append(json.dumps({'number': str(e)}).encode('utf-8'))
                value.append(json.dumps(self.records[i]).encode('utf-8'))
            self.driver.sendBytesData(topic, value, key)

    def verify(self, round):
        sleep(60)
        rows = self.driver.snowflake_conn.cursor().execute(
            "desc table {}".format(self.table)).fetchall()
        res_col = {}

        for index, row in enumerate(rows):
            self.gold_columns.remove(row[0])
            if not row[1].startswith(self.gold_type[row[0]]):
                raise NonRetryableError(
                    "Column {} has the wrong type. got: {}, expected: {}".format(
                        row[0], row[1], self.gold_type[row[0]]))
            res_col[row[0]] = index

        print("Columns not in table: ", self.gold_columns)

        # any column still left in gold_columns was never created by schema evolution
        for columnName in self.gold_columns:
            raise NonRetryableError("Column {} was not created".format(columnName))

        res = self.driver.snowflake_conn.cursor().execute(
            "SELECT count(*) FROM {}".format(self.table)).fetchone()[0]
        if res != len(self.topics) * self.recordNum:
            print("Number of records expected: {}, got: {}".format(len(self.topics) * self.recordNum, res))
            raise NonRetryableError("Number of records in the table differs from the number of records sent")

    def clean(self):
        self.driver.cleanTableStagePipe(self.table)
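
For orientation, a minimal sketch of how this test class is driven end to end, assuming a harness-provided driver object that exposes sendBytesData() and snowflake_conn; the wrapper function below is illustrative and not part of this change:

from test_suit.test_schema_evolution_w_random_row_count import TestSchemaEvolutionWithRandomRowCount

# 'driver' stands in for the end-to-end framework's test driver; it is created
# by the harness, not here.
def run_once(driver, name_salt="_localrun"):
    test = TestSchemaEvolutionWithRandomRowCount(driver, name_salt)
    test.send()      # random-sized batch, then a 300-record batch that forces a flush
    test.verify(0)   # check evolved columns, their types, and the total row count
    test.clean()     # drop the per-test table / stage / pipe
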
6 changes: 6 additions & 0 deletions test/test_suites.py
@@ -32,6 +32,8 @@
     TestSchemaEvolutionWithAutoTableCreationAvroSR
 from test_suit.test_schema_evolution_w_auto_table_creation_json import \
     TestSchemaEvolutionWithAutoTableCreationJson
+from test_suit.test_schema_evolution_w_random_row_count import \
+    TestSchemaEvolutionWithRandomRowCount
 from test_suit.test_schema_mapping import TestSchemaMapping
 from test_suit.test_schema_not_supported_converter import TestSchemaNotSupportedConverter
 from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ
@@ -185,6 +187,10 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
         test_instance=TestSchemaEvolutionWithAutoTableCreationAvroSR(driver, nameSalt), clean=True,
         run_in_confluent=True, run_in_apache=False
     )),
+    ("TestSchemaEvolutionWithRandomRowCount", EndToEndTestSuite(
+        test_instance=TestSchemaEvolutionWithRandomRowCount(driver, nameSalt), clean=True,
+        run_in_confluent=True, run_in_apache=True
+    )),
     ("TestSchemaEvolutionNonNullableJson", EndToEndTestSuite(
         test_instance=TestSchemaEvolutionNonNullableJson(driver, nameSalt), clean=True, run_in_confluent=True,
         run_in_apache=True
