From 76ecc7cebd2c4be6e74df6b48846c6a40a4cecb7 Mon Sep 17 00:00:00 2001 From: Khyati Soneji <134907874+khsoneji@users.noreply.github.com> Date: Mon, 4 Dec 2023 12:53:22 +0530 Subject: [PATCH] Revert "Revert commits (#46)" (#48) This reverts commit bd64a9235de4cbccd8e59795b148ebd59b9912d9. --- pom.xml | 2 +- .../kafka/connector/records/RecordService.java | 1 - .../test_string_json_ignore_tombstone.json | 3 +-- test/test_suit/test_schema_evolution_json.py | 11 +++++++---- .../test_schema_evolution_json_ignore_tombstone.py | 12 +++++++----- .../test_snowpipe_streaming_string_json.py | 10 +++++++--- ...owpipe_streaming_string_json_ignore_tombstone.py | 11 ++++++----- test/test_suit/test_string_json.py | 13 ++++++++----- test/test_suit/test_string_json_ignore_tombstone.py | 11 +++++++---- 9 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index c28c22fd3..55ee540bc 100644 --- a/pom.xml +++ b/pom.xml @@ -346,7 +346,7 @@ org.apache.avro avro - 1.11.1 + 1.11.3 com.fasterxml.jackson.core diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 4fccf8eef..70ef9130e 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -566,7 +566,6 @@ public boolean shouldSkipNullValue( // get valueSchema Schema valueSchema = record.valueSchema(); if (valueSchema instanceof SnowflakeJsonSchema) { - // TODO SNOW-916052: will not skip if record.value() == null // we can conclude this is a custom/KC defined converter. // i.e one of SFJson, SFAvro and SFAvroWithSchemaRegistry Converter if (record.value() instanceof SnowflakeRecordContent) { diff --git a/test/rest_request_template/test_string_json_ignore_tombstone.json b/test/rest_request_template/test_string_json_ignore_tombstone.json index 3c8a62bfc..d045dc556 100644 --- a/test/rest_request_template/test_string_json_ignore_tombstone.json +++ b/test/rest_request_template/test_string_json_ignore_tombstone.json @@ -10,8 +10,7 @@ "snowflake.database.name":"SNOWFLAKE_DATABASE", "snowflake.schema.name":"SNOWFLAKE_SCHEMA", "key.converter":"org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", + "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter", "behavior.on.null.values": "IGNORE" } } diff --git a/test/test_suit/test_schema_evolution_json.py b/test/test_suit/test_schema_evolution_json.py index 579d3964b..d00ee90ef 100644 --- a/test/test_suit/test_schema_evolution_json.py +++ b/test/test_suit/test_schema_evolution_json.py @@ -55,17 +55,20 @@ def send(self): key = [] value = [] - # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default - for e in range(self.recordNum - 1): + # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default + for e in range(self.recordNum - 2): key.append(json.dumps({'number': str(e)}).encode('utf-8')) value.append(json.dumps(self.records[i]).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion == '2.5.1': value.append(json.dumps(self.records[i]).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) else: - value.append('') - key.append(json.dumps({'number': str(i)}).encode('utf-8')) + value.append(None) + value.append("") # community converters treat this as a tombstone + key.append(json.dumps({'number': str(self.recordNum - 1)}).encode('utf-8')) + key.append(json.dumps({'number': str(self.recordNum)}).encode('utf-8')) self.driver.sendBytesData(topic, value, key) diff --git a/test/test_suit/test_schema_evolution_json_ignore_tombstone.py b/test/test_suit/test_schema_evolution_json_ignore_tombstone.py index 18685b4f4..9c5d669c5 100644 --- a/test/test_suit/test_schema_evolution_json_ignore_tombstone.py +++ b/test/test_suit/test_schema_evolution_json_ignore_tombstone.py @@ -55,15 +55,17 @@ def send(self): key = [] value = [] - # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default - for e in range(self.recordNum - 1): + # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default + for e in range(self.recordNum - 2): key.append(json.dumps({'number': str(e)}).encode('utf-8')) value.append(json.dumps(self.records[i]).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion != '2.5.1': - value.append('') - key.append(json.dumps({'number': str(i)}).encode('utf-8')) + key.append(json.dumps({'number': str(self.recordNum - 1)}).encode('utf-8')) + value.append(None) + key.append(json.dumps({'number': str(self.recordNum)}).encode('utf-8')) + value.append("") # community converters treat this as a tombstone self.driver.sendBytesData(topic, value, key) @@ -87,7 +89,7 @@ def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] - if res != len(self.topics) * (self.recordNum - 1): + if res != len(self.topics) * (self.recordNum - 2): print("Number of record expected: {}, got: {}".format(len(self.topics) * (self.recordNum - 1), res)) raise NonRetryableError("Number of record in table is different from number of record sent") diff --git a/test/test_suit/test_snowpipe_streaming_string_json.py b/test/test_suit/test_snowpipe_streaming_string_json.py index eb2f5f3bf..4cf3a19c6 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json.py +++ b/test/test_suit/test_snowpipe_streaming_string_json.py @@ -32,19 +32,23 @@ def send(self): key = [] value = [] - # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default - for e in range(self.recordNum - 1): + # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default + for e in range(self.recordNum - 2): value.append(json.dumps( {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)} ).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion == '2.5.1': + value.append(json.dumps( + {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum - 1)} + ).encode('utf-8')) value.append(json.dumps( {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum)} ).encode('utf-8')) else: - value.append('') + value.append(None) + value.append("") # community converters treat this as a tombstone self.driver.sendBytesData(self.topic, value, key, partition=p) sleep(2) diff --git a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py index 9d9437167..57611c4b9 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py +++ b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py @@ -32,15 +32,16 @@ def send(self): key = [] value = [] - # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default - for e in range(self.recordNum - 1): + # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default + for e in range(self.recordNum - 2): value.append(json.dumps( {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)} ).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion != '2.5.1': - value.append('') + value.append(None) + value.append("") # community converters treat this as a tombstone self.driver.sendBytesData(self.topic, value, key, partition=p) sleep(2) @@ -49,7 +50,7 @@ def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] print("Count records in table {}={}".format(self.topic, str(res))) - goalCount = (self.recordNum - 1) * self.partitionNum + goalCount = (self.recordNum - 2) * self.partitionNum if res < goalCount: print("Topic:" + self.topic + " count is less, will retry") raise RetryableError() @@ -75,7 +76,7 @@ def verify(self, round): for p in range(self.partitionNum): # unique offset count and partition no are two columns (returns tuple) - if rows[p][0] != (self.recordNum - 1) or rows[p][1] != p: + if rows[p][0] != (self.recordNum - 2) or rows[p][1] != p: raise NonRetryableError("Unique offsets for partitions count doesnt match") def clean(self): diff --git a/test/test_suit/test_string_json.py b/test/test_suit/test_string_json.py index 69a1e1ac1..9a8c93577 100644 --- a/test/test_suit/test_string_json.py +++ b/test/test_suit/test_string_json.py @@ -7,6 +7,7 @@ def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_string_json" self.topic = self.fileName + nameSalt + self.recordCount = 100 def getConfigFileName(self): return self.fileName + ".json" @@ -14,17 +15,18 @@ def getConfigFileName(self): def send(self): value = [] - # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default - for e in range(99): + # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default + for e in range(self.recordCount - 2): value.append(json.dumps({'number': str(e)}).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion == '2.5.1': value.append(json.dumps( - {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(100)} + {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordCount)} ).encode('utf-8')) else: - value.append('') + value.append(None) + value.append("") # custom sf converters treat this as a normal record header = [('header1', 'value1'), ('header2', '{}')] self.driver.sendBytesData(self.topic, value, [], 0, header) @@ -32,9 +34,10 @@ def send(self): def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + if res == 0: raise RetryableError() - elif res != 100: + elif res != self.recordCount: raise NonRetryableError("Number of record in table is different from number of record sent") # validate content of line 1 diff --git a/test/test_suit/test_string_json_ignore_tombstone.py b/test/test_suit/test_string_json_ignore_tombstone.py index b049d4fb0..b3fe77495 100644 --- a/test/test_suit/test_string_json_ignore_tombstone.py +++ b/test/test_suit/test_string_json_ignore_tombstone.py @@ -7,6 +7,7 @@ def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_string_json_ignore_tombstone" self.topic = self.fileName + nameSalt + self.recordCount = 100 def getConfigFileName(self): return self.fileName + ".json" @@ -14,13 +15,15 @@ def getConfigFileName(self): def send(self): value = [] - # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default - for e in range(99): + # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default + for e in range(self.recordCount - 2): value.append(json.dumps({'number': str(e)}).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion != '2.5.1': - value.append('') + value.append(None) + + value.append("") # custom sf converters treat this as a normal record header = [('header1', 'value1'), ('header2', '{}')] self.driver.sendBytesData(self.topic, value, [], 0, header) @@ -28,7 +31,7 @@ def send(self): def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] - goalCount = 99 + goalCount = self.recordCount - 1 # custom sf converters treat this as a normal record, so only None value is ignored print("Got " + str(res) + " rows. Expected " + str(goalCount) + " rows") if res == 0: raise RetryableError()