Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core]: support rename primary key columns and bucket key columns #3809

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -68,6 +73,8 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
Expand Down Expand Up @@ -305,7 +312,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)

} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName());
columnChangeValidation(oldTableSchema, change);
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
identifierFromPath(tableRoot.toString(), true, branch),
Expand All @@ -324,7 +331,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName());
columnChangeValidation(oldTableSchema, change);
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn) change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
Expand Down Expand Up @@ -413,8 +420,10 @@ public TableSchema commitChanges(List<SchemaChange> changes)
new Schema(
newFields,
oldTableSchema.partitionKeys(),
oldTableSchema.primaryKeys(),
newOptions,
applyColumnRename(
oldTableSchema.primaryKeys(),
Iterables.filter(changes, RenameColumn.class)),
applySchemaChanges(newOptions, changes),
newComment);
TableSchema newTableSchema =
new TableSchema(
Expand Down Expand Up @@ -519,15 +528,61 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
}
}

private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) {
/// TODO support partition and primary keys schema evolution
if (schema.partitionKeys().contains(fieldName)) {
throw new UnsupportedOperationException(
String.format("Cannot drop/rename partition key[%s]", fieldName));
private static Map<String, String> applySchemaChanges(
Map<String, String> options, Iterable<SchemaChange> changes) {
Map<String, String> newOptions = Maps.newHashMap(options);
String bucketKeysStr = options.get(BUCKET_KEY.key());
if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) {
List<String> bucketColumns = Arrays.asList(bucketKeysStr.split(","));
List<String> newBucketColumns =
applyColumnRename(bucketColumns, Iterables.filter(changes, RenameColumn.class));
newOptions.put(BUCKET_KEY.key(), Joiner.on(',').join(newBucketColumns));
}
if (schema.primaryKeys().contains(fieldName)) {
throw new UnsupportedOperationException(
String.format("Cannot drop/rename primary key[%s]", fieldName));

// TODO: Apply changes to other options that contain column names, such as `sequence.field`
return newOptions;
}

// Translates every column name through the given rename changes. Column order is
// preserved, and when there are no renames the original list is returned as-is.
private static List<String> applyColumnRename(
        List<String> columns, Iterable<RenameColumn> renames) {
    if (Iterables.isEmpty(renames)) {
        return columns;
    }

    // Build an old-name -> new-name lookup from the rename changes.
    Map<String, String> oldToNew = Maps.newHashMap();
    for (RenameColumn rename : renames) {
        oldToNew.put(rename.fieldName(), rename.newName());
    }

    // Sequential stream, so the order of the column names is preserved;
    // names without a rename entry map to themselves.
    return columns.stream()
            .map(name -> oldToNew.getOrDefault(name, name))
            .collect(Collectors.toList());
}

// Validates a drop/rename column change against the table's keys: dropping a partition
// or primary key column is rejected, as is renaming a partition key column. Renaming a
// primary key column is allowed (the key list is rewritten by the caller).
// TODO support partition and primary keys schema evolution
private static void columnChangeValidation(TableSchema schema, SchemaChange change) {
    if (change instanceof DropColumn) {
        String field = ((DropColumn) change).fieldName();
        boolean isKeyColumn =
                schema.partitionKeys().contains(field) || schema.primaryKeys().contains(field);
        if (isKeyColumn) {
            throw new UnsupportedOperationException(
                    String.format("Cannot drop partition key or primary key: [%s]", field));
        }
        return;
    }
    if (change instanceof RenameColumn) {
        String field = ((RenameColumn) change).fieldName();
        if (schema.partitionKeys().contains(field)) {
            throw new UnsupportedOperationException(
                    String.format("Cannot rename partition column: [%s]", field));
        }
        return;
    }
    // Only drop/rename changes are expected here; anything else is a programming error.
    throw new IllegalArgumentException(
            String.format(
                    "Validation for %s is not supported", change.getClass().getSimpleName()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -353,6 +356,51 @@ public void testRenameField() throws Exception {
"f0", identifier.getFullName()));
}

@Test
public void testRenamePrimaryKeyColumn() throws Exception {
    // Table partitioned by f0, with primary key (f0, f1).
    Schema schema =
            new Schema(
                    RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(),
                    Lists.newArrayList("f0"),
                    Lists.newArrayList("f0", "f1"),
                    Maps.newHashMap(),
                    "");
    schemaManager.createTable(schema);
    assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f1");

    // Renaming a pure primary-key column succeeds and rewrites the primary key list.
    schemaManager.commitChanges(SchemaChange.renameColumn("f1", "f1_"));
    TableSchema renamed = schemaManager.latest().get();
    assertThat(renamed.fieldNames()).containsExactly("f0", "f1_");
    assertThat(renamed.primaryKeys()).containsExactlyInAnyOrder("f0", "f1_");

    // f0 is also a partition key, so renaming it must be rejected.
    assertThatThrownBy(
                    () -> schemaManager.commitChanges(SchemaChange.renameColumn("f0", "f0_")))
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessage("Cannot rename partition column: [f0]");
}

@Test
public void testRenameBucketKeyColumn() throws Exception {
    // Unpartitioned table whose bucketing is driven by 'bucket-key' = "f1,f0".
    Schema schema =
            new Schema(
                    RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(),
                    ImmutableList.of(),
                    Lists.newArrayList("f0", "f1"),
                    ImmutableMap.of(
                            CoreOptions.BUCKET_KEY.key(),
                            "f1,f0",
                            CoreOptions.BUCKET.key(),
                            "16"),
                    "");
    schemaManager.createTable(schema);

    // Renaming f0 must rewrite the 'bucket-key' option, keeping the key order intact.
    schemaManager.commitChanges(SchemaChange.renameColumn("f0", "f0_"));
    String bucketKeys =
            schemaManager.latest().get().options().get(CoreOptions.BUCKET_KEY.key());
    assertThat(bucketKeys).isEqualTo("f1,f0_");
}

@Test
public void testDropField() throws Exception {
Schema schema =
Expand All @@ -379,14 +427,14 @@ public void testDropField() throws Exception {
schemaManager.commitChanges(
Collections.singletonList(SchemaChange.dropColumn("f0"))))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(String.format("Cannot drop/rename partition key[%s]", "f0"));
.hasMessage(String.format("Cannot drop partition key or primary key: [%s]", "f0"));

assertThatThrownBy(
() ->
schemaManager.commitChanges(
Collections.singletonList(SchemaChange.dropColumn("f2"))))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(String.format("Cannot drop/rename primary key[%s]", "f2"));
.hasMessage(String.format("Cannot drop partition key or primary key: [%s]", "f2"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void testRenamePartitionKey() {
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class,
"Cannot drop/rename partition key[a]"));
"Cannot rename partition column: [a]"));
}

@Test
Expand Down Expand Up @@ -254,7 +254,7 @@ public void testDropPartitionKey() {
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class,
"Cannot drop/rename partition key[a]"));
"Cannot drop partition key or primary key: [a]"));
}

@Test
Expand All @@ -276,7 +276,72 @@ public void testDropPrimaryKey() {
.satisfies(
anyCauseMatches(
UnsupportedOperationException.class,
"Cannot drop/rename primary key[b]"));
"Cannot drop partition key or primary key: [b]"));
}

@Test
public void testRenamePrimaryKey() {
spark.sql(
"CREATE TABLE test_rename_primary_key_table (\n"
+ "a BIGINT NOT NULL,\n"
+ "b STRING)\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");

spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'aaa'), (2, 'bbb')");

spark.sql("ALTER TABLE test_rename_primary_key_table RENAME COLUMN a to a_");

List<Row> result =
spark.sql("SHOW CREATE TABLE test_rename_primary_key_table").collectAsList();
assertThat(result.toString())
.contains(
showCreateString(
"test_rename_primary_key_table", "a_ BIGINT NOT NULL", "b STRING"))
.contains("'primary-key' = 'a_'");

List<String> actual =
spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList());

assertThat(actual).containsExactlyInAnyOrder("[1,aaa]", "[2,bbb]");

spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'AAA'), (2, 'BBB')");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refers to the actual column projection used when reading data files during the debugging process.

This refers to data written before rename, the projection columns are _KEY_a and a:
Screenshot 2024-08-04 at 21 51 16

This refers to data written after rename, the projection columns are _KEY_a_ and a_:
Screenshot 2024-08-04 at 22 11 52


actual =
spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList());
assertThat(actual).containsExactlyInAnyOrder("[1,AAA]", "[2,BBB]");
}

@Test
public void testRenameBucketKey() {
    spark.sql(
            "CREATE TABLE test_rename_bucket_key_table (\n"
                    + "a BIGINT NOT NULL,\n"
                    + "b STRING)\n"
                    + "TBLPROPERTIES ('bucket-key' = 'a,b', 'bucket'='16')");

    spark.sql("INSERT INTO test_rename_bucket_key_table VALUES(1, 'aaa'), (2, 'bbb')");

    spark.sql("ALTER TABLE test_rename_bucket_key_table RENAME COLUMN b to b_");

    // The 'bucket-key' option must now reference the renamed column.
    List<Row> createTableRows =
            spark.sql("SHOW CREATE TABLE test_rename_bucket_key_table").collectAsList();
    assertThat(createTableRows.toString())
            .contains(
                    showCreateString(
                            "test_rename_bucket_key_table", "a BIGINT NOT NULL", "b_ STRING"))
            .contains("'bucket-key' = 'a,b_'");

    // Filtering on the renamed bucket column still locates the pre-rename rows.
    List<String> rows =
            spark.sql("SELECT * FROM test_rename_bucket_key_table where b_ = 'bbb'")
                    .collectAsList().stream()
                    .map(Row::toString)
                    .collect(Collectors.toList());
    assertThat(rows).containsExactlyInAnyOrder("[2,bbb]");
}

@Test
Expand Down
Loading