Skip to content

Commit

Permalink
Merge pull request atomvm#1365 from pguyot/w45/encode_and_decode_ports
Browse files Browse the repository at this point in the history
Introduce ports (local and external)

Continuation of atomvm#1363

Native ports are now represented as ports, and can be encoded in binaries.
External ports are also supported. String representation of ports matches
OTP's.

These changes are made under both the "Apache 2.0" and the "GNU Lesser General
Public License 2.1 or later" license terms (dual license).

SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
  • Loading branch information
bettio committed Dec 24, 2024
2 parents 642a280 + 0870d0a commit bd67862
Show file tree
Hide file tree
Showing 17 changed files with 431 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `inet:getaddr/2`
- Added support for external pids and encoded pids in external terms
- Added support for external refs and encoded refs in external terms
- Introduce ports to represent native processes and added support for external ports and encoded ports in external terms

## [0.6.6] - Unreleased

Expand Down
6 changes: 6 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@

# AtomVM Update Instructions

## v0.6.6 -> Unreleased
- Ports now represent the native processes. Port drivers should return a port (instead of a pid),
by using `term_port_from_local_process_id` instead of `term_from_local_process_id`. Sockets, from
port socket driver, are also represented by a port and some matching code may need to be updated from
`is_pid/1` to `is_port/1`.

## v0.6.4 -> v0.6.5

- ESP32: `esp32boot.avm` doesn't contain anymore Elixir standard library, use instead
Expand Down
9 changes: 9 additions & 0 deletions src/libAtomVM/bif.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ term bif_erlang_is_pid_1(Context *ctx, uint32_t fail_label, term arg1)
return term_is_pid(arg1) ? TRUE_ATOM : FALSE_ATOM;
}

// Generated by OTP-21 compiler
term bif_erlang_is_port_1(Context *ctx, uint32_t fail_label, term arg1)
{
UNUSED(ctx);
UNUSED(fail_label);

return term_is_port(arg1) ? TRUE_ATOM : FALSE_ATOM;
}

term bif_erlang_is_reference_1(Context *ctx, uint32_t fail_label, term arg1)
{
UNUSED(ctx);
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/bif.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ term bif_erlang_is_integer_1(Context *ctx, uint32_t fail_label, term arg1);
term bif_erlang_is_list_1(Context *ctx, uint32_t fail_label, term arg1);
term bif_erlang_is_number_1(Context *ctx, uint32_t fail_label, term arg1);
term bif_erlang_is_pid_1(Context *ctx, uint32_t fail_label, term arg1);
term bif_erlang_is_port_1(Context *ctx, uint32_t fail_label, term arg1);
term bif_erlang_is_reference_1(Context *ctx, uint32_t fail_label, term arg1);
term bif_erlang_is_tuple_1(Context *ctx, uint32_t fail_label, term arg1);
term bif_erlang_is_map_1(Context *ctx, uint32_t fail_label, term arg1);
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/bifs.gperf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ erlang:is_integer/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erla
erlang:is_list/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erlang_is_list_1}
erlang:is_number/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erlang_is_number_1}
erlang:is_pid/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erlang_is_pid_1}
erlang:is_port/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erlang_is_port_1}
erlang:is_reference/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erlang_is_reference_1}
erlang:is_tuple/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erlang_is_tuple_1}
erlang:is_map/1, {.bif.base.type = BIFFunctionType, .bif.bif1_ptr = bif_erlang_is_map_1}
Expand Down
3 changes: 2 additions & 1 deletion src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,11 @@ static struct ResourceMonitor *context_monitors_handle_terminate(Context *ctx)
term_put_tuple_element(info_tuple, 1, ref);
if (ctx->native_handler != NULL) {
term_put_tuple_element(info_tuple, 2, PORT_ATOM);
term_put_tuple_element(info_tuple, 3, term_port_from_local_process_id(ctx->process_id));
} else {
term_put_tuple_element(info_tuple, 2, PROCESS_ATOM);
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
}
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 4, ctx->exit_reason);

mailbox_send(target, info_tuple);
Expand Down
57 changes: 56 additions & 1 deletion src/libAtomVM/externalterm.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#define EXPORT_EXT 113
#define MAP_EXT 116
#define SMALL_ATOM_UTF8_EXT 119
#define V4_PORT_EXT 120
#define INVALID_TERM_SIZE -1

#define NEW_FLOAT_EXT_SIZE 9
Expand Down Expand Up @@ -422,6 +423,31 @@ static int serialize_term(uint8_t *buf, term t, GlobalContext *glb)
WRITE_32_UNALIGNED(buf + k + 8, term_get_external_node_creation(t));
}
return k + 12;
} else if (term_is_local_port(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = V4_PORT_EXT;
}
size_t k = 1;
term node_name = glb->node_name;
uint32_t creation = node_name == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node_name, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_64_UNALIGNED(buf + k, term_to_local_process_id(t));
WRITE_32_UNALIGNED(buf + k + 8, creation); // creation
}
return k + 12;
} else if (term_is_external_port(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = V4_PORT_EXT;
}
size_t k = 1;
term node = term_get_external_node(t);
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_64_UNALIGNED(buf + k, term_get_external_port_number(t));
WRITE_32_UNALIGNED(buf + k + 8, term_get_external_node_creation(t));
}
return k + 12;
} else if (term_is_local_reference(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = NEWER_REFERENCE_EXT;
Expand Down Expand Up @@ -759,6 +785,34 @@ static term parse_external_terms(const uint8_t *external_term_buf, size_t *eterm
}
}

case V4_PORT_EXT: {
size_t node_size;
term node = parse_external_terms(external_term_buf + 1, &node_size, copy, heap, glb);
if (UNLIKELY(!term_is_atom(node))) {
return term_invalid_term();
}
uint64_t number = READ_64_UNALIGNED(external_term_buf + node_size + 1);
uint32_t creation = READ_32_UNALIGNED(external_term_buf + node_size + 9);
*eterm_size = node_size + 13;
if (node != NONODE_AT_NOHOST_ATOM) {
term this_node = glb->node_name;
uint32_t this_creation = this_node == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
if (node == this_node && creation == this_creation) {
if (UNLIKELY(number > TERM_MAX_LOCAL_PROCESS_ID)) {
return term_invalid_term();
}
return term_port_from_local_process_id(number);
} else {
return term_make_external_port_number(node, number, creation, heap);
}
} else {
if (UNLIKELY(number > TERM_MAX_LOCAL_PROCESS_ID || creation != 0)) {
return term_invalid_term();
}
return term_port_from_local_process_id(number);
}
}

case NEWER_REFERENCE_EXT: {
uint16_t len = READ_16_UNALIGNED(external_term_buf + 1);
if (UNLIKELY(len > 5)) {
Expand Down Expand Up @@ -1079,7 +1133,8 @@ static int calculate_heap_usage(const uint8_t *external_term_buf, size_t remaini
return 0;
}

case NEW_PID_EXT: {
case NEW_PID_EXT:
case V4_PORT_EXT: {
if (UNLIKELY(remaining < 1)) {
return INVALID_TERM_SIZE;
}
Expand Down
14 changes: 14 additions & 0 deletions src/libAtomVM/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ unsigned long memory_estimate_usage(term t)
} else if (term_is_local_pid(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_local_port(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_nonempty_list(t)) {
acc += 2;
if (UNLIKELY(temp_stack_push(&temp_stack, term_get_list_tail(t)) != TempStackOk)) {
Expand Down Expand Up @@ -624,6 +627,10 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
TRACE("- Found external pid.\n");
break;

case TERM_BOXED_EXTERNAL_PORT:
TRACE("- Found external port.\n");
break;

case TERM_BOXED_EXTERNAL_REF:
TRACE("- Found external ref.\n");
break;
Expand Down Expand Up @@ -751,6 +758,10 @@ static void memory_scan_and_rewrite(size_t count, term *terms, const term *old_s
ptr += term_get_size_from_boxed_header(t);
break;

case TERM_BOXED_EXTERNAL_PORT:
ptr += term_get_size_from_boxed_header(t);
break;

case TERM_BOXED_EXTERNAL_REF:
ptr += term_get_size_from_boxed_header(t);
break;
Expand Down Expand Up @@ -828,6 +839,9 @@ HOT_FUNC static term memory_shallow_copy_term(HeapFragment *old_fragment, term t
} else if (term_is_local_pid(t)) {
return t;

} else if (term_is_local_port(t)) {
return t;

} else if (term_is_cp(t)) {
// CP is valid only on stack
return t;
Expand Down
39 changes: 31 additions & 8 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ static term nif_ets_lookup(Context *ctx, int argc, term argv[]);
static term nif_ets_lookup_element(Context *ctx, int argc, term argv[]);
static term nif_ets_delete(Context *ctx, int argc, term argv[]);
static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_port_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_ref_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_fun_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_function_exported(Context *ctx, int argc, term argv[]);
Expand Down Expand Up @@ -590,6 +591,12 @@ static const struct Nif pid_to_list_nif =
.nif_ptr = nif_erlang_pid_to_list
};

static const struct Nif port_to_list_nif =
{
.base.type = NIFFunctionType,
.nif_ptr = nif_erlang_port_to_list
};

static const struct Nif ref_to_list_nif =
{
.base.type = NIFFunctionType,
Expand Down Expand Up @@ -992,7 +999,7 @@ static term nif_erlang_open_port_2(Context *ctx, int argc, term argv[])
if (!new_ctx) {
RAISE_ERROR(BADARG_ATOM);
} else {
return term_from_local_process_id(new_ctx->process_id);
return term_port_from_local_process_id(new_ctx->process_id);
}
}

Expand All @@ -1003,7 +1010,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[])
term reg_name_term = argv[0];
VALIDATE_VALUE(reg_name_term, term_is_atom);
term pid_or_port_term = argv[1];
VALIDATE_VALUE(pid_or_port_term, term_is_local_pid);
VALIDATE_VALUE(pid_or_port_term, term_is_local_pid_or_port);

int atom_index = term_to_atom_index(reg_name_term);
int32_t pid = term_to_local_process_id(pid_or_port_term);
Expand Down Expand Up @@ -1069,7 +1076,7 @@ static NativeHandlerResult process_echo_mailbox(Context *ctx)
}
result = NativeTerminate;
term reply = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(reply, 0, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 0, term_port_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 1, CLOSED_ATOM);
port_send_message(ctx->global, pid, reply);
} else {
Expand Down Expand Up @@ -1101,7 +1108,7 @@ static NativeHandlerResult process_console_message(Context *ctx, term msg)
result = NativeTerminate;
term pid = term_get_tuple_element(msg, 0);
term reply = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(reply, 0, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 0, term_port_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 1, CLOSED_ATOM);
port_send_message(ctx->global, pid, reply);
} else if (is_tagged_tuple(msg, IO_REQUEST_ATOM, 4)) {
Expand Down Expand Up @@ -1407,7 +1414,7 @@ static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
term target = argv[0];
GlobalContext *glb = ctx->global;

if (term_is_local_pid(target)) {
if (term_is_local_pid_or_port(target)) {
int32_t local_process_id = term_to_local_process_id(target);

globalcontext_send_message(glb, local_process_id, argv[1]);
Expand Down Expand Up @@ -3487,6 +3494,22 @@ static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[])
return make_list_from_ascii_buf((uint8_t *) buf, str_len, ctx);
}

static term nif_erlang_port_to_list(Context *ctx, int argc, term argv[])
{
UNUSED(argc);

term t = argv[0];
VALIDATE_VALUE(t, term_is_port);

char buf[PORT_AS_CSTRING_LEN];
int str_len = term_snprint(buf, PORT_AS_CSTRING_LEN, t, ctx->global);
if (UNLIKELY(str_len < 0)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}

return make_list_from_ascii_buf((uint8_t *) buf, str_len, ctx);
}

static term nif_erlang_ref_to_list(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
Expand Down Expand Up @@ -3747,7 +3770,7 @@ static term nif_erlang_monitor(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}

VALIDATE_VALUE(target_pid, term_is_local_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid_or_port);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3815,7 +3838,7 @@ static term nif_erlang_link(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_local_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid_or_port);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3846,7 +3869,7 @@ static term nif_erlang_unlink(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_local_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid_or_port);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/nifs.gperf
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ erlang:throw/1, &throw_nif
erlang:raise/3, &raise_nif
erlang:unlink/1, &unlink_nif
erlang:pid_to_list/1, &pid_to_list_nif
erlang:port_to_list/1, &port_to_list_nif
erlang:ref_to_list/1, &ref_to_list_nif
erlang:fun_to_list/1, &fun_to_list_nif
erlang:function_exported/3, &function_exported_nif
Expand Down
15 changes: 2 additions & 13 deletions src/libAtomVM/opcodesswitch.h
Original file line number Diff line number Diff line change
Expand Up @@ -2404,7 +2404,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
term recipient_term = x_regs[0];
int local_process_id;
if (term_is_local_pid(recipient_term)) {
if (term_is_local_pid_or_port(recipient_term)) {
local_process_id = term_to_local_process_id(recipient_term);
} else if (term_is_atom(recipient_term)) {
local_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term));
Expand Down Expand Up @@ -3004,18 +3004,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
TRACE("is_port/2, label=%i, arg1=%lx\n", label, arg1);

if (term_is_local_pid(arg1)) {
int local_process_id = term_to_local_process_id(arg1);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
bool is_port_driver = false;
if (target) {
is_port_driver = context_is_port_driver(target);
globalcontext_get_process_unlock(ctx->global, target);
}
if (!is_port_driver) {
pc = mod->labels[label];
}
} else {
if (!term_is_port(arg1)) {
pc = mod->labels[label];
}
#endif
Expand Down
Loading

0 comments on commit bd67862

Please sign in to comment.