diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b04dc9f86..eb33358693 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added a limited implementation of the OTP `ets` interface - Added `code:all_loaded/0` and `code:all_available/0` - Added `erlang:split_binary/2` +- Added support for socket asynchronous API for `recv` and `recvfrom`. ## [0.6.6] - Unreleased diff --git a/libs/estdlib/src/gen_tcp_socket.erl b/libs/estdlib/src/gen_tcp_socket.erl index 6d808c417d..86079a42e4 100644 --- a/libs/estdlib/src/gen_tcp_socket.erl +++ b/libs/estdlib/src/gen_tcp_socket.erl @@ -339,10 +339,7 @@ handle_cast(_Request, State) -> {noreply, State}. %% @hidden -handle_info({select, _Socket, Ref, ready_input}, State) -> - ?LOG_DEBUG("handle_info [~p], ~p]", [ - {select, _Socket, Ref, ready_input}, State - ]), +handle_info({'$socket', _Socket, select, Ref}, State) -> %% TODO cancel timer case maps:get(Ref, State#state.pending_selects, undefined) of undefined -> @@ -366,6 +363,28 @@ handle_info({select, _Socket, Ref, ready_input}, State) -> pending_selects = maps:remove(Ref, State#state.pending_selects) }} end; +handle_info({'$socket', Socket, abort, {Ref, closed}}, State) -> + %% TODO cancel timer + case maps:get(Ref, State#state.pending_selects, undefined) of + undefined -> + ?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]), + socket:nif_select_stop(Socket), + {noreply, State}; + {accept, From, _AcceptingProc, _Timeout} -> + socket:nif_select_stop(Socket), + gen_server:reply(From, {error, closed}), + {noreply, State}; + active -> + WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE}, + State#state.controlling_process ! {tcp_closed, WrappedSocket}, + {noreply, State}; + {passive, From, _Length, _Timeout} -> + socket:nif_select_stop(Socket), + gen_server:reply(From, {error, closed}), + {noreply, State#state{ + pending_selects = maps:remove(Ref, State#state.pending_selects) + }} + end; handle_info({timeout, Ref, From}, State) -> ?LOG_DEBUG("handle_info [~p], ~p]", [ {timeout, Ref, From}, State diff --git a/libs/estdlib/src/gen_udp_socket.erl b/libs/estdlib/src/gen_udp_socket.erl index 56aa27fe5e..c3a561e53e 100644 --- a/libs/estdlib/src/gen_udp_socket.erl +++ b/libs/estdlib/src/gen_udp_socket.erl @@ -242,7 +242,7 @@ handle_cast(_Request, State) -> {noreply, State}. %% @hidden -handle_info({select, _Socket, Ref, ready_input}, State) -> +handle_info({'$socket', _Socket, select, Ref}, State) -> case maps:get(Ref, State#state.pending_selects, undefined) of undefined -> ?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]), diff --git a/libs/estdlib/src/socket.erl b/libs/estdlib/src/socket.erl index c7f95c8079..0de6087f96 100644 --- a/libs/estdlib/src/socket.erl +++ b/libs/estdlib/src/socket.erl @@ -38,6 +38,7 @@ send/2, sendto/3, setopt/3, + getopt/2, connect/2, shutdown/2 ]). @@ -66,7 +67,9 @@ -type in_addr() :: {0..255, 0..255, 0..255, 0..255}. -type port_number() :: 0..65535. --type socket_option() :: {socket, reuseaddr} | {socket, linger}. +-type socket_option() :: + {socket, reuseaddr | linger | type} + | {otp, recvbuf}. -export_type([ socket/0, @@ -242,7 +245,7 @@ accept(Socket, Timeout) -> case ?MODULE:nif_select_read(Socket, Ref) of ok -> receive - {select, _AcceptedSocket, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> case ?MODULE:nif_accept(Socket) of {error, closed} = E -> ?MODULE:nif_select_stop(Socket), @@ -250,14 +253,15 @@ accept(Socket, Timeout) -> R -> R end; - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> % socket was closed by another process % TODO: we need to handle: % (a) SELECT_STOP being scheduled - % (b) flush of messages as we can have both - % {closed, Ref} and {select, _, Ref, _} in the + % (b) flush of messages as we can have both in the % queue - {error, closed} + {error, closed}; + Other -> + {error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}} after Timeout -> {error, timeout} end; @@ -296,25 +300,60 @@ recv(Socket, Length) -> %% `{ok, Data} = socket:recv(ConnectedSocket)' %% @end %%----------------------------------------------------------------------------- --spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> - {ok, Data :: binary()} | {error, Reason :: term()}. +-spec recv( + Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference() +) -> + {ok, Data :: binary()} + | {select, {select_info, recvfrom, reference()}} + | {select, {{select_info, recvfrom, reference()}, Data :: binary()}} + | {error, Reason :: term()}. +recv(Socket, Length, 0) -> + recv0_noselect(Socket, Length); +recv(Socket, 0, Timeout) when is_integer(Timeout) orelse Timeout =:= infinity -> + recv0(Socket, 0, Timeout); +recv(Socket, Length, nowait) -> + recv0_nowait(Socket, Length, erlang:make_ref()); +recv(Socket, Length, Ref) when is_reference(Ref) -> + recv0_nowait(Socket, Length, Ref); recv(Socket, Length, Timeout) -> + case ?MODULE:getopt(Socket, {socket, type}) of + {ok, stream} when Timeout =/= infinity -> + recv0_r(Socket, Length, Timeout, erlang:system_time(millisecond) + Timeout, []); + {ok, stream} when Timeout =:= infinity -> + recv0_r(Socket, Length, Timeout, undefined, []); + _ -> + recv0(Socket, Length, Timeout) + end. + +recv0_noselect(Socket, Length) -> + case ?MODULE:nif_recv(Socket, Length) of + {error, _} = E -> + E; + {ok, Data} when Length =:= 0 orelse byte_size(Data) =:= Length -> + {ok, Data}; + {ok, Data} -> + case ?MODULE:getopt(Socket, {socket, type}) of + {ok, stream} -> + {error, {timeout, Data}}; + {ok, dgram} -> + {ok, Data} + end + end. + +recv0(Socket, Length, Timeout) -> Ref = erlang:make_ref(), - ?TRACE("select read for recv. self=~p ref=~p~n", [self(), Ref]), case ?MODULE:nif_select_read(Socket, Ref) of ok -> receive - {select, _AcceptedSocket, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> case ?MODULE:nif_recv(Socket, Length) of {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - % TODO: Assemble data to have more if Length > byte_size(Data) - % as long as timeout did not expire {ok, Data} -> {ok, Data} end; - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> % socket was closed by another process % TODO: see above in accept/2 {error, closed} @@ -325,6 +364,72 @@ recv(Socket, Length, Timeout) -> Error end. +recv0_nowait(Socket, Length, Ref) -> + case ?MODULE:nif_recv(Socket, Length) of + {error, timeout} -> + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + {select, {select_info, recv, Ref}}; + {error, _} = Error1 -> + Error1 + end; + {error, _} = E -> + E; + {ok, Data} when byte_size(Data) < Length -> + case ?MODULE:getopt(Socket, {socket, type}) of + {ok, stream} -> + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + {select, {{select_info, recv, Ref}, Data}}; + {error, _} = Error1 -> + Error1 + end; + {ok, dgram} -> + {ok, Data} + end; + {ok, Data} -> + {ok, Data} + end. + +recv0_r(Socket, Length, Timeout, EndQuery, Acc) -> + Ref = erlang:make_ref(), + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + receive + {'$socket', Socket, select, Ref} -> + case ?MODULE:nif_recv(Socket, Length) of + {error, _} = E -> + ?MODULE:nif_select_stop(Socket), + E; + {ok, Data} -> + NewAcc = [Data | Acc], + Remaining = Length - byte_size(Data), + case Remaining of + 0 -> + {ok, list_to_binary(lists:reverse(NewAcc))}; + _ -> + NewTimeout = + case Timeout of + infinity -> infinity; + _ -> EndQuery - erlang:system_time(millisecond) + end, + recv0_r(Socket, Remaining, NewTimeout, EndQuery, NewAcc) + end + end; + {'$socket', Socket, abort, {Ref, closed}} -> + % socket was closed by another process + % TODO: see above in accept/2 + {error, closed} + after Timeout -> + case Acc of + [] -> {error, timeout}; + _ -> {error, {timeout, list_to_binary(lists:reverse(Acc))}} + end + end; + {error, _Reason} = Error -> + Error + end. + %%----------------------------------------------------------------------------- %% @equiv socket:recvfrom(Socket, 0) %% @end @@ -367,25 +472,43 @@ recvfrom(Socket, Length) -> %% bytes are available and return these bytes. %% @end %%----------------------------------------------------------------------------- --spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) -> - {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}. +-spec recvfrom( + Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference() +) -> + {ok, {Address :: sockaddr(), Data :: binary()}} + | {select, {select_info, recvfrom, reference()}} + | {error, Reason :: term()}. +recvfrom(Socket, Length, 0) -> + recvfrom0_noselect(Socket, Length); +recvfrom(Socket, Length, nowait) -> + recvfrom0_nowait(Socket, Length, erlang:make_ref()); +recvfrom(Socket, Length, Ref) when is_reference(Ref) -> + recvfrom0_nowait(Socket, Length, Ref); recvfrom(Socket, Length, Timeout) -> + recvfrom0(Socket, Length, Timeout). + +recvfrom0_noselect(Socket, Length) -> + case ?MODULE:nif_recvfrom(Socket, Length) of + {error, _} = E -> + E; + {ok, {_Address, _Data}} = Reply -> + Reply + end. + +recvfrom0(Socket, Length, Timeout) -> Ref = erlang:make_ref(), - ?TRACE("select read for recvfrom. self=~p ref=~p", [self(), Ref]), case ?MODULE:nif_select_read(Socket, Ref) of ok -> receive - {select, _AcceptedSocket, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> case ?MODULE:nif_recvfrom(Socket, Length) of {error, _} = E -> ?MODULE:nif_select_stop(Socket), E; - % TODO: Assemble data to have more if Length > byte_size(Data) - % as long as timeout did not expire - {ok, {Address, Data}} -> - {ok, {Address, Data}} + {ok, {_Address, _Data}} = Reply -> + Reply end; - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> % socket was closed by another process % TODO: see above in accept/2 {error, closed} @@ -396,6 +519,21 @@ recvfrom(Socket, Length, Timeout) -> Error end. +recvfrom0_nowait(Socket, Length, Ref) -> + case ?MODULE:nif_recvfrom(Socket, Length) of + {error, timeout} -> + case ?MODULE:nif_select_read(Socket, Ref) of + ok -> + {select, {select_info, recvfrom, Ref}}; + {error, _} = SelectError -> + SelectError + end; + {error, _} = RecvError -> + RecvError; + {ok, {_Address, _Data}} = Reply -> + Reply + end. + %%----------------------------------------------------------------------------- %% @param Socket the socket %% @param Data the data to send @@ -443,11 +581,32 @@ sendto(Socket, Data, Dest) when is_binary(Data) -> sendto(Socket, Data, Dest) -> ?MODULE:nif_sendto(Socket, erlang:iolist_to_binary(Data), Dest). +%%----------------------------------------------------------------------------- +%% @param Socket the socket +%% @param SocketOption the option +%% @returns `{ok, Value}' if successful; `{error, Reason}', otherwise. +%% @doc Get a socket option. +%% +%% Currently, the following options are supported: +%% +%% +%%
`{socket, type}'`type()'
+%% +%% Example: +%% +%% `{ok, stream} = socket:getopt(ListeningSocket, {socket, type})' +%% @end +%%----------------------------------------------------------------------------- +-spec getopt(Socket :: socket(), SocketOption :: socket_option()) -> + {ok, Value :: term()} | {error, Reason :: term()}. +getopt(_Socket, _SocketOption) -> + erlang:nif_error(undefined). + %%----------------------------------------------------------------------------- %% @param Socket the socket %% @param SocketOption the option %% @param Value the option value -%% @returns `{ok, Address}' if successful; `{error, Reason}', otherwise. +%% @returns `ok' if successful; `{error, Reason}', otherwise. %% @doc Set a socket option. %% %% Set an option on a socket. @@ -456,6 +615,7 @@ sendto(Socket, Data, Dest) -> %% %% %% +%% %%
`{socket, reuseaddr}'`boolean()'
`{socket, linger}'`#{onoff => boolean(), linger => non_neg_integer()}'
`{otp, recvbuf}'`non_neg_integer()'
%% %% Example: @@ -465,7 +625,7 @@ sendto(Socket, Data, Dest) -> %% @end %%----------------------------------------------------------------------------- -spec setopt(Socket :: socket(), SocketOption :: socket_option(), Value :: term()) -> - ok | {error, Reason :: term()}. + ok | {error, any()}. setopt(_Socket, _SocketOption, _Value) -> erlang:nif_error(undefined). diff --git a/libs/estdlib/src/ssl.erl b/libs/estdlib/src/ssl.erl index 95919765d1..46385b0662 100644 --- a/libs/estdlib/src/ssl.erl +++ b/libs/estdlib/src/ssl.erl @@ -189,9 +189,9 @@ handshake_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> handshake_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> ok = socket:close(Socket), {error, closed} end; @@ -242,9 +242,9 @@ close_notify_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> close_notify_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> ok = socket:close(Socket), {error, closed} end; @@ -274,9 +274,9 @@ send({SSLContext, Socket} = SSLSocket, Binary) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> send(SSLSocket, Binary); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> @@ -309,9 +309,9 @@ recv0({SSLContext, Socket} = SSLSocket, Length, Remaining, Acc) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> recv0(SSLSocket, Length, Remaining, Acc); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> diff --git a/src/libAtomVM/defaultatoms.c b/src/libAtomVM/defaultatoms.c index 228dd79fc6..4f71b36c43 100644 --- a/src/libAtomVM/defaultatoms.c +++ b/src/libAtomVM/defaultatoms.c @@ -165,6 +165,13 @@ static const char *const global_atom = "\x6" "global"; static const char *const nonode_at_nohost_atom = "\xD" "nonode@nohost"; static const char *const net_kernel_atom = "\xA" "net_kernel"; +static const char *const dollar_socket_atom = "\x7" "$socket"; +static const char *const abort_atom = "\x5" "abort"; +static const char *const family_atom = "\x6" "family"; +static const char *const inet_atom = "\x4" "inet"; +static const char *const timeout_atom = "\x7" "timeout"; + + void defaultatoms_init(GlobalContext *glb) { int ok = 1; @@ -314,6 +321,12 @@ void defaultatoms_init(GlobalContext *glb) ok &= globalcontext_insert_atom(glb, nonode_at_nohost_atom) == NONODE_AT_NOHOST_ATOM_INDEX; ok &= globalcontext_insert_atom(glb, net_kernel_atom) == NET_KERNEL_ATOM_INDEX; + ok &= globalcontext_insert_atom(glb, dollar_socket_atom) == DOLLAR_SOCKET_ATOM_INDEX; + ok &= globalcontext_insert_atom(glb, abort_atom) == ABORT_ATOM_INDEX; + ok &= globalcontext_insert_atom(glb, family_atom) == FAMILY_ATOM_INDEX; + ok &= globalcontext_insert_atom(glb, inet_atom) == INET_ATOM_INDEX; + ok &= globalcontext_insert_atom(glb, timeout_atom) == TIMEOUT_ATOM_INDEX; + if (!ok) { AVM_ABORT(); } diff --git a/src/libAtomVM/defaultatoms.h b/src/libAtomVM/defaultatoms.h index 75b3ac9a58..f5ebbc5517 100644 --- a/src/libAtomVM/defaultatoms.h +++ b/src/libAtomVM/defaultatoms.h @@ -174,7 +174,13 @@ extern "C" { #define NONODE_AT_NOHOST_ATOM_INDEX 112 #define NET_KERNEL_ATOM_INDEX 113 -#define PLATFORM_ATOMS_BASE_INDEX 114 +#define DOLLAR_SOCKET_ATOM_INDEX 114 +#define ABORT_ATOM_INDEX 115 +#define FAMILY_ATOM_INDEX 116 +#define INET_ATOM_INDEX 117 +#define TIMEOUT_ATOM_INDEX 118 + +#define PLATFORM_ATOMS_BASE_INDEX 119 #define FALSE_ATOM TERM_FROM_ATOM_INDEX(FALSE_ATOM_INDEX) #define TRUE_ATOM TERM_FROM_ATOM_INDEX(TRUE_ATOM_INDEX) @@ -323,6 +329,12 @@ extern "C" { #define NONODE_AT_NOHOST_ATOM TERM_FROM_ATOM_INDEX(NONODE_AT_NOHOST_ATOM_INDEX) #define NET_KERNEL_ATOM TERM_FROM_ATOM_INDEX(NET_KERNEL_ATOM_INDEX) +#define DOLLAR_SOCKET_ATOM TERM_FROM_ATOM_INDEX(DOLLAR_SOCKET_ATOM_INDEX) +#define ABORT_ATOM TERM_FROM_ATOM_INDEX(ABORT_ATOM_INDEX) +#define FAMILY_ATOM TERM_FROM_ATOM_INDEX(FAMILY_ATOM_INDEX) +#define INET_ATOM TERM_FROM_ATOM_INDEX(INET_ATOM_INDEX) +#define TIMEOUT_ATOM TERM_FROM_ATOM_INDEX(TIMEOUT_ATOM_INDEX) + void defaultatoms_init(GlobalContext *glb); void platform_defaultatoms_init(GlobalContext *glb); diff --git a/src/libAtomVM/erl_nif.h b/src/libAtomVM/erl_nif.h index 23d3f256a3..b13114d777 100644 --- a/src/libAtomVM/erl_nif.h +++ b/src/libAtomVM/erl_nif.h @@ -215,6 +215,32 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj); */ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref); +/** + * @brief Variant of `enif_select` where sent message is `msg` instead of default. + * + * @param env current environment + * @param event event object (typically a file descriptor) + * @param obj resource object working as a container of the event object. + * @param pid process id to send a message to or NULL to use the current process (from `env`) + * @param msg message to send (copied). + * @param msg_env must be NULL. + * @return a negative value on failure, 0 or flags on success. + */ +int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env); + +/** + * @brief Variant of `enif_select` where sent message is `msg` instead of default. + * + * @param env current environment + * @param event event object (typically a file descriptor) + * @param obj resource object working as a container of the event object. + * @param pid process id to send a message to or NULL to use the current process (from `env`) + * @param msg message to send (copied). + * @param msg_env must be NULL. + * @return a negative value on failure, 0 or flags on success. + */ +int enif_select_write(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env); + /** * @brief Monitor a process by using a resource object. * @details The monitor is automatically removed after being triggered or if the diff --git a/src/libAtomVM/globalcontext.c b/src/libAtomVM/globalcontext.c index f100018283..b27eb127a0 100644 --- a/src/libAtomVM/globalcontext.c +++ b/src/libAtomVM/globalcontext.c @@ -334,6 +334,18 @@ bool globalcontext_process_exists(GlobalContext *glb, int32_t process_id) return p != NULL; } +enum SendMessageResult globalcontext_post_message(GlobalContext *glb, int32_t process_id, Message *m) +{ + Context *p = globalcontext_get_process_lock(glb, process_id); + enum SendMessageResult result = SEND_MESSAGE_PROCESS_NOT_FOUND; + if (p) { + mailbox_post_message(p, &m->base); + globalcontext_get_process_unlock(glb, p); + result = SEND_MESSAGE_OK; + } + return result; +} + void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t) { Context *p = globalcontext_get_process_lock(glb, process_id); @@ -352,15 +364,17 @@ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, t } #ifdef AVM_TASK_DRIVER_ENABLED -void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t) +static inline enum SendMessageResult globalcontext_send_message_from_task_common(GlobalContext *glb, int32_t process_id, MailboxMessage *message, enum MessageType type, term t) { - MailboxMessage *message = NULL; + enum SendMessageResult result = SEND_MESSAGE_PROCESS_NOT_FOUND; bool postponed = false; #ifndef AVM_NO_SMP Context *p = NULL; if (globalcontext_get_process_trylock(glb, process_id, &p)) { if (p) { - message = mailbox_message_create_from_term(type, t); + if (message == NULL) { + message = mailbox_message_create_from_term(type, t); + } // Ensure we can acquire the spinlock if (smp_spinlock_trylock(&glb->processes_spinlock)) { // We can send the message. @@ -371,6 +385,7 @@ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id postponed = true; } globalcontext_get_process_unlock(glb, p); + result = SEND_MESSAGE_OK; } } else { postponed = true; @@ -397,7 +412,20 @@ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id } while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&glb->message_queue, ¤t_first, queued_item)); // Make sure the scheduler is busy sys_signal(glb); + + result = SEND_MESSAGE_OK; } + return result; +} + +enum SendMessageResult globalcontext_post_message_from_task(GlobalContext *glb, int32_t process_id, Message *message) +{ + return globalcontext_send_message_from_task_common(glb, process_id, &message->base, NormalMessage, term_nil()); +} + +void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t) +{ + globalcontext_send_message_from_task_common(glb, process_id, NULL, type, t); } static inline void globalcontext_process_message_queue(GlobalContext *glb) diff --git a/src/libAtomVM/globalcontext.h b/src/libAtomVM/globalcontext.h index 1aadb24068..375f3a7a3c 100644 --- a/src/libAtomVM/globalcontext.h +++ b/src/libAtomVM/globalcontext.h @@ -71,6 +71,11 @@ typedef struct GlobalContext GlobalContext; typedef struct MailboxMessage MailboxMessage; #endif +#ifndef TYPEDEF_MESSAGE +#define TYPEDEF_MESSAGE +typedef struct Message Message; +#endif + struct MessageQueueItem { struct MessageQueueItem *next; @@ -165,6 +170,12 @@ struct GlobalContext void *platform_data; }; +enum SendMessageResult +{ + SEND_MESSAGE_OK = 0, + SEND_MESSAGE_PROCESS_NOT_FOUND = 1 +}; + /** * @brief Creates a new GlobalContext * @@ -250,6 +261,18 @@ void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t); */ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, term t); +/** + * @brief Post a mailbox message to a process identified by its id. + * @details This function is only used by enif_select_read/enif_select_write to + * post a message that is built before. + * + * @param glb the global context (that owns the process table). + * @param process_id the local process id. + * @param m the mailbox message to send. + * @return SEND_MESSAGE_OK if the message was sent (and ownership transfered). + */ +enum SendMessageResult globalcontext_post_message(GlobalContext *glb, int32_t process_id, Message *m); + #ifdef AVM_TASK_DRIVER_ENABLED /** * @brief Send a message to a process identified by its id. This variant is to @@ -267,6 +290,19 @@ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, t */ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t); +/** + * @brief Post a mailbox message to a process identified by its id. Variant + * to be used from task drivers. + * @details This function is only used by enif_select_read/enif_select_write to + * post a message that is built before. + * + * @param glb the global context (that owns the process table). + * @param process_id the local process id. + * @param m the mailbox message to send. + * @return SEND_MESSAGE_OK if the message was sent (and ownership transfered). + */ +enum SendMessageResult globalcontext_post_message_from_task(GlobalContext *glb, int32_t process_id, Message *m); + /** * @brief Enqueue a refc binary from a task driver, to be refc decremented * later from the scheduler. diff --git a/src/libAtomVM/inet.c b/src/libAtomVM/inet.c index e3a858fd34..54227362e3 100644 --- a/src/libAtomVM/inet.c +++ b/src/libAtomVM/inet.c @@ -44,6 +44,11 @@ enum inet_type inet_atom_to_type(term type, GlobalContext *global) return interop_atom_term_select_int(inet_type_table, type, global); } +term inet_type_to_atom(enum inet_type type, GlobalContext *global) +{ + return interop_atom_term_select_atom(inet_type_table, (int) type, global); +} + static const AtomStringIntPair inet_protocol_table[] = { { ATOM_STR("\x2", "ip"), InetIpProtocol }, { ATOM_STR("\x3", "tcp"), InetTcpProtocol }, diff --git a/src/libAtomVM/inet.h b/src/libAtomVM/inet.h index e422bcd50c..1b2490bc73 100644 --- a/src/libAtomVM/inet.h +++ b/src/libAtomVM/inet.h @@ -59,6 +59,14 @@ enum inet_type */ enum inet_type inet_atom_to_type(term type, GlobalContext *global); +/** + * @brief Convert an inet type to an atom + * @param type the inet type + * @param global the global context + * @returns an atom representing the inet type + */ +term inet_type_to_atom(enum inet_type type, GlobalContext *global); + enum inet_protocol { InetInvalidProtocol = 0, diff --git a/src/libAtomVM/mailbox.c b/src/libAtomVM/mailbox.c index 9d78bb1db5..bcd75a75b3 100644 --- a/src/libAtomVM/mailbox.c +++ b/src/libAtomVM/mailbox.c @@ -126,6 +126,16 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap) } } +// Dispose message. Normal / signal messages are not destroyed, instead they +// are appended to the current heap. +void mailbox_message_dispose_unsent(Message *m, GlobalContext *global, bool from_task) +{ + term mso_list = m->storage[STORAGE_MSO_LIST_INDEX]; + HeapFragment *fragment = mailbox_message_to_heap_fragment(m, m->heap_end); + memory_sweep_mso_list(mso_list, global, from_task); + memory_destroy_heap_fragment(fragment); +} + void mailbox_destroy(Mailbox *mbox, Heap *heap) { MailboxMessage *msg = mbox->outer_first; @@ -191,13 +201,13 @@ inline void mailbox_enqueue_message(Context *c, MailboxMessage *m) } while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&c->mailbox.outer_first, ¤t_first, m)); } -static void mailbox_post_message(Context *c, MailboxMessage *m) +void mailbox_post_message(Context *c, MailboxMessage *m) { mailbox_enqueue_message(c, m); scheduler_signal_message(c); } #else -static void mailbox_post_message(Context *c, MailboxMessage *m) +void mailbox_post_message(Context *c, MailboxMessage *m) { m->next = c->mailbox.outer_first; c->mailbox.outer_first = m; @@ -231,6 +241,12 @@ MailboxMessage *mailbox_message_create_from_term(enum MessageType type, term t) } } +Message *mailbox_message_create_normal_message_from_term(term t) +{ + MailboxMessage *message = mailbox_message_create_from_term(NormalMessage, t); + return CONTAINER_OF(message, Message, base); +} + void mailbox_send(Context *c, term t) { MailboxMessage *msg = mailbox_message_create_from_term(NormalMessage, t); diff --git a/src/libAtomVM/mailbox.h b/src/libAtomVM/mailbox.h index 64d53a6f40..68fb92ae15 100644 --- a/src/libAtomVM/mailbox.h +++ b/src/libAtomVM/mailbox.h @@ -57,6 +57,11 @@ struct Context; typedef struct Context Context; #endif +#ifndef TYPEDEF_GLOBALCONTEXT +#define TYPEDEF_GLOBALCONTEXT +typedef struct GlobalContext GlobalContext; +#endif + struct Heap; #ifndef TYPEDEF_HEAP @@ -69,7 +74,10 @@ typedef struct Heap Heap; typedef struct MailboxMessage MailboxMessage; #endif +#ifndef TYPEDEF_MESSAGE +#define TYPEDEF_MESSAGE typedef struct Message Message; +#endif enum MessageType { @@ -238,6 +246,17 @@ void mailbox_send_ref_signal(Context *c, enum MessageType type, uint64_t ref_tic */ void mailbox_send_empty_body_signal(Context *c, enum MessageType type); +/** + * @brief Post a message. + * + * @details Post a message to a given context. Context gets ownership of the + * created message. + * + * @param c the process context. + * @param m the mailbox message to send + */ +void mailbox_post_message(Context *c, MailboxMessage *m); + #ifdef AVM_TASK_DRIVER_ENABLED /** * @brief Enqueue message @@ -341,6 +360,23 @@ MailboxMessage *mailbox_message_create_from_term(enum MessageType type, term t); */ void mailbox_message_dispose(MailboxMessage *m, Heap *heap); +/** + * @brief Allocate and serialize a term to a normal message. + * + * @details Can be called from a task or even ISR (provided malloc works). + * @param t the term that will be sent + */ +Message *mailbox_message_create_normal_message_from_term(term t); + +/** + * @brief Dispose an unsent normal message. The message will be freed. + * + * @param m the message to dispose. + * @param global the global context + * @param from_task boolean, true if called from a task, false otherwise + */ +void mailbox_message_dispose_unsent(Message *m, GlobalContext *global, bool from_task); + /** * @brief Remove next message from mailbox. * diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index 1b76a155e9..befa87fad5 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -148,10 +149,14 @@ static void udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, const ip struct SocketResource { int fd; - uint64_t ref_ticks; + uint64_t socket_ref_ticks; + uint64_t select_ref_ticks; int32_t selecting_process_id; ErlNifMonitor selecting_process_monitor; size_t buf_size; +#ifndef AVM_NO_SMP + RWLock *socket_lock; +#endif }; #elif OTP_SOCKET_LWIP struct SocketResource @@ -162,7 +167,8 @@ struct SocketResource struct tcp_pcb *tcp_pcb; struct udp_pcb *udp_pcb; }; - uint64_t ref_ticks; + uint64_t socket_ref_ticks; + uint64_t select_ref_ticks; int32_t selecting_process_id; // trapped or selecting ErlNifMonitor selecting_process_monitor; bool linger_on; @@ -170,6 +176,9 @@ struct SocketResource size_t pos; struct ListHead received_list; size_t buf_size; +#ifndef AVM_NO_SMP + RWLock *socket_lock; +#endif }; #endif @@ -183,6 +192,7 @@ static const char *const onoff_atom = ATOM_STR("\x5", "onoff"); static const char *const port_atom = ATOM_STR("\x4", "port"); static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf"); static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr"); +static const char *const type_atom = ATOM_STR("\x4", "type"); #define CLOSED_FD 0 @@ -227,6 +237,9 @@ static const AtomStringIntPair otp_socket_setopt_level_table[] = { static ErlNifResourceType *socket_resource_type; +#define SOCKET_MAKE_SELECT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TUPLE_SIZE(2) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE) +static term socket_make_select_notification(struct SocketResource *rsrc_obj, Heap *heap); + // // resource operations // @@ -261,6 +274,9 @@ static void socket_dtor(ErlNifEnv *caller_env, void *obj) } LWIP_END(); #endif +#ifndef AVM_NO_SMP + smp_rwlock_destroy(rsrc_obj->socket_lock); +#endif } #if OTP_SOCKET_BSD @@ -295,6 +311,8 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif TRACE("socket_down called on process_id=%i\n", (int) *pid); #endif + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // Monitor fired, so make sure we don't try to demonitor in select_stop @@ -323,6 +341,8 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif LWIP_END(); } #endif + + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); } static const ErlNifResourceTypeInit SocketResourceTypeInit = { @@ -336,12 +356,40 @@ static const ErlNifResourceTypeInit SocketResourceTypeInit = { .down = socket_down, }; +// Make a notification message, using SOCKET_MAKE_SELECT_NOTIFICATION_SIZE on heap +static term socket_make_select_notification(struct SocketResource *rsrc_obj, Heap *heap) +{ + term notification = term_alloc_tuple(4, heap); + term_put_tuple_element(notification, 0, DOLLAR_SOCKET_ATOM); + term socket_tuple = term_alloc_tuple(2, heap); + term_put_tuple_element(socket_tuple, 0, term_from_resource(rsrc_obj, heap)); + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + refc_binary_increment_refcount(rsrc_refc); + term socket_ref; + if (rsrc_obj->socket_ref_ticks == 0) { + socket_ref = UNDEFINED_ATOM; + } else { + socket_ref = term_from_ref_ticks(rsrc_obj->socket_ref_ticks, heap); + } + term_put_tuple_element(socket_tuple, 1, socket_ref); + term_put_tuple_element(notification, 1, socket_tuple); + term_put_tuple_element(notification, 2, SELECT_ATOM); + term select_ref; + if (rsrc_obj->select_ref_ticks == 0) { + select_ref = UNDEFINED_ATOM; + } else { + select_ref = term_from_ref_ticks(rsrc_obj->select_ref_ticks, heap); + } + term_put_tuple_element(notification, 3, select_ref); + return notification; +} + // select emulation for lwIP that doesn't have select. #if OTP_SOCKET_LWIP static void select_event_send_notification_from_nif(struct SocketResource *rsrc_obj, Context *locked_ctx) { - BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) - term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap); + BEGIN_WITH_STACK_HEAP(SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, heap) + term notification = socket_make_select_notification(rsrc_obj, &heap); mailbox_send(locked_ctx, notification); END_WITH_STACK_HEAP(heap, locked_ctx->global) } @@ -350,8 +398,8 @@ static void select_event_send_notification_from_handler(struct SocketResource *r { struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); GlobalContext *global = rsrc_refc->resource_type->global; - BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) - term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap); + BEGIN_WITH_STACK_HEAP(SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, heap) + term notification = socket_make_select_notification(rsrc_obj, &heap); globalcontext_send_message(global, process_id, notification); END_WITH_STACK_HEAP(heap, global) } @@ -502,6 +550,13 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) RAISE_ERROR(OUT_OF_MEMORY_ATOM); } +#ifndef AVM_NO_SMP + rsrc_obj->socket_lock = smp_rwlock_create(); + if (IS_NULL_PTR(rsrc_obj->socket_lock)) { + free(rsrc_obj); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } +#endif #if OTP_SOCKET_BSD rsrc_obj->fd = socket(domain, type, protocol); if (UNLIKELY(rsrc_obj->fd == -1 || rsrc_obj->fd == CLOSED_FD)) { @@ -512,6 +567,14 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) TRACE("nif_socket_open: Created socket fd=%i\n", rsrc_obj->fd); rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + if (type != SOCK_STREAM) { + // TCP sockets are made non-blocking after connect, for now. + if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd); + return make_errno_tuple(ctx); + } + } + #elif OTP_SOCKET_LWIP if (domain == PF_INET && type == SOCK_STREAM && protocol == IPPROTO_TCP) { LWIP_BEGIN(); @@ -565,6 +628,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) term socket_term = term_alloc_tuple(2, &ctx->heap); uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); term_put_tuple_element(socket_term, 0, obj); term_put_tuple_element(socket_term, 1, ref); @@ -604,21 +668,27 @@ bool term_is_otp_socket(term socket_term) // close // -static int send_closed_notification(Context *ctx, struct SocketResource *rsrc_obj) +static int send_closed_notification(Context *ctx, term socket_term, int32_t selecting_process_id, struct SocketResource *rsrc_obj) { - // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + REF_SIZE) != MEMORY_GC_OK)) { + // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(4) + TUPLE_SIZE(2) + REF_SIZE, 1, &socket_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); return -1; } + term socket_tuple = term_alloc_tuple(4, &ctx->heap); + term_put_tuple_element(socket_tuple, 0, DOLLAR_SOCKET_ATOM); + term_put_tuple_element(socket_tuple, 1, socket_term); + term_put_tuple_element(socket_tuple, 2, ABORT_ATOM); + term error_tuple = term_alloc_tuple(2, &ctx->heap); - term_put_tuple_element(error_tuple, 0, CLOSED_ATOM); - term ref = (rsrc_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->ref_ticks, &ctx->heap); - term_put_tuple_element(error_tuple, 1, ref); + term ref = (rsrc_obj->select_ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->select_ref_ticks, &ctx->heap); + term_put_tuple_element(error_tuple, 0, ref); + term_put_tuple_element(error_tuple, 1, CLOSED_ATOM); + term_put_tuple_element(socket_tuple, 3, error_tuple); - TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) rsrc_obj->selecting_process_id, (void *) rsrc_obj); - globalcontext_send_message(ctx->global, rsrc_obj->selecting_process_id, error_tuple); + TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) selecting_process_id, (void *) rsrc_obj); + globalcontext_send_message(ctx->global, selecting_process_id, socket_tuple); return 0; } @@ -641,53 +711,61 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->fd) { + // In POSIX with BSD sockets, if a file descriptor being monitored by + // select() is closed in another thread, the result is unspecified. + // select may continue. + // + // However, in Erlang, asynchronous sockets support closing from another + // process, as documented in specification of the abort message. + + // So we handle closing a socket while another process is selecting if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { - // - // If we are in select, then stop selecting - // + // Save process id as socket_stop may be called by enif_select. + int32_t selecting_process_id = rsrc_obj->selecting_process_id; + // Stop selecting. int stop_res = enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()); if (UNLIKELY(stop_res < 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } - // TODO: check if stop_res & ERL_NIF_SELECT_STOP_CALLED or ERL_NIF_SELECT_STOP_SCHEDULED - // following what OTP does. Indeed, if we have ERL_NIF_SELECT_STOP_SCHEDULED, we should not close the socket now - // - // If there is a process (other than ourself) who is waiting on select - // the send a {closed, Ref} message to it, so that it can break + // If there is a selecting process who may be waiting on select, + // send a closed notification to it, so that it can break // out of its receive statement. // - if (rsrc_obj->selecting_process_id != ctx->process_id) { - - // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) { + // When using asynchronous API, the selecting process can be the + // calling process. In this case we don't send any notification. + // + if (selecting_process_id != ctx->process_id) { + // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid + if (UNLIKELY(send_closed_notification(ctx, argv[0], selecting_process_id, rsrc_obj) < 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - } else { - AVM_LOGW(TAG, "Selectable socket %i was closed but no known pid is waiting. This shouldn't happen.", rsrc_obj->fd); } } + // Eventually close the socket int res = close(rsrc_obj->fd); if (UNLIKELY(res != 0)) { AVM_LOGW(TAG, "Failed to close socket %i", res); } - - TRACE("nif_socket_close: Clearing pid for socket fd=%i\n", rsrc_obj->fd); rsrc_obj->fd = CLOSED_FD; - rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; - rsrc_obj->ref_ticks = 0; } else { TRACE("Double close on socket fd %i", rsrc_obj->fd); } #elif OTP_SOCKET_LWIP - // If the socket is being selected by another process, send a closed tuple. + // If the socket is being selected by another process, send a closed notification. if (rsrc_obj->socket_state & SocketStateSelectingRead && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID && rsrc_obj->selecting_process_id != ctx->process_id) { - // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) { + // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid + if (UNLIKELY(send_closed_notification(ctx, argv[0], rsrc_obj->selecting_process_id, rsrc_obj) < 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } } @@ -721,7 +799,8 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) } #endif - rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; + + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } @@ -744,6 +823,13 @@ static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newp conn_rsrc_obj->linger_on = false; conn_rsrc_obj->linger_sec = 0; conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; +#ifndef AVM_NO_SMP + conn_rsrc_obj->socket_lock = smp_rwlock_create(); + if (IS_NULL_PTR(conn_rsrc_obj->socket_lock)) { + free(conn_rsrc_obj); + return NULL; + } +#endif list_init(&conn_rsrc_obj->received_list); tcp_arg(newpcb, conn_rsrc_obj); @@ -905,6 +991,8 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + ErlNifEnv *env = erl_nif_env_from_context(ctx); if (rsrc_obj->selecting_process_id != ctx->process_id && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // demonitor can fail if process is gone. @@ -915,19 +1003,28 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) // if select fails than to stop select if monitor fails if (rsrc_obj->selecting_process_id != ctx->process_id) { if (UNLIKELY(enif_monitor_process(env, rsrc_obj, &ctx->process_id, &rsrc_obj->selecting_process_monitor) != 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(NOPROC_ATOM); } rsrc_obj->selecting_process_id = ctx->process_id; } - rsrc_obj->ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); + rsrc_obj->select_ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term); #if OTP_SOCKET_BSD TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_READ, rsrc_obj, &ctx->process_id, select_ref_term) < 0)) { + // Ensure the resource we're working on isn't garbage collected now. + if (UNLIKELY(memory_ensure_free_with_roots(ctx, SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, 2, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + term notification = socket_make_select_notification(rsrc_obj, &ctx->heap); + if (UNLIKELY(enif_select_read(erl_nif_env_from_context(ctx), rsrc_obj->fd, rsrc_obj, &ctx->process_id, notification, NULL) < 0)) { enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } @@ -974,11 +1071,13 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) default: enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } LWIP_END(); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } @@ -993,21 +1092,112 @@ static term nif_socket_select_stop(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + // Avoid the race condition with select object here. + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; #if OTP_SOCKET_BSD if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) { RAISE_ERROR(BADARG_ATOM); } - - return OK_ATOM; #elif OTP_SOCKET_LWIP LWIP_BEGIN(); if (rsrc_obj->socket_state & SocketStateSelectingRead) { rsrc_obj->socket_state &= ~SocketStateSelectingRead; } LWIP_END(); +#endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; +} + +// +// getopt +// + +static term nif_socket_getopt(Context *ctx, int argc, term argv[]) +{ + TRACE("nif_socket_getopt\n"); + UNUSED(argc); + + VALIDATE_VALUE(argv[0], term_is_otp_socket); + + GlobalContext *global = ctx->global; + + struct SocketResource *rsrc_obj; + if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { + RAISE_ERROR(BADARG_ATOM); + } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + +#if OTP_SOCKET_BSD + if (rsrc_obj->fd == 0) { +#elif OTP_SOCKET_LWIP + if (rsrc_obj->socket_state == SocketStateClosed) { +#endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_error_tuple(CLOSED_ATOM, ctx); + } + term level_tuple = argv[1]; + + term level = term_get_tuple_element(level_tuple, 0); + int level_val = interop_atom_term_select_int(otp_socket_setopt_level_table, level, global); + switch (level_val) { + case OtpSocketSetoptLevelSocket: { + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, type_atom)) { + enum inet_type type; +#if OTP_SOCKET_BSD + int option_value; + socklen_t option_len = sizeof(option_value); + int res = getsockopt(rsrc_obj->fd, SOL_SOCKET, SO_TYPE, &option_value, &option_len); + if (UNLIKELY(res != 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_errno_tuple(ctx); + } else { + switch (option_value) { + case SOCK_STREAM: + type = InetStreamType; + break; + case SOCK_DGRAM: + type = InetDgramType; + break; + default: + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + RAISE_ERROR(BADARG_ATOM); + } + } +#elif OTP_SOCKET_LWIP + LWIP_BEGIN(); + if (rsrc_obj->socket_state & SocketStateTCP) { + type = InetStreamType; + } else { + type = InetDgramType; + } + LWIP_END(); #endif + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2), 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } + term result = term_alloc_tuple(2, &ctx->heap); + term_put_tuple_element(result, 0, OK_ATOM); + term_put_tuple_element(result, 1, inet_type_to_atom(type, global)); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return result; + } else { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + RAISE_ERROR(BADARG_ATOM); + } + } + default: { + AVM_LOGE(TAG, "socket:getopt: Unsupported level"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + RAISE_ERROR(BADARG_ATOM); + } + } } // @@ -1028,12 +1218,15 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { #endif - return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_error_tuple(CLOSED_ATOM, ctx); } term level_tuple = argv[1]; term value = argv[2]; @@ -1049,6 +1242,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) int option_value = (value == TRUE_ATOM); #if OTP_SOCKET_BSD int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { return make_errno_tuple(ctx); } else { @@ -1070,6 +1264,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) } } LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; #endif } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { @@ -1082,6 +1277,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) sl.l_onoff = (onoff == TRUE_ATOM); sl.l_linger = term_to_int(linger); int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { return make_errno_tuple(ctx); } else { @@ -1090,6 +1286,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) #elif OTP_SOCKET_LWIP rsrc_obj->linger_on = (onoff == TRUE_ATOM); rsrc_obj->linger_sec = term_to_int(linger); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; #endif // TODO add more as needed @@ -1099,6 +1296,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); // } } else { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } @@ -1109,27 +1307,32 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) // TODO support the atom `default` as a value to roll back to the default buffer size if (UNLIKELY(!term_is_integer(value))) { - AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer"); + TRACE("socket:setopt: otp rcvbuf value must be an integer"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); } avm_int_t buf_size = term_to_int(value); if (UNLIKELY(buf_size < 0)) { - AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative"); + TRACE("socket:setopt: otp rcvbuf value may not be negative"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); } rsrc_obj->buf_size = (size_t) buf_size; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } else { - AVM_LOGE(TAG, "socket:setopt: Unsupported otp option"); + TRACE("socket:setopt: Unsupported otp option"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(globalcontext_make_atom(global, invalid_option_atom), ctx); } } default: { - AVM_LOGE(TAG, "socket:setopt: Unsupported level"); + TRACE("socket:setopt: Unsupported level"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } } @@ -1153,11 +1356,14 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } @@ -1168,6 +1374,7 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to getsockname: fd=%i res=%i.", rsrc_obj->fd, res); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_errno_tuple(ctx); } uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); @@ -1185,6 +1392,7 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) } LWIP_END(); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); // {ok, #{addr => {a,b,c,d}, port => integer()}} if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { @@ -1221,19 +1429,24 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateUDP) { // TODO: handle "connected" UDP sockets + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } if ((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(ENOTCONN, global), ctx); } #endif @@ -1245,14 +1458,17 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[]) if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to getpeername: fd=%i res=%i.", rsrc_obj->fd, res); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_errno_tuple(ctx); } + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); uint16_t port_u16 = ntohs(addr.sin_port); #elif OTP_SOCKET_LWIP // TODO: support peername for "connected" UDP sockets uint32_t ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->remote_ip)); uint16_t port_u16 = rsrc_obj->tcp_pcb->remote_port; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); #endif // {ok, #{addr => {a,b,c,d}, port => integer()}} @@ -1289,13 +1505,17 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #endif @@ -1353,6 +1573,7 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) serveraddr.sin_port = htons(port_u16); socklen_t address_len = sizeof(serveraddr); int res = bind(rsrc_obj->fd, (struct sockaddr *) &serveraddr, address_len); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); return make_errno_tuple(ctx); @@ -1366,6 +1587,7 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) } else { res = udp_bind(rsrc_obj->udp_pcb, &ip_addr, port_u16); } + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != ERR_OK)) { AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); return make_lwip_err_tuple(res, ctx); @@ -1393,15 +1615,20 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateUDP) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EPROTOTYPE, global), ctx); } #endif @@ -1410,6 +1637,7 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) #if OTP_SOCKET_BSD int res = listen(rsrc_obj->fd, backlog); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to listen on socket: res=%i.", res); return make_errno_tuple(ctx); @@ -1437,6 +1665,7 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) tcp_accept(new_pcb, tcp_accept_cb); rsrc_obj->tcp_pcb = new_pcb; rsrc_obj->socket_state = SocketStateTCPListening; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; #endif } @@ -1452,6 +1681,7 @@ static term make_accepted_socket_term(struct SocketResource *conn_rsrc_obj, Heap term socket_term = term_alloc_tuple(2, heap); uint64_t ref_ticks = globalcontext_get_ref_ticks(global); + conn_rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, heap); term_put_tuple_element(socket_term, 0, obj); term_put_tuple_element(socket_term, 1, ref); @@ -1472,19 +1702,25 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state & SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateUDP) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } // Only listening is allowed if ((rsrc_obj->socket_state & SocketStateTCPListening) != SocketStateTCPListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EINVAL, global), ctx); } #endif @@ -1492,33 +1728,51 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) #if OTP_SOCKET_BSD struct sockaddr_in clientaddr; socklen_t clientlen = sizeof(clientaddr); + if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_errno_tuple(ctx); + } int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd); int err = errno; term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global); return make_error_tuple(reason, ctx); } else { - + if (UNLIKELY(fcntl(fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", fd); + close(fd); + return make_errno_tuple(ctx); + } struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource)); conn_rsrc_obj->fd = fd; conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; +#ifndef AVM_NO_SMP + conn_rsrc_obj->socket_lock = smp_rwlock_create(); + if (IS_NULL_PTR(rsrc_obj->socket_lock)) { + free(conn_rsrc_obj); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } +#endif TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd); - term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); + term new_resource = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); enif_release_resource(conn_rsrc_obj); size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; - if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &new_resource, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } term socket_term = term_alloc_tuple(2, &ctx->heap); uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + conn_rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); - term_put_tuple_element(socket_term, 0, obj); + term_put_tuple_element(socket_term, 0, new_resource); term_put_tuple_element(socket_term, 1, ref); term result = term_alloc_tuple(2, &ctx->heap); @@ -1536,12 +1790,14 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) if (IS_NULL_PTR(new_resource)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } size_t requested_size = TERM_BOXED_RESOURCE_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -1555,9 +1811,12 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) term_put_tuple_element(result, 1, socket_term); } else { // return EAGAIN + LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EAGAIN, ctx->global), ctx); } LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return result; #endif } @@ -1586,7 +1845,7 @@ static size_t copy_pbuf_data(struct pbuf *src, size_t offset, size_t count, uint } #endif -ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap) +static ssize_t do_socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap) { #if OTP_SOCKET_BSD // @@ -1604,9 +1863,10 @@ ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, i term address = inet_make_addr4(ntohl(addr.sin_addr.s_addr), heap); term port_number = term_from_int(ntohs(addr.sin_port)); - term map = term_alloc_map(2, heap); + term map = term_alloc_map(3, heap); term_set_map_assoc(map, 0, ADDR_ATOM, address); - term_set_map_assoc(map, 1, PORT_ATOM, port_number); + term_set_map_assoc(map, 1, FAMILY_ATOM, INET_ATOM); + term_set_map_assoc(map, 2, PORT_ATOM, port_number); *from = map; } else { res = recv(rsrc_obj->fd, buf, len, flags); @@ -1691,7 +1951,7 @@ ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, i term port_number = term_from_int(port_u16); term map = term_alloc_map(2, heap); - term_set_map_assoc(map, 0, globalcontext_make_atom(global, addr_atom), address); + term_set_map_assoc(map, 0, ADDR_ATOM, address); term_set_map_assoc(map, 1, PORT_ATOM, port_number); *from = map; @@ -1706,8 +1966,16 @@ ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, i #endif } +ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap) +{ + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + ssize_t result = do_socket_recv(rsrc_obj, buf, len, flags, from, heap); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return result; +} + #if OTP_SOCKET_BSD -static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +static term nif_socket_recv_with_peek(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_with_peek\n"); @@ -1718,6 +1986,9 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ TRACE("%li bytes available.\n", (long int) res); if (res < 0) { AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno); + if (errno == EAGAIN) { + return make_error_tuple(TIMEOUT_ATOM, ctx); + } return make_errno_tuple(ctx); } else if (res == 0) { TRACE("Peer closed socket %i.\n", rsrc_obj->fd); @@ -1732,7 +2003,8 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ size_t ensure_packet_avail = term_binary_data_size_in_terms(buffer_size) + BINARY_HEADER_SIZE; size_t requested_size = TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0); - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -1765,7 +2037,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ } } -static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +static term nif_socket_recv_without_peek(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_without_peek\n"); @@ -1779,26 +2051,30 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs RAISE_ERROR(OUT_OF_MEMORY_ATOM); } else { - - term map = term_invalid_term(); + term roots[2]; + roots[0] = resource_term; + roots[1] = term_invalid_term(); if (is_recvfrom) { - if (UNLIKELY(memory_ensure_free(ctx, INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2), 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { free(buffer); AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } } - ssize_t res = socket_recv(rsrc_obj, buffer, buffer_size, 0, is_recvfrom ? &map : NULL, &ctx->heap); + ssize_t res = socket_recv(rsrc_obj, buffer, buffer_size, 0, is_recvfrom ? &roots[1] : NULL, &ctx->heap); if (res < 0) { int err = errno; term reason = (err == ECONNRESET) ? globalcontext_make_atom(global, ATOM_STR("\xA", "econnreset")) : posix_errno_to_term(err, global); if (err == ECONNRESET) { - AVM_LOGI(TAG, "Peer closed connection."); + TRACE("Peer closed connection."); + } else if (err == EAGAIN) { + return make_error_tuple(TIMEOUT_ATOM, ctx); } else { - AVM_LOGE(TAG, "Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno); + TRACE("Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno); } return make_error_tuple(reason, ctx); @@ -1818,7 +2094,8 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; size_t requested_size = TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? TUPLE_SIZE(2) : 0); - if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, is_recvfrom ? 1 : 0, &map, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, is_recvfrom ? 2 : 1, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { free(buffer); AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); @@ -1828,7 +2105,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs term payload; if (is_recvfrom) { - term tuple = port_heap_create_tuple2(&ctx->heap, map, data); + term tuple = port_heap_create_tuple2(&ctx->heap, roots[1], data); payload = port_heap_create_ok_tuple(&ctx->heap, tuple); } else { payload = port_heap_create_ok_tuple(&ctx->heap, data); @@ -1841,7 +2118,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs #elif OTP_SOCKET_LWIP -static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +static term nif_socket_recv_lwip(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_lwip\n"); @@ -1894,7 +2171,8 @@ static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj, size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0); - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -1935,28 +2213,36 @@ static term nif_socket_recv_internal(Context *ctx, term argv[], bool is_recvfrom if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state & SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); } if (rsrc_obj->socket_state & SocketStateListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, ctx->global), ctx); } #endif + term result; #if OTP_SOCKET_BSD if (otp_socket_platform_supports_peek()) { - return nif_socket_recv_with_peek(ctx, rsrc_obj, len, is_recvfrom); + result = nif_socket_recv_with_peek(ctx, argv[0], rsrc_obj, len, is_recvfrom); } else { - return nif_socket_recv_without_peek(ctx, rsrc_obj, len, is_recvfrom); + result = nif_socket_recv_without_peek(ctx, argv[0], rsrc_obj, len, is_recvfrom); } #elif OTP_SOCKET_LWIP - return nif_socket_recv_lwip(ctx, rsrc_obj, len, is_recvfrom); + result = nif_socket_recv_lwip(ctx, argv[0], rsrc_obj, len, is_recvfrom); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return result; } static term nif_socket_recv(Context *ctx, int argc, term argv[]) @@ -1978,7 +2264,7 @@ static term nif_socket_recvfrom(Context *ctx, int argc, term argv[]) // // send/sendto // -ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest) +static ssize_t do_socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest) { ssize_t sent_data = -1; #if OTP_SOCKET_BSD @@ -2084,6 +2370,14 @@ ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t #endif } +ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest) +{ + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + ssize_t result = do_socket_send(rsrc_obj, buf, len, dest); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return result; +} + static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool is_sendto) { TRACE("nif_socket_send_internal\n"); @@ -2099,15 +2393,19 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } #endif @@ -2121,7 +2419,8 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i const uint8_t *buf = (const uint8_t *) term_binary_data(data); size_t len = term_binary_size(data); - ssize_t sent_data = socket_send(rsrc_obj, buf, len, dest); + ssize_t sent_data = do_socket_send(rsrc_obj, buf, len, dest); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); // {ok, RestData} | {error, Reason} @@ -2147,7 +2446,7 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i return port_create_tuple2(ctx, OK_ATOM, data); } else { - AVM_LOGE(TAG, "Unable to send data: res=%zi.", sent_data); + TRACE("Unable to send data: res=%zi.", sent_data); return make_error_tuple(CLOSED_ATOM, ctx); } } @@ -2239,27 +2538,33 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); term sockaddr = argv[1]; term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); if (term_is_invalid_term(addr)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } avm_int_t port_number = term_to_int(port); if (port_number < 0 || port_number > 65535) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) || ((rsrc_obj->socket_state & SocketStateTCPConnected) == SocketStateTCPConnected)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } #endif @@ -2284,16 +2589,25 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) if (errno == EINPROGRESS) { // TODO make connect non-blocking + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return UNDEFINED_ATOM; } else { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); AVM_LOGE(TAG, "Unable to connect: res=%i errno=%i", res, errno); return make_error_tuple(CLOSED_ATOM, ctx); } } else if (res == 0) { + if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) { + AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_errno_tuple(ctx); + } + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } else { // won't happen according to connect(2) + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return UNDEFINED_ATOM; } #elif OTP_SOCKET_LWIP @@ -2312,11 +2626,13 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) RAISE_ERROR(OUT_OF_MEMORY_ATOM); } if (rsrc_obj->socket_state & SocketStateUDP) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } else { rsrc_obj->selecting_process_id = ctx->process_id; // Trap caller waiting for completion context_update_flags(ctx, ~NoFlags, Trap); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return term_invalid_term(); } #endif @@ -2341,6 +2657,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); int how; int val = interop_atom_term_select_int(otp_socket_shutdown_direction_table, argv[1], global); switch (val) { @@ -2358,6 +2675,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) break; default: + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } @@ -2366,6 +2684,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } @@ -2375,6 +2694,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) int res = shutdown(rsrc_obj->fd, how); if (res < 0) { AVM_LOGE(TAG, "Unable to shut down socket: res=%i errno=%i", res, errno); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_errno_tuple(ctx); } #elif OTP_SOCKET_LWIP @@ -2391,6 +2711,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) } LWIP_END(); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return result; } @@ -2410,6 +2731,10 @@ static const struct Nif socket_select_stop_nif = { .base.type = NIFFunctionType, .nif_ptr = nif_socket_select_stop }; +static const struct Nif socket_getopt_nif = { + .base.type = NIFFunctionType, + .nif_ptr = nif_socket_getopt +}; static const struct Nif socket_setopt_nif = { .base.type = NIFFunctionType, .nif_ptr = nif_socket_setopt @@ -2480,6 +2805,10 @@ const struct Nif *otp_socket_nif_get_nif(const char *nifname) TRACE("Resolved platform nif %s ...\n", nifname); return &socket_select_stop_nif; } + if (strcmp("getopt/2", rest) == 0) { + TRACE("Resolved platform nif %s ...\n", nifname); + return &socket_getopt_nif; + } if (strcmp("setopt/3", rest) == 0) { TRACE("Resolved platform nif %s ...\n", nifname); return &socket_setopt_nif; diff --git a/src/libAtomVM/resources.c b/src/libAtomVM/resources.c index f19eb3834b..fe6aaa58c4 100644 --- a/src/libAtomVM/resources.c +++ b/src/libAtomVM/resources.c @@ -116,19 +116,20 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj) return term_from_resource(obj, &env->heap); } -int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref) +static void enif_select_event_message_dispose(Message *message, GlobalContext *global, bool from_task) { - if (!(mode & (ERL_NIF_SELECT_STOP | ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE))) { - return ERL_NIF_SELECT_BADARG; - } - if (UNLIKELY(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE) && !term_is_local_reference(ref) && ref != UNDEFINED_ATOM)) { - return ERL_NIF_SELECT_BADARG; + if (message) { + mailbox_message_dispose_unsent(message, global, from_task); } +} +static int enif_select_common(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref, Message *message) +{ + GlobalContext *global = env->global; struct RefcBinary *resource = refc_binary_from_data(obj); // Search for event and obj struct ListHead *item; - struct ListHead *select_events = synclist_wrlock(&env->global->select_events); + struct ListHead *select_events = synclist_wrlock(&global->select_events); struct SelectEvent *select_event = NULL; LIST_FOR_EACH (item, select_events) { select_event = GET_LIST_ENTRY(item, struct SelectEvent, head); @@ -139,19 +140,20 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, } if (mode & ERL_NIF_SELECT_STOP) { if (select_event == NULL) { - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); return ERL_NIF_SELECT_INVALID_EVENT; } bool was_read = select_event->read; bool was_write = select_event->write; if (!was_read && !was_write) { list_remove(&select_event->head); - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); // We can call stop now. if (resource->resource_type->stop) { resource->resource_type->stop(env, obj, event, true); } - refc_binary_decrement_refcount(resource, env->global); + refc_binary_decrement_refcount(resource, global); + enif_select_event_message_dispose(select_event->message, global, false); free((void *) select_event); return ERL_NIF_SELECT_STOP_CALLED; } @@ -161,13 +163,13 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, select_event->close = 1; select_event->read = 0; select_event->write = 0; - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); // Platform loop should check close flag after unregister is called if (was_read) { - sys_unregister_select_event(env->global, event, false); + sys_unregister_select_event(global, event, false); } if (was_write) { - sys_unregister_select_event(env->global, event, true); + sys_unregister_select_event(global, event, true); } return ERL_NIF_SELECT_STOP_SCHEDULED; } @@ -179,31 +181,60 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, } select_event->event = event; select_event->resource = resource; + select_event->message = NULL; + select_event->ref_ticks = 0; // Resource is used in select_event, so we increase refcount. refc_binary_increment_refcount(resource); list_init(&select_event->head); list_append(select_events, &select_event->head); } - // Second read or second write overwrite ref & pid. - if (ref == UNDEFINED_ATOM) { + // Second read or second write overwrite ref/message & pid. + enif_select_event_message_dispose(select_event->message, global, false); + if (message) { + select_event->message = message; select_event->ref_ticks = 0; } else { - select_event->ref_ticks = term_to_ref_ticks(ref); + if (ref == UNDEFINED_ATOM) { + select_event->ref_ticks = 0; + } else { + select_event->ref_ticks = term_to_ref_ticks(ref); + } } select_event->local_pid = *pid; select_event->read = mode & ERL_NIF_SELECT_READ; select_event->write = mode & ERL_NIF_SELECT_WRITE; select_event->close = 0; - synclist_unlock(&env->global->select_events); + synclist_unlock(&global->select_events); if (select_event->read) { - sys_register_select_event(env->global, event, false); + sys_register_select_event(global, event, false); } if (select_event->write) { - sys_register_select_event(env->global, event, true); + sys_register_select_event(global, event, true); } return 0; } +int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref) +{ + if (!(mode & (ERL_NIF_SELECT_STOP | ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE))) { + return ERL_NIF_SELECT_BADARG; + } + if (UNLIKELY(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE) && !term_is_local_reference(ref) && ref != UNDEFINED_ATOM)) { + return ERL_NIF_SELECT_BADARG; + } + return enif_select_common(env, event, mode, obj, pid, ref, NULL); +} + +int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env) +{ + if (UNLIKELY(msg_env != NULL)) { + return ERL_NIF_SELECT_BADARG; + } + Message *message = mailbox_message_create_normal_message_from_term(msg); + enum ErlNifSelectFlags mode = ERL_NIF_SELECT_READ; + return enif_select_common(env, event, mode, obj, pid, term_nil(), message); +} + term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_write, Heap *heap) { term notification = term_alloc_tuple(4, heap); @@ -224,19 +255,33 @@ term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_ static void select_event_send_notification(struct SelectEvent *select_event, bool is_write, GlobalContext *global) { - BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) - term notification = select_event_make_notification(select_event->resource->data, select_event->ref_ticks, is_write, &heap); + if (select_event->message) { + enum SendMessageResult result; +#ifdef AVM_SELECT_IN_TASK + result = globalcontext_post_message_from_task(global, select_event->local_pid, select_event->message); +#else + result = globalcontext_post_message(global, select_event->local_pid, select_event->message); +#endif + if (result == SEND_MESSAGE_OK) { + // Ownership was properly transfered. + // Otherwise, it will be destroyed when we have a context (when enif_select is called with stop for example) + select_event->message = NULL; + } + } else { + BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap) + term notification = select_event_make_notification(select_event->resource->data, select_event->ref_ticks, is_write, &heap); #ifdef AVM_SELECT_IN_TASK - globalcontext_send_message_from_task(global, select_event->local_pid, NormalMessage, notification); + globalcontext_send_message_from_task(global, select_event->local_pid, NormalMessage, notification); #else - globalcontext_send_message(global, select_event->local_pid, notification); + globalcontext_send_message(global, select_event->local_pid, notification); #endif + END_WITH_STACK_HEAP(heap, global) + } if (is_write) { select_event->write = 0; } else { select_event->read = 0; } - END_WITH_STACK_HEAP(heap, global) sys_unregister_select_event(global, select_event->event, is_write); } @@ -279,6 +324,7 @@ static inline void select_event_destroy(struct SelectEvent *select_event, Global #else refc_binary_decrement_refcount(select_event->resource, global); #endif + enif_select_event_message_dispose(select_event->message, global, true); free((void *) select_event); } diff --git a/src/libAtomVM/resources.h b/src/libAtomVM/resources.h index b9f2c7b5f7..f9ba20162e 100644 --- a/src/libAtomVM/resources.h +++ b/src/libAtomVM/resources.h @@ -25,6 +25,7 @@ #include "erl_nif.h" #include "list.h" +#include "mailbox.h" #include "memory.h" #ifdef __cplusplus @@ -70,6 +71,7 @@ struct SelectEvent bool close; int32_t local_pid; uint64_t ref_ticks; + Message *message; }; static inline void resource_type_destroy(struct ResourceType *resource_type) diff --git a/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl b/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl index 15f7a835e4..42449c1354 100644 --- a/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl +++ b/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl @@ -70,9 +70,9 @@ handshake_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> handshake_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> ok = socket:close(Socket), {error, closed} end; @@ -98,9 +98,9 @@ send_loop(SSLContext, Socket, Binary) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> send_loop(SSLContext, Socket, Binary); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> @@ -124,9 +124,9 @@ recv_loop(SSLContext, Socket, Remaining, Acc) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> recv_loop(SSLContext, Socket, Remaining, Acc); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> @@ -147,9 +147,9 @@ close_notify_loop(SSLContext, Socket) -> case socket:nif_select_read(Socket, Ref) of ok -> receive - {select, _SocketResource, Ref, ready_input} -> + {'$socket', Socket, select, Ref} -> close_notify_loop(SSLContext, Socket); - {closed, Ref} -> + {'$socket', Socket, abort, {Ref, closed}} -> {error, closed} end; {error, _Reason} = Error -> diff --git a/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt b/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt index 6007ce23fc..773a555dc9 100644 --- a/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt +++ b/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt @@ -18,6 +18,7 @@ # SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later # +include(ExternalProject) ExternalProject_Add(HostAtomVM SOURCE_DIR ../../../../../../ INSTALL_COMMAND cmake -E echo "Skipping install step." diff --git a/tests/libs/estdlib/test_tcp_socket.erl b/tests/libs/estdlib/test_tcp_socket.erl index f49c5c1e43..fa154af175 100644 --- a/tests/libs/estdlib/test_tcp_socket.erl +++ b/tests/libs/estdlib/test_tcp_socket.erl @@ -27,7 +27,9 @@ test() -> ok = test_shutdown(), ok = test_close_by_another_process(), ok = test_buf_size(), - ok = test_override_buf_size(), + ok = test_timeout(), + ok = test_nowait(), + ok = test_setopt_getopt(), case get_otp_version() of atomvm -> ok = test_abandon_select(); @@ -36,6 +38,8 @@ test() -> end, ok. +-define(PACKET_SIZE, 7). + test_echo_server() -> etest:flush_msg_queue(), @@ -44,7 +48,7 @@ test_echo_server() -> test_send_receive(Port, 10), - close_listen_socket(ListenSocket). + ok = close_listen_socket(ListenSocket). %% %% test_shutdown @@ -63,12 +67,12 @@ test_shutdown() -> id(ok). test_shutdown_of_client_sockets(Port) -> - ok = test_shutdown_of_side(Port, write), - ok = test_shutdown_of_side(Port, read_write), - ok = test_shutdown_of_side(Port, read), + ok = test_shutdown_of_side(Port, write, <<"echo:01">>), + ok = test_shutdown_of_side(Port, read_write, <<"echo:02">>), + ok = test_shutdown_of_side(Port, read, <<"echo:03">>), id(ok). -test_shutdown_of_side(Port, Side) -> +test_shutdown_of_side(Port, Side, Packet) -> {ok, Socket} = socket:open(inet, stream, tcp), ok = try_connect(Socket, Port, 10), @@ -76,21 +80,25 @@ test_shutdown_of_side(Port, Side) -> case Side of read -> %% read on the socket should fail - socket:send(Socket, erlang:atom_to_binary(Side, latin1)), + socket:send(Socket, Packet), case catch (socket:recv(Socket)) of {error, _} -> ok; {ok, Data} -> - %% On some Linux kernels, shutdown is not guaranteed to - %% result in an error on read. + %% On some Linux kernels, shutdown doesn't return an error + %% until all buffered data is read. %% C.f. https://stackoverflow.com/questions/740817/behavior-of-shutdownsock-shut-rd-with-tcp - erlang:display({warning, expected_error_on_recv, Side, Data}), - % error({expected_error_on_recv, Side, Data}) - ok + %% Second recv will fail + case catch (socket:recv(Socket)) of + {error, _} -> + ok; + {ok, Data} -> + error({expected_error_on_recv, Side, Data}) + end end; _ -> %% write on the socket should fail - case catch (socket:send(Socket, erlang:atom_to_binary(Side, latin1))) of + case catch (socket:send(Socket, Packet)) of {error, _} -> ok; {ok, Data} -> @@ -122,25 +130,7 @@ test_close_by_another_process() -> timer:sleep(10), - ok = close_listen_socket(ListenSocket), - - id(ok). - -check_receive(Socket, Packet, Length, Expect) -> - case socket:send(Socket, Packet) of - ok -> - ok = - case socket:recv(Socket, Length) of - {ok, Expect} -> - ok; - Error -> - io:format("Unexpected value on recv: ~p~n", [Error]), - Error - end; - {error, Reason} = Error -> - io:format("Error on send: ~p~n", [Reason]), - Error - end. + ok = close_listen_socket(ListenSocket). test_buf_size() -> etest:flush_msg_queue(), @@ -156,50 +146,26 @@ test_buf_size() -> {error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int), {error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1), - %% limit the recv buffer size to 10 bytes - ok = socket:setopt(Socket, {otp, rcvbuf}, 10), - - Packet = "012345678901234567890123456789", + %% limit the recv buffer size to 5 bytes + ok = socket:setopt(Socket, {otp, rcvbuf}, 5), + true = 5 < ?PACKET_SIZE, %% we should only be able to receive - ok = check_receive(Socket, Packet, 0, <<"0123456789">>), - ok = check_receive(Socket, Packet, 0, <<"0123456789">>), - ok = check_receive(Socket, Packet, 0, <<"0123456789">>), - - timer:sleep(10), - - ok = close_client_socket(Socket), - - ok = close_listen_socket(ListenSocket), - - id(ok). - -test_override_buf_size() -> - etest:flush_msg_queue(), - - Port = 44404, - ListenSocket = start_echo_server(Port), - - {ok, Socket} = socket:open(inet, stream, tcp), - ok = try_connect(Socket, Port, 10), - - %% limit the recv buffer size to 10 bytes - ok = socket:setopt(Socket, {otp, rcvbuf}, 10), - - Packet = "012345678901234567890123456789", + ok = socket:send(Socket, <<"echo:01">>), + {ok, <<"echo:">>} = socket:recv(Socket, 0, 5000), + {ok, <<"01">>} = socket:recv(Socket, 0, 5000), + ok = socket:send(Socket, <<"echo:02">>), + {ok, <<"echo:">>} = socket:recv(Socket, 0, 5000), + {ok, <<"02">>} = socket:recv(Socket, 0, 5000), %% verify that the socket:recv length parameter takes %% precedence over the default - ok = check_receive(Socket, Packet, 15, <<"012345678901234">>), - ok = check_receive(Socket, Packet, 15, <<"567890123456789">>), - - timer:sleep(10), + ok = socket:send(Socket, <<"echo:03">>), + {ok, <<"echo:03">>} = socket:recv(Socket, ?PACKET_SIZE, 5000), ok = close_client_socket(Socket), - ok = close_listen_socket(ListenSocket), - - id(ok). + ok = close_listen_socket(ListenSocket). %% %% echo_server @@ -244,18 +210,18 @@ accept(Pid, ListenSocket) -> end. echo(Pid, Socket) -> - case socket:recv(Socket) of - {ok, Packet} -> - % Pid ! {packet_received, Packet}, - ok = - case socket:send(Socket, Packet) of - ok -> - ok; - E -> - %% TODO support returning Rest when Packet > buffer_size - {unexpected_reply_from_send, E} - end, - % Pid ! {packet_echoed, Packet}, + case socket:recv(Socket, ?PACKET_SIZE) of + {ok, <<"echo:", _/binary>> = Packet} -> + ok = socket:send(Socket, Packet), + echo(Pid, Socket); + {ok, <<"wait:", _/binary>> = Packet} -> + timer:sleep(500), + ok = socket:send(Socket, Packet), + echo(Pid, Socket); + {ok, <<"chnk:", Rest/binary>>} -> + ok = socket:send(Socket, <<"chnk:">>), + timer:sleep(500), + ok = socket:send(Socket, Rest), echo(Pid, Socket); %% estdlib TODO {error, closed} -> @@ -265,6 +231,9 @@ echo(Pid, Socket) -> {error, econnreset} -> Pid ! recv_terminated, ok; + {error, {closed, <<"read">>}} -> + Pid ! recv_terminated, + ok; SomethingElse -> error({unexpected_return_from_recv, SomethingElse}) end. @@ -274,23 +243,13 @@ close_listen_socket(ListenSocket) -> %% Close the socket, and wait for a signal that we came out of accept %% ok = socket:close(ListenSocket), - receive - accept_terminated -> - ok - after 1000 -> - %% - %% Closing the listening socket from another process may in some - %% cases not result in the blocking accept to break out of its - %% call with an expected return value. In this case, we will - %% allow the wait for the `accept_terminated` message to fail - %% and simply warn the user. See TODO comment to this effect in - %% `nif_socket_close` function in `otp_socket.c` - %% - erlang:display({warning, timeout, waiting, accept_terminated}) - % throw({timeout, waiting, accept_terminated}) - end, - - ok. + ok = + receive + accept_terminated -> + ok + after 1000 -> + {error, {timeout, accept_terminated}} + end. %% %% send_receive loop @@ -312,7 +271,7 @@ close_client_socket(Socket) -> receive recv_terminated -> ok - after 1000 -> + after 2000 -> throw({timeout, waiting, recv_terminated}) end. @@ -330,7 +289,8 @@ try_connect(Socket, Port, Tries) -> send_receive_loop(_Socket, 0) -> ok; send_receive_loop(Socket, I) -> - Packet = pid_to_list(self()) ++ ":" ++ integer_to_list(I), + Packet = list_to_binary(io_lib:format("echo:~2.10.0B", [I])), + ?PACKET_SIZE = byte_size(Packet), case socket:send(Socket, Packet) of ok -> case socket:recv(Socket) of @@ -345,6 +305,170 @@ send_receive_loop(Socket, I) -> Error end. +receive_loop_nowait(Socket, Packet) when byte_size(Packet) > 0 -> + case socket:recv(Socket, byte_size(Packet), nowait) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, SelectHandle}} when is_reference(SelectHandle) -> + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {select, {{select_info, recv, SelectHandle}, Data}} when is_reference(SelectHandle) -> + {Data, Rest} = split_binary(Packet, byte_size(Data)), + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Rest) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. + +receive_loop_nowait_ref(Socket, Packet) when byte_size(Packet) > 0 -> + Ref = make_ref(), + case socket:recv(Socket, byte_size(Packet), Ref) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, Ref}} -> + receive + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {select, {{select_info, recv, Ref}, Data}} -> + {Data, Rest} = split_binary(Packet, byte_size(Data)), + receive + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Rest) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. + +test_timeout() -> + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, Socket} = socket:open(inet, stream, tcp), + ok = try_connect(Socket, Port, 10), + + % receive of two chunks with an infinity timeout + Packet0 = <<"chnk:00">>, + ok = socket:send(Socket, Packet0), + {ok, Packet0} = socket:recv(Socket, ?PACKET_SIZE, infinity), + + % receive of two chunks with a large timeout + Packet1 = <<"chnk:01">>, + ok = socket:send(Socket, Packet1), + {ok, Packet1} = socket:recv(Socket, ?PACKET_SIZE, 5000), + + % receive of two chunks with a small timeout causing a timeout error + Packet2 = <<"chnk:02">>, + ok = socket:send(Socket, Packet2), + {error, Timeout02} = socket:recv(Socket, ?PACKET_SIZE, 250), + case Timeout02 of + {timeout, <<"chnk:">>} -> + % AtomVM usually does return partial data + {ok, <<"02">>} = socket:recv(Socket, 2, infinity); + timeout -> + % BEAM OTP-27 seems to never return partial data + {ok, <<"chnk:02">>} = socket:recv(Socket, ?PACKET_SIZE, infinity) + end, + + % receive of two chunks with a null timeout causing a timeout error + Packet3 = <<"chnk:03">>, + ok = socket:send(Socket, Packet3), + timer:sleep(250), + case socket:recv(Socket, ?PACKET_SIZE, 0) of + {ok, <<"chnk:">>} -> + % BEAM OTP-22 to OTP-24 returns this on Linux on the CI. + {ok, <<"03">>} = socket:recv(Socket, 2); + {error, Timeout03} -> + case Timeout03 of + {timeout, <<"chnk:">>} -> + % BEAM OTP-27 seems to always return partial data + % AtomVM usually does + {ok, <<"03">>} = socket:recv(Socket, 2); + timeout -> + % Depending on scheduling, AtomVM may return no partial data + {ok, <<"chnk:03">>} = socket:recv(Socket, ?PACKET_SIZE) + end + end, + + % Test recv + ok = socket:send(Socket, <<"wait:01">>), + {error, timeout} = socket:recv(Socket, 0, 100), + {ok, <<"wait:01">>} = socket:recv(Socket, 0, 5000), + + ok = socket:send(Socket, <<"wait:02">>), + {error, timeout} = socket:recv(Socket, ?PACKET_SIZE, 0), + {ok, <<"wait:02">>} = socket:recv(Socket, ?PACKET_SIZE, 5000), + + ok = socket:send(Socket, <<"wait:03">>), + {error, Timeout04} = socket:recv(Socket, 2 * ?PACKET_SIZE, 5000), + ok = + case Timeout04 of + {timeout, <<"wait:03">>} -> + % AtomVM usually does return partial data + ok; + timeout -> + % BEAM OTP-27 seems to never return partial data + ok + end, + + ok = close_client_socket(Socket), + ok = close_listen_socket(ListenSocket). + +test_nowait() -> + ok = test_nowait(fun receive_loop_nowait/2), + ok = test_nowait(fun receive_loop_nowait_ref/2), + ok. + +test_nowait(ReceiveFun) -> + etest:flush_msg_queue(), + + Port = 44404, + ListenSocket = start_echo_server(Port), + + {ok, Socket} = socket:open(inet, stream, tcp), + ok = try_connect(Socket, Port, 10), + + Packet0 = <<"echo:00">>, + ok = socket:send(Socket, Packet0), + ok = ReceiveFun(Socket, Packet0), + + Packet1 = <<"wait:00">>, + ok = socket:send(Socket, Packet1), + ok = ReceiveFun(Socket, Packet1), + + Packet2 = <<"chnk:00">>, + ok = socket:send(Socket, Packet2), + ok = ReceiveFun(Socket, Packet2), + + ok = close_client_socket(Socket), + + ok = close_listen_socket(ListenSocket). + +test_setopt_getopt() -> + {ok, Socket} = socket:open(inet, stream, tcp), + {ok, stream} = socket:getopt(Socket, {socket, type}), + ok = socket:setopt(Socket, {socket, reuseaddr}, true), + ok = socket:close(Socket), + {error, closed} = socket:getopt(Socket, {socket, type}), + {error, closed} = socket:setopt(Socket, {socket, reuseaddr}, true), + ok. + %% %% abandon_select test %% diff --git a/tests/libs/estdlib/test_udp_socket.erl b/tests/libs/estdlib/test_udp_socket.erl index 4ca20d687b..ffd67585c4 100644 --- a/tests/libs/estdlib/test_udp_socket.erl +++ b/tests/libs/estdlib/test_udp_socket.erl @@ -23,80 +23,258 @@ -export([test/0]). test() -> - ok = test_echo_server(), + ok = test_echo(), + ok = test_buf_size(), + ok = test_timeout(), + ok = test_nowait(), + ok = test_setopt_getopt(), ok. -test_echo_server() -> - Port = 44405, - {ok, ReceiveSocket} = socket:open(inet, dgram, udp), +-define(PACKET_SIZE, 7). + +start_echo_server(Port) -> + {ok, Socket} = socket:open(inet, dgram, udp), - ok = socket:setopt(ReceiveSocket, {socket, reuseaddr}, true), - ok = socket:setopt(ReceiveSocket, {socket, linger}, #{onoff => true, linger => 0}), + ok = socket:setopt(Socket, {socket, reuseaddr}, true), + ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}), - ok = socket:bind(ReceiveSocket, #{ + ok = socket:bind(Socket, #{ family => inet, addr => loopback, port => Port }), - Self = self(), - spawn(fun() -> - Self ! ready, - receive_loop(Self, ReceiveSocket) - end), - - receive - ready -> - ok - end, - - test_send_receive(Port, 10), - - %% - %% Close the socket, and wait for a signal that we came out of recvfrom - %% - ok = socket:close(ReceiveSocket), - receive - recv_terminated -> ok - after 1000 -> - %% This is UDP, so raising an error might not be fair here. - %% Let's just log instead. - erlang:display({innocuous_udp_timeout, waiting, recv_terminated}) - end, - ok. + {Pid, MonitorRef} = spawn_opt( + fun() -> + echo_server_loop(Socket) + end, + [monitor] + ), -receive_loop(Pid, ReceiveSocket) -> - case socket:recvfrom(ReceiveSocket) of - {ok, {_Source, Packet}} -> - Pid ! {received, Packet}, - receive_loop(Pid, ReceiveSocket); + {Pid, MonitorRef, Socket}. + +echo_server_loop(Socket) -> + case socket:recvfrom(Socket, 0, 5000) of + {ok, {Source, <<"echo:", _/binary>> = Packet}} -> + ok = socket:sendto(Socket, Packet, Source), + echo_server_loop(Socket); + {ok, {Source, <<"wait:", _/binary>> = Packet}} -> + timer:sleep(500), + ok = socket:sendto(Socket, Packet, Source), + echo_server_loop(Socket); + {ok, {Source, <<"chnk:", Rest/binary>>}} -> + ok = socket:sendto(Socket, <<"chnk:">>, Source), + ok = socket:sendto(Socket, Rest, Source), + echo_server_loop(Socket); {error, closed} -> - Pid ! recv_terminated; + ok; SomethingElse -> - Pid ! recv_terminated, error({unexpected_return_from_recv, SomethingElse}) end. -test_send_receive(Port, N) -> +stop_echo_server({Pid, MonitorRef, Socket}) -> + % We stop the server by closing the packet. + ok = socket:close(Socket), + normal = + receive + {'DOWN', MonitorRef, process, Pid, Reason} -> Reason + end, + ok. + +test_echo() -> + Port = 44405, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, + {ok, Socket} = socket:open(inet, dgram, udp), + + % Test recvfrom + ok = socket:sendto(Socket, <<"echo:01">>, Dest), + {ok, {Dest, <<"echo:01">>}} = socket:recvfrom(Socket, 0, 5000), + + % Test recv + ok = socket:sendto(Socket, <<"echo:02">>, Dest), + {ok, <<"echo:02">>} = socket:recv(Socket, 0, 5000), + + % Test loopback + ok = socket:sendto(Socket, <<"echo:03">>, #{family => inet, addr => loopback, port => Port}), + {ok, {Dest, <<"echo:03">>}} = socket:recvfrom(Socket, 0, 5000), + + % Chunk means two packets with UDP + ok = socket:sendto(Socket, <<"chnk:01">>, Dest), + timer:sleep(200), + {ok, {Dest, <<"chnk:">>}} = socket:recvfrom(Socket, 0, 5000), + {ok, {Dest, <<"01">>}} = socket:recvfrom(Socket, 0, 5000), + + % Chunk means two packets with UDP, including with recv + ok = socket:sendto(Socket, <<"chnk:02">>, Dest), + timer:sleep(200), + {ok, <<"chnk:">>} = socket:recv(Socket, 0, 5000), + {ok, <<"02">>} = socket:recv(Socket, 0, 5000), + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +test_buf_size() -> + Port = 44405, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, + {ok, Socket} = socket:open(inet, dgram, udp), + + %% try a few failures first + {error, _} = socket:setopt(Socket, {otp, badopt}, any_value), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int), + {error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1), + + %% limit the recv buffer size to 5 bytes + ok = socket:setopt(Socket, {otp, rcvbuf}, 5), + true = 5 < ?PACKET_SIZE, + + %% we should only be able to receive + ok = socket:sendto(Socket, <<"echo:01">>, Dest), + {ok, {Dest, <<"echo:">>}} = socket:recvfrom(Socket, 0, 5000), + {error, timeout} = socket:recvfrom(Socket, 0, 0), + ok = socket:sendto(Socket, <<"echo:01">>, Dest), + {ok, {Dest, <<"echo:">>}} = socket:recvfrom(Socket, 0, 5000), + {error, timeout} = socket:recvfrom(Socket, 0, 0), + + %% verify that the socket:recv length parameter takes + %% precedence over the default + ok = socket:sendto(Socket, <<"echo:03">>, Dest), + {ok, {Dest, <<"echo:03">>}} = socket:recvfrom(Socket, ?PACKET_SIZE, 5000), + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +test_timeout() -> + Port = 44405, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, + {ok, Socket} = socket:open(inet, dgram, udp), + + % Test recvfrom + ok = socket:sendto(Socket, <<"wait:01">>, Dest), + {error, timeout} = socket:recvfrom(Socket, 0, 100), + {ok, {Dest, <<"wait:01">>}} = socket:recvfrom(Socket, 0, 5000), + + ok = socket:sendto(Socket, <<"wait:02">>, Dest), + {error, timeout} = socket:recvfrom(Socket, ?PACKET_SIZE, 0), + {ok, {Dest, <<"wait:02">>}} = socket:recvfrom(Socket, ?PACKET_SIZE, 5000), + + ok = socket:sendto(Socket, <<"wait:03">>, Dest), + {error, timeout} = socket:recvfrom(Socket, 0, 0), + {ok, {Dest, <<"wait:03">>}} = socket:recvfrom(Socket, 10, infinity), + + % Test recv + ok = socket:sendto(Socket, <<"wait:01">>, Dest), + {error, timeout} = socket:recv(Socket, 0, 100), + {ok, <<"wait:01">>} = socket:recv(Socket, 0, 5000), + + ok = socket:sendto(Socket, <<"wait:02">>, Dest), + {error, timeout} = socket:recv(Socket, ?PACKET_SIZE, 0), + {ok, <<"wait:02">>} = socket:recv(Socket, ?PACKET_SIZE, 5000), + + ok = socket:sendto(Socket, <<"wait:03">>, Dest), + {error, timeout} = socket:recv(Socket, 2 * ?PACKET_SIZE, 0), + {ok, <<"wait:03">>} = socket:recv(Socket, 2 * ?PACKET_SIZE, 5000), + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +test_nowait() -> + ok = test_nowait(fun receive_loop_nowait/2), + ok = test_nowait(fun receive_loop_nowait_ref/2), + ok = test_nowait(fun receive_loop_recvfrom_nowait/2), + ok = test_nowait(fun receive_loop_recvfrom_nowait_ref/2), + ok. + +test_nowait(ReceiveFun) -> + etest:flush_msg_queue(), + + Port = 44404, + EchoServer = start_echo_server(Port), + Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port}, {ok, Socket} = socket:open(inet, dgram, udp), - ok = loop(Socket, Port, N), + Packet0 = <<"echo:00">>, + ok = socket:sendto(Socket, Packet0, Dest), + ok = ReceiveFun(Socket, Packet0), + + Packet1 = <<"wait:00">>, + ok = socket:sendto(Socket, Packet1, Dest), + ok = ReceiveFun(Socket, Packet1), + + ok = socket:close(Socket), + ok = stop_echo_server(EchoServer). + +receive_loop_nowait(Socket, Packet) -> + case socket:recv(Socket, byte_size(Packet), nowait) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, SelectHandle}} when is_reference(SelectHandle) -> + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. + +receive_loop_nowait_ref(Socket, Packet) -> + Ref = make_ref(), + case socket:recv(Socket, byte_size(Packet), Ref) of + {ok, ReceivedPacket} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recv, Ref}} -> + receive + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. - %% - %% Close the socket - %% - ok = socket:close(Socket). +receive_loop_recvfrom_nowait(Socket, Packet) -> + case socket:recvfrom(Socket, byte_size(Packet), nowait) of + {ok, {_Source, ReceivedPacket}} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recvfrom, SelectHandle}} when is_reference(SelectHandle) -> + receive + {'$socket', Socket, select, SelectHandle} -> + receive_loop_nowait(Socket, Packet) + after 5000 -> + {error, timeout} + end; + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), + Error + end. -loop(_Socket, _Port, 0) -> - ok; -loop(Socket, Port, I) -> - Packet = pid_to_list(self()) ++ ":" ++ integer_to_list(I), - Dest = #{family => inet, addr => loopback, port => Port}, - case socket:sendto(Socket, Packet, Dest) of - ok -> +receive_loop_recvfrom_nowait_ref(Socket, Packet) -> + Ref = make_ref(), + case socket:recvfrom(Socket, byte_size(Packet), Ref) of + {ok, {_Source, ReceivedPacket}} when ReceivedPacket =:= Packet -> + ok; + {select, {select_info, recvfrom, Ref}} -> receive - {received, _Packet} -> - loop(Socket, Port, I - 1) + {'$socket', Socket, select, Ref} -> + receive_loop_nowait_ref(Socket, Packet) + after 5000 -> + {error, timeout} end; - {error, _Reason} = Error -> - io:format("Error on sendto: ~p~n", [Error]), + {error, _} = Error -> + io:format("Error on recv: ~p~n", [Error]), Error end. + +test_setopt_getopt() -> + {ok, Socket} = socket:open(inet, dgram, udp), + {ok, dgram} = socket:getopt(Socket, {socket, type}), + ok = socket:setopt(Socket, {socket, reuseaddr}, true), + ok = socket:close(Socket), + {error, closed} = socket:getopt(Socket, {socket, type}), + {error, closed} = socket:setopt(Socket, {socket, reuseaddr}, true), + ok.