Skip to content

Commit

Permalink
monitor alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
ii-cruz committed Nov 8, 2023
1 parent be44684 commit bab2e00
Showing 1 changed file with 99 additions and 23 deletions.
122 changes: 99 additions & 23 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use nimiq_keys::PublicKey;
use snark_setup_operator::{
data_structs::{Ceremony, Response},
data_structs::{Ceremony, Response, UniqueChunkId},
error::MonitorError,
};

use anyhow::Result;
use chrono::Duration;
use chrono::{DateTime, Duration, Utc};
use gumdrop::Options;
use std::collections::HashSet;
use tracing::info;
use tracing::{error, info, warn};
use url::Url;

#[derive(Debug, Options, Clone)]
Expand All @@ -20,39 +21,82 @@ pub struct MonitorOpts {
pub coordinator_url: String,
#[options(help = "polling interval in minutes", default = "1")]
pub polling_interval: u64,
#[options(help = "timeout in minutes", default = "1")]
pub timeout: i64,
#[options(help = "chunk lock timeout in minutes", default = "1")]
pub chunk_timeout: i64,
#[options(help = "ceremony timeout in minutes", default = "1")]
pub ceremony_timeout: i64,
}

pub struct Monitor {
// Settings
pub server_url: Url,
pub timeout: Duration,
pub ceremony_timeout: Duration,

// Last changed values in the ceremony
pub last_ceremony_version: u64,
pub last_ceremony_update: DateTime<Utc>,

pub last_setups_all_done: Vec<bool>,

pub last_timed_out_participant_ids: Vec<PublicKey>,

pub last_chunks_complete: Vec<UniqueChunkId>,
pub last_chunks_incomplete: Vec<UniqueChunkId>,
pub participant_ids_incomplete: Vec<PublicKey>,
}

impl Monitor {
pub fn new(opts: &MonitorOpts) -> Result<Self> {
let monitor = Self {
server_url: Url::parse(&opts.coordinator_url)?.join("ceremony")?,
timeout: Duration::minutes(opts.timeout),
timeout: Duration::minutes(opts.chunk_timeout),
ceremony_timeout: Duration::minutes(opts.ceremony_timeout),
last_ceremony_version: 0,
last_ceremony_update: chrono::Utc::now(),
last_setups_all_done: vec![],
last_timed_out_participant_ids: vec![],
last_chunks_complete: vec![],
last_chunks_incomplete: vec![],
participant_ids_incomplete: vec![],
};
Ok(monitor)
}

async fn run(&self) -> Result<()> {
async fn run(&mut self) -> Result<()> {
let response = reqwest::get(self.server_url.as_str())
.await?
.error_for_status()?;
let data = response.text().await?;
let ceremony: Ceremony = serde_json::from_str::<Response<Ceremony>>(&data)?.result;

self.check_progress(&ceremony)?;
self.check_timeout(&ceremony)?;
self.check_all_done(&ceremony)?;
self.show_finished_chunks(&ceremony)?;

Ok(())
}

fn check_timeout(&self, ceremony: &Ceremony) -> Result<()> {
fn check_progress(&mut self, ceremony: &Ceremony) -> Result<()> {
let current_time = chrono::Utc::now();
let elapsed = current_time - self.last_ceremony_update;
if ceremony.version == self.last_ceremony_version {
if self.ceremony_timeout > elapsed {
warn!(
"Ceremony progress is stuck at version {:?} for {:?}",
ceremony.version, elapsed
);
}
} else {
self.last_ceremony_update = current_time;
self.last_ceremony_version = ceremony.version;
}

Ok(())
}

fn check_timeout(&mut self, ceremony: &Ceremony) -> Result<()> {
let current_time = chrono::Utc::now();
let mut timed_out_participant_ids = vec![];

Expand All @@ -75,16 +119,23 @@ impl Monitor {
}
}
}
info!("timed out participants: {:?}", timed_out_participant_ids);
if !self
.last_timed_out_participant_ids
.eq(&timed_out_participant_ids)
{
warn!("timed out participants: {:?}", timed_out_participant_ids);
self.last_timed_out_participant_ids = timed_out_participant_ids;
}

Ok(())
}

fn check_all_done(&self, ceremony: &Ceremony) -> Result<()> {
fn check_all_done(&mut self, ceremony: &Ceremony) -> Result<()> {
let participant_ids: HashSet<_> = ceremony.contributor_ids.iter().clone().collect();
let mut last_setups_all_done = vec![];

for setup in ceremony.setups.iter() {
if setup.chunks.iter().all(|chunk| {
let done = setup.chunks.iter().all(|chunk| {
let verified_participant_ids_in_chunk: HashSet<_> = chunk
.contributions
.iter()
Expand All @@ -95,16 +146,22 @@ impl Monitor {
participant_ids
.iter()
.all(|p| verified_participant_ids_in_chunk.contains(*p))
}) {
info!("setup {:?} all done", setup.setup_id);
} else {
info!("setup {:?} not finished", setup.setup_id);
});
last_setups_all_done.push(done);
let index = setup.setup_id.len() - 1;

if self.last_setups_all_done[index] != last_setups_all_done[index] {
warn!(
"setup {:?} all done: {}",
setup.setup_id, last_setups_all_done[index]
);
self.last_setups_all_done[index] = last_setups_all_done[index];
}
}
Ok(())
}

fn show_finished_chunks(&self, ceremony: &Ceremony) -> Result<()> {
fn show_finished_chunks(&mut self, ceremony: &Ceremony) -> Result<()> {
let participant_ids: HashSet<_> = ceremony.contributor_ids.iter().clone().collect();

let mut chunks_complete = vec![];
Expand All @@ -117,7 +174,7 @@ impl Monitor {
.contributions
.iter()
.filter(|c| c.verified)
.map(|c| c.contributor_id.as_ref())
.map(|c| c.contributor_id)
.filter_map(|e| e)
.collect();
if participant_ids
Expand All @@ -130,16 +187,35 @@ impl Monitor {
.iter()
.filter(|x| !verified_participant_ids_in_chunk.contains(*x))
.for_each(|p| {
participant_ids_incomplete.insert(p);
participant_ids_incomplete.insert((*p).clone());
});
chunks_incomplete.push(chunk.unique_chunk_id.clone())
}
}
}

info!("complete chunks: {:?}", chunks_complete);
info!("incomplete chunks: {:?}", chunks_incomplete);
info!("incomplete participants: {:?}", participant_ids_incomplete);
if !self.last_chunks_complete.eq(&chunks_complete) {
info!("complete chunks: {:?}", chunks_complete);
self.last_chunks_complete = chunks_complete;
}
if !self.last_chunks_incomplete.eq(&chunks_incomplete) {
info!("incomplete chunks: {:?}", chunks_incomplete);
self.last_chunks_incomplete = chunks_incomplete;
}
let participant_ids_incomplete_vec: Vec<PublicKey> = participant_ids_incomplete
.iter()
.map(|pk| pk.clone())
.collect();
if !self
.participant_ids_incomplete
.eq(&participant_ids_incomplete_vec)
{
info!(
"incomplete participants: {:?}",
participant_ids_incomplete_vec
);
self.participant_ids_incomplete = participant_ids_incomplete_vec;
}

Ok(())
}
Expand All @@ -151,14 +227,14 @@ async fn main() {

let opts: MonitorOpts = MonitorOpts::parse_args_default_or_exit();

let monitor = Monitor::new(&opts).expect("Should have been able to create a monitor.");
let mut monitor = Monitor::new(&opts).expect("Should have been able to create a monitor.");
let mut monitor_interval =
tokio::time::interval(std::time::Duration::from_secs(60 * opts.polling_interval));
loop {
monitor_interval.tick().await;

match monitor.run().await {
Err(e) => info!("Got error from monitor: {}", e.to_string()),
Err(e) => error!("Got error from monitor: {}", e.to_string()),
_ => {}
}
}
Expand Down

0 comments on commit bab2e00

Please sign in to comment.