diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index 4efb0877066d..e78ca5818d28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -44,6 +44,8 @@ import org.apache.paimon.options.ConfigOptions; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.source.DataSplit; @@ -62,6 +64,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -170,10 +173,11 @@ private void createMetadata(long snapshotId, FileChangesCollector fileChangesCol private void createMetadataWithoutBase(long snapshotId) throws IOException { SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId); + SchemaCache schemas = new SchemaCache(); Iterator entryIterator = snapshotReader.read().dataSplits().stream() .filter(DataSplit::rawConvertible) - .flatMap(s -> dataSplitToManifestEntries(s, snapshotId).stream()) + .flatMap(s -> dataSplitToManifestEntries(s, snapshotId, schemas).stream()) .iterator(); List manifestFileMetas = manifestFile.rollingWrite(entryIterator, snapshotId); @@ -219,17 +223,22 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException { } private List dataSplitToManifestEntries( - DataSplit dataSplit, long snapshotId) { + DataSplit dataSplit, long snapshotId, SchemaCache schemas) { List result = new ArrayList<>(); - for (RawFile rawFile : dataSplit.convertToRawFiles().get()) { + List rawFiles = dataSplit.convertToRawFiles().get(); + for (int i = 0; i < dataSplit.dataFiles().size(); i++) { + DataFileMeta paimonFileMeta = dataSplit.dataFiles().get(i); + RawFile rawFile = rawFiles.get(i); IcebergDataFileMeta fileMeta = - new IcebergDataFileMeta( + IcebergDataFileMeta.create( IcebergDataFileMeta.Content.DATA, rawFile.path(), rawFile.format(), dataSplit.partition(), rawFile.rowCount(), - rawFile.fileSize()); + rawFile.fileSize(), + schemas.get(paimonFileMeta.schemaId()), + paimonFileMeta.valueStats()); result.add( new IcebergManifestEntry( IcebergManifestEntry.Status.ADDED, @@ -414,24 +423,28 @@ private List createNewlyAddedManifestFileMetas( return Collections.emptyList(); } + SchemaCache schemas = new SchemaCache(); return manifestFile.rollingWrite( addedFiles.entrySet().stream() .map( e -> { - IcebergDataFileMeta fileMeta = - new IcebergDataFileMeta( + DataFileMeta paimonFileMeta = e.getValue().getRight(); + IcebergDataFileMeta icebergFileMeta = + IcebergDataFileMeta.create( IcebergDataFileMeta.Content.DATA, e.getKey(), - e.getValue().getRight().fileFormat(), + paimonFileMeta.fileFormat(), e.getValue().getLeft(), - e.getValue().getRight().rowCount(), - e.getValue().getRight().fileSize()); + paimonFileMeta.rowCount(), + paimonFileMeta.fileSize(), + schemas.get(paimonFileMeta.schemaId()), + paimonFileMeta.valueStats()); return new IcebergManifestEntry( IcebergManifestEntry.Status.ADDED, currentSnapshotId, currentSnapshotId, currentSnapshotId, - fileMeta); + icebergFileMeta); }) .iterator(), currentSnapshotId); @@ -660,4 +673,18 @@ private void expireAllBefore(long snapshotId) throws IOException { table.fileIO().deleteQuietly(path); } } + + // ------------------------------------------------------------------------------------- + // Utils + // ------------------------------------------------------------------------------------- + + private class SchemaCache { + + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + Map tableSchemas = new HashMap<>(); + + private TableSchema get(long schemaId) { + return tableSchemas.computeIfAbsent(schemaId, id -> schemaManager.schema(id)); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java index 292d8488d4d2..da13eb3188a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java @@ -19,12 +19,20 @@ package org.apache.paimon.iceberg.manifest; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -69,20 +77,72 @@ public static Content fromId(int id) { private final BinaryRow partition; private final long recordCount; private final long fileSizeInBytes; + private final InternalMap nullValueCounts; + private final InternalMap lowerBounds; + private final InternalMap upperBounds; - public IcebergDataFileMeta( + IcebergDataFileMeta( Content content, String filePath, String fileFormat, BinaryRow partition, long recordCount, - long fileSizeInBytes) { + long fileSizeInBytes, + InternalMap nullValueCounts, + InternalMap lowerBounds, + InternalMap upperBounds) { this.content = content; this.filePath = filePath; this.fileFormat = fileFormat; this.partition = partition; this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; + this.nullValueCounts = nullValueCounts; + this.lowerBounds = lowerBounds; + this.upperBounds = upperBounds; + } + + public static IcebergDataFileMeta create( + Content content, + String filePath, + String fileFormat, + BinaryRow partition, + long recordCount, + long fileSizeInBytes, + TableSchema tableSchema, + SimpleStats stats) { + Map nullValueCounts = new HashMap<>(); + Map lowerBounds = new HashMap<>(); + Map upperBounds = new HashMap<>(); + + List fieldGetters = new ArrayList<>(); + int numFields = tableSchema.fields().size(); + for (int i = 0; i < numFields; i++) { + fieldGetters.add(InternalRow.createFieldGetter(tableSchema.fields().get(i).type(), i)); + } + + for (int i = 0; i < numFields; i++) { + int fieldId = tableSchema.fields().get(i).id(); + DataType type = tableSchema.fields().get(i).type(); + nullValueCounts.put(fieldId, stats.nullCounts().getLong(i)); + Object minValue = fieldGetters.get(i).getFieldOrNull(stats.minValues()); + Object maxValue = fieldGetters.get(i).getFieldOrNull(stats.maxValues()); + if (minValue != null && maxValue != null) { + lowerBounds.put(fieldId, IcebergConversions.toByteBuffer(type, minValue).array()); + upperBounds.put(fieldId, IcebergConversions.toByteBuffer(type, maxValue).array()); + } + } + + return new IcebergDataFileMeta( + content, + filePath, + fileFormat, + partition, + recordCount, + fileSizeInBytes, + new GenericMap(nullValueCounts), + new GenericMap(lowerBounds), + new GenericMap(upperBounds)); } public Content content() { @@ -109,6 +169,18 @@ public long fileSizeInBytes() { return fileSizeInBytes; } + public InternalMap nullValueCounts() { + return nullValueCounts; + } + + public InternalMap lowerBounds() { + return lowerBounds; + } + + public InternalMap upperBounds() { + return upperBounds; + } + public static RowType schema(RowType partitionType) { List fields = new ArrayList<>(); fields.add(new DataField(134, "content", DataTypes.INT().notNull())); @@ -117,6 +189,21 @@ public static RowType schema(RowType partitionType) { fields.add(new DataField(102, "partition", partitionType)); fields.add(new DataField(103, "record_count", DataTypes.BIGINT().notNull())); fields.add(new DataField(104, "file_size_in_bytes", DataTypes.BIGINT().notNull())); + fields.add( + new DataField( + 110, + "null_value_counts", + DataTypes.MAP(DataTypes.INT().notNull(), DataTypes.BIGINT().notNull()))); + fields.add( + new DataField( + 125, + "lower_bounds", + DataTypes.MAP(DataTypes.INT().notNull(), DataTypes.BYTES().notNull()))); + fields.add( + new DataField( + 128, + "upper_bounds", + DataTypes.MAP(DataTypes.INT().notNull(), DataTypes.BYTES().notNull()))); return new RowType(fields); } @@ -134,11 +221,23 @@ public boolean equals(Object o) { && fileSizeInBytes == that.fileSizeInBytes && Objects.equals(filePath, that.filePath) && Objects.equals(fileFormat, that.fileFormat) - && Objects.equals(partition, that.partition); + && Objects.equals(partition, that.partition) + && Objects.equals(nullValueCounts, that.nullValueCounts) + && Objects.equals(lowerBounds, that.lowerBounds) + && Objects.equals(upperBounds, that.upperBounds); } @Override public int hashCode() { - return Objects.hash(content, filePath, fileFormat, partition, recordCount, fileSizeInBytes); + return Objects.hash( + content, + filePath, + fileFormat, + partition, + recordCount, + fileSizeInBytes, + nullValueCounts, + lowerBounds, + upperBounds); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java index b4aa281e6090..9b97425936b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java @@ -21,7 +21,9 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalMapSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ObjectSerializer; @@ -31,10 +33,17 @@ public class IcebergDataFileMetaSerializer extends ObjectSerializer write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + GenericRow lowerBounds = + GenericRow.of( + 1, + true, + 10L, + 100.0f, + 1000.0, + Decimal.fromUnscaledLong(123456, 8, 3), + BinaryString.fromString("apple"), + BinaryString.fromString("cat"), + "B_apple".getBytes(), + "B_cat".getBytes(), + 100); + write.write(lowerBounds); + GenericRow upperBounds = + GenericRow.of( + 2, + true, + 20L, + 200.0f, + 2000.0, + Decimal.fromUnscaledLong(234567, 8, 3), + BinaryString.fromString("banana"), + BinaryString.fromString("dog"), + "B_banana".getBytes(), + "B_dog".getBytes(), + 200); + write.write(upperBounds); + commit.commit(1, write.prepareCommit(false, 1)); + + write.close(); + commit.close(); + + int numFields = rowType.getFieldCount(); + for (int i = 0; i < numFields; i++) { + DataType type = rowType.getTypeAt(i); + String name = rowType.getFieldNames().get(i); + if (type.getTypeRoot() == DataTypeRoot.BOOLEAN + || type.getTypeRoot() == DataTypeRoot.BINARY + || type.getTypeRoot() == DataTypeRoot.VARBINARY) { + // lower bounds and upper bounds of these types have no actual use case + continue; + } + + final Object lower; + final Object upper; + // change Paimon objects to Iceberg Java API objects + if (type.getTypeRoot() == DataTypeRoot.CHAR + || type.getTypeRoot() == DataTypeRoot.VARCHAR) { + lower = lowerBounds.getField(i).toString(); + upper = upperBounds.getField(i).toString(); + } else if (type.getTypeRoot() == DataTypeRoot.DECIMAL) { + lower = new BigDecimal(lowerBounds.getField(i).toString()); + upper = new BigDecimal(upperBounds.getField(i).toString()); + } else { + lower = lowerBounds.getField(i); + upper = upperBounds.getField(i); + } + + String expectedLower = lower.toString(); + String expectedUpper = upper.toString(); + if (type.getTypeRoot() == DataTypeRoot.DATE) { + expectedLower = LocalDate.ofEpochDay((int) lower).toString(); + expectedUpper = LocalDate.ofEpochDay((int) upper).toString(); + } + + assertThat( + getIcebergResult( + icebergTable -> + IcebergGenerics.read(icebergTable) + .select(name) + .where(Expressions.lessThan(name, upper)) + .build(), + Record::toString)) + .containsExactly("Record(" + expectedLower + ")"); + assertThat( + getIcebergResult( + icebergTable -> + IcebergGenerics.read(icebergTable) + .select(name) + .where(Expressions.greaterThan(name, lower)) + .build(), + Record::toString)) + .containsExactly("Record(" + expectedUpper + ")"); + assertThat( + getIcebergResult( + icebergTable -> + IcebergGenerics.read(icebergTable) + .select(name) + .where(Expressions.lessThan(name, lower)) + .build(), + Record::toString)) + .isEmpty(); + assertThat( + getIcebergResult( + icebergTable -> + IcebergGenerics.read(icebergTable) + .select(name) + .where(Expressions.greaterThan(name, upper)) + .build(), + Record::toString)) + .isEmpty(); + } + } + // ------------------------------------------------------------------------ // Random Tests // ------------------------------------------------------------------------ @@ -471,126 +614,6 @@ public void testPartitionedPrimaryKeyTable() throws Exception { Record::toString); } - @Test - public void testAppendOnlyTableWithAllTypes() throws Exception { - RowType rowType = - RowType.of( - new DataType[] { - DataTypes.INT(), - DataTypes.BOOLEAN(), - DataTypes.BIGINT(), - DataTypes.FLOAT(), - DataTypes.DOUBLE(), - DataTypes.DECIMAL(8, 3), - DataTypes.CHAR(20), - DataTypes.STRING(), - DataTypes.BINARY(20), - DataTypes.VARBINARY(20), - DataTypes.DATE() - }, - new String[] { - "pt", - "v_boolean", - "v_bigint", - "v_float", - "v_double", - "v_decimal", - "v_char", - "v_varchar", - "v_binary", - "v_varbinary", - "v_date" - }); - - Function binaryRow = - (pt) -> { - BinaryRow b = new BinaryRow(1); - BinaryRowWriter writer = new BinaryRowWriter(b); - writer.writeInt(0, pt); - writer.complete(); - return b; - }; - - int numRounds = 5; - int numRecords = 500; - ThreadLocalRandom random = ThreadLocalRandom.current(); - List> testRecords = new ArrayList<>(); - List> expected = new ArrayList<>(); - List currentExpected = new ArrayList<>(); - for (int r = 0; r < numRounds; r++) { - List round = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - int pt = random.nextInt(0, 2); - Boolean vBoolean = random.nextBoolean() ? random.nextBoolean() : null; - Long vBigInt = random.nextBoolean() ? random.nextLong() : null; - Float vFloat = random.nextBoolean() ? random.nextFloat() : null; - Double vDouble = random.nextBoolean() ? random.nextDouble() : null; - Decimal vDecimal = - random.nextBoolean() - ? Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3) - : null; - String vChar = random.nextBoolean() ? String.valueOf(random.nextInt()) : null; - String vVarChar = random.nextBoolean() ? String.valueOf(random.nextInt()) : null; - byte[] vBinary = - random.nextBoolean() ? String.valueOf(random.nextInt()).getBytes() : null; - byte[] vVarBinary = - random.nextBoolean() ? String.valueOf(random.nextInt()).getBytes() : null; - Integer vDate = random.nextBoolean() ? random.nextInt(0, 30000) : null; - - round.add( - new TestRecord( - binaryRow.apply(pt), - GenericRow.of( - pt, - vBoolean, - vBigInt, - vFloat, - vDouble, - vDecimal, - BinaryString.fromString(vChar), - BinaryString.fromString(vVarChar), - vBinary, - vVarBinary, - vDate))); - currentExpected.add( - String.format( - "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", - pt, - vBoolean, - vBigInt, - vFloat, - vDouble, - vDecimal, - vChar, - vVarChar, - vBinary == null ? null : new String(vBinary), - vVarBinary == null ? null : new String(vVarBinary), - vDate == null ? null : LocalDate.ofEpochDay(vDate))); - } - testRecords.add(round); - expected.add(new ArrayList<>(currentExpected)); - } - - runCompatibilityTest( - rowType, - Collections.emptyList(), - Collections.emptyList(), - testRecords, - expected, - r -> - IntStream.range(0, rowType.getFieldCount()) - .mapToObj( - i -> { - Object field = r.get(i); - if (field instanceof ByteBuffer) { - return new String(((ByteBuffer) field).array()); - } else { - return String.valueOf(field); - } - }) - .collect(Collectors.joining(", "))); - } - private void runCompatibilityTest( RowType rowType, List partitionKeys, diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java index de3211ad2e4e..87c08396f3da 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java @@ -213,6 +213,9 @@ public static Schema convertToSchema( ? kvBuilder.withDefault(null) : kvBuilder.noDefault(); map = SchemaBuilder.builder().array().items(assembler.endRecord()); + // Compatible with Iceberg's avro format. + // We don't use avro's logical type to check if an array field is actually a + // map. We use Paimon's DataType to check this. map = LogicalMap.get().addToSchema(map); } else { map = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java index 421afc23e94c..ab719ce0f86d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java @@ -50,12 +50,9 @@ default T visit(Schema schema, DataType type) { return visitUnion(schema, type); case ARRAY: - if (schema.getLogicalType() instanceof LogicalMap) { + if (type instanceof MapType) { MapType mapType = (MapType) type; - return visitArrayMap( - schema, - mapType == null ? null : mapType.getKeyType(), - mapType == null ? null : mapType.getValueType()); + return visitArrayMap(schema, mapType.getKeyType(), mapType.getValueType()); } else { return visitArray( schema, type == null ? null : ((ArrayType) type).getElementType()); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/LogicalMap.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/LogicalMap.java index 9675c503794f..75506854642a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/LogicalMap.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/LogicalMap.java @@ -28,16 +28,18 @@ * *

Modified from Iceberg. + * + *

NOTE: We don't register this type into Avro's {@link org.apache.avro.LogicalTypes}, because + * Iceberg will register a logical type with the same name, and we don't want to conflict with + * Iceberg. This class is only for our avro's format to be compatible with Iceberg's avro format. We + * use Paimon's {@link org.apache.paimon.types.DataType} to check if an array field is actually a + * map. */ public class LogicalMap extends LogicalType { private static final String NAME = "map"; private static final LogicalMap INSTANCE = new LogicalMap(); - static String name() { - return NAME; - } - static LogicalMap get() { return INSTANCE; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/LogicalMapFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/LogicalMapFactory.java deleted file mode 100644 index f69db88c8ea0..000000000000 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/LogicalMapFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.format.avro; - -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; - -/** Factory to create {@link LogicalMap}. */ -public class LogicalMapFactory implements LogicalTypes.LogicalTypeFactory { - - @Override - public LogicalType fromSchema(Schema schema) { - return LogicalMap.get(); - } - - @Override - public String getTypeName() { - return LogicalMap.name(); - } -} diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.avro.LogicalTypes$LogicalTypeFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.avro.LogicalTypes$LogicalTypeFactory deleted file mode 100644 index 0cbada05f956..000000000000 --- a/paimon-format/src/main/resources/META-INF/services/org.apache.avro.LogicalTypes$LogicalTypeFactory +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.paimon.format.avro.LogicalMapFactory