Commit

fix comment
仟弋 committed Dec 10, 2024
1 parent df6c353 commit e019053
Showing 4 changed files with 245 additions and 81 deletions.
KeyValueDataFileWriter.java
@@ -32,7 +32,6 @@
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StatsCollectorFactories;
@@ -44,9 +43,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
@@ -58,13 +55,13 @@
* <p>NOTE: records given to the writer must be sorted because it does not compare the min max keys
* to produce {@link DataFileMeta}.
*/
public class KeyValueDataFileWriter
public abstract class KeyValueDataFileWriter
extends StatsCollectingSingleFileWriter<KeyValue, DataFileMeta> {

private static final Logger LOG = LoggerFactory.getLogger(KeyValueDataFileWriter.class);

private final RowType keyType;
private final RowType valueType;
protected final RowType keyType;
protected final RowType valueType;
private final long schemaId;
private final int level;

Expand All @@ -79,7 +76,6 @@ public class KeyValueDataFileWriter
private long minSeqNumber = Long.MAX_VALUE;
private long maxSeqNumber = Long.MIN_VALUE;
private long deleteRecordCount = 0;
private final StateAbstractor stateAbstractor;

public KeyValueDataFileWriter(
FileIO fileIO,
@@ -122,7 +118,6 @@ public KeyValueDataFileWriter(
this.dataFileIndexWriter =
DataFileIndexWriter.create(
fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions);
this.stateAbstractor = new StateAbstractor(keyType, valueType, options.thinMode());
}

@Override
@@ -173,8 +168,7 @@ public DataFileMeta result() throws IOException {
return null;
}

Pair<SimpleColStats[], SimpleColStats[]> keyValueStats =
stateAbstractor.fetchKeyValueStats(fieldStats());
Pair<SimpleColStats[], SimpleColStats[]> keyValueStats = fetchKeyValueStats(fieldStats());

SimpleStats keyStats = keyStatsConverter.toBinaryAllMode(keyValueStats.getKey());
Pair<List<String>, SimpleStats> valueStatsPair =
@@ -206,63 +200,13 @@ public DataFileMeta result() throws IOException {
valueStatsPair.getKey());
}

abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats);

@Override
public void close() throws IOException {
if (dataFileIndexWriter != null) {
dataFileIndexWriter.close();
}
super.close();
}

private static class StateAbstractor {
private final int numKeyFields;
private final int numValueFields;
// If keyStatMapping is not null, thin mode is on.
@Nullable private final int[] keyStatMapping;

public StateAbstractor(RowType keyType, RowType valueType, boolean thinMode) {
this.numKeyFields = keyType.getFieldCount();
this.numValueFields = valueType.getFieldCount();
Map<Integer, Integer> idToIndex = new HashMap<>();
for (int i = 0; i < valueType.getFieldCount(); i++) {
idToIndex.put(valueType.getFields().get(i).id(), i);
}
if (thinMode) {
this.keyStatMapping = new int[keyType.getFieldCount()];
for (int i = 0; i < keyType.getFieldCount(); i++) {
keyStatMapping[i] =
idToIndex.get(
keyType.getFields().get(i).id()
- SpecialFields.KEY_FIELD_ID_START);
}
} else {
this.keyStatMapping = null;
}
}

Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats) {
SimpleColStats[] keyStats = new SimpleColStats[numKeyFields];
SimpleColStats[] valFieldStats = new SimpleColStats[numValueFields];

// In thin mode there are no key stats in rowStats, so we only skip the
// _SEQUENCE_NUMBER_ and _ROW_KIND_ stats; hence the 'from' value is 2.
// Otherwise we also need to skip the key stats, so the 'from' value is numKeyFields + 2.
int valueFrom = thinMode() ? 2 : numKeyFields + 2;
System.arraycopy(rowStats, valueFrom, valFieldStats, 0, numValueFields);
if (thinMode()) {
// Thin mode is on, so map the value stats onto the key stats.
for (int i = 0; i < keyStatMapping.length; i++) {
keyStats[i] = valFieldStats[keyStatMapping[i]];
}
} else {
// Thin mode off, just copy stats from rowStats.
System.arraycopy(valFieldStats, 0, keyStats, 0, numKeyFields);
}
return Pair.of(keyStats, valFieldStats);
}

private boolean thinMode() {
return keyStatMapping != null;
}
}
}
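
Note on the stats layout (illustrative): the removed StateAbstractor comment above describes the per-row stats array the writer receives. In thin mode the row carries no key columns, so value stats start at offset 2, right after _SEQUENCE_NUMBER_ and _ROW_KIND_; otherwise they start at numKeyFields + 2. Below is a minimal, self-contained sketch of that split, using plain strings in place of SimpleColStats (the class and method names are illustrative, not Paimon API):

import java.util.Arrays;

/** Illustrative only: how key/value stats are sliced out of a flat per-row stats array. */
public class StatsLayoutSketch {

    /** Full layout: [ key fields | _SEQUENCE_NUMBER_ | _ROW_KIND_ | value fields ]. */
    static String[] valueStatsFullLayout(String[] rowStats, int numKeyFields, int numValueFields) {
        int from = numKeyFields + 2; // skip the key stats plus the two system columns
        return Arrays.copyOfRange(rowStats, from, from + numValueFields);
    }

    /** Thin layout: [ _SEQUENCE_NUMBER_ | _ROW_KIND_ | value fields ] (no key columns). */
    static String[] valueStatsThinLayout(String[] rowStats, int numValueFields) {
        int from = 2; // only the two system columns precede the value stats
        return Arrays.copyOfRange(rowStats, from, from + numValueFields);
    }

    public static void main(String[] args) {
        // One key field k, two value fields a and b.
        String[] full = {"k", "_SEQUENCE_NUMBER_", "_ROW_KIND_", "a", "b"};
        String[] thin = {"_SEQUENCE_NUMBER_", "_ROW_KIND_", "a", "b"};
        System.out.println(Arrays.toString(valueStatsFullLayout(full, 1, 2))); // [a, b]
        System.out.println(Arrays.toString(valueStatsThinLayout(thin, 2)));    // [a, b]
    }
}
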
KeyValueDataFileWriterImpl.java (new file)
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.function.Function;

/** Write data files containing {@link KeyValue}s. */
public class KeyValueDataFileWriterImpl extends KeyValueDataFileWriter {

public KeyValueDataFileWriterImpl(
FileIO fileIO,
FormatWriterFactory factory,
Path path,
Function<KeyValue, InternalRow> converter,
RowType keyType,
RowType valueType,
@Nullable SimpleStatsExtractor simpleStatsExtractor,
long schemaId,
int level,
String compression,
CoreOptions options,
FileSource fileSource,
FileIndexOptions fileIndexOptions) {
super(
fileIO,
factory,
path,
converter,
keyType,
valueType,
simpleStatsExtractor,
schemaId,
level,
compression,
options,
fileSource,
fileIndexOptions);
}

@Override
Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats) {
int numKeyFields = keyType.getFieldCount();
int numValueFields = valueType.getFieldCount();
return Pair.of(
Arrays.copyOfRange(rowStats, 0, numKeyFields),
Arrays.copyOfRange(rowStats, numKeyFields + 2, numKeyFields + 2 + numValueFields));
}
}
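
The thin-mode subclass KeyValueThinDataFileWriterImpl, referenced by the factory change below, lives in the fourth changed file, which is not expanded on this page. Based on the removed StateAbstractor logic, its key stats presumably have to be looked up among the value stats via SpecialFields.KEY_FIELD_ID_START, since thin rows carry no key columns. The following standalone helper sketches that mapping under that assumption; the class and method names are illustrative, not the committed code:

package org.apache.paimon.io;

import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: thin-mode stats extraction, mirroring the removed StateAbstractor logic. */
class ThinKeyValueStatsSketch {

    /** rowStats layout in thin mode: [ _SEQUENCE_NUMBER_ | _ROW_KIND_ | value fields ]. */
    static Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(
            RowType keyType, RowType valueType, SimpleColStats[] rowStats) {
        int numValueFields = valueType.getFieldCount();
        SimpleColStats[] valueStats = Arrays.copyOfRange(rowStats, 2, 2 + numValueFields);

        // Map each value field id to its position among the value stats.
        Map<Integer, Integer> idToIndex = new HashMap<>();
        for (int i = 0; i < numValueFields; i++) {
            idToIndex.put(valueType.getFields().get(i).id(), i);
        }

        // Key fields reuse value columns: key field id = value field id + KEY_FIELD_ID_START.
        SimpleColStats[] keyStats = new SimpleColStats[keyType.getFieldCount()];
        for (int i = 0; i < keyStats.length; i++) {
            int valueFieldId = keyType.getFields().get(i).id() - SpecialFields.KEY_FIELD_ID_START;
            keyStats[i] = valueStats[idToIndex.get(valueFieldId)];
        }
        return Pair.of(keyStats, valueStats);
    }
}
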
(third changed file)
@@ -34,7 +34,6 @@
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.StatsCollectorFactories;

import javax.annotation.Nullable;
@@ -109,24 +108,35 @@ public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWrite

private KeyValueDataFileWriter createDataFileWriter(
Path path, int level, FileSource fileSource) {
ObjectSerializer<KeyValue> kvSerializer =
options.thinMode()
? new KeyValueThinSerializer(keyType, valueType)
: new KeyValueSerializer(keyType, valueType);
return new KeyValueDataFileWriter(
fileIO,
formatContext.writerFactory(level),
path,
kvSerializer::toRow,
keyType,
valueType,
formatContext.extractor(level),
schemaId,
level,
formatContext.compression(level),
options,
fileSource,
fileIndexOptions);
return options.thinMode()
? new KeyValueThinDataFileWriterImpl(
fileIO,
formatContext.writerFactory(level),
path,
new KeyValueThinSerializer(keyType, valueType)::toRow,
keyType,
valueType,
formatContext.extractor(level),
schemaId,
level,
formatContext.compression(level),
options,
fileSource,
fileIndexOptions)
: new KeyValueDataFileWriterImpl(
fileIO,
formatContext.writerFactory(level),
path,
new KeyValueSerializer(keyType, valueType)::toRow,
keyType,
valueType,
formatContext.extractor(level),
schemaId,
level,
formatContext.compression(level),
options,
fileSource,
fileIndexOptions);
}

public void deleteFile(String filename, int level) {
(Diff for the fourth changed file not shown.)
