From 767458e450a3d319f37a0fa1e6cf56af86a3f8e1 Mon Sep 17 00:00:00 2001 From: Adrian Del Grosso <10929341+ad3154@users.noreply.github.com> Date: Thu, 28 Mar 2024 14:55:23 -0600 Subject: [PATCH] [Core]: Add support for CMSIS RTOS 2 Thread synchronization --- CMakeLists.txt | 11 + .../can_hardware_interface.hpp | 12 +- .../src/can_hardware_interface.cpp | 57 ++- isobus/src/isobus_task_controller_client.cpp | 2 +- isobus/src/isobus_virtual_terminal_client.cpp | 2 +- .../isobus/utility/thread_synchronization.hpp | 366 +++++++++++++++++- 6 files changed, 441 insertions(+), 9 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 97f6c4a8..2639ecd7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,6 +38,17 @@ if(NOT CAN_STACK_DISABLE_THREADS AND NOT ARDUINO) endif() endif() +option( + USE_CMSIS_RTOS2_THREADING + "Set to ON to use ARM CMSIS RTOS2 thread syncronization. Replaces std::mutex and implements a CMSIS lock_guard." + OFF) +if(USE_CMSIS_RTOS2_THREADING) + message( + AUTHOR_WARNING + "Using CMSIS RTOS2 threading requires you to implement a hardware timebase (_gettimeofday) for the stack. Make sure you do this using a hardware timer!" + ) +endif() + # A handy function to prepend text to all elements in a list (useful for # subdirectories) function(prepend var prefix) diff --git a/hardware_integration/include/isobus/hardware_integration/can_hardware_interface.hpp b/hardware_integration/include/isobus/hardware_integration/can_hardware_interface.hpp index 9b836222..46d19e5d 100644 --- a/hardware_integration/include/isobus/hardware_integration/can_hardware_interface.hpp +++ b/hardware_integration/include/isobus/hardware_integration/can_hardware_interface.hpp @@ -146,9 +146,13 @@ namespace isobus void stop_threads(); /// @brief The receiving thread loop for this CAN channel +#ifdef USE_CMSIS_RTOS2_THREADING + static void receive_thread_function(void *parent); +#else void receive_thread_function(); +#endif - std::unique_ptr receiveMessageThread; ///< Thread to manage getting messages from a CAN channel + std::unique_ptr receiveMessageThread; ///< Thread to manage getting messages from a CAN channel bool receiveThreadRunning = false; ///< Flag to indicate if the receive thread is running #endif @@ -170,7 +174,11 @@ namespace isobus virtual ~CANHardwareInterface(); /// @brief The main thread loop for updating the stack +#if defined USE_CMSIS_RTOS2_THREADING + static void update_thread_function(void *); +#else static void update_thread_function(); +#endif /// @brief Starts all threads related to the hardware interface static void start_threads(); @@ -178,7 +186,7 @@ namespace isobus /// @brief Stops all threads related to the hardware interface static void stop_threads(); - static std::unique_ptr updateThread; ///< The main thread + static std::unique_ptr updateThread; ///< The main thread static std::condition_variable updateThreadWakeupCondition; ///< A condition variable to allow for signaling the `updateThread` to wakeup #endif static std::uint32_t lastUpdateTimestamp; ///< The last time the network manager was updated diff --git a/hardware_integration/src/can_hardware_interface.cpp b/hardware_integration/src/can_hardware_interface.cpp index 17b9d2da..e6f0768b 100644 --- a/hardware_integration/src/can_hardware_interface.cpp +++ b/hardware_integration/src/can_hardware_interface.cpp @@ -18,7 +18,7 @@ namespace isobus { #if !defined CAN_STACK_DISABLE_THREADS && !defined ARDUINO - std::unique_ptr CANHardwareInterface::updateThread; + std::unique_ptr CANHardwareInterface::updateThread; std::condition_variable CANHardwareInterface::updateThreadWakeupCondition; #endif std::uint32_t CANHardwareInterface::periodicUpdateInterval = PERIODIC_UPDATE_INTERVAL; @@ -112,7 +112,11 @@ namespace isobus receiveThreadRunning = true; if (nullptr == receiveMessageThread) { - receiveMessageThread.reset(new std::thread([this]() { receive_thread_function(); })); +#ifdef USE_CMSIS_RTOS2_THREADING + receiveMessageThread.reset(new Thread(receive_thread_function, this)); +#else + receiveMessageThread.reset(new Thread([this]() { receive_thread_function(); })); +#endif } } @@ -129,6 +133,31 @@ namespace isobus } } +#ifdef USE_CMSIS_RTOS2_THREADING + void CANHardwareInterface::CANHardware::receive_thread_function(void *parent) + { + auto target = static_cast(parent); + while (target->receiveThreadRunning) + { + if ((nullptr != target->frameHandler) && target->frameHandler->get_is_valid()) + { + if (!target->receive_can_frame()) + { + // There was no frame to receive, and osThreadYield may not exist + osDelay(1); + } + else + { + CANHardwareInterface::updateThreadWakeupCondition.notify_all(); + } + } + else + { + osDelay(1000); // Arbitrary, but don't want to infinite loop on the validity check. + } + } + } +#else void CANHardwareInterface::CANHardware::receive_thread_function() { while (receiveThreadRunning) @@ -151,6 +180,7 @@ namespace isobus } } } +#endif #endif bool send_can_message_frame_to_hardware(const CANMessageFrame &frame) @@ -407,6 +437,22 @@ namespace isobus } #if !defined CAN_STACK_DISABLE_THREADS && !defined ARDUINO + +#if defined USE_CMSIS_RTOS2_THREADING + void CANHardwareInterface::update_thread_function(void *) + { + hardwareChannelsMutex.lock(); + // Wait until everything is running + hardwareChannelsMutex.unlock(); + + while (started) + { + LOCK_GUARD(Mutex, updateMutex); + updateThreadWakeupCondition.wait_for(updateMutex, periodicUpdateInterval); // Update with at least the periodic interval + update(); + } + } +#else void CANHardwareInterface::update_thread_function() { std::unique_lock hardwareLock(hardwareChannelsMutex); @@ -420,13 +466,18 @@ namespace isobus update(); } } +#endif void CANHardwareInterface::start_threads() { started = true; if (nullptr == updateThread) { - updateThread.reset(new std::thread(update_thread_function)); +#ifdef USE_CMSIS_RTOS2_THREADING + updateThread.reset(new Thread(update_thread_function, nullptr)); +#else + updateThread.reset(new Thread(update_thread_function)); +#endif } } diff --git a/isobus/src/isobus_task_controller_client.cpp b/isobus/src/isobus_task_controller_client.cpp index 9e19140b..9f6f4531 100644 --- a/isobus/src/isobus_task_controller_client.cpp +++ b/isobus/src/isobus_task_controller_client.cpp @@ -60,7 +60,7 @@ namespace isobus if (!initialized) { -#if !defined CAN_STACK_DISABLE_THREADS && !defined ARDUINO +#if !defined CAN_STACK_DISABLE_THREADS && !defined ARDUINO && !defined USE_CMSIS_RTOS2_THREADING if (spawnThread) { workerThread = new std::thread([this]() { worker_thread_function(); }); diff --git a/isobus/src/isobus_virtual_terminal_client.cpp b/isobus/src/isobus_virtual_terminal_client.cpp index b8af3ec6..e3bdec9e 100644 --- a/isobus/src/isobus_virtual_terminal_client.cpp +++ b/isobus/src/isobus_virtual_terminal_client.cpp @@ -60,7 +60,7 @@ namespace isobus { languageCommandInterface.initialize(); } -#if !defined CAN_STACK_DISABLE_THREADS && !defined ARDUINO +#if !defined CAN_STACK_DISABLE_THREADS && !defined ARDUINO && !defined USE_CMSIS_RTOS2_THREADING if (spawnThread) { workerThread = new std::thread([this]() { worker_thread_function(); }); diff --git a/utility/include/isobus/utility/thread_synchronization.hpp b/utility/include/isobus/utility/thread_synchronization.hpp index bae1e7b7..909467eb 100644 --- a/utility/include/isobus/utility/thread_synchronization.hpp +++ b/utility/include/isobus/utility/thread_synchronization.hpp @@ -14,14 +14,24 @@ namespace isobus { - /// @brief A dummy mutex class when treading is disabled. + /// @brief A dummy mutex class when threading is disabled. class Mutex { }; - /// @brief A dummy recursive mutex class when treading is disabled. + /// @brief A dummy recursive mutex class when threading is disabled. class RecursiveMutex { }; + + /// @brief A dummy condition variable for when threading is disabled + class ConditionVariable + { + }; + + /// @brief A dummy thread for when threading is disabled + class Thread + { + }; } /// @brief Disabled LOCK_GUARD macro since threads are disabled. #define LOCK_GUARD(type, x) @@ -88,16 +98,368 @@ class LockFreeQueue std::queue queue; ///< The queue }; +#elif defined USE_CMSIS_RTOS2_THREADING + +#include "cmsis_os.h" + +#include +#include +#include +#include + +namespace isobus +{ + /// @brief A wrapper around a CMSIS RTOS 2 mutex. + /// @details See definition at https://www.keil.com/pack/doc/CMSIS/RTOS2/html/group__CMSIS__RTOS.html + class Mutex + { + public: + /// @brief Constructor for the CMSIS RTOS2 mutex wrapper + Mutex() : + handle(nullptr) + { + } + + /// @brief Locks the mutex. Part of BasicLockable requirements. + void lock() + { + if (ready()) + { + osStatus_t osRetVal = osMutexAcquire(handle, osWaitForever); + + if (osOK != osRetVal) + { + while (true) + { + // If your code is stuck in here, that means you did something + // very wrong, like recursively locked this mutex, or tried to + // lock the mutex before the OS was initialized, or called this in + // an interrupt service routine. + // osRetVal may contain more information. + } + } + } + } + + /// @brief Attempts to the mutex, and doesn't wait if it's not available. + /// @returns true if the mutex was successfully locked, false otherwise. + bool try_lock() + { + bool retVal = false; + + if (ready()) + { + osStatus_t osRetVal = osMutexAcquire(handle, 0); + retVal = (osOK == osRetVal); + } + return retVal; + } + + /// @brief Unlocks the mutex. Part of BasicLockable requirements. + void unlock() + { + if (nullptr != handle) + { + osStatus_t osRetVal = osMutexRelease(handle); + + if (osOK != osRetVal) + { + while (true) + { + // If your code is stuck in here, that means you + // either tried to release a mutex which is owned by a different thread, + // or the release failed due to some other OS reason. + // osRetVal may contain more information. + } + } + } + else + { + while (true) + { + // If your code is stuck in here, it's because you tried to unlock a + // mutex which doesn't exist. Don't do that. + // osRetVal may contain more information. + } + } + } + + protected: + /// @brief Checks if the mutex is ready to be used. Initializes the mutex if it's not. + /// @returns true if the mutex is ready to be used, false otherwise. + virtual bool ready() + { + if (nullptr == handle) + { + const osMutexAttr_t attributes = { + nullptr, +#ifdef osCMSIS_FreeRTOS // FreeRTOS doesn't support robust mutexes + osMutexPrioInherit, +#else + osMutexPrioInherit | osMutexRobust, +#endif + nullptr, + 0 + }; + handle = osMutexNew(&attributes); + + while (nullptr == handle) + { + // If your code is stuck in here, it means + // the RTOS didn't have enough memory available to make another + // mutex. Increase the global memory pool (sometimes called the OS heap) + // or reduce its usage by statically allocating your thread's stack(s) + } + } + return nullptr != handle; + } + osMutexId_t handle; ///< Mutex ID for reference by other functions or NULL in case of error or not yet initialized + }; + + /// @brief A wrapper around a CMSIS RTOS 2 recursive mutex. + /// @details See definition at https://www.keil.com/pack/doc/CMSIS/RTOS2/html/group__CMSIS__RTOS.html + class RecursiveMutex : public Mutex + { + protected: + /// @brief Checks if the mutex is ready to be used. + /// Initializes the mutex if it's not. + /// @returns true if the mutex is ready to be used, false otherwise. + bool ready() override + { + if (nullptr == handle) + { + const osMutexAttr_t attributes = { + nullptr, +#ifdef osCMSIS_FreeRTOS // FreeRTOS doesn't support robust mutexes + osMutexPrioInherit | osMutexRecursive, +#else + osMutexPrioInherit | osMutexRobust | osMutexRecursive, +#endif + nullptr, + 0 + }; + handle = osMutexNew(&attributes); + } + return nullptr != handle; + } + }; + + template + /// @brief A class to automatically lock and unlock a mutex when the scope ends. + /// Meant for systems with no support for std::lock_guard. + class LockGuard + { + public: + /// @brief Constructor for the LockGuard class. + /// @param mutex The mutex to lock. + /// @details Locks the mutex when the scope starts. + /// Unlocks the mutex when the scope ends. + LockGuard(T *mutex) : + lockable(mutex) + { + lockable->lock(); + } + + /// @brief Destructor for the LockGuard class. + /// @details Unlocks the mutex when the scope ends. + ~LockGuard() + { + lockable->unlock(); + } + + private: + T *lockable; ///< The mutex to lock and unlock. + }; + + /// @brief A class that emulates a condition variable using CMSIS thread flags + class ConditionVariable + { + public: + ConditionVariable() = default; + + void wait_for(Mutex &, std::uint32_t timeout) + { + targetThread = osThreadGetId(); + osThreadFlagsWait(0x00000001, osFlagsWaitAny, timeout); + } + + void notify_all() + { + osThreadFlagsSet(targetThread, 0x00000001); + } + + private: + osThreadId_t targetThread = NULL; + }; + + class Thread + { + public: + Thread(osThreadFunc_t threadFunction, void *argument) + { + osThreadAttr_t attributes = { + NULL, + osThreadDetached, + NULL, + 0, + NULL, + 0, + osPriorityNormal, + 0, + 0 + }; + myID = osThreadNew(threadFunction, argument, &attributes); + if (NULL == myID) + { + while (true) + { + // If your code is stuck in here, it means you don't have enough OS memory left to spawn the thread. + // Consider increasing it. + } + } + else + { + isJoinable = true; + } + } + + bool joinable() const + { + return isJoinable; + } + + void join() + { + if (isJoinable) + { + osThreadTerminate(myID); + } + } + + private: + osThreadId_t myID = NULL; + bool isJoinable = false; + }; +} // namespace isobus + +namespace std +{ + using mutex = isobus::Mutex; + using recursive_mutex = isobus::RecursiveMutex; + using condition_variable = isobus::ConditionVariable; +} // namespace std + +#define LOCK_GUARD(type, x) const LockGuard x##Lock(&x) + +/// @brief A template class for a lock free queue. +/// @tparam T The item type for the queue. +template +class LockFreeQueue +{ +public: + /// @brief Constructor for the lock free queue. + explicit LockFreeQueue(std::size_t size) : + buffer(size), capacity(size) + { + // Validate the size of the queue, if assertion is disabled, set the size to 1. + assert(size > 0 && "The size of the queue must be greater than 0."); + if (size == 0) + { + size = 1; + } + } + + /// @brief Push an item to the queue. + /// @param item The item to push to the queue. + /// @return True if the item was pushed to the queue, false if the queue is full. + bool push(const T &item) + { + const auto currentWriteIndex = writeIndex.load(std::memory_order_relaxed); + const auto nextWriteIndex = nextIndex(currentWriteIndex); + + if (nextWriteIndex == readIndex.load(std::memory_order_acquire)) + { + // The buffer is full. + return false; + } + + buffer[currentWriteIndex] = item; + writeIndex.store(nextWriteIndex, std::memory_order_release); + return true; + } + + /// @brief Peek at the next item in the queue. + /// @param item The item to peek at in the queue. + /// @return True if the item was peeked at in the queue, false if the queue is empty. + bool peek(T &item) + { + const auto currentReadIndex = readIndex.load(std::memory_order_relaxed); + if (currentReadIndex == writeIndex.load(std::memory_order_acquire)) + { + // The buffer is empty. + return false; + } + + item = buffer[currentReadIndex]; + return true; + } + + /// @brief Pop an item from the queue. + /// @return True if the item was popped from the queue, false if the queue is empty. + bool pop() + { + const auto currentReadIndex = readIndex.load(std::memory_order_relaxed); + if (currentReadIndex == writeIndex.load(std::memory_order_acquire)) + { + // The buffer is empty. + return false; + } + + readIndex.store(nextIndex(currentReadIndex), std::memory_order_release); + return true; + } + + /// @brief Check if the queue is full. + /// @return True if the queue is full, false if the queue is not full. + bool is_full() const + { + return nextIndex(writeIndex.load(std::memory_order_acquire)) == readIndex.load(std::memory_order_acquire); + } + + /// @brief Clear the queue. + void clear() + { + // Simply move the read index to the write index. + readIndex.store(writeIndex.load(std::memory_order_acquire), std::memory_order_release); + } + +private: + std::vector buffer; ///< The buffer for the circular buffer. + std::atomic readIndex = { 0 }; ///< The read index for the circular buffer. + std::atomic writeIndex = { 0 }; ///< The write index for the circular buffer. + const std::size_t capacity; ///< The capacity of the circular buffer. + + /// @brief Get the next index in the circular buffer. + /// @param current The current index. + /// @return The next index in the circular buffer. + std::size_t nextIndex(std::size_t current) const + { + return (current + 1) % capacity; + } +}; + #else #include #include #include +#include #include namespace isobus { using Mutex = std::mutex; using RecursiveMutex = std::recursive_mutex; + using Thread = std::thread; } /// @brief A macro to automatically lock a mutex and unlock it when the scope ends. /// @param type The type of the mutex.