Commit a1adeb6

refactor methods for generating queries

sfc-gh-bzabek committed Nov 21, 2024
1 parent 6f9f2fa

Showing 3 changed files with 125 additions and 132 deletions.
@@ -537,60 +537,52 @@ private void appendIcebergColumnsQuery(String tableName, List<IcebergColumnTree>
     if (columnsToAdd.isEmpty()) {
       return;
     }
-    StringBuilder appendColumnQuery = new StringBuilder("alter iceberg ");
-    appendColumnQuery.append("table identifier(?) add column if not exists ");
-    boolean first = true;
-    StringBuilder logColumn = new StringBuilder("[");
+    StringBuilder addColumnQuery = new StringBuilder("alter iceberg ");
+    addColumnQuery.append("table identifier(?) add column ");

     for (IcebergColumnTree column : columnsToAdd) {
-      if (first) {
-        first = false;
-      } else {
-        appendColumnQuery.append(", if not exists ");
-        logColumn.append(",");
-      }
+      addColumnQuery.append("if not exists ");

       String columnName = column.getColumnName();
       String dataType = column.buildType();

-      appendColumnQuery.append(" ").append(columnName).append(" ").append(dataType);
-      // todo handle comments .append(columnInfos.getDdlComments());
-      logColumn.append(columnName).append(" (").append(column.buildType()).append(")");
+      addColumnQuery.append(" ").append(columnName).append(" ").append(dataType).append(", ");
     }
+    // remove last comma and whitespace
+    addColumnQuery.deleteCharAt(addColumnQuery.length() - 1);
+    addColumnQuery.deleteCharAt(addColumnQuery.length() - 1);

-    try {
-      LOGGER.info("Trying to run query: {}", appendColumnQuery.toString());
-      PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString());
-      stmt.setString(1, tableName);
-      stmt.execute();
-      stmt.close();
-    } catch (SQLException e) {
-      throw SnowflakeErrors.ERROR_2015.getException(e);
-    }
+    executeStatement(tableName, addColumnQuery.toString());

-    logColumn.insert(0, "Following columns created for table {}:\n").append("]");
-    LOGGER.info(logColumn.toString(), tableName);
+    LOGGER.info("Query SUCCEEDED: " + addColumnQuery);
   }
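For illustration, the refactored method batches all new columns into a single statement instead of appending separator logic per iteration. With two pending columns, say a hypothetical COL_A of type NUMBER(10,0) and COL_B of type VARCHAR(16777216), addColumnQuery would come out roughly as:

    alter iceberg table identifier(?) add column if not exists COL_A NUMBER(10,0), if not exists COL_B VARCHAR(16777216)

with the table name bound to the identifier(?) placeholder when the statement is prepared.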

   private void modifyIcebergColumnsQuery(
       String tableName, List<IcebergColumnTree> columnsToModify) {
     if (columnsToModify.isEmpty()) {
       return;
     }
-    StringBuilder appendColumnQuery = new StringBuilder("alter iceberg ");
-    appendColumnQuery.append("table identifier(?) alter column ");
+    StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg ");
+    setDataTypeQuery.append("table identifier(?) alter column ");
     for (IcebergColumnTree column : columnsToModify) {
       String columnName = column.getColumnName();
       String dataType = column.buildType();

-      appendColumnQuery.append(columnName).append(" set data type ").append(dataType).append(", ");
+      setDataTypeQuery.append(columnName).append(" set data type ").append(dataType).append(", ");
     }
-    // remove last comma
-    appendColumnQuery.deleteCharAt(appendColumnQuery.length() - 1);
-    appendColumnQuery.deleteCharAt(appendColumnQuery.length() - 1);
+    // remove last comma and whitespace
+    setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);
+    setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);
+
+    executeStatement(tableName, setDataTypeQuery.toString());
+
+    LOGGER.info("Query SUCCEEDED: " + setDataTypeQuery);
   }
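By the same logic, modifyIcebergColumnsQuery batches every type change into one statement; with the same hypothetical columns it would build roughly:

    alter iceberg table identifier(?) alter column COL_A set data type VARCHAR(16777216), COL_B set data type NUMBER(19,0)

Both methods now delegate execution to the shared executeStatement helper below.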

+  private void executeStatement(String tableName, String query) {
     try {
-      LOGGER.info("Trying to run query: {}", appendColumnQuery.toString());
-      PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString());
+      LOGGER.info("Trying to run query: {}", query);
+      PreparedStatement stmt = conn.prepareStatement(query);
       stmt.setString(1, tableName);
       stmt.execute();
       stmt.close();
@@ -24,62 +24,68 @@ public class ParseIcebergColumnTreeTest {

   @ParameterizedTest
   @MethodSource("icebergSchemas")
-  void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedQuery) {
+  void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedType) {
     // given
     Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema);
     // when
     ApacheIcebergColumnSchema apacheSchema =
         new ApacheIcebergColumnSchema(type, "TEST_COLUMN_NAME");
     IcebergColumnTree tree = new IcebergColumnTree(apacheSchema);
     // then
-    Assertions.assertEquals(expectedQuery, tree.buildType());
+    Assertions.assertEquals(expectedType, tree.buildType());
+    Assertions.assertEquals("TEST_COLUMN_NAME", tree.getColumnName());
   }

   static Stream<Arguments> icebergSchemas() {
     return Stream.of(
         // primitives
-        arguments("\"boolean\"", "TEST_COLUMN_NAME BOOLEAN"),
-        arguments("\"int\"", "TEST_COLUMN_NAME NUMBER(10,0)"),
-        arguments("\"long\"", "TEST_COLUMN_NAME NUMBER(19,0)"),
-        arguments("\"float\"", "TEST_COLUMN_NAME FLOAT"),
-        arguments("\"double\"", "TEST_COLUMN_NAME FLOAT"),
-        arguments("\"date\"", "TEST_COLUMN_NAME DATE"),
-        arguments("\"time\"", "TEST_COLUMN_NAME TIME(6)"),
-        arguments("\"timestamptz\"", "TEST_COLUMN_NAME TIMESTAMP_LTZ"),
-        arguments("\"timestamp\"", "TEST_COLUMN_NAME TIMESTAMP"),
-        arguments("\"string\"", "TEST_COLUMN_NAME VARCHAR(16777216)"),
-        arguments("\"uuid\"", "TEST_COLUMN_NAME BINARY(16)"),
-        arguments("\"binary\"", "TEST_COLUMN_NAME BINARY"),
-        arguments("\"decimal(10,5)\"", "TEST_COLUMN_NAME DECIMAL(10, 5)"),
+        arguments("\"boolean\"", "BOOLEAN"),
+        arguments("\"int\"", "NUMBER(10,0)"),
+        arguments("\"long\"", "NUMBER(19,0)"),
+        arguments("\"float\"", "FLOAT"),
+        arguments("\"double\"", "FLOAT"),
+        arguments("\"date\"", "DATE"),
+        arguments("\"time\"", "TIME(6)"),
+        arguments("\"timestamptz\"", "TIMESTAMP_LTZ"),
+        arguments("\"timestamp\"", "TIMESTAMP"),
+        arguments("\"string\"", "VARCHAR(16777216)"),
+        arguments("\"uuid\"", "BINARY(16)"),
+        arguments("\"binary\"", "BINARY"),
+        arguments("\"decimal(10,5)\"", "DECIMAL(10, 5)"),
         // simple struct
         arguments(
             "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}",
-            "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))"),
+            "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))"),
         // list
         arguments(
             "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}",
-            "TEST_COLUMN_NAME ARRAY(NUMBER(19,0))"),
+            "ARRAY(NUMBER(19,0))"),
         // map
         arguments(
             "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}",
-            "TEST_COLUMN_NAME MAP(NUMBER(10,0), VARCHAR(16777216))"),
+            "MAP(NUMBER(10,0), VARCHAR(16777216))"),
         // structs with nested objects
         arguments(
-            "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},{\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[{\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"},{\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}]}}]}",
-            "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object"
+            "{\"type\":\"struct\",\"fields\":["
+                + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},"
+                + " {\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":["
+                + " {\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"},"
+                + " {\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}"
+                + "]}}]}",
+            "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object"
                 + " OBJECT(nested_key1 VARCHAR(16777216), nested_key2 VARCHAR(16777216)))"),
         arguments(
             "{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n",
-            "TEST_COLUMN_NAME OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition"
+            "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition"
                 + " NUMBER(10,0), key VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id"
                 + " NUMBER(10,0), CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0),"
                 + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216),"
                 + " VARCHAR(16777216)))"));
   }
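A minimal sketch of the flow these cases exercise, mirroring the test body above (the PRICE column name is made up for the example):

    Type type = IcebergDataTypeParser.deserializeIcebergType("\"decimal(10,5)\"");
    ApacheIcebergColumnSchema schema = new ApacheIcebergColumnSchema(type, "PRICE");
    IcebergColumnTree tree = new IcebergColumnTree(schema);
    tree.buildType();      // "DECIMAL(10, 5)"
    tree.getColumnName();  // "PRICE"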

   @ParameterizedTest
-  @MethodSource("recordNodes")
-  void parseFromJsonRecordSchema(String jsonString, String expectedQuery) {
+  @MethodSource("parseFromJsonArguments")
+  void parseFromJsonRecordSchema(String jsonString, String expectedType) {
     // given
     SinkRecord record = createKafkaRecord(jsonString, false);
     JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
@@ -88,32 +94,41 @@ void parseFromJsonRecordSchema(String jsonString, String expectedQuery) {
     // when
     IcebergColumnTree tree = new IcebergColumnTree(columnValuePair);
     // then
-    Assertions.assertEquals(expectedQuery, tree.buildType());
+    Assertions.assertEquals(expectedType, tree.buildType());
+    Assertions.assertEquals("TESTCOLUMNNAME", tree.getColumnName());
   }

-  static Stream<Arguments> recordNodes() {
+  static Stream<Arguments> parseFromJsonArguments() {
     return Stream.of(
-        arguments("{\"test_number\" : 1 }", "test_number LONG"),
+        arguments("{\"testColumnName\" : 1 }", "LONG"),
         arguments(
-            "{ \"testStruct\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}",
-            "testStruct OBJECT(k1 LONG, k2 LONG)"),
+            "{ \"testColumnName\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}",
+            "OBJECT(k1 LONG, k2 LONG)"),
         arguments(
-            "{ \"testStruct\": {"
+            "{ \"testColumnName\": {"
                 + "\"k1\" : { \"nested_key1\" : 1},"
                 + "\"k2\" : { \"nested_key2\" : 2}"
                 + "}}",
-            "testStruct OBJECT(k1 OBJECT(nested_key1 LONG), k2 OBJECT(nested_key2 LONG))"),
+            "OBJECT(k1 OBJECT(nested_key1 LONG), k2 OBJECT(nested_key2 LONG))"),
         arguments(
-            "{ \"vehiclesTestStruct\": {"
+            "{ \"testColumnName\": {"
                 + "\"vehicle1\" : { \"car\" : { \"brand\" : \"vw\" } },"
                 + "\"vehicle2\" : { \"car\" : { \"brand\" : \"toyota\" } }"
                 + "}}",
-            "vehiclesTestStruct OBJECT(vehicle1 OBJECT(car OBJECT(brand VARCHAR)), vehicle2"
-                + " OBJECT(car OBJECT(brand VARCHAR)))"),
-        arguments("{\"test_array\": [1,2,3] }", "not ready"));
+            "OBJECT(vehicle1 OBJECT(car OBJECT(brand VARCHAR)), "
+                + "vehicle2 OBJECT(car OBJECT(brand VARCHAR)))"),
+        arguments(
+            "{ \"testColumnName\": {"
+                + "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } },"
+                + "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }"
+                + "}}",
+            "OBJECT(k1 OBJECT(car OBJECT(brand VARCHAR)), k2"
+                + " OBJECT(car"
+                + " OBJECT(brand"
+                + " VARCHAR)))"));
+    // todo: with k1 and k2 the field order stays natural, but it changes when the
+    // names vehicle1 and vehicle2 are used
+    // arguments("{\"test_array\": [1,2,3] }", "Array not yet implemented"));
   }
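Read as a quick reference, the scalar inference these cases exercise appears to be the following (a summary of the test data above and the merge test below, not an exhaustive list):

    // JSON value -> inferred Snowflake type
    // 1          -> LONG
    // "vw"       -> VARCHAR
    // 23.5       -> DOUBLE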

   @ParameterizedTest
@@ -128,19 +143,44 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expectedResult) {
     JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
     IcebergColumnJsonValuePair columnValuePair =
         IcebergColumnJsonValuePair.from(recordNode.fields().next());

+    // parse trees
     IcebergColumnTree alreadyExistingTree = new IcebergColumnTree(apacheSchema);
     IcebergColumnTree modifiedTree = new IcebergColumnTree(columnValuePair);
-    // then
-    Assertions.assertEquals(expectedResult, alreadyExistingTree.merge(modifiedTree).buildType());
+    // merge modified tree
+    alreadyExistingTree.merge(modifiedTree);
+
+    // collapse repeated whitespace before comparing
+    String expected = expectedResult.replaceAll(" +", " ");
+    Assertions.assertEquals(expected, alreadyExistingTree.buildType());
+    Assertions.assertEquals("TESTSTRUCT", alreadyExistingTree.getColumnName());
   }

   static Stream<Arguments> mergeTestArguments() {
     return Stream.of(
         arguments(
             "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}",
             "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : 3 } }",
-            "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), k3 LONG)"));
+            "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), k3 LONG)"),
+        arguments(
+            "{\"type\":\"struct\",\"fields\":["
+                + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},"
+                + " {\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":["
+                + " {\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"},"
+                + " {\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}"
+                + "]}}]}",
+            "{\"testStruct\" : {"
+                + " \"k1\" : 1, "
+                + " \"k2\" : 2, "
+                + " \"nested_object\": { "
+                + " \"nested_key1\" : \"string\", "
+                + " \"nested_key2\" : \"blah\", "
+                + " \"nested_object2\" : { "
+                + " \"nested_key2\" : 23.5 "
+                + " }}"
+                + "}}",
+            "OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object OBJECT(nested_key1"
+                + " VARCHAR(16777216), nested_key2 VARCHAR(16777216), nested_object2"
+                + " OBJECT(nested_key2 DOUBLE)))"));
   }
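A sketch of the merge semantics the first case exercises, assuming this reading of IcebergColumnTree.merge: fields already known from the Iceberg schema keep their Snowflake types, while fields seen only in the record are appended with types inferred from the JSON values:

    // existing tree (from the Iceberg schema): OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))
    // incoming record: { "testStruct": { "k1" : 1, "k2" : 2, "k3" : 3 } }
    // merged result:   OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), k3 LONG)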

   protected SinkRecord createKafkaRecord(String jsonString, boolean withSchema) {
@@ -118,73 +118,34 @@ public void alterAlreadyExistingStructure() throws Exception {
     waitForOffset(1);

     // insert the structure but with additional field k3
-    String testStruct2 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : 3 } }";
+    String testStruct2 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : \"foo\" } }";
     service.insert(Collections.singletonList(createKafkaRecord(testStruct2, 1, false)));
     service.insert(Collections.singletonList(createKafkaRecord(testStruct2, 1, false)));
     waitForOffset(2);

-    List<DescribeTableRow> rows = describeTable(tableName);
-    assertEquals(rows.size(), 2);
-  }
-
-  @ParameterizedTest(name = "{0}")
-  @MethodSource("prepareData")
-  // @Disabled
-  void shouldEvolveSchemaAndInsertRecords_structuredData2(
-      String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
-      throws Exception {
-    // start off with just one column
-    List<DescribeTableRow> rows = describeTable(tableName);
-    assertThat(rows)
-        .hasSize(1)
-        .extracting(DescribeTableRow::getColumn)
-        .contains(Utils.TABLE_COLUMN_METADATA);
-
-    SinkRecord record = createKafkaRecord(message, 0, withSchema);
-    service.insert(Collections.singletonList(record));
-    waitForOffset(-1);
-    rows = describeTable(tableName);
-    assertThat(rows.size()).isEqualTo(9);
-
-    // don't check metadata column schema, we have different tests for that
-    // rows =
-    //     rows.stream()
-    //         .filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
-    //         .collect(Collectors.toList());
-    //
-    // assertThat(rows).containsExactlyInAnyOrder(expectedSchema);
-
-    // resend and store same record without any issues now
-    // service.insert(Collections.singletonList(record));
-    // waitForOffset(1);
-    //
-    // // and another record with same schema
-    // service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
-    // waitForOffset(2);
-
-    String testStruct = "{ \"testStruct\": { \"k1\" : \"fdf1\" }}";
-
-    // String testStruct =
-    //     "{ \"testStruct\": {" +
-    //     "\"k1\" : { \"nested_key1\" : 1}," +
-    //     "\"k2\" : { \"nested_key2\" : 2}" +
-    //     "} " +
-    //     "}";
-
-    // String testStruct =
-    //     "{ \"testStruct\": {" +
-    //     "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," +
-    //     "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }" +
-    //     "} " +
-    //     "}";
-
-    // reinsert record with extra field
-    service.insert(Collections.singletonList(createKafkaRecord(testStruct, 1, false)));
-
-    service.insert(Collections.singletonList(createKafkaRecord(testStruct, 1, false)));
-    waitForOffset(2);
-    String alteredStruct = "{ \"testStruct\": { \"k1\" : \"fdf1\", \"k3\" : \"dfdf2\"} }";
-    service.insert(Collections.singletonList(createKafkaRecord(alteredStruct, 2, false)));
+    // k1, k2, k3, k4
+    String testStruct3 =
+        "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : \"bar\", \"k4\" : 4.5 } }";
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct3, 2, false)));
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct3, 2, false)));
+    waitForOffset(3);
+
+    List<DescribeTableRow> columns = describeTable(tableName);
+    assertEquals(
+        columns.get(1).getType(),
+        "OBJECT(k1 NUMBER(19,0), k2 NUMBER(19,0), k3 VARCHAR(16777216), k4 FLOAT)");
+
+    // struct without k1 - verify that the schema was not evolved back
+    String testStruct4 = "{ \"testStruct\": { \"k2\" : 2, \"k3\" : 3, \"k4\" : 4.34 } }";
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct4, 3, false)));
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct4, 3, false)));
+    waitForOffset(4);
+
+    columns = describeTable(tableName);
+    assertEquals(
+        columns.get(1).getType(),
+        "OBJECT(k1 NUMBER(19,0), k2 NUMBER(19,0), k3 VARCHAR(16777216), k4 FLOAT)");
+    assertEquals(columns.size(), 2);
   }

   private void assertRecordsInTable() {