Skip to content

Commit

Permalink
kie-issues-314: Operator driven service discovery API Phase3
Browse files Browse the repository at this point in the history
  • Loading branch information
wmedvede committed Dec 4, 2023
1 parent 96d6b1d commit 51c6a1a
Show file tree
Hide file tree
Showing 27 changed files with 3,415 additions and 260 deletions.
36 changes: 36 additions & 0 deletions bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,42 @@ spec:
- subjectaccessreviews
verbs:
- create
- apiGroups:
- apps
resources:
- statefulset
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- networking.k8s.io
resources:
- ingress
- ingresses
verbs:
- get
- list
- watch
- apiGroups:
- serving.knative.dev
resources:
- service
- services
verbs:
- get
- list
- watch
- apiGroups:
- eventing.knative.dev
resources:
- broker
- brokers
verbs:
- get
- list
- watch
serviceAccountName: sonataflow-operator-controller-manager
deployments:
- label:
Expand Down
2 changes: 2 additions & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ resources:
- openshift_role_binding.yaml
- operator_role_leases.yaml
- operator_role_binding_leases.yaml
- service_discovery_role.yaml
- service_discovery_role_binding.yaml
# Comment the following 4 lines if you want to disable
# the auth proxy (https://github.com/brancz/kube-rbac-proxy)
# which protects your /metrics endpoint.
Expand Down
42 changes: 42 additions & 0 deletions config/rbac/service_discovery_role.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: service-discovery-role
rules:
- apiGroups:
- apps
resources:
- statefulset
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- networking.k8s.io
resources:
- ingress
- ingresses
verbs:
- get
- list
- watch
- apiGroups:
- serving.knative.dev
resources:
- service
- services
verbs:
- get
- list
- watch
- apiGroups:
- eventing.knative.dev
resources:
- broker
- brokers
verbs:
- get
- list
- watch
14 changes: 14 additions & 0 deletions config/rbac/service_discovery_role_binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: service-discovery-rolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: service-discovery-role
subjects:
- kind: ServiceAccount
name: controller-manager
namespace: system

11 changes: 8 additions & 3 deletions controllers/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"context"
"fmt"

clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"

clientservingv1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -52,6 +56,7 @@ const (

// knative groups
knativeServices = "knative:services.v1.serving.knative.dev"
knativeBrokers = "knative:brokers.v1.eventing.knative.dev"

// openshift groups
openshiftRoutes = "openshift:routes.v1.route.openshift.io"
Expand Down Expand Up @@ -84,10 +89,10 @@ type sonataFlowServiceCatalog struct {
}

// NewServiceCatalog returns a new ServiceCatalog configured to resolve kubernetes, knative, and openshift resource addresses.
func NewServiceCatalog(cli client.Client) ServiceCatalog {
func NewServiceCatalog(cli client.Client, knServingClient clientservingv1.ServingV1Interface, knEventingClient clienteventingv1.EventingV1Interface) ServiceCatalog {
return &sonataFlowServiceCatalog{
kubernetesCatalog: newK8SServiceCatalog(cli),
knativeCatalog: newKnServiceCatalog(cli),
knativeCatalog: newKnServiceCatalog(knServingClient, knEventingClient),
}
}

Expand All @@ -96,7 +101,7 @@ func (c *sonataFlowServiceCatalog) Query(ctx context.Context, uri ResourceUri, o
case KubernetesScheme:
return c.kubernetesCatalog.Query(ctx, uri, outputFormat)
case KnativeScheme:
return "", fmt.Errorf("knative service discovery is not yet implemened")
return c.knativeCatalog.Query(ctx, uri, outputFormat)
case OpenshiftScheme:
return "", fmt.Errorf("openshift service discovery is not yet implemented")
default:
Expand Down
125 changes: 125 additions & 0 deletions controllers/discovery/discovery_knative_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package discovery

import (
"context"
"fmt"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"

duckv1 "knative.dev/pkg/apis/duck/v1"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"

servingv1 "knative.dev/serving/pkg/apis/serving/v1"
fakeservingclient "knative.dev/serving/pkg/client/injection/client/fake"
)

func Test_QueryKnativeService(t *testing.T) {
doTestQueryKnativeService(t, "http://knServiceName1.namespace1.svc.cluster.local")
}

func Test_QueryKnativeServiceNotFound(t *testing.T) {
_, client := fakeservingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, client.ServingV1(), nil)
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knServiceName1).Build(), "", fmt.Sprintf("services.serving.knative.dev %q not found", knServiceName1))
}

func doTestQueryKnativeService(t *testing.T, expectedUri string) {
service := &servingv1.Service{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace1,
Name: knServiceName1,
},
Spec: servingv1.ServiceSpec{},
Status: servingv1.ServiceStatus{
RouteStatusFields: servingv1.RouteStatusFields{
Address: &duckv1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: knServiceName1 + "." + namespace1 + ".svc.cluster.local",
},
},
},
},
}
_, client := fakeservingclient.With(context.TODO(), service)
ctg := NewServiceCatalog(nil, client.ServingV1(), nil)
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knServiceName1).Build(), "", expectedUri)
}

func Test_QueryKnativeBroker(t *testing.T) {
doTestQueryKnativeBroker(t, "http://broker-ingress.knative-eventing.svc.cluster.local/namespace1/knBrokerName1")
}

func Test_QueryKnativeBrokerNotFound(t *testing.T) {
_, client := fakeeventingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, nil, client.EventingV1())
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knBrokerName1).Build(), "", fmt.Sprintf("brokers.eventing.knative.dev %q not found", knBrokerName1))
}

func doTestQueryKnativeBroker(t *testing.T, expectedUri string) {
broker := &eventingv1.Broker{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace1,
Name: knBrokerName1,
},
Spec: eventingv1.BrokerSpec{},
Status: eventingv1.BrokerStatus{
Address: duckv1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "broker-ingress.knative-eventing.svc.cluster.local",
Path: "/" + namespace1 + "/" + knBrokerName1,
},
},
},
}
_, client := fakeeventingclient.With(context.TODO(), broker)
ctg := NewServiceCatalog(nil, nil, client.EventingV1())
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knBrokerName1).Build(), "", expectedUri)
}
14 changes: 7 additions & 7 deletions controllers/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func doTestQueryKubernetesService(t *testing.T, outputFormat string, expectedUri
service.Spec.Type = corev1.ServiceTypeNodePort
service.Spec.ClusterIP = "10.1.5.18"
cli := fake.NewClientBuilder().WithRuntimeObjects(service).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, nil, nil)
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("services").
Version("v1").
Expand All @@ -86,7 +86,7 @@ func doTestQueryKubernetesPod(t *testing.T, outputFormat string, expectedUri str
*mockContainerWithPorts("container1Name", mockContainerPort(httpProtocol, tcp, defaultHttpPort)))
pod.Status.PodIP = "10.1.12.13"
cli := fake.NewClientBuilder().WithRuntimeObjects(pod).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, nil, nil)
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("pods").
Version("v1").
Expand Down Expand Up @@ -116,7 +116,7 @@ func doTesQueryKubernetesDeploymentWithService(t *testing.T, outputFormat string
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(deployment, service).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, nil, nil)

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -142,7 +142,7 @@ func doTestQueryKubernetesDeploymentWithoutService(t *testing.T, outputFormat st
}

deployment := mockDeployment(namespace1, deployment1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build())
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build(), nil, nil)

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -176,7 +176,7 @@ func doTestQueryKubernetesStatefulSetWithService(t *testing.T, outputFormat stri
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(statefulSet, service).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, nil, nil)

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -202,7 +202,7 @@ func doTestQueryKubernetesStatefulSetWithoutService(t *testing.T, outputFormat s
}

statefulSet := mockStatefulSet(namespace1, statefulSet1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build())
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build(), nil, nil)

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -237,7 +237,7 @@ func doTestQueryKubernetesIngress(t *testing.T, hostName string, ip string, tls
ingress.Spec.TLS = []v1.IngressTLS{{}}
}
cli := fake.NewClientBuilder().WithRuntimeObjects(ingress).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, nil, nil)
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("ingresses").
Group("networking.k8s.io").
Expand Down
45 changes: 40 additions & 5 deletions controllers/discovery/knative_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,54 @@ import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/client"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
clientservingv1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"
)

const (
knServiceKind = "services"
knBrokerKind = "brokers"
)

type knServiceCatalog struct {
Client client.Client
ServingClient clientservingv1.ServingV1Interface
EventingClient clienteventingv1.EventingV1Interface
}

func newKnServiceCatalog(cli client.Client) knServiceCatalog {
func newKnServiceCatalog(servingClient clientservingv1.ServingV1Interface, eventingClient clienteventingv1.EventingV1Interface) knServiceCatalog {
return knServiceCatalog{
Client: cli,
ServingClient: servingClient,
EventingClient: eventingClient,
}
}

func (c knServiceCatalog) Query(ctx context.Context, uri ResourceUri, outputFormat string) (string, error) {
return "", fmt.Errorf("knative service discovery is not yet implemened")
switch uri.GVK.Kind {
case knServiceKind:
return c.resolveKnServiceQuery(ctx, uri)
case knBrokerKind:
return c.resolveKnBrokerQuery(ctx, uri)
default:
return "", fmt.Errorf("resolution of knative kind: %s is not implemented", uri.GVK.Kind)
}
}

func (c knServiceCatalog) resolveKnServiceQuery(ctx context.Context, uri ResourceUri) (string, error) {
if service, err := c.ServingClient.Services(uri.Namespace).Get(ctx, uri.Name, metav1.GetOptions{}); err != nil {
return "", err
} else {
// knative objects discovery should rely on the addressable interface
return service.Status.Address.URL.String(), nil
}
}

func (c knServiceCatalog) resolveKnBrokerQuery(ctx context.Context, uri ResourceUri) (string, error) {
if broker, err := c.EventingClient.Brokers(uri.Namespace).Get(ctx, uri.Name, metav1.GetOptions{}); err != nil {
return "", err
} else {
// knative objects discovery should rely on the addressable interface
return broker.Status.Address.URL.String(), nil
}
}
Loading

0 comments on commit 51c6a1a

Please sign in to comment.