Skip to content

Commit

Permalink
[core] Immutable table options can now be changed on an empty table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Jul 30, 2024
1 parent 1e98ba8 commit 3682344
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 25 deletions.
12 changes: 10 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ private List<String> normalizePrimaryKeys(List<String> primaryKeys) {
"Cannot define primary key on DDL and table options at the same time.");
}
String pk = options.get(CoreOptions.PRIMARY_KEY.key());
primaryKeys = Arrays.asList(pk.split(","));
primaryKeys =
Arrays.stream(pk.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
options.remove(CoreOptions.PRIMARY_KEY.key());
}
return primaryKeys;
Expand All @@ -174,7 +178,11 @@ private List<String> normalizePartitionKeys(List<String> partitionKeys) {
"Cannot define partition on DDL and table options at the same time.");
}
String partitions = options.get(CoreOptions.PARTITION.key());
partitionKeys = Arrays.asList(partitions.split(","));
partitionKeys =
Arrays.stream(partitions.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
options.remove(CoreOptions.PARTITION.key());
}
return partitionKeys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -108,14 +109,14 @@ public Optional<TableSchema> latest() {
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
.reduce(Math::max)
.map(id -> schema(id));
.map(this::schema);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public List<TableSchema> listAll() {
return listAllIds().stream().map(id -> schema(id)).collect(Collectors.toList());
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
}

/** List all schema IDs. */
Expand Down Expand Up @@ -184,24 +185,31 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception {
public TableSchema commitChanges(List<SchemaChange> changes)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch);
boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);

while (true) {
TableSchema schema =
TableSchema oldTableSchema =
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(branchPath(), true)));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
String newComment = schema.comment();
Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());
List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
String newComment = oldTableSchema.comment();
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
checkAlterTableOption(setOption.key());
if (hasSnapshots) {
checkAlterTableOption(setOption.key());
}
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
checkAlterTableOption(removeOption.key());
if (hasSnapshots) {
checkAlterTableOption(removeOption.key());
}
newOptions.remove(removeOption.key());
} else if (change instanceof UpdateComment) {
UpdateComment updateComment = (UpdateComment) change;
Expand Down Expand Up @@ -245,7 +253,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)

} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName());
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
fromPath(branchPath(), true), rename.fieldName());
Expand All @@ -263,7 +271,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
validateNotPrimaryAndPartitionKey(schema, drop.fieldName());
validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn) change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
Expand All @@ -274,7 +282,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
if (schema.partitionKeys().contains(update.fieldName())) {
if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s] type in the table[%s].",
Expand Down Expand Up @@ -310,7 +318,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
UpdateColumnNullability update = (UpdateColumnNullability) change;
if (update.fieldNames().length == 1
&& update.newNullability()
&& schema.primaryKeys().contains(update.fieldNames()[0])) {
&& oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
throw new UnsupportedOperationException(
"Cannot change nullability of primary key");
}
Expand Down Expand Up @@ -346,20 +354,29 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}
}

TableSchema newSchema =
new TableSchema(
schema.id() + 1,
// We change TableSchema to Schema, because we want to deal with primary-key and
// partition in options.
Schema newSchema =
new Schema(
newFields,
highestFieldId.get(),
schema.partitionKeys(),
schema.primaryKeys(),
oldTableSchema.partitionKeys(),
oldTableSchema.primaryKeys(),
newOptions,
newComment);
TableSchema newTableSchema =
new TableSchema(
oldTableSchema.id() + 1,
newSchema.fields(),
highestFieldId.get(),
newSchema.partitionKeys(),
newSchema.primaryKeys(),
newSchema.options(),
newSchema.comment());

try {
boolean success = commit(newSchema);
boolean success = commit(newTableSchema);
if (success) {
return newSchema;
return newTableSchema;
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@
package org.apache.paimon.schema;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
Expand Down Expand Up @@ -441,4 +450,81 @@ public void testMoveBefore() {
Assertions.assertEquals(
2, fields.get(0).id(), "The field id should remain as 2 after moving f2 before f0");
}

@Test
public void testAlterImmutableOptionsOnEmptyTable() throws Exception {
// create table without primary keys
Schema schema =
new Schema(
rowType.getFields(),
Collections.emptyList(),
Collections.emptyList(),
options,
"");
Path tableRoot = new Path(tempDir.toString(), "table");
SchemaManager manager = new SchemaManager(LocalFileIO.create(), tableRoot);
manager.createTable(schema);

// set immutable options and set primary keys
manager.commitChanges(
SchemaChange.setOption("primary-key", "f0, f1"),
SchemaChange.setOption("partition", "f0"),
SchemaChange.setOption("bucket", "2"),
SchemaChange.setOption("merge-engine", "first-row"));

FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
assertThat(table.schema().partitionKeys()).containsExactly("f0");
assertThat(table.schema().primaryKeys()).containsExactly("f0", "f1");

// read and write data to check that table is really a primary key table with first-row
// merge engine
String commitUser = UUID.randomUUID().toString();
TableWriteImpl<?> write =
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
TableCommitImpl commit = table.newCommit(commitUser);
write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
write.write(GenericRow.of(1, 20L, BinaryString.fromString("banana")));
write.write(GenericRow.of(2, 10L, BinaryString.fromString("cat")));
write.write(GenericRow.of(2, 20L, BinaryString.fromString("dog")));
commit.commit(1, write.prepareCommit(false, 1));
write.write(GenericRow.of(1, 20L, BinaryString.fromString("peach")));
write.write(GenericRow.of(1, 30L, BinaryString.fromString("mango")));
write.write(GenericRow.of(2, 20L, BinaryString.fromString("tiger")));
write.write(GenericRow.of(2, 30L, BinaryString.fromString("wolf")));
commit.commit(2, write.prepareCommit(false, 2));
write.close();
commit.close();

List<String> actual = new ArrayList<>();
try (RecordReaderIterator<InternalRow> it =
new RecordReaderIterator<>(
table.newRead().createReader(table.newSnapshotReader().read()))) {
while (it.hasNext()) {
InternalRow row = it.next();
actual.add(
String.format(
"%s %d %d %s",
row.getRowKind().shortString(),
row.getInt(0),
row.getLong(1),
row.getString(2)));
}
}
assertThat(actual)
.containsExactlyInAnyOrder(
"+I 1 10 apple",
"+I 1 20 banana",
"+I 1 30 mango",
"+I 2 10 cat",
"+I 2 20 dog",
"+I 2 30 wolf");

// now that table is not empty, we cannot change immutable options
assertThatThrownBy(
() ->
manager.commitChanges(
SchemaChange.setOption("merge-engine", "deduplicate")))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'merge-engine' is not supported yet.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import java.util.Map;
Expand All @@ -44,9 +45,29 @@ public void testSetAndRemoveOption() throws Exception {
}

@Test
public void testSetAndResetImmutableOptions() {
public void testSetAndResetImmutableOptionsOnEmptyTables() {
sql("CREATE TABLE T1 (a INT, b INT)");
sql(
"ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')");
sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20));
assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'merge-engine' is not supported yet.");

sql(
"CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
sql("ALTER TABLE T2 RESET ('merge-engine')");
sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21));
}

@Test
public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
// bucket-key is immutable
sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)");
sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");

assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')"))
.rootCause()
Expand All @@ -55,6 +76,7 @@ public void testSetAndResetImmutableOptions() {

sql(
"CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')");
sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
Expand All @@ -63,13 +85,15 @@ public void testSetAndResetImmutableOptions() {
// merge-engine is immutable
sql(
"CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')");
sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'merge-engine' is not supported yet.");

// sequence.field is immutable
sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')");
sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,10 +846,30 @@ public void testSetAndRemoveOption() throws Exception {
}

@Test
public void testSetAndResetImmutableOptions() throws Exception {
public void testSetAndResetImmutableOptionsOnEmptyTables() {
sql("CREATE TABLE T1 (a INT, b INT)");
sql(
"ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')");
sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20));
assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'merge-engine' is not supported yet.");

sql(
"CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')");
sql("ALTER TABLE T2 RESET ('merge-engine')");
sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)");
assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21));
}

@Test
public void testSetAndResetImmutableOptionsOnNonEmptyTables() {
// bucket-key is immutable
sql(
"CREATE TABLE T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'a')");
sql("INSERT INTO T1 VALUES ('a', 'b', 'c')");

assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')"))
.rootCause()
Expand All @@ -858,6 +878,7 @@ public void testSetAndResetImmutableOptions() throws Exception {

sql(
"CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')");
sql("INSERT INTO T2 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
Expand All @@ -866,13 +887,15 @@ public void testSetAndResetImmutableOptions() throws Exception {
// merge-engine is immutable
sql(
"CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')");
sql("INSERT INTO T4 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'merge-engine' is not supported yet.");

// sequence.field is immutable
sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')");
sql("INSERT INTO T5 VALUES ('a', 'b', 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')"))
.rootCause()
.isInstanceOf(UnsupportedOperationException.class)
Expand Down

0 comments on commit 3682344

Please sign in to comment.