Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Include MGS fs info with devices (#2420)
Browse files Browse the repository at this point in the history
Instead of looking at influx for MGS info, send it inline with the
device service. This should help prevent races where devices and lctl
data are updated out of sync with each other.

Signed-off-by: Joe Grund <[email protected]>
  • Loading branch information
jgrund authored Dec 8, 2020
1 parent e517b17 commit fec94c1
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 468 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 58 additions & 32 deletions iml-agent/src/daemon_plugins/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,56 @@ use crate::{
};
use async_trait::async_trait;
use futures::{future, lock::Mutex, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use iml_cmd::Command;
use lustre_collector::{mgs::mgs_fs_parser, parse_mgs_fs_output, Record, TargetStats};
use serde_json::Value;
use std::{io, pin::Pin, sync::Arc};
use stream_cancel::{Trigger, Tripwire};

#[derive(Eq, PartialEq)]
enum State {
Pending,
Sent,
#[derive(Debug)]
pub struct Devices {
trigger: Option<Trigger>,
state: Arc<Mutex<Output>>,
}

pub fn create() -> impl DaemonPlugin {
Devices {
trigger: None,
state: Arc::new(Mutex::new((None, State::Sent))),
state: Arc::new(Mutex::new(None)),
}
}

#[derive(Debug)]
pub struct Devices {
trigger: Option<Trigger>,
state: Arc<Mutex<(Output, State)>>,
/// Collects the filesystem names reported by `lctl get_param -N` for the
/// parameters listed by `mgs_fs_parser::params()`.
///
/// Only `Record::Target(TargetStats::FsNames(..))` records contribute to the
/// result; every other record produced by the parser is ignored.
///
/// # Errors
///
/// Returns any `ImlAgentError` raised while running `lctl`, except for an
/// I/O error of kind `NotFound`: in that case the binary is treated as absent
/// and an empty list is returned instead.
async fn get_mgs_fses() -> Result<Vec<String>, ImlAgentError> {
    let mut lctl = Command::new("lctl");
    lctl.arg("get_param").arg("-N").args(mgs_fs_parser::params());

    let finished: Result<_, ImlAgentError> = lctl.output().err_into().await;

    let finished = match finished {
        // A missing `lctl` binary is not an error for the caller; there is
        // simply nothing to report.
        Err(ImlAgentError::Io(ref err)) if err.kind() == io::ErrorKind::NotFound => {
            tracing::debug!("lctl binary was not found; will not send mgs fs info.");

            return Ok(vec![]);
        }
        Err(e) => return Err(e),
        Ok(out) => out,
    };

    // Output that fails to parse is deliberately replaced by the parser
    // result's default value rather than being surfaced as an error.
    let records = parse_mgs_fs_output(&finished.stdout).unwrap_or_default();

    let mut fs_names = Vec::new();

    for record in records {
        if let Record::Target(TargetStats::FsNames(stat)) = record {
            fs_names.extend(stat.value.into_iter().map(|fs| fs.0));
        }
    }

    Ok(fs_names)
}

#[async_trait]
Expand All @@ -48,22 +78,20 @@ impl DaemonPlugin for Devices {
let state = Arc::clone(&self.state);

Box::pin(async move {
let (x, s) = fut.await;
let (x, s): (Option<Result<Value, ImlAgentError>>, _) = fut.await;

let x: Output = match x {
Some(x) => x,
let x: Value = match x {
Some(x) => x?,
None => {
return Err(ImlAgentError::Io(io::Error::new(
io::ErrorKind::ConnectionAborted,
"Device scanner connection aborted before any data was sent",
)))
}
}?;
};

{
let mut lock = state.lock().await;

lock.0 = x.clone();
state.lock().await.replace(x.clone());
}

tokio::spawn(
Expand All @@ -72,14 +100,7 @@ impl DaemonPlugin for Devices {
let state = Arc::clone(&state);

async move {
let mut lock = state.lock().await;

if lock.0 != x {
tracing::debug!("marking pending (is none: {}) ", x.is_none());

lock.0 = x;
lock.1 = State::Pending;
}
state.lock().await.replace(x);

Ok(())
}
Expand All @@ -91,6 +112,11 @@ impl DaemonPlugin for Devices {
}),
);

let fses = get_mgs_fses().await?;
let fses = serde_json::to_value(fses)?;

let x = Some(Value::Array(vec![x, fses]));

Ok(x)
})
}
Expand All @@ -100,16 +126,16 @@ impl DaemonPlugin for Devices {
let state = Arc::clone(&self.state);

async move {
let mut lock = state.lock().await;
let fses = get_mgs_fses().await?;
let fses = serde_json::to_value(fses)?;

if lock.1 == State::Pending {
tracing::debug!("Sending new value");
lock.1 = State::Sent;
let x = state
.lock()
.await
.take()
.map(|x| Value::Array(vec![x, fses]));

Ok(lock.0.clone())
} else {
Ok(None)
}
Ok(x)
}
.boxed()
}
Expand Down
18 changes: 3 additions & 15 deletions iml-agent/src/daemon_plugins/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use crate::{
use futures::{future, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use iml_cmd::Command;
use lustre_collector::{
mgs::mgs_fs_parser, parse_cpustats_output, parse_lctl_output, parse_lnetctl_output,
parse_meminfo_output, parse_mgs_fs_output, parser,
parse_cpustats_output, parse_lctl_output, parse_lnetctl_output, parse_meminfo_output, parser,
};
use std::{io, pin::Pin, str};

Expand Down Expand Up @@ -41,14 +40,6 @@ impl DaemonPlugin for Stats {
let mut cmd1 = Command::new("lctl");
let cmd1 = cmd1.arg("get_param").args(params()).output().err_into();

let mut cmd1a = Command::new("lctl");
let cmd1a = cmd1a
.arg("get_param")
.arg("-N")
.args(mgs_fs_parser::params())
.output()
.err_into();

let mut cmd2 = Command::new("lnetctl");
let cmd2 = cmd2.arg("export").output().err_into();

Expand All @@ -57,14 +48,11 @@ impl DaemonPlugin for Stats {
let mut cmd4 = iml_fs::stream_file_lines("/proc/stat").boxed();
let cmd4 = cmd4.try_next().err_into();

let result = future::try_join5(cmd1, cmd1a, cmd2, cmd3, cmd4).await;
let result = future::try_join4(cmd1, cmd2, cmd3, cmd4).await;

match result {
Ok((lctl, mgs_fs, lnetctl, meminfo, maybe_cpustats)) => {
Ok((lctl, lnetctl, meminfo, maybe_cpustats)) => {
let mut lctl_output = parse_lctl_output(&lctl.stdout)?;
let mut mgs_fs_output = parse_mgs_fs_output(&mgs_fs.stdout).unwrap_or_default();

lctl_output.append(&mut mgs_fs_output);

let lnetctl_stats = str::from_utf8(&lnetctl.stdout)?;

Expand Down
1 change: 0 additions & 1 deletion iml-services/iml-device/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ device-types = "0.3.0"
futures = "0.3"
im = {version = "15.0", features = ["serde"]}
iml-change = {path = "../../iml-change", version = "0.1"}
iml-influx = {path = "../../iml-influx", version = "0.2", features = ["with-db-client"]}
iml-manager-env = {path = "../../iml-manager-env", version = "0.4"}
iml-postgres = {path = "../../iml-postgres", version = "0.4"}
iml-rabbit = {path = "../../iml-rabbit", version = "0.4"}
Expand Down
2 changes: 0 additions & 2 deletions iml-services/iml-device/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ pub enum ImlDeviceError {
SqlxCoreError(#[from] sqlx::Error),
#[error(transparent)]
SqlxMigrateError(#[from] sqlx::migrate::MigrateError),
#[error(transparent)]
ImlInfluxError(#[from] iml_influx::Error),
}

impl reject::Reject for ImlDeviceError {}
Loading

0 comments on commit fec94c1

Please sign in to comment.