Skip to content

Commit

Permalink
updated quorum
Browse files Browse the repository at this point in the history
Signed-off-by: DenisRybas <[email protected]>
  • Loading branch information
DenisRybas committed Dec 5, 2023
1 parent 27af1d2 commit c12a487
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 40 deletions.
15 changes: 13 additions & 2 deletions indy-besu/vdr/src/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use log::{info, trace, warn};
use web3::{transports::Http, Web3};

use crate::{
client::{
Expand Down Expand Up @@ -37,14 +38,15 @@ impl LedgerClient {
// Transaction methods already depends on the client, so it make sence to accept signer on client create
// Same time we can be rework it to accept callback instead of interface -> simplier from FFI perspective
signer: Option<Box<dyn Signer>>,
clients: Vec<Web3<Http>>,
) -> VdrResult<LedgerClient> {
trace!(
"Started creating new LedgerClient. Chain id: {}, node address: {}",
chain_id,
node_address
);

let client = Web3Client::new(node_address, signer)?;
let client = Web3Client::new(node_address, signer, clients)?;
let contracts = Self::init_contracts(&client, contract_configs)?;

let ledger_client = LedgerClient {
Expand Down Expand Up @@ -165,6 +167,7 @@ pub mod test {
pub const VALIDATOR_CONTROL_PATH: &str = "network/ValidatorControl.sol/ValidatorControl.json";
pub const ROLE_CONTROL_ADDRESS: &str = "0x0000000000000000000000000000000000006666";
pub const ROLE_CONTROL_PATH: &str = "auth/RoleControl.sol/RoleControl.json";
pub const CLIENTS: Vec<Web3<Http>> = Vec::new();

fn build_contract_path(contract_path: &str) -> String {
let mut cur_dir = env::current_dir().unwrap();
Expand Down Expand Up @@ -204,7 +207,14 @@ pub mod test {

pub fn client(signer: Option<BasicSigner>) -> LedgerClient {
let signer = signer.unwrap_or_else(basic_signer);
LedgerClient::new(CHAIN_ID, NODE_ADDRESS, &contracts(), Some(Box::new(signer))).unwrap()
LedgerClient::new(
CHAIN_ID,
NODE_ADDRESS,
&contracts(),
Some(Box::new(signer)),
CLIENTS,
)
.unwrap()
}

mod create {
Expand Down Expand Up @@ -235,6 +245,7 @@ pub mod test {
wrong_node_address,
&contracts(),
Some(Box::new(basic_signer())),
CLIENTS,
)
.unwrap();
match client.ping().await.unwrap().status {
Expand Down
22 changes: 12 additions & 10 deletions indy-besu/vdr/src/client/implementation/web3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,31 @@ use crate::{
};

use crate::client::{PingStatus, TransactionType};
use async_std::io::timeout;
use log::{trace, warn};
use serde_json::json;
use std::{str::FromStr, time::Duration};
use web3::{
api::Eth,
transports::Http,
types::{Address, Bytes, CallRequest, TransactionId, TransactionParameters, H256, U256},
Transport, Web3,
types::{Address, Bytes, CallRequest, TransactionParameters, H256, U256},
Web3,
};

pub struct Web3Client {
client: Web3<Http>,
signer: Option<Box<dyn Signer>>,
clients: Vec<Web3<Http>>,
}

const POLL_INTERVAL: u64 = 200;
const NUMBER_TX_CONFIRMATIONS: usize = 1; // FIXME: what number of confirmation events should we wait? 2n+1?

impl Web3Client {
pub fn new(node_address: &str, signer: Option<Box<dyn Signer>>) -> VdrResult<Web3Client> {
pub fn new(
node_address: &str,
signer: Option<Box<dyn Signer>>,
clients: Vec<Web3<Http>>,
) -> VdrResult<Web3Client> {
trace!(
"Started creating new Web3Client. Node address: {}",
node_address
Expand All @@ -40,6 +44,7 @@ impl Web3Client {
let web3_client = Web3Client {
client: web3,
signer,
clients,
};

trace!("Created new Web3Client. Node address: {}", node_address);
Expand Down Expand Up @@ -126,11 +131,7 @@ impl Client for Web3Client {
})
}

async fn submit_transaction(
&self,
clients: Vec<Web3<Http>>,
transaction: &Transaction,
) -> VdrResult<Vec<u8>> {
async fn submit_transaction(&self, transaction: &Transaction) -> VdrResult<Vec<u8>> {
trace!(
"Submit transaction process has started. Transaction: {:?}",
transaction
Expand Down Expand Up @@ -167,7 +168,8 @@ impl Client for Web3Client {
)
.await?;

quorum::quorum_check(clients, receipt.transaction_hash);
let _quorum_result =
quorum::quorum_check(self.clients.clone(), receipt.transaction_hash).await?;

trace!("Submitted transaction: {:?}", transaction);

Expand Down
95 changes: 67 additions & 28 deletions indy-besu/vdr/src/client/implementation/web3/quorum.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use async_std::io::timeout;
use futures_util::future::try_join_all;
use tokio::time::timeout;
use web3::{
transports::Http,
types::{TransactionId, H256},
Expand All @@ -10,48 +9,88 @@ use web3::{

use crate::{VdrError, VdrResult};

use std::sync::{Arc, Mutex};

pub async fn quorum_check(clients: Vec<Web3<Http>>, transaction_hash: H256) -> VdrResult<bool> {
let quorum_reached = Arc::new(Mutex::new(false));
let approvals_counter = Arc::new(Mutex::new(0));
let finished_requests = Arc::new(Mutex::new(0));

let quorum_result = timeout(Duration::from_secs(200), async {
let quorum_reached = false;
let required_approvals: u64 = clients.len().into() * 1 / 3 + 1;
let approvals_counter = 0;
let transaction_id = TransactionId::Hash(transaction_hash);
let clients_num: u16 = clients.len() as u16;
let required_approvals = clients_num / 3 + 1;

let mut requests = Vec::new();

for client in clients {
let quorum_reached = quorum_reached.clone();
let approvals_counter = approvals_counter.clone();
let finished_requests = finished_requests.clone();

let request = tokio::spawn(async move {
let transaction_id = TransactionId::Hash(transaction_hash);

loop {
let not_finished_requests = {
let finished_requests = finished_requests.lock().unwrap();
clients_num - *finished_requests
};

let approvals_left = {
let approvals_counter = approvals_counter.lock().unwrap();
required_approvals - *approvals_counter
};

if *quorum_reached.lock().unwrap() || not_finished_requests < approvals_left {
return;
}

let requests = clients.clone().iter().map(|client| {
tokio::spawn(async move {
let transaction = client.eth().transaction(transaction_id).await;
let transaction = client.eth().transaction(transaction_id.clone()).await;

if transaction.is_ok() {
approvals_counter += 1;
if transaction.is_ok() {
let mut approvals_counter = approvals_counter.lock().unwrap();
*approvals_counter += 1;

if approvals_counter >= required_approvals {
quorum_reached = true;
if *approvals_counter >= required_approvals {
*quorum_reached.lock().unwrap() = true;
}
}

let mut finished_requests = finished_requests.lock().unwrap();
*finished_requests += 1;
}
})
});
});

requests.push(request);
}

loop {
let finished_tasks = 0;
let not_finished_requests = {
let finished_requests = finished_requests.lock().unwrap();
clients_num - *finished_requests
};

for request in requests {
if request.is_finished() {
finished_tasks += 1
}
let approvals_left = {
let approvals_counter = approvals_counter.lock().unwrap();
required_approvals - *approvals_counter
};

if finished_tasks == requests.len() || quorum_reached {
break;
if *quorum_reached.lock().unwrap() || not_finished_requests < approvals_left {
for request in requests {
if !request.is_finished() {
request.abort();
}
}
}

if finished_tasks == requests.len() || quorum_reached {
break;
}
}

Ok(quorum_reached)
})
.await?;
.await;

Ok(quorum_result)
match quorum_result {
//fix to handle quorum_reached false
Ok(_) => Ok(*quorum_reached.lock().unwrap()),
Err(e) => Err(VdrError::QuorumNotReached(e.to_string())),
}
}
3 changes: 3 additions & 0 deletions indy-besu/vdr/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use thiserror::Error;

#[derive(Error, Debug, PartialEq)]
pub enum VdrError {
#[error("Ledger: Quorum not reached: {}", _0)]
QuorumNotReached(String),

#[error("Ledger Client: Node is unreachable")]
ClientNodeUnreachable,

Expand Down

0 comments on commit c12a487

Please sign in to comment.