diff --git a/.github/workflows/chart-test.yml b/.github/workflows/chart-test.yml index 0421a53..6399e7d 100644 --- a/.github/workflows/chart-test.yml +++ b/.github/workflows/chart-test.yml @@ -46,7 +46,8 @@ jobs: diff config/crd/bases/resource.streamnative.io_pulsarsinks.yaml charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsinks.yaml && \ diff config/crd/bases/resource.streamnative.io_pulsarsources.yaml charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarsources.yaml && \ diff config/crd/bases/resource.streamnative.io_pulsarpackages.yaml charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarpackages.yaml && \ - diff config/crd/bases/resource.streamnative.io_pulsartopics.yaml charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsartopics.yaml + diff config/crd/bases/resource.streamnative.io_pulsartopics.yaml charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsartopics.yaml && \ + diff config/crd/bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnsisolationpolicies.yaml - name: Set up Helm uses: azure/setup-helm@v3 diff --git a/README.md b/README.md index ccf7f75..ec5cae4 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Currently, the Pulsar Resources Operator provides full lifecycle management for - [Sinks](docs/pulsar_sink.md) - [Sources](docs/pulsar_source.md) - [Geo-Replication](docs/pulsar_geo_replication.md) +- [NS-Isolation-Policy](docs/pulsar_ns_isolation_policy.md) ## Lifecycle Management @@ -128,6 +129,7 @@ In this tutorial, a Kubernetes namespace called `test` is used for examples, whi - [PulsarSink](docs/pulsar_sink.md) - [PulsarSource](docs/pulsar_source.md) - [PulsarGeoReplication](docs/pulsar_geo_replication.md) +- [NS-Isolation-Policy](docs/pulsar_ns_isolation_policy.md) # Contributing diff --git a/api/v1alpha1/pulsarnamespace_types.go b/api/v1alpha1/pulsarnamespace_types.go index ba67a43..ef3bdea 100644 --- a/api/v1alpha1/pulsarnamespace_types.go +++ b/api/v1alpha1/pulsarnamespace_types.go @@ -127,6 +127,16 @@ type PulsarNamespaceSpec struct { // Deduplication controls whether to enable message deduplication for the namespace. // +optional Deduplication *bool `json:"deduplication,omitempty"` + + // BookieAffinityGroup is the name of the namespace isolation policy to apply to the namespace. + BookieAffinityGroup *BookieAffinityGroupData `json:"bookieAffinityGroup,omitempty"` +} + +type BookieAffinityGroupData struct { + BookkeeperAffinityGroupPrimary string `json:"bookkeeperAffinityGroupPrimary"` + + // +optional + BookkeeperAffinityGroupSecondary string `json:"bookkeeperAffinityGroupSecondary,omitempty"` } // PulsarNamespaceStatus defines the observed state of PulsarNamespace diff --git a/api/v1alpha1/pulsarnsisolationpolicy_types.go b/api/v1alpha1/pulsarnsisolationpolicy_types.go new file mode 100644 index 0000000..8685c87 --- /dev/null +++ b/api/v1alpha1/pulsarnsisolationpolicy_types.go @@ -0,0 +1,124 @@ +// Copyright 2024 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// PulsarNSIsolationPolicySpec defines the desired state of a Pulsar namespace isolation policy. +// It corresponds to the configuration options available in Pulsar's namespaceIsolationPolicies admin API. +type PulsarNSIsolationPolicySpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + + // Name is the policy name + // +kubebuilder:validation:Required + Name string `json:"name"` + + // Cluster is the name of the Pulsar Cluster + // +kubebuilder:validation:Required + Cluster string `json:"cluster"` + + // ConnectionRef is the reference to the PulsarConnection resource + // used to connect to the Pulsar cluster for this ns-isolation-policy. + ConnectionRef corev1.LocalObjectReference `json:"connectionRef"` + + // Namespaces namespaces-regex list + // +kubebuilder:validation:Required + Namespaces []string `json:"namespaces"` + + // Primary primary-broker-regex list + // +kubebuilder:validation:Required + Primary []string `json:"primary"` + + // Secondary secondary-broker-regex list, optional + // +optional + Secondary []string `json:"secondary,omitempty"` + + // AutoFailoverPolicyType auto failover policy type name, only support min_available now + // +kubebuilder:validation:Required + // +kubebuilder:validation:Enum=min_available + AutoFailoverPolicyType AutoFailoverPolicyType `json:"autoFailoverPolicyType"` + + // AutoFailoverPolicyParams auto failover policy parameters + // +kubebuilder:validation:Required + AutoFailoverPolicyParams map[string]string `json:"autoFailoverPolicyParams"` +} + +type AutoFailoverPolicyType string + +const ( + MinAvailable AutoFailoverPolicyType = "min_available" +) + +// PulsarNSIsolationPolicyStatus defines the observed state of PulsarNSIsolationPolicy +type PulsarNSIsolationPolicyStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file + + // ObservedGeneration is the most recent generation observed for this resource. + // It corresponds to the metadata generation, which is updated on mutation by the API Server. + // This field is used to track whether the controller has processed the latest changes. + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + // Conditions represent the latest available observations of the ns-isolation-policy's current state. + // It follows the Kubernetes conventions for condition types and status. + // The "Ready" condition type is typically used to indicate the overall status of the ns-isolation-policy. + // +patchMergeKey=type + // +patchStrategy=merge + // +listType=map + // +listMapKey=type + // +optional + Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status +//+kubebuilder:resource:categories=pulsar;pulsarres,shortName=pnsip +//+kubebuilder:printcolumn:name="RESOURCE_NAME",type=string,JSONPath=`.spec.name` +//+kubebuilder:printcolumn:name="GENERATION",type=string,JSONPath=`.metadata.generation` +//+kubebuilder:printcolumn:name="OBSERVED_GENERATION",type=string,JSONPath=`.status.observedGeneration` +//+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status` + +// PulsarNSIsolationPolicy is the Schema for the pulsar ns-isolation-policy API +// It represents a Pulsar NsIsolationPolicy in the Kubernetes cluster and includes both +// the desired state (Spec) and the observed state (Status) of the policy. +type PulsarNSIsolationPolicy struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec PulsarNSIsolationPolicySpec `json:"spec,omitempty"` + Status PulsarNSIsolationPolicyStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// PulsarNSIsolationPolicyList contains a list of PulsarNSIsolationPolicy resources. +// It is used by the Kubernetes API to return multiple PulsarNSIsolationPolicy objects. +type PulsarNSIsolationPolicyList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []PulsarNSIsolationPolicy `json:"items"` +} + +func init() { + SchemeBuilder.Register(&PulsarNSIsolationPolicy{}, &PulsarNSIsolationPolicyList{}) +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 399eda8..ec117d9 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -46,6 +46,21 @@ func (in *BatchSourceConfig) DeepCopy() *BatchSourceConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BookieAffinityGroupData) DeepCopyInto(out *BookieAffinityGroupData) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BookieAffinityGroupData. +func (in *BookieAffinityGroupData) DeepCopy() *BookieAffinityGroupData { + if in == nil { + return nil + } + out := new(BookieAffinityGroupData) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterInfo) DeepCopyInto(out *ClusterInfo) { *out = *in @@ -614,6 +629,125 @@ func (in *PulsarGeoReplicationStatus) DeepCopy() *PulsarGeoReplicationStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PulsarNSIsolationPolicy) DeepCopyInto(out *PulsarNSIsolationPolicy) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarNSIsolationPolicy. +func (in *PulsarNSIsolationPolicy) DeepCopy() *PulsarNSIsolationPolicy { + if in == nil { + return nil + } + out := new(PulsarNSIsolationPolicy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PulsarNSIsolationPolicy) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PulsarNSIsolationPolicyList) DeepCopyInto(out *PulsarNSIsolationPolicyList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PulsarNSIsolationPolicy, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarNSIsolationPolicyList. +func (in *PulsarNSIsolationPolicyList) DeepCopy() *PulsarNSIsolationPolicyList { + if in == nil { + return nil + } + out := new(PulsarNSIsolationPolicyList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PulsarNSIsolationPolicyList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PulsarNSIsolationPolicySpec) DeepCopyInto(out *PulsarNSIsolationPolicySpec) { + *out = *in + out.ConnectionRef = in.ConnectionRef + if in.Namespaces != nil { + in, out := &in.Namespaces, &out.Namespaces + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Primary != nil { + in, out := &in.Primary, &out.Primary + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Secondary != nil { + in, out := &in.Secondary, &out.Secondary + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.AutoFailoverPolicyParams != nil { + in, out := &in.AutoFailoverPolicyParams, &out.AutoFailoverPolicyParams + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarNSIsolationPolicySpec. +func (in *PulsarNSIsolationPolicySpec) DeepCopy() *PulsarNSIsolationPolicySpec { + if in == nil { + return nil + } + out := new(PulsarNSIsolationPolicySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PulsarNSIsolationPolicyStatus) DeepCopyInto(out *PulsarNSIsolationPolicyStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarNSIsolationPolicyStatus. +func (in *PulsarNSIsolationPolicyStatus) DeepCopy() *PulsarNSIsolationPolicyStatus { + if in == nil { + return nil + } + out := new(PulsarNSIsolationPolicyStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PulsarNamespace) DeepCopyInto(out *PulsarNamespace) { *out = *in @@ -763,6 +897,11 @@ func (in *PulsarNamespaceSpec) DeepCopyInto(out *PulsarNamespaceSpec) { *out = new(bool) **out = **in } + if in.BookieAffinityGroup != nil { + in, out := &in.BookieAffinityGroup, &out.BookieAffinityGroup + *out = new(BookieAffinityGroupData) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarNamespaceSpec. diff --git a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml index 725177b..071d08e 100644 --- a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml +++ b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml @@ -104,6 +104,17 @@ spec: - destination_storage - message_age type: string + bookieAffinityGroup: + description: BookieAffinityGroup is the name of the namespace isolation + policy to apply to the namespace. + properties: + bookkeeperAffinityGroupPrimary: + type: string + bookkeeperAffinityGroupSecondary: + type: string + required: + - bookkeeperAffinityGroupPrimary + type: object bundles: description: |- Bundles specifies the number of bundles to split the namespace into. @@ -324,4 +335,4 @@ status: kind: "" plural: "" conditions: null - storedVersions: null \ No newline at end of file + storedVersions: null diff --git a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnsisolationpolicies.yaml b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnsisolationpolicies.yaml new file mode 100644 index 0000000..85e2a5c --- /dev/null +++ b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnsisolationpolicies.yaml @@ -0,0 +1,232 @@ +# Copyright 2024 StreamNative +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.15.0 + creationTimestamp: null + name: pulsarnsisolationpolicies.resource.streamnative.io +spec: + group: resource.streamnative.io + names: + categories: + - pulsar + - pulsarres + kind: PulsarNSIsolationPolicy + listKind: PulsarNSIsolationPolicyList + plural: pulsarnsisolationpolicies + shortNames: + - pnsip + singular: pulsarnsisolationpolicy + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.name + name: RESOURCE_NAME + type: string + - jsonPath: .metadata.generation + name: GENERATION + type: string + - jsonPath: .status.observedGeneration + name: OBSERVED_GENERATION + type: string + - jsonPath: .status.conditions[?(@.type=="Ready")].status + name: READY + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: |- + PulsarNSIsolationPolicy is the Schema for the pulsar ns-isolation-policy API + It represents a Pulsar NsIsolationPolicy in the Kubernetes cluster and includes both + the desired state (Spec) and the observed state (Status) of the policy. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: |- + PulsarNSIsolationPolicySpec defines the desired state of a Pulsar namespace isolation policy. + It corresponds to the configuration options available in Pulsar's namespaceIsolationPolicies admin API. + properties: + autoFailoverPolicyParams: + additionalProperties: + type: string + description: AutoFailoverPolicyParams auto failover policy parameters + type: object + autoFailoverPolicyType: + description: AutoFailoverPolicyType auto failover policy type name, + only support min_available now + enum: + - min_available + type: string + cluster: + description: Cluster is the name of the Pulsar Cluster + type: string + connectionRef: + description: |- + ConnectionRef is the reference to the PulsarConnection resource + used to connect to the Pulsar cluster for this ns-isolation-policy. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + name: + description: Name is the policy name + type: string + namespaces: + description: Namespaces namespaces-regex list + items: + type: string + type: array + primary: + description: Primary primary-broker-regex list + items: + type: string + type: array + secondary: + description: Secondary secondary-broker-regex list, optional + items: + type: string + type: array + required: + - autoFailoverPolicyParams + - autoFailoverPolicyType + - cluster + - connectionRef + - name + - namespaces + - primary + type: object + status: + description: PulsarNSIsolationPolicyStatus defines the observed state + of PulsarNSIsolationPolicy + properties: + conditions: + description: |- + Conditions represent the latest available observations of the ns-isolation-policy's current state. + It follows the Kubernetes conventions for condition types and status. + The "Ready" condition type is typically used to indicate the overall status of the ns-isolation-policy. + items: + description: "Condition contains details for one aspect of the current + state of this API Resource.\n---\nThis struct is intended for + direct use as an array at the field path .status.conditions. For + example,\n\n\n\ttype FooStatus struct{\n\t // Represents the + observations of a foo's current state.\n\t // Known .status.conditions.type + are: \"Available\", \"Progressing\", and \"Degraded\"\n\t // + +patchMergeKey=type\n\t // +patchStrategy=merge\n\t // +listType=map\n\t + \ // +listMapKey=type\n\t Conditions []metav1.Condition `json:\"conditions,omitempty\" + patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"`\n\n\n\t + \ // other fields\n\t}" + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: |- + type of condition in CamelCase or in foo.example.com/CamelCase. + --- + Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be + useful (see .node.status.conditions), the ability to deconflict is important. + The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + observedGeneration: + description: |- + ObservedGeneration is the most recent generation observed for this resource. + It corresponds to the metadata generation, which is updated on mutation by the API Server. + This field is used to track whether the controller has processed the latest changes. + format: int64 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: null + storedVersions: null diff --git a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml index 725177b..071d08e 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml @@ -104,6 +104,17 @@ spec: - destination_storage - message_age type: string + bookieAffinityGroup: + description: BookieAffinityGroup is the name of the namespace isolation + policy to apply to the namespace. + properties: + bookkeeperAffinityGroupPrimary: + type: string + bookkeeperAffinityGroupSecondary: + type: string + required: + - bookkeeperAffinityGroupPrimary + type: object bundles: description: |- Bundles specifies the number of bundles to split the namespace into. @@ -324,4 +335,4 @@ status: kind: "" plural: "" conditions: null - storedVersions: null \ No newline at end of file + storedVersions: null diff --git a/config/crd/bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml b/config/crd/bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml new file mode 100644 index 0000000..85e2a5c --- /dev/null +++ b/config/crd/bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml @@ -0,0 +1,232 @@ +# Copyright 2024 StreamNative +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.15.0 + creationTimestamp: null + name: pulsarnsisolationpolicies.resource.streamnative.io +spec: + group: resource.streamnative.io + names: + categories: + - pulsar + - pulsarres + kind: PulsarNSIsolationPolicy + listKind: PulsarNSIsolationPolicyList + plural: pulsarnsisolationpolicies + shortNames: + - pnsip + singular: pulsarnsisolationpolicy + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.name + name: RESOURCE_NAME + type: string + - jsonPath: .metadata.generation + name: GENERATION + type: string + - jsonPath: .status.observedGeneration + name: OBSERVED_GENERATION + type: string + - jsonPath: .status.conditions[?(@.type=="Ready")].status + name: READY + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: |- + PulsarNSIsolationPolicy is the Schema for the pulsar ns-isolation-policy API + It represents a Pulsar NsIsolationPolicy in the Kubernetes cluster and includes both + the desired state (Spec) and the observed state (Status) of the policy. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: |- + PulsarNSIsolationPolicySpec defines the desired state of a Pulsar namespace isolation policy. + It corresponds to the configuration options available in Pulsar's namespaceIsolationPolicies admin API. + properties: + autoFailoverPolicyParams: + additionalProperties: + type: string + description: AutoFailoverPolicyParams auto failover policy parameters + type: object + autoFailoverPolicyType: + description: AutoFailoverPolicyType auto failover policy type name, + only support min_available now + enum: + - min_available + type: string + cluster: + description: Cluster is the name of the Pulsar Cluster + type: string + connectionRef: + description: |- + ConnectionRef is the reference to the PulsarConnection resource + used to connect to the Pulsar cluster for this ns-isolation-policy. + properties: + name: + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? + type: string + type: object + x-kubernetes-map-type: atomic + name: + description: Name is the policy name + type: string + namespaces: + description: Namespaces namespaces-regex list + items: + type: string + type: array + primary: + description: Primary primary-broker-regex list + items: + type: string + type: array + secondary: + description: Secondary secondary-broker-regex list, optional + items: + type: string + type: array + required: + - autoFailoverPolicyParams + - autoFailoverPolicyType + - cluster + - connectionRef + - name + - namespaces + - primary + type: object + status: + description: PulsarNSIsolationPolicyStatus defines the observed state + of PulsarNSIsolationPolicy + properties: + conditions: + description: |- + Conditions represent the latest available observations of the ns-isolation-policy's current state. + It follows the Kubernetes conventions for condition types and status. + The "Ready" condition type is typically used to indicate the overall status of the ns-isolation-policy. + items: + description: "Condition contains details for one aspect of the current + state of this API Resource.\n---\nThis struct is intended for + direct use as an array at the field path .status.conditions. For + example,\n\n\n\ttype FooStatus struct{\n\t // Represents the + observations of a foo's current state.\n\t // Known .status.conditions.type + are: \"Available\", \"Progressing\", and \"Degraded\"\n\t // + +patchMergeKey=type\n\t // +patchStrategy=merge\n\t // +listType=map\n\t + \ // +listMapKey=type\n\t Conditions []metav1.Condition `json:\"conditions,omitempty\" + patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"`\n\n\n\t + \ // other fields\n\t}" + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: |- + type of condition in CamelCase or in foo.example.com/CamelCase. + --- + Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be + useful (see .node.status.conditions), the ability to deconflict is important. + The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + observedGeneration: + description: |- + ObservedGeneration is the most recent generation observed for this resource. + It corresponds to the metadata generation, which is updated on mutation by the API Server. + This field is used to track whether the controller has processed the latest changes. + format: int64 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: null + storedVersions: null diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index f4a3f0c..a770f91 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -26,6 +26,7 @@ resources: - bases/resource.streamnative.io_pulsarpackages.yaml - bases/resource.streamnative.io_pulsarsinks.yaml - bases/resource.streamnative.io_pulsarsources.yaml +- bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: diff --git a/config/rbac/pulsarnsisolationpolicy_editor_role.yaml b/config/rbac/pulsarnsisolationpolicy_editor_role.yaml new file mode 100644 index 0000000..8b35cbb --- /dev/null +++ b/config/rbac/pulsarnsisolationpolicy_editor_role.yaml @@ -0,0 +1,38 @@ +# Copyright 2024 StreamNative +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# permissions for end users to edit pulsarconnections. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: pulsarnsisolation-policy-editor-role +rules: + - apiGroups: + - resource.streamnative.io + resources: + - pulsarnsisolationpolicies + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - resource.streamnative.io + resources: + - pulsarnsisolationpolicies/status + verbs: + - get diff --git a/config/rbac/pulsarnsisolationpolicy_viewer_role.yaml b/config/rbac/pulsarnsisolationpolicy_viewer_role.yaml new file mode 100644 index 0000000..6293f02 --- /dev/null +++ b/config/rbac/pulsarnsisolationpolicy_viewer_role.yaml @@ -0,0 +1,34 @@ +# Copyright 2024 StreamNative +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# permissions for end users to edit pulsarconnections. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: pulsarnsisolation-policy-viewer-role +rules: + - apiGroups: + - resource.streamnative.io + resources: + - pulsarnsisolationpolicies + verbs: + - get + - list + - watch + - apiGroups: + - resource.streamnative.io + resources: + - pulsarnsisolationpolicies/status + verbs: + - get diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 07921a3..66733d1 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -134,6 +134,32 @@ rules: - get - patch - update +- apiGroups: + - resource.streamnative.io + resources: + - pulsarnsisolationpolicies + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - resource.streamnative.io + resources: + - pulsarnsisolationpolicies/finalizers + verbs: + - update +- apiGroups: + - resource.streamnative.io + resources: + - pulsarnsisolationpolicies/status + verbs: + - get + - patch + - update - apiGroups: - resource.streamnative.io resources: diff --git a/config/samples/resource_v1alpha1_pulsarnsisolationpolicy.yaml b/config/samples/resource_v1alpha1_pulsarnsisolationpolicy.yaml new file mode 100644 index 0000000..52c0fb5 --- /dev/null +++ b/config/samples/resource_v1alpha1_pulsarnsisolationpolicy.yaml @@ -0,0 +1,34 @@ +# Copyright 2024 StreamNative +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarNSIsolationPolicy +metadata: + name: test-pulsar-ns-isolation-policy + namespace: test +spec: + name: test-policy + cluster: standalone + connectionRef: + name: test-pulsar-connection + namespaces: + - test-tenant/test-ns + primary: + - test-pulsar-broker-0.* + secondary: + - test-pulsar-broker-1.* + autoFailoverPolicyType: min_available + autoFailoverPolicyParams: + min_limit: "1" + usage_threshold: "80" diff --git a/controllers/connection_ref_mapper.go b/controllers/connection_ref_mapper.go index 92d714a..062de8d 100644 --- a/controllers/connection_ref_mapper.go +++ b/controllers/connection_ref_mapper.go @@ -81,6 +81,8 @@ func getConnectionRef(object client.Object) *corev1.LocalObjectReference { return &v.Spec.ConnectionRef case *pulsarv1alpha1.PulsarPackage: return &v.Spec.ConnectionRef + case *pulsarv1alpha1.PulsarNSIsolationPolicy: + return &v.Spec.ConnectionRef default: return nil } diff --git a/controllers/pulsarconnection_controller.go b/controllers/pulsarconnection_controller.go index ec28de2..13efaef 100644 --- a/controllers/pulsarconnection_controller.go +++ b/controllers/pulsarconnection_controller.go @@ -61,6 +61,9 @@ type PulsarConnectionReconciler struct { //+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsarnamespaces,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsarnamespaces/status,verbs=get;update;patch //+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsarnamespaces/finalizers,verbs=update +//+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsarnsisolationpolicies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsarnsisolationpolicies/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsarnsisolationpolicies/finalizers,verbs=update //+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsartopics,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsartopics/status,verbs=get;update;patch //+kubebuilder:rbac:groups=resource.streamnative.io,resources=pulsartopics/finalizers,verbs=update @@ -210,6 +213,14 @@ func (r *PulsarConnectionReconciler) SetupWithManager(mgr ctrl.Manager, options }); err != nil { return err } + if err := mgr.GetCache().IndexField(context.TODO(), &resourcev1alpha1.PulsarNSIsolationPolicy{}, ".spec.connectionRef.name", + func(object client.Object) []string { + return []string{ + object.(*resourcev1alpha1.PulsarNSIsolationPolicy).Spec.ConnectionRef.Name, + } + }); err != nil { + return err + } return ctrl.NewControllerManagedBy(mgr). For(&resourcev1alpha1.PulsarConnection{}). @@ -234,6 +245,9 @@ func (r *PulsarConnectionReconciler) SetupWithManager(mgr ctrl.Manager, options Watches(&source.Kind{Type: &resourcev1alpha1.PulsarFunction{}}, handler.EnqueueRequestsFromMapFunc(ConnectionRefMapper), builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches(&source.Kind{Type: &resourcev1alpha1.PulsarNSIsolationPolicy{}}, + handler.EnqueueRequestsFromMapFunc(ConnectionRefMapper), + builder.WithPredicates(predicate.GenerationChangedPredicate{})). Watches(&source.Kind{Type: &resourcev1alpha1.PulsarSink{}}, handler.EnqueueRequestsFromMapFunc(ConnectionRefMapper), builder.WithPredicates(predicate.GenerationChangedPredicate{})). diff --git a/docs/pulsar_namespace.md b/docs/pulsar_namespace.md index 2552124..7e00197 100644 --- a/docs/pulsar_namespace.md +++ b/docs/pulsar_namespace.md @@ -26,7 +26,8 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow | `offloadThresholdSize` | Size limit for message offloading. When the limit is reached, older messages will be offloaded to the tiered storage. | No | | `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to configure geo-replication for this namespace. Use only when using PulsarGeoReplication for setting up geo-replication between two Pulsar instances. | No | | `replicationClusters` | List of clusters to which the namespace is replicated. Use only if replicating clusters within the same Pulsar instance. | No | -| `deduplication` | whether to enable message deduplication for the namespace. | No | +| `deduplication` | Whether to enable message deduplication for the namespace. | No | +| `bookieAffinityGroup` | Set the bookie-affinity group for the namespace, which has two sub fields: `bookkeeperAffinityGroupPrimary(String)` is required, and `bookkeeperAffinityGroupSecondary(String)` is optional. | No | Note: Valid time units are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks). diff --git a/docs/pulsar_ns_isolation_policy.md b/docs/pulsar_ns_isolation_policy.md new file mode 100644 index 0000000..113b86c --- /dev/null +++ b/docs/pulsar_ns_isolation_policy.md @@ -0,0 +1,81 @@ +# PulsarNSIsolationPolicy + +## Overview + +The `PulsarNSIsolationPolicy` resource defines a ns-isolation-policy in a Pulsar cluster. It allows you to configure namespace isolation policies to limit the set of brokers that can be used for assignment. + +## Specifications + +| Field | Description | Required | +|----------------------------|------------------------------------------------------------------------------------------------------|----------| +| `name` | The name of the policy. | Yes | +| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster for this namespace. | Yes | +| `cluster` | The name of the cluster. | Yes | +| `namespaces` | A list of fully qualified namespace name in the format "tenant/namespace". | Yes | +| `primary` | A list of primary-broker-regex. | Yes | +| `secondary` | A list of secondary-broker-regex. | No | +| `autoFailoverPolicyType` | Auto failover policy type name, only support `min_available` now. | Yes | +| `autoFailoverPolicyParams` | A map of auto failover policy parameters. | Yes | + + +## Create A Pulsar ns-isolation-policy + +1. Define a isolation policy named `test-policy` by using the YAML file and save the YAML file `policy.yaml`. +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarNSIsolationPolicy +metadata: + name: test-pulsar-ns-isolation-policy + namespace: test +spec: + name: test-policy + cluster: standalone + connectionRef: + name: test-pulsar-connection + namespaces: + - test-tenant/test-ns + primary: + - test-pulsar-broker-0.* + secondary: + - test-pulsar-broker-1.* + autoFailoverPolicyType: min_available + autoFailoverPolicyParams: + min_limit: "1" + usage_threshold: "80" +``` + +2. Apply the YAML file to create the ns-isolation-policy. + +```shell +kubectl apply -f policy.yaml +``` + +3. Check the resource status. When column Ready is true, it indicates the resource is created successfully in the pulsar cluster + +```shell +kubectl -n test get pulsarnsisolationpolicy.resource.streamnative.io +``` + +```shell +NAME RESOURCE_NAME GENERATION OBSERVED_GENERATION READY +test-pulsar-ns-isolation-policy test-policy 1 1 True +``` + +## Update A Pulsar ns-isolation-policy + +You can update the ns-isolation-policy by editing the `policy.yaml` file and then applying it again using `kubectl apply -f policy.yaml`. This allows you to modify various settings of the Pulsar ns-isolation-policy. + +After applying changes, you can check the status of the update using: + +```shell +kubectl -n test get pulsarnsisolationpolicy.resource.streamnative.io +``` +The `OBSERVED_GENERATION` should increment, and `READY` should become `True` when the update is complete. + +## Delete A Pulsar ns-isolation-policy + +To delete a PulsarNSIsolationPolicy resource, use the following kubectl command: + +```shell +kubectl -n test delete pulsarnsisolationpolicy.resource.streamnative.io test-pulsar-ns-isolation-policy +``` diff --git a/pkg/admin/dummy.go b/pkg/admin/dummy.go index 77c6b3d..061f6d0 100644 --- a/pkg/admin/dummy.go +++ b/pkg/admin/dummy.go @@ -15,6 +15,7 @@ package admin import ( + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" ) @@ -27,6 +28,18 @@ func NewDummyPulsarAdmin(PulsarAdminConfig) (PulsarAdmin, error) { type DummyPulsarAdmin struct { } +func (d *DummyPulsarAdmin) GetNSIsolationPolicy(policyName, clusterName string) (*utils.NamespaceIsolationData, error) { + return nil, nil +} + +func (d *DummyPulsarAdmin) CreateNSIsolationPolicy(policyName, clusterName string, policyData utils.NamespaceIsolationData) error { + return nil +} + +func (d *DummyPulsarAdmin) DeleteNSIsolationPolicy(policyName, clusterName string) error { + return nil +} + var _ PulsarAdmin = &DummyPulsarAdmin{} // ApplyTenant is a fake implements of ApplyTenant diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 58c50b6..1714804 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -456,6 +456,20 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params return err } } + if params.BookieAffinityGroup != nil { + err = p.adminClient.Namespaces().SetBookieAffinityGroup(completeNSName, utils.BookieAffinityGroupData{ + BookkeeperAffinityGroupPrimary: params.BookieAffinityGroup.BookkeeperAffinityGroupPrimary, + BookkeeperAffinityGroupSecondary: params.BookieAffinityGroup.BookkeeperAffinityGroupSecondary, + }) + if err != nil { + return err + } + } else { + err = p.adminClient.Namespaces().DeleteBookieAffinityGroup(completeNSName) + if err != nil { + return err + } + } return nil } @@ -1231,3 +1245,33 @@ func (p *PulsarAdminClient) GetTenantAllowedClusters(tenantName string) ([]strin return tenant.AllowedClusters, nil } + +// GetNSIsolationPolicy get the ns-isolation-policy +func (p *PulsarAdminClient) GetNSIsolationPolicy(policyName, clusterName string) (*utils.NamespaceIsolationData, error) { + policyData, err := p.adminClient.NsIsolationPolicy().GetNamespaceIsolationPolicy(clusterName, policyName) + if err != nil { + return nil, err + } + + return policyData, nil +} + +// CreateNSIsolationPolicy create a ns-isolation-policy +func (p *PulsarAdminClient) CreateNSIsolationPolicy(policyName, clusterName string, policyData utils.NamespaceIsolationData) error { + err := p.adminClient.NsIsolationPolicy().CreateNamespaceIsolationPolicy(clusterName, policyName, policyData) + if err != nil { + return err + } + + return nil +} + +// DeleteNSIsolationPolicy delete the ns-isolation-policy +func (p *PulsarAdminClient) DeleteNSIsolationPolicy(policyName, clusterName string) error { + err := p.adminClient.NsIsolationPolicy().DeleteNamespaceIsolationPolicy(clusterName, policyName) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index 3a45b43..da76389 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -23,6 +23,7 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/auth" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + utils2 "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "k8s.io/apimachinery/pkg/api/resource" "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" @@ -53,6 +54,7 @@ type NamespaceParams struct { OffloadThresholdSize *resource.Quantity ReplicationClusters []string Deduplication *bool + BookieAffinityGroup *v1alpha1.BookieAffinityGroupData } // TopicParams indicates the parameters for creating a topic @@ -194,6 +196,15 @@ type PulsarAdmin interface { // GetTenantAllowedClusters get the allowed clusters of the tenant GetTenantAllowedClusters(name string) ([]string, error) + + // GetNSIsolationPolicy get the ns-isolation-policy + GetNSIsolationPolicy(policyName, clusterName string) (*utils2.NamespaceIsolationData, error) + + // CreateNSIsolationPolicy create a ns-isolation-policy + CreateNSIsolationPolicy(policyName, clusterName string, policyData utils2.NamespaceIsolationData) error + + // DeleteNSIsolationPolicy delete the ns-isolation-policy + DeleteNSIsolationPolicy(policyName, clusterName string) error } // PulsarAdminCreator is the function type to create a PulsarAdmin with config diff --git a/pkg/connection/reconcile_namespace.go b/pkg/connection/reconcile_namespace.go index d1dccfc..20ab8da 100644 --- a/pkg/connection/reconcile_namespace.go +++ b/pkg/connection/reconcile_namespace.go @@ -163,6 +163,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls OffloadThresholdTime: namespace.Spec.OffloadThresholdTime, OffloadThresholdSize: namespace.Spec.OffloadThresholdSize, Deduplication: namespace.Spec.Deduplication, + BookieAffinityGroup: namespace.Spec.BookieAffinityGroup, } if refs := namespace.Spec.GeoReplicationRefs; len(refs) != 0 || len(namespace.Spec.ReplicationClusters) > 0 { diff --git a/pkg/connection/reconcile_nsisolationpolicy.go b/pkg/connection/reconcile_nsisolationpolicy.go new file mode 100644 index 0000000..9cf2484 --- /dev/null +++ b/pkg/connection/reconcile_nsisolationpolicy.go @@ -0,0 +1,169 @@ +// Copyright 2024 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "context" + "fmt" + + utils2 "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/go-logr/logr" + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" + "github.com/streamnative/pulsar-resources-operator/pkg/admin" + "github.com/streamnative/pulsar-resources-operator/pkg/feature" + "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" + "k8s.io/apimachinery/pkg/api/meta" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +// PulsarNSIsolationPolicyReconciler reconciles a PulsarNSIsolationPolicy object +type PulsarNSIsolationPolicyReconciler struct { + conn *PulsarConnectionReconciler + log logr.Logger +} + +func makeNSIsolationPoliciesReconciler(r *PulsarConnectionReconciler) reconciler.Interface { + return &PulsarNSIsolationPolicyReconciler{ + conn: r, + log: makeSubResourceLog(r, "PulsarNSIsolationPolicy"), + } +} + +// Observe checks the updates of object +func (r *PulsarNSIsolationPolicyReconciler) Observe(ctx context.Context) error { + r.log.V(1).Info("Start Observe") + + nsIsolationPolicyList := &resourcev1alpha1.PulsarNSIsolationPolicyList{} + if err := r.conn.client.List(ctx, nsIsolationPolicyList, client.InNamespace(r.conn.connection.Namespace), + client.MatchingFields(map[string]string{ + ".spec.connectionRef.name": r.conn.connection.Name, + })); err != nil { + return fmt.Errorf("list ns-isolation-policies [%w]", err) + } + r.log.V(1).Info("Observed ns-isolation-policy items", "Count", len(nsIsolationPolicyList.Items)) + + r.conn.nsIsolationPolicies = nsIsolationPolicyList.Items + for i := range r.conn.nsIsolationPolicies { + if !resourcev1alpha1.IsPulsarResourceReady(&r.conn.nsIsolationPolicies[i]) { + r.conn.addUnreadyResource(&r.conn.nsIsolationPolicies[i]) + } + } + + r.log.V(1).Info("Observe Done") + return nil +} + +// Reconcile reconciles all ns-isolation-policies +func (r *PulsarNSIsolationPolicyReconciler) Reconcile(ctx context.Context) error { + errs := []error{} + for i := range r.conn.nsIsolationPolicies { + policy := &r.conn.nsIsolationPolicies[i] + if err := r.ReconcilePolicy(ctx, r.conn.pulsarAdmin, policy); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return fmt.Errorf("reconcile ns-isolation-policies error: [%v]", errs) + } + return nil +} + +// ReconcilePolicy move the current state of the ns-isolation-policy closer to the desired state +func (r *PulsarNSIsolationPolicyReconciler) ReconcilePolicy(ctx context.Context, pulsarAdmin admin.PulsarAdmin, + policy *resourcev1alpha1.PulsarNSIsolationPolicy) error { + log := r.log.WithValues("name", policy.Name, "namespace", policy.Namespace) + log.Info("Start Reconcile") + + if !policy.DeletionTimestamp.IsZero() { + log.Info("Deleting ns-isolation-policy") + + if err := pulsarAdmin.DeleteNSIsolationPolicy(policy.Spec.Name, policy.Spec.Cluster); err != nil { + if admin.IsNoSuchHostError(err) { + log.Info("Pulsar cluster has been deleted") + } else { + log.Error(err, "Failed to delete ns-isolation-policy") + meta.SetStatusCondition(&policy.Status.Conditions, *NewErrorCondition(policy.Generation, err.Error())) + if err := r.conn.client.Status().Update(ctx, policy); err != nil { + log.Error(err, "Failed to update the ns-isolation-policy status") + return err + } + return err + } + } + + // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer + controllerutil.RemoveFinalizer(policy, resourcev1alpha1.FinalizerName) + if err := r.conn.client.Update(ctx, policy); err != nil { + log.Error(err, "Failed to remove finalizer") + return err + } + + return nil + } + + // TODO use otelcontroller until kube-instrumentation upgrade controller-runtime version to newer + controllerutil.AddFinalizer(policy, resourcev1alpha1.FinalizerName) + if err := r.conn.client.Update(ctx, policy); err != nil { + log.Error(err, "Failed to add finalizer") + return err + } + + if resourcev1alpha1.IsPulsarResourceReady(policy) && + !feature.DefaultFeatureGate.Enabled(feature.AlwaysUpdatePulsarResource) { + log.Info("Skip reconcile, ns-isolation-policy resource is ready") + return nil + } + + // Pulsar backend will fail when no secondary argument is passed, so assign an empty array here + if policy.Spec.Secondary == nil { + policy.Spec.Secondary = []string{} + } + if err := pulsarAdmin.CreateNSIsolationPolicy(policy.Spec.Name, policy.Spec.Cluster, utils2.NamespaceIsolationData{ + Namespaces: policy.Spec.Namespaces, + Primary: policy.Spec.Primary, + Secondary: policy.Spec.Secondary, + AutoFailoverPolicy: utils2.AutoFailoverPolicyData{ + PolicyType: convertPolicyType(policy.Spec.AutoFailoverPolicyType), + Parameters: policy.Spec.AutoFailoverPolicyParams, + }, + }); err != nil { + meta.SetStatusCondition(&policy.Status.Conditions, *NewErrorCondition(policy.Generation, err.Error())) + log.Error(err, "Failed to apply ns isolation policy") + if err := r.conn.client.Status().Update(ctx, policy); err != nil { + log.Error(err, "Failed to update the ns isolation policy") + return err + } + return err + } + + policy.Status.ObservedGeneration = policy.Generation + meta.SetStatusCondition(&policy.Status.Conditions, *NewReadyCondition(policy.Generation)) + if err := r.conn.client.Status().Update(ctx, policy); err != nil { + log.Error(err, "Failed to update the ns-isolation-policy status") + return err + } + + return nil +} + +func convertPolicyType(policyType resourcev1alpha1.AutoFailoverPolicyType) utils2.AutoFailoverPolicyType { + switch policyType { + case resourcev1alpha1.MinAvailable: + return utils2.MinAvailable + default: + return "" + } +} diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index b2a378b..91cd409 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -37,20 +37,21 @@ import ( // PulsarConnectionReconciler reconciles a PulsarConnection object type PulsarConnectionReconciler struct { - connection *resourcev1alpha1.PulsarConnection - log logr.Logger - client client.Client - creator admin.PulsarAdminCreator - tenants []resourcev1alpha1.PulsarTenant - namespaces []resourcev1alpha1.PulsarNamespace - topics []resourcev1alpha1.PulsarTopic - permissions []resourcev1alpha1.PulsarPermission - geoReplications []resourcev1alpha1.PulsarGeoReplication - packages []resourcev1alpha1.PulsarPackage - sinks []resourcev1alpha1.PulsarSink - sources []resourcev1alpha1.PulsarSource - functions []resourcev1alpha1.PulsarFunction - unreadyResources []string + connection *resourcev1alpha1.PulsarConnection + log logr.Logger + client client.Client + creator admin.PulsarAdminCreator + tenants []resourcev1alpha1.PulsarTenant + namespaces []resourcev1alpha1.PulsarNamespace + topics []resourcev1alpha1.PulsarTopic + permissions []resourcev1alpha1.PulsarPermission + geoReplications []resourcev1alpha1.PulsarGeoReplication + packages []resourcev1alpha1.PulsarPackage + sinks []resourcev1alpha1.PulsarSink + sources []resourcev1alpha1.PulsarSource + functions []resourcev1alpha1.PulsarFunction + nsIsolationPolicies []resourcev1alpha1.PulsarNSIsolationPolicy + unreadyResources []string pulsarAdmin admin.PulsarAdmin pulsarAdminV3 admin.PulsarAdmin @@ -81,6 +82,7 @@ func MakeReconciler(log logr.Logger, k8sClient client.Client, creator admin.Puls makeFunctionsReconciler(r), makeSinksReconciler(r), makeSourcesReconciler(r), + makeNSIsolationPoliciesReconciler(r), } return r } diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index 002eec5..ba28206 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -74,18 +74,21 @@ var _ = Describe("Resources", func() { }, }, } - ppackage *v1alphav1.PulsarPackage - ppackageurl string = "function://public/default/api-examples@v3.2.3.3" - pfuncName string = "test-func" - pfuncFailureName string = "func-test-failure" - psinkName string = "test-sink" - psourceName string = "test-source" - pfunc *v1alphav1.PulsarFunction - pfuncfailure *v1alphav1.PulsarFunction - psinkpackageurl string = "builtin://data-generator" - psink *v1alphav1.PulsarSink - psource *v1alphav1.PulsarSource - psourcepackageurl string = "builtin://data-generator" + ppackage *v1alphav1.PulsarPackage + ppackageurl string = "function://public/default/api-examples@v3.2.3.3" + pfuncName string = "test-func" + pfuncFailureName string = "func-test-failure" + psinkName string = "test-sink" + psourceName string = "test-source" + pclusterName string = "test-pulsar" + pnsIsolationPolicyName string = "test-ns-isolation-policy" + pfunc *v1alphav1.PulsarFunction + pfuncfailure *v1alphav1.PulsarFunction + psinkpackageurl string = "builtin://data-generator" + psink *v1alphav1.PulsarSink + psource *v1alphav1.PulsarSource + pnsisolationpolicy *v1alphav1.PulsarNSIsolationPolicy + psourcepackageurl string = "builtin://data-generator" ) BeforeEach(func() { @@ -111,6 +114,14 @@ var _ = Describe("Resources", func() { pfuncfailure = utils.MakePulsarFunction(namespaceName, pfuncFailureName, "function://not/exists/package@latest", pconnName, lifecyclePolicy) psink = utils.MakePulsarSink(namespaceName, psinkName, psinkpackageurl, pconnName, lifecyclePolicy) psource = utils.MakePulsarSource(namespaceName, psourceName, psourcepackageurl, pconnName, lifecyclePolicy) + pnsisolationpolicy = utils.MakeNSIsolationPolicy(namespaceName, pnsIsolationPolicyName, pclusterName, pconnName, + []string{pnamespaceName}, + []string{"test-pulsar-broker-0.*"}, + []string{}, + map[string]string{ + "min_limit": "1", + "usage_threshold": "80", + }) }) Describe("Basic resource operations", Ordered, func() { @@ -460,6 +471,31 @@ var _ = Describe("Resources", func() { }) }) + Context("PulsarNSIsolationPolicy operation", func() { + It("should create the pulsar ns-isolation-policy successfully", func() { + err := k8sClient.Create(ctx, pnsisolationpolicy) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("the ns-isolation-policy should be ready", func() { + Eventually(func() bool { + s := &v1alphav1.PulsarNSIsolationPolicy{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pnsIsolationPolicyName} + Expect(k8sClient.Get(ctx, tns, s)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(s) + }, "40s", "100ms").Should(BeTrue()) + }) + + It("cleanup the pulsar ns-isolation-policy successfully", func() { + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarNSIsolationPolicy{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pnsIsolationPolicyName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + }) + }) + AfterAll(func() { Eventually(func(g Gomega) { t := &v1alphav1.PulsarTopic{} diff --git a/tests/utils/spec.go b/tests/utils/spec.go index bf37da7..e2821a1 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -285,3 +285,25 @@ func MakePulsarSource(namespace, name, sourcePackageUrl, connectionName string, }, } } + +// MakeNSIsolationPolicy will generate a object of PulsarNSIsolationPolicy +func MakeNSIsolationPolicy(namespace, name, clusterName, connectionName string, namespaces, primary, secondary []string, params map[string]string) *v1alpha1.PulsarNSIsolationPolicy { + return &v1alpha1.PulsarNSIsolationPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarNSIsolationPolicySpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + Name: name, + Cluster: clusterName, + Namespaces: namespaces, + Primary: primary, + Secondary: secondary, + AutoFailoverPolicyType: v1alpha1.MinAvailable, + AutoFailoverPolicyParams: params, + }, + } +}