diff --git a/Tests/unit/core/CMakeLists.txt b/Tests/unit/core/CMakeLists.txt index 23c64c46c..b18982d77 100644 --- a/Tests/unit/core/CMakeLists.txt +++ b/Tests/unit/core/CMakeLists.txt @@ -79,7 +79,7 @@ add_executable(${TEST_RUNNER_NAME} test_websockettext.cpp test_workerpool.cpp test_xgetopt.cpp - #test_message_dispatcher.cpp + test_message_dispatcher.cpp ) #[[ if(MESSAGING) diff --git a/Tests/unit/core/test_message_dispatcher.cpp b/Tests/unit/core/test_message_dispatcher.cpp index 0446042f6..afa213115 100644 --- a/Tests/unit/core/test_message_dispatcher.cpp +++ b/Tests/unit/core/test_message_dispatcher.cpp @@ -29,7 +29,10 @@ #include "../IPTestAdministrator.h" +#include + namespace Thunder { + namespace Tests { namespace Core { @@ -62,7 +65,7 @@ namespace Core { void SetUp() override { - _dispatcher.reset(new ::Thunder::Core::MessageDispatcherType(_identifier, _instanceId, true, _basePath)); + _dispatcher.reset(new ::Thunder::Messaging::MessageDataBuffer(_identifier, _instanceId, _basePath, DATA_SIZE, 0, true)); } void TearDown() override { @@ -73,7 +76,7 @@ namespace Core { ::Thunder::Core::Singleton::Dispose(); } - std::unique_ptr<::Thunder::Core::MessageDispatcherType> _dispatcher; + std::unique_ptr<::Thunder::Messaging::MessageDataBuffer> _dispatcher; string _identifier; string _basePath; @@ -119,16 +122,16 @@ namespace Core { TEST_F(Core_MessageDispatcher, CreateAndOpenOperatesOnSameValidFile) { - ::Thunder::Core::MessageDispatcherType writerDispatcher(_T("test_md"), 0, true, this->_basePath); - ::Thunder::Core::MessageDispatcherType readerDispatcher(_T("test_md"), 0, false, this->_basePath); + ::Thunder::Messaging::MessageDataBuffer writerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, true); + ::Thunder::Messaging::MessageDataBuffer readerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, false); uint8_t testData[2] = { 13, 37 }; uint8_t readData[4]; uint16_t readLength = sizeof(readData); - ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(writerDispatcher.PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(readerDispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); ASSERT_EQ(readLength, sizeof(testData)); ASSERT_EQ(readData[0], 13); @@ -137,24 +140,24 @@ namespace Core { TEST_F(Core_MessageDispatcher, MessageDispatcherCanBeOpenedAndClosed) { - ::Thunder::Core::MessageDispatcherType writerDispatcher(_T("test_md"), 0, true, this->_basePath); + ::Thunder::Messaging::MessageDataBuffer writerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, true); { - ::Thunder::Core::MessageDispatcherType readerDispatcher(_T("test_md"), 0, false, this->_basePath); + ::Thunder::Messaging::MessageDataBuffer readerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, false); //destructor is called } //reopen - ::Thunder::Core::MessageDispatcherType readerDispatcher(_T("test_md"), 0, true, this->_basePath); + ::Thunder::Messaging::MessageDataBuffer readerDispatcher(_T("test_md"), 0, this->_basePath, DATA_SIZE, 0, false); uint8_t testData[2] = { 13, 37 }; uint8_t readData[4]; uint16_t readLength = sizeof(readData); - ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(writerDispatcher.PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(readerDispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); ASSERT_EQ(readLength, sizeof(testData)); ASSERT_EQ(readData[0], 13); @@ -185,25 +188,21 @@ namespace Core { ASSERT_EQ(readData[0], 40); } - TEST_F(Core_MessageDispatcher, WriteAndReadDataAreEqualInDiffrentProcesses) + TEST_F(Core_MessageDispatcher, WriteAndReadDataAreEqualInDifferentProcesses) { auto lambdaFunc = [this](IPTestAdministrator& testAdmin) { - ::Thunder::Core::MessageDispatcherType dispatcher(this->_identifier, this->_instanceId, false, this->_basePath); + ::Thunder::Messaging::MessageDataBuffer dispatcher(this->_identifier, this->_instanceId, this->_basePath, DATA_SIZE, 0, false); uint8_t readData[4]; uint16_t readLength = sizeof(readData); - testAdmin.Sync("setup reader"); - testAdmin.Sync("writer wrote"); - + // Arbitrary timeout value, 1 second + ASSERT_EQ(dispatcher.Wait(1000), ::Thunder::Core::ERROR_NONE); ASSERT_EQ(dispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); ASSERT_EQ(readLength, 2); ASSERT_EQ(readData[0], 13); ASSERT_EQ(readData[1], 37); - - testAdmin.Sync("reader read"); - testAdmin.Sync("done"); }; static std::function lambdaVar = lambdaFunc; @@ -212,15 +211,11 @@ namespace Core { // This side (tested) acts as writer IPTestAdministrator testAdmin(otherSide); { - testAdmin.Sync("setup reader"); - uint8_t testData[2] = { 13, 37 }; ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); - - testAdmin.Sync("writer wrote"); - testAdmin.Sync("reader read"); + // The 'ring' is implicit +// _dispatcher->Ring(); } - testAdmin.Sync("done"); } TEST_F(Core_MessageDispatcher, PushDataShouldNotFitWhenExcedingDataBufferSize) @@ -231,175 +226,93 @@ namespace Core { ASSERT_EQ(_dispatcher->PushData(sizeof(fullBufferSimulation), fullBufferSimulation), ::Thunder::Core::ERROR_WRITE_ERROR); } - TEST_F(Core_MessageDispatcher, PushDataShouldFlushOldDataIfDoesNotFit) + TEST_F(Core_MessageDispatcher, OffsetCutPushDataShouldFlushOldDataIfDoesNotFitOffsetCut) { - uint8_t fullBufferSimulation[DATA_SIZE - 1 + sizeof(::Thunder::Core::CyclicBuffer::control) //almost full buffer - - sizeof(uint8_t) //size of type (part of message header) - - sizeof(uint16_t)]; //size of length (part of message header) - - uint8_t testData[] = { 12, 21 }; - + // CyclicBuffer Reserve requires 'length' < Size() + uint8_t fullBufferSimulation[DATA_SIZE - sizeof(uint16_t) - 1]; // sizeof(length) + length - 1, eg. < Size() + uint8_t testData[] = { 12, 11, 13, 21 }; uint8_t readData[4]; uint16_t readLength = sizeof(readData); - ASSERT_EQ(_dispatcher->PushData(sizeof(fullBufferSimulation), fullBufferSimulation), ::Thunder::Core::ERROR_NONE); - //buffer is full, but trying to write new data + EXPECT_EQ(_dispatcher->PushData(sizeof(fullBufferSimulation), fullBufferSimulation), ::Thunder::Core::ERROR_NONE); + // One element free space remaining + // 2+2 bytes, 1 at tail position, 3 starting at position 0 - ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); - //new data written, so the oldest data should be replaced - //this is first entry and should be first popped (FIFO) + EXPECT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); + // new data written, so the oldest data should be replaced - ASSERT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(readLength, sizeof(testData)); - ASSERT_EQ(readData[0], 12); - ASSERT_EQ(readData[1], 21); + // this is first entry and should be first popped (FIFO) + EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); + + EXPECT_EQ(readLength, sizeof(testData)); + EXPECT_EQ(readData[0], testData[0]); + EXPECT_EQ(readData[3], testData[3]); } - //doorbell (socket) is not quite working inside test suite - TEST_F(Core_MessageDispatcher, DISABLED_ReaderShouldWaitUntillRingBells) + TEST_F(Core_MessageDispatcher, OnlyOffsetFitsPushDataShouldFlushOldDataIfDoesNotFit) { - auto lambdaFunc = [this](IPTestAdministrator& testAdmin) { - ::Thunder::Core::MessageDispatcherType dispatcher(this->_identifier, this->_instanceId, false, this->_basePath); - - uint8_t readData[4]; - uint16_t readLength = sizeof(readData); - - bool called = false; - dispatcher.Wait(0); //initialize socket - testAdmin.Sync("init"); - - if (dispatcher.Wait(::Thunder::Core::infinite) == ::Thunder::Core::ERROR_NONE) { - ASSERT_EQ(dispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); + uint8_t onlyOffsetFitsSimulation[DATA_SIZE - sizeof(uint16_t) - 2]; + uint8_t testData[] = { 12, 11, 13, 21 }; + uint8_t readData[4]; + uint16_t readLength = sizeof(readData); - ASSERT_EQ(readLength, 2); - ASSERT_EQ(readData[0], 13); - ASSERT_EQ(readData[1], 37); - called = true; - } - ASSERT_EQ(called, true); + EXPECT_EQ(_dispatcher->PushData(sizeof(onlyOffsetFitsSimulation), onlyOffsetFitsSimulation), ::Thunder::Core::ERROR_NONE); + // Two elements free space remaining so offset of testData should fit at the end of the cyclic buffer, + // and content of testData buffer should be at the beginning of the cyclic buffer - testAdmin.Sync("done"); - }; - - static std::function lambdaVar = lambdaFunc; - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin) { lambdaVar(testAdmin); }; + EXPECT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); - // This side (tested) acts as writer - IPTestAdministrator testAdmin(otherSide); - { - uint8_t testData[2] = { 13, 37 }; - testAdmin.Sync("init"); - - ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); - } - testAdmin.Sync("done"); + EXPECT_EQ(readLength, sizeof(testData)); + EXPECT_EQ(readData[0], testData[0]); + EXPECT_EQ(readData[3], testData[3]); } - TEST_F(Core_MessageDispatcher, WriteAndReadMetaDataAreEqualInSameProcess) + TEST_F(Core_MessageDispatcher, OffsetFitsBufferCutPushDataShouldFlushOldDataIfDoesNotFit) { - uint8_t testData[2] = { 13, 37 }; - - _dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) { - EXPECT_EQ(length, sizeof(testData)); - EXPECT_EQ(value[0], 13); - EXPECT_EQ(value[1], 37); - - outValue[0] = 60; - outValue[1] = 61; - outLength = 2; - }); + uint8_t offsetPlusFitsSimulation[DATA_SIZE - sizeof(uint16_t) - 3]; + uint8_t testData[] = { 12, 11, 13, 21 }; + uint8_t readData[4]; + uint16_t readLength = sizeof(readData); - auto result = _dispatcher->PushMetadata(sizeof(testData), testData, sizeof(testData)); - ASSERT_EQ(result, 2); - ASSERT_EQ(testData[0], 60); - ASSERT_EQ(testData[1], 61); - ::SleepMs(50); //wait for callback complete before closing + EXPECT_EQ(_dispatcher->PushData(sizeof(offsetPlusFitsSimulation), offsetPlusFitsSimulation), ::Thunder::Core::ERROR_NONE); + // Three elements free space remaining, so the offset of testData should still fit at the end of the cyclic buffer, + // as well as the first part of the testData buffer, but its second part should be at the beginning of the cyclic buffer - _dispatcher->UnregisterDataAvailable(); - } + EXPECT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); - TEST_F(Core_MessageDispatcher, WriteAndReadMetaDataAreEqualInSameProcessTwice) - { - uint8_t testData1[2] = { 13, 37 }; - uint8_t testData2[2] = { 12, 34 }; - - //first write and read - _dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) { - EXPECT_EQ(length, sizeof(testData1)); - EXPECT_EQ(value[0], 13); - EXPECT_EQ(value[1], 37); - - outValue[0] = 60; - outValue[1] = 61; - outLength = 2; - }); - auto result = _dispatcher->PushMetadata(sizeof(testData1), testData1, sizeof(testData1)); - ASSERT_EQ(result, 2); - ASSERT_EQ(testData1[0], 60); - ASSERT_EQ(testData1[1], 61); - ::SleepMs(50); //need to wait before unregistering, not clean solution though - _dispatcher->UnregisterDataAvailable(); - - //second write and read - _dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) { - EXPECT_EQ(length, sizeof(testData2)); - EXPECT_EQ(value[0], 12); - EXPECT_EQ(value[1], 34); - - outValue[0] = 60; - outValue[1] = 61; - outLength = 2; - }); - result = _dispatcher->PushMetadata(sizeof(testData2), testData2, sizeof(testData2)); - ASSERT_EQ(result, 2); - ASSERT_EQ(testData2[0], 60); - ASSERT_EQ(testData2[1], 61); - ::SleepMs(50); - _dispatcher->UnregisterDataAvailable(); + EXPECT_EQ(readLength, sizeof(testData)); + EXPECT_EQ(readData[0], testData[0]); + EXPECT_EQ(readData[3], testData[3]); } - //socket problems inside test suite - TEST_F(Core_MessageDispatcher, DISABLED_WriteAndReadMetaDataAreEqualInDiffrentProcesses) + TEST_F(Core_MessageDispatcher, BufferGetsFilledToItsVeryMaximum) { - auto lambdaFunc = [this](IPTestAdministrator& testAdmin) { - ::Thunder::Core::MessageDispatcherType dispatcher(this->_identifier, this->_instanceId, false, this->_basePath); - uint8_t testData[2] = { 13, 37 }; - //testAdmin.Sync("setup"); + uint8_t almostFullBufferSimulation[DATA_SIZE - sizeof(uint16_t) - 6]; + uint8_t testData1[] = { 12, 11, 13, 21 }; + uint8_t testData2[] = { 54, 62, 78, 91 }; + uint8_t readData[4]; + uint16_t readLength = sizeof(readData); - auto result = _dispatcher->PushMetadata(sizeof(testData), testData, sizeof(testData)); - ASSERT_EQ(result, 2); - ASSERT_EQ(testData[0], 60); - ASSERT_EQ(testData[1], 61); - ::SleepMs(2000); - }; + EXPECT_EQ(_dispatcher->PushData(sizeof(almostFullBufferSimulation), almostFullBufferSimulation), ::Thunder::Core::ERROR_NONE); - static std::function lambdaVar = lambdaFunc; - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin) { lambdaVar(testAdmin); }; + EXPECT_EQ(_dispatcher->PushData(sizeof(testData1), testData1), ::Thunder::Core::ERROR_NONE); + // The cyclic buffer is now full - // This side (tested) acts as reader - IPTestAdministrator testAdmin(otherSide); - { - _dispatcher->RegisterDataAvailable([&](const uint16_t length, const uint8_t* value, uint16_t& outLength, uint8_t* outValue) { - std::vector result{ 60, 61 }; - EXPECT_EQ(length, 2); - EXPECT_EQ(value[0], 13); - EXPECT_EQ(value[1], 37); - return result; - }); - - ::SleepMs(2000); - } - _dispatcher->UnregisterDataAvailable(); - } + EXPECT_EQ(_dispatcher->PushData(sizeof(testData2), testData2), ::Thunder::Core::ERROR_NONE); + // The cyclic buffer needs to flush the almostFullBufferSimulation to make space for testData2 - TEST_F(Core_MessageDispatcher, WriteMetaDataShouldFailIfReaderNotRegistered) - { - uint8_t testData[2] = { 13, 37 }; + EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(readLength, sizeof(testData1)); + EXPECT_EQ(readData[0], testData1[0]); + EXPECT_EQ(readData[3], testData1[3]); - auto result = _dispatcher->PushMetadata(sizeof(testData), testData, sizeof(testData)); - ASSERT_EQ(result, 0); + EXPECT_EQ(_dispatcher->PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(readLength, sizeof(testData2)); + EXPECT_EQ(readData[0], testData2[0]); + EXPECT_EQ(readData[3], testData2[3]); } - } // Core } // Tests } // Thunder