Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[core][spark] Support updating nested column types in Spark #4494

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ static SchemaChange dropColumn(List<String> fieldNames) {
}

/**
 * Creates a change that updates the type of a top-level column, resetting its nullability
 * (keepNullability = false).
 *
 * @param fieldName name of the top-level column
 * @param newDataType target type of the column
 */
static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
    return new UpdateColumnType(Collections.singletonList(fieldName), newDataType, false);
}

/**
 * Creates a change that updates the type of a top-level column.
 *
 * @param fieldName name of the top-level column
 * @param newDataType target type of the column
 * @param keepNullability if true, the target field keeps its current nullability
 */
static SchemaChange updateColumnType(
        String fieldName, DataType newDataType, boolean keepNullability) {
    return new UpdateColumnType(
            Collections.singletonList(fieldName), newDataType, keepNullability);
}

/**
 * Creates a change that updates the type of a (possibly nested) column identified by its
 * full field path.
 *
 * @param fieldNames path to the column, outermost field first (e.g. ["v", "f2", "f1"])
 * @param newDataType target type of the column
 * @param keepNullability if true, the target field keeps its current nullability
 */
static SchemaChange updateColumnType(
        List<String> fieldNames, DataType newDataType, boolean keepNullability) {
    return new UpdateColumnType(fieldNames, newDataType, keepNullability);
}

static SchemaChange updateColumnNullability(String fieldName, boolean newNullability) {
Expand Down Expand Up @@ -357,19 +363,20 @@ final class UpdateColumnType implements SchemaChange {

private static final long serialVersionUID = 1L;

// Full path to the target column, outermost field first; a single-element list
// addresses a top-level column.
private final List<String> fieldNames;
private final DataType newDataType;
// If true, do not change the target field nullability
private final boolean keepNullability;

private UpdateColumnType(
        List<String> fieldNames, DataType newDataType, boolean keepNullability) {
    this.fieldNames = fieldNames;
    this.newDataType = newDataType;
    this.keepNullability = keepNullability;
}

/** Returns the full field path of the column whose type is being updated. */
public List<String> fieldNames() {
    return fieldNames;
}

public DataType newDataType() {
Expand All @@ -389,14 +396,14 @@ public boolean equals(Object o) {
return false;
}
UpdateColumnType that = (UpdateColumnType) o;
return Objects.equals(fieldName, that.fieldName)
return Objects.equals(fieldNames, that.fieldNames)
&& newDataType.equals(that.newDataType);
}

@Override
public int hashCode() {
    // Must stay consistent with equals(), which compares fieldNames and newDataType.
    int result = Objects.hash(newDataType);
    result = 31 * result + Objects.hashCode(fieldNames);
    return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
renameColumnValidation(oldTableSchema, rename);
assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
new NestedColumnModifier(rename.fieldNames().toArray(new String[0])) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
Expand Down Expand Up @@ -361,15 +361,10 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s] type in the table[%s].",
update.fieldName(), tableRoot.getName()));
}
assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
updateNestedColumn(
newFields,
new String[] {update.fieldName()},
update.fieldNames().toArray(new String[0]),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
Expand All @@ -382,13 +377,6 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(), field.type(), targetType));
AtomicInteger dummyId = new AtomicInteger(0);
if (dummyId.get() != 0) {
throw new RuntimeException(
String.format(
"Update column to nested row type '%s' is not supported.",
targetType));
}
return new DataField(
field.id(), field.name(), targetType, field.description());
});
Expand Down Expand Up @@ -594,15 +582,17 @@ private static void dropColumnValidation(TableSchema schema, DropColumn change)
}
}

private static void renameColumnValidation(TableSchema schema, RenameColumn change) {
private static void assertNotUpdatingPrimaryKeys(
TableSchema schema, List<String> fieldNames, String operation) {
// partition keys can't be nested columns
if (change.fieldNames().size() > 1) {
if (fieldNames.size() > 1) {
return;
}
String columnToRename = change.fieldNames().get(0);
String columnToRename = fieldNames.get(0);
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
String.format("Cannot rename partition column: [%s]", columnToRename));
String.format(
"Cannot " + operation + " partition column: [%s]", columnToRename));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,8 @@ public void testAlterTableUpdateColumnType() throws Exception {
false))
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Cannot update partition column [dt] type in the table"));
UnsupportedOperationException.class,
"Cannot update partition column: [dt]"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,4 +663,53 @@ public void testRenameNestedColumns() throws Exception {
assertThatCode(() -> manager.commitChanges(newNameAlreadyExistRenameColumn))
.hasMessageContaining("Column v.f2.f100 already exists");
}

@Test
public void testUpdateNestedColumnType() throws Exception {
    // Create a table with schema: k INT, v ROW<f1 STRING, f2 ROW<f1 INT, f2 BIGINT>>.
    RowType rowType =
            RowType.of(
                    new DataField(0, "k", DataTypes.INT()),
                    new DataField(
                            1,
                            "v",
                            RowType.of(
                                    new DataField(2, "f1", DataTypes.STRING()),
                                    new DataField(
                                            3,
                                            "f2",
                                            RowType.of(
                                                    new DataField(4, "f1", DataTypes.INT()),
                                                    new DataField(
                                                            5,
                                                            "f2",
                                                            DataTypes.BIGINT()))))));

    SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
    manager.createTable(
            new Schema(
                    rowType.getFields(),
                    Collections.singletonList("k"),
                    Collections.emptyList(),
                    new HashMap<>(),
                    ""));

    // Widen the nested column v.f2.f1 from INT to BIGINT, keeping its nullability.
    manager.commitChanges(
            SchemaChange.updateColumnType(
                    Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), true));

    // Only the type of v.f2.f1 should have changed; ids and all other fields stay intact.
    RowType expectedType =
            RowType.of(
                    new DataField(0, "k", DataTypes.INT()),
                    new DataField(
                            1,
                            "v",
                            RowType.of(
                                    new DataField(2, "f1", DataTypes.STRING()),
                                    new DataField(
                                            3,
                                            "f2",
                                            RowType.of(
                                                    new DataField(4, "f1", DataTypes.BIGINT()),
                                                    new DataField(
                                                            5,
                                                            "f2",
                                                            DataTypes.BIGINT()))))));
    assertThat(manager.latest().get().logicalRowType()).isEqualTo(expectedType);

    // Updating through a non-existent intermediate column must fail with a clear message.
    SchemaChange badUpdate =
            SchemaChange.updateColumnType(
                    Arrays.asList("v", "invalid", "f1"), DataTypes.BIGINT(), true);
    assertThatCode(() -> manager.commitChanges(badUpdate))
            .hasMessageContaining("Column v.invalid does not exist");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,21 @@ protected void applySchemaChange(
} else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
SchemaChange.UpdateColumnType updateColumnType =
(SchemaChange.UpdateColumnType) schemaChange;
Preconditions.checkState(
updateColumnType.fieldNames().size() == 1,
"Paimon CDC currently does not support nested type schema evolution.");
TableSchema schema =
schemaManager
.latest()
.orElseThrow(
() ->
new RuntimeException(
"Table does not exist. This is unexpected."));
int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
int idx = schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0));
Preconditions.checkState(
idx >= 0,
"Field name "
+ updateColumnType.fieldName()
+ updateColumnType.fieldNames().get(0)
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
Expand All @@ -123,7 +126,7 @@ protected void applySchemaChange(
throw new UnsupportedOperationException(
String.format(
"Cannot convert field %s from type %s to %s of Paimon table %s.",
updateColumnType.fieldName(),
updateColumnType.fieldNames().get(0),
oldType,
newType,
identifier.getFullName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,8 @@ private SchemaChange toSchemaChange(TableChange change) {
return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
} else if (change instanceof TableChange.UpdateColumnType) {
TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
validateAlterNestedField(update.fieldNames());
return SchemaChange.updateColumnType(
update.fieldNames()[0], toPaimonType(update.newDataType()), true);
Arrays.asList(update.fieldNames()), toPaimonType(update.newDataType()), true);
} else if (change instanceof TableChange.UpdateColumnNullability) {
TableChange.UpdateColumnNullability update =
(TableChange.UpdateColumnNullability) change;
Expand Down Expand Up @@ -449,13 +448,6 @@ private Schema toInitialSchema(
return schemaBuilder.build();
}

private void validateAlterNestedField(String[] fieldNames) {
if (fieldNames.length > 1) {
throw new UnsupportedOperationException(
"Alter nested column is not supported: " + Arrays.toString(fieldNames));
}
}

private void validateAlterProperty(String alterKey) {
if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
throw new UnsupportedOperationException("Alter primary key is not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,4 +817,37 @@ public void testRenameNestedColumn(String formatType) {
.map(Row::toString))
.containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
}

@ParameterizedTest
@ValueSource(strings = {"orc", "avro", "parquet"})
public void testUpdateNestedColumnType(String formatType) {
    // Fix copy-paste from testRenameNestedColumn: use a table name unique to this test
    // so the two tests cannot collide on a shared table.
    String tableName = "testUpdateNestedColumnTypeTable";
    spark.sql(
            "CREATE TABLE paimon.default."
                    + tableName
                    + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) "
                    + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                    + formatType
                    + "')");
    spark.sql(
            "INSERT INTO paimon.default."
                    + tableName
                    + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))");
    assertThat(
                    spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[100,1]", "[200,2]");

    // Widen the nested column v.f2.f2 from INT to BIGINT, then insert a value that only
    // fits in BIGINT to prove both old and new rows remain readable.
    spark.sql("ALTER TABLE paimon.default." + tableName + " CHANGE COLUMN v.f2.f2 f2 BIGINT");
    spark.sql(
            "INSERT INTO paimon.default."
                    + tableName
                    + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101))), (3, STRUCT(31, STRUCT('CHERRY', 3000000000000)))");
    assertThat(
                    spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName)
                            .collectAsList().stream()
                            .map(Row::toString))
            .containsExactlyInAnyOrder("[101,1]", "[200,2]", "[3000000000000,3]");
}
}
Loading