fix(indexer): reuse validator RPC sessions (#1057)
Description
---
Caches and reuses validator RPC sessions.

Motivation and Context
---
When submitting many transactions through the indexer (using the tariswap
test), each request opened a new RPC session and eventually hit the validator
node's session limit. This PR holds on to RPC sessions and reuses them
(concurrent requests are queued) instead of creating new sessions.
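
The change boils down to a small per-peer session pool. A minimal self-contained sketch of the pattern follows; the `PeerId` and `Client` types here are hypothetical stand-ins for the real networking types, and the actual implementation is `RpcPool` in the diff further down:

```rust
use std::{collections::HashMap, sync::Arc};

use tokio::sync::RwLock;

// Hypothetical stand-ins for the real PeerAddress and RPC client types.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct PeerId(u64);

#[derive(Clone)]
struct Client {
    connected: bool,
}

impl Client {
    fn is_connected(&self) -> bool {
        self.connected
    }
}

#[derive(Clone, Default)]
struct Pool {
    sessions: Arc<RwLock<HashMap<PeerId, Client>>>,
}

impl Pool {
    async fn get_or_connect(&self, peer: PeerId) -> Client {
        // The write lock is held across the connect, so concurrent callers
        // queue behind it rather than each dialing a fresh session.
        let mut sessions = self.sessions.write().await;
        if let Some(client) = sessions.get(&peer) {
            if client.is_connected() {
                return client.clone();
            }
            // Evict dead sessions so they are re-dialed below.
            sessions.remove(&peer);
        }
        let client = Client { connected: true }; // stand-in for connect_rpc().await
        sessions.insert(peer, client.clone());
        client
    }
}
```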

How Has This Been Tested?
---
Tariswap test works.

What process can a PR reviewer use to test or verify this change?
---
Run the tariswap test (`utilities/tariswap_test_bench`).

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify

---------

Co-authored-by: Miguel Naveira <[email protected]>
sdbondi and mrnaveira authored Jun 21, 2024
1 parent 0ebb1fb commit 8c64be5
Showing 5 changed files with 47 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion dan_layer/consensus/src/hotstuff/on_propose.rs
```diff
@@ -113,7 +113,7 @@ where TConsensusSpec: ConsensusSpec
         if let Some(next_block) = self.store.with_read_tx(|tx| last_proposed.get_block(tx)).optional()? {
             info!(
                 target: LOG_TARGET,
-                "🌿 RE-BROADCASTING locally block {}({}) to {} validators. {} command(s), justify: {} ({}), parent: {}",
+                "🌿 RE-BROADCASTING local block {}({}) to {} validators. {} command(s), justify: {} ({}), parent: {}",
                 next_block.id(),
                 next_block.height(),
                 local_committee.len(),
```

1 change: 1 addition & 0 deletions dan_layer/validator_node_rpc/Cargo.toml
```diff
@@ -23,6 +23,7 @@ async-trait = { workspace = true }
 prost = { workspace = true }
 serde = { workspace = true, default-features = true }
 thiserror = { workspace = true }
+tokio = { workspace = true, default-features = false, features = ["sync"] }
 
 [build-dependencies]
 proto_builder = { workspace = true }
```
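
Only tokio's `sync` feature is enabled here; it provides the `tokio::sync::RwLock` used by the session pool in `client.rs` below.
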
59 changes: 43 additions & 16 deletions dan_layer/validator_node_rpc/src/client.rs
```diff
@@ -1,7 +1,7 @@
 // Copyright 2023 The Tari Project
 // SPDX-License-Identifier: BSD-3-Clause
 
-use std::{convert::TryInto, time::Duration};
+use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration};
 
 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -21,6 +21,7 @@ use tari_engine_types::{
 };
 use tari_networking::{MessageSpec, NetworkingHandle};
 use tari_transaction::{Transaction, TransactionId};
+use tokio::sync::RwLock;
 
 use crate::{rpc_service, ValidatorNodeRpcClientError};
 
@@ -78,24 +79,15 @@ pub enum SubstateResult {
 }
 
 pub struct TariValidatorNodeRpcClient<TMsg: MessageSpec> {
-    networking: NetworkingHandle<TMsg>,
     address: PeerAddress,
-    connection: Option<rpc_service::ValidatorNodeRpcClient>,
+    pool: RpcPool<TMsg>,
 }
 
 impl<TMsg: MessageSpec> TariValidatorNodeRpcClient<TMsg> {
     pub async fn client_connection(
         &mut self,
     ) -> Result<rpc_service::ValidatorNodeRpcClient, ValidatorNodeRpcClientError> {
-        if let Some(ref client) = self.connection {
-            if client.is_connected() {
-                return Ok(client.clone());
-            }
-        }
-
-        let client: rpc_service::ValidatorNodeRpcClient =
-            self.networking.connect_rpc(self.address.as_peer_id()).await?;
-        self.connection = Some(client.clone());
+        let client = self.pool.get_or_connect(&self.address).await?;
         Ok(client)
     }
 }
@@ -251,12 +243,14 @@ impl<TMsg: MessageSpec> ValidatorNodeRpcClient for TariValidatorNodeRpcClient<TMsg>
 
 #[derive(Clone, Debug)]
 pub struct TariValidatorNodeRpcClientFactory {
-    networking: NetworkingHandle<TariMessagingSpec>,
+    pool: RpcPool<TariMessagingSpec>,
 }
 
 impl TariValidatorNodeRpcClientFactory {
     pub fn new(networking: NetworkingHandle<TariMessagingSpec>) -> Self {
-        Self { networking }
+        Self {
+            pool: RpcPool::new(networking),
+        }
     }
 }
 
@@ -266,9 +260,42 @@ impl ValidatorNodeClientFactory for TariValidatorNodeRpcClientFactory {
 
     fn create_client(&self, address: &Self::Addr) -> Self::Client {
         TariValidatorNodeRpcClient {
-            networking: self.networking.clone(),
             address: *address,
-            connection: None,
+            pool: self.pool.clone(),
         }
     }
 }
+
+#[derive(Debug, Clone)]
+pub struct RpcPool<TMsg: MessageSpec> {
+    sessions: Arc<RwLock<HashMap<PeerAddress, rpc_service::ValidatorNodeRpcClient>>>,
+    networking: NetworkingHandle<TMsg>,
+}
+
+impl<TMsg: MessageSpec> RpcPool<TMsg> {
+    pub fn new(networking: NetworkingHandle<TMsg>) -> Self {
+        Self {
+            sessions: Default::default(),
+            networking,
+        }
+    }
+
+    async fn get_or_connect(
+        &mut self,
+        addr: &PeerAddress,
+    ) -> Result<rpc_service::ValidatorNodeRpcClient, ValidatorNodeRpcClientError> {
+        let mut sessions = self.sessions.write().await;
+        if let Some(client) = sessions.get(addr) {
+            if client.is_connected() {
+                return Ok(client.clone());
+            } else {
+                sessions.remove(addr);
+            }
+        }
+
+        let client: rpc_service::ValidatorNodeRpcClient = self.networking.connect_rpc(addr.as_peer_id()).await?;
+        sessions.insert(*addr, client.clone());
+
+        Ok(client)
+    }
+}
```
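
Note that `get_or_connect` holds the write lock across `connect_rpc`, so concurrent callers wait for an in-flight dial instead of opening duplicate sessions; this is the queuing behaviour described in the motivation. A session that reports itself disconnected is evicted and re-dialed on the next call.
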
2 changes: 1 addition & 1 deletion networking/rpc_framework/src/lib.rs
```diff
@@ -30,7 +30,7 @@
 
 /// Maximum frame size of each RPC message. This is enforced in tokio's length delimited codec.
 /// This can be thought of as the hard limit on message size.
-pub const RPC_MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB
+pub const RPC_MAX_FRAME_SIZE: usize = 6 * 1024 * 1024; // 6 MiB
 
 /// The maximum request payload size
 const fn max_request_size() -> usize {
```
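
The doc comment above refers to tokio-util's length-delimited codec. A hedged sketch of how such a frame cap is typically wired with that codec (illustrative only; not necessarily this crate's exact setup):

```rust
use tokio_util::codec::LengthDelimitedCodec;

// Mirrors RPC_MAX_FRAME_SIZE above (value taken from this diff).
const MAX_FRAME_SIZE: usize = 6 * 1024 * 1024; // 6 MiB

fn rpc_codec() -> LengthDelimitedCodec {
    // Frames longer than max_frame_length are rejected with an error
    // rather than buffered, which bounds per-message memory use.
    LengthDelimitedCodec::builder()
        .max_frame_length(MAX_FRAME_SIZE)
        .new_codec()
}
```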
