diff --git a/CHANGELOG.md b/CHANGELOG.md index d8d4f9a68646..55c034225ca5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [5.0.0-dev12] + +[5.0.0-dev12]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev12 + +### Fixed + +- Nodes are now more robust to unexpected traffic on node-to-node ports (#5889). + ## [5.0.0-dev11] [5.0.0-dev11]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev11 diff --git a/src/consensus/aft/test/driver.h b/src/consensus/aft/test/driver.h index 19f006832536..c5e23e833312 100644 --- a/src/consensus/aft/test/driver.h +++ b/src/consensus/aft/test/driver.h @@ -279,9 +279,7 @@ class RaftDriver if (_nodes.find(node_id) == _nodes.end()) { - throw std::runtime_error(fmt::format( - "Node {} does not exist yet. Use \"create_new_node, \"", - node_id)); + create_new_node(node_id_s); } configuration.try_emplace(node_id); @@ -1053,7 +1051,8 @@ class RaftDriver idx) << std::endl; throw std::runtime_error(fmt::format( - "Node not at expected commit idx ({}) on line {} : {}", + "Node {} not at expected commit idx ({}) on line {} : {}", + node_id, idx, std::to_string((int)lineno), _nodes.at(node_id).raft->get_committed_seqno())); diff --git a/src/enclave/enclave.h b/src/enclave/enclave.h index e328af3a30e3..26e7e329cc28 100644 --- a/src/enclave/enclave.h +++ b/src/enclave/enclave.h @@ -357,7 +357,15 @@ namespace ccf DISPATCHER_SET_MESSAGE_HANDLER( bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) { - node->recv_node_inbound(data, size); + try + { + node->recv_node_inbound(data, size); + } + catch (const std::exception& e) + { + LOG_DEBUG_FMT( + "Ignoring node_inbound message due to exception: {}", e.what()); + } }); DISPATCHER_SET_MESSAGE_HANDLER( diff --git a/src/host/node_connections.h b/src/host/node_connections.h index 0a6f3c29f661..bdabf8824aa3 100644 --- a/src/host/node_connections.h +++ b/src/host/node_connections.h @@ -35,7 +35,7 @@ namespace asynchost node(node) {} - void on_read(size_t len, uint8_t*& incoming, sockaddr) + bool on_read(size_t len, uint8_t*& incoming, sockaddr) override { LOG_DEBUG_FMT( "from node {} received {} bytes", @@ -71,8 +71,35 @@ namespace asynchost } const auto size_pre_headers = size; - auto msg_type = serialized::read(data, size); - ccf::NodeId from = serialized::read(data, size); + + ccf::NodeMsgType msg_type; + try + { + msg_type = serialized::read(data, size); + } + catch (const std::exception& e) + { + LOG_DEBUG_FMT( + "Received invalid node-to-node traffic. Unable to read message " + "type ({}). Closing connection.", + e.what()); + return false; + } + + ccf::NodeId from; + try + { + from = serialized::read(data, size); + } + catch (const std::exception& e) + { + LOG_DEBUG_FMT( + "Received invalid node-to-node traffic. Unable to read sender " + "node ID ({}). Closing connection.", + e.what()); + return false; + } + const auto size_post_headers = size; const size_t payload_size = msg_size.value() - (size_pre_headers - size_post_headers); @@ -107,6 +134,8 @@ namespace asynchost { pending.erase(pending.begin(), pending.begin() + used); } + + return true; } virtual void associate_incoming(const ccf::NodeId&) {} diff --git a/src/host/rpc_connections.h b/src/host/rpc_connections.h index 0e81a3f5a418..b52f95e60e0d 100644 --- a/src/host/rpc_connections.h +++ b/src/host/rpc_connections.h @@ -116,7 +116,7 @@ namespace asynchost cleanup(); } - void on_read(size_t len, uint8_t*& data, sockaddr) override + bool on_read(size_t len, uint8_t*& data, sockaddr) override { LOG_DEBUG_FMT("rpc read {}: {}", id, len); @@ -125,6 +125,8 @@ namespace asynchost parent.to_enclave, id, serializer::ByteRange{data, len}); + + return true; } void on_disconnect() override @@ -195,7 +197,7 @@ namespace asynchost } } - void on_read(size_t len, uint8_t*& data, sockaddr addr) override + bool on_read(size_t len, uint8_t*& data, sockaddr addr) override { // UDP connections don't have clients, it's all done in the server if constexpr (isUDP()) @@ -211,6 +213,8 @@ namespace asynchost addr_data, serializer::ByteRange{data, len}); } + + return true; } void cleanup() diff --git a/src/host/socket.h b/src/host/socket.h index ce1d7e99acb0..5c3bc2d56a24 100644 --- a/src/host/socket.h +++ b/src/host/socket.h @@ -30,7 +30,11 @@ namespace asynchost virtual ~SocketBehaviour() {} /// To be implemented by clients - virtual void on_read(size_t, uint8_t*&, sockaddr) {} + /// Return false to immediately disconnect socket. + virtual bool on_read(size_t, uint8_t*&, sockaddr) + { + return true; + } /// To be implemented by servers with connections virtual void on_accept(ConnType&) {} diff --git a/src/host/tcp.h b/src/host/tcp.h index 7b2f51a1d6a5..18cf169dcafc 100644 --- a/src/host/tcp.h +++ b/src/host/tcp.h @@ -789,12 +789,18 @@ namespace asynchost } uint8_t* p = (uint8_t*)buf->base; - behaviour->on_read((size_t)sz, p, {}); + const bool read_good = behaviour->on_read((size_t)sz, p, {}); if (p != nullptr) { on_free(buf); } + + if (!read_good) + { + behaviour->on_disconnect(); + return; + } } static void on_write(uv_write_t* req, int) diff --git a/tests/connections.py b/tests/connections.py index 5560715effb5..ffeed6911d38 100644 --- a/tests/connections.py +++ b/tests/connections.py @@ -16,6 +16,8 @@ import functools import httpx import os +import socket +import struct from infra.snp import IS_SNP from loguru import logger as LOG @@ -51,7 +53,7 @@ def interface_caps(i): } -def run(args): +def run_connection_caps_tests(args): # Listen on additional RPC interfaces with even lower session caps for i, node_spec in enumerate(args.nodes): caps = interface_caps(i) @@ -262,14 +264,118 @@ def create_connections_until_exhaustion( LOG.warning("Expected a fatal crash and saw none!") +@contextlib.contextmanager +def node_tcp_socket(node): + interface = node.n2n_interface + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((interface.host, interface.port)) + yield s + s.close() + + +def run_node_socket_robustness_tests(args): + with infra.network.network( + args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + network.start_and_open(args) + + primary, _ = network.find_nodes() + + # Protocol is: + # - 4 byte message size N (remainder is not processed until this many bytes arrive) + # - 8 byte message type (valid values are only 0, 1, or 2) + # - Sender node ID (length-prefixed string), consisting of: + # - 8 byte string length S + # - S bytes of string content + # - Message body, of N - 16 - S bytes + # Note number serialization is little-endian! + + def encode_msg( + msg_type=0, + sender="OtherNode", + body=b"", + sender_len_override=None, + total_len_override=None, + ): + b_type = struct.pack("