[core] Write thin mode
仟弋 committed Dec 9, 2024
1 parent 9349d7b commit 07119d9
Showing 17 changed files with 350 additions and 26 deletions.
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1434,6 +1434,12 @@ public class CoreOptions implements Serializable {
"For DELETE manifest entry in manifest file, drop stats to reduce memory and storage."
+ " Default value is false only for compatibility of old reader.");

public static final ConfigOption<Boolean> STORAGE_THIN_MODE =
key("storage.thin-mode")
.booleanType()
.defaultValue(true)
.withDescription("Enable storage thin mode to avoid duplicate columns store.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
@@ -2323,6 +2329,10 @@ public boolean statsDenseStore() {
return options.get(METADATA_STATS_DENSE_STORE);
}

public boolean thinMode() {
return options.get(STORAGE_THIN_MODE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
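Below is a minimal usage sketch, not part of this commit, showing how the new option could be toggled and read through CoreOptions; it assumes the existing Options#set and CoreOptions(Options) APIs from paimon-common.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class ThinModeOptionSketch {
    public static void main(String[] args) {
        Options conf = new Options();
        // storage.thin-mode defaults to true; override it to keep the old full-key layout.
        conf.set(CoreOptions.STORAGE_THIN_MODE, false);
        CoreOptions options = new CoreOptions(conf);
        System.out.println("thin mode enabled: " + options.thinMode()); // prints false
    }
}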
4 changes: 4 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -114,6 +114,10 @@ public static RowType schema(RowType keyType, RowType valueType) {
return new RowType(createKeyValueFields(keyType.getFields(), valueType.getFields()));
}

public static RowType schema(RowType valueType) {
return schema(RowType.of(), valueType);
}

public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
List<DataField> fields = new ArrayList<>(schema(keyType, valueType).getFields());
fields.add(LEVEL);
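For reference, a small sketch (not from the commit) of what the new single-argument overload produces: with an empty key type the schema is just the two KeyValue metadata columns followed by the value columns.

import org.apache.paimon.KeyValue;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class ThinSchemaSketch {
    public static void main(String[] args) {
        RowType valueType = RowType.of(DataTypes.INT(), DataTypes.STRING());
        // Sequence number + value kind + the two value columns, no key columns.
        RowType thinSchema = KeyValue.schema(valueType);
        System.out.println(thinSchema.getFieldCount()); // prints 4
    }
}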
58 changes: 58 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueThinSerializer.java
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectSerializer;

/** Serializes a KeyValue to an InternalRow, ignoring the key. Only used to write KeyValue to disk. */
public class KeyValueThinSerializer extends ObjectSerializer<KeyValue> {

private static final long serialVersionUID = 1L;

private final GenericRow reusedMeta;
private final JoinedRow reusedKeyWithMeta;

public KeyValueThinSerializer(RowType keyType, RowType valueType) {
super(KeyValue.schema(keyType, valueType));

this.reusedMeta = new GenericRow(2);
this.reusedKeyWithMeta = new JoinedRow();
}

public InternalRow toRow(KeyValue record) {
return toRow(record.sequenceNumber(), record.valueKind(), record.value());
}

public InternalRow toRow(long sequenceNumber, RowKind valueKind, InternalRow value) {
reusedMeta.setField(0, sequenceNumber);
reusedMeta.setField(1, valueKind.toByteValue());
return reusedKeyWithMeta.replace(reusedMeta, value);
}

@Override
public KeyValue fromRow(InternalRow row) {
throw new UnsupportedOperationException(
"KeyValue cannot be deserialized from InternalRow by this serializer.");
}
}
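A usage sketch with made-up types and data (not part of the commit), illustrating that the thin serializer emits only the metadata and value columns while the key row passed to KeyValue is dropped.

import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueThinSerializer;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

public class ThinSerializerSketch {
    public static void main(String[] args) {
        RowType keyType = RowType.of(DataTypes.INT());
        RowType valueType = RowType.of(DataTypes.INT(), DataTypes.STRING());

        KeyValueThinSerializer serializer = new KeyValueThinSerializer(keyType, valueType);
        KeyValue kv =
                new KeyValue()
                        .replace(
                                GenericRow.of(1), // key, not written in thin mode
                                1L,               // sequence number
                                RowKind.INSERT,
                                GenericRow.of(1, BinaryString.fromString("a")));

        // Row layout on disk: [sequence number, value kind, value columns...].
        InternalRow fileRow = serializer.toRow(kv);
        System.out.println(fileRow.getFieldCount()); // prints 4: 2 metadata fields + 2 value fields
    }
}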
paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -32,6 +32,7 @@
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StatsCollectorFactories;
@@ -44,7 +45,9 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
@@ -77,6 +80,8 @@ public class KeyValueDataFileWriter
private long minSeqNumber = Long.MAX_VALUE;
private long maxSeqNumber = Long.MIN_VALUE;
private long deleteRecordCount = 0;
@Nullable private final KeyStateAbstractor keyStateAbstractor;
private final boolean thinMode;

public KeyValueDataFileWriter(
FileIO fileIO,
@@ -97,11 +102,14 @@ public KeyValueDataFileWriter(
factory,
path,
converter,
KeyValue.schema(keyType, valueType),
KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType),
simpleStatsExtractor,
compression,
StatsCollectorFactories.createStatsFactories(
options, KeyValue.schema(keyType, valueType).getFieldNames()),
options,
KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType)
.getFieldNames(),
keyType.getFieldNames()),
options.asyncFileWrite());

this.keyType = keyType;
@@ -116,6 +124,8 @@
this.dataFileIndexWriter =
DataFileIndexWriter.create(
fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions);
this.thinMode = options.thinMode();
this.keyStateAbstractor = thinMode ? new KeyStateAbstractor(keyType, valueType) : null;
}

@Override
@@ -169,15 +179,17 @@ public DataFileMeta result() throws IOException {
SimpleColStats[] rowStats = fieldStats();
int numKeyFields = keyType.getFieldCount();

SimpleColStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats);

SimpleColStats[] valFieldStats =
Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length);

int valueFrom = thinMode ? 2 : numKeyFields + 2;
SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, valueFrom, rowStats.length);
Pair<List<String>, SimpleStats> valueStatsPair =
valueStatsConverter.toBinary(valFieldStats);

SimpleColStats[] keyFieldStats =
thinMode
? keyStateAbstractor.abstractFromValueState(valFieldStats)
: Arrays.copyOfRange(rowStats, 0, numKeyFields);
SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats);

DataFileIndexWriter.FileIndexResult indexResult =
dataFileIndexWriter == null
? DataFileIndexWriter.EMPTY_RESULT
@@ -211,4 +223,33 @@ public void close() throws IOException {
}
super.close();
}

private static class KeyStateAbstractor {

private final int[] keyStatMapping;

public KeyStateAbstractor(RowType keyType, RowType valueType) {

Map<Integer, Integer> idToIndex = new HashMap<>();
for (int i = 0; i < valueType.getFieldCount(); i++) {
idToIndex.put(valueType.getFields().get(i).id(), i);
}

keyStatMapping = new int[keyType.getFieldCount()];

for (int i = 0; i < keyType.getFieldCount(); i++) {
keyStatMapping[i] =
idToIndex.get(
keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START);
}
}

SimpleColStats[] abstractFromValueState(SimpleColStats[] valueStats) {
SimpleColStats[] keyStats = new SimpleColStats[keyStatMapping.length];
for (int i = 0; i < keyStatMapping.length; i++) {
keyStats[i] = valueStats[keyStatMapping[i]];
}
return keyStats;
}
}
}
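The mapping above relies on the convention that each key column is a copy of a value column with its field id shifted by SpecialFields.KEY_FIELD_ID_START, so subtracting the offset locates the backing value column and its statistics. A small, self-contained sketch of that id arithmetic (the column ids below are made up):

import org.apache.paimon.table.SpecialFields;

public class ThinModeStatsMappingSketch {
    public static void main(String[] args) {
        // Hypothetical value schema with three columns whose field ids are 0, 1, 2;
        // the primary key is the column with id 1.
        int[] valueFieldIds = {0, 1, 2};
        int keyFieldId = 1 + SpecialFields.KEY_FIELD_ID_START;

        // Reverse the offset to find which value column backs the key column.
        int valueIndex = -1;
        for (int i = 0; i < valueFieldIds.length; i++) {
            if (valueFieldIds[i] == keyFieldId - SpecialFields.KEY_FIELD_ID_START) {
                valueIndex = i;
            }
        }
        // Key statistics can then be copied from the value statistics at this index.
        System.out.println("key stats come from value column index " + valueIndex); // prints 1
    }
}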
paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.KeyValueThinSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexOptions;
@@ -33,6 +34,7 @@
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.StatsCollectorFactories;

import javax.annotation.Nullable;
@@ -107,7 +109,10 @@ public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWrite

private KeyValueDataFileWriter createDataFileWriter(
Path path, int level, FileSource fileSource) {
KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
ObjectSerializer<KeyValue> kvSerializer =
options.thinMode()
? new KeyValueThinSerializer(keyType, valueType)
: new KeyValueSerializer(keyType, valueType);
return new KeyValueDataFileWriter(
fileIO,
formatContext.writerFactory(level),
@@ -191,12 +196,14 @@ private Builder(

public KeyValueFileWriterFactory build(
BinaryRow partition, int bucket, CoreOptions options) {
RowType fileRowType = KeyValue.schema(keyType, valueType);
RowType finalKeyType = options.thinMode() ? RowType.of() : keyType;
RowType writeKeyType = KeyValue.schema(finalKeyType, valueType);
WriteFormatContext context =
new WriteFormatContext(
partition,
bucket,
fileRowType,
keyType,
writeKeyType,
fileFormat,
format2PathFactory,
options);
@@ -217,6 +224,7 @@ private static class WriteFormatContext {
private WriteFormatContext(
BinaryRow partition,
int bucket,
RowType keyType,
RowType rowType,
FileFormat defaultFormat,
Map<String, FileStorePathFactory> parentFactories,
@@ -236,7 +244,8 @@ private WriteFormatContext(
this.format2PathFactory = new HashMap<>();
this.format2WriterFactory = new HashMap<>();
SimpleColStatsCollector.Factory[] statsCollectorFactories =
StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
StatsCollectorFactories.createStatsFactories(
options, rowType.getFieldNames(), keyType.getFieldNames());
for (String format : parentFactories.keySet()) {
format2PathFactory.put(
format,
paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -96,6 +96,8 @@ public MergeTreeWriter(
long maxSequenceNumber,
Comparator<InternalRow> keyComparator,
MergeFunction<KeyValue> mergeFunction,
RowType keyType,
RowType valueType,
KeyValueFileWriterFactory writerFactory,
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@@ -106,8 +108,8 @@
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
this.ioManager = ioManager;
this.keyType = writerFactory.keyType();
this.valueType = writerFactory.valueType();
this.keyType = keyType;
this.valueType = valueType;
this.compactManager = compactManager;
this.newSequenceNumber = maxSequenceNumber + 1;
this.keyComparator = keyComparator;
paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -220,6 +220,8 @@ protected MergeTreeWriter createWriter(
restoredMaxSeqNumber,
keyComparator,
mfFactory.create(),
keyType,
valueType,
writerFactory,
options.commitForceCompact(),
options.changelogProducer(),
paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -24,6 +24,7 @@
import org.apache.paimon.statistics.TruncateSimpleColStatsCollector;
import org.apache.paimon.table.SpecialFields;

import java.util.Collections;
import java.util.List;

import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
Expand All @@ -35,6 +36,11 @@ public class StatsCollectorFactories {

public static SimpleColStatsCollector.Factory[] createStatsFactories(
CoreOptions options, List<String> fields) {
return createStatsFactories(options, fields, Collections.emptyList());
}

public static SimpleColStatsCollector.Factory[] createStatsFactories(
CoreOptions options, List<String> fields, List<String> keyNames) {
Options cfg = options.toConfiguration();
SimpleColStatsCollector.Factory[] modes =
new SimpleColStatsCollector.Factory[fields.size()];
@@ -47,7 +53,9 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories(
.noDefaultValue());
if (fieldMode != null) {
modes[i] = SimpleColStatsCollector.from(fieldMode);
} else if (SpecialFields.isSystemField(field)) {
} else if (SpecialFields.isSystemField(field)
|| (options.thinMode()
&& keyNames.contains(SpecialFields.KEY_FIELD_PREFIX + field))) {
modes[i] = () -> new TruncateSimpleColStatsCollector(128);
} else {
modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE));
