Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add and enforce specification invariant CommittableIndicesAreKnownSignaturesInv #5764

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions src/consensus/aft/test/committable_suffix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
#define DOCTEST_CONFIG_NO_SHORT_MACRO_NAMES
#include <doctest/doctest.h>

using AllSigsStore = aft::LoggingStubStoreSig;
using AllSigsAdaptor = aft::Adaptor<AllSigsStore>;

void keep_messages_for_multiple(
const std::set<ccf::NodeId>& targets,
aft::ChannelStubProxy::MessageList& messages,
Expand Down Expand Up @@ -85,10 +82,10 @@ void keep_earliest_append_entries_for_each_target(

#define TEST_DECLARE_NODE(N) \
ccf::NodeId node_id##N(#N); \
auto store##N = std::make_shared<AllSigsStore>(node_id##N); \
auto store##N = std::make_shared<Store>(node_id##N); \
TRaft r##N( \
raft_settings, \
std::make_unique<AllSigsAdaptor>(store##N), \
std::make_unique<Adaptor>(store##N), \
std::make_unique<aft::LedgerStubProxy>(node_id##N), \
std::make_shared<aft::ChannelStubProxy>(), \
std::make_shared<aft::State>(node_id##N), \
Expand Down
12 changes: 6 additions & 6 deletions src/consensus/aft/test/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ struct LedgerStubProxy_Mermaid : public aft::LedgerStubProxy
}
};

struct LoggingStubStoreSig_Mermaid : public aft::LoggingStubStoreSigConfig
struct LoggingStubStore_Mermaid : public aft::LoggingStubStoreConfig
{
using LoggingStubStoreSigConfig::LoggingStubStoreSigConfig;
using LoggingStubStoreConfig::LoggingStubStoreConfig;

void compact(aft::Index idx) override
{
RAFT_DRIVER_OUT << fmt::format(
" {}->>{}: [KV] compacting to {}", _id, _id, idx)
<< std::endl;
aft::LoggingStubStoreSigConfig::compact(idx);
aft::LoggingStubStoreConfig::compact(idx);
}

void rollback(const kv::TxID& tx_id, aft::Term t) override
Expand All @@ -85,21 +85,21 @@ struct LoggingStubStoreSig_Mermaid : public aft::LoggingStubStoreSigConfig
tx_id.version,
t)
<< std::endl;
aft::LoggingStubStoreSigConfig::rollback(tx_id, t);
aft::LoggingStubStoreConfig::rollback(tx_id, t);
}

void initialise_term(aft::Term t) override
{
RAFT_DRIVER_OUT << fmt::format(
" {}->>{}: [KV] initialising in term {}", _id, _id, t)
<< std::endl;
aft::LoggingStubStoreSigConfig::initialise_term(t);
aft::LoggingStubStoreConfig::initialise_term(t);
}
};

using ms = std::chrono::milliseconds;
using TRaft = aft::Aft<LedgerStubProxy_Mermaid>;
using Store = LoggingStubStoreSig_Mermaid;
using Store = LoggingStubStore_Mermaid;
eddyashton marked this conversation as resolved.
Show resolved Hide resolved
using Adaptor = aft::Adaptor<Store>;

aft::ChannelStubProxy* channel_stub_proxy(const TRaft& r)
Expand Down
50 changes: 18 additions & 32 deletions src/consensus/aft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,24 @@ namespace aft
std::lock_guard<std::mutex> lock(ledger_access);

// The payload that we eventually deserialise must include the
// ledger entry as well as the View and Index that identify it. In
// the real entries, they are nested in the payload and the IV. For
// test purposes, we just prefix them manually (to mirror the
// deserialisation in LoggingStubStore::ExecutionWrapper). We also
// size-prefix, so in a buffer of multiple of these messages we can
// extract each with get_entry
// ledger entry as well as the View and Index that identify it, and
// whether this is committable. In the real entries, they are nested in
// the payload and the IV. For test purposes, we just prefix them manually
// (to mirror the deserialisation in LoggingStubStore::ExecutionWrapper).
// We also size-prefix, so in a buffer of multiple of these messages we
// can extract each with get_entry
const size_t idx = ledger.size() + 1;
assert(idx == index);
auto additional_size = sizeof(size_t) + sizeof(term) + sizeof(index);
auto additional_size =
sizeof(size_t) + sizeof(bool) + sizeof(term) + sizeof(index);
std::vector<uint8_t> combined(additional_size);
{
uint8_t* data = combined.data();
serialized::write(
data,
additional_size,
(sizeof(term) + sizeof(index) + original.size()));
(sizeof(bool) + sizeof(term) + sizeof(index) + original.size()));
serialized::write(data, additional_size, globally_committable);
serialized::write(data, additional_size, term);
serialized::write(data, additional_size, index);
}
Expand Down Expand Up @@ -344,7 +346,6 @@ namespace aft
return kv::NoVersion;
}

template <kv::ApplyResult AR>
class ExecutionWrapper : public kv::AbstractExecutionWrapper
{
private:
Expand All @@ -366,11 +367,13 @@ namespace aft
const uint8_t* data = data_.data();
auto size = data_.size();

const auto committable = serialized::read<bool>(data, size);
term = serialized::read<aft::Term>(data, size);
index = serialized::read<kv::Version>(data, size);
entry = serialized::read(data, size, size);

result = AR;
result =
committable ? kv::ApplyResult::PASS_SIGNATURE : kv::ApplyResult::PASS;

if (expected_txid.has_value())
{
Expand Down Expand Up @@ -439,7 +442,7 @@ namespace aft
const std::optional<kv::TxID>& expected_txid = std::nullopt)
{
kv::ConsensusHookPtrs hooks = {};
return std::make_unique<ExecutionWrapper<kv::ApplyResult::PASS>>(
return std::make_unique<ExecutionWrapper>(
data, expected_txid, std::move(hooks));
}

Expand All @@ -451,27 +454,10 @@ namespace aft
void unset_flag(kv::AbstractStore::Flag) {}
};

class LoggingStubStoreSig : public LoggingStubStore
class LoggingStubStoreConfig : public LoggingStubStore
{
public:
LoggingStubStoreSig(ccf::NodeId id) : LoggingStubStore(id) {}

virtual std::unique_ptr<kv::AbstractExecutionWrapper> deserialize(
const std::vector<uint8_t>& data,
bool public_only = false,
const std::optional<kv::TxID>& expected_txid = std::nullopt) override
{
kv::ConsensusHookPtrs hooks = {};
return std::make_unique<
ExecutionWrapper<kv::ApplyResult::PASS_SIGNATURE>>(
data, expected_txid, std::move(hooks));
}
};

class LoggingStubStoreSigConfig : public LoggingStubStoreSig
{
public:
LoggingStubStoreSigConfig(ccf::NodeId id) : LoggingStubStoreSig(id) {}
LoggingStubStoreConfig(ccf::NodeId id) : LoggingStubStore(id) {}

virtual std::unique_ptr<kv::AbstractExecutionWrapper> deserialize(
const std::vector<uint8_t>& data,
Expand All @@ -482,6 +468,7 @@ namespace aft
// Read wrapping term and version
auto data_ = data.data();
auto size = data.size();
const auto committable = serialized::read<bool>(data_, size);
serialized::read<aft::Term>(data_, size);
auto version = serialized::read<kv::Version>(data_, size);
ReplicatedData r = nlohmann::json::parse(std::span{data_, size});
Expand All @@ -494,8 +481,7 @@ namespace aft
hooks.push_back(std::move(hook));
}

return std::make_unique<
ExecutionWrapper<kv::ApplyResult::PASS_SIGNATURE>>(
return std::make_unique<ExecutionWrapper>(
data, expected_txid, std::move(hooks));
}
};
Expand Down
8 changes: 4 additions & 4 deletions src/consensus/aft/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,19 +531,19 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
ccf::NodeId node_id0 = kv::test::PrimaryNodeId;
ccf::NodeId node_id1 = kv::test::FirstBackupNodeId;

auto kv_store0 = std::make_shared<SigStore>(node_id0);
auto kv_store1 = std::make_shared<SigStore>(node_id1);
auto kv_store0 = std::make_shared<Store>(node_id0);
auto kv_store1 = std::make_shared<Store>(node_id1);

TRaft r0(
raft_settings,
std::make_unique<SigAdaptor>(kv_store0),
std::make_unique<Adaptor>(kv_store0),
std::make_unique<aft::LedgerStubProxy>(node_id0),
std::make_shared<aft::ChannelStubProxy>(),
std::make_shared<aft::State>(node_id0),
nullptr);
TRaft r1(
raft_settings,
std::make_unique<SigAdaptor>(kv_store1),
std::make_unique<Adaptor>(kv_store1),
std::make_unique<aft::LedgerStubProxy>(node_id1),
std::make_shared<aft::ChannelStubProxy>(),
std::make_shared<aft::State>(node_id1),
Expand Down
3 changes: 0 additions & 3 deletions src/consensus/aft/test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ using TRaft = aft::Aft<aft::LedgerStubProxy>;
using Store = aft::LoggingStubStore;
using Adaptor = aft::Adaptor<Store>;

using SigStore = aft::LoggingStubStoreSig;
using SigAdaptor = aft::Adaptor<SigStore>;

static std::vector<uint8_t> cert;

static const ds::TimeString request_timeout_ = {"10ms"};
Expand Down
2 changes: 2 additions & 0 deletions tla/consensus/SIMccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ INVARIANTS

CommitCommittableIndices

CommittableIndicesAreKnownSignaturesInv

\* DebugInvLeaderCannotStepDown
\* DebugInvAnyCommitted
\* DebugInvAllCommitted
Expand Down
13 changes: 12 additions & 1 deletion tla/consensus/ccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,12 @@ NoConflictAppendEntriesRequest(i, j, m) ==
Max({c \in DOMAIN new_configs : c <= new_commit_index})
IN
/\ commitIndex' = [commitIndex EXCEPT ![i] = new_commit_index]
/\ committableIndices' = [ committableIndices EXCEPT ![i] = (@ \cup Len(log[i])..Len(log'[i])) \ 0..commitIndex'[i]]
\* see committable_indices.push_back(i) in raft.h:execute_append_entries_sync, guarded by case PASS_SIGNATURE
/\ committableIndices' =
[ committableIndices EXCEPT ![i] =
(@ \cup
{n \in Len(log[i])..Len(log'[i]) \ {0} : log'[i][n].contentType = TypeSignature})
\ 0..commitIndex'[i]]
/\ configurations' =
[configurations EXCEPT ![i] = RestrictDomain(new_configs, LAMBDA c : c >= new_conf_index)]
\* If we added a new configuration that we are in and were pending, we are now follower
Expand Down Expand Up @@ -1295,6 +1300,12 @@ CommitCommittableIndices ==
\A i \in Servers :
committableIndices[i] # {} => commitIndex[i] < Min(committableIndices[i])

CommittableIndicesAreKnownSignaturesInv ==
\A i \in Servers :
\A j \in committableIndices[i] :
/\ j \in DOMAIN(log[i])
/\ HasTypeSignature(log[i][j])

------------------------------------------------------------------------------
\* Properties

Expand Down