From f34103145c26b9668377c6f77a15a99779ff93f8 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Sat, 11 Jan 2025 13:59:39 +0100 Subject: [PATCH] Distribution: add support for rpc from other nodes Signed-off-by: Paul Guyot --- libs/estdlib/src/CMakeLists.txt | 1 + libs/estdlib/src/erpc.erl | 59 ++++++++++++ src/libAtomVM/defaultatoms.def | 8 ++ src/libAtomVM/dist_nifs.c | 128 ++++++++++++++++++++----- src/libAtomVM/dist_nifs.h | 27 ++++++ src/libAtomVM/nifs.c | 33 ++++++- src/libAtomVM/nifs.h | 2 + src/libAtomVM/otp_net.c | 14 ++- tests/libs/estdlib/test_net_kernel.erl | 28 ++++++ 9 files changed, 270 insertions(+), 30 deletions(-) create mode 100644 libs/estdlib/src/erpc.erl diff --git a/libs/estdlib/src/CMakeLists.txt b/libs/estdlib/src/CMakeLists.txt index 0c8c01b0a..b6b64c910 100644 --- a/libs/estdlib/src/CMakeLists.txt +++ b/libs/estdlib/src/CMakeLists.txt @@ -31,6 +31,7 @@ set(ERLANG_MODULES crypto dist_util erl_epmd + erpc erts_debug ets gen_event diff --git a/libs/estdlib/src/erpc.erl b/libs/estdlib/src/erpc.erl new file mode 100644 index 000000000..2b94ccd7e --- /dev/null +++ b/libs/estdlib/src/erpc.erl @@ -0,0 +1,59 @@ +% +% This file is part of AtomVM. +% +% Copyright 2025 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%%----------------------------------------------------------------------------- +%% @doc An implementation of the Erlang/OTP erpc interface. +%% +%% This module implements a strict subset of the Erlang/OTP erpc +%% interface. +%% @end +%%----------------------------------------------------------------------------- +-module(erpc). + +% api +-export([ + execute_call/4 +]). + +%%----------------------------------------------------------------------------- +%% @param Reference reference of the request, passed in exit tuple +%% @param Module module to call +%% @param Func function to call +%% @param Args argument of the call +%% @doc Execute a call locally, exiting with the result. +%% This function is called from rpc on other nodes using spawn_request BIF. +%% @end +%%----------------------------------------------------------------------------- +-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) -> + no_return(). +execute_call(Reference, Module, Func, Args) -> + Reply = + try + Result = apply(Module, Func, Args), + {Reference, return, Result} + catch + throw:Reason -> + {Reference, throw, Reason}; + exit:Reason -> + {Reference, exit, Reason}; + error:Reason:Stack -> + {Reference, error, Reason, Stack} + end, + exit(Reply). diff --git a/src/libAtomVM/defaultatoms.def b/src/libAtomVM/defaultatoms.def index 5f29bf966..8467df467 100644 --- a/src/libAtomVM/defaultatoms.def +++ b/src/libAtomVM/defaultatoms.def @@ -178,3 +178,11 @@ X(INET_ATOM, "\x4", "inet") X(TIMEOUT_ATOM, "\x7", "timeout") X(DIST_DATA_ATOM, "\x9", "dist_data") + +X(REQUEST_ATOM, "\x7", "request") +X(REPLY_TAG_ATOM, "\x9", "reply_tag") +X(SPAWN_REPLY_ATOM, "\xB", "spawn_reply") +X(REPLY_ATOM, "\x5", "reply") +X(YES_ATOM, "\x3", "yes") +X(NO_ATOM, "\x2", "no") +X(ERROR_ONLY_ATOM, "\xA", "error_only") diff --git a/src/libAtomVM/dist_nifs.c b/src/libAtomVM/dist_nifs.c index 1a5d20586..5e0d6da89 100644 --- a/src/libAtomVM/dist_nifs.c +++ b/src/libAtomVM/dist_nifs.c @@ -70,6 +70,12 @@ enum OPERATION_ALIAS_SEND_TT = 34, }; +enum +{ + SPAWN_REPLY_FLAGS_LINK_CREATED = 1, + SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2, +}; + struct DistributionPacket { struct ListHead head; @@ -318,6 +324,37 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[]) return result; } +term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx) +{ + int target_process_id = 0; + if (term_is_local_pid(target_proc)) { + target_process_id = term_to_local_process_id(target_proc); + } else if (term_is_atom(target_proc)) { + target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc)); + } else { + RAISE_ERROR(BADARG_ATOM); + } + struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor)); + monitor->target_proc = target_proc; + monitor->pid_number = term_get_external_pid_process_id(from_pid); + monitor->pid_serial = term_get_external_pid_serial(from_pid); + monitor->ref_len = term_get_external_reference_len(monitor_ref); + memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len); + if (target_process_id) { + synclist_append(&conn_obj->remote_monitors, &monitor->head); + ErlNifPid target_process_pid = target_process_id; + if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) { + synclist_remove(&conn_obj->remote_monitors, &monitor->head); + dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global); + free(monitor); + } + } else { + dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global); + free(monitor); + } + return OK_ATOM; +} + static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[]) { UNUSED(argc); @@ -369,32 +406,7 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[]) term from_pid = term_get_tuple_element(control, 1); term target_proc = term_get_tuple_element(control, 2); term monitor_ref = term_get_tuple_element(control, 3); - int target_process_id = 0; - if (term_is_local_pid(target_proc)) { - target_process_id = term_to_local_process_id(target_proc); - } else if (term_is_atom(target_proc)) { - target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc)); - } else { - RAISE_ERROR(BADARG_ATOM); - } - struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor)); - monitor->target_proc = target_proc; - monitor->pid_number = term_get_external_pid_process_id(from_pid); - monitor->pid_serial = term_get_external_pid_serial(from_pid); - monitor->ref_len = term_get_external_reference_len(monitor_ref); - memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len); - if (target_process_id) { - synclist_append(&conn_obj->remote_monitors, &monitor->head); - ErlNifPid target_process_pid = target_process_id; - if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) { - synclist_remove(&conn_obj->remote_monitors, &monitor->head); - dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global); - free(monitor); - } - } else { - dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global); - free(monitor); - } + dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx); break; } @@ -421,6 +433,53 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[]) synclist_unlock(&conn_obj->remote_monitors); break; } + case OPERATION_SPAWN_REQUEST: { + if (UNLIKELY(arity != 6)) { + RAISE_ERROR(BADARG_ATOM); + } + term roots[4]; + roots[0] = argv[0]; + roots[1] = argv[1]; + roots[2] = control; + roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots); + if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + control = roots[2]; + term arglist = roots[3]; + term mfa = term_get_tuple_element(control, 4); + if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) { + RAISE_ERROR(BADARG_ATOM); + } + if (UNLIKELY(!term_is_list(arglist))) { + RAISE_ERROR(BADARG_ATOM); + } + term reqid = term_get_tuple_element(control, 1); + term from = term_get_tuple_element(control, 2); + if (UNLIKELY(!term_is_pid(from))) { + RAISE_ERROR(BADARG_ATOM); + } + // term groupleader = term_get_tuple_element(control, 3); + term options = term_get_tuple_element(control, 5); + + term request_tuple = term_alloc_tuple(4, &ctx->heap); + term_put_tuple_element(request_tuple, 0, roots[0]); + term_put_tuple_element(request_tuple, 1, reqid); + term_put_tuple_element(request_tuple, 2, from); + term_put_tuple_element(request_tuple, 3, options); + term request_opt = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(request_opt, 0, REQUEST_ATOM); + term_put_tuple_element(request_opt, 1, request_tuple); + term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap); + + // reuse roots for args + roots[0] = term_get_tuple_element(mfa, 0); + roots[1] = term_get_tuple_element(mfa, 1); + roots[2] = arglist; + roots[3] = spawn_opts; + nif_erlang_spawn_opt(ctx, 4, roots); + break; + } default: printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation)); RAISE_ERROR(BADARG_ATOM); @@ -446,6 +505,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx) synclist_unlock(&ctx->global->dist_connections); } +void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global) +{ + int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0) + | (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0); + // allocate tuple + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap) + term control_message = term_alloc_tuple(5, &heap); + term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY)); + term_put_tuple_element(control_message, 1, req_id); + term_put_tuple_element(control_message, 2, to_pid); + term_put_tuple_element(control_message, 3, term_from_int(flags)); + term_put_tuple_element(control_message, 4, result); + + dist_enqueue_message(control_message, term_invalid_term(), connection, global); + END_WITH_STACK_HEAP(heap, global) +} + const struct Nif setnode_3_nif = { .base.type = NIFFunctionType, .nif_ptr = nif_erlang_setnode_3 diff --git a/src/libAtomVM/dist_nifs.h b/src/libAtomVM/dist_nifs.h index d96cfcba4..0a990d696 100644 --- a/src/libAtomVM/dist_nifs.h +++ b/src/libAtomVM/dist_nifs.h @@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif; extern const struct Nif dist_ctrl_get_data_nif; extern const struct Nif dist_ctrl_put_data_nif; +struct DistConnection; + void dist_send_message(term external_pid, term payload, Context *ctx); +/** + * @doc Setup a monitor on a local process for a distributed process. + * @end + * @param conn_obj object of the connection + * @param from_pid remote pid setting up the monitor + * @param target_proc atom (for registered process) or pid of the local + * process to monitor + * @param monitor_ref reference used for monitor + * @param ctx context for memory allocation + */ +term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx); + +/** + * @doc Send a spawn reply signal to a node + * @end + * @param conn_obj object of the connection + * @param req_id reference identifying the request + * @param to_pid (remote) process id identifying the caller + * @param link if a link was created + * @param monitor if a monitor was created + * @param result pid of the spawned process or atom for an error + * @param ctx context for memory allocation + */ +void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global); + #ifdef __cplusplus } #endif diff --git a/src/libAtomVM/nifs.c b/src/libAtomVM/nifs.c index 2537bd131..9dd58c31b 100644 --- a/src/libAtomVM/nifs.c +++ b/src/libAtomVM/nifs.c @@ -41,6 +41,7 @@ #include "defaultatoms.h" #include "dictionary.h" #include "dist_nifs.h" +#include "erl_nif_priv.h" #include "ets.h" #include "externalterm.h" #include "globalcontext.h" @@ -134,7 +135,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[]); static term nif_erlang_unregister_1(Context *ctx, int argc, term argv[]); static term nif_erlang_send_2(Context *ctx, int argc, term argv[]); static term nif_erlang_setelement_3(Context *ctx, int argc, term argv[]); -static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]); +// static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]); static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]); static term nif_erlang_whereis_1(Context *ctx, int argc, term argv[]); static term nif_erlang_system_time_1(Context *ctx, int argc, term argv[]); @@ -1222,6 +1223,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term) term link_term = interop_proplist_get_value(opts_term, LINK_ATOM); term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM); term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM); + term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM); if (min_heap_size_term != term_nil()) { if (UNLIKELY(!term_is_integer(min_heap_size_term))) { @@ -1303,6 +1305,33 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term) term_put_tuple_element(pid_ref_tuple, 1, ref); return pid_ref_tuple; + } else if (UNLIKELY(request_term != term_nil())) { + // Handling of spawn_request + // spawn_request requires that the reply is enqueued before + // any message from the spawned process + + term dhandle = term_get_tuple_element(request_term, 0); + term request_ref = term_get_tuple_element(request_term, 1); + term request_from = term_get_tuple_element(request_term, 2); + term request_opts = term_get_tuple_element(request_term, 3); + monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM); + // link_term = interop_proplist_get_value(request_opts, LINK_ATOM); + + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), dhandle, ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) { + RAISE_ERROR(BADARG_ATOM); + } + struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr; + + dist_spawn_reply(request_ref, request_from, false, monitor_term != term_nil(), new_pid, conn_obj, ctx->global); + + // Also setup monitor, if any. + if (monitor_term != term_nil()) { + dist_monitor(conn_obj, request_from, new_pid, request_ref, ctx); + } + + scheduler_init_ready(new_ctx); + return new_pid; } else { scheduler_init_ready(new_ctx); return new_pid; @@ -1360,7 +1389,7 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]) return do_spawn(ctx, new_ctx, opts_term); } -static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]) +term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]) { UNUSED(argc); diff --git a/src/libAtomVM/nifs.h b/src/libAtomVM/nifs.h index d3161d0ea..7e6a46fac 100644 --- a/src/libAtomVM/nifs.h +++ b/src/libAtomVM/nifs.h @@ -48,6 +48,8 @@ extern "C" { const struct Nif *nifs_get(AtomString module, AtomString function, int arity); +// spawn opt is used by distribution nifs +term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]); #ifdef __cplusplus } #endif diff --git a/src/libAtomVM/otp_net.c b/src/libAtomVM/otp_net.c index 037baf1c6..dede024b3 100644 --- a/src/libAtomVM/otp_net.c +++ b/src/libAtomVM/otp_net.c @@ -344,8 +344,18 @@ static term nif_net_gethostname(Context *ctx, int argc, term argv[]) } return make_error_tuple(posix_errno_to_term(errno, ctx->global), ctx); } - - size_t len = strlen(buf); + // Truncate name to first dot + char *end_str = buf; + while (1) { + char c = *end_str++; + if (c == 0) { + break; + } + if (c == '.') { + break; + } + } + size_t len = end_str - buf; if (UNLIKELY(memory_ensure_free_opt(ctx, TUPLE_SIZE(2) + LIST_SIZE(len, 1), MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { RAISE_ERROR(OUT_OF_MEMORY_ATOM); } diff --git a/tests/libs/estdlib/test_net_kernel.erl b/tests/libs/estdlib/test_net_kernel.erl index 0a02ee4d1..696349a8a 100644 --- a/tests/libs/estdlib/test_net_kernel.erl +++ b/tests/libs/estdlib/test_net_kernel.erl @@ -33,6 +33,8 @@ test() -> ok = setup(Platform), ok = test_ping_from_beam(Platform), ok = test_fail_with_wrong_cookie(Platform), + ok = test_rpc_from_beam(Platform), + ok = test_rpc_loop_from_beam(Platform), ok; false -> io:format("~s: skipped\n", [?MODULE]), @@ -92,6 +94,32 @@ test_fail_with_wrong_cookie(Platform) -> net_kernel:stop(), ok. +test_rpc_from_beam(Platform) -> + {ok, _NetKernelPid} = net_kernel:start(atomvm, #{name_domain => shortnames}), + Node = node(), + erlang:set_cookie('AtomVM'), + Result = execute_command( + Platform, + "erl -sname otp -setcookie AtomVM -eval \"R = rpc:call('" ++ atom_to_list(Node) ++ + "', erlang, system_info, [machine]), erlang:display(R).\" -s init stop -noshell" + ), + true = Result =:= lists:flatten(io_lib:format("~p\r\n", [Platform])), + net_kernel:stop(), + ok. + +test_rpc_loop_from_beam(Platform) -> + {ok, _NetKernelPid} = net_kernel:start(atomvm, #{name_domain => shortnames}), + Node = node(), + erlang:set_cookie('AtomVM'), + Result = execute_command( + Platform, + "erl -sname otp -setcookie AtomVM -eval \"R = lists:foldl(fun(X, Acc) -> R = rpc:call('" ++ atom_to_list(Node) ++ + "', erlang, system_info, [machine]), if Acc =:= R -> Acc; Acc =:= undefined -> R end end, undefined, lists:seq(1, 10)), erlang:display(R).\" -s init stop -noshell" + ), + true = Result =:= lists:flatten(io_lib:format("~p\r\n", [Platform])), + net_kernel:stop(), + ok. + % On AtomVM, we need to start kernel. setup("BEAM") -> ok;