From e618e5193de87da72463de2b231ac5b2c90ce32c Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 9 Jan 2024 09:11:27 +0000 Subject: [PATCH 1/2] Discard malformed traffic sent to node-to-node ports (#5889) --- CHANGELOG.md | 8 +++ src/enclave/enclave.h | 10 +++- src/host/node_connections.h | 35 +++++++++++- src/host/rpc_connections.h | 8 ++- src/host/socket.h | 6 +- src/host/tcp.h | 8 ++- tests/connections.py | 110 +++++++++++++++++++++++++++++++++++- 7 files changed, 175 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8d4f9a68646..55c034225ca5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [5.0.0-dev12] + +[5.0.0-dev12]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev12 + +### Fixed + +- Nodes are now more robust to unexpected traffic on node-to-node ports (#5889). + ## [5.0.0-dev11] [5.0.0-dev11]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev11 diff --git a/src/enclave/enclave.h b/src/enclave/enclave.h index e328af3a30e3..26e7e329cc28 100644 --- a/src/enclave/enclave.h +++ b/src/enclave/enclave.h @@ -357,7 +357,15 @@ namespace ccf DISPATCHER_SET_MESSAGE_HANDLER( bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) { - node->recv_node_inbound(data, size); + try + { + node->recv_node_inbound(data, size); + } + catch (const std::exception& e) + { + LOG_DEBUG_FMT( + "Ignoring node_inbound message due to exception: {}", e.what()); + } }); DISPATCHER_SET_MESSAGE_HANDLER( diff --git a/src/host/node_connections.h b/src/host/node_connections.h index 0a6f3c29f661..bdabf8824aa3 100644 --- a/src/host/node_connections.h +++ b/src/host/node_connections.h @@ -35,7 +35,7 @@ namespace asynchost node(node) {} - void on_read(size_t len, uint8_t*& incoming, sockaddr) + bool on_read(size_t len, uint8_t*& incoming, sockaddr) override { LOG_DEBUG_FMT( "from node {} received {} bytes", @@ -71,8 +71,35 @@ namespace asynchost } const auto size_pre_headers = size; - auto msg_type = serialized::read(data, size); - ccf::NodeId from = serialized::read(data, size); + + ccf::NodeMsgType msg_type; + try + { + msg_type = serialized::read(data, size); + } + catch (const std::exception& e) + { + LOG_DEBUG_FMT( + "Received invalid node-to-node traffic. Unable to read message " + "type ({}). Closing connection.", + e.what()); + return false; + } + + ccf::NodeId from; + try + { + from = serialized::read(data, size); + } + catch (const std::exception& e) + { + LOG_DEBUG_FMT( + "Received invalid node-to-node traffic. Unable to read sender " + "node ID ({}). Closing connection.", + e.what()); + return false; + } + const auto size_post_headers = size; const size_t payload_size = msg_size.value() - (size_pre_headers - size_post_headers); @@ -107,6 +134,8 @@ namespace asynchost { pending.erase(pending.begin(), pending.begin() + used); } + + return true; } virtual void associate_incoming(const ccf::NodeId&) {} diff --git a/src/host/rpc_connections.h b/src/host/rpc_connections.h index 0e81a3f5a418..b52f95e60e0d 100644 --- a/src/host/rpc_connections.h +++ b/src/host/rpc_connections.h @@ -116,7 +116,7 @@ namespace asynchost cleanup(); } - void on_read(size_t len, uint8_t*& data, sockaddr) override + bool on_read(size_t len, uint8_t*& data, sockaddr) override { LOG_DEBUG_FMT("rpc read {}: {}", id, len); @@ -125,6 +125,8 @@ namespace asynchost parent.to_enclave, id, serializer::ByteRange{data, len}); + + return true; } void on_disconnect() override @@ -195,7 +197,7 @@ namespace asynchost } } - void on_read(size_t len, uint8_t*& data, sockaddr addr) override + bool on_read(size_t len, uint8_t*& data, sockaddr addr) override { // UDP connections don't have clients, it's all done in the server if constexpr (isUDP()) @@ -211,6 +213,8 @@ namespace asynchost addr_data, serializer::ByteRange{data, len}); } + + return true; } void cleanup() diff --git a/src/host/socket.h b/src/host/socket.h index ce1d7e99acb0..5c3bc2d56a24 100644 --- a/src/host/socket.h +++ b/src/host/socket.h @@ -30,7 +30,11 @@ namespace asynchost virtual ~SocketBehaviour() {} /// To be implemented by clients - virtual void on_read(size_t, uint8_t*&, sockaddr) {} + /// Return false to immediately disconnect socket. + virtual bool on_read(size_t, uint8_t*&, sockaddr) + { + return true; + } /// To be implemented by servers with connections virtual void on_accept(ConnType&) {} diff --git a/src/host/tcp.h b/src/host/tcp.h index 7b2f51a1d6a5..18cf169dcafc 100644 --- a/src/host/tcp.h +++ b/src/host/tcp.h @@ -789,12 +789,18 @@ namespace asynchost } uint8_t* p = (uint8_t*)buf->base; - behaviour->on_read((size_t)sz, p, {}); + const bool read_good = behaviour->on_read((size_t)sz, p, {}); if (p != nullptr) { on_free(buf); } + + if (!read_good) + { + behaviour->on_disconnect(); + return; + } } static void on_write(uv_write_t* req, int) diff --git a/tests/connections.py b/tests/connections.py index 5560715effb5..ffeed6911d38 100644 --- a/tests/connections.py +++ b/tests/connections.py @@ -16,6 +16,8 @@ import functools import httpx import os +import socket +import struct from infra.snp import IS_SNP from loguru import logger as LOG @@ -51,7 +53,7 @@ def interface_caps(i): } -def run(args): +def run_connection_caps_tests(args): # Listen on additional RPC interfaces with even lower session caps for i, node_spec in enumerate(args.nodes): caps = interface_caps(i) @@ -262,14 +264,118 @@ def create_connections_until_exhaustion( LOG.warning("Expected a fatal crash and saw none!") +@contextlib.contextmanager +def node_tcp_socket(node): + interface = node.n2n_interface + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((interface.host, interface.port)) + yield s + s.close() + + +def run_node_socket_robustness_tests(args): + with infra.network.network( + args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + network.start_and_open(args) + + primary, _ = network.find_nodes() + + # Protocol is: + # - 4 byte message size N (remainder is not processed until this many bytes arrive) + # - 8 byte message type (valid values are only 0, 1, or 2) + # - Sender node ID (length-prefixed string), consisting of: + # - 8 byte string length S + # - S bytes of string content + # - Message body, of N - 16 - S bytes + # Note number serialization is little-endian! + + def encode_msg( + msg_type=0, + sender="OtherNode", + body=b"", + sender_len_override=None, + total_len_override=None, + ): + b_type = struct.pack(" Date: Tue, 9 Jan 2024 09:50:58 +0000 Subject: [PATCH 2/2] Generate scenario from trace (#5875) --- src/consensus/aft/test/driver.h | 7 ++-- tla/trace2scen.py | 73 +++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 tla/trace2scen.py diff --git a/src/consensus/aft/test/driver.h b/src/consensus/aft/test/driver.h index 19f006832536..c5e23e833312 100644 --- a/src/consensus/aft/test/driver.h +++ b/src/consensus/aft/test/driver.h @@ -279,9 +279,7 @@ class RaftDriver if (_nodes.find(node_id) == _nodes.end()) { - throw std::runtime_error(fmt::format( - "Node {} does not exist yet. Use \"create_new_node, \"", - node_id)); + create_new_node(node_id_s); } configuration.try_emplace(node_id); @@ -1053,7 +1051,8 @@ class RaftDriver idx) << std::endl; throw std::runtime_error(fmt::format( - "Node not at expected commit idx ({}) on line {} : {}", + "Node {} not at expected commit idx ({}) on line {} : {}", + node_id, idx, std::to_string((int)lineno), _nodes.at(node_id).raft->get_committed_seqno())); diff --git a/tla/trace2scen.py b/tla/trace2scen.py new file mode 100644 index 000000000000..d67f85fc36d9 --- /dev/null +++ b/tla/trace2scen.py @@ -0,0 +1,73 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. + +import sys +import json +import os + +def comment(action): + return f"# {action['name']} {action['location']['module']}:{action['location']['beginLine']}" + +def term(ctx, pre): + return str(pre["currentTerm"][ctx['i']]) + +def noop(ctx, pre, post): + return ["# Noop"] + +MAP = { + "ClientRequest": lambda ctx, pre, post: ["replicate", term(ctx, pre), "42"], + "MCClientRequest": lambda ctx, pre, post: ["replicate", term(ctx, pre), "42"], + "CheckQuorum": lambda ctx, pre, post: ["periodic_one", ctx['i'], "110"], + "Timeout": lambda ctx, pre, post: ["periodic_one", ctx['i'], "110"], + "MCTimeout": lambda ctx, pre, post: ["periodic_one", ctx['i'], "110"], + "RequestVote": noop, + "AppendEntries": lambda _, __, ___: ["dispatch_all"], + "BecomeLeader": noop, + "SignCommittableMessages": lambda ctx, pre, post: ["emit_signature", term(ctx, pre)], + "MCSignCommittableMessages": lambda ctx, pre, post: ["emit_signature", term(ctx, pre)], + "ChangeConfigurationInt": lambda ctx, pre, post: ["replicate_new_configuration", term(ctx, pre), *ctx["newConfiguration"]], + "AdvanceCommitIndex": noop, + "HandleRequestVoteRequest": lambda _, __, ___: ["dispatch_all"], + "HandleRequestVoteResponse": noop, + "RejectAppendEntriesRequest": noop, + "ReturnToFollowerState": noop, + "AppendEntriesAlreadyDone": noop, + "RcvDropIgnoredMessage": noop, + "RcvUpdateTerm": noop, + "RcvRequestVoteRequest": noop, + "RcvRequestVoteResponse": noop, +} + +def post_commit(post): + return [["assert_commit_idx", node, str(idx)] for node, idx in post["commitIndex"].items()] + +def post_state(post): + entries = [] + for node, state in post["state"].items(): + if state == "Leader": + entries.append(["assert_is_primary", node]) + elif state == "Follower": + entries.append(["assert_is_backup", node]) + elif state == "Candidate": + entries.append(["assert_is_candidate", node]) + return entries + +def step_to_action(pre_state, action, post_state): + return os.linesep.join([ + comment(action), + ','.join(MAP[action['name']](action['context'], pre_state[1], post_state[1]))]) + +def asserts(pre_state, action, post_state, assert_gen): + return os.linesep.join([','.join(assertion) for assertion in assert_gen(post_state[1])]) + +if __name__ == "__main__": + with open(sys.argv[1]) as trace: + steps = json.load(trace)["action"] + initial_state = steps[0][0][1] + initial_node, = [node for node, log in initial_state["log"].items() if log] + print(f"start_node,{initial_node}") + print(f"emit_signature,2") + for step in steps: + print(step_to_action(*step)) + print(asserts(*step, post_state)) + print(asserts(*steps[-1], post_commit)) \ No newline at end of file