diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index c5cea0c215df..4c36ad4db3c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -536,7 +536,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List c for (SchemaChange change : changes) { if (change instanceof SchemaChange.AddColumn) { SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) change; - fieldNames.add(addColumn.fieldName()); + fieldNames.addAll(addColumn.fieldNames()); } else if (change instanceof SchemaChange.RenameColumn) { SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) change; fieldNames.add(rename.newName()); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 1e790bf659a1..1b4c58e30de9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -25,6 +25,8 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Objects; /** @@ -52,11 +54,16 @@ static SchemaChange addColumn(String fieldName, DataType dataType) { } static SchemaChange addColumn(String fieldName, DataType dataType, String comment) { - return new AddColumn(fieldName, dataType, comment, null); + return new AddColumn(Collections.singletonList(fieldName), dataType, comment, null); } static SchemaChange addColumn(String fieldName, DataType dataType, String comment, Move move) { - return new AddColumn(fieldName, dataType, comment, move); + return new AddColumn(Collections.singletonList(fieldName), dataType, comment, move); + } + + static SchemaChange addColumn( + List fieldNames, DataType dataType, String comment, Move move) { + return new AddColumn(fieldNames, dataType, comment, move); } static SchemaChange renameColumn(String fieldName, String newName) { @@ -64,7 +71,11 @@ static SchemaChange renameColumn(String fieldName, String newName) { } static SchemaChange dropColumn(String fieldName) { - return new DropColumn(fieldName); + return new DropColumn(Collections.singletonList(fieldName)); + } + + static SchemaChange dropColumn(List fieldNames) { + return new DropColumn(fieldNames); } static SchemaChange updateColumnType(String fieldName, DataType newDataType) { @@ -207,20 +218,21 @@ final class AddColumn implements SchemaChange { private static final long serialVersionUID = 1L; - private final String fieldName; + private final List fieldNames; private final DataType dataType; private final String description; private final Move move; - private AddColumn(String fieldName, DataType dataType, String description, Move move) { - this.fieldName = fieldName; + private AddColumn( + List fieldNames, DataType dataType, String description, Move move) { + this.fieldNames = fieldNames; this.dataType = dataType; this.description = description; this.move = move; } - public String fieldName() { - return fieldName; + public List fieldNames() { + return fieldNames; } public DataType dataType() { @@ -246,7 +258,7 @@ public boolean equals(Object o) { return false; } AddColumn addColumn = (AddColumn) o; - return Objects.equals(fieldName, addColumn.fieldName) + return Objects.equals(fieldNames, addColumn.fieldNames) && dataType.equals(addColumn.dataType) && Objects.equals(description, addColumn.description) && move.equals(addColumn.move); @@ -255,7 +267,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = Objects.hash(dataType, description); - result = 31 * result + Objects.hashCode(fieldName); + result = 31 * result + Objects.hashCode(fieldNames); result = 31 * result + Objects.hashCode(move); return result; } @@ -308,14 +320,14 @@ final class DropColumn implements SchemaChange { private static final long serialVersionUID = 1L; - private final String fieldName; + private final List fieldNames; - private DropColumn(String fieldName) { - this.fieldName = fieldName; + private DropColumn(List fieldNames) { + this.fieldNames = fieldNames; } - public String fieldName() { - return fieldName; + public List fieldNames() { + return fieldNames; } @Override @@ -327,12 +339,12 @@ public boolean equals(Object o) { return false; } DropColumn that = (DropColumn) o; - return Objects.equals(fieldName, that.fieldName); + return Objects.equals(fieldNames, that.fieldNames); } @Override public int hashCode() { - return Objects.hashCode(fieldName); + return Objects.hashCode(fieldNames); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index 083d131ec846..b5d730707359 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -22,6 +22,7 @@ import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.casting.CastFieldGetter; +import org.apache.paimon.casting.CastedRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.Predicate; @@ -67,8 +68,6 @@ public class SchemaEvolutionUtil { * data fields, -1 is the index of 6->b in data fields and 1 is the index of 3->a in data * fields. * - *

/// TODO should support nest index mapping when nest schema evolution is supported. - * * @param tableFields the fields of table * @param dataFields the fields of underlying data * @return the index mapping @@ -394,18 +393,32 @@ private static CastFieldGetter[] createCastFieldGetterMapping( checkState( !(tableField.type() instanceof MapType || dataField.type() instanceof ArrayType - || dataField.type() instanceof MultisetType - || dataField.type() instanceof RowType), - "Only support column type evolution in atomic data type."); + || dataField.type() instanceof MultisetType), + "Only support column type evolution in atomic and row data type."); + + CastExecutor castExecutor; + if (tableField.type() instanceof RowType + && dataField.type() instanceof RowType) { + castExecutor = + createRowCastExecutor( + (RowType) dataField.type(), (RowType) tableField.type()); + } else { + castExecutor = CastExecutors.resolve(dataField.type(), tableField.type()); + } + checkNotNull( + castExecutor, + "Cannot cast from type " + + dataField.type() + + " to type " + + tableField.type()); + // Create getter with index i and projected row data will convert to underlying // data converterMapping[i] = new CastFieldGetter( InternalRowUtils.createNullCheckingFieldGetter( dataField.type(), i), - checkNotNull( - CastExecutors.resolve( - dataField.type(), tableField.type()))); + castExecutor); castExist = true; } } @@ -413,4 +426,24 @@ private static CastFieldGetter[] createCastFieldGetterMapping( return castExist ? converterMapping : null; } + + private static CastExecutor createRowCastExecutor( + RowType inputType, RowType targetType) { + int[] indexMapping = createIndexMapping(targetType.getFields(), inputType.getFields()); + CastFieldGetter[] castFieldGetters = + createCastFieldGetterMapping( + targetType.getFields(), inputType.getFields(), indexMapping); + + ProjectedRow projectedRow = indexMapping == null ? null : ProjectedRow.from(indexMapping); + CastedRow castedRow = castFieldGetters == null ? null : CastedRow.from(castFieldGetters); + return value -> { + if (projectedRow != null) { + value = projectedRow.replaceRow(value); + } + if (castedRow != null) { + value = castedRow.replaceRow(value); + } + return value; + }; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 7b987b049228..6b4127ceebee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -40,7 +40,6 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeCasts; -import org.apache.paimon.types.DataTypeVisitor; import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; @@ -282,44 +281,52 @@ public TableSchema commitChanges(List changes) } else if (change instanceof AddColumn) { AddColumn addColumn = (AddColumn) change; SchemaChange.Move move = addColumn.move(); - if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) { - throw new Catalog.ColumnAlreadyExistException( - identifierFromPath(tableRoot.toString(), true, branch), - addColumn.fieldName()); - } Preconditions.checkArgument( addColumn.dataType().isNullable(), "Column %s cannot specify NOT NULL in the %s table.", - addColumn.fieldName(), + String.join(".", addColumn.fieldNames()), identifierFromPath(tableRoot.toString(), true, branch).getFullName()); int id = highestFieldId.incrementAndGet(); DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId); - DataField dataField = - new DataField( - id, addColumn.fieldName(), dataType, addColumn.description()); - - // key: name ; value : index - Map map = new HashMap<>(); - for (int i = 0; i < newFields.size(); i++) { - map.put(newFields.get(i).name(), i); - } - - if (null != move) { - if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) { - newFields.add(0, dataField); - } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) { - int fieldIndex = map.get(move.referenceFieldName()); - newFields.add(fieldIndex + 1, dataField); + new NestedColumnModifier( + addColumn.fieldNames().toArray(new String[0])) { + @Override + protected void updateLastColumn(List newFields, String fieldName) + throws Catalog.ColumnAlreadyExistException { + for (DataField field : newFields) { + if (field.name().equals(fieldName)) { + throw new Catalog.ColumnAlreadyExistException( + identifierFromPath(tableRoot.toString(), true, branch), + String.join(".", addColumn.fieldNames())); + } + } + + DataField dataField = + new DataField(id, fieldName, dataType, addColumn.description()); + + // key: name ; value : index + Map map = new HashMap<>(); + for (int i = 0; i < newFields.size(); i++) { + map.put(newFields.get(i).name(), i); + } + + if (null != move) { + if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) { + newFields.add(0, dataField); + } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) { + int fieldIndex = map.get(move.referenceFieldName()); + newFields.add(fieldIndex + 1, dataField); + } + } else { + newFields.add(dataField); + } } - } else { - newFields.add(dataField); - } - + }.updateIntermediateColumn(newFields, 0); } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change; - columnChangeValidation(oldTableSchema, change); + renameColumnValidation(oldTableSchema, rename); if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { throw new Catalog.ColumnAlreadyExistException( identifierFromPath(tableRoot.toString(), true, branch), @@ -329,7 +336,6 @@ public TableSchema commitChanges(List changes) updateNestedColumn( newFields, new String[] {rename.fieldName()}, - 0, (field) -> new DataField( field.id(), @@ -338,16 +344,23 @@ public TableSchema commitChanges(List changes) field.description())); } else if (change instanceof DropColumn) { DropColumn drop = (DropColumn) change; - columnChangeValidation(oldTableSchema, change); - if (!newFields.removeIf( - f -> f.name().equals(((DropColumn) change).fieldName()))) { - throw new Catalog.ColumnNotExistException( - identifierFromPath(tableRoot.toString(), true, branch), - drop.fieldName()); - } - if (newFields.isEmpty()) { - throw new IllegalArgumentException("Cannot drop all fields in table"); - } + dropColumnValidation(oldTableSchema, drop); + new NestedColumnModifier( + drop.fieldNames().toArray(new String[0])) { + @Override + protected void updateLastColumn(List newFields, String fieldName) + throws Catalog.ColumnNotExistException { + if (!newFields.removeIf(f -> f.name().equals(fieldName))) { + throw new Catalog.ColumnNotExistException( + identifierFromPath(tableRoot.toString(), true, branch), + String.join(".", drop.fieldNames())); + } + if (newFields.isEmpty()) { + throw new IllegalArgumentException( + "Cannot drop all fields in table"); + } + } + }.updateIntermediateColumn(newFields, 0); } else if (change instanceof UpdateColumnType) { UpdateColumnType update = (UpdateColumnType) change; if (oldTableSchema.partitionKeys().contains(update.fieldName())) { @@ -356,9 +369,9 @@ public TableSchema commitChanges(List changes) "Cannot update partition column [%s] type in the table[%s].", update.fieldName(), tableRoot.getName())); } - updateColumn( + updateNestedColumn( newFields, - update.fieldName(), + new String[] {update.fieldName()}, (field) -> { DataType targetType = update.newDataType(); if (update.keepNullability()) { @@ -392,7 +405,6 @@ public TableSchema commitChanges(List changes) updateNestedColumn( newFields, update.fieldNames(), - 0, (field) -> new DataField( field.id(), @@ -404,7 +416,6 @@ public TableSchema commitChanges(List changes) updateNestedColumn( newFields, update.fieldNames(), - 0, (field) -> new DataField( field.id(), @@ -569,74 +580,96 @@ private static List applyColumnRename( .collect(Collectors.toList()); } - private static void columnChangeValidation(TableSchema schema, SchemaChange change) { - /// TODO support partition and primary keys schema evolution - if (change instanceof DropColumn) { - String columnToDrop = ((DropColumn) change).fieldName(); - if (schema.partitionKeys().contains(columnToDrop) - || schema.primaryKeys().contains(columnToDrop)) { - throw new UnsupportedOperationException( - String.format( - "Cannot drop partition key or primary key: [%s]", columnToDrop)); - } - } else if (change instanceof RenameColumn) { - String columnToRename = ((RenameColumn) change).fieldName(); - if (schema.partitionKeys().contains(columnToRename)) { - throw new UnsupportedOperationException( - String.format("Cannot rename partition column: [%s]", columnToRename)); - } - } else { - throw new IllegalArgumentException( - String.format( - "Validation for %s is not supported", - change.getClass().getSimpleName())); + private static void dropColumnValidation(TableSchema schema, DropColumn change) { + // primary keys and partition keys can't be nested columns + if (change.fieldNames().size() > 1) { + return; + } + String columnToDrop = change.fieldNames().get(0); + if (schema.partitionKeys().contains(columnToDrop) + || schema.primaryKeys().contains(columnToDrop)) { + throw new UnsupportedOperationException( + String.format("Cannot drop partition key or primary key: [%s]", columnToDrop)); } } - /** This method is hacky, newFields may be immutable. We should use {@link DataTypeVisitor}. */ - private void updateNestedColumn( - List newFields, - String[] updateFieldNames, - int index, - Function updateFunc) - throws Catalog.ColumnNotExistException { - boolean found = false; - for (int i = 0; i < newFields.size(); i++) { - DataField field = newFields.get(i); - if (field.name().equals(updateFieldNames[index])) { - found = true; - if (index == updateFieldNames.length - 1) { - newFields.set(i, updateFunc.apply(field)); - break; - } else { - List nestedFields = - new ArrayList<>( - ((org.apache.paimon.types.RowType) field.type()).getFields()); - updateNestedColumn(nestedFields, updateFieldNames, index + 1, updateFunc); - newFields.set( - i, - new DataField( - field.id(), - field.name(), - new org.apache.paimon.types.RowType( - field.type().isNullable(), nestedFields), - field.description())); + private static void renameColumnValidation(TableSchema schema, RenameColumn change) { + String columnToRename = change.fieldName(); + if (schema.partitionKeys().contains(columnToRename)) { + throw new UnsupportedOperationException( + String.format("Cannot rename partition column: [%s]", columnToRename)); + } + } + + private abstract class NestedColumnModifier { + + private final String[] updateFieldNames; + + private NestedColumnModifier(String[] updateFieldNames) { + this.updateFieldNames = updateFieldNames; + } + + public void updateIntermediateColumn(List newFields, int depth) + throws Catalog.ColumnNotExistException, E { + if (depth == updateFieldNames.length - 1) { + updateLastColumn(newFields, updateFieldNames[depth]); + return; + } + + for (int i = 0; i < newFields.size(); i++) { + DataField field = newFields.get(i); + if (!field.name().equals(updateFieldNames[depth])) { + continue; } + + List nestedFields = + new ArrayList<>( + ((org.apache.paimon.types.RowType) field.type()).getFields()); + updateIntermediateColumn(nestedFields, depth + 1); + newFields.set( + i, + new DataField( + field.id(), + field.name(), + new org.apache.paimon.types.RowType( + field.type().isNullable(), nestedFields), + field.description())); + return; } - } - if (!found) { + throw new Catalog.ColumnNotExistException( identifierFromPath(tableRoot.toString(), true, branch), - Arrays.toString(updateFieldNames)); + String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1))); } + + protected abstract void updateLastColumn(List newFields, String fieldName) + throws E; } - private void updateColumn( + private void updateNestedColumn( List newFields, - String updateFieldName, + String[] updateFieldNames, Function updateFunc) throws Catalog.ColumnNotExistException { - updateNestedColumn(newFields, new String[] {updateFieldName}, 0, updateFunc); + new NestedColumnModifier(updateFieldNames) { + @Override + protected void updateLastColumn(List newFields, String fieldName) + throws Catalog.ColumnNotExistException { + for (int i = 0; i < newFields.size(); i++) { + DataField field = newFields.get(i); + if (!field.name().equals(fieldName)) { + continue; + } + + newFields.set(i, updateFunc.apply(field)); + return; + } + + throw new Catalog.ColumnNotExistException( + identifierFromPath(tableRoot.toString(), true, branch), + String.join(".", updateFieldNames)); + } + }.updateIntermediateColumn(newFields, 0); } @VisibleForTesting diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index dbeedcfe5b9c..643e1372b614 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -541,7 +541,7 @@ public void testAlterTableRenameColumn() throws Exception { .satisfies( anyCauseMatches( Catalog.ColumnNotExistException.class, - "Column [non_existing_col] does not exist in the test_db.test_table table.")); + "Column non_existing_col does not exist in the test_db.test_table table.")); } @Test @@ -647,7 +647,7 @@ public void testAlterTableUpdateColumnType() throws Exception { .satisfies( anyCauseMatches( Catalog.ColumnNotExistException.class, - "Column [non_existing_col] does not exist in the test_db.test_table table.")); + "Column non_existing_col does not exist in the test_db.test_table table.")); // Alter table update a column type throws Exception when column is partition columns assertThatThrownBy( () -> @@ -718,7 +718,7 @@ public void testAlterTableUpdateColumnComment() throws Exception { .satisfies( anyCauseMatches( Catalog.ColumnNotExistException.class, - "Column [non_existing_col] does not exist in the test_db.test_table table.")); + "Column non_existing_col does not exist in the test_db.test_table table.")); } @Test @@ -774,7 +774,7 @@ public void testAlterTableUpdateColumnNullability() throws Exception { .satisfies( anyCauseMatches( Catalog.ColumnNotExistException.class, - "Column [non_existing_col] does not exist in the test_db.test_table table.")); + "Column non_existing_col does not exist in the test_db.test_table table.")); // Alter table update a column nullability throws Exception when column is pk columns assertThatThrownBy( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 4bd965268f00..1a175de24e3a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -65,6 +65,7 @@ import static org.apache.paimon.utils.FailingFileIO.retryArtificialException; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link SchemaManager}. */ @@ -527,4 +528,82 @@ public void testAlterImmutableOptionsOnEmptyTable() throws Exception { .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Change 'merge-engine' is not supported yet."); } + + @Test + public void testAddAndDropNestedColumns() throws Exception { + RowType innerType = + RowType.of( + new DataField(4, "f1", DataTypes.INT()), + new DataField(5, "f2", DataTypes.BIGINT())); + RowType middleType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", innerType)); + RowType outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType)); + + Schema schema = + new Schema( + outerType.getFields(), + Collections.singletonList("k"), + Collections.emptyList(), + new HashMap<>(), + ""); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), path); + manager.createTable(schema); + + SchemaChange addColumn = + SchemaChange.addColumn( + Arrays.asList("v", "f2", "f3"), + DataTypes.STRING(), + "", + SchemaChange.Move.after("f3", "f1")); + manager.commitChanges(addColumn); + + innerType = + RowType.of( + new DataField(4, "f1", DataTypes.INT()), + new DataField(6, "f3", DataTypes.STRING(), ""), + new DataField(5, "f2", DataTypes.BIGINT())); + middleType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", innerType)); + outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType)); + assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType); + + assertThatCode(() -> manager.commitChanges(addColumn)) + .hasMessageContaining("Column v.f2.f3 already exists"); + SchemaChange middleColumnNotExistAddColumn = + SchemaChange.addColumn( + Arrays.asList("v", "invalid", "f4"), DataTypes.STRING(), "", null); + assertThatCode(() -> manager.commitChanges(middleColumnNotExistAddColumn)) + .hasMessageContaining("Column v.invalid does not exist"); + + SchemaChange dropColumn = SchemaChange.dropColumn(Arrays.asList("v", "f2", "f1")); + manager.commitChanges(dropColumn); + + innerType = + RowType.of( + new DataField(6, "f3", DataTypes.STRING(), ""), + new DataField(5, "f2", DataTypes.BIGINT())); + middleType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", innerType)); + outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType)); + assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType); + + assertThatCode(() -> manager.commitChanges(dropColumn)) + .hasMessageContaining("Column v.f2.f1 does not exist"); + SchemaChange middleColumnNotExistDropColumn = + SchemaChange.dropColumn(Arrays.asList("v", "invalid", "f2")); + assertThatCode(() -> manager.commitChanges(middleColumnNotExistDropColumn)) + .hasMessageContaining("Column v.invalid does not exist"); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index b500da8f19f2..2ac1d032cf76 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -371,10 +371,9 @@ private SchemaChange toSchemaChange(TableChange change) { } } else if (change instanceof TableChange.AddColumn) { TableChange.AddColumn add = (TableChange.AddColumn) change; - validateAlterNestedField(add.fieldNames()); SchemaChange.Move move = getMove(add.position(), add.fieldNames()); return SchemaChange.addColumn( - add.fieldNames()[0], + Arrays.asList(add.fieldNames()), toPaimonType(add.dataType()).copy(add.isNullable()), add.comment(), move); @@ -384,8 +383,7 @@ private SchemaChange toSchemaChange(TableChange change) { return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName()); } else if (change instanceof TableChange.DeleteColumn) { TableChange.DeleteColumn delete = (TableChange.DeleteColumn) change; - validateAlterNestedField(delete.fieldNames()); - return SchemaChange.dropColumn(delete.fieldNames()[0]); + return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames())); } else if (change instanceof TableChange.UpdateColumnType) { TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change; validateAlterNestedField(update.fieldNames()); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java index 32c3498a7cc9..b4565447c6fc 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java @@ -456,6 +456,11 @@ public void testReadNestedColumnTable() { "INSERT INTO paimon.default." + tableName + " VALUES (2, STRUCT(20, STRUCT('banana', 200)))"); + assertThat( + spark.sql("SELECT v.f2.f1, k FROM paimon.default." + tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[apple,1]", "[banana,2]"); spark.sql( "INSERT INTO paimon.default." + tableName diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 9d958931cac3..e876a002736d 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -23,6 +23,8 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.HashMap; import java.util.List; @@ -705,4 +707,85 @@ private List getFieldStatsList(List fieldStatsRows) { ",")) .collect(Collectors.toList()); } + + @ParameterizedTest() + @ValueSource(strings = {"orc", "avro", "parquet"}) + public void testAddAndDropNestedColumn(String formatType) { + String tableName = "testAddNestedColumnTable"; + spark.sql( + "CREATE TABLE paimon.default." + + tableName + + " (k INT NOT NULL, v STRUCT>) " + + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '" + + formatType + + "')"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))"); + assertThat( + spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList() + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[1,[10,[apple,100]]]", "[2,[20,[banana,200]]]"); + assertThat( + spark.sql("SELECT v.f2.f1, k FROM paimon.default." + tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[apple,1]", "[banana,2]"); + + spark.sql("ALTER TABLE paimon.default." + tableName + " ADD COLUMN v.f3 STRING"); + spark.sql("ALTER TABLE paimon.default." + tableName + " ADD COLUMN v.f2.f3 BIGINT"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101, 1001), 'one')), (3, STRUCT(31, STRUCT('CHERRY', 301, 3001), 'three'))"); + assertThat( + spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList() + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + "[1,[11,[APPLE,101,1001],one]]", + "[2,[20,[banana,200,null],null]]", + "[3,[31,[CHERRY,301,3001],three]]"); + assertThat( + spark.sql("SELECT v.f2.f2, v.f3, k FROM paimon.default." + tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[101,one,1]", "[200,null,2]", "[301,three,3]"); + + spark.sql("ALTER TABLE paimon.default." + tableName + " DROP COLUMN v.f2.f1"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(12, STRUCT(102, 1002), 'one')), (4, STRUCT(42, STRUCT(402, 4002), 'four'))"); + assertThat( + spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList() + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + "[1,[12,[102,1002],one]]", + "[2,[20,[200,null],null]]", + "[3,[31,[301,3001],three]]", + "[4,[42,[402,4002],four]]"); + + spark.sql( + "ALTER TABLE paimon.default." + + tableName + + " ADD COLUMN v.f2.f1 DECIMAL(5, 2) AFTER f2"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(13, STRUCT(103, 100.03, 1003), 'one')), (5, STRUCT(53, STRUCT(503, 500.03, 5003), 'five'))"); + assertThat( + spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList() + .stream() + .map(Row::toString)) + .containsExactlyInAnyOrder( + "[1,[13,[103,100.03,1003],one]]", + "[2,[20,[200,null,null],null]]", + "[3,[31,[301,null,3001],three]]", + "[4,[42,[402,null,4002],four]]", + "[5,[53,[503,500.03,5003],five]]"); + } }