Skip to content

Commit

Permalink
[fix](java udf) make executor class thread local (apache#25758)
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau authored Oct 23, 2023
1 parent b5ee4a9 commit 6a6e10c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 56 deletions.
46 changes: 10 additions & 36 deletions be/src/vec/functions/function_java_udf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,16 @@

#include "vec/functions/function_java_udf.h"

#include <glog/logging.h>

#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "gutil/strings/substitute.h"
#include "jni.h"
#include "jni_md.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/jni_connector.h"

const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
Expand All @@ -60,25 +45,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
if (env == nullptr) {
return Status::InternalError("Failed to get/create JVM");
}
if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>();
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl));
jni_env->executor_ctor_id =
env->GetMethodID(jni_env->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
RETURN_ERROR_IF_EXC(env);
jni_env->executor_evaluate_id =
env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
jni_env->executor_close_id =
env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
RETURN_ERROR_IF_EXC(env);
context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
}

if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
JniEnv* jni_env = reinterpret_cast<JniEnv*>(
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
_argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>();
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);

// Add a scoped cleanup jni reference object. This cleans up local refs made below.
Expand All @@ -98,7 +67,14 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio

RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));

jni_ctx->executor = env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id,
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_ctx->executor_cl));
jni_ctx->executor_ctor_id =
env->GetMethodID(jni_ctx->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
jni_ctx->executor_evaluate_id =
env->GetMethodID(jni_ctx->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
jni_ctx->executor_close_id =
env->GetMethodID(jni_ctx->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
jni_ctx->executor = env->NewObject(jni_ctx->executor_cl, jni_ctx->executor_ctor_id,
ctor_params_bytes);

jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
Expand All @@ -118,8 +94,6 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
JniContext* jni_ctx = reinterpret_cast<JniContext*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));
JniEnv* jni_env =
reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));

std::unique_ptr<long[]> input_table;
RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table));
Expand All @@ -136,7 +110,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
{"required_fields", output_table_schema.first},
{"columns_types", output_table_schema.second}};
jobject output_map = JniUtil::convert_to_java_map(env, output_params);
long output_address = env->CallLongMethod(jni_ctx->executor, jni_env->executor_evaluate_id,
long output_address = env->CallLongMethod(jni_ctx->executor, jni_ctx->executor_evaluate_id,
input_map, output_map);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
env->DeleteLocalRef(input_map);
Expand Down
32 changes: 12 additions & 20 deletions be/src/vec/functions/function_java_udf.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/functions/function.h"
namespace doris {

namespace vectorized {
namespace doris::vectorized {

class JavaUdfPreparedFunction : public PreparedFunctionImpl {
public:
Expand Down Expand Up @@ -114,26 +113,18 @@ class JavaFunctionCall : public IFunctionBase {
const DataTypes _argument_types;
const DataTypePtr _return_type;

/// Cached JNI handles for the UdfExecutor Java class.
///
/// JavaFunctionCall::open() fills these in for the FRAGMENT_LOCAL scope and stores the
/// struct as the fragment-local function state. All handles default to nullptr so a
/// partially initialized instance (e.g. after an early error return from open()) never
/// carries indeterminate pointers.
///
/// NOTE(review): an earlier comment stated the class reference is never freed, but
/// JniContext::close() calls DeleteGlobalRef on its copy of this reference -- confirm
/// which object is meant to own it.
struct JniEnv {
    /// Global class reference to the UdfExecutor Java class.
    jclass executor_cl = nullptr;
    /// Method ID of the UdfExecutor constructor ("<init>").
    jmethodID executor_ctor_id = nullptr;
    /// Method ID of UdfExecutor's "evaluate" method.
    jmethodID executor_evaluate_id = nullptr;
    /// Method ID of UdfExecutor's "close" method.
    jmethodID executor_close_id = nullptr;
};

struct JniContext {
// Do not store a pointer to the parent directly: the parent lives in VExpr, while the JNI
// context lives in FunctionContext. Their destruction order is not defined, so doing so can crash.
// JniContext's lifecycle should follow the function context, not the expr.
jclass executor_cl_;
jmethodID executor_close_id_;
jclass executor_cl;
jmethodID executor_ctor_id;
jmethodID executor_evaluate_id;
jmethodID executor_close_id;
jobject executor = nullptr;
bool is_closed = false;

JniContext(int64_t num_args, jclass executor_cl, jmethodID executor_close_id)
: executor_cl_(executor_cl), executor_close_id_(executor_close_id) {}
JniContext() = default;

void close() {
if (is_closed) {
Expand All @@ -146,15 +137,16 @@ class JavaFunctionCall : public IFunctionBase {
LOG(WARNING) << "errors while get jni env " << status;
return;
}
env->CallNonvirtualVoidMethodA(executor, executor_cl_, executor_close_id_, NULL);
env->CallNonvirtualVoidMethodA(executor, executor_cl, executor_close_id, nullptr);
env->DeleteGlobalRef(executor);
env->DeleteGlobalRef(executor_cl_);
env->DeleteGlobalRef(executor_cl);
Status s = JniUtil::GetJniExceptionMsg(env);
if (!s.ok()) LOG(WARNING) << s;
if (!s.ok()) {
LOG(WARNING) << s;
}
is_closed = true;
}
};
};

} // namespace vectorized
} // namespace doris
} // namespace doris::vectorized

0 comments on commit 6a6e10c

Please sign in to comment.