diff --git a/CHANGELOG.md b/CHANGELOG.md index f0a65dda4..18b7d8d9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ certain VM instructions are used. - Fixed ESP32 GPIO interrupt trigger `none` - Fixed an issue where a timeout would occur immediately in a race condition - Fixed SPI close command +- Added missing lock on socket structure ## [0.6.5] - 2024-10-15 diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index 5f7d84c46..5655d7ba8 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -152,6 +153,9 @@ struct SocketResource int32_t selecting_process_id; ErlNifMonitor selecting_process_monitor; size_t buf_size; +#ifndef AVM_NO_SMP + RWLock *socket_lock; +#endif }; #elif OTP_SOCKET_LWIP struct SocketResource @@ -170,6 +174,9 @@ struct SocketResource size_t pos; struct ListHead received_list; size_t buf_size; +#ifndef AVM_NO_SMP + RWLock *socket_lock; +#endif }; #endif @@ -261,6 +268,9 @@ static void socket_dtor(ErlNifEnv *caller_env, void *obj) } LWIP_END(); #endif +#ifndef AVM_NO_SMP + smp_rwlock_destroy(rsrc_obj->socket_lock); +#endif } #if OTP_SOCKET_BSD @@ -295,6 +305,12 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif TRACE("socket_down called on process_id=%i\n", (int) *pid); #endif + // Increment the reference count so the resource doesn't go away + // (enif_select will decrement the ref count) + struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj); + refc_binary_increment_refcount(rsrc_refc); + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // Monitor fired, so make sure we don't try to demonitor in select_stop @@ -323,6 +339,9 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif LWIP_END(); } #endif + + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + refc_binary_decrement_refcount(rsrc_refc, caller_env->global); } static const ErlNifResourceTypeInit SocketResourceTypeInit = { @@ -502,6 +521,13 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) RAISE_ERROR(OUT_OF_MEMORY_ATOM); } +#ifndef AVM_NO_SMP + rsrc_obj->socket_lock = smp_rwlock_create(); + if (IS_NULL_PTR(rsrc_obj->socket_lock)) { + free(rsrc_obj); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } +#endif #if OTP_SOCKET_BSD rsrc_obj->fd = socket(domain, type, protocol); if (UNLIKELY(rsrc_obj->fd == -1 || rsrc_obj->fd == CLOSED_FD)) { @@ -604,10 +630,10 @@ bool term_is_otp_socket(term socket_term) // close // -static int send_closed_notification(Context *ctx, struct SocketResource *rsrc_obj) +static int send_closed_notification(Context *ctx, term socket_term, int32_t selecting_process_id, struct SocketResource *rsrc_obj) { // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + REF_SIZE) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2) + REF_SIZE, 1, &socket_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); return -1; } @@ -617,8 +643,8 @@ static int send_closed_notification(Context *ctx, struct SocketResource *rsrc_ob term ref = (rsrc_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->ref_ticks, &ctx->heap); term_put_tuple_element(error_tuple, 1, ref); - TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) rsrc_obj->selecting_process_id, (void *) rsrc_obj); - globalcontext_send_message(ctx->global, rsrc_obj->selecting_process_id, error_tuple); + TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) selecting_process_id, (void *) rsrc_obj); + globalcontext_send_message(ctx->global, selecting_process_id, error_tuple); return 0; } @@ -641,43 +667,51 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->fd) { + // In POSIX with BSD sockets, if a file descriptor being monitored by + // select() is closed in another thread, the result is unspecified. + // select may continue. + // + // However, in Erlang, asynchronous sockets support closing from another + // process, as documented in specification of the abort message. + + // So we handle closing a socket while another process is selecting if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { - // - // If we are in select, then stop selecting - // + // Save process id as socket_stop may be called by enif_select. + int32_t selecting_process_id = rsrc_obj->selecting_process_id; + // Stop selecting. int stop_res = enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()); if (UNLIKELY(stop_res < 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } - // TODO: check if stop_res & ERL_NIF_SELECT_STOP_CALLED or ERL_NIF_SELECT_STOP_SCHEDULED - // following what OTP does. Indeed, if we have ERL_NIF_SELECT_STOP_SCHEDULED, we should not close the socket now - // - // If there is a process (other than ourself) who is waiting on select - // the send a {closed, Ref} message to it, so that it can break + // If there is a selecting process who may be waiting on select, + // send a closed notification to it, so that it can break // out of its receive statement. // - if (rsrc_obj->selecting_process_id != ctx->process_id) { + // When using asynchronous API, the selecting process can be the + // calling process. In this case we don't send any notification. + // + if (selecting_process_id != ctx->process_id) { // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) { + if (UNLIKELY(send_closed_notification(ctx, argv[0], selecting_process_id, rsrc_obj) < 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - } else { - AVM_LOGW(TAG, "Selectable socket %i was closed but no known pid is waiting. This shouldn't happen.", rsrc_obj->fd); } } + // Eventually close the socket int res = close(rsrc_obj->fd); if (UNLIKELY(res != 0)) { AVM_LOGW(TAG, "Failed to close socket %i", res); } - - TRACE("nif_socket_close: Clearing pid for socket fd=%i\n", rsrc_obj->fd); rsrc_obj->fd = CLOSED_FD; - rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; - rsrc_obj->ref_ticks = 0; } else { TRACE("Double close on socket fd %i", rsrc_obj->fd); } @@ -687,7 +721,8 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID && rsrc_obj->selecting_process_id != ctx->process_id) { // send a {closed, Ref | undefined} message to the pid - if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) { + if (UNLIKELY(send_closed_notification(ctx, argv[0], rsrc_obj->selecting_process_id, rsrc_obj) < 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } } @@ -721,7 +756,8 @@ static term nif_socket_close(Context *ctx, int argc, term argv[]) } #endif - rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; + + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } @@ -744,6 +780,13 @@ static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newp conn_rsrc_obj->linger_on = false; conn_rsrc_obj->linger_sec = 0; conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; +#ifndef AVM_NO_SMP + conn_rsrc_obj->socket_lock = smp_rwlock_create(); + if (IS_NULL_PTR(conn_rsrc_obj->socket_lock)) { + free(conn_rsrc_obj); + return NULL; + } +#endif list_init(&conn_rsrc_obj->received_list); tcp_arg(newpcb, conn_rsrc_obj); @@ -905,6 +948,8 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + ErlNifEnv *env = erl_nif_env_from_context(ctx); if (rsrc_obj->selecting_process_id != ctx->process_id && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) { // demonitor can fail if process is gone. @@ -915,6 +960,7 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) // if select fails than to stop select if monitor fails if (rsrc_obj->selecting_process_id != ctx->process_id) { if (UNLIKELY(enif_monitor_process(env, rsrc_obj, &ctx->process_id, &rsrc_obj->selecting_process_monitor) != 0)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(NOPROC_ATOM); } rsrc_obj->selecting_process_id = ctx->process_id; @@ -925,14 +971,18 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) #if OTP_SOCKET_BSD TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); - if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_READ, rsrc_obj, &ctx->process_id, select_ref_term) < 0)) { - enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); - rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; - RAISE_ERROR(BADARG_ATOM); + // The socket may be closed here. + if (rsrc_obj->fd == CLOSED_FD) { + send_closed_notification(ctx, argv[0], ctx->process_id, rsrc_obj); + } else { + if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_READ, rsrc_obj, &ctx->process_id, select_ref_term) < 0)) { + enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + RAISE_ERROR(BADARG_ATOM); + } } - TRACE("nif_socket_select: Setting pid for socket fd %i to %i\n", (int) rsrc_obj->fd, (int) ctx->process_id); - #elif OTP_SOCKET_LWIP LWIP_BEGIN(); switch (rsrc_obj->socket_state) { @@ -974,11 +1024,13 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[]) default: enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor); LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } LWIP_END(); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } @@ -993,21 +1045,23 @@ static term nif_socket_select_stop(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + // Avoid the race condition with select object here. + SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock); + rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; #if OTP_SOCKET_BSD if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) { RAISE_ERROR(BADARG_ATOM); } - - return OK_ATOM; #elif OTP_SOCKET_LWIP LWIP_BEGIN(); if (rsrc_obj->socket_state & SocketStateSelectingRead) { rsrc_obj->socket_state &= ~SocketStateSelectingRead; } LWIP_END(); +#endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; -#endif } // @@ -1028,11 +1082,14 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } term level_tuple = argv[1]; @@ -1049,6 +1106,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) int option_value = (value == TRUE_ATOM); #if OTP_SOCKET_BSD int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { return make_errno_tuple(ctx); } else { @@ -1070,6 +1128,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) } } LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; #endif } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { @@ -1082,6 +1141,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) sl.l_onoff = (onoff == TRUE_ATOM); sl.l_linger = term_to_int(linger); int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { return make_errno_tuple(ctx); } else { @@ -1090,6 +1150,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) #elif OTP_SOCKET_LWIP rsrc_obj->linger_on = (onoff == TRUE_ATOM); rsrc_obj->linger_sec = term_to_int(linger); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; #endif // TODO add more as needed @@ -1099,6 +1160,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) // AVM_LOGW(TAG, "Failed to set TCP_NODELAY."); // } } else { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } @@ -1109,27 +1171,32 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) // TODO support the atom `default` as a value to roll back to the default buffer size if (UNLIKELY(!term_is_integer(value))) { - AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer"); + TRACE("socket:setopt: otp rcvbuf value must be an integer"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); } avm_int_t buf_size = term_to_int(value); if (UNLIKELY(buf_size < 0)) { - AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative"); + TRACE("socket:setopt: otp rcvbuf value may not be negative"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); } rsrc_obj->buf_size = (size_t) buf_size; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } else { - AVM_LOGE(TAG, "socket:setopt: Unsupported otp option"); + TRACE("socket:setopt: Unsupported otp option"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(globalcontext_make_atom(global, invalid_option_atom), ctx); } } default: { - AVM_LOGE(TAG, "socket:setopt: Unsupported level"); + TRACE("socket:setopt: Unsupported level"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } } @@ -1153,11 +1220,14 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } @@ -1168,6 +1238,7 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to getsockname: fd=%i res=%i.", rsrc_obj->fd, res); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_errno_tuple(ctx); } uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); @@ -1185,6 +1256,7 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[]) } LWIP_END(); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); // {ok, #{addr => {a,b,c,d}, port => integer()}} if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) { @@ -1221,19 +1293,24 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateUDP) { // TODO: handle "connected" UDP sockets + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } if ((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(ENOTCONN, global), ctx); } #endif @@ -1245,14 +1322,17 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[]) if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to getpeername: fd=%i res=%i.", rsrc_obj->fd, res); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_errno_tuple(ctx); } + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr); uint16_t port_u16 = ntohs(addr.sin_port); #elif OTP_SOCKET_LWIP // TODO: support peername for "connected" UDP sockets uint32_t ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->remote_ip)); uint16_t port_u16 = rsrc_obj->tcp_pcb->remote_port; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); #endif // {ok, #{addr => {a,b,c,d}, port => integer()}} @@ -1289,13 +1369,17 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd); if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #endif @@ -1353,6 +1437,7 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) serveraddr.sin_port = htons(port_u16); socklen_t address_len = sizeof(serveraddr); int res = bind(rsrc_obj->fd, (struct sockaddr *) &serveraddr, address_len); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); return make_errno_tuple(ctx); @@ -1366,6 +1451,7 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) } else { res = udp_bind(rsrc_obj->udp_pcb, &ip_addr, port_u16); } + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != ERR_OK)) { AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res); return make_lwip_err_tuple(res, ctx); @@ -1393,15 +1479,20 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateUDP) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EPROTOTYPE, global), ctx); } #endif @@ -1410,6 +1501,7 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) #if OTP_SOCKET_BSD int res = listen(rsrc_obj->fd, backlog); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(res != 0)) { AVM_LOGE(TAG, "Unable to listen on socket: res=%i.", res); return make_errno_tuple(ctx); @@ -1437,6 +1529,7 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[]) tcp_accept(new_pcb, tcp_accept_cb); rsrc_obj->tcp_pcb = new_pcb; rsrc_obj->socket_state = SocketStateTCPListening; + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; #endif } @@ -1472,19 +1565,25 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state & SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateUDP) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } // Only listening is allowed if ((rsrc_obj->socket_state & SocketStateTCPListening) != SocketStateTCPListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EINVAL, global), ctx); } #endif @@ -1493,6 +1592,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) struct sockaddr_in clientaddr; socklen_t clientlen = sizeof(clientaddr); int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) { AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd); int err = errno; @@ -1504,13 +1604,20 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) conn_rsrc_obj->fd = fd; conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID; conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE; +#ifndef AVM_NO_SMP + conn_rsrc_obj->socket_lock = smp_rwlock_create(); + if (IS_NULL_PTR(conn_rsrc_obj->socket_lock)) { + free(conn_rsrc_obj); + RAISE_ERROR(OUT_OF_MEMORY_ATOM); + } +#endif TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd); - term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); + term new_resource = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj); enif_release_resource(conn_rsrc_obj); size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; - if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &new_resource, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -1518,7 +1625,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) term socket_term = term_alloc_tuple(2, &ctx->heap); uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); - term_put_tuple_element(socket_term, 0, obj); + term_put_tuple_element(socket_term, 0, new_resource); term_put_tuple_element(socket_term, 1, ref); term result = term_alloc_tuple(2, &ctx->heap); @@ -1536,12 +1643,14 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) if (IS_NULL_PTR(new_resource)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } size_t requested_size = TERM_BOXED_RESOURCE_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE; - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -1555,9 +1664,12 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) term_put_tuple_element(result, 1, socket_term); } else { // return EAGAIN + LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EAGAIN, ctx->global), ctx); } LWIP_END(); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return result; #endif } @@ -1586,7 +1698,7 @@ static size_t copy_pbuf_data(struct pbuf *src, size_t offset, size_t count, uint } #endif -ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap) +static ssize_t do_socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap) { #if OTP_SOCKET_BSD // @@ -1706,8 +1818,16 @@ ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, i #endif } +ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap) +{ + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + ssize_t result = do_socket_recv(rsrc_obj, buf, len, flags, from, heap); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return result; +} + #if OTP_SOCKET_BSD -static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +static term nif_socket_recv_with_peek(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_with_peek\n"); @@ -1732,7 +1852,8 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ size_t ensure_packet_avail = term_binary_data_size_in_terms(buffer_size) + BINARY_HEADER_SIZE; size_t requested_size = TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0); - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -1765,7 +1886,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_ } } -static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +static term nif_socket_recv_without_peek(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_without_peek\n"); @@ -1779,17 +1900,19 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs RAISE_ERROR(OUT_OF_MEMORY_ATOM); } else { - - term map = term_invalid_term(); + term roots[2]; + roots[0] = resource_term; + roots[1] = term_invalid_term(); if (is_recvfrom) { - if (UNLIKELY(memory_ensure_free(ctx, INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2), 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { free(buffer); AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } } - ssize_t res = socket_recv(rsrc_obj, buffer, buffer_size, 0, is_recvfrom ? &map : NULL, &ctx->heap); + ssize_t res = socket_recv(rsrc_obj, buffer, buffer_size, 0, is_recvfrom ? &roots[1] : NULL, &ctx->heap); if (res < 0) { int err = errno; @@ -1818,7 +1941,8 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; size_t requested_size = TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? TUPLE_SIZE(2) : 0); - if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, is_recvfrom ? 1 : 0, &map, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, is_recvfrom ? 2 : 1, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { free(buffer); AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); @@ -1828,7 +1952,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs term payload; if (is_recvfrom) { - term tuple = port_heap_create_tuple2(&ctx->heap, map, data); + term tuple = port_heap_create_tuple2(&ctx->heap, roots[1], data); payload = port_heap_create_ok_tuple(&ctx->heap, tuple); } else { payload = port_heap_create_ok_tuple(&ctx->heap, data); @@ -1841,7 +1965,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs #elif OTP_SOCKET_LWIP -static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) +static term nif_socket_recv_lwip(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom) { TRACE("nif_socket_recv_lwip\n"); @@ -1894,7 +2018,8 @@ static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj, size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE; size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0); - if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) { + // Because resource is locked, we must ensure it's not garbage collected + if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) { AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__); RAISE_ERROR(OUT_OF_MEMORY_ATOM); } @@ -1935,28 +2060,36 @@ static term nif_socket_recv_internal(Context *ctx, term argv[], bool is_recvfrom if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) { RAISE_ERROR(BADARG_ATOM); } + + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state & SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx); } if (rsrc_obj->socket_state & SocketStateListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, ctx->global), ctx); } #endif + term result; #if OTP_SOCKET_BSD if (otp_socket_platform_supports_peek()) { - return nif_socket_recv_with_peek(ctx, rsrc_obj, len, is_recvfrom); + result = nif_socket_recv_with_peek(ctx, argv[0], rsrc_obj, len, is_recvfrom); } else { - return nif_socket_recv_without_peek(ctx, rsrc_obj, len, is_recvfrom); + result = nif_socket_recv_without_peek(ctx, argv[0], rsrc_obj, len, is_recvfrom); } #elif OTP_SOCKET_LWIP - return nif_socket_recv_lwip(ctx, rsrc_obj, len, is_recvfrom); + result = nif_socket_recv_lwip(ctx, argv[0], rsrc_obj, len, is_recvfrom); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return result; } static term nif_socket_recv(Context *ctx, int argc, term argv[]) @@ -1978,7 +2111,7 @@ static term nif_socket_recvfrom(Context *ctx, int argc, term argv[]) // // send/sendto // -ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest) +static ssize_t do_socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest) { ssize_t sent_data = -1; #if OTP_SOCKET_BSD @@ -2084,6 +2217,14 @@ ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t #endif } +ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest) +{ + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); + ssize_t result = do_socket_send(rsrc_obj, buf, len, dest); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return result; +} + static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool is_sendto) { TRACE("nif_socket_send_internal\n"); @@ -2099,15 +2240,19 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (rsrc_obj->socket_state & SocketStateListening) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } #endif @@ -2121,7 +2266,8 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i const uint8_t *buf = (const uint8_t *) term_binary_data(data); size_t len = term_binary_size(data); - ssize_t sent_data = socket_send(rsrc_obj, buf, len, dest); + ssize_t sent_data = do_socket_send(rsrc_obj, buf, len, dest); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); // {ok, RestData} | {error, Reason} @@ -2147,7 +2293,7 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i return port_create_tuple2(ctx, OK_ATOM, data); } else { - AVM_LOGE(TAG, "Unable to send data: res=%zi.", sent_data); + TRACE("Unable to send data: res=%zi.", sent_data); return make_error_tuple(CLOSED_ATOM, ctx); } } @@ -2239,27 +2385,33 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); term sockaddr = argv[1]; term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); if (term_is_invalid_term(addr)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } avm_int_t port_number = term_to_int(port); if (port_number < 0 || port_number > 65535) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } #if OTP_SOCKET_BSD if (rsrc_obj->fd == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } if (((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) || ((rsrc_obj->socket_state & SocketStateTCPConnected) == SocketStateTCPConnected)) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx); } #endif @@ -2284,16 +2436,20 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) if (errno == EINPROGRESS) { // TODO make connect non-blocking + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return UNDEFINED_ATOM; } else { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); AVM_LOGE(TAG, "Unable to connect: res=%i errno=%i", res, errno); return make_error_tuple(CLOSED_ATOM, ctx); } } else if (res == 0) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } else { // won't happen according to connect(2) + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return UNDEFINED_ATOM; } #elif OTP_SOCKET_LWIP @@ -2312,11 +2468,13 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) RAISE_ERROR(OUT_OF_MEMORY_ATOM); } if (rsrc_obj->socket_state & SocketStateUDP) { + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return OK_ATOM; } else { rsrc_obj->selecting_process_id = ctx->process_id; // Trap caller waiting for completion context_update_flags(ctx, ~NoFlags, Trap); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return term_invalid_term(); } #endif @@ -2341,6 +2499,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) RAISE_ERROR(BADARG_ATOM); } + SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); int how; int val = interop_atom_term_select_int(otp_socket_shutdown_direction_table, argv[1], global); switch (val) { @@ -2358,6 +2517,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) break; default: + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); } @@ -2366,6 +2526,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) #elif OTP_SOCKET_LWIP if (rsrc_obj->socket_state == SocketStateClosed) { #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_error_tuple(posix_errno_to_term(EBADF, global), ctx); } @@ -2375,6 +2536,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) int res = shutdown(rsrc_obj->fd, how); if (res < 0) { AVM_LOGE(TAG, "Unable to shut down socket: res=%i errno=%i", res, errno); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return make_errno_tuple(ctx); } #elif OTP_SOCKET_LWIP @@ -2391,6 +2553,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[]) } LWIP_END(); #endif + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); return result; }