Skip to content

Commit

Permalink
feat(consensus): sequencer context broadcasts votes (starkware-libs#2422
Browse files Browse the repository at this point in the history
)
  • Loading branch information
matan-starkware authored Dec 4, 2024
1 parent 728d6a1 commit ed273b2
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 120 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use papyrus_consensus::types::{
Round,
ValidatorId,
};
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
Expand Down Expand Up @@ -105,19 +105,23 @@ pub struct SequencerConsensusContext {
queued_proposals:
BTreeMap<Round, (ValidationParams, oneshot::Sender<(ProposalContentId, ProposalFin)>)>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
// Used to broadcast votes to other consensus nodes.
vote_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
}

impl SequencerConsensusContext {
pub fn new(
batcher: Arc<dyn BatcherClient>,
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
vote_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
num_validators: u64,
) -> Self {
Self {
batcher,
_proposal_streaming_client,
outbound_proposal_sender,
vote_broadcast_client,
validators: (0..num_validators).map(ValidatorId::from).collect(),
valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())),
proposal_id: 0,
Expand Down Expand Up @@ -274,7 +278,8 @@ impl ConsensusContext for SequencerConsensusContext {
}

async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
debug!("No-op broadcasting message: {message:?}");
debug!("Broadcasting message: {message:?}");
self.vote_broadcast_client.broadcast_message(message).await?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use papyrus_network::network_manager::test_utils::{
};
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
ProposalInit,
ProposalPart,
Expand Down Expand Up @@ -67,20 +68,49 @@ fn generate_executable_invoke_tx(tx_hash: Felt) -> ExecutableTransaction {
})))
}

fn make_streaming_channels() -> (
mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
mpsc::Receiver<mpsc::Receiver<ProposalPart>>,
BroadcastNetworkMock<StreamMessage<ProposalPart>>,
) {
let TestSubscriberChannels { mock_network, subscriber_channels } =
// Structs which aren't utilized but should not be dropped.
struct NetworkDependencies {
_vote_network: BroadcastNetworkMock<ConsensusMessage>,
_old_proposal_network: BroadcastNetworkMock<ProposalPart>,
_new_proposal_network: BroadcastNetworkMock<StreamMessage<ProposalPart>>,
}

fn setup(batcher: MockBatcherClient) -> (SequencerConsensusContext, NetworkDependencies) {
let TestSubscriberChannels { mock_network: mock_proposal_stream_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = subscriber_channels;
let (outbound_internal_sender, inbound_internal_receiver, _) =
let (outbound_proposal_stream_sender, _, _) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);
(outbound_internal_sender, inbound_internal_receiver, mock_network)

// TODO(guyn): remove this first set of channels once we are using only the streaming channels.
let TestSubscriberChannels { mock_network: mock_proposal_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcast_topic_client: proposal_streaming_client, .. } =
subscriber_channels;

let TestSubscriberChannels { mock_network: mock_vote_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcast_topic_client: votes_topic_client, .. } =
subscriber_channels;

let context = SequencerConsensusContext::new(
Arc::new(batcher),
proposal_streaming_client,
outbound_proposal_stream_sender,
votes_topic_client,
NUM_VALIDATORS,
);

let network_dependencies = NetworkDependencies {
_vote_network: mock_vote_network,
_old_proposal_network: mock_proposal_network,
_new_proposal_network: mock_proposal_stream_network,
};

(context, network_dependencies)
}

#[tokio::test]
Expand Down Expand Up @@ -110,21 +140,8 @@ async fn build_proposal() {
}),
})
});
// TODO(guyn): remove this first set of channels once we are using only the streaming channels.
let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();
let (mut context, _network) = setup(batcher);

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
broadcast_topic_client,
outbound_internal_sender,
NUM_VALIDATORS,
);
let init = ProposalInit {
height: BlockNumber(0),
round: 0,
Expand Down Expand Up @@ -172,21 +189,8 @@ async fn validate_proposal_success() {
})
},
);
// TODO(guyn): remove this first set of channels once we are using only the streaming channels.
let TestSubscriberChannels { mock_network: _, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;
let (mut context, _network) = setup(batcher);

let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
broadcast_topic_client,
outbound_internal_sender,
NUM_VALIDATORS,
);
// Initialize the context for a specific height, starting with round 0.
context.set_height_and_round(BlockNumber(0), 0).await;

Expand Down Expand Up @@ -239,21 +243,8 @@ async fn repropose() {
})
},
);
// TODO(guyn): remove this first set of channels once we are using only the streaming channels.
let TestSubscriberChannels { mock_network: _, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;
let (mut context, _network) = setup(batcher);

let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
broadcast_topic_client,
outbound_internal_sender,
NUM_VALIDATORS,
);
// Initialize the context for a specific height, starting with round 0.
context.set_height_and_round(BlockNumber(0), 0).await;

Expand Down Expand Up @@ -319,20 +310,7 @@ async fn proposals_from_different_rounds() {
})
},
);
let TestSubscriberChannels { mock_network: _, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
broadcast_topic_client,
outbound_internal_sender,
NUM_VALIDATORS,
);
let (mut context, _network) = setup(batcher);
// Initialize the context for a specific height, starting with round 0.
context.set_height_and_round(BlockNumber(0), 0).await;
context.set_height_and_round(BlockNumber(0), 1).await;
Expand Down Expand Up @@ -418,20 +396,7 @@ async fn interrupt_active_proposal() {
}),
})
});
let TestSubscriberChannels { mock_network: _, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
broadcast_topic_client,
outbound_internal_sender,
NUM_VALIDATORS,
);
let (mut context, _network) = setup(batcher);
// Initialize the context for a specific height, starting with round 0.
context.set_height_and_round(BlockNumber(0), 0).await;

Expand Down
2 changes: 0 additions & 2 deletions crates/starknet_consensus_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ workspace = true
[dependencies]
async-trait.workspace = true
futures.workspace = true
libp2p.workspace = true
papyrus_config.workspace = true
papyrus_consensus.workspace = true
papyrus_consensus_orchestrator.workspace = true
papyrus_network.workspace = true
papyrus_network_types.workspace = true
papyrus_protobuf.workspace = true
serde.workspace = true
starknet_batcher_types.workspace = true
Expand Down
50 changes: 14 additions & 36 deletions crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,12 @@ use std::any::type_name;
use std::sync::Arc;

use async_trait::async_trait;
use futures::channel::mpsc::{self, SendError};
use futures::future::Ready;
use futures::{SinkExt, StreamExt};
use libp2p::PeerId;
use futures::StreamExt;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError};
use papyrus_consensus::types::ConsensusError;
use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{
BroadcastTopicChannels,
BroadcastTopicClient,
NetworkManager,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage};
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
Expand All @@ -26,9 +18,8 @@ use crate::config::ConsensusManagerConfig;

// TODO(Dan, Guy): move to config.
pub const BROADCAST_BUFFER_SIZE: usize = 100;

pub const CONSENSUS_PROPOSALS_TOPIC: &str = "consensus_proposals";

pub const CONSENSUS_VOTES_TOPIC: &str = "consensus_votes";
// TODO(guyn): remove this once we have integrated streaming.
pub const NETWORK_TOPIC2: &str = "streamed_consensus_proposals";

Expand Down Expand Up @@ -61,6 +52,14 @@ impl ConsensusManager {
BROADCAST_BUFFER_SIZE,
)
.expect("Failed to register broadcast topic");

let votes_broadcast_channels = network_manager
.register_broadcast_topic::<ConsensusMessage>(
Topic::new(CONSENSUS_VOTES_TOPIC),
BROADCAST_BUFFER_SIZE,
)
.expect("Failed to register broadcast topic");

let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
Expand All @@ -73,6 +72,7 @@ impl ConsensusManager {
Arc::clone(&self.batcher_client),
old_proposals_broadcast_channels.broadcast_topic_client.clone(),
outbound_internal_sender,
votes_broadcast_channels.broadcast_topic_client.clone(),
self.config.consensus_config.num_validators,
);

Expand All @@ -85,7 +85,7 @@ impl ConsensusManager {
self.config.consensus_config.validator_id,
self.config.consensus_config.consensus_delay,
self.config.consensus_config.timeouts.clone(),
create_fake_network_channels(),
votes_broadcast_channels.into(),
inbound_internal_receiver,
futures::stream::pending(),
);
Expand Down Expand Up @@ -116,28 +116,6 @@ impl ConsensusManager {
}
}

// Milestone 1:
// We want to only run 1 node (e.g. no network), implying the local node can reach a quorum
// alone and is always the proposer. Actually connecting to the network will require an external
// dependency.
fn create_fake_network_channels() -> BroadcastConsensusMessageChannel {
let messages_to_broadcast_fn: fn(ConsensusMessage) -> Ready<Result<Vec<u8>, SendError>> =
|_| todo!("messages_to_broadcast_sender should not be used");
let reported_messages_sender_fn: fn(
BroadcastedMessageMetadata,
) -> Ready<Result<PeerId, SendError>> =
|_| todo!("messages_to_broadcast_sender should not be used");
let broadcast_topic_client = BroadcastTopicClient::new(
mpsc::channel(0).0.with(messages_to_broadcast_fn),
mpsc::channel(0).0.with(reported_messages_sender_fn),
mpsc::channel(0).0,
);
BroadcastConsensusMessageChannel {
broadcasted_messages_receiver: Box::new(futures::stream::pending()),
broadcast_topic_client,
}
}

pub fn create_consensus_manager(
config: ConsensusManagerConfig,
batcher_client: SharedBatcherClient,
Expand Down

0 comments on commit ed273b2

Please sign in to comment.