Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development/messagedispatcher #1616

Merged
merged 15 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Tests/unit/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ add_executable(${TEST_RUNNER_NAME}
test_websockettext.cpp
test_workerpool.cpp
test_xgetopt.cpp
#test_message_dispatcher.cpp
test_message_dispatcher.cpp
)

#[[ if(MESSAGING)
Expand Down
247 changes: 80 additions & 167 deletions Tests/unit/core/test_message_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@

#include "../IPTestAdministrator.h"

#include <messaging/messaging.h>

namespace Thunder {

namespace Tests {
namespace Core {

Expand Down Expand Up @@ -62,7 +65,7 @@ namespace Core {

void SetUp() override
{
_dispatcher.reset(new ::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE>(_identifier, _instanceId, true, _basePath));
_dispatcher.reset(new ::Thunder::Messaging::MessageDataBuffer(_identifier, _instanceId, _basePath, DATA_SIZE, 0, true));
}
void TearDown() override
{
Expand All @@ -73,7 +76,7 @@ namespace Core {
::Thunder::Core::Singleton::Dispose();
}

std::unique_ptr<::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE>> _dispatcher;
std::unique_ptr<::Thunder::Messaging::MessageDataBuffer> _dispatcher;
string _identifier;
string _basePath;

Expand Down Expand Up @@ -119,16 +122,16 @@ namespace Core {

TEST_F(Core_MessageDispatcher, CreateAndOpenOperatesOnSameValidFile)
{
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> writerDispatcher(_T("test_md"), 0, true, this->_basePath);
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> readerDispatcher(_T("test_md"), 0, false, this->_basePath);
::Thunder::Messaging::MessageDataBuffer writerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, true);
::Thunder::Messaging::MessageDataBuffer readerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, false);

uint8_t testData[2] = { 13, 37 };

uint8_t readData[4];
uint16_t readLength = sizeof(readData);

ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(writerDispatcher.PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(readerDispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);

ASSERT_EQ(readLength, sizeof(testData));
ASSERT_EQ(readData[0], 13);
Expand All @@ -137,24 +140,24 @@ namespace Core {

TEST_F(Core_MessageDispatcher, MessageDispatcherCanBeOpenedAndClosed)
{
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> writerDispatcher(_T("test_md"), 0, true, this->_basePath);
::Thunder::Messaging::MessageDataBuffer writerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, true);

{
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> readerDispatcher(_T("test_md"), 0, false, this->_basePath);
::Thunder::Messaging::MessageDataBuffer readerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, false);

//destructor is called
}

//reopen
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> readerDispatcher(_T("test_md"), 0, true, this->_basePath);
::Thunder::Messaging::MessageDataBuffer readerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, false);

uint8_t testData[2] = { 13, 37 };

uint8_t readData[4];
uint16_t readLength = sizeof(readData);

ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(writerDispatcher.PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(readerDispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);

ASSERT_EQ(readLength, sizeof(testData));
ASSERT_EQ(readData[0], 13);
Expand Down Expand Up @@ -185,25 +188,21 @@ namespace Core {
ASSERT_EQ(readData[0], 40);
}

TEST_F(Core_MessageDispatcher, WriteAndReadDataAreEqualInDiffrentProcesses)
TEST_F(Core_MessageDispatcher, WriteAndReadDataAreEqualInDifferentProcesses)
{
auto lambdaFunc = [this](IPTestAdministrator& testAdmin) {
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> dispatcher(this->_identifier, this->_instanceId, false, this->_basePath);
::Thunder::Messaging::MessageDataBuffer dispatcher(this->_identifier, this->_instanceId, this->_basePath, DATA_SIZE, 0, false);

uint8_t readData[4];
uint16_t readLength = sizeof(readData);

testAdmin.Sync("setup reader");
testAdmin.Sync("writer wrote");

// Arbitrary timeout value, 1 second
ASSERT_EQ(dispatcher.Wait(1000), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(dispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);

ASSERT_EQ(readLength, 2);
ASSERT_EQ(readData[0], 13);
ASSERT_EQ(readData[1], 37);

testAdmin.Sync("reader read");
testAdmin.Sync("done");
};

static std::function<void(IPTestAdministrator&)> lambdaVar = lambdaFunc;
Expand All @@ -212,15 +211,11 @@ namespace Core {
// This side (tested) acts as writer
IPTestAdministrator testAdmin(otherSide);
{
testAdmin.Sync("setup reader");

uint8_t testData[2] = { 13, 37 };
ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);

testAdmin.Sync("writer wrote");
testAdmin.Sync("reader read");
// The 'ring' is implicit
// _dispatcher->Ring();
}
testAdmin.Sync("done");
}

TEST_F(Core_MessageDispatcher, PushDataShouldNotFitWhenExcedingDataBufferSize)
Expand All @@ -231,175 +226,93 @@ namespace Core {
ASSERT_EQ(_dispatcher->PushData(sizeof(fullBufferSimulation), fullBufferSimulation), ::Thunder::Core::ERROR_WRITE_ERROR);
}

TEST_F(Core_MessageDispatcher, PushDataShouldFlushOldDataIfDoesNotFit)
TEST_F(Core_MessageDispatcher, OffsetCutPushDataShouldFlushOldDataIfDoesNotFitOffsetCut)
{
uint8_t fullBufferSimulation[DATA_SIZE - 1 + sizeof(::Thunder::Core::CyclicBuffer::control) //almost full buffer
- sizeof(uint8_t) //size of type (part of message header)
- sizeof(uint16_t)]; //size of length (part of message header)

uint8_t testData[] = { 12, 21 };

// CyclicBuffer Reserve requires 'length' < Size()
uint8_t fullBufferSimulation[DATA_SIZE - sizeof(uint16_t) - 1]; // sizeof(length) + length - 1, eg. < Size()
uint8_t testData[] = { 12, 11, 13, 21 };
uint8_t readData[4];
uint16_t readLength = sizeof(readData);

ASSERT_EQ(_dispatcher->PushData(sizeof(fullBufferSimulation), fullBufferSimulation), ::Thunder::Core::ERROR_NONE);
//buffer is full, but trying to write new data
EXPECT_EQ(_dispatcher->PushData(sizeof(fullBufferSimulation), fullBufferSimulation), ::Thunder::Core::ERROR_NONE);
// One element free space remaining
// 2+2 bytes, 1 at tail position, 3 starting at position 0

ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
//new data written, so the oldest data should be replaced
//this is first entry and should be first popped (FIFO)
EXPECT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
// new data written, so the oldest data should be replaced

ASSERT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);
ASSERT_EQ(readLength, sizeof(testData));
ASSERT_EQ(readData[0], 12);
ASSERT_EQ(readData[1], 21);
// this is first entry and should be first popped (FIFO)
EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);

EXPECT_EQ(readLength, sizeof(testData));
EXPECT_EQ(readData[0], testData[0]);
EXPECT_EQ(readData[3], testData[3]);
}

//doorbell (socket) is not quite working inside test suite
TEST_F(Core_MessageDispatcher, DISABLED_ReaderShouldWaitUntillRingBells)
TEST_F(Core_MessageDispatcher, OnlyOffsetFitsPushDataShouldFlushOldDataIfDoesNotFit)
{
auto lambdaFunc = [this](IPTestAdministrator& testAdmin) {
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> dispatcher(this->_identifier, this->_instanceId, false, this->_basePath);

uint8_t readData[4];
uint16_t readLength = sizeof(readData);

bool called = false;
dispatcher.Wait(0); //initialize socket
testAdmin.Sync("init");

if (dispatcher.Wait(::Thunder::Core::infinite) == ::Thunder::Core::ERROR_NONE) {
ASSERT_EQ(dispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);
uint8_t onlyOffsetFitsSimulation[DATA_SIZE - sizeof(uint16_t) - 2];
uint8_t testData[] = { 12, 11, 13, 21 };
uint8_t readData[4];
uint16_t readLength = sizeof(readData);

ASSERT_EQ(readLength, 2);
ASSERT_EQ(readData[0], 13);
ASSERT_EQ(readData[1], 37);
called = true;
}
ASSERT_EQ(called, true);
EXPECT_EQ(_dispatcher->PushData(sizeof(onlyOffsetFitsSimulation), onlyOffsetFitsSimulation), ::Thunder::Core::ERROR_NONE);
// Two elements free space remaining so offset of testData should fit at the end of the cyclic buffer,
// and content of testData buffer should be at the beginning of the cyclic buffer

testAdmin.Sync("done");
};

static std::function<void(IPTestAdministrator&)> lambdaVar = lambdaFunc;
IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin) { lambdaVar(testAdmin); };
EXPECT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);

// This side (tested) acts as writer
IPTestAdministrator testAdmin(otherSide);
{
uint8_t testData[2] = { 13, 37 };
testAdmin.Sync("init");

ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
}
testAdmin.Sync("done");
EXPECT_EQ(readLength, sizeof(testData));
EXPECT_EQ(readData[0], testData[0]);
EXPECT_EQ(readData[3], testData[3]);
}

TEST_F(Core_MessageDispatcher, WriteAndReadMetaDataAreEqualInSameProcess)
TEST_F(Core_MessageDispatcher, OffsetFitsBufferCutPushDataShouldFlushOldDataIfDoesNotFit)
{
uint8_t testData[2] = { 13, 37 };

_dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) {
EXPECT_EQ(length, sizeof(testData));
EXPECT_EQ(value[0], 13);
EXPECT_EQ(value[1], 37);

outValue[0] = 60;
outValue[1] = 61;
outLength = 2;
});
uint8_t offsetPlusFitsSimulation[DATA_SIZE - sizeof(uint16_t) - 3];
uint8_t testData[] = { 12, 11, 13, 21 };
uint8_t readData[4];
uint16_t readLength = sizeof(readData);

auto result = _dispatcher->PushMetadata(sizeof(testData), testData, sizeof(testData));
ASSERT_EQ(result, 2);
ASSERT_EQ(testData[0], 60);
ASSERT_EQ(testData[1], 61);
::SleepMs(50); //wait for callback complete before closing
EXPECT_EQ(_dispatcher->PushData(sizeof(offsetPlusFitsSimulation), offsetPlusFitsSimulation), ::Thunder::Core::ERROR_NONE);
// Three elements free space remaining, so the offset of testData should still fit at the end of the cyclic buffer,
// as well as the first part of the testData buffer, but its second part should be at the beginning of the cyclic buffer

_dispatcher->UnregisterDataAvailable();
}
EXPECT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE);
EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);

TEST_F(Core_MessageDispatcher, WriteAndReadMetaDataAreEqualInSameProcessTwice)
{
uint8_t testData1[2] = { 13, 37 };
uint8_t testData2[2] = { 12, 34 };

//first write and read
_dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) {
EXPECT_EQ(length, sizeof(testData1));
EXPECT_EQ(value[0], 13);
EXPECT_EQ(value[1], 37);

outValue[0] = 60;
outValue[1] = 61;
outLength = 2;
});
auto result = _dispatcher->PushMetadata(sizeof(testData1), testData1, sizeof(testData1));
ASSERT_EQ(result, 2);
ASSERT_EQ(testData1[0], 60);
ASSERT_EQ(testData1[1], 61);
::SleepMs(50); //need to wait before unregistering, not clean solution though
_dispatcher->UnregisterDataAvailable();

//second write and read
_dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) {
EXPECT_EQ(length, sizeof(testData2));
EXPECT_EQ(value[0], 12);
EXPECT_EQ(value[1], 34);

outValue[0] = 60;
outValue[1] = 61;
outLength = 2;
});
result = _dispatcher->PushMetadata(sizeof(testData2), testData2, sizeof(testData2));
ASSERT_EQ(result, 2);
ASSERT_EQ(testData2[0], 60);
ASSERT_EQ(testData2[1], 61);
::SleepMs(50);
_dispatcher->UnregisterDataAvailable();
EXPECT_EQ(readLength, sizeof(testData));
EXPECT_EQ(readData[0], testData[0]);
EXPECT_EQ(readData[3], testData[3]);
}

//socket problems inside test suite
TEST_F(Core_MessageDispatcher, DISABLED_WriteAndReadMetaDataAreEqualInDiffrentProcesses)
TEST_F(Core_MessageDispatcher, BufferGetsFilledToItsVeryMaximum)
{
auto lambdaFunc = [this](IPTestAdministrator& testAdmin) {
::Thunder::Core::MessageDispatcherType<METADATA_SIZE, DATA_SIZE> dispatcher(this->_identifier, this->_instanceId, false, this->_basePath);
uint8_t testData[2] = { 13, 37 };
//testAdmin.Sync("setup");
uint8_t almostFullBufferSimulation[DATA_SIZE - sizeof(uint16_t) - 6];
uint8_t testData1[] = { 12, 11, 13, 21 };
uint8_t testData2[] = { 54, 62, 78, 91 };
uint8_t readData[4];
uint16_t readLength = sizeof(readData);

auto result = _dispatcher->PushMetadata(sizeof(testData), testData, sizeof(testData));
ASSERT_EQ(result, 2);
ASSERT_EQ(testData[0], 60);
ASSERT_EQ(testData[1], 61);
::SleepMs(2000);
};
EXPECT_EQ(_dispatcher->PushData(sizeof(almostFullBufferSimulation), almostFullBufferSimulation), ::Thunder::Core::ERROR_NONE);

static std::function<void(IPTestAdministrator&)> lambdaVar = lambdaFunc;
IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin) { lambdaVar(testAdmin); };
EXPECT_EQ(_dispatcher->PushData(sizeof(testData1), testData1), ::Thunder::Core::ERROR_NONE);
// The cyclic buffer is now full

// This side (tested) acts as reader
IPTestAdministrator testAdmin(otherSide);
{
_dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) {
std::vector<uint8_t> result{ 60, 61 };
EXPECT_EQ(length, 2);
EXPECT_EQ(value[0], 13);
EXPECT_EQ(value[1], 37);
return result;
});

::SleepMs(2000);
}
_dispatcher->UnregisterDataAvailable();
}
EXPECT_EQ(_dispatcher->PushData(sizeof(testData2), testData2), ::Thunder::Core::ERROR_NONE);
// The cyclic buffer needs to flush the almostFullBufferSimulation to make space for testData2

TEST_F(Core_MessageDispatcher, WriteMetaDataShouldFailIfReaderNotRegistered)
{
uint8_t testData[2] = { 13, 37 };
EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);
EXPECT_EQ(readLength, sizeof(testData1));
EXPECT_EQ(readData[0], testData1[0]);
EXPECT_EQ(readData[3], testData1[3]);

auto result = _dispatcher->PushMetadata(sizeof(testData), testData, sizeof(testData));
ASSERT_EQ(result, 0);
EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE);
EXPECT_EQ(readLength, sizeof(testData2));
EXPECT_EQ(readData[0], testData2[0]);
EXPECT_EQ(readData[3], testData2[3]);
}

} // Core
} // Tests
} // Thunder