diff --git a/src/spider/CMakeLists.txt b/src/spider/CMakeLists.txt index 7a0a360..b4a0576 100644 --- a/src/spider/CMakeLists.txt +++ b/src/spider/CMakeLists.txt @@ -2,6 +2,7 @@ set(SPIDER_CORE_SOURCES storage/MysqlStorage.cpp worker/FunctionManager.cpp + io/msgpack_message.cpp CACHE INTERNAL "spider core source files" ) @@ -12,9 +13,10 @@ set(SPIDER_CORE_HEADERS core/Task.hpp core/TaskGraph.hpp core/JobMetadata.hpp - core/Serializer.hpp - core/BoostAsio.hpp - core/MsgPack.hpp + io/BoostAsio.hpp + io/MsgPack.hpp + io/msgpack_message.hpp + io/Serializer.hpp storage/MetadataStorage.hpp storage/DataStorage.hpp storage/MysqlStorage.hpp diff --git a/src/spider/client/Data.hpp b/src/spider/client/Data.hpp index ba162c0..8ee4181 100644 --- a/src/spider/client/Data.hpp +++ b/src/spider/client/Data.hpp @@ -6,7 +6,7 @@ #include #include -#include "../core/Serializer.hpp" +#include "../io/Serializer.hpp" namespace spider { class DataImpl; diff --git a/src/spider/client/task.hpp b/src/spider/client/task.hpp index 0cecede..c89cb0a 100644 --- a/src/spider/client/task.hpp +++ b/src/spider/client/task.hpp @@ -3,7 +3,7 @@ #include -#include "../core/Serializer.hpp" +#include "../io/Serializer.hpp" #include "Data.hpp" #include "type_utils.hpp" diff --git a/src/spider/core/Data.hpp b/src/spider/core/Data.hpp index 9b716c6..40100e4 100644 --- a/src/spider/core/Data.hpp +++ b/src/spider/core/Data.hpp @@ -9,8 +9,8 @@ #include #include -#include "MsgPack.hpp" // IWYU pragma: keep -#include "Serializer.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep +#include "../io/Serializer.hpp" // IWYU pragma: keep namespace spider::core { class Data { diff --git a/src/spider/core/BoostAsio.hpp b/src/spider/io/BoostAsio.hpp similarity index 85% rename from src/spider/core/BoostAsio.hpp rename to src/spider/io/BoostAsio.hpp index c5cc792..5931b39 100644 --- a/src/spider/core/BoostAsio.hpp +++ b/src/spider/io/BoostAsio.hpp @@ -11,20 +11,27 @@ #include #include #include 
+#include + #include -#include #include -#include -#include #include +#include + +#include +#include +#include + +#include +#include #include +#include #include -#include #include #include -#include +#include // IWYU pragma: end_exports // clang-format on diff --git a/src/spider/core/MsgPack.hpp b/src/spider/io/MsgPack.hpp similarity index 100% rename from src/spider/core/MsgPack.hpp rename to src/spider/io/MsgPack.hpp diff --git a/src/spider/core/Serializer.hpp b/src/spider/io/Serializer.hpp similarity index 100% rename from src/spider/core/Serializer.hpp rename to src/spider/io/Serializer.hpp diff --git a/src/spider/io/msgpack_message.cpp b/src/spider/io/msgpack_message.cpp new file mode 100644 index 0000000..4750eda --- /dev/null +++ b/src/spider/io/msgpack_message.cpp @@ -0,0 +1,264 @@ +#include "msgpack_message.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "BoostAsio.hpp" // IWYU pragma: keep +#include "MsgPack.hpp" // IWYU pragma: keep + +namespace { + +/** + * Read the type of ext msgpack message. 
+ * + * @param type + * @return std::nullopt if type is not an ext type, a pair otherwise, + * - Size of the next read + * - True if next read reads only the body size, false otherwise + */ +auto read_ext_type(char8_t const type) -> std::optional> { + // NOLINTBEGIN(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers) + switch (type) { + case 0xd4: + return std::make_pair(2, false); + case 0xd5: + return std::make_pair(3, false); + case 0xd6: + return std::make_pair(5, false); + case 0xd7: + return std::make_pair(9, false); + case 0xd8: + return std::make_pair(17, false); + case 0xc7: + return std::make_pair(1, true); + case 0xc8: + return std::make_pair(2, true); + case 0xc9: + return std::make_pair(4, true); + default: + return std::nullopt; + } + // NOLINTEND(cppcoreguidelines-avoid-magic-numbers,readability-magic-numbers) +} + +auto read_ext_body_size(std::u8string_view const body_size) -> std::optional { + switch (body_size.size()) { + case 1: + return std::bit_cast(body_size[0]); + case 2: { + std::uint16_t body_size_16 = 0; + memcpy(&body_size_16, body_size.data(), sizeof(std::uint16_t)); + return ntohs(body_size_16); + } + case 4: { + std::uint32_t body_size_32 = 0; + memcpy(&body_size_32, body_size.data(), sizeof(std::uint32_t)); + return ntohl(body_size_32); + } + default: + return std::nullopt; + } +} + +} // namespace + +namespace spider::core { + +auto send_message(boost::asio::ip::tcp::socket& socket, msgpack::sbuffer const& buffer) -> bool { + msgpack::sbuffer message_buffer; + msgpack::packer packer{message_buffer}; + packer.pack_ext(buffer.size(), msgpack::type::BIN); + packer.pack_ext_body(buffer.data(), buffer.size()); + try { + size_t const size = boost::asio::write( + socket, + boost::asio::buffer(message_buffer.data(), message_buffer.size()) + ); + return size == message_buffer.size(); + } catch (boost::system::system_error& e) { + if (boost::asio::error::eof != e.code()) { + spdlog::error("Cannot send message to socket {}: {}", 
e.code().value(), e.what()); + } + return false; + } +} + +auto send_message_async( + std::reference_wrapper socket, + std::reference_wrapper buffer +) -> boost::asio::awaitable { + msgpack::sbuffer message_buffer; + msgpack::packer packer{message_buffer}; + packer.pack_ext(buffer.get().size(), msgpack::type::BIN); + packer.pack_ext_body(buffer.get().data(), buffer.get().size()); + auto const& [ec, size] = co_await boost::asio::async_write( + socket.get(), + boost::asio::buffer(message_buffer.data(), message_buffer.size()), + boost::asio::as_tuple(boost::asio::use_awaitable) + ); + if (ec) { + if (boost::asio::error::eof != ec) { + spdlog::error("Cannot send message to socket {}: {}", ec.value(), ec.message()); + } + co_return false; + } + co_return size == message_buffer.size(); +} + +auto receive_message(boost::asio::ip::tcp::socket& socket) -> std::optional { + try { + // Read header + char8_t header = 0; + boost::asio::read(socket, boost::asio::buffer(&header, sizeof(header))); + std::optional> const optional_body_pair = read_ext_type(header); + if (false == optional_body_pair.has_value()) { + return std::nullopt; + } + + // Read next + std::pair const body_pair = optional_body_pair.value(); + std::vector body_size_vec(body_pair.first); + boost::asio::read(socket, boost::asio::buffer(body_size_vec)); + if (false == body_pair.second) { + // Entire body read with type. Validate type to be bin. 
+ if (body_size_vec[0] != msgpack::type::BIN) { + return std::nullopt; + } + msgpack::sbuffer buffer; + buffer.write(std::bit_cast(&body_size_vec[1]), body_size_vec.size() - 1); + return buffer; + } + std::optional const optional_body_size + = read_ext_body_size(std::u8string_view{body_size_vec.data(), body_size_vec.size()} + ); + if (false == optional_body_size.has_value()) { + return std::nullopt; + } + size_t const body_size = optional_body_size.value(); + + // Read body + std::vector body_vec(body_size + 1); + boost::asio::read(socket, boost::asio::buffer(body_vec)); + // Validate type to be bin + if (body_vec[0] != msgpack::type::BIN) { + return std::nullopt; + } + msgpack::sbuffer buffer; + buffer.write(std::bit_cast(&body_vec[1]), body_vec.size() - 1); + return buffer; + } catch (boost::system::system_error& e) { + if (boost::asio::error::eof != e.code()) { + spdlog::error("Cannot read message from socket {}: {}", e.code().value(), e.what()); + } + return std::nullopt; + } +} + +auto receive_message_async(std::reference_wrapper socket +) -> boost::asio::awaitable> { + // Read header + char8_t header = 0; + // Suppress clang-tidy warning inside boost asio + // NOLINTNEXTLINE(clang-analyzer-core.NullDereference) + auto const& [header_ec, header_size] = co_await boost::asio::async_read( + socket.get(), + boost::asio::buffer(&header, sizeof(header)), + boost::asio::as_tuple(boost::asio::use_awaitable) + ); + if (header_ec) { + if (boost::asio::error::eof != header_ec) { + spdlog::error( + "Cannot read message header from socket {}: {}", + header_ec.value(), + header_ec.message() + ); + } + co_return std::nullopt; + } + if (header_size != sizeof(header)) { + co_return std::nullopt; + } + std::optional> const optional_body_pair = read_ext_type(header); + if (false == optional_body_pair.has_value()) { + co_return std::nullopt; + } + + // Read next + std::pair const body_pair = optional_body_pair.value(); + std::vector body_size_vec(body_pair.first); + auto const& 
[body_size_ec, body_size_size] = co_await boost::asio::async_read( + socket.get(), + boost::asio::buffer(body_size_vec), + boost::asio::as_tuple(boost::asio::use_awaitable) + ); + if (body_size_ec) { + if (boost::asio::error::eof != body_size_ec) { + spdlog::error( + "Cannot read message body size or body from socket {}: {}", + body_size_ec.value(), + body_size_ec.message() + ); + } + co_return std::nullopt; + } + if (body_size_size != body_pair.first) { + co_return std::nullopt; + } + if (false == body_pair.second) { + // Entire body read with type. Validate type to be bin. + if (body_size_vec[0] != msgpack::type::BIN) { + co_return std::nullopt; + } + msgpack::sbuffer buffer; + buffer.write(std::bit_cast(&body_size_vec[1]), body_size_vec.size() - 1); + co_return buffer; + } + std::optional const optional_body_size + = read_ext_body_size(std::u8string_view{body_size_vec.data(), body_size_vec.size()}); + if (false == optional_body_size.has_value()) { + co_return std::nullopt; + } + size_t const body_size = optional_body_size.value(); + + // Read body + std::vector body_vec(body_size + 1); + auto const& [body_ec, body_read_size] = co_await boost::asio::async_read( + socket.get(), + boost::asio::buffer(body_vec), + boost::asio::as_tuple(boost::asio::use_awaitable) + ); + if (body_ec) { + if (boost::asio::error::eof != body_ec) { + spdlog::error( + "Cannot read message body size or body from socket {}: {}", + body_ec.value(), + body_ec.message() + ); + } + co_return std::nullopt; + } + if (body_read_size != body_size + 1) { + co_return std::nullopt; + } + + // Validate type to be bin + if (body_vec[0] != msgpack::type::BIN) { + co_return std::nullopt; + } + msgpack::sbuffer buffer; + buffer.write(std::bit_cast(&body_vec[1]), body_vec.size() - 1); + co_return buffer; +} + +} // namespace spider::core diff --git a/src/spider/io/msgpack_message.hpp b/src/spider/io/msgpack_message.hpp new file mode 100644 index 0000000..d8d9929 --- /dev/null +++ 
b/src/spider/io/msgpack_message.hpp @@ -0,0 +1,26 @@ +#ifndef SPIDER_CORE_MSGPACKMESSAGE_HPP +#define SPIDER_CORE_MSGPACKMESSAGE_HPP + +#include +#include + +#include "BoostAsio.hpp" // IWYU pragma: keep +#include "MsgPack.hpp" // IWYU pragma: keep + +namespace spider::core { + +auto send_message(boost::asio::ip::tcp::socket& socket, msgpack::sbuffer const& buffer) -> bool; + +auto send_message_async( + std::reference_wrapper socket, + std::reference_wrapper buffer +) -> boost::asio::awaitable; + +auto receive_message(boost::asio::ip::tcp::socket& socket) -> std::optional; + +auto receive_message_async(std::reference_wrapper socket +) -> boost::asio::awaitable>; + +} // namespace spider::core + +#endif // SPIDER_CORE_MSGPACKMESSAGE_HPP diff --git a/src/spider/worker/FunctionManager.cpp b/src/spider/worker/FunctionManager.cpp index 28d7e0c..a155bca 100644 --- a/src/spider/worker/FunctionManager.cpp +++ b/src/spider/worker/FunctionManager.cpp @@ -6,7 +6,7 @@ #include -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep #include "TaskExecutorMessage.hpp" namespace spider::core { diff --git a/src/spider/worker/FunctionManager.hpp b/src/spider/worker/FunctionManager.hpp index 8ab8498..7818bd6 100644 --- a/src/spider/worker/FunctionManager.hpp +++ b/src/spider/worker/FunctionManager.hpp @@ -15,7 +15,7 @@ #include #include -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep #include "TaskExecutorMessage.hpp" // NOLINTBEGIN(cppcoreguidelines-macro-usage) diff --git a/src/spider/worker/TaskExecutor.cpp b/src/spider/worker/TaskExecutor.cpp index bd5b33c..d32819d 100644 --- a/src/spider/worker/TaskExecutor.cpp +++ b/src/spider/worker/TaskExecutor.cpp @@ -8,8 +8,8 @@ #include #include -#include "../core/BoostAsio.hpp" // IWYU pragma: keep -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/BoostAsio.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // 
IWYU pragma: keep #include "FunctionManager.hpp" #include "message_pipe.hpp" #include "TaskExecutorMessage.hpp" diff --git a/src/spider/worker/TaskExecutor.hpp b/src/spider/worker/TaskExecutor.hpp index bef4e14..1c9a4dd 100644 --- a/src/spider/worker/TaskExecutor.hpp +++ b/src/spider/worker/TaskExecutor.hpp @@ -16,8 +16,8 @@ #include #include -#include "../core/BoostAsio.hpp" // IWYU pragma: keep -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/BoostAsio.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep #include "FunctionManager.hpp" #include "message_pipe.hpp" diff --git a/src/spider/worker/TaskExecutorMessage.hpp b/src/spider/worker/TaskExecutorMessage.hpp index da278f7..234fbfc 100644 --- a/src/spider/worker/TaskExecutorMessage.hpp +++ b/src/spider/worker/TaskExecutorMessage.hpp @@ -3,7 +3,7 @@ #include -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep namespace spider::worker { enum class TaskExecutorResponseType : std::uint8_t { diff --git a/src/spider/worker/message_pipe.cpp b/src/spider/worker/message_pipe.cpp index bb1f269..69c0c42 100644 --- a/src/spider/worker/message_pipe.cpp +++ b/src/spider/worker/message_pipe.cpp @@ -11,8 +11,8 @@ #include #include -#include "../core/BoostAsio.hpp" // IWYU pragma: keep -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/BoostAsio.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep namespace spider::worker { diff --git a/src/spider/worker/message_pipe.hpp b/src/spider/worker/message_pipe.hpp index ca6a667..549f563 100644 --- a/src/spider/worker/message_pipe.hpp +++ b/src/spider/worker/message_pipe.hpp @@ -4,8 +4,8 @@ #include #include -#include "../core/BoostAsio.hpp" // IWYU pragma: keep -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/BoostAsio.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep namespace spider::worker { diff 
--git a/src/spider/worker/task_executor.cpp b/src/spider/worker/task_executor.cpp index 78460db..796576d 100644 --- a/src/spider/worker/task_executor.cpp +++ b/src/spider/worker/task_executor.cpp @@ -16,8 +16,8 @@ #include // IWYU pragma: keep #include -#include "../core/BoostAsio.hpp" // IWYU pragma: keep -#include "../core/MsgPack.hpp" // IWYU pragma: keep +#include "../io/BoostAsio.hpp" // IWYU pragma: keep +#include "../io/MsgPack.hpp" // IWYU pragma: keep #include "DllLoader.hpp" #include "FunctionManager.hpp" #include "message_pipe.hpp" diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index bf5c0e5..9d81394 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,6 +8,7 @@ set(SPIDER_TEST_SOURCES worker/test-FunctionManager.cpp worker/test-MessagePipe.cpp worker/test-TaskExecutor.cpp + io/test-MsgpackMessage.cpp scheduler/test-SchedulerPolicy.cpp CACHE INTERNAL "spider test source files" diff --git a/tests/io/test-MsgpackMessage.cpp b/tests/io/test-MsgpackMessage.cpp new file mode 100644 index 0000000..e01c8dd --- /dev/null +++ b/tests/io/test-MsgpackMessage.cpp @@ -0,0 +1,133 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "../../src/spider/io/BoostAsio.hpp" // IWYU pragma: keep +#include "../../src/spider/io/MsgPack.hpp" // IWYU pragma: keep +#include "../../src/spider/io/msgpack_message.hpp" + +namespace { + +using namespace boost::asio::ip; + +constexpr std::array cBufferSizes{1, 2, 3, 4, 5, 6, 7, 8, 9, 17, 257, 65'537}; + +// NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) +TEST_CASE("Sync socket msgpack", "[io]") { + boost::asio::io_context context; + // Create server acceptor + tcp::endpoint const local_endpoint{address::from_string("127.0.0.1"), 0}; + tcp::acceptor acceptor{context, local_endpoint}; + + 
std::thread server_thread([&acceptor, &context]() { + // Create server socket + tcp::socket socket{context}; + acceptor.accept(socket); + + // NOLINTBEGIN(clang-analyzer-unix.Malloc) + for (size_t const buffer_size : cBufferSizes) { + std::optional const optional_buffer + = spider::core::receive_message(socket); + REQUIRE(optional_buffer.has_value()); + if (optional_buffer.has_value()) { + msgpack::sbuffer const& buffer = optional_buffer.value(); + REQUIRE(buffer_size == buffer.size()); + for (size_t i = 0; i < buffer.size(); ++i) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + REQUIRE(i % 256 == std::bit_cast(buffer.data()[i])); + } + } + } + // NOLINTEND(clang-analyzer-unix.Malloc) + }); + + // Create client socket + tcp::socket socket(context); + boost::asio::connect( + socket, + std::vector{tcp::endpoint{ + address::from_string("127.0.0.1"), + acceptor.local_endpoint().port() + }} + ); + + for (size_t const buffer_size : cBufferSizes) { + msgpack::sbuffer buffer; + for (size_t i = 0; i < buffer_size; ++i) { + // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) + char const value = i % 256; + buffer.write(&value, sizeof(value)); + } + REQUIRE(spider::core::send_message(socket, buffer)); + } + server_thread.join(); +} + +TEST_CASE("Async socket msgpack", "[io]") { + boost::asio::io_context context; + // Create server acceptor + tcp::endpoint const local_endpoint{address::from_string("127.0.0.1"), 0}; + tcp::acceptor acceptor{context, local_endpoint}; + + // Create client socket + tcp::socket client_socket(context); + boost::asio::connect( + client_socket, + std::vector{tcp::endpoint{ + address::from_string("127.0.0.1"), + acceptor.local_endpoint().port() + }} + ); + + // Create server socket + tcp::socket server_socket{context}; + acceptor.accept(server_socket); + + for (size_t const buffer_size : cBufferSizes) { + msgpack::sbuffer client_buffer; + for (size_t i = 0; i < buffer_size; ++i) { + // 
NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) + char const value = i % 256; + client_buffer.write(&value, sizeof(value)); + } + std::future client_future = boost::asio::co_spawn( + context, + spider::core::send_message_async(client_socket, client_buffer), + boost::asio::use_future + ); + std::future> server_future = boost::asio::co_spawn( + context, + spider::core::receive_message_async(server_socket), + boost::asio::use_future + ); + + context.run(); + REQUIRE(client_future.wait_for(std::chrono::seconds(5)) == std::future_status::ready); + REQUIRE(server_future.wait_for(std::chrono::seconds(5)) == std::future_status::ready); + context.restart(); + + REQUIRE(client_future.get()); + std::optional const& optional_result_buffer = server_future.get(); + REQUIRE(optional_result_buffer.has_value()); + if (optional_result_buffer.has_value()) { + msgpack::sbuffer const& result_buffer = optional_result_buffer.value(); + REQUIRE(buffer_size == result_buffer.size()); + for (size_t i = 0; i < result_buffer.size(); ++i) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + REQUIRE(i % 256 == std::bit_cast(result_buffer.data()[i])); + } + } + } +} + +// NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) + +} // namespace diff --git a/tests/worker/test-FunctionManager.cpp b/tests/worker/test-FunctionManager.cpp index 91a3d68..a1b109b 100644 --- a/tests/worker/test-FunctionManager.cpp +++ b/tests/worker/test-FunctionManager.cpp @@ -6,7 +6,7 @@ #include #include "../../src/spider/core/Data.hpp" -#include "../../src/spider/core/MsgPack.hpp" // IWYU pragma: keep +#include "../../src/spider/io/MsgPack.hpp" // IWYU pragma: keep #include "../../src/spider/worker/FunctionManager.hpp" namespace { diff --git a/tests/worker/test-MessagePipe.cpp 
b/tests/worker/test-MessagePipe.cpp index 130467e..5c370ac 100644 --- a/tests/worker/test-MessagePipe.cpp +++ b/tests/worker/test-MessagePipe.cpp @@ -6,8 +6,8 @@ #include -#include "../../src/spider/core/BoostAsio.hpp" -#include "../../src/spider/core/MsgPack.hpp" // IWYU pragma: keep +#include "../../src/spider/io/BoostAsio.hpp" +#include "../../src/spider/io/MsgPack.hpp" // IWYU pragma: keep #include "../../src/spider/worker/FunctionManager.hpp" #include "../../src/spider/worker/message_pipe.hpp" #include "../../src/spider/worker/TaskExecutorMessage.hpp" diff --git a/tests/worker/test-TaskExecutor.cpp b/tests/worker/test-TaskExecutor.cpp index aa7075c..3607078 100644 --- a/tests/worker/test-TaskExecutor.cpp +++ b/tests/worker/test-TaskExecutor.cpp @@ -10,7 +10,7 @@ #include #include -#include "../../src/spider/core/BoostAsio.hpp" // IWYU pragma: keep +#include "../../src/spider/io/BoostAsio.hpp" // IWYU pragma: keep #include "../../src/spider/worker/FunctionManager.hpp" #include "../../src/spider/worker/TaskExecutor.hpp"