Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a shared-memory message broker. #272

Draft
wants to merge 14 commits into
base: develop
Choose a base branch
from
Draft
6 changes: 6 additions & 0 deletions benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@ add_executable(hash_map hash_map.cpp)
target_include_directories(hash_map PUBLIC ../catkit_core)
target_link_libraries(hash_map PUBLIC catkit_core)

# Uuid generator benchmark
add_executable(uuid_generator uuid_generator.cpp)
target_include_directories(uuid_generator PUBLIC ../catkit_core)
target_link_libraries(uuid_generator PUBLIC catkit_core)

# Add install files
install(TARGETS datastream_latency DESTINATION bin)
install(TARGETS datastream_submit DESTINATION bin)
install(TARGETS timestamp DESTINATION bin)
install(TARGETS free_list_allocator DESTINATION bin)
install(TARGETS pool_allocator DESTINATION bin)
install(TARGETS hash_map DESTINATION bin)
install(TARGETS uuid_generator DESTINATION bin)
32 changes: 32 additions & 0 deletions benchmarks/uuid_generator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "UuidGenerator.h"
#include "Timing.h"

#include <iostream>

int main()
{
const size_t N = 100000000;

UuidGenerator generator;

char uuid[16];

std::cout << std::hex;

auto start = GetTimeStamp();

for (size_t i = 0; i < N; ++i)
{
generator.Generate(uuid);
}

auto end = GetTimeStamp();

std::cout << std::dec;

std::cout << "Time: " << (end - start) / 1e9 << " sec" << std::endl;
std::cout << "Throughput: " << N / ((end - start) / 1e9) << " ops/s" << std::endl;
std::cout << "Time per operation: " << (end - start) / N << " ns" << std::endl;

return 0;
}
3 changes: 3 additions & 0 deletions catkit_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ add_library(catkit_core STATIC
Util.cpp
PoolAllocator.cpp
FreeListAllocator.cpp
MessageBroker.cpp
UuidGenerator.cpp
LocalMemory.cpp
proto/core.pb.cc
proto/logging.pb.cc
proto/testbed.pb.cc
Expand Down
30 changes: 30 additions & 0 deletions catkit_core/CudaSharedMemory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef CUDA_SHARED_MEMORY_H
#define CUDA_SHARED_MEMORY_H

#include "Memory.h"

#include <memory>

#ifdef HAVE_CUDA
#include <cuda_runtime_api.h>
typedef cudaIpcMemHandle_t CudaIpcHandle;
#else
// CUDA cudaIpcMemHandle_t is a struct of 64 bytes.
typedef char CudaIpcHandle[64];
#endif

class CudaSharedMemory : public Memory
{
private:
CudaSharedMemory(const CudaIpcHandle &ipc_handle, void *device_pointer=nullptr);

public:
~CudaSharedMemory();

static std::shared_ptr<CudaSharedMemory> Create(size_t num_bytes_in_buffer);
static std::shared_ptr<CudaSharedMemory> Open(const CudaIpcHandle &ipc_handle);

void *GetAddress(std::size_t offset = 0) override;
};

#endif // CUDA_SHARED_MEMORY_H
16 changes: 16 additions & 0 deletions catkit_core/LocalMemory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "LocalMemory.h"

LocalMemory::LocalMemory(std::size_t num_bytes)
: m_Memory(new char[num_bytes])
{
}

LocalMemory::~LocalMemory()
{
delete[] m_Memory;
}

void *LocalMemory::GetAddress(std::size_t offset)
{
return m_Memory + offset;
}
18 changes: 18 additions & 0 deletions catkit_core/LocalMemory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef LOCAL_MEMORY_H
#define LOCAL_MEMORY_H

#include "Memory.h"

class LocalMemory : public Memory
{
public:
LocalMemory(std::size_t num_bytes);
virtual ~LocalMemory();

virtual void *GetAddress(std::size_t offset = 0);

private:
char *m_Memory;
};

#endif // LOCAL_MEMORY_H
16 changes: 16 additions & 0 deletions catkit_core/Memory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#ifndef MEMORY_H
#define MEMORY_H

#include <cstddef>

class Memory
{
public:
virtual ~Memory()
{
}

virtual void *GetAddress(std::size_t offset = 0) = 0;
};

#endif // MEMORY_H
201 changes: 201 additions & 0 deletions catkit_core/MessageBroker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#include "MessageBroker.h"

#include "Util.h"
#include "Timing.h"
#include "HostName.h"

#include <algorithm>

TopicHeader::TopicHeader(const TopicHeader &header)
{
CopyFrom(header);
}

TopicHeader &TopicHeader::operator=(const TopicHeader &header)
{
CopyFrom(header);

return *this;
}

void TopicHeader::CopyFrom(const TopicHeader &header)
{
next_frame_id.store(header.next_frame_id.load(std::memory_order_relaxed), std::memory_order_relaxed);
synchronization = header.synchronization;

std::copy(header.message_offsets, header.message_offsets + TOPIC_MAX_NUM_MESSAGES, message_offsets);
std::copy((char *)header.metadata_keys, (char *)header.metadata_keys + sizeof(metadata_keys), (char *)metadata_keys);
}

Message MessageBroker::PrepareMessage(const std::string &topic, size_t payload_size, int8_t device_id)
{
Uuid trace_id;
m_UuidGenerator.Generate(trace_id);

return PrepareMessage(topic, trace_id, payload_size, device_id);
}

Message MessageBroker::PrepareMessage(const std::string &topic, Uuid trace_id, size_t payload_size, int8_t device_id)
{
Message message;

message.m_HasBeenPublished = false;
message.m_MessageBroker = shared_from_this();

// Allocate a payload.
auto allocator = GetAllocator(device_id);

if (allocator == nullptr)
{
throw std::runtime_error("Invalid device ID.");
}

auto block_handle = allocator->Allocate(payload_size);

if (block_handle == FreeListAllocator::INVALID_HANDLE)
{
throw std::runtime_error("Could not allocate payload.");
}

auto offset = allocator->GetOffset(block_handle);

if (device_id < 0)
{
message.m_Payload = m_CpuPayloadMemory->GetAddress(offset);
}
else
{
message.m_Payload = m_GpuPayloadMemory[device_id]->GetAddress(offset);
}

// Allocate a message header.
auto message_header_handle = m_MessageHeaderAllocator.Allocate();

if (message_header_handle == PoolAllocator::INVALID_HANDLE)
{
throw std::runtime_error("Could not allocate message header.");
}

// Access the message header.
message.m_Header = &m_MessageHeaders[message_header_handle];
auto header = message.m_Header;

// Set the payload information.
header->payload_info.device_id = device_id;
header->payload_info.total_size = payload_size;
header->payload_info.offset_in_buffer = offset;
m_UuidGenerator.Generate(header->payload_id);

// Set the topic.
std::strncpy(header->topic, topic.c_str(), sizeof(header->topic));

// Set the trace ID.
std::strncpy(header->trace_id, trace_id, sizeof(header->trace_id));

// Set the producer information.
std::strncpy(header->producer_hostname, GetHostName().c_str(), sizeof(header->producer_hostname));
header->producer_pid = GetProcessId();

header->partial_frame_id = 0;
header->start_byte = 0;
header->end_byte = payload_size;

// Set default values.
header->frame_id = INVALID_FRAME_ID;
header->producer_timestamp = 0;

return message;
}

void MessageBroker::PublishMessage(Message &message, bool is_final)
{
if (message.m_HasBeenPublished)
{
return;
}

auto topic = std::string_view(message.m_Header->topic);
auto topic_header = m_TopicHeaders.Find(topic);

if (message.m_Header->frame_id == INVALID_FRAME_ID)
{
// First partial frame. Assign a new frame ID.
message.m_Header->frame_id = topic_header->next_frame_id.fetch_add(1, std::memory_order_relaxed);
message.m_Header->partial_frame_id = 0;
}
else
{
// Not the first partial frame. Use the same frame ID and increment the partial frame ID.
message.m_Header->partial_frame_id++;
}

// Set the timestamp.
message.m_Header->producer_timestamp = GetTimeStamp();

// TODO: put message offsets.

// Go to synchronization structures and signal them.
// This includes parent topics.
for (std::size_t i = 0; i <= topic.size(); ++i)
{
std::size_t size = topic.size() - i;

if (i == 0 || topic[size] == '/')
{
auto synchronization = GetSynchronization(topic.substr(0, size));

if (synchronization)
synchronization->Signal();
}
}

if (!is_final)
{
// Copy the message header since it's gone after publishing.
auto message_header_handle = m_MessageHeaderAllocator.Allocate();

if (message_header_handle == PoolAllocator::INVALID_HANDLE)
{
throw std::runtime_error("Could not allocate message header.");
}

auto new_message_header = &m_MessageHeaders[message_header_handle];
*new_message_header = *message.m_Header;
message.m_Header = new_message_header;
}

message.m_HasBeenPublished = is_final;
}

std::shared_ptr<FreeListAllocator> MessageBroker::GetAllocator(int8_t device_id)
{
if (device_id < -1 || device_id >= MAX_NUM_GPUS)
{
return nullptr;
}

if (device_id == -1)
{
return m_CpuPayloadAllocator;
}

return m_GpuPayloadAllocator[device_id];
}

std::shared_ptr<Synchronization> MessageBroker::GetSynchronization(std::string_view topic)
{
auto topic_header = m_TopicHeaders.Find(topic);

if (topic_header == nullptr)
{
return nullptr;
}

// Look up the synchronization structure (not the shared data).
if (m_Synchronizations.find(topic) == m_Synchronizations.end())
{
m_Synchronizations[topic] = std::make_shared<Synchronization>(topic_header->synchronization);
}

return m_Synchronizations[topic];
}
Loading
Loading