From 5b8db579494bc20b54841489ef997c44f910639e Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 11 Nov 2024 15:46:51 +0800 Subject: [PATCH 1/3] [core][spark] Support updating nested column types in Spark --- .../apache/paimon/schema/SchemaChange.java | 25 ++++++---- .../apache/paimon/schema/SchemaManager.java | 28 ++++------- .../paimon/schema/SchemaManagerTest.java | 49 +++++++++++++++++++ .../org/apache/paimon/spark/SparkCatalog.java | 10 +--- .../spark/SparkSchemaEvolutionITCase.java | 33 +++++++++++++ 5 files changed, 108 insertions(+), 37 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 7e94b0a776c2..1c1d601bced8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -83,12 +83,18 @@ static SchemaChange dropColumn(List fieldNames) { } static SchemaChange updateColumnType(String fieldName, DataType newDataType) { - return new UpdateColumnType(fieldName, newDataType, false); + return new UpdateColumnType(Collections.singletonList(fieldName), newDataType, false); } static SchemaChange updateColumnType( String fieldName, DataType newDataType, boolean keepNullability) { - return new UpdateColumnType(fieldName, newDataType, keepNullability); + return new UpdateColumnType( + Collections.singletonList(fieldName), newDataType, keepNullability); + } + + static SchemaChange updateColumnType( + List fieldNames, DataType newDataType, boolean keepNullability) { + return new UpdateColumnType(fieldNames, newDataType, keepNullability); } static SchemaChange updateColumnNullability(String fieldName, boolean newNullability) { @@ -357,19 +363,20 @@ final class UpdateColumnType implements SchemaChange { private static final long serialVersionUID = 1L; - private final String fieldName; + private final List fieldNames; private final DataType newDataType; // If true, do not change the target field nullability private final boolean keepNullability; - private UpdateColumnType(String fieldName, DataType newDataType, boolean keepNullability) { - this.fieldName = fieldName; + private UpdateColumnType( + List fieldNames, DataType newDataType, boolean keepNullability) { + this.fieldNames = fieldNames; this.newDataType = newDataType; this.keepNullability = keepNullability; } - public String fieldName() { - return fieldName; + public List fieldNames() { + return fieldNames; } public DataType newDataType() { @@ -389,14 +396,14 @@ public boolean equals(Object o) { return false; } UpdateColumnType that = (UpdateColumnType) o; - return Objects.equals(fieldName, that.fieldName) + return Objects.equals(fieldNames, that.fieldNames) && newDataType.equals(that.newDataType); } @Override public int hashCode() { int result = Objects.hash(newDataType); - result = 31 * result + Objects.hashCode(fieldName); + result = 31 * result + Objects.hashCode(fieldNames); return result; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 5ffeca65d0c1..86ed96d5b01b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -319,7 +319,7 @@ protected void updateLastColumn(List newFields, String fieldName) }.updateIntermediateColumn(newFields, 0); } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change; - renameColumnValidation(oldTableSchema, rename); + assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename"); new NestedColumnModifier(rename.fieldNames().toArray(new String[0])) { @Override protected void updateLastColumn(List newFields, String fieldName) @@ -361,15 +361,10 @@ protected void updateLastColumn(List newFields, String fieldName) }.updateIntermediateColumn(newFields, 0); } else if (change instanceof UpdateColumnType) { UpdateColumnType update = (UpdateColumnType) change; - if (oldTableSchema.partitionKeys().contains(update.fieldName())) { - throw new IllegalArgumentException( - String.format( - "Cannot update partition column [%s] type in the table[%s].", - update.fieldName(), tableRoot.getName())); - } + assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update"); updateNestedColumn( newFields, - new String[] {update.fieldName()}, + update.fieldNames().toArray(new String[0]), (field) -> { DataType targetType = update.newDataType(); if (update.keepNullability()) { @@ -382,13 +377,6 @@ protected void updateLastColumn(List newFields, String fieldName) String.format( "Column type %s[%s] cannot be converted to %s without loosing information.", field.name(), field.type(), targetType)); - AtomicInteger dummyId = new AtomicInteger(0); - if (dummyId.get() != 0) { - throw new RuntimeException( - String.format( - "Update column to nested row type '%s' is not supported.", - targetType)); - } return new DataField( field.id(), field.name(), targetType, field.description()); }); @@ -594,15 +582,17 @@ private static void dropColumnValidation(TableSchema schema, DropColumn change) } } - private static void renameColumnValidation(TableSchema schema, RenameColumn change) { + private static void assertNotUpdatingPrimaryKeys( + TableSchema schema, List fieldNames, String operation) { // partition keys can't be nested columns - if (change.fieldNames().size() > 1) { + if (fieldNames.size() > 1) { return; } - String columnToRename = change.fieldNames().get(0); + String columnToRename = fieldNames.get(0); if (schema.partitionKeys().contains(columnToRename)) { throw new UnsupportedOperationException( - String.format("Cannot rename partition column: [%s]", columnToRename)); + String.format( + "Cannot " + operation + " partition column: [%s]", columnToRename)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 5fb76387eb2b..ac8d4cd91e1d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -663,4 +663,53 @@ public void testRenameNestedColumns() throws Exception { assertThatCode(() -> manager.commitChanges(newNameAlreadyExistRenameColumn)) .hasMessageContaining("Column v.f2.f100 already exists"); } + + @Test + public void testUpdateNestedColumnType() throws Exception { + RowType innerType = + RowType.of( + new DataField(4, "f1", DataTypes.INT()), + new DataField(5, "f2", DataTypes.BIGINT())); + RowType middleType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", innerType)); + RowType outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType)); + + Schema schema = + new Schema( + outerType.getFields(), + Collections.singletonList("k"), + Collections.emptyList(), + new HashMap<>(), + ""); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), path); + manager.createTable(schema); + + SchemaChange updateColumnType = + SchemaChange.updateColumnType( + Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), true); + manager.commitChanges(updateColumnType); + + innerType = + RowType.of( + new DataField(4, "f1", DataTypes.BIGINT()), + new DataField(5, "f2", DataTypes.BIGINT())); + middleType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", innerType)); + outerType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType)); + assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType); + + SchemaChange middleColumnNotExistUpdateColumnType = + SchemaChange.updateColumnType( + Arrays.asList("v", "invalid", "f1"), DataTypes.BIGINT(), true); + assertThatCode(() -> manager.commitChanges(middleColumnNotExistUpdateColumnType)) + .hasMessageContaining("Column v.invalid does not exist"); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 82c8939eab87..5fde2c56596f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -385,9 +385,8 @@ private SchemaChange toSchemaChange(TableChange change) { return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames())); } else if (change instanceof TableChange.UpdateColumnType) { TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change; - validateAlterNestedField(update.fieldNames()); return SchemaChange.updateColumnType( - update.fieldNames()[0], toPaimonType(update.newDataType()), true); + Arrays.asList(update.fieldNames()), toPaimonType(update.newDataType()), true); } else if (change instanceof TableChange.UpdateColumnNullability) { TableChange.UpdateColumnNullability update = (TableChange.UpdateColumnNullability) change; @@ -449,13 +448,6 @@ private Schema toInitialSchema( return schemaBuilder.build(); } - private void validateAlterNestedField(String[] fieldNames) { - if (fieldNames.length > 1) { - throw new UnsupportedOperationException( - "Alter nested column is not supported: " + Arrays.toString(fieldNames)); - } - } - private void validateAlterProperty(String alterKey) { if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) { throw new UnsupportedOperationException("Alter primary key is not supported"); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index ccae59e88675..771ddc62878d 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -817,4 +817,37 @@ public void testRenameNestedColumn(String formatType) { .map(Row::toString)) .containsExactlyInAnyOrder("[apple,1]", "[banana,2]"); } + + @ParameterizedTest() + @ValueSource(strings = {"orc", "avro", "parquet"}) + public void testUpdateNestedColumnType(String formatType) { + String tableName = "testRenameNestedColumnTable"; + spark.sql( + "CREATE TABLE paimon.default." + + tableName + + " (k INT NOT NULL, v STRUCT>) " + + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '" + + formatType + + "')"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))"); + assertThat( + spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[100,1]", "[200,2]"); + + spark.sql("ALTER TABLE paimon.default." + tableName + " CHANGE COLUMN v.f2.f2 f2 BIGINT"); + spark.sql( + "INSERT INTO paimon.default." + + tableName + + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101))), (3, STRUCT(31, STRUCT('CHERRY', 3000000000000)))"); + assertThat( + spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName) + .collectAsList().stream() + .map(Row::toString)) + .containsExactlyInAnyOrder("[101,1]", "[200,2]", "[3000000000000,3]"); + } } From 4ed3bb23c3ee0fc1e741a1263465ac7f2260a088 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 11 Nov 2024 18:58:52 +0800 Subject: [PATCH 2/3] [fix] Fix compile --- .../sink/cdc/UpdatedDataFieldsProcessFunctionBase.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index 77c49e8f3da2..0e93fdb07324 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -100,6 +100,9 @@ protected void applySchemaChange( } else if (schemaChange instanceof SchemaChange.UpdateColumnType) { SchemaChange.UpdateColumnType updateColumnType = (SchemaChange.UpdateColumnType) schemaChange; + Preconditions.checkState( + updateColumnType.fieldNames().size() == 1, + "Paimon CDC currently does not support nested type schema evolution."); TableSchema schema = schemaManager .latest() @@ -107,11 +110,11 @@ protected void applySchemaChange( () -> new RuntimeException( "Table does not exist. This is unexpected.")); - int idx = schema.fieldNames().indexOf(updateColumnType.fieldName()); + int idx = schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0)); Preconditions.checkState( idx >= 0, "Field name " - + updateColumnType.fieldName() + + updateColumnType.fieldNames().get(0) + " does not exist in table. This is unexpected."); DataType oldType = schema.fields().get(idx).type(); DataType newType = updateColumnType.newDataType(); @@ -123,7 +126,7 @@ protected void applySchemaChange( throw new UnsupportedOperationException( String.format( "Cannot convert field %s from type %s to %s of Paimon table %s.", - updateColumnType.fieldName(), + updateColumnType.fieldNames().get(0), oldType, newType, identifier.getFullName())); From 1d3a626ed7afe236a03a4ed5bd489354df6645f8 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 11 Nov 2024 19:23:21 +0800 Subject: [PATCH 3/3] [fix] Fix failed tests --- .../test/java/org/apache/paimon/catalog/CatalogTestBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 3bba6d5624cd..f130920a7c0e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -675,8 +675,8 @@ public void testAlterTableUpdateColumnType() throws Exception { false)) .satisfies( anyCauseMatches( - IllegalArgumentException.class, - "Cannot update partition column [dt] type in the table")); + UnsupportedOperationException.class, + "Cannot update partition column: [dt]")); } @Test