diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index b2bd3a976d66..ff47f4c36625 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -864,6 +864,12 @@
Integer |
Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease. |
+
+ storage.thin-mode |
+ true |
+ Boolean |
+ Enable storage thin mode to avoid duplicate columns store. |
+
streaming-read-mode |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 5db809cff1d1..965a704e29cf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1452,6 +1452,12 @@ public class CoreOptions implements Serializable {
"For DELETE manifest entry in manifest file, drop stats to reduce memory and storage."
+ " Default value is false only for compatibility of old reader.");
+ // Toggle read through thinMode(); when enabled, the key-value data file writer builds its
+ // file schema without the key columns.
+ // NOTE(review): the default is true, so newly written files omit key columns unless this is
+ // switched off - presumably readers that predate this option cannot read such files; confirm.
+ public static final ConfigOption STORAGE_THIN_MODE =
+ key("storage.thin-mode")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Enable storage thin mode to avoid duplicate columns store.");
+
@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
@@ -2356,6 +2362,10 @@ public boolean statsDenseStore() {
return options.get(METADATA_STATS_DENSE_STORE);
}
+ /** Returns the value configured for {@link #STORAGE_THIN_MODE}. */
+ public boolean thinMode() {
+ return options.get(STORAGE_THIN_MODE);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index 36ac88996ce3..75e0a0659303 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -114,6 +114,10 @@ public static RowType schema(RowType keyType, RowType valueType) {
return new RowType(createKeyValueFields(keyType.getFields(), valueType.getFields()));
}
+ public static RowType schema(RowType valueType) {
+ return schema(RowType.of(), valueType);
+ }
+
public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
List fields = new ArrayList<>(schema(keyType, valueType).getFields());
fields.add(LEVEL);
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java
new file mode 100644
index 000000000000..6dd41a42506a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/**
+ * Serializes a {@link KeyValue} to an {@link InternalRow} that omits the key columns: the row is
+ * laid out as sequence number, value kind, then the value fields. Only used to write {@link
+ * KeyValue}s to disk; reading back through this serializer is not supported.
+ */
+public class KeyValueThinSerializer extends ObjectSerializer<KeyValue> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Reused two-field row holding the sequence number and the value kind. */
+    private final GenericRow reusedMeta;
+
+    /** Reused view joining {@link #reusedMeta} with the value row of the current record. */
+    private final JoinedRow reusedMetaWithValue;
+
+    /**
+     * @param keyType key row type; accepted for signature parity with the non-thin serializer but
+     *     not part of the produced row
+     * @param valueType value row type
+     */
+    public KeyValueThinSerializer(RowType keyType, RowType valueType) {
+        // Declare the schema that toRow really produces (meta + value). Passing the key type
+        // here would advertise key columns that the thin row does not contain.
+        super(KeyValue.schema(RowType.of(), valueType));
+
+        this.reusedMeta = new GenericRow(2);
+        this.reusedMetaWithValue = new JoinedRow();
+    }
+
+    /** Converts {@code record} to a thin row; the record's key is ignored. */
+    @Override
+    public InternalRow toRow(KeyValue record) {
+        return toRow(record.sequenceNumber(), record.valueKind(), record.value());
+    }
+
+    /**
+     * Builds the thin row from its parts. The returned row is a reused object that is overwritten
+     * by the next call, so callers must consume it before converting another record.
+     */
+    public InternalRow toRow(long sequenceNumber, RowKind valueKind, InternalRow value) {
+        reusedMeta.setField(0, sequenceNumber);
+        reusedMeta.setField(1, valueKind.toByteValue());
+        return reusedMetaWithValue.replace(reusedMeta, value);
+    }
+
+    /**
+     * Always fails: thin rows do not contain the key, so a {@link KeyValue} cannot be rebuilt.
+     *
+     * @throws UnsupportedOperationException on every call
+     */
+    @Override
+    public KeyValue fromRow(InternalRow row) {
+        throw new UnsupportedOperationException(
+                "KeyValue cannot be deserialized from InternalRow by this serializer.");
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index ce0b3b02840b..55e1c56ea192 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -32,6 +32,7 @@
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StatsCollectorFactories;
@@ -44,7 +45,9 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
@@ -77,6 +80,8 @@ public class KeyValueDataFileWriter
private long minSeqNumber = Long.MAX_VALUE;
private long maxSeqNumber = Long.MIN_VALUE;
private long deleteRecordCount = 0;
+ @Nullable private final KeyStateAbstractor keyStateAbstractor;
+ private final boolean thinMode;
public KeyValueDataFileWriter(
FileIO fileIO,
@@ -97,11 +102,14 @@ public KeyValueDataFileWriter(
factory,
path,
converter,
- KeyValue.schema(keyType, valueType),
+ KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType),
simpleStatsExtractor,
compression,
StatsCollectorFactories.createStatsFactories(
- options, KeyValue.schema(keyType, valueType).getFieldNames()),
+ options,
+ KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType)
+ .getFieldNames(),
+ keyType.getFieldNames()),
options.asyncFileWrite());
this.keyType = keyType;
@@ -116,6 +124,8 @@ public KeyValueDataFileWriter(
this.dataFileIndexWriter =
DataFileIndexWriter.create(
fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions);
+ this.thinMode = options.thinMode();
+ this.keyStateAbstractor = thinMode ? new KeyStateAbstractor(keyType, valueType) : null;
}
@Override
@@ -169,15 +179,17 @@ public DataFileMeta result() throws IOException {
SimpleColStats[] rowStats = fieldStats();
int numKeyFields = keyType.getFieldCount();
- SimpleColStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
- SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats);
-
- SimpleColStats[] valFieldStats =
- Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length);
-
+ int valueFrom = thinMode ? 2 : numKeyFields + 2;
+ SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, valueFrom, rowStats.length);
Pair, SimpleStats> valueStatsPair =
valueStatsConverter.toBinary(valFieldStats);
+ SimpleColStats[] keyFieldStats =
+ thinMode
+ ? keyStateAbstractor.abstractFromValueState(valFieldStats)
+ : Arrays.copyOfRange(rowStats, 0, numKeyFields);
+ SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats);
+
DataFileIndexWriter.FileIndexResult indexResult =
dataFileIndexWriter == null
? DataFileIndexWriter.EMPTY_RESULT
@@ -211,4 +223,33 @@ public void close() throws IOException {
}
super.close();
}
+
+    /**
+     * Derives key column stats from value column stats for thin mode, where key columns are not
+     * written to the file and therefore have no stats of their own. Each key field is matched to
+     * the value field whose id equals the key field id minus {@link
+     * SpecialFields#KEY_FIELD_ID_START}.
+     */
+    private static class KeyStateAbstractor {
+
+        /** For each key field position, the index of the matching field in the value type. */
+        private final int[] keyStatMapping;
+
+        /**
+         * @param keyType key row type whose field ids are offset by the key field id start
+         * @param valueType value row type that must contain a counterpart for every key field
+         * @throws IllegalArgumentException if a key field has no counterpart in {@code valueType}
+         */
+        public KeyStateAbstractor(RowType keyType, RowType valueType) {
+            Map<Integer, Integer> idToIndex = new HashMap<>();
+            for (int i = 0; i < valueType.getFieldCount(); i++) {
+                idToIndex.put(valueType.getFields().get(i).id(), i);
+            }
+
+            keyStatMapping = new int[keyType.getFieldCount()];
+            for (int i = 0; i < keyStatMapping.length; i++) {
+                int valueFieldId =
+                        keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START;
+                Integer valueIndex = idToIndex.get(valueFieldId);
+                if (valueIndex == null) {
+                    // Unboxing a missing mapping would fail with a bare NullPointerException.
+                    throw new IllegalArgumentException(
+                            "Key field '"
+                                    + keyType.getFields().get(i).name()
+                                    + "' has no value field with id "
+                                    + valueFieldId
+                                    + "; cannot derive key stats in thin mode.");
+                }
+                keyStatMapping[i] = valueIndex;
+            }
+        }
+
+        /** Picks, in key field order, the stats of the value columns backing the key fields. */
+        SimpleColStats[] abstractFromValueState(SimpleColStats[] valueStats) {
+            SimpleColStats[] keyStats = new SimpleColStats[keyStatMapping.length];
+            for (int i = 0; i < keyStatMapping.length; i++) {
+                keyStats[i] = valueStats[keyStatMapping[i]];
+            }
+            return keyStats;
+        }
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index a6fddb43283a..2e25c4b1e539 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
+import org.apache.paimon.KeyValueThinSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexOptions;
@@ -33,6 +34,7 @@
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.StatsCollectorFactories;
import javax.annotation.Nullable;
@@ -107,7 +109,10 @@ public RollingFileWriter createRollingChangelogFileWrite
private KeyValueDataFileWriter createDataFileWriter(
Path path, int level, FileSource fileSource) {
- KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
+ ObjectSerializer kvSerializer =
+ options.thinMode()
+ ? new KeyValueThinSerializer(keyType, valueType)
+ : new KeyValueSerializer(keyType, valueType);
return new KeyValueDataFileWriter(
fileIO,
formatContext.writerFactory(level),
@@ -191,12 +196,14 @@ private Builder(
public KeyValueFileWriterFactory build(
BinaryRow partition, int bucket, CoreOptions options) {
- RowType fileRowType = KeyValue.schema(keyType, valueType);
+ RowType finalKeyType = options.thinMode() ? RowType.of() : keyType;
+ RowType writeKeyType = KeyValue.schema(finalKeyType, valueType);
WriteFormatContext context =
new WriteFormatContext(
partition,
bucket,
- fileRowType,
+ keyType,
+ writeKeyType,
fileFormat,
format2PathFactory,
options);
@@ -217,6 +224,7 @@ private static class WriteFormatContext {
private WriteFormatContext(
BinaryRow partition,
int bucket,
+ RowType keyType,
RowType rowType,
FileFormat defaultFormat,
Map parentFactories,
@@ -236,7 +244,8 @@ private WriteFormatContext(
this.format2PathFactory = new HashMap<>();
this.format2WriterFactory = new HashMap<>();
SimpleColStatsCollector.Factory[] statsCollectorFactories =
- StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
+ StatsCollectorFactories.createStatsFactories(
+ options, rowType.getFieldNames(), keyType.getFieldNames());
for (String format : parentFactories.keySet()) {
format2PathFactory.put(
format,
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index f2a964bae16a..34aab1b767ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -96,6 +96,8 @@ public MergeTreeWriter(
long maxSequenceNumber,
Comparator keyComparator,
MergeFunction mergeFunction,
+ RowType keyType,
+ RowType valueType,
KeyValueFileWriterFactory writerFactory,
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@@ -106,8 +108,8 @@ public MergeTreeWriter(
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
this.ioManager = ioManager;
- this.keyType = writerFactory.keyType();
- this.valueType = writerFactory.valueType();
+ this.keyType = keyType;
+ this.valueType = valueType;
this.compactManager = compactManager;
this.newSequenceNumber = maxSequenceNumber + 1;
this.keyComparator = keyComparator;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index d061e181618b..05bf5797e050 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -220,6 +220,8 @@ protected MergeTreeWriter createWriter(
restoredMaxSeqNumber,
keyComparator,
mfFactory.create(),
+ keyType,
+ valueType,
writerFactory,
options.commitForceCompact(),
options.changelogProducer(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
index de94b2e23eff..f23976bedb58 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -24,6 +24,7 @@
import org.apache.paimon.statistics.TruncateSimpleColStatsCollector;
import org.apache.paimon.table.SpecialFields;
+import java.util.Collections;
import java.util.List;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
@@ -35,6 +36,11 @@ public class StatsCollectorFactories {
+ /** Delegates to the three-argument overload with an empty list of key names. */
public static SimpleColStatsCollector.Factory[] createStatsFactories(
CoreOptions options, List fields) {
+ return createStatsFactories(options, fields, Collections.emptyList());
+ }
+
+ public static SimpleColStatsCollector.Factory[] createStatsFactories(
+ CoreOptions options, List fields, List keyNames) {
Options cfg = options.toConfiguration();
SimpleColStatsCollector.Factory[] modes =
new SimpleColStatsCollector.Factory[fields.size()];
@@ -47,7 +53,9 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories(
.noDefaultValue());
if (fieldMode != null) {
modes[i] = SimpleColStatsCollector.from(fieldMode);
- } else if (SpecialFields.isSystemField(field)) {
+ } else if (SpecialFields.isSystemField(field)
+ || (options.thinMode()
+ && keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field))) {
modes[i] = () -> new TruncateSimpleColStatsCollector(128);
} else {
modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE));
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java
new file mode 100644
index 000000000000..2c66dfe79be3
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/format/ThinModeReadWriteTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/** Tests the compatibility and effectiveness of storage thin mode. */
+public class ThinModeReadWriteTest extends TableTestBase {
+
+    /** Creates the default test table with the given file format and thin-mode setting. */
+    private Table createTable(String format, boolean thinMode) throws Exception {
+        catalog.createTable(identifier(), schema(format, thinMode), true);
+        return catalog.getTable(identifier());
+    }
+
+    /** Builds a single-bucket schema of 16 columns whose primary key spans f0 to f13. */
+    private Schema schema(String format, boolean thinMode) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.INT());
+        schemaBuilder.column("f2", DataTypes.SMALLINT());
+        schemaBuilder.column("f3", DataTypes.STRING());
+        schemaBuilder.column("f4", DataTypes.DOUBLE());
+        schemaBuilder.column("f5", DataTypes.CHAR(100));
+        schemaBuilder.column("f6", DataTypes.VARCHAR(100));
+        schemaBuilder.column("f7", DataTypes.BOOLEAN());
+        schemaBuilder.column("f8", DataTypes.INT());
+        schemaBuilder.column("f9", DataTypes.TIME());
+        schemaBuilder.column("f10", DataTypes.TIMESTAMP());
+        schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2));
+        schemaBuilder.column("f12", DataTypes.BYTES());
+        schemaBuilder.column("f13", DataTypes.FLOAT());
+        schemaBuilder.column("f14", DataTypes.BINARY(100));
+        schemaBuilder.column("f15", DataTypes.VARBINARY(100));
+        schemaBuilder.primaryKey(
+                "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12",
+                "f13");
+        schemaBuilder.option("bucket", "1");
+        schemaBuilder.option("bucket-key", "f1");
+        schemaBuilder.option("file.format", format);
+        schemaBuilder.option("storage.thin-mode", String.valueOf(thinMode));
+        return schemaBuilder.build();
+    }
+
+    /** Writing the same rows must take less file space with thin mode on than with it off. */
+    @Test
+    public void testThinModeWorks() throws Exception {
+        InternalRow[] datas = datas(200000);
+
+        Table table = createTable("orc", true);
+        write(table, datas);
+        long thinSize = tableSize(table);
+        dropTableDefault();
+
+        table = createTable("orc", false);
+        write(table, datas);
+        long fullSize = tableSize(table);
+        dropTableDefault();
+
+        Assertions.assertThat(fullSize).isGreaterThan(thinSize);
+    }
+
+    @Test
+    public void testAllFormatReadWrite() throws Exception {
+        testFormat("orc");
+        testFormat("parquet");
+        testFormat("avro");
+    }
+
+    /** Runs every combination of writer and reader thin-mode settings for one format. */
+    private void testFormat(String format) throws Exception {
+        testReadWrite(format, true, true);
+        testReadWrite(format, true, false);
+        testReadWrite(format, false, true);
+        testReadWrite(format, false, false);
+    }
+
+    /** Writes with one thin-mode setting, reads with another, and expects the same rows back. */
+    private void testReadWrite(String format, boolean writeThin, boolean readThin)
+            throws Exception {
+        Table tableWrite = createTable(format, writeThin);
+        Table tableRead = setThinMode(tableWrite, readThin);
+
+        InternalRow[] datas = datas(2000);
+
+        write(tableWrite, datas);
+
+        List<InternalRow> actual = read(tableRead);
+
+        Assertions.assertThat(actual).containsExactlyInAnyOrder(datas);
+        dropTableDefault();
+    }
+
+    /** Returns a copy of {@code table} with the thin-mode option overridden to {@code flag}. */
+    private Table setThinMode(Table table, boolean flag) {
+        // A plain map instead of double-brace initialization, which creates an anonymous
+        // HashMap subclass for every call site.
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put("storage.thin-mode", String.valueOf(flag));
+        return table.copy(dynamicOptions);
+    }
+
+    /** Generates {@code i} random rows matching the test schema. */
+    InternalRow[] datas(int i) {
+        InternalRow[] arrays = new InternalRow[i];
+        for (int j = 0; j < i; j++) {
+            arrays[j] = data();
+        }
+        return arrays;
+    }
+
+    /** Generates one random row with a value for each of the 16 schema columns, in order. */
+    protected InternalRow data() {
+        return GenericRow.of(
+                RANDOM.nextInt(),
+                RANDOM.nextInt(),
+                (short) RANDOM.nextInt(),
+                randomString(),
+                RANDOM.nextDouble(),
+                randomString(),
+                randomString(),
+                RANDOM.nextBoolean(),
+                RANDOM.nextInt(),
+                RANDOM.nextInt(),
+                Timestamp.now(),
+                Decimal.zero(10, 2),
+                randomBytes(),
+                (float) RANDOM.nextDouble(),
+                randomBytes(),
+                randomBytes());
+    }
+
+    /** Sums the file sizes of all ADD manifest entries in the table's current scan plan. */
+    public static long tableSize(Table table) throws Exception {
+        long totalBytes = 0;
+        List<ManifestEntry> files =
+                ((FileStoreTable) table).store().newScan().plan().files(FileKind.ADD);
+        for (ManifestEntry file : files) {
+            totalBytes += file.file().fileSize();
+        }
+
+        return totalBytes;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
index bbfa4d3659b6..e2967a2ff67d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
@@ -192,6 +192,9 @@ public void testRewriteSuccess(boolean rewriteChangelog) throws Exception {
private KeyValueFileWriterFactory createWriterFactory(
Path path, RowType keyType, RowType valueType) {
+ Options options = new Options();
+ options.set(CoreOptions.STORAGE_THIN_MODE, false);
+
return KeyValueFileWriterFactory.builder(
LocalFileIO.create(),
0,
@@ -200,7 +203,7 @@ private KeyValueFileWriterFactory createWriterFactory(
new FlushingFileFormat("avro"),
Collections.singletonMap("avro", createNonPartFactory(path)),
VALUE_128_MB.getBytes())
- .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options()));
+ .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(options));
}
private KeyValueFileReaderFactory createReaderFactory(
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 0ab636c33aa3..066c84275680 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -41,6 +41,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.SchemaEvolutionTableTestBase;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -77,7 +78,9 @@ public class ContainsLevelsTest {
private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0));
- private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key", DataTypes.INT()));
+ private final RowType keyType =
+ DataTypes.ROW(
+ DataTypes.FIELD(SpecialFields.KEY_FIELD_ID_START, "_key", DataTypes.INT()));
private final RowType rowType =
DataTypes.ROW(
DataTypes.FIELD(0, "key", DataTypes.INT()),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 2dce81ce56b4..eba88272c504 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -41,6 +41,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.SchemaEvolutionTableTestBase;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -79,7 +80,9 @@ public class LookupLevelsTest {
private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0));
- private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key", DataTypes.INT()));
+ private final RowType keyType =
+ DataTypes.ROW(
+ DataTypes.FIELD(SpecialFields.KEY_FIELD_ID_START, "_key", DataTypes.INT()));
private final RowType rowType =
DataTypes.ROW(
DataTypes.FIELD(0, "key", DataTypes.INT()),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index f2a9c44dd7ce..af6c442d861d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -143,6 +143,7 @@ private void recreateMergeTree(long targetFileSize) {
options.set(
CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER,
options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1);
+ options.set(CoreOptions.STORAGE_THIN_MODE, false);
this.options = new CoreOptions(options);
RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
RowType valueType = new RowType(singletonList(new DataField(1, "v", new IntType())));
@@ -513,6 +514,8 @@ private MergeTreeWriter createMergeTreeWriter(
maxSequenceNumber,
comparator,
DeduplicateMergeFunction.factory().create(),
+ writerFactory.keyType(),
+ writerFactory.valueType(),
writerFactory,
options.commitForceCompact(),
changelogProducer,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 46b64422fd9b..0e5769255680 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -287,7 +287,7 @@ private TestFileStore createStore(
.map(field -> field.replace("key_", "")),
partitionType.getFieldNames().stream())
.collect(Collectors.toList()),
- Collections.emptyMap(),
+ Collections.singletonMap("storage.thin-mode", "false"),
null);
TableSchema tableSchema = schemaManager.createTable(schema);
return new TestFileStore.Builder(
diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
index 25282d898a3d..b9fc4da160ac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsTableTest.java
@@ -86,10 +86,10 @@ public void testPartitionStatsNotDense() throws Exception {
manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file();
SimpleStats recordStats = file.valueStats();
assertThat(recordStats.minValues().isNullAt(0)).isTrue();
- assertThat(recordStats.minValues().isNullAt(1)).isTrue();
+ assertThat(recordStats.minValues().isNullAt(1)).isFalse();
assertThat(recordStats.minValues().isNullAt(2)).isTrue();
assertThat(recordStats.maxValues().isNullAt(0)).isTrue();
- assertThat(recordStats.maxValues().isNullAt(1)).isTrue();
+ assertThat(recordStats.maxValues().isNullAt(1)).isFalse();
assertThat(recordStats.maxValues().isNullAt(2)).isTrue();
}
@@ -135,9 +135,9 @@ public void testPartitionStatsDenseMode() throws Exception {
DataFileMeta file =
manifestFile.read(manifest.fileName(), manifest.fileSize()).get(0).file();
SimpleStats recordStats = file.valueStats();
- assertThat(file.valueStatsCols()).isEmpty();
- assertThat(recordStats.minValues().getFieldCount()).isEqualTo(0);
- assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(0);
- assertThat(recordStats.nullCounts().size()).isEqualTo(0);
+ assertThat(file.valueStatsCols().size()).isEqualTo(1);
+ assertThat(recordStats.minValues().getFieldCount()).isEqualTo(1);
+ assertThat(recordStats.maxValues().getFieldCount()).isEqualTo(1);
+ assertThat(recordStats.nullCounts().size()).isEqualTo(1);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index fa635e2ab666..e44b649534d1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -186,7 +186,8 @@ public void testAsyncReader() throws Exception {
@Test
public void testBatchWriteBuilder() throws Exception {
- FileStoreTable table = createFileStoreTable();
+ FileStoreTable table =
+ createFileStoreTable().copy(Collections.singletonMap("storage.thin-mode", "true"));
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit();
@@ -352,6 +353,7 @@ public void testBatchFilter(boolean statsDenseStore) throws Exception {
if (statsDenseStore) {
// pk table doesn't need value stats
options.set(CoreOptions.METADATA_STATS_MODE, "none");
+ // options.set(CoreOptions.STORAGE_THIN_MODE, false);
}
};
writeData(optionsSetter);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 7f850a7725b4..f8ab2cffade9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -160,6 +160,10 @@ protected void compact(
}
}
+ public void dropTableDefault() throws Exception {
+ catalog.dropTable(identifier(), true);
+ }
+
protected List read(Table table, Pair, String>... dynamicOptions)
throws Exception {
return read(table, null, dynamicOptions);
@@ -175,6 +179,8 @@ protected List read(
options.put(pair.getKey().key(), pair.getValue());
}
table = table.copy(options);
+ // PredicateBuilder predicateBuilder = new PredicateBuilder();
+ // predicateBuilder.equal()
ReadBuilder readBuilder = table.newReadBuilder();
if (projection != null) {
readBuilder.withProjection(projection);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index d2bb9eb98274..9dfaba60c78e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -167,8 +167,10 @@ public List writeFiles(
}
public RecordWriter createMergeTreeWriter(BinaryRow partition, int bucket) {
- CoreOptions options =
- new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
+ Map optionMap = new HashMap<>();
+ optionMap.put(CoreOptions.FILE_FORMAT.key(), "avro");
+ optionMap.put(CoreOptions.STORAGE_THIN_MODE.key(), "false");
+ CoreOptions options = new CoreOptions(optionMap);
Map pathFactoryMap = new HashMap<>();
pathFactoryMap.put("avro", pathFactory);