From 2110f56f4a782afa2fcb908d6ace373ad9eefee1 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Sun, 15 Oct 2023 21:31:29 +0200 Subject: [PATCH] Add support for PicoW networking Port OTP Socket code to lwIP raw API Also add socket:recv/2,3 and socket:recvfrom/2,3 Signed-off-by: Paul Guyot --- examples/erlang/rp2040/CMakeLists.txt | 2 + examples/erlang/rp2040/picow_tcp_server.erl | 89 + examples/erlang/rp2040/picow_udp_beacon.erl | 58 + libs/estdlib/src/socket.erl | 84 +- src/libAtomVM/CMakeLists.txt | 2 + src/libAtomVM/otp_socket.c | 1955 ++++++++++++----- src/libAtomVM/otp_socket.h | 8 + src/libAtomVM/posix_nifs.c | 6 + src/libAtomVM/resources.c | 2 +- src/libAtomVM/term.h | 3 + src/platforms/rp2040/src/lib/CMakeLists.txt | 10 +- .../rp2040/src/lib/otp_socket_platform.c | 29 + .../rp2040/src/lib/otp_socket_platform.h | 52 + src/platforms/rp2040/src/lib/sys.c | 5 + 14 files changed, 1745 insertions(+), 560 deletions(-) create mode 100644 examples/erlang/rp2040/picow_tcp_server.erl create mode 100644 examples/erlang/rp2040/picow_udp_beacon.erl create mode 100644 src/platforms/rp2040/src/lib/otp_socket_platform.c create mode 100644 src/platforms/rp2040/src/lib/otp_socket_platform.h diff --git a/examples/erlang/rp2040/CMakeLists.txt b/examples/erlang/rp2040/CMakeLists.txt index 02bc018487..e1f0b3db0d 100644 --- a/examples/erlang/rp2040/CMakeLists.txt +++ b/examples/erlang/rp2040/CMakeLists.txt @@ -28,3 +28,5 @@ pack_uf2(pico_rtc pico_rtc) pack_uf2(picow_blink picow_blink) pack_uf2(picow_wifi_sta picow_wifi_sta) pack_uf2(picow_wifi_ap picow_wifi_ap) +pack_uf2(picow_udp_beacon picow_udp_beacon) +pack_uf2(picow_tcp_server picow_tcp_server) diff --git a/examples/erlang/rp2040/picow_tcp_server.erl b/examples/erlang/rp2040/picow_tcp_server.erl new file mode 100644 index 0000000000..053bf170c2 --- /dev/null +++ b/examples/erlang/rp2040/picow_tcp_server.erl @@ -0,0 +1,89 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(picow_tcp_server). + +-export([start/0]). + +-spec start() -> no_return(). +start() -> + Config = [ + {sta, [ + {ssid, <<"myssid">>}, + {psk, <<"mypsk">>}, + {got_ip, fun got_ip/1} + ]} + ], + case network:start(Config) of + {ok, _Pid} -> + timer:sleep(infinity); + Error -> + erlang:display(Error) + end. + +got_ip({IPv4, Netmask, Gateway}) -> + io:format("Got IP: ip=~p, netmask=~p, gateway=~p.\n", [IPv4, Netmask, Gateway]), + setup(). + +setup() -> + {ok, ListeningSocket} = socket:open(inet, stream, tcp), + + ok = socket:setopt(ListeningSocket, {socket, reuseaddr}, true), + ok = socket:setopt(ListeningSocket, {socket, linger}, #{onoff => true, linger => 0}), + + ok = socket:bind(ListeningSocket, #{family => inet, addr => any, port => 44404}), + ok = socket:listen(ListeningSocket), + io:format("Listening on ~p.~n", [socket:sockname(ListeningSocket)]), + + spawn(fun() -> accept(ListeningSocket) end). + +accept(ListeningSocket) -> + io:format("Waiting to accept connection...~n"), + case socket:accept(ListeningSocket) of + {ok, ConnectedSocket} -> + io:format("Accepted connection. local: ~p peer: ~p~n", [ + socket:sockname(ConnectedSocket), socket:peername(ConnectedSocket) + ]), + spawn(fun() -> accept(ListeningSocket) end), + echo(ConnectedSocket); + {error, Reason} -> + io:format("An error occurred accepting connection: ~p~n", [Reason]) + end. + +-spec echo(ConnectedSocket :: socket:socket()) -> no_return(). +echo(ConnectedSocket) -> + io:format("Waiting to receive data...~n"), + case socket:recv(ConnectedSocket) of + {ok, Data} -> + io:format("Received data ~p from ~p. Echoing back...~n", [ + Data, socket:peername(ConnectedSocket) + ]), + case socket:send(ConnectedSocket, Data) of + ok -> + io:format("All data was sent~n"); + {ok, Rest} -> + io:format("Some data was sent. Remaining: ~p~n", [Rest]); + {error, Reason} -> + io:format("An error occurred sending data: ~p~n", [Reason]) + end, + echo(ConnectedSocket); + {error, Reason} -> + io:format("An error occurred waiting on a connected socket: ~p~n", [Reason]) + end. diff --git a/examples/erlang/rp2040/picow_udp_beacon.erl b/examples/erlang/rp2040/picow_udp_beacon.erl new file mode 100644 index 0000000000..b555147af1 --- /dev/null +++ b/examples/erlang/rp2040/picow_udp_beacon.erl @@ -0,0 +1,58 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(picow_udp_beacon). + +-export([start/0]). + +start() -> + Self = self(), + Config = [ + {sta, [ + {ssid, <<"myssid">>}, + {psk, <<"mypsk">>}, + {got_ip, fun(IPInfo) -> got_ip(Self, IPInfo) end} + ]} + ], + case network:start(Config) of + {ok, _Pid} -> + loop(disconnected); + Error -> + erlang:display(Error) + end. + +got_ip(Parent, {IPv4, Netmask, Gateway}) -> + io:format("Got IP: ip=~p, netmask=~p, gateway=~p.\n", [IPv4, Netmask, Gateway]), + Parent ! connected. + +loop(disconnected) -> + receive + connected -> + {ok, UDPSocket} = socket:open(inet, dgram, udp), + loop({UDPSocket, 0}) + end; +loop({UDPSocket, N}) -> + Message = io_lib:format("AtomVM beacon #~B\n", [N]), + ok = socket:sendto(UDPSocket, Message, #{ + family => inet, port => 4444, addr => {255, 255, 255, 255} + }), + io:format("Sent beacon #~B\n", [N]), + timer:sleep(1000), + loop({UDPSocket, N + 1}). diff --git a/libs/estdlib/src/socket.erl b/libs/estdlib/src/socket.erl index e8741a5196..3daa2181f6 100644 --- a/libs/estdlib/src/socket.erl +++ b/libs/estdlib/src/socket.erl @@ -30,7 +30,11 @@ sockname/1, peername/1, recv/1, + recv/2, + recv/3, recvfrom/1, + recvfrom/2, + recvfrom/3, send/2, sendto/3, setopt/3, @@ -246,9 +250,7 @@ accept(Socket, Timeout) -> E; R -> R - end; - {closed, Ref} -> - {error, closed} + end after Timeout -> {error, timeout} end; @@ -257,30 +259,39 @@ accept(Socket, Timeout) -> end. %%----------------------------------------------------------------------------- -%% @equiv socket:recv(Socket, infinity) +%% @equiv socket:recv(Socket, 0) %% @end %%----------------------------------------------------------------------------- -spec recv(Socket :: socket()) -> {ok, Data :: binary()} | {error, Reason :: term()}. recv(Socket) -> - recv(Socket, infinity). + recv(Socket, 0, infinity). + +%%----------------------------------------------------------------------------- +%% @equiv socket:recv(Socket, Length, infinity) +%% @end +%%----------------------------------------------------------------------------- +-spec recv(Socket :: socket(), Length :: non_neg_integer()) -> + {ok, Data :: binary()} | {error, Reason :: term()}. +recv(Socket, Length) -> + recv(Socket, Length, infinity). %%----------------------------------------------------------------------------- %% @param Socket the socket +%% @param Length number of bytes to receive %% @param Timeout timeout (in milliseconds) %% @returns `{ok, Data}' if successful; `{error, Reason}', otherwise. %% @doc Receive data on the specified socket. %% -%% Note that this function will block until data is received -%% on the socket. +%% This function is equivalent to `recvfrom/3' except for the return type. %% %% Example: %% %% `{ok, Data} = socket:recv(ConnectedSocket)' %% @end %%----------------------------------------------------------------------------- --spec recv(Socket :: socket(), Timeout :: timeout()) -> +-spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> {ok, Data :: binary()} | {error, Reason :: term()}. -recv(Socket, Timeout) -> +recv(Socket, Length, Timeout) -> Self = self(), Ref = erlang:make_ref(), ?TRACE("select read for recv. self=~p ref=~p~n", [Self, Ref]), @@ -288,15 +299,15 @@ recv(Socket, Timeout) -> ok -> receive {select, _AcceptedSocket, Ref, ready_input} -> - case ?MODULE:nif_recv(Socket) of - {error, closed} = E -> + case ?MODULE:nif_recv(Socket, Length) of + {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - R -> - R - end; - {closed, Ref} -> - {error, closed} + % TODO: Assemble data to have more if Length > byte_size(Data) + % as long as timeout did not expire + {ok, Data} -> + {ok, Data} + end after Timeout -> {error, timeout} end; @@ -305,16 +316,26 @@ recv(Socket, Timeout) -> end. %%----------------------------------------------------------------------------- -%% @equiv socket:recvfrom(Socket, infinity) +%% @equiv socket:recvfrom(Socket, 0) %% @end %%----------------------------------------------------------------------------- -spec recvfrom(Socket :: socket()) -> {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. recvfrom(Socket) -> - recvfrom(Socket, infinity). + recvfrom(Socket, 0). + +%%----------------------------------------------------------------------------- +%% @equiv socket:recvfrom(Socket, Length, infinity) +%% @end +%%----------------------------------------------------------------------------- +-spec recvfrom(Socket :: socket(), Length :: non_neg_integer()) -> + {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. +recvfrom(Socket, Length) -> + recvfrom(Socket, Length, infinity). %%----------------------------------------------------------------------------- %% @param Socket the socket +%% @param Length number of bytes to receive %% @param Timeout timeout (in milliseconds) %% @returns `{ok, {Address, Data}}' if successful; `{error, Reason}', otherwise. %% @doc Receive data on the specified socket, returning the from address. @@ -325,11 +346,20 @@ recvfrom(Socket) -> %% Example: %% %% `{ok, {Address, Data}} = socket:recvfrom(ConnectedSocket)' +%% +%% If socket is UDP, the function retrieves the first available packet and +%% truncate it to Length bytes, unless Length is 0 in which case it returns +%% the whole packet ("all available"). +%% +%% If socket is TCP and Length is 0, this function retrieves all available +%% data without waiting (using peek if the platform allows it). +%% If socket is TCP and Length is not 0, this function waits until Length +%% bytes are available and return these bytes. %% @end %%----------------------------------------------------------------------------- --spec recvfrom(Socket :: socket(), Timeout :: timeout()) -> +-spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. -recvfrom(Socket, Timeout) -> +recvfrom(Socket, Length, Timeout) -> Self = self(), Ref = erlang:make_ref(), ?TRACE("select read for recvfrom. self=~p ref=~p", [Self, Ref]), @@ -337,15 +367,15 @@ recvfrom(Socket, Timeout) -> ok -> receive {select, _AcceptedSocket, Ref, ready_input} -> - case ?MODULE:nif_recvfrom(Socket) of - {error, closed} = E -> + case ?MODULE:nif_recvfrom(Socket, Length) of + {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - R -> - R - end; - {closed, Ref} -> - {error, closed} + % TODO: Assemble data to have more if Length > byte_size(Data) + % as long as timeout did not expire + {ok, {Address, Data}} -> + {ok, {Address, Data}} + end after Timeout -> {error, timeout} end; diff --git a/src/libAtomVM/CMakeLists.txt b/src/libAtomVM/CMakeLists.txt index d06e0dd2ef..a32823eb68 100644 --- a/src/libAtomVM/CMakeLists.txt +++ b/src/libAtomVM/CMakeLists.txt @@ -178,6 +178,8 @@ define_if_symbol_exists(libAtomVM O_SEARCH "fcntl.h" PRIVATE HAVE_O_SEARCH) define_if_symbol_exists(libAtomVM O_TTY_INIT "fcntl.h" PRIVATE HAVE_O_TTY_INIT) define_if_symbol_exists(libAtomVM clock_settime "time.h" PRIVATE HAVE_CLOCK_SETTIME) define_if_symbol_exists(libAtomVM settimeofday "sys/time.h" PRIVATE HAVE_SETTIMEOFDAY) +define_if_symbol_exists(libAtomVM socket "sys/socket.h" PUBLIC HAVE_SOCKET) +define_if_symbol_exists(libAtomVM select "sys/select.h" PUBLIC HAVE_SELECT) if (AVM_USE_32BIT_FLOAT) target_compile_definitions(libAtomVM PUBLIC AVM_USE_SINGLE_PRECISION) diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index bcd5139ec3..cc2b904e8c 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -36,7 +37,10 @@ #include #include +// We use errno to report errors with both BSD Sockets and LWIP #include + +#if OTP_SOCKET_BSD #include #include #include @@ -45,17 +49,123 @@ #if HAVE_SIGNAL #include #endif +#elif OTP_SOCKET_LWIP +#include +#include +#else +#error OTP Socket requires BSD Socket or lwIP +#endif // #define ENABLE_TRACE #include -struct SocketFd +// To factorize parsing of erlang term options, we define few constants for lwIP. +#if OTP_SOCKET_LWIP +enum ShutdownHow +{ + SHUT_RD, + SHUT_WR, + SHUT_RDWR, +}; + +enum SocketProtocol +{ + IPPROTO_IP, + IPPROTO_TCP, + IPPROTO_UDP +}; + +enum SocketDomain +{ + PF_INET +}; + +enum SocketType +{ + SOCK_STREAM, + SOCK_DGRAM +}; +#endif + +// Additional LWIP definitions +#if OTP_SOCKET_LWIP +enum SocketState +{ + // Bits + SocketStateClosed = 0, + SocketStateUDP = 1 << 1, + SocketStateTCP = 1 << 2, + SocketStateSelectingRead = 1 << 3, + SocketStateSelectedOneClient = 1 << 4, + SocketStateConnected = 1 << 5, + SocketStateListening = 1 << 6, + SocketStateTrapped = 1 << 7, + + // Actual states + SocketStateUDPIdle = SocketStateUDP, + SocketStateUDPSelectingRead = SocketStateUDP | SocketStateSelectingRead, + SocketStateTCPNew = SocketStateTCP, + SocketStateTCPConnected = SocketStateTCP | SocketStateConnected, + SocketStateTCPConnecting = SocketStateTCPConnected | SocketStateTrapped, + SocketStateTCPSelectingRead = SocketStateTCPConnected | SocketStateSelectingRead, + SocketStateTCPListening = SocketStateTCP | SocketStateListening, + SocketStateTCPAccepting = SocketStateTCPListening | SocketStateTrapped, + SocketStateTCPSelectingAccept = SocketStateTCPListening | SocketStateSelectingRead, + SocketStateTCPSelectingOneClient = SocketStateTCPListening | SocketStateSelectingRead | SocketStateSelectedOneClient, + SocketStateTCPSelectStopOneClient = SocketStateTCPListening | SocketStateSelectedOneClient, +}; + +struct TCPReceivedItem +{ + struct ListHead list_head; + struct pbuf *buf; + err_t err; +}; + +struct UDPReceivedItem +{ + struct ListHead list_head; + struct pbuf *buf; + const ip_addr_t *addr; + u16_t port; +}; + +static err_t tcp_recv_cb(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err); +static void udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *addr, u16_t port); + +#endif + +#if OTP_SOCKET_BSD +struct SocketResource { int fd; uint64_t ref_ticks; int32_t selecting_process_id; ErlNifMonitor selecting_process_monitor; }; +#elif OTP_SOCKET_LWIP +struct SocketResource +{ + enum SocketState socket_state; + union + { + struct tcp_pcb *tcp_pcb; + struct udp_pcb *udp_pcb; + }; + uint64_t ref_ticks; + int32_t selecting_process_id; // trapped or selecting + ErlNifMonitor selecting_process_monitor; + bool linger_on; + int linger_sec; + size_t pos; + union + { + struct ListHead tcp_received_list; + struct ListHead udp_received_list; + struct SocketResource *selected_client; + }; +}; +#endif static const char *const addr_atom = ATOM_STR("\x4", "addr"); static const char *const any_atom = ATOM_STR("\x3", "any"); @@ -127,7 +237,10 @@ static const AtomStringIntPair otp_socket_shutdown_direction_table[] = { }; #define DEFAULT_BUFFER_SIZE 512 + +#ifndef MIN #define MIN(A, B) (((A) < (B)) ? (A) : (B)) +#endif static ErlNifResourceType *socket_resource_type; @@ -139,28 +252,49 @@ static void socket_dtor(ErlNifEnv *caller_env, void *obj) { UNUSED(caller_env); - struct SocketFd *fd_obj = (struct SocketFd *) obj; - if (fd_obj->fd != CLOSED_FD) { - close(fd_obj->fd); - fd_obj->fd = CLOSED_FD; + struct SocketResource *rsrc_obj = (struct SocketResource *) obj; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd != CLOSED_FD) { + close(rsrc_obj->fd); + rsrc_obj->fd = CLOSED_FD; + } +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateUDP) { + udp_remove(rsrc_obj->udp_pcb); + rsrc_obj->udp_pcb = NULL; + } else if (rsrc_obj->socket_state & SocketStateTCP) { + // Try to nicely close the connection here + // NOTE: we can't handle linger if the socket is gone. + if (UNLIKELY(tcp_close(rsrc_obj->tcp_pcb) != ERR_OK)) { + // The resource will be gone, there is not much we can do here. + if (!(rsrc_obj->socket_state & SocketStateTCPListening)) { + tcp_abort(rsrc_obj->tcp_pcb); + } + } + rsrc_obj->tcp_pcb = NULL; } + LWIP_END(); +#endif } +#if OTP_SOCKET_BSD static void socket_stop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call) { UNUSED(caller_env); UNUSED(event); UNUSED(is_direct_call); - struct SocketFd *fd_obj = (struct SocketFd *) obj; + struct SocketResource *rsrc_obj = (struct SocketResource *) obj; - if (fd_obj->selecting_process_id != INVALID_PROCESS_ID) { - enif_demonitor_process(caller_env, fd_obj, &fd_obj->selecting_process_monitor); - fd_obj->selecting_process_id = INVALID_PROCESS_ID; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + enif_demonitor_process(caller_env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; } - TRACE("socket_stop called on fd=%i\n", fd_obj->fd); + TRACE("socket_stop called on fd=%i\n", rsrc_obj->fd); } +#endif static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon) { @@ -168,32 +302,105 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif UNUSED(pid); UNUSED(mon); - struct SocketFd *fd_obj = (struct SocketFd *) obj; + struct SocketResource *rsrc_obj = (struct SocketResource *) obj; - TRACE("socket_down called on process_id=%i fd=%i\n", *pid, fd_obj->fd); +#if OTP_SOCKET_BSD + TRACE("socket_down called on process_id=%i fd=%i\n", *pid, rsrc_obj->fd); +#else + TRACE("socket_down called on process_id=%i\n", *pid); +#endif - if (fd_obj->selecting_process_id != INVALID_PROCESS_ID) { +#if OTP_SOCKET_BSD + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // Monitor fired, so make sure we don't try to demonitor in select_stop // as it could crash trying to reacquire lock on process table - fd_obj->selecting_process_id = INVALID_PROCESS_ID; - enif_select(caller_env, fd_obj->fd, ERL_NIF_SELECT_STOP, fd_obj, NULL, term_nil()); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + enif_select(caller_env, rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()); } +#elif OTP_SOCKET_LWIP + // Monitor can be called when we're selecting, accepting or connecting. + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + LWIP_BEGIN(); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + if (rsrc_obj->socket_state & SocketStateTCP) { + if (rsrc_obj->socket_state & SocketStateTCPListening) { + (void) tcp_close(rsrc_obj->tcp_pcb); + } else { + tcp_abort(rsrc_obj->tcp_pcb); + } + rsrc_obj->tcp_pcb = NULL; + rsrc_obj->socket_state = SocketStateClosed; + } else if (rsrc_obj->socket_state & SocketStateUDP) { + udp_remove(rsrc_obj->udp_pcb); + rsrc_obj->udp_pcb = NULL; + rsrc_obj->socket_state = SocketStateClosed; + } + LWIP_END(); + } +#endif } static const ErlNifResourceTypeInit SocketResourceTypeInit = { .members = 3, .dtor = socket_dtor, +#if OTP_SOCKET_BSD .stop = socket_stop, +#else + .stop = NULL, +#endif .down = socket_down, }; +// select emulation for lwIP that doesn't have select. +#if OTP_SOCKET_LWIP +static term select_event_make_notification(struct SocketResource *rsrc_obj, Heap *heap) +{ + term notification = term_alloc_tuple(4, heap); + term_put_tuple_element(notification, 0, SELECT_ATOM); + term_put_tuple_element(notification, 1, term_from_resource(rsrc_obj, heap)); + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + refc_binary_increment_refcount(rsrc_refc); + term ref; + if (rsrc_obj->ref_ticks == 0) { + ref = UNDEFINED_ATOM; + } else { + ref = term_from_ref_ticks(rsrc_obj->ref_ticks, heap); + } + term_put_tuple_element(notification, 2, ref); + term_put_tuple_element(notification, 3, READY_INPUT_ATOM); + return notification; +} + +static void select_event_send_notification_from_nif(struct SocketResource *rsrc_obj, Context *locked_ctx) +{ + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE, heap) + term notification = select_event_make_notification(rsrc_obj, &heap); + mailbox_send(locked_ctx, notification); + END_WITH_STACK_HEAP(heap, locked_ctx->global) +} + +static void select_event_send_notification_from_cb(struct SocketResource *rsrc_obj, int32_t process_id) +{ + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + GlobalContext *global = rsrc_refc->resource_type->global; + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE, heap) + term notification = select_event_make_notification(rsrc_obj, &heap); +#if OTP_SOCKET_LWIP_ISR + globalcontext_send_message_from_isr(global, process_id, NormalMessage, notification); +#else + globalcontext_send_message(global, process_id, notification); +#endif + END_WITH_STACK_HEAP(heap, global) +} +#endif + // register the socket_fd resource type void otp_socket_init(GlobalContext *global) { ErlNifEnv env; erl_nif_env_partial_init_from_globalcontext(&env, global); socket_resource_type = enif_init_resource_type(&env, "socket_fd", &SocketResourceTypeInit, ERL_NIF_RT_CREATE, NULL); -#if HAVE_SIGNAL +#if OTP_SOCKET_BSD && HAVE_SIGNAL // ignore pipe signals on shutdown signal(SIGPIPE, SIG_IGN); #endif @@ -284,12 +491,28 @@ static inline int get_protocol(GlobalContext *global, term protocol_term, bool * static inline term make_error_tuple(term reason, Context *ctx) { + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } term error_tuple = term_alloc_tuple(2, &ctx->heap); term_put_tuple_element(error_tuple, 0, ERROR_ATOM); term_put_tuple_element(error_tuple, 1, reason); return error_tuple; } +#if OTP_SOCKET_BSD +static term make_errno_tuple(Context *ctx) +{ + return make_error_tuple(posix_errno_to_term(errno, ctx->global), ctx); +} +#elif OTP_SOCKET_LWIP +static term make_lwip_err_tuple(err_t err, Context *ctx) +{ + return make_error_tuple(term_from_int(err), ctx); +} +#endif + // // open // @@ -316,37 +539,66 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } - int fd = socket(domain, type, protocol); - if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { + struct SocketResource *rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); + if (IS_NULL_PTR(rsrc_obj)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } +#if OTP_SOCKET_BSD + rsrc_obj->fd = socket(domain, type, protocol); + if (UNLIKELY(rsrc_obj->fd == -1 || rsrc_obj->fd == CLOSED_FD)) { AVM_LOGE(TAG, "Failed to initialize socket."); - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - + enif_release_resource(rsrc_obj); + return make_errno_tuple(ctx); } else { - - struct SocketFd *fd_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketFd)); - - if (IS_NULL_PTR(fd_obj)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + TRACE("nif_socket_open: Created socket fd=%i\n", rsrc_obj->fd); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + +#elif OTP_SOCKET_LWIP + if (domain == PF_INET && type == SOCK_STREAM && protocol == IPPROTO_TCP) { + LWIP_BEGIN(); + rsrc_obj->tcp_pcb = tcp_new(); + LWIP_END(); + rsrc_obj->socket_state = SocketStateTCPNew; + } else if (domain == PF_INET && type == SOCK_DGRAM && protocol == IPPROTO_UDP) { + LWIP_BEGIN(); + rsrc_obj->udp_pcb = udp_new(); + LWIP_END(); + rsrc_obj->socket_state = SocketStateUDPIdle; + } else { + enif_release_resource(rsrc_obj); + RAISE_ERROR(BADARG_ATOM); + } + if (UNLIKELY(rsrc_obj->udp_pcb == NULL && rsrc_obj->tcp_pcb == NULL)) { + rsrc_obj->socket_state = SocketStateClosed; + enif_release_resource(rsrc_obj); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } else { + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + rsrc_obj->linger_on = false; + rsrc_obj->linger_sec = 0; + rsrc_obj->pos = 0; + if (rsrc_obj->socket_state & SocketStateTCP) { + list_init(&rsrc_obj->tcp_received_list); + LWIP_BEGIN(); + tcp_arg(rsrc_obj->tcp_pcb, rsrc_obj); + tcp_recv(rsrc_obj->tcp_pcb, tcp_recv_cb); + LWIP_END(); + } else { + list_init(&rsrc_obj->udp_received_list); + LWIP_BEGIN(); + udp_recv(rsrc_obj->udp_pcb, udp_recv_cb, rsrc_obj->udp_pcb); + LWIP_END(); } - - fd_obj->fd = fd; - fd_obj->selecting_process_id = INVALID_PROCESS_ID; - TRACE("nif_socket_open: Created socket fd=%i\n", fd_obj->fd); +#endif if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_RESOURCE_SIZE) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - term obj = enif_make_resource(erl_nif_env_from_context(ctx), fd_obj); - enif_release_resource(fd_obj); + term obj = enif_make_resource(erl_nif_env_from_context(ctx), rsrc_obj); + enif_release_resource(rsrc_obj); size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { @@ -396,17 +648,18 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(argv[0], term_is_socket); - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - if (fd_obj->selecting_process_id != INVALID_PROCESS_ID) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd) { + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // // If we are in select, then stop selecting // - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, ERL_NIF_SELECT_STOP, fd_obj, NULL, term_nil()) < 0)) { + if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) { RAISE_ERROR(BADARG_ATOM); } // @@ -414,7 +667,7 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) // the send a {closed, Ref} message to it, so that it can break // out of its receive statement. // - if (fd_obj->selecting_process_id != ctx->process_id) { + if (rsrc_obj->selecting_process_id != ctx->process_id) { // send a {closed, Ref | undefined} message to the pid if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + REF_SIZE) != MEMORY_GC_OK)) { @@ -424,28 +677,50 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) term error_tuple = term_alloc_tuple(2, &ctx->heap); term_put_tuple_element(error_tuple, 0, CLOSED_ATOM); - term ref = (fd_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(fd_obj->ref_ticks, &ctx->heap); + term ref = (rsrc_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->ref_ticks, &ctx->heap); term_put_tuple_element(error_tuple, 1, ref); - TRACE("nif_socket_close: Sending msg to process %i\n", fd_obj->selecting_process_id); - globalcontext_send_message(ctx->global, fd_obj->selecting_process_id, error_tuple); + TRACE("nif_socket_close: Sending msg to process %i\n", rsrc_obj->selecting_process_id); + globalcontext_send_message(ctx->global, rsrc_obj->selecting_process_id, error_tuple); } else { - AVM_LOGW(TAG, "Selectable socket %i was closed but no known pid is waiting. This shouldn't happen.", fd_obj->fd); + AVM_LOGW(TAG, "Selectable socket %i was closed but no known pid is waiting. This shouldn't happen.", rsrc_obj->fd); } } - int res = close(fd_obj->fd); + int res = close(rsrc_obj->fd); if (UNLIKELY(res != 0)) { AVM_LOGW(TAG, "Failed to close socket %i", res); } - TRACE("nif_socket_close: Clearing pid for socket fd=%i\n", fd_obj->fd); - fd_obj->fd = CLOSED_FD; - fd_obj->selecting_process_id = INVALID_PROCESS_ID; - fd_obj->ref_ticks = 0; + TRACE("nif_socket_close: Clearing pid for socket fd=%i\n", rsrc_obj->fd); + rsrc_obj->fd = CLOSED_FD; + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + rsrc_obj->ref_ticks = 0; + } else { + TRACE("Double close on socket fd %i", rsrc_obj->fd); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state & SocketStateTCP) { + // TODO: Maybe the socket was selecting, accepting or connecting + LWIP_BEGIN(); + err_t err = tcp_close(rsrc_obj->tcp_pcb); + LWIP_END(); + if (UNLIKELY(err != ERR_OK)) { + AVM_LOGW(TAG, "tcp_close failed with err=%d: %s:%i.\n", err, __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + rsrc_obj->tcp_pcb = NULL; + rsrc_obj->socket_state = SocketStateClosed; + } else if (rsrc_obj->socket_state & SocketStateUDP) { + LWIP_BEGIN(); + udp_remove(rsrc_obj->udp_pcb); + LWIP_END(); + rsrc_obj->udp_pcb = NULL; + rsrc_obj->socket_state = SocketStateClosed; } else { - TRACE("Double close on socket fd %i", fd_obj->fd); + TRACE("Double close on pcb", rsrc_obj); } +#endif return OK_ATOM; } @@ -454,9 +729,104 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) // select // -static term nif_socket_select(Context *ctx, term argv[], enum ErlNifSelectFlags mode) +#if OTP_SOCKET_LWIP +static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newpcb) +{ + struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); + conn_rsrc_obj->socket_state = SocketStateTCPConnected; + conn_rsrc_obj->tcp_pcb = newpcb; + conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + conn_rsrc_obj->pos = 0; + conn_rsrc_obj->linger_on = false; + conn_rsrc_obj->linger_sec = 0; + list_init(&conn_rsrc_obj->tcp_received_list); + + tcp_arg(newpcb, conn_rsrc_obj); + tcp_recv(newpcb, tcp_recv_cb); + return conn_rsrc_obj; +} + +static err_t tcp_selecting_accept_cb(void *arg, struct tcp_pcb *newpcb, err_t err) +{ + UNUSED(err); + + struct SocketResource *rsrc_obj = (struct SocketResource *) arg; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + if (newpcb != NULL) { + // Delay accepting of further clients + tcp_backlog_delayed(rsrc_obj->tcp_pcb); + // Immediatly configure the pcb, even if it's not accepted yet + struct SocketResource *conn_rsrc_obj = make_accepted_socket_resource(newpcb); + rsrc_obj->selected_client = conn_rsrc_obj; + rsrc_obj->socket_state = SocketStateTCPSelectingOneClient; + select_event_send_notification_from_cb(rsrc_obj, rsrc_obj->selecting_process_id); + } else { + // Ignore errors for now (we are still accepting). + } + return ERR_OK; + } else { + // Selecting process died + if (newpcb != NULL) { + if (rsrc_obj->socket_state & SocketStateTCPListening) { + (void) tcp_close(newpcb); + } else { + tcp_abort(newpcb); + } + } + return ERR_ABRT; + } +} + +static err_t tcp_recv_cb(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err) +{ + UNUSED(tpcb); + + struct SocketResource *rsrc_obj = (struct SocketResource *) arg; + + struct TCPReceivedItem *new_item = malloc(sizeof(struct TCPReceivedItem)); + list_append(&rsrc_obj->tcp_received_list, &new_item->list_head); + new_item->buf = p; + new_item->err = err; + + // Send notification if we are selecting. + if (rsrc_obj->socket_state & SocketStateSelectingRead) { + // Clear flag to avoid sending a message again. + rsrc_obj->socket_state &= ~SocketStateSelectingRead; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + select_event_send_notification_from_cb(rsrc_obj, rsrc_obj->selecting_process_id); + } // otherwise, selecting process died but we can just wait for monitor to handle it + } + return ERR_OK; +} + +static void udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, const ip_addr_t *addr, u16_t port) +{ + UNUSED(pcb); + + struct SocketResource *rsrc_obj = (struct SocketResource *) arg; + + struct UDPReceivedItem *new_item = malloc(sizeof(struct UDPReceivedItem)); + list_append(&rsrc_obj->tcp_received_list, &new_item->list_head); + new_item->buf = p; + new_item->addr = addr; + new_item->port = port; + + // Send notification if we are selecting. + if (rsrc_obj->socket_state & SocketStateSelectingRead) { + // Clear flag to avoid sending a message again. + rsrc_obj->socket_state &= ~SocketStateSelectingRead; + if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + select_event_send_notification_from_cb(rsrc_obj, rsrc_obj->selecting_process_id); + } // otherwise, selecting process died + } +} +#endif + +static term nif_socket_select_read(Context *ctx, int argc, term argv[]) { - TRACE("nif_socket_select\n"); + TRACE("nif_socket_select_read\n"); + + UNUSED(argc); VALIDATE_VALUE(argv[0], term_is_socket); @@ -469,45 +839,91 @@ static term nif_socket_select(Context *ctx, term argv[], enum ErlNifSelectFlags if (select_ref_term != UNDEFINED_ATOM) { VALIDATE_VALUE(select_ref_term, term_is_reference); } - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - TRACE("fd_obj->fd=%i\n", (int) fd_obj->fd); + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; ErlNifEnv *env = erl_nif_env_from_context(ctx); - if (fd_obj->selecting_process_id != process_pid && fd_obj->selecting_process_id != INVALID_PROCESS_ID) { - if (UNLIKELY(enif_demonitor_process(env, fd_obj, &fd_obj->selecting_process_monitor) != 0)) { - // RAISE_ERROR(posix_errno_to_term(EINVAL, global)); - } - fd_obj->selecting_process_id = INVALID_PROCESS_ID; + if (rsrc_obj->selecting_process_id != process_pid && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { + // demonitor can fail if process is gone. + enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; } // Monitor first as select is less likely to fail and it's less expensive to demonitor // if select fails than to stop select if monitor fails - if (fd_obj->selecting_process_id != process_pid) { - if (UNLIKELY(enif_monitor_process(env, fd_obj, &process_pid, &fd_obj->selecting_process_monitor) != 0)) { + if (rsrc_obj->selecting_process_id != process_pid) { + if (UNLIKELY(enif_monitor_process(env, rsrc_obj, &process_pid, &rsrc_obj->selecting_process_monitor) != 0)) { RAISE_ERROR(NOPROC_ATOM); } - fd_obj->selecting_process_id = process_pid; + rsrc_obj->selecting_process_id = process_pid; } - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, mode, fd_obj, &process_pid, select_ref_term) < 0)) { - enif_demonitor_process(env, fd_obj, &fd_obj->selecting_process_monitor); - fd_obj->selecting_process_id = INVALID_PROCESS_ID; + rsrc_obj->ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); + +#if OTP_SOCKET_BSD + TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); + + if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_READ, rsrc_obj, &process_pid, select_ref_term) < 0)) { + enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; RAISE_ERROR(BADARG_ATOM); } - TRACE("nif_socket_select: Setting pid for socket fd %i to %i\n", fd_obj->fd, process_pid); - fd_obj->ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); + TRACE("nif_socket_select: Setting pid for socket fd %i to %i\n", rsrc_obj->fd, process_pid); - return OK_ATOM; -} +#elif OTP_SOCKET_LWIP + GlobalContext *global = ctx->global; -static term nif_socket_select_read(Context *ctx, int argc, term argv[]) -{ - UNUSED(argc); - return nif_socket_select(ctx, argv, ERL_NIF_SELECT_READ); + LWIP_BEGIN(); + if (rsrc_obj->socket_state == SocketStateTCPListening) { + rsrc_obj->socket_state = SocketStateTCPSelectingAccept; + tcp_accept(rsrc_obj->tcp_pcb, tcp_selecting_accept_cb); + // No longer delay processing of incoming connections + tcp_backlog_accepted(rsrc_obj->tcp_pcb); + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingAccept) { + // noop + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingOneClient || rsrc_obj->socket_state == SocketStateTCPSelectStopOneClient) { + rsrc_obj->socket_state = SocketStateTCPSelectingOneClient; + // Resend notification + if (ctx->process_id == process_pid) { + select_event_send_notification_from_nif(rsrc_obj, ctx); + } else { + Context *target = globalcontext_get_process_lock(global, process_pid); + if (target) { + select_event_send_notification_from_nif(rsrc_obj, ctx); + globalcontext_get_process_unlock(global, target); + } + } + } else if (rsrc_obj->socket_state == SocketStateTCPConnected) { + if (!list_is_empty(&rsrc_obj->tcp_received_list)) { + // Send (or resend) notification + select_event_send_notification_from_nif(rsrc_obj, ctx); + } else { + // Set flag to send it when a package will arrive. + rsrc_obj->socket_state = SocketStateTCPSelectingRead; + } + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingRead) { + // noop + } else if (rsrc_obj->socket_state == SocketStateUDPIdle) { + if (!list_is_empty(&rsrc_obj->udp_received_list)) { + // Send (or resend) notification + select_event_send_notification_from_nif(rsrc_obj, ctx); + } else { + rsrc_obj->socket_state = SocketStateUDPSelectingRead; + } + } else if (rsrc_obj->socket_state == SocketStateUDPSelectingRead) { + // noop + } else { + enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + LWIP_END(); + RAISE_ERROR(BADARG_ATOM); + } + LWIP_END(); +#endif + + return OK_ATOM; } static term nif_socket_select_stop(Context *ctx, int argc, term argv[]) @@ -517,16 +933,30 @@ static term nif_socket_select_stop(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(argv[0], term_is_socket); - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, ERL_NIF_SELECT_STOP, fd_obj, NULL, term_nil()) < 0)) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) { RAISE_ERROR(BADARG_ATOM); } return OK_ATOM; +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (rsrc_obj->socket_state == SocketStateTCPSelectingAccept) { + rsrc_obj->socket_state = SocketStateTCPListening; + // Suspend processing of incoming connections + tcp_backlog_delayed(rsrc_obj->tcp_pcb); + } else if (rsrc_obj->socket_state == SocketStateTCPSelectingOneClient) { + rsrc_obj->socket_state = SocketStateTCPSelectStopOneClient; + } + LWIP_END(); + + return OK_ATOM; +#endif } // @@ -542,59 +972,78 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - term level_tuple = argv[1]; - term value = argv[2]; + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - term opt = term_get_tuple_element(level_tuple, 1); - if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { - int option_value = (value == TRUE_ATOM); - int res = setsockopt(fd_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); - if (UNLIKELY(res != 0)) { - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { +#endif + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + term level_tuple = argv[1]; + term value = argv[2]; + + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) { + int option_value = (value == TRUE_ATOM); +#if OTP_SOCKET_BSD + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); + } else { + return OK_ATOM; + } +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (option_value) { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); } else { - return OK_ATOM; + ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); } - } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { - term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); - term linger = interop_kv_get_value(value, linger_atom, ctx->global); - VALIDATE_VALUE(linger, term_is_integer); - - struct linger sl; - sl.l_onoff = (onoff == TRUE_ATOM); - sl.l_linger = term_to_int(linger); - int res = setsockopt(fd_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); - if (UNLIKELY(res != 0)) { - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); + } else { + if (rsrc_obj->socket_state & SocketStateTCP) { + ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR); } else { - return OK_ATOM; + ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR); } - // TODO add more as needed - // int flag = 1; - // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); - // if (UNLIKELY(res != 0)) { - // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); - // } + } + LWIP_END(); + return OK_ATOM; +#endif + } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { + term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); + term linger = interop_kv_get_value(value, linger_atom, ctx->global); + VALIDATE_VALUE(linger, term_is_integer); + +#if OTP_SOCKET_BSD + struct linger sl; + sl.l_onoff = (onoff == TRUE_ATOM); + sl.l_linger = term_to_int(linger); + int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); } else { - RAISE_ERROR(BADARG_ATOM); + return OK_ATOM; } +#elif OTP_SOCKET_LWIP + rsrc_obj->linger_on = (onoff == TRUE_ATOM); + rsrc_obj->linger_sec = term_to_int(linger); + return OK_ATOM; +#endif + // TODO add more as needed + // int flag = 1; + // int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); + // if (UNLIKELY(res != 0)) { + // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); + // } } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + RAISE_ERROR(BADARG_ATOM); } } @@ -611,43 +1060,58 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - int res = getsockname(fd_obj->fd, (struct sockaddr *) &addr, &addrlen); - - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to getsockname: fd=%i res=%i.", fd_obj->fd, res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { +#endif + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - } else { - // {ok, #{addr => {a,b,c,d}, port => integer()}} - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } else { - term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); - term port_number = term_from_int(ntohs(addr.sin_port)); +#if OTP_SOCKET_BSD + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int res = getsockname(rsrc_obj->fd, (struct sockaddr *) &addr, &addrlen); - term map = term_alloc_map(2, &ctx->heap); - term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); - term return_value = port_create_tuple2(ctx, OK_ATOM, map); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to getsockname: fd=%i res=%i.", rsrc_obj->fd, res); + return make_errno_tuple(ctx); + } + uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); + uint16_t port_u16 = ntohs(addr.sin_port); +#elif OTP_SOCKET_LWIP + uint32_t ip4_u32; + uint16_t port_u16; + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->local_ip)); + port_u16 = rsrc_obj->tcp_pcb->local_port; + } else { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->udp_pcb->local_ip)); + port_u16 = rsrc_obj->udp_pcb->local_port; + } + LWIP_END(); +#endif - return return_value; - } - } + // {ok, #{addr => {a,b,c,d}, port => integer()}} + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); } else { - RAISE_ERROR(posix_errno_to_term(EBADF, ctx->global)); + term address = socket_tuple_from_addr(ctx, ip4_u32); + term port_number = term_from_int(port_u16); + + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term return_value = port_create_tuple2(ctx, OK_ATOM, map); + + return return_value; } } @@ -664,43 +1128,60 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - int res = getpeername(fd_obj->fd, (struct sockaddr *) &addr, &addrlen); + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to getpeername: fd=%i res=%i.", fd_obj->fd, res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + // TODO: handle "connected" UDP sockets + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } + if ((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) { + return make_error_tuple(posix_errno_to_term(ENOTCONN, global), ctx); + } +#endif - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - } else { - // {ok, #{addr => {a,b,c,d}, port => integer()}} - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } else { - term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); - term port_number = term_from_int(ntohs(addr.sin_port)); +#if OTP_SOCKET_BSD + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int res = getpeername(rsrc_obj->fd, (struct sockaddr *) &addr, &addrlen); - term map = term_alloc_map(2, &ctx->heap); - term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); - term return_value = port_create_tuple2(ctx, OK_ATOM, map); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to getpeername: fd=%i res=%i.", rsrc_obj->fd, res); + return make_errno_tuple(ctx); + } + uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); + uint16_t port_u16 = ntohs(addr.sin_port); +#elif OTP_SOCKET_LWIP + // TODO: support peername for "connected" UDP sockets + uint32_t ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->remote_ip)); + uint16_t port_u16 = rsrc_obj->tcp_pcb->remote_port; +#endif - return return_value; - } - } + // {ok, #{addr => {a,b,c,d}, port => integer()}} + if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + term address = socket_tuple_from_addr(ctx, ip4_u32); + term port_number = term_from_int(port_u16); + + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term return_value = port_create_tuple2(ctx, OK_ATOM, map); + + return return_value; } } @@ -717,51 +1198,95 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - term sockaddr = argv[1]; + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#endif + + term sockaddr = argv[1]; - struct sockaddr_in serveraddr; - memset(&serveraddr, 0, sizeof(serveraddr)); - serveraddr.sin_family = AF_INET; +#if OTP_SOCKET_BSD + struct sockaddr_in serveraddr; + memset(&serveraddr, 0, sizeof(serveraddr)); + serveraddr.sin_family = AF_INET; +#elif OTP_SOCKET_LWIP + ip_addr_t ip_addr; +#endif + + uint16_t port_u16 = 0; - if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, any_atom)) { + if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, any_atom)) { +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); +#elif OTP_SOCKET_LWIP + ip_addr_set_any(false, &ip_addr); +#endif + } else if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, loopback_atom)) { +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); +#elif OTP_SOCKET_LWIP + ip_addr_set_loopback(false, &ip_addr); +#endif + } else if (term_is_map(sockaddr)) { + term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); + port_u16 = term_to_int(port); + term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); + if (globalcontext_is_term_equal_to_atom_string(global, addr, any_atom)) { +#if OTP_SOCKET_BSD serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); - } else if (globalcontext_is_term_equal_to_atom_string(global, sockaddr, loopback_atom)) { +#elif OTP_SOCKET_LWIP + ip_addr_set_any(false, &ip_addr); +#endif + } else if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { +#if OTP_SOCKET_BSD serveraddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else if (term_is_map(sockaddr)) { - term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); - serveraddr.sin_port = htons(term_to_int(port)); - term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); - if (globalcontext_is_term_equal_to_atom_string(global, addr, any_atom)) { - serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); - } else if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { - serveraddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - serveraddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); - } - } - - socklen_t address_len = sizeof(serveraddr); - int res = bind(fd_obj->fd, (struct sockaddr *) &serveraddr, address_len); - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - return make_error_tuple(posix_errno_to_term(errno, global), ctx); +#elif OTP_SOCKET_LWIP + ip_addr_set_loopback(false, &ip_addr); +#endif } else { - return OK_ATOM; +#if OTP_SOCKET_BSD + serveraddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); +#elif OTP_SOCKET_LWIP + ip_addr_set_ip4_u32(&ip_addr, htonl(socket_tuple_to_addr(addr))); +#endif } + } +#if OTP_SOCKET_BSD + serveraddr.sin_port = htons(port_u16); + socklen_t address_len = sizeof(serveraddr); + int res = bind(rsrc_obj->fd, (struct sockaddr *) &serveraddr, address_len); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); + return make_errno_tuple(ctx); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + return OK_ATOM; + } +#elif OTP_SOCKET_LWIP + err_t res; + if (rsrc_obj->socket_state & SocketStateTCP) { + res = tcp_bind(rsrc_obj->tcp_pcb, &ip_addr, port_u16); + } else { + res = udp_bind(rsrc_obj->udp_pcb, &ip_addr, port_u16); + } + if (UNLIKELY(res != ERR_OK)) { + AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); + return make_lwip_err_tuple(res, ctx); + } else { + return OK_ATOM; } +#endif } // @@ -778,34 +1303,77 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) VALIDATE_VALUE(argv[0], term_is_socket); VALIDATE_VALUE(argv[1], term_is_integer); - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - int backlog = term_to_int(argv[1]); - int res = listen(fd_obj->fd, backlog); - if (UNLIKELY(res != 0)) { - AVM_LOGE(TAG, "Unable to listen on socket: res=%i.", res); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + return make_error_tuple(posix_errno_to_term(EPROTOTYPE, global), ctx); + } +#endif - return make_error_tuple(posix_errno_to_term(errno, global), ctx); - } else { - return OK_ATOM; - } + int backlog = term_to_int(argv[1]); + +#if OTP_SOCKET_BSD + int res = listen(rsrc_obj->fd, backlog); + if (UNLIKELY(res != 0)) { + AVM_LOGE(TAG, "Unable to listen on socket: res=%i.", res); + return make_errno_tuple(ctx); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + return OK_ATOM; + } +#elif OTP_SOCKET_LWIP + uint8_t backlog_u8; + if (backlog > 255) { + // POSIX says: "Implementations may impose a limit on backlog and silently reduce the specified value" + backlog_u8 = 255; } + if (backlog < 0) { + // POSIX says: "If listen() is called with a backlog argument value that is less than 0, the function behaves as if it had been called with a backlog argument value of 0." + backlog_u8 = 0; + } else { + backlog_u8 = backlog; + } + err_t err; + struct tcp_pcb *new_pcb = tcp_listen_with_backlog_and_err(rsrc_obj->tcp_pcb, backlog_u8, &err); + if (new_pcb == NULL) { + return make_lwip_err_tuple(err, ctx); + } + // Do not accept until we select/accept + tcp_backlog_delayed(new_pcb); + rsrc_obj->tcp_pcb = new_pcb; + rsrc_obj->socket_state = SocketStateTCPListening; + return OK_ATOM; +#endif } // // accept // +#if OTP_SOCKET_LWIP +static term make_accepted_socket_term(struct SocketResource *conn_rsrc_obj, Heap *heap, GlobalContext *global) +{ + term obj = term_from_resource(conn_rsrc_obj, heap); + + term socket_term = term_alloc_tuple(2, heap); + uint64_t ref_ticks = globalcontext_get_ref_ticks(global); + term ref = term_from_ref_ticks(ref_ticks, heap); + term_put_tuple_element(socket_term, 0, obj); + term_put_tuple_element(socket_term, 1, ref); + return socket_term; +} +#endif + static term nif_socket_accept(Context *ctx, int argc, term argv[]) { TRACE("nif_socket_accept\n"); @@ -815,132 +1383,216 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - struct sockaddr_in clientaddr; - socklen_t clientlen = sizeof(clientaddr); - int fd = accept(fd_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); - if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { - AVM_LOGE(TAG, "Unable to accept on socket %i.", fd_obj->fd); - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - - int err = errno; - term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global); - return make_error_tuple(reason, ctx); + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state & SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } + // Only three states are allowed: + // - SocketStateTCPListening => block to accept (not used by otp_socket.erl) + // - SocketStateTCPSelectingOneClient => was selected, we have one in buffer + // - SocketStateTCPSelectStopOneClient => was selected, stop selecting, but we still have one in buffer + if (rsrc_obj->socket_state != SocketStateTCPListening && rsrc_obj->socket_state != SocketStateTCPSelectingOneClient && rsrc_obj->socket_state != SocketStateTCPSelectStopOneClient) { + return make_error_tuple(posix_errno_to_term(EINVAL, global), ctx); + } +#endif - } else { +#if OTP_SOCKET_BSD + struct sockaddr_in clientaddr; + socklen_t clientlen = sizeof(clientaddr); + int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); + if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { + AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd); + int err = errno; + term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global); + return make_error_tuple(reason, ctx); + } else { - struct SocketFd *conn_fd_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketFd)); - conn_fd_obj->fd = fd; - conn_fd_obj->selecting_process_id = INVALID_PROCESS_ID; - TRACE("nif_socket_accept: Created socket on accept fd=%i\n", fd_obj->fd); + struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); + conn_rsrc_obj->fd = fd; + conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd); - term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_fd_obj); - enif_release_resource(conn_fd_obj); + term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); + enif_release_resource(conn_rsrc_obj); - size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; - if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } - term socket_term = term_alloc_tuple(2, &ctx->heap); - uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); - term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); - term_put_tuple_element(socket_term, 0, obj); - term_put_tuple_element(socket_term, 1, ref); + term socket_term = term_alloc_tuple(2, &ctx->heap); + uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); + term_put_tuple_element(socket_term, 0, obj); + term_put_tuple_element(socket_term, 1, ref); - term result = term_alloc_tuple(2, &ctx->heap); - term_put_tuple_element(result, 0, OK_ATOM); - term_put_tuple_element(result, 1, socket_term); + term result = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(result, 0, OK_ATOM); + term_put_tuple_element(result, 1, socket_term); - return result; + return result; + } +#elif OTP_SOCKET_LWIP + term result; + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateSelectedOneClient) { + size_t requested_size = TERM_BOXED_RESOURCE_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; + if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + LWIP_END(); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + term socket_term = make_accepted_socket_term(rsrc_obj->selected_client, &ctx->heap, global); + rsrc_obj->selected_client = NULL; + if (rsrc_obj->socket_state == SocketStateTCPSelectStopOneClient) { + rsrc_obj->socket_state = SocketStateTCPListening; + } else { + rsrc_obj->socket_state = SocketStateTCPSelectingAccept; + // No longer delay processing of incoming connections + tcp_backlog_accepted(rsrc_obj->tcp_pcb); } + result = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(result, 0, OK_ATOM); + term_put_tuple_element(result, 1, socket_term); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + // return EAGAIN + return make_error_tuple(posix_errno_to_term(EAGAIN, ctx->global), ctx); } + LWIP_END(); + return result; +#endif } // // recv/recvfrom // -static term nif_socket_recv_with_peek(Context *ctx, int argc, term argv[], bool is_recvfrom) +#if OTP_SOCKET_BSD +static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_with_peek\n"); - UNUSED(argc); - - VALIDATE_VALUE(argv[0], term_is_socket); GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { - RAISE_ERROR(BADARG_ATOM); + int flags = MSG_WAITALL; + // TODO parameterize buffer size + ssize_t res = recvfrom(rsrc_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL); + TRACE("%li bytes available.\n", (long int) res); + if (res < 0) { + AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno); + return make_errno_tuple(ctx); + } else if (res == 0) { + TRACE("Peer closed socket %i.\n", rsrc_obj->fd); + return make_error_tuple(CLOSED_ATOM, ctx); + } else { + ssize_t buffer_size = len == 0 ? (ssize_t) res : MIN((size_t) res, len); + + // {ok, Data :: binary()}} + size_t ensure_packet_avail = term_binary_data_size_in_terms(buffer_size) + BINARY_HEADER_SIZE; + size_t requested_size = ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); + + if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + + term data = term_create_uninitialized_binary(buffer_size, &ctx->heap, global); + const char *buffer = term_binary_data(data); + + // + // receive data on the socket + // + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + res = recvfrom(rsrc_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); + + TRACE("otp_socket.recv_handler: received data on fd: %i available=%lu, read=%lu\n", rsrc_obj->fd, (unsigned long) res, (unsigned long) buffer_size); + + term payload = term_invalid_term(); + if (is_recvfrom) { + term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); + term port_number = term_from_int(ntohs(addr.sin_port)); + + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term tuple = port_heap_create_tuple2(&ctx->heap, map, data); + payload = port_heap_create_ok_tuple(&ctx->heap, tuple); + } else { + payload = port_heap_create_ok_tuple(&ctx->heap, data); + } + + return payload; } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - int flags = MSG_WAITALL; - // TODO parameterize buffer size - ssize_t res = recvfrom(fd_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL); - TRACE("%li bytes available.\n", (long int) res); - if (res < 0) { - AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", fd_obj->fd, errno); +} - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } +static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +{ + TRACE("nif_socket_recv_without_peek\n"); - return make_error_tuple(posix_errno_to_term(errno, global), ctx); + GlobalContext *global = ctx->global; - } else if (res == 0) { + // TODO plumb through buffer size + size_t buffer_size = len == 0 ? DEFAULT_BUFFER_SIZE : len; + char *buffer = malloc(buffer_size); + term payload = term_invalid_term(); - TRACE("Peer closed socket %i.\n", fd_obj->fd); + if (IS_NULL_PTR(buffer)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } else { + + int flags = 0; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + ssize_t res = recvfrom(rsrc_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); + + if (res < 0) { + + int err = errno; + term reason = (err == ECONNRESET) ? globalcontext_make_atom(global, ATOM_STR("\xA", "econnreset")) : posix_errno_to_term(err, global); + + if (err == ECONNRESET) { + AVM_LOGI(TAG, "Peer closed connection."); + } else { + AVM_LOGE(TAG, "Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno); } - return make_error_tuple(CLOSED_ATOM, ctx); + return make_error_tuple(reason, ctx); + } else if (res == 0) { + TRACE("Peer closed socket %i.\n", rsrc_obj->fd); + return make_error_tuple(CLOSED_ATOM, ctx); } else { size_t len = (size_t) res; - ssize_t buffer_size = MIN(len, DEFAULT_BUFFER_SIZE); + TRACE("otp_socket.recv_handler: received data on fd: %i len=%lu\n", rsrc_obj->fd, (unsigned long) len); - // {ok, Data :: binary()}} - size_t ensure_packet_avail = term_binary_data_size_in_terms(buffer_size) + BINARY_HEADER_SIZE; - size_t requested_size = ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); + size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; + size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - term data = term_create_uninitialized_binary(buffer_size, &ctx->heap, global); - const char *buffer = term_binary_data(data); + term data = term_from_literal_binary(buffer, len, &ctx->heap, global); - // - // receive data on the socket - // - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - res = recvfrom(fd_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); - - TRACE("otp_socket.recv_handler: received data on fd: %i len=%lu\n", fd_obj->fd, (unsigned long) len); - - term payload = term_invalid_term(); if (is_recvfrom) { term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); term port_number = term_from_int(ntohs(addr.sin_port)); @@ -953,137 +1605,221 @@ static term nif_socket_recv_with_peek(Context *ctx, int argc, term argv[], bool } else { payload = port_heap_create_ok_tuple(&ctx->heap, data); } - - return payload; } - } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + + free(buffer); + return payload; } } -static term nif_socket_recv_without_peek(Context *ctx, int argc, term argv[], bool is_recvfrom) +#elif OTP_SOCKET_LWIP +static size_t copy_pbuf_data(struct pbuf *src, size_t offset, size_t count, uint8_t *dst) { - TRACE("nif_socket_recv_without_peek\n"); - UNUSED(argc); - - VALIDATE_VALUE(argv[0], term_is_socket); + size_t copied = 0; + while (count > 0 && src != NULL) { + if (offset > src->len) { + offset -= src->len; + src = src->next; + continue; + } + size_t chunk_count = MIN(count, src->len - offset); + memcpy(dst, src->payload, chunk_count); + count -= chunk_count; + copied += chunk_count; + dst += chunk_count; + src = src->next; + } + return copied; +} +static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +{ GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { - RAISE_ERROR(BADARG_ATOM); + size_t buffer_size = 0; + bool closed = false; + err_t err = ERR_OK; + // Use lwIP lock + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + // TCP: we return up to len bytes or all available (if len == 0) + struct ListHead *item; + LIST_FOR_EACH (item, &rsrc_obj->tcp_received_list) { + struct TCPReceivedItem *received_item = GET_LIST_ENTRY(item, struct TCPReceivedItem, list_head); + if (received_item->buf == NULL || received_item->err != ERR_OK) { + closed = received_item->buf == NULL; + err = received_item->err; + break; + } else { + buffer_size += received_item->buf->tot_len; + if (len > 0 && buffer_size >= len) { + buffer_size = len; + break; + } + } + } + } else { + // UDP: we return the first message and truncate it to len if len != 0 + if (!list_is_empty(&rsrc_obj->udp_received_list)) { + struct UDPReceivedItem *first_item = CONTAINER_OF(list_first(&rsrc_obj->udp_received_list), struct UDPReceivedItem, list_head); + buffer_size = first_item->buf->tot_len; + if (len > 0 && buffer_size > len) { + buffer_size = len; + } + } } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - - // TODO plumb through buffer size - size_t buffer_size = DEFAULT_BUFFER_SIZE; - char *buffer = malloc(buffer_size); - term payload = term_invalid_term(); - - if (IS_NULL_PTR(buffer)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + LWIP_END(); - } else { - - int flags = 0; - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - ssize_t res = recvfrom(fd_obj->fd, (char *) buffer, buffer_size, flags, (struct sockaddr *) &addr, &addrlen); - - if (res < 0) { + // If we have no data, return EAGAIN or closed or the error. + if (buffer_size == 0) { + if (closed) { + return make_error_tuple(CLOSED_ATOM, ctx); + } + if (err != ERR_OK) { + return make_error_tuple(term_from_int(err), ctx); + } + return make_error_tuple(posix_errno_to_term(EAGAIN, global), ctx); + } - int err = errno; - term reason = (err == ECONNRESET) ? globalcontext_make_atom(global, ATOM_STR("\xA", "econnreset")) : posix_errno_to_term(err, global); + size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; + size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); + if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } - if (err == ECONNRESET) { - TRACE("Peer closed connection.\n"); + term data = term_create_uninitialized_binary(buffer_size, &ctx->heap, global); + uint8_t *ptr = (uint8_t *) term_binary_data(data); + size_t remaining = buffer_size; + + uint32_t ip4_u32; + uint16_t port_u16; + + // Use lwIP lock + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + size_t pos = rsrc_obj->pos; + struct ListHead *item; + struct ListHead *tmp; + MUTABLE_LIST_FOR_EACH (item, tmp, &rsrc_obj->tcp_received_list) { + struct TCPReceivedItem *received_item = GET_LIST_ENTRY(item, struct TCPReceivedItem, list_head); + if (received_item->buf == NULL || received_item->err != ERR_OK) { + break; + } + if (pos < received_item->buf->tot_len) { + size_t copied = copy_pbuf_data(received_item->buf, pos, remaining, ptr); + ptr += copied; + tcp_recved(rsrc_obj->tcp_pcb, copied); + if (copied + pos == received_item->buf->tot_len) { + // all data was copied. + list_remove(item); + pbuf_free(received_item->buf); + pos = 0; } else { - AVM_LOGE(TAG, "Unable to read data on socket %i. errno=%i", fd_obj->fd, errno); + pos = pos + copied; } - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - - return make_error_tuple(reason, ctx); - - } else if (res == 0) { - - TRACE("Peer closed socket %i.\n", fd_obj->fd); - - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + if (remaining == 0) { + break; } - - return make_error_tuple(CLOSED_ATOM, ctx); - } else { + pos -= received_item->buf->tot_len; + } + } + rsrc_obj->pos = pos; + if (is_recvfrom) { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->remote_ip)); + port_u16 = rsrc_obj->tcp_pcb->remote_port; + } + } else { + struct UDPReceivedItem *first_item = CONTAINER_OF(list_first(&rsrc_obj->udp_received_list), struct UDPReceivedItem, list_head); + copy_pbuf_data(first_item->buf, 0, remaining, ptr); + if (is_recvfrom) { + ip4_u32 = ntohl(ip_addr_get_ip4_u32(first_item->addr)); + port_u16 = first_item->port; + } + list_remove(&first_item->list_head); + pbuf_free(first_item->buf); + free(first_item); + } + LWIP_END(); - size_t len = (size_t) res; - TRACE("otp_socket.recv_handler: received data on fd: %i len=%lu\n", fd_obj->fd, (unsigned long) len); - - size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; - size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + term_map_size_in_terms(2)) : 0); - - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } - - term data = term_from_literal_binary(buffer, len, &ctx->heap, global); - - if (is_recvfrom) { - term address = socket_tuple_from_addr(ctx, ntohl(addr.sin_addr.s_addr)); - term port_number = term_from_int(ntohs(addr.sin_port)); + term payload; - term map = term_alloc_map(2, &ctx->heap); - term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); - term tuple = port_heap_create_tuple2(&ctx->heap, map, data); - payload = port_heap_create_ok_tuple(&ctx->heap, tuple); - } else { - payload = port_heap_create_ok_tuple(&ctx->heap, data); - } - } + if (is_recvfrom) { + term address = socket_tuple_from_addr(ctx, ip4_u32); + term port_number = term_from_int(port_u16); - free(buffer); - return payload; - } + term map = term_alloc_map(2, &ctx->heap); + term_set_map_assoc(map, 0, ADDR_ATOM, address); + term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term tuple = port_heap_create_tuple2(&ctx->heap, map, data); + payload = port_heap_create_ok_tuple(&ctx->heap, tuple); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + payload = port_heap_create_ok_tuple(&ctx->heap, data); } + + return payload; } +#endif -static term nif_socket_recv_internal(Context *ctx, int argc, term argv[], bool is_recvfrom) +static term nif_socket_recv_internal(Context *ctx, term argv[], bool is_recvfrom) { + VALIDATE_VALUE(argv[0], term_is_socket); + VALIDATE_VALUE(argv[1], term_is_integer); + avm_int_t len = term_to_int(argv[1]); + // We raise badarg but return error tuples for POSIX errors + if (UNLIKELY(len < 0)) { + RAISE_ERROR(BADARG_ATOM); + } + + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { + return make_error_tuple(posix_errno_to_term(EINVAL, ctx->global), ctx); + } + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state & SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); + } + if (rsrc_obj->socket_state & SocketStateListening) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, ctx->global), ctx); + } +#endif + +#if OTP_SOCKET_BSD if (otp_socket_platform_supports_peek()) { - return nif_socket_recv_with_peek(ctx, argc, argv, is_recvfrom); + return nif_socket_recv_with_peek(ctx, rsrc_obj, len, is_recvfrom); } else { - return nif_socket_recv_without_peek(ctx, argc, argv, is_recvfrom); + return nif_socket_recv_without_peek(ctx, rsrc_obj, len, is_recvfrom); } +#elif OTP_SOCKET_LWIP + return nif_socket_recv_lwip(ctx, rsrc_obj, len, is_recvfrom); +#endif } static term nif_socket_recv(Context *ctx, int argc, term argv[]) { + UNUSED(argc); + TRACE("nif_socket_recv\n"); - return nif_socket_recv_internal(ctx, argc, argv, false); + return nif_socket_recv_internal(ctx, argv, false); } static term nif_socket_recvfrom(Context *ctx, int argc, term argv[]) { + UNUSED(argc); + TRACE("nif_socket_recvfrom\n"); - return nif_socket_recv_internal(ctx, argc, argv, true); + return nif_socket_recv_internal(ctx, argv, true); } // // send/sendto // - static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool is_sendto) { TRACE("nif_socket_send_internal\n"); @@ -1094,79 +1830,143 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - term data = argv[1]; - term dest = term_invalid_term(); - if (is_sendto) { - dest = argv[2]; - } - - // TODO make non-blocking +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (rsrc_obj->socket_state & SocketStateListening) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } +#endif - const char *buf = term_binary_data(data); - size_t len = term_binary_size(data); + term data = argv[1]; + term dest = term_invalid_term(); + if (is_sendto) { + dest = argv[2]; + } - ssize_t sent_data = -1; + const char *buf = term_binary_data(data); + size_t len = term_binary_size(data); - if (is_sendto) { + ssize_t sent_data = -1; - struct sockaddr_in destaddr; - memset(&destaddr, 0, sizeof(destaddr)); - destaddr.sin_family = AF_INET; +#if OTP_SOCKET_BSD + // TODO make non-blocking - term port = interop_kv_get_value_default(dest, port_atom, term_from_int(0), global); - destaddr.sin_port = htons(term_to_int(port)); - term addr = interop_kv_get_value(dest, addr_atom, ctx->global); - if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { - destaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - destaddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); - } + if (is_sendto) { - sent_data = sendto(fd_obj->fd, buf, len, 0, (struct sockaddr *) &destaddr, sizeof(destaddr)); + struct sockaddr_in destaddr; + memset(&destaddr, 0, sizeof(destaddr)); + destaddr.sin_family = AF_INET; + term port = interop_kv_get_value_default(dest, port_atom, term_from_int(0), global); + destaddr.sin_port = htons(term_to_int(port)); + term addr = interop_kv_get_value(dest, addr_atom, ctx->global); + if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { + destaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); } else { - sent_data = send(fd_obj->fd, buf, len, 0); + destaddr.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); } - // {ok, RestData} | {error, Reason} + sent_data = sendto(rsrc_obj->fd, buf, len, 0, (struct sockaddr *) &destaddr, sizeof(destaddr)); - if (sent_data > 0) { + } else { + sent_data = send(rsrc_obj->fd, buf, len, 0); + } - // assert sent_data < len - size_t rest_len = len - sent_data; - if (rest_len == 0) { - return OK_ATOM; - } +#elif OTP_SOCKET_LWIP + err_t err; + ip_addr_t ip_addr; + uint16_t port_u16; + if (is_sendto) { - size_t requested_size = term_sub_binary_heap_size(data, rest_len); - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + requested_size) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); + term port_term = interop_kv_get_value_default(dest, port_atom, term_from_int(0), global); + avm_int_t port_number = term_to_int(port_term); + if (port_number < 0 || port_number > 65535) { + RAISE_ERROR(BADARG_ATOM); + } + port_u16 = (uint16_t) port_number; + term addr = interop_kv_get_value(dest, addr_atom, ctx->global); + if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { + ip_addr_set_loopback(false, &ip_addr); + } else { + ip_addr_set_ip4_u32(&ip_addr, htonl(socket_tuple_to_addr(addr))); + } + } + + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateUDP) { + struct pbuf *p = pbuf_alloc(PBUF_TRANSPORT, len, PBUF_RAM); + char *bytes = (char *) p->payload; + memcpy(bytes, buf, len); + if (is_sendto) { + err = udp_sendto(rsrc_obj->udp_pcb, p, &ip_addr, port_u16); + } else { + err = udp_send(rsrc_obj->udp_pcb, p); + } + if (err == ERR_OK) { + sent_data = len; + } + pbuf_free(p); + } else { + // If the socket is connection-mode, dest_addr shall be ignored. + // Because we are copying, we cannot really send tcp_sndbuf(rsrc_obj->tcp_pcb) at once + size_t buflen = MIN(len, TCP_MSS); + int flags = TCP_WRITE_FLAG_COPY; + if (buflen < len) { + flags |= TCP_WRITE_FLAG_MORE; + } + err = tcp_write(rsrc_obj->tcp_pcb, buf, buflen, flags); + if (err == ERR_MEM) { + sent_data = 0; + } else { + if (err == ERR_OK) { + err = tcp_output(rsrc_obj->tcp_pcb); + if (err == ERR_OK) { + sent_data = buflen; + } } + } + } + LWIP_END(); - term rest = term_maybe_create_sub_binary(data, sent_data, rest_len, &ctx->heap, ctx->global); - return port_create_tuple2(ctx, OK_ATOM, rest); +#endif + // {ok, RestData} | {error, Reason} - } else { + size_t rest_len = len - sent_data; + if (rest_len == 0) { + return OK_ATOM; + } else if (sent_data > 0) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + size_t requested_size = term_sub_binary_heap_size(data, rest_len); + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2) + requested_size, 1, &data, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } - TRACE("Unable to send data: res=%zi.\n", sent_data); - return make_error_tuple(CLOSED_ATOM, ctx); + term rest = term_maybe_create_sub_binary(data, sent_data, rest_len, &ctx->heap, ctx->global); + return port_create_tuple2(ctx, OK_ATOM, rest); + + } else if (sent_data == 0) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2), 1, &data, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); } + return port_create_tuple2(ctx, OK_ATOM, data); } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + AVM_LOGE(TAG, "Unable to send data: res=%zi.", sent_data); + return make_error_tuple(CLOSED_ATOM, ctx); } } @@ -1186,6 +1986,51 @@ static term nif_socket_sendto(Context *ctx, int argc, term argv[]) // connect // +#if OTP_SOCKET_LWIP +static err_t tcp_connected_cb(void *arg, struct tcp_pcb *tpcb, err_t err) +{ + UNUSED(tpcb); + + struct SocketResource *rsrc_obj = (struct SocketResource *) arg; + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + GlobalContext *global = rsrc_refc->resource_type->global; + int32_t target_pid = rsrc_obj->selecting_process_id; + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + if (target_pid != INVALID_PROCESS_ID) { + if (err == ERR_OK) { +#if OTP_SOCKET_LWIP_ISR + globalcontext_send_message_from_isr(global, target_pid, TrapAnswerSignal, OK_ATOM); +#else + Context *target_ctx = globalcontext_get_process_lock(global, target_pid); + if (target_ctx) { + mailbox_send_term_signal(target_ctx, TrapAnswerSignal, OK_ATOM); + globalcontext_get_process_unlock(global, target_ctx); + } +#endif + } else { + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(2), heap); + { + term result = term_alloc_tuple(2, &heap); + term_put_tuple_element(result, 0, ERROR_ATOM); + // TODO: We may want to interpret err + term_put_tuple_element(result, 1, CLOSED_ATOM); +#if OTP_SOCKET_LWIP_ISR + globalcontext_send_message_from_isr(global, target_pid, TrapAnswerSignal, result); +#else + Context *target_ctx = globalcontext_get_process_lock(global, target_pid); + if (target_ctx) { + mailbox_send_term_signal(target_ctx, TrapAnswerSignal, result); + globalcontext_get_process_unlock(global, target_ctx); + } +#endif + } + END_WITH_STACK_HEAP(heap, global); + } + } // else: sender died + return ERR_OK; +} +#endif + static term nif_socket_connect(Context *ctx, int argc, term argv[]) { TRACE("nif_socket_connect\n"); @@ -1196,59 +2041,93 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - term sockaddr = argv[1]; - struct sockaddr_in address; - memset(&address, 0, sizeof(struct sockaddr_in)); - address.sin_family = AF_INET; + term sockaddr = argv[1]; + term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); + term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); + if (term_is_invalid_term(addr)) { + RAISE_ERROR(BADARG_ATOM); + } + avm_int_t port_number = term_to_int(port); + if (port_number < 0 || port_number > 65535) { + RAISE_ERROR(BADARG_ATOM); + } - term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); - address.sin_port = htons(term_to_int(port)); +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } + if (((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) + || ((rsrc_obj->socket_state & SocketStateTCPConnected) == SocketStateTCPConnected)) { + return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); + } +#endif - term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); - if (term_is_invalid_term(addr)) { - RAISE_ERROR(BADARG_ATOM); - } - if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { - address.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - // TODO more validation on addr tuple - address.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); - } +#if OTP_SOCKET_BSD + struct sockaddr_in address; + memset(&address, 0, sizeof(struct sockaddr_in)); + address.sin_family = AF_INET; - socklen_t addr_len = sizeof(struct sockaddr_in); - int res = connect(fd_obj->fd, (const struct sockaddr *) &address, addr_len); - if (res == -1) { - if (errno == EINPROGRESS) { + address.sin_port = htons(port_number); - // TODO make connect non-blocking - return UNDEFINED_ATOM; + if (globalcontext_is_term_equal_to_atom_string(global, addr, loopback_atom)) { + address.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } else { + // TODO more validation on addr tuple + address.sin_addr.s_addr = htonl(socket_tuple_to_addr(addr)); + } - } else { - AVM_LOGE(TAG, "Unable to connect: res=%i errno=%i", res, errno); + socklen_t addr_len = sizeof(struct sockaddr_in); + int res = connect(rsrc_obj->fd, (const struct sockaddr *) &address, addr_len); + if (res == -1) { + if (errno == EINPROGRESS) { - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + // TODO make connect non-blocking + return UNDEFINED_ATOM; - return make_error_tuple(CLOSED_ATOM, ctx); - } - } else if (res == 0) { - return OK_ATOM; } else { - // won't happen according to connect(2) - return UNDEFINED_ATOM; + AVM_LOGE(TAG, "Unable to connect: res=%i errno=%i", res, errno); + return make_error_tuple(CLOSED_ATOM, ctx); } + } else if (res == 0) { + return OK_ATOM; } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + // won't happen according to connect(2) + return UNDEFINED_ATOM; + } +#elif OTP_SOCKET_LWIP + ip_addr_t ip_addr; + ip_addr_set_ip4_u32(&ip_addr, htonl(socket_tuple_to_addr(addr))); + err_t err; + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateUDP) { + err = udp_connect(rsrc_obj->udp_pcb, &ip_addr, port_number); + } else { + err = tcp_connect(rsrc_obj->tcp_pcb, &ip_addr, port_number, tcp_connected_cb); + } + LWIP_END(); + + if (UNLIKELY(err != ERR_OK)) { + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + if (rsrc_obj->socket_state & SocketStateUDP) { + return OK_ATOM; + } else { + rsrc_obj->selecting_process_id = ctx->process_id; + // Trap caller waiting for completion + context_update_flags(ctx, ~NoFlags, Trap); + return term_invalid_term(); } +#endif } // @@ -1265,49 +2144,63 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) GlobalContext *global = ctx->global; - void *fd_obj_ptr; - if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &fd_obj_ptr))) { + void *rsrc_obj_ptr; + if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), get_socket(argv[0]), socket_resource_type, &rsrc_obj_ptr))) { RAISE_ERROR(BADARG_ATOM); } - struct SocketFd *fd_obj = (struct SocketFd *) fd_obj_ptr; - if (fd_obj->fd) { - - int how; + struct SocketResource *rsrc_obj = (struct SocketResource *) rsrc_obj_ptr; - int val = interop_atom_term_select_int(otp_socket_shutdown_direction_table, argv[1], global); - switch (val) { + int how; + int val = interop_atom_term_select_int(otp_socket_shutdown_direction_table, argv[1], global); + switch (val) { - case OtpSocketReadShutdownDirection: - how = SHUT_RD; - break; + case OtpSocketReadShutdownDirection: + how = SHUT_RD; + break; - case OtpSocketWriteShutdownDirection: - how = SHUT_WR; - break; + case OtpSocketWriteShutdownDirection: + how = SHUT_WR; + break; - case OtpSocketReadWriteShutdownDirection: - how = SHUT_RDWR; - break; + case OtpSocketReadWriteShutdownDirection: + how = SHUT_RDWR; + break; - default: - RAISE_ERROR(BADARG_ATOM); - } + default: + RAISE_ERROR(BADARG_ATOM); + } - int res = shutdown(fd_obj->fd, how); - if (res < 0) { - AVM_LOGE(TAG, "Unable to shut down socket: res=%i errno=%i", res, errno); +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { +#endif + return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + } - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2)) != MEMORY_GC_OK)) { - AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); - RAISE_ERROR(OUT_OF_MEMORY_ATOM); - } + term result = OK_ATOM; - return make_error_tuple(posix_errno_to_term(errno, global), ctx); +#if OTP_SOCKET_BSD + int res = shutdown(rsrc_obj->fd, how); + if (res < 0) { + AVM_LOGE(TAG, "Unable to shut down socket: res=%i errno=%i", res, errno); + return make_errno_tuple(ctx); + } +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + err_t res = tcp_shutdown(rsrc_obj->tcp_pcb, how != SHUT_WR, how != SHUT_RD); + if (how == SHUT_RDWR && res == ERR_OK) { + rsrc_obj->tcp_pcb = NULL; + } + if (res != ERR_OK) { + AVM_LOGE(TAG, "Unable to shut down socket: res=%i", res); + result = make_lwip_err_tuple(res, ctx); } - return OK_ATOM; - } else { - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } + LWIP_END(); +#endif + return result; } // @@ -1424,11 +2317,11 @@ const struct Nif *otp_socket_nif_get_nif(const char *nifname) TRACE("Resolved platform nif %s ...\n", nifname); return &socket_accept_nif; } - if (strcmp("nif_recv/1", rest) == 0) { + if (strcmp("nif_recv/2", rest) == 0) { TRACE("Resolved platform nif %s ...\n", nifname); return &socket_recv_nif; } - if (strcmp("nif_recvfrom/1", rest) == 0) { + if (strcmp("nif_recvfrom/2", rest) == 0) { TRACE("Resolved platform nif %s ...\n", nifname); return &socket_recvfrom_nif; } diff --git a/src/libAtomVM/otp_socket.h b/src/libAtomVM/otp_socket.h index 417cb44fcb..18c020dbfe 100644 --- a/src/libAtomVM/otp_socket.h +++ b/src/libAtomVM/otp_socket.h @@ -28,6 +28,14 @@ extern "C" { #include #include +#if HAVE_SOCKET && HAVE_SELECT +#define OTP_SOCKET_BSD 1 +#elif HAVE_LWIP_RAW +#define OTP_SOCKET_LWIP 1 +#else +#error OTP Socket requires BSD Socket or lwIP +#endif + const struct Nif *otp_socket_nif_get_nif(const char *nifname); void otp_socket_init(GlobalContext *global); diff --git a/src/libAtomVM/posix_nifs.c b/src/libAtomVM/posix_nifs.c index 4e7e690425..07881e0ff0 100644 --- a/src/libAtomVM/posix_nifs.c +++ b/src/libAtomVM/posix_nifs.c @@ -112,6 +112,12 @@ term posix_errno_to_term(int err, GlobalContext *glb) return globalcontext_make_atom(glb, ATOM_STR("\x5", "esrch")); case EXDEV: return globalcontext_make_atom(glb, ATOM_STR("\x5", "exdev")); + case EPROTOTYPE: + return globalcontext_make_atom(glb, ATOM_STR("\x8", "eprototype")); + case ENOTCONN: + return globalcontext_make_atom(glb, ATOM_STR("\x8", "enotconn")); + case EOPNOTSUPP: + return globalcontext_make_atom(glb, ATOM_STR("\xA", "eopnotsupp")); } #else UNUSED(glb); diff --git a/src/libAtomVM/resources.c b/src/libAtomVM/resources.c index 51d94160d0..5dcbcd0dee 100644 --- a/src/libAtomVM/resources.c +++ b/src/libAtomVM/resources.c @@ -205,7 +205,7 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, static void select_event_send_notification(struct SelectEvent *select_event, bool is_write, GlobalContext *global) { - BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_REFC_BINARY_SIZE, heap) + BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(4) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE, heap) term notification = term_alloc_tuple(4, &heap); term_put_tuple_element(notification, 0, SELECT_ATOM); term_put_tuple_element(notification, 1, term_from_resource(select_event->resource->data, &heap)); diff --git a/src/libAtomVM/term.h b/src/libAtomVM/term.h index f4286ed168..af0613abd3 100644 --- a/src/libAtomVM/term.h +++ b/src/libAtomVM/term.h @@ -1040,6 +1040,9 @@ static inline term term_maybe_create_sub_binary(term binary, size_t offset, size { if (term_is_refc_binary(binary) && len >= SUB_BINARY_MIN) { return term_alloc_sub_binary(binary, offset, len, heap); + } else if (term_is_sub_binary(binary) && len >= SUB_BINARY_MIN) { + const term *boxed_value = term_to_const_term_ptr(binary); + return term_alloc_sub_binary(boxed_value[3], boxed_value[2] + offset, len, heap); } else { const char *data = term_binary_data(binary); return term_from_literal_binary(data + offset, len, heap, glb); diff --git a/src/platforms/rp2040/src/lib/CMakeLists.txt b/src/platforms/rp2040/src/lib/CMakeLists.txt index 2789c6a710..8fa7b034ad 100644 --- a/src/platforms/rp2040/src/lib/CMakeLists.txt +++ b/src/platforms/rp2040/src/lib/CMakeLists.txt @@ -76,8 +76,16 @@ if (PICO_CYW43_SUPPORTED) target_include_directories(pan_lwip_dhserver INTERFACE ${BTSTACK_3RD_PARTY_PATH}/lwip/dhcp-server ) + target_compile_options(libAtomVM${PLATFORM_LIB_SUFFIX} PRIVATE -DHAVE_LWIP_RAW=1) + target_sources( + libAtomVM${PLATFORM_LIB_SUFFIX} + PRIVATE + otp_socket_platform.c + ../../../../libAtomVM/otp_socket.c + otp_socket_platform.h + ../../../../libAtomVM/otp_socket.h) target_link_libraries(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC pico_cyw43_arch_lwip_threadsafe_background pico_lwip_sntp INTERFACE pan_lwip_dhserver) - target_link_options(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC "SHELL:-Wl,-u -Wl,networkregister_port_driver") + target_link_options(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC "SHELL:-Wl,-u -Wl,networkregister_port_driver -Wl,-u -Wl,otp_socket_nif") endif() target_link_options(libAtomVM${PLATFORM_LIB_SUFFIX} PUBLIC "SHELL:-Wl,-u -Wl,gpio_nif") diff --git a/src/platforms/rp2040/src/lib/otp_socket_platform.c b/src/platforms/rp2040/src/lib/otp_socket_platform.c new file mode 100644 index 0000000000..81cfc0ea26 --- /dev/null +++ b/src/platforms/rp2040/src/lib/otp_socket_platform.c @@ -0,0 +1,29 @@ +/* + * This file is part of AtomVM. + * + * Copyright 2023 by Fred Dushin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +#ifdef LIB_PICO_CYW43_ARCH + +#include +#include +#include + +REGISTER_NIF_COLLECTION(otp_socket, NULL, NULL, otp_socket_nif_get_nif) + +#endif diff --git a/src/platforms/rp2040/src/lib/otp_socket_platform.h b/src/platforms/rp2040/src/lib/otp_socket_platform.h new file mode 100644 index 0000000000..2f17a66ea6 --- /dev/null +++ b/src/platforms/rp2040/src/lib/otp_socket_platform.h @@ -0,0 +1,52 @@ +/* + * This file is part of AtomVM. + * + * Copyright 2023 by Fred Dushin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +#ifndef __OTP_SOCKET_PLATFORM_H__ +#define __OTP_SOCKET_PLATFORM_H__ + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" + +#include + +#pragma GCC diagnostic pop + +#define TAG "otp_socket" + +#define AVM_LOGI(tag, format, ...) \ + printf("I %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); + +#define AVM_LOGW(tag, format, ...) \ + printf("W %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); + +#define AVM_LOGE(tag, format, ...) \ + printf("E %s: " format " (%s:%i)\n", tag, ##__VA_ARGS__, __FILE__, __LINE__); + +// Define macros to lock/unlock lwIP +#define LWIP_BEGIN() cyw43_arch_lwip_begin() +#define LWIP_END() cyw43_arch_lwip_end() + +// lwIP is based on low-priority ISR +#define OTP_SOCKET_LWIP_ISR 1 + +#endif diff --git a/src/platforms/rp2040/src/lib/sys.c b/src/platforms/rp2040/src/lib/sys.c index 3d9ff5f48c..81496d63ab 100644 --- a/src/platforms/rp2040/src/lib/sys.c +++ b/src/platforms/rp2040/src/lib/sys.c @@ -38,6 +38,10 @@ #pragma GCC diagnostic pop +#ifdef LIB_PICO_CYW43_ARCH +#include +#endif + // libAtomVM #include #include @@ -62,6 +66,7 @@ void sys_init_platform(GlobalContext *glb) #ifdef LIB_PICO_CYW43_ARCH cyw43_arch_init(); + otp_socket_init(glb); #endif }