Skip to content

Commit

Permalink
[core] support file index filter push down
Browse files Browse the repository at this point in the history
  • Loading branch information
Tan-JiaLiang committed Dec 11, 2024
1 parent 7400979 commit 7766f69
Show file tree
Hide file tree
Showing 38 changed files with 1,102 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Visit the predicate and extract the file index fallback predicate. */
public class FileIndexFilterFallbackPredicateVisitor
implements PredicateVisitor<Optional<Predicate>> {

private final Set<FieldRef> fields;

public FileIndexFilterFallbackPredicateVisitor(Set<FieldRef> fields) {
this.fields = fields;
}

@Override
public Optional<Predicate> visit(LeafPredicate predicate) {
if (fields.contains(predicate.fieldRef())) {
return Optional.empty();
}
return Optional.of(predicate);
}

@Override
public Optional<Predicate> visit(CompoundPredicate predicate) {
List<Predicate> converted = new ArrayList<>();
for (Predicate child : predicate.children()) {
child.visit(this).ifPresent(converted::add);
}
if (converted.isEmpty()) {
return Optional.empty();
}
if (converted.size() == 1) {
return Optional.of(converted.get(0));
}
return Optional.of(new CompoundPredicate(predicate.function(), converted));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.FunctionVisitor;

import java.util.List;

/** Visit the predicate and check if file index can be filter push down. */
public abstract class FileIndexFilterPushDownAnalyzer implements FunctionVisitor<Boolean> {

@Override
public Boolean visitIsNotNull(FieldRef fieldRef) {
return false;
}

@Override
public Boolean visitIsNull(FieldRef fieldRef) {
return false;
}

@Override
public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitEndsWith(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitContains(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
return false;
}

@Override
public Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
return false;
}

@Override
public Boolean visitAnd(List<Boolean> children) {
throw new UnsupportedOperationException("Should not invoke this");
}

@Override
public Boolean visitOr(List<Boolean> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Visit the predicate and check if index can push down the predicate. */
public class FileIndexFilterPushDownVisitor implements PredicateVisitor<Boolean> {

private final Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers;

public FileIndexFilterPushDownVisitor() {
this(Collections.emptyMap());
}

public FileIndexFilterPushDownVisitor(
Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers) {
this.analyzers = analyzers;
}

@Override
public Boolean visit(LeafPredicate predicate) {
List<FileIndexFilterPushDownAnalyzer> analyzers =
this.analyzers.getOrDefault(predicate.fieldName(), Collections.emptyList());
for (FileIndexFilterPushDownAnalyzer analyzer : analyzers) {
if (analyzer.visit(predicate)) {
return true;
}
}
return false;
}

@Override
public Boolean visit(CompoundPredicate predicate) {
for (Predicate child : predicate.children()) {
Boolean matched = child.visit(this);
if (!matched) {
return false;
}
}
return true;
}

@VisibleForTesting
public Map<String, List<FileIndexFilterPushDownAnalyzer>> getAnalyzers() {
return analyzers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -167,6 +173,45 @@ public Options getMapTopLevelOptions(String column, String indexType) {
+ indexType));
}

public FileIndexFilterPushDownVisitor createFilterPushDownPredicateVisitor(RowType rowType) {
Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers = new HashMap<>();
for (Map.Entry<Column, Map<String, Options>> entry : indexTypeOptions.entrySet()) {
Column column = entry.getKey();
for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
String key;
FileIndexFilterPushDownAnalyzer analyzer;
DataField field = rowType.getField(column.columnName);
Options options = typeEntry.getValue();
if (column.isNestedColumn) {
if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
throw new IllegalArgumentException(
"Column "
+ column.columnName
+ " is nested column, but is not map type. Only should map type yet.");
}
MapType mapType = (MapType) field.type();
Options mapTopLevelOptions =
getMapTopLevelOptions(column.columnName, typeEntry.getKey());
key = FileIndexCommon.toMapKey(column.columnName, column.nestedColumnName);
analyzer =
FileIndexer.create(
typeEntry.getKey(),
mapType.getValueType(),
new Options(
mapTopLevelOptions.toMap(), options.toMap()))
.createFilterPushDownAnalyzer();
} else {
key = column.columnName;
analyzer =
FileIndexer.create(typeEntry.getKey(), field.type(), options)
.createFilterPushDownAnalyzer();
}
analyzers.computeIfAbsent(key, k -> new ArrayList<>()).add(analyzer);
}
}
return new FileIndexFilterPushDownVisitor(analyzers);
}

public boolean isEmpty() {
return indexTypeOptions.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.FieldRef;

import java.util.Collections;
import java.util.Set;

/** File index result to decide whether filter a file. */
public interface FileIndexResult {

Expand Down Expand Up @@ -59,6 +64,10 @@ public FileIndexResult or(FileIndexResult fileIndexResult) {

boolean remain();

default Set<FieldRef> applyIndexes() {
return Collections.emptySet();
}

default FileIndexResult and(FileIndexResult fileIndexResult) {
if (fileIndexResult.remain()) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface FileIndexer {

FileIndexReader createReader(SeekableInputStream inputStream, int start, int length);

FileIndexFilterPushDownAnalyzer createFilterPushDownAnalyzer();

static FileIndexer create(String type, DataType dataType, Options options) {
FileIndexerFactory fileIndexerFactory = FileIndexerFactoryUtils.load(type);
return fileIndexerFactory.create(dataType, options);
Expand Down
Loading

0 comments on commit 7766f69

Please sign in to comment.