diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index c6523228..6e36e803 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -61,11 +61,11 @@ const ( // PulsarAuthenticationOAuth2 indicates the parameters which are need by pulsar OAuth2 type PulsarAuthenticationOAuth2 struct { - IssuerEndpoint string `json:"issuerEndpoint"` - ClientID string `json:"clientID"` - Audience string `json:"audience"` - Key ValueOrSecretRef `json:"key"` - Scope string `json:"scope,omitempty"` + IssuerEndpoint string `json:"issuerEndpoint"` + ClientID string `json:"clientID"` + Audience string `json:"audience"` + Key *ValueOrSecretRef `json:"key"` + Scope string `json:"scope,omitempty"` } // IsPulsarResourceReady returns true if resource satisfies with these condition diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 3e54b6e9..f851ac3a 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -201,7 +201,11 @@ func (in *PulsarAuthentication) DeepCopy() *PulsarAuthentication { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PulsarAuthenticationOAuth2) DeepCopyInto(out *PulsarAuthenticationOAuth2) { *out = *in - in.Key.DeepCopyInto(&out.Key) + if in.Key != nil { + in, out := &in.Key, &out.Key + *out = new(ValueOrSecretRef) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarAuthenticationOAuth2. diff --git a/pkg/connection/reconcile_geo_replication.go b/pkg/connection/reconcile_geo_replication.go index 6af1c027..8adde66c 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -16,6 +16,8 @@ package connection import ( "context" + "encoding/base64" + "encoding/json" "fmt" "github.com/go-logr/logr" @@ -244,22 +246,41 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con func (r *PulsarGeoReplicationReconciler) checkSecretRefUpdate(connection resourcev1alpha1.PulsarConnection) (bool, error) { auth := connection.Spec.Authentication - if auth == nil || auth.Token.SecretRef == nil { + if auth == nil || (auth.Token != nil && auth.Token.SecretRef == nil) || + (auth.OAuth2 != nil && auth.OAuth2.Key == nil) || + (auth.OAuth2 != nil && auth.OAuth2.Key != nil && auth.OAuth2.Key.SecretRef == nil) { return false, nil } secret := &corev1.Secret{} - namespacedName := types.NamespacedName{ - Name: auth.Token.SecretRef.Name, - Namespace: connection.Namespace, - } - if err := r.conn.client.Get(context.Background(), namespacedName, secret); err != nil { - return false, err + if auth.Token != nil && auth.Token.SecretRef != nil { + namespacedName := types.NamespacedName{ + Name: auth.Token.SecretRef.Name, + Namespace: connection.Namespace, + } + if err := r.conn.client.Get(context.Background(), namespacedName, secret); err != nil { + return false, err + } + secretHash, err := utils.CalculateSecretKeyMd5(secret, auth.Token.SecretRef.Key) + if err != nil { + return false, err + } + return connection.Status.SecretKeyHash != secretHash, nil } - secretHash, err := utils.CalculateSecretKeyMd5(secret, auth.Token.SecretRef.Key) - if err != nil { - return false, err + if auth.OAuth2 != nil && auth.OAuth2.Key != nil && auth.OAuth2.Key.SecretRef != nil { + namespacedName := types.NamespacedName{ + Name: auth.OAuth2.Key.SecretRef.Name, + Namespace: connection.Namespace, + } + if err := r.conn.client.Get(context.Background(), namespacedName, secret); err != nil { + return false, err + } + secretHash, err := utils.CalculateSecretKeyMd5(secret, auth.OAuth2.Key.SecretRef.Key) + if err != nil { + return false, err + } + return connection.Status.SecretKeyHash != secretHash, nil } - return connection.Status.SecretKeyHash != secretHash, nil + return false, nil } func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarConnection, client client.Client) (*admin.ClusterParams, error) { @@ -271,6 +292,7 @@ func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarCo BrokerClientTrustCertsFilePath: destConnection.Spec.BrokerClientTrustCertsFilePath, } + hasAuth := false if auth := destConnection.Spec.Authentication; auth != nil { if auth.Token != nil { value, err := GetValue(ctx, client, destConnection.Namespace, auth.Token) @@ -280,11 +302,34 @@ func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarCo if value != nil { clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginToken clusterParam.AuthParameters = "token:" + *value + hasAuth = true + } + } + if oauth2 := auth.OAuth2; !hasAuth && oauth2 != nil { + var paramsJSON = utils.ClientCredentials{ + IssuerURL: oauth2.IssuerEndpoint, + Audience: oauth2.Audience, + Scope: oauth2.Scope, + ClientID: oauth2.ClientID, + } + if oauth2.Key != nil { + value, err := GetValue(ctx, client, destConnection.Namespace, oauth2.Key) + if err != nil { + return nil, err + } + if value != nil { + paramsJSON.PrivateKey = "data:application/json;base64," + base64.StdEncoding.EncodeToString([]byte(*value)) + clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginOAuth2 + paramsJSONString, err := json.Marshal(paramsJSON) + if err != nil { + return nil, err + } + clusterParam.AuthParameters = string(paramsJSONString) + } + } else { + return nil, fmt.Errorf("OAuth2 key is empty") } } - // TODO support oauth2 - // if auth.OAuth2 != nil && !hasAuth { - // } } return clusterParam, nil } diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index 36327d02..6c1425e2 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -298,7 +298,10 @@ func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.Pul cfg.ClientID = oauth2.ClientID cfg.Audience = oauth2.Audience cfg.Scope = oauth2.Scope - value, err := GetValue(ctx, k8sClient, connection.Namespace, &oauth2.Key) + if oauth2.Key == nil { + return nil, fmt.Errorf("oauth2 key must not be empty") + } + value, err := GetValue(ctx, k8sClient, connection.Namespace, oauth2.Key) if err != nil { return nil, err } diff --git a/pkg/utils/oauth2.go b/pkg/utils/oauth2.go new file mode 100644 index 00000000..dabedc99 --- /dev/null +++ b/pkg/utils/oauth2.go @@ -0,0 +1,24 @@ +// Copyright 2023 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +// ClientCredentials represents the client credentials for OAuth2 +type ClientCredentials struct { + IssuerURL string `json:"issuerUrl,omitempty"` + Audience string `json:"audience,omitempty"` + Scope string `json:"scope,omitempty"` + PrivateKey string `json:"privateKey,omitempty"` + ClientID string `json:"clientId,omitempty"` +}