diff --git a/examples/riot/coap_channel/Makefile b/examples/riot/coap_channel/Makefile new file mode 100755 index 00000000..4e493e87 --- /dev/null +++ b/examples/riot/coap_channel/Makefile @@ -0,0 +1,28 @@ +# name of your application +APPLICATION = lf-test + +# If no BOARD is found in the environment, use this default: +BOARD ?= native + +# Comment this out to disable code in RIOT that does safety checking +# which is not needed in a production environment but helps in the +# development process: +DEVELHELP ?= 1 + +# Change this to 0 show compiler invocation lines by default: +QUIET ?= 1 + +# Enable reactor-uc features +# CFLAGS += -DNETWORK_CHANNEL_TCP_POSIX +CFLAGS += -DNETWORK_CHANNEL_COAP_RIOT +CFLAGS += -DEVENT_QUEUE_SIZE=32 +CFLAGS += -DREACTION_QUEUE_SIZE=32 + +USEMODULE += shell_cmds_default +USEMODULE += ps +USEMODULE += shell +USEMODULE += gnrc_icmpv6_echo +USEMODULE += ipv4_addr +USEMODULE += shell_cmd_gnrc_udp + +include $(CURDIR)/../../../make/riot/riot.mk diff --git a/examples/riot/coap_channel/coap_channel.c b/examples/riot/coap_channel/coap_channel.c new file mode 100644 index 00000000..86d9777e --- /dev/null +++ b/examples/riot/coap_channel/coap_channel.c @@ -0,0 +1,284 @@ +#include "coap_channel.h" +#include "reactor-uc/logging.h" +#include "reactor-uc/environment.h" + +#include +#include +#include +#include +#include + +#include "net/gcoap.h" +#include "net/ipv6/addr.h" +#include "net/ipv4/addr.h" +#include "net/sock/util.h" +#include "od.h" +#include "uri_parser.h" + +static bool _is_coap_initialized = false; +static Environment *_env; + +static CoapChannel *_get_coap_channel_by_remote(sock_udp_ep_t *remote) { + CoapChannel *channel; + for (size_t i = 0; i < _env->net_bundles_size; i++) { + channel = (CoapChannel *)_env->net_bundles[i]->net_channel; + + if (remote->family == AF_INET6) { + if (ipv6_addr_equal(&channel->remote.addr.ipv6, &remote->addr.ipv6)) { + return channel; + } + } else if (remote->family == AF_INET) { + if (ipv4_addr_equal(&channel->remote.addr.ipv4, &remote->addr.ipv4)) { + return channel; + } + } + } + + return NULL; +} + +// static void _resp_handler(const gcoap_request_memo_t *memo, coap_pkt_t *pdu, const sock_udp_ep_t *remote) { +// (void)remote; /* not interested in the source currently */ + +// if (memo->state == GCOAP_MEMO_TIMEOUT) { +// LF_DEBUG(NET, "gcoap: timeout for msg ID %02u\n", coap_get_id(pdu)); +// return; +// } + +// char *class_str = (coap_get_code_class(pdu) == COAP_CLASS_SUCCESS) ? "Success" : "Error"; +// LF_DEBUG(NET, "gcoap: response %s, code %1u.%02u", class_str, coap_get_code_class(pdu), coap_get_code_detail(pdu)); +// if (pdu->payload_len) { +// unsigned content_type = coap_get_content_type(pdu); +// if (content_type == COAP_FORMAT_TEXT || content_type == COAP_FORMAT_LINK || +// coap_get_code_class(pdu) == COAP_CLASS_CLIENT_FAILURE || +// coap_get_code_class(pdu) == COAP_CLASS_SERVER_FAILURE) { +// /* Expecting diagnostic payload in failure cases */ +// LF_DEBUG(NET, ", %u bytes\n%.*s\n", pdu->payload_len, pdu->payload_len, (char *)pdu->payload); +// } else { +// LF_DEBUG(NET, ", %u bytes\n", pdu->payload_len); +// od_hex_dump(pdu->payload, pdu->payload_len, OD_WIDTH_DEFAULT); +// } +// } else { +// LF_DEBUG(NET, ", empty payload\n"); +// } +// } + +static bool _send_coap_message(sock_udp_ep_t *remote, char *path, gcoap_resp_handler_t resp_handler) { + coap_pkt_t pdu; + uint8_t buf[CONFIG_GCOAP_PDU_BUF_SIZE]; + unsigned msg_type = COAP_TYPE_NON; + + gcoap_req_init(&pdu, &buf[0], CONFIG_GCOAP_PDU_BUF_SIZE, msg_type, path); + coap_hdr_set_type(pdu.hdr, msg_type); + ssize_t len = coap_opt_finish(&pdu, COAP_OPT_FINISH_NONE); + ssize_t bytes_sent = gcoap_req_send(buf, len, remote, NULL, resp_handler, NULL, GCOAP_SOCKET_TYPE_UDP); + LF_DEBUG(NET, "CoapChannel: %d", bytes_sent); + if (bytes_sent > 0) { + LF_DEBUG(NET, "CoapChannel: Successfully sent"); + return true; + } + + return false; +} + +static ssize_t _server_connect_handler(coap_pkt_t *pdu, uint8_t *buf, size_t len, coap_request_ctx_t *ctx) { + LF_DEBUG(NET, "CoapChannel: Server connect handler"); + CoapChannel *self = _get_coap_channel_by_remote(ctx->remote); + (void)self; + + // TODO: If state not open or already connected, then: return bool with FALSE and also handle this bool in client + // TODO: Also: if self is NULL, then there is no channel that this federate supports for the other federate => return + // false as well. + + // gcoap_resp_init(pdu, buf, len, COAP_CODE_CONTENT); + // coap_opt_add_format(pdu, COAP_FORMAT_TEXT); + // size_t resp_len = coap_opt_finish(pdu, COAP_OPT_FINISH_PAYLOAD); + + // if (self->connected_to_client) { + // // This server is already connected to a client + // return false; + // } + + // // Set as connected + // self->connected_to_client = true; + + // // Connect successful + // return true; + gcoap_resp_init(pdu, buf, len, COAP_CODE_CONTENT); + coap_opt_add_format(pdu, COAP_FORMAT_TEXT); + size_t resp_len = coap_opt_finish(pdu, COAP_OPT_FINISH_PAYLOAD); + + /* write the RIOT board name in the response buffer */ + if (pdu->payload_len >= strlen(RIOT_BOARD)) { + memcpy(pdu->payload, RIOT_BOARD, strlen(RIOT_BOARD)); + return resp_len + strlen(RIOT_BOARD); + } else { + puts("gcoap_cli: msg buffer too small"); + return gcoap_response(pdu, buf, len, COAP_CODE_INTERNAL_SERVER_ERROR); + } +} + +static ssize_t _server_disconnect_handler(coap_pkt_t *pdu, uint8_t *buf, size_t len, coap_request_ctx_t *ctx) { + LF_DEBUG(NET, "CoapChannel: Server disconnect handler"); + CoapChannel *self = _get_coap_channel_by_remote(ctx->remote); + + gcoap_resp_init(pdu, buf, len, COAP_CODE_CONTENT); + coap_opt_add_format(pdu, COAP_FORMAT_TEXT); + size_t resp_len = coap_opt_finish(pdu, COAP_OPT_FINISH_PAYLOAD); + + // Set as disconnect + self->state = NETWORK_CHANNEL_STATE_DISCONNECTED; + + // Disconnect successful + return true; +} + +static ssize_t _server_message_handler(coap_pkt_t *pdu, uint8_t *buf, size_t len, coap_request_ctx_t *ctx) { + LF_DEBUG(NET, "CoapChannel: Server message handler"); + CoapChannel *self = _get_coap_channel_by_remote(ctx->remote); + + gcoap_resp_init(pdu, buf, len, COAP_CODE_CONTENT); + coap_opt_add_format(pdu, COAP_FORMAT_TEXT); + size_t resp_len = coap_opt_finish(pdu, COAP_OPT_FINISH_PAYLOAD); + + // TODO + + return true; +} + +static const coap_resource_t _resources[] = { + {"/connect", COAP_GET, _server_connect_handler, NULL}, + {"/disconnect", COAP_GET, _server_disconnect_handler, NULL}, + {"/message", COAP_GET, _server_message_handler, NULL}, +}; + +static gcoap_listener_t _listener = {&_resources[0], ARRAY_SIZE(_resources), GCOAP_SOCKET_TYPE_UDP, NULL, NULL, NULL}; + +static lf_ret_t CoapChannel_open_connection(NetworkChannel *untyped_self) { + LF_DEBUG(NET, "CoapChannel: Open connection"); + CoapChannel *self = (CoapChannel *)untyped_self; + + /* Server */ + // Do nothing + + /* Client */ + // Do nothing + + self->state = NETWORK_CHANNEL_STATE_OPEN; +} + +static void _client_try_connect_callback(const gcoap_request_memo_t *memo, coap_pkt_t *pdu, + const sock_udp_ep_t *remote) { + LF_DEBUG(NET, "CoapChannel: Try connect callback"); + CoapChannel *self = _get_coap_channel_by_remote(remote); + + if (memo->state == GCOAP_MEMO_TIMEOUT || coap_get_code_class(pdu) != COAP_CLASS_SUCCESS) { + self->state = NETWORK_CHANNEL_STATE_DISCONNECTED; + return; + } + + // TODO: Analyse payload. The remote federate might respond with "false" it it is not open for connections or if it is + // already connected + + self->state = NETWORK_CHANNEL_STATE_CONNECTED; +} + +static lf_ret_t CoapChannel_try_connect(NetworkChannel *untyped_self) { + LF_DEBUG(NET, "CoapChannel: Try connect"); + CoapChannel *self = (CoapChannel *)untyped_self; + + /* Server */ + // Do nothing. Every CoAP client is also a server in our case. + // If the client successfully connected to the server of the other federate, + // then we consider the connection as established + // The other federate will also connect to our server and after that consider + // the connection to us as established. + + /* Client */ + switch (self->state) { + case NETWORK_CHANNEL_STATE_CONNECTED: + return LF_OK; + + case NETWORK_CHANNEL_STATE_OPEN: + if (!_send_coap_message(&self->remote, "/connect", _client_try_connect_callback)) { + LF_ERR(NET, "CoapChannel: try_connect: Failed to send CoAP message"); + return LF_ERR; + } + return LF_IN_PROGRESS; + + case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS: + return LF_IN_PROGRESS; + + case NETWORK_CHANNEL_STATE_DISCONNECTED: + case NETWORK_CHANNEL_STATE_LOST_CONNECTION: + self->state = NETWORK_CHANNEL_STATE_OPEN; + return LF_TRY_AGAIN; + + case NETWORK_CHANNEL_STATE_UNINITIALIZED: + return LF_ERR; + } +} + +static void CoapChannel_close_connection(NetworkChannel *untyped_self) { + LF_DEBUG(NET, "CoapChannel: Close connection"); + CoapChannel *self = (CoapChannel *)untyped_self; + + // TODO +} + +static lf_ret_t CoapChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) { + LF_DEBUG(NET, "CoapChannel: Send blocking"); + CoapChannel *self = (CoapChannel *)untyped_self; + + // TODO +} + +static void CoapChannel_register_receive_callback(NetworkChannel *untyped_self, + void (*receive_callback)(FederatedConnectionBundle *conn, + const FederateMessage *msg), + FederatedConnectionBundle *conn) { + LF_INFO(NET, "CoapChannel: Register receive callback"); + CoapChannel *self = (CoapChannel *)untyped_self; + + self->receive_callback = receive_callback; + self->federated_connection = conn; +} + +static void CoapChannel_free(NetworkChannel *untyped_self) { + LF_DEBUG(NET, "CoapChannel: Free"); + CoapChannel *self = (CoapChannel *)untyped_self; + + // TODO +} + +void CoapChannel_ctor(CoapChannel *self, Environment *env, const char *remote_host) { + // Initialize global coap server it not already done + if (!_is_coap_initialized) { + _is_coap_initialized = true; + + // Set environment + _env = env; + + // Initialize coap server + gcoap_register_listener(&_listener); + } + + // Super fields + self->super.open_connection = CoapChannel_open_connection; + self->super.try_connect = CoapChannel_try_connect; + self->super.close_connection = CoapChannel_close_connection; + self->super.send_blocking = CoapChannel_send_blocking; + self->super.register_receive_callback = CoapChannel_register_receive_callback; + self->super.free = CoapChannel_free; + + // Concrete fields + self->receive_callback = NULL; + self->federated_connection = NULL; + self->state = NETWORK_CHANNEL_STATE_UNINITIALIZED; + + // Convert host to udp socket + sock_udp_name2ep(&self->remote, remote_host); + if (self->remote.port == 0) { + self->remote.port = CONFIG_GCOAP_PORT; + } +} diff --git a/examples/riot/coap_channel/coap_channel.h b/examples/riot/coap_channel/coap_channel.h new file mode 100644 index 00000000..836dd768 --- /dev/null +++ b/examples/riot/coap_channel/coap_channel.h @@ -0,0 +1,33 @@ +#ifndef REACTOR_UC_COAP_CHANNEL_H +#define REACTOR_UC_COAP_CHANNEL_H +#include "reactor-uc/network_channel.h" +#include "reactor-uc/environment.h" +#include "net/sock/udp.h" + +// #define COAP_CHANNEL_BUFFERSIZE 1024 +// #define COAP_CHANNEL_NUM_RETRIES 255; +// #define COAP_CHANNEL_RECV_THREAD_STACK_SIZE 2048 +// #define COAP_CHANNEL_RECV_THREAD_STACK_GUARD_SIZE 128 + +typedef enum { + SERVER_COAP_CHANNEL, + CLIENT_COAP_CHANNEL, +} CoapChannelType; + +typedef struct CoapChannel CoapChannel; +typedef struct FederatedConnectionBundle FederatedConnectionBundle; + +struct CoapChannel { + NetworkChannel super; + + sock_udp_ep_t remote; + + NetworkChannelState state; + + FederatedConnectionBundle *federated_connection; + void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message); +}; + +void CoapChannel_ctor(CoapChannel *self, Environment *env, const char *remote_host); + +#endif diff --git a/examples/riot/coap_channel/main.c b/examples/riot/coap_channel/main.c new file mode 100755 index 00000000..0a914f42 --- /dev/null +++ b/examples/riot/coap_channel/main.c @@ -0,0 +1,177 @@ +// #include "reactor-uc/reactor-uc.h" +// #include "coap_channel.h" +// #include + +// typedef struct { +// Reactor super; +// Reaction *_reactions[1]; +// Trigger *_triggers[1]; +// } MyReactor; + +// CoapChannel channel; + +// void MyReactor_ctor(MyReactor *self, Environment *env) { +// Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 1); +// CoapChannel_ctor(&channel, &env, "127.0.0.1", 4444, "127.0.0.1", 5555); +// } + +// /* Application */ +// ENTRY_POINT(MyReactor, FOREVER, true) + +// int main() { +// lf_start(); +// return 0; +// } + +#include "reactor-uc/reactor-uc.h" +#include "coap_channel.h" + +#include +#include +#include +#include "msg.h" + +#include "net/gcoap.h" +#include "shell.h" + +#define REMOTE_HOST "[::1]" + +#define MAIN_QUEUE_SIZE (4) +static msg_t _main_msg_queue[MAIN_QUEUE_SIZE]; +static const shell_command_t shell_commands[] = {{NULL, NULL, NULL}}; + +typedef struct { + int size; + char msg[512]; +} lf_msg_t; + +size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + const lf_msg_t *msg = user_struct; + + memcpy(msg_buf, &msg->size, sizeof(msg->size)); + memcpy(msg_buf + sizeof(msg->size), msg->msg, msg->size); + + return sizeof(msg->size) + msg->size; +} + +DEFINE_TIMER_STRUCT(Timer1, 1) +DEFINE_TIMER_CTOR_FIXED(Timer1, 1, MSEC(0), SEC(1)) +DEFINE_REACTION_STRUCT(Sender, 0, 1) +DEFINE_OUTPUT_PORT_STRUCT(Out, 1, 1) +DEFINE_OUTPUT_PORT_CTOR(Out, 1) + +typedef struct { + Reactor super; + Sender_Reaction0 reaction; + Timer1 timer; + Out out; + Reaction *_reactions[1]; + Trigger *_triggers[1]; +} Sender; + +DEFINE_REACTION_BODY(Sender, 0) { + Sender *self = (Sender *)_self->parent; + Environment *env = self->super.env; + Out *out = &self->out; + + printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env)); + lf_msg_t val; + strcpy(val.msg, "Hello From Sender"); + val.size = sizeof("Hello From Sender"); + lf_set(out, val); +} +DEFINE_REACTION_CTOR(Sender, 0) + +void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **conn_out, size_t conn_out_num) { + self->_reactions[0] = (Reaction *)&self->reaction; + self->_triggers[0] = (Trigger *)&self->timer; + Reactor_ctor(&self->super, "Sender", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); + Sender_Reaction0_ctor(&self->reaction, &self->super); + Timer_ctor(&self->timer.super, &self->super, 0, MSEC(100), self->timer.effects, 1); + Out_ctor(&self->out, &self->super, conn_out, conn_out_num); + TIMER_REGISTER_EFFECT(self->timer, self->reaction); + + // Register reaction as a source for out + OUTPUT_REGISTER_SOURCE(self->out, self->reaction); +} + +DEFINE_FEDERATED_OUTPUT_CONNECTION(ConnSender, lf_msg_t, 1) + +typedef struct { + FederatedConnectionBundle super; + CoapChannel channel; + ConnSender conn; + FederatedOutputConnection *output[1]; + serialize_hook serialize_hooks[1]; +} SenderRecvBundle; + +void SenderRecvConn_ctor(SenderRecvBundle *self, Environment *env, Sender *parent) { + CoapChannel_ctor(&self->channel, env, REMOTE_HOST); + ConnSender_ctor(&self->conn, &parent->super, &self->super); + self->output[0] = &self->conn.super; + self->serialize_hooks[0] = serialize_msg_t; + + FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, NULL, 0, + (FederatedOutputConnection **)&self->output, self->serialize_hooks, 1); +} + +// Reactor main +typedef struct { + Reactor super; + Sender sender; + SenderRecvBundle bundle; + CoapChannel channel; + ConnSender conn; + FederatedConnectionBundle *_bundles[1]; + Reactor *_children[1]; + Connection *_conn_sender_out[1]; +} MainSender; + +void MainSender_ctor(MainSender *self, Environment *env) { + self->_children[0] = &self->sender.super; + Sender_ctor(&self->sender, &self->super, env, self->_conn_sender_out, 1); + + SenderRecvConn_ctor(&self->bundle, env, &self->sender); + self->_bundles[0] = &self->bundle.super; + CONN_REGISTER_UPSTREAM(self->bundle.conn, self->sender.out); + Reactor_ctor(&self->super, "MainSender", env, NULL, self->_children, 1, NULL, 0, NULL, 0); +} + +MainSender main_reactor; +Environment env; +void lf_exit(void) { Environment_free(&env); } + +void lf_start() { + Environment_ctor(&env, (Reactor *)&main_reactor); + env.scheduler.duration = ((interval_t)(1 * 1000000000LL)); + env.scheduler.keep_alive = 1; + env.scheduler.leader = 1; + env.has_async_events = 0; + env.enter_critical_section(&env); + MainSender_ctor(&main_reactor, &env); + env.net_bundles_size = 1; + env.net_bundles = (FederatedConnectionBundle **)&main_reactor._bundles; + // env.assemble(&env); + // env.leave_critical_section(&env); + // env.start(&env); + // lf_exit(); +} + +int main() { + lf_start(); + printf("%d\n", env.net_bundles_size); + NetworkChannel *channel = (NetworkChannel *)&main_reactor.bundle.channel; + + channel->open_connection(channel); + + while (channel->try_connect(channel) != LF_OK) { + LF_ERR(NET, "Connection not yet established"); + } + + LF_INFO(NET, "SUCCESS: All channels connected"); + + puts("All up, running the shell now"); + char line_buf[SHELL_DEFAULT_BUFSIZE]; + shell_run(shell_commands, line_buf, SHELL_DEFAULT_BUFSIZE); + return 0; +} diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 452e5c7a..37340589 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -8,6 +8,7 @@ #include "proto/message.pb.h" #include "reactor-uc/tag.h" #include "reactor-uc/error.h" +#include "reactor-uc/federated.h" typedef enum { NETWORK_CHANNEL_STATE_UNINITIALIZED, diff --git a/make/riot/external_modules/reactor-uc/Makefile.dep b/make/riot/external_modules/reactor-uc/Makefile.dep index 43b5051c..dd437ed2 100644 --- a/make/riot/external_modules/reactor-uc/Makefile.dep +++ b/make/riot/external_modules/reactor-uc/Makefile.dep @@ -22,3 +22,13 @@ ifeq ($(filter -DNETWORK_CHANNEL_TCP_POSIX, $(CFLAGS)), -DNETWORK_CHANNEL_TCP_PO endif endif +# If Feature NETWORK_CHANNEL_COAP_RIOT is enabled +ifeq ($(filter -DNETWORK_CHANNEL_COAP_RIOT, $(CFLAGS)), -DNETWORK_CHANNEL_COAP_RIOT) + # Enable networking + USEMODULE += netdev_default + USEMODULE += auto_init_gnrc_netif + USEMODULE += gnrc_ipv6_default + + # Enable coap + USEMODULE += gcoap +endif