Skip to content

Commit

Permalink
[spark] Fix alter table column type change nullability (apache#3965)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you authored Aug 15, 2024
1 parent 1bb00c0 commit e9c46c9
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ static SchemaChange dropColumn(String fieldName) {
}

/**
 * Creates a change that updates the type of the given column.
 *
 * <p>Nullability of the target field is NOT preserved: this delegates to the three-arg
 * overload with {@code keepNullability = false}.
 *
 * @param fieldName name of the column to alter
 * @param newDataType the new type for the column
 * @return the schema change describing the type update
 */
static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
    return new UpdateColumnType(fieldName, newDataType, false);
}

/**
 * Creates a change that updates the type of the given column.
 *
 * @param fieldName name of the column to alter
 * @param newDataType the new type for the column
 * @param keepNullability if true, the target field keeps its current nullability instead of
 *     taking the nullability of {@code newDataType}
 * @return the schema change describing the type update
 */
static SchemaChange updateColumnType(
        String fieldName, DataType newDataType, boolean keepNullability) {
    return new UpdateColumnType(fieldName, newDataType, keepNullability);
}

static SchemaChange updateColumnNullability(String fieldName, boolean newNullability) {
Expand Down Expand Up @@ -338,10 +343,13 @@ final class UpdateColumnType implements SchemaChange {

private final String fieldName;
private final DataType newDataType;
// If true, do not change the target field nullability
private final boolean keepNullability;

/**
 * @param fieldName name of the column whose type is updated
 * @param newDataType the new type for the column
 * @param keepNullability if true, do not change the target field's nullability
 */
private UpdateColumnType(String fieldName, DataType newDataType, boolean keepNullability) {
    this.fieldName = fieldName;
    this.newDataType = newDataType;
    this.keepNullability = keepNullability;
}

public String fieldName() {
Expand All @@ -352,6 +360,10 @@ public DataType newDataType() {
return newDataType;
}

/**
 * Returns whether the target field's current nullability should be preserved when the type is
 * updated (i.e. the nullability of the new type is ignored).
 */
public boolean keepNullability() {
    return this.keepNullability;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,27 +296,26 @@ public TableSchema commitChanges(List<SchemaChange> changes)
newFields,
update.fieldName(),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
targetType = targetType.copy(field.type().isNullable());
}
checkState(
DataTypeCasts.supportsExplicitCast(
field.type(), update.newDataType())
&& CastExecutors.resolve(
field.type(), update.newDataType())
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
&& CastExecutors.resolve(field.type(), targetType)
!= null,
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(), field.type(), update.newDataType()));
field.name(), field.type(), targetType));
AtomicInteger dummyId = new AtomicInteger(0);
if (dummyId.get() != 0) {
throw new RuntimeException(
String.format(
"Update column to nested row type '%s' is not supported.",
update.newDataType()));
targetType));
}
return new DataField(
field.id(),
field.name(),
update.newDataType(),
field.description());
field.id(), field.name(), targetType, field.description());
});
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private SchemaChange toSchemaChange(TableChange change) {
TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
validateAlterNestedField(update.fieldNames());
return SchemaChange.updateColumnType(
update.fieldNames()[0], toPaimonType(update.newDataType()));
update.fieldNames()[0], toPaimonType(update.newDataType()), true);
} else if (change instanceof TableChange.UpdateColumnNullability) {
TableChange.UpdateColumnNullability update =
(TableChange.UpdateColumnNullability) change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ protected static void createTable(String tableName) {
tableName));
}

/**
 * Creates a bucketed, avro-backed test table in {@code paimon.default} whose {@code b} column
 * is declared NOT NULL (columns: a INT NOT NULL primary key, b BIGINT NOT NULL, c STRING).
 *
 * @param tableName name of the table to create
 */
protected static void createTableWithNonNullColumn(String tableName) {
    String ddlTemplate =
            "CREATE TABLE paimon.default.%s (a INT NOT NULL, b BIGINT NOT NULL, c STRING)"
                    + " TBLPROPERTIES ('bucket' = '1', 'primary-key'='a', 'file.format'='avro')";
    spark.sql(String.format(ddlTemplate, tableName));
}

protected static FileStoreTable getTable(String tableName) {
return FileStoreTableFactory.create(
LocalFileIO.create(),
Expand Down Expand Up @@ -239,4 +246,8 @@ protected String showCreateString(String table, String... fieldSpec) {
/**
 * Returns the expected SHOW CREATE TABLE fragment for the default test schema
 * (a INT NOT NULL, b BIGINT, c STRING).
 *
 * @param table table name to embed in the expected string
 */
protected String defaultShowCreateString(String table) {
    String[] defaultFields = {"a INT NOT NULL", "b BIGINT", "c STRING"};
    return showCreateString(table, defaultFields);
}

/**
 * Returns the expected SHOW CREATE TABLE fragment for the schema whose {@code b} column is
 * NOT NULL (a INT NOT NULL, b BIGINT NOT NULL, c STRING).
 *
 * @param table table name to embed in the expected string
 */
protected String defaultShowCreateStringWithNonNullColumn(String table) {
    String[] nonNullFields = {"a INT NOT NULL", "b BIGINT NOT NULL", "c STRING"};
    return showCreateString(table, nonNullFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,18 @@ public void testUpdateColumnPosition() {

@Test
public void testAlterColumnType() {
createTable("testAlterColumnType");
createTableWithNonNullColumn("testAlterColumnType");
writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')");

assertThatThrownBy(
() -> {
writeTable("testAlterColumnType", "(1, null, 'a')");
})
.hasMessageContaining("Cannot write null to non-null column(b)");

List<Row> beforeAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
assertThat(beforeAlter.toString()).contains(defaultShowCreateString("testAlterColumnType"));
assertThat(beforeAlter.toString())
.contains(defaultShowCreateStringWithNonNullColumn("testAlterColumnType"));

spark.sql("ALTER TABLE testAlterColumnType ALTER COLUMN b TYPE DOUBLE");
assertThat(spark.table("testAlterColumnType").collectAsList().toString())
Expand All @@ -345,7 +352,15 @@ public void testAlterColumnType() {
assertThat(afterAlter.toString())
.contains(
showCreateString(
"testAlterColumnType", "a INT NOT NULL", "b DOUBLE", "c STRING"));
"testAlterColumnType",
"a INT NOT NULL",
"b DOUBLE NOT NULL",
"c STRING"));
assertThatThrownBy(
() -> {
writeTable("testAlterColumnType", "(1, null, 'a')");
})
.hasMessageContaining("Cannot write null to non-null column(b)");
}

@Test
Expand Down

0 comments on commit e9c46c9

Please sign in to comment.