From 171af7d1f6235e42d69cf1ad52eaa4ca9f685456 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini <1538000+ricardozanini@users.noreply.github.com> Date: Wed, 20 Dec 2023 16:07:54 -0300 Subject: [PATCH] [KOGITO-8792] - Add events to build controllers and the ability to restart a build (#318) * [KOGITO-8792] - Add events to build controllers and the ability to restart a build Signed-off-by: Ricardo Zanini * Add the ability to restart builds and signal to workflows Signed-off-by: Ricardo Zanini * Rollout deployment after a successful build Signed-off-by: Ricardo Zanini * Fix rollout deployment once a build finishes Signed-off-by: Ricardo Zanini --------- Signed-off-by: Ricardo Zanini --- api/condition_types.go | 4 +- api/status_types.go | 21 ++----- api/v1alpha08/sonataflow_types.go | 2 +- api/v1alpha08/sonataflowbuild_types.go | 4 ++ controllers/builder/containerbuilder.go | 3 + controllers/platform/defaults.go | 26 ++++---- controllers/profiles/common/deployment.go | 30 ++++++++-- controllers/profiles/common/reconciler.go | 24 ++++---- controllers/profiles/dev/profile_dev.go | 12 ++-- controllers/profiles/dev/profile_dev_test.go | 18 +++--- controllers/profiles/dev/states_dev.go | 12 +++- controllers/profiles/factory/factory.go | 7 ++- .../profiles/prod/deployment_handler.go | 16 ++--- .../profiles/prod/deployment_handler_test.go | 6 +- controllers/profiles/prod/profile_prod.go | 21 ++++--- .../profiles/prod/profile_prod_test.go | 52 +++++----------- controllers/profiles/prod/states_prod.go | 48 +++++++++++---- .../profiles/prod/states_prod_nobuild.go | 18 ++++-- controllers/sonataflow_controller.go | 3 +- controllers/sonataflow_controller_test.go | 2 +- controllers/sonataflowbuild_controller.go | 45 ++++++++++---- .../sonataflowbuild_controller_test.go | 27 +++++++++ controllers/workflows/workflows.go | 60 +++++++++++++++++++ test/kubernetes_cli.go | 12 ++++ test/mock_service.go | 58 ------------------ utils/kubernetes/annotations.go | 23 +++++++ 26 files changed, 346 insertions(+), 208 deletions(-) create mode 100644 controllers/workflows/workflows.go diff --git a/api/condition_types.go b/api/condition_types.go index 5029b0fde..69b6e3be4 100644 --- a/api/condition_types.go +++ b/api/condition_types.go @@ -51,7 +51,9 @@ const ( BuildFailedReason = "BuildFailedReason" WaitingForBuildReason = "WaitingForBuild" BuildIsRunningReason = "BuildIsRunning" - BuildSkipped = "BuildSkipped" + BuildSkippedReason = "BuildSkipped" + BuildSuccessfulReason = "BuildSuccessful" + BuildMarkedToRestartReason = "BuildMarkedToRestart" ) // Condition describes the common structure for conditions in our types diff --git a/api/status_types.go b/api/status_types.go index 46a2e6452..b3d242244 100644 --- a/api/status_types.go +++ b/api/status_types.go @@ -283,21 +283,12 @@ func (s *conditionManager) MarkUnknown(t ConditionType, reason, messageFormat st // MarkFalse sets the status of t and the ready condition to False. func (s *conditionManager) MarkFalse(t ConditionType, reason, messageFormat string, messageA ...interface{}) { - types := []ConditionType{t} - for _, cond := range s.dependents { - if cond == t { - types = append(types, s.ready) - } - } - - for _, t := range types { - s.setCondition(Condition{ - Type: t, - Status: corev1.ConditionFalse, - Reason: reason, - Message: fmt.Sprintf(messageFormat, messageA...), - }) - } + s.setCondition(Condition{ + Type: t, + Status: corev1.ConditionFalse, + Reason: reason, + Message: fmt.Sprintf(messageFormat, messageA...), + }) } // InitializeConditions updates all Conditions in the ConditionSet to Unknown diff --git a/api/v1alpha08/sonataflow_types.go b/api/v1alpha08/sonataflow_types.go index eca3b6059..169aad0a0 100644 --- a/api/v1alpha08/sonataflow_types.go +++ b/api/v1alpha08/sonataflow_types.go @@ -694,7 +694,7 @@ func (s *SonataFlowStatus) Manager() api.ConditionsManager { } func (s *SonataFlowStatus) IsWaitingForPlatform() bool { - cond := s.GetCondition(api.RunningConditionType) + cond := s.GetCondition(api.BuiltConditionType) return cond.IsFalse() && cond.Reason == api.WaitingForPlatformReason } diff --git a/api/v1alpha08/sonataflowbuild_types.go b/api/v1alpha08/sonataflowbuild_types.go index 3a3dd1789..0817936de 100644 --- a/api/v1alpha08/sonataflowbuild_types.go +++ b/api/v1alpha08/sonataflowbuild_types.go @@ -22,6 +22,7 @@ package v1alpha08 import ( "encoding/json" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -50,6 +51,9 @@ const ( BuildPhaseError BuildPhase = "Error" ) +// BuildRestartAnnotation marks a SonataFlowBuild to restart +const BuildRestartAnnotation = metadata.Domain + "/restartBuild" + // BuildTemplate an abstraction over the actual build process performed by the platform. // +k8s:openapi-gen=true type BuildTemplate struct { diff --git a/controllers/builder/containerbuilder.go b/controllers/builder/containerbuilder.go index 8956d763e..15918a86f 100644 --- a/controllers/builder/containerbuilder.go +++ b/controllers/builder/containerbuilder.go @@ -89,6 +89,9 @@ func (c *containerBuilderManager) Schedule(build *operatorapi.SonataFlowBuild) e return err } build.Status.BuildPhase = operatorapi.BuildPhase(containerBuilder.Status.Phase) + if len(build.Status.BuildPhase) == 0 { + build.Status.BuildPhase = operatorapi.BuildPhaseInitialization + } build.Status.Error = containerBuilder.Status.Error return nil } diff --git a/controllers/platform/defaults.go b/controllers/platform/defaults.go index f5e5396e8..95d8a4c8b 100644 --- a/controllers/platform/defaults.go +++ b/controllers/platform/defaults.go @@ -22,6 +22,7 @@ package platform import ( "context" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime/pkg/client" @@ -60,24 +61,27 @@ func ConfigureDefaults(ctx context.Context, c client.Client, p *operatorapi.Sona klog.V(log.I).InfoS("Maven Timeout set", "timeout", p.Spec.Build.Config.Timeout.Duration) } - updatePlatform(ctx, c, p) - - return nil + return createOrUpdatePlatform(ctx, c, p) } -func updatePlatform(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform) { +func createOrUpdatePlatform(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform) error { config := operatorapi.SonataFlowPlatform{} - errGet := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config) - if errGet != nil { - klog.V(log.E).ErrorS(errGet, "Error reading the Platform") + err := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config) + if errors.IsNotFound(err) { + klog.V(log.D).ErrorS(err, "Platform not found, creating it") + return c.Create(ctx, p) + } else if err != nil { + klog.V(log.E).ErrorS(err, "Error reading the Platform") + return err } + config.Spec = p.Spec config.Status.Cluster = p.Status.Cluster - - updateErr := c.Update(ctx, &config) - if updateErr != nil { - klog.V(log.E).ErrorS(updateErr, "Error updating the BuildPlatform") + err = c.Update(ctx, &config) + if err != nil { + klog.V(log.E).ErrorS(err, "Error updating the BuildPlatform") } + return err } func newDefaultSonataFlowPlatform(namespace string) *operatorapi.SonataFlowPlatform { diff --git a/controllers/profiles/common/deployment.go b/controllers/profiles/common/deployment.go index f3ad8e6d6..64b677678 100644 --- a/controllers/profiles/common/deployment.go +++ b/controllers/profiles/common/deployment.go @@ -24,6 +24,7 @@ import ( "fmt" appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -36,18 +37,20 @@ import ( kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" ) -var _ WorkflowDeploymentHandler = &deploymentHandler{} +var _ WorkflowDeploymentManager = &deploymentHandler{} -// WorkflowDeploymentHandler interface to handle workflow deployment features. -type WorkflowDeploymentHandler interface { +// WorkflowDeploymentManager interface to handle workflow deployment features. +type WorkflowDeploymentManager interface { // SyncDeploymentStatus updates the workflow status aligned with the deployment counterpart. // For example, if the deployment is in a failed state, it sets the status to // Running `false` and the Message and Reason to human-readable format. SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) + // RolloutDeployment rolls out the underlying deployment object for the given workflow. + RolloutDeployment(ctx context.Context, workflow *operatorapi.SonataFlow) error } -// DeploymentHandler creates a new WorkflowDeploymentHandler implementation based on the current profile. -func DeploymentHandler(c client.Client) WorkflowDeploymentHandler { +// DeploymentManager creates a new WorkflowDeploymentManager implementation based on the current profile. +func DeploymentManager(c client.Client) WorkflowDeploymentManager { return &deploymentHandler{c: c} } @@ -55,7 +58,22 @@ type deploymentHandler struct { c client.Client } -func (d deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) { +func (d *deploymentHandler) RolloutDeployment(ctx context.Context, workflow *operatorapi.SonataFlow) error { + deployment := &appsv1.Deployment{} + if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil { + // Deployment not found, nothing to do. + if errors.IsNotFound(err) { + return nil + } + return err + } + if err := kubeutil.MarkDeploymentToRollout(deployment); err != nil { + return err + } + return d.c.Update(ctx, deployment) +} + +func (d *deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) { deployment := &appsv1.Deployment{} if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil { // we should have the deployment by this time, so even if the error above is not found, we should halt. diff --git a/controllers/profiles/common/reconciler.go b/controllers/profiles/common/reconciler.go index 3e6f568de..64ecd07ac 100644 --- a/controllers/profiles/common/reconciler.go +++ b/controllers/profiles/common/reconciler.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -36,8 +37,9 @@ import ( // StateSupport is the shared structure with common accessors used throughout the whole reconciliation profiles type StateSupport struct { - C client.Client - Catalog discovery.ServiceCatalog + C client.Client + Catalog discovery.ServiceCatalog + Recorder record.EventRecorder } // PerformStatusUpdate updates the SonataFlow Status conditions @@ -51,29 +53,23 @@ func (s StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operato return true, err } -// PostReconcile function to perform all the other operations required after the reconciliation - placeholder for null pattern usages -func (s StateSupport) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { - //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error - return nil -} - -// BaseReconciler is the base structure used by every reconciliation profile. -// Use NewBaseProfileReconciler to build a new reference. -type BaseReconciler struct { +// Reconciler is the base structure used by every reconciliation profile. +// Use NewReconciler to build a new reference. +type Reconciler struct { *StateSupport reconciliationStateMachine *ReconciliationStateMachine objects []client.Object } -func NewBaseProfileReconciler(support *StateSupport, stateMachine *ReconciliationStateMachine) BaseReconciler { - return BaseReconciler{ +func NewReconciler(support *StateSupport, stateMachine *ReconciliationStateMachine) Reconciler { + return Reconciler{ StateSupport: support, reconciliationStateMachine: stateMachine, } } // Reconcile does the actual reconciliation algorithm based on a set of ReconciliationState -func (b *BaseReconciler) Reconcile(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) { +func (b *Reconciler) Reconcile(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) { workflow.Status.Manager().InitializeConditions() result, objects, err := b.reconciliationStateMachine.do(ctx, workflow) if err != nil { diff --git a/controllers/profiles/dev/profile_dev.go b/controllers/profiles/dev/profile_dev.go index fbc6928a4..959d73ae1 100644 --- a/controllers/profiles/dev/profile_dev.go +++ b/controllers/profiles/dev/profile_dev.go @@ -21,6 +21,7 @@ package dev import ( "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -34,17 +35,18 @@ import ( var _ profiles.ProfileReconciler = &developmentProfile{} type developmentProfile struct { - common.BaseReconciler + common.Reconciler } func (d developmentProfile) GetProfile() metadata.ProfileType { return metadata.DevProfile } -func NewProfileReconciler(client client.Client) profiles.ProfileReconciler { +func NewProfileReconciler(client client.Client, recorder record.EventRecorder) profiles.ProfileReconciler { support := &common.StateSupport{ - C: client, - Catalog: discovery.NewServiceCatalog(client), + C: client, + Catalog: discovery.NewServiceCatalog(client), + Recorder: recorder, } var ensurers *objectEnsurers @@ -63,7 +65,7 @@ func NewProfileReconciler(client client.Client) profiles.ProfileReconciler { &recoverFromFailureState{StateSupport: support}) profile := &developmentProfile{ - BaseReconciler: common.NewBaseProfileReconciler(support, stateMachine), + Reconciler: common.NewReconciler(support, stateMachine), } klog.V(log.I).InfoS("Reconciling in", "profile", profile.GetProfile()) diff --git a/controllers/profiles/dev/profile_dev_test.go b/controllers/profiles/dev/profile_dev_test.go index f8c2b3f7d..b09ec12e9 100644 --- a/controllers/profiles/dev/profile_dev_test.go +++ b/controllers/profiles/dev/profile_dev_test.go @@ -55,7 +55,7 @@ func Test_OverrideStartupProbe(t *testing.T) { client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -82,7 +82,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) { workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentFailureReason, "") client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - reconciler := NewProfileReconciler(client) + reconciler := NewProfileReconciler(client, test.NewFakeRecorder()) // we are in failed state and have no objects result, err := reconciler.Reconcile(context.TODO(), workflow) @@ -123,7 +123,7 @@ func Test_newDevProfile(t *testing.T) { client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -196,7 +196,7 @@ func Test_newDevProfile(t *testing.T) { func Test_devProfileImageDefaultsNoPlatform(t *testing.T) { workflow := test.GetBaseSonataFlowWithDevProfile(t.Name()) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -213,7 +213,7 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t *testing.T) { platform := test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -230,7 +230,7 @@ func Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing. platform := test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -247,7 +247,7 @@ func Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin platform := test.GetBasePlatformInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -266,7 +266,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T) { client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) camelXmlRouteFileName := "camelroute-xml" xmlRoute := ` @@ -380,7 +380,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) { client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, configMap).WithStatusSubresource(workflow, configMap).Build() - devReconciler := NewProfileReconciler(client) + devReconciler := NewProfileReconciler(client, test.NewFakeRecorder()) result, err := devReconciler.Reconcile(context.TODO(), workflow) assert.NoError(t, err) diff --git a/controllers/profiles/dev/states_dev.go b/controllers/profiles/dev/states_dev.go index 2726880da..e965547ef 100644 --- a/controllers/profiles/dev/states_dev.go +++ b/controllers/profiles/dev/states_dev.go @@ -135,6 +135,11 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora return ctrl.Result{RequeueAfter: constants.RequeueAfterIsRunning}, objs, nil } +func (e *ensureRunningWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { + //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error + return nil +} + type followWorkflowDeploymentState struct { *common.StateSupport enrichers *statusEnrichers @@ -145,7 +150,7 @@ func (f *followWorkflowDeploymentState) CanReconcile(workflow *operatorapi.Sonat } func (f *followWorkflowDeploymentState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) { - result, err := common.DeploymentHandler(f.C).SyncDeploymentStatus(ctx, workflow) + result, err := common.DeploymentManager(f.C).SyncDeploymentStatus(ctx, workflow) if err != nil { return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err } @@ -247,3 +252,8 @@ func (r *recoverFromFailureState) Do(ctx context.Context, workflow *operatorapi. } return ctrl.Result{RequeueAfter: constants.RequeueRecoverDeploymentErrorInterval}, nil, nil } + +func (r *recoverFromFailureState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { + //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error + return nil +} diff --git a/controllers/profiles/factory/factory.go b/controllers/profiles/factory/factory.go index 04a91f3e1..8cf279598 100644 --- a/controllers/profiles/factory/factory.go +++ b/controllers/profiles/factory/factory.go @@ -20,6 +20,7 @@ package factory import ( + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" @@ -35,7 +36,7 @@ const ( opsProfile metadata.ProfileType = "prod_for_ops" ) -type reconcilerBuilder func(client client.Client) profiles.ProfileReconciler +type reconcilerBuilder func(client client.Client, recorder record.EventRecorder) profiles.ProfileReconciler var profileBuilders = map[metadata.ProfileType]reconcilerBuilder{ metadata.ProdProfile: prod.NewProfileReconciler, @@ -58,6 +59,6 @@ func profileBuilder(workflow *operatorapi.SonataFlow) reconcilerBuilder { } // NewReconciler creates a new ProfileReconciler based on the given workflow and context. -func NewReconciler(client client.Client, workflow *operatorapi.SonataFlow) profiles.ProfileReconciler { - return profileBuilder(workflow)(client) +func NewReconciler(client client.Client, recorder record.EventRecorder, workflow *operatorapi.SonataFlow) profiles.ProfileReconciler { + return profileBuilder(workflow)(client, recorder) } diff --git a/controllers/profiles/prod/deployment_handler.go b/controllers/profiles/prod/deployment_handler.go index bf31f6a25..8bced1f52 100644 --- a/controllers/profiles/prod/deployment_handler.go +++ b/controllers/profiles/prod/deployment_handler.go @@ -31,23 +31,23 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/utils" ) -type deploymentHandler struct { +type deploymentReconciler struct { *common.StateSupport ensurers *objectEnsurers } -func newDeploymentHandler(stateSupport *common.StateSupport, ensurer *objectEnsurers) *deploymentHandler { - return &deploymentHandler{ +func newDeploymentReconciler(stateSupport *common.StateSupport, ensurer *objectEnsurers) *deploymentReconciler { + return &deploymentReconciler{ StateSupport: stateSupport, ensurers: ensurer, } } -func (d *deploymentHandler) handle(ctx context.Context, workflow *operatorapi.SonataFlow) (reconcile.Result, []client.Object, error) { - return d.handleWithImage(ctx, workflow, "") +func (d *deploymentReconciler) reconcile(ctx context.Context, workflow *operatorapi.SonataFlow) (reconcile.Result, []client.Object, error) { + return d.reconcileWithBuiltImage(ctx, workflow, "") } -func (d *deploymentHandler) handleWithImage(ctx context.Context, workflow *operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object, error) { +func (d *deploymentReconciler) reconcileWithBuiltImage(ctx context.Context, workflow *operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object, error) { pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace) propsCM, _, err := d.ensurers.propertiesConfigMap.Ensure(ctx, workflow, common.WorkflowPropertiesMutateVisitor(ctx, d.StateSupport.Catalog, workflow, pl)) if err != nil { @@ -86,7 +86,7 @@ func (d *deploymentHandler) handleWithImage(ctx context.Context, workflow *opera } // Follow deployment status - result, err := common.DeploymentHandler(d.C).SyncDeploymentStatus(ctx, workflow) + result, err := common.DeploymentManager(d.C).SyncDeploymentStatus(ctx, workflow) if err != nil { return reconcile.Result{Requeue: false}, nil, err } @@ -97,7 +97,7 @@ func (d *deploymentHandler) handleWithImage(ctx context.Context, workflow *opera return result, objs, nil } -func (d *deploymentHandler) getDeploymentMutateVisitors( +func (d *deploymentReconciler) getDeploymentMutateVisitors( workflow *operatorapi.SonataFlow, image string, configMap *v1.ConfigMap) []common.MutateVisitor { diff --git a/controllers/profiles/prod/deployment_handler_test.go b/controllers/profiles/prod/deployment_handler_test.go index bde141597..21513a4bc 100644 --- a/controllers/profiles/prod/deployment_handler_test.go +++ b/controllers/profiles/prod/deployment_handler_test.go @@ -33,9 +33,9 @@ func Test_CheckPodTemplateChangesReflectDeployment(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(client) - handler := newDeploymentHandler(stateSupport, newObjectEnsurers(stateSupport)) + handler := newDeploymentReconciler(stateSupport, newObjectEnsurers(stateSupport)) - result, objects, err := handler.handle(context.TODO(), workflow) + result, objects, err := handler.reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.NotEmpty(t, objects) assert.True(t, result.Requeue) @@ -44,7 +44,7 @@ func Test_CheckPodTemplateChangesReflectDeployment(t *testing.T) { expectedImg := "quay.io/apache/my-new-workflow:1.0.0" workflow.Spec.PodTemplate.Container.Image = expectedImg utilruntime.Must(client.Update(context.TODO(), workflow)) - result, objects, err = handler.handle(context.TODO(), workflow) + result, objects, err = handler.reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.NotEmpty(t, objects) assert.True(t, result.Requeue) diff --git a/controllers/profiles/prod/profile_prod.go b/controllers/profiles/prod/profile_prod.go index a6048f591..3254950f5 100644 --- a/controllers/profiles/prod/profile_prod.go +++ b/controllers/profiles/prod/profile_prod.go @@ -23,6 +23,7 @@ import ( "time" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" @@ -34,7 +35,7 @@ import ( var _ profiles.ProfileReconciler = &prodProfile{} type prodProfile struct { - common.BaseReconciler + common.Reconciler } const ( @@ -64,10 +65,11 @@ func newObjectEnsurers(support *common.StateSupport) *objectEnsurers { // NewProfileReconciler the default profile builder which includes a build state to run an internal build process // to have an immutable workflow image deployed -func NewProfileReconciler(client client.Client) profiles.ProfileReconciler { +func NewProfileReconciler(client client.Client, recorder record.EventRecorder) profiles.ProfileReconciler { support := &common.StateSupport{ - C: client, - Catalog: discovery.NewServiceCatalog(client), + C: client, + Catalog: discovery.NewServiceCatalog(client), + Recorder: recorder, } // the reconciliation state machine stateMachine := common.NewReconciliationStateMachine( @@ -76,7 +78,7 @@ func NewProfileReconciler(client client.Client) profiles.ProfileReconciler { &deployWithBuildWorkflowState{StateSupport: support, ensurers: newObjectEnsurers(support)}, ) reconciler := &prodProfile{ - BaseReconciler: common.NewBaseProfileReconciler(support, stateMachine), + Reconciler: common.NewReconciler(support, stateMachine), } return reconciler @@ -84,10 +86,11 @@ func NewProfileReconciler(client client.Client) profiles.ProfileReconciler { // NewProfileForOpsReconciler creates an alternative prod profile that won't require to build the workflow image in order to deploy // the workflow application. It assumes that the image has been built somewhere else. -func NewProfileForOpsReconciler(client client.Client) profiles.ProfileReconciler { +func NewProfileForOpsReconciler(client client.Client, recorder record.EventRecorder) profiles.ProfileReconciler { support := &common.StateSupport{ - C: client, - Catalog: discovery.NewServiceCatalog(client), + C: client, + Catalog: discovery.NewServiceCatalog(client), + Recorder: recorder, } // the reconciliation state machine stateMachine := common.NewReconciliationStateMachine( @@ -95,7 +98,7 @@ func NewProfileForOpsReconciler(client client.Client) profiles.ProfileReconciler &followDeployWorkflowState{StateSupport: support, ensurers: newObjectEnsurers(support)}, ) reconciler := &prodProfile{ - BaseReconciler: common.NewBaseProfileReconciler(support, stateMachine), + Reconciler: common.NewReconciler(support, stateMachine), } return reconciler diff --git a/controllers/profiles/prod/profile_prod_test.go b/controllers/profiles/prod/profile_prod_test.go index efa7041bb..689ac3f4e 100644 --- a/controllers/profiles/prod/profile_prod_test.go +++ b/controllers/profiles/prod/profile_prod_test.go @@ -24,17 +24,14 @@ import ( "testing" "time" - corev1 "k8s.io/api/core/v1" - + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common" - + "github.com/apache/incubator-kie-kogito-serverless-operator/test" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" clientruntime "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/apache/incubator-kie-kogito-serverless-operator/api" - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/test" ) func Test_Reconciler_ProdOps(t *testing.T) { @@ -47,18 +44,18 @@ func Test_Reconciler_ProdOps(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow). WithStatusSubresource(workflow, &operatorapi.SonataFlowBuild{}).Build() - result, err := NewProfileForOpsReconciler(client).Reconcile(context.TODO(), workflow) + result, err := NewProfileForOpsReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.NotNil(t, result.RequeueAfter) assert.True(t, workflow.Status.GetCondition(api.BuiltConditionType).IsFalse()) - assert.Equal(t, api.BuildSkipped, workflow.Status.GetCondition(api.BuiltConditionType).Reason) + assert.Equal(t, api.BuildSkippedReason, workflow.Status.GetCondition(api.BuiltConditionType).Reason) // We need the deployment controller to tell us that the workflow is ready // Since we don't have it in a mocked env, the result must be ready == false assert.False(t, workflow.Status.IsReady()) // Reconcile again to run the ddeployment handler - result, err = NewProfileForOpsReconciler(client).Reconcile(context.TODO(), workflow) + result, err = NewProfileForOpsReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) // Let's check for the right creation of the workflow (one CM volume, one container with a custom image) @@ -86,7 +83,7 @@ func Test_Reconciler_ProdCustomPod(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow, build, platform). WithStatusSubresource(workflow, build, platform).Build() - _, err := NewProfileReconciler(client).Reconcile(context.TODO(), workflow) + _, err := NewProfileReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) // Let's check for the right creation of the workflow (one CM volume, one container with a custom image) @@ -107,7 +104,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) { WithRuntimeObjects(workflow, platform). WithStatusSubresource(workflow, platform, &operatorapi.SonataFlowBuild{}).Build() - result, err := NewProfileReconciler(client).Reconcile(context.TODO(), workflow) + result, err := NewProfileReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.NotNil(t, result.RequeueAfter) @@ -115,7 +112,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) { assert.False(t, workflow.Status.IsReady()) // still building - result, err = NewProfileReconciler(client).Reconcile(context.TODO(), workflow) + result, err = NewProfileReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.Equal(t, requeueWhileWaitForBuild, result.RequeueAfter) assert.True(t, workflow.Status.IsBuildRunningOrUnknown()) @@ -128,15 +125,15 @@ func Test_reconcilerProdBuildConditions(t *testing.T) { assert.NoError(t, client.Status().Update(context.TODO(), build)) // last reconciliation cycle waiting for build - result, err = NewProfileReconciler(client).Reconcile(context.TODO(), workflow) + result, err = NewProfileReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.Equal(t, requeueWhileWaitForBuild, result.RequeueAfter) assert.False(t, workflow.Status.IsBuildRunningOrUnknown()) assert.False(t, workflow.Status.IsReady()) - assert.Equal(t, api.WaitingForBuildReason, workflow.Status.GetTopLevelCondition().Reason) + assert.Equal(t, api.WaitingForDeploymentReason, workflow.Status.GetTopLevelCondition().Reason) // now we create the objects - result, err = NewProfileReconciler(client).Reconcile(context.TODO(), workflow) + result, err = NewProfileReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.False(t, workflow.Status.IsBuildRunningOrUnknown()) assert.False(t, workflow.Status.IsReady()) @@ -154,7 +151,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) { err = client.Status().Update(context.TODO(), deployment) assert.NoError(t, err) - result, err = NewProfileReconciler(client).Reconcile(context.TODO(), workflow) + result, err = NewProfileReconciler(client, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) assert.False(t, workflow.Status.IsBuildRunningOrUnknown()) assert.True(t, workflow.Status.IsReady()) @@ -187,24 +184,6 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) { assert.NoError(t, err) assert.False(t, workflow.Status.IsReady()) assert.Equal(t, api.WaitingForDeploymentReason, workflow.Status.GetTopLevelCondition().Reason) - - // let's mess with the deployment - /* TODO the state should be able to enforce: https://issues.redhat.com/browse/KOGITO-8524 - deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort = 9090 - err = client.Update(context.TODO(), deployment) - assert.NoError(t, err) - result, objects, err = handler.Do(context.TODO(), workflow) - assert.True(t, result.Requeue) - assert.NoError(t, err) - assert.NotNil(t, result) - assert.Len(t, objects, 2) - // the reconciliation state should guarantee our port - deployment = &appsv1.Deployment{} - err = client.Get(context.TODO(), clientruntime.ObjectKeyFromObject(workflow), deployment) - assert.NoError(t, err) - assert.Equal(t, int32(8080), deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort) - */ - } func Test_GenerationAnnotationCheck(t *testing.T) { @@ -248,6 +227,7 @@ func Test_GenerationAnnotationCheck(t *testing.T) { func fakeReconcilerSupport(client clientruntime.Client) *common.StateSupport { return &common.StateSupport{ - C: client, + C: client, + Recorder: test.NewFakeRecorder(), } } diff --git a/controllers/profiles/prod/states_prod.go b/controllers/profiles/prod/states_prod.go index 9770e8ea2..feb30dca6 100644 --- a/controllers/profiles/prod/states_prod.go +++ b/controllers/profiles/prod/states_prod.go @@ -22,6 +22,7 @@ package prod import ( "context" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -56,6 +57,8 @@ func (h *newBuilderState) Do(ctx context.Context, workflow *operatorapi.SonataFl _, err = h.PerformStatusUpdate(ctx, workflow) return ctrl.Result{RequeueAfter: requeueWhileWaitForPlatform}, nil, err } + // We won't record events here to avoid spamming multiple events to the object, the status should alert the admin + // since a namespace without a platform means incorrect configuration. klog.V(log.E).ErrorS(err, "Failed to get active platform") return ctrl.Result{RequeueAfter: requeueWhileWaitForPlatform}, nil, err } @@ -66,7 +69,7 @@ func (h *newBuilderState) Do(ctx context.Context, workflow *operatorapi.SonataFl if err != nil { //If we are not able to retrieve or create a Build CR for this Workflow we will mark klog.V(log.E).ErrorS(err, "Failed to retrieve or create a Build CR") - workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.WaitingForBuildReason, + workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildFailedReason, "Failed to retrieve or create a Build CR", workflow.Namespace) _, err = h.PerformStatusUpdate(ctx, workflow) return ctrl.Result{}, nil, err @@ -76,20 +79,25 @@ func (h *newBuilderState) Do(ctx context.Context, workflow *operatorapi.SonataFl workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildIsRunningReason, "") workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForBuildReason, "") _, err = h.PerformStatusUpdate(ctx, workflow) + h.Recorder.Eventf(workflow, corev1.EventTypeNormal, api.BuildIsRunningReason, "Workflow %s build has started.", workflow.Name) } else { - // TODO: not ideal, but we will improve it on https://issues.redhat.com/browse/KOGITO-8792 - klog.V(log.I).InfoS("Build is in failed state, try to delete the SonataFlowBuild to restart a new build cycle") + klog.V(log.I).InfoS("Build is in failed state, you can mark the build to rebuild by setting to 'true' the ", "annotation", operatorapi.BuildRestartAnnotation) } return ctrl.Result{RequeueAfter: requeueAfterStartingBuild}, nil, err } +func (h *newBuilderState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { + //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error + return nil +} + type followBuildStatusState struct { *common.StateSupport } func (h *followBuildStatusState) CanReconcile(workflow *operatorapi.SonataFlow) bool { - return workflow.Status.IsBuildRunningOrUnknown() + return workflow.Status.IsBuildRunningOrUnknown() || workflow.Status.IsWaitingForBuild() } func (h *followBuildStatusState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) { @@ -101,24 +109,32 @@ func (h *followBuildStatusState) Do(ctx context.Context, workflow *operatorapi.S if _, err = h.PerformStatusUpdate(ctx, workflow); err != nil { return ctrl.Result{}, nil, err } - return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, nil + return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err } if build.Status.BuildPhase == operatorapi.BuildPhaseSucceeded { klog.V(log.I).InfoS("Workflow build has finished") + if workflow.Status.IsReady() { + // Rollout our deployment to take the latest changes in the new image. + if err := common.DeploymentManager(h.C).RolloutDeployment(ctx, workflow); err != nil { + return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err + } + h.Recorder.Eventf(workflow, corev1.EventTypeNormal, api.WaitingForDeploymentReason, "Rolling out workflow %s deployment.", workflow.Name) + } + workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "Build has finished, rolling out deployment") //If we have finished a build and the workflow is not running, we will start the provisioning phase workflow.Status.Manager().MarkTrue(api.BuiltConditionType) _, err = h.PerformStatusUpdate(ctx, workflow) + h.Recorder.Eventf(workflow, corev1.EventTypeNormal, api.BuildSuccessfulReason, "Workflow %s build has been finished successfully.", workflow.Name) } else if build.Status.BuildPhase == operatorapi.BuildPhaseFailed || build.Status.BuildPhase == operatorapi.BuildPhaseError { - // TODO: we should handle build failures https://issues.redhat.com/browse/KOGITO-8792 - // TODO: ideally, we can have a configuration per platform of how many attempts we try to rebuild - // TODO: to rebuild, just do buildManager.MarkToRestart. The controller will then try to rebuild the workflow. workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildFailedReason, "Workflow %s build failed. Error: %s", workflow.Name, build.Status.Error) _, err = h.PerformStatusUpdate(ctx, workflow) + h.Recorder.Eventf(workflow, corev1.EventTypeWarning, api.BuildFailedReason, "Workflow %s build has failed. Error: %s", workflow.Name, build.Status.Error) } else if build.Status.BuildPhase == operatorapi.BuildPhaseRunning && !workflow.Status.IsBuildRunning() { workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildIsRunningReason, "") _, err = h.PerformStatusUpdate(ctx, workflow) + h.Recorder.Eventf(workflow, corev1.EventTypeNormal, api.BuildIsRunningReason, "Workflow %s build is running.", workflow.Name) } if err != nil { @@ -128,6 +144,11 @@ func (h *followBuildStatusState) Do(ctx context.Context, workflow *operatorapi.S return ctrl.Result{RequeueAfter: requeueWhileWaitForBuild}, nil, nil } +func (h *followBuildStatusState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { + //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error + return nil +} + type deployWithBuildWorkflowState struct { *common.StateSupport ensurers *objectEnsurers @@ -157,18 +178,23 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato } if h.isWorkflowChanged(workflow) { // Let's check that the 2 resWorkflowDef definition are different - workflow.Status.Manager().MarkUnknown(api.RunningConditionType, "", "") if err = buildManager.MarkToRestart(build); err != nil { return ctrl.Result{}, nil, err } - workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildIsRunningReason, "Marked to restart") + workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildIsRunningReason, "Build marked to restart") workflow.Status.Manager().MarkUnknown(api.RunningConditionType, "", "") _, err = h.PerformStatusUpdate(ctx, workflow) + h.Recorder.Eventf(workflow, corev1.EventTypeNormal, api.BuildMarkedToRestartReason, "Workflow %s will start a new build.", workflow.Name) return ctrl.Result{Requeue: false}, nil, err } // didn't change, business as usual - return newDeploymentHandler(h.StateSupport, h.ensurers).handleWithImage(ctx, workflow, build.Status.ImageTag) + return newDeploymentReconciler(h.StateSupport, h.ensurers).reconcileWithBuiltImage(ctx, workflow, build.Status.ImageTag) +} + +func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { + //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error + return nil } // isWorkflowChanged marks the workflow status as unknown to require a new build reconciliation diff --git a/controllers/profiles/prod/states_prod_nobuild.go b/controllers/profiles/prod/states_prod_nobuild.go index b80b58c93..46449ce1f 100644 --- a/controllers/profiles/prod/states_prod_nobuild.go +++ b/controllers/profiles/prod/states_prod_nobuild.go @@ -33,12 +33,12 @@ type ensureBuildSkipped struct { func (f *ensureBuildSkipped) CanReconcile(workflow *operatorapi.SonataFlow) bool { return workflow.Status.GetCondition(api.BuiltConditionType).IsUnknown() || workflow.Status.GetCondition(api.BuiltConditionType).IsTrue() || - workflow.Status.GetCondition(api.BuiltConditionType).Reason != api.BuildSkipped + workflow.Status.GetCondition(api.BuiltConditionType).Reason != api.BuildSkippedReason } func (f *ensureBuildSkipped) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) { // We skip the build, so let's ensure the status reflect that - workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildSkipped, "") + workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildSkippedReason, "") if _, err := f.PerformStatusUpdate(ctx, workflow); err != nil { return ctrl.Result{Requeue: false}, nil, err } @@ -46,6 +46,11 @@ func (f *ensureBuildSkipped) Do(ctx context.Context, workflow *operatorapi.Sonat return ctrl.Result{Requeue: true}, nil, nil } +func (f *ensureBuildSkipped) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { + //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error + return nil +} + type followDeployWorkflowState struct { *common.StateSupport ensurers *objectEnsurers @@ -53,9 +58,14 @@ type followDeployWorkflowState struct { func (f *followDeployWorkflowState) CanReconcile(workflow *operatorapi.SonataFlow) bool { // we always reconcile since in this flow we don't mind building anything, just reconcile the deployment state - return workflow.Status.GetCondition(api.BuiltConditionType).Reason == api.BuildSkipped + return workflow.Status.GetCondition(api.BuiltConditionType).Reason == api.BuildSkippedReason } func (f *followDeployWorkflowState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) { - return newDeploymentHandler(f.StateSupport, f.ensurers).handle(ctx, workflow) + return newDeploymentReconciler(f.StateSupport, f.ensurers).reconcile(ctx, workflow) +} + +func (f *followDeployWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error { + //By default, we don't want to perform anything after the reconciliation, and so we will simply return no error + return nil } diff --git a/controllers/sonataflow_controller.go b/controllers/sonataflow_controller.go index c8afde210..4e6bf7457 100644 --- a/controllers/sonataflow_controller.go +++ b/controllers/sonataflow_controller.go @@ -95,7 +95,7 @@ func (r *SonataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) return reconcile.Result{}, nil } - return profiles.NewReconciler(r.Client, workflow).Reconcile(ctx, workflow) + return profiles.NewReconciler(r.Client, r.Recorder, workflow).Reconcile(ctx, workflow) } func platformEnqueueRequestsFromMapFunc(c client.Client, p *operatorapi.SonataFlowPlatform) []reconcile.Request { @@ -135,7 +135,6 @@ func buildEnqueueRequestsFromMapFunc(c client.Client, b *operatorapi.SonataFlowB var requests []reconcile.Request if b.Status.BuildPhase == operatorapi.BuildPhaseSucceeded { - // Fetch the Workflow instance workflow := &operatorapi.SonataFlow{} namespacedName := types.NamespacedName{ diff --git a/controllers/sonataflow_controller_test.go b/controllers/sonataflow_controller_test.go index ebb5c868c..fd9d04e71 100644 --- a/controllers/sonataflow_controller_test.go +++ b/controllers/sonataflow_controller_test.go @@ -49,7 +49,7 @@ func TestSonataFlowController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(objs...).WithStatusSubresource(ksw, ksp).Build() // Create a SonataFlowReconciler object with the scheme and fake client. - r := &SonataFlowReconciler{Client: cl, Scheme: cl.Scheme()} + r := &SonataFlowReconciler{Client: cl, Scheme: cl.Scheme(), Recorder: test.NewFakeRecorder()} // Mock request to simulate Reconcile() being called on an event for a // watched resource . diff --git a/controllers/sonataflowbuild_controller.go b/controllers/sonataflowbuild_controller.go index 81ff9c095..e9b98a95a 100644 --- a/controllers/sonataflowbuild_controller.go +++ b/controllers/sonataflowbuild_controller.go @@ -25,6 +25,8 @@ import ( "reflect" "time" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflows" + kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" "k8s.io/klog/v2" buildv1 "github.com/openshift/api/build/v1" @@ -87,19 +89,15 @@ func (r *SonataFlowBuildReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } - if phase == operatorapi.BuildPhaseNone { - if err = buildManager.Schedule(build); err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{RequeueAfter: requeueAfterForNewBuild}, r.manageStatusUpdate(ctx, build) - // TODO: this smells, why not just else? review in the future: https://issues.redhat.com/browse/KOGITO-8785 + if phase == operatorapi.BuildPhaseNone || kubeutil.GetAnnotationAsBool(build, operatorapi.BuildRestartAnnotation) { + return r.scheduleNewBuild(ctx, buildManager, build) } else if phase != operatorapi.BuildPhaseSucceeded && phase != operatorapi.BuildPhaseError && phase != operatorapi.BuildPhaseFailed { beforeReconcileStatus := build.Status.DeepCopy() if err = buildManager.Reconcile(build); err != nil { return ctrl.Result{}, err } if !reflect.DeepEqual(build.Status, beforeReconcileStatus) { - if err = r.manageStatusUpdate(ctx, build); err != nil { + if err = r.manageStatusUpdate(ctx, build, beforeReconcileStatus.BuildPhase); err != nil { return ctrl.Result{}, err } } @@ -109,10 +107,37 @@ func (r *SonataFlowBuildReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } -func (r *SonataFlowBuildReconciler) manageStatusUpdate(ctx context.Context, instance *operatorapi.SonataFlowBuild) error { +func (r *SonataFlowBuildReconciler) scheduleNewBuild(ctx context.Context, buildManager builder.BuildManager, build *operatorapi.SonataFlowBuild) (ctrl.Result, error) { + if err := buildManager.Schedule(build); err != nil { + return ctrl.Result{}, err + } + if err := r.manageStatusUpdate(ctx, build, ""); err != nil { + return ctrl.Result{}, err + } + if kubeutil.GetAnnotationAsBool(build, operatorapi.BuildRestartAnnotation) { + // Remove restart annotation to not enter in infinity reconciliation loop + kubeutil.SetAnnotation(build, operatorapi.BuildRestartAnnotation, "false") + if err := r.Update(ctx, build); err != nil { + return ctrl.Result{}, err + } + // Signals to the workflow that we are rebuilding + workflowManager, err := workflows.NewManager(r.Client, ctx, build.Namespace, build.Name) + if err != nil { + return ctrl.Result{}, err + } + if err := workflowManager.SetBuiltStatusToRunning("Build marked to restart"); err != nil { + return ctrl.Result{}, err + } + } + + return ctrl.Result{RequeueAfter: requeueAfterForNewBuild}, nil +} + +func (r *SonataFlowBuildReconciler) manageStatusUpdate(ctx context.Context, instance *operatorapi.SonataFlowBuild, beforeReconcilePhase operatorapi.BuildPhase) error { err := r.Status().Update(ctx, instance) - if err == nil { - r.Recorder.Event(instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated buildphase to %s", instance.Status.BuildPhase)) + // Don't need to spam events if the phase hasn't changed + if err == nil && beforeReconcilePhase != instance.Status.BuildPhase { + r.Recorder.Event(instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated buildphase to %s", instance.Status.BuildPhase)) } return err } diff --git a/controllers/sonataflowbuild_controller_test.go b/controllers/sonataflowbuild_controller_test.go index 53b10958b..6ec69a6ec 100644 --- a/controllers/sonataflowbuild_controller_test.go +++ b/controllers/sonataflowbuild_controller_test.go @@ -111,3 +111,30 @@ func TestSonataFlowBuildController_WithArgsAndEnv(t *testing.T) { assert.Len(t, containerBuild.Spec.Tasks[0].Kaniko.AdditionalFlags, 1) assert.Len(t, containerBuild.Spec.Tasks[0].Kaniko.Envs, 1) } + +func TestSonataFlowBuildController_MarkToRestart(t *testing.T) { + namespace := t.Name() + ksw := test.GetBaseSonataFlow(namespace) + ksb := test.GetNewEmptySonataFlowBuild(ksw.Name, namespace) + ksb.Annotations = map[string]string{operatorapi.BuildRestartAnnotation: "true"} + + cl := test.NewSonataFlowClientBuilder(). + WithRuntimeObjects(ksb, ksw). + WithRuntimeObjects(test.GetBasePlatformInReadyPhase(namespace)). + WithRuntimeObjects(test.GetSonataFlowBuilderConfig(namespace)). + WithStatusSubresource(ksb, ksw). + Build() + + r := &SonataFlowBuildReconciler{cl, cl.Scheme(), &record.FakeRecorder{}, &rest.Config{}} + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: ksb.Name, + Namespace: ksb.Namespace, + }, + } + + _, err := r.Reconcile(context.TODO(), req) + assert.NoError(t, err) + ksb = test.MustGetBuild(t, cl, types.NamespacedName{Name: ksb.Name, Namespace: namespace}) + assert.Equal(t, "false", ksb.Annotations[operatorapi.BuildRestartAnnotation]) +} diff --git a/controllers/workflows/workflows.go b/controllers/workflows/workflows.go new file mode 100644 index 000000000..d3b4c27b3 --- /dev/null +++ b/controllers/workflows/workflows.go @@ -0,0 +1,60 @@ +// Copyright 2023 Apache Software Foundation (ASF) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workflows + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ WorkflowManager = &workflowManager{} + +// WorkflowManager offers a management interface for operations with SonataFlows instances outside the controller's package. +// Meant to be used by other packages that don't have access to a SonataFlow instance coming from a reconciliation cycle. +type WorkflowManager interface { + SetBuiltStatusToRunning(message string) error + GetWorkflow() *v1alpha08.SonataFlow +} + +type workflowManager struct { + workflow *v1alpha08.SonataFlow + client client.Client + ctx context.Context +} + +func (w *workflowManager) GetWorkflow() *v1alpha08.SonataFlow { + return w.workflow +} + +func (w *workflowManager) SetBuiltStatusToRunning(message string) error { + w.workflow.Status.Manager().MarkFalse(api.BuiltConditionType, api.BuildIsRunningReason, message) + return w.client.Status().Update(w.ctx, w.workflow) +} + +func NewManager(client client.Client, ctx context.Context, ns, name string) (WorkflowManager, error) { + workflow := &v1alpha08.SonataFlow{} + if err := client.Get(ctx, types.NamespacedName{Name: name, Namespace: ns}, workflow); err != nil { + return nil, err + } + return &workflowManager{ + workflow: workflow, + client: client, + ctx: ctx, + }, nil +} diff --git a/test/kubernetes_cli.go b/test/kubernetes_cli.go index 716542c49..c4d7021f8 100644 --- a/test/kubernetes_cli.go +++ b/test/kubernetes_cli.go @@ -32,12 +32,17 @@ import ( "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" ) +func NewFakeRecorder() record.EventRecorder { + return record.NewFakeRecorder(10) +} + // NewSonataFlowClientBuilder creates a new fake.ClientBuilder with the right scheme references func NewSonataFlowClientBuilder() *fake.ClientBuilder { s := scheme.Scheme @@ -78,6 +83,13 @@ func MustGetWorkflow(t *testing.T, client ctrl.WithWatch, name types.NamespacedN return mustGet(t, client, workflow, workflow).(*operatorapi.SonataFlow) } +func MustGetBuild(t *testing.T, client ctrl.WithWatch, name types.NamespacedName) *operatorapi.SonataFlowBuild { + build := &operatorapi.SonataFlowBuild{} + err := client.Get(context.TODO(), name, build) + assert.NoError(t, err) + return build +} + func mustGet(t *testing.T, client ctrl.WithWatch, workflow *operatorapi.SonataFlow, obj ctrl.Object) ctrl.Object { err := client.Get(context.TODO(), ctrl.ObjectKeyFromObject(workflow), obj) assert.NoError(t, err) diff --git a/test/mock_service.go b/test/mock_service.go index 6109cce37..2003c5d6c 100644 --- a/test/mock_service.go +++ b/test/mock_service.go @@ -22,8 +22,6 @@ package test import ( "context" - "k8s.io/klog/v2" - oappsv1 "github.com/openshift/api/apps/v1" buildv1 "github.com/openshift/api/build/v1" consolev1 "github.com/openshift/api/console/v1" @@ -35,10 +33,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" clientv1 "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - apiv08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/log" ) type MockPlatformService struct { @@ -56,10 +50,6 @@ type MockPlatformService struct { StatusFunc func() clientv1.StatusWriter } -func MockService() *MockPlatformService { - return MockServiceWithExtraScheme() -} - var knownTypes = map[schema.GroupVersion][]runtime.Object{ corev1.SchemeGroupVersion: { &corev1.PersistentVolumeClaim{}, @@ -106,54 +96,6 @@ var knownTypes = map[schema.GroupVersion][]runtime.Object{ }, } -func MockServiceWithExtraScheme(objs ...runtime.Object) *MockPlatformService { - registerObjs := []runtime.Object{&apiv08.SonataFlow{}, &apiv08.SonataFlowList{}} - registerObjs = append(registerObjs, objs...) - apiv08.SchemeBuilder.Register(registerObjs...) - scheme, _ := apiv08.SchemeBuilder.Build() - for gv, types := range knownTypes { - for _, t := range types { - scheme.AddKnownTypes(gv, t) - } - } - client := fake.NewFakeClientWithScheme(scheme) - klog.V(log.D).InfoS("Fake client created", "client", client) - return &MockPlatformService{ - Client: client, - scheme: scheme, - CreateFunc: func(ctx context.Context, obj clientv1.Object, opts ...clientv1.CreateOption) error { - return client.Create(ctx, obj, opts...) - }, - DeleteFunc: func(ctx context.Context, obj clientv1.Object, opts ...clientv1.DeleteOption) error { - return client.Delete(ctx, obj, opts...) - }, - GetFunc: func(ctx context.Context, key clientv1.ObjectKey, obj clientv1.Object) error { - return client.Get(ctx, key, obj) - }, - ListFunc: func(ctx context.Context, list clientv1.ObjectList, opts ...clientv1.ListOption) error { - return client.List(ctx, list, opts...) - }, - UpdateFunc: func(ctx context.Context, obj clientv1.Object, opts ...clientv1.UpdateOption) error { - return client.Update(ctx, obj, opts...) - }, - PatchFunc: func(ctx context.Context, obj clientv1.Object, patch clientv1.Patch, opts ...clientv1.PatchOption) error { - return client.Patch(ctx, obj, patch, opts...) - }, - DeleteAllOfFunc: func(ctx context.Context, obj clientv1.Object, opts ...clientv1.DeleteAllOfOption) error { - return client.DeleteAllOf(ctx, obj, opts...) - }, - GetCachedFunc: func(ctx context.Context, key clientv1.ObjectKey, obj clientv1.Object) error { - return client.Get(ctx, key, obj) - }, - GetSchemeFunc: func() *runtime.Scheme { - return scheme - }, - StatusFunc: func() clientv1.StatusWriter { - return client.Status() - }, - } -} - func (service *MockPlatformService) Create(ctx context.Context, obj clientv1.Object, opts ...clientv1.CreateOption) error { return service.CreateFunc(ctx, obj, opts...) } diff --git a/utils/kubernetes/annotations.go b/utils/kubernetes/annotations.go index e3bac5eb3..24d9ba4ab 100644 --- a/utils/kubernetes/annotations.go +++ b/utils/kubernetes/annotations.go @@ -21,6 +21,7 @@ package kubernetes import ( "context" + "strconv" "k8s.io/klog/v2" @@ -45,3 +46,25 @@ func GetLastGeneration(namespace string, name string, c client.Client, ctx conte workflow := getWorkflow(namespace, name, c, ctx) return workflow.Generation } + +// GetAnnotationAsBool returns the boolean value from the given annotation. +// If the annotation is not present or is there an error in the ParseBool conversion, returns false. +func GetAnnotationAsBool(object client.Object, key string) bool { + if object.GetAnnotations() != nil { + b, err := strconv.ParseBool(object.GetAnnotations()[key]) + if err != nil { + return false + } + return b + } + return false +} + +// SetAnnotation Safely set the annotation to the object +func SetAnnotation(object client.Object, key, value string) { + if object.GetAnnotations() != nil { + object.GetAnnotations()[key] = value + } else { + object.SetAnnotations(map[string]string{key: value}) + } +}