From 0c0b2cd64226e88692eed426ae20bafe1fdb755a Mon Sep 17 00:00:00 2001
From: wgcn <1026688210@qq.com>
Date: Sat, 25 May 2024 21:39:29 +0800
Subject: [PATCH] support parse filter sql
---
paimon-flink/paimon-flink-common/pom.xml | 6 +
.../SimpleSqlPredicateConvertor.java | 158 ++++++++++++
.../SimpleSqlPredicateConvertorTest.java | 228 ++++++++++++++++++
3 files changed, 392 insertions(+)
create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index 09ea2bb9c678d..22dd0b0e96c1e 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -77,6 +77,12 @@ under the License.
test
+
+ org.apache.flink
+ flink-table-planner_2.12
+ ${flink.version}
+ provided
+
org.apache.flink
flink-connector-test-utils
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
new file mode 100644
index 0000000000000..1cc5912ac79c0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.predicate;
+
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlBinaryOperator;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlPostfixOperator;
+import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+
+import static org.apache.calcite.avatica.util.Casing.UNCHANGED;
+
+/** convert sql to predicate. */
+public class SimpleSqlPredicateConvertor {
+
+ private final PredicateBuilder builder;
+ private final RowType rowType;
+
+ public SimpleSqlPredicateConvertor(RowType type) {
+ this.rowType = type;
+ this.builder = new PredicateBuilder(type);
+ }
+
+ public Predicate convertSqlToPredicate(String conditionSql) throws SqlParseException {
+ SqlParser parser =
+ SqlParser.create(conditionSql, SqlParser.config().withUnquotedCasing(UNCHANGED));
+ SqlNode sqlNode = parser.parseExpression();
+ return convert((SqlBasicCall) sqlNode);
+ }
+
+ public Predicate convert(SqlBasicCall sqlBasicCall) {
+ SqlOperator operator = sqlBasicCall.getOperator();
+ SqlKind kind = operator.getKind();
+ if (operator instanceof SqlBinaryOperator) {
+ List operandList = sqlBasicCall.getOperandList();
+ SqlNode left = operandList.get(0);
+ SqlNode right = operandList.get(1);
+ switch (kind) {
+ case OR:
+ return PredicateBuilder.or(
+ convert((SqlBasicCall) left), convert((SqlBasicCall) right));
+ case AND:
+ return PredicateBuilder.and(
+ convert((SqlBasicCall) left), convert((SqlBasicCall) right));
+ case EQUALS:
+ return visitBiFunction(left, right, builder::equal, builder::equal);
+ case NOT_EQUALS:
+ return visitBiFunction(left, right, builder::notEqual, builder::notEqual);
+ case LESS_THAN:
+ return visitBiFunction(left, right, builder::lessThan, builder::greaterThan);
+ case LESS_THAN_OR_EQUAL:
+ return visitBiFunction(
+ left, right, builder::lessOrEqual, builder::greaterOrEqual);
+ case GREATER_THAN:
+ return visitBiFunction(left, right, builder::greaterThan, builder::lessThan);
+ case GREATER_THAN_OR_EQUAL:
+ return visitBiFunction(
+ left, right, builder::greaterOrEqual, builder::lessOrEqual);
+ case IN:
+ {
+ int index = getfieldIndex(left.toString());
+ SqlNodeList Elementslist = (SqlNodeList) right;
+
+ List