Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watcher): connection and calls to base node #1117

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion applications/tari_watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,26 @@ license.workspace = true

[dependencies]

minotari_wallet_grpc_client = { workspace = true }
minotari_node_grpc_client = { workspace = true }
minotari_app_grpc = { workspace = true }
tari_common = { workspace = true }
tari_shutdown = { workspace = true }
clap = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "process", "time", "fs", "io-util"] }
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
"signal",
"process",
"time",
"fs",
"io-util",
] }
log = { workspace = true }
fern = { workspace = true, features = ["colored"] }
tonic = { workspace = true }

toml = "0.8.12"
humantime = "2.1.0"
21 changes: 18 additions & 3 deletions applications/tari_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ impl Config {
writer.write_all(toml.as_bytes()).await?;
Ok(())
}

pub fn missing_conf(&self) -> Option<Vec<&str>> {
let mut v: Vec<&str> = Vec::new();
if self.base_node_grpc_address.is_empty() {
v.push("base_node_grpc_address");
}
if self.base_wallet_grpc_address.is_empty() {
v.push("base_wallet_grpc_address");
}
if v.is_empty() {
None
} else {
Some(v)
}
}
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -143,10 +158,10 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {

Ok(Config {
auto_register: true,
base_node_grpc_address: "localhost:18142".to_string(),
base_wallet_grpc_address: "localhost:18143".to_string(),
sidechain_id: None,
base_node_grpc_address: "".to_string(),
base_wallet_grpc_address: "".to_string(),
base_dir: base_dir.clone(),
sidechain_id: None,
vn_registration_file: base_dir.join("registration.json"),
instance_config: instances.to_vec(),
executable_config: executables,
Expand Down
35 changes: 10 additions & 25 deletions applications/tari_watcher/src/forker.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,29 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{
env,
net::IpAddr,
path::{Path, PathBuf},
process::Stdio,
};
use std::{env, net::IpAddr, path::Path, process::Stdio};

use tokio::process::{Child, Command};

use crate::{
config::{ExecutableConfig, InstanceType},
port::PortAllocator,
};
use crate::config::{ExecutableConfig, InstanceType};

#[allow(dead_code)]
pub struct Forker {
// Used for the validator to connect to the base (L1) node
base_node_grpc_address: String,
// The base directory of calling the application
base_dir: PathBuf,
// The Tari L2 validator instance
validator: Option<Instance>,
// The Minotari L1 wallet instance
wallet: Option<Instance>,
// Child process of the forked validator instance.
// Includes PID and a handle to the process.
child: Option<Child>,
}

impl Forker {
pub fn new(base_node_grpc_address: String, base_dir: PathBuf) -> Self {
pub fn new() -> Self {
Self {
validator: None,
wallet: None,
base_node_grpc_address,
base_dir,
child: None,
}
}

pub async fn start_validator(&mut self, config: ExecutableConfig) -> anyhow::Result<Child> {
pub async fn start_validator(&mut self, config: ExecutableConfig) -> anyhow::Result<()> {
let instance = Instance::new(InstanceType::TariValidatorNode, config.clone());
self.validator = Some(instance.clone());

Expand All @@ -59,8 +45,9 @@ impl Forker {
.stdin(Stdio::null());

let child = cmd.spawn()?;
self.child = Some(child);

Ok(child)
Ok(())
}
}

Expand All @@ -70,7 +57,6 @@ struct Instance {
app: InstanceType,
config: ExecutableConfig,
listen_ip: Option<IpAddr>,
port: PortAllocator,
}

impl Instance {
Expand All @@ -79,7 +65,6 @@ impl Instance {
app,
config,
listen_ip: None,
port: PortAllocator::new(),
}
}
}
68 changes: 56 additions & 12 deletions applications/tari_watcher/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::time::SystemTime;
use std::{
path::{Path, PathBuf},
time::SystemTime,
};

use anyhow::{anyhow, Context};
use tokio::fs;
use anyhow::{anyhow, bail, Context};
use log::*;
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{fs, task};

use crate::{
cli::{Cli, Commands},
config::{get_base_config, Config},
manager::ProcessManager,
manager::{ManagerHandle, ProcessManager},
shutdown::exit_signal,
};

mod cli;
mod config;
mod forker;
mod manager;
mod port;
mod minotari;
mod shutdown;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -47,21 +54,58 @@ async fn main() -> anyhow::Result<()> {
log::info!("Config file created at {}", config_path.display());
},
Commands::Start(ref args) => {
let mut config = get_base_config(&cli)?;
let mut cfg = read_file(cli.get_config_path()).await?;
if let Some(conf) = cfg.missing_conf() {
bail!("Missing configuration values: {:?}", conf);
}

// optionally override config values
args.apply(&mut config);
start(config).await?;
args.apply(&mut cfg);
start(cfg).await?;
},
}

Ok(())
}

async fn start(config: Config) -> anyhow::Result<()> {
let mut manager = ProcessManager::new(config.clone());
manager.forker.start_validator(manager.validator_config).await?;
async fn read_file(path: PathBuf) -> anyhow::Result<Config> {
let p = Path::new(path.to_str().unwrap());
let content: String = fs::read_to_string(p).await.unwrap();
let config: Config = toml::from_str(&content)?;

Ok(())
Ok(config)
}

async fn start(config: Config) -> anyhow::Result<ManagerHandle> {
let shutdown = Shutdown::new();
let signal = shutdown.to_signal().select(exit_signal()?);
let (task_handle, mut manager_handle) = spawn(config.clone(), shutdown.to_signal()).await;

// Test ping #1 to base node
let tip = manager_handle.get_tip_info().await;
info!("[TEST] Tip status: {:?}", tip);

// Test ping #2 to base node
let vn_status = manager_handle.get_active_validator_nodes().await;
info!("[TEST] Active validators: {:?}", vn_status);

tokio::select! {
_ = signal => {
log::info!("Shutting down");
},
result = task_handle => {
result??;
log::info!("Process manager exited");
}
}

Ok(manager_handle)
}

async fn spawn(config: Config, shutdown: ShutdownSignal) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
let (manager, manager_handle) = ProcessManager::new(config, shutdown);
let task_handle = tokio::spawn(manager.start());
(task_handle, manager_handle)
}

fn setup_logger() -> Result<(), fern::InitError> {
Expand Down
92 changes: 89 additions & 3 deletions applications/tari_watcher/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,109 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use log::*;
use minotari_app_grpc::tari_rpc::{GetActiveValidatorNodesResponse, TipInfoResponse};
use tari_shutdown::ShutdownSignal;
use tokio::sync::{mpsc, oneshot};

use crate::{
config::{Config, ExecutableConfig},
forker::Forker,
minotari::Minotari,
};

pub struct ProcessManager {
pub validator_config: ExecutableConfig,
pub wallet_config: ExecutableConfig,
pub forker: Forker,
pub shutdown_signal: ShutdownSignal,
pub rx_request: mpsc::Receiver<ManagerRequest>,
pub chain: Minotari,
}

impl ProcessManager {
pub fn new(config: Config) -> Self {
Self {
pub fn new(config: Config, shutdown_signal: ShutdownSignal) -> (Self, ManagerHandle) {
let (tx_request, rx_request) = mpsc::channel(1);
let this = Self {
validator_config: config.executable_config[0].clone(),
wallet_config: config.executable_config[1].clone(),
forker: Forker::new(config.base_node_grpc_address, config.base_dir),
forker: Forker::new(),
shutdown_signal,
rx_request,
chain: Minotari::new(config.base_node_grpc_address, config.base_wallet_grpc_address),
};
(this, ManagerHandle::new(tx_request))
}

pub async fn start(mut self) -> anyhow::Result<()> {
info!("Starting validator node process");

self.forker.start_validator(self.validator_config.clone()).await?;
self.chain.bootstrap().await?;

loop {
tokio::select! {
Some(req) = self.rx_request.recv() => {
match req {
ManagerRequest::GetTipInfo { reply } => {
let response = self.chain.get_tip_status().await?;
drop(reply.send(Ok(response)));
}
ManagerRequest::GetActiveValidatorNodes { reply } => {
let response = self.chain.get_active_validator_nodes().await?;
drop(reply.send(Ok(response)));
}
ManagerRequest::RegisterValidatorNode => {
unimplemented!();
}
}
}

_ = self.shutdown_signal.wait() => {
info!("Shutting down process manager");
break;
}
}
}

Ok(())
}
}

type Reply<T> = oneshot::Sender<anyhow::Result<T>>;

pub enum ManagerRequest {
GetTipInfo {
reply: Reply<TipInfoResponse>,
},
GetActiveValidatorNodes {
reply: Reply<Vec<GetActiveValidatorNodesResponse>>,
},

#[allow(dead_code)]
RegisterValidatorNode, // TODO: populate types
}

pub struct ManagerHandle {
tx_request: mpsc::Sender<ManagerRequest>,
}

impl ManagerHandle {
pub fn new(tx_request: mpsc::Sender<ManagerRequest>) -> Self {
Self { tx_request }
}

pub async fn get_active_validator_nodes(&mut self) -> anyhow::Result<Vec<GetActiveValidatorNodesResponse>> {
let (tx, rx) = oneshot::channel();
self.tx_request
.send(ManagerRequest::GetActiveValidatorNodes { reply: tx })
.await?;
rx.await?
}

pub async fn get_tip_info(&mut self) -> anyhow::Result<TipInfoResponse> {
let (tx, rx) = oneshot::channel();
self.tx_request.send(ManagerRequest::GetTipInfo { reply: tx }).await?;
rx.await?
}
}
Loading
Loading