From c44f8a4a62e72759f9e8d22ed88406c20eeac0ab Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Mon, 26 Aug 2024 08:56:39 +0000 Subject: [PATCH] remove native registration --- .../org/apache/gluten/udf/UdfJniWrapper.java | 2 +- .../backendsapi/velox/VeloxListenerApi.scala | 2 + .../backendsapi/velox/VeloxRuleApi.scala | 2 - .../spark/sql/expression/UDFResolver.scala | 36 +-- .../gluten/expression/VeloxUdfSuite.scala | 44 +--- cpp/velox/jni/JniUdf.cc | 2 +- cpp/velox/jni/JniUdf.h | 2 +- cpp/velox/jni/VeloxJniWrapper.cc | 4 +- cpp/velox/tests/MyUdfTest.cc | 21 +- cpp/velox/udf/examples/CMakeLists.txt | 3 - cpp/velox/udf/examples/MyUDF.cc | 222 ------------------ cpp/velox/udf/examples/TestMyUDF.cc | 45 ---- docs/developers/VeloxUDF.md | 39 +-- 13 files changed, 53 insertions(+), 371 deletions(-) delete mode 100644 cpp/velox/udf/examples/TestMyUDF.cc diff --git a/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java b/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java index 8bfe8bad5c018..bbe2057c42f99 100644 --- a/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java +++ b/backends-velox/src/main/java/org/apache/gluten/udf/UdfJniWrapper.java @@ -18,5 +18,5 @@ public class UdfJniWrapper { - public static native void getFunctionSignatures(); + public static native void registerFunctionSignatures(); } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala index 2cfc4e9a9099d..51d257f680c11 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala @@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.ListenerApi import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter} import 
org.apache.gluten.expression.UDFMappings import org.apache.gluten.init.NativeBackendInitializer +import org.apache.gluten.udf.UdfJniWrapper import org.apache.gluten.utils._ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace} @@ -61,6 +62,7 @@ class VeloxListenerApi extends ListenerApi with Logging { SparkDirectoryUtil.init(conf) UDFResolver.resolveUdfConf(conf, isDriver = true) initialize(conf) + UdfJniWrapper.registerFunctionSignatures() } override def onDriverShutdown(): Unit = shutdown() diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 438895b25ae90..4ff7f0305d58c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -30,7 +30,6 @@ import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasI import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter} -import org.apache.spark.sql.expression.UDFResolver class VeloxRuleApi extends RuleApi { import VeloxRuleApi._ @@ -47,7 +46,6 @@ private object VeloxRuleApi { // Regular Spark rules. 
injector.injectOptimizerRule(CollectRewriteRule.apply) injector.injectOptimizerRule(HLLRewriteRule.apply) - UDFResolver.getFunctionSignatures().foreach(injector.injectFunction) injector.injectPostHocResolutionRule(ArrowConvertorRule.apply) } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 39032e46f3814..db166a51327c4 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -18,16 +18,14 @@ package org.apache.spark.sql.expression import org.apache.gluten.backendsapi.velox.VeloxBackendSettings import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException} -import org.apache.gluten.expression.{ConverterUtils, ExpressionTransformer, ExpressionType, GenericExpressionTransformer, Transformable} -import org.apache.gluten.udf.UdfJniWrapper +import org.apache.gluten.expression._ import org.apache.gluten.vectorized.JniWorkspace -import org.apache.spark.{SparkConf, SparkContext, SparkFiles} +import org.apache.spark.{SparkConf, SparkFiles} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow} -import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression, ExpressionInfo, Unevaluable} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression, Unevaluable} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.types.DataTypeUtils @@ -334,32 +332,6 @@ object UDFResolver extends 
Logging { .mkString(",") } - def getFunctionSignatures(): Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = { - val sparkContext = SparkContext.getActive.get - val sparkConf = sparkContext.conf - val udfLibPaths = sparkConf.getOption(VeloxBackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS) - - udfLibPaths match { - case None => - Seq.empty - case Some(_) => - UdfJniWrapper.getFunctionSignatures() - UDFNames.map { - name => - ( - new FunctionIdentifier(name), - new ExpressionInfo(classOf[UDFExpression].getName, name), - (e: Seq[Expression]) => getUdfExpression(name, name)(e)) - }.toSeq ++ UDAFNames.map { - name => - ( - new FunctionIdentifier(name), - new ExpressionInfo(classOf[UserDefinedAggregateFunction].getName, name), - (e: Seq[Expression]) => getUdafExpression(name)(e)) - }.toSeq - } - } - private def checkAllowTypeConversion: Boolean = { SQLConf.get .getConfString(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION, "false") diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala index 596757df35d9c..3abab13768912 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/expression/VeloxUdfSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.expression.UDFResolver import java.nio.file.Paths -import java.sql.Date abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { @@ -92,41 +91,7 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { .set("spark.memory.offHeap.size", "1024MB") } - test("test udf") { - val df = spark.sql("""select - | myudf1(100L), - | myudf2(1), - | myudf2(1L), - | myudf3(), - | myudf3(1), - | myudf3(1, 2, 3), - | myudf3(1L), - | myudf3(1L, 2L, 3L), - | mydate(cast('2024-03-25' as date), 5) - |""".stripMargin) - assert( - df.collect() - 
.sameElements(Array(Row(105L, 6, 6L, 5, 6, 11, 6L, 11L, Date.valueOf("2024-03-30"))))) - } - - test("test udf allow type conversion") { - withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION -> "true") { - val df = spark.sql("""select myudf1("100"), myudf1(1), mydate('2024-03-25', 5)""") - assert( - df.collect() - .sameElements(Array(Row(105L, 6L, Date.valueOf("2024-03-30"))))) - } - - withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION -> "false") { - assert( - spark - .sql("select mydate2('2024-03-25', 5)") - .collect() - .sameElements(Array(Row(Date.valueOf("2024-03-30"))))) - } - } - - test("test udaf") { + ignore("test udaf") { val df = spark.sql("""select | myavg(1), | myavg(1L), @@ -140,7 +105,7 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { .sameElements(Array(Row(1.0, 1.0, 1.0, 1.0, 1L)))) } - test("test udaf allow type conversion") { + ignore("test udaf allow type conversion") { withSQLConf(VeloxBackendSettings.GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION -> "true") { val df = spark.sql("""select myavg("1"), myavg("1.0"), mycount_if("true")""") assert( @@ -149,7 +114,7 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { } } - test("test hive udf replacement") { + test("test native hive udf") { val tbl = "test_hive_udf_replacement" withTempPath { dir => @@ -169,12 +134,15 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { |AS 'org.apache.spark.sql.hive.execution.UDFStringString' |""".stripMargin) + val nativeResultWithImplicitConversion = + spark.sql(s"""SELECT hive_string_string(col1, 'a') FROM $tbl""").collect() val nativeResult = spark.sql(s"""SELECT hive_string_string(col2, 'a') FROM $tbl""").collect() // Unregister native hive udf to fallback. 
UDFResolver.UDFNames.remove("org.apache.spark.sql.hive.execution.UDFStringString") val fallbackResult = spark.sql(s"""SELECT hive_string_string(col2, 'a') FROM $tbl""").collect() + assert(nativeResultWithImplicitConversion.sameElements(fallbackResult)) assert(nativeResult.sameElements(fallbackResult)) // Add an unimplemented udf to the map to test fallback of registered native hive udf. diff --git a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc index 8230724f12602..aa5d1c0932c40 100644 --- a/cpp/velox/jni/JniUdf.cc +++ b/cpp/velox/jni/JniUdf.cc @@ -49,7 +49,7 @@ void gluten::finalizeVeloxJniUDF(JNIEnv* env) { env->DeleteGlobalRef(udfResolverClass); } -void gluten::jniGetFunctionSignatures(JNIEnv* env) { +void gluten::jniRegisterFunctionSignatures(JNIEnv* env) { auto udfLoader = gluten::UdfLoader::getInstance(); const auto& signatures = udfLoader->getRegisteredUdfSignatures(); diff --git a/cpp/velox/jni/JniUdf.h b/cpp/velox/jni/JniUdf.h index b91ac08dedc5e..568439d18451b 100644 --- a/cpp/velox/jni/JniUdf.h +++ b/cpp/velox/jni/JniUdf.h @@ -27,6 +27,6 @@ void initVeloxJniUDF(JNIEnv* env); void finalizeVeloxJniUDF(JNIEnv* env); -void jniGetFunctionSignatures(JNIEnv* env); +void jniRegisterFunctionSignatures(JNIEnv* env); } // namespace gluten diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 5df3a478e6674..37c90643a5c94 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -88,11 +88,11 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_shut JNI_METHOD_END() } -JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT +JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_registerFunctionSignatures( // NOLINT JNIEnv* env, jclass) { JNI_METHOD_START - gluten::jniGetFunctionSignatures(env); + gluten::jniRegisterFunctionSignatures(env); JNI_METHOD_END() } diff --git a/cpp/velox/tests/MyUdfTest.cc 
b/cpp/velox/tests/MyUdfTest.cc index 46898b38cfd24..c9849d67d010d 100644 --- a/cpp/velox/tests/MyUdfTest.cc +++ b/cpp/velox/tests/MyUdfTest.cc @@ -34,12 +34,17 @@ class MyUdfTest : public FunctionBaseTest { } }; -TEST_F(MyUdfTest, myudf1) { - const auto myudf1 = [&](const int64_t& number) { - return evaluateOnce("myudf1(c0)", BIGINT(), std::make_optional(number)); - }; - - EXPECT_EQ(5, myudf1(0)); - EXPECT_EQ(105, myudf1(100)); - EXPECT_EQ(3147483652, myudf1(3147483647)); // int64 +TEST_F(MyUdfTest, hivestringstring) { + auto map = facebook::velox::exec::vectorFunctionFactories(); + const std::string candidate = {"org.apache.spark.sql.hive.execution.UDFStringString"}; + ASSERT(map.withRLock([&candidate](auto& self) -> bool { + auto iter = self.find(candidate); + std::unordered_map values; + const facebook::velox::core::QueryConfig config(std::move(values)); + iter->second.factory( + candidate, + {facebook::velox::exec::VectorFunctionArg{facebook::velox::VARCHAR()}, + facebook::velox::exec::VectorFunctionArg{facebook::velox::VARCHAR()}}, + config) != nullptr; + });) } \ No newline at end of file diff --git a/cpp/velox/udf/examples/CMakeLists.txt b/cpp/velox/udf/examples/CMakeLists.txt index 3c51859eeb71b..32f39425bb778 100644 --- a/cpp/velox/udf/examples/CMakeLists.txt +++ b/cpp/velox/udf/examples/CMakeLists.txt @@ -18,6 +18,3 @@ target_link_libraries(myudf velox) add_library(myudaf SHARED "MyUDAF.cc") target_link_libraries(myudaf velox) - -add_executable(test_myudf "TestMyUDF.cc") -target_link_libraries(test_myudf velox) diff --git a/cpp/velox/udf/examples/MyUDF.cc b/cpp/velox/udf/examples/MyUDF.cc index 75e68413a8426..783699614fbd2 100644 --- a/cpp/velox/udf/examples/MyUDF.cc +++ b/cpp/velox/udf/examples/MyUDF.cc @@ -32,223 +32,6 @@ static const char* kBigInt = "bigint"; static const char* kDate = "date"; static const char* kVarChar = "varchar"; -namespace myudf { - -template -class PlusFiveFunction : public exec::VectorFunction { - public: - explicit 
PlusFiveFunction() {} - - void apply( - const SelectivityVector& rows, - std::vector& args, - const TypePtr& /* outputType */, - exec::EvalCtx& context, - VectorPtr& result) const override { - using nativeType = typename TypeTraits::NativeType; - - BaseVector::ensureWritable(rows, createScalarType(), context.pool(), result); - - auto* flatResult = result->asFlatVector(); - auto* rawResult = flatResult->mutableRawValues(); - - flatResult->clearNulls(rows); - - rows.applyToSelected([&](auto row) { rawResult[row] = 5; }); - - if (args.size() == 0) { - return; - } - - for (int i = 0; i < args.size(); i++) { - auto& arg = args[i]; - VELOX_CHECK(arg->isFlatEncoding() || arg->isConstantEncoding()); - if (arg->isConstantEncoding()) { - auto value = arg->as>()->valueAt(0); - rows.applyToSelected([&](auto row) { rawResult[row] += value; }); - } else { - auto* rawInput = arg->as>()->rawValues(); - rows.applyToSelected([&](auto row) { rawResult[row] += rawInput[row]; }); - } - } - } -}; - -static std::shared_ptr makePlusConstant( - const std::string& /*name*/, - const std::vector& inputArgs, - const core::QueryConfig& /*config*/) { - if (inputArgs.size() == 0) { - return std::make_shared>(); - } - auto typeKind = inputArgs[0].type->kind(); - switch (typeKind) { - case TypeKind::INTEGER: - return std::make_shared>(); - case TypeKind::BIGINT: - return std::make_shared>(); - default: - VELOX_UNREACHABLE(); - } -} - -// name: myudf1 -// signatures: -// bigint -> bigint -// type: VectorFunction -class MyUdf1Registerer final : public gluten::UdfRegisterer { - public: - int getNumUdf() override { - return 1; - } - - void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { - udfEntries[index++] = {name_.c_str(), kBigInt, 1, bigintArg_}; - } - - void registerSignatures() override { - facebook::velox::exec::registerVectorFunction( - name_, bigintSignatures(), std::make_unique>()); - } - - private: - std::vector> bigintSignatures() { - return 
{exec::FunctionSignatureBuilder().returnType("bigint").argumentType("bigint").build()}; - } - - const std::string name_ = "myudf1"; - const char* bigintArg_[1] = {kBigInt}; -}; - -// name: myudf2 -// signatures: -// integer -> integer -// bigint -> bigint -// type: StatefulVectorFunction -class MyUdf2Registerer final : public gluten::UdfRegisterer { - public: - int getNumUdf() override { - return 2; - } - - void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { - udfEntries[index++] = {name_.c_str(), kInteger, 1, integerArg_}; - udfEntries[index++] = {name_.c_str(), kBigInt, 1, bigintArg_}; - } - - void registerSignatures() override { - facebook::velox::exec::registerStatefulVectorFunction(name_, integerAndBigintSignatures(), makePlusConstant); - } - - private: - std::vector> integerAndBigintSignatures() { - return { - exec::FunctionSignatureBuilder().returnType("integer").argumentType("integer").build(), - exec::FunctionSignatureBuilder().returnType("bigint").argumentType("bigint").build()}; - } - - const std::string name_ = "myudf2"; - const char* integerArg_[1] = {kInteger}; - const char* bigintArg_[1] = {kBigInt}; -}; - -// name: myudf3 -// signatures: -// [integer,] ... -> integer -// bigint, [bigint,] ... 
-> bigint -// type: StatefulVectorFunction with variable arity -class MyUdf3Registerer final : public gluten::UdfRegisterer { - public: - int getNumUdf() override { - return 2; - } - - void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { - udfEntries[index++] = {name_.c_str(), kInteger, 1, integerArg_, true}; - udfEntries[index++] = {name_.c_str(), kBigInt, 2, bigintArgs_, true}; - } - - void registerSignatures() override { - facebook::velox::exec::registerStatefulVectorFunction( - name_, integerAndBigintSignaturesWithVariableArity(), makePlusConstant); - } - - private: - std::vector> integerAndBigintSignaturesWithVariableArity() { - return { - exec::FunctionSignatureBuilder().returnType("integer").argumentType("integer").variableArity().build(), - exec::FunctionSignatureBuilder() - .returnType("bigint") - .argumentType("bigint") - .argumentType("bigint") - .variableArity() - .build()}; - } - - const std::string name_ = "myudf3"; - const char* integerArg_[1] = {kInteger}; - const char* bigintArgs_[2] = {kBigInt, kBigInt}; -}; -} // namespace myudf - -namespace mydate { -template -struct MyDateSimpleFunction { - VELOX_DEFINE_FUNCTION_TYPES(T); - - FOLLY_ALWAYS_INLINE void call(int32_t& result, const arg_type& date, const arg_type addition) { - result = date + addition; - } -}; - -// name: mydate -// signatures: -// date, integer -> bigint -// type: SimpleFunction -class MyDateRegisterer final : public gluten::UdfRegisterer { - public: - int getNumUdf() override { - return 1; - } - - void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { - udfEntries[index++] = {name_.c_str(), kDate, 2, myDateArg_}; - } - - void registerSignatures() override { - facebook::velox::registerFunction({name_}); - } - - private: - const std::string name_ = "mydate"; - const char* myDateArg_[2] = {kDate, kInteger}; -}; - -// name: mydate -// signatures: -// date, integer -> bigint -// type: SimpleFunction -// enable type conversion -class 
MyDate2Registerer final : public gluten::UdfRegisterer { - public: - int getNumUdf() override { - return 1; - } - - void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { - udfEntries[index++] = {name_.c_str(), kDate, 2, myDateArg_, false, true}; - } - - void registerSignatures() override { - facebook::velox::registerFunction({name_}); - } - - private: - const std::string name_ = "mydate2"; - const char* myDateArg_[2] = {kDate, kInteger}; -}; -} // namespace mydate - namespace hivestringstring { template struct HiveStringStringFunction { @@ -297,11 +80,6 @@ void setupRegisterers() { return; } auto& registerers = globalRegisters(); - registerers.push_back(std::make_shared()); - registerers.push_back(std::make_shared()); - registerers.push_back(std::make_shared()); - registerers.push_back(std::make_shared()); - registerers.push_back(std::make_shared()); registerers.push_back(std::make_shared()); inited = true; } diff --git a/cpp/velox/udf/examples/TestMyUDF.cc b/cpp/velox/udf/examples/TestMyUDF.cc deleted file mode 100644 index 794dd0613e907..0000000000000 --- a/cpp/velox/udf/examples/TestMyUDF.cc +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include -#include "udf/UdfLoader.h" - -#include "velox/expression/VectorFunction.h" - -int main() { - auto udfLoader = gluten::UdfLoader::getInstance(); - udfLoader->loadUdfLibraries("libmyudf.so"); - udfLoader->registerUdf(); - - auto map = facebook::velox::exec::vectorFunctionFactories(); - const std::vector candidates = {"myudf1", "myudf2"}; - auto f = map.withRLock([&candidates](auto& self) -> bool { - return std::all_of(candidates.begin(), candidates.end(), [&](const auto& funcName) { - auto iter = self.find(funcName); - std::unordered_map values; - const facebook::velox::core::QueryConfig config(std::move(values)); - return iter->second.factory( - funcName, {facebook::velox::exec::VectorFunctionArg{facebook::velox::BIGINT()}}, config) != nullptr; - }); - }); - - if (!f) { - return 1; - } - - return 0; -} diff --git a/docs/developers/VeloxUDF.md b/docs/developers/VeloxUDF.md index 6872f2d0c8419..4f685cc41ed77 100644 --- a/docs/developers/VeloxUDF.md +++ b/docs/developers/VeloxUDF.md @@ -10,9 +10,10 @@ parent: /developer-overview/ ## Introduction Velox backend supports User-Defined Functions (UDF) and User-Defined Aggregate Functions (UDAF). -Users can create their own functions using the UDF interface provided in Velox backend and build libraries for these functions. -At runtime, the UDF are registered at the start of applications. -Once registered, Gluten will be able to parse and offload these UDF into Velox during execution. +Users can implement custom functions using the UDF interface provided by Velox and compile them into libraries. +At runtime, these UDFs are registered alongside their Java implementations via `CREATE TEMPORARY FUNCTION`. +Once registered, Gluten can parse and offload these UDFs to Velox during execution, +meanwhile ensuring proper fallback to Java UDFs when necessary. 
## Create and Build UDF/UDAF library @@ -39,22 +40,23 @@ The following steps demonstrate how to set up a UDF library project: This function is called to register the UDF to Velox function registry. This is where users should register functions by calling `facebook::velox::exec::registerVecotorFunction` or other Velox APIs. - - The interface functions are mapped to marcos in [Udf.h](../../cpp/velox/udf/Udf.h). Here's an example of how to implement these functions: + - The interface functions are mapped to macros in [Udf.h](../../cpp/velox/udf/Udf.h). + + Assuming there is an existing Hive UDF `org.apache.gluten.sql.hive.MyUDF`, its native UDF can be implemented as follows. ``` - // Filename MyUDF.cc - #include #include namespace { static const char* kInteger = "integer"; + static const char* kMyUdfFunctionName = "org.apache.gluten.sql.hive.MyUDF"; } const int kNumMyUdf = 1; const char* myUdfArgs[] = {kInteger}: - gluten::UdfEntry myUdfSig = {"myudf", kInteger, 1, myUdfArgs}; + gluten::UdfEntry myUdfSig = {kMyUdfFunctionName, kInteger, 1, myUdfArgs}; class MyUdf : public facebook::velox::exec::VectorFunction { ... // Omit concrete implementation @@ -167,20 +169,25 @@ or --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so ``` -Run query. The functions `myudf1` and `myudf2` increment the input value by a constant of 5 +Start `spark-sql` and run query. You need to add jar "spark-hive_2.12-<version>-tests.jar" to the classpath for hive udf `org.apache.spark.sql.hive.execution.UDFStringString` ``` -select myudf1(100L), myudf2(1) +spark-sql (default)> CREATE TEMPORARY FUNCTION hive_string_string AS 'org.apache.spark.sql.hive.execution.UDFStringString'; +Time taken: 0.808 seconds +spark-sql (default)> select hive_string_string("hello", "world"); +hello world +Time taken: 3.208 seconds, Fetched 1 row(s) ``` -The output from spark-shell will be like - +You can verify the offload with "explain". 
``` -+------------------+----------------+ -|udfexpression(100)|udfexpression(1)| -+------------------+----------------+ -| 105| 6| -+------------------+----------------+ +spark-sql (default)> explain select hive_string_string("hello", "world"); +== Physical Plan == +VeloxColumnarToRowExec ++- ^(2) ProjectExecTransformer [hello world AS hive_string_string(hello, world)#8] + +- ^(2) InputIteratorTransformer[fake_column#9] + +- RowToVeloxColumnar + +- *(1) Scan OneRowRelation[fake_column#9] ``` ## Configurations