diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 1b4c58e30de9..7e94b0a776c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -67,7 +67,11 @@ static SchemaChange addColumn( } static SchemaChange renameColumn(String fieldName, String newName) { - return new RenameColumn(fieldName, newName); + return new RenameColumn(Collections.singletonList(fieldName), newName); + } + + static SchemaChange renameColumn(List fieldNames, String newName) { + return new RenameColumn(fieldNames, newName); } static SchemaChange dropColumn(String fieldName) { @@ -278,16 +282,16 @@ final class RenameColumn implements SchemaChange { private static final long serialVersionUID = 1L; - private final String fieldName; + private final List fieldNames; private final String newName; - private RenameColumn(String fieldName, String newName) { - this.fieldName = fieldName; + private RenameColumn(List fieldNames, String newName) { + this.fieldNames = fieldNames; this.newName = newName; } - public String fieldName() { - return fieldName; + public List fieldNames() { + return fieldNames; } public String newName() { @@ -303,14 +307,14 @@ public boolean equals(Object o) { return false; } RenameColumn that = (RenameColumn) o; - return Objects.equals(fieldName, that.fieldName) + return Objects.equals(fieldNames, that.fieldNames) && Objects.equals(newName, that.newName); } @Override public int hashCode() { int result = Objects.hash(newName); - result = 31 * result + Objects.hashCode(fieldName); + result = 31 * result + Objects.hashCode(fieldNames); return result; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 6b4127ceebee..5ffeca65d0c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -290,18 +290,11 @@ public TableSchema commitChanges(List changes) DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId); - new NestedColumnModifier( - addColumn.fieldNames().toArray(new String[0])) { + new NestedColumnModifier(addColumn.fieldNames().toArray(new String[0])) { @Override protected void updateLastColumn(List newFields, String fieldName) throws Catalog.ColumnAlreadyExistException { - for (DataField field : newFields) { - if (field.name().equals(fieldName)) { - throw new Catalog.ColumnAlreadyExistException( - identifierFromPath(tableRoot.toString(), true, branch), - String.join(".", addColumn.fieldNames())); - } - } + assertColumnNotExists(newFields, fieldName); DataField dataField = new DataField(id, fieldName, dataType, addColumn.description()); @@ -327,34 +320,39 @@ protected void updateLastColumn(List newFields, String fieldName) } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change; renameColumnValidation(oldTableSchema, rename); - if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { - throw new Catalog.ColumnAlreadyExistException( - identifierFromPath(tableRoot.toString(), true, branch), - rename.fieldName()); - } + new NestedColumnModifier(rename.fieldNames().toArray(new String[0])) { + @Override + protected void updateLastColumn(List newFields, String fieldName) + throws Catalog.ColumnNotExistException, + Catalog.ColumnAlreadyExistException { + assertColumnExists(newFields, fieldName); + assertColumnNotExists(newFields, rename.newName()); + for (int i = 0; i < newFields.size(); i++) { + DataField field = newFields.get(i); + if (!field.name().equals(fieldName)) { + continue; + } - updateNestedColumn( - newFields, - new String[] {rename.fieldName()}, - (field) -> - new DataField( - field.id(), - rename.newName(), - field.type(), - field.description())); + DataField newField = + new DataField( + field.id(), + rename.newName(), + field.type(), + field.description()); + newFields.set(i, newField); + return; + } + } + }.updateIntermediateColumn(newFields, 0); } else if (change instanceof DropColumn) { DropColumn drop = (DropColumn) change; dropColumnValidation(oldTableSchema, drop); - new NestedColumnModifier( - drop.fieldNames().toArray(new String[0])) { + new NestedColumnModifier(drop.fieldNames().toArray(new String[0])) { @Override protected void updateLastColumn(List newFields, String fieldName) throws Catalog.ColumnNotExistException { - if (!newFields.removeIf(f -> f.name().equals(fieldName))) { - throw new Catalog.ColumnNotExistException( - identifierFromPath(tableRoot.toString(), true, branch), - String.join(".", drop.fieldNames())); - } + assertColumnExists(newFields, fieldName); + newFields.removeIf(f -> f.name().equals(fieldName)); if (newFields.isEmpty()) { throw new IllegalArgumentException( "Cannot drop all fields in table"); @@ -438,7 +436,7 @@ protected void updateLastColumn(List newFields, String fieldName) new Schema( newFields, oldTableSchema.partitionKeys(), - applyColumnRename( + applyNotNestedColumnRename( oldTableSchema.primaryKeys(), Iterables.filter(changes, RenameColumn.class)), applySchemaChanges(newOptions, changes), @@ -553,7 +551,8 @@ private static Map applySchemaChanges( if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) { List bucketColumns = Arrays.asList(bucketKeysStr.split(",")); List newBucketColumns = - applyColumnRename(bucketColumns, Iterables.filter(changes, RenameColumn.class)); + applyNotNestedColumnRename( + bucketColumns, Iterables.filter(changes, RenameColumn.class)); newOptions.put(BUCKET_KEY.key(), Joiner.on(',').join(newBucketColumns)); } @@ -561,9 +560,9 @@ private static Map applySchemaChanges( return newOptions; } - // Apply column rename changes to the list of column names, this will not change the order of - // the column names - private static List applyColumnRename( + // Apply column rename changes on not nested columns to the list of column names, this will not + // change the order of the column names + private static List applyNotNestedColumnRename( List columns, Iterable renames) { if (Iterables.isEmpty(renames)) { return columns; @@ -571,7 +570,9 @@ private static List applyColumnRename( Map columnNames = Maps.newHashMap(); for (RenameColumn renameColumn : renames) { - columnNames.put(renameColumn.fieldName(), renameColumn.newName()); + if (renameColumn.fieldNames().size() == 1) { + columnNames.put(renameColumn.fieldNames().get(0), renameColumn.newName()); + } } // The order of the column names will be preserved, as a non-parallel stream is used here. @@ -594,14 +595,18 @@ private static void dropColumnValidation(TableSchema schema, DropColumn change) } private static void renameColumnValidation(TableSchema schema, RenameColumn change) { - String columnToRename = change.fieldName(); + // partition keys can't be nested columns + if (change.fieldNames().size() > 1) { + return; + } + String columnToRename = change.fieldNames().get(0); if (schema.partitionKeys().contains(columnToRename)) { throw new UnsupportedOperationException( String.format("Cannot rename partition column: [%s]", columnToRename)); } } - private abstract class NestedColumnModifier { + private abstract class NestedColumnModifier { private final String[] updateFieldNames; @@ -610,7 +615,7 @@ private NestedColumnModifier(String[] updateFieldNames) { } public void updateIntermediateColumn(List newFields, int depth) - throws Catalog.ColumnNotExistException, E { + throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException { if (depth == updateFieldNames.length - 1) { updateLastColumn(newFields, updateFieldNames[depth]); return; @@ -643,15 +648,47 @@ public void updateIntermediateColumn(List newFields, int depth) } protected abstract void updateLastColumn(List newFields, String fieldName) - throws E; + throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException; + + protected void assertColumnExists(List newFields, String fieldName) + throws Catalog.ColumnNotExistException { + for (DataField field : newFields) { + if (field.name().equals(fieldName)) { + return; + } + } + throw new Catalog.ColumnNotExistException( + identifierFromPath(tableRoot.toString(), true, branch), + getLastFieldName(fieldName)); + } + + protected void assertColumnNotExists(List newFields, String fieldName) + throws Catalog.ColumnAlreadyExistException { + for (DataField field : newFields) { + if (field.name().equals(fieldName)) { + throw new Catalog.ColumnAlreadyExistException( + identifierFromPath(tableRoot.toString(), true, branch), + getLastFieldName(fieldName)); + } + } + } + + private String getLastFieldName(String fieldName) { + List fieldNames = new ArrayList<>(); + for (int i = 0; i + 1 < updateFieldNames.length; i++) { + fieldNames.add(updateFieldNames[i]); + } + fieldNames.add(fieldName); + return String.join(".", fieldNames); + } } private void updateNestedColumn( List newFields, String[] updateFieldNames, Function updateFunc) - throws Catalog.ColumnNotExistException { - new NestedColumnModifier(updateFieldNames) { + throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException { + new NestedColumnModifier(updateFieldNames) { @Override protected void updateLastColumn(List newFields, String fieldName) throws Catalog.ColumnNotExistException { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index bb9b440ce95e..20cbdea66d9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -515,7 +515,7 @@ private static void validateForDeletionVectors(CoreOptions options) { private static void validateSequenceField(TableSchema schema, CoreOptions options) { List sequenceField = options.sequenceField(); - if (sequenceField.size() > 0) { + if (!sequenceField.isEmpty()) { Map fieldCount = sequenceField.stream() .collect(Collectors.toMap(field -> field, field -> 1, Integer::sum)); @@ -596,12 +596,12 @@ private static void validateBucket(TableSchema schema, CoreOptions options) { == MAP || dataField.type().getTypeRoot() == ROW)) - .map(dataField -> dataField.name()) + .map(DataField::name) .collect(Collectors.toList()); - if (nestedFields.size() > 0) { + if (!nestedFields.isEmpty()) { throw new RuntimeException( "nested type can not in bucket-key, in your table these key are " - + nestedFields.toString()); + + nestedFields); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 24eefbcb65d2..3bba6d5624cd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -513,7 +513,9 @@ public void testAlterTableRenameColumn() throws Exception { catalog.createTable( identifier, new Schema( - Lists.newArrayList(new DataField(0, "col1", DataTypes.STRING())), + Lists.newArrayList( + new DataField(0, "col1", DataTypes.STRING()), + new DataField(1, "col2", DataTypes.STRING())), Collections.emptyList(), Collections.emptyList(), Maps.newHashMap(), @@ -525,7 +527,7 @@ public void testAlterTableRenameColumn() throws Exception { false); Table table = catalog.getTable(identifier); - assertThat(table.rowType().getFields()).hasSize(1); + assertThat(table.rowType().getFields()).hasSize(2); assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0); assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0); @@ -536,12 +538,12 @@ public void testAlterTableRenameColumn() throws Exception { catalog.alterTable( identifier, Lists.newArrayList( - SchemaChange.renameColumn("col1", "new_col1")), + SchemaChange.renameColumn("col2", "new_col1")), false)) .satisfies( anyCauseMatches( Catalog.ColumnAlreadyExistException.class, - "Column col1 already exists in the test_db.test_table table.")); + "Column new_col1 already exists in the test_db.test_table table.")); // Alter table renames a column throws ColumnNotExistException when column does not exist assertThatThrownBy( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 1a175de24e3a..5fb76387eb2b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -606,4 +606,61 @@ public void testAddAndDropNestedColumns() throws Exception { assertThatCode(() -> manager.commitChanges(middleColumnNotExistDropColumn)) .hasMessageContaining("Column v.invalid does not exist"); } + + @Test + public void testRenameNestedColumns() throws Exception { + RowType innerType = + RowType.of( + new DataField(4, "f1", DataTypes.INT()), + new DataField(5, "f2", DataTypes.BIGINT())); + RowType middleType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", innerType)); + RowType outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType)); + + Schema schema = + new Schema( + outerType.getFields(), + Collections.singletonList("k"), + Collections.emptyList(), + new HashMap<>(), + ""); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), path); + manager.createTable(schema); + + SchemaChange renameColumn = + SchemaChange.renameColumn(Arrays.asList("v", "f2", "f1"), "f100"); + manager.commitChanges(renameColumn); + + innerType = + RowType.of( + new DataField(4, "f100", DataTypes.INT()), + new DataField(5, "f2", DataTypes.BIGINT())); + middleType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", innerType)); + outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType)); + assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType); + + SchemaChange middleColumnNotExistRenameColumn = + SchemaChange.renameColumn(Arrays.asList("v", "invalid", "f2"), "f200"); + assertThatCode(() -> manager.commitChanges(middleColumnNotExistRenameColumn)) + .hasMessageContaining("Column v.invalid does not exist"); + + SchemaChange lastColumnNotExistRenameColumn = + SchemaChange.renameColumn(Arrays.asList("v", "f2", "invalid"), "new_invalid"); + assertThatCode(() -> manager.commitChanges(lastColumnNotExistRenameColumn)) + .hasMessageContaining("Column v.f2.invalid does not exist"); + + SchemaChange newNameAlreadyExistRenameColumn = + SchemaChange.renameColumn(Arrays.asList("v", "f2", "f2"), "f100"); + assertThatCode(() -> manager.commitChanges(newNameAlreadyExistRenameColumn)) + .hasMessageContaining("Column v.f2.f100 already exists"); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 44c3beea7c0a..951539299cbb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -353,7 +353,7 @@ public void testRenameField() throws Exception { .hasMessage( String.format( "Column %s already exists in the %s table.", - "f0", identifier.getFullName())); + "f1", identifier.getFullName())); } @Test diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 2ac1d032cf76..82c8939eab87 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -379,8 +379,7 @@ private SchemaChange toSchemaChange(TableChange change) { move); } else if (change instanceof TableChange.RenameColumn) { TableChange.RenameColumn rename = (TableChange.RenameColumn) change; - validateAlterNestedField(rename.fieldNames()); - return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName()); + return SchemaChange.renameColumn(Arrays.asList(rename.fieldNames()), rename.newName()); } else if (change instanceof TableChange.DeleteColumn) { TableChange.DeleteColumn delete = (TableChange.DeleteColumn) change; return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames())); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index e876a002736d..ccae59e88675 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -788,4 +788,33 @@ public void testAddAndDropNestedColumn(String formatType) { "[4,[42,[402,null,4002],four]]", "[5,[53,[503,500.03,5003],five]]"); } + + @ParameterizedTest() + @ValueSource(strings = {"orc", "avro", "parquet"}) + public void testRenameNestedColumn(String formatType) { + String tableName = "testRenameNestedColumnTable"; + spark.sql( + "CREATE TABLE paimon.default." + + tableName + + " (k INT NOT NULL, v STRUCT>) " + + "TBLPROPERTIES ('file.format' = '" + + formatType + + "')"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))"); + assertThat( + spark.sql("SELECT v.f2.f1, k FROM paimon.default." + tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[apple,1]", "[banana,2]"); + + spark.sql("ALTER TABLE paimon.default." + tableName + " RENAME COLUMN v.f2.f1 to f100"); + assertThat( + spark.sql("SELECT v.f2.f100, k FROM paimon.default." + tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[apple,1]", "[banana,2]"); + } }