Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support file index filter push down #4684

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Visit the predicate and extract the file index fallback predicate. */
public class FileIndexFilterFallbackPredicateVisitor
implements PredicateVisitor<Optional<Predicate>> {

private final Set<FieldRef> fields;

public FileIndexFilterFallbackPredicateVisitor(Set<FieldRef> fields) {
this.fields = fields;
}

@Override
public Optional<Predicate> visit(LeafPredicate predicate) {
if (fields.contains(predicate.fieldRef())) {
return Optional.empty();
}
return Optional.of(predicate);
}

@Override
public Optional<Predicate> visit(CompoundPredicate predicate) {
List<Predicate> converted = new ArrayList<>();
for (Predicate child : predicate.children()) {
child.visit(this).ifPresent(converted::add);
}
if (converted.isEmpty()) {
return Optional.empty();
}
if (converted.size() == 1) {
return Optional.of(converted.get(0));
}
return Optional.of(new CompoundPredicate(predicate.function(), converted));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.FunctionVisitor;

import java.util.List;

/** Visit the predicate and check if file index can be filter push down. */
public abstract class FileIndexFilterPushDownAnalyzer implements FunctionVisitor<Boolean> {

@Override
public Boolean visitIsNotNull(FieldRef fieldRef) {
return false;
}

@Override
public Boolean visitIsNull(FieldRef fieldRef) {
return false;
}

@Override
public Boolean visitStartsWith(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitEndsWith(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitContains(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitLessThan(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitNotEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) {
return false;
}

@Override
public Boolean visitIn(FieldRef fieldRef, List<Object> literals) {
return false;
}

@Override
public Boolean visitNotIn(FieldRef fieldRef, List<Object> literals) {
return false;
}

@Override
public Boolean visitAnd(List<Boolean> children) {
throw new UnsupportedOperationException("Should not invoke this");
}

@Override
public Boolean visitOr(List<Boolean> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Visit the predicate and check if index can push down the predicate. */
public class FileIndexFilterPushDownVisitor implements PredicateVisitor<Boolean> {

private final Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers;

public FileIndexFilterPushDownVisitor() {
this(Collections.emptyMap());
}

public FileIndexFilterPushDownVisitor(
Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers) {
this.analyzers = analyzers;
}

@Override
public Boolean visit(LeafPredicate predicate) {
List<FileIndexFilterPushDownAnalyzer> analyzers =
this.analyzers.getOrDefault(predicate.fieldName(), Collections.emptyList());
for (FileIndexFilterPushDownAnalyzer analyzer : analyzers) {
if (analyzer.visit(predicate)) {
return true;
}
}
return false;
}

@Override
public Boolean visit(CompoundPredicate predicate) {
for (Predicate child : predicate.children()) {
Boolean matched = child.visit(this);
if (!matched) {
return false;
}
}
return true;
}

@VisibleForTesting
public Map<String, List<FileIndexFilterPushDownAnalyzer>> getAnalyzers() {
return analyzers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -167,6 +173,45 @@ public Options getMapTopLevelOptions(String column, String indexType) {
+ indexType));
}

public FileIndexFilterPushDownVisitor createFilterPushDownPredicateVisitor(RowType rowType) {
Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers = new HashMap<>();
for (Map.Entry<Column, Map<String, Options>> entry : indexTypeOptions.entrySet()) {
Column column = entry.getKey();
for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
String key;
FileIndexFilterPushDownAnalyzer analyzer;
DataField field = rowType.getField(column.columnName);
Options options = typeEntry.getValue();
if (column.isNestedColumn) {
if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
throw new IllegalArgumentException(
"Column "
+ column.columnName
+ " is nested column, but is not map type. Only should map type yet.");
}
MapType mapType = (MapType) field.type();
Options mapTopLevelOptions =
getMapTopLevelOptions(column.columnName, typeEntry.getKey());
key = FileIndexCommon.toMapKey(column.columnName, column.nestedColumnName);
analyzer =
FileIndexer.create(
typeEntry.getKey(),
mapType.getValueType(),
new Options(
mapTopLevelOptions.toMap(), options.toMap()))
.createFilterPushDownAnalyzer();
} else {
key = column.columnName;
analyzer =
FileIndexer.create(typeEntry.getKey(), field.type(), options)
.createFilterPushDownAnalyzer();
}
analyzers.computeIfAbsent(key, k -> new ArrayList<>()).add(analyzer);
}
}
return new FileIndexFilterPushDownVisitor(analyzers);
}

public boolean isEmpty() {
return indexTypeOptions.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.FieldRef;

import java.util.Collections;
import java.util.Set;

/** File index result to decide whether filter a file. */
public interface FileIndexResult {

Expand Down Expand Up @@ -59,6 +64,10 @@ public FileIndexResult or(FileIndexResult fileIndexResult) {

boolean remain();

default Set<FieldRef> applyIndexes() {
return Collections.emptySet();
}

default FileIndexResult and(FileIndexResult fileIndexResult) {
if (fileIndexResult.remain()) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface FileIndexer {

FileIndexReader createReader(SeekableInputStream inputStream, int start, int length);

FileIndexFilterPushDownAnalyzer createFilterPushDownAnalyzer();

static FileIndexer create(String type, DataType dataType, Options options) {
FileIndexerFactory fileIndexerFactory = FileIndexerFactoryUtils.load(type);
return fileIndexerFactory.create(dataType, options);
Expand Down
Loading
Loading