diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index de7293eb..7b8cbbf8 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -45,6 +45,9 @@ type PulsarAuthentication struct { // +optional OAuth2 *PulsarAuthenticationOAuth2 `json:"oauth2,omitempty"` + + // +optional + TLS *PulsarAuthenticationTLS `json:"tls,omitempty"` } // PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource @@ -68,6 +71,12 @@ type PulsarAuthenticationOAuth2 struct { Scope string `json:"scope,omitempty"` } +// PulsarAuthenticationTLS indicates the parameters which are need by pulsar TLS Authentication +type PulsarAuthenticationTLS struct { + ClientCertificatePath string `json:"clientCertificatePath"` + ClientCertificateKeyPath string `json:"clientCertificateKeyPath"` +} + // IsPulsarResourceReady returns true if resource satisfies with these condition // 1. The instance is not deleted // 2. Status ObservedGeneration is equal with meta.ObservedGeneration diff --git a/api/v1alpha1/pulsarconnection_types.go b/api/v1alpha1/pulsarconnection_types.go index f7fbece9..0503c9fe 100644 --- a/api/v1alpha1/pulsarconnection_types.go +++ b/api/v1alpha1/pulsarconnection_types.go @@ -58,6 +58,19 @@ type PulsarConnectionSpec struct { // set when enabling the Geo Replication // +optional ClusterName string `json:"clusterName,omitempty"` + + // TLSEnableHostnameVerification indicates whether to verify the hostname of the broker. + // Only used when using secure urls. + // +optional + TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification,omitempty"` + + // TLSAllowInsecureConnection indicates whether to allow insecure connection to the broker. + // +optional + TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection,omitempty"` + + // TLSTrustCertsFilePath Path for the TLS certificate used to validate the broker endpoint when using TLS. + // +optional + TLSTrustCertsFilePath string `json:"tlsTrustCertsFilePath,omitempty"` } // PulsarConnectionStatus defines the observed state of PulsarConnection diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index a694324b..9b585b5e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -55,6 +55,11 @@ func (in *PulsarAuthentication) DeepCopyInto(out *PulsarAuthentication) { *out = new(PulsarAuthenticationOAuth2) (*in).DeepCopyInto(*out) } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(PulsarAuthenticationTLS) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarAuthentication. @@ -83,6 +88,21 @@ func (in *PulsarAuthenticationOAuth2) DeepCopy() *PulsarAuthenticationOAuth2 { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PulsarAuthenticationTLS) DeepCopyInto(out *PulsarAuthenticationTLS) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarAuthenticationTLS. +func (in *PulsarAuthenticationTLS) DeepCopy() *PulsarAuthenticationTLS { + if in == nil { + return nil + } + out := new(PulsarAuthenticationTLS) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PulsarConnection) DeepCopyInto(out *PulsarConnection) { *out = *in diff --git a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml index c560926c..a7b6ae91 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml @@ -130,6 +130,18 @@ spec: - issuerEndpoint - key type: object + tls: + description: PulsarAuthenticationTLS indicates the parameters + which are need by pulsar TLS Authentication + properties: + clientCertificateKeyPath: + type: string + clientCertificatePath: + type: string + required: + - clientCertificateKeyPath + - clientCertificatePath + type: object token: description: ValueOrSecretRef is a string or a secret reference of the authentication @@ -167,6 +179,18 @@ spec: description: ClusterName indicates the local cluster name of the pulsar cluster. It should set when enabling the Geo Replication type: string + tlsAllowInsecureConnection: + description: TLSAllowInsecureConnection indicates whether to allow + insecure connection to the broker. + type: boolean + tlsEnableHostnameVerification: + description: TLSEnableHostnameVerification indicates whether to verify + the hostname of the broker. Only used when using secure urls. + type: boolean + tlsTrustCertsFilePath: + description: TLSTrustCertsFilePath Path for the TLS certificate used + to validate the broker endpoint when using TLS. + type: string type: object status: description: PulsarConnectionStatus defines the observed state of PulsarConnection diff --git a/docs/pulsar_connection.md b/docs/pulsar_connection.md index c91bc55f..32dbe48e 100644 --- a/docs/pulsar_connection.md +++ b/docs/pulsar_connection.md @@ -117,6 +117,24 @@ Other `PulsarConnection` configuration examples: # Use the keyFile contents as the oauth2 key value value: {"type":"sn_service_account","client_id":"zvex72oGvFQMBQGZ2ozMxOus2s4tQASJ","client_secret":"60J6fo81j-h69_vVvYvqFOHs2NfOyy6pqGqwIhTgnxpQ7O3UH8PdCbVtdm_SJjIf","client_email":"contoso@sndev.auth.streamnative.cloud","issuer_url":"https://auth.streamnative.cloud"} +* TLS authentication + + ```yaml + apiVersion: resource.streamnative.io/v1alpha1 + kind: PulsarConnection + metadata: + name: test-tls-auth-pulsar-connection + namespace: test + spec: + adminServiceURL: http://test-pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://test-pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster + authentication: + tls: + clientCertificateKeyPath: /certs/tls.key + clientCertificatePath: /certs/tls.crt + ``` + This table lists specifications available for the `PulsarConnection` resource. | Option | Description | Required or not | @@ -127,7 +145,9 @@ This table lists specifications available for the `PulsarConnection` resource. | `brokerServiceSecureURL` | The broker service URL for secure connection to the Pulsar cluster, such as `pulsar+ssl://cluster-broker.test.svc.cluster.local:6651`. This option is required for configuring Geo-replication when TLS is enabled. This option is available for version `0.3.0` or above. | No | | `adminServiceSecureURL` | The admin service URL for secure connection to the Pulsar cluster, such as `https://cluster-broker.test.svc.cluster.local:443`. This option is available for version `0.3.0` or above. | No | | `clusterName` | The Pulsar cluster name. You can use the `pulsar-admin clusters list` command to get the Pulsar cluster name. This option is required for configuring Geo-replication. Provided from `0.3.0` | No | - +| `tlsAllowInsecureConnection` | A flag that indicates whether to allow insecure connection to the broker. Provided from `0.5.0` | No | +| `tlsEnableHostnameVerification` | A flag that indicates wether hostname verification is enabled. Provided from `0.5.0` | No | +| `tlsTrustCertsFilePath` | The path to the certificate used during hostname verfification. Provided from `0.5.0` | No | 1. Apply the YAML file to create the Pulsar Connection. diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index e993e552..8536b5dd 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -15,6 +15,7 @@ package admin import ( + "fmt" "io/ioutil" "os" "strings" @@ -179,6 +180,10 @@ type PulsarAdminConfig struct { Audience string Key string Scope string + + // TLS Authentication related configuration + ClientCertificatePath string + ClientCertificateKeyPath string } // NewPulsarAdmin initialize a pulsar admin client with configuration @@ -189,8 +194,10 @@ func NewPulsarAdmin(conf PulsarAdminConfig) (PulsarAdmin, error) { var adminClient admin.Client config := &config.Config{ - WebServiceURL: conf.WebServiceURL, - TLSAllowInsecureConnection: true, + WebServiceURL: conf.WebServiceURL, + TLSAllowInsecureConnection: conf.TLSAllowInsecureConnection, + TLSEnableHostnameVerification: conf.TLSEnableHostnameVerification, + TLSTrustCertsFilePath: conf.TLSTrustCertsFilePath, // V2 admin endpoint contains operations for tenant, namespace and topic. PulsarAPIVersion: config.V2, } @@ -228,9 +235,23 @@ func NewPulsarAdmin(conf PulsarAdminConfig) (PulsarAdmin, error) { if err != nil { return nil, err } - } else { + adminClient = admin.NewWithAuthProvider(config, oauthProvider) + } else if conf.Token != "" { config.Token = conf.Token + adminClient, err = admin.New(config) + if err != nil { + return nil, err + } + } else if conf.ClientCertificatePath != "" { + config.AuthPlugin = auth.TLSPluginName + config.AuthParams = fmt.Sprintf("{\"tlsCertFile\": %q, \"tlsKeyFile\": %q}", conf.ClientCertificatePath, conf.ClientCertificateKeyPath) + + adminClient, err = admin.New(config) + if err != nil { + return nil, err + } + } else { adminClient, err = admin.New(config) if err != nil { return nil, err diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index f6df61f1..5ae809b7 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -197,13 +197,26 @@ func (r *PulsarConnectionReconciler) MakePulsarAdminConfig(ctx context.Context) } // MakePulsarAdminConfig create pulsar admin configuration -func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.PulsarConnection, +func (r *PulsarConnectionReconciler) MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.PulsarConnection, k8sClient client.Client) (*admin.PulsarAdminConfig, error) { if connection.Spec.AdminServiceURL == "" && connection.Spec.AdminServiceSecureURL == "" { return nil, fmt.Errorf("adminServiceURL or adminServiceSecureURL must not be empty") } + + tlsEnableHostnameVerification := connection.Spec.TLSEnableHostnameVerification + tlsAllowInsecureConnection := connection.Spec.TLSAllowInsecureConnection + tlsTrustCertsFilePath := connection.Spec.TLSTrustCertsFilePath + + if connection.Spec.AdminServiceSecureURL == "" { + tlsEnableHostnameVerification = false + tlsAllowInsecureConnection = true + tlsTrustCertsFilePath = "" + } cfg := admin.PulsarAdminConfig{ - WebServiceURL: connection.Spec.AdminServiceURL, + WebServiceURL: connection.Spec.AdminServiceURL, + TLSAllowInsecureConnection: tlsAllowInsecureConnection, + TLSEnableHostnameVerification: tlsEnableHostnameVerification, + TLSTrustCertsFilePath: tlsTrustCertsFilePath, } hasAuth := false if authn := connection.Spec.Authentication; authn != nil { @@ -230,6 +243,10 @@ func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.Pul cfg.Key = *value } } + if tls := authn.TLS; tls != nil { + cfg.ClientCertificatePath = tls.ClientCertificatePath + cfg.ClientCertificateKeyPath = tls.ClientCertificateKeyPath + } } return &cfg, nil }