Skip to content

Commit

Permalink
feat(consensus): sequence transaction from foreign LocalPrepare/Accept
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Aug 20, 2024
1 parent 4278f20 commit bb7b1fc
Show file tree
Hide file tree
Showing 43 changed files with 879 additions and 348 deletions.
10 changes: 0 additions & 10 deletions applications/tari_dan_app_utilities/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,6 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
self.network,
);
let result = processor.execute(transaction.clone())?;
// Ok(result) => result,
// // TODO: This may occur due to an internal error (e.g. OOM, etc).
// Err(err) => ExecuteResult {
// finalize: FinalizeResult::new_rejected(
// tx_id,
// RejectReason::ExecutionFailure(format!("BUG: {err}")),
// ),
// execution_time: Duration::default(),
// },
// };

Ok(ExecutionOutput { transaction, result })
}
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_swarm_daemon/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub struct Overrides {
pub binaries_root: Option<PathBuf>,
#[clap(long)]
pub start_port: Option<u16>,
#[clap(short = 'k', long)]
pub skip_registration: bool,
}

impl Overrides {
Expand Down
7 changes: 6 additions & 1 deletion applications/tari_swarm_daemon/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
};

use crate::cli::Cli;
use crate::cli::{Cli, Commands};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Config {
Expand All @@ -26,6 +26,8 @@ pub struct Config {
pub webserver: WebserverConfig,
#[serde(flatten)]
pub processes: ProcessesConfig,
#[serde(default)]
pub skip_registration: bool,
}

impl Config {
Expand Down Expand Up @@ -54,6 +56,9 @@ impl Config {
}

fn overrides_from_cli(&mut self, cli: &Cli) {
if let Commands::Start(ref overrides) = cli.command {
self.skip_registration = overrides.skip_registration;
}
if let Some(ref base_dir) = cli.common.base_dir {
self.base_dir.clone_from(base_dir);
}
Expand Down
1 change: 1 addition & 0 deletions applications/tari_swarm_daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
.unwrap_or_else(|| std::env::current_dir().unwrap());

Ok(Config {
skip_registration: false,
network: cli.common.network.unwrap_or(Network::LocalNet),
start_port: 12000,
base_dir: base_dir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl InstanceManager {
self.fork_new(
executable,
instance.instance_type,
format!("{}-#{}", instance.name, i),
format!("{}-#{:02}", instance.name, i),
instance.settings.clone(),
)
.await?;
Expand Down
42 changes: 31 additions & 11 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ pub struct ProcessManager {
instance_manager: InstanceManager,
rx_request: mpsc::Receiver<ProcessManagerRequest>,
shutdown_signal: ShutdownSignal,
skip_registration: bool,
}

impl ProcessManager {
pub fn new(config: &Config, shutdown_signal: ShutdownSignal) -> (Self, ProcessManagerHandle) {
let (tx_request, rx_request) = mpsc::channel(1);
let this = Self {
skip_registration: config.skip_registration,
executable_manager: ExecutableManager::new(
config.processes.executables.clone(),
config.processes.force_compile,
Expand All @@ -49,21 +51,39 @@ impl ProcessManager {
(this, ProcessManagerHandle::new(tx_request))
}

pub async fn start(mut self) -> anyhow::Result<()> {
async fn setup(&mut self) -> anyhow::Result<()> {
info!("Starting process manager");
let executables = self.executable_manager.prepare_all().await?;
self.instance_manager.fork_all(executables).await?;

let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;
if !self.skip_registration {
let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}

Ok(())
}

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
pub async fn start(mut self) -> anyhow::Result<()> {
let mut shutdown_signal = self.shutdown_signal.clone();

tokio::select! {
result = self.setup() => {
result?;
},
_ = shutdown_signal.wait() => {
info!("Shutting down process manager");
return Ok(());
}
}

loop {
tokio::select! {
Expand Down Expand Up @@ -228,7 +248,7 @@ impl ProcessManager {
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(10).await?;
self.mine(20).await?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct ListValidatorNodesResponse {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidatorNodeInfo {
pub instance_id: InstanceId,
pub name: String,
pub web: String,
pub jrpc: String,
Expand All @@ -42,6 +43,7 @@ pub async fn list(
let jrpc = format!("http://localhost:{json_rpc_port}");

Ok(ValidatorNodeInfo {
instance_id: instance.id,
name: instance.name,
web,
jrpc,
Expand Down
39 changes: 24 additions & 15 deletions applications/tari_swarm_daemon/webui/src/routes/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,15 @@ async function jsonRpc2(address: string, method: string, params: any = null) {
return json.result;
}

function ExtraInfoVN({ name, url, setRow, addTxToPool, autoRefresh, state, horizontal }: {
function ExtraInfoVN({ name, url, addTxToPool, autoRefresh, state, horizontal }: {
name: string,
url: string,
setRow: any,
addTxToPool: any,
autoRefresh: boolean,
state: any,
horizontal: boolean
}) {
const [bucket, setBucket] = useState(null);
const [committeeInfo, setCommitteeInfo] = useState<any>(null);
const [epoch, setEpoch] = useState(null);
const [height, setHeight] = useState(null);
const [pool, setPool] = useState([]);
Expand All @@ -65,8 +64,8 @@ function ExtraInfoVN({ name, url, setRow, addTxToPool, autoRefresh, state, horiz
}, [tick, autoRefresh]);
useEffect(() => {
jsonRpc2(url, "get_epoch_manager_stats").then((resp) => {
setRow(resp.committee_shard.shard + 1);
setBucket(resp.committee_shard.shard);
// setRow(resp.committee_info.shard + 1);
setCommitteeInfo(resp.committee_info);
setHeight(resp.current_block_height);
setEpoch(resp.current_epoch);
}).catch((resp) => {
Expand Down Expand Up @@ -202,12 +201,12 @@ function ExtraInfoVN({ name, url, setRow, addTxToPool, autoRefresh, state, horiz
gridTemplateColumns: "auto auto",
gridTemplateRows: "auto auto auto auto auto",
}}>
<div><b>Bucket</b></div>
<div><b>Shard Group</b></div>
<div><b>Height</b></div>
<div><b>Epoch</b></div>
<div><b>Public key</b></div>
<div><b>Peer id</b></div>
<div>{bucket}</div>
<div>{committeeInfo?.shard_group.start}-{committeeInfo?.shard_group.end_inclusive} ({committeeInfo?.num_shard_group_members} members)</div>
<div>{height}</div>
<div>{epoch}</div>
<div>{publicKey}</div>
Expand All @@ -234,7 +233,6 @@ function ShowInfo(params: any) {
horizontal,
onReload,
} = params;
const [row, setRow] = useState(1);
// const [unprocessedTx, setUnprocessedTx] = useState([]);
const nameInfo = name && (
<div>
Expand Down Expand Up @@ -304,16 +302,15 @@ function ShowInfo(params: any) {


return (
<div className="info" key={name} style={{ gridRow: row }}>
<div className="info" key={name}>
{nameInfo}
{httpInfo}
{jrpcInfo}
{grpcInfo}
{showLogs && logInfo}
{executable === Executable.ValidatorNode && node?.jrpc &&
<ExtraInfoVN name={name} url={node.jrpc} setRow={(new_row: any) => {
if (new_row != row) setRow(new_row);
}} addTxToPool={addTxToPool} autoRefresh={autoRefresh} state={state} horizontal={horizontal} />}
<ExtraInfoVN name={name} url={node.jrpc} addTxToPool={addTxToPool} autoRefresh={autoRefresh} state={state}
horizontal={horizontal} />}
{executable !== Executable.Templates &&
<NodeControls
isRunning={node?.is_running || false}
Expand Down Expand Up @@ -364,11 +361,23 @@ function ShowInfos(params: any) {
setState((state) => ({ ...state, [partial_state.name]: partial_state.state }));
}
};

const sortedNodes = Object.keys(nodes).map((key) => [key, nodes[key]]);
sortedNodes.sort((a, b) => {
if (a[1].instance_id > b[1].instance_id) {
return 1;
}
if (a[1].instance_id < b[1].instance_id) {
return -1;
}
return 0;
});

return (
<div className="infos" style={{ display: "grid" }}>
{Object.keys(nodes).map((index) =>
<ShowInfo key={index} executable={executable} name={nodes[index].name} node={nodes[index]}
logs={logs?.[`${name} ${index}`]} stdoutLogs={stdoutLogs?.[`${name} ${index}`]}
{sortedNodes.map(([key, node]) =>
<ShowInfo key={key} executable={executable} name={node.name} node={node}
logs={logs?.[`${name} ${key}`]} stdoutLogs={stdoutLogs?.[`${name} ${key}`]}
showLogs={showLogs}
autoRefresh={autoRefresh} updateState={updateState} state={state} horizontal={horizontal}
onReload={onReload} />)}
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_swarm_daemon/webui/src/utils/json_rpc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ export async function jsonRpc(method: string, args: any = {}) {

json_id += 1;

const address = import.meta.env.VITE_DAEMON_JRPC_ADDRESS || "";
const address = import.meta.env.VITE_JSON_RPC_ADDRESS || import.meta.env.VITE_JRPC_ADDRESS || "/json_rpc";
const headers: { [key: string]: string } = { "Content-Type": "application/json" };
const response = await fetch(`${address}/json_rpc`, {
const response = await fetch(address, {
method: "POST",
body: JSON.stringify({
method: method,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,6 @@ where
) -> Result<ExecutedTransaction, BlockTransactionExecutorError> {
let id = *transaction.id();

// if let Some(abort_reason) = transaction.abort_reason() {
// // TODO: Hacky - if a transaction uses DOWNed/non-existent inputs we error here. This changes the hard
// // error to a propose REJECT. So that we have involved shards, we use the inputs as resolved inputs and
// // assume v0 if version is not provided.
// let inputs = transaction
// .transaction()
// .all_inputs_iter()
// .map(|input| VersionedSubstateId::new(input.substate_id, input.version.unwrap_or(0)))
// .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockFlag::Write))
// .collect();
// return Ok(ExecutedTransaction::new(
// transaction.into_transaction(),
// ExecuteResult {
// finalize: FinalizeResult {
// transaction_hash: id.into_array().into(),
// events: vec![],
// logs: vec![],
// execution_results: vec![],
// result: TransactionResult::Reject(abort_reason.clone()),
// fee_receipt: Default::default(),
// },
// },
// inputs,
// vec![],
// Duration::from_secs(0),
// ));
// }
info!(target: LOG_TARGET, "Transaction {} executing. Inputs: {:?}", id, resolved_inputs);

// Create a memory db with all the input substates, needed for the transaction execution
Expand Down
6 changes: 3 additions & 3 deletions applications/tari_validator_node/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ impl JsonRpcHandlers {
),
)
})?;
let committee_shard = self
let committee_info = self
.epoch_manager
.get_local_committee_info(current_epoch)
.await
Expand All @@ -611,8 +611,8 @@ impl JsonRpcHandlers {
current_epoch,
current_block_height,
current_block_hash,
is_valid: committee_shard.is_some(),
committee_info: committee_shard,
is_valid: committee_info.is_some(),
committee_info,
};
Ok(JsonRpcResponse::success(answer_id, response))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,5 +212,9 @@ impl MempoolGossip<PeerAddress> {
}

fn shard_group_to_topic(shard_group: ShardGroup) -> String {
format!("transactions-{}-{}", shard_group.start(), shard_group.end())
format!(
"transactions-{}-{}",
shard_group.start().as_u32(),
shard_group.end().as_u32()
)
}
11 changes: 9 additions & 2 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{borrow::Borrow, cmp, ops::RangeInclusive};
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tari_common_types::types::PublicKey;
use tari_engine_types::substate::SubstateId;

use crate::{shard::Shard, Epoch, NumPreshards, ShardGroup, SubstateAddress};

Expand Down Expand Up @@ -203,7 +204,7 @@ impl CommitteeInfo {
(len - 1) / 3
}

pub fn num_shards(&self) -> NumPreshards {
pub fn num_preshards(&self) -> NumPreshards {
self.num_shards
}

Expand All @@ -215,12 +216,18 @@ impl CommitteeInfo {
self.shard_group.to_substate_address_range(self.num_shards)
}

// TODO: change these to take in a SubstateId
pub fn includes_substate_address(&self, substate_address: &SubstateAddress) -> bool {
let s = substate_address.to_shard(self.num_shards);
self.shard_group.contains(&s)
}

pub fn includes_substate_id(&self, substate_id: &SubstateId) -> bool {
// version doesnt affect shard
let addr = SubstateAddress::from_substate_id(substate_id, 0);
let shard = addr.to_shard(self.num_shards);
self.shard_group.contains(&shard)
}

pub fn includes_all_substate_addresses<I: IntoIterator<Item = B>, B: Borrow<SubstateAddress>>(
&self,
substate_addresses: I,
Expand Down
5 changes: 5 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,9 @@ pub enum ProposalValidationError {
transaction_id: TransactionId,
details: String,
},
#[error(
"Foreign node submitted an foreign proposal {block_id} that did not contain any transaction evidence for this \
node"
)]
NoTransactionsInCommittee { block_id: BlockId },
}
Loading

0 comments on commit bb7b1fc

Please sign in to comment.