Skip to content

Commit

Permalink
Step4: support read with delete map
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Feb 7, 2024
1 parent 0c461d1 commit ef16833
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,19 @@ public KeyValueFileStoreScan newScan() {

@Override
public KeyValueFileStoreRead newRead() {
IndexMaintainer.Factory<KeyValue, DeleteIndex> deleteMapFactory = null;
if (options.deleteMapEnabled()) {
deleteMapFactory = new DeleteMapIndexMaintainer.Factory(newIndexFileHandler());
}
return new KeyValueFileStoreRead(
schemaManager,
schemaId,
keyType,
valueType,
newKeyComparator(),
mfFactory,
newReaderFactoryBuilder());
newReaderFactoryBuilder(),
deleteMapFactory);
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.index.delete;

import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;

import javax.annotation.Nullable;

import java.io.IOException;

/** 1. */
public class ApplyDeleteIndexReader implements RecordReader<KeyValue> {

private final RecordReader<KeyValue> reader;

private final DeleteIndex deleteIndex;

public ApplyDeleteIndexReader(RecordReader<KeyValue> reader, DeleteIndex deleteIndex) {
this.reader = reader;
this.deleteIndex = deleteIndex;
}

@Nullable
@Override
public RecordIterator<KeyValue> readBatch() throws IOException {
RecordIterator<KeyValue> batch = reader.readBatch();

if (batch == null) {
return null;
}

return new RecordIterator<KeyValue>() {
@Override
public KeyValue next() throws IOException {
while (true) {
KeyValue kv = batch.next();
if (kv == null) {
return null;
}

if (!deleteIndex.isDeleted(kv.position())) {
return kv;
}
}
}

@Override
public void releaseBatch() {
batch.releaseBatch();
}
};
}

@Override
public void close() throws IOException {
reader.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.index.delete.ApplyDeleteIndexReader;
import org.apache.paimon.index.delete.DeleteIndex;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
Expand All @@ -42,6 +45,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
Expand All @@ -60,6 +65,7 @@ public class KeyValueFileReaderFactory {
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;
private final boolean deleteMapEnabled;
private final Function<String, Optional<DeleteIndex>> fileNameToDeleteIndex;

private KeyValueFileReaderFactory(
FileIO fileIO,
Expand All @@ -71,7 +77,8 @@ private KeyValueFileReaderFactory(
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition,
boolean deleteMapEnabled) {
boolean deleteMapEnabled,
Function<String, Optional<DeleteIndex>> fileNameToDeleteIndex) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -83,6 +90,7 @@ private KeyValueFileReaderFactory(
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.deleteMapEnabled = deleteMapEnabled;
this.fileNameToDeleteIndex = fileNameToDeleteIndex;
}

public RecordReader<KeyValue> createRecordReader(
Expand Down Expand Up @@ -117,18 +125,26 @@ private RecordReader<KeyValue> createRecordReader(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
return new KeyValueDataFileRecordReader(
fileIO,
bulkFormatMapping.getReaderFactory(),
pathFactory.toPath(fileName),
keyType,
valueType,
level,
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
deleteMapEnabled);

RecordReader<KeyValue> recordReader =
new KeyValueDataFileRecordReader(
fileIO,
bulkFormatMapping.getReaderFactory(),
pathFactory.toPath(fileName),
keyType,
valueType,
level,
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
deleteMapEnabled);

Optional<DeleteIndex> deleteIndexOptional = fileNameToDeleteIndex.apply(fileName);
if (deleteIndexOptional.isPresent()) {
recordReader = new ApplyDeleteIndexReader(recordReader, deleteIndexOptional.get());
}
return recordReader;
}

public static Builder builder(
Expand Down Expand Up @@ -231,18 +247,25 @@ public RowType projectedValueType() {
return projectedValueType;
}

public KeyValueFileReaderFactory build(BinaryRow partition, int bucket) {
return build(partition, bucket, true, Collections.emptyList());
public KeyValueFileReaderFactory build(
BinaryRow partition,
int bucket,
@Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer) {
return build(partition, bucket, true, Collections.emptyList(), deleteMapMaintainer);
}

public KeyValueFileReaderFactory build(
BinaryRow partition,
int bucket,
boolean projectKeys,
@Nullable List<Predicate> filters) {
@Nullable List<Predicate> filters,
@Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer) {
int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
RowType projectedKeyType = projectKeys ? this.projectedKeyType : keyType;

Function<String, Optional<DeleteIndex>> fileNameToDeleteIndex =
deleteMapMaintainer == null
? (fileName) -> Optional.empty()
: deleteMapMaintainer::indexOf;
return new KeyValueFileReaderFactory(
fileIO,
schemaManager,
Expand All @@ -254,7 +277,8 @@ public KeyValueFileReaderFactory build(
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
options.deleteMapEnabled());
options.deleteMapEnabled(),
fileNameToDeleteIndex);
}

private void applyProjection() {
Expand Down
Loading

0 comments on commit ef16833

Please sign in to comment.