Skip to content

Commit

Permalink
Add support for encoded local pids in external terms
Browse files Browse the repository at this point in the history
Also add partial support for external pids.

Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Dec 21, 2024
1 parent 1e6248f commit 941b581
Show file tree
Hide file tree
Showing 11 changed files with 517 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `code:all_loaded/0` and `code:all_available/0`
- Added `erlang:split_binary/2`
- Added `inet:getaddr/2`
- Added support for external pids and encoded pids in external terms

## [0.6.6] - Unreleased

Expand Down
19 changes: 16 additions & 3 deletions src/libAtomVM/ets_hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ static uint32_t hash_float(term t, int32_t h, GlobalContext *global)
return h * LARGE_PRIME_FLOAT;
}

static uint32_t hash_pid(term t, int32_t h, GlobalContext *global)
static uint32_t hash_local_pid(term t, int32_t h, GlobalContext *global)
{
UNUSED(global);
uint32_t n = (uint32_t) term_to_local_process_id(t);
Expand All @@ -255,6 +255,17 @@ static uint32_t hash_pid(term t, int32_t h, GlobalContext *global)
return h * LARGE_PRIME_PID;
}

static uint32_t hash_external_pid(term t, int32_t h, GlobalContext *global)
{
UNUSED(global);
uint32_t n = (uint32_t) term_get_external_pid_process_id(t);
while (n) {
h = h * LARGE_PRIME_PID + (n & 0xFF);
n >>= 8;
}
return h * LARGE_PRIME_PID;
}

static uint32_t hash_reference(term t, int32_t h, GlobalContext *global)
{
UNUSED(global);
Expand Down Expand Up @@ -285,8 +296,10 @@ static uint32_t hash_term_incr(term t, int32_t h, GlobalContext *global)
return hash_integer(t, h, global);
} else if (term_is_float(t)) {
return hash_float(t, h, global);
} else if (term_is_pid(t)) {
return hash_pid(t, h, global);
} else if (term_is_local_pid(t)) {
return hash_local_pid(t, h, global);
} else if (term_is_external_pid(t)) {
return hash_external_pid(t, h, global);
} else if (term_is_reference(t)) {
return hash_reference(t, h, global);
} else if (term_is_binary(t)) {
Expand Down
84 changes: 84 additions & 0 deletions src/libAtomVM/externalterm.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@
#include <stdlib.h>

#include "bitstring.h"
#include "defaultatoms.h"
#include "term.h"
#include "unicode.h"
#include "utils.h"

#define NEW_FLOAT_EXT 70
#define NEW_PID_EXT 88
#define SMALL_INTEGER_EXT 97
#define INTEGER_EXT 98
#define ATOM_EXT 100
#define PID_EXT 103
#define SMALL_TUPLE_EXT 104
#define LARGE_TUPLE_EXT 105
#define NIL_EXT 106
Expand Down Expand Up @@ -390,6 +394,33 @@ static int serialize_term(uint8_t *buf, term t, GlobalContext *glb)
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, mfa, glb);
}
return k;
} else if (term_is_local_pid(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = NEW_PID_EXT;
}
size_t k = 1;
term node_name = glb->node_name;
uint32_t creation = node_name == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node_name, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_32_UNALIGNED(buf + k, term_to_local_process_id(t));
WRITE_32_UNALIGNED(buf + k + 4, 0); // serial is 0 for local pids
WRITE_32_UNALIGNED(buf + k + 8, creation);
}
return k + 12;
} else if (term_is_external_pid(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = NEW_PID_EXT;
}
size_t k = 1;
term node = term_get_external_node(t);
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_32_UNALIGNED(buf + k, term_get_external_pid_process_id(t));
WRITE_32_UNALIGNED(buf + k + 4, term_get_external_pid_serial(t));
WRITE_32_UNALIGNED(buf + k + 8, term_get_external_node_creation(t));
}
return k + 12;
} else {
fprintf(stderr, "Unknown external term type: %" TERM_U_FMT "\n", t);
AVM_ABORT();
Expand Down Expand Up @@ -659,6 +690,32 @@ static term parse_external_terms(const uint8_t *external_term_buf, size_t *eterm
return term_from_atom_index(global_atom_id);
}

case NEW_PID_EXT: {
size_t node_size;
term node = parse_external_terms(external_term_buf + 1, &node_size, copy, heap, glb);
if (UNLIKELY(!term_is_atom(node))) {
return term_invalid_term();
}
uint32_t number = READ_32_UNALIGNED(external_term_buf + node_size + 1);
uint32_t serial = READ_32_UNALIGNED(external_term_buf + node_size + 5);
uint32_t creation = READ_32_UNALIGNED(external_term_buf + node_size + 9);
*eterm_size = node_size + 13;
if (node != NONODE_AT_NOHOST_ATOM) {
term this_node = glb->node_name;
uint32_t this_creation = this_node == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
if (node == this_node && creation == this_creation) {
return term_from_local_process_id(number);
} else {
return term_make_external_process_id(node, number, serial, creation, heap);
}
} else {
if (UNLIKELY(serial != 0 || creation != 0)) {
return term_invalid_term();
}
return term_from_local_process_id(number);
}
}

default:
return term_invalid_term();
}
Expand Down Expand Up @@ -948,6 +1005,33 @@ static int calculate_heap_usage(const uint8_t *external_term_buf, size_t remaini
return 0;
}

case NEW_PID_EXT: {
if (UNLIKELY(remaining < 1)) {
return INVALID_TERM_SIZE;
}
remaining -= 1;
int buf_pos = 1;
size_t heap_size = EXTERNAL_PID_SIZE;
size_t node_size = 0;
int u = calculate_heap_usage(external_term_buf + buf_pos, remaining, &node_size, copy);
if (UNLIKELY(u == INVALID_TERM_SIZE)) {
return INVALID_TERM_SIZE;
}
if (external_term_buf[1] == SMALL_ATOM_UTF8_EXT) {
// Check if it's non-distributed node, in which case it's always a local pid
if (external_term_buf[2] == strlen("nonode@nohost") && memcmp(external_term_buf + 3, "nonode@nohost", strlen("nonode@nohost")) == 0) {
heap_size = 0;
}
// If this is our node, but we're distributed, we'll allocate more memory and may not use it.
// This way we're sure to not go out of bounds if distribution changes between now and when we deserialize
} else if (UNLIKELY(external_term_buf[1] != ATOM_EXT)) {
return INVALID_TERM_SIZE;
}
buf_pos += node_size;
*eterm_size = buf_pos + 12;
return heap_size + u;
}

default:
return INVALID_TERM_SIZE;
}
Expand Down
29 changes: 18 additions & 11 deletions src/libAtomVM/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ unsigned long memory_estimate_usage(term t)
} else if (term_is_nil(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_pid(t)) {
} else if (term_is_local_pid(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_nonempty_list(t)) {
Expand Down Expand Up @@ -587,19 +587,19 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
TRACE("Found NIL (%" TERM_X_FMT ")\n", t);
ptr++;

} else if (term_is_pid(t)) {
} else if (term_is_local_pid(t)) {
TRACE("Found PID (%" TERM_X_FMT ")\n", t);
ptr++;

} else if ((t & 0x3) == 0x0) {
TRACE("Found boxed header (%" TERM_X_FMT ")\n", t);

size_t arity = term_get_size_from_boxed_header(t);
switch (t & TERM_BOXED_TAG_MASK) {
case TERM_BOXED_TUPLE: {
int arity = term_get_size_from_boxed_header(t);
TRACE("- Boxed is tuple (%" TERM_X_FMT "), arity: %i\n", t, arity);
TRACE("- Boxed is tuple (%" TERM_X_FMT "), arity: %i\n", t, (int) arity);

for (int i = 1; i <= arity; i++) {
for (size_t i = 1; i <= arity; i++) {
TRACE("-- Elem: %" TERM_X_FMT "\n", ptr[i]);
ptr[i] = memory_shallow_copy_term(old_fragment, ptr[i], &new_heap, move);
}
Expand All @@ -620,13 +620,16 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
TRACE("- Found ref.\n");
break;

case TERM_BOXED_EXTERNAL_PID:
TRACE("- Found external pid.\n");
break;

case TERM_BOXED_FUN: {
int fun_size = term_get_size_from_boxed_header(t);
TRACE("- Found fun, size: %i.\n", fun_size);
TRACE("- Found fun, size: %i.\n", (int) arity);

// first term is the boxed header, followed by module and fun index.

for (int i = 3; i <= fun_size; i++) {
for (size_t i = 3; i <= arity; i++) {
TRACE("-- Frozen: %" TERM_X_FMT "\n", ptr[i]);
ptr[i] = memory_shallow_copy_term(old_fragment, ptr[i], &new_heap, move);
}
Expand Down Expand Up @@ -658,7 +661,7 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co

case TERM_BOXED_MAP: {
TRACE("- Found map.\n");
size_t map_size = term_get_size_from_boxed_header(t) - 1;
size_t map_size = arity - 1;
size_t keys_offset = term_get_map_keys_offset();
size_t value_offset = term_get_map_value_offset();
TRACE("-- Map keys: %" TERM_X_FMT "\n", ptr[keys_offset]);
Expand All @@ -674,7 +677,7 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
AVM_ABORT();
}

ptr += term_get_size_from_boxed_header(t) + 1;
ptr += arity + 1;

} else if (term_is_nonempty_list(t)) {
TRACE("Found nonempty list (%p)\n", (void *) t);
Expand Down Expand Up @@ -740,6 +743,10 @@ static void memory_scan_and_rewrite(size_t count, term *terms, const term *old_s
ptr += term_get_size_from_boxed_header(t);
break;

case TERM_BOXED_EXTERNAL_PID:
ptr += term_get_size_from_boxed_header(t);
break;

case TERM_BOXED_FUN:
// Skip header and module and process next terms
ptr++;
Expand Down Expand Up @@ -810,7 +817,7 @@ HOT_FUNC static term memory_shallow_copy_term(HeapFragment *old_fragment, term t
} else if (term_is_nil(t)) {
return t;

} else if (term_is_pid(t)) {
} else if (term_is_local_pid(t)) {
return t;

} else if (term_is_cp(t)) {
Expand Down
28 changes: 14 additions & 14 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[])
term reg_name_term = argv[0];
VALIDATE_VALUE(reg_name_term, term_is_atom);
term pid_or_port_term = argv[1];
VALIDATE_VALUE(pid_or_port_term, term_is_pid);
VALIDATE_VALUE(pid_or_port_term, term_is_local_pid);

int atom_index = term_to_atom_index(reg_name_term);
int32_t pid = term_to_local_process_id(pid_or_port_term);
Expand Down Expand Up @@ -1407,7 +1407,7 @@ static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
term target = argv[0];
GlobalContext *glb = ctx->global;

if (term_is_pid(target)) {
if (term_is_local_pid(target)) {
int32_t local_process_id = term_to_local_process_id(target);

globalcontext_send_message(glb, local_process_id, argv[1]);
Expand Down Expand Up @@ -2748,7 +2748,7 @@ static term nif_erlang_process_flag(Context *ctx, int argc, term argv[])
flag = argv[1];
value = argv[2];

VALIDATE_VALUE(pid, term_is_pid);
VALIDATE_VALUE(pid, term_is_local_pid);
int local_process_id = term_to_local_process_id(pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
if (IS_NULL_PTR(target)) {
Expand Down Expand Up @@ -3225,7 +3225,7 @@ static term nif_binary_split(Context *ctx, int argc, term argv[])

if (num_segments == 1) {
// not found
if (UNLIKELY(memory_ensure_free_with_roots(ctx, 2, 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, 0), 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}

Expand Down Expand Up @@ -3477,11 +3477,11 @@ static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[])

term t = argv[0];
VALIDATE_VALUE(t, term_is_pid);
size_t max_len = term_is_external(t) ? EXTERNAL_PID_AS_CSTRING_LEN : LOCAL_PID_AS_CSTRING_LEN;

char buf[PID_AS_CSTRING_LEN];
int str_len = term_snprint(buf, PID_AS_CSTRING_LEN, t, ctx->global);
char buf[max_len];
int str_len = term_snprint(buf, max_len, t, ctx->global);
if (UNLIKELY(str_len < 0)) {
// TODO: change to internal error or something like that
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}

Expand Down Expand Up @@ -3593,7 +3593,7 @@ static term nif_erlang_garbage_collect(Context *ctx, int argc, term argv[])
} else {
// argc == 1
term t = argv[0];
VALIDATE_VALUE(t, term_is_pid);
VALIDATE_VALUE(t, term_is_local_pid);

int local_id = term_to_local_process_id(t);
Context *target = globalcontext_get_process_lock(ctx->global, local_id);
Expand Down Expand Up @@ -3636,7 +3636,7 @@ static term nif_erlang_exit(Context *ctx, int argc, term argv[])
RAISE(LOWERCASE_EXIT_ATOM, reason);
} else {
term target_process = argv[0];
VALIDATE_VALUE(target_process, term_is_pid);
VALIDATE_VALUE(target_process, term_is_local_pid);
term reason = argv[1];
GlobalContext *glb = ctx->global;
Context *target = globalcontext_get_process_lock(glb, term_to_local_process_id(target_process));
Expand Down Expand Up @@ -3749,7 +3749,7 @@ static term nif_erlang_monitor(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}

VALIDATE_VALUE(target_pid, term_is_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3817,7 +3817,7 @@ static term nif_erlang_link(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3848,7 +3848,7 @@ static term nif_erlang_unlink(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3879,8 +3879,8 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[])
} else {
term leader = argv[0];
term pid = argv[1];
VALIDATE_VALUE(pid, term_is_pid);
VALIDATE_VALUE(leader, term_is_pid);
VALIDATE_VALUE(pid, term_is_local_pid);
VALIDATE_VALUE(leader, term_is_local_pid);

int local_process_id = term_to_local_process_id(pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down
4 changes: 2 additions & 2 deletions src/libAtomVM/opcodesswitch.h
Original file line number Diff line number Diff line change
Expand Up @@ -2404,7 +2404,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
term recipient_term = x_regs[0];
int local_process_id;
if (term_is_pid(recipient_term)) {
if (term_is_local_pid(recipient_term)) {
local_process_id = term_to_local_process_id(recipient_term);
} else if (term_is_atom(recipient_term)) {
local_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term));
Expand Down Expand Up @@ -3004,7 +3004,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
TRACE("is_port/2, label=%i, arg1=%lx\n", label, arg1);

if (term_is_pid(arg1)) {
if (term_is_local_pid(arg1)) {
int local_process_id = term_to_local_process_id(arg1);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
bool is_port_driver = false;
Expand Down
2 changes: 1 addition & 1 deletion src/libAtomVM/posix_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ static term nif_atomvm_posix_write(Context *ctx, int argc, term argv[])
static term nif_atomvm_posix_select(Context *ctx, term argv[], enum ErlNifSelectFlags mode)
{
term process_pid_term = argv[1];
VALIDATE_VALUE(process_pid_term, term_is_pid);
VALIDATE_VALUE(process_pid_term, term_is_local_pid);
int32_t process_pid = term_to_local_process_id(process_pid_term);
term select_ref_term = argv[2];
if (select_ref_term != UNDEFINED_ATOM) {
Expand Down
Loading

0 comments on commit 941b581

Please sign in to comment.