diff --git a/pom.xml b/pom.xml index 55ee540bc..c28c22fd3 100644 --- a/pom.xml +++ b/pom.xml @@ -346,7 +346,7 @@ org.apache.avro avro - 1.11.3 + 1.11.1 com.fasterxml.jackson.core diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 70ef9130e..4fccf8eef 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -566,6 +566,7 @@ public boolean shouldSkipNullValue( // get valueSchema Schema valueSchema = record.valueSchema(); if (valueSchema instanceof SnowflakeJsonSchema) { + // TODO SNOW-916052: will not skip if record.value() == null // we can conclude this is a custom/KC defined converter. // i.e one of SFJson, SFAvro and SFAvroWithSchemaRegistry Converter if (record.value() instanceof SnowflakeRecordContent) { diff --git a/test/rest_request_template/test_string_json_ignore_tombstone.json b/test/rest_request_template/test_string_json_ignore_tombstone.json index d045dc556..3c8a62bfc 100644 --- a/test/rest_request_template/test_string_json_ignore_tombstone.json +++ b/test/rest_request_template/test_string_json_ignore_tombstone.json @@ -10,7 +10,8 @@ "snowflake.database.name":"SNOWFLAKE_DATABASE", "snowflake.schema.name":"SNOWFLAKE_SCHEMA", "key.converter":"org.apache.kafka.connect.storage.StringConverter", - "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", "behavior.on.null.values": "IGNORE" } } diff --git a/test/test_suit/test_schema_evolution_json.py b/test/test_suit/test_schema_evolution_json.py index d00ee90ef..579d3964b 100644 --- a/test/test_suit/test_schema_evolution_json.py +++ b/test/test_suit/test_schema_evolution_json.py @@ -55,20 +55,17 @@ def send(self): key = [] value = [] - # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default - for e in range(self.recordNum - 2): + # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default + for e in range(self.recordNum - 1): key.append(json.dumps({'number': str(e)}).encode('utf-8')) value.append(json.dumps(self.records[i]).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion == '2.5.1': value.append(json.dumps(self.records[i]).encode('utf-8')) - value.append(json.dumps(self.records[i]).encode('utf-8')) else: - value.append(None) - value.append("") # community converters treat this as a tombstone - key.append(json.dumps({'number': str(self.recordNum - 1)}).encode('utf-8')) - key.append(json.dumps({'number': str(self.recordNum)}).encode('utf-8')) + value.append('') + key.append(json.dumps({'number': str(i)}).encode('utf-8')) self.driver.sendBytesData(topic, value, key) diff --git a/test/test_suit/test_schema_evolution_json_ignore_tombstone.py b/test/test_suit/test_schema_evolution_json_ignore_tombstone.py index 9c5d669c5..18685b4f4 100644 --- a/test/test_suit/test_schema_evolution_json_ignore_tombstone.py +++ b/test/test_suit/test_schema_evolution_json_ignore_tombstone.py @@ -55,17 +55,15 @@ def send(self): key = [] value = [] - # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default - for e in range(self.recordNum - 2): + # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default + for e in range(self.recordNum - 1): key.append(json.dumps({'number': str(e)}).encode('utf-8')) value.append(json.dumps(self.records[i]).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion != '2.5.1': - key.append(json.dumps({'number': str(self.recordNum - 1)}).encode('utf-8')) - value.append(None) - key.append(json.dumps({'number': str(self.recordNum)}).encode('utf-8')) - value.append("") # community converters treat this as a tombstone + value.append('') + key.append(json.dumps({'number': str(i)}).encode('utf-8')) self.driver.sendBytesData(topic, value, key) @@ -89,7 +87,7 @@ def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] - if res != len(self.topics) * (self.recordNum - 2): + if res != len(self.topics) * (self.recordNum - 1): print("Number of record expected: {}, got: {}".format(len(self.topics) * (self.recordNum - 1), res)) raise NonRetryableError("Number of record in table is different from number of record sent") diff --git a/test/test_suit/test_snowpipe_streaming_string_json.py b/test/test_suit/test_snowpipe_streaming_string_json.py index 4cf3a19c6..eb2f5f3bf 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json.py +++ b/test/test_suit/test_snowpipe_streaming_string_json.py @@ -32,23 +32,19 @@ def send(self): key = [] value = [] - # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default - for e in range(self.recordNum - 2): + # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default + for e in range(self.recordNum - 1): value.append(json.dumps( {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)} ).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion == '2.5.1': - value.append(json.dumps( - {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum - 1)} - ).encode('utf-8')) value.append(json.dumps( {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum)} ).encode('utf-8')) else: - value.append(None) - value.append("") # community converters treat this as a tombstone + value.append('') self.driver.sendBytesData(self.topic, value, key, partition=p) sleep(2) diff --git a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py index 57611c4b9..9d9437167 100644 --- a/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py +++ b/test/test_suit/test_snowpipe_streaming_string_json_ignore_tombstone.py @@ -32,16 +32,15 @@ def send(self): key = [] value = [] - # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default - for e in range(self.recordNum - 2): + # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default + for e in range(self.recordNum - 1): value.append(json.dumps( {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)} ).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion != '2.5.1': - value.append(None) - value.append("") # community converters treat this as a tombstone + value.append('') self.driver.sendBytesData(self.topic, value, key, partition=p) sleep(2) @@ -50,7 +49,7 @@ def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] print("Count records in table {}={}".format(self.topic, str(res))) - goalCount = (self.recordNum - 2) * self.partitionNum + goalCount = (self.recordNum - 1) * self.partitionNum if res < goalCount: print("Topic:" + self.topic + " count is less, will retry") raise RetryableError() @@ -76,7 +75,7 @@ def verify(self, round): for p in range(self.partitionNum): # unique offset count and partition no are two columns (returns tuple) - if rows[p][0] != (self.recordNum - 2) or rows[p][1] != p: + if rows[p][0] != (self.recordNum - 1) or rows[p][1] != p: raise NonRetryableError("Unique offsets for partitions count doesnt match") def clean(self): diff --git a/test/test_suit/test_string_json.py b/test/test_suit/test_string_json.py index 9a8c93577..69a1e1ac1 100644 --- a/test/test_suit/test_string_json.py +++ b/test/test_suit/test_string_json.py @@ -7,7 +7,6 @@ def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "travis_correct_string_json" self.topic = self.fileName + nameSalt - self.recordCount = 100 def getConfigFileName(self): return self.fileName + ".json" @@ -15,18 +14,17 @@ def getConfigFileName(self): def send(self): value = [] - # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default - for e in range(self.recordCount - 2): + # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default + for e in range(99): value.append(json.dumps({'number': str(e)}).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion == '2.5.1': value.append(json.dumps( - {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordCount)} + {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(100)} ).encode('utf-8')) else: - value.append(None) - value.append("") # custom sf converters treat this as a normal record + value.append('') header = [('header1', 'value1'), ('header2', '{}')] self.driver.sendBytesData(self.topic, value, [], 0, header) @@ -34,10 +32,9 @@ def send(self): def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] - if res == 0: raise RetryableError() - elif res != self.recordCount: + elif res != 100: raise NonRetryableError("Number of record in table is different from number of record sent") # validate content of line 1 diff --git a/test/test_suit/test_string_json_ignore_tombstone.py b/test/test_suit/test_string_json_ignore_tombstone.py index b3fe77495..b049d4fb0 100644 --- a/test/test_suit/test_string_json_ignore_tombstone.py +++ b/test/test_suit/test_string_json_ignore_tombstone.py @@ -7,7 +7,6 @@ def __init__(self, driver, nameSalt): self.driver = driver self.fileName = "test_string_json_ignore_tombstone" self.topic = self.fileName + nameSalt - self.recordCount = 100 def getConfigFileName(self): return self.fileName + ".json" @@ -15,15 +14,13 @@ def getConfigFileName(self): def send(self): value = [] - # send two less record because we are sending tombstone records. tombstone ingestion is enabled by default - for e in range(self.recordCount - 2): + # send one less record because we are sending a tombstone record. tombstone ingestion is enabled by default + for e in range(99): value.append(json.dumps({'number': str(e)}).encode('utf-8')) # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 if self.driver.testVersion != '2.5.1': - value.append(None) - - value.append("") # custom sf converters treat this as a normal record + value.append('') header = [('header1', 'value1'), ('header2', '{}')] self.driver.sendBytesData(self.topic, value, [], 0, header) @@ -31,7 +28,7 @@ def send(self): def verify(self, round): res = self.driver.snowflake_conn.cursor().execute( "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] - goalCount = self.recordCount - 1 # custom sf converters treat this as a normal record, so only None value is ignored + goalCount = 99 print("Got " + str(res) + " rows. Expected " + str(goalCount) + " rows") if res == 0: raise RetryableError()