From 97b9b33980deeca8389379f8893a569e59c07955 Mon Sep 17 00:00:00 2001 From: Yujiang Zhong <42907416+zhongyujiang@users.noreply.github.com> Date: Wed, 30 Oct 2024 18:57:58 +0800 Subject: [PATCH] [core] support rename primary key columns and bucket key columns (#3809) --- .../apache/paimon/schema/SchemaManager.java | 79 ++++++++++++++++--- .../paimon/table/SchemaEvolutionTest.java | 52 +++++++++++- .../spark/SparkSchemaEvolutionITCase.java | 71 ++++++++++++++++- 3 files changed, 185 insertions(+), 17 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 962cba952657..61dc305bc037 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -46,6 +46,11 @@ import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.base.Joiner; +import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -68,6 +73,8 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; +import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX; import static org.apache.paimon.catalog.Catalog.DB_SUFFIX; import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; @@ -305,7 +312,7 @@ public TableSchema commitChanges(List changes) } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change; - validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName()); + columnChangeValidation(oldTableSchema, change); if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { throw new Catalog.ColumnAlreadyExistException( identifierFromPath(tableRoot.toString(), true, branch), @@ -324,7 +331,7 @@ public TableSchema commitChanges(List changes) field.description())); } else if (change instanceof DropColumn) { DropColumn drop = (DropColumn) change; - validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName()); + columnChangeValidation(oldTableSchema, change); if (!newFields.removeIf( f -> f.name().equals(((DropColumn) change).fieldName()))) { throw new Catalog.ColumnNotExistException( @@ -413,8 +420,10 @@ public TableSchema commitChanges(List changes) new Schema( newFields, oldTableSchema.partitionKeys(), - oldTableSchema.primaryKeys(), - newOptions, + applyColumnRename( + oldTableSchema.primaryKeys(), + Iterables.filter(changes, RenameColumn.class)), + applySchemaChanges(newOptions, changes), newComment); TableSchema newTableSchema = new TableSchema( @@ -519,15 +528,61 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { } } - private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) { - /// TODO support partition and primary keys schema evolution - if (schema.partitionKeys().contains(fieldName)) { - throw new UnsupportedOperationException( - String.format("Cannot drop/rename partition key[%s]", fieldName)); + private static Map applySchemaChanges( + Map options, Iterable changes) { + Map newOptions = Maps.newHashMap(options); + String bucketKeysStr = options.get(BUCKET_KEY.key()); + if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) { + List bucketColumns = Arrays.asList(bucketKeysStr.split(",")); + List newBucketColumns = + applyColumnRename(bucketColumns, Iterables.filter(changes, RenameColumn.class)); + newOptions.put(BUCKET_KEY.key(), Joiner.on(',').join(newBucketColumns)); } - if (schema.primaryKeys().contains(fieldName)) { - throw new UnsupportedOperationException( - String.format("Cannot drop/rename primary key[%s]", fieldName)); + + // TODO: Apply changes to other options that contain column names, such as `sequence.field` + return newOptions; + } + + // Apply column rename changes to the list of column names, this will not change the order of + // the column names + private static List applyColumnRename( + List columns, Iterable renames) { + if (Iterables.isEmpty(renames)) { + return columns; + } + + Map columnNames = Maps.newHashMap(); + for (RenameColumn renameColumn : renames) { + columnNames.put(renameColumn.fieldName(), renameColumn.newName()); + } + + // The order of the column names will be preserved, as a non-parallel stream is used here. + return columns.stream() + .map(column -> columnNames.getOrDefault(column, column)) + .collect(Collectors.toList()); + } + + private static void columnChangeValidation(TableSchema schema, SchemaChange change) { + /// TODO support partition and primary keys schema evolution + if (change instanceof DropColumn) { + String columnToDrop = ((DropColumn) change).fieldName(); + if (schema.partitionKeys().contains(columnToDrop) + || schema.primaryKeys().contains(columnToDrop)) { + throw new UnsupportedOperationException( + String.format( + "Cannot drop partition key or primary key: [%s]", columnToDrop)); + } + } else if (change instanceof RenameColumn) { + String columnToRename = ((RenameColumn) change).fieldName(); + if (schema.partitionKeys().contains(columnToRename)) { + throw new UnsupportedOperationException( + String.format("Cannot rename partition column: [%s]", columnToRename)); + } + } else { + throw new IllegalArgumentException( + String.format( + "Validation for %s is not supported", + change.getClass().getSimpleName())); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index ad5c61779818..44c3beea7c0a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -41,7 +41,10 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -353,6 +356,51 @@ public void testRenameField() throws Exception { "f0", identifier.getFullName())); } + @Test + public void testRenamePrimaryKeyColumn() throws Exception { + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(), + Lists.newArrayList("f0"), + Lists.newArrayList("f0", "f1"), + Maps.newHashMap(), + ""); + + schemaManager.createTable(schema); + assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f1"); + + schemaManager.commitChanges(SchemaChange.renameColumn("f1", "f1_")); + TableSchema newSchema = schemaManager.latest().get(); + assertThat(newSchema.fieldNames()).containsExactly("f0", "f1_"); + assertThat(newSchema.primaryKeys()).containsExactlyInAnyOrder("f0", "f1_"); + + assertThatThrownBy( + () -> schemaManager.commitChanges(SchemaChange.renameColumn("f0", "f0_"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot rename partition column: [f0]"); + } + + @Test + public void testRenameBucketKeyColumn() throws Exception { + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(), + ImmutableList.of(), + Lists.newArrayList("f0", "f1"), + ImmutableMap.of( + CoreOptions.BUCKET_KEY.key(), + "f1,f0", + CoreOptions.BUCKET.key(), + "16"), + ""); + + schemaManager.createTable(schema); + schemaManager.commitChanges(SchemaChange.renameColumn("f0", "f0_")); + TableSchema newSchema = schemaManager.latest().get(); + + assertThat(newSchema.options().get(CoreOptions.BUCKET_KEY.key())).isEqualTo("f1,f0_"); + } + @Test public void testDropField() throws Exception { Schema schema = @@ -379,14 +427,14 @@ public void testDropField() throws Exception { schemaManager.commitChanges( Collections.singletonList(SchemaChange.dropColumn("f0")))) .isInstanceOf(UnsupportedOperationException.class) - .hasMessage(String.format("Cannot drop/rename partition key[%s]", "f0")); + .hasMessage(String.format("Cannot drop partition key or primary key: [%s]", "f0")); assertThatThrownBy( () -> schemaManager.commitChanges( Collections.singletonList(SchemaChange.dropColumn("f2")))) .isInstanceOf(UnsupportedOperationException.class) - .hasMessage(String.format("Cannot drop/rename primary key[%s]", "f2")); + .hasMessage(String.format("Cannot drop partition key or primary key: [%s]", "f2")); } @Test diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 7d94e7d5df73..9d958931cac3 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -203,7 +203,7 @@ public void testRenamePartitionKey() { .satisfies( anyCauseMatches( UnsupportedOperationException.class, - "Cannot drop/rename partition key[a]")); + "Cannot rename partition column: [a]")); } @Test @@ -254,7 +254,7 @@ public void testDropPartitionKey() { .satisfies( anyCauseMatches( UnsupportedOperationException.class, - "Cannot drop/rename partition key[a]")); + "Cannot drop partition key or primary key: [a]")); } @Test @@ -276,7 +276,72 @@ public void testDropPrimaryKey() { .satisfies( anyCauseMatches( UnsupportedOperationException.class, - "Cannot drop/rename primary key[b]")); + "Cannot drop partition key or primary key: [b]")); + } + + @Test + public void testRenamePrimaryKey() { + spark.sql( + "CREATE TABLE test_rename_primary_key_table (\n" + + "a BIGINT NOT NULL,\n" + + "b STRING)\n" + + "TBLPROPERTIES ('primary-key' = 'a')"); + + spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'aaa'), (2, 'bbb')"); + + spark.sql("ALTER TABLE test_rename_primary_key_table RENAME COLUMN a to a_"); + + List result = + spark.sql("SHOW CREATE TABLE test_rename_primary_key_table").collectAsList(); + assertThat(result.toString()) + .contains( + showCreateString( + "test_rename_primary_key_table", "a_ BIGINT NOT NULL", "b STRING")) + .contains("'primary-key' = 'a_'"); + + List actual = + spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList()); + + assertThat(actual).containsExactlyInAnyOrder("[1,aaa]", "[2,bbb]"); + + spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'AAA'), (2, 'BBB')"); + + actual = + spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList()); + assertThat(actual).containsExactlyInAnyOrder("[1,AAA]", "[2,BBB]"); + } + + @Test + public void testRenameBucketKey() { + spark.sql( + "CREATE TABLE test_rename_bucket_key_table (\n" + + "a BIGINT NOT NULL,\n" + + "b STRING)\n" + + "TBLPROPERTIES ('bucket-key' = 'a,b', 'bucket'='16')"); + + spark.sql("INSERT INTO test_rename_bucket_key_table VALUES(1, 'aaa'), (2, 'bbb')"); + + spark.sql("ALTER TABLE test_rename_bucket_key_table RENAME COLUMN b to b_"); + + List result = + spark.sql("SHOW CREATE TABLE test_rename_bucket_key_table").collectAsList(); + assertThat(result.toString()) + .contains( + showCreateString( + "test_rename_bucket_key_table", "a BIGINT NOT NULL", "b_ STRING")) + .contains("'bucket-key' = 'a,b_'"); + + List actual = + spark.sql("SELECT * FROM test_rename_bucket_key_table where b_ = 'bbb'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList()); + + assertThat(actual).containsExactlyInAnyOrder("[2,bbb]"); } @Test