From 5d153649998f3a66434c7357a643ff924d6baec2 Mon Sep 17 00:00:00 2001 From: Igor Pashev Date: Mon, 26 Oct 2020 18:42:47 +0200 Subject: [PATCH] Implement new automatic snapshot feature Signed-off-by: Igor Pashev --- Cargo.lock | 1 + iml-api/src/graphql/mod.rs | 112 +++- iml-graphql-queries/src/snapshot.rs | 102 ++++ iml-gui/crate/src/lib.rs | 3 + .../crate/src/page/snapshot/create_policy.rs | 415 ++++++++++++++ .../crate/src/page/snapshot/list_policy.rs | 148 +++++ iml-gui/crate/src/page/snapshot/mod.rs | 63 ++- iml-gui/crate/src/test_utils/fixtures.rs | 1 + iml-manager-cli/src/display_utils.rs | 33 +- iml-manager-cli/src/snapshot.rs | 107 ++++ iml-services/iml-snapshot/Cargo.toml | 5 +- iml-services/iml-snapshot/src/lib.rs | 1 + iml-services/iml-snapshot/src/main.rs | 11 +- iml-services/iml-snapshot/src/policy.rs | 524 ++++++++++++++++++ iml-warp-drive/src/cache.rs | 29 +- iml-warp-drive/src/db_record.rs | 7 +- iml-wire-types/src/graphql_duration.rs | 2 +- iml-wire-types/src/snapshot.rs | 61 +- iml-wire-types/src/warp_drive.rs | 21 +- migrations/20201112000000_snapshot_v2.sql | 63 +++ sqlx-data.json | 162 ++++++ 21 files changed, 1853 insertions(+), 18 deletions(-) create mode 100644 iml-gui/crate/src/page/snapshot/create_policy.rs create mode 100644 iml-gui/crate/src/page/snapshot/list_policy.rs create mode 100644 iml-services/iml-snapshot/src/policy.rs create mode 100644 migrations/20201112000000_snapshot_v2.sql diff --git a/Cargo.lock b/Cargo.lock index ba117c1322..8570982194 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2032,6 +2032,7 @@ dependencies = [ name = "iml-snapshot" version = "0.4.0" dependencies = [ + "chrono", "futures", "futures-util", "iml-command-utils", diff --git a/iml-api/src/graphql/mod.rs b/iml-api/src/graphql/mod.rs index 8266ca2157..39283c9cfb 100644 --- a/iml-api/src/graphql/mod.rs +++ b/iml-api/src/graphql/mod.rs @@ -21,7 +21,7 @@ use iml_wire_types::{ graphql::{ServerProfile, ServerProfileInput}, graphql_duration::GraphQLDuration, logs::{LogResponse, Meta}, - snapshot::{ReserveUnit, Snapshot, SnapshotInterval, SnapshotRetention}, + snapshot::{ReserveUnit, Snapshot, SnapshotInterval, SnapshotPolicy, SnapshotRetention}, task::Task, Command, EndpointName, FsType, Job, LogMessage, LogSeverity, MessageClass, SortDir, }; @@ -621,6 +621,27 @@ impl QueryRoot { Ok(mount_command) } + + /// List all automatic snapshot policies. + async fn snapshot_policies(context: &Context) -> juniper::FieldResult> { + let xs = sqlx::query!(r#"SELECT * FROM snapshot_policy"#) + .fetch(&context.pg_pool) + .map_ok(|x| SnapshotPolicy { + id: x.id, + filesystem: x.filesystem, + interval: x.interval.into(), + barrier: x.barrier, + keep: x.keep, + daily: x.daily, + weekly: x.weekly, + monthly: x.monthly, + last_run: x.last_run, + }) + .try_collect() + .await?; + + Ok(xs) + } } struct SnapshotIntervalName { @@ -930,7 +951,7 @@ impl MutationRoot { ), reserve_unit(description = "The unit of measurement associated with the reserve_value"), keep_num( - description = "The minimum number of snapshots to keep. This is to avoid deleting all snapshots while pursuiting the reserve goal" + description = "The minimum number of snapshots to keep. This is to avoid deleting all snapshots while pursuing the reserve goal" ) ))] /// Creates a new snapshot retention policy for the given `fsname`. 
@@ -1085,6 +1106,67 @@ impl MutationRoot { Ok(true) } + #[graphql(arguments( + filesystem(description = "The filesystem to create snapshots with"), + interval(description = "How often a snapshot should be taken"), + use_barrier( + description = "Set write barrier before creating snapshot. The default value is `false`" + ), + keep(description = "Number of the most recent snapshots to keep"), + daily(description = "Then, number of days when keep the most recent snapshot of each day"), + weekly( + description = "Then, number of weeks when keep the most recent snapshot of each week" + ), + monthly( + description = "Then, number of months when keep the most recent snapshot of each month" + ), + ))] + /// Creates a new automatic snapshot policy. + async fn create_snapshot_policy( + context: &Context, + filesystem: String, + interval: GraphQLDuration, + barrier: Option, + keep: i32, + daily: Option, + weekly: Option, + monthly: Option, + ) -> juniper::FieldResult { + sqlx::query!( + r#" + INSERT INTO snapshot_policy ( + filesystem, + interval, + barrier, + keep, + daily, + weekly, + monthly + ) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (filesystem) + DO UPDATE SET + interval = EXCLUDED.interval, + barrier = EXCLUDED.barrier, + keep = EXCLUDED.keep, + daily = EXCLUDED.daily, + weekly = EXCLUDED.weekly, + monthly = EXCLUDED.monthly + "#, + filesystem, + PgInterval::try_from(interval.0)?, + barrier.unwrap_or(false), + keep, + daily.unwrap_or(0), + weekly.unwrap_or(0), + monthly.unwrap_or(0) + ) + .fetch_optional(&context.pg_pool) + .await?; + + Ok(true) + } + #[graphql(arguments(profile_name(description = "Name of the profile to remove")))] async fn remove_server_profile( context: &Context, @@ -1116,6 +1198,32 @@ impl MutationRoot { transaction.commit().await?; Ok(true) } + + #[graphql(arguments( + filesystem(description = "The filesystem to remove snapshot policies for"), + id(description = "Id of the policy to remove"), + ))] + /// Removes the automatic snapshot policy. 
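+    /// Matching is done on either the filesystem name or the policy id; both
+    /// arguments are optional, and any policy matching either value is removed.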
+ async fn remove_snapshot_policy( + context: &Context, + filesystem: Option, + id: Option, + ) -> juniper::FieldResult { + sqlx::query!( + r#" + DELETE FROM snapshot_policy + WHERE (filesystem IS NOT DISTINCT FROM $1) + OR (id IS NOT DISTINCT FROM $2) + + "#, + filesystem, + id + ) + .fetch_optional(&context.pg_pool) + .await?; + + Ok(true) + } } #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] diff --git a/iml-graphql-queries/src/snapshot.rs b/iml-graphql-queries/src/snapshot.rs index 54795d4b87..c4d4ba14c3 100644 --- a/iml-graphql-queries/src/snapshot.rs +++ b/iml-graphql-queries/src/snapshot.rs @@ -440,3 +440,105 @@ pub mod list_retentions { pub snapshot_retention_policies: Vec, } } + +pub mod policy { + pub mod list { + use crate::Query; + use iml_wire_types::snapshot::SnapshotPolicy; + + pub static QUERY: &str = r#" + query SnapshotPolicies { + snapshotPolicies { + id + filesystem + interval + barrier + keep + daily + weekly + monthly + last_run: lastRun + } + } + "#; + + pub fn build() -> Query<()> { + Query { + query: QUERY.to_string(), + variables: None, + } + } + + #[derive(Debug, Clone, serde::Deserialize)] + pub struct Resp { + #[serde(rename(deserialize = "snapshotPolicies"))] + pub snapshot_policies: Vec, + } + } + + pub mod create { + use crate::Query; + + pub static QUERY: &str = r#" + mutation CreateSnapshotPolicy($filesystem: String!, $interval: Duration!, $barrier: Boolean, + $keep: Int!, $daily: Int, $weekly: Int, $monthly: Int) { + createSnapshotPolicy(filesystem: $filesystem, interval: $interval, barrier: $barrier, + keep: $keep, daily: $daily, weekly: $weekly, monthly: $monthly) + } + "#; + + #[derive(Debug, serde::Serialize, Default, Clone)] + pub struct Vars { + pub filesystem: String, + pub interval: String, + pub barrier: Option, + pub keep: i32, + pub daily: Option, + pub weekly: Option, + pub monthly: Option, + } + + pub fn build(vars: Vars) -> Query { + Query { + query: QUERY.to_string(), + variables: Some(vars), + } + } + + #[derive(Debug, Clone, serde::Deserialize)] + pub struct Resp { + #[serde(rename(deserialize = "createSnapshotPolicy"))] + pub snapshot_policy: bool, + } + } + + pub mod remove { + use crate::Query; + + pub static QUERY: &str = r#" + mutation RemoveSnapshotPolicy($filesystem: String!) 
{ + removeSnapshotPolicy(filesystem: $filesystem) + } + "#; + + #[derive(Debug, serde::Serialize, Default)] + pub struct Vars { + filesystem: String, + } + + pub fn build(filesystem: impl ToString) -> Query { + Query { + query: QUERY.to_string(), + variables: Some(Vars { + filesystem: filesystem.to_string(), + }), + } + } + + #[derive(Debug, Clone, serde::Deserialize)] + pub struct Resp { + #[serde(rename(deserialize = "removeSnapshotPolicy"))] + pub snapshot_policy: bool, + } + } +} diff --git a/iml-gui/crate/src/lib.rs b/iml-gui/crate/src/lib.rs index 85f7f5469d..cd4863639e 100644 --- a/iml-gui/crate/src/lib.rs +++ b/iml-gui/crate/src/lib.rs @@ -689,6 +689,9 @@ fn handle_record_change( ArcRecord::SnapshotRetention(x) => { model.records.snapshot_retention.insert(x.id, Arc::clone(&x)); } + ArcRecord::SnapshotPolicy(x) => { + model.records.snapshot_policy.insert(x.id, Arc::clone(&x)); + } ArcRecord::StratagemConfig(x) => { model.records.stratagem_config.insert(x.id, Arc::clone(&x)); diff --git a/iml-gui/crate/src/page/snapshot/create_policy.rs b/iml-gui/crate/src/page/snapshot/create_policy.rs new file mode 100644 index 0000000000..d353aaa344 --- /dev/null +++ b/iml-gui/crate/src/page/snapshot/create_policy.rs @@ -0,0 +1,415 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use crate::{ + components::{font_awesome, form, modal, Placement}, + extensions::{MergeAttrs as _, NodeExt as _}, + generated::css_classes::C, + key_codes, + page::{ + snapshot::{get_fs_names, help_indicator}, + RecordChange, + }, + GMsg, RequestExt, +}; +use iml_graphql_queries::{snapshot, Response}; +use iml_wire_types::{warp_drive::ArcCache, warp_drive::ArcRecord, warp_drive::RecordId, Filesystem}; +use seed::{prelude::*, *}; +use std::sync::Arc; + +#[derive(Debug, Default)] +pub struct Model { + pub modal: modal::Model, + submitting: bool, + filesystems: Vec>, + + interval: i32, + interval_unit: String, + vars: snapshot::policy::create::Vars, +} + +impl RecordChange for Model { + fn update_record(&mut self, _: ArcRecord, cache: &ArcCache, orders: &mut impl Orders) { + orders.send_msg(Msg::SetFilesystems(cache.filesystem.values().cloned().collect())); + } + fn remove_record(&mut self, _: RecordId, cache: &ArcCache, orders: &mut impl Orders) { + orders.send_msg(Msg::SetFilesystems(cache.filesystem.values().cloned().collect())); + + let present = cache.filesystem.values().any(|x| x.name == self.vars.filesystem); + + if !present { + let x = get_fs_names(cache).into_iter().next().unwrap_or_default(); + orders.send_msg(Msg::Input(Input::Filesystem(x))); + } + } + fn set_records(&mut self, cache: &ArcCache, orders: &mut impl Orders) { + orders.send_msg(Msg::SetFilesystems(cache.filesystem.values().cloned().collect())); + + let x = get_fs_names(cache).into_iter().next().unwrap_or_default(); + orders.send_msg(Msg::Input(Input::Filesystem(x))); + orders.send_msg(Msg::Input(Input::IntervalUnit("days".to_string()))); + } +} + +#[derive(Clone, Debug)] +pub enum Msg { + Modal(modal::Msg), + Open, + Close, + SetFilesystems(Vec>), + Input(Input), + Submit, + CreatePolicyResp(fetch::ResponseDataResult>), + Noop, +} + +#[derive(Clone, Debug)] +pub enum Input { + Filesystem(String), + Interval(i32), + IntervalUnit(String), + ToggleBarrier, + Keep(i32), + Daily(Option), + Monthly(Option), + Weekly(Option), +} + +pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders) { + match msg { + Msg::SetFilesystems(x) => { + 
model.filesystems = x; + } + Msg::Input(x) => match x { + Input::Interval(i) => model.interval = i, + Input::IntervalUnit(i) => model.interval_unit = i, + + Input::Filesystem(i) => model.vars.filesystem = i, + Input::ToggleBarrier => model.vars.barrier = Some(!model.vars.barrier.unwrap_or(false)), + Input::Keep(i) => model.vars.keep = i, + Input::Daily(i) => model.vars.daily = i, + Input::Weekly(i) => model.vars.weekly = i, + Input::Monthly(i) => model.vars.monthly = i, + }, + Msg::Submit => { + model.submitting = true; + model.vars.interval = format!("{}{}", model.interval, model.interval_unit); + + let query = snapshot::policy::create::build(model.vars.clone()); + + let req = fetch::Request::graphql_query(&query); + + orders.perform_cmd(req.fetch_json_data(|x| Msg::CreatePolicyResp(x))); + } + Msg::CreatePolicyResp(x) => { + model.submitting = false; + orders.send_msg(Msg::Close); + + match x { + Ok(Response::Data(_)) => {} + Ok(Response::Errors(e)) => { + error!("An error has occurred during policy creation: ", e); + } + Err(e) => { + error!("An error has occurred during policy creation: ", e); + } + } + } + Msg::Modal(msg) => { + modal::update(msg, &mut model.modal, &mut orders.proxy(Msg::Modal)); + } + Msg::Open => { + model.modal.open = true; + } + Msg::Close => { + model.modal.open = false; + } + Msg::Noop => {} + }; +} + +// FIXME: this function was created to help rustfmt only +fn interval_unit_options(selected: &str) -> Vec> { + vec![ + option![ + class![C.font_sans], + attrs! {At::Value => "minutes", At::Selected => (selected == "minutes").as_at_value()}, + "Minutes" + ], + option![ + class![C.font_sans], + attrs! {At::Value => "hours", At::Selected => (selected == "hours").as_at_value()}, + "Hours" + ], + option![ + class![C.font_sans], + attrs! {At::Value => "days", At::Selected => (selected == "days").as_at_value()}, + "Days" + ], + option![ + class![C.font_sans], + attrs! {At::Value => "years", At::Selected => (selected == "years").as_at_value()}, + "Years" + ], + ] +} + +pub fn view(model: &Model) -> Node { + let input_cls = class![ + C.appearance_none, + C.focus__outline_none, + C.focus__shadow_outline, + C.px_3, + C.py_2, + C.rounded_sm + ]; + + modal::bg_view( + model.modal.open, + Msg::Modal, + modal::content_view( + Msg::Modal, + div![ + modal::title_view(Msg::Modal, span!["Create Automatic Snapshot Policy"]), + form![ + ev(Ev::Submit, move |event| { + event.prevent_default(); + Msg::Submit + }), + div![ + class![C.grid, C.grid_cols_2, C.gap_4, C.p_4, C.items_center], + label![attrs! {At::For => "policy_filesystem"}, "Filesystem Name"], + div![ + class![C.inline_block, C.relative, C.bg_gray_200], + select![ + id!["policy_filesystem"], + &input_cls, + class![ + C.block, + C.text_gray_800, + C.leading_tight, + C.bg_transparent, + C.pr_8, + C.rounded, + C.w_full + ], + model.filesystems.iter().map(|x| { + let mut opt = option![class![C.font_sans], attrs! {At::Value => x.name}, x.name]; + if x.name == model.vars.filesystem.as_str() { + opt.add_attr(At::Selected.to_string(), "selected"); + } + + opt + }), + attrs! { + At::Required => true.as_at_value(), + }, + input_ev(Ev::Change, |s| Msg::Input(Input::Filesystem(s))), + ], + div![ + class![ + C.pointer_events_none, + C.absolute, + C.inset_y_0, + C.right_0, + C.flex, + C.items_center, + C.px_2, + C.text_gray_700, + ], + font_awesome(class![C.w_4, C.h_4, C.inline, C.ml_1], "chevron-down") + ], + ], + label![ + attrs! 
{At::For => "policy_interval"}, + "Interval", + help_indicator( + "How often to take a snapshot for a selected filesystem", + Placement::Right + ) + ], + div![ + class![C.grid, C.grid_cols_6], + input![ + &input_cls, + class![C.bg_gray_200, C.text_gray_800, C.col_span_4, C.rounded_r_none], + id!["policy_interval"], + attrs! { + At::Type => "number", + At::Min => "1", + At::Placeholder => "Required", + At::Required => true.as_at_value(), + }, + input_ev(Ev::Change, |s| s + .parse() + .map(|i| Msg::Input(Input::Interval(i))) + .unwrap_or(Msg::Noop)), + ], + div![ + class![C.inline_block, C.relative, C.col_span_2, C.text_white, C.bg_blue_500], + select![ + id!["interval_unit"], + &input_cls, + class![C.w_full, C.h_full C.rounded_l_none, C.bg_transparent], + interval_unit_options(&model.interval_unit), + attrs! { + At::Required => true.as_at_value(), + }, + input_ev(Ev::Change, |s| Msg::Input(Input::IntervalUnit(s))), + ], + div![ + class![ + C.pointer_events_none, + C.absolute, + C.inset_y_0, + C.right_0, + C.flex, + C.items_center, + C.px_2, + C.text_white, + ], + font_awesome(class![C.w_4, C.h_4, C.inline, C.ml_1], "chevron-down") + ] + ], + ], + label![ + attrs! {At::For => "policy_keep"}, + "Keep Recent Snapshots", + help_indicator("Number of the most recent snapshots to keep", Placement::Right) + ], + input![ + &input_cls, + class![C.bg_gray_200, C.text_gray_800, C.rounded_r_none], + id!["policy_keep"], + attrs! { + At::Type => "number", + At::Min => "1", + At::Placeholder => "Required", + At::Required => true.as_at_value(), + }, + input_ev(Ev::Change, |s| s + .parse() + .map(|i| Msg::Input(Input::Keep(i))) + .unwrap_or(Msg::Noop)), + ], + label![ + attrs! {At::For => "policy_daily"}, + "Daily Snapshots", + help_indicator( + "Number of days when keep the most recent snapshot of each day", + Placement::Right + ) + ], + input![ + &input_cls, + class![C.bg_gray_200, C.text_gray_800, C.rounded_r_none], + id!["policy_daily"], + attrs! { + At::Type => "number", + At::Min => "0", + At::Placeholder => "Optional", + At::Required => false.as_at_value(), + }, + input_ev(Ev::Change, |s| Msg::Input(Input::Daily(s.parse().ok()))), + ], + label![ + attrs! {At::For => "policy_weekly"}, + "Weekly Snapshots", + help_indicator( + "Number of weeks when keep the most recent snapshot of each week", + Placement::Right + ) + ], + input![ + &input_cls, + class![C.bg_gray_200, C.text_gray_800, C.rounded_r_none], + id!["policy_weekly"], + attrs! { + At::Type => "number", + At::Min => "0", + At::Placeholder => "Optional", + At::Required => false.as_at_value(), + }, + input_ev(Ev::Change, |s| Msg::Input(Input::Weekly(s.parse().ok()))), + ], + label![ + attrs! {At::For => "policy_monthly"}, + "Monthly Snapshots", + help_indicator( + "Number of months when keep the most recent snapshot of each months", + Placement::Right + ) + ], + input![ + &input_cls, + class![C.bg_gray_200, C.text_gray_800, C.rounded_r_none], + id!["policy_monthly"], + attrs! { + At::Type => "number", + At::Min => "0", + At::Placeholder => "Optional", + At::Required => false.as_at_value(), + }, + input_ev(Ev::Change, |s| Msg::Input(Input::Monthly(s.parse().ok()))), + ], + label![ + attrs! {At::For => "policy_barrier"}, + "Use Barrier", + help_indicator("Set write barrier before creating snapshot", Placement::Right) + ], + form::toggle() + .merge_attrs(id!["policy_barrier"]) + .merge_attrs(attrs! 
{ + At::Checked => model.vars.barrier.unwrap_or(false).as_at_value() + }) + .with_listener(input_ev(Ev::Change, |_| Msg::Input(Input::ToggleBarrier))), + ], + modal::footer_view(vec![ + button![ + class![ + C.bg_blue_500, + C.duration_300, + C.flex, + C.form_invalid__bg_gray_500, + C.form_invalid__cursor_not_allowed, + C.form_invalid__pointer_events_none, + C.hover__bg_blue_400, + C.items_center + C.px_4, + C.py_2, + C.rounded_full, + C.text_white, + C.transition_colors, + ], + font_awesome(class![C.h_3, C.w_3, C.mr_1, C.inline], "plus"), + "Create Policy", + ], + button![ + class![ + C.bg_transparent, + C.duration_300, + C.hover__bg_gray_100, + C.hover__text_blue_400, + C.ml_2, + C.px_4, + C.py_2, + C.rounded_full, + C.text_blue_500, + C.transition_colors, + ], + simple_ev(Ev::Click, modal::Msg::Close), + "Cancel", + ] + .map_msg(Msg::Modal), + ]) + .merge_attrs(class![C.pt_8]) + ] + ], + ) + .with_listener(keyboard_ev(Ev::KeyDown, move |ev| match ev.key_code() { + key_codes::ESC => Msg::Modal(modal::Msg::Close), + _ => Msg::Noop, + })), + ) +} diff --git a/iml-gui/crate/src/page/snapshot/list_policy.rs b/iml-gui/crate/src/page/snapshot/list_policy.rs new file mode 100644 index 0000000000..b0c871a87d --- /dev/null +++ b/iml-gui/crate/src/page/snapshot/list_policy.rs @@ -0,0 +1,148 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +use super::*; +use crate::{extensions::RequestExt, font_awesome}; +use chrono_humanize::{Accuracy, HumanTime, Tense}; +use iml_wire_types::snapshot::SnapshotPolicy; + +#[derive(Clone, Debug)] +pub enum Msg { + Page(paging::Msg), + Delete(Arc), + DeleteResp(fetch::ResponseDataResult>), +} + +#[derive(Default, Debug)] +pub struct Model { + pager: paging::Model, + rows: Vec>, + take: take::Model, +} + +pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders) { + match msg { + Msg::Page(msg) => { + paging::update(msg, &mut model.pager, &mut orders.proxy(Msg::Page)); + } + Msg::Delete(x) => { + if let Ok(true) = window().confirm_with_message(&format!("Delete snapshot policy for '{}' ?", x.filesystem)) + { + let query = snapshot::policy::remove::build(&x.filesystem); + + let req = fetch::Request::graphql_query(&query); + + orders.perform_cmd(req.fetch_json_data(Msg::DeleteResp)); + } + } + Msg::DeleteResp(x) => match x { + Ok(Response::Data(_)) => {} + Ok(Response::Errors(e)) => { + error!("An error has occurred during deletion: ", e); + } + Err(e) => { + error!("An error has occurred during deletion: ", e); + } + }, + }; +} + +impl RecordChange for Model { + fn update_record(&mut self, _: ArcRecord, cache: &ArcCache, orders: &mut impl Orders) { + self.rows = cache.snapshot_policy.values().cloned().collect(); + + orders.proxy(Msg::Page).send_msg(paging::Msg::SetTotal(self.rows.len())); + } + fn remove_record(&mut self, _: RecordId, cache: &ArcCache, orders: &mut impl Orders) { + self.rows = cache.snapshot_policy.values().cloned().collect(); + + orders.proxy(Msg::Page).send_msg(paging::Msg::SetTotal(self.rows.len())); + } + fn set_records(&mut self, cache: &ArcCache, orders: &mut impl Orders) { + self.rows = cache.snapshot_policy.values().cloned().collect(); + + orders.proxy(Msg::Page).send_msg(paging::Msg::SetTotal(self.rows.len())); + } +} + +pub fn view(model: &Model, cache: &ArcCache, session: Option<&Session>) -> Node { + panel::view( + h3![class![C.py_4, C.font_normal, C.text_lg], "Automatic Snapshot Policies"], + div![ + 
table::wrapper_view(vec![ + table::thead_view(vec![ + table::th_view(plain!["Filesystem"]), + table::th_view(plain!["Interval"]), + table::th_view(plain!["Keep"]), + table::th_view(plain!["Daily"]), + table::th_view(plain!["Weekly"]), + table::th_view(plain!["Monthly"]), + table::th_view(plain!["Barrier"]), + table::th_view(plain!["Last Run"]), + restrict::view(session, GroupType::FilesystemAdministrators, th![]), + ]), + tbody![model.rows[model.pager.range()].iter().map(|x| { + tr![ + td![ + table::td_cls(), + class![C.text_center], + match get_fs_by_name(cache, &x.filesystem) { + Some(x) => { + div![resource_links::fs_link(&x)] + } + None => { + plain![x.filesystem.to_string()] + } + } + ], + table::td_center(plain![chrono::Duration::from_std(x.interval.0) + .map(HumanTime::from) + .map(|x| x.to_text_en(Accuracy::Precise, Tense::Present)) + .unwrap_or("---".into())]), + table::td_center(plain![x.keep.to_string()]), + table::td_center(plain![x.daily.to_string()]), + table::td_center(plain![x.weekly.to_string()]), + table::td_center(plain![x.monthly.to_string()]), + table::td_center(plain![x.barrier.to_string()]), + table::td_center(plain![x + .last_run + .map(|x| x.format("%m/%d/%Y %H:%M:%S").to_string()) + .unwrap_or_else(|| "---".to_string())]), + td![ + class![C.flex, C.justify_center, C.p_4, C.px_3], + restrict::view( + session, + GroupType::FilesystemAdministrators, + button![ + class![ + C.bg_blue_500, + C.duration_300, + C.flex, + C.hover__bg_blue_400, + C.items_center, + C.px_6, + C.py_2, + C.rounded_sm, + C.text_white, + C.transition_colors, + ], + font_awesome(class![C.w_3, C.h_3, C.inline, C.mr_1], "trash"), + "Delete Policy", + simple_ev(Ev::Click, Msg::Delete(Arc::clone(&x))) + ] + ) + ] + ] + })] + ]) + .merge_attrs(class![C.my_6]), + div![ + class![C.flex, C.justify_end, C.py_1, C.pr_3], + paging::limit_selection_view(&model.pager).map_msg(Msg::Page), + paging::page_count_view(&model.pager), + paging::next_prev_view(&model.pager).map_msg(Msg::Page) + ] + ], + ) +} diff --git a/iml-gui/crate/src/page/snapshot/mod.rs b/iml-gui/crate/src/page/snapshot/mod.rs index 9b419bd509..b998d2bf39 100644 --- a/iml-gui/crate/src/page/snapshot/mod.rs +++ b/iml-gui/crate/src/page/snapshot/mod.rs @@ -3,9 +3,11 @@ // license that can be found in the LICENSE file. 
mod add_interval; +mod create_policy; mod create_retention; mod list; mod list_interval; +mod list_policy; mod list_retention; mod take; @@ -36,6 +38,8 @@ pub struct Model { list: list::Model, add_interval: add_interval::Model, create_retention: create_retention::Model, + create_policy: create_policy::Model, + list_policy: list_policy::Model, } impl RecordChange for Model { @@ -55,7 +59,13 @@ impl RecordChange for Model { self.create_retention .update_record(record.clone(), cache, &mut orders.proxy(Msg::CreatRetention)); - self.take.update_record(record, cache, &mut orders.proxy(Msg::Take)); + self.take + .update_record(record.clone(), cache, &mut orders.proxy(Msg::Take)); + + self.list_policy + .update_record(record.clone(), cache, &mut orders.proxy(Msg::ListPolicy)); + self.create_policy + .update_record(record, cache, &mut orders.proxy(Msg::CreatePolicy)); } fn remove_record(&mut self, record: RecordId, cache: &ArcCache, orders: &mut impl Orders) { self.list.remove_record(record, cache, &mut orders.proxy(Msg::List)); @@ -73,6 +83,11 @@ impl RecordChange for Model { .remove_record(record, cache, &mut orders.proxy(Msg::CreatRetention)); self.take.remove_record(record, cache, &mut orders.proxy(Msg::Take)); + + self.list_policy + .remove_record(record, cache, &mut orders.proxy(Msg::ListPolicy)); + self.create_policy + .remove_record(record, cache, &mut orders.proxy(Msg::CreatePolicy)); } fn set_records(&mut self, cache: &ArcCache, orders: &mut impl Orders) { self.list.set_records(cache, &mut orders.proxy(Msg::List)); @@ -90,6 +105,10 @@ impl RecordChange for Model { .set_records(cache, &mut orders.proxy(Msg::CreatRetention)); self.take.set_records(cache, &mut orders.proxy(Msg::Take)); + + self.list_policy.set_records(cache, &mut orders.proxy(Msg::ListPolicy)); + self.create_policy + .set_records(cache, &mut orders.proxy(Msg::CreatePolicy)); } } @@ -101,6 +120,8 @@ pub enum Msg { List(list::Msg), AddInterval(add_interval::Msg), CreatRetention(create_retention::Msg), + CreatePolicy(create_policy::Msg), + ListPolicy(list_policy::Msg), } pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders) { @@ -123,6 +144,12 @@ pub fn update(msg: Msg, model: &mut Model, orders: &mut impl Orders) Msg::CreatRetention(msg) => { create_retention::update(msg, &mut model.create_retention, &mut orders.proxy(Msg::CreatRetention)); } + Msg::CreatePolicy(msg) => { + create_policy::update(msg, &mut model.create_policy, &mut orders.proxy(Msg::CreatePolicy)); + } + Msg::ListPolicy(msg) => { + list_policy::update(msg, &mut model.list_policy, &mut orders.proxy(Msg::ListPolicy)); + } } } @@ -162,6 +189,15 @@ pub fn view(model: &Model, cache: &ArcCache, session: Option<&Session>) -> impl }, create_retention_btn(session), create_retention::view(&model.create_retention).map_msg(Msg::CreatRetention), + if cache.snapshot_policy.is_empty() { + empty![] + } else { + list_policy::view(&model.list_policy, cache, session) + .map_msg(Msg::ListPolicy) + .merge_attrs(class![C.my_6]) + }, + create_policy_btn(session), + create_policy::view(&model.create_policy).map_msg(Msg::CreatePolicy), list::view(&model.list, cache).map_msg(Msg::List) ] } @@ -220,6 +256,31 @@ fn create_retention_btn(session: Option<&Session>) -> Node { ) } +fn create_policy_btn(session: Option<&Session>) -> Node { + restrict::view( + session, + GroupType::FilesystemAdministrators, + button![ + class![ + C.bg_blue_500, + C.duration_300, + C.flex, + C.hover__bg_blue_400, + C.items_center, + C.mb_6, + C.px_6, + C.py_2, + C.rounded_sm, + C.text_white, + 
C.transition_colors + ], + font_awesome(class![C.h_3, C.w_3, C.mr_1, C.inline], "plus"), + "Create Automatic Snapshot Policy", + simple_ev(Ev::Click, create_policy::Msg::Open).map_msg(Msg::CreatePolicy) + ], + ) +} + fn help_indicator(msg: &str, placement: Placement) -> Node { span![ attrs::container(), diff --git a/iml-gui/crate/src/test_utils/fixtures.rs b/iml-gui/crate/src/test_utils/fixtures.rs index 29528b9452..3f28ba4484 100644 --- a/iml-gui/crate/src/test_utils/fixtures.rs +++ b/iml-gui/crate/src/test_utils/fixtures.rs @@ -19,6 +19,7 @@ pub(crate) fn get_cache() -> Cache { "snapshot_interval": {}, "snapshot_retention": {}, "target_record": {}, + "snapshot_policy": {}, "active_alert": { "577": { "_message": "Updates are ready for server oss1.local", diff --git a/iml-manager-cli/src/display_utils.rs b/iml-manager-cli/src/display_utils.rs index 430cf6f186..801cb0e5db 100644 --- a/iml-manager-cli/src/display_utils.rs +++ b/iml-manager-cli/src/display_utils.rs @@ -8,7 +8,7 @@ use futures::{Future, FutureExt}; use iml_wire_types::{ db::TargetRecord, graphql::ServerProfile, - snapshot::{ReserveUnit, Snapshot, SnapshotInterval, SnapshotRetention}, + snapshot::{ReserveUnit, Snapshot, SnapshotInterval, SnapshotPolicy, SnapshotRetention}, Command, Filesystem, Host, OstPool, StratagemConfiguration, StratagemReport, }; use indicatif::ProgressBar; @@ -187,6 +187,37 @@ impl IntoTable for Vec { } } +impl IntoTable for Vec { + fn into_table(self) -> Table { + generate_table( + &[ + "Filesystem", + "Interval", + "Keep", + "Daily", + "Weekly", + "Monthly", + "Barrier", + "Last Run", + ], + self.into_iter().map(|p| { + vec![ + p.filesystem, + p.interval.to_string(), + p.keep.to_string(), + p.daily.to_string(), + p.weekly.to_string(), + p.monthly.to_string(), + p.barrier.to_string(), + p.last_run + .map(|t| t.to_rfc2822()) + .unwrap_or_else(|| "---".to_string()), + ] + }), + ) + } +} + impl IntoTable for Vec { fn into_table(self) -> Table { generate_table( diff --git a/iml-manager-cli/src/snapshot.rs b/iml-manager-cli/src/snapshot.rs index 590d8008c1..736531ccd2 100644 --- a/iml-manager-cli/src/snapshot.rs +++ b/iml-manager-cli/src/snapshot.rs @@ -67,6 +67,53 @@ pub enum RetentionCommand { }, } +#[derive(Debug, StructOpt)] +pub enum PolicyCommand { + /// List snapshot policies (default) + List { + /// Display type: json, yaml, tabular + #[structopt(short = "d", long = "display", default_value = "tabular")] + display_type: DisplayType, + }, + /// Create or update snapshot policy + Create { + /// Filesystem to create a snapshot policy for + filesystem: String, + /// Automatic snapshot interval in human form, e. g. 
1hour + #[structopt()] + interval: String, + /// Use barrier when creating snapshots + #[structopt(short = "b", long = "barrier")] + barrier: bool, + /// Number of recent snapshots to keep + #[structopt(short = "k", long = "keep")] + keep: i32, + /// Number of days when keep the most recent snapshot of each day + #[structopt(short = "d", long = "daily")] + daily: Option, + /// Number of weeks when keep the most recent snapshot of each week + #[structopt(short = "w", long = "weekly")] + weekly: Option, + /// Number of months when keep the most recent snapshot of each month + #[structopt(short = "m", long = "monthly")] + monthly: Option, + }, + /// Remove snapshot policies + Remove { + /// Filesystem names to remove policies for + #[structopt(required = true, min_values = 1)] + filesystem: Vec, + }, +} + +impl Default for PolicyCommand { + fn default() -> Self { + PolicyCommand::List { + display_type: DisplayType::Tabular, + } + } +} + #[derive(Debug, StructOpt)] pub enum SnapshotCommand { /// Create a snapshot @@ -89,6 +136,11 @@ pub enum SnapshotCommand { Interval(IntervalCommand), /// Snapshot retention rules operations Retention(RetentionCommand), + /// Automatic snapshot policies operations + Policy { + #[structopt(subcommand)] + command: Option, + }, } async fn interval_cli(cmd: IntervalCommand) -> Result<(), ImlManagerCliError> { @@ -181,6 +233,60 @@ async fn retention_cli(cmd: RetentionCommand) -> Result<(), ImlManagerCliError> } } +async fn policy_cli(cmd: PolicyCommand) -> Result<(), ImlManagerCliError> { + match cmd { + PolicyCommand::List { display_type } => { + let query = snapshot_queries::policy::list::build(); + + let resp: iml_graphql_queries::Response = + graphql(query).await?; + let policies = Result::from(resp)?.data.snapshot_policies; + + let x = policies.into_display_type(display_type); + + let term = Term::stdout(); + term.write_line(&x).unwrap(); + + Ok(()) + } + PolicyCommand::Create { + filesystem, + interval, + barrier, + keep, + daily, + weekly, + monthly, + } => { + let query = + snapshot_queries::policy::create::build(snapshot_queries::policy::create::Vars { + filesystem, + interval, + barrier: Some(barrier), + keep, + daily, + weekly, + monthly, + }); + + let _resp: iml_graphql_queries::Response = + graphql(query).await?; + + Ok(()) + } + PolicyCommand::Remove { filesystem } => { + for fs in filesystem { + let query = snapshot_queries::policy::remove::build(fs); + + let _resp: iml_graphql_queries::Response = + graphql(query).await?; + } + + Ok(()) + } + } +} + pub async fn snapshot_cli(command: SnapshotCommand) -> Result<(), ImlManagerCliError> { match command { SnapshotCommand::List { @@ -240,5 +346,6 @@ pub async fn snapshot_cli(command: SnapshotCommand) -> Result<(), ImlManagerCliE } SnapshotCommand::Interval(cmd) => interval_cli(cmd).await, SnapshotCommand::Retention(cmd) => retention_cli(cmd).await, + SnapshotCommand::Policy { command } => policy_cli(command.unwrap_or_default()).await, } } diff --git a/iml-services/iml-snapshot/Cargo.toml b/iml-services/iml-snapshot/Cargo.toml index 3bc9043c6b..636360c9ed 100644 --- a/iml-services/iml-snapshot/Cargo.toml +++ b/iml-services/iml-snapshot/Cargo.toml @@ -5,18 +5,19 @@ name = "iml-snapshot" version = "0.4.0" [dependencies] +chrono = "0.4" futures = "0.3" futures-util = "0.3" iml-command-utils = {path = "../../iml-command-utils", version = "0.4"} iml-graphql-queries = {path = "../../iml-graphql-queries", version = "0.2"} -iml-influx = {path = "../../iml-influx", version = "0.2"} +iml-influx = {path = 
"../../iml-influx", version = "0.2", features = ["with-db-client"]} iml-manager-client = {path = "../../iml-manager-client", version = "0.4"} iml-manager-env = {path = "../../iml-manager-env", version = "0.4"} iml-postgres = {path = "../../iml-postgres", version = "0.4"} iml-rabbit = {path = "../../iml-rabbit", version = "0.4"} iml-service-queue = {path = "../iml-service-queue", version = "0.4"} iml-tracing = {version = "0.3", path = "../../iml-tracing"} -iml-wire-types = {path = "../../iml-wire-types", version = "0.4"} +iml-wire-types = {path = "../../iml-wire-types", version = "0.4", features = ["postgres-interop"]} serde = "1.0" thiserror = "1.0" tokio = {version = "0.2", features = ["rt-threaded"]} diff --git a/iml-services/iml-snapshot/src/lib.rs b/iml-services/iml-snapshot/src/lib.rs index cbaf33d6b5..4d50d6eb18 100644 --- a/iml-services/iml-snapshot/src/lib.rs +++ b/iml-services/iml-snapshot/src/lib.rs @@ -3,6 +3,7 @@ use iml_manager_client::ImlManagerClientError; use tokio::time::Instant; pub mod client_monitor; +pub mod policy; pub mod retention; #[derive(thiserror::Error, Debug)] diff --git a/iml-services/iml-snapshot/src/main.rs b/iml-services/iml-snapshot/src/main.rs index d33c580b93..3115f4ad29 100644 --- a/iml-services/iml-snapshot/src/main.rs +++ b/iml-services/iml-snapshot/src/main.rs @@ -8,7 +8,7 @@ use iml_manager_client::{Client as ManagerClient, Url}; use iml_manager_env::{get_influxdb_addr, get_influxdb_metrics_db, get_pool_limit}; use iml_postgres::{get_db_pool, sqlx}; use iml_service_queue::service_queue::consume_data; -use iml_snapshot::{client_monitor::tick, retention::handle_retention_rules, MonitorState}; +use iml_snapshot::{client_monitor::tick, policy, retention::handle_retention_rules, MonitorState}; use iml_tracing::tracing; use iml_wire_types::snapshot; use std::collections::HashMap; @@ -30,8 +30,6 @@ async fn main() -> Result<(), Box> { consume_data::>>(&ch, "rust_agent_snapshot_rx"); let pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?; - let pool_2 = pool.clone(); - let pool_3 = pool.clone(); let manager_client: ManagerClient = iml_manager_client::get_client()?; @@ -43,6 +41,7 @@ async fn main() -> Result<(), Box> { sqlx::migrate!("../../migrations").run(&pool).await?; + let pool_2 = pool.clone(); tokio::spawn(async move { let mut interval = interval(Duration::from_secs(60)); let mut snapshot_client_counts: HashMap = HashMap::new(); @@ -56,11 +55,13 @@ async fn main() -> Result<(), Box> { }); tokio::spawn(handle_retention_rules( - manager_client, + manager_client.clone(), influx_client, - pool_3.clone(), + pool.clone(), )); + tokio::spawn(policy::main(manager_client, pool.clone())); + while let Some((fqdn, snap_map)) = s.try_next().await? { for (fs_name, snapshots) in snap_map { tracing::debug!("snapshots from {}: {:?}", fqdn, snapshots); diff --git a/iml-services/iml-snapshot/src/policy.rs b/iml-services/iml-snapshot/src/policy.rs new file mode 100644 index 0000000000..b77eaef87c --- /dev/null +++ b/iml-services/iml-snapshot/src/policy.rs @@ -0,0 +1,524 @@ +// Copyright (c) 2020 DDN. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +use crate::Error; +use chrono::{DateTime, Datelike as _, Duration, NaiveDate, NaiveDateTime, Utc}; +use futures::future::{try_join_all, AbortHandle, Abortable}; +use iml_command_utils::wait_for_cmds_success; +use iml_graphql_queries::snapshot; +use iml_manager_client::{graphql, Client}; +use iml_postgres::{sqlx, PgPool}; +use iml_tracing::tracing; +use iml_wire_types::{snapshot::SnapshotPolicy, Command}; +use std::collections::{HashMap, HashSet, LinkedList}; + +pub async fn main(client: Client, pg: PgPool) -> Result<(), Error> { + let mut svcs: HashMap = HashMap::new(); + loop { + let resp: iml_graphql_queries::Response = + graphql(client.clone(), snapshot::policy::list::build()).await?; + + let new: HashSet = Result::from(resp)? + .data + .snapshot_policies + .into_iter() + .collect(); + + let old: HashSet = svcs.keys().cloned().collect(); + + for p in old.difference(&new) { + tracing::debug!("stopping obsolete {:?}", p); + let (c, d) = svcs.remove(p).unwrap(); + c.abort(); + d.abort(); + } + for p in new.difference(&old).cloned() { + tracing::debug!("starting new {:?}", p); + let ah = start(client.clone(), pg.clone(), p.clone()); + svcs.insert(p, ah); + } + + tokio::time::delay_for(tokio::time::Duration::from_secs(6)).await; + } +} + +fn start(client: Client, pg: PgPool, policy: SnapshotPolicy) -> (AbortHandle, AbortHandle) { + let (create, create_reg) = AbortHandle::new_pair(); + let (destroy, destroy_reg) = AbortHandle::new_pair(); + + let client_1 = client.clone(); + let pg_1 = pg.clone(); + let policy_1 = policy.clone(); + tokio::spawn(Abortable::new( + async move { + loop { + if let Err(e) = create_snapshot(&client_1, &pg_1, &policy_1).await { + tracing::warn!("automatic snapshot failed: {}", e); + tokio::time::delay_for(tokio::time::Duration::from_secs(10)).await; + } + } + }, + create_reg, + )); + + tokio::spawn(Abortable::new( + async move { + loop { + if let Err(e) = destroy_obsolete(&client, &pg, &policy).await { + tracing::warn!("obsolete snapshot destroying: {}", e); + tokio::time::delay_for(tokio::time::Duration::from_secs(10)).await; + } + } + }, + destroy_reg, + )); + + (create, destroy) +} + +async fn create_snapshot( + client: &Client, + pg: &PgPool, + policy: &SnapshotPolicy, +) -> Result<(), Error> { + let latest = sqlx::query!( + r#" + SELECT snapshot_name, create_time FROM snapshot + WHERE filesystem_name = $1 + AND snapshot_name LIKE '\_\_%' + ORDER BY create_time DESC + LIMIT 1 + "#, + policy.filesystem + ) + .fetch_optional(pg) + .await?; + + if let Some(x) = latest { + tracing::debug!( + "latest automatic snapshot of {}: {} at {:?}", + policy.filesystem, + x.snapshot_name, + x.create_time + ); + let pause = Duration::to_std(&(Utc::now() - x.create_time)) + .ok() + .and_then(|y| policy.interval.0.checked_sub(y)) + .unwrap_or(std::time::Duration::from_secs(0)); + + if pause.as_secs() > 0 { + tracing::debug!( + "sleeping {:?} before creating next snapshot of {}", + pause, + policy.filesystem + ); + tokio::time::delay_for(pause).await; + } + } + + let time = Utc::now(); + let name = time.format("__%s"); + tracing::info!( + "creating automatic snapshot {} of {}", + name, + policy.filesystem + ); + + let query = snapshot::create::build( + policy.filesystem.clone(), + name, + "automatic snapshot".into(), + Some(policy.barrier), + ); + let resp: iml_graphql_queries::Response = + graphql(client.clone(), query).await?; + let x = Result::from(resp)?.data.create_snapshot; + wait_for_cmds_success(&[x], None).await?; + + sqlx::query!( + "UPDATE snapshot_policy SET last_run 
= $1 WHERE filesystem = $2", + time, + policy.filesystem + ) + .execute(pg) + .await?; + + // XXX This is need to make sure the new snapshot is registered. + // XXX Might need to registered snapshots right after creation (not relying on polling). + let cooldown = policy.interval.0.div_f64(2.0); + tracing::debug!("cooldown sleeping {:?} for {}", cooldown, policy.filesystem); + tokio::time::delay_for(cooldown).await; + + Ok(()) +} + +async fn destroy_obsolete( + client: &Client, + pg: &PgPool, + policy: &SnapshotPolicy, +) -> Result<(), Error> { + let snapshots = sqlx::query!( + r#" + SELECT snapshot_name, create_time FROM snapshot + WHERE filesystem_name = $1 + AND snapshot_name LIKE '\_\_%' + ORDER BY create_time DESC + "#, + policy.filesystem + ) + .fetch_all(pg) + .await? + .into_iter() + .map(|x| (x.create_time, x.snapshot_name)) + .collect::>(); + + let obsolete = get_obsolete(policy, snapshots); + + if obsolete.is_empty() { + tracing::info!("no obsolete snapshots of {}", policy.filesystem); + } else { + let cmd_futs: Vec<_> = obsolete + .iter() + .map(|x| destroy_snapshot(client.clone(), policy.filesystem.clone(), x.clone())) + .collect(); + let cmds = try_join_all(cmd_futs).await?; + wait_for_cmds_success(&cmds, None).await?; + + tracing::info!("destroyed obsolete snapshots: {}", obsolete.join(",")); + + sqlx::query!( + "UPDATE snapshot_policy SET last_run = $1 WHERE filesystem = $2", + Utc::now(), + policy.filesystem + ) + .execute(pg) + .await?; + } + + let cooldown = policy.interval.0.div_f64(2.0); + tracing::debug!( + "sleeping {:?} before next search for obsolete snapshots of {}", + cooldown, + policy.filesystem + ); + tokio::time::delay_for(cooldown).await; + + Ok(()) +} + +async fn destroy_snapshot( + client: Client, + filesystem: String, + snapshot: String, +) -> Result { + let query = snapshot::destroy::build(filesystem, snapshot, true); + let resp: iml_graphql_queries::Response = + graphql(client, query).await?; + let cmd = Result::from(resp)?.data.destroy_snapshot; + Ok(cmd) +} + +fn get_obsolete(policy: &SnapshotPolicy, snapshots: Vec<(DateTime, String)>) -> Vec { + let mut tail: Vec<_> = snapshots + .iter() + .skip(policy.keep as usize) + .map(|x| x.0) + .collect(); + + tracing::debug!( + "snapshots of {} to consider for deletion after the latest {}: {:?}", + policy.filesystem, + policy.keep, + tail + ); + + let mut to_delete: Vec> = Vec::with_capacity(tail.len()); + + // Handle daily snapshots: + if let Some(x) = tail.get(0) { + let next_day = x.date().succ().and_hms(0, 0, 0); + let cut = next_day - Duration::days(policy.daily as i64); + + let (daily, new_tail): (Vec<_>, Vec<_>) = tail.into_iter().partition(|x| *x > cut); + tracing::debug!("daily snapshots to consider: {:?}", daily); + + let datetimes: Vec = daily.iter().map(|x| x.naive_utc()).collect(); + let res = partition_datetime( + &|_| Duration::days(1).num_seconds(), + next_day.naive_utc(), + &datetimes, + ); + tracing::debug!("daily partition: {:?}", res); + for x in res.iter() { + for y in x.iter().skip(1) { + to_delete.push(DateTime::from_utc(*y, Utc)); + } + } + tail = new_tail; + } + tracing::debug!( + "snapshots of {} to consider for deletion after the daily schedule: {:?}", + policy.filesystem, + tail + ); + + // Handle weekly snapshots: + if let Some(x) = tail.get(0) { + let date = x.date(); + let days_to_next_week = Duration::days((7 - date.weekday().num_days_from_monday()).into()); + let next_week = (date + days_to_next_week).and_hms(0, 0, 0); + let cut = next_week - Duration::weeks(policy.weekly as 
i64); + + let (weekly, new_tail): (Vec<_>, Vec<_>) = tail.into_iter().partition(|x| *x > cut); + tracing::debug!("weekly snapshots to consider: {:?}", weekly); + + let datetimes: Vec = weekly.iter().map(|x| x.naive_utc()).collect(); + let res = partition_datetime( + &|_| Duration::weeks(1).num_seconds(), + next_week.naive_utc(), + &datetimes, + ); + tracing::debug!("weekly partition: {:?}", res); + for x in res.iter() { + for y in x.iter().skip(1) { + to_delete.push(DateTime::from_utc(*y, Utc)); + } + } + tail = new_tail; + } + tracing::debug!( + "snapshots of {} to consider for deletion after the weekly schedule: {:?}", + policy.filesystem, + tail + ); + + // Handle monthly snapshots: + if let Some(x) = tail.get(0) { + let next_month = add_month(&x.naive_utc(), 1); + let cut = DateTime::::from_utc(add_month(&next_month, -policy.monthly), Utc); + let f = |n: u32| { + let n_month = add_month(&next_month, 0 - n as i32); + let n1_month = add_month(&n_month, 1); + (n1_month - n_month).num_seconds() + }; + + let (monthly, new_tail): (Vec<_>, Vec<_>) = tail.into_iter().partition(|x| *x > cut); + tracing::debug!("monthly snapshots to consider: {:?}, {:?}", cut, monthly); + + let datetimes: Vec = monthly.iter().map(|x| x.naive_utc()).collect(); + let res = partition_datetime(&f, next_month, &datetimes); + tracing::debug!("monthly partition: {:?}", res); + for x in res.iter() { + for y in x.iter().skip(1) { + to_delete.push(DateTime::from_utc(*y, Utc)); + } + } + tail = new_tail; + } + tracing::debug!( + "snapshots of {} to consider for deletion after the monthly schedule: {:?}", + policy.filesystem, + tail + ); + + to_delete.append(&mut tail); + + to_delete.sort_unstable(); + snapshots + .into_iter() + .filter(|x| to_delete.binary_search(&x.0).is_ok()) + .map(|x| x.1) + .collect() +} + +fn add_month(date: &NaiveDateTime, n: i32) -> NaiveDateTime { + let month = date.date().month() as i32; + + let x = month + n; + let new_year = date.date().year() + + if x > 12 { + x / 12 + } else if x < 0 { + x / 12 - 1 + } else { + 0 + }; + + let x = month + n % 12; + let new_month = if x > 12 { + x - 12 + } else if x <= 0 { + 12 + x + } else { + x + } as u32; + + let new_date = NaiveDate::from_ymd(new_year, new_month, 1); + + new_date.and_hms(0, 0, 0) +} + +fn partition(f: &dyn Fn(u32) -> i64, v0: i64, v: I) -> LinkedList> +where + I: IntoIterator, +{ + let mut term: LinkedList = LinkedList::new(); + let mut res: LinkedList> = LinkedList::new(); + let mut n: u32 = 1; + let mut a: i64 = v0; + + for i in v { + while i >= a + f(n) { + res.push_back(term); + term = LinkedList::new(); + a += f(n); + n += 1; + } + term.push_back(i); + } + res.push_back(term); + + res +} + +fn partition_datetime( + f: &dyn Fn(u32) -> i64, + start: NaiveDateTime, + datetimes: &[NaiveDateTime], +) -> LinkedList> { + let mut v: Vec = datetimes + .into_iter() + .map(|&d| (start - d).num_seconds()) + .collect(); + v.sort_unstable(); + v.dedup(); + + let part = partition(f, 0, v); + + part.into_iter() + .map(|l| { + l.into_iter() + .map(|d| start - Duration::seconds(d)) + .collect() + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Timelike; + use iml_wire_types::graphql_duration::GraphQLDuration; + + #[test] + fn test_add_month() { + for (d0, n, d) in vec![ + ("2020-11-24T12:34:56Z", 0, "2020-11-01T00:00:00Z"), + ("2020-10-24T14:35:02Z", 1, "2020-11-01T00:00:00Z"), + ("2020-11-24T14:35:02Z", 1, "2020-12-01T00:00:00Z"), + ("2020-01-11T14:34:10Z", 10, "2020-11-01T00:00:00Z"), + ("2020-12-01T06:34:12Z", 23, 
"2022-11-01T00:00:00Z"), + ("2020-03-11T06:34:12Z", 36, "2023-03-01T00:00:00Z"), + ("2020-10-24T14:35:02Z", -1, "2020-09-01T00:00:00Z"), + ("2020-12-01T00:00:00Z", -3, "2020-09-01T00:00:00Z"), + ("2020-10-24T14:35:02Z", -12, "2019-10-01T00:00:00Z"), + ("2020-10-14T14:35:02Z", -16, "2019-06-01T00:00:00Z"), + ("2020-09-04T14:35:02Z", -36, "2017-09-01T00:00:00Z"), + ] { + let start = DateTime::parse_from_rfc3339(d0) + .unwrap() + .with_timezone(&Utc) + .naive_utc(); + let end = DateTime::parse_from_rfc3339(d) + .unwrap() + .with_timezone(&Utc) + .naive_utc(); + + assert_eq!(add_month(&start, n), end); + } + } + + #[test] + fn test_get_obsolete() { + let policy = SnapshotPolicy { + id: 0, + filesystem: "fs".into(), + interval: GraphQLDuration(std::time::Duration::from_secs(60)), + barrier: true, + keep: 2, + daily: 4, + weekly: 3, + monthly: 3, + last_run: None, + }; + + let snapshots: Vec<(DateTime, String)> = vec![ + "2020-11-24T14:36:01Z", + "2020-11-24T14:35:02Z", + "2020-11-24T14:34:11Z", + "2020-11-24T14:33:00Z", + "2020-11-24T04:30:00Z", + "2020-11-23T04:36:12Z", + "2020-11-23T04:34:00Z", + "2020-11-23T01:30:00Z", + "2020-11-22T21:38:13Z", + "2020-11-22T16:32:00Z", + "2020-11-22T03:33:00Z", + "2020-11-21T23:22:14Z", + "2020-11-21T11:59:00Z", + "2020-11-17T00:59:21Z", + "2020-11-14T23:22:22Z", + "2020-11-14T11:59:00Z", + "2020-11-13T09:44:00Z", + "2020-11-13T08:37:00Z", + "2020-11-12T05:11:00Z", + "2020-11-06T23:11:23Z", + "2020-11-05T13:55:00Z", + "2020-11-01T13:11:31Z", + "2020-10-31T10:55:32Z", + "2020-10-31T00:55:00Z", + "2020-10-23T00:55:00Z", + "2020-10-01T00:01:00Z", + "2020-09-21T00:00:33Z", + ] + .into_iter() + .map(|t| { + ( + DateTime::parse_from_rfc3339(t).unwrap().with_timezone(&Utc), + t.into(), + ) + }) + .collect(); + + let expected_number_of_obsolete = snapshots + .iter() + .filter(|x| x.0.time().second() == 0) + .count(); + + let obsolete = get_obsolete(&policy, snapshots); + let expected_obsolete: Vec = vec![ + "2020-11-24T14:33:00Z", + "2020-11-24T04:30:00Z", + "2020-11-23T04:34:00Z", + "2020-11-23T01:30:00Z", + "2020-11-22T16:32:00Z", + "2020-11-22T03:33:00Z", + "2020-11-21T11:59:00Z", + "2020-11-14T11:59:00Z", + "2020-11-13T09:44:00Z", + "2020-11-13T08:37:00Z", + "2020-11-12T05:11:00Z", + "2020-11-05T13:55:00Z", + "2020-10-31T00:55:00Z", + "2020-10-23T00:55:00Z", + "2020-10-01T00:01:00Z", + ] + .into_iter() + .map(String::from) + .collect(); + + assert_eq!(obsolete, expected_obsolete); + assert_eq!(obsolete.len(), expected_number_of_obsolete); + } +} diff --git a/iml-warp-drive/src/cache.rs b/iml-warp-drive/src/cache.rs index 43cb16c8b2..a1beae3a3f 100644 --- a/iml-warp-drive/src/cache.rs +++ b/iml-warp-drive/src/cache.rs @@ -18,7 +18,7 @@ use iml_wire_types::{ EnclosureType, HealthState, JobState, JobType, MemberState, SfaController, SfaDiskDrive, SfaEnclosure, SfaJob, SfaPowerSupply, SfaStorageSystem, SubTargetType, }, - snapshot::{ReserveUnit, SnapshotInterval, SnapshotRecord, SnapshotRetention}, + snapshot::{ReserveUnit, SnapshotInterval, SnapshotPolicy, SnapshotRecord, SnapshotRetention}, warp_drive::{Cache, Record, RecordChange, RecordId}, Alert, ApiList, EndpointName, Filesystem, FlatQuery, Host, Target, TargetConfParam, }; @@ -198,6 +198,12 @@ pub async fn db_record_to_change_record( Ok(RecordChange::Update(Record::SnapshotRetention(x))) } }, + DbRecord::SnapshotPolicy(x) => match (msg_type, x) { + (MessageType::Delete, x) => Ok(RecordChange::Delete(RecordId::SnapshotPolicy(x.id))), + (MessageType::Insert, x) | (MessageType::Update, x) => { + 
Ok(RecordChange::Update(Record::SnapshotPolicy(x))) + } + }, DbRecord::LnetConfiguration(x) => match (msg_type, x) { (MessageType::Delete, x) => { Ok(RecordChange::Delete(RecordId::LnetConfiguration(x.id()))) @@ -551,6 +557,27 @@ pub async fn populate_from_db( .try_collect() .await?; + cache.snapshot_policy = sqlx::query!(r#"SELECT * FROM snapshot_policy"#) + .fetch(pool) + .map_ok(|x| { + ( + x.id, + SnapshotPolicy { + id: x.id, + filesystem: x.filesystem, + interval: x.interval.into(), + barrier: x.barrier, + keep: x.keep, + daily: x.daily, + weekly: x.weekly, + monthly: x.monthly, + last_run: x.last_run, + }, + ) + }) + .try_collect() + .await?; + cache.stratagem_config = sqlx::query_as!( StratagemConfiguration, "select * from chroma_core_stratagemconfiguration where not_deleted = 't'" diff --git a/iml-warp-drive/src/db_record.rs b/iml-warp-drive/src/db_record.rs index 42e44136d7..1f9ac3525b 100644 --- a/iml-warp-drive/src/db_record.rs +++ b/iml-warp-drive/src/db_record.rs @@ -22,8 +22,9 @@ use iml_wire_types::{ SFA_JOB_TABLE_NAME, SFA_POWER_SUPPLY_TABLE_NAME, SFA_STORAGE_SYSTEM_TABLE_NAME, }, snapshot::{ - SnapshotInterval, SnapshotRecord, SnapshotRetention, SNAPSHOT_INTERVAL_TABLE_NAME, - SNAPSHOT_RETENTION_TABLE_NAME, SNAPSHOT_TABLE_NAME, + SnapshotInterval, SnapshotPolicy, SnapshotRecord, SnapshotRetention, + SNAPSHOT_INTERVAL_TABLE_NAME, SNAPSHOT_POLICY_TABLE_NAME, SNAPSHOT_RETENTION_TABLE_NAME, + SNAPSHOT_TABLE_NAME, }, }; use serde::de::Error; @@ -56,6 +57,7 @@ pub enum DbRecord { Snapshot(SnapshotRecord), SnapshotInterval(SnapshotInterval), SnapshotRetention(SnapshotRetention), + SnapshotPolicy(SnapshotPolicy), StratagemConfiguration(StratagemConfiguration), TargetRecord(TargetRecord), Volume(VolumeRecord), @@ -103,6 +105,7 @@ impl TryFrom<(TableName<'_>, serde_json::Value)> for DbRecord { serde_json::from_value(x).map(DbRecord::SnapshotRetention) } TARGET_TABLE_NAME => serde_json::from_value(x).map(DbRecord::TargetRecord), + SNAPSHOT_POLICY_TABLE_NAME => serde_json::from_value(x).map(DbRecord::SnapshotPolicy), LNET_CONFIGURATION_TABLE_NAME => { serde_json::from_value(x).map(DbRecord::LnetConfiguration) } diff --git a/iml-wire-types/src/graphql_duration.rs b/iml-wire-types/src/graphql_duration.rs index a31bb8016e..822731ffe7 100644 --- a/iml-wire-types/src/graphql_duration.rs +++ b/iml-wire-types/src/graphql_duration.rs @@ -8,7 +8,7 @@ use sqlx::postgres::types::PgInterval; use std::convert::TryInto; use std::{convert::TryFrom, fmt, time::Duration}; -#[derive(serde::Deserialize, serde::Serialize, Clone, PartialEq, Debug)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Eq, Hash, PartialEq, Debug)] #[serde(try_from = "String", into = "String")] pub struct GraphQLDuration(pub Duration); diff --git a/iml-wire-types/src/snapshot.rs b/iml-wire-types/src/snapshot.rs index 3bf2344f3c..62218b210b 100644 --- a/iml-wire-types/src/snapshot.rs +++ b/iml-wire-types/src/snapshot.rs @@ -60,7 +60,7 @@ pub const SNAPSHOT_TABLE_NAME: TableName = TableName("snapshot"); #[cfg_attr(feature = "graphql", derive(juniper::GraphQLObject))] #[derive(serde::Deserialize, serde::Serialize, Clone, PartialEq, Debug)] -/// A Snapshot interval +/// A Snapshot interval. 
TODO: Delete after SnapshotPolicy is settled pub struct SnapshotInterval { /// The configuration id pub id: i32, @@ -70,7 +70,7 @@ pub struct SnapshotInterval { pub use_barrier: bool, /// The interval configuration pub interval: GraphQLDuration, - // Last known run + /// Last known run pub last_run: Option>, } @@ -84,6 +84,7 @@ pub const SNAPSHOT_INTERVAL_TABLE_NAME: TableName = TableName("snapshot_interval #[cfg_attr(feature = "graphql", derive(juniper::GraphQLObject))] #[derive(serde::Deserialize, serde::Serialize, Clone, PartialEq, Debug)] +/// A Snapshot retention policy. TODO: Delete after SnapshotPolicy is settled pub struct SnapshotRetention { pub id: i32, pub filesystem_name: String, @@ -131,6 +132,62 @@ impl FromStr for ReserveUnit { } } +#[cfg_attr(feature = "graphql", derive(juniper::GraphQLObject))] +#[derive(serde::Deserialize, serde::Serialize, Eq, Clone, Debug)] +/// Automatic snapshot policy +pub struct SnapshotPolicy { + /// The configuration id + pub id: i32, + /// The filesystem name + pub filesystem: String, + /// The interval configuration + pub interval: GraphQLDuration, + /// Use a write barrier + pub barrier: bool, + /// Number of recent snapshots to keep + pub keep: i32, + /// Then, number of days to keep the most recent snapshot of each day + pub daily: i32, + /// Then, number of weeks to keep the most recent snapshot of each week + pub weekly: i32, + /// Then, number of months to keep the most recent snapshot of each months + pub monthly: i32, + /// Last known run + pub last_run: Option>, +} + +impl PartialEq for SnapshotPolicy { + fn eq(&self, other: &Self) -> bool { + self.filesystem == other.filesystem + && self.interval == other.interval + && self.barrier == other.barrier + && self.keep == other.keep + && self.daily == other.daily + && self.weekly == other.weekly + && self.monthly == other.monthly + } +} + +impl std::hash::Hash for SnapshotPolicy { + fn hash(&self, state: &mut H) { + self.filesystem.hash(state); + self.interval.hash(state); + self.barrier.hash(state); + self.keep.hash(state); + self.daily.hash(state); + self.weekly.hash(state); + self.monthly.hash(state); + } +} + +impl Id for SnapshotPolicy { + fn id(&self) -> i32 { + self.id + } +} + +pub const SNAPSHOT_POLICY_TABLE_NAME: TableName = TableName("snapshot_policy"); + #[derive(serde::Deserialize, Debug)] #[cfg_attr(feature = "cli", derive(StructOpt))] /// Ask agent to create a snapshot diff --git a/iml-wire-types/src/warp_drive.rs b/iml-wire-types/src/warp_drive.rs index 1b74c1e19e..402d905899 100644 --- a/iml-wire-types/src/warp_drive.rs +++ b/iml-wire-types/src/warp_drive.rs @@ -10,7 +10,7 @@ use crate::{ TargetRecord, VolumeNodeRecord, VolumeRecord, }, sfa::{SfaController, SfaDiskDrive, SfaEnclosure, SfaJob, SfaPowerSupply, SfaStorageSystem}, - snapshot::{SnapshotInterval, SnapshotRecord, SnapshotRetention}, + snapshot::{SnapshotInterval, SnapshotPolicy, SnapshotRecord, SnapshotRetention}, Alert, CompositeId, EndpointNameSelf, Filesystem, Host, Label, LockChange, Target, TargetConfParam, ToCompositeId, }; @@ -111,6 +111,7 @@ pub struct Cache { pub snapshot: HashMap, pub snapshot_interval: HashMap, pub snapshot_retention: HashMap, + pub snapshot_policy: HashMap, pub stratagem_config: HashMap, pub target: HashMap>, pub target_record: HashMap, @@ -143,6 +144,7 @@ pub struct ArcCache { pub snapshot: HashMap>, pub snapshot_interval: HashMap>, pub snapshot_retention: HashMap>, + pub snapshot_policy: HashMap>, pub stratagem_config: HashMap>, pub target: HashMap>>, pub target_record: HashMap>, @@ 
@@ -181,6 +183,7 @@ impl Cache {
             RecordId::Snapshot(id) => self.snapshot.remove(&id).is_some(),
             RecordId::SnapshotInterval(id) => self.snapshot_interval.remove(&id).is_some(),
             RecordId::SnapshotRetention(id) => self.snapshot_retention.remove(&id).is_some(),
+            RecordId::SnapshotPolicy(id) => self.snapshot_policy.remove(&id).is_some(),
             RecordId::Target(id) => self.target.remove(&id).is_some(),
             RecordId::TargetRecord(id) => self.target_record.remove(&id).is_some(),
             RecordId::User(id) => self.user.remove(&id).is_some(),
@@ -252,6 +255,9 @@ impl Cache {
             Record::SnapshotRetention(x) => {
                 self.snapshot_retention.insert(x.id(), x);
             }
+            Record::SnapshotPolicy(x) => {
+                self.snapshot_policy.insert(x.id(), x);
+            }
             Record::StratagemConfig(x) => {
                 self.stratagem_config.insert(x.id(), x);
             }
@@ -315,6 +321,7 @@ impl ArcCache {
             RecordId::Snapshot(id) => self.snapshot.remove(&id).is_some(),
             RecordId::SnapshotInterval(id) => self.snapshot_interval.remove(&id).is_some(),
             RecordId::SnapshotRetention(id) => self.snapshot_retention.remove(&id).is_some(),
+            RecordId::SnapshotPolicy(id) => self.snapshot_policy.remove(&id).is_some(),
             RecordId::StratagemConfig(id) => self.stratagem_config.remove(&id).is_some(),
             RecordId::Target(id) => self.target.remove(&id).is_some(),
             RecordId::TargetRecord(id) => self.target_record.remove(&id).is_some(),
@@ -387,6 +394,9 @@ impl ArcCache {
             Record::SnapshotRetention(x) => {
                 self.snapshot_retention.insert(x.id(), Arc::new(x));
             }
+            Record::SnapshotPolicy(x) => {
+                self.snapshot_policy.insert(x.id(), Arc::new(x));
+            }
             Record::StratagemConfig(x) => {
                 self.stratagem_config.insert(x.id(), Arc::new(x));
             }
@@ -441,6 +451,7 @@ impl ArcCache {
     }
 }
 
+// TODO: Use a macro
 impl From<&Cache> for ArcCache {
     fn from(cache: &Cache) -> Self {
         Self {
@@ -464,6 +475,7 @@ impl From<&Cache> for ArcCache {
             snapshot: hashmap_to_arc_hashmap(&cache.snapshot),
             snapshot_interval: hashmap_to_arc_hashmap(&cache.snapshot_interval),
             snapshot_retention: hashmap_to_arc_hashmap(&cache.snapshot_retention),
+            snapshot_policy: hashmap_to_arc_hashmap(&cache.snapshot_policy),
             stratagem_config: hashmap_to_arc_hashmap(&cache.stratagem_config),
             target: hashmap_to_arc_hashmap(&cache.target),
             target_record: hashmap_to_arc_hashmap(&cache.target_record),
@@ -475,6 +487,7 @@ impl From<&Cache> for ArcCache {
     }
 }
 
+// TODO: Use a macro
 impl From<&ArcCache> for Cache {
     fn from(cache: &ArcCache) -> Self {
         Self {
@@ -498,6 +511,7 @@ impl From<&ArcCache> for Cache {
             snapshot: arc_hashmap_to_hashmap(&cache.snapshot),
             snapshot_interval: arc_hashmap_to_hashmap(&cache.snapshot_interval),
             snapshot_retention: arc_hashmap_to_hashmap(&cache.snapshot_retention),
+            snapshot_policy: arc_hashmap_to_hashmap(&cache.snapshot_policy),
             stratagem_config: arc_hashmap_to_hashmap(&cache.stratagem_config),
             target: arc_hashmap_to_hashmap(&cache.target),
             target_record: arc_hashmap_to_hashmap(&cache.target_record),
@@ -533,6 +547,7 @@ pub enum Record {
     Snapshot(SnapshotRecord),
     SnapshotInterval(SnapshotInterval),
     SnapshotRetention(SnapshotRetention),
+    SnapshotPolicy(SnapshotPolicy),
     StratagemConfig(StratagemConfiguration),
     Target(Target<TargetConfParam>),
     TargetRecord(TargetRecord),
@@ -564,6 +579,7 @@ pub enum ArcRecord {
     Snapshot(Arc<SnapshotRecord>),
     SnapshotInterval(Arc<SnapshotInterval>),
     SnapshotRetention(Arc<SnapshotRetention>),
+    SnapshotPolicy(Arc<SnapshotPolicy>),
     StratagemConfig(Arc<StratagemConfiguration>),
     Target(Arc<Target<TargetConfParam>>),
     TargetRecord(Arc<TargetRecord>),
@@ -597,6 +613,7 @@ impl From<Record> for ArcRecord {
             Record::Snapshot(x) => Self::Snapshot(Arc::new(x)),
             Record::SnapshotInterval(x) => Self::SnapshotInterval(Arc::new(x)),
             Record::SnapshotRetention(x) => Self::SnapshotRetention(Arc::new(x)),
+            Record::SnapshotPolicy(x) => Self::SnapshotPolicy(Arc::new(x)),
             Record::Target(x) => Self::Target(Arc::new(x)),
             Record::TargetRecord(x) => Self::TargetRecord(Arc::new(x)),
             Record::User(x) => Self::User(Arc::new(x)),
@@ -631,6 +648,7 @@ pub enum RecordId {
     Snapshot(i32),
     SnapshotInterval(i32),
     SnapshotRetention(i32),
+    SnapshotPolicy(i32),
     Target(i32),
     TargetRecord(i32),
     User(i32),
@@ -664,6 +682,7 @@ impl Deref for RecordId {
             | Self::StratagemConfig(x)
             | Self::SnapshotInterval(x)
             | Self::SnapshotRetention(x)
+            | Self::SnapshotPolicy(x)
             | Self::Target(x)
             | Self::TargetRecord(x)
             | Self::User(x)
diff --git a/migrations/20201112000000_snapshot_v2.sql b/migrations/20201112000000_snapshot_v2.sql
new file mode 100644
index 0000000000..669c6782dd
--- /dev/null
+++ b/migrations/20201112000000_snapshot_v2.sql
@@ -0,0 +1,63 @@
+CREATE TABLE IF NOT EXISTS snapshot_policy (
+    id serial PRIMARY KEY,
+    filesystem TEXT NOT NULL UNIQUE,
+    interval INTERVAL NOT NULL,
+    barrier BOOLEAN DEFAULT false NOT NULL,
+    keep INT NOT NULL CHECK (keep > 0),
+    daily INT DEFAULT 0 NOT NULL CHECK (daily >= 0),
+    weekly INT DEFAULT 0 NOT NULL CHECK (weekly >= 0),
+    monthly INT DEFAULT 0 NOT NULL CHECK (monthly >= 0),
+    last_run TIMESTAMP WITH TIME ZONE
+);
+
+
+CREATE OR REPLACE FUNCTION snapshot_policy_func() RETURNS TRIGGER AS $$
+DECLARE
+    r snapshot_policy;
+BEGIN
+    IF (TG_OP = 'INSERT') OR (TG_OP = 'UPDATE' AND OLD IS DISTINCT FROM NEW)
+    THEN
+        r := NEW;
+    ELSEIF TG_OP = 'DELETE'
+    THEN
+        r := OLD;
+    ELSE
+        r := NULL;
+    END IF;
+
+    -- Check a NOT NULL column rather than the whole composite: "r IS NOT NULL"
+    -- is false whenever any field (e.g. last_run) is NULL, which would drop
+    -- notifications for policies that have never run.
+    IF r.id IS NOT NULL
+    THEN
+        PERFORM pg_notify(
+            'table_update',
+            notify_row(TG_OP, TG_TABLE_NAME,
+                json_build_object(
+                    'id', r.id,
+                    'filesystem', r.filesystem,
+                    'interval', interval_to_seconds(r.interval),
+                    'barrier', r.barrier,
+                    'keep', r.keep,
+                    'daily', r.daily,
+                    'weekly', r.weekly,
+                    'monthly', r.monthly,
+                    'last_run', r.last_run
+                )
+            )
+        );
+    END IF;
+
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+
+DROP TRIGGER IF EXISTS snapshot_policy_notify_update ON snapshot_policy;
+CREATE TRIGGER snapshot_policy_notify_update AFTER UPDATE ON snapshot_policy
+FOR EACH ROW EXECUTE PROCEDURE snapshot_policy_func();
+
+DROP TRIGGER IF EXISTS snapshot_policy_notify_insert ON snapshot_policy;
+CREATE TRIGGER snapshot_policy_notify_insert AFTER INSERT ON snapshot_policy
+FOR EACH ROW EXECUTE PROCEDURE snapshot_policy_func();
+
+DROP TRIGGER IF EXISTS snapshot_policy_notify_delete ON snapshot_policy;
+CREATE TRIGGER snapshot_policy_notify_delete AFTER DELETE ON snapshot_policy
+FOR EACH ROW EXECUTE PROCEDURE snapshot_policy_func();
diff --git a/sqlx-data.json b/sqlx-data.json
index 8b8e1cb4e7..fd4a1f8376 100644
--- a/sqlx-data.json
+++ b/sqlx-data.json
@@ -2170,6 +2170,24 @@
       ]
     }
   },
+  "71b25feb0a6f2f39d6cb50e81d252016d921aaed81798aa23e3805bffc6c0e77": {
+    "query": "\n INSERT INTO snapshot_policy (\n filesystem,\n interval,\n barrier,\n keep,\n daily,\n weekly,\n monthly\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ON CONFLICT (filesystem)\n DO UPDATE SET\n interval = EXCLUDED.interval,\n barrier = EXCLUDED.barrier,\n keep = EXCLUDED.keep,\n daily = EXCLUDED.daily,\n weekly = EXCLUDED.weekly,\n monthly = EXCLUDED.monthly\n ",
+    "describe": {
+      "columns": [],
+      "parameters": {
+        "Left": [
+          "Text",
+          "Interval",
+          "Bool",
+          "Int4",
+          "Int4",
+          "Int4",
+          "Int4"
+        ]
+      },
+      "nullable": []
+    }
+  },
   "7ba0b27a4f4fca50c30701d9adbbf0c0421c310b6b99453bffb4bc7834fa765c": {
     "query": "\n INSERT INTO corosync_resource_managed_host (host_id, cluster_id, corosync_resource_id)\n SELECT $1, $2, corosync_resource_id FROM UNNEST($3::text[]) as corosync_resource_id\n ON CONFLICT (host_id, corosync_resource_id, cluster_id)\n DO NOTHING\n ",
     "describe": {
@@ -2468,6 +2486,72 @@
       ]
     }
   },
+  "82aa67ceef3e830c8a4f4739d8e02c2fce46906173bbc19dedcc324d0183628d": {
+    "query": "SELECT * FROM snapshot_policy",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "id",
+          "type_info": "Int4"
+        },
+        {
+          "ordinal": 1,
+          "name": "filesystem",
+          "type_info": "Text"
+        },
+        {
+          "ordinal": 2,
+          "name": "interval",
+          "type_info": "Interval"
+        },
+        {
+          "ordinal": 3,
+          "name": "barrier",
+          "type_info": "Bool"
+        },
+        {
+          "ordinal": 4,
+          "name": "keep",
+          "type_info": "Int4"
+        },
+        {
+          "ordinal": 5,
+          "name": "daily",
+          "type_info": "Int4"
+        },
+        {
+          "ordinal": 6,
+          "name": "weekly",
+          "type_info": "Int4"
+        },
+        {
+          "ordinal": 7,
+          "name": "monthly",
+          "type_info": "Int4"
+        },
+        {
+          "ordinal": 8,
+          "name": "last_run",
+          "type_info": "Timestamptz"
+        }
+      ],
+      "parameters": {
+        "Left": []
+      },
+      "nullable": [
+        false,
+        false,
+        false,
+        false,
+        false,
+        false,
+        false,
+        false,
+        true
+      ]
+    }
+  },
   "857ec5f2517d25f901399ddfb8efa1ab3f73ed8c8f899692c071172b61179d1a": {
     "query": "select * from chroma_core_lnetconfiguration where not_deleted = 't'",
     "describe": {
@@ -2752,6 +2836,32 @@
       ]
     }
   },
+  "9225bc4f7185030de710bb86e0ef3a1bbbf9e8f96e08389a1d6743beffd3ee1c": {
+    "query": "\n SELECT snapshot_name, create_time FROM snapshot\n WHERE filesystem_name = $1\n AND snapshot_name LIKE '\\_\\_%'\n ORDER BY create_time DESC\n LIMIT 1\n ",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "snapshot_name",
+          "type_info": "Text"
+        },
+        {
+          "ordinal": 1,
+          "name": "create_time",
+          "type_info": "Timestamptz"
+        }
+      ],
+      "parameters": {
+        "Left": [
+          "Text"
+        ]
+      },
+      "nullable": [
+        false,
+        false
+      ]
+    }
+  },
   "922e7457db1e165807273def66a5ecc6b22be1a849f5e4b06d5149ec00b5e8aa": {
     "query": "\n DELETE FROM chroma_core_logmessage\n WHERE id in ( \n SELECT id FROM chroma_core_logmessage ORDER BY id LIMIT $1\n )\n ",
     "describe": {
@@ -2984,6 +3094,19 @@
       ]
     }
   },
+  "a0e41b98f3319a78ac985a3e7c0d5204cdceb8fd0ceb04b69d58e50abecf16b7": {
+    "query": "UPDATE snapshot_policy SET last_run = $1 WHERE filesystem = $2",
+    "describe": {
+      "columns": [],
+      "parameters": {
+        "Left": [
+          "Timestamptz",
+          "Text"
+        ]
+      },
+      "nullable": []
+    }
+  },
   "a3269a5f7c491a332facfcf86c576350f4b9e7c14638a26d0bd17daef52c0613": {
     "query": "SELECT\n id,\n index,\n enclosure_index,\n failed,\n slot_number,\n health_state as \"health_state: HealthState\",\n health_state_reason,\n member_index,\n member_state as \"member_state: MemberState\",\n storage_system\n FROM chroma_core_sfadiskdrive\n ",
     "describe": {
@@ -3056,6 +3179,32 @@
       ]
     }
   },
+  "a4916e07e7f17d6bddb11973898bdbd1d38c0f70d80f75897e2048209cf0df6f": {
+    "query": "\n SELECT snapshot_name, create_time FROM snapshot\n WHERE filesystem_name = $1\n AND snapshot_name LIKE '\\_\\_%'\n ORDER BY create_time DESC\n ",
+    "describe": {
+      "columns": [
+        {
+          "ordinal": 0,
+          "name": "snapshot_name",
+          "type_info": "Text"
+        },
+        {
+          "ordinal": 1,
+          "name": "create_time",
+          "type_info": "Timestamptz"
+        }
+      ],
+      "parameters": {
+        "Left": [
+          "Text"
+        ]
+      },
+      "nullable": [
+        false,
+        false
+      ]
+    }
+  },
   "a5e14b628a8f67d458167f1ea5d0390aacd72b92a725bb1092e6d9c104414a7b": {
     "query": "\n WITH updated AS (\n INSERT INTO nid\n (net_type, host_id, nid, status, interfaces)\n SELECT net_type, host_id, nid, status, string_to_array(interfaces, ',')::text[]\n FROM UNNEST($1::text[], $2::int[], $3::text[], $4::text[], $5::text[])\n AS t(net_type, host_id, nid, status, interfaces)\n ON CONFLICT (host_id, nid)\n DO\n UPDATE SET net_type = EXCLUDED.net_type,\n status = EXCLUDED.status,\n interfaces = EXCLUDED.interfaces\n RETURNING id\n )\n\n INSERT INTO lnet\n (host_id, state, nids)\n (SELECT $6, $7, array_agg(id) from updated)\n ON CONFLICT (host_id)\n DO\n UPDATE SET nids = EXCLUDED.nids,\n state = EXCLUDED.state;\n ",
     "describe": {
@@ -4001,6 +4150,19 @@
       ]
     }
   },
+  "dc27c8e53476f37d4d8b5d539be33f3b62071e35d95ebea78e15a5ce59d35550": {
+    "query": "\n DELETE FROM snapshot_policy\n WHERE (filesystem IS NOT DISTINCT FROM $1)\n OR (id IS NOT DISTINCT FROM $2)\n\n ",
+    "describe": {
+      "columns": [],
+      "parameters": {
+        "Left": [
+          "Text",
+          "Int4"
+        ]
+      },
+      "nullable": []
+    }
+  },
   "dc969e1cf438fc20baccd36a43cf7725bcf95cf758c6f08d5ce88bd4f378a71c": {
     "query": "\n INSERT INTO snapshot_retention (\n filesystem_name,\n reserve_value,\n reserve_unit,\n keep_num\n )\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (filesystem_name)\n DO UPDATE SET\n reserve_value = EXCLUDED.reserve_value,\n reserve_unit = EXCLUDED.reserve_unit,\n keep_num = EXCLUDED.keep_num\n ",
     "describe": {