From ed273b2943161a8e0e9f6075fdd35494308cdd4c Mon Sep 17 00:00:00 2001 From: matan-starkware <97523054+matan-starkware@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:32:36 +0200 Subject: [PATCH] feat(consensus): sequencer context broadcasts votes (#2422) --- Cargo.lock | 2 - .../src/sequencer_consensus_context.rs | 9 +- .../src/sequencer_consensus_context_test.rs | 121 +++++++----------- crates/starknet_consensus_manager/Cargo.toml | 2 - .../src/consensus_manager.rs | 50 ++------ 5 files changed, 64 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd4c794b3e..6c9bddd9f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10342,12 +10342,10 @@ version = "0.0.0" dependencies = [ "async-trait", "futures", - "libp2p", "papyrus_config", "papyrus_consensus", "papyrus_consensus_orchestrator", "papyrus_network", - "papyrus_network_types", "papyrus_protobuf", "serde", "starknet_batcher_types", diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 11fc4b7b33..8f5322e6f6 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -19,7 +19,7 @@ use papyrus_consensus::types::{ Round, ValidatorId, }; -use papyrus_network::network_manager::BroadcastTopicClient; +use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; use papyrus_protobuf::consensus::{ ConsensusMessage, ProposalFin, @@ -105,6 +105,8 @@ pub struct SequencerConsensusContext { queued_proposals: BTreeMap)>, outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, + // Used to broadcast votes to other consensus nodes. + vote_broadcast_client: BroadcastTopicClient, } impl SequencerConsensusContext { @@ -112,12 +114,14 @@ impl SequencerConsensusContext { batcher: Arc, _proposal_streaming_client: BroadcastTopicClient, outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, + vote_broadcast_client: BroadcastTopicClient, num_validators: u64, ) -> Self { Self { batcher, _proposal_streaming_client, outbound_proposal_sender, + vote_broadcast_client, validators: (0..num_validators).map(ValidatorId::from).collect(), valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())), proposal_id: 0, @@ -274,7 +278,8 @@ impl ConsensusContext for SequencerConsensusContext { } async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> { - debug!("No-op broadcasting message: {message:?}"); + debug!("Broadcasting message: {message:?}"); + self.vote_broadcast_client.broadcast_message(message).await?; Ok(()) } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 3fb1a632c6..f89ed74604 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -14,6 +14,7 @@ use papyrus_network::network_manager::test_utils::{ }; use papyrus_network::network_manager::BroadcastTopicChannels; use papyrus_protobuf::consensus::{ + ConsensusMessage, ProposalFin, ProposalInit, ProposalPart, @@ -67,20 +68,49 @@ fn generate_executable_invoke_tx(tx_hash: Felt) -> ExecutableTransaction { }))) } -fn make_streaming_channels() -> ( - mpsc::Sender<(u64, mpsc::Receiver)>, - mpsc::Receiver>, - BroadcastNetworkMock>, -) { - let TestSubscriberChannels { mock_network, subscriber_channels } = +// Structs which aren't utilized but should not be dropped. +struct NetworkDependencies { + _vote_network: BroadcastNetworkMock, + _old_proposal_network: BroadcastNetworkMock, + _new_proposal_network: BroadcastNetworkMock>, +} + +fn setup(batcher: MockBatcherClient) -> (SequencerConsensusContext, NetworkDependencies) { + let TestSubscriberChannels { mock_network: mock_proposal_stream_network, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: inbound_network_receiver, broadcast_topic_client: outbound_network_sender, } = subscriber_channels; - let (outbound_internal_sender, inbound_internal_receiver, _) = + let (outbound_proposal_stream_sender, _, _) = StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); - (outbound_internal_sender, inbound_internal_receiver, mock_network) + + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. + let TestSubscriberChannels { mock_network: mock_proposal_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcast_topic_client: proposal_streaming_client, .. } = + subscriber_channels; + + let TestSubscriberChannels { mock_network: mock_vote_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { broadcast_topic_client: votes_topic_client, .. } = + subscriber_channels; + + let context = SequencerConsensusContext::new( + Arc::new(batcher), + proposal_streaming_client, + outbound_proposal_stream_sender, + votes_topic_client, + NUM_VALIDATORS, + ); + + let network_dependencies = NetworkDependencies { + _vote_network: mock_vote_network, + _old_proposal_network: mock_proposal_network, + _new_proposal_network: mock_proposal_stream_network, + }; + + (context, network_dependencies) } #[tokio::test] @@ -110,21 +140,8 @@ async fn build_proposal() { }), }) }); - // TODO(guyn): remove this first set of channels once we are using only the streaming channels. - let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = - mock_register_broadcast_topic().expect("Failed to create mock network"); - let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = - subscriber_channels; - - let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = - make_streaming_channels(); + let (mut context, _network) = setup(batcher); - let mut context = SequencerConsensusContext::new( - Arc::new(batcher), - broadcast_topic_client, - outbound_internal_sender, - NUM_VALIDATORS, - ); let init = ProposalInit { height: BlockNumber(0), round: 0, @@ -172,21 +189,8 @@ async fn validate_proposal_success() { }) }, ); - // TODO(guyn): remove this first set of channels once we are using only the streaming channels. - let TestSubscriberChannels { mock_network: _, subscriber_channels } = - mock_register_broadcast_topic().expect("Failed to create mock network"); - let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = - subscriber_channels; + let (mut context, _network) = setup(batcher); - let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = - make_streaming_channels(); - - let mut context = SequencerConsensusContext::new( - Arc::new(batcher), - broadcast_topic_client, - outbound_internal_sender, - NUM_VALIDATORS, - ); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; @@ -239,21 +243,8 @@ async fn repropose() { }) }, ); - // TODO(guyn): remove this first set of channels once we are using only the streaming channels. - let TestSubscriberChannels { mock_network: _, subscriber_channels } = - mock_register_broadcast_topic().expect("Failed to create mock network"); - let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = - subscriber_channels; + let (mut context, _network) = setup(batcher); - let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = - make_streaming_channels(); - - let mut context = SequencerConsensusContext::new( - Arc::new(batcher), - broadcast_topic_client, - outbound_internal_sender, - NUM_VALIDATORS, - ); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; @@ -319,20 +310,7 @@ async fn proposals_from_different_rounds() { }) }, ); - let TestSubscriberChannels { mock_network: _, subscriber_channels } = - mock_register_broadcast_topic().expect("Failed to create mock network"); - let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = - subscriber_channels; - - let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = - make_streaming_channels(); - - let mut context = SequencerConsensusContext::new( - Arc::new(batcher), - broadcast_topic_client, - outbound_internal_sender, - NUM_VALIDATORS, - ); + let (mut context, _network) = setup(batcher); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; context.set_height_and_round(BlockNumber(0), 1).await; @@ -418,20 +396,7 @@ async fn interrupt_active_proposal() { }), }) }); - let TestSubscriberChannels { mock_network: _, subscriber_channels } = - mock_register_broadcast_topic().expect("Failed to create mock network"); - let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = - subscriber_channels; - - let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = - make_streaming_channels(); - - let mut context = SequencerConsensusContext::new( - Arc::new(batcher), - broadcast_topic_client, - outbound_internal_sender, - NUM_VALIDATORS, - ); + let (mut context, _network) = setup(batcher); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; diff --git a/crates/starknet_consensus_manager/Cargo.toml b/crates/starknet_consensus_manager/Cargo.toml index 88b13d65d1..a4f2b6f7a5 100644 --- a/crates/starknet_consensus_manager/Cargo.toml +++ b/crates/starknet_consensus_manager/Cargo.toml @@ -11,12 +11,10 @@ workspace = true [dependencies] async-trait.workspace = true futures.workspace = true -libp2p.workspace = true papyrus_config.workspace = true papyrus_consensus.workspace = true papyrus_consensus_orchestrator.workspace = true papyrus_network.workspace = true -papyrus_network_types.workspace = true papyrus_protobuf.workspace = true serde.workspace = true starknet_batcher_types.workspace = true diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index f6d8964e41..e934204c9e 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -2,20 +2,12 @@ use std::any::type_name; use std::sync::Arc; use async_trait::async_trait; -use futures::channel::mpsc::{self, SendError}; -use futures::future::Ready; -use futures::{SinkExt, StreamExt}; -use libp2p::PeerId; +use futures::StreamExt; use papyrus_consensus::stream_handler::StreamHandler; -use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; +use papyrus_consensus::types::ConsensusError; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{ - BroadcastTopicChannels, - BroadcastTopicClient, - NetworkManager, -}; -use papyrus_network_types::network_types::BroadcastedMessageMetadata; +use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage}; use starknet_batcher_types::communication::SharedBatcherClient; use starknet_sequencer_infra::component_definitions::ComponentStarter; @@ -26,9 +18,8 @@ use crate::config::ConsensusManagerConfig; // TODO(Dan, Guy): move to config. pub const BROADCAST_BUFFER_SIZE: usize = 100; - pub const CONSENSUS_PROPOSALS_TOPIC: &str = "consensus_proposals"; - +pub const CONSENSUS_VOTES_TOPIC: &str = "consensus_votes"; // TODO(guyn): remove this once we have integrated streaming. pub const NETWORK_TOPIC2: &str = "streamed_consensus_proposals"; @@ -61,6 +52,14 @@ impl ConsensusManager { BROADCAST_BUFFER_SIZE, ) .expect("Failed to register broadcast topic"); + + let votes_broadcast_channels = network_manager + .register_broadcast_topic::( + Topic::new(CONSENSUS_VOTES_TOPIC), + BROADCAST_BUFFER_SIZE, + ) + .expect("Failed to register broadcast topic"); + let BroadcastTopicChannels { broadcasted_messages_receiver: inbound_network_receiver, broadcast_topic_client: outbound_network_sender, @@ -73,6 +72,7 @@ impl ConsensusManager { Arc::clone(&self.batcher_client), old_proposals_broadcast_channels.broadcast_topic_client.clone(), outbound_internal_sender, + votes_broadcast_channels.broadcast_topic_client.clone(), self.config.consensus_config.num_validators, ); @@ -85,7 +85,7 @@ impl ConsensusManager { self.config.consensus_config.validator_id, self.config.consensus_config.consensus_delay, self.config.consensus_config.timeouts.clone(), - create_fake_network_channels(), + votes_broadcast_channels.into(), inbound_internal_receiver, futures::stream::pending(), ); @@ -116,28 +116,6 @@ impl ConsensusManager { } } -// Milestone 1: -// We want to only run 1 node (e.g. no network), implying the local node can reach a quorum -// alone and is always the proposer. Actually connecting to the network will require an external -// dependency. -fn create_fake_network_channels() -> BroadcastConsensusMessageChannel { - let messages_to_broadcast_fn: fn(ConsensusMessage) -> Ready, SendError>> = - |_| todo!("messages_to_broadcast_sender should not be used"); - let reported_messages_sender_fn: fn( - BroadcastedMessageMetadata, - ) -> Ready> = - |_| todo!("messages_to_broadcast_sender should not be used"); - let broadcast_topic_client = BroadcastTopicClient::new( - mpsc::channel(0).0.with(messages_to_broadcast_fn), - mpsc::channel(0).0.with(reported_messages_sender_fn), - mpsc::channel(0).0, - ); - BroadcastConsensusMessageChannel { - broadcasted_messages_receiver: Box::new(futures::stream::pending()), - broadcast_topic_client, - } -} - pub fn create_consensus_manager( config: ConsensusManagerConfig, batcher_client: SharedBatcherClient,