Skip to content

Commit

Permalink
Improve federated init, validate reactor program, delimited proto mes…
Browse files Browse the repository at this point in the history
…sages (#112)

* Validate that reactor program is correctly constructed after assemble

* Add more validation

* Validate connections also

* Naming

* Add wait_for in platform API

* More fixes piling on

* Fix mistake in serialize/deserialize funciotn  in posix example

* Pico fix

* riot fix

* Zephyr fixe

* Zephyr example fixes

* Add a LF_CONNECTION_CLOSED return type. Terminate receive thread if server closes connection.

* More comments on environment_assemble

* Update last_known_tag to FOREVER when an input connection closes

* typo

* Avoid null-pointer exception

* Incorporate changes to the channel API

* Surpress unused warning
  • Loading branch information
erlingrj authored Nov 7, 2024
1 parent 46f8297 commit 9a3f83c
Show file tree
Hide file tree
Showing 23 changed files with 395 additions and 289 deletions.
15 changes: 3 additions & 12 deletions examples/posix/federated/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ typedef struct {
lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
msg_t *msg = user_struct;
memcpy(&msg->size, msg_buf, sizeof(msg->size));
memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size));
memcpy(msg->msg, msg_buf + sizeof(msg->size), msg->size);

return LF_OK;
}
Expand All @@ -36,7 +36,8 @@ DEFINE_REACTION_BODY(Receiver, 0) {
Receiver *self = (Receiver *)_self->parent;
Environment *env = self->super.env;
In *inp = &self->inp;
printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), inp->value.msg);
printf("Input triggered @ %" PRId64 " with %s size %d\n", env->get_elapsed_logical_time(env), inp->value.msg,
inp->value.size);
}
DEFINE_REACTION_CTOR(Receiver, 0)

Expand Down Expand Up @@ -65,16 +66,6 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
ConnRecv_ctor(&self->conn, parent);
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, false);
self->inputs[0] = &self->conn.super;

NetworkChannel *channel = (NetworkChannel *)&self->channel;
channel->open_connection(channel);

lf_ret_t ret;
do {
ret = channel->try_connect(channel);
} while (ret != LF_OK);
validate(ret == LF_OK);
printf("Recv: Connected\n");
self->deserialize_hooks[0] = deserialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs,
Expand Down
52 changes: 1 addition & 51 deletions examples/posix/federated/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigne
const msg_t *msg = user_struct;

memcpy(msg_buf, &msg->size, sizeof(msg->size));
memcpy(msg_buf + sizeof(msg->size), msg->msg, sizeof(msg->size));
memcpy(msg_buf + sizeof(msg->size), msg->msg, msg->size);

return sizeof(msg->size) + msg->size;
}
Expand Down Expand Up @@ -62,39 +62,6 @@ void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **c
OUTPUT_REGISTER_SOURCE(self->out, self->reaction);
}

DEFINE_REACTION_STRUCT(Receiver, 0, 1)
DEFINE_INPUT_PORT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_PORT_CTOR(In, 1, msg_t, 0)

typedef struct {
Reactor super;
Receiver_Reaction0 reaction;
In inp;
int cnt;
Reaction *_reactions[1];
Trigger *_triggers[1];
} Receiver;

DEFINE_REACTION_BODY(Receiver, 0) {
Receiver *self = (Receiver *)_self->parent;
Environment *env = self->super.env;
In *inp = &self->inp;
printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg,
inp->value.size);
}
DEFINE_REACTION_CTOR(Receiver, 0)

void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) {
self->_reactions[0] = (Reaction *)&self->reaction;
self->_triggers[0] = (Trigger *)&self->inp;
Reactor_ctor(&self->super, "Receiver", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1);
Receiver_Reaction0_ctor(&self->reaction, &self->super);
In_ctor(&self->inp, &self->super);

// Register reaction as an effect of in
INPUT_REGISTER_EFFECT(self->inp, self->reaction);
}

DEFINE_FEDERATED_OUTPUT_CONNECTION(ConnSender, msg_t, 1)

typedef struct {
Expand All @@ -109,23 +76,6 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, true);
ConnSender_ctor(&self->conn, &parent->super, &self->super);
self->output[0] = &self->conn.super;

NetworkChannel *channel = (NetworkChannel *)&self->channel;
lf_ret_t ret = channel->open_connection(channel);
if (ret != LF_OK) {
printf("bind failed with %d\n", errno);
exit(1);
}
validate(ret == LF_OK);
printf("Sender: Bound\n");

// accept one connection
do {
ret = channel->try_connect(channel);
} while (ret != LF_OK);
validate(ret == LF_OK);
printf("Sender: Accepted\n");

self->serialize_hooks[0] = serialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, NULL, 0,
Expand Down
22 changes: 0 additions & 22 deletions examples/zephyr/basic_federated/federated_receiver1/src/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,6 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
ConnRecv_ctor(&self->conn, parent);
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET, false);
self->inputs[0] = &self->conn.super;

NetworkChannel *chan = &self->chan.super;
chan->open_connection(chan);

LF_DEBUG(ENV, "Recv: Connecting");
lf_ret_t ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
LF_DEBUG(ENV, "Recv: Connected");
self->deserialize_hooks[0] = deserialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
Expand Down
23 changes: 0 additions & 23 deletions examples/zephyr/basic_federated/federated_receiver2/src/receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,7 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
ConnRecv_ctor(&self->conn, parent);
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET, false);
self->inputs[0] = &self->conn.super;

NetworkChannel *chan = &self->chan.super;
chan->open_connection(chan);

LF_DEBUG(ENV, "Recv: Connecting");
lf_ret_t ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
LF_DEBUG(ENV, "Recv: Connected");
self->deserialize_hooks[0] = deserialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
}
Expand Down
49 changes: 0 additions & 49 deletions examples/zephyr/basic_federated/federated_sender/src/sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,31 +121,6 @@ void SenderRecv1Bundle_ctor(SenderRecv1Bundle *self, Reactor *parent) {
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_1, AF_INET, true);
ConnSender1_ctor(&self->conn, parent, &self->super);
self->output[0] = &self->conn.super;

NetworkChannel *chan = &self->chan.super;
lf_ret_t ret = chan->open_connection(chan);
validate(ret == LF_OK);
printf("Sender: Bound 1\n");

// accept one connection
ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
switch (ret) {
case LF_OK:
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
k_msleep(100);
break;
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}
printf("Sender: Accepted 1\n");
self->serialize_hooks[0] = serialize_payload_default;

FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
Expand All @@ -156,30 +131,6 @@ void SenderRecv2Bundle_ctor(SenderRecv2Bundle *self, Reactor *parent) {
TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_2, AF_INET, true);
ConnSender2_ctor(&self->conn, parent, &self->super);
self->output[0] = &self->conn.super;

NetworkChannel *chan = &self->chan.super;
lf_ret_t ret = chan->open_connection(chan);
validate(ret == LF_OK);
printf("Sender: Bound 2\n");

// accept one connection
ret = LF_TRY_AGAIN;
while (ret != LF_OK) {
ret = chan->try_connect(chan);
switch (ret) {
case LF_OK:
break;
case LF_TRY_AGAIN:
k_msleep(100);
break;
default:
printf("Sender: Could not accept\n");
exit(1);
break;
}
}

printf("Sender: Accepted 2\n");
self->serialize_hooks[0] = serialize_payload_default;
FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
Expand Down
1 change: 1 addition & 0 deletions include/reactor-uc/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef enum {
LF_NO_MEM,
LF_COULD_NOT_CONNECT,
LF_NETWORK_SETUP_FAILED,
LF_CONNECTION_CLOSED,
LF_TIMEOUT,
LF_TRY_AGAIN,
LF_IN_PROGRESS
Expand Down
3 changes: 3 additions & 0 deletions include/reactor-uc/federated.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct FederatedConnectionBundle {
serialize_hook *serialize_hooks;
size_t outputs_size;
bool server; // Does this federate work as server or client
void (*channel_disconnected)(FederatedConnectionBundle *);
};

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
Expand All @@ -48,6 +49,8 @@ struct FederatedOutputConnection {
int conn_id;
};

void FederatedConnectionBundle_validate(FederatedConnectionBundle *bundle);

void FederatedOutputConnection_ctor(FederatedOutputConnection *self, Reactor *parent, FederatedConnectionBundle *bundle,
int conn_id, void *payload_buf, bool *payload_used_buf, size_t payload_size,
size_t payload_buf_capacity);
Expand Down
5 changes: 5 additions & 0 deletions include/reactor-uc/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ struct Platform {
* does not need to be handled.
*/
lf_ret_t (*wait_until)(Platform *self, instant_t wakeup_time);
/**
* @brief Put system to sleep until the wakeup time. Asynchronous events
* does not need to be handled.
*/
lf_ret_t (*wait_for)(Platform *self, interval_t duration);
/**
* @brief Put the system to sleep until the wakeup time or until an
* asynchronous event occurs.
Expand Down
2 changes: 1 addition & 1 deletion include/reactor-uc/platform/posix/tcp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct TcpIpChannel {

int fd;
int client;
bool client_connect_in_progress;
NetworkChannelState state;

const char *host;
unsigned short port;
Expand Down
2 changes: 2 additions & 0 deletions include/reactor-uc/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ struct Reactor {
void Reactor_ctor(Reactor *self, const char *name, Environment *env, Reactor *parent, Reactor **children,
size_t children_size, Reaction **reactions, size_t reactions_size, Trigger **triggers,
size_t triggers_size);

void Reactor_validate(Reactor *self);
#endif
53 changes: 52 additions & 1 deletion src/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,58 @@
#include <assert.h>
#include <inttypes.h>

void Environment_assemble(Environment *self) { validaten(self->main->calculate_levels(self->main)); }
void Environment_validate(Environment *self) {
Reactor_validate(self->main);
for (size_t i = 0; i < self->net_bundles_size; i++) {
FederatedConnectionBundle_validate(self->net_bundles[i]);
}
}

void Environment_assemble(Environment *self) {
lf_ret_t ret;
bool connected[self->net_bundles_size];

validaten(self->main->calculate_levels(self->main));

for (size_t i = 0; i < self->net_bundles_size; i++) {
connected[i] = false;
FederatedConnectionBundle *bundle = self->net_bundles[i];
NetworkChannel *chan = bundle->net_channel;
ret = chan->open_connection(chan);
validate(ret == LF_OK);
}

bool all_connected = false;
interval_t wait_before_retry = NEVER;
while (!all_connected) {
all_connected = true;
for (size_t i = 0; i < self->net_bundles_size; i++) {
FederatedConnectionBundle *bundle = self->net_bundles[i];
NetworkChannel *chan = bundle->net_channel;
if (!connected[i]) {
ret = chan->try_connect(chan);
switch (ret) {
case LF_OK:
connected[i] = true;
break;
case LF_IN_PROGRESS:
case LF_TRY_AGAIN:
if (chan->expected_try_connect_duration < wait_before_retry && chan->expected_try_connect_duration > 0) {
wait_before_retry = chan->expected_try_connect_duration;
}
all_connected = false;
break;
default:
throw("Could not connect to federate during assemble");
break;
}
}
}
if (!all_connected) {
self->platform->wait_for(self->platform, wait_before_retry);
}
}
}

void Environment_start(Environment *self) {
self->scheduler.acquire_and_schedule_start_tag(&self->scheduler);
Expand Down
Loading

0 comments on commit 9a3f83c

Please sign in to comment.