Skip to content

Commit

Permalink
added nunavut to the common lib
Browse files Browse the repository at this point in the history
  • Loading branch information
serges147 committed Dec 30, 2024
1 parent 7ce6fae commit 2585693
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 159 deletions.
43 changes: 29 additions & 14 deletions src/common/dsdl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,60 @@
#ifndef OCVSMD_COMMON_DSDL_HELPERS_HPP_INCLUDED
#define OCVSMD_COMMON_DSDL_HELPERS_HPP_INCLUDED

#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pf20/cetlpf.hpp>

#include <array>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>

namespace ocvsmd
{
namespace common
{

template <typename Message, typename Result, std::size_t BufferSize, bool IsOnStack, typename Action>
static auto tryPerformOnSerialized(const Message& message, //
const cetl::pmr::memory_resource& memory,
Action&& action) -> std::enable_if_t<IsOnStack, Result>
template <typename Message>
static auto tryDeserializePayload(const cetl::span<const std::uint8_t> payload, Message& out_message)
{
// Not in use b/c we use stack buffer for small messages.
(void) memory;
return deserialize(out_message, {payload.data(), payload.size()});
}

template <typename Message, typename Result, std::size_t BufferSize, bool IsOnStack, typename Action>
static auto tryPerformOnSerialized(const Message& message, Action&& action) -> std::enable_if_t<IsOnStack, Result>
{
// Try to serialize the message to raw payload buffer.
//
// Next nolint b/c we use a buffer to serialize the message, so no need to zero it (and performance better).
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init,hicpp-member-init)
std::array<cetl::byte, BufferSize> buffer;
std::array<std::uint8_t, BufferSize> buffer;
//
const auto result_size = serialize( //
message,
// Next nolint & NOSONAR are currently unavoidable.
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
{reinterpret_cast<std::uint8_t*>(buffer.data()), BufferSize}); // NOSONAR cpp:S3630,
const auto result_size = serialize(message, {buffer.data(), buffer.size()});
if (!result_size)
{
return result_size.error();
}

const cetl::span<const std::uint8_t> bytes{buffer.data(), result_size.value()};
return std::forward<Action>(action)(bytes);
}

template <typename Message, typename Result, std::size_t BufferSize, bool IsOnStack, typename Action>
static auto tryPerformOnSerialized(const Message& message, Action&& action) -> std::enable_if_t<!IsOnStack, Result>
{
// Try to serialize the message to raw payload buffer.
//
using ArrayOfBytes = std::array<std::uint8_t, BufferSize>;
const std::unique_ptr<ArrayOfBytes> buffer{new ArrayOfBytes};
//
const auto result_size = serialize(message, {buffer->data(), buffer->size()});
if (!result_size)
{
return result_size.error();
}

const cetl::span<const cetl::byte> bytes{buffer.data(), result_size.value()};
const cetl::span<const std::uint8_t> bytes{buffer->data(), result_size.value()};
return std::forward<Action>(action)(bytes);
}

Expand Down
188 changes: 188 additions & 0 deletions src/common/ipc/unix_socket_base.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
//
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
//

#ifndef OCVSMD_COMMON_IPC_UNIX_SOCKET_BASE_HPP_INCLUDED
#define OCVSMD_COMMON_IPC_UNIX_SOCKET_BASE_HPP_INCLUDED

#include "dsdl_helpers.hpp"
#include "platform/posix_utils.hpp"

#include <nunavut/support/serialization.hpp>

#include <cstddef>
#include <sys/syslog.h>
#include <unistd.h>

namespace ocvsmd
{
namespace common
{
namespace ipc
{

class UnixSocketBase
{
public:
// Either POSIX errno or a Nunavut one.
using Failure = cetl::variant<int, nunavut::support::Error>;

protected:
template <typename Message>
static Failure writeMessage(const int output_fd, const Message& message)
{
constexpr std::size_t BufferSize = Message::_traits_::SerializationBufferSizeBytes;
constexpr bool IsOnStack = BufferSize <= MsgSmallPayloadSize;

return tryPerformOnSerialized<Message, Failure, BufferSize, IsOnStack>( //
message,
[output_fd](const auto msg_bytes_span) {
//
// 1. Write the message header.
if (const int err = platform::posixSyscallError([output_fd, msg_bytes_span] {
//
const MsgHeader msg_header{.signature = MsgSignature,
.size = static_cast<std::uint32_t>(msg_bytes_span.size())};
return ::write(output_fd, &msg_header, sizeof(msg_header));
}))
{
return err;
}
// 2. Write the message payload.
return platform::posixSyscallError([output_fd, msg_bytes_span] {
//
return ::write(output_fd, msg_bytes_span.data(), msg_bytes_span.size());
});
});
}

template <typename Action>
static int readAndActOnMessage(const int input_fd, Action&& action)
{
// 1. Read and validate the message header.
//
std::size_t msg_size = 0;
{
MsgHeader msg_header;
ssize_t bytes_read = 0;
if (const auto err = platform::posixSyscallError([input_fd, &msg_header, &bytes_read] {
//
return bytes_read = ::read(input_fd, &msg_header, sizeof(msg_header));
}))
{
::syslog(LOG_ERR, "Failed to read message header (fd=%d): %s", input_fd, ::strerror(err));
return err;
}

if ((bytes_read != sizeof(msg_header)) || (msg_header.signature != MsgSignature) //
|| (msg_header.size == 0) || (msg_header.size > MsgMaxSize))
{
return EINVAL;
}

msg_size = msg_header.size;
}

// 2. Read message payload.
//
auto read_and_act = [input_fd, act = std::forward<Action>(action)](const cetl::span<std::uint8_t> buf_span) {
//
ssize_t read = 0;
if (const auto err = platform::posixSyscallError([input_fd, buf_span, &read] {
//
return read = ::read(input_fd, buf_span.data(), buf_span.size());
}))
{
::syslog(LOG_ERR, "Failed to read message payload (fd=%d): %s", input_fd, ::strerror(err));
return err;
}
if (read != buf_span.size())
{
return EINVAL;
}

const cetl::span<const std::uint8_t> const_buf_span{buf_span};
return std::forward<Action>(act)(const_buf_span);
};
if (msg_size <= MsgSmallPayloadSize) // on stack buffer?
{
std::array<std::uint8_t, MsgSmallPayloadSize> buffer;
return read_and_act({buffer.data(), msg_size});
}
const std::unique_ptr<std::uint8_t[]> buffer{new std::uint8_t[msg_size]};
return read_and_act({buffer.get(), msg_size});
}

template <typename Message>
static Failure readMessage(const int input_fd, Message& message)
{
// 1. Read and validate the message header.
//
std::size_t msg_size = 0;
{
MsgHeader msg_header;
ssize_t bytes_read = 0;
if (const auto err = platform::posixSyscallError([input_fd, &msg_header, &bytes_read] {
//
return bytes_read = ::read(input_fd, &msg_header, sizeof(msg_header));
}))
{
return err;
}

// 2. Validate message header.
if ((bytes_read != sizeof(msg_header)) || (msg_header.signature != MsgSignature) //
|| (msg_header.size == 0) || (msg_header.size > MsgMaxSize))
{
return EINVAL;
}

msg_size = msg_header.size;
}

// 2. Deserialize message payload.
//
auto read_payload_and_deser_msg = [input_fd, &message](const cetl::span<std::uint8_t> buf_span) {
//
ssize_t read = 0;
if (const auto err = platform::posixSyscallError([input_fd, buf_span, &read] {
//
return read = ::read(input_fd, buf_span.data(), buf_span.size());
}))
{
return err;
}
if (read != buf_span.size())
{
return EINVAL;
}
return tryDeserializePayload({buf_span.data(), buf_span.size()}, message);
};
if (msg_size <= MsgSmallPayloadSize) // on stack buffer?
{
std::array<std::uint8_t, MsgSmallPayloadSize> buffer;
return read_payload_and_deser_msg({buffer.data(), msg_size});
}
const std::unique_ptr<std::uint8_t[]> buffer{new std::uint8_t[msg_size]};
return read_payload_and_deser_msg({buffer.get(), msg_size});
}

private:
struct MsgHeader
{
std::uint32_t signature;
std::uint32_t size;
};

static constexpr std::size_t MsgSmallPayloadSize = 256;
static constexpr std::uint32_t MsgSignature = 0x5356434F; // 'OCVS'
static constexpr std::size_t MsgMaxSize = 1ULL << 20ULL; // 1 MB

}; // UnixSocketBase

} // namespace ipc
} // namespace common
} // namespace ocvsmd

#endif // OCVSMD_COMMON_IPC_UNIX_SOCKET_BASE_HPP_INCLUDED
52 changes: 3 additions & 49 deletions src/common/ipc/unix_socket_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,16 @@

#include "unix_socket_client.hpp"

#include "dsdl_helpers.hpp"
#include "platform/posix_utils.hpp"

#include <nunavut/support/serialization.hpp>
#include <ocvsmd/common/dsdl/Foo_1_0.hpp>

#include <cetl/cetl.hpp>
#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pf20/cetlpf.hpp>

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <utility>
Expand All @@ -34,9 +26,8 @@ namespace common
namespace ipc
{

UnixSocketClient::UnixSocketClient(cetl::pmr::memory_resource& memory, std::string socket_path)
: memory_{memory}
, socket_path_{std::move(socket_path)}
UnixSocketClient::UnixSocketClient(std::string socket_path)
: socket_path_{std::move(socket_path)}
, client_fd_{-1}
{
}
Expand All @@ -52,7 +43,7 @@ UnixSocketClient::~UnixSocketClient()
}
}

bool UnixSocketClient::connect_to_server()
bool UnixSocketClient::connectToServer()
{
CETL_DEBUG_ASSERT(client_fd_ == -1, "");

Expand Down Expand Up @@ -89,43 +80,6 @@ bool UnixSocketClient::connect_to_server()
return true;
}

void UnixSocketClient::send_message(const dsdl::Foo_1_0& foo_message) const
{
using Failure = cetl::variant<int, nunavut::support::Error>;

tryPerformOnSerialized<dsdl::Foo_1_0,
Failure,
dsdl::Foo_1_0::_traits_::SerializationBufferSizeBytes,
true>(foo_message, memory_, [this](const cetl::span<const cetl::byte> msg_bytes) {
//
if (const auto err = platform::posixSyscallError([this, msg_bytes] {
//
return ::write(client_fd_, msg_bytes.data(), msg_bytes.size());
}))
{
std::cerr << "Failed to write: " << ::strerror(err) << "\n";
}
return 0;
});

constexpr std::size_t buf_size = 256;
std::array<char, buf_size> buffer{};
ssize_t bytes_read = 0;
if (const auto err = platform::posixSyscallError([this, &bytes_read, &buffer] {
//
return bytes_read = ::read(client_fd_, buffer.data(), buffer.size() - 1);
}))
{
std::cerr << "Failed to read: " << ::strerror(err) << "\n";
return;
}
if (bytes_read > 0)
{
buffer[bytes_read] = '\0'; // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
std::cout << "Received: " << buffer.data() << "\n";
}
}

} // namespace ipc
} // namespace common
} // namespace ocvsmd
Loading

0 comments on commit 2585693

Please sign in to comment.