diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
index 4becdc448..e2b7bdd2f 100644
--- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
+++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
@@ -41,6 +41,7 @@
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ArrayNode;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.data.ConnectSchema;
@@ -277,7 +278,7 @@ private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
       } else if (columnNode.isNull()) {
         columnValue = null;
       } else {
-        columnValue = MAPPER.writeValueAsString(columnNode);
+        columnValue = writeValueAsStringOrNan(columnNode);
       }
       // while the value is always dumped into a string, the Streaming Ingest SDK
       // will transform the value according to its type in the table
@@ -291,6 +292,14 @@ private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
     return streamingIngestRow;
   }

+  private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException {
+    if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
+      return "NaN";
+    } else {
+      return MAPPER.writeValueAsString(columnNode);
+    }
+  }
+
   /** For now there are two columns one is content and other is metadata. Both are Json */
   private static class SnowflakeTableRow {
     // This can be a JsonNode but we will keep this as is.
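Note: a minimal standalone sketch (not part of this patch, and using the unshaded com.fasterxml Jackson coordinates rather than the connector's shaded ones) of the behaviour the new helper works around. With Jackson's default settings, writeValueAsString appears to render a NaN numeric node as the quoted JSON string "NaN", which the Streaming Ingest SDK would then treat as text rather than a FLOAT value; the helper returns the bare NaN literal instead.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.DoubleNode;
    import com.fasterxml.jackson.databind.node.NumericNode;

    public class NaNSerializationSketch {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      // Same logic as the helper added above, reproduced here for illustration only.
      static String writeValueAsStringOrNan(JsonNode columnNode) throws Exception {
        if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
          return "NaN"; // bare literal, so the SDK can ingest it into a FLOAT column
        }
        return MAPPER.writeValueAsString(columnNode);
      }

      public static void main(String[] args) throws Exception {
        JsonNode nan = DoubleNode.valueOf(Double.NaN);
        System.out.println(MAPPER.writeValueAsString(nan)); // prints "NaN" (with quotes)
        System.out.println(writeValueAsStringOrNan(nan));   // prints NaN (bare literal)
      }
    }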
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
index ac33b3126..cf00ddb27 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
@@ -5,26 +5,6 @@ public class SchematizationTestUtils {
-  public static final Map<String, String> SF_AVRO_SCHEMA_FOR_TABLE_CREATION;
-
-  static {
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION = new HashMap<>();
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT8", "NUMBER");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT8_OPTIONAL", "NUMBER");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT16", "NUMBER");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT32", "NUMBER");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT64", "NUMBER");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("FIRST_NAME", "VARCHAR");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_INT", "ARRAY");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_JSON", "ARRAY");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_MAP", "VARIANT");
-    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT");
-  }
-
   public static final Map<String, String> SF_JSON_SCHEMA_FOR_TABLE_CREATION;

   static {
@@ -43,28 +23,6 @@ public class SchematizationTestUtils {
     SF_JSON_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT");
   }

-  public static final Map<String, Object> CONTENT_FOR_AVRO_TABLE_CREATION;
-
-  static {
-    CONTENT_FOR_AVRO_TABLE_CREATION = new HashMap<>();
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT8", 0L);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT8_OPTIONAL", null);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT16", 42L);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT32", 42L);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT64", 42L);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("FIRST_NAME", "zekai");
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true);
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]");
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_INT", "[1,2]");
-    CONTENT_FOR_AVRO_TABLE_CREATION.put(
-        "INFO_ARRAY_JSON",
-        "[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]");
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_MAP", "{\"field\":3}");
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("RECORD_METADATA", "RECORD_METADATA_PLACE_HOLDER");
-  }
-
   public static final Map<String, Object> CONTENT_FOR_JSON_TABLE_CREATION;

   static {
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
index eda7545f2..610ede649 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
@@ -39,7 +39,6 @@
 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
 import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
-import com.snowflake.kafka.connector.internal.streaming.StreamingUtils;
 import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
 import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
 import io.confluent.connect.avro.AvroConverter;
@@ -78,8 +77,6 @@
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
 import net.snowflake.client.jdbc.internal.google.gson.JsonObject;
 import net.snowflake.client.jdbc.internal.google.gson.JsonParser;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -887,13 +884,16 @@ public static void checkTableContentOneRow(String tableName, Map
     }
   }

-  public static SnowflakeStreamingIngestClient createStreamingClient(
-      Map<String, String> config, String clientName) {
-    Properties clientProperties = new Properties();
-    clientProperties.putAll(StreamingUtils.convertConfigForStreamingClient(new HashMap<>(config)));
-    return SnowflakeStreamingIngestClientFactory.builder(clientName)
-        .setProperties(clientProperties)
-        .build();
+  public static Map<String, Object> getTableContentOneRow(String tableName) throws SQLException {
+    String getRowQuery = "select * from " + tableName + " limit 1";
+    ResultSet result = executeQuery(getRowQuery);
+    result.next();
+
+    Map<String, Object> contentMap = new HashMap<>();
+    for (int i = 0; i < result.getMetaData().getColumnCount(); i++) {
+      contentMap.put(result.getMetaData().getColumnName(i + 1), result.getObject(i + 1));
+    }
+    return contentMap;
   }

   /**
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java
new file mode 100644
index 000000000..93a6f3bc0
--- /dev/null
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java
@@ -0,0 +1,243 @@
+package com.snowflake.kafka.connector.internal.streaming;
+
+import static com.snowflake.kafka.connector.internal.TestUtils.getTableContentOneRow;
+import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
+import static org.awaitility.Awaitility.await;
+
+import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
+import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
+import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
+import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
+import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
+import com.snowflake.kafka.connector.internal.TestUtils;
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class SnowflakeSinkServiceV2AvroSchematizationIT {
+
+  private static final int PARTITION = 0;
+  private static final int START_OFFSET = 0;
+
+  private static final String ID_INT8 = "ID_INT8";
+  private static final String ID_INT8_OPTIONAL = "ID_INT8_OPTIONAL";
+  private static final String ID_INT16 = "ID_INT16";
+  private static final String ID_INT32 = "ID_INT32";
+  private static final String ID_INT64 = "ID_INT64";
+  private static final String FIRST_NAME = "FIRST_NAME";
+  private static final String RATING_FLOAT32 = "RATING_FLOAT32";
+  private static final String FLOAT_NAN = "FLOAT_NAN";
+  private static final String RATING_FLOAT64 = "RATING_FLOAT64";
+  private static final String APPROVAL = "APPROVAL";
+  private static final String INFO_ARRAY_STRING = "INFO_ARRAY_STRING";
+  private static final String INFO_ARRAY_INT = "INFO_ARRAY_INT";
+  private static final String INFO_ARRAY_JSON = "INFO_ARRAY_JSON";
+  private static final String INFO_MAP = "INFO_MAP";
+  private static final String RECORD_METADATA = "RECORD_METADATA";
+
+  private static final Map<String, String> EXPECTED_AVRO_SCHEMA =
+      new HashMap<String, String>() {
+        {
+          put(ID_INT8, "NUMBER");
+          put(ID_INT8_OPTIONAL, "NUMBER");
+          put(ID_INT16, "NUMBER");
+          put(ID_INT32, "NUMBER");
+          put(ID_INT64, "NUMBER");
+          put(FIRST_NAME, "VARCHAR");
+          put(RATING_FLOAT32, "FLOAT");
+          put(FLOAT_NAN, "FLOAT");
+          put(RATING_FLOAT64, "FLOAT");
+          put(APPROVAL, "BOOLEAN");
+          put(INFO_ARRAY_STRING, "ARRAY");
+          put(INFO_ARRAY_INT, "ARRAY");
+          put(INFO_ARRAY_JSON, "ARRAY");
+          put(INFO_MAP, "VARIANT");
+          put(RECORD_METADATA, "VARIANT");
+        }
+      };
+
+  private String table;
+  private SnowflakeConnectionService conn;
+  private String topic;
+  private TopicPartition topicPartition;
+
+  private SnowflakeSinkService service;
+
+  @BeforeEach
+  void before() {
+    table = TestUtils.randomTableName();
+    topic = table;
+    conn = TestUtils.getConnectionServiceForStreaming();
+    topicPartition = new TopicPartition(topic, PARTITION);
+  }
+
+  @AfterEach
+  void after() {
+    service.closeAll();
+  }
+
+  @ParameterizedTest(name = "useSingleBuffer: {0}")
+  @MethodSource("singleBufferParameters")
+  public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer)
+      throws Exception {
+    // given
+    conn.createTableWithOnlyMetadataColumn(table);
+    SinkRecord avroRecordValue = createSinkRecord();
+    service = createService(useSingleBuffer);
+
+    // when
+    // The first insert should fail and schema evolution will kick in to update the schema
+    service.insert(Collections.singletonList(avroRecordValue));
+
+    // then
+    waitUntilOffsetEquals(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
+    TestUtils.checkTableSchema(table, EXPECTED_AVRO_SCHEMA);
+
+    // when
+    // Retry the insert should succeed now with the updated schema
+    service.insert(Collections.singletonList(avroRecordValue));
+
+    // then
+    waitUntilOffsetEquals(START_OFFSET + 1);
+
+    Map<String, Object> actual = getTableContentOneRow(topic);
+    Assertions.assertEquals(actual.get(ID_INT8), 0L);
+    Assertions.assertNull(actual.get(ID_INT8_OPTIONAL));
+    Assertions.assertEquals(actual.get(ID_INT16), 42L);
+    Assertions.assertEquals(actual.get(ID_INT32), 42L);
+    Assertions.assertEquals(actual.get(ID_INT64), 42L);
+    Assertions.assertEquals(actual.get(FIRST_NAME), "zekai");
+    Assertions.assertEquals(actual.get(RATING_FLOAT32), 0.99);
+    Assertions.assertEquals(
+        actual.get(FLOAT_NAN), Double.NaN); // float is extended to double on SF side
+    Assertions.assertEquals(actual.get(RATING_FLOAT64), 0.99);
+    Assertions.assertEquals(actual.get(APPROVAL), true);
+    Assertions.assertEquals(
+        StringUtils.deleteWhitespace(actual.get(INFO_ARRAY_STRING).toString()), "[\"a\",\"b\"]");
+    Assertions.assertEquals(
+        StringUtils.deleteWhitespace(actual.get(INFO_ARRAY_INT).toString()), "[1,2]");
+    Assertions.assertEquals(
+        StringUtils.deleteWhitespace(actual.get(INFO_ARRAY_JSON).toString()),
+        "[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]");
+    Assertions.assertEquals(
+        StringUtils.deleteWhitespace(actual.get(INFO_MAP).toString()), "{\"field\":3}");
+  }
+
+  private SnowflakeSinkService createService(boolean useSingleBuffer) {
+    Map<String, String> config = prepareConfig(useSingleBuffer);
+    return SnowflakeSinkServiceFactory.builder(
+            conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+        .setRecordNumber(1)
+        .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+        .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+        .addTask(table, new TopicPartition(topic, PARTITION))
+        .build();
+  }
+
+  private SinkRecord createSinkRecord() {
+    Schema schema = prepareSchema();
+    Struct data = prepareData(schema);
+    AvroConverter avroConverter = prepareAvroConverter();
+
+    byte[] converted = avroConverter.fromConnectData(topic, data.schema(), data);
+    conn.createTableWithOnlyMetadataColumn(table);
+
+    SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);
+
+    return new SinkRecord(
+        topic,
+        PARTITION,
+        Schema.STRING_SCHEMA,
+        "test",
+        avroInputValue.schema(),
+        avroInputValue.value(),
+        START_OFFSET);
+  }
+
+  private AvroConverter prepareAvroConverter() {
+    SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
+    AvroConverter avroConverter = new AvroConverter(schemaRegistry);
+    avroConverter.configure(
+        Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
+    return avroConverter;
+  }
+
+  private Map<String, String> prepareConfig(boolean useSingleBuffer) {
+    Map<String, String> config = TestUtils.getConfForStreaming(useSingleBuffer);
+    config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
+    config.put(
+        SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
+        "io.confluent.connect.avro.AvroConverter");
+    config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url");
+    SnowflakeSinkConnectorConfig.setDefaultValues(config);
+    return config;
+  }
+
+  private Schema prepareSchema() {
+    SchemaBuilder schemaBuilder =
+        SchemaBuilder.struct()
+            .field(ID_INT8, Schema.INT8_SCHEMA)
+            .field(ID_INT8_OPTIONAL, Schema.OPTIONAL_INT8_SCHEMA)
+            .field(ID_INT16, Schema.INT16_SCHEMA)
+            .field(ID_INT32, Schema.INT32_SCHEMA)
+            .field(ID_INT64, Schema.INT64_SCHEMA)
+            .field(FIRST_NAME, Schema.STRING_SCHEMA)
+            .field(RATING_FLOAT32, Schema.FLOAT32_SCHEMA)
+            .field(FLOAT_NAN, Schema.FLOAT32_SCHEMA)
+            .field(RATING_FLOAT64, Schema.FLOAT64_SCHEMA)
+            .field(APPROVAL, Schema.BOOLEAN_SCHEMA)
+            .field(INFO_ARRAY_STRING, SchemaBuilder.array(Schema.STRING_SCHEMA).build())
+            .field(INFO_ARRAY_INT, SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+            .field(INFO_ARRAY_JSON, SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
+            .field(INFO_MAP, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());
+    return schemaBuilder.build();
+  }
+
+  private Struct prepareData(Schema schema) {
+    return new Struct(schema)
+        .put(ID_INT8, (byte) 0)
+        .put(ID_INT16, (short) 42)
+        .put(ID_INT32, 42)
+        .put(ID_INT64, 42L)
+        .put(FIRST_NAME, "zekai")
+        .put(RATING_FLOAT32, 0.99f)
+        .put(FLOAT_NAN, Float.NaN)
+        .put(RATING_FLOAT64, 0.99d)
+        .put(APPROVAL, true)
+        .put(INFO_ARRAY_STRING, Arrays.asList("a", "b"))
+        .put(INFO_ARRAY_INT, Arrays.asList(1, 2))
+        .put(
+            INFO_ARRAY_JSON,
+            Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
+        .put(INFO_MAP, Collections.singletonMap("field", 3));
+  }
+
+  private static Stream<Arguments> singleBufferParameters() {
+    return Stream.of(Arguments.of(false), Arguments.of(true));
+  }
+
+  private void waitUntilOffsetEquals(long expectedOffset) {
+    await()
+        .timeout(Duration.ofSeconds(60))
+        .until(() -> service.getOffset(new TopicPartition(topic, PARTITION)) == expectedOffset);
+  }
+}
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
index 726b0ac29..3f23a4fa6 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
@@ -1256,105 +1256,6 @@ public void testStreamingIngestionWithExactlyOnceSemanticsOverlappingOffsets(
     service2.closeAll();
   }

-  @ParameterizedTest(name = "useSingleBuffer: {0}")
-  @MethodSource("singleBufferParameters")
-  public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer)
-      throws Exception {
-    conn = getConn(false);
-    Map<String, String> config = TestUtils.getConfForStreaming(useSingleBuffer);
-    config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
-    config.put(
-        SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-        "io.confluent.connect.avro.AvroConverter");
-    config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url");
-    // get rid of these at the end
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    // avro
-    SchemaBuilder schemaBuilder =
-        SchemaBuilder.struct()
-            .field("id_int8", Schema.INT8_SCHEMA)
-            .field("id_int8_optional", Schema.OPTIONAL_INT8_SCHEMA)
-            .field("id_int16", Schema.INT16_SCHEMA)
-            .field("ID_INT32", Schema.INT32_SCHEMA)
-            .field("id_int64", Schema.INT64_SCHEMA)
-            .field("first_name", Schema.STRING_SCHEMA)
-            .field("rating_float32", Schema.FLOAT32_SCHEMA)
-            .field("rating_float64", Schema.FLOAT64_SCHEMA)
-            .field("approval", Schema.BOOLEAN_SCHEMA)
-            .field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
-            .field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
-            .field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
-            .field(
-                "info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());
-
-    Struct original =
-        new Struct(schemaBuilder.build())
-            .put("id_int8", (byte) 0)
-            .put("id_int16", (short) 42)
-            .put("ID_INT32", 42)
-            .put("id_int64", 42L)
-            .put("first_name", "zekai")
-            .put("rating_float32", 0.99f)
-            .put("rating_float64", 0.99d)
-            .put("approval", true)
-            .put("info_array_string", Arrays.asList("a", "b"))
-            .put("info_array_int", Arrays.asList(1, 2))
-            .put(
-                "info_array_json",
-                Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
-            .put("info_map", Collections.singletonMap("field", 3));
-
-    SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
-    AvroConverter avroConverter = new AvroConverter(schemaRegistry);
-    avroConverter.configure(
-        Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
-    byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
-
-    SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);
-
-    long startOffset = 0;
-
-    SinkRecord avroRecordValue =
-        new SinkRecord(
-            topic,
-            partition,
-            Schema.STRING_SCHEMA,
-            "test",
-            avroInputValue.schema(),
-            avroInputValue.value(),
-            startOffset);
-
-    SnowflakeSinkService service =
-        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
-            .setRecordNumber(1)
-            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
-            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
-            .addTask(table, new TopicPartition(topic, partition))
-            .build();
-
-    // The first insert should fail and schema evolution will kick in to update the schema
-    service.insert(Collections.singletonList(avroRecordValue));
-    TestUtils.assertWithRetry(
-        () ->
-            service.getOffset(new TopicPartition(topic, partition))
-                == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
-        20,
-        5);
-
-    TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION);
-
-    // Retry the insert should succeed now with the updated schema
-    service.insert(Collections.singletonList(avroRecordValue));
-    TestUtils.assertWithRetry(
-        () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5);
-
-    TestUtils.checkTableContentOneRow(
-        table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION);
-
-    service.closeAll();
-  }
-
   @ParameterizedTest(name = "useSingleBuffer: {0}")
   @MethodSource("singleBufferParameters")
   public void testSchematizationWithTableCreationAndJsonInput(boolean useSingleBuffer)
diff --git a/test/run_test_confluent.sh b/test/run_test_confluent.sh
index 46db5b043..e3e584e44 100755
--- a/test/run_test_confluent.sh
+++ b/test/run_test_confluent.sh
@@ -204,6 +204,5 @@ if [ $testError -ne 0 ]; then
     RED='\033[0;31m'
     NC='\033[0m' # No Color
     echo -e "${RED} There is error above this line ${NC}"
-    cat $APACHE_LOG_PATH/kc.log
     error_exit "=== test_verify.py failed ==="
 fi
diff --git a/test/test_suit/test_avrosr_avrosr.py b/test/test_suit/test_avrosr_avrosr.py
index a8db5bfa9..0ecd5cf8d 100644
--- a/test/test_suit/test_avrosr_avrosr.py
+++ b/test/test_suit/test_avrosr_avrosr.py
@@ -25,7 +25,9 @@ def __init__(self, driver, nameSalt):
             "fields":[
                 {"name":"id","type":"int"},
                 {"name":"firstName","type":"string"},
-                {"name":"time","type":"int"}
+                {"name":"time","type":"int"},
+                {"name":"someFloat","type":"float"},
+                {"name":"someFloatNaN","type":"float"}
             ]
         }
         """
@@ -41,7 +43,7 @@ def send(self):
         for e in range(100):
             # avro data must follow the schema defined in ValueSchemaStr
             key.append({"id": e})
-            value.append({"id": e, "firstName": "abc0", "time": 1835})
+            value.append({"id": e, "firstName": "abc0", "time": 1835, "someFloat": 21.37, "someFloatNaN": "NaN"})

         self.driver.sendAvroSRData(
             self.topic, value, self.valueSchema, key, self.keySchema)
@@ -58,7 +60,7 @@ def verify(self, round):
             "Select * from {} limit 1".format(self.topic)).fetchone()
         goldMeta = r'{"CreateTime":\d*,"key":{"id":0},"key_schema_id":\d*,"offset":0,"partition":0,"schema_id":\d*,' \
                    r'"topic":"travis_correct_avrosr_avrosr_\w*"}'
-        goldContent = r'{"firstName":"abc0","id":0,"time":1835}'
+        goldContent = r'{"firstName":"abc0","id":0,"someFloat":21.37,"someFloatNaN":"NaN","time":1835}'
r'{"firstName":"abc0","id":0,"someFloat":21.37,"someFloatNaN":"NaN","time":1835}' self.driver.regexMatchOneLine(res, goldMeta, goldContent) self.driver.verifyStageIsCleaned(self.topic) diff --git a/test/test_suit/test_schema_evolution_avro_sr.py b/test/test_suit/test_schema_evolution_avro_sr.py index bf328325f..cce02574a 100644 --- a/test/test_suit/test_schema_evolution_avro_sr.py +++ b/test/test_suit/test_schema_evolution_avro_sr.py @@ -32,7 +32,8 @@ def __init__(self, driver, nameSalt): self.records.append({ 'PERFORMANCE_STRING': 'Excellent', 'RATING_DOUBLE': 0.99, - 'APPROVAL': True + 'APPROVAL': True, + 'SOME_FLOAT_NAN': "NaN" }) self.ValueSchemaStr = [] @@ -56,7 +57,8 @@ def __init__(self, driver, nameSalt): "fields":[ {"name":"RATING_DOUBLE","type":"float"}, {"name":"PERFORMANCE_STRING","type":"string"}, - {"name":"APPROVAL","type":"boolean"} + {"name":"APPROVAL","type":"boolean"}, + {"name":"SOME_FLOAT_NAN","type":"float"} ] } """) @@ -67,7 +69,8 @@ def __init__(self, driver, nameSalt): 'RATING_INT': 'NUMBER', 'RATING_DOUBLE': 'FLOAT', 'APPROVAL': 'BOOLEAN', - 'RECORD_METADATA': 'VARIANT' + 'SOME_FLOAT_NAN': 'FLOAT', + 'RECORD_METADATA': 'VARIANT', } self.gold_columns = [columnName for columnName in self.gold_type] diff --git a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py index e7310770d..25c087339 100644 --- a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py +++ b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py @@ -21,7 +21,9 @@ def __init__(self, driver, nameSalt): "fields":[ {"name":"id","type":"int"}, {"name":"firstName","type":"string"}, - {"name":"time","type":"int"} + {"name":"time","type":"int"}, + {"name":"someFloat","type":"float"}, + {"name":"someFloatNaN","type":"float"} ] } """ @@ -34,19 +36,13 @@ def getConfigFileName(self): return self.fileName + ".json" def send(self): - # create topic with n partitions and only one replication factor - print("Partition count:" + str(self.partitionNum)) - print("Topic:", self.topic) - - self.driver.describeTopic(self.topic) for p in range(self.partitionNum): print("Sending in Partition:" + str(p)) key = [] value = [] - value = [] for e in range(self.recordNum): - value.append({"id": e, "firstName": "abc0", "time": 1835}) + value.append({"id": e, "firstName": "abc0", "time": 1835, "someFloat": 21.37, "someFloatNaN": "NaN"}) self.driver.sendAvroSRData(self.topic, value, self.valueSchema, key=[], key_schema="", partition=p) sleep(2) diff --git a/test/test_suites.py b/test/test_suites.py index eae8d0264..5b86f31d6 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -164,7 +164,7 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSnowpipeStreamingStringJsonDLQ(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - ("TestSnowpipeStreamingStringAvro", EndToEndTestSuite( + ("TestSnowpipeStreamingStringAvroSR", EndToEndTestSuite( test_instance=TestSnowpipeStreamingStringAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False )),