From 64435fdea6c56d941d7451a5e498cef49be94459 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 23 Aug 2024 11:45:30 +0400 Subject: [PATCH] send foreign proposals with local proposals --- bindings/dist/index.d.ts | 238 +++++++++++++++++ bindings/dist/index.js | 242 +++++++++++++++++ bindings/dist/types/Command.d.ts | 4 +- bindings/dist/types/ForeignProposalAtom.d.ts | 5 + bindings/dist/types/ForeignProposalAtom.js | 2 + .../dist/types/TransactionPoolRecord.d.ts | 1 + bindings/src/index.ts | 244 ++++++++++++++++++ bindings/src/types/Command.ts | 4 +- bindings/src/types/ForeignProposalAtom.ts | 7 + bindings/src/types/TransactionPoolRecord.ts | 1 + dan_layer/common_types/src/committee.rs | 1 - dan_layer/common_types/src/lib.rs | 2 +- .../common_types/src/substate_address.rs | 69 ++--- .../src/hotstuff/block_change_set.rs | 14 + .../src/hotstuff/on_message_validate.rs | 57 ++-- .../consensus/src/hotstuff/on_propose.rs | 106 ++++---- .../on_ready_to_vote_on_local_block.rs | 54 ++-- .../hotstuff/on_receive_foreign_proposal.rs | 103 +++----- .../src/hotstuff/on_receive_local_proposal.rs | 10 +- .../consensus/src/hotstuff/on_sync_request.rs | 17 +- dan_layer/consensus/src/hotstuff/worker.rs | 12 +- dan_layer/consensus/src/messages/proposal.rs | 36 ++- dan_layer/consensus_tests/src/consensus.rs | 9 +- .../consensus_tests/src/support/harness.rs | 29 ++- .../consensus_tests/src/support/logging.rs | 4 - .../consensus_tests/src/support/network.rs | 2 +- dan_layer/engine_types/src/fee_claim.rs | 21 +- dan_layer/engine_types/src/substate.rs | 33 ++- .../engine_types/src/transaction_receipt.rs | 21 +- dan_layer/p2p/proto/consensus.proto | 25 +- dan_layer/p2p/src/conversions/consensus.rs | 116 +++++---- .../up.sql | 31 ++- dan_layer/state_store_sqlite/src/reader.rs | 147 ++++++----- dan_layer/state_store_sqlite/src/schema.rs | 25 +- .../src/sql_models/block.rs | 12 +- .../src/sql_models/bookkeeping.rs | 70 +---- .../src/sql_models/foreign_parked_block.rs | 4 +- .../src/sql_models/foreign_proposal.rs | 132 ++++++++++ .../state_store_sqlite/src/sql_models/mod.rs | 2 + dan_layer/state_store_sqlite/src/writer.rs | 136 ++++++---- .../storage/src/consensus_models/block.rs | 30 +-- .../storage/src/consensus_models/command.rs | 6 +- .../storage/src/consensus_models/evidence.rs | 52 ++-- .../foreign_parked_proposal.rs | 30 +-- .../src/consensus_models/foreign_proposal.rs | 192 ++++++++------ .../src/consensus_models/transaction_pool.rs | 16 +- dan_layer/storage/src/state_store/mod.rs | 48 ++-- dan_layer/template_lib/src/hash.rs | 2 +- .../src/models/layer_one_commitment.rs | 22 +- dan_layer/wallet/sdk/src/apis/substate.rs | 2 +- networking/libp2p-messaging/src/handler.rs | 1 + 51 files changed, 1747 insertions(+), 702 deletions(-) create mode 100644 bindings/dist/types/ForeignProposalAtom.d.ts create mode 100644 bindings/dist/types/ForeignProposalAtom.js create mode 100644 bindings/src/types/ForeignProposalAtom.ts create mode 100644 dan_layer/state_store_sqlite/src/sql_models/foreign_proposal.rs diff --git a/bindings/dist/index.d.ts b/bindings/dist/index.d.ts index 39b829d49d..23698a2e36 100644 --- a/bindings/dist/index.d.ts +++ b/bindings/dist/index.d.ts @@ -586,3 +586,241 @@ export * from "./tari-indexer-client"; export * from "./validator-node-client"; export * from "./wallet-daemon-client"; export * from "./helpers/helpers"; +export * from "./types/AccessRule"; +export * from "./types/Account"; +export * from "./types/Amount"; +export * from "./types/ArgDef"; +export * from "./types/Arg"; +export * from "./types/AuthHook"; +export * from "./types/Block"; +export * from "./types/BucketId"; +export * from "./types/Claims"; +export * from "./types/Command"; +export * from "./types/CommitteeInfo"; +export * from "./types/CommitteeShardInfo"; +export * from "./types/Committee"; +export * from "./types/ComponentAccessRules"; +export * from "./types/ComponentAddress"; +export * from "./types/ComponentBody"; +export * from "./types/ComponentHeader"; +export * from "./types/ComponentKey"; +export * from "./types/ConfidentialClaim"; +export * from "./types/ConfidentialOutputStatement"; +export * from "./types/ConfidentialOutput"; +export * from "./types/ConfidentialStatement"; +export * from "./types/ConfidentialTransferInputSelection"; +export * from "./types/ConfidentialWithdrawProof"; +export * from "./types/Decision"; +export * from "./types/ElgamalVerifiableBalance"; +export * from "./types/EntityId"; +export * from "./types/Epoch"; +export * from "./types/Event"; +export * from "./types/Evidence"; +export * from "./types/ExecutedTransaction"; +export * from "./types/ExecuteResult"; +export * from "./types/FeeBreakdown"; +export * from "./types/FeeClaimAddress"; +export * from "./types/FeeClaim"; +export * from "./types/FeeCostBreakdown"; +export * from "./types/FeeReceipt"; +export * from "./types/FeeSource"; +export * from "./types/FinalizeResult"; +export * from "./types/ForeignProposalAtom"; +export * from "./types/ForeignProposalState"; +export * from "./types/ForeignProposal"; +export * from "./types/FunctionDef"; +export * from "./types/IndexedValue"; +export * from "./types/IndexedWellKnownTypes"; +export * from "./types/InstructionResult"; +export * from "./types/Instruction"; +export * from "./types/JrpcPermissions"; +export * from "./types/JrpcPermission"; +export * from "./types/LeaderFee"; +export * from "./types/LockFlag"; +export * from "./types/LogEntry"; +export * from "./types/LogLevel"; +export * from "./types/Metadata"; +export * from "./types/NetworkCommitteeInfo"; +export * from "./types/NodeHeight"; +export * from "./types/NonFungibleAddressContents"; +export * from "./types/NonFungibleAddress"; +export * from "./types/NonFungibleContainer"; +export * from "./types/NonFungibleId"; +export * from "./types/NonFungibleIndexAddress"; +export * from "./types/NonFungibleIndex"; +export * from "./types/NonFungibleToken"; +export * from "./types/NonFungible"; +export * from "./types/NumPreshards"; +export * from "./types/Ordering"; +export * from "./types/OwnerRule"; +export * from "./types/PeerAddress"; +export * from "./types/ProofId"; +export * from "./types/QuorumCertificate"; +export * from "./types/QuorumDecision"; +export * from "./types/RejectReason"; +export * from "./types/RequireRule"; +export * from "./types/ResourceAccessRules"; +export * from "./types/ResourceAddress"; +export * from "./types/ResourceContainer"; +export * from "./types/Resource"; +export * from "./types/ResourceType"; +export * from "./types/RestrictedAccessRule"; +export * from "./types/RuleRequirement"; +export * from "./types/ShardEvidence"; +export * from "./types/ShardGroup"; +export * from "./types/Shard"; +export * from "./types/SubstateAddress"; +export * from "./types/SubstateDestroyed"; +export * from "./types/SubstateDiff"; +export * from "./types/SubstateId"; +export * from "./types/SubstateLockFlag"; +export * from "./types/SubstateLockType"; +export * from "./types/SubstateRecord"; +export * from "./types/SubstateRequirement"; +export * from "./types/Substate"; +export * from "./types/SubstateType"; +export * from "./types/SubstateValue"; +export * from "./types/TemplateDef"; +export * from "./types/TemplateDefV1"; +export * from "./types/TransactionAtom"; +export * from "./types/TransactionPoolRecord"; +export * from "./types/TransactionPoolStage"; +export * from "./types/TransactionReceiptAddress"; +export * from "./types/TransactionReceipt"; +export * from "./types/TransactionResult"; +export * from "./types/TransactionSignature"; +export * from "./types/TransactionStatus"; +export * from "./types/Transaction"; +export * from "./types/Type"; +export * from "./types/UnclaimedConfidentialOutput"; +export * from "./types/UnsignedTransaction"; +export * from "./types/ValidatorSignature"; +export * from "./types/VaultId"; +export * from "./types/Vault"; +export * from "./types/VersionedSubstateIdLockIntent"; +export * from "./types/VersionedSubstateId"; +export * from "./types/ViewableBalanceProof"; +export * from "./base-node-client"; +export * from "./tari-indexer-client"; +export * from "./validator-node-client"; +export * from "./wallet-daemon-client"; +export * from "./helpers/helpers"; +export * from "./types/AccessRule"; +export * from "./types/Account"; +export * from "./types/Amount"; +export * from "./types/ArgDef"; +export * from "./types/Arg"; +export * from "./types/AuthHook"; +export * from "./types/Block"; +export * from "./types/BucketId"; +export * from "./types/Claims"; +export * from "./types/Command"; +export * from "./types/CommitteeInfo"; +export * from "./types/CommitteeShardInfo"; +export * from "./types/Committee"; +export * from "./types/ComponentAccessRules"; +export * from "./types/ComponentAddress"; +export * from "./types/ComponentBody"; +export * from "./types/ComponentHeader"; +export * from "./types/ComponentKey"; +export * from "./types/ConfidentialClaim"; +export * from "./types/ConfidentialOutputStatement"; +export * from "./types/ConfidentialOutput"; +export * from "./types/ConfidentialStatement"; +export * from "./types/ConfidentialTransferInputSelection"; +export * from "./types/ConfidentialWithdrawProof"; +export * from "./types/Decision"; +export * from "./types/ElgamalVerifiableBalance"; +export * from "./types/EntityId"; +export * from "./types/Epoch"; +export * from "./types/Event"; +export * from "./types/Evidence"; +export * from "./types/ExecutedTransaction"; +export * from "./types/ExecuteResult"; +export * from "./types/FeeBreakdown"; +export * from "./types/FeeClaimAddress"; +export * from "./types/FeeClaim"; +export * from "./types/FeeCostBreakdown"; +export * from "./types/FeeReceipt"; +export * from "./types/FeeSource"; +export * from "./types/FinalizeResult"; +export * from "./types/ForeignProposalAtom"; +export * from "./types/ForeignProposalState"; +export * from "./types/ForeignProposal"; +export * from "./types/FunctionDef"; +export * from "./types/IndexedValue"; +export * from "./types/IndexedWellKnownTypes"; +export * from "./types/InstructionResult"; +export * from "./types/Instruction"; +export * from "./types/JrpcPermissions"; +export * from "./types/JrpcPermission"; +export * from "./types/LeaderFee"; +export * from "./types/LockFlag"; +export * from "./types/LogEntry"; +export * from "./types/LogLevel"; +export * from "./types/Metadata"; +export * from "./types/NetworkCommitteeInfo"; +export * from "./types/NodeHeight"; +export * from "./types/NonFungibleAddressContents"; +export * from "./types/NonFungibleAddress"; +export * from "./types/NonFungibleContainer"; +export * from "./types/NonFungibleId"; +export * from "./types/NonFungibleIndexAddress"; +export * from "./types/NonFungibleIndex"; +export * from "./types/NonFungibleToken"; +export * from "./types/NonFungible"; +export * from "./types/NumPreshards"; +export * from "./types/Ordering"; +export * from "./types/OwnerRule"; +export * from "./types/PeerAddress"; +export * from "./types/ProofId"; +export * from "./types/QuorumCertificate"; +export * from "./types/QuorumDecision"; +export * from "./types/RejectReason"; +export * from "./types/RequireRule"; +export * from "./types/ResourceAccessRules"; +export * from "./types/ResourceAddress"; +export * from "./types/ResourceContainer"; +export * from "./types/Resource"; +export * from "./types/ResourceType"; +export * from "./types/RestrictedAccessRule"; +export * from "./types/RuleRequirement"; +export * from "./types/ShardEvidence"; +export * from "./types/ShardGroup"; +export * from "./types/Shard"; +export * from "./types/SubstateAddress"; +export * from "./types/SubstateDestroyed"; +export * from "./types/SubstateDiff"; +export * from "./types/SubstateId"; +export * from "./types/SubstateLockFlag"; +export * from "./types/SubstateLockType"; +export * from "./types/SubstateRecord"; +export * from "./types/SubstateRequirement"; +export * from "./types/Substate"; +export * from "./types/SubstateType"; +export * from "./types/SubstateValue"; +export * from "./types/TemplateDef"; +export * from "./types/TemplateDefV1"; +export * from "./types/TransactionAtom"; +export * from "./types/TransactionPoolRecord"; +export * from "./types/TransactionPoolStage"; +export * from "./types/TransactionReceiptAddress"; +export * from "./types/TransactionReceipt"; +export * from "./types/TransactionResult"; +export * from "./types/TransactionSignature"; +export * from "./types/TransactionStatus"; +export * from "./types/Transaction"; +export * from "./types/Type"; +export * from "./types/UnclaimedConfidentialOutput"; +export * from "./types/UnsignedTransaction"; +export * from "./types/ValidatorSignature"; +export * from "./types/VaultId"; +export * from "./types/Vault"; +export * from "./types/VersionedSubstateIdLockIntent"; +export * from "./types/VersionedSubstateId"; +export * from "./types/ViewableBalanceProof"; +export * from "./base-node-client"; +export * from "./tari-indexer-client"; +export * from "./validator-node-client"; +export * from "./wallet-daemon-client"; +export * from "./helpers/helpers"; diff --git a/bindings/dist/index.js b/bindings/dist/index.js index a560c71712..03df9a8f45 100644 --- a/bindings/dist/index.js +++ b/bindings/dist/index.js @@ -596,3 +596,245 @@ export * from "./tari-indexer-client"; export * from "./validator-node-client"; export * from "./wallet-daemon-client"; export * from "./helpers/helpers"; +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause +export * from "./types/AccessRule"; +export * from "./types/Account"; +export * from "./types/Amount"; +export * from "./types/ArgDef"; +export * from "./types/Arg"; +export * from "./types/AuthHook"; +export * from "./types/Block"; +export * from "./types/BucketId"; +export * from "./types/Claims"; +export * from "./types/Command"; +export * from "./types/CommitteeInfo"; +export * from "./types/CommitteeShardInfo"; +export * from "./types/Committee"; +export * from "./types/ComponentAccessRules"; +export * from "./types/ComponentAddress"; +export * from "./types/ComponentBody"; +export * from "./types/ComponentHeader"; +export * from "./types/ComponentKey"; +export * from "./types/ConfidentialClaim"; +export * from "./types/ConfidentialOutputStatement"; +export * from "./types/ConfidentialOutput"; +export * from "./types/ConfidentialStatement"; +export * from "./types/ConfidentialTransferInputSelection"; +export * from "./types/ConfidentialWithdrawProof"; +export * from "./types/Decision"; +export * from "./types/ElgamalVerifiableBalance"; +export * from "./types/EntityId"; +export * from "./types/Epoch"; +export * from "./types/Event"; +export * from "./types/Evidence"; +export * from "./types/ExecutedTransaction"; +export * from "./types/ExecuteResult"; +export * from "./types/FeeBreakdown"; +export * from "./types/FeeClaimAddress"; +export * from "./types/FeeClaim"; +export * from "./types/FeeCostBreakdown"; +export * from "./types/FeeReceipt"; +export * from "./types/FeeSource"; +export * from "./types/FinalizeResult"; +export * from "./types/ForeignProposalAtom"; +export * from "./types/ForeignProposalState"; +export * from "./types/ForeignProposal"; +export * from "./types/FunctionDef"; +export * from "./types/IndexedValue"; +export * from "./types/IndexedWellKnownTypes"; +export * from "./types/InstructionResult"; +export * from "./types/Instruction"; +export * from "./types/JrpcPermissions"; +export * from "./types/JrpcPermission"; +export * from "./types/LeaderFee"; +export * from "./types/LockFlag"; +export * from "./types/LogEntry"; +export * from "./types/LogLevel"; +export * from "./types/Metadata"; +export * from "./types/NetworkCommitteeInfo"; +export * from "./types/NodeHeight"; +export * from "./types/NonFungibleAddressContents"; +export * from "./types/NonFungibleAddress"; +export * from "./types/NonFungibleContainer"; +export * from "./types/NonFungibleId"; +export * from "./types/NonFungibleIndexAddress"; +export * from "./types/NonFungibleIndex"; +export * from "./types/NonFungibleToken"; +export * from "./types/NonFungible"; +export * from "./types/NumPreshards"; +export * from "./types/Ordering"; +export * from "./types/OwnerRule"; +export * from "./types/PeerAddress"; +export * from "./types/ProofId"; +export * from "./types/QuorumCertificate"; +export * from "./types/QuorumDecision"; +export * from "./types/RejectReason"; +export * from "./types/RequireRule"; +export * from "./types/ResourceAccessRules"; +export * from "./types/ResourceAddress"; +export * from "./types/ResourceContainer"; +export * from "./types/Resource"; +export * from "./types/ResourceType"; +export * from "./types/RestrictedAccessRule"; +export * from "./types/RuleRequirement"; +export * from "./types/ShardEvidence"; +export * from "./types/ShardGroup"; +export * from "./types/Shard"; +export * from "./types/SubstateAddress"; +export * from "./types/SubstateDestroyed"; +export * from "./types/SubstateDiff"; +export * from "./types/SubstateId"; +export * from "./types/SubstateLockFlag"; +export * from "./types/SubstateLockType"; +export * from "./types/SubstateRecord"; +export * from "./types/SubstateRequirement"; +export * from "./types/Substate"; +export * from "./types/SubstateType"; +export * from "./types/SubstateValue"; +export * from "./types/TemplateDef"; +export * from "./types/TemplateDefV1"; +export * from "./types/TransactionAtom"; +export * from "./types/TransactionPoolRecord"; +export * from "./types/TransactionPoolStage"; +export * from "./types/TransactionReceiptAddress"; +export * from "./types/TransactionReceipt"; +export * from "./types/TransactionResult"; +export * from "./types/TransactionSignature"; +export * from "./types/TransactionStatus"; +export * from "./types/Transaction"; +export * from "./types/Type"; +export * from "./types/UnclaimedConfidentialOutput"; +export * from "./types/UnsignedTransaction"; +export * from "./types/ValidatorSignature"; +export * from "./types/VaultId"; +export * from "./types/Vault"; +export * from "./types/VersionedSubstateIdLockIntent"; +export * from "./types/VersionedSubstateId"; +export * from "./types/ViewableBalanceProof"; +export * from "./base-node-client"; +export * from "./tari-indexer-client"; +export * from "./validator-node-client"; +export * from "./wallet-daemon-client"; +export * from "./helpers/helpers"; +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause +export * from "./types/AccessRule"; +export * from "./types/Account"; +export * from "./types/Amount"; +export * from "./types/ArgDef"; +export * from "./types/Arg"; +export * from "./types/AuthHook"; +export * from "./types/Block"; +export * from "./types/BucketId"; +export * from "./types/Claims"; +export * from "./types/Command"; +export * from "./types/CommitteeInfo"; +export * from "./types/CommitteeShardInfo"; +export * from "./types/Committee"; +export * from "./types/ComponentAccessRules"; +export * from "./types/ComponentAddress"; +export * from "./types/ComponentBody"; +export * from "./types/ComponentHeader"; +export * from "./types/ComponentKey"; +export * from "./types/ConfidentialClaim"; +export * from "./types/ConfidentialOutputStatement"; +export * from "./types/ConfidentialOutput"; +export * from "./types/ConfidentialStatement"; +export * from "./types/ConfidentialTransferInputSelection"; +export * from "./types/ConfidentialWithdrawProof"; +export * from "./types/Decision"; +export * from "./types/ElgamalVerifiableBalance"; +export * from "./types/EntityId"; +export * from "./types/Epoch"; +export * from "./types/Event"; +export * from "./types/Evidence"; +export * from "./types/ExecutedTransaction"; +export * from "./types/ExecuteResult"; +export * from "./types/FeeBreakdown"; +export * from "./types/FeeClaimAddress"; +export * from "./types/FeeClaim"; +export * from "./types/FeeCostBreakdown"; +export * from "./types/FeeReceipt"; +export * from "./types/FeeSource"; +export * from "./types/FinalizeResult"; +export * from "./types/ForeignProposalAtom"; +export * from "./types/ForeignProposalState"; +export * from "./types/ForeignProposal"; +export * from "./types/FunctionDef"; +export * from "./types/IndexedValue"; +export * from "./types/IndexedWellKnownTypes"; +export * from "./types/InstructionResult"; +export * from "./types/Instruction"; +export * from "./types/JrpcPermissions"; +export * from "./types/JrpcPermission"; +export * from "./types/LeaderFee"; +export * from "./types/LockFlag"; +export * from "./types/LogEntry"; +export * from "./types/LogLevel"; +export * from "./types/Metadata"; +export * from "./types/NetworkCommitteeInfo"; +export * from "./types/NodeHeight"; +export * from "./types/NonFungibleAddressContents"; +export * from "./types/NonFungibleAddress"; +export * from "./types/NonFungibleContainer"; +export * from "./types/NonFungibleId"; +export * from "./types/NonFungibleIndexAddress"; +export * from "./types/NonFungibleIndex"; +export * from "./types/NonFungibleToken"; +export * from "./types/NonFungible"; +export * from "./types/NumPreshards"; +export * from "./types/Ordering"; +export * from "./types/OwnerRule"; +export * from "./types/PeerAddress"; +export * from "./types/ProofId"; +export * from "./types/QuorumCertificate"; +export * from "./types/QuorumDecision"; +export * from "./types/RejectReason"; +export * from "./types/RequireRule"; +export * from "./types/ResourceAccessRules"; +export * from "./types/ResourceAddress"; +export * from "./types/ResourceContainer"; +export * from "./types/Resource"; +export * from "./types/ResourceType"; +export * from "./types/RestrictedAccessRule"; +export * from "./types/RuleRequirement"; +export * from "./types/ShardEvidence"; +export * from "./types/ShardGroup"; +export * from "./types/Shard"; +export * from "./types/SubstateAddress"; +export * from "./types/SubstateDestroyed"; +export * from "./types/SubstateDiff"; +export * from "./types/SubstateId"; +export * from "./types/SubstateLockFlag"; +export * from "./types/SubstateLockType"; +export * from "./types/SubstateRecord"; +export * from "./types/SubstateRequirement"; +export * from "./types/Substate"; +export * from "./types/SubstateType"; +export * from "./types/SubstateValue"; +export * from "./types/TemplateDef"; +export * from "./types/TemplateDefV1"; +export * from "./types/TransactionAtom"; +export * from "./types/TransactionPoolRecord"; +export * from "./types/TransactionPoolStage"; +export * from "./types/TransactionReceiptAddress"; +export * from "./types/TransactionReceipt"; +export * from "./types/TransactionResult"; +export * from "./types/TransactionSignature"; +export * from "./types/TransactionStatus"; +export * from "./types/Transaction"; +export * from "./types/Type"; +export * from "./types/UnclaimedConfidentialOutput"; +export * from "./types/UnsignedTransaction"; +export * from "./types/ValidatorSignature"; +export * from "./types/VaultId"; +export * from "./types/Vault"; +export * from "./types/VersionedSubstateIdLockIntent"; +export * from "./types/VersionedSubstateId"; +export * from "./types/ViewableBalanceProof"; +export * from "./base-node-client"; +export * from "./tari-indexer-client"; +export * from "./validator-node-client"; +export * from "./wallet-daemon-client"; +export * from "./helpers/helpers"; diff --git a/bindings/dist/types/Command.d.ts b/bindings/dist/types/Command.d.ts index dac5ed31f6..07fb1dfb50 100644 --- a/bindings/dist/types/Command.d.ts +++ b/bindings/dist/types/Command.d.ts @@ -1,4 +1,4 @@ -import type { ForeignProposal } from "./ForeignProposal"; +import type { ForeignProposalAtom } from "./ForeignProposalAtom"; import type { TransactionAtom } from "./TransactionAtom"; export type Command = { LocalOnly: TransactionAtom; @@ -17,5 +17,5 @@ export type Command = { } | { SomeAccept: TransactionAtom; } | { - ForeignProposal: ForeignProposal; + ForeignProposal: ForeignProposalAtom; } | "EndEpoch"; diff --git a/bindings/dist/types/ForeignProposalAtom.d.ts b/bindings/dist/types/ForeignProposalAtom.d.ts new file mode 100644 index 0000000000..63fe390030 --- /dev/null +++ b/bindings/dist/types/ForeignProposalAtom.d.ts @@ -0,0 +1,5 @@ +export interface ForeignProposalAtom { + block_id: string; + shard_group: number; + base_layer_block_height: number; +} diff --git a/bindings/dist/types/ForeignProposalAtom.js b/bindings/dist/types/ForeignProposalAtom.js new file mode 100644 index 0000000000..e5b481d1ea --- /dev/null +++ b/bindings/dist/types/ForeignProposalAtom.js @@ -0,0 +1,2 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +export {}; diff --git a/bindings/dist/types/TransactionPoolRecord.d.ts b/bindings/dist/types/TransactionPoolRecord.d.ts index 025d38ada9..ce0482b19d 100644 --- a/bindings/dist/types/TransactionPoolRecord.d.ts +++ b/bindings/dist/types/TransactionPoolRecord.d.ts @@ -5,6 +5,7 @@ import type { TransactionPoolStage } from "./TransactionPoolStage"; export interface TransactionPoolRecord { transaction_id: string; evidence: Evidence; + remote_evidence: Evidence | null; transaction_fee: number; leader_fee: LeaderFee | null; stage: TransactionPoolStage; diff --git a/bindings/src/index.ts b/bindings/src/index.ts index fc1d49f87f..e492569251 100644 --- a/bindings/src/index.ts +++ b/bindings/src/index.ts @@ -601,3 +601,247 @@ export * from "./tari-indexer-client"; export * from "./validator-node-client"; export * from "./wallet-daemon-client"; export * from "./helpers/helpers"; +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +export * from "./types/AccessRule"; +export * from "./types/Account"; +export * from "./types/Amount"; +export * from "./types/ArgDef"; +export * from "./types/Arg"; +export * from "./types/AuthHook"; +export * from "./types/Block"; +export * from "./types/BucketId"; +export * from "./types/Claims"; +export * from "./types/Command"; +export * from "./types/CommitteeInfo"; +export * from "./types/CommitteeShardInfo"; +export * from "./types/Committee"; +export * from "./types/ComponentAccessRules"; +export * from "./types/ComponentAddress"; +export * from "./types/ComponentBody"; +export * from "./types/ComponentHeader"; +export * from "./types/ComponentKey"; +export * from "./types/ConfidentialClaim"; +export * from "./types/ConfidentialOutputStatement"; +export * from "./types/ConfidentialOutput"; +export * from "./types/ConfidentialStatement"; +export * from "./types/ConfidentialTransferInputSelection"; +export * from "./types/ConfidentialWithdrawProof"; +export * from "./types/Decision"; +export * from "./types/ElgamalVerifiableBalance"; +export * from "./types/EntityId"; +export * from "./types/Epoch"; +export * from "./types/Event"; +export * from "./types/Evidence"; +export * from "./types/ExecutedTransaction"; +export * from "./types/ExecuteResult"; +export * from "./types/FeeBreakdown"; +export * from "./types/FeeClaimAddress"; +export * from "./types/FeeClaim"; +export * from "./types/FeeCostBreakdown"; +export * from "./types/FeeReceipt"; +export * from "./types/FeeSource"; +export * from "./types/FinalizeResult"; +export * from "./types/ForeignProposalAtom"; +export * from "./types/ForeignProposalState"; +export * from "./types/ForeignProposal"; +export * from "./types/FunctionDef"; +export * from "./types/IndexedValue"; +export * from "./types/IndexedWellKnownTypes"; +export * from "./types/InstructionResult"; +export * from "./types/Instruction"; +export * from "./types/JrpcPermissions"; +export * from "./types/JrpcPermission"; +export * from "./types/LeaderFee"; +export * from "./types/LockFlag"; +export * from "./types/LogEntry"; +export * from "./types/LogLevel"; +export * from "./types/Metadata"; +export * from "./types/NetworkCommitteeInfo"; +export * from "./types/NodeHeight"; +export * from "./types/NonFungibleAddressContents"; +export * from "./types/NonFungibleAddress"; +export * from "./types/NonFungibleContainer"; +export * from "./types/NonFungibleId"; +export * from "./types/NonFungibleIndexAddress"; +export * from "./types/NonFungibleIndex"; +export * from "./types/NonFungibleToken"; +export * from "./types/NonFungible"; +export * from "./types/NumPreshards"; +export * from "./types/Ordering"; +export * from "./types/OwnerRule"; +export * from "./types/PeerAddress"; +export * from "./types/ProofId"; +export * from "./types/QuorumCertificate"; +export * from "./types/QuorumDecision"; +export * from "./types/RejectReason"; +export * from "./types/RequireRule"; +export * from "./types/ResourceAccessRules"; +export * from "./types/ResourceAddress"; +export * from "./types/ResourceContainer"; +export * from "./types/Resource"; +export * from "./types/ResourceType"; +export * from "./types/RestrictedAccessRule"; +export * from "./types/RuleRequirement"; +export * from "./types/ShardEvidence"; +export * from "./types/ShardGroup"; +export * from "./types/Shard"; +export * from "./types/SubstateAddress"; +export * from "./types/SubstateDestroyed"; +export * from "./types/SubstateDiff"; +export * from "./types/SubstateId"; +export * from "./types/SubstateLockFlag"; +export * from "./types/SubstateLockType"; +export * from "./types/SubstateRecord"; +export * from "./types/SubstateRequirement"; +export * from "./types/Substate"; +export * from "./types/SubstateType"; +export * from "./types/SubstateValue"; +export * from "./types/TemplateDef"; +export * from "./types/TemplateDefV1"; +export * from "./types/TransactionAtom"; +export * from "./types/TransactionPoolRecord"; +export * from "./types/TransactionPoolStage"; +export * from "./types/TransactionReceiptAddress"; +export * from "./types/TransactionReceipt"; +export * from "./types/TransactionResult"; +export * from "./types/TransactionSignature"; +export * from "./types/TransactionStatus"; +export * from "./types/Transaction"; +export * from "./types/Type"; +export * from "./types/UnclaimedConfidentialOutput"; +export * from "./types/UnsignedTransaction"; +export * from "./types/ValidatorSignature"; +export * from "./types/VaultId"; +export * from "./types/Vault"; +export * from "./types/VersionedSubstateIdLockIntent"; +export * from "./types/VersionedSubstateId"; +export * from "./types/ViewableBalanceProof"; +export * from "./base-node-client"; +export * from "./tari-indexer-client"; +export * from "./validator-node-client"; +export * from "./wallet-daemon-client"; +export * from "./helpers/helpers"; +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +export * from "./types/AccessRule"; +export * from "./types/Account"; +export * from "./types/Amount"; +export * from "./types/ArgDef"; +export * from "./types/Arg"; +export * from "./types/AuthHook"; +export * from "./types/Block"; +export * from "./types/BucketId"; +export * from "./types/Claims"; +export * from "./types/Command"; +export * from "./types/CommitteeInfo"; +export * from "./types/CommitteeShardInfo"; +export * from "./types/Committee"; +export * from "./types/ComponentAccessRules"; +export * from "./types/ComponentAddress"; +export * from "./types/ComponentBody"; +export * from "./types/ComponentHeader"; +export * from "./types/ComponentKey"; +export * from "./types/ConfidentialClaim"; +export * from "./types/ConfidentialOutputStatement"; +export * from "./types/ConfidentialOutput"; +export * from "./types/ConfidentialStatement"; +export * from "./types/ConfidentialTransferInputSelection"; +export * from "./types/ConfidentialWithdrawProof"; +export * from "./types/Decision"; +export * from "./types/ElgamalVerifiableBalance"; +export * from "./types/EntityId"; +export * from "./types/Epoch"; +export * from "./types/Event"; +export * from "./types/Evidence"; +export * from "./types/ExecutedTransaction"; +export * from "./types/ExecuteResult"; +export * from "./types/FeeBreakdown"; +export * from "./types/FeeClaimAddress"; +export * from "./types/FeeClaim"; +export * from "./types/FeeCostBreakdown"; +export * from "./types/FeeReceipt"; +export * from "./types/FeeSource"; +export * from "./types/FinalizeResult"; +export * from "./types/ForeignProposalAtom"; +export * from "./types/ForeignProposalState"; +export * from "./types/ForeignProposal"; +export * from "./types/FunctionDef"; +export * from "./types/IndexedValue"; +export * from "./types/IndexedWellKnownTypes"; +export * from "./types/InstructionResult"; +export * from "./types/Instruction"; +export * from "./types/JrpcPermissions"; +export * from "./types/JrpcPermission"; +export * from "./types/LeaderFee"; +export * from "./types/LockFlag"; +export * from "./types/LogEntry"; +export * from "./types/LogLevel"; +export * from "./types/Metadata"; +export * from "./types/NetworkCommitteeInfo"; +export * from "./types/NodeHeight"; +export * from "./types/NonFungibleAddressContents"; +export * from "./types/NonFungibleAddress"; +export * from "./types/NonFungibleContainer"; +export * from "./types/NonFungibleId"; +export * from "./types/NonFungibleIndexAddress"; +export * from "./types/NonFungibleIndex"; +export * from "./types/NonFungibleToken"; +export * from "./types/NonFungible"; +export * from "./types/NumPreshards"; +export * from "./types/Ordering"; +export * from "./types/OwnerRule"; +export * from "./types/PeerAddress"; +export * from "./types/ProofId"; +export * from "./types/QuorumCertificate"; +export * from "./types/QuorumDecision"; +export * from "./types/RejectReason"; +export * from "./types/RequireRule"; +export * from "./types/ResourceAccessRules"; +export * from "./types/ResourceAddress"; +export * from "./types/ResourceContainer"; +export * from "./types/Resource"; +export * from "./types/ResourceType"; +export * from "./types/RestrictedAccessRule"; +export * from "./types/RuleRequirement"; +export * from "./types/ShardEvidence"; +export * from "./types/ShardGroup"; +export * from "./types/Shard"; +export * from "./types/SubstateAddress"; +export * from "./types/SubstateDestroyed"; +export * from "./types/SubstateDiff"; +export * from "./types/SubstateId"; +export * from "./types/SubstateLockFlag"; +export * from "./types/SubstateLockType"; +export * from "./types/SubstateRecord"; +export * from "./types/SubstateRequirement"; +export * from "./types/Substate"; +export * from "./types/SubstateType"; +export * from "./types/SubstateValue"; +export * from "./types/TemplateDef"; +export * from "./types/TemplateDefV1"; +export * from "./types/TransactionAtom"; +export * from "./types/TransactionPoolRecord"; +export * from "./types/TransactionPoolStage"; +export * from "./types/TransactionReceiptAddress"; +export * from "./types/TransactionReceipt"; +export * from "./types/TransactionResult"; +export * from "./types/TransactionSignature"; +export * from "./types/TransactionStatus"; +export * from "./types/Transaction"; +export * from "./types/Type"; +export * from "./types/UnclaimedConfidentialOutput"; +export * from "./types/UnsignedTransaction"; +export * from "./types/ValidatorSignature"; +export * from "./types/VaultId"; +export * from "./types/Vault"; +export * from "./types/VersionedSubstateIdLockIntent"; +export * from "./types/VersionedSubstateId"; +export * from "./types/ViewableBalanceProof"; +export * from "./base-node-client"; +export * from "./tari-indexer-client"; +export * from "./validator-node-client"; +export * from "./wallet-daemon-client"; +export * from "./helpers/helpers"; diff --git a/bindings/src/types/Command.ts b/bindings/src/types/Command.ts index 8afe235f7b..cf2c8be199 100644 --- a/bindings/src/types/Command.ts +++ b/bindings/src/types/Command.ts @@ -1,5 +1,5 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -import type { ForeignProposal } from "./ForeignProposal"; +import type { ForeignProposalAtom } from "./ForeignProposalAtom"; import type { TransactionAtom } from "./TransactionAtom"; export type Command = @@ -11,5 +11,5 @@ export type Command = | { LocalAccept: TransactionAtom } | { AllAccept: TransactionAtom } | { SomeAccept: TransactionAtom } - | { ForeignProposal: ForeignProposal } + | { ForeignProposal: ForeignProposalAtom } | "EndEpoch"; diff --git a/bindings/src/types/ForeignProposalAtom.ts b/bindings/src/types/ForeignProposalAtom.ts new file mode 100644 index 0000000000..301c086c08 --- /dev/null +++ b/bindings/src/types/ForeignProposalAtom.ts @@ -0,0 +1,7 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export interface ForeignProposalAtom { + block_id: string; + shard_group: number; + base_layer_block_height: number; +} diff --git a/bindings/src/types/TransactionPoolRecord.ts b/bindings/src/types/TransactionPoolRecord.ts index 533d961081..1a36cdab2f 100644 --- a/bindings/src/types/TransactionPoolRecord.ts +++ b/bindings/src/types/TransactionPoolRecord.ts @@ -7,6 +7,7 @@ import type { TransactionPoolStage } from "./TransactionPoolStage"; export interface TransactionPoolRecord { transaction_id: string; evidence: Evidence; + remote_evidence: Evidence | null; transaction_fee: number; leader_fee: LeaderFee | null; stage: TransactionPoolStage; diff --git a/dan_layer/common_types/src/committee.rs b/dan_layer/common_types/src/committee.rs index 81383ca5d7..79dac7d11b 100644 --- a/dan_layer/common_types/src/committee.rs +++ b/dan_layer/common_types/src/committee.rs @@ -216,7 +216,6 @@ impl CommitteeInfo { self.shard_group.to_substate_address_range(self.num_shards) } - // TODO: change these to take in a SubstateId pub fn includes_substate_address(&self, substate_address: &SubstateAddress) -> bool { let s = substate_address.to_shard(self.num_shards); self.shard_group.contains(&s) diff --git a/dan_layer/common_types/src/lib.rs b/dan_layer/common_types/src/lib.rs index 9105b56319..a3d67c423a 100644 --- a/dan_layer/common_types/src/lib.rs +++ b/dan_layer/common_types/src/lib.rs @@ -27,7 +27,7 @@ pub use node_addressable::*; pub mod services; mod substate_address; -pub use substate_address::SubstateAddress; +pub use substate_address::*; pub mod substate_type; diff --git a/dan_layer/common_types/src/substate_address.rs b/dan_layer/common_types/src/substate_address.rs index edc578833b..d373a64689 100644 --- a/dan_layer/common_types/src/substate_address.rs +++ b/dan_layer/common_types/src/substate_address.rs @@ -15,13 +15,8 @@ use tari_crypto::tari_utilities::{ hex::{from_hex, Hex}, ByteArray, }; -use tari_engine_types::{ - hashing::{hasher32, EngineHashDomainLabel}, - serde_with, - substate::SubstateId, - transaction_receipt::TransactionReceiptAddress, -}; -use tari_template_lib::{models::ObjectKey, Hash}; +use tari_engine_types::{serde_with, substate::SubstateId, transaction_receipt::TransactionReceiptAddress}; +use tari_template_lib::models::ObjectKey; use crate::{shard::Shard, uint::U256, NumPreshards, ShardGroup}; @@ -40,45 +35,16 @@ pub struct SubstateAddress( impl SubstateAddress { pub const LENGTH: usize = ObjectKey::LENGTH + size_of::(); - /// Defines the mapping of SubstateId to SubstateAddress + /// Defines the mapping of SubstateId,version to SubstateAddress pub fn from_substate_id(id: &SubstateId, version: u32) -> Self { - match id { - SubstateId::Component(id) => Self::from_object_key(id.as_object_key(), version), - SubstateId::Resource(id) => Self::from_object_key(id.as_object_key(), version), - SubstateId::Vault(id) => Self::from_object_key(id.as_object_key(), version), - SubstateId::NonFungible(id) => { - let key = hasher32(EngineHashDomainLabel::NonFungibleId) - .chain(id.resource_address()) - .chain(id.id()) - .result() - .trailing_bytes() - .into(); - - Self::from_object_key(&ObjectKey::new(id.resource_address().as_entity_id(), key), version) - }, - SubstateId::NonFungibleIndex(id) => { - let key = hasher32(EngineHashDomainLabel::NonFungibleIndex) - .chain(id.resource_address().as_object_key()) - .chain(&id.index()) - .result() - .trailing_bytes() - .into(); - Self::from_object_key(&ObjectKey::new(id.resource_address().as_entity_id(), key), version) - }, - - // These should only have a version of 0, however the address should account for the version argument passed - // in. For example, if querying one of these substates with a version > 0 then the substate will not exist. - SubstateId::UnclaimedConfidentialOutput(id) => Self::from_hash(id.hash(), version), - SubstateId::TransactionReceipt(id) => Self::from_hash(id.hash(), version), - SubstateId::FeeClaim(id) => Self::from_hash(id.hash(), version), - } + Self::from_object_key(&id.to_object_key(), version) } pub fn for_transaction_receipt(tx_receipt: TransactionReceiptAddress) -> Self { Self::from_substate_id(&tx_receipt.into(), 0) } - fn from_object_key(object_key: &ObjectKey, version: u32) -> Self { + pub fn from_object_key(object_key: &ObjectKey, version: u32) -> Self { // concatenate (entity_id, component_key), and version let mut buf = [0u8; SubstateAddress::LENGTH]; buf[..ObjectKey::LENGTH].copy_from_slice(object_key); @@ -87,17 +53,6 @@ impl SubstateAddress { Self(buf) } - fn from_hash(hash: &Hash, version: u32) -> Self { - // let new_addr = hasher32(EngineHashDomainLabel::SubstateAddress) - // .chain(hash) - // .chain(&version) - // .result(); - let mut buf = [0u8; SubstateAddress::LENGTH]; - buf[..ObjectKey::LENGTH].copy_from_slice(hash); - buf[ObjectKey::LENGTH..].copy_from_slice(&version.to_le_bytes()); - Self(buf) - } - pub fn as_bytes(&self) -> &[u8] { &self.0 } @@ -106,11 +61,11 @@ impl SubstateAddress { if bytes.len() != SubstateAddress::LENGTH { return Err(FixedHashSizeError); } - let hash = Hash::try_from(&bytes[..ObjectKey::LENGTH]).map_err(|_| FixedHashSizeError)?; + let key = ObjectKey::try_from(&bytes[..ObjectKey::LENGTH]).map_err(|_| FixedHashSizeError)?; let mut v_buf = [0u8; size_of::()]; v_buf.copy_from_slice(&bytes[ObjectKey::LENGTH..]); let version = u32::from_le_bytes(v_buf); - Ok(Self::from_hash(&hash, version)) + Ok(Self::from_object_key(&key, version)) } pub fn is_zero(&self) -> bool { @@ -154,6 +109,16 @@ impl SubstateAddress { Self(buf) } + pub fn object_key_bytes(&self) -> &[u8] { + &self.0[..ObjectKey::LENGTH] + } + + pub fn to_version(&self) -> u32 { + let mut buf = [0u8; size_of::()]; + buf.copy_from_slice(&self.0[ObjectKey::LENGTH..]); + u32::from_le_bytes(buf) + } + pub fn to_u256(&self) -> U256 { let mut buf = [0u8; ObjectKey::LENGTH]; buf.copy_from_slice(&self.0[..ObjectKey::LENGTH]); diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index 1f1bb14b21..956f08ac70 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -10,7 +10,9 @@ use tari_dan_storage::{ consensus_models::{ Block, BlockDiff, + BlockId, BlockTransactionExecution, + ForeignProposal, LeafBlock, PendingShardStateTreeDiff, QuorumCertificate, @@ -52,6 +54,7 @@ pub struct ProposedBlockChangeSet { state_tree_diffs: IndexMap, substate_locks: IndexMap>, transaction_changes: IndexMap, + proposed_foreign_proposals: Vec, } impl ProposedBlockChangeSet { @@ -63,6 +66,7 @@ impl ProposedBlockChangeSet { substate_locks: IndexMap::new(), transaction_changes: IndexMap::new(), state_tree_diffs: IndexMap::new(), + proposed_foreign_proposals: Vec::new(), } } @@ -72,6 +76,7 @@ impl ProposedBlockChangeSet { self.transaction_changes = IndexMap::new(); self.state_tree_diffs = IndexMap::new(); self.substate_locks = IndexMap::new(); + self.proposed_foreign_proposals = Vec::new(); self } @@ -95,6 +100,11 @@ impl ProposedBlockChangeSet { self } + pub fn set_foreign_proposal_proposed_in(&mut self, foreign_proposal_block_id: BlockId) -> &mut Self { + self.proposed_foreign_proposals.push(foreign_proposal_block_id); + self + } + // TODO: this is a hack to allow the update to be modified after the fact. This should be removed. pub fn next_update_mut(&mut self, transaction_id: &TransactionId) -> Option<&mut TransactionPoolStatusUpdate> { self.transaction_changes @@ -213,6 +223,10 @@ impl ProposedBlockChangeSet { } } + for block_id in self.proposed_foreign_proposals { + ForeignProposal::set_proposed_in(tx, &block_id, &self.block.block_id)?; + } + Ok(()) } } diff --git a/dan_layer/consensus/src/hotstuff/on_message_validate.rs b/dan_layer/consensus/src/hotstuff/on_message_validate.rs index ab6fb47894..fc9d852d90 100644 --- a/dan_layer/consensus/src/hotstuff/on_message_validate.rs +++ b/dan_layer/consensus/src/hotstuff/on_message_validate.rs @@ -126,35 +126,33 @@ impl OnMessageValidate { from: TConsensusSpec::Addr, proposal: ProposalMessage, ) -> Result, HotStuffError> { - let ProposalMessage { block } = proposal; - info!( target: LOG_TARGET, "📜 new unvalidated PROPOSAL message {} from {} (current height = {})", - block, - block.proposed_by(), + proposal.block, + proposal.block.proposed_by(), current_height, ); - if block.height() < current_height { + if proposal.block.height() < current_height { info!( target: LOG_TARGET, "🔥 Block {} is lower than current height {}. Ignoring.", - block, + proposal.block, current_height ); return Ok(MessageValidationResult::Discard); } - if let Err(err) = self.check_proposal(&block).await { + if let Err(err) = self.check_proposal(&proposal.block).await { return Ok(MessageValidationResult::Invalid { from, - message: HotstuffMessage::Proposal(ProposalMessage { block }), + message: HotstuffMessage::Proposal(proposal), err, }); } - self.handle_missing_transactions_local_block(from, block).await + self.handle_missing_transactions_local_block(from, proposal).await } pub fn update_local_parked_blocks( @@ -166,7 +164,7 @@ impl OnMessageValidate { .store .with_write_tx(|tx| tx.missing_transactions_remove(current_height, transaction_id))?; - let Some(unparked_block) = maybe_unparked_block else { + let Some((unparked_block, foreign_proposals)) = maybe_unparked_block else { return Ok(None); }; @@ -176,7 +174,10 @@ impl OnMessageValidate { block: unparked_block.as_leaf_block(), }); - Ok(Some(ProposalMessage { block: unparked_block })) + Ok(Some(ProposalMessage { + block: unparked_block, + foreign_proposals, + })) } pub fn update_foreign_parked_blocks( @@ -211,30 +212,30 @@ impl OnMessageValidate { async fn handle_missing_transactions_local_block( &mut self, from: TConsensusSpec::Addr, - block: Block, + proposal: ProposalMessage, ) -> Result, HotStuffError> { let missing_tx_ids = self .store - .with_write_tx(|tx| self.check_for_missing_transactions(tx, &block))?; + .with_write_tx(|tx| self.check_for_missing_transactions(tx, &proposal))?; if missing_tx_ids.is_empty() { return Ok(MessageValidationResult::Ready { from, - message: HotstuffMessage::Proposal(ProposalMessage { block }), + message: HotstuffMessage::Proposal(proposal), }); } let _ignore = self.tx_events.send(HotstuffEvent::ProposedBlockParked { - block: block.as_leaf_block(), + block: proposal.block.as_leaf_block(), num_missing_txs: missing_tx_ids.len(), // TODO: remove num_awaiting_txs: 0, }); Ok(MessageValidationResult::ParkedProposal { - block_id: *block.id(), - epoch: block.epoch(), - proposed_by: block.proposed_by().clone(), + block_id: *proposal.block.id(), + epoch: proposal.block.epoch(), + proposed_by: proposal.block.proposed_by().clone(), missing_txs: missing_tx_ids, }) } @@ -242,31 +243,31 @@ impl OnMessageValidate { fn check_for_missing_transactions( &self, tx: &mut ::WriteTransaction<'_>, - block: &Block, + proposal: &ProposalMessage, ) -> Result, HotStuffError> { - if block.commands().is_empty() { + if proposal.block.commands().is_empty() { debug!( target: LOG_TARGET, - "✅ Block {} is empty (no missing transactions)", block + "✅ Block {} is empty (no missing transactions)", proposal.block ); return Ok(HashSet::new()); } - let missing_tx_ids = TransactionRecord::get_missing(&**tx, block.all_transaction_ids())?; + let missing_tx_ids = TransactionRecord::get_missing(&**tx, proposal.block.all_transaction_ids())?; if missing_tx_ids.is_empty() { debug!( target: LOG_TARGET, - "✅ Block {} has no missing transactions", block + "✅ Block {} has no missing transactions", proposal.block ); return Ok(HashSet::new()); } info!( target: LOG_TARGET, - "⏳ Block {} has {} missing transactions", block, missing_tx_ids.len(), + "⏳ Block {} has {} missing transactions", proposal.block, missing_tx_ids.len(), ); - tx.missing_transactions_insert(block, &missing_tx_ids, &[])?; + tx.missing_transactions_insert(&proposal.block, &proposal.foreign_proposals, &missing_tx_ids)?; Ok(missing_tx_ids) } @@ -353,9 +354,9 @@ impl OnMessageValidate { parked_block.add_missing_transactions(tx, &missing_tx_ids)?; Ok(MessageValidationResult::ParkedProposal { - block_id: *parked_block.block.id(), - epoch: parked_block.block.epoch(), - proposed_by: parked_block.block.proposed_by().clone(), + block_id: *parked_block.block().id(), + epoch: parked_block.block().epoch(), + proposed_by: parked_block.block().proposed_by().clone(), missing_txs: missing_tx_ids, }) }) diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index e6626a70b1..42a3f2afd6 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -29,7 +29,6 @@ use tari_dan_storage::{ HighQc, LastProposed, LeafBlock, - LockedBlock, PendingShardStateTreeDiff, QuorumCertificate, SubstateChange, @@ -67,6 +66,12 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_local_propose"; +type NextBlock = ( + Block, + Vec, + HashMap, +); + pub struct OnPropose { config: HotstuffConfig, store: TConsensusSpec::StateStore, @@ -118,21 +123,25 @@ where TConsensusSpec: ConsensusSpec // is_newview_propose means that a NEWVIEW has reached quorum and nodes are expecting us to propose. // Re-broadcast the previous proposal if is_newview_propose { - if let Some(next_block) = self.store.with_read_tx(|tx| last_proposed.get_block(tx)).optional()? { - info!( - target: LOG_TARGET, - "🌿 RE-BROADCASTING local block {}({}) to {} validators. {} command(s), justify: {} ({}), parent: {}", - next_block.id(), - next_block.height(), - local_committee.len(), - next_block.commands().len(), - next_block.justify().block_id(), - next_block.justify().block_height(), - next_block.parent(), - ); - self.broadcast_local_proposal(next_block, local_committee).await?; - return Ok(()); - } + warn!( + target: LOG_TARGET, + "⚠️ Newview propose {leaf_block} but we already proposed block {last_proposed}.", + ); + // if let Some(next_block) = self.store.with_read_tx(|tx| last_proposed.get_block(tx)).optional()? { + // info!( + // target: LOG_TARGET, + // "🌿 RE-BROADCASTING local block {}({}) to {} validators. {} command(s), justify: {} ({}), + // parent: {}", next_block.id(), + // next_block.height(), + // local_committee.len(), + // next_block.commands().len(), + // next_block.justify().block_id(), + // next_block.justify().block_height(), + // next_block.parent(), + // ); + // self.broadcast_local_proposal(next_block, local_committee).await?; + // return Ok(()); + // } } info!( @@ -153,10 +162,10 @@ where TConsensusSpec: ConsensusSpec let base_layer_block_hash = current_base_layer_block_hash; let base_layer_block_height = current_base_layer_block_height; - let next_block = self.store.with_write_tx(|tx| { + let (next_block, foreign_proposals) = self.store.with_write_tx(|tx| { let high_qc = HighQc::get(&**tx)?; let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?; - let (next_block, executed_transactions) = self.build_next_block( + let (next_block, foreign_proposals, executed_transactions) = self.build_next_block( tx, epoch, &leaf_block, @@ -185,7 +194,7 @@ where TConsensusSpec: ConsensusSpec } next_block.as_last_proposed().set(tx)?; - Ok::<_, HotStuffError>(next_block) + Ok::<_, HotStuffError>((next_block, foreign_proposals)) })?; info!( @@ -199,7 +208,8 @@ where TConsensusSpec: ConsensusSpec next_block.parent() ); - self.broadcast_local_proposal(next_block, local_committee).await?; + self.broadcast_local_proposal(next_block, foreign_proposals, local_committee) + .await?; Ok(()) } @@ -207,6 +217,7 @@ where TConsensusSpec: ConsensusSpec pub async fn broadcast_local_proposal( &mut self, next_block: Block, + foreign_proposals: Vec, local_committee: &Committee, ) -> Result<(), HotStuffError> { info!( @@ -221,7 +232,8 @@ where TConsensusSpec: ConsensusSpec .multicast( local_committee.iter().map(|(addr, _)| addr), HotstuffMessage::Proposal(ProposalMessage { - block: next_block.clone(), + block: next_block, + foreign_proposals, }), ) .await?; @@ -300,38 +312,44 @@ where TConsensusSpec: ConsensusSpec base_layer_block_height: u64, base_layer_block_hash: FixedHash, propose_epoch_end: bool, - ) -> Result<(Block, HashMap), HotStuffError> { + ) -> Result { // TODO: Configure const TARGET_BLOCK_SIZE: usize = 500; + + let next_height = parent_block.height() + NodeHeight(1); + + let mut total_leader_fee = 0; + + let foreign_proposals = if propose_epoch_end { + vec![] + } else { + ForeignProposal::get_all_new( + tx, + base_layer_block_height, + parent_block.block_id(), + TARGET_BLOCK_SIZE / 4, + )? + }; + let batch = if dont_propose_transactions || propose_epoch_end { vec![] } else { - self.transaction_pool.get_batch_for_next_block(tx, TARGET_BLOCK_SIZE)? + TARGET_BLOCK_SIZE + // Each foreign proposal is "heavier" than a transaction command + .checked_sub(foreign_proposals.len() * 4) + .map(|size| self.transaction_pool.get_batch_for_next_block(tx, size)) + .transpose()? + .unwrap_or_default() }; - let next_height = parent_block.height() + NodeHeight(1); - let mut total_leader_fee = 0; - let locked_block = LockedBlock::get(tx)?; - let pending_proposals = ForeignProposal::get_all_pending(tx, locked_block.block_id(), parent_block.block_id())?; let mut commands = if propose_epoch_end { BTreeSet::from_iter([Command::EndEpoch]) } else { - ForeignProposal::get_all_new(tx)? - .into_iter() - .filter(|foreign_proposal| { - // If the proposal base layer height is too high, ignore for now. - foreign_proposal.base_layer_block_height <= base_layer_block_height && - // If the foreign proposal is already pending, don't propose it again - !pending_proposals.iter().any(|pending_proposal| { - pending_proposal.shard_group == foreign_proposal.shard_group && - pending_proposal.block_id == foreign_proposal.block_id - }) - }) - .map(|mut foreign_proposal| { - foreign_proposal.set_proposed_height(parent_block.height().saturating_add(NodeHeight(1))); - Command::ForeignProposal(foreign_proposal) - }) - .collect() + BTreeSet::from_iter( + foreign_proposals + .iter() + .map(|fp| Command::ForeignProposal(fp.to_atom())), + ) }; // batch is empty for is_empty, is_epoch_end and is_epoch_start blocks @@ -405,7 +423,7 @@ where TConsensusSpec: ConsensusSpec let signature = self.signing_service.sign(next_block.id()); next_block.set_signature(signature); - Ok((next_block, executed_transactions)) + Ok((next_block, foreign_proposals, executed_transactions)) } fn prepare_transaction( diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 51e8c1a74e..1e313d4abd 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -15,6 +15,7 @@ use tari_dan_storage::{ BlockTransactionExecution, Command, Decision, + ForeignProposalAtom, LastExecuted, LastVoted, LockedBlock, @@ -173,6 +174,16 @@ where TConsensusSpec: ConsensusSpec continue; }; + // CASE: This code checks if the new leaf block causes the transaction to be ready. + // For example, suppose a transaction is LocalPrepared in the currently proposed block, however it + // is not yet the leaf block (because it has not been justified). If we then + // receive the foreign LocalPrepared for this transaction, we evaluate the + // foreign LocalPrepare using the transaction state as it "was" without + // considering data from the as yet unjustified block. This means that we do not + // recognise that the transaction is ready for AllPrepared, and we never propose + // it. This code reevaluates the new leaf blocks and sets any transactions to + // ready that have the required evidence. + if let Some(update_mut) = change_set.next_update_mut(atom.id()) { // The leaf block already approved finalising this transaction (i.e. the justify block proposed // LocalAccept, the leaf proposed (some|all)Accept therefore we already have all evidence). This @@ -231,15 +242,6 @@ where TConsensusSpec: ConsensusSpec if pool_tx.is_ready() { change_set.set_next_transaction_update(&pool_tx, pool_tx.current_stage(), true)?; } else { - // CASE: This code checks if the new leaf block causes the transaction to be ready. - // For example, suppose a transaction is LocalPrepared in the currently proposed block, however it - // is not yet the leaf block (because it has not been justified). If we then - // receive the foreign LocalPrepared for this transaction, we evaluate the - // foreign LocalPrepare using the transaction state as it "was" without - // considering data from the as yet unjustified block. This means that we do not - // recognise that the transaction is ready for AllPrepared, and we never propose - // it. This code reevaluates the new leaf blocks and sets any transactions to - // ready that have the required evidence. let local_prepare_is_justified = cmd .local_prepare() .map(|_| pool_tx.evidence().all_input_addresses_justified()) @@ -402,15 +404,11 @@ where TConsensusSpec: ConsensusSpec return Ok(proposed_block_change_set.no_vote()); } }, - Command::ForeignProposal(foreign_proposal) => { - if !foreign_proposal.exists(tx)? { - warn!( - target: LOG_TARGET, - "❌ Foreign proposal for block {block_id} from {shard_group} does not exist in the store", - block_id = foreign_proposal.block_id,shard_group = foreign_proposal.shard_group - ); + Command::ForeignProposal(fp_atom) => { + if !self.evaluate_foreign_proposal_command(tx, fp_atom, &mut proposed_block_change_set)? { return Ok(proposed_block_change_set.no_vote()); } + continue; }, Command::EndEpoch => { @@ -1284,6 +1282,26 @@ where TConsensusSpec: ConsensusSpec Ok(true) } + fn evaluate_foreign_proposal_command( + &self, + tx: &::ReadTransaction<'_>, + fp_atom: &ForeignProposalAtom, + proposed_block_change_set: &mut ProposedBlockChangeSet, + ) -> Result { + if !fp_atom.exists(tx)? { + warn!( + target: LOG_TARGET, + "❌ NO VOTE: Foreign proposal for block {block_id} has not been received.", + block_id = fp_atom.block_id, + ); + return Ok(false); + } + + proposed_block_change_set.set_foreign_proposal_proposed_in(fp_atom.block_id); + + Ok(true) + } + fn execute_transaction( &self, tx: &::ReadTransaction<'_>, @@ -1349,10 +1367,6 @@ where TConsensusSpec: ConsensusSpec block, ); - for foreign_proposal in block.all_foreign_proposals() { - foreign_proposal.upsert(tx)?; - } - // Release all locks for SomePrepare transactions since these can never be committed SubstateRecord::unlock_all(tx, block.all_some_prepare().map(|t| &t.id).peekable())?; diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index d8c642381b..0e8fe3698a 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -12,7 +12,6 @@ use tari_dan_storage::{ ForeignProposal, ForeignReceiveCounters, LeafBlock, - QuorumCertificate, TransactionAtom, TransactionPool, TransactionPoolRecord, @@ -21,7 +20,6 @@ use tari_dan_storage::{ StateStore, }; use tari_epoch_manager::EpochManagerReader; -use tari_transaction::TransactionId; use crate::{ hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle, ProposalValidationError}, @@ -62,26 +60,24 @@ where TConsensusSpec: ConsensusSpec message: ForeignProposalMessage, local_committee_info: &CommitteeInfo, ) -> Result<(), HotStuffError> { - let ForeignProposalMessage { - block, - justify_qc, - block_pledge, - } = message; + let proposal = ForeignProposal::from(message); // TODO: validate justify_qc let mut foreign_receive_counter = self .store .with_read_tx(|tx| ForeignReceiveCounters::get_or_default(tx))?; - let vn = self.epoch_manager.get_validator_node(block.epoch(), &from).await?; + let vn = self + .epoch_manager + .get_validator_node_by_public_key(proposal.block().epoch(), proposal.block().proposed_by()) + .await?; let foreign_committee_info = self .epoch_manager - .get_committee_info_for_substate(block.epoch(), vn.shard_key) + .get_committee_info_for_substate(proposal.block().epoch(), vn.shard_key) .await?; if let Err(err) = self.validate_proposed_block( - &from, - &block, + proposal.block(), foreign_committee_info.shard_group(), local_committee_info.shard_group(), &foreign_receive_counter, @@ -98,48 +94,23 @@ where TConsensusSpec: ConsensusSpec foreign_receive_counter.increment_group(foreign_committee_info.shard_group()); - let tx_ids = block - .commands() - .iter() - .filter_map(|command| { - if let Some(tx) = command.local_prepare().or_else(|| command.local_accept()) { - if !foreign_committee_info.includes_any_address(command.evidence().substate_addresses_iter()) { - return None; - } - // We are interested in the commands that are for us, they will be in local prepared and one of the - // evidence shards will be ours - Some(tx.id) - } else { - None - } - }) - .collect::>(); - // Justify QC must justify the block - if justify_qc.block_id() != block.id() { + if proposal.justify_qc().block_id() != proposal.block().id() { warn!( target: LOG_TARGET, "⚠️ FOREIGN PROPOSAL: Justify QC block id does not match the block id. Justify QC block id: {}, block id: {}", - justify_qc.block_id(), - block.id(), + proposal.justify_qc().block_id(), + proposal.block().id(), ); return Ok(()); } - // The block height was validated earlier, so we can use the height only and not store the hash anymore - let foreign_proposal = ForeignProposal::new( - foreign_committee_info.shard_group(), - *block.id(), - tx_ids, - block.base_layer_block_height(), - ); - - if self.store.with_read_tx(|tx| foreign_proposal.exists(tx))? { + if self.store.with_read_tx(|tx| proposal.exists(tx))? { // This is expected behaviour, we may receive the same foreign proposal multiple times debug!( target: LOG_TARGET, "FOREIGN PROPOSAL: Already received proposal for block {}", - block.id(), + proposal.block().id(), ); return Ok(()); } @@ -147,22 +118,16 @@ where TConsensusSpec: ConsensusSpec info!( target: LOG_TARGET, "🧩 Receive FOREIGN PROPOSAL for block {}, justify_qc: {} from {}", - block, - justify_qc, + proposal.block(), + proposal.justify_qc(), from, ); + let block_id = *proposal.block().id(); let result = self.store.with_write_tx(|tx| { foreign_receive_counter.save(tx)?; - foreign_proposal.upsert(tx)?; - self.on_receive_foreign_block( - tx, - &block, - &justify_qc, - &foreign_committee_info, - local_committee_info, - block_pledge, - ) + proposal.upsert(tx, None)?; + self.on_receive_foreign_block(tx, proposal, &foreign_committee_info, local_committee_info) }); match result { @@ -175,7 +140,7 @@ where TConsensusSpec: ConsensusSpec error!( target: LOG_TARGET, "⚠️ FOREIGN PROPOSAL: Failed to process foreign proposal for block {}: {}", - block.id(), + block_id, err ); }, @@ -188,12 +153,16 @@ where TConsensusSpec: ConsensusSpec fn on_receive_foreign_block( &self, tx: &mut ::WriteTransaction<'_>, - block: &Block, - justify_qc: &QuorumCertificate, + foreign_proposal: ForeignProposal, foreign_committee_info: &CommitteeInfo, local_committee_info: &CommitteeInfo, - mut block_pledge: BlockPledge, ) -> Result<(), HotStuffError> { + let ForeignProposal { + block, + justify_qc, + mut block_pledge, + .. + } = foreign_proposal; let local_leaf = LeafBlock::get(&**tx)?; // We only want to save the QC once if applicable let mut is_qc_saved = false; @@ -217,11 +186,14 @@ where TConsensusSpec: ConsensusSpec }; if tx_rec.current_stage() > TransactionPoolStage::LocalPrepared { + // TODO: This can happen if the foreign shard group is only responsible for outputs (the input + // SGs have already progressed to LocalAccept) in which case it is safe to ignore this command. + // However we should not send the proposal in the first place (assuming it does not involve any + // other shard-applicable transactions). warn!( target: LOG_TARGET, - "⚠️ Foreign LocalPrepare proposal (shard_group={}, block={}) received for transaction {} but current transaction stage is {}. Ignoring.", - foreign_committee_info.shard_group(), - block.id(), + "⚠️ Foreign LocalPrepare proposal ({}) received LOCAL_PREPARE for transaction {} but current transaction stage is {}. Ignoring.", + block, tx_rec.transaction_id(), tx_rec.current_stage() ); continue; @@ -234,7 +206,7 @@ where TConsensusSpec: ConsensusSpec if remote_decision.is_abort() && local_decision.is_commit() { info!( target: LOG_TARGET, - "⚠️ Foreign shard ABORT {}. Update overall decision to ABORT. Local stage: {}, Leaf: {}", + "⚠️ Foreign committee ABORT transaction {}. Update overall decision to ABORT. Local stage: {}, Leaf: {}", tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf ); } @@ -368,8 +340,10 @@ where TConsensusSpec: ConsensusSpec if tx_rec.current_stage() > TransactionPoolStage::LocalAccepted { warn!( target: LOG_TARGET, - "⚠️ Foreign proposal received for transaction {} but current transaction stage is {}. Ignoring.", - tx_rec.transaction_id(), tx_rec.current_stage() + "⚠️ Foreign proposal {} received LOCAL_ACCEPT for transaction {} but current transaction stage is {}. Ignoring.", + block, + tx_rec.transaction_id(), + tx_rec.current_stage(), ); continue; } @@ -587,7 +561,6 @@ where TConsensusSpec: ConsensusSpec fn validate_proposed_block( &self, - from: &TConsensusSpec::Addr, candidate_block: &Block, _foreign_shard: ShardGroup, _local_shard: ShardGroup, @@ -616,7 +589,7 @@ where TConsensusSpec: ConsensusSpec // } if candidate_block.is_genesis() { return Err(ProposalValidationError::ProposingGenesisBlock { - proposed_by: from.to_string(), + proposed_by: candidate_block.proposed_by().to_string(), hash: *candidate_block.id(), }); } @@ -624,7 +597,7 @@ where TConsensusSpec: ConsensusSpec let calculated_hash = candidate_block.calculate_hash().into(); if calculated_hash != *candidate_block.id() { return Err(ProposalValidationError::NodeHashMismatch { - proposed_by: from.to_string(), + proposed_by: candidate_block.proposed_by().to_string(), hash: *candidate_block.id(), calculated_hash, }); diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index e8019d60e7..504c4ab00b 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -29,7 +29,7 @@ use crate::{ HotstuffEvent, ProposalValidationError, }, - messages::{ForeignProposalMessage, HotstuffMessage, ProposalMessage, VoteMessage}, + messages::{ForeignProposalMessage, HotstuffMessage, VoteMessage}, traits::{ hooks::ConsensusHooks, ConsensusSpec, @@ -93,9 +93,7 @@ impl OnReceiveLocalProposalHandler Result<(), HotStuffError> { - let ProposalMessage { block } = message; - + pub async fn handle(&mut self, current_epoch: Epoch, block: Block) -> Result<(), HotStuffError> { debug!( target: LOG_TARGET, "🔥 LOCAL PROPOSAL: block {} from {}", @@ -650,7 +648,7 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnSyncRequest { block, from ); + // TODO(perf): O(n) queries + let foreign_proposals = match store.with_read_tx(|tx| block.get_foreign_proposals(tx)) { + Ok(foreign_proposals) => foreign_proposals, + Err(err) => { + warn!(target: LOG_TARGET, "Failed to fetch foreign proposals for block {}: {}", block, err); + return; + }, + }; + if let Err(err) = outbound_messaging - .send(from.clone(), HotstuffMessage::Proposal(ProposalMessage { block })) + .send( + from.clone(), + HotstuffMessage::Proposal(ProposalMessage { + block, + foreign_proposals, + }), + ) .await { warn!(target: LOG_TARGET, "Error sending SyncResponse: {err}"); diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index b21a1a0abb..be900d4242 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -670,9 +670,19 @@ impl HotstuffWorker { .await, ), HotstuffMessage::Proposal(msg) => { + // First process attached foreign proposals + for foreign_proposal in msg.foreign_proposals { + log_err( + "on_receive_foreign_proposal", + self.on_receive_foreign_proposal + .handle(from.clone(), foreign_proposal.into(), local_committee_info) + .await, + )?; + } + match log_err( "on_receive_local_proposal", - self.on_receive_local_proposal.handle(current_epoch, msg).await, + self.on_receive_local_proposal.handle(current_epoch, msg.block).await, ) { Ok(_) => Ok(()), Err( diff --git a/dan_layer/consensus/src/messages/proposal.rs b/dan_layer/consensus/src/messages/proposal.rs index b3f32d781b..64a8f97388 100644 --- a/dan_layer/consensus/src/messages/proposal.rs +++ b/dan_layer/consensus/src/messages/proposal.rs @@ -4,11 +4,18 @@ use std::fmt::{Display, Formatter}; use serde::Serialize; -use tari_dan_storage::consensus_models::{Block, BlockPledge, ForeignParkedProposal, QuorumCertificate}; +use tari_dan_storage::consensus_models::{ + Block, + BlockPledge, + ForeignParkedProposal, + ForeignProposal, + QuorumCertificate, +}; #[derive(Debug, Clone, Serialize)] pub struct ProposalMessage { pub block: Block, + pub foreign_proposals: Vec, } impl Display for ProposalMessage { @@ -26,16 +33,33 @@ pub struct ForeignProposalMessage { impl From for ForeignParkedProposal { fn from(msg: ForeignProposalMessage) -> Self { - ForeignParkedProposal::new(msg.block, msg.justify_qc, msg.block_pledge) + ForeignParkedProposal::new(msg.into()) + } +} + +impl From for ForeignProposal { + fn from(msg: ForeignProposalMessage) -> Self { + ForeignProposal::new(msg.block, msg.block_pledge, msg.justify_qc) + } +} + +impl From for ForeignProposalMessage { + fn from(proposal: ForeignProposal) -> Self { + ForeignProposalMessage { + block: proposal.block, + justify_qc: proposal.justify_qc, + block_pledge: proposal.block_pledge, + } } } impl From for ForeignProposalMessage { - fn from(block: ForeignParkedProposal) -> Self { + fn from(proposal: ForeignParkedProposal) -> Self { + let proposal = proposal.into_proposal(); ForeignProposalMessage { - block: block.block, - justify_qc: block.justify_qc, - block_pledge: block.block_pledge, + block: proposal.block, + justify_qc: proposal.justify_qc, + block_pledge: proposal.block_pledge, } } } diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index c8bb7799cb..ab451eadb9 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -335,11 +335,11 @@ async fn multi_shard_propose_blocks_with_new_transactions_until_all_committed() } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn foreign_shard_decides_to_abort() { +async fn foreign_shard_group_decides_to_abort() { setup_logger(); let mut test = Test::builder() - .add_committee(0, vec!["1", "3", "5"]) - .add_committee(1, vec!["2", "4", "6"]) + .add_committee(0, vec!["1", "2", "3"]) + .add_committee(1, vec!["4", "5", "6"]) .start() .await; @@ -440,7 +440,8 @@ async fn multishard_local_inputs_foreign_outputs() { async fn multishard_local_inputs_and_outputs_foreign_outputs() { setup_logger(); let mut test = Test::builder() - .debug_sql("/tmp/test{}.db") + // TODO: investigate + .with_test_timeout(Duration::from_secs(60)) .add_committee(0, vec!["1", "2"]) .add_committee(1, vec!["3", "4"]) .add_committee(2, vec!["5", "6"]) diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index 0bea9f8a3c..4d35521c85 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -8,6 +8,7 @@ use std::{ }; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use itertools::Itertools; use tari_consensus::hotstuff::HotstuffEvent; use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeHeight, NumPreshards, ShardGroup}; use tari_dan_storage::{ @@ -269,9 +270,13 @@ impl Test { pub async fn on_block_committed(&mut self) -> (TestAddress, BlockId, Epoch, NodeHeight) { loop { let (address, event) = if let Some(timeout) = self.timeout { - tokio::time::timeout(timeout, self.on_hotstuff_event()) - .await - .unwrap_or_else(|_| panic!("Timeout waiting for Hotstuff event")) + match tokio::time::timeout(timeout, self.on_hotstuff_event()).await { + Ok(v) => v, + Err(_) => { + self.dump_pool_info(); + panic!("Timeout waiting for Hotstuff event"); + }, + } } else { self.on_hotstuff_event().await }; @@ -290,6 +295,24 @@ impl Test { } } + pub fn dump_pool_info(&self) { + for v in self.validators.values().sorted_unstable_by_key(|a| &a.address) { + let pool = v.state_store.with_read_tx(|tx| tx.transaction_pool_get_all()).unwrap(); + for tx in pool { + eprintln!( + "{}: {}->{:?} {}[{}, ready={}, {}]", + v.address, + tx.current_stage(), + tx.pending_stage(), + tx.transaction_id(), + tx.current_decision(), + tx.is_ready(), + tx.evidence() + ); + } + } + } + pub fn network(&mut self) -> &mut TestNetwork { &mut self.network } diff --git a/dan_layer/consensus_tests/src/support/logging.rs b/dan_layer/consensus_tests/src/support/logging.rs index 9a79040b69..4ee0366213 100644 --- a/dan_layer/consensus_tests/src/support/logging.rs +++ b/dan_layer/consensus_tests/src/support/logging.rs @@ -2,10 +2,6 @@ // SPDX-License-Identifier: BSD-3-Clause pub fn setup_logger() { - if option_env!("CI").is_some() { - return; - } - let _ignore = fern::Dispatch::new() // Perform allocation-free log formatting .format(|out, message, record| { diff --git a/dan_layer/consensus_tests/src/support/network.rs b/dan_layer/consensus_tests/src/support/network.rs index c687555d87..17ed9745cf 100644 --- a/dan_layer/consensus_tests/src/support/network.rs +++ b/dan_layer/consensus_tests/src/support/network.rs @@ -310,7 +310,7 @@ impl TestNetworkWorker { } pub async fn handle_broadcast(&mut self, from: TestAddress, to: Vec, msg: HotstuffMessage) { - log::debug!("🌎️ Broadcast {} from {} to {}", msg, from, to.iter().join(", ")); + log::debug!("✉️ Broadcast {} from {} to {}", msg, from, to.iter().join(", ")); for vn in to { if let Some(message_filter) = &self.message_filter { if !message_filter(&from, &vn, &msg) { diff --git a/dan_layer/engine_types/src/fee_claim.rs b/dan_layer/engine_types/src/fee_claim.rs index 7d84cce95e..c52409a47f 100644 --- a/dan_layer/engine_types/src/fee_claim.rs +++ b/dan_layer/engine_types/src/fee_claim.rs @@ -6,7 +6,11 @@ use std::{fmt, fmt::Display}; use serde::{Deserialize, Serialize}; use tari_bor::BorTag; use tari_common_types::types::PublicKey; -use tari_template_lib::{models::BinaryTag, prelude::Amount, Hash}; +use tari_template_lib::{ + models::{BinaryTag, ObjectKey}, + prelude::Amount, + Hash, +}; #[cfg(feature = "ts")] use ts_rs::TS; @@ -16,11 +20,12 @@ const TAG: u64 = BinaryTag::FeeClaim.as_u64(); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "ts", derive(TS), ts(export, export_to = "../../bindings/src/types/"))] -pub struct FeeClaimAddress(#[cfg_attr(feature = "ts", ts(type = "string"))] BorTag); +pub struct FeeClaimAddress(#[cfg_attr(feature = "ts", ts(type = "string"))] BorTag); impl FeeClaimAddress { - pub const fn new(address: Hash) -> Self { - Self(BorTag::new(address)) + pub const fn from_hash(hash: Hash) -> Self { + let key = ObjectKey::from_array(hash.into_array()); + Self(BorTag::new(key)) } pub fn from_addr>(epoch: u64, addr: TAddr) -> Self { @@ -28,23 +33,23 @@ impl FeeClaimAddress { .chain(&epoch) .chain(addr.as_ref()) .result(); - Self::new(hash) + Self::from_hash(hash) } - pub fn hash(&self) -> &Hash { + pub fn as_object_key(&self) -> &ObjectKey { self.0.inner() } } impl> From for FeeClaimAddress { fn from(address: T) -> Self { - Self::new(address.into()) + Self::from_hash(address.into()) } } impl Display for FeeClaimAddress { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "feeclaim_{}", self.hash()) + write!(f, "feeclaim_{}", self.as_object_key()) } } diff --git a/dan_layer/engine_types/src/substate.rs b/dan_layer/engine_types/src/substate.rs index 6dc7327fca..4ef7dd0502 100644 --- a/dan_layer/engine_types/src/substate.rs +++ b/dan_layer/engine_types/src/substate.rs @@ -33,6 +33,7 @@ use tari_template_lib::{ ComponentAddress, NonFungibleAddress, NonFungibleIndexAddress, + ObjectKey, ResourceAddress, UnclaimedConfidentialOutputAddress, VaultId, @@ -47,7 +48,7 @@ use crate::{ component::ComponentHeader, confidential::UnclaimedConfidentialOutput, fee_claim::{FeeClaim, FeeClaimAddress}, - hashing::substate_value_hasher32, + hashing::{hasher32, substate_value_hasher32, EngineHashDomainLabel}, non_fungible::NonFungibleContainer, non_fungible_index::NonFungibleIndex, resource::Resource, @@ -170,6 +171,36 @@ impl SubstateId { decode_exact(bytes) } + pub fn to_object_key(&self) -> ObjectKey { + match self { + SubstateId::Component(addr) => *addr.as_object_key(), + SubstateId::Resource(addr) => *addr.as_object_key(), + SubstateId::Vault(addr) => *addr.as_object_key(), + SubstateId::NonFungible(addr) => { + let key = hasher32(EngineHashDomainLabel::NonFungibleId) + .chain(addr.resource_address()) + .chain(addr.id()) + .result() + .trailing_bytes() + .into(); + + ObjectKey::new(addr.resource_address().as_entity_id(), key) + }, + SubstateId::NonFungibleIndex(addr) => { + let key = hasher32(EngineHashDomainLabel::NonFungibleIndex) + .chain(addr.resource_address()) + .chain(&addr.index()) + .result() + .trailing_bytes() + .into(); + ObjectKey::new(addr.resource_address().as_entity_id(), key) + }, + SubstateId::UnclaimedConfidentialOutput(addr) => *addr.as_object_key(), + SubstateId::TransactionReceipt(addr) => *addr.as_object_key(), + SubstateId::FeeClaim(addr) => *addr.as_object_key(), + } + } + // TODO: look at using BECH32 standard pub fn to_address_string(&self) -> String { self.to_string() diff --git a/dan_layer/engine_types/src/transaction_receipt.rs b/dan_layer/engine_types/src/transaction_receipt.rs index 953074d2fe..7041938bb2 100644 --- a/dan_layer/engine_types/src/transaction_receipt.rs +++ b/dan_layer/engine_types/src/transaction_receipt.rs @@ -8,7 +8,11 @@ use std::{ use serde::{Deserialize, Serialize}; use tari_bor::BorTag; -use tari_template_lib::{models::BinaryTag, Hash, HashParseError}; +use tari_template_lib::{ + models::{BinaryTag, ObjectKey}, + Hash, + HashParseError, +}; #[cfg(feature = "ts")] use ts_rs::TS; @@ -18,32 +22,33 @@ const TAG: u64 = BinaryTag::TransactionReceipt.as_u64(); #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "ts", derive(TS), ts(export, export_to = "../../bindings/src/types/"))] -pub struct TransactionReceiptAddress(#[cfg_attr(feature = "ts", ts(type = "string"))] BorTag); +pub struct TransactionReceiptAddress(#[cfg_attr(feature = "ts", ts(type = "string"))] BorTag); impl TransactionReceiptAddress { - pub const fn new(address: Hash) -> Self { - Self(BorTag::new(address)) + pub const fn from_hash(hash: Hash) -> Self { + let key = ObjectKey::from_array(hash.into_array()); + Self(BorTag::new(key)) } - pub fn hash(&self) -> &Hash { + pub fn as_object_key(&self) -> &ObjectKey { self.0.inner() } pub fn from_hex(hex: &str) -> Result { let hash = Hash::from_hex(hex)?; - Ok(Self::new(hash)) + Ok(Self::from_hash(hash)) } } impl> From for TransactionReceiptAddress { fn from(address: T) -> Self { - Self::new(address.into()) + Self::from_hash(address.into()) } } impl Display for TransactionReceiptAddress { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "txreceipt_{}", self.hash()) + write!(f, "txreceipt_{}", self.as_object_key()) } } diff --git a/dan_layer/p2p/proto/consensus.proto b/dan_layer/p2p/proto/consensus.proto index 840081f643..35149902c4 100644 --- a/dan_layer/p2p/proto/consensus.proto +++ b/dan_layer/p2p/proto/consensus.proto @@ -30,9 +30,14 @@ message NewViewMessage { message ProposalMessage { Block block = 1; + repeated ForeignProposal foreign_proposals = 2; } message ForeignProposalMessage { + ForeignProposal proposal = 1; +} + +message ForeignProposal { Block block = 1; QuorumCertificate justify_qc = 2; repeated TransactionPledge block_pledge = 3; @@ -112,25 +117,15 @@ message Command { TransactionAtom all_accept = 7; TransactionAtom some_accept = 8; - ForeignProposal foreign_proposal = 9; + ForeignProposalAtom foreign_proposal = 9; bool end_epoch = 10; } } -enum ForeignProposalState { - UNKNOWN_STATE = 0; - NEW = 1; - MINED = 2; - DELETED = 3; -} - -message ForeignProposal { - uint32 shard_group = 1; - bytes block_id = 2; - ForeignProposalState state = 3; - uint64 mined_at = 4; - repeated bytes transactions = 5; - uint64 base_layer_block_height = 6; +message ForeignProposalAtom { + bytes block_id = 1; + uint32 shard_group = 2; + uint64 base_layer_block_height = 3; } message TransactionAtom { diff --git a/dan_layer/p2p/src/conversions/consensus.rs b/dan_layer/p2p/src/conversions/consensus.rs index 864e4b9d6d..0689330915 100644 --- a/dan_layer/p2p/src/conversions/consensus.rs +++ b/dan_layer/p2p/src/conversions/consensus.rs @@ -45,7 +45,7 @@ use tari_dan_storage::consensus_models::{ Decision, Evidence, ForeignProposal, - ForeignProposalState, + ForeignProposalAtom, HighQc, LeaderFee, QcId, @@ -153,6 +153,7 @@ impl From<&ProposalMessage> for proto::consensus::ProposalMessage { fn from(value: &ProposalMessage) -> Self { Self { block: Some((&value.block).into()), + foreign_proposals: value.foreign_proposals.iter().map(Into::into).collect(), } } } @@ -163,6 +164,11 @@ impl TryFrom for ProposalMessage { fn try_from(value: proto::consensus::ProposalMessage) -> Result { Ok(ProposalMessage { block: value.block.ok_or_else(|| anyhow!("Block is missing"))?.try_into()?, + foreign_proposals: value + .foreign_proposals + .into_iter() + .map(TryInto::try_into) + .collect::>()?, }) } } @@ -172,9 +178,7 @@ impl TryFrom for ProposalMessage { impl From<&ForeignProposalMessage> for proto::consensus::ForeignProposalMessage { fn from(value: &ForeignProposalMessage) -> Self { Self { - block: Some(proto::consensus::Block::from(&value.block)), - justify_qc: Some(proto::consensus::QuorumCertificate::from(&value.justify_qc)), - block_pledge: value.block_pledge.iter().map(Into::into).collect(), + proposal: Some(proto::consensus::ForeignProposal::from(value)), } } } @@ -183,13 +187,14 @@ impl TryFrom for ForeignProposalMessag type Error = anyhow::Error; fn try_from(value: proto::consensus::ForeignProposalMessage) -> Result { + let proposal = value.proposal.ok_or_else(|| anyhow!("Proposal is missing"))?; Ok(ForeignProposalMessage { - block: value.block.ok_or_else(|| anyhow!("Block is missing"))?.try_into()?, - justify_qc: value + block: proposal.block.ok_or_else(|| anyhow!("Block is missing"))?.try_into()?, + justify_qc: proposal .justify_qc .ok_or_else(|| anyhow!("Justify QC is missing"))? .try_into()?, - block_pledge: value + block_pledge: proposal .block_pledge .into_iter() .map(TryInto::try_into) @@ -198,6 +203,45 @@ impl TryFrom for ForeignProposalMessag } } +impl From<&ForeignProposalMessage> for proto::consensus::ForeignProposal { + fn from(value: &ForeignProposalMessage) -> Self { + Self { + block: Some(proto::consensus::Block::from(&value.block)), + justify_qc: Some(proto::consensus::QuorumCertificate::from(&value.justify_qc)), + block_pledge: value.block_pledge.iter().map(Into::into).collect(), + } + } +} + +impl From<&ForeignProposal> for proto::consensus::ForeignProposal { + fn from(value: &ForeignProposal) -> Self { + Self { + block: Some(proto::consensus::Block::from(&value.block)), + justify_qc: Some(proto::consensus::QuorumCertificate::from(&value.justify_qc)), + block_pledge: value.block_pledge.iter().map(Into::into).collect(), + } + } +} + +impl TryFrom for ForeignProposal { + type Error = anyhow::Error; + + fn try_from(value: proto::consensus::ForeignProposal) -> Result { + Ok(Self::new( + value.block.ok_or_else(|| anyhow!("Block is missing"))?.try_into()?, + value + .block_pledge + .into_iter() + .map(TryInto::try_into) + .collect::>()?, + value + .justify_qc + .ok_or_else(|| anyhow!("Justify QC is missing"))? + .try_into()?, + )) + } +} + // -------------------------------- TransactionPledge -------------------------------- // impl From<(&TransactionId, &SubstatePledges)> for proto::consensus::TransactionPledge { @@ -572,68 +616,26 @@ impl TryFrom for LeaderFee { } } -// ForeignProposalState -// -------------------------------- Decision -------------------------------- // - -impl From for proto::consensus::ForeignProposalState { - fn from(value: ForeignProposalState) -> Self { - match value { - ForeignProposalState::New => proto::consensus::ForeignProposalState::New, - ForeignProposalState::Proposed => proto::consensus::ForeignProposalState::Mined, - ForeignProposalState::Deleted => proto::consensus::ForeignProposalState::Deleted, - } - } -} - -impl TryFrom for ForeignProposalState { - type Error = anyhow::Error; - - fn try_from(value: proto::consensus::ForeignProposalState) -> Result { - match value { - proto::consensus::ForeignProposalState::New => Ok(ForeignProposalState::New), - proto::consensus::ForeignProposalState::Mined => Ok(ForeignProposalState::Proposed), - proto::consensus::ForeignProposalState::Deleted => Ok(ForeignProposalState::Deleted), - proto::consensus::ForeignProposalState::UnknownState => Err(anyhow!("Foreign proposal state not provided")), - } - } -} - -// ForeignProposal +// -------------------------------- ForeignProposalAtom -------------------------------- // -impl From<&ForeignProposal> for proto::consensus::ForeignProposal { - fn from(value: &ForeignProposal) -> Self { +impl From<&ForeignProposalAtom> for proto::consensus::ForeignProposalAtom { + fn from(value: &ForeignProposalAtom) -> Self { Self { - shard_group: value.shard_group.encode_as_u32(), block_id: value.block_id.as_bytes().to_vec(), - state: proto::consensus::ForeignProposalState::from(value.state).into(), - mined_at: value.proposed_height.map(|a| a.0).unwrap_or(0), - transactions: value.transactions.iter().map(|tx| tx.as_bytes().to_vec()).collect(), + shard_group: value.shard_group.encode_as_u32(), base_layer_block_height: value.base_layer_block_height, } } } -impl TryFrom for ForeignProposal { +impl TryFrom for ForeignProposalAtom { type Error = anyhow::Error; - fn try_from(value: proto::consensus::ForeignProposal) -> Result { - Ok(ForeignProposal { + fn try_from(value: proto::consensus::ForeignProposalAtom) -> Result { + Ok(ForeignProposalAtom { + block_id: BlockId::try_from(value.block_id)?, shard_group: ShardGroup::decode_from_u32(value.shard_group) .ok_or_else(|| anyhow!("Block shard_group ({}) is not a valid", value.shard_group))?, - block_id: BlockId::try_from(value.block_id)?, - state: proto::consensus::ForeignProposalState::try_from(value.state) - .map_err(|_| anyhow!("Invalid foreign proposal state value {}", value.state))? - .try_into()?, - proposed_height: if value.mined_at == 0 { - None - } else { - Some(NodeHeight(value.mined_at)) - }, - transactions: value - .transactions - .into_iter() - .map(|tx| tx.try_into()) - .collect::>()?, base_layer_block_height: value.base_layer_block_height, }) } diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index d42e2ef642..c38519f02b 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -58,10 +58,10 @@ create table parked_blocks total_leader_fee bigint not NULL, foreign_indexes text not NULL, signature text NULL, - block_time bigint NULL, timestamp bigint not NULL, base_layer_block_height bigint not NULL, base_layer_block_hash text not NULL, + foreign_proposals text not NULL, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); @@ -349,15 +349,30 @@ create table votes CREATE TABLE foreign_proposals ( - id integer not NULL primary key AUTOINCREMENT, - shard_group integer not NULL, + id integer not null primary key AUTOINCREMENT, block_id text not NULL, - state text not NULL, - proposed_height bigint NULL, - transactions text not NULL, + parent_block_id text not NULL, + merkle_root text not NULL, + network text not NULL, + height bigint not NULL, + epoch bigint not NULL, + shard_group integer not NULL, + proposed_by text not NULL, + qc text not NULL, + command_count bigint not NULL, + commands text not NULL, + total_leader_fee bigint not NULL, + foreign_indexes text not NULL, + signature text NULL, + timestamp bigint not NULL, base_layer_block_height bigint not NULL, - created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP, - UNIQUE (shard_group, block_id) + base_layer_block_hash text not NULL, + justify_qc_id text not NULL REFERENCES quorum_certificates (qc_id), + block_pledge text not NULL, + proposed_in_block text NULL REFERENCES blocks (block_id), + proposed_in_block_height bigint NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE (block_id) ); CREATE TABLE foreign_send_counters diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 755731f0a2..9a67f5f1ea 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -25,6 +25,7 @@ use diesel::{ SqliteConnection, TextExpressionMethods, }; +use indexmap::IndexMap; use log::*; use serde::{de::DeserializeOwned, Serialize}; use tari_common_types::types::{FixedHash, PublicKey}; @@ -38,7 +39,7 @@ use tari_dan_storage::{ Command, EpochCheckpoint, ForeignProposal, - ForeignProposalState, + ForeignProposalAtom, ForeignReceiveCounters, ForeignSendCounters, HighQc, @@ -77,7 +78,7 @@ use tari_utilities::ByteArray; use crate::{ error::SqliteStorageError, - serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex, serialize_json}, + serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex}, sql_models, sqlite_transaction::SqliteTransaction, }; @@ -116,15 +117,15 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState from_block_id: &BlockId, to_block_id: &BlockId, transaction_ids: ITx, - ) -> Result, SqliteStorageError> + ) -> Result, SqliteStorageError> where ITx: Iterator + ExactSizeIterator, { if transaction_ids.len() == 0 { - return Ok(HashMap::new()); + return Ok(IndexMap::new()); } - let applicable_block_ids = self.get_block_ids_that_change_state_between(from_block_id, to_block_id)?; + let applicable_block_ids = self.get_block_ids_with_commands_between(from_block_id, to_block_id)?; debug!( target: LOG_TARGET, @@ -134,7 +135,7 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState applicable_block_ids.len()); if applicable_block_ids.is_empty() { - return Ok(HashMap::new()); + return Ok(IndexMap::new()); } self.create_transaction_atom_updates_query(transaction_ids, applicable_block_ids.iter().map(|s| s.as_str())) @@ -253,7 +254,7 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState .collect() } - pub(crate) fn get_block_ids_that_change_state_between( + pub(crate) fn get_block_ids_with_commands_between( &self, start_block: &BlockId, end_block: &BlockId, @@ -465,47 +466,102 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor high_qc.try_into() } - fn foreign_proposal_exists(&self, foreign_proposal: &ForeignProposal) -> Result { + fn foreign_proposals_get_any<'a, I: IntoIterator>( + &self, + block_ids: I, + ) -> Result, StorageError> { + use crate::schema::{foreign_proposals, quorum_certificates}; + + let foreign_proposals = foreign_proposals::table + .left_join(quorum_certificates::table.on(foreign_proposals::justify_qc_id.eq(quorum_certificates::qc_id))) + .filter(foreign_proposals::block_id.eq_any(block_ids.into_iter().map(serialize_hex))) + .get_results::<(sql_models::ForeignProposal, Option)>(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_proposals_get_any", + source: e, + })?; + + foreign_proposals + .into_iter() + .map(|(proposal, qc)| { + let justify_qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency { + operation: "foreign_proposals_get_any", + details: format!( + "foreign proposal {} references non-existent quorum certificate {}", + proposal.block_id, proposal.justify_qc_id + ), + })?; + proposal.try_convert(justify_qc) + }) + .collect() + } + + fn foreign_proposals_exists(&self, block_id: &BlockId) -> Result { use crate::schema::foreign_proposals; let foreign_proposals = foreign_proposals::table - .filter(foreign_proposals::shard_group.eq(foreign_proposal.shard_group.encode_as_u32() as i32)) - .filter(foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id))) - .filter(foreign_proposals::transactions.eq(serialize_json(&foreign_proposal.transactions)?)) - .filter(foreign_proposals::base_layer_block_height.eq(foreign_proposal.base_layer_block_height as i64)) + .filter(foreign_proposals::block_id.eq(serialize_hex(block_id))) .count() .limit(1) .get_result::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "foreign_proposal_exists", + operation: "foreign_proposals_exists", source: e, })?; Ok(foreign_proposals > 0) } - fn foreign_proposal_get_all_new(&self) -> Result, StorageError> { - use crate::schema::foreign_proposals; + fn foreign_proposals_get_all_new( + &self, + max_base_layer_block_height: u64, + block_id: &BlockId, + limit: usize, + ) -> Result, StorageError> { + use crate::schema::{foreign_proposals, quorum_certificates}; + + let locked = self.locked_block_get()?; + let pending_block_ids = self.get_block_ids_with_commands_between(&locked.block_id, block_id)?; let foreign_proposals = foreign_proposals::table - .filter(foreign_proposals::state.eq(ForeignProposalState::New.to_string())) - .load::(self.connection()) + .left_join(quorum_certificates::table.on(foreign_proposals::justify_qc_id.eq(quorum_certificates::qc_id))) + // Only propose the Foreign proposal if we have reached the base layer block height specified in the block + .filter(foreign_proposals::base_layer_block_height.le(max_base_layer_block_height as i64)) + .filter( + foreign_proposals::proposed_in_block.is_null() + .or(foreign_proposals::proposed_in_block.ne_all(pending_block_ids) + .and(foreign_proposals::proposed_in_block_height.gt(locked.height.as_u64() as i64))), + ) + .limit(i64::try_from(limit).unwrap_or(i64::MAX)) + .get_results::<(sql_models::ForeignProposal, Option)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "foreign_proposal_get_all", + operation: "foreign_proposals_get_all_new", source: e, })?; - foreign_proposals.into_iter().map(|p| p.try_into()).collect() + foreign_proposals + .into_iter() + .map(|(proposal, qc)| { + let justify_qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency { + operation: "foreign_proposals_get_all_new", + details: format!( + "foreign proposal {} references non-existent quorum certificate {}", + proposal.block_id, proposal.justify_qc_id + ), + })?; + proposal.try_convert(justify_qc) + }) + .collect() } fn foreign_proposal_get_all_pending( &self, from_block_id: &BlockId, to_block_id: &BlockId, - ) -> Result, StorageError> { + ) -> Result, StorageError> { use crate::schema::blocks; - let blocks = self.get_block_ids_that_change_state_between(from_block_id, to_block_id)?; + let blocks = self.get_block_ids_with_commands_between(from_block_id, to_block_id)?; let all_commands: Vec = blocks::table .select(blocks::commands) @@ -524,22 +580,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor Ok(all_commands .into_iter() .filter_map(|command| command.foreign_proposal().cloned()) - .collect::>()) - } - - fn foreign_proposal_get_all_proposed(&self, to_height: NodeHeight) -> Result, StorageError> { - use crate::schema::foreign_proposals; - - let foreign_proposals = foreign_proposals::table - .filter(foreign_proposals::state.eq(ForeignProposalState::Proposed.to_string())) - .filter(foreign_proposals::proposed_height.le(to_height.0 as i64)) - .load::(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "foreign_proposal_get_all", - source: e, - })?; - - foreign_proposals.into_iter().map(|p| p.try_into()).collect() + .collect::>()) } fn foreign_send_counters_get(&self, block_id: &BlockId) -> Result { @@ -781,13 +822,13 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor &self, epoch: Epoch, shard_group: ShardGroup, - start_block_id_exclusive: &BlockId, - end_block_id_inclusive: &BlockId, + start_block_id: &BlockId, + end_block_id: &BlockId, include_dummy_blocks: bool, ) -> Result, StorageError> { use crate::schema::{blocks, quorum_certificates}; - let block_ids = self.get_block_ids_between(start_block_id_exclusive, end_block_id_inclusive)?; + let block_ids = self.get_block_ids_between(start_block_id, end_block_id)?; if block_ids.is_empty() { return Ok(vec![]); } @@ -1246,7 +1287,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor ) -> Result { use crate::schema::block_diffs; let commit_block = self.get_commit_block_id()?; - let block_ids = self.get_block_ids_that_change_state_between(&commit_block, block_id)?; + let block_ids = self.get_block_ids_with_commands_between(&commit_block, block_id)?; let diff = block_diffs::table .filter(block_diffs::block_id.eq_any(block_ids)) @@ -1261,24 +1302,6 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor sql_models::BlockDiff::try_convert_change(diff) } - fn parked_blocks_exists(&self, block_id: &BlockId) -> Result { - use crate::schema::parked_blocks; - - let block_id = serialize_hex(block_id); - - let count = parked_blocks::table - .filter(parked_blocks::block_id.eq(&block_id)) - .count() - .limit(1) - .get_result::(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "blocks_exists_or_parked", - source: e, - })?; - - Ok(count > 0) - } - fn quorum_certificates_get(&self, qc_id: &QcId) -> Result { use crate::schema::quorum_certificates; @@ -1375,7 +1398,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor source: e, })?; - rec.try_convert(updates.remove(&transaction_id)) + rec.try_convert(updates.swap_remove(&transaction_id)) } fn transaction_pool_exists(&self, transaction_id: &TransactionId) -> Result { @@ -1443,7 +1466,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor ready_txs .into_iter() .map(|rec| { - let maybe_update = updates.remove(&rec.transaction_id); + let maybe_update = updates.swap_remove(&rec.transaction_id); rec.try_convert(maybe_update) }) // Filter only Ok where is_ready == true (after update) or Err @@ -1976,7 +1999,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor // Get the last committed block let committed_block_id = self.get_commit_block_id()?; - let block_ids = self.get_block_ids_that_change_state_between(&committed_block_id, block_id)?; + let block_ids = self.get_block_ids_with_commands_between(&committed_block_id, block_id)?; if block_ids.is_empty() { return Ok(HashMap::new()); diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 087837abbc..f202cbe6db 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -76,12 +76,27 @@ diesel::table! { diesel::table! { foreign_proposals (id) { id -> Integer, - shard_group -> Integer, block_id -> Text, - state -> Text, - proposed_height -> Nullable, - transactions -> Text, + parent_block_id -> Text, + merkle_root -> Text, + network -> Text, + height -> BigInt, + epoch -> BigInt, + shard_group -> Integer, + proposed_by -> Text, + qc -> Text, + command_count -> BigInt, + commands -> Text, + total_leader_fee -> BigInt, + foreign_indexes -> Text, + signature -> Nullable, + timestamp -> BigInt, base_layer_block_height -> BigInt, + base_layer_block_hash -> Text, + justify_qc_id -> Text, + block_pledge -> Text, + proposed_in_block -> Nullable, + proposed_in_block_height -> Nullable, created_at -> Timestamp, } } @@ -217,10 +232,10 @@ diesel::table! { total_leader_fee -> BigInt, foreign_indexes -> Text, signature -> Nullable, - block_time -> Nullable, timestamp -> BigInt, base_layer_block_height -> BigInt, base_layer_block_hash -> Text, + foreign_proposals -> Text, created_at -> Timestamp, } } diff --git a/dan_layer/state_store_sqlite/src/sql_models/block.rs b/dan_layer/state_store_sqlite/src/sql_models/block.rs index e511ffe77b..346750565b 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/block.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/block.rs @@ -102,14 +102,14 @@ pub struct ParkedBlock { pub total_leader_fee: i64, pub foreign_indexes: String, pub signature: Option, - pub block_time: Option, pub timestamp: i64, pub base_layer_block_height: i64, pub base_layer_block_hash: String, + pub foreign_proposals: String, pub created_at: PrimitiveDateTime, } -impl TryFrom for consensus_models::Block { +impl TryFrom for (consensus_models::Block, Vec) { type Error = StorageError; fn try_from(value: ParkedBlock) -> Result { @@ -118,7 +118,7 @@ impl TryFrom for consensus_models::Block { item: "block", details: format!("Block #{} network byte is not a valid Network", value.id), })?; - Ok(consensus_models::Block::load( + let block = consensus_models::Block::load( deserialize_hex_try_from(&value.block_id)?, network, deserialize_hex_try_from(&value.parent_block_id)?, @@ -147,10 +147,12 @@ impl TryFrom for consensus_models::Block { deserialize_json(&value.foreign_indexes)?, value.signature.map(|val| deserialize_json(&val)).transpose()?, value.created_at, - value.block_time.map(|v| v as u64), + None, value.timestamp as u64, value.base_layer_block_height as u64, deserialize_hex_try_from(&value.base_layer_block_hash)?, - )) + ); + let foreign_proposals = deserialize_json(&value.foreign_proposals)?; + Ok((block, foreign_proposals)) } } diff --git a/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs b/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs index 03431897a5..b14351718d 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use diesel::Queryable; -use tari_dan_common_types::{Epoch, NodeHeight, ShardGroup}; +use tari_dan_common_types::{Epoch, NodeHeight}; use tari_dan_storage::{ consensus_models::{self, QuorumDecision}, StorageError, @@ -11,7 +11,7 @@ use time::PrimitiveDateTime; use crate::{ error::SqliteStorageError, - serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string}, + serialization::{deserialize_hex_try_from, deserialize_json}, }; #[derive(Debug, Clone, Queryable)] @@ -37,72 +37,6 @@ impl TryFrom for consensus_models::HighQc { } } -#[derive(Debug, Clone, Queryable)] -pub struct ForeignProposal { - pub id: i32, - pub shard_group: i32, - pub block_id: String, - pub state: String, - pub mined_at: Option, - pub transactions: String, - pub base_layer_block_height: i64, - pub created_at: PrimitiveDateTime, -} - -impl TryFrom for consensus_models::ForeignProposal { - type Error = StorageError; - - fn try_from(value: ForeignProposal) -> Result { - Ok(Self { - shard_group: ShardGroup::decode_from_u32(value.shard_group as u32).ok_or_else(|| { - StorageError::DataInconsistency { - details: format!("Invalid shard group: {}", value.shard_group), - } - })?, - block_id: deserialize_hex_try_from(&value.block_id)?, - state: parse_from_string(&value.state)?, - proposed_height: value.mined_at.map(|mined_at| NodeHeight(mined_at as u64)), - transactions: deserialize_json(&value.transactions)?, - base_layer_block_height: value.base_layer_block_height as u64, - }) - } -} - -#[derive(Debug, Clone, Queryable)] -pub struct ForeignSendCounters { - pub id: i32, - pub block_id: String, - pub counters: String, - pub created_at: PrimitiveDateTime, -} - -impl TryFrom for consensus_models::ForeignSendCounters { - type Error = StorageError; - - fn try_from(value: ForeignSendCounters) -> Result { - Ok(Self { - counters: deserialize_json(&value.counters)?, - }) - } -} - -#[derive(Debug, Clone, Queryable)] -pub struct ForeignReceiveCounters { - pub id: i32, - pub counters: String, - pub created_at: PrimitiveDateTime, -} - -impl TryFrom for consensus_models::ForeignReceiveCounters { - type Error = StorageError; - - fn try_from(value: ForeignReceiveCounters) -> Result { - Ok(Self { - counters: deserialize_json(&value.counters)?, - }) - } -} - #[derive(Debug, Clone, Queryable)] pub struct LockedBlock { pub id: i32, diff --git a/dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs b/dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs index a7643f6127..efcd8721dc 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/foreign_parked_block.rs @@ -26,9 +26,7 @@ impl TryFrom for consensus_models::ForeignParkedProposal { let justify_qc = deserialize_json(&value.justify_qc)?; Ok(consensus_models::ForeignParkedProposal::new( - block, - justify_qc, - block_pledge, + consensus_models::ForeignProposal::new(block, block_pledge, justify_qc), )) } } diff --git a/dan_layer/state_store_sqlite/src/sql_models/foreign_proposal.rs b/dan_layer/state_store_sqlite/src/sql_models/foreign_proposal.rs new file mode 100644 index 0000000000..24d1f8ab71 --- /dev/null +++ b/dan_layer/state_store_sqlite/src/sql_models/foreign_proposal.rs @@ -0,0 +1,132 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use diesel::Queryable; +use tari_common_types::types::PublicKey; +use tari_dan_common_types::{Epoch, NodeHeight, ShardGroup}; +use tari_dan_storage::{consensus_models, StorageError}; +use tari_utilities::ByteArray; +use time::PrimitiveDateTime; + +use crate::{ + serialization::{deserialize_hex, deserialize_hex_try_from, deserialize_json}, + sql_models::QuorumCertificate, +}; + +#[derive(Debug, Clone, Queryable)] +pub struct ForeignProposal { + pub id: i32, + pub block_id: String, + pub parent_block_id: String, + pub merkle_root: String, + pub network: String, + pub height: i64, + pub epoch: i64, + pub shard_group: i32, + pub proposed_by: String, + pub qc: String, + pub command_count: i64, + pub commands: String, + pub total_leader_fee: i64, + pub foreign_indexes: String, + pub signature: Option, + pub timestamp: i64, + pub base_layer_block_height: i64, + pub base_layer_block_hash: String, + pub justify_qc_id: String, + pub block_pledge: String, + pub proposed_in_block: Option, + pub proposed_in_block_height: Option, + pub created_at: PrimitiveDateTime, +} + +impl ForeignProposal { + pub fn try_convert(self, justify_qc: QuorumCertificate) -> Result { + let network = self.network.parse().map_err(|_| StorageError::DecodingError { + operation: "try_convert", + item: "block", + details: format!("Block #{} network byte is not a valid Network", self.id), + })?; + let block = consensus_models::Block::load( + deserialize_hex_try_from(&self.block_id)?, + network, + deserialize_hex_try_from(&self.parent_block_id)?, + // TODO: we dont need this + deserialize_json(&self.qc)?, + NodeHeight(self.height as u64), + Epoch(self.epoch as u64), + ShardGroup::decode_from_u32(self.shard_group as u32).ok_or_else(|| StorageError::DataInconsistency { + details: format!( + "Block id={} shard_group ({}) is not a valid", + self.id, self.shard_group as u32 + ), + })?, + PublicKey::from_canonical_bytes(&deserialize_hex(&self.proposed_by)?).map_err(|_| { + StorageError::DecodingError { + operation: "try_convert", + item: "block", + details: format!("Block #{} proposed_by is malformed", self.id), + } + })?, + deserialize_json(&self.commands)?, + deserialize_hex_try_from(&self.merkle_root)?, + self.total_leader_fee as u64, + false, + false, + false, + deserialize_json(&self.foreign_indexes)?, + self.signature.map(|val| deserialize_json(&val)).transpose()?, + self.created_at, + None, + self.timestamp as u64, + self.base_layer_block_height as u64, + deserialize_hex_try_from(&self.base_layer_block_hash)?, + ); + + Ok(consensus_models::ForeignProposal { + block, + block_pledge: deserialize_json(&self.block_pledge)?, + justify_qc: justify_qc.try_into()?, + proposed_by_block: self + .proposed_in_block + .as_deref() + .map(deserialize_hex_try_from) + .transpose()?, + }) + } +} + +#[derive(Debug, Clone, Queryable)] +pub struct ForeignSendCounters { + pub id: i32, + pub block_id: String, + pub counters: String, + pub created_at: PrimitiveDateTime, +} + +impl TryFrom for consensus_models::ForeignSendCounters { + type Error = StorageError; + + fn try_from(value: ForeignSendCounters) -> Result { + Ok(Self { + counters: deserialize_json(&value.counters)?, + }) + } +} + +#[derive(Debug, Clone, Queryable)] +pub struct ForeignReceiveCounters { + pub id: i32, + pub counters: String, + pub created_at: PrimitiveDateTime, +} + +impl TryFrom for consensus_models::ForeignReceiveCounters { + type Error = StorageError; + + fn try_from(value: ForeignReceiveCounters) -> Result { + Ok(Self { + counters: deserialize_json(&value.counters)?, + }) + } +} diff --git a/dan_layer/state_store_sqlite/src/sql_models/mod.rs b/dan_layer/state_store_sqlite/src/sql_models/mod.rs index 7c232f530f..f3c570c66d 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/mod.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/mod.rs @@ -6,6 +6,7 @@ mod block_diff; mod bookkeeping; mod epoch_checkpoint; mod foreign_parked_block; +mod foreign_proposal; mod foreign_substate_pledge; mod leaf_block; mod pending_state_tree_diff; @@ -23,6 +24,7 @@ pub use block_diff::*; pub use bookkeeping::*; pub use epoch_checkpoint::*; pub use foreign_parked_block::*; +pub use foreign_proposal::*; pub use foreign_substate_pledge::*; pub use leaf_block::*; pub use pending_state_tree_diff::*; diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 1a11ea3c88..ec6797b331 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -89,10 +89,10 @@ impl<'a, TAddr: NodeAddressable> SqliteStateStoreWriteTransaction<'a, TAddr> { self.transaction.as_mut().unwrap().connection() } - fn parked_blocks_remove(&mut self, block_id: &str) -> Result { + fn parked_blocks_remove(&mut self, block_id: &str) -> Result<(Block, Vec), StorageError> { use crate::schema::parked_blocks; - let block = parked_blocks::table + let parked_block = parked_blocks::table .filter(parked_blocks::block_id.eq(&block_id)) .first::(self.connection()) .optional() @@ -113,10 +113,14 @@ impl<'a, TAddr: NodeAddressable> SqliteStateStoreWriteTransaction<'a, TAddr> { source: e, })?; - block.try_into() + parked_block.try_into() } - fn parked_blocks_insert(&mut self, block: &Block) -> Result<(), StorageError> { + fn parked_blocks_insert( + &mut self, + block: &Block, + foreign_proposals: &[ForeignProposal], + ) -> Result<(), StorageError> { use crate::schema::{blocks, parked_blocks}; // check if block exists in blocks table using count query @@ -167,11 +171,11 @@ impl<'a, TAddr: NodeAddressable> SqliteStateStoreWriteTransaction<'a, TAddr> { parked_blocks::total_leader_fee.eq(block.total_leader_fee() as i64), parked_blocks::justify.eq(serialize_json(block.justify())?), parked_blocks::foreign_indexes.eq(serialize_json(block.foreign_indexes())?), - parked_blocks::block_time.eq(block.block_time().map(|v| v as i64)), parked_blocks::signature.eq(block.signature().map(serialize_json).transpose()?), parked_blocks::timestamp.eq(block.timestamp() as i64), parked_blocks::base_layer_block_height.eq(block.base_layer_block_height() as i64), parked_blocks::base_layer_block_hash.eq(serialize_hex(block.base_layer_block_hash())), + parked_blocks::foreign_proposals.eq(serialize_json(foreign_proposals)?), ); diesel::insert_into(parked_blocks::table) @@ -526,40 +530,86 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta Ok(()) } - fn foreign_proposal_upsert(&mut self, foreign_proposal: &ForeignProposal) -> Result<(), StorageError> { + fn foreign_proposals_upsert( + &mut self, + foreign_proposal: &ForeignProposal, + proposed_in_block: Option, + ) -> Result<(), StorageError> { use crate::schema::foreign_proposals; + let block = foreign_proposal.block(); let values = ( - foreign_proposals::shard_group.eq(foreign_proposal.shard_group.encode_as_u32() as i32), - foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id)), - foreign_proposals::state.eq(foreign_proposal.state.to_string()), - foreign_proposals::proposed_height.eq(foreign_proposal.proposed_height.map(|h| h.as_u64() as i64)), - foreign_proposals::transactions.eq(serialize_json(&foreign_proposal.transactions)?), - foreign_proposals::base_layer_block_height.eq(foreign_proposal.base_layer_block_height as i64), + foreign_proposals::block_id.eq(serialize_hex(block.id())), + foreign_proposals::parent_block_id.eq(serialize_hex(block.parent())), + foreign_proposals::merkle_root.eq(block.merkle_root().to_string()), + foreign_proposals::network.eq(block.network().to_string()), + foreign_proposals::height.eq(block.height().as_u64() as i64), + foreign_proposals::epoch.eq(block.epoch().as_u64() as i64), + foreign_proposals::shard_group.eq(block.shard_group().encode_as_u32() as i32), + foreign_proposals::proposed_by.eq(serialize_hex(block.proposed_by().as_bytes())), + foreign_proposals::command_count.eq(block.commands().len() as i64), + foreign_proposals::commands.eq(serialize_json(block.commands())?), + foreign_proposals::total_leader_fee.eq(block.total_leader_fee() as i64), + // TODO: we dont need this + foreign_proposals::qc.eq(serialize_json(block.justify())?), + foreign_proposals::foreign_indexes.eq(serialize_json(block.foreign_indexes())?), + foreign_proposals::timestamp.eq(block.timestamp() as i64), + foreign_proposals::base_layer_block_height.eq(block.base_layer_block_height() as i64), + foreign_proposals::base_layer_block_hash.eq(serialize_hex(block.base_layer_block_hash())), + // Extra + foreign_proposals::justify_qc_id.eq(serialize_hex(foreign_proposal.justify_qc().id())), + foreign_proposals::block_pledge.eq(serialize_json(foreign_proposal.block_pledge())?), ); diesel::insert_into(foreign_proposals::table) - .values(values.clone()) - .on_conflict((foreign_proposals::shard_group, foreign_proposals::block_id)) - .do_update() - .set(values) + .values(&values) + .on_conflict_do_nothing() .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "foreign_proposal_set", + operation: "foreign_proposals_upsert", source: e, })?; + + if let Some(proposed_in_block) = proposed_in_block { + self.foreign_proposals_set_proposed_in(block.id(), &proposed_in_block)?; + } + Ok(()) } - fn foreign_proposal_delete(&mut self, foreign_proposal: &ForeignProposal) -> Result<(), StorageError> { + fn foreign_proposals_delete(&mut self, block_id: &BlockId) -> Result<(), StorageError> { use crate::schema::foreign_proposals; diesel::delete(foreign_proposals::table) - .filter(foreign_proposals::shard_group.eq(foreign_proposal.shard_group.encode_as_u32() as i32)) - .filter(foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id))) + .filter(foreign_proposals::block_id.eq(serialize_hex(block_id))) .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "foreign_proposal_delete", + operation: "foreign_proposals_delete", + source: e, + })?; + + Ok(()) + } + + fn foreign_proposals_set_proposed_in( + &mut self, + block_id: &BlockId, + proposed_in_block: &BlockId, + ) -> Result<(), StorageError> { + use crate::schema::{blocks, foreign_proposals}; + + diesel::update(foreign_proposals::table) + .filter(foreign_proposals::block_id.eq(serialize_hex(block_id))) + .set(( + foreign_proposals::proposed_in_block.eq(serialize_hex(proposed_in_block)), + foreign_proposals::proposed_in_block_height.eq(blocks::table + .select(blocks::height) + .filter(blocks::block_id.eq(serialize_hex(proposed_in_block))) + .single_value()), + )) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_proposals_set_proposed_in", source: e, })?; @@ -855,6 +905,18 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta let transaction_id = serialize_hex(update.transaction_id()); let block_id = serialize_hex(update.block_id()); + + // Check if update exists for block and transaction + let count = transaction_pool_state_updates::table + .count() + .filter(transaction_pool_state_updates::block_id.eq(&block_id)) + .filter(transaction_pool_state_updates::transaction_id.eq(&transaction_id)) + .first::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "transaction_pool_add_pending_update", + source: e, + })?; + let values = ( transaction_pool_state_updates::block_id.eq(&block_id), transaction_pool_state_updates::block_height.eq(blocks::table @@ -869,17 +931,6 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta transaction_pool_state_updates::is_ready.eq(update.is_ready()), ); - // Check if update exists for block and transaction - let count = transaction_pool_state_updates::table - .count() - .filter(transaction_pool_state_updates::block_id.eq(&block_id)) - .filter(transaction_pool_state_updates::transaction_id.eq(&transaction_id)) - .first::(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "transaction_pool_add_pending_update", - source: e, - })?; - if count == 0 { diesel::insert_into(transaction_pool_state_updates::table) .values(values) @@ -1125,23 +1176,18 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta Ok(()) } - fn missing_transactions_insert< - 'a, - IMissing: IntoIterator, - IAwaiting: IntoIterator, - >( + fn missing_transactions_insert<'a, IMissing: IntoIterator>( &mut self, block: &Block, + foreign_proposals: &[ForeignProposal], missing_transaction_ids: IMissing, - awaiting_transaction_ids: IAwaiting, ) -> Result<(), StorageError> { use crate::schema::missing_transactions; let missing_transaction_ids = missing_transaction_ids.into_iter().map(serialize_hex); - let awaiting_transaction_ids = awaiting_transaction_ids.into_iter().map(serialize_hex); let block_id_hex = serialize_hex(block.id()); - self.parked_blocks_insert(block)?; + self.parked_blocks_insert(block, foreign_proposals)?; let values = missing_transaction_ids .map(|tx_id| { @@ -1152,14 +1198,6 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta missing_transactions::is_awaiting_execution.eq(false), ) }) - .chain(awaiting_transaction_ids.map(|tx_id| { - ( - missing_transactions::block_id.eq(&block_id_hex), - missing_transactions::block_height.eq(block.height().as_u64() as i64), - missing_transactions::transaction_id.eq(tx_id), - missing_transactions::is_awaiting_execution.eq(true), - ) - })) .collect::>(); diesel::insert_into(missing_transactions::table) @@ -1177,7 +1215,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta &mut self, current_height: NodeHeight, transaction_id: &TransactionId, - ) -> Result, StorageError> { + ) -> Result)>, StorageError> { use crate::schema::missing_transactions; let transaction_id = serialize_hex(transaction_id); diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index 8fe1c5ac78..7c1acc7f0c 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -36,6 +36,7 @@ use super::{ BlockDiff, BlockPledge, ForeignProposal, + ForeignProposalAtom, ForeignSendCounters, QuorumCertificate, SubstateChange, @@ -313,7 +314,8 @@ impl Block { let inner_hash = hashing::block_hasher() .chain(&self.network) - .chain(&self.justify) + // This allows us to exclude the justify and still validate the block + .chain(self.justify.id()) .chain(&self.height) .chain(&self.total_leader_fee) .chain(&self.epoch) @@ -368,7 +370,7 @@ impl Block { self.commands.iter().filter_map(|d| d.finalising()).map(|t| t.id()) } - pub fn all_foreign_proposals(&self) -> impl Iterator + '_ { + pub fn all_foreign_proposals(&self) -> impl Iterator + '_ { self.commands.iter().filter_map(|c| c.foreign_proposal()) } @@ -543,17 +545,11 @@ impl Block { tx: &TTx, epoch: Epoch, shard_group: ShardGroup, - start_block_id_exclusive: &BlockId, - end_block_id_inclusive: &BlockId, + start_block_id: &BlockId, + end_block_id: &BlockId, include_dummy_blocks: bool, ) -> Result, StorageError> { - tx.blocks_get_all_between( - epoch, - shard_group, - start_block_id_exclusive, - end_block_id_inclusive, - include_dummy_blocks, - ) + tx.blocks_get_all_between(epoch, shard_group, start_block_id, end_block_id, include_dummy_blocks) } pub fn get_last_n_in_epoch( @@ -951,10 +947,7 @@ impl Block { Ok(()) } - pub fn get_local_prepared_block_pledge( - &self, - tx: &TTx, - ) -> Result { + pub fn get_block_pledge(&self, tx: &TTx) -> Result { let mut pledges = BlockPledge::new(); for atom in self .commands() @@ -989,6 +982,13 @@ impl Block { } Ok(pledges) } + + pub fn get_foreign_proposals( + &self, + tx: &TTx, + ) -> Result, StorageError> { + ForeignProposal::get_any(tx, self.all_foreign_proposals().map(|p| &p.block_id)) + } } impl Display for Block { diff --git a/dan_layer/storage/src/consensus_models/command.rs b/dan_layer/storage/src/consensus_models/command.rs index 39bf23248e..8f985df8ad 100644 --- a/dan_layer/storage/src/consensus_models/command.rs +++ b/dan_layer/storage/src/consensus_models/command.rs @@ -9,7 +9,7 @@ use std::{ use serde::{Deserialize, Serialize}; use tari_transaction::TransactionId; -use super::{BlockId, ExecutedTransaction, ForeignProposal, LeaderFee, TransactionRecord}; +use super::{BlockId, ExecutedTransaction, ForeignProposalAtom, LeaderFee, TransactionRecord}; use crate::{ consensus_models::{evidence::Evidence, Decision}, StateStoreReadTransaction, @@ -98,7 +98,7 @@ pub enum Command { /// Request validators to agree that one or more involved shard groups did not agreed to ACCEPT the transaction. SomeAccept(TransactionAtom), // Validator node commands - ForeignProposal(ForeignProposal), + ForeignProposal(ForeignProposalAtom), EndEpoch, } @@ -185,7 +185,7 @@ impl Command { } } - pub fn foreign_proposal(&self) -> Option<&ForeignProposal> { + pub fn foreign_proposal(&self) -> Option<&ForeignProposalAtom> { match self { Command::ForeignProposal(tx) => Some(tx), _ => None, diff --git a/dan_layer/storage/src/consensus_models/evidence.rs b/dan_layer/storage/src/consensus_models/evidence.rs index fdceea4043..1a1760a168 100644 --- a/dan_layer/storage/src/consensus_models/evidence.rs +++ b/dan_layer/storage/src/consensus_models/evidence.rs @@ -124,13 +124,24 @@ impl Evidence { /// Add or update substate addresses and locks into Evidence pub fn update>(&mut self, extend: I) -> &mut Self { for (substate_address, lock_type) in extend { - self.evidence - .entry(substate_address) - .and_modify(|evidence| evidence.lock = lock_type) - .or_insert_with(|| ShardEvidence { - qc_ids: IndexSet::new(), - lock: lock_type, - }); + let maybe_pos = self + .evidence + .iter() + // If the update contains an object (as in ObjectKey) that is already in the evidence, update it without duplicating the object key + .position(|(address, _)| address.object_key_bytes() == substate_address.object_key_bytes()); + match maybe_pos { + Some(pos) => { + let (_, mut evidence) = self.evidence.swap_remove_index(pos).expect("position is valid"); + evidence.lock = lock_type; + self.evidence.insert(substate_address, evidence); + }, + None => { + self.evidence.insert(substate_address, ShardEvidence { + qc_ids: IndexSet::new(), + lock: lock_type, + }); + }, + } } self } @@ -139,17 +150,22 @@ impl Evidence { /// updated to the lock type and the QCs are appended to this instance. pub fn merge(&mut self, other: Evidence) -> &mut Self { for (substate_address, shard_evidence) in other.evidence { - let entry = self + let maybe_pos = self .evidence - .entry(substate_address) - .and_modify(|evidence| { + .iter() + // If the update contains an object (as in ObjectKey) that is already in the evidence, update it without duplicating the object key + .position(|(address, _)| address.object_key_bytes() == substate_address.object_key_bytes()); + match maybe_pos { + Some(pos) => { + let (_, mut evidence) = self.evidence.swap_remove_index(pos).expect("position is valid"); evidence.lock = shard_evidence.lock; - }) - .or_insert_with(|| ShardEvidence { - qc_ids: IndexSet::new(), - lock: shard_evidence.lock, - }); - entry.qc_ids.extend(shard_evidence.qc_ids); + evidence.qc_ids.extend(shard_evidence.qc_ids); + self.evidence.insert(substate_address, evidence); + }, + None => { + self.evidence.insert(substate_address, shard_evidence); + }, + } } self } @@ -189,10 +205,6 @@ pub struct ShardEvidence { } impl ShardEvidence { - pub fn new(qc_ids: IndexSet, lock: SubstateLockType) -> Self { - Self { qc_ids, lock } - } - pub fn is_empty(&self) -> bool { self.qc_ids.is_empty() } diff --git a/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs b/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs index 15f9a64f1b..908bcc3d8a 100644 --- a/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs +++ b/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs @@ -6,37 +6,35 @@ use std::fmt::Display; use tari_transaction::TransactionId; use crate::{ - consensus_models::{Block, BlockPledge, QuorumCertificate}, + consensus_models::{Block, BlockPledge, ForeignProposal, QuorumCertificate}, StateStoreWriteTransaction, StorageError, }; #[derive(Debug, Clone)] pub struct ForeignParkedProposal { - pub block: Block, - pub block_pledge: BlockPledge, - pub justify_qc: QuorumCertificate, + proposal: ForeignProposal, } impl ForeignParkedProposal { - pub fn new(block: Block, justify_qc: QuorumCertificate, block_pledge: BlockPledge) -> Self { - Self { - block, - block_pledge, - justify_qc, - } + pub fn new(proposal: ForeignProposal) -> Self { + Self { proposal } + } + + pub fn into_proposal(self) -> ForeignProposal { + self.proposal } pub fn block(&self) -> &Block { - &self.block + &self.proposal.block } pub fn block_pledge(&self) -> &BlockPledge { - &self.block_pledge + &self.proposal.block_pledge } pub fn justify_qc(&self) -> &QuorumCertificate { - &self.justify_qc + &self.proposal.justify_qc } } @@ -50,7 +48,7 @@ impl ForeignParkedProposal { tx: &mut TTx, transaction_ids: I, ) -> Result<(), StorageError> { - tx.foreign_parked_blocks_insert_missing_transactions(self.block.id(), transaction_ids) + tx.foreign_parked_blocks_insert_missing_transactions(self.block().id(), transaction_ids) } pub fn remove_by_transaction_id( @@ -63,7 +61,7 @@ impl ForeignParkedProposal { impl Display for ForeignParkedProposal { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "ForeignParkedBlock: block={}, qcs=", self.block)?; + write!(f, "ForeignParkedBlock: block={}, qcs=", self.block())?; for (_tx_id, pledges) in self.block_pledge().iter() { write!(f, "{_tx_id}:[")?; for pledge in pledges { @@ -71,7 +69,7 @@ impl Display for ForeignParkedProposal { } write!(f, "],")?; } - write!(f, "justify_qc={}", self.justify_qc)?; + write!(f, "justify_qc={}", self.justify_qc())?; Ok(()) } } diff --git a/dan_layer/storage/src/consensus_models/foreign_proposal.rs b/dan_layer/storage/src/consensus_models/foreign_proposal.rs index 42ebc877ef..577f610c43 100644 --- a/dan_layer/storage/src/consensus_models/foreign_proposal.rs +++ b/dan_layer/storage/src/consensus_models/foreign_proposal.rs @@ -4,48 +4,99 @@ use std::{ fmt::{self, Display, Formatter}, hash::Hash, + ops::Deref, str::FromStr, }; use serde::{Deserialize, Serialize}; -use tari_dan_common_types::{NodeHeight, ShardGroup}; -use tari_transaction::TransactionId; +use tari_dan_common_types::ShardGroup; -use super::BlockId; +use super::{Block, BlockId, BlockPledge, QuorumCertificate}; use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] -#[cfg_attr( - feature = "ts", - derive(ts_rs::TS), - ts(export, export_to = "../../bindings/src/types/") -)] -pub enum ForeignProposalState { - New, - Proposed, - Deleted, +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ForeignProposal { + pub block: Block, + pub block_pledge: BlockPledge, + pub justify_qc: QuorumCertificate, + pub proposed_by_block: Option, } -impl Display for ForeignProposalState { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - ForeignProposalState::New => write!(f, "New"), - ForeignProposalState::Proposed => write!(f, "Proposed"), - ForeignProposalState::Deleted => write!(f, "Deleted"), +impl ForeignProposal { + pub fn new(block: Block, block_pledge: BlockPledge, justify_qc: QuorumCertificate) -> Self { + Self { + block, + block_pledge, + justify_qc, + proposed_by_block: None, + } + } + + pub fn to_atom(&self) -> ForeignProposalAtom { + ForeignProposalAtom { + shard_group: self.block.shard_group(), + block_id: *self.block.id(), + base_layer_block_height: self.block.base_layer_block_height(), } } + + pub fn block(&self) -> &Block { + &self.block + } + + pub fn block_pledge(&self) -> &BlockPledge { + &self.block_pledge + } + + pub fn justify_qc(&self) -> &QuorumCertificate { + &self.justify_qc + } + + pub fn proposed_by_block(&self) -> Option<&BlockId> { + self.proposed_by_block.as_ref() + } } -impl FromStr for ForeignProposalState { - type Err = anyhow::Error; +impl ForeignProposal { + pub fn upsert(&self, tx: &mut TTx, proposed_in_block: Option) -> Result<(), StorageError> + where + TTx: StateStoreWriteTransaction + Deref + ?Sized, + TTx::Target: StateStoreReadTransaction, + { + self.justify_qc().save(tx)?; + tx.foreign_proposals_upsert(self, proposed_in_block) + } - fn from_str(s: &str) -> Result { - match s { - "New" => Ok(ForeignProposalState::New), - "Proposed" => Ok(ForeignProposalState::Proposed), - "Deleted" => Ok(ForeignProposalState::Deleted), - _ => Err(anyhow::anyhow!("Invalid foreign proposal state {}", s)), - } + pub fn delete(&self, tx: &mut TTx) -> Result<(), StorageError> { + tx.foreign_proposals_delete(self.block.id()) + } + + pub fn get_any<'a, TTx: StateStoreReadTransaction + ?Sized, I: IntoIterator>( + tx: &TTx, + block_ids: I, + ) -> Result, StorageError> { + tx.foreign_proposals_get_any(block_ids) + } + + pub fn exists(&self, tx: &TTx) -> Result { + tx.foreign_proposals_exists(self.block.id()) + } + + pub fn get_all_new( + tx: &TTx, + max_base_layer_block_height: u64, + block_id: &BlockId, + limit: usize, + ) -> Result, StorageError> { + tx.foreign_proposals_get_all_new(max_base_layer_block_height, block_id, limit) + } + + pub fn set_proposed_in( + tx: &mut TTx, + block_id: &BlockId, + proposed_in_block: &BlockId, + ) -> Result<(), StorageError> { + tx.foreign_proposals_set_proposed_in(block_id, proposed_in_block) } } @@ -55,74 +106,47 @@ impl FromStr for ForeignProposalState { derive(ts_rs::TS), ts(export, export_to = "../../bindings/src/types/") )] -pub struct ForeignProposal { - #[cfg_attr(feature = "ts", ts(type = "number"))] - pub shard_group: ShardGroup, +pub struct ForeignProposalAtom { #[cfg_attr(feature = "ts", ts(type = "string"))] pub block_id: BlockId, - pub state: ForeignProposalState, - pub proposed_height: Option, - #[cfg_attr(feature = "ts", ts(type = "Array"))] - pub transactions: Vec, + #[cfg_attr(feature = "ts", ts(type = "number"))] + pub shard_group: ShardGroup, #[cfg_attr(feature = "ts", ts(type = "number"))] pub base_layer_block_height: u64, } -impl ForeignProposal { - pub fn new( - shard_group: ShardGroup, - block_id: BlockId, - transactions: Vec, - base_layer_block_height: u64, - ) -> Self { - Self { - shard_group, - block_id, - state: ForeignProposalState::New, - proposed_height: None, - transactions, - base_layer_block_height, - } - } - - pub fn set_proposed_height(&mut self, height: NodeHeight) -> &mut Self { - self.proposed_height = Some(height); - self.state = ForeignProposalState::Proposed; - self +impl ForeignProposalAtom { + pub fn exists(&self, tx: &TTx) -> Result { + tx.foreign_proposals_exists(&self.block_id) } } -impl ForeignProposal { - pub fn upsert(&self, tx: &mut TTx) -> Result<(), StorageError> { - tx.foreign_proposal_upsert(self)?; - Ok(()) - } - - pub fn delete(&self, tx: &mut TTx) -> Result<(), StorageError> { - tx.foreign_proposal_delete(self)?; - Ok(()) - } - - pub fn exists(&self, tx: &TTx) -> Result { - tx.foreign_proposal_exists(self) - } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +pub enum ForeignProposalStatus { + New, + Proposed, + Deleted, +} - pub fn get_all_new(tx: &TTx) -> Result, StorageError> { - tx.foreign_proposal_get_all_new() +impl Display for ForeignProposalStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ForeignProposalStatus::New => write!(f, "New"), + ForeignProposalStatus::Proposed => write!(f, "Proposed"), + ForeignProposalStatus::Deleted => write!(f, "Deleted"), + } } +} - pub fn get_all_pending( - tx: &TTx, - from_block_id: &BlockId, - to_block_id: &BlockId, - ) -> Result, StorageError> { - tx.foreign_proposal_get_all_pending(from_block_id, to_block_id) - } +impl FromStr for ForeignProposalStatus { + type Err = anyhow::Error; - pub fn get_all_proposed( - tx: &TTx, - to_height: NodeHeight, - ) -> Result, StorageError> { - tx.foreign_proposal_get_all_proposed(to_height) + fn from_str(s: &str) -> Result { + match s { + "New" => Ok(ForeignProposalStatus::New), + "Proposed" => Ok(ForeignProposalStatus::Proposed), + "Deleted" => Ok(ForeignProposalStatus::Deleted), + _ => Err(anyhow::anyhow!("Invalid foreign proposal state {}", s)), + } } } diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index ba3bc09893..e060a39c37 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -543,16 +543,17 @@ impl TransactionPoolRecord { ((TransactionPoolStage::New, TransactionPoolStage::Prepared), true) | ((TransactionPoolStage::New, TransactionPoolStage::LocalOnly), false) | // Prepared + ((TransactionPoolStage::Prepared, TransactionPoolStage::Prepared), _) | ((TransactionPoolStage::Prepared, TransactionPoolStage::LocalPrepared), _) | // LocalPrepared ((TransactionPoolStage::LocalPrepared, TransactionPoolStage::LocalPrepared), _) | ((TransactionPoolStage::LocalPrepared, TransactionPoolStage::AllPrepared), _) | ((TransactionPoolStage::LocalPrepared, TransactionPoolStage::SomePrepared), _) | // AllPrepared - ((TransactionPoolStage::AllPrepared, TransactionPoolStage::AllPrepared), false) | + ((TransactionPoolStage::AllPrepared, TransactionPoolStage::AllPrepared), _) | ((TransactionPoolStage::AllPrepared, TransactionPoolStage::LocalAccepted), _) | // SomePrepared - ((TransactionPoolStage::SomePrepared, TransactionPoolStage::SomePrepared), false) | + ((TransactionPoolStage::SomePrepared, TransactionPoolStage::SomePrepared), _) | ((TransactionPoolStage::SomePrepared, TransactionPoolStage::LocalAccepted), _) | // LocalAccepted ((TransactionPoolStage::LocalAccepted, TransactionPoolStage::LocalAccepted), _) | @@ -583,6 +584,17 @@ impl TransactionPoolRecord { ) -> Result<(), TransactionPoolError> { self.check_pending_status_update(pending_stage, is_ready)?; + debug!( + target: LOG_TARGET, + "add_pending_status_update: tx: {}, blk: {}, transition: {}->{}, ready {}->{}", + self.transaction_id, + block.block_id, + self.stage, + pending_stage, + self.is_ready, + is_ready, + ); + let update = TransactionPoolStatusUpdate { block_id: block.block_id, transaction_id: self.transaction_id, diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 449591809e..71fa37c413 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -29,6 +29,7 @@ use crate::{ Evidence, ForeignParkedProposal, ForeignProposal, + ForeignProposalAtom, ForeignReceiveCounters, ForeignSendCounters, HighQc, @@ -105,14 +106,23 @@ pub trait StateStoreReadTransaction: Sized { fn locked_block_get(&self) -> Result; fn leaf_block_get(&self) -> Result; fn high_qc_get(&self) -> Result; - fn foreign_proposal_exists(&self, foreign_proposal: &ForeignProposal) -> Result; - fn foreign_proposal_get_all_new(&self) -> Result, StorageError>; + fn foreign_proposals_get_any<'a, I: IntoIterator>( + &self, + block_ids: I, + ) -> Result, StorageError>; + fn foreign_proposals_exists(&self, block_id: &BlockId) -> Result; + fn foreign_proposals_get_all_new( + &self, + max_base_layer_block_height: u64, + block_id: &BlockId, + limit: usize, + ) -> Result, StorageError>; fn foreign_proposal_get_all_pending( &self, from_block_id: &BlockId, to_block_id: &BlockId, - ) -> Result, StorageError>; - fn foreign_proposal_get_all_proposed(&self, to_height: NodeHeight) -> Result, StorageError>; + ) -> Result, StorageError>; + fn foreign_send_counters_get(&self, block_id: &BlockId) -> Result; fn foreign_receive_counters_get(&self) -> Result; fn transactions_get(&self, tx_id: &TransactionId) -> Result; @@ -148,8 +158,8 @@ pub trait StateStoreReadTransaction: Sized { &self, epoch: Epoch, shard_group: ShardGroup, - start_block_id_exclusive: &BlockId, - end_block_id_inclusive: &BlockId, + start_block_id: &BlockId, + end_block_id: &BlockId, include_dummy_blocks: bool, ) -> Result, StorageError>; fn blocks_exists(&self, block_id: &BlockId) -> Result; @@ -192,8 +202,6 @@ pub trait StateStoreReadTransaction: Sized { substate_id: &SubstateId, ) -> Result; - fn parked_blocks_exists(&self, block_id: &BlockId) -> Result; - // -------------------------------- QuorumCertificate -------------------------------- // fn quorum_certificates_get(&self, qc_id: &QcId) -> Result; fn quorum_certificates_get_all<'a, I: IntoIterator>( @@ -337,8 +345,18 @@ pub trait StateStoreWriteTransaction { fn leaf_block_set(&mut self, leaf_node: &LeafBlock) -> Result<(), StorageError>; fn locked_block_set(&mut self, locked_block: &LockedBlock) -> Result<(), StorageError>; fn high_qc_set(&mut self, high_qc: &HighQc) -> Result<(), StorageError>; - fn foreign_proposal_upsert(&mut self, foreign_proposal: &ForeignProposal) -> Result<(), StorageError>; - fn foreign_proposal_delete(&mut self, foreign_proposal: &ForeignProposal) -> Result<(), StorageError>; + fn foreign_proposals_upsert( + &mut self, + foreign_proposal: &ForeignProposal, + proposed_in_block: Option, + ) -> Result<(), StorageError>; + fn foreign_proposals_delete(&mut self, block_id: &BlockId) -> Result<(), StorageError>; + + fn foreign_proposals_set_proposed_in( + &mut self, + block_id: &BlockId, + proposed_in_block: &BlockId, + ) -> Result<(), StorageError>; fn foreign_send_counters_set( &mut self, foreign_send_counter: &ForeignSendCounters, @@ -403,22 +421,18 @@ pub trait StateStoreWriteTransaction { // -------------------------------- Missing Transactions -------------------------------- // - fn missing_transactions_insert< - 'a, - IMissing: IntoIterator, - IAwaiting: IntoIterator, - >( + fn missing_transactions_insert<'a, IMissing: IntoIterator>( &mut self, park_block: &Block, + foreign_proposals: &[ForeignProposal], missing_transaction_ids: IMissing, - awaiting_transaction_ids: IAwaiting, ) -> Result<(), StorageError>; fn missing_transactions_remove( &mut self, current_height: NodeHeight, transaction_id: &TransactionId, - ) -> Result, StorageError>; + ) -> Result)>, StorageError>; fn foreign_parked_blocks_insert(&mut self, park_block: &ForeignParkedProposal) -> Result<(), StorageError>; diff --git a/dan_layer/template_lib/src/hash.rs b/dan_layer/template_lib/src/hash.rs index c940547bff..51ebc87498 100644 --- a/dan_layer/template_lib/src/hash.rs +++ b/dan_layer/template_lib/src/hash.rs @@ -42,7 +42,7 @@ impl Hash { Self(bytes) } - pub fn into_array(self) -> [u8; Self::LENGTH] { + pub const fn into_array(self) -> [u8; Self::LENGTH] { self.0 } diff --git a/dan_layer/template_lib/src/models/layer_one_commitment.rs b/dan_layer/template_lib/src/models/layer_one_commitment.rs index f089679281..cf504a922e 100644 --- a/dan_layer/template_lib/src/models/layer_one_commitment.rs +++ b/dan_layer/template_lib/src/models/layer_one_commitment.rs @@ -5,28 +5,24 @@ use std::fmt::{Display, Formatter}; use serde::{Deserialize, Serialize}; -use crate::{hash::HashParseError, Hash}; +use crate::models::{KeyParseError, ObjectKey}; /// The unique identification of a unclaimed confidential output in the Tari network. /// Used when a user wants to claim burned funds from the Minotari network into the Tari network #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(transparent)] -pub struct UnclaimedConfidentialOutputAddress(Hash); +pub struct UnclaimedConfidentialOutputAddress(ObjectKey); impl UnclaimedConfidentialOutputAddress { - pub const fn new(hash: Hash) -> Self { - Self(hash) + pub fn from_hex(hex: &str) -> Result { + Ok(Self(ObjectKey::from_hex(hex)?)) } - pub fn from_hex(hex: &str) -> Result { - Ok(Self(Hash::from_hex(hex)?)) + pub fn try_from_commitment(commitment_bytes: &[u8]) -> Result { + Ok(Self(ObjectKey::try_from(commitment_bytes)?)) } - pub fn try_from_commitment(commitment_bytes: &[u8]) -> Result { - Ok(Self(Hash::try_from(commitment_bytes)?)) - } - - pub fn hash(&self) -> &Hash { + pub fn as_object_key(&self) -> &ObjectKey { &self.0 } @@ -36,10 +32,10 @@ impl UnclaimedConfidentialOutputAddress { } impl TryFrom<&[u8]> for UnclaimedConfidentialOutputAddress { - type Error = HashParseError; + type Error = KeyParseError; fn try_from(value: &[u8]) -> Result { - Hash::try_from(value).map(Self) + ObjectKey::try_from(value).map(Self) } } diff --git a/dan_layer/wallet/sdk/src/apis/substate.rs b/dan_layer/wallet/sdk/src/apis/substate.rs index ecf3f12f03..509c6d942f 100644 --- a/dan_layer/wallet/sdk/src/apis/substate.rs +++ b/dan_layer/wallet/sdk/src/apis/substate.rs @@ -113,7 +113,7 @@ where }, SubstateValue::Resource(_) => {}, SubstateValue::TransactionReceipt(tx_receipt) => { - let tx_receipt_addr = SubstateId::TransactionReceipt(TransactionReceiptAddress::new( + let tx_receipt_addr = SubstateId::TransactionReceipt(TransactionReceiptAddress::from_hash( tx_receipt.transaction_hash, )); if substate_addresses.contains_key(&tx_receipt_addr) { diff --git a/networking/libp2p-messaging/src/handler.rs b/networking/libp2p-messaging/src/handler.rs index 390401739f..8c8b59f6bb 100644 --- a/networking/libp2p-messaging/src/handler.rs +++ b/networking/libp2p-messaging/src/handler.rs @@ -176,6 +176,7 @@ where TCodec: Codec + Send + Clone + 'static let fut = async move { loop { + // TODO: read timeout match codec.decode_from(&mut stream).await { Ok(msg) => { events