From 5c232e0cda56fd269ef35be694a9c231fccd0e36 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 3 Nov 2024 12:57:37 -0800 Subject: [PATCH 01/42] Use a reference instead of a pointer. --- catkit_core/Synchronization.cpp | 6 +++--- catkit_core/Synchronization.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp index 8da03b86e..54e6aa2b9 100644 --- a/catkit_core/Synchronization.cpp +++ b/catkit_core/Synchronization.cpp @@ -9,15 +9,15 @@ #include "Timing.h" -SynchronizationLock::SynchronizationLock(Synchronization *sync) +SynchronizationLock::SynchronizationLock(Synchronization &sync) : m_Sync(sync) { - m_Sync->Lock(); + m_Sync.Lock(); } SynchronizationLock::~SynchronizationLock() { - m_Sync->Unlock(); + m_Sync.Unlock(); } Synchronization::Synchronization() diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 45ec8dfa9..2aa1a8b6f 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -30,11 +30,11 @@ struct SynchronizationSharedData class SynchronizationLock { public: - SynchronizationLock(Synchronization *sync); + SynchronizationLock(Synchronization &sync); ~SynchronizationLock(); private: - Synchronization *m_Sync; + Synchronization &m_Sync; }; class Synchronization From 54a6265e0e8d0e2fe08f40b1c1f910b42d53d943 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 17:29:29 -0800 Subject: [PATCH 02/42] Better scope the synchronization lock. It should only be around the tme-critical part. --- catkit_core/DataStream.cpp | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 09d20a822..64c0f1924 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -173,22 +173,27 @@ void DataStream::SubmitFrame(size_t id) DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer); meta->m_TimeStamp = GetTimeStamp(); - // Obtain a lock as we are about to modify the condition of the - // synchronization. - auto lock = SynchronizationLock(&m_Synchronization); - - // Make frame available: - // Use a do-while loop to ensure we are never decrementing the last id. - size_t last_id; - do { - last_id = m_Header->m_LastId; - - if (last_id >= id + 1) - break; - } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + // Obtain a lock as we are about to modify the condition of the + // synchronization. + auto lock = SynchronizationLock(m_Synchronization); + + // Make frame available: + // Use a do-while loop to ensure we are never decrementing the last id. + size_t last_id; + do + { + last_id = m_Header->m_LastId; + + if (last_id >= id + 1) + break; + } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + + m_Synchronization.Signal(); + } - m_Synchronization.Signal(); + auto ts = GetTimeStamp(); + tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); // Don't update the framerate counter for the first frame. if (id == 0) @@ -207,9 +212,6 @@ void DataStream::SubmitFrame(size_t id) m_Header->m_FrameRateCounter = m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta) + FRAMERATE_DECAY; - - auto ts = GetTimeStamp(); - tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); } void DataStream::SubmitData(const void *data) From 2eec9a7d156b6da3bd7e8744edfb45e1d0e6ede9 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:30:32 -0800 Subject: [PATCH 03/42] Use CRTP instead of preprocessor directives for operating systems. This allows us to separate different types of synchronization within one operating system. --- catkit_core/Synchronization.cpp | 224 -------------------------------- catkit_core/Synchronization.h | 74 +++++------ catkit_core/Synchronization.inl | 121 +++++++++++++++++ 3 files changed, 152 insertions(+), 267 deletions(-) delete mode 100644 catkit_core/Synchronization.cpp create mode 100644 catkit_core/Synchronization.inl diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp deleted file mode 100644 index 54e6aa2b9..000000000 --- a/catkit_core/Synchronization.cpp +++ /dev/null @@ -1,224 +0,0 @@ -#include "Synchronization.h" - -#include -#include - -#ifndef _WIN32 - #include -#endif - -#include "Timing.h" - -SynchronizationLock::SynchronizationLock(Synchronization &sync) - : m_Sync(sync) -{ - m_Sync.Lock(); -} - -SynchronizationLock::~SynchronizationLock() -{ - m_Sync.Unlock(); -} - -Synchronization::Synchronization() - : m_IsOwner(false), m_SharedData(nullptr) -{ -} - -Synchronization::~Synchronization() -{ - if (m_SharedData) - { -#ifdef _WIN32 - CloseHandle(m_Semaphore); -#else - -#endif - } -} - -void Synchronization::Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create) -{ - if (create) - { - Create(id, shared_data); - } - else - { - Open(id, shared_data); - } -} - -void Synchronization::Create(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Create called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while creating semaphore."); - - shared_data->m_NumReadersWaiting = 0; -#else - pthread_mutexattr_t mutex_attr; - pthread_mutexattr_init(&mutex_attr); - pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&(shared_data->m_Mutex), &mutex_attr); - pthread_mutexattr_destroy(&mutex_attr); - - pthread_condattr_t cond_attr; - pthread_condattr_init(&cond_attr); - pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); -#endif // __APPLE__ - pthread_cond_init(&(shared_data->m_Condition), &cond_attr); - pthread_condattr_destroy(&cond_attr); -#endif // _WIN32 - - m_SharedData = shared_data; -} - -void Synchronization::Open(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Open called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while opening semaphore."); -#else -#endif - - m_SharedData = shared_data; -} - -void Synchronization::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) -{ - if (!m_SharedData) - throw std::runtime_error("Wait() was called before the synchronization was intialized."); - -#ifdef _WIN32 - Timer timer; - DWORD res = WAIT_OBJECT_0; - - while (!condition()) - { - if (res == WAIT_OBJECT_0) - { - // Increment the number of readers that are waiting, making sure the counter - // is at least 1 after the increment. This can occur when a previous reader got - // interrupted and the trigger happening before decrementing the - // m_NumReadersWaiting counter. - while (m_SharedData->m_NumReadersWaiting++ < 0) - { - } - } - - // Wait for a maximum of 20ms to perform periodic error checking. - auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); - - if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("Waiting time has expired."); - } - - if (res == WAIT_FAILED) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); - } - - if (error_check != nullptr) - { - try - { - error_check(); - } - catch (...) - { - m_SharedData->m_NumReadersWaiting--; - throw; - } - } - } -#else - Timer timer; - - while (!condition()) - { - // Wait for a maximum of 20ms to perform periodic error checking. - long timeout_wait = std::min(20L, timeout_in_ms); - -#ifdef __APPLE__ - // Relative timespec. - timespec timeout; - timeout.tv_sec = timeout_wait / 1000; - timeout.tv_nsec = 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait_relative_np(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#else - // Absolute timespec. - timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += timeout_wait / 1000; - timeout.tv_nsec += 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#endif // __APPLE__ - if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - throw std::runtime_error("Waiting time has expired."); - } - - if (error_check != nullptr) - error_check(); - } -#endif // _WIN32 -} - -void Synchronization::Signal() -{ - if (!m_SharedData) - throw std::runtime_error("Signal() was called before the synchronization was intialized."); - -#ifdef _WIN32 - // Notify waiting processes. - long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); - - // If a reader times out in between us reading the number of readers that are waiting - // and us releasing the semaphore, we are releasing one too many readers. This - // results in a future reader being released immediately, which is not a problem, - // as there are checks in place for that. - - if (num_readers_waiting > 0) - ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); -#else - pthread_cond_broadcast(&(m_SharedData->m_Condition)); -#endif // _WIN32 -} - -void Synchronization::Lock() -{ -#ifndef _WIN32 - pthread_mutex_lock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} - -void Synchronization::Unlock() -{ -#ifndef _WIN32 - pthread_mutex_unlock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 2aa1a8b6f..a00fce299 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -6,47 +6,24 @@ #include #include -#ifdef _WIN32 - #define WIN32_LEAN_AND_MEAN - #define NOMINMAX - #include -#else - #include - #include -#endif // _WIN32 - -class Synchronization; - -struct SynchronizationSharedData -{ -#ifdef _WIN32 - std::atomic_long m_NumReadersWaiting; -#else - pthread_cond_t m_Condition; - pthread_mutex_t m_Mutex; -#endif -}; - -class SynchronizationLock +template +class SynchronizationBase { public: - SynchronizationLock(Synchronization &sync); - ~SynchronizationLock(); + using SharedState = SharedStateType; -private: - Synchronization &m_Sync; -}; + SynchronizationBase(); + SynchronizationBase(const SynchronizationBase &other) = delete; + ~SynchronizationBase(); -class Synchronization -{ -public: - Synchronization(); - Synchronization(const Synchronization &other) = delete; - ~Synchronization(); + SynchronizationBase &operator=(const SynchronizationBase &other) = delete; + + void Initialize(const std::string &id, SharedState *shared_state, bool create); - Synchronization &operator=(const Synchronization &other) = delete; + void Create(const std::string &id, SharedState *shared_state); - void Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create); + void Open(const std::string &id, SharedState *shared_state); + void Close(); void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); @@ -54,17 +31,28 @@ class Synchronization void Lock(); void Unlock(); -private: - void Create(const std::string &id, SynchronizationSharedData *shared_data); - void Open(const std::string &id, SynchronizationSharedData *shared_data); +protected: + void CreateImpl(const std::string &id, SharedState *shared_state); + + void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); bool m_IsOwner; - SynchronizationSharedData *m_SharedData; - std::string m_Id; + bool m_IsOpen; + SharedState *m_SharedState; +}; + +template +class SynchronizationLock +{ +public: + SynchronizationLock(T &sync); + ~SynchronizationLock(); -#ifdef _WIN32 - HANDLE m_Semaphore; -#endif +private: + T &m_Sync; }; +#include "Synchronization.inl" + #endif // SYNCHRONIZATION_H diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl new file mode 100644 index 000000000..be6f89571 --- /dev/null +++ b/catkit_core/Synchronization.inl @@ -0,0 +1,121 @@ +#include "Synchronization.h" + +template +SynchronizationLock::SynchronizationLock(T &sync) + : m_Sync(sync) +{ + m_Sync.Lock(); +} + +template +SynchronizationLock::~SynchronizationLock() +{ + m_Sync.Unlock(); +} + +template +SynchronizationBase::SynchronizationBase() + : m_IsOwner(false), m_SharedState(nullptr), m_IsOpen(false) +{ +} + +template +SynchronizationBase::~SynchronizationBase() +{ + Close(); +} + +template +void SynchronizationBase::Initialize(const std::string &id, SharedState *shared_state, bool create) +{ + if (create) + { + Create(id, shared_state); + } + else + { + Open(id, shared_state); + } +} + +template +void SynchronizationBase::Create(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Create called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = true; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Open(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Open called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = false; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Close() +{ + if (!m_IsOpen) + return; + + static_cast(this)->CloseImpl(); + + m_IsOpen = false; + m_SharedState = nullptr; +} + +template +void SynchronizationBase::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ +} + +template +void SynchronizationBase::Signal() +{ +} + +template +void SynchronizationBase::Lock() +{ +} + +template +void SynchronizationBase::Unlock() +{ +} + +template +void SynchronizationBase::CreateImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::OpenImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::CloseImpl() +{ +} From 399b46b87de4a362fe7f7d1a02e1c253b7c5a4dd Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:31:01 -0800 Subject: [PATCH 04/42] Add Windows semaphore synchronization. --- catkit_core/SynchronizationSemaphore.cpp | 95 ++++++++++++++++++++++++ catkit_core/SynchronizationSemaphore.h | 43 +++++++++++ 2 files changed, 138 insertions(+) create mode 100644 catkit_core/SynchronizationSemaphore.cpp create mode 100644 catkit_core/SynchronizationSemaphore.h diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp new file mode 100644 index 000000000..459821bae --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -0,0 +1,95 @@ +#include "SynchronizationSemaphore.h" + +#ifdef _WIN32 +void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void *(error_check)()) +{ + Timer timer; + DWORD res = WAIT_OBJECT_0; + + while (!condition()) + { + if (res == WAIT_OBJECT_0) + { + // Increment the number of readers that are waiting, making sure the counter + // is at least 1 after the increment. This can occur when a previous reader got + // interrupted and the trigger happening before decrementing the + // m_NumReadersWaiting counter. + while (m_SharedData->m_NumReadersWaiting++ < 0) + { + } + } + + // Wait for a maximum of 20ms to perform periodic error checking. + auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + + if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("Waiting time has expired."); + } + + if (res == WAIT_FAILED) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); + } + + if (error_check != nullptr) + { + try + { + error_check(); + } + catch (...) + { + m_SharedData->m_NumReadersWaiting--; + throw; + } + } + } +} + +void SynchronizationSemaphore::Signal() +{ + // Notify waiting processes. + long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); + + // If a reader times out in between us reading the number of readers that are waiting + // and us releasing the semaphore, we are releasing one too many readers. This + // results in a future reader being released immediately, which is not a problem, + // as there are checks in place for that. + + if (num_readers_waiting > 0) + ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); +} + +void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while creating semaphore."); + + shared_data->m_NumReadersWaiting = 0; +} + +void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while opening semaphore."); +} + +void SynchronizationSempahore::CloseImpl() +{ + CloseHandle(m_Semaphore); +} + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +#endif diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h new file mode 100644 index 000000000..c2ad265d2 --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.h @@ -0,0 +1,43 @@ +#ifndef SYNCHRONIZATION_SEMAPHORE_H +#define SYNCHRONIZATION_SEMAPHORE_H + +#include "Synchronization.h" + +#include + +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#endif + +#ifdef _WIN32 +struct SharedDataWindowsSemaphore +{ + std::atomic_long m_NumReadersWaiting; +}; + +class SynchronizationWindowsSemaphore : public SynchronizationBase +{ +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + void Create(const std::string &id, SharedState *shared_state); + void Open(const std::string &id, SharedState *shared_state); + + HANDLE m_Semaphore; +}; +#endif // _WIN32 + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +# endif // SYNCHRONIZATION_SEMAPHORE_H From b663890c67deb556f43374ea55055fda4c2742e8 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:31:18 -0800 Subject: [PATCH 05/42] Add Posix condition variable synchronization. --- .../SynchronizationConditionVariable.cpp | 75 +++++++++++++++++++ .../SynchronizationConditionVariable.h | 32 ++++++++ 2 files changed, 107 insertions(+) create mode 100644 catkit_core/SynchronizationConditionVariable.cpp create mode 100644 catkit_core/SynchronizationConditionVariable.h diff --git a/catkit_core/SynchronizationConditionVariable.cpp b/catkit_core/SynchronizationConditionVariable.cpp new file mode 100644 index 000000000..8bc97eaff --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.cpp @@ -0,0 +1,75 @@ +#include "SynchronizationConditionVariable.h" + +#include "Timing.h" + +#if defined(__linux__) || defined(__APPLE__) + +void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + Timer timer; + + while (!condition()) + { + // Wait for a maximum of 20ms to perform periodic error checking. + long timeout_wait = std::min(20L, timeout_in_ms); + +#ifdef __APPLE__ + // Relative timespec. + timespec timeout; + timeout.tv_sec = timeout_wait / 1000; + timeout.tv_nsec = 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait_relative_np(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#else + // Absolute timespec. + timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += timeout_wait / 1000; + timeout.tv_nsec += 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#endif // __APPLE__ + if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + throw std::runtime_error("Waiting time has expired."); + } + + if (error_check != nullptr) + error_check(); + } +} + +void SynchronizationConditionVariable::Signal() +{ + pthread_cond_broadcast(&(m_SharedState->m_Condition)); +} + +void SynchronizationConditionVariable::Lock() +{ + pthread_mutex_lock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::Unlock() +{ + pthread_mutex_unlock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state) +{ + pthread_mutexattr_t mutex_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&(shared_state->m_Mutex), &mutex_attr); + pthread_mutexattr_destroy(&mutex_attr); + + pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); +#endif // __APPLE__ + pthread_cond_init(&(shared_state->m_Condition), &cond_attr); + pthread_condattr_destroy(&cond_attr); +} + +#endif diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h new file mode 100644 index 000000000..f2cabf908 --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.h @@ -0,0 +1,32 @@ +#ifndef SYNCHRONIZATION_CONDITION_VARIABLE_H +#define SYNCHRONIZATION_CONDITION_VARIABLE_H + +#include "Synchronization.h" + +#if defined(__linux__) || defined(__APPLE__) + +#include + +struct SharedStateConditionVariable +{ + pthread_cond_t m_Condition; + pthread_mutex_t m_Mutex; +}; + +class SynchronizationConditionVariable : public SynchronizationBase +{ + friend SynchronizationBase; +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + void CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state); +}; + +#endif // Linux or Apple + +#endif // SYNCHRONIZATION_CONDITION_VARIABLE_H From 93d68e22b872806cd5446e0c3a44edc5693c3ac4 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:32:45 -0800 Subject: [PATCH 06/42] Set default Synchronization method for DataStreams. --- catkit_core/DataStream.cpp | 2 +- catkit_core/DataStream.h | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 64c0f1924..b80754b6f 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -334,7 +334,7 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che // Wait until frame becomes available. // Obtain a lock first. - auto lock = SynchronizationLock(&m_Synchronization); + auto lock = SynchronizationLock(m_Synchronization); m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); } diff --git a/catkit_core/DataStream.h b/catkit_core/DataStream.h index 322a2ea2d..440f26000 100644 --- a/catkit_core/DataStream.h +++ b/catkit_core/DataStream.h @@ -8,8 +8,16 @@ #include "SharedMemory.h" #include "Synchronization.h" +#include "SynchronizationConditionVariable.h" +#include "SynchronizationSemaphore.h" #include "Tensor.h" +#ifdef _WIN32 +using Synchronization = SynchronizationSemaphore; +#elif defined(__linux__) or defined(__APPLE__) +using Synchronization = SynchronizationConditionVariable; +#endif + const char * const CURRENT_DATASTREAM_VERSION = "0.2"; const size_t MAX_NUM_FRAMES_IN_BUFFER = 20; const long INFINITE_WAIT_TIME = LONG_MAX; @@ -46,7 +54,7 @@ struct DataStreamHeader double m_FrameRateCounter; - SynchronizationSharedData m_SynchronizationSharedData; + Synchronization::SharedState m_SynchronizationSharedData; }; class DataFrame : public Tensor From 9852b133e6ba75492d4353ccfdf0b6caf5b693a8 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:33:19 -0800 Subject: [PATCH 07/42] Compile synchronization methods. --- catkit_core/CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index 6258d432e..4183440a6 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -14,7 +14,8 @@ add_compile_options($<$:/MP1>) add_library(catkit_core STATIC DataStream.cpp SharedMemory.cpp - Synchronization.cpp + SynchronizationConditionVariable.cpp + SynchronizationSemaphore.cpp Timing.cpp Log.cpp LogConsole.cpp From 0fbbf016cb31b4fb6686a34f9a33532939f66c29 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:48:33 -0800 Subject: [PATCH 08/42] Fix variable names. --- catkit_core/SynchronizationSemaphore.cpp | 10 +++++----- catkit_core/SynchronizationSemaphore.h | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 459821bae..68fdcab3d 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -14,7 +14,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co // is at least 1 after the increment. This can occur when a previous reader got // interrupted and the trigger happening before decrementing the // m_NumReadersWaiting counter. - while (m_SharedData->m_NumReadersWaiting++ < 0) + while (m_SharedState->m_NumReadersWaiting++ < 0) { } } @@ -24,13 +24,13 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw std::runtime_error("Waiting time has expired."); } if (res == WAIT_FAILED) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); } @@ -42,7 +42,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co } catch (...) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw; } } @@ -52,7 +52,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co void SynchronizationSemaphore::Signal() { // Notify waiting processes. - long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); + long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); // If a reader times out in between us reading the number of readers that are waiting // and us releasing the semaphore, we are releasing one too many readers. This diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index c2ad265d2..0b6881dac 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -12,12 +12,12 @@ #endif #ifdef _WIN32 -struct SharedDataWindowsSemaphore +struct SharedStateSemaphore { std::atomic_long m_NumReadersWaiting; }; -class SynchronizationWindowsSemaphore : public SynchronizationBase +class SynchronizationSemaphore : public SynchronizationBase { public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); From 0663382351ef669b4de3e1e105582512988858f0 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:50:51 -0800 Subject: [PATCH 09/42] Add missing includes. --- catkit_core/Synchronization.inl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl index be6f89571..1f842e62c 100644 --- a/catkit_core/Synchronization.inl +++ b/catkit_core/Synchronization.inl @@ -1,5 +1,8 @@ #include "Synchronization.h" +#include +#include + template SynchronizationLock::SynchronizationLock(T &sync) : m_Sync(sync) From bbc8822a05ea06c1621f7a4395bfa6df62443b13 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 00:00:03 -0800 Subject: [PATCH 10/42] Fix the function pointer type. --- catkit_core/SynchronizationSemaphore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 68fdcab3d..563d44af2 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -1,7 +1,7 @@ #include "SynchronizationSemaphore.h" #ifdef _WIN32 -void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void *(error_check)()) +void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; DWORD res = WAIT_OBJECT_0; From de41e7b5e40f5079b0aa7cb0a6990c4d80d6f1f2 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 10:41:50 -0800 Subject: [PATCH 11/42] Add missing include. --- catkit_core/SynchronizationSemaphore.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 563d44af2..63a986ffb 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -1,4 +1,5 @@ #include "SynchronizationSemaphore.h" +#include "Timing.h" #ifdef _WIN32 void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) From c1aabd8f43902ec190e7d23350ddd20b1433a972 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 11:37:41 -0800 Subject: [PATCH 12/42] Fix function names. --- catkit_core/SynchronizationSemaphore.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index 0b6881dac..778007bad 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -27,8 +27,8 @@ class SynchronizationSemaphore : public SynchronizationBase Date: Wed, 6 Nov 2024 12:11:00 -0800 Subject: [PATCH 13/42] Add friend classes and fix class name typos. --- catkit_core/SynchronizationConditionVariable.h | 1 + catkit_core/SynchronizationSemaphore.cpp | 4 ++-- catkit_core/SynchronizationSemaphore.h | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h index f2cabf908..2027215c0 100644 --- a/catkit_core/SynchronizationConditionVariable.h +++ b/catkit_core/SynchronizationConditionVariable.h @@ -16,6 +16,7 @@ struct SharedStateConditionVariable class SynchronizationConditionVariable : public SynchronizationBase { friend SynchronizationBase; + public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 63a986ffb..b381011ff 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -71,7 +71,7 @@ void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *sh if (m_Semaphore == NULL) throw std::runtime_error("Something went wrong while creating semaphore."); - shared_data->m_NumReadersWaiting = 0; + shared_state->m_NumReadersWaiting = 0; } void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) @@ -82,7 +82,7 @@ void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shar throw std::runtime_error("Something went wrong while opening semaphore."); } -void SynchronizationSempahore::CloseImpl() +void SynchronizationSemaphore::CloseImpl() { CloseHandle(m_Semaphore); } diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index 778007bad..46cd2f923 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -19,16 +19,16 @@ struct SharedStateSemaphore class SynchronizationSemaphore : public SynchronizationBase { + friend SynchronizationBase; + public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); - void Lock(); - void Unlock(); - protected: void CreateImpl(const std::string &id, SharedState *shared_state); void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); HANDLE m_Semaphore; }; From 72eea33c96ef085326c47f3fc23eae6efb3c7bf7 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 22 Jan 2025 22:17:05 -0800 Subject: [PATCH 14/42] Refactor to use template specialization instead of CRTP. --- catkit_core/Event.h | 13 ++ catkit_core/EventBase.h | 75 +++++++++++ ...ariable.cpp => EventConditionVariable.inl} | 0 catkit_core/EventFutex.inl | 0 ...zationSemaphore.cpp => EventSemaphore.inl} | 0 catkit_core/EventSpinLock.inl | 0 catkit_core/Synchronization.h | 58 -------- catkit_core/Synchronization.inl | 124 ------------------ .../SynchronizationConditionVariable.h | 33 ----- catkit_core/SynchronizationSemaphore.h | 43 ------ 10 files changed, 88 insertions(+), 258 deletions(-) create mode 100644 catkit_core/Event.h create mode 100644 catkit_core/EventBase.h rename catkit_core/{SynchronizationConditionVariable.cpp => EventConditionVariable.inl} (100%) create mode 100644 catkit_core/EventFutex.inl rename catkit_core/{SynchronizationSemaphore.cpp => EventSemaphore.inl} (100%) create mode 100644 catkit_core/EventSpinLock.inl delete mode 100644 catkit_core/Synchronization.h delete mode 100644 catkit_core/Synchronization.inl delete mode 100644 catkit_core/SynchronizationConditionVariable.h delete mode 100644 catkit_core/SynchronizationSemaphore.h diff --git a/catkit_core/Event.h b/catkit_core/Event.h new file mode 100644 index 000000000..157f7712f --- /dev/null +++ b/catkit_core/Event.h @@ -0,0 +1,13 @@ +#ifndef EVENT_H +#define EVENT_H + +#include "EventBase.h" + +// Select which implementation to use based on the platform. +#ifdef _WIN32 +using Event = EventImpl; +#else +using Event = EventImpl; +#endif + +#endif // EVENT_H diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h new file mode 100644 index 000000000..5cd91f6d4 --- /dev/null +++ b/catkit_core/EventBase.h @@ -0,0 +1,75 @@ +#ifndef EVENT_BASE_H +#define EVENT_BASE_H + +#include + +enum EventImplementationType +{ + ET_CONDITION_VARIABLE, + ET_FUTEX, + ET_SEMAPHORE, + ET_SPIN_LOCK +}; + +template +struct EventSharedState +{ +}; + +template +struct EventLocalState +{ +}; + +template +class EventImpl +{ +public: + using SharedState = EventSharedState; + using LocalState = EventLocalState; + +protected: + EventImpl(); + +public: + static std::unique_ptr> Create(const std::string &id, SharedState *shared_state); + static std::unique_ptr> Open(const std::string &id, SharedState *shared_state); + + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + SharedState *m_SharedState; + LocalState m_LocalState; +}; + +template +void EventImpl::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + throw std::runtime_error("This type of event implementation wasn't implemented."); +} + +template +void EventImpl::Signal() +{ +} + +template +void EventImpl::Lock() +{ +} + +template +void EventImpl::Unlock() +{ +} + +#include "EventConditionVariable.inl" +#include "EventFutex.inl" +#include "EventSemaphore.inl" +#include "EventSpinLock.inl" + +#endif // EVENT_BASE_H diff --git a/catkit_core/SynchronizationConditionVariable.cpp b/catkit_core/EventConditionVariable.inl similarity index 100% rename from catkit_core/SynchronizationConditionVariable.cpp rename to catkit_core/EventConditionVariable.inl diff --git a/catkit_core/EventFutex.inl b/catkit_core/EventFutex.inl new file mode 100644 index 000000000..e69de29bb diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/EventSemaphore.inl similarity index 100% rename from catkit_core/SynchronizationSemaphore.cpp rename to catkit_core/EventSemaphore.inl diff --git a/catkit_core/EventSpinLock.inl b/catkit_core/EventSpinLock.inl new file mode 100644 index 000000000..e69de29bb diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h deleted file mode 100644 index a00fce299..000000000 --- a/catkit_core/Synchronization.h +++ /dev/null @@ -1,58 +0,0 @@ -#ifndef SYNCHRONZATION_H -#define SYNCHRONZATION_H - -#include -#include -#include -#include - -template -class SynchronizationBase -{ -public: - using SharedState = SharedStateType; - - SynchronizationBase(); - SynchronizationBase(const SynchronizationBase &other) = delete; - ~SynchronizationBase(); - - SynchronizationBase &operator=(const SynchronizationBase &other) = delete; - - void Initialize(const std::string &id, SharedState *shared_state, bool create); - - void Create(const std::string &id, SharedState *shared_state); - - void Open(const std::string &id, SharedState *shared_state); - void Close(); - - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); - - void Lock(); - void Unlock(); - -protected: - void CreateImpl(const std::string &id, SharedState *shared_state); - - void OpenImpl(const std::string &id, SharedState *shared_state); - void CloseImpl(); - - bool m_IsOwner; - bool m_IsOpen; - SharedState *m_SharedState; -}; - -template -class SynchronizationLock -{ -public: - SynchronizationLock(T &sync); - ~SynchronizationLock(); - -private: - T &m_Sync; -}; - -#include "Synchronization.inl" - -#endif // SYNCHRONIZATION_H diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl deleted file mode 100644 index 1f842e62c..000000000 --- a/catkit_core/Synchronization.inl +++ /dev/null @@ -1,124 +0,0 @@ -#include "Synchronization.h" - -#include -#include - -template -SynchronizationLock::SynchronizationLock(T &sync) - : m_Sync(sync) -{ - m_Sync.Lock(); -} - -template -SynchronizationLock::~SynchronizationLock() -{ - m_Sync.Unlock(); -} - -template -SynchronizationBase::SynchronizationBase() - : m_IsOwner(false), m_SharedState(nullptr), m_IsOpen(false) -{ -} - -template -SynchronizationBase::~SynchronizationBase() -{ - Close(); -} - -template -void SynchronizationBase::Initialize(const std::string &id, SharedState *shared_state, bool create) -{ - if (create) - { - Create(id, shared_state); - } - else - { - Open(id, shared_state); - } -} - -template -void SynchronizationBase::Create(const std::string &id, SharedState *shared_state) -{ - if (m_IsOpen) - throw std::runtime_error("Create called on an already initialized Synchronization object."); - - if (!shared_state) - throw std::runtime_error("The passed shared data was a nullptr."); - - static_cast(this)->CreateImpl(id, shared_state); - - m_IsOpen = true; - m_IsOwner = true; - - m_SharedState = shared_state; -} - -template -void SynchronizationBase::Open(const std::string &id, SharedState *shared_state) -{ - if (m_IsOpen) - throw std::runtime_error("Open called on an already initialized Synchronization object."); - - if (!shared_state) - throw std::runtime_error("The passed shared data was a nullptr."); - - - static_cast(this)->CreateImpl(id, shared_state); - - m_IsOpen = true; - m_IsOwner = false; - - m_SharedState = shared_state; -} - -template -void SynchronizationBase::Close() -{ - if (!m_IsOpen) - return; - - static_cast(this)->CloseImpl(); - - m_IsOpen = false; - m_SharedState = nullptr; -} - -template -void SynchronizationBase::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) -{ -} - -template -void SynchronizationBase::Signal() -{ -} - -template -void SynchronizationBase::Lock() -{ -} - -template -void SynchronizationBase::Unlock() -{ -} - -template -void SynchronizationBase::CreateImpl(const std::string &id, SharedState *shared_state) -{ -} - -template -void SynchronizationBase::OpenImpl(const std::string &id, SharedState *shared_state) -{ -} - -template -void SynchronizationBase::CloseImpl() -{ -} diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h deleted file mode 100644 index 2027215c0..000000000 --- a/catkit_core/SynchronizationConditionVariable.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef SYNCHRONIZATION_CONDITION_VARIABLE_H -#define SYNCHRONIZATION_CONDITION_VARIABLE_H - -#include "Synchronization.h" - -#if defined(__linux__) || defined(__APPLE__) - -#include - -struct SharedStateConditionVariable -{ - pthread_cond_t m_Condition; - pthread_mutex_t m_Mutex; -}; - -class SynchronizationConditionVariable : public SynchronizationBase -{ - friend SynchronizationBase; - -public: - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); - - void Lock(); - void Unlock(); - -protected: - void CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state); -}; - -#endif // Linux or Apple - -#endif // SYNCHRONIZATION_CONDITION_VARIABLE_H diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h deleted file mode 100644 index 46cd2f923..000000000 --- a/catkit_core/SynchronizationSemaphore.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef SYNCHRONIZATION_SEMAPHORE_H -#define SYNCHRONIZATION_SEMAPHORE_H - -#include "Synchronization.h" - -#include - -#ifdef _WIN32 - #define WIN32_LEAN_AND_MEAN - #define NOMINMAX - #include -#endif - -#ifdef _WIN32 -struct SharedStateSemaphore -{ - std::atomic_long m_NumReadersWaiting; -}; - -class SynchronizationSemaphore : public SynchronizationBase -{ - friend SynchronizationBase; - -public: - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); - -protected: - void CreateImpl(const std::string &id, SharedState *shared_state); - void OpenImpl(const std::string &id, SharedState *shared_state); - void CloseImpl(); - - HANDLE m_Semaphore; -}; -#endif // _WIN32 - -#ifdef __linux__ -#endif // __linux__ - -#ifdef __APPLE__ -#endif // __APPLE__ - -# endif // SYNCHRONIZATION_SEMAPHORE_H From 95233424e3d8a1fc6168cf42b6316ea28dbe0149 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 22 Jan 2025 23:16:04 -0800 Subject: [PATCH 15/42] Refactor into Event implementation structure. --- catkit_core/EventBase.h | 46 ++++++++++++++++++++++++++ catkit_core/EventConditionVariable.inl | 26 +++++++++++---- catkit_core/EventSemaphore.inl | 25 ++++++++++---- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 5cd91f6d4..92529986a 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -32,6 +32,8 @@ class EventImpl EventImpl(); public: + ~EventImpl(); + static std::unique_ptr> Create(const std::string &id, SharedState *shared_state); static std::unique_ptr> Open(const std::string &id, SharedState *shared_state); @@ -42,10 +44,26 @@ class EventImpl void Unlock(); protected: + void CreateImpl(const std::string &id, SharedState *shared_state); + void OpenImpl(const std::string &id, SharedState *shared_state); + + bool m_IsOwner; + SharedState *m_SharedState; LocalState m_LocalState; }; +template +EventImpl::EventImpl() + : m_IsOwner(false), m_SharedState(nullptr) +{ +} + +template +EventImpl::~EventImpl() +{ +} + template void EventImpl::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { @@ -67,6 +85,34 @@ void EventImpl::Unlock() { } +template +void EventImpl::CreateImpl(const std::string &id, EventImpl::SharedState *shared_state) +{ + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + auto obj = std::make_shared>(); + + obj->CreateImpl(id, shared_state); + + obj->m_IsOwner = true; + + obj->m_SharedState = shared_state; +} + +template +void EventImpl::OpenImpl(const std::string &id, EventImpl::SharedState *shared_state) +{ + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + OpenImpl(id, shared_state); + + m_IsOwner = false; + + m_SharedState = shared_state; +} + #include "EventConditionVariable.inl" #include "EventFutex.inl" #include "EventSemaphore.inl" diff --git a/catkit_core/EventConditionVariable.inl b/catkit_core/EventConditionVariable.inl index 8bc97eaff..3be912780 100644 --- a/catkit_core/EventConditionVariable.inl +++ b/catkit_core/EventConditionVariable.inl @@ -1,10 +1,20 @@ -#include "SynchronizationConditionVariable.h" +#include "EventBase.h" #include "Timing.h" +using EventConditionVariable = EventImpl; + #if defined(__linux__) || defined(__APPLE__) -void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +template<> +struct EventSharedState +{ + pthread_mutex_t m_Mutex; + pthread_cond_t m_Condition; +}; + +template<> +void EventConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; @@ -39,22 +49,26 @@ void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function +void EventConditionVariable::Signal() { pthread_cond_broadcast(&(m_SharedState->m_Condition)); } -void SynchronizationConditionVariable::Lock() +template<> +void EventConditionVariable::Lock() { pthread_mutex_lock(&(m_SharedState->m_Mutex)); } -void SynchronizationConditionVariable::Unlock() +template<> +void EventConditionVariable::Unlock() { pthread_mutex_unlock(&(m_SharedState->m_Mutex)); } -void SynchronizationConditionVariable::CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state) +template<> +void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) { pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index b381011ff..dc079e89e 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -1,8 +1,18 @@ -#include "SynchronizationSemaphore.h" +#include "EventBase.h" + #include "Timing.h" +using EventSemaphore = EventImpl; + #ifdef _WIN32 -void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +template<> +struct EventSharedState +{ + std::atomic_long m_NumReadersWaiting; +}; + +template<> +void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; DWORD res = WAIT_OBJECT_0; @@ -50,7 +60,8 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co } } -void SynchronizationSemaphore::Signal() +template<> +void EventSemaphore::Signal() { // Notify waiting processes. long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); @@ -64,7 +75,8 @@ void SynchronizationSemaphore::Signal() ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); } -void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +template<> +void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); @@ -74,7 +86,8 @@ void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *sh shared_state->m_NumReadersWaiting = 0; } -void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +template<> +void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); @@ -82,7 +95,7 @@ void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shar throw std::runtime_error("Something went wrong while opening semaphore."); } -void SynchronizationSemaphore::CloseImpl() +void EventSemaphore::~EventImpl() { CloseHandle(m_Semaphore); } From f6f51577fb94339dd46edbf9b34ac6adb4997513 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 22 Jan 2025 23:16:20 -0800 Subject: [PATCH 16/42] Remove non-existent files. --- catkit_core/CMakeLists.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index 4183440a6..b18ae6a99 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -14,8 +14,6 @@ add_compile_options($<$:/MP1>) add_library(catkit_core STATIC DataStream.cpp SharedMemory.cpp - SynchronizationConditionVariable.cpp - SynchronizationSemaphore.cpp Timing.cpp Log.cpp LogConsole.cpp From e47ed9ec3e98deae4bbf9581044e208aac42cf9e Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:33:48 -0800 Subject: [PATCH 17/42] Fix base implementation of Create() and Open(). --- catkit_core/EventBase.h | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 92529986a..3b6ceef5b 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -86,31 +86,35 @@ void EventImpl::Unlock() } template -void EventImpl::CreateImpl(const std::string &id, EventImpl::SharedState *shared_state) +std::unique_ptr> EventImpl::Create(const std::string &id, EventImpl::SharedState *shared_state) { if (!shared_state) throw std::runtime_error("The passed shared data was a nullptr."); - auto obj = std::make_shared>(); + auto obj = std::unique_ptr>(new EventImpl()); obj->CreateImpl(id, shared_state); obj->m_IsOwner = true; - obj->m_SharedState = shared_state; + + return obj; } template -void EventImpl::OpenImpl(const std::string &id, EventImpl::SharedState *shared_state) +std::unique_ptr> EventImpl::Open(const std::string &id, EventImpl::SharedState *shared_state) { if (!shared_state) throw std::runtime_error("The passed shared data was a nullptr."); - OpenImpl(id, shared_state); + auto obj = std::unique_ptr>(new EventImpl()); - m_IsOwner = false; + obj->OpenImpl(id, shared_state); + + obj->m_IsOwner = false; + obj->m_SharedState = shared_state; - m_SharedState = shared_state; + return obj; } #include "EventConditionVariable.inl" From 6a408ddf3333fd17a32fd37bfc81cd0bff785a74 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:34:09 -0800 Subject: [PATCH 18/42] Implement a lock guard for events. --- catkit_core/Event.h | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/catkit_core/Event.h b/catkit_core/Event.h index 157f7712f..faa099745 100644 --- a/catkit_core/Event.h +++ b/catkit_core/Event.h @@ -3,10 +3,29 @@ #include "EventBase.h" +template +class EventLockGuard +{ +public: + inline EventLockGuard(Event &event) + : m_Event(event) + { + m_Event->Lock(); + } + + inline ~EventLockGuard() + { + m_Event->Unlock(); + } + +private: + Event &m_Event; +}; + // Select which implementation to use based on the platform. #ifdef _WIN32 using Event = EventImpl; -#else +#elif defined(__linux__) or defined(__APPLE__) using Event = EventImpl; #endif From a8580c10aba68ba0f73e4f1fd92b69322e2dd34b Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:34:26 -0800 Subject: [PATCH 19/42] Make empty template specializations inline. --- catkit_core/EventBase.h | 14 ++++++++------ catkit_core/EventConditionVariable.inl | 16 +++++++++++----- catkit_core/EventSemaphore.inl | 11 ++++++----- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 3b6ceef5b..263dc4c55 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -34,18 +34,20 @@ class EventImpl public: ~EventImpl(); + // Note: do not implement these functions for specific implementations. static std::unique_ptr> Create(const std::string &id, SharedState *shared_state); static std::unique_ptr> Open(const std::string &id, SharedState *shared_state); - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); + // Note: implement the following functions for specific implementations. + inline void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + inline void Signal(); - void Lock(); - void Unlock(); + inline void Lock(); + inline void Unlock(); protected: - void CreateImpl(const std::string &id, SharedState *shared_state); - void OpenImpl(const std::string &id, SharedState *shared_state); + inline void CreateImpl(const std::string &id, SharedState *shared_state); + inline void OpenImpl(const std::string &id, SharedState *shared_state); bool m_IsOwner; diff --git a/catkit_core/EventConditionVariable.inl b/catkit_core/EventConditionVariable.inl index 3be912780..da7ce818a 100644 --- a/catkit_core/EventConditionVariable.inl +++ b/catkit_core/EventConditionVariable.inl @@ -14,7 +14,7 @@ struct EventSharedState }; template<> -void EventConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +inline void EventConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; @@ -50,25 +50,25 @@ void EventConditionVariable::Wait(long timeout_in_ms, std::function cond } template<> -void EventConditionVariable::Signal() +inline void EventConditionVariable::Signal() { pthread_cond_broadcast(&(m_SharedState->m_Condition)); } template<> -void EventConditionVariable::Lock() +inline void EventConditionVariable::Lock() { pthread_mutex_lock(&(m_SharedState->m_Mutex)); } template<> -void EventConditionVariable::Unlock() +inline void EventConditionVariable::Unlock() { pthread_mutex_unlock(&(m_SharedState->m_Mutex)); } template<> -void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) +inline void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) { pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); @@ -86,4 +86,10 @@ void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVar pthread_condattr_destroy(&cond_attr); } +template<> +inline void EventConditionVariable::OpenImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) +{ + // Nothing to do. +} + #endif diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index dc079e89e..9868d2b6f 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -12,7 +12,7 @@ struct EventSharedState }; template<> -void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +inline void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; DWORD res = WAIT_OBJECT_0; @@ -61,7 +61,7 @@ void EventSemaphore::Wait(long timeout_in_ms, std::function condition, v } template<> -void EventSemaphore::Signal() +inline void EventSemaphore::Signal() { // Notify waiting processes. long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); @@ -76,7 +76,7 @@ void EventSemaphore::Signal() } template<> -void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +inline void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); @@ -87,7 +87,7 @@ void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state } template<> -void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +inline void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); @@ -95,7 +95,8 @@ void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) throw std::runtime_error("Something went wrong while opening semaphore."); } -void EventSemaphore::~EventImpl() +template<> +inline void EventSemaphore::~EventImpl() { CloseHandle(m_Semaphore); } From 68102ab963685628665c2f2be45a99794299204c Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:34:48 -0800 Subject: [PATCH 20/42] Use Events rather than Synchronization classes. --- catkit_core/DataStream.cpp | 15 +++++++++------ catkit_core/DataStream.h | 14 +++----------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index b80754b6f..9bac19bbc 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -68,6 +68,7 @@ void CopyString(char *dest, const char *src, size_t n) DataStream::DataStream(const std::string &stream_id, std::shared_ptr shared_memory, bool create) : m_SharedMemory(shared_memory), + m_Event(nullptr), m_Header(nullptr), m_Buffer(nullptr), m_NextFrameIdToRead(0), m_BufferHandlingMode(BM_NEWEST_ONLY) @@ -75,8 +76,6 @@ DataStream::DataStream(const std::string &stream_id, std::shared_ptrGetAddress(); m_Header = (DataStreamHeader *) buffer; m_Buffer = ((char *) buffer) + sizeof(DataStreamHeader); - - m_Synchronization.Initialize(stream_id, &(m_Header->m_SynchronizationSharedData), create); } DataStream::~DataStream() @@ -119,6 +118,8 @@ std::shared_ptr DataStream::Create(const std::string &stream_name, c data_stream->UpdateParameters(type, dimensions, num_frames_in_buffer); + data_stream->m_Event = Event::Create(stream_id, &(header->m_EventSharedState)); + return data_stream; } @@ -141,6 +142,8 @@ std::shared_ptr DataStream::Open(const std::string &stream_id) // Don't read frames that already are available at the time the data stream is opened. data_stream->m_NextFrameIdToRead = data_stream->m_Header->m_LastId; + data_stream->m_Event = Event::Open(stream_id, &(data_stream->m_Header->m_EventSharedState)); + return data_stream; } @@ -176,7 +179,7 @@ void DataStream::SubmitFrame(size_t id) { // Obtain a lock as we are about to modify the condition of the // synchronization. - auto lock = SynchronizationLock(m_Synchronization); + auto lock = EventLockGuard(m_Event); // Make frame available: // Use a do-while loop to ensure we are never decrementing the last id. @@ -189,7 +192,7 @@ void DataStream::SubmitFrame(size_t id) break; } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); - m_Synchronization.Signal(); + m_Event->Signal(); } auto ts = GetTimeStamp(); @@ -334,8 +337,8 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che // Wait until frame becomes available. // Obtain a lock first. - auto lock = SynchronizationLock(m_Synchronization); - m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); + auto lock = EventLockGuard(m_Event); + m_Event->Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); } size_t offset = (id % m_Header->m_NumFramesInBuffer) * m_Header->m_NumBytesPerFrame; diff --git a/catkit_core/DataStream.h b/catkit_core/DataStream.h index 440f26000..66968d444 100644 --- a/catkit_core/DataStream.h +++ b/catkit_core/DataStream.h @@ -7,17 +7,9 @@ #include #include "SharedMemory.h" -#include "Synchronization.h" -#include "SynchronizationConditionVariable.h" -#include "SynchronizationSemaphore.h" +#include "Event.h" #include "Tensor.h" -#ifdef _WIN32 -using Synchronization = SynchronizationSemaphore; -#elif defined(__linux__) or defined(__APPLE__) -using Synchronization = SynchronizationConditionVariable; -#endif - const char * const CURRENT_DATASTREAM_VERSION = "0.2"; const size_t MAX_NUM_FRAMES_IN_BUFFER = 20; const long INFINITE_WAIT_TIME = LONG_MAX; @@ -54,7 +46,7 @@ struct DataStreamHeader double m_FrameRateCounter; - Synchronization::SharedState m_SynchronizationSharedData; + Event::SharedState m_EventSharedState; }; class DataFrame : public Tensor @@ -125,7 +117,7 @@ class DataStream DataStreamHeader *m_Header; char *m_Buffer; - Synchronization m_Synchronization; + std::shared_ptr m_Event; size_t m_NextFrameIdToRead; BufferHandlingMode m_BufferHandlingMode; From c67223a30f0a1d0692f0eeaa16647866e9247f0f Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:45:48 -0800 Subject: [PATCH 21/42] Use local state structure. --- catkit_core/EventSemaphore.inl | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index 9868d2b6f..f5eecf6c6 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -11,6 +11,12 @@ struct EventSharedState std::atomic_long m_NumReadersWaiting; }; +template<> +struct EventLocalState +{ + HANDLE m_Semaphore; +}; + template<> inline void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { @@ -31,7 +37,7 @@ inline void EventSemaphore::Wait(long timeout_in_ms, std::function condi } // Wait for a maximum of 20ms to perform periodic error checking. - auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + res = WaitForSingleObject(m_LocalState.m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) { @@ -72,15 +78,15 @@ inline void EventSemaphore::Signal() // as there are checks in place for that. if (num_readers_waiting > 0) - ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); + ReleaseSemaphore(m_LocalState.m_Semaphore, (LONG) num_readers_waiting, NULL); } template<> inline void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) { - m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + m_LocalState.m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); - if (m_Semaphore == NULL) + if (m_LocalState.m_Semaphore == NULL) throw std::runtime_error("Something went wrong while creating semaphore."); shared_state->m_NumReadersWaiting = 0; @@ -89,16 +95,16 @@ inline void EventSemaphore::CreateImpl(const std::string &id, SharedState *share template<> inline void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) { - m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + m_LocalState.m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); - if (m_Semaphore == NULL) + if (m_LocalState.m_Semaphore == NULL) throw std::runtime_error("Something went wrong while opening semaphore."); } template<> -inline void EventSemaphore::~EventImpl() +inline EventSemaphore::~EventImpl() { - CloseHandle(m_Semaphore); + CloseHandle(m_LocalState.m_Semaphore); } #ifdef __linux__ From ac53f12cb3dc2fcd0179a5c861f3db18ed48896a Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:46:13 -0800 Subject: [PATCH 22/42] Add missing include. --- catkit_core/EventBase.h | 1 + 1 file changed, 1 insertion(+) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 263dc4c55..0c39a0bff 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -2,6 +2,7 @@ #define EVENT_BASE_H #include +#include enum EventImplementationType { From 79898078afa9f69f6281a2fdb0e2762078d74238 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 12:58:33 -0800 Subject: [PATCH 23/42] Add missing includes. --- catkit_core/EventConditionVariable.inl | 4 ++++ catkit_core/EventSemaphore.inl | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/catkit_core/EventConditionVariable.inl b/catkit_core/EventConditionVariable.inl index da7ce818a..881f13ed0 100644 --- a/catkit_core/EventConditionVariable.inl +++ b/catkit_core/EventConditionVariable.inl @@ -2,6 +2,10 @@ #include "Timing.h" +#if defined(__linux__) || defined(__APPLE__) + #include +#endif + using EventConditionVariable = EventImpl; #if defined(__linux__) || defined(__APPLE__) diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index f5eecf6c6..1bff3e115 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -2,9 +2,16 @@ #include "Timing.h" +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#endif // _WIN32 + using EventSemaphore = EventImpl; #ifdef _WIN32 + template<> struct EventSharedState { From b7ae351ddd9501336d073c21478481647d04de7b Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Mon, 16 Dec 2024 17:02:30 -0800 Subject: [PATCH 24/42] Add UUID generator based on two MT19937 pseudo-random number generators seeded by true randomness. --- benchmarks/uuid_generator.cpp | 32 ++++++++++++++++++++++++++++++++ catkit_core/CMakeLists.txt | 1 + catkit_core/UuidGenerator.cpp | 21 +++++++++++++++++++++ catkit_core/UuidGenerator.h | 17 +++++++++++++++++ 4 files changed, 71 insertions(+) create mode 100644 benchmarks/uuid_generator.cpp create mode 100644 catkit_core/UuidGenerator.cpp create mode 100644 catkit_core/UuidGenerator.h diff --git a/benchmarks/uuid_generator.cpp b/benchmarks/uuid_generator.cpp new file mode 100644 index 000000000..bb2879cf2 --- /dev/null +++ b/benchmarks/uuid_generator.cpp @@ -0,0 +1,32 @@ +#include "UuidGenerator.h" +#include "Timing.h" + +#include + +int main() +{ + const size_t N = 100000000; + + UuidGenerator generator; + + char uuid[16]; + + std::cout << std::hex; + + auto start = GetTimeStamp(); + + for (size_t i = 0; i < N; ++i) + { + generator.GenerateUuid(uuid); + } + + auto end = GetTimeStamp(); + + std::cout << std::dec; + + std::cout << "Time: " << (end - start) / 1e9 << " sec" << std::endl; + std::cout << "Throughput: " << N / ((end - start) / 1e9) << " ops/s" << std::endl; + std::cout << "Time per operation: " << (end - start) / N << " ns" << std::endl; + + return 0; +} diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index b18ae6a99..fb937ff85 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -34,6 +34,7 @@ add_library(catkit_core STATIC Util.cpp PoolAllocator.cpp FreeListAllocator.cpp + UuidGenerator.cpp proto/core.pb.cc proto/logging.pb.cc proto/testbed.pb.cc diff --git a/catkit_core/UuidGenerator.cpp b/catkit_core/UuidGenerator.cpp new file mode 100644 index 000000000..ea68bcaf8 --- /dev/null +++ b/catkit_core/UuidGenerator.cpp @@ -0,0 +1,21 @@ +#include "UuidGenerator.h" + +UuidGenerator::UuidGenerator() +{ + std::random_device random_device; + + for (size_t i = 0; i < 2; ++i) + { + std::seed_seq seed{random_device(), random_device()}; + m_Engines[i].seed(seed); + } +} + +void UuidGenerator::GenerateUuid(char *uuid) +{ + for (size_t i = 0; i < 2; ++i) + { + std::uint64_t value = m_Engines[i](); + *reinterpret_cast(uuid + i * 8) = value; + } +} diff --git a/catkit_core/UuidGenerator.h b/catkit_core/UuidGenerator.h new file mode 100644 index 000000000..244b828d9 --- /dev/null +++ b/catkit_core/UuidGenerator.h @@ -0,0 +1,17 @@ +#ifndef UUID_GENERATOR_H +#define UUID_GENERATOR_H + +#include + +class UuidGenerator +{ +public: + UuidGenerator(); + + void GenerateUuid(char *uuid); + +private: + std::mt19937_64 m_Engines[2]; +}; + +#endif // UUID_GENERATOR_H From d5abc376be3d753e1f8363942185f193d25f9a77 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Mon, 16 Dec 2024 17:08:43 -0800 Subject: [PATCH 25/42] Add interface for Cuda shared memory. --- catkit_core/CudaSharedMemory.h | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 catkit_core/CudaSharedMemory.h diff --git a/catkit_core/CudaSharedMemory.h b/catkit_core/CudaSharedMemory.h new file mode 100644 index 000000000..98899e2d5 --- /dev/null +++ b/catkit_core/CudaSharedMemory.h @@ -0,0 +1,28 @@ +#ifndef CUDA_SHARED_MEMORY_H +#define CUDA_SHARED_MEMORY_H + +#include + +#ifdef HAVE_CUDA +#include +typedef cudaIpcMemHandle_t CudaIpcHandle; +#else +// CUDA cudaIpcMemHandle_t is a struct of 64 bytes. +typedef char CudaIpcHandle[64]; +#endif + +class CudaSharedMemory +{ +private: + CudaSharedMemory(const CudaIpcHandle &ipc_handle, void *device_pointer=nullptr); + +public: + ~CudaSharedMemory(); + + static std::shared_ptr Create(size_t num_bytes_in_buffer); + static std::shared_ptr Open(const CudaIpcHandle &ipc_handle); + + void *GetAddress(); +}; + +#endif // CUDA_SHARED_MEMORY_H From 20c620210cb1ec56b701e3208d012b88a56baed4 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Mon, 16 Dec 2024 17:09:20 -0800 Subject: [PATCH 26/42] Add rough layout for message broker. --- catkit_core/CMakeLists.txt | 1 + catkit_core/MessageBroker.cpp | 2 + catkit_core/MessageBroker.h | 136 ++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+) create mode 100644 catkit_core/MessageBroker.cpp create mode 100644 catkit_core/MessageBroker.h diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index fb937ff85..16f62494c 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -34,6 +34,7 @@ add_library(catkit_core STATIC Util.cpp PoolAllocator.cpp FreeListAllocator.cpp + MessageBroker.cpp UuidGenerator.cpp proto/core.pb.cc proto/logging.pb.cc diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp new file mode 100644 index 000000000..2983c90e8 --- /dev/null +++ b/catkit_core/MessageBroker.cpp @@ -0,0 +1,2 @@ +#include "MessageBroker.h" + diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h new file mode 100644 index 000000000..b96c563fc --- /dev/null +++ b/catkit_core/MessageBroker.h @@ -0,0 +1,136 @@ +#ifndef MESSAGE_BROKER_H +#define MESSAGE_BROKER_H + +#include "HashMap.h" +#include "Synchronization.h" +#include "FreeListAllocator.h" +#include "PoolAllocator.h" +#include "SharedMemory.h" +#include "CudaSharedMemory.h" +#include "UuidGenerator.h" + +const char * const MESSAGE_BROKER_VERSION = "0.1"; + +const size_t VERSION_SIZE = 8; +const size_t TOPIC_HASH_MAP_SIZE = 16384; +const size_t TOPIC_MAX_KEY_SIZE = 128; +const size_t TOPIC_MAX_NUM_MESSAGES = 15; +const size_t HOST_NAME_SIZE = 64; +const size_t METADATA_MAX_STRLEN = 15; +const size_t UUID_SIZE = 16; +const size_t MAX_NUM_DIMENSIONS = 4; +const size_t MAX_NUM_METADATA_ENTRIES = 16; +const size_t MAX_SHARED_MEMORY_ID_SIZE = 64; +const size_t MAX_NUM_GPUS = 8; + +struct MetadataEntry +{ +// Avoid post-padding of this anonymous union. +#pragma pack(push,1) + union + { + std::uint64_t integer; + double floating_point; + char string[METADATA_MAX_STRLEN]; + }; +#pragma pack(pop) + + std::uint8_t metadata_id; +}; + +struct ArrayInfo +{ + char data_type; + char byte_order; + std::uint8_t num_dimensions; + std::uint32_t shape[MAX_NUM_DIMENSIONS]; + std::uint32_t strides[MAX_NUM_DIMENSIONS]; +}; + +struct PayloadInfo +{ + std::int8_t device_id; + std::uint64_t offset_in_buffer; + std::uint64_t total_size; + + ArrayInfo array_info; +}; + +struct MessageHeader +{ + char topic[TOPIC_MAX_KEY_SIZE]; + + char payload_id[UUID_SIZE]; + std::uint64_t frame_id; + + char trace_id[UUID_SIZE]; + + char producer_hostname[HOST_NAME_SIZE]; + std::uint32_t producer_pid; + std::uint64_t producer_timestamp; + + PayloadInfo payload_info; + + MetadataEntry metadata_entries[MAX_NUM_METADATA_ENTRIES]; + + std::uint16_t partial_frame_id; + std::uint64_t start_byte; + std::uint64_t end_byte; +}; + +struct TopicHeader +{ + std::atomic_uint64_t latest_frame_id; + std::uint64_t message_offsets[TOPIC_MAX_NUM_MESSAGES]; + char message_ids[UUID_SIZE][TOPIC_MAX_NUM_MESSAGES]; + + SynchronizationSharedData synchronization; +}; + +struct MessageBrokerHeader +{ + char version[VERSION_SIZE]; + char creator_hostname[HOST_NAME_SIZE]; + std::uint64_t time_of_last_activity; + + char buffer_shared_memory_id[MAX_SHARED_MEMORY_ID_SIZE]; + CudaIpcHandle cuda_ipc_handles[MAX_NUM_GPUS]; +}; + +struct Message +{ + MessageHeader *header; + void *payload; +}; + +class MessageBroker +{ +public: + MessageBroker(void *metadata_buffer); + + void Initialize(); + + void *ReservePayload(size_t payload_size, int8_t device_id = -1); + + void Publish(const std::string &topic, const Message &message); + void PublishPartial(const std::string &topic, const Message &message); + + Message GetNextMessage(const std::string &topic, double timeout_in_seconds); + Message GetMessage(const std::string &topic, size_t frame_id); + +private: + MessageBrokerHeader &m_Header; + + HashMap m_TopicHeaders; + PoolAllocator m_MessageHeaderAllocator; + + FreeListAllocator m_CpuPayloadAllocator; + std::shared_ptr m_CpuPayloadMemory; + + FreeListAllocator m_GpuPayloadAllocator[MAX_NUM_GPUS]; + std::shared_ptr m_GpuPayloadMemory[MAX_NUM_GPUS]; + + UuidGenerator m_UuidGenerator; +}; + +#endif // MESSAGE_BROKER_H From f19582408a904078c185025fac1535e5502358d4 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Mon, 16 Dec 2024 17:09:33 -0800 Subject: [PATCH 27/42] Add benchmark for UUID generator. --- benchmarks/CMakeLists.txt | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index a76ebf1e7..b9c54288c 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -39,6 +39,11 @@ add_executable(hash_map hash_map.cpp) target_include_directories(hash_map PUBLIC ../catkit_core) target_link_libraries(hash_map PUBLIC catkit_core) +# Uuid generator benchmark +add_executable(uuid_generator uuid_generator.cpp) +target_include_directories(uuid_generator PUBLIC ../catkit_core) +target_link_libraries(uuid_generator PUBLIC catkit_core) + # Add install files install(TARGETS datastream_latency DESTINATION bin) install(TARGETS datastream_submit DESTINATION bin) @@ -46,3 +51,4 @@ install(TARGETS timestamp DESTINATION bin) install(TARGETS free_list_allocator DESTINATION bin) install(TARGETS pool_allocator DESTINATION bin) install(TARGETS hash_map DESTINATION bin) +install(TARGETS uuid_generator DESTINATION bin) From c2c26f2f1a1c66b6afee5b2444417dd51b8595dd Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 17 Dec 2024 21:41:01 -0800 Subject: [PATCH 28/42] Use a typename for UUIDs. --- catkit_core/UuidGenerator.cpp | 2 +- catkit_core/UuidGenerator.h | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/catkit_core/UuidGenerator.cpp b/catkit_core/UuidGenerator.cpp index ea68bcaf8..80328a02d 100644 --- a/catkit_core/UuidGenerator.cpp +++ b/catkit_core/UuidGenerator.cpp @@ -11,7 +11,7 @@ UuidGenerator::UuidGenerator() } } -void UuidGenerator::GenerateUuid(char *uuid) +void UuidGenerator::Generate(Uuid &uuid) { for (size_t i = 0; i < 2; ++i) { diff --git a/catkit_core/UuidGenerator.h b/catkit_core/UuidGenerator.h index 244b828d9..875234218 100644 --- a/catkit_core/UuidGenerator.h +++ b/catkit_core/UuidGenerator.h @@ -3,12 +3,14 @@ #include +using Uuid = char[16]; + class UuidGenerator { public: UuidGenerator(); - void GenerateUuid(char *uuid); + void Generate(Uuid &uuid); private: std::mt19937_64 m_Engines[2]; From 304a0052e6fdbac6a2b5032b75179eb47dc30e70 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 18 Dec 2024 14:00:20 -0800 Subject: [PATCH 29/42] Flesh out interface more. --- catkit_core/MessageBroker.h | 111 +++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 27 deletions(-) diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h index b96c563fc..f8ce8adf5 100644 --- a/catkit_core/MessageBroker.h +++ b/catkit_core/MessageBroker.h @@ -9,6 +9,8 @@ #include "CudaSharedMemory.h" #include "UuidGenerator.h" +#include + const char * const MESSAGE_BROKER_VERSION = "0.1"; const size_t VERSION_SIZE = 8; @@ -16,26 +18,17 @@ const size_t TOPIC_HASH_MAP_SIZE = 16384; const size_t TOPIC_MAX_KEY_SIZE = 128; const size_t TOPIC_MAX_NUM_MESSAGES = 15; const size_t HOST_NAME_SIZE = 64; -const size_t METADATA_MAX_STRLEN = 15; -const size_t UUID_SIZE = 16; +const size_t METADATA_MAX_STRLEN = 16; const size_t MAX_NUM_DIMENSIONS = 4; const size_t MAX_NUM_METADATA_ENTRIES = 16; const size_t MAX_SHARED_MEMORY_ID_SIZE = 64; const size_t MAX_NUM_GPUS = 8; -struct MetadataEntry +union MetadataEntry { -// Avoid post-padding of this anonymous union. -#pragma pack(push,1) - union - { - std::uint64_t integer; - double floating_point; - char string[METADATA_MAX_STRLEN]; - }; -#pragma pack(pop) - - std::uint8_t metadata_id; + std::uint64_t integer; + double floating_point; + char string[METADATA_MAX_STRLEN]; }; struct ArrayInfo @@ -60,10 +53,10 @@ struct MessageHeader { char topic[TOPIC_MAX_KEY_SIZE]; - char payload_id[UUID_SIZE]; + Uuid payload_id; std::uint64_t frame_id; - char trace_id[UUID_SIZE]; + Uuid trace_id; char producer_hostname[HOST_NAME_SIZE]; std::uint32_t producer_pid; @@ -80,11 +73,20 @@ struct MessageHeader struct TopicHeader { - std::atomic_uint64_t latest_frame_id; + std::atomic_uint64_t next_frame_id; std::uint64_t message_offsets[TOPIC_MAX_NUM_MESSAGES]; - char message_ids[UUID_SIZE][TOPIC_MAX_NUM_MESSAGES]; SynchronizationSharedData synchronization; + + char metadata_keys[METADATA_MAX_STRLEN][MAX_NUM_METADATA_ENTRIES]; + + TopicHeader() = default; + TopicHeader(const TopicHeader &header); + + TopicHeader &operator=(const TopicHeader &header); + +private: + void CopyFrom(const TopicHeader &header); }; struct MessageBrokerHeader @@ -97,33 +99,88 @@ struct MessageBrokerHeader CudaIpcHandle cuda_ipc_handles[MAX_NUM_GPUS]; }; -struct Message +class Message { - MessageHeader *header; - void *payload; + friend class MessageBroker; + +public: + Message(); + + bool PublishPartial(std::uint64_t start_byte, std::uint64_t end_byte, bool is_final); + bool Publish(); + + const char *GetTopic() const; + + const Uuid &GetPayloadId() const; + const std::uint64_t GetFrameId() const; + + const Uuid &GetTraceId() const; + + const char *GetProducerHostnname() const; + const std::uint32_t GetProducerPid() const; + const std::uint64_t GetProducerTimestamp() const; + + const PayloadInfo &GetPayloadInfo() const; + + const ArrayInfo &GetArrayInfo() const; + void SetArrayInfo(const ArrayInfo &array_info); + + void *GetPayload() const; + size_t GetPayloadSize() const; + + const MetadataEntry &GetMetadataEntry(std::uint8_t metadata_id) const; + void SetMetadataEntry(std::uint8_t metadata_id, std::uint64_t value); + void SetMetadataEntry(std::uint8_t metadata_id, double value); + void SetMetadataEntry(std::uint8_t metadata_id, const char *value); + + const std::uint16_t GetPartialFrameId() const; + + const std::uint64_t GetStartByte() const; + void SetStartByte(const std::uint64_t &start_byte); + + const std::uint64_t GetEndByte() const; + void SetEndByte(const std::uint64_t &end_byte); + +private: + MessageHeader *m_Header; + void *m_Payload; + + bool m_HasBeenPublished; + + std::shared_ptr m_MessageBroker; }; class MessageBroker { -public: - MessageBroker(void *metadata_buffer); +private: + MessageBroker(); // TODO: Add parameters. - void Initialize(); +public: + std::unique_ptr Create(); // TODO: Add parameters. + std::unique_ptr Open(void *metadata_buffer); - void *ReservePayload(size_t payload_size, int8_t device_id = -1); + Message PrepareMessage(const std::string &topic, Uuid trace_id, size_t payload_size, int8_t device_id = -1); + Message PrepareMessage(const std::string &topic, size_t payload_size, int8_t device_id = -1); - void Publish(const std::string &topic, const Message &message); - void PublishPartial(const std::string &topic, const Message &message); + bool Publish(const Message &message); + bool PublishPartial(const Message &message, bool is_final); Message GetNextMessage(const std::string &topic, double timeout_in_seconds); Message GetMessage(const std::string &topic, size_t frame_id); private: + uint64_t AllocatePayload(size_t payload_size, int8_t device_id); + bool PublishMessage(const Message &message); + + FreeListAllocator *GetAllocator(int8_t device_id); + MessageBrokerHeader &m_Header; HashMap m_TopicHeaders; PoolAllocator m_MessageHeaderAllocator; + MessageHeader *m_MessageHeaders; + FreeListAllocator m_CpuPayloadAllocator; std::shared_ptr m_CpuPayloadMemory; From e29e4e3aa6cf9e99339a798824e3cd09d360bb16 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 19 Dec 2024 20:14:19 -0800 Subject: [PATCH 30/42] Only use a single PublishMessage() method. --- catkit_core/MessageBroker.h | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h index f8ce8adf5..5b3a8e110 100644 --- a/catkit_core/MessageBroker.h +++ b/catkit_core/MessageBroker.h @@ -24,6 +24,8 @@ const size_t MAX_NUM_METADATA_ENTRIES = 16; const size_t MAX_SHARED_MEMORY_ID_SIZE = 64; const size_t MAX_NUM_GPUS = 8; +const std::uint64_t INVALID_FRAME_ID = 0xFFFFFFFFFFFFFFFF; + union MetadataEntry { std::uint64_t integer; @@ -103,11 +105,11 @@ class Message { friend class MessageBroker; -public: +private: Message(); - bool PublishPartial(std::uint64_t start_byte, std::uint64_t end_byte, bool is_final); - bool Publish(); +public: + ~Message(); const char *GetTopic() const; @@ -150,8 +152,10 @@ class Message std::shared_ptr m_MessageBroker; }; -class MessageBroker +class MessageBroker : std::enable_shared_from_this { + friend class Message; + private: MessageBroker(); // TODO: Add parameters. @@ -159,20 +163,17 @@ class MessageBroker std::unique_ptr Create(); // TODO: Add parameters. std::unique_ptr Open(void *metadata_buffer); - Message PrepareMessage(const std::string &topic, Uuid trace_id, size_t payload_size, int8_t device_id = -1); Message PrepareMessage(const std::string &topic, size_t payload_size, int8_t device_id = -1); + Message PrepareMessage(const std::string &topic, Uuid trace_id, size_t payload_size, int8_t device_id = -1); - bool Publish(const Message &message); - bool PublishPartial(const Message &message, bool is_final); + void PublishMessage(Message &message, bool is_final = true); Message GetNextMessage(const std::string &topic, double timeout_in_seconds); Message GetMessage(const std::string &topic, size_t frame_id); private: - uint64_t AllocatePayload(size_t payload_size, int8_t device_id); - bool PublishMessage(const Message &message); - FreeListAllocator *GetAllocator(int8_t device_id); + Synchronization *GetSynchronization(const std::string &topic); MessageBrokerHeader &m_Header; From 35117d104e4f49df12d923835a7bdc4444ad64fd Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 19 Dec 2024 20:15:15 -0800 Subject: [PATCH 31/42] Topic header copy constructor and copy assignment operator. --- catkit_core/MessageBroker.cpp | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index 2983c90e8..d3ee17763 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -1,2 +1,28 @@ #include "MessageBroker.h" +#include "Util.h" +#include "Timing.h" +#include "HostName.h" + +#include + +TopicHeader::TopicHeader(const TopicHeader &header) +{ + CopyFrom(header); +} + +TopicHeader &TopicHeader::operator=(const TopicHeader &header) +{ + CopyFrom(header); + + return *this; +} + +void TopicHeader::CopyFrom(const TopicHeader &header) +{ + next_frame_id.store(header.next_frame_id.load(std::memory_order_relaxed), std::memory_order_relaxed); + synchronization = header.synchronization; + + std::copy(header.message_offsets, header.message_offsets + TOPIC_MAX_NUM_MESSAGES, message_offsets); + std::copy((char *)header.metadata_keys, (char *)header.metadata_keys + sizeof(metadata_keys), (char *)metadata_keys); +} From 485ae311d9d973c485366866da6aa3109c7200a3 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 19 Dec 2024 20:15:53 -0800 Subject: [PATCH 32/42] Initial implementation of Prepare() and Publish() functions. --- catkit_core/MessageBroker.cpp | 152 ++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index d3ee17763..1fc6df6da 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -26,3 +26,155 @@ void TopicHeader::CopyFrom(const TopicHeader &header) std::copy(header.message_offsets, header.message_offsets + TOPIC_MAX_NUM_MESSAGES, message_offsets); std::copy((char *)header.metadata_keys, (char *)header.metadata_keys + sizeof(metadata_keys), (char *)metadata_keys); } + +Message MessageBroker::PrepareMessage(const std::string &topic, size_t payload_size, int8_t device_id) +{ + Uuid trace_id; + m_UuidGenerator.Generate(trace_id); + + return PrepareMessage(topic, trace_id, payload_size, device_id); +} + +Message MessageBroker::PrepareMessage(const std::string &topic, Uuid trace_id, size_t payload_size, int8_t device_id) +{ + Message message; + + message.m_HasBeenPublished = false; + message.m_MessageBroker = shared_from_this(); + + // Allocate a payload. + auto allocator = GetAllocator(device_id); + + if (allocator == nullptr) + { + throw std::runtime_error("Invalid device ID."); + } + + auto block_handle = allocator->Allocate(payload_size); + + if (block_handle == FreeListAllocator::INVALID_HANDLE) + { + throw std::runtime_error("Could not allocate payload."); + } + + auto offset = allocator->GetOffset(block_handle); + + if (device_id < 0) + { + message.m_Payload = m_CpuPayloadMemory->GetAddress() + offset; + } + else + { + message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress() + offset; + } + + // Allocate a message header. + auto message_header_handle = m_MessageHeaderAllocator.Allocate(); + + if (message_header_handle == PoolAllocator::INVALID_HANDLE) + { + throw std::runtime_error("Could not allocate message header."); + } + + // Access the message header. + message.m_Header = &m_MessageHeaders[message_header_handle]; + auto header = message.m_Header; + + // Set the payload information. + header->payload_info.device_id = device_id; + header->payload_info.total_size = payload_size; + header->payload_info.offset_in_buffer = offset; + m_UuidGenerator.Generate(header->payload_id); + + // Set the topic. + std::strncpy(header->topic, topic.c_str(), sizeof(header->topic)); + + // Set the trace ID. + std::strncpy(header->trace_id, trace_id, sizeof(header->trace_id)); + + // Set the producer information. + std::strncpy(header->producer_hostname, GetHostName().c_str(), sizeof(header->producer_hostname)); + header->producer_pid = GetProcessId(); + + header->partial_frame_id = 0; + header->start_byte = 0; + header->end_byte = payload_size; + + // Set default values. + header->frame_id = INVALID_FRAME_ID; + header->producer_timestamp = 0; + + return message; +} + +void MessageBroker::PublishMessage(Message &message, bool is_final) +{ + if (message.m_HasBeenPublished) + { + return; + } + + auto topic_header = m_TopicHeaders.Find(message.m_Header->topic); + + if (message.m_Header->partial_frame_id == INVALID_FRAME_ID) + { + // First partial frame. Assign a new frame ID. + message.m_Header->frame_id = topic_header->next_frame_id.fetch_add(1, std::memory_order_relaxed); + message.m_Header->partial_frame_id = 0; + } + else + { + // Not the first partial frame. Use the same frame ID and increment the partial frame ID. + message.m_Header->partial_frame_id++; + } + + // Get timestamp. + message.m_Header->producer_timestamp = GetTimeStamp(); + + // Go to synchronization structures and signal them. + + + if (!is_final) + { + // Copy the message header since it's gone after publishing. + auto message_header_handle = m_MessageHeaderAllocator.Allocate(); + + if (message_header_handle == PoolAllocator::INVALID_HANDLE) + { + throw std::runtime_error("Could not allocate message header."); + } + + auto new_message_header = &m_MessageHeaders[message_header_handle]; + *new_message_header = *message.m_Header; + message.m_Header = new_message_header; + } + + message.m_HasBeenPublished = is_final; +} + +FreeListAllocator *MessageBroker::GetAllocator(int8_t device_id) +{ + if (device_id < -1 || device_id >= MAX_NUM_GPUS) + { + return nullptr; + } + + if (device_id == -1) + { + return &m_CpuPayloadAllocator; + } + + return &m_GpuPayloadAllocator[device_id]; +} + +Synchronization *MessageBroker::GetSynchronization(const std::string &topic) +{ + auto topic_header = m_TopicHeaders.Find(topic); + + if (topic_header == nullptr) + { + return nullptr; + } + + // TODO: look up the synchronization structure (not the shared data). +} From aff12e5e01e682f989936b848f437f395776412b Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Fri, 20 Dec 2024 11:17:52 -0800 Subject: [PATCH 33/42] Declare class for pointer access. --- catkit_core/MessageBroker.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h index 5b3a8e110..25fdf79c7 100644 --- a/catkit_core/MessageBroker.h +++ b/catkit_core/MessageBroker.h @@ -101,6 +101,8 @@ struct MessageBrokerHeader CudaIpcHandle cuda_ipc_handles[MAX_NUM_GPUS]; }; +class MessageBroker; + class Message { friend class MessageBroker; From 59351f0f10566b3915308be3ad626f140ecb9e37 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 22 Dec 2024 12:21:16 -0800 Subject: [PATCH 34/42] Trigger synchronization for each (parent) topic. --- catkit_core/MessageBroker.cpp | 41 ++++++++++++++++++++++++++--------- catkit_core/MessageBroker.h | 11 ++++++---- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index 1fc6df6da..89245a03d 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -61,11 +61,11 @@ Message MessageBroker::PrepareMessage(const std::string &topic, Uuid trace_id, s if (device_id < 0) { - message.m_Payload = m_CpuPayloadMemory->GetAddress() + offset; + message.m_Payload = m_CpuPayloadMemory->GetAddress(offset); } else { - message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress() + offset; + message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress(offset); } // Allocate a message header. @@ -114,9 +114,10 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) return; } - auto topic_header = m_TopicHeaders.Find(message.m_Header->topic); + auto topic = std::string_view(message.m_Header->topic); + auto topic_header = m_TopicHeaders.Find(topic); - if (message.m_Header->partial_frame_id == INVALID_FRAME_ID) + if (message.m_Header->frame_id == INVALID_FRAME_ID) { // First partial frame. Assign a new frame ID. message.m_Header->frame_id = topic_header->next_frame_id.fetch_add(1, std::memory_order_relaxed); @@ -128,11 +129,25 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) message.m_Header->partial_frame_id++; } - // Get timestamp. + // Set the timestamp. message.m_Header->producer_timestamp = GetTimeStamp(); + // TODO: put message offsets. + // Go to synchronization structures and signal them. + // This includes parent topics. + for (std::size_t i = 0; i <= topic.size(); ++i) + { + std::size_t size = topic.size() - i; + + if (i == 0 || topic[size] == '/') + { + auto synchronization = GetSynchronization(topic.substr(0, size)); + if (synchronization) + synchronization->Signal(); + } + } if (!is_final) { @@ -152,7 +167,7 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) message.m_HasBeenPublished = is_final; } -FreeListAllocator *MessageBroker::GetAllocator(int8_t device_id) +std::shared_ptr MessageBroker::GetAllocator(int8_t device_id) { if (device_id < -1 || device_id >= MAX_NUM_GPUS) { @@ -161,13 +176,13 @@ FreeListAllocator *MessageBroker::GetAllocator(int8_t device_id) if (device_id == -1) { - return &m_CpuPayloadAllocator; + return m_CpuPayloadAllocator; } - return &m_GpuPayloadAllocator[device_id]; + return m_GpuPayloadAllocator[device_id]; } -Synchronization *MessageBroker::GetSynchronization(const std::string &topic) +std::shared_ptr MessageBroker::GetSynchronization(std::string_view topic) { auto topic_header = m_TopicHeaders.Find(topic); @@ -176,5 +191,11 @@ Synchronization *MessageBroker::GetSynchronization(const std::string &topic) return nullptr; } - // TODO: look up the synchronization structure (not the shared data). + // Look up the synchronization structure (not the shared data). + if (m_Synchronizations.find(topic) == m_Synchronizations.end()) + { + m_Synchronizations[topic] = std::make_shared(topic_header->synchronization); + } + + return m_Synchronizations[topic]; } diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h index 25fdf79c7..c547c9b88 100644 --- a/catkit_core/MessageBroker.h +++ b/catkit_core/MessageBroker.h @@ -10,6 +10,7 @@ #include "UuidGenerator.h" #include +#include const char * const MESSAGE_BROKER_VERSION = "0.1"; @@ -174,8 +175,8 @@ class MessageBroker : std::enable_shared_from_this Message GetMessage(const std::string &topic, size_t frame_id); private: - FreeListAllocator *GetAllocator(int8_t device_id); - Synchronization *GetSynchronization(const std::string &topic); + std::shared_ptr GetAllocator(int8_t device_id); + std::shared_ptr GetSynchronization(std::string_view topic); MessageBrokerHeader &m_Header; @@ -184,13 +185,15 @@ class MessageBroker : std::enable_shared_from_this MessageHeader *m_MessageHeaders; - FreeListAllocator m_CpuPayloadAllocator; + std::shared_ptr m_CpuPayloadAllocator; std::shared_ptr m_CpuPayloadMemory; - FreeListAllocator m_GpuPayloadAllocator[MAX_NUM_GPUS]; + std::shared_ptr m_GpuPayloadAllocator[MAX_NUM_GPUS]; std::shared_ptr m_GpuPayloadMemory[MAX_NUM_GPUS]; UuidGenerator m_UuidGenerator; + + std::map> m_Synchronizations; }; #endif // MESSAGE_BROKER_H From 7b5f1f38bd63b3fa69108dcf6add26657458e0d9 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 22 Dec 2024 12:21:40 -0800 Subject: [PATCH 35/42] Fix due to API change. --- benchmarks/uuid_generator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/uuid_generator.cpp b/benchmarks/uuid_generator.cpp index bb2879cf2..8db6341a8 100644 --- a/benchmarks/uuid_generator.cpp +++ b/benchmarks/uuid_generator.cpp @@ -17,7 +17,7 @@ int main() for (size_t i = 0; i < N; ++i) { - generator.GenerateUuid(uuid); + generator.Generate(uuid); } auto end = GetTimeStamp(); From 73ad596c14936fc26d7948ccb0bf7fa5f009f7bc Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 22 Dec 2024 12:22:03 -0800 Subject: [PATCH 36/42] Unify memory block API. --- catkit_core/CudaSharedMemory.h | 6 ++++-- catkit_core/Memory.h | 16 ++++++++++++++++ catkit_core/SharedMemory.cpp | 4 ++-- catkit_core/SharedMemory.h | 6 ++++-- 4 files changed, 26 insertions(+), 6 deletions(-) create mode 100644 catkit_core/Memory.h diff --git a/catkit_core/CudaSharedMemory.h b/catkit_core/CudaSharedMemory.h index 98899e2d5..a5e433f59 100644 --- a/catkit_core/CudaSharedMemory.h +++ b/catkit_core/CudaSharedMemory.h @@ -1,6 +1,8 @@ #ifndef CUDA_SHARED_MEMORY_H #define CUDA_SHARED_MEMORY_H +#include "Memory.h" + #include #ifdef HAVE_CUDA @@ -11,7 +13,7 @@ typedef cudaIpcMemHandle_t CudaIpcHandle; typedef char CudaIpcHandle[64]; #endif -class CudaSharedMemory +class CudaSharedMemory : public Memory { private: CudaSharedMemory(const CudaIpcHandle &ipc_handle, void *device_pointer=nullptr); @@ -22,7 +24,7 @@ class CudaSharedMemory static std::shared_ptr Create(size_t num_bytes_in_buffer); static std::shared_ptr Open(const CudaIpcHandle &ipc_handle); - void *GetAddress(); + void *GetAddress(std::size_t offset = 0) override; }; #endif // CUDA_SHARED_MEMORY_H diff --git a/catkit_core/Memory.h b/catkit_core/Memory.h new file mode 100644 index 000000000..9f20e9ed6 --- /dev/null +++ b/catkit_core/Memory.h @@ -0,0 +1,16 @@ +#ifndef MEMORY_H +#define MEMORY_H + +#include + +class Memory +{ +public: + virtual ~Memory() + { + } + + virtual void *GetAddress(std::size_t offset = 0) = 0; +}; + +#endif // MEMORY_H diff --git a/catkit_core/SharedMemory.cpp b/catkit_core/SharedMemory.cpp index a469b9c40..c8b8d356e 100644 --- a/catkit_core/SharedMemory.cpp +++ b/catkit_core/SharedMemory.cpp @@ -82,7 +82,7 @@ SharedMemory::SharedMemory(const std::string &id, FileObject file, bool is_owner throw std::runtime_error("Something went wrong while mapping shared memory file."); } -void *SharedMemory::GetAddress() +void *SharedMemory::GetAddress(std::size_t offset) { - return m_Buffer; + return static_cast(m_Buffer) + offset; } diff --git a/catkit_core/SharedMemory.h b/catkit_core/SharedMemory.h index aa6764bd9..ca78b51c6 100644 --- a/catkit_core/SharedMemory.h +++ b/catkit_core/SharedMemory.h @@ -1,6 +1,8 @@ #ifndef SHARED_MEMORY_H #define SHARED_MEMORY_H +#include "Memory.h" + #include #include @@ -21,7 +23,7 @@ typedef int FileObject; #endif -class SharedMemory +class SharedMemory : public Memory { private: SharedMemory(const std::string &id, FileObject file, bool is_owner); @@ -32,7 +34,7 @@ class SharedMemory static std::shared_ptr Create(const std::string &id, size_t num_bytes_in_buffer); static std::shared_ptr Open(const std::string &id); - void *GetAddress(); + void *GetAddress(std::size_t offset = 0) override; private: std::string m_Id; From c46bd8045de9028f50af98ed23b30c9a1fdc37ec Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 22 Dec 2024 12:22:17 -0800 Subject: [PATCH 37/42] Add local memory for testing purposes. --- catkit_core/CMakeLists.txt | 1 + catkit_core/LocalMemory.cpp | 16 ++++++++++++++++ catkit_core/LocalMemory.h | 18 ++++++++++++++++++ 3 files changed, 35 insertions(+) create mode 100644 catkit_core/LocalMemory.cpp create mode 100644 catkit_core/LocalMemory.h diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index 16f62494c..e4e1de6d5 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -36,6 +36,7 @@ add_library(catkit_core STATIC FreeListAllocator.cpp MessageBroker.cpp UuidGenerator.cpp + LocalMemory.cpp proto/core.pb.cc proto/logging.pb.cc proto/testbed.pb.cc diff --git a/catkit_core/LocalMemory.cpp b/catkit_core/LocalMemory.cpp new file mode 100644 index 000000000..57120fc05 --- /dev/null +++ b/catkit_core/LocalMemory.cpp @@ -0,0 +1,16 @@ +#include "LocalMemory.h" + +LocalMemory::LocalMemory(std::size_t num_bytes) + : m_Memory(new char[num_bytes]) +{ +} + +LocalMemory::~LocalMemory() +{ + delete[] m_Memory; +} + +void *LocalMemory::GetAddress(std::size_t offset) +{ + return m_Memory + offset; +} diff --git a/catkit_core/LocalMemory.h b/catkit_core/LocalMemory.h new file mode 100644 index 000000000..d237b9dba --- /dev/null +++ b/catkit_core/LocalMemory.h @@ -0,0 +1,18 @@ +#ifndef LOCAL_MEMORY_H +#define LOCAL_MEMORY_H + +#include "Memory.h" + +class LocalMemory : public Memory +{ +public: + LocalMemory(std::size_t num_bytes); + virtual ~LocalMemory(); + + virtual void *GetAddress(std::size_t offset = 0); + +private: + char *m_Memory; +}; + +#endif // LOCAL_MEMORY_H From 3b23547d4adbc50297e469cf484eb95bde28e3b1 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 21 Jan 2025 19:19:12 -0800 Subject: [PATCH 38/42] Use iterator and range structures to loop over subtopics. --- catkit_core/MessageBroker.cpp | 83 +++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index 89245a03d..caf8602bf 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -6,6 +6,89 @@ #include +class SubtopicIterator { +public: + SubtopicIterator(std::string_view str, char delimiter, bool is_valid = true) + : m_String(str), m_Delimiter(delimiter), m_IsValid(is_valid) + { + } + + std::string_view operator*() const + { + return m_String; + } + + SubtopicIterator& operator++() + { + if (m_String.empty()) + { + m_IsValid = false; + return *this; + } + + size_t pos = m_String.rfind(m_Delimiter); + + if (pos != std::string_view::npos) + { + // Remove the last part. + m_String.remove_suffix(m_String.size() - pos); + } + else + { + // No more delimiters left, so remove the whole string. + m_String.remove_suffix(m_String.size()); + } + + return *this; + } + + bool operator!=(const SubtopicIterator& other) const + { + if (!m_IsValid && !other.m_IsValid) + { + // If both iterators are invalid, they are equal no matter what. + return false; + } + + return m_IsValid == other.m_IsValid || + m_String.data() != other.m_String.data() || + m_String.size() != other.m_String.size(); + } + + static SubtopicIterator end() + { + return SubtopicIterator({}, '\0', true); + } + +private: + std::string_view m_String; + char m_Delimiter; + bool m_IsValid; +}; + +class SubtopicRange +{ +public: + SubtopicRange(std::string_view str, char delimiter = '/') + : m_String(str), m_Delimiter(delimiter) + { + } + + SubtopicIterator begin() const + { + return SubtopicIterator(m_String, m_Delimiter); + } + + SubtopicIterator end() const + { + return SubtopicIterator::end(); + } + +private: + std::string_view m_String; + char m_Delimiter; +}; + TopicHeader::TopicHeader(const TopicHeader &header) { CopyFrom(header); From bac24235983ec88bafecc1001e198142da9da81d Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 21 Jan 2025 19:19:48 -0800 Subject: [PATCH 39/42] Use GetMemory() function to access memory addresses. --- catkit_core/MessageBroker.cpp | 14 ++++++-------- catkit_core/MessageBroker.h | 2 ++ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index caf8602bf..db399fba6 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -142,20 +142,18 @@ Message MessageBroker::PrepareMessage(const std::string &topic, Uuid trace_id, s auto offset = allocator->GetOffset(block_handle); - if (device_id < 0) - { - message.m_Payload = m_CpuPayloadMemory->GetAddress(offset); - } - else - { - message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress(offset); - } + auto memory = GetMemory(device_id); + message.m_Payload = memory->GetAddress(offset); // Allocate a message header. auto message_header_handle = m_MessageHeaderAllocator.Allocate(); if (message_header_handle == PoolAllocator::INVALID_HANDLE) { + // Deallocate allocated memory block. + // TODO: Do this with RAII. + allocator->Deallocate(block_handle); + throw std::runtime_error("Could not allocate message header."); } diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h index c547c9b88..095893e88 100644 --- a/catkit_core/MessageBroker.h +++ b/catkit_core/MessageBroker.h @@ -176,6 +176,8 @@ class MessageBroker : std::enable_shared_from_this private: std::shared_ptr GetAllocator(int8_t device_id); + std::shared_ptr GetMemory(int8_t device_id); + std::shared_ptr GetSynchronization(std::string_view topic); MessageBrokerHeader &m_Header; From c17f2c222b8ff5ee71c6ff3465cfd68125245956 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 21 Jan 2025 20:11:37 -0800 Subject: [PATCH 40/42] Generalize synchronization lock using templates. --- catkit_core/Synchronization.cpp | 213 ++++++++++++++++++++++++++++++++ catkit_core/Synchronization.h | 78 ++++++++++++ 2 files changed, 291 insertions(+) create mode 100644 catkit_core/Synchronization.cpp create mode 100644 catkit_core/Synchronization.h diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp new file mode 100644 index 000000000..7bb818727 --- /dev/null +++ b/catkit_core/Synchronization.cpp @@ -0,0 +1,213 @@ +#include "Synchronization.h" + +#include +#include + +#ifndef _WIN32 + #include +#endif + +#include "Timing.h" + +Synchronization::Synchronization() + : m_IsOwner(false), m_SharedData(nullptr) +{ +} + +Synchronization::~Synchronization() +{ + if (m_SharedData) + { +#ifdef _WIN32 + CloseHandle(m_Semaphore); +#else + +#endif + } +} + +void Synchronization::Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create) +{ + if (create) + { + Create(id, shared_data); + } + else + { + Open(id, shared_data); + } +} + +void Synchronization::Create(const std::string &id, SynchronizationSharedData *shared_data) +{ + if (m_SharedData) + throw std::runtime_error("Create called on an already initialized Synchronization object."); + + if (!shared_data) + throw std::runtime_error("The passed shared data was a nullptr."); + +#ifdef _WIN32 + m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while creating semaphore."); + + shared_data->m_NumReadersWaiting = 0; +#else + pthread_mutexattr_t mutex_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&(shared_data->m_Mutex), &mutex_attr); + pthread_mutexattr_destroy(&mutex_attr); + + pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); +#endif // __APPLE__ + pthread_cond_init(&(shared_data->m_Condition), &cond_attr); + pthread_condattr_destroy(&cond_attr); +#endif // _WIN32 + + m_SharedData = shared_data; +} + +void Synchronization::Open(const std::string &id, SynchronizationSharedData *shared_data) +{ + if (m_SharedData) + throw std::runtime_error("Open called on an already initialized Synchronization object."); + + if (!shared_data) + throw std::runtime_error("The passed shared data was a nullptr."); + +#ifdef _WIN32 + m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while opening semaphore."); +#else +#endif + + m_SharedData = shared_data; +} + +void Synchronization::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + if (!m_SharedData) + throw std::runtime_error("Wait() was called before the synchronization was intialized."); + +#ifdef _WIN32 + Timer timer; + DWORD res = WAIT_OBJECT_0; + + while (!condition()) + { + if (res == WAIT_OBJECT_0) + { + // Increment the number of readers that are waiting, making sure the counter + // is at least 1 after the increment. This can occur when a previous reader got + // interrupted and the trigger happening before decrementing the + // m_NumReadersWaiting counter. + while (m_SharedData->m_NumReadersWaiting++ < 0) + { + } + } + + // Wait for a maximum of 20ms to perform periodic error checking. + auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + + if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("Waiting time has expired."); + } + + if (res == WAIT_FAILED) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); + } + + if (error_check != nullptr) + { + try + { + error_check(); + } + catch (...) + { + m_SharedData->m_NumReadersWaiting--; + throw; + } + } + } +#else + Timer timer; + + while (!condition()) + { + // Wait for a maximum of 20ms to perform periodic error checking. + long timeout_wait = std::min(20L, timeout_in_ms); + +#ifdef __APPLE__ + // Relative timespec. + timespec timeout; + timeout.tv_sec = timeout_wait / 1000; + timeout.tv_nsec = 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait_relative_np(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); +#else + // Absolute timespec. + timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += timeout_wait / 1000; + timeout.tv_nsec += 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); +#endif // __APPLE__ + if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + throw std::runtime_error("Waiting time has expired."); + } + + if (error_check != nullptr) + error_check(); + } +#endif // _WIN32 +} + +void Synchronization::Signal() +{ + if (!m_SharedData) + throw std::runtime_error("Signal() was called before the synchronization was intialized."); + +#ifdef _WIN32 + // Notify waiting processes. + long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); + + // If a reader times out in between us reading the number of readers that are waiting + // and us releasing the semaphore, we are releasing one too many readers. This + // results in a future reader being released immediately, which is not a problem, + // as there are checks in place for that. + + if (num_readers_waiting > 0) + ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); +#else + pthread_cond_broadcast(&(m_SharedData->m_Condition)); +#endif // _WIN32 +} + +void Synchronization::Lock() +{ +#ifndef _WIN32 + pthread_mutex_lock(&(m_SharedData->m_Mutex)); +#endif // _WIN32 +} + +void Synchronization::Unlock() +{ +#ifndef _WIN32 + pthread_mutex_unlock(&(m_SharedData->m_Mutex)); +#endif // _WIN32 +} diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h new file mode 100644 index 000000000..6f6ba5cf6 --- /dev/null +++ b/catkit_core/Synchronization.h @@ -0,0 +1,78 @@ +#ifndef SYNCHRONZATION_H +#define SYNCHRONZATION_H + +#include +#include +#include +#include + +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#else + #include + #include +#endif // _WIN32 + +class Synchronization; + +struct SynchronizationSharedData +{ +#ifdef _WIN32 + std::atomic_long m_NumReadersWaiting; +#else + pthread_cond_t m_Condition; + pthread_mutex_t m_Mutex; +#endif +}; + +class Synchronization +{ +public: + Synchronization(); + Synchronization(const Synchronization &other) = delete; + ~Synchronization(); + + Synchronization &operator=(const Synchronization &other) = delete; + + void Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create); + + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +private: + void Create(const std::string &id, SynchronizationSharedData *shared_data); + void Open(const std::string &id, SynchronizationSharedData *shared_data); + + bool m_IsOwner; + SynchronizationSharedData *m_SharedData; + std::string m_Id; + +#ifdef _WIN32 + HANDLE m_Semaphore; +#endif +}; + +template +class SynchronizationLock +{ +public: + inline SynchronizationLock(T sync) + : m_Sync(sync) + { + m_Sync->Lock(); + } + inline ~SynchronizationLock() + { + m_Sync->Unlock(); + } + +private: + T m_Sync; +}; + +#endif // SYNCHRONIZATION_H From 9790a19b0dad7426973f8f545e51212e428faa17 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 16:03:37 -0800 Subject: [PATCH 41/42] Publish messages on each subtopic. --- catkit_core/MessageBroker.cpp | 50 ++++++++++++++++++++++++++++------- catkit_core/MessageBroker.h | 5 +++- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index db399fba6..a19d73b36 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -5,6 +5,19 @@ #include "HostName.h" #include +#include + +template +T fetch_max(std::atomic &atom, T value) +{ + T current = atom.load(std::memory_order_relaxed); + + while (current < value && !atom.compare_exchange_weak(current, value, std::memory_order_acq_rel)) + { + } + + return current; +} class SubtopicIterator { public: @@ -203,6 +216,12 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) // First partial frame. Assign a new frame ID. message.m_Header->frame_id = topic_header->next_frame_id.fetch_add(1, std::memory_order_relaxed); message.m_Header->partial_frame_id = 0; + + // If the ring buffer is full, make the oldest frame unavailable. + if ((topic_header->last_frame_id - topic_header->first_frame_id) >= TOPIC_MAX_NUM_MESSAGES) + { + topic_header->first_frame_id++; + } } else { @@ -213,20 +232,31 @@ void MessageBroker::PublishMessage(Message &message, bool is_final) // Set the timestamp. message.m_Header->producer_timestamp = GetTimeStamp(); - // TODO: put message offsets. - - // Go to synchronization structures and signal them. - // This includes parent topics. - for (std::size_t i = 0; i <= topic.size(); ++i) + for (const auto &subtopic : SubtopicRange(topic)) { - std::size_t size = topic.size() - i; + auto topic_header = m_TopicHeaders.Find(std::string(subtopic)); - if (i == 0 || topic[size] == '/') + if (!topic_header) { - auto synchronization = GetSynchronization(topic.substr(0, size)); + // TODO: if the topic header doesn't exist, create it. + + } + + auto synchronization = GetSynchronization(subtopic); + + // Copy over message header reference. + std::size_t message_header_index = message.m_Header - m_MessageHeaders; + topic_header->message_headers[message.m_Header->frame_id % TOPIC_MAX_NUM_MESSAGES] = message_header_index; + + { + // Obtain a lock as we're about to signal the synchronization structure. + auto lock = SynchronizationLock(synchronization); + + // Make the message available. + fetch_max(topic_header->last_frame_id, message.m_Header->frame_id); - if (synchronization) - synchronization->Signal(); + // Signal the synchronization structure. + synchronization->Signal(); } } diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h index 095893e88..091b34d36 100644 --- a/catkit_core/MessageBroker.h +++ b/catkit_core/MessageBroker.h @@ -77,7 +77,10 @@ struct MessageHeader struct TopicHeader { std::atomic_uint64_t next_frame_id; - std::uint64_t message_offsets[TOPIC_MAX_NUM_MESSAGES]; + std::atomic_uint64_t first_frame_id; + std::atomic_uint64_t last_frame_id; + + std::uint64_t message_headers[TOPIC_MAX_NUM_MESSAGES]; SynchronizationSharedData synchronization; From 43f7f7bd0cd31cef746750a42a03af52aac72455 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 16:07:19 -0800 Subject: [PATCH 42/42] Fix name change. --- catkit_core/MessageBroker.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp index a19d73b36..0bce36fea 100644 --- a/catkit_core/MessageBroker.cpp +++ b/catkit_core/MessageBroker.cpp @@ -119,7 +119,7 @@ void TopicHeader::CopyFrom(const TopicHeader &header) next_frame_id.store(header.next_frame_id.load(std::memory_order_relaxed), std::memory_order_relaxed); synchronization = header.synchronization; - std::copy(header.message_offsets, header.message_offsets + TOPIC_MAX_NUM_MESSAGES, message_offsets); + std::copy(header.message_headers, header.message_headers + TOPIC_MAX_NUM_MESSAGES, message_headers); std::copy((char *)header.metadata_keys, (char *)header.metadata_keys + sizeof(metadata_keys), (char *)metadata_keys); }