diff --git a/doc/design/chunk_header.md b/doc/design/chunk_header.md index 98985b77ee..c792cf41ec 100644 --- a/doc/design/chunk_header.md +++ b/doc/design/chunk_header.md @@ -45,7 +45,8 @@ class ChunkHeader { uint32_t chunkSize; uint8_t chunkHeaderVersion; - uint8_t reserved[3]; + uint8_t reserved{0}; + uint16_t userHeaderId; uint64_t originId; uint64_t sequenceNumber; uint32_t userHeaderSize{0U}; @@ -57,7 +58,8 @@ class ChunkHeader - **chunkSize** is the size of the whole chunk - **chunkHeaderVersion** is used to detect incompatibilities for record&replay functionality -- **reserved[3]** is currently not used and set to `0` +- **reserved** is currently not used and set to `0` +- **userHeaderId** is currently not used and set to `NO_USER_HEADER` - **originId** is the unique identifier of the publisher the chunk was sent from - **sequenceNumber** is a serial number for the sent chunks - **userPayloadSize** is the size of the chunk occupied by the user-header @@ -247,33 +249,7 @@ Furthermore, the `Publisher` and `Subscriber` have access to the `ChunkHeader` a ## Open Issues -- the design was done with the intention to have a user-header of arbitrary size, if the size is limited to e.g. 32 bytes, some things could be simplified -- publisher/subscriber API proposal -``` -// publisher -auto pub = iox::popo::Publisher(serviceDescription); -pub.loan() - .and_then([&](auto& sample) { - sample.getHeader()->userHeader()->data = 42; - sample->a = 42; - sample->b = 13; - sample.publish(); - }) - .or_else([](iox::popo::AllocationError) { - // Do something with error. - }); - -// subscriber -auto sub = iox::popo::Subscriber(serviceDescription); -sub->take() - .and_then([](auto& sample){ - std::cout << "User-Header data: " << sample.getHeader()->userHeader()->data << std::endl; - std::cout << "User-Payload data: " << static_cast(sample->get())->data << std::endl; - }); -``` - - the publisher/subscriber would have a default parameter for the user-header to be source compatible with our current API - - the drawback is that the user could use the wrong user-header. Maybe `Sample` also needs an additional template parameter - - additionally, a `ChunkHeaderHook` could be used on the publisher side +- a `ChunkHeaderHook` could be used on the publisher side ``` template class MyChunkHeaderHook : public ChunkHeaderHook @@ -290,43 +266,6 @@ auto pub = iox::popo::Publisher(serviceDescription, userHeaderHook); ``` - alternatively, instead of the ChunkHeaderHook class, the publisher could have a method `registerDeliveryHook(std::function)` - allow the user only read access to the `ChunkHeader` and write access to the `UserHeader` -- untyped publisher/subscriber API proposal -``` -// publisher option 1 -auto pub = iox::popo::UntypedPublisher(serviceDescription); - -// publisher option 2 -auto userHeaderSize = sizeOf(MyUserHeader); -auto pub = iox::popo::UntypedPublisher(serviceDescription, userHeaderSize); - -auto payloadSize = sizeof(MyPayload); -auto payloadAlignment = alignof(MyPayload); -pub.loan(payloadSize, payloadAlignment) - .and_then([&](auto& sample) { - sample.getHeader()->userHeader()->data = 42; - auto payload = new (sample.get()) MyPayload(); - payload->data = 73; - sample.publish(); - }) - .or_else([](iox::popo::AllocationError) { - // Do something with error. - }); - -// subscriber option 1 -auto pub = iox::popo::UntypedPublisher(serviceDescription); - -// subscriber option 2 -auto userHeaderSize = sizeOf(MyUserHeader); -auto pub = iox::popo::UntypedSubscriber(serviceDescription, userHeaderSize); -sub->take() - .and_then([](auto& sample){ - std::cout << "User-Header data: " << sample.getHeader()->userHeader()->data << std::endl; - std::cout << "User-Payload data: " << static_cast(sample->get())->data << std::endl; - }); -``` - - option 1 has the benefit to catch a wrong alignment of the user-header and would be necessary if we make the `Sample` aware of the user-header -- C bindings - - PoC is needed - user defined sequence number - this can probably be done by a `ChunkHeaderHook` - alternatively, a flag could be introduce into the `ChunkHeader` diff --git a/iceoryx_dds/CMakeLists.txt b/iceoryx_dds/CMakeLists.txt index f3a5f72b9d..56137b2626 100644 --- a/iceoryx_dds/CMakeLists.txt +++ b/iceoryx_dds/CMakeLists.txt @@ -90,6 +90,7 @@ if(USE_CYCLONE_DDS) source/iceoryx_dds/dds/cyclone_context.cpp source/iceoryx_dds/dds/cyclone_data_reader.cpp source/iceoryx_dds/dds/cyclone_data_writer.cpp + source/iceoryx_dds/dds/iox_chunk_datagram_header.cpp ) # Generate IDL at configure time diff --git a/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_reader.hpp b/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_reader.hpp index 17df7e5102..5cfd2dfefb 100644 --- a/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_reader.hpp +++ b/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_reader.hpp @@ -1,4 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -46,12 +47,11 @@ class CycloneDataReader : public DataReader void connect() noexcept override; - iox::cxx::optional peekNextSize() override; - bool hasSamples() override; - iox::cxx::expected takeNext(uint8_t* const buffer, const uint64_t& bufferSize) override; - - iox::cxx::expected - take(uint8_t* const buffer, const uint64_t& bufferSize, const iox::cxx::optional& maxSamples) override; + iox::cxx::optional peekNextIoxChunkDatagramHeader() noexcept override; + bool hasSamples() noexcept override; + iox::cxx::expected takeNext(const IoxChunkDatagramHeader datagramHeader, + uint8_t* const userHeaderBuffer, + uint8_t* const userPayloadBuffer) noexcept override; capro::IdString_t getServiceId() const noexcept override; capro::IdString_t getInstanceId() const noexcept override; diff --git a/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_writer.hpp b/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_writer.hpp index 19d9f90508..1fbade7aa6 100644 --- a/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_writer.hpp +++ b/iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_writer.hpp @@ -1,4 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -44,7 +45,9 @@ class CycloneDataWriter : public iox::dds::DataWriter CycloneDataWriter& operator=(CycloneDataWriter&&) = default; void connect() noexcept override; - void write(const uint8_t* const bytes, const uint64_t size) noexcept override; + void write(iox::dds::IoxChunkDatagramHeader datagramHeader, + const uint8_t* const userHeaderBytes, + const uint8_t* const userPayloadBytes) noexcept override; capro::IdString_t getServiceId() const noexcept override; capro::IdString_t getInstanceId() const noexcept override; capro::IdString_t getEventId() const noexcept override; diff --git a/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp b/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp index 8e83f239ae..f083280cc3 100644 --- a/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp +++ b/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp @@ -18,6 +18,7 @@ #ifndef IOX_DDS_DDS_DATA_READER_HPP #define IOX_DDS_DDS_DATA_READER_HPP +#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp" #include "iceoryx_posh/iceoryx_posh_types.hpp" #include "iceoryx_utils/cxx/expected.hpp" #include "iceoryx_utils/cxx/optional.hpp" @@ -30,13 +31,20 @@ enum class DataReaderError : uint8_t { INVALID_STATE, NOT_CONNECTED, - INVALID_RECV_BUFFER, + INVALID_DATAGRAM_HEADER_SIZE, + INVALID_BUFFER_PARAMETER_FOR_USER_HEADER, + INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD, INVALID_DATA, - RECV_BUFFER_TOO_SMALL + BUFFER_SIZE_MISMATCH }; -constexpr char DataReaderErrorString[][64] = { - "NOT_CONNECTED", "INVALID_RECV_BUFFER", "INVALID_DATA", "RECV_BUFFER_TOO_SMALL"}; +constexpr const char* DataReaderErrorString[] = {"INVALID_STATE", + "NOT_CONNECTED", + "INVALID_DATAGRAM_HEADER_SIZE", + "INVALID_BUFFER_PARAMETER_FOR_USER_HEADER", + "INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD", + "INVALID_DATA", + "BUFFER_SIZE_MISMATCH"}; class DataReader { @@ -47,37 +55,27 @@ class DataReader virtual void connect() noexcept = 0; /// - /// @brief peekNextSize Get the size of the next sample if one is available. - /// @return The size of the next sample if one is available. + /// @brief peekNextIoxChunkDatagramHeader Get the IoxChunkDatagramHeader of the next sample if one is available. + /// @return The IoxChunkDatagramHeader of the next sample if one is available. /// - virtual iox::cxx::optional peekNextSize() = 0; + virtual iox::cxx::optional peekNextIoxChunkDatagramHeader() noexcept = 0; /// /// @brief hasSamples Checks if new samples ready to take. /// @return True if new samples available. /// - virtual bool hasSamples() = 0; + virtual bool hasSamples() noexcept = 0; /// /// @brief take Take the next available sample from the DDS data space. - /// @param buffer Receive buffer in which sample will be stored. - /// @param bufferSize Size of the provided buffer. + /// @param datagramHeader with size information + /// @param userHeaderBuffer buffer for the user-header + /// @param userPayloadBuffer buffer for the user-payload /// @return Error if unsuccessful. /// - virtual iox::cxx::expected takeNext(uint8_t* const buffer, const uint64_t& bufferSize) = 0; - - - /// - /// @brief take Take up to a maximum number of samples from the DDS data space. - /// @param buffer Receive buffer in which samples will be stored. - /// @param bufferSize The size of the buffer (in bytes). - /// @param maxSamples The maximum number of samples to request from the network. - /// @return Number of samples taken if successful. Number of samples will be in the sange [0,maxSamples]. - /// - /// @note Sample size must be known ahead of time & can be checked using @ref peekNextSize() . - /// - virtual iox::cxx::expected - take(uint8_t* const buffer, const uint64_t& bufferSize, const iox::cxx::optional& maxSamples) = 0; + virtual iox::cxx::expected takeNext(const IoxChunkDatagramHeader datagramHeader, + uint8_t* const userHeaderBuffer, + uint8_t* const userPayloadBuffer) noexcept = 0; /// /// @brief getServiceId @@ -98,7 +96,7 @@ class DataReader virtual capro::IdString_t getEventId() const noexcept = 0; protected: - DataReader() = default; + DataReader() noexcept = default; }; } // namespace dds } // namespace iox diff --git a/iceoryx_dds/include/iceoryx_dds/dds/data_writer.hpp b/iceoryx_dds/include/iceoryx_dds/dds/data_writer.hpp index ec32d07cba..f1baf07037 100644 --- a/iceoryx_dds/include/iceoryx_dds/dds/data_writer.hpp +++ b/iceoryx_dds/include/iceoryx_dds/dds/data_writer.hpp @@ -1,4 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +18,8 @@ #ifndef IOX_DDS_DDS_DATA_WRITER_HPP #define IOX_DDS_DDS_DATA_WRITER_HPP +#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp" + #include "iceoryx_posh/iceoryx_posh_types.hpp" #include @@ -40,11 +43,14 @@ class DataWriter virtual void connect() noexcept = 0; /// - /// @brief write Write the provided bytes on the DDS network on the topic: serviceId/instanceId/eventId - /// @param bytes - /// @param size + /// @brief write Write the provided header and bytes on the DDS network on the topic: serviceId/instanceId/eventId + /// @param datagramHeader with size information + /// @param userHeaderBytes buffer with the user-header + /// @param userPayloadBytes buffer with the user-payload /// - virtual void write(const uint8_t* const bytes, const uint64_t size) noexcept = 0; + virtual void write(iox::dds::IoxChunkDatagramHeader datagramHeader, + const uint8_t* const userHeaderBytes, + const uint8_t* const userPayloadBytes) noexcept = 0; /// /// @brief getServiceId diff --git a/iceoryx_dds/include/iceoryx_dds/dds/iox_chunk_datagram_header.hpp b/iceoryx_dds/include/iceoryx_dds/dds/iox_chunk_datagram_header.hpp new file mode 100644 index 0000000000..b79eeebf24 --- /dev/null +++ b/iceoryx_dds/include/iceoryx_dds/dds/iox_chunk_datagram_header.hpp @@ -0,0 +1,77 @@ +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP +#define IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP + +#include "iceoryx_utils/cxx/vector.hpp" + +#include + +namespace iox +{ +namespace dds +{ +/// @brief the endianess of the serialized data +enum class Endianess : uint8_t +{ + UNDEFINED, + LITTLE, + BIG, + MIXED, +}; + +constexpr const char* EndianessString[] = {"UNDEFINED", "LITTLE", "BIG", "MIXED"}; + +/// @brief Detects the endianness of the system +Endianess getEndianess(); + +/// @brief The datagram header with chunk metadata for user-header and user-payload +struct IoxChunkDatagramHeader +{ + using Serialized_t = iox::cxx::vector; + + /// @brief Serializes a IoxChunkDatagramHeader into a vector of uint8_t + /// @param[in] datagramHeader to serialize + /// @return the serialized IoxChunkDatagramHeader + static Serialized_t serialize(const IoxChunkDatagramHeader& datagramHeader); + + /// @brief Deserializes a vector of uint8_t into a IoxChunkDatagramHeader + /// @param[in] serializedDatagram is the serialized IoxChunkDatagramHeader + /// @return the deserialized IoxChunkDatagramHeader + static IoxChunkDatagramHeader deserialize(const Serialized_t& serializedDatagramHeader); + + /// @brief From the 1.0 release onward, this must be incremented for each incompatible change, e.g. + /// - data width of members changes + /// - members are rearranged + /// - semantic meaning of a member changes + static constexpr uint8_t DATAGRAM_VERSION{1U}; + + /// @note This must always be the first member and always 1 bytes in order to prevent issues with endianess when + /// deserialized or incorrectly detected versions due to different size + uint8_t datagramVersion{DATAGRAM_VERSION}; + /// @note This must always be 1 byte in order to prevent issues with endianess when deserialized + Endianess endianness{Endianess::UNDEFINED}; + uint16_t userHeaderId{0xFFFF}; + uint32_t userHeaderSize{0U}; + uint32_t userPayloadSize{0U}; + uint32_t userPayloadAlignment{0U}; +}; + +} // namespace dds +} // namespace iox + +#endif // IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP diff --git a/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl b/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl index b34ff6ea89..c6e1107aa1 100644 --- a/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl +++ b/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl @@ -64,16 +64,34 @@ inline void DDS2IceoryxGateway::forward(const channel_t& c while (reader->hasSamples()) { - reader->peekNextSize().and_then([&](auto size) { - publisher->loan(size).and_then([&](auto chunk) { - reader->takeNext(static_cast(chunk), size) - .and_then([&]() { publisher->publish(chunk); }) - .or_else([&](DataReaderError err) { - publisher->release(chunk); - LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: " - << dds::DataReaderErrorString[static_cast(err)]; - }); - }); + reader->peekNextIoxChunkDatagramHeader().and_then([&](auto datagramHeader) { + // this is safe, it is just used to check if the alignment doesn't exceed the + // alignment of the ChunkHeader but since this is data from a previously valid + // chunk, we can assume that the alignment was correct and use this value + constexpr uint32_t USER_HEADER_ALIGNMENT{1U}; + publisher + ->loan(datagramHeader.userPayloadSize, + datagramHeader.userPayloadAlignment, + datagramHeader.userHeaderSize, + USER_HEADER_ALIGNMENT) + .and_then([&](auto userPayload) { + auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload); + reader + ->takeNext(datagramHeader, + static_cast(chunkHeader->userHeader()), + static_cast(chunkHeader->userPayload())) + .and_then([&]() { publisher->publish(userPayload); }) + .or_else([&](DataReaderError err) { + publisher->release(userPayload); + LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: " + << dds::DataReaderErrorString[static_cast(err)]; + }); + }) + .or_else([](auto& error) { + LogError() << "[DDS2IceoryxGateway] Could not loan chunk! Error code: " + << static_cast(error); + }); + ; }); } } diff --git a/iceoryx_dds/include/iceoryx_dds/internal/gateway/iox_to_dds.inl b/iceoryx_dds/include/iceoryx_dds/internal/gateway/iox_to_dds.inl index 9c8a0238c3..7891454600 100644 --- a/iceoryx_dds/include/iceoryx_dds/internal/gateway/iox_to_dds.inl +++ b/iceoryx_dds/include/iceoryx_dds/internal/gateway/iox_to_dds.inl @@ -19,6 +19,7 @@ #define IOX_DDS_IOX_TO_DDS_INL #include "iceoryx_dds/dds/dds_config.hpp" +#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp" #include "iceoryx_dds/internal/log/logging.hpp" #include "iceoryx_posh/capro/service_description.hpp" #include "iceoryx_posh/gateway/gateway_config.hpp" @@ -110,8 +111,15 @@ inline void Iceoryx2DDSGateway::forward(const channel_t& c { subscriber->take().and_then([&](const void* userPayload) { auto dataWriter = channel.getExternalTerminal(); - auto header = iox::mepoo::ChunkHeader::fromUserPayload(userPayload); - dataWriter->write(static_cast(userPayload), header->userPayloadSize()); + auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload); + iox::dds::IoxChunkDatagramHeader datagramHeader; + datagramHeader.userHeaderId = chunkHeader->userHeaderId(); + datagramHeader.userHeaderSize = chunkHeader->userHeaderSize(); + datagramHeader.userPayloadSize = chunkHeader->userPayloadSize(); + datagramHeader.userPayloadAlignment = chunkHeader->userPayloadAlignment(); + dataWriter->write(datagramHeader, + static_cast(chunkHeader->userHeader()), + static_cast(chunkHeader->userPayload())); subscriber->release(userPayload); }); } diff --git a/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp b/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp index 6fb397d7f3..09719c1f90 100644 --- a/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp +++ b/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp @@ -1,4 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ #include "iceoryx_dds/dds/cyclone_data_reader.hpp" #include "iceoryx_dds/dds/cyclone_context.hpp" #include "iceoryx_dds/internal/log/logging.hpp" +#include "iceoryx_posh/mepoo/chunk_header.hpp" iox::dds::CycloneDataReader::CycloneDataReader(const capro::IdString_t serviceId, const capro::IdString_t instanceId, @@ -53,48 +55,108 @@ void iox::dds::CycloneDataReader::connect() noexcept } } -iox::cxx::optional iox::dds::CycloneDataReader::peekNextSize() +iox::cxx::optional +iox::dds::CycloneDataReader::peekNextIoxChunkDatagramHeader() noexcept { // ensure to only read sample - do not take - auto readSamples = m_impl.select().max_samples(1u).state(::dds::sub::status::SampleState::any()).read(); + auto readSamples = m_impl.select().max_samples(1U).state(::dds::sub::status::SampleState::any()).read(); - if (readSamples.length() > 0) + constexpr iox::cxx::nullopt_t NO_VALID_SAMPLE_AVAILABLE; + + if (readSamples.length() == 0) + { + return NO_VALID_SAMPLE_AVAILABLE; + } + + auto nextSample = readSamples.begin(); + auto& nextSamplePayload = nextSample->data().payload(); + auto nextSampleSize = nextSamplePayload.size(); + + auto dropSample = [&] { + m_impl.select().max_samples(1U).state(::dds::sub::status::SampleState::any()).take(); + return NO_VALID_SAMPLE_AVAILABLE; + }; + + // Ignore samples with no payload + if (nextSampleSize == 0) { - auto nextSample = readSamples.begin(); - auto nextSampleSize = nextSample->data().payload().size(); + LogError() << "[CycloneDataReader] received sample with size zero! Dropped sample!"; + return dropSample(); + } - // Ignore samples with no payload - if (nextSampleSize != 0) + // Ignore Invalid IoxChunkDatagramHeader + if (nextSampleSize < sizeof(iox::dds::IoxChunkDatagramHeader)) + { + auto log = LogError(); + log << "[CycloneDataReader] invalid sample size! Must be at least sizeof(IoxChunkDatagramHeader) = " + << sizeof(iox::dds::IoxChunkDatagramHeader) << " but got " << nextSampleSize; + if (nextSampleSize >= 1) { - return iox::cxx::optional(static_cast(nextSampleSize)); + log << "! Potential datagram version is " << static_cast(nextSamplePayload[0]) + << "! Dropped sample!"; } + return dropSample(); + } + + iox::dds::IoxChunkDatagramHeader::Serialized_t serializedDatagramHeader; + for (uint64_t i = 0U; i < serializedDatagramHeader.capacity(); ++i) + { + serializedDatagramHeader.emplace_back(nextSamplePayload[i]); } - // no valid samples available - return iox::cxx::nullopt_t(); + auto datagramHeader = iox::dds::IoxChunkDatagramHeader::deserialize(serializedDatagramHeader); + + if (datagramHeader.datagramVersion != iox::dds::IoxChunkDatagramHeader::DATAGRAM_VERSION) + { + LogError() << "[CycloneDataReader] received sample with incompatible IoxChunkDatagramHeader version! Received '" + << static_cast(datagramHeader.datagramVersion) << "', expected '" + << static_cast(iox::dds::IoxChunkDatagramHeader::DATAGRAM_VERSION) << "'! Dropped sample!"; + return dropSample(); + } + + if (datagramHeader.endianness != getEndianess()) + { + LogError() << "[CycloneDataReader] received sample with incompatible endianess! Received '" + << EndianessString[static_cast(datagramHeader.endianness)] << "', expected '" + << EndianessString[static_cast(getEndianess())] << "'! Dropped sample!"; + return dropSample(); + } + + return datagramHeader; } -bool iox::dds::CycloneDataReader::hasSamples() +bool iox::dds::CycloneDataReader::hasSamples() noexcept { auto samples = m_impl.select().max_samples(1u).state(::dds::sub::status::SampleState::any()).read(); return samples.length() > 0; } -iox::cxx::expected iox::dds::CycloneDataReader::takeNext(uint8_t* const buffer, - const uint64_t& bufferSize) +iox::cxx::expected +iox::dds::CycloneDataReader::takeNext(const iox::dds::IoxChunkDatagramHeader datagramHeader, + uint8_t* const userHeaderBuffer, + uint8_t* const userPayloadBuffer) noexcept { // validation checks if (!m_isConnected.load()) { return iox::cxx::error(iox::dds::DataReaderError::NOT_CONNECTED); } - if (buffer == nullptr) + // it is assume that peekNextIoxChunkDatagramHeader was called beforehand and that the provided datagramHeader + // belongs to this sample + if (datagramHeader.userHeaderSize > 0 + && (datagramHeader.userHeaderId == iox::mepoo::ChunkHeader::NO_USER_HEADER || userHeaderBuffer == nullptr)) { - return iox::cxx::error(iox::dds::DataReaderError::INVALID_RECV_BUFFER); + return iox::cxx::error( + iox::dds::DataReaderError::INVALID_BUFFER_PARAMETER_FOR_USER_HEADER); + } + if (datagramHeader.userPayloadSize > 0 && userPayloadBuffer == nullptr) + { + return iox::cxx::error( + iox::dds::DataReaderError::INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD); } // take next sample and copy into buffer - auto takenSamples = m_impl.select().max_samples(1u).state(::dds::sub::status::SampleState::any()).take(); + auto takenSamples = m_impl.select().max_samples(1U).state(::dds::sub::status::SampleState::any()).take(); if (takenSamples.length() == 0) { // no samples available @@ -103,87 +165,54 @@ iox::cxx::expected iox::dds::CycloneDataReader::takeN // valid size auto nextSample = takenSamples.begin(); - auto sampleSize = nextSample->data().payload().size(); + auto samplePayload = nextSample->data().payload(); + auto sampleSize = samplePayload.size(); if (sampleSize == 0) { return iox::cxx::error(iox::dds::DataReaderError::INVALID_DATA); } - if (bufferSize < sampleSize) + if (sampleSize < sizeof(iox::dds::IoxChunkDatagramHeader)) { - // provided buffer is too small. - return iox::cxx::error(iox::dds::DataReaderError::RECV_BUFFER_TOO_SMALL); + return iox::cxx::error(iox::dds::DataReaderError::INVALID_DATAGRAM_HEADER_SIZE); } - // copy data into the provided buffer - auto bytes = nextSample->data().payload().data(); - std::copy(bytes, bytes + sampleSize, buffer); - - return iox::cxx::success<>(); -} - -iox::cxx::expected iox::dds::CycloneDataReader::take( - uint8_t* const buffer, const uint64_t& bufferSize, const iox::cxx::optional& maxSamples) -{ - if (!m_isConnected.load()) + iox::dds::IoxChunkDatagramHeader::Serialized_t serializedDatagramHeader; + for (uint64_t i = 0U; i < serializedDatagramHeader.capacity(); ++i) { - return iox::cxx::error(iox::dds::DataReaderError::NOT_CONNECTED); + serializedDatagramHeader.emplace_back(samplePayload[i]); } - if (buffer == nullptr) + + auto actualDatagramHeader = iox::dds::IoxChunkDatagramHeader::deserialize(serializedDatagramHeader); + + iox::cxx::Ensures(datagramHeader.userHeaderId == actualDatagramHeader.userHeaderId); + iox::cxx::Ensures(datagramHeader.userHeaderSize == actualDatagramHeader.userHeaderSize); + iox::cxx::Ensures(datagramHeader.userPayloadSize == actualDatagramHeader.userPayloadSize); + iox::cxx::Ensures(datagramHeader.userPayloadAlignment == actualDatagramHeader.userPayloadAlignment); + + auto dataSize = sampleSize - sizeof(iox::dds::IoxChunkDatagramHeader); + auto bufferSize = datagramHeader.userHeaderSize + datagramHeader.userPayloadSize; + + if (bufferSize != dataSize) { - return iox::cxx::error(iox::dds::DataReaderError::INVALID_RECV_BUFFER); + // provided buffer don't match + return iox::cxx::error(iox::dds::DataReaderError::BUFFER_SIZE_MISMATCH); } - // get size of the sample - auto peekResult = peekNextSize(); - if (peekResult.has_value()) + // copy data into the provided buffer + if (userHeaderBuffer) { - uint64_t sampleSize = peekResult.value(); - if (sampleSize == 0) - { - return iox::cxx::error(iox::dds::DataReaderError::INVALID_DATA); - } - if (bufferSize < sampleSize) - { - // Provided buffer is too small. - return iox::cxx::error(iox::dds::DataReaderError::RECV_BUFFER_TOO_SMALL); - } - - // take maximum amount possible up to the cap: maxSamples - auto bufferCapacity = bufferSize / sampleSize; - - auto numToTake = bufferCapacity; - if (maxSamples.has_value()) - { - if (bufferCapacity > maxSamples.value()) - { - numToTake = maxSamples.value(); - } - } - auto samples = m_impl.select() - .max_samples(static_cast(numToTake)) - .state(::dds::sub::status::SampleState::any()) - .take(); - - // copy data into the provided buffer - uint64_t numSamplesBuffered = 0u; - if (samples.length() > 0) - { - // do copy - uint64_t cursor = 0; // Tracks the position in the buffer to write next sample. - for (const auto& sample : samples) - { - auto bytes = sample.data().payload().data(); - std::copy(bytes, bytes + sampleSize, &buffer[cursor]); - cursor += sampleSize; - } - numSamplesBuffered = cursor / sampleSize; - } - return iox::cxx::success(numSamplesBuffered); + auto userHeaderBytes = &samplePayload.data()[sizeof(iox::dds::IoxChunkDatagramHeader)]; + std::memcpy(userHeaderBuffer, userHeaderBytes, datagramHeader.userHeaderSize); } - else + + if (userPayloadBuffer) { - return iox::cxx::success(0u); + auto userPayloadBytes = + &samplePayload.data()[sizeof(iox::dds::IoxChunkDatagramHeader) + datagramHeader.userHeaderSize]; + std::memcpy(userPayloadBuffer, userPayloadBytes, datagramHeader.userPayloadSize); } + + return iox::cxx::success<>(); } iox::capro::IdString_t iox::dds::CycloneDataReader::getServiceId() const noexcept diff --git a/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_writer.cpp b/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_writer.cpp index 0f5c5d89f5..21ec255d2e 100644 --- a/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_writer.cpp +++ b/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_writer.cpp @@ -1,4 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ #include "iceoryx_dds/dds/cyclone_data_writer.hpp" #include "iceoryx_dds/dds/cyclone_context.hpp" #include "iceoryx_dds/internal/log/logging.hpp" +#include "iceoryx_posh/mepoo/chunk_header.hpp" #include #include @@ -48,10 +50,45 @@ void iox::dds::CycloneDataWriter::connect() noexcept LogDebug() << "[CycloneDataWriter] Connected to topic: " << topic; } -void iox::dds::CycloneDataWriter::write(const uint8_t* const bytes, const uint64_t size) noexcept +void iox::dds::CycloneDataWriter::write(iox::dds::IoxChunkDatagramHeader datagramHeader, + const uint8_t* const userHeaderBytes, + const uint8_t* const userPayloadBytes) noexcept { + if (datagramHeader.userHeaderSize > 0 + && (datagramHeader.userHeaderId == iox::mepoo::ChunkHeader::NO_USER_HEADER || userHeaderBytes == nullptr)) + { + LogError() << "[CycloneDataWriter] invalid user-header parameter! Dropping chunk!"; + return; + } + if (datagramHeader.userPayloadSize > 0 && userPayloadBytes == nullptr) + { + LogError() << "[CycloneDataWriter] invalid user-payload parameter! Dropping chunk!"; + return; + } + + datagramHeader.endianness = getEndianess(); + + auto serializedDatagramHeader = iox::dds::IoxChunkDatagramHeader::serialize(datagramHeader); + auto datagramSize = + serializedDatagramHeader.size() + datagramHeader.userHeaderSize + datagramHeader.userPayloadSize; + auto chunk = Mempool::Chunk(); - std::copy(bytes, bytes + size, std::back_inserter(chunk.payload())); + chunk.payload().reserve(datagramSize); + + std::copy(serializedDatagramHeader.data(), + serializedDatagramHeader.data() + serializedDatagramHeader.size(), + std::back_inserter(chunk.payload())); + if (datagramHeader.userHeaderSize > 0 && userHeaderBytes != nullptr ) + { + std::copy( + userHeaderBytes, userHeaderBytes + datagramHeader.userHeaderSize, std::back_inserter(chunk.payload())); + } + if (datagramHeader.userPayloadSize > 0 && userPayloadBytes != nullptr) + { + std::copy( + userPayloadBytes, userPayloadBytes + datagramHeader.userPayloadSize, std::back_inserter(chunk.payload())); + } + m_writer.write(chunk); } diff --git a/iceoryx_dds/source/iceoryx_dds/dds/iox_chunk_datagram_header.cpp b/iceoryx_dds/source/iceoryx_dds/dds/iox_chunk_datagram_header.cpp new file mode 100644 index 0000000000..3b231b47f8 --- /dev/null +++ b/iceoryx_dds/source/iceoryx_dds/dds/iox_chunk_datagram_header.cpp @@ -0,0 +1,97 @@ +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp" +#include "iceoryx_utils/cxx/helplets.hpp" + +namespace iox +{ +namespace dds +{ +Endianess getEndianess() +{ + uint32_t endianDetector{0x01020304}; + switch (reinterpret_cast(&endianDetector)[0U]) + { + case 1U: + return Endianess::BIG; + case 4U: + return Endianess::LITTLE; + case 2U: + return Endianess::MIXED; + default: + return Endianess::UNDEFINED; + } + + return Endianess::UNDEFINED; +} + +IoxChunkDatagramHeader::Serialized_t IoxChunkDatagramHeader::serialize(const IoxChunkDatagramHeader& datagramHeader) +{ + Serialized_t serializedDatagram; + + serializedDatagram.emplace_back(datagramHeader.datagramVersion); + serializedDatagram.emplace_back(static_cast(datagramHeader.endianness)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userHeaderId >> 8U) & 0xFF)); + serializedDatagram.emplace_back(static_cast(datagramHeader.userHeaderId & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userHeaderSize >> 24U) & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userHeaderSize >> 16U) & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userHeaderSize >> 8U) & 0xFF)); + serializedDatagram.emplace_back(static_cast(datagramHeader.userHeaderSize & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userPayloadSize >> 24U) & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userPayloadSize >> 16U) & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userPayloadSize >> 8U) & 0xFF)); + serializedDatagram.emplace_back(static_cast(datagramHeader.userPayloadSize & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userPayloadAlignment >> 24U) & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userPayloadAlignment >> 16U) & 0xFF)); + serializedDatagram.emplace_back(static_cast((datagramHeader.userPayloadAlignment >> 8U) & 0xFF)); + auto successfullyPushed = + serializedDatagram.emplace_back(static_cast(datagramHeader.userPayloadAlignment & 0xFF)); + + iox::cxx::Ensures(successfullyPushed && "Expected to successfully serialize IoxChunkDatagramHeader!"); + + return serializedDatagram; +} + +IoxChunkDatagramHeader +IoxChunkDatagramHeader::deserialize(const IoxChunkDatagramHeader::Serialized_t& serializedDatagramHeader) +{ + iox::cxx::Expects(serializedDatagramHeader.size() == 16U && "Expects valid IoxChunkDatagramHeader serialization!"); + + IoxChunkDatagramHeader datagramHeader; + + datagramHeader.datagramVersion = serializedDatagramHeader[0U]; + datagramHeader.endianness = static_cast(serializedDatagramHeader[1U]); + datagramHeader.userHeaderId = static_cast((static_cast(serializedDatagramHeader[2U]) << 8U) + | static_cast(serializedDatagramHeader[3U])); + datagramHeader.userHeaderSize = (static_cast(serializedDatagramHeader[4U]) << 24U) + | (static_cast(serializedDatagramHeader[5U]) << 16U) + | (static_cast(serializedDatagramHeader[6U]) << 8U) + | static_cast(serializedDatagramHeader[7U]); + datagramHeader.userPayloadSize = (static_cast(serializedDatagramHeader[8U]) << 24U) + | (static_cast(serializedDatagramHeader[9U]) << 16U) + | (static_cast(serializedDatagramHeader[10U]) << 8U) + | static_cast(serializedDatagramHeader[11U]); + datagramHeader.userPayloadAlignment = (static_cast(serializedDatagramHeader[12U]) << 24U) + | (static_cast(serializedDatagramHeader[13U]) << 16U) + | (static_cast(serializedDatagramHeader[14U]) << 8U) + | static_cast(serializedDatagramHeader[15U]); + + return datagramHeader; +} + +} // namespace dds +} // namespace iox diff --git a/iceoryx_dds/test/mocks/google_mocks.hpp b/iceoryx_dds/test/mocks/google_mocks.hpp index 7f01eb2893..bbafddb095 100644 --- a/iceoryx_dds/test/mocks/google_mocks.hpp +++ b/iceoryx_dds/test/mocks/google_mocks.hpp @@ -1,4 +1,5 @@ -// Copyright (c) 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved. +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,6 +20,7 @@ #include "iceoryx_dds/dds/data_reader.hpp" #include "iceoryx_dds/dds/data_writer.hpp" +#include "iceoryx_dds/dds/dds_types.hpp" #include "iceoryx_posh/gateway/channel.hpp" #include "iceoryx_posh/gateway/gateway_generic.hpp" #include "iceoryx_posh/iceoryx_posh_types.hpp" @@ -68,12 +70,11 @@ class MockDataReader public: MockDataReader(const iox::capro::ServiceDescription&){}; MOCK_METHOD0(connect, void(void)); - MOCK_METHOD0(peekNextSize, iox::cxx::optional(void)); - MOCK_METHOD2(takeNext, iox::cxx::expected(uint8_t* const, const uint64_t&)); - MOCK_METHOD3(take, - iox::cxx::expected(uint8_t* const buffer, - const uint64_t&, - const iox::cxx::optional&)); + MOCK_METHOD0(peekNextIoxChunkDatagramHeader, iox::cxx::optional(void)); + MOCK_METHOD3(takeNext, + iox::cxx::expected(const iox::dds::IoxChunkDatagramHeader, + uint8_t* const, + uint8_t* const)); MOCK_CONST_METHOD0(getServiceId, std::string(void)); MOCK_CONST_METHOD0(getInstanceId, std::string(void)); MOCK_CONST_METHOD0(getEventId, std::string(void)); @@ -84,7 +85,7 @@ class MockDataWriter public: MockDataWriter(const iox::capro::ServiceDescription&){}; MOCK_METHOD0(connect, void(void)); - MOCK_METHOD2(write, bool(uint8_t*, uint64_t)); + MOCK_METHOD3(write, bool(iox::dds::IoxChunkDatagramHeader, const uint8_t* const, const uint8_t* const)); MOCK_CONST_METHOD0(getServiceId, std::string(void)); MOCK_CONST_METHOD0(getInstanceId, std::string(void)); MOCK_CONST_METHOD0(getEventId, std::string(void)); diff --git a/iceoryx_dds/test/moduletests/test_cyclone_data_reader.cpp b/iceoryx_dds/test/moduletests/test_cyclone_data_reader.cpp index 1d2a99ecb3..9b346a7ee2 100644 --- a/iceoryx_dds/test/moduletests/test_cyclone_data_reader.cpp +++ b/iceoryx_dds/test/moduletests/test_cyclone_data_reader.cpp @@ -1,4 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ // SPDX-License-Identifier: Apache-2.0 #include "iceoryx_dds/dds/cyclone_data_reader.hpp" +#include "iceoryx_posh/testing/mocks/chunk_mock.hpp" #include "test.hpp" #include @@ -30,6 +32,17 @@ namespace dds // ======================================== Helpers ======================================== // using TestDataReader = CycloneDataReader; +struct DummyPayload +{ + uint64_t a; + uint64_t b; + uint64_t c; +}; +struct DummyUserHeader +{ + uint64_t a; +}; + // ======================================== Fixture ======================================== // class CycloneDataReaderTest : public Test { @@ -42,40 +55,47 @@ class CycloneDataReaderTest : public Test TEST_F(CycloneDataReaderTest, DoesNotAttemptToReadWhenDisconnected) { // ===== Setup - uint64_t bufferSize = 1024; - uint8_t buffer[bufferSize]; + ChunkMock chunkMock; + iox::dds::IoxChunkDatagramHeader datagramHeader; + datagramHeader.endianness = getEndianess(); + datagramHeader.userPayloadSize = chunkMock.chunkHeader()->userPayloadSize(); + datagramHeader.userPayloadAlignment = chunkMock.chunkHeader()->userPayloadAlignment(); // ===== Test TestDataReader reader{"", "", ""}; - auto takeResult = reader.take(buffer, bufferSize, iox::cxx::nullopt); - EXPECT_EQ(true, takeResult.has_error()); - EXPECT_EQ(iox::dds::DataReaderError::NOT_CONNECTED, takeResult.get_error()); - - auto takeNextResult = reader.takeNext(buffer, bufferSize); - EXPECT_EQ(true, takeNextResult.has_error()); - EXPECT_EQ(iox::dds::DataReaderError::NOT_CONNECTED, takeResult.get_error()); + auto takeNextResult = reader.takeNext(datagramHeader, + static_cast(chunkMock.chunkHeader()->userHeader()), + static_cast(chunkMock.chunkHeader()->userPayload())); + ASSERT_EQ(true, takeNextResult.has_error()); + EXPECT_EQ(iox::dds::DataReaderError::NOT_CONNECTED, takeNextResult.get_error()); } TEST_F(CycloneDataReaderTest, ReturnsErrorWhenAttemptingToReadIntoANullBuffer) { // ===== Setup - uint64_t bufferSize = 0; - uint8_t* buffer = nullptr; + ChunkMock chunkMock; + iox::dds::IoxChunkDatagramHeader datagramHeader; + datagramHeader.endianness = getEndianess(); + datagramHeader.userHeaderId = iox::mepoo::ChunkHeader::UNKNOWN_USER_HEADER; + datagramHeader.userHeaderSize = chunkMock.chunkHeader()->userHeaderSize(); + datagramHeader.userPayloadSize = chunkMock.chunkHeader()->userPayloadSize(); + datagramHeader.userPayloadAlignment = chunkMock.chunkHeader()->userPayloadAlignment(); // ===== Test TestDataReader reader{"", "", ""}; reader.connect(); - auto takeResult = reader.take(buffer, bufferSize, iox::cxx::nullopt); - EXPECT_EQ(true, takeResult.has_error()); - EXPECT_EQ(iox::dds::DataReaderError::INVALID_RECV_BUFFER, takeResult.get_error()); + auto takeNextResult1 = + reader.takeNext(datagramHeader, nullptr, static_cast(chunkMock.chunkHeader()->userPayload())); + ASSERT_EQ(true, takeNextResult1.has_error()); + EXPECT_EQ(iox::dds::DataReaderError::INVALID_BUFFER_PARAMETER_FOR_USER_HEADER, takeNextResult1.get_error()); - auto takeNextResult = reader.takeNext(buffer, bufferSize); - EXPECT_EQ(true, takeNextResult.has_error()); - EXPECT_EQ(iox::dds::DataReaderError::INVALID_RECV_BUFFER, takeNextResult.get_error()); + auto takeNextResult2 = + reader.takeNext(datagramHeader, static_cast(chunkMock.chunkHeader()->userHeader()), nullptr); + ASSERT_EQ(true, takeNextResult2.has_error()); + EXPECT_EQ(iox::dds::DataReaderError::INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD, takeNextResult2.get_error()); } - } // namespace dds } // namespace iox diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.inl b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.inl index ce80b2cec9..24d2125a4f 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.inl +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.inl @@ -52,7 +52,19 @@ inline cxx::optional ChunkQueuePopper::t // check if queue had an element that was poped and return if so if (retVal.has_value()) { - return cxx::make_optional(retVal.value().releaseToSharedChunk()); + auto chunk = retVal.value().releaseToSharedChunk(); + + auto receivedChunkHeaderVersion = chunk.getChunkHeader()->chunkHeaderVersion(); + if (receivedChunkHeaderVersion != mepoo::ChunkHeader::CHUNK_HEADER_VERSION) + { + LogError() << "Received chunk with CHUNK_HEADER_VERSION '" << receivedChunkHeaderVersion + << "' but expected '" << mepoo::ChunkHeader::CHUNK_HEADER_VERSION << "'! Dropping chunk!"; + errorHandler(Error::kPOPO__CHUNK_QUEUE_POPPER_CHUNK_WITH_INCOMPATIBLE_CHUNK_HEADER_VERSION, + nullptr, + ErrorLevel::SEVERE); + return cxx::nullopt_t(); + } + return cxx::make_optional(chunk); } else { diff --git a/iceoryx_posh/include/iceoryx_posh/mepoo/chunk_header.hpp b/iceoryx_posh/include/iceoryx_posh/mepoo/chunk_header.hpp index fa83cd90ec..7d187ddae8 100644 --- a/iceoryx_posh/include/iceoryx_posh/mepoo/chunk_header.hpp +++ b/iceoryx_posh/include/iceoryx_posh/mepoo/chunk_header.hpp @@ -61,10 +61,19 @@ struct ChunkHeader /// - semantic meaning of a member changes static constexpr uint8_t CHUNK_HEADER_VERSION{1U}; + /// @brief User-Header id for no user-header + static constexpr uint16_t NO_USER_HEADER{0x0000}; + /// @brief User-Header id for an unknown user-header + static constexpr uint16_t UNKNOWN_USER_HEADER{0xFFFF}; + /// @brief The ChunkHeader version is used to detect incompatibilities for record&replay functionality /// @return the ChunkHeader version uint8_t chunkHeaderVersion() const noexcept; + /// @brief The id of the user-header used by the chunk; if no user-header is used, this is set to NO_USER_HEADER + /// @return the user-header id of the chunk + uint16_t userHeaderId() const noexcept; + /// @brief Get the pointer to the user-header /// @return the pointer to the user-header void* userHeader() noexcept; @@ -139,7 +148,9 @@ struct ChunkHeader uint32_t m_chunkSize{0U}; uint8_t m_chunkHeaderVersion{CHUNK_HEADER_VERSION}; // reserved for future functionality and used to indicate the padding bytes; currently not used and set to `0` - uint8_t m_reserved[3]{}; + uint8_t m_reserved{0}; + // currently just a placeholder + uint16_t m_userHeaderId{NO_USER_HEADER}; UniquePortId m_originId{popo::InvalidId}; uint64_t m_sequenceNumber{0U}; uint32_t m_userHeaderSize{0U}; diff --git a/iceoryx_posh/source/mepoo/chunk_header.cpp b/iceoryx_posh/source/mepoo/chunk_header.cpp index eb770fb74c..88d1e8da38 100644 --- a/iceoryx_posh/source/mepoo/chunk_header.cpp +++ b/iceoryx_posh/source/mepoo/chunk_header.cpp @@ -75,6 +75,9 @@ ChunkHeader::ChunkHeader(const uint32_t chunkSize, const ChunkSettings& chunkSet } else { + // currently there is no way to set the user-header id; this is just a preparation for future functionality + m_userHeaderId = UNKNOWN_USER_HEADER; + // the most complex case with a user-header auto addressOfChunkHeader = reinterpret_cast(this); uint64_t headerEndAddress = addressOfChunkHeader + sizeof(ChunkHeader) + userHeaderSize; @@ -108,8 +111,17 @@ uint8_t ChunkHeader::chunkHeaderVersion() const noexcept return m_chunkHeaderVersion; } +uint16_t ChunkHeader::userHeaderId() const noexcept +{ + return m_userHeaderId; +} + void* ChunkHeader::userHeader() noexcept { + if (m_userHeaderId == NO_USER_HEADER) + { + return nullptr; + } // the UserHeader is always located relative to "this" in this way return reinterpret_cast(reinterpret_cast(this) + sizeof(ChunkHeader)); } diff --git a/iceoryx_posh/source/roudi/roudi.cpp b/iceoryx_posh/source/roudi/roudi.cpp index b3b69011bd..f7c1a130a3 100644 --- a/iceoryx_posh/source/roudi/roudi.cpp +++ b/iceoryx_posh/source/roudi/roudi.cpp @@ -101,8 +101,16 @@ void RouDi::shutdown() m_prcMgr->requestShutdownOfAllProcesses(); + using namespace units::duration_literals; + auto remainingDurationForWarnPrint = m_processKillDelay - 2_s; while (m_prcMgr->isAnyRegisteredProcessStillRunning() && !finalKillTimer.hasExpired()) { + if (remainingDurationForWarnPrint > finalKillTimer.remainingTime()) + { + LogWarn() << "Some applications seem to not shutdown gracefully! Time until hard shutdown: " + << finalKillTimer.remainingTime().toSeconds() << "s!"; + remainingDurationForWarnPrint = remainingDurationForWarnPrint - 5_s; + } // give processes some time to terminate std::this_thread::sleep_for(std::chrono::milliseconds(PROCESS_TERMINATED_CHECK_INTERVAL.toMilliseconds())); } diff --git a/iceoryx_posh/test/moduletests/test_mepoo_chunk_header.cpp b/iceoryx_posh/test/moduletests/test_mepoo_chunk_header.cpp index 067fcda1ee..ed7259df3d 100644 --- a/iceoryx_posh/test/moduletests/test_mepoo_chunk_header.cpp +++ b/iceoryx_posh/test/moduletests/test_mepoo_chunk_header.cpp @@ -49,6 +49,7 @@ TEST(ChunkHeader_test, ChunkHeaderHasInitializedMembers) EXPECT_THAT(sut.sequenceNumber(), Eq(0U)); + EXPECT_THAT(sut.userHeaderId(), Eq(ChunkHeader::NO_USER_HEADER)); EXPECT_THAT(sut.userHeaderSize(), Eq(0U)); EXPECT_THAT(sut.userPayloadSize(), Eq(USER_PAYLOAD_SIZE)); EXPECT_THAT(sut.userPayloadAlignment(), Eq(USER_PAYLOAD_ALIGNMENT)); @@ -291,14 +292,17 @@ void createChunksOnMultipleAddresses(const PayloadParams& userPayloadParams, } } -void checkUserHeaderSizeAndPayloadSizeAndAlignmentIsSet(const ChunkHeader& sut, - const PayloadParams& userPayloadParams, - const uint32_t userHeaderSize) +void checkUserHeaderIdAndSizeAndPayloadSizeAndAlignmentIsSet(const ChunkHeader& sut, + const PayloadParams& userPayloadParams, + const uint16_t userHeaderId, + const uint32_t userHeaderSize) { + SCOPED_TRACE(std::string("Check user-header id and size and user-payload alignment ist correctly set")); EXPECT_EQ(sut.userPayloadSize(), userPayloadParams.size); // a user-payload alignment of zero will internally be set to one auto adjustedAlignment = userPayloadParams.alignment == 0U ? 1U : userPayloadParams.alignment; EXPECT_EQ(sut.userPayloadAlignment(), adjustedAlignment); + EXPECT_EQ(sut.userHeaderId(), userHeaderId); EXPECT_EQ(sut.userHeaderSize(), userHeaderSize); } @@ -411,7 +415,8 @@ TEST_P(ChunkHeader_AlteringUserPayloadWithoutUserHeader, CheckIntegrityOfChunkHe constexpr uint32_t USER_HEADER_ALIGNMENT{iox::CHUNK_NO_USER_HEADER_ALIGNMENT}; createChunksOnMultipleAddresses(userPayloadParams, USER_HEADER_SIZE, USER_HEADER_ALIGNMENT, [&](ChunkHeader& sut) { - checkUserHeaderSizeAndPayloadSizeAndAlignmentIsSet(sut, userPayloadParams, USER_HEADER_SIZE); + checkUserHeaderIdAndSizeAndPayloadSizeAndAlignmentIsSet( + sut, userPayloadParams, ChunkHeader::NO_USER_HEADER, USER_HEADER_SIZE); checkUserPayloadNotOverlappingWithChunkHeader(sut); checkUserPayloadSize(sut, userPayloadParams); checkUserPayloadAlignment(sut, userPayloadParams); @@ -483,7 +488,8 @@ TEST_P(ChunkHeader_AlteringUserPayloadWithUserHeader, CheckIntegrityOfChunkHeade createChunksOnMultipleAddresses( userPayloadParams, userHeaderSize, userHeaderAlignment, [&](ChunkHeader& sut) { - checkUserHeaderSizeAndPayloadSizeAndAlignmentIsSet(sut, userPayloadParams, userHeaderSize); + checkUserHeaderIdAndSizeAndPayloadSizeAndAlignmentIsSet( + sut, userPayloadParams, ChunkHeader::UNKNOWN_USER_HEADER, userHeaderSize); checkUserHeaderIsAdjacentToChunkHeader(sut); checkUserPayloadNotOverlappingWithUserHeader(sut, userHeaderSize); checkUserPayloadSize(sut, userPayloadParams); diff --git a/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp index 2503fbb407..7e65bbcf84 100644 --- a/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp @@ -157,6 +157,28 @@ TYPED_TEST(ChunkQueue_test, PushedChunksMustBePoppedInTheSameOrder) } } +TYPED_TEST(ChunkQueue_test, PopChunkWithIncompatibleChunkHeaderCallsErrorHandler) +{ + auto chunk = this->allocateChunk(); + // this is currently the only possibility to test an invalid CHUNK_HEADER_VERSION + auto chunkHeaderAddress = reinterpret_cast(chunk.getChunkHeader()); + auto chunkHeaderVersionAddress = chunkHeaderAddress + sizeof(uint32_t); + auto chunkHeaderVersionPointer = reinterpret_cast(chunkHeaderVersionAddress); + *chunkHeaderVersionPointer = std::numeric_limits::max(); + + this->m_pusher.push(chunk); + + iox::Error receivedError{iox::Error::kNO_ERROR}; + auto errorHandlerGuard = iox::ErrorHandler::SetTemporaryErrorHandler( + [&](const iox::Error error, const std::function, const iox::ErrorLevel errorLevel) { + receivedError = error; + EXPECT_EQ(errorLevel, iox::ErrorLevel::SEVERE); + }); + + EXPECT_FALSE(this->m_popper.tryPop().has_value()); + EXPECT_EQ(receivedError, iox::Error::kPOPO__CHUNK_QUEUE_POPPER_CHUNK_WITH_INCOMPATIBLE_CHUNK_HEADER_VERSION); +} + TYPED_TEST(ChunkQueue_test, ClearOnEmpty) { this->m_popper.clear(); @@ -347,14 +369,14 @@ TYPED_TEST(ChunkQueueSoFi_test, InitialNoLostChunks) TYPED_TEST(ChunkQueueSoFi_test, IndicateALostChunk) { - this->m_pusher.lostAChunk(); + this->m_pusher.lostAChunk(); EXPECT_TRUE(this->m_popper.hasLostChunks()); } TYPED_TEST(ChunkQueueSoFi_test, LostChunkInfoIsResetAfterRead) { - this->m_pusher.lostAChunk(); + this->m_pusher.lostAChunk(); this->m_popper.hasLostChunks(); EXPECT_FALSE(this->m_popper.hasLostChunks()); diff --git a/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp b/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp index d157cb6090..749afda02d 100644 --- a/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp +++ b/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp @@ -74,6 +74,7 @@ namespace iox error(POPO__APPLICATION_PORT_QUEUE_OVERFLOW) \ error(POPO__BASE_SUBSCRIBER_OVERRIDING_WITH_EVENT_SINCE_HAS_DATA_OR_DATA_RECEIVED_ALREADY_ATTACHED) \ error(POPO__BASE_SUBSCRIBER_OVERRIDING_WITH_STATE_SINCE_HAS_DATA_OR_DATA_RECEIVED_ALREADY_ATTACHED) \ + error(POPO__CHUNK_QUEUE_POPPER_CHUNK_WITH_INCOMPATIBLE_CHUNK_HEADER_VERSION) \ error(POPO__CHUNK_DISTRIBUTOR_OVERFLOW_OF_QUEUE_CONTAINER) \ error(POPO__CHUNK_DISTRIBUTOR_CLEANUP_DEADLOCK_BECAUSE_BAD_APPLICATION_TERMINATION) \ error(POPO__CHUNK_SENDER_INVALID_CHUNK_TO_FREE_FROM_USER) \