diff --git a/Cargo.lock b/Cargo.lock
index 7f441ba3f9..2e2711bc66 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1796,7 +1796,6 @@ dependencies = [
  "futures",
  "im",
  "iml-change",
- "iml-influx",
  "iml-manager-env",
  "iml-postgres",
  "iml-rabbit",
diff --git a/iml-agent/src/daemon_plugins/device.rs b/iml-agent/src/daemon_plugins/device.rs
index ad4433a1ca..d51ae5ef79 100644
--- a/iml-agent/src/daemon_plugins/device.rs
+++ b/iml-agent/src/daemon_plugins/device.rs
@@ -9,26 +9,56 @@ use crate::{
 };
 use async_trait::async_trait;
 use futures::{future, lock::Mutex, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use iml_cmd::Command;
+use lustre_collector::{mgs::mgs_fs_parser, parse_mgs_fs_output, Record, TargetStats};
+use serde_json::Value;
 use std::{io, pin::Pin, sync::Arc};
 use stream_cancel::{Trigger, Tripwire};
 
-#[derive(Eq, PartialEq)]
-enum State {
-    Pending,
-    Sent,
+#[derive(Debug)]
+pub struct Devices {
+    trigger: Option<Trigger>,
+    state: Arc<Mutex<Option<Value>>>,
 }
 
 pub fn create() -> impl DaemonPlugin {
     Devices {
         trigger: None,
-        state: Arc::new(Mutex::new((None, State::Sent))),
+        state: Arc::new(Mutex::new(None)),
     }
 }
 
-#[derive(Debug)]
-pub struct Devices {
-    trigger: Option<Trigger>,
-    state: Arc<Mutex<(Output, State)>>,
+async fn get_mgs_fses() -> Result<Vec<String>, ImlAgentError> {
+    let output = Command::new("lctl")
+        .arg("get_param")
+        .arg("-N")
+        .args(mgs_fs_parser::params())
+        .output()
+        .err_into()
+        .await;
+
+    let output = match output {
+        Ok(x) => x,
+        Err(ImlAgentError::Io(ref err)) if err.kind() == io::ErrorKind::NotFound => {
+            tracing::debug!("lctl binary was not found; will not send mgs fs info.");
+
+            return Ok(vec![]);
+        }
+        Err(e) => return Err(e),
+    };
+
+    let fses: Vec<_> = parse_mgs_fs_output(&output.stdout)
+        .unwrap_or_default()
+        .into_iter()
+        .filter_map(|x| match x {
+            Record::Target(TargetStats::FsNames(x)) => Some(x),
+            _ => None,
+        })
+        .flat_map(|x| x.value)
+        .map(|x| x.0)
+        .collect();
+
+    Ok(fses)
 }
 
 #[async_trait]
@@ -48,22 +78,20 @@ impl DaemonPlugin for Devices {
         let state = Arc::clone(&self.state);
 
         Box::pin(async move {
-            let (x, s) = fut.await;
+            let (x, s): (Option<Result<Value, ImlAgentError>>, _) = fut.await;
 
-            let x: Output = match x {
-                Some(x) => x,
+            let x: Value = match x {
+                Some(x) => x?,
                 None => {
                     return Err(ImlAgentError::Io(io::Error::new(
                         io::ErrorKind::ConnectionAborted,
                         "Device scanner connection aborted before any data was sent",
                     )))
                 }
-            }?;
+            };
 
             {
-                let mut lock = state.lock().await;
-
-                lock.0 = x.clone();
+                state.lock().await.replace(x.clone());
             }
 
             tokio::spawn(
@@ -72,14 +100,7 @@
                     let state = Arc::clone(&state);
 
                     async move {
-                        let mut lock = state.lock().await;
-
-                        if lock.0 != x {
-                            tracing::debug!("marking pending (is none: {}) ", x.is_none());
-
-                            lock.0 = x;
-                            lock.1 = State::Pending;
-                        }
+                        state.lock().await.replace(x);
 
                         Ok(())
                     }
@@ -91,6 +112,11 @@
                 }),
             );
 
+            let fses = get_mgs_fses().await?;
+            let fses = serde_json::to_value(fses)?;
+
+            let x = Some(Value::Array(vec![x, fses]));
+
             Ok(x)
         })
     }
@@ -100,16 +126,16 @@
         let state = Arc::clone(&self.state);
 
         async move {
-            let mut lock = state.lock().await;
+            let fses = get_mgs_fses().await?;
+            let fses = serde_json::to_value(fses)?;
 
-            if lock.1 == State::Pending {
-                tracing::debug!("Sending new value");
-                lock.1 = State::Sent;
+            let x = state
+                .lock()
+                .await
+                .take()
+                .map(|x| Value::Array(vec![x, fses]));
 
-                Ok(lock.0.clone())
-            } else {
-                Ok(None)
-            }
+            Ok(x)
         }
         .boxed()
     }
diff --git a/iml-agent/src/daemon_plugins/stats.rs b/iml-agent/src/daemon_plugins/stats.rs
index 0bcbf0034b..f1bee0cd65 100644
--- a/iml-agent/src/daemon_plugins/stats.rs
+++ b/iml-agent/src/daemon_plugins/stats.rs
@@ -9,8 +9,7 @@ use crate::{
 use futures::{future, Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use iml_cmd::Command;
 use lustre_collector::{
-    mgs::mgs_fs_parser, parse_cpustats_output, parse_lctl_output, parse_lnetctl_output,
-    parse_meminfo_output, parse_mgs_fs_output, parser,
+    parse_cpustats_output, parse_lctl_output, parse_lnetctl_output, parse_meminfo_output, parser,
 };
 use std::{io, pin::Pin, str};
 
@@ -41,14 +40,6 @@
         let mut cmd1 = Command::new("lctl");
         let cmd1 = cmd1.arg("get_param").args(params()).output().err_into();
 
-        let mut cmd1a = Command::new("lctl");
-        let cmd1a = cmd1a
-            .arg("get_param")
-            .arg("-N")
-            .args(mgs_fs_parser::params())
-            .output()
-            .err_into();
-
         let mut cmd2 = Command::new("lnetctl");
         let cmd2 = cmd2.arg("export").output().err_into();
 
@@ -57,14 +48,11 @@
         let mut cmd4 = iml_fs::stream_file_lines("/proc/stat").boxed();
         let cmd4 = cmd4.try_next().err_into();
 
-        let result = future::try_join5(cmd1, cmd1a, cmd2, cmd3, cmd4).await;
+        let result = future::try_join4(cmd1, cmd2, cmd3, cmd4).await;
 
         match result {
-            Ok((lctl, mgs_fs, lnetctl, meminfo, maybe_cpustats)) => {
+            Ok((lctl, lnetctl, meminfo, maybe_cpustats)) => {
                 let mut lctl_output = parse_lctl_output(&lctl.stdout)?;
 
-                let mut mgs_fs_output = parse_mgs_fs_output(&mgs_fs.stdout).unwrap_or_default();
-
-                lctl_output.append(&mut mgs_fs_output);
 
                 let lnetctl_stats = str::from_utf8(&lnetctl.stdout)?;
diff --git a/iml-services/iml-device/Cargo.toml b/iml-services/iml-device/Cargo.toml
index 6685a187e0..9167b1af8d 100644
--- a/iml-services/iml-device/Cargo.toml
+++ b/iml-services/iml-device/Cargo.toml
@@ -9,7 +9,6 @@ device-types = "0.3.0"
 futures = "0.3"
 im = {version = "15.0", features = ["serde"]}
 iml-change = {path = "../../iml-change", version = "0.1"}
-iml-influx = {path = "../../iml-influx", version = "0.2", features = ["with-db-client"]}
 iml-manager-env = {path = "../../iml-manager-env", version = "0.4"}
 iml-postgres = {path = "../../iml-postgres", version = "0.4"}
 iml-rabbit = {path = "../../iml-rabbit", version = "0.4"}
diff --git a/iml-services/iml-device/src/error.rs b/iml-services/iml-device/src/error.rs
index a0413e2a11..3733b88503 100644
--- a/iml-services/iml-device/src/error.rs
+++ b/iml-services/iml-device/src/error.rs
@@ -19,8 +19,6 @@ pub enum ImlDeviceError {
     SqlxCoreError(#[from] sqlx::Error),
     #[error(transparent)]
     SqlxMigrateError(#[from] sqlx::migrate::MigrateError),
-    #[error(transparent)]
-    ImlInfluxError(#[from] iml_influx::Error),
 }
 
 impl reject::Reject for ImlDeviceError {}
diff --git a/iml-services/iml-device/src/lib.rs b/iml-services/iml-device/src/lib.rs
index 9beb3bab3e..b236b46845 100644
--- a/iml-services/iml-device/src/lib.rs
+++ b/iml-services/iml-device/src/lib.rs
@@ -13,7 +13,6 @@ pub use error::ImlDeviceError;
 use futures::{future::try_join_all, lock::Mutex, TryStreamExt};
 use im::HashSet;
 use iml_change::*;
-use iml_influx::{Client, InfluxClientExt as _, Precision};
 use iml_postgres::sqlx::{self, PgPool};
 use iml_tracing::tracing;
 use iml_wire_types::{Fqdn, FsType};
@@ -33,18 +32,6 @@ struct FsRecord {
     mgs_fs: Option<String>,
 }
 
-impl FsRecord {
-    fn filesystems(&self) -> String {
-        if let Some(fs) = self.fs.clone() {
-            fs
-        } else if let Some(fs) = self.mgs_fs.clone() {
-            fs
-        } else {
-            "".into()
-        }
-    }
-}
-
 /// Given a db pool, create a new cache and fill it with initial data.
 /// This will start the device tree with the previous items it left off with.
 pub async fn create_cache(pool: &PgPool) -> Result<Cache, ImlDeviceError> {
@@ -63,21 +50,8 @@
 }
 
 pub async fn create_target_cache(pool: &PgPool) -> Result<Vec<Target>, ImlDeviceError> {
-    let xs: Vec<Target> = sqlx::query!(r#"SELECT state, name, active_host_id, host_ids, filesystems, uuid, mount_path, dev_path, fs_type AS "fs_type: FsType" FROM target"#)
+    let xs: Vec<Target> = sqlx::query_as!(Target, r#"SELECT state, name, active_host_id, host_ids, filesystems, uuid, mount_path, dev_path, fs_type AS "fs_type: FsType" FROM target"#)
         .fetch(pool)
-        .map_ok(|x| {
-            Target {
-                state: x.state,
-                name: x.name,
-                active_host_id: x.active_host_id,
-                host_ids: x.host_ids,
-                filesystems: x.filesystems,
-                uuid: x.uuid,
-                mount_path: x.mount_path,
-                dev_path: x.dev_path,
-                fs_type: x.fs_type,
-            }
-        })
         .try_collect()
         .await?;
@@ -370,7 +344,7 @@ pub fn find_targets<'a>(
     mounts: &HashMap<Fqdn, HashSet<Mount>>,
     host_map: &HashMap<Fqdn, i32>,
     device_index: &DeviceIndex<'a>,
-    target_to_fs_map: &TargetFsRecord,
+    mgs_fs_cache: &HashMap<Fqdn, Vec<String>>,
 ) -> Vec<Target> {
     let xs: Vec<_> = mounts
         .iter()
@@ -429,6 +403,7 @@
             tracing::debug!("host id: {:?}", host_id);
 
             Some((
+                fqdn,
                 host_id,
                 [vec![*host_id], ys].concat(),
                 mntpnt,
@@ -442,33 +417,33 @@
 
     xs.into_iter()
         .map(
-            |(fqdn, ids, mntpnt, fs_uuid, dev_path, target, osd)| Target {
-                state: "mounted".into(),
-                active_host_id: Some(*fqdn),
-                host_ids: ids,
-                dev_path: Some(dev_path.0.to_string_lossy().to_string()),
-                filesystems: target_to_fs_map
-                    .get(target)
-                    .map(|xs| {
-                        xs.iter()
-                            .filter(|(host, _)| {
-                                host_map
-                                    .get(host)
-                                    .unwrap_or_else(|| panic!("Couldn't get host {}", host.0))
-                                    == fqdn
-                            })
-                            .map(|(_, fs)| fs.clone())
-                            .collect::<Vec<String>>()
-                    })
-                    .unwrap_or_default(),
-                name: target.into(),
-                uuid: fs_uuid.into(),
-                mount_path: Some(mntpnt.0.to_string_lossy().to_string()),
-                fs_type: match osd {
-                    osd if osd.contains("zfs") => Some(FsType::Zfs),
-                    osd if osd.contains("ldiskfs") => Some(FsType::Ldiskfs),
-                    _ => None,
-                },
+            |(fqdn, host_id, ids, mntpnt, fs_uuid, dev_path, target, osd)| {
+                let filesystems = if target == "MGS" {
+                    mgs_fs_cache.get(&fqdn).cloned().unwrap_or_default()
+                } else {
+                    target
+                        .rsplitn(2, '-')
+                        .nth(1)
+                        .map(String::from)
+                        .map(|x| vec![x])
+                        .unwrap_or_default()
+                };
+
+                Target {
+                    state: "mounted".into(),
+                    active_host_id: Some(*host_id),
+                    host_ids: ids,
+                    dev_path: Some(dev_path.0.to_string_lossy().to_string()),
+                    filesystems,
+                    name: target.into(),
+                    uuid: fs_uuid.into(),
+                    mount_path: Some(mntpnt.0.to_string_lossy().to_string()),
+                    fs_type: match osd {
+                        osd if osd.contains("zfs") => Some(FsType::Zfs),
+                        osd if osd.contains("ldiskfs") => Some(FsType::Ldiskfs),
+                        _ => None,
+                    },
+                }
             },
         )
         .fold(HashMap::new(), |mut acc: HashMap<String, Target>, x| {
@@ -549,107 +524,6 @@ pub fn build_updates(x: Changes<'_, Target>) -> Vec<Update> {
     }
 }
 
-fn parse_filesystem_data(query_result: Option<Vec<FsRecord>>) -> TargetFsRecord {
-    let x = match query_result {
-        Some(x) => x,
-        None => return HashMap::new(),
-    };
-
-    let items = x
-        .into_iter()
-        .map(|x| {
-            let filesystems: String = x.filesystems();
-            let host: String = x.host;
-            let target: String = x.target;
-
-            (
-                target,
-                filesystems
-                    .split(',')
-                    .map(|x| (Fqdn(host.clone()), x.to_string()))
-                    .collect(),
-            )
-        })
-        .collect::<Vec<(String, Vec<(Fqdn, String)>)>>();
-
-    let target_to_fs = items.into_iter().fold(
-        HashMap::new(),
-        |mut acc: HashMap<String, Vec<(Fqdn, String)>>, xs| {
-            let existing = acc.remove(xs.0.as_str());
-
-            let x = if let Some(entry) = existing {
-                [&entry[..], &xs.1[..]].concat()
-            } else {
-                xs.1
-            };
-
-            acc.insert(xs.0, x);
-
-            acc
-        },
-    );
-
-    tracing::debug!("target_to_fs: {:?}", target_to_fs);
-
-    target_to_fs
-}
-
-pub async fn get_target_filesystem_map(
-    influx_client: &Client,
-) -> Result<TargetFsRecord, ImlDeviceError> {
-    let query_result: Option<Vec<FsRecord>> = influx_client
-        .query_into(
-            "select host,target,fs,bytes_free from target group by target order by time desc limit 1;",
-            Some(Precision::Nanoseconds),
-        )
-        .await?;
-
-    Ok(parse_filesystem_data(query_result))
-}
-
-pub async fn get_mgs_filesystem_map(
-    influx_client: &Client,
-    mounts: &HashMap<Fqdn, HashSet<Mount>>,
-) -> Result<TargetFsRecord, ImlDeviceError> {
-    let query_result: Option<Vec<FsRecord>> = influx_client
-        .query_into(
-            "select host,target,mgs_fs,is_mgs_fs from target group by mgs_fs order by time desc limit 1;",
-            Some(Precision::Nanoseconds),
-        )
-        .await?;
-
-    let target_to_fs_map = parse_filesystem_data(query_result);
-
-    let snapshots: Vec<String> = mounts
-        .iter()
-        .map(|(k, xs)| xs.into_iter().map(move |x| (k, x)))
-        .flatten()
-        .filter(|(_, x)| x.fs_type.0 == "lustre")
-        .filter(|(_, x)| x.opts.0.split(',').any(|x| x == "nomgs"))
-        .filter_map(|(_, x)| {
-            let s = x.opts.0.split(',').find(|x| x.starts_with("svname="))?;
-
-            let s = s.split('=').nth(1)?;
-
-            Some(s.to_string())
-        })
-        .collect();
-
-    let target_to_fs_map = target_to_fs_map
-        .into_iter()
-        .map(|(key, xs)| {
-            (
-                key,
-                xs.into_iter()
-                    .filter(|(_, x)| snapshots.iter().find(|s| s.contains(x)).is_none())
-                    .collect::<Vec<_>>(),
-            )
-        })
-        .collect::<TargetFsRecord>();
-
-    Ok(target_to_fs_map)
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -776,114 +650,4 @@
 
         assert_eq!(xs, vec![]);
     }
-
-    #[test]
-    fn test_parse_target_filesystem_data() {
-        let query_result = Some(vec![
-            FsRecord {
-                host: "oss1".into(),
-                target: "fs-OST0009".into(),
-                fs: Some("fs".into()),
-                mgs_fs: None,
-            },
-            FsRecord {
-                host: "oss1".into(),
-                target: "fs-OST0008".into(),
-                fs: Some("fs2".into()),
-                mgs_fs: None,
-            },
-        ]);
-
-        let result = parse_filesystem_data(query_result);
-        assert_eq!(
-            result,
-            vec![
-                (
-                    "fs-OST0009".to_string(),
-                    vec![(Fqdn("oss1".to_string()), "fs".to_string())]
-                ),
-                (
-                    "fs-OST0008".to_string(),
-                    vec![(Fqdn("oss1".to_string()), "fs2".to_string())]
-                ),
-            ]
-            .into_iter()
-            .collect::<TargetFsRecord>(),
-        );
-    }
-
-    #[test]
-    fn test_parse_mgs_filesystem_data() {
-        let query_result = Some(vec![
-            FsRecord {
-                host: "mds1".into(),
-                target: "MGS".into(),
-                fs: None,
-                mgs_fs: Some("mgs1fs1,mgs1fs2".into()),
-            },
-            FsRecord {
-                host: "mds1".into(),
-                target: "MGS2".into(),
-                fs: None,
-                mgs_fs: Some("mgs2fs1,mgs2fs2".into()),
-            },
-        ]);
-
-        let result = parse_filesystem_data(query_result);
-
-        assert_eq!(
-            result,
-            vec![
-                (
-                    "MGS".to_string(),
-                    vec![
-                        (Fqdn("mds1".to_string()), "mgs1fs1".to_string()),
-                        (Fqdn("mds1".to_string()), "mgs1fs2".to_string())
-                    ]
-                ),
-                (
-                    "MGS2".to_string(),
-                    vec![
-                        (Fqdn("mds1".to_string()), "mgs2fs1".to_string()),
-                        (Fqdn("mds1".to_string()), "mgs2fs2".to_string())
-                    ]
-                ),
-            ]
-            .into_iter()
-            .collect::<TargetFsRecord>(),
-        );
-    }
-
-    #[test]
-    fn test_parse_mgs_filesystem_data_on_separate_hosts() {
-        let query_result = Some(vec![
-            FsRecord {
-                host: "mds1".into(),
-                target: "MGS".into(),
-                fs: None,
-                mgs_fs: Some("fs1".into()),
-            },
-            FsRecord {
-                host: "oss1".into(),
-                target: "MGS".into(),
-                fs: None,
-                mgs_fs: Some("fs2".into()),
-            },
-        ]);
-
-        let result = parse_filesystem_data(query_result);
-
-        assert_eq!(
-            result,
-            vec![(
-                "MGS".to_string(),
-                vec![
-                    (Fqdn("mds1".to_string()), "fs1".to_string()),
-                    (Fqdn("oss1".to_string()), "fs2".to_string())
-                ]
-            ),]
-            .into_iter()
-            .collect::<TargetFsRecord>(),
-        );
-    }
 }
diff --git a/iml-services/iml-device/src/main.rs b/iml-services/iml-device/src/main.rs
index 60f9f80a02..263e285a8c 100644
--- a/iml-services/iml-device/src/main.rs
+++ b/iml-services/iml-device/src/main.rs
@@ -8,15 +8,13 @@ use im::HashSet;
 use iml_change::GetChanges as _;
 use iml_device::{
     build_device_index, client_mount_content_id, create_cache, create_target_cache, find_targets,
-    get_mgs_filesystem_map, get_target_filesystem_map,
     linux_plugin_transforms::{
         build_device_lookup, devtree2linuxoutput, get_shared_pools, populate_zpool, update_vgs,
         LinuxPluginData,
     },
-    update_client_mounts, update_devices, Cache, ImlDeviceError, TargetFsRecord,
+    update_client_mounts, update_devices, Cache, ImlDeviceError,
 };
-use iml_influx::Client;
-use iml_manager_env::{get_influxdb_addr, get_influxdb_metrics_db, get_pool_limit};
+use iml_manager_env::get_pool_limit;
 use iml_postgres::{get_db_pool, sqlx, PgPool};
 use iml_service_queue::service_queue::consume_data;
 use iml_tracing::tracing;
@@ -26,7 +24,6 @@
     iter::FromIterator,
     sync::Arc,
 };
-use url::Url;
 use warp::Filter;
 
 // Default pool limit if not overridden by POOL_LIMIT
@@ -102,20 +99,16 @@ async fn main() -> Result<(), ImlDeviceError> {
     let ch = iml_rabbit::create_channel(&conn).await?;
 
-    let mut s = consume_data::<(Device, HashSet<Mount>)>(&ch, "rust_agent_device_rx");
+    let mut s =
+        consume_data::<((Device, HashSet<Mount>), Vec<String>)>(&ch, "rust_agent_device_rx");
 
     let lustreclientmount_ct_id = client_mount_content_id(&pool).await?;
 
-    let influx_url: String = format!("http://{}", get_influxdb_addr());
-
     let mut mount_cache = HashMap::new();
 
-    let influx_client = Client::new(
-        Url::parse(&influx_url).expect("Influx URL is invalid."),
-        get_influxdb_metrics_db(),
-    );
+    let mut mgs_fs_cache = HashMap::new();
 
-    while let Some((host, (devices, mounts))) = s.try_next().await? {
+    while let Some((host, ((devices, mounts), mgs_fses))) = s.try_next().await? {
         update_devices(&pool, &host, &devices).await?;
 
         update_client_mounts(&pool, lustreclientmount_ct_id, &host, &mounts).await?;
@@ -123,7 +116,8 @@
         let mut device_cache = cache2.lock().await;
 
         device_cache.insert(host.clone(), devices);
-        mount_cache.insert(host, mounts);
+        mount_cache.insert(host.clone(), mounts);
+        mgs_fs_cache.insert(host, mgs_fses);
 
         let index = build_device_index(&device_cache);
@@ -134,15 +128,6 @@
             .try_collect()
             .await?;
 
-        let target_to_fs_map = get_target_filesystem_map(&influx_client).await?;
-        let mgs_targets_to_fs_map = get_mgs_filesystem_map(&influx_client, &mount_cache).await?;
-        let target_to_fs_map: TargetFsRecord = target_to_fs_map
-            .into_iter()
-            .chain(mgs_targets_to_fs_map)
-            .collect();
-
-        tracing::debug!("target_to_fs_map: {:?}", target_to_fs_map);
-
         tracing::debug!("mount_cache: {:?}", mount_cache);
 
         let targets = find_targets(
@@ -150,7 +135,7 @@
             &mount_cache,
             &host_ids,
             &index,
-            &target_to_fs_map,
+            &mgs_fs_cache,
         );
 
         tracing::debug!("targets: {:?}", targets);
diff --git a/iml-services/iml-stats/src/main.rs b/iml-services/iml-stats/src/main.rs
index cc58fde2b5..deeed21aa4 100644
--- a/iml-services/iml-stats/src/main.rs
+++ b/iml-services/iml-stats/src/main.rs
@@ -2,8 +2,8 @@
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
 
-use futures::{future::try_join_all, stream::TryStreamExt, TryFutureExt};
-use iml_influx::{Client, InfluxClientExt, Point, Points, Precision, Value};
+use futures::stream::TryStreamExt;
+use iml_influx::{Client, Point, Points, Precision, Value};
 use iml_manager_env::{get_influxdb_addr, get_influxdb_metrics_db};
 use iml_service_queue::service_queue::consume_data;
 use iml_stats::error::ImlStatsError;
@@ -12,10 +12,9 @@ use lustre_collector::{
     HostStats, LNetStats, NodeStats, Record, Target, TargetStats,
     {
         types::{BrwStats, TargetVariant},
-        FsName, Stat, TargetStat,
+        Stat, TargetStat,
     },
 };
-use std::collections::{HashMap, HashSet};
 use url::Url;
 
 fn build_stats_query(x: &TargetStat<Vec<BrwStats>>, stat: &Stat, query: Point) -> Point {
@@ -365,16 +364,9 @@ fn handle_target_records(target_stats: TargetStats, host: &Fqdn) -> Option<Vec<Point>> {
 
         TargetStats::FsNames(x) => {
-            tracing::debug!("Fs names: {:?}", x);
+            tracing::warn!(data = ?x, "Unexpected MGS fses");
 
-            let fs_names = join_fs_names(x.value);
-
-            Some(vec![Point::new("target")
-                .add_tag("host", Value::String(host.0.to_string()))
-                .add_tag("kind", Value::String(x.kind.to_string()))
-                .add_tag("target", Value::String(x.target.to_string()))
-                .add_tag("mgs_fs", Value::String(fs_names))
-                .add_field("is_mgs_fs", Value::Boolean(true))])
+            None
         }
 
         TargetStats::JobStatsOst(_) => {
             // Not storing jobstats... yet.
@@ -490,15 +482,8 @@ async fn main() -> Result<(), ImlStatsError> {
             get_influxdb_metrics_db(),
         );
 
-        let mut fs_names = None;
-
         let entries: Vec<_> = xs
             .into_iter()
-            .inspect(|x| {
-                if let Record::Target(TargetStats::FsNames(x)) = x {
-                    fs_names = Some(fs_names_vec_to_hash_set(&x.value));
-                }
-            })
             .filter_map(|record| match record {
                 Record::Target(target_stats) => handle_target_records(target_stats, &host),
                 Record::Host(host_stats) => handle_host_records(host_stats, &host),
@@ -522,10 +507,6 @@
             if let Err(e) = r {
                 tracing::error!("Error writing series to influxdb: {}", e);
             }
-
-            if let Some(fs_names) = fs_names {
-                delete_existing_mgs_fs_records(fs_names, &client).await?;
-            }
         }
     }
 
@@ -539,111 +520,8 @@ struct MgsFsTime {
     mgs_fs_count: Option<u64>,
 }
 
-async fn delete_existing_mgs_fs_records(
-    fs_names: HashSet<String>,
-    client: &Client,
-) -> Result<(), ImlStatsError> {
-    let xs: Vec<MgsFsTime> = client
-        .query_into(
-            format!(
-                r#"
-                SELECT mgs_fs, mgs_fs_count FROM
-                    (SELECT count(is_mgs_fs) AS mgs_fs_count FROM target WHERE is_mgs_fs=true GROUP BY mgs_fs),
-                    (SELECT LAST(is_mgs_fs),time as last_time FROM target GROUP BY mgs_fs)
-                GROUP BY mgs_fs
-            "#
-            )
-            .as_str(),
-            Some(Precision::Nanoseconds)
-        )
-        .await?
-        .unwrap_or_default();
-
-    tracing::debug!(
-        "delete stale mgs_fs records - fs_names: {:?}; query results: {:?}",
-        fs_names,
-        xs
-    );
-
-    let subsets: HashMap<String, (u64, u64)> = xs
-        .into_iter()
-        .filter(|x| {
-            let mgs_fs_set = fs_names_to_hash_set(&x.mgs_fs);
-
-            let intersection = mgs_fs_set.intersection(&fs_names).collect::<Vec<_>>();
-
-            !intersection.is_empty()
-        })
-        .fold(HashMap::new(), |mut acc, record| {
-            let mgs_fs = record.mgs_fs;
-            let mut entry = acc.entry(mgs_fs).or_insert((0, 0));
-
-            if record.time > 0 {
-                entry.0 = record.time;
-            }
-
-            if let Some(count) = record.mgs_fs_count {
-                entry.1 = count;
-            }
-
-            acc
-        });
-
-    let delete_subset_futures = subsets
-        .into_iter()
-        .filter(|(mgs_fs, (_time, count))| {
-            let mgs_fs_set = fs_names_to_hash_set(&mgs_fs);
-
-            mgs_fs_set != fs_names || *count > 1
-        })
-        .map(|(mgs_fs, (time, count))| {
-            let mds_fs_set = fs_names_to_hash_set(&mgs_fs);
-
-            let query = if mds_fs_set == fs_names && count > 1 {
-                format!(
-                    "DELETE FROM target WHERE time < {} AND mgs_fs = '{}' AND kind = '{}'",
-                    time,
-                    mgs_fs,
-                    TargetVariant::MGT
-                )
-            } else {
-                format!(
-                    "DELETE FROM target WHERE time <= {} AND mgs_fs = '{}' AND kind = '{}'",
-                    time,
-                    mgs_fs,
-                    TargetVariant::MGT
-                )
-            };
-
-            tracing::debug!("Deleting entries with query: {}", query);
-
-            client
-                .query(query.as_str(), Some(Precision::Nanoseconds))
-                .map_err(|e| iml_influx::Error::InfluxDbError(e))
-        });
-
-    try_join_all(delete_subset_futures).await?;
-
-    Ok(())
-}
-
 fn fs_name(t: &Target) -> &str {
     let s = &*t;
 
     s.split_at(s.rfind('-').unwrap_or(0)).0
 }
-
-fn join_fs_names(xs: Vec<FsName>) -> String {
-    xs.into_iter()
-        .map(|x| x.0)
-        .collect::<Vec<String>>()
-        .join(",")
-}
-
-fn fs_names_vec_to_hash_set(xs: &[FsName]) -> HashSet<String> {
-    xs.into_iter().map(|x| x.0.to_string()).collect()
-}
-
-fn fs_names_to_hash_set(xs: &str) -> HashSet<String> {
-    xs.split(',').map(|x| x.to_string()).collect()
-}
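
Note on the new wire format (reviewer sketch, not part of the patch): the agent now piggybacks the MGS filesystem names onto the device payload as a two-element JSON array (the `Value::Array(vec![x, fses])` above), and iml-device deserializes the same message as the nested tuple `((Device, HashSet<Mount>), Vec<String>)`. A minimal, self-contained serde_json illustration of that round trip, using a plain `Value` as a stand-in for the real device-scanner tree:

use serde_json::{json, Value};

fn main() {
    // Producer side (device plugin): wrap the scanner tree and the
    // `lctl get_param -N` fs names into one two-element array.
    let device_tree = json!({ "children": [] }); // stand-in for the real tree
    let fses = vec!["fs1".to_string(), "fs2".to_string()];
    let payload = Value::Array(vec![device_tree, serde_json::to_value(&fses).unwrap()]);

    // Consumer side (iml-device): a JSON array deserializes into a tuple,
    // so the same bytes parse back out as (tree, mgs fs names).
    let (tree, mgs_fses): (Value, Vec<String>) = serde_json::from_value(payload).unwrap();

    assert_eq!(mgs_fses, fses);
    println!("tree: {}, mgs fses: {:?}", tree, mgs_fses);
}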