Skip to content

Commit

Permalink
v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Jan 15, 2025
1 parent 107dfa0 commit 5aca89b
Show file tree
Hide file tree
Showing 9 changed files with 562 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Concrete subclass of [[SparkV2FilterConverterTestBase]]; it declares no members of its own. */
class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Concrete subclass of [[SparkV2FilterConverterTestBase]]; it declares no members of its own. */
class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Concrete subclass of [[SparkV2FilterConverterTestBase]]; it declares no members of its own. */
class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

/** Concrete subclass of [[SparkV2FilterConverterTestBase]]; it declares no members of its own. */
class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ public Predicate convert(Filter filter) {
return builder.equal(index, literal);
} else if (filter instanceof EqualNullSafe) {
EqualNullSafe eq = (EqualNullSafe) filter;
int index = fieldIndex(eq.attribute());
if (eq.value() == null) {
return builder.isNull(fieldIndex(eq.attribute()));
return builder.isNull(index);
} else {
int index = fieldIndex(eq.attribute());
Object literal = convertLiteral(index, eq.value());
return builder.equal(index, literal);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark

import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.types.{DataTypeRoot, DecimalType, RowType}
import org.apache.paimon.types.DataTypeRoot._

import org.apache.spark.sql.connector.expressions.{Literal, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate}

import scala.collection.JavaConverters._

/**
 * Conversion from [[SparkPredicate]] to [[Predicate]].
 *
 * Only predicates that relate one top-level field of `rowType` to literal values are handled,
 * together with `AND` / `OR` / `NOT` combinations of them. Everything else is rejected with an
 * [[UnsupportedOperationException]].
 *
 * @param rowType
 *   row type used to resolve field names to indexes and to choose the literal conversion
 */
case class SparkV2FilterConverter(rowType: RowType) {

  import org.apache.paimon.spark.SparkV2FilterConverter._

  val builder = new PredicateBuilder(rowType)

  /**
   * Converts `sparkPredicate` into the equivalent Paimon [[Predicate]].
   *
   * @param sparkPredicate
   *   the Spark V2 predicate to convert; dispatched on `sparkPredicate.name()`
   * @return
   *   the converted predicate
   * @throws java.lang.UnsupportedOperationException
   *   if the predicate name is not handled, if its operands are not a field reference combined
   *   with literal(s), if the field cannot be resolved in `rowType`, if the literal's target
   *   type is not handled, or if a `NOT` child cannot be negated
   */
  def convert(sparkPredicate: SparkPredicate): Predicate = {
    sparkPredicate.name() match {
      case EQUAL_TO =>
        // TODO deal with isNaN
        // Equality is symmetric, so both operand orders map to the same builder call.
        convertComparison(sparkPredicate)(builder.equal(_, _), builder.equal(_, _))

      case EQUAL_NULL_SAFE =>
        sparkPredicate match {
          case BinaryPredicate(fieldName, literal, _) =>
            val index = fieldIndex(fieldName)
            if (literal == null) {
              builder.isNull(index)
            } else {
              builder.equal(index, convertLiteral(index, literal))
            }
          case _ => throw unsupported(sparkPredicate)
        }

      // For the ordering comparisons the operand order matters: `5 > col` means `col < 5`,
      // so the second function is the mirrored operator used when the literal comes first.
      case GREATER_THAN =>
        convertComparison(sparkPredicate)(builder.greaterThan(_, _), builder.lessThan(_, _))

      case GREATER_THAN_OR_EQUAL =>
        convertComparison(sparkPredicate)(builder.greaterOrEqual(_, _), builder.lessOrEqual(_, _))

      case LESS_THAN =>
        convertComparison(sparkPredicate)(builder.lessThan(_, _), builder.greaterThan(_, _))

      case LESS_THAN_OR_EQUAL =>
        convertComparison(sparkPredicate)(builder.lessOrEqual(_, _), builder.greaterOrEqual(_, _))

      case IN =>
        sparkPredicate match {
          case MultiPredicate(fieldName, literals) =>
            val index = fieldIndex(fieldName)
            builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava)
          case _ => throw unsupported(sparkPredicate)
        }

      case IS_NULL =>
        sparkPredicate match {
          case UnaryPredicate(fieldName) => builder.isNull(fieldIndex(fieldName))
          case _ => throw unsupported(sparkPredicate)
        }

      case IS_NOT_NULL =>
        sparkPredicate match {
          case UnaryPredicate(fieldName) => builder.isNotNull(fieldIndex(fieldName))
          case _ => throw unsupported(sparkPredicate)
        }

      case AND =>
        val and = sparkPredicate.asInstanceOf[And]
        PredicateBuilder.and(convert(and.left()), convert(and.right()))

      case OR =>
        val or = sparkPredicate.asInstanceOf[Or]
        PredicateBuilder.or(convert(or.left()), convert(or.right()))

      case NOT =>
        val not = sparkPredicate.asInstanceOf[Not]
        val negate = convert(not.child()).negate()
        if (negate.isPresent) {
          negate.get()
        } else {
          throw unsupported(sparkPredicate)
        }

      case STRING_START_WITH =>
        convertStringMatch(sparkPredicate)(builder.startsWith(_, _))

      case STRING_END_WITH =>
        convertStringMatch(sparkPredicate)(builder.endsWith(_, _))

      case STRING_CONTAINS =>
        convertStringMatch(sparkPredicate)(builder.contains(_, _))

      // TODO: AlwaysTrue, AlwaysFalse
      case _ => throw unsupported(sparkPredicate)
    }
  }

  /** Builds the exception thrown for every predicate this converter cannot translate. */
  private def unsupported(sparkPredicate: SparkPredicate): UnsupportedOperationException =
    new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")

  /**
   * Converts a field-versus-literal comparison, honouring the operand order.
   *
   * @param fieldFirst
   *   builder call used when the predicate is written as `field <op> literal`
   * @param literalFirst
   *   builder call used when the predicate is written as `literal <op> field`
   */
  private def convertComparison(sparkPredicate: SparkPredicate)(
      fieldFirst: (Int, AnyRef) => Predicate,
      literalFirst: (Int, AnyRef) => Predicate): Predicate = {
    sparkPredicate match {
      case BinaryPredicate(fieldName, literal, isLiteralFirst) =>
        val index = fieldIndex(fieldName)
        val build = if (isLiteralFirst) literalFirst else fieldFirst
        build(index, convertLiteral(index, literal))
      case _ => throw unsupported(sparkPredicate)
    }
  }

  /**
   * Converts a string-matching predicate. Only the `field, literal` operand order is accepted:
   * with the literal first the predicate would test the literal against the field's value,
   * which none of the builder calls used here can express.
   */
  private def convertStringMatch(sparkPredicate: SparkPredicate)(
      build: (Int, AnyRef) => Predicate): Predicate = {
    sparkPredicate match {
      case BinaryPredicate(fieldName, literal, false) =>
        val index = fieldIndex(fieldName)
        build(index, convertLiteral(index, literal))
      case _ => throw unsupported(sparkPredicate)
    }
  }

  /** Extracts the field name of a predicate whose only child is a field reference. */
  private object UnaryPredicate {
    def unapply(sparkPredicate: SparkPredicate): Option[String] = {
      sparkPredicate.children() match {
        case Array(n: NamedReference) => Some(toFieldName(n))
        case _ => None
      }
    }
  }

  /**
   * Extracts `(fieldName, literalValue, literalFirst)` from a predicate with exactly two
   * children, one field reference and one literal. `literalFirst` is true when the literal is
   * the first child.
   */
  private object BinaryPredicate {
    def unapply(sparkPredicate: SparkPredicate): Option[(String, Any, Boolean)] = {
      sparkPredicate.children() match {
        case Array(l: NamedReference, r: Literal[_]) => Some((toFieldName(l), r.value, false))
        case Array(l: Literal[_], r: NamedReference) => Some((toFieldName(r), l.value, true))
        case _ => None
      }
    }
  }

  /**
   * Extracts `(fieldName, literalValues)` from a predicate whose first child is a field
   * reference and whose remaining children (at least one) are all literals.
   */
  private object MultiPredicate {
    def unapply(sparkPredicate: SparkPredicate): Option[(String, Array[Any])] = {
      sparkPredicate.children() match {
        case Array(first: NamedReference, rest @ _*)
            if rest.nonEmpty && rest.forall(_.isInstanceOf[Literal[_]]) =>
          Some((toFieldName(first), rest.map(_.asInstanceOf[Literal[_]].value).toArray))
        case _ => None
      }
    }
  }

  /**
   * Resolves `fieldName` to its index in `rowType`.
   *
   * @throws java.lang.UnsupportedOperationException
   *   if `rowType` reports index -1 for the name
   */
  private def fieldIndex(fieldName: String): Int = {
    val index = rowType.getFieldIndex(fieldName)
    // TODO: support nested field
    if (index == -1) {
      throw new UnsupportedOperationException(s"Nested field '$fieldName' is unsupported.")
    }
    index
  }

  /**
   * Converts a Spark literal value into the object passed to [[PredicateBuilder]] for the field
   * at `index`. A null value is passed through as null.
   *
   * @throws java.lang.UnsupportedOperationException
   *   if the field's type root is not one of the handled cases
   */
  private def convertLiteral(index: Int, value: Any): AnyRef = {
    if (value == null) {
      null
    } else {
      val dataType = rowType.getTypeAt(index)
      dataType.getTypeRoot match {
        case BOOLEAN | BIGINT | DOUBLE | TINYINT | SMALLINT | INTEGER | FLOAT | DATE =>
          value.asInstanceOf[AnyRef]
        case DataTypeRoot.VARCHAR =>
          BinaryString.fromString(value.toString)
        case DataTypeRoot.DECIMAL =>
          val decimalType = dataType.asInstanceOf[DecimalType]
          val precision = decimalType.getPrecision
          val scale = decimalType.getScale
          Decimal.fromBigDecimal(
            value.asInstanceOf[org.apache.spark.sql.types.Decimal].toJavaBigDecimal,
            precision,
            scale)
        case DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
          Timestamp.fromMicros(value.asInstanceOf[Long])
        case DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
          Timestamp.fromMicros(value.asInstanceOf[Long])
        case _ =>
          throw new UnsupportedOperationException(
            s"Convert value: $value to datatype: $dataType is unsupported.")
      }
    }
  }

  /** Joins the parts of a field reference with `.` to form the name looked up in `rowType`. */
  private def toFieldName(ref: NamedReference): String = ref.fieldNames().mkString(".")
}

/** Predicate names that `SparkV2FilterConverter.convert` matches `sparkPredicate.name()` against. */
object SparkV2FilterConverter {

  // Null checks.
  private val IS_NULL: String = "IS_NULL"
  private val IS_NOT_NULL: String = "IS_NOT_NULL"

  // Equality and ordering comparisons.
  private val EQUAL_TO: String = "="
  private val EQUAL_NULL_SAFE: String = "<=>"
  private val LESS_THAN: String = "<"
  private val LESS_THAN_OR_EQUAL: String = "<="
  private val GREATER_THAN: String = ">"
  private val GREATER_THAN_OR_EQUAL: String = ">="

  // Set membership.
  private val IN: String = "IN"

  // String matching.
  private val STRING_START_WITH: String = "STARTS_WITH"
  private val STRING_END_WITH: String = "ENDS_WITH"
  private val STRING_CONTAINS: String = "CONTAINS"

  // Logical connectives.
  private val AND: String = "AND"
  private val OR: String = "OR"
  private val NOT: String = "NOT"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.PartitioningUtils
Expand Down Expand Up @@ -68,8 +70,16 @@ object PaimonUtils {
DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown)
}

def fieldReference(name: String): NamedReference = {
FieldReference(Seq(name))
/** Delegates to `translateFilterV2WithMapping`, passing `None` as its second argument. */
def translateFilterV2(predicate: Expression): Option[Predicate] =
  translateFilterV2WithMapping(predicate, None)

/** Builds a [[FieldReference]] for `name` by wrapping it in a one-element `Seq`. */
def fieldReference(name: String): FieldReference = fieldReference(Seq(name))

/** Builds a [[FieldReference]] from the given name parts. */
def fieldReference(parts: Seq[String]): FieldReference = FieldReference(parts)

def bytesToString(size: Long): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class PaimonSparkTestBase
super.beforeAll()
spark.sql(s"USE paimon")
spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$dbName0")
spark.sql(s"USE paimon.$dbName0")
}

override protected def afterAll(): Unit = {
Expand Down
Loading

0 comments on commit 5aca89b

Please sign in to comment.