From e9c46c9ea55b8bcd39e1f099725598ed577a29d6 Mon Sep 17 00:00:00 2001 From: Xiduo You Date: Thu, 15 Aug 2024 12:07:36 +0800 Subject: [PATCH] [spark] Fix alter table column type change nullability (#3965) --- .../apache/paimon/schema/SchemaChange.java | 16 ++++++++++++-- .../apache/paimon/schema/SchemaManager.java | 19 ++++++++--------- .../org/apache/paimon/spark/SparkCatalog.java | 2 +- .../paimon/spark/SparkReadTestBase.java | 11 ++++++++++ .../spark/SparkSchemaEvolutionITCase.java | 21 ++++++++++++++++--- 5 files changed, 53 insertions(+), 16 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 20000389dccd..1e790bf659a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -68,7 +68,12 @@ static SchemaChange dropColumn(String fieldName) { } static SchemaChange updateColumnType(String fieldName, DataType newDataType) { - return new UpdateColumnType(fieldName, newDataType); + return new UpdateColumnType(fieldName, newDataType, false); + } + + static SchemaChange updateColumnType( + String fieldName, DataType newDataType, boolean keepNullability) { + return new UpdateColumnType(fieldName, newDataType, keepNullability); } static SchemaChange updateColumnNullability(String fieldName, boolean newNullability) { @@ -338,10 +343,13 @@ final class UpdateColumnType implements SchemaChange { private final String fieldName; private final DataType newDataType; + // If true, do not change the target field nullability + private final boolean keepNullability; - private UpdateColumnType(String fieldName, DataType newDataType) { + private UpdateColumnType(String fieldName, DataType newDataType, boolean keepNullability) { this.fieldName = fieldName; this.newDataType = newDataType; + this.keepNullability = keepNullability; } public String fieldName() { @@ -352,6 +360,10 @@ public DataType newDataType() { return newDataType; } + public boolean keepNullability() { + return keepNullability; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 56773c8215b9..482e25d48df8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -296,27 +296,26 @@ public TableSchema commitChanges(List changes) newFields, update.fieldName(), (field) -> { + DataType targetType = update.newDataType(); + if (update.keepNullability()) { + targetType = targetType.copy(field.type().isNullable()); + } checkState( - DataTypeCasts.supportsExplicitCast( - field.type(), update.newDataType()) - && CastExecutors.resolve( - field.type(), update.newDataType()) + DataTypeCasts.supportsExplicitCast(field.type(), targetType) + && CastExecutors.resolve(field.type(), targetType) != null, String.format( "Column type %s[%s] cannot be converted to %s without loosing information.", - field.name(), field.type(), update.newDataType())); + field.name(), field.type(), targetType)); AtomicInteger dummyId = new AtomicInteger(0); if (dummyId.get() != 0) { throw new RuntimeException( String.format( "Update column to nested row type '%s' is not supported.", - update.newDataType())); + targetType)); } return new DataField( - field.id(), - field.name(), - update.newDataType(), - field.description()); + field.id(), field.name(), targetType, field.description()); }); } else if (change instanceof UpdateColumnNullability) { UpdateColumnNullability update = (UpdateColumnNullability) change; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index a2ea6d0faa34..0306842a4a9d 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -356,7 +356,7 @@ private SchemaChange toSchemaChange(TableChange change) { TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change; validateAlterNestedField(update.fieldNames()); return SchemaChange.updateColumnType( - update.fieldNames()[0], toPaimonType(update.newDataType())); + update.fieldNames()[0], toPaimonType(update.newDataType()), true); } else if (change instanceof TableChange.UpdateColumnNullability) { TableChange.UpdateColumnNullability update = (TableChange.UpdateColumnNullability) change; diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java index ec82d8511d61..204404e1badd 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java @@ -178,6 +178,13 @@ protected static void createTable(String tableName) { tableName)); } + protected static void createTableWithNonNullColumn(String tableName) { + spark.sql( + String.format( + "CREATE TABLE paimon.default.%s (a INT NOT NULL, b BIGINT NOT NULL, c STRING) TBLPROPERTIES ('bucket' = '1', 'primary-key'='a', 'file.format'='avro')", + tableName)); + } + protected static FileStoreTable getTable(String tableName) { return FileStoreTableFactory.create( LocalFileIO.create(), @@ -239,4 +246,8 @@ protected String showCreateString(String table, String... fieldSpec) { protected String defaultShowCreateString(String table) { return showCreateString(table, "a INT NOT NULL", "b BIGINT", "c STRING"); } + + protected String defaultShowCreateStringWithNonNullColumn(String table) { + return showCreateString(table, "a INT NOT NULL", "b BIGINT NOT NULL", "c STRING"); + } } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 136da9572b65..7d94e7d5df73 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -331,11 +331,18 @@ public void testUpdateColumnPosition() { @Test public void testAlterColumnType() { - createTable("testAlterColumnType"); + createTableWithNonNullColumn("testAlterColumnType"); writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')"); + assertThatThrownBy( + () -> { + writeTable("testAlterColumnType", "(1, null, 'a')"); + }) + .hasMessageContaining("Cannot write null to non-null column(b)"); + List beforeAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList(); - assertThat(beforeAlter.toString()).contains(defaultShowCreateString("testAlterColumnType")); + assertThat(beforeAlter.toString()) + .contains(defaultShowCreateStringWithNonNullColumn("testAlterColumnType")); spark.sql("ALTER TABLE testAlterColumnType ALTER COLUMN b TYPE DOUBLE"); assertThat(spark.table("testAlterColumnType").collectAsList().toString()) @@ -345,7 +352,15 @@ public void testAlterColumnType() { assertThat(afterAlter.toString()) .contains( showCreateString( - "testAlterColumnType", "a INT NOT NULL", "b DOUBLE", "c STRING")); + "testAlterColumnType", + "a INT NOT NULL", + "b DOUBLE NOT NULL", + "c STRING")); + assertThatThrownBy( + () -> { + writeTable("testAlterColumnType", "(1, null, 'a')"); + }) + .hasMessageContaining("Cannot write null to non-null column(b)"); } @Test