Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spark] Fix alter table column type change nullability #3965

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ static SchemaChange dropColumn(String fieldName) {
}

static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
return new UpdateColumnType(fieldName, newDataType);
return new UpdateColumnType(fieldName, newDataType, false);
}

static SchemaChange updateColumnType(
String fieldName, DataType newDataType, boolean keepNullability) {
return new UpdateColumnType(fieldName, newDataType, keepNullability);
}

static SchemaChange updateColumnNullability(String fieldName, boolean newNullability) {
Expand Down Expand Up @@ -338,10 +343,13 @@ final class UpdateColumnType implements SchemaChange {

private final String fieldName;
private final DataType newDataType;
// If true, do not change the target field nullability
private final boolean keepNullability;

private UpdateColumnType(String fieldName, DataType newDataType) {
private UpdateColumnType(String fieldName, DataType newDataType, boolean keepNullability) {
this.fieldName = fieldName;
this.newDataType = newDataType;
this.keepNullability = keepNullability;
}

public String fieldName() {
Expand All @@ -352,6 +360,10 @@ public DataType newDataType() {
return newDataType;
}

public boolean keepNullability() {
return keepNullability;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,27 +296,26 @@ public TableSchema commitChanges(List<SchemaChange> changes)
newFields,
update.fieldName(),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
targetType = targetType.copy(field.type().isNullable());
}
checkState(
DataTypeCasts.supportsExplicitCast(
field.type(), update.newDataType())
&& CastExecutors.resolve(
field.type(), update.newDataType())
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
&& CastExecutors.resolve(field.type(), targetType)
!= null,
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(), field.type(), update.newDataType()));
field.name(), field.type(), targetType));
AtomicInteger dummyId = new AtomicInteger(0);
if (dummyId.get() != 0) {
throw new RuntimeException(
String.format(
"Update column to nested row type '%s' is not supported.",
update.newDataType()));
targetType));
}
return new DataField(
field.id(),
field.name(),
update.newDataType(),
field.description());
field.id(), field.name(), targetType, field.description());
});
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private SchemaChange toSchemaChange(TableChange change) {
TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
validateAlterNestedField(update.fieldNames());
return SchemaChange.updateColumnType(
update.fieldNames()[0], toPaimonType(update.newDataType()));
update.fieldNames()[0], toPaimonType(update.newDataType()), true);
} else if (change instanceof TableChange.UpdateColumnNullability) {
TableChange.UpdateColumnNullability update =
(TableChange.UpdateColumnNullability) change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ protected static void createTable(String tableName) {
tableName));
}

protected static void createTableWithNonNullColumn(String tableName) {
spark.sql(
String.format(
"CREATE TABLE paimon.default.%s (a INT NOT NULL, b BIGINT NOT NULL, c STRING) TBLPROPERTIES ('bucket' = '1', 'primary-key'='a', 'file.format'='avro')",
tableName));
}

protected static FileStoreTable getTable(String tableName) {
return FileStoreTableFactory.create(
LocalFileIO.create(),
Expand Down Expand Up @@ -239,4 +246,8 @@ protected String showCreateString(String table, String... fieldSpec) {
protected String defaultShowCreateString(String table) {
return showCreateString(table, "a INT NOT NULL", "b BIGINT", "c STRING");
}

protected String defaultShowCreateStringWithNonNullColumn(String table) {
return showCreateString(table, "a INT NOT NULL", "b BIGINT NOT NULL", "c STRING");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,18 @@ public void testUpdateColumnPosition() {

@Test
public void testAlterColumnType() {
createTable("testAlterColumnType");
createTableWithNonNullColumn("testAlterColumnType");
writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')");

assertThatThrownBy(
() -> {
writeTable("testAlterColumnType", "(1, null, 'a')");
})
.hasMessageContaining("Cannot write null to non-null column(b)");

List<Row> beforeAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
assertThat(beforeAlter.toString()).contains(defaultShowCreateString("testAlterColumnType"));
assertThat(beforeAlter.toString())
.contains(defaultShowCreateStringWithNonNullColumn("testAlterColumnType"));

spark.sql("ALTER TABLE testAlterColumnType ALTER COLUMN b TYPE DOUBLE");
assertThat(spark.table("testAlterColumnType").collectAsList().toString())
Expand All @@ -345,7 +352,15 @@ public void testAlterColumnType() {
assertThat(afterAlter.toString())
.contains(
showCreateString(
"testAlterColumnType", "a INT NOT NULL", "b DOUBLE", "c STRING"));
"testAlterColumnType",
"a INT NOT NULL",
"b DOUBLE NOT NULL",
"c STRING"));
assertThatThrownBy(
() -> {
writeTable("testAlterColumnType", "(1, null, 'a')");
})
.hasMessageContaining("Cannot write null to non-null column(b)");
}

@Test
Expand Down
Loading