Skip to content

Commit

Permalink
kie-issues-314: Operator driven service discovery API Phase3
Browse files Browse the repository at this point in the history
    - Code review suggestions 1
  • Loading branch information
wmedvede committed Dec 11, 2023
1 parent fbeaa46 commit 3e46c68
Show file tree
Hide file tree
Showing 19 changed files with 392 additions and 293 deletions.
8 changes: 2 additions & 6 deletions controllers/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
"context"
"fmt"

clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"

clientservingv1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -89,10 +85,10 @@ type sonataFlowServiceCatalog struct {
}

// NewServiceCatalog returns a new ServiceCatalog configured to resolve kubernetes, knative, and openshift resource addresses.
func NewServiceCatalog(cli client.Client, knServingClient clientservingv1.ServingV1Interface, knEventingClient clienteventingv1.EventingV1Interface) ServiceCatalog {
func NewServiceCatalog(cli client.Client, knDiscoveryClient *KnDiscoveryClient) ServiceCatalog {
return &sonataFlowServiceCatalog{
kubernetesCatalog: newK8SServiceCatalog(cli),
knativeCatalog: newKnServiceCatalog(knServingClient, knEventingClient),
knativeCatalog: newKnServiceCatalog(knDiscoveryClient),
}
}

Expand Down
8 changes: 4 additions & 4 deletions controllers/discovery/discovery_knative_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_QueryKnativeService(t *testing.T) {

func Test_QueryKnativeServiceNotFound(t *testing.T) {
_, client := fakeservingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, client.ServingV1(), nil)
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(client.ServingV1(), nil))
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Expand Down Expand Up @@ -72,7 +72,7 @@ func doTestQueryKnativeService(t *testing.T, expectedUri string) {
},
}
_, client := fakeservingclient.With(context.TODO(), service)
ctg := NewServiceCatalog(nil, client.ServingV1(), nil)
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(client.ServingV1(), nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Expand All @@ -87,7 +87,7 @@ func Test_QueryKnativeBroker(t *testing.T) {

func Test_QueryKnativeBrokerNotFound(t *testing.T) {
_, client := fakeeventingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, nil, client.EventingV1())
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(nil, client.EventingV1()))
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Expand Down Expand Up @@ -115,7 +115,7 @@ func doTestQueryKnativeBroker(t *testing.T, expectedUri string) {
},
}
_, client := fakeeventingclient.With(context.TODO(), broker)
ctg := NewServiceCatalog(nil, nil, client.EventingV1())
ctg := NewServiceCatalog(nil, NewKnDiscoveryClient(nil, client.EventingV1()))
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Expand Down
14 changes: 7 additions & 7 deletions controllers/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func doTestQueryKubernetesService(t *testing.T, outputFormat string, expectedUri
service.Spec.Type = corev1.ServiceTypeNodePort
service.Spec.ClusterIP = "10.1.5.18"
cli := fake.NewClientBuilder().WithRuntimeObjects(service).Build()
ctg := NewServiceCatalog(cli, nil, nil)
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("services").
Version("v1").
Expand All @@ -86,7 +86,7 @@ func doTestQueryKubernetesPod(t *testing.T, outputFormat string, expectedUri str
*mockContainerWithPorts("container1Name", mockContainerPort(httpProtocol, tcp, defaultHttpPort)))
pod.Status.PodIP = "10.1.12.13"
cli := fake.NewClientBuilder().WithRuntimeObjects(pod).Build()
ctg := NewServiceCatalog(cli, nil, nil)
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("pods").
Version("v1").
Expand Down Expand Up @@ -116,7 +116,7 @@ func doTesQueryKubernetesDeploymentWithService(t *testing.T, outputFormat string
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(deployment, service).Build()
ctg := NewServiceCatalog(cli, nil, nil)
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -142,7 +142,7 @@ func doTestQueryKubernetesDeploymentWithoutService(t *testing.T, outputFormat st
}

deployment := mockDeployment(namespace1, deployment1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build(), nil, nil)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build(), NewKnDiscoveryClient(nil, nil))

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -176,7 +176,7 @@ func doTestQueryKubernetesStatefulSetWithService(t *testing.T, outputFormat stri
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(statefulSet, service).Build()
ctg := NewServiceCatalog(cli, nil, nil)
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -202,7 +202,7 @@ func doTestQueryKubernetesStatefulSetWithoutService(t *testing.T, outputFormat s
}

statefulSet := mockStatefulSet(namespace1, statefulSet1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build(), nil, nil)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build(), NewKnDiscoveryClient(nil, nil))

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -237,7 +237,7 @@ func doTestQueryKubernetesIngress(t *testing.T, hostName string, ip string, tls
ingress.Spec.TLS = []v1.IngressTLS{{}}
}
cli := fake.NewClientBuilder().WithRuntimeObjects(ingress).Build()
ctg := NewServiceCatalog(cli, nil, nil)
ctg := NewServiceCatalog(cli, NewKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("ingresses").
Group("networking.k8s.io").
Expand Down
23 changes: 18 additions & 5 deletions controllers/discovery/knative_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,31 @@ const (
)

type knServiceCatalog struct {
dc *KnDiscoveryClient
}

type KnDiscoveryClient struct {
ServingClient clientservingv1.ServingV1Interface
EventingClient clienteventingv1.EventingV1Interface
}

func newKnServiceCatalog(servingClient clientservingv1.ServingV1Interface, eventingClient clienteventingv1.EventingV1Interface) knServiceCatalog {
func newKnServiceCatalog(discoveryClient *KnDiscoveryClient) knServiceCatalog {
return knServiceCatalog{
dc: discoveryClient,
}
}

func NewKnDiscoveryClient(servingClient clientservingv1.ServingV1Interface, eventingClient clienteventingv1.EventingV1Interface) *KnDiscoveryClient {
return &KnDiscoveryClient{
ServingClient: servingClient,
EventingClient: eventingClient,
}
}

func (c knServiceCatalog) Query(ctx context.Context, uri ResourceUri, outputFormat string) (string, error) {
if c.dc == nil {
return "", fmt.Errorf("knative KnDiscoveryClient was not provided, maybe knative is not installed in current cluster")
}
switch uri.GVK.Kind {
case knServiceKind:
return c.resolveKnServiceQuery(ctx, uri)
Expand All @@ -58,10 +71,10 @@ func (c knServiceCatalog) Query(ctx context.Context, uri ResourceUri, outputForm
}

func (c knServiceCatalog) resolveKnServiceQuery(ctx context.Context, uri ResourceUri) (string, error) {
if c.ServingClient == nil {
if c.dc.ServingClient == nil {
return "", fmt.Errorf("knative ServingClient was not provided, maybe the serving.knative.dev api is not installed in current cluster")
}
if service, err := c.ServingClient.Services(uri.Namespace).Get(ctx, uri.Name, metav1.GetOptions{}); err != nil {
if service, err := c.dc.ServingClient.Services(uri.Namespace).Get(ctx, uri.Name, metav1.GetOptions{}); err != nil {
return "", err
} else {
// knative objects discovery should rely on the addressable interface
Expand All @@ -70,10 +83,10 @@ func (c knServiceCatalog) resolveKnServiceQuery(ctx context.Context, uri Resourc
}

func (c knServiceCatalog) resolveKnBrokerQuery(ctx context.Context, uri ResourceUri) (string, error) {
if c.EventingClient == nil {
if c.dc.EventingClient == nil {
return "", fmt.Errorf("knative EventingClient was not provided, maybe the eventing.knative.dev api is not installed in current cluster")
}
if broker, err := c.EventingClient.Brokers(uri.Namespace).Get(ctx, uri.Name, metav1.GetOptions{}); err != nil {
if broker, err := c.dc.EventingClient.Brokers(uri.Namespace).Get(ctx, uri.Name, metav1.GetOptions{}); err != nil {
return "", err
} else {
// knative objects discovery should rely on the addressable interface
Expand Down
86 changes: 86 additions & 0 deletions controllers/knative/knative.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package knative

import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
clientservingv1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"
)

var servingClient clientservingv1.ServingV1Interface
var eventingClient clienteventingv1.EventingV1Interface

type Info struct {
IsKnativeEventing bool
IsKnativeServing bool
}

func GetKnativeServingClient(cfg *rest.Config) (clientservingv1.ServingV1Interface, error) {
if servingClient == nil {
if knServingClient, err := NewKnativeServingClient(cfg); err != nil {
return nil, err
} else {
servingClient = knServingClient
}
}
return servingClient, nil
}

func GetKnativeEventingClient(cfg *rest.Config) (clienteventingv1.EventingV1Interface, error) {
if eventingClient == nil {
if knEventingClient, err := NewKnativeEventingClient(cfg); err != nil {
return nil, err
} else {
eventingClient = knEventingClient
}
}
return eventingClient, nil
}

func NewKnativeServingClient(cfg *rest.Config) (*clientservingv1.ServingV1Client, error) {
return clientservingv1.NewForConfig(cfg)
}

func NewKnativeEventingClient(cfg *rest.Config) (*clienteventingv1.EventingV1Client, error) {
return clienteventingv1.NewForConfig(cfg)
}

func GetKnativeInfo(cfg *rest.Config) (*Info, error) {
if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil {
return nil, err
} else {
apiList, err := cli.ServerGroups()
if err != nil {
return nil, err
}
result := new(Info)
for _, group := range apiList.Groups {
if group.Name == "serving.knative.dev" {
result.IsKnativeServing = true
}
if group.Name == "eventing.knative.dev" {
result.IsKnativeEventing = true
}
}
return result, nil
}
}
Loading

0 comments on commit 3e46c68

Please sign in to comment.