From 7f0442e6200b8bcf4e935c0bf84b6e3e14dd15b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 9 Dec 2024 15:29:26 +0800 Subject: [PATCH 01/33] [core] Write thin mode --- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 10 ++ .../main/java/org/apache/paimon/KeyValue.java | 4 + .../apache/paimon/KeyValueThinSerializer.java | 58 ++++++ .../paimon/io/KeyValueDataFileWriter.java | 57 +++++- .../paimon/io/KeyValueFileWriterFactory.java | 17 +- .../paimon/mergetree/MergeTreeWriter.java | 6 +- .../operation/KeyValueFileStoreWrite.java | 2 + .../paimon/utils/StatsCollectorFactories.java | 10 +- .../paimon/format/ThinModeReadWriteTest.java | 170 ++++++++++++++++++ .../ChangelogMergeTreeRewriterTest.java | 5 +- .../paimon/mergetree/ContainsLevelsTest.java | 5 +- .../paimon/mergetree/LookupLevelsTest.java | 5 +- .../paimon/mergetree/MergeTreeTestBase.java | 3 + .../operation/MergeFileSplitReadTest.java | 2 +- .../apache/paimon/stats/StatsTableTest.java | 12 +- .../table/PrimaryKeyFileStoreTableTest.java | 4 +- .../apache/paimon/table/TableTestBase.java | 6 + .../source/TestChangelogDataReadWrite.java | 6 +- 19 files changed, 360 insertions(+), 28 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index b2bd3a976d66..ff47f4c36625 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -864,6 +864,12 @@ Integer Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease. + +
storage.thin-mode
+ true + Boolean + Enable storage thin mode to avoid duplicate columns store. +
streaming-read-mode
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 5db809cff1d1..965a704e29cf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1452,6 +1452,12 @@ public class CoreOptions implements Serializable { "For DELETE manifest entry in manifest file, drop stats to reduce memory and storage." + " Default value is false only for compatibility of old reader."); + public static final ConfigOption STORAGE_THIN_MODE = + key("storage.thin-mode") + .booleanType() + .defaultValue(true) + .withDescription("Enable storage thin mode to avoid duplicate columns store."); + @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption MATERIALIZED_TABLE_DEFINITION_QUERY = key("materialized-table.definition-query") @@ -2356,6 +2362,10 @@ public boolean statsDenseStore() { return options.get(METADATA_STATS_DENSE_STORE); } + public boolean thinMode() { + return options.get(STORAGE_THIN_MODE); + } + /** Specifies the merge engine for table with primary key. 
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java index 36ac88996ce3..75e0a0659303 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java @@ -114,6 +114,10 @@ public static RowType schema(RowType keyType, RowType valueType) { return new RowType(createKeyValueFields(keyType.getFields(), valueType.getFields())); } + public static RowType schema(RowType valueType) { + return schema(RowType.of(), valueType); + } + public static RowType schemaWithLevel(RowType keyType, RowType valueType) { List fields = new ArrayList<>(schema(keyType, valueType).getFields()); fields.add(LEVEL); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java new file mode 100644 index 000000000000..6dd41a42506a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serialize KeyValue to InternalRow with ignorance of key. Only used to write KeyValue to disk. */ +public class KeyValueThinSerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + private final GenericRow reusedMeta; + private final JoinedRow reusedKeyWithMeta; + + public KeyValueThinSerializer(RowType keyType, RowType valueType) { + super(KeyValue.schema(keyType, valueType)); + + this.reusedMeta = new GenericRow(2); + this.reusedKeyWithMeta = new JoinedRow(); + } + + public InternalRow toRow(KeyValue record) { + return toRow(record.sequenceNumber(), record.valueKind(), record.value()); + } + + public InternalRow toRow(long sequenceNumber, RowKind valueKind, InternalRow value) { + reusedMeta.setField(0, sequenceNumber); + reusedMeta.setField(1, valueKind.toByteValue()); + return reusedKeyWithMeta.replace(reusedMeta, value); + } + + @Override + public KeyValue fromRow(InternalRow row) { + throw new UnsupportedOperationException( + "KeyValue cannot be deserialized from InternalRow by this serializer."); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index ce0b3b02840b..55e1c56ea192 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -32,6 +32,7 @@ import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.RowType; import 
org.apache.paimon.utils.Pair; import org.apache.paimon.utils.StatsCollectorFactories; @@ -44,7 +45,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath; @@ -77,6 +80,8 @@ public class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; + @Nullable private final KeyStateAbstractor keyStateAbstractor; + private final boolean thinMode; public KeyValueDataFileWriter( FileIO fileIO, @@ -97,11 +102,14 @@ public KeyValueDataFileWriter( factory, path, converter, - KeyValue.schema(keyType, valueType), + KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType), simpleStatsExtractor, compression, StatsCollectorFactories.createStatsFactories( - options, KeyValue.schema(keyType, valueType).getFieldNames()), + options, + KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType) + .getFieldNames(), + keyType.getFieldNames()), options.asyncFileWrite()); this.keyType = keyType; @@ -116,6 +124,8 @@ public KeyValueDataFileWriter( this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); + this.thinMode = options.thinMode(); + this.keyStateAbstractor = thinMode ? new KeyStateAbstractor(keyType, valueType) : null; } @Override @@ -169,15 +179,17 @@ public DataFileMeta result() throws IOException { SimpleColStats[] rowStats = fieldStats(); int numKeyFields = keyType.getFieldCount(); - SimpleColStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields); - SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats); - - SimpleColStats[] valFieldStats = - Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length); - + int valueFrom = thinMode ? 
2 : numKeyFields + 2; + SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, valueFrom, rowStats.length); Pair, SimpleStats> valueStatsPair = valueStatsConverter.toBinary(valFieldStats); + SimpleColStats[] keyFieldStats = + thinMode + ? keyStateAbstractor.abstractFromValueState(valFieldStats) + : Arrays.copyOfRange(rowStats, 0, numKeyFields); + SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats); + DataFileIndexWriter.FileIndexResult indexResult = dataFileIndexWriter == null ? DataFileIndexWriter.EMPTY_RESULT @@ -211,4 +223,33 @@ public void close() throws IOException { } super.close(); } + + private static class KeyStateAbstractor { + + private final int[] keyStatMapping; + + public KeyStateAbstractor(RowType keyType, RowType valueType) { + + Map idToIndex = new HashMap<>(); + for (int i = 0; i < valueType.getFieldCount(); i++) { + idToIndex.put(valueType.getFields().get(i).id(), i); + } + + keyStatMapping = new int[keyType.getFieldCount()]; + + for (int i = 0; i < keyType.getFieldCount(); i++) { + keyStatMapping[i] = + idToIndex.get( + keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START); + } + } + + SimpleColStats[] abstractFromValueState(SimpleColStats[] valueStats) { + SimpleColStats[] keyStats = new SimpleColStats[keyStatMapping.length]; + for (int i = 0; i < keyStatMapping.length; i++) { + keyStats[i] = valueStats[keyStatMapping[i]]; + } + return keyStats; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index a6fddb43283a..2e25c4b1e539 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueSerializer; +import 
org.apache.paimon.KeyValueThinSerializer; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fileindex.FileIndexOptions; @@ -33,6 +34,7 @@ import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.ObjectSerializer; import org.apache.paimon.utils.StatsCollectorFactories; import javax.annotation.Nullable; @@ -107,7 +109,10 @@ public RollingFileWriter createRollingChangelogFileWrite private KeyValueDataFileWriter createDataFileWriter( Path path, int level, FileSource fileSource) { - KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); + ObjectSerializer kvSerializer = + options.thinMode() + ? new KeyValueThinSerializer(keyType, valueType) + : new KeyValueSerializer(keyType, valueType); return new KeyValueDataFileWriter( fileIO, formatContext.writerFactory(level), @@ -191,12 +196,14 @@ private Builder( public KeyValueFileWriterFactory build( BinaryRow partition, int bucket, CoreOptions options) { - RowType fileRowType = KeyValue.schema(keyType, valueType); + RowType finalKeyType = options.thinMode() ? 
RowType.of() : keyType; + RowType writeKeyType = KeyValue.schema(finalKeyType, valueType); WriteFormatContext context = new WriteFormatContext( partition, bucket, - fileRowType, + keyType, + writeKeyType, fileFormat, format2PathFactory, options); @@ -217,6 +224,7 @@ private static class WriteFormatContext { private WriteFormatContext( BinaryRow partition, int bucket, + RowType keyType, RowType rowType, FileFormat defaultFormat, Map parentFactories, @@ -236,7 +244,8 @@ private WriteFormatContext( this.format2PathFactory = new HashMap<>(); this.format2WriterFactory = new HashMap<>(); SimpleColStatsCollector.Factory[] statsCollectorFactories = - StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames()); + StatsCollectorFactories.createStatsFactories( + options, rowType.getFieldNames(), keyType.getFieldNames()); for (String format : parentFactories.keySet()) { format2PathFactory.put( format, diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index f2a964bae16a..34aab1b767ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -96,6 +96,8 @@ public MergeTreeWriter( long maxSequenceNumber, Comparator keyComparator, MergeFunction mergeFunction, + RowType keyType, + RowType valueType, KeyValueFileWriterFactory writerFactory, boolean commitForceCompact, ChangelogProducer changelogProducer, @@ -106,8 +108,8 @@ public MergeTreeWriter( this.sortMaxFan = sortMaxFan; this.sortCompression = sortCompression; this.ioManager = ioManager; - this.keyType = writerFactory.keyType(); - this.valueType = writerFactory.valueType(); + this.keyType = keyType; + this.valueType = valueType; this.compactManager = compactManager; this.newSequenceNumber = maxSequenceNumber + 1; this.keyComparator = keyComparator; diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index d061e181618b..05bf5797e050 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -220,6 +220,8 @@ protected MergeTreeWriter createWriter( restoredMaxSeqNumber, keyComparator, mfFactory.create(), + keyType, + valueType, writerFactory, options.commitForceCompact(), options.changelogProducer(), diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java index de94b2e23eff..f23976bedb58 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java @@ -24,6 +24,7 @@ import org.apache.paimon.statistics.TruncateSimpleColStatsCollector; import org.apache.paimon.table.SpecialFields; +import java.util.Collections; import java.util.List; import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; @@ -35,6 +36,11 @@ public class StatsCollectorFactories { public static SimpleColStatsCollector.Factory[] createStatsFactories( CoreOptions options, List fields) { + return createStatsFactories(options, fields, Collections.emptyList()); + } + + public static SimpleColStatsCollector.Factory[] createStatsFactories( + CoreOptions options, List fields, List keyNames) { Options cfg = options.toConfiguration(); SimpleColStatsCollector.Factory[] modes = new SimpleColStatsCollector.Factory[fields.size()]; @@ -47,7 +53,9 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories( .noDefaultValue()); if (fieldMode != null) { modes[i] = SimpleColStatsCollector.from(fieldMode); - } else if (SpecialFields.isSystemField(field)) { + } else if 
(SpecialFields.isSystemField(field) + || (options.thinMode() + && keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field))) { modes[i] = () -> new TruncateSimpleColStatsCollector(128); } else { modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE)); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java new file mode 100644 index 000000000000..2c66dfe79be3 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.format; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.types.DataTypes; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; + +/** This class test the compatibility and effectiveness of storage thin mode. */ +public class ThinModeReadWriteTest extends TableTestBase { + + private Table createTable(String format, Boolean thinMode) throws Exception { + catalog.createTable(identifier(), schema(format, thinMode), true); + return catalog.getTable(identifier()); + } + + private Schema schema(String format, Boolean thinMode) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.INT()); + schemaBuilder.column("f2", DataTypes.SMALLINT()); + schemaBuilder.column("f3", DataTypes.STRING()); + schemaBuilder.column("f4", DataTypes.DOUBLE()); + schemaBuilder.column("f5", DataTypes.CHAR(100)); + schemaBuilder.column("f6", DataTypes.VARCHAR(100)); + schemaBuilder.column("f7", DataTypes.BOOLEAN()); + schemaBuilder.column("f8", DataTypes.INT()); + schemaBuilder.column("f9", DataTypes.TIME()); + schemaBuilder.column("f10", DataTypes.TIMESTAMP()); + schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2)); + schemaBuilder.column("f12", DataTypes.BYTES()); + schemaBuilder.column("f13", DataTypes.FLOAT()); + schemaBuilder.column("f14", DataTypes.BINARY(100)); + schemaBuilder.column("f15", DataTypes.VARBINARY(100)); + schemaBuilder.primaryKey( + "f0", "f1", "f2", "f3", "f4", "f5", 
"f6", "f7", "f8", "f9", "f10", "f11", "f12", + "f13"); + schemaBuilder.option("bucket", "1"); + schemaBuilder.option("bucket-key", "f1"); + schemaBuilder.option("file.format", format); + schemaBuilder.option("storage.thin-mode", thinMode.toString()); + return schemaBuilder.build(); + } + + @Test + public void testThinModeWorks() throws Exception { + + InternalRow[] datas = datas(200000); + + Table table = createTable("orc", true); + write(table, datas); + + long size1 = tableSize(table); + dropTableDefault(); + + table = createTable("orc", false); + write(table, datas); + long size2 = tableSize(table); + dropTableDefault(); + + Assertions.assertThat(size2).isGreaterThan(size1); + } + + @Test + public void testAllFormatReadWrite() throws Exception { + testFormat("orc"); + testFormat("parquet"); + testFormat("avro"); + } + + private void testFormat(String format) throws Exception { + testReadWrite(format, true, true); + testReadWrite(format, true, false); + testReadWrite(format, false, true); + testReadWrite(format, false, false); + } + + private void testReadWrite(String format, boolean writeThin, boolean readThin) + throws Exception { + Table tableWrite = createTable(format, writeThin); + Table tableRead = setThinMode(tableWrite, readThin); + + InternalRow[] datas = datas(2000); + + write(tableWrite, datas); + + List readed = read(tableRead); + + Assertions.assertThat(readed).containsExactlyInAnyOrder(datas); + dropTableDefault(); + } + + private Table setThinMode(Table table, Boolean flag) { + return table.copy( + new HashMap() { + { + put("storage.thin-mode", flag.toString()); + } + }); + } + + InternalRow[] datas(int i) { + InternalRow[] arrays = new InternalRow[i]; + for (int j = 0; j < i; j++) { + arrays[j] = data(); + } + return arrays; + } + + protected InternalRow data() { + return GenericRow.of( + RANDOM.nextInt(), + RANDOM.nextInt(), + (short) RANDOM.nextInt(), + randomString(), + RANDOM.nextDouble(), + randomString(), + randomString(), + 
RANDOM.nextBoolean(), + RANDOM.nextInt(), + RANDOM.nextInt(), + Timestamp.now(), + Decimal.zero(10, 2), + randomBytes(), + (float) RANDOM.nextDouble(), + randomBytes(), + randomBytes()); + } + + public static long tableSize(Table table) throws Exception { + long count = 0; + List files = + ((FileStoreTable) table).store().newScan().plan().files(FileKind.ADD); + for (ManifestEntry file : files) { + count += file.file().fileSize(); + } + + return count; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java index bbfa4d3659b6..e2967a2ff67d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java @@ -192,6 +192,9 @@ public void testRewriteSuccess(boolean rewriteChangelog) throws Exception { private KeyValueFileWriterFactory createWriterFactory( Path path, RowType keyType, RowType valueType) { + Options options = new Options(); + options.set(CoreOptions.STORAGE_THIN_MODE, false); + return KeyValueFileWriterFactory.builder( LocalFileIO.create(), 0, @@ -200,7 +203,7 @@ private KeyValueFileWriterFactory createWriterFactory( new FlushingFileFormat("avro"), Collections.singletonMap("avro", createNonPartFactory(path)), VALUE_128_MB.getBytes()) - .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options())); + .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(options)); } private KeyValueFileReaderFactory createReaderFactory( diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 0ab636c33aa3..066c84275680 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -41,6 +41,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.SchemaEvolutionTableTestBase; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; @@ -77,7 +78,9 @@ public class ContainsLevelsTest { private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); - private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key", DataTypes.INT())); + private final RowType keyType = + DataTypes.ROW( + DataTypes.FIELD(SpecialFields.KEY_FIELD_ID_START, "_key", DataTypes.INT())); private final RowType rowType = DataTypes.ROW( DataTypes.FIELD(0, "key", DataTypes.INT()), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 2dce81ce56b4..eba88272c504 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -41,6 +41,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.SchemaEvolutionTableTestBase; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; @@ -79,7 +80,9 @@ public class LookupLevelsTest { private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); - private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key", DataTypes.INT())); + private final RowType keyType = + DataTypes.ROW( + DataTypes.FIELD(SpecialFields.KEY_FIELD_ID_START, "_key", DataTypes.INT())); private final RowType rowType = DataTypes.ROW( 
DataTypes.FIELD(0, "key", DataTypes.INT()), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index f2a9c44dd7ce..af6c442d861d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -143,6 +143,7 @@ private void recreateMergeTree(long targetFileSize) { options.set( CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER, options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1); + options.set(CoreOptions.STORAGE_THIN_MODE, false); this.options = new CoreOptions(options); RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType()))); RowType valueType = new RowType(singletonList(new DataField(1, "v", new IntType()))); @@ -513,6 +514,8 @@ private MergeTreeWriter createMergeTreeWriter( maxSequenceNumber, comparator, DeduplicateMergeFunction.factory().create(), + writerFactory.keyType(), + writerFactory.valueType(), writerFactory, options.commitForceCompact(), changelogProducer, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index 46b64422fd9b..0e5769255680 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -287,7 +287,7 @@ private TestFileStore createStore( .map(field -> field.replace("key_", "")), partitionType.getFieldNames().stream()) .collect(Collectors.toList()), - Collections.emptyMap(), + Collections.singletonMap("storage.thin-mode", "false"), null); TableSchema tableSchema = schemaManager.createTable(schema); return new TestFileStore.Builder( diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java index 25282d898a3d..b9fc4da160ac 100644 --- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java @@ -86,10 +86,10 @@ public void testPartitionStatsNotDense() throws Exception { manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file(); SimpleStats recordStats = file.valueStats(); assertThat(recordStats.minValues().isNullAt(0)).isTrue(); - assertThat(recordStats.minValues().isNullAt(1)).isTrue(); + assertThat(recordStats.minValues().isNullAt(1)).isFalse(); assertThat(recordStats.minValues().isNullAt(2)).isTrue(); assertThat(recordStats.maxValues().isNullAt(0)).isTrue(); - assertThat(recordStats.maxValues().isNullAt(1)).isTrue(); + assertThat(recordStats.maxValues().isNullAt(1)).isFalse(); assertThat(recordStats.maxValues().isNullAt(2)).isTrue(); } @@ -135,9 +135,9 @@ public void testPartitionStatsDenseMode() throws Exception { DataFileMeta file = manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file(); SimpleStats recordStats = file.valueStats(); - assertThat(file.valueStatsCols()).isEmpty(); - assertThat(recordStats.minValues().getFieldCount()).isEqualTo(0); - assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(0); - assertThat(recordStats.nullCounts().size()).isEqualTo(0); + assertThat(file.valueStatsCols().size()).isEqualTo(1); + assertThat(recordStats.minValues().getFieldCount()).isEqualTo(1); + assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(1); + assertThat(recordStats.nullCounts().size()).isEqualTo(1); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index fa635e2ab666..e44b649534d1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -186,7 +186,8 @@ public void testAsyncReader() throws Exception { @Test public void testBatchWriteBuilder() throws Exception { - FileStoreTable table = createFileStoreTable(); + FileStoreTable table = + createFileStoreTable().copy(Collections.singletonMap("storage.thin-mode", "true")); BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); BatchTableWrite write = writeBuilder.newWrite(); BatchTableCommit commit = writeBuilder.newCommit(); @@ -352,6 +353,7 @@ public void testBatchFilter(boolean statsDenseStore) throws Exception { if (statsDenseStore) { // pk table doesn't need value stats options.set(CoreOptions.METADATA_STATS_MODE, "none"); + // options.set(CoreOptions.STORAGE_THIN_MODE, false); } }; writeData(optionsSetter); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index 7f850a7725b4..f8ab2cffade9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -160,6 +160,10 @@ protected void compact( } } + public void dropTableDefault() throws Exception { + catalog.dropTable(identifier(), true); + } + protected List read(Table table, Pair, String>... 
dynamicOptions) throws Exception { return read(table, null, dynamicOptions); @@ -175,6 +179,8 @@ protected List read( options.put(pair.getKey().key(), pair.getValue()); } table = table.copy(options); + // PredicateBuilder predicateBuilder = new PredicateBuilder(); + // predicateBuilder.equal() ReadBuilder readBuilder = table.newReadBuilder(); if (projection != null) { readBuilder.withProjection(projection); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index d2bb9eb98274..9dfaba60c78e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -167,8 +167,10 @@ public List writeFiles( } public RecordWriter createMergeTreeWriter(BinaryRow partition, int bucket) { - CoreOptions options = - new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro")); + Map optionMap = new HashMap<>(); + optionMap.put(CoreOptions.FILE_FORMAT.key(), "avro"); + optionMap.put(CoreOptions.STORAGE_THIN_MODE.key(), "false"); + CoreOptions options = new CoreOptions(optionMap); Map pathFactoryMap = new HashMap<>(); pathFactoryMap.put("avro", pathFactory); From 7461d4336c101381c97d6b6b13c675b27c1bafc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:31:02 +0800 Subject: [PATCH 02/33] fix comment --- .../paimon/format/ThinModeReadWriteTest.java | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java index 2c66dfe79be3..56f6688953e4 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java @@ -33,7 +33,6 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.HashMap; import java.util.List; /** This class test the compatibility and effectiveness of storage thin mode. */ @@ -99,36 +98,25 @@ public void testAllFormatReadWrite() throws Exception { } private void testFormat(String format) throws Exception { - testReadWrite(format, true, true); - testReadWrite(format, true, false); - testReadWrite(format, false, true); - testReadWrite(format, false, false); + testReadWrite(format, true); + testReadWrite(format, true); + testReadWrite(format, false); + testReadWrite(format, false); } - private void testReadWrite(String format, boolean writeThin, boolean readThin) - throws Exception { - Table tableWrite = createTable(format, writeThin); - Table tableRead = setThinMode(tableWrite, readThin); + private void testReadWrite(String format, boolean writeThin) throws Exception { + Table table = createTable(format, writeThin); InternalRow[] datas = datas(2000); - write(tableWrite, datas); + write(table, datas); - List readed = read(tableRead); + List readed = read(table); Assertions.assertThat(readed).containsExactlyInAnyOrder(datas); dropTableDefault(); } - private Table setThinMode(Table table, Boolean flag) { - return table.copy( - new HashMap() { - { - put("storage.thin-mode", flag.toString()); - } - }); - } - InternalRow[] datas(int i) { InternalRow[] arrays = new InternalRow[i]; for (int j = 0; j < i; j++) { @@ -157,7 +145,7 @@ protected InternalRow data() { randomBytes()); } - public static long tableSize(Table table) throws Exception { + public static long tableSize(Table table) { long count = 0; List files = ((FileStoreTable) table).store().newScan().plan().files(FileKind.ADD); From b1aa915e05f41e75783157475fd939cef478c3fe Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:31:43 +0800 Subject: [PATCH 03/33] fix comment --- paimon-common/src/main/java/org/apache/paimon/CoreOptions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 965a704e29cf..37d208bc7c92 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1456,7 +1456,8 @@ public class CoreOptions implements Serializable { key("storage.thin-mode") .booleanType() .defaultValue(true) - .withDescription("Enable storage thin mode to avoid duplicate columns store."); + .withDescription( + "Enable storage thin mode to avoid duplicate columns storage."); @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption MATERIALIZED_TABLE_DEFINITION_QUERY = From 5ddc7aca903f2c751bcb42ea17c470ce196bee53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:33:59 +0800 Subject: [PATCH 04/33] fix comment --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index ff47f4c36625..db0119f3b082 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -868,7 +868,7 @@
storage.thin-mode
true Boolean - Enable storage thin mode to avoid duplicate columns store. + Enable storage thin mode to avoid duplicate columns storage.
streaming-read-mode
From 6272678fd498d2a3c8df3e5c2051ac135b7413c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:50:23 +0800 Subject: [PATCH 05/33] fix comment --- .../paimon/io/KeyValueDataFileWriter.java | 74 ++++++++++--------- 1 file changed, 41 insertions(+), 33 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 55e1c56ea192..d601f1824238 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -43,7 +43,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -80,8 +79,7 @@ public class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; - @Nullable private final KeyStateAbstractor keyStateAbstractor; - private final boolean thinMode; + private final KeyStateAbstractor keyStateAbstractor; public KeyValueDataFileWriter( FileIO fileIO, @@ -124,8 +122,7 @@ public KeyValueDataFileWriter( this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); - this.thinMode = options.thinMode(); - this.keyStateAbstractor = thinMode ? new KeyStateAbstractor(keyType, valueType) : null; + this.keyStateAbstractor = new KeyStateAbstractor(keyType, valueType, options.thinMode()); } @Override @@ -176,19 +173,12 @@ public DataFileMeta result() throws IOException { return null; } - SimpleColStats[] rowStats = fieldStats(); - int numKeyFields = keyType.getFieldCount(); + Pair keyValueStats = + keyStateAbstractor.abstractFromValueState(fieldStats()); - int valueFrom = thinMode ? 
2 : numKeyFields + 2; - SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, valueFrom, rowStats.length); + SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey()); Pair, SimpleStats> valueStatsPair = - valueStatsConverter.toBinary(valFieldStats); - - SimpleColStats[] keyFieldStats = - thinMode - ? keyStateAbstractor.abstractFromValueState(valFieldStats) - : Arrays.copyOfRange(rowStats, 0, numKeyFields); - SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats); + valueStatsConverter.toBinary(keyValueStats.getValue()); DataFileIndexWriter.FileIndexResult indexResult = dataFileIndexWriter == null @@ -225,31 +215,49 @@ public void close() throws IOException { } private static class KeyStateAbstractor { - - private final int[] keyStatMapping; - - public KeyStateAbstractor(RowType keyType, RowType valueType) { - + private final int numKeyFields; + private final int numValueFields; + // if keyStatMapping is not null, means thin mode on. + @Nullable private final int[] keyStatMapping; + + public KeyStateAbstractor(RowType keyType, RowType valueType, boolean thinMode) { + this.numKeyFields = keyType.getFieldCount(); + this.numValueFields = valueType.getFieldCount(); Map idToIndex = new HashMap<>(); for (int i = 0; i < valueType.getFieldCount(); i++) { idToIndex.put(valueType.getFields().get(i).id(), i); } - - keyStatMapping = new int[keyType.getFieldCount()]; - - for (int i = 0; i < keyType.getFieldCount(); i++) { - keyStatMapping[i] = - idToIndex.get( - keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START); + if (thinMode) { + this.keyStatMapping = new int[keyType.getFieldCount()]; + for (int i = 0; i < keyType.getFieldCount(); i++) { + keyStatMapping[i] = + idToIndex.get( + keyType.getFields().get(i).id() + - SpecialFields.KEY_FIELD_ID_START); + } + } else { + this.keyStatMapping = null; } } - SimpleColStats[] abstractFromValueState(SimpleColStats[] valueStats) { - SimpleColStats[] keyStats = new 
SimpleColStats[keyStatMapping.length]; - for (int i = 0; i < keyStatMapping.length; i++) { - keyStats[i] = valueStats[keyStatMapping[i]]; + Pair abstractFromValueState(SimpleColStats[] rowStats) { + SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; + SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields]; + + int valueFrom = thinMode() ? 2 : numKeyFields + 2; + System.arraycopy(rowStats, valueFrom, valFieldStats, 0, numValueFields); + if (thinMode()) { + for (int i = 0; i < keyStatMapping.length; i++) { + keyStats[i] = valFieldStats[keyStatMapping[i]]; + } + } else { + System.arraycopy(valFieldStats, 0, keyStats, 0, numKeyFields); } - return keyStats; + return Pair.of(keyStats, valFieldStats); + } + + private boolean thinMode() { + return keyStatMapping != null; } } } From 068824b14c8a92a279d50a3d4a2ce1c3ede0acf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:51:49 +0800 Subject: [PATCH 06/33] fix comment --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- paimon-common/src/main/java/org/apache/paimon/CoreOptions.java | 2 +- .../java/org/apache/paimon/format/ThinModeReadWriteTest.java | 2 +- .../org/apache/paimon/operation/MergeFileSplitReadTest.java | 2 +- .../org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java | 3 ++- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index db0119f3b082..422d0c4b8cb3 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -865,7 +865,7 @@ Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease. -
storage.thin-mode
+
data-file.thin-mode
true Boolean Enable storage thin mode to avoid duplicate columns storage. diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 37d208bc7c92..5f4746545040 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1453,7 +1453,7 @@ public class CoreOptions implements Serializable { + " Default value is false only for compatibility of old reader."); public static final ConfigOption STORAGE_THIN_MODE = - key("storage.thin-mode") + key("data-file.thin-mode") .booleanType() .defaultValue(true) .withDescription( diff --git a/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java index 56f6688953e4..3f8015b33b2d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java @@ -67,7 +67,7 @@ private Schema schema(String format, Boolean thinMode) { schemaBuilder.option("bucket", "1"); schemaBuilder.option("bucket-key", "f1"); schemaBuilder.option("file.format", format); - schemaBuilder.option("storage.thin-mode", thinMode.toString()); + schemaBuilder.option("data-file.thin-mode", thinMode.toString()); return schemaBuilder.build(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index 0e5769255680..a5b7fdd9f901 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -287,7 +287,7 @@ private TestFileStore createStore( .map(field -> field.replace("key_", "")), partitionType.getFieldNames().stream()) 
.collect(Collectors.toList()), - Collections.singletonMap("storage.thin-mode", "false"), + Collections.singletonMap("data-file.thin-mode", "false"), null); TableSchema tableSchema = schemaManager.createTable(schema); return new TestFileStore.Builder( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index e44b649534d1..a3dce02e15fb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -187,7 +187,8 @@ public void testAsyncReader() throws Exception { @Test public void testBatchWriteBuilder() throws Exception { FileStoreTable table = - createFileStoreTable().copy(Collections.singletonMap("storage.thin-mode", "true")); + createFileStoreTable() + .copy(Collections.singletonMap("data-file.thin-mode", "true")); BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); BatchTableWrite write = writeBuilder.newWrite(); BatchTableCommit commit = writeBuilder.newCommit(); From 29f9056904159778d1934d23dbe26afff1732870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:53:40 +0800 Subject: [PATCH 07/33] fix comment --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- .../src/main/java/org/apache/paimon/CoreOptions.java | 6 +++--- .../paimon/mergetree/ChangelogMergeTreeRewriterTest.java | 2 +- .../java/org/apache/paimon/mergetree/MergeTreeTestBase.java | 2 +- .../paimon/flink/source/TestChangelogDataReadWrite.java | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 422d0c4b8cb3..9200c48fb00e 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ 
b/docs/layouts/shortcodes/generated/core_configuration.html @@ -868,7 +868,7 @@
data-file.thin-mode
true Boolean - Enable storage thin mode to avoid duplicate columns storage. + Enable data file thin mode to avoid duplicate columns storage.
streaming-read-mode
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 5f4746545040..8d5602602c5a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1452,12 +1452,12 @@ public class CoreOptions implements Serializable { "For DELETE manifest entry in manifest file, drop stats to reduce memory and storage." + " Default value is false only for compatibility of old reader."); - public static final ConfigOption STORAGE_THIN_MODE = + public static final ConfigOption DATA_FILE_THIN_MODE = key("data-file.thin-mode") .booleanType() .defaultValue(true) .withDescription( - "Enable storage thin mode to avoid duplicate columns storage."); + "Enable data file thin mode to avoid duplicate columns storage."); @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption MATERIALIZED_TABLE_DEFINITION_QUERY = @@ -2364,7 +2364,7 @@ public boolean statsDenseStore() { } public boolean thinMode() { - return options.get(STORAGE_THIN_MODE); + return options.get(DATA_FILE_THIN_MODE); } /** Specifies the merge engine for table with primary key. 
*/ diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java index e2967a2ff67d..dae6d08d7cdd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java @@ -193,7 +193,7 @@ public void testRewriteSuccess(boolean rewriteChangelog) throws Exception { private KeyValueFileWriterFactory createWriterFactory( Path path, RowType keyType, RowType valueType) { Options options = new Options(); - options.set(CoreOptions.STORAGE_THIN_MODE, false); + options.set(CoreOptions.DATA_FILE_THIN_MODE, false); return KeyValueFileWriterFactory.builder( LocalFileIO.create(), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index af6c442d861d..727f684801ff 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -143,7 +143,7 @@ private void recreateMergeTree(long targetFileSize) { options.set( CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER, options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1); - options.set(CoreOptions.STORAGE_THIN_MODE, false); + options.set(CoreOptions.DATA_FILE_THIN_MODE, false); this.options = new CoreOptions(options); RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType()))); RowType valueType = new RowType(singletonList(new DataField(1, "v", new IntType()))); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 9dfaba60c78e..70c0a0ea9183 100644 
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -169,7 +169,7 @@ public List writeFiles( public RecordWriter createMergeTreeWriter(BinaryRow partition, int bucket) { Map optionMap = new HashMap<>(); optionMap.put(CoreOptions.FILE_FORMAT.key(), "avro"); - optionMap.put(CoreOptions.STORAGE_THIN_MODE.key(), "false"); + optionMap.put(CoreOptions.DATA_FILE_THIN_MODE.key(), "false"); CoreOptions options = new CoreOptions(optionMap); Map pathFactoryMap = new HashMap<>(); From 2cf0a0d1793ab71ae8afc033762aded5da7b3e5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:54:26 +0800 Subject: [PATCH 08/33] fix comment --- .../src/test/java/org/apache/paimon/table/TableTestBase.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index f8ab2cffade9..7d7617cf8bd1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -179,8 +179,6 @@ protected List read( options.put(pair.getKey().key(), pair.getValue()); } table = table.copy(options); - // PredicateBuilder predicateBuilder = new PredicateBuilder(); - // predicateBuilder.equal() ReadBuilder readBuilder = table.newReadBuilder(); if (projection != null) { readBuilder.withProjection(projection); From ddca0f9487bf56276e7b5b756741482c98d82d0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:55:30 +0800 Subject: [PATCH 09/33] fix comment --- .../org/apache/paimon/io/KeyValueDataFileWriter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index d601f1824238..55d7d9635615 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -79,7 +79,7 @@ public class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; - private final KeyStateAbstractor keyStateAbstractor; + private final StateAbstractor stateAbstractor; public KeyValueDataFileWriter( FileIO fileIO, @@ -122,7 +122,7 @@ public KeyValueDataFileWriter( this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); - this.keyStateAbstractor = new KeyStateAbstractor(keyType, valueType, options.thinMode()); + this.stateAbstractor = new StateAbstractor(keyType, valueType, options.thinMode()); } @Override @@ -174,7 +174,7 @@ public DataFileMeta result() throws IOException { } Pair keyValueStats = - keyStateAbstractor.abstractFromValueState(fieldStats()); + stateAbstractor.abstractFromValueState(fieldStats()); SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey()); Pair, SimpleStats> valueStatsPair = @@ -214,13 +214,13 @@ public void close() throws IOException { super.close(); } - private static class KeyStateAbstractor { + private static class StateAbstractor { private final int numKeyFields; private final int numValueFields; // if keyStatMapping is not null, means thin mode on. 
@Nullable private final int[] keyStatMapping; - public KeyStateAbstractor(RowType keyType, RowType valueType, boolean thinMode) { + public StateAbstractor(RowType keyType, RowType valueType, boolean thinMode) { this.numKeyFields = keyType.getFieldCount(); this.numValueFields = valueType.getFieldCount(); Map idToIndex = new HashMap<>(); From 1adbe5c9a6c57401d86fc6cce8b9e7c6402d51d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 14:59:24 +0800 Subject: [PATCH 10/33] fix comment --- .../org/apache/paimon/io/KeyValueDataFileWriter.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 55d7d9635615..60b4885ab2a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -174,7 +174,7 @@ public DataFileMeta result() throws IOException { } Pair keyValueStats = - stateAbstractor.abstractFromValueState(fieldStats()); + stateAbstractor.fetchKeyValueStats(fieldStats()); SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey()); Pair, SimpleStats> valueStatsPair = @@ -240,17 +240,22 @@ public StateAbstractor(RowType keyType, RowType valueType, boolean thinMode) { } } - Pair abstractFromValueState(SimpleColStats[] rowStats) { + Pair fetchKeyValueStats(SimpleColStats[] rowStats) { SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields]; + // If thin mode only, there is no key stats in rowStats, so we only jump + // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value is 2. + // Otherwise, we need to jump key stats, so the 'from' value is numKeyFields + 2. int valueFrom = thinMode() ? 
2 : numKeyFields + 2; System.arraycopy(rowStats, valueFrom, valFieldStats, 0, numValueFields); if (thinMode()) { + // Thin mode on, so need to map value stats to key stats. for (int i = 0; i < keyStatMapping.length; i++) { keyStats[i] = valFieldStats[keyStatMapping[i]]; } } else { + // Thin mode off, just copy stats from rowStats. System.arraycopy(valFieldStats, 0, keyStats, 0, numKeyFields); } return Pair.of(keyStats, valFieldStats); From d10a3b221bbc82eddb551189aaeab71d2a8cc673 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 15:00:33 +0800 Subject: [PATCH 11/33] fix comment --- .../java/org/apache/paimon/io/KeyValueFileWriterFactory.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 2e25c4b1e539..87f53ab5b013 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -196,14 +196,12 @@ private Builder( public KeyValueFileWriterFactory build( BinaryRow partition, int bucket, CoreOptions options) { - RowType finalKeyType = options.thinMode() ? RowType.of() : keyType; - RowType writeKeyType = KeyValue.schema(finalKeyType, valueType); WriteFormatContext context = new WriteFormatContext( partition, bucket, keyType, - writeKeyType, + KeyValue.schema(options.thinMode() ? 
RowType.of() : keyType, valueType), fileFormat, format2PathFactory, options); From df6c35302691b79caf1bb499f9f762f217355572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 15:23:14 +0800 Subject: [PATCH 12/33] fix comment --- paimon-core/src/main/java/org/apache/paimon/KeyValue.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java index 75e0a0659303..36ac88996ce3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java @@ -114,10 +114,6 @@ public static RowType schema(RowType keyType, RowType valueType) { return new RowType(createKeyValueFields(keyType.getFields(), valueType.getFields())); } - public static RowType schema(RowType valueType) { - return schema(RowType.of(), valueType); - } - public static RowType schemaWithLevel(RowType keyType, RowType valueType) { List fields = new ArrayList<>(schema(keyType, valueType).getFields()); fields.add(LEVEL); From e01905353d440411592eacfa1da2b964c51a93ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 17:11:51 +0800 Subject: [PATCH 13/33] fix comment --- .../paimon/io/KeyValueDataFileWriter.java | 68 +-------- .../paimon/io/KeyValueDataFileWriterImpl.java | 80 +++++++++++ .../paimon/io/KeyValueFileWriterFactory.java | 48 ++++--- .../io/KeyValueThinDataFileWriterImpl.java | 130 ++++++++++++++++++ 4 files changed, 245 insertions(+), 81 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 60b4885ab2a4..2e981ac507fb 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -32,7 +32,6 @@ import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.StatsCollectorFactories; @@ -44,9 +43,7 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.function.Function; import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath; @@ -58,13 +55,13 @@ *

NOTE: records given to the writer must be sorted because it does not compare the min max keys * to produce {@link DataFileMeta}. */ -public class KeyValueDataFileWriter +public abstract class KeyValueDataFileWriter extends StatsCollectingSingleFileWriter { private static final Logger LOG = LoggerFactory.getLogger(KeyValueDataFileWriter.class); - private final RowType keyType; - private final RowType valueType; + protected final RowType keyType; + protected final RowType valueType; private final long schemaId; private final int level; @@ -79,7 +76,6 @@ public class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; - private final StateAbstractor stateAbstractor; public KeyValueDataFileWriter( FileIO fileIO, @@ -122,7 +118,6 @@ public KeyValueDataFileWriter( this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); - this.stateAbstractor = new StateAbstractor(keyType, valueType, options.thinMode()); } @Override @@ -173,8 +168,7 @@ public DataFileMeta result() throws IOException { return null; } - Pair keyValueStats = - stateAbstractor.fetchKeyValueStats(fieldStats()); + Pair keyValueStats = fetchKeyValueStats(fieldStats()); SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey()); Pair, SimpleStats> valueStatsPair = @@ -206,6 +200,8 @@ public DataFileMeta result() throws IOException { valueStatsPair.getKey()); } + abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); + @Override public void close() throws IOException { if (dataFileIndexWriter != null) { @@ -213,56 +209,4 @@ public void close() throws IOException { } super.close(); } - - private static class StateAbstractor { - private final int numKeyFields; - private final int numValueFields; - // if keyStatMapping is not null, means thin mode on. 
- @Nullable private final int[] keyStatMapping; - - public StateAbstractor(RowType keyType, RowType valueType, boolean thinMode) { - this.numKeyFields = keyType.getFieldCount(); - this.numValueFields = valueType.getFieldCount(); - Map idToIndex = new HashMap<>(); - for (int i = 0; i < valueType.getFieldCount(); i++) { - idToIndex.put(valueType.getFields().get(i).id(), i); - } - if (thinMode) { - this.keyStatMapping = new int[keyType.getFieldCount()]; - for (int i = 0; i < keyType.getFieldCount(); i++) { - keyStatMapping[i] = - idToIndex.get( - keyType.getFields().get(i).id() - - SpecialFields.KEY_FIELD_ID_START); - } - } else { - this.keyStatMapping = null; - } - } - - Pair fetchKeyValueStats(SimpleColStats[] rowStats) { - SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; - SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields]; - - // If thin mode only, there is no key stats in rowStats, so we only jump - // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value is 2. - // Otherwise, we need to jump key stats, so the 'from' value is numKeyFields + 2. - int valueFrom = thinMode() ? 2 : numKeyFields + 2; - System.arraycopy(rowStats, valueFrom, valFieldStats, 0, numValueFields); - if (thinMode()) { - // Thin mode on, so need to map value stats to key stats. - for (int i = 0; i < keyStatMapping.length; i++) { - keyStats[i] = valFieldStats[keyStatMapping[i]]; - } - } else { - // Thin mode off, just copy stats from rowStats. 
- System.arraycopy(valFieldStats, 0, keyStats, 0, numKeyFields); - } - return Pair.of(keyStats, valFieldStats); - } - - private boolean thinMode() { - return keyStatMapping != null; - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java new file mode 100644 index 000000000000..fd69377661f8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.io; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.function.Function; + +/** Write data files containing {@link KeyValue}s. */ +public class KeyValueDataFileWriterImpl extends KeyValueDataFileWriter { + + public KeyValueDataFileWriterImpl( + FileIO fileIO, + FormatWriterFactory factory, + Path path, + Function converter, + RowType keyType, + RowType valueType, + @Nullable SimpleStatsExtractor simpleStatsExtractor, + long schemaId, + int level, + String compression, + CoreOptions options, + FileSource fileSource, + FileIndexOptions fileIndexOptions) { + super( + fileIO, + factory, + path, + converter, + keyType, + valueType, + simpleStatsExtractor, + schemaId, + level, + compression, + options, + fileSource, + fileIndexOptions); + } + + @Override + Pair fetchKeyValueStats(SimpleColStats[] rowStats) { + int numKeyFields = keyType.getFieldCount(); + // Value stats follow the key stats, sequence number and row kind, up to the array end. + return Pair.of( + Arrays.copyOfRange(rowStats, 0, numKeyFields), + Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 87f53ab5b013..c1996936c8a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -34,7 +34,6 @@ import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.ObjectSerializer; import org.apache.paimon.utils.StatsCollectorFactories; import javax.annotation.Nullable; @@ -109,24 +108,35 @@ public RollingFileWriter createRollingChangelogFileWrite private KeyValueDataFileWriter createDataFileWriter( Path path, int level, FileSource fileSource) { - ObjectSerializer kvSerializer = - options.thinMode() - ? new KeyValueThinSerializer(keyType, valueType) - : new KeyValueSerializer(keyType, valueType); - return new KeyValueDataFileWriter( - fileIO, - formatContext.writerFactory(level), - path, - kvSerializer::toRow, - keyType, - valueType, - formatContext.extractor(level), - schemaId, - level, - formatContext.compression(level), - options, - fileSource, - fileIndexOptions); + return options.thinMode() + ? 
new KeyValueThinDataFileWriterImpl( + fileIO, + formatContext.writerFactory(level), + path, + new KeyValueThinSerializer(keyType, valueType)::toRow, + keyType, + valueType, + formatContext.extractor(level), + schemaId, + level, + formatContext.compression(level), + options, + fileSource, + fileIndexOptions) + : new KeyValueDataFileWriterImpl( + fileIO, + formatContext.writerFactory(level), + path, + new KeyValueSerializer(keyType, valueType)::toRow, + keyType, + valueType, + formatContext.extractor(level), + schemaId, + level, + formatContext.compression(level), + options, + fileSource, + fileIndexOptions); } public void deleteFile(String filename, int level) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java new file mode 100644 index 000000000000..1b93645a7b5e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.io; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +/** + * Implementation of KeyValueDataFileWriter for thin data files. Thin data files only contain + * _SEQUENCE_NUMBER_, _ROW_KIND_ and value fields. + */ +public class KeyValueThinDataFileWriterImpl extends KeyValueDataFileWriter { + + private final int[] keyStatMapping; + + /** + * Constructs a KeyValueThinDataFileWriterImpl. + * + * @param fileIO The file IO interface. + * @param factory The format writer factory. + * @param path The path to the file. + * @param converter The function to convert KeyValue to InternalRow. + * @param keyType The row type of the key. + * @param valueType The row type of the value. + * @param simpleStatsExtractor The simple stats extractor, can be null. + * @param schemaId The schema ID. + * @param level The level. + * @param compression The compression type. + * @param options The core options. + * @param fileSource The file source. + * @param fileIndexOptions The file index options. 
+ */ + public KeyValueThinDataFileWriterImpl( + FileIO fileIO, + FormatWriterFactory factory, + Path path, + Function converter, + RowType keyType, + RowType valueType, + @Nullable SimpleStatsExtractor simpleStatsExtractor, + long schemaId, + int level, + String compression, + CoreOptions options, + FileSource fileSource, + FileIndexOptions fileIndexOptions) { + super( + fileIO, + factory, + path, + converter, + keyType, + valueType, + simpleStatsExtractor, + schemaId, + level, + compression, + options, + fileSource, + fileIndexOptions); + Map idToIndex = new HashMap<>(); + for (int i = 0; i < valueType.getFieldCount(); i++) { + idToIndex.put(valueType.getFields().get(i).id(), i); + } + this.keyStatMapping = new int[keyType.getFieldCount()]; + for (int i = 0; i < keyType.getFieldCount(); i++) { + keyStatMapping[i] = + idToIndex.get( + keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START); + } + } + + /** + * Fetches the key and value statistics. + * + * @param rowStats The row statistics. + * @return A pair of key statistics and value statistics. + */ + @Override + Pair fetchKeyValueStats(SimpleColStats[] rowStats) { + int numKeyFields = keyType.getFieldCount(); + int numValueFields = valueType.getFieldCount(); + + SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; + SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields]; + + // If thin mode only, there is no key stats in rowStats, so we only jump + // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value is 2. + System.arraycopy(rowStats, 2, valFieldStats, 0, numValueFields); + // Thin mode on, so need to map value stats to key stats. 
+ for (int i = 0; i < keyStatMapping.length; i++) { + keyStats[i] = valFieldStats[keyStatMapping[i]]; + } + + return Pair.of(keyStats, valFieldStats); + } +} From 51e9da839e406a3b27043f792f01b8dfc355e61b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 17:20:52 +0800 Subject: [PATCH 14/33] fix comment --- .../java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java index fd69377661f8..fb48d4534b40 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java @@ -72,9 +72,8 @@ public KeyValueDataFileWriterImpl( @Override Pair fetchKeyValueStats(SimpleColStats[] rowStats) { int numKeyFields = keyType.getFieldCount(); - int numValueFields = valueType.getFieldCount(); return Pair.of( Arrays.copyOfRange(rowStats, 0, numKeyFields), - Arrays.copyOfRange(rowStats, numKeyFields + 2, numValueFields)); + Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length)); } } From 52b1601a954fd574a742d2754a2a7c0a1ee0d74e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 17:23:35 +0800 Subject: [PATCH 15/33] fix comment --- .../org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java index 1b93645a7b5e..c9c041b16f81 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java @@ -117,7 +117,7 @@ Pair 
fetchKeyValueStats(SimpleColStats[] row SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields]; - // If thin mode only, there is no key stats in rowStats, so we only jump + // In thin mode, there is no key stats in rowStats, so we only jump // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value is 2. System.arraycopy(rowStats, 2, valFieldStats, 0, numValueFields); // Thin mode on, so need to map value stats to key stats. From fe095535428aa803725f570a4c4639d59f0b125e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 17:44:44 +0800 Subject: [PATCH 16/33] fix comment --- .../java/org/apache/paimon/io/KeyValueDataFileWriter.java | 8 +++----- .../org/apache/paimon/io/KeyValueDataFileWriterImpl.java | 1 + .../apache/paimon/io/KeyValueThinDataFileWriterImpl.java | 1 + 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 2e981ac507fb..651c6a6f7b56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -84,6 +84,7 @@ public KeyValueDataFileWriter( Function converter, RowType keyType, RowType valueType, + RowType writeRowType, @Nullable SimpleStatsExtractor simpleStatsExtractor, long schemaId, int level, @@ -96,14 +97,11 @@ public KeyValueDataFileWriter( factory, path, converter, - KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType), + writeRowType, simpleStatsExtractor, compression, StatsCollectorFactories.createStatsFactories( - options, - KeyValue.schema(options.thinMode() ? 
RowType.of() : keyType, valueType) - .getFieldNames(), - keyType.getFieldNames()), + options, writeRowType.getFieldNames(), keyType.getFieldNames()), options.asyncFileWrite()); this.keyType = keyType; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java index fb48d4534b40..27a1aef64e36 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java @@ -60,6 +60,7 @@ public KeyValueDataFileWriterImpl( converter, keyType, valueType, + KeyValue.schema(keyType, valueType), simpleStatsExtractor, schemaId, level, diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java index c9c041b16f81..77a4f97e133d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java @@ -84,6 +84,7 @@ public KeyValueThinDataFileWriterImpl( converter, keyType, valueType, + KeyValue.schema(RowType.of(), valueType), simpleStatsExtractor, schemaId, level, From 4124716bd40dabd4ee08719ccd65fb7a88d2193e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 18:38:03 +0800 Subject: [PATCH 17/33] fix comment --- .../java/org/apache/paimon/mergetree/MergeTreeWriter.java | 6 ++---- .../org/apache/paimon/operation/KeyValueFileStoreWrite.java | 2 -- .../java/org/apache/paimon/mergetree/MergeTreeTestBase.java | 2 -- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 34aab1b767ba..f2a964bae16a 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -96,8 +96,6 @@ public MergeTreeWriter( long maxSequenceNumber, Comparator keyComparator, MergeFunction mergeFunction, - RowType keyType, - RowType valueType, KeyValueFileWriterFactory writerFactory, boolean commitForceCompact, ChangelogProducer changelogProducer, @@ -108,8 +106,8 @@ public MergeTreeWriter( this.sortMaxFan = sortMaxFan; this.sortCompression = sortCompression; this.ioManager = ioManager; - this.keyType = keyType; - this.valueType = valueType; + this.keyType = writerFactory.keyType(); + this.valueType = writerFactory.valueType(); this.compactManager = compactManager; this.newSequenceNumber = maxSequenceNumber + 1; this.keyComparator = keyComparator; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 05bf5797e050..d061e181618b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -220,8 +220,6 @@ protected MergeTreeWriter createWriter( restoredMaxSeqNumber, keyComparator, mfFactory.create(), - keyType, - valueType, writerFactory, options.commitForceCompact(), options.changelogProducer(), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 727f684801ff..35287ae778c8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -514,8 +514,6 @@ private MergeTreeWriter createMergeTreeWriter( maxSequenceNumber, comparator, DeduplicateMergeFunction.factory().create(), - writerFactory.keyType(), - 
writerFactory.valueType(), writerFactory, options.commitForceCompact(), changelogProducer, From d7969807f291f81b2b37776ad1c324f28ccaf6c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 11 Dec 2024 10:49:32 +0800 Subject: [PATCH 18/33] fix comment --- .../test/java/org/apache/paimon/mergetree/LookupLevelsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index eba88272c504..a678534042eb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -82,7 +82,7 @@ public class LookupLevelsTest { private final RowType keyType = DataTypes.ROW( - DataTypes.FIELD(SpecialFields.KEY_FIELD_ID_START, "_key", DataTypes.INT())); + DataTypes.FIELD(SpecialFields.KEY_FIELD_ID_START, "_KEY_key", DataTypes.INT())); private final RowType rowType = DataTypes.ROW( DataTypes.FIELD(0, "key", DataTypes.INT()), From 23a26375f871313546b6b464cc7ae85d273e70a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 11 Dec 2024 12:29:25 +0800 Subject: [PATCH 19/33] fix comment --- .../java/org/apache/paimon/TestKeyValueGenerator.java | 10 ++++++---- .../apache/paimon/mergetree/ContainsLevelsTest.java | 5 ++++- .../paimon/operation/MergeFileSplitReadTest.java | 8 +++++++- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java index 657c791351a4..587204cd7616 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestKeyValueGenerator.java @@ -95,10 +95,12 @@ public class TestKeyValueGenerator { public static final RowType KEY_TYPE = 
RowType.of( new DataField( - 2 + SpecialFields.KEY_FIELD_ID_START, "key_shopId", new IntType(false)), + 2 + SpecialFields.KEY_FIELD_ID_START, + SpecialFields.KEY_FIELD_PREFIX + "shopId", + new IntType(false)), new DataField( 3 + SpecialFields.KEY_FIELD_ID_START, - "key_orderId", + SpecialFields.KEY_FIELD_PREFIX + "orderId", new BigIntType(false))); public static final InternalRowSerializer DEFAULT_ROW_SERIALIZER = @@ -281,7 +283,7 @@ public BinaryRow getPartition(KeyValue kv) { public static List getPrimaryKeys(GeneratorMode mode) { List trimmedPk = KEY_TYPE.getFieldNames().stream() - .map(f -> f.replaceFirst("key_", "")) + .map(f -> f.replaceFirst(SpecialFields.KEY_FIELD_PREFIX, "")) .collect(Collectors.toList()); if (mode != NON_PARTITIONED) { trimmedPk = new ArrayList<>(trimmedPk); @@ -394,7 +396,7 @@ public List keyFields(TableSchema schema) { f -> new DataField( f.id() + SpecialFields.KEY_FIELD_ID_START, - "key_" + f.name(), + SpecialFields.KEY_FIELD_PREFIX + f.name(), f.type(), f.description())) .collect(Collectors.toList()); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 066c84275680..be49311427a0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -80,7 +80,10 @@ public class ContainsLevelsTest { private final RowType keyType = DataTypes.ROW( - DataTypes.FIELD(SpecialFields.KEY_FIELD_ID_START, "_key", DataTypes.INT())); + DataTypes.FIELD( + SpecialFields.KEY_FIELD_ID_START, + SpecialFields.KEY_FIELD_PREFIX + "key", + DataTypes.INT())); private final RowType rowType = DataTypes.ROW( DataTypes.FIELD(0, "key", DataTypes.INT()), diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index 
a5b7fdd9f901..da7f78deea0d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -37,6 +37,7 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; @@ -284,7 +285,12 @@ private TestFileStore createStore( ? Collections.emptyList() : Stream.concat( keyType.getFieldNames().stream() - .map(field -> field.replace("key_", "")), + .map( + field -> + field.replace( + SpecialFields + .KEY_FIELD_PREFIX, + "")), partitionType.getFieldNames().stream()) .collect(Collectors.toList()), Collections.singletonMap("data-file.thin-mode", "false"), From b6bf207940bbdad8e0574b1cfa445ae35473c07c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 11 Dec 2024 13:37:07 +0800 Subject: [PATCH 20/33] fix comment --- .../java/org/apache/paimon/io/KeyValueFileReadWriteTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index b648e2af8972..e43cd898dbc2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -37,6 +37,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.stats.StatsTestUtils; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.FailingFileIO; @@ -158,7 +159,7 @@ public void 
testReadKeyType() throws Exception { List actualMetas = writer.result(); // projection: (shopId, orderId) -> (orderId) - RowType readKeyType = KEY_TYPE.project("key_orderId"); + RowType readKeyType = KEY_TYPE.project(SpecialFields.KEY_FIELD_PREFIX + "orderId"); KeyValueFileReaderFactory readerFactory = createReaderFactory(tempDir.toString(), "avro", readKeyType, null); InternalRowSerializer projectedKeySerializer = new InternalRowSerializer(readKeyType); From 03c6621c63888a26e9d3ecc752eaec2d1e34ea75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 11 Dec 2024 18:10:40 +0800 Subject: [PATCH 21/33] fix comment --- .../org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index a3dce02e15fb..99dd690d5eda 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -354,7 +354,6 @@ public void testBatchFilter(boolean statsDenseStore) throws Exception { if (statsDenseStore) { // pk table doesn't need value stats options.set(CoreOptions.METADATA_STATS_MODE, "none"); - // options.set(CoreOptions.STORAGE_THIN_MODE, false); } }; writeData(optionsSetter); From 52c448f9b62e2b65c2e2364b4e6381ca245a40e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 12 Dec 2024 12:27:44 +0800 Subject: [PATCH 22/33] fix comment --- .../main/java/org/apache/paimon/CoreOptions.java | 4 ++-- .../paimon/io/KeyValueFileWriterFactory.java | 5 +++-- .../io/KeyValueThinDataFileWriterImpl.java | 11 ++++------- .../paimon/utils/StatsCollectorFactories.java | 2 +- .../org/apache/paimon/stats/StatsTableTest.java | 16 ++++++++++++---- 5 files changed, 22 insertions(+), 16 deletions(-) diff --git 
a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 8d5602602c5a..dd5632c18b42 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1455,7 +1455,7 @@ public class CoreOptions implements Serializable { public static final ConfigOption DATA_FILE_THIN_MODE = key("data-file.thin-mode") .booleanType() - .defaultValue(true) + .defaultValue(false) .withDescription( "Enable data file thin mode to avoid duplicate columns storage."); @@ -2363,7 +2363,7 @@ public boolean statsDenseStore() { return options.get(METADATA_STATS_DENSE_STORE); } - public boolean thinMode() { + public boolean dataFileThinMode() { return options.get(DATA_FILE_THIN_MODE); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index c1996936c8a3..76967d52f7e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -108,7 +108,7 @@ public RollingFileWriter createRollingChangelogFileWrite private KeyValueDataFileWriter createDataFileWriter( Path path, int level, FileSource fileSource) { - return options.thinMode() + return options.dataFileThinMode() ? new KeyValueThinDataFileWriterImpl( fileIO, formatContext.writerFactory(level), @@ -211,7 +211,8 @@ public KeyValueFileWriterFactory build( partition, bucket, keyType, - KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType), + KeyValue.schema( + options.dataFileThinMode() ? 
RowType.of() : keyType, valueType), fileFormat, format2PathFactory, options); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java index 77a4f97e133d..dd7ebb006764 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -92,7 +93,7 @@ public KeyValueThinDataFileWriterImpl( options, fileSource, fileIndexOptions); - Map idToIndex = new HashMap<>(); + Map idToIndex = new HashMap<>(valueType.getFieldCount()); for (int i = 0; i < valueType.getFieldCount(); i++) { idToIndex.put(valueType.getFields().get(i).id(), i); } @@ -113,15 +114,11 @@ public KeyValueThinDataFileWriterImpl( @Override Pair fetchKeyValueStats(SimpleColStats[] rowStats) { int numKeyFields = keyType.getFieldCount(); - int numValueFields = valueType.getFieldCount(); - - SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; - SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields]; - // In thin mode, there is no key stats in rowStats, so we only jump // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value is 2. - System.arraycopy(rowStats, 2, valFieldStats, 0, numValueFields); + SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, 2, rowStats.length); // Thin mode on, so need to map value stats to key stats. 
+ SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; for (int i = 0; i < keyStatMapping.length; i++) { keyStats[i] = valFieldStats[keyStatMapping[i]]; } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java index f23976bedb58..a9318a237c1d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java @@ -54,7 +54,7 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories( if (fieldMode != null) { modes[i] = SimpleColStatsCollector.from(fieldMode); } else if (SpecialFields.isSystemField(field) - || (options.thinMode() + || (options.dataFileThinMode() && keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field))) { modes[i] = () -> new TruncateSimpleColStatsCollector(128); } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java index b9fc4da160ac..c1cc131640fc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java @@ -34,6 +34,8 @@ import org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.apache.paimon.CoreOptions.METADATA_STATS_DENSE_STORE; import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE; @@ -42,13 +44,15 @@ /** Test for table stats mode. 
*/ public class StatsTableTest extends TableTestBase { - @Test - public void testPartitionStatsNotDense() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPartitionStatsNotDense(boolean thinMode) throws Exception { Identifier identifier = identifier("T"); Options options = new Options(); options.set(METADATA_STATS_MODE, "NONE"); options.set(METADATA_STATS_DENSE_STORE, false); options.set(CoreOptions.BUCKET, 1); + options.set(CoreOptions.DATA_FILE_THIN_MODE, thinMode); Schema schema = Schema.newBuilder() .column("pt", DataTypes.INT()) @@ -86,11 +90,15 @@ public void testPartitionStatsNotDense() throws Exception { manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file(); SimpleStats recordStats = file.valueStats(); assertThat(recordStats.minValues().isNullAt(0)).isTrue(); - assertThat(recordStats.minValues().isNullAt(1)).isFalse(); + assertThat(recordStats.minValues().isNullAt(1)).isEqualTo(!thinMode); assertThat(recordStats.minValues().isNullAt(2)).isTrue(); assertThat(recordStats.maxValues().isNullAt(0)).isTrue(); - assertThat(recordStats.maxValues().isNullAt(1)).isFalse(); + assertThat(recordStats.maxValues().isNullAt(1)).isEqualTo(!thinMode); assertThat(recordStats.maxValues().isNullAt(2)).isTrue(); + + SimpleStats keyStats = file.keyStats(); + assertThat(keyStats.minValues().isNullAt(0)).isFalse(); + assertThat(keyStats.maxValues().isNullAt(0)).isFalse(); } @Test From 24d9828cf8b6082e7f548b77260bd15747ee686e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 12 Dec 2024 12:49:33 +0800 Subject: [PATCH 23/33] fix comment --- .../org/apache/paimon/stats/StatsTableTest.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java index c1cc131640fc..ce8cfc9228ad 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java @@ -33,7 +33,6 @@ import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.DataTypes; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -101,12 +100,14 @@ public void testPartitionStatsNotDense(boolean thinMode) throws Exception { assertThat(keyStats.maxValues().isNullAt(0)).isFalse(); } - @Test - public void testPartitionStatsDenseMode() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPartitionStatsDenseMode(boolean thinMode) throws Exception { Identifier identifier = identifier("T"); Options options = new Options(); options.set(METADATA_STATS_MODE, "NONE"); options.set(CoreOptions.BUCKET, 1); + options.set(CoreOptions.DATA_FILE_THIN_MODE, thinMode); Schema schema = Schema.newBuilder() .column("pt", DataTypes.INT()) @@ -143,9 +144,10 @@ public void testPartitionStatsDenseMode() throws Exception { DataFileMeta file = manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file(); SimpleStats recordStats = file.valueStats(); - assertThat(file.valueStatsCols().size()).isEqualTo(1); - assertThat(recordStats.minValues().getFieldCount()).isEqualTo(1); - assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(1); - assertThat(recordStats.nullCounts().size()).isEqualTo(1); + int count = thinMode ? 
1 : 0; + assertThat(file.valueStatsCols().size()).isEqualTo(count); + assertThat(recordStats.minValues().getFieldCount()).isEqualTo(count); + assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(count); + assertThat(recordStats.nullCounts().size()).isEqualTo(count); } } From cae6c75ecf316c36a6cb8e12f32d3ad25cfcc5f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 12 Dec 2024 13:27:15 +0800 Subject: [PATCH 24/33] comment --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 9200c48fb00e..15b1aac93543 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -866,7 +866,7 @@

data-file.thin-mode
- true + false Boolean Enable data file thin mode to avoid duplicate columns storage. From ceb989bfcdeec89e31a72732e80f0f3aea273c1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 12 Dec 2024 19:09:54 +0800 Subject: [PATCH 25/33] [core] Retry if snapshot commit hint failed. --- .../paimon/io/KeyValueFileWriterFactory.java | 57 +++++++++++++++---- .../operation/AppendOnlyFileStoreWrite.java | 3 + 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 76967d52f7e3..d2033124284d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -32,6 +32,8 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.StatsCollectorFactories; @@ -39,10 +41,13 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; /** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. 
*/ public class KeyValueFileWriterFactory { @@ -59,15 +64,13 @@ public class KeyValueFileWriterFactory { private KeyValueFileWriterFactory( FileIO fileIO, long schemaId, - RowType keyType, - RowType valueType, WriteFormatContext formatContext, long suggestedFileSize, CoreOptions options) { this.fileIO = fileIO; this.schemaId = schemaId; - this.keyType = keyType; - this.valueType = valueType; + this.keyType = formatContext.keyType; + this.valueType = formatContext.valueType; this.formatContext = formatContext; this.suggestedFileSize = suggestedFileSize; this.options = options; @@ -108,7 +111,7 @@ public RollingFileWriter createRollingChangelogFileWrite private KeyValueDataFileWriter createDataFileWriter( Path path, int level, FileSource fileSource) { - return options.dataFileThinMode() + return formatContext.thinModeEnabled() ? new KeyValueThinDataFileWriterImpl( fileIO, formatContext.writerFactory(level), @@ -211,13 +214,12 @@ public KeyValueFileWriterFactory build( partition, bucket, keyType, - KeyValue.schema( - options.dataFileThinMode() ? RowType.of() : keyType, valueType), + valueType, fileFormat, format2PathFactory, options); return new KeyValueFileWriterFactory( - fileIO, schemaId, keyType, valueType, context, suggestedFileSize, options); + fileIO, schemaId, context, suggestedFileSize, options); } } @@ -230,14 +232,24 @@ private static class WriteFormatContext { private final Map format2PathFactory; private final Map format2WriterFactory; + private final RowType keyType; + private final RowType valueType; + private final boolean thinModeEnabled; + private WriteFormatContext( BinaryRow partition, int bucket, RowType keyType, - RowType rowType, + RowType valueType, FileFormat defaultFormat, Map parentFactories, CoreOptions options) { + this.keyType = keyType; + this.valueType = valueType; + this.thinModeEnabled = + options.dataFileThinMode() && supportsThinMode(keyType, valueType); + RowType writeRowType = + KeyValue.schema(thinModeEnabled ? 
RowType.of() : keyType, valueType); Map fileFormatPerLevel = options.fileFormatPerLevel(); this.level2Format = level -> @@ -254,7 +266,9 @@ private WriteFormatContext( this.format2WriterFactory = new HashMap<>(); SimpleColStatsCollector.Factory[] statsCollectorFactories = StatsCollectorFactories.createStatsFactories( - options, rowType.getFieldNames(), keyType.getFieldNames()); + options, + writeRowType.getFieldNames(), + thinModeEnabled ? Collections.emptyList() : keyType.getFieldNames()); for (String format : parentFactories.keySet()) { format2PathFactory.put( format, @@ -270,9 +284,28 @@ private WriteFormatContext( format.equals("avro") ? Optional.empty() : fileFormat.createStatsExtractor( - rowType, statsCollectorFactories)); - format2WriterFactory.put(format, fileFormat.createWriterFactory(rowType)); + writeRowType, statsCollectorFactories)); + format2WriterFactory.put(format, fileFormat.createWriterFactory(writeRowType)); + } + } + + private boolean supportsThinMode(RowType keyType, RowType valueType) { + Set keyFieldIds = + valueType.getFields().stream().map(DataField::id).collect(Collectors.toSet()); + + for (DataField field : keyType.getFields()) { + if (!SpecialFields.isKeyField(field.name())) { + return false; + } + if (!keyFieldIds.contains(field.id() - SpecialFields.KEY_FIELD_ID_START)) { + return false; + } } + return true; + } + + private boolean thinModeEnabled() { + return thinModeEnabled; } @Nullable diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 3ce019c91638..4a6196453df6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -212,6 +212,9 @@ protected void forceBufferSpill() throws Exception { if (ioManager == null) { return; } + if (forceBufferSpill) { + return; + } 
forceBufferSpill = true; LOG.info( "Force buffer spill for append-only file store write, writer number is: {}", From 52808db61d7a8f94ad4f479b1a609d18c9a8b948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 10:03:11 +0800 Subject: [PATCH 26/33] fix comment --- .../org/apache/paimon/io/KeyValueFileWriterFactory.java | 2 +- .../main/java/org/apache/paimon/tag/TagAutoCreation.java | 6 ++++++ .../src/main/java/org/apache/paimon/tag/TagAutoManager.java | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index d2033124284d..a6aae3985bd4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -268,7 +268,7 @@ private WriteFormatContext( StatsCollectorFactories.createStatsFactories( options, writeRowType.getFieldNames(), - thinModeEnabled ? Collections.emptyList() : keyType.getFieldNames()); + thinModeEnabled ? 
keyType.getFieldNames() : Collections.emptyList()); for (String format : parentFactories.keySet()) { format2PathFactory.put( format, diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 58241033f5fb..3989786bd277 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -150,22 +150,28 @@ public void run() { private void tryToCreateTags(Snapshot snapshot) { Optional timeOptional = timeExtractor.extract(snapshot.timeMillis(), snapshot.watermark()); + LOG.info("Starting to create a tag for snapshot {}.", snapshot.id()); if (!timeOptional.isPresent()) { return; } LocalDateTime time = timeOptional.get(); + LOG.info("The time of snapshot {} is {}.", snapshot.id(), time); + LOG.info("The next tag time is {}.", nextTag); if (nextTag == null || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time); + LOG.info("Create tag for snapshot {} with time {}.", snapshot.id(), thisTag); if (automaticCompletion && nextTag != null) { thisTag = nextTag; } String tagName = periodHandler.timeToTag(thisTag); + LOG.info("The tag name is {}.", tagName); if (!tagManager.tagExists(tagName)) { tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks); } nextTag = periodHandler.nextTagTime(thisTag); + LOG.info("The next tag time after this is {}.", nextTag); if (numRetainedMax != null) { // only handle auto-created tags here diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java index 1ed1b3f2d4a2..817c20af4612 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java @@ -52,6 +52,7 @@ public static 
TagAutoManager create( TagManager tagManager, TagDeletion tagDeletion, List callbacks) { + TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); return new TagAutoManager( From 93a77bf2a31e49d5302c5957734a6b2634d9f411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 10:11:39 +0800 Subject: [PATCH 27/33] comment --- .../paimon/mergetree/ChangelogMergeTreeRewriterTest.java | 1 - .../org/apache/paimon/operation/MergeFileSplitReadTest.java | 2 +- .../apache/paimon/table/PrimaryKeyFileStoreTableTest.java | 3 +-- .../paimon/flink/source/TestChangelogDataReadWrite.java | 6 ++---- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java index dae6d08d7cdd..4e5fed39c763 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java @@ -193,7 +193,6 @@ public void testRewriteSuccess(boolean rewriteChangelog) throws Exception { private KeyValueFileWriterFactory createWriterFactory( Path path, RowType keyType, RowType valueType) { Options options = new Options(); - options.set(CoreOptions.DATA_FILE_THIN_MODE, false); return KeyValueFileWriterFactory.builder( LocalFileIO.create(), diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index da7f78deea0d..59f848a296cf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -293,7 +293,7 @@ private TestFileStore createStore( "")), partitionType.getFieldNames().stream()) .collect(Collectors.toList()), - 
Collections.singletonMap("data-file.thin-mode", "false"), + Collections.emptyMap(), null); TableSchema tableSchema = schemaManager.createTable(schema); return new TestFileStore.Builder( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 99dd690d5eda..a2ddecb4bd3a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -187,8 +187,7 @@ public void testAsyncReader() throws Exception { @Test public void testBatchWriteBuilder() throws Exception { FileStoreTable table = - createFileStoreTable() - .copy(Collections.singletonMap("data-file.thin-mode", "true")); + createFileStoreTable(); BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); BatchTableWrite write = writeBuilder.newWrite(); BatchTableCommit commit = writeBuilder.newCommit(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 70c0a0ea9183..d2bb9eb98274 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -167,10 +167,8 @@ public List writeFiles( } public RecordWriter createMergeTreeWriter(BinaryRow partition, int bucket) { - Map optionMap = new HashMap<>(); - optionMap.put(CoreOptions.FILE_FORMAT.key(), "avro"); - optionMap.put(CoreOptions.DATA_FILE_THIN_MODE.key(), "false"); - CoreOptions options = new CoreOptions(optionMap); + CoreOptions options = + new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro")); Map pathFactoryMap = new 
HashMap<>(); pathFactoryMap.put("avro", pathFactory); From 952b3a5ca06ffab86cc4d93ca21bc880aa3577fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 10:14:23 +0800 Subject: [PATCH 28/33] fix comment --- .../java/org/apache/paimon/utils/StatsCollectorFactories.java | 2 ++ .../org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java index a9318a237c1d..ae1e304da983 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java @@ -55,6 +55,8 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories( modes[i] = SimpleColStatsCollector.from(fieldMode); } else if (SpecialFields.isSystemField(field) || (options.dataFileThinMode() + // If we config METADATA_STATS_MODE to true, we need to maintain the + // stats for key fields. 
&& keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field))) { modes[i] = () -> new TruncateSimpleColStatsCollector(128); } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index a2ddecb4bd3a..fa635e2ab666 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -186,8 +186,7 @@ public void testAsyncReader() throws Exception { @Test public void testBatchWriteBuilder() throws Exception { - FileStoreTable table = - createFileStoreTable(); + FileStoreTable table = createFileStoreTable(); BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); BatchTableWrite write = writeBuilder.newWrite(); BatchTableCommit commit = writeBuilder.newCommit(); From 8269a415bf5a1aa6f021db0cfcb4f6df3997733e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 10:18:18 +0800 Subject: [PATCH 29/33] fix comment --- .../org/apache/paimon/utils/StatsCollectorFactories.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java index ae1e304da983..3331bbba2748 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java @@ -54,10 +54,10 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories( if (fieldMode != null) { modes[i] = SimpleColStatsCollector.from(fieldMode); } else if (SpecialFields.isSystemField(field) - || (options.dataFileThinMode() - // If we config METADATA_STATS_MODE to true, we need to maintain the - // stats for key fields. 
- && keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field))) { + || + // If we config METADATA_STATS_MODE to true, we need to maintain the + // stats for key fields. + keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field)) { modes[i] = () -> new TruncateSimpleColStatsCollector(128); } else { modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE)); From 04e6ea04cefaeac9c808525550576a75bb747382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 11:10:51 +0800 Subject: [PATCH 30/33] fix comment --- .../test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 35287ae778c8..f2a9c44dd7ce 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -143,7 +143,6 @@ private void recreateMergeTree(long targetFileSize) { options.set( CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER, options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1); - options.set(CoreOptions.DATA_FILE_THIN_MODE, false); this.options = new CoreOptions(options); RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType()))); RowType valueType = new RowType(singletonList(new DataField(1, "v", new IntType()))); From 78ac5dbeab5d85d09948f9e156d94e9af8600901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 11:44:19 +0800 Subject: [PATCH 31/33] fix comment --- .../src/main/java/org/apache/paimon/CoreOptions.java | 2 +- .../java/org/apache/paimon/utils/StatsCollectorFactories.java | 2 +- .../paimon/mergetree/ChangelogMergeTreeRewriterTest.java | 4 +--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git 
a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index dd5632c18b42..08ae41a1a795 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1455,7 +1455,7 @@ public class CoreOptions implements Serializable { public static final ConfigOption DATA_FILE_THIN_MODE = key("data-file.thin-mode") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription( "Enable data file thin mode to avoid duplicate columns storage."); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java index 3331bbba2748..abb1d686073f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java @@ -55,7 +55,7 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories( modes[i] = SimpleColStatsCollector.from(fieldMode); } else if (SpecialFields.isSystemField(field) || - // If we config METADATA_STATS_MODE to true, we need to maintain the + // If we config DATA_FILE_THIN_MODE to true, we need to maintain the // stats for key fields. 
keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field)) { modes[i] = () -> new TruncateSimpleColStatsCollector(128); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java index 4e5fed39c763..bbfa4d3659b6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java @@ -192,8 +192,6 @@ public void testRewriteSuccess(boolean rewriteChangelog) throws Exception { private KeyValueFileWriterFactory createWriterFactory( Path path, RowType keyType, RowType valueType) { - Options options = new Options(); - return KeyValueFileWriterFactory.builder( LocalFileIO.create(), 0, @@ -202,7 +200,7 @@ private KeyValueFileWriterFactory createWriterFactory( new FlushingFileFormat("avro"), Collections.singletonMap("avro", createNonPartFactory(path)), VALUE_128_MB.getBytes()) - .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(options)); + .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options())); } private KeyValueFileReaderFactory createReaderFactory( From a3b68b298cb5b1f59252621b0ae8593086041aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 12:41:18 +0800 Subject: [PATCH 32/33] comment --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 15b1aac93543..9200c48fb00e 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -866,7 +866,7 @@
data-file.thin-mode
- false + true Boolean Enable data file thin mode to avoid duplicate columns storage. From cf626f8948bd4f3a5dfd88445c1f288030896aad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 13 Dec 2024 13:27:20 +0800 Subject: [PATCH 33/33] fix comment --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- paimon-common/src/main/java/org/apache/paimon/CoreOptions.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 9200c48fb00e..15b1aac93543 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -866,7 +866,7 @@
data-file.thin-mode
- true + false Boolean Enable data file thin mode to avoid duplicate columns storage. diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 08ae41a1a795..dd5632c18b42 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1455,7 +1455,7 @@ public class CoreOptions implements Serializable { public static final ConfigOption DATA_FILE_THIN_MODE = key("data-file.thin-mode") .booleanType() - .defaultValue(true) + .defaultValue(false) .withDescription( "Enable data file thin mode to avoid duplicate columns storage.");