Skip to content

Commit

Permalink
erling feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Nov 20, 2024
1 parent 86c750a commit 4db43cb
Show file tree
Hide file tree
Showing 14 changed files with 70 additions and 95 deletions.
2 changes: 1 addition & 1 deletion include/reactor-uc/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ typedef struct FederatedInputConnection FederatedInputConnection;
void lf_exit(void) { Environment_free(&env); } \
void lf_start() { \
DynamicScheduler_ctor(&scheduler, &env); \
Environment_ctor(&env, &scheduler.scheduler, (Reactor *)&main_reactor); \
Environment_ctor(&env, &scheduler.super, (Reactor *)&main_reactor); \
MainReactorName##_ctor(&main_reactor, NULL, &env); \
env.scheduler->duration = Timeout; \
env.scheduler->keep_alive = KeepAlive; \
Expand Down
21 changes: 0 additions & 21 deletions include/reactor-uc/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,6 @@ struct Scheduler {
*/
void (*run)(Scheduler *self);

/**
* @brief After committing to a tag, but before executing reactions, the
* scheduler must prepare the timestep by adding reactions to the reaction
* queue.
*/
void (*prepare_timestep)(Scheduler *self, tag_t tag);

/**
* @brief After completing all reactions at a tag, this function is called to
* reset is_present fields and increment index pointers of the EventPayloadPool.
*/
void (*clean_up_timestep)(Scheduler *self);

/**
* @brief Called after `prepare_timestep` to run all reactions on the current
* tag.
*/
void (*run_timestep)(Scheduler *self);

/**
* @brief Called to execute all reactions triggered by a shutdown trigger.
*/
Expand All @@ -71,8 +52,6 @@ struct Scheduler {
tag_t (*current_tag)(Scheduler *self);
};

void Scheduler_ctor(Scheduler *self);

#define SCHEDULER_DYNAMIC

#if defined(SCHEDULER_DYNAMIC)
Expand Down
22 changes: 21 additions & 1 deletion include/reactor-uc/schedulers/dynamic/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,37 @@ typedef struct DynamicScheduler DynamicScheduler;
typedef struct Environment Environment;

struct DynamicScheduler {
Scheduler scheduler;
Scheduler super;
Environment *env;
EventQueue event_queue;
ReactionQueue reaction_queue;

// The following two fields are used to implement a linked list of Triggers
// that are registered for cleanup at the end of the current tag.
Trigger *cleanup_ll_head;
Trigger *cleanup_ll_tail;
bool leader; // Whether this scheduler is the leader in a federated program and selects the start tag.
tag_t stop_tag; // The tag at which the program should stop. This is set by the user or by the scheduler.
tag_t current_tag; // The current logical tag. Set by the scheduler and read by user in the reaction bodies.

/**
* @brief After committing to a tag, but before executing reactions, the
* scheduler must prepare the timestep by adding reactions to the reaction
* queue.
*/
void (*prepare_timestep)(Scheduler *self, tag_t tag);

/**
* @brief After completing all reactions at a tag, this function is called to
* reset is_present fields and increment index pointers of the EventPayloadPool.
*/
void (*clean_up_timestep)(Scheduler *self);

/**
* @brief Called after `prepare_timestep` to run all reactions on the current
* tag.
*/
void (*run_timestep)(Scheduler *self);
};

void DynamicScheduler_ctor(DynamicScheduler *self, Environment *env);
Expand Down
2 changes: 1 addition & 1 deletion include/reactor-uc/schedulers/static/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ typedef struct StaticScheduler StaticScheduler;
typedef struct Environment Environment;

struct StaticScheduler {
Scheduler *scheduler;
Scheduler *super;
Environment *env;
const inst_t **static_schedule;
size_t *pc;
Expand Down
1 change: 0 additions & 1 deletion src/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "reactor-uc/scheduler.h"
#include <assert.h>
#include <inttypes.h>
#include <reactor-uc/schedulers/dynamic/scheduler.h>

void Environment_validate(Environment *self) {
Reactor_validate(self->main);
Expand Down
2 changes: 0 additions & 2 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,3 @@
#include "schedulers/static/instructions.c"
#include "schedulers/dynamic/dynamic.c"
#include "schedulers/static/scheduler.c"

void Scheduler_ctor(Scheduler *self) { (void)self; }
69 changes: 34 additions & 35 deletions src/schedulers/dynamic/dynamic.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,16 @@ void Scheduler_do_shutdown(Scheduler *untyped_self, tag_t shutdown_tag) {

LF_INFO(SCHED, "Scheduler terminating at tag %" PRId64 ":%" PRIu32, shutdown_tag.time, shutdown_tag.microstep);
Environment *env = self->env;
self->scheduler.prepare_timestep(untyped_self, shutdown_tag);
self->prepare_timestep(untyped_self, shutdown_tag);
env->leave_critical_section(env);

Trigger *shutdown = &self->env->shutdown->super;

Event event = EVENT_INIT(shutdown_tag, shutdown, NULL);
if (shutdown) {
Scheduler_prepare_builtin(&event);
self->scheduler.run_timestep(untyped_self);
self->scheduler.clean_up_timestep(untyped_self);
self->run_timestep(untyped_self);
self->clean_up_timestep(untyped_self);
}
}

Expand Down Expand Up @@ -189,24 +189,24 @@ void Scheduler_acquire_and_schedule_start_tag(Scheduler *untyped_self) {
Environment *env = self->env;
env->enter_critical_section(env);
if (env->net_bundles_size == 0) {
self->scheduler.start_time = env->get_physical_time(env);
LF_DEBUG(SCHED, "No federated connections, picking start_time %" PRId64, self->scheduler.start_time);
self->super.start_time = env->get_physical_time(env);
LF_DEBUG(SCHED, "No federated connections, picking start_time %" PRId64, self->super.start_time);
} else if (self->leader) {
self->scheduler.start_time = env->get_physical_time(env);
LF_DEBUG(SCHED, "Is leader of the federation, picking start_time %" PRId64, self->scheduler.start_time);
Federated_distribute_start_tag(env, self->scheduler.start_time);
self->super.start_time = env->get_physical_time(env);
LF_DEBUG(SCHED, "Is leader of the federation, picking start_time %" PRId64, self->super.start_time);
Federated_distribute_start_tag(env, self->super.start_time);
} else {
LF_DEBUG(SCHED, "Not leader, waiting for start tag signal");
while (self->scheduler.start_time == NEVER) {
while (self->super.start_time == NEVER) {
env->wait_until(env, FOREVER);
}
}
LF_DEBUG(SCHED, "Start_time is %" PRId64, self->scheduler.start_time);
tag_t start_tag = {.time = self->scheduler.start_time, .microstep = 0};
LF_DEBUG(SCHED, "Start_time is %" PRId64, self->super.start_time);
tag_t start_tag = {.time = self->super.start_time, .microstep = 0};
// Set the stop tag
self->stop_tag = lf_delay_tag(start_tag, self->scheduler.duration);
LF_DEBUG(INFO, "Start time is %" PRId64 "and stop time is %" PRId64 " (%" PRId32 ")", self->scheduler.start_time,
self->stop_tag.time, self->scheduler.duration);
self->stop_tag = lf_delay_tag(start_tag, self->super.duration);
LF_DEBUG(INFO, "Start time is %" PRId64 "and stop time is %" PRId64 " (%" PRId32 ")", self->super.start_time,
self->stop_tag.time, self->super.duration);

// Schedule the initial events
Scheduler_schedule_startups(untyped_self, start_tag);
Expand All @@ -220,7 +220,7 @@ void Scheduler_run(Scheduler *untyped_self) {
Environment *env = self->env;
lf_ret_t res;
tag_t next_tag;
bool non_terminating = self->scheduler.keep_alive || env->has_async_events;
bool non_terminating = self->super.keep_alive || env->has_async_events;
bool going_to_shutdown = false;
LF_INFO(SCHED, "Scheduler running with non_terminating=%d has_async_events=%d", non_terminating,
env->has_async_events);
Expand Down Expand Up @@ -260,15 +260,15 @@ void Scheduler_run(Scheduler *untyped_self) {
break;
}

self->scheduler.prepare_timestep(untyped_self, next_tag);
self->prepare_timestep(untyped_self, next_tag);

Scheduler_pop_events_and_prepare(untyped_self, next_tag);
LF_DEBUG(SCHED, "Acquired tag %" PRId64 ":%" PRIu32, next_tag.time, next_tag.microstep);

env->leave_critical_section(env);

self->scheduler.run_timestep(untyped_self);
self->scheduler.clean_up_timestep(untyped_self);
self->run_timestep(untyped_self);
self->clean_up_timestep(untyped_self);

env->enter_critical_section(env);
}
Expand All @@ -283,7 +283,7 @@ void Scheduler_run(Scheduler *untyped_self) {
shutdown_tag = self->stop_tag;
}

self->scheduler.do_shutdown(untyped_self, shutdown_tag);
self->super.do_shutdown(untyped_self, shutdown_tag);
}

lf_ret_t Scheduler_schedule_at_locked(Scheduler *untyped_self, Event *event) {
Expand Down Expand Up @@ -350,30 +350,29 @@ tag_t Scheduler_current_tag(Scheduler *untyped_self) { return ((DynamicScheduler
void DynamicScheduler_ctor(DynamicScheduler *self, Environment *env) {
self->env = env;

self->scheduler.keep_alive = false;
self->super.keep_alive = false;
self->stop_tag = FOREVER_TAG;
self->current_tag = NEVER_TAG;
self->scheduler.duration = FOREVER;
self->super.duration = FOREVER;
self->cleanup_ll_head = NULL;
self->cleanup_ll_tail = NULL;
self->leader = false;

self->scheduler.start_time = NEVER;
self->scheduler.run = Scheduler_run;
self->scheduler.prepare_timestep = Scheduler_prepare_timestep;
self->scheduler.clean_up_timestep = Scheduler_clean_up_timestep;
self->scheduler.run_timestep = Scheduler_run_timestep;
self->scheduler.do_shutdown = Scheduler_do_shutdown;
self->scheduler.schedule_at = Scheduler_schedule_at;
self->scheduler.schedule_at_locked = Scheduler_schedule_at_locked;
self->scheduler.register_for_cleanup = Scheduler_register_for_cleanup;
self->scheduler.request_shutdown = Scheduler_request_shutdown;
self->scheduler.acquire_and_schedule_start_tag = Scheduler_acquire_and_schedule_start_tag;
self->super.start_time = NEVER;
self->super.run = Scheduler_run;
self->prepare_timestep = Scheduler_prepare_timestep;
self->clean_up_timestep = Scheduler_clean_up_timestep;
self->run_timestep = Scheduler_run_timestep;
self->super.do_shutdown = Scheduler_do_shutdown;
self->super.schedule_at = Scheduler_schedule_at;
self->super.schedule_at_locked = Scheduler_schedule_at_locked;
self->super.register_for_cleanup = Scheduler_register_for_cleanup;
self->super.request_shutdown = Scheduler_request_shutdown;
self->super.acquire_and_schedule_start_tag = Scheduler_acquire_and_schedule_start_tag;
// self->scheduler.set_duration = Scheduler_set_duration;
self->scheduler.add_to_reaction_queue = Scheduler_add_to_reaction_queue;
self->scheduler.current_tag = Scheduler_current_tag;
self->super.add_to_reaction_queue = Scheduler_add_to_reaction_queue;
self->super.current_tag = Scheduler_current_tag;

Scheduler_ctor(&self->scheduler);
EventQueue_ctor(&self->event_queue);
ReactionQueue_ctor(&self->reaction_queue);
}
33 changes: 7 additions & 26 deletions src/schedulers/static/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,11 @@ void StaticScheduler_ctor(StaticScheduler *self, Environment *env, const inst_t
self->env = env;
self->static_schedule = static_schedule;

/*
self->keep_alive = false;
self->stop_tag = FOREVER_TAG;
self->current_tag = NEVER_TAG;
self->start_time = NEVER;
self->duration = FOREVER;
self->cleanup_ll_head = NULL;
self->cleanup_ll_tail = NULL;
self->leader = false;
*/

self->scheduler->run = Scheduler_run;
self->scheduler->prepare_timestep = Scheduler_prepare_timestep;
self->scheduler->clean_up_timestep = Scheduler_clean_up_timestep;
self->scheduler->run_timestep = Scheduler_run_timestep;
self->scheduler->do_shutdown = Scheduler_do_shutdown;
self->scheduler->schedule_at = Scheduler_schedule_at;
self->scheduler->schedule_at_locked = Scheduler_schedule_at_locked;
self->scheduler->register_for_cleanup = Scheduler_register_for_cleanup;
self->scheduler->request_shutdown = Scheduler_request_shutdown;
self->scheduler->acquire_and_schedule_start_tag = Scheduler_acquire_and_schedule_start_tag;
// self->scheduler.set_duration = Scheduler_set_duration;
// self->scheduler.add_to_reaction_queue = Scheduler_add_to_reaction_queue;
// self->scheduler.current_tag = Scheduler_current_tag;

Scheduler_ctor(self->scheduler);
self->super->run = Scheduler_run;
self->super->do_shutdown = Scheduler_do_shutdown;
self->super->schedule_at = Scheduler_schedule_at;
self->super->schedule_at_locked = Scheduler_schedule_at_locked;
self->super->register_for_cleanup = Scheduler_register_for_cleanup;
self->super->request_shutdown = Scheduler_request_shutdown;
self->super->acquire_and_schedule_start_tag = Scheduler_acquire_and_schedule_start_tag;
}
1 change: 0 additions & 1 deletion src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ void Timer_prepare(Trigger *_self, Event *event) {
LF_DEBUG(TRIG, "Triggering %d reactions", self->effects.size);
for (size_t i = 0; i < self->effects.size; i++) {
validaten(sched->add_to_reaction_queue(sched, self->effects.reactions[i]));
// reaction_queue.insert(&sched->reaction_queue, self->effects.reactions[i]));
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/unit/action_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void action_lib_start(interval_t duration) {
Environment env;
DynamicScheduler scheduler;
DynamicScheduler_ctor(&scheduler, &env);
Environment_ctor(&env, &scheduler.scheduler, (Reactor *)&my_reactor);
Environment_ctor(&env, &scheduler.super, (Reactor *)&my_reactor);
ActionLib_ctor(&my_reactor, NULL, &env);
env.scheduler->duration = duration;
env.assemble(&env);
Expand Down
2 changes: 1 addition & 1 deletion test/unit/delayed_conn_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void test_simple() {
Environment env;
DynamicScheduler scheduler;
DynamicScheduler_ctor(&scheduler, &env);
Environment_ctor(&env, &scheduler.scheduler ,(Reactor *)&main);
Environment_ctor(&env, &scheduler.super ,(Reactor *)&main);
Main_ctor(&main, NULL, &env);
env.scheduler->duration = MSEC(100);
env.assemble(&env);
Expand Down
2 changes: 1 addition & 1 deletion test/unit/physical_action_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void test_simple() {
PhyActionTest my_reactor;
DynamicScheduler scheduler;
DynamicScheduler_ctor(&scheduler, &env);
Environment_ctor(&env, &scheduler.scheduler ,(Reactor *)&my_reactor);
Environment_ctor(&env, &scheduler.super ,(Reactor *)&my_reactor);
PhyActionTest_ctor(&my_reactor, NULL, &env);
env.scheduler->duration = MSEC(100);
env.assemble(&env);
Expand Down
2 changes: 1 addition & 1 deletion test/unit/port_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void test_simple() {
Environment env;
DynamicScheduler scheduler;
DynamicScheduler_ctor(&scheduler, &env);
Environment_ctor(&env, &scheduler.scheduler, (Reactor *)&main);
Environment_ctor(&env, &scheduler.super, (Reactor *)&main);
Main_ctor(&main, NULL, &env);
env.scheduler->duration = MSEC(100);
env.assemble(&env);
Expand Down
4 changes: 2 additions & 2 deletions test/unit/timer_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ typedef struct {
DEFINE_REACTION_BODY(TimerTest, reaction) {
SCOPE_SELF(TimerTest);
SCOPE_ENV();
//TEST_ASSERT_EQUAL(self->cnt * MSEC(1), env->get_elapsed_logical_time(env));
TEST_ASSERT_EQUAL(self->cnt * MSEC(1), env->get_elapsed_logical_time(env));
printf("Hello World @ %ld\n", env->get_elapsed_logical_time(env));
self->cnt++;
}
Expand All @@ -37,7 +37,7 @@ Environment env;
void test_simple() {
DynamicScheduler scheduler;
DynamicScheduler_ctor(&scheduler, &env);
Environment_ctor(&env, &scheduler.scheduler, (Reactor *)&my_reactor);
Environment_ctor(&env, &scheduler.super, (Reactor *)&my_reactor);
env.scheduler->duration = MSEC(100);
TimerTest_ctor(&my_reactor, NULL, &env);
env.assemble(&env);
Expand Down

0 comments on commit 4db43cb

Please sign in to comment.