From 86a5975c7024eda9f29b8e40affec7161cb67e88 Mon Sep 17 00:00:00 2001 From: Jianrong Zhang Date: Fri, 13 Sep 2024 10:37:43 -0400 Subject: [PATCH] [issue-464] Create a Prometheus ServiceMonitor object that can capture/collect metrics from deployed SonataFlow instances --- .github/workflows/e2e.yml | 4 + Makefile | 17 +- api/v1alpha08/sonataflowplatform_types.go | 13 + api/v1alpha08/zz_generated.deepcopy.go | 20 ++ .../sonataflow.org_sonataflowplatforms.yaml | 8 + cmd/main.go | 5 +- .../sonataflow.org_sonataflowplatforms.yaml | 8 + config/rbac/role.yaml | 11 + go.work.sum | 2 + internal/controller/knative/knative.go | 19 +- internal/controller/monitoring/monitoring.go | 52 ++++ .../profiles/common/object_creators.go | 33 +- .../profiles/common/object_creators_test.go | 28 ++ .../controller/profiles/dev/profile_dev.go | 3 + .../profiles/dev/profile_dev_test.go | 19 +- .../controller/profiles/dev/states_dev.go | 9 + .../profiles/gitops/profile_gitops_test.go | 4 +- .../profiles/monitoring/monitoring.go | 65 ++++ .../profiles/preview/deployment_handler.go | 14 +- .../preview/deployment_handler_test.go | 10 +- .../profiles/preview/profile_preview.go | 18 +- .../profiles/preview/profile_preview_test.go | 29 +- internal/controller/sonataflow_controller.go | 11 + .../sonataflowplatform_controller_test.go | 5 +- operator.yaml | 19 ++ test/e2e/e2e_suite_test.go | 4 + test/e2e/helpers.go | 105 ++++++- .../k8s_deployment/01-postgres.yaml | 86 ++++++ .../k8s_deployment/02-default-broker.yaml | 18 ++ .../03-sonataflow_platform.yaml | 67 ++++ ...configmap_callbackstatetimeouts-props.yaml | 28 ++ ...6-sonataflow_callbackstatetimeouts.sw.yaml | 107 +++++++ .../k8s_deployment/kustomization.yaml | 35 +++ .../knative_service/01-postgres.yaml | 86 ++++++ .../knative_service/02-default-broker.yaml | 18 ++ .../03-sonataflow_platform.yaml | 67 ++++ ...configmap_callbackstatetimeouts-props.yaml | 28 ++ ...6-sonataflow_callbackstatetimeouts.sw.yaml | 114 +++++++ .../knative_service/kustomization.yaml | 35 +++ test/e2e/workflow_test.go | 289 ++++++++++++------ test/kubernetes_cli.go | 2 + test/testdata/grafana.yaml | 26 ++ test/testdata/prometheus.yaml | 69 +++++ test/yaml.go | 3 +- utils/client.go | 22 +- utils/kubernetes/security.go | 3 +- 46 files changed, 1480 insertions(+), 158 deletions(-) create mode 100644 internal/controller/monitoring/monitoring.go create mode 100644 internal/controller/profiles/monitoring/monitoring.go create mode 100644 test/e2e/testdata/workflows/prometheus/k8s_deployment/01-postgres.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/k8s_deployment/02-default-broker.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/k8s_deployment/03-sonataflow_platform.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/k8s_deployment/05-configmap_callbackstatetimeouts-props.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/k8s_deployment/06-sonataflow_callbackstatetimeouts.sw.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/k8s_deployment/kustomization.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/knative_service/01-postgres.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/knative_service/02-default-broker.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/knative_service/03-sonataflow_platform.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/knative_service/05-configmap_callbackstatetimeouts-props.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/knative_service/06-sonataflow_callbackstatetimeouts.sw.yaml create mode 100644 test/e2e/testdata/workflows/prometheus/knative_service/kustomization.yaml create mode 100644 test/testdata/grafana.yaml create mode 100644 test/testdata/prometheus.yaml diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index ec598cc88..32a4ae0c9 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -17,6 +17,7 @@ env: PYTHON_VERSION: "3.10" KIND_VERSION: v0.20.0 KNATIVE_VERSION: v1.12.5 + PROMETHEUS_VERSION: v0.70.0 OPERATOR_IMAGE_NAME: "127.0.0.1:5001/kogito-serverless-operator:0.0.1" jobs: @@ -68,6 +69,9 @@ jobs: - name: Deploy Knative Eventing and Serving run: make KNATIVE_VERSION=${{ env.KNATIVE_VERSION }} deploy-knative + - name: Deploy Prometheus + run: make PROMETHEUS_VERSION=${{ env.PROMETHEUS_VERSION }} deploy-prometheus + - name: Set OPERATOR_IMAGE_NAME to Point to Kind's Local Registry run: echo "OPERATOR_IMAGE_NAME=${{ env.OPERATOR_IMAGE_NAME }}" >> $GITHUB_ENV diff --git a/Makefile b/Makefile index a768f93d3..ba1baa070 100644 --- a/Makefile +++ b/Makefile @@ -123,8 +123,7 @@ test: manifests generate envtest test-api ## Run tests. @$(MAKE) vet @$(MAKE) fmt @echo "🔍 Running controller tests..." - @KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" \ - go test $(shell go list ./... | grep -v /test/) -coverprofile cover.out > /dev/null 2>&1 + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $(shell go list ./... | grep -v /test/) -coverprofile cover.out @echo "✅ Tests completed successfully. Coverage report generated: cover.out." .PHONY: test-api @@ -264,6 +263,8 @@ GOLANGCI_LINT_VERSION ?= v1.57.2 KIND_VERSION ?= v0.20.0 KNATIVE_VERSION ?= v1.13.2 TIMEOUT_SECS ?= 180s +PROMETHEUS_VERSION ?= v0.70.0 +GRAFANA_VERSION ?= v5.13.0 KNATIVE_SERVING_PREFIX ?= "https://github.com/knative/serving/releases/download/knative-$(KNATIVE_VERSION)" KNATIVE_EVENTING_PREFIX ?= "https://github.com/knative/eventing/releases/download/knative-$(KNATIVE_VERSION)" @@ -450,6 +451,18 @@ deploy-knative: kubectl wait --for=condition=Ready=True KnativeServing/knative-serving -n knative-serving --timeout=$(TIMEOUT_SECS) kubectl wait --for=condition=Ready=True KnativeEventing/knative-eventing -n knative-eventing --timeout=$(TIMEOUT_SECS) +.PHONY: deploy-prometheus +deploy-prometheus: create-cluster + kubectl create -f https://github.com/prometheus-operator/prometheus-operator/releases/download/$(PROMETHEUS_VERSION)/bundle.yaml + kubectl wait --for=condition=Available=True deploy/prometheus-operator -n default --timeout=$(TIMEOUT_SECS) + kubectl apply -f ./test/testdata/prometheus.yaml + kubectl wait --for=condition=Available=True prometheus/prometheus -n default --timeout=$(TIMEOUT_SECS) + +.PHONY: deploy-grafana +deploy-grafana: create-cluster + kubectl create -f https://github.com/grafana/grafana-operator/releases/download/$(GRAFANA_VERSION)/kustomize-cluster_scoped.yaml + kubectl wait --for=condition=Available=True deploy/grafana-operator-controller-manager -n grafana --timeout=$(TIMEOUT_SECS) + .PHONY: delete-cluster delete-cluster: install-kind kind delete cluster && $(BUILDER) rm -f kind-registry diff --git a/api/v1alpha08/sonataflowplatform_types.go b/api/v1alpha08/sonataflowplatform_types.go index e1a009e75..b940ee8a1 100644 --- a/api/v1alpha08/sonataflowplatform_types.go +++ b/api/v1alpha08/sonataflowplatform_types.go @@ -63,6 +63,10 @@ type SonataFlowPlatformSpec struct { // These properties MAY NOT be propagated to a SonataFlowClusterPlatform since PropertyVarSource can only refer local context sources. // +optional Properties *PropertyPlatformSpec `json:"properties,omitempty"` + // Settings for Prometheus monitoring + // +optional + // +default: false + Monitoring *PlatformMonitoringOptionsSpec `json:"monitoring,omitempty"` } // PlatformEventingSpec specifies the Knative Eventing integration details in the platform. @@ -74,6 +78,15 @@ type PlatformEventingSpec struct { Broker *duckv1.Destination `json:"broker,omitempty"` } +// PlatformMonitoringOptionsSpec specifies the settings for monitoring +// +k8s:openapi-gen=true +type PlatformMonitoringOptionsSpec struct { + // Enabled indicates whether monitoring with Prometheus metrics is enabled + // +optional + // +default: false + Enabled bool `json:"enabled,omitempty"` +} + // PlatformCluster is the kind of orchestration cluster the platform is installed into // +kubebuilder:validation:Enum=kubernetes;openshift type PlatformCluster string diff --git a/api/v1alpha08/zz_generated.deepcopy.go b/api/v1alpha08/zz_generated.deepcopy.go index b5a16a466..5cb3bdbf3 100644 --- a/api/v1alpha08/zz_generated.deepcopy.go +++ b/api/v1alpha08/zz_generated.deepcopy.go @@ -449,6 +449,21 @@ func (in *PlatformEventingSpec) DeepCopy() *PlatformEventingSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PlatformMonitoringOptionsSpec) DeepCopyInto(out *PlatformMonitoringOptionsSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PlatformMonitoringOptionsSpec. +func (in *PlatformMonitoringOptionsSpec) DeepCopy() *PlatformMonitoringOptionsSpec { + if in == nil { + return nil + } + out := new(PlatformMonitoringOptionsSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PlatformPersistenceOptionsSpec) DeepCopyInto(out *PlatformPersistenceOptionsSpec) { *out = *in @@ -1289,6 +1304,11 @@ func (in *SonataFlowPlatformSpec) DeepCopyInto(out *SonataFlowPlatformSpec) { *out = new(PropertyPlatformSpec) (*in).DeepCopyInto(*out) } + if in.Monitoring != nil { + in, out := &in.Monitoring, &out.Monitoring + *out = new(PlatformMonitoringOptionsSpec) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowPlatformSpec. diff --git a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml index 6f83ab0ae..975327dab 100644 --- a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml +++ b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml @@ -503,6 +503,14 @@ spec: type: string type: object type: object + monitoring: + description: Settings for Prometheus monitoring + properties: + enabled: + description: Enabled indicates whether monitoring with Prometheus + metrics is enabled + type: boolean + type: object persistence: description: |- Persistence defines the platform persistence configuration. When this field is set, diff --git a/cmd/main.go b/cmd/main.go index 7d625360e..8a5bce673 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -27,14 +27,14 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/cfg" "github.com/apache/incubator-kie-kogito-serverless-operator/version" + prometheus "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + "k8s.io/klog/v2/klogr" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" servingv1 "knative.dev/serving/pkg/apis/serving/v1" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" - "k8s.io/klog/v2/klogr" - "k8s.io/klog/v2" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" @@ -66,6 +66,7 @@ func init() { utilruntime.Must(sourcesv1.AddToScheme(scheme)) utilruntime.Must(eventingv1.AddToScheme(scheme)) utilruntime.Must(servingv1.AddToScheme(scheme)) + utilruntime.Must(prometheus.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } diff --git a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml index 762533cb7..ecda82523 100644 --- a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml +++ b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml @@ -503,6 +503,14 @@ spec: type: string type: object type: object + monitoring: + description: Settings for Prometheus monitoring + properties: + enabled: + description: Enabled indicates whether monitoring with Prometheus + metrics is enabled + type: boolean + type: object persistence: description: |- Persistence defines the platform persistence configuration. When this field is set, diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 66e33fa72..521895f8a 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -4,6 +4,17 @@ kind: ClusterRole metadata: name: manager-role rules: +- apiGroups: + - monitoring.coreos.com + resources: + - servicemonitors + verbs: + - create + - delete + - get + - list + - update + - watch - apiGroups: - sonataflow.org resources: diff --git a/go.work.sum b/go.work.sum index 0ce649c2f..3f5892fea 100644 --- a/go.work.sum +++ b/go.work.sum @@ -2174,6 +2174,7 @@ github.com/google/go-containerregistry/pkg/authn/kubernetes v0.0.0-2023020916533 github.com/google/go-containerregistry/pkg/authn/kubernetes v0.0.0-20230209165335-3624968304fd/go.mod h1:6pjZpt+0dg+Z0kUEn53qLtD57raiZo/bqWzsuX6dDjo= github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY= github.com/google/go-github/v27 v27.0.6 h1:oiOZuBmGHvrGM1X9uNUAUlLgp5r1UUO/M/KnbHnLRlQ= +github.com/google/go-jsonnet v0.18.0/go.mod h1:C3fTzyVJDslXdiTqw/bTFk7vSGyCtH3MGRbDfvEwGd0= github.com/google/go-pkcs11 v0.2.1-0.20230907215043-c6f79328ddf9 h1:OF1IPgv+F4NmqmJ98KTjdN97Vs1JxDPB3vbmYzV2dpk= github.com/google/go-pkcs11 v0.2.1-0.20230907215043-c6f79328ddf9/go.mod h1:6eQoGcuNJpa7jnd5pMGdkSaQpNDYvPlXWMcjXXThLlY= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= @@ -2466,6 +2467,7 @@ github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0 github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA= github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY= github.com/operator-framework/api v0.1.1 h1:DbfxRJUPMQlQW6nbfoNzWLxv1rIv13Gt8GbsF2aglFk= +github.com/operator-framework/operator-lib v0.11.0/go.mod h1:RpyKhFAoG6DmKTDIwMuO6pI3LRc8IE9rxEYWy476o6g= github.com/operator-framework/operator-registry v1.6.1 h1:Ow0Ko9DRIZ4xvH55vFAslcTy6A9FhlIeXvm+FhyRd84= github.com/otiai10/copy v1.0.2 h1:DDNipYy6RkIkjMwy+AWzgKiNTyj2RUI9yEMeETEpVyc= github.com/otiai10/curr v0.0.0-20190513014714-f5a3d24e5776 h1:o59bHXu8Ejas8Kq6pjoVJQ9/neN66SM8AKh6wI42BBs= diff --git a/internal/controller/knative/knative.go b/internal/controller/knative/knative.go index c99942cf8..48aabd1ea 100644 --- a/internal/controller/knative/knative.go +++ b/internal/controller/knative/knative.go @@ -30,7 +30,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/discovery" "k8s.io/client-go/rest" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" @@ -44,7 +43,6 @@ import ( var servingClient clientservingv1.ServingV1Interface var eventingClient clienteventingv1.EventingV1Interface -var discoveryClient discovery.DiscoveryInterface type Availability struct { Eventing bool @@ -92,23 +90,8 @@ func NewKnativeEventingClient(cfg *rest.Config) (*clienteventingv1.EventingV1Cli return clienteventingv1.NewForConfig(cfg) } -func getDiscoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, error) { - if discoveryClient == nil { - if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil { - return nil, err - } else { - discoveryClient = cli - } - } - return discoveryClient, nil -} - -func SetDiscoveryClient(cli discovery.DiscoveryInterface) { - discoveryClient = cli -} - func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) { - if cli, err := getDiscoveryClient(cfg); err != nil { + if cli, err := utils.GetDiscoveryClient(cfg); err != nil { return nil, err } else { apiList, err := cli.ServerGroups() diff --git a/internal/controller/monitoring/monitoring.go b/internal/controller/monitoring/monitoring.go new file mode 100644 index 000000000..0ce2a97e1 --- /dev/null +++ b/internal/controller/monitoring/monitoring.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package monitoring + +import ( + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + "k8s.io/client-go/rest" +) + +const ( + prometheusGroup = "monitoring.coreos.com" +) + +func GetPrometheusAvailability(cfg *rest.Config) (bool, error) { + cli, err := utils.GetDiscoveryClient(cfg) + if err != nil { + return false, err + } + apiList, err := cli.ServerGroups() + if err != nil { + return false, err + } + for _, group := range apiList.Groups { + if group.Name == prometheusGroup { + return true, nil + } + + } + return false, nil +} + +func IsMonitoringEnabled(pl *operatorapi.SonataFlowPlatform) bool { + return pl != nil && pl.Spec.Monitoring != nil && pl.Spec.Monitoring.Enabled +} diff --git a/internal/controller/profiles/common/object_creators.go b/internal/controller/profiles/common/object_creators.go index b0a9ff4ec..240c8aefd 100644 --- a/internal/controller/profiles/common/object_creators.go +++ b/internal/controller/profiles/common/object_creators.go @@ -32,6 +32,7 @@ import ( cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model" "github.com/imdario/mergo" + prometheus "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -64,6 +65,8 @@ const ( deploymentKind = "Deployment" k8sServiceAPIVersion = "v1" k8sServiceKind = "Service" + k8sServicePortName = "web" + metricsServicePortPath = "/q/metrics" ) // ObjectCreator is the func that creates the initial reference object, if the object doesn't exist in the cluster, this one is created. @@ -262,6 +265,7 @@ func ServiceCreator(workflow *operatorapi.SonataFlow) (client.Object, error) { Spec: corev1.ServiceSpec{ Selector: lbl, Ports: []corev1.ServicePort{{ + Name: k8sServicePortName, Protocol: corev1.ProtocolTCP, Port: defaultHTTPServicePort, TargetPort: variables.DefaultHTTPWorkflowPortIntStr, @@ -439,10 +443,37 @@ func UserPropsConfigMapCreator(workflow *operatorapi.SonataFlow) (client.Object, // ManagedPropsConfigMapCreator creates an empty ConfigMap to hold the external application properties func ManagedPropsConfigMapCreator(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error) { - props, err := properties.ApplicationManagedProperties(workflow, platform) if err != nil { return nil, err } return workflowproj.CreateNewManagedPropsConfigMap(workflow, props), nil } + +// ServiceMonitorCreator is an ObjectsCreator for Service Monitor for the workflow service. +func ServiceMonitorCreator(workflow *operatorapi.SonataFlow) (client.Object, error) { + lbl := workflowproj.GetMergedLabels(workflow) + spec := &prometheus.ServiceMonitorSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + workflowproj.LabelWorkflow: workflow.Name, + workflowproj.LabelWorkflowNamespace: workflow.Namespace, + }, + }, + Endpoints: []prometheus.Endpoint{ + { + Port: k8sServicePortName, + Path: metricsServicePortPath, + }, + }, + } + serviceMonitor := &prometheus.ServiceMonitor{ + ObjectMeta: metav1.ObjectMeta{ + Name: workflow.Name, + Namespace: workflow.Namespace, + Labels: lbl, + }, + Spec: *spec, + } + return serviceMonitor, nil +} diff --git a/internal/controller/profiles/common/object_creators_test.go b/internal/controller/profiles/common/object_creators_test.go index 46f1c894b..9e2b5274d 100644 --- a/internal/controller/profiles/common/object_creators_test.go +++ b/internal/controller/profiles/common/object_creators_test.go @@ -25,6 +25,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" "github.com/magiconair/properties" + prometheus "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -941,3 +942,30 @@ func doTestDefaultContainer_WithPlatformPersistence(t *testing.T, workflow *v1al assert.Nil(t, container.Env) } } + +func TestEnsureWorkflowServiceMonitorIsCreatedWhenDeployedAsDeployment(t *testing.T) { + workflow := test.GetVetEventSonataFlow(t.Name()) + assert.Equal(t, workflow.IsKnativeDeployment(), false) + serviceMonitor, err := ServiceMonitorCreator(workflow) + assert.NoError(t, err) + assert.NotNil(t, serviceMonitor) + serviceMonitor.SetUID("1") + serviceMonitor.SetResourceVersion("1") + reflectServiceMonitor := serviceMonitor.(*prometheus.ServiceMonitor) + + assert.NotNil(t, reflectServiceMonitor) + assert.NotNil(t, reflectServiceMonitor.Spec) + assert.Equal(t, len(reflectServiceMonitor.Spec.Selector.MatchLabels), 2) + assert.Equal(t, reflectServiceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflow], workflow.Name) + assert.Equal(t, reflectServiceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflowNamespace], workflow.Namespace) + assert.Equal(t, reflectServiceMonitor.Spec.Endpoints[0].Port, k8sServicePortName) + assert.Equal(t, reflectServiceMonitor.Spec.Endpoints[0].Path, metricsServicePortPath) + assert.NotNil(t, reflectServiceMonitor.GetLabels()) + assert.Equal(t, reflectServiceMonitor.ObjectMeta.Labels, map[string]string{ + "app": workflow.Name, + "sonataflow.org/workflow-app": workflow.Name, + "sonataflow.org/workflow-namespace": workflow.Namespace, + "app.kubernetes.io/name": workflow.Name, + "app.kubernetes.io/component": "serverless-workflow", + "app.kubernetes.io/managed-by": "sonataflow-operator"}) +} diff --git a/internal/controller/profiles/dev/profile_dev.go b/internal/controller/profiles/dev/profile_dev.go index 969d1c2d5..585232737 100644 --- a/internal/controller/profiles/dev/profile_dev.go +++ b/internal/controller/profiles/dev/profile_dev.go @@ -78,6 +78,7 @@ func newObjectEnsurers(support *common.StateSupport) *objectEnsurers { return &objectEnsurers{ deployment: common.NewObjectEnsurerWithPlatform(support.C, deploymentCreator), service: common.NewObjectEnsurer(support.C, serviceCreator), + serviceMonitor: common.NewObjectEnsurer(support.C, common.ServiceMonitorCreator), network: common.NewNoopObjectEnsurer(), definitionConfigMap: common.NewObjectEnsurer(support.C, workflowDefConfigMapCreator), userPropsConfigMap: common.NewObjectEnsurer(support.C, common.UserPropsConfigMapCreator), @@ -89,6 +90,7 @@ func newObjectEnsurersOpenShift(support *common.StateSupport) *objectEnsurers { return &objectEnsurers{ deployment: common.NewObjectEnsurerWithPlatform(support.C, deploymentCreator), service: common.NewObjectEnsurer(support.C, serviceCreator), + serviceMonitor: common.NewObjectEnsurer(support.C, common.ServiceMonitorCreator), network: common.NewObjectEnsurer(support.C, common.OpenShiftRouteCreator), definitionConfigMap: common.NewObjectEnsurer(support.C, workflowDefConfigMapCreator), userPropsConfigMap: common.NewObjectEnsurer(support.C, common.UserPropsConfigMapCreator), @@ -111,6 +113,7 @@ func newStatusEnrichersOpenShift(support *common.StateSupport) *statusEnrichers type objectEnsurers struct { deployment common.ObjectEnsurerWithPlatform service common.ObjectEnsurer + serviceMonitor common.ObjectEnsurer network common.ObjectEnsurer definitionConfigMap common.ObjectEnsurer userPropsConfigMap common.ObjectEnsurer diff --git a/internal/controller/profiles/dev/profile_dev_test.go b/internal/controller/profiles/dev/profile_dev_test.go index b349d3387..7d4b44738 100644 --- a/internal/controller/profiles/dev/profile_dev_test.go +++ b/internal/controller/profiles/dev/profile_dev_test.go @@ -51,7 +51,6 @@ import ( clientruntime "sigs.k8s.io/controller-runtime/pkg/client" "github.com/apache/incubator-kie-kogito-serverless-operator/api" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/test" ) @@ -60,7 +59,7 @@ func Test_OverrideStartupProbe(t *testing.T) { client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -88,7 +87,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) { workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentFailureReason, "") client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) reconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) // we are in failed state and have no objects @@ -129,7 +128,7 @@ func Test_newDevProfile(t *testing.T) { workflow := test.GetBaseSonataFlow(t.Name()) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -212,7 +211,7 @@ func Test_newDevProfile(t *testing.T) { func Test_devProfileImageDefaultsNoPlatform(t *testing.T) { workflow := test.GetBaseSonataFlowWithDevProfile(t.Name()) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -231,7 +230,7 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t *testing.T) { platform := test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -250,7 +249,7 @@ func Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing. platform := test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -269,7 +268,7 @@ func Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin platform := test.GetBasePlatformInReadyPhase(workflow.Namespace) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -289,7 +288,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T) { operatorapi.ConfigMapWorkflowResource{ConfigMap: corev1.LocalObjectReference{Name: configmapName}, WorkflowPath: "routes"}) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) @@ -404,7 +403,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) { workflow := test.GetSonataFlow(test.SonataFlowGreetingsWithStaticResourcesCR, t.Name()) client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, configMap).WithStatusSubresource(workflow, configMap).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) devReconciler := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()) diff --git a/internal/controller/profiles/dev/states_dev.go b/internal/controller/profiles/dev/states_dev.go index 3386cb27e..34be5c929 100644 --- a/internal/controller/profiles/dev/states_dev.go +++ b/internal/controller/profiles/dev/states_dev.go @@ -34,6 +34,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants" @@ -111,6 +112,14 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora } objs = append(objs, service) + if monitoring.IsMonitoringEnabled(pl) { + serviceMonitor, _, err := e.ensurers.serviceMonitor.Ensure(ctx, workflow) + if err != nil { + return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, objs, err + } + objs = append(objs, serviceMonitor) + } + route, _, err := e.ensurers.network.Ensure(ctx, workflow) if err != nil { return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, objs, err diff --git a/internal/controller/profiles/gitops/profile_gitops_test.go b/internal/controller/profiles/gitops/profile_gitops_test.go index 0c2f1bba4..feecaa385 100644 --- a/internal/controller/profiles/gitops/profile_gitops_test.go +++ b/internal/controller/profiles/gitops/profile_gitops_test.go @@ -20,8 +20,8 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/test" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -40,7 +40,7 @@ func Test_Reconciler_ProdOps(t *testing.T) { WithRuntimeObjects(workflow). WithStatusSubresource(workflow, &operatorapi.SonataFlowBuild{}).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) result, err := NewProfileForOpsReconciler(client, &rest.Config{}, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) diff --git a/internal/controller/profiles/monitoring/monitoring.go b/internal/controller/profiles/monitoring/monitoring.go new file mode 100644 index 000000000..03af74164 --- /dev/null +++ b/internal/controller/profiles/monitoring/monitoring.go @@ -0,0 +1,65 @@ +// Copyright 2024 Apache Software Foundation (ASF) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitoring + +import ( + "context" + + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ MonitoringEventingHandler = &monitoringObjectManager{} + +type monitoringObjectManager struct { + serviceMonitor common.ObjectEnsurer + *common.StateSupport +} + +func NewMonitoringHandler(support *common.StateSupport) MonitoringEventingHandler { + return &monitoringObjectManager{ + serviceMonitor: common.NewObjectEnsurer(support.C, common.ServiceMonitorCreator), + StateSupport: support, + } +} + +type MonitoringEventingHandler interface { + Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) +} + +func (k monitoringObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) { + var objs []client.Object + monitoringAvail, err := monitoring.GetPrometheusAvailability(k.Cfg) + if err != nil { + klog.V(log.I).InfoS("Error checking Prometheus availability: %v", err) + return nil, err + } + if !monitoringAvail { + klog.V(log.I).InfoS("Prometheus is not installed") + } else { + // create serviceMonitor + serviceMonitor, _, err := k.serviceMonitor.Ensure(ctx, workflow) + if err != nil { + return objs, err + } else if serviceMonitor != nil { + objs = append(objs, serviceMonitor) + } + } + return objs, nil +} diff --git a/internal/controller/profiles/preview/deployment_handler.go b/internal/controller/profiles/preview/deployment_handler.go index fb75a8706..18b7cfb8a 100644 --- a/internal/controller/profiles/preview/deployment_handler.go +++ b/internal/controller/profiles/preview/deployment_handler.go @@ -26,6 +26,7 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" @@ -154,12 +155,19 @@ func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow *oper return reconcile.Result{}, nil, err } + objs := []client.Object{deployment, managedPropsCM, service} eventingObjs, err := common.NewKnativeEventingHandler(d.StateSupport, pl).Ensure(ctx, workflow) if err != nil { return reconcile.Result{}, nil, err } - - objs := []client.Object{deployment, managedPropsCM, service} + objs = append(objs, eventingObjs...) + if monitoring.IsMonitoringEnabled(pl) { + serviceMonitor, _, err := d.ensurers.ServiceMonitorByDeploymentModel(workflow).Ensure(ctx, workflow) + if err != nil { + return reconcile.Result{}, nil, err + } + objs = append(objs, serviceMonitor) + } if deploymentOp == controllerutil.OperationResultCreated { workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "") if _, err := d.PerformStatusUpdate(ctx, workflow); err != nil { @@ -167,8 +175,6 @@ func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow *oper } return reconcile.Result{RequeueAfter: constants.RequeueAfterFollowDeployment, Requeue: true}, objs, nil } - objs = append(objs, eventingObjs...) - return reconcile.Result{}, objs, nil } diff --git a/internal/controller/profiles/preview/deployment_handler_test.go b/internal/controller/profiles/preview/deployment_handler_test.go index d2dd6dca7..483f69eee 100644 --- a/internal/controller/profiles/preview/deployment_handler_test.go +++ b/internal/controller/profiles/preview/deployment_handler_test.go @@ -20,8 +20,8 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/test" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" "github.com/magiconair/properties" "github.com/stretchr/testify/assert" @@ -45,7 +45,7 @@ func Test_CheckDeploymentModelIsKnative(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(cli) - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.ensureObjects(context.TODO(), workflow, "") @@ -72,7 +72,7 @@ func Test_CheckPodTemplateChangesReflectDeployment(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(client) - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.Reconcile(context.TODO(), workflow) @@ -108,7 +108,7 @@ func Test_CheckDeploymentRolloutAfterCMChange(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(client) - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.Reconcile(context.TODO(), workflow) @@ -171,7 +171,7 @@ func Test_CheckDeploymentUnchangedAfterCMChangeOtherKeys(t *testing.T) { WithStatusSubresource(workflow). Build() stateSupport := fakeReconcilerSupport(client) - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) handler := NewDeploymentReconciler(stateSupport, NewObjectEnsurers(stateSupport)) result, objects, err := handler.Reconcile(context.TODO(), workflow) diff --git a/internal/controller/profiles/preview/profile_preview.go b/internal/controller/profiles/preview/profile_preview.go index 2f483ab2c..0892e81ce 100644 --- a/internal/controller/profiles/preview/profile_preview.go +++ b/internal/controller/profiles/preview/profile_preview.go @@ -58,7 +58,13 @@ type ObjectEnsurers struct { // kservice Knative Serving deployment for this ensurer. Don't call it directly, use DeploymentByDeploymentModel instead kservice common.ObjectEnsurerWithPlatform // service for this ensurer. Don't call it directly, use ServiceByDeploymentModel instead - service common.ObjectEnsurer + service common.ObjectEnsurer + // kMetricsService for this ensurer. Don't call it directly, use ServiceByDeploymentModel instead + kMetricsService common.ObjectEnsurer + // serviceMonitor for this ensurer. Don't call it directly, use ServiceMonitorByDeploymentModel instead + serviceMonitor common.ObjectEnsurer + // kServiceMonitor for this ensurer. Don't call it directly, use ServiceMonitorByDeploymentModel instead + kServiceMonitor common.ObjectEnsurer userPropsConfigMap common.ObjectEnsurer managedPropsConfigMap common.ObjectEnsurerWithPlatform } @@ -80,12 +86,22 @@ func (o *ObjectEnsurers) ServiceByDeploymentModel(workflow *v1alpha08.SonataFlow return o.service } +// ServiceMonitorByDeploymentModel gets the service monitor ensurer based on the SonataFlow deployment model +func (o *ObjectEnsurers) ServiceMonitorByDeploymentModel(workflow *v1alpha08.SonataFlow) common.ObjectEnsurer { + if workflow.IsKnativeDeployment() { + // Do not create service monitor for workflows deployed as Knative service + return common.NewNoopObjectEnsurer() + } + return o.serviceMonitor +} + // NewObjectEnsurers common.ObjectEnsurer(s) for the preview profile. func NewObjectEnsurers(support *common.StateSupport) *ObjectEnsurers { return &ObjectEnsurers{ deployment: common.NewObjectEnsurerWithPlatform(support.C, common.DeploymentCreator), kservice: common.NewObjectEnsurerWithPlatform(support.C, common.KServiceCreator), service: common.NewObjectEnsurer(support.C, common.ServiceCreator), + serviceMonitor: common.NewObjectEnsurer(support.C, common.ServiceMonitorCreator), userPropsConfigMap: common.NewObjectEnsurer(support.C, common.UserPropsConfigMapCreator), managedPropsConfigMap: common.NewObjectEnsurerWithPlatform(support.C, common.ManagedPropsConfigMapCreator), } diff --git a/internal/controller/profiles/preview/profile_preview_test.go b/internal/controller/profiles/preview/profile_preview_test.go index 7e4723496..aed417bda 100644 --- a/internal/controller/profiles/preview/profile_preview_test.go +++ b/internal/controller/profiles/preview/profile_preview_test.go @@ -26,9 +26,11 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common" "github.com/apache/incubator-kie-kogito-serverless-operator/test" + "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" + prometheus "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -50,7 +52,7 @@ func Test_Reconciler_ProdCustomPod(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow, build, platform). WithStatusSubresource(workflow, build, platform).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) _, err := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -82,7 +84,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow, platform). WithStatusSubresource(workflow, platform, &operatorapi.SonataFlowBuild{}).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) result, err := NewProfileReconciler(client, &rest.Config{}, test.NewFakeRecorder()).Reconcile(context.TODO(), workflow) assert.NoError(t, err) @@ -139,12 +141,13 @@ func Test_reconcilerProdBuildConditions(t *testing.T) { func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) { workflow := test.GetBaseSonataFlow(t.Name()) platform := test.GetBasePlatformInReadyPhase(t.Name()) + platform.Spec.Monitoring = &operatorapi.PlatformMonitoringOptionsSpec{Enabled: true} build := test.GetLocalSucceedSonataFlowBuild(workflow.Name, workflow.Namespace) - client := test.NewSonataFlowClientBuilder(). + client := test.NewKogitoClientBuilderWithOpenShift(). WithRuntimeObjects(workflow, platform, build). WithStatusSubresource(workflow, platform, build). Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) handler := &deployWithBuildWorkflowState{ StateSupport: fakeReconcilerSupport(client), ensurers: NewObjectEnsurers(&common.StateSupport{C: client}), @@ -153,7 +156,7 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) { assert.Greater(t, result.RequeueAfter, int64(0)) assert.NoError(t, err) assert.NotNil(t, result) - assert.Len(t, objects, 3) + assert.Len(t, objects, 4) deployment := &appsv1.Deployment{} err = client.Get(context.TODO(), clientruntime.ObjectKeyFromObject(workflow), deployment) @@ -164,6 +167,18 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) { assert.NoError(t, err) assert.False(t, workflow.Status.IsReady()) assert.Equal(t, api.WaitingForDeploymentReason, workflow.Status.GetTopLevelCondition().Reason) + + serviceMonitor := &prometheus.ServiceMonitor{} + err = client.Get(context.TODO(), clientruntime.ObjectKeyFromObject(workflow), serviceMonitor) + assert.NoError(t, err) + assert.NotEmpty(t, serviceMonitor.Spec) + assert.NotEmpty(t, serviceMonitor.Spec.Selector) + assert.Equal(t, len(serviceMonitor.Spec.Selector.MatchLabels), 2) + assert.Equal(t, serviceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflow], workflow.Name) + assert.Equal(t, serviceMonitor.Spec.Selector.MatchLabels[workflowproj.LabelWorkflowNamespace], workflow.Namespace) + assert.Equal(t, len(serviceMonitor.Spec.Endpoints), 1) + assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Port, "web") + assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Path, "/q/metrics") } func Test_GenerationAnnotationCheck(t *testing.T) { @@ -173,7 +188,7 @@ func Test_GenerationAnnotationCheck(t *testing.T) { client := test.NewSonataFlowClientBuilder(). WithRuntimeObjects(workflow, platform). WithStatusSubresource(workflow, platform, &operatorapi.SonataFlowBuild{}).Build() - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) handler := &deployWithBuildWorkflowState{ StateSupport: fakeReconcilerSupport(client), ensurers: NewObjectEnsurers(&common.StateSupport{C: client}), diff --git a/internal/controller/sonataflow_controller.go b/internal/controller/sonataflow_controller.go index a7734becd..7724a6378 100644 --- a/internal/controller/sonataflow_controller.go +++ b/internal/controller/sonataflow_controller.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" + "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" servingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -37,6 +38,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" + prometheus "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -68,6 +70,7 @@ type SonataFlowReconciler struct { //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/status,verbs=get;update;patch //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update +//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -253,5 +256,13 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&sourcesv1.SinkBinding{}). Watches(&eventingv1.Trigger{}, handler.EnqueueRequestsFromMapFunc(knative.MapTriggerToPlatformRequests)) } + promAvail, err := monitoring.GetPrometheusAvailability(mgr.GetConfig()) + if err != nil { + return err + } + if promAvail { + builder = builder.Owns(&prometheus.ServiceMonitor{}) + } + return builder.Complete(r) } diff --git a/internal/controller/sonataflowplatform_controller_test.go b/internal/controller/sonataflowplatform_controller_test.go index db567769b..dbadd2e8e 100644 --- a/internal/controller/sonataflowplatform_controller_test.go +++ b/internal/controller/sonataflowplatform_controller_test.go @@ -25,7 +25,6 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/clusterplatform" - "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform/services" "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/test" @@ -874,7 +873,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp, broker).WithStatusSubresource(ksp, broker).Build() utils.SetClient(cl) - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} @@ -975,7 +974,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp, broker, brokerDataIndexSource, brokerJobsServiceSource, brokerJobsServiceSink).WithStatusSubresource(ksp, broker, brokerDataIndexSource, brokerJobsServiceSource, brokerJobsServiceSink).Build() utils.SetClient(cl) - knative.SetDiscoveryClient(test.CreateFakeKnativeDiscoveryClient()) + utils.SetDiscoveryClient(test.CreateFakeKnativeAndMonitoringDiscoveryClient()) // Create a SonataFlowPlatformReconciler object with the scheme and fake client. r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} diff --git a/operator.yaml b/operator.yaml index 85d949106..a799e2924 100644 --- a/operator.yaml +++ b/operator.yaml @@ -1037,6 +1037,14 @@ spec: type: string type: object type: object + monitoring: + description: Settings for Prometheus monitoring + properties: + enabled: + description: Enabled indicates whether monitoring with Prometheus + metrics is enabled + type: boolean + type: object persistence: description: |- Persistence defines the platform persistence configuration. When this field is set, @@ -27783,6 +27791,17 @@ kind: ClusterRole metadata: name: sonataflow-operator-manager-role rules: +- apiGroups: + - monitoring.coreos.com + resources: + - servicemonitors + verbs: + - create + - delete + - get + - list + - update + - watch - apiGroups: - sonataflow.org resources: diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index e249d3993..8fbd82436 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -101,6 +101,10 @@ var _ = BeforeSuite(func() { } else { GinkgoWriter.Println("Fetch pre-built workflows images in the cluster") err = fetchImageTagsBuiltWorkflows(workflows) + if err != nil { + GinkgoWriter.Println("Failed to fetch pre-built workflows images, try to build them") + err = deployWorkflowsAndWaitForBuild(workflows) + } Expect(err).NotTo(HaveOccurred()) } diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go index e6e2faca8..703ad7827 100644 --- a/test/e2e/helpers.go +++ b/test/e2e/helpers.go @@ -238,7 +238,36 @@ func verifySchemaMigration(data, name string) bool { strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up to date. No migration necessary", name))) } -func waitForPodRestartCompletion(label, ns string) { +func verifyKSinkInjection(label, ns string) bool { + cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}") + out, err := utils.Run(cmd) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err)) + return false + } + podNames := strings.Fields(string(out)) + if len(podNames) == 0 { + GinkgoWriter.Println("no pods found to check K_SINK") + return false // pods haven't created yet + } + GinkgoWriter.Println(fmt.Sprintf("pods found: %s", podNames)) + for _, pod := range podNames { + cmd = exec.Command("kubectl", "get", "pod", pod, "-n", ns, "-o", "json") + out, err := utils.Run(cmd) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("failed to get pod: %v", err)) + return false + } + GinkgoWriter.Println(string(out)) + if !strings.Contains(string(out), "K_SINK") { // The pod does not have K_SINK injected + GinkgoWriter.Println(fmt.Sprintf("Pod does not have K_SINK injected: %s", string(out))) + return false + } + } + return true +} + +func waitForPodRestartCompletion(label, ns string) (podRunning string) { EventuallyWithOffset(1, func() bool { cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}") out, err := utils.Run(cmd) @@ -254,8 +283,11 @@ func waitForPodRestartCompletion(label, ns string) { GinkgoWriter.Println("multiple pods found") return false // multiple pods found, wait for other pods to terminate } + podRunning = podNames[0] return true - }, 1*time.Minute, 5).Should(BeTrue()) + }, 10*time.Minute, 5).Should(BeTrue()) + + return } func verifyTrigger(triggers []operatorapi.SonataFlowPlatformTriggerRef, namePrefix, path, ns, broker string) error { @@ -293,3 +325,72 @@ func verifySinkBinding(name, ns, broker string) error { } return fmt.Errorf("failed to verify sinkbinding %v, data=%s", name, string(out)) } + +func getWorkflowId(resp string) (string, error) { + // First find the json data + ind1 := strings.Index(resp, "{") + ind2 := strings.LastIndex(resp, "}") + data := resp[ind1 : ind2+1] + // Retrieve the id from json data + m := make(map[string]interface{}) + err := json.Unmarshal([]byte(data), &m) + if err != nil { + return "", err + } + if id, ok := m["id"].(string); ok { + return id, nil + } + return "", fmt.Errorf("failed to find workflow id") +} + +func getMetricValue(resp string) (string, error) { + fmt.Println(resp) + ind1 := strings.Index(resp, "{") + ind2 := strings.LastIndex(resp, "}") + data := resp[ind1 : ind2+1] + + // Retrieve the metric value from json data + m := make(map[string]interface{}) + err := json.Unmarshal([]byte(data), &m) + if err != nil { + return "", err + } + result, ok := m["data"].(map[string]interface{})["result"] + if !ok { + return "", fmt.Errorf("no valid response data received") + } + metrics := result.([]interface{}) + if len(metrics) == 0 { + return "", fmt.Errorf("no valid metric data retrieved") + } + metric := metrics[0] + values := metric.(map[string]interface{})["value"] + if val, ok := (values.([]interface{}))[1].(string); ok { + return val, nil + } else { + return "", fmt.Errorf("failed to get metric value") + } +} + +func getPodNameAfterWorkflowInstCreation(name, ns string) (string, error) { + labels := fmt.Sprintf("sonataflow.org/workflow-app=%s,sonataflow.org/workflow-namespace=%s", name, ns) + cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", labels, "-o=jsonpath='{range .items[*]}{.metadata.name} {.status.conditions[?(@.type=='Ready')].status}{';'}{end}'") + fmt.Println(cmd.String()) + out, err := utils.Run(cmd) + if err != nil { + return "", err + } + fmt.Println(string(out)) + data := strings.Split(string(out), ";") + for _, line := range data { + res := strings.Fields(line) + if len(res) == 2 && strings.Contains(res[0], "-00002-deployment-") { + if res[1] == "True" { + return res[0], nil + } else { + return "", fmt.Errorf("pod %s is not ready=", res) + } + } + } + return "", fmt.Errorf("invalid data received: %s", string(out)) +} diff --git a/test/e2e/testdata/workflows/prometheus/k8s_deployment/01-postgres.yaml b/test/e2e/testdata/workflows/prometheus/k8s_deployment/01-postgres.yaml new file mode 100644 index 000000000..662de4c7b --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/k8s_deployment/01-postgres.yaml @@ -0,0 +1,86 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: postgres + template: + metadata: + labels: + app.kubernetes.io/name: postgres + spec: + containers: + - name: postgres + image: postgres:13.2-alpine + imagePullPolicy: 'IfNotPresent' + ports: + - containerPort: 5432 + volumeMounts: + - name: storage + mountPath: /var/lib/postgresql/data + envFrom: + - secretRef: + name: postgres-secrets + readinessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + livenessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + resources: + limits: + memory: "256Mi" + cpu: "500m" + volumes: + - name: storage + persistentVolumeClaim: + claimName: postgres-pvc +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + selector: + app.kubernetes.io/name: postgres + ports: + - port: 5432 diff --git a/test/e2e/testdata/workflows/prometheus/k8s_deployment/02-default-broker.yaml b/test/e2e/testdata/workflows/prometheus/k8s_deployment/02-default-broker.yaml new file mode 100644 index 000000000..95338a19d --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/k8s_deployment/02-default-broker.yaml @@ -0,0 +1,18 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: sonataflow-broker \ No newline at end of file diff --git a/test/e2e/testdata/workflows/prometheus/k8s_deployment/03-sonataflow_platform.yaml b/test/e2e/testdata/workflows/prometheus/k8s_deployment/03-sonataflow_platform.yaml new file mode 100644 index 000000000..74ea3bb43 --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/k8s_deployment/03-sonataflow_platform.yaml @@ -0,0 +1,67 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + monitoring: + enabled: true + eventing: + # Platform Broker configuration. Should be used by DI, JS and the workflows if not overridden. + broker: + ref: + name: sonataflow-broker + apiVersion: eventing.knative.dev/v1 + kind: Broker + services: + dataIndex: + enabled: true + persistence: + migrateDBOnStartUp: true + postgresql: + serviceRef: + name: postgres + databaseName: sonataflow + databaseSchema: data-index-schema + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-minimal:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] + jobService: + enabled: true + persistence: + migrateDBOnStartUp: true + postgresql: + serviceRef: + name: postgres + databaseName: sonataflow + databaseSchema: jobs-service-schema + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-minimal:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] diff --git a/test/e2e/testdata/workflows/prometheus/k8s_deployment/05-configmap_callbackstatetimeouts-props.yaml b/test/e2e/testdata/workflows/prometheus/k8s_deployment/05-configmap_callbackstatetimeouts-props.yaml new file mode 100644 index 000000000..5aa5a08aa --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/k8s_deployment/05-configmap_callbackstatetimeouts-props.yaml @@ -0,0 +1,28 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +data: + application.properties: | + # Kogito runtime persistence configurations + kogito.persistence.query.timeout.millis=10000 + # Set to false to create the tables manually. Automatic generation is a different use case. + quarkus.flyway.migrate-at-start=true + # Not mandatory in production + quarkus.swagger-ui.always-include=true +kind: ConfigMap +metadata: + labels: + app: callbackstatetimeouts + name: callbackstatetimeouts-props diff --git a/test/e2e/testdata/workflows/prometheus/k8s_deployment/06-sonataflow_callbackstatetimeouts.sw.yaml b/test/e2e/testdata/workflows/prometheus/k8s_deployment/06-sonataflow_callbackstatetimeouts.sw.yaml new file mode 100644 index 000000000..67ed09a2f --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/k8s_deployment/06-sonataflow_callbackstatetimeouts.sw.yaml @@ -0,0 +1,107 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlow +metadata: + name: callbackstatetimeouts + annotations: + sonataflow.org/description: Callback State Timeouts Example k8s + sonataflow.org/version: 0.0.1 + sonataflow.org/profile: preview +spec: + # In this example we only configure the DB to use, the operator will manage all the rest at build time. + # 1) add the necessary quarkus extensions related to the persistence + # 2) add the necessary build time properties related to the persistence in the form of maven build attributes + persistence: + migrateDBOnStartUp: true + postgresql: + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + serviceRef: + name: postgres + databaseName: sonataflow + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-minimal:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] + flow: + start: PrintStartMessage + events: + - name: callbackEvent + source: '' + type: callback_event_type + - name: exitEvent + source: '' + kind: produced + type: callbackstatetimeouts_exit_event + functions: + - name: systemOut + type: custom + operation: sysout + states: + - name: PrintStartMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has started.\"}" + transition: CallbackState + - name: CallbackState + type: callback + action: + name: callbackAction + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has executed the callbackFunction.\"}" + eventRef: callbackEvent + transition: CheckEventArrival + timeouts: + eventTimeout: PT30S + - name: CheckEventArrival + type: switch + dataConditions: + - condition: "${ .eventData != null }" + transition: EventArrived + defaultCondition: + transition: EventNotArrived + - name: EventArrived + type: inject + data: + exitMessage: "The callback event has arrived." + transition: PrintExitMessage + - name: EventNotArrived + type: inject + data: + exitMessage: "The callback event has not arrived, and the timeout has overdue." + transition: PrintExitMessage + - name: PrintExitMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has finalized. \" + .exitMessage + \" eventData: \" + .eventData}" + end: + terminate: true + produceEvents: + - eventRef: exitEvent diff --git a/test/e2e/testdata/workflows/prometheus/k8s_deployment/kustomization.yaml b/test/e2e/testdata/workflows/prometheus/k8s_deployment/kustomization.yaml new file mode 100644 index 000000000..ef758adea --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/k8s_deployment/kustomization.yaml @@ -0,0 +1,35 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +resources: +# DB server and Broker +- 01-postgres.yaml +- 02-default-broker.yaml +- 03-sonataflow_platform.yaml +- 05-configmap_callbackstatetimeouts-props.yaml +- 06-sonataflow_callbackstatetimeouts.sw.yaml + +generatorOptions: + disableNameSuffixHash: true + +secretGenerator: + - name: postgres-secrets + literals: + - POSTGRES_USER=sonataflow + - POSTGRES_PASSWORD=sonataflow + - POSTGRES_DB=sonataflow + - PGDATA=/var/lib/postgresql/data/mydata + +sortOptions: + order: fifo \ No newline at end of file diff --git a/test/e2e/testdata/workflows/prometheus/knative_service/01-postgres.yaml b/test/e2e/testdata/workflows/prometheus/knative_service/01-postgres.yaml new file mode 100644 index 000000000..662de4c7b --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/knative_service/01-postgres.yaml @@ -0,0 +1,86 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: postgres + template: + metadata: + labels: + app.kubernetes.io/name: postgres + spec: + containers: + - name: postgres + image: postgres:13.2-alpine + imagePullPolicy: 'IfNotPresent' + ports: + - containerPort: 5432 + volumeMounts: + - name: storage + mountPath: /var/lib/postgresql/data + envFrom: + - secretRef: + name: postgres-secrets + readinessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + livenessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + resources: + limits: + memory: "256Mi" + cpu: "500m" + volumes: + - name: storage + persistentVolumeClaim: + claimName: postgres-pvc +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + selector: + app.kubernetes.io/name: postgres + ports: + - port: 5432 diff --git a/test/e2e/testdata/workflows/prometheus/knative_service/02-default-broker.yaml b/test/e2e/testdata/workflows/prometheus/knative_service/02-default-broker.yaml new file mode 100644 index 000000000..95338a19d --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/knative_service/02-default-broker.yaml @@ -0,0 +1,18 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: sonataflow-broker \ No newline at end of file diff --git a/test/e2e/testdata/workflows/prometheus/knative_service/03-sonataflow_platform.yaml b/test/e2e/testdata/workflows/prometheus/knative_service/03-sonataflow_platform.yaml new file mode 100644 index 000000000..74ea3bb43 --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/knative_service/03-sonataflow_platform.yaml @@ -0,0 +1,67 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + monitoring: + enabled: true + eventing: + # Platform Broker configuration. Should be used by DI, JS and the workflows if not overridden. + broker: + ref: + name: sonataflow-broker + apiVersion: eventing.knative.dev/v1 + kind: Broker + services: + dataIndex: + enabled: true + persistence: + migrateDBOnStartUp: true + postgresql: + serviceRef: + name: postgres + databaseName: sonataflow + databaseSchema: data-index-schema + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-minimal:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] + jobService: + enabled: true + persistence: + migrateDBOnStartUp: true + postgresql: + serviceRef: + name: postgres + databaseName: sonataflow + databaseSchema: jobs-service-schema + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-minimal:latest + imagePullPolicy: IfNotPresent + command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ] diff --git a/test/e2e/testdata/workflows/prometheus/knative_service/05-configmap_callbackstatetimeouts-props.yaml b/test/e2e/testdata/workflows/prometheus/knative_service/05-configmap_callbackstatetimeouts-props.yaml new file mode 100644 index 000000000..5aa5a08aa --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/knative_service/05-configmap_callbackstatetimeouts-props.yaml @@ -0,0 +1,28 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +data: + application.properties: | + # Kogito runtime persistence configurations + kogito.persistence.query.timeout.millis=10000 + # Set to false to create the tables manually. Automatic generation is a different use case. + quarkus.flyway.migrate-at-start=true + # Not mandatory in production + quarkus.swagger-ui.always-include=true +kind: ConfigMap +metadata: + labels: + app: callbackstatetimeouts + name: callbackstatetimeouts-props diff --git a/test/e2e/testdata/workflows/prometheus/knative_service/06-sonataflow_callbackstatetimeouts.sw.yaml b/test/e2e/testdata/workflows/prometheus/knative_service/06-sonataflow_callbackstatetimeouts.sw.yaml new file mode 100644 index 000000000..007404a5c --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/knative_service/06-sonataflow_callbackstatetimeouts.sw.yaml @@ -0,0 +1,114 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlow +metadata: + name: callbackstatetimeouts + annotations: + sonataflow.org/description: Callback State Timeouts Example k8s + sonataflow.org/version: 0.0.1 + sonataflow.org/profile: preview +spec: + # In this example we only configure the DB to use, the operator will manage all the rest at build time. + # 1) add the necessary quarkus extensions related to the persistence + # 2) add the necessary build time properties related to the persistence in the form of maven build attributes + persistence: + migrateDBOnStartUp: true + postgresql: + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + serviceRef: + name: postgres + databaseName: sonataflow + sink: + ref: + name: sonataflow-broker + apiVersion: eventing.knative.dev/v1 + kind: Broker + sources: + - eventType: callback_event_type + ref: + name: sonataflow-broker + apiVersion: eventing.knative.dev/v1 + kind: Broker + podTemplate: + deploymentModel: knative + flow: + start: PrintStartMessage + events: + - name: callbackEvent + source: '' + type: callback_event_type + - name: exitEvent + source: '' + kind: produced + type: callbackstatetimeouts_exit_event + functions: + - name: systemOut + type: custom + operation: sysout + states: + - name: PrintStartMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has started.\"}" + transition: CallbackState + - name: CallbackState + type: callback + action: + name: callbackAction + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has executed the callbackFunction.\"}" + eventRef: callbackEvent + transition: CheckEventArrival + timeouts: + eventTimeout: PT30S + - name: CheckEventArrival + type: switch + dataConditions: + - condition: "${ .eventData != null }" + transition: EventArrived + defaultCondition: + transition: EventNotArrived + - name: EventArrived + type: inject + data: + exitMessage: "The callback event has arrived." + transition: PrintExitMessage + - name: EventNotArrived + type: inject + data: + exitMessage: "The callback event has not arrived, and the timeout has overdue." + transition: PrintExitMessage + - name: PrintExitMessage + type: operation + actions: + - name: printSystemOut + functionRef: + refName: systemOut + arguments: + message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId + \" has finalized. \" + .exitMessage + \" eventData: \" + .eventData}" + end: + terminate: true + produceEvents: + - eventRef: exitEvent diff --git a/test/e2e/testdata/workflows/prometheus/knative_service/kustomization.yaml b/test/e2e/testdata/workflows/prometheus/knative_service/kustomization.yaml new file mode 100644 index 000000000..ef758adea --- /dev/null +++ b/test/e2e/testdata/workflows/prometheus/knative_service/kustomization.yaml @@ -0,0 +1,35 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +resources: +# DB server and Broker +- 01-postgres.yaml +- 02-default-broker.yaml +- 03-sonataflow_platform.yaml +- 05-configmap_callbackstatetimeouts-props.yaml +- 06-sonataflow_callbackstatetimeouts.sw.yaml + +generatorOptions: + disableNameSuffixHash: true + +secretGenerator: + - name: postgres-secrets + literals: + - POSTGRES_USER=sonataflow + - POSTGRES_PASSWORD=sonataflow + - POSTGRES_DB=sonataflow + - PGDATA=/var/lib/postgresql/data/mydata + +sortOptions: + order: fifo \ No newline at end of file diff --git a/test/e2e/workflow_test.go b/test/e2e/workflow_test.go index e7e0a1aa4..5fe4b76ce 100644 --- a/test/e2e/workflow_test.go +++ b/test/e2e/workflow_test.go @@ -24,6 +24,7 @@ import ( "fmt" "math/rand" "os/exec" + "path/filepath" "strings" "time" @@ -138,7 +139,6 @@ var _ = Describe("Workflow Non-Persistence Use Cases :: ", Label("flows-ephemera }) }) - }) var _ = Describe("Workflow Persistence Use Cases :: ", Label("flows-persistence"), Ordered, func() { @@ -168,111 +168,206 @@ var _ = Describe("Workflow Persistence Use Cases :: ", Label("flows-persistence" } }) + /* + DescribeTable("when deploying a SonataFlow CR with PostgreSQL persistence", func(testcaseDir string, withPersistence bool, waitKSinkInjection bool) { + By("Deploy the CR") + var manifests []byte + EventuallyWithOffset(1, func() error { + var err error + cmd := exec.Command("kubectl", "kustomize", testcaseDir) + manifests, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + cmd := exec.Command("kubectl", "create", "-n", ns, "-f", "-") + cmd.Stdin = bytes.NewBuffer(manifests) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) - DescribeTable("when deploying a SonataFlow CR with PostgreSQL persistence", func(testcaseDir string, withPersistence bool, waitKSinkInjection bool) { - By("Deploy the CR") - var manifests []byte - EventuallyWithOffset(1, func() error { - var err error - cmd := exec.Command("kubectl", "kustomize", testcaseDir) - manifests, err = utils.Run(cmd) - return err - }, time.Minute, time.Second).Should(Succeed()) - cmd := exec.Command("kubectl", "create", "-n", ns, "-f", "-") - cmd.Stdin = bytes.NewBuffer(manifests) - _, err := utils.Run(cmd) - Expect(err).NotTo(HaveOccurred()) + By("Replacing the image with a prebuilt one and rollout") + EventuallyWithOffset(1, func() error { + if withPersistence { + return kubectlPatchSonataFlowImageAndRollout(ns, prebuiltWorkflows.CallBackPersistence.Name, prebuiltWorkflows.CallBackPersistence.Tag) + } + return kubectlPatchSonataFlowImageAndRollout(ns, prebuiltWorkflows.CallBack.Name, prebuiltWorkflows.CallBack.Tag) + }, 3*time.Minute, time.Second).Should(Succeed()) - By("Replacing the image with a prebuilt one and rollout") - EventuallyWithOffset(1, func() error { - if withPersistence { - return kubectlPatchSonataFlowImageAndRollout(ns, prebuiltWorkflows.CallBackPersistence.Name, prebuiltWorkflows.CallBackPersistence.Tag) - } - return kubectlPatchSonataFlowImageAndRollout(ns, prebuiltWorkflows.CallBack.Name, prebuiltWorkflows.CallBack.Tag) - }, 3*time.Minute, time.Second).Should(Succeed()) - - By("Wait for SonataFlow CR to complete deployment") - // wait for service deployments to be ready - EventuallyWithOffset(1, func() bool { - cmd = exec.Command("kubectl", "wait", "pod", "-n", ns, "-l", workflowAppLabel, "--for", "condition=Ready", "--timeout=5s") - out, err := utils.Run(cmd) - if err != nil { + By("Wait for SonataFlow CR to complete deployment") + // wait for service deployments to be ready + EventuallyWithOffset(1, func() bool { + cmd = exec.Command("kubectl", "wait", "pod", "-n", ns, "-l", workflowAppLabel, "--for", "condition=Ready", "--timeout=5s") + out, err := utils.Run(cmd) + if err != nil { + return false + } + GinkgoWriter.Printf("%s\n", string(out)) + if !waitKSinkInjection { + return true + } + GinkgoWriter.Println("waitForPodRestartCompletion") + waitForPodRestartCompletion(workflowAppLabel, ns) + GinkgoWriter.Println("waitForPodRestartCompletion done") + return true + }, 5*time.Minute, 5*time.Second).Should(BeTrue()) + + By("Evaluate status of the workflow's pod database connection health endpoint") + cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}") + output, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + EventuallyWithOffset(1, func() bool { + for _, pn := range strings.Split(string(output), " ") { + h, err := getHealthFromPod(pn, ns) + if err != nil { + continue + } + Expect(h.Status).To(Equal(upStatus), "Pod health is not UP") + if withPersistence { + connectionCheckFound := false + for _, c := range h.Checks { + if c.Name == dbConnectionName { + Expect(c.Status).To(Equal(upStatus), "Pod's database connection is not UP") + Expect(c.Data[defaultDataCheck]).To(Equal(upStatus), "Pod's 'default' database data is not UP") + connectionCheckFound = true + } + } + Expect(connectionCheckFound).To(Equal(true), "Connection health check not found, but the wofkflow has persistence") + return true + } else { + connectionCheckFound := false + for _, c := range h.Checks { + if c.Name == dbConnectionName { + connectionCheckFound = true + } + } + Expect(connectionCheckFound).To(Equal(false), "Connection health check was found, but the workflow don't have persistence") + return true + } + } return false + }, 4*time.Minute).Should(BeTrue()) + // Persistence initialization checks + cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}") + output, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + podName := string(output) + cmd = exec.Command("kubectl", "logs", podName, "-n", ns) + output, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + logs := string(output) + if withPersistence { + By("Validate that the workflow persistence was properly initialized") + Expect(logs).Should(ContainSubstring("Flyway Community Edition")) + Expect(logs).Should(ContainSubstring("Database: jdbc:postgresql://postgres.%s:5432", ns)) + result := verifySchemaMigration(logs, prebuiltWorkflows.CallBackPersistence.Name) + GinkgoWriter.Println(fmt.Sprintf("verifySchemaMigration: %v", result)) + Expect(result).Should(BeTrue()) + Expect(logs).Should(ContainSubstring("Profile prod activated")) + } else { + By("Validate that the workflow has no persistence") + Expect(logs).ShouldNot(ContainSubstring("Flyway Community Edition")) + Expect(logs).ShouldNot(ContainSubstring(fmt.Sprintf(`Creating schema "%s"`, prebuiltWorkflows.CallBack.Name))) + Expect(logs).Should(ContainSubstring("Profile prod activated")) } - GinkgoWriter.Printf("%s\n", string(out)) - if !waitKSinkInjection { + }, + Entry("defined in the workflow from an existing kubernetes service as a reference", test.GetPathFromE2EDirectory("workflows", "persistence", "by_service"), true, false), + Entry("defined in the workflow and from the sonataflow platform", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_overwritten_by_service"), true, false), + Entry("defined from the sonataflow platform as reference and with DI and JS", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_with_di_and_js_services"), true, true), + Entry("defined from the sonataflow platform as reference and without DI and JS", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_without_di_and_js_services"), true, false), + Entry("defined from the sonataflow platform as reference but not required by the workflow", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_with_no_persistence_required"), false, false), + ) + */ + Describe("basic workflow monitoring", func() { + projectDir, _ := utils.GetProjectDir() + + It("should create servicemonitor for workflow deployed as k8s deployment when monitoring enabled in platform", func() { + By("creating external resources DataInputSchema configMap") + By("Deploy the SonataFlowPlatform CR") + var manifests []byte + EventuallyWithOffset(1, func() error { + var err error + cmd := exec.Command("kubectl", "kustomize", filepath.Join(projectDir, + "test/e2e/testdata/workflows/prometheus", "k8s_deployment")) + manifests, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + cmd := exec.Command("kubectl", "create", "-n", ns, "-f", "-") + cmd.Stdin = bytes.NewBuffer(manifests) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + workflowName := "callbackstatetimeouts" + By("check the workflow is in running state") + EventuallyWithOffset(1, func() bool { return verifyWorkflowIsInRunningState(workflowName, ns) }, 15*time.Minute, 30*time.Second).Should(BeTrue()) + + By("Retrieve the name of the running pod for the workflow") + labels := fmt.Sprintf("sonataflow.org/workflow-app=%s,sonataflow.org/workflow-namespace=%s", workflowName, ns) + podName := waitForPodRestartCompletion(labels, ns) + + By("check service monitor has been created") + EventuallyWithOffset(1, func() bool { + cmd := exec.Command("kubectl", "get", "servicemonitor", workflowName, "-n", ns) + _, err := utils.Run(cmd) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("failed to get servicemonitor: %v", err)) + return false + } return true - } - GinkgoWriter.Println("waitForPodRestartCompletion") - waitForPodRestartCompletion(workflowAppLabel, ns) - GinkgoWriter.Println("waitForPodRestartCompletion done") - return true - }, 5*time.Minute, 5*time.Second).Should(BeTrue()) - - By("Evaluate status of the workflow's pod database connection health endpoint") - cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}") - output, err := utils.Run(cmd) - Expect(err).NotTo(HaveOccurred()) - EventuallyWithOffset(1, func() bool { - for _, pn := range strings.Split(string(output), " ") { - h, err := getHealthFromPod(pn, ns) + }, 1*time.Minute, 5).Should(BeTrue()) + + By("trigger a new workflow instance") + var workflowId string + EventuallyWithOffset(1, func() bool { + curlCmd := fmt.Sprintf("curl -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' http://%s/%s", workflowName, workflowName) + cmd := exec.Command("kubectl", "exec", podName, "-c", "workflow", "-n", ns, "--", "/bin/bash", "-c", curlCmd) + resp, err := utils.Run(cmd) if err != nil { - continue + GinkgoWriter.Println(fmt.Errorf("failed to trigger workflow instance: %v", err)) + return false } - Expect(h.Status).To(Equal(upStatus), "Pod health is not UP") - if withPersistence { - connectionCheckFound := false - for _, c := range h.Checks { - if c.Name == dbConnectionName { - Expect(c.Status).To(Equal(upStatus), "Pod's database connection is not UP") - Expect(c.Data[defaultDataCheck]).To(Equal(upStatus), "Pod's 'default' database data is not UP") - connectionCheckFound = true - } - } - Expect(connectionCheckFound).To(Equal(true), "Connection health check not found, but the wofkflow has persistence") - return true + GinkgoWriter.Println(fmt.Errorf("Response: %v", string(resp))) + if id, err := getWorkflowId(string(resp)); err != nil { + GinkgoWriter.Println(err) + return false } else { - connectionCheckFound := false - for _, c := range h.Checks { - if c.Name == dbConnectionName { - connectionCheckFound = true - } - } - Expect(connectionCheckFound).To(Equal(false), "Connection health check was found, but the workflow don't have persistence") + workflowId = id + GinkgoWriter.Println("Workflow id found:", id) return true } - } - return false - }, 4*time.Minute).Should(BeTrue()) - // Persistence initialization checks - cmd = exec.Command("kubectl", "get", "pod", "-l", workflowAppLabel, "-n", ns, "-ojsonpath={.items[*].metadata.name}") - output, err = utils.Run(cmd) - Expect(err).NotTo(HaveOccurred()) - podName := string(output) - cmd = exec.Command("kubectl", "logs", podName, "-n", ns) - output, err = utils.Run(cmd) - Expect(err).NotTo(HaveOccurred()) - logs := string(output) - if withPersistence { - By("Validate that the workflow persistence was properly initialized") - Expect(logs).Should(ContainSubstring("Flyway Community Edition")) - Expect(logs).Should(ContainSubstring("Database: jdbc:postgresql://postgres.%s:5432", ns)) - result := verifySchemaMigration(logs, prebuiltWorkflows.CallBackPersistence.Name) - GinkgoWriter.Println(fmt.Sprintf("verifySchemaMigration: %v", result)) - Expect(result).Should(BeTrue()) - Expect(logs).Should(ContainSubstring("Profile prod activated")) - } else { - By("Validate that the workflow has no persistence") - Expect(logs).ShouldNot(ContainSubstring("Flyway Community Edition")) - Expect(logs).ShouldNot(ContainSubstring(fmt.Sprintf(`Creating schema "%s"`, prebuiltWorkflows.CallBack.Name))) - Expect(logs).Should(ContainSubstring("Profile prod activated")) - } - }, - Entry("defined in the workflow from an existing kubernetes service as a reference", test.GetPathFromE2EDirectory("workflows", "persistence", "by_service"), true, false), - Entry("defined in the workflow and from the sonataflow platform", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_overwritten_by_service"), true, false), - Entry("defined from the sonataflow platform as reference and with DI and JS", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_with_di_and_js_services"), true, true), - Entry("defined from the sonataflow platform as reference and without DI and JS", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_without_di_and_js_services"), true, false), - Entry("defined from the sonataflow platform as reference but not required by the workflow", test.GetPathFromE2EDirectory("workflows", "persistence", "from_platform_with_no_persistence_required"), false, false), - ) + }, 2*time.Minute, 5).Should(BeTrue()) + + By("check the new workflow instance has finished") + checkStr := fmt.Sprintf("callback-state-timeouts: %s has finalized", workflowId) + EventuallyWithOffset(1, func() bool { + cmdLog := exec.Command("kubectl", "logs", podName, "-c", "workflow", "-n", ns) + responseLog, err := utils.Run(cmdLog) + if err == nil { + if strings.Contains(string(responseLog), checkStr) { + return true + } + GinkgoWriter.Println(fmt.Errorf("did not find matched string in the log")) + } else { + GinkgoWriter.Println(fmt.Errorf("failed to get logs from workflow pod: %v", err)) + } + return false + }, 5*time.Minute, 5).Should(BeTrue()) + + By("check prometheus server has workflow instance metrics") + EventuallyWithOffset(1, func() bool { + curlCmd := fmt.Sprintf("curl http://prometheus-operated.default:9090/api/v1/query --data-urlencode 'query=kogito_process_instance_duration_seconds_count{job=\"%s\",namespace=\"%s\"}'", workflowName, ns) + GinkgoWriter.Println(curlCmd) + cmd := exec.Command("kubectl", "exec", podName, "-c", "workflow", "-n", ns, "--", "/bin/bash", "-c", curlCmd) + resp, err := utils.Run(cmd) + if err != nil { + GinkgoWriter.Println(fmt.Errorf("failed to get metrics from prometheus server: %v", err)) + return false + } + if val, err := getMetricValue(string(resp)); err != nil { + GinkgoWriter.Println(err) + return false + } else { + GinkgoWriter.Println("metric value found:", val) + return val == "1" + } + }, 5*time.Minute, 5).Should(BeTrue()) + }) + }) }) diff --git a/test/kubernetes_cli.go b/test/kubernetes_cli.go index 810dd1932..efa6b2d05 100644 --- a/test/kubernetes_cli.go +++ b/test/kubernetes_cli.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + prometheus "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" servingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -103,6 +104,7 @@ func NewKogitoClientBuilderWithOpenShift() *SonataFlowClientBuilder { utilruntime.Must(operatorapi.AddToScheme(s)) utilruntime.Must(eventingv1.AddToScheme(s)) utilruntime.Must(sourcesv1.AddToScheme(s)) + utilruntime.Must(prometheus.AddToScheme(s)) builder := fake.NewClientBuilder().WithScheme(s) return &SonataFlowClientBuilder{ innerBuilder: builder, diff --git a/test/testdata/grafana.yaml b/test/testdata/grafana.yaml new file mode 100644 index 000000000..3390aac86 --- /dev/null +++ b/test/testdata/grafana.yaml @@ -0,0 +1,26 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: grafana.integreatly.org/v1beta1 +kind: Grafana +metadata: + name: grafana + labels: + dashboards: "grafana" +spec: + config: + security: + admin_user: root + admin_password: secret diff --git a/test/testdata/prometheus.yaml b/test/testdata/prometheus.yaml new file mode 100644 index 000000000..44eeb156d --- /dev/null +++ b/test/testdata/prometheus.yaml @@ -0,0 +1,69 @@ +# Copyright 2024 Apache Software Foundation (ASF) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: monitoring.coreos.com/v1 +kind: Prometheus +metadata: + name: prometheus +spec: + serviceAccountName: prometheus + serviceMonitorNamespaceSelector: {} + serviceMonitorSelector: {} + podMonitorSelector: {} + resources: + requests: + memory: 400Mi +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: prometheus +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: prometheus +rules: +- apiGroups: [""] + resources: + - nodes + - nodes/metrics + - services + - endpoints + - pods + verbs: ["get", "list", "watch"] +- apiGroups: [""] + resources: + - configmaps + verbs: ["get"] +- apiGroups: + - networking.k8s.io + resources: + - ingresses + verbs: ["get", "list", "watch"] +- nonResourceURLs: ["/metrics"] + verbs: ["get"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: prometheus +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: prometheus +subjects: +- kind: ServiceAccount + name: prometheus + namespace: default diff --git a/test/yaml.go b/test/yaml.go index 276f9b9b7..752f05620 100644 --- a/test/yaml.go +++ b/test/yaml.go @@ -323,12 +323,13 @@ func getProjectDir() string { return projectDir } -func CreateFakeKnativeDiscoveryClient() discovery.DiscoveryInterface { +func CreateFakeKnativeAndMonitoringDiscoveryClient() discovery.DiscoveryInterface { return &discfake.FakeDiscovery{ Fake: &clienttesting.Fake{ Resources: []*metav1.APIResourceList{ {GroupVersion: "serving.knative.dev/v1"}, {GroupVersion: "eventing.knative.dev/v1"}, + {GroupVersion: "monitoring.coreos.com/v1"}, }, }, } diff --git a/utils/client.go b/utils/client.go index 9b1cd0856..75d9c01a2 100644 --- a/utils/client.go +++ b/utils/client.go @@ -14,9 +14,14 @@ package utils -import "sigs.k8s.io/controller-runtime/pkg/client" +import ( + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) var k8sClient client.Client +var discoveryClient discovery.DiscoveryInterface // TODO: consider refactor the internals as we progress adding features to rely on this client instead of passing it through all the functions @@ -30,3 +35,18 @@ func GetClient() client.Client { func SetClient(client client.Client) { k8sClient = client } + +func GetDiscoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, error) { + if discoveryClient == nil { + if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil { + return nil, err + } else { + discoveryClient = cli + } + } + return discoveryClient, nil +} + +func SetDiscoveryClient(cli discovery.DiscoveryInterface) { + discoveryClient = cli +} diff --git a/utils/kubernetes/security.go b/utils/kubernetes/security.go index d17a51ac9..50914c1d5 100644 --- a/utils/kubernetes/security.go +++ b/utils/kubernetes/security.go @@ -20,9 +20,8 @@ package kubernetes import ( - corev1 "k8s.io/api/core/v1" - "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + corev1 "k8s.io/api/core/v1" ) func SecurityDefaults() *corev1.SecurityContext {