diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index a76ebf1e..b9c54288 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -39,6 +39,11 @@ add_executable(hash_map hash_map.cpp) target_include_directories(hash_map PUBLIC ../catkit_core) target_link_libraries(hash_map PUBLIC catkit_core) +# Uuid generator benchmark +add_executable(uuid_generator uuid_generator.cpp) +target_include_directories(uuid_generator PUBLIC ../catkit_core) +target_link_libraries(uuid_generator PUBLIC catkit_core) + # Add install files install(TARGETS datastream_latency DESTINATION bin) install(TARGETS datastream_submit DESTINATION bin) @@ -46,3 +51,4 @@ install(TARGETS timestamp DESTINATION bin) install(TARGETS free_list_allocator DESTINATION bin) install(TARGETS pool_allocator DESTINATION bin) install(TARGETS hash_map DESTINATION bin) +install(TARGETS uuid_generator DESTINATION bin) diff --git a/benchmarks/uuid_generator.cpp b/benchmarks/uuid_generator.cpp new file mode 100644 index 00000000..8db6341a --- /dev/null +++ b/benchmarks/uuid_generator.cpp @@ -0,0 +1,32 @@ +#include "UuidGenerator.h" +#include "Timing.h" + +#include + +int main() +{ + const size_t N = 100000000; + + UuidGenerator generator; + + char uuid[16]; + + std::cout << std::hex; + + auto start = GetTimeStamp(); + + for (size_t i = 0; i < N; ++i) + { + generator.Generate(uuid); + } + + auto end = GetTimeStamp(); + + std::cout << std::dec; + + std::cout << "Time: " << (end - start) / 1e9 << " sec" << std::endl; + std::cout << "Throughput: " << N / ((end - start) / 1e9) << " ops/s" << std::endl; + std::cout << "Time per operation: " << (end - start) / N << " ns" << std::endl; + + return 0; +} diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index 6258d432..e4e1de6d 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -14,7 +14,6 @@ add_compile_options($<$:/MP1>) add_library(catkit_core STATIC DataStream.cpp SharedMemory.cpp - Synchronization.cpp Timing.cpp Log.cpp LogConsole.cpp @@ -35,6 +34,9 @@ add_library(catkit_core STATIC Util.cpp PoolAllocator.cpp FreeListAllocator.cpp + MessageBroker.cpp + UuidGenerator.cpp + LocalMemory.cpp proto/core.pb.cc proto/logging.pb.cc proto/testbed.pb.cc diff --git a/catkit_core/CudaSharedMemory.h b/catkit_core/CudaSharedMemory.h new file mode 100644 index 00000000..a5e433f5 --- /dev/null +++ b/catkit_core/CudaSharedMemory.h @@ -0,0 +1,30 @@ +#ifndef CUDA_SHARED_MEMORY_H +#define CUDA_SHARED_MEMORY_H + +#include "Memory.h" + +#include + +#ifdef HAVE_CUDA +#include +typedef cudaIpcMemHandle_t CudaIpcHandle; +#else +// CUDA cudaIpcMemHandle_t is a struct of 64 bytes. +typedef char CudaIpcHandle[64]; +#endif + +class CudaSharedMemory : public Memory +{ +private: + CudaSharedMemory(const CudaIpcHandle &ipc_handle, void *device_pointer=nullptr); + +public: + ~CudaSharedMemory(); + + static std::shared_ptr Create(size_t num_bytes_in_buffer); + static std::shared_ptr Open(const CudaIpcHandle &ipc_handle); + + void *GetAddress(std::size_t offset = 0) override; +}; + +#endif // CUDA_SHARED_MEMORY_H diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 09d20a82..9bac19bb 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -68,6 +68,7 @@ void CopyString(char *dest, const char *src, size_t n) DataStream::DataStream(const std::string &stream_id, std::shared_ptr shared_memory, bool create) : m_SharedMemory(shared_memory), + m_Event(nullptr), m_Header(nullptr), m_Buffer(nullptr), m_NextFrameIdToRead(0), m_BufferHandlingMode(BM_NEWEST_ONLY) @@ -75,8 +76,6 @@ DataStream::DataStream(const std::string &stream_id, std::shared_ptrGetAddress(); m_Header = (DataStreamHeader *) buffer; m_Buffer = ((char *) buffer) + sizeof(DataStreamHeader); - - m_Synchronization.Initialize(stream_id, &(m_Header->m_SynchronizationSharedData), create); } DataStream::~DataStream() @@ -119,6 +118,8 @@ std::shared_ptr DataStream::Create(const std::string &stream_name, c data_stream->UpdateParameters(type, dimensions, num_frames_in_buffer); + data_stream->m_Event = Event::Create(stream_id, &(header->m_EventSharedState)); + return data_stream; } @@ -141,6 +142,8 @@ std::shared_ptr DataStream::Open(const std::string &stream_id) // Don't read frames that already are available at the time the data stream is opened. data_stream->m_NextFrameIdToRead = data_stream->m_Header->m_LastId; + data_stream->m_Event = Event::Open(stream_id, &(data_stream->m_Header->m_EventSharedState)); + return data_stream; } @@ -173,22 +176,27 @@ void DataStream::SubmitFrame(size_t id) DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer); meta->m_TimeStamp = GetTimeStamp(); - // Obtain a lock as we are about to modify the condition of the - // synchronization. - auto lock = SynchronizationLock(&m_Synchronization); - - // Make frame available: - // Use a do-while loop to ensure we are never decrementing the last id. - size_t last_id; - do { - last_id = m_Header->m_LastId; - - if (last_id >= id + 1) - break; - } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + // Obtain a lock as we are about to modify the condition of the + // synchronization. + auto lock = EventLockGuard(m_Event); + + // Make frame available: + // Use a do-while loop to ensure we are never decrementing the last id. + size_t last_id; + do + { + last_id = m_Header->m_LastId; + + if (last_id >= id + 1) + break; + } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + + m_Event->Signal(); + } - m_Synchronization.Signal(); + auto ts = GetTimeStamp(); + tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); // Don't update the framerate counter for the first frame. if (id == 0) @@ -207,9 +215,6 @@ void DataStream::SubmitFrame(size_t id) m_Header->m_FrameRateCounter = m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta) + FRAMERATE_DECAY; - - auto ts = GetTimeStamp(); - tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); } void DataStream::SubmitData(const void *data) @@ -332,8 +337,8 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che // Wait until frame becomes available. // Obtain a lock first. - auto lock = SynchronizationLock(&m_Synchronization); - m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); + auto lock = EventLockGuard(m_Event); + m_Event->Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); } size_t offset = (id % m_Header->m_NumFramesInBuffer) * m_Header->m_NumBytesPerFrame; diff --git a/catkit_core/DataStream.h b/catkit_core/DataStream.h index 322a2ea2..66968d44 100644 --- a/catkit_core/DataStream.h +++ b/catkit_core/DataStream.h @@ -7,7 +7,7 @@ #include #include "SharedMemory.h" -#include "Synchronization.h" +#include "Event.h" #include "Tensor.h" const char * const CURRENT_DATASTREAM_VERSION = "0.2"; @@ -46,7 +46,7 @@ struct DataStreamHeader double m_FrameRateCounter; - SynchronizationSharedData m_SynchronizationSharedData; + Event::SharedState m_EventSharedState; }; class DataFrame : public Tensor @@ -117,7 +117,7 @@ class DataStream DataStreamHeader *m_Header; char *m_Buffer; - Synchronization m_Synchronization; + std::shared_ptr m_Event; size_t m_NextFrameIdToRead; BufferHandlingMode m_BufferHandlingMode; diff --git a/catkit_core/Event.h b/catkit_core/Event.h new file mode 100644 index 00000000..faa09974 --- /dev/null +++ b/catkit_core/Event.h @@ -0,0 +1,32 @@ +#ifndef EVENT_H +#define EVENT_H + +#include "EventBase.h" + +template +class EventLockGuard +{ +public: + inline EventLockGuard(Event &event) + : m_Event(event) + { + m_Event->Lock(); + } + + inline ~EventLockGuard() + { + m_Event->Unlock(); + } + +private: + Event &m_Event; +}; + +// Select which implementation to use based on the platform. +#ifdef _WIN32 +using Event = EventImpl; +#elif defined(__linux__) or defined(__APPLE__) +using Event = EventImpl; +#endif + +#endif // EVENT_H diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h new file mode 100644 index 00000000..0c39a0bf --- /dev/null +++ b/catkit_core/EventBase.h @@ -0,0 +1,128 @@ +#ifndef EVENT_BASE_H +#define EVENT_BASE_H + +#include +#include + +enum EventImplementationType +{ + ET_CONDITION_VARIABLE, + ET_FUTEX, + ET_SEMAPHORE, + ET_SPIN_LOCK +}; + +template +struct EventSharedState +{ +}; + +template +struct EventLocalState +{ +}; + +template +class EventImpl +{ +public: + using SharedState = EventSharedState; + using LocalState = EventLocalState; + +protected: + EventImpl(); + +public: + ~EventImpl(); + + // Note: do not implement these functions for specific implementations. + static std::unique_ptr> Create(const std::string &id, SharedState *shared_state); + static std::unique_ptr> Open(const std::string &id, SharedState *shared_state); + + // Note: implement the following functions for specific implementations. + inline void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + inline void Signal(); + + inline void Lock(); + inline void Unlock(); + +protected: + inline void CreateImpl(const std::string &id, SharedState *shared_state); + inline void OpenImpl(const std::string &id, SharedState *shared_state); + + bool m_IsOwner; + + SharedState *m_SharedState; + LocalState m_LocalState; +}; + +template +EventImpl::EventImpl() + : m_IsOwner(false), m_SharedState(nullptr) +{ +} + +template +EventImpl::~EventImpl() +{ +} + +template +void EventImpl::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + throw std::runtime_error("This type of event implementation wasn't implemented."); +} + +template +void EventImpl::Signal() +{ +} + +template +void EventImpl::Lock() +{ +} + +template +void EventImpl::Unlock() +{ +} + +template +std::unique_ptr> EventImpl::Create(const std::string &id, EventImpl::SharedState *shared_state) +{ + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + auto obj = std::unique_ptr>(new EventImpl()); + + obj->CreateImpl(id, shared_state); + + obj->m_IsOwner = true; + obj->m_SharedState = shared_state; + + return obj; +} + +template +std::unique_ptr> EventImpl::Open(const std::string &id, EventImpl::SharedState *shared_state) +{ + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + auto obj = std::unique_ptr>(new EventImpl()); + + obj->OpenImpl(id, shared_state); + + obj->m_IsOwner = false; + obj->m_SharedState = shared_state; + + return obj; +} + +#include "EventConditionVariable.inl" +#include "EventFutex.inl" +#include "EventSemaphore.inl" +#include "EventSpinLock.inl" + +#endif // EVENT_BASE_H diff --git a/catkit_core/EventConditionVariable.inl b/catkit_core/EventConditionVariable.inl new file mode 100644 index 00000000..881f13ed --- /dev/null +++ b/catkit_core/EventConditionVariable.inl @@ -0,0 +1,99 @@ +#include "EventBase.h" + +#include "Timing.h" + +#if defined(__linux__) || defined(__APPLE__) + #include +#endif + +using EventConditionVariable = EventImpl; + +#if defined(__linux__) || defined(__APPLE__) + +template<> +struct EventSharedState +{ + pthread_mutex_t m_Mutex; + pthread_cond_t m_Condition; +}; + +template<> +inline void EventConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + Timer timer; + + while (!condition()) + { + // Wait for a maximum of 20ms to perform periodic error checking. + long timeout_wait = std::min(20L, timeout_in_ms); + +#ifdef __APPLE__ + // Relative timespec. + timespec timeout; + timeout.tv_sec = timeout_wait / 1000; + timeout.tv_nsec = 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait_relative_np(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#else + // Absolute timespec. + timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += timeout_wait / 1000; + timeout.tv_nsec += 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#endif // __APPLE__ + if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + throw std::runtime_error("Waiting time has expired."); + } + + if (error_check != nullptr) + error_check(); + } +} + +template<> +inline void EventConditionVariable::Signal() +{ + pthread_cond_broadcast(&(m_SharedState->m_Condition)); +} + +template<> +inline void EventConditionVariable::Lock() +{ + pthread_mutex_lock(&(m_SharedState->m_Mutex)); +} + +template<> +inline void EventConditionVariable::Unlock() +{ + pthread_mutex_unlock(&(m_SharedState->m_Mutex)); +} + +template<> +inline void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) +{ + pthread_mutexattr_t mutex_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&(shared_state->m_Mutex), &mutex_attr); + pthread_mutexattr_destroy(&mutex_attr); + + pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); +#endif // __APPLE__ + pthread_cond_init(&(shared_state->m_Condition), &cond_attr); + pthread_condattr_destroy(&cond_attr); +} + +template<> +inline void EventConditionVariable::OpenImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) +{ + // Nothing to do. +} + +#endif diff --git a/catkit_core/EventFutex.inl b/catkit_core/EventFutex.inl new file mode 100644 index 00000000..e69de29b diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl new file mode 100644 index 00000000..1bff3e11 --- /dev/null +++ b/catkit_core/EventSemaphore.inl @@ -0,0 +1,123 @@ +#include "EventBase.h" + +#include "Timing.h" + +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#endif // _WIN32 + +using EventSemaphore = EventImpl; + +#ifdef _WIN32 + +template<> +struct EventSharedState +{ + std::atomic_long m_NumReadersWaiting; +}; + +template<> +struct EventLocalState +{ + HANDLE m_Semaphore; +}; + +template<> +inline void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + Timer timer; + DWORD res = WAIT_OBJECT_0; + + while (!condition()) + { + if (res == WAIT_OBJECT_0) + { + // Increment the number of readers that are waiting, making sure the counter + // is at least 1 after the increment. This can occur when a previous reader got + // interrupted and the trigger happening before decrementing the + // m_NumReadersWaiting counter. + while (m_SharedState->m_NumReadersWaiting++ < 0) + { + } + } + + // Wait for a maximum of 20ms to perform periodic error checking. + res = WaitForSingleObject(m_LocalState.m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + + if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + m_SharedState->m_NumReadersWaiting--; + throw std::runtime_error("Waiting time has expired."); + } + + if (res == WAIT_FAILED) + { + m_SharedState->m_NumReadersWaiting--; + throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); + } + + if (error_check != nullptr) + { + try + { + error_check(); + } + catch (...) + { + m_SharedState->m_NumReadersWaiting--; + throw; + } + } + } +} + +template<> +inline void EventSemaphore::Signal() +{ + // Notify waiting processes. + long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); + + // If a reader times out in between us reading the number of readers that are waiting + // and us releasing the semaphore, we are releasing one too many readers. This + // results in a future reader being released immediately, which is not a problem, + // as there are checks in place for that. + + if (num_readers_waiting > 0) + ReleaseSemaphore(m_LocalState.m_Semaphore, (LONG) num_readers_waiting, NULL); +} + +template<> +inline void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +{ + m_LocalState.m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + + if (m_LocalState.m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while creating semaphore."); + + shared_state->m_NumReadersWaiting = 0; +} + +template<> +inline void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +{ + m_LocalState.m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + + if (m_LocalState.m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while opening semaphore."); +} + +template<> +inline EventSemaphore::~EventImpl() +{ + CloseHandle(m_LocalState.m_Semaphore); +} + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +#endif diff --git a/catkit_core/EventSpinLock.inl b/catkit_core/EventSpinLock.inl new file mode 100644 index 00000000..e69de29b diff --git a/catkit_core/LocalMemory.cpp b/catkit_core/LocalMemory.cpp new file mode 100644 index 00000000..57120fc0 --- /dev/null +++ b/catkit_core/LocalMemory.cpp @@ -0,0 +1,16 @@ +#include "LocalMemory.h" + +LocalMemory::LocalMemory(std::size_t num_bytes) + : m_Memory(new char[num_bytes]) +{ +} + +LocalMemory::~LocalMemory() +{ + delete[] m_Memory; +} + +void *LocalMemory::GetAddress(std::size_t offset) +{ + return m_Memory + offset; +} diff --git a/catkit_core/LocalMemory.h b/catkit_core/LocalMemory.h new file mode 100644 index 00000000..d237b9db --- /dev/null +++ b/catkit_core/LocalMemory.h @@ -0,0 +1,18 @@ +#ifndef LOCAL_MEMORY_H +#define LOCAL_MEMORY_H + +#include "Memory.h" + +class LocalMemory : public Memory +{ +public: + LocalMemory(std::size_t num_bytes); + virtual ~LocalMemory(); + + virtual void *GetAddress(std::size_t offset = 0); + +private: + char *m_Memory; +}; + +#endif // LOCAL_MEMORY_H diff --git a/catkit_core/Memory.h b/catkit_core/Memory.h new file mode 100644 index 00000000..9f20e9ed --- /dev/null +++ b/catkit_core/Memory.h @@ -0,0 +1,16 @@ +#ifndef MEMORY_H +#define MEMORY_H + +#include + +class Memory +{ +public: + virtual ~Memory() + { + } + + virtual void *GetAddress(std::size_t offset = 0) = 0; +}; + +#endif // MEMORY_H diff --git a/catkit_core/MessageBroker.cpp b/catkit_core/MessageBroker.cpp new file mode 100644 index 00000000..0bce36fe --- /dev/null +++ b/catkit_core/MessageBroker.cpp @@ -0,0 +1,312 @@ +#include "MessageBroker.h" + +#include "Util.h" +#include "Timing.h" +#include "HostName.h" + +#include +#include + +template +T fetch_max(std::atomic &atom, T value) +{ + T current = atom.load(std::memory_order_relaxed); + + while (current < value && !atom.compare_exchange_weak(current, value, std::memory_order_acq_rel)) + { + } + + return current; +} + +class SubtopicIterator { +public: + SubtopicIterator(std::string_view str, char delimiter, bool is_valid = true) + : m_String(str), m_Delimiter(delimiter), m_IsValid(is_valid) + { + } + + std::string_view operator*() const + { + return m_String; + } + + SubtopicIterator& operator++() + { + if (m_String.empty()) + { + m_IsValid = false; + return *this; + } + + size_t pos = m_String.rfind(m_Delimiter); + + if (pos != std::string_view::npos) + { + // Remove the last part. + m_String.remove_suffix(m_String.size() - pos); + } + else + { + // No more delimiters left, so remove the whole string. + m_String.remove_suffix(m_String.size()); + } + + return *this; + } + + bool operator!=(const SubtopicIterator& other) const + { + if (!m_IsValid && !other.m_IsValid) + { + // If both iterators are invalid, they are equal no matter what. + return false; + } + + return m_IsValid == other.m_IsValid || + m_String.data() != other.m_String.data() || + m_String.size() != other.m_String.size(); + } + + static SubtopicIterator end() + { + return SubtopicIterator({}, '\0', true); + } + +private: + std::string_view m_String; + char m_Delimiter; + bool m_IsValid; +}; + +class SubtopicRange +{ +public: + SubtopicRange(std::string_view str, char delimiter = '/') + : m_String(str), m_Delimiter(delimiter) + { + } + + SubtopicIterator begin() const + { + return SubtopicIterator(m_String, m_Delimiter); + } + + SubtopicIterator end() const + { + return SubtopicIterator::end(); + } + +private: + std::string_view m_String; + char m_Delimiter; +}; + +TopicHeader::TopicHeader(const TopicHeader &header) +{ + CopyFrom(header); +} + +TopicHeader &TopicHeader::operator=(const TopicHeader &header) +{ + CopyFrom(header); + + return *this; +} + +void TopicHeader::CopyFrom(const TopicHeader &header) +{ + next_frame_id.store(header.next_frame_id.load(std::memory_order_relaxed), std::memory_order_relaxed); + synchronization = header.synchronization; + + std::copy(header.message_headers, header.message_headers + TOPIC_MAX_NUM_MESSAGES, message_headers); + std::copy((char *)header.metadata_keys, (char *)header.metadata_keys + sizeof(metadata_keys), (char *)metadata_keys); +} + +Message MessageBroker::PrepareMessage(const std::string &topic, size_t payload_size, int8_t device_id) +{ + Uuid trace_id; + m_UuidGenerator.Generate(trace_id); + + return PrepareMessage(topic, trace_id, payload_size, device_id); +} + +Message MessageBroker::PrepareMessage(const std::string &topic, Uuid trace_id, size_t payload_size, int8_t device_id) +{ + Message message; + + message.m_HasBeenPublished = false; + message.m_MessageBroker = shared_from_this(); + + // Allocate a payload. + auto allocator = GetAllocator(device_id); + + if (allocator == nullptr) + { + throw std::runtime_error("Invalid device ID."); + } + + auto block_handle = allocator->Allocate(payload_size); + + if (block_handle == FreeListAllocator::INVALID_HANDLE) + { + throw std::runtime_error("Could not allocate payload."); + } + + auto offset = allocator->GetOffset(block_handle); + + auto memory = GetMemory(device_id); + message.m_Payload = memory->GetAddress(offset); + + // Allocate a message header. + auto message_header_handle = m_MessageHeaderAllocator.Allocate(); + + if (message_header_handle == PoolAllocator::INVALID_HANDLE) + { + // Deallocate allocated memory block. + // TODO: Do this with RAII. + allocator->Deallocate(block_handle); + + throw std::runtime_error("Could not allocate message header."); + } + + // Access the message header. + message.m_Header = &m_MessageHeaders[message_header_handle]; + auto header = message.m_Header; + + // Set the payload information. + header->payload_info.device_id = device_id; + header->payload_info.total_size = payload_size; + header->payload_info.offset_in_buffer = offset; + m_UuidGenerator.Generate(header->payload_id); + + // Set the topic. + std::strncpy(header->topic, topic.c_str(), sizeof(header->topic)); + + // Set the trace ID. + std::strncpy(header->trace_id, trace_id, sizeof(header->trace_id)); + + // Set the producer information. + std::strncpy(header->producer_hostname, GetHostName().c_str(), sizeof(header->producer_hostname)); + header->producer_pid = GetProcessId(); + + header->partial_frame_id = 0; + header->start_byte = 0; + header->end_byte = payload_size; + + // Set default values. + header->frame_id = INVALID_FRAME_ID; + header->producer_timestamp = 0; + + return message; +} + +void MessageBroker::PublishMessage(Message &message, bool is_final) +{ + if (message.m_HasBeenPublished) + { + return; + } + + auto topic = std::string_view(message.m_Header->topic); + auto topic_header = m_TopicHeaders.Find(topic); + + if (message.m_Header->frame_id == INVALID_FRAME_ID) + { + // First partial frame. Assign a new frame ID. + message.m_Header->frame_id = topic_header->next_frame_id.fetch_add(1, std::memory_order_relaxed); + message.m_Header->partial_frame_id = 0; + + // If the ring buffer is full, make the oldest frame unavailable. + if ((topic_header->last_frame_id - topic_header->first_frame_id) >= TOPIC_MAX_NUM_MESSAGES) + { + topic_header->first_frame_id++; + } + } + else + { + // Not the first partial frame. Use the same frame ID and increment the partial frame ID. + message.m_Header->partial_frame_id++; + } + + // Set the timestamp. + message.m_Header->producer_timestamp = GetTimeStamp(); + + for (const auto &subtopic : SubtopicRange(topic)) + { + auto topic_header = m_TopicHeaders.Find(std::string(subtopic)); + + if (!topic_header) + { + // TODO: if the topic header doesn't exist, create it. + + } + + auto synchronization = GetSynchronization(subtopic); + + // Copy over message header reference. + std::size_t message_header_index = message.m_Header - m_MessageHeaders; + topic_header->message_headers[message.m_Header->frame_id % TOPIC_MAX_NUM_MESSAGES] = message_header_index; + + { + // Obtain a lock as we're about to signal the synchronization structure. + auto lock = SynchronizationLock(synchronization); + + // Make the message available. + fetch_max(topic_header->last_frame_id, message.m_Header->frame_id); + + // Signal the synchronization structure. + synchronization->Signal(); + } + } + + if (!is_final) + { + // Copy the message header since it's gone after publishing. + auto message_header_handle = m_MessageHeaderAllocator.Allocate(); + + if (message_header_handle == PoolAllocator::INVALID_HANDLE) + { + throw std::runtime_error("Could not allocate message header."); + } + + auto new_message_header = &m_MessageHeaders[message_header_handle]; + *new_message_header = *message.m_Header; + message.m_Header = new_message_header; + } + + message.m_HasBeenPublished = is_final; +} + +std::shared_ptr MessageBroker::GetAllocator(int8_t device_id) +{ + if (device_id < -1 || device_id >= MAX_NUM_GPUS) + { + return nullptr; + } + + if (device_id == -1) + { + return m_CpuPayloadAllocator; + } + + return m_GpuPayloadAllocator[device_id]; +} + +std::shared_ptr MessageBroker::GetSynchronization(std::string_view topic) +{ + auto topic_header = m_TopicHeaders.Find(topic); + + if (topic_header == nullptr) + { + return nullptr; + } + + // Look up the synchronization structure (not the shared data). + if (m_Synchronizations.find(topic) == m_Synchronizations.end()) + { + m_Synchronizations[topic] = std::make_shared(topic_header->synchronization); + } + + return m_Synchronizations[topic]; +} diff --git a/catkit_core/MessageBroker.h b/catkit_core/MessageBroker.h new file mode 100644 index 00000000..091b34d3 --- /dev/null +++ b/catkit_core/MessageBroker.h @@ -0,0 +1,204 @@ +#ifndef MESSAGE_BROKER_H +#define MESSAGE_BROKER_H + +#include "HashMap.h" +#include "Synchronization.h" +#include "FreeListAllocator.h" +#include "PoolAllocator.h" +#include "SharedMemory.h" +#include "CudaSharedMemory.h" +#include "UuidGenerator.h" + +#include +#include + +const char * const MESSAGE_BROKER_VERSION = "0.1"; + +const size_t VERSION_SIZE = 8; +const size_t TOPIC_HASH_MAP_SIZE = 16384; +const size_t TOPIC_MAX_KEY_SIZE = 128; +const size_t TOPIC_MAX_NUM_MESSAGES = 15; +const size_t HOST_NAME_SIZE = 64; +const size_t METADATA_MAX_STRLEN = 16; +const size_t MAX_NUM_DIMENSIONS = 4; +const size_t MAX_NUM_METADATA_ENTRIES = 16; +const size_t MAX_SHARED_MEMORY_ID_SIZE = 64; +const size_t MAX_NUM_GPUS = 8; + +const std::uint64_t INVALID_FRAME_ID = 0xFFFFFFFFFFFFFFFF; + +union MetadataEntry +{ + std::uint64_t integer; + double floating_point; + char string[METADATA_MAX_STRLEN]; +}; + +struct ArrayInfo +{ + char data_type; + char byte_order; + std::uint8_t num_dimensions; + std::uint32_t shape[MAX_NUM_DIMENSIONS]; + std::uint32_t strides[MAX_NUM_DIMENSIONS]; +}; + +struct PayloadInfo +{ + std::int8_t device_id; + std::uint64_t offset_in_buffer; + std::uint64_t total_size; + + ArrayInfo array_info; +}; + +struct MessageHeader +{ + char topic[TOPIC_MAX_KEY_SIZE]; + + Uuid payload_id; + std::uint64_t frame_id; + + Uuid trace_id; + + char producer_hostname[HOST_NAME_SIZE]; + std::uint32_t producer_pid; + std::uint64_t producer_timestamp; + + PayloadInfo payload_info; + + MetadataEntry metadata_entries[MAX_NUM_METADATA_ENTRIES]; + + std::uint16_t partial_frame_id; + std::uint64_t start_byte; + std::uint64_t end_byte; +}; + +struct TopicHeader +{ + std::atomic_uint64_t next_frame_id; + std::atomic_uint64_t first_frame_id; + std::atomic_uint64_t last_frame_id; + + std::uint64_t message_headers[TOPIC_MAX_NUM_MESSAGES]; + + SynchronizationSharedData synchronization; + + char metadata_keys[METADATA_MAX_STRLEN][MAX_NUM_METADATA_ENTRIES]; + + TopicHeader() = default; + TopicHeader(const TopicHeader &header); + + TopicHeader &operator=(const TopicHeader &header); + +private: + void CopyFrom(const TopicHeader &header); +}; + +struct MessageBrokerHeader +{ + char version[VERSION_SIZE]; + char creator_hostname[HOST_NAME_SIZE]; + std::uint64_t time_of_last_activity; + + char buffer_shared_memory_id[MAX_SHARED_MEMORY_ID_SIZE]; + CudaIpcHandle cuda_ipc_handles[MAX_NUM_GPUS]; +}; + +class MessageBroker; + +class Message +{ + friend class MessageBroker; + +private: + Message(); + +public: + ~Message(); + + const char *GetTopic() const; + + const Uuid &GetPayloadId() const; + const std::uint64_t GetFrameId() const; + + const Uuid &GetTraceId() const; + + const char *GetProducerHostnname() const; + const std::uint32_t GetProducerPid() const; + const std::uint64_t GetProducerTimestamp() const; + + const PayloadInfo &GetPayloadInfo() const; + + const ArrayInfo &GetArrayInfo() const; + void SetArrayInfo(const ArrayInfo &array_info); + + void *GetPayload() const; + size_t GetPayloadSize() const; + + const MetadataEntry &GetMetadataEntry(std::uint8_t metadata_id) const; + void SetMetadataEntry(std::uint8_t metadata_id, std::uint64_t value); + void SetMetadataEntry(std::uint8_t metadata_id, double value); + void SetMetadataEntry(std::uint8_t metadata_id, const char *value); + + const std::uint16_t GetPartialFrameId() const; + + const std::uint64_t GetStartByte() const; + void SetStartByte(const std::uint64_t &start_byte); + + const std::uint64_t GetEndByte() const; + void SetEndByte(const std::uint64_t &end_byte); + +private: + MessageHeader *m_Header; + void *m_Payload; + + bool m_HasBeenPublished; + + std::shared_ptr m_MessageBroker; +}; + +class MessageBroker : std::enable_shared_from_this +{ + friend class Message; + +private: + MessageBroker(); // TODO: Add parameters. + +public: + std::unique_ptr Create(); // TODO: Add parameters. + std::unique_ptr Open(void *metadata_buffer); + + Message PrepareMessage(const std::string &topic, size_t payload_size, int8_t device_id = -1); + Message PrepareMessage(const std::string &topic, Uuid trace_id, size_t payload_size, int8_t device_id = -1); + + void PublishMessage(Message &message, bool is_final = true); + + Message GetNextMessage(const std::string &topic, double timeout_in_seconds); + Message GetMessage(const std::string &topic, size_t frame_id); + +private: + std::shared_ptr GetAllocator(int8_t device_id); + std::shared_ptr GetMemory(int8_t device_id); + + std::shared_ptr GetSynchronization(std::string_view topic); + + MessageBrokerHeader &m_Header; + + HashMap m_TopicHeaders; + PoolAllocator m_MessageHeaderAllocator; + + MessageHeader *m_MessageHeaders; + + std::shared_ptr m_CpuPayloadAllocator; + std::shared_ptr m_CpuPayloadMemory; + + std::shared_ptr m_GpuPayloadAllocator[MAX_NUM_GPUS]; + std::shared_ptr m_GpuPayloadMemory[MAX_NUM_GPUS]; + + UuidGenerator m_UuidGenerator; + + std::map> m_Synchronizations; +}; + +#endif // MESSAGE_BROKER_H diff --git a/catkit_core/SharedMemory.cpp b/catkit_core/SharedMemory.cpp index a469b9c4..c8b8d356 100644 --- a/catkit_core/SharedMemory.cpp +++ b/catkit_core/SharedMemory.cpp @@ -82,7 +82,7 @@ SharedMemory::SharedMemory(const std::string &id, FileObject file, bool is_owner throw std::runtime_error("Something went wrong while mapping shared memory file."); } -void *SharedMemory::GetAddress() +void *SharedMemory::GetAddress(std::size_t offset) { - return m_Buffer; + return static_cast(m_Buffer) + offset; } diff --git a/catkit_core/SharedMemory.h b/catkit_core/SharedMemory.h index aa6764bd..ca78b51c 100644 --- a/catkit_core/SharedMemory.h +++ b/catkit_core/SharedMemory.h @@ -1,6 +1,8 @@ #ifndef SHARED_MEMORY_H #define SHARED_MEMORY_H +#include "Memory.h" + #include #include @@ -21,7 +23,7 @@ typedef int FileObject; #endif -class SharedMemory +class SharedMemory : public Memory { private: SharedMemory(const std::string &id, FileObject file, bool is_owner); @@ -32,7 +34,7 @@ class SharedMemory static std::shared_ptr Create(const std::string &id, size_t num_bytes_in_buffer); static std::shared_ptr Open(const std::string &id); - void *GetAddress(); + void *GetAddress(std::size_t offset = 0) override; private: std::string m_Id; diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp index 8da03b86..7bb81872 100644 --- a/catkit_core/Synchronization.cpp +++ b/catkit_core/Synchronization.cpp @@ -9,17 +9,6 @@ #include "Timing.h" -SynchronizationLock::SynchronizationLock(Synchronization *sync) - : m_Sync(sync) -{ - m_Sync->Lock(); -} - -SynchronizationLock::~SynchronizationLock() -{ - m_Sync->Unlock(); -} - Synchronization::Synchronization() : m_IsOwner(false), m_SharedData(nullptr) { diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 45ec8dfa..6f6ba5cf 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -27,16 +27,6 @@ struct SynchronizationSharedData #endif }; -class SynchronizationLock -{ -public: - SynchronizationLock(Synchronization *sync); - ~SynchronizationLock(); - -private: - Synchronization *m_Sync; -}; - class Synchronization { public: @@ -67,4 +57,22 @@ class Synchronization #endif }; +template +class SynchronizationLock +{ +public: + inline SynchronizationLock(T sync) + : m_Sync(sync) + { + m_Sync->Lock(); + } + inline ~SynchronizationLock() + { + m_Sync->Unlock(); + } + +private: + T m_Sync; +}; + #endif // SYNCHRONIZATION_H diff --git a/catkit_core/UuidGenerator.cpp b/catkit_core/UuidGenerator.cpp new file mode 100644 index 00000000..80328a02 --- /dev/null +++ b/catkit_core/UuidGenerator.cpp @@ -0,0 +1,21 @@ +#include "UuidGenerator.h" + +UuidGenerator::UuidGenerator() +{ + std::random_device random_device; + + for (size_t i = 0; i < 2; ++i) + { + std::seed_seq seed{random_device(), random_device()}; + m_Engines[i].seed(seed); + } +} + +void UuidGenerator::Generate(Uuid &uuid) +{ + for (size_t i = 0; i < 2; ++i) + { + std::uint64_t value = m_Engines[i](); + *reinterpret_cast(uuid + i * 8) = value; + } +} diff --git a/catkit_core/UuidGenerator.h b/catkit_core/UuidGenerator.h new file mode 100644 index 00000000..87523421 --- /dev/null +++ b/catkit_core/UuidGenerator.h @@ -0,0 +1,19 @@ +#ifndef UUID_GENERATOR_H +#define UUID_GENERATOR_H + +#include + +using Uuid = char[16]; + +class UuidGenerator +{ +public: + UuidGenerator(); + + void Generate(Uuid &uuid); + +private: + std::mt19937_64 m_Engines[2]; +}; + +#endif // UUID_GENERATOR_H