Skip to content

Commit

Permalink
Merge pull request #741 from ApexAI/iox-#14-make-gateway-aware-of-lar…
Browse files Browse the repository at this point in the history
…ge-payload-alignments

Iox #14 make gateway aware of large payload alignments
  • Loading branch information
elBoberido authored Apr 15, 2021
2 parents d6eaa11 + 3b7b947 commit 2ff7cbf
Show file tree
Hide file tree
Showing 21 changed files with 538 additions and 232 deletions.
71 changes: 5 additions & 66 deletions doc/design/chunk_header.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class ChunkHeader
{
uint32_t chunkSize;
uint8_t chunkHeaderVersion;
uint8_t reserved[3];
uint8_t reserved{0};
uint16_t userHeaderId;
uint64_t originId;
uint64_t sequenceNumber;
uint32_t userHeaderSize{0U};
Expand All @@ -57,7 +58,8 @@ class ChunkHeader

- **chunkSize** is the size of the whole chunk
- **chunkHeaderVersion** is used to detect incompatibilities for record&replay functionality
- **reserved[3]** is currently not used and set to `0`
- **reserved** is currently not used and set to `0`
- **userHeaderId** is currently not used and set to `NO_USER_HEADER`
- **originId** is the unique identifier of the publisher the chunk was sent from
- **sequenceNumber** is a serial number for the sent chunks
- **userPayloadSize** is the size of the chunk occupied by the user-header
Expand Down Expand Up @@ -247,33 +249,7 @@ Furthermore, the `Publisher` and `Subscriber` have access to the `ChunkHeader` a

## Open Issues

- the design was done with the intention to have a user-header of arbitrary size, if the size is limited to e.g. 32 bytes, some things could be simplified
- publisher/subscriber API proposal
```
// publisher
auto pub = iox::popo::Publisher<MyPayload, MyUserHeader>(serviceDescription);
pub.loan()
.and_then([&](auto& sample) {
sample.getHeader()->userHeader<MyUserHeader>()->data = 42;
sample->a = 42;
sample->b = 13;
sample.publish();
})
.or_else([](iox::popo::AllocationError) {
// Do something with error.
});
// subscriber
auto sub = iox::popo::Subscriber<MyPayload, MyUserHeader>(serviceDescription);
sub->take()
.and_then([](auto& sample){
std::cout << "User-Header data: " << sample.getHeader()->userHeader<MyUserHeader>()->data << std::endl;
std::cout << "User-Payload data: " << static_cast<const MyPayload*>(sample->get())->data << std::endl;
});
```
- the publisher/subscriber would have a default parameter for the user-header to be source compatible with our current API
- the drawback is that the user could use the wrong user-header. Maybe `Sample` also needs an additional template parameter
- additionally, a `ChunkHeaderHook` could be used on the publisher side
- a `ChunkHeaderHook` could be used on the publisher side
```
template <typename UserHeader>
class MyChunkHeaderHook : public ChunkHeaderHook
Expand All @@ -290,43 +266,6 @@ auto pub = iox::popo::Publisher<MyPayload>(serviceDescription, userHeaderHook);
```
- alternatively, instead of the ChunkHeaderHook class, the publisher could have a method `registerDeliveryHook(std::function<void(ChunkHeader&)>)`
- allow the user only read access to the `ChunkHeader` and write access to the `UserHeader`
- untyped publisher/subscriber API proposal
```
// publisher option 1
auto pub = iox::popo::UntypedPublisher<MyUserHeader>(serviceDescription);
// publisher option 2
auto userHeaderSize = sizeOf(MyUserHeader);
auto pub = iox::popo::UntypedPublisher(serviceDescription, userHeaderSize);
auto payloadSize = sizeof(MyPayload);
auto payloadAlignment = alignof(MyPayload);
pub.loan(payloadSize, payloadAlignment)
.and_then([&](auto& sample) {
sample.getHeader()->userHeader<MyUserHeader>()->data = 42;
auto payload = new (sample.get()) MyPayload();
payload->data = 73;
sample.publish();
})
.or_else([](iox::popo::AllocationError) {
// Do something with error.
});
// subscriber option 1
auto pub = iox::popo::UntypedPublisher<MyUserHeader>(serviceDescription);
// subscriber option 2
auto userHeaderSize = sizeOf(MyUserHeader);
auto pub = iox::popo::UntypedSubscriber(serviceDescription, userHeaderSize);
sub->take()
.and_then([](auto& sample){
std::cout << "User-Header data: " << sample.getHeader()->userHeader<MyUserHeader>()->data << std::endl;
std::cout << "User-Payload data: " << static_cast<const MyPayload*>(sample->get())->data << std::endl;
});
```
- option 1 has the benefit to catch a wrong alignment of the user-header and would be necessary if we make the `Sample` aware of the user-header
- C bindings
- PoC is needed
- user defined sequence number
- this can probably be done by a `ChunkHeaderHook`
- alternatively, a flag could be introduce into the `ChunkHeader`
Expand Down
1 change: 1 addition & 0 deletions iceoryx_dds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ if(USE_CYCLONE_DDS)
source/iceoryx_dds/dds/cyclone_context.cpp
source/iceoryx_dds/dds/cyclone_data_reader.cpp
source/iceoryx_dds/dds/cyclone_data_writer.cpp
source/iceoryx_dds/dds/iox_chunk_datagram_header.cpp
)

# Generate IDL at configure time
Expand Down
12 changes: 6 additions & 6 deletions iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_reader.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,12 +47,11 @@ class CycloneDataReader : public DataReader

void connect() noexcept override;

iox::cxx::optional<uint32_t> peekNextSize() override;
bool hasSamples() override;
iox::cxx::expected<DataReaderError> takeNext(uint8_t* const buffer, const uint64_t& bufferSize) override;

iox::cxx::expected<uint64_t, DataReaderError>
take(uint8_t* const buffer, const uint64_t& bufferSize, const iox::cxx::optional<uint64_t>& maxSamples) override;
iox::cxx::optional<IoxChunkDatagramHeader> peekNextIoxChunkDatagramHeader() noexcept override;
bool hasSamples() noexcept override;
iox::cxx::expected<DataReaderError> takeNext(const IoxChunkDatagramHeader datagramHeader,
uint8_t* const userHeaderBuffer,
uint8_t* const userPayloadBuffer) noexcept override;

capro::IdString_t getServiceId() const noexcept override;
capro::IdString_t getInstanceId() const noexcept override;
Expand Down
5 changes: 4 additions & 1 deletion iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_writer.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,7 +45,9 @@ class CycloneDataWriter : public iox::dds::DataWriter
CycloneDataWriter& operator=(CycloneDataWriter&&) = default;

void connect() noexcept override;
void write(const uint8_t* const bytes, const uint64_t size) noexcept override;
void write(iox::dds::IoxChunkDatagramHeader datagramHeader,
const uint8_t* const userHeaderBytes,
const uint8_t* const userPayloadBytes) noexcept override;
capro::IdString_t getServiceId() const noexcept override;
capro::IdString_t getInstanceId() const noexcept override;
capro::IdString_t getEventId() const noexcept override;
Expand Down
48 changes: 23 additions & 25 deletions iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef IOX_DDS_DDS_DATA_READER_HPP
#define IOX_DDS_DDS_DATA_READER_HPP

#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_utils/cxx/expected.hpp"
#include "iceoryx_utils/cxx/optional.hpp"
Expand All @@ -30,13 +31,20 @@ enum class DataReaderError : uint8_t
{
INVALID_STATE,
NOT_CONNECTED,
INVALID_RECV_BUFFER,
INVALID_DATAGRAM_HEADER_SIZE,
INVALID_BUFFER_PARAMETER_FOR_USER_HEADER,
INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD,
INVALID_DATA,
RECV_BUFFER_TOO_SMALL
BUFFER_SIZE_MISMATCH
};

constexpr char DataReaderErrorString[][64] = {
"NOT_CONNECTED", "INVALID_RECV_BUFFER", "INVALID_DATA", "RECV_BUFFER_TOO_SMALL"};
constexpr const char* DataReaderErrorString[] = {"INVALID_STATE",
"NOT_CONNECTED",
"INVALID_DATAGRAM_HEADER_SIZE",
"INVALID_BUFFER_PARAMETER_FOR_USER_HEADER",
"INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD",
"INVALID_DATA",
"BUFFER_SIZE_MISMATCH"};

class DataReader
{
Expand All @@ -47,37 +55,27 @@ class DataReader
virtual void connect() noexcept = 0;

///
/// @brief peekNextSize Get the size of the next sample if one is available.
/// @return The size of the next sample if one is available.
/// @brief peekNextIoxChunkDatagramHeader Get the IoxChunkDatagramHeader of the next sample if one is available.
/// @return The IoxChunkDatagramHeader of the next sample if one is available.
///
virtual iox::cxx::optional<uint32_t> peekNextSize() = 0;
virtual iox::cxx::optional<IoxChunkDatagramHeader> peekNextIoxChunkDatagramHeader() noexcept = 0;

///
/// @brief hasSamples Checks if new samples ready to take.
/// @return True if new samples available.
///
virtual bool hasSamples() = 0;
virtual bool hasSamples() noexcept = 0;

///
/// @brief take Take the next available sample from the DDS data space.
/// @param buffer Receive buffer in which sample will be stored.
/// @param bufferSize Size of the provided buffer.
/// @param datagramHeader with size information
/// @param userHeaderBuffer buffer for the user-header
/// @param userPayloadBuffer buffer for the user-payload
/// @return Error if unsuccessful.
///
virtual iox::cxx::expected<DataReaderError> takeNext(uint8_t* const buffer, const uint64_t& bufferSize) = 0;


///
/// @brief take Take up to a maximum number of samples from the DDS data space.
/// @param buffer Receive buffer in which samples will be stored.
/// @param bufferSize The size of the buffer (in bytes).
/// @param maxSamples The maximum number of samples to request from the network.
/// @return Number of samples taken if successful. Number of samples will be in the sange [0,maxSamples].
///
/// @note Sample size must be known ahead of time & can be checked using @ref peekNextSize() .
///
virtual iox::cxx::expected<uint64_t, DataReaderError>
take(uint8_t* const buffer, const uint64_t& bufferSize, const iox::cxx::optional<uint64_t>& maxSamples) = 0;
virtual iox::cxx::expected<DataReaderError> takeNext(const IoxChunkDatagramHeader datagramHeader,
uint8_t* const userHeaderBuffer,
uint8_t* const userPayloadBuffer) noexcept = 0;

///
/// @brief getServiceId
Expand All @@ -98,7 +96,7 @@ class DataReader
virtual capro::IdString_t getEventId() const noexcept = 0;

protected:
DataReader() = default;
DataReader() noexcept = default;
};
} // namespace dds
} // namespace iox
Expand Down
14 changes: 10 additions & 4 deletions iceoryx_dds/include/iceoryx_dds/dds/data_writer.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,6 +18,8 @@
#ifndef IOX_DDS_DDS_DATA_WRITER_HPP
#define IOX_DDS_DDS_DATA_WRITER_HPP

#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp"

#include "iceoryx_posh/iceoryx_posh_types.hpp"

#include <cstdint>
Expand All @@ -40,11 +43,14 @@ class DataWriter
virtual void connect() noexcept = 0;

///
/// @brief write Write the provided bytes on the DDS network on the topic: serviceId/instanceId/eventId
/// @param bytes
/// @param size
/// @brief write Write the provided header and bytes on the DDS network on the topic: serviceId/instanceId/eventId
/// @param datagramHeader with size information
/// @param userHeaderBytes buffer with the user-header
/// @param userPayloadBytes buffer with the user-payload
///
virtual void write(const uint8_t* const bytes, const uint64_t size) noexcept = 0;
virtual void write(iox::dds::IoxChunkDatagramHeader datagramHeader,
const uint8_t* const userHeaderBytes,
const uint8_t* const userPayloadBytes) noexcept = 0;

///
/// @brief getServiceId
Expand Down
77 changes: 77 additions & 0 deletions iceoryx_dds/include/iceoryx_dds/dds/iox_chunk_datagram_header.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

#ifndef IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP
#define IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP

#include "iceoryx_utils/cxx/vector.hpp"

#include <cstdint>

namespace iox
{
namespace dds
{
/// @brief the endianess of the serialized data
enum class Endianess : uint8_t
{
UNDEFINED,
LITTLE,
BIG,
MIXED,
};

constexpr const char* EndianessString[] = {"UNDEFINED", "LITTLE", "BIG", "MIXED"};

/// @brief Detects the endianness of the system
Endianess getEndianess();

/// @brief The datagram header with chunk metadata for user-header and user-payload
struct IoxChunkDatagramHeader
{
using Serialized_t = iox::cxx::vector<uint8_t, 16U>;

/// @brief Serializes a IoxChunkDatagramHeader into a vector of uint8_t
/// @param[in] datagramHeader to serialize
/// @return the serialized IoxChunkDatagramHeader
static Serialized_t serialize(const IoxChunkDatagramHeader& datagramHeader);

/// @brief Deserializes a vector of uint8_t into a IoxChunkDatagramHeader
/// @param[in] serializedDatagram is the serialized IoxChunkDatagramHeader
/// @return the deserialized IoxChunkDatagramHeader
static IoxChunkDatagramHeader deserialize(const Serialized_t& serializedDatagramHeader);

/// @brief From the 1.0 release onward, this must be incremented for each incompatible change, e.g.
/// - data width of members changes
/// - members are rearranged
/// - semantic meaning of a member changes
static constexpr uint8_t DATAGRAM_VERSION{1U};

/// @note This must always be the first member and always 1 bytes in order to prevent issues with endianess when
/// deserialized or incorrectly detected versions due to different size
uint8_t datagramVersion{DATAGRAM_VERSION};
/// @note This must always be 1 byte in order to prevent issues with endianess when deserialized
Endianess endianness{Endianess::UNDEFINED};
uint16_t userHeaderId{0xFFFF};
uint32_t userHeaderSize{0U};
uint32_t userPayloadSize{0U};
uint32_t userPayloadAlignment{0U};
};

} // namespace dds
} // namespace iox

#endif // IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP
38 changes: 28 additions & 10 deletions iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,34 @@ inline void DDS2IceoryxGateway<channel_t, gateway_t>::forward(const channel_t& c

while (reader->hasSamples())
{
reader->peekNextSize().and_then([&](auto size) {
publisher->loan(size).and_then([&](auto chunk) {
reader->takeNext(static_cast<uint8_t*>(chunk), size)
.and_then([&]() { publisher->publish(chunk); })
.or_else([&](DataReaderError err) {
publisher->release(chunk);
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: "
<< dds::DataReaderErrorString[static_cast<uint8_t>(err)];
});
});
reader->peekNextIoxChunkDatagramHeader().and_then([&](auto datagramHeader) {
// this is safe, it is just used to check if the alignment doesn't exceed the
// alignment of the ChunkHeader but since this is data from a previously valid
// chunk, we can assume that the alignment was correct and use this value
constexpr uint32_t USER_HEADER_ALIGNMENT{1U};
publisher
->loan(datagramHeader.userPayloadSize,
datagramHeader.userPayloadAlignment,
datagramHeader.userHeaderSize,
USER_HEADER_ALIGNMENT)
.and_then([&](auto userPayload) {
auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload);
reader
->takeNext(datagramHeader,
static_cast<uint8_t*>(chunkHeader->userHeader()),
static_cast<uint8_t*>(chunkHeader->userPayload()))
.and_then([&]() { publisher->publish(userPayload); })
.or_else([&](DataReaderError err) {
publisher->release(userPayload);
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: "
<< dds::DataReaderErrorString[static_cast<uint8_t>(err)];
});
})
.or_else([](auto& error) {
LogError() << "[DDS2IceoryxGateway] Could not loan chunk! Error code: "
<< static_cast<uint64_t>(error);
});
;
});
}
}
Expand Down
Loading

0 comments on commit 2ff7cbf

Please sign in to comment.