Skip to content

Commit

Permalink
incubator-kie-tools-2774: Make the SonataFlow Operator to configure t…
Browse files Browse the repository at this point in the history
…he Jobs related Knative Eventing Triggers to consume the events in order (apache#2775)
  • Loading branch information
wmedvede authored Nov 27, 2024
1 parent 886fd6e commit 0ca727d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 39 deletions.
44 changes: 31 additions & 13 deletions packages/sonataflow-operator/internal/controller/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ package knative
import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
Expand All @@ -51,14 +53,16 @@ type Availability struct {
}

const (
kSink = "K_SINK"
knativeBundleVolume = "kne-bundle-volume"
kCeOverRides = "K_CE_OVERRIDES"
knativeServingGroup = "serving.knative.dev"
knativeEventingGroup = "eventing.knative.dev"
knativeEventingAPIVersion = "eventing.knative.dev/v1"
knativeBrokerKind = "Broker"
knativeSinkProvided = "SinkProvided"
kSink = "K_SINK"
knativeBundleVolume = "kne-bundle-volume"
kCeOverRides = "K_CE_OVERRIDES"
knativeServingGroup = "serving.knative.dev"
knativeEventingGroup = "eventing.knative.dev"
knativeEventingAPIVersion = "eventing.knative.dev/v1"
knativeBrokerKind = "Broker"
knativeSinkProvided = "SinkProvided"
KafkaKnativeEventingDeliveryOrder = "kafka.eventing.knative.dev/delivery.order"
KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
)

func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) {
Expand Down Expand Up @@ -132,19 +136,33 @@ func getDestinationWithNamespace(dest *duckv1.Destination, namespace string) *du
return dest
}

func ValidateBroker(name, namespace string) error {
func ValidateBroker(name, namespace string) (*eventingv1.Broker, error) {
broker := &eventingv1.Broker{}
if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, broker); err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("broker %s in namespace %s does not exist", name, namespace)
return nil, fmt.Errorf("broker %s in namespace %s does not exist", name, namespace)
}
return err
return nil, err
}
cond := broker.Status.GetCondition(apis.ConditionReady)
if cond != nil && cond.Status == corev1.ConditionTrue {
return nil
return broker, nil
}
return nil, fmt.Errorf("broker %s in namespace %s is not ready", name, namespace)
}

// GetBrokerClass returns the broker class for a Knative Eventing Broker.
func GetBrokerClass(broker *eventingv1.Broker) string {
if broker.Annotations == nil {
return ""
}
return fmt.Errorf("broker %s in namespace %s is not ready", name, namespace)
return broker.Annotations[eventing.BrokerClassKey]
}

// IsKafkaBroker returns true if the class for a Knative Eventing Broker corresponds to a Kafka broker.
func IsKafkaBroker(brokerClass string) bool {
// currently available kafka broker classes are "Kafka", and "KafkaNamespaced", for safety ask for the substring "Kafka".
return strings.Contains(brokerClass, "Kafka")
}

func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,13 @@ func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}

func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
return &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
Namespace: namespace,
Labels: labels,
Name: kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
Namespace: namespace,
Labels: labels,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand Down Expand Up @@ -613,24 +614,28 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata
if len(namespace) == 0 {
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
var brokerObject *eventingv1.Broker
var err error
if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: fmt.Sprintf("%s for service: %s", err.Error(), d.GetServiceName()),
}
return nil, event, err
}
annotations := make(map[string]string)
managedAnnotations := make(map[string]string)
addTriggerAnnotations(knative.GetBrokerClass(brokerObject), managedAnnotations)
serviceName := d.GetServiceName()
return []client.Object{
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform),
d.newTrigger(lbl, managedAnnotations, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
}

func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
Expand All @@ -640,6 +645,12 @@ func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}

func addTriggerAnnotations(brokerClass string, annotations map[string]string) {
if knative.IsKafkaBroker(brokerClass) {
annotations[knative.KafkaKnativeEventingDeliveryOrder] = knative.KafkaKnativeEventingDeliveryOrderOrdered
}
}

func (d JobServiceHandler) GetSink() *duckv1.Destination {
if d.platform.Spec.Services.JobService.Sink != nil {
return d.platform.Spec.Services.JobService.Sink
Expand All @@ -658,19 +669,24 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
if len(namespace) == 0 {
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
var brokerObject *eventingv1.Broker
var err error
if brokerObject, err = knative.ValidateBroker(brokerName, namespace); err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
Message: fmt.Sprintf("%s for service: %s", err.Error(), j.GetServiceName()),
}
return nil, event, err
}
annotations := make(map[string]string)
addTriggerAnnotations(knative.GetBrokerClass(brokerObject), annotations)
jobCreateTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Name: kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand All @@ -695,9 +711,10 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
resultObjs = append(resultObjs, jobCreateTrigger)
jobDeleteTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Name: kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
Namespace: namespace,
Labels: lbl,
Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFl
// No broker configured for the eventType. Skip and will not create trigger for it.
continue
}
if err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil {
if _, err := knative.ValidateBroker(brokerRef.Name, brokerRef.Namespace); err != nil {
return nil, err
}
// construct eventingv1.Trigger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
validateTrigger(t, cl, "data-index-process-definition-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-error-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger)

Expand Down Expand Up @@ -1034,8 +1033,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-node-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-sla-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-state-", ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-variable-", ksp.Namespace, ksp, trigger)
Expand Down
2 changes: 0 additions & 2 deletions packages/sonataflow-operator/test/e2e/platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, targetNamespace, "di-source")).NotTo(HaveOccurred())
Expand Down Expand Up @@ -316,7 +315,6 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-sla-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-", constants.KogitoProcessInstancesEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, brokerNamespace, brokerName)).NotTo(HaveOccurred())
Expand Down

0 comments on commit 0ca727d

Please sign in to comment.