diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index 530e8851a..9fb872650 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -1,6 +1,8 @@
 package com.snowflake.kafka.connector.internal;
 
-import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
 import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;
 
 import com.codahale.metrics.Histogram;
@@ -660,7 +662,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
     Schema schema = isKey ? record.keySchema() : record.valueSchema();
     Object content = isKey ? record.key() : record.value();
     try {
-      newSFContent = new SnowflakeRecordContent(schema, content);
+      newSFContent = new SnowflakeRecordContent(schema, content, false);
     } catch (Exception e) {
       LOGGER.error("Native content parser error:\n{}", e.getMessage());
       try {
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
index ff1e20e87..bd4668996 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
@@ -40,9 +40,9 @@
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
-import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,7 +135,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
     }
     Map<String, String> columnToType = new LinkedHashMap<>();
     Map<String, String> schemaMap = getSchemaMapFromRecord(record);
-    JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value());
+    JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
     Set<String> columnNamesSet = new HashSet<>(columnNames);
 
     Iterator<Map.Entry<String, JsonNode>> fields = recordNode.fields();
@@ -174,7 +174,15 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
     Schema schema = record.valueSchema();
     if (schema != null && schema.fields() != null) {
       for (Field field : schema.fields()) {
-        schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type(), field.schema().name()));
+        String snowflakeType = convertToSnowflakeType(field.schema().type(), field.schema().name());
+        LOGGER.info(
+            "Got the snowflake data type for field:{}, schemaName:{}, kafkaType:{},"
+                + " snowflakeType:{}",
+            field.name(),
+            field.schema().name(),
+            field.schema().type(),
+            snowflakeType);
+        schemaMap.put(field.name(), snowflakeType);
       }
     }
     return schemaMap;
@@ -187,6 +195,7 @@ private static String inferDataTypeFromJsonObject(JsonNode value) {
       // only when the type of the value is unrecognizable for JAVA
       throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass());
     }
+    // Passing null to schemaName when there is no schema information
     return convertToSnowflakeType(schemaType, null);
   }
 
@@ -221,9 +230,9 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
   }
 
   /** Convert the kafka data type to Snowflake data type */
-  private static String convertToSnowflakeType(Type kafkaType, String semanticType) {
-    if (semanticType != null) {
-      switch (semanticType) {
+  private static String convertToSnowflakeType(Type kafkaType, String schemaName) {
+    if (schemaName != null) {
+      switch (schemaName) {
         case Decimal.LOGICAL_NAME:
           return "DOUBLE";
         case Time.LOGICAL_NAME:
@@ -249,9 +258,19 @@ private static String convertToSnowflakeType(Type kafkaType, String semanticType
       case INT16:
         return "SMALLINT";
       case INT32:
-        return "INT";
+        if (Date.LOGICAL_NAME.equals(schemaName)) {
+          return "DATE";
+        } else if (Time.LOGICAL_NAME.equals(schemaName)) {
+          return "TIME(6)";
+        } else {
+          return "INT";
+        }
       case INT64:
-        return "BIGINT";
+        if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
+          return "TIMESTAMP(6)";
+        } else {
+          return "BIGINT";
+        }
       case FLOAT32:
         return "FLOAT";
       case FLOAT64:
@@ -259,12 +278,16 @@ private static String convertToSnowflakeType(Type kafkaType, String semanticType
       case BOOLEAN:
         return "BOOLEAN";
       case STRING:
-        if (semanticType != null && semanticType.equals("io.debezium.data.Json")) {
+        if (schemaName != null && schemaName.equals("io.debezium.data.Json")) {
          return "VARIANT";
        }
        return "VARCHAR";
      case BYTES:
-        return "BINARY";
+        if (Decimal.LOGICAL_NAME.equals(schemaName)) {
+          return "VARCHAR";
+        } else {
+          return "BINARY";
+        }
       case ARRAY:
         return "ARRAY";
       default:
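Note on the hunks above: the new INT32/INT64/BYTES branches are what turn Connect logical types into Snowflake column types during schema evolution. As an illustration only (not code from this patch; `LogicalTypeMappingSketch` and `snowflakeTypeFor` are hypothetical names, and the `default` arm is just a placeholder for the branches not shown), the added mapping can be reproduced in a standalone sketch:

```java
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class LogicalTypeMappingSketch {

  // Hypothetical stand-in for the private SchematizationUtils.convertToSnowflakeType.
  static String snowflakeTypeFor(Schema schema) {
    String schemaName = schema.name(); // logical type name, e.g. org.apache.kafka.connect.data.Date
    switch (schema.type()) {
      case INT32:
        if (Date.LOGICAL_NAME.equals(schemaName)) return "DATE";
        if (Time.LOGICAL_NAME.equals(schemaName)) return "TIME(6)";
        return "INT";
      case INT64:
        return Timestamp.LOGICAL_NAME.equals(schemaName) ? "TIMESTAMP(6)" : "BIGINT";
      case BYTES:
        return Decimal.LOGICAL_NAME.equals(schemaName) ? "VARCHAR" : "BINARY";
      default:
        return "VARCHAR"; // placeholder; the real method handles many more cases
    }
  }

  public static void main(String[] args) {
    System.out.println(snowflakeTypeFor(Date.SCHEMA));       // DATE
    System.out.println(snowflakeTypeFor(Time.SCHEMA));       // TIME(6)
    System.out.println(snowflakeTypeFor(Timestamp.SCHEMA));  // TIMESTAMP(6)
    System.out.println(snowflakeTypeFor(Decimal.schema(2))); // VARCHAR
    System.out.println(snowflakeTypeFor(SchemaBuilder.int32().build())); // INT
  }
}
```

The new end-to-end test further down expects these columns to surface as TIME(6) and TIMESTAMP_NTZ(6); presumably TIMESTAMP(6) resolves to TIMESTAMP_NTZ(6) under the account's default timestamp mapping.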
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
index 349bfda87..dced25d4f 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
@@ -451,7 +451,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
     Schema schema = isKey ? record.keySchema() : record.valueSchema();
     Object content = isKey ? record.key() : record.value();
     try {
-      newSFContent = new SnowflakeRecordContent(schema, content);
+      newSFContent = new SnowflakeRecordContent(schema, content, true);
     } catch (Exception e) {
       LOGGER.error("Native content parser error:\n{}", e.getMessage());
       try {
@@ -783,7 +783,7 @@ private void handleInsertRowsFailures(
     }
   }
 
-  // TODO: SNOW-529755 POLL committed offsets in backgraound thread
+  // TODO: SNOW-529755 POLL committed offsets in background thread
   /**
    * Get committed offset from Snowflake. It does an HTTP call internally to find out what was the
diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
index 108d7e3c4..965e1e321 100644
--- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
+++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
@@ -89,6 +89,8 @@ public class RecordService {
   public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT =
       ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSZ"));
+  public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT_STREAMING =
+      ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSXXX"));
   static final int MAX_SNOWFLAKE_NUMBER_PRECISION = 38;
 
   // This class is designed to work with empty metadata config map
@@ -384,7 +386,7 @@ void putKey(SinkRecord record, ObjectNode meta) {
   static JsonNode parseHeaders(Headers headers) {
     ObjectNode result = MAPPER.createObjectNode();
     for (Header header : headers) {
-      result.set(header.key(), convertToJson(header.schema(), header.value()));
+      result.set(header.key(), convertToJson(header.schema(), header.value(), false));
     }
     return result;
   }
@@ -395,15 +397,17 @@
    *
    * @param schema schema of the object
    * @param logicalValue object to be converted
+   * @param isStreaming indicates whether this is part of snowpipe streaming
    * @return a JsonNode of the object
    */
-  public static JsonNode convertToJson(Schema schema, Object logicalValue) {
+  public static JsonNode convertToJson(Schema schema, Object logicalValue, boolean isStreaming) {
     if (logicalValue == null) {
       if (schema == null)
         // Any schema is valid and we don't have a default, so treat this as an optional
        // schema
        return null;
-      if (schema.defaultValue() != null) return convertToJson(schema, schema.defaultValue());
+      if (schema.defaultValue() != null)
+        return convertToJson(schema, schema.defaultValue(), isStreaming);
       if (schema.isOptional()) return JsonNodeFactory.instance.nullNode();
       throw SnowflakeErrors.ERROR_5015.getException(
           "Conversion error: null value for field that is required and has no default value");
@@ -439,8 +443,9 @@ public static JsonNode convertToJson(Schema schema, Object logicalValue) {
             ISO_DATE_TIME_FORMAT.get().format((java.util.Date) value));
       }
       if (schema != null && Time.LOGICAL_NAME.equals(schema.name())) {
-        return JsonNodeFactory.instance.textNode(
-            TIME_FORMAT.get().format((java.util.Date) value));
+        ThreadLocal<SimpleDateFormat> format =
+            isStreaming ? TIME_FORMAT_STREAMING : TIME_FORMAT;
+        return JsonNodeFactory.instance.textNode(format.get().format((java.util.Date) value));
       }
       return JsonNodeFactory.instance.numberNode((Integer) value);
     case INT64:
@@ -497,7 +502,7 @@ else if (value instanceof ByteBuffer) {
         ArrayNode list = JsonNodeFactory.instance.arrayNode();
         for (Object elem : collection) {
           Schema valueSchema = schema == null ? null : schema.valueSchema();
-          JsonNode fieldValue = convertToJson(valueSchema, elem);
+          JsonNode fieldValue = convertToJson(valueSchema, elem, isStreaming);
           list.add(fieldValue);
         }
         return list;
@@ -527,8 +532,8 @@ else if (value instanceof ByteBuffer) {
         for (Map.Entry<?, ?> entry : map.entrySet()) {
           Schema keySchema = schema == null ? null : schema.keySchema();
           Schema valueSchema = schema == null ? null : schema.valueSchema();
-          JsonNode mapKey = convertToJson(keySchema, entry.getKey());
-          JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
+          JsonNode mapKey = convertToJson(keySchema, entry.getKey(), isStreaming);
+          JsonNode mapValue = convertToJson(valueSchema, entry.getValue(), isStreaming);
           if (objectMode) obj.set(mapKey.asText(), mapValue);
           else list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
         }
@@ -542,7 +547,7 @@ else if (value instanceof ByteBuffer) {
           throw SnowflakeErrors.ERROR_5015.getException("Mismatching schema.");
         ObjectNode obj = JsonNodeFactory.instance.objectNode();
         for (Field field : schema.fields()) {
-          obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
+          obj.set(field.name(), convertToJson(field.schema(), struct.get(field), isStreaming));
         }
         return obj;
       }
diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java
index 612626f6d..c88d7f3aa 100644
--- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java
+++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java
@@ -40,11 +40,12 @@ public SnowflakeRecordContent() {
    *
    * @param schema schema of the object
    * @param data object produced by native avro/json converters
+   * @param isStreaming indicates whether this is part of snowpipe streaming
    */
-  public SnowflakeRecordContent(Schema schema, Object data) {
+  public SnowflakeRecordContent(Schema schema, Object data, boolean isStreaming) {
     this.content = new JsonNode[1];
     this.schemaID = NON_AVRO_SCHEMA;
-    this.content[0] = RecordService.convertToJson(schema, data);
+    this.content[0] = RecordService.convertToJson(schema, data, isStreaming);
     this.isBroken = false;
     this.brokenData = null;
   }
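The only formatting difference behind the new `isStreaming` flag is the time-of-day zone suffix: `TIME_FORMAT` keeps the RFC 822 style `Z` pattern (`+0000`), while `TIME_FORMAT_STREAMING` uses the ISO 8601 `XXX` pattern (`Z` / `+00:00`), presumably because that is the form the streaming ingest path accepts for TIME columns. A minimal standalone sketch of the difference (not part of this patch; the class name and the sample value are invented):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimeFormatSketch {
  public static void main(String[] args) {
    SimpleDateFormat snowpipeFormat = new SimpleDateFormat("HH:mm:ss.SSSZ");    // existing TIME_FORMAT
    SimpleDateFormat streamingFormat = new SimpleDateFormat("HH:mm:ss.SSSXXX"); // new TIME_FORMAT_STREAMING
    snowpipeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    streamingFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

    // 10 ms after midnight UTC, roughly what the TIME_MILLIS field in the new e2e test carries.
    Date value = new Date(10L);
    System.out.println(snowpipeFormat.format(value));  // 00:00:00.010+0000
    System.out.println(streamingFormat.format(value)); // 00:00:00.010Z
  }
}
```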
diff --git a/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java b/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java
index aa03d82b1..e9407bfc8 100644
--- a/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java
@@ -291,7 +291,8 @@ public void testConnectJsonConverter_MapInt64() throws JsonProcessingException {
     jsonMap.put("test", Integer.MAX_VALUE);
     SchemaAndValue schemaAndValue =
         jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
-    JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
+    JsonNode result =
+        RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);
 
     ObjectNode expected = mapper.createObjectNode();
     expected.put("test", Integer.MAX_VALUE);
@@ -332,7 +333,7 @@ private boolean testSimpleDataFormat_jsonConverter_thread_safe() {
     SchemaAndValue schemaInputValue = jsonConverter.toConnectData("test", value.getBytes());
 
     JsonNode result =
-        RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value());
+        RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value(), false);
     System.out.println("Record Service result:" + result + " Thread :" + Thread.currentThread());
 
     String exptectedDateTimeFormatStr =
@@ -350,7 +351,8 @@ public void testConnectJsonConverter_MapBigDecimalExceedsMaxPrecision()
     jsonMap.put("test", new BigDecimal("999999999999999999999999999999999999999"));
     SchemaAndValue schemaAndValue =
         jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
-    JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
+    JsonNode result =
+        RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);
 
     ObjectNode expected = mapper.createObjectNode();
     expected.put("test", new BigDecimal("999999999999999999999999999999999999999"));
@@ -367,7 +369,8 @@ public void testConnectSimpleHeaderConverter_MapDateAndOtherTypes()
     SchemaAndValue schemaAndValue =
         headerConverter.toConnectHeader(
             "test", "h1", rawHeader.getBytes(StandardCharsets.US_ASCII));
-    JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
+    JsonNode result =
+        RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);
 
     ObjectNode expected = mapper.createObjectNode();
     long expectedTimestampValue =
diff --git a/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java b/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java
index 1ec49b614..be2fe7c74 100644
--- a/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java
@@ -3,7 +3,14 @@
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.record.TimestampType;
diff --git a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java
index 2ab7c1326..94ce51299 100644
--- a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java
@@ -104,7 +104,7 @@ public void test() throws IOException {
             .put("map", Collections.singletonMap("field", 1))
             .put("mapNonStringKeys", Collections.singletonMap(1, 1));
 
-    content = new SnowflakeRecordContent(schema, original);
+    content = new SnowflakeRecordContent(schema, original, false);
     assert content
         .getData()[0]
         .toString()
@@ -117,7 +117,7 @@
             "{\"int8\":12,\"int16\":12,\"int32\":12,\"int64\":12,\"float32\":12.2,\"float64\":12.2,\"boolean\":true,\"string\":\"foo\",\"bytes\":\"Zm9v\",\"array\":[\"a\",\"b\",\"c\"],\"map\":{\"field\":1},\"mapNonStringKeys\":[[1,1]]}");
     Map<String, Object> jsonMap =
         mapper.convertValue(jsonObject, new TypeReference<Map<String, Object>>() {});
-    content = new SnowflakeRecordContent(null, jsonMap);
+    content = new SnowflakeRecordContent(null, jsonMap, false);
     assert content
         .getData()[0]
         .toString()
@@ -206,24 +206,22 @@ public void testWrongKeyType() {
 
   @Test(expected = SnowflakeKafkaConnectorException.class)
   public void testConvertToJsonEmptyValue() {
-    assert RecordService.convertToJson(null, null) == null;
-
     Schema schema = SchemaBuilder.int32().optional().defaultValue(123).build();
-    assert RecordService.convertToJson(schema, null).toString().equals("123");
+    assert RecordService.convertToJson(schema, null, false).toString().equals("123");
 
     schema = SchemaBuilder.int32().build();
-    RecordService.convertToJson(schema, null);
+    RecordService.convertToJson(schema, null, false);
   }
 
   @Test(expected = SnowflakeKafkaConnectorException.class)
   public void testConvertToJsonNonOptional() {
     Schema schema = SchemaBuilder.int32().build();
-    RecordService.convertToJson(schema, null);
+    RecordService.convertToJson(schema, null, false);
   }
 
   @Test(expected = SnowflakeKafkaConnectorException.class)
   public void testConvertToJsonNoSchemaType() {
-    RecordService.convertToJson(null, new SnowflakeJsonSchema());
+    RecordService.convertToJson(null, new SnowflakeJsonSchema(), false);
   }
 
   @Test
@@ -233,7 +231,7 @@ public void testConvertToJsonReadOnlyByteBuffer() {
     String expected = "\"" + Base64.getEncoder().encodeToString(original.getBytes()) + "\"";
     ByteBuffer buffer = ByteBuffer.wrap(original.getBytes()).asReadOnlyBuffer();
     Schema schema = SchemaBuilder.bytes().build();
-    assert RecordService.convertToJson(schema, buffer).toString().equals(expected);
+    assert RecordService.convertToJson(schema, buffer, false).toString().equals(expected);
   }
 
   @Test
diff --git a/test/rest_request_template/travis_correct_schema_evolution_avro_sr_logical_types.json b/test/rest_request_template/travis_correct_schema_evolution_avro_sr_logical_types.json
new file mode 100644
index 000000000..27de6cf6e
--- /dev/null
+++ b/test/rest_request_template/travis_correct_schema_evolution_avro_sr_logical_types.json
@@ -0,0 +1,29 @@
+{
+  "name": "SNOWFLAKE_CONNECTOR_NAME",
+  "config": {
+    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+    "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1",
+    "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME",
+    "tasks.max": "1",
+    "buffer.flush.time": "10",
+    "buffer.count.records": "100",
+    "buffer.size.bytes": "5000000",
+    "snowflake.url.name": "SNOWFLAKE_HOST",
+    "snowflake.user.name": "SNOWFLAKE_USER",
+    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
+    "snowflake.database.name": "SNOWFLAKE_DATABASE",
+    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
+    "snowflake.role.name": "SNOWFLAKE_ROLE",
+    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"io.confluent.connect.avro.AvroConverter",
+    "value.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
+    "value.converter.schemas.enable": "false",
+    "jmx": "true",
+    "errors.tolerance": "all",
+    "errors.log.enable": true,
+    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
+    "errors.deadletterqueue.topic.replication.factor": 1,
+    "snowflake.enable.schematization": true
+  }
+}
\ No newline at end of file
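Caller-side, the signature change is mechanical: every call site now declares whether it sits on the Snowpipe Streaming path, as the test updates above show. A hedged sketch of the new entry points (not from this patch; the standalone class and the `START_TIME` field are invented for illustration):

```java
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;

public class IsStreamingCallerSketch {
  public static void main(String[] args) {
    // Hypothetical record value with a Connect Time logical field.
    Schema schema = SchemaBuilder.struct().field("START_TIME", Time.SCHEMA).build();
    Struct value = new Struct(schema).put("START_TIME", new java.util.Date(10L));

    // Snowpipe (file-based) callers such as SnowflakeSinkServiceV1 pass false and keep the old output.
    JsonNode snowpipeJson = RecordService.convertToJson(schema, value, false);
    // Snowpipe Streaming callers such as TopicPartitionChannel pass true and get the ISO 8601 time format.
    JsonNode streamingJson = RecordService.convertToJson(schema, value, true);
    System.out.println(snowpipeJson + " vs " + streamingJson);

    // The three-argument constructor simply forwards the flag to convertToJson.
    new SnowflakeRecordContent(schema, value, true);
  }
}
```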
diff --git a/test/test_suit/test_schema_evolution_avro_sr_logical_types.py b/test/test_suit/test_schema_evolution_avro_sr_logical_types.py
new file mode 100644
index 000000000..8c61677ca
--- /dev/null
+++ b/test/test_suit/test_schema_evolution_avro_sr_logical_types.py
@@ -0,0 +1,120 @@
+from decimal import Decimal
+
+from confluent_kafka import avro
+from test_suit.test_utils import NonRetryableError
+
+
+# test if the table is updated with the correct column
+# add test if all the records from different topics safely land in the table
+class TestSchemaEvolutionAvroSRLogicalTypes:
+    def __init__(self, driver, nameSalt):
+        self.driver = driver
+        self.fileName = "travis_correct_schema_evolution_avro_sr_logical_types"
+        self.topics = []
+        self.table = self.fileName + nameSalt
+        self.recordNum = 100
+
+        for i in range(2):
+            self.topics.append(self.table + str(i))
+
+        self.driver.snowflake_conn.cursor().execute(
+            "Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table))
+
+        self.driver.snowflake_conn.cursor().execute(
+            "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table))
+
+        self.records = []
+
+        self.records.append({
+            'PERFORMANCE_STRING': 'Excellent',
+            'TIME_MILLIS': 10,
+            'DATE': 11,
+            'TIMESTAMP_MILLIS': 12,
+            'DECIMAL': Decimal(4.0)
+        })
+
+        self.records.append({
+            'PERFORMANCE_STRING': 'Excellent',
+            'RATING_DOUBLE': 0.99,
+        })
+
+        self.ValueSchemaStr = []
+
+        self.ValueSchemaStr.append("""
+            {
+                "type":"record",
+                "name":"value_schema_0",
+                "fields":[
+                    {"name":"PERFORMANCE_STRING","type":"string"},
+                    {"name":"TIME_MILLIS","type":{"type":"int","logicalType":"time-millis"}},
+                    {"name":"DATE","type":{"type":"int","logicalType":"date"}},
+                    {"name":"DECIMAL","type":{"type":"bytes","logicalType":"decimal", "precision":4, "scale":2}},
+                    {"name":"TIMESTAMP_MILLIS","type":{"type":"long","logicalType":"timestamp-millis"}}
+                ]
+            }
+        """)
+
+        self.ValueSchemaStr.append("""
+            {
+                "type":"record",
+                "name":"value_schema_1",
+                "fields":[
+                    {"name":"RATING_DOUBLE","type":"float"},
+                    {"name":"PERFORMANCE_STRING","type":"string"}
+                ]
+            }
+        """)
+
+        self.gold_type = {
+            'PERFORMANCE_STRING': 'VARCHAR',
+            'RATING_DOUBLE': 'FLOAT',
+            'TIME_MILLIS': 'TIME(6)',
+            'DATE': 'DATE',
+            'TIMESTAMP_MILLIS': 'TIMESTAMP_NTZ(6)',
+            'DECIMAL': 'VARCHAR',
+            'RECORD_METADATA': 'VARIANT'
+        }
+
+        self.gold_columns = [columnName for columnName in self.gold_type]
+
+        self.valueSchema = []
+
+        for valueSchemaStr in self.ValueSchemaStr:
+            self.valueSchema.append(avro.loads(str(valueSchemaStr)))
+
+    def getConfigFileName(self):
+        return self.fileName + ".json"
+
+    def send(self):
+        for i, topic in enumerate(self.topics):
+            value = []
+            for _ in range(self.recordNum):
+                value.append(self.records[i])
+            self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0)
+
+    def verify(self, round):
+        rows = self.driver.snowflake_conn.cursor().execute(
+            "desc table {}".format(self.table)).fetchall()
+        res_col = {}
+
+        for index, row in enumerate(rows):
+            self.gold_columns.remove(row[0])
+            if not row[1].startswith(self.gold_type[row[0]]):
+                raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1],
+                                                                                                     self.gold_type[
+                                                                                                         row[0]]))
+            res_col[row[0]] = index
+
+        print("Columns not in table: ", self.gold_columns)
+
+        for columnName in self.gold_columns:
+            raise NonRetryableError("Column {} was not created".format(columnName))
+
+        res = self.driver.snowflake_conn.cursor().execute(
+            "SELECT count(*) FROM {}".format(self.table)).fetchone()[0]
+        if res != len(self.topics) * self.recordNum:
+            print("Number of record expected: {}, got: {}".format(len(self.topics) * self.recordNum, res))
+            raise NonRetryableError("Number of record in table is different from number of record sent")
+
+    def clean(self):
+        self.driver.cleanTableStagePipe(self.table)
diff --git a/test/test_suites.py b/test/test_suites.py
index 001fc0a89..a88e1f3e8 100644
--- a/test/test_suites.py
+++ b/test/test_suites.py
@@ -25,6 +25,7 @@
 from test_suit.test_native_string_json_without_schema import TestNativeStringJsonWithoutSchema
 from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
 from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR
+from test_suit.test_schema_evolution_avro_sr_logical_types import TestSchemaEvolutionAvroSRLogicalTypes
 from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
 from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
 from test_suit.test_schema_evolution_json_ignore_tombstone import TestSchemaEvolutionJsonIgnoreTombstone
@@ -39,8 +40,9 @@
 from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ
 from test_suit.test_snowpipe_streaming_string_avro_sr import TestSnowpipeStreamingStringAvroSR
 from test_suit.test_snowpipe_streaming_string_json import TestSnowpipeStreamingStringJson
-from test_suit.test_snowpipe_streaming_string_json_ignore_tombstone import TestSnowpipeStreamingStringJsonIgnoreTombstone
 from test_suit.test_snowpipe_streaming_string_json_dlq import TestSnowpipeStreamingStringJsonDLQ
+from test_suit.test_snowpipe_streaming_string_json_ignore_tombstone import \
+    TestSnowpipeStreamingStringJsonIgnoreTombstone
 from test_suit.test_string_avro import TestStringAvro
 from test_suit.test_string_avrosr import TestStringAvrosr
 from test_suit.test_string_json import TestStringJson
@@ -91,7 +93,8 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
             test_instance=TestStringJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
         )),
         ("TestStringJsonIgnoreTombstone", EndToEndTestSuite(
-            test_instance=TestStringJsonIgnoreTombstone(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+            test_instance=TestStringJsonIgnoreTombstone(driver, nameSalt), clean=True, run_in_confluent=True,
+            run_in_apache=True
         )),
         ("TestJsonJson", EndToEndTestSuite(
             test_instance=TestJsonJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
@@ -132,7 +135,8 @@
             run_in_apache=True
         )),
         ("TestSnowpipeStreamingStringJsonIgnoreTombstone", EndToEndTestSuite(
-            test_instance=TestSnowpipeStreamingStringJsonIgnoreTombstone(driver, nameSalt), clean=True, run_in_confluent=True,
+            test_instance=TestSnowpipeStreamingStringJsonIgnoreTombstone(driver, nameSalt), clean=True,
+            run_in_confluent=True,
             run_in_apache=True
         )),
         ("TestSnowpipeStreamingStringJsonDLQ", EndToEndTestSuite(
@@ -178,6 +182,11 @@
             test_instance=TestSchemaEvolutionAvroSR(driver, nameSalt), clean=True, run_in_confluent=True,
             run_in_apache=False
        )),
+        # SNOW-947731: Re-enable after avro-python3 package is updated in merge gate
+        ("TestSchemaEvolutionAvroSRLogicalTypes", EndToEndTestSuite(
+            test_instance=TestSchemaEvolutionAvroSRLogicalTypes(driver, nameSalt), clean=True, run_in_confluent=False,
+            run_in_apache=False
+        )),
         ("TestSchemaEvolutionWithAutoTableCreationJson", EndToEndTestSuite(
             test_instance=TestSchemaEvolutionWithAutoTableCreationJson(driver, nameSalt), clean=True,
             run_in_confluent=True, run_in_apache=True