diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
index 772a2f4ed53d..bb16bbf1c7bc 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
@@ -27,8 +27,6 @@
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 
-import java.util.Arrays;
-
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 
 /** Catalog methods for working with Functions. */
@@ -54,7 +52,7 @@ default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExc
             return new Identifier[0];
         }
 
-        throw new RuntimeException("Namespace " + Arrays.toString(namespace) + " is not valid");
+        throw new NoSuchNamespaceException(namespace);
     }
 
     @Override
@@ -66,6 +64,6 @@ default UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionExce
             }
         }
 
-        throw new RuntimeException("Function " + ident + " is not a paimon function");
+        throw new NoSuchFunctionException(ident);
     }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index 6b4543661fa2..f399ca3e6f01 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -23,6 +23,8 @@ import org.apache.paimon.spark.catalog.functions.PaimonFunctions
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
 
 class PaimonFunctionTest extends PaimonHiveTestBase {
 
@@ -85,4 +87,49 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
       )
     }
   }
+
+  test("Paimon function: show and load function with SparkGenericCatalog") {
+    sql(s"USE $sparkCatalogName")
+    sql(s"USE $hiveDbName")
+    sql("CREATE FUNCTION myIntSum AS 'org.apache.paimon.spark.sql.MyIntSum'")
+    checkAnswer(
+      sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"),
+      Row("spark_catalog.test_hive.myintsum"))
+
+    withTable("t") {
+      sql("CREATE TABLE t (id INT)")
+      sql("INSERT INTO t VALUES (1), (2), (3)")
+      checkAnswer(sql("SELECT myIntSum(id) FROM t"), Row(6))
+    }
+
+    sql("DROP FUNCTION myIntSum")
+    checkAnswer(sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"), Seq.empty)
+  }
+}
+
+private class MyIntSum extends UserDefinedAggregateFunction {
+
+  override def inputSchema: StructType = new StructType().add("input", IntegerType)
+
+  override def bufferSchema: StructType = new StructType().add("buffer", IntegerType)
+
+  override def dataType: DataType = IntegerType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, 0)
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    buffer.update(0, buffer.getInt(0) + input.getInt(0))
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    buffer.getInt(0)
+  }
 }