From c9eafb61216114c1e6dfed3493293b5485dbc3a4 Mon Sep 17 00:00:00 2001
From: Zouxxyy
Date: Mon, 16 Dec 2024 19:42:52 +0800
Subject: [PATCH] [core][spark] Fix create external table with schema evolution (#4719)

---
 .../org/apache/paimon/types/ArrayType.java    | 15 +++++
 .../org/apache/paimon/types/DataField.java    | 26 ++++++---
 .../org/apache/paimon/types/DataType.java     |  4 ++
 .../java/org/apache/paimon/types/MapType.java | 16 ++++++
 .../java/org/apache/paimon/types/RowType.java | 24 ++++++--
 .../java/org/apache/paimon/schema/Schema.java |  1 -
 .../apache/paimon/schema/SchemaManager.java   | 15 ++---
 .../org/apache/paimon/schema/TableSchema.java |  4 ++
 .../sql/DDLWithHiveCatalogTestBase.scala      | 57 +++++++++++++++++++
 9 files changed, 141 insertions(+), 21 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java b/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
index 00cf0c072886..62fb9ce65b69 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
@@ -100,6 +100,21 @@ public boolean equals(Object o) {
         return elementType.equals(arrayType.elementType);
     }
 
+    @Override
+    public boolean equalsIgnoreFieldId(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        ArrayType arrayType = (ArrayType) o;
+        return elementType.equalsIgnoreFieldId(arrayType.elementType);
+    }
+
     @Override
     public boolean isPrunedFrom(Object o) {
         if (this == o) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
index e9052684f33f..209118023ba5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
@@ -148,17 +148,29 @@ public boolean equals(Object o) {
                 && Objects.equals(description, field.description);
     }
 
-    public boolean isPrunedFrom(DataField field) {
-        if (this == field) {
+    public boolean equalsIgnoreFieldId(DataField other) {
+        if (this == other) {
             return true;
         }
-        if (field == null) {
+        if (other == null) {
             return false;
         }
-        return Objects.equals(id, field.id)
-                && Objects.equals(name, field.name)
-                && type.isPrunedFrom(field.type)
-                && Objects.equals(description, field.description);
+        return Objects.equals(name, other.name)
+                && type.equalsIgnoreFieldId(other.type)
+                && Objects.equals(description, other.description);
+    }
+
+    public boolean isPrunedFrom(DataField other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null) {
+            return false;
+        }
+        return Objects.equals(id, other.id)
+                && Objects.equals(name, other.name)
+                && type.isPrunedFrom(other.type)
+                && Objects.equals(description, other.description);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataType.java b/paimon-common/src/main/java/org/apache/paimon/types/DataType.java
index aff150090e63..dd9a4685ef5b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataType.java
@@ -145,6 +145,10 @@ public boolean equals(Object o) {
         return isNullable == that.isNullable && typeRoot == that.typeRoot;
     }
 
+    public boolean equalsIgnoreFieldId(Object o) {
+        return equals(o);
+    }
+
     /**
      * Determine whether the current type is the result of the target type after pruning (e.g.
      * select some fields from a nested type) or just the same.
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/MapType.java b/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
index 57cb1a9724b3..b715d49284fa 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
@@ -109,6 +109,22 @@ public boolean equals(Object o) {
         return keyType.equals(mapType.keyType) && valueType.equals(mapType.valueType);
     }
 
+    @Override
+    public boolean equalsIgnoreFieldId(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        MapType mapType = (MapType) o;
+        return keyType.equalsIgnoreFieldId(mapType.keyType)
+                && valueType.equalsIgnoreFieldId(mapType.valueType);
+    }
+
     @Override
     public boolean isPrunedFrom(Object o) {
         if (this == o) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index f3fce0db6df1..625a4634b320 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -210,13 +210,25 @@ public boolean equals(Object o) {
             return false;
         }
         RowType rowType = (RowType) o;
-        // For nested RowTypes e.g. DataField.dataType = RowType we need to ignoreIds as they can be
-        // different
-        if (fields.size() != rowType.fields.size()) {
+        return fields.equals(rowType.fields);
+    }
+
+    public boolean equalsIgnoreFieldId(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        for (int i = 0; i < fields.size(); ++i) {
-            if (!DataField.dataFieldEqualsIgnoreId(fields.get(i), rowType.fields.get(i))) {
+        if (!super.equals(o)) {
+            return false;
+        }
+        RowType other = (RowType) o;
+        if (fields.size() != other.fields.size()) {
+            return false;
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            if (!fields.get(i).equalsIgnoreFieldId(other.fields.get(i))) {
                 return false;
             }
         }
@@ -236,7 +248,7 @@ public boolean isPrunedFrom(Object o) {
         }
         RowType rowType = (RowType) o;
         for (DataField field : fields) {
-            if (!field.isPrunedFrom(rowType.getField(field.name()))) {
+            if (!field.isPrunedFrom(rowType.getField(field.id()))) {
                 return false;
             }
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 33309a7cecc9..a3b30d81a3dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -68,7 +68,6 @@ public Schema(
         this.partitionKeys = normalizePartitionKeys(partitionKeys);
         this.primaryKeys = normalizePrimaryKeys(primaryKeys);
         this.fields = normalizeFields(fields, this.primaryKeys, this.partitionKeys);
-
         this.comment = comment;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 2139dca4a990..325e0c392d12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -214,7 +214,7 @@ public TableSchema createTable(Schema schema, boolean externalTable) throws Exce
         if (latest.isPresent()) {
             TableSchema latestSchema = latest.get();
             if (externalTable) {
-                checkSchemaForExternalTable(latestSchema, schema);
+                checkSchemaForExternalTable(latestSchema.toSchema(), schema);
                 return latestSchema;
             } else {
                 throw new IllegalStateException(
@@ -248,14 +248,15 @@ public TableSchema createTable(Schema schema, boolean externalTable) throws Exce
         }
     }
 
-    private void checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
+    private void checkSchemaForExternalTable(Schema existsSchema, Schema newSchema) {
         // When creating an external table, if the table already exists in the location, we can
         // choose not to specify the fields.
-        if (newSchema.fields().isEmpty()
-                // When the fields are explicitly specified, we need check for consistency.
-                || (Objects.equals(existsSchema.fields(), newSchema.fields())
-                        && Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
-                        && Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
+        if ((newSchema.fields().isEmpty()
+                        || newSchema.rowType().equalsIgnoreFieldId(existsSchema.rowType()))
+                && (newSchema.partitionKeys().isEmpty()
+                        || Objects.equals(newSchema.partitionKeys(), existsSchema.partitionKeys()))
+                && (newSchema.primaryKeys().isEmpty()
+                        || Objects.equals(newSchema.primaryKeys(), existsSchema.primaryKeys()))) {
             // check for options
             Map<String, String> existsOptions = existsSchema.options();
             Map<String, String> newOptions = newSchema.options();
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index a0a149d1ae9b..791269dc73b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -331,6 +331,10 @@ public static List<DataField> newFields(RowType rowType) {
         return rowType.getFields();
     }
 
+    public Schema toSchema() {
+        return new Schema(fields, partitionKeys, primaryKeys, options, comment);
+    }
+
     // =================== Utils for reading =========================
 
     public static TableSchema fromJson(String json) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 1189f1f2906b..4ba079ea0bb2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -434,6 +434,63 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
     }
   }
 
+  test("Paimon DDL with hive catalog: create external table with schema evolution") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        spark.sql(s"USE $catalogName")
+        withTempDir {
+          tbLocation =>
+            withDatabase("paimon_db") {
+              spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
+              spark.sql(s"USE paimon_db")
+              withTable("t1", "t2") {
+                val expertTbLocation = tbLocation.getCanonicalPath
+                spark.sql(
+                  s"""
+                     |CREATE TABLE t1 (a INT, b INT, c STRUCT<f1: INT, f2: INT, f3: INT>) USING paimon
+                     |LOCATION '$expertTbLocation'
+                     |""".stripMargin)
+                spark.sql("INSERT INTO t1 VALUES (1, 1, STRUCT(1, 1, 1))")
+                spark.sql("ALTER TABLE t1 DROP COLUMN b")
+                spark.sql("ALTER TABLE t1 ADD COLUMN b INT")
+                spark.sql("ALTER TABLE t1 DROP COLUMN c.f2")
+                spark.sql("ALTER TABLE t1 ADD COLUMN c.f2 INT")
+                spark.sql("INSERT INTO t1 VALUES (2, STRUCT(1, 1, 1), 1)")
+                checkAnswer(
+                  spark.sql("SELECT * FROM t1 ORDER by a"),
+                  Seq(Row(1, Row(1, 1, null), null), Row(2, Row(1, 1, 1), 1)))
+
+                spark.sql(
+                  s"""
+                     |CREATE TABLE t2 (a INT, c STRUCT<f1: INT, f3: INT, f2: INT>, b INT) USING paimon
+                     |LOCATION '$expertTbLocation'
+                     |""".stripMargin)
+                checkAnswer(
+                  spark.sql("SELECT * FROM t2 ORDER by a"),
+                  Seq(Row(1, Row(1, 1, null), null), Row(2, Row(1, 1, 1), 1)))
+
+                // create table with wrong schema
+                intercept[Exception] {
+                  spark.sql(
+                    s"""
+                       |CREATE TABLE t3 (a INT, b INT, c STRUCT<f1: INT, f3: INT, f2: INT>) USING paimon
+                       |LOCATION '$expertTbLocation'
+                       |""".stripMargin)
+                }
+
+                intercept[Exception] {
+                  spark.sql(
+                    s"""
+                       |CREATE TABLE t4 (a INT, c STRUCT<f1: INT, f2: INT, f3: INT>, b INT) USING paimon
+                       |LOCATION '$expertTbLocation'
+                       |""".stripMargin)
+                }
+              }
+            }
+        }
+    }
+  }
+
   def getDatabaseProp(dbName: String, propertyName: String): String = {
     spark
       .sql(s"DESC DATABASE EXTENDED $dbName")