diff --git a/crates/papyrus_common/src/tcp.rs b/crates/papyrus_common/src/tcp.rs index 87199415ce..072f07cf1c 100644 --- a/crates/papyrus_common/src/tcp.rs +++ b/crates/papyrus_common/src/tcp.rs @@ -8,11 +8,17 @@ pub fn find_free_port() -> u16 { listener.local_addr().expect("Failed to get local address").port() } -pub fn find_n_free_ports() -> [u16; N] { +pub fn find_n_free_ports(n: usize) -> Vec { // The socket is automatically closed when the function exits. // The port may still be available when accessed, but this is not guaranteed. // TODO(Asmaa): find a reliable way to ensure the port stays free. - let listeners: [TcpListener; N] = - core::array::from_fn(|_i| TcpListener::bind("0.0.0.0:0").expect("Failed to bind")); - core::array::from_fn(|i| listeners[i].local_addr().expect("Failed to get local address").port()) + let listeners = + Vec::from_iter((0..n).map(|_| TcpListener::bind("0.0.0.0:0").expect("Failed to bind"))); + + let mut ports = Vec::with_capacity(n); + for listener in listeners { + let port = listener.local_addr().expect("Failed to get local address").port(); + ports.push(port); + } + ports } diff --git a/crates/papyrus_network/src/network_manager/test_utils.rs b/crates/papyrus_network/src/network_manager/test_utils.rs index 855cdc1904..e7b59e4695 100644 --- a/crates/papyrus_network/src/network_manager/test_utils.rs +++ b/crates/papyrus_network/src/network_manager/test_utils.rs @@ -148,9 +148,9 @@ where Ok(TestSubscriberChannels { subscriber_channels, mock_network }) } -// TODO(shahak): Change to n instead of 2. -pub fn create_two_connected_network_configs() -> (NetworkConfig, NetworkConfig) { - let [port0, port1] = find_n_free_ports::<2>(); +pub fn create_connected_network_configs(n: usize) -> Vec { + let mut ports = find_n_free_ports(n); + let port0 = ports.remove(0); let secret_key0 = [1u8; 32]; let public_key0 = Keypair::ed25519_from_bytes(secret_key0).unwrap().public(); @@ -160,37 +160,43 @@ pub fn create_two_connected_network_configs() -> (NetworkConfig, NetworkConfig) secret_key: Some(secret_key0.to_vec()), ..Default::default() }; - let config1 = NetworkConfig { - tcp_port: port1, - bootstrap_peer_multiaddr: Some( - Multiaddr::empty() - .with(Protocol::Ip4(Ipv4Addr::LOCALHOST)) - .with(Protocol::Tcp(port0)) - .with(Protocol::P2p(PeerId::from_public_key(&public_key0))), - ), - ..Default::default() - }; - (config0, config1) + let mut configs = Vec::with_capacity(n); + configs.push(config0); + for port in ports.iter() { + configs.push(NetworkConfig { + tcp_port: *port, + bootstrap_peer_multiaddr: Some( + Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::LOCALHOST)) + .with(Protocol::Tcp(port0)) + .with(Protocol::P2p(PeerId::from_public_key(&public_key0))), + ), + ..Default::default() + }); + } + configs } -pub fn create_network_config_connected_to_broadcast_channels( +pub fn create_network_configs_connected_to_broadcast_channels( + n_configs: usize, topic: Topic, -) -> (NetworkConfig, BroadcastTopicChannels) +) -> (Vec, BroadcastTopicChannels) where T: TryFrom + 'static, Bytes: From, { const BUFFER_SIZE: usize = 1000; - let (channels_config, result_config) = create_two_connected_network_configs(); + let mut channels_configs = create_connected_network_configs(n_configs + 1); + let broadcast_channels = channels_configs.pop().unwrap(); - let mut channels_network_manager = NetworkManager::new(channels_config, None); + let mut channels_network_manager = NetworkManager::new(broadcast_channels, None); let broadcast_channels = channels_network_manager.register_broadcast_topic(topic, BUFFER_SIZE).unwrap(); tokio::task::spawn(channels_network_manager.run()); - (result_config, broadcast_channels) + (channels_configs, broadcast_channels) } pub struct MockClientResponsesManager, Response: TryFrom> { diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index fb89e98051..bad9e4e375 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -12,7 +12,7 @@ use mempool_test_utils::starknet_api_test_utils::{ MultiAccountTransactionGenerator, }; use papyrus_consensus::config::ConsensusConfig; -use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels; +use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels; use papyrus_network::network_manager::BroadcastTopicChannels; use papyrus_protobuf::consensus::ProposalPart; use papyrus_storage::StorageConfig; @@ -55,8 +55,9 @@ pub async fn create_config( let gateway_config = create_gateway_config(chain_info.clone()).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let (consensus_manager_config, consensus_proposals_channels) = - create_consensus_manager_config_and_channels(); + let (mut consensus_manager_configs, consensus_proposals_channels) = + create_consensus_manager_configs_and_channels(1); + let consensus_manager_config = consensus_manager_configs.pop().unwrap(); ( SequencerNodeConfig { batcher_config, @@ -76,23 +77,30 @@ pub async fn create_config( ) } -fn create_consensus_manager_config_and_channels() --> (ConsensusManagerConfig, BroadcastTopicChannels) { - let (network_config, broadcast_channels) = - create_network_config_connected_to_broadcast_channels( +fn create_consensus_manager_configs_and_channels( + n_managers: usize, +) -> (Vec, BroadcastTopicChannels) { + let (network_configs, broadcast_channels) = + create_network_configs_connected_to_broadcast_channels( + n_managers, papyrus_network::gossipsub_impl::Topic::new( starknet_consensus_manager::consensus_manager::NETWORK_TOPIC, ), ); - let consensus_manager_config = ConsensusManagerConfig { - consensus_config: ConsensusConfig { - start_height: BlockNumber(1), - consensus_delay: Duration::from_secs(1), - network_config, - ..Default::default() - }, - }; - (consensus_manager_config, broadcast_channels) + + let consensus_manager_configs = network_configs + .into_iter() + .map(|network_config| ConsensusManagerConfig { + consensus_config: ConsensusConfig { + start_height: BlockNumber(1), + consensus_delay: Duration::from_secs(1), + network_config, + ..Default::default() + }, + }) + .collect(); + + (consensus_manager_configs, broadcast_channels) } pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig { diff --git a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs index 2c1f5d42ae..9a410e1c7d 100644 --- a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs +++ b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use futures::StreamExt; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels; +use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels; use papyrus_protobuf::mempool::RpcTransactionWrapper; use rstest::{fixture, rstest}; use starknet_api::rpc_transaction::RpcTransaction; @@ -76,10 +76,12 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti let gateway_config = create_gateway_config(chain_info).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let (network_config, mut broadcast_channels) = - create_network_config_connected_to_broadcast_channels::(Topic::new( - MEMPOOL_TOPIC, - )); + let (mut network_configs, mut broadcast_channels) = + create_network_configs_connected_to_broadcast_channels::( + 1, + Topic::new(MEMPOOL_TOPIC), + ); + let network_config = network_configs.pop().unwrap(); let mempool_p2p_config = MempoolP2pConfig { network_config, ..Default::default() }; let config = SequencerNodeConfig { components,