From 36823449f28f3053ed5412760637aed0e17a2076 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Tue, 30 Jul 2024 17:46:00 +0800 Subject: [PATCH] [core] Immutable table options can now be changed on an empty table (#3845) --- .../java/org/apache/paimon/schema/Schema.java | 12 ++- .../apache/paimon/schema/SchemaManager.java | 59 ++++++++----- .../paimon/schema/SchemaManagerTest.java | 86 +++++++++++++++++++ .../paimon/flink/SchemaChangeITCase.java | 26 +++++- .../paimon/flink/SchemaChangeITCase.java | 25 +++++- 5 files changed, 183 insertions(+), 25 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java index b7575837471c..c6c79f4d4afd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java @@ -161,7 +161,11 @@ private List normalizePrimaryKeys(List primaryKeys) { "Cannot define primary key on DDL and table options at the same time."); } String pk = options.get(CoreOptions.PRIMARY_KEY.key()); - primaryKeys = Arrays.asList(pk.split(",")); + primaryKeys = + Arrays.stream(pk.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); options.remove(CoreOptions.PRIMARY_KEY.key()); } return primaryKeys; @@ -174,7 +178,11 @@ private List normalizePartitionKeys(List partitionKeys) { "Cannot define partition on DDL and table options at the same time."); } String partitions = options.get(CoreOptions.PARTITION.key()); - partitionKeys = Arrays.asList(partitions.split(",")); + partitionKeys = + Arrays.stream(partitions.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); options.remove(CoreOptions.PARTITION.key()); } return partitionKeys; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 4f70ac725e48..684adfe9da72 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,6 +45,7 @@ import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; @@ -108,14 +109,14 @@ public Optional latest() { try { return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) .reduce(Math::max) - .map(id -> schema(id)); + .map(this::schema); } catch (IOException e) { throw new UncheckedIOException(e); } } public List listAll() { - return listAllIds().stream().map(id -> schema(id)).collect(Collectors.toList()); + return listAllIds().stream().map(this::schema).collect(Collectors.toList()); } /** List all schema IDs. */ @@ -184,24 +185,31 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception { public TableSchema commitChanges(List changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { + SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch); + boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null); + while (true) { - TableSchema schema = + TableSchema oldTableSchema = latest().orElseThrow( () -> new Catalog.TableNotExistException( fromPath(branchPath(), true))); - Map newOptions = new HashMap<>(schema.options()); - List newFields = new ArrayList<>(schema.fields()); - AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId()); - String newComment = schema.comment(); + Map newOptions = new HashMap<>(oldTableSchema.options()); + List newFields = new ArrayList<>(oldTableSchema.fields()); + AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); + String newComment = oldTableSchema.comment(); for (SchemaChange change : changes) { if (change instanceof SetOption) { SetOption setOption = (SetOption) change; - checkAlterTableOption(setOption.key()); + if (hasSnapshots) { + checkAlterTableOption(setOption.key()); + } newOptions.put(setOption.key(), setOption.value()); } else if (change instanceof RemoveOption) { RemoveOption removeOption = (RemoveOption) change; - checkAlterTableOption(removeOption.key()); + if (hasSnapshots) { + checkAlterTableOption(removeOption.key()); + } newOptions.remove(removeOption.key()); } else if (change instanceof UpdateComment) { UpdateComment updateComment = (UpdateComment) change; @@ -245,7 +253,7 @@ public TableSchema commitChanges(List changes) } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change; - validateNotPrimaryAndPartitionKey(schema, rename.fieldName()); + validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName()); if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { throw new Catalog.ColumnAlreadyExistException( fromPath(branchPath(), true), rename.fieldName()); @@ -263,7 +271,7 @@ public TableSchema commitChanges(List changes) field.description())); } else if (change instanceof DropColumn) { DropColumn drop = (DropColumn) change; - validateNotPrimaryAndPartitionKey(schema, drop.fieldName()); + validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName()); if (!newFields.removeIf( f -> f.name().equals(((DropColumn) change).fieldName()))) { throw new Catalog.ColumnNotExistException( @@ -274,7 +282,7 @@ public TableSchema commitChanges(List changes) } } else if (change instanceof UpdateColumnType) { UpdateColumnType update = (UpdateColumnType) change; - if (schema.partitionKeys().contains(update.fieldName())) { + if (oldTableSchema.partitionKeys().contains(update.fieldName())) { throw new IllegalArgumentException( String.format( "Cannot update partition column [%s] type in the table[%s].", @@ -310,7 +318,7 @@ public TableSchema commitChanges(List changes) UpdateColumnNullability update = (UpdateColumnNullability) change; if (update.fieldNames().length == 1 && update.newNullability() - && schema.primaryKeys().contains(update.fieldNames()[0])) { + && oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) { throw new UnsupportedOperationException( "Cannot change nullability of primary key"); } @@ -346,20 +354,29 @@ public TableSchema commitChanges(List changes) } } - TableSchema newSchema = - new TableSchema( - schema.id() + 1, + // We change TableSchema to Schema, because we want to deal with primary-key and + // partition in options. + Schema newSchema = + new Schema( newFields, - highestFieldId.get(), - schema.partitionKeys(), - schema.primaryKeys(), + oldTableSchema.partitionKeys(), + oldTableSchema.primaryKeys(), newOptions, newComment); + TableSchema newTableSchema = + new TableSchema( + oldTableSchema.id() + 1, + newSchema.fields(), + highestFieldId.get(), + newSchema.partitionKeys(), + newSchema.primaryKeys(), + newSchema.options(), + newSchema.comment()); try { - boolean success = commit(newSchema); + boolean success = commit(newTableSchema); if (success) { - return newSchema; + return newTableSchema; } } catch (Exception e) { throw new RuntimeException(e); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 742e2188f2b0..4bd965268f00 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -19,9 +19,18 @@ package org.apache.paimon.schema; import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -441,4 +450,81 @@ public void testMoveBefore() { Assertions.assertEquals( 2, fields.get(0).id(), "The field id should remain as 2 after moving f2 before f0"); } + + @Test + public void testAlterImmutableOptionsOnEmptyTable() throws Exception { + // create table without primary keys + Schema schema = + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + Path tableRoot = new Path(tempDir.toString(), "table"); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), tableRoot); + manager.createTable(schema); + + // set immutable options and set primary keys + manager.commitChanges( + SchemaChange.setOption("primary-key", "f0, f1"), + SchemaChange.setOption("partition", "f0"), + SchemaChange.setOption("bucket", "2"), + SchemaChange.setOption("merge-engine", "first-row")); + + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot); + assertThat(table.schema().partitionKeys()).containsExactly("f0"); + assertThat(table.schema().primaryKeys()).containsExactly("f0", "f1"); + + // read and write data to check that table is really a primary key table with first-row + // merge engine + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = + table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io")); + TableCommitImpl commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple"))); + write.write(GenericRow.of(1, 20L, BinaryString.fromString("banana"))); + write.write(GenericRow.of(2, 10L, BinaryString.fromString("cat"))); + write.write(GenericRow.of(2, 20L, BinaryString.fromString("dog"))); + commit.commit(1, write.prepareCommit(false, 1)); + write.write(GenericRow.of(1, 20L, BinaryString.fromString("peach"))); + write.write(GenericRow.of(1, 30L, BinaryString.fromString("mango"))); + write.write(GenericRow.of(2, 20L, BinaryString.fromString("tiger"))); + write.write(GenericRow.of(2, 30L, BinaryString.fromString("wolf"))); + commit.commit(2, write.prepareCommit(false, 2)); + write.close(); + commit.close(); + + List actual = new ArrayList<>(); + try (RecordReaderIterator it = + new RecordReaderIterator<>( + table.newRead().createReader(table.newSnapshotReader().read()))) { + while (it.hasNext()) { + InternalRow row = it.next(); + actual.add( + String.format( + "%s %d %d %s", + row.getRowKind().shortString(), + row.getInt(0), + row.getLong(1), + row.getString(2))); + } + } + assertThat(actual) + .containsExactlyInAnyOrder( + "+I 1 10 apple", + "+I 1 20 banana", + "+I 1 30 mango", + "+I 2 10 cat", + "+I 2 20 dog", + "+I 2 30 wolf"); + + // now that table is not empty, we cannot change immutable options + assertThatThrownBy( + () -> + manager.commitChanges( + SchemaChange.setOption("merge-engine", "deduplicate"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Change 'merge-engine' is not supported yet."); + } } diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index adaa5b28c4e8..19ad41cae5ac 100644 --- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import java.util.Map; @@ -44,9 +45,29 @@ public void testSetAndRemoveOption() throws Exception { } @Test - public void testSetAndResetImmutableOptions() { + public void testSetAndResetImmutableOptionsOnEmptyTables() { + sql("CREATE TABLE T1 (a INT, b INT)"); + sql( + "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')"); + sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20)); + assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')")) + .rootCause() + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Change 'merge-engine' is not supported yet."); + + sql( + "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')"); + sql("ALTER TABLE T2 RESET ('merge-engine')"); + sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21)); + } + + @Test + public void testSetAndResetImmutableOptionsOnNonEmptyTables() { // bucket-key is immutable sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)"); + sql("INSERT INTO T1 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')")) .rootCause() @@ -55,6 +76,7 @@ public void testSetAndResetImmutableOptions() { sql( "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')"); + sql("INSERT INTO T2 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -63,6 +85,7 @@ public void testSetAndResetImmutableOptions() { // merge-engine is immutable sql( "CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')"); + sql("INSERT INTO T4 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -70,6 +93,7 @@ public void testSetAndResetImmutableOptions() { // sequence.field is immutable sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')"); + sql("INSERT INTO T5 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index fc5a3dbe0545..81f07b224ca7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -846,10 +846,30 @@ public void testSetAndRemoveOption() throws Exception { } @Test - public void testSetAndResetImmutableOptions() throws Exception { + public void testSetAndResetImmutableOptionsOnEmptyTables() { + sql("CREATE TABLE T1 (a INT, b INT)"); + sql( + "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')"); + sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20)); + assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')")) + .rootCause() + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Change 'merge-engine' is not supported yet."); + + sql( + "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')"); + sql("ALTER TABLE T2 RESET ('merge-engine')"); + sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21)); + } + + @Test + public void testSetAndResetImmutableOptionsOnNonEmptyTables() { // bucket-key is immutable sql( "CREATE TABLE T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'a')"); + sql("INSERT INTO T1 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')")) .rootCause() @@ -858,6 +878,7 @@ public void testSetAndResetImmutableOptions() throws Exception { sql( "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')"); + sql("INSERT INTO T2 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -866,6 +887,7 @@ public void testSetAndResetImmutableOptions() throws Exception { // merge-engine is immutable sql( "CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')"); + sql("INSERT INTO T4 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -873,6 +895,7 @@ public void testSetAndResetImmutableOptions() throws Exception { // sequence.field is immutable sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')"); + sql("INSERT INTO T5 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class)