From e01905353d440411592eacfa1da2b964c51a93ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 17:11:51 +0800 Subject: [PATCH] fix comment --- .../paimon/io/KeyValueDataFileWriter.java | 68 +-------- .../paimon/io/KeyValueDataFileWriterImpl.java | 80 +++++++++++ .../paimon/io/KeyValueFileWriterFactory.java | 48 ++++--- .../io/KeyValueThinDataFileWriterImpl.java | 130 ++++++++++++++++++ 4 files changed, 245 insertions(+), 81 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 60b4885ab2a4..2e981ac507fb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -32,7 +32,6 @@ import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; -import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.StatsCollectorFactories; @@ -44,9 +43,7 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.function.Function; import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath; @@ -58,13 +55,13 @@ *

NOTE: records given to the writer must be sorted because it does not compare the min max keys * to produce {@link DataFileMeta}. */ -public class KeyValueDataFileWriter +public abstract class KeyValueDataFileWriter extends StatsCollectingSingleFileWriter { private static final Logger LOG = LoggerFactory.getLogger(KeyValueDataFileWriter.class); - private final RowType keyType; - private final RowType valueType; + protected final RowType keyType; + protected final RowType valueType; private final long schemaId; private final int level; @@ -79,7 +76,6 @@ public class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; - private final StateAbstractor stateAbstractor; public KeyValueDataFileWriter( FileIO fileIO, @@ -122,7 +118,6 @@ public KeyValueDataFileWriter( this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); - this.stateAbstractor = new StateAbstractor(keyType, valueType, options.thinMode()); } @Override @@ -173,8 +168,7 @@ public DataFileMeta result() throws IOException { return null; } - Pair keyValueStats = - stateAbstractor.fetchKeyValueStats(fieldStats()); + Pair keyValueStats = fetchKeyValueStats(fieldStats()); SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey()); Pair, SimpleStats> valueStatsPair = @@ -206,6 +200,8 @@ public DataFileMeta result() throws IOException { valueStatsPair.getKey()); } + abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); + @Override public void close() throws IOException { if (dataFileIndexWriter != null) { @@ -213,56 +209,4 @@ public void close() throws IOException { } super.close(); } - - private static class StateAbstractor { - private final int numKeyFields; - private final int numValueFields; - // if keyStatMapping is not null, means thin mode on. 
- @Nullable private final int[] keyStatMapping; - - public StateAbstractor(RowType keyType, RowType valueType, boolean thinMode) { - this.numKeyFields = keyType.getFieldCount(); - this.numValueFields = valueType.getFieldCount(); - Map idToIndex = new HashMap<>(); - for (int i = 0; i < valueType.getFieldCount(); i++) { - idToIndex.put(valueType.getFields().get(i).id(), i); - } - if (thinMode) { - this.keyStatMapping = new int[keyType.getFieldCount()]; - for (int i = 0; i < keyType.getFieldCount(); i++) { - keyStatMapping[i] = - idToIndex.get( - keyType.getFields().get(i).id() - - SpecialFields.KEY_FIELD_ID_START); - } - } else { - this.keyStatMapping = null; - } - } - - Pair fetchKeyValueStats(SimpleColStats[] rowStats) { - SimpleColStats[] keyStats = new SimpleColStats[numKeyFields]; - SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields]; - - // If thin mode only, there is no key stats in rowStats, so we only jump - // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value is 2. - // Otherwise, we need to jump key stats, so the 'from' value is numKeyFields + 2. - int valueFrom = thinMode() ? 2 : numKeyFields + 2; - System.arraycopy(rowStats, valueFrom, valFieldStats, 0, numValueFields); - if (thinMode()) { - // Thin mode on, so need to map value stats to key stats. - for (int i = 0; i < keyStatMapping.length; i++) { - keyStats[i] = valFieldStats[keyStatMapping[i]]; - } - } else { - // Thin mode off, just copy stats from rowStats. 
- System.arraycopy(valFieldStats, 0, keyStats, 0, numKeyFields); - } - return Pair.of(keyStats, valFieldStats); - } - - private boolean thinMode() { - return keyStatMapping != null; - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java new file mode 100644 index 000000000000..fd69377661f8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.function.Function;
+
+/** Writes {@link KeyValue}s to data files that store key fields alongside the value fields. */
+public class KeyValueDataFileWriterImpl extends KeyValueDataFileWriter {
+
+    public KeyValueDataFileWriterImpl(
+            FileIO fileIO,
+            FormatWriterFactory factory,
+            Path path,
+            Function<KeyValue, InternalRow> converter,
+            RowType keyType,
+            RowType valueType,
+            @Nullable SimpleStatsExtractor simpleStatsExtractor,
+            long schemaId,
+            int level,
+            String compression,
+            CoreOptions options,
+            FileSource fileSource,
+            FileIndexOptions fileIndexOptions) {
+        super(
+                fileIO,
+                factory,
+                path,
+                converter,
+                keyType,
+                valueType,
+                simpleStatsExtractor,
+                schemaId,
+                level,
+                compression,
+                options,
+                fileSource,
+                fileIndexOptions);
+    }
+
+    /** Splits {@code rowStats}, laid out as keys, _SEQUENCE_NUMBER_, _ROW_KIND_, values. */
+    @Override
+    Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats) {
+        int valueFrom = keyType.getFieldCount() + 2; // copyOfRange takes an end index, not a length
+        return Pair.of(
+                Arrays.copyOfRange(rowStats, 0, keyType.getFieldCount()),
+                Arrays.copyOfRange(rowStats, valueFrom, valueFrom + valueType.getFieldCount()));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 87f53ab5b013..c1996936c8a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -34,7 +34,6 @@ import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.ObjectSerializer; import org.apache.paimon.utils.StatsCollectorFactories; import javax.annotation.Nullable; @@ -109,24 +108,35 @@ public RollingFileWriter createRollingChangelogFileWrite private KeyValueDataFileWriter createDataFileWriter( Path path, int level, FileSource fileSource) { - ObjectSerializer kvSerializer = - options.thinMode() - ? new KeyValueThinSerializer(keyType, valueType) - : new KeyValueSerializer(keyType, valueType); - return new KeyValueDataFileWriter( - fileIO, - formatContext.writerFactory(level), - path, - kvSerializer::toRow, - keyType, - valueType, - formatContext.extractor(level), - schemaId, - level, - formatContext.compression(level), - options, - fileSource, - fileIndexOptions); + return options.thinMode() + ? 
new KeyValueThinDataFileWriterImpl( + fileIO, + formatContext.writerFactory(level), + path, + new KeyValueThinSerializer(keyType, valueType)::toRow, + keyType, + valueType, + formatContext.extractor(level), + schemaId, + level, + formatContext.compression(level), + options, + fileSource, + fileIndexOptions) + : new KeyValueDataFileWriterImpl( + fileIO, + formatContext.writerFactory(level), + path, + new KeyValueSerializer(keyType, valueType)::toRow, + keyType, + valueType, + formatContext.extractor(level), + schemaId, + level, + formatContext.compression(level), + options, + fileSource, + fileIndexOptions); } public void deleteFile(String filename, int level) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java new file mode 100644 index 000000000000..1b93645a7b5e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Implementation of {@link KeyValueDataFileWriter} for thin data files. Thin data files only
+ * contain _SEQUENCE_NUMBER_, _ROW_KIND_ and value fields.
+ */
+public class KeyValueThinDataFileWriterImpl extends KeyValueDataFileWriter {
+
+    /** Index in {@code valueType} of the field supplying the stats of each key field. */
+    private final int[] keyStatMapping;
+
+    /**
+     * Constructs a KeyValueThinDataFileWriterImpl.
+     *
+     * @param fileIO The file IO interface.
+     * @param factory The format writer factory.
+     * @param path The path to the file.
+     * @param converter The function to convert KeyValue to InternalRow.
+     * @param keyType The row type of the key.
+     * @param valueType The row type of the value.
+     * @param simpleStatsExtractor The simple stats extractor, can be null.
+     * @param schemaId The schema ID.
+     * @param level The level.
+     * @param compression The compression type.
+     * @param options The core options.
+     * @param fileSource The file source.
+     * @param fileIndexOptions The file index options.
+     */
+    public KeyValueThinDataFileWriterImpl(
+            FileIO fileIO,
+            FormatWriterFactory factory,
+            Path path,
+            Function<KeyValue, InternalRow> converter,
+            RowType keyType,
+            RowType valueType,
+            @Nullable SimpleStatsExtractor simpleStatsExtractor,
+            long schemaId,
+            int level,
+            String compression,
+            CoreOptions options,
+            FileSource fileSource,
+            FileIndexOptions fileIndexOptions) {
+        super(
+                fileIO,
+                factory,
+                path,
+                converter,
+                keyType,
+                valueType,
+                simpleStatsExtractor,
+                schemaId,
+                level,
+                compression,
+                options,
+                fileSource,
+                fileIndexOptions);
+        Map<Integer, Integer> idToIndex = new HashMap<>();
+        for (int i = 0; i < valueType.getFieldCount(); i++) {
+            idToIndex.put(valueType.getFields().get(i).id(), i);
+        }
+        this.keyStatMapping = new int[keyType.getFieldCount()];
+        for (int i = 0; i < keyType.getFieldCount(); i++) {
+            int valueFieldId = keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START;
+            keyStatMapping[i] = idToIndex.get(valueFieldId);
+        }
+    }
+
+    /**
+     * Splits the per-column stats of a thin row into key stats and value stats.
+     *
+     * @param rowStats Stats of _SEQUENCE_NUMBER_, _ROW_KIND_ and then the value fields.
+     * @return A pair of (key stats, value stats); key stats are picked from the value stats.
+     */
+    @Override
+    Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats) {
+        int numKeyFields = keyType.getFieldCount();
+        int numValueFields = valueType.getFieldCount();
+
+        SimpleColStats[] keyStats = new SimpleColStats[numKeyFields];
+        SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields];
+
+        // Thin files store no key columns, so rowStats holds only _SEQUENCE_NUMBER_,
+        // _ROW_KIND_ and the value fields; skip the first two to reach the value stats.
+        System.arraycopy(rowStats, 2, valFieldStats, 0, numValueFields);
+        // Key stats are not collected separately; take them from the mapped value stats.
+        for (int i = 0; i < keyStatMapping.length; i++) {
+            keyStats[i] = valFieldStats[keyStatMapping[i]];
+        }
+
+        return Pair.of(keyStats, valFieldStats);
+    }
+}