Skip to content

Commit

Permalink
[spark] Support changing column types in array&lt;struct&gt; or map&lt;?,struct&gt; (#4618)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Dec 2, 2024
1 parent e72c06c commit 77b7d8d
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,10 @@ public void updateIntermediateColumn(List<DataField> newFields, int depth)

String fullFieldName =
String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1));
List<DataField> nestedFields =
new ArrayList<>(extractRowType(field.type(), fullFieldName).getFields());
updateIntermediateColumn(nestedFields, depth + 1);
List<DataField> nestedFields = new ArrayList<>();
int newDepth =
depth + extractRowDataFields(field.type(), fullFieldName, nestedFields);
updateIntermediateColumn(nestedFields, newDepth);
newFields.set(
i,
new DataField(
Expand All @@ -657,14 +658,22 @@ public void updateIntermediateColumn(List<DataField> newFields, int depth)
String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1)));
}

private RowType extractRowType(DataType type, String fullFieldName) {
private int extractRowDataFields(
DataType type, String fullFieldName, List<DataField> nestedFields) {
switch (type.getTypeRoot()) {
case ROW:
return (RowType) type;
nestedFields.addAll(((RowType) type).getFields());
return 1;
case ARRAY:
return extractRowType(((ArrayType) type).getElementType(), fullFieldName);
return extractRowDataFields(
((ArrayType) type).getElementType(),
fullFieldName,
nestedFields)
+ 1;
case MAP:
return extractRowType(((MapType) type).getValueType(), fullFieldName);
return extractRowDataFields(
((MapType) type).getValueType(), fullFieldName, nestedFields)
+ 1;
default:
throw new IllegalArgumentException(
fullFieldName + " is not a structured type.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,13 +738,15 @@ public void testUpdateRowTypeInArrayAndMap() throws Exception {

SchemaChange addColumn =
SchemaChange.addColumn(
new String[] {"v", "f3"},
new String[] {"v", "element", "value", "f3"},
DataTypes.STRING(),
null,
SchemaChange.Move.first("f3"));
SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v", "f2"});
SchemaChange dropColumn =
SchemaChange.dropColumn(new String[] {"v", "element", "value", "f2"});
SchemaChange updateColumnType =
SchemaChange.updateColumnType(new String[] {"v", "f1"}, DataTypes.BIGINT(), false);
SchemaChange.updateColumnType(
new String[] {"v", "element", "value", "f1"}, DataTypes.BIGINT(), false);
manager.commitChanges(addColumn, dropColumn, updateColumnType);

innerType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,11 @@ private void generateNestedColumnUpdates(
"Column %s can only be updated to array type, and cannot be updated to %s type",
joinedNames,
newType);
List<String> fullFieldNames = new ArrayList<>(fieldNames);
// add a dummy column name indicating the element of array
fullFieldNames.add("element");
generateNestedColumnUpdates(
fieldNames,
fullFieldNames,
((org.apache.paimon.types.ArrayType) oldType).getElementType(),
((org.apache.paimon.types.ArrayType) newType).getElementType(),
schemaChanges);
Expand All @@ -775,8 +778,11 @@ private void generateNestedColumnUpdates(
joinedNames,
oldMapType.getKeyType(),
newMapType.getKeyType());
List<String> fullFieldNames = new ArrayList<>(fieldNames);
// add a dummy column name indicating the value of map
fullFieldNames.add("value");
generateNestedColumnUpdates(
fieldNames,
fullFieldNames,
oldMapType.getValueType(),
newMapType.getValueType(),
schemaChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,89 @@ public void testAddAndDropNestedColumn(String formatType) {
"[5,[53,[503,500.03,5003],five]]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testAddAndDropNestedColumnInArray(String formatType) {
    // NOTE(review): shares the physical table name with the map variant of this
    // test — assumes each parameterized run starts from a clean catalog; verify.
    String table = "paimon.default.testAddNestedColumnTable";

    // Array-of-struct column; the ALTER statements below address the struct
    // fields through the implicit "element" path segment.
    spark.sql(
            "CREATE TABLE " + table
                    + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                    + formatType + "')");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
                    + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
    assertThat(
                    spark.sql("SELECT * FROM " + table).collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder(
                    "[1,WrappedArray([apple,100], [banana,101])]",
                    "[2,WrappedArray([cat,200], [dog,201])]");

    // Evolve the nested struct: add f3 after f2, then drop f1.
    spark.sql("ALTER TABLE " + table + " ADD COLUMN v.element.f3 STRING AFTER f2");
    spark.sql("ALTER TABLE " + table + " DROP COLUMN v.element.f1");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111, 'BANANA'))), "
                    + "(3, ARRAY(STRUCT(310, 'FLOWER')))");
    // Rows written before the schema change read back with f1 gone and f3 null.
    assertThat(
                    spark.sql("SELECT * FROM " + table).collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder(
                    "[1,WrappedArray([110,APPLE], [111,BANANA])]",
                    "[2,WrappedArray([200,null], [201,null])]",
                    "[3,WrappedArray([310,FLOWER])]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testAddAndDropNestedColumnInMap(String formatType) {
    // NOTE(review): shares the physical table name with the array variant of
    // this test — assumes each parameterized run starts from a clean catalog.
    String table = "paimon.default.testAddNestedColumnTable";

    // Map-of-struct column; the ALTER statements below address the struct
    // fields through the implicit "value" path segment.
    spark.sql(
            "CREATE TABLE " + table
                    + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                    + formatType + "')");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), "
                    + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))");
    assertThat(
                    spark.sql("SELECT k, v[10].f1, v[10].f2 FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");

    // Evolve the nested struct: add f3 after f2, then drop f1.
    spark.sql("ALTER TABLE " + table + " ADD COLUMN v.value.f3 STRING AFTER f2");
    spark.sql("ALTER TABLE " + table + " DROP COLUMN v.value.f1");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, MAP(10, STRUCT(110, 'APPLE'), 20, STRUCT(111, 'BANANA'))), "
                    + "(3, MAP(10, STRUCT(310, 'FLOWER')))");
    // Rows written before the schema change read back with f3 as null.
    assertThat(
                    spark.sql("SELECT k, v[10].f2, v[10].f3 FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[1,110,APPLE]", "[2,200,null]", "[3,310,FLOWER]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testRenameNestedColumn(String formatType) {
Expand Down Expand Up @@ -818,6 +901,67 @@ public void testRenameNestedColumn(String formatType) {
.containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testRenameNestedColumnInArray(String formatType) {
    // NOTE(review): shares the physical table name with the map variant of this
    // test — assumes each parameterized run starts from a clean catalog.
    String table = "paimon.default.testRenameNestedColumnTable";

    spark.sql(
            "CREATE TABLE " + table
                    + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('file.format' = '" + formatType + "')");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
                    + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
    assertThat(
                    spark.sql("SELECT v[0].f1, k FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");

    // Rename the struct field through the implicit "element" path segment;
    // existing data must stay readable under the new name.
    spark.sql("ALTER TABLE " + table + " RENAME COLUMN v.element.f1 to f100");
    assertThat(
                    spark.sql("SELECT v[0].f100, k FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testRenameNestedColumnInMap(String formatType) {
    // NOTE(review): shares the physical table name with the array variant of
    // this test — assumes each parameterized run starts from a clean catalog.
    String table = "paimon.default.testRenameNestedColumnTable";

    spark.sql(
            "CREATE TABLE " + table
                    + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('file.format' = '" + formatType + "')");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), "
                    + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))");
    assertThat(
                    spark.sql("SELECT v[10].f1, k FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");

    // Rename the struct field through the implicit "value" path segment;
    // existing data must stay readable under the new name.
    spark.sql("ALTER TABLE " + table + " RENAME COLUMN v.value.f1 to f100");
    assertThat(
                    spark.sql("SELECT v[10].f100, k FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testUpdateNestedColumnType(String formatType) {
Expand Down Expand Up @@ -850,4 +994,84 @@ public void testUpdateNestedColumnType(String formatType) {
.map(Row::toString))
.containsExactlyInAnyOrder("[101,1]", "[200,2]", "[3000000000000,3]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testUpdateNestedColumnTypeInArray(String formatType) {
    // NOTE(review): the table is named "testRenameNestedColumnTable" although
    // this test exercises a type change — looks like a copy-paste leftover;
    // kept as-is because it is part of the executed SQL. Also assumes a clean
    // catalog per parameterized run.
    String table = "paimon.default.testRenameNestedColumnTable";

    spark.sql(
            "CREATE TABLE " + table
                    + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                    + formatType + "')");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
                    + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
    assertThat(
                    spark.sql("SELECT * FROM " + table).collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder(
                    "[1,WrappedArray([apple,100], [banana,101])]",
                    "[2,WrappedArray([cat,200], [dog,201])]");

    // Widen f2 from INT to BIGINT through the implicit "element" path segment,
    // then write values that only fit in BIGINT.
    spark.sql("ALTER TABLE " + table + " CHANGE COLUMN v.element.f2 f2 BIGINT");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000), STRUCT('BANANA', 111))), "
                    + "(3, ARRAY(STRUCT('FLOWER', 3000000000000)))");
    // Rows written before the change must still read back under the new type.
    assertThat(
                    spark.sql("SELECT * FROM " + table).collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder(
                    "[1,WrappedArray([APPLE,1000000000000], [BANANA,111])]",
                    "[2,WrappedArray([cat,200], [dog,201])]",
                    "[3,WrappedArray([FLOWER,3000000000000])]");
}

@ParameterizedTest()
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testUpdateNestedColumnTypeInMap(String formatType) {
    // NOTE(review): the table is named "testRenameNestedColumnTable" although
    // this test exercises a type change — looks like a copy-paste leftover;
    // kept as-is because it is part of the executed SQL. Also assumes a clean
    // catalog per parameterized run.
    String table = "paimon.default.testRenameNestedColumnTable";

    spark.sql(
            "CREATE TABLE " + table
                    + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                    + formatType + "')");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), "
                    + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))");
    assertThat(
                    spark.sql("SELECT k, v[10].f1, v[10].f2 FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");

    // Widen f2 from INT to BIGINT through the implicit "value" path segment,
    // then write values that only fit in BIGINT.
    spark.sql("ALTER TABLE " + table + " CHANGE COLUMN v.value.f2 f2 BIGINT");
    spark.sql(
            "INSERT INTO " + table
                    + " VALUES (1, MAP(10, STRUCT('APPLE', 1000000000000), 20, STRUCT('BANANA', 111))), "
                    + "(3, MAP(10, STRUCT('FLOWER', 3000000000000)))");
    // Rows written before the change must still read back under the new type.
    assertThat(
                    spark.sql("SELECT k, v[10].f1, v[10].f2 FROM " + table)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder(
                    "[1,APPLE,1000000000000]", "[2,cat,200]", "[3,FLOWER,3000000000000]");
}
}

0 comments on commit 77b7d8d

Please sign in to comment.