Skip to content

Commit

Permalink
feat: Add variable size message sending and receiving through socket (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sitaowang1998 authored Dec 7, 2024
1 parent 1c8773a commit 411924e
Show file tree
Hide file tree
Showing 22 changed files with 462 additions and 29 deletions.
8 changes: 5 additions & 3 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set(SPIDER_CORE_SOURCES
storage/MysqlStorage.cpp
worker/FunctionManager.cpp
io/msgpack_message.cpp
CACHE INTERNAL
"spider core source files"
)
Expand All @@ -12,9 +13,10 @@ set(SPIDER_CORE_HEADERS
core/Task.hpp
core/TaskGraph.hpp
core/JobMetadata.hpp
core/Serializer.hpp
core/BoostAsio.hpp
core/MsgPack.hpp
io/BoostAsio.hpp
io/MsgPack.hpp
io/msgpack_message.hpp
io/Serializer.hpp
storage/MetadataStorage.hpp
storage/DataStorage.hpp
storage/MysqlStorage.hpp
Expand Down
2 changes: 1 addition & 1 deletion src/spider/client/Data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <string>
#include <vector>

#include "../core/Serializer.hpp"
#include "../io/Serializer.hpp"

namespace spider {
class DataImpl;
Expand Down
2 changes: 1 addition & 1 deletion src/spider/client/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include <functional>

#include "../core/Serializer.hpp"
#include "../io/Serializer.hpp"
#include "Data.hpp"
#include "type_utils.hpp"

Expand Down
4 changes: 2 additions & 2 deletions src/spider/core/Data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>

#include "MsgPack.hpp" // IWYU pragma: keep
#include "Serializer.hpp" // IWYU pragma: keep
#include "../io/MsgPack.hpp" // IWYU pragma: keep
#include "../io/Serializer.hpp" // IWYU pragma: keep

namespace spider::core {
class Data {
Expand Down
17 changes: 12 additions & 5 deletions src/spider/core/BoostAsio.hpp → src/spider/io/BoostAsio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,27 @@
#include <boost/asio/buffer.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>

#include <boost/asio/readable_pipe.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/writable_pipe.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio/impl/co_spawn.hpp>
#include <boost/asio/impl/connect_pipe.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/impl/connect.hpp>

#include <boost/asio/detached.hpp>
#include <boost/asio/impl/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>

#include <boost/asio/impl/write.hpp>
#include <boost/asio/impl/read.hpp>

#include <boost/asio/detached.hpp>
#include <boost/system/system_error.hpp>

// IWYU pragma: end_exports
// clang-format on
Expand Down
File renamed without changes.
File renamed without changes.
264 changes: 264 additions & 0 deletions src/spider/io/msgpack_message.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
#include "msgpack_message.hpp"

#include <netinet/in.h>

#include <bit>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <optional>
#include <string_view>
#include <utility>
#include <vector>

#include <spdlog/spdlog.h>

#include "BoostAsio.hpp" // IWYU pragma: keep
#include "MsgPack.hpp" // IWYU pragma: keep

namespace {

/**
 * Classifies the leading byte of a msgpack ext-family value and reports how many bytes must be
 * read next.
 *
 * @param type First byte of the encoded value.
 * @return std::nullopt if the byte is not an ext marker, otherwise a pair of
 * - the number of bytes to read next
 * - true if those bytes contain only the body size (ext 8/16/32), false if they already contain
 *   the ext type byte followed by the complete body (fixext 1/2/4/8/16)
 */
auto read_ext_type(char8_t const type) -> std::optional<std::pair<size_t, bool>> {
    // NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)
    constexpr unsigned cFixExtFirst = 0xd4;
    constexpr unsigned cFixExtLast = 0xd8;
    constexpr unsigned cExtFirst = 0xc7;
    constexpr unsigned cExtLast = 0xc9;
    // NOLINTEND(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers)

    auto const marker = static_cast<unsigned>(type);

    // fixext markers are consecutive and their body lengths double each step (1, 2, 4, 8, 16).
    // The next read covers the ext type byte plus that body.
    if (cFixExtFirst <= marker && marker <= cFixExtLast) {
        size_t const body_length = size_t{1} << (marker - cFixExtFirst);
        return std::make_pair(body_length + 1, false);
    }

    // ext 8/16/32 markers are consecutive and their size fields are 1, 2 and 4 bytes wide.
    if (cExtFirst <= marker && marker <= cExtLast) {
        size_t const size_field_width = size_t{1} << (marker - cExtFirst);
        return std::make_pair(size_field_width, true);
    }

    return std::nullopt;
}

/**
 * Decodes the body-size field that follows an ext 8/16/32 marker.
 *
 * @param body_size Raw bytes of the size field, in network (big-endian) byte order.
 * @return The decoded size if the field is 1, 2 or 4 bytes wide, std::nullopt for any other width.
 */
auto read_ext_body_size(std::u8string_view const body_size) -> std::optional<size_t> {
    size_t const field_width = body_size.size();
    bool const is_supported_width = 1 == field_width || 2 == field_width || 4 == field_width;
    if (false == is_supported_width) {
        return std::nullopt;
    }

    // Fold the bytes most-significant first, which is the host-order value of a big-endian field.
    constexpr unsigned cBitsPerByte = 8;
    size_t decoded_size = 0;
    for (char8_t const byte : body_size) {
        decoded_size = (decoded_size << cBitsPerByte)
                       | static_cast<size_t>(static_cast<std::uint8_t>(byte));
    }
    return decoded_size;
}

} // namespace

namespace spider::core {

/**
 * Wraps `buffer` in a msgpack ext value whose ext type is `msgpack::type::BIN` and writes the
 * whole frame to `socket`, blocking until the write completes or fails.
 *
 * @param socket Socket to write to.
 * @param buffer Payload to send.
 * @return true if every byte of the frame was written, false if the payload is too large to be
 * framed or the write failed.
 */
auto send_message(boost::asio::ip::tcp::socket& socket, msgpack::sbuffer const& buffer) -> bool {
    // The widest ext header (ext 32) carries a 32-bit body length. A longer payload would be
    // framed with a truncated length and desynchronize the receiver, so refuse to send it.
    constexpr size_t cMaxExtBodySize = UINT32_MAX;
    if (buffer.size() > cMaxExtBodySize) {
        spdlog::error(
                "Cannot send message of {} bytes: exceeds the msgpack ext size limit",
                buffer.size()
        );
        return false;
    }

    msgpack::sbuffer message_buffer;
    msgpack::packer packer{message_buffer};
    packer.pack_ext(buffer.size(), msgpack::type::BIN);
    packer.pack_ext_body(buffer.data(), buffer.size());
    try {
        size_t const size = boost::asio::write(
                socket,
                boost::asio::buffer(message_buffer.data(), message_buffer.size())
        );
        return size == message_buffer.size();
    } catch (boost::system::system_error const& e) {
        // EOF only means the peer went away; it is not worth an error log.
        if (boost::asio::error::eof != e.code()) {
            spdlog::error("Cannot send message to socket {}: {}", e.code().value(), e.what());
        }
        return false;
    }
}

/**
 * Coroutine variant of the blocking sender: wraps the payload in a msgpack ext value whose ext
 * type is `msgpack::type::BIN` and asynchronously writes the whole frame to the socket.
 *
 * @param socket Socket to write to.
 * @param buffer Payload to send.
 * @return An awaitable yielding true if every byte of the frame was written, false if the payload
 * is too large to be framed or the write failed.
 */
auto send_message_async(
        std::reference_wrapper<boost::asio::ip::tcp::socket> socket,
        std::reference_wrapper<msgpack::sbuffer> buffer
) -> boost::asio::awaitable<bool> {
    // The widest ext header (ext 32) carries a 32-bit body length. A longer payload would be
    // framed with a truncated length and desynchronize the receiver, so refuse to send it.
    constexpr size_t cMaxExtBodySize = UINT32_MAX;
    if (buffer.get().size() > cMaxExtBodySize) {
        spdlog::error(
                "Cannot send message of {} bytes: exceeds the msgpack ext size limit",
                buffer.get().size()
        );
        co_return false;
    }

    // The frame is built in a coroutine-local buffer, so it stays alive across the co_await.
    msgpack::sbuffer message_buffer;
    msgpack::packer packer{message_buffer};
    packer.pack_ext(buffer.get().size(), msgpack::type::BIN);
    packer.pack_ext_body(buffer.get().data(), buffer.get().size());
    auto const& [ec, size] = co_await boost::asio::async_write(
            socket.get(),
            boost::asio::buffer(message_buffer.data(), message_buffer.size()),
            boost::asio::as_tuple(boost::asio::use_awaitable)
    );
    if (ec) {
        // EOF only means the peer went away; it is not worth an error log.
        if (boost::asio::error::eof != ec) {
            spdlog::error("Cannot send message to socket {}: {}", ec.value(), ec.message());
        }
        co_return false;
    }
    co_return size == message_buffer.size();
}

/**
 * Reads one msgpack ext-framed message from `socket`, blocking until it is complete.
 *
 * The frame is read in up to three steps: the one-byte ext marker, then either the type byte plus
 * body (fixext) or the body-size field (ext 8/16/32), and in the latter case the type byte plus
 * body.
 *
 * @param socket Socket to read from.
 * @return The message payload without the ext framing, or std::nullopt if the first byte is not an
 * ext marker, the ext type is not `msgpack::type::BIN`, or a read fails.
 */
auto receive_message(boost::asio::ip::tcp::socket& socket) -> std::optional<msgpack::sbuffer> {
    try {
        // Read header
        char8_t header = 0;
        boost::asio::read(socket, boost::asio::buffer(&header, sizeof(header)));
        std::optional<std::pair<size_t, bool>> const optional_body_pair = read_ext_type(header);
        if (false == optional_body_pair.has_value()) {
            return std::nullopt;
        }

        // Read next
        std::pair<size_t, bool> const body_pair = optional_body_pair.value();
        std::vector<char8_t> body_size_vec(body_pair.first);
        boost::asio::read(socket, boost::asio::buffer(body_size_vec));
        if (false == body_pair.second) {
            // Entire body read with type. Validate type to be bin.
            if (body_size_vec[0] != msgpack::type::BIN) {
                return std::nullopt;
            }
            msgpack::sbuffer buffer;
            // Skip the leading type byte.
            // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
            buffer.write(std::bit_cast<char*>(body_size_vec.data() + 1), body_size_vec.size() - 1);
            return buffer;
        }
        std::optional<size_t> const optional_body_size = read_ext_body_size(
                std::u8string_view{body_size_vec.data(), body_size_vec.size()}
        );
        if (false == optional_body_size.has_value()) {
            return std::nullopt;
        }
        size_t const body_size = optional_body_size.value();

        // Read body. One extra byte holds the ext type that precedes the payload.
        // NOTE(review): body_size comes straight from the peer and is not capped before the
        // allocation below — confirm whether an upper bound is needed.
        std::vector<char8_t> body_vec(body_size + 1);
        boost::asio::read(socket, boost::asio::buffer(body_vec));
        // Validate type to be bin
        if (body_vec[0] != msgpack::type::BIN) {
            return std::nullopt;
        }
        msgpack::sbuffer buffer;
        // Skip the leading type byte. Use data() + 1 rather than &body_vec[1]: for an empty
        // payload the vector holds only the type byte and indexing element 1 is out of range.
        // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
        buffer.write(std::bit_cast<char*>(body_vec.data() + 1), body_vec.size() - 1);
        return buffer;
    } catch (boost::system::system_error const& e) {
        // EOF only means the peer went away; it is not worth an error log.
        if (boost::asio::error::eof != e.code()) {
            spdlog::error("Cannot read message from socket {}: {}", e.code().value(), e.what());
        }
        return std::nullopt;
    }
}

/**
 * Coroutine variant of the blocking receiver: asynchronously reads one msgpack ext-framed message
 * from the socket.
 *
 * The frame is read in up to three steps: the one-byte ext marker, then either the type byte plus
 * body (fixext) or the body-size field (ext 8/16/32), and in the latter case the type byte plus
 * body.
 *
 * @param socket Socket to read from.
 * @return An awaitable yielding the message payload without the ext framing, or std::nullopt if
 * the first byte is not an ext marker, the ext type is not `msgpack::type::BIN`, or a read fails
 * or comes up short.
 */
auto receive_message_async(std::reference_wrapper<boost::asio::ip::tcp::socket> socket
) -> boost::asio::awaitable<std::optional<msgpack::sbuffer>> {
    // Read header
    char8_t header = 0;
    // Suppress clang-tidy warning inside boost asio
    // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
    auto const& [header_ec, header_size] = co_await boost::asio::async_read(
            socket.get(),
            boost::asio::buffer(&header, sizeof(header)),
            boost::asio::as_tuple(boost::asio::use_awaitable)
    );
    if (header_ec) {
        // EOF only means the peer went away; it is not worth an error log.
        if (boost::asio::error::eof != header_ec) {
            spdlog::error(
                    "Cannot read message header from socket {}: {}",
                    header_ec.value(),
                    header_ec.message()
            );
        }
        co_return std::nullopt;
    }
    if (header_size != sizeof(header)) {
        co_return std::nullopt;
    }
    std::optional<std::pair<size_t, bool>> const optional_body_pair = read_ext_type(header);
    if (false == optional_body_pair.has_value()) {
        co_return std::nullopt;
    }

    // Read next
    std::pair<size_t, bool> const body_pair = optional_body_pair.value();
    std::vector<char8_t> body_size_vec(body_pair.first);
    auto const& [body_size_ec, body_size_size] = co_await boost::asio::async_read(
            socket.get(),
            boost::asio::buffer(body_size_vec),
            boost::asio::as_tuple(boost::asio::use_awaitable)
    );
    if (body_size_ec) {
        if (boost::asio::error::eof != body_size_ec) {
            spdlog::error(
                    "Cannot read message body size or body from socket {}: {}",
                    body_size_ec.value(),
                    body_size_ec.message()
            );
        }
        co_return std::nullopt;
    }
    if (body_size_size != body_pair.first) {
        co_return std::nullopt;
    }
    if (false == body_pair.second) {
        // Entire body read with type. Validate type to be bin.
        if (body_size_vec[0] != msgpack::type::BIN) {
            co_return std::nullopt;
        }
        msgpack::sbuffer buffer;
        // Skip the leading type byte.
        // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
        buffer.write(std::bit_cast<char*>(body_size_vec.data() + 1), body_size_vec.size() - 1);
        co_return buffer;
    }
    std::optional<size_t> const optional_body_size
            = read_ext_body_size(std::u8string_view{body_size_vec.data(), body_size_vec.size()});
    if (false == optional_body_size.has_value()) {
        co_return std::nullopt;
    }
    size_t const body_size = optional_body_size.value();

    // Read body. One extra byte holds the ext type that precedes the payload.
    // NOTE(review): body_size comes straight from the peer and is not capped before the
    // allocation below — confirm whether an upper bound is needed.
    std::vector<char8_t> body_vec(body_size + 1);
    auto const& [body_ec, body_read_size] = co_await boost::asio::async_read(
            socket.get(),
            boost::asio::buffer(body_vec),
            boost::asio::as_tuple(boost::asio::use_awaitable)
    );
    if (body_ec) {
        // Check the error of THIS read. body_size_ec is always clear here, so testing it would
        // log every EOF on the body as an error.
        if (boost::asio::error::eof != body_ec) {
            spdlog::error(
                    "Cannot read message body size or body from socket {}: {}",
                    body_ec.value(),
                    body_ec.message()
            );
        }
        co_return std::nullopt;
    }
    if (body_read_size != body_size + 1) {
        co_return std::nullopt;
    }

    // Validate type to be bin
    if (body_vec[0] != msgpack::type::BIN) {
        co_return std::nullopt;
    }
    msgpack::sbuffer buffer;
    // Skip the leading type byte. Use data() + 1 rather than &body_vec[1]: for an empty payload
    // the vector holds only the type byte and indexing element 1 is out of range.
    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
    buffer.write(std::bit_cast<char*>(body_vec.data() + 1), body_vec.size() - 1);
    co_return buffer;
}

} // namespace spider::core
26 changes: 26 additions & 0 deletions src/spider/io/msgpack_message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#ifndef SPIDER_CORE_MSGPACKMESSAGE_HPP
#define SPIDER_CORE_MSGPACKMESSAGE_HPP

#include <functional>
#include <optional>

#include "BoostAsio.hpp" // IWYU pragma: keep
#include "MsgPack.hpp" // IWYU pragma: keep

namespace spider::core {

/**
 * Packs `buffer` as a msgpack ext value with ext type `msgpack::type::BIN` and writes it to
 * `socket` with a blocking write.
 *
 * @param socket Socket to write to.
 * @param buffer Payload to send.
 * @return true if the whole packed message was written, false otherwise.
 */
auto send_message(boost::asio::ip::tcp::socket& socket, msgpack::sbuffer const& buffer) -> bool;

/**
 * Coroutine that packs the payload as a msgpack ext value with ext type `msgpack::type::BIN` and
 * asynchronously writes it to the socket.
 *
 * @param socket Socket to write to.
 * @param buffer Payload to send.
 * @return An awaitable yielding true if the whole packed message was written, false otherwise.
 */
auto send_message_async(
std::reference_wrapper<boost::asio::ip::tcp::socket> socket,
std::reference_wrapper<msgpack::sbuffer> buffer
) -> boost::asio::awaitable<bool>;

/**
 * Reads one msgpack ext-framed message from `socket` with blocking reads.
 *
 * @param socket Socket to read from.
 * @return The message payload with the ext framing removed, or std::nullopt if the data is not an
 * ext value of type `msgpack::type::BIN` or a read fails.
 */
auto receive_message(boost::asio::ip::tcp::socket& socket) -> std::optional<msgpack::sbuffer>;

/**
 * Coroutine that asynchronously reads one msgpack ext-framed message from the socket.
 *
 * @param socket Socket to read from.
 * @return An awaitable yielding the message payload with the ext framing removed, or std::nullopt
 * if the data is not an ext value of type `msgpack::type::BIN` or a read fails.
 */
auto receive_message_async(std::reference_wrapper<boost::asio::ip::tcp::socket> socket
) -> boost::asio::awaitable<std::optional<msgpack::sbuffer>>;

} // namespace spider::core

#endif // SPIDER_CORE_MSGPACKMESSAGE_HPP
2 changes: 1 addition & 1 deletion src/spider/worker/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include <boost/dll/alias.hpp>

#include "../core/MsgPack.hpp" // IWYU pragma: keep
#include "../io/MsgPack.hpp" // IWYU pragma: keep
#include "TaskExecutorMessage.hpp"

namespace spider::core {
Expand Down
2 changes: 1 addition & 1 deletion src/spider/worker/FunctionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <absl/container/flat_hash_map.h>
#include <fmt/format.h>

#include "../core/MsgPack.hpp" // IWYU pragma: keep
#include "../io/MsgPack.hpp" // IWYU pragma: keep
#include "TaskExecutorMessage.hpp"

// NOLINTBEGIN(cppcoreguidelines-macro-usage)
Expand Down
Loading

0 comments on commit 411924e

Please sign in to comment.