diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index b2bd3a976d66..15b1aac93543 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -864,6 +864,12 @@ Integer Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease. + +
data-file.thin-mode
+ false + Boolean + Enable data file thin mode to avoid duplicate columns storage. +
streaming-read-mode
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 5db809cff1d1..dd5632c18b42 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1452,6 +1452,13 @@ public class CoreOptions implements Serializable { "For DELETE manifest entry in manifest file, drop stats to reduce memory and storage." + " Default value is false only for compatibility of old reader."); + public static final ConfigOption DATA_FILE_THIN_MODE = + key("data-file.thin-mode") + .booleanType() + .defaultValue(false) + .withDescription( + "Enable data file thin mode to avoid duplicate columns storage."); + @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption MATERIALIZED_TABLE_DEFINITION_QUERY = key("materialized-table.definition-query") @@ -2356,6 +2363,10 @@ public boolean statsDenseStore() { return options.get(METADATA_STATS_DENSE_STORE); } + public boolean dataFileThinMode() { + return options.get(DATA_FILE_THIN_MODE); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java new file mode 100644 index 000000000000..6dd41a42506a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serialize KeyValue to InternalRow, omitting the key. Only used to write KeyValue to disk. */ +public class KeyValueThinSerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + private final GenericRow reusedMeta; + private final JoinedRow reusedKeyWithMeta; + + public KeyValueThinSerializer(RowType keyType, RowType valueType) { + super(KeyValue.schema(keyType, valueType)); + + this.reusedMeta = new GenericRow(2); + this.reusedKeyWithMeta = new JoinedRow(); + } + + public InternalRow toRow(KeyValue record) { + return toRow(record.sequenceNumber(), record.valueKind(), record.value()); + } + + public InternalRow toRow(long sequenceNumber, RowKind valueKind, InternalRow value) { + reusedMeta.setField(0, sequenceNumber); + reusedMeta.setField(1, valueKind.toByteValue()); + return reusedKeyWithMeta.replace(reusedMeta, value); + } + + @Override + public KeyValue fromRow(InternalRow row) { + throw new UnsupportedOperationException( + "KeyValue cannot be deserialized from InternalRow by this serializer."); + } +} diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index ce0b3b02840b..651c6a6f7b56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -42,7 +42,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.function.Function; @@ -56,13 +55,13 @@ *

NOTE: records given to the writer must be sorted because it does not compare the min max keys * to produce {@link DataFileMeta}. */ -public class KeyValueDataFileWriter +public abstract class KeyValueDataFileWriter extends StatsCollectingSingleFileWriter { private static final Logger LOG = LoggerFactory.getLogger(KeyValueDataFileWriter.class); - private final RowType keyType; - private final RowType valueType; + protected final RowType keyType; + protected final RowType valueType; private final long schemaId; private final int level; @@ -85,6 +84,7 @@ public KeyValueDataFileWriter( Function converter, RowType keyType, RowType valueType, + RowType writeRowType, @Nullable SimpleStatsExtractor simpleStatsExtractor, long schemaId, int level, @@ -97,11 +97,11 @@ public KeyValueDataFileWriter( factory, path, converter, - KeyValue.schema(keyType, valueType), + writeRowType, simpleStatsExtractor, compression, StatsCollectorFactories.createStatsFactories( - options, KeyValue.schema(keyType, valueType).getFieldNames()), + options, writeRowType.getFieldNames(), keyType.getFieldNames()), options.asyncFileWrite()); this.keyType = keyType; @@ -166,17 +166,11 @@ public DataFileMeta result() throws IOException { return null; } - SimpleColStats[] rowStats = fieldStats(); - int numKeyFields = keyType.getFieldCount(); - - SimpleColStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields); - SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats); - - SimpleColStats[] valFieldStats = - Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length); + Pair keyValueStats = fetchKeyValueStats(fieldStats()); + SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey()); Pair, SimpleStats> valueStatsPair = - valueStatsConverter.toBinary(valFieldStats); + valueStatsConverter.toBinary(keyValueStats.getValue()); DataFileIndexWriter.FileIndexResult indexResult = dataFileIndexWriter == null @@ -204,6 +198,8 @@ public DataFileMeta result() throws 
IOException { valueStatsPair.getKey()); } + abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); + @Override public void close() throws IOException { if (dataFileIndexWriter != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java new file mode 100644 index 000000000000..27a1aef64e36 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.io; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.function.Function; + +/** Write data files containing {@link KeyValue}s. */ +public class KeyValueDataFileWriterImpl extends KeyValueDataFileWriter { + + public KeyValueDataFileWriterImpl( + FileIO fileIO, + FormatWriterFactory factory, + Path path, + Function converter, + RowType keyType, + RowType valueType, + @Nullable SimpleStatsExtractor simpleStatsExtractor, + long schemaId, + int level, + String compression, + CoreOptions options, + FileSource fileSource, + FileIndexOptions fileIndexOptions) { + super( + fileIO, + factory, + path, + converter, + keyType, + valueType, + KeyValue.schema(keyType, valueType), + simpleStatsExtractor, + schemaId, + level, + compression, + options, + fileSource, + fileIndexOptions); + } + + @Override + Pair fetchKeyValueStats(SimpleColStats[] rowStats) { + int numKeyFields = keyType.getFieldCount(); + return Pair.of( + Arrays.copyOfRange(rowStats, 0, numKeyFields), + Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index a6fddb43283a..a6aae3985bd4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueSerializer; +import org.apache.paimon.KeyValueThinSerializer; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fileindex.FileIndexOptions; @@ -31,6 +32,8 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.StatsCollectorFactories; @@ -38,10 +41,13 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; /** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. 
*/ public class KeyValueFileWriterFactory { @@ -58,15 +64,13 @@ public class KeyValueFileWriterFactory { private KeyValueFileWriterFactory( FileIO fileIO, long schemaId, - RowType keyType, - RowType valueType, WriteFormatContext formatContext, long suggestedFileSize, CoreOptions options) { this.fileIO = fileIO; this.schemaId = schemaId; - this.keyType = keyType; - this.valueType = valueType; + this.keyType = formatContext.keyType; + this.valueType = formatContext.valueType; this.formatContext = formatContext; this.suggestedFileSize = suggestedFileSize; this.options = options; @@ -107,21 +111,35 @@ public RollingFileWriter createRollingChangelogFileWrite private KeyValueDataFileWriter createDataFileWriter( Path path, int level, FileSource fileSource) { - KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); - return new KeyValueDataFileWriter( - fileIO, - formatContext.writerFactory(level), - path, - kvSerializer::toRow, - keyType, - valueType, - formatContext.extractor(level), - schemaId, - level, - formatContext.compression(level), - options, - fileSource, - fileIndexOptions); + return formatContext.thinModeEnabled() + ? 
new KeyValueThinDataFileWriterImpl( + fileIO, + formatContext.writerFactory(level), + path, + new KeyValueThinSerializer(keyType, valueType)::toRow, + keyType, + valueType, + formatContext.extractor(level), + schemaId, + level, + formatContext.compression(level), + options, + fileSource, + fileIndexOptions) + : new KeyValueDataFileWriterImpl( + fileIO, + formatContext.writerFactory(level), + path, + new KeyValueSerializer(keyType, valueType)::toRow, + keyType, + valueType, + formatContext.extractor(level), + schemaId, + level, + formatContext.compression(level), + options, + fileSource, + fileIndexOptions); } public void deleteFile(String filename, int level) { @@ -191,17 +209,17 @@ private Builder( public KeyValueFileWriterFactory build( BinaryRow partition, int bucket, CoreOptions options) { - RowType fileRowType = KeyValue.schema(keyType, valueType); WriteFormatContext context = new WriteFormatContext( partition, bucket, - fileRowType, + keyType, + valueType, fileFormat, format2PathFactory, options); return new KeyValueFileWriterFactory( - fileIO, schemaId, keyType, valueType, context, suggestedFileSize, options); + fileIO, schemaId, context, suggestedFileSize, options); } } @@ -214,13 +232,24 @@ private static class WriteFormatContext { private final Map format2PathFactory; private final Map format2WriterFactory; + private final RowType keyType; + private final RowType valueType; + private final boolean thinModeEnabled; + private WriteFormatContext( BinaryRow partition, int bucket, - RowType rowType, + RowType keyType, + RowType valueType, FileFormat defaultFormat, Map parentFactories, CoreOptions options) { + this.keyType = keyType; + this.valueType = valueType; + this.thinModeEnabled = + options.dataFileThinMode() && supportsThinMode(keyType, valueType); + RowType writeRowType = + KeyValue.schema(thinModeEnabled ? 
RowType.of() : keyType, valueType); Map fileFormatPerLevel = options.fileFormatPerLevel(); this.level2Format = level -> @@ -236,7 +265,10 @@ private WriteFormatContext( this.format2PathFactory = new HashMap<>(); this.format2WriterFactory = new HashMap<>(); SimpleColStatsCollector.Factory[] statsCollectorFactories = - StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames()); + StatsCollectorFactories.createStatsFactories( + options, + writeRowType.getFieldNames(), + thinModeEnabled ? keyType.getFieldNames() : Collections.emptyList()); for (String format : parentFactories.keySet()) { format2PathFactory.put( format, @@ -252,11 +284,30 @@ private WriteFormatContext( format.equals("avro") ? Optional.empty() : fileFormat.createStatsExtractor( - rowType, statsCollectorFactories)); - format2WriterFactory.put(format, fileFormat.createWriterFactory(rowType)); + writeRowType, statsCollectorFactories)); + format2WriterFactory.put(format, fileFormat.createWriterFactory(writeRowType)); } } + private boolean supportsThinMode(RowType keyType, RowType valueType) { + Set keyFieldIds = + valueType.getFields().stream().map(DataField::id).collect(Collectors.toSet()); + + for (DataField field : keyType.getFields()) { + if (!SpecialFields.isKeyField(field.name())) { + return false; + } + if (!keyFieldIds.contains(field.id() - SpecialFields.KEY_FIELD_ID_START)) { + return false; + } + } + return true; + } + + private boolean thinModeEnabled() { + return thinModeEnabled; + } + @Nullable private SimpleStatsExtractor extractor(int level) { return format2Extractor.get(level2Format.apply(level)).orElse(null); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java new file mode 100644 index 000000000000..dd7ebb006764 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java @@ -0,0 +1,128 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +/** + * Implementation of KeyValueDataFileWriter for thin data files. Thin data files only contain + * _SEQUENCE_NUMBER_, _ROW_KIND_ and value fields. + */ +public class KeyValueThinDataFileWriterImpl extends KeyValueDataFileWriter { + + private final int[] keyStatMapping; + + /** + * Constructs a KeyValueThinDataFileWriterImpl. + * + * @param fileIO The file IO interface. + * @param factory The format writer factory. 
+ * @param path The path to the file. + * @param converter The function to convert KeyValue to InternalRow. + * @param keyType The row type of the key. + * @param valueType The row type of the value. + * @param simpleStatsExtractor The simple stats extractor, can be null. + * @param schemaId The schema ID. + * @param level The level. + * @param compression The compression type. + * @param options The core options. + * @param fileSource The file source. + * @param fileIndexOptions The file index options. + */ + public KeyValueThinDataFileWriterImpl( + FileIO fileIO, + FormatWriterFactory factory, + Path path, + Function converter, + RowType keyType, + RowType valueType, + @Nullable SimpleStatsExtractor simpleStatsExtractor, + long schemaId, + int level, + String compression, + CoreOptions options, + FileSource fileSource, + FileIndexOptions fileIndexOptions) { + super( + fileIO, + factory, + path, + converter, + keyType, + valueType, + KeyValue.schema(RowType.of(), valueType), + simpleStatsExtractor, + schemaId, + level, + compression, + options, + fileSource, + fileIndexOptions); + Map idToIndex = new HashMap<>(valueType.getFieldCount()); + for (int i = 0; i < valueType.getFieldCount(); i++) { + idToIndex.put(valueType.getFields().get(i).id(), i); + } + this.keyStatMapping = new int[keyType.getFieldCount()]; + for (int i = 0; i < keyType.getFieldCount(); i++) { + keyStatMapping[i] = + idToIndex.get( + keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START); + } + } + + /** + * Fetches the key and value statistics. + * + * @param rowStats The row statistics. + * @return A pair of key statistics and value statistics. + */ + @Override + Pair fetchKeyValueStats(SimpleColStats[] rowStats) { + int numKeyFields = keyType.getFieldCount(); + // In thin mode, there are no key stats in rowStats, so we only skip the + // _SEQUENCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value is 2. 
+ SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, 2, rowStats.length); + // Thin mode on, so need to map value stats to key stats. + SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; + for (int i = 0; i < keyStatMapping.length; i++) { + keyStats[i] = valFieldStats[keyStatMapping[i]]; + } + + return Pair.of(keyStats, valFieldStats); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 3ce019c91638..4a6196453df6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -212,6 +212,9 @@ protected void forceBufferSpill() throws Exception { if (ioManager == null) { return; } + if (forceBufferSpill) { + return; + } forceBufferSpill = true; LOG.info( "Force buffer spill for append-only file store write, writer number is: {}", diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 58241033f5fb..3989786bd277 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -150,22 +150,28 @@ public void run() { private void tryToCreateTags(Snapshot snapshot) { Optional timeOptional = timeExtractor.extract(snapshot.timeMillis(), snapshot.watermark()); + LOG.info("Starting to create a tag for snapshot {}.", snapshot.id()); if (!timeOptional.isPresent()) { return; } LocalDateTime time = timeOptional.get(); + LOG.info("The time of snapshot {} is {}.", snapshot.id(), time); + LOG.info("The next tag time is {}.", nextTag); if (nextTag == null || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time); + 
LOG.info("Create tag for snapshot {} with time {}.", snapshot.id(), thisTag); if (automaticCompletion && nextTag != null) { thisTag = nextTag; } String tagName = periodHandler.timeToTag(thisTag); + LOG.info("The tag name is {}.", tagName); if (!tagManager.tagExists(tagName)) { tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks); } nextTag = periodHandler.nextTagTime(thisTag); + LOG.info("The next tag time after this is {}.", nextTag); if (numRetainedMax != null) { // only handle auto-created tags here diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java index 1ed1b3f2d4a2..817c20af4612 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java @@ -52,6 +52,7 @@ public static TagAutoManager create( TagManager tagManager, TagDeletion tagDeletion, List callbacks) { + TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); return new TagAutoManager( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java index de94b2e23eff..abb1d686073f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java @@ -24,6 +24,7 @@ import org.apache.paimon.statistics.TruncateSimpleColStatsCollector; import org.apache.paimon.table.SpecialFields; +import java.util.Collections; import java.util.List; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; @@ -35,6 +36,11 @@ public class StatsCollectorFactories { public static SimpleColStatsCollector.Factory[] createStatsFactories( CoreOptions options, List fields) { + return createStatsFactories(options, fields, Collections.emptyList()); + } + + public static 
SimpleColStatsCollector.Factory[] createStatsFactories( + CoreOptions options, List fields, List keyNames) { Options cfg = options.toConfiguration(); SimpleColStatsCollector.Factory[] modes = new SimpleColStatsCollector.Factory[fields.size()]; @@ -47,7 +53,11 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories( .noDefaultValue()); if (fieldMode != null) { modes[i] = SimpleColStatsCollector.from(fieldMode); - } else if (SpecialFields.isSystemField(field)) { + } else if (SpecialFields.isSystemField(field) + || + // If we config DATA_FILE_THIN_MODE to true, we need to maintain the + // stats for key fields. + keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field)) { modes[i] = () -> new TruncateSimpleColStatsCollector(128); } else { modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE)); diff --git a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java index 657c791351a4..587204cd7616 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java @@ -95,10 +95,12 @@ public class TestKeyValueGenerator { public static final RowType KEY_TYPE = RowType.of( new DataField( - 2 + SpecialFields.KEY_FIELD_ID_START, "key_shopId", new IntType(false)), + 2 + SpecialFields.KEY_FIELD_ID_START, + SpecialFields.KEY_FIELD_PREFIX + "shopId", + new IntType(false)), new DataField( 3 + SpecialFields.KEY_FIELD_ID_START, - "key_orderId", + SpecialFields.KEY_FIELD_PREFIX + "orderId", new BigIntType(false))); public static final InternalRowSerializer DEFAULT_ROW_SERIALIZER = @@ -281,7 +283,7 @@ public BinaryRow getPartition(KeyValue kv) { public static List getPrimaryKeys(GeneratorMode mode) { List trimmedPk = KEY_TYPE.getFieldNames().stream() - .map(f -> f.replaceFirst("key_", "")) + .map(f -> f.replaceFirst(SpecialFields.KEY_FIELD_PREFIX, "")) 
.collect(Collectors.toList()); if (mode != NON_PARTITIONED) { trimmedPk = new ArrayList<>(trimmedPk); @@ -394,7 +396,7 @@ public List keyFields(TableSchema schema) { f -> new DataField( f.id() + SpecialFields.KEY_FIELD_ID_START, - "key_" + f.name(), + SpecialFields.KEY_FIELD_PREFIX + f.name(), f.type(), f.description())) .collect(Collectors.toList()); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java new file mode 100644 index 000000000000..3f8015b33b2d --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.format; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.types.DataTypes; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +/** This class tests the compatibility and effectiveness of storage thin mode. */ +public class ThinModeReadWriteTest extends TableTestBase { + + private Table createTable(String format, Boolean thinMode) throws Exception { + catalog.createTable(identifier(), schema(format, thinMode), true); + return catalog.getTable(identifier()); + } + + private Schema schema(String format, Boolean thinMode) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.INT()); + schemaBuilder.column("f2", DataTypes.SMALLINT()); + schemaBuilder.column("f3", DataTypes.STRING()); + schemaBuilder.column("f4", DataTypes.DOUBLE()); + schemaBuilder.column("f5", DataTypes.CHAR(100)); + schemaBuilder.column("f6", DataTypes.VARCHAR(100)); + schemaBuilder.column("f7", DataTypes.BOOLEAN()); + schemaBuilder.column("f8", DataTypes.INT()); + schemaBuilder.column("f9", DataTypes.TIME()); + schemaBuilder.column("f10", DataTypes.TIMESTAMP()); + schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2)); + schemaBuilder.column("f12", DataTypes.BYTES()); + schemaBuilder.column("f13", DataTypes.FLOAT()); + schemaBuilder.column("f14", DataTypes.BINARY(100)); + schemaBuilder.column("f15", DataTypes.VARBINARY(100)); + schemaBuilder.primaryKey( + "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", 
"f10", "f11", "f12", + "f13"); + schemaBuilder.option("bucket", "1"); + schemaBuilder.option("bucket-key", "f1"); + schemaBuilder.option("file.format", format); + schemaBuilder.option("data-file.thin-mode", thinMode.toString()); + return schemaBuilder.build(); + } + + @Test + public void testThinModeWorks() throws Exception { + + InternalRow[] datas = datas(200000); + + Table table = createTable("orc", true); + write(table, datas); + + long size1 = tableSize(table); + dropTableDefault(); + + table = createTable("orc", false); + write(table, datas); + long size2 = tableSize(table); + dropTableDefault(); + + Assertions.assertThat(size2).isGreaterThan(size1); + } + + @Test + public void testAllFormatReadWrite() throws Exception { + testFormat("orc"); + testFormat("parquet"); + testFormat("avro"); + } + + private void testFormat(String format) throws Exception { + testReadWrite(format, true); + testReadWrite(format, true); + testReadWrite(format, false); + testReadWrite(format, false); + } + + private void testReadWrite(String format, boolean writeThin) throws Exception { + Table table = createTable(format, writeThin); + + InternalRow[] datas = datas(2000); + + write(table, datas); + + List readed = read(table); + + Assertions.assertThat(readed).containsExactlyInAnyOrder(datas); + dropTableDefault(); + } + + InternalRow[] datas(int i) { + InternalRow[] arrays = new InternalRow[i]; + for (int j = 0; j < i; j++) { + arrays[j] = data(); + } + return arrays; + } + + protected InternalRow data() { + return GenericRow.of( + RANDOM.nextInt(), + RANDOM.nextInt(), + (short) RANDOM.nextInt(), + randomString(), + RANDOM.nextDouble(), + randomString(), + randomString(), + RANDOM.nextBoolean(), + RANDOM.nextInt(), + RANDOM.nextInt(), + Timestamp.now(), + Decimal.zero(10, 2), + randomBytes(), + (float) RANDOM.nextDouble(), + randomBytes(), + randomBytes()); + } + + public static long tableSize(Table table) { + long count = 0; + List files = + ((FileStoreTable) 
table).store().newScan().plan().files(FileKind.ADD); + for (ManifestEntry file : files) { + count += file.file().fileSize(); + } + + return count; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index b648e2af8972..e43cd898dbc2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -37,6 +37,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.stats.StatsTestUtils; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.FailingFileIO; @@ -158,7 +159,7 @@ public void testReadKeyType() throws Exception { List actualMetas = writer.result(); // projection: (shopId, orderId) -> (orderId) - RowType readKeyType = KEY_TYPE.project("key_orderId"); + RowType readKeyType = KEY_TYPE.project(SpecialFields.KEY_FIELD_PREFIX + "orderId"); KeyValueFileReaderFactory readerFactory = createReaderFactory(tempDir.toString(), "avro", readKeyType, null); InternalRowSerializer projectedKeySerializer = new InternalRowSerializer(readKeyType); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 0ab636c33aa3..be49311427a0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -41,6 +41,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.SchemaEvolutionTableTestBase; +import org.apache.paimon.table.SpecialFields; import 
org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -77,7 +78,12 @@ public class ContainsLevelsTest {
 
     private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0));
 
-    private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key", DataTypes.INT()));
+    private final RowType keyType =
+            DataTypes.ROW(
+                    DataTypes.FIELD(
+                            SpecialFields.KEY_FIELD_ID_START,
+                            SpecialFields.KEY_FIELD_PREFIX + "key",
+                            DataTypes.INT()));
     private final RowType rowType =
             DataTypes.ROW(
                     DataTypes.FIELD(0, "key", DataTypes.INT()),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 2dce81ce56b4..a678534042eb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -41,6 +41,7 @@
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.SchemaEvolutionTableTestBase;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -79,7 +80,13 @@ public class LookupLevelsTest {
 
     private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0));
 
-    private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key", DataTypes.INT()));
+    // Key field id and name are built from the SpecialFields constants, not literals.
+    private final RowType keyType =
+            DataTypes.ROW(
+                    DataTypes.FIELD(
+                            SpecialFields.KEY_FIELD_ID_START,
+                            SpecialFields.KEY_FIELD_PREFIX + "key",
+                            DataTypes.INT()));
     private final RowType rowType =
             DataTypes.ROW(
                     DataTypes.FIELD(0, "key", DataTypes.INT()),
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 46b64422fd9b..59f848a296cf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -37,6 +37,7 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
@@ -284,7 +285,12 @@ private TestFileStore createStore(
                         ? Collections.emptyList()
                         : Stream.concat(
                                         keyType.getFieldNames().stream()
-                                                .map(field -> field.replace("key_", "")),
+                                                .map(
+                                                        field ->
+                                                                field.replace(
+                                                                        SpecialFields
+                                                                                .KEY_FIELD_PREFIX,
+                                                                        "")),
                                         partitionType.getFieldNames().stream())
                                 .collect(Collectors.toList()),
                         Collections.emptyMap(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
index 25282d898a3d..ce8cfc9228ad 100644
--- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
@@ -33,7 +33,8 @@
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.apache.paimon.CoreOptions.METADATA_STATS_DENSE_STORE;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
@@ -42,13 +43,15 @@
 
 /** Test for table stats mode.
*/
 public class StatsTableTest extends TableTestBase {
 
-    @Test
-    public void testPartitionStatsNotDense() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testPartitionStatsNotDense(boolean thinMode) throws Exception {
         Identifier identifier = identifier("T");
         Options options = new Options();
         options.set(METADATA_STATS_MODE, "NONE");
         options.set(METADATA_STATS_DENSE_STORE, false);
         options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.DATA_FILE_THIN_MODE, thinMode);
         Schema schema =
                 Schema.newBuilder()
                         .column("pt", DataTypes.INT())
@@ -86,19 +89,25 @@ public void testPartitionStatsNotDense() throws Exception {
                 manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file();
         SimpleStats recordStats = file.valueStats();
         assertThat(recordStats.minValues().isNullAt(0)).isTrue();
-        assertThat(recordStats.minValues().isNullAt(1)).isTrue();
+        assertThat(recordStats.minValues().isNullAt(1)).isEqualTo(!thinMode);
         assertThat(recordStats.minValues().isNullAt(2)).isTrue();
         assertThat(recordStats.maxValues().isNullAt(0)).isTrue();
-        assertThat(recordStats.maxValues().isNullAt(1)).isTrue();
+        assertThat(recordStats.maxValues().isNullAt(1)).isEqualTo(!thinMode);
         assertThat(recordStats.maxValues().isNullAt(2)).isTrue();
+
+        SimpleStats keyStats = file.keyStats();
+        assertThat(keyStats.minValues().isNullAt(0)).isFalse();
+        assertThat(keyStats.maxValues().isNullAt(0)).isFalse();
     }
 
-    @Test
-    public void testPartitionStatsDenseMode() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testPartitionStatsDenseMode(boolean thinMode) throws Exception {
         Identifier identifier = identifier("T");
         Options options = new Options();
         options.set(METADATA_STATS_MODE, "NONE");
         options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.DATA_FILE_THIN_MODE, thinMode);
         Schema schema =
                 Schema.newBuilder()
                         .column("pt", DataTypes.INT())
@@ -135,9 +144,11 @@ public void testPartitionStatsDenseMode() throws Exception {
         DataFileMeta file =
                 manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file();
         SimpleStats recordStats = file.valueStats();
-        assertThat(file.valueStatsCols()).isEmpty();
-        assertThat(recordStats.minValues().getFieldCount()).isEqualTo(0);
-        assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(0);
-        assertThat(recordStats.nullCounts().size()).isEqualTo(0);
+        // Expected number of value-stats columns: one in thin mode, none otherwise.
+        int count = thinMode ? 1 : 0;
+        assertThat(file.valueStatsCols()).hasSize(count);
+        assertThat(recordStats.minValues().getFieldCount()).isEqualTo(count);
+        assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(count);
+        assertThat(recordStats.nullCounts().size()).isEqualTo(count);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 7f850a7725b4..7d7617cf8bd1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -160,6 +160,10 @@ protected void compact(
         }
     }
 
+    public void dropTableDefault() throws Exception {
+        catalog.dropTable(identifier(), true);
+    }
+
     protected List read(Table table, Pair, String>... dynamicOptions)
             throws Exception {
         return read(table, null, dynamicOptions);