Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vote use quic.client.test2 #3571

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/next_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
cluster_info: &ClusterInfo,
poh_recorder: &RwLock<PohRecorder>,
fanout_slots: u64,
protocol: Protocol,
) -> Vec<SocketAddr> {
let upcoming_leaders = {
let poh_recorder = poh_recorder.read().unwrap();
Expand All @@ -29,7 +30,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
.dedup()
.filter_map(|leader_pubkey| {
cluster_info
.lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(Protocol::UDP))?
.lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(protocol))?
.ok()
})
// dedup again since leaders could potentially share the same tpu vote socket
Expand Down
29 changes: 29 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4244,6 +4244,7 @@ pub(crate) mod tests {
},
crossbeam_channel::unbounded,
itertools::Itertools,
solana_client::connection_cache::ConnectionCache,
solana_entry::entry::{self, Entry},
solana_gossip::{cluster_info::Node, crds::Cursor},
solana_ledger::{
Expand Down Expand Up @@ -4274,6 +4275,7 @@ pub(crate) mod tests {
transaction::TransactionError,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::DEFAULT_VOTE_USE_QUIC,
solana_transaction_status::VersionedTransactionWithStatusMeta,
solana_vote_program::{
vote_state::{self, TowerSync, VoteStateVersions},
Expand Down Expand Up @@ -7558,11 +7560,18 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();

let connection_cache = match DEFAULT_VOTE_USE_QUIC {
true => ConnectionCache::new_quic("connection_cache_vote_quic", 1),
false => ConnectionCache::with_udp("connection_cache_vote_udp", 1),
};

crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
Arc::new(connection_cache),
);

let mut cursor = Cursor::default();
Expand Down Expand Up @@ -7633,12 +7642,20 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();

let connection_cache = match DEFAULT_VOTE_USE_QUIC {
true => ConnectionCache::new_quic("connection_cache_vote_quic", 1),
false => ConnectionCache::with_udp("connection_cache_vote_udp", 1),
};

crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
Arc::new(connection_cache),
);

let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
Expand Down Expand Up @@ -7716,11 +7733,17 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
let connection_cache = match DEFAULT_VOTE_USE_QUIC {
true => ConnectionCache::new_quic("connection_cache_vote_quic", 1),
false => ConnectionCache::with_udp("connection_cache_vote_udp", 1),
};

crate::voting_service::VotingService::handle_vote(
&cluster_info,
&poh_recorder,
&tower_storage,
vote_info,
Arc::new(connection_cache),
);

assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
Expand Down Expand Up @@ -7831,11 +7854,17 @@ pub(crate) mod tests {
let vote_info = voting_receiver
.recv_timeout(Duration::from_secs(1))
.unwrap();
let connection_cache = match DEFAULT_VOTE_USE_QUIC {
true => ConnectionCache::new_quic("connection_cache_vote_quic", 1),
false => ConnectionCache::with_udp("connection_cache_vote_udp", 1),
};

crate::voting_service::VotingService::handle_vote(
cluster_info,
poh_recorder,
tower_storage,
vote_info,
Arc::new(connection_cache),
);

let votes = cluster_info.get_votes(cursor);
Expand Down
9 changes: 9 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl Tvu {
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
slot_status_notifier: Option<SlotStatusNotifier>,
vote_connection_cache: Arc<ConnectionCache>,
) -> Result<Self, String> {
let in_wen_restart = wen_restart_repair_slots.is_some();

Expand Down Expand Up @@ -330,6 +331,7 @@ impl Tvu {
cluster_info.clone(),
poh_recorder.clone(),
tower_storage,
vote_connection_cache,
);

let warm_quic_cache_service = connection_cache.and_then(|connection_cache| {
Expand Down Expand Up @@ -435,6 +437,7 @@ pub mod tests {
solana_runtime::bank::Bank,
solana_sdk::signature::{Keypair, Signer},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::DEFAULT_VOTE_USE_QUIC,
std::sync::atomic::{AtomicU64, Ordering},
};

Expand Down Expand Up @@ -493,6 +496,11 @@ pub mod tests {
} else {
None
};
let connection_cache = match DEFAULT_VOTE_USE_QUIC {
true => ConnectionCache::new_quic("connection_cache_vote_quic", 1),
false => ConnectionCache::with_udp("connection_cache_vote_udp", 1),
};

let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
Expand Down Expand Up @@ -554,6 +562,7 @@ pub mod tests {
cluster_slots,
wen_restart_repair_slots,
None,
Arc::new(connection_cache),
)
.expect("assume success");
if enable_wen_restart {
Expand Down
27 changes: 27 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ impl Validator {
start_progress: Arc<RwLock<ValidatorStartProgress>>,
socket_addr_space: SocketAddrSpace,
use_quic: bool,
vote_use_quic: bool,
tpu_connection_pool_size: usize,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
Expand Down Expand Up @@ -1012,6 +1013,28 @@ impl Validator {
)),
};

let vote_connection_cache = match vote_use_quic {
true => {
let vote_connection_cache = ConnectionCache::new_with_client_options(
"connection_cache_vote_quic",
1,
None,
Some((
&identity_keypair,
node.info
.tpu_vote(Protocol::QUIC)
.map_err(|err| {
ValidatorError::Other(format!("Invalid TPU address: {err:?}"))
})?
.ip(),
)),
Some((&staked_nodes, &identity_keypair.pubkey())),
);
Arc::new(vote_connection_cache)
}
false => Arc::new(ConnectionCache::with_udp("connection_cache_vote_udp", 1)),
};

let rpc_override_health_check =
Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
let (
Expand Down Expand Up @@ -1425,6 +1448,7 @@ impl Validator {
cluster_slots.clone(),
wen_restart_repair_slots.clone(),
slot_status_notifier,
vote_connection_cache,
)
.map_err(ValidatorError::Other)?;

Expand Down Expand Up @@ -2708,6 +2732,7 @@ mod tests {
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
},
std::{fs::remove_dir_all, thread, time::Duration},
};
Expand Down Expand Up @@ -2747,6 +2772,7 @@ mod tests {
start_progress.clone(),
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Expand Down Expand Up @@ -2966,6 +2992,7 @@ mod tests {
Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Expand Down
13 changes: 11 additions & 2 deletions core/src/voting_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
next_leader::upcoming_leader_tpu_vote_sockets,
},
crossbeam_channel::Receiver,
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_poh::poh_recorder::PohRecorder,
Expand Down Expand Up @@ -48,6 +49,7 @@ impl VotingService {
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<RwLock<PohRecorder>>,
tower_storage: Arc<dyn TowerStorage>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
let thread_hdl = Builder::new()
.name("solVoteService".to_string())
Expand All @@ -58,6 +60,7 @@ impl VotingService {
&poh_recorder,
tower_storage.as_ref(),
vote_op,
connection_cache.clone(),
);
}
})
Expand All @@ -70,6 +73,7 @@ impl VotingService {
poh_recorder: &RwLock<PohRecorder>,
tower_storage: &dyn TowerStorage,
vote_op: VoteOp,
connection_cache: Arc<ConnectionCache>,
) {
if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
let mut measure = Measure::start("tower storage save");
Expand All @@ -89,15 +93,20 @@ impl VotingService {
cluster_info,
poh_recorder,
UPCOMING_LEADER_FANOUT_SLOTS,
connection_cache.protocol(),
);

if !upcoming_leader_sockets.is_empty() {
for tpu_vote_socket in upcoming_leader_sockets {
let _ = cluster_info.send_transaction(vote_op.tx(), Some(tpu_vote_socket));
let _ = cluster_info.send_transaction(
vote_op.tx(),
Some(tpu_vote_socket),
&connection_cache,
);
}
} else {
// Send to our own tpu vote socket if we cannot find a leader to send to
let _ = cluster_info.send_transaction(vote_op.tx(), None);
let _ = cluster_info.send_transaction(vote_op.tx(), None, &connection_cache);
}

match vote_op {
Expand Down
18 changes: 13 additions & 5 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use {
itertools::Itertools,
rand::{seq::SliceRandom, thread_rng, CryptoRng, Rng},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::client_connection::ClientConnection,
solana_feature_set::FeatureSet,
solana_ledger::shred::Shred,
solana_measure::measure::Measure,
Expand Down Expand Up @@ -154,7 +156,6 @@ pub struct ClusterInfo {
my_contact_info: RwLock<ContactInfo>,
ping_cache: Mutex<PingCache>,
stats: GossipStats,
socket: UdpSocket,
local_message_pending_push_queue: Mutex<Vec<CrdsValue>>,
contact_debug_interval: u64, // milliseconds, 0 = disabled
contact_save_interval: u64, // milliseconds, 0 = disabled
Expand Down Expand Up @@ -224,7 +225,6 @@ impl ClusterInfo {
GOSSIP_PING_CACHE_CAPACITY,
)),
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
local_message_pending_push_queue: Mutex::default(),
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
Expand Down Expand Up @@ -929,13 +929,21 @@ impl ClusterInfo {
&self,
transaction: &Transaction,
tpu: Option<SocketAddr>,
connection_cache: &Arc<ConnectionCache>,
) -> Result<(), GossipError> {
let tpu = tpu
.map(Ok)
.unwrap_or_else(|| self.my_contact_info().tpu(contact_info::Protocol::UDP))?;
.unwrap_or_else(|| self.my_contact_info().tpu(connection_cache.protocol()))?;
let buf = serialize(transaction)?;
self.socket.send_to(&buf, tpu)?;
Ok(())
let client = connection_cache.get_connection(&tpu);
let result = client.send_data_async(buf);
match result {
Ok(_) => Ok(()),
Err(err) => {
debug!("Ran into exception sending vote: {err:?}");
Err(GossipError::SendError)
}
}
}

/// Returns votes inserted since the given cursor.
Expand Down
7 changes: 6 additions & 1 deletion local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use {
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_tpu_client::tpu_client::{
TpuClient, TpuClientConfig, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
},
solana_vote_program::{
vote_instruction,
Expand Down Expand Up @@ -96,6 +96,7 @@ pub struct ClusterConfig {
pub additional_accounts: Vec<(Pubkey, AccountSharedData)>,
pub tpu_use_quic: bool,
pub tpu_connection_pool_size: usize,
pub vote_use_quic: bool,
}

impl ClusterConfig {
Expand Down Expand Up @@ -135,6 +136,7 @@ impl Default for ClusterConfig {
additional_accounts: vec![],
tpu_use_quic: DEFAULT_TPU_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
}
}
}
Expand Down Expand Up @@ -339,6 +341,7 @@ impl LocalCluster {
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
// We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests
// to use the same QUIC ports due to SO_REUSEPORT.
Expand Down Expand Up @@ -546,6 +549,7 @@ impl LocalCluster {
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute
Expand Down Expand Up @@ -1079,6 +1083,7 @@ impl Cluster for LocalCluster {
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute, use higher value because of tests
Expand Down
2 changes: 2 additions & 0 deletions test-validator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use {
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
},
std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -1045,6 +1046,7 @@ impl TestValidator {
config.start_progress.clone(),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
config.tpu_enable_udp,
32, // max connections per IpAddr per minute for test
Expand Down
1 change: 1 addition & 0 deletions tpu-client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use {

pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;
pub const DEFAULT_VOTE_USE_QUIC: bool = true;

/// The default connection count is set to 1 -- it should
/// be sufficient for most use cases. Validators can use
Expand Down
Loading
Loading