Skip to content

Commit

Permalink
Kernel+LibLemon: All message data for a queue in same buffer to avoid…
Browse files Browse the repository at this point in the history
… excessive allocations
  • Loading branch information
fido2020 committed Feb 28, 2021
1 parent e122a57 commit 909b03c
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 153 deletions.
65 changes: 29 additions & 36 deletions Kernel/include/objects/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,11 @@

#include <objects/kobject.h>

struct Message{
union{
uint64_t data; // Empty, pointer or integer
uint8_t* dataP;
};
uint64_t id = 0;
uint16_t size = 0;
};

struct Reponse{
Message* ret;
uint64_t id;
};

struct MessageEndpointInfo{
uint16_t msgSize;
};

class MessageEndpoint final : public KernelObject{
private:
friend Pair<FancyRefPtr<MessageEndpoint>,FancyRefPtr<MessageEndpoint>> CreatePair();
uint16_t maxMessageSize = 8;
uint16_t messageQueueLimit = 128;
lock_t queueLock = 0;

Semaphore queueAvailablilitySemaphore = Semaphore(messageQueueLimit);

RingBuffer<Message> queue;

FancyRefPtr<MessageEndpoint> peer;

List<KernelObjectWatcher*> waiting;
List<Pair<Semaphore*, Reponse>> waitingResponse;

lock_t waitingLock = 0;
lock_t waitingResponseLock = 0;
public:
static const uint16_t maxMessageSizeLimit = UINT16_MAX;

Expand All @@ -71,11 +40,11 @@ class MessageEndpoint final : public KernelObject{
///
/// \param id Pointer to the ID to be populated
/// \param size Pointer to the message size to be populated
/// \param data Pointer to an unsigned integer representing either 8 bytes of data (size <= 8) or a pointer to a buffer of size length containing message data
/// \param data Pointer to data
///
/// \return 0 on success, 1 on empty, negative error code on failure
/////////////////////////////
int64_t Read(uint64_t* id, uint16_t* size, uint64_t* data);
int64_t Read(uint64_t* id, uint16_t* size, uint8_t* data);

/////////////////////////////
/// \brief Send a message and return the response
Expand All @@ -85,7 +54,7 @@ class MessageEndpoint final : public KernelObject{
///
/// \param id ID of the message to be sent
/// \param size Size of the message to be sent
/// \param data Either a pointer to data (size > 8) or 8 bytes of data to be sent (size <= 8)
/// \param data Pointer to data
///
/// \param rID ID of the expected message
/// \param size Pointer to the message size to be populated
Expand All @@ -95,7 +64,7 @@ class MessageEndpoint final : public KernelObject{
///
/// \return 0 on success, 1 on timeout, negative error code on failure
/////////////////////////////
int64_t Call(uint64_t id, uint16_t size, uint64_t data, uint64_t rID, uint16_t* rSize, uint64_t* rData, int64_t timeout);
int64_t Call(uint64_t id, uint16_t size, uint64_t data, uint64_t rID, uint16_t* rSize, uint8_t* rData, int64_t timeout);

/////////////////////////////
/// \brief Send a message
Expand All @@ -104,7 +73,7 @@ class MessageEndpoint final : public KernelObject{
///
/// \param id ID of the message to be sent
/// \param size Size of the message to be sent
/// \param data Either a pointer to data (size > 8) or 8 bytes of data to be sent (size <= 8)
/// \param data Pointer to data
///
/// \return 0 on success, negative error code on failure
/////////////////////////////
Expand All @@ -124,4 +93,28 @@ class MessageEndpoint final : public KernelObject{

inline static constexpr kobject_id_t TypeID() { return KOBJECT_ID_MESSAGE_ENDPOINT; }
inline kobject_id_t InstanceTypeID() const { return TypeID(); }

private:
struct Response{
uint64_t id;
uint16_t* size;
uint8_t** buffer;
};

friend Pair<FancyRefPtr<MessageEndpoint>,FancyRefPtr<MessageEndpoint>> CreatePair();
uint16_t maxMessageSize = 8;
uint16_t messageQueueLimit = 128;
lock_t queueLock = 0;

Semaphore queueAvailablilitySemaphore = Semaphore(messageQueueLimit);

RawRingBuffer queue;

FancyRefPtr<MessageEndpoint> peer;

List<KernelObjectWatcher*> waiting;
List<Pair<Semaphore*, Response>> waitingResponse;

lock_t waitingLock = 0;
lock_t waitingResponseLock = 0;
};
183 changes: 160 additions & 23 deletions Kernel/include/ringbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <memory.h>
#include <spin.h>

template<typename T>
template<typename T = uint8_t>
class RingBuffer{
public:
RingBuffer(){
Expand All @@ -17,9 +17,18 @@ class RingBuffer{
dequeuePointer = buffer;
}

void Enqueue(T* data){
acquireLock(&dequeueLock);
RingBuffer(unsigned bSize) : bufferSize(bSize) {
buffer = reinterpret_cast<T*>(kmalloc(sizeof(T) * bufferSize));

bufferEnd = &buffer[bufferSize];

enqueuePointer = buffer;
dequeuePointer = buffer;
}

void Enqueue(const T* data){
acquireLock(&enqueueLock);
acquireLock(&dequeueLock);

memcpy(enqueuePointer++, data, sizeof(T));

Expand All @@ -28,54 +37,182 @@ class RingBuffer{
}

if(enqueuePointer == dequeuePointer){
Resize((bufferSize + 2) << 1);
}

T* oldBuffer = buffer;
T* oldBufferEnd = bufferEnd;
releaseLock(&enqueueLock);
releaseLock(&dequeueLock);
}

bufferSize = (bufferSize + 2) << 1;
void EnqueueUnlocked(const T* data, size_t count){
size_t contiguousCount = count;
size_t wrappedCount = 0;

buffer = reinterpret_cast<T*>(kmalloc(sizeof(T) * bufferSize));
bufferEnd = &buffer[bufferSize - 1];

memcpy(bufferEnd - (oldBufferEnd - dequeuePointer), dequeuePointer, oldBufferEnd - dequeuePointer);
if(enqueuePointer < dequeuePointer && enqueuePointer + count > dequeuePointer){
Resize((bufferSize << 1) + count); // Ring buffer is full
} else if(enqueuePointer + contiguousCount > bufferEnd){
contiguousCount = (uintptr_t)(bufferEnd - enqueuePointer); // Get number of contiguous elements to copy
wrappedCount = count - contiguousCount; // Get number of elements that did not fit

dequeuePointer = bufferEnd - (oldBufferEnd - dequeuePointer);
enqueuePointer = buffer + (enqueuePointer - oldBuffer);
if(buffer + wrappedCount >= dequeuePointer){
Resize((bufferSize << 1) + count); // Ring buffer is full
}
}

kfree(oldBuffer);
memcpy(enqueuePointer, data, sizeof(T) * contiguousCount);

if(wrappedCount > 0){
memcpy(buffer, data + contiguousCount, sizeof(T) * wrappedCount);
}

size += count;
enqueuePointer = buffer + ((enqueuePointer + count - buffer) % bufferSize); // Calculate new enqueuePointer
}

inline void Enqueue(const T* data, size_t count){
acquireLock(&enqueueLock);
acquireLock(&dequeueLock);

EnqueueUnlocked(data, count);

releaseLock(&enqueueLock);
releaseLock(&dequeueLock);
}

int Dequeue(T* data, size_t amount){
acquireLock(&dequeueLock);
if(dequeuePointer == enqueuePointer){
releaseLock(&dequeueLock);
return 0;
}

size_t counter = 0;
while(dequeuePointer != enqueuePointer && counter < amount) {
memcpy(data++, dequeuePointer++, sizeof(T));
counter++;
acquireLock(&dequeueLock);

size_t contiguous = amount;
size_t wrapped = 0;
if(dequeuePointer < enqueuePointer && dequeuePointer + amount > enqueuePointer){
contiguous = enqueuePointer - dequeuePointer;
} else if(dequeuePointer + contiguous > bufferEnd){
contiguous = bufferEnd - dequeuePointer;
wrapped = amount - contiguous;

if(buffer + wrapped > enqueuePointer){
wrapped = enqueuePointer - buffer;
}
}

memcpy(data, dequeuePointer, contiguous * sizeof(T));
dequeuePointer += contiguous;

if(dequeuePointer >= bufferEnd){
dequeuePointer = buffer + wrapped;

if(dequeuePointer >= bufferEnd){
dequeuePointer = buffer;
if(wrapped){
memcpy(data + contiguous, buffer, wrapped * sizeof(T));
}
}

size -= contiguous + wrapped;
releaseLock(&dequeueLock);

return counter;
return contiguous + wrapped;
}
private:

void Drain() {
acquireLock(&enqueueLock);
acquireLock(&dequeueLock);

enqueuePointer = dequeuePointer = buffer;
size = 0;

releaseLock(&enqueueLock);
releaseLock(&dequeueLock);
}

void Drain(size_t count) {
acquireLock(&dequeueLock);

if(dequeuePointer <= enqueuePointer && dequeuePointer + count > enqueuePointer){
size = 0;
dequeuePointer = enqueuePointer;
} else if(dequeuePointer + count > bufferEnd){
if(buffer + count > enqueuePointer){
size = 0;
dequeuePointer = enqueuePointer;
} else {
dequeuePointer = buffer + count;
size -= count;
}
} else {
size -= count;
dequeuePointer += count;
}

releaseLock(&dequeueLock);
}

inline unsigned Count() { return size; }
inline bool Empty() { return dequeuePointer == enqueuePointer; }
protected:
void Resize(size_t size){
T* oldBuffer = buffer;
T* oldBufferEnd = bufferEnd;

bufferSize = size;

buffer = reinterpret_cast<T*>(kmalloc(sizeof(T) * bufferSize));
bufferEnd = &buffer[bufferSize];

memcpy(bufferEnd - (oldBufferEnd - dequeuePointer), dequeuePointer, (oldBufferEnd - dequeuePointer) * sizeof(T));
if(dequeuePointer > enqueuePointer){
memcpy(buffer, oldBuffer, (enqueuePointer - oldBuffer) * sizeof(T));
}

dequeuePointer = bufferEnd - (oldBufferEnd - dequeuePointer);
enqueuePointer = buffer + (enqueuePointer - oldBuffer);

kfree(oldBuffer);
}

T* buffer = nullptr;
T* bufferEnd = nullptr;
size_t bufferSize = 0;

T* enqueuePointer = nullptr;
T* dequeuePointer = nullptr;

unsigned size;

lock_t enqueueLock = 0;
lock_t dequeueLock = 0;
};

class RawRingBuffer : public RingBuffer<uint8_t>{
public:
template<typename ...T>
inline constexpr void EnqueueObjects(const T&... data){
acquireLock(&enqueueLock);
acquireLock(&dequeueLock);

EnqueueUnlocked(data...);

releaseLock(&enqueueLock);
releaseLock(&dequeueLock);
}

private:
template<typename T, typename I>
inline constexpr void EnqueueUnlocked(const Pair<T*, I>& data){
RingBuffer::EnqueueUnlocked(data.item1, data.item2 * sizeof(T));
}

template<typename O, typename ...T>
inline constexpr void EnqueueUnlocked(const O& obj, const T&... data){
EnqueueUnlocked<O>(obj);

EnqueueUnlocked(data...);
}

template<typename T>
inline constexpr void EnqueueUnlocked(const T& data){
RingBuffer::EnqueueUnlocked(reinterpret_cast<uint8_t const*>(&data), sizeof(T));
}
};
Loading

0 comments on commit 909b03c

Please sign in to comment.