Skip to content

Commit

Permalink
Merge pull request #558 from elfenpiff/iox2-555-health-monitoring-exa…
Browse files Browse the repository at this point in the history
…mple-cxx

[#555] health monitoring example cxx
  • Loading branch information
elfenpiff authored Dec 20, 2024
2 parents 750ac04 + 7c0d189 commit b1aef99
Show file tree
Hide file tree
Showing 35 changed files with 723 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ jobs:
- name: Build language bindings
# NOTE: the cmake command is in a single line since Windows complains about breaking up lines with '\'
run: |
cmake -S . -B target/ffi/build -DBUILD_EXAMPLES=ON -DBUILD_TESTING=ON ${{ matrix.mode.cmake-build-type }} ${{ matrix.mode.cmake-cxx-flags }} ${{ matrix.cmake-build-system-generator }} -DCMAKE_INSTALL_PREFIX=target/ffi/install -DCMAKE_PREFIX_PATH="${{ github.workspace }}/target/iceoryx/install" -DRUST_BUILD_ARTIFACT_PATH="${{ github.workspace }}/target/${{ matrix.mode.name }}"
cmake -S . -B target/ffi/build -DBUILD_EXAMPLES=ON -DBUILD_TESTING=ON -DWARNING_AS_ERROR=ON ${{ matrix.mode.cmake-build-type }} ${{ matrix.mode.cmake-cxx-flags }} ${{ matrix.cmake-build-system-generator }} -DCMAKE_INSTALL_PREFIX=target/ffi/install -DCMAKE_PREFIX_PATH="${{ github.workspace }}/target/iceoryx/install" -DRUST_BUILD_ARTIFACT_PATH="${{ github.workspace }}/target/${{ matrix.mode.name }}"
cmake --build target/ffi/build ${{ matrix.mode.cmake-build-config }}
cmake --install target/ffi/build ${{ matrix.mode.cmake-build-config }}
Expand Down
16 changes: 16 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ add_option(
DEFAULT_VALUE OFF
)

add_option(
NAME WARNING_AS_ERROR
DESCRIPTION "Fails if the compiler emits a warning"
DEFAULT_VALUE OFF
)

add_param(
NAME RUST_BUILD_ARTIFACT_PATH
DESCRIPTION "The path to the folder with the Rust build artifacts, e.g. '/full/path/to/iceoryx2/target/release'"
Expand Down Expand Up @@ -110,6 +116,16 @@ add_rust_feature(
RUST_FEATURE "iceoryx2/logger_tracing"
)

if(WARNING_AS_ERROR)
if(WIN32)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
else()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Wall -Wextra -Wpedantic")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall -Wextra -Wpedantic")
endif()
endif()

if(SANITIZERS)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address -fsanitize=undefined")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fsanitize=undefined")
Expand Down
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ These types are demonstrated in the complex data types example.
| event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. |
| event based communication | [C++](cxx/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. |
| event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. |
| health monitoring | [Rust](rust/health_monitoring) | A central daemon creates the communication resources and monitors all nodes. When the central daemon crashes other nodes can take over and use the decentral API to monitor the nodes. |
| health monitoring | [C++](cxx/health_monitoring) [Rust](rust/health_monitoring) | A central daemon creates the communication resources and monitors all nodes. When the central daemon crashes other nodes can take over and use the decentral API to monitor the nodes. |
| publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). |
| publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. |
| publish subscribe with user header | [C](c/publish_subscribe_with_user_header) [C++](cxx/publish_subscribe_with_user_header) [Rust](rust/publish_subscribe_with_user_header) | Add a user header to the payload (samples) to transfer additional information. |
Expand Down
1 change: 1 addition & 0 deletions examples/c/discovery/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <stdio.h>

iox2_callback_progression_e list_callback(const iox2_static_config_t* static_details, void* callback_context) {
(void) callback_context;
printf("Found Service: %s, ServiceID: %s\n", static_details->name, static_details->id);
return iox2_callback_progression_e_CONTINUE;
}
Expand Down
1 change: 1 addition & 0 deletions examples/c/domains/src/discovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <stdio.h>

iox2_callback_progression_e list_callback(const iox2_static_config_t* static_details, void* callback_context) {
(void) callback_context;
printf("Found Service: %s, ServiceID: %s\n", static_details->name, static_details->id);
return iox2_callback_progression_e_CONTINUE;
}
Expand Down
3 changes: 0 additions & 3 deletions examples/c/domains/src/subscriber.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,8 @@ int main(int argc, char** argv) {
goto drop_service;
}

uint64_t counter = 0;
printf("subscribed to: [domain: \"%s\", service: \"%s\"]\n", argv[1], argv[2]);
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
counter += 1;

// receive sample
iox2_sample_h sample = NULL;
if (iox2_subscriber_receive(&subscriber, NULL, &sample) != IOX2_OK) {
Expand Down
2 changes: 1 addition & 1 deletion examples/c/event_multiplexing/src/notifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int main(int argc, char** argv) {
iox2_port_factory_event_h service = NULL;
if (iox2_service_builder_event_open_or_create(service_builder_event, NULL, &service) != IOX2_OK) {
printf("Unable to create service!\n");
goto drop_node;
goto drop_service_name;
}

// create notifier
Expand Down
3 changes: 0 additions & 3 deletions examples/c/publish_subscribe/src/subscriber.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ int main(void) {
goto drop_service;
}

uint64_t counter = 0;
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
counter += 1;

// receive sample
iox2_sample_h sample = NULL;
if (iox2_subscriber_receive(&subscriber, NULL, &sample) != IOX2_OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ int main(void) {
goto drop_service;
}

uint64_t counter = 0;
while (iox2_node_wait(&node_handle, 1, 0) == IOX2_OK) {
counter += 1;

// receive sample
iox2_sample_h sample = NULL;
if (iox2_subscriber_receive(&subscriber, NULL, &sample) != IOX2_OK) {
Expand All @@ -107,7 +104,7 @@ int main(void) {
printf("received: %lu, user_header: version = %d, timestamp = %lu\n",
(long unsigned) *payload,
user_header->version,
user_header->timestamp);
(long unsigned) user_header->timestamp);
iox2_sample_drop(sample);
}
}
Expand Down
1 change: 1 addition & 0 deletions examples/cxx/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_subdirectory(domains)
add_subdirectory(event)
add_subdirectory(event_multiplexing)
add_subdirectory(event_based_communication)
add_subdirectory(health_monitoring)
add_subdirectory(publish_subscribe)
add_subdirectory(publish_subscribe_dynamic_data)
add_subdirectory(publish_subscribe_with_user_header)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ enum class PubSubEvent : uint8_t {
SentSample = 4,
ReceivedSample = 5,
SentHistory = 6,
Unknown
Unknown = 7
};

namespace iox {
Expand Down
3 changes: 0 additions & 3 deletions examples/cxx/event_multiplexing/src/wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include "iox2/service_type.hpp"
#include "iox2/waitset.hpp"

constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1);

// NOLINTBEGIN
struct Args {
IOX_CLI_DEFINITION(Args);
Expand All @@ -42,7 +40,6 @@ auto main(int argc, char** argv) -> int {
using namespace iox2;
auto args = Args::parse(argc, argv, "Notifier of the event multiplexing example.");

auto event_id = EventId(args.event_id());
auto service_name_1 = ServiceName::create(args.service1().c_str()).expect("valid service name");
auto service_name_2 = ServiceName::create(args.service2().c_str()).expect("valid service name");

Expand Down
61 changes: 61 additions & 0 deletions examples/cxx/health_monitoring/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library")

cc_binary(
name = "example_cxx_health_monitoring_central_daemon",
srcs = [
"src/central_daemon.cpp",
"src/pubsub_event.hpp"
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)

cc_binary(
name = "example_cxx_health_monitoring_publisher_1",
srcs = [
"src/publisher_1.cpp",
"src/pubsub_event.hpp"
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)

cc_binary(
name = "example_cxx_health_monitoring_publisher_2",
srcs = [
"src/publisher_2.cpp",
"src/pubsub_event.hpp"
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)

cc_binary(
name = "example_cxx_health_monitoring_subscriber",
srcs = [
"src/subscriber.cpp",
"src/pubsub_event.hpp"
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)
28 changes: 28 additions & 0 deletions examples/cxx/health_monitoring/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

cmake_minimum_required(VERSION 3.22)
project(example_cxx_health_monitoring LANGUAGES CXX)

find_package(iceoryx2-cxx 0.4.1 REQUIRED)

add_executable(example_cxx_health_monitoring_central_daemon src/central_daemon.cpp)
target_link_libraries(example_cxx_health_monitoring_central_daemon iceoryx2-cxx::static-lib-cxx)

add_executable(example_cxx_health_monitoring_publisher_1 src/publisher_1.cpp)
target_link_libraries(example_cxx_health_monitoring_publisher_1 iceoryx2-cxx::static-lib-cxx)

add_executable(example_cxx_health_monitoring_publisher_2 src/publisher_2.cpp)
target_link_libraries(example_cxx_health_monitoring_publisher_2 iceoryx2-cxx::static-lib-cxx)

add_executable(example_cxx_health_monitoring_subscriber src/subscriber.cpp)
target_link_libraries(example_cxx_health_monitoring_subscriber iceoryx2-cxx::static-lib-cxx)
140 changes: 140 additions & 0 deletions examples/cxx/health_monitoring/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Health Monitoring

Before proceeding, all dependencies need to be installed. You can find
instructions in the [C++ Examples Readme](../README.md).

This example demonstrates how to create a robust system using iceoryx2.
A central daemon pre-creates all communication resources to ensure that every
required resource, such as memory, is available as soon as the application
starts.
Additionally, the subscriber is immediately informed if one of the processes
it depends on has crashed. Even if the central daemon itself crashes,
communication can continue without any restrictions. Thanks to the
decentralized API of iceoryx2, the subscriber can take over the role of the
central daemon and continue monitoring all processes.

The communication must also be reliable, and we expect publishers to provide
updates at regular intervals. If a publisher misses a deadline, we want to be
informed immediately. This situation can occur if the system is under heavy
load or if a process has crashed.

This example is more advanced and consists of four components:

* `central_daemon` - Must run first. It creates all communication resources and
monitors all nodes/processes.
* `publisher_1` - Sends data at a specific frequency on `service_1`.
* `publisher_2` - Sends data at a specific frequency on `service_2`.
* `subscriber` - Connects to `service_1` and `service_2` and expects new samples
within a specific time. If no sample arrives, it proactively checks for dead
nodes.

```ascii
+----------------+ creates ...........................
| central_daemon | ----------> : communication resources :
+----------------+ ...........................
| ^
| opens |
| +-----------------+--------------+
| | | |
| +-------------+ +-------------+ +------------+
| | publisher_1 | | publisher_2 | | subscriber |
| +-------------+ +-------------+ +------------+
| ^ ^ ^
| monitores | | |
+-------------+-------------------+-----------------+
```

## Running The Example

> [!CAUTION]
> Every payload you transmit with iceoryx2 must be compatible with shared
> memory. Specifically, it must:
>
> * be self contained, no heap, no pointers to external sources
> * have a uniform memory representation -> `#[repr(C)]`
> * not use pointers to manage their internal structure
>
> Data types like `String` or `Vec` will cause undefined behavior and may
> result in segmentation faults. We provide alternative data types that are
> compatible with shared memory. See the
> [complex data type example](../complex_data_types) for guidance on how to
> use them.
First you have to build the C++ examples:

```sh
cmake -S . -B target/ffi/build -DBUILD_EXAMPLES=ON
cmake --build target/ffi/build
```

For this example, you need to open five separate terminals.

## Terminal 1: Central Daemon - Create All Communication Resources

Run the central daemon, which sets up all communication resources and monitors
processes.

```sh
./target/ffi/build/examples/cxx/health_monitoring/example_cxx_health_monitoring_central_daemon
```

## Terminal 2: Publisher 1

Run the first publisher, which sends data on `service_1`.

```sh
./target/ffi/build/examples/cxx/health_monitoring/example_cxx_health_monitoring_publisher_1
```

## Terminal 3: Publisher 2

Run the second publisher, which sends data on `service_2`.

```sh
./target/ffi/build/examples/cxx/health_monitoring/example_cxx_health_monitoring_publisher_2
```

## Terminal 4: Subscriber

Run the subscriber, which listens to both `service_1` and `service_2`.

```sh
./target/ffi/build/examples/cxx/health_monitoring/example_cxx_health_monitoring_subscriber
```

## Terminal 5: Simulate Process Crashes

Send a `SIGKILL` signal to `publisher_1` to simulate a fatal crash. This
ensures that the process is unable to clean up any resources.

```sh
killall -9 example_cxx_health_monitoring_publisher_1
```

After running this command:

1. The `central_daemon` will detect that the process has crashed and print:
```ascii
detected dead node: Some(NodeName { value: "publisher 1" })
```
The event service is configured to emit a `PubSub::ProcessDied` event when a
process is identified as dead.

2. On the `subscriber` side, you will see the message:
```ascii
ServiceName { value: "service_1" }: process died!
```

3. Since `publisher_1` is no longer sending messages, the subscriber will also
regularly print another message indicating that `service_1` has violated
the contract because no new samples are being received.

Feel free to run multiple instances of publisher or subscriber processes
simultaneously to explore how iceoryx2 handles publisher-subscriber
communication efficiently.

You may hit the maximum supported number of ports when too many publisher or
subscriber processes run. Take a look at the [iceoryx2 config](../../../config)
to set the limits globally or at the
[API of the Service builder](https://docs.rs/iceoryx2/latest/iceoryx2/service/index.html)
to set them for a single service.
Loading

0 comments on commit b1aef99

Please sign in to comment.