From 701a508dfe9a46dd4db10f91443a67e0254961d3 Mon Sep 17 00:00:00 2001 From: Xudong Sun Date: Thu, 3 Oct 2024 21:29:41 -0500 Subject: [PATCH] Port shim layer to v2 (#558) Signed-off-by: Xudong Sun --- src/anvil_v2.rs | 5 - src/v2/external_shim_layer/mod.rs | 23 + .../spec/api_server/state_machine.rs | 1 - .../spec/api_server/types.rs | 3 +- .../spec/controller/types.rs | 3 +- .../kubernetes_cluster/spec/external/types.rs | 3 +- src/v2/kubernetes_cluster/spec/message.rs | 1 - src/v2/reconciler/exec/resource_builder.rs | 1 - src/v2/shim_layer/controller_runtime.rs | 434 ++++++++++++++++++ src/v2/shim_layer/fault_injection.rs | 79 ++++ src/v2/shim_layer/mod.rs | 4 + src/v2_vreplicaset_controller.rs | 43 +- 12 files changed, 569 insertions(+), 31 deletions(-) create mode 100644 src/v2/external_shim_layer/mod.rs create mode 100644 src/v2/shim_layer/controller_runtime.rs create mode 100644 src/v2/shim_layer/fault_injection.rs create mode 100644 src/v2/shim_layer/mod.rs diff --git a/src/anvil_v2.rs b/src/anvil_v2.rs index d34da39a3..afcfae555 100644 --- a/src/anvil_v2.rs +++ b/src/anvil_v2.rs @@ -1,9 +1,6 @@ // Copyright 2022 VMware, Inc. // SPDX-License-Identifier: MIT - #![allow(unused_imports)] - -pub mod external_api; pub mod kubernetes_api_objects; #[path = "v2/kubernetes_cluster/mod.rs"] pub mod kubernetes_cluster; @@ -12,5 +9,3 @@ pub mod reconciler; pub mod state_machine; pub mod temporal_logic; pub mod vstd_ext; - -use vstd::prelude::*; diff --git a/src/v2/external_shim_layer/mod.rs b/src/v2/external_shim_layer/mod.rs new file mode 100644 index 000000000..f7aa7d267 --- /dev/null +++ b/src/v2/external_shim_layer/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2022 VMware, Inc. +// SPDX-License-Identifier: MIT +use crate::reconciler::exec::io::{VoidEReq, VoidEResp}; + +// A trait for the external api of a reconciler, whose core is a transition method, and the developer should wrap all +// possible operations they may need in the function. +// Input is the input type of the external api and also the ? of Request of the reconciler, i.e., it completes the +// request type of a reconciler. +// Similarly, Output is the output type of the external api, which composes the Response type of a reconciler. +// Note that we can encapsulate all the required libraries here, so each reconciler only has one ExternalAPI type. +pub trait ExternalShimLayer { + fn external_call(req: EReq) -> EResp; +} + +// An empty library that implements External Library. +// This can be used by those controllers that don't rely on a third-party library. +pub struct VoidExternalShimLayer {} + +impl ExternalShimLayer for VoidExternalShimLayer { + fn external_call(_input: VoidEReq) -> VoidEResp { + panic!("This should not be visited"); + } +} diff --git a/src/v2/kubernetes_cluster/spec/api_server/state_machine.rs b/src/v2/kubernetes_cluster/spec/api_server/state_machine.rs index 19fb79dae..e07a08ed9 100644 --- a/src/v2/kubernetes_cluster/spec/api_server/state_machine.rs +++ b/src/v2/kubernetes_cluster/spec/api_server/state_machine.rs @@ -1,7 +1,6 @@ // Copyright 2022 VMware, Inc. // SPDX-License-Identifier: MIT #![allow(unused_imports)] -use crate::external_api::spec::*; use crate::kubernetes_api_objects::error::*; use crate::kubernetes_api_objects::spec::prelude::*; use crate::kubernetes_cluster::spec::{api_server::types::*, message::*}; diff --git a/src/v2/kubernetes_cluster/spec/api_server/types.rs b/src/v2/kubernetes_cluster/spec/api_server/types.rs index 2fef4273a..4e34ca653 100644 --- a/src/v2/kubernetes_cluster/spec/api_server/types.rs +++ b/src/v2/kubernetes_cluster/spec/api_server/types.rs @@ -1,12 +1,11 @@ // Copyright 2022 VMware, Inc. // SPDX-License-Identifier: MIT #![allow(unused_imports)] -use crate::external_api::spec::*; use crate::kubernetes_api_objects::spec::prelude::*; +use crate::kubernetes_cluster::spec::message::*; use crate::state_machine::action::*; use crate::state_machine::state_machine::*; use crate::temporal_logic::defs::*; -use crate::kubernetes_cluster::spec::message::*; use crate::vstd_ext::string_view::StringView; use vstd::{multiset::*, prelude::*}; diff --git a/src/v2/kubernetes_cluster/spec/controller/types.rs b/src/v2/kubernetes_cluster/spec/controller/types.rs index 7fc5362ee..9c191e396 100644 --- a/src/v2/kubernetes_cluster/spec/controller/types.rs +++ b/src/v2/kubernetes_cluster/spec/controller/types.rs @@ -1,11 +1,10 @@ // Copyright 2022 VMware, Inc. // SPDX-License-Identifier: MIT #![allow(unused_imports)] -use crate::external_api::spec::*; use crate::kubernetes_api_objects::spec::prelude::*; +use crate::kubernetes_cluster::spec::message::*; use crate::state_machine::action::*; use crate::state_machine::state_machine::*; -use crate::kubernetes_cluster::spec::message::*; use vstd::{multiset::*, prelude::*}; verus! { diff --git a/src/v2/kubernetes_cluster/spec/external/types.rs b/src/v2/kubernetes_cluster/spec/external/types.rs index 44c3f6785..250b6b12e 100644 --- a/src/v2/kubernetes_cluster/spec/external/types.rs +++ b/src/v2/kubernetes_cluster/spec/external/types.rs @@ -1,11 +1,10 @@ // Copyright 2022 VMware, Inc. // SPDX-License-Identifier: MIT #![allow(unused_imports)] -use crate::external_api::spec::*; use crate::kubernetes_api_objects::spec::prelude::*; +use crate::kubernetes_cluster::spec::message::*; use crate::state_machine::action::*; use crate::state_machine::state_machine::*; -use crate::kubernetes_cluster::spec::message::*; use crate::temporal_logic::defs::*; use vstd::{multiset::*, prelude::*}; diff --git a/src/v2/kubernetes_cluster/spec/message.rs b/src/v2/kubernetes_cluster/spec/message.rs index 4dae1cc9d..bf3291f51 100644 --- a/src/v2/kubernetes_cluster/spec/message.rs +++ b/src/v2/kubernetes_cluster/spec/message.rs @@ -1,7 +1,6 @@ // Copyright 2022 VMware, Inc. // SPDX-License-Identifier: MIT #![allow(unused_imports)] -use crate::external_api::spec::*; use crate::kubernetes_api_objects::error::*; use crate::kubernetes_api_objects::spec::prelude::*; use crate::vstd_ext::string_view::*; diff --git a/src/v2/reconciler/exec/resource_builder.rs b/src/v2/reconciler/exec/resource_builder.rs index efb4146ae..ed806639b 100644 --- a/src/v2/reconciler/exec/resource_builder.rs +++ b/src/v2/reconciler/exec/resource_builder.rs @@ -1,7 +1,6 @@ // Copyright 2022 VMware, Inc. // SPDX-License-Identifier: MIT #![allow(unused_imports)] -use crate::external_api::exec::*; use crate::kubernetes_api_objects::exec::{api_method::*, dynamic::*, resource::*}; use crate::reconciler::exec::{io::*, reconciler::*}; use crate::reconciler::spec::resource_builder; diff --git a/src/v2/shim_layer/controller_runtime.rs b/src/v2/shim_layer/controller_runtime.rs new file mode 100644 index 000000000..5118b1d15 --- /dev/null +++ b/src/v2/shim_layer/controller_runtime.rs @@ -0,0 +1,434 @@ +// Copyright 2022 VMware, Inc. +// SPDX-License-Identifier: MIT +#![allow(unused_imports)] +use crate::external_shim_layer::*; +use crate::kubernetes_api_objects::error::*; +use crate::kubernetes_api_objects::exec::{api_method::*, dynamic::*, resource::*}; +use crate::reconciler::exec::{io::*, reconciler::*}; +use crate::shim_layer::fault_injection::*; +use builtin::*; +use builtin_macros::*; +use core::fmt::Debug; +use core::hash::Hash; +use deps_hack::anyhow::Result; +use deps_hack::futures::{Future, Stream, StreamExt, TryFuture}; +use deps_hack::kube::{ + api::{Api, DeleteParams, ListParams, ObjectMeta, PostParams, Resource}, + runtime::{ + controller::{self, Action, Controller}, + reflector, watcher, + }, + Client, CustomResource, CustomResourceExt, +}; +use deps_hack::kube_core::{ErrorResponse, NamespaceResourceScope}; +use deps_hack::serde::{de::DeserializeOwned, Serialize}; +use deps_hack::tracing::{error, info, warn}; +use deps_hack::Error; +use std::sync::Arc; +use std::time::Duration; +use vstd::{string::*, view::*}; + +// The shim layer connects the verified reconciler to the trusted kube-rs APIs. +// The key is to implement the reconcile function (impl FnMut(Arc, Arc) -> ReconcilerFut), +// which is required by the kube-rs framework to build a controller, +// on top of reconcile_core, which is provided by the developer. + +// run_controller prepares and runs the controller. It requires: +// K: the custom resource type +// R: the reconciler type +pub async fn run_controller(fault_injection: bool) -> Result<()> +where + K: Clone + + Resource + + CustomResourceExt + + DeserializeOwned + + Debug + + Send + + Serialize + + Sync + + 'static, + K::DynamicType: Default + Eq + Hash + Clone + Debug + Unpin, + R: Reconciler + Send + Sync, + R::K: ResourceWrapper + Send, + R::S: Send, + R::EReq: Send, + R::EResp: Send, + E: ExternalShimLayer, +{ + let client = Client::try_default().await?; + let crs = Api::::all(client.clone()); + + // Build the async closure on top of reconcile_with + let reconcile = |cr: Arc, ctx: Arc| async move { + return reconcile_with::(cr, ctx, fault_injection).await; + }; + + info!("starting controller"); + // TODO: the controller should also listen to the owned resources + Controller::new(crs, watcher::Config::default()) // The controller's reconcile is triggered when a CR is created/updated + .shutdown_on_signal() + .run(reconcile, error_policy, Arc::new(Data { client })) // The reconcile function is registered + .for_each(|res| async move { + match res { + Ok(o) => info!("reconciled {:?}", o), + Err(e) => info!("reconcile failed: {}", e), + } + }) + .await; + info!("controller terminated"); + Ok(()) +} + +// reconcile_with implements the reconcile function by repeatedly invoking R::reconcile_core. +// reconcile_with will be invoked by kube-rs whenever kube-rs's watcher receives any relevant event to the controller. +// In each invocation, reconcile_with invokes R::reconcile_core in a loop: +// it starts with R::reconcile_init_state, and in each iteration it invokes R::reconcile_core +// with the new state returned by the previous invocation. +// For each request from R::reconcile_core, it invokes kube-rs APIs to send the request to the Kubernetes API. +// It ends the loop when the R reports the reconcile is done (R::reconcile_done) +// or encounters error (R::reconcile_error). +pub async fn reconcile_with( + cr: Arc, + ctx: Arc, + fault_injection: bool, +) -> Result +where + K: Clone + + Resource + + CustomResourceExt + + DeserializeOwned + + Debug + + Serialize, + K::DynamicType: Default + Clone + Debug, + R: Reconciler, + R::K: ResourceWrapper, + E: ExternalShimLayer, +{ + let client = &ctx.client; + + let cr_name = cr.meta().name.as_ref().ok_or_else(|| { + Error::ShimLayerError("Custom resource misses \".metadata.name\"".to_string()) + })?; + let cr_namespace = cr.meta().namespace.as_ref().ok_or_else(|| { + Error::ShimLayerError("Custom resources misses \".metadata.namespace\"".to_string()) + })?; + let cr_kind = K::kind(&K::DynamicType::default()).to_string(); + + let cr_key = format!("{}/{}/{}", cr_kind, cr_namespace, cr_name); + let log_header = format!("Reconciling {}:", cr_key); + + let cr_api = Api::::namespaced(client.clone(), &cr_namespace); + // Get the custom resource by a quorum read to Kubernetes' storage (etcd) to get the most updated custom resource + let get_cr_resp = cr_api.get(&cr_name).await; + match get_cr_resp { + Err(deps_hack::kube_client::error::Error::Api(ErrorResponse { reason, .. })) + if &reason == "NotFound" => + { + warn!( + "{} Custom resource {} not found, end reconcile", + log_header, cr_name + ); + return Ok(Action::await_change()); + } + Err(err) => { + warn!( + "{} Get custom resource {} failed with error: {}, will retry reconcile", + log_header, cr_name, err + ); + return Ok(Action::requeue(Duration::from_secs(60))); + } + _ => {} + } + // Wrap the custom resource with Verus-friendly wrapper type (which has a ghost version, i.e., view) + let cr = get_cr_resp.unwrap(); + info!( + "{} Get cr {}", + log_header, + deps_hack::k8s_openapi::serde_json::to_string(&cr).unwrap() + ); + + let cr_wrapper = R::K::from_kube(cr); + let mut state = R::reconcile_init_state(); + let mut resp_option: Option> = None; + // check_fault_timing is only set to true right after the controller issues any create, update or delete request, + // or external request + let mut check_fault_timing: bool; + + // Call reconcile_core in a loop + loop { + check_fault_timing = false; + // If reconcile core is done, then breaks the loop + if R::reconcile_done(&state) { + info!("{} done", log_header); + break; + } + if R::reconcile_error(&state) { + warn!("{} error", log_header); + return Err(Error::ReconcileCoreError); + } + // Feed the current reconcile state and get the new state and the pending request + let (state_prime, request_option) = R::reconcile_core(&cr_wrapper, resp_option, state); + // Pattern match the request and send requests to the Kubernetes API via kube-rs methods + match request_option { + Some(request) => match request { + Request::KRequest(req) => { + let kube_resp: KubeAPIResponse; + match req { + KubeAPIRequest::GetRequest(get_req) => { + let api = Api::::namespaced_with( + client.clone(), + &get_req.namespace, + get_req.api_resource.as_kube_ref(), + ); + let key = get_req.key(); + match api.get(&get_req.name).await { + Err(err) => { + kube_resp = KubeAPIResponse::GetResponse(KubeGetResponse { + res: Err(kube_error_to_ghost(&err)), + }); + info!("{} Get {} failed with error: {}", log_header, key, err); + } + Ok(obj) => { + kube_resp = KubeAPIResponse::GetResponse(KubeGetResponse { + res: Ok(DynamicObject::from_kube(obj)), + }); + info!("{} Get {} done", log_header, key); + } + } + } + KubeAPIRequest::ListRequest(list_req) => { + let api = Api::::namespaced_with( + client.clone(), + &list_req.namespace, + list_req.api_resource.as_kube_ref(), + ); + let key = list_req.key(); + let lp = ListParams::default(); + match api.list(&lp).await { + Err(err) => { + kube_resp = KubeAPIResponse::ListResponse(KubeListResponse { + res: Err(kube_error_to_ghost(&err)), + }); + info!("{} List {} failed with error: {}", log_header, key, err); + } + Ok(obj_list) => { + kube_resp = KubeAPIResponse::ListResponse(KubeListResponse { + res: Ok(obj_list + .items + .into_iter() + .map(|obj| DynamicObject::from_kube(obj)) + .collect()), + }); + info!("{} List {} done", log_header, key); + } + } + } + KubeAPIRequest::CreateRequest(create_req) => { + check_fault_timing = true; + let api = Api::::namespaced_with( + client.clone(), + &create_req.namespace, + create_req.api_resource.as_kube_ref(), + ); + let pp = PostParams::default(); + let key = create_req.key(); + let obj_to_create = create_req.obj.into_kube(); + match api.create(&pp, &obj_to_create).await { + Err(err) => { + kube_resp = + KubeAPIResponse::CreateResponse(KubeCreateResponse { + res: Err(kube_error_to_ghost(&err)), + }); + info!( + "{} Create {} failed with error: {}", + log_header, key, err + ); + } + Ok(obj) => { + kube_resp = + KubeAPIResponse::CreateResponse(KubeCreateResponse { + res: Ok(DynamicObject::from_kube(obj)), + }); + info!("{} Create {} done", log_header, key); + } + } + } + KubeAPIRequest::DeleteRequest(delete_req) => { + check_fault_timing = true; + let api = Api::::namespaced_with( + client.clone(), + &delete_req.namespace, + delete_req.api_resource.as_kube_ref(), + ); + let mut dp = DeleteParams::default(); + if delete_req.preconditions.is_some() { + dp = dp.preconditions( + delete_req.preconditions.clone().unwrap().into_kube(), + ); + } + let key = delete_req.key(); + match api.delete(&delete_req.name, &dp).await { + Err(err) => { + kube_resp = + KubeAPIResponse::DeleteResponse(KubeDeleteResponse { + res: Err(kube_error_to_ghost(&err)), + }); + info!( + "{} Delete {} failed with error: {}", + log_header, key, err + ); + } + Ok(_) => { + kube_resp = + KubeAPIResponse::DeleteResponse(KubeDeleteResponse { + res: Ok(()), + }); + info!("{} Delete {} done", log_header, key); + } + } + } + KubeAPIRequest::UpdateRequest(update_req) => { + check_fault_timing = true; + let api = Api::::namespaced_with( + client.clone(), + &update_req.namespace, + update_req.api_resource.as_kube_ref(), + ); + let pp = PostParams::default(); + let key = update_req.key(); + let obj_to_update = update_req.obj.into_kube(); + match api.replace(&update_req.name, &pp, &obj_to_update).await { + Err(err) => { + kube_resp = + KubeAPIResponse::UpdateResponse(KubeUpdateResponse { + res: Err(kube_error_to_ghost(&err)), + }); + info!( + "{} Update {} failed with error: {}", + log_header, key, err + ); + } + Ok(obj) => { + kube_resp = + KubeAPIResponse::UpdateResponse(KubeUpdateResponse { + res: Ok(DynamicObject::from_kube(obj)), + }); + info!("{} Update {} done", log_header, key); + } + } + } + KubeAPIRequest::UpdateStatusRequest(update_status_req) => { + check_fault_timing = true; + let api = Api::::namespaced_with( + client.clone(), + &update_status_req.namespace, + update_status_req.api_resource.as_kube_ref(), + ); + let pp = PostParams::default(); + let key = update_status_req.key(); + let obj_to_update = update_status_req.obj.into_kube(); + // Here we assume serde_json always succeed + match api + .replace_status( + &update_status_req.name, + &pp, + deps_hack::k8s_openapi::serde_json::to_vec(&obj_to_update) + .unwrap(), + ) + .await + { + Err(err) => { + kube_resp = KubeAPIResponse::UpdateStatusResponse( + KubeUpdateStatusResponse { + res: Err(kube_error_to_ghost(&err)), + }, + ); + info!( + "{} UpdateStatus {} failed with error: {}", + log_header, key, err + ); + } + Ok(obj) => { + kube_resp = KubeAPIResponse::UpdateStatusResponse( + KubeUpdateStatusResponse { + res: Ok(DynamicObject::from_kube(obj)), + }, + ); + info!("{} UpdateStatus {} done", log_header, key); + } + } + } + } + resp_option = Some(Response::KResponse(kube_resp)); + } + Request::ExternalRequest(external_req) => { + check_fault_timing = true; + let external_resp = E::external_call(external_req); + resp_option = Some(Response::ExternalResponse(external_resp)); + } + }, + _ => resp_option = None, + } + if check_fault_timing && fault_injection { + // If the controller just issues create, update, delete or external request, + // and fault injection option is on, then check whether to crash at this point + let result = crash_or_continue(client, &cr_key, &log_header).await; + if result.is_err() { + error!( + "{} crash_or_continue fails due to {}", + log_header, + result.unwrap_err() + ); + } + } + state = state_prime; + } + + return Ok(Action::requeue(Duration::from_secs(60))); +} + +// error_policy defines the controller's behavior when the reconcile ends with an error. +pub fn error_policy(_object: Arc, _error: &Error, _ctx: Arc) -> Action +where + K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static, + K::DynamicType: Eq + Hash + Clone + Debug + Unpin, +{ + Action::requeue(Duration::from_secs(10)) +} + +// Data is passed to reconcile_with. +// It carries the client that communicates with Kubernetes API. +pub struct Data { + pub client: Client, +} + +// kube_error_to_ghost translates the API error from kube-rs APIs +// to the form that can be processed by reconcile_core. + +// TODO: match more error types. +pub fn kube_error_to_ghost(error: &deps_hack::kube::Error) -> APIError { + match error { + deps_hack::kube::Error::Api(error_resp) => { + if &error_resp.reason == "NotFound" { + APIError::ObjectNotFound + } else if &error_resp.reason == "AlreadyExists" { + APIError::ObjectAlreadyExists + } else if &error_resp.reason == "BadRequest" { + APIError::BadRequest + } else if &error_resp.reason == "Conflict" { + APIError::Conflict + } else if &error_resp.reason == "Invalid" { + APIError::Invalid + } else if &error_resp.reason == "InternalError" { + APIError::InternalError + } else if &error_resp.reason == "Timeout" { + APIError::Timeout + } else if &error_resp.reason == "ServerTimeout" { + APIError::ServerTimeout + } else { + APIError::Other + } + } + _ => APIError::Other, + } +} diff --git a/src/v2/shim_layer/fault_injection.rs b/src/v2/shim_layer/fault_injection.rs new file mode 100644 index 000000000..18da8cf09 --- /dev/null +++ b/src/v2/shim_layer/fault_injection.rs @@ -0,0 +1,79 @@ +// Copyright 2022 VMware, Inc. +// SPDX-License-Identifier: MIT +#![allow(unused_imports)] +use builtin::*; +use builtin_macros::*; +use core::fmt::Debug; +use deps_hack::anyhow::Result; +use deps_hack::futures::{Future, Stream, StreamExt, TryFuture}; +use deps_hack::k8s_openapi::api::core::v1::ConfigMap; +use deps_hack::kube::{ + api::{Api, ObjectMeta, PostParams, Resource}, + Client, +}; +use deps_hack::tracing::info; +use deps_hack::Error; + +pub async fn crash_or_continue( + client: &Client, + cr_key: &String, + log_header: &String, +) -> Result<(), String> { + // We require the fault injection configuration is stored by a ConfigMap object + // in the default namespace called "fault-injection-config" + let config_map_name = "fault-injection-config"; + let config_map_api = Api::::namespaced(client.clone(), "default"); + let mut config_map = config_map_api + .get(&config_map_name) + .await + .map_err(|_e| "Fail to get fault injection config".to_string())?; + info!( + "{} Get {}: {}", + log_header, + config_map_name, + deps_hack::k8s_openapi::serde_json::to_string(&config_map).unwrap() + ); + let data = config_map + .data + .as_ref() + .ok_or_else(|| "Fail to unwrap data".to_string())?; + // The configuration should tell us a cr_key and we will crash the controller when it is managing that object + // This is to make the fault injection more deterministic when the controller manages multiple cr objects of different types + let cr_key_val = data + .get("cr_key") + .ok_or_else(|| "Fail to get cr_key".to_string())?; + // We only want to crash when the controller is managing the object identified by cr_key + if cr_key_val.to_string() != cr_key.to_string() { + return Ok(()); + } + // The configuration should have the two entries: + // 1. the current number of requests that the controller has issued, and + // 2. the expected number of requests after which the controller should crash + let current_val = data + .get("current") + .ok_or_else(|| "Fail to get current".to_string())?; + let current = current_val + .parse::() + .map_err(|_e| "Fail to parse current value to i32".to_string())?; + let expected_val = data + .get("expected") + .ok_or_else(|| "Fail to get expected".to_string())?; + let expected = expected_val + .parse::() + .map_err(|_e| "Fail to parse expected value to i32".to_string())?; + // We increment current entry here before panic, otherwise we might end up crashing at the same point forever + config_map + .data + .as_mut() + .unwrap() + .insert("current".to_string(), (current + 1).to_string()); + config_map_api + .replace(config_map_name, &PostParams::default(), &config_map) + .await + .map_err(|_e| "Fail to update fault injection config".to_string())?; + if current == expected { + // Now it is time to crash according to fault-injection-config + panic!(); + } + return Ok(()); +} diff --git a/src/v2/shim_layer/mod.rs b/src/v2/shim_layer/mod.rs new file mode 100644 index 000000000..d24968b0f --- /dev/null +++ b/src/v2/shim_layer/mod.rs @@ -0,0 +1,4 @@ +// Copyright 2022 VMware, Inc. +// SPDX-License-Identifier: MIT +pub mod controller_runtime; +pub mod fault_injection; diff --git a/src/v2_vreplicaset_controller.rs b/src/v2_vreplicaset_controller.rs index 3cdbfe434..01ccc706d 100644 --- a/src/v2_vreplicaset_controller.rs +++ b/src/v2_vreplicaset_controller.rs @@ -2,19 +2,22 @@ // SPDX-License-Identifier: MIT #![allow(unused_imports)] -pub mod external_api; +#[path = "v2/external_shim_layer/mod.rs"] +pub mod external_shim_layer; pub mod kubernetes_api_objects; #[path = "v2/kubernetes_cluster/mod.rs"] pub mod kubernetes_cluster; #[path = "v2/reconciler/mod.rs"] pub mod reconciler; -// pub mod shim_layer; +#[path = "v2/shim_layer/mod.rs"] +pub mod shim_layer; pub mod state_machine; pub mod temporal_logic; #[path = "v2/controllers/vreplicaset_controller/mod.rs"] pub mod vreplicaset_controller; pub mod vstd_ext; +use crate::external_shim_layer::VoidExternalShimLayer; use crate::vreplicaset_controller::exec::reconciler::VReplicaSetReconciler; use deps_hack::anyhow::Result; use deps_hack::kube::CustomResourceExt; @@ -22,25 +25,31 @@ use deps_hack::serde_yaml; use deps_hack::tokio; use deps_hack::tracing::{error, info}; use deps_hack::tracing_subscriber; -// use shim_layer::controller_runtime::run_controller; +use shim_layer::controller_runtime::run_controller; use std::env; #[tokio::main] async fn main() -> Result<()> { - // tracing_subscriber::fmt::init(); - // let args: Vec = env::args().collect(); - // let cmd = args[1].clone(); + tracing_subscriber::fmt::init(); + let args: Vec = env::args().collect(); + let cmd = args[1].clone(); - // if cmd == String::from("export") { - // println!("{}", serde_yaml::to_string(&deps_hack::VReplicaSet::crd())?); - // } else if cmd == String::from("run") { - // info!("running v-replica-set-controller"); - // run_controller::(false).await?; - // } else if cmd == String::from("crash") { - // info!("running v-replica-set-controller in crash-testing mode"); - // run_controller::(true).await?; - // } else { - // error!("wrong command; please use \"export\", \"run\" or \"crash\""); - // } + if cmd == String::from("export") { + println!("{}", serde_yaml::to_string(&deps_hack::VReplicaSet::crd())?); + } else if cmd == String::from("run") { + info!("running vreplicaset-controller"); + run_controller::( + false, + ) + .await?; + } else if cmd == String::from("crash") { + info!("running vreplicaset-controller in crash-testing mode"); + run_controller::( + true, + ) + .await?; + } else { + error!("wrong command; please use \"export\", \"run\" or \"crash\""); + } Ok(()) }