diff --git a/docs/content/flink/sql-ddl.md b/docs/content/flink/sql-ddl.md index 363d7475761c..0324e6655689 100644 --- a/docs/content/flink/sql-ddl.md +++ b/docs/content/flink/sql-ddl.md @@ -203,6 +203,9 @@ Paimon will automatically collect the statistics of the data file for speeding u The statistics collector mode can be configured by `'metadata.stats-mode'`, by default is `'truncate(16)'`. You can configure the field level by setting `'fields.{field_name}.stats-mode'`. +For the stats mode of `none`, we suggest that you configure `metadata.stats-dense-store` = `true`, which will +significantly reduce the storage size of the manifest. + ### Field Default Value Paimon table currently supports setting default values for fields in table properties by `'fields.item_id.default-value'`, diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index ed05e7e45841..aeab93eda81f 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -471,6 +471,12 @@

Enum

Specify the merge engine for table with primary key.

Possible values: + +
metadata.stats-dense-store
+ false + Boolean + Whether to store statistic densely in metadata (manifest files), which will significantly reduce the storage size of metadata when the none statistic mode is set.
Note, when this mode is enabled, the Paimon sdk in reading engine requires at least version 0.9.1 or 1.0.0 or higher. +
metadata.stats-mode
"truncate(16)" diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 51aca09e84df..1256c7ba0d87 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1026,7 +1026,7 @@ public class CoreOptions implements Serializable { public static final String STATS_MODE_SUFFIX = "stats-mode"; public static final ConfigOption METADATA_STATS_MODE = - key("metadata." + STATS_MODE_SUFFIX) + key("metadata.stats-mode") .stringType() .defaultValue("truncate(16)") .withDescription( @@ -1053,6 +1053,22 @@ public class CoreOptions implements Serializable { + STATS_MODE_SUFFIX)) .build()); + public static final ConfigOption METADATA_STATS_DENSE_STORE = + key("metadata.stats-dense-store") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Whether to store statistic densely in metadata (manifest files), which" + + " will significantly reduce the storage size of metadata when the" + + " none statistic mode is set.") + .linebreak() + .text( + "Note, when this mode is enabled, the Paimon sdk in reading engine requires" + + " at least version 0.9.1 or 1.0.0 or higher.") + .build()); + public static final ConfigOption COMMIT_CALLBACKS = key("commit.callbacks") .stringType() @@ -2233,6 +2249,10 @@ public boolean asyncFileWrite() { return options.get(ASYNC_FILE_WRITE); } + public boolean statsDenseStore() { + return options.get(METADATA_STATS_DENSE_STORE); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-common/src/main/java/org/apache/paimon/format/SimpleColStats.java b/paimon-common/src/main/java/org/apache/paimon/format/SimpleColStats.java index 5d5891a8a3b2..0b0062b7568f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/SimpleColStats.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/SimpleColStats.java @@ -33,6 +33,8 @@ */ public class SimpleColStats { + public static final SimpleColStats NONE = new SimpleColStats(null, null, null); + @Nullable private final Object min; @Nullable private final Object max; private final Long nullCount; @@ -58,6 +60,10 @@ public Long nullCount() { return nullCount; } + public boolean isNone() { + return min == null && max == null && nullCount == null; + } + @Override public boolean equals(Object o) { if (!(o instanceof SimpleColStats)) { diff --git a/paimon-common/src/main/java/org/apache/paimon/statistics/NoneSimpleColStatsCollector.java b/paimon-common/src/main/java/org/apache/paimon/statistics/NoneSimpleColStatsCollector.java index d4f645b49957..57369c9cd89e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/statistics/NoneSimpleColStatsCollector.java +++ b/paimon-common/src/main/java/org/apache/paimon/statistics/NoneSimpleColStatsCollector.java @@ -29,11 +29,11 @@ public void collect(Object field, Serializer fieldSerializer) {} @Override public SimpleColStats result() { - return new SimpleColStats(null, null, null); + return SimpleColStats.NONE; } @Override public SimpleColStats convert(SimpleColStats source) { - return new SimpleColStats(null, null, null); + return SimpleColStats.NONE; } } diff --git a/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java b/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java index 4ed212f362e2..15568398114a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java +++ b/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java @@ -55,4 +55,8 @@ public class SystemFields { public static boolean isSystemField(int fieldId) { return fieldId >= SYSTEM_FIELD_ID_START; } + + public static boolean isSystemField(String field) { + return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java index 0d0383747ffe..eaa077fa81ff 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java @@ -43,6 +43,8 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import javax.annotation.Nullable; + import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; @@ -198,7 +200,11 @@ public static Object get(DataGetters dataGetters, int pos, DataType fieldType) { } } - public static InternalArray toStringArrayData(List list) { + public static InternalArray toStringArrayData(@Nullable List list) { + if (list == null) { + return null; + } + return new GenericArray(list.stream().map(BinaryString::fromString).toArray()); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java new file mode 100644 index 000000000000..2182ea1a32bc --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedArray.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataType; + +/** + * An implementation of {@link InternalArray} which provides a projected view of the underlying + * {@link InternalArray}. + * + *

Projection includes both reducing the accessible fields and reordering them. + * + *

Note: This class supports only top-level projections, not nested projections. + */ +public class ProjectedArray implements InternalArray { + + private final int[] indexMapping; + + private InternalArray array; + + private ProjectedArray(int[] indexMapping) { + this.indexMapping = indexMapping; + } + + /** + * Replaces the underlying {@link InternalArray} backing this {@link ProjectedArray}. + * + *

This method replaces the row data in place and does not return a new object. This is done + * for performance reasons. + */ + public ProjectedArray replaceArray(InternalArray array) { + this.array = array; + return this; + } + + // --------------------------------------------------------------------------------------------- + + @Override + public int size() { + return indexMapping.length; + } + + @Override + public boolean isNullAt(int pos) { + if (indexMapping[pos] < 0) { + return true; + } + return array.isNullAt(indexMapping[pos]); + } + + @Override + public boolean getBoolean(int pos) { + return array.getBoolean(indexMapping[pos]); + } + + @Override + public byte getByte(int pos) { + return array.getByte(indexMapping[pos]); + } + + @Override + public short getShort(int pos) { + return array.getShort(indexMapping[pos]); + } + + @Override + public int getInt(int pos) { + return array.getInt(indexMapping[pos]); + } + + @Override + public long getLong(int pos) { + return array.getLong(indexMapping[pos]); + } + + @Override + public float getFloat(int pos) { + return array.getFloat(indexMapping[pos]); + } + + @Override + public double getDouble(int pos) { + return array.getDouble(indexMapping[pos]); + } + + @Override + public BinaryString getString(int pos) { + return array.getString(indexMapping[pos]); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + return array.getDecimal(indexMapping[pos], precision, scale); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + return array.getTimestamp(indexMapping[pos], precision); + } + + @Override + public byte[] getBinary(int pos) { + return array.getBinary(indexMapping[pos]); + } + + @Override + public InternalArray getArray(int pos) { + return array.getArray(indexMapping[pos]); + } + + @Override + public InternalMap getMap(int pos) { + return array.getMap(indexMapping[pos]); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + return array.getRow(indexMapping[pos], numFields); + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException("Projected row data cannot be compared"); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException("Projected row data cannot be hashed"); + } + + @Override + public String toString() { + throw new UnsupportedOperationException("Projected row data cannot be toString"); + } + + /** + * Create an empty {@link ProjectedArray} starting from a {@code projection} array. + * + *

The array represents the mapping of the fields of the original {@link DataType}. For + * example, {@code [0, 2, 1]} specifies to include in the following order the 1st field, the 3rd + * field and the 2nd field of the row. + * + * @see Projection + * @see ProjectedArray + */ + public static ProjectedArray from(int[] projection) { + return new ProjectedArray(projection); + } + + @Override + public boolean[] toBooleanArray() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] toByteArray() { + throw new UnsupportedOperationException(); + } + + @Override + public short[] toShortArray() { + throw new UnsupportedOperationException(); + } + + @Override + public int[] toIntArray() { + throw new UnsupportedOperationException(); + } + + @Override + public long[] toLongArray() { + throw new UnsupportedOperationException(); + } + + @Override + public float[] toFloatArray() { + throw new UnsupportedOperationException(); + } + + @Override + public double[] toDoubleArray() { + throw new UnsupportedOperationException(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 402c6c1f45f4..47502aa707d1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -75,6 +75,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { private final IOFunction, RecordReaderIterator> bucketFileRead; private final boolean forceCompact; private final boolean asyncFileWrite; + private final boolean statsDenseStore; private final List newFiles; private final List deletedFiles; private final List compactBefore; @@ -111,7 +112,8 @@ public AppendOnlyWriter( SimpleColStatsCollector.Factory[] statsCollectors, MemorySize maxDiskSize, FileIndexOptions fileIndexOptions, - boolean asyncFileWrite) { + boolean asyncFileWrite, + boolean statsDenseStore) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -122,6 +124,7 @@ public AppendOnlyWriter( this.bucketFileRead = bucketFileRead; this.forceCompact = forceCompact; this.asyncFileWrite = asyncFileWrite; + this.statsDenseStore = statsDenseStore; this.newFiles = new ArrayList<>(); this.deletedFiles = new ArrayList<>(); this.compactBefore = new ArrayList<>(); @@ -286,7 +289,8 @@ private RowDataRollingFileWriter createRollingRowWriter() { statsCollectors, fileIndexOptions, FileSource.APPEND, - asyncFileWrite); + asyncFileWrite, + statsDenseStore); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 364d4d42eb23..bfda80db984c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -78,7 +78,11 @@ public class DataFileMeta { new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), - new DataField(15, "_FILE_SOURCE", new TinyIntType(true)))); + new DataField(15, "_FILE_SOURCE", new TinyIntType(true)), + new DataField( + 16, + "_VALUE_STATS_COLS", + DataTypes.ARRAY(DataTypes.STRING().notNull())))); public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW; public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW; @@ -114,27 +118,7 @@ public class DataFileMeta { private final @Nullable FileSource fileSource; - public static DataFileMeta forAppend( - String fileName, - long fileSize, - long rowCount, - SimpleStats rowStats, - long minSequenceNumber, - long maxSequenceNumber, - long schemaId, - @Nullable FileSource fileSource) { - return forAppend( - fileName, - fileSize, - rowCount, - rowStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - Collections.emptyList(), - null, - fileSource); - } + private final @Nullable List valueStatsCols; public static DataFileMeta forAppend( String fileName, @@ -146,7 +130,8 @@ public static DataFileMeta forAppend( long schemaId, List extraFiles, @Nullable byte[] embeddedIndex, - @Nullable FileSource fileSource) { + @Nullable FileSource fileSource, + @Nullable List valueStatsCols) { return new DataFileMeta( fileName, fileSize, @@ -163,7 +148,8 @@ public static DataFileMeta forAppend( Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), 0L, embeddedIndex, - fileSource); + fileSource, + valueStatsCols); } public DataFileMeta( @@ -180,7 +166,8 @@ public DataFileMeta( int level, @Nullable Long deleteRowCount, @Nullable byte[] embeddedIndex, - @Nullable FileSource fileSource) { + @Nullable FileSource fileSource, + @Nullable List valueStatsCols) { this( fileName, fileSize, @@ -197,7 +184,8 @@ public DataFileMeta( Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), deleteRowCount, embeddedIndex, - fileSource); + fileSource, + valueStatsCols); } public DataFileMeta( @@ -216,7 +204,8 @@ public DataFileMeta( Timestamp creationTime, @Nullable Long deleteRowCount, @Nullable byte[] embeddedIndex, - @Nullable FileSource fileSource) { + @Nullable FileSource fileSource, + @Nullable List valueStatsCols) { this.fileName = fileName; this.fileSize = fileSize; @@ -237,6 +226,7 @@ public DataFileMeta( this.deleteRowCount = deleteRowCount; this.fileSource = fileSource; + this.valueStatsCols = valueStatsCols; } public String fileName() { @@ -334,6 +324,11 @@ public Optional fileSource() { return Optional.ofNullable(fileSource); } + @Nullable + public List valueStatsCols() { + return valueStatsCols; + } + public DataFileMeta upgrade(int newLevel) { checkArgument(newLevel > this.level); return new DataFileMeta( @@ -352,7 +347,8 @@ public DataFileMeta upgrade(int newLevel) { creationTime, deleteRowCount, embeddedIndex, - fileSource); + fileSource, + valueStatsCols); } public DataFileMeta rename(String newFileName) { @@ -372,7 +368,8 @@ public DataFileMeta rename(String newFileName) { creationTime, deleteRowCount, embeddedIndex, - fileSource); + fileSource, + valueStatsCols); } public List collectFiles(DataFilePathFactory pathFactory) { @@ -399,7 +396,8 @@ public DataFileMeta copy(List newExtraFiles) { creationTime, deleteRowCount, embeddedIndex, - fileSource); + fileSource, + valueStatsCols); } public DataFileMeta copy(byte[] newEmbeddedIndex) { @@ -419,7 +417,8 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) { creationTime, deleteRowCount, newEmbeddedIndex, - fileSource); + fileSource, + valueStatsCols); } @Override @@ -446,7 +445,8 @@ public boolean equals(Object o) { && Objects.equals(extraFiles, that.extraFiles) && Objects.equals(creationTime, that.creationTime) && Objects.equals(deleteRowCount, that.deleteRowCount) - && Objects.equals(fileSource, that.fileSource); + && Objects.equals(fileSource, that.fileSource) + && Objects.equals(valueStatsCols, that.valueStatsCols); } @Override @@ -467,7 +467,8 @@ public int hashCode() { extraFiles, creationTime, deleteRowCount, - fileSource); + fileSource, + valueStatsCols); } @Override @@ -477,7 +478,7 @@ public String toString() { + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, " + "minSequenceNumber: %d, maxSequenceNumber: %d, " + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, " - + "deleteRowCount: %d, fileSource: %s}", + + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s}", fileName, fileSize, rowCount, @@ -493,7 +494,8 @@ public String toString() { extraFiles, creationTime, deleteRowCount, - fileSource); + fileSource, + valueStatsCols); } public static long getMaxSequenceNumber(List fileMetas) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java index c65f7e78ed6d..03e4ed51f4be 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java @@ -132,6 +132,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException { row.getTimestamp(12, 3), row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java new file mode 100644 index 000000000000..2f8d89f5b1ab --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.safe.SafeBinaryRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TinyIntType; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; +import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.newBytesType; +import static org.apache.paimon.utils.SerializationUtils.newStringType; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link DataFileMeta} with 0.9 version. */ +public class DataFileMeta09Serializer implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final RowType SCHEMA = + new RowType( + false, + Arrays.asList( + new DataField(0, "_FILE_NAME", newStringType(false)), + new DataField(1, "_FILE_SIZE", new BigIntType(false)), + new DataField(2, "_ROW_COUNT", new BigIntType(false)), + new DataField(3, "_MIN_KEY", newBytesType(false)), + new DataField(4, "_MAX_KEY", newBytesType(false)), + new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA), + new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA), + new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(9, "_SCHEMA_ID", new BigIntType(false)), + new DataField(10, "_LEVEL", new IntType(false)), + new DataField( + 11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))), + new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), + new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), + new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), + new DataField(15, "_FILE_SOURCE", new TinyIntType(true)))); + + protected final InternalRowSerializer rowSerializer; + + public DataFileMeta09Serializer() { + this.rowSerializer = InternalSerializers.create(SCHEMA); + } + + public final void serializeList(List records, DataOutputView target) + throws IOException { + target.writeInt(records.size()); + for (DataFileMeta t : records) { + serialize(t, target); + } + } + + public void serialize(DataFileMeta meta, DataOutputView target) throws IOException { + GenericRow row = + GenericRow.of( + BinaryString.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + serializeBinaryRow(meta.minKey()), + serializeBinaryRow(meta.maxKey()), + meta.keyStats().toRow(), + meta.valueStats().toRow(), + meta.minSequenceNumber(), + meta.maxSequenceNumber(), + meta.schemaId(), + meta.level(), + toStringArrayData(meta.extraFiles()), + meta.creationTime(), + meta.deleteRowCount().orElse(null), + meta.embeddedIndex(), + meta.fileSource().map(FileSource::toByteValue).orElse(null)); + rowSerializer.serialize(row, target); + } + + public final List deserializeList(DataInputView source) throws IOException { + int size = source.readInt(); + List records = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + records.add(deserialize(source)); + } + return records; + } + + public DataFileMeta deserialize(DataInputView in) throws IOException { + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); + return new DataFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + deserializeBinaryRow(row.getBinary(3)), + deserializeBinaryRow(row.getBinary(4)), + SimpleStats.fromRow(row.getRow(5, 3)), + SimpleStats.fromRow(row.getRow(6, 3)), + row.getLong(7), + row.getLong(8), + row.getLong(9), + row.getInt(10), + fromStringArrayData(row.getArray(11)), + row.getTimestamp(12, 3), + row.isNullAt(13) ? null : row.getLong(13), + row.isNullAt(14) ? null : row.getBinary(14), + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index 209aaafd8bf1..626201ca30ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -57,7 +57,8 @@ public InternalRow toRow(DataFileMeta meta) { meta.creationTime(), meta.deleteRowCount().orElse(null), meta.embeddedIndex(), - meta.fileSource().map(FileSource::toByteValue).orElse(null)); + meta.fileSource().map(FileSource::toByteValue).orElse(null), + toStringArrayData(meta.valueStatsCols())); } @Override @@ -78,6 +79,7 @@ public DataFileMeta fromRow(InternalRow row) { row.getTimestamp(12, 3), row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), - row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15))); + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), + row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16))); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 064d72829141..ba42f87209e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -32,6 +32,7 @@ import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.StatsCollectorFactories; import org.slf4j.Logger; @@ -41,6 +42,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.function.Function; /** @@ -102,7 +104,7 @@ public KeyValueDataFileWriter( this.level = level; this.keyStatsConverter = new SimpleStatsConverter(keyType); - this.valueStatsConverter = new SimpleStatsConverter(valueType); + this.valueStatsConverter = new SimpleStatsConverter(valueType, options.statsDenseStore()); this.keySerializer = new InternalRowSerializer(keyType); this.fileSource = fileSource; } @@ -155,11 +157,13 @@ public DataFileMeta result() throws IOException { int numKeyFields = keyType.getFieldCount(); SimpleColStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields); - SimpleStats keyStats = keyStatsConverter.toBinary(keyFieldStats); + SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats); SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length); - SimpleStats valueStats = valueStatsConverter.toBinary(valFieldStats); + + Pair, SimpleStats> valueStatsPair = + valueStatsConverter.toBinary(valFieldStats); return new DataFileMeta( path.getName(), @@ -168,7 +172,7 @@ public DataFileMeta result() throws IOException { minKey, keySerializer.toBinaryRow(maxKey).copy(), keyStats, - valueStats, + valueStatsPair.getValue(), minSeqNumber, maxSeqNumber, schemaId, @@ -176,6 +180,7 @@ public DataFileMeta result() throws IOException { deleteRecordCount, // TODO: enable file filter for primary key table (e.g. deletion table). null, - fileSource); + fileSource, + valueStatsPair.getKey()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index f9c9b950214f..8c2e8ec9498c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -30,11 +30,13 @@ import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.Pair; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.function.Function; import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath; @@ -63,7 +65,8 @@ public RowDataFileWriter( SimpleColStatsCollector.Factory[] statsCollectors, FileIndexOptions fileIndexOptions, FileSource fileSource, - boolean asyncFileWrite) { + boolean asyncFileWrite, + boolean statsDenseStore) { super( fileIO, factory, @@ -76,7 +79,7 @@ public RowDataFileWriter( asyncFileWrite); this.schemaId = schemaId; this.seqNumCounter = seqNumCounter; - this.statsArraySerializer = new SimpleStatsConverter(writeSchema); + this.statsArraySerializer = new SimpleStatsConverter(writeSchema, statsDenseStore); this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), writeSchema, fileIndexOptions); @@ -103,7 +106,7 @@ public void close() throws IOException { @Override public DataFileMeta result() throws IOException { - SimpleStats stats = statsArraySerializer.toBinary(fieldStats()); + Pair, SimpleStats> statsPair = statsArraySerializer.toBinary(fieldStats()); DataFileIndexWriter.FileIndexResult indexResult = dataFileIndexWriter == null ? DataFileIndexWriter.EMPTY_RESULT @@ -112,7 +115,7 @@ public DataFileMeta result() throws IOException { path.getName(), fileIO.getFileSize(path), recordCount(), - stats, + statsPair.getRight(), seqNumCounter.getValue() - super.recordCount(), seqNumCounter.getValue() - 1, schemaId, @@ -120,6 +123,7 @@ public DataFileMeta result() throws IOException { ? Collections.emptyList() : Collections.singletonList(indexResult.independentIndexFile()), indexResult.embeddedIndexBytes(), - fileSource); + fileSource, + statsPair.getKey()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java index e60913d25f87..b929a4ae22af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java @@ -43,7 +43,8 @@ public RowDataRollingFileWriter( SimpleColStatsCollector.Factory[] statsCollectors, FileIndexOptions fileIndexOptions, FileSource fileSource, - boolean asyncFileWrite) { + boolean asyncFileWrite, + boolean statsDenseStore) { super( () -> new RowDataFileWriter( @@ -62,7 +63,8 @@ public RowDataRollingFileWriter( statsCollectors, fileIndexOptions, fileSource, - asyncFileWrite), + asyncFileWrite, + statsDenseStore), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 48216b0c474b..38181a8234aa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -154,7 +154,7 @@ public ManifestFileMeta result() throws IOException { fileIO.getFileSize(path), numAddedFiles, numDeletedFiles, - partitionStatsSerializer.toBinary(partitionStatsCollector.extract()), + partitionStatsSerializer.toBinaryAllMode(partitionStatsCollector.extract()), numAddedFiles + numDeletedFiles > 0 ? schemaId : schemaManager.latest().get().id()); diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 4c299bd6074b..391c5f9bb615 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -156,7 +156,7 @@ private static DataFileMeta constructFileMeta( Pair fileInfo = simpleStatsExtractor.extractWithFileInfo(fileIO, path); - SimpleStats stats = statsArraySerializer.toBinary(fileInfo.getLeft()); + SimpleStats stats = statsArraySerializer.toBinaryAllMode(fileInfo.getLeft()); return DataFileMeta.forAppend( fileName, @@ -166,7 +166,10 @@ private static DataFileMeta constructFileMeta( 0, 0, ((FileStoreTable) table).schema().id(), - FileSource.APPEND); + Collections.emptyList(), + null, + FileSource.APPEND, + null); } public static BinaryRow writePartitionValue( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index a3bc9c22dc53..50a8c74dcf4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -25,9 +25,8 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.stats.SimpleStatsConverters; +import org.apache.paimon.stats.SimpleStatsEvolution; +import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; @@ -42,7 +41,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan { private final BucketSelectConverter bucketSelectConverter; - private final SimpleStatsConverters simpleStatsConverters; + private final SimpleStatsEvolutions simpleStatsEvolutions; private final boolean fileIndexReadEnabled; @@ -68,8 +67,8 @@ public AppendOnlyFileStoreScan( manifestFileFactory, scanManifestParallelism); this.bucketSelectConverter = bucketSelectConverter; - this.simpleStatsConverters = - new SimpleStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id()); + this.simpleStatsEvolutions = + new SimpleStatsEvolutions(sid -> scanTableSchema(sid).fields(), schema.id()); this.fileIndexReadEnabled = fileIndexReadEnabled; } @@ -86,15 +85,18 @@ protected boolean filterByStats(ManifestEntry entry) { return true; } - SimpleStatsConverter serializer = - simpleStatsConverters.getOrCreate(entry.file().schemaId()); - SimpleStats stats = entry.file().valueStats(); + SimpleStatsEvolution evolution = simpleStatsEvolutions.getOrCreate(entry.file().schemaId()); + SimpleStatsEvolution.Result stats = + evolution.evolution( + entry.file().valueStats(), + entry.file().rowCount(), + entry.file().valueStatsCols()); return filter.test( entry.file().rowCount(), - serializer.evolution(stats.minValues()), - serializer.evolution(stats.maxValues()), - serializer.evolution(stats.nullCounts(), entry.file().rowCount())) + stats.minValues(), + stats.maxValues(), + stats.nullCounts()) && (!fileIndexReadEnabled || testFileIndex(entry.file().embeddedIndex(), entry)); } @@ -114,7 +116,7 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry Predicate dataPredicate = dataFilterMapping.computeIfAbsent( entry.file().schemaId(), - id -> simpleStatsConverters.convertFilter(entry.file().schemaId(), filter)); + id -> simpleStatsEvolutions.convertFilter(entry.file().schemaId(), filter)); try (FileIndexPredicate predicate = new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 0a4d5d56a13e..3ce019c91638 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -132,7 +132,8 @@ protected RecordWriter createWriter( statsCollectors, options.writeBufferSpillDiskSize(), fileIndexOptions, - options.asyncFileWrite()); + options.asyncFileWrite(), + options.statsDenseStore()); } protected abstract CompactManager getCompactManager( @@ -193,7 +194,8 @@ private RowDataRollingFileWriter createRollingFileWriter( statsCollectors, fileIndexOptions, FileSource.COMPACT, - options.asyncFileWrite()); + options.asyncFileWrite(), + options.statsDenseStore()); } private RecordReaderIterator createFilesIterator( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 3311161b54a5..8300bdcfaff9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -21,15 +21,15 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.stats.SimpleStatsConverters; +import org.apache.paimon.stats.SimpleStatsEvolution; +import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.utils.SnapshotManager; @@ -44,8 +44,8 @@ /** {@link FileStoreScan} for {@link KeyValueFileStore}. */ public class KeyValueFileStoreScan extends AbstractFileStoreScan { - private final SimpleStatsConverters fieldKeyStatsConverters; - private final SimpleStatsConverters fieldValueStatsConverters; + private final SimpleStatsEvolutions fieldKeyStatsConverters; + private final SimpleStatsEvolutions fieldValueStatsConverters; private final BucketSelectConverter bucketSelectConverter; private Predicate keyFilter; @@ -75,11 +75,11 @@ public KeyValueFileStoreScan( scanManifestParallelism); this.bucketSelectConverter = bucketSelectConverter; this.fieldKeyStatsConverters = - new SimpleStatsConverters( + new SimpleStatsEvolutions( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schema.id()); this.fieldValueStatsConverters = - new SimpleStatsConverters( + new SimpleStatsEvolutions( sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schema.id()); this.deletionVectorsEnabled = deletionVectorsEnabled; @@ -101,30 +101,21 @@ public KeyValueFileStoreScan withValueFilter(Predicate predicate) { /** Note: Keep this thread-safe. */ @Override protected boolean filterByStats(ManifestEntry entry) { - Predicate filter = null; - SimpleStatsConverter serializer = null; - SimpleStats stats = null; - if (isValueFilterEnabled(entry)) { - filter = valueFilter; - serializer = fieldValueStatsConverters.getOrCreate(entry.file().schemaId()); - stats = entry.file().valueStats(); + DataFileMeta file = entry.file(); + if (isValueFilterEnabled(entry) && !filterByValueFilter(entry)) { + return false; } - if (filter == null && keyFilter != null) { - filter = keyFilter; - serializer = fieldKeyStatsConverters.getOrCreate(entry.file().schemaId()); - stats = entry.file().keyStats(); + if (keyFilter != null) { + SimpleStatsEvolution.Result stats = + fieldKeyStatsConverters + .getOrCreate(file.schemaId()) + .evolution(file.keyStats(), file.rowCount(), null); + return keyFilter.test( + file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts()); } - if (filter == null) { - return true; - } - - return filter.test( - entry.file().rowCount(), - serializer.evolution(stats.minValues()), - serializer.evolution(stats.maxValues()), - serializer.evolution(stats.nullCounts(), entry.file().rowCount())); + return true; } private boolean isValueFilterEnabled(ManifestEntry entry) { @@ -184,14 +175,13 @@ private List filterWholeBucketAllFiles(List entrie } private boolean filterByValueFilter(ManifestEntry entry) { - SimpleStatsConverter serializer = - fieldValueStatsConverters.getOrCreate(entry.file().schemaId()); - SimpleStats stats = entry.file().valueStats(); + DataFileMeta file = entry.file(); + SimpleStatsEvolution.Result result = + fieldValueStatsConverters + .getOrCreate(file.schemaId()) + .evolution(file.valueStats(), file.rowCount(), file.valueStatsCols()); return valueFilter.test( - entry.file().rowCount(), - serializer.evolution(stats.minValues()), - serializer.evolution(stats.maxValues()), - serializer.evolution(stats.nullCounts(), entry.file().rowCount())); + file.rowCount(), result.minValues(), result.maxValues(), result.nullCounts()); } private static boolean noOverlapping(List entries) { diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsConverter.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsConverter.java index b76dc72b8fff..13bafd879866 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsConverter.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsConverter.java @@ -18,48 +18,78 @@ package org.apache.paimon.stats; -import org.apache.paimon.casting.CastFieldGetter; -import org.apache.paimon.casting.CastedRow; import org.apache.paimon.data.BinaryArray; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalArray; -import org.apache.paimon.data.InternalMap; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.format.SimpleColStats; -import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.ProjectedRow; +import org.apache.paimon.utils.Pair; -import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** Converter for array of {@link SimpleColStats}. */ public class SimpleStatsConverter { + private final RowType rowType; + private final boolean denseStore; private final InternalRowSerializer serializer; - - @Nullable private final int[] indexMapping; - @Nullable private final CastFieldGetter[] castFieldGetters; + private final Map, InternalRowSerializer> serializers; public SimpleStatsConverter(RowType type) { - this(type, null, null); + this(type, false); + } + + public SimpleStatsConverter(RowType type, boolean denseStore) { + // as stated in RollingFile.Writer#finish, col stats are not collected currently so + // min/max values are all nulls + this.rowType = + type.copy( + type.getFields().stream() + .map(f -> f.newType(f.type().copy(true))) + .collect(Collectors.toList())); + this.denseStore = denseStore; + this.serializer = new InternalRowSerializer(rowType); + this.serializers = new HashMap<>(); } - public SimpleStatsConverter( - RowType type, - @Nullable int[] indexMapping, - @Nullable CastFieldGetter[] castFieldGetters) { - RowType safeType = toAllFieldsNullableRowType(type); - this.serializer = new InternalRowSerializer(safeType); - this.indexMapping = indexMapping; - this.castFieldGetters = castFieldGetters; + public Pair, SimpleStats> toBinary(SimpleColStats[] stats) { + return denseStore ? toBinaryDenseMode(stats) : Pair.of(null, toBinaryAllMode(stats)); + } + + private Pair, SimpleStats> toBinaryDenseMode(SimpleColStats[] stats) { + List fields = new ArrayList<>(); + List minValues = new ArrayList<>(); + List maxValues = new ArrayList<>(); + List nullCounts = new ArrayList<>(); + for (int i = 0; i < stats.length; i++) { + SimpleColStats colStats = stats[i]; + if (colStats.isNone()) { + continue; + } + + fields.add(rowType.getFields().get(i).name()); + minValues.add(colStats.min()); + maxValues.add(colStats.max()); + nullCounts.add(colStats.nullCount()); + } + + InternalRowSerializer serializer = + serializers.computeIfAbsent( + fields, key -> new InternalRowSerializer(rowType.project(key))); + + SimpleStats simpleStats = + new SimpleStats( + serializer.toBinaryRow(GenericRow.of(minValues.toArray())).copy(), + serializer.toBinaryRow(GenericRow.of(maxValues.toArray())).copy(), + BinaryArray.fromLongArray(nullCounts.toArray(new Long[0]))); + return Pair.of(fields.size() == rowType.getFieldCount() ? null : fields, simpleStats); } - public SimpleStats toBinary(SimpleColStats[] stats) { + public SimpleStats toBinaryAllMode(SimpleColStats[] stats) { int rowFieldCount = stats.length; GenericRow minValues = new GenericRow(rowFieldCount); GenericRow maxValues = new GenericRow(rowFieldCount); @@ -74,192 +104,4 @@ public SimpleStats toBinary(SimpleColStats[] stats) { serializer.toBinaryRow(maxValues).copy(), BinaryArray.fromLongArray(nullCounts)); } - - public InternalRow evolution(BinaryRow values) { - InternalRow row = values; - if (indexMapping != null) { - row = ProjectedRow.from(indexMapping).replaceRow(row); - } - - if (castFieldGetters != null) { - row = CastedRow.from(castFieldGetters).replaceRow(values); - } - - return row; - } - - public InternalArray evolution(BinaryArray nullCounts, @Nullable Long rowCount) { - if (indexMapping == null) { - return nullCounts; - } - - if (rowCount == null) { - throw new RuntimeException("Schema Evolution for stats needs row count."); - } - - return new NullCountsEvoArray(indexMapping, nullCounts, rowCount); - } - - private static RowType toAllFieldsNullableRowType(RowType rowType) { - // as stated in RollingFile.Writer#finish, col stats are not collected currently so - // min/max values are all nulls - return RowType.builder() - .fields( - rowType.getFields().stream() - .map(f -> f.type().copy(true)) - .toArray(DataType[]::new), - rowType.getFieldNames().toArray(new String[0])) - .build(); - } - - private static class NullCountsEvoArray implements InternalArray { - - private final int[] indexMapping; - private final InternalArray array; - private final long notFoundValue; - - protected NullCountsEvoArray(int[] indexMapping, InternalArray array, long notFoundValue) { - this.indexMapping = indexMapping; - this.array = array; - this.notFoundValue = notFoundValue; - } - - @Override - public int size() { - return indexMapping.length; - } - - @Override - public boolean isNullAt(int pos) { - if (indexMapping[pos] < 0) { - return false; - } - return array.isNullAt(indexMapping[pos]); - } - - @Override - public long getLong(int pos) { - if (indexMapping[pos] < 0) { - return notFoundValue; - } - return array.getLong(indexMapping[pos]); - } - - // ============================= Unsupported Methods ================================ - - @Override - public boolean getBoolean(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public byte getByte(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public short getShort(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public int getInt(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public float getFloat(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public double getDouble(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public BinaryString getString(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public Decimal getDecimal(int pos, int precision, int scale) { - throw new UnsupportedOperationException(); - } - - @Override - public Timestamp getTimestamp(int pos, int precision) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getBinary(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public InternalArray getArray(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public InternalMap getMap(int pos) { - throw new UnsupportedOperationException(); - } - - @Override - public InternalRow getRow(int pos, int numFields) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean equals(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public int hashCode() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean[] toBooleanArray() { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] toByteArray() { - throw new UnsupportedOperationException(); - } - - @Override - public short[] toShortArray() { - throw new UnsupportedOperationException(); - } - - @Override - public int[] toIntArray() { - throw new UnsupportedOperationException(); - } - - @Override - public long[] toLongArray() { - throw new UnsupportedOperationException(); - } - - @Override - public float[] toFloatArray() { - throw new UnsupportedOperationException(); - } - - @Override - public double[] toDoubleArray() { - throw new UnsupportedOperationException(); - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java new file mode 100644 index 000000000000..d3f6d4cd62af --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.stats; + +import org.apache.paimon.casting.CastFieldGetter; +import org.apache.paimon.casting.CastedRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ProjectedArray; +import org.apache.paimon.utils.ProjectedRow; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Converter for array of {@link SimpleColStats}. */ +public class SimpleStatsEvolution { + + private final List fieldNames; + @Nullable private final int[] indexMapping; + @Nullable private final CastFieldGetter[] castFieldGetters; + + private final Map, int[]> indexMappings; + + public SimpleStatsEvolution( + RowType rowType, + @Nullable int[] indexMapping, + @Nullable CastFieldGetter[] castFieldGetters) { + this.fieldNames = rowType.getFieldNames(); + this.indexMapping = indexMapping; + this.castFieldGetters = castFieldGetters; + this.indexMappings = new HashMap<>(); + } + + public Result evolution( + SimpleStats stats, @Nullable Long rowCount, @Nullable List denseFields) { + InternalRow minValues = stats.minValues(); + InternalRow maxValues = stats.maxValues(); + InternalArray nullCounts = stats.nullCounts(); + + if (denseFields != null) { + int[] denseIndexMapping = + indexMappings.computeIfAbsent( + denseFields, + k -> fieldNames.stream().mapToInt(denseFields::indexOf).toArray()); + minValues = ProjectedRow.from(denseIndexMapping).replaceRow(minValues); + maxValues = ProjectedRow.from(denseIndexMapping).replaceRow(maxValues); + nullCounts = ProjectedArray.from(denseIndexMapping).replaceArray(nullCounts); + } + + if (indexMapping != null) { + minValues = ProjectedRow.from(indexMapping).replaceRow(minValues); + maxValues = ProjectedRow.from(indexMapping).replaceRow(maxValues); + + if (rowCount == null) { + throw new RuntimeException("Schema Evolution for stats needs row count."); + } + + nullCounts = new NullCountsEvoArray(indexMapping, nullCounts, rowCount); + } + + if (castFieldGetters != null) { + minValues = CastedRow.from(castFieldGetters).replaceRow(minValues); + maxValues = CastedRow.from(castFieldGetters).replaceRow(maxValues); + } + + return new Result(minValues, maxValues, nullCounts); + } + + /** Result to {@link SimpleStats} evolution. */ + public static class Result { + + private final InternalRow minValues; + private final InternalRow maxValues; + private final InternalArray nullCounts; + + public Result(InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) { + this.minValues = minValues; + this.maxValues = maxValues; + this.nullCounts = nullCounts; + } + + public InternalRow minValues() { + return minValues; + } + + public InternalRow maxValues() { + return maxValues; + } + + public InternalArray nullCounts() { + return nullCounts; + } + } + + private static class NullCountsEvoArray implements InternalArray { + + private final int[] indexMapping; + private final InternalArray array; + private final long notFoundValue; + + private NullCountsEvoArray(int[] indexMapping, InternalArray array, long notFoundValue) { + this.indexMapping = indexMapping; + this.array = array; + this.notFoundValue = notFoundValue; + } + + @Override + public int size() { + return indexMapping.length; + } + + @Override + public boolean isNullAt(int pos) { + if (indexMapping[pos] < 0) { + return false; + } + return array.isNullAt(indexMapping[pos]); + } + + @Override + public long getLong(int pos) { + if (indexMapping[pos] < 0) { + return notFoundValue; + } + return array.getLong(indexMapping[pos]); + } + + // ============================= Unsupported Methods ================================ + + @Override + public boolean getBoolean(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public BinaryString getString(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBinary(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public InternalArray getArray(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public InternalMap getMap(int pos) { + throw new UnsupportedOperationException(); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean[] toBooleanArray() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] toByteArray() { + throw new UnsupportedOperationException(); + } + + @Override + public short[] toShortArray() { + throw new UnsupportedOperationException(); + } + + @Override + public int[] toIntArray() { + throw new UnsupportedOperationException(); + } + + @Override + public long[] toLongArray() { + throw new UnsupportedOperationException(); + } + + @Override + public float[] toFloatArray() { + throw new UnsupportedOperationException(); + } + + @Override + public double[] toDoubleArray() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsConverters.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java similarity index 86% rename from paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsConverters.java rename to paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index 10694769b62c..a0814b8c04c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsConverters.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -37,28 +37,29 @@ import static org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping; /** Converters to create col stats array serializer. */ -public class SimpleStatsConverters { +public class SimpleStatsEvolutions { private final Function> schemaFields; private final long tableSchemaId; private final List tableDataFields; private final AtomicReference> tableFields; - private final ConcurrentMap converters; + private final ConcurrentMap evolutions; - public SimpleStatsConverters(Function> schemaFields, long tableSchemaId) { + public SimpleStatsEvolutions(Function> schemaFields, long tableSchemaId) { this.schemaFields = schemaFields; this.tableSchemaId = tableSchemaId; this.tableDataFields = schemaFields.apply(tableSchemaId); this.tableFields = new AtomicReference<>(); - this.converters = new ConcurrentHashMap<>(); + this.evolutions = new ConcurrentHashMap<>(); } - public SimpleStatsConverter getOrCreate(long dataSchemaId) { - return converters.computeIfAbsent( + public SimpleStatsEvolution getOrCreate(long dataSchemaId) { + return evolutions.computeIfAbsent( dataSchemaId, id -> { if (tableSchemaId == id) { - return new SimpleStatsConverter(new RowType(schemaFields.apply(id))); + return new SimpleStatsEvolution( + new RowType(schemaFields.apply(id)), null, null); } // Get atomic schema fields. @@ -66,10 +67,10 @@ public SimpleStatsConverter getOrCreate(long dataSchemaId) { tableFields.updateAndGet(v -> v == null ? tableDataFields : v); List dataFields = schemaFields.apply(id); IndexCastMapping indexCastMapping = - createIndexCastMapping(schemaTableFields, dataFields); + createIndexCastMapping(schemaTableFields, schemaFields.apply(id)); @Nullable int[] indexMapping = indexCastMapping.getIndexMapping(); // Create col stats array serializer with schema evolution - return new SimpleStatsConverter( + return new SimpleStatsEvolution( new RowType(dataFields), indexMapping, indexCastMapping.getCastMapping()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 3bd337294b4f..01227dd35407 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -21,10 +21,8 @@ import org.apache.paimon.FileStore; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.Path; -import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.sink.RowKeyExtractor; import org.apache.paimon.table.sink.TableCommitImpl; @@ -104,10 +102,6 @@ default Optional comment() { LocalTableQuery newLocalTableQuery(); - default SimpleStats getSchemaFieldStats(DataFileMeta dataFileMeta) { - return dataFileMeta.valueStats(); - } - boolean supportStreamingReadOverwrite(); RowKeyExtractor createRowKeyExtractor(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java index 989f32f50beb..3e351cd1dae9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java @@ -154,6 +154,7 @@ public DataFileMeta fromRow(InternalRow row) { row.getTimestamp(12, 3), null, null, + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index cc8abfa485f3..7918914b2c63 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -21,7 +21,9 @@ import org.apache.paimon.data.serializer.VersionedSerializer; import org.apache.paimon.index.IndexFileMetaSerializer; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; +import org.apache.paimon.io.DataFileMeta09Serializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputDeserializer; @@ -29,6 +31,7 @@ import org.apache.paimon.io.DataOutputView; import org.apache.paimon.io.DataOutputViewStreamWrapper; import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.utils.IOExceptionSupplier; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -42,7 +45,7 @@ /** {@link VersionedSerializer} for {@link CommitMessage}. */ public class CommitMessageSerializer implements VersionedSerializer { - private static final int CURRENT_VERSION = 3; + private static final int CURRENT_VERSION = 4; private final DataFileMetaSerializer dataFileSerializer; private final IndexFileMetaSerializer indexEntrySerializer; @@ -104,30 +107,25 @@ public List deserializeList(int version, DataInputView view) thro } private CommitMessage deserialize(int version, DataInputView view) throws IOException { - if (version == CURRENT_VERSION) { + if (version >= 3) { + IOExceptionSupplier> fileDeserializer = + () -> dataFileSerializer.deserializeList(view); + if (version == 3) { + DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); + fileDeserializer = () -> serializer.deserializeList(view); + } return new CommitMessageImpl( deserializeBinaryRow(view), view.readInt(), new DataIncrement( - dataFileSerializer.deserializeList(view), - dataFileSerializer.deserializeList(view), - dataFileSerializer.deserializeList(view)), + fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), new CompactIncrement( - dataFileSerializer.deserializeList(view), - dataFileSerializer.deserializeList(view), - dataFileSerializer.deserializeList(view)), + fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), new IndexIncrement( indexEntrySerializer.deserializeList(view), indexEntrySerializer.deserializeList(view))); - } else if (version <= 2) { - return deserialize08(version, view); } else { - throw new UnsupportedOperationException( - "Expecting CommitMessageSerializer version to be smaller or equal than " - + CURRENT_VERSION - + ", but found " - + version - + "."); + return deserialize08(version, view); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 067bf055c241..1dac6584d698 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; +import org.apache.paimon.io.DataFileMeta09Serializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataInputViewStreamWrapper; @@ -49,7 +50,7 @@ public class DataSplit implements Split { private static final long serialVersionUID = 7L; private static final long MAGIC = -2394839472490812314L; - private static final int VERSION = 2; + private static final int VERSION = 3; private long snapshotId = 0; private BinaryRow partition; @@ -316,6 +317,9 @@ private static FunctionWithIOException getFileMetaS DataFileMeta08Serializer serializer = new DataFileMeta08Serializer(); return serializer::deserialize; } else if (version == 2) { + DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); + return serializer::deserialize; + } else if (version == 3) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index ed06afd3a19a..ab08d9e9f2cc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -34,9 +34,8 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.stats.SimpleStatsConverters; +import org.apache.paimon.stats.SimpleStatsEvolution; +import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; import org.apache.paimon.table.Table; @@ -314,8 +313,8 @@ public RecordReader createReader(Split split) { List> iteratorList = new ArrayList<>(); // dataFilePlan.snapshotId indicates there's no files in the table, use the newest // schema id directly - SimpleStatsConverters simpleStatsConverters = - new SimpleStatsConverters( + SimpleStatsEvolutions simpleStatsEvolutions = + new SimpleStatsEvolutions( sid -> schemaManager.schema(sid).fields(), storeTable.schema().id()); RowDataToObjectArrayConverter partitionConverter = @@ -352,8 +351,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { partitionConverter, keyConverters, file, - storeTable.getSchemaFieldStats(file), - simpleStatsConverters))); + simpleStatsEvolutions))); } Iterator rows = Iterators.concat(iteratorList.iterator()); if (readType != null) { @@ -372,10 +370,8 @@ private LazyGenericRow toRow( RowDataToObjectArrayConverter partitionConverter, Function keyConverters, DataFileMeta dataFileMeta, - SimpleStats stats, - SimpleStatsConverters simpleStatsConverters) { - StatsLazyGetter statsGetter = - new StatsLazyGetter(stats, dataFileMeta, simpleStatsConverters); + SimpleStatsEvolutions simpleStatsEvolutions) { + StatsLazyGetter statsGetter = new StatsLazyGetter(dataFileMeta, simpleStatsEvolutions); @SuppressWarnings("unchecked") Supplier[] fields = new Supplier[] { @@ -426,32 +422,31 @@ private LazyGenericRow toRow( private static class StatsLazyGetter { - private final SimpleStats stats; private final DataFileMeta file; - private final SimpleStatsConverters simpleStatsConverters; + private final SimpleStatsEvolutions simpleStatsEvolutions; private Map lazyNullValueCounts; private Map lazyLowerValueBounds; private Map lazyUpperValueBounds; - private StatsLazyGetter( - SimpleStats stats, DataFileMeta file, SimpleStatsConverters simpleStatsConverters) { - this.stats = stats; + private StatsLazyGetter(DataFileMeta file, SimpleStatsEvolutions simpleStatsEvolutions) { this.file = file; - this.simpleStatsConverters = simpleStatsConverters; + this.simpleStatsEvolutions = simpleStatsEvolutions; } private void initialize() { - SimpleStatsConverter serializer = simpleStatsConverters.getOrCreate(file.schemaId()); + SimpleStatsEvolution evolution = simpleStatsEvolutions.getOrCreate(file.schemaId()); // Create value stats - InternalRow min = serializer.evolution(stats.minValues()); - InternalRow max = serializer.evolution(stats.maxValues()); - InternalArray nullCounts = serializer.evolution(stats.nullCounts(), file.rowCount()); + SimpleStatsEvolution.Result result = + evolution.evolution(file.valueStats(), file.rowCount(), file.valueStatsCols()); + InternalRow min = result.minValues(); + InternalRow max = result.maxValues(); + InternalArray nullCounts = result.nullCounts(); lazyNullValueCounts = new TreeMap<>(); lazyLowerValueBounds = new TreeMap<>(); lazyUpperValueBounds = new TreeMap<>(); for (int i = 0; i < min.getFieldCount(); i++) { - DataField field = simpleStatsConverters.tableDataFields().get(i); + DataField field = simpleStatsEvolutions.tableDataFields().get(i); String name = field.name(); DataType type = field.type(); lazyNullValueCounts.put( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java index 6d93224e5600..ed23a246a1f0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java @@ -21,6 +21,8 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.options.Options; import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.statistics.TruncateSimpleColStatsCollector; +import org.apache.paimon.table.SystemFields; import java.util.List; @@ -37,15 +39,16 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories( SimpleColStatsCollector.Factory[] modes = new SimpleColStatsCollector.Factory[fields.size()]; for (int i = 0; i < fields.size(); i++) { + String field = fields.get(i); String fieldMode = cfg.get( - key(String.format( - "%s.%s.%s", - FIELDS_PREFIX, fields.get(i), STATS_MODE_SUFFIX)) + key(String.format("%s.%s.%s", FIELDS_PREFIX, field, STATS_MODE_SUFFIX)) .stringType() .noDefaultValue()); if (fieldMode != null) { modes[i] = SimpleColStatsCollector.from(fieldMode); + } else if (SystemFields.isSystemField(field)) { + modes[i] = () -> new TruncateSimpleColStatsCollector(128); } else { modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index a4ba849ad7c0..04b5fa6e60f7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -137,7 +137,7 @@ public void testSingleWrite() throws Exception { new SimpleColStats[] { initStats(1, 1, 0), initStats("AAA", "AAA", 0), initStats(PART, PART, 0) }; - assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected)); + assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinaryAllMode(expected)); assertThat(meta.minSequenceNumber()).isEqualTo(0); assertThat(meta.maxSequenceNumber()).isEqualTo(0); @@ -200,7 +200,7 @@ public void testMultipleCommits() throws Exception { initStats(String.format("%03d", start), String.format("%03d", end - 1), 0), initStats(PART, PART, 0) }; - assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected)); + assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinaryAllMode(expected)); assertThat(meta.minSequenceNumber()).isEqualTo(start); assertThat(meta.maxSequenceNumber()).isEqualTo(end - 1); @@ -243,7 +243,7 @@ public void testRollingWrite() throws Exception { initStats(String.format("%03d", min), String.format("%03d", max), 0), initStats(PART, PART, 0) }; - assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected)); + assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinaryAllMode(expected)); assertThat(meta.minSequenceNumber()).isEqualTo(min); assertThat(meta.maxSequenceNumber()).isEqualTo(max); @@ -633,7 +633,8 @@ private Pair> createWriter( options, AppendOnlyWriterTest.SCHEMA.getFieldNames()), MemorySize.MAX_VALUE, new FileIndexOptions(), - true); + true, + false); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); @@ -649,7 +650,7 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I fileName, toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(), toCompact.stream().mapToLong(DataFileMeta::rowCount).sum(), - STATS_SERIALIZER.toBinary( + STATS_SERIALIZER.toBinaryAllMode( new SimpleColStats[] { initStats( toCompact.get(0).valueStats().minValues().getInt(0), @@ -674,6 +675,9 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I minSeq, maxSeq, toCompact.get(0).schemaId(), - FileSource.APPEND); + Collections.emptyList(), + null, + FileSource.APPEND, + null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java index d69773900954..95826c195ec7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java @@ -190,6 +190,7 @@ private DataFileMeta newFile(long fileSize) { 0, 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index d9d8e1f69bcf..be4147735614 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -159,7 +159,8 @@ private static DataFileMeta newFile(long timeMillis) { .toLocalDateTime()), 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } private Pair row(int pt, int col, int pk, int bucket) { diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 4aee88298926..439fc499929c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -97,7 +97,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception options, SCHEMA.getFieldNames()), MemorySize.MAX_VALUE, new FileIndexOptions(), - true); + true, + false); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); appendOnlyWriter.write( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index 810cef860784..7de4214beaa9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -156,15 +156,16 @@ private Data createDataFile(List kvs, int level, BinaryRow partition, kvs.size(), minKey, maxKey, - keyStatsSerializer.toBinary(keyStatsCollector.extract()), - valueStatsSerializer.toBinary(valueStatsCollector.extract()), + keyStatsSerializer.toBinaryAllMode(keyStatsCollector.extract()), + valueStatsSerializer.toBinaryAllMode(valueStatsCollector.extract()), minSequenceNumber, maxSequenceNumber, 0, level, kvs.stream().filter(kv -> kv.valueKind().isRetract()).count(), null, - FileSource.APPEND), + FileSource.APPEND, + null), kvs); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index 03b9babfaa39..48c8d44876ae 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -56,7 +56,8 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) { Timestamp.fromEpochMillis(100), maxSeq - minSeq + 1, null, - FileSource.APPEND); + FileSource.APPEND, + null); } public static DataFileMeta newFile() { @@ -74,7 +75,8 @@ public static DataFileMeta newFile() { 0, 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } public static DataFileMeta newFile( @@ -98,7 +100,8 @@ public static DataFileMeta newFile( level, deleteRowCount, null, - FileSource.APPEND); + FileSource.APPEND, + null); } public static BinaryRow row(int i) { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index df9f2ac0ba36..9bee36b7cc3c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -35,6 +35,7 @@ import org.apache.paimon.utils.StatsCollectorFactories; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -62,6 +63,10 @@ public class RollingFileWriterTest { private RollingFileWriter rollingFileWriter; public void initialize(String identifier) { + initialize(identifier, false); + } + + public void initialize(String identifier, boolean statsDenseStore) { FileFormat fileFormat = FileFormat.fromIdentifier(identifier, new Options()); rollingFileWriter = new RollingFileWriter<>( @@ -94,7 +99,8 @@ public void initialize(String identifier) { SCHEMA.getFieldNames()), new FileIndexOptions(), FileSource.APPEND, - true), + true, + statsDenseStore), TARGET_FILE_SIZE); } @@ -123,4 +129,16 @@ private void assertFileNum(int expected) { File[] files = dataDir.listFiles(); assertThat(files).isNotNull().hasSize(expected); } + + @Test + public void testStatsDenseStore() throws IOException { + initialize("parquet", true); + for (int i = 0; i < 1000; i++) { + rollingFileWriter.write(GenericRow.of(i)); + } + rollingFileWriter.close(); + DataFileMeta file = rollingFileWriter.result().get(0); + assertThat(file.valueStatsCols()).isNull(); + assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(SCHEMA.getFieldCount()); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index 744904c71084..bd272b745dc4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -74,7 +74,8 @@ public void testProduction() throws IOException { Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), 11L, new byte[] {1, 2, 4}, - FileSource.COMPACT); + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3")); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap> dvRanges = new LinkedHashMap<>(); @@ -105,6 +106,76 @@ public void testProduction() throws IOException { assertThat(deserialized).isEqualTo(manifestCommittable); } + @Test + public void testCompatibilityToVersion3() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + null); + List dataFiles = Collections.singletonList(dataFile); + + LinkedHashMap> dvRanges = new LinkedHashMap<>(); + dvRanges.put("dv_key1", Pair.of(1, 2)); + dvRanges.put("dv_key2", Pair.of(3, 4)); + IndexFileMeta indexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvRanges); + List indexFiles = Collections.singletonList(indexFile); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), + 11, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(3, bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + + byte[] v2Bytes = + IOUtils.readFully( + ManifestCommittableSerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream("compatibility/manifest-committable-v3"), + true); + deserialized = serializer.deserialize(2, v2Bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } + @Test public void testCompatibilityToVersion2() throws IOException { SimpleStats keyStats = @@ -134,6 +205,7 @@ public void testCompatibilityToVersion2() throws IOException { Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), 11L, new byte[] {1, 2, 4}, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -203,6 +275,7 @@ public void testCompatibilityToVersion2PaimonV07() throws IOException { Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), null, null, + null, null); List dataFiles = Collections.singletonList(dataFile); diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index 099c38003391..c179a2c0a789 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -117,6 +117,7 @@ public static DataFileMeta newFile(int name, int level) { level, 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index c2a7f821dda0..c3d6cc3be86a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -300,13 +300,13 @@ public void testTriggerFullCompaction() throws Exception { // case5:the sizes of some manifest files are greater than suggestedMetaSize, while the // sizes of other manifest files is less than. All manifest files have no delete files input.clear(); - input.addAll(Arrays.asList(manifest1, manifest2, manifest3)); + input.addAll(Arrays.asList(manifest1, manifest2, manifest3, manifest4)); List newMetas5 = new ArrayList<>(); List fullCompacted5 = ManifestFileMerger.tryFullCompaction( input, newMetas5, manifestFile, 1800, 100, getPartitionType(), null) .get(); - assertThat(fullCompacted5.size()).isEqualTo(2); + assertThat(fullCompacted5.size()).isEqualTo(3); assertThat(newMetas5.size()).isEqualTo(1); // trigger full compaction diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index a40d0538fdfa..31be9346c49b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -94,7 +94,8 @@ protected ManifestEntry makeEntry( Timestamp.fromEpochMillis(200000), 0L, // not used embeddedIndex, // not used - FileSource.APPEND)); + FileSource.APPEND, + null)); } protected ManifestFileMeta makeManifest(ManifestEntry... entries) { @@ -272,6 +273,7 @@ public static ManifestEntry makeEntry( 0, // not used 0L, null, - FileSource.APPEND)); + FileSource.APPEND, + null)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java index 5294436aefc9..0283eda853e2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java @@ -113,7 +113,7 @@ public ManifestFileMeta createManifestFileMeta(List entries) { entries.size() * 100L, numAddedFiles, numDeletedFiles, - serializer.toBinary(collector.extract()), + serializer.toBinaryAllMode(collector.extract()), 0); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java index a86aa445b2a6..d804c9790282 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java @@ -83,6 +83,7 @@ public static DataFileMeta newFile(int level) { level, 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java index 05fce451a40e..bdee5c5f7507 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java @@ -183,7 +183,8 @@ private DataFileMeta makeInterval(int left, int right) { Timestamp.fromEpochMillis(100000), 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } private List> toMultiset(List> sections) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java index 25d263a93102..793aea7598e9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java @@ -359,6 +359,6 @@ private LevelSortedRun level(int level, long size) { static DataFileMeta file(long size) { return new DataFileMeta( - "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null, FileSource.APPEND); + "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null, FileSource.APPEND, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 16a2ab56ba03..739d4b6bd6b3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -211,7 +211,8 @@ public void testExpireExtraFiles() throws IOException { Timestamp.now(), 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/SimpleStatsConverterTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/SimpleStatsEvolutionTest.java similarity index 91% rename from paimon-core/src/test/java/org/apache/paimon/stats/SimpleStatsConverterTest.java rename to paimon-core/src/test/java/org/apache/paimon/stats/SimpleStatsEvolutionTest.java index a88276c503d2..af4fb78b24c7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/stats/SimpleStatsConverterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/stats/SimpleStatsEvolutionTest.java @@ -36,8 +36,8 @@ import static org.apache.paimon.io.DataFileTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link SimpleStatsConverter}. */ -public class SimpleStatsConverterTest { +/** Tests for {@link SimpleStatsEvolution}. */ +public class SimpleStatsEvolutionTest { @Test public void testFromBinary() { @@ -73,8 +73,8 @@ public void testFromBinary() { SchemaEvolutionUtil.createIndexCastMapping( tableSchema.fields(), dataSchema.fields()); int[] indexMapping = indexCastMapping.getIndexMapping(); - SimpleStatsConverter serializer = - new SimpleStatsConverter( + SimpleStatsEvolution evolution = + new SimpleStatsEvolution( tableSchema.logicalRowType(), indexMapping, indexCastMapping.getCastMapping()); @@ -83,10 +83,10 @@ public void testFromBinary() { Long[] nullCounts = new Long[] {1L, 0L, 10L, 100L}; SimpleStats stats = new SimpleStats(minRowData, maxRowData, BinaryArray.fromLongArray(nullCounts)); - - InternalRow min = serializer.evolution(stats.minValues()); - InternalRow max = serializer.evolution(stats.maxValues()); - InternalArray nulls = serializer.evolution(stats.nullCounts(), 1000L); + SimpleStatsEvolution.Result result = evolution.evolution(stats, 1000L, null); + InternalRow min = result.minValues(); + InternalRow max = result.maxValues(); + InternalArray nulls = result.nullCounts(); checkFieldStats(min, max, nulls, 0, 2, 99, 0L); checkFieldStats(min, max, nulls, 1, 4, 97, 100L); diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTestUtils.java index 0b20815795ec..20c1dc8bcd69 100644 --- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTestUtils.java @@ -94,11 +94,12 @@ public static SimpleStats newEmptySimpleStats(int fieldCount) { for (int i = 0; i < fieldCount; i++) { array[i] = new SimpleColStats(null, null, 0L); } - return statsConverter.toBinary(array); + return statsConverter.toBinaryAllMode(array); } public static SimpleStats newSimpleStats(int min, int max) { SimpleStatsConverter statsConverter = new SimpleStatsConverter(RowType.of(new IntType())); - return statsConverter.toBinary(new SimpleColStats[] {new SimpleColStats(min, max, 0L)}); + return statsConverter.toBinaryAllMode( + new SimpleColStats[] {new SimpleColStats(min, max, 0L)}); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 102477fe4848..e6503cc401b3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -58,6 +58,8 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; @@ -220,10 +222,19 @@ public void testBatchProjection() throws Exception { .hasSameElementsAs(Arrays.asList("200|20", "201|21", "202|22", "201|21")); } - @Test - public void testBatchFilter() throws Exception { - writeData(); - FileStoreTable table = createFileStoreTable(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testBatchFilter(boolean statsDenseStore) throws Exception { + Consumer optionsSetter = + options -> { + options.set(CoreOptions.METADATA_STATS_DENSE_STORE, statsDenseStore); + if (statsDenseStore) { + options.set(CoreOptions.METADATA_STATS_MODE, "none"); + options.set("fields.b.stats-mode", "full"); + } + }; + writeData(optionsSetter); + FileStoreTable table = createFileStoreTable(optionsSetter); PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType()); Predicate predicate = builder.equal(2, 201L); @@ -858,7 +869,11 @@ public void testBatchOrderWithCompaction() throws Exception { } private void writeData() throws Exception { - FileStoreTable table = createFileStoreTable(); + writeData(options -> {}); + } + + private void writeData(Consumer optionsSetter) throws Exception { + FileStoreTable table = createFileStoreTable(optionsSetter); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java index 12544a093e9f..7f264fa817b2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java @@ -26,8 +26,8 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.stats.SimpleStatsConverters; +import org.apache.paimon.stats.SimpleStatsEvolution; +import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.DataField; @@ -236,12 +236,13 @@ protected void validateValuesWithNewSchema( List filesName, List fileMetaList) { Function> schemaFields = id -> tableSchemas.get(id).fields(); - SimpleStatsConverters converters = new SimpleStatsConverters(schemaFields, schemaId); + SimpleStatsEvolutions converters = new SimpleStatsEvolutions(schemaFields, schemaId); for (DataFileMeta fileMeta : fileMetaList) { SimpleStats stats = getTableValueStats(fileMeta); - SimpleStatsConverter serializer = converters.getOrCreate(fileMeta.schemaId()); - InternalRow min = serializer.evolution(stats.minValues()); - InternalRow max = serializer.evolution(stats.maxValues()); + SimpleStatsEvolution.Result result = + converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null); + InternalRow min = result.minValues(); + InternalRow max = result.maxValues(); assertThat(stats.minValues().getFieldCount()).isEqualTo(12); if (filesName.contains(fileMeta.fileName())) { checkTwoValues(min, max); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java index 3d1ebb3d528e..182a42fe4bfc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileMetaFilterTestBase.java @@ -23,8 +23,8 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.stats.SimpleStatsConverters; +import org.apache.paimon.stats.SimpleStatsEvolution; +import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.DataField; @@ -78,14 +78,16 @@ public void testTableSplit() throws Exception { .containsAll(filesName); Function> schemaFields = id -> schemas.get(id).fields(); - SimpleStatsConverters converters = - new SimpleStatsConverters(schemaFields, table.schema().id()); + SimpleStatsEvolutions converters = + new SimpleStatsEvolutions(schemaFields, table.schema().id()); for (DataFileMeta fileMeta : fileMetaList) { SimpleStats stats = getTableValueStats(fileMeta); - SimpleStatsConverter serializer = - converters.getOrCreate(fileMeta.schemaId()); - InternalRow min = serializer.evolution(stats.minValues()); - InternalRow max = serializer.evolution(stats.maxValues()); + SimpleStatsEvolution.Result result = + converters + .getOrCreate(fileMeta.schemaId()) + .evolution(stats, 100L, null); + InternalRow min = result.minValues(); + InternalRow max = result.maxValues(); assertThat(min.getFieldCount()).isEqualTo(6); @@ -184,16 +186,18 @@ public void testTableSplitFilterExistFields() throws Exception { assertThat(filterAllSplits).isEqualTo(allSplits); Function> schemaFields = id -> schemas.get(id).fields(); - SimpleStatsConverters converters = - new SimpleStatsConverters(schemaFields, table.schema().id()); + SimpleStatsEvolutions converters = + new SimpleStatsEvolutions(schemaFields, table.schema().id()); Set filterFileNames = new HashSet<>(); for (DataSplit dataSplit : filterAllSplits) { for (DataFileMeta dataFileMeta : dataSplit.dataFiles()) { SimpleStats stats = getTableValueStats(dataFileMeta); - SimpleStatsConverter serializer = - converters.getOrCreate(dataFileMeta.schemaId()); - InternalRow min = serializer.evolution(stats.minValues()); - InternalRow max = serializer.evolution(stats.maxValues()); + SimpleStatsEvolution.Result result = + converters + .getOrCreate(dataFileMeta.schemaId()) + .evolution(stats, 100L, null); + InternalRow min = result.minValues(); + InternalRow max = result.maxValues(); int minValue = min.getInt(1); int maxValue = max.getInt(1); if (minValue >= 14 @@ -263,15 +267,17 @@ public void testTableSplitFilterNewFields() throws Exception { Set filterFileNames = new HashSet<>(); Function> schemaFields = id -> schemas.get(id).fields(); - SimpleStatsConverters converters = - new SimpleStatsConverters(schemaFields, table.schema().id()); + SimpleStatsEvolutions converters = + new SimpleStatsEvolutions(schemaFields, table.schema().id()); for (DataSplit dataSplit : allSplits) { for (DataFileMeta dataFileMeta : dataSplit.dataFiles()) { SimpleStats stats = getTableValueStats(dataFileMeta); - SimpleStatsConverter serializer = - converters.getOrCreate(dataFileMeta.schemaId()); - InternalRow min = serializer.evolution(stats.minValues()); - InternalRow max = serializer.evolution(stats.maxValues()); + SimpleStatsEvolution.Result result = + converters + .getOrCreate(dataFileMeta.schemaId()) + .evolution(stats, 100L, null); + InternalRow min = result.minValues(); + InternalRow max = result.maxValues(); Integer minValue = min.isNullAt(3) ? null : min.getInt(3); Integer maxValue = max.isNullAt(3) ? null : max.getInt(3); if (minValue != null diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 7edc418186a8..1ecfd6f910bc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -359,10 +359,20 @@ public void testBatchProjection() throws Exception { .isEqualTo(Arrays.asList("20001|21", "202|22")); } - @Test - public void testBatchFilter() throws Exception { - writeData(); - FileStoreTable table = createFileStoreTable(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testBatchFilter(boolean statsDenseStore) throws Exception { + Consumer optionsSetter = + options -> { + options.set(CoreOptions.METADATA_STATS_DENSE_STORE, statsDenseStore); + if (statsDenseStore) { + // pk table doesn't need value stats + options.set(CoreOptions.METADATA_STATS_MODE, "none"); + } + }; + writeData(optionsSetter); + FileStoreTable table = createFileStoreTable(optionsSetter); + PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType()); Predicate predicate = and(builder.equal(2, 201L), builder.equal(1, 21)); @@ -604,7 +614,11 @@ private void innerTestStreamingFullChangelog(Consumer configure) throws } private void writeData() throws Exception { - FileStoreTable table = createFileStoreTable(); + writeData(options -> {}); + } + + private void writeData(Consumer optionsSetter) throws Exception { + FileStoreTable table = createFileStoreTable(optionsSetter); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); @@ -1416,15 +1430,23 @@ public void testStreamingReadOptimizedTable() throws Exception { .hasMessage("Unsupported streaming scan for read optimized table"); } - @Test - public void testReadDeletionVectorTable() throws Exception { - FileStoreTable table = - createFileStoreTable( - options -> { - // let level has many files - options.set(TARGET_FILE_SIZE, new MemorySize(1)); - options.set(DELETION_VECTORS_ENABLED, true); - }); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReadDeletionVectorTable(boolean statsDenseStore) throws Exception { + Consumer optionsSetter = + options -> { + // let level has many files + options.set(TARGET_FILE_SIZE, new MemorySize(1)); + options.set(DELETION_VECTORS_ENABLED, true); + + options.set(CoreOptions.METADATA_STATS_DENSE_STORE, statsDenseStore); + if (statsDenseStore) { + options.set(CoreOptions.METADATA_STATS_MODE, "none"); + options.set("fields.b.stats-mode", "full"); + } + }; + + FileStoreTable table = createFileStoreTable(optionsSetter); StreamTableWrite write = table.newWrite(commitUser); IOManager ioManager = IOManager.create(tablePath.toString()); write.withIOManager(ioManager); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java index 45b67842b985..489c1ba05217 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java @@ -26,8 +26,8 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.stats.SimpleStatsConverters; +import org.apache.paimon.stats.SimpleStatsEvolution; +import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.DataField; @@ -122,12 +122,13 @@ protected void validateValuesWithNewSchema( List fileMetaList) { Function> schemaFields = id -> tableSchemas.get(id).logicalTrimmedPrimaryKeysType().getFields(); - SimpleStatsConverters converters = new SimpleStatsConverters(schemaFields, schemaId); + SimpleStatsEvolutions converters = new SimpleStatsEvolutions(schemaFields, schemaId); for (DataFileMeta fileMeta : fileMetaList) { SimpleStats stats = getTableValueStats(fileMeta); - SimpleStatsConverter serializer = converters.getOrCreate(fileMeta.schemaId()); - InternalRow min = serializer.evolution(stats.minValues()); - InternalRow max = serializer.evolution(stats.maxValues()); + SimpleStatsEvolution.Result result = + converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null); + InternalRow min = result.minValues(); + InternalRow max = result.maxValues(); assertThat(min.getFieldCount()).isEqualTo(4); if (filesName.contains(fileMeta.fileName())) { // parquet does not support padding diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index 14faaa671096..a9e093dab124 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -58,7 +58,8 @@ public static DataFileMeta newFileFromSequence( 0, 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index 4bc9b6f8b15a..c64a12ffae2c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileTestDataGenerator; import org.apache.paimon.io.DataInputDeserializer; import org.apache.paimon.io.DataOutputViewStreamWrapper; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.InstantiationUtil; @@ -73,7 +74,7 @@ public void testSerializer() throws IOException { } @Test - public void testSerializerCompatible() throws Exception { + public void testSerializerNormal() throws Exception { SimpleStats keyStats = new SimpleStats( singleColumn("min_key"), @@ -102,6 +103,58 @@ public void testSerializerCompatible() throws Exception { Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), 11L, new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3")); + List dataFiles = Collections.singletonList(dataFile); + + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition); + binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa")); + binaryRowWriter.complete(); + + DataSplit split = + DataSplit.builder() + .withSnapshot(18) + .withPartition(partition) + .withBucket(20) + .withDataFiles(dataFiles) + .withBucketPath("my path") + .build(); + + assertThat(InstantiationUtil.clone(split)).isEqualTo(split); + } + + @Test + public void testSerializerCompatibleV1() throws Exception { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -130,4 +183,64 @@ public void testSerializerCompatible() throws Exception { InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader()); assertThat(actual).isEqualTo(split); } + + @Test + public void testSerializerCompatibleV2() throws Exception { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + null); + List dataFiles = Collections.singletonList(dataFile); + + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition); + binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa")); + binaryRowWriter.complete(); + + DataSplit split = + DataSplit.builder() + .withSnapshot(18) + .withPartition(partition) + .withBucket(20) + .withDataFiles(dataFiles) + .withBucketPath("my path") + .build(); + + byte[] v2Bytes = + IOUtils.readFully( + SplitTest.class + .getClassLoader() + .getResourceAsStream("compatibility/datasplit-v2"), + true); + + DataSplit actual = + InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader()); + assertThat(actual).isEqualTo(split); + } } diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v2 b/paimon-core/src/test/resources/compatibility/datasplit-v2 new file mode 100644 index 000000000000..8d2014984b8f Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/datasplit-v2 differ diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v3 b/paimon-core/src/test/resources/compatibility/manifest-committable-v3 new file mode 100644 index 000000000000..58ad2983cfb4 Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v3 differ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java index 7cacb6c2931f..9f6e8159a1be 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java @@ -79,6 +79,7 @@ private DataFileMeta newFile() { 0, 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java index ad30f6388c90..92c8dd94a14e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java @@ -115,7 +115,8 @@ private DataSplit dataSplit(int partition, int bucket, String... fileNames) { 0, // not used 0L, // not used null, // not used - FileSource.APPEND)); + FileSource.APPEND, + null)); } return DataSplit.builder() .withSnapshot(1) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java index f2c0732a2208..9c884cd5c33d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java @@ -88,7 +88,8 @@ public static DataFileMeta newFile(int level) { level, 0L, null, - FileSource.APPEND); + FileSource.APPEND, + null); } public static FileStoreSourceSplit newSourceSplit( diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index 63711393039b..fc787246f9f1 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -26,6 +26,7 @@ import org.apache.paimon.table.source.{DataSplit, Split} import org.junit.jupiter.api.Assertions +import java.util import java.util.{HashMap => JHashMap} import scala.collection.JavaConverters._ @@ -42,7 +43,18 @@ class ScanHelperTest extends PaimonSparkTestBase { 0.until(fileNum).foreach { i => val path = s"f$i.parquet" - files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1, FileSource.APPEND) + files += DataFileMeta.forAppend( + path, + 750000, + 30000, + null, + 0, + 29999, + 1, + new java.util.ArrayList[String](), + null, + FileSource.APPEND, + null) } val dataSplits = mutable.ArrayBuffer.empty[Split] @@ -67,7 +79,18 @@ class ScanHelperTest extends PaimonSparkTestBase { test("Paimon: reshuffle one split") { val files = List( - DataFileMeta.forAppend("f1.parquet", 750000, 30000, null, 0, 29999, 1, FileSource.APPEND) + DataFileMeta.forAppend( + "f1.parquet", + 750000, + 30000, + null, + 0, + 29999, + 1, + new java.util.ArrayList[String](), + null, + FileSource.APPEND, + null) ).asJava val dataSplits: Array[Split] = Array(