diff --git a/src/controller.rs b/src/controller.rs index eb99eb4..182c8d7 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -71,28 +71,38 @@ async fn reconcile_deleted_resource( Ok(Action::await_change()) } Err(kube::Error::Api(err)) if err.code == 404 => { - resource_sync.stop_remote_watches_if_watching(ctx).await; + stop_watches_and_remove_resource_sync_finalizers(resource_sync, name, parent_api, ctx) + .await + } + Err(err) => Err(err.into()), + } +} - let patched_finalizers = resource_sync - .finalizers_clone_or_empty() - .with_item_removed(&FINALIZER.to_string()); +async fn stop_watches_and_remove_resource_sync_finalizers( + resource_sync: Arc, + name: &str, + parent_api: &Api, + ctx: Arc, +) -> Result { + resource_sync.stop_remote_watches_if_watching(ctx).await; - // Target has been deleted, remove the finalizer from the ResourceSync - let patch = Merge(json!({ - "metadata": { - "finalizers": patched_finalizers, - }, - })); + let patched_finalizers = resource_sync + .finalizers_clone_or_empty() + .with_item_removed(&FINALIZER.to_string()); - parent_api - .patch(name, &PatchParams::default(), &patch) - .await?; + // Target has been deleted, remove the finalizer from the ResourceSync + let patch = Merge(json!({ + "metadata": { + "finalizers": patched_finalizers, + }, + })); - // We have removed our finalizer, so nothing more needs to be done - Ok(Action::await_change()) - } - Err(err) => Err(err.into()), - } + parent_api + .patch(name, &PatchParams::default(), &patch) + .await?; + + // We have removed our finalizer, so nothing more needs to be done + Ok(Action::await_change()) } async fn add_target_finalizer( @@ -197,35 +207,13 @@ async fn reconcile(resource_sync: Arc, ctx: Arc) -> Resul .ok_or(Error::NameRequired)?; let parent_api = resource_sync.api(ctx.client.clone()); - let result = { - let resource_sync = Arc::clone(&resource_sync); - - info!(?name, "running reconciler"); - - debug!(?resource_sync.spec, "got"); - let local_ns = resource_sync.namespace().ok_or(Error::NamespaceRequired)?; - - let target_api = resource_sync - .spec - .target - .api_for(ctx.client.clone(), &local_ns) - .await?; - let source_api = resource_sync - .spec - .source - .api_for(ctx.client.clone(), &local_ns) - .await?; - - match resource_sync { - resource_sync if resource_sync.has_been_deleted() => { - reconcile_deleted_resource(resource_sync, &name, target_api, &parent_api, ctx).await - } - resource_sync if !resource_sync.has_target_finalizer() => { - add_target_finalizer(resource_sync, &name, &parent_api).await - } - _ => reconcile_normally(resource_sync, &name, source_api, target_api, ctx).await, - } - }; + let result = reconcile_helper( + Arc::clone(&resource_sync), + Arc::clone(&ctx), + &name, + &parent_api, + ) + .await; let status = match &result { Err(err) => { @@ -258,6 +246,68 @@ async fn reconcile(resource_sync: Arc, ctx: Arc) -> Resul result } +async fn reconcile_helper( + resource_sync: Arc, + ctx: Arc, + name: &String, + parent_api: &Api, +) -> Result { + let resource_sync = Arc::clone(&resource_sync); + + info!(?name, "running reconciler"); + + debug!(?resource_sync.spec, "got"); + let local_ns = resource_sync.namespace().ok_or(Error::NamespaceRequired)?; + + let (source_api, target_api) = + match source_and_target_apis(&resource_sync, &ctx, local_ns).await { + Ok(apis) => apis, + Err(_) + if resource_sync.has_force_delete_option_enabled() + && resource_sync.has_been_deleted() => + { + debug!(?name, "force-deleting ResourceSync"); + return stop_watches_and_remove_resource_sync_finalizers( + resource_sync, + name, + parent_api, + ctx, + ) + .await; + } + Err(err) => return Err(err), + }; + + match resource_sync { + resource_sync if resource_sync.has_been_deleted() => { + reconcile_deleted_resource(resource_sync, name, target_api, parent_api, ctx).await + } + resource_sync if !resource_sync.has_target_finalizer() => { + add_target_finalizer(resource_sync, name, parent_api).await + } + _ => reconcile_normally(resource_sync, name, source_api, target_api, ctx).await, + } +} + +async fn source_and_target_apis( + resource_sync: &Arc, + ctx: &Arc, + local_ns: String, +) -> Result<(NamespacedApi, NamespacedApi)> { + let target_api = resource_sync + .spec + .target + .api_for(ctx.client.clone(), &local_ns) + .await?; + let source_api = resource_sync + .spec + .source + .api_for(ctx.client.clone(), &local_ns) + .await?; + + Ok((source_api, target_api)) +} + fn sync_failing_transition_time(status: &Option) -> Time { let now = Time(Utc::now()); diff --git a/src/resources.rs b/src/resources.rs index e92ae65..632d08a 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -9,6 +9,22 @@ use kube::{ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +static FORCE_DELETE_ANNOTATION: &str = "sinker.influxdata.io/force-delete"; + +impl ResourceSync { + pub fn has_force_delete_option_enabled(&self) -> bool { + self.metadata + .annotations + .as_ref() + .map(|annotations| annotations.get(FORCE_DELETE_ANNOTATION)) + .unwrap_or_default() + .cloned() + .unwrap_or_default() + .parse() + .unwrap_or_default() + } +} + #[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] #[kube( group = "sinker.influxdata.io", @@ -142,3 +158,35 @@ impl SinkerContainer { crd } } + +#[cfg(test)] +mod tests { + use super::*; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; + use rstest::rstest; + use std::collections::BTreeMap; + + #[rstest] + #[case::no_annotations( + ResourceSync{metadata: Default::default(),spec: Default::default(),status: None,}, false + )] + #[case::force_delete_annotation_not_present( + ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::new()), ..Default::default()},spec: Default::default(),status: None,}, false + )] + #[case::force_delete_annotation_is_false( + ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), false.to_string())])), ..Default::default()},spec: Default::default(),status: None,}, false + )] + #[case::force_delete_annotation_is_other( + ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), "other".to_string())])), ..Default::default()},spec: Default::default(),status: None,}, false + )] + #[case::force_delete_annotation_is_true( + ResourceSync{metadata: ObjectMeta{annotations: Some(BTreeMap::from([(FORCE_DELETE_ANNOTATION.to_string(), true.to_string())])), ..Default::default()},spec: Default::default(),status: None,}, true + )] + #[tokio::test] + async fn test_resource_sync_has_force_delete_option_enabled( + #[case] resource_sync: ResourceSync, + #[case] expected: bool, + ) { + assert_eq!(resource_sync.has_force_delete_option_enabled(), expected); + } +}