Skip to content

Commit

Permalink
Core: Support rename primary key columns and bucket key columns.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyujiang committed Jul 31, 2024
1 parent 5bd3c6f commit ec87a86
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -66,6 +70,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
Expand Down Expand Up @@ -253,7 +259,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)

} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName());
columnChangeValidation(oldTableSchema, change);
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
fromPath(branchPath(), true), rename.fieldName());
Expand All @@ -271,7 +277,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName());
columnChangeValidation(oldTableSchema, change);
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn) change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
Expand Down Expand Up @@ -360,8 +366,10 @@ public TableSchema commitChanges(List<SchemaChange> changes)
new Schema(
newFields,
oldTableSchema.partitionKeys(),
oldTableSchema.primaryKeys(),
newOptions,
applyColumnRename(
oldTableSchema.primaryKeys(),
Iterables.filter(changes, RenameColumn.class)),
applySchemaChanges(newOptions, changes),
newComment);
TableSchema newTableSchema =
new TableSchema(
Expand Down Expand Up @@ -466,15 +474,58 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
}
}

private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) {
/// TODO support partition and primary keys schema evolution
if (schema.partitionKeys().contains(fieldName)) {
throw new UnsupportedOperationException(
String.format("Cannot drop/rename partition key[%s]", fieldName));
private static Map<String, String> applySchemaChanges(
Map<String, String> options, Iterable<SchemaChange> changes) {
Map<String, String> newOptions = Maps.newHashMap(options);
String bucketKeysStr = options.get(BUCKET_KEY.key());
if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) {
List<String> bucketColumns = Arrays.asList(bucketKeysStr.split(","));
List<String> newBucketColumns =
applyColumnRename(bucketColumns, Iterables.filter(changes, RenameColumn.class));
newOptions.put(BUCKET_KEY.key(), Joiner.on(',').join(newBucketColumns));
}
if (schema.primaryKeys().contains(fieldName)) {
throw new UnsupportedOperationException(
String.format("Cannot drop/rename primary key[%s]", fieldName));

return newOptions;
}

private static List<String> applyColumnRename(
List<String> columns, Iterable<RenameColumn> renames) {
if (Iterables.isEmpty(renames)) {
return columns;
}

Map<String, String> columnNames = Maps.newHashMap();
for (RenameColumn renameColumn : renames) {
columnNames.put(renameColumn.fieldName(), renameColumn.newName());
}

// The order of the column names will be preserved, as a non-parallel stream is used here.
return columns.stream()
.map(column -> columnNames.getOrDefault(column, column))
.collect(Collectors.toList());
}

private static void columnChangeValidation(TableSchema schema, SchemaChange change) {
/// TODO support partition and primary keys schema evolution
if (change instanceof DropColumn) {
String columnToDrop = ((DropColumn) change).fieldName();
if (schema.partitionKeys().contains(columnToDrop)
|| schema.primaryKeys().contains(columnToDrop)) {
throw new UnsupportedOperationException(
String.format(
"Cannot drop partition key or primary key: [%s]", columnToDrop));
}
} else if (change instanceof RenameColumn) {
String columnToRename = ((RenameColumn) change).fieldName();
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
String.format("Cannot rename partition column: [%s]", columnToRename));
}
} else {
throw new IllegalArgumentException(
String.format(
"Validation for %s is not supported",
change.getClass().getSimpleName()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -353,6 +356,51 @@ public void testRenameField() throws Exception {
"f0", identifier.getFullName()));
}

@Test
public void testRenamePrimaryKeyColumn() throws Exception {
Schema schema =
new Schema(
RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(),
Lists.newArrayList("f0"),
Lists.newArrayList("f0", "f1"),
Maps.newHashMap(),
"");

schemaManager.createTable(schema);
assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f1");

schemaManager.commitChanges(SchemaChange.renameColumn("f1", "f1_"));
TableSchema newSchema = schemaManager.latest().get();
assertThat(newSchema.fieldNames()).containsExactly("f0", "f1_");
assertThat(newSchema.primaryKeys()).containsExactlyInAnyOrder("f0", "f1_");

assertThatThrownBy(
() -> schemaManager.commitChanges(SchemaChange.renameColumn("f0", "f0_")))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Cannot rename partition column: [f0]");
}

@Test
public void testRenameBucketKeyColumn() throws Exception {
Schema schema =
new Schema(
RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(),
ImmutableList.of(),
Lists.newArrayList("f0", "f1"),
ImmutableMap.of(
CoreOptions.BUCKET_KEY.key(),
"f1,f0",
CoreOptions.BUCKET.key(),
"16"),
"");

schemaManager.createTable(schema);
schemaManager.commitChanges(SchemaChange.renameColumn("f0", "f0_"));
TableSchema newSchema = schemaManager.latest().get();

assertThat(newSchema.options().get(CoreOptions.BUCKET_KEY.key())).isEqualTo("f1,f0_");
}

@Test
public void testDropField() throws Exception {
Schema schema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,72 @@ public void testDropPrimaryKey() {
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class,
"Cannot drop/rename primary key[b]"));
"Cannot drop partition key or primary key: [b]"));
}

@Test
public void testRenamePrimaryKey() {
spark.sql(
"CREATE TABLE test_rename_primary_key_table (\n"
+ "a BIGINT NOT NULL,\n"
+ "b STRING)\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");

spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'aaa'), (2, 'bbb')");

spark.sql("ALTER TABLE test_rename_primary_key_table RENAME COLUMN a to a_");

List<Row> result =
spark.sql("SHOW CREATE TABLE test_rename_primary_key_table").collectAsList();
assertThat(result.toString())
.contains(
showCreateString(
"test_rename_primary_key_table", "a_ BIGINT NOT NULL", "b STRING"))
.contains("'primary-key' = 'a_'");

List<String> actual =
spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList());

assertThat(actual).containsExactlyInAnyOrder("[1,aaa]", "[2,bbb]");

spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'AAA'), (2, 'BBB')");

actual =
spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList());
assertThat(actual).containsExactlyInAnyOrder("[1,AAA]", "[2,BBB]");
}

@Test
public void testRenameBucketKey() {
spark.sql(
"CREATE TABLE test_rename_bucket_key_table (\n"
+ "a BIGINT NOT NULL,\n"
+ "b STRING)\n"
+ "TBLPROPERTIES ('bucket-key' = 'a,b', 'bucket'='16')");

spark.sql("INSERT INTO test_rename_bucket_key_table VALUES(1, 'aaa'), (2, 'bbb')");

spark.sql("ALTER TABLE test_rename_bucket_key_table RENAME COLUMN b to b_");

List<Row> result =
spark.sql("SHOW CREATE TABLE test_rename_bucket_key_table").collectAsList();
assertThat(result.toString())
.contains(
showCreateString(
"test_rename_bucket_key_table", "a BIGINT NOT NULL", "b_ STRING"))
.contains("'bucket-key' = 'a,b_'");

List<String> actual =
spark.sql("SELECT * FROM test_rename_bucket_key_table where b_ = 'bbb'")
.collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList());

assertThat(actual).containsExactlyInAnyOrder("[2,bbb]");
}

@Test
Expand Down

0 comments on commit ec87a86

Please sign in to comment.