Skip to content

Commit

Permalink
[core][spark] Fix create external table with schema evolution (#4719)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Dec 16, 2024
1 parent 7767020 commit c9eafb6
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 21 deletions.
15 changes: 15 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ public boolean equals(Object o) {
return elementType.equals(arrayType.elementType);
}

@Override
public boolean equalsIgnoreFieldId(Object o) {
    // An instance always matches itself.
    if (o == this) {
        return true;
    }
    // Reject null, a different concrete class, or a mismatch in the base
    // DataType state (nullability / type root) checked by super.equals.
    if (o == null || o.getClass() != getClass() || !super.equals(o)) {
        return false;
    }
    // Recurse into the element type, ignoring nested field ids as well.
    return elementType.equalsIgnoreFieldId(((ArrayType) o).elementType);
}

@Override
public boolean isPrunedFrom(Object o) {
if (this == o) {
Expand Down
26 changes: 19 additions & 7 deletions paimon-common/src/main/java/org/apache/paimon/types/DataField.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,29 @@ public boolean equals(Object o) {
&& Objects.equals(description, field.description);
}

public boolean isPrunedFrom(DataField field) {
if (this == field) {
public boolean equalsIgnoreFieldId(DataField other) {
if (this == other) {
return true;
}
if (field == null) {
if (other == null) {
return false;
}
return Objects.equals(id, field.id)
&& Objects.equals(name, field.name)
&& type.isPrunedFrom(field.type)
&& Objects.equals(description, field.description);
return Objects.equals(name, other.name)
&& type.equalsIgnoreFieldId(other.type)
&& Objects.equals(description, other.description);
}

/**
 * Returns whether this field equals {@code other} or is a pruned view of it:
 * id, name and description must match exactly, while the type comparison is
 * delegated to {@code DataType.isPrunedFrom}, which accepts a subset of nested
 * fields.
 */
public boolean isPrunedFrom(DataField other) {
    if (other == this) {
        return true;
    }
    return other != null
            && Objects.equals(id, other.id)
            && Objects.equals(name, other.name)
            && type.isPrunedFrom(other.type)
            && Objects.equals(description, other.description);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ public boolean equals(Object o) {
return isNullable == that.isNullable && typeRoot == that.typeRoot;
}

/**
 * Compares this type with {@code o} while ignoring the ids of any nested fields.
 * The base implementation simply delegates to {@link #equals(Object)} because a
 * plain {@code DataType} carries no fields of its own; composite types such as
 * array, map and row types override this to recurse without comparing field ids.
 */
public boolean equalsIgnoreFieldId(Object o) {
    return equals(o);
}

/**
* Determine whether the current type is the result of the target type after pruning (e.g.
* select some fields from a nested type) or just the same.
Expand Down
16 changes: 16 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/MapType.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ public boolean equals(Object o) {
return keyType.equals(mapType.keyType) && valueType.equals(mapType.valueType);
}

@Override
public boolean equalsIgnoreFieldId(Object o) {
    // Same instance: trivially equal.
    if (o == this) {
        return true;
    }
    // Must be a MapType whose base DataType state (nullability / type root,
    // via super.equals) also matches.
    if (o == null || o.getClass() != getClass() || !super.equals(o)) {
        return false;
    }
    MapType that = (MapType) o;
    // Key and value types are compared recursively, ignoring nested field ids.
    return keyType.equalsIgnoreFieldId(that.keyType)
            && valueType.equalsIgnoreFieldId(that.valueType);
}

@Override
public boolean isPrunedFrom(Object o) {
if (this == o) {
Expand Down
24 changes: 18 additions & 6 deletions paimon-common/src/main/java/org/apache/paimon/types/RowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,25 @@ public boolean equals(Object o) {
return false;
}
RowType rowType = (RowType) o;
// For nested RowTypes e.g. DataField.dataType = RowType we need to ignoreIds as they can be
// different
if (fields.size() != rowType.fields.size()) {
return fields.equals(rowType.fields);
}

public boolean equalsIgnoreFieldId(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
for (int i = 0; i < fields.size(); ++i) {
if (!DataField.dataFieldEqualsIgnoreId(fields.get(i), rowType.fields.get(i))) {
if (!super.equals(o)) {
return false;
}
RowType other = (RowType) o;
if (fields.size() != other.fields.size()) {
return false;
}
for (int i = 0; i < fields.size(); i++) {
if (!fields.get(i).equalsIgnoreFieldId(other.fields.get(i))) {
return false;
}
}
Expand All @@ -236,7 +248,7 @@ public boolean isPrunedFrom(Object o) {
}
RowType rowType = (RowType) o;
for (DataField field : fields) {
if (!field.isPrunedFrom(rowType.getField(field.name()))) {
if (!field.isPrunedFrom(rowType.getField(field.id()))) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public Schema(
this.partitionKeys = normalizePartitionKeys(partitionKeys);
this.primaryKeys = normalizePrimaryKeys(primaryKeys);
this.fields = normalizeFields(fields, this.primaryKeys, this.partitionKeys);

this.comment = comment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public TableSchema createTable(Schema schema, boolean externalTable) throws Exce
if (latest.isPresent()) {
TableSchema latestSchema = latest.get();
if (externalTable) {
checkSchemaForExternalTable(latestSchema, schema);
checkSchemaForExternalTable(latestSchema.toSchema(), schema);
return latestSchema;
} else {
throw new IllegalStateException(
Expand Down Expand Up @@ -248,14 +248,15 @@ public TableSchema createTable(Schema schema, boolean externalTable) throws Exce
}
}

private void checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
private void checkSchemaForExternalTable(Schema existsSchema, Schema newSchema) {
// When creating an external table, if the table already exists in the location, we can
// choose not to specify the fields.
if (newSchema.fields().isEmpty()
// When the fields are explicitly specified, we need check for consistency.
|| (Objects.equals(existsSchema.fields(), newSchema.fields())
&& Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
&& Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
if ((newSchema.fields().isEmpty()
|| newSchema.rowType().equalsIgnoreFieldId(existsSchema.rowType()))
&& (newSchema.partitionKeys().isEmpty()
|| Objects.equals(newSchema.partitionKeys(), existsSchema.partitionKeys()))
&& (newSchema.primaryKeys().isEmpty()
|| Objects.equals(newSchema.primaryKeys(), existsSchema.primaryKeys()))) {
// check for options
Map<String, String> existsOptions = existsSchema.options();
Map<String, String> newOptions = newSchema.options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ public static List<DataField> newFields(RowType rowType) {
return rowType.getFields();
}

/**
 * Builds a {@code Schema} from this table schema's fields, partition keys,
 * primary keys, options and comment.
 */
public Schema toSchema() {
    return new Schema(fields, partitionKeys, primaryKeys, options, comment);
}

// =================== Utils for reading =========================

public static TableSchema fromJson(String json) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,63 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}

test("Paimon DDL with hive catalog: create external table with schema evolution") {
  // Exercise the scenario against both catalog implementations.
  for (catalogName <- Seq(sparkCatalogName, paimonHiveCatalogName)) {
    spark.sql(s"USE $catalogName")
    withTempDir { tempDir =>
      withDatabase("paimon_db") {
        spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
        spark.sql(s"USE paimon_db")
        withTable("t1", "t2") {
          val tableLocation = tempDir.getCanonicalPath

          // Create a table at a fixed location, then evolve its schema by
          // dropping and re-adding columns, both at top level (b) and inside
          // the nested struct (c.f2).
          spark.sql(
            s"""
               |CREATE TABLE t1 (a INT, b INT, c STRUCT<f1: INT, f2: INT, f3: INT>) USING paimon
               |LOCATION '$tableLocation'
               |""".stripMargin)
          spark.sql("INSERT INTO t1 VALUES (1, 1, STRUCT(1, 1, 1))")
          spark.sql("ALTER TABLE t1 DROP COLUMN b")
          spark.sql("ALTER TABLE t1 ADD COLUMN b INT")
          spark.sql("ALTER TABLE t1 DROP COLUMN c.f2")
          spark.sql("ALTER TABLE t1 ADD COLUMN c.f2 INT")
          spark.sql("INSERT INTO t1 VALUES (2, STRUCT(1, 1, 1), 1)")
          checkAnswer(
            spark.sql("SELECT * FROM t1 ORDER by a"),
            Seq(Row(1, Row(1, 1, null), null), Row(2, Row(1, 1, 1), 1)))

          // Declaring an external table at the same location with the evolved
          // column order must succeed and read the same data.
          spark.sql(
            s"""
               |CREATE TABLE t2 (a INT, c STRUCT<f1: INT, f3: INT, f2: INT>, b INT) USING paimon
               |LOCATION '$tableLocation'
               |""".stripMargin)
          checkAnswer(
            spark.sql("SELECT * FROM t2 ORDER by a"),
            Seq(Row(1, Row(1, 1, null), null), Row(2, Row(1, 1, 1), 1)))

          // A declared schema that does not match the stored one (wrong column
          // order at top level or inside the struct) must be rejected.
          intercept[Exception] {
            spark.sql(
              s"""
                 |CREATE TABLE t3 (a INT, b INT, c STRUCT<f1: INT, f3: INT, f2: INT>) USING paimon
                 |LOCATION '$tableLocation'
                 |""".stripMargin)
          }

          intercept[Exception] {
            spark.sql(
              s"""
                 |CREATE TABLE t4 (a INT, c STRUCT<f1: INT, f2: INT, f3: INT>, b INT) USING paimon
                 |LOCATION '$tableLocation'
                 |""".stripMargin)
          }
        }
      }
    }
  }
}

def getDatabaseProp(dbName: String, propertyName: String): String = {
spark
.sql(s"DESC DATABASE EXTENDED $dbName")
Expand Down

0 comments on commit c9eafb6

Please sign in to comment.