From c2c05664cec831b6efc567aa6992e0ee6fc30734 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Sun, 5 Jan 2025 22:09:58 +0100 Subject: [PATCH] Introduce erlang distribution - Add support for handshake from OTP nodes - Add support for monitoring processes and sending messages to registered processes from OTP nodes Signed-off-by: Paul Guyot --- examples/erlang/CMakeLists.txt | 1 + examples/erlang/disterl.erl | 41 ++ libs/estdlib/src/CMakeLists.txt | 7 + libs/estdlib/src/application.erl | 37 ++ libs/estdlib/src/dist_util.erl | 482 ++++++++++++++++++++ libs/estdlib/src/erlang.erl | 89 +++- libs/estdlib/src/kernel.erl | 53 +++ libs/estdlib/src/net_kernel.erl | 318 +++++++++++++ libs/estdlib/src/net_kernel_sup.erl | 96 ++++ libs/estdlib/src/socket_dist.erl | 176 +++++++ libs/estdlib/src/socket_dist_controller.erl | 208 +++++++++ src/libAtomVM/CMakeLists.txt | 2 + src/libAtomVM/context.c | 5 +- src/libAtomVM/defaultatoms.def | 2 + src/libAtomVM/dist_nifs.c | 467 +++++++++++++++++++ src/libAtomVM/dist_nifs.h | 50 ++ src/libAtomVM/globalcontext.c | 6 +- src/libAtomVM/globalcontext.h | 2 + src/libAtomVM/nifs.c | 1 + src/libAtomVM/nifs.gperf | 4 + src/libAtomVM/opcodesswitch.h | 27 +- tests/libs/estdlib/CMakeLists.txt | 1 + tests/libs/estdlib/test_net_kernel.erl | 107 +++++ 23 files changed, 2165 insertions(+), 17 deletions(-) create mode 100644 examples/erlang/disterl.erl create mode 100644 libs/estdlib/src/application.erl create mode 100644 libs/estdlib/src/dist_util.erl create mode 100644 libs/estdlib/src/kernel.erl create mode 100644 libs/estdlib/src/net_kernel.erl create mode 100644 libs/estdlib/src/net_kernel_sup.erl create mode 100644 libs/estdlib/src/socket_dist.erl create mode 100644 libs/estdlib/src/socket_dist_controller.erl create mode 100644 src/libAtomVM/dist_nifs.c create mode 100644 src/libAtomVM/dist_nifs.h create mode 100644 tests/libs/estdlib/test_net_kernel.erl diff --git a/examples/erlang/CMakeLists.txt b/examples/erlang/CMakeLists.txt index 69f499584..46098c483 100644 --- a/examples/erlang/CMakeLists.txt +++ b/examples/erlang/CMakeLists.txt @@ -41,3 +41,4 @@ pack_runnable(mqtt_client mqtt_client estdlib eavmlib) pack_runnable(network_console network_console estdlib eavmlib alisp) pack_runnable(logging_example logging_example estdlib eavmlib) pack_runnable(http_client http_client estdlib eavmlib) +pack_runnable(disterl disterl estdlib) diff --git a/examples/erlang/disterl.erl b/examples/erlang/disterl.erl new file mode 100644 index 000000000..856d3ce1a --- /dev/null +++ b/examples/erlang/disterl.erl @@ -0,0 +1,41 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(disterl). + +-export([start/0]). + +start() -> + {ok, _KernelPid} = kernel:start(normal, []), + {ok, _NetKernelPid} = net_kernel:start('atomvm@127.0.0.1', #{name_domain => longnames}), + io:format("Distribution was started\n"), + io:format("Node is ~p\n", [node()]), + net_kernel:set_cookie(<<"AtomVM">>), + io:format("Cookie is ~s\n", [net_kernel:get_cookie()]), + register(disterl, self()), + io:format( + "This AtomVM node is waiting for 'quit' message, and this process is registered as 'disterl'\n" + ), + io:format("On an OTP node with long names distribution, run:\n"), + io:format("erlang:set_cookie('atomvm@127.0.0.1', 'AtomVM').\n"), + io:format("{disterl, 'atomvm@127.0.0.1'} ! quit.\n"), + receive + quit -> ok + end. diff --git a/libs/estdlib/src/CMakeLists.txt b/libs/estdlib/src/CMakeLists.txt index 7c7a7e049..0c8c01b0a 100644 --- a/libs/estdlib/src/CMakeLists.txt +++ b/libs/estdlib/src/CMakeLists.txt @@ -23,11 +23,13 @@ project(estdlib) include(BuildErlang) set(ERLANG_MODULES + application base64 binary calendar code crypto + dist_util erl_epmd erts_debug ets @@ -41,6 +43,9 @@ set(ERLANG_MODULES gen_tcp_inet gen_tcp_socket supervisor + kernel + net_kernel + net_kernel_sup inet io_lib io @@ -54,6 +59,8 @@ set(ERLANG_MODULES queue sets socket + socket_dist + socket_dist_controller ssl string timer diff --git a/libs/estdlib/src/application.erl b/libs/estdlib/src/application.erl new file mode 100644 index 000000000..b2cd9b221 --- /dev/null +++ b/libs/estdlib/src/application.erl @@ -0,0 +1,37 @@ +% +% This file is part of AtomVM. +% +% Copyright 2025 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(application). +-export([get_env/3]). +-export_type([start_type/0]). + +-type start_type() :: normal | {takeover, Node :: node()} | {failover, Node :: node()}. + +%%----------------------------------------------------------------------------- +%% @param Application application to get the parameter value of +%% @param Parameter parameter to get the value of +%% @param Default default value if parameter is not found +%% @returns default value +%% @doc Retrieve the value of the configuration parameter `Parameter' for +%% application `Application' or `Default' if not found. +%% @end +%%----------------------------------------------------------------------------- +-spec get_env(Application :: atom(), Parameter :: atom(), Default :: any()) -> any(). +get_env(_Application, _Parameter, Default) -> Default. diff --git a/libs/estdlib/src/dist_util.erl b/libs/estdlib/src/dist_util.erl new file mode 100644 index 000000000..087e46b8f --- /dev/null +++ b/libs/estdlib/src/dist_util.erl @@ -0,0 +1,482 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% +-module(dist_util). + +% This implementation is based on: +% Erlang OTP/27 distribution protocol documentation +% https://www.erlang.org/doc/apps/erts/erl_dist_protocol +% Erlang OTP/27 implementation +% https://github.com/erlang/otp/blob/maint-27/lib/kernel/src/dist_util.erl + +% The main differences with Erlang OTP/27 are as follows: +% - this implementation only accept latest protocol (epmd 6 and flags from OTP-26) +% - this implementation doesn't support named flag +% - net_kernel is responsible for handling cookies +% - interface with net_kernel is done through functions and eventually gen_server API +% - receive function shall return binaries and not lists + +% Public API +-export([ + net_ticker_spawn_options/0, + start_timer/1, + cancel_timer/1, + reset_timer/1, + handshake_other_started/1, + handshake_we_started/1, + shutdown/3, + shutdown/4 +]). + +% Required include headers +-include_lib("kernel/include/dist_util.hrl"). + +% Because this module can be compiled with OTP21 headers, we need to define +% flags that we support and that OTP-21 doesn't know. For this reason, we don't +% include dist.hrl but instead define the flags here. + +% OTP-21: +-define(DFLAG_PUBLISHED, 1). +-define(DFLAG_ATOM_CACHE, 2). +-define(DFLAG_EXTENDED_REFERENCES, 4). +-define(DFLAG_DIST_MONITOR, 8). +-define(DFLAG_FUN_TAGS, 16#10). +-define(DFLAG_DIST_MONITOR_NAME, 16#20). +-define(DFLAG_HIDDEN_ATOM_CACHE, 16#40). +-define(DFLAG_NEW_FUN_TAGS, 16#80). +-define(DFLAG_EXTENDED_PIDS_PORTS, 16#100). +-define(DFLAG_EXPORT_PTR_TAG, 16#200). +-define(DFLAG_BIT_BINARIES, 16#400). +-define(DFLAG_NEW_FLOATS, 16#800). +-define(DFLAG_UNICODE_IO, 16#1000). +-define(DFLAG_DIST_HDR_ATOM_CACHE, 16#2000). +-define(DFLAG_SMALL_ATOM_TAGS, 16#4000). +-define(DFLAG_UTF8_ATOMS, 16#10000). +-define(DFLAG_MAP_TAG, 16#20000). +-define(DFLAG_BIG_CREATION, 16#40000). +-define(DFLAG_SEND_SENDER, 16#80000). +-define(DFLAG_BIG_SEQTRACE_LABELS, 16#100000). +% OTP-22 +%% -define(DFLAG_NO_MAGIC, 16#200000). %% Used internally only +-define(DFLAG_EXIT_PAYLOAD, 16#400000). +-define(DFLAG_FRAGMENTS, 16#800000). +% OTP-23 +-define(DFLAG_HANDSHAKE_23, 16#01000000). +-define(DFLAG_SPAWN, 16#100000000). +-define(DFLAG_NAME_ME, 16#200000000). +% OTP-24 +-define(DFLAG_V4_NC, 16#400000000). +-define(DFLAG_ALIAS, 16#800000000). +% OTP-25 +-define(DFLAG_UNLINK_ID, 16#02000000). +-define(DFLAG_MANDATORY_25_DIGEST, 16#04000000). + +-define(MANDATORY_DFLAGS_25, + (?DFLAG_EXTENDED_REFERENCES bor + ?DFLAG_FUN_TAGS bor + ?DFLAG_EXTENDED_PIDS_PORTS bor + ?DFLAG_NEW_FUN_TAGS bor + ?DFLAG_EXPORT_PTR_TAG bor + ?DFLAG_BIT_BINARIES bor + ?DFLAG_NEW_FLOATS bor + ?DFLAG_UTF8_ATOMS bor + ?DFLAG_MAP_TAG bor + ?DFLAG_BIG_CREATION bor + ?DFLAG_HANDSHAKE_23) +). + +-define(MANDATORY_DFLAGS_26, + (?DFLAG_V4_NC bor + ?DFLAG_UNLINK_ID) +). + +% Analysis of erts_internal:get_dflags() on OTP-27: +% PREFERRED MANDATORY ALLOWED REJECTABLE +% ?DFLAG_PUBLISHED - - - - +% ?DFLAG_ATOM_CACHE - - - x +% ?DFLAG_EXTENDED_REFERENCES x x x - +% ?DFLAG_DIST_MONITOR x - x - +% ?DFLAG_FUN_TAGS x x x - +% ?DFLAG_DIST_MONITOR_NAME x - x - +% ?DFLAG_HIDDEN_ATOM_CACHE - - - x +% ?DFLAG_NEW_FUN_TAGS x x x - +% ?DFLAG_EXTENDED_PIDS_PORTS x x x - +% ?DFLAG_EXPORT_PTR_TAG x x x - +% ?DFLAG_BIT_BINARIES x x x - +% ?DFLAG_NEW_FLOATS x x x - +% ?DFLAG_UNICODE_IO x - x - +% ?DFLAG_DIST_HDR_ATOM_CACHE x - x x +% ?DFLAG_SMALL_ATOM_TAGS x - x - +% (reserved) +% ?DFLAG_UTF8_ATOMS x x x - +% ?DFLAG_MAP_TAG x x x - +% ?DFLAG_BIG_CREATION x x x - +% ?DFLAG_SEND_SENDER x - x - +% ?DFLAG_BIG_SEQTRACE_LABELS x - x - +% (?DFLAG_NO_MAGIC) +% ?DFLAG_EXIT_PAYLOAD x - x - +% ?DFLAG_FRAGMENTS x - x x +% ?DFLAG_HANDSHAKE_23 x x x - +% ?DFLAG_UNLINK_ID x x x - +% (?DFLAG_MANDATORY_25_DIGEST) +% (reserved) +% ?DFLAG_SPAWN x - x - +% (?DFLAG_NAME_ME) +% ?DFLAG_V4_NC x x x - +% ?DFLAG_ALIAS x - x - + +% All preferred flags can be added by remote node. +% Many flags cannot be rejected, and are thus mandatory if connection is +% initiated by OTP-27. + +-define(MANDATORY_DFLAGS, + (?MANDATORY_DFLAGS_26 bor ?MANDATORY_DFLAGS_25 bor + ?DFLAG_DIST_MONITOR bor + ?DFLAG_DIST_MONITOR_NAME bor + ?DFLAG_UNICODE_IO bor + ?DFLAG_SMALL_ATOM_TAGS bor + ?DFLAG_SEND_SENDER bor + ?DFLAG_BIG_SEQTRACE_LABELS bor + ?DFLAG_EXIT_PAYLOAD bor + ?DFLAG_SPAWN bor + ?DFLAG_ALIAS) +). + +-define(UNSUPPORTED_DFLAGS, (?DFLAG_NAME_ME)). + +-record(connection, { + kernel :: pid(), + node :: node(), + socket :: any(), + tick_intensity :: 4..1000, + get_stat, + send_tick, + tick_timeout :: non_neg_integer(), + last_read :: non_neg_integer(), + last_written :: non_neg_integer(), + dhandle :: reference() +}). + +-spec net_ticker_spawn_options() -> [any()]. +net_ticker_spawn_options() -> + [link]. + +-spec start_timer(pos_integer()) -> pid(). +start_timer(Timeout) -> + Self = self(), + spawn_link(fun() -> timer_loop(Self, Timeout) end). + +timer_loop(Pid, Timeout) -> + receive + {Pid, reset} -> timer_loop(Pid, Timeout) + after Timeout -> + ?shutdown(timer_timeout) + end. + +-spec cancel_timer(pid()) -> ok. +cancel_timer(Timer) -> + unlink(Timer), + exit(Timer, cancel_timer), + ok. + +-spec reset_timer(pid()) -> ok. +reset_timer(Timer) -> + Timer ! {self(), reset}, + ok. + +-spec handshake_other_started(#hs_data{}) -> no_return(). +handshake_other_started(#hs_data{socket = Socket, f_recv = Recv} = HSData0) -> + case Recv(Socket, 0, infinity) of + {ok, <<$N, Flags:64, Creation:32, NameLen:16, Rest/binary>>} -> + {Name, _} = split_binary(Rest, NameLen), + check_name(Name), + NodeName = binary_to_atom(Name, latin1), + HSData1 = HSData0#hs_data{ + other_node = NodeName, + other_started = true, + this_flags = ?MANDATORY_DFLAGS + }, + check_flags(Flags, HSData1), + mark_pending(HSData1), + Cookie = net_kernel:get_cookie(HSData1#hs_data.other_node), + <> = crypto:strong_rand_bytes(4), + send_challenge(Challenge, HSData1), + reset_timer(HSData1#hs_data.timer), + {OtherChallenge, OtherDigest} = recv_challenge_reply(HSData1), + check_challenge(Cookie, Challenge, OtherDigest, HSData1), + send_challenge_ack(Cookie, OtherChallenge, HSData1), + connection(HSData1, Creation); + {ok, Other} -> + ?shutdown({unexpected, Other}); + {error, Reason} -> + ?shutdown2(recv_error, Reason) + end. + +-spec handshake_we_started(#hs_data{}) -> no_return(). +handshake_we_started(#hs_data{}) -> ok. + +% We are connected +-spec connection(#hs_data{}, non_neg_integer()) -> no_return(). +connection( + #hs_data{ + kernel_pid = Kernel, + timer = Timer, + f_setopts_pre_nodeup = PreNodeUp, + f_setopts_post_nodeup = PostNodeUp, + f_handshake_complete = HandshakeComplete, + f_getll = GetLL, + f_address = Address, + mf_getstat = GetStat, + mf_tick = SendTick, + socket = Socket, + other_node = Node, + this_flags = Flags + } = HSData, + Creation +) -> + cancel_timer(Timer), + case PreNodeUp(Socket) of + ok -> ok; + Error1 -> ?shutdown2({Node, Socket}, Error1) + end, + LL = + case GetLL(Socket) of + {ok, LL0} -> LL0; + Error2 -> ?shutdown2({Node, Socket}, Error2) + end, + % We don't negotiate flags, other flags are our flags + DHandle = erlang:setnode(Node, LL, {Flags, Creation}), + AddressRecord = Address(Socket, Node), + TickIntensity = mark_nodeup(AddressRecord, HSData), + case PostNodeUp(Socket) of + ok -> ok; + Error3 -> ?shutdown2({Node, Socket}, Error3) + end, + case HandshakeComplete of + undefined -> ok; + _ -> HandshakeComplete(Socket, Node, DHandle) + end, + Connection = #connection{ + kernel = Kernel, + node = Node, + socket = Socket, + tick_intensity = TickIntensity, + get_stat = GetStat, + send_tick = SendTick, + tick_timeout = TickIntensity - 1, + last_read = 0, + last_written = 0, + dhandle = DHandle + }, + connection_loop(Connection). + +connection_loop(#connection{kernel = Kernel} = Connection) -> + receive + {Kernel, disconnect} -> + ?shutdown2({Connection#connection.node, Connection#connection.socket}, disconnected); + {Kernel, tick} -> + LastRead = Connection#connection.last_read, + LastWritten = Connection#connection.last_written, + TickTimeout = Connection#connection.tick_timeout, + Socket = Connection#connection.socket, + GetStat = Connection#connection.get_stat, + case GetStat(Socket) of + {ok, LastRead, _Write, _Pending} when TickTimeout =:= 0 -> + ?shutdown2( + {Connection#connection.node, Connection#connection.socket}, net_tick_timeout + ); + {ok, LastRead, LastWritten, 0} -> + % nothing read, nothing written and send queue is empty: send tick + SendTick = Connection#connection.send_tick, + SendTick(Socket), + connection_loop(Connection#connection{ + last_written = LastWritten + 1, tick_timeout = TickTimeout - 1 + }); + {ok, NewRead, LastWritten, _} -> + % nothing read and send queue is empty: send tick + SendTick = Connection#connection.send_tick, + SendTick(Socket), + connection_loop(Connection#connection{ + last_written = LastWritten + 1, + last_read = NewRead, + tick_timeout = Connection#connection.tick_intensity - 1 + }); + {ok, NewRead, NewWrite, _} -> + connection_loop(Connection#connection{ + last_read = NewRead, + last_written = NewWrite, + tick_timeout = Connection#connection.tick_intensity - 1 + }); + {error, Reason} -> + ?shutdown2({Connection#connection.node, Connection#connection.socket}, Reason) + end; + Other -> + io:format("Unexpected message in conn_loop : ~p\n", [Other]) + end. + +-spec check_flags(non_neg_integer(), #hs_data{}) -> ok. +check_flags(Flags0, HSData) -> + Flags1 = + if + Flags0 band ?DFLAG_MANDATORY_25_DIGEST -> + Flags0 bor ?MANDATORY_DFLAGS_25; + true -> + Flags0 + end, + CheckResult = + if + Flags1 band ?MANDATORY_DFLAGS =/= ?MANDATORY_DFLAGS -> + {error, {missing_flags, ?MANDATORY_DFLAGS bxor (Flags1 band ?MANDATORY_DFLAGS)}}; + Flags1 band ?UNSUPPORTED_DFLAGS =/= 0 -> + {error, {unsupported_flags, Flags1 band ?UNSUPPORTED_DFLAGS}}; + true -> + ok + end, + case CheckResult of + ok -> + ok; + {error, Reason} when HSData#hs_data.other_started -> + % send_status to say we don't accept this + send_status(<<"not_allowed">>, HSData), + ?shutdown(Reason); + {error, Reason} -> + ?shutdown(Reason) + end. + +% Ensure name is somewhat valid +-spec check_name(binary()) -> ok. +check_name(Name) -> + case binary:split(Name, <<"@">>, [global]) of + [<<_, _NamePartRest/binary>>, <<_, _HostPartRest/binary>>] -> + ok; + _Other -> + ?shutdown({unsupported_name, Name}) + end. + +-spec send_status(binary(), #hs_data{}) -> ok. +send_status(Status, #hs_data{socket = Socket, f_send = Send} = HSData) -> + case Send(Socket, <<$s, Status/binary>>) of + {error, _} = Error -> + ?shutdown2({HSData#hs_data.other_node, Socket}, {send_status_failed, Error}); + ok -> + ok + end. + +-spec recv_status_reply(#hs_data{}) -> binary(). +recv_status_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) -> + case Recv(Socket, 0, infinity) of + {ok, <<$s, Result/binary>>} -> + Result; + {ok, Other} -> + ?shutdown({HSData#hs_data.other_node, {unexpected, recv_status_reply, Other}}); + {error, Reason} -> + ?shutdown2({HSData#hs_data.other_node, recv_error}, Reason) + end. + +-spec mark_pending(#hs_data{}) -> ok. +mark_pending(#hs_data{kernel_pid = Kernel, this_node = ThisNode, other_node = OtherNode} = HSData) -> + case net_kernel:mark_pending(Kernel, ThisNode, OtherNode, self()) of + ok -> + send_status(<<"ok">>, HSData); + {ok_simultaneous, OtherConnPid} -> + send_status(<<"ok_simultaneous">>, HSData), + exit(OtherConnPid, shutdown); + nok -> + send_status(<<"nok">>, HSData), + ?shutdown(OtherNode); + alive -> + send_status(<<"alive">>, HSData), + reset_timer(HSData#hs_data.timer), + case recv_status_reply(HSData) of + <<"true">> -> ok; + <<"false">> -> ?shutdown(OtherNode); + Other -> ?shutdown({OtherNode, {unexpected, Other}}) + end + end. + +mark_nodeup(Address, #hs_data{kernel_pid = Kernel, other_node = Node}) -> + case net_kernel:mark_nodeup(Kernel, Node, Address, self()) of + {ok, TickIntensity} -> + TickIntensity; + {error, Reason} -> + ?shutdown2({mark_nodeup, Node, Address}, Reason) + end. + +send_challenge( + Challenge, + #hs_data{this_node = ThisNode, this_flags = ThisFlags, socket = Socket, f_send = Send} = HSData +) -> + Creation = atomvm:get_creation(), + NodeName = atom_to_binary(ThisNode, latin1), + NameLen = byte_size(NodeName), + case + Send(Socket, <<$N, ThisFlags:64, Challenge:32, Creation:32, NameLen:16, NodeName/binary>>) + of + {error, _} = Error -> + ?shutdown2({HSData#hs_data.other_node, Socket}, {send_challenge_failed, Error}); + ok -> + ok + end. + +-spec recv_challenge_reply(#hs_data{}) -> {non_neg_integer(), binary()}. +recv_challenge_reply(#hs_data{socket = Socket, f_recv = Recv} = HSData) -> + case Recv(Socket, 0, infinity) of + {ok, <<$r, OtherChallenge:32, OtherDigest:16/binary>>} -> + {OtherChallenge, OtherDigest}; + {ok, Other} -> + ?shutdown({HSData#hs_data.other_node, {unexpected, recv_challenge_reply, Other}}); + {error, Reason} -> + ?shutdown2({HSData#hs_data.other_node, recv_error}, Reason) + end. + +-spec check_challenge( + Cookie :: binary(), Challenge :: non_neg_integer(), Digest :: binary(), #hs_data{} +) -> ok. +check_challenge(Cookie, Challenge, Digest, HSData) -> + case gen_digest(Cookie, Challenge) =:= Digest of + true -> + ok; + false -> + ?shutdown({HSData#hs_data.other_node, invalid_challenge}) + end. + +-spec gen_digest(Cookie :: binary(), Challenge :: non_neg_integer()) -> binary(). +gen_digest(Cookie, Challenge) -> + crypto:hash(md5, [Cookie, integer_to_list(Challenge)]). + +-spec send_challenge_ack(Cookie :: binary(), Challenge :: non_neg_integer(), #hs_data{}) -> ok. +send_challenge_ack(Cookie, Challenge, #hs_data{socket = Socket, f_send = Send} = HSData) -> + Digest = gen_digest(Cookie, Challenge), + case Send(Socket, <<$a, Digest/binary>>) of + {error, _} = Error -> + ?shutdown2({HSData#hs_data.other_node, Socket}, {send_challenge_failed, Error}); + ok -> + ok + end. + +-spec shutdown(atom(), non_neg_integer(), term()) -> no_return(). +shutdown(Module, Line, Data) -> + shutdown(Module, Line, Data, shutdown). + +-spec shutdown(atom(), non_neg_integer(), term(), term()) -> no_return(). +shutdown(Module, Line, Data, Reason) -> + io:format("~s: shutting down connection ~p:~p, data=~p, reason=~p\n", [ + ?MODULE, Module, Line, Data, Reason + ]), + exit(Reason). diff --git a/libs/estdlib/src/erlang.erl b/libs/estdlib/src/erlang.erl index 16c5fb543..2f5750d38 100644 --- a/libs/estdlib/src/erlang.erl +++ b/libs/estdlib/src/erlang.erl @@ -110,7 +110,15 @@ timestamp/0, universaltime/0, localtime/0, - setnode/2 + setnode/2, + setnode/3, + get_cookie/0, + get_cookie/1, + set_cookie/1, + set_cookie/2, + dist_ctrl_get_data_notification/1, + dist_ctrl_get_data/1, + dist_ctrl_put_data/2 ]). -export_type([ @@ -1287,3 +1295,82 @@ localtime() -> -spec setnode(atom(), non_neg_integer()) -> true. setnode(_NodeName, _Creation) -> erlang:nif_error(undefined). + +%% @hidden +-spec setnode(node(), pid(), {non_neg_integer(), non_neg_integer()}) -> reference(). +setnode(_TargetNode, _ConnPid, _TargetFlagsCreation) -> + erlang:nif_error(undefined). + +%% @hidden +-spec dist_ctrl_get_data_notification(reference()) -> ok. +dist_ctrl_get_data_notification(_DHandle) -> + erlang:nif_error(undefined). + +%% @hidden +-spec dist_ctrl_get_data(reference()) -> none | binary(). +dist_ctrl_get_data(_DHandle) -> + erlang:nif_error(undefined). + +%% @hidden +-spec dist_ctrl_put_data(reference(), binary()) -> ok. +dist_ctrl_put_data(_DHandle, _Packet) -> + erlang:nif_error(undefined). + +%%----------------------------------------------------------------------------- +%% @returns The cookie used by this node +%% @doc Return the cookie used by this node or `nocookie' if +%% distribution was not started +%% @end +%%----------------------------------------------------------------------------- +-spec get_cookie() -> nocookie | atom(). +get_cookie() -> + try + CookieBin = net_kernel:get_cookie(), + ?MODULE:binary_to_atom(CookieBin, latin1) + catch + exit:{Reason, _} when + Reason =:= noproc; + Reason =:= shutdown; + Reason =:= killed + -> + nocookie + end. + +%%----------------------------------------------------------------------------- +%% @returns The cookie used to connect to another node +%% @doc Return the cookie used to connect to another node or `nocookie' if +%% distribution was not started +%% @end +%%----------------------------------------------------------------------------- +-spec get_cookie(node()) -> nocookie | atom(). +get_cookie(Node) -> + try + CookieBin = net_kernel:get_cookie(Node), + ?MODULE:binary_to_atom(CookieBin, latin1) + catch + exit:{Reason, _} when + Reason =:= noproc; + Reason =:= shutdown; + Reason =:= killed + -> + nocookie + end. + +%%----------------------------------------------------------------------------- +%% @param Cookie the cookie to use to connect to this node +%% @doc Set the cookie of this node. Fails if distribution was not started. +%% @end +%%----------------------------------------------------------------------------- +-spec set_cookie(Cookie :: atom()) -> ok. +set_cookie(Cookie) -> + net_kernel:set_cookie(?MODULE:atom_to_binary(Cookie, latin1)). + +%%----------------------------------------------------------------------------- +%% @param Node the node to set the cookie for +%% @param Cookie the cookie to use to connect to Node +%% @doc Set the cookie of a given node. Fails if distribution was not started. +%% @end +%%----------------------------------------------------------------------------- +-spec set_cookie(Node :: node(), Cookie :: atom()) -> ok. +set_cookie(Node, Cookie) -> + net_kernel:set_cookie(Node, ?MODULE:atom_to_binary(Cookie, latin1)). diff --git a/libs/estdlib/src/kernel.erl b/libs/estdlib/src/kernel.erl new file mode 100644 index 000000000..c099ece04 --- /dev/null +++ b/libs/estdlib/src/kernel.erl @@ -0,0 +1,53 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%%----------------------------------------------------------------------------- +%% @doc Root supervisor for the kernel application. +%% @end +%%----------------------------------------------------------------------------- +-module(kernel). +-behaviour(application). +-behaviour(supervisor). + +% application behavior +-export([ + start/2, + stop/1 +]). + +% supervisor behavior +-export([ + init/1 +]). + +%% @doc Start kernel application +-spec start(StartType :: application:start_type(), StartArgs :: any()) -> + {ok, pid()} | {error, any()}. +start(_StartType, []) -> + supervisor:start_link({local, kernel_sup}, ?MODULE, []). + +%% @hidden +stop(_State) -> + ok. + +%% @hidden +init([]) -> + SupFlags = #{strategy => one_for_all}, + {ok, {SupFlags, []}}. diff --git a/libs/estdlib/src/net_kernel.erl b/libs/estdlib/src/net_kernel.erl new file mode 100644 index 000000000..5840e0c36 --- /dev/null +++ b/libs/estdlib/src/net_kernel.erl @@ -0,0 +1,318 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%%----------------------------------------------------------------------------- +%% @doc An implementation of the Erlang/OTP net_kernel interface. +%% +%% This module implements a strict subset of the Erlang/OTP net_kernel +%% interface. +%% @end +%%----------------------------------------------------------------------------- +-module(net_kernel). + +-behaviour(gen_server). + +% Public API +-export([ + start/2, + stop/0, + get_state/0, + epmd_module/0 +]). + +% supervised callback +-export([ + start_link/1 +]). + +% Interface to dist_util +-export([ + mark_pending/4, + mark_nodeup/4, + get_cookie/0, + get_cookie/1, + set_cookie/1, + set_cookie/2 +]). + +% gen_server interface +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-define(SETUPTIME, 7000). +-define(NET_TICK_INTENSITY, 4). +-define(NET_TICK_TIME, 60). + +%%----------------------------------------------------------------------------- +%% @doc Start erlang distribution +%% @end +%% @param Name name of the node +%% @param Options options for distribution. Supported options are: +%% - `name_domain' : whether name should be short or long +%% - `proto_dist' : the module used for distribution (e.g. `socket_dist') +%%----------------------------------------------------------------------------- +-spec start(atom(), map()) -> {ok, pid()}. +start(Name, Options) when is_atom(Name) andalso is_map(Options) -> + ok = maps:foreach( + fun(Key, Val) -> + case Key of + name_domain when Val =:= shortnames orelse Val =:= longnames -> ok; + proto_dist when is_atom(Val) -> ok; + _ -> error({invalid_option, Key, Val}, [Name, Options]) + end + end, + Options + ), + net_kernel_sup:start(Options#{name => Name}); +start(Name, Options) when is_map(Options) -> + error(invalid_name, [Name, Options]); +start(Name, Options) -> + error(invalid_options, [Name, Options]). + +%%----------------------------------------------------------------------------- +%% @doc Stop erlang distribution +%% @end +%%----------------------------------------------------------------------------- +stop() -> + io:format("~s:~s\n", [?MODULE, ?FUNCTION_NAME]), + net_kernel_sup:stop(). + +%%----------------------------------------------------------------------------- +%% @doc Get state of erlang distribution +%% @end +%% @return a map describing state of distribution +%%----------------------------------------------------------------------------- +-spec get_state() -> map(). +get_state() -> + try + gen_server:call(net_kernel, get_state, infinity) + catch + exit:{Reason, _} when + Reason =:= noproc; + Reason =:= shutdown; + Reason =:= killed + -> + #{started => no} + end. + +epmd_module() -> + erl_epmd. + +%% @hidden +-spec mark_pending(pid(), node(), node(), pid()) -> ok | {ok_simultaneous, pid()} | nok | alive. +mark_pending(Kernel, ThisNode, OtherNode, ConnPid) -> + gen_server:call(Kernel, {mark_pending, ThisNode, OtherNode, ConnPid}). + +%% @hidden +-spec mark_nodeup(pid(), node(), term(), pid()) -> {ok, pos_integer()} | {error, any()}. +mark_nodeup(Kernel, OtherNode, OtherAddress, ConnPid) -> + gen_server:call(Kernel, {mark_nodeup, OtherNode, OtherAddress, ConnPid}). + +%% @hidden +-spec get_cookie() -> binary(). +get_cookie() -> + gen_server:call(net_kernel, get_cookie, infinity). + +%% @hidden +-spec get_cookie(node()) -> binary(). +get_cookie(Node) -> + gen_server:call(net_kernel, {get_cookie, Node}, infinity). + +%% @hidden +-spec set_cookie(binary()) -> ok. +set_cookie(Cookie) -> + gen_server:call(net_kernel, {set_cookie, Cookie}, infinity). + +%% @hidden +-spec set_cookie(node(), binary()) -> ok. +set_cookie(Node, Cookie) -> + gen_server:call(net_kernel, {set_cookie, Node, Cookie}, infinity). + +%% @hidden +start_link(Options) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Options, []). + +%%----------------------------------------------------------------------------- +-record(state, { + ticker :: pid(), + longnames :: boolean(), + node :: node(), + listen :: any(), + accept_pid :: pid(), + proto_dist :: module(), + connections :: map(), + cookies :: map(), + cookie :: binary() +}). + +%% @hidden +init(Options) -> + process_flag(trap_exit, true), + LongNames = maps:get(name_domain, Options, longnames) =:= longnames, + ProtoDist = maps:get(proto_dist, Options, socket_dist), + NameOption = maps:get(name, Options), + Cookie = crypto:strong_rand_bytes(16), + {Name, Node} = split_node(NameOption, LongNames), + TickInterval = (?NET_TICK_TIME * 1000) div ?NET_TICK_INTENSITY, + Self = self(), + Ticker = spawn_link(fun() -> ticker(Self, TickInterval) end), + case ProtoDist:listen(Name) of + {ok, {Listen, _Address, Creation}} -> + true = erlang:setnode(Node, Creation), + AcceptPid = ProtoDist:accept(Listen), + {ok, #state{ + ticker = Ticker, + longnames = LongNames, + node = Node, + listen = Listen, + accept_pid = AcceptPid, + proto_dist = ProtoDist, + connections = maps:new(), + cookies = maps:new(), + cookie = Cookie + }}; + {error, Reason} -> + {stop, Reason} + end. + +%% @hidden +handle_call(get_state, _From, #state{longnames = Longnames} = State) -> + NameDomain = + case Longnames of + true -> longnames; + false -> shortnames + end, + Result = #{ + started => dynamic, + name_type => static, + name => node(), + name_domain => NameDomain + }, + {reply, Result, State}; +handle_call( + {mark_pending, ThisNode, OtherNode, ConnPid}, _From, #state{connections = Connections} = State0 +) -> + case maps:find(OtherNode, Connections) of + error -> + {reply, ok, State0#state{ + connections = maps:put(OtherNode, {pending, ConnPid}, Connections) + }}; + {ok, {pending, OtherConnPid}} when OtherNode > ThisNode -> + {reply, {ok_simultaneous, OtherConnPid}, State0#state{ + connections = maps:update(OtherNode, {pending, ConnPid}, Connections) + }}; + {ok, {pending, _OtherConnPid}} -> + {reply, nok, State0}; + {ok, {alive, _ConnPid, _Address}} -> + {reply, alive, State0} + end; +handle_call( + {mark_nodeup, OtherNode, OtherAddress, ConnPid}, + _From, + #state{connections = Connections0} = State0 +) -> + Connections1 = maps:update(OtherNode, {alive, ConnPid, OtherAddress}, Connections0), + State1 = State0#state{connections = Connections1}, + {reply, {ok, ?NET_TICK_INTENSITY}, State1}; +handle_call(get_cookie, _From, #state{cookie = Cookie} = State) -> + {reply, Cookie, State}; +handle_call({get_cookie, Node}, _From, #state{cookie = Cookie, cookies = Cookies} = State) -> + case maps:find(Node, Cookies) of + error -> {reply, Cookie, State}; + {ok, NodeCookie} -> {reply, NodeCookie, State} + end; +handle_call({set_cookie, Cookie}, _From, #state{} = State) -> + {reply, ok, State#state{cookie = Cookie}}; +handle_call({set_cookie, Node, Cookie}, _From, #state{cookies = Cookies0} = State) -> + Cookies1 = maps:put(Node, Cookie, Cookies0), + {reply, ok, State#state{cookies = Cookies1}}; +handle_call({is_auth, _Node}, _From, State) -> + {reply, yes, State}. + +%% @hidden +handle_cast(_Message, State) -> + {noreply, State}. + +%% @hidden +handle_info({accept, AcceptPid, SocketPid, inet, tcp}, #state{proto_dist = ProtoDist} = State) -> + Pid = ProtoDist:accept_connection(AcceptPid, SocketPid, State#state.node, [], ?SETUPTIME), + AcceptPid ! {self(), controller, Pid}, + {noreply, State}; +handle_info(tick, #state{connections = Connections} = State) -> + maps:fold( + fun(_Node, Status, ok) -> + case Status of + {alive, ConnPid, _Address} -> + ConnPid ! {self(), tick}; + _ -> + ok + end + end, + ok, + Connections + ), + {noreply, State}; +handle_info({'EXIT', Ticker, _Reason}, #state{ticker = Ticker} = State) -> + TickInterval = (?NET_TICK_TIME * 1000) div ?NET_TICK_INTENSITY, + Self = self(), + NewTicker = spawn_link(fun() -> ticker(Self, TickInterval) end), + {noreply, State#state{ticker = NewTicker}}; +handle_info( + {'EXIT', AcceptPid, _Reason}, + #state{accept_pid = AcceptPid, listen = Listen, proto_dist = ProtoDist} = State +) -> + NewAcceptPid = ProtoDist:accept(Listen), + {noreply, State#state{accept_pid = NewAcceptPid}}; +handle_info({'EXIT', Pid, _Reason}, #state{connections = Connections} = State) -> + NewConnections = maps:filter( + fun(_Node, Status) -> + case Status of + {alive, Pid, _Address} -> false; + {pending, Pid} -> false; + _ -> true + end + end, + Connections + ), + {noreply, State#state{connections = NewConnections}}. + +%% @hidden +terminate(_Reason, State) -> + io:format("~s:~s\n", [?MODULE, ?FUNCTION_NAME]), + {ok, State}. + +split_node(NameOption, true) -> + [NamePart, _HostPart] = binary:split(atom_to_binary(NameOption, utf8), <<"@">>, [global]), + {NamePart, NameOption}; +split_node(NameOption, false) -> + {ok, Hostname} = net:gethostname(), + {atom_to_list(NameOption), list_to_atom(atom_to_list(NameOption) ++ "@" ++ Hostname)}. + +ticker(Kernel, TickInterval) -> + receive + after TickInterval -> ok + end, + Kernel ! tick, + ticker(Kernel, TickInterval). diff --git a/libs/estdlib/src/net_kernel_sup.erl b/libs/estdlib/src/net_kernel_sup.erl new file mode 100644 index 000000000..19e0cd480 --- /dev/null +++ b/libs/estdlib/src/net_kernel_sup.erl @@ -0,0 +1,96 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%%----------------------------------------------------------------------------- +%% @doc Supervisor for erlang distribution. +%% +%% This module matches OTP's undocumented erl_distribution module and serves +%% as the supervisor for everything erlang distribution related. +%% @end +%%----------------------------------------------------------------------------- +-module(net_kernel_sup). + +-behaviour(supervisor). + +% api to net_kernel +-export([ + start/1, + stop/0 +]). + +% supervisor interface +-export([ + init/1 +]). + +% supervised callback +-export([ + start_link/1 +]). + +%% @hidden +-spec start(map()) -> {ok, pid()} | {error, any()}. +start(Options) -> + ChildSpec = #{ + id => ?MODULE, + start => {?MODULE, start_link, [Options]}, + restart => permanent, + shutdown => 1000, + type => supervisor, + modules => [?MODULE] + }, + supervisor:start_child(kernel_sup, ChildSpec). + +%% @hidden +-spec stop() -> ok | {error, any()}. +stop() -> + io:format("~s:~s\n", [?MODULE, ?FUNCTION_NAME]), + case supervisor:terminate_child(kernel_sup, ?MODULE) of + ok -> + supervisor:delete_child(kernel_sup, ?MODULE); + Error -> + Error + end. + +%% @hidden +start_link(Options) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [Options]). + +init(Options) -> + ChildrenSpec = [ + #{ + id => erl_epmd, + start => {erl_epmd, start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => [erl_epmd] + }, + #{ + id => net_kernel, + start => {net_kernel, start_link, Options}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => [net_kernel] + } + ], + SupFlags = #{strategy => one_for_all}, + {ok, {SupFlags, ChildrenSpec}}. diff --git a/libs/estdlib/src/socket_dist.erl b/libs/estdlib/src/socket_dist.erl new file mode 100644 index 000000000..dea43895a --- /dev/null +++ b/libs/estdlib/src/socket_dist.erl @@ -0,0 +1,176 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% +-module(socket_dist). + +% dist interface +-export([ + listen/1, + accept/1, + accept_connection/5, + setup/5, + close/1, + select/1, + address/0 +]). + +% Required include headers +-include_lib("kernel/include/net_address.hrl"). +-include_lib("kernel/include/dist.hrl"). +-include_lib("kernel/include/dist_util.hrl"). + +-spec listen(string()) -> {ok, {any(), #net_address{}, pos_integer()}} | {error, any()}. +listen(Name) -> + {ok, LSock} = socket:open(inet, stream, tcp), + ok = socket:bind(LSock, #{ + family => inet, + port => 0, + addr => {0, 0, 0, 0} + }), + ok = socket:listen(LSock), + {ok, #{addr := Addr, port := Port}} = socket:sockname(LSock), + ErlEpmd = net_kernel:epmd_module(), + Address = #net_address{ + host = Addr, + protocol = tcp, + family = inet + }, + case ErlEpmd:register_node(Name, Port) of + {ok, Creation} -> + {ok, {LSock, Address, Creation}}; + Error -> + Error + end. + +-spec address() -> #net_address{}. +address() -> + #net_address{ + host = {0, 0, 0, 0}, + protocol = tcp, + family = inet + }. + +-spec accept(any()) -> pid(). +accept(Listen) -> + Kernel = self(), + spawn_link(fun() -> accept_loop(Kernel, Listen) end). + +accept_loop(Kernel, LSock) -> + case socket:accept(LSock) of + {ok, CSock} -> + {ok, DistController} = socket_dist_controller:start(CSock), + Kernel ! {accept, self(), DistController, inet, tcp}, + receive + {Kernel, controller, SupervisorPid} -> + true = socket_dist_controller:supervisor(DistController, SupervisorPid); + {Kernel, unsupported_protocol} -> + exit(unsupported_protocol) + end, + accept_loop(Kernel, LSock); + {error, _} = Error -> + exit(Error) + end. + +accept_connection(_AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + Kernel = self(), + spawn_opt( + fun() -> do_accept_connection(Kernel, DistCtrl, MyNode, Allowed, SetupTime) end, + dist_util:net_ticker_spawn_options() + ). + +do_accept_connection(Kernel, DistCtrl, MyNode, Allowed, SetupTime) -> + Timer = dist_util:start_timer(SetupTime), + HSData = hs_data(Kernel, MyNode, DistCtrl, Allowed, undefined, undefined, undefined, Timer), + dist_util:handshake_other_started(HSData). + +hs_data(Kernel, MyNode, DistCtrl, Allowed, OtherNode, OtherVersion, Type, Timer) -> + #hs_data{ + kernel_pid = Kernel, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + allowed = Allowed, + other_node = OtherNode, + other_version = OtherVersion, + f_send = fun socket_dist_controller:send/2, + f_recv = fun socket_dist_controller:recv/3, + f_setopts_pre_nodeup = fun socket_dist_controller:setopts_pre_nodeup/1, + f_setopts_post_nodeup = fun socket_dist_controller:setopts_post_nodeup/1, + f_getll = fun socket_dist_controller:getll/1, + f_address = fun socket_dist_controller:address/2, + f_handshake_complete = fun socket_dist_controller:handshake_complete/3, + mf_tick = fun socket_dist_controller:tick/1, + mf_getstat = fun socket_dist_controller:getstat/1, + request_type = Type + }. + +setup(Node, Type, MyNode, LongOrShortNames, SetupTime) -> + Kernel = self(), + spawn_opt( + fun() -> do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) end, + dist_util:net_ticker_spawn_options() + ). + +do_setup(Kernel, Node, Type, MyNode, _LongOrShortNames, SetupTime) -> + case string:split(atom_to_list(Node), "@") of + [Name, Address] -> + Timer = dist_util:start_timer(SetupTime), + case inet:getaddr(Address, inet) of + {ok, Ip} -> + dist_util:reset_timer(Timer), + ErlEpmd = net_kernel:epmd_module(), + case ErlEpmd:port_please(Name, Ip) of + {port, TcpPort, Version} -> + dist_util:reset_timer(Timer), + {ok, Sock} = socket:open(inet, stream, tcp), + case + socket:connect(Sock, #{family => inet, addr => Ip, port => TcpPort}) + of + ok -> + {ok, DistController} = socket_dist_controller:start(Sock), + HSData = hs_data( + Kernel, + MyNode, + DistController, + [], + Node, + Version, + Type, + Timer + ), + dist_util:handshake_we_started(HSData); + _ -> + ?shutdown(Node) + end; + _ -> + ?shutdown(Node) + end; + _ -> + ?shutdown(Node) + end; + _ -> + ?shutdown(Node) + end. + +close(Listen) -> + socket:close(Listen). + +select(_Node) -> + true. diff --git a/libs/estdlib/src/socket_dist_controller.erl b/libs/estdlib/src/socket_dist_controller.erl new file mode 100644 index 000000000..7c7cb61f8 --- /dev/null +++ b/libs/estdlib/src/socket_dist_controller.erl @@ -0,0 +1,208 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% +-module(socket_dist_controller). + +% Required include headers +-include_lib("kernel/include/net_address.hrl"). + +% BEAM's dist_util expect packets to be list of integers. +-ifdef(BEAM_INTERFACE). +-define(POST_PROCESS(Packet), binary_to_list(Packet)). +-define(PRE_PROCESS(Packet), iolist_to_binary(Packet)). +-else. +-define(POST_PROCESS(Packet), Packet). +-define(PRE_PROCESS(Packet), Packet). +-endif. + +% interface with socket_dist +-export([ + start/1, + supervisor/2, + recv/3, + send/2, + setopts_pre_nodeup/1, + setopts_post_nodeup/1, + getll/1, + address/2, + tick/1, + getstat/1, + handshake_complete/3 +]). + +% gen_server API +-behaviour(gen_server). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +start(Socket) -> + gen_server:start(?MODULE, Socket, []). + +supervisor(Controller, Pid) -> + gen_server:call(Controller, {supervisor, Pid}). + +recv(Controller, Length, Timeout) -> + Socket = gen_server:call(Controller, socket), + Result = + case socket:recv(Socket, 2 + Length, Timeout) of + {ok, <>} -> + Remaining = Len - byte_size(Tail), + case Remaining of + 0 -> + {ok, Tail}; + _ -> + case socket:recv(Socket, Remaining, Timeout) of + {ok, Packet} -> + {ok, ?POST_PROCESS(<>)}; + {error, _} = Err0 -> + Err0 + end + end; + {error, _} = Err1 -> + Err1 + end, + Result. + +send(Controller, Data) -> + Socket = gen_server:call(Controller, socket), + DataBin = iolist_to_binary(Data), + DataSize = byte_size(DataBin), + socket:send(Socket, <>). + +setopts_pre_nodeup(_Controller) -> + ok. + +setopts_post_nodeup(_Controller) -> + ok. + +getll(Controller) -> + {ok, Controller}. + +address(Controller, Node) -> + Socket = gen_server:call(Controller, socket), + {ok, #{addr := Addr, port := Port}} = socket:peername(Socket), + case string:split(atom_to_list(Node), "@") of + [_Name, Host] -> + #net_address{address = {Addr, Port}, host = Host, protocol = tcp, family = inet}; + _ -> + {error, no_node} + end. + +tick(Controller) -> + gen_server:cast(Controller, tick). + +getstat(Controller) -> + gen_server:call(Controller, getstat). + +handshake_complete(Controller, _Node, DHandle) -> + gen_server:cast(Controller, {handshake_complete, DHandle}), + ok. + +-record(state, { + socket :: socket:socket(), + dhandle :: reference() | undefined, + select_handle :: reference() | undefined, + buffer :: binary(), + received :: non_neg_integer(), + sent :: non_neg_integer() +}). + +init(Socket) -> + {ok, #state{socket = Socket, buffer = <<>>, received = 0, sent = 0}}. + +handle_call({supervisor, Pid}, _From, #state{} = State0) -> + Result = link(Pid), + {reply, Result, State0}; +handle_call(socket, _From, #state{socket = Socket} = State) -> + {reply, Socket, State}; +handle_call(getstat, _From, #state{received = Received, sent = Sent} = State) -> + {reply, {ok, Received, Sent, 0}, State}. + +handle_cast(tick, #state{socket = Socket, sent = Sent} = State) -> + socket:send(Socket, <<0:32>>), + {noreply, State#state{sent = Sent + 1}}; +handle_cast({handshake_complete, DHandle}, State0) -> + ok = erlang:dist_ctrl_get_data_notification(DHandle), + % We now need to get messages when data is coming. + State1 = State0#state{dhandle = DHandle}, + State2 = recv_data_loop(State1), + {noreply, State2}. + +handle_info(dist_data, State0) -> + State1 = send_data_loop(State0), + {noreply, State1}; +handle_info( + {'$socket', Socket, select, SelectHandle}, + #state{socket = Socket, select_handle = SelectHandle} = State0 +) -> + State1 = recv_data_loop(State0), + {noreply, State1}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +recv_data_loop( + #state{dhandle = DHandle, socket = Socket, buffer = Buffer, received = Received} = State +) -> + case socket:recv(Socket, 0, nowait) of + {ok, Data} -> + {NewBuffer, NewReceived} = process_recv_buffer( + DHandle, <>, Received + ), + recv_data_loop(State#state{buffer = NewBuffer, received = NewReceived}); + {select, {{select_info, recv, SelectHandle}, Data}} when is_reference(SelectHandle) -> + {NewBuffer, NewReceived} = process_recv_buffer( + DHandle, <>, Received + ), + State#state{buffer = NewBuffer, select_handle = SelectHandle, received = NewReceived}; + {select, {select_info, recv, SelectHandle}} when is_reference(SelectHandle) -> + State#state{select_handle = SelectHandle} + end. + +process_recv_buffer(DHandle, <>, Received) when byte_size(Rest) >= Size -> + {Packet, Tail} = split_binary(Rest, Size), + case Packet of + <<>> -> ok; + _ -> ok = erlang:dist_ctrl_put_data(DHandle, Packet) + end, + process_recv_buffer(DHandle, Tail, Received + 1); +process_recv_buffer(_DHandle, Other, Received) -> + {Other, Received}. + +send_data_loop(#state{dhandle = DHandle, socket = Socket, sent = Sent} = State) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + ok = erlang:dist_ctrl_get_data_notification(DHandle), + State; + Data -> + DataBin = ?PRE_PROCESS(Data), + DataSize = byte_size(DataBin), + socket:send(Socket, <>), + send_data_loop(State#state{sent = Sent + 1}) + end. diff --git a/src/libAtomVM/CMakeLists.txt b/src/libAtomVM/CMakeLists.txt index 80046a695..3af3819ec 100644 --- a/src/libAtomVM/CMakeLists.txt +++ b/src/libAtomVM/CMakeLists.txt @@ -33,6 +33,7 @@ set(HEADER_FILES defaultatoms.def defaultatoms.h dictionary.h + dist_nifs.h erl_nif.h erl_nif_priv.h ets.h @@ -83,6 +84,7 @@ set(SOURCE_FILES debug.c defaultatoms.c dictionary.c + dist_nifs.c ets.c ets_hashtable.c externalterm.c diff --git a/src/libAtomVM/context.c b/src/libAtomVM/context.c index 1e8741f58..0242ce6df 100644 --- a/src/libAtomVM/context.c +++ b/src/libAtomVM/context.c @@ -147,9 +147,6 @@ void context_destroy(Context *ctx) // Eventually call resource monitors handlers after the processes table was unlocked // The monitors were removed from the list of monitors. if (resource_monitor) { - ErlNifEnv env; - erl_nif_env_partial_init_from_globalcontext(&env, ctx->global); - struct ListHead monitors; list_prepend(&resource_monitor->base.monitor_list_head, &monitors); @@ -159,7 +156,7 @@ void context_destroy(Context *ctx) struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head); void *resource = term_to_term_ptr(monitor->monitor_obj); struct RefcBinary *refc = refc_binary_from_data(resource); - refc->resource_type->down(&env, resource, &ctx->process_id, &monitor->ref_ticks); + refc->resource_type->down(erl_nif_env_from_context(ctx), resource, &ctx->process_id, &monitor->ref_ticks); free(monitor); } } diff --git a/src/libAtomVM/defaultatoms.def b/src/libAtomVM/defaultatoms.def index c19da3f47..6146ec573 100644 --- a/src/libAtomVM/defaultatoms.def +++ b/src/libAtomVM/defaultatoms.def @@ -176,3 +176,5 @@ X(INET_ATOM, "\x4", "inet") X(TIMEOUT_ATOM, "\x7", "timeout") X(SHUTDOWN_ATOM, "\x8", "shutdown") + +X(DIST_DATA_ATOM, "\x9", "dist_data") diff --git a/src/libAtomVM/dist_nifs.c b/src/libAtomVM/dist_nifs.c new file mode 100644 index 000000000..b58141e8a --- /dev/null +++ b/src/libAtomVM/dist_nifs.c @@ -0,0 +1,467 @@ +/* + * This file is part of AtomVM. + * + * Copyright 2024 Paul Guyot + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +/** + * @file dist_nifs.c + * @brief Implementation of distribution NIFs and resources + */ + +#include "dist_nifs.h" +#include "defaultatoms.h" +#include "erl_nif.h" +#include "erl_nif_priv.h" +#include "externalterm.h" +#include "globalcontext.h" +#include "list.h" +#include "memory.h" +#include "nifs.h" +#include "synclist.h" +#include "term.h" +#include "utils.h" + +enum +{ + OPERATION_LINK = 1, + // OPERATION_SEND = 2, + // OPERATION_EXIT = 3, + // OPERATION_UNLINK = 4, + OPERATION_NODE_LINK = 5, + OPERATION_REG_SEND = 6, + OPERATION_GROUP_LEADER = 7, + // OPERATION_EXIT2 = 8, + // OPERATION_SEND_TT = 12, + // OPERATION_EXIT_TT = 13, + OPERATION_REG_SEND_TT = 16, + // OPERATION_EXIT2_TT = 18, + OPERATION_MONITOR_P = 19, + OPERATION_DEMONITOR_P = 20, + // OPERATION_MONITOR_P_EXIT = 21, + OPERATION_SEND_SENDER = 22, + OPERATION_SEND_SENDER_TT = 23, + OPERATION_PAYLOAD_EXIT = 24, + OPERATION_PAYLOAD_EXIT_TT = 25, + OPERATION_PAYLOAD_EXIT2 = 26, + OPERATION_PAYLOAD_EXIT2_TT = 27, + OPERATION_PAYLOAD_MONITOR_P_EXIT = 28, + OPERATION_SPAWN_REQUEST = 29, + OPERATION_SPAWN_REQUEST_TT = 30, + OPERATION_SPAWN_REPLY = 31, + OPERATION_SPAWN_REPLY_TT = 32, + OPERATION_UNLINK_ID = 35, + OPERATION_UNLINK_ID_ACK = 36, + OPERATION_ALIAS_SEND = 33, + OPERATION_ALIAS_SEND_TT = 34, +}; + +struct DistributionPacket +{ + struct ListHead head; + size_t size; + uint8_t bytes[]; +}; + +struct RemoteMonitor +{ + struct ListHead head; + term target_proc; // atom or local pid + uint8_t ref_len; + uint32_t ref_words[5]; + uint32_t pid_number; + uint32_t pid_serial; + ErlNifMonitor process_monitor; +}; + +struct DistConnection +{ + struct ListHead head; + int node_atom_index; + uint32_t node_creation; + int32_t selecting_process_id; + ErlNifMonitor connection_process_monitor; + struct SyncList remote_monitors; + struct SyncList pending_packets; +}; + +static void dist_connection_dtor(ErlNifEnv *caller_env, void *obj) +{ + struct DistConnection *conn_obj = (struct DistConnection *) obj; + enif_demonitor_process(caller_env, conn_obj, &conn_obj->connection_process_monitor); + synclist_remove(&caller_env->global->dist_connections, &conn_obj->head); +} + +static void dist_enqueue_message(term control_message, term payload, struct DistConnection *connection, GlobalContext *global) +{ + size_t control_message_size = 0; // some compilers including esp-idf 5.0.7 is not smart enough + enum ExternalTermResult serialize_result = externalterm_compute_external_size(control_message, &control_message_size, global); + if (LIKELY(serialize_result == EXTERNAL_TERM_OK)) { + size_t payload_size = 0; + if (!term_is_invalid_term(payload)) { + serialize_result = externalterm_compute_external_size(payload, &payload_size, global); + } + if (LIKELY(serialize_result == EXTERNAL_TERM_OK)) { + struct DistributionPacket *packet = malloc(sizeof(struct DistributionPacket) + 1 + control_message_size + payload_size); + if (LIKELY(packet != NULL)) { + packet->size = 1 + control_message_size + payload_size; + packet->bytes[0] = 112; + externalterm_serialize_term(&packet->bytes[1], control_message, global); + if (!term_is_invalid_term(payload)) { + externalterm_serialize_term(&packet->bytes[1 + control_message_size], payload, global); + } + // Use the lock on the list of pending packets to notify process + struct ListHead *pending_packets = synclist_wrlock(&connection->pending_packets); + list_append(pending_packets, &packet->head); + int32_t selecting_process_id = connection->selecting_process_id; + if (selecting_process_id != INVALID_PROCESS_ID) { + connection->selecting_process_id = INVALID_PROCESS_ID; + } + synclist_unlock(&connection->pending_packets); + if (selecting_process_id != INVALID_PROCESS_ID) { + globalcontext_send_message(global, selecting_process_id, DIST_DATA_ATOM); + } + } + } + } +} + +static void dist_enqueue_send_sender_message(int32_t local_process_id, term remote_process_id, term payload, struct DistConnection *connection, GlobalContext *global) +{ + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(2) + EXTERNAL_PID_SIZE, heap) + term control_message = term_alloc_tuple(3, &heap); + term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SEND_SENDER)); + term_put_tuple_element(control_message, 1, term_from_local_process_id(local_process_id)); + term_put_tuple_element(control_message, 2, remote_process_id); + + dist_enqueue_message(control_message, payload, connection, global); + END_WITH_STACK_HEAP(heap, global) +} + +static void dist_enqueue_monitor_exit_message(struct RemoteMonitor *monitor, term reason, struct DistConnection *connection, GlobalContext *global) +{ + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(4) + EXTERNAL_PID_SIZE + EXTERNAL_REF_SIZE(5), heap) + term control_message = term_alloc_tuple(4, &heap); + term_put_tuple_element(control_message, 0, term_from_int(OPERATION_PAYLOAD_MONITOR_P_EXIT)); + term_put_tuple_element(control_message, 1, monitor->target_proc); + term external_pid = term_make_external_process_id(term_from_atom_index(connection->node_atom_index), monitor->pid_number, monitor->pid_serial, connection->node_creation, &heap); + term_put_tuple_element(control_message, 2, external_pid); + term external_ref = term_make_external_reference(term_from_atom_index(connection->node_atom_index), monitor->ref_len, monitor->ref_words, connection->node_creation, &heap); + term_put_tuple_element(control_message, 3, external_ref); + + dist_enqueue_message(control_message, reason, connection, global); + END_WITH_STACK_HEAP(heap, global) +} + +static void dist_connection_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon) +{ + UNUSED(pid); + UNUSED(mon); + + struct DistConnection *conn_obj = (struct DistConnection *) obj; + + if (UNLIKELY(enif_compare_monitors(&conn_obj->connection_process_monitor, mon) == 0)) { + struct RefcBinary *rsrc_refc = refc_binary_from_data(obj); + refc_binary_decrement_refcount(rsrc_refc, caller_env->global); + } else { + struct ListHead *remote_monitors = synclist_wrlock(&conn_obj->remote_monitors); + struct ListHead *item; + LIST_FOR_EACH (item, remote_monitors) { + struct RemoteMonitor *remote_monitor = GET_LIST_ENTRY(item, struct RemoteMonitor, head); + if (enif_compare_monitors(&remote_monitor->process_monitor, mon) == 0) { + // Found monitor. + // We need to find out the exit reason. This function is called + // from context_destroy. The process is no longer in the table + // and cannot be found anymore. However, it is invoked from + // caller_env which really is the destroyed process. + term reason; + if (erl_nif_env_is_context(caller_env)) { + Context *ctx = (Context *) caller_env; + reason = ctx->exit_reason; + } else { + reason = UNDEFINED_ATOM; + } + dist_enqueue_monitor_exit_message(remote_monitor, reason, conn_obj, caller_env->global); + list_remove(&remote_monitor->head); + free(remote_monitor); + break; + } + } + } + synclist_unlock(&conn_obj->remote_monitors); +} + +const ErlNifResourceTypeInit dist_connection_resource_type_init = { + .members = 3, + .dtor = dist_connection_dtor, + .stop = NULL, + .down = dist_connection_down, +}; + +static term nif_erlang_setnode_3(Context *ctx, int argc, term argv[]) +{ + UNUSED(argc); + + VALIDATE_VALUE(argv[0], term_is_atom); + VALIDATE_VALUE(argv[1], term_is_local_pid_or_port); + VALIDATE_VALUE(argv[2], term_is_tuple); + if (UNLIKELY(term_get_tuple_arity(argv[2]) != 2)) { + RAISE_ERROR(BADARG_ATOM); + } + VALIDATE_VALUE(term_get_tuple_element(argv[2], 0), term_is_any_integer); + VALIDATE_VALUE(term_get_tuple_element(argv[2], 1), term_is_any_integer); + // Ignore flags for now + uint32_t creation = term_maybe_unbox_int(term_get_tuple_element(argv[2], 1)); + int node_atom_index = term_to_atom_index(argv[0]); + + // Ensure we don't already know this node. + struct ListHead *dist_connections = synclist_wrlock(&ctx->global->dist_connections); + struct ListHead *item; + LIST_FOR_EACH (item, dist_connections) { + struct DistConnection *dist_connection = GET_LIST_ENTRY(item, struct DistConnection, head); + if (dist_connection->node_atom_index == node_atom_index && dist_connection->node_creation == creation) { + synclist_unlock(&ctx->global->dist_connections); + RAISE_ERROR(BADARG_ATOM); + } + } + + // Create a resource object + struct DistConnection *conn_obj = enif_alloc_resource(ctx->global->dist_connection_resource_type, sizeof(struct DistConnection)); + if (IS_NULL_PTR(conn_obj)) { + synclist_unlock(&ctx->global->dist_connections); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + conn_obj->node_atom_index = node_atom_index; + conn_obj->node_creation = creation; + synclist_init(&conn_obj->remote_monitors); + synclist_init(&conn_obj->pending_packets); + + ErlNifEnv *env = erl_nif_env_from_context(ctx); + int32_t process_pid = term_to_local_process_id(argv[1]); + if (UNLIKELY(enif_monitor_process(env, conn_obj, &process_pid, &conn_obj->connection_process_monitor) != 0)) { + synclist_unlock(&ctx->global->dist_connections); + RAISE_ERROR(BADARG_ATOM); + } + list_prepend(dist_connections, &conn_obj->head); + synclist_unlock(&ctx->global->dist_connections); + + // Increment reference count as the resource should be alive until controller process dies + struct RefcBinary *rsrc_refc = refc_binary_from_data(conn_obj); + refc_binary_increment_refcount(rsrc_refc); + + if (UNLIKELY(memory_ensure_free_opt(ctx, TERM_BOXED_RESOURCE_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + return term_from_resource(conn_obj, &ctx->heap); +} + +static term nif_erlang_dist_ctrl_get_data_notification(Context *ctx, int argc, term argv[]) +{ + UNUSED(argc); + + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), argv[0], ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) { + RAISE_ERROR(BADARG_ATOM); + } + struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr; + + struct ListHead *pending_packets = synclist_wrlock(&conn_obj->pending_packets); + UNUSED(pending_packets); // without SMP, this would appear as a statement with no effect + conn_obj->selecting_process_id = ctx->process_id; + synclist_unlock(&conn_obj->pending_packets); + + return OK_ATOM; +} + +static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[]) +{ + UNUSED(argc); + + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), argv[0], ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) { + RAISE_ERROR(BADARG_ATOM); + } + struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr; + + term result; + + struct ListHead *pending_packets = synclist_wrlock(&conn_obj->pending_packets); + if (list_is_empty(pending_packets)) { + result = NONE_ATOM; + } else { + struct ListHead *first = list_first(pending_packets); + struct DistributionPacket *packet = GET_LIST_ENTRY(first, struct DistributionPacket, head); + if (UNLIKELY(memory_ensure_free(ctx, term_binary_heap_size(packet->size)) != MEMORY_GC_OK)) { + synclist_unlock(&conn_obj->pending_packets); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + result = term_from_literal_binary(packet->bytes, packet->size, &ctx->heap, ctx->global); + list_remove(first); + free(first); + } + synclist_unlock(&conn_obj->pending_packets); + + return result; +} + +static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[]) +{ + UNUSED(argc); + VALIDATE_VALUE(argv[1], term_is_binary); + const uint8_t *data = (const uint8_t *) term_binary_data(argv[1]); + size_t binary_len = term_binary_size(argv[1]); + if (binary_len < 1 || data[0] != 112) { + RAISE_ERROR(BADARG_ATOM); + } + + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), argv[0], ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) { + RAISE_ERROR(BADARG_ATOM); + } + struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr; + + size_t bytes_read = 0; + term control = externalterm_to_term_with_roots(data + 1, binary_len - 1, ctx, ExternalTermCopy, &bytes_read, 2, argv); + + if (UNLIKELY(!term_is_tuple(control))) { + RAISE_ERROR(BADARG_ATOM); + } + size_t arity = term_get_tuple_arity(control); + if (UNLIKELY(arity < 1)) { + RAISE_ERROR(BADARG_ATOM); + } + term operation = term_get_tuple_element(control, 0); + if (UNLIKELY(!term_is_integer(operation))) { + RAISE_ERROR(BADARG_ATOM); + } + + switch (term_to_int(operation)) { + case OPERATION_REG_SEND: { + if (UNLIKELY(arity != 4)) { + RAISE_ERROR(BADARG_ATOM); + } + term to_name = term_get_tuple_element(control, 3); + int target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(to_name)); + if (target_process_id) { + term payload = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 2, argv); + globalcontext_send_message(ctx->global, target_process_id, payload); + } + break; + } + case OPERATION_MONITOR_P: { + if (UNLIKELY(arity != 4)) { + RAISE_ERROR(BADARG_ATOM); + } + term from_pid = term_get_tuple_element(control, 1); + term target_proc = term_get_tuple_element(control, 2); + term monitor_ref = term_get_tuple_element(control, 3); + int target_process_id = 0; + if (term_is_local_pid(target_proc)) { + target_process_id = term_to_local_process_id(target_proc); + } else if (term_is_atom(target_proc)) { + target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc)); + } else { + RAISE_ERROR(BADARG_ATOM); + } + struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor)); + monitor->target_proc = target_proc; + monitor->pid_number = term_get_external_pid_process_id(from_pid); + monitor->pid_serial = term_get_external_pid_serial(from_pid); + monitor->ref_len = term_get_external_reference_len(monitor_ref); + memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len); + if (target_process_id) { + synclist_append(&conn_obj->remote_monitors, &monitor->head); + ErlNifPid target_process_pid = target_process_id; + if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) { + synclist_remove(&conn_obj->remote_monitors, &monitor->head); + dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global); + free(monitor); + } + } else { + dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global); + free(monitor); + } + + break; + } + case OPERATION_DEMONITOR_P: { + if (UNLIKELY(arity != 4)) { + RAISE_ERROR(BADARG_ATOM); + } + term target_proc = term_get_tuple_element(control, 2); + term monitor_ref = term_get_tuple_element(control, 3); + uint8_t ref_len = term_get_external_reference_len(monitor_ref); + const uint32_t *ref_words = term_get_external_reference_words(monitor_ref); + struct ListHead *remote_monitors = synclist_wrlock(&conn_obj->remote_monitors); + struct ListHead *item; + LIST_FOR_EACH (item, remote_monitors) { + struct RemoteMonitor *remote_monitor = GET_LIST_ENTRY(item, struct RemoteMonitor, head); + if (remote_monitor->target_proc == target_proc + && remote_monitor->ref_len == ref_len + && memcmp(remote_monitor->ref_words, ref_words, ref_len * sizeof(uint32_t)) == 0) { + enif_demonitor_process(erl_nif_env_from_context(ctx), conn_obj, &remote_monitor->process_monitor); + list_remove(item); + break; + } + } + synclist_unlock(&conn_obj->remote_monitors); + break; + } + default: + printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation)); + RAISE_ERROR(BADARG_ATOM); + } + + return OK_ATOM; +} + +void dist_send_message(term external_pid, term payload, Context *ctx) +{ + // Search for dhandle. + int node_atom_index = term_to_atom_index(term_get_external_node(external_pid)); + uint32_t node_creation = term_get_external_node_creation(external_pid); + struct ListHead *dist_connections = synclist_rdlock(&ctx->global->dist_connections); + struct ListHead *item; + LIST_FOR_EACH (item, dist_connections) { + struct DistConnection *dist_connection = GET_LIST_ENTRY(item, struct DistConnection, head); + if (dist_connection->node_atom_index == node_atom_index && dist_connection->node_creation == node_creation) { + dist_enqueue_send_sender_message(ctx->process_id, external_pid, payload, dist_connection, ctx->global); + break; + } + } + synclist_unlock(&ctx->global->dist_connections); +} + +const struct Nif setnode_3_nif = { + .base.type = NIFFunctionType, + .nif_ptr = nif_erlang_setnode_3 +}; + +const struct Nif dist_ctrl_get_data_notification_nif = { + .base.type = NIFFunctionType, + .nif_ptr = nif_erlang_dist_ctrl_get_data_notification +}; + +const struct Nif dist_ctrl_get_data_nif = { + .base.type = NIFFunctionType, + .nif_ptr = nif_erlang_dist_ctrl_get_data +}; + +const struct Nif dist_ctrl_put_data_nif = { + .base.type = NIFFunctionType, + .nif_ptr = nif_erlang_dist_ctrl_put_data +}; diff --git a/src/libAtomVM/dist_nifs.h b/src/libAtomVM/dist_nifs.h new file mode 100644 index 000000000..d96cfcba4 --- /dev/null +++ b/src/libAtomVM/dist_nifs.h @@ -0,0 +1,50 @@ +/* + * This file is part of AtomVM. + * + * Copyright 2024 Paul Guyot + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +/** + * @file dist_nifs.h + * @brief Declaration of distribution NIFs and resources + */ + +#ifndef _DIST_NIFS_H_ +#define _DIST_NIFS_H_ + +#include "exportedfunction.h" +#include "globalcontext.h" +#include "term.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern const ErlNifResourceTypeInit dist_connection_resource_type_init; + +extern const struct Nif setnode_3_nif; +extern const struct Nif dist_ctrl_get_data_notification_nif; +extern const struct Nif dist_ctrl_get_data_nif; +extern const struct Nif dist_ctrl_put_data_nif; + +void dist_send_message(term external_pid, term payload, Context *ctx); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/libAtomVM/globalcontext.c b/src/libAtomVM/globalcontext.c index 950f0a6f6..925d6913e 100644 --- a/src/libAtomVM/globalcontext.c +++ b/src/libAtomVM/globalcontext.c @@ -21,6 +21,7 @@ #include #include +#include "dist_nifs.h" #include "globalcontext.h" #include "atom_table.h" @@ -124,10 +125,13 @@ GlobalContext *globalcontext_new() glb->node_name = NONODE_AT_NOHOST_ATOM; glb->creation = 0; + synclist_init(&glb->dist_connections); -#if HAVE_OPEN && HAVE_CLOSE ErlNifEnv env; erl_nif_env_partial_init_from_globalcontext(&env, glb); + glb->dist_connection_resource_type = enif_init_resource_type(&env, "dist_connection", &dist_connection_resource_type_init, ERL_NIF_RT_CREATE, NULL); + +#if HAVE_OPEN && HAVE_CLOSE glb->posix_fd_resource_type = enif_init_resource_type(&env, "posix_fd", &posix_fd_resource_type_init, ERL_NIF_RT_CREATE, NULL); if (IS_NULL_PTR(glb->posix_fd_resource_type)) { #ifndef AVM_NO_SMP diff --git a/src/libAtomVM/globalcontext.h b/src/libAtomVM/globalcontext.h index e093e4e59..741992f12 100644 --- a/src/libAtomVM/globalcontext.h +++ b/src/libAtomVM/globalcontext.h @@ -158,6 +158,8 @@ struct GlobalContext term ATOMIC node_name; uint32_t ATOMIC creation; + ErlNifResourceType *dist_connection_resource_type; + struct SyncList dist_connections; #if HAVE_OPEN && HAVE_CLOSE ErlNifResourceType *posix_fd_resource_type; diff --git a/src/libAtomVM/nifs.c b/src/libAtomVM/nifs.c index 52089e6b3..2537bd131 100644 --- a/src/libAtomVM/nifs.c +++ b/src/libAtomVM/nifs.c @@ -40,6 +40,7 @@ #include "context.h" #include "defaultatoms.h" #include "dictionary.h" +#include "dist_nifs.h" #include "ets.h" #include "externalterm.h" #include "globalcontext.h" diff --git a/src/libAtomVM/nifs.gperf b/src/libAtomVM/nifs.gperf index b4fd499fa..99ad4e7a1 100644 --- a/src/libAtomVM/nifs.gperf +++ b/src/libAtomVM/nifs.gperf @@ -131,6 +131,10 @@ erlang:group_leader/2, &group_leader_nif erlang:get_module_info/1, &get_module_info_nif erlang:get_module_info/2, &get_module_info_nif erlang:setnode/2, &setnode_2_nif +erlang:setnode/3, &setnode_3_nif +erlang:dist_ctrl_get_data_notification/1, &dist_ctrl_get_data_notification_nif +erlang:dist_ctrl_get_data/1, &dist_ctrl_get_data_nif +erlang:dist_ctrl_put_data/2, &dist_ctrl_put_data_nif erts_debug:flat_size/1, &flat_size_nif ets:new/2, &ets_new_nif ets:insert/2, &ets_insert_nif diff --git a/src/libAtomVM/opcodesswitch.h b/src/libAtomVM/opcodesswitch.h index 7875de3e9..11d12f23e 100644 --- a/src/libAtomVM/opcodesswitch.h +++ b/src/libAtomVM/opcodesswitch.h @@ -29,6 +29,7 @@ #include "bitstring.h" #include "debug.h" #include "defaultatoms.h" +#include "dist_nifs.h" #include "exportedfunction.h" #include "nifs.h" #include "opcodes.h" @@ -2403,20 +2404,24 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb) #ifdef IMPL_EXECUTE_LOOP term recipient_term = x_regs[0]; - int local_process_id; - if (term_is_local_pid_or_port(recipient_term)) { - local_process_id = term_to_local_process_id(recipient_term); - } else if (term_is_atom(recipient_term)) { - local_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term)); - if (UNLIKELY(local_process_id == 0)) { + if (UNLIKELY(term_is_external_pid(recipient_term))) { + dist_send_message(recipient_term, x_regs[1], ctx); + } else { + int local_process_id; + if (term_is_local_pid_or_port(recipient_term)) { + local_process_id = term_to_local_process_id(recipient_term); + } else if (term_is_atom(recipient_term)) { + local_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term)); + if (UNLIKELY(local_process_id == 0)) { + RAISE_ERROR(BADARG_ATOM); + } + } else { RAISE_ERROR(BADARG_ATOM); } - } else { - RAISE_ERROR(BADARG_ATOM); + TRACE("send/0 target_pid=%i\n", local_process_id); + TRACE_SEND(ctx, x_regs[0], x_regs[1]); + globalcontext_send_message(ctx->global, local_process_id, x_regs[1]); } - TRACE("send/0 target_pid=%i\n", local_process_id); - TRACE_SEND(ctx, x_regs[0], x_regs[1]); - globalcontext_send_message(ctx->global, local_process_id, x_regs[1]); x_regs[0] = x_regs[1]; #endif diff --git a/tests/libs/estdlib/CMakeLists.txt b/tests/libs/estdlib/CMakeLists.txt index 331f81463..3531c3943 100644 --- a/tests/libs/estdlib/CMakeLists.txt +++ b/tests/libs/estdlib/CMakeLists.txt @@ -38,6 +38,7 @@ set(ERLANG_MODULES test_logger test_maps test_net + test_net_kernel test_sets test_spawn test_ssl diff --git a/tests/libs/estdlib/test_net_kernel.erl b/tests/libs/estdlib/test_net_kernel.erl new file mode 100644 index 000000000..b498fc9d4 --- /dev/null +++ b/tests/libs/estdlib/test_net_kernel.erl @@ -0,0 +1,107 @@ +% +% This file is part of AtomVM. +% +% Copyright 2024 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(test_net_kernel). + +-export([test/0, start/0]). + +start() -> + test(). + +test() -> + Platform = erlang:system_info(machine), + case has_erl(Platform) andalso has_epmd(Platform) of + true -> + ok = ensure_epmd(Platform), + ok = setup(Platform), + ok = test_ping_from_beam(Platform), + ok = test_fail_with_wrong_cookie(Platform), + io:format("test passed\n"), + ok; + false -> + io:format("~s: skipped\n", [?MODULE]), + ok + end. + +has_erl(Platform) -> + has_command(Platform, "erl"). + +has_epmd(Platform) -> + has_command(Platform, "epmd"). + +has_command("BEAM", Command) -> + R = os:cmd("command -v " ++ Command), + R =/= []; +has_command("ATOM", Command) -> + {ok, _, Fd} = atomvm:subprocess("/bin/sh", ["sh", "-c", "command -v " ++ Command], [], [stdout]), + Result = + case atomvm:posix_read(Fd, 200) of + eof -> false; + {ok, _Line} -> true + end, + ok = atomvm:posix_close(Fd), + Result. + +ensure_epmd("BEAM") -> + _ = os:cmd("epmd -daemon"), + ok; +ensure_epmd("ATOM") -> + {ok, _, Fd} = atomvm:subprocess("/bin/sh", ["sh", "-c", "epmd -daemon"], [], [stdout]), + ok = atomvm:posix_close(Fd), + ok. + +test_ping_from_beam(Platform) -> + {ok, _NetKernelPid} = net_kernel:start(atomvm, #{name_domain => shortnames}), + Node = node(), + erlang:set_cookie('AtomVM'), + Result = execute_command( + Platform, + "erl -sname otp -setcookie AtomVM -eval \"R = net_adm:ping('" ++ atom_to_list(Node) ++ + "'), erlang:display(R).\" -s init stop -noshell" + ), + "pong" ++ _ = Result, + net_kernel:stop(), + ok. + +test_fail_with_wrong_cookie(_Platform) -> + ok. + +% On AtomVM, we need to start kernel. +setup("BEAM") -> + ok; +setup("ATOM") -> + {ok, _KernelPid} = kernel:start(normal, []), + ok. + +execute_command("BEAM", Command) -> + os:cmd(Command); +execute_command("ATOM", Command) -> + {ok, _, Fd} = atomvm:subprocess("/bin/sh", ["sh", "-c", Command], [], [stdout]), + Result = loop_read(Fd, []), + ok = atomvm:posix_close(Fd), + Result. + +loop_read(Fd, Acc) -> + case atomvm:posix_read(Fd, 10) of + eof -> + lists:flatten(lists:reverse(Acc)); + {ok, Line} -> + loop_read(Fd, [binary_to_list(Line) | Acc]) + end.