Skip to content

Commit

Permalink
Allow sending messages directly from ISR
Browse files Browse the repository at this point in the history
Introduce and document new functions to send messages from an interrupt service
routine

Add CMake option AVM_DISABLE_ISR to disable these functions as they require
several synchronization and atomic primitives that are not available on all
platforms

Introduce the required synchronization primitives that try and acquire locks

Also fix Pico networking code that sends messages from interrupt service code
(as chosen lwIP mode is low-priority interrupt mode) by using these new
functions

Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Oct 28, 2023
1 parent 8c3e7d1 commit aaf1e67
Show file tree
Hide file tree
Showing 33 changed files with 742 additions and 199 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test-other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
- arch: "arm32v5"
platform: "arm/v5"
cflags: "-O2 -mthumb -mthumb-interwork -march=armv4t"
cmake_opts: "-DAVM_DISABLE_SMP=On"
cmake_opts: "-DAVM_DISABLE_SMP=On -DAVM_DISABLE_ISR=On"
tag: "stretch"
sources: |
deb [trusted=yes] http://archive.debian.org/debian/ stretch-backports main
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ find_package(Elixir)

option(AVM_DISABLE_FP "Disable floating point support." OFF)
option(AVM_DISABLE_SMP "Disable SMP." OFF)
option(AVM_DISABLE_ISR "Disable ISR support." OFF)
option(AVM_USE_32BIT_FLOAT "Use 32 bit floats." OFF)
option(AVM_VERBOSE_ABORT "Print module and line number on VM abort" OFF)
option(AVM_RELEASE "Build an AtomVM release" OFF)
Expand Down
16 changes: 16 additions & 0 deletions doc/src/atomvm-internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ Once a scheduler thread is done executing a process, if no other thread is waiti

If there already is one thread in `sys_poll_events`, other scheduler threads pick the next ready process and if there is none, wait. Other scheduler threads can also interrupt the wait in `sys_poll_events` if a process is made ready to run. They do so using platform function `sys_signal`.

## Tasks and synchronization mechanisms

AtomVM SMP builds run on operating or runtime systems implementing tasks (FreeRTOS SMP on ESP32, Unix and WebAssembly) as well as on systems with no task implementation (Raspberry Pi Pico).

On runtime systems with tasks, each scheduler thread is implemented as a task. On Pico, a scheduler thread runs on Core 0 and another one runs on Core 1, and they are effectively pinned to each core.

For synchronization purposes, AtomVM uses mutexes, condition variables, RW locks, spinlocks and Atomics.

Availability of RW Locks and atomics are verified at compile time using detection of symbols for RW Locks and `ATOMIC_*_LOCK_FREE` C11 macros for atomics.

Mutexes and condition variables are provided by the SDK or the runtime system. If RW Locks are not available, AtomVM uses mutexes. Atomics are not available on Pico and are replaced by critical sections. Spinlocks are implemented by AtomVM on top of Atomics, or using mutexes on Pico.

Importantly, locking synchronization mechanisms (mutexes, RW locks, spinlocks) are not interrupt-safe. Interrupt service routines must not try to lock as they could fail forever if interrupted code owns the lock. Atomics, including emulation on Pico, are interrupt-safe.

Drivers can send messages from interruption service routines using `globalcontext_send_message_from_isr` function instead of `globalcontext_send_message`. This function tries to acquire required locks and if it fails, enqueues sent message in a queue, so it is later processed when the scheduler performs context switching. On platforms with no Atomics or emulation, this feature can be disabled wih `AVM_DISABLE_ISR` option.

## Mailboxes and signals

Erlang processes receive messages in a mailbox. The mailbox is the interface with other processes.
Expand Down
48 changes: 29 additions & 19 deletions src/libAtomVM/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,25 +130,35 @@ endif()

if (AVM_DISABLE_SMP)
target_compile_definitions(libAtomVM PUBLIC AVM_NO_SMP)
else()
include(CheckIncludeFile)
CHECK_INCLUDE_FILE(stdatomic.h STDATOMIC_INCLUDE)
if(HAVE_PLATFORM_SMP_H)
target_compile_definitions(libAtomVM PUBLIC HAVE_PLATFORM_SMP_H)
endif()
include(CheckCSourceCompiles)
check_c_source_compiles("
#include <stdatomic.h>
int main() {
_Static_assert(ATOMIC_POINTER_LOCK_FREE == 2, \"Expected ATOMIC_POINTER_LOCK_FREE to be equal to 2\");
}
" ATOMIC_POINTER_LOCK_FREE_IS_TWO)
if (NOT ATOMIC_POINTER_LOCK_FREE_IS_TWO AND NOT HAVE_PLATFORM_SMP_H)
if (NOT STDATOMIC_INCLUDE)
message(FATAL_ERROR "stdatomic.h cannot be found, you need to disable SMP on this platform or provide platform_smp.h and define HAVE_PLATFORM_SMP_H")
else()
message(FATAL_ERROR "Platform doesn't support atomic pointers, you need to disable SMP or provide platform_smp.h and define HAVE_PLATFORM_SMP_H")
endif()
endif()
if (AVM_DISABLE_ISR)
target_compile_definitions(libAtomVM PUBLIC AVM_NO_ISR)
endif()

if(HAVE_PLATFORM_SMP_H)
target_compile_definitions(libAtomVM PUBLIC HAVE_PLATFORM_SMP_H)
endif()
if(HAVE_PLATFORM_ATOMIC_H)
target_compile_definitions(libAtomVM PUBLIC HAVE_PLATFORM_ATOMIC_H)
endif()

include(CheckIncludeFile)
CHECK_INCLUDE_FILE(stdatomic.h STDATOMIC_INCLUDE)
include(CheckCSourceCompiles)
check_c_source_compiles("
#include <stdatomic.h>
int main() {
_Static_assert(ATOMIC_POINTER_LOCK_FREE == 2, \"Expected ATOMIC_POINTER_LOCK_FREE to be equal to 2\");
}
" ATOMIC_POINTER_LOCK_FREE_IS_TWO)
if (ATOMIC_POINTER_LOCK_FREE_IS_TWO)
target_compile_definitions(libAtomVM PUBLIC HAVE_ATOMIC)
endif()
if (NOT ATOMIC_POINTER_LOCK_FREE_IS_TWO AND NOT (HAVE_PLATFORM_ATOMIC_H OR (AVM_DISABLE_SMP AND AVM_DISABLE_ISR)))
if (NOT STDATOMIC_INCLUDE)
message(FATAL_ERROR "stdatomic.h cannot be found, you need to provide platform_atomic.h and define HAVE_PLATFORM_ATOMIC_H or alternatively pass AVM_DISABLE_SMP and AVM_DISABLE_ISR")
else()
message(FATAL_ERROR "Platform doesn't support atomic pointers, you need to provide platform_atomic.h and define HAVE_PLATFORM_ATOMIC_H or alternatively pass AVM_DISABLE_SMP and AVM_DISABLE_ISR")
endif()
endif()

Expand Down
11 changes: 10 additions & 1 deletion src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@
#include "term.h"
#include "utils.h"

#ifdef HAVE_PLATFORM_ATOMIC_H
#include "platform_atomic.h"
#endif

#if defined(HAVE_ATOMIC)
#include <stdatomic.h>
#define ATOMIC_COMPARE_EXCHANGE_WEAK_INT atomic_compare_exchange_weak
#endif

#define IMPL_EXECUTE_LOOP
#include "opcodesswitch.h"
#undef IMPL_EXECUTE_LOOP
Expand Down Expand Up @@ -232,7 +241,7 @@ void context_update_flags(Context *ctx, int mask, int value) CLANG_THREAD_SANITI
enum ContextFlags desired;
do {
desired = (expected & mask) | value;
} while (!ATOMIC_COMPARE_EXCHANGE_WEAK(&ctx->flags, &expected, desired));
} while (!ATOMIC_COMPARE_EXCHANGE_WEAK_INT(&ctx->flags, &expected, desired));
#else
ctx->flags = (ctx->flags & mask) | value;
#endif
Expand Down
124 changes: 108 additions & 16 deletions src/libAtomVM/globalcontext.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,24 @@
#include "defaultatoms.h"
#include "erl_nif_priv.h"
#include "list.h"
#include "mailbox.h"
#include "posix_nifs.h"
#include "refc_binary.h"
#include "resources.h"
#include "scheduler.h"
#include "smp.h"
#include "synclist.h"
#include "sys.h"
#include "utils.h"
#include "valueshashtable.h"

#ifndef AVM_NO_SMP
#define SMP_SPINLOCK_LOCK(spinlock) smp_spinlock_lock(spinlock)
#define SMP_SPINLOCK_UNLOCK(spinlock) smp_spinlock_unlock(spinlock)
#define SMP_MUTEX_LOCK(mutex) smp_mutex_lock(mutex)
#define SMP_MUTEX_UNLOCK(mutex) smp_mutex_unlock(mutex)
#define SMP_RWLOCK_RDLOCK(lock) smp_rwlock_rdlock(lock)
#define SMP_RWLOCK_WRLOCK(lock) smp_rwlock_wrlock(lock)
#define SMP_RWLOCK_UNLOCK(lock) smp_rwlock_unlock(lock)
#else
#define SMP_SPINLOCK_LOCK(spinlock)
#define SMP_SPINLOCK_UNLOCK(spinlock)
#define SMP_MUTEX_LOCK(mutex)
#define SMP_MUTEX_UNLOCK(mutex)
#define SMP_RWLOCK_RDLOCK(lock)
#define SMP_RWLOCK_WRLOCK(lock)
#define SMP_RWLOCK_UNLOCK(lock)
#ifdef HAVE_PLATFORM_ATOMIC_H
#include "platform_atomic.h"
#endif

#if defined(HAVE_ATOMIC)
#include <stdatomic.h>
#define ATOMIC_COMPARE_EXCHANGE_WEAK_PTR atomic_compare_exchange_weak
#endif

struct RegisteredProcess
Expand All @@ -74,6 +68,9 @@ GlobalContext *globalcontext_new()
list_init(&glb->waiting_processes);
#ifndef AVM_NO_SMP
smp_spinlock_init(&glb->processes_spinlock);
#endif
#ifndef AVM_NO_ISR
glb->message_queue = NULL;
#endif
synclist_init(&glb->avmpack_data);
synclist_init(&glb->refc_binaries);
Expand Down Expand Up @@ -277,6 +274,29 @@ Context *globalcontext_get_process_lock(GlobalContext *glb, int32_t process_id)
return NULL;
}

#ifndef AVM_NO_SMP
static bool globalcontext_get_process_trylock(GlobalContext *glb, int32_t process_id, Context **output)
{
struct ListHead *item;
Context *p = NULL;

struct ListHead *processes_table_list = synclist_tryrdlock(&glb->processes_table);
if (processes_table_list == NULL) {
return false;
}
LIST_FOR_EACH (item, processes_table_list) {
p = GET_LIST_ENTRY(item, Context, processes_table_head);

if (p->process_id == process_id) {
*output = p;
}
}
synclist_unlock(&glb->processes_table);

return true;
}
#endif

void globalcontext_get_process_unlock(GlobalContext *glb, Context *c)
{
if (c) {
Expand Down Expand Up @@ -308,6 +328,78 @@ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, t
}
}

#ifndef AVM_NO_ISR
void globalcontext_send_message_from_isr(GlobalContext *glb, int32_t process_id, term t)
{
MailboxMessage *message = NULL;
bool postponed = false;
#ifndef AVM_NO_SMP
Context *p = NULL;
if (globalcontext_get_process_trylock(glb, process_id, &p)) {
if (p) {
message = mailbox_message_create_from_term(t);
// Ensure we can acquire the spinlock
if (smp_spinlock_trylock(&glb->processes_spinlock)) {
// We can send the message.
mailbox_enqueue_message(p, message);
scheduler_signal_message_from_isr(p);
smp_spinlock_unlock(&glb->processes_spinlock);
} else {
postponed = true;
}
globalcontext_get_process_unlock(glb, p);
}
} else {
postponed = true;
}
#else
// Without SMP, we have no lock, so we must always enqueue.
postponed = true;
#endif
if (postponed) {
if (message == NULL) {
message = mailbox_message_create_from_term(t);
}
struct MessageQueueItem *queued_item = malloc(sizeof(struct MessageQueueItem));
if (IS_NULL_PTR(queued_item)) {
fprintf(stderr, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__);
AVM_ABORT();
}
queued_item->message = message;
queued_item->process_id = process_id;

struct MessageQueueItem *current_first = NULL;
do {
queued_item->next = current_first;
} while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&glb->message_queue, &current_first, queued_item));
// Make sure the scheduler is busy
sys_signal(glb);
}
}

void globalcontext_process_message_queue(GlobalContext *glb)
{
struct MessageQueueItem *current = glb->message_queue;
// Empty outer list using CAS
if (current) {
while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&glb->message_queue, &current, NULL)) {
};
(void) synclist_rdlock(&glb->processes_table);
while (current) {
Context *context = globalcontext_get_process_nolock(glb, current->process_id);
if (context) {
mailbox_enqueue_message(context, current->message);
scheduler_signal_message(context);
}
struct MessageQueueItem *old = current;
current = old->next;
free(old);
}
synclist_unlock(&glb->processes_table);
}
}
#endif

void globalcontext_init_process(GlobalContext *glb, Context *ctx)
{
ctx->global = glb;
Expand Down
42 changes: 42 additions & 0 deletions src/libAtomVM/globalcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ typedef struct Module Module;
typedef struct GlobalContext GlobalContext;
#endif

#ifndef TYPEDEF_MAILBOXMESSAGE
#define TYPEDEF_MAILBOXMESSAGE
typedef struct MailboxMessage MailboxMessage;
#endif

struct MessageQueueItem
{
struct MessageQueueItem *next;
MailboxMessage *message;
int32_t process_id;
};

struct GlobalContext
{
struct ListHead ready_processes;
Expand All @@ -72,6 +84,10 @@ struct GlobalContext
// when running native handlers.
#ifndef AVM_NO_SMP
SpinLock processes_spinlock;
#endif
#ifndef AVM_NO_ISR
// Queue of messages that could not be sent from ISR
struct MessageQueueItem *ATOMIC message_queue;
#endif
struct SyncList refc_binaries;
struct SyncList processes_table;
Expand Down Expand Up @@ -219,6 +235,32 @@ void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t);
*/
void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, term t);

#ifndef AVM_NO_ISR
/**
* @brief Send a message to a process identified by its id. This variant is to
* be used from Interrupt Service Routines. It tries to acquire the necessary
* locks and if it fails, it enqueues the message which will be delivered on
* the next scheduler context switch.
*
* @details Safely send a message to the process, doing nothing if the process
* cannot be found.
*
* @param glb the global context (that owns the process table).
* @param process_id the target process id.
* @param t the message to send.
*/
void globalcontext_send_message_from_isr(GlobalContext *glb, int32_t process_id, term t);

/**
* @brief Process queue of message enqueued from ISR.
*
* @details This function is called from the scheduler.
*
* @param glb the global context (that owns the process table).
*/
void globalcontext_process_message_queue(GlobalContext *glb);
#endif

/**
* @brief Initialize a new process, providing it with a process id.
*
Expand Down
Loading

0 comments on commit aaf1e67

Please sign in to comment.