Skip to content

Commit

Permalink
[core][spark] Support updating nested column types in Spark (#4494)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Nov 11, 2024
1 parent 70b2d0c commit ee8b1b1
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ static SchemaChange dropColumn(List<String> fieldNames) {
}

static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
return new UpdateColumnType(fieldName, newDataType, false);
return new UpdateColumnType(Collections.singletonList(fieldName), newDataType, false);
}

static SchemaChange updateColumnType(
String fieldName, DataType newDataType, boolean keepNullability) {
return new UpdateColumnType(fieldName, newDataType, keepNullability);
return new UpdateColumnType(
Collections.singletonList(fieldName), newDataType, keepNullability);
}

static SchemaChange updateColumnType(
List<String> fieldNames, DataType newDataType, boolean keepNullability) {
return new UpdateColumnType(fieldNames, newDataType, keepNullability);
}

static SchemaChange updateColumnNullability(String fieldName, boolean newNullability) {
Expand Down Expand Up @@ -357,19 +363,20 @@ final class UpdateColumnType implements SchemaChange {

private static final long serialVersionUID = 1L;

private final String fieldName;
private final List<String> fieldNames;
private final DataType newDataType;
// If true, do not change the target field nullability
private final boolean keepNullability;

private UpdateColumnType(String fieldName, DataType newDataType, boolean keepNullability) {
this.fieldName = fieldName;
private UpdateColumnType(
List<String> fieldNames, DataType newDataType, boolean keepNullability) {
this.fieldNames = fieldNames;
this.newDataType = newDataType;
this.keepNullability = keepNullability;
}

public String fieldName() {
return fieldName;
public List<String> fieldNames() {
return fieldNames;
}

public DataType newDataType() {
Expand All @@ -389,14 +396,14 @@ public boolean equals(Object o) {
return false;
}
UpdateColumnType that = (UpdateColumnType) o;
return Objects.equals(fieldName, that.fieldName)
return Objects.equals(fieldNames, that.fieldNames)
&& newDataType.equals(that.newDataType);
}

@Override
public int hashCode() {
int result = Objects.hash(newDataType);
result = 31 * result + Objects.hashCode(fieldName);
result = 31 * result + Objects.hashCode(fieldNames);
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
renameColumnValidation(oldTableSchema, rename);
assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
new NestedColumnModifier(rename.fieldNames().toArray(new String[0])) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
Expand Down Expand Up @@ -361,15 +361,10 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s] type in the table[%s].",
update.fieldName(), tableRoot.getName()));
}
assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
updateNestedColumn(
newFields,
new String[] {update.fieldName()},
update.fieldNames().toArray(new String[0]),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
Expand All @@ -382,13 +377,6 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(), field.type(), targetType));
AtomicInteger dummyId = new AtomicInteger(0);
if (dummyId.get() != 0) {
throw new RuntimeException(
String.format(
"Update column to nested row type '%s' is not supported.",
targetType));
}
return new DataField(
field.id(), field.name(), targetType, field.description());
});
Expand Down Expand Up @@ -594,15 +582,17 @@ private static void dropColumnValidation(TableSchema schema, DropColumn change)
}
}

private static void renameColumnValidation(TableSchema schema, RenameColumn change) {
private static void assertNotUpdatingPrimaryKeys(
TableSchema schema, List<String> fieldNames, String operation) {
// partition keys can't be nested columns
if (change.fieldNames().size() > 1) {
if (fieldNames.size() > 1) {
return;
}
String columnToRename = change.fieldNames().get(0);
String columnToRename = fieldNames.get(0);
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
String.format("Cannot rename partition column: [%s]", columnToRename));
String.format(
"Cannot " + operation + " partition column: [%s]", columnToRename));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,8 @@ public void testAlterTableUpdateColumnType() throws Exception {
false))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Cannot update partition column [dt] type in the table"));
UnsupportedOperationException.class,
"Cannot update partition column: [dt]"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,4 +663,53 @@ public void testRenameNestedColumns() throws Exception {
assertThatCode(() -> manager.commitChanges(newNameAlreadyExistRenameColumn))
.hasMessageContaining("Column v.f2.f100 already exists");
}

@Test
public void testUpdateNestedColumnType() throws Exception {
    // Table row type: k INT, v ROW<f1 STRING, f2 ROW<f1 INT, f2 BIGINT>>.
    RowType leafType =
            RowType.of(
                    new DataField(4, "f1", DataTypes.INT()),
                    new DataField(5, "f2", DataTypes.BIGINT()));
    RowType midType =
            RowType.of(
                    new DataField(2, "f1", DataTypes.STRING()),
                    new DataField(3, "f2", leafType));
    RowType rootType =
            RowType.of(
                    new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", midType));

    Schema schema =
            new Schema(
                    rootType.getFields(),
                    Collections.singletonList("k"),
                    Collections.emptyList(),
                    new HashMap<>(),
                    "");
    SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
    schemaManager.createTable(schema);

    // Widen the doubly-nested column v.f2.f1 from INT to BIGINT, keeping nullability.
    SchemaChange widenNestedColumn =
            SchemaChange.updateColumnType(
                    Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), true);
    schemaManager.commitChanges(widenNestedColumn);

    // Only the leaf field's type changes; every field id and name stays the same.
    leafType =
            RowType.of(
                    new DataField(4, "f1", DataTypes.BIGINT()),
                    new DataField(5, "f2", DataTypes.BIGINT()));
    midType =
            RowType.of(
                    new DataField(2, "f1", DataTypes.STRING()),
                    new DataField(3, "f2", leafType));
    rootType =
            RowType.of(
                    new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", midType));
    assertThat(schemaManager.latest().get().logicalRowType()).isEqualTo(rootType);

    // Updating through a non-existent intermediate column must fail with a clear message.
    SchemaChange missingMiddleColumnChange =
            SchemaChange.updateColumnType(
                    Arrays.asList("v", "invalid", "f1"), DataTypes.BIGINT(), true);
    assertThatCode(() -> schemaManager.commitChanges(missingMiddleColumnChange))
            .hasMessageContaining("Column v.invalid does not exist");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,21 @@ protected void applySchemaChange(
} else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
SchemaChange.UpdateColumnType updateColumnType =
(SchemaChange.UpdateColumnType) schemaChange;
Preconditions.checkState(
updateColumnType.fieldNames().size() == 1,
"Paimon CDC currently does not support nested type schema evolution.");
TableSchema schema =
schemaManager
.latest()
.orElseThrow(
() ->
new RuntimeException(
"Table does not exist. This is unexpected."));
int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
int idx = schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0));
Preconditions.checkState(
idx >= 0,
"Field name "
+ updateColumnType.fieldName()
+ updateColumnType.fieldNames().get(0)
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
Expand All @@ -123,7 +126,7 @@ protected void applySchemaChange(
throw new UnsupportedOperationException(
String.format(
"Cannot convert field %s from type %s to %s of Paimon table %s.",
updateColumnType.fieldName(),
updateColumnType.fieldNames().get(0),
oldType,
newType,
identifier.getFullName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,8 @@ private SchemaChange toSchemaChange(TableChange change) {
return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
} else if (change instanceof TableChange.UpdateColumnType) {
TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
validateAlterNestedField(update.fieldNames());
return SchemaChange.updateColumnType(
update.fieldNames()[0], toPaimonType(update.newDataType()), true);
Arrays.asList(update.fieldNames()), toPaimonType(update.newDataType()), true);
} else if (change instanceof TableChange.UpdateColumnNullability) {
TableChange.UpdateColumnNullability update =
(TableChange.UpdateColumnNullability) change;
Expand Down Expand Up @@ -449,13 +448,6 @@ private Schema toInitialSchema(
return schemaBuilder.build();
}

private void validateAlterNestedField(String[] fieldNames) {
if (fieldNames.length > 1) {
throw new UnsupportedOperationException(
"Alter nested column is not supported: " + Arrays.toString(fieldNames));
}
}

private void validateAlterProperty(String alterKey) {
if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
throw new UnsupportedOperationException("Alter primary key is not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,4 +817,37 @@ public void testRenameNestedColumn(String formatType) {
.map(Row::toString))
.containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
}

/**
 * Verifies that {@code ALTER TABLE ... CHANGE COLUMN} can widen the type of a nested
 * struct field ({@code v.f2.f2}: INT -> BIGINT) for each file format, and that rows
 * written both before and after the change remain readable.
 */
@ParameterizedTest
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testUpdateNestedColumnType(String formatType) {
    // Fix: the table name was copy-pasted from testRenameNestedColumn
    // ("testRenameNestedColumnTable"), which could collide with that test's table;
    // use a name unique to this test instead.
    String tableName = "testUpdateNestedColumnTypeTable";
    spark.sql(
            "CREATE TABLE paimon.default."
                    + tableName
                    + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                    + formatType
                    + "')");
    spark.sql(
            "INSERT INTO paimon.default."
                    + tableName
                    + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))");
    assertThat(
                    spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[100,1]", "[200,2]");

    // Widen the nested column, then insert a value that only fits in BIGINT
    // (3000000000000 overflows INT) to prove the new type is actually in effect.
    spark.sql("ALTER TABLE paimon.default." + tableName + " CHANGE COLUMN v.f2.f2 f2 BIGINT");
    spark.sql(
            "INSERT INTO paimon.default."
                    + tableName
                    + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101))), (3, STRUCT(31, STRUCT('CHERRY', 3000000000000)))");
    // k=1 is overwritten (primary key upsert), k=2 keeps its pre-alter value, k=3 is new.
    assertThat(
                    spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[101,1]", "[200,2]", "[3000000000000,3]");
}
}

0 comments on commit ee8b1b1

Please sign in to comment.