Skip to content

Commit

Permalink
Revert "Revert commits (#46)" (#48)
Browse files Browse the repository at this point in the history
This reverts commit bd64a92.
  • Loading branch information
khsoneji authored Dec 4, 2023
1 parent bd64a92 commit 76ecc7c
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
<version>1.11.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,6 @@ public boolean shouldSkipNullValue(
// get valueSchema
Schema valueSchema = record.valueSchema();
if (valueSchema instanceof SnowflakeJsonSchema) {
// TODO SNOW-916052: will not skip if record.value() == null
// we can conclude this is a custom/KC defined converter.
// i.e one of SFJson, SFAvro and SFAvroWithSchemaRegistry Converter
if (record.value() instanceof SnowflakeRecordContent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
"snowflake.database.name":"SNOWFLAKE_DATABASE",
"snowflake.schema.name":"SNOWFLAKE_SCHEMA",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"behavior.on.null.values": "IGNORE"
}
}
11 changes: 7 additions & 4 deletions test/test_suit/test_schema_evolution_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,20 @@ def send(self):
key = []
value = []

# send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default
for e in range(self.recordNum - 1):
# send two fewer records because we are sending tombstone records. tombstone ingestion is enabled by default
for e in range(self.recordNum - 2):
key.append(json.dumps({'number': str(e)}).encode('utf-8'))
value.append(json.dumps(self.records[i]).encode('utf-8'))

# append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477
if self.driver.testVersion == '2.5.1':
value.append(json.dumps(self.records[i]).encode('utf-8'))
value.append(json.dumps(self.records[i]).encode('utf-8'))
else:
value.append('')
key.append(json.dumps({'number': str(i)}).encode('utf-8'))
value.append(None)
value.append("") # community converters treat this as a tombstone
key.append(json.dumps({'number': str(self.recordNum - 1)}).encode('utf-8'))
key.append(json.dumps({'number': str(self.recordNum)}).encode('utf-8'))

self.driver.sendBytesData(topic, value, key)

Expand Down
12 changes: 7 additions & 5 deletions test/test_suit/test_schema_evolution_json_ignore_tombstone.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ def send(self):
key = []
value = []

# send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default
for e in range(self.recordNum - 1):
# send two fewer records because we are sending tombstone records. tombstone ingestion is enabled by default
for e in range(self.recordNum - 2):
key.append(json.dumps({'number': str(e)}).encode('utf-8'))
value.append(json.dumps(self.records[i]).encode('utf-8'))

# append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477
if self.driver.testVersion != '2.5.1':
value.append('')
key.append(json.dumps({'number': str(i)}).encode('utf-8'))
key.append(json.dumps({'number': str(self.recordNum - 1)}).encode('utf-8'))
value.append(None)
key.append(json.dumps({'number': str(self.recordNum)}).encode('utf-8'))
value.append("") # community converters treat this as a tombstone

self.driver.sendBytesData(topic, value, key)

Expand All @@ -87,7 +89,7 @@ def verify(self, round):

res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.table)).fetchone()[0]
if res != len(self.topics) * (self.recordNum - 1):
if res != len(self.topics) * (self.recordNum - 2):
print("Number of record expected: {}, got: {}".format(len(self.topics) * (self.recordNum - 2), res))
raise NonRetryableError("Number of record in table is different from number of record sent")

Expand Down
10 changes: 7 additions & 3 deletions test/test_suit/test_snowpipe_streaming_string_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,23 @@ def send(self):
key = []
value = []

# send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default
for e in range(self.recordNum - 1):
# send two fewer records because we are sending tombstone records. tombstone ingestion is enabled by default
for e in range(self.recordNum - 2):
value.append(json.dumps(
{'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)}
).encode('utf-8'))

# append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477
if self.driver.testVersion == '2.5.1':
value.append(json.dumps(
{'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum - 1)}
).encode('utf-8'))
value.append(json.dumps(
{'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum)}
).encode('utf-8'))
else:
value.append('')
value.append(None)
value.append("") # community converters treat this as a tombstone

self.driver.sendBytesData(self.topic, value, key, partition=p)
sleep(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ def send(self):
key = []
value = []

# send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default
for e in range(self.recordNum - 1):
# send two fewer records because we are sending tombstone records. tombstone ingestion is enabled by default
for e in range(self.recordNum - 2):
value.append(json.dumps(
{'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)}
).encode('utf-8'))

# append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477
if self.driver.testVersion != '2.5.1':
value.append('')
value.append(None)
value.append("") # community converters treat this as a tombstone

self.driver.sendBytesData(self.topic, value, key, partition=p)
sleep(2)
Expand All @@ -49,7 +50,7 @@ def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
print("Count records in table {}={}".format(self.topic, str(res)))
goalCount = (self.recordNum - 1) * self.partitionNum
goalCount = (self.recordNum - 2) * self.partitionNum
if res < goalCount:
print("Topic:" + self.topic + " count is less, will retry")
raise RetryableError()
Expand All @@ -75,7 +76,7 @@ def verify(self, round):

for p in range(self.partitionNum):
# unique offset count and partition no are two columns (returns tuple)
if rows[p][0] != (self.recordNum - 1) or rows[p][1] != p:
if rows[p][0] != (self.recordNum - 2) or rows[p][1] != p:
raise NonRetryableError("Unique offsets for partitions count doesnt match")

def clean(self):
Expand Down
13 changes: 8 additions & 5 deletions test/test_suit/test_string_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,37 @@ def __init__(self, driver, nameSalt):
self.driver = driver
self.fileName = "travis_correct_string_json"
self.topic = self.fileName + nameSalt
self.recordCount = 100

def getConfigFileName(self):
return self.fileName + ".json"

def send(self):
value = []

# send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default
for e in range(99):
# send two fewer records because we are sending tombstone records. tombstone ingestion is enabled by default
for e in range(self.recordCount - 2):
value.append(json.dumps({'number': str(e)}).encode('utf-8'))

# append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477
if self.driver.testVersion == '2.5.1':
value.append(json.dumps(
{'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(100)}
{'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordCount)}
).encode('utf-8'))
else:
value.append('')
value.append(None)
value.append("") # custom sf converters treat this as a normal record

header = [('header1', 'value1'), ('header2', '{}')]
self.driver.sendBytesData(self.topic, value, [], 0, header)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]

if res == 0:
raise RetryableError()
elif res != 100:
elif res != self.recordCount:
raise NonRetryableError("Number of record in table is different from number of record sent")

# validate content of line 1
Expand Down
11 changes: 7 additions & 4 deletions test/test_suit/test_string_json_ignore_tombstone.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,31 @@ def __init__(self, driver, nameSalt):
self.driver = driver
self.fileName = "test_string_json_ignore_tombstone"
self.topic = self.fileName + nameSalt
self.recordCount = 100

def getConfigFileName(self):
return self.fileName + ".json"

def send(self):
value = []

# send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default
for e in range(99):
# send two fewer records because we are sending tombstone records. tombstone ingestion is enabled by default
for e in range(self.recordCount - 2):
value.append(json.dumps({'number': str(e)}).encode('utf-8'))

# append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477
if self.driver.testVersion != '2.5.1':
value.append('')
value.append(None)

value.append("") # custom sf converters treat this as a normal record

header = [('header1', 'value1'), ('header2', '{}')]
self.driver.sendBytesData(self.topic, value, [], 0, header)

def verify(self, round):
res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
goalCount = 99
goalCount = self.recordCount - 1 # custom sf converters treat this as a normal record, so only None value is ignored
print("Got " + str(res) + " rows. Expected " + str(goalCount) + " rows")
if res == 0:
raise RetryableError()
Expand Down

0 comments on commit 76ecc7c

Please sign in to comment.