Skip to content

Commit

Permalink
[core] File index result should be able to process and/or
Browse files Browse the repository at this point in the history
  • Loading branch information
yejunhao committed May 29, 2024
1 parent fd6a287 commit b23b886
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,34 @@
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.types.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
import static org.apache.paimon.fileindex.FileIndexResult.SKIP;

/** Utils to check secondary index (e.g. bloom filter) predicate. */
public class FileIndexPredicate implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(FileIndexPredicate.class);

private final FileIndexFormat.Reader reader;
private final Map<String, FileIndexFieldPredicate> fieldPredicates = new HashMap<>();

@Nullable private Path path;

public FileIndexPredicate(Path path, FileIO fileIO, RowType fileRowType) throws IOException {
this(fileIO.newInputStream(path), fileRowType);
this.path = path;
}

public FileIndexPredicate(byte[] serializedBytes, RowType fileRowType) {
Expand All @@ -67,22 +75,13 @@ public boolean testPredicate(@Nullable Predicate filePredicate) {

Set<String> requredFieldNames = getRequiredNames(filePredicate);

List<FileIndexFieldPredicate> testWorkers =
requredFieldNames.stream()
.map(
cname ->
fieldPredicates.computeIfAbsent(
cname,
k ->
new FileIndexFieldPredicate(
cname,
reader.readColumnIndex(cname))))
.collect(Collectors.toList());

for (FileIndexFieldPredicate testWorker : testWorkers) {
if (!testWorker.test(filePredicate)) {
return false;
}
Map<String, Collection<FileIndexReader>> indexReaders = new HashMap<>();
requredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
if (!new FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) {
LOG.debug(
"One file has been filtered: "
+ (path == null ? "in scan stage" : path.toString()));
return false;
}
return true;
}
Expand Down Expand Up @@ -114,55 +113,55 @@ public void close() throws IOException {
}

/** Predicate test worker. */
private static class FileIndexFieldPredicate implements PredicateVisitor<Boolean> {
private static class FileIndexPredicateTest implements PredicateVisitor<FileIndexResult> {

private final String columnName;
private final Collection<FileIndexReader> fileIndexReaders;
private final Map<String, Collection<FileIndexReader>> columnIndexReaders;

public FileIndexFieldPredicate(
String columnName, Collection<FileIndexReader> fileIndexReaders) {
this.columnName = columnName;
this.fileIndexReaders = fileIndexReaders;
public FileIndexPredicateTest(Map<String, Collection<FileIndexReader>> fileIndexReaders) {
this.columnIndexReaders = fileIndexReaders;
}

public Boolean test(Predicate predicate) {
public FileIndexResult test(Predicate predicate) {
return predicate.visit(this);
}

@Override
public Boolean visit(LeafPredicate predicate) {
if (columnName.equals(predicate.fieldName())) {
FieldRef fieldRef =
new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : fileIndexReaders) {
if (!predicate
.function()
.visit(fileIndexReader, fieldRef, predicate.literals())) {
return false;
}
public FileIndexResult visit(LeafPredicate predicate) {
FileIndexResult compoundResult = REMAIN;
FieldRef fieldRef =
new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : columnIndexReaders.get(predicate.fieldName())) {
compoundResult =
compoundResult.and(
predicate
.function()
.visit(fileIndexReader, fieldRef, predicate.literals()));

if (!compoundResult.remain()) {
return compoundResult;
}
}
return true;
return compoundResult;
}

@Override
public Boolean visit(CompoundPredicate predicate) {

public FileIndexResult visit(CompoundPredicate predicate) {
if (predicate.function() instanceof Or) {
FileIndexResult compoundResult = SKIP;
for (Predicate predicate1 : predicate.children()) {
if (predicate1.visit(this)) {
return true;
}
compoundResult = compoundResult.or(predicate1.visit(this));
}
return false;
return compoundResult;

} else {
FileIndexResult compundResult = REMAIN;
for (Predicate predicate1 : predicate.children()) {
if (!predicate1.visit(this)) {
return false;
compundResult = compundResult.and(predicate1.visit(this));
if (!compundResult.remain()) {
return compundResult;
}
}
return true;
return compundResult;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,84 +23,90 @@

import java.util.List;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;

/**
* Read file index from serialized bytes. Each visit method returns a {@link FileIndexResult}: when
* {@link FileIndexResult#remain()} is true the file still needs to be searched, otherwise it can
* be skipped.
*/
public abstract class FileIndexReader implements FunctionVisitor<Boolean> {
public abstract class FileIndexReader implements FunctionVisitor<FileIndexResult> {

@Override
public Boolean visitIsNotNull(FieldRef fieldRef) {
return true;
public FileIndexResult visitIsNotNull(FieldRef fieldRef) {
return REMAIN;
}

@Override
public Boolean visitIsNull(FieldRef fieldRef) {
return true;
public FileIndexResult visitIsNull(FieldRef fieldRef) {
return REMAIN;
}

@Override
public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitStartsWith(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitEqual(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
return true;
public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
return REMAIN;
}

@Override
public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
FileIndexResult fileIndexResult = null;
for (Object key : literals) {
if (visitEqual(fieldRef, key)) {
return true;
}
fileIndexResult =
fileIndexResult == null
? visitEqual(fieldRef, key)
: fileIndexResult.or(visitEqual(fieldRef, key));
}
return false;
return fileIndexResult;
}

@Override
public Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
FileIndexResult fileIndexResult = null;
for (Object key : literals) {
if (visitNotEqual(fieldRef, key)) {
return true;
}
fileIndexResult =
fileIndexResult == null
? visitNotEqual(fieldRef, key)
: fileIndexResult.or(visitNotEqual(fieldRef, key));
}
return false;
return fileIndexResult;
}

@Override
public Boolean visitAnd(List<Boolean> children) {
public FileIndexResult visitAnd(List<FileIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}

@Override
public Boolean visitOr(List<Boolean> children) {
public FileIndexResult visitOr(List<FileIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

/**
 * Outcome of testing a predicate against a file index, used to decide whether a file is filtered.
 *
 * <p>Results are combined with {@code and} / {@code or}. The two built-in constants behave like
 * the boolean values {@code true} ({@link #REMAIN}) and {@code false} ({@link #SKIP}): each one
 * either absorbs the other operand or hands it back unchanged.
 */
public interface FileIndexResult {

    /** Returns {@code true} if the file is kept, {@code false} if it is filtered out. */
    boolean remain();

    /** Combines this result with another one using logical AND. */
    FileIndexResult and(FileIndexResult fileIndexResult);

    /** Combines this result with another one using logical OR. */
    FileIndexResult or(FileIndexResult fileIndexResult);

    /** Constant result that keeps the file: identity for {@code and}, absorbing for {@code or}. */
    FileIndexResult REMAIN =
            new FileIndexResult() {
                @Override
                public boolean remain() {
                    return true;
                }

                @Override
                public FileIndexResult and(FileIndexResult other) {
                    // "true AND x" is decided entirely by x.
                    return other;
                }

                @Override
                public FileIndexResult or(FileIndexResult other) {
                    // "true OR x" is always this constant.
                    return this;
                }
            };

    /** Constant result that drops the file: absorbing for {@code and}, identity for {@code or}. */
    FileIndexResult SKIP =
            new FileIndexResult() {
                @Override
                public boolean remain() {
                    return false;
                }

                @Override
                public FileIndexResult and(FileIndexResult other) {
                    // "false AND x" is always this constant.
                    return this;
                }

                @Override
                public FileIndexResult or(FileIndexResult other) {
                    // "false OR x" is decided entirely by x.
                    return other;
                }
            };
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.fileindex.bloomfilter;

import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.options.Options;
Expand All @@ -29,6 +30,9 @@

import org.apache.hadoop.util.bloom.HashFunction;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
import static org.apache.paimon.fileindex.FileIndexResult.SKIP;

/**
* Bloom filter for file index.
*
Expand Down Expand Up @@ -118,8 +122,8 @@ public Reader(DataType type, byte[] serializedBytes) {
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object key) {
return key == null || filter.testHash(hashFunction.hash(key));
public FileIndexResult visitEqual(FieldRef fieldRef, Object key) {
return key == null || filter.testHash(hashFunction.hash(key)) ? REMAIN : SKIP;
}
}
}
Loading

0 comments on commit b23b886

Please sign in to comment.