Skip to content

Commit

Permalink
Porting porting porting
Browse files Browse the repository at this point in the history
  • Loading branch information
erlingrj committed Nov 13, 2024
1 parent 39e00cb commit 515b036
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 172 deletions.
10 changes: 4 additions & 6 deletions examples/common/timer_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ typedef struct {
Reactor super;
TIMER_INSTANCE(TimerSource, t);
REACTION_INSTANCE(TimerSource, r);
Reaction *_reactions[1];
Trigger *_triggers[1];
REACTOR_BOOKKEEPING_INSTANCES(1,1,0);
} TimerSource;


void TimerSource_ctor(TimerSource *self, Environment *env) {
Reactor_ctor(&self->super, "TimerSource", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 1);
size_t _triggers_idx = 0;
size_t _reactions_idx = 0;
REACTOR_CTOR_SIGNATURE(TimerSource) {
REACTOR_CTOR_PREAMBLE();
REACTOR_CTOR(TimerSource);
INITIALIZE_REACTION(TimerSource, r);
INITIALIZE_TIMER(TimerSource, t, MSEC(0), MSEC(500));
TIMER_REGISTER_EFFECT(t, r);
Expand Down
2 changes: 1 addition & 1 deletion examples/posix/federated/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ DEFINE_REACTION_BODY(Receiver, r) {
in->value.size);
}

void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) {
REACTOR_CTOR_SIGNATURE(Receiver) {
REACTOR_CTOR_PREAMBLE();
REACTOR_CTOR(Receiver);
INITIALIZE_REACTION(Receiver, r);
Expand Down
2 changes: 1 addition & 1 deletion examples/posix/federated/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ DEFINE_REACTION_BODY(Sender, r) {
lf_set(out, val);
}

void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **conn_out, size_t conn_out_num) {
REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Sender, Connection **conn_out, size_t conn_out_num) {
REACTOR_CTOR_PREAMBLE();
REACTOR_CTOR(Sender);
INITIALIZE_REACTION(Sender, r);
Expand Down
62 changes: 56 additions & 6 deletions examples/zephyr/basic_federated/common/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,24 @@
#include "reactor-uc/reactor-uc.h"
#include <zephyr/drivers/gpio.h>
#include <zephyr/kernel.h>

#include "reactor-uc/logging.h"
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/serialization.h"
#include <zephyr/net/net_ip.h>

#include <zephyr/drivers/gpio.h>
#include <zephyr/kernel.h>

#ifndef PORT_NUM
#error "PORT_NUM must be defined"
#endif

#ifndef IP_ADDR
#error "IP_ADDR must be defined"
#endif

#define LED0_NODE DT_ALIAS(led0)
static const struct gpio_dt_spec led = GPIO_DT_SPEC_GET(LED0_NODE, gpios);

Expand All @@ -24,9 +42,8 @@ typedef struct {
Reactor super;
REACTION_INSTANCE(Receiver, r);
PORT_INSTANCE(Receiver, in);
REACTOR_BOOKKEEPING_INSTANCES(1,1,0);
int cnt;
Reaction *_reactions[1];
Trigger *_triggers[1];
} Receiver;

DEFINE_REACTION_BODY(Receiver, r) {
Expand All @@ -37,14 +54,47 @@ DEFINE_REACTION_BODY(Receiver, r) {
env->get_logical_time(env), env->get_physical_time(env));
}

void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) {
Reactor_ctor(&self->super, "Receiver", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1);
size_t _reactions_idx = 0;
size_t _triggers_idx = 0;
REACTOR_CTOR_SIGNATURE(Receiver) {
REACTOR_CTOR_PREAMBLE();
REACTOR_CTOR(Receiver);
INITIALIZE_REACTION(Receiver, r);
INITIALIZE_INPUT(Receiver, in);

// Register reaction as an effect of in
INPUT_REGISTER_EFFECT(in, r);
REACTION_REGISTER_EFFECT(r, in);
}


DEFINE_FEDERATED_INPUT_CONNECTION(Receiver, in, msg_t, 5, MSEC(100), false);

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1,0)
} FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);


FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, "192.168.1.100", PORT_NUM, AF_INET, false);
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_payload_default);
}

typedef struct {
Reactor super;
CHILD_REACTOR_INSTANCE(Receiver, receiver);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
FEDERATE_BOOKKEEPING_INSTANCES(0,0,1,1);
} MainRecv;

REACTOR_CTOR_SIGNATURE(MainRecv) {
FEDERATE_CTOR_PREAMBLE();
REACTOR_CTOR(MainRecv);
INITIALIZE_CHILD_REACTOR(Receiver, receiver);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
BUNDLE_REGISTER_DOWNSTREAM(Receiver, Sender, receiver, in);
}
ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1, false)
53 changes: 2 additions & 51 deletions examples/zephyr/basic_federated/federated_receiver1/src/receiver.c
Original file line number Diff line number Diff line change
@@ -1,56 +1,7 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include "reactor-uc/serialization.h"
#include <zephyr/net/net_ip.h>

#include <zephyr/drivers/gpio.h>
#include <zephyr/kernel.h>
#include "../../common/receiver.h"

#define PORT_NUM 8901
#define IP_ADDR "192.168.1.100"

DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, 0, false)

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel chan;
ConnRecv conn;
FederatedInputConnection *inputs[1];
deserialize_hook deserialize_hooks[1];
} RecvSenderBundle;

void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
ConnRecv_ctor(&self->conn, parent);
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET, false);
self->inputs[0] = &self->conn.super;
self->deserialize_hooks[0] = deserialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
}

typedef struct {
Reactor super;
Receiver receiver;
RecvSenderBundle bundle;
FederatedConnectionBundle *_bundles[1];
Reactor *_children[1];
} MainRecv;

void MainRecv_ctor(MainRecv *self, Environment *env) {
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

RecvSenderBundle_ctor(&self->bundle, &self->super);

CONN_REGISTER_DOWNSTREAM(self->bundle.conn, self->receiver.in);
Reactor_ctor(&self->super, "MainRecv", env, NULL, self->_children, 1, NULL, 0, NULL, 0);

self->_bundles[0] = &self->bundle.super;
}

ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1, false)
#include "../../common/receiver.h"

int main() {
setup_led();
Expand Down
49 changes: 2 additions & 47 deletions examples/zephyr/basic_federated/federated_receiver2/src/receiver.c
Original file line number Diff line number Diff line change
@@ -1,52 +1,7 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/serialization.h"
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"
#include <zephyr/net/net_ip.h>
#include "../../common/receiver.h"

#define PORT_NUM 8902
#define IP_ADDR "192.168.1.100"

DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, 0, false)

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel chan;
ConnRecv conn;
FederatedInputConnection *inputs[1];
deserialize_hook deserialize_hooks[1];
} RecvSenderBundle;

void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
ConnRecv_ctor(&self->conn, parent);
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET, false);
self->inputs[0] = &self->conn.super;
self->deserialize_hooks[0] = deserialize_payload_default;
FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
}

typedef struct {
Reactor super;
Receiver receiver;
RecvSenderBundle bundle;
FederatedConnectionBundle *_bundles[1];
Reactor *_children[1];
} MainRecv;

void MainRecv_ctor(MainRecv *self, Environment *env) {
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

RecvSenderBundle_ctor(&self->bundle, &self->super);

CONN_REGISTER_DOWNSTREAM(self->bundle.conn, self->receiver.in);
Reactor_ctor(&self->super, "MainRecv", env, NULL, self->_children, 1, NULL, 0, NULL, 0);

self->_bundles[0] = &self->bundle.super;
}

ENTRY_POINT_FEDERATED(MainRecv, FOREVER, true, true, 1, false)
#include "../../common/receiver.h"

int main() {
setup_led();
Expand Down
94 changes: 43 additions & 51 deletions examples/zephyr/basic_federated/federated_sender/src/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ typedef struct {
REACTION_INSTANCE(Sender, r);
ACTION_INSTANCE(Sender, act);
PORT_INSTANCE(Sender, out);
Reaction *_reactions[1];
Trigger *_triggers[1];
REACTOR_BOOKKEEPING_INSTANCES(1,1,0);
} Sender;

DEFINE_REACTION_BODY(Sender, r) {
Expand All @@ -87,10 +86,9 @@ DEFINE_REACTION_BODY(Sender, r) {
lf_set(out, val);
}

void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **conn_out, size_t conn_out_num) {
Reactor_ctor(&self->super, "Sender", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1);
size_t _reactions_idx = 0;
size_t _triggers_idx = 0;
REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Sender, Connection **conn_out, size_t conn_out_num) {
REACTOR_CTOR_PREAMBLE();
REACTOR_CTOR(Sender);
INITIALIZE_REACTION(Sender, r);
INITIALIZE_ACTION(Sender, act, MSEC(0));
INITIALIZE_OUTPUT(Sender, out, conn_out, conn_out_num);
Expand All @@ -100,66 +98,60 @@ void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **c
OUTPUT_REGISTER_SOURCE(out, r);
}

DEFINE_FEDERATED_OUTPUT_CONNECTION(ConnSender1, msg_t, 1)
DEFINE_FEDERATED_OUTPUT_CONNECTION(ConnSender2, msg_t, 1)
DEFINE_FEDERATED_OUTPUT_CONNECTION(Sender, out, msg_t, 1)

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel chan;
ConnSender1 conn;
FederatedOutputConnection *output[1];
serialize_hook serialize_hooks[1];
} SenderRecv1Bundle;
TcpIpChannel channel;
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver1);

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel chan;
ConnSender2 conn;
FederatedOutputConnection *output[1];
serialize_hook serialize_hooks[1];
} SenderRecv2Bundle;

void SenderRecv1Bundle_ctor(SenderRecv1Bundle *self, Reactor *parent) {
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_1, AF_INET, true);
ConnSender1_ctor(&self->conn, parent, &self->super);
self->output[0] = &self->conn.super;
self->serialize_hooks[0] = serialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
TcpIpChannel channel;
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver2);

FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver1) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, "192.168.1.100", PORT_CONN_1, AF_INET, true);

FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();

INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_payload_default);
}

void SenderRecv2Bundle_ctor(SenderRecv2Bundle *self, Reactor *parent) {
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_2, AF_INET, true);
ConnSender2_ctor(&self->conn, parent, &self->super);
self->output[0] = &self->conn.super;
self->serialize_hooks[0] = serialize_payload_default;
FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver2) {
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
TcpIpChannel_ctor(&self->channel, "192.168.1.100", PORT_CONN_2, AF_INET, true);

FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();

INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_payload_default);
}

// Reactor main
typedef struct {
Reactor super;
Sender sender;
SenderRecv1Bundle bundle1;
SenderRecv2Bundle bundle2;
FederatedConnectionBundle *_bundles[2];
Reactor *_children[1];
Connection *_sender_conn[2];
CHILD_REACTOR_INSTANCE(Sender, sender);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver1);
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver2);
CONTAINED_OUTPUT_CONNECTIONS(sender, out, 2);
FEDERATE_BOOKKEEPING_INSTANCES(0,0,1,2);
} MainSender;

void MainSender_ctor(MainSender *self, Environment *env) {
self->_children[0] = &self->sender.super;
Sender_ctor(&self->sender, &self->super, env, self->_sender_conn, 2);
Reactor_ctor(&self->super, "MainSender", env, NULL, self->_children, 1, NULL, 0, NULL, 0);
SenderRecv1Bundle_ctor(&self->bundle1, &self->super);
SenderRecv2Bundle_ctor(&self->bundle2, &self->super);

CONN_REGISTER_UPSTREAM(self->bundle1.conn, self->sender.out);
CONN_REGISTER_UPSTREAM(self->bundle2.conn, self->sender.out);
REACTOR_CTOR_SIGNATURE(MainSender) {
FEDERATE_CTOR_PREAMBLE();
REACTOR_CTOR(MainSender);

INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, self->_conns_sender_out_out, 2);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver1);
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver2);

self->_bundles[0] = &self->bundle1.super;
self->_bundles[1] = &self->bundle2.super;
BUNDLE_REGISTER_UPSTREAM(Sender, Receiver1, sender, out);
BUNDLE_REGISTER_UPSTREAM(Sender, Receiver2, sender, out);
}

ENTRY_POINT_FEDERATED(MainSender, FOREVER, true, true, 2, true)
Expand Down
Loading

0 comments on commit 515b036

Please sign in to comment.