Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce data-file.thin-mode in primary key table write #4666

Merged
merged 33 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,12 @@
<td>Integer</td>
<td>Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.</td>
</tr>
<tr>
<td><h5>data-file.thin-mode</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Enable data file thin mode to avoid duplicate columns storage.</td>
</tr>
<tr>
<td><h5>streaming-read-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,13 @@ public class CoreOptions implements Serializable {
"For DELETE manifest entry in manifest file, drop stats to reduce memory and storage."
+ " Default value is false only for compatibility of old reader.");

public static final ConfigOption<Boolean> DATA_FILE_THIN_MODE =
key("data-file.thin-mode")
.booleanType()
.defaultValue(true)
leaves12138 marked this conversation as resolved.
Show resolved Hide resolved
leaves12138 marked this conversation as resolved.
Show resolved Hide resolved
.withDescription(
"Enable data file thin mode to avoid duplicate columns storage.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
Expand Down Expand Up @@ -2356,6 +2363,10 @@ public boolean statsDenseStore() {
return options.get(METADATA_STATS_DENSE_STORE);
}

public boolean thinMode() {
leaves12138 marked this conversation as resolved.
Show resolved Hide resolved
return options.get(DATA_FILE_THIN_MODE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectSerializer;

/** Serialize KeyValue to InternalRow with ignorance of key. Only used to write KeyValue to disk. */
public class KeyValueThinSerializer extends ObjectSerializer<KeyValue> {

private static final long serialVersionUID = 1L;

private final GenericRow reusedMeta;
private final JoinedRow reusedKeyWithMeta;

public KeyValueThinSerializer(RowType keyType, RowType valueType) {
super(KeyValue.schema(keyType, valueType));

this.reusedMeta = new GenericRow(2);
this.reusedKeyWithMeta = new JoinedRow();
}

public InternalRow toRow(KeyValue record) {
return toRow(record.sequenceNumber(), record.valueKind(), record.value());
}

public InternalRow toRow(long sequenceNumber, RowKind valueKind, InternalRow value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest add a comment here.
record.key() is not write into row

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already comment before class definition.

reusedMeta.setField(0, sequenceNumber);
reusedMeta.setField(1, valueKind.toByteValue());
return reusedKeyWithMeta.replace(reusedMeta, value);
}

@Override
public KeyValue fromRow(InternalRow row) {
throw new UnsupportedOperationException(
"KeyValue cannot be deserialized from InternalRow by this serializer.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
Expand All @@ -56,13 +55,13 @@
* <p>NOTE: records given to the writer must be sorted because it does not compare the min max keys
* to produce {@link DataFileMeta}.
*/
public class KeyValueDataFileWriter
public abstract class KeyValueDataFileWriter
extends StatsCollectingSingleFileWriter<KeyValue, DataFileMeta> {

private static final Logger LOG = LoggerFactory.getLogger(KeyValueDataFileWriter.class);

private final RowType keyType;
private final RowType valueType;
protected final RowType keyType;
protected final RowType valueType;
private final long schemaId;
private final int level;

Expand All @@ -85,6 +84,7 @@ public KeyValueDataFileWriter(
Function<KeyValue, InternalRow> converter,
RowType keyType,
RowType valueType,
RowType writeRowType,
@Nullable SimpleStatsExtractor simpleStatsExtractor,
long schemaId,
int level,
Expand All @@ -97,11 +97,11 @@ public KeyValueDataFileWriter(
factory,
path,
converter,
KeyValue.schema(keyType, valueType),
writeRowType,
simpleStatsExtractor,
compression,
StatsCollectorFactories.createStatsFactories(
options, KeyValue.schema(keyType, valueType).getFieldNames()),
options, writeRowType.getFieldNames(), keyType.getFieldNames()),
options.asyncFileWrite());

this.keyType = keyType;
Expand Down Expand Up @@ -166,17 +166,11 @@ public DataFileMeta result() throws IOException {
return null;
}

SimpleColStats[] rowStats = fieldStats();
int numKeyFields = keyType.getFieldCount();

SimpleColStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyFieldStats);

SimpleColStats[] valFieldStats =
Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length);
Pair<SimpleColStats[], SimpleColStats[]> keyValueStats = fetchKeyValueStats(fieldStats());

SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey());
Pair<List<String>, SimpleStats> valueStatsPair =
valueStatsConverter.toBinary(valFieldStats);
valueStatsConverter.toBinary(keyValueStats.getValue());

DataFileIndexWriter.FileIndexResult indexResult =
dataFileIndexWriter == null
Expand Down Expand Up @@ -204,6 +198,8 @@ public DataFileMeta result() throws IOException {
valueStatsPair.getKey());
}

abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats);

@Override
public void close() throws IOException {
if (dataFileIndexWriter != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.function.Function;

/** Write data files containing {@link KeyValue}s. */
public class KeyValueDataFileWriterImpl extends KeyValueDataFileWriter {

    public KeyValueDataFileWriterImpl(
            FileIO fileIO,
            FormatWriterFactory factory,
            Path path,
            Function<KeyValue, InternalRow> converter,
            RowType keyType,
            RowType valueType,
            @Nullable SimpleStatsExtractor simpleStatsExtractor,
            long schemaId,
            int level,
            String compression,
            CoreOptions options,
            FileSource fileSource,
            FileIndexOptions fileIndexOptions) {
        // The full (non-thin) layout writes key and value columns together.
        super(
                fileIO,
                factory,
                path,
                converter,
                keyType,
                valueType,
                KeyValue.schema(keyType, valueType),
                simpleStatsExtractor,
                schemaId,
                level,
                compression,
                options,
                fileSource,
                fileIndexOptions);
    }

    @Override
    Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats) {
        // Row layout: [key fields][2 meta fields: sequence number, row kind][value fields].
        int keyFieldCount = keyType.getFieldCount();
        SimpleColStats[] keyStats = Arrays.copyOfRange(rowStats, 0, keyFieldCount);
        // Skip the two meta columns sitting between the key and the value fields.
        SimpleColStats[] valueStats =
                Arrays.copyOfRange(rowStats, keyFieldCount + 2, rowStats.length);
        return Pair.of(keyStats, valueStats);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.KeyValueThinSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexOptions;
Expand Down Expand Up @@ -107,21 +108,35 @@ public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWrite

/**
 * Creates the per-file writer for one data file at the given path/level.
 *
 * <p>In thin mode the key columns are not duplicated into the stored rows (the option is
 * documented as avoiding duplicate columns storage), so a {@link KeyValueThinSerializer} and the
 * thin writer implementation are used; otherwise the full {@link KeyValueSerializer} layout is
 * written.
 */
private KeyValueDataFileWriter createDataFileWriter(
        Path path, int level, FileSource fileSource) {
    return options.thinMode()
            ? new KeyValueThinDataFileWriterImpl(
                    fileIO,
                    formatContext.writerFactory(level),
                    path,
                    new KeyValueThinSerializer(keyType, valueType)::toRow,
                    keyType,
                    valueType,
                    formatContext.extractor(level),
                    schemaId,
                    level,
                    formatContext.compression(level),
                    options,
                    fileSource,
                    fileIndexOptions)
            : new KeyValueDataFileWriterImpl(
                    fileIO,
                    formatContext.writerFactory(level),
                    path,
                    new KeyValueSerializer(keyType, valueType)::toRow,
                    keyType,
                    valueType,
                    formatContext.extractor(level),
                    schemaId,
                    level,
                    formatContext.compression(level),
                    options,
                    fileSource,
                    fileIndexOptions);
}

public void deleteFile(String filename, int level) {
Expand Down Expand Up @@ -191,12 +206,12 @@ private Builder(

public KeyValueFileWriterFactory build(
BinaryRow partition, int bucket, CoreOptions options) {
RowType fileRowType = KeyValue.schema(keyType, valueType);
WriteFormatContext context =
new WriteFormatContext(
partition,
bucket,
fileRowType,
keyType,
KeyValue.schema(options.thinMode() ? RowType.of() : keyType, valueType),
fileFormat,
format2PathFactory,
options);
Expand All @@ -217,6 +232,7 @@ private static class WriteFormatContext {
private WriteFormatContext(
BinaryRow partition,
int bucket,
RowType keyType,
RowType rowType,
FileFormat defaultFormat,
Map<String, FileStorePathFactory> parentFactories,
Expand All @@ -236,7 +252,8 @@ private WriteFormatContext(
this.format2PathFactory = new HashMap<>();
this.format2WriterFactory = new HashMap<>();
SimpleColStatsCollector.Factory[] statsCollectorFactories =
StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
StatsCollectorFactories.createStatsFactories(
options, rowType.getFieldNames(), keyType.getFieldNames());
for (String format : parentFactories.keySet()) {
format2PathFactory.put(
format,
Expand Down
Loading
Loading