diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md index 8e4d3c90176b..8a1637d7e96e 100644 --- a/docs/content/migration/iceberg-compatibility.md +++ b/docs/content/migration/iceberg-compatibility.md @@ -479,21 +479,24 @@ SELECT * FROM animals WHERE class = 'mammal'; Paimon Iceberg compatibility currently supports the following data types. -| Paimon Data Type | Iceberg Data Type | -|-------------------|-------------------| -| `BOOLEAN` | `boolean` | -| `INT` | `int` | -| `BIGINT` | `long` | -| `FLOAT` | `float` | -| `DOUBLE` | `double` | -| `DECIMAL` | `decimal` | -| `CHAR` | `string` | -| `VARCHAR` | `string` | -| `BINARY` | `binary` | -| `VARBINARY` | `binary` | -| `DATE` | `date` | -| `TIMESTAMP`* | `timestamp` | -| `TIMESTAMP_LTZ`* | `timestamptz` | +| Paimon Data Type | Iceberg Data Type | +|------------------|-------------------| +| `BOOLEAN` | `boolean` | +| `INT` | `int` | +| `BIGINT` | `long` | +| `FLOAT` | `float` | +| `DOUBLE` | `double` | +| `DECIMAL` | `decimal` | +| `CHAR` | `string` | +| `VARCHAR` | `string` | +| `BINARY` | `binary` | +| `VARBINARY` | `binary` | +| `DATE` | `date` | +| `TIMESTAMP`* | `timestamp` | +| `TIMESTAMP_LTZ`* | `timestamptz` | +| `ARRAY` | `list` | +| `MAP` | `map` | +| `ROW` | `struct` | *: `TIMESTAMP` and `TIMESTAMP_LTZ` type only support precision from 4 to 6 diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index 7ea6cbe05777..f561546e8bb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -295,7 +295,8 @@ private List dataSplitToManifestEntries( rawFile.rowCount(), rawFile.fileSize(), schemaCache.get(paimonFileMeta.schemaId()), - paimonFileMeta.valueStats()); + paimonFileMeta.valueStats(), + paimonFileMeta.valueStatsCols()); result.add( new IcebergManifestEntry( IcebergManifestEntry.Status.ADDED, @@ -509,7 +510,8 @@ private List createNewlyAddedManifestFileMetas( paimonFileMeta.rowCount(), paimonFileMeta.fileSize(), schemaCache.get(paimonFileMeta.schemaId()), - paimonFileMeta.valueStats()); + paimonFileMeta.valueStats(), + paimonFileMeta.valueStatsCols()); return new IcebergManifestEntry( IcebergManifestEntry.Status.ADDED, currentSnapshotId, diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java index 10dbf3d237de..d171962becad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java @@ -22,13 +22,15 @@ import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.iceberg.metadata.IcebergDataField; import org.apache.paimon.iceberg.metadata.IcebergSchema; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -110,27 +112,44 @@ public static IcebergDataFileMeta create( long recordCount, long fileSizeInBytes, IcebergSchema icebergSchema, - SimpleStats stats) { + SimpleStats stats, + @Nullable List statsColumns) { + int numFields = icebergSchema.fields().size(); + Map indexMap = new HashMap<>(); + if (statsColumns == null) { + for (int i = 0; i < numFields; i++) { + indexMap.put(icebergSchema.fields().get(i).name(), i); + } + } else { + for (int i = 0; i < statsColumns.size(); i++) { + indexMap.put(statsColumns.get(i), i); + } + } + Map nullValueCounts = new HashMap<>(); Map lowerBounds = new HashMap<>(); Map upperBounds = new HashMap<>(); - List fieldGetters = new ArrayList<>(); - int numFields = icebergSchema.fields().size(); for (int i = 0; i < numFields; i++) { - fieldGetters.add( - InternalRow.createFieldGetter(icebergSchema.fields().get(i).dataType(), i)); - } + IcebergDataField field = icebergSchema.fields().get(i); + if (!indexMap.containsKey(field.name())) { + continue; + } - for (int i = 0; i < numFields; i++) { - int fieldId = icebergSchema.fields().get(i).id(); - DataType type = icebergSchema.fields().get(i).dataType(); - nullValueCounts.put(fieldId, stats.nullCounts().getLong(i)); - Object minValue = fieldGetters.get(i).getFieldOrNull(stats.minValues()); - Object maxValue = fieldGetters.get(i).getFieldOrNull(stats.maxValues()); + int idx = indexMap.get(field.name()); + nullValueCounts.put(field.id(), stats.nullCounts().getLong(idx)); + + InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(field.dataType(), idx); + Object minValue = fieldGetter.getFieldOrNull(stats.minValues()); + Object maxValue = fieldGetter.getFieldOrNull(stats.maxValues()); if (minValue != null && maxValue != null) { - lowerBounds.put(fieldId, IcebergConversions.toByteBuffer(type, minValue).array()); - upperBounds.put(fieldId, IcebergConversions.toByteBuffer(type, maxValue).array()); + lowerBounds.put( + field.id(), + IcebergConversions.toByteBuffer(field.dataType(), minValue).array()); + upperBounds.put( + field.id(), + IcebergConversions.toByteBuffer(field.dataType(), maxValue).array()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java index 93cb2ab6de23..4ecc77a13581 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java @@ -18,10 +18,14 @@ package org.apache.paimon.iceberg.metadata; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; import org.apache.paimon.utils.Preconditions; @@ -32,6 +36,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.Objects; +import java.util.stream.Collectors; /** * {@link DataField} in Iceberg. @@ -57,7 +62,7 @@ public class IcebergDataField { private final boolean required; @JsonProperty(FIELD_TYPE) - private final String type; + private final Object type; @JsonIgnore private final DataType dataType; @@ -69,7 +74,7 @@ public IcebergDataField(DataField dataField) { dataField.id(), dataField.name(), !dataField.type().isNullable(), - toTypeString(dataField.type()), + toTypeObject(dataField.type(), dataField.id(), 0), dataField.type(), dataField.description()); } @@ -79,13 +84,13 @@ public IcebergDataField( @JsonProperty(FIELD_ID) int id, @JsonProperty(FIELD_NAME) String name, @JsonProperty(FIELD_REQUIRED) boolean required, - @JsonProperty(FIELD_TYPE) String type, + @JsonProperty(FIELD_TYPE) Object type, @JsonProperty(FIELD_DOC) String doc) { this(id, name, required, type, null, doc); } public IcebergDataField( - int id, String name, boolean required, String type, DataType dataType, String doc) { + int id, String name, boolean required, Object type, DataType dataType, String doc) { this.id = id; this.name = name; this.required = required; @@ -110,7 +115,7 @@ public boolean required() { } @JsonGetter(FIELD_TYPE) - public String type() { + public Object type() { return type; } @@ -124,7 +129,7 @@ public DataType dataType() { return Preconditions.checkNotNull(dataType); } - private static String toTypeString(DataType dataType) { + private static Object toTypeObject(DataType dataType, int fieldId, int depth) { switch (dataType.getTypeRoot()) { case BOOLEAN: return "boolean"; @@ -160,6 +165,26 @@ private static String toTypeString(DataType dataType) { timestampLtzPrecision > 3 && timestampLtzPrecision <= 6, "Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6."); return "timestamptz"; + case ARRAY: + ArrayType arrayType = (ArrayType) dataType; + return new IcebergListType( + SpecialFields.getArrayElementFieldId(fieldId, depth + 1), + !dataType.isNullable(), + toTypeObject(arrayType.getElementType(), fieldId, depth + 1)); + case MAP: + MapType mapType = (MapType) dataType; + return new IcebergMapType( + SpecialFields.getMapKeyFieldId(fieldId, depth + 1), + toTypeObject(mapType.getKeyType(), fieldId, depth + 1), + SpecialFields.getMapValueFieldId(fieldId, depth + 1), + !mapType.getValueType().isNullable(), + toTypeObject(mapType.getValueType(), fieldId, depth + 1)); + case ROW: + RowType rowType = (RowType) dataType; + return new IcebergStructType( + rowType.getFields().stream() + .map(IcebergDataField::new) + .collect(Collectors.toList())); default: throw new UnsupportedOperationException("Unsupported data type: " + dataType); } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergListType.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergListType.java new file mode 100644 index 000000000000..d25ead64fcb5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergListType.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * {@link org.apache.paimon.types.ArrayType} in Iceberg. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergListType { + + private static final String FIELD_TYPE = "type"; + private static final String FIELD_ELEMENT_ID = "element-id"; + private static final String FIELD_ELEMENT_REQUIRED = "element-required"; + private static final String FIELD_ELEMENT = "element"; + + @JsonProperty(FIELD_TYPE) + private final String type; + + @JsonProperty(FIELD_ELEMENT_ID) + private final int elementId; + + @JsonProperty(FIELD_ELEMENT_REQUIRED) + private final boolean elementRequired; + + @JsonProperty(FIELD_ELEMENT) + private final Object element; + + public IcebergListType(int elementId, boolean elementRequired, Object element) { + this("list", elementId, elementRequired, element); + } + + @JsonCreator + public IcebergListType( + @JsonProperty(FIELD_TYPE) String type, + @JsonProperty(FIELD_ELEMENT_ID) int elementId, + @JsonProperty(FIELD_ELEMENT_REQUIRED) boolean elementRequired, + @JsonProperty(FIELD_ELEMENT) Object element) { + this.type = type; + this.elementId = elementId; + this.elementRequired = elementRequired; + this.element = element; + } + + @JsonGetter(FIELD_TYPE) + public String type() { + return type; + } + + @JsonGetter(FIELD_ELEMENT_ID) + public int elementId() { + return elementId; + } + + @JsonGetter(FIELD_ELEMENT_REQUIRED) + public boolean elementRequired() { + return elementRequired; + } + + @JsonGetter(FIELD_ELEMENT) + public Object element() { + return element; + } + + @Override + public int hashCode() { + return Objects.hash(type, elementId, elementRequired, element); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergListType)) { + return false; + } + + IcebergListType that = (IcebergListType) o; + return Objects.equals(type, that.type) + && elementId == that.elementId + && elementRequired == that.elementRequired + && Objects.equals(element, that.element); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMapType.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMapType.java new file mode 100644 index 000000000000..81a3a04b1f41 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMapType.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * {@link org.apache.paimon.types.MapType} in Iceberg. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergMapType { + + private static final String FIELD_TYPE = "type"; + private static final String FIELD_KEY_ID = "key-id"; + private static final String FIELD_KEY = "key"; + private static final String FIELD_VALUE_ID = "value-id"; + private static final String FIELD_VALUE_REQUIRED = "value-required"; + private static final String FIELD_VALUE = "value"; + + @JsonProperty(FIELD_TYPE) + private final String type; + + @JsonProperty(FIELD_KEY_ID) + private final int keyId; + + @JsonProperty(FIELD_KEY) + private final Object key; + + @JsonProperty(FIELD_VALUE_ID) + private final int valueId; + + @JsonProperty(FIELD_VALUE_REQUIRED) + private final boolean valueRequired; + + @JsonProperty(FIELD_VALUE) + private final Object value; + + public IcebergMapType(int keyId, Object key, int valueId, boolean valueRequired, Object value) { + this("map", keyId, key, valueId, valueRequired, value); + } + + @JsonCreator + public IcebergMapType( + @JsonProperty(FIELD_TYPE) String type, + @JsonProperty(FIELD_KEY_ID) int keyId, + @JsonProperty(FIELD_KEY) Object key, + @JsonProperty(FIELD_VALUE_ID) int valueId, + @JsonProperty(FIELD_VALUE_REQUIRED) boolean valueRequired, + @JsonProperty(FIELD_VALUE) Object value) { + this.type = type; + this.keyId = keyId; + this.key = key; + this.valueId = valueId; + this.valueRequired = valueRequired; + this.value = value; + } + + @JsonGetter(FIELD_TYPE) + public String type() { + return type; + } + + @JsonGetter(FIELD_KEY_ID) + public int keyId() { + return keyId; + } + + @JsonGetter(FIELD_KEY) + public Object key() { + return key; + } + + @JsonGetter(FIELD_VALUE_ID) + public int valueId() { + return valueId; + } + + @JsonGetter(FIELD_VALUE_REQUIRED) + public boolean valueRequired() { + return valueRequired; + } + + @JsonGetter(FIELD_VALUE) + public Object value() { + return value; + } + + @Override + public int hashCode() { + return Objects.hash(type, keyId, key, valueId, valueRequired, value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergMapType)) { + return false; + } + IcebergMapType that = (IcebergMapType) o; + return Objects.equals(type, that.type) + && keyId == that.keyId + && Objects.equals(key, that.key) + && valueId == that.valueId + && valueRequired == that.valueRequired + && Objects.equals(value, that.value); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergStructType.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergStructType.java new file mode 100644 index 000000000000..84b0d430e438 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergStructType.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.metadata; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * {@link org.apache.paimon.types.RowType} in Iceberg. + * + *

See Iceberg spec. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class IcebergStructType { + + private static final String FIELD_TYPE = "type"; + private static final String FIELD_FIELDS = "fields"; + + @JsonProperty(FIELD_TYPE) + private final String type; + + @JsonProperty(FIELD_FIELDS) + private final List fields; + + public IcebergStructType(List fields) { + this("struct", fields); + } + + @JsonCreator + public IcebergStructType( + @JsonProperty(FIELD_TYPE) String type, + @JsonProperty(FIELD_FIELDS) List fields) { + this.type = type; + this.fields = fields; + } + + @JsonGetter(FIELD_TYPE) + public String type() { + return type; + } + + @JsonGetter(FIELD_FIELDS) + public List fields() { + return fields; + } + + @Override + public int hashCode() { + return Objects.hash(type, fields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergStructType)) { + return false; + } + + IcebergStructType that = (IcebergStructType) o; + return Objects.equals(type, that.type) && Objects.equals(fields, that.fields); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index e5b550ff94c4..7258a1dd4170 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -25,6 +25,8 @@ import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.disk.IOManagerImpl; @@ -44,6 +46,7 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DataTypes; @@ -542,6 +545,56 @@ public void testAllTypeStatistics() throws Exception { } } + @Test + public void testNestedTypes() throws Exception { + RowType innerType = + RowType.of( + new DataField(2, "f1", DataTypes.STRING()), + new DataField(3, "f2", DataTypes.INT())); + RowType rowType = + RowType.of( + new DataField(0, "k", DataTypes.INT()), + new DataField( + 1, + "v", + DataTypes.MAP(DataTypes.INT(), DataTypes.ARRAY(innerType)))); + FileStoreTable table = + createPaimonTable(rowType, Collections.emptyList(), Collections.emptyList(), -1); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + Map map1 = new HashMap<>(); + map1.put( + 10, + new GenericArray( + new GenericRow[] { + GenericRow.of(BinaryString.fromString("apple"), 100), + GenericRow.of(BinaryString.fromString("banana"), 101) + })); + write.write(GenericRow.of(1, new GenericMap(map1))); + + Map map2 = new HashMap<>(); + map2.put( + 20, + new GenericArray( + new GenericRow[] { + GenericRow.of(BinaryString.fromString("cherry"), 200), + GenericRow.of(BinaryString.fromString("pear"), 201) + })); + write.write(GenericRow.of(2, new GenericMap(map2))); + + commit.commit(1, write.prepareCommit(false, 1)); + write.close(); + commit.close(); + + assertThat(getIcebergResult()) + .containsExactlyInAnyOrder( + "Record(1, {10=[Record(apple, 100), Record(banana, 101)]})", + "Record(2, {20=[Record(cherry, 200), Record(pear, 201)]})"); + } + // ------------------------------------------------------------------------ // Random Tests // ------------------------------------------------------------------------ diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java index 0097700c70fa..3001fefe4bb3 100644 --- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java +++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java @@ -19,4 +19,11 @@ package org.apache.paimon.flink.iceberg; /** IT cases for Paimon Iceberg compatibility in Flink 1.16. */ -public class Flink116IcebergITCase extends FlinkIcebergITCaseBase {} +public class Flink116IcebergITCase extends FlinkIcebergITCaseBase { + + @Override + public void testNestedTypes(String format) { + // Flink 1.16 (or maybe Calcite?) will mistakenly cast the result to VARCHAR(5), + // so we skip this test in Flink 1.16. + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java index 413a404c41ed..9202cfb8fefb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java @@ -404,6 +404,61 @@ public void testDropAndRecreateTable(String format) throws Exception { .containsExactlyInAnyOrder(Row.of("munich"), Row.of("cologne")); } + @ParameterizedTest + @ValueSource(strings = {"orc", "parquet", "avro"}) + public void testNestedTypes(String format) throws Exception { + String warehouse = getTempDirPath(); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build(); + tEnv.executeSql( + "CREATE CATALOG paimon WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + warehouse + + "'\n" + + ")"); + tEnv.executeSql( + "CREATE TABLE paimon.`default`.T (\n" + + " k INT,\n" + + " v MAP>,\n" + + " v2 BIGINT\n" + + ") WITH (\n" + + " 'metadata.iceberg.storage' = 'hadoop-catalog',\n" + + " 'file.format' = '" + + format + + "'\n" + + ")"); + tEnv.executeSql( + "INSERT INTO paimon.`default`.T VALUES " + + "(1, MAP[10, ARRAY[ROW('apple', 100), ROW('banana', 101)], 20, ARRAY[ROW('cat', 102), ROW('dog', 103)]], 1000), " + + "(2, MAP[10, ARRAY[ROW('cherry', 200), ROW('pear', 201)], 20, ARRAY[ROW('tiger', 202), ROW('wolf', 203)]], 2000)") + .await(); + + tEnv.executeSql( + "CREATE CATALOG iceberg WITH (\n" + + " 'type' = 'iceberg',\n" + + " 'catalog-type' = 'hadoop',\n" + + " 'warehouse' = '" + + warehouse + + "/iceberg',\n" + + " 'cache-enabled' = 'false'\n" + + ")"); + assertThat(collect(tEnv.executeSql("SELECT k, v[10], v2 FROM iceberg.`default`.T"))) + .containsExactlyInAnyOrder( + Row.of(1, new Row[] {Row.of("apple", 100), Row.of("banana", 101)}, 1000L), + Row.of(2, new Row[] {Row.of("cherry", 200), Row.of("pear", 201)}, 2000L)); + + tEnv.executeSql( + "INSERT INTO paimon.`default`.T VALUES " + + "(3, MAP[10, ARRAY[ROW('mango', 300), ROW('watermelon', 301)], 20, ARRAY[ROW('rabbit', 302), ROW('lion', 303)]], 3000)") + .await(); + assertThat( + collect( + tEnv.executeSql( + "SELECT k, v[10][2].f1, v2 FROM iceberg.`default`.T WHERE v[20][1].f2 > 200"))) + .containsExactlyInAnyOrder( + Row.of(2, "pear", 2000L), Row.of(3, "watermelon", 3000L)); + } + private List collect(TableResult result) throws Exception { List rows = new ArrayList<>(); try (CloseableIterator it = result.collect()) {