Skip to content

Commit

Permalink
[core][spark] Support renaming nested columns in Spark (#4489)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Nov 11, 2024
1 parent f19a4c9 commit 34153b3
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ static SchemaChange addColumn(
}

static SchemaChange renameColumn(String fieldName, String newName) {
return new RenameColumn(fieldName, newName);
return new RenameColumn(Collections.singletonList(fieldName), newName);
}

/**
 * Creates a change that renames the column addressed by a path of field names.
 *
 * @param fieldNames names leading to the column to rename; handed to the change as-is
 * @param newName name the addressed column should carry afterwards
 * @return the rename change
 */
static SchemaChange renameColumn(List<String> fieldNames, String newName) {
    SchemaChange renameChange = new RenameColumn(fieldNames, newName);
    return renameChange;
}

static SchemaChange dropColumn(String fieldName) {
Expand Down Expand Up @@ -278,16 +282,16 @@ final class RenameColumn implements SchemaChange {

private static final long serialVersionUID = 1L;

private final String fieldName;
private final List<String> fieldNames;
private final String newName;

private RenameColumn(String fieldName, String newName) {
this.fieldName = fieldName;
private RenameColumn(List<String> fieldNames, String newName) {
this.fieldNames = fieldNames;
this.newName = newName;
}

public String fieldName() {
return fieldName;
public List<String> fieldNames() {
return fieldNames;
}

public String newName() {
Expand All @@ -303,14 +307,14 @@ public boolean equals(Object o) {
return false;
}
RenameColumn that = (RenameColumn) o;
return Objects.equals(fieldName, that.fieldName)
return Objects.equals(fieldNames, that.fieldNames)
&& Objects.equals(newName, that.newName);
}

@Override
public int hashCode() {
int result = Objects.hash(newName);
result = 31 * result + Objects.hashCode(fieldName);
result = 31 * result + Objects.hashCode(fieldNames);
return result;
}
}
Expand Down
121 changes: 79 additions & 42 deletions paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,18 +290,11 @@ public TableSchema commitChanges(List<SchemaChange> changes)
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);

new NestedColumnModifier<Catalog.ColumnAlreadyExistException>(
addColumn.fieldNames().toArray(new String[0])) {
new NestedColumnModifier(addColumn.fieldNames().toArray(new String[0])) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnAlreadyExistException {
for (DataField field : newFields) {
if (field.name().equals(fieldName)) {
throw new Catalog.ColumnAlreadyExistException(
identifierFromPath(tableRoot.toString(), true, branch),
String.join(".", addColumn.fieldNames()));
}
}
assertColumnNotExists(newFields, fieldName);

DataField dataField =
new DataField(id, fieldName, dataType, addColumn.description());
Expand All @@ -327,34 +320,39 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
renameColumnValidation(oldTableSchema, rename);
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
identifierFromPath(tableRoot.toString(), true, branch),
rename.fieldName());
}
new NestedColumnModifier(rename.fieldNames().toArray(new String[0])) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException,
Catalog.ColumnAlreadyExistException {
assertColumnExists(newFields, fieldName);
assertColumnNotExists(newFields, rename.newName());
for (int i = 0; i < newFields.size(); i++) {
DataField field = newFields.get(i);
if (!field.name().equals(fieldName)) {
continue;
}

updateNestedColumn(
newFields,
new String[] {rename.fieldName()},
(field) ->
new DataField(
field.id(),
rename.newName(),
field.type(),
field.description()));
DataField newField =
new DataField(
field.id(),
rename.newName(),
field.type(),
field.description());
newFields.set(i, newField);
return;
}
}
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
dropColumnValidation(oldTableSchema, drop);
new NestedColumnModifier<Catalog.ColumnNotExistException>(
drop.fieldNames().toArray(new String[0])) {
new NestedColumnModifier(drop.fieldNames().toArray(new String[0])) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException {
if (!newFields.removeIf(f -> f.name().equals(fieldName))) {
throw new Catalog.ColumnNotExistException(
identifierFromPath(tableRoot.toString(), true, branch),
String.join(".", drop.fieldNames()));
}
assertColumnExists(newFields, fieldName);
newFields.removeIf(f -> f.name().equals(fieldName));
if (newFields.isEmpty()) {
throw new IllegalArgumentException(
"Cannot drop all fields in table");
Expand Down Expand Up @@ -438,7 +436,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
new Schema(
newFields,
oldTableSchema.partitionKeys(),
applyColumnRename(
applyNotNestedColumnRename(
oldTableSchema.primaryKeys(),
Iterables.filter(changes, RenameColumn.class)),
applySchemaChanges(newOptions, changes),
Expand Down Expand Up @@ -553,25 +551,28 @@ private static Map<String, String> applySchemaChanges(
if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) {
List<String> bucketColumns = Arrays.asList(bucketKeysStr.split(","));
List<String> newBucketColumns =
applyColumnRename(bucketColumns, Iterables.filter(changes, RenameColumn.class));
applyNotNestedColumnRename(
bucketColumns, Iterables.filter(changes, RenameColumn.class));
newOptions.put(BUCKET_KEY.key(), Joiner.on(',').join(newBucketColumns));
}

// TODO: Apply changes to other options that contain column names, such as `sequence.field`
return newOptions;
}

// Apply column rename changes to the list of column names, this will not change the order of
// the column names
private static List<String> applyColumnRename(
// Apply rename changes for top-level (non-nested) columns to the list of column names; this
// does not change the order of the column names
private static List<String> applyNotNestedColumnRename(
List<String> columns, Iterable<RenameColumn> renames) {
if (Iterables.isEmpty(renames)) {
return columns;
}

Map<String, String> columnNames = Maps.newHashMap();
for (RenameColumn renameColumn : renames) {
columnNames.put(renameColumn.fieldName(), renameColumn.newName());
if (renameColumn.fieldNames().size() == 1) {
columnNames.put(renameColumn.fieldNames().get(0), renameColumn.newName());
}
}

// The order of the column names will be preserved, as a non-parallel stream is used here.
Expand All @@ -594,14 +595,18 @@ private static void dropColumnValidation(TableSchema schema, DropColumn change)
}

private static void renameColumnValidation(TableSchema schema, RenameColumn change) {
String columnToRename = change.fieldName();
// partition keys can't be nested columns
if (change.fieldNames().size() > 1) {
return;
}
String columnToRename = change.fieldNames().get(0);
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
String.format("Cannot rename partition column: [%s]", columnToRename));
}
}

private abstract class NestedColumnModifier<E extends Exception> {
private abstract class NestedColumnModifier {

private final String[] updateFieldNames;

Expand All @@ -610,7 +615,7 @@ private NestedColumnModifier(String[] updateFieldNames) {
}

public void updateIntermediateColumn(List<DataField> newFields, int depth)
throws Catalog.ColumnNotExistException, E {
throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException {
if (depth == updateFieldNames.length - 1) {
updateLastColumn(newFields, updateFieldNames[depth]);
return;
Expand Down Expand Up @@ -643,15 +648,47 @@ public void updateIntermediateColumn(List<DataField> newFields, int depth)
}

protected abstract void updateLastColumn(List<DataField> newFields, String fieldName)
throws E;
throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException;

/**
 * Verifies that {@code newFields} contains a field named {@code fieldName}.
 *
 * @param newFields fields of the level currently being modified
 * @param fieldName name that must be present among {@code newFields}
 * @throws Catalog.ColumnNotExistException if no field carries that name; the reported column is
 *     the dotted path produced by {@code getLastFieldName(fieldName)}
 */
protected void assertColumnExists(List<DataField> newFields, String fieldName)
        throws Catalog.ColumnNotExistException {
    boolean present =
            newFields.stream().anyMatch(candidate -> candidate.name().equals(fieldName));
    if (!present) {
        throw new Catalog.ColumnNotExistException(
                identifierFromPath(tableRoot.toString(), true, branch),
                getLastFieldName(fieldName));
    }
}

/**
 * Verifies that {@code newFields} does not yet contain a field named {@code fieldName}.
 *
 * @param newFields fields of the level currently being modified
 * @param fieldName name that must be absent from {@code newFields}
 * @throws Catalog.ColumnAlreadyExistException if some field already carries that name; the
 *     reported column is the dotted path produced by {@code getLastFieldName(fieldName)}
 */
protected void assertColumnNotExists(List<DataField> newFields, String fieldName)
        throws Catalog.ColumnAlreadyExistException {
    boolean alreadyPresent =
            newFields.stream().anyMatch(existing -> existing.name().equals(fieldName));
    if (alreadyPresent) {
        throw new Catalog.ColumnAlreadyExistException(
                identifierFromPath(tableRoot.toString(), true, branch),
                getLastFieldName(fieldName));
    }
}

/**
 * Builds the dot-separated column path used in error messages: every entry of
 * {@code updateFieldNames} except the last one, followed by {@code fieldName}.
 *
 * @param fieldName name to place at the end of the path
 * @return the joined path, e.g. {@code a.b.fieldName} when {@code updateFieldNames} has three
 *     entries starting with {@code a}, {@code b}
 */
private String getLastFieldName(String fieldName) {
    StringBuilder path = new StringBuilder();
    // Prefix with all parent names; the final entry of updateFieldNames is deliberately left out.
    for (int depth = 0; depth < updateFieldNames.length - 1; depth++) {
        path.append(updateFieldNames[depth]).append('.');
    }
    return path.append(fieldName).toString();
}
}

private void updateNestedColumn(
List<DataField> newFields,
String[] updateFieldNames,
Function<DataField, DataField> updateFunc)
throws Catalog.ColumnNotExistException {
new NestedColumnModifier<Catalog.ColumnNotExistException>(updateFieldNames) {
throws Catalog.ColumnNotExistException, Catalog.ColumnAlreadyExistException {
new NestedColumnModifier(updateFieldNames) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
throws Catalog.ColumnNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ private static void validateForDeletionVectors(CoreOptions options) {

private static void validateSequenceField(TableSchema schema, CoreOptions options) {
List<String> sequenceField = options.sequenceField();
if (sequenceField.size() > 0) {
if (!sequenceField.isEmpty()) {
Map<String, Integer> fieldCount =
sequenceField.stream()
.collect(Collectors.toMap(field -> field, field -> 1, Integer::sum));
Expand Down Expand Up @@ -596,12 +596,12 @@ private static void validateBucket(TableSchema schema, CoreOptions options) {
== MAP
|| dataField.type().getTypeRoot()
== ROW))
.map(dataField -> dataField.name())
.map(DataField::name)
.collect(Collectors.toList());
if (nestedFields.size() > 0) {
if (!nestedFields.isEmpty()) {
throw new RuntimeException(
"nested type can not in bucket-key, in your table these key are "
+ nestedFields.toString());
+ nestedFields);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,9 @@ public void testAlterTableRenameColumn() throws Exception {
catalog.createTable(
identifier,
new Schema(
Lists.newArrayList(new DataField(0, "col1", DataTypes.STRING())),
Lists.newArrayList(
new DataField(0, "col1", DataTypes.STRING()),
new DataField(1, "col2", DataTypes.STRING())),
Collections.emptyList(),
Collections.emptyList(),
Maps.newHashMap(),
Expand All @@ -525,7 +527,7 @@ public void testAlterTableRenameColumn() throws Exception {
false);
Table table = catalog.getTable(identifier);

assertThat(table.rowType().getFields()).hasSize(1);
assertThat(table.rowType().getFields()).hasSize(2);
assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);

Expand All @@ -536,12 +538,12 @@ public void testAlterTableRenameColumn() throws Exception {
catalog.alterTable(
identifier,
Lists.newArrayList(
SchemaChange.renameColumn("col1", "new_col1")),
SchemaChange.renameColumn("col2", "new_col1")),
false))
.satisfies(
anyCauseMatches(
Catalog.ColumnAlreadyExistException.class,
"Column col1 already exists in the test_db.test_table table."));
"Column new_col1 already exists in the test_db.test_table table."));

// Alter table renames a column throws ColumnNotExistException when column does not exist
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,4 +606,61 @@ public void testAddAndDropNestedColumns() throws Exception {
assertThatCode(() -> manager.commitChanges(middleColumnNotExistDropColumn))
.hasMessageContaining("Column v.invalid does not exist");
}

/**
 * Renames a field nested two levels deep and checks the resulting schema, then checks the error
 * messages for a missing intermediate field, a missing leaf field, and a new name that already
 * exists next to the renamed field.
 */
@Test
public void testRenameNestedColumns() throws Exception {
// Three-level schema: k INT, v ROW<f1 STRING, f2 ROW<f1 INT, f2 BIGINT>>.
RowType innerType =
RowType.of(
new DataField(4, "f1", DataTypes.INT()),
new DataField(5, "f2", DataTypes.BIGINT()));
RowType middleType =
RowType.of(
new DataField(2, "f1", DataTypes.STRING()),
new DataField(3, "f2", innerType));
RowType outerType =
RowType.of(
new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType));

Schema schema =
new Schema(
outerType.getFields(),
Collections.singletonList("k"),
Collections.emptyList(),
new HashMap<>(),
"");
SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
manager.createTable(schema);

// Rename the innermost field v.f2.f1 to f100.
SchemaChange renameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "f2", "f1"), "f100");
manager.commitChanges(renameColumn);

// Expected schema: only the name of field id 4 changes; all ids and types stay as created.
innerType =
RowType.of(
new DataField(4, "f100", DataTypes.INT()),
new DataField(5, "f2", DataTypes.BIGINT()));
middleType =
RowType.of(
new DataField(2, "f1", DataTypes.STRING()),
new DataField(3, "f2", innerType));
outerType =
RowType.of(
new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType));
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);

// An unknown intermediate field is reported with the path up to that field.
SchemaChange middleColumnNotExistRenameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "invalid", "f2"), "f200");
assertThatCode(() -> manager.commitChanges(middleColumnNotExistRenameColumn))
.hasMessageContaining("Column v.invalid does not exist");

// An unknown leaf field is reported with its full dotted path.
SchemaChange lastColumnNotExistRenameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "f2", "invalid"), "new_invalid");
assertThatCode(() -> manager.commitChanges(lastColumnNotExistRenameColumn))
.hasMessageContaining("Column v.f2.invalid does not exist");

// v.f2 already holds f100 after the rename above, so renaming its sibling f2 to f100 collides.
SchemaChange newNameAlreadyExistRenameColumn =
SchemaChange.renameColumn(Arrays.asList("v", "f2", "f2"), "f100");
assertThatCode(() -> manager.commitChanges(newNameAlreadyExistRenameColumn))
.hasMessageContaining("Column v.f2.f100 already exists");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public void testRenameField() throws Exception {
.hasMessage(
String.format(
"Column %s already exists in the %s table.",
"f0", identifier.getFullName()));
"f1", identifier.getFullName()));
}

@Test
Expand Down
Loading

0 comments on commit 34153b3

Please sign in to comment.