diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
index ab2f26f5e..896cee995 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
@@ -537,38 +537,24 @@ private void appendIcebergColumnsQuery(String tableName, List<IcebergColumnTree> columnsToAdd) {
     if (columnsToAdd.isEmpty()) {
       return;
     }
-    StringBuilder appendColumnQuery = new StringBuilder("alter iceberg ");
-    appendColumnQuery.append("table identifier(?) add column if not exists ");
-    boolean first = true;
-    StringBuilder logColumn = new StringBuilder("[");
+    StringBuilder addColumnQuery = new StringBuilder("alter iceberg ");
+    addColumnQuery.append("table identifier(?) add column ");
     for (IcebergColumnTree column : columnsToAdd) {
-      if (first) {
-        first = false;
-      } else {
-        appendColumnQuery.append(", if not exists ");
-        logColumn.append(",");
-      }
+      addColumnQuery.append("if not exists ");
+
       String columnName = column.getColumnName();
       String dataType = column.buildType();
-      appendColumnQuery.append(" ").append(columnName).append(" ").append(dataType);
-      // todo handle comments .append(columnInfos.getDdlComments());
-      logColumn.append(columnName).append(" (").append(column.buildType()).append(")");
+      addColumnQuery.append(" ").append(columnName).append(" ").append(dataType).append(", ");
     }
+    // remove last comma and whitespace
+    addColumnQuery.deleteCharAt(addColumnQuery.length() - 1);
+    addColumnQuery.deleteCharAt(addColumnQuery.length() - 1);
 
-    try {
-      LOGGER.info("Trying to run query: {}", appendColumnQuery.toString());
-      PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString());
-      stmt.setString(1, tableName);
-      stmt.execute();
-      stmt.close();
-    } catch (SQLException e) {
-      throw SnowflakeErrors.ERROR_2015.getException(e);
-    }
+    executeStatement(tableName, addColumnQuery.toString());
 
-    logColumn.insert(0, "Following columns created for table {}:\n").append("]");
-    LOGGER.info(logColumn.toString(), tableName);
+    LOGGER.info("Query SUCCEEDED: {}", addColumnQuery);
   }
 
   private void modifyIcebergColumnsQuery(
@@ -576,21 +562,27 @@ private void modifyIcebergColumnsQuery(
     if (columnsToModify.isEmpty()) {
       return;
     }
-    StringBuilder appendColumnQuery = new StringBuilder("alter iceberg ");
-    appendColumnQuery.append("table identifier(?) alter column ");
+    StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg ");
+    setDataTypeQuery.append("table identifier(?) alter column ");
     for (IcebergColumnTree column : columnsToModify) {
       String columnName = column.getColumnName();
      String dataType = column.buildType();
-      appendColumnQuery.append(columnName).append(" set data type ").append(dataType).append(", ");
+      setDataTypeQuery.append(columnName).append(" set data type ").append(dataType).append(", ");
     }
-    // remove last comma
-    appendColumnQuery.deleteCharAt(appendColumnQuery.length() - 1);
-    appendColumnQuery.deleteCharAt(appendColumnQuery.length() - 1);
+    // remove last comma and whitespace
+    setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);
+    setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);
 
+    executeStatement(tableName, setDataTypeQuery.toString());
+
+    LOGGER.info("Query SUCCEEDED: {}", setDataTypeQuery);
+  }
+
+  private void executeStatement(String tableName, String query) {
     try {
-      LOGGER.info("Trying to run query: {}", appendColumnQuery.toString());
-      PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString());
+      LOGGER.info("Trying to run query: {}", query);
+      PreparedStatement stmt = conn.prepareStatement(query);
       stmt.setString(1, tableName);
       stmt.execute();
       stmt.close();
arguments("\"time\"", "TIME(6)"), + arguments("\"timestamptz\"", "TIMESTAMP_LTZ"), + arguments("\"timestamp\"", "TIMESTAMP"), + arguments("\"string\"", "VARCHAR(16777216)"), + arguments("\"uuid\"", "BINARY(16)"), + arguments("\"binary\"", "BINARY"), + arguments("\"decimal(10,5)\"", "DECIMAL(10, 5)"), // simple struct arguments( "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}", - "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))"), + "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))"), // list arguments( "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}", - "TEST_COLUMN_NAME ARRAY(NUMBER(19,0))"), + "ARRAY(NUMBER(19,0))"), // map arguments( "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}", - "TEST_COLUMN_NAME MAP(NUMBER(10,0), VARCHAR(16777216))"), + "MAP(NUMBER(10,0), VARCHAR(16777216))"), // structs with nested objects arguments( - "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},{\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[{\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"},{\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}]}}]}", - "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object" + "{\"type\":\"struct\",\"fields\":[" + + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}," + + " {\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[" + + " {\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"}," + + " {\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}" + + "]}}]}", + "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object" + " OBJECT(nested_key1 VARCHAR(16777216), nested_key2 VARCHAR(16777216)))"), arguments( "{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n", - "TEST_COLUMN_NAME OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition" + "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition" + " NUMBER(10,0), key VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id" + " NUMBER(10,0), CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0)," + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216)," @@ -78,8 +84,8 @@ static Stream icebergSchemas() { } @ParameterizedTest - @MethodSource("recordNodes") - void parseFromJsonRecordSchema(String 
jsonString, String expectedQuery) { + @MethodSource("parseFromJsonArguments") + void parseFromJsonRecordSchema(String jsonString, String expectedType) { // given SinkRecord record = createKafkaRecord(jsonString, false); JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true); @@ -88,32 +94,41 @@ void parseFromJsonRecordSchema(String jsonString, String expectedQuery) { // when IcebergColumnTree tree = new IcebergColumnTree(columnValuePair); // then - Assertions.assertEquals(expectedQuery, tree.buildType()); + Assertions.assertEquals(expectedType, tree.buildType()); + Assertions.assertEquals("TESTCOLUMNNAME", tree.getColumnName()); } - static Stream recordNodes() { + static Stream parseFromJsonArguments() { return Stream.of( - arguments("{\"test_number\" : 1 }", "test_number LONG"), + arguments("{\"testColumnName\" : 1 }", "LONG"), arguments( - "{ \"testStruct\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}", - "testStruct OBJECT(k1 LONG, k2 LONG)"), + "{ \"testColumnName\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}", + "OBJECT(k1 LONG, k2 LONG)"), arguments( - "{ \"testStruct\": {" + "{ \"testColumnName\": {" + "\"k1\" : { \"nested_key1\" : 1}," + "\"k2\" : { \"nested_key2\" : 2}" + "}}", - "testStruct OBJECT(k1 OBJECT(nested_key1 LONG), k2 OBJECT(nested_key2 LONG))"), + "OBJECT(k1 OBJECT(nested_key1 LONG), k2 OBJECT(nested_key2 LONG))"), arguments( - "{ \"vehiclesTestStruct\": {" + "{ \"testColumnName\": {" + "\"vehicle1\" : { \"car\" : { \"brand\" : \"vw\" } }," + "\"vehicle2\" : { \"car\" : { \"brand\" : \"toyota\" } }" + "}}", - "vehiclesTestStruct OBJECT(vehicle1 OBJECT(car OBJECT(brand VARCHAR)), vehicle2" + "OBJECT(vehicle1 OBJECT(car OBJECT(brand VARCHAR)), " + + "vehicle2 OBJECT(car OBJECT(brand VARCHAR)))"), + arguments( + "{ \"testColumnName\": {" + + "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," + + "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }" + + "}}", + "OBJECT(k1 OBJECT(car OBJECT(brand VARCHAR)), k2" + " OBJECT(car" + " OBJECT(brand" - + " VARCHAR)))"), // todo lol przy k1, k2 normalna kolejnosc, a przy nazwach - // vehicle1 i vehicle 2 juz inna - arguments("{\"test_array\": [1,2,3] }", "not ready")); + + " VARCHAR)))")); + // <- todo lol with k1, k2 the order is natural, however it changes an order when I used + // vehicle1, vehicle2 + // arguments("{\"test_array\": [1,2,3] }", "Array not yet implemented")); } @ParameterizedTest @@ -128,11 +143,16 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expe JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true); IcebergColumnJsonValuePair columnValuePair = IcebergColumnJsonValuePair.from(recordNode.fields().next()); - + // parse trees IcebergColumnTree alreadyExistingTree = new IcebergColumnTree(apacheSchema); IcebergColumnTree modifiedTree = new IcebergColumnTree(columnValuePair); // then - Assertions.assertEquals(expectedResult, alreadyExistingTree.merge(modifiedTree).buildType()); + // merge modified tree + alreadyExistingTree.merge(modifiedTree); + + String expected = expectedResult.replaceAll("/ +/g", " "); + Assertions.assertEquals(expected, alreadyExistingTree.buildType()); + Assertions.assertEquals("TESTSTRUCT", alreadyExistingTree.getColumnName()); } static Stream mergeTestArguments() { @@ -140,7 +160,27 @@ static Stream mergeTestArguments() { arguments( 
"{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}", "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : 3 } }", - "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), k3 LONG)")); + "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), k3 LONG)"), + arguments( + "{\"type\":\"struct\",\"fields\":[" + + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}," + + " {\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[" + + " {\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"}," + + " {\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}" + + "]}}]}", + "{\"testStruct\" : {" + + " \"k1\" : 1, " + + " \"k2\" : 2, " + + " \"nested_object\": { " + + " \"nested_key1\" : \"string\", " + + " \"nested_key2\" : \"blah\", " + + " \"nested_object2\" : { " + + " \"nested_key2\" : 23.5 " + + " }}" + + "}}", + "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object OBJECT(nested_key1" + + " VARCHAR(16777216), nested_key2 VARCHAR(16777216), nested_object2" + + " OBJECT(nested_key2 DOUBLE)))")); } protected SinkRecord createKafkaRecord(String jsonString, boolean withSchema) { diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index b3776a4cb..b3a07cd8f 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -118,73 +118,34 @@ public void alterAlreadyExistingStructure() throws Exception { waitForOffset(1); // insert the structure but with additional field k3 - String testStruct2 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : 3 } }"; + String testStruct2 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : \"foo\" } }"; service.insert(Collections.singletonList(createKafkaRecord(testStruct2, 1, false))); service.insert(Collections.singletonList(createKafkaRecord(testStruct2, 1, false))); waitForOffset(2); - List rows = describeTable(tableName); - assertEquals(rows.size(), 2); - } - - @ParameterizedTest(name = "{0}") - @MethodSource("prepareData") - // @Disabled - void shouldEvolveSchemaAndInsertRecords_structuredData2( - String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema) - throws Exception { - // start off with just one column - List rows = describeTable(tableName); - assertThat(rows) - .hasSize(1) - .extracting(DescribeTableRow::getColumn) - .contains(Utils.TABLE_COLUMN_METADATA); - - SinkRecord record = createKafkaRecord(message, 0, withSchema); - service.insert(Collections.singletonList(record)); - waitForOffset(-1); - rows = describeTable(tableName); - assertThat(rows.size()).isEqualTo(9); - - // don't check metadata column schema, we have different tests for that - // rows = - // rows.stream() - // .filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA)) - // .collect(Collectors.toList()); - // - // assertThat(rows).containsExactlyInAnyOrder(expectedSchema); - - // resend and store same record without any issues now - // service.insert(Collections.singletonList(record)); - // waitForOffset(1); - // - // // and another record with same schema - // 
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
index b3776a4cb..b3a07cd8f 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
@@ -118,73 +118,34 @@ public void alterAlreadyExistingStructure() throws Exception {
     waitForOffset(1);
 
     // insert the structure but with additional field k3
-    String testStruct2 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : 3 } }";
+    String testStruct2 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : \"foo\" } }";
     service.insert(Collections.singletonList(createKafkaRecord(testStruct2, 1, false)));
     service.insert(Collections.singletonList(createKafkaRecord(testStruct2, 1, false)));
     waitForOffset(2);
 
-    List<DescribeTableRow> rows = describeTable(tableName);
-    assertEquals(rows.size(), 2);
-  }
-
-  @ParameterizedTest(name = "{0}")
-  @MethodSource("prepareData")
-  // @Disabled
-  void shouldEvolveSchemaAndInsertRecords_structuredData2(
-      String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
-      throws Exception {
-    // start off with just one column
-    List<DescribeTableRow> rows = describeTable(tableName);
-    assertThat(rows)
-        .hasSize(1)
-        .extracting(DescribeTableRow::getColumn)
-        .contains(Utils.TABLE_COLUMN_METADATA);
-
-    SinkRecord record = createKafkaRecord(message, 0, withSchema);
-    service.insert(Collections.singletonList(record));
-    waitForOffset(-1);
-    rows = describeTable(tableName);
-    assertThat(rows.size()).isEqualTo(9);
-
-    // don't check metadata column schema, we have different tests for that
-    //    rows =
-    //        rows.stream()
-    //            .filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
-    //            .collect(Collectors.toList());
-    //
-    //    assertThat(rows).containsExactlyInAnyOrder(expectedSchema);
-
-    // resend and store same record without any issues now
-    //    service.insert(Collections.singletonList(record));
-    //    waitForOffset(1);
-    //
-    //    // and another record with same schema
-    //    service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
-    //    waitForOffset(2);
-
-    String testStruct = "{ \"testStruct\": { \"k1\" : \"fdf1\" }}";
-
-    //    String testStruct =
-    //        "{ \"testStruct\": {" +
-    //            "\"k1\" : { \"nested_key1\" : 1}," +
-    //            "\"k2\" : { \"nested_key2\" : 2}" +
-    //            "} " +
-    //        "}";
-
-    //    String testStruct =
-    //        "{ \"testStruct\": {" +
-    //            "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," +
-    //            "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }" +
-    //            "} " +
-    //        "}";
-
-    // reinsert record with extra field
-    service.insert(Collections.singletonList(createKafkaRecord(testStruct, 1, false)));
-
-    service.insert(Collections.singletonList(createKafkaRecord(testStruct, 1, false)));
-    waitForOffset(2);
-
-    String alteredStruct = "{ \"testStruct\": { \"k1\" : \"fdf1\", \"k3\" : \"dfdf2\"} }";
-    service.insert(Collections.singletonList(createKafkaRecord(alteredStruct, 2, false)));
+    // k1, k2, k3, k4
+    String testStruct3 =
+        "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : \"bar\", \"k4\" : 4.5 } }";
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct3, 2, false)));
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct3, 2, false)));
     waitForOffset(3);
+
+    List<DescribeTableRow> columns = describeTable(tableName);
+    assertEquals(
+        "OBJECT(k1 NUMBER(19,0), k2 NUMBER(19,0), k3 VARCHAR(16777216), k4 FLOAT)",
+        columns.get(1).getType());
+
+    // struct without k1 - verify that the schema was not evolved back
+    String testStruct4 = "{ \"testStruct\": { \"k2\" : 2, \"k3\" : 3, \"k4\" : 4.34 } }";
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct4, 3, false)));
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct4, 3, false)));
+    waitForOffset(4);
+
+    columns = describeTable(tableName);
+    assertEquals(
+        "OBJECT(k1 NUMBER(19,0), k2 NUMBER(19,0), k3 VARCHAR(16777216), k4 FLOAT)",
+        columns.get(1).getType());
+    assertEquals(2, columns.size());
   }
 
   private void assertRecordsInTable() {
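The closing assertions pin down the invariant this integration test exists for: evolution is additive, so a record that omits k1 (or re-sends already known fields) must not shrink or reorder the evolved OBJECT type. A toy model of that rule, assuming additive merge semantics rather than quoting the connector's implementation:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AdditiveEvolutionSketch {
      public static void main(String[] args) {
        // Column type after testStruct3 evolved the table (mirrors the test above).
        Map<String, String> columnType = new LinkedHashMap<>();
        columnType.put("k1", "NUMBER(19,0)");
        columnType.put("k2", "NUMBER(19,0)");
        columnType.put("k3", "VARCHAR(16777216)");
        columnType.put("k4", "FLOAT");

        // testStruct4 omits k1 and introduces no new fields.
        Map<String, String> incoming =
            Map.of("k2", "NUMBER(19,0)", "k3", "VARCHAR(16777216)", "k4", "FLOAT");

        // Additive merge: unknown fields are added, existing ones are never dropped.
        incoming.forEach(columnType::putIfAbsent);

        System.out.println(columnType);
        // prints: {k1=NUMBER(19,0), k2=NUMBER(19,0), k3=VARCHAR(16777216), k4=FLOAT}
      }
    }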