Skip to content

Commit

Permalink
kie-issues-314: Operator driven service discovery API Phase3
Browse files Browse the repository at this point in the history
    - Code review suggestions 2
  • Loading branch information
wmedvede committed Dec 13, 2023
1 parent 3e46c68 commit ae8ab89
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 117 deletions.
9 changes: 9 additions & 0 deletions controllers/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/rest"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -92,6 +94,13 @@ func NewServiceCatalog(cli client.Client, knDiscoveryClient *KnDiscoveryClient)
}
}

func NewServiceCatalogForConfig(cli client.Client, cfg *rest.Config) ServiceCatalog {
return &sonataFlowServiceCatalog{
kubernetesCatalog: newK8SServiceCatalog(cli),
knativeCatalog: newKnServiceCatalogForConfig(cfg),
}
}

func (c *sonataFlowServiceCatalog) Query(ctx context.Context, uri ResourceUri, outputFormat string) (string, error) {
switch uri.Scheme {
case KubernetesScheme:
Expand Down
8 changes: 4 additions & 4 deletions controllers/discovery/discovery_knative_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_QueryKnativeService(t *testing.T) {

func Test_QueryKnativeServiceNotFound(t *testing.T) {
_, client := fakeservingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(client.ServingV1(), nil))
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(client.ServingV1(), nil))
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Expand Down Expand Up @@ -72,7 +72,7 @@ func doTestQueryKnativeService(t *testing.T, expectedUri string) {
},
}
_, client := fakeservingclient.With(context.TODO(), service)
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(client.ServingV1(), nil))
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(client.ServingV1(), nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Expand All @@ -87,7 +87,7 @@ func Test_QueryKnativeBroker(t *testing.T) {

func Test_QueryKnativeBrokerNotFound(t *testing.T) {
_, client := fakeeventingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(nil, client.EventingV1()))
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(nil, client.EventingV1()))
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Expand Down Expand Up @@ -115,7 +115,7 @@ func doTestQueryKnativeBroker(t *testing.T, expectedUri string) {
},
}
_, client := fakeeventingclient.With(context.TODO(), broker)
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(nil, client.EventingV1()))
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(nil, client.EventingV1()))
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Expand Down
14 changes: 7 additions & 7 deletions controllers/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func doTestQueryKubernetesService(t *testing.T, outputFormat string, expectedUri
service.Spec.Type = corev1.ServiceTypeNodePort
service.Spec.ClusterIP = "10.1.5.18"
cli := fake.NewClientBuilder().WithRuntimeObjects(service).Build()
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("services").
Version("v1").
Expand All @@ -86,7 +86,7 @@ func doTestQueryKubernetesPod(t *testing.T, outputFormat string, expectedUri str
*mockContainerWithPorts("container1Name", mockContainerPort(httpProtocol, tcp, defaultHttpPort)))
pod.Status.PodIP = "10.1.12.13"
cli := fake.NewClientBuilder().WithRuntimeObjects(pod).Build()
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("pods").
Version("v1").
Expand Down Expand Up @@ -116,7 +116,7 @@ func doTesQueryKubernetesDeploymentWithService(t *testing.T, outputFormat string
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(deployment, service).Build()
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -142,7 +142,7 @@ func doTestQueryKubernetesDeploymentWithoutService(t *testing.T, outputFormat st
}

deployment := mockDeployment(namespace1, deployment1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build(), NewKnDiscoveryClient(nil, nil))
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build(), newKnDiscoveryClient(nil, nil))

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -176,7 +176,7 @@ func doTestQueryKubernetesStatefulSetWithService(t *testing.T, outputFormat stri
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(statefulSet, service).Build()
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -202,7 +202,7 @@ func doTestQueryKubernetesStatefulSetWithoutService(t *testing.T, outputFormat s
}

statefulSet := mockStatefulSet(namespace1, statefulSet1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build(), NewKnDiscoveryClient(nil, nil))
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build(), newKnDiscoveryClient(nil, nil))

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -237,7 +237,7 @@ func doTestQueryKubernetesIngress(t *testing.T, hostName string, ip string, tls
ingress.Spec.TLS = []v1.IngressTLS{{}}
}
cli := fake.NewClientBuilder().WithRuntimeObjects(ingress).Build()
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("ingresses").
Group("networking.k8s.io").
Expand Down
42 changes: 41 additions & 1 deletion controllers/discovery/knative_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"context"
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
Expand All @@ -49,7 +54,42 @@ func newKnServiceCatalog(discoveryClient *KnDiscoveryClient) knServiceCatalog {
}
}

func NewKnDiscoveryClient(servingClient clientservingv1.ServingV1Interface, eventingClient clienteventingv1.EventingV1Interface) *KnDiscoveryClient {
func newKnServiceCatalogForConfig(cfg *rest.Config) knServiceCatalog {
return knServiceCatalog{
dc: newKnDiscoveryClientForConfig(cfg),
}
}

// newKnDiscoveryClientForConfig returns a KnDiscoveryClient discovery client depending on the cluster status, if knative
// serving nor knative eventing are installed, or it was not possible to create that client, returns null.
func newKnDiscoveryClientForConfig(cfg *rest.Config) *KnDiscoveryClient {
var servingClient clientservingv1.ServingV1Interface
var eventingClient clienteventingv1.EventingV1Interface

if avail, err := knative.GetKnativeAvailability(cfg); err != nil {
klog.V(log.E).ErrorS(err, "Unable to determine if knative is installed in the cluster")
return nil
} else {
if avail.Serving {
if servingClient, err = knative.GetKnativeServingClient(cfg); err != nil {
klog.V(log.E).ErrorS(err, "Unable to get the knative serving client")
return nil
}
}
if avail.Eventing {
if eventingClient, err = knative.GetKnativeEventingClient(cfg); err != nil {
klog.V(log.E).ErrorS(err, "Unable to get the knative eventing client")
return nil
}
}
if servingClient != nil || eventingClient != nil {
return newKnDiscoveryClient(servingClient, eventingClient)
}
}
return nil
}

func newKnDiscoveryClient(servingClient clientservingv1.ServingV1Interface, eventingClient clienteventingv1.EventingV1Interface) *KnDiscoveryClient {
return &KnDiscoveryClient{
ServingClient: servingClient,
EventingClient: eventingClient,
Expand Down
14 changes: 7 additions & 7 deletions controllers/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
var servingClient clientservingv1.ServingV1Interface
var eventingClient clienteventingv1.EventingV1Interface

type Info struct {
IsKnativeEventing bool
IsKnativeServing bool
type Availability struct {
Eventing bool
Serving bool
}

func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) {
Expand Down Expand Up @@ -64,21 +64,21 @@ func NewKnativeEventingClient(cfg *rest.Config) (*clienteventingv1.EventingV1Cli
return clienteventingv1.NewForConfig(cfg)
}

func GetKnativeInfo(cfg *rest.Config) (*Info, error) {
func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) {
if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil {
return nil, err
} else {
apiList, err := cli.ServerGroups()
if err != nil {
return nil, err
}
result := new(Info)
result := new(Availability)
for _, group := range apiList.Groups {
if group.Name == "serving.knative.dev" {
result.IsKnativeServing = true
result.Serving = true
}
if group.Name == "eventing.knative.dev" {
result.IsKnativeEventing = true
result.Eventing = true
}
}
return result, nil
Expand Down
5 changes: 3 additions & 2 deletions controllers/profiles/dev/profile_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dev

import (
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -41,10 +42,10 @@ func (d developmentProfile) GetProfile() metadata.ProfileType {
return metadata.DevProfile
}

func NewProfileReconciler(client client.Client, knDiscoveryClient *discovery.KnDiscoveryClient) profiles.ProfileReconciler {
func NewProfileReconciler(client client.Client, cfg *rest.Config) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
Catalog: discovery.NewServiceCatalog(client, knDiscoveryClient),
Catalog: discovery.NewServiceCatalogForConfig(client, cfg),
}

var ensurers *objectEnsurers
Expand Down
20 changes: 11 additions & 9 deletions controllers/profiles/dev/profile_dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sort"
"testing"

"k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -54,7 +56,7 @@ func Test_OverrideStartupProbe(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -81,7 +83,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentFailureReason, "")
client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

reconciler := NewProfileReconciler(client, nil)
reconciler := NewProfileReconciler(client, &rest.Config{})

// we are in failed state and have no objects
result, err := reconciler.Reconcile(context.TODO(), workflow)
Expand Down Expand Up @@ -122,7 +124,7 @@ func Test_newDevProfile(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand Down Expand Up @@ -195,7 +197,7 @@ func Test_newDevProfile(t *testing.T) {
func Test_devProfileImageDefaultsNoPlatform(t *testing.T) {
workflow := test.GetBaseSonataFlowWithDevProfile(t.Name())
client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -212,7 +214,7 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t *testing.T) {
platform := test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -229,7 +231,7 @@ func Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing.
platform := test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -246,7 +248,7 @@ func Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin
platform := test.GetBasePlatformInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -265,7 +267,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

camelXmlRouteFileName := "camelroute-xml"
xmlRoute := `<route routeConfigurationId="xmlError">
Expand Down Expand Up @@ -381,7 +383,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, configMap).WithStatusSubresource(workflow, configMap).Build()

devReconciler := NewProfileReconciler(client, nil)
devReconciler := NewProfileReconciler(client, &rest.Config{})

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions controllers/profiles/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package factory

import (
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
Expand All @@ -36,7 +36,7 @@ const (
opsProfile metadata.ProfileType = "prod_for_ops"
)

type reconcilerBuilder func(client client.Client, knDiscoveryClient *discovery.KnDiscoveryClient) profiles.ProfileReconciler
type reconcilerBuilder func(client client.Client, cfg *rest.Config) profiles.ProfileReconciler

var profileBuilders = map[metadata.ProfileType]reconcilerBuilder{
metadata.ProdProfile: prod.NewProfileReconciler,
Expand All @@ -59,6 +59,6 @@ func profileBuilder(workflow *operatorapi.SonataFlow) reconcilerBuilder {
}

// NewReconciler creates a new ProfileReconciler based on the given workflow and context.
func NewReconciler(client client.Client, knDiscoveryClient *discovery.KnDiscoveryClient, workflow *operatorapi.SonataFlow) profiles.ProfileReconciler {
return profileBuilder(workflow)(client, knDiscoveryClient)
func NewReconciler(client client.Client, cfg *rest.Config, workflow *operatorapi.SonataFlow) profiles.ProfileReconciler {
return profileBuilder(workflow)(client, cfg)
}
10 changes: 6 additions & 4 deletions controllers/profiles/prod/profile_prod.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package prod
import (
"time"

"k8s.io/client-go/rest"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"

"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -64,10 +66,10 @@ func newObjectEnsurers(support *common.StateSupport) *objectEnsurers {

// NewProfileReconciler the default profile builder which includes a build state to run an internal build process
// to have an immutable workflow image deployed
func NewProfileReconciler(client client.Client, knDiscoveryClient *discovery.KnDiscoveryClient) profiles.ProfileReconciler {
func NewProfileReconciler(client client.Client, cfg *rest.Config) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
Catalog: discovery.NewServiceCatalog(client, knDiscoveryClient),
Catalog: discovery.NewServiceCatalogForConfig(client, cfg),
}
// the reconciliation state machine
stateMachine := common.NewReconciliationStateMachine(
Expand All @@ -84,10 +86,10 @@ func NewProfileReconciler(client client.Client, knDiscoveryClient *discovery.KnD

// NewProfileForOpsReconciler creates an alternative prod profile that won't require to build the workflow image in order to deploy
// the workflow application. It assumes that the image has been built somewhere else.
func NewProfileForOpsReconciler(client client.Client, knDiscoveryClient *discovery.KnDiscoveryClient) profiles.ProfileReconciler {
func NewProfileForOpsReconciler(client client.Client, cfg *rest.Config) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
Catalog: discovery.NewServiceCatalog(client, knDiscoveryClient),
Catalog: discovery.NewServiceCatalogForConfig(client, cfg),
}
// the reconciliation state machine
stateMachine := common.NewReconciliationStateMachine(
Expand Down
Loading

0 comments on commit ae8ab89

Please sign in to comment.