Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] File index result should be able to process and/or. #3422

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,33 @@
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.types.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;

/** Utils to check secondary index (e.g. bloom filter) predicate. */
public class FileIndexPredicate implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(FileIndexPredicate.class);

private final FileIndexFormat.Reader reader;
private final Map<String, FileIndexFieldPredicate> fieldPredicates = new HashMap<>();

@Nullable private Path path;

public FileIndexPredicate(Path path, FileIO fileIO, RowType fileRowType) throws IOException {
this(fileIO.newInputStream(path), fileRowType);
this.path = path;
}

public FileIndexPredicate(byte[] serializedBytes, RowType fileRowType) {
Expand All @@ -67,22 +74,13 @@ public boolean testPredicate(@Nullable Predicate filePredicate) {

Set<String> requredFieldNames = getRequiredNames(filePredicate);

List<FileIndexFieldPredicate> testWorkers =
requredFieldNames.stream()
.map(
cname ->
fieldPredicates.computeIfAbsent(
cname,
k ->
new FileIndexFieldPredicate(
cname,
reader.readColumnIndex(cname))))
.collect(Collectors.toList());

for (FileIndexFieldPredicate testWorker : testWorkers) {
if (!testWorker.test(filePredicate)) {
return false;
}
Map<String, Collection<FileIndexReader>> indexReaders = new HashMap<>();
requredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
if (!new FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) {
LOG.debug(
"One file has been filtered: "
+ (path == null ? "in scan stage" : path.toString()));
return false;
}
return true;
}
Expand Down Expand Up @@ -114,55 +112,62 @@ public void close() throws IOException {
}

/** Predicate test worker. */
private static class FileIndexFieldPredicate implements PredicateVisitor<Boolean> {
private static class FileIndexPredicateTest implements PredicateVisitor<FileIndexResult> {

private final String columnName;
private final Collection<FileIndexReader> fileIndexReaders;
private final Map<String, Collection<FileIndexReader>> columnIndexReaders;

public FileIndexFieldPredicate(
String columnName, Collection<FileIndexReader> fileIndexReaders) {
this.columnName = columnName;
this.fileIndexReaders = fileIndexReaders;
public FileIndexPredicateTest(Map<String, Collection<FileIndexReader>> fileIndexReaders) {
this.columnIndexReaders = fileIndexReaders;
}

public Boolean test(Predicate predicate) {
public FileIndexResult test(Predicate predicate) {
return predicate.visit(this);
}

@Override
public Boolean visit(LeafPredicate predicate) {
if (columnName.equals(predicate.fieldName())) {
FieldRef fieldRef =
new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : fileIndexReaders) {
if (!predicate
.function()
.visit(fileIndexReader, fieldRef, predicate.literals())) {
return false;
}
public FileIndexResult visit(LeafPredicate predicate) {
FileIndexResult compoundResult = REMAIN;
FieldRef fieldRef =
new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : columnIndexReaders.get(predicate.fieldName())) {
compoundResult =
compoundResult.and(
predicate
.function()
.visit(fileIndexReader, fieldRef, predicate.literals()));

if (!compoundResult.remain()) {
return compoundResult;
}
}
return true;
return compoundResult;
}

@Override
public Boolean visit(CompoundPredicate predicate) {

public FileIndexResult visit(CompoundPredicate predicate) {
if (predicate.function() instanceof Or) {
FileIndexResult compoundResult = null;
for (Predicate predicate1 : predicate.children()) {
if (predicate1.visit(this)) {
return true;
}
compoundResult =
compoundResult == null
? predicate1.visit(this)
: compoundResult.or(predicate1.visit(this));
}
return false;
return compoundResult == null ? REMAIN : compoundResult;

} else {
FileIndexResult compoundResult = null;
for (Predicate predicate1 : predicate.children()) {
if (!predicate1.visit(this)) {
return false;
compoundResult =
compoundResult == null
? predicate1.visit(this)
: compoundResult.and(predicate1.visit(this));
// if not remain, no need to test anymore
if (!compoundResult.remain()) {
return compoundResult;
}
}
return true;
return compoundResult == null ? REMAIN : compoundResult;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,84 +23,90 @@

import java.util.List;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;

/**
* Read file index from serialized bytes. Return true, means we need to search this file, else means
* needn't.
*/
public abstract class FileIndexReader implements FunctionVisitor<Boolean> {
public abstract class FileIndexReader implements FunctionVisitor<FileIndexResult> {

@Override
public Boolean visitIsNotNull(FieldRef fieldRef) {
return true;
public FileIndexResult visitIsNotNull(FieldRef fieldRef) {
return REMAIN;
}

@Override
public Boolean visitIsNull(FieldRef fieldRef) {
return true;
public FileIndexResult visitIsNull(FieldRef fieldRef) {
return REMAIN;
}

@Override
public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitStartsWith(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
FileIndexResult fileIndexResult = null;
for (Object key : literals) {
if (visitEqual(fieldRef, key)) {
return true;
}
fileIndexResult =
fileIndexResult == null
? visitEqual(fieldRef, key)
: fileIndexResult.or(visitEqual(fieldRef, key));
}
return false;
return fileIndexResult;
}

@Override
public Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
FileIndexResult fileIndexResult = null;
for (Object key : literals) {
if (visitNotEqual(fieldRef, key)) {
return true;
}
fileIndexResult =
fileIndexResult == null
? visitNotEqual(fieldRef, key)
: fileIndexResult.or(visitNotEqual(fieldRef, key));
}
return false;
return fileIndexResult;
}

@Override
public Boolean visitAnd(List<Boolean> children) {
public FileIndexResult visitAnd(List<FileIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}

@Override
public Boolean visitOr(List<Boolean> children) {
public FileIndexResult visitOr(List<FileIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

/** File index result to decide whether filter a file. */
public interface FileIndexResult {

FileIndexResult REMAIN =
new FileIndexResult() {
@Override
public boolean remain() {
return true;
}

@Override
public FileIndexResult and(FileIndexResult fileIndexResult) {
return fileIndexResult;
}

@Override
public FileIndexResult or(FileIndexResult fileIndexResult) {
return this;
}
};

FileIndexResult SKIP =
new FileIndexResult() {
@Override
public boolean remain() {
return false;
}

@Override
public FileIndexResult and(FileIndexResult fileIndexResult) {
return this;
}

@Override
public FileIndexResult or(FileIndexResult fileIndexResult) {
return fileIndexResult;
}
};

boolean remain();

FileIndexResult and(FileIndexResult fileIndexResult);

FileIndexResult or(FileIndexResult fileIndexResult);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.fileindex.bloomfilter;

import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.options.Options;
Expand All @@ -29,6 +30,9 @@

import org.apache.hadoop.util.bloom.HashFunction;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
import static org.apache.paimon.fileindex.FileIndexResult.SKIP;

/**
* Bloom filter for file index.
*
Expand Down Expand Up @@ -118,8 +122,8 @@ public Reader(DataType type, byte[] serializedBytes) {
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object key) {
return key == null || filter.testHash(hashFunction.hash(key));
public FileIndexResult visitEqual(FieldRef fieldRef, Object key) {
return key == null || filter.testHash(hashFunction.hash(key)) ? REMAIN : SKIP;
}
}
}
Loading
Loading