Skip to content

Commit

Permalink
scheduling interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Nov 18, 2024
1 parent 3aa9412 commit 27a5ec5
Show file tree
Hide file tree
Showing 23 changed files with 1,027 additions and 396 deletions.
4 changes: 2 additions & 2 deletions include/reactor-uc/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ typedef struct Environment Environment;

struct Environment {
Reactor *main; // The top-level reactor of the program.
Scheduler scheduler; // The scheduler in charge of executing the reactions.
Scheduler* scheduler; // The scheduler in charge of executing the reactions.
Platform *platform; // The platform that provides the physical time and sleep functions.
bool has_async_events;
BuiltinTrigger *startup; // A pointer to a startup trigger, if the program has one.
Expand Down Expand Up @@ -59,7 +59,7 @@ struct Environment {
void (*request_shutdown)(Environment *self);
};

void Environment_ctor(Environment *self, Reactor *main);
void Environment_ctor(Environment *self, Scheduler* scheduler, Reactor *main);
void Environment_free(Environment *self);

#endif
2 changes: 1 addition & 1 deletion include/reactor-uc/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
if (ret == LF_FATAL) { \
LF_ERR(TRIG, "Scheduling an value, that doesn't have value!"); \
Scheduler *sched = &(action)->super.super.parent->env->scheduler; \
sched->do_shutdown(sched, sched->current_tag); \
sched->do_shutdown(sched, sched->current_tag(sched)); \
throw("Tried to schedule a value onto an action without a type!"); \
} \
} while (0)
Expand Down
23 changes: 9 additions & 14 deletions include/reactor-uc/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,7 @@ typedef struct Scheduler Scheduler;
typedef struct Environment Environment;

struct Scheduler {
Environment *env;
EventQueue event_queue;
ReactionQueue reaction_queue;
// The following two fields are used to implement a linked list of Triggers
// that are registered for cleanup at the end of the current tag.
Trigger *cleanup_ll_head;
Trigger *cleanup_ll_tail;
bool leader; // Whether this scheduler is the leader in a federated program and selects the start tag.
instant_t start_time; // The physical time at which the program started.
interval_t duration; // The duration after which the program should stop.
tag_t stop_tag; // The tag at which the program should stop. This is set by the user or by the scheduler.
tag_t current_tag; // The current logical tag. Set by the scheduler and read by user in the reaction bodies.
bool keep_alive; // Whether the program should keep running even if there are no more events to process.
long int start_time;

/**
* @brief Schedules an event on trigger at a specified tag. This function will
Expand Down Expand Up @@ -73,7 +61,14 @@ struct Scheduler {
void (*register_for_cleanup)(Scheduler *self, Trigger *trigger);

void (*acquire_and_schedule_start_tag)(Scheduler *self);

void (*set_duration)(Scheduler* self, interval_t duration);

lf_ret_t (*add_to_reaction_queue)(Scheduler *self, Reaction* reaction);

tag_t (*current_tag)(Scheduler* self);
};

void Scheduler_ctor(Scheduler *self, Environment *env);
void Scheduler_ctor(Scheduler* self);

#endif
33 changes: 33 additions & 0 deletions include/reactor-uc/schedulers/dynamic/scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Created by tanneberger on 11/16/24.
//

#ifndef SCHEDULER_H
#define SCHEDULER_H
#include "reactor-uc/error.h"
#include "reactor-uc/queues.h"
#include "reactor-uc/scheduler.h"

typedef struct DynamicScheduler DynamicScheduler;
typedef struct Environment Environment;

struct DynamicScheduler {
Scheduler scheduler;
Environment *env;
EventQueue event_queue;
ReactionQueue reaction_queue;
// The following two fields are used to implement a linked list of Triggers
// that are registered for cleanup at the end of the current tag.
Trigger *cleanup_ll_head;
Trigger *cleanup_ll_tail;
bool leader; // Whether this scheduler is the leader in a federated program and selects the start tag.
instant_t start_time; // The physical time at which the program started.
interval_t duration; // The duration after which the program should stop.
tag_t stop_tag; // The tag at which the program should stop. This is set by the user or by the scheduler.
tag_t current_tag; // The current logical tag. Set by the scheduler and read by user in the reaction bodies.
bool keep_alive; // Whether the program should keep running even if there are no more events to process.
};

void DynamicScheduler_ctor(DynamicScheduler *self, Environment *env);

#endif //SCHEDULER_H
64 changes: 64 additions & 0 deletions include/reactor-uc/schedulers/static/instructions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#ifndef SCHEDULER_STATIC_FUNCTION_H
#define SCHEDULER_STATIC_FUNCTION_H

#include "scheduler_instructions.h"
#include "reactor-uc/reaction.h"
#include "reactor-uc/platform.h"

/**
* @brief Function type with a void* argument. To make this type represent a
* generic function, one can write a wrapper function around the target function
* and use the first argument as a pointer to a struct of input arguments
* and return values.
*/

#ifndef LF_PRINT_DEBUG
#define LF_PRINT_DEBUG(A, ...) {};
#endif

/**
* @brief Wrapper function for peeking a priority queue.
*/
void push_pop_peek_pqueue(void *self);

void execute_inst_ADD(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_ADDI(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_BEQ(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_BGE(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_BLT(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_BNE(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_DU(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_EXE(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_WLT(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_WU(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_JAL(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_JALR(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);
void execute_inst_STP(Platform* platform, size_t worker_number, operand_t op1, operand_t op2,
operand_t op3, bool debug, size_t *pc,
Reaction **returned_reaction, bool *exit_loop);

#endif
23 changes: 23 additions & 0 deletions include/reactor-uc/schedulers/static/scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Created by tanneberger on 11/16/24.
//

#ifndef STATIC_SCHEDULER_H
#define STATIC_SCHEDULER_H
#include "reactor-uc/error.h"
#include "reactor-uc/queues.h"
#include "reactor-uc/scheduler.h"

typedef struct StaticScheduler StaticScheduler;
typedef struct Environment Environment;

struct StaticScheduler {
Scheduler* scheduler;
Environment *env;
const inst_t** static_schedule;
size_t* pc;
};

void StaticScheduler_ctor(StaticScheduler *self, Environment *env);

#endif //STATIC_SCHEDULER_H
72 changes: 72 additions & 0 deletions include/reactor-uc/schedulers/static/scheduler_instructions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* @author Shaokai Lin <[email protected]>
* @brief Format of the instruction set
*/
#ifndef SCHEDULER_INSTRUCTIONS_H
#define SCHEDULER_INSTRUCTIONS_H

#include "reactor-uc/reaction.h"
#include "reactor-uc/platform.h"

typedef enum {
ADD,
ADDI,
ADV,
ADVI,
BEQ,
BGE,
BLT,
BNE,
DU,
EXE,
JAL,
JALR,
STP,
WLT,
WU,
} opcode_t;

/**
* @brief Convenient typedefs for the data types used by the C implementation of
* PRET VM. A register is 64bits and an immediate is 64bits. This avoids any
* issue with time and overflow. Arguably it is worth it even for smaller
* platforms.
*
*/
typedef volatile uint64_t reg_t;
typedef uint64_t imm_t;

/**
* @brief An union representing a single operand for the PRET VM. A union
* means that we have one piece of memory, which is big enough to fit either
* one of the two members of the union.
*
*/
typedef union {
reg_t *reg;
imm_t imm;
} operand_t;

/**
* @brief Virtual instruction function pointer
*/
typedef void (*function_virtual_instruction_t)(
Platform* platform, size_t worker_number, operand_t op1, operand_t op2, operand_t op3,
bool debug, size_t *pc, Reaction **returned_reaction, bool *exit_loop);

/**
* @brief This struct represents a PRET VM instruction for C platforms.
* There is an opcode and three operands. The operands are unions so they
* can be either a pointer or an immediate
*
*/
typedef struct inst_t {
function_virtual_instruction_t func;
opcode_t opcode;
operand_t op1;
operand_t op2;
operand_t op3;
bool debug;
} inst_t;

#endif
8 changes: 4 additions & 4 deletions src/action.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ void Action_cleanup(Trigger *self) {
void Action_prepare(Trigger *self, Event *event) {
LF_DEBUG(TRIG, "Preparing action %p", self);
Action *act = (Action *)self;
Scheduler *sched = &self->parent->env->scheduler;
Scheduler *sched = self->parent->env->scheduler;
memcpy(act->value_ptr, event->payload, act->payload_pool.size);

if (self->is_present) {
LF_WARN(TRIG, "Action %p is already present at this tag. Its value was overwritten", self);
} else {
sched->register_for_cleanup(sched, self);
for (size_t i = 0; i < act->effects.size; i++) {
validate(sched->reaction_queue.insert(&sched->reaction_queue, act->effects.reactions[i]) == LF_OK);
validate(sched->add_to_reaction_queue(sched, act->effects.reactions[i]) == LF_OK);
}
}

Expand All @@ -33,7 +33,7 @@ void Action_prepare(Trigger *self, Event *event) {
lf_ret_t Action_schedule(Action *self, interval_t offset, const void *value) {
lf_ret_t ret;
Environment *env = self->super.parent->env;
Scheduler *sched = &env->scheduler;
Scheduler *sched = env->scheduler;
void *payload = NULL;

env->enter_critical_section(env);
Expand Down Expand Up @@ -73,7 +73,7 @@ lf_ret_t Action_schedule(Action *self, interval_t offset, const void *value) {
if (self->type == PHYSICAL_ACTION) {
base_tag.time = env->get_physical_time(env);
} else {
base_tag = sched->current_tag;
base_tag = sched->current_tag(sched);
}

tag_t tag = lf_delay_tag(base_tag, total_offset);
Expand Down
4 changes: 2 additions & 2 deletions src/builtin_triggers.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ void Builtin_prepare(Trigger *_self, Event *event) {
LF_DEBUG(TRIG, "Preparing builtin trigger %p", _self);
lf_ret_t ret;
BuiltinTrigger *self = (BuiltinTrigger *)_self;
Scheduler *sched = &_self->parent->env->scheduler;
Scheduler *sched = _self->parent->env->scheduler;
_self->is_present = true;
sched->register_for_cleanup(sched, _self);
assert(self->effects.size > 0);

for (size_t i = 0; i < self->effects.size; i++) {
ret = sched->reaction_queue.insert(&sched->reaction_queue, self->effects.reactions[i]);
ret = sched->add_to_reaction_queue(sched, self->effects.reactions[i]);
validate(ret == LF_OK);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void DelayedConnection_prepare(Trigger *trigger, Event *event) {
LF_DEBUG(CONN, "Preparing delayed connection %p for triggering", trigger);
DelayedConnection *self = (DelayedConnection *)trigger;
assert(self->staged_payload_ptr == NULL); // Should be reset to NULL at end of last tag.
Scheduler *sched = &trigger->parent->env->scheduler;
Scheduler *sched = trigger->parent->env->scheduler;
EventPayloadPool *pool = trigger->payload_pool;
trigger->is_present = true;
sched->register_for_cleanup(sched, trigger);
Expand All @@ -114,13 +114,13 @@ void DelayedConnection_cleanup(Trigger *trigger) {
if (self->staged_payload_ptr) {
LF_DEBUG(CONN, "Delayed connection %p had a staged value. Schedule it", trigger);
Environment *env = self->super.super.parent->env;
Scheduler *sched = &env->scheduler;
Scheduler *sched = env->scheduler;

tag_t base_tag = ZERO_TAG;
if (self->type == PHYSICAL_CONNECTION) {
base_tag.time = env->get_physical_time(env);
} else {
base_tag = sched->current_tag;
base_tag = sched->current_tag(sched);
}
Event event = EVENT_INIT(lf_delay_tag(base_tag, self->delay), trigger, self->staged_payload_ptr);
sched->schedule_at(sched, &event);
Expand All @@ -135,7 +135,7 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value,
lf_ret_t ret;
LF_DEBUG(CONN, "Triggering downstreams on delayed connection %p. Stage the value for later scheduling", _self);
Trigger *trigger = &_self->super;
Scheduler *sched = &_self->super.parent->env->scheduler;
Scheduler *sched = _self->super.parent->env->scheduler;
EventPayloadPool *pool = trigger->payload_pool;
if (self->staged_payload_ptr == NULL) {
ret = pool->allocate(pool, &self->staged_payload_ptr);
Expand Down
Loading

0 comments on commit 27a5ec5

Please sign in to comment.