diff --git a/include/ocpp/common/safe_queue.hpp b/include/ocpp/common/safe_queue.hpp new file mode 100644 index 000000000..dc8901771 --- /dev/null +++ b/include/ocpp/common/safe_queue.hpp @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest + +#pragma once + +#include +#include +#include + +namespace ocpp { + +/// \brief Thread safe message queue +template class SafeQueue { + using safe_queue_reference = typename std::queue::reference; + using safe_queue_const_reference = typename std::queue::const_reference; + +public: + /// \return True if the queue is empty + inline bool empty() const { + std::lock_guard lock(mutex); + return queue.empty(); + } + + inline safe_queue_reference front() const { + std::lock_guard lock(mutex); + return queue.front(); + } + + inline safe_queue_const_reference front() { + std::lock_guard lock(mutex); + return queue.front(); + } + + /// \return retrieves and removes the first element in the queue. Undefined behavior if the queue is empty + inline T pop() { + std::lock_guard lock(mutex); + + T front = std::move(queue.front()); + queue.pop(); + + return front; + } + + /// \brief Queues an element and notifies any threads waiting on the internal conditional variable + inline void push(T&& value) { + { + std::lock_guard lock(mutex); + queue.push(value); + } + + notify_waiting_thread(); + } + + /// \brief Queues an element and notifies any threads waiting on the internal conditional variable + inline void push(const T& value) { + { + std::lock_guard lock(mutex); + queue.push(value); + } + + notify_waiting_thread(); + } + + /// \brief Clears the queue + inline void clear() { + std::lock_guard lock(mutex); + + std::queue empty; + empty.swap(queue); + } + + /// \brief Waits seconds for the queue to receive an element + /// \param seconds Count of seconds to wait, pass in a value <= 0 to wait indefinitely + inline void wait_on_queue(int seconds = -1) { + std::unique_lock lock(mutex); + + if (seconds > 0) { + cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()); }); + } else { + cv.wait(lock, [&]() { return (false == queue.empty()); }); + } + } + + /// \brief Same as 'wait_on_queue' but receives an additional predicate to wait upon + template inline void wait_on_queue(Predicate pred, int seconds = -1) { + std::unique_lock lock(mutex); + + if (seconds > 0) { + cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()) or pred(); }); + } else { + cv.wait(lock, [&]() { return (false == queue.empty()) or pred(); }); + } + } + + /// \brief Waits on the queue for a custom event + template inline void wait_on_custom(Predicate pred, int seconds = -1) { + std::unique_lock lock(mutex); + + if (seconds > 0) { + cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return pred(); }); + } else { + cv.wait(lock, [&]() { return pred(); }); + } + } + + /// \brief Notifies a single waiting thread to wake up + inline void notify_waiting_thread() { + cv.notify_one(); + } + +private: + std::queue queue; + + mutable std::mutex mutex; + std::condition_variable cv; +}; + +} // namespace ocpp \ No newline at end of file diff --git a/include/ocpp/common/websocket/websocket_libwebsockets.hpp b/include/ocpp/common/websocket/websocket_libwebsockets.hpp index 08d6f5ff5..98c2394df 100644 --- a/include/ocpp/common/websocket/websocket_libwebsockets.hpp +++ b/include/ocpp/common/websocket/websocket_libwebsockets.hpp @@ -4,6 +4,7 @@ #define OCPP_WEBSOCKET_TLS_TPM_HPP #include +#include #include #include @@ -20,112 +21,6 @@ namespace ocpp { struct ConnectionData; struct WebsocketMessage; -/// \brief Thread safe message queue -template class SafeQueue { - using safe_queue_reference = typename std::queue::reference; - using safe_queue_const_reference = typename std::queue::const_reference; - -public: - /// \return True if the queue is empty - inline bool empty() const { - std::lock_guard lock(mutex); - return queue.empty(); - } - - inline safe_queue_reference front() const { - std::lock_guard lock(mutex); - return queue.front(); - } - - inline safe_queue_const_reference front() { - std::lock_guard lock(mutex); - return queue.front(); - } - - /// \return retrieves and removes the first element in the queue. Undefined behavior if the queue is empty - inline T pop() { - std::lock_guard lock(mutex); - - T front = std::move(queue.front()); - queue.pop(); - - return front; - } - - /// \brief Queues an element and notifies any threads waiting on the internal conditional variable - inline void push(T&& value) { - { - std::lock_guard lock(mutex); - queue.push(value); - } - - notify_waiting_thread(); - } - - /// \brief Queues an element and notifies any threads waiting on the internal conditional variable - inline void push(const T& value) { - { - std::lock_guard lock(mutex); - queue.push(value); - } - - notify_waiting_thread(); - } - - /// \brief Clears the queue - inline void clear() { - std::lock_guard lock(mutex); - - std::queue empty; - empty.swap(queue); - } - - /// \brief Waits seconds for the queue to receive an element - /// \param seconds Count of seconds to wait, pass in a value <= 0 to wait indefinitely - inline void wait_on_queue(int seconds = -1) { - std::unique_lock lock(mutex); - - if (seconds > 0) { - cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()); }); - } else { - cv.wait(lock, [&]() { return (false == queue.empty()); }); - } - } - - /// \brief Same as 'wait_on_queue' but receives an additional predicate to wait upon - template inline void wait_on_queue(Predicate pred, int seconds = -1) { - std::unique_lock lock(mutex); - - if (seconds > 0) { - cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return (false == queue.empty()) or pred(); }); - } else { - cv.wait(lock, [&]() { return (false == queue.empty()) or pred(); }); - } - } - - /// \brief Waits on the queue for a custom event - template inline void wait_on_custom(Predicate pred, int seconds = -1) { - std::unique_lock lock(mutex); - - if (seconds > 0) { - cv.wait_for(lock, std::chrono::seconds(seconds), [&]() { return pred(); }); - } else { - cv.wait(lock, [&]() { return pred(); }); - } - } - - /// \brief Notifies a single waiting thread to wake up - inline void notify_waiting_thread() { - cv.notify_one(); - } - -private: - std::queue queue; - - mutable std::mutex mutex; - std::condition_variable cv; -}; - /// \brief Experimental libwebsockets TLS connection class WebsocketLibwebsockets final : public WebsocketBase { public: