WaitSet

Introduction

The WaitSet is a set to which you can attach objects so that they can signal a wide variety of events to one single notifyable. The typical approach is to create a WaitSet, attach multiple subscribers, user triggers or other Triggerables to it and then wait until one or more of the attached entities signal an event. When that happens, you receive a list of EventInfos corresponding to all events that occurred.

WaitSet events can be state based, which means that the WaitSet notifies you until you reset the state. The HAS_DATA event of the subscriber, for instance, notifies you as long as there are samples. It is also possible to attach one-shot events. These are events which trigger the WaitSet only once.
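The difference between the two kinds of events can be sketched without iceoryx. The classes StateBasedEvent and OneShotEvent and the helper countNotifications below are hypothetical names made up for this illustration; they are not part of the iceoryx API and only model how often a poll would report an event.

```cpp
#include <cassert>

// Hypothetical illustration types, not part of the iceoryx API.

// State based: keeps reporting "triggered" until the state is reset explicitly.
class StateBasedEvent
{
  public:
    void trigger() noexcept { m_state = true; }
    void reset() noexcept { m_state = false; }
    bool hasTriggered() const noexcept { return m_state; }

  private:
    bool m_state{false};
};

// One shot: reports "triggered" exactly once per trigger() call.
class OneShotEvent
{
  public:
    void trigger() noexcept { m_pending = true; }

    bool hasTriggered() noexcept
    {
        const bool wasPending = m_pending;
        m_pending = false; // consumed by the first query
        return wasPending;
    }

  private:
    bool m_pending{false};
};

// Counts how often an event reports "triggered" in `polls` consecutive queries.
template <typename Event>
int countNotifications(Event& event, const int polls)
{
    assert(polls >= 0);
    int notifications = 0;
    for (int i = 0; i < polls; ++i)
    {
        if (event.hasTriggered())
        {
            ++notifications;
        }
    }
    return notifications;
}
```

A triggered StateBasedEvent is reported in every poll until reset() is called, while a triggered OneShotEvent is reported only in the first poll.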

Expected output

Threadsafety

The WaitSet is not threadsafe!

  • It is not allowed to attach or detach Triggerable classes with attachEvent or detachEvent when another thread is currently waiting for events with wait.

The TriggerHandle on the other hand is threadsafe! Therefore you are allowed to attach/detach a TriggerHandle to a Triggerable while another thread may trigger the TriggerHandle.

Glossary

  • EventCallback a callback attached to an EventInfo. It must have the following signature void ( EventOrigin ). Any free function, static class method or non-capturing lambda is allowed. You have to ensure the lifetime of that callback. This can become important when you would like to use lambdas.
  • EventId an id which is tagged to an event. It does not need to be unique or follow any restrictions. The user can choose any arbitrary uint64_t. Assigning the same EventId to multiple Events can be useful when you would like to group Events.
  • EventInfo a class which corresponds with Triggers and is used to inform the user which Event occurred. You can use the EventInfo to acquire the EventId, call the EventCallback or acquire the EventOrigin.
  • EventOrigin the pointer to the class where the Event originated from, in short a pointer to the Triggerable.
  • Events a Triggerable will signal an event via a TriggerHandle to a Notifyable. For instance, one can attach the subscriber event HAS_DATA to a WaitSet. This will cause the subscriber to notify the WaitSet via the TriggerHandle every time a sample is received.
  • Notifyable is a class which listens to events. A TriggerHandle which corresponds to a Trigger is used to notify the Notifyable that an event occurred. The WaitSet is a Notifyable.
  • Trigger a class which is used by the Notifyable to acquire the information which events were signalled. It corresponds to a TriggerHandle. If the Notifyable goes out of scope the corresponding TriggerHandle will be invalidated and if the Triggerable goes out of scope the corresponding Trigger will be invalidated.
  • Triggerable a class which has attached a TriggerHandle to itself to signal certain Events to a Notifyable.
  • TriggerHandle a threadsafe class which can be used to trigger a Notifyable. If a TriggerHandle goes out of scope it will detach itself from the Notifyable. A TriggerHandle is logically equal to another Trigger if they:
    • are attached to the same Notifyable (or in other words they are using the same ConditionVariable)
    • have the same EventOrigin
    • have the same callback to verify that they were triggered (hasEventCallback)
    • have the same EventId
  • WaitSet a Notifyable which manages a set of Triggers which correspond to Events. A user may attach or detach events. The WaitSet listens to the whole set of Triggers and if one or more Triggers are triggered by an event it will notify the user. If a WaitSet goes out of scope all attached Triggers will be invalidated.
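
The four conditions for logical equality listed under TriggerHandle can be written down as a comparison of four members. The class TriggerIdentity below is a hypothetical stand-in invented for this sketch; the real iceoryx classes hold more state and are not shown here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in which bundles the four properties from the glossary.
class TriggerIdentity
{
  public:
    using HasEventCallback = bool (*)(const void*);

    TriggerIdentity(const void* const notifyable,
                    const void* const eventOrigin,
                    const HasEventCallback hasEventCallback,
                    const std::uint64_t eventId) noexcept
        : m_notifyable(notifyable)
        , m_eventOrigin(eventOrigin)
        , m_hasEventCallback(hasEventCallback)
        , m_eventId(eventId)
    {
        assert(notifyable != nullptr && eventOrigin != nullptr);
    }

    // Equal only if all four properties match.
    bool isLogicalEqualTo(const TriggerIdentity& rhs) const noexcept
    {
        return m_notifyable == rhs.m_notifyable && m_eventOrigin == rhs.m_eventOrigin
               && m_hasEventCallback == rhs.m_hasEventCallback && m_eventId == rhs.m_eventId;
    }

  private:
    const void* m_notifyable;            // the Notifyable it is attached to
    const void* m_eventOrigin;           // pointer to the Triggerable
    HasEventCallback m_hasEventCallback; // verifies that it was triggered
    std::uint64_t m_eventId;             // user defined, not necessarily unique
};
```

Since the EventId does not need to be unique, two entries with the same id are still distinguishable as long as their origin or callback differs.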

Quick Overview

Events can be attached to or detached from a Notifyable like the WaitSet. The WaitSet listens on Triggers for a signal that an Event has occurred and hands out TriggerHandles to Triggerable objects. A TriggerHandle is used to inform the WaitSet about the occurrence of an Event. When WaitSet::wait() returns, the user is provided with a vector of EventInfos associated with the Events which occurred. The EventOrigin, EventId and EventCallback are stored inside the EventInfo and can be acquired by the user.
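
To make this flow more tangible, here is a heavily simplified, single-threaded sketch. MiniWaitSet, MiniEventInfo and poll() are hypothetical names made up for this illustration. Unlike the real WaitSet, which blocks in wait(), the sketch polls the attached state callbacks and returns immediately.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical miniature of an EventInfo: origin, id and optional callback.
struct MiniEventInfo
{
    void* origin;            // EventOrigin: pointer to the Triggerable
    std::uint64_t eventId;   // EventId chosen by the user
    void (*callback)(void*); // EventCallback, may be nullptr
};

// Hypothetical, non-blocking miniature of a WaitSet.
class MiniWaitSet
{
  public:
    using HasTriggered = bool (*)(const void*);

    void attachEvent(void* origin, std::uint64_t eventId, HasTriggered hasTriggered, void (*callback)(void*))
    {
        assert(origin != nullptr && hasTriggered != nullptr);
        m_entries.push_back(Entry{MiniEventInfo{origin, eventId, callback}, hasTriggered});
    }

    // Collects every attached event whose state currently reports "triggered".
    std::vector<MiniEventInfo> poll() const
    {
        std::vector<MiniEventInfo> occurred;
        for (const auto& entry : m_entries)
        {
            if (entry.hasTriggered(entry.info.origin))
            {
                occurred.push_back(entry.info);
            }
        }
        return occurred;
    }

  private:
    struct Entry
    {
        MiniEventInfo info;
        HasTriggered hasTriggered;
    };
    std::vector<Entry> m_entries;
};
```

The returned vector plays the role of the EventInfo vector: each element carries the origin, the id and the callback that were handed over when the event was attached.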

Reference

| task | call |
|---|---|
| attach subscriber to a WaitSet | waitset.attachEvent(subscriber, iox::popo::SubscriberEvent::HAS_DATA, 123, &mySubscriberCallback) |
| attach user trigger to a WaitSet | waitset.attachEvent(userTrigger, 456, &myUserTriggerCallback) |
| wait for triggers | auto triggerVector = myWaitSet.wait(); |
| wait for triggers with timeout | auto triggerVector = myWaitSet.timedWait(1_s); |
| check if event originated from some object | event.doesOriginateFrom(ptrToSomeObject) |
| get id of the event | event.getEventId() |
| call eventCallback | event() |
| acquire EventOrigin | event.getOrigin<OriginType>(); |

Use cases

This example consists of 5 use cases.

  1. ice_waitset_gateway.cpp: We build a gateway to forward data to another network. A list of subscribers is handled in a uniform way by defining a callback which is executed for every subscriber that has received data.

  2. ice_waitset_grouping: We would like to group multiple subscribers into 2 distinct groups and handle them according to their group membership.

  3. ice_waitset_individual: A list of subscribers where every subscriber is handled differently.

  4. ice_waitset_sync: We use the WaitSet to trigger a cyclic call which should execute an algorithm every second.

  5. ice_waitset_trigger: We create our own class which can be attached to a WaitSet to signal events.

Examples

All our examples require a running iox-roudi and some data to receive, which is sent by iox-ex-waitset-publisher. The publisher does not contain any WaitSet specific logic and is explained in detail in the icedelivery example.

Gateway

We have a list of subscribers which can be subscribed to any arbitrary topic, and every time we receive a sample we would like to send the bytestream to a socket, write it into a file or print it to the console. Whatever we choose to do, we perform the same task for all subscribers.

Let's start by implementing our callback which prints the subscriber pointer, the payload size and the payload pointer to the console.

void subscriberCallback(iox::popo::UntypedSubscriber* const subscriber)
{
    subscriber->take().and_then([&](iox::popo::Sample<const void>& sample) {
        std::cout << "subscriber: " << std::hex << subscriber << " length: " << std::dec
                  << sample.getHeader()->payloadSize << " ptr: " << std::hex << sample.getHeader()->payload()
                  << std::endl;
    });
}

A callback attached to an Event must have the signature void (EventOrigin). In our example the EventOrigin is an iox::popo::UntypedSubscriber pointer which we use to acquire the latest sample by calling take(). If take() was successful, we print our message to the console inside the and_then lambda.
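
The restriction to free functions, static methods and non-capturing lambdas follows from the callback being a plain function pointer. The following sketch shows this with standard C++ only; Counter, CounterCallback, increment, invoke and addTen are hypothetical names chosen for the illustration.

```cpp
#include <cassert>

// Hypothetical origin type used only for this illustration.
struct Counter
{
    int value{0};
};

// The callback type: a plain function pointer which takes the origin.
using CounterCallback = void (*)(Counter* const);

// A free function matches the signature directly.
void increment(Counter* const origin)
{
    ++origin->value;
}

// Calls the stored callback with its origin, similar in spirit to calling an event.
void invoke(const CounterCallback callback, Counter* const origin)
{
    assert(callback != nullptr && origin != nullptr);
    callback(origin);
}

// A non-capturing lambda converts implicitly to the function pointer type.
// A capturing lambda, for example [&], has no such conversion and would not
// compile here.
const CounterCallback addTen = [](Counter* const origin) { origin->value += 10; };
```

Because only the pointer is stored, nothing keeps captured state alive, which is why the lifetime of the callback has to be ensured by the user.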

In our main function we first register at our central broker RouDi. Then we create a WaitSet with storage capacity for 5 events (4 subscribers and one shutdown trigger) and attach our shutdownTrigger to handle CTRL+c events.

iox::popo::WaitSet<NUMBER_OF_SUBSCRIBERS + ONE_SHUTDOWN_TRIGGER> waitset;

waitset.attachEvent(shutdownTrigger).or_else([](auto) {
    std::cerr << "failed to attach shutdown trigger" << std::endl;
    std::terminate();
});

After that we create a vector to hold our subscribers. We create the subscribers and attach them to the WaitSet with the HAS_DATA event and the subscriberCallback. Every time one of the subscribers receives a new sample it triggers the WaitSet.

iox::cxx::vector<iox::popo::UntypedSubscriber, NUMBER_OF_SUBSCRIBERS> subscriberVector;
for (auto i = 0; i < NUMBER_OF_SUBSCRIBERS; ++i)
{
    subscriberVector.emplace_back(iox::capro::ServiceDescription{"Radar", "FrontLeft", "Counter"});
    auto& subscriber = subscriberVector.back();

    waitset.attachEvent(subscriber, iox::popo::SubscriberEvent::HAS_DATA, 0, &subscriberCallback)
        .or_else([&](auto) {
            std::cerr << "failed to attach subscriber" << i << std::endl;
            std::terminate();
        });
}

attachEvent returns a cxx::expected which informs us whether attaching the event succeeded. In the .or_else([&](auto){/*...*/}) part we perform the error handling in case attachEvent failed.
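
The or_else pattern can be illustrated with a small stand-in type. AttachResult, AttachError and FixedSlots are hypothetical names invented for this sketch; iox::cxx::expected is more general and its interface is not reproduced here. The sketch only shows the idea that the handler runs in the error case, for example when the fixed capacity is exhausted.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical error codes for this illustration.
enum class AttachError
{
    NONE,
    WAIT_SET_FULL
};

// Hypothetical result type which only supports the or_else branch.
class AttachResult
{
  public:
    explicit AttachResult(const AttachError error) noexcept
        : m_error(error)
    {
    }

    bool has_error() const noexcept { return m_error != AttachError::NONE; }

    // Invokes the handler only in the error case.
    template <typename Handler>
    void or_else(Handler&& handler) const
    {
        if (has_error())
        {
            handler(m_error);
        }
    }

  private:
    AttachError m_error;
};

// Attaching succeeds until the compile time capacity is exhausted.
template <std::uint64_t Capacity>
class FixedSlots
{
  public:
    AttachResult attach() noexcept
    {
        assert(m_used <= Capacity); // invariant of this sketch
        if (m_used >= Capacity)
        {
            return AttachResult(AttachError::WAIT_SET_FULL);
        }
        ++m_used;
        return AttachResult(AttachError::NONE);
    }

  private:
    std::uint64_t m_used{0U};
};
```

With a FixedSlots<2U>, the first two calls to attach() leave the handler untouched and the third one invokes it with WAIT_SET_FULL.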

Now our system is prepared and ready to work. We enter the event loop which starts with a call to our WaitSet (waitset.wait()). This call blocks until one or more events have triggered the WaitSet. After the call returns, we get a vector filled with EventInfos which correspond to all the events that triggered the WaitSet.

We iterate through this vector. If an Event originated from the shutdownTrigger we exit the program, otherwise we call the assigned callback by calling the event. This then calls subscriberCallback with the EventOrigin (the pointer to the untyped subscriber) as parameter.

while (true)
{
    auto eventVector = waitset.wait();

    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            return (EXIT_SUCCESS);
        }
        else
        {
            (*event)();
        }
    }
}

Grouping

In our next use case we would like to divide the subscribers into two groups and we do not want to attach a callback to them. Instead we perform the calls on the subscribers directly.

We again start by creating a WaitSet with a capacity of 5 (4 subscribers and 1 shutdownTrigger), and attach the shutdownTrigger to handle CTRL+c.

iox::popo::WaitSet<NUMBER_OF_SUBSCRIBERS + ONE_SHUTDOWN_TRIGGER> waitset;

waitset.attachEvent(shutdownTrigger).or_else([](auto) {
    std::cerr << "failed to attach shutdown trigger" << std::endl;
    std::terminate();
});

Now we create a vector of 4 subscribers.

iox::cxx::vector<iox::popo::UntypedSubscriber, NUMBER_OF_SUBSCRIBERS> subscriberVector;
for (auto i = 0; i < NUMBER_OF_SUBSCRIBERS; ++i)
{
    subscriberVector.emplace_back(iox::capro::ServiceDescription{"Radar", "FrontLeft", "Counter"});
    auto& subscriber = subscriberVector.back();
}

After that we define our two groups with the ids FIRST_GROUP_ID and SECOND_GROUP_ID and attach the first two subscribers to the first group and the remaining subscribers to the second group.

for (auto i = 0; i < NUMBER_OF_SUBSCRIBERS / 2; ++i)
{
    waitset.attachEvent(subscriberVector[i], iox::popo::SubscriberEvent::HAS_DATA, FIRST_GROUP_ID)
        .or_else([&](auto) {
            std::cerr << "failed to attach subscriber" << i << std::endl;
            std::terminate();
        });
}

for (auto i = NUMBER_OF_SUBSCRIBERS / 2; i < NUMBER_OF_SUBSCRIBERS; ++i)
{
    waitset.attachEvent(subscriberVector[i], iox::popo::SubscriberEvent::HAS_DATA, SECOND_GROUP_ID)
        .or_else([&](auto) {
            std::cerr << "failed to attach subscriber" << i << std::endl;
            std::terminate();
        });
}

The event loop calls auto eventVector = waitset.wait() in a blocking call to receive a vector of all the EventInfos which correspond to the events that occurred. If the Event originated from the shutdownTrigger we terminate the program.

while (true)
{
    auto eventVector = waitset.wait();

    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            return (EXIT_SUCCESS);
        }

The remaining part of the loop handles the subscribers. In the first group we would like to print the received data to the console and in the second group we just dismiss the received data.

        else if (event->getEventId() == FIRST_GROUP_ID)
        {
            auto subscriber = event->getOrigin<iox::popo::UntypedSubscriber>();
            subscriber->take().and_then([&](iox::popo::Sample<const void>& sample) {
                const CounterTopic* data = reinterpret_cast<const CounterTopic*>(sample.get());
                std::cout << "received: " << std::dec << data->counter << std::endl;
            });
        }
        else if (event->getEventId() == SECOND_GROUP_ID)
        {
            std::cout << "dismiss data\n";
            auto subscriber = event->getOrigin<iox::popo::UntypedSubscriber>();
            subscriber->releaseQueuedData();
        }

Important: The second group needs to release all queued samples, otherwise the WaitSet would notify the user again and again that the subscriber from the second group has new samples.
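
Why the queue has to be emptied can be shown with a small simulation. FakeSubscriber and countWakeUps are hypothetical names for this sketch; they only mimic a state-based "has data" condition and are not the iceoryx subscriber.

```cpp
#include <cassert>
#include <queue>

// Hypothetical stand-in for a subscriber with a sample queue.
class FakeSubscriber
{
  public:
    void receive(const int sample) { m_queue.push(sample); }
    bool hasData() const noexcept { return !m_queue.empty(); }
    void releaseQueuedData() { m_queue = std::queue<int>(); }

  private:
    std::queue<int> m_queue;
};

// Simulates `rounds` iterations of an event loop. The "has data" condition is
// state based, so every round with a non-empty queue counts as a wake-up.
inline int countWakeUps(FakeSubscriber& subscriber, const int rounds, const bool releaseOnWakeUp)
{
    assert(rounds >= 0);
    int wakeUps = 0;
    for (int i = 0; i < rounds; ++i)
    {
        if (!subscriber.hasData())
        {
            continue;
        }
        ++wakeUps;
        if (releaseOnWakeUp)
        {
            subscriber.releaseQueuedData();
        }
    }
    return wakeUps;
}
```

If the samples are neither taken nor released, every round results in a wake-up. Releasing the queue on the first wake-up ends the notifications until new samples arrive.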

Individual

When every Triggerable requires a different reaction we need to know the origin of an Event. We can call event.doesOriginateFrom(EventOrigin) which will return true if the event originated from EventOrigin and otherwise false.

We start this example by creating a WaitSet with the default capacity and attaching the shutdownTrigger to handle CTRL-c.

iox::popo::WaitSet<> waitset;

waitset.attachEvent(shutdownTrigger).or_else([](auto) {
    std::cerr << "failed to attach shutdown trigger" << std::endl;
    std::terminate();
});

Additionally, we create two subscribers and attach them to the waitset to let them inform us whenever they receive a new sample.

iox::popo::Subscriber<CounterTopic> subscriber1({"Radar", "FrontLeft", "Counter"});
iox::popo::Subscriber<CounterTopic> subscriber2({"Radar", "FrontLeft", "Counter"});

waitset.attachEvent(subscriber1, iox::popo::SubscriberEvent::HAS_DATA).or_else([](auto) {
    std::cerr << "failed to attach subscriber1" << std::endl;
    std::terminate();
});
waitset.attachEvent(subscriber2, iox::popo::SubscriberEvent::HAS_DATA).or_else([](auto) {
    std::cerr << "failed to attach subscriber2" << std::endl;
    std::terminate();
});

With that set up we enter the event loop and handle the program termination first.

while (true)
{
    auto eventVector = waitset.wait();

    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            return (EXIT_SUCCESS);
        }

When the origin is subscriber1 we would like to state that subscriber 1 has received the following number X. But for subscriber2 we just dismiss the received samples. We accomplish this by asking the event if it originated from the corresponding subscriber. If so we act.

        else if (event->doesOriginateFrom(&subscriber1))
        {
            subscriber1.take().and_then([&](iox::popo::Sample<const CounterTopic>& sample) {
                std::cout << " subscriber 1 received: " << sample->counter << std::endl;
            });
        }
        else if (event->doesOriginateFrom(&subscriber2))
        {
            subscriber2.releaseQueuedData();
            std::cout << "subscriber 2 received something - dont care\n";
        }
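
The origin-based dispatch of this example boils down to a pointer comparison. The sketch below uses the hypothetical names OriginInfo, Sensor and react to show the idea with standard C++ only; the real EventInfo offers more than this.

```cpp
#include <cassert>
#include <string>

// Hypothetical miniature of an EventInfo which only remembers its origin.
class OriginInfo
{
  public:
    template <typename T>
    explicit OriginInfo(T* const origin) noexcept
        : m_origin(origin)
    {
        assert(origin != nullptr);
    }

    // True if the stored origin is the very same object as the candidate.
    template <typename T>
    bool doesOriginateFrom(const T* const candidate) const noexcept
    {
        return m_origin == static_cast<const void*>(candidate);
    }

  private:
    const void* m_origin;
};

// Hypothetical Triggerable for this illustration.
struct Sensor
{
    int id;
};

// Chooses a different reaction depending on where the event came from.
inline std::string react(const OriginInfo& event, const Sensor& first, const Sensor& second)
{
    if (event.doesOriginateFrom(&first))
    {
        return "print sample";
    }
    else if (event.doesOriginateFrom(&second))
    {
        return "dismiss sample";
    }
    return "unknown origin";
}
```

Since the comparison is based on addresses, two distinct objects are never confused, even if they subscribe to the same topic like subscriber1 and subscriber2 above.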

Sync

Let's say we have SomeClass and would like to execute a cyclic static method cyclicRun every second. We could execute any arbitrary algorithm in there but for now we just print activation callback. The class could look like this:

class SomeClass
{
  public:
    static void cyclicRun(iox::popo::UserTrigger* trigger)
    {
        std::cout << "activation callback\n";
        trigger->resetTrigger();
    }
};

Important: We need to reset the user trigger, otherwise the WaitSet would notify us immediately again since the user trigger is state based.

We begin as always, by creating a WaitSet with the default capacity and by attaching the shutdownTrigger to it. In this case we do not set an event id when calling attachEvent which means the default event id EventInfo::INVALID_ID is set.

iox::popo::WaitSet<> waitset;

// attach shutdownTrigger to handle CTRL+C
waitset.attachEvent(shutdownTrigger).or_else([](auto) {
    std::cerr << "failed to attach shutdown trigger" << std::endl;
    std::terminate();
});

After that we require a cyclicTrigger to trigger our cyclicRun every second. Therefore, we attach it to the waitset with eventId 0 and the callback SomeClass::cyclicRun.

iox::popo::UserTrigger cyclicTrigger;
waitset.attachEvent(cyclicTrigger, 0U, &SomeClass::cyclicRun).or_else([](auto) {
    std::cerr << "failed to attach cyclic trigger" << std::endl;
    std::terminate();
});

The next thing we need is something which will trigger our cyclicTrigger every second. We use an infinite loop packed inside of a thread.

std::thread cyclicTriggerThread([&] {
    while (keepRunning.load())
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        cyclicTrigger.trigger();
    }
});

Everything is set up and we can implement the event loop. As usual we handle CTRL-c which is indicated by the shutdownTrigger.

while (keepRunning.load())
{
    auto eventVector = waitset.wait();
    
    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            keepRunning.store(false);
        }

The cyclicTrigger callback is called in the else part.

        else
        {
            (*event)();
        }

Trigger

In this example we describe how to implement a Triggerable class which can be attached to a WaitSet. Our class is called MyTriggerClass and it signals two events to the WaitSet: the PERFORMED_ACTION event, which is triggered whenever the method performAction is called, and the ACTIVATE event, which is triggered when activate is called with an activationCode.

MyTriggerClass

At the moment the WaitSet does not support Triggerable classes which are movable or copyable. This is caused by the resetCallback and the hasEventCallback which are pointing to the Triggerable. After a move the callbacks inside of the WaitSet would point to the wrong memory location and a copy could lead to an unattached object if there is no more space left in the WaitSet. Therefore we have to delete the move and copy operations for now.

    MyTriggerClass(const MyTriggerClass&) = delete;
    MyTriggerClass(MyTriggerClass&&) = delete;
    MyTriggerClass& operator=(const MyTriggerClass&) = delete;
    MyTriggerClass& operator=(MyTriggerClass&&) = delete;
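
The effect of the deleted operations can be checked at compile time. The following sketch uses the hypothetical classes PinnedTriggerable and AddressRegistry; the registry merely stands in for any component that stores the address of an attached object, which is the situation described above.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical Triggerable; copy and move are deleted as in MyTriggerClass.
class PinnedTriggerable
{
  public:
    PinnedTriggerable() noexcept = default;
    PinnedTriggerable(const PinnedTriggerable&) = delete;
    PinnedTriggerable(PinnedTriggerable&&) = delete;
    PinnedTriggerable& operator=(const PinnedTriggerable&) = delete;
    PinnedTriggerable& operator=(PinnedTriggerable&&) = delete;
};

// Stands in for a component which keeps a raw pointer to the attached object.
class AddressRegistry
{
  public:
    void attach(const PinnedTriggerable& object) noexcept
    {
        assert(m_attached == nullptr); // a single slot keeps the sketch short
        m_attached = &object;
    }

    bool isAttached(const PinnedTriggerable& object) const noexcept
    {
        return m_attached == &object;
    }

  private:
    const PinnedTriggerable* m_attached{nullptr};
};

// The stored address stays valid because the object can be neither copied nor moved.
static_assert(!std::is_copy_constructible<PinnedTriggerable>::value, "must not be copyable");
static_assert(!std::is_move_constructible<PinnedTriggerable>::value, "must not be movable");
static_assert(!std::is_copy_assignable<PinnedTriggerable>::value, "must not be copy assignable");
static_assert(!std::is_move_assignable<PinnedTriggerable>::value, "must not be move assignable");
```

Any attempt to copy or move such an object fails to compile, so a stored pointer cannot silently end up referring to a moved-from object.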

The class implementation of these two methods could look like the following.

class MyTriggerClass
{
  public:
    void activate(const int activationCode) noexcept
    {
        m_activationCode = activationCode;
        m_isActivated = true;
        m_activateTrigger.trigger();
    }

    void performAction() noexcept
    {
        m_hasPerformedAction = true;
        m_onActionTrigger.trigger();
    }

As you can see, we perform some internal action and when it is finished we signal via our stored TriggerHandle to the corresponding Trigger that we performed the task. Internally we just set a boolean to signal that the method was called.

Every Trigger requires a corresponding class method which returns a boolean stating if the Trigger was actually triggered or not. In our case these are the two const methods hasPerformedAction and isActivated.

    bool hasPerformedAction() const noexcept
    {
        return m_hasPerformedAction;
    }

    bool isActivated() const noexcept
    {
        return m_isActivated;
    }

The following methods should not be accessible to the public but must be accessible to any Notifyable like the WaitSet. To avoid having to befriend every possible Notifyable, we created the EventAttorney. Every Triggerable has to befriend the EventAttorney, which provides all Notifyables with access to the private methods enableEvent, disableEvent, invalidateTrigger and getHasTriggeredCallbackForEvent.

    friend iox::popo::EventAttorney;
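
The idea behind such an attorney class can be sketched in a few lines. DemoTriggerable, DemoAttorney and DemoNotifier are hypothetical names for this illustration, and the forwarding functions are public here for brevity; how iox::popo::EventAttorney restricts access internally is not shown and may differ.

```cpp
#include <cassert>

class DemoAttorney; // forward declaration for the friend statement

// Hypothetical Triggerable: the attach and detach hooks are private.
class DemoTriggerable
{
  public:
    bool isEnabled() const noexcept { return m_enabled; }

  private:
    friend class DemoAttorney; // the only friend, however many notifier types exist

    void enableEvent() noexcept { m_enabled = true; }
    void disableEvent() noexcept { m_enabled = false; }

    bool m_enabled{false};
};

// The attorney forwards exactly the calls which notifiers are allowed to make.
class DemoAttorney
{
  public:
    static void enableEvent(DemoTriggerable& origin) noexcept { origin.enableEvent(); }
    static void disableEvent(DemoTriggerable& origin) noexcept { origin.disableEvent(); }
};

// A notifier goes through the attorney without being a friend itself.
class DemoNotifier
{
  public:
    void attach(DemoTriggerable& origin) noexcept
    {
        assert(!origin.isEnabled());
        DemoAttorney::enableEvent(origin);
    }

    void detach(DemoTriggerable& origin) noexcept
    {
        DemoAttorney::disableEvent(origin);
    }
};
```

Adding another notifier type requires no change to DemoTriggerable, since the friendship is granted to the attorney only.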

The method enableEvent is called by the WaitSet when MyTriggerClass is being attached to it. During that process the WaitSet creates a triggerHandle and forwards the event to which this handle belongs.

In the switch case statement we assign the triggerHandle to the corresponding internal trigger handle.

    void enableEvent(iox::popo::TriggerHandle&& triggerHandle,
                     const MyTriggerClassEvents event) noexcept
    {
        switch (event)
        {
        case MyTriggerClassEvents::PERFORMED_ACTION:
            m_onActionTrigger = std::move(triggerHandle);
            break;
        case MyTriggerClassEvents::ACTIVATE:
            m_activateTrigger = std::move(triggerHandle);
            break;
        }
    }

The next thing on our checklist is the invalidateTrigger method, which the WaitSet uses to reset the Trigger when it goes out of scope. We look up the trigger handle with the matching unique trigger id and invalidate it to make it unusable in the future.

    void invalidateTrigger(const uint64_t uniqueTriggerId)
    {
        if (m_onActionTrigger.getUniqueId() == uniqueTriggerId)
        {
            m_onActionTrigger.invalidate();
        }
        else if (m_activateTrigger.getUniqueId() == uniqueTriggerId)
        {
            m_activateTrigger.invalidate();
        }
    }

Detaching an event in the WaitSet leads to a call to disableEvent in our class. In this case we have to reset the corresponding trigger to invalidate it and release it from the WaitSet. Like before, we use a switch case statement to find the trigger corresponding to the event.

    void disableEvent(const MyTriggerClassEvents event) noexcept
    {
        switch (event)
        {
        case MyTriggerClassEvents::PERFORMED_ACTION:
            m_onActionTrigger.reset();
            break;
        case MyTriggerClassEvents::ACTIVATE:
            m_activateTrigger.reset();
            break;
        }
    }

The last method we have to implement is getHasTriggeredCallbackForEvent. The WaitSet is state based and therefore requires, besides the condition variable which only states that something has happened, a callback to find the object where it happened. This is the hasTriggerCallback. In our case we return the method pointer to either hasPerformedAction or isActivated, depending on which event was requested.

    iox::popo::WaitSetHasTriggeredCallback 
    getHasTriggeredCallbackForEvent(const MyTriggerClassEvents event) const noexcept
    {
        switch (event)
        {
        case MyTriggerClassEvents::PERFORMED_ACTION:
            return {*this, &MyTriggerClass::hasPerformedAction};
        case MyTriggerClassEvents::ACTIVATE:
            return {*this, &MyTriggerClass::isActivated};
        }
        return {};
    }
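
A callback of this kind is essentially an object pointer combined with a pointer to a const method. The sketch below shows one possible shape with standard C++ only. BoundBoolMethod, Door, DoorEvent and callbackFor are hypothetical names, and the sketch does not claim to match the implementation of iox::popo::WaitSetHasTriggeredCallback.

```cpp
#include <cassert>

// Hypothetical callback: an object pointer plus one of its const bool methods.
template <typename T>
class BoundBoolMethod
{
  public:
    using Method = bool (T::*)() const;

    BoundBoolMethod() noexcept = default; // empty callback, evaluates to false

    BoundBoolMethod(const T& object, const Method method) noexcept
        : m_object(&object)
        , m_method(method)
    {
        assert(method != nullptr);
    }

    bool isValid() const noexcept { return m_object != nullptr && m_method != nullptr; }

    // Asks the bound object for its current state.
    bool operator()() const noexcept { return isValid() && (m_object->*m_method)(); }

  private:
    const T* m_object{nullptr};
    Method m_method{nullptr};
};

// Hypothetical Triggerable with two state queries.
class Door
{
  public:
    void open() noexcept { m_isOpen = true; }
    void lock() noexcept { m_isLocked = true; }
    bool isOpen() const noexcept { return m_isOpen; }
    bool isLocked() const noexcept { return m_isLocked; }

  private:
    bool m_isOpen{false};
    bool m_isLocked{false};
};

enum class DoorEvent
{
    OPENED,
    LOCKED
};

// Selects the state query which belongs to the requested event.
inline BoundBoolMethod<Door> callbackFor(const Door& door, const DoorEvent event) noexcept
{
    switch (event)
    {
    case DoorEvent::OPENED:
        return {door, &Door::isOpen};
    case DoorEvent::LOCKED:
        return {door, &Door::isLocked};
    }
    return {};
}
```

The callback stores the address of the object, which is one more reason why such an object must not be moved while it is attached.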

Using MyTriggerClass

The next thing we define is a free function, our eventLoop, which handles all events of our waitset. The action is the same for every trigger: reset the MyTriggerClass event and then call the callback which is attached to the trigger.

void eventLoop()
{
    while (true)
    {
        auto triggerStateVector = waitset->wait();
        for (auto& triggerState : triggerStateVector)
        {
            if (triggerState->getEventId() == ACTIVATE_ID)
            {
                triggerState->getOrigin<MyTriggerClass>()->reset(MyTriggerClassEvents::ACTIVATE);
                (*triggerState)();
            }
            else if (triggerState->getEventId() == ACTION_ID)
            {
                triggerState->getOrigin<MyTriggerClass>()->reset(MyTriggerClassEvents::PERFORMED_ACTION);
                (*triggerState)();
            }
        }
    }
}

We start like in every other example by creating the waitset first. In this case the waitset and the triggerClass are stored inside of two global optionals and have to be created with an emplace call.

waitset.emplace();
triggerClass.emplace();

After that we can attach both triggerClass events to the waitset and provide also a callback for them.

    waitset->attachEvent(*triggerClass, MyTriggerClassEvents::ACTIVATE, ACTIVATE_ID, &callOnActivate).or_else([](auto) {
        std::cerr << "failed to attach MyTriggerClass::ACTIVATE event " << std::endl;
        std::terminate();
    });
    waitset
        ->attachEvent(*triggerClass, MyTriggerClassEvents::PERFORMED_ACTION, ACTION_ID, &MyTriggerClass::callOnAction)
        .or_else([](auto) {
            std::cerr << "failed to attach MyTriggerClass::PERFORMED_ACTION event " << std::endl;
            std::terminate();
        });

Now that everything is set up we can start our eventLoop in a new thread.

    std::thread eventLoopThread(eventLoop);

A thread which will trigger an event every second is started with the following lines.

    std::thread triggerThread([&] {
        int activationCode = 1;
        for (auto i = 0; i < 10; ++i)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            triggerClass->activate(activationCode++);
            std::this_thread::sleep_for(std::chrono::seconds(1));
            triggerClass->performAction();
        }
    });