Skip to content

Commit

Permalink
support config oauth2 for geo replication (#237)
Browse files Browse the repository at this point in the history
* support config oauth2 for geo replication

* Update config/rbac/role.yaml

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix style

* fix lint

* fix npe when test

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
freeznet and github-actions[bot] authored Aug 21, 2024
1 parent e7c5d30 commit 782fb18
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 21 deletions.
10 changes: 5 additions & 5 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ const (

// PulsarAuthenticationOAuth2 indicates the parameters which are need by pulsar OAuth2
type PulsarAuthenticationOAuth2 struct {
IssuerEndpoint string `json:"issuerEndpoint"`
ClientID string `json:"clientID"`
Audience string `json:"audience"`
Key ValueOrSecretRef `json:"key"`
Scope string `json:"scope,omitempty"`
IssuerEndpoint string `json:"issuerEndpoint"`
ClientID string `json:"clientID"`
Audience string `json:"audience"`
Key *ValueOrSecretRef `json:"key"`
Scope string `json:"scope,omitempty"`
}

// IsPulsarResourceReady returns true if resource satisfies with these condition
Expand Down
6 changes: 5 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 59 additions & 14 deletions pkg/connection/reconcile_geo_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package connection

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -244,22 +246,41 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con

func (r *PulsarGeoReplicationReconciler) checkSecretRefUpdate(connection resourcev1alpha1.PulsarConnection) (bool, error) {
auth := connection.Spec.Authentication
if auth == nil || auth.Token.SecretRef == nil {
if auth == nil || (auth.Token != nil && auth.Token.SecretRef == nil) ||
(auth.OAuth2 != nil && auth.OAuth2.Key == nil) ||
(auth.OAuth2 != nil && auth.OAuth2.Key != nil && auth.OAuth2.Key.SecretRef == nil) {
return false, nil
}
secret := &corev1.Secret{}
namespacedName := types.NamespacedName{
Name: auth.Token.SecretRef.Name,
Namespace: connection.Namespace,
}
if err := r.conn.client.Get(context.Background(), namespacedName, secret); err != nil {
return false, err
if auth.Token != nil && auth.Token.SecretRef != nil {
namespacedName := types.NamespacedName{
Name: auth.Token.SecretRef.Name,
Namespace: connection.Namespace,
}
if err := r.conn.client.Get(context.Background(), namespacedName, secret); err != nil {
return false, err
}
secretHash, err := utils.CalculateSecretKeyMd5(secret, auth.Token.SecretRef.Key)
if err != nil {
return false, err
}
return connection.Status.SecretKeyHash != secretHash, nil
}
secretHash, err := utils.CalculateSecretKeyMd5(secret, auth.Token.SecretRef.Key)
if err != nil {
return false, err
if auth.OAuth2 != nil && auth.OAuth2.Key != nil && auth.OAuth2.Key.SecretRef != nil {
namespacedName := types.NamespacedName{
Name: auth.OAuth2.Key.SecretRef.Name,
Namespace: connection.Namespace,
}
if err := r.conn.client.Get(context.Background(), namespacedName, secret); err != nil {
return false, err
}
secretHash, err := utils.CalculateSecretKeyMd5(secret, auth.OAuth2.Key.SecretRef.Key)
if err != nil {
return false, err
}
return connection.Status.SecretKeyHash != secretHash, nil
}
return connection.Status.SecretKeyHash != secretHash, nil
return false, nil
}

func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarConnection, client client.Client) (*admin.ClusterParams, error) {
Expand All @@ -271,6 +292,7 @@ func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarCo
BrokerClientTrustCertsFilePath: destConnection.Spec.BrokerClientTrustCertsFilePath,
}

hasAuth := false
if auth := destConnection.Spec.Authentication; auth != nil {
if auth.Token != nil {
value, err := GetValue(ctx, client, destConnection.Namespace, auth.Token)
Expand All @@ -280,11 +302,34 @@ func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarCo
if value != nil {
clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginToken
clusterParam.AuthParameters = "token:" + *value
hasAuth = true
}
}
if oauth2 := auth.OAuth2; !hasAuth && oauth2 != nil {
var paramsJSON = utils.ClientCredentials{
IssuerURL: oauth2.IssuerEndpoint,
Audience: oauth2.Audience,
Scope: oauth2.Scope,
ClientID: oauth2.ClientID,
}
if oauth2.Key != nil {
value, err := GetValue(ctx, client, destConnection.Namespace, oauth2.Key)
if err != nil {
return nil, err
}
if value != nil {
paramsJSON.PrivateKey = "data:application/json;base64," + base64.StdEncoding.EncodeToString([]byte(*value))
clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginOAuth2
paramsJSONString, err := json.Marshal(paramsJSON)
if err != nil {
return nil, err
}
clusterParam.AuthParameters = string(paramsJSONString)
}
} else {
return nil, fmt.Errorf("OAuth2 key is empty")
}
}
// TODO support oauth2
// if auth.OAuth2 != nil && !hasAuth {
// }
}
return clusterParam, nil
}
5 changes: 4 additions & 1 deletion pkg/connection/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.Pul
cfg.ClientID = oauth2.ClientID
cfg.Audience = oauth2.Audience
cfg.Scope = oauth2.Scope
value, err := GetValue(ctx, k8sClient, connection.Namespace, &oauth2.Key)
if oauth2.Key == nil {
return nil, fmt.Errorf("oauth2 key must not be empty")
}
value, err := GetValue(ctx, k8sClient, connection.Namespace, oauth2.Key)
if err != nil {
return nil, err
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/utils/oauth2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2023 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

// ClientCredentials represents the client credentials for OAuth2
type ClientCredentials struct {
IssuerURL string `json:"issuerUrl,omitempty"`
Audience string `json:"audience,omitempty"`
Scope string `json:"scope,omitempty"`
PrivateKey string `json:"privateKey,omitempty"`
ClientID string `json:"clientId,omitempty"`
}

0 comments on commit 782fb18

Please sign in to comment.