From c12a487f0bc48a389cfed9809956873e801b2b9d Mon Sep 17 00:00:00 2001 From: DenisRybas Date: Wed, 6 Dec 2023 02:39:48 +0300 Subject: [PATCH] updated quorum Signed-off-by: DenisRybas --- indy-besu/vdr/src/client/client.rs | 15 ++- .../src/client/implementation/web3/client.rs | 22 +++-- .../src/client/implementation/web3/quorum.rs | 95 +++++++++++++------ indy-besu/vdr/src/error/mod.rs | 3 + 4 files changed, 95 insertions(+), 40 deletions(-) diff --git a/indy-besu/vdr/src/client/client.rs b/indy-besu/vdr/src/client/client.rs index 74aeef511..acf5d38d7 100644 --- a/indy-besu/vdr/src/client/client.rs +++ b/indy-besu/vdr/src/client/client.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use log::{info, trace, warn}; +use web3::{transports::Http, Web3}; use crate::{ client::{ @@ -37,6 +38,7 @@ impl LedgerClient { // Transaction methods already depends on the client, so it make sence to accept signer on client create // Same time we can be rework it to accept callback instead of interface -> simplier from FFI perspective signer: Option>, + clients: Vec>, ) -> VdrResult { trace!( "Started creating new LedgerClient. Chain id: {}, node address: {}", @@ -44,7 +46,7 @@ impl LedgerClient { node_address ); - let client = Web3Client::new(node_address, signer)?; + let client = Web3Client::new(node_address, signer, clients)?; let contracts = Self::init_contracts(&client, contract_configs)?; let ledger_client = LedgerClient { @@ -165,6 +167,7 @@ pub mod test { pub const VALIDATOR_CONTROL_PATH: &str = "network/ValidatorControl.sol/ValidatorControl.json"; pub const ROLE_CONTROL_ADDRESS: &str = "0x0000000000000000000000000000000000006666"; pub const ROLE_CONTROL_PATH: &str = "auth/RoleControl.sol/RoleControl.json"; + pub const CLIENTS: Vec> = Vec::new(); fn build_contract_path(contract_path: &str) -> String { let mut cur_dir = env::current_dir().unwrap(); @@ -204,7 +207,14 @@ pub mod test { pub fn client(signer: Option) -> LedgerClient { let signer = signer.unwrap_or_else(basic_signer); - LedgerClient::new(CHAIN_ID, NODE_ADDRESS, &contracts(), Some(Box::new(signer))).unwrap() + LedgerClient::new( + CHAIN_ID, + NODE_ADDRESS, + &contracts(), + Some(Box::new(signer)), + CLIENTS, + ) + .unwrap() } mod create { @@ -235,6 +245,7 @@ pub mod test { wrong_node_address, &contracts(), Some(Box::new(basic_signer())), + CLIENTS, ) .unwrap(); match client.ping().await.unwrap().status { diff --git a/indy-besu/vdr/src/client/implementation/web3/client.rs b/indy-besu/vdr/src/client/implementation/web3/client.rs index 559660eaf..09d94ff95 100644 --- a/indy-besu/vdr/src/client/implementation/web3/client.rs +++ b/indy-besu/vdr/src/client/implementation/web3/client.rs @@ -9,27 +9,31 @@ use crate::{ }; use crate::client::{PingStatus, TransactionType}; -use async_std::io::timeout; use log::{trace, warn}; use serde_json::json; use std::{str::FromStr, time::Duration}; use web3::{ api::Eth, transports::Http, - types::{Address, Bytes, CallRequest, TransactionId, TransactionParameters, H256, U256}, - Transport, Web3, + types::{Address, Bytes, CallRequest, TransactionParameters, H256, U256}, + Web3, }; pub struct Web3Client { client: Web3, signer: Option>, + clients: Vec>, } const POLL_INTERVAL: u64 = 200; const NUMBER_TX_CONFIRMATIONS: usize = 1; // FIXME: what number of confirmation events should we wait? 2n+1? impl Web3Client { - pub fn new(node_address: &str, signer: Option>) -> VdrResult { + pub fn new( + node_address: &str, + signer: Option>, + clients: Vec>, + ) -> VdrResult { trace!( "Started creating new Web3Client. Node address: {}", node_address @@ -40,6 +44,7 @@ impl Web3Client { let web3_client = Web3Client { client: web3, signer, + clients, }; trace!("Created new Web3Client. Node address: {}", node_address); @@ -126,11 +131,7 @@ impl Client for Web3Client { }) } - async fn submit_transaction( - &self, - clients: Vec>, - transaction: &Transaction, - ) -> VdrResult> { + async fn submit_transaction(&self, transaction: &Transaction) -> VdrResult> { trace!( "Submit transaction process has started. Transaction: {:?}", transaction @@ -167,7 +168,8 @@ impl Client for Web3Client { ) .await?; - quorum::quorum_check(clients, receipt.transaction_hash); + let _quorum_result = + quorum::quorum_check(self.clients.clone(), receipt.transaction_hash).await?; trace!("Submitted transaction: {:?}", transaction); diff --git a/indy-besu/vdr/src/client/implementation/web3/quorum.rs b/indy-besu/vdr/src/client/implementation/web3/quorum.rs index 298b9f006..51bbf1d3d 100644 --- a/indy-besu/vdr/src/client/implementation/web3/quorum.rs +++ b/indy-besu/vdr/src/client/implementation/web3/quorum.rs @@ -1,7 +1,6 @@ use std::time::Duration; -use async_std::io::timeout; -use futures_util::future::try_join_all; +use tokio::time::timeout; use web3::{ transports::Http, types::{TransactionId, H256}, @@ -10,48 +9,88 @@ use web3::{ use crate::{VdrError, VdrResult}; +use std::sync::{Arc, Mutex}; + pub async fn quorum_check(clients: Vec>, transaction_hash: H256) -> VdrResult { + let quorum_reached = Arc::new(Mutex::new(false)); + let approvals_counter = Arc::new(Mutex::new(0)); + let finished_requests = Arc::new(Mutex::new(0)); + let quorum_result = timeout(Duration::from_secs(200), async { - let quorum_reached = false; - let required_approvals: u64 = clients.len().into() * 1 / 3 + 1; - let approvals_counter = 0; - let transaction_id = TransactionId::Hash(transaction_hash); + let clients_num: u16 = clients.len() as u16; + let required_approvals = clients_num / 3 + 1; + + let mut requests = Vec::new(); + + for client in clients { + let quorum_reached = quorum_reached.clone(); + let approvals_counter = approvals_counter.clone(); + let finished_requests = finished_requests.clone(); + + let request = tokio::spawn(async move { + let transaction_id = TransactionId::Hash(transaction_hash); + + loop { + let not_finished_requests = { + let finished_requests = finished_requests.lock().unwrap(); + clients_num - *finished_requests + }; + + let approvals_left = { + let approvals_counter = approvals_counter.lock().unwrap(); + required_approvals - *approvals_counter + }; + + if *quorum_reached.lock().unwrap() || not_finished_requests < approvals_left { + return; + } - let requests = clients.clone().iter().map(|client| { - tokio::spawn(async move { - let transaction = client.eth().transaction(transaction_id).await; + let transaction = client.eth().transaction(transaction_id.clone()).await; - if transaction.is_ok() { - approvals_counter += 1; + if transaction.is_ok() { + let mut approvals_counter = approvals_counter.lock().unwrap(); + *approvals_counter += 1; - if approvals_counter >= required_approvals { - quorum_reached = true; + if *approvals_counter >= required_approvals { + *quorum_reached.lock().unwrap() = true; + } } + + let mut finished_requests = finished_requests.lock().unwrap(); + *finished_requests += 1; } - }) - }); + }); + + requests.push(request); + } loop { - let finished_tasks = 0; + let not_finished_requests = { + let finished_requests = finished_requests.lock().unwrap(); + clients_num - *finished_requests + }; - for request in requests { - if request.is_finished() { - finished_tasks += 1 - } + let approvals_left = { + let approvals_counter = approvals_counter.lock().unwrap(); + required_approvals - *approvals_counter + }; - if finished_tasks == requests.len() || quorum_reached { - break; + if *quorum_reached.lock().unwrap() || not_finished_requests < approvals_left { + for request in requests { + if !request.is_finished() { + request.abort(); + } } - } - if finished_tasks == requests.len() || quorum_reached { break; } } - - Ok(quorum_reached) }) - .await?; + .await; - Ok(quorum_result) + match quorum_result { + //fix to handle quorum_reached false + Ok(_) => Ok(*quorum_reached.lock().unwrap()), + Err(e) => Err(VdrError::QuorumNotReached(e.to_string())), + } } diff --git a/indy-besu/vdr/src/error/mod.rs b/indy-besu/vdr/src/error/mod.rs index 81faddb33..e43cb056d 100644 --- a/indy-besu/vdr/src/error/mod.rs +++ b/indy-besu/vdr/src/error/mod.rs @@ -4,6 +4,9 @@ use thiserror::Error; #[derive(Error, Debug, PartialEq)] pub enum VdrError { + #[error("Ledger: Quorum not reached: {}", _0)] + QuorumNotReached(String), + #[error("Ledger Client: Node is unreachable")] ClientNodeUnreachable,