Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tests_integration): one to many network config #2253

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions crates/papyrus_common/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ pub fn find_free_port() -> u16 {
listener.local_addr().expect("Failed to get local address").port()
}

pub fn find_n_free_ports<const N: usize>() -> [u16; N] {
pub fn find_n_free_ports(n: usize) -> Vec<u16> {
// The socket is automatically closed when the function exits.
// The port may still be available when accessed, but this is not guaranteed.
// TODO(Asmaa): find a reliable way to ensure the port stays free.
let listeners: [TcpListener; N] =
core::array::from_fn(|_i| TcpListener::bind("0.0.0.0:0").expect("Failed to bind"));
core::array::from_fn(|i| listeners[i].local_addr().expect("Failed to get local address").port())
let listeners =
Vec::from_iter((0..n).map(|_| TcpListener::bind("0.0.0.0:0").expect("Failed to bind")));

let mut ports = Vec::with_capacity(n);
for listener in listeners {
let port = listener.local_addr().expect("Failed to get local address").port();
ports.push(port);
}
ports
}
44 changes: 25 additions & 19 deletions crates/papyrus_network/src/network_manager/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ where
Ok(TestSubscriberChannels { subscriber_channels, mock_network })
}

// TODO(shahak): Change to n instead of 2.
pub fn create_two_connected_network_configs() -> (NetworkConfig, NetworkConfig) {
let [port0, port1] = find_n_free_ports::<2>();
pub fn create_connected_network_configs(n: usize) -> Vec<NetworkConfig> {
let mut ports = find_n_free_ports(n);
let port0 = ports.remove(0);

let secret_key0 = [1u8; 32];
let public_key0 = Keypair::ed25519_from_bytes(secret_key0).unwrap().public();
Expand All @@ -160,37 +160,43 @@ pub fn create_two_connected_network_configs() -> (NetworkConfig, NetworkConfig)
secret_key: Some(secret_key0.to_vec()),
..Default::default()
};
let config1 = NetworkConfig {
tcp_port: port1,
bootstrap_peer_multiaddr: Some(
Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::LOCALHOST))
.with(Protocol::Tcp(port0))
.with(Protocol::P2p(PeerId::from_public_key(&public_key0))),
),
..Default::default()
};
(config0, config1)
let mut configs = Vec::with_capacity(n);
configs.push(config0);
for port in ports.iter() {
configs.push(NetworkConfig {
tcp_port: *port,
bootstrap_peer_multiaddr: Some(
Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::LOCALHOST))
.with(Protocol::Tcp(port0))
.with(Protocol::P2p(PeerId::from_public_key(&public_key0))),
),
..Default::default()
});
}
configs
}

pub fn create_network_config_connected_to_broadcast_channels<T>(
pub fn create_network_configs_connected_to_broadcast_channels<T>(
n_configs: usize,
topic: Topic,
) -> (NetworkConfig, BroadcastTopicChannels<T>)
) -> (Vec<NetworkConfig>, BroadcastTopicChannels<T>)
where
T: TryFrom<Bytes> + 'static,
Bytes: From<T>,
{
const BUFFER_SIZE: usize = 1000;

let (channels_config, result_config) = create_two_connected_network_configs();
let mut channels_configs = create_connected_network_configs(n_configs + 1);
let broadcast_channels = channels_configs.pop().unwrap();

let mut channels_network_manager = NetworkManager::new(channels_config, None);
let mut channels_network_manager = NetworkManager::new(broadcast_channels, None);
let broadcast_channels =
channels_network_manager.register_broadcast_topic(topic, BUFFER_SIZE).unwrap();

tokio::task::spawn(channels_network_manager.run());

(result_config, broadcast_channels)
(channels_configs, broadcast_channels)
}

pub struct MockClientResponsesManager<Query: TryFrom<Bytes>, Response: TryFrom<Bytes>> {
Expand Down
40 changes: 24 additions & 16 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use mempool_test_utils::starknet_api_test_utils::{
MultiAccountTransactionGenerator,
};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_storage::StorageConfig;
Expand Down Expand Up @@ -55,8 +55,9 @@ pub async fn create_config(
let gateway_config = create_gateway_config(chain_info.clone()).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let (consensus_manager_config, consensus_proposals_channels) =
create_consensus_manager_config_and_channels();
let (mut consensus_manager_configs, consensus_proposals_channels) =
create_consensus_manager_configs_and_channels(1);
let consensus_manager_config = consensus_manager_configs.pop().unwrap();
(
SequencerNodeConfig {
batcher_config,
Expand All @@ -76,23 +77,30 @@ pub async fn create_config(
)
}

fn create_consensus_manager_config_and_channels()
-> (ConsensusManagerConfig, BroadcastTopicChannels<ProposalPart>) {
let (network_config, broadcast_channels) =
create_network_config_connected_to_broadcast_channels(
fn create_consensus_manager_configs_and_channels(
n_managers: usize,
) -> (Vec<ConsensusManagerConfig>, BroadcastTopicChannels<ProposalPart>) {
let (network_configs, broadcast_channels) =
create_network_configs_connected_to_broadcast_channels(
n_managers,
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC,
),
);
let consensus_manager_config = ConsensusManagerConfig {
consensus_config: ConsensusConfig {
start_height: BlockNumber(1),
consensus_delay: Duration::from_secs(1),
network_config,
..Default::default()
},
};
(consensus_manager_config, broadcast_channels)

let consensus_manager_configs = network_configs
.into_iter()
.map(|network_config| ConsensusManagerConfig {
consensus_config: ConsensusConfig {
start_height: BlockNumber(1),
consensus_delay: Duration::from_secs(1),
network_config,
..Default::default()
},
})
.collect();

(consensus_manager_configs, broadcast_channels)
}

pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig {
Expand Down
12 changes: 7 additions & 5 deletions crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::SocketAddr;
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use rstest::{fixture, rstest};
use starknet_api::rpc_transaction::RpcTransaction;
Expand Down Expand Up @@ -76,10 +76,12 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let (network_config, mut broadcast_channels) =
create_network_config_connected_to_broadcast_channels::<RpcTransactionWrapper>(Topic::new(
MEMPOOL_TOPIC,
));
let (mut network_configs, mut broadcast_channels) =
create_network_configs_connected_to_broadcast_channels::<RpcTransactionWrapper>(
1,
Topic::new(MEMPOOL_TOPIC),
);
let network_config = network_configs.pop().unwrap();
let mempool_p2p_config = MempoolP2pConfig { network_config, ..Default::default() };
let config = SequencerNodeConfig {
components,
Expand Down
Loading