From 85482062b63c28d9dbb5e6ee87e1ba8cd80bff42 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Sun, 15 Oct 2023 21:31:29 +0200 Subject: [PATCH] Add support for PicoW networking Port OTP Socket code to lwIP raw API Also fix networkdriver code to avoid sending messages from ISR Also add socket:recv/2,3 and socket:recvfrom/2,3 Signed-off-by: Paul Guyot --- examples/erlang/rp2040/CMakeLists.txt | 2 + examples/erlang/rp2040/picow_tcp_server.erl | 89 + examples/erlang/rp2040/picow_udp_beacon.erl | 58 + libs/estdlib/src/socket.erl | 90 +- src/libAtomVM/CMakeLists.txt | 2 + src/libAtomVM/otp_socket.c | 2045 ++++++++++++----- src/libAtomVM/otp_socket.h | 68 + src/libAtomVM/posix_nifs.c | 6 + src/libAtomVM/resources.c | 23 +- src/libAtomVM/resources.h | 14 +- .../esp32/components/avm_sys/CMakeLists.txt | 3 +- .../esp32/components/libatomvm/CMakeLists.txt | 7 +- src/platforms/esp32/test/CMakeLists.txt | 13 + src/platforms/rp2040/src/lib/CMakeLists.txt | 10 +- src/platforms/rp2040/src/lib/networkdriver.c | 137 +- .../rp2040/src/lib/otp_socket_platform.c | 70 + .../rp2040/src/lib/otp_socket_platform.h | 49 + src/platforms/rp2040/src/lib/rp2040_sys.h | 41 + src/platforms/rp2040/src/lib/sys.c | 113 +- tests/libs/estdlib/test_tcp_socket.erl | 21 + 20 files changed, 2239 insertions(+), 622 deletions(-) create mode 100644 examples/erlang/rp2040/picow_tcp_server.erl create mode 100644 examples/erlang/rp2040/picow_udp_beacon.erl create mode 100644 src/platforms/rp2040/src/lib/otp_socket_platform.c create mode 100644 src/platforms/rp2040/src/lib/otp_socket_platform.h diff --git a/examples/erlang/rp2040/CMakeLists.txt b/examples/erlang/rp2040/CMakeLists.txt index 02bc018487..e1f0b3db0d 100644 --- a/examples/erlang/rp2040/CMakeLists.txt +++ b/examples/erlang/rp2040/CMakeLists.txt @@ -28,3 +28,5 @@ pack_uf2(pico_rtc pico_rtc) pack_uf2(picow_blink picow_blink) pack_uf2(picow_wifi_sta picow_wifi_sta) pack_uf2(picow_wifi_ap picow_wifi_ap) +pack_uf2(picow_udp_beacon picow_udp_beacon) +pack_uf2(picow_tcp_server picow_tcp_server) diff --git a/examples/erlang/rp2040/picow_tcp_server.erl b/examples/erlang/rp2040/picow_tcp_server.erl new file mode 100644 index 0000000000..053bf170c2 --- /dev/null +++ b/examples/erlang/rp2040/picow_tcp_server.erl @@ -0,0 +1,89 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(picow_tcp_server). + +-export([start/0]). + +-spec start() -> no_return(). +start() -> + Config = [ + {sta, [ + {ssid, <<"myssid">>}, + {psk, <<"mypsk">>}, + {got_ip, fun got_ip/1} + ]} + ], + case network:start(Config) of + {ok, _Pid} -> + timer:sleep(infinity); + Error -> + erlang:display(Error) + end. + +got_ip({IPv4, Netmask, Gateway}) -> + io:format("Got IP: ip=~p, netmask=~p, gateway=~p.\n", [IPv4, Netmask, Gateway]), + setup(). + +setup() -> + {ok, ListeningSocket} = socket:open(inet, stream, tcp), + + ok = socket:setopt(ListeningSocket, {socket, reuseaddr}, true), + ok = socket:setopt(ListeningSocket, {socket, linger}, #{onoff => true, linger => 0}), + + ok = socket:bind(ListeningSocket, #{family => inet, addr => any, port => 44404}), + ok = socket:listen(ListeningSocket), + io:format("Listening on ~p.~n", [socket:sockname(ListeningSocket)]), + + spawn(fun() -> accept(ListeningSocket) end). + +accept(ListeningSocket) -> + io:format("Waiting to accept connection...~n"), + case socket:accept(ListeningSocket) of + {ok, ConnectedSocket} -> + io:format("Accepted connection. local: ~p peer: ~p~n", [ + socket:sockname(ConnectedSocket), socket:peername(ConnectedSocket) + ]), + spawn(fun() -> accept(ListeningSocket) end), + echo(ConnectedSocket); + {error, Reason} -> + io:format("An error occurred accepting connection: ~p~n", [Reason]) + end. + +-spec echo(ConnectedSocket :: socket:socket()) -> no_return(). +echo(ConnectedSocket) -> + io:format("Waiting to receive data...~n"), + case socket:recv(ConnectedSocket) of + {ok, Data} -> + io:format("Received data ~p from ~p. Echoing back...~n", [ + Data, socket:peername(ConnectedSocket) + ]), + case socket:send(ConnectedSocket, Data) of + ok -> + io:format("All data was sent~n"); + {ok, Rest} -> + io:format("Some data was sent. Remaining: ~p~n", [Rest]); + {error, Reason} -> + io:format("An error occurred sending data: ~p~n", [Reason]) + end, + echo(ConnectedSocket); + {error, Reason} -> + io:format("An error occurred waiting on a connected socket: ~p~n", [Reason]) + end. diff --git a/examples/erlang/rp2040/picow_udp_beacon.erl b/examples/erlang/rp2040/picow_udp_beacon.erl new file mode 100644 index 0000000000..b555147af1 --- /dev/null +++ b/examples/erlang/rp2040/picow_udp_beacon.erl @@ -0,0 +1,58 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(picow_udp_beacon). + +-export([start/0]). + +start() -> + Self = self(), + Config = [ + {sta, [ + {ssid, <<"myssid">>}, + {psk, <<"mypsk">>}, + {got_ip, fun(IPInfo) -> got_ip(Self, IPInfo) end} + ]} + ], + case network:start(Config) of + {ok, _Pid} -> + loop(disconnected); + Error -> + erlang:display(Error) + end. + +got_ip(Parent, {IPv4, Netmask, Gateway}) -> + io:format("Got IP: ip=~p, netmask=~p, gateway=~p.\n", [IPv4, Netmask, Gateway]), + Parent ! connected. + +loop(disconnected) -> + receive + connected -> + {ok, UDPSocket} = socket:open(inet, dgram, udp), + loop({UDPSocket, 0}) + end; +loop({UDPSocket, N}) -> + Message = io_lib:format("AtomVM beacon #~B\n", [N]), + ok = socket:sendto(UDPSocket, Message, #{ + family => inet, port => 4444, addr => {255, 255, 255, 255} + }), + io:format("Sent beacon #~B\n", [N]), + timer:sleep(1000), + loop({UDPSocket, N + 1}). diff --git a/libs/estdlib/src/socket.erl b/libs/estdlib/src/socket.erl index e8741a5196..6027d03a35 100644 --- a/libs/estdlib/src/socket.erl +++ b/libs/estdlib/src/socket.erl @@ -30,7 +30,11 @@ sockname/1, peername/1, recv/1, + recv/2, + recv/3, recvfrom/1, + recvfrom/2, + recvfrom/3, send/2, sendto/3, setopt/3, @@ -42,8 +46,8 @@ -export([ nif_select_read/3, nif_accept/1, - nif_recv/1, - nif_recvfrom/1, + nif_recv/2, + nif_recvfrom/2, nif_select_stop/1, nif_send/2, nif_sendto/3 @@ -248,6 +252,12 @@ accept(Socket, Timeout) -> R end; {closed, Ref} -> + % socket was closed by another process + % TODO: we need to handle: + % (a) SELECT_STOP being scheduled + % (b) flush of messages as we can have both + % {closed, Ref} and {select, _, Ref, _} in the + % queue {error, closed} after Timeout -> {error, timeout} @@ -257,30 +267,39 @@ accept(Socket, Timeout) -> end. %%----------------------------------------------------------------------------- -%% @equiv socket:recv(Socket, infinity) +%% @equiv socket:recv(Socket, 0) %% @end %%----------------------------------------------------------------------------- -spec recv(Socket :: socket()) -> {ok, Data :: binary()} | {error, Reason :: term()}. recv(Socket) -> - recv(Socket, infinity). + recv(Socket, 0, infinity). + +%%----------------------------------------------------------------------------- +%% @equiv socket:recv(Socket, Length, infinity) +%% @end +%%----------------------------------------------------------------------------- +-spec recv(Socket :: socket(), Length :: non_neg_integer()) -> + {ok, Data :: binary()} | {error, Reason :: term()}. +recv(Socket, Length) -> + recv(Socket, Length, infinity). %%----------------------------------------------------------------------------- %% @param Socket the socket +%% @param Length number of bytes to receive %% @param Timeout timeout (in milliseconds) %% @returns `{ok, Data}' if successful; `{error, Reason}', otherwise. %% @doc Receive data on the specified socket. %% -%% Note that this function will block until data is received -%% on the socket. +%% This function is equivalent to `recvfrom/3' except for the return type. %% %% Example: %% %% `{ok, Data} = socket:recv(ConnectedSocket)' %% @end %%----------------------------------------------------------------------------- --spec recv(Socket :: socket(), Timeout :: timeout()) -> +-spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> {ok, Data :: binary()} | {error, Reason :: term()}. -recv(Socket, Timeout) -> +recv(Socket, Length, Timeout) -> Self = self(), Ref = erlang:make_ref(), ?TRACE("select read for recv. self=~p ref=~p~n", [Self, Ref]), @@ -288,14 +307,18 @@ recv(Socket, Timeout) -> ok -> receive {select, _AcceptedSocket, Ref, ready_input} -> - case ?MODULE:nif_recv(Socket) of - {error, closed} = E -> + case ?MODULE:nif_recv(Socket, Length) of + {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - R -> - R + % TODO: Assemble data to have more if Length > byte_size(Data) + % as long as timeout did not expire + {ok, Data} -> + {ok, Data} end; {closed, Ref} -> + % socket was closed by another process + % TODO: see above in accept/2 {error, closed} after Timeout -> {error, timeout} @@ -305,16 +328,26 @@ recv(Socket, Timeout) -> end. %%----------------------------------------------------------------------------- -%% @equiv socket:recvfrom(Socket, infinity) +%% @equiv socket:recvfrom(Socket, 0) %% @end %%----------------------------------------------------------------------------- -spec recvfrom(Socket :: socket()) -> {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. recvfrom(Socket) -> - recvfrom(Socket, infinity). + recvfrom(Socket, 0). + +%%----------------------------------------------------------------------------- +%% @equiv socket:recvfrom(Socket, Length, infinity) +%% @end +%%----------------------------------------------------------------------------- +-spec recvfrom(Socket :: socket(), Length :: non_neg_integer()) -> + {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. +recvfrom(Socket, Length) -> + recvfrom(Socket, Length, infinity). %%----------------------------------------------------------------------------- %% @param Socket the socket +%% @param Length number of bytes to receive %% @param Timeout timeout (in milliseconds) %% @returns `{ok, {Address, Data}}' if successful; `{error, Reason}', otherwise. %% @doc Receive data on the specified socket, returning the from address. @@ -325,11 +358,20 @@ recvfrom(Socket) -> %% Example: %% %% `{ok, {Address, Data}} = socket:recvfrom(ConnectedSocket)' +%% +%% If socket is UDP, the function retrieves the first available packet and +%% truncate it to Length bytes, unless Length is 0 in which case it returns +%% the whole packet ("all available"). +%% +%% If socket is TCP and Length is 0, this function retrieves all available +%% data without waiting (using peek if the platform allows it). +%% If socket is TCP and Length is not 0, this function waits until Length +%% bytes are available and return these bytes. %% @end %%----------------------------------------------------------------------------- --spec recvfrom(Socket :: socket(), Timeout :: timeout()) -> +-spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. -recvfrom(Socket, Timeout) -> +recvfrom(Socket, Length, Timeout) -> Self = self(), Ref = erlang:make_ref(), ?TRACE("select read for recvfrom. self=~p ref=~p", [Self, Ref]), @@ -337,14 +379,18 @@ recvfrom(Socket, Timeout) -> ok -> receive {select, _AcceptedSocket, Ref, ready_input} -> - case ?MODULE:nif_recvfrom(Socket) of - {error, closed} = E -> + case ?MODULE:nif_recvfrom(Socket, Length) of + {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - R -> - R + % TODO: Assemble data to have more if Length > byte_size(Data) + % as long as timeout did not expire + {ok, {Address, Data}} -> + {ok, {Address, Data}} end; {closed, Ref} -> + % socket was closed by another process + % TODO: see above in accept/2 {error, closed} after Timeout -> {error, timeout} @@ -475,11 +521,11 @@ nif_accept(_Socket) -> erlang:nif_error(undefined). %% @private -nif_recv(_Socket) -> +nif_recv(_Socket, _Length) -> erlang:nif_error(undefined). %% @private -nif_recvfrom(_Socket) -> +nif_recvfrom(_Socket, _Length) -> erlang:nif_error(undefined). %% @private diff --git a/src/libAtomVM/CMakeLists.txt b/src/libAtomVM/CMakeLists.txt index c26f939804..c36102aac9 100644 --- a/src/libAtomVM/CMakeLists.txt +++ b/src/libAtomVM/CMakeLists.txt @@ -168,6 +168,8 @@ define_if_symbol_exists(libAtomVM O_SEARCH "fcntl.h" PRIVATE HAVE_O_SEARCH) define_if_symbol_exists(libAtomVM O_TTY_INIT "fcntl.h" PRIVATE HAVE_O_TTY_INIT) define_if_symbol_exists(libAtomVM clock_settime "time.h" PRIVATE HAVE_CLOCK_SETTIME) define_if_symbol_exists(libAtomVM settimeofday "sys/time.h" PRIVATE HAVE_SETTIMEOFDAY) +define_if_symbol_exists(libAtomVM socket "sys/socket.h" PUBLIC HAVE_SOCKET) +define_if_symbol_exists(libAtomVM select "sys/select.h" PUBLIC HAVE_SELECT) if (AVM_USE_32BIT_FLOAT) target_compile_definitions(libAtomVM PUBLIC AVM_USE_SINGLE_PRECISION) diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index bcd5139ec3..cdc331561e 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -26,9 +26,9 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -36,7 +36,10 @@ #include #include +// We use errno to report errors with both BSD Sockets and LWIP #include + +#if OTP_SOCKET_BSD #include #include #include @@ -45,17 +48,123 @@ #if HAVE_SIGNAL #include #endif +#elif OTP_SOCKET_LWIP +#include +#include +#else +#error OTP Socket requires BSD Socket or lwIP +#endif // #define ENABLE_TRACE #include -struct SocketFd +// To factorize parsing of erlang term options, we define few constants for lwIP. +#if OTP_SOCKET_LWIP +enum ShutdownHow +{ + SHUT_RD, + SHUT_WR, + SHUT_RDWR, +}; + +enum SocketProtocol +{ + IPPROTO_IP, + IPPROTO_TCP, + IPPROTO_UDP +}; + +enum SocketDomain +{ + PF_INET +}; + +enum SocketType +{ + SOCK_STREAM, + SOCK_DGRAM +}; +#endif + +// Additional LWIP definitions +#if OTP_SOCKET_LWIP +enum SocketState +{ + // Bits + SocketStateClosed = 0, + SocketStateUDP = 1 << 1, + SocketStateTCP = 1 << 2, + SocketStateSelectingRead = 1 << 3, + SocketStateSelectedOneClient = 1 << 4, + SocketStateConnected = 1 << 5, + SocketStateListening = 1 << 6, + SocketStateTrapped = 1 << 7, + + // Actual states + SocketStateUDPIdle = SocketStateUDP, + SocketStateUDPSelectingRead = SocketStateUDP | SocketStateSelectingRead, + SocketStateTCPNew = SocketStateTCP, + SocketStateTCPConnected = SocketStateTCP | SocketStateConnected, + SocketStateTCPConnecting = SocketStateTCPConnected | SocketStateTrapped, + SocketStateTCPSelectingRead = SocketStateTCPConnected | SocketStateSelectingRead, + SocketStateTCPListening = SocketStateTCP | SocketStateListening, + SocketStateTCPAccepting = SocketStateTCPListening | SocketStateTrapped, + SocketStateTCPSelectingAccept = SocketStateTCPListening | SocketStateSelectingRead, + SocketStateTCPSelectingOneClient = SocketStateTCPListening | SocketStateSelectingRead | SocketStateSelectedOneClient, + SocketStateTCPSelectStopOneClient = SocketStateTCPListening | SocketStateSelectedOneClient, +}; + +struct TCPReceivedItem +{ + struct ListHead list_head; + struct pbuf *buf; + err_t err; +}; + +struct UDPReceivedItem +{ + struct ListHead list_head; + struct pbuf *buf; + const ip_addr_t *addr; + u16_t port; +}; + +static err_t tcp_recv_cb(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err); +static void udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *addr, u16_t port); + +#endif + +#if OTP_SOCKET_BSD +struct SocketResource { int fd; uint64_t ref_ticks; int32_t selecting_process_id; ErlNifMonitor selecting_process_monitor; }; +#elif OTP_SOCKET_LWIP +struct SocketResource +{ + enum SocketState socket_state; + union + { + struct tcp_pcb *tcp_pcb; + struct udp_pcb *udp_pcb; + }; + uint64_t ref_ticks; + int32_t selecting_process_id; // trapped or selecting + ErlNifMonitor selecting_process_monitor; + bool linger_on; + int linger_sec; + size_t pos; + union + { + struct ListHead tcp_received_list; + struct ListHead udp_received_list; + struct SocketResource *selected_client; + }; +}; +#endif static const char *const addr_atom = ATOM_STR("\x4", "addr"); static const char *const any_atom = ATOM_STR("\x3", "any"); @@ -127,7 +236,10 @@ static const AtomStringIntPair otp_socket_shutdown_direction_table[] = { }; #define DEFAULT_BUFFER_SIZE 512 + +#ifndef MIN #define MIN(A, B) (((A) < (B)) ? (A) : (B)) +#endif static ErlNifResourceType *socket_resource_type; @@ -139,28 +251,51 @@ static void socket_dtor(ErlNifEnv *caller_env, void *obj) { UNUSED(caller_env); - struct SocketFd *fd_obj = (struct SocketFd *) obj; - if (fd_obj->fd != CLOSED_FD) { - close(fd_obj->fd); - fd_obj->fd = CLOSED_FD; + struct SocketResource *rsrc_obj = (struct SocketResource *) obj; + + TRACE("socket_dtor called on resource=%p\n", (void *) rsrc_obj); +#if OTP_SOCKET_BSD + if (rsrc_obj->fd != CLOSED_FD) { + close(rsrc_obj->fd); + rsrc_obj->fd = CLOSED_FD; + } +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateUDP) { + udp_remove(rsrc_obj->udp_pcb); + rsrc_obj->udp_pcb = NULL; + } else if (rsrc_obj->socket_state & SocketStateTCP) { + // Try to nicely close the connection here + // NOTE: we can't handle linger if the socket is gone. + if (UNLIKELY(tcp_close(rsrc_obj->tcp_pcb) != ERR_OK)) { + // The resource will be gone, there is not much we can do here. + if (!(rsrc_obj->socket_state & SocketStateTCPListening)) { + tcp_abort(rsrc_obj->tcp_pcb); + } + } + rsrc_obj->tcp_pcb = NULL; } + LWIP_END(); +#endif } +#if OTP_SOCKET_BSD static void socket_stop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call) { UNUSED(caller_env); UNUSED(event); UNUSED(is_direct_call); - struct SocketFd *fd_obj = (struct SocketFd *) obj; + struct SocketResource *rsrc_obj = (struct SocketResource *) obj; - if (fd_obj->selecting_process_id != INVALID_PROCESS_ID) { - enif_demonitor_process(caller_env, fd_obj, &fd_obj->selecting_process_monitor); - fd_obj->selecting_process_id = INVALID_PROCESS_ID; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + enif_demonitor_process(caller_env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; } - TRACE("socket_stop called on fd=%i\n", fd_obj->fd); + TRACE("socket_stop called on fd=%i\n", rsrc_obj->fd); } +#endif static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon) { @@ -168,32 +303,83 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif UNUSED(pid); UNUSED(mon); - struct SocketFd *fd_obj = (struct SocketFd *) obj; + struct SocketResource *rsrc_obj = (struct SocketResource *) obj; - TRACE("socket_down called on process_id=%i fd=%i\n", *pid, fd_obj->fd); +#if OTP_SOCKET_BSD + TRACE("socket_down called on process_id=%i fd=%i\n", (int) *pid, rsrc_obj->fd); +#else + TRACE("socket_down called on process_id=%i\n", (int) *pid); +#endif - if (fd_obj->selecting_process_id != INVALID_PROCESS_ID) { +#if OTP_SOCKET_BSD + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // Monitor fired, so make sure we don't try to demonitor in select_stop // as it could crash trying to reacquire lock on process table - fd_obj->selecting_process_id = INVALID_PROCESS_ID; - enif_select(caller_env, fd_obj->fd, ERL_NIF_SELECT_STOP, fd_obj, NULL, term_nil()); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + enif_select(caller_env, rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()); + } +#elif OTP_SOCKET_LWIP + // Monitor can be called when we're selecting, accepting or connecting. + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + LWIP_BEGIN(); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + if (rsrc_obj->socket_state & SocketStateTCP) { + if (rsrc_obj->socket_state & SocketStateTCPListening) { + (void) tcp_close(rsrc_obj->tcp_pcb); + } else { + tcp_abort(rsrc_obj->tcp_pcb); + } + rsrc_obj->tcp_pcb = NULL; + rsrc_obj->socket_state = SocketStateClosed; + } else if (rsrc_obj->socket_state & SocketStateUDP) { + udp_remove(rsrc_obj->udp_pcb); + rsrc_obj->udp_pcb = NULL; + rsrc_obj->socket_state = SocketStateClosed; + } + LWIP_END(); } +#endif } static const ErlNifResourceTypeInit SocketResourceTypeInit = { .members = 3, .dtor = socket_dtor, +#if OTP_SOCKET_BSD .stop = socket_stop, +#else + .stop = NULL, +#endif .down = socket_down, }; +// select emulation for lwIP that doesn't have select. +#if OTP_SOCKET_LWIP +static void select_event_send_notification_from_nif(struct SocketResource *rsrc_obj, Context *locked_ctx) +{ + BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) + term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap); + mailbox_send(locked_ctx, notification); + END_WITH_STACK_HEAP(heap, locked_ctx->global) +} + +static void select_event_send_notification_from_handler(struct SocketResource *rsrc_obj, int32_t process_id) +{ + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + GlobalContext *global = rsrc_refc->resource_type->global; + BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) + term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap); + globalcontext_send_message(global, process_id, notification); + END_WITH_STACK_HEAP(heap, global) +} +#endif + // register the socket_fd resource type void otp_socket_init(GlobalContext *global) { ErlNifEnv env; erl_nif_env_partial_init_from_globalcontext(&env, global); socket_resource_type = enif_init_resource_type(&env, "socket_fd", &SocketResourceTypeInit, ERL_NIF_RT_CREATE, NULL); -#if HAVE_SIGNAL +#if OTP_SOCKET_BSD && HAVE_SIGNAL // ignore pipe signals on shutdown signal(SIGPIPE, SIG_IGN); #endif @@ -284,12 +470,28 @@ static inline int get_protocol(GlobalContext *global, term protocol_term, bool * static inline term make_error_tuple(term reason, Context *ctx) { + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } term error_tuple = term_alloc_tuple(2, &ctx->heap); term_put_tuple_element(error_tuple, 0, ERROR_ATOM); term_put_tuple_element(error_tuple, 1, reason); return error_tuple; } +#if OTP_SOCKET_BSD +static term make_errno_tuple(Context *ctx) +{ + return make_error_tuple(posix_errno_to_term(errno, ctx->global), ctx); +} +#elif OTP_SOCKET_LWIP +static term make_lwip_err_tuple(err_t err, Context *ctx) +{ + return make_error_tuple(term_from_int(err), ctx); +} +#endif + // // open // @@ -316,37 +518,66 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } - int fd = socket(domain, type, protocol); - if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { + struct SocketResource *rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); + if (IS_NULL_PTR(rsrc_obj)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } +#if OTP_SOCKET_BSD + rsrc_obj->fd = socket(domain, type, protocol); + if (UNLIKELY(rsrc_obj->fd == -1 || rsrc_obj->fd == CLOSED_FD)) { AVM_LOGE(TAG, "Failed to initialize socket."); - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - + enif_release_resource(rsrc_obj); + return make_errno_tuple(ctx); } else { - - struct SocketFd *fd_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketFd)); - - if (IS_NULL_PTR(fd_obj)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + TRACE("nif_socket_open: Created socket fd=%i\n", rsrc_obj->fd); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + +#elif OTP_SOCKET_LWIP + if (domain == PF_INET && type == SOCK_STREAM && protocol == IPPROTO_TCP) { + LWIP_BEGIN(); + rsrc_obj->tcp_pcb = tcp_new(); + LWIP_END(); + rsrc_obj->socket_state = SocketStateTCPNew; + } else if (domain == PF_INET && type == SOCK_DGRAM && protocol == IPPROTO_UDP) { + LWIP_BEGIN(); + rsrc_obj->udp_pcb = udp_new(); + LWIP_END(); + rsrc_obj->socket_state = SocketStateUDPIdle; + } else { + enif_release_resource(rsrc_obj); + RAISE_ERROR(BADARG_ATOM); + } + if (UNLIKELY(rsrc_obj->udp_pcb == NULL && rsrc_obj->tcp_pcb == NULL)) { + rsrc_obj->socket_state = SocketStateClosed; + enif_release_resource(rsrc_obj); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } else { + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + rsrc_obj->linger_on = false; + rsrc_obj->linger_sec = 0; + rsrc_obj->pos = 0; + if (rsrc_obj->socket_state & SocketStateTCP) { + list_init(&rsrc_obj->tcp_received_list); + LWIP_BEGIN(); + tcp_arg(rsrc_obj->tcp_pcb, rsrc_obj); + tcp_recv(rsrc_obj->tcp_pcb, tcp_recv_cb); + LWIP_END(); + } else { + list_init(&rsrc_obj->udp_received_list); + LWIP_BEGIN(); + udp_recv(rsrc_obj->udp_pcb, udp_recv_cb, rsrc_obj); + LWIP_END(); } - - fd_obj->fd = fd; - fd_obj->selecting_process_id = INVALID_PROCESS_ID; - TRACE("nif_socket_open: Created socket fd=%i\n", fd_obj->fd); +#endif if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_RESOURCE_SIZE) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - term obj = enif_make_resource(erl_nif_env_from_context(ctx), fd_obj); - enif_release_resource(fd_obj); + term obj = enif_make_resource(erl_nif_env_from_context(ctx), rsrc_obj); + enif_release_resource(rsrc_obj); size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { @@ -389,6 +620,32 @@ static bool term_is_socket(term socket_term) // close // +static int send_closed_notification(Context *ctx, struct SocketResource *rsrc_obj) +{ + // send a {closed, Ref | undefined} message to the pid + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + REF_SIZE) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + return -1; + } + + term error_tuple = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(error_tuple, 0, CLOSED_ATOM); + term ref = (rsrc_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->ref_ticks, &ctx->heap); + term_put_tuple_element(error_tuple, 1, ref); + + TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) rsrc_obj->selecting_process_id, (void *) rsrc_obj); + globalcontext_send_message(ctx->global, rsrc_obj->selecting_process_id, error_tuple); + + return 0; +} + +#if OTP_SOCKET_LWIP +static void finalize_close_hander(struct LWIPEvent *event) +{ + enif_release_resource(event->finalize_close.rsrc_obj); +} +#endif + static term nif_socket_close(Context *ctx, int argc, term argv[]) { TRACE("nif_socket_close\n"); @@ -396,57 +653,92 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(argv[0], term_is_socket); - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - if (fd_obj->selecting_process_id != INVALID_PROCESS_ID) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd) { + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // // If we are in select, then stop selecting // - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, ERL_NIF_SELECT_STOP, fd_obj, NULL, term_nil()) < 0)) { + int stop_res = enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()); + if (UNLIKELY(stop_res < 0)) { RAISE_ERROR(BADARG_ATOM); } + // TODO: check if stop_res & ERL_NIF_SELECT_STOP_CALLED or ERL_NIF_SELECT_STOP_SCHEDULED + // following what OTP does. Indeed, if we have ERL_NIF_SELECT_STOP_SCHEDULED, we should not close the socket now // // If there is a process (other than ourself) who is waiting on select // the send a {closed, Ref} message to it, so that it can break // out of its receive statement. // - if (fd_obj->selecting_process_id != ctx->process_id) { + if (rsrc_obj->selecting_process_id != ctx->process_id) { // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + REF_SIZE) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) { RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - - term error_tuple = term_alloc_tuple(2, &ctx->heap); - term_put_tuple_element(error_tuple, 0, CLOSED_ATOM); - term ref = (fd_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(fd_obj->ref_ticks, &ctx->heap); - term_put_tuple_element(error_tuple, 1, ref); - - TRACE("nif_socket_close: Sending msg to process %i\n", fd_obj->selecting_process_id); - globalcontext_send_message(ctx->global, fd_obj->selecting_process_id, error_tuple); } else { - AVM_LOGW(TAG, "Selectable socket %i was closed but no known pid is waiting. This shouldn't happen.", fd_obj->fd); + AVM_LOGW(TAG, "Selectable socket %i was closed but no known pid is waiting. This shouldn't happen.", rsrc_obj->fd); } } - int res = close(fd_obj->fd); + int res = close(rsrc_obj->fd); if (UNLIKELY(res != 0)) { AVM_LOGW(TAG, "Failed to close socket %i", res); } - TRACE("nif_socket_close: Clearing pid for socket fd=%i\n", fd_obj->fd); - fd_obj->fd = CLOSED_FD; - fd_obj->selecting_process_id = INVALID_PROCESS_ID; - fd_obj->ref_ticks = 0; + TRACE("nif_socket_close: Clearing pid for socket fd=%i\n", rsrc_obj->fd); + rsrc_obj->fd = CLOSED_FD; + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + rsrc_obj->ref_ticks = 0; + } else { + TRACE("Double close on socket fd %i", rsrc_obj->fd); + } +#elif OTP_SOCKET_LWIP + // If the socket is being selected by another process, send a closed tuple. + if (rsrc_obj->socket_state & SocketStateSelectingRead + && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID + && rsrc_obj->selecting_process_id != ctx->process_id) { + // send a {closed, Ref | undefined} message to the pid + if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + } + if (rsrc_obj->socket_state == SocketStateClosed) { + TRACE("Double close on pcb"); } else { - TRACE("Double close on socket fd %i", fd_obj->fd); + if (rsrc_obj->socket_state & SocketStateTCP) { + LWIP_BEGIN(); + tcp_arg(rsrc_obj->tcp_pcb, NULL); + err_t err = tcp_close(rsrc_obj->tcp_pcb); + LWIP_END(); + if (UNLIKELY(err != ERR_OK)) { + AVM_LOGW(TAG, "tcp_close failed with err=%d: %s:%i.\n", err, __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + rsrc_obj->tcp_pcb = NULL; + } else if (rsrc_obj->socket_state & SocketStateUDP) { + LWIP_BEGIN(); + udp_recv(rsrc_obj->udp_pcb, udp_recv_cb, NULL); + udp_remove(rsrc_obj->udp_pcb); + LWIP_END(); + rsrc_obj->udp_pcb = NULL; + } + rsrc_obj->socket_state = SocketStateClosed; + // The resource should not go away until all callbacks are processed. + enif_keep_resource(rsrc_obj); + struct LWIPEvent event; + event.handler = finalize_close_hander; + event.finalize_close.rsrc_obj = rsrc_obj; + otp_socket_lwip_enqueue(&event); } +#endif + return OK_ATOM; } @@ -454,9 +746,149 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) // select // -static term nif_socket_select(Context *ctx, term argv[], enum ErlNifSelectFlags mode) +#if OTP_SOCKET_LWIP +static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newpcb) +{ + struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); + conn_rsrc_obj->socket_state = SocketStateTCPConnected; + conn_rsrc_obj->tcp_pcb = newpcb; + conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + conn_rsrc_obj->pos = 0; + conn_rsrc_obj->linger_on = false; + conn_rsrc_obj->linger_sec = 0; + list_init(&conn_rsrc_obj->tcp_received_list); + + tcp_arg(newpcb, conn_rsrc_obj); + tcp_recv(newpcb, tcp_recv_cb); + return conn_rsrc_obj; +} + +static void tcp_selecting_accept_make_resource_handler(struct LWIPEvent *event) +{ + struct SocketResource *rsrc_obj = event->accept_make_resource.rsrc_obj; + struct SocketResource *conn_rsrc_obj = make_accepted_socket_resource(event->accept_make_resource.newpcb); + rsrc_obj->selected_client = conn_rsrc_obj; + rsrc_obj->socket_state = SocketStateTCPSelectingOneClient; + select_event_send_notification_from_handler(rsrc_obj, rsrc_obj->selecting_process_id); +} + +static err_t tcp_selecting_accept_cb(void *arg, struct tcp_pcb *newpcb, err_t err) +{ + UNUSED(err); + + struct SocketResource *rsrc_obj = (struct SocketResource *) arg; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + if (newpcb != NULL) { + // Delay accepting of further clients + tcp_backlog_delayed(rsrc_obj->tcp_pcb); + // Because data can come in, we need to immediately set the receive callback + // However, we cannot allocate the resource because we can't call malloc from ISR. + tcp_arg(newpcb, NULL); + tcp_recv(newpcb, tcp_recv_cb); + + // Enqueue an event to further process accept + struct LWIPEvent event; + event.handler = tcp_selecting_accept_make_resource_handler; + event.accept_make_resource.rsrc_obj = rsrc_obj; + event.accept_make_resource.newpcb = newpcb; + otp_socket_lwip_enqueue(&event); + } else { + // Ignore errors for now (we are still accepting). + } + return ERR_OK; + } else { + // Selecting process died + if (newpcb != NULL) { + if (rsrc_obj->socket_state & SocketStateTCPListening) { + (void) tcp_close(newpcb); + } else { + tcp_abort(newpcb); + } + } + return ERR_ABRT; + } +} + +static void tcp_recv_handler(struct LWIPEvent *event) +{ + struct tcp_pcb *tpcb = event->tcp_recv.tpcb; + // The resource should have been set by now as make_resource was queued first + struct SocketResource *rsrc_obj = tpcb->callback_arg; + if (IS_NULL_PTR(rsrc_obj)) { + TRACE("Unexpected null resource in tcp_recv_handler -- buf = %p\n", (void *) event->tcp_recv.buf); + } else { + struct TCPReceivedItem *new_item = malloc(sizeof(struct TCPReceivedItem)); + list_append(&rsrc_obj->tcp_received_list, &new_item->list_head); + new_item->buf = event->tcp_recv.buf; + new_item->err = event->tcp_recv.err; + + // Send notification if we are selecting. + if (rsrc_obj->socket_state & SocketStateSelectingRead) { + // Clear flag to avoid sending a message again. + rsrc_obj->socket_state &= ~SocketStateSelectingRead; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + select_event_send_notification_from_handler(rsrc_obj, rsrc_obj->selecting_process_id); + } // otherwise, selecting process died but we can just wait for monitor to handle it + } + } +} + +static err_t tcp_recv_cb(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err) +{ + UNUSED(arg); + + // Enqueue an event to further process recv + // The resource may or may not have been created, so we store tpcb instead. + struct LWIPEvent event; + event.handler = tcp_recv_handler; + event.tcp_recv.tpcb = tpcb; + event.tcp_recv.buf = p; + event.tcp_recv.err = err; + otp_socket_lwip_enqueue(&event); + return ERR_OK; +} + +static void udp_recv_handler(struct LWIPEvent *event) +{ + struct SocketResource *rsrc_obj = event->udp_recv.rsrc_obj; + struct UDPReceivedItem *new_item = malloc(sizeof(struct UDPReceivedItem)); + list_append(&rsrc_obj->tcp_received_list, &new_item->list_head); + new_item->buf = event->udp_recv.buf; + new_item->addr = event->udp_recv.addr; + new_item->port = event->udp_recv.port; + + // Send notification if we are selecting. + if (rsrc_obj->socket_state & SocketStateSelectingRead) { + // Clear flag to avoid sending a message again. + rsrc_obj->socket_state &= ~SocketStateSelectingRead; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + select_event_send_notification_from_handler(rsrc_obj, rsrc_obj->selecting_process_id); + } // otherwise, selecting process died + } +} + +static void udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *addr, u16_t port) +{ + UNUSED(pcb); + + struct SocketResource *rsrc_obj = (struct SocketResource *) arg; + if (LIKELY(rsrc_obj)) { + struct LWIPEvent event; + event.handler = udp_recv_handler; + event.udp_recv.rsrc_obj = rsrc_obj; + event.udp_recv.buf = p; + event.udp_recv.addr = addr; + event.udp_recv.port = port; + otp_socket_lwip_enqueue(&event); + } // Otherwise socket was closed. +} +#endif + +static term nif_socket_select_read(Context *ctx, int argc, term argv[]) { - TRACE("nif_socket_select\n"); + TRACE("nif_socket_select_read\n"); + + UNUSED(argc); VALIDATE_VALUE(argv[0], term_is_socket); @@ -469,45 +901,91 @@ static term nif_socket_select(Context *ctx, term argv[], enum ErlNifSelectFlags if (select_ref_term != UNDEFINED_ATOM) { VALIDATE_VALUE(select_ref_term, term_is_reference); } - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - TRACE("fd_obj->fd=%i\n", (int) fd_obj->fd); + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; ErlNifEnv *env = erl_nif_env_from_context(ctx); - if (fd_obj->selecting_process_id != process_pid && fd_obj->selecting_process_id != INVALID_PROCESS_ID) { - if (UNLIKELY(enif_demonitor_process(env, fd_obj, &fd_obj->selecting_process_monitor) != 0)) { - // RAISE_ERROR(posix_errno_to_term(EINVAL, global)); - } - fd_obj->selecting_process_id = INVALID_PROCESS_ID; + if (rsrc_obj->selecting_process_id != process_pid && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + // demonitor can fail if process is gone. + enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; } // Monitor first as select is less likely to fail and it's less expensive to demonitor // if select fails than to stop select if monitor fails - if (fd_obj->selecting_process_id != process_pid) { - if (UNLIKELY(enif_monitor_process(env, fd_obj, &process_pid, &fd_obj->selecting_process_monitor) != 0)) { + if (rsrc_obj->selecting_process_id != process_pid) { + if (UNLIKELY(enif_monitor_process(env, rsrc_obj, &process_pid, &rsrc_obj->selecting_process_monitor) != 0)) { RAISE_ERROR(NOPROC_ATOM); } - fd_obj->selecting_process_id = process_pid; + rsrc_obj->selecting_process_id = process_pid; } - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, mode, fd_obj, &process_pid, select_ref_term) < 0)) { - enif_demonitor_process(env, fd_obj, &fd_obj->selecting_process_monitor); - fd_obj->selecting_process_id = INVALID_PROCESS_ID; + rsrc_obj->ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); + +#if OTP_SOCKET_BSD + TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); + + if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_READ, rsrc_obj, &process_pid, select_ref_term) < 0)) { + enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; RAISE_ERROR(BADARG_ATOM); } - TRACE("nif_socket_select: Setting pid for socket fd %i to %i\n", fd_obj->fd, process_pid); - fd_obj->ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); + TRACE("nif_socket_select: Setting pid for socket fd %i to %i\n", rsrc_obj->fd, process_pid); - return OK_ATOM; -} +#elif OTP_SOCKET_LWIP + GlobalContext *global = ctx->global; -static term nif_socket_select_read(Context *ctx, int argc, term argv[]) -{ - UNUSED(argc); - return nif_socket_select(ctx, argv, ERL_NIF_SELECT_READ); + LWIP_BEGIN(); + if (rsrc_obj->socket_state == SocketStateTCPListening) { + rsrc_obj->socket_state = SocketStateTCPSelectingAccept; + tcp_accept(rsrc_obj->tcp_pcb, tcp_selecting_accept_cb); + // No longer delay processing of incoming connections + tcp_backlog_accepted(rsrc_obj->tcp_pcb); + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingAccept) { + // noop + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingOneClient || rsrc_obj->socket_state == SocketStateTCPSelectStopOneClient) { + rsrc_obj->socket_state = SocketStateTCPSelectingOneClient; + // Resend notification + if (ctx->process_id == process_pid) { + select_event_send_notification_from_nif(rsrc_obj, ctx); + } else { + Context *target = globalcontext_get_process_lock(global, process_pid); + if (target) { + select_event_send_notification_from_nif(rsrc_obj, ctx); + globalcontext_get_process_unlock(global, target); + } + } + } else if (rsrc_obj->socket_state == SocketStateTCPConnected) { + if (!list_is_empty(&rsrc_obj->tcp_received_list)) { + // Send (or resend) notification + select_event_send_notification_from_nif(rsrc_obj, ctx); + } else { + // Set flag to send it when a package will arrive. + rsrc_obj->socket_state = SocketStateTCPSelectingRead; + } + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingRead) { + // noop + } else if (rsrc_obj->socket_state == SocketStateUDPIdle) { + if (!list_is_empty(&rsrc_obj->udp_received_list)) { + // Send (or resend) notification + select_event_send_notification_from_nif(rsrc_obj, ctx); + } else { + rsrc_obj->socket_state = SocketStateUDPSelectingRead; + } + } else if (rsrc_obj->socket_state == SocketStateUDPSelectingRead) { + // noop + } else { + enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + LWIP_END(); + RAISE_ERROR(BADARG_ATOM); + } + LWIP_END(); +#endif + + return OK_ATOM; } static term nif_socket_select_stop(Context *ctx, int argc, term argv[]) @@ -517,16 +995,30 @@ static term nif_socket_select_stop(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(argv[0], term_is_socket); - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, ERL_NIF_SELECT_STOP, fd_obj, NULL, term_nil()) < 0)) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) { RAISE_ERROR(BADARG_ATOM); } return OK_ATOM; +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (rsrc_obj->socket_state == SocketStateTCPSelectingAccept) { + rsrc_obj->socket_state = SocketStateTCPListening; + // Suspend processing of incoming connections + tcp_backlog_delayed(rsrc_obj->tcp_pcb); + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingOneClient) { + rsrc_obj->socket_state = SocketStateTCPSelectStopOneClient; + } + LWIP_END(); + + return OK_ATOM; +#endif } // @@ -542,59 +1034,78 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - term level_tuple = argv[1]; - term value = argv[2]; - - term opt = term_get_tuple_element(level_tuple, 1); - if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { - int option_value = (value == TRUE_ATOM); - int res = setsockopt(fd_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); - if (UNLIKELY(res != 0)) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { +#endif + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + term level_tuple = argv[1]; + term value = argv[2]; + + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { + int option_value = (value == TRUE_ATOM); +#if OTP_SOCKET_BSD + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); + } else { + return OK_ATOM; + } +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (option_value) { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); } else { - return OK_ATOM; + ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); } - } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { - term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); - term linger = interop_kv_get_value(value, linger_atom, ctx->global); - VALIDATE_VALUE(linger, term_is_integer); - - struct linger sl; - sl.l_onoff = (onoff == TRUE_ATOM); - sl.l_linger = term_to_int(linger); - int res = setsockopt(fd_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); - if (UNLIKELY(res != 0)) { - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); + } else { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); } else { - return OK_ATOM; + ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); } - // TODO add more as needed - // int flag = 1; - // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); - // if (UNLIKELY(res != 0)) { - // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); - // } + } + LWIP_END(); + return OK_ATOM; +#endif + } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { + term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); + term linger = interop_kv_get_value(value, linger_atom, ctx->global); + VALIDATE_VALUE(linger, term_is_integer); + +#if OTP_SOCKET_BSD + struct linger sl; + sl.l_onoff = (onoff == TRUE_ATOM); + sl.l_linger = term_to_int(linger); + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); } else { - RAISE_ERROR(BADARG_ATOM); + return OK_ATOM; } +#elif OTP_SOCKET_LWIP + rsrc_obj->linger_on = (onoff == TRUE_ATOM); + rsrc_obj->linger_sec = term_to_int(linger); + return OK_ATOM; +#endif + // TODO add more as needed + // int flag = 1; + // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); + // if (UNLIKELY(res != 0)) { + // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); + // } } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + RAISE_ERROR(BADARG_ATOM); } } @@ -611,43 +1122,58 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - int res = getsockname(fd_obj->fd, (struct sockaddr *) &addr, &addrlen); - - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to getsockname: fd=%i res=%i.", fd_obj->fd, res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { +#endif + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - } else { - // {ok, #{addr => {a,b,c,d}, port => integer()}} - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } else { - term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); - term port_number = term_from_int(ntohs(addr.sin_port)); +#if OTP_SOCKET_BSD + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int res = getsockname(rsrc_obj->fd, (struct sockaddr *) &addr, &addrlen); - term map = term_alloc_map(2, &ctx->heap); - term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); - term return_value = port_create_tuple2(ctx, OK_ATOM, map); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to getsockname: fd=%i res=%i.", rsrc_obj->fd, res); + return make_errno_tuple(ctx); + } + uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); + uint16_t port_u16 = ntohs(addr.sin_port); +#elif OTP_SOCKET_LWIP + uint32_t ip4_u32; + uint16_t port_u16; + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->local_ip)); + port_u16 = rsrc_obj->tcp_pcb->local_port; + } else { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->udp_pcb->local_ip)); + port_u16 = rsrc_obj->udp_pcb->local_port; + } + LWIP_END(); +#endif - return return_value; - } - } + // {ok, #{addr => {a,b,c,d}, port => integer()}} + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); } else { - RAISE_ERROR(posix_errno_to_term(EBADF, ctx->global)); + term address = socket_tuple_from_addr(ctx, ip4_u32); + term port_number = term_from_int(port_u16); + + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term return_value = port_create_tuple2(ctx, OK_ATOM, map); + + return return_value; } } @@ -664,43 +1190,60 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - int res = getpeername(fd_obj->fd, (struct sockaddr *) &addr, &addrlen); + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to getpeername: fd=%i res=%i.", fd_obj->fd, res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + // TODO: handle "connected" UDP sockets + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } + if ((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) { + return make_error_tuple(posix_errno_to_term(ENOTCONN, global), ctx); + } +#endif - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - } else { - // {ok, #{addr => {a,b,c,d}, port => integer()}} - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } else { - term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); - term port_number = term_from_int(ntohs(addr.sin_port)); +#if OTP_SOCKET_BSD + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int res = getpeername(rsrc_obj->fd, (struct sockaddr *) &addr, &addrlen); - term map = term_alloc_map(2, &ctx->heap); - term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); - term return_value = port_create_tuple2(ctx, OK_ATOM, map); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to getpeername: fd=%i res=%i.", rsrc_obj->fd, res); + return make_errno_tuple(ctx); + } + uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); + uint16_t port_u16 = ntohs(addr.sin_port); +#elif OTP_SOCKET_LWIP + // TODO: support peername for "connected" UDP sockets + uint32_t ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->remote_ip)); + uint16_t port_u16 = rsrc_obj->tcp_pcb->remote_port; +#endif - return return_value; - } - } + // {ok, #{addr => {a,b,c,d}, port => integer()}} + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + term address = socket_tuple_from_addr(ctx, ip4_u32); + term port_number = term_from_int(port_u16); + + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term return_value = port_create_tuple2(ctx, OK_ATOM, map); + + return return_value; } } @@ -717,51 +1260,95 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - term sockaddr = argv[1]; + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#endif - struct sockaddr_in serveraddr; - memset(&serveraddr, 0, sizeof(serveraddr)); - serveraddr.sin_family = AF_INET; + term sockaddr = argv[1]; - if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, any_atom)) { - serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); - } else if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, loopback_atom)) { - serveraddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else if (term_is_map(sockaddr)) { - term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); - serveraddr.sin_port = htons(term_to_int(port)); - term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); - if (globalcontext_is_term_equal_to_atom_string(global, addr, any_atom)) { - serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); - } else if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { - serveraddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - serveraddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); - } - } +#if OTP_SOCKET_BSD + struct sockaddr_in serveraddr; + memset(&serveraddr, 0, sizeof(serveraddr)); + serveraddr.sin_family = AF_INET; +#elif OTP_SOCKET_LWIP + ip_addr_t ip_addr; +#endif - socklen_t address_len = sizeof(serveraddr); - int res = bind(fd_obj->fd, (struct sockaddr *) &serveraddr, address_len); - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); + uint16_t port_u16 = 0; + + if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, any_atom)) { +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); +#elif OTP_SOCKET_LWIP + ip_addr_set_any(false, &ip_addr); +#endif + } else if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, loopback_atom)) { +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); +#elif OTP_SOCKET_LWIP + ip_addr_set_loopback(false, &ip_addr); +#endif + } else if (term_is_map(sockaddr)) { + term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); + port_u16 = term_to_int(port); + term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); + if (globalcontext_is_term_equal_to_atom_string(global, addr, any_atom)) { +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); +#elif OTP_SOCKET_LWIP + ip_addr_set_any(false, &ip_addr); +#endif + } else if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); +#elif OTP_SOCKET_LWIP + ip_addr_set_loopback(false, &ip_addr); +#endif } else { - return OK_ATOM; +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); +#elif OTP_SOCKET_LWIP + ip_addr_set_ip4_u32(&ip_addr, htonl(socket_tuple_to_addr(addr))); +#endif } + } +#if OTP_SOCKET_BSD + serveraddr.sin_port = htons(port_u16); + socklen_t address_len = sizeof(serveraddr); + int res = bind(rsrc_obj->fd, (struct sockaddr *) &serveraddr, address_len); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); + return make_errno_tuple(ctx); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + return OK_ATOM; + } +#elif OTP_SOCKET_LWIP + err_t res; + if (rsrc_obj->socket_state & SocketStateTCP) { + res = tcp_bind(rsrc_obj->tcp_pcb, &ip_addr, port_u16); + } else { + res = udp_bind(rsrc_obj->udp_pcb, &ip_addr, port_u16); + } + if (UNLIKELY(res != ERR_OK)) { + AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); + return make_lwip_err_tuple(res, ctx); + } else { + return OK_ATOM; } +#endif } // @@ -778,34 +1365,77 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(argv[0], term_is_socket); VALIDATE_VALUE(argv[1], term_is_integer); - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - int backlog = term_to_int(argv[1]); - int res = listen(fd_obj->fd, backlog); - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to listen on socket: res=%i.", res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + return make_error_tuple(posix_errno_to_term(EPROTOTYPE, global), ctx); + } +#endif - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - } else { - return OK_ATOM; - } + int backlog = term_to_int(argv[1]); + +#if OTP_SOCKET_BSD + int res = listen(rsrc_obj->fd, backlog); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to listen on socket: res=%i.", res); + return make_errno_tuple(ctx); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + return OK_ATOM; + } +#elif OTP_SOCKET_LWIP + uint8_t backlog_u8; + if (backlog > 255) { + // POSIX says: "Implementations may impose a limit on backlog and silently reduce the specified value" + backlog_u8 = 255; + } + if (backlog < 0) { + // POSIX says: "If listen() is called with a backlog argument value that is less than 0, the function behaves as if it had been called with a backlog argument value of 0." + backlog_u8 = 0; + } else { + backlog_u8 = backlog; } + err_t err; + struct tcp_pcb *new_pcb = tcp_listen_with_backlog_and_err(rsrc_obj->tcp_pcb, backlog_u8, &err); + if (new_pcb == NULL) { + return make_lwip_err_tuple(err, ctx); + } + // Do not accept until we select/accept + tcp_backlog_delayed(new_pcb); + rsrc_obj->tcp_pcb = new_pcb; + rsrc_obj->socket_state = SocketStateTCPListening; + return OK_ATOM; +#endif } // // accept // +#if OTP_SOCKET_LWIP +static term make_accepted_socket_term(struct SocketResource *conn_rsrc_obj, Heap *heap, GlobalContext *global) +{ + term obj = term_from_resource(conn_rsrc_obj, heap); + + term socket_term = term_alloc_tuple(2, heap); + uint64_t ref_ticks = globalcontext_get_ref_ticks(global); + term ref = term_from_ref_ticks(ref_ticks, heap); + term_put_tuple_element(socket_term, 0, obj); + term_put_tuple_element(socket_term, 1, ref); + return socket_term; +} +#endif + static term nif_socket_accept(Context *ctx, int argc, term argv[]) { TRACE("nif_socket_accept\n"); @@ -815,132 +1445,216 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - struct sockaddr_in clientaddr; - socklen_t clientlen = sizeof(clientaddr); - int fd = accept(fd_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); - if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { - AVM_LOGE(TAG, "Unable to accept on socket %i.", fd_obj->fd); - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - - int err = errno; - term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global); - return make_error_tuple(reason, ctx); + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state & SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } + // Only three states are allowed: + // - SocketStateTCPListening => block to accept (not used by otp_socket.erl) + // - SocketStateTCPSelectingOneClient => was selected, we have one in buffer + // - SocketStateTCPSelectStopOneClient => was selected, stop selecting, but we still have one in buffer + if (rsrc_obj->socket_state != SocketStateTCPListening && rsrc_obj->socket_state != SocketStateTCPSelectingOneClient && rsrc_obj->socket_state != SocketStateTCPSelectStopOneClient) { + return make_error_tuple(posix_errno_to_term(EINVAL, global), ctx); + } +#endif - } else { +#if OTP_SOCKET_BSD + struct sockaddr_in clientaddr; + socklen_t clientlen = sizeof(clientaddr); + int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); + if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { + AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd); + int err = errno; + term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global); + return make_error_tuple(reason, ctx); + } else { - struct SocketFd *conn_fd_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketFd)); - conn_fd_obj->fd = fd; - conn_fd_obj->selecting_process_id = INVALID_PROCESS_ID; - TRACE("nif_socket_accept: Created socket on accept fd=%i\n", fd_obj->fd); + struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); + conn_rsrc_obj->fd = fd; + conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd); - term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_fd_obj); - enif_release_resource(conn_fd_obj); + term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); + enif_release_resource(conn_rsrc_obj); - size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; - if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } - term socket_term = term_alloc_tuple(2, &ctx->heap); - uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); - term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); - term_put_tuple_element(socket_term, 0, obj); - term_put_tuple_element(socket_term, 1, ref); + term socket_term = term_alloc_tuple(2, &ctx->heap); + uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); + term_put_tuple_element(socket_term, 0, obj); + term_put_tuple_element(socket_term, 1, ref); - term result = term_alloc_tuple(2, &ctx->heap); - term_put_tuple_element(result, 0, OK_ATOM); - term_put_tuple_element(result, 1, socket_term); + term result = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(result, 0, OK_ATOM); + term_put_tuple_element(result, 1, socket_term); - return result; + return result; + } +#elif OTP_SOCKET_LWIP + term result; + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateSelectedOneClient) { + size_t requested_size = TERM_BOXED_RESOURCE_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; + if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + LWIP_END(); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + term socket_term = make_accepted_socket_term(rsrc_obj->selected_client, &ctx->heap, global); + rsrc_obj->selected_client = NULL; + if (rsrc_obj->socket_state == SocketStateTCPSelectStopOneClient) { + rsrc_obj->socket_state = SocketStateTCPListening; + } else { + rsrc_obj->socket_state = SocketStateTCPSelectingAccept; + // No longer delay processing of incoming connections + tcp_backlog_accepted(rsrc_obj->tcp_pcb); } + result = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(result, 0, OK_ATOM); + term_put_tuple_element(result, 1, socket_term); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + // return EAGAIN + return make_error_tuple(posix_errno_to_term(EAGAIN, ctx->global), ctx); } + LWIP_END(); + return result; +#endif } // // recv/recvfrom // -static term nif_socket_recv_with_peek(Context *ctx, int argc, term argv[], bool is_recvfrom) +#if OTP_SOCKET_BSD +static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_with_peek\n"); - UNUSED(argc); - - VALIDATE_VALUE(argv[0], term_is_socket); GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { - RAISE_ERROR(BADARG_ATOM); + int flags = MSG_WAITALL; + // TODO parameterize buffer size + ssize_t res = recvfrom(rsrc_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL); + TRACE("%li bytes available.\n", (long int) res); + if (res < 0) { + AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno); + return make_errno_tuple(ctx); + } else if (res == 0) { + TRACE("Peer closed socket %i.\n", rsrc_obj->fd); + return make_error_tuple(CLOSED_ATOM, ctx); + } else { + ssize_t buffer_size = len == 0 ? (ssize_t) res : MIN((size_t) res, len); + + // {ok, Data :: binary()}} + size_t ensure_packet_avail = term_binary_data_size_in_terms(buffer_size) + BINARY_HEADER_SIZE; + size_t requested_size = ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); + + if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + + term data = term_create_uninitialized_binary(buffer_size, &ctx->heap, global); + const char *buffer = term_binary_data(data); + + // + // receive data on the socket + // + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + res = recvfrom(rsrc_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); + + TRACE("otp_socket.recv_handler: received data on fd: %i available=%lu, read=%lu\n", rsrc_obj->fd, (unsigned long) res, (unsigned long) buffer_size); + + term payload = term_invalid_term(); + if (is_recvfrom) { + term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); + term port_number = term_from_int(ntohs(addr.sin_port)); + + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term tuple = port_heap_create_tuple2(&ctx->heap, map, data); + payload = port_heap_create_ok_tuple(&ctx->heap, tuple); + } else { + payload = port_heap_create_ok_tuple(&ctx->heap, data); + } + + return payload; } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - int flags = MSG_WAITALL; - // TODO parameterize buffer size - ssize_t res = recvfrom(fd_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL); - TRACE("%li bytes available.\n", (long int) res); - if (res < 0) { - AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", fd_obj->fd, errno); +} - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } +static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +{ + TRACE("nif_socket_recv_without_peek\n"); + + GlobalContext *global = ctx->global; - return make_error_tuple(posix_errno_to_term(errno, global), ctx); + // TODO plumb through buffer size + size_t buffer_size = len == 0 ? DEFAULT_BUFFER_SIZE : len; + char *buffer = malloc(buffer_size); + term payload = term_invalid_term(); - } else if (res == 0) { + if (IS_NULL_PTR(buffer)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); - TRACE("Peer closed socket %i.\n", fd_obj->fd); + } else { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + int flags = 0; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + ssize_t res = recvfrom(rsrc_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); + + if (res < 0) { + + int err = errno; + term reason = (err == ECONNRESET) ? globalcontext_make_atom(global, ATOM_STR("\xA", "econnreset")) : posix_errno_to_term(err, global); + + if (err == ECONNRESET) { + AVM_LOGI(TAG, "Peer closed connection."); + } else { + AVM_LOGE(TAG, "Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno); } - return make_error_tuple(CLOSED_ATOM, ctx); + return make_error_tuple(reason, ctx); + } else if (res == 0) { + TRACE("Peer closed socket %i.\n", rsrc_obj->fd); + return make_error_tuple(CLOSED_ATOM, ctx); } else { size_t len = (size_t) res; - ssize_t buffer_size = MIN(len, DEFAULT_BUFFER_SIZE); + TRACE("otp_socket.recv_handler: received data on fd: %i len=%lu\n", rsrc_obj->fd, (unsigned long) len); - // {ok, Data :: binary()}} - size_t ensure_packet_avail = term_binary_data_size_in_terms(buffer_size) + BINARY_HEADER_SIZE; - size_t requested_size = ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); + size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; + size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - term data = term_create_uninitialized_binary(buffer_size, &ctx->heap, global); - const char *buffer = term_binary_data(data); + term data = term_from_literal_binary(buffer, len, &ctx->heap, global); - // - // receive data on the socket - // - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - res = recvfrom(fd_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); - - TRACE("otp_socket.recv_handler: received data on fd: %i len=%lu\n", fd_obj->fd, (unsigned long) len); - - term payload = term_invalid_term(); if (is_recvfrom) { term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); term port_number = term_from_int(ntohs(addr.sin_port)); @@ -953,137 +1667,221 @@ static term nif_socket_recv_with_peek(Context *ctx, int argc, term argv[], bool } else { payload = port_heap_create_ok_tuple(&ctx->heap, data); } - - return payload; } - } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + + free(buffer); + return payload; } } -static term nif_socket_recv_without_peek(Context *ctx, int argc, term argv[], bool is_recvfrom) +#elif OTP_SOCKET_LWIP +static size_t copy_pbuf_data(struct pbuf *src, size_t offset, size_t count, uint8_t *dst) { - TRACE("nif_socket_recv_without_peek\n"); - UNUSED(argc); - - VALIDATE_VALUE(argv[0], term_is_socket); + size_t copied = 0; + while (count > 0 && src != NULL) { + if (offset > src->len) { + offset -= src->len; + src = src->next; + continue; + } + size_t chunk_count = MIN(count, src->len - offset); + memcpy(dst, src->payload, chunk_count); + count -= chunk_count; + copied += chunk_count; + dst += chunk_count; + src = src->next; + } + return copied; +} +static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +{ GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { - RAISE_ERROR(BADARG_ATOM); + size_t buffer_size = 0; + bool closed = false; + err_t err = ERR_OK; + // Use lwIP lock + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + // TCP: we return up to len bytes or all available (if len == 0) + struct ListHead *item; + LIST_FOR_EACH (item, &rsrc_obj->tcp_received_list) { + struct TCPReceivedItem *received_item = GET_LIST_ENTRY(item, struct TCPReceivedItem, list_head); + if (received_item->buf == NULL || received_item->err != ERR_OK) { + closed = received_item->buf == NULL; + err = received_item->err; + break; + } else { + buffer_size += received_item->buf->tot_len; + if (len > 0 && buffer_size >= len) { + buffer_size = len; + break; + } + } + } + } else { + // UDP: we return the first message and truncate it to len if len != 0 + if (!list_is_empty(&rsrc_obj->udp_received_list)) { + struct UDPReceivedItem *first_item = CONTAINER_OF(list_first(&rsrc_obj->udp_received_list), struct UDPReceivedItem, list_head); + buffer_size = first_item->buf->tot_len; + if (len > 0 && buffer_size > len) { + buffer_size = len; + } + } } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - - // TODO plumb through buffer size - size_t buffer_size = DEFAULT_BUFFER_SIZE; - char *buffer = malloc(buffer_size); - term payload = term_invalid_term(); + LWIP_END(); - if (IS_NULL_PTR(buffer)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - - } else { - - int flags = 0; - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - ssize_t res = recvfrom(fd_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); - - if (res < 0) { + // If we have no data, return EAGAIN or closed or the error. + if (buffer_size == 0) { + if (closed) { + return make_error_tuple(CLOSED_ATOM, ctx); + } + if (err != ERR_OK) { + return make_error_tuple(term_from_int(err), ctx); + } + return make_error_tuple(posix_errno_to_term(EAGAIN, global), ctx); + } - int err = errno; - term reason = (err == ECONNRESET) ? globalcontext_make_atom(global, ATOM_STR("\xA", "econnreset")) : posix_errno_to_term(err, global); + size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; + size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); + if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } - if (err == ECONNRESET) { - TRACE("Peer closed connection.\n"); + term data = term_create_uninitialized_binary(buffer_size, &ctx->heap, global); + uint8_t *ptr = (uint8_t *) term_binary_data(data); + size_t remaining = buffer_size; + + uint32_t ip4_u32; + uint16_t port_u16; + + // Use lwIP lock + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + size_t pos = rsrc_obj->pos; + struct ListHead *item; + struct ListHead *tmp; + MUTABLE_LIST_FOR_EACH (item, tmp, &rsrc_obj->tcp_received_list) { + struct TCPReceivedItem *received_item = GET_LIST_ENTRY(item, struct TCPReceivedItem, list_head); + if (received_item->buf == NULL || received_item->err != ERR_OK) { + break; + } + if (pos < received_item->buf->tot_len) { + size_t copied = copy_pbuf_data(received_item->buf, pos, remaining, ptr); + ptr += copied; + tcp_recved(rsrc_obj->tcp_pcb, copied); + if (copied + pos == received_item->buf->tot_len) { + // all data was copied. + list_remove(item); + pbuf_free(received_item->buf); + pos = 0; } else { - AVM_LOGE(TAG, "Unable to read data on socket %i. errno=%i", fd_obj->fd, errno); - } - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + pos = pos + copied; } - - return make_error_tuple(reason, ctx); - - } else if (res == 0) { - - TRACE("Peer closed socket %i.\n", fd_obj->fd); - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + if (remaining == 0) { + break; } - - return make_error_tuple(CLOSED_ATOM, ctx); - } else { + pos -= received_item->buf->tot_len; + } + } + rsrc_obj->pos = pos; + if (is_recvfrom) { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->remote_ip)); + port_u16 = rsrc_obj->tcp_pcb->remote_port; + } + } else { + struct UDPReceivedItem *first_item = CONTAINER_OF(list_first(&rsrc_obj->udp_received_list), struct UDPReceivedItem, list_head); + copy_pbuf_data(first_item->buf, 0, remaining, ptr); + if (is_recvfrom) { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(first_item->addr)); + port_u16 = first_item->port; + } + list_remove(&first_item->list_head); + pbuf_free(first_item->buf); + free(first_item); + } + LWIP_END(); - size_t len = (size_t) res; - TRACE("otp_socket.recv_handler: received data on fd: %i len=%lu\n", fd_obj->fd, (unsigned long) len); - - size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; - size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); - - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - - term data = term_from_literal_binary(buffer, len, &ctx->heap, global); - - if (is_recvfrom) { - term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); - term port_number = term_from_int(ntohs(addr.sin_port)); + term payload; - term map = term_alloc_map(2, &ctx->heap); - term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); - term tuple = port_heap_create_tuple2(&ctx->heap, map, data); - payload = port_heap_create_ok_tuple(&ctx->heap, tuple); - } else { - payload = port_heap_create_ok_tuple(&ctx->heap, data); - } - } + if (is_recvfrom) { + term address = socket_tuple_from_addr(ctx, ip4_u32); + term port_number = term_from_int(port_u16); - free(buffer); - return payload; - } + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term tuple = port_heap_create_tuple2(&ctx->heap, map, data); + payload = port_heap_create_ok_tuple(&ctx->heap, tuple); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + payload = port_heap_create_ok_tuple(&ctx->heap, data); } + + return payload; } +#endif -static term nif_socket_recv_internal(Context *ctx, int argc, term argv[], bool is_recvfrom) +static term nif_socket_recv_internal(Context *ctx, term argv[], bool is_recvfrom) { + VALIDATE_VALUE(argv[0], term_is_socket); + VALIDATE_VALUE(argv[1], term_is_integer); + avm_int_t len = term_to_int(argv[1]); + // We raise badarg but return error tuples for POSIX errors + if (UNLIKELY(len < 0)) { + RAISE_ERROR(BADARG_ATOM); + } + + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { + return make_error_tuple(posix_errno_to_term(EINVAL, ctx->global), ctx); + } + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state & SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); + } + if (rsrc_obj->socket_state & SocketStateListening) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, ctx->global), ctx); + } +#endif + +#if OTP_SOCKET_BSD if (otp_socket_platform_supports_peek()) { - return nif_socket_recv_with_peek(ctx, argc, argv, is_recvfrom); + return nif_socket_recv_with_peek(ctx, rsrc_obj, len, is_recvfrom); } else { - return nif_socket_recv_without_peek(ctx, argc, argv, is_recvfrom); + return nif_socket_recv_without_peek(ctx, rsrc_obj, len, is_recvfrom); } +#elif OTP_SOCKET_LWIP + return nif_socket_recv_lwip(ctx, rsrc_obj, len, is_recvfrom); +#endif } static term nif_socket_recv(Context *ctx, int argc, term argv[]) { + UNUSED(argc); + TRACE("nif_socket_recv\n"); - return nif_socket_recv_internal(ctx, argc, argv, false); + return nif_socket_recv_internal(ctx, argv, false); } static term nif_socket_recvfrom(Context *ctx, int argc, term argv[]) { + UNUSED(argc); + TRACE("nif_socket_recvfrom\n"); - return nif_socket_recv_internal(ctx, argc, argv, true); + return nif_socket_recv_internal(ctx, argv, true); } // // send/sendto // - static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool is_sendto) { TRACE("nif_socket_send_internal\n"); @@ -1094,79 +1892,143 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - term data = argv[1]; - term dest = term_invalid_term(); - if (is_sendto) { - dest = argv[2]; - } - - // TODO make non-blocking +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateListening) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } +#endif - const char *buf = term_binary_data(data); - size_t len = term_binary_size(data); + term data = argv[1]; + term dest = term_invalid_term(); + if (is_sendto) { + dest = argv[2]; + } - ssize_t sent_data = -1; + const char *buf = term_binary_data(data); + size_t len = term_binary_size(data); - if (is_sendto) { + ssize_t sent_data = -1; - struct sockaddr_in destaddr; - memset(&destaddr, 0, sizeof(destaddr)); - destaddr.sin_family = AF_INET; +#if OTP_SOCKET_BSD + // TODO make non-blocking - term port = interop_kv_get_value_default(dest, port_atom, term_from_int(0), global); - destaddr.sin_port = htons(term_to_int(port)); - term addr = interop_kv_get_value(dest, addr_atom, ctx->global); - if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { - destaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - destaddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); - } + if (is_sendto) { - sent_data = sendto(fd_obj->fd, buf, len, 0, (struct sockaddr *) &destaddr, sizeof(destaddr)); + struct sockaddr_in destaddr; + memset(&destaddr, 0, sizeof(destaddr)); + destaddr.sin_family = AF_INET; + term port = interop_kv_get_value_default(dest, port_atom, term_from_int(0), global); + destaddr.sin_port = htons(term_to_int(port)); + term addr = interop_kv_get_value(dest, addr_atom, ctx->global); + if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { + destaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); } else { - sent_data = send(fd_obj->fd, buf, len, 0); + destaddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); } - // {ok, RestData} | {error, Reason} + sent_data = sendto(rsrc_obj->fd, buf, len, 0, (struct sockaddr *) &destaddr, sizeof(destaddr)); - if (sent_data > 0) { + } else { + sent_data = send(rsrc_obj->fd, buf, len, 0); + } - // assert sent_data < len - size_t rest_len = len - sent_data; - if (rest_len == 0) { - return OK_ATOM; - } +#elif OTP_SOCKET_LWIP + err_t err; + ip_addr_t ip_addr; + uint16_t port_u16; + if (is_sendto) { - size_t requested_size = term_sub_binary_heap_size(data, rest_len); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + requested_size) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + term port_term = interop_kv_get_value_default(dest, port_atom, term_from_int(0), global); + avm_int_t port_number = term_to_int(port_term); + if (port_number < 0 || port_number > 65535) { + RAISE_ERROR(BADARG_ATOM); + } + port_u16 = (uint16_t) port_number; + term addr = interop_kv_get_value(dest, addr_atom, ctx->global); + if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { + ip_addr_set_loopback(false, &ip_addr); + } else { + ip_addr_set_ip4_u32(&ip_addr, htonl(socket_tuple_to_addr(addr))); + } + } + + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateUDP) { + struct pbuf *p = pbuf_alloc(PBUF_TRANSPORT, len, PBUF_RAM); + char *bytes = (char *) p->payload; + memcpy(bytes, buf, len); + if (is_sendto) { + err = udp_sendto(rsrc_obj->udp_pcb, p, &ip_addr, port_u16); + } else { + err = udp_send(rsrc_obj->udp_pcb, p); + } + if (err == ERR_OK) { + sent_data = len; + } + pbuf_free(p); + } else { + // If the socket is connection-mode, dest_addr shall be ignored. + // Because we are copying, we cannot really send tcp_sndbuf(rsrc_obj->tcp_pcb) at once + size_t buflen = MIN(len, TCP_MSS); + int flags = TCP_WRITE_FLAG_COPY; + if (buflen < len) { + flags |= TCP_WRITE_FLAG_MORE; + } + err = tcp_write(rsrc_obj->tcp_pcb, buf, buflen, flags); + if (err == ERR_MEM) { + sent_data = 0; + } else { + if (err == ERR_OK) { + err = tcp_output(rsrc_obj->tcp_pcb); + if (err == ERR_OK) { + sent_data = buflen; + } } + } + } + LWIP_END(); - term rest = term_maybe_create_sub_binary(data, sent_data, rest_len, &ctx->heap, ctx->global); - return port_create_tuple2(ctx, OK_ATOM, rest); +#endif + // {ok, RestData} | {error, Reason} - } else { + size_t rest_len = len - sent_data; + if (rest_len == 0) { + return OK_ATOM; + } else if (sent_data > 0) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + size_t requested_size = term_sub_binary_heap_size(data, rest_len); + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2) + requested_size, 1, &data, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } - TRACE("Unable to send data: res=%zi.\n", sent_data); - return make_error_tuple(CLOSED_ATOM, ctx); + term rest = term_maybe_create_sub_binary(data, sent_data, rest_len, &ctx->heap, ctx->global); + return port_create_tuple2(ctx, OK_ATOM, rest); + + } else if (sent_data == 0) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2), 1, &data, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); } + return port_create_tuple2(ctx, OK_ATOM, data); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + AVM_LOGE(TAG, "Unable to send data: res=%zi.", sent_data); + return make_error_tuple(CLOSED_ATOM, ctx); } } @@ -1186,6 +2048,61 @@ static term nif_socket_sendto(Context *ctx, int argc, term argv[]) // connect // +#if OTP_SOCKET_LWIP +static void trap_answer_ok(struct LWIPEvent *event) +{ + Context *target_ctx = globalcontext_get_process_lock(event->trap_answer_ok.global, event->trap_answer_ok.target_pid); + if (target_ctx) { + mailbox_send_term_signal(target_ctx, TrapAnswerSignal, OK_ATOM); + globalcontext_get_process_unlock(event->trap_answer_ok.global, target_ctx); + } +} + +static void trap_answer_closed(struct LWIPEvent *event) +{ + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(2), heap); + { + term result = term_alloc_tuple(2, &heap); + term_put_tuple_element(result, 0, ERROR_ATOM); + // TODO: We may want to interpret err + term_put_tuple_element(result, 1, CLOSED_ATOM); + Context *target_ctx = globalcontext_get_process_lock(event->trap_answer_closed.global, event->trap_answer_closed.target_pid); + if (target_ctx) { + mailbox_send_term_signal(target_ctx, TrapAnswerSignal, result); + globalcontext_get_process_unlock(event->trap_answer_closed.global, target_ctx); + } + } + END_WITH_STACK_HEAP(heap, event->trap_answer_closed.global); +} + +static err_t tcp_connected_cb(void *arg, struct tcp_pcb *tpcb, err_t err) +{ + UNUSED(tpcb); + + struct SocketResource *rsrc_obj = (struct SocketResource *) arg; + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + GlobalContext *global = rsrc_refc->resource_type->global; + int32_t target_pid = rsrc_obj->selecting_process_id; + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + if (target_pid != INVALID_PROCESS_ID) { + if (err == ERR_OK) { + struct LWIPEvent event; + event.handler = trap_answer_ok; + event.trap_answer_ok.global = global; + event.trap_answer_ok.target_pid = target_pid; + otp_socket_lwip_enqueue(&event); + } else { + struct LWIPEvent event; + event.handler = trap_answer_closed; + event.trap_answer_closed.global = global; + event.trap_answer_closed.target_pid = target_pid; + otp_socket_lwip_enqueue(&event); + } + } // else: sender died + return ERR_OK; +} +#endif + static term nif_socket_connect(Context *ctx, int argc, term argv[]) { TRACE("nif_socket_connect\n"); @@ -1196,59 +2113,93 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - term sockaddr = argv[1]; - struct sockaddr_in address; - memset(&address, 0, sizeof(struct sockaddr_in)); - address.sin_family = AF_INET; + term sockaddr = argv[1]; + term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); + term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); + if (term_is_invalid_term(addr)) { + RAISE_ERROR(BADARG_ATOM); + } + avm_int_t port_number = term_to_int(port); + if (port_number < 0 || port_number > 65535) { + RAISE_ERROR(BADARG_ATOM); + } - term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); - address.sin_port = htons(term_to_int(port)); +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) + || ((rsrc_obj->socket_state & SocketStateTCPConnected) == SocketStateTCPConnected)) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } +#endif - term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); - if (term_is_invalid_term(addr)) { - RAISE_ERROR(BADARG_ATOM); - } - if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { - address.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - // TODO more validation on addr tuple - address.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); - } +#if OTP_SOCKET_BSD + struct sockaddr_in address; + memset(&address, 0, sizeof(struct sockaddr_in)); + address.sin_family = AF_INET; - socklen_t addr_len = sizeof(struct sockaddr_in); - int res = connect(fd_obj->fd, (const struct sockaddr *) &address, addr_len); - if (res == -1) { - if (errno == EINPROGRESS) { + address.sin_port = htons(port_number); - // TODO make connect non-blocking - return UNDEFINED_ATOM; + if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { + address.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } else { + // TODO more validation on addr tuple + address.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); + } - } else { - AVM_LOGE(TAG, "Unable to connect: res=%i errno=%i", res, errno); + socklen_t addr_len = sizeof(struct sockaddr_in); + int res = connect(rsrc_obj->fd, (const struct sockaddr *) &address, addr_len); + if (res == -1) { + if (errno == EINPROGRESS) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + // TODO make connect non-blocking + return UNDEFINED_ATOM; - return make_error_tuple(CLOSED_ATOM, ctx); - } - } else if (res == 0) { - return OK_ATOM; } else { - // won't happen according to connect(2) - return UNDEFINED_ATOM; + AVM_LOGE(TAG, "Unable to connect: res=%i errno=%i", res, errno); + return make_error_tuple(CLOSED_ATOM, ctx); } + } else if (res == 0) { + return OK_ATOM; } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + // won't happen according to connect(2) + return UNDEFINED_ATOM; + } +#elif OTP_SOCKET_LWIP + ip_addr_t ip_addr; + ip_addr_set_ip4_u32(&ip_addr, htonl(socket_tuple_to_addr(addr))); + err_t err; + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateUDP) { + err = udp_connect(rsrc_obj->udp_pcb, &ip_addr, port_number); + } else { + err = tcp_connect(rsrc_obj->tcp_pcb, &ip_addr, port_number, tcp_connected_cb); } + LWIP_END(); + + if (UNLIKELY(err != ERR_OK)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + return OK_ATOM; + } else { + rsrc_obj->selecting_process_id = ctx->process_id; + // Trap caller waiting for completion + context_update_flags(ctx, ~NoFlags, Trap); + return term_invalid_term(); + } +#endif } // @@ -1265,49 +2216,63 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - int how; + int how; + int val = interop_atom_term_select_int(otp_socket_shutdown_direction_table, argv[1], global); + switch (val) { - int val = interop_atom_term_select_int(otp_socket_shutdown_direction_table, argv[1], global); - switch (val) { + case OtpSocketReadShutdownDirection: + how = SHUT_RD; + break; - case OtpSocketReadShutdownDirection: - how = SHUT_RD; - break; + case OtpSocketWriteShutdownDirection: + how = SHUT_WR; + break; - case OtpSocketWriteShutdownDirection: - how = SHUT_WR; - break; - - case OtpSocketReadWriteShutdownDirection: - how = SHUT_RDWR; - break; + case OtpSocketReadWriteShutdownDirection: + how = SHUT_RDWR; + break; - default: - RAISE_ERROR(BADARG_ATOM); - } + default: + RAISE_ERROR(BADARG_ATOM); + } - int res = shutdown(fd_obj->fd, how); - if (res < 0) { - AVM_LOGE(TAG, "Unable to shut down socket: res=%i errno=%i", res, errno); +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { +#endif + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + term result = OK_ATOM; - return make_error_tuple(posix_errno_to_term(errno, global), ctx); +#if OTP_SOCKET_BSD + int res = shutdown(rsrc_obj->fd, how); + if (res < 0) { + AVM_LOGE(TAG, "Unable to shut down socket: res=%i errno=%i", res, errno); + return make_errno_tuple(ctx); + } +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + err_t res = tcp_shutdown(rsrc_obj->tcp_pcb, how != SHUT_WR, how != SHUT_RD); + if (how == SHUT_RDWR && res == ERR_OK) { + rsrc_obj->tcp_pcb = NULL; + } + if (res != ERR_OK) { + AVM_LOGE(TAG, "Unable to shut down socket: res=%i", res); + result = make_lwip_err_tuple(res, ctx); } - return OK_ATOM; - } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } + LWIP_END(); +#endif + return result; } // @@ -1424,11 +2389,11 @@ const struct Nif *otp_socket_nif_get_nif(const char *nifname) TRACE("Resolved platform nif %s ...\n", nifname); return &socket_accept_nif; } - if (strcmp("nif_recv/1", rest) == 0) { + if (strcmp("nif_recv/2", rest) == 0) { TRACE("Resolved platform nif %s ...\n", nifname); return &socket_recv_nif; } - if (strcmp("nif_recvfrom/1", rest) == 0) { + if (strcmp("nif_recvfrom/2", rest) == 0) { TRACE("Resolved platform nif %s ...\n", nifname); return &socket_recvfrom_nif; } diff --git a/src/libAtomVM/otp_socket.h b/src/libAtomVM/otp_socket.h index 417cb44fcb..382088c876 100644 --- a/src/libAtomVM/otp_socket.h +++ b/src/libAtomVM/otp_socket.h @@ -27,10 +27,78 @@ extern "C" { #include #include +#include + +#if !defined(OTP_SOCKET_BSD) && !defined(OTP_SOCKET_LWIP) +#if HAVE_SOCKET && HAVE_SELECT +#define OTP_SOCKET_BSD 1 +#elif HAVE_LWIP_RAW +#define OTP_SOCKET_LWIP 1 +#else +#error OTP Socket requires BSD Socket or lwIP +#endif +#endif const struct Nif *otp_socket_nif_get_nif(const char *nifname); void otp_socket_init(GlobalContext *global); +#if OTP_SOCKET_LWIP +struct LWIPEvent +{ + void (*handler)(struct LWIPEvent *event); + union + { + struct + { + struct SocketResource *rsrc_obj; + struct tcp_pcb *newpcb; + } accept_make_resource; + struct + { + struct tcp_pcb *tpcb; + struct pbuf *buf; + err_t err; + } tcp_recv; + struct + { + struct SocketResource *rsrc_obj; + struct pbuf *buf; + const ip_addr_t *addr; + u16_t port; + } udp_recv; + struct + { + GlobalContext *global; + int32_t target_pid; + } trap_answer_ok; + struct + { + GlobalContext *global; + int32_t target_pid; + } trap_answer_closed; + struct + { + struct SocketResource *rsrc_obj; + } finalize_close; + }; +}; + +/** + * @brief Enqueue an event to be processed in task context + * @param event the event to enqueue + * @details This function must be implemented in otp_socket_platform.c. + * Platforms using lwIP implementation may have lwIP callbacks coming + * from ISR and in such case need to implement a queue mechanism to process + * them in task context. GlobalContext is not always available in lwIP callbacks + * so platforms using a queue need a global variable. + * If lwIP callbacks are not called from ISR, calling handler with the event is + * sufficient. + * @end + */ +void otp_socket_lwip_enqueue(struct LWIPEvent *event); + +#endif + #ifdef __cplusplus } #endif diff --git a/src/libAtomVM/posix_nifs.c b/src/libAtomVM/posix_nifs.c index 4e7e690425..07881e0ff0 100644 --- a/src/libAtomVM/posix_nifs.c +++ b/src/libAtomVM/posix_nifs.c @@ -112,6 +112,12 @@ term posix_errno_to_term(int err, GlobalContext *glb) return globalcontext_make_atom(glb, ATOM_STR("\x5", "esrch")); case EXDEV: return globalcontext_make_atom(glb, ATOM_STR("\x5", "exdev")); + case EPROTOTYPE: + return globalcontext_make_atom(glb, ATOM_STR("\x8", "eprototype")); + case ENOTCONN: + return globalcontext_make_atom(glb, ATOM_STR("\x8", "enotconn")); + case EOPNOTSUPP: + return globalcontext_make_atom(glb, ATOM_STR("\xA", "eopnotsupp")); } #else UNUSED(glb); diff --git a/src/libAtomVM/resources.c b/src/libAtomVM/resources.c index 51d94160d0..85beff94ff 100644 --- a/src/libAtomVM/resources.c +++ b/src/libAtomVM/resources.c @@ -184,10 +184,8 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, // Second read or second write overwrite ref & pid. if (ref == UNDEFINED_ATOM) { select_event->ref_ticks = 0; - select_event->undefined_ref = 1; } else { select_event->ref_ticks = term_to_ref_ticks(ref); - select_event->undefined_ref = 0; } select_event->local_pid = *pid; select_event->read = mode & ERL_NIF_SELECT_READ; @@ -203,21 +201,28 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, return 0; } -static void select_event_send_notification(struct SelectEvent *select_event, bool is_write, GlobalContext *global) +term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_write, Heap *heap) { - BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_REFC_BINARY_SIZE, heap) - term notification = term_alloc_tuple(4, &heap); + term notification = term_alloc_tuple(4, heap); term_put_tuple_element(notification, 0, SELECT_ATOM); - term_put_tuple_element(notification, 1, term_from_resource(select_event->resource->data, &heap)); - refc_binary_increment_refcount(select_event->resource); + term_put_tuple_element(notification, 1, term_from_resource(rsrc_obj, heap)); + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + refc_binary_increment_refcount(rsrc_refc); term ref; - if (select_event->undefined_ref) { + if (ref_ticks == 0) { ref = UNDEFINED_ATOM; } else { - ref = term_from_ref_ticks(select_event->ref_ticks, &heap); + ref = term_from_ref_ticks(ref_ticks, heap); } term_put_tuple_element(notification, 2, ref); term_put_tuple_element(notification, 3, is_write ? READY_OUTPUT_ATOM : READY_INPUT_ATOM); + return notification; +} + +static void select_event_send_notification(struct SelectEvent *select_event, bool is_write, GlobalContext *global) +{ + BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) + term notification = select_event_make_notification(select_event->resource->data, select_event->ref_ticks, is_write, &heap); globalcontext_send_message(global, select_event->local_pid, notification); if (is_write) { select_event->write = 0; diff --git a/src/libAtomVM/resources.h b/src/libAtomVM/resources.h index 78e7f90111..9295894a1d 100644 --- a/src/libAtomVM/resources.h +++ b/src/libAtomVM/resources.h @@ -25,6 +25,7 @@ #include "erl_nif.h" #include "list.h" +#include "memory.h" #ifdef __cplusplus extern "C" { @@ -66,7 +67,6 @@ struct SelectEvent struct RefcBinary *resource; bool read; bool write; - bool undefined_ref; bool close; int32_t local_pid; uint64_t ref_ticks; @@ -118,6 +118,18 @@ void select_event_count_and_destroy_closed(struct ListHead *select_events, size_ */ void destroy_resource_monitors(struct RefcBinary *resource, GlobalContext *global); +#define SELECT_EVENT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE) + +/** + * @brief Build a select event notification. + * @param rsrc_obj the resource to build the notification for + * @param ref_ticks the reference or 0 if it's undefined + * @param is_write if the notification is for a write or a read + * @param heap the heap to create the notification in, should have enough memory + * available (see SELECT_EVENT_NOTIFICATION_SIZE) + */ +term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_write, Heap *heap); + #ifdef __cplusplus } #endif diff --git a/src/platforms/esp32/components/avm_sys/CMakeLists.txt b/src/platforms/esp32/components/avm_sys/CMakeLists.txt index bf4596aa0a..bbfbc17691 100644 --- a/src/platforms/esp32/components/avm_sys/CMakeLists.txt +++ b/src/platforms/esp32/components/avm_sys/CMakeLists.txt @@ -60,9 +60,8 @@ list(TRANSFORM soc_include_dirs PREPEND ${soc_dir}/) list(TRANSFORM newlib_include_dirs PREPEND ${newlib_dir}/) list(TRANSFORM pthread_include_dirs PREPEND ${pthread_dir}/) -set(CMAKE_REQUIRED_INCLUDES ${soc_target_include_dir} ${soc_include_dirs} ${newlib_include_dirs} ${pthread_include_dirs} ${CMAKE_BINARY_DIR}/config/) +set(CMAKE_REQUIRED_INCLUDES ${CMAKE_REQUIRED_INCLUDES} ${soc_target_include_dir} ${soc_include_dirs} ${newlib_include_dirs} ${pthread_include_dirs} ${CMAKE_BINARY_DIR}/config/) -include(CheckIncludeFile) include(CheckSymbolExists) include(CheckCSourceCompiles) diff --git a/src/platforms/esp32/components/libatomvm/CMakeLists.txt b/src/platforms/esp32/components/libatomvm/CMakeLists.txt index 1e3ea93b48..a56df1ee13 100644 --- a/src/platforms/esp32/components/libatomvm/CMakeLists.txt +++ b/src/platforms/esp32/components/libatomvm/CMakeLists.txt @@ -19,9 +19,14 @@ # idf_component_register(INCLUDE_DIRS "${CMAKE_CURRENT_SOURCE_DIR}/../../../../libAtomVM") - add_subdirectory("${CMAKE_CURRENT_SOURCE_DIR}/../../../../libAtomVM" "libAtomVM") +# Force HAVE_SOCKET +# Automatically detecting it requires to put too many components include dirs +# in CMAKE_REQUIRED_INCLUDES as lwip includes freetos and many esp system components +set(HAVE_SOCKET ON) +target_compile_definitions(libAtomVM PUBLIC HAVE_SOCKET) + target_link_libraries(${COMPONENT_LIB} INTERFACE libAtomVM "-u platform_nifs_get_nif" "-u platform_defaultatoms_init") diff --git a/src/platforms/esp32/test/CMakeLists.txt b/src/platforms/esp32/test/CMakeLists.txt index 8ca0be1f6c..fa0c5cd9ac 100644 --- a/src/platforms/esp32/test/CMakeLists.txt +++ b/src/platforms/esp32/test/CMakeLists.txt @@ -32,6 +32,19 @@ set(TEST_COMPONENTS "testable" CACHE STRING "List of components to test") list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../../../../CMakeModules") include($ENV{IDF_PATH}/tools/cmake/project.cmake) + +# Don't try to link when testing availability of functions +set(CMAKE_TRY_COMPILE_TARGET_TYPE "STATIC_LIBRARY") + +# mkfifo is defined in newlib header but not implemented +set(HAVE_MKFIFO NO) + +# Disable SMP with esp32 socs that have only one core +if (${IDF_TARGET} MATCHES "esp32s2|esp32c3|esp32h2") + message("Disabling SMP as selected target only has one core") + set(AVM_DISABLE_SMP YES FORCE) +endif() + project(atomvm-esp32-test) # esp-idf does not use compile_feature but instead sets version in diff --git a/src/platforms/rp2040/src/lib/CMakeLists.txt b/src/platforms/rp2040/src/lib/CMakeLists.txt index 2789c6a710..8fa7b034ad 100644 --- a/src/platforms/rp2040/src/lib/CMakeLists.txt +++ b/src/platforms/rp2040/src/lib/CMakeLists.txt @@ -76,8 +76,16 @@ if (PICO_CYW43_SUPPORTED) target_include_directories(pan_lwip_dhserver INTERFACE ${BTSTACK_3RD_PARTY_PATH}/lwip/dhcp-server ) + target_compile_options(libAtomVM${PLATFORM_LIB_SUFFIX} PRIVATE -DHAVE_LWIP_RAW=1) + target_sources( + libAtomVM${PLATFORM_LIB_SUFFIX} + PRIVATE + otp_socket_platform.c + ../../../../libAtomVM/otp_socket.c + otp_socket_platform.h + ../../../../libAtomVM/otp_socket.h) target_link_libraries(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC pico_cyw43_arch_lwip_threadsafe_background pico_lwip_sntp INTERFACE pan_lwip_dhserver) - target_link_options(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC "SHELL:-Wl,-u -Wl,networkregister_port_driver") + target_link_options(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC "SHELL:-Wl,-u -Wl,networkregister_port_driver -Wl,-u -Wl,otp_socket_nif") endif() target_link_options(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC "SHELL:-Wl,-u -Wl,gpio_nif") diff --git a/src/platforms/rp2040/src/lib/networkdriver.c b/src/platforms/rp2040/src/lib/networkdriver.c index c0169d0b02..fd2172fae1 100644 --- a/src/platforms/rp2040/src/lib/networkdriver.c +++ b/src/platforms/rp2040/src/lib/networkdriver.c @@ -76,6 +76,25 @@ struct NetworkDriverData int stas_count; uint8_t *stas_mac; struct dhcp_config *dhcp_config; + queue_t queue; +}; + +enum NetworkDriverEventType +{ + NetworkDriverEventTypeCyw43Assoc, + NetworkDriverEventTypeSTADisconnected, + NetworkDriverEventTypeSTAConnected, + NetworkDriverEventTypeGotIP, +}; + +struct NetworkDriverEvent +{ + enum NetworkDriverEventType type; + // Union of parameters + union + { + struct netif *netif; + }; }; // Callbacks do not allow for user data @@ -85,6 +104,8 @@ static struct NetworkDriverData *driver_data; static void network_driver_netif_status_cb(struct netif *netif); static void network_driver_cyw43_assoc_cb(bool assoc); +static void network_driver_do_cyw43_assoc(GlobalContext *glb); + static term tuple_from_addr(Heap *heap, uint32_t addr) { term terms[4]; @@ -107,27 +128,27 @@ static void send_term(Heap *heap, term t) port_send_message(driver_data->global, term_from_local_process_id(driver_data->owner_process_id), msg); } -static void send_sta_connected() +static void send_sta_connected(GlobalContext *glb) { // {Ref, sta_connected} BEGIN_WITH_STACK_HEAP(PORT_REPLY_SIZE, heap); { - send_term(&heap, globalcontext_make_atom(driver_data->global, sta_connected_atom)); + send_term(&heap, globalcontext_make_atom(glb, sta_connected_atom)); } - END_WITH_STACK_HEAP(heap, driver_data->global); + END_WITH_STACK_HEAP(heap, glb); } -static void send_sta_disconnected() +static void send_sta_disconnected(GlobalContext *glb) { // {Ref, sta_disconnected} BEGIN_WITH_STACK_HEAP(PORT_REPLY_SIZE, heap); { - send_term(&heap, globalcontext_make_atom(driver_data->global, sta_disconnected_atom)); + send_term(&heap, globalcontext_make_atom(glb, sta_disconnected_atom)); } - END_WITH_STACK_HEAP(heap, driver_data->global); + END_WITH_STACK_HEAP(heap, glb); } -static void send_got_ip(struct netif *netif) +static void send_got_ip(struct netif *netif, GlobalContext *glb) { // {Ref, {sta_got_ip, {{192, 168, 1, 2}, {255, 255, 255, 0}, {192, 168, 1, 1}}}} BEGIN_WITH_STACK_HEAP(PORT_REPLY_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(3) + TUPLE_SIZE(4) * 3, heap); @@ -137,23 +158,23 @@ static void send_got_ip(struct netif *netif) term gw = tuple_from_addr(&heap, ntohl(ip4_addr_get_u32(netif_ip4_gw(netif)))); term ip_info = port_heap_create_tuple3(&heap, ip, netmask, gw); - term reply = port_heap_create_tuple2(&heap, globalcontext_make_atom(driver_data->global, sta_got_ip_atom), ip_info); + term reply = port_heap_create_tuple2(&heap, globalcontext_make_atom(glb, sta_got_ip_atom), ip_info); send_term(&heap, reply); } - END_WITH_STACK_HEAP(heap, driver_data->global); + END_WITH_STACK_HEAP(heap, glb); } -static void send_ap_started() +static void send_ap_started(GlobalContext *glb) { // {Ref, ap_started} BEGIN_WITH_STACK_HEAP(PORT_REPLY_SIZE, heap); { - send_term(&heap, globalcontext_make_atom(driver_data->global, ap_started_atom)); + send_term(&heap, globalcontext_make_atom(glb, ap_started_atom)); } - END_WITH_STACK_HEAP(heap, driver_data->global); + END_WITH_STACK_HEAP(heap, glb); } -static void send_atom_mac(term atom, uint8_t *mac) +static void send_atom_mac(term atom, uint8_t *mac, GlobalContext *glb) { // {Ref, {ap_connected | ap_disconnected, <<1,2,3,4,5,6>>}} BEGIN_WITH_STACK_HEAP(PORT_REPLY_SIZE + TUPLE_SIZE(2) + TERM_BINARY_HEAP_SIZE(6), heap); @@ -162,17 +183,17 @@ static void send_atom_mac(term atom, uint8_t *mac) term reply = port_heap_create_tuple2(&heap, atom, mac_term); send_term(&heap, reply); } - END_WITH_STACK_HEAP(heap, driver_data->global); + END_WITH_STACK_HEAP(heap, glb); } -static void send_ap_sta_connected(uint8_t *mac) +static void send_ap_sta_connected(uint8_t *mac, GlobalContext *glb) { - send_atom_mac(globalcontext_make_atom(driver_data->global, ap_sta_connected_atom), mac); + send_atom_mac(globalcontext_make_atom(glb, ap_sta_connected_atom), mac, glb); } -static void send_ap_sta_disconnected(uint8_t *mac) +static void send_ap_sta_disconnected(uint8_t *mac, GlobalContext *glb) { - send_atom_mac(globalcontext_make_atom(driver_data->global, ap_sta_disconnected_atom), mac); + send_atom_mac(globalcontext_make_atom(glb, ap_sta_disconnected_atom), mac, glb); } static void send_sntp_sync(struct timeval *tv) @@ -249,10 +270,8 @@ static char *get_default_device_name() return buf; } -static void network_driver_cyw43_assoc_cb(bool assoc) +static void network_driver_do_cyw43_assoc(GlobalContext *glb) { - UNUSED(assoc); - int max_stas; cyw43_wifi_ap_get_max_stas(&cyw43_state, &max_stas); uint8_t *new_macs = malloc(6 * max_stas); @@ -268,7 +287,7 @@ static void network_driver_cyw43_assoc_cb(bool assoc) } } if (new_mac) { - send_ap_sta_connected(&new_macs[6 * i]); + send_ap_sta_connected(&new_macs[6 * i], glb); } } // Determine old macs @@ -281,7 +300,7 @@ static void network_driver_cyw43_assoc_cb(bool assoc) } } if (old_mac) { - send_ap_sta_disconnected(&driver_data->stas_mac[6 * j]); + send_ap_sta_disconnected(&driver_data->stas_mac[6 * j], glb); } } free(driver_data->stas_mac); @@ -290,6 +309,15 @@ static void network_driver_cyw43_assoc_cb(bool assoc) driver_data->stas_count = nb_stas; } +static void network_driver_cyw43_assoc_cb(bool assoc) +{ + UNUSED(assoc); + + struct NetworkDriverEvent event; + event.type = NetworkDriverEventTypeCyw43Assoc; + sys_try_post_listener_event_from_isr(driver_data->global, &driver_data->queue, &event); +} + static term setup_dhcp_server() { int max_stas; @@ -383,7 +411,7 @@ static term start_ap(term ap_config, GlobalContext *global) uint32_t auth = (psk == NULL) ? CYW43_AUTH_OPEN : CYW43_AUTH_WPA2_AES_PSK; cyw43_state.assoc_cb = network_driver_cyw43_assoc_cb; cyw43_arch_enable_ap_mode(ssid, psk, auth); - send_ap_started(); + send_ap_started(global); free(ssid); free(psk); @@ -449,13 +477,58 @@ static void network_driver_netif_status_cb(struct netif *netif) cyw43_arch_lwip_end(); if (link_status != previous_link_status) { if (link_status == CYW43_LINK_DOWN) { - send_sta_disconnected(); + struct NetworkDriverEvent event; + event.type = NetworkDriverEventTypeSTADisconnected; + sys_try_post_listener_event_from_isr(driver_data->global, &driver_data->queue, &event); } else if (link_status == CYW43_LINK_JOIN) { - send_sta_connected(); + struct NetworkDriverEvent event; + event.type = NetworkDriverEventTypeSTAConnected; + sys_try_post_listener_event_from_isr(driver_data->global, &driver_data->queue, &event); } else if (link_status == CYW43_LINK_UP) { - send_got_ip(netif); + struct NetworkDriverEvent event; + event.type = NetworkDriverEventTypeGotIP; + event.netif = netif; + sys_try_post_listener_event_from_isr(driver_data->global, &driver_data->queue, &event); + } + } +} + +static EventListener *network_events_handler(GlobalContext *glb, EventListener *listener) +{ + struct NetworkDriverEvent event; + while (queue_try_remove(listener->queue, &event)) { + switch (event.type) { + case NetworkDriverEventTypeCyw43Assoc: + network_driver_do_cyw43_assoc(glb); + break; + case NetworkDriverEventTypeSTADisconnected: + send_sta_disconnected(glb); + break; + case NetworkDriverEventTypeSTAConnected: + send_sta_connected(glb); + break; + case NetworkDriverEventTypeGotIP: + send_got_ip(event.netif, glb); + break; } } + return listener; +} + +static void init_driver_data(GlobalContext *glb) +{ + driver_data = malloc(sizeof(struct NetworkDriverData)); + driver_data->sntp_hostname = NULL; + driver_data->stas_mac = NULL; + driver_data->dhcp_config = NULL; + driver_data->global = glb; + queue_init(&driver_data->queue, sizeof(struct NetworkDriverEvent), EVENT_QUEUE_LEN); + + EventListener *network_listener = malloc(sizeof(EventListener)); + + network_listener->handler = network_events_handler; + network_listener->queue = &driver_data->queue; + sys_register_listener(glb, network_listener); } static void start_network(Context *ctx, term pid, term ref, term config) @@ -467,12 +540,8 @@ static void start_network(Context *ctx, term pid, term ref, term config) } if (driver_data == NULL) { - driver_data = malloc(sizeof(struct NetworkDriverData)); - driver_data->sntp_hostname = NULL; - driver_data->stas_mac = NULL; - driver_data->dhcp_config = NULL; + init_driver_data(ctx->global); } - driver_data->global = ctx->global; driver_data->owner_process_id = term_to_local_process_id(pid); driver_data->ref_ticks = term_to_ref_ticks(ref); driver_data->link_status = CYW43_LINK_DOWN; @@ -593,8 +662,6 @@ void network_driver_init(GlobalContext *global) void network_driver_destroy(GlobalContext *global) { - UNUSED(global); - if (driver_data) { free(driver_data->sntp_hostname); free(driver_data->stas_mac); @@ -602,6 +669,8 @@ void network_driver_destroy(GlobalContext *global) dhserv_free(); } free(driver_data->dhcp_config); + sys_unregister_listener_from_event(global, &driver_data->queue); + queue_free(&driver_data->queue); } free(driver_data); driver_data = NULL; diff --git a/src/platforms/rp2040/src/lib/otp_socket_platform.c b/src/platforms/rp2040/src/lib/otp_socket_platform.c new file mode 100644 index 0000000000..f257a40312 --- /dev/null +++ b/src/platforms/rp2040/src/lib/otp_socket_platform.c @@ -0,0 +1,70 @@ +/* + * This file is part of AtomVM. + * + * Copyright 2023 by Fred Dushin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +#ifdef LIB_PICO_CYW43_ARCH + +#include +#include +#include + +struct OTPSocketNIFGlobalData +{ + GlobalContext *global; + queue_t queue; +}; + +static struct OTPSocketNIFGlobalData OTPSocketGlobalData; + +static EventListener *otp_socket_event_handler(GlobalContext *glb, EventListener *listener) +{ + UNUSED(glb); + + struct LWIPEvent event; + while (queue_try_remove(listener->queue, &event)) { + event.handler(&event); + } + return listener; +} + +static void otp_socket_nif_init(GlobalContext *global) +{ + OTPSocketGlobalData.global = global; + queue_init(&OTPSocketGlobalData.queue, sizeof(struct LWIPEvent), EVENT_QUEUE_LEN); + + EventListener *network_listener = malloc(sizeof(EventListener)); + + network_listener->handler = otp_socket_event_handler; + network_listener->queue = &OTPSocketGlobalData.queue; + sys_register_listener(global, network_listener); +} + +static void otp_socket_nif_destroy(GlobalContext *global) +{ + sys_unregister_listener_from_event(global, &OTPSocketGlobalData.queue); +} + +void otp_socket_lwip_enqueue(struct LWIPEvent *event) +{ + sys_try_post_listener_event_from_isr(OTPSocketGlobalData.global, &OTPSocketGlobalData.queue, event); +} + +REGISTER_NIF_COLLECTION(otp_socket, otp_socket_nif_init, otp_socket_nif_destroy, otp_socket_nif_get_nif) + +#endif diff --git a/src/platforms/rp2040/src/lib/otp_socket_platform.h b/src/platforms/rp2040/src/lib/otp_socket_platform.h new file mode 100644 index 0000000000..bb60e4535c --- /dev/null +++ b/src/platforms/rp2040/src/lib/otp_socket_platform.h @@ -0,0 +1,49 @@ +/* + * This file is part of AtomVM. + * + * Copyright 2023 by Fred Dushin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +#ifndef __OTP_SOCKET_PLATFORM_H__ +#define __OTP_SOCKET_PLATFORM_H__ + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" + +#include + +#pragma GCC diagnostic pop + +#define TAG "otp_socket" + +#define AVM_LOGI(tag, format, ...) \ + printf("I %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); + +#define AVM_LOGW(tag, format, ...) \ + printf("W %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); + +#define AVM_LOGE(tag, format, ...) \ + printf("E %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); + +// Define macros to lock/unlock lwIP +#define LWIP_BEGIN() cyw43_arch_lwip_begin() +#define LWIP_END() cyw43_arch_lwip_end() + +#endif diff --git a/src/platforms/rp2040/src/lib/rp2040_sys.h b/src/platforms/rp2040/src/lib/rp2040_sys.h index 728c0fd57f..58917ce723 100644 --- a/src/platforms/rp2040/src/lib/rp2040_sys.h +++ b/src/platforms/rp2040/src/lib/rp2040_sys.h @@ -23,7 +23,14 @@ #include +// Pico SDK +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" + #include +#include + +#pragma GCC diagnostic pop #include "sys.h" @@ -65,12 +72,46 @@ nif_collection_list = &NAME##_nif_collection_def_list_item; \ } +#define EVENT_QUEUE_LEN 16 + +typedef queue_t *listener_event_t; + +struct EventListener +{ + struct ListHead listeners_list_head; + event_handler_t handler; + listener_event_t queue; +}; + +/** + * @brief Post an event from ISR to trigger a listener call from task context. + * @param global the global context + * @param listener_queue the listener's queue (EventListener->queue) + * @param event the event to enqueue (copied) + * @returns true if successful, false if either queue is full + * @details This function should be called from ISR callbacks to postpone + * processing in task context (from sys_poll_events). If the system is overloaded + * with events, it prints a message to stderr and returns false. + * @end + */ +bool sys_try_post_listener_event_from_isr(GlobalContext *global, listener_event_t listener_queue, const void *event); + +/** + * @brief Unregister a listener using its queue. + * @param global the global context + * @param listener_queue the listener's queue (EventListener->queue) + * @details We can avoid storing the listeners in platform data. + * @end + */ +void sys_unregister_listener_from_event(GlobalContext *global, listener_event_t listener_queue); + struct RP2040PlatformData { #ifndef AVM_NO_SMP mutex_t event_poll_mutex; cond_t event_poll_cond; #endif + queue_t event_queue; }; typedef void (*port_driver_init_t)(GlobalContext *global); diff --git a/src/platforms/rp2040/src/lib/sys.c b/src/platforms/rp2040/src/lib/sys.c index 716766d9b1..842b513712 100644 --- a/src/platforms/rp2040/src/lib/sys.c +++ b/src/platforms/rp2040/src/lib/sys.c @@ -21,6 +21,7 @@ #include // C11 +#include #include // Pico SDK @@ -30,6 +31,7 @@ #include #include #include +#include #include #ifdef LIB_PICO_CYW43_ARCH @@ -38,6 +40,10 @@ #pragma GCC diagnostic pop +#ifdef LIB_PICO_CYW43_ARCH +#include +#endif + // libAtomVM #include #include @@ -45,6 +51,12 @@ #include "rp2040_sys.h" +// #define ENABLE_TRACE +#include "trace.h" + +// Platform uses listeners +#include "listeners.h" + struct PortDriverDefListItem *port_driver_list; struct NifCollectionDefListItem *nif_collection_list; @@ -57,9 +69,11 @@ void sys_init_platform(GlobalContext *glb) cond_init(&platform->event_poll_cond); smp_init(); #endif + queue_init(&platform->event_queue, sizeof(queue_t *), EVENT_QUEUE_LEN); #ifdef LIB_PICO_CYW43_ARCH cyw43_arch_init(); + otp_socket_init(glb); #endif } @@ -70,6 +84,7 @@ void sys_free_platform(GlobalContext *glb) #endif struct RP2040PlatformData *platform = glb->platform_data; + queue_free(&platform->event_queue); free(platform); #ifndef AVM_NO_SMP @@ -77,35 +92,54 @@ void sys_free_platform(GlobalContext *glb) #endif } +bool sys_try_post_listener_event_from_isr(GlobalContext *glb, listener_event_t listener_queue, const void *event) +{ + struct RP2040PlatformData *platform = glb->platform_data; + if (UNLIKELY(!queue_try_add(listener_queue, event))) { + fprintf(stderr, "Lost event from ISR as listener queue is full. System is overloaded or EVENT_QUEUE_LEN is too low\n"); + return false; + } + + if (UNLIKELY(!queue_try_add(&platform->event_queue, &listener_queue))) { + fprintf(stderr, "Lost event from ISR as global event queue is full. System is overloaded or EVENT_QUEUE_LEN is too low\n"); + return false; + } + #ifndef AVM_NO_SMP + sys_signal(glb); +#endif + + return true; +} + void sys_poll_events(GlobalContext *glb, int timeout_ms) { + struct RP2040PlatformData *platform = glb->platform_data; +#ifndef AVM_NO_SMP if (timeout_ms > 0) { - struct RP2040PlatformData *platform = glb->platform_data; mutex_enter_blocking(&platform->event_poll_mutex); cond_wait_timeout_ms(&platform->event_poll_cond, &platform->event_poll_mutex, timeout_ms); mutex_exit(&platform->event_poll_mutex); } +#endif + queue_t *event = NULL; + while (queue_try_remove(&platform->event_queue, &event)) { + struct ListHead *listeners = synclist_wrlock(&glb->listeners); + if (!process_listener_handler(glb, event, listeners, NULL, NULL)) { + TRACE("sys: handler not found for: %p\n", (void *) event); + } + synclist_unlock(&glb->listeners); + } } +#ifndef AVM_NO_SMP void sys_signal(GlobalContext *glb) { struct RP2040PlatformData *platform = glb->platform_data; cond_signal(&platform->event_poll_cond); } -#else -void sys_poll_events(GlobalContext *glb, int timeout_ms) -{ - UNUSED(glb); - UNUSED(timeout_ms); -} #endif -void sys_listener_destroy(struct ListHead *item) -{ - UNUSED(item); -} - void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) { UNUSED(global); @@ -254,3 +288,58 @@ const struct Nif *nif_collection_resolve_nif(const char *name) return NULL; } + +static void event_listener_add_to_polling_set(struct EventListener *listener, GlobalContext *glb) +{ + UNUSED(listener); + UNUSED(glb); +} + +static void listener_event_remove_from_polling_set(listener_event_t event, GlobalContext *glb) +{ + UNUSED(event); + UNUSED(glb); +} + +static bool event_listener_is_event(EventListener *listener, listener_event_t event) +{ + return listener->queue == event; +} + +void sys_register_listener(GlobalContext *global, struct EventListener *listener) +{ + struct ListHead *listeners = synclist_wrlock(&global->listeners); +#ifndef AVM_NO_SMP + sys_signal(global); +#endif + list_append(listeners, &listener->listeners_list_head); + synclist_unlock(&global->listeners); +} + +void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) +{ + struct ListHead *dummy = synclist_wrlock(&global->listeners); + UNUSED(dummy); +#ifndef AVM_NO_SMP + sys_signal(global); +#endif + list_remove(&listener->listeners_list_head); + synclist_unlock(&global->listeners); +} + +void sys_unregister_listener_from_event(GlobalContext *global, listener_event_t event) +{ + struct ListHead *list = synclist_wrlock(&global->listeners); +#ifndef AVM_NO_SMP + sys_signal(global); +#endif + struct ListHead *item; + LIST_FOR_EACH (item, list) { + struct EventListener *listener = GET_LIST_ENTRY(item, struct EventListener, listeners_list_head); + if (event_listener_is_event(listener, event)) { + list_remove(&listener->listeners_list_head); + break; + } + } + synclist_unlock(&global->listeners); +} diff --git a/tests/libs/estdlib/test_tcp_socket.erl b/tests/libs/estdlib/test_tcp_socket.erl index d6ed12a9b2..5f8d540a79 100644 --- a/tests/libs/estdlib/test_tcp_socket.erl +++ b/tests/libs/estdlib/test_tcp_socket.erl @@ -25,6 +25,7 @@ test() -> ok = test_echo_server(), ok = test_shutdown(), + ok = test_close_by_another_process(), case get_otp_version() of atomvm -> ok = test_abandon_select(); @@ -272,6 +273,26 @@ test_abandon_select() -> erlang:garbage_collect(), ok. +test_close_by_another_process() -> + % socket:recv is blocking and the only way to interrupt it is to close + % the socket. + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, ClientSocket1} = socket:open(inet, stream, tcp), + ok = try_connect(ClientSocket1, Port, 10), + + spawn_link(fun() -> + timer:sleep(500), + ok = socket:close(ClientSocket1) + end), + % recv is blocking + {error, closed} = socket:recv(ClientSocket1, 0, 5000), + + close_listen_socket(ListenSocket). + get_otp_version() -> case erlang:system_info(machine) of "BEAM" ->