This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Implement new automatic snapshot feature
Signed-off-by: Igor Pashev <[email protected]>
ip1981 authored and jgrund committed Dec 10, 2020
1 parent 09405a3 commit 17df2d3
Showing 26 changed files with 1,331 additions and 2,104 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


300 changes: 107 additions & 193 deletions iml-api/src/graphql/mod.rs
@@ -6,11 +6,7 @@ mod filesystem;
mod stratagem;
mod task;

use crate::{
command::get_command,
error::ImlApiError,
timer::{configure_snapshot_timer, remove_snapshot_timer},
};
use crate::{command::get_command, error::ImlApiError};
use chrono::{DateTime, Utc};
use futures::{
future::{self, join_all},
@@ -25,7 +25,7 @@ use iml_wire_types::{
graphql::{ServerProfile, ServerProfileInput},
graphql_duration::GraphQLDuration,
logs::{LogResponse, Meta},
snapshot::{ReserveUnit, Snapshot, SnapshotInterval, SnapshotRetention},
snapshot::{Snapshot, SnapshotPolicy},
task::Task,
Command, EndpointName, FsType, Job, LogMessage, LogSeverity, MessageClass, SortDir,
};
@@ -441,47 +437,6 @@ impl QueryRoot {
Ok(commands)
}

/// List all snapshot intervals
async fn snapshot_intervals(context: &Context) -> juniper::FieldResult<Vec<SnapshotInterval>> {
let xs: Vec<SnapshotInterval> = sqlx::query!("SELECT * FROM snapshot_interval")
.fetch(&context.pg_pool)
.map_ok(|x| SnapshotInterval {
id: x.id,
filesystem_name: x.filesystem_name,
use_barrier: x.use_barrier,
interval: x.interval.into(),
last_run: x.last_run,
})
.try_collect()
.await?;

Ok(xs)
}
/// List all snapshot retention policies. Snapshots will automatically be deleted (starting with the oldest)
/// when free space falls below the defined reserve value and its associated unit.
async fn snapshot_retention_policies(
context: &Context,
) -> juniper::FieldResult<Vec<SnapshotRetention>> {
let xs: Vec<SnapshotRetention> = sqlx::query_as!(
SnapshotRetention,
r#"
SELECT
id,
filesystem_name,
reserve_value,
reserve_unit as "reserve_unit:ReserveUnit",
last_run,
keep_num
FROM snapshot_retention
"#
)
.fetch(&context.pg_pool)
.try_collect()
.await?;

Ok(xs)
}

#[graphql(arguments(
limit(description = "optional paging limit, defaults to 100",),
offset(description = "Offset into items, defaults to 0"),
@@ -641,27 +596,26 @@ impl QueryRoot {

Ok(mount_command)
}
}

struct SnapshotIntervalName {
id: i32,
fs_name: String,
timestamp: DateTime<Utc>,
}

fn parse_snapshot_name(name: &str) -> Option<SnapshotIntervalName> {
match name.trim().splitn(3, "-").collect::<Vec<&str>>().as_slice() {
[id, fs, ts] => {
let ts = ts.parse::<DateTime<Utc>>().ok()?;
let id = id.parse::<i32>().ok()?;

Some(SnapshotIntervalName {
id,
fs_name: fs.to_string(),
timestamp: ts,
})
}
_ => None,
}
}

/// List all automatic snapshot policies.
async fn snapshot_policies(context: &Context) -> juniper::FieldResult<Vec<SnapshotPolicy>> {
let xs = sqlx::query!(r#"SELECT * FROM snapshot_policy"#)
.fetch(&context.pg_pool)
.map_ok(|x| SnapshotPolicy {
id: x.id,
filesystem: x.filesystem,
interval: x.interval.into(),
barrier: x.barrier,
keep: x.keep,
daily: x.daily,
weekly: x.weekly,
monthly: x.monthly,
last_run: x.last_run,
})
.try_collect()
.await?;

Ok(xs)
}
}
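For orientation, the new resolver could be exercised with a GraphQL query roughly like the one below. This is an illustrative sketch, not part of the commit: the field names assume juniper's default snake_case-to-camelCase renaming, so they should be checked against the generated schema.

query {
  snapshotPolicies {
    # fields mirror the SnapshotPolicy struct; names assume camelCase renaming
    id
    filesystem
    interval
    barrier
    keep
    daily
    weekly
    monthly
    lastRun
  }
}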

@@ -699,22 +653,6 @@ impl MutationRoot {
let name = name.trim();
validate_snapshot_name(name)?;

let snapshot_interval_name = parse_snapshot_name(name);
if let Some(data) = snapshot_interval_name {
sqlx::query!(
r#"
UPDATE snapshot_interval
SET last_run=$1
WHERE id=$2 AND filesystem_name=$3
"#,
data.timestamp,
data.id,
data.fs_name,
)
.execute(&context.pg_pool)
.await?;
}

let active_mgs_host_fqdn = active_mgs_host_fqdn(&fsname, &context.pg_pool)
.await?
.ok_or_else(|| {
@@ -890,117 +828,6 @@ impl MutationRoot {
.await
.map_err(|e| e.into())
}
#[graphql(arguments(
fsname(description = "The filesystem to create snapshots with"),
interval(description = "How often a snapshot should be taken"),
use_barrier(
description = "Set write barrier before creating snapshot. The default value is `false`"
),
))]
/// Creates a new snapshot interval.
/// A recurring snapshot will be taken once the given `interval` expires for the given `fsname`.
/// In order for the snapshot to be successful, the filesystem must be available.
async fn create_snapshot_interval(
context: &Context,
fsname: String,
interval: GraphQLDuration,
use_barrier: Option<bool>,
) -> juniper::FieldResult<bool> {
let _ = fs_id_by_name(&context.pg_pool, &fsname).await?;
let maybe_id = sqlx::query!(
r#"
INSERT INTO snapshot_interval (
filesystem_name,
use_barrier,
interval
)
VALUES ($1, $2, $3)
ON CONFLICT (filesystem_name, interval)
DO NOTHING
RETURNING id
"#,
fsname,
use_barrier.unwrap_or_default(),
PgInterval::try_from(interval.0)?,
)
.fetch_optional(&context.pg_pool)
.await?
.map(|x| x.id);

if let Some(id) = maybe_id {
configure_snapshot_timer(id, fsname, interval.0, use_barrier.unwrap_or_default())
.await?;
}

Ok(true)
}
/// Removes an existing snapshot interval.
/// This will also cancel any outstanding intervals scheduled by this rule.
#[graphql(arguments(id(description = "The snapshot interval id"),))]
async fn remove_snapshot_interval(context: &Context, id: i32) -> juniper::FieldResult<bool> {
sqlx::query!("DELETE FROM snapshot_interval WHERE id=$1", id)
.execute(&context.pg_pool)
.await?;

remove_snapshot_timer(id).await?;

Ok(true)
}
#[graphql(arguments(
fsname(description = "Filesystem name"),
reserve_value(
description = "Delete the oldest snapshot when available space falls below this value"
),
reserve_unit(description = "The unit of measurement associated with the reserve_value"),
keep_num(
description = "The minimum number of snapshots to keep. This is to avoid deleting all snapshots while pursuiting the reserve goal"
)
))]
/// Creates a new snapshot retention policy for the given `fsname`.
/// Snapshots will automatically be deleted (starting with the oldest)
/// when free space falls below the defined reserve value and its associated unit.
async fn create_snapshot_retention(
context: &Context,
fsname: String,
reserve_value: i32,
reserve_unit: ReserveUnit,
keep_num: Option<i32>,
) -> juniper::FieldResult<bool> {
let _ = fs_id_by_name(&context.pg_pool, &fsname).await?;
sqlx::query!(
r#"
INSERT INTO snapshot_retention (
filesystem_name,
reserve_value,
reserve_unit,
keep_num
)
VALUES ($1, $2, $3, $4)
ON CONFLICT (filesystem_name)
DO UPDATE SET
reserve_value = EXCLUDED.reserve_value,
reserve_unit = EXCLUDED.reserve_unit,
keep_num = EXCLUDED.keep_num
"#,
fsname,
reserve_value,
reserve_unit as ReserveUnit,
keep_num.unwrap_or(0)
)
.execute(&context.pg_pool)
.await?;

Ok(true)
}
/// Remove an existing snapshot retention policy.
#[graphql(arguments(id(description = "The snapshot retention policy id")))]
async fn remove_snapshot_retention(context: &Context, id: i32) -> juniper::FieldResult<bool> {
sqlx::query!("DELETE FROM snapshot_retention WHERE id=$1", id)
.execute(&context.pg_pool)
.await?;

Ok(true)
}

/// Create a server profile.
#[graphql(arguments(profile(description = "The server profile to add")))]
@@ -1108,6 +935,67 @@ impl MutationRoot {
Ok(true)
}

#[graphql(arguments(
filesystem(description = "The filesystem to create snapshots with"),
interval(description = "How often a snapshot should be taken"),
barrier(
description = "Set a write barrier before creating the snapshot. The default value is `false`"
),
keep(description = "Number of the most recent snapshots to keep"),
daily(description = "Then, the number of days for which to keep the most recent snapshot of each day"),
weekly(
description = "Then, the number of weeks for which to keep the most recent snapshot of each week"
),
monthly(
description = "Then, the number of months for which to keep the most recent snapshot of each month"
),
))]
/// Creates a new automatic snapshot policy.
async fn create_snapshot_policy(
context: &Context,
filesystem: String,
interval: GraphQLDuration,
barrier: Option<bool>,
keep: i32,
daily: Option<i32>,
weekly: Option<i32>,
monthly: Option<i32>,
) -> juniper::FieldResult<bool> {
sqlx::query!(
r#"
INSERT INTO snapshot_policy (
filesystem,
interval,
barrier,
keep,
daily,
weekly,
monthly
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (filesystem)
DO UPDATE SET
interval = EXCLUDED.interval,
barrier = EXCLUDED.barrier,
keep = EXCLUDED.keep,
daily = EXCLUDED.daily,
weekly = EXCLUDED.weekly,
monthly = EXCLUDED.monthly
"#,
filesystem,
PgInterval::try_from(interval.0)?,
barrier.unwrap_or(false),
keep,
daily.unwrap_or(0),
weekly.unwrap_or(0),
monthly.unwrap_or(0)
)
.execute(&context.pg_pool)
.await?;

Ok(true)
}
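As a rough illustration of how the new mutation might be called (not taken from the commit: argument names assume juniper's default camelCase renaming, the filesystem name is made up, and the exact string form accepted by GraphQLDuration is not shown in this diff):

mutation {
  createSnapshotPolicy(
    filesystem: "fs0"   # hypothetical filesystem name
    interval: "1h"      # assumed GraphQLDuration string form
    barrier: false
    keep: 24            # keep the 24 most recent snapshots
    daily: 7            # then the newest snapshot of each of the last 7 days
    weekly: 4
    monthly: 12
  )
}

Because the INSERT uses ON CONFLICT (filesystem) DO UPDATE, re-running the mutation for the same filesystem updates the existing policy rather than adding a second one, so each filesystem carries at most one policy.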

#[graphql(arguments(profile_name(description = "Name of the profile to remove")))]
async fn remove_server_profile(
context: &Context,
@@ -1139,6 +1027,32 @@ impl MutationRoot {
transaction.commit().await?;
Ok(true)
}

#[graphql(arguments(
filesystem(description = "The filesystem to remove snapshot policies for"),
id(description = "Id of the policy to remove"),
))]
/// Removes an automatic snapshot policy, matched either by filesystem name or by id.
async fn remove_snapshot_policy(
context: &Context,
filesystem: Option<String>,
id: Option<i32>,
) -> juniper::FieldResult<bool> {
sqlx::query!(
r#"
DELETE FROM snapshot_policy
WHERE (filesystem IS NOT DISTINCT FROM $1)
OR (id IS NOT DISTINCT FROM $2)
"#,
filesystem,
id
)
.execute(&context.pg_pool)
.await?;

Ok(true)
}
}
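And a sketch of removing a policy by filesystem name (again assuming juniper's default camelCase renaming; "fs0" is invented):

mutation {
  removeSnapshotPolicy(filesystem: "fs0")
}

Since the DELETE compares with IS NOT DISTINCT FROM, an omitted argument arrives as NULL and, assuming the filesystem and id columns are NOT NULL, matches no rows; passing only one of the two selectors therefore deletes the policies matching it, and passing neither deletes nothing.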

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
1 change: 0 additions & 1 deletion iml-api/src/main.rs
@@ -6,7 +6,6 @@ mod action;
mod command;
mod error;
mod graphql;
mod timer;

use iml_manager_env::get_pool_limit;
use iml_postgres::get_db_pool;
